From b1c556b7e190035e63d7ddf142b7517c2425f806 Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Tue, 9 Jan 2024 14:37:15 +0200 Subject: refactor(tvix/castore/blobservice/grpc): remove fn pointer hack It looks like the workaround isn't necessary anymore. Change-Id: Ifbcef1d631b3f369cac3db25a2c793480043f697 Reviewed-on: https://cl.tvl.fyi/c/depot/+/10583 Autosubmit: flokli Tested-by: BuildkiteCI Reviewed-by: raitobezarius --- tvix/castore/src/blobservice/grpc.rs | 23 ++++++++--------------- 1 file changed, 8 insertions(+), 15 deletions(-) (limited to 'tvix/castore') diff --git a/tvix/castore/src/blobservice/grpc.rs b/tvix/castore/src/blobservice/grpc.rs index 3765dda78012..691cf8529d34 100644 --- a/tvix/castore/src/blobservice/grpc.rs +++ b/tvix/castore/src/blobservice/grpc.rs @@ -7,7 +7,7 @@ use tokio::task::JoinHandle; use tokio_stream::{wrappers::ReceiverStream, StreamExt}; use tokio_util::{ io::{CopyToBytes, SinkWriter}, - sync::{PollSendError, PollSender}, + sync::PollSender, }; use tonic::{async_trait, transport::Channel, Code, Status}; use tracing::instrument; @@ -79,10 +79,8 @@ impl BlobService for GRPCBlobService { } /// Returns a BlobWriter, that'll internally wrap each write in a - // [proto::BlobChunk], which is send to the gRPC server. + /// [proto::BlobChunk], which is send to the gRPC server. async fn open_write(&self) -> Box { - let mut grpc_client = self.grpc_client.clone(); - // set up an mpsc channel passing around Bytes. let (tx, rx) = tokio::sync::mpsc::channel::(10); @@ -90,20 +88,15 @@ impl BlobService for GRPCBlobService { // [proto::BlobChunk], and a [ReceiverStream] is constructed. let blobchunk_stream = ReceiverStream::new(rx).map(|x| proto::BlobChunk { data: x }); - // That receiver stream is used as a stream in the gRPC BlobService.put rpc call. - let task: JoinHandle> = - tokio::spawn(async move { Ok(grpc_client.put(blobchunk_stream).await?.into_inner()) }); + // spawn the gRPC put request, which will read from blobchunk_stream. + let task = tokio::spawn({ + let mut grpc_client = self.grpc_client.clone(); + async move { Ok::<_, Status>(grpc_client.put(blobchunk_stream).await?.into_inner()) } + }); // The tx part of the channel is converted to a sink of byte chunks. - - // We need to make this a function pointer, not a closure. - fn convert_error(_: PollSendError) -> io::Error { - io::Error::from(io::ErrorKind::BrokenPipe) - } - let sink = PollSender::new(tx) - .sink_map_err(convert_error as fn(PollSendError) -> io::Error); - // We need to explicitly cast here, otherwise rustc does error with "expected fn pointer, found fn item" + .sink_map_err(|e| std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)); // … which is turned into an [tokio::io::AsyncWrite]. let writer = SinkWriter::new(CopyToBytes::new(sink)); -- cgit 1.4.1