-rw-r--r--  tvix/Cargo.lock                   1
-rw-r--r--  tvix/Cargo.nix                    4
-rw-r--r--  tvix/store/Cargo.toml             1
-rw-r--r--  tvix/store/src/import.rs        248
-rw-r--r--  tvix/store/src/lib.rs             1
-rw-r--r--  tvix/store/src/tests/import.rs  136
-rw-r--r--  tvix/store/src/tests/mod.rs       1
7 files changed, 392 insertions(+), 0 deletions(-)
diff --git a/tvix/Cargo.lock b/tvix/Cargo.lock
index 534e04dbeeef..cba7166ce878 100644
--- a/tvix/Cargo.lock
+++ b/tvix/Cargo.lock
@@ -2809,6 +2809,7 @@ dependencies = [
  "tonic-reflection",
  "tracing",
  "tracing-subscriber",
+ "walkdir",
 ]
 
 [[package]]
diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix
index 4d01c2f0de94..2c3572742705 100644
--- a/tvix/Cargo.nix
+++ b/tvix/Cargo.nix
@@ -8370,6 +8370,10 @@ rec {
             packageId = "tracing-subscriber";
             features = [ "json" ];
           }
+          {
+            name = "walkdir";
+            packageId = "walkdir";
+          }
         ];
         buildDependencies = [
           {
diff --git a/tvix/store/Cargo.toml b/tvix/store/Cargo.toml
index 0a1ee9ce5c57..09a62bc79fba 100644
--- a/tvix/store/Cargo.toml
+++ b/tvix/store/Cargo.toml
@@ -24,6 +24,7 @@ tokio = { version = "1.23.0", features = ["rt-multi-thread"] }
 tonic = "0.8.2"
 tracing = "0.1.37"
 tracing-subscriber = { version = "0.3.16", features = ["json"] }
+walkdir = "2.3.2"
 
 [dependencies.tonic-reflection]
 optional = true
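
Context, not part of the change itself: the new walkdir dependency drives the filesystem traversal added in tvix/store/src/import.rs below. With contents_first(true), walkdir yields a directory's children before the directory itself, which is the ordering the import code relies on. A minimal sketch of that behaviour (./example is only a placeholder for any existing directory):

    use walkdir::WalkDir;

    fn main() {
        for entry in WalkDir::new("./example")
            .follow_links(false)
            .contents_first(true)
        {
            let entry = entry.expect("walkdir error");
            // Children print before their parent; the root (depth 0) comes last.
            println!("{} {}", entry.depth(), entry.path().display());
        }
    }
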
diff --git a/tvix/store/src/import.rs b/tvix/store/src/import.rs
new file mode 100644
index 000000000000..d2abb292fcce
--- /dev/null
+++ b/tvix/store/src/import.rs
@@ -0,0 +1,248 @@
+use crate::{proto, BlobWriter};
+use std::{
+    collections::HashMap,
+    fmt::Debug,
+    fs,
+    fs::File,
+    io::BufReader,
+    os::unix::prelude::PermissionsExt,
+    path::{Path, PathBuf},
+};
+use tracing::instrument;
+use walkdir::WalkDir;
+
+use crate::{
+    blobservice::BlobService, chunkservice::ChunkService, directoryservice::DirectoryService,
+};
+
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+    #[error("failed to upload directory at {0}: {1}")]
+    UploadDirectoryError(PathBuf, crate::Error),
+
+    #[error("invalid encoding encountered for entry {0:?}")]
+    InvalidEncoding(PathBuf),
+
+    #[error("unable to stat {0}: {1}")]
+    UnableToStat(PathBuf, std::io::Error),
+
+    #[error("unable to open {0}: {1}")]
+    UnableToOpen(PathBuf, std::io::Error),
+
+    #[error("unable to read {0}: {1}")]
+    UnableToRead(PathBuf, std::io::Error),
+}
+
+impl From<super::Error> for Error {
+    fn from(value: super::Error) -> Self {
+        match value {
+            crate::Error::InvalidRequest(_) => panic!("tvix bug"),
+            crate::Error::StorageError(_) => panic!("error"),
+        }
+    }
+}
+
+// This processes a given [walkdir::DirEntry] and returns a
+// proto::node::Node, depending on the type of the entry.
+//
+// If the entry is a file, its contents are uploaded.
+// If the entry is a directory, the Directory is uploaded as well.
+// For this to work, it relies on the caller to provide the directory object
+// with the previously returned (child) nodes.
+//
+// It assumes entries are returned in "contents first" order, meaning this
+// is only called with a directory once all of its children have been
+// visited. If the entry is indeed a directory, it'll also upload that
+// directory to the store. For this, the so-far-assembled Directory object
+// for this path needs to be passed in.
+//
+// It assumes the caller adds returned nodes to the directories it assembles.
+#[instrument(skip_all, fields(entry.file_type=?&entry.file_type(),entry.path=?entry.path()))]
+fn process_entry<BS: BlobService, CS: ChunkService, DS: DirectoryService>(
+    blob_service: &mut BS,
+    chunk_service: &mut CS,
+    directory_service: &mut DS,
+    entry: &walkdir::DirEntry,
+    maybe_directory: Option<proto::Directory>,
+) -> Result<proto::node::Node, Error> {
+    let file_type = entry.file_type();
+
+    let entry_path: PathBuf = entry.path().to_path_buf();
+
+    if file_type.is_dir() {
+        let directory = maybe_directory
+            .expect("tvix bug: must be called with some directory in the case of directory");
+        let directory_digest = directory.digest();
+        let directory_size = directory.size();
+
+        // upload this directory
+        directory_service
+            .put(directory)
+            .map_err(|e| Error::UploadDirectoryError(entry.path().to_path_buf(), e))?;
+
+        return Ok(proto::node::Node::Directory(proto::DirectoryNode {
+            name: entry
+                .file_name()
+                .to_str()
+                .map(|s| Ok(s.to_owned()))
+                .unwrap_or(Err(Error::InvalidEncoding(entry.path().to_path_buf())))?,
+            digest: directory_digest,
+            size: directory_size,
+        }));
+    }
+
+    if file_type.is_symlink() {
+        let target = std::fs::read_link(&entry_path)
+            .map_err(|e| Error::UnableToStat(entry_path.clone(), e))?;
+
+        return Ok(proto::node::Node::Symlink(proto::SymlinkNode {
+            name: entry
+                .file_name()
+                .to_str()
+                .map(|s| Ok(s.to_owned()))
+                .unwrap_or(Err(Error::InvalidEncoding(entry.path().to_path_buf())))?,
+            target: target
+                .to_str()
+                .map(|s| Ok(s.to_owned()))
+                .unwrap_or(Err(Error::InvalidEncoding(entry.path().to_path_buf())))?,
+        }));
+    }
+
+    if file_type.is_file() {
+        let metadata = entry
+            .metadata()
+            .map_err(|e| Error::UnableToStat(entry_path.clone(), e.into()))?;
+
+        // hash the file contents, upload chunks if not there yet
+        let (blob_digest, blob_meta) = {
+            let mut blob_writer = BlobWriter::new(chunk_service);
+
+            let file = File::open(entry_path.clone())
+                .map_err(|e| Error::UnableToOpen(entry_path.clone(), e))?;
+
+            let mut file_reader = BufReader::new(file);
+
+            std::io::copy(&mut file_reader, &mut blob_writer)
+                .map_err(|e| Error::UnableToRead(entry_path, e))?;
+
+            // TODO: handle errors
+            blob_writer.finalize().unwrap()
+        };
+
+        // upload blobmeta if not there yet
+        if blob_service
+            .stat(&proto::StatBlobRequest {
+                digest: blob_digest.to_vec(),
+                include_chunks: false,
+                include_bao: false,
+            })?
+            .is_none()
+        {
+            // upload blobmeta
+            blob_service.put(&blob_digest, blob_meta)?;
+        }
+
+        return Ok(proto::node::Node::File(proto::FileNode {
+            name: entry
+                .file_name()
+                .to_str()
+                .map(|s| Ok(s.to_owned()))
+                .unwrap_or(Err(Error::InvalidEncoding(entry.path().to_path_buf())))?,
+            digest: blob_digest,
+            size: metadata.len() as u32,
+            // If it's executable by the user, it'll become executable.
+            // This matches nix's dump() function behaviour.
+            executable: metadata.permissions().mode() & 64 != 0,
+        }));
+    }
+    todo!("handle other types")
+}
+
+/// Imports the contents at a given Path into the tvix store.
+///
+/// It doesn't register the contents at a Path in the store itself; that's up
+/// to the PathInfoService.
+//
+// returns the root node, or an error.
+#[instrument(skip(blob_service, chunk_service, directory_service), fields(path=?p))]
+pub fn import_path<
+    BS: BlobService,
+    CS: ChunkService,
+    DS: DirectoryService,
+    P: AsRef<Path> + Debug,
+>(
+    blob_service: &mut BS,
+    chunk_service: &mut CS,
+    directory_service: &mut DS,
+    p: P,
+) -> Result<proto::node::Node, Error> {
+    // Probe if the path points to a symlink. If it does, we process it manually,
+    // due to https://github.com/BurntSushi/walkdir/issues/175.
+    let symlink_metadata = fs::symlink_metadata(p.as_ref())
+        .map_err(|e| Error::UnableToStat(p.as_ref().to_path_buf(), e))?;
+    if symlink_metadata.is_symlink() {
+        let target = std::fs::read_link(p.as_ref())
+            .map_err(|e| Error::UnableToStat(p.as_ref().to_path_buf(), e))?;
+        return Ok(proto::node::Node::Symlink(proto::SymlinkNode {
+            name: p
+                .as_ref()
+                .file_name()
+                .unwrap_or_default()
+                .to_str()
+                .map(|s| Ok(s.to_owned()))
+                .unwrap_or(Err(Error::InvalidEncoding(p.as_ref().to_path_buf())))?,
+            target: target
+                .to_str()
+                .map(|s| Ok(s.to_owned()))
+                .unwrap_or(Err(Error::InvalidEncoding(p.as_ref().to_path_buf())))?,
+        }));
+    }
+
+    let mut directories: HashMap<PathBuf, proto::Directory> = HashMap::default();
+
+    // TODO: make sure we traverse in sorted order, or insert to parent_directory in sorted order at least.
+    for entry in WalkDir::new(p).follow_links(false).contents_first(true) {
+        let entry = entry.unwrap();
+
+        // process_entry wants an Option<Directory> in case the entry points to a directory.
+        // make sure to provide it.
+        let maybe_directory: Option<proto::Directory> = {
+            if entry.file_type().is_dir() {
+                Some(
+                    directories
+                        .entry(entry.path().to_path_buf())
+                        .or_default()
+                        .clone(),
+                )
+            } else {
+                None
+            }
+        };
+
+        let node = process_entry(
+            blob_service,
+            chunk_service,
+            directory_service,
+            &entry,
+            maybe_directory,
+        )?;
+
+        if entry.depth() == 0 {
+            return Ok(node);
+        } else {
+            // calculate the parent path, and make sure we register the node there.
+            // NOTE: entry.depth() > 0
+            let parent_path = entry.path().parent().unwrap().to_path_buf();
+
+            // record node in parent directory, creating a new [proto::Directory] if not there yet.
+            let parent_directory = directories.entry(parent_path).or_default();
+            match node {
+                proto::node::Node::Directory(e) => parent_directory.directories.push(e),
+                proto::node::Node::File(e) => parent_directory.files.push(e),
+                proto::node::Node::Symlink(e) => parent_directory.symlinks.push(e),
+            }
+        }
+    }
+    // unreachable, we already bailed out before if root doesn't exist.
+    panic!("tvix bug")
+}
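
The doc comments on process_entry and import_path describe a contract: because the walk is contents-first, the Directory object for a parent can be assembled incrementally as each child node is returned, and only needs to be uploaded once the parent itself is visited. A simplified sketch of that bookkeeping, with the blob/chunk/directory services left out (collect_children is a hypothetical helper; the HashMap keyed by parent path mirrors the directories map in import_path):

    use std::collections::HashMap;
    use std::path::PathBuf;
    use walkdir::WalkDir;

    /// Walks `root` contents-first and records, for every directory, the
    /// names of its direct children, mirroring how import_path fills its
    /// `directories` map before uploading each completed Directory.
    fn collect_children(root: &str) -> HashMap<PathBuf, Vec<String>> {
        let mut directories: HashMap<PathBuf, Vec<String>> = HashMap::default();

        for entry in WalkDir::new(root).follow_links(false).contents_first(true) {
            let entry = entry.expect("walkdir error");

            // The root is yielded last (depth 0); by then every directory
            // below it already has all of its children recorded.
            if entry.depth() == 0 {
                break;
            }

            // Every non-root entry has a parent, so unwrap() is safe here.
            let parent = entry.path().parent().unwrap().to_path_buf();
            directories
                .entry(parent)
                .or_default()
                .push(entry.file_name().to_string_lossy().into_owned());
        }

        directories
    }
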
diff --git a/tvix/store/src/lib.rs b/tvix/store/src/lib.rs
index c345cb9b1009..3e7852134810 100644
--- a/tvix/store/src/lib.rs
+++ b/tvix/store/src/lib.rs
@@ -5,6 +5,7 @@ mod errors;
 pub mod blobservice;
 pub mod chunkservice;
 pub mod directoryservice;
+pub mod import;
 pub mod nar;
 pub mod pathinfoservice;
 pub mod proto;
diff --git a/tvix/store/src/tests/import.rs b/tvix/store/src/tests/import.rs
new file mode 100644
index 000000000000..e8373b614e91
--- /dev/null
+++ b/tvix/store/src/tests/import.rs
@@ -0,0 +1,136 @@
+use super::utils::{gen_blob_service, gen_chunk_service, gen_directory_service};
+use crate::blobservice::BlobService;
+use crate::directoryservice::DirectoryService;
+use crate::import::import_path;
+use crate::proto;
+use crate::tests::fixtures::DIRECTORY_COMPLICATED;
+use crate::tests::fixtures::*;
+use tempfile::TempDir;
+
+#[cfg(target_family = "unix")]
+#[test]
+fn symlink() {
+    let tmpdir = TempDir::new().unwrap();
+
+    let data_dir = tmpdir.path().join("data");
+    std::fs::create_dir_all(&data_dir).unwrap();
+    std::os::unix::fs::symlink("/nix/store/somewhereelse", data_dir.join("doesntmatter")).unwrap();
+
+    let root_node = import_path(
+        &mut gen_blob_service(tmpdir.path()),
+        &mut gen_chunk_service(tmpdir.path()),
+        &mut gen_directory_service(tmpdir.path()),
+        data_dir.join("doesntmatter"),
+    )
+    .expect("must succeed");
+
+    assert_eq!(
+        crate::proto::node::Node::Symlink(proto::SymlinkNode {
+            name: "doesntmatter".to_string(),
+            target: "/nix/store/somewhereelse".to_string(),
+        }),
+        root_node,
+    )
+}
+
+#[test]
+fn single_file() {
+    let tmpdir = TempDir::new().unwrap();
+
+    let data_dir = tmpdir.path().join("data");
+    std::fs::create_dir_all(&data_dir).unwrap();
+    std::fs::write(data_dir.join("root"), HELLOWORLD_BLOB_CONTENTS).unwrap();
+
+    let mut blob_service = gen_blob_service(tmpdir.path());
+
+    let root_node = import_path(
+        &mut blob_service,
+        &mut gen_chunk_service(tmpdir.path()),
+        &mut gen_directory_service(tmpdir.path()),
+        data_dir.join("root"),
+    )
+    .expect("must succeed");
+
+    assert_eq!(
+        crate::proto::node::Node::File(proto::FileNode {
+            name: "root".to_string(),
+            digest: HELLOWORLD_BLOB_DIGEST.to_vec(),
+            size: HELLOWORLD_BLOB_CONTENTS.len() as u32,
+            executable: false,
+        }),
+        root_node,
+    );
+
+    // ensure the blob has been uploaded
+    assert!(blob_service
+        .stat(&proto::StatBlobRequest {
+            digest: HELLOWORLD_BLOB_DIGEST.to_vec(),
+            include_chunks: false,
+            ..Default::default()
+        })
+        .unwrap()
+        .is_some());
+}
+
+#[test]
+fn complicated() {
+    let tmpdir = TempDir::new().unwrap();
+
+    let data_dir = tmpdir.path().join("data");
+
+    // Populate path to import
+    std::fs::create_dir_all(&data_dir).unwrap();
+    // File `.keep`
+    std::fs::write(data_dir.join(".keep"), vec![]).unwrap();
+    // Symlink `aa`
+    std::os::unix::fs::symlink("/nix/store/somewhereelse", data_dir.join("aa")).unwrap();
+    // Directory `keep`
+    std::fs::create_dir(data_dir.join("keep")).unwrap();
+    // File `keep/.keep`
+    std::fs::write(data_dir.join("keep").join(".keep"), vec![]).unwrap();
+
+    let mut blob_service = gen_blob_service(tmpdir.path());
+    let mut directory_service = gen_directory_service(tmpdir.path());
+
+    let root_node = import_path(
+        &mut blob_service,
+        &mut gen_chunk_service(tmpdir.path()),
+        &mut directory_service,
+        data_dir,
+    )
+    .expect("must succeed");
+
+    // ensure root_node matched expectations
+    assert_eq!(
+        crate::proto::node::Node::Directory(proto::DirectoryNode {
+            name: "data".to_string(),
+            digest: DIRECTORY_COMPLICATED.digest(),
+            size: DIRECTORY_COMPLICATED.size(),
+        }),
+        root_node,
+    );
+
+    // ensure DIRECTORY_WITH_KEEP and DIRECTORY_COMPLICATED have been uploaded
+    assert!(directory_service
+        .get(&proto::get_directory_request::ByWhat::Digest(
+            DIRECTORY_WITH_KEEP.digest()
+        ))
+        .unwrap()
+        .is_some());
+    assert!(directory_service
+        .get(&proto::get_directory_request::ByWhat::Digest(
+            DIRECTORY_COMPLICATED.digest()
+        ))
+        .unwrap()
+        .is_some());
+
+    // ensure EMPTY_BLOB_CONTENTS has been uploaded
+    assert!(blob_service
+        .stat(&proto::StatBlobRequest {
+            digest: EMPTY_BLOB_DIGEST.to_vec(),
+            include_chunks: false,
+            include_bao: false
+        })
+        .unwrap()
+        .is_some());
+}
diff --git a/tvix/store/src/tests/mod.rs b/tvix/store/src/tests/mod.rs
index daea048deddf..8ceea01e3190 100644
--- a/tvix/store/src/tests/mod.rs
+++ b/tvix/store/src/tests/mod.rs
@@ -1,3 +1,4 @@
 pub mod fixtures;
+mod import;
 mod nar_renderer;
 pub mod utils;