diff options
Diffstat (limited to 'tvix/store/src/directoryservice/grpc.rs')
-rw-r--r-- | tvix/store/src/directoryservice/grpc.rs | 79 |
1 files changed, 41 insertions, 38 deletions
diff --git a/tvix/store/src/directoryservice/grpc.rs b/tvix/store/src/directoryservice/grpc.rs index d4ac9fd925fd..46e19224c759 100644 --- a/tvix/store/src/directoryservice/grpc.rs +++ b/tvix/store/src/directoryservice/grpc.rs @@ -2,8 +2,7 @@ use std::collections::HashSet; use super::{DirectoryPutter, DirectoryService}; use crate::proto::{self, get_directory_request::ByWhat}; -use crate::Error; -use data_encoding::BASE64; +use crate::{B3Digest, Error}; use tokio::sync::mpsc::UnboundedSender; use tokio_stream::wrappers::UnboundedReceiverStream; use tonic::{transport::Channel, Status}; @@ -38,16 +37,16 @@ impl GRPCDirectoryService { impl DirectoryService for GRPCDirectoryService { type DirectoriesIterator = StreamIterator; - fn get(&self, digest: &[u8; 32]) -> Result<Option<crate::proto::Directory>, crate::Error> { + fn get(&self, digest: &B3Digest) -> Result<Option<crate::proto::Directory>, crate::Error> { // Get a new handle to the gRPC client, and copy the digest. let mut grpc_client = self.grpc_client.clone(); - let digest = digest.to_owned(); + let digest_as_vec = digest.to_vec(); let task = self.tokio_handle.spawn(async move { let mut s = grpc_client .get(proto::GetDirectoryRequest { recursive: false, - by_what: Some(ByWhat::Digest(digest.to_vec())), + by_what: Some(ByWhat::Digest(digest_as_vec)), }) .await? .into_inner(); @@ -56,6 +55,7 @@ impl DirectoryService for GRPCDirectoryService { s.message().await }); + let digest = digest.clone(); match self.tokio_handle.block_on(task)? { Ok(Some(directory)) => { // Validate the retrieved Directory indeed has the @@ -64,16 +64,14 @@ impl DirectoryService for GRPCDirectoryService { if actual_digest != digest { Err(crate::Error::StorageError(format!( "requested directory with digest {}, but got {}", - BASE64.encode(&digest), - BASE64.encode(&actual_digest) + digest, actual_digest ))) } else if let Err(e) = directory.validate() { // Validate the Directory itself is valid. warn!("directory failed validation: {}", e.to_string()); Err(crate::Error::StorageError(format!( "directory {} failed validation: {}", - BASE64.encode(&digest), - e, + digest, e, ))) } else { Ok(Some(directory)) @@ -85,7 +83,7 @@ impl DirectoryService for GRPCDirectoryService { } } - fn put(&self, directory: crate::proto::Directory) -> Result<[u8; 32], crate::Error> { + fn put(&self, directory: crate::proto::Directory) -> Result<B3Digest, crate::Error> { let mut grpc_client = self.grpc_client.clone(); let task = self @@ -93,29 +91,27 @@ impl DirectoryService for GRPCDirectoryService { .spawn(async move { grpc_client.put(tokio_stream::iter(vec![directory])).await }); match self.tokio_handle.block_on(task)? { - Ok(put_directory_resp) => Ok(put_directory_resp - .into_inner() - .root_digest - .as_slice() - .try_into() - .map_err(|_| { - Error::StorageError("invalid root digest length in response".to_string()) - })?), + Ok(put_directory_resp) => Ok(B3Digest::from_vec( + put_directory_resp.into_inner().root_digest, + ) + .map_err(|_| { + Error::StorageError("invalid root digest length in response".to_string()) + })?), Err(e) => Err(crate::Error::StorageError(e.to_string())), } } - #[instrument(skip_all, fields(directory.digest = BASE64.encode(root_directory_digest)))] - fn get_recursive(&self, root_directory_digest: &[u8; 32]) -> Self::DirectoriesIterator { + #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] + fn get_recursive(&self, root_directory_digest: &B3Digest) -> Self::DirectoriesIterator { let mut grpc_client = self.grpc_client.clone(); - let root_directory_digest = root_directory_digest.to_owned(); + let root_directory_digest_as_vec = root_directory_digest.to_vec(); let task: tokio::task::JoinHandle<Result<Streaming<proto::Directory>, Status>> = self.tokio_handle.spawn(async move { let s = grpc_client .get(proto::GetDirectoryRequest { recursive: true, - by_what: Some(ByWhat::Digest(root_directory_digest.to_vec())), + by_what: Some(ByWhat::Digest(root_directory_digest_as_vec)), }) .await? .into_inner(); @@ -125,7 +121,11 @@ impl DirectoryService for GRPCDirectoryService { let stream = self.tokio_handle.block_on(task).unwrap().unwrap(); - StreamIterator::new(self.tokio_handle.clone(), &root_directory_digest, stream) + StreamIterator::new( + self.tokio_handle.clone(), + root_directory_digest.clone(), + stream, + ) } type DirectoryPutter = GRPCPutter; @@ -159,22 +159,22 @@ pub struct StreamIterator { // A stream of [proto::Directory] stream: Streaming<proto::Directory>, // The Directory digests we received so far - received_directory_digests: HashSet<[u8; 32]>, + received_directory_digests: HashSet<B3Digest>, // The Directory digests we're still expecting to get sent. - expected_directory_digests: HashSet<[u8; 32]>, + expected_directory_digests: HashSet<B3Digest>, } impl StreamIterator { pub fn new( tokio_handle: tokio::runtime::Handle, - root_digest: &[u8; 32], + root_digest: B3Digest, stream: Streaming<proto::Directory>, ) -> Self { Self { tokio_handle, stream, received_directory_digests: HashSet::new(), - expected_directory_digests: HashSet::from([*root_digest]), + expected_directory_digests: HashSet::from([root_digest]), } } } @@ -190,7 +190,7 @@ impl Iterator for StreamIterator { if let Err(e) = directory.validate() { return Some(Err(crate::Error::StorageError(format!( "directory {} failed validation: {}", - BASE64.encode(&directory.digest()), + directory.digest(), e, )))); } @@ -204,16 +204,19 @@ impl Iterator for StreamIterator { // means it once was in expected_directory_digests) return Some(Err(crate::Error::StorageError(format!( "received unexpected directory {}", - BASE64.encode(&directory_digest) + directory_digest )))); } self.received_directory_digests.insert(directory_digest); // register all children in expected_directory_digests. - // We ran validate() above, so we know these digests must be correct. for child_directory in &directory.directories { + // We ran validate() above, so we know these digests must be correct. + let child_directory_digest = + B3Digest::from_vec(child_directory.digest.clone()).unwrap(); + self.expected_directory_digests - .insert(child_directory.digest.clone().try_into().unwrap()); + .insert(child_directory_digest); } Some(Ok(directory)) @@ -294,7 +297,7 @@ impl DirectoryPutter for GRPCPutter { } /// Closes the stream for sending, and returns the value - fn close(&mut self) -> Result<[u8; 32], crate::Error> { + fn close(&mut self) -> Result<B3Digest, crate::Error> { // get self.rq, and replace it with None. // This ensures we can only close it once. match std::mem::take(&mut self.rq) { @@ -303,15 +306,15 @@ impl DirectoryPutter for GRPCPutter { // close directory_sender, so blocking on task will finish. drop(directory_sender); - Ok(self + let root_digest = self .tokio_handle .block_on(task)? .map_err(|e| Error::StorageError(e.to_string()))? - .root_digest - .try_into() - .map_err(|_| { - Error::StorageError("invalid root digest length in response".to_string()) - })?) + .root_digest; + + B3Digest::from_vec(root_digest).map_err(|_| { + Error::StorageError("invalid root digest length in response".to_string()) + }) } } } |