about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--tvix/Cargo.nix4
-rw-r--r--tvix/castore/Cargo.toml5
-rw-r--r--tvix/castore/default.nix2
-rw-r--r--tvix/castore/src/blobservice/from_addr.rs2
-rw-r--r--tvix/castore/src/composition.rs204
-rw-r--r--tvix/castore/src/directoryservice/from_addr.rs12
-rw-r--r--tvix/store/Cargo.toml2
-rw-r--r--tvix/store/src/pathinfoservice/from_addr.rs9
-rw-r--r--tvix/store/src/utils.rs2
9 files changed, 151 insertions, 91 deletions
diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix
index 3bce8c74afe5..7ef142fe162e 100644
--- a/tvix/Cargo.nix
+++ b/tvix/Cargo.nix
@@ -15401,7 +15401,7 @@ rec {
           "tonic-reflection" = [ "dep:tonic-reflection" ];
           "virtiofs" = [ "fs" "dep:vhost" "dep:vhost-user-backend" "dep:virtio-queue" "dep:vm-memory" "dep:vmm-sys-util" "dep:virtio-bindings" "fuse-backend-rs?/vhost-user-fs" "fuse-backend-rs?/virtiofs" ];
         };
-        resolvedDefaultFeatures = [ "cloud" "default" "fs" "fuse" "integration" "tonic-reflection" "virtiofs" ];
+        resolvedDefaultFeatures = [ "cloud" "default" "fs" "fuse" "integration" "tonic-reflection" "virtiofs" "xp-store-composition" ];
       };
       "tvix-cli" = rec {
         crateName = "tvix-cli";
@@ -16170,7 +16170,7 @@ rec {
           "tonic-reflection" = [ "dep:tonic-reflection" "tvix-castore/tonic-reflection" ];
           "tracy" = [ "tvix-tracing/tracy" ];
           "virtiofs" = [ "tvix-castore/virtiofs" ];
-          "xp-store-composition" = [ "toml" ];
+          "xp-store-composition" = [ "toml" "tvix-castore/xp-store-composition" ];
         };
         resolvedDefaultFeatures = [ "cloud" "default" "fuse" "integration" "otlp" "toml" "tonic-reflection" "tracy" "virtiofs" "xp-store-composition" ];
       };
diff --git a/tvix/castore/Cargo.toml b/tvix/castore/Cargo.toml
index 00f4e44e5b70..5fdcba62febb 100644
--- a/tvix/castore/Cargo.toml
+++ b/tvix/castore/Cargo.toml
@@ -91,6 +91,11 @@ virtiofs = [
 ]
 fuse = ["fs"]
 tonic-reflection = ["dep:tonic-reflection"]
+# It's already possible for other crates to build a
+# fully fledged store composition system based on castore composition.
+# However, this feature enables anonymous url syntax which might
+# inherently expose arbitrary composition possibilities to the user.
+xp-store-composition = []
 # Whether to run the integration tests.
 # Requires the following packages in $PATH:
 # cbtemulator, google-cloud-bigtable-tool
