diff options
Diffstat (limited to 'tvix/castore/src/blobservice')
-rw-r--r-- | tvix/castore/src/blobservice/combinator.rs | 29 | ||||
-rw-r--r-- | tvix/castore/src/blobservice/grpc.rs | 22 | ||||
-rw-r--r-- | tvix/castore/src/blobservice/memory.rs | 17 | ||||
-rw-r--r-- | tvix/castore/src/blobservice/mod.rs | 18 | ||||
-rw-r--r-- | tvix/castore/src/blobservice/object_store.rs | 36 |
5 files changed, 118 insertions, 4 deletions
diff --git a/tvix/castore/src/blobservice/combinator.rs b/tvix/castore/src/blobservice/combinator.rs index 067eff96f488..0bce657e9176 100644 --- a/tvix/castore/src/blobservice/combinator.rs +++ b/tvix/castore/src/blobservice/combinator.rs @@ -1,8 +1,11 @@ +use std::sync::Arc; + use futures::{StreamExt, TryStreamExt}; use tokio_util::io::{ReaderStream, StreamReader}; use tonic::async_trait; use tracing::{instrument, warn}; +use crate::composition::{CompositionContext, ServiceBuilder}; use crate::B3Digest; use super::{naive_seeker::NaiveSeeker, BlobReader, BlobService, BlobWriter}; @@ -93,6 +96,32 @@ where } } +#[derive(serde::Deserialize, Debug, Clone)] +#[serde(deny_unknown_fields)] +pub struct CombinedBlobServiceConfig { + local: String, + remote: String, +} + +#[async_trait] +impl ServiceBuilder for CombinedBlobServiceConfig { + type Output = dyn BlobService; + async fn build<'a>( + &'a self, + _instance_name: &str, + context: &CompositionContext<dyn BlobService>, + ) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync>> { + let (local, remote) = futures::join!( + context.resolve(self.local.clone()), + context.resolve(self.remote.clone()) + ); + Ok(Arc::new(CombinedBlobService { + local: local?, + remote: remote?, + })) + } +} + fn make_chunked_reader<BS>( // This must consume, as we can't retain references to blob_service, // as it'd add a lifetime to BlobReader in general, which will get diff --git a/tvix/castore/src/blobservice/grpc.rs b/tvix/castore/src/blobservice/grpc.rs index 85250da99d05..56a2ebf00038 100644 --- a/tvix/castore/src/blobservice/grpc.rs +++ b/tvix/castore/src/blobservice/grpc.rs @@ -1,4 +1,5 @@ use super::{BlobReader, BlobService, BlobWriter, ChunkedReader}; +use crate::composition::{CompositionContext, ServiceBuilder}; use crate::{ proto::{self, stat_blob_response::ChunkMeta}, B3Digest, @@ -180,6 +181,27 @@ where } } +#[derive(serde::Deserialize, Debug)] +#[serde(deny_unknown_fields)] +pub struct GRPCBlobServiceConfig { + url: String, +} + +#[async_trait] +impl ServiceBuilder for GRPCBlobServiceConfig { + type Output = dyn BlobService; + async fn build<'a>( + &'a self, + _instance_name: &str, + _context: &CompositionContext<dyn BlobService>, + ) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync + 'static>> { + let client = proto::blob_service_client::BlobServiceClient::new( + crate::tonic::channel_from_url(&self.url.parse()?).await?, + ); + Ok(Arc::new(GRPCBlobService::from_client(client))) + } +} + pub struct GRPCBlobWriter<W: tokio::io::AsyncWrite> { /// The task containing the put request, and the inner writer, if we're still writing. task_and_writer: Option<(JoinHandle<Result<proto::PutBlobResponse, Status>>, W)>, diff --git a/tvix/castore/src/blobservice/memory.rs b/tvix/castore/src/blobservice/memory.rs index 873d06b461de..0205dcf7bd70 100644 --- a/tvix/castore/src/blobservice/memory.rs +++ b/tvix/castore/src/blobservice/memory.rs @@ -6,6 +6,7 @@ use tonic::async_trait; use tracing::instrument; use super::{BlobReader, BlobService, BlobWriter}; +use crate::composition::{CompositionContext, ServiceBuilder}; use crate::B3Digest; #[derive(Clone, Default)] @@ -37,6 +38,22 @@ impl BlobService for MemoryBlobService { } } +#[derive(serde::Deserialize, Debug)] +#[serde(deny_unknown_fields)] +pub struct MemoryBlobServiceConfig {} + +#[async_trait] +impl ServiceBuilder for MemoryBlobServiceConfig { + type Output = dyn BlobService; + async fn build<'a>( + &'a self, + _instance_name: &str, + _context: &CompositionContext<dyn BlobService>, + ) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync + 'static>> { + Ok(Arc::new(MemoryBlobService::default())) + } +} + pub struct MemoryBlobWriter { db: Arc<RwLock<HashMap<B3Digest, Vec<u8>>>>, diff --git a/tvix/castore/src/blobservice/mod.rs b/tvix/castore/src/blobservice/mod.rs index 50acd40bf769..83fb5b674bb2 100644 --- a/tvix/castore/src/blobservice/mod.rs +++ b/tvix/castore/src/blobservice/mod.rs @@ -1,6 +1,8 @@ use std::io; + use tonic::async_trait; +use crate::composition::{Registry, ServiceBuilder}; use crate::proto::stat_blob_response::ChunkMeta; use crate::B3Digest; @@ -16,11 +18,11 @@ mod object_store; pub mod tests; pub use self::chunked_reader::ChunkedReader; -pub use self::combinator::CombinedBlobService; +pub use self::combinator::{CombinedBlobService, CombinedBlobServiceConfig}; pub use self::from_addr::from_addr; -pub use self::grpc::GRPCBlobService; -pub use self::memory::MemoryBlobService; -pub use self::object_store::ObjectStoreBlobService; +pub use self::grpc::{GRPCBlobService, GRPCBlobServiceConfig}; +pub use self::memory::{MemoryBlobService, MemoryBlobServiceConfig}; +pub use self::object_store::{ObjectStoreBlobService, ObjectStoreBlobServiceConfig}; /// The base trait all BlobService services need to implement. /// It provides functions to check whether a given blob exists, @@ -101,3 +103,11 @@ impl BlobReader for io::Cursor<&'static [u8; 0]> {} impl BlobReader for io::Cursor<Vec<u8>> {} impl BlobReader for io::Cursor<bytes::Bytes> {} impl BlobReader for tokio::fs::File {} + +/// Registers the builtin BlobService implementations with the registry +pub(crate) fn register_blob_services(reg: &mut Registry) { + reg.register::<Box<dyn ServiceBuilder<Output = dyn BlobService>>, super::blobservice::ObjectStoreBlobServiceConfig>("objectstore"); + reg.register::<Box<dyn ServiceBuilder<Output = dyn BlobService>>, super::blobservice::MemoryBlobServiceConfig>("memory"); + reg.register::<Box<dyn ServiceBuilder<Output = dyn BlobService>>, super::blobservice::CombinedBlobServiceConfig>("combined"); + reg.register::<Box<dyn ServiceBuilder<Output = dyn BlobService>>, super::blobservice::GRPCBlobServiceConfig>("grpc"); +} diff --git a/tvix/castore/src/blobservice/object_store.rs b/tvix/castore/src/blobservice/object_store.rs index d2d0a288a557..bd8c2bd747fb 100644 --- a/tvix/castore/src/blobservice/object_store.rs +++ b/tvix/castore/src/blobservice/object_store.rs @@ -1,4 +1,5 @@ use std::{ + collections::HashMap, io::{self, Cursor}, pin::pin, sync::Arc, @@ -18,6 +19,7 @@ use tracing::{debug, instrument, trace, Level}; use url::Url; use crate::{ + composition::{CompositionContext, ServiceBuilder}, proto::{stat_blob_response::ChunkMeta, StatBlobResponse}, B3Digest, B3HashingReader, }; @@ -269,6 +271,40 @@ impl BlobService for ObjectStoreBlobService { } } +fn default_avg_chunk_size() -> u32 { + 256 * 1024 +} + +#[derive(serde::Deserialize)] +#[serde(deny_unknown_fields)] +pub struct ObjectStoreBlobServiceConfig { + object_store_url: String, + #[serde(default = "default_avg_chunk_size")] + avg_chunk_size: u32, + #[serde(default)] + object_store_options: HashMap<String, String>, +} + +#[async_trait] +impl ServiceBuilder for ObjectStoreBlobServiceConfig { + type Output = dyn BlobService; + async fn build<'a>( + &'a self, + _instance_name: &str, + _context: &CompositionContext<dyn BlobService>, + ) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync + 'static>> { + let (object_store, path) = object_store::parse_url_opts( + &self.object_store_url.parse()?, + &self.object_store_options, + )?; + Ok(Arc::new(ObjectStoreBlobService { + object_store: Arc::new(object_store), + base_path: path, + avg_chunk_size: self.avg_chunk_size, + })) + } +} + /// Reads blob contents from a AsyncRead, chunks and uploads them. /// On success, returns a [StatBlobResponse] pointing to the individual chunks. #[instrument(skip_all, fields(base_path=%base_path, min_chunk_size, avg_chunk_size, max_chunk_size), err)] |