diff options
Diffstat (limited to 'tvix/store/src/pathinfoservice/grpc.rs')
-rw-r--r-- | tvix/store/src/pathinfoservice/grpc.rs | 100 |
1 files changed, 62 insertions, 38 deletions
diff --git a/tvix/store/src/pathinfoservice/grpc.rs b/tvix/store/src/pathinfoservice/grpc.rs index bcee49aac6cf..453044cba13d 100644 --- a/tvix/store/src/pathinfoservice/grpc.rs +++ b/tvix/store/src/pathinfoservice/grpc.rs @@ -1,15 +1,18 @@ -use super::PathInfoService; +use super::{PathInfo, PathInfoService}; use crate::{ nar::NarCalculationService, - proto::{self, ListPathInfoRequest, PathInfo}, + proto::{self, ListPathInfoRequest}, }; use async_stream::try_stream; use futures::stream::BoxStream; use nix_compat::nixbase32; +use std::sync::Arc; use tonic::{async_trait, Code}; -use tracing::{instrument, Span}; +use tracing::{instrument, warn, Span}; use tracing_indicatif::span_ext::IndicatifSpanExt; -use tvix_castore::{proto as castorepb, Error}; +use tvix_castore::composition::{CompositionContext, ServiceBuilder}; +use tvix_castore::Error; +use tvix_castore::Node; /// Connects to a (remote) tvix-store PathInfoService over gRPC. #[derive(Clone)] @@ -50,15 +53,10 @@ where .await; match path_info { - Ok(path_info) => { - let path_info = path_info.into_inner(); - - path_info - .validate() - .map_err(|e| Error::StorageError(format!("invalid pathinfo: {}", e)))?; - - Ok(Some(path_info)) - } + Ok(path_info) => Ok(Some( + PathInfo::try_from(path_info.into_inner()) + .map_err(|e| Error::StorageError(format!("Invalid path info: {e}")))?, + )), Err(e) if e.code() == Code::NotFound => Ok(None), Err(e) => Err(Error::StorageError(e.to_string())), } @@ -69,12 +67,12 @@ where let path_info = self .grpc_client .clone() - .put(path_info) + .put(proto::PathInfo::from(path_info)) .await .map_err(|e| Error::StorageError(e.to_string()))? .into_inner(); - - Ok(path_info) + Ok(PathInfo::try_from(path_info) + .map_err(|e| Error::StorageError(format!("Invalid path info: {e}")))?) } #[instrument(level = "trace", skip_all)] @@ -88,21 +86,8 @@ where loop { match stream.message().await { - Ok(o) => match o { - Some(pathinfo) => { - // validate the pathinfo - if let Err(e) = pathinfo.validate() { - Err(Error::StorageError(format!( - "pathinfo {:?} failed validation: {}", - pathinfo, e - )))?; - } - yield pathinfo - } - None => { - return; - }, - }, + Ok(Some(path_info)) => yield PathInfo::try_from(path_info).map_err(|e| Error::StorageError(format!("Invalid path info: {e}")))?, + Ok(None) => return, Err(e) => Err(Error::StorageError(e.to_string()))?, } } @@ -110,6 +95,13 @@ where Box::pin(stream) } + + #[instrument(level = "trace", skip_all)] + fn nar_calculation_service(&self) -> Option<Box<dyn NarCalculationService>> { + Some(Box::new(GRPCPathInfoService { + grpc_client: self.grpc_client.clone(), + }) as Box<dyn NarCalculationService>) + } } #[async_trait] @@ -121,10 +113,7 @@ where T::Future: Send, { #[instrument(level = "trace", skip_all, fields(root_node = ?root_node, indicatif.pb_show=1))] - async fn calculate_nar( - &self, - root_node: &castorepb::node::Node, - ) -> Result<(u64, [u8; 32]), Error> { + async fn calculate_nar(&self, root_node: &Node) -> Result<(u64, [u8; 32]), Error> { let span = Span::current(); span.pb_set_message("Waiting for NAR calculation"); span.pb_start(); @@ -132,9 +121,10 @@ where let path_info = self .grpc_client .clone() - .calculate_nar(castorepb::Node { - node: Some(root_node.clone()), - }) + .calculate_nar(tvix_castore::proto::Node::from_name_and_node( + "".into(), + root_node.to_owned(), + )) .await .map_err(|e| Error::StorageError(e.to_string()))? .into_inner(); @@ -149,6 +139,40 @@ where } } +#[derive(serde::Deserialize, Debug)] +#[serde(deny_unknown_fields)] +pub struct GRPCPathInfoServiceConfig { + url: String, +} + +impl TryFrom<url::Url> for GRPCPathInfoServiceConfig { + type Error = Box<dyn std::error::Error + Send + Sync>; + fn try_from(url: url::Url) -> Result<Self, Self::Error> { + // normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts. + // - In the case of unix sockets, there must be a path, but may not be a host. + // - In the case of non-unix sockets, there must be a host, but no path. + // Constructing the channel is handled by tvix_castore::channel::from_url. + Ok(GRPCPathInfoServiceConfig { + url: url.to_string(), + }) + } +} + +#[async_trait] +impl ServiceBuilder for GRPCPathInfoServiceConfig { + type Output = dyn PathInfoService; + async fn build<'a>( + &'a self, + _instance_name: &str, + _context: &CompositionContext, + ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> { + let client = proto::path_info_service_client::PathInfoServiceClient::new( + tvix_castore::tonic::channel_from_url(&self.url.parse()?).await?, + ); + Ok(Arc::new(GRPCPathInfoService::from_client(client))) + } +} + #[cfg(test)] mod tests { use crate::pathinfoservice::tests::make_grpc_path_info_service_client; |