diff options
Diffstat (limited to 'tvix')
23 files changed, 270 insertions, 137 deletions
diff --git a/tvix/castore/src/blobservice/combinator.rs b/tvix/castore/src/blobservice/combinator.rs index 6a964c8a8440..e5f61d6d0c2a 100644 --- a/tvix/castore/src/blobservice/combinator.rs +++ b/tvix/castore/src/blobservice/combinator.rs @@ -16,6 +16,7 @@ use super::{BlobReader, BlobService, BlobWriter, ChunkedReader}; /// blobservice again, before falling back to the remote one. /// The remote BlobService is never written to. pub struct CombinedBlobService<BL, BR> { + instance_name: String, local: BL, remote: BR, } @@ -27,6 +28,7 @@ where { fn clone(&self) -> Self { Self { + instance_name: self.instance_name.clone(), local: self.local.clone(), remote: self.remote.clone(), } @@ -39,12 +41,12 @@ where BL: AsRef<dyn BlobService> + Clone + Send + Sync + 'static, BR: AsRef<dyn BlobService> + Clone + Send + Sync + 'static, { - #[instrument(skip(self, digest), fields(blob.digest=%digest))] + #[instrument(skip(self, digest), fields(blob.digest=%digest, instance_name=%self.instance_name))] async fn has(&self, digest: &B3Digest) -> std::io::Result<bool> { Ok(self.local.as_ref().has(digest).await? || self.remote.as_ref().has(digest).await?) } - #[instrument(skip(self, digest), fields(blob.digest=%digest), err)] + #[instrument(skip(self, digest), fields(blob.digest=%digest, instance_name=%self.instance_name), err)] async fn open_read(&self, digest: &B3Digest) -> std::io::Result<Option<Box<dyn BlobReader>>> { if self.local.as_ref().has(digest).await? { // local store has the blob, so we can assume it also has all chunks. @@ -84,7 +86,7 @@ where } } - #[instrument(skip_all)] + #[instrument(skip_all, fields(instance_name=%self.instance_name))] async fn open_write(&self) -> Box<dyn BlobWriter> { // direct writes to the local one. self.local.as_ref().open_write().await @@ -113,7 +115,7 @@ impl ServiceBuilder for CombinedBlobServiceConfig { type Output = dyn BlobService; async fn build<'a>( &'a self, - _instance_name: &str, + instance_name: &str, context: &CompositionContext, ) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync>> { let (local, remote) = futures::join!( @@ -121,6 +123,7 @@ impl ServiceBuilder for CombinedBlobServiceConfig { context.resolve(self.remote.clone()) ); Ok(Arc::new(CombinedBlobService { + instance_name: instance_name.to_string(), local: local?, remote: remote?, })) diff --git a/tvix/castore/src/blobservice/grpc.rs b/tvix/castore/src/blobservice/grpc.rs index 0db3dfea4ad8..3453657d07ab 100644 --- a/tvix/castore/src/blobservice/grpc.rs +++ b/tvix/castore/src/blobservice/grpc.rs @@ -24,6 +24,7 @@ use tracing::{instrument, Instrument as _}; /// Connects to a (remote) tvix-store BlobService over gRPC. #[derive(Clone)] pub struct GRPCBlobService<T> { + instance_name: String, /// The internal reference to a gRPC client. /// Cloning it is cheap, and it internally handles concurrent requests. grpc_client: proto::blob_service_client::BlobServiceClient<T>, @@ -31,8 +32,14 @@ pub struct GRPCBlobService<T> { impl<T> GRPCBlobService<T> { /// construct a [GRPCBlobService] from a [proto::blob_service_client::BlobServiceClient]. - pub fn from_client(grpc_client: proto::blob_service_client::BlobServiceClient<T>) -> Self { - Self { grpc_client } + pub fn from_client( + instance_name: String, + grpc_client: proto::blob_service_client::BlobServiceClient<T>, + ) -> Self { + Self { + instance_name, + grpc_client, + } } } @@ -44,7 +51,7 @@ where <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send, T::Future: Send, { - #[instrument(skip(self, digest), fields(blob.digest=%digest))] + #[instrument(skip(self, digest), fields(blob.digest=%digest, instance_name=%self.instance_name))] async fn has(&self, digest: &B3Digest) -> io::Result<bool> { match self .grpc_client @@ -61,7 +68,7 @@ where } } - #[instrument(skip(self, digest), fields(blob.digest=%digest), err)] + #[instrument(skip(self, digest), fields(blob.digest=%digest, instance_name=%self.instance_name), err)] async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> { // First try to get a list of chunks. In case there's only one chunk returned, // buffer its data into a Vec, otherwise use a ChunkedReader. @@ -124,7 +131,7 @@ where /// Returns a BlobWriter, that'll internally wrap each write in a /// [proto::BlobChunk], which is send to the gRPC server. - #[instrument(skip_all)] + #[instrument(skip_all, fields(instance_name=%self.instance_name))] async fn open_write(&self) -> Box<dyn BlobWriter> { // set up an mpsc channel passing around Bytes. let (tx, rx) = tokio::sync::mpsc::channel::<bytes::Bytes>(10); @@ -154,7 +161,7 @@ where }) } - #[instrument(skip(self, digest), fields(blob.digest=%digest), err)] + #[instrument(skip(self, digest), fields(blob.digest=%digest, instance_name=%self.instance_name), err)] async fn chunks(&self, digest: &B3Digest) -> io::Result<Option<Vec<ChunkMeta>>> { let resp = self .grpc_client @@ -205,13 +212,16 @@ impl ServiceBuilder for GRPCBlobServiceConfig { type Output = dyn BlobService; async fn build<'a>( &'a self, - _instance_name: &str, + instance_name: &str, _context: &CompositionContext, ) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync + 'static>> { let client = proto::blob_service_client::BlobServiceClient::new( crate::tonic::channel_from_url(&self.url.parse()?).await?, ); - Ok(Arc::new(GRPCBlobService::from_client(client))) + Ok(Arc::new(GRPCBlobService::from_client( + instance_name.to_string(), + client, + ))) } } @@ -375,7 +385,7 @@ mod tests { .await .expect("must succeed"), ); - GRPCBlobService::from_client(client) + GRPCBlobService::from_client("default".into(), client) }; let has = grpc_client diff --git a/tvix/castore/src/blobservice/memory.rs b/tvix/castore/src/blobservice/memory.rs index 3d733f950470..348b8bb56d5b 100644 --- a/tvix/castore/src/blobservice/memory.rs +++ b/tvix/castore/src/blobservice/memory.rs @@ -11,18 +11,19 @@ use crate::{B3Digest, Error}; #[derive(Clone, Default)] pub struct MemoryBlobService { + instance_name: String, db: Arc<RwLock<HashMap<B3Digest, Vec<u8>>>>, } #[async_trait] impl BlobService for MemoryBlobService { - #[instrument(skip_all, ret, err, fields(blob.digest=%digest))] + #[instrument(skip_all, ret, err, fields(blob.digest=%digest, instance_name=%self.instance_name))] async fn has(&self, digest: &B3Digest) -> io::Result<bool> { let db = self.db.read(); Ok(db.contains_key(digest)) } - #[instrument(skip_all, err, fields(blob.digest=%digest))] + #[instrument(skip_all, err, fields(blob.digest=%digest, instance_name=%self.instance_name))] async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> { let db = self.db.read(); @@ -32,7 +33,7 @@ impl BlobService for MemoryBlobService { } } - #[instrument(skip_all)] + #[instrument(skip_all, fields(instance_name=%self.instance_name))] async fn open_write(&self) -> Box<dyn BlobWriter> { Box::new(MemoryBlobWriter::new(self.db.clone())) } @@ -58,10 +59,13 @@ impl ServiceBuilder for MemoryBlobServiceConfig { type Output = dyn BlobService; async fn build<'a>( &'a self, - _instance_name: &str, + instance_name: &str, _context: &CompositionContext, ) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync + 'static>> { - Ok(Arc::new(MemoryBlobService::default())) + Ok(Arc::new(MemoryBlobService { + instance_name: instance_name.to_string(), + db: Default::default(), + })) } } diff --git a/tvix/castore/src/blobservice/object_store.rs b/tvix/castore/src/blobservice/object_store.rs index b688ebafc7f4..10874af64011 100644 --- a/tvix/castore/src/blobservice/object_store.rs +++ b/tvix/castore/src/blobservice/object_store.rs @@ -64,6 +64,7 @@ use super::{BlobReader, BlobService, BlobWriter, ChunkedReader}; /// all keys stored so far, but no promises ;-) #[derive(Clone)] pub struct ObjectStoreBlobService { + instance_name: String, object_store: Arc<dyn ObjectStore>, base_path: Path, @@ -92,7 +93,7 @@ fn derive_chunk_path(base_path: &Path, digest: &B3Digest) -> Path { #[async_trait] impl BlobService for ObjectStoreBlobService { - #[instrument(skip_all, ret(level = Level::TRACE), err, fields(blob.digest=%digest))] + #[instrument(skip_all, ret(level = Level::TRACE), err, fields(blob.digest=%digest, instance_name=%self.instance_name))] async fn has(&self, digest: &B3Digest) -> io::Result<bool> { // TODO: clarify if this should work for chunks or not, and explicitly // document in the proto docs. @@ -112,7 +113,7 @@ impl BlobService for ObjectStoreBlobService { } } - #[instrument(skip_all, err, fields(blob.digest=%digest))] + #[instrument(skip_all, err, fields(blob.digest=%digest, instance_name=%self.instance_name))] async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> { // handle reading the empty blob. if digest.as_slice() == blake3::hash(b"").as_bytes() { @@ -169,7 +170,7 @@ impl BlobService for ObjectStoreBlobService { } } - #[instrument(skip_all)] + #[instrument(skip_all, fields(instance_name=%self.instance_name))] async fn open_write(&self) -> Box<dyn BlobWriter> { // ObjectStoreBlobWriter implements AsyncWrite, but all the chunking // needs an AsyncRead, so we create a pipe here. @@ -192,7 +193,7 @@ impl BlobService for ObjectStoreBlobService { }) } - #[instrument(skip_all, err, fields(blob.digest=%digest))] + #[instrument(skip_all, err, fields(blob.digest=%digest, instance_name=%self.instance_name))] async fn chunks(&self, digest: &B3Digest) -> io::Result<Option<Vec<ChunkMeta>>> { match self .object_store @@ -294,7 +295,7 @@ impl ServiceBuilder for ObjectStoreBlobServiceConfig { type Output = dyn BlobService; async fn build<'a>( &'a self, - _instance_name: &str, + instance_name: &str, _context: &CompositionContext, ) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync + 'static>> { let (object_store, path) = object_store::parse_url_opts( @@ -302,6 +303,7 @@ impl ServiceBuilder for ObjectStoreBlobServiceConfig { &self.object_store_options, )?; Ok(Arc::new(ObjectStoreBlobService { + instance_name: instance_name.to_string(), object_store: Arc::new(object_store), base_path: path, avg_chunk_size: self.avg_chunk_size, @@ -582,6 +584,7 @@ mod test { object_store::parse_url(&Url::parse("memory:///").unwrap()).unwrap(); let object_store: Arc<dyn object_store::ObjectStore> = Arc::from(object_store); let blobsvc = Arc::new(ObjectStoreBlobService { + instance_name: "test".into(), object_store: object_store.clone(), avg_chunk_size: default_avg_chunk_size(), base_path, diff --git a/tvix/castore/src/blobservice/tests/utils.rs b/tvix/castore/src/blobservice/tests/utils.rs index 7df4f00d3a09..7032d633dad0 100644 --- a/tvix/castore/src/blobservice/tests/utils.rs +++ b/tvix/castore/src/blobservice/tests/utils.rs @@ -29,14 +29,17 @@ pub async fn make_grpc_blob_service_client() -> Box<dyn BlobService> { // Create a client, connecting to the right side. The URI is unused. let mut maybe_right = Some(right); - Box::new(GRPCBlobService::from_client(BlobServiceClient::new( - Endpoint::try_from("http://[::]:50051") - .unwrap() - .connect_with_connector(tower::service_fn(move |_: Uri| { - let right = maybe_right.take().unwrap(); - async move { Ok::<_, std::io::Error>(TokioIo::new(right)) } - })) - .await - .unwrap(), - ))) + Box::new(GRPCBlobService::from_client( + "default".into(), + BlobServiceClient::new( + Endpoint::try_from("http://[::]:50051") + .unwrap() + .connect_with_connector(tower::service_fn(move |_: Uri| { + let right = maybe_right.take().unwrap(); + async move { Ok::<_, std::io::Error>(TokioIo::new(right)) } + })) + .await + .unwrap(), + ), + )) } diff --git a/tvix/castore/src/directoryservice/bigtable.rs b/tvix/castore/src/directoryservice/bigtable.rs index 73ab4342d832..7473481c94b5 100644 --- a/tvix/castore/src/directoryservice/bigtable.rs +++ b/tvix/castore/src/directoryservice/bigtable.rs @@ -37,6 +37,7 @@ const CELL_SIZE_LIMIT: u64 = 10 * 1024 * 1024; /// directly at the root, so rely on store composition. #[derive(Clone)] pub struct BigtableDirectoryService { + instance_name: String, client: bigtable::BigTable, params: BigtableParameters, @@ -49,7 +50,10 @@ pub struct BigtableDirectoryService { impl BigtableDirectoryService { #[cfg(not(test))] - pub async fn connect(params: BigtableParameters) -> Result<Self, bigtable::Error> { + pub async fn connect( + instance_name: String, + params: BigtableParameters, + ) -> Result<Self, bigtable::Error> { let connection = bigtable::BigTableConnection::new( ¶ms.project_id, ¶ms.instance_name, @@ -60,13 +64,17 @@ impl BigtableDirectoryService { .await?; Ok(Self { + instance_name, client: connection.client(), params, }) } #[cfg(test)] - pub async fn connect(params: BigtableParameters) -> Result<Self, bigtable::Error> { + pub async fn connect( + instance_name: String, + params: BigtableParameters, + ) -> Result<Self, bigtable::Error> { use std::time::Duration; use async_process::{Command, Stdio}; @@ -135,6 +143,7 @@ impl BigtableDirectoryService { )?; Ok(Self { + instance_name, client: connection.client(), params, emulator: (tmpdir, emulator_process).into(), @@ -150,7 +159,7 @@ fn derive_directory_key(digest: &B3Digest) -> String { #[async_trait] impl DirectoryService for BigtableDirectoryService { - #[instrument(skip(self, digest), err, fields(directory.digest = %digest))] + #[instrument(skip(self, digest), err, fields(directory.digest = %digest, instance_name=%self.instance_name))] async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> { let mut client = self.client.clone(); let directory_key = derive_directory_key(digest); @@ -250,7 +259,7 @@ impl DirectoryService for BigtableDirectoryService { Ok(Some(directory)) } - #[instrument(skip(self, directory), err, fields(directory.digest = %directory.digest()))] + #[instrument(skip(self, directory), err, fields(directory.digest = %directory.digest(), instance_name=%self.instance_name))] async fn put(&self, directory: Directory) -> Result<B3Digest, Error> { let directory_digest = directory.digest(); let mut client = self.client.clone(); @@ -300,7 +309,7 @@ impl DirectoryService for BigtableDirectoryService { Ok(directory_digest) } - #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] + #[instrument(skip_all, fields(directory.digest = %root_directory_digest, instance_name=%self.instance_name))] fn get_recursive( &self, root_directory_digest: &B3Digest, @@ -308,7 +317,7 @@ impl DirectoryService for BigtableDirectoryService { traverse_directory(self.clone(), root_directory_digest) } - #[instrument(skip_all)] + #[instrument(skip_all, fields(instance_name=%self.instance_name))] fn put_multiple_start(&self) -> Box<(dyn DirectoryPutter + 'static)> where Self: Clone, @@ -346,11 +355,11 @@ impl ServiceBuilder for BigtableParameters { type Output = dyn DirectoryService; async fn build<'a>( &'a self, - _instance_name: &str, + instance_name: &str, _context: &CompositionContext, ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync>> { Ok(Arc::new( - BigtableDirectoryService::connect(self.clone()).await?, + BigtableDirectoryService::connect(instance_name.to_string(), self.clone()).await?, )) } } diff --git a/tvix/castore/src/directoryservice/combinators.rs b/tvix/castore/src/directoryservice/combinators.rs index 4dfc19540c47..450c642715ac 100644 --- a/tvix/castore/src/directoryservice/combinators.rs +++ b/tvix/castore/src/directoryservice/combinators.rs @@ -22,13 +22,18 @@ use crate::Error; /// Inserts and listings are not implemented for now. #[derive(Clone)] pub struct Cache<DS1, DS2> { + instance_name: String, near: DS1, far: DS2, } impl<DS1, DS2> Cache<DS1, DS2> { - pub fn new(near: DS1, far: DS2) -> Self { - Self { near, far } + pub fn new(instance_name: String, near: DS1, far: DS2) -> Self { + Self { + instance_name, + near, + far, + } } } @@ -38,7 +43,7 @@ where DS1: DirectoryService + Clone + 'static, DS2: DirectoryService + Clone + 'static, { - #[instrument(skip(self, digest), fields(directory.digest = %digest))] + #[instrument(skip(self, digest), fields(directory.digest = %digest, instance_name = %self.instance_name))] async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> { match self.near.get(digest).await? { Some(directory) => { @@ -80,12 +85,12 @@ where } } - #[instrument(skip_all)] + #[instrument(skip_all, fields(instance_name = %self.instance_name))] async fn put(&self, _directory: Directory) -> Result<B3Digest, Error> { Err(Error::StorageError("unimplemented".to_string())) } - #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] + #[instrument(skip_all, fields(directory.digest = %root_directory_digest, instance_name = %self.instance_name))] fn get_recursive( &self, root_directory_digest: &B3Digest, @@ -166,7 +171,7 @@ impl ServiceBuilder for CacheConfig { type Output = dyn DirectoryService; async fn build<'a>( &'a self, - _instance_name: &str, + instance_name: &str, context: &CompositionContext, ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> { let (near, far) = futures::join!( @@ -174,6 +179,7 @@ impl ServiceBuilder for CacheConfig { context.resolve::<Self::Output>(self.far.clone()) ); Ok(Arc::new(Cache { + instance_name: instance_name.to_string(), near: near?, far: far?, })) diff --git a/tvix/castore/src/directoryservice/grpc.rs b/tvix/castore/src/directoryservice/grpc.rs index 9696c5631949..3fd177a34f28 100644 --- a/tvix/castore/src/directoryservice/grpc.rs +++ b/tvix/castore/src/directoryservice/grpc.rs @@ -17,6 +17,7 @@ use tracing::{instrument, warn, Instrument as _}; /// Connects to a (remote) tvix-store DirectoryService over gRPC. #[derive(Clone)] pub struct GRPCDirectoryService<T> { + instance_name: String, /// The internal reference to a gRPC client. /// Cloning it is cheap, and it internally handles concurrent requests. grpc_client: proto::directory_service_client::DirectoryServiceClient<T>, @@ -26,9 +27,13 @@ impl<T> GRPCDirectoryService<T> { /// construct a [GRPCDirectoryService] from a [proto::directory_service_client::DirectoryServiceClient]. /// panics if called outside the context of a tokio runtime. pub fn from_client( + instance_name: String, grpc_client: proto::directory_service_client::DirectoryServiceClient<T>, ) -> Self { - Self { grpc_client } + Self { + instance_name, + grpc_client, + } } } @@ -40,7 +45,7 @@ where <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send, T::Future: Send, { - #[instrument(level = "trace", skip_all, fields(directory.digest = %digest))] + #[instrument(level = "trace", skip_all, fields(directory.digest = %digest, instance_name = %self.instance_name))] async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, crate::Error> { // Get a new handle to the gRPC client, and copy the digest. let mut grpc_client = self.grpc_client.clone(); @@ -81,7 +86,7 @@ where } } - #[instrument(level = "trace", skip_all, fields(directory.digest = %directory.digest()))] + #[instrument(level = "trace", skip_all, fields(directory.digest = %directory.digest(), instance_name = %self.instance_name))] async fn put(&self, directory: Directory) -> Result<B3Digest, crate::Error> { let resp = self .grpc_client @@ -101,7 +106,7 @@ where } } - #[instrument(level = "trace", skip_all, fields(directory.digest = %root_directory_digest))] + #[instrument(level = "trace", skip_all, fields(directory.digest = %root_directory_digest, instance_name = %self.instance_name))] fn get_recursive( &self, root_directory_digest: &B3Digest, @@ -240,13 +245,16 @@ impl ServiceBuilder for GRPCDirectoryServiceConfig { type Output = dyn DirectoryService; async fn build<'a>( &'a self, - _instance_name: &str, + instance_name: &str, _context: &CompositionContext, ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> { let client = proto::directory_service_client::DirectoryServiceClient::new( crate::tonic::channel_from_url(&self.url.parse()?).await?, ); - Ok(Arc::new(GRPCDirectoryService::from_client(client))) + Ok(Arc::new(GRPCDirectoryService::from_client( + instance_name.to_string(), + client, + ))) } } @@ -374,7 +382,7 @@ mod tests { .await .expect("must succeed"), ); - GRPCDirectoryService::from_client(client) + GRPCDirectoryService::from_client("test-instance".into(), client) }; assert!(grpc_client diff --git a/tvix/castore/src/directoryservice/memory.rs b/tvix/castore/src/directoryservice/memory.rs index b039d9bc7d84..a43d7b8d8d31 100644 --- a/tvix/castore/src/directoryservice/memory.rs +++ b/tvix/castore/src/directoryservice/memory.rs @@ -13,12 +13,13 @@ use crate::proto; #[derive(Clone, Default)] pub struct MemoryDirectoryService { + instance_name: String, db: Arc<RwLock<HashMap<B3Digest, proto::Directory>>>, } #[async_trait] impl DirectoryService for MemoryDirectoryService { - #[instrument(skip(self, digest), fields(directory.digest = %digest))] + #[instrument(skip(self, digest), err, fields(directory.digest = %digest, instance_name=%self.instance_name))] async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> { let db = self.db.read().await; @@ -45,7 +46,7 @@ impl DirectoryService for MemoryDirectoryService { } } - #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))] + #[instrument(skip(self, directory), err, fields(directory.digest = %directory.digest(), instance_name=%self.instance_name))] async fn put(&self, directory: Directory) -> Result<B3Digest, Error> { let digest = directory.digest(); @@ -56,7 +57,7 @@ impl DirectoryService for MemoryDirectoryService { Ok(digest) } - #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] + #[instrument(skip_all, fields(directory.digest = %root_directory_digest, instance_name=%self.instance_name))] fn get_recursive( &self, root_directory_digest: &B3Digest, @@ -64,7 +65,7 @@ impl DirectoryService for MemoryDirectoryService { traverse_directory(self.clone(), root_directory_digest) } - #[instrument(skip_all)] + #[instrument(skip_all, fields(instance_name=%self.instance_name))] fn put_multiple_start(&self) -> Box<(dyn DirectoryPutter + 'static)> where Self: Clone, @@ -93,9 +94,12 @@ impl ServiceBuilder for MemoryDirectoryServiceConfig { type Output = dyn DirectoryService; async fn build<'a>( &'a self, - _instance_name: &str, + instance_name: &str, _context: &CompositionContext, ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> { - Ok(Arc::new(MemoryDirectoryService::default())) + Ok(Arc::new(MemoryDirectoryService { + instance_name: instance_name.to_string(), + db: Default::default(), + })) } } diff --git a/tvix/castore/src/directoryservice/object_store.rs b/tvix/castore/src/directoryservice/object_store.rs index 5b5281abcd2f..77578ec92f02 100644 --- a/tvix/castore/src/directoryservice/object_store.rs +++ b/tvix/castore/src/directoryservice/object_store.rs @@ -32,6 +32,7 @@ use crate::{proto, B3Digest, Error, Node}; /// be returned to the client in get_recursive. #[derive(Clone)] pub struct ObjectStoreDirectoryService { + instance_name: String, object_store: Arc<dyn ObjectStore>, base_path: Path, } @@ -63,6 +64,7 @@ impl ObjectStoreDirectoryService { let (object_store, path) = object_store::parse_url_opts(url, options)?; Ok(Self { + instance_name: "default".into(), object_store: Arc::new(object_store), base_path: path, }) @@ -72,18 +74,26 @@ impl ObjectStoreDirectoryService { pub fn parse_url(url: &Url) -> Result<Self, object_store::Error> { Self::parse_url_opts(url, Vec::<(String, String)>::new()) } + + pub fn new(instance_name: String, object_store: Arc<dyn ObjectStore>, base_path: Path) -> Self { + Self { + instance_name, + object_store, + base_path, + } + } } #[async_trait] impl DirectoryService for ObjectStoreDirectoryService { /// This is the same steps as for get_recursive anyways, so we just call get_recursive and /// return the first element of the stream and drop the request. - #[instrument(skip(self, digest), fields(directory.digest = %digest))] + #[instrument(level = "trace", skip_all, fields(directory.digest = %digest, instance_name = %self.instance_name))] async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> { self.get_recursive(digest).take(1).next().await.transpose() } - #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))] + #[instrument(level = "trace", skip_all, fields(directory.digest = %directory.digest(), instance_name = %self.instance_name))] async fn put(&self, directory: Directory) -> Result<B3Digest, Error> { // Ensure the directory doesn't contain other directory children if directory @@ -100,7 +110,7 @@ impl DirectoryService for ObjectStoreDirectoryService { handle.close().await } - #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] + #[instrument(level = "trace", skip_all, fields(directory.digest = %root_directory_digest, instance_name = %self.instance_name))] fn get_recursive( &self, root_directory_digest: &B3Digest, @@ -219,17 +229,18 @@ impl ServiceBuilder for ObjectStoreDirectoryServiceConfig { type Output = dyn DirectoryService; async fn build<'a>( &'a self, - _instance_name: &str, + instance_name: &str, _context: &CompositionContext, ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> { let (object_store, path) = object_store::parse_url_opts( &self.object_store_url.parse()?, &self.object_store_options, )?; - Ok(Arc::new(ObjectStoreDirectoryService { - object_store: Arc::new(object_store), - base_path: path, - })) + Ok(Arc::new(ObjectStoreDirectoryService::new( + instance_name.to_string(), + Arc::new(object_store), + path, + ))) } } diff --git a/tvix/castore/src/directoryservice/redb.rs b/tvix/castore/src/directoryservice/redb.rs index d253df503bb3..ba1fb682d5da 100644 --- a/tvix/castore/src/directoryservice/redb.rs +++ b/tvix/castore/src/directoryservice/redb.rs @@ -19,6 +19,7 @@ const DIRECTORY_TABLE: TableDefinition<[u8; digests::B3_LEN], Vec<u8>> = #[derive(Clone)] pub struct RedbDirectoryService { + instance_name: String, // We wrap the db in an Arc to be able to move it into spawn_blocking, // as discussed in https://github.com/cberner/redb/issues/789 db: Arc<Database>, @@ -27,7 +28,7 @@ pub struct RedbDirectoryService { impl RedbDirectoryService { /// Constructs a new instance using the specified filesystem path for /// storage. - pub async fn new(path: PathBuf) -> Result<Self, Error> { + pub async fn new(instance_name: String, path: PathBuf) -> Result<Self, Error> { if path == PathBuf::from("/") { return Err(Error::StorageError( "cowardly refusing to open / with redb".to_string(), @@ -41,7 +42,10 @@ impl RedbDirectoryService { }) .await??; - Ok(Self { db: Arc::new(db) }) + Ok(Self { + instance_name, + db: Arc::new(db), + }) } /// Constructs a new instance using the in-memory backend. @@ -51,7 +55,10 @@ impl RedbDirectoryService { create_schema(&db)?; - Ok(Self { db: Arc::new(db) }) + Ok(Self { + instance_name: "default".into(), + db: Arc::new(db), + }) } } @@ -68,7 +75,7 @@ fn create_schema(db: &redb::Database) -> Result<(), redb::Error> { #[async_trait] impl DirectoryService for RedbDirectoryService { - #[instrument(skip(self, digest), fields(directory.digest = %digest))] + #[instrument(skip(self, digest), fields(directory.digest = %digest, instance_name = %self.instance_name))] async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> { let db = self.db.clone(); @@ -121,7 +128,7 @@ impl DirectoryService for RedbDirectoryService { Ok(Some(directory)) } - #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))] + #[instrument(skip(self, directory), fields(directory.digest = %directory.digest(), instance_name = %self.instance_name))] async fn put(&self, directory: Directory) -> Result<B3Digest, Error> { tokio::task::spawn_blocking({ let db = self.db.clone(); @@ -146,7 +153,7 @@ impl DirectoryService for RedbDirectoryService { .await? } - #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] + #[instrument(skip_all, fields(directory.digest = %root_directory_digest, instance_name = %self.instance_name))] fn get_recursive( &self, root_directory_digest: &B3Digest, @@ -275,7 +282,7 @@ impl ServiceBuilder for RedbDirectoryServiceConfig { type Output = dyn DirectoryService; async fn build<'a>( &'a self, - _instance_name: &str, + instance_name: &str, _context: &CompositionContext, ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> { match self { @@ -297,7 +304,9 @@ impl ServiceBuilder for RedbDirectoryServiceConfig { RedbDirectoryServiceConfig { is_temporary: false, path: Some(path), - } => Ok(Arc::new(RedbDirectoryService::new(path.into()).await?)), + } => Ok(Arc::new( + RedbDirectoryService::new(instance_name.to_string(), path.into()).await?, + )), } } } diff --git a/tvix/castore/src/directoryservice/tests/utils.rs b/tvix/castore/src/directoryservice/tests/utils.rs index 3d245ea412d5..1c5d68de0108 100644 --- a/tvix/castore/src/directoryservice/tests/utils.rs +++ b/tvix/castore/src/directoryservice/tests/utils.rs @@ -33,6 +33,7 @@ pub async fn make_grpc_directory_service_client() -> Box<dyn DirectoryService> { // Create a client, connecting to the right side. The URI is unused. let mut maybe_right = Some(right); Box::new(GRPCDirectoryService::from_client( + "default".into(), DirectoryServiceClient::new( Endpoint::try_from("http://[::]:50051") .unwrap() diff --git a/tvix/docs/src/TODO.md b/tvix/docs/src/TODO.md index 4db77b04d442..57a685989a4c 100644 --- a/tvix/docs/src/TODO.md +++ b/tvix/docs/src/TODO.md @@ -189,10 +189,6 @@ This requires some more designing. Some goals: - Maybe add a ?cache=$other_url parameter support to the URL syntax, to easily wrap a store with a caching frontend, using $other_url as the "near" store URL. - - Each store should get its instance name passed down, and add this as a - field in the instrumentation calls. This causes log messages and - per-instance store metrics to be traceable back to the specific instance - (if multiple backends of the same type are present). ### Store Config There's already serde for some store options (bigtable uses `serde_qs`). diff --git a/tvix/store/src/pathinfoservice/bigtable.rs b/tvix/store/src/pathinfoservice/bigtable.rs index 3d8db8e5044a..ce6faa2d3743 100644 --- a/tvix/store/src/pathinfoservice/bigtable.rs +++ b/tvix/store/src/pathinfoservice/bigtable.rs @@ -35,6 +35,7 @@ const CELL_SIZE_LIMIT: u64 = 10 * 1024 * 1024; /// "unimplemented" error. #[derive(Clone)] pub struct BigtablePathInfoService { + instance_name: String, client: bigtable::BigTable, params: BigtableParameters, @@ -47,7 +48,10 @@ pub struct BigtablePathInfoService { impl BigtablePathInfoService { #[cfg(not(test))] - pub async fn connect(params: BigtableParameters) -> Result<Self, bigtable::Error> { + pub async fn connect( + instance_name: String, + params: BigtableParameters, + ) -> Result<Self, bigtable::Error> { let connection = bigtable::BigTableConnection::new( ¶ms.project_id, ¶ms.instance_name, @@ -58,13 +62,17 @@ impl BigtablePathInfoService { .await?; Ok(Self { + instance_name, client: connection.client(), params, }) } #[cfg(test)] - pub async fn connect(params: BigtableParameters) -> Result<Self, bigtable::Error> { + pub async fn connect( + instance_name: String, + params: BigtableParameters, + ) -> Result<Self, bigtable::Error> { use std::time::Duration; use async_process::{Command, Stdio}; @@ -133,6 +141,7 @@ impl BigtablePathInfoService { )?; Ok(Self { + instance_name: instance_name.to_string(), client: connection.client(), params, emulator: (tmpdir, emulator_process).into(), @@ -148,7 +157,7 @@ fn derive_pathinfo_key(digest: &[u8; 20]) -> String { #[async_trait] impl PathInfoService for BigtablePathInfoService { - #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))] + #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest), instance_name = %self.instance_name))] async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { let mut client = self.client.clone(); let path_info_key = derive_pathinfo_key(&digest); @@ -244,7 +253,7 @@ impl PathInfoService for BigtablePathInfoService { Ok(Some(path_info)) } - #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node))] + #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node, instance_name = %self.instance_name))] async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> { let mut client = self.client.clone(); let path_info_key = derive_pathinfo_key(path_info.store_path.digest()); @@ -410,11 +419,11 @@ impl ServiceBuilder for BigtableParameters { type Output = dyn PathInfoService; async fn build<'a>( &'a self, - _instance_name: &str, + instance_name: &str, _context: &CompositionContext, ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync>> { Ok(Arc::new( - BigtablePathInfoService::connect(self.clone()).await?, + BigtablePathInfoService::connect(instance_name.to_string(), self.clone()).await?, )) } } diff --git a/tvix/store/src/pathinfoservice/combinators.rs b/tvix/store/src/pathinfoservice/combinators.rs index f386fd52dc3c..df0046875057 100644 --- a/tvix/store/src/pathinfoservice/combinators.rs +++ b/tvix/store/src/pathinfoservice/combinators.rs @@ -15,13 +15,18 @@ use super::{PathInfo, PathInfoService}; /// There is no negative cache. /// Inserts and listings are not implemented for now. pub struct Cache<PS1, PS2> { + instance_name: String, near: PS1, far: PS2, } impl<PS1, PS2> Cache<PS1, PS2> { - pub fn new(near: PS1, far: PS2) -> Self { - Self { near, far } + pub fn new(instance_name: String, near: PS1, far: PS2) -> Self { + Self { + instance_name, + near, + far, + } } } @@ -31,7 +36,7 @@ where PS1: PathInfoService, PS2: PathInfoService, { - #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))] + #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest), instance_name = %self.instance_name))] async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { match self.near.get(digest).await? { Some(path_info) => { @@ -84,7 +89,7 @@ impl ServiceBuilder for CacheConfig { type Output = dyn PathInfoService; async fn build<'a>( &'a self, - _instance_name: &str, + instance_name: &str, context: &CompositionContext, ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> { let (near, far) = futures::join!( @@ -92,6 +97,7 @@ impl ServiceBuilder for CacheConfig { context.resolve::<Self::Output>(self.far.clone()) ); Ok(Arc::new(Cache { + instance_name: instance_name.to_string(), near: near?, far: far?, })) @@ -114,10 +120,10 @@ mod test { let far = MemoryPathInfoService::default(); // … and an instance of a "near" PathInfoService. - let near = LruPathInfoService::with_capacity(NonZeroUsize::new(1).unwrap()); + let near = LruPathInfoService::with_capacity("test".into(), NonZeroUsize::new(1).unwrap()); // create a Pathinfoservice combining the two and return it. - super::Cache::new(near, far) + super::Cache::new("test".into(), near, far) } /// Getting from the far backend is gonna insert it into the near one. diff --git a/tvix/store/src/pathinfoservice/grpc.rs b/tvix/store/src/pathinfoservice/grpc.rs index 453044cba13d..2e18f510253e 100644 --- a/tvix/store/src/pathinfoservice/grpc.rs +++ b/tvix/store/src/pathinfoservice/grpc.rs @@ -17,6 +17,8 @@ use tvix_castore::Node; /// Connects to a (remote) tvix-store PathInfoService over gRPC. #[derive(Clone)] pub struct GRPCPathInfoService<T> { + instance_name: String, + /// The internal reference to a gRPC client. /// Cloning it is cheap, and it internally handles concurrent requests. grpc_client: proto::path_info_service_client::PathInfoServiceClient<T>, @@ -26,9 +28,13 @@ impl<T> GRPCPathInfoService<T> { /// construct a [GRPCPathInfoService] from a [proto::path_info_service_client::PathInfoServiceClient]. /// panics if called outside the context of a tokio runtime. pub fn from_client( + instance_name: String, grpc_client: proto::path_info_service_client::PathInfoServiceClient<T>, ) -> Self { - Self { grpc_client } + Self { + instance_name, + grpc_client, + } } } @@ -40,7 +46,7 @@ where <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send, T::Future: Send, { - #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))] + #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest), instance_name = %self.instance_name))] async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { let path_info = self .grpc_client @@ -62,7 +68,7 @@ where } } - #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node))] + #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node, instance_name = %self.instance_name))] async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> { let path_info = self .grpc_client @@ -99,6 +105,7 @@ where #[instrument(level = "trace", skip_all)] fn nar_calculation_service(&self) -> Option<Box<dyn NarCalculationService>> { Some(Box::new(GRPCPathInfoService { + instance_name: self.instance_name.clone(), grpc_client: self.grpc_client.clone(), }) as Box<dyn NarCalculationService>) } @@ -163,13 +170,16 @@ impl ServiceBuilder for GRPCPathInfoServiceConfig { type Output = dyn PathInfoService; async fn build<'a>( &'a self, - _instance_name: &str, + instance_name: &str, _context: &CompositionContext, ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> { let client = proto::path_info_service_client::PathInfoServiceClient::new( tvix_castore::tonic::channel_from_url(&self.url.parse()?).await?, ); - Ok(Arc::new(GRPCPathInfoService::from_client(client))) + Ok(Arc::new(GRPCPathInfoService::from_client( + instance_name.to_string(), + client, + ))) } } diff --git a/tvix/store/src/pathinfoservice/lru.rs b/tvix/store/src/pathinfoservice/lru.rs index 3055d73c22f9..0b55441860fe 100644 --- a/tvix/store/src/pathinfoservice/lru.rs +++ b/tvix/store/src/pathinfoservice/lru.rs @@ -14,12 +14,14 @@ use tvix_castore::Error; use super::{PathInfo, PathInfoService}; pub struct LruPathInfoService { + instance_name: String, lru: Arc<RwLock<LruCache<[u8; 20], PathInfo>>>, } impl LruPathInfoService { - pub fn with_capacity(capacity: NonZeroUsize) -> Self { + pub fn with_capacity(instance_name: String, capacity: NonZeroUsize) -> Self { Self { + instance_name, lru: Arc::new(RwLock::new(LruCache::new(capacity))), } } @@ -27,12 +29,12 @@ impl LruPathInfoService { #[async_trait] impl PathInfoService for LruPathInfoService { - #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))] + #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest), instance_name = %self.instance_name))] async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { Ok(self.lru.write().await.get(&digest).cloned()) } - #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node))] + #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node, instance_name = %self.instance_name))] async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> { self.lru .write() @@ -76,10 +78,13 @@ impl ServiceBuilder for LruPathInfoServiceConfig { type Output = dyn PathInfoService; async fn build<'a>( &'a self, - _instance_name: &str, + instance_name: &str, _context: &CompositionContext, ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> { - Ok(Arc::new(LruPathInfoService::with_capacity(self.capacity))) + Ok(Arc::new(LruPathInfoService::with_capacity( + instance_name.to_string(), + self.capacity, + ))) } } @@ -103,7 +108,7 @@ mod test { #[tokio::test] async fn evict() { - let svc = LruPathInfoService::with_capacity(NonZeroUsize::new(1).unwrap()); + let svc = LruPathInfoService::with_capacity("test".into(), NonZeroUsize::new(1).unwrap()); // pathinfo_1 should not be there assert!(svc diff --git a/tvix/store/src/pathinfoservice/memory.rs b/tvix/store/src/pathinfoservice/memory.rs index fd013fe9a573..88a8a379f175 100644 --- a/tvix/store/src/pathinfoservice/memory.rs +++ b/tvix/store/src/pathinfoservice/memory.rs @@ -11,12 +11,13 @@ use tvix_castore::Error; #[derive(Default)] pub struct MemoryPathInfoService { + instance_name: String, db: Arc<RwLock<HashMap<[u8; 20], PathInfo>>>, } #[async_trait] impl PathInfoService for MemoryPathInfoService { - #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))] + #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest), instance_name = %self.instance_name))] async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { let db = self.db.read().await; @@ -26,7 +27,7 @@ impl PathInfoService for MemoryPathInfoService { } } - #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node))] + #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node, instance_name = %self.instance_name))] async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> { // This overwrites existing PathInfo objects with the same store path digest. let mut db = self.db.write().await; @@ -69,9 +70,12 @@ impl ServiceBuilder for MemoryPathInfoServiceConfig { type Output = dyn PathInfoService; async fn build<'a>( &'a self, - _instance_name: &str, + instance_name: &str, _context: &CompositionContext, ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> { - Ok(Arc::new(MemoryPathInfoService::default())) + Ok(Arc::new(MemoryPathInfoService { + instance_name: instance_name.to_string(), + db: Default::default(), + })) } } diff --git a/tvix/store/src/pathinfoservice/nix_http.rs b/tvix/store/src/pathinfoservice/nix_http.rs index 29e04aa45867..6b8960b75c33 100644 --- a/tvix/store/src/pathinfoservice/nix_http.rs +++ b/tvix/store/src/pathinfoservice/nix_http.rs @@ -32,6 +32,7 @@ use url::Url; /// [PathInfoService::put] is not implemented and returns an error if called. /// TODO: what about reading from nix-cache-info? pub struct NixHTTPPathInfoService<BS, DS> { + instance_name: String, base_url: url::Url, http_client: reqwest_middleware::ClientWithMiddleware, @@ -44,8 +45,14 @@ pub struct NixHTTPPathInfoService<BS, DS> { } impl<BS, DS> NixHTTPPathInfoService<BS, DS> { - pub fn new(base_url: url::Url, blob_service: BS, directory_service: DS) -> Self { + pub fn new( + instance_name: String, + base_url: url::Url, + blob_service: BS, + directory_service: DS, + ) -> Self { Self { + instance_name, base_url, http_client: reqwest_middleware::ClientBuilder::new(reqwest::Client::new()) .with(tvix_tracing::propagate::reqwest::tracing_middleware()) @@ -69,7 +76,7 @@ where BS: BlobService + Send + Sync + Clone + 'static, DS: DirectoryService + Send + Sync + Clone + 'static, { - #[instrument(skip_all, err, fields(path.digest=nixbase32::encode(&digest)))] + #[instrument(skip_all, err, fields(path.digest=nixbase32::encode(&digest), instance_name=%self.instance_name))] async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { let narinfo_url = self .base_url @@ -241,7 +248,7 @@ where })) } - #[instrument(skip_all, fields(path_info=?_path_info))] + #[instrument(skip_all, fields(path_info=?_path_info, instance_name=%self.instance_name))] async fn put(&self, _path_info: PathInfo) -> Result<PathInfo, Error> { Err(Error::InvalidRequest( "put not supported for this backend".to_string(), @@ -314,7 +321,7 @@ impl ServiceBuilder for NixHTTPPathInfoServiceConfig { type Output = dyn PathInfoService; async fn build<'a>( &'a self, - _instance_name: &str, + instance_name: &str, context: &CompositionContext, ) -> Result<Arc<Self::Output>, Box<dyn std::error::Error + Send + Sync + 'static>> { let (blob_service, directory_service) = futures::join!( @@ -322,6 +329,7 @@ impl ServiceBuilder for NixHTTPPathInfoServiceConfig { context.resolve::<dyn DirectoryService>(self.directory_service.clone()) ); let mut svc = NixHTTPPathInfoService::new( + instance_name.to_string(), Url::parse(&self.base_url)?, blob_service?, directory_service?, diff --git a/tvix/store/src/pathinfoservice/redb.rs b/tvix/store/src/pathinfoservice/redb.rs index 6e794e1981f0..8dc3083f3305 100644 --- a/tvix/store/src/pathinfoservice/redb.rs +++ b/tvix/store/src/pathinfoservice/redb.rs @@ -19,6 +19,7 @@ const PATHINFO_TABLE: TableDefinition<[u8; 20], Vec<u8>> = TableDefinition::new( /// redb stores all of its data in a single file with a K/V pointing from a path's output hash to /// its corresponding protobuf-encoded PathInfo. pub struct RedbPathInfoService { + instance_name: String, // We wrap db in an Arc to be able to move it into spawn_blocking, // as discussed in https://github.com/cberner/redb/issues/789 db: Arc<Database>, @@ -27,7 +28,7 @@ pub struct RedbPathInfoService { impl RedbPathInfoService { /// Constructs a new instance using the specified file system path for /// storage. - pub async fn new(path: PathBuf) -> Result<Self, Error> { + pub async fn new(instance_name: String, path: PathBuf) -> Result<Self, Error> { if path == PathBuf::from("/") { return Err(Error::StorageError( "cowardly refusing to open / with redb".to_string(), @@ -41,17 +42,23 @@ impl RedbPathInfoService { }) .await??; - Ok(Self { db: Arc::new(db) }) + Ok(Self { + instance_name, + db: Arc::new(db), + }) } /// Constructs a new instance using the in-memory backend. - pub fn new_temporary() -> Result<Self, Error> { + pub fn new_temporary(instance_name: String) -> Result<Self, Error> { let db = redb::Database::builder().create_with_backend(redb::backends::InMemoryBackend::new())?; create_schema(&db)?; - Ok(Self { db: Arc::new(db) }) + Ok(Self { + instance_name, + db: Arc::new(db), + }) } } @@ -68,7 +75,7 @@ fn create_schema(db: &redb::Database) -> Result<(), redb::Error> { #[async_trait] impl PathInfoService for RedbPathInfoService { - #[instrument(level = "trace", skip_all, fields(path_info.digest = BASE64.encode(&digest)))] + #[instrument(level = "trace", skip_all, fields(path_info.digest = BASE64.encode(&digest), instance_name = %self.instance_name))] async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { let db = self.db.clone(); @@ -93,7 +100,7 @@ impl PathInfoService for RedbPathInfoService { .await? } - #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node))] + #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node, instance_name = %self.instance_name))] async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> { let db = self.db.clone(); @@ -193,14 +200,16 @@ impl ServiceBuilder for RedbPathInfoServiceConfig { type Output = dyn PathInfoService; async fn build<'a>( &'a self, - _instance_name: &str, + instance_name: &str, _context: &CompositionContext, ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> { match self { RedbPathInfoServiceConfig { is_temporary: true, path: None, - } => Ok(Arc::new(RedbPathInfoService::new_temporary()?)), + } => Ok(Arc::new(RedbPathInfoService::new_temporary( + instance_name.to_string(), + )?)), RedbPathInfoServiceConfig { is_temporary: true, path: Some(_), @@ -215,7 +224,9 @@ impl ServiceBuilder for RedbPathInfoServiceConfig { RedbPathInfoServiceConfig { is_temporary: false, path: Some(path), - } => Ok(Arc::new(RedbPathInfoService::new(path.to_owned()).await?)), + } => Ok(Arc::new( + RedbPathInfoService::new(instance_name.to_string(), path.to_owned()).await?, + )), } } } diff --git a/tvix/store/src/pathinfoservice/signing_wrapper.rs b/tvix/store/src/pathinfoservice/signing_wrapper.rs index 525e60e416a8..f4ac44b5edd0 100644 --- a/tvix/store/src/pathinfoservice/signing_wrapper.rs +++ b/tvix/store/src/pathinfoservice/signing_wrapper.rs @@ -27,6 +27,7 @@ use super::MemoryPathInfoService; /// /// The service signs the [PathInfo] **only if it has a narinfo attribute** pub struct SigningPathInfoService<T, S> { + instance_name: String, /// The inner [PathInfoService] inner: T, /// The key to sign narinfos @@ -34,8 +35,12 @@ pub struct SigningPathInfoService<T, S> { } impl<T, S> SigningPathInfoService<T, S> { - pub fn new(inner: T, signing_key: Arc<SigningKey<S>>) -> Self { - Self { inner, signing_key } + pub fn new(instance_name: String, inner: T, signing_key: Arc<SigningKey<S>>) -> Self { + Self { + instance_name, + inner, + signing_key, + } } } @@ -45,7 +50,7 @@ where T: PathInfoService, S: ed25519::signature::Signer<ed25519::Signature> + Sync + Send, { - #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))] + #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest), instance_name = %self.instance_name))] async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { self.inner.get(digest).await } @@ -101,7 +106,7 @@ impl ServiceBuilder for KeyFileSigningPathInfoServiceConfig { type Output = dyn PathInfoService; async fn build<'a>( &'a self, - _instance_name: &str, + instance_name: &str, context: &CompositionContext, ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> { let inner = context.resolve::<Self::Output>(self.inner.clone()).await?; @@ -110,7 +115,11 @@ impl ServiceBuilder for KeyFileSigningPathInfoServiceConfig { .map_err(|e| Error::StorageError(e.to_string()))? .0, ); - Ok(Arc::new(SigningPathInfoService { inner, signing_key })) + Ok(Arc::new(SigningPathInfoService { + instance_name: instance_name.to_string(), + inner, + signing_key, + })) } } @@ -118,6 +127,7 @@ impl ServiceBuilder for KeyFileSigningPathInfoServiceConfig { pub(crate) fn test_signing_service() -> Arc<dyn PathInfoService> { let memory_svc: Arc<dyn PathInfoService> = Arc::new(MemoryPathInfoService::default()); Arc::new(SigningPathInfoService { + instance_name: "test".into(), inner: memory_svc, signing_key: Arc::new( parse_keypair(DUMMY_KEYPAIR) diff --git a/tvix/store/src/pathinfoservice/tests/mod.rs b/tvix/store/src/pathinfoservice/tests/mod.rs index 12c685c80fad..f56e1015ea26 100644 --- a/tvix/store/src/pathinfoservice/tests/mod.rs +++ b/tvix/store/src/pathinfoservice/tests/mod.rs @@ -26,7 +26,7 @@ use self::utils::make_bigtable_path_info_service; let (_, _, svc) = make_grpc_path_info_service_client().await; svc })] -#[case::redb(RedbPathInfoService::new_temporary().unwrap())] +#[case::redb(RedbPathInfoService::new_temporary("test".into()).unwrap())] #[case::signing(test_signing_service())] #[cfg_attr(all(feature = "cloud",feature="integration"), case::bigtable(make_bigtable_path_info_service().await))] pub fn path_info_services(#[case] svc: impl PathInfoService) {} diff --git a/tvix/store/src/pathinfoservice/tests/utils.rs b/tvix/store/src/pathinfoservice/tests/utils.rs index 8b192e303b89..3b0684841d49 100644 --- a/tvix/store/src/pathinfoservice/tests/utils.rs +++ b/tvix/store/src/pathinfoservice/tests/utils.rs @@ -53,16 +53,19 @@ pub async fn make_grpc_path_info_service_client() -> ( // Create a client, connecting to the right side. The URI is unused. let mut maybe_right = Some(right); - let path_info_service = GRPCPathInfoService::from_client(PathInfoServiceClient::new( - Endpoint::try_from("http://[::]:50051") - .unwrap() - .connect_with_connector(tower::service_fn(move |_: Uri| { - let right = maybe_right.take().unwrap(); - async move { Ok::<_, std::io::Error>(TokioIo::new(right)) } - })) - .await - .unwrap(), - )); + let path_info_service = GRPCPathInfoService::from_client( + "default".into(), + PathInfoServiceClient::new( + Endpoint::try_from("http://[::]:50051") + .unwrap() + .connect_with_connector(tower::service_fn(move |_: Uri| { + let right = maybe_right.take().unwrap(); + async move { Ok::<_, std::io::Error>(TokioIo::new(right)) } + })) + .await + .unwrap(), + ), + ); (blob_service, directory_service, path_info_service) } @@ -73,7 +76,7 @@ pub(crate) async fn make_bigtable_path_info_service( use crate::pathinfoservice::bigtable::BigtableParameters; use crate::pathinfoservice::BigtablePathInfoService; - BigtablePathInfoService::connect(BigtableParameters::default_for_tests()) + BigtablePathInfoService::connect("test".into(), BigtableParameters::default_for_tests()) .await .unwrap() } |