use std::collections::{HashSet, VecDeque}; use tracing::{debug_span, instrument, warn}; use crate::{proto, Error}; mod grpc; mod memory; mod sled; pub use self::grpc::GRPCDirectoryService; pub use self::memory::MemoryDirectoryService; pub use self::sled::SledDirectoryService; /// The base trait all Directory services need to implement. /// This is a simple get and put of [crate::proto::Directory], returning their /// digest. pub trait DirectoryService { type DirectoriesIterator: Iterator<Item = Result<proto::Directory, Error>> + Send; /// Get looks up a single Directory message by its digest. /// In case the directory is not found, Ok(None) is returned. fn get(&self, digest: &[u8; 32]) -> Result<Option<proto::Directory>, Error>; /// Get uploads a single Directory message, and returns the calculated /// digest, or an error. fn put(&self, directory: proto::Directory) -> Result<[u8; 32], Error>; /// Looks up a closure of [proto::Directory]. /// Ideally this would be a `impl Iterator<Item = Result<proto::Directory, Error>>`, /// and we'd be able to add a default implementation for it here, but /// we can't have that yet. fn get_recursive(&self, root_directory_digest: &[u8; 32]) -> Self::DirectoriesIterator; } /// Traverses a [proto::Directory] from the root to the children. /// /// This is mostly BFS, but directories are only returned once. pub struct DirectoryTraverser<DS: DirectoryService> { directory_service: DS, /// The list of all directories that still need to be traversed. The next /// element is picked from the front, new elements are enqueued at the /// back. worklist_directory_digests: VecDeque<[u8; 32]>, /// The list of directory digests already sent to the consumer. /// We omit sending the same directories multiple times. sent_directory_digests: HashSet<[u8; 32]>, } impl<DS: DirectoryService> DirectoryTraverser<DS> { pub fn with(directory_service: DS, root_directory_digest: &[u8; 32]) -> Self { Self { directory_service, worklist_directory_digests: VecDeque::from([*root_directory_digest]), sent_directory_digests: HashSet::new(), } } // enqueue all child directory digests to the work queue, as // long as they're not part of the worklist or already sent. // This panics if the digest looks invalid, it's supposed to be checked first. fn enqueue_child_directories(&mut self, directory: &proto::Directory) { for child_directory_node in &directory.directories { let child_digest: [u8; 32] = child_directory_node .digest .as_slice() .try_into() .map_err(|_e| Error::StorageError("invalid digest length".to_string())) .unwrap(); if self.worklist_directory_digests.contains(&child_digest) || self.sent_directory_digests.contains(&child_digest) { continue; } self.worklist_directory_digests.push_back(child_digest); } } } impl<DS: DirectoryService> Iterator for DirectoryTraverser<DS> { type Item = Result<proto::Directory, Error>; #[instrument(skip_all)] fn next(&mut self) -> Option<Self::Item> { // fetch the next directory digest from the top of the work queue. match self.worklist_directory_digests.pop_front() { None => None, Some(current_directory_digest) => { let current_directory_b64 = data_encoding::BASE64.encode(¤t_directory_digest); let span = debug_span!("directory.digest", current_directory_b64); let _ = span.enter(); // look up the directory itself. let current_directory = match self.directory_service.get(¤t_directory_digest) { // if we got it Ok(Some(current_directory)) => { // validate, we don't want to send invalid directories. if let Err(e) = current_directory.validate() { warn!("directory failed validation: {}", e.to_string()); return Some(Err(Error::StorageError(format!( "invalid directory: {}", current_directory_b64 )))); } current_directory } // if it's not there, we have an inconsistent store! Ok(None) => { warn!("directory {} does not exist", current_directory_b64); return Some(Err(Error::StorageError(format!( "directory {} does not exist", current_directory_b64 )))); } Err(e) => { warn!("failed to look up directory"); return Some(Err(Error::StorageError(format!( "unable to look up directory {}: {}", current_directory_b64, e )))); } }; // All DirectoryServices MUST validate directory nodes, before returning them out, so we // can be sure [enqueue_child_directories] doesn't panic. // enqueue child directories self.enqueue_child_directories(¤t_directory); Some(Ok(current_directory)) } } } }