diff options
author | Florian Klink <flokli@flokli.de> | 2023-09-21T19·32+0300 |
---|---|---|
committer | clbot <clbot@tvl.fyi> | 2023-09-22T12·51+0000 |
commit | 32f41458c0a0f62bf906021ef096c465ccc45581 (patch) | |
tree | 3aaab8c453871f39c46fb43f8278aa933b24519d /tvix/store/src/directoryservice/utils.rs | |
parent | d8ef0cfb4a859af7e33828b013356412d02532da (diff) |
refactor(tvix): move castore into tvix-castore crate r/6629
This splits the pure content-addressed layers from tvix-store into a `castore` crate, and only leaves PathInfo related things, as well as the CLI entrypoint in the tvix-store crate. Notable changes: - `fixtures` and `utils` had to be moved out of the `test` cfg, so they can be imported from tvix-store. - Some ad-hoc fixtures in the test were moved to proper fixtures in the same step. - The protos are now created by a (more static) recipe in the protos/ directory. The (now two) golang targets are commented out, as it's not possible to update them properly in the same CL. This will be done by a followup CL once this is merged (and whitby deployed) Bug: https://b.tvl.fyi/issues/301 Change-Id: I8d675d4bf1fb697eb7d479747c1b1e3635718107 Reviewed-on: https://cl.tvl.fyi/c/depot/+/9370 Reviewed-by: tazjin <tazjin@tvl.su> Reviewed-by: flokli <flokli@flokli.de> Autosubmit: flokli <flokli@flokli.de> Tested-by: BuildkiteCI Reviewed-by: Connor Brewster <cbrewster@hey.com>
Diffstat (limited to 'tvix/store/src/directoryservice/utils.rs')
-rw-r--r-- | tvix/store/src/directoryservice/utils.rs | 140 |
1 files changed, 0 insertions, 140 deletions
diff --git a/tvix/store/src/directoryservice/utils.rs b/tvix/store/src/directoryservice/utils.rs deleted file mode 100644 index 4c5e7cfde37c..000000000000 --- a/tvix/store/src/directoryservice/utils.rs +++ /dev/null @@ -1,140 +0,0 @@ -use super::DirectoryPutter; -use super::DirectoryService; -use crate::proto; -use crate::B3Digest; -use crate::Error; -use async_stream::stream; -use futures::Stream; -use std::collections::{HashSet, VecDeque}; -use std::pin::Pin; -use tonic::async_trait; -use tracing::warn; - -/// Traverses a [proto::Directory] from the root to the children. -/// -/// This is mostly BFS, but directories are only returned once. -pub fn traverse_directory<DS: DirectoryService + 'static>( - directory_service: DS, - root_directory_digest: &B3Digest, -) -> Pin<Box<dyn Stream<Item = Result<proto::Directory, Error>> + Send>> { - // The list of all directories that still need to be traversed. The next - // element is picked from the front, new elements are enqueued at the - // back. - let mut worklist_directory_digests: VecDeque<B3Digest> = - VecDeque::from([root_directory_digest.clone()]); - // The list of directory digests already sent to the consumer. - // We omit sending the same directories multiple times. - let mut sent_directory_digests: HashSet<B3Digest> = HashSet::new(); - - let stream = stream! { - while let Some(current_directory_digest) = worklist_directory_digests.pop_front() { - match directory_service.get(¤t_directory_digest).await { - // if it's not there, we have an inconsistent store! - Ok(None) => { - warn!("directory {} does not exist", current_directory_digest); - yield Err(Error::StorageError(format!( - "directory {} does not exist", - current_directory_digest - ))); - } - Err(e) => { - warn!("failed to look up directory"); - yield Err(Error::StorageError(format!( - "unable to look up directory {}: {}", - current_directory_digest, e - ))); - } - - // if we got it - Ok(Some(current_directory)) => { - // validate, we don't want to send invalid directories. - if let Err(e) = current_directory.validate() { - warn!("directory failed validation: {}", e.to_string()); - yield Err(Error::StorageError(format!( - "invalid directory: {}", - current_directory_digest - ))); - } - - // We're about to send this directory, so let's avoid sending it again if a - // descendant has it. - sent_directory_digests.insert(current_directory_digest); - - // enqueue all child directory digests to the work queue, as - // long as they're not part of the worklist or already sent. - // This panics if the digest looks invalid, it's supposed to be checked first. - for child_directory_node in ¤t_directory.directories { - // TODO: propagate error - let child_digest: B3Digest = child_directory_node.digest.clone().try_into().unwrap(); - - if worklist_directory_digests.contains(&child_digest) - || sent_directory_digests.contains(&child_digest) - { - continue; - } - worklist_directory_digests.push_back(child_digest); - } - - yield Ok(current_directory); - } - }; - } - }; - - Box::pin(stream) -} - -/// This is a simple implementation of a Directory uploader. -/// TODO: verify connectivity? Factor out these checks into generic helpers? -pub struct SimplePutter<DS: DirectoryService> { - directory_service: DS, - last_directory_digest: Option<B3Digest>, - closed: bool, -} - -impl<DS: DirectoryService> SimplePutter<DS> { - pub fn new(directory_service: DS) -> Self { - Self { - directory_service, - closed: false, - last_directory_digest: None, - } - } -} - -#[async_trait] -impl<DS: DirectoryService> DirectoryPutter for SimplePutter<DS> { - async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> { - if self.closed { - return Err(Error::StorageError("already closed".to_string())); - } - - let digest = self.directory_service.put(directory).await?; - - // track the last directory digest - self.last_directory_digest = Some(digest); - - Ok(()) - } - - /// We need to be mutable here, as that's the signature of the trait. - async fn close(&mut self) -> Result<B3Digest, Error> { - if self.closed { - return Err(Error::StorageError("already closed".to_string())); - } - - match &self.last_directory_digest { - Some(last_digest) => { - self.closed = true; - Ok(last_digest.clone()) - } - None => Err(Error::InvalidRequest( - "no directories sent, can't show root digest".to_string(), - )), - } - } - - fn is_closed(&self) -> bool { - self.closed - } -} |