diff options
-rw-r--r-- | tvix/castore/src/import.rs | 68 |
1 files changed, 51 insertions, 17 deletions
diff --git a/tvix/castore/src/import.rs b/tvix/castore/src/import.rs index 520ea18a5ac0..6aff7c8c296e 100644 --- a/tvix/castore/src/import.rs +++ b/tvix/castore/src/import.rs @@ -1,4 +1,5 @@ use crate::blobservice::BlobService; +use crate::directoryservice::DirectoryPutter; use crate::directoryservice::DirectoryService; use crate::proto::node::Node; use crate::proto::Directory; @@ -207,10 +208,6 @@ where DS: AsRef<dyn DirectoryService>, S: Stream<Item = DirEntry> + std::marker::Unpin, { - let mut directories: HashMap<PathBuf, Directory> = HashMap::default(); - - let mut directory_putter = directory_service.as_ref().put_multiple_start(); - #[cfg(debug_assertions)] let mut invariant_checker: MerkleInvariantChecker = Default::default(); @@ -230,14 +227,29 @@ where invariant_checker.see(e); }); + // For a given path, this holds the [Directory] structs as they are populated. + let mut directories: HashMap<PathBuf, Directory> = HashMap::default(); + let mut maybe_directory_putter: Option<Box<dyn DirectoryPutter>> = None; + // We need to process a directory's children before processing // the directory itself in order to have all the data needed // to compute the hash. - while let Some(entry) = direntry_stream.next().await { + + let root_node = loop { + let entry = match direntry_stream.next().await { + Some(entry) => entry, + None => { + // The last entry of the stream must have depth 0, after which + // we break the loop manually. + panic!("Tvix bug: unexpected end of stream"); + } + }; let file_type = entry.file_type(); let node = if file_type.is_dir() { - // if the entry was a directory, use the directory_putter to upload the Directory. + // If the entry is a directory, we traversed all its children (and + // populated it in `directories`). + // If we don't have it in there, it's an empty directory. let directory = directories .remove(entry.path()) // In that case, it contained no children @@ -246,7 +258,13 @@ where let directory_size = directory.size(); let directory_digest = directory.digest(); - directory_putter.put(directory).await?; + // Use the directory_putter to upload the directory. + // If we don't have one yet (as that's the first one to upload), + // initialize the putter. + maybe_directory_putter + .get_or_insert_with(|| directory_service.as_ref().put_multiple_start()) + .put(directory) + .await?; Node::Directory(DirectoryNode { name: entry.file_name().as_bytes().to_owned().into(), @@ -301,13 +319,7 @@ where }; if entry.depth() == 0 { - // Make sure all the directories are flushed. - // FUTUREWORK: `debug_assert!` the resulting Ok(b3_digest) to be equal - // to `directories.get(entry.path())`. - if entry.file_type().is_dir() { - directory_putter.close().await?; - } - return Ok(node); + break node; } else { // calculate the parent path, and make sure we register the node there. // NOTE: entry.depth() > 0 @@ -321,7 +333,29 @@ where Node::Symlink(e) => parent_directory.symlinks.push(e), } } - } - // unreachable, we already bailed out before if root doesn't exist. - unreachable!("Tvix bug: no root node emitted during ingestion") + }; + + // if there were directories uploaded, make sure we flush the putter, so + // they're all persisted to the backend. + if let Some(mut directory_putter) = maybe_directory_putter { + let root_directory_digest = directory_putter.close().await?; + + #[cfg(debug_assertions)] + { + if let Node::Directory(directory_node) = &root_node { + debug_assert_eq!( + root_directory_digest, + directory_node + .digest + .to_vec() + .try_into() + .expect("invalid digest len") + ) + } else { + unreachable!("Tvix bug: directory putter initialized but no root directory node"); + } + } + }; + + Ok(root_node) } |