about summary refs log tree commit diff
path: root/tvix/castore/src/blobservice
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/castore/src/blobservice')
-rw-r--r--tvix/castore/src/blobservice/from_addr.rs8
-rw-r--r--tvix/castore/src/blobservice/grpc.rs27
2 files changed, 24 insertions, 11 deletions
diff --git a/tvix/castore/src/blobservice/from_addr.rs b/tvix/castore/src/blobservice/from_addr.rs
index 8898bbfb95ce..f76592e509f8 100644
--- a/tvix/castore/src/blobservice/from_addr.rs
+++ b/tvix/castore/src/blobservice/from_addr.rs
@@ -30,8 +30,12 @@ pub async fn from_addr(uri: &str) -> Result<Box<dyn BlobService>, crate::Error>
             // - In the case of unix sockets, there must be a path, but may not be a host.
             // - In the case of non-unix sockets, there must be a host, but no path.
             // Constructing the channel is handled by tvix_castore::channel::from_url.
-            let client = BlobServiceClient::new(crate::tonic::channel_from_url(&url).await?);
-            Box::new(GRPCBlobService::from_client(client))
+            Box::new(GRPCBlobService::from_client(
+                BlobServiceClient::with_interceptor(
+                    crate::tonic::channel_from_url(&url).await?,
+                    tvix_tracing::propagate::tonic::send_trace,
+                ),
+            ))
         }
         scheme if scheme.starts_with("objectstore+") => {
             // We need to convert the URL to string, strip the prefix there, and then
diff --git a/tvix/castore/src/blobservice/grpc.rs b/tvix/castore/src/blobservice/grpc.rs
index 8bde6a120d0a..9b54d81de29c 100644
--- a/tvix/castore/src/blobservice/grpc.rs
+++ b/tvix/castore/src/blobservice/grpc.rs
@@ -17,29 +17,39 @@ use tokio_util::{
     io::{CopyToBytes, SinkWriter},
     sync::PollSender,
 };
-use tonic::{async_trait, transport::Channel, Code, Status};
+use tonic::{async_trait, Code, Status};
 use tracing::{instrument, Instrument as _};
 
 /// Connects to a (remote) tvix-store BlobService over gRPC.
 #[derive(Clone)]
-pub struct GRPCBlobService {
+pub struct GRPCBlobService<T>
+where
+    T: Clone,
+{
     /// The internal reference to a gRPC client.
     /// Cloning it is cheap, and it internally handles concurrent requests.
-    grpc_client: proto::blob_service_client::BlobServiceClient<Channel>,
+    grpc_client: proto::blob_service_client::BlobServiceClient<T>,
 }
 
-impl GRPCBlobService {
+impl<T> GRPCBlobService<T>
+where
+    T: tonic::client::GrpcService<tonic::body::BoxBody> + Clone,
+{
     /// construct a [GRPCBlobService] from a [proto::blob_service_client::BlobServiceClient].
     /// panics if called outside the context of a tokio runtime.
-    pub fn from_client(
-        grpc_client: proto::blob_service_client::BlobServiceClient<Channel>,
-    ) -> Self {
+    pub fn from_client(grpc_client: proto::blob_service_client::BlobServiceClient<T>) -> Self {
         Self { grpc_client }
     }
 }
 
 #[async_trait]
-impl BlobService for GRPCBlobService {
+impl<T> BlobService for GRPCBlobService<T>
+where
+    T: tonic::client::GrpcService<tonic::body::BoxBody> + Send + Sync + Clone + 'static,
+    T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static,
+    <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send,
+    T::Future: Send,
+{
     #[instrument(skip(self, digest), fields(blob.digest=%digest))]
     async fn has(&self, digest: &B3Digest) -> io::Result<bool> {
         let mut grpc_client = self.grpc_client.clone();
@@ -337,7 +347,6 @@ mod tests {
                     .await
                     .expect("must succeed"),
             );
-
             GRPCBlobService::from_client(client)
         };