about summary refs log tree commit diff
path: root/tvix/castore/src/blobservice/grpc.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/castore/src/blobservice/grpc.rs')
-rw-r--r--tvix/castore/src/blobservice/grpc.rs28
1 files changed, 19 insertions, 9 deletions
diff --git a/tvix/castore/src/blobservice/grpc.rs b/tvix/castore/src/blobservice/grpc.rs
index 0db3dfea4ad8..3453657d07ab 100644
--- a/tvix/castore/src/blobservice/grpc.rs
+++ b/tvix/castore/src/blobservice/grpc.rs
@@ -24,6 +24,7 @@ use tracing::{instrument, Instrument as _};
 /// Connects to a (remote) tvix-store BlobService over gRPC.
 #[derive(Clone)]
 pub struct GRPCBlobService<T> {
+    instance_name: String,
     /// The internal reference to a gRPC client.
     /// Cloning it is cheap, and it internally handles concurrent requests.
     grpc_client: proto::blob_service_client::BlobServiceClient<T>,
@@ -31,8 +32,14 @@ pub struct GRPCBlobService<T> {
 
 impl<T> GRPCBlobService<T> {
     /// construct a [GRPCBlobService] from a [proto::blob_service_client::BlobServiceClient].
-    pub fn from_client(grpc_client: proto::blob_service_client::BlobServiceClient<T>) -> Self {
-        Self { grpc_client }
+    pub fn from_client(
+        instance_name: String,
+        grpc_client: proto::blob_service_client::BlobServiceClient<T>,
+    ) -> Self {
+        Self {
+            instance_name,
+            grpc_client,
+        }
     }
 }
 
@@ -44,7 +51,7 @@ where
     <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send,
     T::Future: Send,
 {
-    #[instrument(skip(self, digest), fields(blob.digest=%digest))]
+    #[instrument(skip(self, digest), fields(blob.digest=%digest, instance_name=%self.instance_name))]
     async fn has(&self, digest: &B3Digest) -> io::Result<bool> {
         match self
             .grpc_client
@@ -61,7 +68,7 @@ where
         }
     }
 
-    #[instrument(skip(self, digest), fields(blob.digest=%digest), err)]
+    #[instrument(skip(self, digest), fields(blob.digest=%digest, instance_name=%self.instance_name), err)]
     async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> {
         // First try to get a list of chunks. In case there's only one chunk returned,
         // buffer its data into a Vec, otherwise use a ChunkedReader.
@@ -124,7 +131,7 @@ where
 
     /// Returns a BlobWriter, that'll internally wrap each write in a
     /// [proto::BlobChunk], which is send to the gRPC server.
-    #[instrument(skip_all)]
+    #[instrument(skip_all, fields(instance_name=%self.instance_name))]
     async fn open_write(&self) -> Box<dyn BlobWriter> {
         // set up an mpsc channel passing around Bytes.
         let (tx, rx) = tokio::sync::mpsc::channel::<bytes::Bytes>(10);
@@ -154,7 +161,7 @@ where
         })
     }
 
-    #[instrument(skip(self, digest), fields(blob.digest=%digest), err)]
+    #[instrument(skip(self, digest), fields(blob.digest=%digest, instance_name=%self.instance_name), err)]
     async fn chunks(&self, digest: &B3Digest) -> io::Result<Option<Vec<ChunkMeta>>> {
         let resp = self
             .grpc_client
@@ -205,13 +212,16 @@ impl ServiceBuilder for GRPCBlobServiceConfig {
     type Output = dyn BlobService;
     async fn build<'a>(
         &'a self,
-        _instance_name: &str,
+        instance_name: &str,
         _context: &CompositionContext,
     ) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
         let client = proto::blob_service_client::BlobServiceClient::new(
             crate::tonic::channel_from_url(&self.url.parse()?).await?,
         );
-        Ok(Arc::new(GRPCBlobService::from_client(client)))
+        Ok(Arc::new(GRPCBlobService::from_client(
+            instance_name.to_string(),
+            client,
+        )))
     }
 }
 
@@ -375,7 +385,7 @@ mod tests {
                     .await
                     .expect("must succeed"),
             );
-            GRPCBlobService::from_client(client)
+            GRPCBlobService::from_client("default".into(), client)
         };
 
         let has = grpc_client