From 7275288f0e0a0f29c3f023a7d8a4c38157fb637b Mon Sep 17 00:00:00 2001 From: Ryan Lahfa Date: Mon, 8 Jan 2024 09:46:52 +0100 Subject: refactor(tvix/castore): break down `ingest_path` In one function that does the heavy lifting: `ingest_entries`, and three additional helpers: - `walk_path_for_ingestion` which perform the tree walking in a very naive way and can be replaced by the user - `leveled_entries_to_stream` which transforms a list of a list of entries ordered by their depth in the tree to a stream of entries in the bottom to top order (Merkle-compatible order I will say in the future). - `ingest_path` which calls the previous functions. Change-Id: I724b972d3c5bffc033f03363255eae448f017cef Reviewed-on: https://cl.tvl.fyi/c/depot/+/10573 Tested-by: BuildkiteCI Reviewed-by: flokli Autosubmit: raitobezarius --- tvix/castore/src/import.rs | 257 +++++++++++++++++++++++++++++++++------------ 1 file changed, 187 insertions(+), 70 deletions(-) diff --git a/tvix/castore/src/import.rs b/tvix/castore/src/import.rs index 0000a83fd05a..1c99df480d14 100644 --- a/tvix/castore/src/import.rs +++ b/tvix/castore/src/import.rs @@ -7,6 +7,9 @@ use crate::proto::DirectoryNode; use crate::proto::FileNode; use crate::proto::SymlinkNode; use crate::Error as CastoreError; +use async_stream::stream; +use futures::pin_mut; +use futures::Stream; use std::os::unix::ffi::OsStrExt; use std::{ collections::HashMap, @@ -14,10 +17,14 @@ use std::{ os::unix::prelude::PermissionsExt, path::{Path, PathBuf}, }; +use tokio_stream::StreamExt; use tracing::instrument; use walkdir::DirEntry; use walkdir::WalkDir; +#[cfg(debug_assertions)] +use std::collections::HashSet; + #[derive(Debug, thiserror::Error)] pub enum Error { #[error("failed to upload directory at {0}: {1}")] @@ -141,34 +148,24 @@ where todo!("handle other types") } -/// Ingests the contents at the given path into the tvix store, -/// interacting with a [BlobService] and [DirectoryService]. -/// It returns the root node or an error. +/// Walk the filesystem at a given path and returns a level-keyed list of directory entries. /// -/// It does not follow symlinks at the root, they will be ingested as actual -/// symlinks. +/// This is how [`ingest_path`] assembles the set of entries to pass on [`ingest_entries`]. +/// This low-level function can be used if additional filtering or processing is required on the +/// entries. /// -/// It's not interacting with a PathInfoService (from tvix-store), or anything -/// else giving it a "non-content-addressed name". -/// It's up to the caller to possibly register it somewhere (and potentially -/// rename it based on some naming scheme) -#[instrument(skip(blob_service, directory_service), fields(path=?p), err)] -pub async fn ingest_path<'a, BS, DS, P>( - blob_service: BS, - directory_service: DS, - p: P, -) -> Result +/// Level here is in the context of graph theory, e.g. 2-level nodes +/// are nodes that are at depth 2. +/// +/// This function will walk the filesystem using `walkdir` and will consume +/// `O(#number of entries)` space. +#[instrument(fields(path), err)] +pub fn walk_path_for_ingestion

