about summary refs log tree commit diff
path: root/tvix/glue
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/glue')
-rw-r--r--tvix/glue/src/builtins/import.rs28
-rw-r--r--tvix/glue/src/tvix_store_io.rs23
2 files changed, 17 insertions, 34 deletions
diff --git a/tvix/glue/src/builtins/import.rs b/tvix/glue/src/builtins/import.rs
index 800f8ddc17c2..639095c459e0 100644
--- a/tvix/glue/src/builtins/import.rs
+++ b/tvix/glue/src/builtins/import.rs
@@ -1,9 +1,7 @@
 //! Implements builtins used to import paths in the store.
 
 use crate::builtins::errors::ImportError;
-use futures::pin_mut;
 use std::path::Path;
-use tvix_castore::import::leveled_entries_to_stream;
 use tvix_eval::{
     builtin_macros::builtins,
     generators::{self, GenCo},
@@ -18,17 +16,15 @@ async fn filtered_ingest(
     path: &Path,
     filter: Option<&Value>,
 ) -> Result<tvix_castore::proto::node::Node, ErrorKind> {
-    // produce the leveled-key vector of DirEntry.
-    let mut entries_per_depths: Vec<Vec<walkdir::DirEntry>> = vec![Vec::new()];
+    let mut entries: Vec<walkdir::DirEntry> = vec![];
     let mut it = walkdir::WalkDir::new(path)
         .follow_links(false)
         .follow_root_links(false)
         .contents_first(false)
-        .sort_by_file_name()
         .into_iter();
 
     // Skip root node.
-    entries_per_depths[0].push(
+    entries.push(
         it.next()
             .ok_or_else(|| ErrorKind::IO {
                 path: Some(path.to_path_buf()),
@@ -85,28 +81,14 @@ async fn filtered_ingest(
             continue;
         }
 
-        if entry.depth() >= entries_per_depths.len() {
-            debug_assert!(
-                entry.depth() == entries_per_depths.len(),
-                "Received unexpected entry with depth {} during descent, previously at {}",
-                entry.depth(),
-                entries_per_depths.len()
-            );
-
-            entries_per_depths.push(vec![entry]);
-        } else {
-            entries_per_depths[entry.depth()].push(entry);
-        }
-
-        // FUTUREWORK: determine when it's the right moment to flush a level to the ingester.
+        entries.push(entry);
     }
 
-    let direntry_stream = leveled_entries_to_stream(entries_per_depths);
-    pin_mut!(direntry_stream);
+    let entries_iter = entries.into_iter().rev().map(Ok);
 
     state.tokio_handle.block_on(async {
         state
-            .ingest_entries(direntry_stream)
+            .ingest_dir_entries(entries_iter, path)
             .await
             .map_err(|err| ErrorKind::IO {
                 path: Some(path.to_path_buf()),
diff --git a/tvix/glue/src/tvix_store_io.rs b/tvix/glue/src/tvix_store_io.rs
index 10a59027852f..46575743c462 100644
--- a/tvix/glue/src/tvix_store_io.rs
+++ b/tvix/glue/src/tvix_store_io.rs
@@ -2,13 +2,11 @@
 
 use async_recursion::async_recursion;
 use bytes::Bytes;
-use futures::Stream;
 use futures::{StreamExt, TryStreamExt};
 use nix_compat::nixhash::NixHash;
 use nix_compat::store_path::{build_ca_path, StorePathRef};
 use nix_compat::{nixhash::CAHash, store_path::StorePath};
 use sha2::{Digest, Sha256};
-use std::marker::Unpin;
 use std::rc::Rc;
 use std::{
     cell::RefCell,
@@ -20,6 +18,7 @@ use std::{
 use tokio_util::io::SyncIoBridge;
 use tracing::{error, instrument, warn, Level};
 use tvix_build::buildservice::BuildService;
+use tvix_castore::import::dir_entry_iter_to_ingestion_stream;
 use tvix_eval::{ErrorKind, EvalIO, FileType, StdIO};
 use tvix_store::utils::AsyncIoBridge;
 use walkdir::DirEntry;
@@ -278,17 +277,19 @@ impl TvixStoreIO {
     /// This forwards the ingestion to the [`tvix_castore::import::ingest_entries`],
     /// passing the blob_service and directory_service that's used.
     /// The error is mapped to std::io::Error for simplicity.
-    pub(crate) async fn ingest_entries<S>(&self, entries_stream: S) -> io::Result<Node>
+    pub(crate) async fn ingest_dir_entries<'a, I>(
+        &'a self,
+        iter: I,
+        root: &Path,
+    ) -> io::Result<Node>
     where
-        S: Stream<Item = DirEntry> + Unpin,
+        I: Iterator<Item = Result<DirEntry, walkdir::Error>> + Send + Sync + 'a,
     {
-        tvix_castore::import::ingest_entries(
-            &self.blob_service,
-            &self.directory_service,
-            entries_stream,
-        )
-        .await
-        .map_err(|err| std::io::Error::new(io::ErrorKind::Other, err))
+        let entries_stream = dir_entry_iter_to_ingestion_stream(&self.blob_service, iter, root);
+
+        tvix_castore::import::ingest_entries(&self.directory_service, entries_stream)
+            .await
+            .map_err(|err| std::io::Error::new(io::ErrorKind::Other, err))
     }
 
     pub(crate) async fn node_to_path_info(