diff options
Diffstat (limited to 'tvix/castore/src/directoryservice/closure_validator.rs')
-rw-r--r-- | tvix/castore/src/directoryservice/closure_validator.rs | 309 |
1 files changed, 0 insertions, 309 deletions
diff --git a/tvix/castore/src/directoryservice/closure_validator.rs b/tvix/castore/src/directoryservice/closure_validator.rs deleted file mode 100644 index b9746a5a0501..000000000000 --- a/tvix/castore/src/directoryservice/closure_validator.rs +++ /dev/null @@ -1,309 +0,0 @@ -use std::collections::{HashMap, HashSet}; - -use bstr::ByteSlice; - -use petgraph::{ - graph::{DiGraph, NodeIndex}, - visit::{Bfs, Walker}, -}; -use tracing::instrument; - -use crate::{ - proto::{self, Directory}, - B3Digest, Error, -}; - -type DirectoryGraph = DiGraph<Directory, ()>; - -/// This can be used to validate a Directory closure (DAG of connected -/// Directories), and their insertion order. -/// -/// Directories need to be inserted (via `add`), in an order from the leaves to -/// the root (DFS Post-Order). -/// During insertion, We validate as much as we can at that time: -/// -/// - individual validation of Directory messages -/// - validation of insertion order (no upload of not-yet-known Directories) -/// - validation of size fields of referred Directories -/// -/// Internally it keeps all received Directories in a directed graph, -/// with node weights being the Directories and edges pointing to child -/// directories. -/// -/// Once all Directories have been inserted, a finalize function can be -/// called to get a (deduplicated and) validated list of directories, in -/// insertion order. -/// During finalize, a check for graph connectivity is performed too, to ensure -/// there's no disconnected components, and only one root. -#[derive(Default)] -pub struct ClosureValidator { - // A directed graph, using Directory as node weight, without edge weights. - // Edges point from parents to children. - graph: DirectoryGraph, - - // A lookup table from directory digest to node index. - digest_to_node_ix: HashMap<B3Digest, NodeIndex>, - - /// Keeps track of the last-inserted directory graph node index. - /// On a correct insert, this will be the root node, from which the DFS post - /// order traversal will start from. - last_directory_ix: Option<NodeIndex>, -} - -impl ClosureValidator { - /// Insert a new Directory into the closure. - /// Perform individual Directory validation, validation of insertion order - /// and size fields. - #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest(), directory.size=%directory.size()), err)] - pub fn add(&mut self, directory: proto::Directory) -> Result<(), Error> { - let digest = directory.digest(); - - // If we already saw this node previously, it's already validated and in the graph. - if self.digest_to_node_ix.contains_key(&digest) { - return Ok(()); - } - - // Do some general validation - directory - .validate() - .map_err(|e| Error::InvalidRequest(e.to_string()))?; - - // Ensure the directory only refers to directories which we already accepted. - // We lookup their node indices and add them to a HashSet. - let mut child_ixs = HashSet::new(); - for dir in &directory.directories { - let child_digest = B3Digest::try_from(dir.digest.to_owned()).unwrap(); // validated - - // Ensure the digest has already been seen - let child_ix = *self.digest_to_node_ix.get(&child_digest).ok_or_else(|| { - Error::InvalidRequest(format!( - "'{}' refers to unseen child dir: {}", - dir.name.as_bstr(), - &child_digest - )) - })?; - - // Ensure the size specified in the child node matches the directory size itself. - let recorded_child_size = self - .graph - .node_weight(child_ix) - .expect("node not found") - .size(); - - // Ensure the size specified in the child node matches our records. - if dir.size != recorded_child_size { - return Err(Error::InvalidRequest(format!( - "'{}' has wrong size, specified {}, recorded {}", - dir.name.as_bstr(), - dir.size, - recorded_child_size - ))); - } - - child_ixs.insert(child_ix); - } - - // Insert node into the graph, and add edges to all children. - let node_ix = self.graph.add_node(directory); - for child_ix in child_ixs { - self.graph.add_edge(node_ix, child_ix, ()); - } - - // Record the mapping from digest to node_ix in our lookup table. - self.digest_to_node_ix.insert(digest, node_ix); - - // Update last_directory_ix. - self.last_directory_ix = Some(node_ix); - - Ok(()) - } - - /// Ensure that all inserted Directories are connected, then return a - /// (deduplicated) and validated list of directories, in from-leaves-to-root - /// order. - /// In case no elements have been inserted, returns an empty list. - #[instrument(level = "trace", skip_all, err)] - pub(crate) fn finalize(self) -> Result<Vec<Directory>, Error> { - let (graph, _) = match self.finalize_raw()? { - None => return Ok(vec![]), - Some(v) => v, - }; - // Dissolve the graph, returning the nodes as a Vec. - // As the graph was populated in a valid DFS PostOrder, we can return - // nodes in that same order. - let (nodes, _edges) = graph.into_nodes_edges(); - Ok(nodes.into_iter().map(|x| x.weight).collect()) - } - - /// Ensure that all inserted Directories are connected, then return a - /// (deduplicated) and validated list of directories, in from-root-to-leaves - /// order. - /// In case no elements have been inserted, returns an empty list. - #[instrument(level = "trace", skip_all, err)] - pub(crate) fn finalize_root_to_leaves(self) -> Result<Vec<Directory>, Error> { - let (graph, root) = match self.finalize_raw()? { - None => return Ok(vec![]), - Some(v) => v, - }; - - // do a BFS traversal of the graph, starting with the root node to get - // all nodes reachable from there. - let traversal = Bfs::new(&graph, root); - - let order = traversal.iter(&graph).collect::<Vec<_>>(); - - let (nodes, _edges) = graph.into_nodes_edges(); - - // Convert to option, so that we can take individual nodes out without messing up the - // indices - let mut nodes = nodes.into_iter().map(Some).collect::<Vec<_>>(); - - Ok(order - .iter() - .map(|i| nodes[i.index()].take().unwrap().weight) - .collect()) - } - - /// Internal implementation of closure validation - #[instrument(level = "trace", skip_all, err)] - fn finalize_raw(self) -> Result<Option<(DirectoryGraph, NodeIndex)>, Error> { - // If no nodes were inserted, an empty list is returned. - let last_directory_ix = if let Some(x) = self.last_directory_ix { - x - } else { - return Ok(None); - }; - - // do a BFS traversal of the graph, starting with the root node to get - // (the count of) all nodes reachable from there. - let mut traversal = Bfs::new(&self.graph, last_directory_ix); - - let mut visited_directory_count = 0; - #[cfg(debug_assertions)] - let mut visited_directory_ixs = HashSet::new(); - #[cfg_attr(not(debug_assertions), allow(unused))] - while let Some(directory_ix) = traversal.next(&self.graph) { - #[cfg(debug_assertions)] - visited_directory_ixs.insert(directory_ix); - - visited_directory_count += 1; - } - - // If the number of nodes collected equals the total number of nodes in - // the graph, we know all nodes are connected. - if visited_directory_count != self.graph.node_count() { - // more or less exhaustive error reporting. - #[cfg(debug_assertions)] - { - let all_directory_ixs: HashSet<_> = self.graph.node_indices().collect(); - - let unvisited_directories: HashSet<_> = all_directory_ixs - .difference(&visited_directory_ixs) - .map(|ix| self.graph.node_weight(*ix).expect("node not found")) - .collect(); - - return Err(Error::InvalidRequest(format!( - "found {} disconnected directories: {:?}", - self.graph.node_count() - visited_directory_ixs.len(), - unvisited_directories - ))); - } - #[cfg(not(debug_assertions))] - { - return Err(Error::InvalidRequest(format!( - "found {} disconnected directories", - self.graph.node_count() - visited_directory_count - ))); - } - } - - Ok(Some((self.graph, last_directory_ix))) - } -} - -#[cfg(test)] -mod tests { - use crate::{ - fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C}, - proto::{self, Directory}, - }; - use lazy_static::lazy_static; - use rstest::rstest; - - lazy_static! { - pub static ref BROKEN_DIRECTORY : Directory = Directory { - symlinks: vec![proto::SymlinkNode { - name: "".into(), // invalid name! - target: "doesntmatter".into(), - }], - ..Default::default() - }; - - pub static ref BROKEN_PARENT_DIRECTORY: Directory = Directory { - directories: vec![proto::DirectoryNode { - name: "foo".into(), - digest: DIRECTORY_A.digest().into(), - size: DIRECTORY_A.size() + 42, // wrong! - }], - ..Default::default() - }; - } - - use super::ClosureValidator; - - #[rstest] - /// Uploading an empty directory should succeed. - #[case::empty_directory(&[&*DIRECTORY_A], false, Some(vec![&*DIRECTORY_A]))] - /// Uploading A, then B (referring to A) should succeed. - #[case::simple_closure(&[&*DIRECTORY_A, &*DIRECTORY_B], false, Some(vec![&*DIRECTORY_A, &*DIRECTORY_B]))] - /// Uploading A, then A, then C (referring to A twice) should succeed. - /// We pretend to be a dumb client not deduping directories. - #[case::same_child(&[&*DIRECTORY_A, &*DIRECTORY_A, &*DIRECTORY_C], false, Some(vec![&*DIRECTORY_A, &*DIRECTORY_C]))] - /// Uploading A, then C (referring to A twice) should succeed. - #[case::same_child_dedup(&[&*DIRECTORY_A, &*DIRECTORY_C], false, Some(vec![&*DIRECTORY_A, &*DIRECTORY_C]))] - /// Uploading A, then C (referring to A twice), then B (itself referring to A) should fail during close, - /// as B itself would be left unconnected. - #[case::unconnected_node(&[&*DIRECTORY_A, &*DIRECTORY_C, &*DIRECTORY_B], false, None)] - /// Uploading B (referring to A) should fail immediately, because A was never uploaded. - #[case::dangling_pointer(&[&*DIRECTORY_B], true, None)] - /// Uploading a directory failing validation should fail immediately. - #[case::failing_validation(&[&*BROKEN_DIRECTORY], true, None)] - /// Uploading a directory which refers to another Directory with a wrong size should fail. - #[case::wrong_size_in_parent(&[&*DIRECTORY_A, &*BROKEN_PARENT_DIRECTORY], true, None)] - fn test_uploads( - #[case] directories_to_upload: &[&Directory], - #[case] exp_fail_upload_last: bool, - #[case] exp_finalize: Option<Vec<&Directory>>, // Some(_) if finalize successful, None if not. - ) { - let mut dcv = ClosureValidator::default(); - let len_directories_to_upload = directories_to_upload.len(); - - for (i, d) in directories_to_upload.iter().enumerate() { - let resp = dcv.add((*d).clone()); - if i == len_directories_to_upload - 1 && exp_fail_upload_last { - assert!(resp.is_err(), "expect last put to fail"); - - // We don't really care anymore what finalize() would return, as - // the add() failed. - return; - } else { - assert!(resp.is_ok(), "expect put to succeed"); - } - } - - // everything was uploaded successfully. Test finalize(). - let resp = dcv.finalize(); - - match exp_finalize { - Some(directories) => { - assert_eq!( - Vec::from_iter(directories.iter().map(|e| (*e).to_owned())), - resp.expect("drain should succeed") - ); - } - None => { - resp.expect_err("drain should fail"); - } - } - } -} |