about summary refs log tree commit diff
path: root/tvix/castore/src/directoryservice/grpc.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/castore/src/directoryservice/grpc.rs')
-rw-r--r--tvix/castore/src/directoryservice/grpc.rs40
1 files changed, 14 insertions, 26 deletions
diff --git a/tvix/castore/src/directoryservice/grpc.rs b/tvix/castore/src/directoryservice/grpc.rs
index 4dc3931ed410..2079514d2b79 100644
--- a/tvix/castore/src/directoryservice/grpc.rs
+++ b/tvix/castore/src/directoryservice/grpc.rs
@@ -1,8 +1,9 @@
 use std::collections::HashSet;
 
-use super::{DirectoryPutter, DirectoryService};
+use super::{Directory, DirectoryPutter, DirectoryService};
 use crate::composition::{CompositionContext, ServiceBuilder};
 use crate::proto::{self, get_directory_request::ByWhat};
+use crate::ValidateDirectoryError;
 use crate::{B3Digest, Error};
 use async_stream::try_stream;
 use futures::stream::BoxStream;
@@ -41,10 +42,7 @@ where
     T::Future: Send,
 {
     #[instrument(level = "trace", skip_all, fields(directory.digest = %digest))]
-    async fn get(
-        &self,
-        digest: &B3Digest,
-    ) -> Result<Option<crate::proto::Directory>, crate::Error> {
+    async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, crate::Error> {
         // Get a new handle to the gRPC client, and copy the digest.
         let mut grpc_client = self.grpc_client.clone();
         let digest_cpy = digest.clone();
@@ -72,15 +70,10 @@ where
                         "requested directory with digest {}, but got {}",
                         digest, actual_digest
                     )))
-                } else if let Err(e) = directory.validate() {
-                    // Validate the Directory itself is valid.
-                    warn!("directory failed validation: {}", e.to_string());
-                    Err(crate::Error::StorageError(format!(
-                        "directory {} failed validation: {}",
-                        digest, e,
-                    )))
                 } else {
-                    Ok(Some(directory))
+                    Ok(Some(directory.try_into().map_err(|_| {
+                        Error::StorageError("invalid root digest length in response".to_string())
+                    })?))
                 }
             }
             Ok(None) => Ok(None),
@@ -90,11 +83,11 @@ where
     }
 
     #[instrument(level = "trace", skip_all, fields(directory.digest = %directory.digest()))]
-    async fn put(&self, directory: crate::proto::Directory) -> Result<B3Digest, crate::Error> {
+    async fn put(&self, directory: Directory) -> Result<B3Digest, crate::Error> {
         let resp = self
             .grpc_client
             .clone()
-            .put(tokio_stream::once(directory))
+            .put(tokio_stream::once(proto::Directory::from(directory)))
             .await;
 
         match resp {
@@ -113,7 +106,7 @@ where
     fn get_recursive(
         &self,
         root_directory_digest: &B3Digest,
-    ) -> BoxStream<'static, Result<proto::Directory, Error>> {
+    ) -> BoxStream<'static, Result<Directory, Error>> {
         let mut grpc_client = self.grpc_client.clone();
         let root_directory_digest = root_directory_digest.clone();
 
@@ -135,14 +128,6 @@ where
             loop {
                 match stream.message().await {
                     Ok(Some(directory)) => {
-                        // validate the directory itself.
-                        if let Err(e) = directory.validate() {
-                            Err(crate::Error::StorageError(format!(
-                                "directory {} failed validation: {}",
-                                directory.digest(),
-                                e,
-                            )))?;
-                        }
                         // validate we actually expected that directory, and move it from expected to received.
                         let directory_digest = directory.digest();
                         let was_expected = expected_directory_digests.remove(&directory_digest);
@@ -168,6 +153,9 @@ where
                                 .insert(child_directory_digest);
                         }
 
+                        let directory = directory.try_into()
+                            .map_err(|e: ValidateDirectoryError| Error::StorageError(e.to_string()))?;
+
                         yield directory;
                     },
                     Ok(None) if expected_directory_digests.len() == 1 && expected_directory_digests.contains(&root_directory_digest) => {
@@ -279,11 +267,11 @@ pub struct GRPCPutter {
 #[async_trait]
 impl DirectoryPutter for GRPCPutter {
     #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)]
-    async fn put(&mut self, directory: proto::Directory) -> Result<(), crate::Error> {
+    async fn put(&mut self, directory: Directory) -> Result<(), crate::Error> {
         match self.rq {
             // If we're not already closed, send the directory to directory_sender.
             Some((_, ref directory_sender)) => {
-                if directory_sender.send(directory).is_err() {
+                if directory_sender.send(directory.into()).is_err() {
                     // If the channel has been prematurely closed, invoke close (so we can peek at the error code)
                     // That error code is much more helpful, because it
                     // contains the error message from the server.