diff options
Diffstat (limited to 'tvix/castore')
-rw-r--r-- | tvix/castore/src/directoryservice/memory.rs | 4 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/mod.rs | 3 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/simple_putter.rs | 75 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/sled.rs | 4 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/utils.rs | 55 |
5 files changed, 82 insertions, 59 deletions
diff --git a/tvix/castore/src/directoryservice/memory.rs b/tvix/castore/src/directoryservice/memory.rs index 528ffe2f2c03..2cbbbd1b1657 100644 --- a/tvix/castore/src/directoryservice/memory.rs +++ b/tvix/castore/src/directoryservice/memory.rs @@ -5,8 +5,8 @@ use std::sync::{Arc, RwLock}; use tonic::async_trait; use tracing::{instrument, warn}; -use super::utils::{traverse_directory, SimplePutter}; -use super::{DirectoryPutter, DirectoryService}; +use super::utils::traverse_directory; +use super::{DirectoryPutter, DirectoryService, SimplePutter}; #[derive(Clone, Default)] pub struct MemoryDirectoryService { diff --git a/tvix/castore/src/directoryservice/mod.rs b/tvix/castore/src/directoryservice/mod.rs index b6d1c2fcab13..f9d8e08b31e0 100644 --- a/tvix/castore/src/directoryservice/mod.rs +++ b/tvix/castore/src/directoryservice/mod.rs @@ -6,6 +6,7 @@ mod closure_validator; mod from_addr; mod grpc; mod memory; +mod simple_putter; mod sled; mod traverse; mod utils; @@ -14,8 +15,10 @@ pub use self::closure_validator::ClosureValidator; pub use self::from_addr::from_addr; pub use self::grpc::GRPCDirectoryService; pub use self::memory::MemoryDirectoryService; +pub use self::simple_putter::SimplePutter; pub use self::sled::SledDirectoryService; pub use self::traverse::descend_to; +pub use self::utils::traverse_directory; /// The base trait all Directory services need to implement. /// This is a simple get and put of [crate::proto::Directory], returning their diff --git a/tvix/castore/src/directoryservice/simple_putter.rs b/tvix/castore/src/directoryservice/simple_putter.rs new file mode 100644 index 000000000000..25617ebcac82 --- /dev/null +++ b/tvix/castore/src/directoryservice/simple_putter.rs @@ -0,0 +1,75 @@ +use super::ClosureValidator; +use super::DirectoryPutter; +use super::DirectoryService; +use crate::proto; +use crate::B3Digest; +use crate::Error; +use tonic::async_trait; +use tracing::instrument; +use tracing::warn; + +/// This is an implementation of DirectoryPutter that simply +/// inserts individual Directory messages one by one, on close, after +/// they successfully validated. +pub struct SimplePutter<DS: DirectoryService> { + directory_service: DS, + + directory_validator: Option<ClosureValidator>, +} + +impl<DS: DirectoryService> SimplePutter<DS> { + pub fn new(directory_service: DS) -> Self { + Self { + directory_service, + directory_validator: Some(Default::default()), + } + } +} + +#[async_trait] +impl<DS: DirectoryService + 'static> DirectoryPutter for SimplePutter<DS> { + #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)] + async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> { + match self.directory_validator { + None => return Err(Error::StorageError("already closed".to_string())), + Some(ref mut validator) => { + validator.add(directory)?; + } + } + + Ok(()) + } + + #[instrument(level = "trace", skip_all, ret, err)] + async fn close(&mut self) -> Result<B3Digest, Error> { + match self.directory_validator.take() { + None => Err(Error::InvalidRequest("already closed".to_string())), + Some(validator) => { + // retrieve the validated directories. + let directories = validator.finalize()?; + + // Get the root digest, which is at the end (cf. insertion order) + let root_digest = directories + .last() + .ok_or_else(|| Error::InvalidRequest("got no directories".to_string()))? + .digest(); + + // call an individual put for each directory and await the insertion. + for directory in directories { + let exp_digest = directory.digest(); + let actual_digest = self.directory_service.put(directory).await?; + + // ensure the digest the backend told us matches our expectations. + if exp_digest != actual_digest { + warn!(directory.digest_expected=%exp_digest, directory.digest_actual=%actual_digest, "unexpected digest"); + return Err(Error::StorageError( + "got unexpected digest from backend during put".into(), + )); + } + } + + Ok(root_digest) + } + } + } +} diff --git a/tvix/castore/src/directoryservice/sled.rs b/tvix/castore/src/directoryservice/sled.rs index 9acd3854184b..f41c5f718872 100644 --- a/tvix/castore/src/directoryservice/sled.rs +++ b/tvix/castore/src/directoryservice/sled.rs @@ -7,8 +7,8 @@ use std::path::Path; use tonic::async_trait; use tracing::{instrument, warn}; -use super::utils::{traverse_directory, SimplePutter}; -use super::DirectoryService; +use super::utils::traverse_directory; +use super::{DirectoryService, SimplePutter}; #[derive(Clone)] pub struct SledDirectoryService { diff --git a/tvix/castore/src/directoryservice/utils.rs b/tvix/castore/src/directoryservice/utils.rs index 6fa1a9e5fda0..01c521076c9c 100644 --- a/tvix/castore/src/directoryservice/utils.rs +++ b/tvix/castore/src/directoryservice/utils.rs @@ -1,4 +1,3 @@ -use super::DirectoryPutter; use super::DirectoryService; use crate::proto; use crate::B3Digest; @@ -6,8 +5,6 @@ use crate::Error; use async_stream::stream; use futures::stream::BoxStream; use std::collections::{HashSet, VecDeque}; -use tonic::async_trait; -use tracing::instrument; use tracing::warn; /// Traverses a [proto::Directory] from the root to the children. @@ -83,55 +80,3 @@ pub fn traverse_directory<'a, DS: DirectoryService + 'static>( Box::pin(stream) } - -/// This is a simple implementation of a Directory uploader. -/// TODO: verify connectivity? Factor out these checks into generic helpers? -pub struct SimplePutter<DS: DirectoryService> { - directory_service: DS, - last_directory_digest: Option<B3Digest>, - closed: bool, -} - -impl<DS: DirectoryService> SimplePutter<DS> { - pub fn new(directory_service: DS) -> Self { - Self { - directory_service, - closed: false, - last_directory_digest: None, - } - } -} - -#[async_trait] -impl<DS: DirectoryService + 'static> DirectoryPutter for SimplePutter<DS> { - #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)] - async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> { - if self.closed { - return Err(Error::StorageError("already closed".to_string())); - } - - let digest = self.directory_service.put(directory).await?; - - // track the last directory digest - self.last_directory_digest = Some(digest); - - Ok(()) - } - - #[instrument(level = "trace", skip_all, ret, err)] - async fn close(&mut self) -> Result<B3Digest, Error> { - if self.closed { - return Err(Error::StorageError("already closed".to_string())); - } - - match &self.last_directory_digest { - Some(last_digest) => { - self.closed = true; - Ok(last_digest.clone()) - } - None => Err(Error::InvalidRequest( - "no directories sent, can't show root digest".to_string(), - )), - } - } -} |