use std::collections::{HashMap, HashSet}; use bstr::ByteSlice; use tracing::{instrument, trace}; use crate::{ proto::{self, Directory}, B3Digest, Error, }; /// This can be used to validate a Directory closure (DAG of connected /// Directories), and their insertion order. /// /// Directories need to be inserted (via `add`), in an order from the leaves to /// the root. During insertion, We validate as much as we can at that time: /// /// - individual validation of Directory messages /// - validation of insertion order (no upload of not-yet-known Directories) /// - validation of size fields of referred Directories /// /// Internally it keeps all received Directories (and their sizes) in a HashMap, /// keyed by digest. /// /// Once all Directories have been inserted, a finalize function can be /// called to get a (deduplicated and) validated list of directories, in /// insertion order. /// During finalize, a check for graph connectivity is performed too, to ensure /// there's no disconnected components, and only one root. #[derive(Default)] pub struct ClosureValidator { directories_and_sizes: HashMap, /// Keeps track of the last-inserted directory digest. Used to start the /// connectivity check. last_directory_digest: Option, } impl ClosureValidator { /// Insert a new Directory into the closure. /// Perform individual Directory validation, validation of insertion order /// and size fields. #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)] pub fn add(&mut self, directory: proto::Directory) -> Result<(), Error> { let digest = directory.digest(); if self.directories_and_sizes.contains_key(&digest) { trace!("already seen, skipping"); return Ok(()); } // Do some general validation directory .validate() .map_err(|e| Error::InvalidRequest(e.to_string()))?; // Ensure the directory only refers to directories which we already accepted. for dir in &directory.directories { let dir_dgst = B3Digest::try_from(dir.digest.to_owned()).unwrap(); // validated // Ensure the digest has already been seen let (_, recorded_dir_size) = self.directories_and_sizes.get(&dir_dgst).ok_or_else(|| { Error::InvalidRequest(format!( "'{}' refers to unseen child dir: {}", dir.name.as_bstr(), dir_dgst )) })?; // Ensure the size specified in the child node matches our records. if dir.size != *recorded_dir_size { return Err(Error::InvalidRequest(format!( "'{}' has wrong size, specified {}, recorded {}", dir.name.as_bstr(), dir.size, recorded_dir_size ))); } } trace!("inserting"); let directory_size = directory.size(); self.directories_and_sizes .insert(digest.clone(), (directory, directory_size)); self.last_directory_digest = Some(digest); Ok(()) } /// Ensure that all inserted Directories are connected, then return a /// (deduplicated) and validated list of directories, in from-leaves-to-root /// order. /// In case no elements have been inserted, returns an empty list. #[instrument(level = "trace", skip_all, err)] pub(crate) fn finalize(mut self) -> Result, Error> { if self.last_directory_digest.is_none() { return Ok(vec![]); } let root_digest = self.last_directory_digest.unwrap(); // recursively walk all directories reachable from there, // ensure we visited all nodes that were uploaded. // If not, we might have received multiple disconnected graphs. // The list of directories we know we need to visit. // Once we're finished working, and (there's no errors), this in reversed order will // be a valid insertion order, and directories_and_sizes will be empty. let mut dirs_to_visit = Vec::with_capacity(self.directories_and_sizes.len()); dirs_to_visit.push( self.directories_and_sizes .remove(&root_digest) .expect("root digest not found") .0, ); // The set of digests seen while going visiting all directories. let mut seen_dir_digests = HashSet::new(); // This loop moves gradually to the end of `dirs_to_visit`, while it's being // extended. // The loop stops once it reaches the end. let mut i = 0; while let Some(directory) = dirs_to_visit.get(i).map(|d| d.to_owned()) { // lookup all child directories and put them in the back of dirs_to_visit, // if they're not already there. for child_dir in &directory.directories { let child_digest = B3Digest::try_from(child_dir.digest.to_owned()).unwrap(); // validated // In case the digest is neither already visited nor already in dirs_to_visit, // add it to the end of it. // We don't need to check for the hash we're currently // visiting, as we can't self-reference. if !seen_dir_digests.contains(&child_digest) { dirs_to_visit.push( self.directories_and_sizes .remove(&child_digest) .expect("child digest not found") .0, ); seen_dir_digests.insert(child_digest); } } i += 1; } // check directories_and_sizes is empty. if !self.directories_and_sizes.is_empty() { if cfg!(debug_assertions) { return Err(Error::InvalidRequest(format!( "found {} disconnected nodes: {:?}", self.directories_and_sizes.len(), self.directories_and_sizes ))); } else { return Err(Error::InvalidRequest(format!( "found {} disconnected nodes", self.directories_and_sizes.len() ))); } } // Reverse to have correct insertion order. dirs_to_visit.reverse(); Ok(dirs_to_visit) } } #[cfg(test)] mod tests { use crate::{ fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C}, proto::{self, Directory}, }; use lazy_static::lazy_static; use rstest::rstest; lazy_static! { pub static ref BROKEN_DIRECTORY : Directory = Directory { symlinks: vec![proto::SymlinkNode { name: "".into(), // invalid name! target: "doesntmatter".into(), }], ..Default::default() }; pub static ref BROKEN_PARENT_DIRECTORY: Directory = Directory { directories: vec![proto::DirectoryNode { name: "foo".into(), digest: DIRECTORY_A.digest().into(), size: DIRECTORY_A.size() + 42, // wrong! }], ..Default::default() }; } use super::ClosureValidator; #[rstest] /// Uploading an empty directory should succeed. #[case::empty_directory(&[&*DIRECTORY_A], false, Some(vec![&*DIRECTORY_A]))] /// Uploading A, then B (referring to A) should succeed. #[case::simple_closure(&[&*DIRECTORY_A, &*DIRECTORY_B], false, Some(vec![&*DIRECTORY_A, &*DIRECTORY_B]))] /// Uploading A, then A, then C (referring to A twice) should succeed. /// We pretend to be a dumb client not deduping directories. #[case::same_child(&[&*DIRECTORY_A, &*DIRECTORY_A, &*DIRECTORY_C], false, Some(vec![&*DIRECTORY_A, &*DIRECTORY_C]))] /// Uploading A, then C (referring to A twice) should succeed. #[case::same_child_dedup(&[&*DIRECTORY_A, &*DIRECTORY_C], false, Some(vec![&*DIRECTORY_A, &*DIRECTORY_C]))] /// Uploading A, then C (referring to A twice), then B (itself referring to A) should fail during close, /// as B itself would be left unconnected. #[case::unconnected_node(&[&*DIRECTORY_A, &*DIRECTORY_C, &*DIRECTORY_B], false, None)] /// Uploading B (referring to A) should fail immediately, because A was never uploaded. #[case::dangling_pointer(&[&*DIRECTORY_B], true, None)] /// Uploading a directory failing validation should fail immediately. #[case::failing_validation(&[&*BROKEN_DIRECTORY], true, None)] /// Uploading a directory which refers to another Directory with a wrong size should fail. #[case::wrong_size_in_parent(&[&*DIRECTORY_A, &*BROKEN_PARENT_DIRECTORY], true, None)] fn test_uploads( #[case] directories_to_upload: &[&Directory], #[case] exp_fail_upload_last: bool, #[case] exp_finalize: Option>, // Some(_) if finalize successful, None if not. ) { let mut dcv = ClosureValidator::default(); let len_directories_to_upload = directories_to_upload.len(); for (i, d) in directories_to_upload.iter().enumerate() { let resp = dcv.add((*d).clone()); if i == len_directories_to_upload - 1 && exp_fail_upload_last { assert!(resp.is_err(), "expect last put to fail"); // We don't really care anymore what finalize() would return, as // the add() failed. return; } else { assert!(resp.is_ok(), "expect put to succeed"); } } // everything was uploaded successfully. Test drain(). let resp = dcv.finalize(); match exp_finalize { Some(exp_drain) => { assert_eq!( Vec::from_iter(exp_drain.iter().map(|e| (*e).to_owned())), resp.expect("drain should succeed") ); } None => { resp.expect_err("drain should fail"); } } } }