about summary refs log tree commit diff
path: root/tvix/castore/src/blobservice/grpc.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/castore/src/blobservice/grpc.rs')
-rw-r--r--tvix/castore/src/blobservice/grpc.rs23
1 files changed, 8 insertions, 15 deletions
diff --git a/tvix/castore/src/blobservice/grpc.rs b/tvix/castore/src/blobservice/grpc.rs
index 3765dda78012..691cf8529d34 100644
--- a/tvix/castore/src/blobservice/grpc.rs
+++ b/tvix/castore/src/blobservice/grpc.rs
@@ -7,7 +7,7 @@ use tokio::task::JoinHandle;
 use tokio_stream::{wrappers::ReceiverStream, StreamExt};
 use tokio_util::{
     io::{CopyToBytes, SinkWriter},
-    sync::{PollSendError, PollSender},
+    sync::PollSender,
 };
 use tonic::{async_trait, transport::Channel, Code, Status};
 use tracing::instrument;
@@ -79,10 +79,8 @@ impl BlobService for GRPCBlobService {
     }
 
     /// Returns a BlobWriter, that'll internally wrap each write in a
-    // [proto::BlobChunk], which is send to the gRPC server.
+    /// [proto::BlobChunk], which is send to the gRPC server.
     async fn open_write(&self) -> Box<dyn BlobWriter> {
-        let mut grpc_client = self.grpc_client.clone();
-
         // set up an mpsc channel passing around Bytes.
         let (tx, rx) = tokio::sync::mpsc::channel::<bytes::Bytes>(10);
 
@@ -90,20 +88,15 @@ impl BlobService for GRPCBlobService {
         // [proto::BlobChunk], and a [ReceiverStream] is constructed.
         let blobchunk_stream = ReceiverStream::new(rx).map(|x| proto::BlobChunk { data: x });
 
-        // That receiver stream is used as a stream in the gRPC BlobService.put rpc call.
-        let task: JoinHandle<Result<_, Status>> =
-            tokio::spawn(async move { Ok(grpc_client.put(blobchunk_stream).await?.into_inner()) });
+        // spawn the gRPC put request, which will read from blobchunk_stream.
+        let task = tokio::spawn({
+            let mut grpc_client = self.grpc_client.clone();
+            async move { Ok::<_, Status>(grpc_client.put(blobchunk_stream).await?.into_inner()) }
+        });
 
         // The tx part of the channel is converted to a sink of byte chunks.
-
-        // We need to make this a function pointer, not a closure.
-        fn convert_error(_: PollSendError<bytes::Bytes>) -> io::Error {
-            io::Error::from(io::ErrorKind::BrokenPipe)
-        }
-
         let sink = PollSender::new(tx)
-            .sink_map_err(convert_error as fn(PollSendError<bytes::Bytes>) -> io::Error);
-        // We need to explicitly cast here, otherwise rustc does error with "expected fn pointer, found fn item"
+            .sink_map_err(|e| std::io::Error::new(std::io::ErrorKind::BrokenPipe, e));
 
         // … which is turned into an [tokio::io::AsyncWrite].
         let writer = SinkWriter::new(CopyToBytes::new(sink));