diff options
Diffstat (limited to 'tvix/castore/src/directoryservice/closure_validator.rs')
-rw-r--r-- | tvix/castore/src/directoryservice/closure_validator.rs | 192 |
1 files changed, 102 insertions, 90 deletions
diff --git a/tvix/castore/src/directoryservice/closure_validator.rs b/tvix/castore/src/directoryservice/closure_validator.rs index 6f263157a9fb..461fc907bdad 100644 --- a/tvix/castore/src/directoryservice/closure_validator.rs +++ b/tvix/castore/src/directoryservice/closure_validator.rs @@ -2,7 +2,11 @@ use std::collections::{HashMap, HashSet}; use bstr::ByteSlice; -use tracing::{instrument, trace}; +use petgraph::{ + graph::{DiGraph, NodeIndex}, + visit::Bfs, +}; +use tracing::instrument; use crate::{ proto::{self, Directory}, @@ -13,14 +17,16 @@ use crate::{ /// Directories), and their insertion order. /// /// Directories need to be inserted (via `add`), in an order from the leaves to -/// the root. During insertion, We validate as much as we can at that time: +/// the root (DFS Post-Order). +/// During insertion, We validate as much as we can at that time: /// /// - individual validation of Directory messages /// - validation of insertion order (no upload of not-yet-known Directories) /// - validation of size fields of referred Directories /// -/// Internally it keeps all received Directories (and their sizes) in a HashMap, -/// keyed by digest. +/// Internally it keeps all received Directories in a directed graph, +/// with node weights being the Directories and edges pointing to child +/// directories. /// /// Once all Directories have been inserted, a finalize function can be /// called to get a (deduplicated and) validated list of directories, in @@ -29,23 +35,29 @@ use crate::{ /// there's no disconnected components, and only one root. #[derive(Default)] pub struct ClosureValidator { - directories_and_sizes: HashMap<B3Digest, (Directory, u64)>, + // A directed graph, using Directory as node weight, without edge weights. + // Edges point from parents to children. + graph: DiGraph<Directory, ()>, + + // A lookup table from directory digest to node index. + digest_to_node_ix: HashMap<B3Digest, NodeIndex>, - /// Keeps track of the last-inserted directory digest. Used to start the - /// connectivity check. - last_directory_digest: Option<B3Digest>, + /// Keeps track of the last-inserted directory graph node index. + /// On a correct insert, this will be the root node, from which the DFS post + /// order traversal will start from. + last_directory_ix: Option<NodeIndex>, } impl ClosureValidator { /// Insert a new Directory into the closure. /// Perform individual Directory validation, validation of insertion order /// and size fields. - #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)] + #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest(), directory.size=%directory.size()), err)] pub fn add(&mut self, directory: proto::Directory) -> Result<(), Error> { let digest = directory.digest(); - if self.directories_and_sizes.contains_key(&digest) { - trace!("already seen, skipping"); + // If we already saw this node previously, it's already validated and in the graph. + if self.digest_to_node_ix.contains_key(&digest) { return Ok(()); } @@ -55,35 +67,51 @@ impl ClosureValidator { .map_err(|e| Error::InvalidRequest(e.to_string()))?; // Ensure the directory only refers to directories which we already accepted. + // We lookup their node indices and add them to a HashSet. + let mut child_ixs = HashSet::new(); for dir in &directory.directories { - let dir_dgst = B3Digest::try_from(dir.digest.to_owned()).unwrap(); // validated + let child_digest = B3Digest::try_from(dir.digest.to_owned()).unwrap(); // validated // Ensure the digest has already been seen - let (_, recorded_dir_size) = - self.directories_and_sizes.get(&dir_dgst).ok_or_else(|| { - Error::InvalidRequest(format!( - "'{}' refers to unseen child dir: {}", - dir.name.as_bstr(), - dir_dgst - )) - })?; + let child_ix = *self.digest_to_node_ix.get(&child_digest).ok_or_else(|| { + Error::InvalidRequest(format!( + "'{}' refers to unseen child dir: {}", + dir.name.as_bstr(), + &child_digest + )) + })?; + + // Ensure the size specified in the child node matches the directory size itself. + let recorded_child_size = self + .graph + .node_weight(child_ix) + .expect("node not found") + .size(); // Ensure the size specified in the child node matches our records. - if dir.size != *recorded_dir_size { + if dir.size != recorded_child_size { return Err(Error::InvalidRequest(format!( "'{}' has wrong size, specified {}, recorded {}", dir.name.as_bstr(), dir.size, - recorded_dir_size + recorded_child_size ))); } + + child_ixs.insert(child_ix); } - trace!("inserting"); - let directory_size = directory.size(); - self.directories_and_sizes - .insert(digest.clone(), (directory, directory_size)); - self.last_directory_digest = Some(digest); + // Insert node into the graph, and add edges to all children. + let node_ix = self.graph.add_node(directory); + for child_ix in child_ixs { + self.graph.add_edge(node_ix, child_ix, ()); + } + + // Record the mapping from digest to node_ix in our lookup table. + self.digest_to_node_ix.insert(digest, node_ix); + + // Update last_directory_ix. + self.last_directory_ix = Some(node_ix); Ok(()) } @@ -93,77 +121,61 @@ impl ClosureValidator { /// order. /// In case no elements have been inserted, returns an empty list. #[instrument(level = "trace", skip_all, err)] - pub(crate) fn finalize(mut self) -> Result<Vec<Directory>, Error> { - if self.last_directory_digest.is_none() { + pub(crate) fn finalize(self) -> Result<Vec<Directory>, Error> { + // If no nodes were inserted, an empty list is returned. + let last_directory_ix = if let Some(x) = self.last_directory_ix { + x + } else { return Ok(vec![]); - } - let root_digest = self.last_directory_digest.unwrap(); - - // recursively walk all directories reachable from there, - // ensure we visited all nodes that were uploaded. - // If not, we might have received multiple disconnected graphs. - - // The list of directories we know we need to visit. - // Once we're finished working, and (there's no errors), this in reversed order will - // be a valid insertion order, and directories_and_sizes will be empty. - let mut dirs_to_visit = Vec::with_capacity(self.directories_and_sizes.len()); - dirs_to_visit.push( - self.directories_and_sizes - .remove(&root_digest) - .expect("root digest not found") - .0, - ); - // The set of digests seen while going visiting all directories. - let mut seen_dir_digests = HashSet::new(); - - // This loop moves gradually to the end of `dirs_to_visit`, while it's being - // extended. - // The loop stops once it reaches the end. - let mut i = 0; - while let Some(directory) = dirs_to_visit.get(i).map(|d| d.to_owned()) { - // lookup all child directories and put them in the back of dirs_to_visit, - // if they're not already there. - for child_dir in &directory.directories { - let child_digest = B3Digest::try_from(child_dir.digest.to_owned()).unwrap(); // validated - - // In case the digest is neither already visited nor already in dirs_to_visit, - // add it to the end of it. - // We don't need to check for the hash we're currently - // visiting, as we can't self-reference. - if !seen_dir_digests.contains(&child_digest) { - dirs_to_visit.push( - self.directories_and_sizes - .remove(&child_digest) - .expect("child digest not found") - .0, - ); - seen_dir_digests.insert(child_digest); - } - } + }; + + // do a BFS traversal of the graph, starting with the root node to get + // (the count of) all nodes reachable from there. + let mut traversal = Bfs::new(&self.graph, last_directory_ix); - i += 1; + let mut visited_directory_count = 0; + #[cfg(debug_assertions)] + let mut visited_directory_ixs = HashSet::new(); + while let Some(directory_ix) = traversal.next(&self.graph) { + #[cfg(debug_assertions)] + visited_directory_ixs.insert(directory_ix); + + visited_directory_count += 1; } - // check directories_and_sizes is empty. - if !self.directories_and_sizes.is_empty() { - if cfg!(debug_assertions) { + // If the number of nodes collected equals the total number of nodes in + // the graph, we know all nodes are connected. + if visited_directory_count != self.graph.node_count() { + // more or less exhaustive error reporting. + #[cfg(debug_assertions)] + { + let all_directory_ixs: HashSet<_> = self.graph.node_indices().collect(); + + let unvisited_directories: HashSet<_> = all_directory_ixs + .difference(&visited_directory_ixs) + .map(|ix| self.graph.node_weight(*ix).expect("node not found")) + .collect(); + return Err(Error::InvalidRequest(format!( - "found {} disconnected nodes: {:?}", - self.directories_and_sizes.len(), - self.directories_and_sizes + "found {} disconnected directories: {:?}", + self.graph.node_count() - visited_directory_ixs.len(), + unvisited_directories ))); - } else { + } + #[cfg(not(debug_assertions))] + { return Err(Error::InvalidRequest(format!( - "found {} disconnected nodes", - self.directories_and_sizes.len() + "found {} disconnected directories", + self.graph.node_count() - visited_directory_count ))); } } - // Reverse to have correct insertion order. - dirs_to_visit.reverse(); - - Ok(dirs_to_visit) + // Dissolve the graph, returning the nodes as a Vec. + // As the graph was populated in a valid DFS PostOrder, we can return + // nodes in that same order. + let (nodes, _edges) = self.graph.into_nodes_edges(); + Ok(nodes.into_iter().map(|x| x.weight).collect()) } } @@ -237,13 +249,13 @@ mod tests { } } - // everything was uploaded successfully. Test drain(). + // everything was uploaded successfully. Test finalize(). let resp = dcv.finalize(); match exp_finalize { - Some(exp_drain) => { + Some(directories) => { assert_eq!( - Vec::from_iter(exp_drain.iter().map(|e| (*e).to_owned())), + Vec::from_iter(directories.iter().map(|e| (*e).to_owned())), resp.expect("drain should succeed") ); } |