Diffstat (limited to 'tvix/castore/src')
 tvix/castore/src/blobservice/combinator.rs        |  29
 tvix/castore/src/blobservice/grpc.rs              |  22
 tvix/castore/src/blobservice/memory.rs            |  17
 tvix/castore/src/blobservice/mod.rs               |  18
 tvix/castore/src/blobservice/object_store.rs      |  36
 tvix/castore/src/composition.rs                   | 377
 tvix/castore/src/directoryservice/bigtable.rs     |  87
 tvix/castore/src/directoryservice/combinators.rs  |  29
 tvix/castore/src/directoryservice/grpc.rs         |  23
 tvix/castore/src/directoryservice/memory.rs       |  17
 tvix/castore/src/directoryservice/mod.rs          |  25
 tvix/castore/src/directoryservice/object_store.rs |  31
 tvix/castore/src/lib.rs                           |   1
 13 files changed, 666 insertions(+), 46 deletions(-)
diff --git a/tvix/castore/src/blobservice/combinator.rs b/tvix/castore/src/blobservice/combinator.rs
index 067eff96f488..0bce657e9176 100644
--- a/tvix/castore/src/blobservice/combinator.rs
+++ b/tvix/castore/src/blobservice/combinator.rs
@@ -1,8 +1,11 @@
+use std::sync::Arc;
+
 use futures::{StreamExt, TryStreamExt};
 use tokio_util::io::{ReaderStream, StreamReader};
 use tonic::async_trait;
 use tracing::{instrument, warn};
 
+use crate::composition::{CompositionContext, ServiceBuilder};
 use crate::B3Digest;
 
 use super::{naive_seeker::NaiveSeeker, BlobReader, BlobService, BlobWriter};
@@ -93,6 +96,32 @@ where
     }
 }
 
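+/// [ServiceBuilder] for [CombinedBlobService]. `local` and `remote` are the
+/// instance names of other blob services in the same composition, resolved via
+/// the [CompositionContext] when building.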
+#[derive(serde::Deserialize, Debug, Clone)]
+#[serde(deny_unknown_fields)]
+pub struct CombinedBlobServiceConfig {
+    local: String,
+    remote: String,
+}
+
+#[async_trait]
+impl ServiceBuilder for CombinedBlobServiceConfig {
+    type Output = dyn BlobService;
+    async fn build<'a>(
+        &'a self,
+        _instance_name: &str,
+        context: &CompositionContext<dyn BlobService>,
+    ) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync>> {
+        let (local, remote) = futures::join!(
+            context.resolve(self.local.clone()),
+            context.resolve(self.remote.clone())
+        );
+        Ok(Arc::new(CombinedBlobService {
+            local: local?,
+            remote: remote?,
+        }))
+    }
+}
+
 fn make_chunked_reader<BS>(
     // This must consume, as we can't retain references to blob_service,
     // as it'd add a lifetime to BlobReader in general, which will get
diff --git a/tvix/castore/src/blobservice/grpc.rs b/tvix/castore/src/blobservice/grpc.rs
index 85250da99d05..56a2ebf00038 100644
--- a/tvix/castore/src/blobservice/grpc.rs
+++ b/tvix/castore/src/blobservice/grpc.rs
@@ -1,4 +1,5 @@
 use super::{BlobReader, BlobService, BlobWriter, ChunkedReader};
+use crate::composition::{CompositionContext, ServiceBuilder};
 use crate::{
     proto::{self, stat_blob_response::ChunkMeta},
     B3Digest,
@@ -180,6 +181,27 @@ where
     }
 }
 
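+/// [ServiceBuilder] for [GRPCBlobService], connecting to a (remote) BlobService
+/// over gRPC at the given `url`.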
+#[derive(serde::Deserialize, Debug)]
+#[serde(deny_unknown_fields)]
+pub struct GRPCBlobServiceConfig {
+    url: String,
+}
+
+#[async_trait]
+impl ServiceBuilder for GRPCBlobServiceConfig {
+    type Output = dyn BlobService;
+    async fn build<'a>(
+        &'a self,
+        _instance_name: &str,
+        _context: &CompositionContext<dyn BlobService>,
+    ) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
+        let client = proto::blob_service_client::BlobServiceClient::new(
+            crate::tonic::channel_from_url(&self.url.parse()?).await?,
+        );
+        Ok(Arc::new(GRPCBlobService::from_client(client)))
+    }
+}
+
 pub struct GRPCBlobWriter<W: tokio::io::AsyncWrite> {
     /// The task containing the put request, and the inner writer, if we're still writing.
     task_and_writer: Option<(JoinHandle<Result<proto::PutBlobResponse, Status>>, W)>,
diff --git a/tvix/castore/src/blobservice/memory.rs b/tvix/castore/src/blobservice/memory.rs
index 873d06b461de..0205dcf7bd70 100644
--- a/tvix/castore/src/blobservice/memory.rs
+++ b/tvix/castore/src/blobservice/memory.rs
@@ -6,6 +6,7 @@ use tonic::async_trait;
 use tracing::instrument;
 
 use super::{BlobReader, BlobService, BlobWriter};
+use crate::composition::{CompositionContext, ServiceBuilder};
 use crate::B3Digest;
 
 #[derive(Clone, Default)]
@@ -37,6 +38,22 @@ impl BlobService for MemoryBlobService {
     }
 }
 
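+/// [ServiceBuilder] for [MemoryBlobService]. Takes no options.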
+#[derive(serde::Deserialize, Debug)]
+#[serde(deny_unknown_fields)]
+pub struct MemoryBlobServiceConfig {}
+
+#[async_trait]
+impl ServiceBuilder for MemoryBlobServiceConfig {
+    type Output = dyn BlobService;
+    async fn build<'a>(
+        &'a self,
+        _instance_name: &str,
+        _context: &CompositionContext<dyn BlobService>,
+    ) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
+        Ok(Arc::new(MemoryBlobService::default()))
+    }
+}
+
 pub struct MemoryBlobWriter {
     db: Arc<RwLock<HashMap<B3Digest, Vec<u8>>>>,
 
diff --git a/tvix/castore/src/blobservice/mod.rs b/tvix/castore/src/blobservice/mod.rs
index 50acd40bf769..83fb5b674bb2 100644
--- a/tvix/castore/src/blobservice/mod.rs
+++ b/tvix/castore/src/blobservice/mod.rs
@@ -1,6 +1,8 @@
 use std::io;
+
 use tonic::async_trait;
 
+use crate::composition::{Registry, ServiceBuilder};
 use crate::proto::stat_blob_response::ChunkMeta;
 use crate::B3Digest;
 
@@ -16,11 +18,11 @@ mod object_store;
 pub mod tests;
 
 pub use self::chunked_reader::ChunkedReader;
-pub use self::combinator::CombinedBlobService;
+pub use self::combinator::{CombinedBlobService, CombinedBlobServiceConfig};
 pub use self::from_addr::from_addr;
-pub use self::grpc::GRPCBlobService;
-pub use self::memory::MemoryBlobService;
-pub use self::object_store::ObjectStoreBlobService;
+pub use self::grpc::{GRPCBlobService, GRPCBlobServiceConfig};
+pub use self::memory::{MemoryBlobService, MemoryBlobServiceConfig};
+pub use self::object_store::{ObjectStoreBlobService, ObjectStoreBlobServiceConfig};
 
 /// The base trait all BlobService services need to implement.
 /// It provides functions to check whether a given blob exists,
@@ -101,3 +103,11 @@ impl BlobReader for io::Cursor<&'static [u8; 0]> {}
 impl BlobReader for io::Cursor<Vec<u8>> {}
 impl BlobReader for io::Cursor<bytes::Bytes> {}
 impl BlobReader for tokio::fs::File {}
+
+/// Registers the builtin BlobService implementations with the registry
+pub(crate) fn register_blob_services(reg: &mut Registry) {
+    reg.register::<Box<dyn ServiceBuilder<Output = dyn BlobService>>, super::blobservice::ObjectStoreBlobServiceConfig>("objectstore");
+    reg.register::<Box<dyn ServiceBuilder<Output = dyn BlobService>>, super::blobservice::MemoryBlobServiceConfig>("memory");
+    reg.register::<Box<dyn ServiceBuilder<Output = dyn BlobService>>, super::blobservice::CombinedBlobServiceConfig>("combined");
+    reg.register::<Box<dyn ServiceBuilder<Output = dyn BlobService>>, super::blobservice::GRPCBlobServiceConfig>("grpc");
+}
diff --git a/tvix/castore/src/blobservice/object_store.rs b/tvix/castore/src/blobservice/object_store.rs
index d2d0a288a557..bd8c2bd747fb 100644
--- a/tvix/castore/src/blobservice/object_store.rs
+++ b/tvix/castore/src/blobservice/object_store.rs
@@ -1,4 +1,5 @@
 use std::{
+    collections::HashMap,
     io::{self, Cursor},
     pin::pin,
     sync::Arc,
@@ -18,6 +19,7 @@ use tracing::{debug, instrument, trace, Level};
 use url::Url;
 
 use crate::{
+    composition::{CompositionContext, ServiceBuilder},
     proto::{stat_blob_response::ChunkMeta, StatBlobResponse},
     B3Digest, B3HashingReader,
 };
@@ -269,6 +271,40 @@ impl BlobService for ObjectStoreBlobService {
     }
 }
 
+fn default_avg_chunk_size() -> u32 {
+    256 * 1024
+}
+
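+/// [ServiceBuilder] for [ObjectStoreBlobService], storing blobs in the object
+/// store at `object_store_url`. `object_store_options` are passed on to
+/// `object_store::parse_url_opts`, and `avg_chunk_size` defaults to 256 KiB.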
+#[derive(serde::Deserialize)]
+#[serde(deny_unknown_fields)]
+pub struct ObjectStoreBlobServiceConfig {
+    object_store_url: String,
+    #[serde(default = "default_avg_chunk_size")]
+    avg_chunk_size: u32,
+    #[serde(default)]
+    object_store_options: HashMap<String, String>,
+}
+
+#[async_trait]
+impl ServiceBuilder for ObjectStoreBlobServiceConfig {
+    type Output = dyn BlobService;
+    async fn build<'a>(
+        &'a self,
+        _instance_name: &str,
+        _context: &CompositionContext<dyn BlobService>,
+    ) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
+        let (object_store, path) = object_store::parse_url_opts(
+            &self.object_store_url.parse()?,
+            &self.object_store_options,
+        )?;
+        Ok(Arc::new(ObjectStoreBlobService {
+            object_store: Arc::new(object_store),
+            base_path: path,
+            avg_chunk_size: self.avg_chunk_size,
+        }))
+    }
+}
+
 /// Reads blob contents from a AsyncRead, chunks and uploads them.
 /// On success, returns a [StatBlobResponse] pointing to the individual chunks.
 #[instrument(skip_all, fields(base_path=%base_path, min_chunk_size, avg_chunk_size, max_chunk_size), err)]
diff --git a/tvix/castore/src/composition.rs b/tvix/castore/src/composition.rs
new file mode 100644
index 000000000000..88cedcb832d0
--- /dev/null
+++ b/tvix/castore/src/composition.rs
@@ -0,0 +1,377 @@
+//! The composition module allows composing different kinds of services based on a set of service
+//! configurations _at runtime_.
+//!
+//! Store configs are deserialized with serde. The registry provides a stateful mapping from the
+//! `type` tag of an internally tagged enum on the serde side to a Config struct which is
+//! deserialized and then returned as a `Box<dyn ServiceBuilder<Output = dyn BlobService>>`
+//! (the same for DirectoryService instead of BlobService etc).
+//!
+//! ### Example 1: Implementing a new BlobService
+//!
+//! You need a Config struct which implements `DeserializeOwned` and
+//! `ServiceBuilder<Output = dyn BlobService>`.
+//! Then provide the user with a function to call with their registry,
+//! in which you register your new type:
+//!
+//! ```
+//! use std::sync::Arc;
+//!
+//! use tvix_castore::composition::*;
+//! use tvix_castore::blobservice::BlobService;
+//!
+//! #[derive(serde::Deserialize)]
+//! struct MyBlobServiceConfig {
+//! }
+//!
+//! #[tonic::async_trait]
+//! impl ServiceBuilder for MyBlobServiceConfig {
+//!     type Output = dyn BlobService;
+//!     async fn build(&self, _: &str, _: &CompositionContext<Self::Output>) -> Result<Arc<Self::Output>, Box<dyn std::error::Error + Send + Sync + 'static>> {
+//!         todo!()
+//!     }
+//! }
+//!
+//! pub fn add_my_service(reg: &mut Registry) {
+//!     reg.register::<Box<dyn ServiceBuilder<Output = dyn BlobService>>, MyBlobServiceConfig>("myblobservicetype");
+//! }
+//! ```
+//!
+//! Now, when a user deserializes a store config with the type tag "myblobservicetype" into a
+//! `Box<dyn ServiceBuilder<Output = dyn BlobService>>`, it will be deserialized via `MyBlobServiceConfig`.
+//!
+//! ### Example 2: Composing stores to get one store
+//!
+//! ```
+//! use tvix_castore::composition::*;
+//! use tvix_castore::blobservice::BlobService;
+//!
+//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
+//! # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async move {
+//! let blob_services_configs_json = serde_json::json!({
+//!   "blobstore1": {
+//!     "type": "memory"
+//!   },
+//!   "blobstore2": {
+//!     "type": "memory"
+//!   },
+//!   "default": {
+//!     "type": "combined",
+//!     "local": "blobstore1",
+//!     "remote": "blobstore2"
+//!   }
+//! });
+//!
+//! let blob_services_configs = with_registry(&REG, || serde_json::from_value(blob_services_configs_json))?;
+//! let blob_service_composition = Composition::<dyn BlobService>::from_configs(blob_services_configs);
+//! let blob_service = blob_service_composition.build("default").await?;
+//! # Ok(())
+//! # })
+//! # }
+//! ```
+//!
+//! ### Example 3: Creating another registry extending the default registry with third-party types
+//!
+//! ```
+//! # pub fn add_my_service(reg: &mut tvix_castore::composition::Registry) {}
+//! let mut my_registry = tvix_castore::composition::Registry::default();
+//! tvix_castore::composition::add_default_services(&mut my_registry);
+//! add_my_service(&mut my_registry);
+//! ```
+//!
+//! Continue as in Example 2, using `my_registry` instead of `REG`.
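+//!
+//! A minimal sketch of that continuation (note that `with_registry` expects a `&'static Registry`,
+//! so the registry is leaked here to obtain that lifetime):
+//!
+//! ```
+//! # pub fn add_my_service(reg: &mut tvix_castore::composition::Registry) {}
+//! use tvix_castore::composition::*;
+//! use tvix_castore::blobservice::BlobService;
+//!
+//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
+//! # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async move {
+//! let mut my_registry = Registry::default();
+//! add_default_services(&mut my_registry);
+//! add_my_service(&mut my_registry);
+//! // `with_registry` wants a `&'static Registry`; leaking is the simplest way to get one here.
+//! let my_registry: &'static Registry = Box::leak(Box::new(my_registry));
+//!
+//! let configs_json = serde_json::json!({
+//!   "default": { "type": "memory" }
+//! });
+//!
+//! let configs = with_registry(my_registry, || serde_json::from_value(configs_json))?;
+//! let composition = Composition::<dyn BlobService>::from_configs(configs);
+//! let blob_service = composition.build("default").await?;
+//! # Ok(())
+//! # })
+//! # }
+//! ```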
+
+use erased_serde::deserialize;
+use futures::future::BoxFuture;
+use futures::FutureExt;
+use lazy_static::lazy_static;
+use serde::de::DeserializeOwned;
+use serde_tagged::de::{BoxFnSeed, SeedFactory};
+use serde_tagged::util::TagString;
+use std::any::{Any, TypeId};
+use std::cell::Cell;
+use std::collections::BTreeMap;
+use std::collections::HashMap;
+use std::marker::PhantomData;
+use std::sync::Arc;
+use tonic::async_trait;
+
+/// Resolves tag names to the corresponding Config type.
+// Registry implementation details:
+// This is really ugly. Ideally we would store this as a generic static field:
+//
+// ```
+// struct Registry<T>(BTreeMap<&'static str, BoxFnSeed<T>>);
+// static REG<T>: Registry<T>;
+// ```
+//
+// so that one version of the static is generated for each type the registry is accessed for.
+// However, this is not possible: static items cannot be generic, and a static declared inside a
+// generic function is not parameterized by the function's type parameters:
+// https://doc.rust-lang.org/reference/items/static-items.html#statics--generics
+//
+// So instead, we make this lookup at runtime by putting the TypeId into the key.
+// But now we can no longer store the `BoxFnSeed<T>` because we are lacking the generic parameter
+// T, so instead store it as `Box<dyn Any>` and downcast to `&BoxFnSeed<T>` when performing the
+// lookup.
+// I said it was ugly...
+#[derive(Default)]
+pub struct Registry(BTreeMap<(TypeId, &'static str), Box<dyn Any + Sync>>);
+
+struct RegistryWithFakeType<'r, T>(&'r Registry, PhantomData<T>);
+
+impl<'r, 'de: 'r, T: 'static> SeedFactory<'de, TagString<'de>> for RegistryWithFakeType<'r, T> {
+    type Value = DeserializeWithRegistry<T>;
+    type Seed = &'r BoxFnSeed<Self::Value>;
+
+    // Required method
+    fn seed<E>(self, tag: TagString<'de>) -> Result<Self::Seed, E>
+    where
+        E: serde::de::Error,
+    {
+        // using find() and not get() because of https://github.com/rust-lang/rust/issues/80389
+        let seed: &Box<dyn Any + Sync> = self
+            .0
+             .0
+            .iter()
+            .find(|(k, _)| *k == &(TypeId::of::<T>(), tag.as_ref()))
+            .ok_or_else(|| serde::de::Error::custom("Unknown tag"))?
+            .1;
+
+        Ok(<dyn Any>::downcast_ref(&**seed).unwrap())
+    }
+}
+
+/// Wrapper type which implements Deserialize using the registry
+///
+/// Wrap your type in this in order to deserialize it using a registry, e.g.
+/// `DeserializeWithRegistry<Box<dyn MyTrait>>`; the types registered for `Box<dyn MyTrait>`
+/// will then be used.
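+///
+/// For example, a sketch using the builtin "memory" tag from the default registry `REG`:
+///
+/// ```
+/// use tvix_castore::blobservice::BlobService;
+/// use tvix_castore::composition::*;
+///
+/// let _config: DeserializeWithRegistry<Box<dyn ServiceBuilder<Output = dyn BlobService>>> =
+///     with_registry(&REG, || serde_json::from_str(r#"{"type": "memory"}"#)).unwrap();
+/// ```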
+pub struct DeserializeWithRegistry<T>(T);
+
+impl Registry {
+    /// Registers a mapping from type tag to a concrete type into the registry.
+    ///
+    /// The type parameters are very important:
+    /// After calling `register::<Box<dyn FooTrait>, FooStruct>("footype")`, when a user
+    /// deserializes an input with the type tag "footype" into a
+    /// `Box<dyn FooTrait>`, it will first call the Deserialize impl of `FooStruct` and
+    /// then convert it into a `Box<dyn FooTrait>` using `From::from`.
+    pub fn register<T: 'static, C: DeserializeOwned + Into<T>>(&mut self, type_name: &'static str) {
+        let seed = BoxFnSeed::new(|x| {
+            deserialize::<C>(x)
+                .map(Into::into)
+                .map(DeserializeWithRegistry)
+        });
+        self.0
+            .insert((TypeId::of::<T>(), type_name), Box::new(seed));
+    }
+}
+
+impl<'de, T: 'static> serde::Deserialize<'de> for DeserializeWithRegistry<T> {
+    fn deserialize<D>(de: D) -> std::result::Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        serde_tagged::de::internal::deserialize(
+            de,
+            "type",
+            RegistryWithFakeType(ACTIVE_REG.get().unwrap(), PhantomData::<T>),
+        )
+    }
+}
+
+thread_local! {
+    /// The active Registry is global state, because there is no convenient and universal way to pass state
+    /// into the functions usually used for deserialization, e.g. `serde_json::from_str`, `toml::from_str`,
+    /// `serde_qs::from_str`.
+    static ACTIVE_REG: Cell<Option<&'static Registry>> = panic!("reg was accessed before initialization");
+}
+
+/// Run the provided closure with a registry context.
+/// Any serde deserialize calls within the closure will use the registry to resolve tag names to
+/// the corresponding Config type.
+pub fn with_registry<R>(reg: &'static Registry, f: impl FnOnce() -> R) -> R {
+    ACTIVE_REG.set(Some(reg));
+    let result = f();
+    ACTIVE_REG.set(None);
+    result
+}
+
+lazy_static! {
+    /// The provided registry of tvix_castore, with all builtin BlobService/DirectoryService implementations
+    pub static ref REG: Registry = {
+        let mut reg = Registry(Default::default());
+        add_default_services(&mut reg);
+        reg
+    };
+}
+
+// ---------- End of generic registry code --------- //
+
+/// Register the builtin services of tvix_castore with the given registry.
+/// This is useful for creating your own registry with the builtin types _and_
+/// extra third party types.
+pub fn add_default_services(reg: &mut Registry) {
+    crate::blobservice::register_blob_services(reg);
+    crate::directoryservice::register_directory_services(reg);
+}
+
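+/// Passed to [ServiceBuilder::build], allowing a store config to resolve references
+/// to other named instances in the same [Composition] (and to detect reference cycles).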
+pub struct CompositionContext<'a, T: ?Sized> {
+    stack: Vec<String>,
+    composition: &'a Composition<T>,
+}
+
+impl<'a, T: ?Sized + Send + Sync + 'static> CompositionContext<'a, T> {
+    pub async fn resolve(
+        &self,
+        entrypoint: String,
+    ) -> Result<Arc<T>, Box<dyn std::error::Error + Send + Sync + 'static>> {
+        // disallow recursion
+        if self.stack.contains(&entrypoint) {
+            return Err(CompositionError::Recursion(self.stack.clone()).into());
+        }
+        Ok(self
+            .composition
+            .build_internal(self.stack.clone(), entrypoint)
+            .await?)
+    }
+}
+
+#[async_trait]
+/// This is the trait usually implemented on a per-store-type Config struct and
+/// used to instantiate it.
+pub trait ServiceBuilder: Send + Sync {
+    type Output: ?Sized;
+    async fn build(
+        &self,
+        instance_name: &str,
+        context: &CompositionContext<Self::Output>,
+    ) -> Result<Arc<Self::Output>, Box<dyn std::error::Error + Send + Sync + 'static>>;
+}
+
+impl<T: ?Sized, S: ServiceBuilder<Output = T> + 'static> From<S>
+    for Box<dyn ServiceBuilder<Output = T>>
+{
+    fn from(t: S) -> Self {
+        Box::new(t)
+    }
+}
+
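+/// Tracks how far the instantiation of a single named store has progressed:
+/// not yet started (`Config`), currently being built by some task (`InProgress`),
+/// or finished with either a service or an error (`Done`).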
+enum InstantiationState<T: ?Sized> {
+    Config(Box<dyn ServiceBuilder<Output = T>>),
+    InProgress(tokio::sync::watch::Receiver<Option<Result<Arc<T>, CompositionError>>>),
+    Done(Result<Arc<T>, CompositionError>),
+}
+
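+/// A set of named service configurations of the same kind (e.g. all `dyn BlobService`).
+/// Each configured store is built lazily and at most once when [Composition::build] is
+/// called with its instance name; concurrent and later requests share the result.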
+pub struct Composition<T: ?Sized> {
+    stores: std::sync::Mutex<HashMap<String, InstantiationState<T>>>,
+}
+
+#[derive(thiserror::Error, Clone, Debug)]
+pub enum CompositionError {
+    #[error("store not found: {0}")]
+    NotFound(String),
+    #[error("recursion not allowed {0:?}")]
+    Recursion(Vec<String>),
+    #[error("store construction panicked {0}")]
+    Poisoned(String),
+    #[error("instantiation of service {0} failed: {1}")]
+    Failed(String, Arc<dyn std::error::Error + Send + Sync>),
+}
+
+impl<T: ?Sized + Send + Sync + 'static> Composition<T> {
+    pub fn from_configs(
+        // Keep the concrete `HashMap` type here, since it allows type inference of the
+        // value type in the preceding deserialization step (e.g. `serde_json::from_value`).
+        configs: HashMap<String, DeserializeWithRegistry<Box<dyn ServiceBuilder<Output = T>>>>,
+    ) -> Self {
+        Self::from_configs_iter(configs)
+    }
+
+    pub fn from_configs_iter(
+        configs: impl IntoIterator<
+            Item = (
+                String,
+                DeserializeWithRegistry<Box<dyn ServiceBuilder<Output = T>>>,
+            ),
+        >,
+    ) -> Self {
+        Composition {
+            stores: std::sync::Mutex::new(
+                configs
+                    .into_iter()
+                    .map(|(k, v)| (k, InstantiationState::Config(v.0)))
+                    .collect(),
+            ),
+        }
+    }
+
+    pub async fn build(&self, entrypoint: &str) -> Result<Arc<T>, CompositionError> {
+        self.build_internal(vec![], entrypoint.to_string()).await
+    }
+
+    fn build_internal(
+        &self,
+        stack: Vec<String>,
+        entrypoint: String,
+    ) -> BoxFuture<'_, Result<Arc<T>, CompositionError>> {
+        let mut stores = self.stores.lock().unwrap();
+        let entry = match stores.get_mut(&entrypoint) {
+            Some(v) => v,
+            None => return Box::pin(futures::future::err(CompositionError::NotFound(entrypoint))),
+        };
+        // for lifetime reasons, we put a placeholder value in the hashmap while we figure out what
+        // the new value should be. the Mutex stays locked the entire time, so nobody will ever see
+        // this temporary value.
+        let prev_val = std::mem::replace(
+            entry,
+            InstantiationState::Done(Err(CompositionError::Poisoned(entrypoint.clone()))),
+        );
+        let (new_val, ret) = match prev_val {
+            InstantiationState::Done(service) => (
+                InstantiationState::Done(service.clone()),
+                futures::future::ready(service).boxed(),
+            ),
+            // the construction of the store has not started yet.
+            InstantiationState::Config(config) => {
+                let (tx, rx) = tokio::sync::watch::channel(None);
+                (
+                    InstantiationState::InProgress(rx),
+                    (async move {
+                        let mut new_context = CompositionContext {
+                            stack: stack.clone(),
+                            composition: self,
+                        };
+                        new_context.stack.push(entrypoint.clone());
+                        let res = config
+                            .build(&entrypoint, &new_context)
+                            .await
+                            .map_err(|e| CompositionError::Failed(entrypoint, e.into()));
+                        tx.send(Some(res.clone())).unwrap();
+                        res
+                    })
+                    .boxed(),
+                )
+            }
+            // there is already a task driving forward the construction of this store, wait for it
+            // to notify us via the provided channel
+            InstantiationState::InProgress(mut recv) => {
+                (InstantiationState::InProgress(recv.clone()), {
+                    (async move {
+                        loop {
+                            if let Some(v) =
+                                recv.borrow_and_update().as_ref().map(|res| res.clone())
+                            {
+                                break v;
+                            }
+                            recv.changed().await.unwrap();
+                        }
+                    })
+                    .boxed()
+                })
+            }
+        };
+        *entry = new_val;
+        ret
+    }
+}
diff --git a/tvix/castore/src/directoryservice/bigtable.rs b/tvix/castore/src/directoryservice/bigtable.rs
index 1194c6ddc999..69edc05d787d 100644
--- a/tvix/castore/src/directoryservice/bigtable.rs
+++ b/tvix/castore/src/directoryservice/bigtable.rs
@@ -5,10 +5,12 @@ use futures::stream::BoxStream;
 use prost::Message;
 use serde::{Deserialize, Serialize};
 use serde_with::{serde_as, DurationSeconds};
+use std::sync::Arc;
 use tonic::async_trait;
 use tracing::{instrument, trace, warn};
 
 use super::{utils::traverse_directory, DirectoryPutter, DirectoryService, SimplePutter};
+use crate::composition::{CompositionContext, ServiceBuilder};
 use crate::{proto, B3Digest, Error};
 
 /// There should not be more than 10 MiB in a single cell.
@@ -43,41 +45,6 @@ pub struct BigtableDirectoryService {
     emulator: std::sync::Arc<(tempfile::TempDir, async_process::Child)>,
 }
 
-/// Represents configuration of [BigtableDirectoryService].
-/// This currently conflates both connect parameters and data model/client
-/// behaviour parameters.
-#[serde_as]
-#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
-pub struct BigtableParameters {
-    project_id: String,
-    instance_name: String,
-    #[serde(default)]
-    is_read_only: bool,
-    #[serde(default = "default_channel_size")]
-    channel_size: usize,
-
-    #[serde_as(as = "Option<DurationSeconds<String>>")]
-    #[serde(default = "default_timeout")]
-    timeout: Option<std::time::Duration>,
-    table_name: String,
-    family_name: String,
-
-    #[serde(default = "default_app_profile_id")]
-    app_profile_id: String,
-}
-
-fn default_app_profile_id() -> String {
-    "default".to_owned()
-}
-
-fn default_channel_size() -> usize {
-    4
-}
-
-fn default_timeout() -> Option<std::time::Duration> {
-    Some(std::time::Duration::from_secs(4))
-}
-
 impl BigtableDirectoryService {
     #[cfg(not(test))]
     pub async fn connect(params: BigtableParameters) -> Result<Self, bigtable::Error> {
@@ -355,3 +322,53 @@ impl DirectoryService for BigtableDirectoryService {
         Box::new(SimplePutter::new(self.clone()))
     }
 }
+
+/// Represents configuration of [BigtableDirectoryService].
+/// This currently conflates both connect parameters and data model/client
+/// behaviour parameters.
+#[serde_as]
+#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
+#[serde(deny_unknown_fields)]
+pub struct BigtableParameters {
+    project_id: String,
+    instance_name: String,
+    #[serde(default)]
+    is_read_only: bool,
+    #[serde(default = "default_channel_size")]
+    channel_size: usize,
+
+    #[serde_as(as = "Option<DurationSeconds<String>>")]
+    #[serde(default = "default_timeout")]
+    timeout: Option<std::time::Duration>,
+    table_name: String,
+    family_name: String,
+
+    #[serde(default = "default_app_profile_id")]
+    app_profile_id: String,
+}
+
+#[async_trait]
+impl ServiceBuilder for BigtableParameters {
+    type Output = dyn DirectoryService;
+    async fn build<'a>(
+        &'a self,
+        _instance_name: &str,
+        _context: &CompositionContext<dyn DirectoryService>,
+    ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync>> {
+        Ok(Arc::new(
+            BigtableDirectoryService::connect(self.clone()).await?,
+        ))
+    }
+}
+
+fn default_app_profile_id() -> String {
+    "default".to_owned()
+}
+
+fn default_channel_size() -> usize {
+    4
+}
+
+fn default_timeout() -> Option<std::time::Duration> {
+    Some(std::time::Duration::from_secs(4))
+}
diff --git a/tvix/castore/src/directoryservice/combinators.rs b/tvix/castore/src/directoryservice/combinators.rs
index d3f351d6b689..2364a313d5f4 100644
--- a/tvix/castore/src/directoryservice/combinators.rs
+++ b/tvix/castore/src/directoryservice/combinators.rs
@@ -1,3 +1,5 @@
+use std::sync::Arc;
+
 use futures::stream::BoxStream;
 use futures::StreamExt;
 use futures::TryFutureExt;
@@ -6,6 +8,7 @@ use tonic::async_trait;
 use tracing::{instrument, trace};
 
 use super::{DirectoryGraph, DirectoryService, RootToLeavesValidator, SimplePutter};
+use crate::composition::{CompositionContext, ServiceBuilder};
 use crate::directoryservice::DirectoryPutter;
 use crate::proto;
 use crate::B3Digest;
@@ -140,3 +143,29 @@ where
         Box::new(SimplePutter::new((*self).clone()))
     }
 }
+
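+/// [ServiceBuilder] for [Cache]. `near` and `far` are the instance names of other
+/// directory services in the same composition, resolved via the [CompositionContext]
+/// when building.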
+#[derive(serde::Deserialize, Debug)]
+#[serde(deny_unknown_fields)]
+pub struct CacheConfig {
+    near: String,
+    far: String,
+}
+
+#[async_trait]
+impl ServiceBuilder for CacheConfig {
+    type Output = dyn DirectoryService;
+    async fn build<'a>(
+        &'a self,
+        _instance_name: &str,
+        context: &CompositionContext<dyn DirectoryService>,
+    ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
+        let (near, far) = futures::join!(
+            context.resolve(self.near.clone()),
+            context.resolve(self.far.clone())
+        );
+        Ok(Arc::new(Cache {
+            near: near?,
+            far: far?,
+        }))
+    }
+}
diff --git a/tvix/castore/src/directoryservice/grpc.rs b/tvix/castore/src/directoryservice/grpc.rs
index ca9b0de07b14..e2ca3954c112 100644
--- a/tvix/castore/src/directoryservice/grpc.rs
+++ b/tvix/castore/src/directoryservice/grpc.rs
@@ -1,10 +1,12 @@
 use std::collections::HashSet;
 
 use super::{DirectoryPutter, DirectoryService};
+use crate::composition::{CompositionContext, ServiceBuilder};
 use crate::proto::{self, get_directory_request::ByWhat};
 use crate::{B3Digest, Error};
 use async_stream::try_stream;
 use futures::stream::BoxStream;
+use std::sync::Arc;
 use tokio::spawn;
 use tokio::sync::mpsc::UnboundedSender;
 use tokio::task::JoinHandle;
@@ -216,6 +218,27 @@ where
     }
 }
 
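+/// [ServiceBuilder] for [GRPCDirectoryService], connecting to a (remote)
+/// DirectoryService over gRPC at the given `url`.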
+#[derive(serde::Deserialize, Debug)]
+#[serde(deny_unknown_fields)]
+pub struct GRPCDirectoryServiceConfig {
+    url: String,
+}
+
+#[async_trait]
+impl ServiceBuilder for GRPCDirectoryServiceConfig {
+    type Output = dyn DirectoryService;
+    async fn build<'a>(
+        &'a self,
+        _instance_name: &str,
+        _context: &CompositionContext<dyn DirectoryService>,
+    ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
+        let client = proto::directory_service_client::DirectoryServiceClient::new(
+            crate::tonic::channel_from_url(&self.url.parse()?).await?,
+        );
+        Ok(Arc::new(GRPCDirectoryService::from_client(client)))
+    }
+}
+
 /// Allows uploading multiple Directory messages in the same gRPC stream.
 pub struct GRPCPutter {
     /// Data about the current request - a handle to the task, and the tx part
diff --git a/tvix/castore/src/directoryservice/memory.rs b/tvix/castore/src/directoryservice/memory.rs
index 3b2795c3968c..f12ef6e977d0 100644
--- a/tvix/castore/src/directoryservice/memory.rs
+++ b/tvix/castore/src/directoryservice/memory.rs
@@ -8,6 +8,7 @@ use tracing::{instrument, warn};
 
 use super::utils::traverse_directory;
 use super::{DirectoryPutter, DirectoryService, SimplePutter};
+use crate::composition::{CompositionContext, ServiceBuilder};
 
 #[derive(Clone, Default)]
 pub struct MemoryDirectoryService {
@@ -85,3 +86,19 @@ impl DirectoryService for MemoryDirectoryService {
         Box::new(SimplePutter::new(self.clone()))
     }
 }
+
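+/// [ServiceBuilder] for [MemoryDirectoryService]. Takes no options.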
+#[derive(serde::Deserialize, Debug)]
+#[serde(deny_unknown_fields)]
+pub struct MemoryDirectoryServiceConfig {}
+
+#[async_trait]
+impl ServiceBuilder for MemoryDirectoryServiceConfig {
+    type Output = dyn DirectoryService;
+    async fn build<'a>(
+        &'a self,
+        _instance_name: &str,
+        _context: &CompositionContext<dyn DirectoryService>,
+    ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
+        Ok(Arc::new(MemoryDirectoryService::default()))
+    }
+}
diff --git a/tvix/castore/src/directoryservice/mod.rs b/tvix/castore/src/directoryservice/mod.rs
index eff4a685fa6d..815502c81455 100644
--- a/tvix/castore/src/directoryservice/mod.rs
+++ b/tvix/castore/src/directoryservice/mod.rs
@@ -1,7 +1,8 @@
+use crate::composition::{Registry, ServiceBuilder};
 use crate::{proto, B3Digest, Error};
+
 use futures::stream::BoxStream;
 use tonic::async_trait;
-
 mod combinators;
 mod directory_graph;
 mod from_addr;
@@ -16,12 +17,12 @@ pub mod tests;
 mod traverse;
 mod utils;
 
-pub use self::combinators::Cache;
+pub use self::combinators::{Cache, CacheConfig};
 pub use self::directory_graph::DirectoryGraph;
 pub use self::from_addr::from_addr;
-pub use self::grpc::GRPCDirectoryService;
-pub use self::memory::MemoryDirectoryService;
-pub use self::object_store::ObjectStoreDirectoryService;
+pub use self::grpc::{GRPCDirectoryService, GRPCDirectoryServiceConfig};
+pub use self::memory::{MemoryDirectoryService, MemoryDirectoryServiceConfig};
+pub use self::object_store::{ObjectStoreDirectoryService, ObjectStoreDirectoryServiceConfig};
 pub use self::order_validator::{LeavesToRootValidator, OrderValidator, RootToLeavesValidator};
 pub use self::simple_putter::SimplePutter;
 pub use self::sled::SledDirectoryService;
@@ -32,7 +33,7 @@ pub use self::utils::traverse_directory;
 mod bigtable;
 
 #[cfg(feature = "cloud")]
-pub use self::bigtable::BigtableDirectoryService;
+pub use self::bigtable::{BigtableDirectoryService, BigtableParameters};
 
 /// The base trait all Directory services need to implement.
 /// This is a simple get and put of [crate::proto::Directory], returning their
@@ -126,3 +127,15 @@ pub trait DirectoryPutter: Send {
     /// be returned.
     async fn close(&mut self) -> Result<B3Digest, Error>;
 }
+
+/// Registers the builtin DirectoryService implementations with the registry
+pub(crate) fn register_directory_services(reg: &mut Registry) {
+    reg.register::<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>, super::directoryservice::ObjectStoreDirectoryServiceConfig>("objectstore");
+    reg.register::<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>, super::directoryservice::MemoryDirectoryServiceConfig>("memory");
+    reg.register::<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>, super::directoryservice::CacheConfig>("cache");
+    reg.register::<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>, super::directoryservice::GRPCDirectoryServiceConfig>("grpc");
+    #[cfg(feature = "cloud")]
+    {
+        reg.register::<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>, super::directoryservice::BigtableParameters>("bigtable");
+    }
+}
diff --git a/tvix/castore/src/directoryservice/object_store.rs b/tvix/castore/src/directoryservice/object_store.rs
index feaaaa39cd50..1977de18fbec 100644
--- a/tvix/castore/src/directoryservice/object_store.rs
+++ b/tvix/castore/src/directoryservice/object_store.rs
@@ -1,3 +1,4 @@
+use std::collections::HashMap;
 use std::sync::Arc;
 
 use data_encoding::HEXLOWER;
@@ -18,6 +19,7 @@ use url::Url;
 use super::{
     DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator, RootToLeavesValidator,
 };
+use crate::composition::{CompositionContext, ServiceBuilder};
 use crate::{proto, B3Digest, Error};
 
 /// Stores directory closures in an object store.
@@ -46,7 +48,7 @@ fn derive_dirs_path(base_path: &Path, digest: &B3Digest) -> Path {
 const MAX_FRAME_LENGTH: usize = 1 * 1024 * 1024 * 1000; // 1 MiB
                                                         //
 impl ObjectStoreDirectoryService {
-    /// Constructs a new [ObjectStoreBlobService] from a [Url] supported by
+    /// Constructs a new [ObjectStoreDirectoryService] from a [Url] supported by
     /// [object_store].
     /// Any path suffix becomes the base path of the object store.
     /// additional options, the same as in [object_store::parse_url_opts] can
@@ -169,6 +171,33 @@ impl DirectoryService for ObjectStoreDirectoryService {
     }
 }
 
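+/// [ServiceBuilder] for [ObjectStoreDirectoryService], storing directory closures in
+/// the object store at `object_store_url`. `object_store_options` are passed on to
+/// `object_store::parse_url_opts`.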
+#[derive(serde::Deserialize)]
+#[serde(deny_unknown_fields)]
+pub struct ObjectStoreDirectoryServiceConfig {
+    object_store_url: String,
+    #[serde(default)]
+    object_store_options: HashMap<String, String>,
+}
+
+#[async_trait]
+impl ServiceBuilder for ObjectStoreDirectoryServiceConfig {
+    type Output = dyn DirectoryService;
+    async fn build<'a>(
+        &'a self,
+        _instance_name: &str,
+        _context: &CompositionContext<dyn DirectoryService>,
+    ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
+        let (object_store, path) = object_store::parse_url_opts(
+            &self.object_store_url.parse()?,
+            &self.object_store_options,
+        )?;
+        Ok(Arc::new(ObjectStoreDirectoryService {
+            object_store: Arc::new(object_store),
+            base_path: path,
+        }))
+    }
+}
+
 struct ObjectStoreDirectoryPutter {
     object_store: Arc<dyn ObjectStore>,
     base_path: Path,
diff --git a/tvix/castore/src/lib.rs b/tvix/castore/src/lib.rs
index bdc533a8c5e6..4fca9801c97b 100644
--- a/tvix/castore/src/lib.rs
+++ b/tvix/castore/src/lib.rs
@@ -3,6 +3,7 @@ mod errors;
 mod hashing_reader;
 
 pub mod blobservice;
+pub mod composition;
 pub mod directoryservice;
 pub mod fixtures;