about summary refs log tree commit diff
path: root/tvix/castore
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/castore')
-rw-r--r--tvix/castore/Cargo.toml2
-rw-r--r--tvix/castore/src/blobservice/from_addr.rs8
-rw-r--r--tvix/castore/src/blobservice/grpc.rs27
-rw-r--r--tvix/castore/src/directoryservice/from_addr.rs8
-rw-r--r--tvix/castore/src/directoryservice/grpc.rs26
5 files changed, 49 insertions, 22 deletions
diff --git a/tvix/castore/Cargo.toml b/tvix/castore/Cargo.toml
index ea4c598fe884..951613057064 100644
--- a/tvix/castore/Cargo.toml
+++ b/tvix/castore/Cargo.toml
@@ -29,7 +29,7 @@ tonic = "0.11.0"
 tower = "0.4.13"
 tracing = "0.1.37"
 tracing-indicatif = "0.3.6"
-tvix-tracing = { path = "../tracing" }
+tvix-tracing = { path = "../tracing", features = ["tonic"] }
 url = "2.4.0"
 walkdir = "2.4.0"
 zstd = "0.13.0"
diff --git a/tvix/castore/src/blobservice/from_addr.rs b/tvix/castore/src/blobservice/from_addr.rs
index 8898bbfb95ce..f76592e509f8 100644
--- a/tvix/castore/src/blobservice/from_addr.rs
+++ b/tvix/castore/src/blobservice/from_addr.rs
@@ -30,8 +30,12 @@ pub async fn from_addr(uri: &str) -> Result<Box<dyn BlobService>, crate::Error>
             // - In the case of unix sockets, there must be a path, but may not be a host.
             // - In the case of non-unix sockets, there must be a host, but no path.
             // Constructing the channel is handled by tvix_castore::channel::from_url.
