From 5f069a3eb8c3a089f1231bf4a618e4153736df96 Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Wed, 20 Mar 2024 23:28:13 +0200 Subject: refactor(tvix/castore/directory): have SimplePutter use Validator This simplifies a bunch of code, and gets rid of some TODOs. Also, move it out of castore/utils, and into its own file. Change-Id: Ie63e05a6cdfb2a73e878cf7107f9172aed1cdf13 Reviewed-on: https://cl.tvl.fyi/c/depot/+/11224 Tested-by: BuildkiteCI Reviewed-by: raitobezarius Autosubmit: flokli --- tvix/castore/src/directoryservice/utils.rs | 55 ------------------------------ 1 file changed, 55 deletions(-) (limited to 'tvix/castore/src/directoryservice/utils.rs') diff --git a/tvix/castore/src/directoryservice/utils.rs b/tvix/castore/src/directoryservice/utils.rs index 6fa1a9e5fda0..01c521076c9c 100644 --- a/tvix/castore/src/directoryservice/utils.rs +++ b/tvix/castore/src/directoryservice/utils.rs @@ -1,4 +1,3 @@ -use super::DirectoryPutter; use super::DirectoryService; use crate::proto; use crate::B3Digest; @@ -6,8 +5,6 @@ use crate::Error; use async_stream::stream; use futures::stream::BoxStream; use std::collections::{HashSet, VecDeque}; -use tonic::async_trait; -use tracing::instrument; use tracing::warn; /// Traverses a [proto::Directory] from the root to the children. @@ -83,55 +80,3 @@ pub fn traverse_directory<'a, DS: DirectoryService + 'static>( Box::pin(stream) } - -/// This is a simple implementation of a Directory uploader. -/// TODO: verify connectivity? Factor out these checks into generic helpers? -pub struct SimplePutter { - directory_service: DS, - last_directory_digest: Option, - closed: bool, -} - -impl SimplePutter { - pub fn new(directory_service: DS) -> Self { - Self { - directory_service, - closed: false, - last_directory_digest: None, - } - } -} - -#[async_trait] -impl DirectoryPutter for SimplePutter { - #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)] - async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> { - if self.closed { - return Err(Error::StorageError("already closed".to_string())); - } - - let digest = self.directory_service.put(directory).await?; - - // track the last directory digest - self.last_directory_digest = Some(digest); - - Ok(()) - } - - #[instrument(level = "trace", skip_all, ret, err)] - async fn close(&mut self) -> Result { - if self.closed { - return Err(Error::StorageError("already closed".to_string())); - } - - match &self.last_directory_digest { - Some(last_digest) => { - self.closed = true; - Ok(last_digest.clone()) - } - None => Err(Error::InvalidRequest( - "no directories sent, can't show root digest".to_string(), - )), - } - } -} -- cgit 1.4.1