diff options
Diffstat (limited to 'tvix/store/src/directoryservice/utils.rs')
-rw-r--r-- | tvix/store/src/directoryservice/utils.rs | 161 |
1 files changed, 72 insertions, 89 deletions
diff --git a/tvix/store/src/directoryservice/utils.rs b/tvix/store/src/directoryservice/utils.rs index 95f02f1f9ce8..4c5e7cfde37c 100644 --- a/tvix/store/src/directoryservice/utils.rs +++ b/tvix/store/src/directoryservice/utils.rs @@ -3,103 +3,85 @@ use super::DirectoryService; use crate::proto; use crate::B3Digest; use crate::Error; +use async_stream::stream; +use futures::Stream; use std::collections::{HashSet, VecDeque}; -use tracing::{debug_span, instrument, warn}; +use std::pin::Pin; +use tonic::async_trait; +use tracing::warn; /// Traverses a [proto::Directory] from the root to the children. /// /// This is mostly BFS, but directories are only returned once. -pub struct DirectoryTraverser<DS: DirectoryService> { +pub fn traverse_directory<DS: DirectoryService + 'static>( directory_service: DS, - /// The list of all directories that still need to be traversed. The next - /// element is picked from the front, new elements are enqueued at the - /// back. - worklist_directory_digests: VecDeque<B3Digest>, - /// The list of directory digests already sent to the consumer. - /// We omit sending the same directories multiple times. - sent_directory_digests: HashSet<B3Digest>, -} - -impl<DS: DirectoryService> DirectoryTraverser<DS> { - pub fn with(directory_service: DS, root_directory_digest: &B3Digest) -> Self { - Self { - directory_service, - worklist_directory_digests: VecDeque::from([root_directory_digest.clone()]), - sent_directory_digests: HashSet::new(), - } - } - - // enqueue all child directory digests to the work queue, as - // long as they're not part of the worklist or already sent. - // This panics if the digest looks invalid, it's supposed to be checked first. - fn enqueue_child_directories(&mut self, directory: &proto::Directory) { - for child_directory_node in &directory.directories { - // TODO: propagate error - let child_digest: B3Digest = child_directory_node.digest.clone().try_into().unwrap(); - - if self.worklist_directory_digests.contains(&child_digest) - || self.sent_directory_digests.contains(&child_digest) - { - continue; - } - self.worklist_directory_digests.push_back(child_digest); - } - } -} - -impl<DS: DirectoryService> Iterator for DirectoryTraverser<DS> { - type Item = Result<proto::Directory, Error>; - - #[instrument(skip_all)] - fn next(&mut self) -> Option<Self::Item> { - // fetch the next directory digest from the top of the work queue. - match self.worklist_directory_digests.pop_front() { - None => None, - Some(current_directory_digest) => { - let span = debug_span!("directory.digest", "{}", current_directory_digest); - let _ = span.enter(); - - // look up the directory itself. - let current_directory = match self.directory_service.get(¤t_directory_digest) - { - // if we got it - Ok(Some(current_directory)) => { - // validate, we don't want to send invalid directories. - if let Err(e) = current_directory.validate() { - warn!("directory failed validation: {}", e.to_string()); - return Some(Err(Error::StorageError(format!( - "invalid directory: {}", - current_directory_digest - )))); - } - current_directory - } - // if it's not there, we have an inconsistent store! - Ok(None) => { - warn!("directory {} does not exist", current_directory_digest); - return Some(Err(Error::StorageError(format!( - "directory {} does not exist", + root_directory_digest: &B3Digest, +) -> Pin<Box<dyn Stream<Item = Result<proto::Directory, Error>> + Send>> { + // The list of all directories that still need to be traversed. The next + // element is picked from the front, new elements are enqueued at the + // back. + let mut worklist_directory_digests: VecDeque<B3Digest> = + VecDeque::from([root_directory_digest.clone()]); + // The list of directory digests already sent to the consumer. + // We omit sending the same directories multiple times. + let mut sent_directory_digests: HashSet<B3Digest> = HashSet::new(); + + let stream = stream! { + while let Some(current_directory_digest) = worklist_directory_digests.pop_front() { + match directory_service.get(¤t_directory_digest).await { + // if it's not there, we have an inconsistent store! + Ok(None) => { + warn!("directory {} does not exist", current_directory_digest); + yield Err(Error::StorageError(format!( + "directory {} does not exist", + current_directory_digest + ))); + } + Err(e) => { + warn!("failed to look up directory"); + yield Err(Error::StorageError(format!( + "unable to look up directory {}: {}", + current_directory_digest, e + ))); + } + + // if we got it + Ok(Some(current_directory)) => { + // validate, we don't want to send invalid directories. + if let Err(e) = current_directory.validate() { + warn!("directory failed validation: {}", e.to_string()); + yield Err(Error::StorageError(format!( + "invalid directory: {}", current_directory_digest - )))); - } - Err(e) => { - warn!("failed to look up directory"); - return Some(Err(Error::StorageError(format!( - "unable to look up directory {}: {}", - current_directory_digest, e - )))); + ))); } - }; - // All DirectoryServices MUST validate directory nodes, before returning them out, so we - // can be sure [enqueue_child_directories] doesn't panic. + // We're about to send this directory, so let's avoid sending it again if a + // descendant has it. + sent_directory_digests.insert(current_directory_digest); + + // enqueue all child directory digests to the work queue, as + // long as they're not part of the worklist or already sent. + // This panics if the digest looks invalid, it's supposed to be checked first. + for child_directory_node in ¤t_directory.directories { + // TODO: propagate error + let child_digest: B3Digest = child_directory_node.digest.clone().try_into().unwrap(); + + if worklist_directory_digests.contains(&child_digest) + || sent_directory_digests.contains(&child_digest) + { + continue; + } + worklist_directory_digests.push_back(child_digest); + } - // enqueue child directories - self.enqueue_child_directories(¤t_directory); - Some(Ok(current_directory)) - } + yield Ok(current_directory); + } + }; } - } + }; + + Box::pin(stream) } /// This is a simple implementation of a Directory uploader. @@ -120,13 +102,14 @@ impl<DS: DirectoryService> SimplePutter<DS> { } } +#[async_trait] impl<DS: DirectoryService> DirectoryPutter for SimplePutter<DS> { - fn put(&mut self, directory: proto::Directory) -> Result<(), Error> { + async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> { if self.closed { return Err(Error::StorageError("already closed".to_string())); } - let digest = self.directory_service.put(directory)?; + let digest = self.directory_service.put(directory).await?; // track the last directory digest self.last_directory_digest = Some(digest); @@ -135,7 +118,7 @@ impl<DS: DirectoryService> DirectoryPutter for SimplePutter<DS> { } /// We need to be mutable here, as that's the signature of the trait. - fn close(&mut self) -> Result<B3Digest, Error> { + async fn close(&mut self) -> Result<B3Digest, Error> { if self.closed { return Err(Error::StorageError("already closed".to_string())); } |