about summary refs log tree commit diff
path: root/tvix/castore/src/directoryservice/sled.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/castore/src/directoryservice/sled.rs')
-rw-r--r--tvix/castore/src/directoryservice/sled.rs46
1 files changed, 20 insertions, 26 deletions
diff --git a/tvix/castore/src/directoryservice/sled.rs b/tvix/castore/src/directoryservice/sled.rs
index 5766dec1a5c2..4f3a860d14e4 100644
--- a/tvix/castore/src/directoryservice/sled.rs
+++ b/tvix/castore/src/directoryservice/sled.rs
@@ -1,5 +1,3 @@
-use crate::proto::Directory;
-use crate::{proto, B3Digest, Error};
 use futures::stream::BoxStream;
 use prost::Message;
 use std::ops::Deref;
@@ -9,8 +7,9 @@ use tonic::async_trait;
 use tracing::{instrument, warn};
 
 use super::utils::traverse_directory;
-use super::{DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator};
+use super::{Directory, DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator};
 use crate::composition::{CompositionContext, ServiceBuilder};
+use crate::{proto, B3Digest, Error};
 
 #[derive(Clone)]
 pub struct SledDirectoryService {
@@ -44,7 +43,7 @@ impl SledDirectoryService {
 #[async_trait]
 impl DirectoryService for SledDirectoryService {
     #[instrument(skip(self, digest), fields(directory.digest = %digest))]
-    async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> {
+    async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> {
         let resp = tokio::task::spawn_blocking({
             let db = self.db.clone();
             let digest = digest.clone();
@@ -61,7 +60,7 @@ impl DirectoryService for SledDirectoryService {
             None => Ok(None),
 
             // The directory was found, try to parse the data as Directory message
-            Some(data) => match Directory::decode(&*data) {
+            Some(data) => match proto::Directory::decode(&*data) {
                 Ok(directory) => {
                     // Validate the retrieved Directory indeed has the
                     // digest we expect it to have, to detect corruptions.
@@ -73,14 +72,10 @@ impl DirectoryService for SledDirectoryService {
                         )));
                     }
 
-                    // Validate the Directory itself is valid.
-                    if let Err(e) = directory.validate() {
-                        warn!("directory failed validation: {}", e.to_string());
-                        return Err(Error::StorageError(format!(
-                            "directory {} failed validation: {}",
-                            actual_digest, e,
-                        )));
-                    }
+                    let directory = directory.try_into().map_err(|e| {
+                        warn!("failed to retrieve directory: {}", e);
+                        Error::StorageError(format!("failed to retrieve directory: {}", e))
+                    })?;
 
                     Ok(Some(directory))
                 }
@@ -93,22 +88,18 @@ impl DirectoryService for SledDirectoryService {
     }
 
     #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))]
-    async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> {
+    async fn put(&self, directory: Directory) -> Result<B3Digest, Error> {
         tokio::task::spawn_blocking({
             let db = self.db.clone();
             move || {
                 let digest = directory.digest();
 
-                // validate the directory itself.
-                if let Err(e) = directory.validate() {
-                    return Err(Error::InvalidRequest(format!(
-                        "directory {} failed validation: {}",
-                        digest, e,
-                    )));
-                }
                 // store it
-                db.insert(digest.as_slice(), directory.encode_to_vec())
-                    .map_err(|e| Error::StorageError(e.to_string()))?;
+                db.insert(
+                    digest.as_slice(),
+                    proto::Directory::from(directory).encode_to_vec(),
+                )
+                .map_err(|e| Error::StorageError(e.to_string()))?;
 
                 Ok(digest)
             }
@@ -120,7 +111,7 @@ impl DirectoryService for SledDirectoryService {
     fn get_recursive(
         &self,
         root_directory_digest: &B3Digest,
-    ) -> BoxStream<'static, Result<proto::Directory, Error>> {
+    ) -> BoxStream<'static, Result<Directory, Error>> {
         traverse_directory(self.clone(), root_directory_digest)
     }
 
@@ -215,7 +206,7 @@ pub struct SledDirectoryPutter {
 #[async_trait]
 impl DirectoryPutter for SledDirectoryPutter {
     #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)]
-    async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> {
+    async fn put(&mut self, directory: Directory) -> Result<(), Error> {
         match self.directory_validator {
             None => return Err(Error::StorageError("already closed".to_string())),
             Some(ref mut validator) => {
@@ -252,7 +243,10 @@ impl DirectoryPutter for SledDirectoryPutter {
 
                         let mut batch = sled::Batch::default();
                         for directory in directories {
-                            batch.insert(directory.digest().as_slice(), directory.encode_to_vec());
+                            batch.insert(
+                                directory.digest().as_slice(),
+                                proto::Directory::from(directory).encode_to_vec(),
+                            );
                         }
 
                         tree.apply_batch(batch).map_err(|e| {