about summary refs log tree commit diff
path: root/tvix/store/src/blobservice/grpc.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/store/src/blobservice/grpc.rs')
-rw-r--r--tvix/store/src/blobservice/grpc.rs25
1 files changed, 12 insertions, 13 deletions
diff --git a/tvix/store/src/blobservice/grpc.rs b/tvix/store/src/blobservice/grpc.rs
index a50c94faf5..9c07345839 100644
--- a/tvix/store/src/blobservice/grpc.rs
+++ b/tvix/store/src/blobservice/grpc.rs
@@ -1,4 +1,5 @@
-use data_encoding::BASE64;
+use super::{BlobService, BlobWriter};
+use crate::{proto, B3Digest};
 use futures::sink::{SinkExt, SinkMapErr};
 use std::{collections::VecDeque, io};
 use tokio::task::JoinHandle;
@@ -10,9 +11,6 @@ use tokio_util::{
 use tonic::{transport::Channel, Code, Status, Streaming};
 use tracing::instrument;
 
-use super::{BlobService, BlobWriter};
-use crate::proto;
-
 /// Connects to a (remote) tvix-store BlobService over gRPC.
 #[derive(Clone)]
 pub struct GRPCBlobService {
@@ -30,11 +28,11 @@ impl BlobService for GRPCBlobService {
     type BlobReader = Box<dyn io::Read + Send>;
     type BlobWriter = GRPCBlobWriter;
 
-    #[instrument(skip(self, digest), fields(blob.digest=BASE64.encode(digest)))]
-    fn has(&self, digest: &[u8; 32]) -> Result<bool, crate::Error> {
+    #[instrument(skip(self, digest), fields(blob.digest=%digest))]
+    fn has(&self, digest: &B3Digest) -> Result<bool, crate::Error> {
         // Get a new handle to the gRPC client, and copy the digest.
         let mut grpc_client = self.grpc_client.clone();
-        let digest = digest.to_owned();
+        let digest = digest.clone();
 
         let task: tokio::task::JoinHandle<Result<_, Status>> =
             self.tokio_handle.spawn(async move {
@@ -56,10 +54,10 @@ impl BlobService for GRPCBlobService {
 
     // On success, this returns a Ok(Some(io::Read)), which can be used to read
     // the contents of the Blob, identified by the digest.
-    fn open_read(&self, digest: &[u8; 32]) -> Result<Option<Self::BlobReader>, crate::Error> {
+    fn open_read(&self, digest: &B3Digest) -> Result<Option<Self::BlobReader>, crate::Error> {
         // Get a new handle to the gRPC client, and copy the digest.
         let mut grpc_client = self.grpc_client.clone();
-        let digest = digest.to_owned();
+        let digest = digest.clone();
 
         // Construct the task that'll send out the request and return the stream
         // the gRPC client should use to send [proto::BlobChunk], or an error if
@@ -68,7 +66,7 @@ impl BlobService for GRPCBlobService {
             self.tokio_handle.spawn(async move {
                 let stream = grpc_client
                     .read(proto::ReadBlobRequest {
-                        digest: digest.into(),
+                        digest: digest.to_vec(),
                     })
                     .await?
                     .into_inner();
@@ -164,7 +162,7 @@ pub struct GRPCBlobWriter {
 }
 
 impl BlobWriter for GRPCBlobWriter {
-    fn close(mut self) -> Result<[u8; 32], crate::Error> {
+    fn close(mut self) -> Result<B3Digest, crate::Error> {
         // invoke shutdown, so the inner writer closes its internal tx side of
         // the channel.
         self.inner_writer
@@ -177,8 +175,9 @@ impl BlobWriter for GRPCBlobWriter {
         match self.tokio_handle.block_on(self.task)? {
             Ok(resp) => {
                 // return the digest from the response.
-                let digest: Vec<u8> = resp.digest;
-                Ok(digest.try_into().unwrap())
+                B3Digest::from_vec(resp.digest).map_err(|_| {
+                    crate::Error::StorageError("invalid root digest length in response".to_string())
+                })
             }
             Err(e) => Err(crate::Error::StorageError(e.to_string())),
         }