diff options
Diffstat (limited to 'tvix/castore/src/directoryservice/grpc.rs')
-rw-r--r-- | tvix/castore/src/directoryservice/grpc.rs | 40 |
1 files changed, 14 insertions, 26 deletions
diff --git a/tvix/castore/src/directoryservice/grpc.rs b/tvix/castore/src/directoryservice/grpc.rs index 4dc3931ed410..2079514d2b79 100644 --- a/tvix/castore/src/directoryservice/grpc.rs +++ b/tvix/castore/src/directoryservice/grpc.rs @@ -1,8 +1,9 @@ use std::collections::HashSet; -use super::{DirectoryPutter, DirectoryService}; +use super::{Directory, DirectoryPutter, DirectoryService}; use crate::composition::{CompositionContext, ServiceBuilder}; use crate::proto::{self, get_directory_request::ByWhat}; +use crate::ValidateDirectoryError; use crate::{B3Digest, Error}; use async_stream::try_stream; use futures::stream::BoxStream; @@ -41,10 +42,7 @@ where T::Future: Send, { #[instrument(level = "trace", skip_all, fields(directory.digest = %digest))] - async fn get( - &self, - digest: &B3Digest, - ) -> Result<Option<crate::proto::Directory>, crate::Error> { + async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, crate::Error> { // Get a new handle to the gRPC client, and copy the digest. let mut grpc_client = self.grpc_client.clone(); let digest_cpy = digest.clone(); @@ -72,15 +70,10 @@ where "requested directory with digest {}, but got {}", digest, actual_digest ))) - } else if let Err(e) = directory.validate() { - // Validate the Directory itself is valid. - warn!("directory failed validation: {}", e.to_string()); - Err(crate::Error::StorageError(format!( - "directory {} failed validation: {}", - digest, e, - ))) } else { - Ok(Some(directory)) + Ok(Some(directory.try_into().map_err(|_| { + Error::StorageError("invalid root digest length in response".to_string()) + })?)) } } Ok(None) => Ok(None), @@ -90,11 +83,11 @@ where } #[instrument(level = "trace", skip_all, fields(directory.digest = %directory.digest()))] - async fn put(&self, directory: crate::proto::Directory) -> Result<B3Digest, crate::Error> { + async fn put(&self, directory: Directory) -> Result<B3Digest, crate::Error> { let resp = self .grpc_client .clone() - .put(tokio_stream::once(directory)) + .put(tokio_stream::once(proto::Directory::from(directory))) .await; match resp { @@ -113,7 +106,7 @@ where fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>> { + ) -> BoxStream<'static, Result<Directory, Error>> { let mut grpc_client = self.grpc_client.clone(); let root_directory_digest = root_directory_digest.clone(); @@ -135,14 +128,6 @@ where loop { match stream.message().await { Ok(Some(directory)) => { - // validate the directory itself. - if let Err(e) = directory.validate() { - Err(crate::Error::StorageError(format!( - "directory {} failed validation: {}", - directory.digest(), - e, - )))?; - } // validate we actually expected that directory, and move it from expected to received. let directory_digest = directory.digest(); let was_expected = expected_directory_digests.remove(&directory_digest); @@ -168,6 +153,9 @@ where .insert(child_directory_digest); } + let directory = directory.try_into() + .map_err(|e: ValidateDirectoryError| Error::StorageError(e.to_string()))?; + yield directory; }, Ok(None) if expected_directory_digests.len() == 1 && expected_directory_digests.contains(&root_directory_digest) => { @@ -279,11 +267,11 @@ pub struct GRPCPutter { #[async_trait] impl DirectoryPutter for GRPCPutter { #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)] - async fn put(&mut self, directory: proto::Directory) -> Result<(), crate::Error> { + async fn put(&mut self, directory: Directory) -> Result<(), crate::Error> { match self.rq { // If we're not already closed, send the directory to directory_sender. Some((_, ref directory_sender)) => { - if directory_sender.send(directory).is_err() { + if directory_sender.send(directory.into()).is_err() { // If the channel has been prematurely closed, invoke close (so we can peek at the error code) // That error code is much more helpful, because it // contains the error message from the server. |