about summary refs log tree commit diff
path: root/tvix/castore/src/proto/grpc_blobservice_wrapper.rs
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2024-02-02T15·45+0200
committerclbot <clbot@tvl.fyi>2024-02-02T16·25+0000
commit5ad5a0da00622beb713fb85520821212416b02f9 (patch)
tree82a164f06bdf06ddf69507052fbca20172b91314 /tvix/castore/src/proto/grpc_blobservice_wrapper.rs
parent1157eea71045ad0e7d40946a784b5e7d190671ce (diff)
refactor(tvix/castore/grpc/blobsvc): inline stream_mapper r/7469
This can be written without the additional function.

Change-Id: Ib11c5d5254d3e44c8fa9661414835b0622eb1ac4
Reviewed-on: https://cl.tvl.fyi/c/depot/+/10735
Reviewed-by: Connor Brewster <cbrewster@hey.com>
Autosubmit: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Diffstat (limited to 'tvix/castore/src/proto/grpc_blobservice_wrapper.rs')
-rw-r--r--tvix/castore/src/proto/grpc_blobservice_wrapper.rs15
1 files changed, 3 insertions, 12 deletions
diff --git a/tvix/castore/src/proto/grpc_blobservice_wrapper.rs b/tvix/castore/src/proto/grpc_blobservice_wrapper.rs
index f8c2341689c6..33f9a73ea431 100644
--- a/tvix/castore/src/proto/grpc_blobservice_wrapper.rs
+++ b/tvix/castore/src/proto/grpc_blobservice_wrapper.rs
@@ -3,7 +3,6 @@ use core::pin::pin;
 use futures::{stream::BoxStream, TryFutureExt};
 use std::{
     collections::VecDeque,
-    io,
     ops::{Deref, DerefMut},
 };
 use tokio_stream::StreamExt;
@@ -118,17 +117,9 @@ where
             .map_err(|_e| Status::invalid_argument("invalid digest length"))?;
 
         match self.blob_service.open_read(&req_digest).await {
-            Ok(Some(reader)) => {
-                fn stream_mapper(
-                    x: Result<bytes::Bytes, io::Error>,
-                ) -> Result<super::BlobChunk, Status> {
-                    match x {
-                        Ok(bytes) => Ok(super::BlobChunk { data: bytes }),
-                        Err(e) => Err(Status::from(e)),
-                    }
-                }
-
-                let chunks_stream = ReaderStream::new(reader).map(stream_mapper);
+            Ok(Some(r)) => {
+                let chunks_stream =
+                    ReaderStream::new(r).map(|chunk| Ok(super::BlobChunk { data: chunk? }));
                 Ok(Response::new(Box::pin(chunks_stream)))
             }
             Ok(None) => Err(Status::not_found(format!("blob {} not found", &req_digest))),