diff options
author | Yureka <tvl@yuka.dev> | 2024-07-18T17·09+0200 |
---|---|---|
committer | yuka <tvl@yuka.dev> | 2024-07-18T19·19+0000 |
commit | 168e4fda5909e535f33051051ef426e221ef20d4 (patch) | |
tree | e23b8ad4ced3f4232bdb0ad186f3b63f693c57e5 /tvix/castore/src/blobservice/object_store.rs | |
parent | 79317be214ce2f1e3347438319d3482bb773a649 (diff) |
refactor(tvix): use composition & registry for from_addr r/8368
Change-Id: I3c94ecb5958294b5973c6fcdf5ee9c0d37fa54ad Reviewed-on: https://cl.tvl.fyi/c/depot/+/11976 Reviewed-by: flokli <flokli@flokli.de> Tested-by: BuildkiteCI Autosubmit: yuka <yuka@yuka.dev>
Diffstat (limited to 'tvix/castore/src/blobservice/object_store.rs')
-rw-r--r-- | tvix/castore/src/blobservice/object_store.rs | 91 |
1 files changed, 50 insertions, 41 deletions
diff --git a/tvix/castore/src/blobservice/object_store.rs b/tvix/castore/src/blobservice/object_store.rs index bd8c2bd747fb..224ebae9e2c6 100644 --- a/tvix/castore/src/blobservice/object_store.rs +++ b/tvix/castore/src/blobservice/object_store.rs @@ -21,21 +21,11 @@ use url::Url; use crate::{ composition::{CompositionContext, ServiceBuilder}, proto::{stat_blob_response::ChunkMeta, StatBlobResponse}, - B3Digest, B3HashingReader, + B3Digest, B3HashingReader, Error, }; use super::{BlobReader, BlobService, BlobWriter, ChunkedReader}; -#[derive(Clone)] -pub struct ObjectStoreBlobService { - object_store: Arc<dyn ObjectStore>, - base_path: Path, - - /// Average chunk size for FastCDC, in bytes. - /// min value is half, max value double of that number. - avg_chunk_size: u32, -} - /// Uses any object storage supported by the [object_store] crate to provide a /// tvix-castore [BlobService]. /// @@ -72,31 +62,14 @@ pub struct ObjectStoreBlobService { /// It also allows signalling any compression of chunks in the content-type. /// Migration *should* be possible by simply adding the right content-types to /// all keys stored so far, but no promises ;-) -impl ObjectStoreBlobService { - /// Constructs a new [ObjectStoreBlobService] from a [Url] supported by - /// [object_store]. - /// Any path suffix becomes the base path of the object store. - /// additional options, the same as in [object_store::parse_url_opts] can - /// be passed. - pub fn parse_url_opts<I, K, V>(url: &Url, options: I) -> Result<Self, object_store::Error> - where - I: IntoIterator<Item = (K, V)>, - K: AsRef<str>, - V: Into<String>, - { - let (object_store, path) = object_store::parse_url_opts(url, options)?; - - Ok(Self { - object_store: Arc::new(object_store), - base_path: path, - avg_chunk_size: 256 * 1024, - }) - } +#[derive(Clone)] +pub struct ObjectStoreBlobService { + object_store: Arc<dyn ObjectStore>, + base_path: Path, - /// Like [Self::parse_url_opts], except without the options. - pub fn parse_url(url: &Url) -> Result<Self, object_store::Error> { - Self::parse_url_opts(url, Vec::<(String, String)>::new()) - } + /// Average chunk size for FastCDC, in bytes. + /// min value is half, max value double of that number. + avg_chunk_size: u32, } #[instrument(level=Level::TRACE, skip_all,fields(base_path=%base_path,blob.digest=%digest),ret(Display))] @@ -281,10 +254,41 @@ pub struct ObjectStoreBlobServiceConfig { object_store_url: String, #[serde(default = "default_avg_chunk_size")] avg_chunk_size: u32, - #[serde(default)] object_store_options: HashMap<String, String>, } +impl TryFrom<url::Url> for ObjectStoreBlobServiceConfig { + type Error = Box<dyn std::error::Error + Send + Sync>; + /// Constructs a new [ObjectStoreBlobService] from a [Url] supported by + /// [object_store]. + /// Any path suffix becomes the base path of the object store. + /// additional options, the same as in [object_store::parse_url_opts] can + /// be passed. + fn try_from(url: url::Url) -> Result<Self, Self::Error> { + // We need to convert the URL to string, strip the prefix there, and then + // parse it back as url, as Url::set_scheme() rejects some of the transitions we want to do. + let trimmed_url = { + let s = url.to_string(); + let mut url = Url::parse( + s.strip_prefix("objectstore+") + .ok_or(Error::StorageError("Missing objectstore uri".into()))?, + )?; + // trim the query pairs, they might contain credentials or local settings we don't want to send as-is. + url.set_query(None); + url + }; + Ok(ObjectStoreBlobServiceConfig { + object_store_url: trimmed_url.into(), + object_store_options: url + .query_pairs() + .into_iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect(), + avg_chunk_size: 256 * 1024, + }) + } +} + #[async_trait] impl ServiceBuilder for ObjectStoreBlobServiceConfig { type Output = dyn BlobService; @@ -548,7 +552,7 @@ where #[cfg(test)] mod test { - use super::chunk_and_upload; + use super::{chunk_and_upload, default_avg_chunk_size}; use crate::{ blobservice::{BlobService, ObjectStoreBlobService}, fixtures::{BLOB_A, BLOB_A_DIGEST}, @@ -559,13 +563,18 @@ mod test { /// Tests chunk_and_upload directly, bypassing the BlobWriter at open_write(). #[tokio::test] async fn test_chunk_and_upload() { - let blobsvc = Arc::new( - ObjectStoreBlobService::parse_url(&Url::parse("memory:///").unwrap()).unwrap(), - ); + let (object_store, base_path) = + object_store::parse_url(&Url::parse("memory:///").unwrap()).unwrap(); + let object_store: Arc<dyn object_store::ObjectStore> = Arc::from(object_store); + let blobsvc = Arc::new(ObjectStoreBlobService { + object_store: object_store.clone(), + avg_chunk_size: default_avg_chunk_size(), + base_path, + }); let blob_digest = chunk_and_upload( &mut Cursor::new(BLOB_A.to_vec()), - blobsvc.object_store.clone(), + object_store, object_store::path::Path::from("/"), 1024 / 2, 1024, |