about summary refs log tree commit diff
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2023-09-05T12·22+0300
committerclbot <clbot@tvl.fyi>2023-09-05T21·13+0000
commit7923cc19f698bdd128f93087d203cd6182b21ef2 (patch)
tree572174cd040a106e9fb5f25feb6ef4eae6b36b07
parentf499d2e031c100b6e1af53c8d77c045667ec1909 (diff)
refactor(tvix/store): use tokio::task::JoinHandle r/6558
This makes the inside code a bit less verbose.

I wasn't able to describe the type of the async move closure itself,
which would allow us to remove the JoinHandle<_> type annotation
entirely.

Change-Id: I06193982a0c7010bd72d3ffa4f760bea1b097632
Reviewed-on: https://cl.tvl.fyi/c/depot/+/9268
Autosubmit: flokli <flokli@flokli.de>
Reviewed-by: tazjin <tazjin@tvl.su>
Tested-by: BuildkiteCI
-rw-r--r--tvix/store/src/blobservice/grpc.rs23
-rw-r--r--tvix/store/src/directoryservice/grpc.rs9
-rw-r--r--tvix/store/src/pathinfoservice/grpc.rs40
3 files changed, 35 insertions, 37 deletions
diff --git a/tvix/store/src/blobservice/grpc.rs b/tvix/store/src/blobservice/grpc.rs
index 71cde35cb2..c6d28860f8 100644
--- a/tvix/store/src/blobservice/grpc.rs
+++ b/tvix/store/src/blobservice/grpc.rs
@@ -94,16 +94,15 @@ impl BlobService for GRPCBlobService {
         let mut grpc_client = self.grpc_client.clone();
         let digest = digest.clone();
 
-        let task: tokio::task::JoinHandle<Result<_, Status>> =
-            self.tokio_handle.spawn(async move {
-                Ok(grpc_client
-                    .stat(proto::StatBlobRequest {
-                        digest: digest.into(),
-                        ..Default::default()
-                    })
-                    .await?
-                    .into_inner())
-            });
+        let task: JoinHandle<Result<_, Status>> = self.tokio_handle.spawn(async move {
+            Ok(grpc_client
+                .stat(proto::StatBlobRequest {
+                    digest: digest.into(),
+                    ..Default::default()
+                })
+                .await?
+                .into_inner())
+        });
 
         match self.tokio_handle.block_on(task)? {
             Ok(_blob_meta) => Ok(true),
@@ -122,7 +121,7 @@ impl BlobService for GRPCBlobService {
         // Construct the task that'll send out the request and return the stream
         // the gRPC client should use to send [proto::BlobChunk], or an error if
         // the blob doesn't exist.
-        let task: tokio::task::JoinHandle<Result<Streaming<proto::BlobChunk>, Status>> =
+        let task: JoinHandle<Result<Streaming<proto::BlobChunk>, Status>> =
             self.tokio_handle.spawn(async move {
                 let stream = grpc_client
                     .read(proto::ReadBlobRequest {
@@ -172,7 +171,7 @@ impl BlobService for GRPCBlobService {
         let blobchunk_stream = ReceiverStream::new(rx).map(|x| proto::BlobChunk { data: x });
 
         // That receiver stream is used as a stream in the gRPC BlobService.put rpc call.
-        let task: tokio::task::JoinHandle<Result<_, Status>> = self
+        let task: JoinHandle<Result<_, Status>> = self
             .tokio_handle
             .spawn(async move { Ok(grpc_client.put(blobchunk_stream).await?.into_inner()) });
 
diff --git a/tvix/store/src/directoryservice/grpc.rs b/tvix/store/src/directoryservice/grpc.rs
index 2280552384..73d88bb688 100644
--- a/tvix/store/src/directoryservice/grpc.rs
+++ b/tvix/store/src/directoryservice/grpc.rs
@@ -5,6 +5,7 @@ use crate::proto::{self, get_directory_request::ByWhat};
 use crate::{B3Digest, Error};
 use tokio::net::UnixStream;
 use tokio::sync::mpsc::UnboundedSender;
+use tokio::task::JoinHandle;
 use tokio_stream::wrappers::UnboundedReceiverStream;
 use tonic::{transport::Channel, Status};
 use tonic::{Code, Streaming};
@@ -162,7 +163,7 @@ impl DirectoryService for GRPCDirectoryService {
         // clone so we can move it
         let root_directory_digest_cpy = root_directory_digest.clone();
 
-        let task: tokio::task::JoinHandle<Result<Streaming<proto::Directory>, Status>> =
+        let task: JoinHandle<Result<Streaming<proto::Directory>, Status>> =
             self.tokio_handle.spawn(async move {
                 let s = grpc_client
                     .get(proto::GetDirectoryRequest {
@@ -193,7 +194,7 @@ impl DirectoryService for GRPCDirectoryService {
 
         let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
 
-        let task: tokio::task::JoinHandle<Result<proto::PutDirectoryResponse, Status>> =
+        let task: JoinHandle<Result<proto::PutDirectoryResponse, Status>> =
             self.tokio_handle.spawn(async move {
                 let s = grpc_client
                     .put(UnboundedReceiverStream::new(rx))
@@ -303,7 +304,7 @@ pub struct GRPCPutter {
     /// The task will yield a [proto::PutDirectoryResponse] once the stream is closed.
     #[allow(clippy::type_complexity)] // lol
     rq: Option<(
-        tokio::task::JoinHandle<Result<proto::PutDirectoryResponse, Status>>,
+        JoinHandle<Result<proto::PutDirectoryResponse, Status>>,
         UnboundedSender<proto::Directory>,
     )>,
 }
@@ -312,7 +313,7 @@ impl GRPCPutter {
     pub fn new(
         tokio_handle: tokio::runtime::Handle,
         directory_sender: UnboundedSender<proto::Directory>,
-        task: tokio::task::JoinHandle<Result<proto::PutDirectoryResponse, Status>>,
+        task: JoinHandle<Result<proto::PutDirectoryResponse, Status>>,
     ) -> Self {
         Self {
             tokio_handle,
diff --git a/tvix/store/src/pathinfoservice/grpc.rs b/tvix/store/src/pathinfoservice/grpc.rs
index 2bd766697b..1649655b69 100644
--- a/tvix/store/src/pathinfoservice/grpc.rs
+++ b/tvix/store/src/pathinfoservice/grpc.rs
@@ -5,7 +5,7 @@ use crate::{
     proto::{self, ListPathInfoRequest},
 };
 use std::sync::Arc;
-use tokio::net::UnixStream;
+use tokio::{net::UnixStream, task::JoinHandle};
 use tonic::{transport::Channel, Code, Status, Streaming};
 
 /// Connects to a (remote) tvix-store PathInfoService over gRPC.
@@ -96,7 +96,7 @@ impl PathInfoService for GRPCPathInfoService {
         // Get a new handle to the gRPC client.
         let mut grpc_client = self.grpc_client.clone();
 
-        let task: tokio::task::JoinHandle<Result<proto::PathInfo, Status>> =
+        let task: JoinHandle<Result<proto::PathInfo, Status>> =
             self.tokio_handle.spawn(async move {
                 let path_info = grpc_client
                     .get(proto::GetPathInfoRequest {
@@ -121,7 +121,7 @@ impl PathInfoService for GRPCPathInfoService {
         // Get a new handle to the gRPC client.
         let mut grpc_client = self.grpc_client.clone();
 
-        let task: tokio::task::JoinHandle<Result<proto::PathInfo, Status>> =
+        let task: JoinHandle<Result<proto::PathInfo, Status>> =
             self.tokio_handle.spawn(async move {
                 let path_info = grpc_client.put(path_info).await?.into_inner();
                 Ok(path_info)
@@ -140,16 +140,15 @@ impl PathInfoService for GRPCPathInfoService {
         let mut grpc_client = self.grpc_client.clone();
         let root_node = root_node.clone();
 
-        let task: tokio::task::JoinHandle<Result<_, Status>> =
-            self.tokio_handle.spawn(async move {
-                let path_info = grpc_client
-                    .calculate_nar(proto::Node {
-                        node: Some(root_node),
-                    })
-                    .await?
-                    .into_inner();
-                Ok(path_info)
-            });
+        let task: JoinHandle<Result<_, Status>> = self.tokio_handle.spawn(async move {
+            let path_info = grpc_client
+                .calculate_nar(proto::Node {
+                    node: Some(root_node),
+                })
+                .await?
+                .into_inner();
+            Ok(path_info)
+        });
 
         let resp = self
             .tokio_handle
@@ -169,15 +168,14 @@ impl PathInfoService for GRPCPathInfoService {
         // Get a new handle to the gRPC client.
         let mut grpc_client = self.grpc_client.clone();
 
-        let task: tokio::task::JoinHandle<Result<_, Status>> =
-            self.tokio_handle.spawn(async move {
-                let s = grpc_client
-                    .list(ListPathInfoRequest::default())
-                    .await?
-                    .into_inner();
+        let task: JoinHandle<Result<_, Status>> = self.tokio_handle.spawn(async move {
+            let s = grpc_client
+                .list(ListPathInfoRequest::default())
+                .await?
+                .into_inner();
 
-                Ok(s)
-            });
+            Ok(s)
+        });
 
         let stream = self.tokio_handle.block_on(task).unwrap().unwrap();