diff options
author | Florian Klink <flokli@flokli.de> | 2023-03-26T11·51+0200 |
---|---|---|
committer | flokli <flokli@flokli.de> | 2023-03-27T09·03+0000 |
commit | 2fe53cce40af94d9c8e6971cbf32073ecc77d4a1 (patch) | |
tree | a57a2daee29058143aec64efe335bfbc9cc55555 /tvix/store/src/directoryservice/mod.rs | |
parent | 2d305fd5b37fa7bf5a0512e8992b4557a1745296 (diff) |
feat(tvix/store/directorysvc): add DirectoryService::get_recursive() r/6046
This moves the recursive BFS traversal of Directory closures from the GRPCDirectoryServiceWrapper out into a a DirectoryTraverser struct implementing Iterator. It is then used from various implementors of DirectoryService in the `get_recursive()` method. This allows distinguishing between recursive requests and non-recursive requests in the gRPC client trait implementation. Change-Id: I50bfd4a0d9eb11832847329b78c587ec7c9dc7b1 Reviewed-on: https://cl.tvl.fyi/c/depot/+/8351 Autosubmit: flokli <flokli@flokli.de> Reviewed-by: tazjin <tazjin@tvl.su> Tested-by: BuildkiteCI
Diffstat (limited to 'tvix/store/src/directoryservice/mod.rs')
-rw-r--r-- | tvix/store/src/directoryservice/mod.rs | 113 |
1 files changed, 113 insertions, 0 deletions
diff --git a/tvix/store/src/directoryservice/mod.rs b/tvix/store/src/directoryservice/mod.rs index 1fc8fbc4c219..d7b143df3efe 100644 --- a/tvix/store/src/directoryservice/mod.rs +++ b/tvix/store/src/directoryservice/mod.rs @@ -1,3 +1,7 @@ +use std::collections::{HashSet, VecDeque}; + +use tracing::{debug_span, instrument, warn}; + use crate::{proto, Error}; mod grpc; mod memory; @@ -11,10 +15,119 @@ pub use self::sled::SledDirectoryService; /// This is a simple get and put of [crate::proto::Directory], returning their /// digest. pub trait DirectoryService { + type DirectoriesIterator: Iterator<Item = Result<proto::Directory, Error>> + Send; + /// Get looks up a single Directory message by its digest. /// In case the directory is not found, Ok(None) is returned. fn get(&self, digest: &[u8; 32]) -> Result<Option<proto::Directory>, Error>; /// Get uploads a single Directory message, and returns the calculated /// digest, or an error. fn put(&self, directory: proto::Directory) -> Result<[u8; 32], Error>; + + /// Looks up a closure of [proto::Directory]. + /// Ideally this would be a `impl Iterator<Item = Result<proto::Directory, Error>>`, + /// and we'd be able to add a default implementation for it here, but + /// we can't have that yet. + fn get_recursive(&self, root_directory_digest: &[u8; 32]) -> Self::DirectoriesIterator; +} + +/// Traverses a [proto::Directory] from the root to the children. +/// +/// This is mostly BFS, but directories are only returned once. +pub struct DirectoryTraverser<DS: DirectoryService> { + directory_service: DS, + /// The list of all directories that still need to be traversed. The next + /// element is picked from the front, new elements are enqueued at the + /// back. + worklist_directory_digests: VecDeque<[u8; 32]>, + /// The list of directory digests already sent to the consumer. + /// We omit sending the same directories multiple times. + sent_directory_digests: HashSet<[u8; 32]>, +} + +impl<DS: DirectoryService> DirectoryTraverser<DS> { + pub fn with(directory_service: DS, root_directory_digest: &[u8; 32]) -> Self { + Self { + directory_service, + worklist_directory_digests: VecDeque::from([*root_directory_digest]), + sent_directory_digests: HashSet::new(), + } + } + + // enqueue all child directory digests to the work queue, as + // long as they're not part of the worklist or already sent. + // This panics if the digest looks invalid, it's supposed to be checked first. + fn enqueue_child_directories(&mut self, directory: &proto::Directory) { + for child_directory_node in &directory.directories { + let child_digest: [u8; 32] = child_directory_node + .digest + .as_slice() + .try_into() + .map_err(|_e| Error::StorageError("invalid digest length".to_string())) + .unwrap(); + + if self.worklist_directory_digests.contains(&child_digest) + || self.sent_directory_digests.contains(&child_digest) + { + continue; + } + self.worklist_directory_digests.push_back(child_digest); + } + } +} + +impl<DS: DirectoryService> Iterator for DirectoryTraverser<DS> { + type Item = Result<proto::Directory, Error>; + + #[instrument(skip_all)] + fn next(&mut self) -> Option<Self::Item> { + // fetch the next directory digest from the top of the work queue. + match self.worklist_directory_digests.pop_front() { + None => None, + Some(current_directory_digest) => { + let current_directory_b64 = data_encoding::BASE64.encode(¤t_directory_digest); + let span = debug_span!("directory.digest", current_directory_b64); + let _ = span.enter(); + + // look up the directory itself. + let current_directory = match self.directory_service.get(¤t_directory_digest) + { + // if we got it + Ok(Some(current_directory)) => { + // validate, we don't want to send invalid directories. + if let Err(e) = current_directory.validate() { + warn!("directory failed validation: {}", e.to_string()); + return Some(Err(Error::StorageError(format!( + "invalid directory: {}", + current_directory_b64 + )))); + } + current_directory + } + // if it's not there, we have an inconsistent store! + Ok(None) => { + warn!("directory {} does not exist", current_directory_b64); + return Some(Err(Error::StorageError(format!( + "directory {} does not exist", + current_directory_b64 + )))); + } + Err(e) => { + warn!("failed to look up directory"); + return Some(Err(Error::StorageError(format!( + "unable to look up directory {}: {}", + current_directory_b64, e + )))); + } + }; + + // All DirectoryServices MUST validate directory nodes, before returning them out, so we + // can be sure [enqueue_child_directories] doesn't panic. + + // enqueue child directories + self.enqueue_child_directories(¤t_directory); + Some(Ok(current_directory)) + } + } + } } |