diff options
Diffstat (limited to 'tvix')
-rw-r--r-- | tvix/Cargo.nix | 4 | ||||
-rw-r--r-- | tvix/castore/Cargo.toml | 5 | ||||
-rw-r--r-- | tvix/castore/default.nix | 2 | ||||
-rw-r--r-- | tvix/castore/src/blobservice/from_addr.rs | 2 | ||||
-rw-r--r-- | tvix/castore/src/composition.rs | 204 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/from_addr.rs | 12 | ||||
-rw-r--r-- | tvix/store/Cargo.toml | 2 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/from_addr.rs | 9 | ||||
-rw-r--r-- | tvix/store/src/utils.rs | 2 |
9 files changed, 151 insertions, 91 deletions
diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix index 3bce8c74afe5..7ef142fe162e 100644 --- a/tvix/Cargo.nix +++ b/tvix/Cargo.nix @@ -15401,7 +15401,7 @@ rec { "tonic-reflection" = [ "dep:tonic-reflection" ]; "virtiofs" = [ "fs" "dep:vhost" "dep:vhost-user-backend" "dep:virtio-queue" "dep:vm-memory" "dep:vmm-sys-util" "dep:virtio-bindings" "fuse-backend-rs?/vhost-user-fs" "fuse-backend-rs?/virtiofs" ]; }; - resolvedDefaultFeatures = [ "cloud" "default" "fs" "fuse" "integration" "tonic-reflection" "virtiofs" ]; + resolvedDefaultFeatures = [ "cloud" "default" "fs" "fuse" "integration" "tonic-reflection" "virtiofs" "xp-store-composition" ]; }; "tvix-cli" = rec { crateName = "tvix-cli"; @@ -16170,7 +16170,7 @@ rec { "tonic-reflection" = [ "dep:tonic-reflection" "tvix-castore/tonic-reflection" ]; "tracy" = [ "tvix-tracing/tracy" ]; "virtiofs" = [ "tvix-castore/virtiofs" ]; - "xp-store-composition" = [ "toml" ]; + "xp-store-composition" = [ "toml" "tvix-castore/xp-store-composition" ]; }; resolvedDefaultFeatures = [ "cloud" "default" "fuse" "integration" "otlp" "toml" "tonic-reflection" "tracy" "virtiofs" "xp-store-composition" ]; }; diff --git a/tvix/castore/Cargo.toml b/tvix/castore/Cargo.toml index 00f4e44e5b70..5fdcba62febb 100644 --- a/tvix/castore/Cargo.toml +++ b/tvix/castore/Cargo.toml @@ -91,6 +91,11 @@ virtiofs = [ ] fuse = ["fs"] tonic-reflection = ["dep:tonic-reflection"] +# It's already possible for other crates to build a +# fully fledged store composition system based on castore composition. +# However, this feature enables anonymous url syntax which might +# inherently expose arbitrary composition possibilities to the user. +xp-store-composition = [] # Whether to run the integration tests. # Requires the following packages in $PATH: # cbtemulator, google-cloud-bigtable-tool diff --git a/tvix/castore/default.nix b/tvix/castore/default.nix index 9c210884f6e3..5314da9e0333 100644 --- a/tvix/castore/default.nix +++ b/tvix/castore/default.nix @@ -9,7 +9,7 @@ meta.ci.targets = [ "integration-tests" ] ++ lib.filter (x: lib.hasPrefix "with-features" x || x == "no-features") (lib.attrNames passthru); passthru = (depot.tvix.utils.mkFeaturePowerset { inherit (old) crateName; - features = ([ "cloud" "fuse" "tonic-reflection" ] + features = ([ "cloud" "fuse" "tonic-reflection" "xp-store-composition" ] # virtiofs feature currently fails to build on Darwin ++ lib.optional pkgs.stdenv.isLinux "virtiofs"); override.testPreRun = '' diff --git a/tvix/castore/src/blobservice/from_addr.rs b/tvix/castore/src/blobservice/from_addr.rs index c5cabaa9d945..803e7fa6a575 100644 --- a/tvix/castore/src/blobservice/from_addr.rs +++ b/tvix/castore/src/blobservice/from_addr.rs @@ -27,7 +27,7 @@ pub async fn from_addr( })? .0; let blob_service = blob_service_config - .build("anonymous", &CompositionContext::blank()) + .build("anonymous", &CompositionContext::blank(®)) .await?; Ok(blob_service) diff --git a/tvix/castore/src/composition.rs b/tvix/castore/src/composition.rs index c76daafc523d..d52cc4060653 100644 --- a/tvix/castore/src/composition.rs +++ b/tvix/castore/src/composition.rs @@ -70,7 +70,7 @@ //! }); //! //! let blob_services_configs = with_registry(®, || serde_json::from_value(blob_services_configs_json))?; -//! let mut blob_service_composition = Composition::default(); +//! let mut blob_service_composition = Composition::new(®); //! blob_service_composition.extend_with_configs::<dyn BlobService>(blob_services_configs); //! let blob_service: Arc<dyn BlobService> = blob_service_composition.build("default").await?; //! # Ok(()) @@ -88,6 +88,13 @@ //! ``` //! //! Continue with Example 2, with my_registry instead of REG +//! +//! EXPERIMENTAL: If the xp-store-composition feature is enabled, +//! entrypoints can also be URL strings, which are created as +//! anonymous stores. Instantiations of the same URL will +//! result in a new, distinct anonymous store each time, so creating +//! two `memory://` stores with this method will not share the same view. +//! This behavior might change in the future. use erased_serde::deserialize; use futures::future::BoxFuture; @@ -278,12 +285,15 @@ pub struct CompositionContext<'a> { // The TypeId of the trait object is included to distinguish e.g. the // BlobService "default" and the DirectoryService "default". stack: Vec<(TypeId, String)>, + registry: &'static Registry, composition: Option<&'a Composition>, } impl<'a> CompositionContext<'a> { - pub fn blank() -> Self { + /// Get a composition context for one-off store creation. + pub fn blank(registry: &'static Registry) -> Self { Self { + registry, stack: Default::default(), composition: None, } @@ -303,10 +313,104 @@ impl<'a> CompositionContext<'a> { ) .into()); } - match self.composition { - Some(comp) => Ok(comp.build_internal(self.stack.clone(), entrypoint).await?), - None => Err(CompositionError::NotFound(entrypoint).into()), + + Ok(self.build_internal(entrypoint).await?) + } + + #[cfg(feature = "xp-store-composition")] + async fn build_anonymous<T: ?Sized + Send + Sync + 'static>( + &self, + entrypoint: String, + ) -> Result<Arc<T>, Box<dyn std::error::Error + Send + Sync>> { + let url = url::Url::parse(&entrypoint)?; + let config: DeserializeWithRegistry<Box<dyn ServiceBuilder<Output = T>>> = + with_registry(self.registry, || url.try_into())?; + config.0.build("anonymous", self).await + } + + fn build_internal<T: ?Sized + Send + Sync + 'static>( + &self, + entrypoint: String, + ) -> BoxFuture<'_, Result<Arc<T>, CompositionError>> { + #[cfg(feature = "xp-store-composition")] + if entrypoint.contains("://") { + // There is a chance this is a url. we are building an anonymous store + return Box::pin(async move { + self.build_anonymous(entrypoint.clone()) + .await + .map_err(|e| CompositionError::Failed(entrypoint, Arc::from(e))) + }); } + + let mut stores = match self.composition { + Some(comp) => comp.stores.lock().unwrap(), + None => return Box::pin(futures::future::err(CompositionError::NotFound(entrypoint))), + }; + let entry = match stores.get_mut(&(TypeId::of::<T>(), entrypoint.clone())) { + Some(v) => v, + None => return Box::pin(futures::future::err(CompositionError::NotFound(entrypoint))), + }; + // for lifetime reasons, we put a placeholder value in the hashmap while we figure out what + // the new value should be. the Mutex stays locked the entire time, so nobody will ever see + // this temporary value. + let prev_val = std::mem::replace( + entry, + Box::new(InstantiationState::<T>::Done(Err( + CompositionError::Poisoned(entrypoint.clone()), + ))), + ); + let (new_val, ret) = match *prev_val.downcast::<InstantiationState<T>>().unwrap() { + InstantiationState::Done(service) => ( + InstantiationState::Done(service.clone()), + futures::future::ready(service).boxed(), + ), + // the construction of the store has not started yet. + InstantiationState::Config(config) => { + let (tx, rx) = tokio::sync::watch::channel(None); + ( + InstantiationState::InProgress(rx), + (async move { + let mut new_context = CompositionContext { + composition: self.composition, + registry: self.registry, + stack: self.stack.clone(), + }; + new_context + .stack + .push((TypeId::of::<T>(), entrypoint.clone())); + let res = + config.build(&entrypoint, &new_context).await.map_err(|e| { + match e.downcast() { + Ok(e) => *e, + Err(e) => CompositionError::Failed(entrypoint, e.into()), + } + }); + tx.send(Some(res.clone())).unwrap(); + res + }) + .boxed(), + ) + } + // there is already a task driving forward the construction of this store, wait for it + // to notify us via the provided channel + InstantiationState::InProgress(mut recv) => { + (InstantiationState::InProgress(recv.clone()), { + (async move { + loop { + if let Some(v) = + recv.borrow_and_update().as_ref().map(|res| res.clone()) + { + break v; + } + recv.changed().await.unwrap(); + } + }) + .boxed() + }) + } + }; + *entry = Box::new(new_val); + ret } } @@ -336,8 +440,8 @@ enum InstantiationState<T: ?Sized> { Done(Result<Arc<T>, CompositionError>), } -#[derive(Default)] pub struct Composition { + registry: &'static Registry, stores: std::sync::Mutex<HashMap<(TypeId, String), Box<dyn Any + Send + Sync>>>, } @@ -381,6 +485,14 @@ impl<T: ?Sized + Send + Sync + 'static> } impl Composition { + /// The given registry will be used for creation of anonymous stores during composition + pub fn new(registry: &'static Registry) -> Self { + Self { + registry, + stores: Default::default(), + } + } + pub fn extend_with_configs<T: ?Sized + Send + Sync + 'static>( &mut self, // Keep the concrete `HashMap` type here since it allows for type @@ -390,87 +502,17 @@ impl Composition { self.extend(configs); } + /// Looks up the entrypoint name in the composition and returns an instantiated service. pub async fn build<T: ?Sized + Send + Sync + 'static>( &self, entrypoint: &str, ) -> Result<Arc<T>, CompositionError> { - self.build_internal(vec![], entrypoint.to_string()).await - } - - fn build_internal<T: ?Sized + Send + Sync + 'static>( - &self, - stack: Vec<(TypeId, String)>, - entrypoint: String, - ) -> BoxFuture<'_, Result<Arc<T>, CompositionError>> { - let mut stores = self.stores.lock().unwrap(); - let entry = match stores.get_mut(&(TypeId::of::<T>(), entrypoint.clone())) { - Some(v) => v, - None => return Box::pin(futures::future::err(CompositionError::NotFound(entrypoint))), - }; - // for lifetime reasons, we put a placeholder value in the hashmap while we figure out what - // the new value should be. the Mutex stays locked the entire time, so nobody will ever see - // this temporary value. - let prev_val = std::mem::replace( - entry, - Box::new(InstantiationState::<T>::Done(Err( - CompositionError::Poisoned(entrypoint.clone()), - ))), - ); - let (new_val, ret) = match *prev_val.downcast::<InstantiationState<T>>().unwrap() { - InstantiationState::Done(service) => ( - InstantiationState::Done(service.clone()), - futures::future::ready(service).boxed(), - ), - // the construction of the store has not started yet. - InstantiationState::Config(config) => { - let (tx, rx) = tokio::sync::watch::channel(None); - ( - InstantiationState::InProgress(rx), - (async move { - let mut new_context = CompositionContext { - stack: stack.clone(), - composition: Some(self), - }; - new_context - .stack - .push((TypeId::of::<T>(), entrypoint.clone())); - let res = - config.build(&entrypoint, &new_context).await.map_err(|e| { - match e.downcast() { - Ok(e) => *e, - Err(e) => CompositionError::Failed(entrypoint, e.into()), - } - }); - tx.send(Some(res.clone())).unwrap(); - res - }) - .boxed(), - ) - } - // there is already a task driving forward the construction of this store, wait for it - // to notify us via the provided channel - InstantiationState::InProgress(mut recv) => { - (InstantiationState::InProgress(recv.clone()), { - (async move { - loop { - if let Some(v) = - recv.borrow_and_update().as_ref().map(|res| res.clone()) - { - break v; - } - recv.changed().await.unwrap(); - } - }) - .boxed() - }) - } - }; - *entry = Box::new(new_val); - ret + self.context().build_internal(entrypoint.to_string()).await } pub fn context(&self) -> CompositionContext { CompositionContext { + registry: self.registry, stack: vec![], composition: Some(self), } @@ -496,7 +538,7 @@ mod test { let blob_services_configs = with_registry(®, || serde_json::from_value(blob_services_configs_json)).unwrap(); - let mut blob_service_composition = Composition::default(); + let mut blob_service_composition = Composition::new(®); blob_service_composition.extend_with_configs::<dyn BlobService>(blob_services_configs); let (blob_service1, blob_service2) = tokio::join!( blob_service_composition.build::<dyn BlobService>("default"), @@ -526,7 +568,7 @@ mod test { let blob_services_configs = with_registry(®, || serde_json::from_value(blob_services_configs_json)).unwrap(); - let mut blob_service_composition = Composition::default(); + let mut blob_service_composition = Composition::new(®); blob_service_composition.extend_with_configs::<dyn BlobService>(blob_services_configs); match blob_service_composition .build::<dyn BlobService>("default") diff --git a/tvix/castore/src/directoryservice/from_addr.rs b/tvix/castore/src/directoryservice/from_addr.rs index 87a717b3fc2e..244d78a17bf0 100644 --- a/tvix/castore/src/directoryservice/from_addr.rs +++ b/tvix/castore/src/directoryservice/from_addr.rs @@ -37,7 +37,7 @@ pub async fn from_addr( })? .0; let directory_service = directory_service_config - .build("anonymous", &CompositionContext::blank()) + .build("anonymous", &CompositionContext::blank(®)) .await?; Ok(directory_service) @@ -88,6 +88,16 @@ mod tests { #[case::grpc_valid_https_host_without_port("grpc+https://localhost", true)] /// Correct scheme to connect to localhost over http, but with additional path, which is invalid. #[case::grpc_invalid_host_and_path("grpc+http://localhost/some-path", false)] + /// A valid example for store composition using anonymous urls + #[cfg_attr( + feature = "xp-store-composition", + case::anonymous_url_composition("cache://?near=memory://&far=memory://", true) + )] + /// Store composition with anonymous urls should fail if the feature is disabled + #[cfg_attr( + not(feature = "xp-store-composition"), + case::anonymous_url_composition("cache://?near=memory://&far=memory://", false) + )] /// A valid example for Bigtable #[cfg_attr( all(feature = "cloud", feature = "integration"), diff --git a/tvix/store/Cargo.toml b/tvix/store/Cargo.toml index 3dc816e4ba74..c19f34bbbcf0 100644 --- a/tvix/store/Cargo.toml +++ b/tvix/store/Cargo.toml @@ -73,7 +73,7 @@ otlp = ["tvix-tracing/otlp"] tonic-reflection = ["dep:tonic-reflection", "tvix-castore/tonic-reflection"] tracy = ["tvix-tracing/tracy"] virtiofs = ["tvix-castore/virtiofs"] -xp-store-composition = ["toml"] +xp-store-composition = ["toml", "tvix-castore/xp-store-composition"] # Whether to run the integration tests. # Requires the following packages in $PATH: # cbtemulator, google-cloud-bigtable-tool diff --git a/tvix/store/src/pathinfoservice/from_addr.rs b/tvix/store/src/pathinfoservice/from_addr.rs index b2e8b473934a..3dfb08c9e817 100644 --- a/tvix/store/src/pathinfoservice/from_addr.rs +++ b/tvix/store/src/pathinfoservice/from_addr.rs @@ -44,7 +44,10 @@ pub async fn from_addr( })? .0; let path_info_service = path_info_service_config - .build("anonymous", context.unwrap_or(&CompositionContext::blank())) + .build( + "anonymous", + context.unwrap_or(&CompositionContext::blank(®)), + ) .await?; Ok(path_info_service) @@ -53,7 +56,7 @@ pub async fn from_addr( #[cfg(test)] mod tests { use super::from_addr; - use crate::composition::{Composition, DeserializeWithRegistry, ServiceBuilder}; + use crate::composition::{Composition, DeserializeWithRegistry, ServiceBuilder, REG}; use lazy_static::lazy_static; use rstest::rstest; use tempfile::TempDir; @@ -125,7 +128,7 @@ mod tests { )] #[tokio::test] async fn test_from_addr_tokio(#[case] uri_str: &str, #[case] exp_succeed: bool) { - let mut comp = Composition::default(); + let mut comp = Composition::new(®); comp.extend(vec![( "default".into(), DeserializeWithRegistry(Box::new(MemoryBlobServiceConfig {}) diff --git a/tvix/store/src/utils.rs b/tvix/store/src/utils.rs index 1385ece39f8a..86ec367b66ad 100644 --- a/tvix/store/src/utils.rs +++ b/tvix/store/src/utils.rs @@ -188,7 +188,7 @@ pub async fn construct_services_from_configs( ), Box<dyn std::error::Error + Send + Sync>, > { - let mut comp = Composition::default(); + let mut comp = Composition::new(®); comp.extend(configs.blobservices); comp.extend(configs.directoryservices); |