about summary refs log tree commit diff
path: root/tvix/glue/src/tvix_store_io.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/glue/src/tvix_store_io.rs')
-rw-r--r--tvix/glue/src/tvix_store_io.rs23
1 files changed, 12 insertions, 11 deletions
diff --git a/tvix/glue/src/tvix_store_io.rs b/tvix/glue/src/tvix_store_io.rs
index 10a59027852f..46575743c462 100644
--- a/tvix/glue/src/tvix_store_io.rs
+++ b/tvix/glue/src/tvix_store_io.rs
@@ -2,13 +2,11 @@
 
 use async_recursion::async_recursion;
 use bytes::Bytes;
-use futures::Stream;
 use futures::{StreamExt, TryStreamExt};
 use nix_compat::nixhash::NixHash;
 use nix_compat::store_path::{build_ca_path, StorePathRef};
 use nix_compat::{nixhash::CAHash, store_path::StorePath};
 use sha2::{Digest, Sha256};
-use std::marker::Unpin;
 use std::rc::Rc;
 use std::{
     cell::RefCell,
@@ -20,6 +18,7 @@ use std::{
 use tokio_util::io::SyncIoBridge;
 use tracing::{error, instrument, warn, Level};
 use tvix_build::buildservice::BuildService;
+use tvix_castore::import::dir_entry_iter_to_ingestion_stream;
 use tvix_eval::{ErrorKind, EvalIO, FileType, StdIO};
 use tvix_store::utils::AsyncIoBridge;
 use walkdir::DirEntry;
@@ -278,17 +277,19 @@ impl TvixStoreIO {
     /// This forwards the ingestion to the [`tvix_castore::import::ingest_entries`],
     /// passing the blob_service and directory_service that's used.
     /// The error is mapped to std::io::Error for simplicity.
-    pub(crate) async fn ingest_entries<S>(&self, entries_stream: S) -> io::Result<Node>
+    pub(crate) async fn ingest_dir_entries<'a, I>(
+        &'a self,
+        iter: I,
+        root: &Path,
+    ) -> io::Result<Node>
     where
-        S: Stream<Item = DirEntry> + Unpin,
+        I: Iterator<Item = Result<DirEntry, walkdir::Error>> + Send + Sync + 'a,
     {
-        tvix_castore::import::ingest_entries(
-            &self.blob_service,
-            &self.directory_service,
-            entries_stream,
-        )
-        .await
-        .map_err(|err| std::io::Error::new(io::ErrorKind::Other, err))
+        let entries_stream = dir_entry_iter_to_ingestion_stream(&self.blob_service, iter, root);
+
+        tvix_castore::import::ingest_entries(&self.directory_service, entries_stream)
+            .await
+            .map_err(|err| std::io::Error::new(io::ErrorKind::Other, err))
     }
 
     pub(crate) async fn node_to_path_info(