From 1f1a42b4da34bb2ad0cd75d6c822e2d24a19c0a2 Mon Sep 17 00:00:00 2001 From: Ryan Lahfa Date: Sat, 6 Jan 2024 00:41:40 +0100 Subject: feat(tvix/castore): ingestion does DFS and invert it To make use of the filtering feature, we need to revert the internal walker to a real DFS. We will therefore just invert the whole tree by storing all of its contents in a level-keyed vector. This is horribly expensive in memory, this is a compromise between CPU and memory, here is the fundamental reason for why: When you encounter a directory, it's either a leaf or not, i.e. it contains subdirectories or not. To know this fact, you can: - wait until you notice subdirectories under it, i.e. you need to store any intermediate nodes you see in the meantime -> memory penalty. - getdents or readdir on it to determine *NOW* its subdirectories -> CPU penalty and I/O penalty. This is an implementation of the first proposal, we pay memory. In practice, we are paying O(#nb of nodes) in memory. There's a smarter albeit much more complicated algorithm that pays only O(\sum_i #siblings(p_i)) nodes where (p_1, ..., p_n) is the path to a leaf. which means for: A / \ B C / / \ D E F We would never store D, E, F but only E, F at a given time. But we would still store B, C no matter what. Change-Id: I456ed1c3f0db493e018ba1182665d84bebe29c11 Reviewed-on: https://cl.tvl.fyi/c/depot/+/10567 Tested-by: BuildkiteCI Autosubmit: raitobezarius Reviewed-by: flokli --- tvix/castore/src/import.rs | 118 ++++++++++++++++++++++++++------------------- 1 file changed, 69 insertions(+), 49 deletions(-) (limited to 'tvix/castore/src/import.rs') diff --git a/tvix/castore/src/import.rs b/tvix/castore/src/import.rs index b0d2786e04..0000a83fd0 100644 --- a/tvix/castore/src/import.rs +++ b/tvix/castore/src/import.rs @@ -15,6 +15,7 @@ use std::{ path::{Path, PathBuf}, }; use tracing::instrument; +use walkdir::DirEntry; use walkdir::WalkDir; #[derive(Debug, thiserror::Error)] @@ -58,11 +59,9 @@ impl From for std::io::Error { /// For this to work, it relies on the caller to provide the directory object /// with the previously returned (child) nodes. /// -/// It assumes entries to be returned in "contents first" order, means this -/// will only be called with a directory if all children of it have been -/// visited. If the entry is indeed a directory, it'll also upload that -/// directory to the store. For this, the so-far-assembled Directory object for -/// this path needs to be passed in. +/// It assumes to be called only if all children of it have already been processed. If the entry is +/// indeed a directory, it'll also upload that directory to the store. For this, the +/// so-far-assembled Directory object for this path needs to be passed in. /// /// It assumes the caller adds returned nodes to the directories it assembles. #[instrument(skip_all, fields(entry.file_type=?&entry.file_type(),entry.path=?entry.path()))] @@ -168,14 +167,13 @@ where let mut directory_putter = directory_service.as_ref().put_multiple_start(); + let mut entries_per_depths: Vec> = vec![Vec::new()]; for entry in WalkDir::new(p.as_ref()) .follow_links(false) .follow_root_links(false) - // We need to process a directory's children before processing - // the directory itself in order to have all the data needed - // to compute the hash. - .contents_first(true) + .contents_first(false) .sort_by_file_name() + .into_iter() { // Entry could be a NotFound, if the root path specified does not exist. let entry = entry.map_err(|e| { @@ -185,48 +183,70 @@ where ) })?; - // process_entry wants an Option in case the entry points to a directory. - // make sure to provide it. - // If the directory has contents, we already have it in - // `directories` due to the use of contents_first on WalkDir. - let maybe_directory: Option = { - if entry.file_type().is_dir() { - Some( - directories - .entry(entry.path().to_path_buf()) - .or_default() - .clone(), - ) - } else { - None - } - }; + if entry.depth() >= entries_per_depths.len() { + debug_assert!( + entry.depth() == entries_per_depths.len(), + "Received unexpected entry with depth {} during descent, previously at {}", + entry.depth(), + entries_per_depths.len() + ); - let node = process_entry( - blob_service.clone(), - &mut directory_putter, - &entry, - maybe_directory, - ) - .await?; - - if entry.depth() == 0 { - // Make sure all the directories are flushed. - if entry.file_type().is_dir() { - directory_putter.close().await?; - } - return Ok(node); + entries_per_depths.push(vec![entry]); } else { - // calculate the parent path, and make sure we register the node there. - // NOTE: entry.depth() > 0 - let parent_path = entry.path().parent().unwrap().to_path_buf(); - - // record node in parent directory, creating a new [proto:Directory] if not there yet. - let parent_directory = directories.entry(parent_path).or_default(); - match node { - Node::Directory(e) => parent_directory.directories.push(e), - Node::File(e) => parent_directory.files.push(e), - Node::Symlink(e) => parent_directory.symlinks.push(e), + entries_per_depths[entry.depth()].push(entry); + } + } + + debug_assert!(!entries_per_depths[0].is_empty(), "No root node available!"); + + // We need to process a directory's children before processing + // the directory itself in order to have all the data needed + // to compute the hash. + for level in entries_per_depths.into_iter().rev() { + for entry in level.into_iter() { + // FUTUREWORK: inline `process_entry` + let node = process_entry( + blob_service.clone(), + &mut directory_putter, + &entry, + // process_entry wants an Option in case the entry points to a directory. + // make sure to provide it. + // If the directory has contents, we already have it in + // `directories` because we iterate over depth in reverse order (deepest to + // shallowest). + if entry.file_type().is_dir() { + Some( + directories + .remove(entry.path()) + // In that case, it contained no children + .unwrap_or_default(), + ) + } else { + None + }, + ) + .await?; + + if entry.depth() == 0 { + // Make sure all the directories are flushed. + // FUTUREWORK: `debug_assert!` the resulting Ok(b3_digest) to be equal + // to `directories.get(entry.path())`. + if entry.file_type().is_dir() { + directory_putter.close().await?; + } + return Ok(node); + } else { + // calculate the parent path, and make sure we register the node there. + // NOTE: entry.depth() > 0 + let parent_path = entry.path().parent().unwrap().to_path_buf(); + + // record node in parent directory, creating a new [proto:Directory] if not there yet. + let parent_directory = directories.entry(parent_path).or_default(); + match node { + Node::Directory(e) => parent_directory.directories.push(e), + Node::File(e) => parent_directory.files.push(e), + Node::Symlink(e) => parent_directory.symlinks.push(e), + } } } } -- cgit 1.4.1