diff options
Diffstat (limited to 'tvix/glue')
-rw-r--r-- | tvix/glue/src/builtins/import.rs | 28 | ||||
-rw-r--r-- | tvix/glue/src/tvix_store_io.rs | 23 |
2 files changed, 17 insertions, 34 deletions
diff --git a/tvix/glue/src/builtins/import.rs b/tvix/glue/src/builtins/import.rs index 800f8ddc17c2..639095c459e0 100644 --- a/tvix/glue/src/builtins/import.rs +++ b/tvix/glue/src/builtins/import.rs @@ -1,9 +1,7 @@ //! Implements builtins used to import paths in the store. use crate::builtins::errors::ImportError; -use futures::pin_mut; use std::path::Path; -use tvix_castore::import::leveled_entries_to_stream; use tvix_eval::{ builtin_macros::builtins, generators::{self, GenCo}, @@ -18,17 +16,15 @@ async fn filtered_ingest( path: &Path, filter: Option<&Value>, ) -> Result<tvix_castore::proto::node::Node, ErrorKind> { - // produce the leveled-key vector of DirEntry. - let mut entries_per_depths: Vec<Vec<walkdir::DirEntry>> = vec![Vec::new()]; + let mut entries: Vec<walkdir::DirEntry> = vec![]; let mut it = walkdir::WalkDir::new(path) .follow_links(false) .follow_root_links(false) .contents_first(false) - .sort_by_file_name() .into_iter(); // Skip root node. - entries_per_depths[0].push( + entries.push( it.next() .ok_or_else(|| ErrorKind::IO { path: Some(path.to_path_buf()), @@ -85,28 +81,14 @@ async fn filtered_ingest( continue; } - if entry.depth() >= entries_per_depths.len() { - debug_assert!( - entry.depth() == entries_per_depths.len(), - "Received unexpected entry with depth {} during descent, previously at {}", - entry.depth(), - entries_per_depths.len() - ); - - entries_per_depths.push(vec![entry]); - } else { - entries_per_depths[entry.depth()].push(entry); - } - - // FUTUREWORK: determine when it's the right moment to flush a level to the ingester. + entries.push(entry); } - let direntry_stream = leveled_entries_to_stream(entries_per_depths); - pin_mut!(direntry_stream); + let entries_iter = entries.into_iter().rev().map(Ok); state.tokio_handle.block_on(async { state - .ingest_entries(direntry_stream) + .ingest_dir_entries(entries_iter, path) .await .map_err(|err| ErrorKind::IO { path: Some(path.to_path_buf()), diff --git a/tvix/glue/src/tvix_store_io.rs b/tvix/glue/src/tvix_store_io.rs index 10a59027852f..46575743c462 100644 --- a/tvix/glue/src/tvix_store_io.rs +++ b/tvix/glue/src/tvix_store_io.rs @@ -2,13 +2,11 @@ use async_recursion::async_recursion; use bytes::Bytes; -use futures::Stream; use futures::{StreamExt, TryStreamExt}; use nix_compat::nixhash::NixHash; use nix_compat::store_path::{build_ca_path, StorePathRef}; use nix_compat::{nixhash::CAHash, store_path::StorePath}; use sha2::{Digest, Sha256}; -use std::marker::Unpin; use std::rc::Rc; use std::{ cell::RefCell, @@ -20,6 +18,7 @@ use std::{ use tokio_util::io::SyncIoBridge; use tracing::{error, instrument, warn, Level}; use tvix_build::buildservice::BuildService; +use tvix_castore::import::dir_entry_iter_to_ingestion_stream; use tvix_eval::{ErrorKind, EvalIO, FileType, StdIO}; use tvix_store::utils::AsyncIoBridge; use walkdir::DirEntry; @@ -278,17 +277,19 @@ impl TvixStoreIO { /// This forwards the ingestion to the [`tvix_castore::import::ingest_entries`], /// passing the blob_service and directory_service that's used. /// The error is mapped to std::io::Error for simplicity. - pub(crate) async fn ingest_entries<S>(&self, entries_stream: S) -> io::Result<Node> + pub(crate) async fn ingest_dir_entries<'a, I>( + &'a self, + iter: I, + root: &Path, + ) -> io::Result<Node> where - S: Stream<Item = DirEntry> + Unpin, + I: Iterator<Item = Result<DirEntry, walkdir::Error>> + Send + Sync + 'a, { - tvix_castore::import::ingest_entries( - &self.blob_service, - &self.directory_service, - entries_stream, - ) - .await - .map_err(|err| std::io::Error::new(io::ErrorKind::Other, err)) + let entries_stream = dir_entry_iter_to_ingestion_stream(&self.blob_service, iter, root); + + tvix_castore::import::ingest_entries(&self.directory_service, entries_stream) + .await + .map_err(|err| std::io::Error::new(io::ErrorKind::Other, err)) } pub(crate) async fn node_to_path_info( |