-            let client = BlobServiceClient::new(crate::tonic::channel_from_url(&url).await?);
-            Box::new(GRPCBlobService::from_client(client))
+            Box::new(GRPCBlobService::from_client(
+                BlobServiceClient::with_interceptor(
+                    crate::tonic::channel_from_url(&url).await?,
+                    tvix_tracing::propagate::tonic::send_trace,
+                ),
+            ))
         }
         scheme if scheme.starts_with("objectstore+") => {
             // We need to convert the URL to string, strip the prefix there, and then
diff --git a/tvix/castore/src/blobservice/grpc.rs b/tvix/castore/src/blobservice/grpc.rs
index 8bde6a120d0a..9b54d81de29c 100644
--- a/tvix/castore/src/blobservice/grpc.rs
+++ b/tvix/castore/src/blobservice/grpc.rs
@@ -17,29 +17,39 @@ use tokio_util::{
     io::{CopyToBytes, SinkWriter},
     sync::PollSender,
 };
-use tonic::{async_trait, transport::Channel, Code, Status};
+use tonic::{async_trait, Code, Status};
 use tracing::{instrument, Instrument as _};
 
 /// Connects to a (remote) tvix-store BlobService over gRPC.
 #[derive(Clone)]
-pub struct GRPCBlobService {
+pub struct GRPCBlobService<T>
+where
+    T: Clone,
+{
     /// The internal reference to a gRPC client.
     /// Cloning it is cheap, and it internally handles concurrent requests.
-    grpc_client: proto::blob_service_client::BlobServiceClient<Channel>,
+    grpc_client: proto::blob_service_client::BlobServiceClient<T>,
 }
 
-impl GRPCBlobService {
+impl<T> GRPCBlobService<T>
+where
+    T: tonic::client::GrpcService<tonic::body::BoxBody> + Clone,
+{
     /// construct a [GRPCBlobService] from a [proto::blob_service_client::BlobServiceClient].
     /// panics if called outside the context of a tokio runtime.
-    pub fn from_client(
-        grpc_client: proto::blob_service_client::BlobServiceClient<Channel>,
-    ) -> Self {
+    pub fn from_client(grpc_client: proto::blob_service_client::BlobServiceClient<T>) -> Self {
         Self { grpc_client }
     }
 }
 
 #[async_trait]
-impl BlobService for GRPCBlobService {
+impl<T> BlobService for GRPCBlobService<T>
+where
+    T: tonic::client::GrpcService<tonic::body::BoxBody> + Send + Sync + Clone + 'static,
+    T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static,
+    <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send,
+    T::Future: Send,
+{
     #[instrument(skip(self, digest), fields(blob.digest=%digest))]
     async fn has(&self, digest: &B3Digest) -> io::Result<bool> {
         let mut grpc_client = self.grpc_client.clone();
@@ -337,7 +347,6 @@ mod tests {
                     .await
                     .expect("must succeed"),
             );
-
             GRPCBlobService::from_client(client)
         };
 
diff --git a/tvix/castore/src/directoryservice/from_addr.rs b/tvix/castore/src/directoryservice/from_addr.rs
index ee675ca68a9f..9aa01df171d7 100644
--- a/tvix/castore/src/directoryservice/from_addr.rs
+++ b/tvix/castore/src/directoryservice/from_addr.rs
@@ -63,8 +63,12 @@ pub async fn from_addr(uri: &str) -> Result<Box<dyn DirectoryService>, crate::Er
             // - In the case of unix sockets, there must be a path, but may not be a host.
             // - In the case of non-unix sockets, there must be a host, but no path.
             // Constructing the channel is handled by tvix_castore::channel::from_url.
-            let client = DirectoryServiceClient::new(crate::tonic::channel_from_url(&url).await?);
-            Box::new(GRPCDirectoryService::from_client(client))
+            Box::new(GRPCDirectoryService::from_client(
+                DirectoryServiceClient::with_interceptor(
+                    crate::tonic::channel_from_url(&url).await?,
+                    tvix_tracing::propagate::tonic::send_trace,
+                ),
+            ))
         }
         scheme if scheme.starts_with("objectstore+") => {
             // We need to convert the URL to string, strip the prefix there, and then
diff --git a/tvix/castore/src/directoryservice/grpc.rs b/tvix/castore/src/directoryservice/grpc.rs
index 24e498a997ef..54d7575adcc1 100644
--- a/tvix/castore/src/directoryservice/grpc.rs
+++ b/tvix/castore/src/directoryservice/grpc.rs
@@ -9,31 +9,41 @@ use tokio::spawn;
 use tokio::sync::mpsc::UnboundedSender;
 use tokio::task::JoinHandle;
 use tokio_stream::wrappers::UnboundedReceiverStream;
-use tonic::async_trait;
-use tonic::Code;
-use tonic::{transport::Channel, Status};
+use tonic::{async_trait, Code, Status};
 use tracing::{instrument, warn, Instrument as _};
 
 /// Connects to a (remote) tvix-store DirectoryService over gRPC.
 #[derive(Clone)]
-pub struct GRPCDirectoryService {
+pub struct GRPCDirectoryService<T>
+where
+    T: Clone,
+{
     /// The internal reference to a gRPC client.
     /// Cloning it is cheap, and it internally handles concurrent requests.
-    grpc_client: proto::directory_service_client::DirectoryServiceClient<Channel>,
+    grpc_client: proto::directory_service_client::DirectoryServiceClient<T>,
 }
 
-impl GRPCDirectoryService {
+impl<T> GRPCDirectoryService<T>
+where
+    T: tonic::client::GrpcService<tonic::body::BoxBody> + Clone,
+{
     /// construct a [GRPCDirectoryService] from a [proto::directory_service_client::DirectoryServiceClient].
     /// panics if called outside the context of a tokio runtime.
     pub fn from_client(
-        grpc_client: proto::directory_service_client::DirectoryServiceClient<Channel>,
+        grpc_client: proto::directory_service_client::DirectoryServiceClient<T>,
     ) -> Self {
         Self { grpc_client }
     }
 }
 
 #[async_trait]
-impl DirectoryService for GRPCDirectoryService {
+impl<T> DirectoryService for GRPCDirectoryService<T>
+where
+    T: tonic::client::GrpcService<tonic::body::BoxBody> + Send + Sync + Clone + 'static,
+    T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static,
+    <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send,
+    T::Future: Send,
+{
     #[instrument(level = "trace", skip_all, fields(directory.digest = %digest))]
     async fn get(
         &self,