use std::collections::{HashMap, HashSet};

use bstr::ByteSlice;
use petgraph::{
    graph::{DiGraph, NodeIndex},
    visit::{Bfs, Walker},
};
use tracing::instrument;

use crate::{
    proto::{self, Directory},
    B3Digest, Error,
};

type DirectoryGraph = DiGraph<Directory, ()>;

/// This can be used to validate a Directory closure (DAG of connected
/// Directories), and their insertion order.
///
/// Directories need to be inserted (via `add`), in an order from the leaves to
/// the root (DFS Post-Order).
/// During insertion, we validate as much as we can at that time:
///
/// - individual validation of Directory messages
/// - validation of insertion order (no upload of not-yet-known Directories)
/// - validation of size fields of referred Directories
///
/// Internally it keeps all received Directories in a directed graph,
/// with node weights being the Directories and edges pointing to child
/// directories.
///
/// Once all Directories have been inserted, a finalize function can be
/// called to get a (deduplicated and) validated list of directories, in
/// insertion order.
/// During finalize, a check for graph connectivity is performed too, to ensure
/// there are no disconnected components and only a single root.
#[derive(Default)]
pub struct ClosureValidator {
    // A directed graph, using Directory as node weight, without edge weights.
    // Edges point from parents to children.
    graph: DirectoryGraph,

    // A lookup table from directory digest to node index.
    digest_to_node_ix: HashMap<B3Digest, NodeIndex>,

    /// Keeps track of the last-inserted directory graph node index.
    /// On a correct insert, this will be the root node, from which the DFS
    /// post-order traversal will start.
    last_directory_ix: Option<NodeIndex>,
}

impl ClosureValidator {
    /// Insert a new Directory into the closure.
    /// Perform individual Directory validation, validation of insertion order
    /// and size fields.
    #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest(), directory.size=%directory.size()), err)]
    pub fn add(&mut self, directory: proto::Directory) -> Result<(), Error> {
        let digest = directory.digest();

        // If we already saw this node previously, it's already validated and in the graph.
        if self.digest_to_node_ix.contains_key(&digest) {
            return Ok(());
        }

        // Do some general validation.
        directory
            .validate()
            .map_err(|e| Error::InvalidRequest(e.to_string()))?;

        // Ensure the directory only refers to directories which we already accepted.
        // We look up their node indices and add them to a HashSet.
        let mut child_ixs = HashSet::new();
        for dir in &directory.directories {
            let child_digest = B3Digest::try_from(dir.digest.to_owned()).unwrap(); // validated

            // Ensure the digest has already been seen.
            let child_ix = *self.digest_to_node_ix.get(&child_digest).ok_or_else(|| {
                Error::InvalidRequest(format!(
                    "'{}' refers to unseen child dir: {}",
                    dir.name.as_bstr(),
                    &child_digest
                ))
            })?;

            // Look up the size we recorded for that child directory.
            let recorded_child_size = self
                .graph
                .node_weight(child_ix)
                .expect("node not found")
                .size();

            // Ensure the size specified in the child node matches our records.
            if dir.size != recorded_child_size {
                return Err(Error::InvalidRequest(format!(
                    "'{}' has wrong size, specified {}, recorded {}",
                    dir.name.as_bstr(),
                    dir.size,
                    recorded_child_size
                )));
            }

            child_ixs.insert(child_ix);
        }

        // Insert node into the graph, and add edges to all children.
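        // Edges point from the parent to each child (see the struct-level
        // comment), so the BFS from the last-inserted root in finalize_raw()
        // can reach every directory of the closure.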
        let node_ix = self.graph.add_node(directory);

        for child_ix in child_ixs {
            self.graph.add_edge(node_ix, child_ix, ());
        }

        // Record the mapping from digest to node_ix in our lookup table.
        self.digest_to_node_ix.insert(digest, node_ix);

        // Update last_directory_ix.
        self.last_directory_ix = Some(node_ix);

        Ok(())
    }

    /// Ensure that all inserted Directories are connected, then return a
    /// (deduplicated and) validated list of directories, in from-leaves-to-root
    /// order.
    /// In case no elements have been inserted, returns an empty list.
    #[instrument(level = "trace", skip_all, err)]
    pub(crate) fn finalize(self) -> Result<Vec<Directory>, Error> {
        let (graph, _) = match self.finalize_raw()? {
            None => return Ok(vec![]),
            Some(v) => v,
        };
        // Dissolve the graph, returning the nodes as a Vec.
        // As the graph was populated in a valid DFS post-order, we can return
        // nodes in that same order.
        let (nodes, _edges) = graph.into_nodes_edges();
        Ok(nodes.into_iter().map(|x| x.weight).collect())
    }

    /// Ensure that all inserted Directories are connected, then return a
    /// (deduplicated and) validated list of directories, in from-root-to-leaves
    /// order.
    /// In case no elements have been inserted, returns an empty list.
    #[instrument(level = "trace", skip_all, err)]
    pub(crate) fn finalize_root_to_leaves(self) -> Result<Vec<Directory>, Error> {
        let (graph, root) = match self.finalize_raw()? {
            None => return Ok(vec![]),
            Some(v) => v,
        };

        // Do a BFS traversal of the graph, starting with the root node, to get
        // all nodes reachable from there.
        let traversal = Bfs::new(&graph, root);
        let order = traversal.iter(&graph).collect::<Vec<_>>();

        let (nodes, _edges) = graph.into_nodes_edges();

        // Convert to Option, so that we can take individual nodes out without
        // messing up the indices.
        let mut nodes = nodes.into_iter().map(Some).collect::<Vec<_>>();

        Ok(order
            .iter()
            .map(|i| nodes[i.index()].take().unwrap().weight)
            .collect())
    }

    /// Internal implementation of closure validation.
    #[instrument(level = "trace", skip_all, err)]
    fn finalize_raw(self) -> Result<Option<(DirectoryGraph, NodeIndex)>, Error> {
        // If no nodes were inserted, return None so callers can produce an empty list.
        let last_directory_ix = if let Some(x) = self.last_directory_ix {
            x
        } else {
            return Ok(None);
        };

        // Do a BFS traversal of the graph, starting with the root node, to get
        // (the count of) all nodes reachable from there.
        let mut traversal = Bfs::new(&self.graph, last_directory_ix);

        let mut visited_directory_count = 0;
        #[cfg(debug_assertions)]
        let mut visited_directory_ixs = HashSet::new();
        #[cfg_attr(not(debug_assertions), allow(unused))]
        while let Some(directory_ix) = traversal.next(&self.graph) {
            #[cfg(debug_assertions)]
            visited_directory_ixs.insert(directory_ix);

            visited_directory_count += 1;
        }

        // If the number of visited nodes is smaller than the total number of
        // nodes in the graph, some nodes are not reachable from the root.
        if visited_directory_count != self.graph.node_count() {
            // More or less exhaustive error reporting.
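            // In debug builds we also kept the set of visited node indices,
            // so we can name exactly which directories are disconnected.
            // Release builds only have the count available.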
            #[cfg(debug_assertions)]
            {
                let all_directory_ixs: HashSet<_> = self.graph.node_indices().collect();

                let unvisited_directories: HashSet<_> = all_directory_ixs
                    .difference(&visited_directory_ixs)
                    .map(|ix| self.graph.node_weight(*ix).expect("node not found"))
                    .collect();

                return Err(Error::InvalidRequest(format!(
                    "found {} disconnected directories: {:?}",
                    self.graph.node_count() - visited_directory_ixs.len(),
                    unvisited_directories
                )));
            }
            #[cfg(not(debug_assertions))]
            {
                return Err(Error::InvalidRequest(format!(
                    "found {} disconnected directories",
                    self.graph.node_count() - visited_directory_count
                )));
            }
        }

        Ok(Some((self.graph, last_directory_ix)))
    }
}

#[cfg(test)]
mod tests {
    use crate::{
        fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C},
        proto::{self, Directory},
    };
    use lazy_static::lazy_static;
    use rstest::rstest;

    lazy_static! {
        pub static ref BROKEN_DIRECTORY: Directory = Directory {
            symlinks: vec![proto::SymlinkNode {
                name: "".into(), // invalid name!
                target: "doesntmatter".into(),
            }],
            ..Default::default()
        };

        pub static ref BROKEN_PARENT_DIRECTORY: Directory = Directory {
            directories: vec![proto::DirectoryNode {
                name: "foo".into(),
                digest: DIRECTORY_A.digest().into(),
                size: DIRECTORY_A.size() + 42, // wrong!
            }],
            ..Default::default()
        };
    }

    use super::ClosureValidator;

    #[rstest]
    /// Uploading an empty directory should succeed.
    #[case::empty_directory(&[&*DIRECTORY_A], false, Some(vec![&*DIRECTORY_A]))]
    /// Uploading A, then B (referring to A) should succeed.
    #[case::simple_closure(&[&*DIRECTORY_A, &*DIRECTORY_B], false, Some(vec![&*DIRECTORY_A, &*DIRECTORY_B]))]
    /// Uploading A, then A, then C (referring to A twice) should succeed.
    /// We pretend to be a dumb client not deduping directories.
    #[case::same_child(&[&*DIRECTORY_A, &*DIRECTORY_A, &*DIRECTORY_C], false, Some(vec![&*DIRECTORY_A, &*DIRECTORY_C]))]
    /// Uploading A, then C (referring to A twice) should succeed.
    #[case::same_child_dedup(&[&*DIRECTORY_A, &*DIRECTORY_C], false, Some(vec![&*DIRECTORY_A, &*DIRECTORY_C]))]
    /// Uploading A, then C (referring to A twice), then B (itself referring to A) should fail during finalize,
    /// as B itself would be left unconnected.
    #[case::unconnected_node(&[&*DIRECTORY_A, &*DIRECTORY_C, &*DIRECTORY_B], false, None)]
    /// Uploading B (referring to A) should fail immediately, because A was never uploaded.
    #[case::dangling_pointer(&[&*DIRECTORY_B], true, None)]
    /// Uploading a directory failing validation should fail immediately.
    #[case::failing_validation(&[&*BROKEN_DIRECTORY], true, None)]
    /// Uploading a directory which refers to another Directory with a wrong size should fail.
    #[case::wrong_size_in_parent(&[&*DIRECTORY_A, &*BROKEN_PARENT_DIRECTORY], true, None)]
    fn test_uploads(
        #[case] directories_to_upload: &[&Directory],
        #[case] exp_fail_upload_last: bool,
        #[case] exp_finalize: Option<Vec<&Directory>>, // Some(_) if finalize successful, None if not.
    ) {
        let mut dcv = ClosureValidator::default();
        let len_directories_to_upload = directories_to_upload.len();

        for (i, d) in directories_to_upload.iter().enumerate() {
            let resp = dcv.add((*d).clone());
            if i == len_directories_to_upload - 1 && exp_fail_upload_last {
                assert!(resp.is_err(), "expect last put to fail");

                // We don't really care anymore what finalize() would return, as
                // the add() failed.
                return;
            } else {
                assert!(resp.is_ok(), "expect put to succeed");
            }
        }

        // Everything was uploaded successfully. Test finalize().
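        // finalize() consumes the validator and should return the deduplicated
        // directories in leaves-to-root insertion order, or fail if parts of
        // the graph are not reachable from the root.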
        let resp = dcv.finalize();

        match exp_finalize {
            Some(directories) => {
                assert_eq!(
                    Vec::from_iter(directories.iter().map(|e| (*e).to_owned())),
                    resp.expect("finalize should succeed")
                );
            }
            None => {
                resp.expect_err("finalize should fail");
            }
        }
    }
}