diff options
Diffstat (limited to 'tvix/castore/src/directoryservice/redb.rs')
-rw-r--r-- | tvix/castore/src/directoryservice/redb.rs | 43 |
1 files changed, 19 insertions, 24 deletions
diff --git a/tvix/castore/src/directoryservice/redb.rs b/tvix/castore/src/directoryservice/redb.rs index 51dc87f92574..d253df503bb3 100644 --- a/tvix/castore/src/directoryservice/redb.rs +++ b/tvix/castore/src/directoryservice/redb.rs @@ -5,15 +5,15 @@ use std::{path::PathBuf, sync::Arc}; use tonic::async_trait; use tracing::{instrument, warn}; +use super::{ + traverse_directory, Directory, DirectoryGraph, DirectoryPutter, DirectoryService, + LeavesToRootValidator, +}; use crate::{ composition::{CompositionContext, ServiceBuilder}, digests, proto, B3Digest, Error, }; -use super::{ - traverse_directory, DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator, -}; - const DIRECTORY_TABLE: TableDefinition<[u8; digests::B3_LEN], Vec<u8>> = TableDefinition::new("directory"); @@ -69,7 +69,7 @@ fn create_schema(db: &redb::Database) -> Result<(), redb::Error> { #[async_trait] impl DirectoryService for RedbDirectoryService { #[instrument(skip(self, digest), fields(directory.digest = %digest))] - async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> { let db = self.db.clone(); // Retrieves the protobuf-encoded Directory for the corresponding digest. @@ -107,13 +107,10 @@ impl DirectoryService for RedbDirectoryService { let directory = match proto::Directory::decode(&*directory_data.value()) { Ok(dir) => { // The returned Directory must be valid. - if let Err(e) = dir.validate() { + dir.try_into().map_err(|e| { warn!(err=%e, "Directory failed validation"); - return Err(Error::StorageError( - "Directory failed validation".to_string(), - )); - } - dir + Error::StorageError("Directory failed validation".to_string()) + })? } Err(e) => { warn!(err=%e, "failed to parse Directory"); @@ -125,26 +122,21 @@ impl DirectoryService for RedbDirectoryService { } #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))] - async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { + async fn put(&self, directory: Directory) -> Result<B3Digest, Error> { tokio::task::spawn_blocking({ let db = self.db.clone(); move || { let digest = directory.digest(); - // Validate the directory. - if let Err(e) = directory.validate() { - warn!(err=%e, "Directory failed validation"); - return Err(Error::StorageError( - "Directory failed validation".to_string(), - )); - } - // Store the directory in the table. let txn = db.begin_write()?; { let mut table = txn.open_table(DIRECTORY_TABLE)?; let digest_as_array: [u8; digests::B3_LEN] = digest.clone().into(); - table.insert(digest_as_array, directory.encode_to_vec())?; + table.insert( + digest_as_array, + proto::Directory::from(directory).encode_to_vec(), + )?; } txn.commit()?; @@ -158,7 +150,7 @@ impl DirectoryService for RedbDirectoryService { fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>> { + ) -> BoxStream<'static, Result<Directory, Error>> { // FUTUREWORK: Ideally we should have all of the directory traversing happen in a single // redb transaction to avoid constantly closing and opening new transactions for the // database. @@ -185,7 +177,7 @@ pub struct RedbDirectoryPutter { #[async_trait] impl DirectoryPutter for RedbDirectoryPutter { #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)] - async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> { + async fn put(&mut self, directory: Directory) -> Result<(), Error> { match self.directory_validator { None => return Err(Error::StorageError("already closed".to_string())), Some(ref mut validator) => { @@ -228,7 +220,10 @@ impl DirectoryPutter for RedbDirectoryPutter { for directory in directories { let digest_as_array: [u8; digests::B3_LEN] = directory.digest().into(); - table.insert(digest_as_array, directory.encode_to_vec())?; + table.insert( + digest_as_array, + proto::Directory::from(directory).encode_to_vec(), + )?; } } |