From b919f297528b74d9eab95fe93aa87541d7dffc53 Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Sat, 25 Mar 2023 22:17:23 +0100 Subject: feat(tvix/store/pathinfosvc): add gRPC client Change-Id: Ie8e205c691bd11db99fcf097357c1e49161c6e19 Reviewed-on: https://cl.tvl.fyi/c/depot/+/8349 Autosubmit: flokli Reviewed-by: tazjin Tested-by: BuildkiteCI --- tvix/store/src/errors.rs | 7 ++++ tvix/store/src/pathinfoservice/grpc.rs | 71 ++++++++++++++++++++++++++++++++++ tvix/store/src/pathinfoservice/mod.rs | 2 + 3 files changed, 80 insertions(+) create mode 100644 tvix/store/src/pathinfoservice/grpc.rs diff --git a/tvix/store/src/errors.rs b/tvix/store/src/errors.rs index 36a4320cb3..3b23f972b0 100644 --- a/tvix/store/src/errors.rs +++ b/tvix/store/src/errors.rs @@ -1,5 +1,6 @@ use std::sync::PoisonError; use thiserror::Error; +use tokio::task::JoinError; use tonic::Status; /// Errors related to communication with the store. @@ -18,6 +19,12 @@ impl From> for Error { } } +impl From for Error { + fn from(value: JoinError) -> Self { + Error::StorageError(value.to_string()) + } +} + impl From for Status { fn from(value: Error) -> Self { match value { diff --git a/tvix/store/src/pathinfoservice/grpc.rs b/tvix/store/src/pathinfoservice/grpc.rs new file mode 100644 index 0000000000..821331fbde --- /dev/null +++ b/tvix/store/src/pathinfoservice/grpc.rs @@ -0,0 +1,71 @@ +use super::PathInfoService; +use crate::proto; +use tonic::{transport::Channel, Code, Status}; + +/// Connects to a (remote) tvix-store PathInfoService over gRPC. +#[derive(Clone)] +pub struct GRPCPathInfoService { + /// A handle into the active tokio runtime. Necessary to spawn tasks. + tokio_handle: tokio::runtime::Handle, + + /// The internal reference to a gRPC client. + /// Cloning it is cheap, and it internally handles concurrent requests. + grpc_client: proto::path_info_service_client::PathInfoServiceClient, +} + +impl GRPCPathInfoService { + /// Construct a new [GRPCPathInfoService], by passing a handle to the tokio + /// runtime, and a gRPC client. + pub fn new( + tokio_handle: tokio::runtime::Handle, + grpc_client: proto::path_info_service_client::PathInfoServiceClient, + ) -> Self { + Self { + tokio_handle, + grpc_client, + } + } +} + +impl PathInfoService for GRPCPathInfoService { + fn get( + &self, + by_what: proto::get_path_info_request::ByWhat, + ) -> Result, crate::Error> { + // Get a new handle to the gRPC client. + let mut grpc_client = self.grpc_client.clone(); + + let task: tokio::task::JoinHandle> = + self.tokio_handle.spawn(async move { + let path_info = grpc_client + .get(proto::GetPathInfoRequest { + by_what: Some(by_what), + }) + .await? + .into_inner(); + + Ok(path_info) + }); + + match self.tokio_handle.block_on(task)? { + Ok(path_info) => Ok(Some(path_info)), + Err(e) if e.code() == Code::NotFound => Ok(None), + Err(e) => Err(crate::Error::StorageError(e.to_string())), + } + } + + fn put(&self, path_info: proto::PathInfo) -> Result { + // Get a new handle to the gRPC client. + let mut grpc_client = self.grpc_client.clone(); + + let task: tokio::task::JoinHandle> = + self.tokio_handle.spawn(async move { + let path_info = grpc_client.put(path_info).await?.into_inner(); + Ok(path_info) + }); + + self.tokio_handle + .block_on(task)? + .map_err(|e| crate::Error::StorageError(e.to_string())) + } +} diff --git a/tvix/store/src/pathinfoservice/mod.rs b/tvix/store/src/pathinfoservice/mod.rs index b20985dd80..410bd205be 100644 --- a/tvix/store/src/pathinfoservice/mod.rs +++ b/tvix/store/src/pathinfoservice/mod.rs @@ -1,8 +1,10 @@ +mod grpc; mod memory; mod sled; use crate::{proto, Error}; +pub use self::grpc::GRPCPathInfoService; pub use self::memory::MemoryPathInfoService; pub use self::sled::SledPathInfoService; -- cgit 1.4.1