diff options
author | Bob van der Linden <bobvanderlinden@gmail.com> | 2024-11-06T22·13+0100 |
---|---|---|
committer | clbot <clbot@tvl.fyi> | 2024-11-08T20·16+0000 |
commit | cfa4154131719db3ff687261bca95481cba609ab (patch) | |
tree | 7fa2da507295b48d861e76f1c26851fd0a55855f /tvix/castore/src/directoryservice | |
parent | 951d25676b8a61f3068d7d54958695739a71aa68 (diff) |
feat(tvix): add instance_name to instrumentation of *Services r/8896
Currently it is not possible to distinguish between tracing of the same *Service type whenever there are multiple of them. Now the instance_name of ServiceBuilder is passed into the *Service and used in the existing instrument as the `instance_name` field. Places that did not already have an instance_name in their context use `"default"`. In tests I used `"test"`. Change-Id: Ia20bf2a7bb849a781e370d087ba7ddb3be79f654 Reviewed-on: https://cl.tvl.fyi/c/depot/+/12739 Tested-by: BuildkiteCI Autosubmit: Bob van der Linden <bobvanderlinden@gmail.com> Reviewed-by: flokli <flokli@flokli.de>
Diffstat (limited to 'tvix/castore/src/directoryservice')
-rw-r--r-- | tvix/castore/src/directoryservice/bigtable.rs | 25 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/combinators.rs | 18 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/grpc.rs | 22 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/memory.rs | 16 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/object_store.rs | 27 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/redb.rs | 25 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/tests/utils.rs | 1 |
7 files changed, 91 insertions, 43 deletions
diff --git a/tvix/castore/src/directoryservice/bigtable.rs b/tvix/castore/src/directoryservice/bigtable.rs index 73ab4342d832..7473481c94b5 100644 --- a/tvix/castore/src/directoryservice/bigtable.rs +++ b/tvix/castore/src/directoryservice/bigtable.rs @@ -37,6 +37,7 @@ const CELL_SIZE_LIMIT: u64 = 10 * 1024 * 1024; /// directly at the root, so rely on store composition. #[derive(Clone)] pub struct BigtableDirectoryService { + instance_name: String, client: bigtable::BigTable, params: BigtableParameters, @@ -49,7 +50,10 @@ pub struct BigtableDirectoryService { impl BigtableDirectoryService { #[cfg(not(test))] - pub async fn connect(params: BigtableParameters) -> Result<Self, bigtable::Error> { + pub async fn connect( + instance_name: String, + params: BigtableParameters, + ) -> Result<Self, bigtable::Error> { let connection = bigtable::BigTableConnection::new( &params.project_id, &params.instance_name, @@ -60,13 +64,17 @@ impl BigtableDirectoryService { .await?; Ok(Self { + instance_name, client: connection.client(), params, }) } #[cfg(test)] - pub async fn connect(params: BigtableParameters) -> Result<Self, bigtable::Error> { + pub async fn connect( + instance_name: String, + params: BigtableParameters, + ) -> Result<Self, bigtable::Error> { use std::time::Duration; use async_process::{Command, Stdio}; @@ -135,6 +143,7 @@ impl BigtableDirectoryService { )?; Ok(Self { + instance_name, client: connection.client(), params, emulator: (tmpdir, emulator_process).into(), @@ -150,7 +159,7 @@ fn derive_directory_key(digest: &B3Digest) -> String { #[async_trait] impl DirectoryService for BigtableDirectoryService { - #[instrument(skip(self, digest), err, fields(directory.digest = %digest))] + #[instrument(skip(self, digest), err, fields(directory.digest = %digest, instance_name=%self.instance_name))] async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> { let mut client = self.client.clone(); let directory_key = derive_directory_key(digest); @@ -250,7 +259,7 @@ 
impl DirectoryService for BigtableDirectoryService { Ok(Some(directory)) } - #[instrument(skip(self, directory), err, fields(directory.digest = %directory.digest()))] + #[instrument(skip(self, directory), err, fields(directory.digest = %directory.digest(), instance_name=%self.instance_name))] async fn put(&self, directory: Directory) -> Result<B3Digest, Error> { let directory_digest = directory.digest(); let mut client = self.client.clone(); @@ -300,7 +309,7 @@ impl DirectoryService for BigtableDirectoryService { Ok(directory_digest) } - #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] + #[instrument(skip_all, fields(directory.digest = %root_directory_digest, instance_name=%self.instance_name))] fn get_recursive( &self, root_directory_digest: &B3Digest, @@ -308,7 +317,7 @@ impl DirectoryService for BigtableDirectoryService { traverse_directory(self.clone(), root_directory_digest) } - #[instrument(skip_all)] + #[instrument(skip_all, fields(instance_name=%self.instance_name))] fn put_multiple_start(&self) -> Box<(dyn DirectoryPutter + 'static)> where Self: Clone, @@ -346,11 +355,11 @@ impl ServiceBuilder for BigtableParameters { type Output = dyn DirectoryService; async fn build<'a>( &'a self, - _instance_name: &str, + instance_name: &str, _context: &CompositionContext, ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync>> { Ok(Arc::new( - BigtableDirectoryService::connect(self.clone()).await?, + BigtableDirectoryService::connect(instance_name.to_string(), self.clone()).await?, )) } } diff --git a/tvix/castore/src/directoryservice/combinators.rs b/tvix/castore/src/directoryservice/combinators.rs index 4dfc19540c47..450c642715ac 100644 --- a/tvix/castore/src/directoryservice/combinators.rs +++ b/tvix/castore/src/directoryservice/combinators.rs @@ -22,13 +22,18 @@ use crate::Error; /// Inserts and listings are not implemented for now. 
#[derive(Clone)] pub struct Cache<DS1, DS2> { + instance_name: String, near: DS1, far: DS2, } impl<DS1, DS2> Cache<DS1, DS2> { - pub fn new(near: DS1, far: DS2) -> Self { - Self { near, far } + pub fn new(instance_name: String, near: DS1, far: DS2) -> Self { + Self { + instance_name, + near, + far, + } } } @@ -38,7 +43,7 @@ where DS1: DirectoryService + Clone + 'static, DS2: DirectoryService + Clone + 'static, { - #[instrument(skip(self, digest), fields(directory.digest = %digest))] + #[instrument(skip(self, digest), fields(directory.digest = %digest, instance_name = %self.instance_name))] async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> { match self.near.get(digest).await? { Some(directory) => { @@ -80,12 +85,12 @@ where } } - #[instrument(skip_all)] + #[instrument(skip_all, fields(instance_name = %self.instance_name))] async fn put(&self, _directory: Directory) -> Result<B3Digest, Error> { Err(Error::StorageError("unimplemented".to_string())) } - #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] + #[instrument(skip_all, fields(directory.digest = %root_directory_digest, instance_name = %self.instance_name))] fn get_recursive( &self, root_directory_digest: &B3Digest, @@ -166,7 +171,7 @@ impl ServiceBuilder for CacheConfig { type Output = dyn DirectoryService; async fn build<'a>( &'a self, - _instance_name: &str, + instance_name: &str, context: &CompositionContext, ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> { let (near, far) = futures::join!( @@ -174,6 +179,7 @@ impl ServiceBuilder for CacheConfig { context.resolve::<Self::Output>(self.far.clone()) ); Ok(Arc::new(Cache { + instance_name: instance_name.to_string(), near: near?, far: far?, })) diff --git a/tvix/castore/src/directoryservice/grpc.rs b/tvix/castore/src/directoryservice/grpc.rs index 9696c5631949..3fd177a34f28 100644 --- a/tvix/castore/src/directoryservice/grpc.rs +++ 
b/tvix/castore/src/directoryservice/grpc.rs @@ -17,6 +17,7 @@ use tracing::{instrument, warn, Instrument as _}; /// Connects to a (remote) tvix-store DirectoryService over gRPC. #[derive(Clone)] pub struct GRPCDirectoryService<T> { + instance_name: String, /// The internal reference to a gRPC client. /// Cloning it is cheap, and it internally handles concurrent requests. grpc_client: proto::directory_service_client::DirectoryServiceClient<T>, @@ -26,9 +27,13 @@ impl<T> GRPCDirectoryService<T> { /// construct a [GRPCDirectoryService] from a [proto::directory_service_client::DirectoryServiceClient]. /// panics if called outside the context of a tokio runtime. pub fn from_client( + instance_name: String, grpc_client: proto::directory_service_client::DirectoryServiceClient<T>, ) -> Self { - Self { grpc_client } + Self { + instance_name, + grpc_client, + } } } @@ -40,7 +45,7 @@ where <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send, T::Future: Send, { - #[instrument(level = "trace", skip_all, fields(directory.digest = %digest))] + #[instrument(level = "trace", skip_all, fields(directory.digest = %digest, instance_name = %self.instance_name))] async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, crate::Error> { // Get a new handle to the gRPC client, and copy the digest. 
let mut grpc_client = self.grpc_client.clone(); @@ -81,7 +86,7 @@ where } } - #[instrument(level = "trace", skip_all, fields(directory.digest = %directory.digest()))] + #[instrument(level = "trace", skip_all, fields(directory.digest = %directory.digest(), instance_name = %self.instance_name))] async fn put(&self, directory: Directory) -> Result<B3Digest, crate::Error> { let resp = self .grpc_client @@ -101,7 +106,7 @@ where } } - #[instrument(level = "trace", skip_all, fields(directory.digest = %root_directory_digest))] + #[instrument(level = "trace", skip_all, fields(directory.digest = %root_directory_digest, instance_name = %self.instance_name))] fn get_recursive( &self, root_directory_digest: &B3Digest, @@ -240,13 +245,16 @@ impl ServiceBuilder for GRPCDirectoryServiceConfig { type Output = dyn DirectoryService; async fn build<'a>( &'a self, - _instance_name: &str, + instance_name: &str, _context: &CompositionContext, ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> { let client = proto::directory_service_client::DirectoryServiceClient::new( crate::tonic::channel_from_url(&self.url.parse()?).await?, ); - Ok(Arc::new(GRPCDirectoryService::from_client(client))) + Ok(Arc::new(GRPCDirectoryService::from_client( + instance_name.to_string(), + client, + ))) } } @@ -374,7 +382,7 @@ mod tests { .await .expect("must succeed"), ); - GRPCDirectoryService::from_client(client) + GRPCDirectoryService::from_client("test-instance".into(), client) }; assert!(grpc_client diff --git a/tvix/castore/src/directoryservice/memory.rs b/tvix/castore/src/directoryservice/memory.rs index b039d9bc7d84..a43d7b8d8d31 100644 --- a/tvix/castore/src/directoryservice/memory.rs +++ b/tvix/castore/src/directoryservice/memory.rs @@ -13,12 +13,13 @@ use crate::proto; #[derive(Clone, Default)] pub struct MemoryDirectoryService { + instance_name: String, db: Arc<RwLock<HashMap<B3Digest, proto::Directory>>>, } #[async_trait] impl DirectoryService for 
MemoryDirectoryService { - #[instrument(skip(self, digest), fields(directory.digest = %digest))] + #[instrument(skip(self, digest), err, fields(directory.digest = %digest, instance_name=%self.instance_name))] async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> { let db = self.db.read().await; @@ -45,7 +46,7 @@ impl DirectoryService for MemoryDirectoryService { } } - #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))] + #[instrument(skip(self, directory), err, fields(directory.digest = %directory.digest(), instance_name=%self.instance_name))] async fn put(&self, directory: Directory) -> Result<B3Digest, Error> { let digest = directory.digest(); @@ -56,7 +57,7 @@ impl DirectoryService for MemoryDirectoryService { Ok(digest) } - #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] + #[instrument(skip_all, fields(directory.digest = %root_directory_digest, instance_name=%self.instance_name))] fn get_recursive( &self, root_directory_digest: &B3Digest, @@ -64,7 +65,7 @@ impl DirectoryService for MemoryDirectoryService { traverse_directory(self.clone(), root_directory_digest) } - #[instrument(skip_all)] + #[instrument(skip_all, fields(instance_name=%self.instance_name))] fn put_multiple_start(&self) -> Box<(dyn DirectoryPutter + 'static)> where Self: Clone, @@ -93,9 +94,12 @@ impl ServiceBuilder for MemoryDirectoryServiceConfig { type Output = dyn DirectoryService; async fn build<'a>( &'a self, - _instance_name: &str, + instance_name: &str, _context: &CompositionContext, ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> { - Ok(Arc::new(MemoryDirectoryService::default())) + Ok(Arc::new(MemoryDirectoryService { + instance_name: instance_name.to_string(), + db: Default::default(), + })) } } diff --git a/tvix/castore/src/directoryservice/object_store.rs b/tvix/castore/src/directoryservice/object_store.rs index 5b5281abcd2f..77578ec92f02 100644 --- 
a/tvix/castore/src/directoryservice/object_store.rs +++ b/tvix/castore/src/directoryservice/object_store.rs @@ -32,6 +32,7 @@ use crate::{proto, B3Digest, Error, Node}; /// be returned to the client in get_recursive. #[derive(Clone)] pub struct ObjectStoreDirectoryService { + instance_name: String, object_store: Arc<dyn ObjectStore>, base_path: Path, } @@ -63,6 +64,7 @@ impl ObjectStoreDirectoryService { let (object_store, path) = object_store::parse_url_opts(url, options)?; Ok(Self { + instance_name: "default".into(), object_store: Arc::new(object_store), base_path: path, }) @@ -72,18 +74,26 @@ impl ObjectStoreDirectoryService { pub fn parse_url(url: &Url) -> Result<Self, object_store::Error> { Self::parse_url_opts(url, Vec::<(String, String)>::new()) } + + pub fn new(instance_name: String, object_store: Arc<dyn ObjectStore>, base_path: Path) -> Self { + Self { + instance_name, + object_store, + base_path, + } + } } #[async_trait] impl DirectoryService for ObjectStoreDirectoryService { /// This is the same steps as for get_recursive anyways, so we just call get_recursive and /// return the first element of the stream and drop the request. 
- #[instrument(skip(self, digest), fields(directory.digest = %digest))] + #[instrument(level = "trace", skip_all, fields(directory.digest = %digest, instance_name = %self.instance_name))] async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> { self.get_recursive(digest).take(1).next().await.transpose() } - #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))] + #[instrument(level = "trace", skip_all, fields(directory.digest = %directory.digest(), instance_name = %self.instance_name))] async fn put(&self, directory: Directory) -> Result<B3Digest, Error> { // Ensure the directory doesn't contain other directory children if directory @@ -100,7 +110,7 @@ impl DirectoryService for ObjectStoreDirectoryService { handle.close().await } - #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] + #[instrument(level = "trace", skip_all, fields(directory.digest = %root_directory_digest, instance_name = %self.instance_name))] fn get_recursive( &self, root_directory_digest: &B3Digest, @@ -219,17 +229,18 @@ impl ServiceBuilder for ObjectStoreDirectoryServiceConfig { type Output = dyn DirectoryService; async fn build<'a>( &'a self, - _instance_name: &str, + instance_name: &str, _context: &CompositionContext, ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> { let (object_store, path) = object_store::parse_url_opts( &self.object_store_url.parse()?, &self.object_store_options, )?; - Ok(Arc::new(ObjectStoreDirectoryService { - object_store: Arc::new(object_store), - base_path: path, - })) + Ok(Arc::new(ObjectStoreDirectoryService::new( + instance_name.to_string(), + Arc::new(object_store), + path, + ))) } } diff --git a/tvix/castore/src/directoryservice/redb.rs b/tvix/castore/src/directoryservice/redb.rs index d253df503bb3..ba1fb682d5da 100644 --- a/tvix/castore/src/directoryservice/redb.rs +++ b/tvix/castore/src/directoryservice/redb.rs @@ -19,6 +19,7 @@ const 
DIRECTORY_TABLE: TableDefinition<[u8; digests::B3_LEN], Vec<u8>> = #[derive(Clone)] pub struct RedbDirectoryService { + instance_name: String, // We wrap the db in an Arc to be able to move it into spawn_blocking, // as discussed in https://github.com/cberner/redb/issues/789 db: Arc<Database>, @@ -27,7 +28,7 @@ pub struct RedbDirectoryService { impl RedbDirectoryService { /// Constructs a new instance using the specified filesystem path for /// storage. - pub async fn new(path: PathBuf) -> Result<Self, Error> { + pub async fn new(instance_name: String, path: PathBuf) -> Result<Self, Error> { if path == PathBuf::from("/") { return Err(Error::StorageError( "cowardly refusing to open / with redb".to_string(), @@ -41,7 +42,10 @@ impl RedbDirectoryService { }) .await??; - Ok(Self { db: Arc::new(db) }) + Ok(Self { + instance_name, + db: Arc::new(db), + }) } /// Constructs a new instance using the in-memory backend. @@ -51,7 +55,10 @@ impl RedbDirectoryService { create_schema(&db)?; - Ok(Self { db: Arc::new(db) }) + Ok(Self { + instance_name: "default".into(), + db: Arc::new(db), + }) } } @@ -68,7 +75,7 @@ fn create_schema(db: &redb::Database) -> Result<(), redb::Error> { #[async_trait] impl DirectoryService for RedbDirectoryService { - #[instrument(skip(self, digest), fields(directory.digest = %digest))] + #[instrument(skip(self, digest), fields(directory.digest = %digest, instance_name = %self.instance_name))] async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> { let db = self.db.clone(); @@ -121,7 +128,7 @@ impl DirectoryService for RedbDirectoryService { Ok(Some(directory)) } - #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))] + #[instrument(skip(self, directory), fields(directory.digest = %directory.digest(), instance_name = %self.instance_name))] async fn put(&self, directory: Directory) -> Result<B3Digest, Error> { tokio::task::spawn_blocking({ let db = self.db.clone(); @@ -146,7 +153,7 @@ impl 
DirectoryService for RedbDirectoryService { .await? } - #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] + #[instrument(skip_all, fields(directory.digest = %root_directory_digest, instance_name = %self.instance_name))] fn get_recursive( &self, root_directory_digest: &B3Digest, @@ -275,7 +282,7 @@ impl ServiceBuilder for RedbDirectoryServiceConfig { type Output = dyn DirectoryService; async fn build<'a>( &'a self, - _instance_name: &str, + instance_name: &str, _context: &CompositionContext, ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> { match self { @@ -297,7 +304,9 @@ impl ServiceBuilder for RedbDirectoryServiceConfig { RedbDirectoryServiceConfig { is_temporary: false, path: Some(path), - } => Ok(Arc::new(RedbDirectoryService::new(path.into()).await?)), + } => Ok(Arc::new( + RedbDirectoryService::new(instance_name.to_string(), path.into()).await?, + )), } } } diff --git a/tvix/castore/src/directoryservice/tests/utils.rs b/tvix/castore/src/directoryservice/tests/utils.rs index 3d245ea412d5..1c5d68de0108 100644 --- a/tvix/castore/src/directoryservice/tests/utils.rs +++ b/tvix/castore/src/directoryservice/tests/utils.rs @@ -33,6 +33,7 @@ pub async fn make_grpc_directory_service_client() -> Box<dyn DirectoryService> { // Create a client, connecting to the right side. The URI is unused. let mut maybe_right = Some(right); Box::new(GRPCDirectoryService::from_client( + "default".into(), DirectoryServiceClient::new( Endpoint::try_from("http://[::]:50051") .unwrap() |