about summary refs log tree commit diff
path: root/tvix/castore/src/blobservice/grpc.rs
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2024-01-09T12·37+0200
committerclbot <clbot@tvl.fyi>2024-01-09T14·13+0000
commitb1c556b7e190035e63d7ddf142b7517c2425f806 (patch)
tree91aa865a8562186a988c72d50bbfa9fdf69ef347 /tvix/castore/src/blobservice/grpc.rs
parent89882ff9b13ff1c25fc64605e3fc87ae7b9ab877 (diff)
refactor(tvix/castore/blobservice/grpc): remove fn pointer hack r/7360
It looks like the workaround isn't necessary anymore.

Change-Id: Ifbcef1d631b3f369cac3db25a2c793480043f697
Reviewed-on: https://cl.tvl.fyi/c/depot/+/10583
Autosubmit: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Reviewed-by: raitobezarius <tvl@lahfa.xyz>
Diffstat (limited to 'tvix/castore/src/blobservice/grpc.rs')
-rw-r--r--tvix/castore/src/blobservice/grpc.rs23
1 files changed, 8 insertions, 15 deletions
diff --git a/tvix/castore/src/blobservice/grpc.rs b/tvix/castore/src/blobservice/grpc.rs
index 3765dda78012..691cf8529d34 100644
--- a/tvix/castore/src/blobservice/grpc.rs
+++ b/tvix/castore/src/blobservice/grpc.rs
@@ -7,7 +7,7 @@ use tokio::task::JoinHandle;
 use tokio_stream::{wrappers::ReceiverStream, StreamExt};
 use tokio_util::{
     io::{CopyToBytes, SinkWriter},
-    sync::{PollSendError, PollSender},
+    sync::PollSender,
 };
 use tonic::{async_trait, transport::Channel, Code, Status};
 use tracing::instrument;
@@ -79,10 +79,8 @@ impl BlobService for GRPCBlobService {
     }
 
     /// Returns a BlobWriter, that'll internally wrap each write in a
-    // [proto::BlobChunk], which is send to the gRPC server.
+    /// [proto::BlobChunk], which is send to the gRPC server.
     async fn open_write(&self) -> Box<dyn BlobWriter> {
-        let mut grpc_client = self.grpc_client.clone();
-
         // set up an mpsc channel passing around Bytes.
         let (tx, rx) = tokio::sync::mpsc::channel::<bytes::Bytes>(10);
 
@@ -90,20 +88,15 @@ impl BlobService for GRPCBlobService {
         // [proto::BlobChunk], and a [ReceiverStream] is constructed.
         let blobchunk_stream = ReceiverStream::new(rx).map(|x| proto::BlobChunk { data: x });
 
-        // That receiver stream is used as a stream in the gRPC BlobService.put rpc call.
-        let task: JoinHandle<Result<_, Status>> =
-            tokio::spawn(async move { Ok(grpc_client.put(blobchunk_stream).await?.into_inner()) });
+        // spawn the gRPC put request, which will read from blobchunk_stream.
+        let task = tokio::spawn({
+            let mut grpc_client = self.grpc_client.clone();
+            async move { Ok::<_, Status>(grpc_client.put(blobchunk_stream).await?.into_inner()) }
+        });
 
         // The tx part of the channel is converted to a sink of byte chunks.
-
-        // We need to make this a function pointer, not a closure.
-        fn convert_error(_: PollSendError<bytes::Bytes>) -> io::Error {
-            io::Error::from(io::ErrorKind::BrokenPipe)
-        }
-
         let sink = PollSender::new(tx)
-            .sink_map_err(convert_error as fn(PollSendError<bytes::Bytes>) -> io::Error);
-        // We need to explicitly cast here, otherwise rustc does error with "expected fn pointer, found fn item"
+            .sink_map_err(|e| std::io::Error::new(std::io::ErrorKind::BrokenPipe, e));
 
         // … which is turned into an [tokio::io::AsyncWrite].
         let writer = SinkWriter::new(CopyToBytes::new(sink));