diff options
Diffstat (limited to 'tvix/castore/src/directoryservice/sled.rs')
-rw-r--r-- | tvix/castore/src/directoryservice/sled.rs | 46 |
1 files changed, 20 insertions, 26 deletions
diff --git a/tvix/castore/src/directoryservice/sled.rs b/tvix/castore/src/directoryservice/sled.rs index 5766dec1a5c2..4f3a860d14e4 100644 --- a/tvix/castore/src/directoryservice/sled.rs +++ b/tvix/castore/src/directoryservice/sled.rs @@ -1,5 +1,3 @@ -use crate::proto::Directory; -use crate::{proto, B3Digest, Error}; use futures::stream::BoxStream; use prost::Message; use std::ops::Deref; @@ -9,8 +7,9 @@ use tonic::async_trait; use tracing::{instrument, warn}; use super::utils::traverse_directory; -use super::{DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator}; +use super::{Directory, DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator}; use crate::composition::{CompositionContext, ServiceBuilder}; +use crate::{proto, B3Digest, Error}; #[derive(Clone)] pub struct SledDirectoryService { @@ -44,7 +43,7 @@ impl SledDirectoryService { #[async_trait] impl DirectoryService for SledDirectoryService { #[instrument(skip(self, digest), fields(directory.digest = %digest))] - async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> { let resp = tokio::task::spawn_blocking({ let db = self.db.clone(); let digest = digest.clone(); @@ -61,7 +60,7 @@ impl DirectoryService for SledDirectoryService { None => Ok(None), // The directory was found, try to parse the data as Directory message - Some(data) => match Directory::decode(&*data) { + Some(data) => match proto::Directory::decode(&*data) { Ok(directory) => { // Validate the retrieved Directory indeed has the // digest we expect it to have, to detect corruptions. @@ -73,14 +72,10 @@ impl DirectoryService for SledDirectoryService { ))); } - // Validate the Directory itself is valid. - if let Err(e) = directory.validate() { - warn!("directory failed validation: {}", e.to_string()); - return Err(Error::StorageError(format!( - "directory {} failed validation: {}", - actual_digest, e, - ))); - } + let directory = directory.try_into().map_err(|e| { + warn!("failed to retrieve directory: {}", e); + Error::StorageError(format!("failed to retrieve directory: {}", e)) + })?; Ok(Some(directory)) } @@ -93,22 +88,18 @@ impl DirectoryService for SledDirectoryService { } #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))] - async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { + async fn put(&self, directory: Directory) -> Result<B3Digest, Error> { tokio::task::spawn_blocking({ let db = self.db.clone(); move || { let digest = directory.digest(); - // validate the directory itself. - if let Err(e) = directory.validate() { - return Err(Error::InvalidRequest(format!( - "directory {} failed validation: {}", - digest, e, - ))); - } // store it - db.insert(digest.as_slice(), directory.encode_to_vec()) - .map_err(|e| Error::StorageError(e.to_string()))?; + db.insert( + digest.as_slice(), + proto::Directory::from(directory).encode_to_vec(), + ) + .map_err(|e| Error::StorageError(e.to_string()))?; Ok(digest) } @@ -120,7 +111,7 @@ impl DirectoryService for SledDirectoryService { fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>> { + ) -> BoxStream<'static, Result<Directory, Error>> { traverse_directory(self.clone(), root_directory_digest) } @@ -215,7 +206,7 @@ pub struct SledDirectoryPutter { #[async_trait] impl DirectoryPutter for SledDirectoryPutter { #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)] - async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> { + async fn put(&mut self, directory: Directory) -> Result<(), Error> { match self.directory_validator { None => return Err(Error::StorageError("already closed".to_string())), Some(ref mut validator) => { @@ -252,7 +243,10 @@ impl DirectoryPutter for SledDirectoryPutter { let mut batch = sled::Batch::default(); for directory in directories { - batch.insert(directory.digest().as_slice(), directory.encode_to_vec()); + batch.insert( + directory.digest().as_slice(), + proto::Directory::from(directory).encode_to_vec(), + ); } tree.apply_batch(batch).map_err(|e| { |