about summary refs log tree commit diff
path: root/tvix/castore
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2024-04-13T16·57+0300
committerclbot <clbot@tvl.fyi>2024-04-15T14·47+0000
commit6ebaa7b88a25636ee90f21b3a42a3674c98d00ef (patch)
tree95d927faabf4b9d2111526e8ae55e6bb1e54bd77 /tvix/castore
parentc088123d4e8c0b49cf0af568426d6d146a07f44c (diff)
refactor(tvix/castore/import): restructure directory uploader a bit r/7927
Have a Option<Box<dyn DirectoryPutter>>, which is lazily initialized
whenever we first want to upload a directory.

Have the loop explicitly break when it encounters the root_node, and
deal with the flushing after the loop.

Deal with the FUTUREWORK (assertion for root directory digest matching
what the DirectoryPutter returns).

Change-Id: Iefc4904d8b8387e868fb752d40e3e4e4218c7407
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11417
Tested-by: BuildkiteCI
Autosubmit: flokli <flokli@flokli.de>
Reviewed-by: Connor Brewster <cbrewster@hey.com>
Diffstat (limited to 'tvix/castore')
-rw-r--r--tvix/castore/src/import.rs68
1 files changed, 51 insertions, 17 deletions
diff --git a/tvix/castore/src/import.rs b/tvix/castore/src/import.rs
index 520ea18a5ac0..6aff7c8c296e 100644
--- a/tvix/castore/src/import.rs
+++ b/tvix/castore/src/import.rs
@@ -1,4 +1,5 @@
 use crate::blobservice::BlobService;
+use crate::directoryservice::DirectoryPutter;
 use crate::directoryservice::DirectoryService;
 use crate::proto::node::Node;
 use crate::proto::Directory;
@@ -207,10 +208,6 @@ where
     DS: AsRef<dyn DirectoryService>,
     S: Stream<Item = DirEntry> + std::marker::Unpin,
 {
-    let mut directories: HashMap<PathBuf, Directory> = HashMap::default();
-
-    let mut directory_putter = directory_service.as_ref().put_multiple_start();
-
     #[cfg(debug_assertions)]
     let mut invariant_checker: MerkleInvariantChecker = Default::default();
 
@@ -230,14 +227,29 @@ where
         invariant_checker.see(e);
     });
 
+    // For a given path, this holds the [Directory] structs as they are populated.
+    let mut directories: HashMap<PathBuf, Directory> = HashMap::default();
+    let mut maybe_directory_putter: Option<Box<dyn DirectoryPutter>> = None;
+
     // We need to process a directory's children before processing
     // the directory itself in order to have all the data needed
     // to compute the hash.
-    while let Some(entry) = direntry_stream.next().await {
+
+    let root_node = loop {
+        let entry = match direntry_stream.next().await {
+            Some(entry) => entry,
+            None => {
+                // The last entry of the stream must have depth 0, after which
+                // we break the loop manually.
+                panic!("Tvix bug: unexpected end of stream");
+            }
+        };
         let file_type = entry.file_type();
 
         let node = if file_type.is_dir() {
-            // if the entry was a directory, use the directory_putter to upload the Directory.
+            // If the entry is a directory, we traversed all its children (and
+            // populated it in `directories`).
+            // If we don't have it in there, it's an empty directory.
             let directory = directories
                 .remove(entry.path())
                 // In that case, it contained no children
@@ -246,7 +258,13 @@ where
             let directory_size = directory.size();
             let directory_digest = directory.digest();
 
-            directory_putter.put(directory).await?;
+            // Use the directory_putter to upload the directory.
+            // If we don't have one yet (as that's the first one to upload),
+            // initialize the putter.
+            maybe_directory_putter
+                .get_or_insert_with(|| directory_service.as_ref().put_multiple_start())
+                .put(directory)
+                .await?;
 
             Node::Directory(DirectoryNode {
                 name: entry.file_name().as_bytes().to_owned().into(),
@@ -301,13 +319,7 @@ where
         };
 
         if entry.depth() == 0 {
-            // Make sure all the directories are flushed.
-            // FUTUREWORK: `debug_assert!` the resulting Ok(b3_digest) to be equal
-            // to `directories.get(entry.path())`.
-            if entry.file_type().is_dir() {
-                directory_putter.close().await?;
-            }
-            return Ok(node);
+            break node;
         } else {
             // calculate the parent path, and make sure we register the node there.
             // NOTE: entry.depth() > 0
@@ -321,7 +333,29 @@ where
                 Node::Symlink(e) => parent_directory.symlinks.push(e),
             }
         }
-    }
-    // unreachable, we already bailed out before if root doesn't exist.
-    unreachable!("Tvix bug: no root node emitted during ingestion")
+    };
+
+    // if there were directories uploaded, make sure we flush the putter, so
+    // they're all persisted to the backend.
+    if let Some(mut directory_putter) = maybe_directory_putter {
+        let root_directory_digest = directory_putter.close().await?;
+
+        #[cfg(debug_assertions)]
+        {
+            if let Node::Directory(directory_node) = &root_node {
+                debug_assert_eq!(
+                    root_directory_digest,
+                    directory_node
+                        .digest
+                        .to_vec()
+                        .try_into()
+                        .expect("invalid digest len")
+                )
+            } else {
+                unreachable!("Tvix bug: directory putter initialized but no root directory node");
+            }
+        }
+    };
+
+    Ok(root_node)
 }