diff --git a/tvix/castore/default.nix b/tvix/castore/default.nix
index 9c210884f6e3..5314da9e0333 100644
--- a/tvix/castore/default.nix
+++ b/tvix/castore/default.nix
@@ -9,7 +9,7 @@
   meta.ci.targets = [ "integration-tests" ] ++ lib.filter (x: lib.hasPrefix "with-features" x || x == "no-features") (lib.attrNames passthru);
   passthru = (depot.tvix.utils.mkFeaturePowerset {
     inherit (old) crateName;
-    features = ([ "cloud" "fuse" "tonic-reflection" ]
+    features = ([ "cloud" "fuse" "tonic-reflection" "xp-store-composition" ]
       # virtiofs feature currently fails to build on Darwin
       ++ lib.optional pkgs.stdenv.isLinux "virtiofs");
     override.testPreRun = ''
diff --git a/tvix/castore/src/blobservice/from_addr.rs b/tvix/castore/src/blobservice/from_addr.rs
index c5cabaa9d945..803e7fa6a575 100644
--- a/tvix/castore/src/blobservice/from_addr.rs
+++ b/tvix/castore/src/blobservice/from_addr.rs
@@ -27,7 +27,7 @@ pub async fn from_addr(
     })?
     .0;
     let blob_service = blob_service_config
-        .build("anonymous", &CompositionContext::blank())
+        .build("anonymous", &CompositionContext::blank(&REG))
         .await?;
 
     Ok(blob_service)
diff --git a/tvix/castore/src/composition.rs b/tvix/castore/src/composition.rs
index c76daafc523d..d52cc4060653 100644
--- a/tvix/castore/src/composition.rs
+++ b/tvix/castore/src/composition.rs
@@ -70,7 +70,7 @@
 //! });
 //!
 //! let blob_services_configs = with_registry(&REG, || serde_json::from_value(blob_services_configs_json))?;
-//! let mut blob_service_composition = Composition::default();
+//! let mut blob_service_composition = Composition::new(&REG);
 //! blob_service_composition.extend_with_configs::<dyn BlobService>(blob_services_configs);
 //! let blob_service: Arc<dyn BlobService> = blob_service_composition.build("default").await?;
 //! # Ok(())
@@ -88,6 +88,13 @@
 //! ```
 //!
 //! Continue with Example 2, with my_registry instead of REG
+//!
+//! EXPERIMENTAL: If the xp-store-composition feature is enabled,
+//! entrypoints can also be URL strings, which are created as
+//! anonymous stores. Instantiations of the same URL will
+//! result in a new, distinct anonymous store each time, so creating
+//! two `memory://` stores with this method will not share the same view.
+//! This behavior might change in the future.
 
 use erased_serde::deserialize;
 use futures::future::BoxFuture;
@@ -278,12 +285,15 @@ pub struct CompositionContext<'a> {
     // The TypeId of the trait object is included to distinguish e.g. the
     // BlobService "default" and the DirectoryService "default".
     stack: Vec<(TypeId, String)>,
+    registry: &'static Registry,
     composition: Option<&'a Composition>,
 }
 
 impl<'a> CompositionContext<'a> {
-    pub fn blank() -> Self {
+    /// Get a composition context for one-off store creation.
+    pub fn blank(registry: &'static Registry) -> Self {
         Self {
+            registry,
             stack: Default::default(),
             composition: None,
         }
@@ -303,10 +313,104 @@ impl<'a> CompositionContext<'a> {
             )
             .into());
         }
-        match self.composition {
-            Some(comp) => Ok(comp.build_internal(self.stack.clone(), entrypoint).await?),
-            None => Err(CompositionError::NotFound(entrypoint).into()),
+
+        Ok(self.build_internal(entrypoint).await?)
+    }
+
+    #[cfg(feature = "xp-store-composition")]
+    async fn build_anonymous<T: ?Sized + Send + Sync + 'static>(
+        &self,
+        entrypoint: String,
+    ) -> Result<Arc<T>, Box<dyn std::error::Error + Send + Sync>> {
+        let url = url::Url::parse(&entrypoint)?;
+        let config: DeserializeWithRegistry<Box<dyn ServiceBuilder<Output = T>>> =
+            with_registry(self.registry, || url.try_into())?;
+        config.0.build("anonymous", self).await
+    }
+
+    fn build_internal<T: ?Sized + Send + Sync + 'static>(
+        &self,
+        entrypoint: String,
+    ) -> BoxFuture<'_, Result<Arc<T>, CompositionError>> {
+        #[cfg(feature = "xp-store-composition")]
+        if entrypoint.contains("://") {
+            // There is a chance this is a url. we are building an anonymous store
+            return Box::pin(async move {
+                self.build_anonymous(entrypoint.clone())
+                    .await
+                    .map_err(|e| CompositionError::Failed(entrypoint, Arc::from(e)))
+            });
         }
