diff options
-rw-r--r-- | tvix/castore/src/import.rs | 118 |
1 files changed, 69 insertions, 49 deletions
diff --git a/tvix/castore/src/import.rs b/tvix/castore/src/import.rs index b0d2786e0412..0000a83fd05a 100644 --- a/tvix/castore/src/import.rs +++ b/tvix/castore/src/import.rs @@ -15,6 +15,7 @@ use std::{ path::{Path, PathBuf}, }; use tracing::instrument; +use walkdir::DirEntry; use walkdir::WalkDir; #[derive(Debug, thiserror::Error)] @@ -58,11 +59,9 @@ impl From<Error> for std::io::Error { /// For this to work, it relies on the caller to provide the directory object /// with the previously returned (child) nodes. /// -/// It assumes entries to be returned in "contents first" order, means this -/// will only be called with a directory if all children of it have been -/// visited. If the entry is indeed a directory, it'll also upload that -/// directory to the store. For this, the so-far-assembled Directory object for -/// this path needs to be passed in. +/// It assumes to be called only if all children of it have already been processed. If the entry is +/// indeed a directory, it'll also upload that directory to the store. For this, the +/// so-far-assembled Directory object for this path needs to be passed in. /// /// It assumes the caller adds returned nodes to the directories it assembles. #[instrument(skip_all, fields(entry.file_type=?&entry.file_type(),entry.path=?entry.path()))] @@ -168,14 +167,13 @@ where let mut directory_putter = directory_service.as_ref().put_multiple_start(); + let mut entries_per_depths: Vec<Vec<DirEntry>> = vec![Vec::new()]; for entry in WalkDir::new(p.as_ref()) .follow_links(false) .follow_root_links(false) - // We need to process a directory's children before processing - // the directory itself in order to have all the data needed - // to compute the hash. - .contents_first(true) + .contents_first(false) .sort_by_file_name() + .into_iter() { // Entry could be a NotFound, if the root path specified does not exist. let entry = entry.map_err(|e| { @@ -185,48 +183,70 @@ where ) })?; - // process_entry wants an Option<Directory> in case the entry points to a directory. - // make sure to provide it. - // If the directory has contents, we already have it in - // `directories` due to the use of contents_first on WalkDir. - let maybe_directory: Option<Directory> = { - if entry.file_type().is_dir() { - Some( - directories - .entry(entry.path().to_path_buf()) - .or_default() - .clone(), - ) - } else { - None - } - }; + if entry.depth() >= entries_per_depths.len() { + debug_assert!( + entry.depth() == entries_per_depths.len(), + "Received unexpected entry with depth {} during descent, previously at {}", + entry.depth(), + entries_per_depths.len() + ); - let node = process_entry( - blob_service.clone(), - &mut directory_putter, - &entry, - maybe_directory, - ) - .await?; - - if entry.depth() == 0 { - // Make sure all the directories are flushed. - if entry.file_type().is_dir() { - directory_putter.close().await?; - } - return Ok(node); + entries_per_depths.push(vec![entry]); } else { - // calculate the parent path, and make sure we register the node there. - // NOTE: entry.depth() > 0 - let parent_path = entry.path().parent().unwrap().to_path_buf(); - - // record node in parent directory, creating a new [proto:Directory] if not there yet. - let parent_directory = directories.entry(parent_path).or_default(); - match node { - Node::Directory(e) => parent_directory.directories.push(e), - Node::File(e) => parent_directory.files.push(e), - Node::Symlink(e) => parent_directory.symlinks.push(e), + entries_per_depths[entry.depth()].push(entry); + } + } + + debug_assert!(!entries_per_depths[0].is_empty(), "No root node available!"); + + // We need to process a directory's children before processing + // the directory itself in order to have all the data needed + // to compute the hash. + for level in entries_per_depths.into_iter().rev() { + for entry in level.into_iter() { + // FUTUREWORK: inline `process_entry` + let node = process_entry( + blob_service.clone(), + &mut directory_putter, + &entry, + // process_entry wants an Option<Directory> in case the entry points to a directory. + // make sure to provide it. + // If the directory has contents, we already have it in + // `directories` because we iterate over depth in reverse order (deepest to + // shallowest). + if entry.file_type().is_dir() { + Some( + directories + .remove(entry.path()) + // In that case, it contained no children + .unwrap_or_default(), + ) + } else { + None + }, + ) + .await?; + + if entry.depth() == 0 { + // Make sure all the directories are flushed. + // FUTUREWORK: `debug_assert!` the resulting Ok(b3_digest) to be equal + // to `directories.get(entry.path())`. + if entry.file_type().is_dir() { + directory_putter.close().await?; + } + return Ok(node); + } else { + // calculate the parent path, and make sure we register the node there. + // NOTE: entry.depth() > 0 + let parent_path = entry.path().parent().unwrap().to_path_buf(); + + // record node in parent directory, creating a new [proto:Directory] if not there yet. + let parent_directory = directories.entry(parent_path).or_default(); + match node { + Node::Directory(e) => parent_directory.directories.push(e), + Node::File(e) => parent_directory.files.push(e), + Node::Symlink(e) => parent_directory.symlinks.push(e), + } } } } |