(path: P) -> Result>, Error> where - P: AsRef + Debug, - BS: AsRef + Clone, - DS: AsRef, + P: AsRef + std::fmt::Debug, { - let mut directories: HashMap = HashMap::default(); - - let mut directory_putter = directory_service.as_ref().put_multiple_start(); - let mut entries_per_depths: Vec> = vec![Vec::new()]; - for entry in WalkDir::new(p.as_ref()) + for entry in WalkDir::new(path.as_ref()) .follow_links(false) .follow_root_links(false) .contents_first(false) @@ -178,7 +175,7 @@ where // Entry could be a NotFound, if the root path specified does not exist. let entry = entry.map_err(|e| { Error::UnableToOpen( - PathBuf::from(p.as_ref()), + PathBuf::from(path.as_ref()), e.into_io_error().expect("walkdir err must be some"), ) })?; @@ -197,59 +194,179 @@ where } } - debug_assert!(!entries_per_depths[0].is_empty(), "No root node available!"); + Ok(entries_per_depths) +} + +/// Convert a leveled-key vector of filesystem entries into a stream of +/// [DirEntry] in a way that honors the Merkle invariant, i.e. from bottom to top. +pub fn leveled_entries_to_stream( + entries_per_depths: Vec>, +) -> impl Stream { + stream! { + for level in entries_per_depths.into_iter().rev() { + for entry in level.into_iter() { + yield entry; + } + } + } +} + +/// Ingests the contents at a given path into the tvix store, interacting with a [BlobService] and +/// [DirectoryService]. It returns the root node or an error. +/// +/// It does not follow symlinks at the root, they will be ingested as actual symlinks. +#[instrument(skip(blob_service, directory_service), fields(path), err)] +pub async fn ingest_path<'a, BS, DS, P>( + blob_service: BS, + directory_service: DS, + path: P, +) -> Result +where + P: AsRef + std::fmt::Debug, + BS: AsRef + Clone, + DS: AsRef, +{ + let entries = walk_path_for_ingestion(path)?; + let entries_stream = leveled_entries_to_stream(entries); + pin_mut!(entries_stream); + + ingest_entries(blob_service, directory_service, entries_stream).await +} + +/// The Merkle invariant checker is an internal structure to perform bookkeeping of all directory +/// entries we are ingesting and verifying we are ingesting them in the right order. +/// +/// That is, whenever we process an entry `L`, we would like to verify if we didn't process earlier +/// an entry `P` such that `P` is an **ancestor** of `L`. +/// +/// If such a thing happened, it means that we have processed something like: +/// +///```no_trust +/// A +/// / \ +/// B C +/// / \ \ +/// G F P <--------- processed before this one +/// / \ | +/// D E | +/// \ | +/// L <-----------------------------+ +/// ``` +/// +/// This is exactly what must never happen. +/// +/// Note: this checker is local, it can only see what happens on our side, not on the remote side, +/// i.e. the different remote services. +#[derive(Default)] +#[cfg(debug_assertions)] +struct MerkleInvariantChecker { + seen: HashSet, +} + +#[cfg(debug_assertions)] +impl MerkleInvariantChecker { + /// See a directory entry and remember it. + fn see(&mut self, node: &DirEntry) { + self.seen.insert(node.path().to_owned()); + } + + /// Returns a potential ancestor already seen for that directory entry. + fn find_ancestor(&self, node: &DirEntry) -> Option { + for anc in node.path().ancestors() { + if self.seen.contains(anc) { + return Some(anc.to_owned()); + } + } + + None + } +} + +/// Ingests elements from the given stream of [`DirEntry`] into a the passed [`BlobService`] and +/// [`DirectoryService`]. +/// It does not follow symlinks at the root, they will be ingested as actual symlinks. +#[instrument(skip_all, ret, err)] +pub async fn ingest_entries<'a, BS, DS, S>( + blob_service: BS, + directory_service: DS, + mut entries_async_iterator: S, +) -> Result +where + BS: AsRef + Clone, + DS: AsRef, + S: Stream + std::marker::Unpin, +{ + let mut directories: HashMap = HashMap::default(); + + let mut directory_putter = directory_service.as_ref().put_multiple_start(); + + #[cfg(debug_assertions)] + let mut invariant_checker: MerkleInvariantChecker = Default::default(); // We need to process a directory's children before processing // the directory itself in order to have all the data needed // to compute the hash. - for level in entries_per_depths.into_iter().rev() { - for entry in level.into_iter() { - // FUTUREWORK: inline `process_entry` - let node = process_entry( - blob_service.clone(), - &mut directory_putter, - &entry, - // process_entry wants an Option in case the entry points to a directory. - // make sure to provide it. - // If the directory has contents, we already have it in - // `directories` because we iterate over depth in reverse order (deepest to - // shallowest). - if entry.file_type().is_dir() { - Some( - directories - .remove(entry.path()) - // In that case, it contained no children - .unwrap_or_default(), - ) - } else { - None - }, - ) - .await?; - - if entry.depth() == 0 { - // Make sure all the directories are flushed. - // FUTUREWORK: `debug_assert!` the resulting Ok(b3_digest) to be equal - // to `directories.get(entry.path())`. - if entry.file_type().is_dir() { - directory_putter.close().await?; - } - return Ok(node); + while let Some(entry) = entries_async_iterator.next().await { + #[cfg(debug_assertions)] + { + // If we find an ancestor before we see this entry, this means that the caller + // broke the contract, refer to the documentation of the invariant checker to + // understand the reasoning here. + if let Some(ancestor) = invariant_checker.find_ancestor(&entry) { + panic!("Tvix bug: merkle invariant checker discovered that {} was processed before {}!", + ancestor.display(), + entry.path().display() + ); + } + + invariant_checker.see(&entry); + } + + // FUTUREWORK: handle directory putting here, rather than piping it through process_entry. + let node = process_entry( + blob_service.clone(), + &mut directory_putter, + &entry, + // process_entry wants an Option in case the entry points to a directory. + // make sure to provide it. + // If the directory has contents, we already have it in + // `directories` because we iterate over depth in reverse order (deepest to + // shallowest). + if entry.file_type().is_dir() { + Some( + directories + .remove(entry.path()) + // In that case, it contained no children + .unwrap_or_default(), + ) } else { - // calculate the parent path, and make sure we register the node there. - // NOTE: entry.depth() > 0 - let parent_path = entry.path().parent().unwrap().to_path_buf(); - - // record node in parent directory, creating a new [proto:Directory] if not there yet. - let parent_directory = directories.entry(parent_path).or_default(); - match node { - Node::Directory(e) => parent_directory.directories.push(e), - Node::File(e) => parent_directory.files.push(e), - Node::Symlink(e) => parent_directory.symlinks.push(e), - } + None + }, + ) + .await?; + + if entry.depth() == 0 { + // Make sure all the directories are flushed. + // FUTUREWORK: `debug_assert!` the resulting Ok(b3_digest) to be equal + // to `directories.get(entry.path())`. + if entry.file_type().is_dir() { + directory_putter.close().await?; + } + return Ok(node); + } else { + // calculate the parent path, and make sure we register the node there. + // NOTE: entry.depth() > 0 + let parent_path = entry.path().parent().unwrap().to_path_buf(); + + // record node in parent directory, creating a new [proto:Directory] if not there yet. + let parent_directory = directories.entry(parent_path).or_default(); + match node { + Node::Directory(e) => parent_directory.directories.push(e), + Node::File(e) => parent_directory.files.push(e), + Node::Symlink(e) => parent_directory.symlinks.push(e), } } } // unreachable, we already bailed out before if root doesn't exist. - unreachable!() + unreachable!("Tvix bug: no root node emitted during ingestion") } -- cgit 1.4.1