+
+        let mut stores = match self.composition {
+            Some(comp) => comp.stores.lock().unwrap(),
+            None => return Box::pin(futures::future::err(CompositionError::NotFound(entrypoint))),
+        };
+        let entry = match stores.get_mut(&(TypeId::of::<T>(), entrypoint.clone())) {
+            Some(v) => v,
+            None => return Box::pin(futures::future::err(CompositionError::NotFound(entrypoint))),
+        };
+        // for lifetime reasons, we put a placeholder value in the hashmap while we figure out what
+        // the new value should be. the Mutex stays locked the entire time, so nobody will ever see
+        // this temporary value.
+        let prev_val = std::mem::replace(
+            entry,
+            Box::new(InstantiationState::<T>::Done(Err(
+                CompositionError::Poisoned(entrypoint.clone()),
+            ))),
+        );
+        let (new_val, ret) = match *prev_val.downcast::<InstantiationState<T>>().unwrap() {
+            InstantiationState::Done(service) => (
+                InstantiationState::Done(service.clone()),
+                futures::future::ready(service).boxed(),
+            ),
+            // the construction of the store has not started yet.
+            InstantiationState::Config(config) => {
+                let (tx, rx) = tokio::sync::watch::channel(None);
+                (
+                    InstantiationState::InProgress(rx),
+                    (async move {
+                        let mut new_context = CompositionContext {
+                            composition: self.composition,
+                            registry: self.registry,
+                            stack: self.stack.clone(),
+                        };
+                        new_context
+                            .stack
+                            .push((TypeId::of::<T>(), entrypoint.clone()));
+                        let res =
+                            config.build(&entrypoint, &new_context).await.map_err(|e| {
+                                match e.downcast() {
+                                    Ok(e) => *e,
+                                    Err(e) => CompositionError::Failed(entrypoint, e.into()),
+                                }
+                            });
+                        tx.send(Some(res.clone())).unwrap();
+                        res
+                    })
+                    .boxed(),
+                )
+            }
+            // there is already a task driving forward the construction of this store, wait for it
+            // to notify us via the provided channel
+            InstantiationState::InProgress(mut recv) => {
+                (InstantiationState::InProgress(recv.clone()), {
+                    (async move {
+                        loop {
+                            if let Some(v) =
+                                recv.borrow_and_update().as_ref().map(|res| res.clone())
+                            {
+                                break v;
+                            }
+                            recv.changed().await.unwrap();
+                        }
+                    })
+                    .boxed()
+                })
+            }
+        };
+        *entry = Box::new(new_val);
+        ret
     }
 }
 
@@ -336,8 +440,8 @@ enum InstantiationState<T: ?Sized> {
     Done(Result<Arc<T>, CompositionError>),
 }
 
-#[derive(Default)]
 pub struct Composition {
+    registry: &'static Registry,
     stores: std::sync::Mutex<HashMap<(TypeId, String), Box<dyn Any + Send + Sync>>>,
 }
 
