about summary refs log tree commit diff
path: root/tvix/castore/src/blobservice
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/castore/src/blobservice')
-rw-r--r--tvix/castore/src/blobservice/combinator.rs11
-rw-r--r--tvix/castore/src/blobservice/grpc.rs28
-rw-r--r--tvix/castore/src/blobservice/memory.rs14
-rw-r--r--tvix/castore/src/blobservice/object_store.rs13
-rw-r--r--tvix/castore/src/blobservice/tests/utils.rs23
5 files changed, 56 insertions, 33 deletions
diff --git a/tvix/castore/src/blobservice/combinator.rs b/tvix/castore/src/blobservice/combinator.rs
index 6a964c8a8440..e5f61d6d0c2a 100644
--- a/tvix/castore/src/blobservice/combinator.rs
+++ b/tvix/castore/src/blobservice/combinator.rs
@@ -16,6 +16,7 @@ use super::{BlobReader, BlobService, BlobWriter, ChunkedReader};
 /// blobservice again, before falling back to the remote one.
 /// The remote BlobService is never written to.
 pub struct CombinedBlobService<BL, BR> {
+    instance_name: String,
     local: BL,
     remote: BR,
 }
@@ -27,6 +28,7 @@ where
 {
     fn clone(&self) -> Self {
         Self {
+            instance_name: self.instance_name.clone(),
             local: self.local.clone(),
             remote: self.remote.clone(),
         }
@@ -39,12 +41,12 @@ where
     BL: AsRef<dyn BlobService> + Clone + Send + Sync + 'static,
     BR: AsRef<dyn BlobService> + Clone + Send + Sync + 'static,
 {
-    #[instrument(skip(self, digest), fields(blob.digest=%digest))]
+    #[instrument(skip(self, digest), fields(blob.digest=%digest, instance_name=%self.instance_name))]
     async fn has(&self, digest: &B3Digest) -> std::io::Result<bool> {
         Ok(self.local.as_ref().has(digest).await? || self.remote.as_ref().has(digest).await?)
     }
 
-    #[instrument(skip(self, digest), fields(blob.digest=%digest), err)]
+    #[instrument(skip(self, digest), fields(blob.digest=%digest, instance_name=%self.instance_name), err)]
     async fn open_read(&self, digest: &B3Digest) -> std::io::Result<Option<Box<dyn BlobReader>>> {
         if self.local.as_ref().has(digest).await? {
             // local store has the blob, so we can assume it also has all chunks.
@@ -84,7 +86,7 @@ where
         }
     }
 
-    #[instrument(skip_all)]
+    #[instrument(skip_all, fields(instance_name=%self.instance_name))]
     async fn open_write(&self) -> Box<dyn BlobWriter> {
         // direct writes to the local one.
         self.local.as_ref().open_write().await
@@ -113,7 +115,7 @@ impl ServiceBuilder for CombinedBlobServiceConfig {
     type Output = dyn BlobService;
     async fn build<'a>(
         &'a self,
-        _instance_name: &str,
+        instance_name: &str,
         context: &CompositionContext,
     ) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync>> {
         let (local, remote) = futures::join!(
@@ -121,6 +123,7 @@ impl ServiceBuilder for CombinedBlobServiceConfig {
             context.resolve(self.remote.clone())
         );
         Ok(Arc::new(CombinedBlobService {
+            instance_name: instance_name.to_string(),
             local: local?,
             remote: remote?,
         }))
diff --git a/tvix/castore/src/blobservice/grpc.rs b/tvix/castore/src/blobservice/grpc.rs
index 0db3dfea4ad8..3453657d07ab 100644
--- a/tvix/castore/src/blobservice/grpc.rs
+++ b/tvix/castore/src/blobservice/grpc.rs
@@ -24,6 +24,7 @@ use tracing::{instrument, Instrument as _};
 /// Connects to a (remote) tvix-store BlobService over gRPC.
 #[derive(Clone)]
 pub struct GRPCBlobService<T> {
+    instance_name: String,
     /// The internal reference to a gRPC client.
     /// Cloning it is cheap, and it internally handles concurrent requests.
     grpc_client: proto::blob_service_client::BlobServiceClient<T>,
@@ -31,8 +32,14 @@ pub struct GRPCBlobService<T> {
 
 impl<T> GRPCBlobService<T> {
     /// construct a [GRPCBlobService] from a [proto::blob_service_client::BlobServiceClient].
-    pub fn from_client(grpc_client: proto::blob_service_client::BlobServiceClient<T>) -> Self {
-        Self { grpc_client }
+    pub fn from_client(
+        instance_name: String,
+        grpc_client: proto::blob_service_client::BlobServiceClient<T>,
+    ) -> Self {
+        Self {
+            instance_name,
+            grpc_client,
+        }
     }
 }
 
@@ -44,7 +51,7 @@ where
     <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send,
     T::Future: Send,
 {
-    #[instrument(skip(self, digest), fields(blob.digest=%digest))]
+    #[instrument(skip(self, digest), fields(blob.digest=%digest, instance_name=%self.instance_name))]
     async fn has(&self, digest: &B3Digest) -> io::Result<bool> {
         match self
             .grpc_client
@@ -61,7 +68,7 @@ where
         }
     }
 
-    #[instrument(skip(self, digest), fields(blob.digest=%digest), err)]
+    #[instrument(skip(self, digest), fields(blob.digest=%digest, instance_name=%self.instance_name), err)]
     async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> {
         // First try to get a list of chunks. In case there's only one chunk returned,
         // buffer its data into a Vec, otherwise use a ChunkedReader.
@@ -124,7 +131,7 @@ where
 
     /// Returns a BlobWriter, that'll internally wrap each write in a
     /// [proto::BlobChunk], which is send to the gRPC server.
-    #[instrument(skip_all)]
+    #[instrument(skip_all, fields(instance_name=%self.instance_name))]
     async fn open_write(&self) -> Box<dyn BlobWriter> {
         // set up an mpsc channel passing around Bytes.
         let (tx, rx) = tokio::sync::mpsc::channel::<bytes::Bytes>(10);
@@ -154,7 +161,7 @@ where
         })
     }
 
-    #[instrument(skip(self, digest), fields(blob.digest=%digest), err)]
+    #[instrument(skip(self, digest), fields(blob.digest=%digest, instance_name=%self.instance_name), err)]
     async fn chunks(&self, digest: &B3Digest) -> io::Result<Option<Vec<ChunkMeta>>> {
         let resp = self
             .grpc_client
@@ -205,13 +212,16 @@ impl ServiceBuilder for GRPCBlobServiceConfig {
     type Output = dyn BlobService;
     async fn build<'a>(
         &'a self,
-        _instance_name: &str,
+        instance_name: &str,
         _context: &CompositionContext,
     ) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
         let client = proto::blob_service_client::BlobServiceClient::new(
             crate::tonic::channel_from_url(&self.url.parse()?).await?,
         );
-        Ok(Arc::new(GRPCBlobService::from_client(client)))
+        Ok(Arc::new(GRPCBlobService::from_client(
+            instance_name.to_string(),
+            client,
+        )))
     }
 }
 
@@ -375,7 +385,7 @@ mod tests {
                     .await
                     .expect("must succeed"),
             );
-            GRPCBlobService::from_client(client)
+            GRPCBlobService::from_client("default".into(), client)
         };
 
         let has = grpc_client
diff --git a/tvix/castore/src/blobservice/memory.rs b/tvix/castore/src/blobservice/memory.rs
index 3d733f950470..348b8bb56d5b 100644
--- a/tvix/castore/src/blobservice/memory.rs
+++ b/tvix/castore/src/blobservice/memory.rs
@@ -11,18 +11,19 @@ use crate::{B3Digest, Error};
 
 #[derive(Clone, Default)]
 pub struct MemoryBlobService {
+    instance_name: String,
     db: Arc<RwLock<HashMap<B3Digest, Vec<u8>>>>,
 }
 
 #[async_trait]
 impl BlobService for MemoryBlobService {
-    #[instrument(skip_all, ret, err, fields(blob.digest=%digest))]
+    #[instrument(skip_all, ret, err, fields(blob.digest=%digest, instance_name=%self.instance_name))]
     async fn has(&self, digest: &B3Digest) -> io::Result<bool> {
         let db = self.db.read();
         Ok(db.contains_key(digest))
     }
 
-    #[instrument(skip_all, err, fields(blob.digest=%digest))]
+    #[instrument(skip_all, err, fields(blob.digest=%digest, instance_name=%self.instance_name))]
     async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> {
         let db = self.db.read();
 
@@ -32,7 +33,7 @@ impl BlobService for MemoryBlobService {
         }
     }
 
-    #[instrument(skip_all)]
+    #[instrument(skip_all, fields(instance_name=%self.instance_name))]
     async fn open_write(&self) -> Box<dyn BlobWriter> {
         Box::new(MemoryBlobWriter::new(self.db.clone()))
     }
@@ -58,10 +59,13 @@ impl ServiceBuilder for MemoryBlobServiceConfig {
     type Output = dyn BlobService;
     async fn build<'a>(
         &'a self,
-        _instance_name: &str,
+        instance_name: &str,
         _context: &CompositionContext,
     ) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
-        Ok(Arc::new(MemoryBlobService::default()))
+        Ok(Arc::new(MemoryBlobService {
+            instance_name: instance_name.to_string(),
+            db: Default::default(),
+        }))
     }
 }
 
diff --git a/tvix/castore/src/blobservice/object_store.rs b/tvix/castore/src/blobservice/object_store.rs
index b688ebafc7f4..10874af64011 100644
--- a/tvix/castore/src/blobservice/object_store.rs
+++ b/tvix/castore/src/blobservice/object_store.rs
@@ -64,6 +64,7 @@ use super::{BlobReader, BlobService, BlobWriter, ChunkedReader};
 /// all keys stored so far, but no promises ;-)
 #[derive(Clone)]
 pub struct ObjectStoreBlobService {
+    instance_name: String,
     object_store: Arc<dyn ObjectStore>,
     base_path: Path,
 
@@ -92,7 +93,7 @@ fn derive_chunk_path(base_path: &Path, digest: &B3Digest) -> Path {
 
 #[async_trait]
 impl BlobService for ObjectStoreBlobService {
-    #[instrument(skip_all, ret(level = Level::TRACE), err, fields(blob.digest=%digest))]
+    #[instrument(skip_all, ret(level = Level::TRACE), err, fields(blob.digest=%digest, instance_name=%self.instance_name))]
     async fn has(&self, digest: &B3Digest) -> io::Result<bool> {
         // TODO: clarify if this should work for chunks or not, and explicitly
         // document in the proto docs.
@@ -112,7 +113,7 @@ impl BlobService for ObjectStoreBlobService {
         }
     }
 
-    #[instrument(skip_all, err, fields(blob.digest=%digest))]
+    #[instrument(skip_all, err, fields(blob.digest=%digest, instance_name=%self.instance_name))]
     async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> {
         // handle reading the empty blob.
         if digest.as_slice() == blake3::hash(b"").as_bytes() {
@@ -169,7 +170,7 @@ impl BlobService for ObjectStoreBlobService {
         }
     }
 
-    #[instrument(skip_all)]
+    #[instrument(skip_all, fields(instance_name=%self.instance_name))]
     async fn open_write(&self) -> Box<dyn BlobWriter> {
         // ObjectStoreBlobWriter implements AsyncWrite, but all the chunking
         // needs an AsyncRead, so we create a pipe here.
@@ -192,7 +193,7 @@ impl BlobService for ObjectStoreBlobService {
         })
     }
 
-    #[instrument(skip_all, err, fields(blob.digest=%digest))]
+    #[instrument(skip_all, err, fields(blob.digest=%digest, instance_name=%self.instance_name))]
     async fn chunks(&self, digest: &B3Digest) -> io::Result<Option<Vec<ChunkMeta>>> {
         match self
             .object_store
@@ -294,7 +295,7 @@ impl ServiceBuilder for ObjectStoreBlobServiceConfig {
     type Output = dyn BlobService;
     async fn build<'a>(
         &'a self,
-        _instance_name: &str,
+        instance_name: &str,
         _context: &CompositionContext,
     ) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
         let (object_store, path) = object_store::parse_url_opts(
@@ -302,6 +303,7 @@ impl ServiceBuilder for ObjectStoreBlobServiceConfig {
             &self.object_store_options,
         )?;
         Ok(Arc::new(ObjectStoreBlobService {
+            instance_name: instance_name.to_string(),
             object_store: Arc::new(object_store),
             base_path: path,
             avg_chunk_size: self.avg_chunk_size,
@@ -582,6 +584,7 @@ mod test {
             object_store::parse_url(&Url::parse("memory:///").unwrap()).unwrap();
         let object_store: Arc<dyn object_store::ObjectStore> = Arc::from(object_store);
         let blobsvc = Arc::new(ObjectStoreBlobService {
+            instance_name: "test".into(),
             object_store: object_store.clone(),
             avg_chunk_size: default_avg_chunk_size(),
             base_path,
diff --git a/tvix/castore/src/blobservice/tests/utils.rs b/tvix/castore/src/blobservice/tests/utils.rs
index 7df4f00d3a09..7032d633dad0 100644
--- a/tvix/castore/src/blobservice/tests/utils.rs
+++ b/tvix/castore/src/blobservice/tests/utils.rs
@@ -29,14 +29,17 @@ pub async fn make_grpc_blob_service_client() -> Box<dyn BlobService> {
     // Create a client, connecting to the right side. The URI is unused.
     let mut maybe_right = Some(right);
 
-    Box::new(GRPCBlobService::from_client(BlobServiceClient::new(
-        Endpoint::try_from("http://[::]:50051")
-            .unwrap()
-            .connect_with_connector(tower::service_fn(move |_: Uri| {
-                let right = maybe_right.take().unwrap();
-                async move { Ok::<_, std::io::Error>(TokioIo::new(right)) }
-            }))
-            .await
-            .unwrap(),
-    )))
+    Box::new(GRPCBlobService::from_client(
+        "default".into(),
+        BlobServiceClient::new(
+            Endpoint::try_from("http://[::]:50051")
+                .unwrap()
+                .connect_with_connector(tower::service_fn(move |_: Uri| {
+                    let right = maybe_right.take().unwrap();
+                    async move { Ok::<_, std::io::Error>(TokioIo::new(right)) }
+                }))
+                .await
+                .unwrap(),
+        ),
+    ))
 }