about summary refs log tree commit diff
path: root/tvix/store
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/store')
-rw-r--r--tvix/store/Cargo.toml3
-rw-r--r--tvix/store/src/bin/tvix-store.rs14
-rw-r--r--tvix/store/src/pathinfoservice/from_addr.rs9
-rw-r--r--tvix/store/src/pathinfoservice/grpc.rs32
-rw-r--r--tvix/store/src/pathinfoservice/tests/utils.rs7
-rw-r--r--tvix/store/src/utils.rs14
6 files changed, 59 insertions, 20 deletions
diff --git a/tvix/store/Cargo.toml b/tvix/store/Cargo.toml
index 760569eb8dc6..733b7c11f2c2 100644
--- a/tvix/store/Cargo.toml
+++ b/tvix/store/Cargo.toml
@@ -31,13 +31,14 @@ tokio-stream = { version = "0.1.14", features = ["fs"] }
 tokio-util = { version = "0.7.9", features = ["io", "io-util", "compat"] }
 tonic = { version = "0.11.0", features = ["tls", "tls-roots"] }
 tower = "0.4.13"
+tower-http = { version = "0.4.4", features = ["trace"] }
 tvix-castore = { path = "../castore" }
 url = "2.4.0"
 walkdir = "2.4.0"
 reqwest = { version = "0.11.22", features = ["rustls-tls-native-roots", "stream"], default-features = false }
 lru = "0.12.3"
 parking_lot = "0.12.2"
-tvix-tracing = { path = "../tracing" }
+tvix-tracing = { path = "../tracing", features = ["tonic"] }
 tracing = "0.1.40"
 tracing-indicatif = "0.3.6"
 
diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs
index 30ebca004873..657ce06720ee 100644
--- a/tvix/store/src/bin/tvix-store.rs
+++ b/tvix/store/src/bin/tvix-store.rs
@@ -13,6 +13,8 @@ use tokio_listener::Listener;
 use tokio_listener::SystemOptions;
 use tokio_listener::UserOptions;
 use tonic::transport::Server;
+use tower::ServiceBuilder;
+use tower_http::trace::{DefaultMakeSpan, TraceLayer};
 use tracing::{info, info_span, instrument, Level, Span};
 use tracing_indicatif::span_ext::IndicatifSpanExt as _;
 use tvix_castore::import::fs::ingest_path;
