diff options
Diffstat (limited to 'tvix/castore/src/directoryservice/grpc.rs')
-rw-r--r-- | tvix/castore/src/directoryservice/grpc.rs | 153 |
1 files changed, 101 insertions, 52 deletions
diff --git a/tvix/castore/src/directoryservice/grpc.rs b/tvix/castore/src/directoryservice/grpc.rs index fe935629bfcb..3fd177a34f28 100644 --- a/tvix/castore/src/directoryservice/grpc.rs +++ b/tvix/castore/src/directoryservice/grpc.rs @@ -1,44 +1,52 @@ use std::collections::HashSet; -use super::{DirectoryPutter, DirectoryService}; +use super::{Directory, DirectoryPutter, DirectoryService}; +use crate::composition::{CompositionContext, ServiceBuilder}; use crate::proto::{self, get_directory_request::ByWhat}; -use crate::{B3Digest, Error}; +use crate::{B3Digest, DirectoryError, Error}; use async_stream::try_stream; use futures::stream::BoxStream; +use std::sync::Arc; use tokio::spawn; use tokio::sync::mpsc::UnboundedSender; use tokio::task::JoinHandle; use tokio_stream::wrappers::UnboundedReceiverStream; -use tonic::async_trait; -use tonic::Code; -use tonic::{transport::Channel, Status}; -use tracing::{instrument, warn}; +use tonic::{async_trait, Code, Status}; +use tracing::{instrument, warn, Instrument as _}; /// Connects to a (remote) tvix-store DirectoryService over gRPC. #[derive(Clone)] -pub struct GRPCDirectoryService { +pub struct GRPCDirectoryService<T> { + instance_name: String, /// The internal reference to a gRPC client. /// Cloning it is cheap, and it internally handles concurrent requests. - grpc_client: proto::directory_service_client::DirectoryServiceClient<Channel>, + grpc_client: proto::directory_service_client::DirectoryServiceClient<T>, } -impl GRPCDirectoryService { +impl<T> GRPCDirectoryService<T> { /// construct a [GRPCDirectoryService] from a [proto::directory_service_client::DirectoryServiceClient]. /// panics if called outside the context of a tokio runtime. pub fn from_client( - grpc_client: proto::directory_service_client::DirectoryServiceClient<Channel>, + instance_name: String, + grpc_client: proto::directory_service_client::DirectoryServiceClient<T>, ) -> Self { - Self { grpc_client } + Self { + instance_name, + grpc_client, + } } } #[async_trait] -impl DirectoryService for GRPCDirectoryService { - #[instrument(level = "trace", skip_all, fields(directory.digest = %digest))] - async fn get( - &self, - digest: &B3Digest, - ) -> Result<Option<crate::proto::Directory>, crate::Error> { +impl<T> DirectoryService for GRPCDirectoryService<T> +where + T: tonic::client::GrpcService<tonic::body::BoxBody> + Send + Sync + Clone + 'static, + T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static, + <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send, + T::Future: Send, +{ + #[instrument(level = "trace", skip_all, fields(directory.digest = %digest, instance_name = %self.instance_name))] + async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, crate::Error> { // Get a new handle to the gRPC client, and copy the digest. let mut grpc_client = self.grpc_client.clone(); let digest_cpy = digest.clone(); @@ -66,15 +74,10 @@ impl DirectoryService for GRPCDirectoryService { "requested directory with digest {}, but got {}", digest, actual_digest ))) - } else if let Err(e) = directory.validate() { - // Validate the Directory itself is valid. - warn!("directory failed validation: {}", e.to_string()); - Err(crate::Error::StorageError(format!( - "directory {} failed validation: {}", - digest, e, - ))) } else { - Ok(Some(directory)) + Ok(Some(directory.try_into().map_err(|_| { + Error::StorageError("invalid root digest length in response".to_string()) + })?)) } } Ok(None) => Ok(None), @@ -83,12 +86,12 @@ impl DirectoryService for GRPCDirectoryService { } } - #[instrument(level = "trace", skip_all, fields(directory.digest = %directory.digest()))] - async fn put(&self, directory: crate::proto::Directory) -> Result<B3Digest, crate::Error> { + #[instrument(level = "trace", skip_all, fields(directory.digest = %directory.digest(), instance_name = %self.instance_name))] + async fn put(&self, directory: Directory) -> Result<B3Digest, crate::Error> { let resp = self .grpc_client .clone() - .put(tokio_stream::once(directory)) + .put(tokio_stream::once(proto::Directory::from(directory))) .await; match resp { @@ -103,11 +106,11 @@ impl DirectoryService for GRPCDirectoryService { } } - #[instrument(level = "trace", skip_all, fields(directory.digest = %root_directory_digest))] + #[instrument(level = "trace", skip_all, fields(directory.digest = %root_directory_digest, instance_name = %self.instance_name))] fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>> { + ) -> BoxStream<'static, Result<Directory, Error>> { let mut grpc_client = self.grpc_client.clone(); let root_directory_digest = root_directory_digest.clone(); @@ -124,19 +127,11 @@ impl DirectoryService for GRPCDirectoryService { // The Directory digests we received so far let mut received_directory_digests: HashSet<B3Digest> = HashSet::new(); // The Directory digests we're still expecting to get sent. - let mut expected_directory_digests: HashSet<B3Digest> = HashSet::from([root_directory_digest]); + let mut expected_directory_digests: HashSet<B3Digest> = HashSet::from([root_directory_digest.clone()]); loop { match stream.message().await { Ok(Some(directory)) => { - // validate the directory itself. - if let Err(e) = directory.validate() { - Err(crate::Error::StorageError(format!( - "directory {} failed validation: {}", - directory.digest(), - e, - )))?; - } // validate we actually expected that directory, and move it from expected to received. let directory_digest = directory.digest(); let was_expected = expected_directory_digests.remove(&directory_digest); @@ -162,14 +157,28 @@ impl DirectoryService for GRPCDirectoryService { .insert(child_directory_digest); } + let directory = directory.try_into() + .map_err(|e: DirectoryError| Error::StorageError(e.to_string()))?; + yield directory; }, + Ok(None) if expected_directory_digests.len() == 1 && expected_directory_digests.contains(&root_directory_digest) => { + // The root directory of the requested closure was not found, return an + // empty stream + return + } Ok(None) => { - // If we were still expecting something, that's an error. - if !expected_directory_digests.is_empty() { + // The stream has ended + let diff_len = expected_directory_digests + // Account for directories which have been referenced more than once, + // but only received once since they were deduplicated + .difference(&received_directory_digests) + .count(); + // If this is not empty, then the closure is incomplete + if diff_len != 0 { Err(crate::Error::StorageError(format!( "still expected {} directories, but got premature end of stream", - expected_directory_digests.len(), + diff_len )))? } else { return @@ -194,14 +203,17 @@ impl DirectoryService for GRPCDirectoryService { let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); - let task: JoinHandle<Result<proto::PutDirectoryResponse, Status>> = spawn(async move { - let s = grpc_client - .put(UnboundedReceiverStream::new(rx)) - .await? - .into_inner(); + let task: JoinHandle<Result<proto::PutDirectoryResponse, Status>> = spawn( + async move { + let s = grpc_client + .put(UnboundedReceiverStream::new(rx)) + .await? + .into_inner(); - Ok(s) - }); + Ok(s) + } // instrument the task with the current span, this is not done by default + .in_current_span(), + ); Box::new(GRPCPutter { rq: Some((task, tx)), @@ -209,6 +221,43 @@ impl DirectoryService for GRPCDirectoryService { } } +#[derive(serde::Deserialize, Debug)] +#[serde(deny_unknown_fields)] +pub struct GRPCDirectoryServiceConfig { + url: String, +} + +impl TryFrom<url::Url> for GRPCDirectoryServiceConfig { + type Error = Box<dyn std::error::Error + Send + Sync>; + fn try_from(url: url::Url) -> Result<Self, Self::Error> { + // This is normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts. + // - In the case of unix sockets, there must be a path, but may not be a host. + // - In the case of non-unix sockets, there must be a host, but no path. + // Constructing the channel is handled by tvix_castore::channel::from_url. + Ok(GRPCDirectoryServiceConfig { + url: url.to_string(), + }) + } +} + +#[async_trait] +impl ServiceBuilder for GRPCDirectoryServiceConfig { + type Output = dyn DirectoryService; + async fn build<'a>( + &'a self, + instance_name: &str, + _context: &CompositionContext, + ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> { + let client = proto::directory_service_client::DirectoryServiceClient::new( + crate::tonic::channel_from_url(&self.url.parse()?).await?, + ); + Ok(Arc::new(GRPCDirectoryService::from_client( + instance_name.to_string(), + client, + ))) + } +} + /// Allows uploading multiple Directory messages in the same gRPC stream. pub struct GRPCPutter { /// Data about the current request - a handle to the task, and the tx part @@ -225,11 +274,11 @@ pub struct GRPCPutter { #[async_trait] impl DirectoryPutter for GRPCPutter { #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)] - async fn put(&mut self, directory: proto::Directory) -> Result<(), crate::Error> { + async fn put(&mut self, directory: Directory) -> Result<(), crate::Error> { match self.rq { // If we're not already closed, send the directory to directory_sender. Some((_, ref directory_sender)) => { - if directory_sender.send(directory).is_err() { + if directory_sender.send(directory.into()).is_err() { // If the channel has been prematurely closed, invoke close (so we can peek at the error code) // That error code is much more helpful, because it // contains the error message from the server. @@ -333,7 +382,7 @@ mod tests { .await .expect("must succeed"), ); - GRPCDirectoryService::from_client(client) + GRPCDirectoryService::from_client("test-instance".into(), client) }; assert!(grpc_client |