diff options
author | Florian Klink <flokli@flokli.de> | 2023-03-27T12·47+0200 |
---|---|---|
committer | flokli <flokli@flokli.de> | 2023-03-31T10·54+0000 |
commit | 5bf913432424ad29e976f4acad535aa1340d84fe (patch) | |
tree | 14dbc2cc4bc5fa4b896cc745ac5c02555d60c774 /tvix/store/src/directoryservice/mod.rs | |
parent | e9686f84d9be4aa5adace6ff531ff00558239f75 (diff) |
refactor(tvix/store/directorysvc): move DirectoryTraverser to utils r/6065
Change-Id: Ie60a660e0fda7c80a6c7de20404c1965fe0e0d63 Reviewed-on: https://cl.tvl.fyi/c/depot/+/8355 Reviewed-by: tazjin <tazjin@tvl.su> Tested-by: BuildkiteCI Autosubmit: flokli <flokli@flokli.de>
Diffstat (limited to 'tvix/store/src/directoryservice/mod.rs')
-rw-r--r-- | tvix/store/src/directoryservice/mod.rs | 107 |
1 files changed, 2 insertions, 105 deletions
diff --git a/tvix/store/src/directoryservice/mod.rs b/tvix/store/src/directoryservice/mod.rs index d7b143df3efe..53f6f08b14f1 100644 --- a/tvix/store/src/directoryservice/mod.rs +++ b/tvix/store/src/directoryservice/mod.rs @@ -1,15 +1,13 @@ -use std::collections::{HashSet, VecDeque}; - -use tracing::{debug_span, instrument, warn}; - use crate::{proto, Error}; mod grpc; mod memory; mod sled; +mod utils; pub use self::grpc::GRPCDirectoryService; pub use self::memory::MemoryDirectoryService; pub use self::sled::SledDirectoryService; +pub use self::utils::DirectoryTraverser; /// The base trait all Directory services need to implement. /// This is a simple get and put of [crate::proto::Directory], returning their @@ -30,104 +28,3 @@ pub trait DirectoryService { /// we can't have that yet. fn get_recursive(&self, root_directory_digest: &[u8; 32]) -> Self::DirectoriesIterator; } - -/// Traverses a [proto::Directory] from the root to the children. -/// -/// This is mostly BFS, but directories are only returned once. -pub struct DirectoryTraverser<DS: DirectoryService> { - directory_service: DS, - /// The list of all directories that still need to be traversed. The next - /// element is picked from the front, new elements are enqueued at the - /// back. - worklist_directory_digests: VecDeque<[u8; 32]>, - /// The list of directory digests already sent to the consumer. - /// We omit sending the same directories multiple times. - sent_directory_digests: HashSet<[u8; 32]>, -} - -impl<DS: DirectoryService> DirectoryTraverser<DS> { - pub fn with(directory_service: DS, root_directory_digest: &[u8; 32]) -> Self { - Self { - directory_service, - worklist_directory_digests: VecDeque::from([*root_directory_digest]), - sent_directory_digests: HashSet::new(), - } - } - - // enqueue all child directory digests to the work queue, as - // long as they're not part of the worklist or already sent. - // This panics if the digest looks invalid, it's supposed to be checked first. - fn enqueue_child_directories(&mut self, directory: &proto::Directory) { - for child_directory_node in &directory.directories { - let child_digest: [u8; 32] = child_directory_node - .digest - .as_slice() - .try_into() - .map_err(|_e| Error::StorageError("invalid digest length".to_string())) - .unwrap(); - - if self.worklist_directory_digests.contains(&child_digest) - || self.sent_directory_digests.contains(&child_digest) - { - continue; - } - self.worklist_directory_digests.push_back(child_digest); - } - } -} - -impl<DS: DirectoryService> Iterator for DirectoryTraverser<DS> { - type Item = Result<proto::Directory, Error>; - - #[instrument(skip_all)] - fn next(&mut self) -> Option<Self::Item> { - // fetch the next directory digest from the top of the work queue. - match self.worklist_directory_digests.pop_front() { - None => None, - Some(current_directory_digest) => { - let current_directory_b64 = data_encoding::BASE64.encode(¤t_directory_digest); - let span = debug_span!("directory.digest", current_directory_b64); - let _ = span.enter(); - - // look up the directory itself. - let current_directory = match self.directory_service.get(¤t_directory_digest) - { - // if we got it - Ok(Some(current_directory)) => { - // validate, we don't want to send invalid directories. - if let Err(e) = current_directory.validate() { - warn!("directory failed validation: {}", e.to_string()); - return Some(Err(Error::StorageError(format!( - "invalid directory: {}", - current_directory_b64 - )))); - } - current_directory - } - // if it's not there, we have an inconsistent store! - Ok(None) => { - warn!("directory {} does not exist", current_directory_b64); - return Some(Err(Error::StorageError(format!( - "directory {} does not exist", - current_directory_b64 - )))); - } - Err(e) => { - warn!("failed to look up directory"); - return Some(Err(Error::StorageError(format!( - "unable to look up directory {}: {}", - current_directory_b64, e - )))); - } - }; - - // All DirectoryServices MUST validate directory nodes, before returning them out, so we - // can be sure [enqueue_child_directories] doesn't panic. - - // enqueue child directories - self.enqueue_child_directories(¤t_directory); - Some(Ok(current_directory)) - } - } - } -} |