@@ -215,7 +217,17 @@ async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error>> {
                 .parse()
                 .unwrap();
 
-            let mut server = Server::builder();
+            let mut server = Server::builder().layer(
+                ServiceBuilder::new()
+                    .layer(
+                        TraceLayer::new_for_grpc().make_span_with(
+                            DefaultMakeSpan::new()
+                                .level(Level::INFO)
+                                .include_headers(true),
+                        ),
+                    )
+                    .map_request(tvix_tracing::propagate::tonic::accept_trace),
+            );
 
             #[allow(unused_mut)]
             let mut router = server
diff --git a/tvix/store/src/pathinfoservice/from_addr.rs b/tvix/store/src/pathinfoservice/from_addr.rs
index 455909e7f235..9173d25d05ca 100644
--- a/tvix/store/src/pathinfoservice/from_addr.rs
+++ b/tvix/store/src/pathinfoservice/from_addr.rs
@@ -105,9 +105,12 @@ pub async fn from_addr(
             // - In the case of unix sockets, there must be a path, but may not be a host.
             // - In the case of non-unix sockets, there must be a host, but no path.
             // Constructing the channel is handled by tvix_castore::channel::from_url.
-            let client =
-                PathInfoServiceClient::new(tvix_castore::tonic::channel_from_url(&url).await?);
-            Box::new(GRPCPathInfoService::from_client(client))
+            Box::new(GRPCPathInfoService::from_client(
+                PathInfoServiceClient::with_interceptor(
+                    tvix_castore::tonic::channel_from_url(&url).await?,
+                    tvix_tracing::propagate::tonic::send_trace,
+                ),
+            ))
         }
         #[cfg(feature = "cloud")]
         "bigtable" => {
diff --git a/tvix/store/src/pathinfoservice/grpc.rs b/tvix/store/src/pathinfoservice/grpc.rs
index 63656609354d..152bf69a05ac 100644
--- a/tvix/store/src/pathinfoservice/grpc.rs
+++ b/tvix/store/src/pathinfoservice/grpc.rs
@@ -6,31 +6,43 @@ use crate::{
 use async_stream::try_stream;
 use futures::stream::BoxStream;
 use nix_compat::nixbase32;
-use tonic::{async_trait, transport::Channel, Code};
+use tonic::{async_trait, Code};
 use tracing::{instrument, Span};
 use tracing_indicatif::span_ext::IndicatifSpanExt;
 use tvix_castore::{proto as castorepb, Error};
 
 /// Connects to a (remote) tvix-store PathInfoService over gRPC.
 #[derive(Clone)]
-pub struct GRPCPathInfoService {
+pub struct GRPCPathInfoService<T>
+where
+    T: Clone,
+{
     /// The internal reference to a gRPC client.
     /// Cloning it is cheap, and it internally handles concurrent requests.
-    grpc_client: proto::path_info_service_client::PathInfoServiceClient<Channel>,
+    grpc_client: proto::path_info_service_client::PathInfoServiceClient<T>,
 }
 
-impl GRPCPathInfoService {
+impl<T> GRPCPathInfoService<T>
+where
+    T: tonic::client::GrpcService<tonic::body::BoxBody> + Clone,
+{
     /// construct a [GRPCPathInfoService] from a [proto::path_info_service_client::PathInfoServiceClient].
     /// panics if called outside the context of a tokio runtime.
     pub fn from_client(
-        grpc_client: proto::path_info_service_client::PathInfoServiceClient<Channel>,
+        grpc_client: proto::path_info_service_client::PathInfoServiceClient<T>,
     ) -> Self {
         Self { grpc_client }
     }
 }
 
 #[async_trait]
-impl PathInfoService for GRPCPathInfoService {
+impl<T> PathInfoService for GRPCPathInfoService<T>
+where
+    T: tonic::client::GrpcService<tonic::body::BoxBody> + Send + Sync + Clone + 'static,
+    T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static,
+    <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send,
+    T::Future: Send,
+{
     #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))]
     async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
         let path_info = self
@@ -107,7 +119,13 @@ impl PathInfoService for GRPCPathInfoService {
 }
 
 #[async_trait]
-impl NarCalculationService for GRPCPathInfoService {
+impl<T> NarCalculationService for GRPCPathInfoService<T>
+where
+    T: tonic::client::GrpcService<tonic::body::BoxBody> + Send + Sync + Clone + 'static,
+    T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static,
+    <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send,
+    T::Future: Send,
+{
     #[instrument(level = "trace", skip_all, fields(root_node = ?root_node, indicatif.pb_show=1))]
     async fn calculate_nar(
         &self,
diff --git a/tvix/store/src/pathinfoservice/tests/utils.rs b/tvix/store/src/pathinfoservice/tests/utils.rs
index ee170468d1d2..3e4fe5c05a24 100644
--- a/tvix/store/src/pathinfoservice/tests/utils.rs
+++ b/tvix/store/src/pathinfoservice/tests/utils.rs
@@ -16,8 +16,11 @@ use crate::{
 /// Constructs and returns a gRPC PathInfoService.
 /// We also return memory-based {Blob,Directory}Service,
 /// as the consumer of this function accepts a 3-tuple.
-pub async fn make_grpc_path_info_service_client(
-) -> (impl BlobService, impl DirectoryService, GRPCPathInfoService) {
+pub async fn make_grpc_path_info_service_client() -> (
+    impl BlobService,
+    impl DirectoryService,
+    GRPCPathInfoService<tonic::transport::Channel>,
+) {
     let (left, right) = tokio::io::duplex(64);
 
     let blob_service = blob_service();
diff --git a/tvix/store/src/utils.rs b/tvix/store/src/utils.rs
index 67815fa94c4a..bd3c65a77998 100644
--- a/tvix/store/src/utils.rs
+++ b/tvix/store/src/utils.rs
@@ -51,12 +51,14 @@ pub async fn construct_services(
             .map_err(|e| io::Error::other(e.to_string()))?;
 
         if url.scheme().starts_with("grpc+") {
-            let client = PathInfoServiceClient::new(
-                tvix_castore::tonic::channel_from_url(&url)
-                    .await
-                    .map_err(|e| io::Error::other(e.to_string()))?,
-            );
-            Box::new(GRPCPathInfoService::from_client(client))
+            Box::new(GRPCPathInfoService::from_client(
+                PathInfoServiceClient::with_interceptor(
+                    tvix_castore::tonic::channel_from_url(&url)
+                        .await
+                        .map_err(|e| io::Error::other(e.to_string()))?,
+                    tvix_tracing::propagate::tonic::send_trace,
+                ),
+            ))
         } else {
             Box::new(SimpleRenderer::new(
                 blob_service.clone(),