diff options
author | Yureka <tvl@yuka.dev> | 2024-06-16T23·10+0200 |
---|---|---|
committer | yuka <yuka@yuka.dev> | 2024-07-18T10·42+0000 |
commit | 1a6b6e3ef310c8eea37b55f8007c85a8772ff8e9 (patch) | |
tree | 228afb6571ccb8e010f991841b32f3337f78c332 /tvix/castore/src/blobservice/object_store.rs | |
parent | 64fd1d3e56cb0edcbe77227cf18ca8425438a68a (diff) |
feat(tvix/castore): add composition module r/8365
Change-Id: I0868f3278db85ae5fe030089ee9033837bc08748 Signed-off-by: Yureka <tvl@yuka.dev> Reviewed-on: https://cl.tvl.fyi/c/depot/+/11853 Reviewed-by: flokli <flokli@flokli.de> Tested-by: BuildkiteCI
Diffstat (limited to 'tvix/castore/src/blobservice/object_store.rs')
-rw-r--r-- | tvix/castore/src/blobservice/object_store.rs | 36 |
1 files changed, 36 insertions, 0 deletions
diff --git a/tvix/castore/src/blobservice/object_store.rs b/tvix/castore/src/blobservice/object_store.rs index d2d0a288a557..bd8c2bd747fb 100644 --- a/tvix/castore/src/blobservice/object_store.rs +++ b/tvix/castore/src/blobservice/object_store.rs @@ -1,4 +1,5 @@ use std::{ + collections::HashMap, io::{self, Cursor}, pin::pin, sync::Arc, @@ -18,6 +19,7 @@ use tracing::{debug, instrument, trace, Level}; use url::Url; use crate::{ + composition::{CompositionContext, ServiceBuilder}, proto::{stat_blob_response::ChunkMeta, StatBlobResponse}, B3Digest, B3HashingReader, }; @@ -269,6 +271,40 @@ impl BlobService for ObjectStoreBlobService { } } +fn default_avg_chunk_size() -> u32 { + 256 * 1024 +} + +#[derive(serde::Deserialize)] +#[serde(deny_unknown_fields)] +pub struct ObjectStoreBlobServiceConfig { + object_store_url: String, + #[serde(default = "default_avg_chunk_size")] + avg_chunk_size: u32, + #[serde(default)] + object_store_options: HashMap<String, String>, +} + +#[async_trait] +impl ServiceBuilder for ObjectStoreBlobServiceConfig { + type Output = dyn BlobService; + async fn build<'a>( + &'a self, + _instance_name: &str, + _context: &CompositionContext<dyn BlobService>, + ) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync + 'static>> { + let (object_store, path) = object_store::parse_url_opts( + &self.object_store_url.parse()?, + &self.object_store_options, + )?; + Ok(Arc::new(ObjectStoreBlobService { + object_store: Arc::new(object_store), + base_path: path, + avg_chunk_size: self.avg_chunk_size, + })) + } +} + /// Reads blob contents from a AsyncRead, chunks and uploads them. /// On success, returns a [StatBlobResponse] pointing to the individual chunks. #[instrument(skip_all, fields(base_path=%base_path, min_chunk_size, avg_chunk_size, max_chunk_size), err)] |