about summary refs log tree commit diff
path: root/tvix/glue
diff options
context:
space:
mode:
authorConnor Brewster <cbrewster@hey.com>2024-04-18T18·51-0500
committerConnor Brewster <cbrewster@hey.com>2024-04-19T20·37+0000
commit259d7a3cfa214e7eab7b0862024d595489e92592 (patch)
treef6f52a334d5e5d26e6f3c323db0f8e56beaf56c8 /tvix/glue
parent150106610e60e95267c0968a9679797b05db7f3d (diff)
refactor(tvix/castore): generalize store ingestion streams r/7979
Previously the store ingestion code was coupled to `walkdir::DirEntry`s
produced by the `walkdir` crate which made it impossible to reuse
ingesting from other sources like tarballs or NARs.

This introduces a `IngestionEntry` which carries enough information for
store ingestion and a future for computing the Blake3 digest of files.
This allows the producer to perform file uploads in a way that makes
sense for the source, ie. the filesystem upload could concurrently
upload multiple files at the same time, while the NAR ingestor will need
to ingest the entire blob before yielding the next blob in the stream.
In the future we can buffer small blobs and upload them concurrently,
but the full blob still needs to be read from the NAR before advancing.

Change-Id: I6d144063e2ba5b05e765bac1f27d41b3c8e7b283
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11462
Reviewed-by: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Diffstat (limited to 'tvix/glue')
-rw-r--r--tvix/glue/src/builtins/import.rs28
-rw-r--r--tvix/glue/src/tvix_store_io.rs23
2 files changed, 17 insertions, 34 deletions
diff --git a/tvix/glue/src/builtins/import.rs b/tvix/glue/src/builtins/import.rs
index 800f8ddc17c2..639095c459e0 100644
--- a/tvix/glue/src/builtins/import.rs
+++ b/tvix/glue/src/builtins/import.rs
@@ -1,9 +1,7 @@
 //! Implements builtins used to import paths in the store.
 
 use crate::builtins::errors::ImportError;
-use futures::pin_mut;
 use std::path::Path;
-use tvix_castore::import::leveled_entries_to_stream;
 use tvix_eval::{
     builtin_macros::builtins,
     generators::{self, GenCo},
@@ -18,17 +16,15 @@ async fn filtered_ingest(
     path: &Path,
     filter: Option<&Value>,
 ) -> Result<tvix_castore::proto::node::Node, ErrorKind> {
-    // produce the leveled-key vector of DirEntry.
-    let mut entries_per_depths: Vec<Vec<walkdir::DirEntry>> = vec![Vec::new()];
+    let mut entries: Vec<walkdir::DirEntry> = vec![];
     let mut it = walkdir::WalkDir::new(path)
         .follow_links(false)
         .follow_root_links(false)
         .contents_first(false)
-        .sort_by_file_name()
         .into_iter();
 
     // Skip root node.
-    entries_per_depths[0].push(
+    entries.push(
         it.next()
             .ok_or_else(|| ErrorKind::IO {
                 path: Some(path.to_path_buf()),
@@ -85,28 +81,14 @@ async fn filtered_ingest(
             continue;
         }
 
-        if entry.depth() >= entries_per_depths.len() {
-            debug_assert!(
-                entry.depth() == entries_per_depths.len(),
-                "Received unexpected entry with depth {} during descent, previously at {}",
-                entry.depth(),
-                entries_per_depths.len()
-            );
-
-            entries_per_depths.push(vec![entry]);
-        } else {
-            entries_per_depths[entry.depth()].push(entry);
-        }
-
-        // FUTUREWORK: determine when it's the right moment to flush a level to the ingester.
+        entries.push(entry);
     }
 
-    let direntry_stream = leveled_entries_to_stream(entries_per_depths);
-    pin_mut!(direntry_stream);
+    let entries_iter = entries.into_iter().rev().map(Ok);
 
     state.tokio_handle.block_on(async {
         state
-            .ingest_entries(direntry_stream)
+            .ingest_dir_entries(entries_iter, path)
             .await
             .map_err(|err| ErrorKind::IO {
                 path: Some(path.to_path_buf()),
diff --git a/tvix/glue/src/tvix_store_io.rs b/tvix/glue/src/tvix_store_io.rs
index 10a59027852f..46575743c462 100644
--- a/tvix/glue/src/tvix_store_io.rs
+++ b/tvix/glue/src/tvix_store_io.rs
@@ -2,13 +2,11 @@
 
 use async_recursion::async_recursion;
 use bytes::Bytes;
-use futures::Stream;
 use futures::{StreamExt, TryStreamExt};
 use nix_compat::nixhash::NixHash;
 use nix_compat::store_path::{build_ca_path, StorePathRef};
 use nix_compat::{nixhash::CAHash, store_path::StorePath};
 use sha2::{Digest, Sha256};
-use std::marker::Unpin;
 use std::rc::Rc;
 use std::{
     cell::RefCell,
@@ -20,6 +18,7 @@ use std::{
 use tokio_util::io::SyncIoBridge;
 use tracing::{error, instrument, warn, Level};
 use tvix_build::buildservice::BuildService;
+use tvix_castore::import::dir_entry_iter_to_ingestion_stream;
 use tvix_eval::{ErrorKind, EvalIO, FileType, StdIO};
 use tvix_store::utils::AsyncIoBridge;
 use walkdir::DirEntry;
@@ -278,17 +277,19 @@ impl TvixStoreIO {
     /// This forwards the ingestion to the [`tvix_castore::import::ingest_entries`],
     /// passing the blob_service and directory_service that's used.
     /// The error is mapped to std::io::Error for simplicity.
-    pub(crate) async fn ingest_entries<S>(&self, entries_stream: S) -> io::Result<Node>
+    pub(crate) async fn ingest_dir_entries<'a, I>(
+        &'a self,
+        iter: I,
+        root: &Path,
+    ) -> io::Result<Node>
     where
-        S: Stream<Item = DirEntry> + Unpin,
+        I: Iterator<Item = Result<DirEntry, walkdir::Error>> + Send + Sync + 'a,
     {
-        tvix_castore::import::ingest_entries(
-            &self.blob_service,
-            &self.directory_service,
-            entries_stream,
-        )
-        .await
-        .map_err(|err| std::io::Error::new(io::ErrorKind::Other, err))
+        let entries_stream = dir_entry_iter_to_ingestion_stream(&self.blob_service, iter, root);
+
+        tvix_castore::import::ingest_entries(&self.directory_service, entries_stream)
+            .await
+            .map_err(|err| std::io::Error::new(io::ErrorKind::Other, err))
     }
 
     pub(crate) async fn node_to_path_info(