about summary refs log tree commit diff
diff options
context:
space:
mode:
authorRyan Lahfa <tvl@lahfa.xyz>2024-01-08T08·46+0100
committerclbot <clbot@tvl.fyi>2024-01-20T18·26+0000
commit7275288f0e0a0f29c3f023a7d8a4c38157fb637b (patch)
treebb678826277acf91d8538987fa6fc93badfaa325
parent1f1a42b4da34bb2ad0cd75d6c822e2d24a19c0a2 (diff)
refactor(tvix/castore): break down `ingest_path` r/7431
In one function that does the heavy lifting: `ingest_entries`, and three additional helpers:

- `walk_path_for_ingestion` which perform the tree walking in a very naive way and can be replaced by the user
- `leveled_entries_to_stream` which transforms a list of a list of
  entries ordered by their depth in the tree to a stream of entries in
  the bottom to top order (Merkle-compatible order I will say in the
  future).
- `ingest_path` which calls the previous functions.

Change-Id: I724b972d3c5bffc033f03363255eae448f017cef
Reviewed-on: https://cl.tvl.fyi/c/depot/+/10573
Tested-by: BuildkiteCI
Reviewed-by: flokli <flokli@flokli.de>
Autosubmit: raitobezarius <tvl@lahfa.xyz>
-rw-r--r--tvix/castore/src/import.rs257
1 files changed, 187 insertions, 70 deletions
diff --git a/tvix/castore/src/import.rs b/tvix/castore/src/import.rs
index 0000a83fd05a..1c99df480d14 100644
--- a/tvix/castore/src/import.rs
+++ b/tvix/castore/src/import.rs
@@ -7,6 +7,9 @@ use crate::proto::DirectoryNode;
 use crate::proto::FileNode;
 use crate::proto::SymlinkNode;
 use crate::Error as CastoreError;
+use async_stream::stream;
+use futures::pin_mut;
+use futures::Stream;
 use std::os::unix::ffi::OsStrExt;
 use std::{
     collections::HashMap,
@@ -14,10 +17,14 @@ use std::{
     os::unix::prelude::PermissionsExt,
     path::{Path, PathBuf},
 };
+use tokio_stream::StreamExt;
 use tracing::instrument;
 use walkdir::DirEntry;
 use walkdir::WalkDir;
 
+#[cfg(debug_assertions)]
+use std::collections::HashSet;
+
 #[derive(Debug, thiserror::Error)]
 pub enum Error {
     #[error("failed to upload directory at {0}: {1}")]
@@ -141,34 +148,24 @@ where
     todo!("handle other types")
 }
 
-/// Ingests the contents at the given path into the tvix store,
-/// interacting with a [BlobService] and [DirectoryService].
-/// It returns the root node or an error.
+/// Walk the filesystem at a given path and returns a level-keyed list of directory entries.
 ///
-/// It does not follow symlinks at the root, they will be ingested as actual
-/// symlinks.
+/// This is how [`ingest_path`] assembles the set of entries to pass on [`ingest_entries`].
+/// This low-level function can be used if additional filtering or processing is required on the
+/// entries.
 ///
-/// It's not interacting with a PathInfoService (from tvix-store), or anything
-/// else giving it a "non-content-addressed name".
-/// It's up to the caller to possibly register it somewhere (and potentially
-/// rename it based on some naming scheme)
-#[instrument(skip(blob_service, directory_service), fields(path=?p), err)]
-pub async fn ingest_path<'a, BS, DS, P>(
-    blob_service: BS,
-    directory_service: DS,
-    p: P,
-) -> Result<Node, Error>
+/// Level here is in the context of graph theory, e.g. 2-level nodes
+/// are nodes that are at depth 2.
+///
+/// This function will walk the filesystem using `walkdir` and will consume
+/// `O(#number of entries)` space.
+#[instrument(fields(path), err)]
+pub fn walk_path_for_ingestion<P>(path: P) -> Result<Vec<Vec<DirEntry>>, Error>
 where
