about summary refs log tree commit diff
path: root/tvix/castore/src/import/mod.rs
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2024-04-20T16·32+0300
committerclbot <clbot@tvl.fyi>2024-04-20T18·54+0000
commit5fc403587ffb38635e767697f1c18e7e3559aea6 (patch)
tree8779c1714af0145ca580de482a41e63821cd59f2 /tvix/castore/src/import/mod.rs
parent01239a4f6f871733231c01d6126c3ffedcc504b7 (diff)
refactor(tvix/castore): ingest filesystem entries in parallel r/7987
Rather than carrying around an Future in the IngestionEntry::Regular,
simply carry the plain B3Digest.

Code reading through a non-seekable data stream has no choice but to
read and upload blobs immediately, and code seeking through something
seekable (like a filesystem) probably knows better what concurrency to
pick when ingesting, rather than the consuming side.

(Our only) one of these seekable source implementations is now doing
exactly that. We produce a stream of futures, and then use
[StreamExt::buffered] to process more than one, concurrently.

We still keep the same order, to avoid shuffling things and violating
the stream order.

This also cleans up walk_path_for_ingestion in castore/import, as well
as ingest_dir_entries in glue/tvix_store_io.

Change-Id: I5eb70f3e1e372c74bcbfcf6b6e2653eba36e151d
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11491
Autosubmit: flokli <flokli@flokli.de>
Reviewed-by: Connor Brewster <cbrewster@hey.com>
Tested-by: BuildkiteCI
Diffstat (limited to 'tvix/castore/src/import/mod.rs')
-rw-r--r--tvix/castore/src/import/mod.rs17
1 files changed, 7 insertions, 10 deletions
diff --git a/tvix/castore/src/import/mod.rs b/tvix/castore/src/import/mod.rs
index 7e60cf44d903..09d5b8d06ea1 100644
--- a/tvix/castore/src/import/mod.rs
+++ b/tvix/castore/src/import/mod.rs
@@ -13,10 +13,9 @@ use crate::proto::DirectoryNode;
 use crate::proto::FileNode;
 use crate::proto::SymlinkNode;
 use crate::B3Digest;
-use futures::Future;
 use futures::{Stream, StreamExt};
 use std::fs::FileType;
-use std::pin::Pin;
+
 use tracing::Level;
 
 #[cfg(target_family = "unix")]
@@ -52,10 +51,10 @@ pub mod fs;
 ///
 /// On success, returns the root node.
 #[instrument(skip_all, ret(level = Level::TRACE), err)]
-pub async fn ingest_entries<'a, DS, S>(directory_service: DS, mut entries: S) -> Result<Node, Error>
+pub async fn ingest_entries<DS, S>(directory_service: DS, mut entries: S) -> Result<Node, Error>
 where
     DS: AsRef<dyn DirectoryService>,
-    S: Stream<Item = Result<IngestionEntry<'a>, Error>> + Send + std::marker::Unpin,
+    S: Stream<Item = Result<IngestionEntry, Error>> + Send + std::marker::Unpin,
 {
     // For a given path, this holds the [Directory] structs as they are populated.
     let mut directories: HashMap<PathBuf, Directory> = HashMap::default();
@@ -124,7 +123,7 @@ where
                 ..
             } => Node::File(FileNode {
                 name,
-                digest: digest.await?.into(),
+                digest: digest.to_owned().into(),
                 size: *size,
                 executable: *executable,
             }),
@@ -200,14 +199,12 @@ where
     Ok(digest)
 }
 
-type BlobFut<'a> = Pin<Box<dyn Future<Output = Result<B3Digest, Error>> + Send + 'a>>;
-
-pub enum IngestionEntry<'a> {
+pub enum IngestionEntry {
     Regular {
         path: PathBuf,
         size: u64,
         executable: bool,
-        digest: BlobFut<'a>,
+        digest: B3Digest,
     },
     Symlink {
         path: PathBuf,
@@ -222,7 +219,7 @@ pub enum IngestionEntry<'a> {
     },
 }
 
-impl<'a> IngestionEntry<'a> {
+impl IngestionEntry {
     fn path(&self) -> &Path {
         match self {
             IngestionEntry::Regular { path, .. } => path,