about summary refs log tree commit diff
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2024-04-13T20·33+0300
committerclbot <clbot@tvl.fyi>2024-04-15T14·47+0000
commit9498ac936e8eadebfac55d5e54a6c16852953e07 (patch)
treea85518d190a641caef296c447ea662787c714662
parent6ebaa7b88a25636ee90f21b3a42a3674c98d00ef (diff)
fix(tvix/castore/directory): fix graph traversal r/7928
Use a proper graph library to ensure all nodes are reachable from the
root.

We had a bit of that handrolled during add(), as well as later, which
had an annoying bug:

Redundant nodes were omitted during insert, but when returning the list
during finalize, we did not properly account they need to be introduced
before their parents are sent.

We now simply populate a petgraph DiGraph during insert (skipping
inserting nodes we already saw), and use petgraph's DfsPostOrder to
traverse the graph during finalize.

If the number of returned indices equals the total number of nodes in
the graph, all nodes are reachable from the root, we can consume the
graph and return the nodes as a vec, in the same order as the traversal
(and insertion).

Providing a regression test for the initial bug is challenging, as the
current code uses a bunch of HashSets. I manually tested ingesting a
full NixOS closure using this mechanism (via gRPC, which exposes this
problem, as it validates twice), and it now works.

Change-Id: Ic1d5e3e981f2993cc08c5c6b60ad895e578326dc
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11418
Autosubmit: flokli <flokli@flokli.de>
Reviewed-by: Connor Brewster <cbrewster@hey.com>
Tested-by: BuildkiteCI
-rw-r--r--tvix/Cargo.lock1
-rw-r--r--tvix/Cargo.nix5
-rw-r--r--tvix/castore/Cargo.toml1
-rw-r--r--tvix/castore/src/directoryservice/closure_validator.rs192
4 files changed, 109 insertions, 90 deletions
diff --git a/tvix/Cargo.lock b/tvix/Cargo.lock
index a15c71d26d..9666780c2b 100644
--- a/tvix/Cargo.lock
+++ b/tvix/Cargo.lock
@@ -4352,6 +4352,7 @@ dependencies = [
  "libc",
  "object_store",
  "parking_lot 0.12.1",
+ "petgraph",
  "pin-project-lite",
  "prost 0.12.3",
  "prost-build",
diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix
index 48b53a1943..1145b82544 100644
--- a/tvix/Cargo.nix
+++ b/tvix/Cargo.nix
@@ -7768,6 +7768,7 @@ rec {
           "serde_derive" = [ "dep:serde_derive" ];
           "unstable" = [ "generate" ];
         };
+        resolvedDefaultFeatures = [ "default" "graphmap" "matrix_graph" "stable_graph" ];
       };
       "pin-project" = rec {
         crateName = "pin-project";
@@ -13815,6 +13816,10 @@ rec {
             packageId = "parking_lot 0.12.1";
           }
           {
+            name = "petgraph";
+            packageId = "petgraph";
+          }
+          {
             name = "pin-project-lite";
             packageId = "pin-project-lite";
           }
diff --git a/tvix/castore/Cargo.toml b/tvix/castore/Cargo.toml
index 447ddea1dc..b68922b7ce 100644
--- a/tvix/castore/Cargo.toml
+++ b/tvix/castore/Cargo.toml
@@ -32,6 +32,7 @@ zstd = "0.13.0"
 serde = { version = "1.0.197", features = [ "derive" ] }
 serde_with = "3.7.0"
 serde_qs = "0.12.0"
+petgraph = "0.6.4"
 
 [dependencies.bigtable_rs]
 optional = true
diff --git a/tvix/castore/src/directoryservice/closure_validator.rs b/tvix/castore/src/directoryservice/closure_validator.rs
index 6f263157a9..461fc907bd 100644
--- a/tvix/castore/src/directoryservice/closure_validator.rs
+++ b/tvix/castore/src/directoryservice/closure_validator.rs
@@ -2,7 +2,11 @@ use std::collections::{HashMap, HashSet};
 
 use bstr::ByteSlice;
 
-use tracing::{instrument, trace};
+use petgraph::{
+    graph::{DiGraph, NodeIndex},
+    visit::Bfs,
+};
+use tracing::instrument;
 
 use crate::{
     proto::{self, Directory},
@@ -13,14 +17,16 @@ use crate::{
 /// Directories), and their insertion order.
 ///
 /// Directories need to be inserted (via `add`), in an order from the leaves to
-/// the root. During insertion, We validate as much as we can at that time:
+/// the root (DFS Post-Order).
+/// During insertion, We validate as much as we can at that time:
 ///
 ///  - individual validation of Directory messages
 ///  - validation of insertion order (no upload of not-yet-known Directories)
 ///  - validation of size fields of referred Directories
 ///
-/// Internally it keeps all received Directories (and their sizes) in a HashMap,
-/// keyed by digest.
+/// Internally it keeps all received Directories in a directed graph,
+/// with node weights being the Directories and edges pointing to child
+/// directories.
 ///
 /// Once all Directories have been inserted, a finalize function can be
 /// called to get a (deduplicated and) validated list of directories, in
@@ -29,23 +35,29 @@ use crate::{
 /// there's no disconnected components, and only one root.
 #[derive(Default)]
 pub struct ClosureValidator {
-    directories_and_sizes: HashMap<B3Digest, (Directory, u64)>,
+    // A directed graph, using Directory as node weight, without edge weights.
+    // Edges point from parents to children.
+    graph: DiGraph<Directory, ()>,
+
+    // A lookup table from directory digest to node index.
+    digest_to_node_ix: HashMap<B3Digest, NodeIndex>,
 
-    /// Keeps track of the last-inserted directory digest. Used to start the
-    /// connectivity check.
-    last_directory_digest: Option<B3Digest>,
+    /// Keeps track of the last-inserted directory graph node index.
+    /// On a correct insert, this will be the root node, from which the DFS post
+    /// order traversal will start from.
+    last_directory_ix: Option<NodeIndex>,
 }
 
 impl ClosureValidator {
     /// Insert a new Directory into the closure.
     /// Perform individual Directory validation, validation of insertion order
     /// and size fields.
-    #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)]
+    #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest(), directory.size=%directory.size()), err)]
     pub fn add(&mut self, directory: proto::Directory) -> Result<(), Error> {
         let digest = directory.digest();
 
-        if self.directories_and_sizes.contains_key(&digest) {
-            trace!("already seen, skipping");
+        // If we already saw this node previously, it's already validated and in the graph.
+        if self.digest_to_node_ix.contains_key(&digest) {
             return Ok(());
         }
 
@@ -55,35 +67,51 @@ impl ClosureValidator {
             .map_err(|e| Error::InvalidRequest(e.to_string()))?;
 
         // Ensure the directory only refers to directories which we already accepted.
+        // We lookup their node indices and add them to a HashSet.
+        let mut child_ixs = HashSet::new();
         for dir in &directory.directories {
-            let dir_dgst = B3Digest::try_from(dir.digest.to_owned()).unwrap(); // validated
+            let child_digest = B3Digest::try_from(dir.digest.to_owned()).unwrap(); // validated
 
             // Ensure the digest has already been seen
-            let (_, recorded_dir_size) =
-                self.directories_and_sizes.get(&dir_dgst).ok_or_else(|| {
-                    Error::InvalidRequest(format!(
-                        "'{}' refers to unseen child dir: {}",
-                        dir.name.as_bstr(),
-                        dir_dgst
-                    ))
-                })?;
+            let child_ix = *self.digest_to_node_ix.get(&child_digest).ok_or_else(|| {
+                Error::InvalidRequest(format!(
+                    "'{}' refers to unseen child dir: {}",
+                    dir.name.as_bstr(),
+                    &child_digest
+                ))
+            })?;
+
+            // Ensure the size specified in the child node matches the directory size itself.
+            let recorded_child_size = self
+                .graph
+                .node_weight(child_ix)
+                .expect("node not found")
+                .size();
 
             // Ensure the size specified in the child node matches our records.
-            if dir.size != *recorded_dir_size {
+            if dir.size != recorded_child_size {
                 return Err(Error::InvalidRequest(format!(
                     "'{}' has wrong size, specified {}, recorded {}",
                     dir.name.as_bstr(),
                     dir.size,
-                    recorded_dir_size
+                    recorded_child_size
                 )));
             }
+
+            child_ixs.insert(child_ix);
         }
 
-        trace!("inserting");
-        let directory_size = directory.size();
-        self.directories_and_sizes
-            .insert(digest.clone(), (directory, directory_size));
-        self.last_directory_digest = Some(digest);
+        // Insert node into the graph, and add edges to all children.
+        let node_ix = self.graph.add_node(directory);
+        for child_ix in child_ixs {
+            self.graph.add_edge(node_ix, child_ix, ());
+        }
+
+        // Record the mapping from digest to node_ix in our lookup table.
+        self.digest_to_node_ix.insert(digest, node_ix);
+
+        // Update last_directory_ix.
+        self.last_directory_ix = Some(node_ix);
 
         Ok(())
     }
@@ -93,77 +121,61 @@ impl ClosureValidator {
     /// order.
     /// In case no elements have been inserted, returns an empty list.
     #[instrument(level = "trace", skip_all, err)]
-    pub(crate) fn finalize(mut self) -> Result<Vec<Directory>, Error> {
-        if self.last_directory_digest.is_none() {
+    pub(crate) fn finalize(self) -> Result<Vec<Directory>, Error> {
+        // If no nodes were inserted, an empty list is returned.
+        let last_directory_ix = if let Some(x) = self.last_directory_ix {
+            x
+        } else {
             return Ok(vec![]);
-        }
-        let root_digest = self.last_directory_digest.unwrap();
-
-        // recursively walk all directories reachable from there,
-        // ensure we visited all nodes that were uploaded.
-        // If not, we might have received multiple disconnected graphs.
-
-        // The list of directories we know we need to visit.
-        // Once we're finished working, and (there's no errors), this in reversed order will
-        // be a valid insertion order, and directories_and_sizes will be empty.
-        let mut dirs_to_visit = Vec::with_capacity(self.directories_and_sizes.len());
-        dirs_to_visit.push(
-            self.directories_and_sizes
-                .remove(&root_digest)
-                .expect("root digest not found")
-                .0,
-        );
-        // The set of digests seen while going visiting all directories.
-        let mut seen_dir_digests = HashSet::new();
-
-        // This loop moves gradually to the end of `dirs_to_visit`, while it's being
-        // extended.
-        // The loop stops once it reaches the end.
-        let mut i = 0;
-        while let Some(directory) = dirs_to_visit.get(i).map(|d| d.to_owned()) {
-            // lookup all child directories and put them in the back of dirs_to_visit,
-            // if they're not already there.
-            for child_dir in &directory.directories {
-                let child_digest = B3Digest::try_from(child_dir.digest.to_owned()).unwrap(); // validated
-
-                // In case the digest is neither already visited nor already in dirs_to_visit,
-                // add it to the end of it.
-                // We don't need to check for the hash we're currently
-                // visiting, as we can't self-reference.
-                if !seen_dir_digests.contains(&child_digest) {
-                    dirs_to_visit.push(
-                        self.directories_and_sizes
-                            .remove(&child_digest)
-                            .expect("child digest not found")
-                            .0,
-                    );
-                    seen_dir_digests.insert(child_digest);
-                }
-            }
+        };
+
+        // do a BFS traversal of the graph, starting with the root node to get
+        // (the count of) all nodes reachable from there.
+        let mut traversal = Bfs::new(&self.graph, last_directory_ix);
 
-            i += 1;
+        let mut visited_directory_count = 0;
+        #[cfg(debug_assertions)]
+        let mut visited_directory_ixs = HashSet::new();
+        while let Some(directory_ix) = traversal.next(&self.graph) {
+            #[cfg(debug_assertions)]
+            visited_directory_ixs.insert(directory_ix);
+
+            visited_directory_count += 1;
         }
 
-        // check directories_and_sizes is empty.
-        if !self.directories_and_sizes.is_empty() {
-            if cfg!(debug_assertions) {
+        // If the number of nodes collected equals the total number of nodes in
+        // the graph, we know all nodes are connected.
+        if visited_directory_count != self.graph.node_count() {
+            // more or less exhaustive error reporting.
+            #[cfg(debug_assertions)]
+            {
+                let all_directory_ixs: HashSet<_> = self.graph.node_indices().collect();
+
+                let unvisited_directories: HashSet<_> = all_directory_ixs
+                    .difference(&visited_directory_ixs)
+                    .map(|ix| self.graph.node_weight(*ix).expect("node not found"))
+                    .collect();
+
                 return Err(Error::InvalidRequest(format!(
-                    "found {} disconnected nodes: {:?}",
-                    self.directories_and_sizes.len(),
-                    self.directories_and_sizes
+                    "found {} disconnected directories: {:?}",
+                    self.graph.node_count() - visited_directory_ixs.len(),
+                    unvisited_directories
                 )));
-            } else {
+            }
+            #[cfg(not(debug_assertions))]
+            {
                 return Err(Error::InvalidRequest(format!(
-                    "found {} disconnected nodes",
-                    self.directories_and_sizes.len()
+                    "found {} disconnected directories",
+                    self.graph.node_count() - visited_directory_count
                 )));
             }
         }
 
-        // Reverse to have correct insertion order.
-        dirs_to_visit.reverse();
-
-        Ok(dirs_to_visit)
+        // Dissolve the graph, returning the nodes as a Vec.
+        // As the graph was populated in a valid DFS PostOrder, we can return
+        // nodes in that same order.
+        let (nodes, _edges) = self.graph.into_nodes_edges();
+        Ok(nodes.into_iter().map(|x| x.weight).collect())
     }
 }
 
@@ -237,13 +249,13 @@ mod tests {
             }
         }
 
-        // everything was uploaded successfully. Test drain().
+        // everything was uploaded successfully. Test finalize().
         let resp = dcv.finalize();
 
         match exp_finalize {
-            Some(exp_drain) => {
+            Some(directories) => {
                 assert_eq!(
-                    Vec::from_iter(exp_drain.iter().map(|e| (*e).to_owned())),
+                    Vec::from_iter(directories.iter().map(|e| (*e).to_owned())),
                     resp.expect("drain should succeed")
                 );
             }