diff options
author | Florian Klink <flokli@flokli.de> | 2024-03-20T20·00+0200 |
---|---|---|
committer | clbot <clbot@tvl.fyi> | 2024-03-24T19·56+0000 |
commit | 21fcc1c9df2562a6cf39b2a0bf9e1e97fb14f711 (patch) | |
tree | 368d4b5892b12b4f625cd47a18c2a9f25438838f /tvix/castore/src/directoryservice/sled.rs | |
parent | f7281d8fd52e7c921e03531d7189aee8b1635bd4 (diff) |
feat(tvix/castore/directory): add SledDirectoryPutter r/7775
This uses DirectoryClosureValidator for validation and the sled batch API to insert multiple directories at once. Change-Id: I2d6dc513ccbc02e638f8d22173da5463e73182ee Reviewed-on: https://cl.tvl.fyi/c/depot/+/11222 Autosubmit: flokli <flokli@flokli.de> Reviewed-by: Connor Brewster <cbrewster@hey.com> Tested-by: BuildkiteCI Reviewed-by: raitobezarius <tvl@lahfa.xyz>
Diffstat (limited to 'tvix/castore/src/directoryservice/sled.rs')
-rw-r--r-- | tvix/castore/src/directoryservice/sled.rs | 62 |
1 files changed, 59 insertions, 3 deletions
diff --git a/tvix/castore/src/directoryservice/sled.rs b/tvix/castore/src/directoryservice/sled.rs index f41c5f718872..e4a4c2bbed78 100644 --- a/tvix/castore/src/directoryservice/sled.rs +++ b/tvix/castore/src/directoryservice/sled.rs @@ -1,14 +1,14 @@ -use crate::directoryservice::DirectoryPutter; use crate::proto::Directory; use crate::{proto, B3Digest, Error}; use futures::stream::BoxStream; use prost::Message; +use std::ops::Deref; use std::path::Path; use tonic::async_trait; use tracing::{instrument, warn}; use super::utils::traverse_directory; -use super::{DirectoryService, SimplePutter}; +use super::{ClosureValidator, DirectoryPutter, DirectoryService}; #[derive(Clone)] pub struct SledDirectoryService { @@ -107,6 +107,62 @@ impl DirectoryService for SledDirectoryService { where Self: Clone, { - Box::new(SimplePutter::new(self.clone())) + Box::new(SledDirectoryPutter { + tree: self.db.deref().clone(), + directory_validator: Some(Default::default()), + }) + } +} + +/// Buffers Directory messages to be uploaded and inserts them in a batch +/// transaction on close. +pub struct SledDirectoryPutter { + tree: sled::Tree, + + /// The directories (inside the directory validator) that we insert later, + /// or None, if they were already inserted. + directory_validator: Option<ClosureValidator>, +} + +#[async_trait] +impl DirectoryPutter for SledDirectoryPutter { + #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)] + async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> { + match self.directory_validator { + None => return Err(Error::StorageError("already closed".to_string())), + Some(ref mut validator) => { + validator.add(directory)?; + } + } + + Ok(()) + } + + #[instrument(level = "trace", skip_all, ret, err)] + async fn close(&mut self) -> Result<B3Digest, Error> { + match self.directory_validator.take() { + None => Err(Error::InvalidRequest("already closed".to_string())), + Some(validator) => { + // retrieve the validated directories. + let directories = validator.finalize()?; + + // Get the root digest, which is at the end (cf. insertion order) + let root_digest = directories + .last() + .ok_or_else(|| Error::InvalidRequest("got no directories".to_string()))? + .digest(); + + let mut batch = sled::Batch::default(); + for directory in directories { + batch.insert(directory.digest().as_slice(), directory.encode_to_vec()); + } + + self.tree + .apply_batch(batch) + .map_err(|e| Error::StorageError(format!("unable to apply batch: {}", e)))?; + + Ok(root_digest) + } + } } } |