diff options
Diffstat (limited to 'tvix/castore/src/blobservice')
-rw-r--r-- | tvix/castore/src/blobservice/combinator.rs | 11 | ||||
-rw-r--r-- | tvix/castore/src/blobservice/grpc.rs | 28 | ||||
-rw-r--r-- | tvix/castore/src/blobservice/memory.rs | 14 | ||||
-rw-r--r-- | tvix/castore/src/blobservice/object_store.rs | 13 | ||||
-rw-r--r-- | tvix/castore/src/blobservice/tests/utils.rs | 23 |
5 files changed, 56 insertions, 33 deletions
diff --git a/tvix/castore/src/blobservice/combinator.rs b/tvix/castore/src/blobservice/combinator.rs index 6a964c8a8440..e5f61d6d0c2a 100644 --- a/tvix/castore/src/blobservice/combinator.rs +++ b/tvix/castore/src/blobservice/combinator.rs @@ -16,6 +16,7 @@ use super::{BlobReader, BlobService, BlobWriter, ChunkedReader}; /// blobservice again, before falling back to the remote one. /// The remote BlobService is never written to. pub struct CombinedBlobService<BL, BR> { + instance_name: String, local: BL, remote: BR, } @@ -27,6 +28,7 @@ where { fn clone(&self) -> Self { Self { + instance_name: self.instance_name.clone(), local: self.local.clone(), remote: self.remote.clone(), } @@ -39,12 +41,12 @@ where BL: AsRef<dyn BlobService> + Clone + Send + Sync + 'static, BR: AsRef<dyn BlobService> + Clone + Send + Sync + 'static, { - #[instrument(skip(self, digest), fields(blob.digest=%digest))] + #[instrument(skip(self, digest), fields(blob.digest=%digest, instance_name=%self.instance_name))] async fn has(&self, digest: &B3Digest) -> std::io::Result<bool> { Ok(self.local.as_ref().has(digest).await? || self.remote.as_ref().has(digest).await?) } - #[instrument(skip(self, digest), fields(blob.digest=%digest), err)] + #[instrument(skip(self, digest), fields(blob.digest=%digest, instance_name=%self.instance_name), err)] async fn open_read(&self, digest: &B3Digest) -> std::io::Result<Option<Box<dyn BlobReader>>> { if self.local.as_ref().has(digest).await? { // local store has the blob, so we can assume it also has all chunks. @@ -84,7 +86,7 @@ where } } - #[instrument(skip_all)] + #[instrument(skip_all, fields(instance_name=%self.instance_name))] async fn open_write(&self) -> Box<dyn BlobWriter> { // direct writes to the local one. self.local.as_ref().open_write().await @@ -113,7 +115,7 @@ impl ServiceBuilder for CombinedBlobServiceConfig { type Output = dyn BlobService; async fn build<'a>( &'a self, - _instance_name: &str, + instance_name: &str, context: &CompositionContext, ) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync>> { let (local, remote) = futures::join!( @@ -121,6 +123,7 @@ impl ServiceBuilder for CombinedBlobServiceConfig { context.resolve(self.remote.clone()) ); Ok(Arc::new(CombinedBlobService { + instance_name: instance_name.to_string(), local: local?, remote: remote?, })) diff --git a/tvix/castore/src/blobservice/grpc.rs b/tvix/castore/src/blobservice/grpc.rs index 0db3dfea4ad8..3453657d07ab 100644 --- a/tvix/castore/src/blobservice/grpc.rs +++ b/tvix/castore/src/blobservice/grpc.rs @@ -24,6 +24,7 @@ use tracing::{instrument, Instrument as _}; /// Connects to a (remote) tvix-store BlobService over gRPC. #[derive(Clone)] pub struct GRPCBlobService<T> { + instance_name: String, /// The internal reference to a gRPC client. /// Cloning it is cheap, and it internally handles concurrent requests. grpc_client: proto::blob_service_client::BlobServiceClient<T>, @@ -31,8 +32,14 @@ pub struct GRPCBlobService<T> { impl<T> GRPCBlobService<T> { /// construct a [GRPCBlobService] from a [proto::blob_service_client::BlobServiceClient]. - pub fn from_client(grpc_client: proto::blob_service_client::BlobServiceClient<T>) -> Self { - Self { grpc_client } + pub fn from_client( + instance_name: String, + grpc_client: proto::blob_service_client::BlobServiceClient<T>, + ) -> Self { + Self { + instance_name, + grpc_client, + } } } @@ -44,7 +51,7 @@ where <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send, T::Future: Send, { - #[instrument(skip(self, digest), fields(blob.digest=%digest))] + #[instrument(skip(self, digest), fields(blob.digest=%digest, instance_name=%self.instance_name))] async fn has(&self, digest: &B3Digest) -> io::Result<bool> { match self .grpc_client @@ -61,7 +68,7 @@ where } } - #[instrument(skip(self, digest), fields(blob.digest=%digest), err)] + #[instrument(skip(self, digest), fields(blob.digest=%digest, instance_name=%self.instance_name), err)] async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> { // First try to get a list of chunks. In case there's only one chunk returned, // buffer its data into a Vec, otherwise use a ChunkedReader. @@ -124,7 +131,7 @@ where /// Returns a BlobWriter, that'll internally wrap each write in a /// [proto::BlobChunk], which is send to the gRPC server. - #[instrument(skip_all)] + #[instrument(skip_all, fields(instance_name=%self.instance_name))] async fn open_write(&self) -> Box<dyn BlobWriter> { // set up an mpsc channel passing around Bytes. let (tx, rx) = tokio::sync::mpsc::channel::<bytes::Bytes>(10); @@ -154,7 +161,7 @@ where }) } - #[instrument(skip(self, digest), fields(blob.digest=%digest), err)] + #[instrument(skip(self, digest), fields(blob.digest=%digest, instance_name=%self.instance_name), err)] async fn chunks(&self, digest: &B3Digest) -> io::Result<Option<Vec<ChunkMeta>>> { let resp = self .grpc_client @@ -205,13 +212,16 @@ impl ServiceBuilder for GRPCBlobServiceConfig { type Output = dyn BlobService; async fn build<'a>( &'a self, - _instance_name: &str, + instance_name: &str, _context: &CompositionContext, ) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync + 'static>> { let client = proto::blob_service_client::BlobServiceClient::new( crate::tonic::channel_from_url(&self.url.parse()?).await?, ); - Ok(Arc::new(GRPCBlobService::from_client(client))) + Ok(Arc::new(GRPCBlobService::from_client( + instance_name.to_string(), + client, + ))) } } @@ -375,7 +385,7 @@ mod tests { .await .expect("must succeed"), ); - GRPCBlobService::from_client(client) + GRPCBlobService::from_client("default".into(), client) }; let has = grpc_client diff --git a/tvix/castore/src/blobservice/memory.rs b/tvix/castore/src/blobservice/memory.rs index 3d733f950470..348b8bb56d5b 100644 --- a/tvix/castore/src/blobservice/memory.rs +++ b/tvix/castore/src/blobservice/memory.rs @@ -11,18 +11,19 @@ use crate::{B3Digest, Error}; #[derive(Clone, Default)] pub struct MemoryBlobService { + instance_name: String, db: Arc<RwLock<HashMap<B3Digest, Vec<u8>>>>, } #[async_trait] impl BlobService for MemoryBlobService { - #[instrument(skip_all, ret, err, fields(blob.digest=%digest))] + #[instrument(skip_all, ret, err, fields(blob.digest=%digest, instance_name=%self.instance_name))] async fn has(&self, digest: &B3Digest) -> io::Result<bool> { let db = self.db.read(); Ok(db.contains_key(digest)) } - #[instrument(skip_all, err, fields(blob.digest=%digest))] + #[instrument(skip_all, err, fields(blob.digest=%digest, instance_name=%self.instance_name))] async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> { let db = self.db.read(); @@ -32,7 +33,7 @@ impl BlobService for MemoryBlobService { } } - #[instrument(skip_all)] + #[instrument(skip_all, fields(instance_name=%self.instance_name))] async fn open_write(&self) -> Box<dyn BlobWriter> { Box::new(MemoryBlobWriter::new(self.db.clone())) } @@ -58,10 +59,13 @@ impl ServiceBuilder for MemoryBlobServiceConfig { type Output = dyn BlobService; async fn build<'a>( &'a self, - _instance_name: &str, + instance_name: &str, _context: &CompositionContext, ) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync + 'static>> { - Ok(Arc::new(MemoryBlobService::default())) + Ok(Arc::new(MemoryBlobService { + instance_name: instance_name.to_string(), + db: Default::default(), + })) } } diff --git a/tvix/castore/src/blobservice/object_store.rs b/tvix/castore/src/blobservice/object_store.rs index b688ebafc7f4..10874af64011 100644 --- a/tvix/castore/src/blobservice/object_store.rs +++ b/tvix/castore/src/blobservice/object_store.rs @@ -64,6 +64,7 @@ use super::{BlobReader, BlobService, BlobWriter, ChunkedReader}; /// all keys stored so far, but no promises ;-) #[derive(Clone)] pub struct ObjectStoreBlobService { + instance_name: String, object_store: Arc<dyn ObjectStore>, base_path: Path, @@ -92,7 +93,7 @@ fn derive_chunk_path(base_path: &Path, digest: &B3Digest) -> Path { #[async_trait] impl BlobService for ObjectStoreBlobService { - #[instrument(skip_all, ret(level = Level::TRACE), err, fields(blob.digest=%digest))] + #[instrument(skip_all, ret(level = Level::TRACE), err, fields(blob.digest=%digest, instance_name=%self.instance_name))] async fn has(&self, digest: &B3Digest) -> io::Result<bool> { // TODO: clarify if this should work for chunks or not, and explicitly // document in the proto docs. @@ -112,7 +113,7 @@ impl BlobService for ObjectStoreBlobService { } } - #[instrument(skip_all, err, fields(blob.digest=%digest))] + #[instrument(skip_all, err, fields(blob.digest=%digest, instance_name=%self.instance_name))] async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> { // handle reading the empty blob. if digest.as_slice() == blake3::hash(b"").as_bytes() { @@ -169,7 +170,7 @@ impl BlobService for ObjectStoreBlobService { } } - #[instrument(skip_all)] + #[instrument(skip_all, fields(instance_name=%self.instance_name))] async fn open_write(&self) -> Box<dyn BlobWriter> { // ObjectStoreBlobWriter implements AsyncWrite, but all the chunking // needs an AsyncRead, so we create a pipe here. @@ -192,7 +193,7 @@ impl BlobService for ObjectStoreBlobService { }) } - #[instrument(skip_all, err, fields(blob.digest=%digest))] + #[instrument(skip_all, err, fields(blob.digest=%digest, instance_name=%self.instance_name))] async fn chunks(&self, digest: &B3Digest) -> io::Result<Option<Vec<ChunkMeta>>> { match self .object_store @@ -294,7 +295,7 @@ impl ServiceBuilder for ObjectStoreBlobServiceConfig { type Output = dyn BlobService; async fn build<'a>( &'a self, - _instance_name: &str, + instance_name: &str, _context: &CompositionContext, ) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync + 'static>> { let (object_store, path) = object_store::parse_url_opts( @@ -302,6 +303,7 @@ impl ServiceBuilder for ObjectStoreBlobServiceConfig { &self.object_store_options, )?; Ok(Arc::new(ObjectStoreBlobService { + instance_name: instance_name.to_string(), object_store: Arc::new(object_store), base_path: path, avg_chunk_size: self.avg_chunk_size, @@ -582,6 +584,7 @@ mod test { object_store::parse_url(&Url::parse("memory:///").unwrap()).unwrap(); let object_store: Arc<dyn object_store::ObjectStore> = Arc::from(object_store); let blobsvc = Arc::new(ObjectStoreBlobService { + instance_name: "test".into(), object_store: object_store.clone(), avg_chunk_size: default_avg_chunk_size(), base_path, diff --git a/tvix/castore/src/blobservice/tests/utils.rs b/tvix/castore/src/blobservice/tests/utils.rs index 7df4f00d3a09..7032d633dad0 100644 --- a/tvix/castore/src/blobservice/tests/utils.rs +++ b/tvix/castore/src/blobservice/tests/utils.rs @@ -29,14 +29,17 @@ pub async fn make_grpc_blob_service_client() -> Box<dyn BlobService> { // Create a client, connecting to the right side. The URI is unused. let mut maybe_right = Some(right); - Box::new(GRPCBlobService::from_client(BlobServiceClient::new( - Endpoint::try_from("http://[::]:50051") - .unwrap() - .connect_with_connector(tower::service_fn(move |_: Uri| { - let right = maybe_right.take().unwrap(); - async move { Ok::<_, std::io::Error>(TokioIo::new(right)) } - })) - .await - .unwrap(), - ))) + Box::new(GRPCBlobService::from_client( + "default".into(), + BlobServiceClient::new( + Endpoint::try_from("http://[::]:50051") + .unwrap() + .connect_with_connector(tower::service_fn(move |_: Uri| { + let right = maybe_right.take().unwrap(); + async move { Ok::<_, std::io::Error>(TokioIo::new(right)) } + })) + .await + .unwrap(), + ), + )) } |