about summary refs log tree commit diff
path: root/tvix/castore/src/directoryservice
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/castore/src/directoryservice')
-rw-r--r--tvix/castore/src/directoryservice/bigtable.rs25
-rw-r--r--tvix/castore/src/directoryservice/combinators.rs18
-rw-r--r--tvix/castore/src/directoryservice/grpc.rs22
-rw-r--r--tvix/castore/src/directoryservice/memory.rs16
-rw-r--r--tvix/castore/src/directoryservice/object_store.rs27
-rw-r--r--tvix/castore/src/directoryservice/redb.rs25
-rw-r--r--tvix/castore/src/directoryservice/tests/utils.rs1
7 files changed, 91 insertions, 43 deletions
diff --git a/tvix/castore/src/directoryservice/bigtable.rs b/tvix/castore/src/directoryservice/bigtable.rs
index 73ab4342d832..7473481c94b5 100644
--- a/tvix/castore/src/directoryservice/bigtable.rs
+++ b/tvix/castore/src/directoryservice/bigtable.rs
@@ -37,6 +37,7 @@ const CELL_SIZE_LIMIT: u64 = 10 * 1024 * 1024;
 /// directly at the root, so rely on store composition.
 #[derive(Clone)]
 pub struct BigtableDirectoryService {
+    instance_name: String,
     client: bigtable::BigTable,
     params: BigtableParameters,
 
@@ -49,7 +50,10 @@ pub struct BigtableDirectoryService {
 
 impl BigtableDirectoryService {
     #[cfg(not(test))]
-    pub async fn connect(params: BigtableParameters) -> Result<Self, bigtable::Error> {
+    pub async fn connect(
+        instance_name: String,
+        params: BigtableParameters,
+    ) -> Result<Self, bigtable::Error> {
         let connection = bigtable::BigTableConnection::new(
             &params.project_id,
             &params.instance_name,
@@ -60,13 +64,17 @@ impl BigtableDirectoryService {
         .await?;
 
         Ok(Self {
+            instance_name,
             client: connection.client(),
             params,
         })
     }
 
     #[cfg(test)]
-    pub async fn connect(params: BigtableParameters) -> Result<Self, bigtable::Error> {
+    pub async fn connect(
+        instance_name: String,
+        params: BigtableParameters,
+    ) -> Result<Self, bigtable::Error> {
         use std::time::Duration;
 
         use async_process::{Command, Stdio};
@@ -135,6 +143,7 @@ impl BigtableDirectoryService {
         )?;
 
         Ok(Self {
+            instance_name,
             client: connection.client(),
             params,
             emulator: (tmpdir, emulator_process).into(),
@@ -150,7 +159,7 @@ fn derive_directory_key(digest: &B3Digest) -> String {
 
 #[async_trait]
 impl DirectoryService for BigtableDirectoryService {
-    #[instrument(skip(self, digest), err, fields(directory.digest = %digest))]
+    #[instrument(skip(self, digest), err, fields(directory.digest = %digest, instance_name=%self.instance_name))]
     async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> {
         let mut client = self.client.clone();
         let directory_key = derive_directory_key(digest);
@@ -250,7 +259,7 @@ impl DirectoryService for BigtableDirectoryService {
         Ok(Some(directory))
     }
 
-    #[instrument(skip(self, directory), err, fields(directory.digest = %directory.digest()))]
+    #[instrument(skip(self, directory), err, fields(directory.digest = %directory.digest(), instance_name=%self.instance_name))]
     async fn put(&self, directory: Directory) -> Result<B3Digest, Error> {
         let directory_digest = directory.digest();
         let mut client = self.client.clone();
@@ -300,7 +309,7 @@ impl DirectoryService for BigtableDirectoryService {
         Ok(directory_digest)
     }
 
-    #[instrument(skip_all, fields(directory.digest = %root_directory_digest))]
+    #[instrument(skip_all, fields(directory.digest = %root_directory_digest, instance_name=%self.instance_name))]
     fn get_recursive(
         &self,
         root_directory_digest: &B3Digest,
@@ -308,7 +317,7 @@ impl DirectoryService for BigtableDirectoryService {
         traverse_directory(self.clone(), root_directory_digest)
     }
 
-    #[instrument(skip_all)]
+    #[instrument(skip_all, fields(instance_name=%self.instance_name))]
     fn put_multiple_start(&self) -> Box<(dyn DirectoryPutter + 'static)>
     where
         Self: Clone,
@@ -346,11 +355,11 @@ impl ServiceBuilder for BigtableParameters {
     type Output = dyn DirectoryService;
     async fn build<'a>(
         &'a self,
-        _instance_name: &str,
+        instance_name: &str,
         _context: &CompositionContext,
     ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync>> {
         Ok(Arc::new(
-            BigtableDirectoryService::connect(self.clone()).await?,
+            BigtableDirectoryService::connect(instance_name.to_string(), self.clone()).await?,
         ))
     }
 }
diff --git a/tvix/castore/src/directoryservice/combinators.rs b/tvix/castore/src/directoryservice/combinators.rs
index 4dfc19540c47..450c642715ac 100644
--- a/tvix/castore/src/directoryservice/combinators.rs
+++ b/tvix/castore/src/directoryservice/combinators.rs
@@ -22,13 +22,18 @@ use crate::Error;
 /// Inserts and listings are not implemented for now.
 #[derive(Clone)]
 pub struct Cache<DS1, DS2> {
+    instance_name: String,
     near: DS1,
     far: DS2,
 }
 
 impl<DS1, DS2> Cache<DS1, DS2> {
-    pub fn new(near: DS1, far: DS2) -> Self {
-        Self { near, far }
+    pub fn new(instance_name: String, near: DS1, far: DS2) -> Self {
+        Self {
+            instance_name,
+            near,
+            far,
+        }
     }
 }
 
@@ -38,7 +43,7 @@ where
     DS1: DirectoryService + Clone + 'static,
     DS2: DirectoryService + Clone + 'static,
 {
-    #[instrument(skip(self, digest), fields(directory.digest = %digest))]
+    #[instrument(skip(self, digest), fields(directory.digest = %digest, instance_name = %self.instance_name))]
     async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> {
         match self.near.get(digest).await? {
             Some(directory) => {
@@ -80,12 +85,12 @@ where
         }
     }
 
-    #[instrument(skip_all)]
+    #[instrument(skip_all, fields(instance_name = %self.instance_name))]
     async fn put(&self, _directory: Directory) -> Result<B3Digest, Error> {
         Err(Error::StorageError("unimplemented".to_string()))
     }
 
-    #[instrument(skip_all, fields(directory.digest = %root_directory_digest))]
+    #[instrument(skip_all, fields(directory.digest = %root_directory_digest, instance_name = %self.instance_name))]
     fn get_recursive(
         &self,
         root_directory_digest: &B3Digest,
@@ -166,7 +171,7 @@ impl ServiceBuilder for CacheConfig {
     type Output = dyn DirectoryService;
     async fn build<'a>(
         &'a self,
-        _instance_name: &str,
+        instance_name: &str,
         context: &CompositionContext,
     ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
         let (near, far) = futures::join!(
@@ -174,6 +179,7 @@ impl ServiceBuilder for CacheConfig {
             context.resolve::<Self::Output>(self.far.clone())
         );
         Ok(Arc::new(Cache {
+            instance_name: instance_name.to_string(),
             near: near?,
             far: far?,
         }))
diff --git a/tvix/castore/src/directoryservice/grpc.rs b/tvix/castore/src/directoryservice/grpc.rs
index 9696c5631949..3fd177a34f28 100644
--- a/tvix/castore/src/directoryservice/grpc.rs
+++ b/tvix/castore/src/directoryservice/grpc.rs
@@ -17,6 +17,7 @@ use tracing::{instrument, warn, Instrument as _};
 /// Connects to a (remote) tvix-store DirectoryService over gRPC.
 #[derive(Clone)]
 pub struct GRPCDirectoryService<T> {
+    instance_name: String,
     /// The internal reference to a gRPC client.
     /// Cloning it is cheap, and it internally handles concurrent requests.
     grpc_client: proto::directory_service_client::DirectoryServiceClient<T>,
@@ -26,9 +27,13 @@ impl<T> GRPCDirectoryService<T> {
     /// construct a [GRPCDirectoryService] from a [proto::directory_service_client::DirectoryServiceClient].
     /// panics if called outside the context of a tokio runtime.
     pub fn from_client(
+        instance_name: String,
         grpc_client: proto::directory_service_client::DirectoryServiceClient<T>,
     ) -> Self {
-        Self { grpc_client }
+        Self {
+            instance_name,
+            grpc_client,
+        }
     }
 }
 
@@ -40,7 +45,7 @@ where
     <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send,
     T::Future: Send,
 {
-    #[instrument(level = "trace", skip_all, fields(directory.digest = %digest))]
+    #[instrument(level = "trace", skip_all, fields(directory.digest = %digest, instance_name = %self.instance_name))]
     async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, crate::Error> {
         // Get a new handle to the gRPC client, and copy the digest.
         let mut grpc_client = self.grpc_client.clone();
@@ -81,7 +86,7 @@ where
         }
     }
 
-    #[instrument(level = "trace", skip_all, fields(directory.digest = %directory.digest()))]
+    #[instrument(level = "trace", skip_all, fields(directory.digest = %directory.digest(), instance_name = %self.instance_name))]
     async fn put(&self, directory: Directory) -> Result<B3Digest, crate::Error> {
         let resp = self
             .grpc_client
@@ -101,7 +106,7 @@ where
         }
     }
 
-    #[instrument(level = "trace", skip_all, fields(directory.digest = %root_directory_digest))]
+    #[instrument(level = "trace", skip_all, fields(directory.digest = %root_directory_digest, instance_name = %self.instance_name))]
     fn get_recursive(
         &self,
         root_directory_digest: &B3Digest,
@@ -240,13 +245,16 @@ impl ServiceBuilder for GRPCDirectoryServiceConfig {
     type Output = dyn DirectoryService;
     async fn build<'a>(
         &'a self,
-        _instance_name: &str,
+        instance_name: &str,
         _context: &CompositionContext,
     ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
         let client = proto::directory_service_client::DirectoryServiceClient::new(
             crate::tonic::channel_from_url(&self.url.parse()?).await?,
         );
-        Ok(Arc::new(GRPCDirectoryService::from_client(client)))
+        Ok(Arc::new(GRPCDirectoryService::from_client(
+            instance_name.to_string(),
+            client,
+        )))
     }
 }
 
@@ -374,7 +382,7 @@ mod tests {
                     .await
                     .expect("must succeed"),
             );
-            GRPCDirectoryService::from_client(client)
+            GRPCDirectoryService::from_client("test-instance".into(), client)
         };
 
         assert!(grpc_client
diff --git a/tvix/castore/src/directoryservice/memory.rs b/tvix/castore/src/directoryservice/memory.rs
index b039d9bc7d84..a43d7b8d8d31 100644
--- a/tvix/castore/src/directoryservice/memory.rs
+++ b/tvix/castore/src/directoryservice/memory.rs
@@ -13,12 +13,13 @@ use crate::proto;
 
 #[derive(Clone, Default)]
 pub struct MemoryDirectoryService {
+    instance_name: String,
     db: Arc<RwLock<HashMap<B3Digest, proto::Directory>>>,
 }
 
 #[async_trait]
 impl DirectoryService for MemoryDirectoryService {
-    #[instrument(skip(self, digest), fields(directory.digest = %digest))]
+    #[instrument(skip(self, digest), err, fields(directory.digest = %digest, instance_name=%self.instance_name))]
     async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> {
         let db = self.db.read().await;
 
@@ -45,7 +46,7 @@ impl DirectoryService for MemoryDirectoryService {
         }
     }
 
-    #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))]
+    #[instrument(skip(self, directory), err, fields(directory.digest = %directory.digest(), instance_name=%self.instance_name))]
     async fn put(&self, directory: Directory) -> Result<B3Digest, Error> {
         let digest = directory.digest();
 
@@ -56,7 +57,7 @@ impl DirectoryService for MemoryDirectoryService {
         Ok(digest)
     }
 
-    #[instrument(skip_all, fields(directory.digest = %root_directory_digest))]
+    #[instrument(skip_all, fields(directory.digest = %root_directory_digest, instance_name=%self.instance_name))]
     fn get_recursive(
         &self,
         root_directory_digest: &B3Digest,
@@ -64,7 +65,7 @@ impl DirectoryService for MemoryDirectoryService {
         traverse_directory(self.clone(), root_directory_digest)
     }
 
-    #[instrument(skip_all)]
+    #[instrument(skip_all, fields(instance_name=%self.instance_name))]
     fn put_multiple_start(&self) -> Box<(dyn DirectoryPutter + 'static)>
     where
         Self: Clone,
@@ -93,9 +94,12 @@ impl ServiceBuilder for MemoryDirectoryServiceConfig {
     type Output = dyn DirectoryService;
     async fn build<'a>(
         &'a self,
-        _instance_name: &str,
+        instance_name: &str,
         _context: &CompositionContext,
     ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
-        Ok(Arc::new(MemoryDirectoryService::default()))
+        Ok(Arc::new(MemoryDirectoryService {
+            instance_name: instance_name.to_string(),
+            db: Default::default(),
+        }))
     }
 }
diff --git a/tvix/castore/src/directoryservice/object_store.rs b/tvix/castore/src/directoryservice/object_store.rs
index 5b5281abcd2f..77578ec92f02 100644
--- a/tvix/castore/src/directoryservice/object_store.rs
+++ b/tvix/castore/src/directoryservice/object_store.rs
@@ -32,6 +32,7 @@ use crate::{proto, B3Digest, Error, Node};
 /// be returned to the client in get_recursive.
 #[derive(Clone)]
 pub struct ObjectStoreDirectoryService {
+    instance_name: String,
     object_store: Arc<dyn ObjectStore>,
     base_path: Path,
 }
@@ -63,6 +64,7 @@ impl ObjectStoreDirectoryService {
         let (object_store, path) = object_store::parse_url_opts(url, options)?;
 
         Ok(Self {
+            instance_name: "default".into(),
             object_store: Arc::new(object_store),
             base_path: path,
         })
@@ -72,18 +74,26 @@ impl ObjectStoreDirectoryService {
     pub fn parse_url(url: &Url) -> Result<Self, object_store::Error> {
         Self::parse_url_opts(url, Vec::<(String, String)>::new())
     }
+
+    pub fn new(instance_name: String, object_store: Arc<dyn ObjectStore>, base_path: Path) -> Self {
+        Self {
+            instance_name,
+            object_store,
+            base_path,
+        }
+    }
 }
 
 #[async_trait]
 impl DirectoryService for ObjectStoreDirectoryService {
     /// This is the same steps as for get_recursive anyways, so we just call get_recursive and
     /// return the first element of the stream and drop the request.
-    #[instrument(skip(self, digest), fields(directory.digest = %digest))]
+    #[instrument(level = "trace", skip_all, fields(directory.digest = %digest, instance_name = %self.instance_name))]
     async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> {
         self.get_recursive(digest).take(1).next().await.transpose()
     }
 
-    #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))]
+    #[instrument(level = "trace", skip_all, fields(directory.digest = %directory.digest(), instance_name = %self.instance_name))]
     async fn put(&self, directory: Directory) -> Result<B3Digest, Error> {
         // Ensure the directory doesn't contain other directory children
         if directory
@@ -100,7 +110,7 @@ impl DirectoryService for ObjectStoreDirectoryService {
         handle.close().await
     }
 
-    #[instrument(skip_all, fields(directory.digest = %root_directory_digest))]
+    #[instrument(level = "trace", skip_all, fields(directory.digest = %root_directory_digest, instance_name = %self.instance_name))]
     fn get_recursive(
         &self,
         root_directory_digest: &B3Digest,
@@ -219,17 +229,18 @@ impl ServiceBuilder for ObjectStoreDirectoryServiceConfig {
     type Output = dyn DirectoryService;
     async fn build<'a>(
         &'a self,
-        _instance_name: &str,
+        instance_name: &str,
         _context: &CompositionContext,
     ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
         let (object_store, path) = object_store::parse_url_opts(
             &self.object_store_url.parse()?,
             &self.object_store_options,
         )?;
-        Ok(Arc::new(ObjectStoreDirectoryService {
-            object_store: Arc::new(object_store),
-            base_path: path,
-        }))
+        Ok(Arc::new(ObjectStoreDirectoryService::new(
+            instance_name.to_string(),
+            Arc::new(object_store),
+            path,
+        )))
     }
 }
 
diff --git a/tvix/castore/src/directoryservice/redb.rs b/tvix/castore/src/directoryservice/redb.rs
index d253df503bb3..ba1fb682d5da 100644
--- a/tvix/castore/src/directoryservice/redb.rs
+++ b/tvix/castore/src/directoryservice/redb.rs
@@ -19,6 +19,7 @@ const DIRECTORY_TABLE: TableDefinition<[u8; digests::B3_LEN], Vec<u8>> =
 
 #[derive(Clone)]
 pub struct RedbDirectoryService {
+    instance_name: String,
     // We wrap the db in an Arc to be able to move it into spawn_blocking,
     // as discussed in https://github.com/cberner/redb/issues/789
     db: Arc<Database>,
@@ -27,7 +28,7 @@ pub struct RedbDirectoryService {
 impl RedbDirectoryService {
     /// Constructs a new instance using the specified filesystem path for
     /// storage.
-    pub async fn new(path: PathBuf) -> Result<Self, Error> {
+    pub async fn new(instance_name: String, path: PathBuf) -> Result<Self, Error> {
         if path == PathBuf::from("/") {
             return Err(Error::StorageError(
                 "cowardly refusing to open / with redb".to_string(),
@@ -41,7 +42,10 @@ impl RedbDirectoryService {
         })
         .await??;
 
-        Ok(Self { db: Arc::new(db) })
+        Ok(Self {
+            instance_name,
+            db: Arc::new(db),
+        })
     }
 
     /// Constructs a new instance using the in-memory backend.
@@ -51,7 +55,10 @@ impl RedbDirectoryService {
 
         create_schema(&db)?;
 
-        Ok(Self { db: Arc::new(db) })
+        Ok(Self {
+            instance_name: "default".into(),
+            db: Arc::new(db),
+        })
     }
 }
 
@@ -68,7 +75,7 @@ fn create_schema(db: &redb::Database) -> Result<(), redb::Error> {
 
 #[async_trait]
 impl DirectoryService for RedbDirectoryService {
-    #[instrument(skip(self, digest), fields(directory.digest = %digest))]
+    #[instrument(skip(self, digest), fields(directory.digest = %digest, instance_name = %self.instance_name))]
     async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> {
         let db = self.db.clone();
 
@@ -121,7 +128,7 @@ impl DirectoryService for RedbDirectoryService {
         Ok(Some(directory))
     }
 
-    #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))]
+    #[instrument(skip(self, directory), fields(directory.digest = %directory.digest(), instance_name = %self.instance_name))]
     async fn put(&self, directory: Directory) -> Result<B3Digest, Error> {
         tokio::task::spawn_blocking({
             let db = self.db.clone();
@@ -146,7 +153,7 @@ impl DirectoryService for RedbDirectoryService {
         .await?
     }
 
-    #[instrument(skip_all, fields(directory.digest = %root_directory_digest))]
+    #[instrument(skip_all, fields(directory.digest = %root_directory_digest, instance_name = %self.instance_name))]
     fn get_recursive(
         &self,
         root_directory_digest: &B3Digest,
@@ -275,7 +282,7 @@ impl ServiceBuilder for RedbDirectoryServiceConfig {
     type Output = dyn DirectoryService;
     async fn build<'a>(
         &'a self,
-        _instance_name: &str,
+        instance_name: &str,
         _context: &CompositionContext,
     ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
         match self {
@@ -297,7 +304,9 @@ impl ServiceBuilder for RedbDirectoryServiceConfig {
             RedbDirectoryServiceConfig {
                 is_temporary: false,
                 path: Some(path),
-            } => Ok(Arc::new(RedbDirectoryService::new(path.into()).await?)),
+            } => Ok(Arc::new(
+                RedbDirectoryService::new(instance_name.to_string(), path.into()).await?,
+            )),
         }
     }
 }
diff --git a/tvix/castore/src/directoryservice/tests/utils.rs b/tvix/castore/src/directoryservice/tests/utils.rs
index 3d245ea412d5..1c5d68de0108 100644
--- a/tvix/castore/src/directoryservice/tests/utils.rs
+++ b/tvix/castore/src/directoryservice/tests/utils.rs
@@ -33,6 +33,7 @@ pub async fn make_grpc_directory_service_client() -> Box<dyn DirectoryService> {
     // Create a client, connecting to the right side. The URI is unused.
     let mut maybe_right = Some(right);
     Box::new(GRPCDirectoryService::from_client(
+        "default".into(),
         DirectoryServiceClient::new(
             Endpoint::try_from("http://[::]:50051")
                 .unwrap()