@@ -381,6 +485,14 @@ impl<T: ?Sized + Send + Sync + 'static>
 }
 
 impl Composition {
+    /// The given registry will be used for creation of anonymous stores during composition
+    pub fn new(registry: &'static Registry) -> Self {
+        Self {
+            registry,
+            stores: Default::default(),
+        }
+    }
+
     pub fn extend_with_configs<T: ?Sized + Send + Sync + 'static>(
         &mut self,
         // Keep the concrete `HashMap` type here since it allows for type
@@ -390,87 +502,17 @@ impl Composition {
         self.extend(configs);
     }
 
+    /// Looks up the entrypoint name in the composition and returns an instantiated service.
     pub async fn build<T: ?Sized + Send + Sync + 'static>(
         &self,
         entrypoint: &str,
     ) -> Result<Arc<T>, CompositionError> {
-        self.build_internal(vec![], entrypoint.to_string()).await
-    }
-
-    fn build_internal<T: ?Sized + Send + Sync + 'static>(
-        &self,
-        stack: Vec<(TypeId, String)>,
-        entrypoint: String,
-    ) -> BoxFuture<'_, Result<Arc<T>, CompositionError>> {
-        let mut stores = self.stores.lock().unwrap();
-        let entry = match stores.get_mut(&(TypeId::of::<T>(), entrypoint.clone())) {
-            Some(v) => v,
-            None => return Box::pin(futures::future::err(CompositionError::NotFound(entrypoint))),
-        };
-        // for lifetime reasons, we put a placeholder value in the hashmap while we figure out what
-        // the new value should be. the Mutex stays locked the entire time, so nobody will ever see
-        // this temporary value.
-        let prev_val = std::mem::replace(
-            entry,
-            Box::new(InstantiationState::<T>::Done(Err(
-                CompositionError::Poisoned(entrypoint.clone()),
-            ))),
-        );
-        let (new_val, ret) = match *prev_val.downcast::<InstantiationState<T>>().unwrap() {
-            InstantiationState::Done(service) => (
-                InstantiationState::Done(service.clone()),
-                futures::future::ready(service).boxed(),
-            ),
-            // the construction of the store has not started yet.
-            InstantiationState::Config(config) => {
-                let (tx, rx) = tokio::sync::watch::channel(None);
-                (
-                    InstantiationState::InProgress(rx),
-                    (async move {
-                        let mut new_context = CompositionContext {
-                            stack: stack.clone(),
-                            composition: Some(self),
-                        };
-                        new_context
-                            .stack
-                            .push((TypeId::of::<T>(), entrypoint.clone()));
-                        let res =
-                            config.build(&entrypoint, &new_context).await.map_err(|e| {
-                                match e.downcast() {
-                                    Ok(e) => *e,
-                                    Err(e) => CompositionError::Failed(entrypoint, e.into()),
-                                }
-                            });
-                        tx.send(Some(res.clone())).unwrap();
-                        res
-                    })
-                    .boxed(),
-                )
-            }
-            // there is already a task driving forward the construction of this store, wait for it
-            // to notify us via the provided channel
-            InstantiationState::InProgress(mut recv) => {
-                (InstantiationState::InProgress(recv.clone()), {
-                    (async move {
-                        loop {
-                            if let Some(v) =
-                                recv.borrow_and_update().as_ref().map(|res| res.clone())
-                            {
-                                break v;
-                            }
-                            recv.changed().await.unwrap();
-                        }
-                    })
-                    .boxed()
-                })
-            }
-        };
-        *entry = Box::new(new_val);
-        ret
+        self.context().build_internal(entrypoint.to_string()).await
     }
 
     pub fn context(&self) -> CompositionContext {
         CompositionContext {
+            registry: self.registry,
             stack: vec![],
             composition: Some(self),
         }
@@ -496,7 +538,7 @@ mod test {
 
         let blob_services_configs =
             with_registry(&REG, || serde_json::from_value(blob_services_configs_json)).unwrap();
-        let mut blob_service_composition = Composition::default();
+        let mut blob_service_composition = Composition::new(&REG);
         blob_service_composition.extend_with_configs::<dyn BlobService>(blob_services_configs);
         let (blob_service1, blob_service2) = tokio::join!(
             blob_service_composition.build::<dyn BlobService>("default"),
@@ -526,7 +568,7 @@ mod test {
 
         let blob_services_configs =
             with_registry(&REG, || serde_json::from_value(blob_services_configs_json)).unwrap();
-        let mut blob_service_composition = Composition::default();
+        let mut blob_service_composition = Composition::new(&REG);
         blob_service_composition.extend_with_configs::<dyn BlobService>(blob_services_configs);
         match blob_service_composition
             .build::<dyn BlobService>("default")
diff --git a/tvix/castore/src/directoryservice/from_addr.rs b/tvix/castore/src/directoryservice/from_addr.rs
index 87a717b3fc2e..244d78a17bf0 100644
--- a/tvix/castore/src/directoryservice/from_addr.rs
+++ b/tvix/castore/src/directoryservice/from_addr.rs
@@ -37,7 +37,7 @@ pub async fn from_addr(
     })?
     .0;
     let directory_service = directory_service_config
-        .build("anonymous", &CompositionContext::blank())
+        .build("anonymous", &CompositionContext::blank(&REG))
         .await?;
 
     Ok(directory_service)
@@ -88,6 +88,16 @@ mod tests {
     #[case::grpc_valid_https_host_without_port("grpc+https://localhost", true)]
     /// Correct scheme to connect to localhost over http, but with additional path, which is invalid.
     #[case::grpc_invalid_host_and_path("grpc+http://localhost/some-path", false)]
+    /// A valid example for store composition using anonymous urls
+    #[cfg_attr(
+        feature = "xp-store-composition",
+        case::anonymous_url_composition("cache://?near=memory://&far=memory://", true)
+    )]
+    /// Store composition with anonymous urls should fail if the feature is disabled
+    #[cfg_attr(
+        not(feature = "xp-store-composition"),
+        case::anonymous_url_composition("cache://?near=memory://&far=memory://", false)
+    )]
     /// A valid example for Bigtable
     #[cfg_attr(
         all(feature = "cloud", feature = "integration"),
