about summary refs log tree commit diff
path: root/tvix/castore/src/proto/grpc_blobservice_wrapper.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/castore/src/proto/grpc_blobservice_wrapper.rs')
-rw-r--r--tvix/castore/src/proto/grpc_blobservice_wrapper.rs15
1 files changed, 3 insertions, 12 deletions
diff --git a/tvix/castore/src/proto/grpc_blobservice_wrapper.rs b/tvix/castore/src/proto/grpc_blobservice_wrapper.rs
index f8c2341689..33f9a73ea4 100644
--- a/tvix/castore/src/proto/grpc_blobservice_wrapper.rs
+++ b/tvix/castore/src/proto/grpc_blobservice_wrapper.rs
@@ -3,7 +3,6 @@ use core::pin::pin;
 use futures::{stream::BoxStream, TryFutureExt};
 use std::{
     collections::VecDeque,
-    io,
     ops::{Deref, DerefMut},
 };
 use tokio_stream::StreamExt;
@@ -118,17 +117,9 @@ where
             .map_err(|_e| Status::invalid_argument("invalid digest length"))?;
 
         match self.blob_service.open_read(&req_digest).await {
-            Ok(Some(reader)) => {
-                fn stream_mapper(
-                    x: Result<bytes::Bytes, io::Error>,
-                ) -> Result<super::BlobChunk, Status> {
-                    match x {
-                        Ok(bytes) => Ok(super::BlobChunk { data: bytes }),
-                        Err(e) => Err(Status::from(e)),
-                    }
-                }
-
-                let chunks_stream = ReaderStream::new(reader).map(stream_mapper);
+            Ok(Some(r)) => {
+                let chunks_stream =
+                    ReaderStream::new(r).map(|chunk| Ok(super::BlobChunk { data: chunk? }));
                 Ok(Response::new(Box::pin(chunks_stream)))
             }
             Ok(None) => Err(Status::not_found(format!("blob {} not found", &req_digest))),