about summary refs log tree commit diff
path: root/tvix/store/src/directoryservice/grpc.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/store/src/directoryservice/grpc.rs')
-rw-r--r--tvix/store/src/directoryservice/grpc.rs79
1 files changed, 41 insertions, 38 deletions
diff --git a/tvix/store/src/directoryservice/grpc.rs b/tvix/store/src/directoryservice/grpc.rs
index d4ac9fd925fd..46e19224c759 100644
--- a/tvix/store/src/directoryservice/grpc.rs
+++ b/tvix/store/src/directoryservice/grpc.rs
@@ -2,8 +2,7 @@ use std::collections::HashSet;
 
 use super::{DirectoryPutter, DirectoryService};
 use crate::proto::{self, get_directory_request::ByWhat};
-use crate::Error;
-use data_encoding::BASE64;
+use crate::{B3Digest, Error};
 use tokio::sync::mpsc::UnboundedSender;
 use tokio_stream::wrappers::UnboundedReceiverStream;
 use tonic::{transport::Channel, Status};
@@ -38,16 +37,16 @@ impl GRPCDirectoryService {
 impl DirectoryService for GRPCDirectoryService {
     type DirectoriesIterator = StreamIterator;
 
-    fn get(&self, digest: &[u8; 32]) -> Result<Option<crate::proto::Directory>, crate::Error> {
+    fn get(&self, digest: &B3Digest) -> Result<Option<crate::proto::Directory>, crate::Error> {
         // Get a new handle to the gRPC client, and copy the digest.
         let mut grpc_client = self.grpc_client.clone();
-        let digest = digest.to_owned();
 
+        let digest_as_vec = digest.to_vec();
         let task = self.tokio_handle.spawn(async move {
             let mut s = grpc_client
                 .get(proto::GetDirectoryRequest {
                     recursive: false,
-                    by_what: Some(ByWhat::Digest(digest.to_vec())),
+                    by_what: Some(ByWhat::Digest(digest_as_vec)),
                 })
                 .await?
                 .into_inner();
@@ -56,6 +55,7 @@ impl DirectoryService for GRPCDirectoryService {
             s.message().await
         });
 
+        let digest = digest.clone();
         match self.tokio_handle.block_on(task)? {
             Ok(Some(directory)) => {
                 // Validate the retrieved Directory indeed has the
@@ -64,16 +64,14 @@ impl DirectoryService for GRPCDirectoryService {
                 if actual_digest != digest {
                     Err(crate::Error::StorageError(format!(
                         "requested directory with digest {}, but got {}",
-                        BASE64.encode(&digest),
-                        BASE64.encode(&actual_digest)
+                        digest, actual_digest
                     )))
                 } else if let Err(e) = directory.validate() {
                     // Validate the Directory itself is valid.
                     warn!("directory failed validation: {}", e.to_string());
                     Err(crate::Error::StorageError(format!(
                         "directory {} failed validation: {}",
-                        BASE64.encode(&digest),
-                        e,
+                        digest, e,
                     )))
                 } else {
                     Ok(Some(directory))
@@ -85,7 +83,7 @@ impl DirectoryService for GRPCDirectoryService {
         }
     }
 
-    fn put(&self, directory: crate::proto::Directory) -> Result<[u8; 32], crate::Error> {
+    fn put(&self, directory: crate::proto::Directory) -> Result<B3Digest, crate::Error> {
         let mut grpc_client = self.grpc_client.clone();
 
         let task = self
@@ -93,29 +91,27 @@ impl DirectoryService for GRPCDirectoryService {
             .spawn(async move { grpc_client.put(tokio_stream::iter(vec![directory])).await });
 
         match self.tokio_handle.block_on(task)? {
-            Ok(put_directory_resp) => Ok(put_directory_resp
-                .into_inner()
-                .root_digest
-                .as_slice()
-                .try_into()
-                .map_err(|_| {
-                    Error::StorageError("invalid root digest length in response".to_string())
-                })?),
+            Ok(put_directory_resp) => Ok(B3Digest::from_vec(
+                put_directory_resp.into_inner().root_digest,
+            )
+            .map_err(|_| {
+                Error::StorageError("invalid root digest length in response".to_string())
+            })?),
             Err(e) => Err(crate::Error::StorageError(e.to_string())),
         }
     }
 
-    #[instrument(skip_all, fields(directory.digest = BASE64.encode(root_directory_digest)))]
-    fn get_recursive(&self, root_directory_digest: &[u8; 32]) -> Self::DirectoriesIterator {
+    #[instrument(skip_all, fields(directory.digest = %root_directory_digest))]
+    fn get_recursive(&self, root_directory_digest: &B3Digest) -> Self::DirectoriesIterator {
         let mut grpc_client = self.grpc_client.clone();
-        let root_directory_digest = root_directory_digest.to_owned();
 
+        let root_directory_digest_as_vec = root_directory_digest.to_vec();
         let task: tokio::task::JoinHandle<Result<Streaming<proto::Directory>, Status>> =
             self.tokio_handle.spawn(async move {
                 let s = grpc_client
                     .get(proto::GetDirectoryRequest {
                         recursive: true,
-                        by_what: Some(ByWhat::Digest(root_directory_digest.to_vec())),
+                        by_what: Some(ByWhat::Digest(root_directory_digest_as_vec)),
                     })
                     .await?
                     .into_inner();
@@ -125,7 +121,11 @@ impl DirectoryService for GRPCDirectoryService {
 
         let stream = self.tokio_handle.block_on(task).unwrap().unwrap();
 
-        StreamIterator::new(self.tokio_handle.clone(), &root_directory_digest, stream)
+        StreamIterator::new(
+            self.tokio_handle.clone(),
+            root_directory_digest.clone(),
+            stream,
+        )
     }
 
     type DirectoryPutter = GRPCPutter;
@@ -159,22 +159,22 @@ pub struct StreamIterator {
     // A stream of [proto::Directory]
     stream: Streaming<proto::Directory>,
     // The Directory digests we received so far
-    received_directory_digests: HashSet<[u8; 32]>,
+    received_directory_digests: HashSet<B3Digest>,
     // The Directory digests we're still expecting to get sent.
-    expected_directory_digests: HashSet<[u8; 32]>,
+    expected_directory_digests: HashSet<B3Digest>,
 }
 
 impl StreamIterator {
     pub fn new(
         tokio_handle: tokio::runtime::Handle,
-        root_digest: &[u8; 32],
+        root_digest: B3Digest,
         stream: Streaming<proto::Directory>,
     ) -> Self {
         Self {
             tokio_handle,
             stream,
             received_directory_digests: HashSet::new(),
-            expected_directory_digests: HashSet::from([*root_digest]),
+            expected_directory_digests: HashSet::from([root_digest]),
         }
     }
 }
@@ -190,7 +190,7 @@ impl Iterator for StreamIterator {
                     if let Err(e) = directory.validate() {
                         return Some(Err(crate::Error::StorageError(format!(
                             "directory {} failed validation: {}",
-                            BASE64.encode(&directory.digest()),
+                            directory.digest(),
                             e,
                         ))));
                     }
@@ -204,16 +204,19 @@ impl Iterator for StreamIterator {
                         // means it once was in expected_directory_digests)
                         return Some(Err(crate::Error::StorageError(format!(
                             "received unexpected directory {}",
-                            BASE64.encode(&directory_digest)
+                            directory_digest
                         ))));
                     }
                     self.received_directory_digests.insert(directory_digest);
 
                     // register all children in expected_directory_digests.
-                    // We ran validate() above, so we know these digests must be correct.
                     for child_directory in &directory.directories {
+                        // We ran validate() above, so we know these digests must be correct.
+                        let child_directory_digest =
+                            B3Digest::from_vec(child_directory.digest.clone()).unwrap();
+
                         self.expected_directory_digests
-                            .insert(child_directory.digest.clone().try_into().unwrap());
+                            .insert(child_directory_digest);
                     }
 
                     Some(Ok(directory))
@@ -294,7 +297,7 @@ impl DirectoryPutter for GRPCPutter {
     }
 
     /// Closes the stream for sending, and returns the value
-    fn close(&mut self) -> Result<[u8; 32], crate::Error> {
+    fn close(&mut self) -> Result<B3Digest, crate::Error> {
         // get self.rq, and replace it with None.
         // This ensures we can only close it once.
         match std::mem::take(&mut self.rq) {
@@ -303,15 +306,15 @@ impl DirectoryPutter for GRPCPutter {
                 // close directory_sender, so blocking on task will finish.
                 drop(directory_sender);
 
-                Ok(self
+                let root_digest = self
                     .tokio_handle
                     .block_on(task)?
                     .map_err(|e| Error::StorageError(e.to_string()))?
-                    .root_digest
-                    .try_into()
-                    .map_err(|_| {
-                        Error::StorageError("invalid root digest length in response".to_string())
-                    })?)
+                    .root_digest;
+
+                B3Digest::from_vec(root_digest).map_err(|_| {
+                    Error::StorageError("invalid root digest length in response".to_string())
+                })
             }
         }
     }