diff options
Diffstat (limited to 'tvix/castore/src/proto/grpc_blobservice_wrapper.rs')
-rw-r--r-- | tvix/castore/src/proto/grpc_blobservice_wrapper.rs | 15 |
1 files changed, 3 insertions, 12 deletions
diff --git a/tvix/castore/src/proto/grpc_blobservice_wrapper.rs b/tvix/castore/src/proto/grpc_blobservice_wrapper.rs index f8c2341689c6..33f9a73ea431 100644 --- a/tvix/castore/src/proto/grpc_blobservice_wrapper.rs +++ b/tvix/castore/src/proto/grpc_blobservice_wrapper.rs @@ -3,7 +3,6 @@ use core::pin::pin; use futures::{stream::BoxStream, TryFutureExt}; use std::{ collections::VecDeque, - io, ops::{Deref, DerefMut}, }; use tokio_stream::StreamExt; @@ -118,17 +117,9 @@ where .map_err(|_e| Status::invalid_argument("invalid digest length"))?; match self.blob_service.open_read(&req_digest).await { - Ok(Some(reader)) => { - fn stream_mapper( - x: Result<bytes::Bytes, io::Error>, - ) -> Result<super::BlobChunk, Status> { - match x { - Ok(bytes) => Ok(super::BlobChunk { data: bytes }), - Err(e) => Err(Status::from(e)), - } - } - - let chunks_stream = ReaderStream::new(reader).map(stream_mapper); + Ok(Some(r)) => { + let chunks_stream = + ReaderStream::new(r).map(|chunk| Ok(super::BlobChunk { data: chunk? })); Ok(Response::new(Box::pin(chunks_stream))) } Ok(None) => Err(Status::not_found(format!("blob {} not found", &req_digest))), |