diff --git a/tvix/store/Cargo.toml b/tvix/store/Cargo.toml
index 3dc816e4ba74..c19f34bbbcf0 100644
--- a/tvix/store/Cargo.toml
+++ b/tvix/store/Cargo.toml
@@ -73,7 +73,7 @@ otlp = ["tvix-tracing/otlp"]
 tonic-reflection = ["dep:tonic-reflection", "tvix-castore/tonic-reflection"]
 tracy = ["tvix-tracing/tracy"]
 virtiofs = ["tvix-castore/virtiofs"]
-xp-store-composition = ["toml"]
+xp-store-composition = ["toml", "tvix-castore/xp-store-composition"]
 # Whether to run the integration tests.
 # Requires the following packages in $PATH:
 # cbtemulator, google-cloud-bigtable-tool
diff --git a/tvix/store/src/pathinfoservice/from_addr.rs b/tvix/store/src/pathinfoservice/from_addr.rs
index b2e8b473934a..3dfb08c9e817 100644
--- a/tvix/store/src/pathinfoservice/from_addr.rs
+++ b/tvix/store/src/pathinfoservice/from_addr.rs
@@ -44,7 +44,10 @@ pub async fn from_addr(
     })?
     .0;
     let path_info_service = path_info_service_config
-        .build("anonymous", context.unwrap_or(&CompositionContext::blank()))
+        .build(
+            "anonymous",
+            context.unwrap_or(&CompositionContext::blank(&REG)),
+        )
         .await?;
 
     Ok(path_info_service)
@@ -53,7 +56,7 @@ pub async fn from_addr(
 #[cfg(test)]
 mod tests {
     use super::from_addr;
-    use crate::composition::{Composition, DeserializeWithRegistry, ServiceBuilder};
+    use crate::composition::{Composition, DeserializeWithRegistry, ServiceBuilder, REG};
     use lazy_static::lazy_static;
     use rstest::rstest;
     use tempfile::TempDir;
@@ -125,7 +128,7 @@ mod tests {
     )]
     #[tokio::test]
     async fn test_from_addr_tokio(#[case] uri_str: &str, #[case] exp_succeed: bool) {
-        let mut comp = Composition::default();
+        let mut comp = Composition::new(&REG);
         comp.extend(vec![(
             "default".into(),
             DeserializeWithRegistry(Box::new(MemoryBlobServiceConfig {})
diff --git a/tvix/store/src/utils.rs b/tvix/store/src/utils.rs
index 1385ece39f8a..86ec367b66ad 100644
--- a/tvix/store/src/utils.rs
+++ b/tvix/store/src/utils.rs
@@ -188,7 +188,7 @@ pub async fn construct_services_from_configs(
     ),
     Box<dyn std::error::Error + Send + Sync>,
 > {
-    let mut comp = Composition::default();
+    let mut comp = Composition::new(&REG);
 
     comp.extend(configs.blobservices);
     comp.extend(configs.directoryservices);