use std::collections::HashMap; use petgraph::{ graph::{DiGraph, NodeIndex}, visit::{Bfs, DfsPostOrder, EdgeRef, IntoNodeIdentifiers, Walker}, Direction, Incoming, }; use tracing::instrument; use super::order_validator::{LeavesToRootValidator, OrderValidator, RootToLeavesValidator}; use crate::{path::PathComponent, B3Digest, Directory, Node}; #[derive(thiserror::Error, Debug)] pub enum Error { #[error("{0}")] ValidationError(String), } struct EdgeWeight { name: PathComponent, size: u64, } /// This can be used to validate and/or re-order a Directory closure (DAG of /// connected Directories), and their insertion order. /// /// The DirectoryGraph is parametrized on the insertion order, and can be /// constructed using the Default trait, or using `with_order` if the /// OrderValidator needs to be customized. /// /// If the user is receiving directories from canonical protobuf encoding in /// root-to-leaves order, and parsing them, she can call `digest_allowed` /// _before_ parsing the protobuf record and then add it with `add_unchecked`. /// All other users insert the directories via `add`, in their specified order. /// During insertion, we validate as much as we can at that time: /// /// - individual validation of Directory messages /// - validation of insertion order /// - validation of size fields of referred Directories /// /// Internally it keeps all received Directories in a directed graph, /// with node weights being the Directories and edges pointing to child/parent /// directories. /// /// Once all Directories have been inserted, a validate function can be /// called to perform a check for graph connectivity and ensure there's no /// disconnected components or missing nodes. /// Finally, the `drain_leaves_to_root` or `drain_root_to_leaves` can be /// _chained_ on validate to get an iterator over the (deduplicated and) /// validated list of directories in either order. #[derive(Default)] pub struct DirectoryGraph { // A directed graph, using Directory as node weight. // Edges point from parents to children. // // Nodes with None weigths might exist when a digest has been referred to but the directory // with this digest has not yet been sent. // // The option in the edge weight tracks the pending validation state of the respective edge, for example if // the child has not been added yet. graph: DiGraph, Option>, // A lookup table from directory digest to node index. digest_to_node_ix: HashMap, order_validator: O, } pub struct ValidatedDirectoryGraph { graph: DiGraph, Option>, root: Option, } fn check_edge(edge: &EdgeWeight, child: &Directory) -> Result<(), Error> { // Ensure the size specified in the child node matches our records. if edge.size != child.size() { return Err(Error::ValidationError(format!( "'{}' has wrong size, specified {}, recorded {}", edge.name, edge.size, child.size(), ))); } Ok(()) } impl DirectoryGraph { /// Insert a new Directory into the closure #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest(), directory.size=%directory.size()), err)] pub fn add(&mut self, directory: Directory) -> Result<(), Error> { if !self.order_validator.add_directory(&directory) { return Err(Error::ValidationError( "unknown directory was referenced".into(), )); } self.add_order_unchecked(directory) } } impl DirectoryGraph { /// If the user is parsing directories from canonical protobuf encoding, she can /// call `digest_allowed` _before_ parsing the protobuf record and then add it /// with `add_unchecked`. pub fn digest_allowed(&self, digest: B3Digest) -> bool { self.order_validator.digest_allowed(&digest) } /// Insert a new Directory into the closure #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest(), directory.size=%directory.size()), err)] pub fn add(&mut self, directory: Directory) -> Result<(), Error> { let digest = directory.digest(); if !self.order_validator.digest_allowed(&digest) { return Err(Error::ValidationError("unexpected digest".into())); } self.order_validator.add_directory_unchecked(&directory); self.add_order_unchecked(directory) } } impl DirectoryGraph { /// Customize the ordering, i.e. for pre-setting the root of the RootToLeavesValidator pub fn with_order(order_validator: O) -> Self { Self { graph: Default::default(), digest_to_node_ix: Default::default(), order_validator, } } /// Adds a directory which has already been confirmed to be in-order to the graph pub fn add_order_unchecked(&mut self, directory: Directory) -> Result<(), Error> { let digest = directory.digest(); // Teach the graph about the existence of a node with this digest let ix = *self .digest_to_node_ix .entry(digest) .or_insert_with(|| self.graph.add_node(None)); if self.graph[ix].is_some() { // The node is already in the graph, there is nothing to do here. return Ok(()); } // set up edges to all child directories for (name, node) in directory.nodes() { if let Node::Directory { digest, size } = node { let child_ix = *self .digest_to_node_ix .entry(digest.clone()) .or_insert_with(|| self.graph.add_node(None)); let pending_edge_check = match &self.graph[child_ix] { Some(child) => { // child is already available, validate the edge now check_edge( &EdgeWeight { name: name.clone(), size: *size, }, child, )?; None } None => Some(EdgeWeight { name: name.clone(), size: *size, }), // pending validation }; self.graph.add_edge(ix, child_ix, pending_edge_check); } } // validate the edges from parents to this node // this collects edge ids in a Vec because there is no edges_directed_mut :'c for edge_id in self .graph .edges_directed(ix, Direction::Incoming) .map(|edge_ref| edge_ref.id()) .collect::>() .into_iter() { let edge_weight = self .graph .edge_weight_mut(edge_id) .expect("edge not found") .take() .expect("edge is already validated"); check_edge(&edge_weight, &directory)?; } // finally, store the directory information in the node weight self.graph[ix] = Some(directory); Ok(()) } #[instrument(level = "trace", skip_all, err)] pub fn validate(self) -> Result { // find all initial nodes (nodes without incoming edges) let mut roots = self .graph .node_identifiers() .filter(|&a| self.graph.neighbors_directed(a, Incoming).next().is_none()); let root = roots.next(); if roots.next().is_some() { return Err(Error::ValidationError( "graph has disconnected roots".into(), )); } // test that the graph is complete if self.graph.raw_nodes().iter().any(|n| n.weight.is_none()) { return Err(Error::ValidationError("graph is incomplete".into())); } Ok(ValidatedDirectoryGraph { graph: self.graph, root, }) } } impl ValidatedDirectoryGraph { /// Return the list of directories in from-root-to-leaves order. /// In case no elements have been inserted, returns an empty list. /// /// panics if the specified root is not in the graph #[instrument(level = "trace", skip_all)] pub fn drain_root_to_leaves(self) -> impl Iterator { let order = match self.root { Some(root) => { // do a BFS traversal of the graph, starting with the root node Bfs::new(&self.graph, root) .iter(&self.graph) .collect::>() } None => vec![], // No nodes have been inserted, do not traverse }; let (mut nodes, _edges) = self.graph.into_nodes_edges(); order .into_iter() .filter_map(move |i| nodes[i.index()].weight.take()) } /// Return the list of directories in from-leaves-to-root order. /// In case no elements have been inserted, returns an empty list. /// /// panics when the specified root is not in the graph #[instrument(level = "trace", skip_all)] pub fn drain_leaves_to_root(self) -> impl Iterator { let order = match self.root { Some(root) => { // do a DFS Post-Order traversal of the graph, starting with the root node DfsPostOrder::new(&self.graph, root) .iter(&self.graph) .collect::>() } None => vec![], // No nodes have been inserted, do not traverse }; let (mut nodes, _edges) = self.graph.into_nodes_edges(); order .into_iter() .filter_map(move |i| nodes[i.index()].weight.take()) } } #[cfg(test)] mod tests { use crate::fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C}; use crate::{Directory, Node}; use rstest::rstest; use std::sync::LazyLock; use super::{DirectoryGraph, LeavesToRootValidator, RootToLeavesValidator}; pub static BROKEN_PARENT_DIRECTORY: LazyLock = LazyLock::new(|| { Directory::try_from_iter([( "foo".try_into().unwrap(), Node::Directory { digest: DIRECTORY_A.digest(), size: DIRECTORY_A.size() + 42, // wrong! }, )]) .unwrap() }); #[rstest] /// Uploading an empty directory should succeed. #[case::empty_directory(&[&*DIRECTORY_A], false, Some(vec![&*DIRECTORY_A]))] /// Uploading A, then B (referring to A) should succeed. #[case::simple_closure(&[&*DIRECTORY_A, &*DIRECTORY_B], false, Some(vec![&*DIRECTORY_A, &*DIRECTORY_B]))] /// Uploading A, then A, then C (referring to A twice) should succeed. /// We pretend to be a dumb client not deduping directories. #[case::same_child(&[&*DIRECTORY_A, &*DIRECTORY_A, &*DIRECTORY_C], false, Some(vec![&*DIRECTORY_A, &*DIRECTORY_C]))] /// Uploading A, then C (referring to A twice) should succeed. #[case::same_child_dedup(&[&*DIRECTORY_A, &*DIRECTORY_C], false, Some(vec![&*DIRECTORY_A, &*DIRECTORY_C]))] /// Uploading A, then C (referring to A twice), then B (itself referring to A) should fail during close, /// as B itself would be left unconnected. #[case::unconnected_node(&[&*DIRECTORY_A, &*DIRECTORY_C, &*DIRECTORY_B], false, None)] /// Uploading B (referring to A) should fail immediately, because A was never uploaded. #[case::dangling_pointer(&[&*DIRECTORY_B], true, None)] /// Uploading a directory which refers to another Directory with a wrong size should fail. #[case::wrong_size_in_parent(&[&*DIRECTORY_A, &*BROKEN_PARENT_DIRECTORY], true, None)] fn test_uploads( #[case] directories_to_upload: &[&Directory], #[case] exp_fail_upload_last: bool, #[case] exp_finalize: Option>, // Some(_) if finalize successful, None if not. ) { let mut dcv = DirectoryGraph::::default(); let len_directories_to_upload = directories_to_upload.len(); for (i, d) in directories_to_upload.iter().enumerate() { let resp = dcv.add((*d).clone()); if i == len_directories_to_upload - 1 && exp_fail_upload_last { assert!(resp.is_err(), "expect last put to fail"); // We don't really care anymore what finalize() would return, as // the add() failed. return; } else { assert!(resp.is_ok(), "expect put to succeed"); } } // everything was uploaded successfully. Test finalize(). let resp = dcv .validate() .map(|validated| validated.drain_leaves_to_root().collect::>()); match exp_finalize { Some(directories) => { assert_eq!( Vec::from_iter(directories.iter().map(|e| (*e).to_owned())), resp.expect("drain should succeed") ); } None => { resp.expect_err("drain should fail"); } } } #[rstest] /// Downloading an empty directory should succeed. #[case::empty_directory(&*DIRECTORY_A, &[&*DIRECTORY_A], false, Some(vec![&*DIRECTORY_A]))] /// Downlading B, then A (referenced by B) should succeed. #[case::simple_closure(&*DIRECTORY_B, &[&*DIRECTORY_B, &*DIRECTORY_A], false, Some(vec![&*DIRECTORY_A, &*DIRECTORY_B]))] /// Downloading C (referring to A twice), then A should succeed. #[case::same_child_dedup(&*DIRECTORY_C, &[&*DIRECTORY_C, &*DIRECTORY_A], false, Some(vec![&*DIRECTORY_A, &*DIRECTORY_C]))] /// Downloading C, then B (both referring to A but not referring to each other) should fail immediately as B has no connection to C (the root) #[case::unconnected_node(&*DIRECTORY_C, &[&*DIRECTORY_C, &*DIRECTORY_B], true, None)] /// Downloading B (specified as the root) but receiving A instead should fail immediately, because A has no connection to B (the root). #[case::dangling_pointer(&*DIRECTORY_B, &[&*DIRECTORY_A], true, None)] /// Downloading a directory which refers to another Directory with a wrong size should fail. #[case::wrong_size_in_parent(&*BROKEN_PARENT_DIRECTORY, &[&*BROKEN_PARENT_DIRECTORY, &*DIRECTORY_A], true, None)] fn test_downloads( #[case] root: &Directory, #[case] directories_to_upload: &[&Directory], #[case] exp_fail_upload_last: bool, #[case] exp_finalize: Option>, // Some(_) if finalize successful, None if not. ) { let mut dcv = DirectoryGraph::with_order(RootToLeavesValidator::new_with_root_digest(root.digest())); let len_directories_to_upload = directories_to_upload.len(); for (i, d) in directories_to_upload.iter().enumerate() { let resp = dcv.add((*d).clone()); if i == len_directories_to_upload - 1 && exp_fail_upload_last { assert!(resp.is_err(), "expect last put to fail"); // We don't really care anymore what finalize() would return, as // the add() failed. return; } else { assert!(resp.is_ok(), "expect put to succeed"); } } // everything was uploaded successfully. Test finalize(). let resp = dcv .validate() .map(|validated| validated.drain_leaves_to_root().collect::>()); match exp_finalize { Some(directories) => { assert_eq!( Vec::from_iter(directories.iter().map(|e| (*e).to_owned())), resp.expect("drain should succeed") ); } None => { resp.expect_err("drain should fail"); } } } }