-    P: AsRef<Path> + Debug,
-    BS: AsRef<dyn BlobService> + Clone,
-    DS: AsRef<dyn DirectoryService>,
+    P: AsRef<Path> + std::fmt::Debug,
 {
-    let mut directories: HashMap<PathBuf, Directory> = HashMap::default();
-
-    let mut directory_putter = directory_service.as_ref().put_multiple_start();
-
     let mut entries_per_depths: Vec<Vec<DirEntry>> = vec![Vec::new()];
-    for entry in WalkDir::new(p.as_ref())
+    for entry in WalkDir::new(path.as_ref())
         .follow_links(false)
         .follow_root_links(false)
         .contents_first(false)
@@ -178,7 +175,7 @@ where
         // Entry could be a NotFound, if the root path specified does not exist.
         let entry = entry.map_err(|e| {
             Error::UnableToOpen(
-                PathBuf::from(p.as_ref()),
+                PathBuf::from(path.as_ref()),
                 e.into_io_error().expect("walkdir err must be some"),
             )
         })?;
@@ -197,59 +194,179 @@ where
         }
     }
 
-    debug_assert!(!entries_per_depths[0].is_empty(), "No root node available!");
+    Ok(entries_per_depths)
+}
+
+/// Convert a leveled-key vector of filesystem entries into a stream of
+/// [DirEntry] in a way that honors the Merkle invariant, i.e. from bottom to top.
+pub fn leveled_entries_to_stream(
+    entries_per_depths: Vec<Vec<DirEntry>>,
+) -> impl Stream<Item = DirEntry> {
+    stream! {
+        for level in entries_per_depths.into_iter().rev() {
+            for entry in level.into_iter() {
+                yield entry;
+            }
+        }
+    }
+}
+
+/// Ingests the contents at a given path into the tvix store, interacting with a [BlobService] and
+/// [DirectoryService]. It returns the root node or an error.
+///
+/// It does not follow symlinks at the root, they will be ingested as actual symlinks.
+#[instrument(skip(blob_service, directory_service), fields(path), err)]
+pub async fn ingest_path<'a, BS, DS, P>(
+    blob_service: BS,
+    directory_service: DS,
+    path: P,
+) -> Result<Node, Error>
+where
+    P: AsRef<Path> + std::fmt::Debug,
+    BS: AsRef<dyn BlobService> + Clone,
+    DS: AsRef<dyn DirectoryService>,
+{
+    let entries = walk_path_for_ingestion(path)?;
+    let entries_stream = leveled_entries_to_stream(entries);
+    pin_mut!(entries_stream);
+
+    ingest_entries(blob_service, directory_service, entries_stream).await
+}
+
+/// The Merkle invariant checker is an internal structure to perform bookkeeping of all directory
+/// entries we are ingesting and verifying we are ingesting them in the right order.
+///
+/// That is, whenever we process an entry `L`, we would like to verify if we didn't process earlier
+/// an entry `P` such that `P` is an **ancestor** of `L`.
+///
+/// If such a thing happened, it means that we have processed something like:
+///
+///```no_trust
+///        A
+///       / \
+///      B   C
+///     / \   \
+///    G  F    P <--------- processed before this one
+///           / \                                  |
+///          D  E                                  |
+///              \                                 |
+///               L  <-----------------------------+
+/// ```
+///
+/// This is exactly what must never happen.
+///
+/// Note: this checker is local, it can only see what happens on our side, not on the remote side,
+/// i.e. the different remote services.
+#[derive(Default)]
+#[cfg(debug_assertions)]
+struct MerkleInvariantChecker {
+    seen: HashSet<PathBuf>,
+}
+
+#[cfg(debug_assertions)]
+impl MerkleInvariantChecker {
+    /// See a directory entry and remember it.
+    fn see(&mut self, node: &DirEntry) {
+        self.seen.insert(node.path().to_owned());
+    }
+
+    /// Returns a potential ancestor already seen for that directory entry.
+    fn find_ancestor(&self, node: &DirEntry) -> Option<PathBuf> {
+        for anc in node.path().ancestors() {
+            if self.seen.contains(anc) {
+                return Some(anc.to_owned());
+            }
+        }
+
+        None
+    }
+}
+
+/// Ingests elements from the given stream of [`DirEntry`] into a the passed [`BlobService`] and
+/// [`DirectoryService`].
+/// It does not follow symlinks at the root, they will be ingested as actual symlinks.
+#[instrument(skip_all, ret, err)]
+pub async fn ingest_entries<'a, BS, DS, S>(
+    blob_service: BS,
+    directory_service: DS,
+    mut entries_async_iterator: S,
+) -> Result<Node, Error>
+where
+    BS: AsRef<dyn BlobService> + Clone,
+    DS: AsRef<dyn DirectoryService>,
+    S: Stream<Item = DirEntry> + std::marker::Unpin,
+{
+    let mut directories: HashMap<PathBuf, Directory> = HashMap::default();
+
+    let mut directory_putter = directory_service.as_ref().put_multiple_start();
+
+    #[cfg(debug_assertions)]
+    let mut invariant_checker: MerkleInvariantChecker = Default::default();
 
     // We need to process a directory's children before processing
     // the directory itself in order to have all the data needed
     // to compute the hash.
-    for level in entries_per_depths.into_iter().rev() {
-        for entry in level.into_iter() {
-            // FUTUREWORK: inline `process_entry`
-            let node = process_entry(
-                blob_service.clone(),
-                &mut directory_putter,
-                &entry,
-                // process_entry wants an Option<Directory> in case the entry points to a directory.
-                // make sure to provide it.
-                // If the directory has contents, we already have it in
-                // `directories` because we iterate over depth in reverse order (deepest to
-                // shallowest).
-                if entry.file_type().is_dir() {
-                    Some(
-                        directories
-                            .remove(entry.path())
-                            // In that case, it contained no children
-                            .unwrap_or_default(),
-                    )
-                } else {
-                    None
-                },
-            )
-            .await?;
-
-            if entry.depth() == 0 {
-                // Make sure all the directories are flushed.
-                // FUTUREWORK: `debug_assert!` the resulting Ok(b3_digest) to be equal
-                // to `directories.get(entry.path())`.
-                if entry.file_type().is_dir() {
-                    directory_putter.close().await?;
-                }
-                return Ok(node);
+    while let Some(entry) = entries_async_iterator.next().await {
+        #[cfg(debug_assertions)]
+        {
+            // If we find an ancestor before we see this entry, this means that the caller
+            // broke the contract, refer to the documentation of the invariant checker to
+            // understand the reasoning here.
+            if let Some(ancestor) = invariant_checker.find_ancestor(&entry) {
+                panic!("Tvix bug: merkle invariant checker discovered that {} was processed before {}!",
+                    ancestor.display(),
+                    entry.path().display()
+                );
+            }
+
+            invariant_checker.see(&entry);
+        }
+
+        // FUTUREWORK: handle directory putting here, rather than piping it through process_entry.
+        let node = process_entry(
+            blob_service.clone(),
+            &mut directory_putter,
+            &entry,
+            // process_entry wants an Option<Directory> in case the entry points to a directory.
+            // make sure to provide it.
+            // If the directory has contents, we already have it in
+            // `directories` because we iterate over depth in reverse order (deepest to
+            // shallowest).
+            if entry.file_type().is_dir() {
+                Some(
+                    directories
+                        .remove(entry.path())
+                        // In that case, it contained no children
+                        .unwrap_or_default(),
+                )
             } else {
-                // calculate the parent path, and make sure we register the node there.
-                // NOTE: entry.depth() > 0
-                let parent_path = entry.path().parent().unwrap().to_path_buf();
-
-                // record node in parent directory, creating a new [proto:Directory] if not there yet.
-                let parent_directory = directories.entry(parent_path).or_default();
-                match node {
-                    Node::Directory(e) => parent_directory.directories.push(e),
-                    Node::File(e) => parent_directory.files.push(e),
-                    Node::Symlink(e) => parent_directory.symlinks.push(e),
-                }
+                None
+            },
+        )
+        .await?;
+
+        if entry.depth() == 0 {
+            // Make sure all the directories are flushed.
+            // FUTUREWORK: `debug_assert!` the resulting Ok(b3_digest) to be equal
+            // to `directories.get(entry.path())`.
+            if entry.file_type().is_dir() {
+                directory_putter.close().await?;
+            }
+            return Ok(node);
+        } else {
+            // calculate the parent path, and make sure we register the node there.
+            // NOTE: entry.depth() > 0
+            let parent_path = entry.path().parent().unwrap().to_path_buf();
+
+            // record node in parent directory, creating a new [proto:Directory] if not there yet.
+            let parent_directory = directories.entry(parent_path).or_default();
+            match node {
+                Node::Directory(e) => parent_directory.directories.push(e),
+                Node::File(e) => parent_directory.files.push(e),
+                Node::Symlink(e) => parent_directory.symlinks.push(e),
             }
         }
     }
     // unreachable, we already bailed out before if root doesn't exist.
-    unreachable!()
+    unreachable!("Tvix bug: no root node emitted during ingestion")
 }