From 7923cc19f698bdd128f93087d203cd6182b21ef2 Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Tue, 5 Sep 2023 15:22:57 +0300 Subject: refactor(tvix/store): use tokio::task::JoinHandle This makes the inside code a bit less verbose. I wasn't able to describe the type of the async move closure itself, which would allow us to remove the JoinHandle<_> type annotation entirely. Change-Id: I06193982a0c7010bd72d3ffa4f760bea1b097632 Reviewed-on: https://cl.tvl.fyi/c/depot/+/9268 Autosubmit: flokli Reviewed-by: tazjin Tested-by: BuildkiteCI --- tvix/store/src/blobservice/grpc.rs | 23 +++++++++---------- tvix/store/src/directoryservice/grpc.rs | 9 ++++---- tvix/store/src/pathinfoservice/grpc.rs | 40 ++++++++++++++++----------------- 3 files changed, 35 insertions(+), 37 deletions(-) diff --git a/tvix/store/src/blobservice/grpc.rs b/tvix/store/src/blobservice/grpc.rs index 71cde35cb21b..c6d28860f8aa 100644 --- a/tvix/store/src/blobservice/grpc.rs +++ b/tvix/store/src/blobservice/grpc.rs @@ -94,16 +94,15 @@ impl BlobService for GRPCBlobService { let mut grpc_client = self.grpc_client.clone(); let digest = digest.clone(); - let task: tokio::task::JoinHandle> = - self.tokio_handle.spawn(async move { - Ok(grpc_client - .stat(proto::StatBlobRequest { - digest: digest.into(), - ..Default::default() - }) - .await? - .into_inner()) - }); + let task: JoinHandle> = self.tokio_handle.spawn(async move { + Ok(grpc_client + .stat(proto::StatBlobRequest { + digest: digest.into(), + ..Default::default() + }) + .await? + .into_inner()) + }); match self.tokio_handle.block_on(task)? { Ok(_blob_meta) => Ok(true), @@ -122,7 +121,7 @@ impl BlobService for GRPCBlobService { // Construct the task that'll send out the request and return the stream // the gRPC client should use to send [proto::BlobChunk], or an error if // the blob doesn't exist. - let task: tokio::task::JoinHandle, Status>> = + let task: JoinHandle, Status>> = self.tokio_handle.spawn(async move { let stream = grpc_client .read(proto::ReadBlobRequest { @@ -172,7 +171,7 @@ impl BlobService for GRPCBlobService { let blobchunk_stream = ReceiverStream::new(rx).map(|x| proto::BlobChunk { data: x }); // That receiver stream is used as a stream in the gRPC BlobService.put rpc call. - let task: tokio::task::JoinHandle> = self + let task: JoinHandle> = self .tokio_handle .spawn(async move { Ok(grpc_client.put(blobchunk_stream).await?.into_inner()) }); diff --git a/tvix/store/src/directoryservice/grpc.rs b/tvix/store/src/directoryservice/grpc.rs index 22805523845d..73d88bb688a3 100644 --- a/tvix/store/src/directoryservice/grpc.rs +++ b/tvix/store/src/directoryservice/grpc.rs @@ -5,6 +5,7 @@ use crate::proto::{self, get_directory_request::ByWhat}; use crate::{B3Digest, Error}; use tokio::net::UnixStream; use tokio::sync::mpsc::UnboundedSender; +use tokio::task::JoinHandle; use tokio_stream::wrappers::UnboundedReceiverStream; use tonic::{transport::Channel, Status}; use tonic::{Code, Streaming}; @@ -162,7 +163,7 @@ impl DirectoryService for GRPCDirectoryService { // clone so we can move it let root_directory_digest_cpy = root_directory_digest.clone(); - let task: tokio::task::JoinHandle, Status>> = + let task: JoinHandle, Status>> = self.tokio_handle.spawn(async move { let s = grpc_client .get(proto::GetDirectoryRequest { @@ -193,7 +194,7 @@ impl DirectoryService for GRPCDirectoryService { let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); - let task: tokio::task::JoinHandle> = + let task: JoinHandle> = self.tokio_handle.spawn(async move { let s = grpc_client .put(UnboundedReceiverStream::new(rx)) @@ -303,7 +304,7 @@ pub struct GRPCPutter { /// The task will yield a [proto::PutDirectoryResponse] once the stream is closed. #[allow(clippy::type_complexity)] // lol rq: Option<( - tokio::task::JoinHandle>, + JoinHandle>, UnboundedSender, )>, } @@ -312,7 +313,7 @@ impl GRPCPutter { pub fn new( tokio_handle: tokio::runtime::Handle, directory_sender: UnboundedSender, - task: tokio::task::JoinHandle>, + task: JoinHandle>, ) -> Self { Self { tokio_handle, diff --git a/tvix/store/src/pathinfoservice/grpc.rs b/tvix/store/src/pathinfoservice/grpc.rs index 2bd766697bd1..1649655b69ea 100644 --- a/tvix/store/src/pathinfoservice/grpc.rs +++ b/tvix/store/src/pathinfoservice/grpc.rs @@ -5,7 +5,7 @@ use crate::{ proto::{self, ListPathInfoRequest}, }; use std::sync::Arc; -use tokio::net::UnixStream; +use tokio::{net::UnixStream, task::JoinHandle}; use tonic::{transport::Channel, Code, Status, Streaming}; /// Connects to a (remote) tvix-store PathInfoService over gRPC. @@ -96,7 +96,7 @@ impl PathInfoService for GRPCPathInfoService { // Get a new handle to the gRPC client. let mut grpc_client = self.grpc_client.clone(); - let task: tokio::task::JoinHandle> = + let task: JoinHandle> = self.tokio_handle.spawn(async move { let path_info = grpc_client .get(proto::GetPathInfoRequest { @@ -121,7 +121,7 @@ impl PathInfoService for GRPCPathInfoService { // Get a new handle to the gRPC client. let mut grpc_client = self.grpc_client.clone(); - let task: tokio::task::JoinHandle> = + let task: JoinHandle> = self.tokio_handle.spawn(async move { let path_info = grpc_client.put(path_info).await?.into_inner(); Ok(path_info) @@ -140,16 +140,15 @@ impl PathInfoService for GRPCPathInfoService { let mut grpc_client = self.grpc_client.clone(); let root_node = root_node.clone(); - let task: tokio::task::JoinHandle> = - self.tokio_handle.spawn(async move { - let path_info = grpc_client - .calculate_nar(proto::Node { - node: Some(root_node), - }) - .await? - .into_inner(); - Ok(path_info) - }); + let task: JoinHandle> = self.tokio_handle.spawn(async move { + let path_info = grpc_client + .calculate_nar(proto::Node { + node: Some(root_node), + }) + .await? + .into_inner(); + Ok(path_info) + }); let resp = self .tokio_handle @@ -169,15 +168,14 @@ impl PathInfoService for GRPCPathInfoService { // Get a new handle to the gRPC client. let mut grpc_client = self.grpc_client.clone(); - let task: tokio::task::JoinHandle> = - self.tokio_handle.spawn(async move { - let s = grpc_client - .list(ListPathInfoRequest::default()) - .await? - .into_inner(); + let task: JoinHandle> = self.tokio_handle.spawn(async move { + let s = grpc_client + .list(ListPathInfoRequest::default()) + .await? + .into_inner(); - Ok(s) - }); + Ok(s) + }); let stream = self.tokio_handle.block_on(task).unwrap().unwrap(); -- cgit 1.4.1