diff options
author | Yureka <tvl@yuka.dev> | 2024-07-18T17·09+0200 |
---|---|---|
committer | yuka <tvl@yuka.dev> | 2024-07-18T19·19+0000 |
commit | 168e4fda5909e535f33051051ef426e221ef20d4 (patch) | |
tree | e23b8ad4ced3f4232bdb0ad186f3b63f693c57e5 /tvix/castore/src/blobservice | |
parent | 79317be214ce2f1e3347438319d3482bb773a649 (diff) |
refactor(tvix): use composition & registry for from_addr r/8368
Change-Id: I3c94ecb5958294b5973c6fcdf5ee9c0d37fa54ad Reviewed-on: https://cl.tvl.fyi/c/depot/+/11976 Reviewed-by: flokli <flokli@flokli.de> Tested-by: BuildkiteCI Autosubmit: yuka <yuka@yuka.dev>
Diffstat (limited to 'tvix/castore/src/blobservice')
-rw-r--r-- | tvix/castore/src/blobservice/combinator.rs | 12 | ||||
-rw-r--r-- | tvix/castore/src/blobservice/from_addr.rs | 62 | ||||
-rw-r--r-- | tvix/castore/src/blobservice/grpc.rs | 13 | ||||
-rw-r--r-- | tvix/castore/src/blobservice/memory.rs | 13 | ||||
-rw-r--r-- | tvix/castore/src/blobservice/object_store.rs | 91 |
5 files changed, 102 insertions, 89 deletions
diff --git a/tvix/castore/src/blobservice/combinator.rs b/tvix/castore/src/blobservice/combinator.rs index 0bce657e9176..8ec5a859bcda 100644 --- a/tvix/castore/src/blobservice/combinator.rs +++ b/tvix/castore/src/blobservice/combinator.rs @@ -6,7 +6,7 @@ use tonic::async_trait; use tracing::{instrument, warn}; use crate::composition::{CompositionContext, ServiceBuilder}; -use crate::B3Digest; +use crate::{B3Digest, Error}; use super::{naive_seeker::NaiveSeeker, BlobReader, BlobService, BlobWriter}; @@ -103,6 +103,16 @@ pub struct CombinedBlobServiceConfig { remote: String, } +impl TryFrom<url::Url> for CombinedBlobServiceConfig { + type Error = Box<dyn std::error::Error + Send + Sync>; + fn try_from(_url: url::Url) -> Result<Self, Self::Error> { + Err(Error::StorageError( + "Instantiating a CombinedBlobService from a url is not supported".into(), + ) + .into()) + } +} + #[async_trait] impl ServiceBuilder for CombinedBlobServiceConfig { type Output = dyn BlobService; diff --git a/tvix/castore/src/blobservice/from_addr.rs b/tvix/castore/src/blobservice/from_addr.rs index b7e266c4eaec..c5cabaa9d945 100644 --- a/tvix/castore/src/blobservice/from_addr.rs +++ b/tvix/castore/src/blobservice/from_addr.rs @@ -1,8 +1,12 @@ +use std::sync::Arc; + use url::Url; -use crate::{proto::blob_service_client::BlobServiceClient, Error}; +use crate::composition::{ + with_registry, CompositionContext, DeserializeWithRegistry, ServiceBuilder, REG, +}; -use super::{BlobService, GRPCBlobService, MemoryBlobService, ObjectStoreBlobService}; +use super::BlobService; /// Constructs a new instance of a [BlobService] from an URI. /// @@ -12,53 +16,19 @@ use super::{BlobService, GRPCBlobService, MemoryBlobService, ObjectStoreBlobServ /// - `objectstore+*://` ([ObjectStoreBlobService]) /// /// See their `from_url` methods for more details about their syntax. -pub async fn from_addr(uri: &str) -> Result<Box<dyn BlobService>, crate::Error> { +pub async fn from_addr( + uri: &str, +) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync>> { let url = Url::parse(uri) .map_err(|e| crate::Error::StorageError(format!("unable to parse url: {}", e)))?; - let blob_service: Box<dyn BlobService> = match url.scheme() { - "memory" => { - // memory doesn't support host or path in the URL. - if url.has_host() || !url.path().is_empty() { - return Err(Error::StorageError("invalid url".to_string())); - } - Box::<MemoryBlobService>::default() - } - scheme if scheme.starts_with("grpc+") => { - // schemes starting with grpc+ go to the GRPCPathInfoService. - // That's normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts. - // - In the case of unix sockets, there must be a path, but may not be a host. - // - In the case of non-unix sockets, there must be a host, but no path. - // Constructing the channel is handled by tvix_castore::channel::from_url. - Box::new(GRPCBlobService::from_client( - BlobServiceClient::with_interceptor( - crate::tonic::channel_from_url(&url).await?, - tvix_tracing::propagate::tonic::send_trace, - ), - )) - } - scheme if scheme.starts_with("objectstore+") => { - // We need to convert the URL to string, strip the prefix there, and then - // parse it back as url, as Url::set_scheme() rejects some of the transitions we want to do. - let trimmed_url = { - let s = url.to_string(); - let mut url = Url::parse(s.strip_prefix("objectstore+").unwrap()).unwrap(); - // trim the query pairs, they might contain credentials or local settings we don't want to send as-is. - url.set_query(None); - url - }; - Box::new( - ObjectStoreBlobService::parse_url_opts(&trimmed_url, url.query_pairs()) - .map_err(|e| Error::StorageError(e.to_string()))?, - ) - } - scheme => { - return Err(crate::Error::StorageError(format!( - "unknown scheme: {}", - scheme - ))) - } - }; + let blob_service_config = with_registry(®, || { + <DeserializeWithRegistry<Box<dyn ServiceBuilder<Output = dyn BlobService>>>>::try_from(url) + })? + .0; + let blob_service = blob_service_config + .build("anonymous", &CompositionContext::blank()) + .await?; Ok(blob_service) } diff --git a/tvix/castore/src/blobservice/grpc.rs b/tvix/castore/src/blobservice/grpc.rs index 56a2ebf00038..f5705adbf432 100644 --- a/tvix/castore/src/blobservice/grpc.rs +++ b/tvix/castore/src/blobservice/grpc.rs @@ -187,6 +187,19 @@ pub struct GRPCBlobServiceConfig { url: String, } +impl TryFrom<url::Url> for GRPCBlobServiceConfig { + type Error = Box<dyn std::error::Error + Send + Sync>; + fn try_from(url: url::Url) -> Result<Self, Self::Error> { + // normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts. + // - In the case of unix sockets, there must be a path, but may not be a host. + // - In the case of non-unix sockets, there must be a host, but no path. + // Constructing the channel is handled by tvix_castore::channel::from_url. + Ok(GRPCBlobServiceConfig { + url: url.to_string(), + }) + } +} + #[async_trait] impl ServiceBuilder for GRPCBlobServiceConfig { type Output = dyn BlobService; diff --git a/tvix/castore/src/blobservice/memory.rs b/tvix/castore/src/blobservice/memory.rs index 0205dcf7bd70..83b37edb1c89 100644 --- a/tvix/castore/src/blobservice/memory.rs +++ b/tvix/castore/src/blobservice/memory.rs @@ -7,7 +7,7 @@ use tracing::instrument; use super::{BlobReader, BlobService, BlobWriter}; use crate::composition::{CompositionContext, ServiceBuilder}; -use crate::B3Digest; +use crate::{B3Digest, Error}; #[derive(Clone, Default)] pub struct MemoryBlobService { @@ -42,6 +42,17 @@ impl BlobService for MemoryBlobService { #[serde(deny_unknown_fields)] pub struct MemoryBlobServiceConfig {} +impl TryFrom<url::Url> for MemoryBlobServiceConfig { + type Error = Box<dyn std::error::Error + Send + Sync>; + fn try_from(url: url::Url) -> Result<Self, Self::Error> { + // memory doesn't support host or path in the URL. + if url.has_host() || !url.path().is_empty() { + return Err(Error::StorageError("invalid url".to_string()).into()); + } + Ok(MemoryBlobServiceConfig {}) + } +} + #[async_trait] impl ServiceBuilder for MemoryBlobServiceConfig { type Output = dyn BlobService; diff --git a/tvix/castore/src/blobservice/object_store.rs b/tvix/castore/src/blobservice/object_store.rs index bd8c2bd747fb..224ebae9e2c6 100644 --- a/tvix/castore/src/blobservice/object_store.rs +++ b/tvix/castore/src/blobservice/object_store.rs @@ -21,21 +21,11 @@ use url::Url; use crate::{ composition::{CompositionContext, ServiceBuilder}, proto::{stat_blob_response::ChunkMeta, StatBlobResponse}, - B3Digest, B3HashingReader, + B3Digest, B3HashingReader, Error, }; use super::{BlobReader, BlobService, BlobWriter, ChunkedReader}; -#[derive(Clone)] -pub struct ObjectStoreBlobService { - object_store: Arc<dyn ObjectStore>, - base_path: Path, - - /// Average chunk size for FastCDC, in bytes. - /// min value is half, max value double of that number. - avg_chunk_size: u32, -} - /// Uses any object storage supported by the [object_store] crate to provide a /// tvix-castore [BlobService]. /// @@ -72,31 +62,14 @@ pub struct ObjectStoreBlobService { /// It also allows signalling any compression of chunks in the content-type. /// Migration *should* be possible by simply adding the right content-types to /// all keys stored so far, but no promises ;-) -impl ObjectStoreBlobService { - /// Constructs a new [ObjectStoreBlobService] from a [Url] supported by - /// [object_store]. - /// Any path suffix becomes the base path of the object store. - /// additional options, the same as in [object_store::parse_url_opts] can - /// be passed. - pub fn parse_url_opts<I, K, V>(url: &Url, options: I) -> Result<Self, object_store::Error> - where - I: IntoIterator<Item = (K, V)>, - K: AsRef<str>, - V: Into<String>, - { - let (object_store, path) = object_store::parse_url_opts(url, options)?; - - Ok(Self { - object_store: Arc::new(object_store), - base_path: path, - avg_chunk_size: 256 * 1024, - }) - } +#[derive(Clone)] +pub struct ObjectStoreBlobService { + object_store: Arc<dyn ObjectStore>, + base_path: Path, - /// Like [Self::parse_url_opts], except without the options. - pub fn parse_url(url: &Url) -> Result<Self, object_store::Error> { - Self::parse_url_opts(url, Vec::<(String, String)>::new()) - } + /// Average chunk size for FastCDC, in bytes. + /// min value is half, max value double of that number. + avg_chunk_size: u32, } #[instrument(level=Level::TRACE, skip_all,fields(base_path=%base_path,blob.digest=%digest),ret(Display))] @@ -281,10 +254,41 @@ pub struct ObjectStoreBlobServiceConfig { object_store_url: String, #[serde(default = "default_avg_chunk_size")] avg_chunk_size: u32, - #[serde(default)] object_store_options: HashMap<String, String>, } +impl TryFrom<url::Url> for ObjectStoreBlobServiceConfig { + type Error = Box<dyn std::error::Error + Send + Sync>; + /// Constructs a new [ObjectStoreBlobService] from a [Url] supported by + /// [object_store]. + /// Any path suffix becomes the base path of the object store. + /// additional options, the same as in [object_store::parse_url_opts] can + /// be passed. + fn try_from(url: url::Url) -> Result<Self, Self::Error> { + // We need to convert the URL to string, strip the prefix there, and then + // parse it back as url, as Url::set_scheme() rejects some of the transitions we want to do. + let trimmed_url = { + let s = url.to_string(); + let mut url = Url::parse( + s.strip_prefix("objectstore+") + .ok_or(Error::StorageError("Missing objectstore uri".into()))?, + )?; + // trim the query pairs, they might contain credentials or local settings we don't want to send as-is. + url.set_query(None); + url + }; + Ok(ObjectStoreBlobServiceConfig { + object_store_url: trimmed_url.into(), + object_store_options: url + .query_pairs() + .into_iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect(), + avg_chunk_size: 256 * 1024, + }) + } +} + #[async_trait] impl ServiceBuilder for ObjectStoreBlobServiceConfig { type Output = dyn BlobService; @@ -548,7 +552,7 @@ where #[cfg(test)] mod test { - use super::chunk_and_upload; + use super::{chunk_and_upload, default_avg_chunk_size}; use crate::{ blobservice::{BlobService, ObjectStoreBlobService}, fixtures::{BLOB_A, BLOB_A_DIGEST}, @@ -559,13 +563,18 @@ mod test { /// Tests chunk_and_upload directly, bypassing the BlobWriter at open_write(). #[tokio::test] async fn test_chunk_and_upload() { - let blobsvc = Arc::new( - ObjectStoreBlobService::parse_url(&Url::parse("memory:///").unwrap()).unwrap(), - ); + let (object_store, base_path) = + object_store::parse_url(&Url::parse("memory:///").unwrap()).unwrap(); + let object_store: Arc<dyn object_store::ObjectStore> = Arc::from(object_store); + let blobsvc = Arc::new(ObjectStoreBlobService { + object_store: object_store.clone(), + avg_chunk_size: default_avg_chunk_size(), + base_path, + }); let blob_digest = chunk_and_upload( &mut Cursor::new(BLOB_A.to_vec()), - blobsvc.object_store.clone(), + object_store, object_store::path::Path::from("/"), 1024 / 2, 1024, |