From 37a348b4fae16b2b1c5ec12deaa085a049833d7f Mon Sep 17 00:00:00 2001 From: Connor Brewster Date: Tue, 19 Sep 2023 11:46:41 -0500 Subject: refactor(tvix/store): Asyncify PathInfoService and DirectoryService We've decided to asyncify all of the services to reduce some of the pains going back and for between sync<->async. The end goal will be for all the tvix-store internals to be async and then expose a sync interface for things like tvix eval io. Change-Id: I97c71f8db1d05a38bd8f625df5087d565705d52d Reviewed-on: https://cl.tvl.fyi/c/depot/+/9369 Autosubmit: Connor Brewster Tested-by: BuildkiteCI Reviewed-by: flokli --- tvix/store/src/pathinfoservice/grpc.rs | 176 +++++++++++++-------------------- 1 file changed, 66 insertions(+), 110 deletions(-) (limited to 'tvix/store/src/pathinfoservice/grpc.rs') diff --git a/tvix/store/src/pathinfoservice/grpc.rs b/tvix/store/src/pathinfoservice/grpc.rs index 1649655b69..c116ddbc89 100644 --- a/tvix/store/src/pathinfoservice/grpc.rs +++ b/tvix/store/src/pathinfoservice/grpc.rs @@ -4,16 +4,15 @@ use crate::{ directoryservice::DirectoryService, proto::{self, ListPathInfoRequest}, }; -use std::sync::Arc; -use tokio::{net::UnixStream, task::JoinHandle}; -use tonic::{transport::Channel, Code, Status, Streaming}; +use async_stream::try_stream; +use futures::Stream; +use std::{pin::Pin, sync::Arc}; +use tokio::net::UnixStream; +use tonic::{async_trait, transport::Channel, Code}; /// Connects to a (remote) tvix-store PathInfoService over gRPC. #[derive(Clone)] pub struct GRPCPathInfoService { - /// A handle into the active tokio runtime. Necessary to spawn tasks. - tokio_handle: tokio::runtime::Handle, - /// The internal reference to a gRPC client. /// Cloning it is cheap, and it internally handles concurrent requests. grpc_client: proto::path_info_service_client::PathInfoServiceClient, @@ -25,13 +24,11 @@ impl GRPCPathInfoService { pub fn from_client( grpc_client: proto::path_info_service_client::PathInfoServiceClient, ) -> Self { - Self { - tokio_handle: tokio::runtime::Handle::current(), - grpc_client, - } + Self { grpc_client } } } +#[async_trait] impl PathInfoService for GRPCPathInfoService { /// Constructs a [GRPCPathInfoService] from the passed [url::Url]: /// - scheme has to match `grpc+*://`. @@ -92,47 +89,39 @@ impl PathInfoService for GRPCPathInfoService { } } - fn get(&self, digest: [u8; 20]) -> Result, crate::Error> { + async fn get(&self, digest: [u8; 20]) -> Result, crate::Error> { // Get a new handle to the gRPC client. let mut grpc_client = self.grpc_client.clone(); - let task: JoinHandle> = - self.tokio_handle.spawn(async move { - let path_info = grpc_client - .get(proto::GetPathInfoRequest { - by_what: Some(proto::get_path_info_request::ByWhat::ByOutputHash( - digest.to_vec().into(), - )), - }) - .await? - .into_inner(); - - Ok(path_info) - }); + let path_info = grpc_client + .get(proto::GetPathInfoRequest { + by_what: Some(proto::get_path_info_request::ByWhat::ByOutputHash( + digest.to_vec().into(), + )), + }) + .await; - match self.tokio_handle.block_on(task)? { - Ok(path_info) => Ok(Some(path_info)), + match path_info { + Ok(path_info) => Ok(Some(path_info.into_inner())), Err(e) if e.code() == Code::NotFound => Ok(None), Err(e) => Err(crate::Error::StorageError(e.to_string())), } } - fn put(&self, path_info: proto::PathInfo) -> Result { + async fn put(&self, path_info: proto::PathInfo) -> Result { // Get a new handle to the gRPC client. let mut grpc_client = self.grpc_client.clone(); - let task: JoinHandle> = - self.tokio_handle.spawn(async move { - let path_info = grpc_client.put(path_info).await?.into_inner(); - Ok(path_info) - }); + let path_info = grpc_client + .put(path_info) + .await + .map_err(|e| crate::Error::StorageError(e.to_string()))? + .into_inner(); - self.tokio_handle - .block_on(task)? - .map_err(|e| crate::Error::StorageError(e.to_string())) + Ok(path_info) } - fn calculate_nar( + async fn calculate_nar( &self, root_node: &proto::node::Node, ) -> Result<(u64, [u8; 32]), crate::Error> { @@ -140,83 +129,54 @@ impl PathInfoService for GRPCPathInfoService { let mut grpc_client = self.grpc_client.clone(); let root_node = root_node.clone(); - let task: JoinHandle> = self.tokio_handle.spawn(async move { - let path_info = grpc_client - .calculate_nar(proto::Node { - node: Some(root_node), - }) - .await? - .into_inner(); - Ok(path_info) - }); - - let resp = self - .tokio_handle - .block_on(task)? - .map_err(|e| crate::Error::StorageError(e.to_string()))?; + let path_info = grpc_client + .calculate_nar(proto::Node { + node: Some(root_node), + }) + .await + .map_err(|e| crate::Error::StorageError(e.to_string()))? + .into_inner(); - let nar_sha256: [u8; 32] = resp + let nar_sha256: [u8; 32] = path_info .nar_sha256 .to_vec() .try_into() .map_err(|_e| crate::Error::StorageError("invalid digest length".to_string()))?; - Ok((resp.nar_size, nar_sha256)) + Ok((path_info.nar_size, nar_sha256)) } - fn list(&self) -> Box> + Send> { - // Get a new handle to the gRPC client. + fn list(&self) -> Pin> + Send>> { let mut grpc_client = self.grpc_client.clone(); - let task: JoinHandle> = self.tokio_handle.spawn(async move { - let s = grpc_client - .list(ListPathInfoRequest::default()) - .await? - .into_inner(); - - Ok(s) - }); - - let stream = self.tokio_handle.block_on(task).unwrap().unwrap(); - - Box::new(StreamIterator::new(self.tokio_handle.clone(), stream)) - } -} - -pub struct StreamIterator { - tokio_handle: tokio::runtime::Handle, - stream: Streaming, -} - -impl StreamIterator { - pub fn new(tokio_handle: tokio::runtime::Handle, stream: Streaming) -> Self { - Self { - tokio_handle, - stream, - } - } -} - -impl Iterator for StreamIterator { - type Item = Result; - - fn next(&mut self) -> Option { - match self.tokio_handle.block_on(self.stream.message()) { - Ok(o) => match o { - Some(pathinfo) => { - // validate the pathinfo - if let Err(e) = pathinfo.validate() { - return Some(Err(crate::Error::StorageError(format!( - "pathinfo {:?} failed validation: {}", - pathinfo, e - )))); - } - Some(Ok(pathinfo)) + let stream = try_stream! { + let resp = grpc_client.list(ListPathInfoRequest::default()).await; + + let mut stream = resp.map_err(|e| crate::Error::StorageError(e.to_string()))?.into_inner(); + + loop { + match stream.message().await { + Ok(o) => match o { + Some(pathinfo) => { + // validate the pathinfo + if let Err(e) = pathinfo.validate() { + Err(crate::Error::StorageError(format!( + "pathinfo {:?} failed validation: {}", + pathinfo, e + )))?; + } + yield pathinfo + } + None => { + return; + }, + }, + Err(e) => Err(crate::Error::StorageError(e.to_string()))?, } - None => None, - }, - Err(e) => Some(Err(crate::Error::StorageError(e.to_string()))), - } + } + }; + + Box::pin(stream) } } @@ -227,7 +187,6 @@ mod tests { use tempfile::TempDir; use tokio::net::UnixListener; - use tokio::task; use tokio::time; use tokio_stream::wrappers::UnixListenerStream; @@ -377,13 +336,10 @@ mod tests { ); } - let pi = task::spawn_blocking(move || { - client - .get(fixtures::DUMMY_OUTPUT_HASH.to_vec().try_into().unwrap()) - .expect("must not be error") - }) - .await - .expect("must not be err"); + let pi = client + .get(fixtures::DUMMY_OUTPUT_HASH.to_vec().try_into().unwrap()) + .await + .expect("must not be error"); assert!(pi.is_none()); } -- cgit 1.4.1