diff options
author | Florian Klink <flokli@flokli.de> | 2024-01-09T12·37+0200 |
---|---|---|
committer | clbot <clbot@tvl.fyi> | 2024-01-09T14·13+0000 |
commit | b1c556b7e190035e63d7ddf142b7517c2425f806 (patch) | |
tree | 91aa865a8562186a988c72d50bbfa9fdf69ef347 /tvix/castore/src/blobservice | |
parent | 89882ff9b13ff1c25fc64605e3fc87ae7b9ab877 (diff) |
refactor(tvix/castore/blobservice/grpc): remove fn pointer hack r/7360
It looks like the workaround isn't necessary anymore. Change-Id: Ifbcef1d631b3f369cac3db25a2c793480043f697 Reviewed-on: https://cl.tvl.fyi/c/depot/+/10583 Autosubmit: flokli <flokli@flokli.de> Tested-by: BuildkiteCI Reviewed-by: raitobezarius <tvl@lahfa.xyz>
Diffstat (limited to 'tvix/castore/src/blobservice')
-rw-r--r-- | tvix/castore/src/blobservice/grpc.rs | 23 |
1 files changed, 8 insertions, 15 deletions
diff --git a/tvix/castore/src/blobservice/grpc.rs b/tvix/castore/src/blobservice/grpc.rs index 3765dda78012..691cf8529d34 100644 --- a/tvix/castore/src/blobservice/grpc.rs +++ b/tvix/castore/src/blobservice/grpc.rs @@ -7,7 +7,7 @@ use tokio::task::JoinHandle; use tokio_stream::{wrappers::ReceiverStream, StreamExt}; use tokio_util::{ io::{CopyToBytes, SinkWriter}, - sync::{PollSendError, PollSender}, + sync::PollSender, }; use tonic::{async_trait, transport::Channel, Code, Status}; use tracing::instrument; @@ -79,10 +79,8 @@ impl BlobService for GRPCBlobService { } /// Returns a BlobWriter, that'll internally wrap each write in a - // [proto::BlobChunk], which is send to the gRPC server. + /// [proto::BlobChunk], which is send to the gRPC server. async fn open_write(&self) -> Box<dyn BlobWriter> { - let mut grpc_client = self.grpc_client.clone(); - // set up an mpsc channel passing around Bytes. let (tx, rx) = tokio::sync::mpsc::channel::<bytes::Bytes>(10); @@ -90,20 +88,15 @@ impl BlobService for GRPCBlobService { // [proto::BlobChunk], and a [ReceiverStream] is constructed. let blobchunk_stream = ReceiverStream::new(rx).map(|x| proto::BlobChunk { data: x }); - // That receiver stream is used as a stream in the gRPC BlobService.put rpc call. - let task: JoinHandle<Result<_, Status>> = - tokio::spawn(async move { Ok(grpc_client.put(blobchunk_stream).await?.into_inner()) }); + // spawn the gRPC put request, which will read from blobchunk_stream. + let task = tokio::spawn({ + let mut grpc_client = self.grpc_client.clone(); + async move { Ok::<_, Status>(grpc_client.put(blobchunk_stream).await?.into_inner()) } + }); // The tx part of the channel is converted to a sink of byte chunks. - - // We need to make this a function pointer, not a closure. - fn convert_error(_: PollSendError<bytes::Bytes>) -> io::Error { - io::Error::from(io::ErrorKind::BrokenPipe) - } - let sink = PollSender::new(tx) - .sink_map_err(convert_error as fn(PollSendError<bytes::Bytes>) -> io::Error); - // We need to explicitly cast here, otherwise rustc does error with "expected fn pointer, found fn item" + .sink_map_err(|e| std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)); // … which is turned into an [tokio::io::AsyncWrite]. let writer = SinkWriter::new(CopyToBytes::new(sink)); |