diff options
author | Florian Klink <flokli@flokli.de> | 2024-03-20T21·28+0200 |
---|---|---|
committer | clbot <clbot@tvl.fyi> | 2024-03-24T17·42+0000 |
commit | 5f069a3eb8c3a089f1231bf4a618e4153736df96 (patch) | |
tree | d68eef9c5dde6991483a331e50dcfb62ae8a0507 /tvix/castore/src/directoryservice/simple_putter.rs | |
parent | c92ef2df64f4013e72037cefb548f68d158488cc (diff) |
refactor(tvix/castore/directory): have SimplePutter use Validator r/7773
This simplifies a bunch of code, and gets rid of some TODOs. Also, move it out of castore/utils, and into its own file. Change-Id: Ie63e05a6cdfb2a73e878cf7107f9172aed1cdf13 Reviewed-on: https://cl.tvl.fyi/c/depot/+/11224 Tested-by: BuildkiteCI Reviewed-by: raitobezarius <tvl@lahfa.xyz> Autosubmit: flokli <flokli@flokli.de>
Diffstat (limited to 'tvix/castore/src/directoryservice/simple_putter.rs')
-rw-r--r-- | tvix/castore/src/directoryservice/simple_putter.rs | 75 |
1 files changed, 75 insertions, 0 deletions
diff --git a/tvix/castore/src/directoryservice/simple_putter.rs b/tvix/castore/src/directoryservice/simple_putter.rs new file mode 100644 index 000000000000..25617ebcac82 --- /dev/null +++ b/tvix/castore/src/directoryservice/simple_putter.rs @@ -0,0 +1,75 @@ +use super::ClosureValidator; +use super::DirectoryPutter; +use super::DirectoryService; +use crate::proto; +use crate::B3Digest; +use crate::Error; +use tonic::async_trait; +use tracing::instrument; +use tracing::warn; + +/// This is an implementation of DirectoryPutter that simply +/// inserts individual Directory messages one by one, on close, after +/// they successfully validated. +pub struct SimplePutter<DS: DirectoryService> { + directory_service: DS, + + directory_validator: Option<ClosureValidator>, +} + +impl<DS: DirectoryService> SimplePutter<DS> { + pub fn new(directory_service: DS) -> Self { + Self { + directory_service, + directory_validator: Some(Default::default()), + } + } +} + +#[async_trait] +impl<DS: DirectoryService + 'static> DirectoryPutter for SimplePutter<DS> { + #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)] + async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> { + match self.directory_validator { + None => return Err(Error::StorageError("already closed".to_string())), + Some(ref mut validator) => { + validator.add(directory)?; + } + } + + Ok(()) + } + + #[instrument(level = "trace", skip_all, ret, err)] + async fn close(&mut self) -> Result<B3Digest, Error> { + match self.directory_validator.take() { + None => Err(Error::InvalidRequest("already closed".to_string())), + Some(validator) => { + // retrieve the validated directories. + let directories = validator.finalize()?; + + // Get the root digest, which is at the end (cf. insertion order) + let root_digest = directories + .last() + .ok_or_else(|| Error::InvalidRequest("got no directories".to_string()))? + .digest(); + + // call an individual put for each directory and await the insertion. + for directory in directories { + let exp_digest = directory.digest(); + let actual_digest = self.directory_service.put(directory).await?; + + // ensure the digest the backend told us matches our expectations. + if exp_digest != actual_digest { + warn!(directory.digest_expected=%exp_digest, directory.digest_actual=%actual_digest, "unexpected digest"); + return Err(Error::StorageError( + "got unexpected digest from backend during put".into(), + )); + } + } + + Ok(root_digest) + } + } + } +} |