diff options
Diffstat (limited to 'tvix/castore/src/directoryservice/utils.rs')
-rw-r--r-- | tvix/castore/src/directoryservice/utils.rs | 77 |
1 files changed, 77 insertions, 0 deletions
diff --git a/tvix/castore/src/directoryservice/utils.rs b/tvix/castore/src/directoryservice/utils.rs new file mode 100644 index 0000000000..a0ba395ecd --- /dev/null +++ b/tvix/castore/src/directoryservice/utils.rs @@ -0,0 +1,77 @@ +use super::DirectoryService; +use crate::proto; +use crate::B3Digest; +use crate::Error; +use async_stream::try_stream; +use futures::stream::BoxStream; +use std::collections::{HashSet, VecDeque}; +use tracing::instrument; +use tracing::warn; + +/// Traverses a [proto::Directory] from the root to the children. +/// +/// This is mostly BFS, but directories are only returned once. +#[instrument(skip(directory_service))] +pub fn traverse_directory<'a, DS: DirectoryService + 'static>( + directory_service: DS, + root_directory_digest: &B3Digest, +) -> BoxStream<'a, Result<proto::Directory, Error>> { + // The list of all directories that still need to be traversed. The next + // element is picked from the front, new elements are enqueued at the + // back. + let mut worklist_directory_digests: VecDeque<B3Digest> = + VecDeque::from([root_directory_digest.clone()]); + // The list of directory digests already sent to the consumer. + // We omit sending the same directories multiple times. + let mut sent_directory_digests: HashSet<B3Digest> = HashSet::new(); + + Box::pin(try_stream! { + while let Some(current_directory_digest) = worklist_directory_digests.pop_front() { + let current_directory = directory_service.get(¤t_directory_digest).await.map_err(|e| { + warn!("failed to look up directory"); + Error::StorageError(format!( + "unable to look up directory {}: {}", + current_directory_digest, e + )) + })?.ok_or_else(|| { + // if it's not there, we have an inconsistent store! + warn!("directory {} does not exist", current_directory_digest); + Error::StorageError(format!( + "directory {} does not exist", + current_directory_digest + )) + + })?; + + // validate, we don't want to send invalid directories. + current_directory.validate().map_err(|e| { + warn!("directory failed validation: {}", e.to_string()); + Error::StorageError(format!( + "invalid directory: {}", + current_directory_digest + )) + })?; + + // We're about to send this directory, so let's avoid sending it again if a + // descendant has it. + sent_directory_digests.insert(current_directory_digest); + + // enqueue all child directory digests to the work queue, as + // long as they're not part of the worklist or already sent. + // This panics if the digest looks invalid, it's supposed to be checked first. + for child_directory_node in ¤t_directory.directories { + // TODO: propagate error + let child_digest: B3Digest = child_directory_node.digest.clone().try_into().unwrap(); + + if worklist_directory_digests.contains(&child_digest) + || sent_directory_digests.contains(&child_digest) + { + continue; + } + worklist_directory_digests.push_back(child_digest); + } + + yield current_directory; + } + }) +} |