Diffstat (limited to 'tvix/castore/src/blobservice/object_store.rs')
-rw-r--r--  tvix/castore/src/blobservice/object_store.rs | 91
1 file changed, 50 insertions, 41 deletions
diff --git a/tvix/castore/src/blobservice/object_store.rs b/tvix/castore/src/blobservice/object_store.rs
index bd8c2bd747fb..224ebae9e2c6 100644
--- a/tvix/castore/src/blobservice/object_store.rs
+++ b/tvix/castore/src/blobservice/object_store.rs
@@ -21,21 +21,11 @@ use url::Url;
 use crate::{
     composition::{CompositionContext, ServiceBuilder},
     proto::{stat_blob_response::ChunkMeta, StatBlobResponse},
-    B3Digest, B3HashingReader,
+    B3Digest, B3HashingReader, Error,
 };
 
 use super::{BlobReader, BlobService, BlobWriter, ChunkedReader};
 
-#[derive(Clone)]
-pub struct ObjectStoreBlobService {
-    object_store: Arc<dyn ObjectStore>,
-    base_path: Path,
-
-    /// Average chunk size for FastCDC, in bytes.
-    /// min value is half, max value double of that number.
-    avg_chunk_size: u32,
-}
-
 /// Uses any object storage supported by the [object_store] crate to provide a
 /// tvix-castore [BlobService].
 ///
@@ -72,31 +62,14 @@ pub struct ObjectStoreBlobService {
 /// It also allows signalling any compression of chunks in the content-type.
 /// Migration *should* be possible by simply adding the right content-types to
 /// all keys stored so far, but no promises ;-)
-impl ObjectStoreBlobService {
-    /// Constructs a new [ObjectStoreBlobService] from a [Url] supported by
-    /// [object_store].
-    /// Any path suffix becomes the base path of the object store.
-    /// additional options, the same as in [object_store::parse_url_opts] can
-    /// be passed.
-    pub fn parse_url_opts<I, K, V>(url: &Url, options: I) -> Result<Self, object_store::Error>
-    where
-        I: IntoIterator<Item = (K, V)>,
-        K: AsRef<str>,
-        V: Into<String>,
-    {
-        let (object_store, path) = object_store::parse_url_opts(url, options)?;
-
-        Ok(Self {
-            object_store: Arc::new(object_store),
-            base_path: path,
-            avg_chunk_size: 256 * 1024,
-        })
-    }
+#[derive(Clone)]
+pub struct ObjectStoreBlobService {
+    object_store: Arc<dyn ObjectStore>,
+    base_path: Path,
 
-    /// Like [Self::parse_url_opts], except without the options.
-    pub fn parse_url(url: &Url) -> Result<Self, object_store::Error> {
-        Self::parse_url_opts(url, Vec::<(String, String)>::new())
-    }
+    /// Average chunk size for FastCDC, in bytes.
+    /// min value is half, max value double of that number.
+    avg_chunk_size: u32,
 }
 
 #[instrument(level=Level::TRACE, skip_all,fields(base_path=%base_path,blob.digest=%digest),ret(Display))]
@@ -281,10 +254,41 @@ pub struct ObjectStoreBlobServiceConfig {
     object_store_url: String,
     #[serde(default = "default_avg_chunk_size")]
     avg_chunk_size: u32,
-    #[serde(default)]
     object_store_options: HashMap<String, String>,
 }
 
+impl TryFrom<url::Url> for ObjectStoreBlobServiceConfig {
+    type Error = Box<dyn std::error::Error + Send + Sync>;
+    /// Constructs a new [ObjectStoreBlobService] from a [Url] supported by
+    /// [object_store].
+    /// Any path suffix becomes the base path of the object store.
+    /// additional options, the same as in [object_store::parse_url_opts] can
+    /// be passed.
+    fn try_from(url: url::Url) -> Result<Self, Self::Error> {
+        // We need to convert the URL to string, strip the prefix there, and then
+        // parse it back as url, as Url::set_scheme() rejects some of the transitions we want to do.
+        let trimmed_url = {
+            let s = url.to_string();
+            let mut url = Url::parse(
+                s.strip_prefix("objectstore+")
+                    .ok_or(Error::StorageError("Missing objectstore uri".into()))?,
+            )?;
+            // trim the query pairs, they might contain credentials or local settings we don't want to send as-is.
+            url.set_query(None);
+            url
+        };
+        Ok(ObjectStoreBlobServiceConfig {
+            object_store_url: trimmed_url.into(),
+            object_store_options: url
+                .query_pairs()
+                .into_iter()
+                .map(|(k, v)| (k.to_string(), v.to_string()))
+                .collect(),
+            avg_chunk_size: 256 * 1024,
+        })
+    }
+}
+
 #[async_trait]
 impl ServiceBuilder for ObjectStoreBlobServiceConfig {
     type Output = dyn BlobService;
@@ -548,7 +552,7 @@ where
 
 #[cfg(test)]
 mod test {
-    use super::chunk_and_upload;
+    use super::{chunk_and_upload, default_avg_chunk_size};
     use crate::{
         blobservice::{BlobService, ObjectStoreBlobService},
         fixtures::{BLOB_A, BLOB_A_DIGEST},
@@ -559,13 +563,18 @@ mod test {
     /// Tests chunk_and_upload directly, bypassing the BlobWriter at open_write().
     #[tokio::test]
    async fn test_chunk_and_upload() {
-        let blobsvc = Arc::new(
-            ObjectStoreBlobService::parse_url(&Url::parse("memory:///").unwrap()).unwrap(),
-        );
+        let (object_store, base_path) =
+            object_store::parse_url(&Url::parse("memory:///").unwrap()).unwrap();
+        let object_store: Arc<dyn object_store::ObjectStore> = Arc::from(object_store);
+        let blobsvc = Arc::new(ObjectStoreBlobService {
+            object_store: object_store.clone(),
+            avg_chunk_size: default_avg_chunk_size(),
+            base_path,
+        });
 
         let blob_digest = chunk_and_upload(
             &mut Cursor::new(BLOB_A.to_vec()),
-            blobsvc.object_store.clone(),
+            object_store,
             object_store::path::Path::from("/"),
             1024 / 2,
             1024,
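
Review note: the new TryFrom<url::Url> impl is what lets ObjectStoreBlobServiceConfig be built from a composition URL. It strips the "objectstore+" prefix, keeps the remaining URL minus its query string as object_store_url, and collects the query pairs into object_store_options. The standalone sketch below mirrors that trimming step using only the url crate; the function name split_objectstore_url and the example S3 URL are illustrative and not part of the tvix API.

use std::collections::HashMap;

use url::Url;

/// Illustrative stand-in for the trimming done inside the new
/// `TryFrom<url::Url> for ObjectStoreBlobServiceConfig`.
fn split_objectstore_url(
    url: &Url,
) -> Result<(String, HashMap<String, String>), Box<dyn std::error::Error>> {
    // Strip the prefix on the string form and re-parse, because
    // Url::set_scheme() rejects some scheme transitions (as the diff notes).
    let s = url.to_string();
    let mut inner = Url::parse(
        s.strip_prefix("objectstore+")
            .ok_or("missing objectstore+ prefix")?,
    )?;

    // The query pairs become the option map ...
    let options: HashMap<String, String> = inner
        .query_pairs()
        .map(|(k, v)| (k.to_string(), v.to_string()))
        .collect();

    // ... and are dropped from the stored URL, since they may contain
    // credentials or local settings we don't want to keep verbatim.
    inner.set_query(None);

    Ok((inner.into(), options))
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical composition URL, not taken from the diff.
    let url = Url::parse("objectstore+s3://mybucket/blobs?aws_region=eu-central-1")?;
    let (store_url, options) = split_objectstore_url(&url)?;
    assert_eq!(store_url, "s3://mybucket/blobs");
    assert_eq!(options["aws_region"], "eu-central-1");
    Ok(())
}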
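
The avg_chunk_size doc comment retained in the moved struct states that FastCDC's minimum chunk size is half the average and its maximum double. A minimal sketch of that min/avg/max relationship with the synchronous chunker from the fastcdc crate follows; the data and the way the chunker is driven are made up for illustration and do not reflect how chunk_and_upload invokes it.

use fastcdc::v2020::FastCDC;

fn main() {
    // Arbitrary in-memory data, only for illustration.
    let data = vec![0u8; 1 << 20];

    // Same default average as in the diff: 256 KiB.
    let avg_chunk_size: u32 = 256 * 1024;

    // Per the doc comment: minimum is half the average, maximum is double.
    let chunker = FastCDC::new(&data, avg_chunk_size / 2, avg_chunk_size, avg_chunk_size * 2);
    for chunk in chunker {
        println!("chunk at offset {} with length {}", chunk.offset, chunk.length);
    }
}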