From 5ad5a0da00622beb713fb85520821212416b02f9 Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Fri, 2 Feb 2024 17:45:33 +0200 Subject: refactor(tvix/castore/grpc/blobsvc): inline stream_mapper This can be written without the additional function. Change-Id: Ib11c5d5254d3e44c8fa9661414835b0622eb1ac4 Reviewed-on: https://cl.tvl.fyi/c/depot/+/10735 Reviewed-by: Connor Brewster Autosubmit: flokli Tested-by: BuildkiteCI --- tvix/castore/src/proto/grpc_blobservice_wrapper.rs | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) (limited to 'tvix') diff --git a/tvix/castore/src/proto/grpc_blobservice_wrapper.rs b/tvix/castore/src/proto/grpc_blobservice_wrapper.rs index f8c2341689..33f9a73ea4 100644 --- a/tvix/castore/src/proto/grpc_blobservice_wrapper.rs +++ b/tvix/castore/src/proto/grpc_blobservice_wrapper.rs @@ -3,7 +3,6 @@ use core::pin::pin; use futures::{stream::BoxStream, TryFutureExt}; use std::{ collections::VecDeque, - io, ops::{Deref, DerefMut}, }; use tokio_stream::StreamExt; @@ -118,17 +117,9 @@ where .map_err(|_e| Status::invalid_argument("invalid digest length"))?; match self.blob_service.open_read(&req_digest).await { - Ok(Some(reader)) => { - fn stream_mapper( - x: Result, - ) -> Result { - match x { - Ok(bytes) => Ok(super::BlobChunk { data: bytes }), - Err(e) => Err(Status::from(e)), - } - } - - let chunks_stream = ReaderStream::new(reader).map(stream_mapper); + Ok(Some(r)) => { + let chunks_stream = + ReaderStream::new(r).map(|chunk| Ok(super::BlobChunk { data: chunk? })); Ok(Response::new(Box::pin(chunks_stream))) } Ok(None) => Err(Status::not_found(format!("blob {} not found", &req_digest))), -- cgit 1.4.1