about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--tvix/Cargo.lock1
-rw-r--r--tvix/Cargo.nix5
-rw-r--r--tvix/castore/Cargo.toml1
-rw-r--r--tvix/castore/src/directoryservice/closure_validator.rs192
4 files changed, 109 insertions, 90 deletions
diff --git a/tvix/Cargo.lock b/tvix/Cargo.lock
index a15c71d26d..9666780c2b 100644
--- a/tvix/Cargo.lock
+++ b/tvix/Cargo.lock
@@ -4352,6 +4352,7 @@ dependencies = [
  "libc",
  "object_store",
  "parking_lot 0.12.1",
+ "petgraph",
  "pin-project-lite",
  "prost 0.12.3",
  "prost-build",
diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix
index 48b53a1943..1145b82544 100644
--- a/tvix/Cargo.nix
+++ b/tvix/Cargo.nix
@@ -7768,6 +7768,7 @@ rec {
           "serde_derive" = [ "dep:serde_derive" ];
           "unstable" = [ "generate" ];
         };
+        resolvedDefaultFeatures = [ "default" "graphmap" "matrix_graph" "stable_graph" ];
       };
       "pin-project" = rec {
         crateName = "pin-project";
@@ -13815,6 +13816,10 @@ rec {
             packageId = "parking_lot 0.12.1";
           }
           {
+            name = "petgraph";
+            packageId = "petgraph";
+          }
+          {
             name = "pin-project-lite";
             packageId = "pin-project-lite";
           }
diff --git a/tvix/castore/Cargo.toml b/tvix/castore/Cargo.toml
index 447ddea1dc..b68922b7ce 100644
--- a/tvix/castore/Cargo.toml
+++ b/tvix/castore/Cargo.toml
@@ -32,6 +32,7 @@ zstd = "0.13.0"
 serde = { version = "1.0.197", features = [ "derive" ] }
 serde_with = "3.7.0"
 serde_qs = "0.12.0"
+petgraph = "0.6.4"
 
 [dependencies.bigtable_rs]
 optional = true
diff --git a/tvix/castore/src/directoryservice/closure_validator.rs b/tvix/castore/src/directoryservice/closure_validator.rs
index 6f263157a9..461fc907bd 100644
--- a/tvix/castore/src/directoryservice/closure_validator.rs
+++ b/tvix/castore/src/directoryservice/closure_validator.rs
@@ -2,7 +2,11 @@ use std::collections::{HashMap, HashSet};
 
 use bstr::ByteSlice;
 
-use tracing::{instrument, trace};
+use petgraph::{
+    graph::{DiGraph, NodeIndex},
+    visit::Bfs,
+};
+use tracing::instrument;
 
 use crate::{
     proto::{self, Directory},
@@ -13,14 +17,16 @@ use crate::{
 /// Directories), and their insertion order.
 ///
 /// Directories need to be inserted (via `add`), in an order from the leaves to
-/// the root. During insertion, We validate as much as we can at that time:
+/// the root (DFS Post-Order).
+/// During insertion, We validate as much as we can at that time:
 ///
 ///  - individual validation of Directory messages
 ///  - validation of insertion order (no upload of not-yet-known Directories)
 ///  - validation of size fields of referred Directories
 ///
-/// Internally it keeps all received Directories (and their sizes) in a HashMap,
-/// keyed by digest.
+/// Internally it keeps all received Directories in a directed graph,
+/// with node weights being the Directories and edges pointing to child
+/// directories.
 ///
 /// Once all Directories have been inserted, a finalize function can be
 /// called to get a (deduplicated and) validated list of directories, in
@@ -29,23 +35,29 @@ use crate::{
 /// there's no disconnected components, and only one root.
 #[derive(Default)]
 pub struct ClosureValidator {
-    directories_and_sizes: HashMap<B3Digest, (Directory, u64)>,
+    // A directed graph, using Directory as node weight, without edge weights.
+    // Edges point from parents to children.
+    graph: DiGraph<Directory, ()>,
+
+    // A lookup table from directory digest to node index.
+    digest_to_node_ix: HashMap<B3Digest, NodeIndex>,
 
-    /// Keeps track of the last-inserted directory digest. Used to start the
-    /// connectivity check.
-    last_directory_digest: Option<B3Digest>,
+    /// Keeps track of the last-inserted directory graph node index.
+    /// On a correct insert, this will be the root node, from which the DFS post
+    /// order traversal will start from.
+    last_directory_ix: Option<NodeIndex>,
 }
 
 impl ClosureValidator {
     /// Insert a new Directory into the closure.
     /// Perform individual Directory validation, validation of insertion order
     /// and size fields.
-    #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)]
+    #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest(), directory.size=%directory.size()), err)]
     pub fn add(&mut self, directory: proto::Directory) -> Result<(), Error> {
         let digest = directory.digest();
 
-        if self.directories_and_sizes.contains_key(&digest) {
-            trace!("already seen, skipping");
+        // If we already saw this node previously, it's already validated and in the graph.
+        if self.digest_to_node_ix.contains_key(&digest) {
             return Ok(());
         }
 
@@ -55,35 +67,51 @@ impl ClosureValidator {
             .map_err(|e| Error::InvalidRequest(e.to_string()))?;
 
         // Ensure the directory only refers to directories which we already accepted.
+        // We lookup their node indices and add them to a HashSet.
+        let mut child_ixs = HashSet::new();
         for dir in &directory.directories {
-            let dir_dgst = B3Digest::try_from(dir.digest.to_owned()).unwrap(); // validated
+            let child_digest = B3Digest::try_from(dir.digest.to_owned()).unwrap(); // validated
 
             // Ensure the digest has already been seen
-            let (_, recorded_dir_size) =
-                self.directories_and_sizes.get(&dir_dgst).ok_or_else(|| {
-                    Error::InvalidRequest(format!(
-                        "'{}' refers to unseen child dir: {}",
-                        dir.name.as_bstr(),
-                        dir_dgst
-                    ))
-                })?;
+            let child_ix = *self.digest_to_node_ix.get(&child_digest).ok_or_else(|| {
+                Error::InvalidRequest(format!(
+                    "'{}' refers to unseen child dir: {}",
+                    dir.name.as_bstr(),
+                    &child_digest
+                ))
+            })?;
+
+            // Ensure the size specified in the child node matches the directory size itself.
+            let recorded_child_size = self
+                .graph
+                .node_weight(child_ix)
+                .expect("node not found")
+                .size();
 
             // Ensure the size specified in the child node matches our records.
-            if dir.size != *recorded_dir_size {
+            if dir.size != recorded_child_size {
                 return Err(Error::InvalidRequest(format!(
                     "'{}' has wrong size, specified {}, recorded {}",
                     dir.name.as_bstr(),
                     dir.size,
-                    recorded_dir_size
+                    recorded_child_size
                 )));
             }
+
+            child_ixs.insert(child_ix);
         }
 
-        trace!("inserting");
-        let directory_size = directory.size();
-        self.directories_and_sizes
-            .insert(digest.clone(), (directory, directory_size));
-        self.last_directory_digest = Some(digest);
+        // Insert node into the graph, and add edges to all children.
+        let node_ix = self.graph.add_node(directory);
+        for child_ix in child_ixs {
+            self.graph.add_edge(node_ix, child_ix, ());
+        }
+
+        // Record the mapping from digest to node_ix in our lookup table.
+        self.digest_to_node_ix.insert(digest, node_ix);
+
+        // Update last_directory_ix.
+        self.last_directory_ix = Some(node_ix);
 
         Ok(())
     }
@@ -93,77 +121,61 @@ impl ClosureValidator {
     /// order.
     /// In case no elements have been inserted, returns an empty list.
     #[instrument(level = "trace", skip_all, err)]
-    pub(crate) fn finalize(mut self) -> Result<Vec<Directory>, Error> {
-        if self.last_directory_digest.is_none() {
+    pub(crate) fn finalize(self) -> Result<Vec<Directory>, Error> {
+        // If no nodes were inserted, an empty list is returned.
+        let last_directory_ix = if let Some(x) = self.last_directory_ix {
+            x
+        } else {
             return Ok(vec![]);
-        }
-        let root_digest = self.last_directory_digest.unwrap();
-
-        // recursively walk all directories reachable from there,
-        // ensure we visited all nodes that were uploaded.
-        // If not, we might have received multiple disconnected graphs.
-
-        // The list of directories we know we need to visit.
-        // Once we're finished working, and (there's no errors), this in reversed order will
-        // be a valid insertion order, and directories_and_sizes will be empty.
-        let mut dirs_to_visit = Vec::with_capacity(self.directories_and_sizes.len());
-        dirs_to_visit.push(
-            self.directories_and_sizes
-                .remove(&root_digest)
-                .expect("root digest not found")
-                .0,
-        );
-        // The set of digests seen while going visiting all directories.
-        let mut seen_dir_digests = HashSet::new();
-
-        // This loop moves gradually to the end of `dirs_to_visit`, while it's being
-        // extended.
-        // The loop stops once it reaches the end.
-        let mut i = 0;
-        while let Some(directory) = dirs_to_visit.get(i).map(|d| d.to_owned()) {
-            // lookup all child directories and put them in the back of dirs_to_visit,
-            // if they're not already there.
-            for child_dir in &directory.directories {
-                let child_digest = B3Digest::try_from(child_dir.digest.to_owned()).unwrap(); // validated
-
-                // In case the digest is neither already visited nor already in dirs_to_visit,
-                // add it to the end of it.
-                // We don't need to check for the hash we're currently
-                // visiting, as we can't self-reference.
-                if !seen_dir_digests.contains(&child_digest) {
-                    dirs_to_visit.push(
-                        self.directories_and_sizes
-                            .remove(&child_digest)
-                            .expect("child digest not found")
-                            .0,
-                    );
-                    seen_dir_digests.insert(child_digest);
-                }
-            }
+        };
+
+        // do a BFS traversal of the graph, starting with the root node to get
+        // (the count of) all nodes reachable from there.
+        let mut traversal = Bfs::new(&self.graph, last_directory_ix);
 
-            i += 1;
+        let mut visited_directory_count = 0;
+        #[cfg(debug_assertions)]
+        let mut visited_directory_ixs = HashSet::new();
+        while let Some(directory_ix) = traversal.next(&self.graph) {
+            #[cfg(debug_assertions)]
+            visited_directory_ixs.insert(directory_ix);
+
+            visited_directory_count += 1;
         }
 
-        // check directories_and_sizes is empty.
-        if !self.directories_and_sizes.is_empty() {
-            if cfg!(debug_assertions) {
+        // If the number of nodes collected equals the total number of nodes in
+        // the graph, we know all nodes are connected.
+        if visited_directory_count != self.graph.node_count() {
+            // more or less exhaustive error reporting.
+            #[cfg(debug_assertions)]
+            {
+                let all_directory_ixs: HashSet<_> = self.graph.node_indices().collect();
+
+                let unvisited_directories: HashSet<_> = all_directory_ixs
+                    .difference(&visited_directory_ixs)
+                    .map(|ix| self.graph.node_weight(*ix).expect("node not found"))
+                    .collect();
+
                 return Err(Error::InvalidRequest(format!(
-                    "found {} disconnected nodes: {:?}",
-                    self.directories_and_sizes.len(),
-                    self.directories_and_sizes
+                    "found {} disconnected directories: {:?}",
+                    self.graph.node_count() - visited_directory_ixs.len(),
+                    unvisited_directories
                 )));
-            } else {
+            }
+            #[cfg(not(debug_assertions))]
+            {
                 return Err(Error::InvalidRequest(format!(
-                    "found {} disconnected nodes",
-                    self.directories_and_sizes.len()
+                    "found {} disconnected directories",
+                    self.graph.node_count() - visited_directory_count
                 )));
             }
         }
 
-        // Reverse to have correct insertion order.
-        dirs_to_visit.reverse();
-
-        Ok(dirs_to_visit)
+        // Dissolve the graph, returning the nodes as a Vec.
+        // As the graph was populated in a valid DFS PostOrder, we can return
+        // nodes in that same order.
+        let (nodes, _edges) = self.graph.into_nodes_edges();
+        Ok(nodes.into_iter().map(|x| x.weight).collect())
     }
 }
 
@@ -237,13 +249,13 @@ mod tests {
             }
         }
 
-        // everything was uploaded successfully. Test drain().
+        // everything was uploaded successfully. Test finalize().
         let resp = dcv.finalize();
 
         match exp_finalize {
-            Some(exp_drain) => {
+            Some(directories) => {
                 assert_eq!(
-                    Vec::from_iter(exp_drain.iter().map(|e| (*e).to_owned())),
+                    Vec::from_iter(directories.iter().map(|e| (*e).to_owned())),
                     resp.expect("drain should succeed")
                 );
             }