diff options
Diffstat (limited to 'tvix/store')
-rw-r--r-- | tvix/store/Cargo.toml | 3 | ||||
-rw-r--r-- | tvix/store/src/bin/tvix-store.rs | 14 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/from_addr.rs | 9 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/grpc.rs | 32 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/tests/utils.rs | 7 | ||||
-rw-r--r-- | tvix/store/src/utils.rs | 14 |
6 files changed, 59 insertions, 20 deletions
diff --git a/tvix/store/Cargo.toml b/tvix/store/Cargo.toml index 760569eb8dc6..733b7c11f2c2 100644 --- a/tvix/store/Cargo.toml +++ b/tvix/store/Cargo.toml @@ -31,13 +31,14 @@ tokio-stream = { version = "0.1.14", features = ["fs"] } tokio-util = { version = "0.7.9", features = ["io", "io-util", "compat"] } tonic = { version = "0.11.0", features = ["tls", "tls-roots"] } tower = "0.4.13" +tower-http = { version = "0.4.4", features = ["trace"] } tvix-castore = { path = "../castore" } url = "2.4.0" walkdir = "2.4.0" reqwest = { version = "0.11.22", features = ["rustls-tls-native-roots", "stream"], default-features = false } lru = "0.12.3" parking_lot = "0.12.2" -tvix-tracing = { path = "../tracing" } +tvix-tracing = { path = "../tracing", features = ["tonic"] } tracing = "0.1.40" tracing-indicatif = "0.3.6" diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs index 30ebca004873..657ce06720ee 100644 --- a/tvix/store/src/bin/tvix-store.rs +++ b/tvix/store/src/bin/tvix-store.rs @@ -13,6 +13,8 @@ use tokio_listener::Listener; use tokio_listener::SystemOptions; use tokio_listener::UserOptions; use tonic::transport::Server; +use tower::ServiceBuilder; +use tower_http::trace::{DefaultMakeSpan, TraceLayer}; use tracing::{info, info_span, instrument, Level, Span}; use tracing_indicatif::span_ext::IndicatifSpanExt as _; use tvix_castore::import::fs::ingest_path; @@ -215,7 +217,17 @@ async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error>> { .parse() .unwrap(); - let mut server = Server::builder(); + let mut server = Server::builder().layer( + ServiceBuilder::new() + .layer( + TraceLayer::new_for_grpc().make_span_with( + DefaultMakeSpan::new() + .level(Level::INFO) + .include_headers(true), + ), + ) + .map_request(tvix_tracing::propagate::tonic::accept_trace), + ); #[allow(unused_mut)] let mut router = server diff --git a/tvix/store/src/pathinfoservice/from_addr.rs b/tvix/store/src/pathinfoservice/from_addr.rs index 455909e7f235..9173d25d05ca 100644 --- a/tvix/store/src/pathinfoservice/from_addr.rs +++ b/tvix/store/src/pathinfoservice/from_addr.rs @@ -105,9 +105,12 @@ pub async fn from_addr( // - In the case of unix sockets, there must be a path, but may not be a host. // - In the case of non-unix sockets, there must be a host, but no path. // Constructing the channel is handled by tvix_castore::channel::from_url. - let client = - PathInfoServiceClient::new(tvix_castore::tonic::channel_from_url(&url).await?); - Box::new(GRPCPathInfoService::from_client(client)) + Box::new(GRPCPathInfoService::from_client( + PathInfoServiceClient::with_interceptor( + tvix_castore::tonic::channel_from_url(&url).await?, + tvix_tracing::propagate::tonic::send_trace, + ), + )) } #[cfg(feature = "cloud")] "bigtable" => { diff --git a/tvix/store/src/pathinfoservice/grpc.rs b/tvix/store/src/pathinfoservice/grpc.rs index 63656609354d..152bf69a05ac 100644 --- a/tvix/store/src/pathinfoservice/grpc.rs +++ b/tvix/store/src/pathinfoservice/grpc.rs @@ -6,31 +6,43 @@ use crate::{ use async_stream::try_stream; use futures::stream::BoxStream; use nix_compat::nixbase32; -use tonic::{async_trait, transport::Channel, Code}; +use tonic::{async_trait, Code}; use tracing::{instrument, Span}; use tracing_indicatif::span_ext::IndicatifSpanExt; use tvix_castore::{proto as castorepb, Error}; /// Connects to a (remote) tvix-store PathInfoService over gRPC. #[derive(Clone)] -pub struct GRPCPathInfoService { +pub struct GRPCPathInfoService<T> +where + T: Clone, +{ /// The internal reference to a gRPC client. /// Cloning it is cheap, and it internally handles concurrent requests. - grpc_client: proto::path_info_service_client::PathInfoServiceClient<Channel>, + grpc_client: proto::path_info_service_client::PathInfoServiceClient<T>, } -impl GRPCPathInfoService { +impl<T> GRPCPathInfoService<T> +where + T: tonic::client::GrpcService<tonic::body::BoxBody> + Clone, +{ /// construct a [GRPCPathInfoService] from a [proto::path_info_service_client::PathInfoServiceClient]. /// panics if called outside the context of a tokio runtime. pub fn from_client( - grpc_client: proto::path_info_service_client::PathInfoServiceClient<Channel>, + grpc_client: proto::path_info_service_client::PathInfoServiceClient<T>, ) -> Self { Self { grpc_client } } } #[async_trait] -impl PathInfoService for GRPCPathInfoService { +impl<T> PathInfoService for GRPCPathInfoService<T> +where + T: tonic::client::GrpcService<tonic::body::BoxBody> + Send + Sync + Clone + 'static, + T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static, + <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send, + T::Future: Send, +{ #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))] async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { let path_info = self @@ -107,7 +119,13 @@ impl PathInfoService for GRPCPathInfoService { } #[async_trait] -impl NarCalculationService for GRPCPathInfoService { +impl<T> NarCalculationService for GRPCPathInfoService<T> +where + T: tonic::client::GrpcService<tonic::body::BoxBody> + Send + Sync + Clone + 'static, + T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static, + <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send, + T::Future: Send, +{ #[instrument(level = "trace", skip_all, fields(root_node = ?root_node, indicatif.pb_show=1))] async fn calculate_nar( &self, diff --git a/tvix/store/src/pathinfoservice/tests/utils.rs b/tvix/store/src/pathinfoservice/tests/utils.rs index ee170468d1d2..3e4fe5c05a24 100644 --- a/tvix/store/src/pathinfoservice/tests/utils.rs +++ b/tvix/store/src/pathinfoservice/tests/utils.rs @@ -16,8 +16,11 @@ use crate::{ /// Constructs and returns a gRPC PathInfoService. /// We also return memory-based {Blob,Directory}Service, /// as the consumer of this function accepts a 3-tuple. -pub async fn make_grpc_path_info_service_client( -) -> (impl BlobService, impl DirectoryService, GRPCPathInfoService) { +pub async fn make_grpc_path_info_service_client() -> ( + impl BlobService, + impl DirectoryService, + GRPCPathInfoService<tonic::transport::Channel>, +) { let (left, right) = tokio::io::duplex(64); let blob_service = blob_service(); diff --git a/tvix/store/src/utils.rs b/tvix/store/src/utils.rs index 67815fa94c4a..bd3c65a77998 100644 --- a/tvix/store/src/utils.rs +++ b/tvix/store/src/utils.rs @@ -51,12 +51,14 @@ pub async fn construct_services( .map_err(|e| io::Error::other(e.to_string()))?; if url.scheme().starts_with("grpc+") { - let client = PathInfoServiceClient::new( - tvix_castore::tonic::channel_from_url(&url) - .await - .map_err(|e| io::Error::other(e.to_string()))?, - ); - Box::new(GRPCPathInfoService::from_client(client)) + Box::new(GRPCPathInfoService::from_client( + PathInfoServiceClient::with_interceptor( + tvix_castore::tonic::channel_from_url(&url) + .await + .map_err(|e| io::Error::other(e.to_string()))?, + tvix_tracing::propagate::tonic::send_trace, + ), + )) } else { Box::new(SimpleRenderer::new( blob_service.clone(), |