diff options
Diffstat (limited to 'tvix/castore/src/blobservice/grpc.rs')
-rw-r--r-- | tvix/castore/src/blobservice/grpc.rs | 23 |
1 files changed, 8 insertions, 15 deletions
diff --git a/tvix/castore/src/blobservice/grpc.rs b/tvix/castore/src/blobservice/grpc.rs index 3765dda78012..691cf8529d34 100644 --- a/tvix/castore/src/blobservice/grpc.rs +++ b/tvix/castore/src/blobservice/grpc.rs @@ -7,7 +7,7 @@ use tokio::task::JoinHandle; use tokio_stream::{wrappers::ReceiverStream, StreamExt}; use tokio_util::{ io::{CopyToBytes, SinkWriter}, - sync::{PollSendError, PollSender}, + sync::PollSender, }; use tonic::{async_trait, transport::Channel, Code, Status}; use tracing::instrument; @@ -79,10 +79,8 @@ impl BlobService for GRPCBlobService { } /// Returns a BlobWriter, that'll internally wrap each write in a - // [proto::BlobChunk], which is send to the gRPC server. + /// [proto::BlobChunk], which is send to the gRPC server. async fn open_write(&self) -> Box<dyn BlobWriter> { - let mut grpc_client = self.grpc_client.clone(); - // set up an mpsc channel passing around Bytes. let (tx, rx) = tokio::sync::mpsc::channel::<bytes::Bytes>(10); @@ -90,20 +88,15 @@ impl BlobService for GRPCBlobService { // [proto::BlobChunk], and a [ReceiverStream] is constructed. let blobchunk_stream = ReceiverStream::new(rx).map(|x| proto::BlobChunk { data: x }); - // That receiver stream is used as a stream in the gRPC BlobService.put rpc call. - let task: JoinHandle<Result<_, Status>> = - tokio::spawn(async move { Ok(grpc_client.put(blobchunk_stream).await?.into_inner()) }); + // spawn the gRPC put request, which will read from blobchunk_stream. + let task = tokio::spawn({ + let mut grpc_client = self.grpc_client.clone(); + async move { Ok::<_, Status>(grpc_client.put(blobchunk_stream).await?.into_inner()) } + }); // The tx part of the channel is converted to a sink of byte chunks. - - // We need to make this a function pointer, not a closure. - fn convert_error(_: PollSendError<bytes::Bytes>) -> io::Error { - io::Error::from(io::ErrorKind::BrokenPipe) - } - let sink = PollSender::new(tx) - .sink_map_err(convert_error as fn(PollSendError<bytes::Bytes>) -> io::Error); - // We need to explicitly cast here, otherwise rustc does error with "expected fn pointer, found fn item" + .sink_map_err(|e| std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)); // … which is turned into an [tokio::io::AsyncWrite]. let writer = SinkWriter::new(CopyToBytes::new(sink)); |