diff options
Diffstat (limited to 'tvix/castore/src/blobservice/grpc.rs')
-rw-r--r-- | tvix/castore/src/blobservice/grpc.rs | 28 |
1 files changed, 19 insertions, 9 deletions
diff --git a/tvix/castore/src/blobservice/grpc.rs b/tvix/castore/src/blobservice/grpc.rs index 0db3dfea4ad8..3453657d07ab 100644 --- a/tvix/castore/src/blobservice/grpc.rs +++ b/tvix/castore/src/blobservice/grpc.rs @@ -24,6 +24,7 @@ use tracing::{instrument, Instrument as _}; /// Connects to a (remote) tvix-store BlobService over gRPC. #[derive(Clone)] pub struct GRPCBlobService<T> { + instance_name: String, /// The internal reference to a gRPC client. /// Cloning it is cheap, and it internally handles concurrent requests. grpc_client: proto::blob_service_client::BlobServiceClient<T>, @@ -31,8 +32,14 @@ pub struct GRPCBlobService<T> { impl<T> GRPCBlobService<T> { /// construct a [GRPCBlobService] from a [proto::blob_service_client::BlobServiceClient]. - pub fn from_client(grpc_client: proto::blob_service_client::BlobServiceClient<T>) -> Self { - Self { grpc_client } + pub fn from_client( + instance_name: String, + grpc_client: proto::blob_service_client::BlobServiceClient<T>, + ) -> Self { + Self { + instance_name, + grpc_client, + } } } @@ -44,7 +51,7 @@ where <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send, T::Future: Send, { - #[instrument(skip(self, digest), fields(blob.digest=%digest))] + #[instrument(skip(self, digest), fields(blob.digest=%digest, instance_name=%self.instance_name))] async fn has(&self, digest: &B3Digest) -> io::Result<bool> { match self .grpc_client @@ -61,7 +68,7 @@ where } } - #[instrument(skip(self, digest), fields(blob.digest=%digest), err)] + #[instrument(skip(self, digest), fields(blob.digest=%digest, instance_name=%self.instance_name), err)] async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> { // First try to get a list of chunks. In case there's only one chunk returned, // buffer its data into a Vec, otherwise use a ChunkedReader. @@ -124,7 +131,7 @@ where /// Returns a BlobWriter, that'll internally wrap each write in a /// [proto::BlobChunk], which is send to the gRPC server. - #[instrument(skip_all)] + #[instrument(skip_all, fields(instance_name=%self.instance_name))] async fn open_write(&self) -> Box<dyn BlobWriter> { // set up an mpsc channel passing around Bytes. let (tx, rx) = tokio::sync::mpsc::channel::<bytes::Bytes>(10); @@ -154,7 +161,7 @@ where }) } - #[instrument(skip(self, digest), fields(blob.digest=%digest), err)] + #[instrument(skip(self, digest), fields(blob.digest=%digest, instance_name=%self.instance_name), err)] async fn chunks(&self, digest: &B3Digest) -> io::Result<Option<Vec<ChunkMeta>>> { let resp = self .grpc_client @@ -205,13 +212,16 @@ impl ServiceBuilder for GRPCBlobServiceConfig { type Output = dyn BlobService; async fn build<'a>( &'a self, - _instance_name: &str, + instance_name: &str, _context: &CompositionContext, ) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync + 'static>> { let client = proto::blob_service_client::BlobServiceClient::new( crate::tonic::channel_from_url(&self.url.parse()?).await?, ); - Ok(Arc::new(GRPCBlobService::from_client(client))) + Ok(Arc::new(GRPCBlobService::from_client( + instance_name.to_string(), + client, + ))) } } @@ -375,7 +385,7 @@ mod tests { .await .expect("must succeed"), ); - GRPCBlobService::from_client(client) + GRPCBlobService::from_client("default".into(), client) }; let has = grpc_client |