diff options
Diffstat (limited to 'tvix/store/src/blobservice/grpc.rs')
-rw-r--r-- | tvix/store/src/blobservice/grpc.rs | 25 |
1 files changed, 12 insertions, 13 deletions
diff --git a/tvix/store/src/blobservice/grpc.rs b/tvix/store/src/blobservice/grpc.rs index a50c94faf5e9..9c07345839e3 100644 --- a/tvix/store/src/blobservice/grpc.rs +++ b/tvix/store/src/blobservice/grpc.rs @@ -1,4 +1,5 @@ -use data_encoding::BASE64; +use super::{BlobService, BlobWriter}; +use crate::{proto, B3Digest}; use futures::sink::{SinkExt, SinkMapErr}; use std::{collections::VecDeque, io}; use tokio::task::JoinHandle; @@ -10,9 +11,6 @@ use tokio_util::{ use tonic::{transport::Channel, Code, Status, Streaming}; use tracing::instrument; -use super::{BlobService, BlobWriter}; -use crate::proto; - /// Connects to a (remote) tvix-store BlobService over gRPC. #[derive(Clone)] pub struct GRPCBlobService { @@ -30,11 +28,11 @@ impl BlobService for GRPCBlobService { type BlobReader = Box<dyn io::Read + Send>; type BlobWriter = GRPCBlobWriter; - #[instrument(skip(self, digest), fields(blob.digest=BASE64.encode(digest)))] - fn has(&self, digest: &[u8; 32]) -> Result<bool, crate::Error> { + #[instrument(skip(self, digest), fields(blob.digest=%digest))] + fn has(&self, digest: &B3Digest) -> Result<bool, crate::Error> { // Get a new handle to the gRPC client, and copy the digest. let mut grpc_client = self.grpc_client.clone(); - let digest = digest.to_owned(); + let digest = digest.clone(); let task: tokio::task::JoinHandle<Result<_, Status>> = self.tokio_handle.spawn(async move { @@ -56,10 +54,10 @@ impl BlobService for GRPCBlobService { // On success, this returns a Ok(Some(io::Read)), which can be used to read // the contents of the Blob, identified by the digest. - fn open_read(&self, digest: &[u8; 32]) -> Result<Option<Self::BlobReader>, crate::Error> { + fn open_read(&self, digest: &B3Digest) -> Result<Option<Self::BlobReader>, crate::Error> { // Get a new handle to the gRPC client, and copy the digest. let mut grpc_client = self.grpc_client.clone(); - let digest = digest.to_owned(); + let digest = digest.clone(); // Construct the task that'll send out the request and return the stream // the gRPC client should use to send [proto::BlobChunk], or an error if @@ -68,7 +66,7 @@ impl BlobService for GRPCBlobService { self.tokio_handle.spawn(async move { let stream = grpc_client .read(proto::ReadBlobRequest { - digest: digest.into(), + digest: digest.to_vec(), }) .await? .into_inner(); @@ -164,7 +162,7 @@ pub struct GRPCBlobWriter { } impl BlobWriter for GRPCBlobWriter { - fn close(mut self) -> Result<[u8; 32], crate::Error> { + fn close(mut self) -> Result<B3Digest, crate::Error> { // invoke shutdown, so the inner writer closes its internal tx side of // the channel. self.inner_writer @@ -177,8 +175,9 @@ impl BlobWriter for GRPCBlobWriter { match self.tokio_handle.block_on(self.task)? { Ok(resp) => { // return the digest from the response. - let digest: Vec<u8> = resp.digest; - Ok(digest.try_into().unwrap()) + B3Digest::from_vec(resp.digest).map_err(|_| { + crate::Error::StorageError("invalid root digest length in response".to_string()) + }) } Err(e) => Err(crate::Error::StorageError(e.to_string())), } |