Diffstat (limited to 'tvix/store')
-rw-r--r--  tvix/store/Cargo.toml | 110
-rw-r--r--  tvix/store/build.rs | 12
-rw-r--r--  tvix/store/default.nix | 48
-rw-r--r--  tvix/store/docs/api.md | 288
-rw-r--r--  tvix/store/protos/default.nix | 21
-rw-r--r--  tvix/store/src/bin/tvix-store.rs | 381
-rw-r--r--  tvix/store/src/composition.rs | 22
-rw-r--r--  tvix/store/src/import.rs | 67
-rw-r--r--  tvix/store/src/lib.rs | 1
-rw-r--r--  tvix/store/src/nar/import.rs | 109
-rw-r--r--  tvix/store/src/nar/mod.rs | 18
-rw-r--r--  tvix/store/src/nar/renderer.rs | 84
-rw-r--r--  tvix/store/src/nar/seekable.rs | 422
-rw-r--r--  tvix/store/src/pathinfoservice/bigtable.rs | 128
-rw-r--r--  tvix/store/src/pathinfoservice/combinators.rs | 38
-rw-r--r--  tvix/store/src/pathinfoservice/from_addr.rs | 163
-rw-r--r--  tvix/store/src/pathinfoservice/fs/mod.rs | 35
-rw-r--r--  tvix/store/src/pathinfoservice/grpc.rs | 87
-rw-r--r--  tvix/store/src/pathinfoservice/lru.rs | 42
-rw-r--r--  tvix/store/src/pathinfoservice/memory.rs | 28
-rw-r--r--  tvix/store/src/pathinfoservice/mod.rs | 49
-rw-r--r--  tvix/store/src/pathinfoservice/nix_http.rs | 120
-rw-r--r--  tvix/store/src/pathinfoservice/redb.rs | 218
-rw-r--r--  tvix/store/src/pathinfoservice/signing_wrapper.rs | 204
-rw-r--r--  tvix/store/src/pathinfoservice/sled.rs | 77
-rw-r--r--  tvix/store/src/pathinfoservice/tests/mod.rs | 92
-rw-r--r--  tvix/store/src/pathinfoservice/tests/utils.rs | 39
-rw-r--r--  tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs | 29
-rw-r--r--  tvix/store/src/proto/mod.rs | 77
-rw-r--r--  tvix/store/src/proto/tests/pathinfo.rs | 54
-rw-r--r--  tvix/store/src/tests/fixtures.rs | 72
-rw-r--r--  tvix/store/src/tests/mod.rs | 1
-rw-r--r--  tvix/store/src/tests/nar_renderer.rs | 219
-rw-r--r--  tvix/store/src/tests/nar_renderer_seekable.rs | 111
-rw-r--r--  tvix/store/src/utils.rs | 234
35 files changed, 2365 insertions(+), 1335 deletions(-)
diff --git a/tvix/store/Cargo.toml b/tvix/store/Cargo.toml
index 4727f43f78ff..328e2ae2e80b 100644
--- a/tvix/store/Cargo.toml
+++ b/tvix/store/Cargo.toml
@@ -4,66 +4,64 @@ version = "0.1.0"
 edition = "2021"
 
 [dependencies]
-anyhow = "1.0.68"
-async-compression = { version = "0.4.9", features = ["tokio", "bzip2", "gzip", "xz", "zstd"]}
-async-stream = "0.3.5"
-blake3 = { version = "1.3.1", features = ["rayon", "std"] }
-bstr = "1.6.0"
-bytes = "1.4.0"
-clap = { version = "4.0", features = ["derive", "env"] }
-count-write = "0.1.0"
-data-encoding = "2.3.3"
-futures = "0.3.30"
-lazy_static = "1.4.0"
+anyhow = { workspace = true }
+async-compression = { workspace = true, features = ["tokio", "bzip2", "gzip", "xz", "zstd"] }
+async-stream = { workspace = true }
+blake3 = { workspace = true, features = ["rayon", "std"] }
+bstr = { workspace = true }
+bytes = { workspace = true }
+clap = { workspace = true, features = ["derive", "env"] }
+count-write = { workspace = true }
+data-encoding = { workspace = true }
+ed25519 = { workspace = true }
+ed25519-dalek = { workspace = true }
+futures = { workspace = true }
+lazy_static = { workspace = true }
 nix-compat = { path = "../nix-compat", features = ["async"] }
-pin-project-lite = "0.2.13"
-prost = "0.12.1"
-opentelemetry = { version = "0.22.0", optional = true}
-opentelemetry-otlp = { version = "0.15.0", optional = true }
-opentelemetry_sdk = { version = "0.22.1", features = ["rt-tokio"], optional = true}
-serde = { version = "1.0.197", features = [ "derive" ] }
-serde_json = "1.0"
-serde_with = "3.7.0"
-serde_qs = "0.12.0"
-sha2 = "0.10.6"
-sled = { version = "0.34.7" }
-thiserror = "1.0.38"
-tokio = { version = "1.32.0", features = ["fs", "macros", "net", "rt", "rt-multi-thread", "signal"] }
-tokio-listener = { version = "0.4.1", features = [ "tonic011" ] }
-tokio-stream = { version = "0.1.14", features = ["fs"] }
-tokio-util = { version = "0.7.9", features = ["io", "io-util", "compat"] }
-tonic = { version = "0.11.0", features = ["tls", "tls-roots"] }
-tower = "0.4.13"
-tracing = "0.1.37"
-tracing-opentelemetry = "0.23.0"
-tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
+pin-project-lite = { workspace = true }
+prost = { workspace = true }
+serde = { workspace = true, features = ["derive"] }
+serde_json = { workspace = true }
+serde_with = { workspace = true }
+serde_qs = { workspace = true }
+sha2 = { workspace = true }
+sled = { workspace = true }
+thiserror = { workspace = true }
+tokio = { workspace = true, features = ["fs", "macros", "net", "rt", "rt-multi-thread", "signal"] }
+tokio-listener = { workspace = true, features = ["clap", "multi-listener", "sd_listen", "tonic012"] }
+tokio-stream = { workspace = true, features = ["fs"] }
+tokio-util = { workspace = true, features = ["io", "io-util", "compat"] }
+tonic = { workspace = true, features = ["tls", "tls-roots"] }
+tower = { workspace = true }
+tower-http = { workspace = true, features = ["trace"] }
 tvix-castore = { path = "../castore" }
-url = "2.4.0"
-walkdir = "2.4.0"
-reqwest = { version = "0.11.22", features = ["rustls-tls-native-roots", "stream"], default-features = false }
-lru = "0.12.3"
-parking_lot = "0.12.2"
-
-[dependencies.tonic-reflection]
-optional = true
-version = "0.11.0"
-
-[dependencies.bigtable_rs]
-optional = true
-# https://github.com/liufuyang/bigtable_rs/pull/72
-git = "https://github.com/flokli/bigtable_rs"
-rev = "0af404741dfc40eb9fa99cf4d4140a09c5c20df7"
+url = { workspace = true }
+walkdir = { workspace = true }
+reqwest = { workspace = true, features = ["rustls-tls-native-roots", "stream"] }
+reqwest-middleware = { workspace = true }
+lru = { workspace = true }
+parking_lot = { workspace = true }
+tvix-tracing = { path = "../tracing", features = ["tonic", "reqwest"] }
+tracing = { workspace = true }
+tracing-indicatif = { workspace = true }
+hyper-util = { workspace = true }
+toml = { version = "0.8.19", optional = true }
+tonic-health = { workspace = true }
+redb = { workspace = true }
+mimalloc = { workspace = true }
+tonic-reflection = { workspace = true, optional = true }
+bigtable_rs = { workspace = true, optional = true }
 
 [build-dependencies]
-prost-build = "0.12.1"
-tonic-build = "0.11.0"
+prost-build = { workspace = true }
+tonic-build = { workspace = true }
 
 [dev-dependencies]
-async-process = "2.1.0"
-rstest = "0.19.0"
-rstest_reuse = "0.6.0"
-tempfile = "3.3.0"
-tokio-retry = "0.3.0"
+async-process = { workspace = true }
+rstest = { workspace = true }
+rstest_reuse = { workspace = true }
+tempfile = { workspace = true }
+tokio-retry = { workspace = true }
 
 [features]
 default = ["cloud", "fuse", "otlp", "tonic-reflection"]
@@ -72,9 +70,11 @@ cloud = [
   "tvix-castore/cloud"
 ]
 fuse = ["tvix-castore/fuse"]
-otlp = ["dep:opentelemetry", "dep:opentelemetry-otlp", "dep:opentelemetry_sdk"]
+otlp = ["tvix-tracing/otlp"]
 tonic-reflection = ["dep:tonic-reflection", "tvix-castore/tonic-reflection"]
+tracy = ["tvix-tracing/tracy"]
 virtiofs = ["tvix-castore/virtiofs"]
+xp-store-composition = ["toml"]
 # Whether to run the integration tests.
 # Requires the following packages in $PATH:
 # cbtemulator, google-cloud-bigtable-tool
diff --git a/tvix/store/build.rs b/tvix/store/build.rs
index 809fa29578b5..4e0371311220 100644
--- a/tvix/store/build.rs
+++ b/tvix/store/build.rs
@@ -12,24 +12,20 @@ fn main() -> Result<()> {
         builder = builder.file_descriptor_set_path(descriptor_path);
     };
 
-    // https://github.com/hyperium/tonic/issues/908
-    let mut config = prost_build::Config::new();
-    config.bytes(["."]);
-    config.extern_path(".tvix.castore.v1", "::tvix_castore::proto");
-
     builder
         .build_server(true)
         .build_client(true)
         .emit_rerun_if_changed(false)
-        .compile_with_config(
-            config,
+        .bytes(["."])
+        .extern_path(".tvix.castore.v1", "::tvix_castore::proto")
+        .compile(
             &[
                 "tvix/store/protos/pathinfo.proto",
                 "tvix/store/protos/rpc_pathinfo.proto",
             ],
             // If we are running `cargo build` manually, using `../..` works fine,
             // but in case we run inside a nix build, we need to instead point PROTO_ROOT
-            // to a sparseTree containing that structure.
+            // to a custom tree containing that structure.
             &[match std::env::var_os("PROTO_ROOT") {
                 Some(proto_root) => proto_root.to_str().unwrap().to_owned(),
                 None => "../..".to_string(),
diff --git a/tvix/store/default.nix b/tvix/store/default.nix
index ad47994f2446..863ddb6de23f 100644
--- a/tvix/store/default.nix
+++ b/tvix/store/default.nix
@@ -1,4 +1,4 @@
-{ depot, pkgs, ... }:
+{ depot, pkgs, lib, ... }:
 
 let
   mkImportCheck = p: expectedPath: {
@@ -22,31 +22,35 @@ let
   };
 in
 
-(depot.tvix.crates.workspaceMembers.tvix-store.build.override {
+(depot.tvix.crates.workspaceMembers.tvix-store.build.override (old: {
   runTests = true;
   testPreRun = ''
-    export SSL_CERT_FILE=${pkgs.cacert}/etc/ssl/certs/ca-bundle.crt
+    export SSL_CERT_FILE=/dev/null
   '';
-
-  # enable some optional features.
-  features = [ "default" "cloud" ]
-    # virtiofs feature currently fails to build on Darwin.
-    ++ pkgs.lib.optional pkgs.stdenv.isLinux "virtiofs";
-}).overrideAttrs (_: {
-  meta.ci.targets = [ "integration-tests" ];
-  meta.ci.extraSteps = {
-    import-docs = (mkImportCheck "tvix/store/docs" ./docs);
+  features = old.features
+    # virtiofs feature currently fails to build on Darwin
+    ++ lib.optional pkgs.stdenv.isLinux "virtiofs";
+})).overrideAttrs (old: rec {
+  meta.ci = {
+    targets = [ "integration-tests" ] ++ lib.filter (x: lib.hasPrefix "with-features" x || x == "no-features") (lib.attrNames passthru);
+    extraSteps.import-docs = (mkImportCheck "tvix/docs/src/store" ../docs/src/store);
   };
-  passthru.integration-tests = depot.tvix.crates.workspaceMembers.tvix-store.build.override {
-    runTests = true;
-    testPreRun = ''
-      export SSL_CERT_FILE=${pkgs.cacert}/etc/ssl/certs/ca-bundle.crt
-      export PATH="$PATH:${pkgs.lib.makeBinPath [pkgs.cbtemulator pkgs.google-cloud-bigtable-tool]}"
+  passthru = old.passthru // (depot.tvix.utils.mkFeaturePowerset {
+    inherit (old) crateName;
+    features = ([ "cloud" "fuse" "otlp" "tonic-reflection" "xp-store-composition" ]
+      # virtiofs feature currently fails to build on Darwin
+      ++ lib.optional pkgs.stdenv.isLinux "virtiofs");
+    override.testPreRun = ''
+      export SSL_CERT_FILE=/dev/null
     '';
-
-    # enable some optional features.
-    features = [ "default" "cloud" "integration" ]
-      # virtiofs feature currently fails to build on Darwin.
-      ++ pkgs.lib.optional pkgs.stdenv.isLinux "virtiofs";
+  }) // {
+    integration-tests = depot.tvix.crates.workspaceMembers.${old.crateName}.build.override (old: {
+      runTests = true;
+      testPreRun = ''
+        export SSL_CERT_FILE=/dev/null
+        export PATH="$PATH:${pkgs.lib.makeBinPath [ pkgs.cbtemulator pkgs.google-cloud-bigtable-tool ]}"
+      '';
+      features = old.features ++ [ "integration" ];
+    });
   };
 })
diff --git a/tvix/store/docs/api.md b/tvix/store/docs/api.md
deleted file mode 100644
index 01e72671a743..000000000000
--- a/tvix/store/docs/api.md
+++ /dev/null
@@ -1,288 +0,0 @@
-tvix-[ca]store API
-==============
-
-This document outlines the design of the API exposed by tvix-castore and tvix-
-store, as well as other implementations of this store protocol.
-
-This document is meant to be read side-by-side with
-[castore.md](../../tvix-castore/docs/castore.md) which describes the data model
-in more detail.
-
-The store API has four main consumers:
-
-1. The evaluator (or more correctly, the CLI/coordinator, in the Tvix
-   case) communicates with the store to:
-
-   * Upload files and directories (e.g. from `builtins.path`, or `src = ./path`
-     Nix expressions).
-   * Read files from the store where necessary (e.g. when `nixpkgs` is
-     located in the store, or for IFD).
-
-2. The builder communicates with the store to:
-
-   * Upload files and directories after a build, to persist build artifacts in
-     the store.
-
-3. Tvix clients (such as users that have Tvix installed, or, depending
-   on perspective, builder environments) expect the store to
-   "materialise" on disk to provide a directory layout with store
-   paths.
-
-4. Stores may communicate with other stores, to substitute already built store
-   paths, i.e. a store acts as a binary cache for other stores.
-
-The store API attempts to reuse parts of its API between these four
-consumers by making similarities explicit in the protocol. This leads
-to a protocol that is slightly more complex than a simple "file
-upload/download" system, but significantly more efficient, both in terms
-of deduplication opportunities and granularity.
-
-## The Store model
-
-Contents inside a tvix-store can be grouped into three different message types:
-
- * Blobs
- * Directories
- * PathInfo (see further down)
-
-(check `castore.md` for more detailed field descriptions)
-
-### Blobs
-A blob object contains the literal file contents of regular (or executable)
-files.
-
-### Directory
-A directory object describes the direct children of a directory.
-
-It contains:
- - name of child (regular or executable) files, and their [blake3][blake3] hash.
- - name of child symlinks, and their target (as string)
- - name of child directories, and their [blake3][blake3] hash (forming a Merkle DAG)
-
-### Content-addressed Store Model
-For example, let's consider a directory layout like this, with some
-imaginary hashes of file contents:
-
-```
-.
-├── file-1.txt        hash: 5891b5b522d5df086d0ff0b110fb
-└── nested
-    └── file-2.txt    hash: abc6fd595fc079d3114d4b71a4d8
-```
-
-A hash for the *directory* `nested` can be computed by constructing the
-`Directory` object:
-
-```json
-{
-  "directories": [],
-  "files": [{
-    "name": "file-2.txt",
-    "digest": "abc6fd595fc079d3114d4b71a4d8",
-    "size": 123,
-  }],
-  "symlink": [],
-}
-```
-
-And then hashing a serialised form of that data structure. We use the blake3
-hash of the canonical protobuf representation. Let's assume the hash was
-`ff0029485729bcde993720749232`.
-
-To create the directory object one layer up, we now refer to our `nested`
-directory object in `directories`, and to `file-1.txt` in `files`:
-
-```json
-{
-  "directories": [{
-    "name": "nested",
-    "digest": "ff0029485729bcde993720749232",
-    "size": 1,
-  }],
-  "files": [{
-    "name": "file-1.txt",
-    "digest": "5891b5b522d5df086d0ff0b110fb",
-    "size": 124,
-  }]
-}
-```
-
-This Merkle DAG of Directory objects and flat store of blobs can be used to
-describe any file/directory/symlink inside a store path. Due to its content-
-addressed nature, it'll automatically deduplicate (re-)used (sub)directories,
-and allow substitution from any (untrusted) source.
-
-The only thing still missing is the metadata to map/"mount" from the
-content-addressed world to a physical path.
-
-### PathInfo
-As most paths in the Nix store are currently input-addressed [^input-addressed],
-and the `tvix-castore` data model does not intrinsically use NAR hashes, we
-need something mapping from an input-addressed "output path hash" (or a Nix-
-specific content-addressed path) to the contents in the `tvix-castore` world.
-
-That's what `PathInfo` provides. It embeds the root node (Directory, File or
-Symlink) at a given store path.
-
-The root node's `name` field is populated with the (base)name inside
-`/nix/store`, e.g. `xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx-pname-1.2.3`.
-
-The `PathInfo` message also stores references to other store paths, and some
-more NARInfo-specific metadata (signatures, narhash, narsize).
-
-
-## API overview
-
-There are three different services:
-
-### BlobService
-`BlobService` can be used to store and retrieve blobs of data, used to host
-regular file contents.
-
-It is content-addressed, using [blake3][blake3]
-as a hashing function.
-
-As blake3 is a tree hash, there's an opportunity to do
-[verified streaming][bao] of parts of the file,
-which doesn't need to trust any more information than the root hash itself.
-Future extensions of the `BlobService` protocol will enable this.
-
-### DirectoryService
-`DirectoryService` allows lookups (and uploads) of `Directory` messages, and
-whole reference graphs of them.
-
-
-### PathInfoService
-The PathInfo service provides lookups from a store path hash to a `PathInfo`
-message.
-
-## Example flows
-
-Below are some common use cases of tvix-store, and how the different
-services are used.
-
-### Upload files and directories
-This is needed for `builtins.path` or `src = ./path` in Nix expressions (A), as
-well as for uploading build artifacts to a store (B).
-
-The path specified needs to be (recursively, BFS-style) traversed.
- * All file contents need to be hashed with blake3, and submitted to the
-   *BlobService* if not already present.
-   A reference to them needs to be added to the parent Directory object that's
-   constructed.
- * All symlinks need to be added to the parent directory they reside in.
- * Whenever a Directory has been fully traversed, it needs to be uploaded to
-   the *DirectoryService* and a reference to it needs to be added to the parent
-   Directory object.
-
-Most of the hashing, directory traversal and uploading can happen in parallel,
-as long as Directory objects only refer to Directory objects and Blobs that
-have already been uploaded.
-
-When reaching the root, a `PathInfo` object needs to be constructed.
-
- * In the case of content-addressed paths (A), the name of the root node is
-   based on the NAR representation of the contents.
-   It might make sense to be able to offload the NAR calculation to the store,
-   which can cache it.
- * In the case of build artifacts (B), the output path is input-addressed and
-   known upfront.
-
-Contrary to Nix, this has the advantage of not having to upload a lot of things
-to the store that didn't change.
-
-### Reading files from the store from the evaluator
-This is the case when `nixpkgs` is located in the store, or for IFD in general.
-
-The store client asks the `PathInfoService` for the `PathInfo` of the output
-path in the request, and looks at the root node.
-
-If something other than the root of the store path is requested, like for
-example `maintainers/maintainer-list.nix`, the root_node Directory is inspected
-and potentially a chain of `Directory` objects requested from
-*DirectoryService* [^n+1query].
-
-When the desired file is reached, the *BlobService* can be used to read the
-contents of this file and return them to the evaluator.
-
-FUTUREWORK: define how importing from symlinks should/does work.
-
-Contrary to Nix, this has the advantage of not having to copy all of the
-contents of a store path to the evaluating machine, but really only fetching
-the files the evaluator currently cares about.
-
-### Materializing store paths on disk
-This is useful for people running a Tvix-only system, or running builds on a
-"Tvix remote builder" in its own mount namespace.
-
-In a system with Nix installed, we can't simply manually "extract" things to
-`/nix/store`, as Nix assumes it owns all writes to this location.
-In these use cases, we're probably better off exposing a tvix-store as a local
-binary cache (that's what `//tvix/nar-bridge-go` does).
-
-Assuming we are in an environment where we control `/nix/store` exclusively, a
-"realize to disk" would either "extract" things from the `tvix-store` to a
-filesystem, or expose a `FUSE`/`virtio-fs` filesystem.
-
-The latter is already implemented, and particularly interesting for (remote)
-build workloads, as build inputs can be realized on-demand, which saves copying
-around a lot of never-accessed files.
-
-In both cases, the API interactions are similar.
- * The *PathInfoService* is asked for the `PathInfo` of the requested store path.
- * If everything should be "extracted", the *DirectoryService* is asked for all
-   `Directory` objects in the closure, the file structure is created, all Blobs
-   are downloaded and placed in their corresponding location and all symlinks
-   are created accordingly.
- * If this is a FUSE filesystem, we can decide to only request a subset,
-   similar to the "Reading files from the store from the evaluator" use case,
-   even though it might make sense to keep all Directory objects around.
-   (See the caveat in "Trust model" though!)
-
-### Stores communicating with other stores
-The gRPC API exposed by tvix-store allows composing multiple stores and
-implementing caching strategies that store clients don't need to be aware
-of.
-
- * For example, a caching strategy could have a fast local tvix-store, that's
-   asked first and filled with data from a slower remote tvix-store.
-
- * Multiple stores could be asked for the same data, and whatever store returns
-   the right data first wins.
-
-
-## Trust model / Distribution
-As already described above, the only non-content-addressed service is the
-`PathInfo` service.
-
-This means all other messages (such as `Blob` and `Directory` messages) can be
-substituted from many different, untrusted sources/mirrors, which makes
-plugging in additional substitution strategies (like IPFS or local network
-neighbors) very simple. That's also why this lives in the `tvix-castore` crate.
-
-As for `PathInfo`, we don't specify an additional signature mechanism yet, but
-carry the NAR-based signatures from Nix along.
-
-This means that if we don't trust a remote `PathInfo` object, we currently
-need to "stream" the NAR representation to validate these signatures.
-
-However, the slow part is downloading NAR files, and considering we have
-more granularity available, we might only need to download some small blobs
-rather than a whole NAR file.
-
-A future signature mechanism that signs only (parts of) the `PathInfo`
-message, which itself points only to content-addressed data, will enable
-verified partial access into a store path, opening up opportunities for lazy
-filesystem access, etc.
-
-
-
-[blake3]: https://github.com/BLAKE3-team/BLAKE3
-[bao]: https://github.com/oconnor663/bao
-[^input-addressed]: Nix hashes the ATerm representation of a .drv, after doing
-                    some replacements on referred input derivations, to
-                    calculate output paths.
-[^n+1query]: This would expose an N+1 query problem. However, it's not a
-             problem in practice, as there's usually a "local" caching store
-             in the loop, and *DirectoryService* supports a recursive lookup
-             for all `Directory` children of a `Directory`.
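As a concrete companion to the hashing step described in the removed document: the digest of a `Directory` is the blake3 hash of its canonical protobuf encoding. A hedged sketch, assuming prost's `encode_to_vec` on an already-canonical message stands in for the canonical serialisation:

```rust
// Sketch of the content-addressing step: blake3 over the canonical
// protobuf encoding of a Directory message. encode_to_vec (prost) is
// assumed to produce the canonical byte form here.
use prost::Message;
use tvix_castore::proto::Directory;

fn directory_digest(directory: &Directory) -> blake3::Hash {
    blake3::hash(&directory.encode_to_vec())
}
```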
diff --git a/tvix/store/protos/default.nix b/tvix/store/protos/default.nix
index 56345d9338b2..005a2697e5e9 100644
--- a/tvix/store/protos/default.nix
+++ b/tvix/store/protos/default.nix
@@ -1,17 +1,12 @@
-{ depot, pkgs, ... }:
+{ depot, pkgs, lib, ... }:
 let
-  protos = depot.nix.sparseTree {
-    name = "store-protos";
-    root = depot.path.origSrc;
-    paths = [
-      # We need to include castore.proto (only), as it's referred.
-      ../../castore/protos/castore.proto
-      ./pathinfo.proto
-      ./rpc_pathinfo.proto
-      ../../../buf.yaml
-      ../../../buf.gen.yaml
-    ];
-  };
+  protos = lib.sourceByRegex depot.path.origSrc [
+    "buf.yaml"
+    "buf.gen.yaml"
+    # We need to include castore.proto (only), as it's referred.
+    "^tvix(/castore(/protos(/castore\.proto)?)?)?$"
+    "^tvix(/store(/protos(/.*\.proto)?)?)?$"
+  ];
 in
 depot.nix.readTree.drvTargets {
   inherit protos;
diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs
index 906d0ab5206a..b77a46dace7c 100644
--- a/tvix/store/src/bin/tvix-store.rs
+++ b/tvix/store/src/bin/tvix-store.rs
@@ -9,19 +9,16 @@ use serde::Deserialize;
 use serde::Serialize;
 use std::path::PathBuf;
 use std::sync::Arc;
-use tokio_listener::Listener;
-use tokio_listener::SystemOptions;
-use tokio_listener::UserOptions;
 use tonic::transport::Server;
-use tracing::info;
-use tracing::Level;
-use tracing_subscriber::EnvFilter;
-use tracing_subscriber::Layer;
-use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
+use tower::ServiceBuilder;
+use tower_http::trace::{DefaultMakeSpan, TraceLayer};
+use tracing::{info, info_span, instrument, Level, Span};
+use tracing_indicatif::span_ext::IndicatifSpanExt as _;
 use tvix_castore::import::fs::ingest_path;
 use tvix_store::nar::NarCalculationService;
 use tvix_store::proto::NarInfo;
 use tvix_store::proto::PathInfo;
+use tvix_store::utils::{ServiceUrls, ServiceUrlsGrpc};
 
 use tvix_castore::proto::blob_service_server::BlobServiceServer;
 use tvix_castore::proto::directory_service_server::DirectoryServiceServer;
@@ -37,15 +34,6 @@ use tvix_store::pathinfoservice::make_fs;
 #[cfg(feature = "fuse")]
 use tvix_castore::fs::fuse::FuseDaemon;
 
-#[cfg(feature = "otlp")]
-use opentelemetry::KeyValue;
-#[cfg(feature = "otlp")]
-use opentelemetry_sdk::{
-    resource::{ResourceDetector, SdkProvidedResourceDetector},
-    trace::BatchConfig,
-    Resource,
-};
-
 #[cfg(feature = "virtiofs")]
 use tvix_castore::fs::virtiofs::start_virtiofs_daemon;
 
@@ -54,6 +42,11 @@ use tvix_castore::proto::FILE_DESCRIPTOR_SET as CASTORE_FILE_DESCRIPTOR_SET;
 #[cfg(feature = "tonic-reflection")]
 use tvix_store::proto::FILE_DESCRIPTOR_SET;
 
+use mimalloc::MiMalloc;
+
+#[global_allocator]
+static GLOBAL: MiMalloc = MiMalloc;
+
 #[derive(Parser)]
 #[command(author, version, about, long_about = None)]
 struct Cli {
@@ -65,8 +58,8 @@ struct Cli {
     /// It's also possible to set `RUST_LOG` according to
     /// `tracing_subscriber::filter::EnvFilter`, which will always have
     /// priority.
-    #[arg(long)]
-    log_level: Option<Level>,
+    #[arg(long, default_value_t=Level::INFO)]
+    log_level: Level,
 
     #[command(subcommand)]
     command: Commands,
@@ -76,51 +69,26 @@ struct Cli {
 enum Commands {
     /// Runs the tvix-store daemon.
     Daemon {
-        #[arg(long, short = 'l')]
-        listen_address: Option<String>,
-
-        #[arg(
-            long,
-            env,
-            default_value = "objectstore+file:///var/lib/tvix-store/blobs.object_store"
-        )]
-        blob_service_addr: String,
-
-        #[arg(
-            long,
-            env,
-            default_value = "sled:///var/lib/tvix-store/directories.sled"
-        )]
-        directory_service_addr: String,
-
-        #[arg(long, env, default_value = "sled:///var/lib/tvix-store/pathinfo.sled")]
-        path_info_service_addr: String,
+        /// The address to listen on.
+        #[clap(flatten)]
+        listen_args: tokio_listener::ListenerAddressLFlag,
+
+        #[clap(flatten)]
+        service_addrs: ServiceUrls,
     },
     /// Imports a list of paths into the store and prints the store path for each of them.
     Import {
         #[clap(value_name = "PATH")]
         paths: Vec<PathBuf>,
 
-        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
-        blob_service_addr: String,
-
-        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
-        directory_service_addr: String,
-
-        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
-        path_info_service_addr: String,
+        #[clap(flatten)]
+        service_addrs: ServiceUrlsGrpc,
     },
 
     /// Copies a list of store paths on the system into tvix-store.
     Copy {
-        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
-        blob_service_addr: String,
-
-        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
-        directory_service_addr: String,
-
-        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
-        path_info_service_addr: String,
+        #[clap(flatten)]
+        service_addrs: ServiceUrlsGrpc,
 
         /// A path pointing to a JSON file produced by the Nix
         /// `__structuredAttrs` containing reference graph information provided
@@ -140,14 +108,8 @@ enum Commands {
         #[clap(value_name = "PATH")]
         dest: PathBuf,
 
-        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
-        blob_service_addr: String,
-
-        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
-        directory_service_addr: String,
-
-        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
-        path_info_service_addr: String,
+        #[clap(flatten)]
+        service_addrs: ServiceUrlsGrpc,
 
         /// Number of FUSE threads to spawn.
         #[arg(long, env, default_value_t = default_threads())]
@@ -176,14 +138,8 @@ enum Commands {
         #[clap(value_name = "PATH")]
         socket: PathBuf,
 
-        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
-        blob_service_addr: String,
-
-        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
-        directory_service_addr: String,
-
-        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
-        path_info_service_addr: String,
+        #[clap(flatten)]
+        service_addrs: ServiceUrlsGrpc,
 
         /// Whether to list elements at the root of the mount point.
         /// This is useful if your PathInfoService doesn't provide an
@@ -197,113 +153,41 @@ enum Commands {
     },
 }
 
-#[cfg(all(feature = "fuse", not(target_os = "macos")))]
+#[cfg(feature = "fuse")]
 fn default_threads() -> usize {
     std::thread::available_parallelism()
         .map(|threads| threads.into())
         .unwrap_or(4)
 }
-// On MacFUSE only a single channel will receive ENODEV when the file system is
-// unmounted and so all the other channels will block forever.
-// See https://github.com/osxfuse/osxfuse/issues/974
-#[cfg(all(feature = "fuse", target_os = "macos"))]
-fn default_threads() -> usize {
-    1
-}
-
-#[tokio::main]
-async fn main() -> Result<(), Box<dyn std::error::Error>> {
-    let cli = Cli::parse();
-
-    // configure log settings
-    let level = cli.log_level.unwrap_or(Level::INFO);
-
-    // Set up the tracing subscriber.
-    let subscriber = tracing_subscriber::registry().with(
-        tracing_subscriber::fmt::Layer::new()
-            .with_writer(std::io::stderr)
-            .compact()
-            .with_filter(
-                EnvFilter::builder()
-                    .with_default_directive(level.into())
-                    .from_env()
-                    .expect("invalid RUST_LOG"),
-            ),
-    );
-
-    // Add the otlp layer (when otlp is enabled, and it's not disabled in the CLI)
-    // then init the registry.
-    // If the feature is feature-flagged out, just init without adding the layer.
-    // It's necessary to do this separately, as every with() call chains the
-    // layer into the type of the registry.
-    #[cfg(feature = "otlp")]
-    {
-        let subscriber = if cli.otlp {
-            let tracer = opentelemetry_otlp::new_pipeline()
-                .tracing()
-                .with_exporter(opentelemetry_otlp::new_exporter().tonic())
-                .with_batch_config(BatchConfig::default())
-                .with_trace_config(opentelemetry_sdk::trace::config().with_resource({
-                    // use SdkProvidedResourceDetector.detect to detect resources,
-                    // but replace the default service name with our default.
-                    // https://github.com/open-telemetry/opentelemetry-rust/issues/1298
-                    let resources =
-                        SdkProvidedResourceDetector.detect(std::time::Duration::from_secs(0));
-                    // SdkProvidedResourceDetector currently always sets
-                    // `service.name`, but we don't like its default.
-                    if resources.get("service.name".into()).unwrap() == "unknown_service".into() {
-                        resources.merge(&Resource::new([KeyValue::new(
-                            "service.name",
-                            "tvix.store",
-                        )]))
-                    } else {
-                        resources
-                    }
-                }))
-                .install_batch(opentelemetry_sdk::runtime::Tokio)?;
-
-            // Create a tracing layer with the configured tracer
-            let layer = tracing_opentelemetry::layer().with_tracer(tracer);
-
-            subscriber.with(Some(layer))
-        } else {
-            subscriber.with(None)
-        };
-
-        subscriber.try_init()?;
-    }
-
-    // Init the registry (when otlp is not enabled)
-    #[cfg(not(feature = "otlp"))]
-    {
-        subscriber.try_init()?;
-    }
 
+#[instrument(skip_all)]
+async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
     match cli.command {
         Commands::Daemon {
-            listen_address,
-            blob_service_addr,
-            directory_service_addr,
-            path_info_service_addr,
+            listen_args,
+            service_addrs,
         } => {
             // initialize stores
             let (blob_service, directory_service, path_info_service, nar_calculation_service) =
-                tvix_store::utils::construct_services(
-                    blob_service_addr,
-                    directory_service_addr,
-                    path_info_service_addr,
-                )
-                .await?;
-
-            let listen_address = listen_address
-                .unwrap_or_else(|| "[::]:8000".to_string())
-                .parse()
-                .unwrap();
+                tvix_store::utils::construct_services(service_addrs).await?;
+
+            let mut server = Server::builder().layer(
+                ServiceBuilder::new()
+                    .layer(
+                        TraceLayer::new_for_grpc().make_span_with(
+                            DefaultMakeSpan::new()
+                                .level(Level::INFO)
+                                .include_headers(true),
+                        ),
+                    )
+                    .map_request(tvix_tracing::propagate::tonic::accept_trace),
+            );
 
-            let mut server = Server::builder();
+            let (_health_reporter, health_service) = tonic_health::server::health_reporter();
 
             #[allow(unused_mut)]
             let mut router = server
+                .add_service(health_service)
                 .add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::new(
                     blob_service,
                 )))
@@ -311,47 +195,52 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
                     GRPCDirectoryServiceWrapper::new(directory_service),
                 ))
                 .add_service(PathInfoServiceServer::new(GRPCPathInfoServiceWrapper::new(
-                    Arc::from(path_info_service),
+                    path_info_service,
                     nar_calculation_service,
                 )));
 
             #[cfg(feature = "tonic-reflection")]
             {
-                let reflection_svc = tonic_reflection::server::Builder::configure()
-                    .register_encoded_file_descriptor_set(CASTORE_FILE_DESCRIPTOR_SET)
-                    .register_encoded_file_descriptor_set(FILE_DESCRIPTOR_SET)
-                    .build()?;
-                router = router.add_service(reflection_svc);
+                router = router.add_service(
+                    tonic_reflection::server::Builder::configure()
+                        .register_encoded_file_descriptor_set(CASTORE_FILE_DESCRIPTOR_SET)
+                        .register_encoded_file_descriptor_set(FILE_DESCRIPTOR_SET)
+                        .build_v1alpha()?,
+                );
+                router = router.add_service(
+                    tonic_reflection::server::Builder::configure()
+                        .register_encoded_file_descriptor_set(CASTORE_FILE_DESCRIPTOR_SET)
+                        .register_encoded_file_descriptor_set(FILE_DESCRIPTOR_SET)
+                        .build_v1()?,
+                );
             }
 
-            info!(listen_address=%listen_address, "starting daemon");
+            let listen_address = &listen_args.listen_address.unwrap_or_else(|| {
+                "[::]:8000"
+                    .parse()
+                    .expect("invalid fallback listen address")
+            });
 
-            let listener = Listener::bind(
-                &listen_address,
-                &SystemOptions::default(),
-                &UserOptions::default(),
+            let listener = tokio_listener::Listener::bind(
+                listen_address,
+                &Default::default(),
+                &listen_args.listener_options,
             )
             .await?;
 
+            info!(listen_address=%listen_address, "starting daemon");
+
             router.serve_with_incoming(listener).await?;
         }
         Commands::Import {
             paths,
-            blob_service_addr,
-            directory_service_addr,
-            path_info_service_addr,
+            service_addrs,
         } => {
             // FUTUREWORK: allow flat for single files?
             let (blob_service, directory_service, path_info_service, nar_calculation_service) =
-                tvix_store::utils::construct_services(
-                    blob_service_addr,
-                    directory_service_addr,
-                    path_info_service_addr,
-                )
-                .await?;
+                tvix_store::utils::construct_services(service_addrs).await?;
 
-            // Arc PathInfoService and NarCalculationService, as we clone it .
-            let path_info_service: Arc<dyn PathInfoService> = path_info_service.into();
+            // Arc NarCalculationService, as we clone it.
             let nar_calculation_service: Arc<dyn NarCalculationService> =
                 nar_calculation_service.into();
 
@@ -388,18 +277,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
             try_join_all(tasks).await?;
         }
         Commands::Copy {
-            blob_service_addr,
-            directory_service_addr,
-            path_info_service_addr,
+            service_addrs,
             reference_graph_path,
         } => {
             let (blob_service, directory_service, path_info_service, _nar_calculation_service) =
-                tvix_store::utils::construct_services(
-                    blob_service_addr,
-                    directory_service_addr,
-                    path_info_service_addr,
-                )
-                .await?;
+                tvix_store::utils::construct_services(service_addrs).await?;
 
             // Parse the file at reference_graph_path.
             let reference_graph_json = tokio::fs::read(&reference_graph_path).await?;
@@ -413,18 +295,26 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
             let reference_graph: ReferenceGraph<'_> =
                 serde_json::from_slice(reference_graph_json.as_slice())?;
 
-            // Arc the PathInfoService, as we clone it .
-            let path_info_service: Arc<dyn PathInfoService> = path_info_service.into();
+            let lookups_span = info_span!(
+                "lookup pathinfos",
+                "indicatif.pb_show" = tracing::field::Empty
+            );
+            lookups_span.pb_set_length(reference_graph.closure.len() as u64);
+            lookups_span.pb_set_style(&tvix_tracing::PB_PROGRESS_STYLE);
+            lookups_span.pb_start();
 
             // From our reference graph, lookup all pathinfos that might exist.
             let elems: Vec<_> = futures::stream::iter(reference_graph.closure)
                 .map(|elem| {
                     let path_info_service = path_info_service.clone();
                     async move {
-                        path_info_service
+                        let resp = path_info_service
                             .get(*elem.path.digest())
                             .await
-                            .map(|resp| (elem, resp))
+                            .map(|resp| (elem, resp));
+
+                        Span::current().pb_inc(1);
+                        resp
                     }
                 })
                 .buffer_unordered(50)
@@ -449,10 +339,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
                     async move {
                         // Ingest the given path.
 
-                        ingest_path(
+                        ingest_path::<_, _, _, &[u8]>(
                             blob_service,
                             directory_service,
                             PathBuf::from(elem.path.to_absolute_path()),
+                            None,
                         )
                         .await
                         .map(|root_node| (elem, root_node))
@@ -468,9 +359,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
                 // Create and upload a PathInfo pointing to the root_node,
                 // annotated with information we have from the reference graph.
                 let path_info = PathInfo {
-                    node: Some(tvix_castore::proto::Node {
-                        node: Some(root_node),
-                    }),
+                    node: Some(tvix_castore::proto::Node::from_name_and_node(
+                        elem.path.to_string().into(),
+                        root_node,
+                    )),
                     references: Vec::from_iter(
                         elem.references.iter().map(|e| e.digest().to_vec().into()),
                     ),
@@ -492,27 +384,20 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
         #[cfg(feature = "fuse")]
         Commands::Mount {
             dest,
-            blob_service_addr,
-            directory_service_addr,
-            path_info_service_addr,
+            service_addrs,
             list_root,
             threads,
             allow_other,
             show_xattr,
         } => {
             let (blob_service, directory_service, path_info_service, _nar_calculation_service) =
-                tvix_store::utils::construct_services(
-                    blob_service_addr,
-                    directory_service_addr,
-                    path_info_service_addr,
-                )
-                .await?;
+                tvix_store::utils::construct_services(service_addrs).await?;
 
-            let mut fuse_daemon = tokio::task::spawn_blocking(move || {
+            let fuse_daemon = tokio::task::spawn_blocking(move || {
                 let fs = make_fs(
                     blob_service,
                     directory_service,
-                    Arc::from(path_info_service),
+                    path_info_service,
                     list_root,
                     show_xattr,
                 );
@@ -522,39 +407,38 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
             })
             .await??;
 
-            // grab a handle to unmount the file system, and register a signal
-            // handler.
-            tokio::spawn(async move {
-                tokio::signal::ctrl_c().await.unwrap();
-                info!("interrupt received, unmounting…");
-                tokio::task::spawn_blocking(move || fuse_daemon.unmount()).await??;
-                info!("unmount occured, terminating…");
-                Ok::<_, std::io::Error>(())
-            })
-            .await??;
+            // Wait for a ctrl_c and then call fuse_daemon.unmount().
+            tokio::spawn({
+                let fuse_daemon = fuse_daemon.clone();
+                async move {
+                    tokio::signal::ctrl_c().await.unwrap();
+                    info!("interrupt received, unmounting…");
+                    tokio::task::spawn_blocking(move || fuse_daemon.unmount()).await??;
+                    info!("unmount occured, terminating…");
+                    Ok::<_, std::io::Error>(())
+                }
+            });
+
+            // Wait for the server to finish, which can either happen through it
+            // being unmounted externally, or receiving a signal invoking the
+            // handler above.
+            tokio::task::spawn_blocking(move || fuse_daemon.wait()).await?
         }
         #[cfg(feature = "virtiofs")]
         Commands::VirtioFs {
             socket,
-            blob_service_addr,
-            directory_service_addr,
-            path_info_service_addr,
+            service_addrs,
             list_root,
             show_xattr,
         } => {
             let (blob_service, directory_service, path_info_service, _nar_calculation_service) =
-                tvix_store::utils::construct_services(
-                    blob_service_addr,
-                    directory_service_addr,
-                    path_info_service_addr,
-                )
-                .await?;
+                tvix_store::utils::construct_services(service_addrs).await?;
 
             tokio::task::spawn_blocking(move || {
                 let fs = make_fs(
                     blob_service,
                     directory_service,
-                    Arc::from(path_info_service),
+                    path_info_service,
                     list_root,
                     show_xattr,
                 );
@@ -567,3 +451,36 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
     };
     Ok(())
 }
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
+    let cli = Cli::parse();
+
+    let tracing_handle = {
+        let mut builder = tvix_tracing::TracingBuilder::default();
+        builder = builder.level(cli.log_level).enable_progressbar();
+        #[cfg(feature = "otlp")]
+        {
+            if cli.otlp {
+                builder = builder.enable_otlp("tvix.store");
+            }
+        }
+        builder.build()?
+    };
+
+    tokio::select! {
+        res = tokio::signal::ctrl_c() => {
+            res?;
+            if let Err(e) = tracing_handle.force_shutdown().await {
+                eprintln!("failed to shutdown tracing: {e}");
+            }
+            Ok(())
+        },
+        res = run_cli(cli) => {
+            if let Err(e) = tracing_handle.shutdown().await {
+                eprintln!("failed to shutdown tracing: {e}");
+            }
+            res
+        }
+    }
+}
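The rewritten `main` above boils down to racing the actual CLI work against Ctrl-C so the tracing pipeline can be flushed on either exit path. A stripped-down sketch of that pattern; `do_work` and `flush` are placeholders, not tvix APIs:

```rust
// Stripped-down sketch of the shutdown pattern in the new main():
// race the real work against Ctrl-C, flushing tracing either way.
async fn do_work() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    Ok(())
}
async fn flush() {}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    tokio::select! {
        res = tokio::signal::ctrl_c() => {
            res?;          // propagate signal-handler errors
            flush().await; // force_shutdown() in the real code
            Ok(())
        },
        res = do_work() => {
            flush().await; // graceful shutdown() in the real code
            res
        }
    }
}
```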
diff --git a/tvix/store/src/composition.rs b/tvix/store/src/composition.rs
new file mode 100644
index 000000000000..a32f22cf7796
--- /dev/null
+++ b/tvix/store/src/composition.rs
@@ -0,0 +1,22 @@
+use lazy_static::lazy_static;
+
+pub use tvix_castore::composition::*;
+
+lazy_static! {
+    /// The provided registry of tvix_store, which has all the builtin
+    /// tvix_castore (BlobStore/DirectoryStore) and tvix_store
+    /// (PathInfoService) implementations.
+    pub static ref REG: Registry = {
+        let mut reg = Default::default();
+        add_default_services(&mut reg);
+        reg
+    };
+}
+
+/// Register the builtin services of tvix_castore and tvix_store with the given
+/// registry. This is useful for creating your own registry with the builtin
+/// types _and_ extra third party types.
+pub fn add_default_services(reg: &mut Registry) {
+    tvix_castore::composition::add_default_services(reg);
+    crate::pathinfoservice::register_pathinfo_services(reg);
+}
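A hedged sketch of what the doc comment above describes: assembling your own registry from the builtin services plus third-party ones. The commented-out registration hook is hypothetical:

```rust
// Sketch: build a registry with the builtin castore + store services,
// then register additional third-party implementations on top.
use tvix_store::composition::{add_default_services, Registry};

fn custom_registry() -> Registry {
    let mut reg = Registry::default();
    add_default_services(&mut reg);
    // my_crate::register_my_pathinfo_service(&mut reg); // hypothetical
    reg
}
```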
diff --git a/tvix/store/src/import.rs b/tvix/store/src/import.rs
index 888380bca9a0..2c2b205016a1 100644
--- a/tvix/store/src/import.rs
+++ b/tvix/store/src/import.rs
@@ -1,13 +1,14 @@
+use bstr::ByteSlice;
 use std::path::Path;
 use tracing::{debug, instrument};
 use tvix_castore::{
-    blobservice::BlobService, directoryservice::DirectoryService, import::fs::ingest_path,
-    proto::node::Node, B3Digest,
+    blobservice::BlobService, directoryservice::DirectoryService, import::fs::ingest_path, Node,
+    PathComponent,
 };
 
 use nix_compat::{
     nixhash::{CAHash, NixHash},
-    store_path::{self, StorePath},
+    store_path::{self, StorePathRef},
 };
 
 use crate::{
@@ -27,29 +28,29 @@ impl From<CAHash> for nar_info::Ca {
     }
 }
 
-pub fn log_node(node: &Node, path: &Path) {
+pub fn log_node(name: &[u8], node: &Node, path: &Path) {
     match node {
-        Node::Directory(directory_node) => {
+        Node::Directory { digest, .. } => {
             debug!(
                 path = ?path,
-                name = ?directory_node.name,
-                digest = %B3Digest::try_from(directory_node.digest.clone()).unwrap(),
+                name = %name.as_bstr(),
+                digest = %digest,
                 "import successful",
             )
         }
-        Node::File(file_node) => {
+        Node::File { digest, .. } => {
             debug!(
                 path = ?path,
-                name = ?file_node.name,
-                digest = %B3Digest::try_from(file_node.digest.clone()).unwrap(),
+                name = %name.as_bstr(),
+                digest = %digest,
                 "import successful"
             )
         }
-        Node::Symlink(symlink_node) => {
+        Node::Symlink { target } => {
             debug!(
                 path = ?path,
-                name = ?symlink_node.name,
-                target = ?symlink_node.target,
+                name = %name.as_bstr(),
+                target = ?target,
                 "import successful"
             )
         }
@@ -81,14 +82,15 @@ pub fn path_to_name(path: &Path) -> std::io::Result<&str> {
 pub fn derive_nar_ca_path_info(
     nar_size: u64,
     nar_sha256: [u8; 32],
-    ca: Option<CAHash>,
+    ca: Option<&CAHash>,
+    name: bytes::Bytes,
     root_node: Node,
 ) -> PathInfo {
     // assemble the [crate::proto::PathInfo] object.
     PathInfo {
-        node: Some(tvix_castore::proto::Node {
-            node: Some(root_node),
-        }),
+        node: Some(tvix_castore::proto::Node::from_name_and_node(
+            name, root_node,
+        )),
         // There's no reference scanning on path contents ingested like this.
         references: vec![],
         narinfo: Some(NarInfo {
@@ -102,8 +104,9 @@ pub fn derive_nar_ca_path_info(
     }
 }
 
-/// Ingest the given path `path` and register the resulting output path in the
-/// [`PathInfoService`] as a recursive fixed output NAR.
+/// Ingests the contents at the given path `path` into castore and registers the
+/// resulting root node in the passed PathInfoService, using the "NAR sha256
+/// digest" and the passed name for output path calculation.
 #[instrument(skip_all, fields(store_name=name, path=?path), err)]
 pub async fn import_path_as_nar_ca<BS, DS, PS, NS, P>(
     path: P,
@@ -112,7 +115,7 @@ pub async fn import_path_as_nar_ca<BS, DS, PS, NS, P>(
     directory_service: DS,
     path_info_service: PS,
     nar_calculation_service: NS,
-) -> Result<StorePath, std::io::Error>
+) -> Result<StorePathRef, std::io::Error>
 where
     P: AsRef<Path> + std::fmt::Debug,
     BS: BlobService + Clone,
@@ -120,9 +123,10 @@ where
     PS: AsRef<dyn PathInfoService>,
     NS: NarCalculationService,
 {
-    let root_node = ingest_path(blob_service, directory_service, path.as_ref())
-        .await
-        .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
+    let root_node =
+        ingest_path::<_, _, _, &[u8]>(blob_service, directory_service, path.as_ref(), None)
+            .await
+            .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
 
     // Ask for the NAR size and sha256
     let (nar_size, nar_sha256) = nar_calculation_service.calculate_nar(&root_node).await?;
@@ -137,23 +141,28 @@ where
         )
     })?;
 
-    // assemble a new root_node with a name that is derived from the nar hash.
-    let root_node = root_node.rename(output_path.to_string().into_bytes().into());
-    log_node(&root_node, path.as_ref());
+    let name: PathComponent = output_path
+        .to_string()
+        .as_str()
+        .try_into()
+        .expect("Tvix bug: StorePath must be PathComponent");
+
+    log_node(name.as_ref(), &root_node, path.as_ref());
 
     let path_info = derive_nar_ca_path_info(
         nar_size,
         nar_sha256,
-        Some(CAHash::Nar(NixHash::Sha256(nar_sha256))),
+        Some(&CAHash::Nar(NixHash::Sha256(nar_sha256))),
+        name.into(),
         root_node,
     );
 
     // This new [`PathInfo`] that we get back from there might contain additional signatures or
     // information set by the service itself. In this function, we silently swallow it because
-    // callers doesn't really need it.
+    // callers don't really need it.
     let _path_info = path_info_service.as_ref().put(path_info).await?;
 
-    Ok(output_path.to_owned())
+    Ok(output_path)
 }
 
 #[cfg(test)]
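The `log_node` change above reflects the castore refactor this diff rides on: nodes no longer carry their own name, so callers pass it alongside. A small sketch of matching on the value-level `Node` enum, assuming the three variants shown in this diff are exhaustive:

```rust
// Sketch: matching the new tvix_castore::Node enum, whose variants
// carry digest/size/target data but no longer a name.
use tvix_castore::Node;

fn kind(node: &Node) -> &'static str {
    match node {
        Node::Directory { .. } => "directory",
        Node::File { executable: true, .. } => "executable file",
        Node::File { .. } => "file",
        Node::Symlink { .. } => "symlink",
    }
}
```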
diff --git a/tvix/store/src/lib.rs b/tvix/store/src/lib.rs
index 8c32aaf885e8..81a77cd978a2 100644
--- a/tvix/store/src/lib.rs
+++ b/tvix/store/src/lib.rs
@@ -1,3 +1,4 @@
+pub mod composition;
 pub mod import;
 pub mod nar;
 pub mod pathinfoservice;
diff --git a/tvix/store/src/nar/import.rs b/tvix/store/src/nar/import.rs
index 3d7c50014aa8..b9a15fe71384 100644
--- a/tvix/store/src/nar/import.rs
+++ b/tvix/store/src/nar/import.rs
@@ -1,15 +1,56 @@
 use nix_compat::nar::reader::r#async as nar_reader;
-use tokio::{io::AsyncBufRead, sync::mpsc, try_join};
+use sha2::Digest;
+use tokio::{
+    io::{AsyncBufRead, AsyncRead},
+    sync::mpsc,
+    try_join,
+};
 use tvix_castore::{
     blobservice::BlobService,
     directoryservice::DirectoryService,
-    import::{ingest_entries, IngestionEntry, IngestionError},
-    proto::{node::Node, NamedNode},
-    PathBuf,
+    import::{
+        blobs::{self, ConcurrentBlobUploader},
+        ingest_entries, IngestionEntry, IngestionError,
+    },
+    Node, PathBuf,
 };
 
 /// Ingests the contents from an [AsyncRead] providing NAR into the tvix store,
 /// interacting with a [BlobService] and [DirectoryService].
+/// Returns the castore root node, as well as the sha256 and size of the NAR
+/// contents ingested.
+pub async fn ingest_nar_and_hash<R, BS, DS>(
+    blob_service: BS,
+    directory_service: DS,
+    r: &mut R,
+) -> Result<(Node, [u8; 32], u64), IngestionError<Error>>
+where
+    R: AsyncRead + Unpin + Send,
+    BS: BlobService + Clone + 'static,
+    DS: DirectoryService,
+{
+    let mut nar_hash = sha2::Sha256::new();
+    let mut nar_size = 0;
+
+    // Assemble NarHash and NarSize as we read bytes.
+    let r = tokio_util::io::InspectReader::new(r, |b| {
+        nar_size += b.len() as u64;
+        use std::io::Write;
+        nar_hash.write_all(b).unwrap();
+    });
+
+    // HACK: InspectReader doesn't implement AsyncBufRead.
+    // See if this can be propagated through and we can then require our input
+    // reader to be buffered too.
+    let mut r = tokio::io::BufReader::new(r);
+
+    let root_node = ingest_nar(blob_service, directory_service, &mut r).await?;
+
+    Ok((root_node, nar_hash.finalize().into(), nar_size))
+}
+
+/// Ingests the contents from an [AsyncRead] providing NAR into the tvix store,
+/// interacting with a [BlobService] and [DirectoryService].
 /// It returns the castore root node or an error.
 pub async fn ingest_nar<R, BS, DS>(
     blob_service: BS,
@@ -18,7 +59,7 @@ pub async fn ingest_nar<R, BS, DS>(
 ) -> Result<Node, IngestionError<Error>>
 where
     R: AsyncBufRead + Unpin + Send,
-    BS: BlobService + Clone,
+    BS: BlobService + Clone + 'static,
     DS: DirectoryService,
 {
     // open the NAR for reading.
@@ -29,14 +70,22 @@ where
     let rx = tokio_stream::wrappers::ReceiverStream::new(rx);
 
     let produce = async move {
+        let mut blob_uploader = ConcurrentBlobUploader::new(blob_service);
+
         let res = produce_nar_inner(
-            blob_service,
+            &mut blob_uploader,
             root_node,
             "root".parse().unwrap(), // HACK: the root node sent to ingest_entries may not be ROOT.
             tx.clone(),
         )
         .await;
 
+        if let Err(err) = blob_uploader.join().await {
+            tx.send(Err(err.into()))
+                .await
+                .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)))?;
+        }
+
         tx.send(res)
             .await
             .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)))?;
@@ -48,19 +97,17 @@ where
 
     let (_, node) = try_join!(produce, consume)?;
 
-    // remove the fake "root" name again
-    debug_assert_eq!(&node.get_name(), b"root");
-    Ok(node.rename("".into()))
+    Ok(node)
 }
 
 async fn produce_nar_inner<BS>(
-    blob_service: BS,
+    blob_uploader: &mut ConcurrentBlobUploader<BS>,
     node: nar_reader::Node<'_, '_>,
     path: PathBuf,
     tx: mpsc::Sender<Result<IngestionEntry, Error>>,
 ) -> Result<IngestionEntry, Error>
 where
-    BS: BlobService + Clone,
+    BS: BlobService + Clone + 'static,
 {
     Ok(match node {
         nar_reader::Node::Symlink { target } => IngestionEntry::Symlink { path, target },
@@ -68,12 +115,8 @@ where
             executable,
             mut reader,
         } => {
-            let (digest, size) = {
-                let mut blob_writer = blob_service.open_write().await;
-                let size = tokio::io::copy_buf(&mut reader, &mut blob_writer).await?;
-
-                (blob_writer.close().await?, size)
-            };
+            let size = reader.len();
+            let digest = blob_uploader.upload(&path, size, &mut reader).await?;
 
             IngestionEntry::Regular {
                 path,
@@ -91,7 +134,7 @@ where
                     .expect("Tvix bug: failed to join name");
 
                 let entry = Box::pin(produce_nar_inner(
-                    blob_service.clone(),
+                    blob_uploader,
                     entry.node,
                     path,
                     tx.clone(),
@@ -112,6 +155,9 @@ where
 pub enum Error {
     #[error(transparent)]
     IO(#[from] std::io::Error),
+
+    #[error(transparent)]
+    BlobUpload(#[from] blobs::Error),
 }
 
 #[cfg(test)]
@@ -128,7 +174,7 @@ mod test {
         DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP, EMPTY_BLOB_DIGEST, HELLOWORLD_BLOB_CONTENTS,
         HELLOWORLD_BLOB_DIGEST,
     };
-    use tvix_castore::proto as castorepb;
+    use tvix_castore::{Directory, Node};
 
     use crate::tests::fixtures::{
         blob_service, directory_service, NAR_CONTENTS_COMPLICATED, NAR_CONTENTS_HELLOWORLD,
@@ -150,10 +196,9 @@ mod test {
         .expect("must parse");
 
         assert_eq!(
-            castorepb::node::Node::Symlink(castorepb::SymlinkNode {
-                name: "".into(), // name must be empty
-                target: "/nix/store/somewhereelse".into(),
-            }),
+            Node::Symlink {
+                target: "/nix/store/somewhereelse".try_into().unwrap()
+            },
             root_node
         );
     }
@@ -173,12 +218,11 @@ mod test {
         .expect("must parse");
 
         assert_eq!(
-            castorepb::node::Node::File(castorepb::FileNode {
-                name: "".into(), // name must be empty
-                digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
+            Node::File {
+                digest: HELLOWORLD_BLOB_DIGEST.clone(),
                 size: HELLOWORLD_BLOB_CONTENTS.len() as u64,
                 executable: false,
-            }),
+            },
             root_node
         );
 
@@ -201,11 +245,10 @@ mod test {
         .expect("must parse");
 
         assert_eq!(
-            castorepb::node::Node::Directory(castorepb::DirectoryNode {
-                name: "".into(), // name must be empty
-                digest: DIRECTORY_COMPLICATED.digest().into(),
-                size: DIRECTORY_COMPLICATED.size(),
-            }),
+            Node::Directory {
+                digest: DIRECTORY_COMPLICATED.digest(),
+                size: DIRECTORY_COMPLICATED.size()
+            },
             root_node,
         );
 
@@ -213,7 +256,7 @@ mod test {
         assert!(blob_service.has(&EMPTY_BLOB_DIGEST).await.unwrap());
 
         // directoryservice must contain the directories, at least with get_recursive.
-        let resp: Result<Vec<castorepb::Directory>, _> = directory_service
+        let resp: Result<Vec<Directory>, _> = directory_service
             .get_recursive(&DIRECTORY_COMPLICATED.digest())
             .collect()
             .await;
diff --git a/tvix/store/src/nar/mod.rs b/tvix/store/src/nar/mod.rs
index 164748a655e8..86505bcb0c07 100644
--- a/tvix/store/src/nar/mod.rs
+++ b/tvix/store/src/nar/mod.rs
@@ -3,20 +3,20 @@ use tvix_castore::B3Digest;
 
 mod import;
 mod renderer;
+pub mod seekable;
 pub use import::ingest_nar;
+pub use import::ingest_nar_and_hash;
 pub use renderer::calculate_size_and_sha256;
 pub use renderer::write_nar;
 pub use renderer::SimpleRenderer;
-use tvix_castore::proto as castorepb;
+use tvix_castore::Node;
 
 #[async_trait]
 pub trait NarCalculationService: Send + Sync {
     /// Return the nar size and nar sha256 digest for a given root node.
     /// This can be used to calculate NAR-based output paths.
-    async fn calculate_nar(
-        &self,
-        root_node: &castorepb::node::Node,
-    ) -> Result<(u64, [u8; 32]), tvix_castore::Error>;
+    async fn calculate_nar(&self, root_node: &Node)
+        -> Result<(u64, [u8; 32]), tvix_castore::Error>;
 }
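
Call sites can then obtain NAR size and digest in one query (a sketch; any
NarCalculationService implementation works here):

    let (nar_size, nar_sha256) = nar_calculation_service
        .calculate_nar(&root_node)
        .await?;
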
 
 #[async_trait]
@@ -26,7 +26,7 @@ where
 {
     async fn calculate_nar(
         &self,
-        root_node: &castorepb::node::Node,
+        root_node: &Node,
     ) -> Result<(u64, [u8; 32]), tvix_castore::Error> {
         self.as_ref().calculate_nar(root_node).await
     }
@@ -38,13 +38,13 @@ pub enum RenderError {
     #[error("failure talking to a backing store client: {0}")]
     StoreError(#[source] std::io::Error),
 
-    #[error("unable to find directory {}, referred from {:?}", .0, .1)]
+    #[error("unable to find directory {0}, referred from {1:?}")]
     DirectoryNotFound(B3Digest, bytes::Bytes),
 
-    #[error("unable to find blob {}, referred from {:?}", .0, .1)]
+    #[error("unable to find blob {0}, referred from {1:?}")]
     BlobNotFound(B3Digest, bytes::Bytes),
 
-    #[error("unexpected size in metadata for blob {}, referred from {:?} returned, expected {}, got {}", .0, .1, .2, .3)]
+    #[error("unexpected size in metadata for blob {0}, referred from {1:?} returned, expected {2}, got {3}")]
     UnexpectedBlobMeta(B3Digest, bytes::Bytes, u32, u32),
 
     #[error("failure using the NAR writer: {0}")]
diff --git a/tvix/store/src/nar/renderer.rs b/tvix/store/src/nar/renderer.rs
index efd67671db70..afd85f267c6c 100644
--- a/tvix/store/src/nar/renderer.rs
+++ b/tvix/store/src/nar/renderer.rs
@@ -6,11 +6,9 @@ use nix_compat::nar::writer::r#async as nar_writer;
 use sha2::{Digest, Sha256};
 use tokio::io::{self, AsyncWrite, BufReader};
 use tonic::async_trait;
-use tvix_castore::{
-    blobservice::BlobService,
-    directoryservice::DirectoryService,
-    proto::{self as castorepb, NamedNode},
-};
+use tracing::{instrument, Span};
+use tracing_indicatif::span_ext::IndicatifSpanExt;
+use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService, Node};
 
 pub struct SimpleRenderer<BS, DS> {
     blob_service: BS,
@@ -34,7 +32,7 @@ where
 {
     async fn calculate_nar(
         &self,
-        root_node: &castorepb::node::Node,
+        root_node: &Node,
     ) -> Result<(u64, [u8; 32]), tvix_castore::Error> {
         calculate_size_and_sha256(
             root_node,
@@ -48,8 +46,9 @@ where
 
 /// Invoke [write_nar], and return the size and sha256 digest of the produced
 /// NAR output.
+#[instrument(skip_all, fields(indicatif.pb_show=1))]
 pub async fn calculate_size_and_sha256<BS, DS>(
-    root_node: &castorepb::node::Node,
+    root_node: &Node,
     blob_service: BS,
     directory_service: DS,
 ) -> Result<(u64, [u8; 32]), RenderError>
@@ -60,6 +59,10 @@ where
     let mut h = Sha256::new();
     let mut cw = CountWrite::from(&mut h);
 
+    let span = Span::current();
+    span.pb_set_message("Calculating NAR");
+    span.pb_start();
+
     write_nar(
         // The hasher doesn't speak async. It doesn't
         // actually do any I/O, so it's fine to wrap.
@@ -73,13 +76,13 @@ where
     Ok((cw.count(), h.finalize().into()))
 }
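
The CountWrite-over-Sha256 trick in isolation (a sketch; CountWrite::from and
count() are the same calls used above, and sha2's Sha256 implements
std::io::Write):

    use count_write::CountWrite;
    use sha2::{Digest, Sha256};

    let mut h = Sha256::new();
    let mut cw = CountWrite::from(&mut h);
    std::io::Write::write_all(&mut cw, b"example bytes")?;
    let (nar_size, nar_sha256): (u64, [u8; 32]) = (cw.count(), h.finalize().into());
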
 
-/// Accepts a [castorepb::node::Node] pointing to the root of a (store) path,
+/// Accepts a [Node] pointing to the root of a (store) path,
 /// and uses the passed blob_service and directory_service to perform the
 /// necessary lookups as it traverses the structure.
 /// The contents in NAR serialization are written to the passed [AsyncWrite].
 pub async fn write_nar<W, BS, DS>(
     mut w: W,
-    proto_root_node: &castorepb::node::Node,
+    root_node: &Node,
     blob_service: BS,
     directory_service: DS,
 ) -> Result<(), RenderError>
@@ -95,7 +98,8 @@ where
 
     walk_node(
         nar_root_node,
-        proto_root_node,
+        root_node,
+        b"",
         blob_service,
         directory_service,
     )
@@ -108,7 +112,8 @@ where
 /// This consumes the node.
 async fn walk_node<BS, DS>(
     nar_node: nar_writer::Node<'_, '_>,
-    proto_node: &castorepb::node::Node,
+    castore_node: &Node,
+    name: &[u8],
     blob_service: BS,
     directory_service: DS,
 ) -> Result<(BS, DS), RenderError>
@@ -116,24 +121,20 @@ where
     BS: BlobService + Send,
     DS: DirectoryService + Send,
 {
-    match proto_node {
-        castorepb::node::Node::Symlink(proto_symlink_node) => {
+    match castore_node {
+        Node::Symlink { target, .. } => {
             nar_node
-                .symlink(&proto_symlink_node.target)
+                .symlink(target.as_ref())
                 .await
                 .map_err(RenderError::NARWriterError)?;
         }
-        castorepb::node::Node::File(proto_file_node) => {
-            let digest_len = proto_file_node.digest.len();
-            let digest = proto_file_node.digest.clone().try_into().map_err(|_| {
-                RenderError::StoreError(io::Error::new(
-                    io::ErrorKind::Other,
-                    format!("invalid digest len {} in file node", digest_len),
-                ))
-            })?;
-
+        Node::File {
+            digest,
+            size,
+            executable,
+        } => {
             let mut blob_reader = match blob_service
-                .open_read(&digest)
+                .open_read(digest)
                 .await
                 .map_err(RenderError::StoreError)?
             {
@@ -145,39 +146,23 @@ where
             }?;
 
             nar_node
-                .file(
-                    proto_file_node.executable,
-                    proto_file_node.size,
-                    &mut blob_reader,
-                )
+                .file(*executable, *size, &mut blob_reader)
                 .await
                 .map_err(RenderError::NARWriterError)?;
         }
-        castorepb::node::Node::Directory(proto_directory_node) => {
-            let digest_len = proto_directory_node.digest.len();
-            let digest = proto_directory_node
-                .digest
-                .clone()
-                .try_into()
-                .map_err(|_| {
-                    RenderError::StoreError(io::Error::new(
-                        io::ErrorKind::InvalidData,
-                        format!("invalid digest len {} in directory node", digest_len),
-                    ))
-                })?;
-
+        Node::Directory { digest, .. } => {
             // look it up with the directory service
             match directory_service
-                .get(&digest)
+                .get(digest)
                 .await
                 .map_err(|e| RenderError::StoreError(e.into()))?
             {
                 // if it's None, that's an error!
                 None => Err(RenderError::DirectoryNotFound(
-                    digest,
-                    proto_directory_node.name.clone(),
+                    digest.clone(),
+                    bytes::Bytes::copy_from_slice(name),
                 ))?,
-                Some(proto_directory) => {
+                Some(directory) => {
                     // start a directory node
                     let mut nar_node_directory = nar_node
                         .directory()
@@ -191,15 +176,16 @@ where
 
                     // for each node in the directory, create a new entry with its name,
                     // and then recurse on that entry.
-                    for proto_node in proto_directory.nodes() {
+                    for (name, node) in directory.nodes() {
                         let child_node = nar_node_directory
-                            .entry(proto_node.get_name())
+                            .entry(name.as_ref())
                             .await
                             .map_err(RenderError::NARWriterError)?;
 
                         (blob_service, directory_service) = Box::pin(walk_node(
                             child_node,
-                            &proto_node,
+                            node,
+                            name.as_ref(),
                             blob_service,
                             directory_service,
                         ))
diff --git a/tvix/store/src/nar/seekable.rs b/tvix/store/src/nar/seekable.rs
new file mode 100644
index 000000000000..951c1dfc0198
--- /dev/null
+++ b/tvix/store/src/nar/seekable.rs
@@ -0,0 +1,422 @@
+use std::{
+    cmp::min,
+    io,
+    pin::Pin,
+    sync::Arc,
+    task::{Context, Poll},
+};
+
+use super::RenderError;
+
+use bytes::{BufMut, Bytes};
+
+use nix_compat::nar::writer::sync as nar_writer;
+use tvix_castore::blobservice::{BlobReader, BlobService};
+use tvix_castore::directoryservice::{
+    DirectoryGraph, DirectoryService, RootToLeavesValidator, ValidatedDirectoryGraph,
+};
+use tvix_castore::Directory;
+use tvix_castore::{B3Digest, Node};
+
+use futures::future::{BoxFuture, FusedFuture, TryMaybeDone};
+use futures::FutureExt;
+use futures::TryStreamExt;
+
+use tokio::io::AsyncSeekExt;
+
+#[derive(Debug)]
+struct BlobRef {
+    digest: B3Digest,
+    size: u64,
+}
+
+#[derive(Debug)]
+enum Data {
+    Literal(Bytes),
+    Blob(BlobRef),
+}
+
+impl Data {
+    pub fn len(&self) -> u64 {
+        match self {
+            Data::Literal(data) => data.len() as u64,
+            Data::Blob(BlobRef { size, .. }) => *size,
+        }
+    }
+}
+
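
To make the segment model concrete: a NAR containing a single file would end up
as three segments, along these lines (offsets purely illustrative):

    // (offset, data)
    (0,    Data::Literal(/* NAR magic + file header tokens */)),
    (96,   Data::Blob(BlobRef { digest, size: 1234 })), // the file-content 'hole'
    (1330, Data::Literal(/* padding + closing tokens */)),
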
+pub struct Reader<B: BlobService> {
+    segments: Vec<(u64, Data)>,
+    position_bytes: u64,
+    position_index: usize,
+    blob_service: Arc<B>,
+    seeking: bool,
+    current_blob: TryMaybeDone<BoxFuture<'static, io::Result<Box<dyn BlobReader>>>>,
+}
+
+/// Used during construction.
+/// Converts the current buffer (passed as `cur_segment`) into a `Data::Literal` segment and
+/// inserts it into `segments`, advancing `offset` by the segment's length.
+fn flush_segment(segments: &mut Vec<(u64, Data)>, offset: &mut u64, cur_segment: Vec<u8>) {
+    let segment_size = cur_segment.len();
+    segments.push((*offset, Data::Literal(cur_segment.into())));
+    *offset += segment_size as u64;
+}
+
+/// Used during construction.
+/// Recursively walks the node and its children, and fills `segments` with the appropriate
+/// `Data::Literal` and `Data::Blob` elements.
+fn walk_node(
+    segments: &mut Vec<(u64, Data)>,
+    offset: &mut u64,
+    get_directory: &impl Fn(&B3Digest) -> Directory,
+    node: Node,
+    // Includes a reference to the current segment's buffer
+    nar_node: nar_writer::Node<'_, Vec<u8>>,
+) -> Result<(), RenderError> {
+    match node {
+        tvix_castore::Node::Symlink { target } => {
+            nar_node
+                .symlink(target.as_ref())
+                .map_err(RenderError::NARWriterError)?;
+        }
+        tvix_castore::Node::File {
+            digest,
+            size,
+            executable,
+        } => {
+            let (cur_segment, skip) = nar_node
+                .file_manual_write(executable, size)
+                .map_err(RenderError::NARWriterError)?;
+
+            // Flush the segment up until the beginning of the blob
+            flush_segment(segments, offset, std::mem::take(cur_segment));
+
+            // Insert the blob segment
+            segments.push((*offset, Data::Blob(BlobRef { digest, size })));
+            *offset += size;
+
+            // Close the file node
+            // We **intentionally** do not write the file contents anywhere.
+            // Instead we have stored the blob reference in a Data::Blob segment,
+            // and the poll_read implementation will take care of serving the
+            // appropriate blob at this offset.
+            skip.close(cur_segment)
+                .map_err(RenderError::NARWriterError)?;
+        }
+        tvix_castore::Node::Directory { digest, .. } => {
+            let directory = get_directory(&digest);
+
+            // start a directory node
+            let mut nar_node_directory =
+                nar_node.directory().map_err(RenderError::NARWriterError)?;
+
+            // for each node in the directory, create a new entry with its name,
+            // and then recurse on that entry.
+            for (name, node) in directory.nodes() {
+                let child_node = nar_node_directory
+                    .entry(name.as_ref())
+                    .map_err(RenderError::NARWriterError)?;
+
+                walk_node(segments, offset, get_directory, node.clone(), child_node)?;
+            }
+
+            // close the directory
+            nar_node_directory
+                .close()
+                .map_err(RenderError::NARWriterError)?;
+        }
+    }
+    Ok(())
+}
+
+impl<B: BlobService + 'static> Reader<B> {
+    /// Creates a new seekable NAR renderer for the given castore root node.
+    ///
+    /// This function pre-fetches the directory closure using `get_recursive()` and assembles the
+    /// NAR structure, except the file contents, which are stored as 'holes' referencing a blob
+    /// of a specific BLAKE3 digest and known size. The AsyncRead implementation then switches
+    /// between serving the precomputed literal segments and the appropriate blob for the file
+    /// contents.
+    pub async fn new(
+        root_node: Node,
+        blob_service: B,
+        directory_service: impl DirectoryService,
+    ) -> Result<Self, RenderError> {
+        let maybe_directory_closure = match &root_node {
+            // If this is a directory, resolve all subdirectories
+            Node::Directory { digest, .. } => {
+                let mut closure = DirectoryGraph::with_order(
+                    RootToLeavesValidator::new_with_root_digest(digest.clone()),
+                );
+                let mut stream = directory_service.get_recursive(digest);
+                while let Some(dir) = stream
+                    .try_next()
+                    .await
+                    .map_err(|e| RenderError::StoreError(e.into()))?
+                {
+                    closure.add(dir).map_err(|e| {
+                        RenderError::StoreError(
+                            tvix_castore::Error::StorageError(e.to_string()).into(),
+                        )
+                    })?;
+                }
+                Some(closure.validate().map_err(|e| {
+                    RenderError::StoreError(tvix_castore::Error::StorageError(e.to_string()).into())
+                })?)
+            }
+            // If the top-level node is a file or a symlink, just pass it on
+            Node::File { .. } => None,
+            Node::Symlink { .. } => None,
+        };
+
+        Self::new_with_directory_closure(root_node, blob_service, maybe_directory_closure)
+    }
+
+    /// Creates a new seekable NAR renderer for the given castore root node.
+    /// This version of the instantiation does not perform any I/O and as such is not async.
+    /// However, it requires all directories to be passed as a ValidatedDirectoryGraph.
+    ///
+    /// Panics if the directory closure is not the closure of the root node.
+    pub fn new_with_directory_closure(
+        root_node: Node,
+        blob_service: B,
+        directory_closure: Option<ValidatedDirectoryGraph>,
+    ) -> Result<Self, RenderError> {
+        let directories = directory_closure
+            .map(|directory_closure| {
+                let mut directories: Vec<(B3Digest, Directory)> = vec![];
+                for dir in directory_closure.drain_root_to_leaves() {
+                    let digest = dir.digest();
+                    let pos = directories
+                        .binary_search_by_key(&digest.as_slice(), |(digest, _dir)| {
+                            digest.as_slice()
+                        })
+                        .expect_err("duplicate directory"); // DirectoryGraph checks this
+                    directories.insert(pos, (digest, dir));
+                }
+                directories
+            })
+            .unwrap_or_default();
+
+        let mut segments = vec![];
+        let mut cur_segment: Vec<u8> = vec![];
+        let mut offset = 0;
+
+        let nar_node = nar_writer::open(&mut cur_segment).map_err(RenderError::NARWriterError)?;
+
+        walk_node(
+            &mut segments,
+            &mut offset,
+            &|digest| {
+                directories
+                    .binary_search_by_key(&digest.as_slice(), |(digest, _dir)| digest.as_slice())
+                    .map(|pos| directories[pos].clone())
+                    .expect("missing directory") // DirectoryGraph checks this
+                    .1
+            },
+            root_node,
+            nar_node,
+        )?;
+        // Flush the final segment
+        flush_segment(&mut segments, &mut offset, std::mem::take(&mut cur_segment));
+
+        Ok(Reader {
+            segments,
+            position_bytes: 0,
+            position_index: 0,
+            blob_service: blob_service.into(),
+            seeking: false,
+            current_blob: TryMaybeDone::Gone,
+        })
+    }
+
+    pub fn stream_len(&self) -> u64 {
+        self.segments
+            .last()
+            .map(|&(off, ref data)| off + data.len())
+            .expect("no segment found")
+    }
+}
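
A sketch of the intended use, assuming tokio's AsyncSeekExt and AsyncReadExt
extension traits are in scope:

    let mut r = Reader::new(root_node, blob_service, directory_service).await?;
    let len = r.stream_len();
    r.seek(std::io::SeekFrom::Start(len / 2)).await?; // jump into the middle
    let mut rest = Vec::new();
    r.read_to_end(&mut rest).await?; // literals and blob 'holes' served transparently
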
+
+impl<B: BlobService + 'static> tokio::io::AsyncSeek for Reader<B> {
+    fn start_seek(mut self: Pin<&mut Self>, pos: io::SeekFrom) -> io::Result<()> {
+        let stream_len = Reader::stream_len(&self);
+
+        let this = &mut *self;
+        if this.seeking {
+            return Err(io::Error::new(io::ErrorKind::Other, "Already seeking"));
+        }
+        this.seeking = true;
+
+        // TODO(edef): be sane about overflows
+        let pos = match pos {
+            io::SeekFrom::Start(n) => n,
+            io::SeekFrom::End(n) => (stream_len as i64 + n) as u64,
+            io::SeekFrom::Current(n) => (this.position_bytes as i64 + n) as u64,
+        };
+
+        let prev_position_bytes = this.position_bytes;
+        let prev_position_index = this.position_index;
+
+        this.position_bytes = min(pos, stream_len);
+        this.position_index = match this
+            .segments
+            .binary_search_by_key(&this.position_bytes, |&(off, _)| off)
+        {
+            Ok(idx) => idx,
+            Err(idx) => idx - 1,
+        };
+
+        let Some((offset, Data::Blob(BlobRef { digest, .. }))) =
+            this.segments.get(this.position_index)
+        else {
+            // If not seeking into a blob, we clear the active blob reader and then we're done
+            this.current_blob = TryMaybeDone::Gone;
+            return Ok(());
+        };
+        let offset_in_segment = this.position_bytes - offset;
+
+        if prev_position_bytes == this.position_bytes {
+            // position has not changed. do nothing
+        } else if prev_position_index == this.position_index {
+            // seeking within the same segment, re-use the blob reader
+            let mut prev = std::mem::replace(&mut this.current_blob, TryMaybeDone::Gone);
+            this.current_blob = futures::future::try_maybe_done(
+                (async move {
+                    let mut reader = Pin::new(&mut prev).take_output().unwrap();
+                    reader.seek(io::SeekFrom::Start(offset_in_segment)).await?;
+                    Ok(reader)
+                })
+                .boxed(),
+            );
+        } else {
+            // seek to a different segment
+            let blob_service = this.blob_service.clone();
+            let digest = digest.clone();
+            this.current_blob = futures::future::try_maybe_done(
+                (async move {
+                    let mut reader =
+                        blob_service
+                            .open_read(&digest)
+                            .await?
+                            .ok_or(io::Error::new(
+                                io::ErrorKind::NotFound,
+                                RenderError::BlobNotFound(digest.clone(), Default::default()),
+                            ))?;
+                    if offset_in_segment != 0 {
+                        reader.seek(io::SeekFrom::Start(offset_in_segment)).await?;
+                    }
+                    Ok(reader)
+                })
+                .boxed(),
+            );
+        };
+
+        Ok(())
+    }
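
A gloss on the index lookup above: segments are keyed by their start offset, so
an exact binary_search hit (Ok) lands on a segment boundary, while a miss (Err)
yields the insertion point, whose predecessor is the segment containing the
position. Offsets below are illustrative:

    // segment start offsets: [0, 96, 1330]
    // position 96  -> Ok(1)  -> position_index = 1
    // position 100 -> Err(2) -> position_index = 2 - 1 = 1
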
+    fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<u64>> {
+        let this = &mut *self;
+
+        if !this.current_blob.is_terminated() {
+            futures::ready!(this.current_blob.poll_unpin(cx))?;
+        }
+        this.seeking = false;
+
+        Poll::Ready(Ok(this.position_bytes))
+    }
+}
+
+impl<B: BlobService + 'static> tokio::io::AsyncRead for Reader<B> {
+    fn poll_read(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context,
+        buf: &mut tokio::io::ReadBuf,
+    ) -> Poll<io::Result<()>> {
+        let this = &mut *self;
+
+        let Some(&(offset, ref segment)) = this.segments.get(this.position_index) else {
+            return Poll::Ready(Ok(())); // EOF
+        };
+
+        let prev_read_buf_pos = buf.filled().len();
+        match segment {
+            Data::Literal(data) => {
+                let offset_in_segment = this.position_bytes - offset;
+                let offset_in_segment = usize::try_from(offset_in_segment).unwrap();
+                let remaining_data = data.len() - offset_in_segment;
+                let read_size = std::cmp::min(remaining_data, buf.remaining());
+                buf.put(&data[offset_in_segment..offset_in_segment + read_size]);
+            }
+            Data::Blob(BlobRef { size, .. }) => {
+                futures::ready!(this.current_blob.poll_unpin(cx))?;
+                this.seeking = false;
+                let blob = Pin::new(&mut this.current_blob)
+                    .output_mut()
+                    .expect("missing blob");
+                futures::ready!(Pin::new(blob).poll_read(cx, buf))?;
+                let read_length = buf.filled().len() - prev_read_buf_pos;
+                let maximum_expected_read_length = (offset + size) - this.position_bytes;
+                let is_eof = read_length == 0;
+                let too_much_returned = read_length as u64 > maximum_expected_read_length;
+                match (is_eof, too_much_returned) {
+                    (true, false) => {
+                        return Poll::Ready(Err(io::Error::new(
+                            io::ErrorKind::UnexpectedEof,
+                            "blob short read",
+                        )))
+                    }
+                    (false, true) => {
+                        buf.set_filled(prev_read_buf_pos);
+                        return Poll::Ready(Err(io::Error::new(
+                            io::ErrorKind::InvalidInput,
+                            "blob continued to yield data beyond end",
+                        )));
+                    }
+                    _ => {}
+                }
+            }
+        };
+        let new_read_buf_pos = buf.filled().len();
+        this.position_bytes += (new_read_buf_pos - prev_read_buf_pos) as u64;
+
+        let prev_position_index = this.position_index;
+        while {
+            if let Some(&(offset, ref segment)) = this.segments.get(this.position_index) {
+                (this.position_bytes - offset) >= segment.len()
+            } else {
+                false
+            }
+        } {
+            this.position_index += 1;
+        }
+        if prev_position_index != this.position_index {
+            let Some((_offset, Data::Blob(BlobRef { digest, .. }))) =
+                this.segments.get(this.position_index)
+            else {
+                // If the next segment is not a blob, we clear the active blob reader and then we're done
+                this.current_blob = TryMaybeDone::Gone;
+                return Poll::Ready(Ok(()));
+            };
+
+            // The next segment is a blob, open the BlobReader
+            let blob_service = this.blob_service.clone();
+            let digest = digest.clone();
+            this.current_blob = futures::future::try_maybe_done(
+                (async move {
+                    let reader = blob_service
+                        .open_read(&digest)
+                        .await?
+                        .ok_or(io::Error::new(
+                            io::ErrorKind::NotFound,
+                            RenderError::BlobNotFound(digest.clone(), Default::default()),
+                        ))?;
+                    Ok(reader)
+                })
+                .boxed(),
+            );
+        }
+
+        Poll::Ready(Ok(()))
+    }
+}
diff --git a/tvix/store/src/pathinfoservice/bigtable.rs b/tvix/store/src/pathinfoservice/bigtable.rs
index 7df9989fc522..15128986ff56 100644
--- a/tvix/store/src/pathinfoservice/bigtable.rs
+++ b/tvix/store/src/pathinfoservice/bigtable.rs
@@ -10,15 +10,17 @@ use nix_compat::nixbase32;
 use prost::Message;
 use serde::{Deserialize, Serialize};
 use serde_with::{serde_as, DurationSeconds};
+use std::sync::Arc;
 use tonic::async_trait;
 use tracing::{instrument, trace};
+use tvix_castore::composition::{CompositionContext, ServiceBuilder};
 use tvix_castore::Error;
 
 /// There should not be more than 10 MiB in a single cell.
 /// https://cloud.google.com/bigtable/docs/schema-design#cells
 const CELL_SIZE_LIMIT: u64 = 10 * 1024 * 1024;
 
-/// Provides a [DirectoryService] implementation using
+/// Provides a [PathInfoService] implementation using
 /// [Bigtable](https://cloud.google.com/bigtable/docs/)
 /// as an underlying K/V store.
 ///
@@ -44,41 +46,6 @@ pub struct BigtablePathInfoService {
     emulator: std::sync::Arc<(tempfile::TempDir, async_process::Child)>,
 }
 
-/// Represents configuration of [BigtablePathInfoService].
-/// This currently conflates both connect parameters and data model/client
-/// behaviour parameters.
-#[serde_as]
-#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
-pub struct BigtableParameters {
-    project_id: String,
-    instance_name: String,
-    #[serde(default)]
-    is_read_only: bool,
-    #[serde(default = "default_channel_size")]
-    channel_size: usize,
-
-    #[serde_as(as = "Option<DurationSeconds<String>>")]
-    #[serde(default = "default_timeout")]
-    timeout: Option<std::time::Duration>,
-    table_name: String,
-    family_name: String,
-
-    #[serde(default = "default_app_profile_id")]
-    app_profile_id: String,
-}
-
-fn default_app_profile_id() -> String {
-    "default".to_owned()
-}
-
-fn default_channel_size() -> usize {
-    4
-}
-
-fn default_timeout() -> Option<std::time::Duration> {
-    Some(std::time::Duration::from_secs(4))
-}
-
 impl BigtablePathInfoService {
     #[cfg(not(test))]
     pub async fn connect(params: BigtableParameters) -> Result<Self, bigtable::Error> {
@@ -382,7 +349,9 @@ impl PathInfoService for BigtablePathInfoService {
                     .validate()
                     .map_err(|e| Error::StorageError(format!("invalid PathInfo: {}", e)))?;
 
-                if store_path.digest().as_slice() != row_key.as_slice() {
+                let exp_path_info_key = derive_pathinfo_key(store_path.digest());
+
+                if exp_path_info_key.as_bytes() != row_key.as_slice() {
                     Err(Error::StorageError("PathInfo has unexpected digest".into()))?
                 }
 
@@ -394,3 +363,88 @@ impl PathInfoService for BigtablePathInfoService {
         Box::pin(stream)
     }
 }
+
+/// Represents configuration of [BigtablePathInfoService].
+/// This currently conflates both connect parameters and data model/client
+/// behaviour parameters.
+#[serde_as]
+#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
+pub struct BigtableParameters {
+    project_id: String,
+    instance_name: String,
+    #[serde(default)]
+    is_read_only: bool,
+    #[serde(default = "default_channel_size")]
+    channel_size: usize,
+
+    #[serde_as(as = "Option<DurationSeconds<String>>")]
+    #[serde(default = "default_timeout")]
+    timeout: Option<std::time::Duration>,
+    table_name: String,
+    family_name: String,
+
+    #[serde(default = "default_app_profile_id")]
+    app_profile_id: String,
+}
+
+impl BigtableParameters {
+    #[cfg(test)]
+    pub fn default_for_tests() -> Self {
+        Self {
+            project_id: "project-1".into(),
+            instance_name: "instance-1".into(),
+            is_read_only: false,
+            channel_size: default_channel_size(),
+            timeout: default_timeout(),
+            table_name: "table-1".into(),
+            family_name: "cf1".into(),
+            app_profile_id: default_app_profile_id(),
+        }
+    }
+}
+
+fn default_app_profile_id() -> String {
+    "default".to_owned()
+}
+
+fn default_channel_size() -> usize {
+    4
+}
+
+fn default_timeout() -> Option<std::time::Duration> {
+    Some(std::time::Duration::from_secs(4))
+}
+
+#[async_trait]
+impl ServiceBuilder for BigtableParameters {
+    type Output = dyn PathInfoService;
+    async fn build<'a>(
+        &'a self,
+        _instance_name: &str,
+        _context: &CompositionContext,
+    ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync>> {
+        Ok(Arc::new(
+            BigtablePathInfoService::connect(self.clone()).await?,
+        ))
+    }
+}
+
+impl TryFrom<url::Url> for BigtableParameters {
+    type Error = Box<dyn std::error::Error + Send + Sync>;
+    fn try_from(mut url: url::Url) -> Result<Self, Self::Error> {
+        // parse the instance name from the hostname.
+        let instance_name = url
+            .host_str()
+            .ok_or_else(|| Error::StorageError("instance name missing".into()))?
+            .to_string();
+
+        // … but add it to the query string now, so we just need to parse that.
+        url.query_pairs_mut()
+            .append_pair("instance_name", &instance_name);
+
+        let params: BigtableParameters = serde_qs::from_str(url.query().unwrap_or_default())
+            .map_err(|e| Error::InvalidRequest(format!("failed to parse parameters: {}", e)))?;
+
+        Ok(params)
+    }
+}
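
For illustration, a URL accepted by this TryFrom would look along these lines
(values hypothetical; the parameter names are the BigtableParameters fields
above):

    bigtable://instance-1?project_id=project-1&table_name=table-1&family_name=cf1

The host becomes instance_name and is re-appended to the query string, so
serde_qs can deserialize the whole parameter set in one pass.
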
diff --git a/tvix/store/src/pathinfoservice/combinators.rs b/tvix/store/src/pathinfoservice/combinators.rs
index 664144ef494b..bb5595f72b10 100644
--- a/tvix/store/src/pathinfoservice/combinators.rs
+++ b/tvix/store/src/pathinfoservice/combinators.rs
@@ -1,8 +1,11 @@
+use std::sync::Arc;
+
 use crate::proto::PathInfo;
 use futures::stream::BoxStream;
 use nix_compat::nixbase32;
 use tonic::async_trait;
 use tracing::{debug, instrument};
+use tvix_castore::composition::{CompositionContext, ServiceBuilder};
 use tvix_castore::Error;
 
 use super::PathInfoService;
@@ -61,6 +64,41 @@ where
     }
 }
 
+#[derive(serde::Deserialize)]
+pub struct CacheConfig {
+    pub near: String,
+    pub far: String,
+}
+
+impl TryFrom<url::Url> for CacheConfig {
+    type Error = Box<dyn std::error::Error + Send + Sync>;
+    fn try_from(_url: url::Url) -> Result<Self, Self::Error> {
+        Err(Error::StorageError(
+            "Instantiating a CombinedPathInfoService from a url is not supported".into(),
+        )
+        .into())
+    }
+}
+
+#[async_trait]
+impl ServiceBuilder for CacheConfig {
+    type Output = dyn PathInfoService;
+    async fn build<'a>(
+        &'a self,
+        _instance_name: &str,
+        context: &CompositionContext,
+    ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
+        let (near, far) = futures::join!(
+            context.resolve(self.near.clone()),
+            context.resolve(self.far.clone())
+        );
+        Ok(Arc::new(Cache {
+            near: near?,
+            far: far?,
+        }))
+    }
+}
+
 #[cfg(test)]
 mod test {
     use std::num::NonZeroUsize;
diff --git a/tvix/store/src/pathinfoservice/from_addr.rs b/tvix/store/src/pathinfoservice/from_addr.rs
index 455909e7f235..d4719219b996 100644
--- a/tvix/store/src/pathinfoservice/from_addr.rs
+++ b/tvix/store/src/pathinfoservice/from_addr.rs
@@ -1,13 +1,10 @@
-use crate::proto::path_info_service_client::PathInfoServiceClient;
+use super::PathInfoService;
 
-use super::{
-    GRPCPathInfoService, MemoryPathInfoService, NixHTTPPathInfoService, PathInfoService,
-    SledPathInfoService,
+use crate::composition::{
+    with_registry, CompositionContext, DeserializeWithRegistry, ServiceBuilder, REG,
 };
-
-use nix_compat::narinfo;
 use std::sync::Arc;
-use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService, Error};
+use tvix_castore::Error;
 use url::Url;
 
 /// Constructs a new instance of a [PathInfoService] from a URI.
@@ -20,6 +17,11 @@ use url::Url;
 /// - `sled:///absolute/path/to/somewhere`
 ///   Uses sled, with a path on disk for persistence. Can only be opened
 ///   by one process at a time.
+/// - `redb:`
+///   Uses an in-memory redb implementation.
+/// - `redb:///absolute/path/to/somewhere`
+///   Uses redb, with a path on disk for persistence. Can only be opened
+///   by one process at a time.
 /// - `nix+https://cache.nixos.org?trusted-public-keys=cache.nixos.org-1:6NCHdD59X431o0gWypbMrAURkbJ16ZPMQFGspcDShjY=`
 ///   Exposes the Nix binary cache as a PathInfoService, ingesting NARs into the
 ///   {Blob,Directory}Service. You almost certainly want to use this with some cache.
@@ -34,110 +36,21 @@ use url::Url;
 /// these also need to be passed in.
 pub async fn from_addr(
     uri: &str,
-    blob_service: Arc<dyn BlobService>,
-    directory_service: Arc<dyn DirectoryService>,
-) -> Result<Box<dyn PathInfoService>, Error> {
+    context: Option<&CompositionContext<'_>>,
+) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync>> {
     #[allow(unused_mut)]
     let mut url =
         Url::parse(uri).map_err(|e| Error::StorageError(format!("unable to parse url: {}", e)))?;
 
-    let path_info_service: Box<dyn PathInfoService> = match url.scheme() {
-        "memory" => {
-            // memory doesn't support host or path in the URL.
-            if url.has_host() || !url.path().is_empty() {
-                return Err(Error::StorageError("invalid url".to_string()));
-            }
-            Box::<MemoryPathInfoService>::default()
-        }
-        "sled" => {
-            // sled doesn't support host, and a path can be provided (otherwise
-            // it'll live in memory only).
-            if url.has_host() {
-                return Err(Error::StorageError("no host allowed".to_string()));
-            }
-
-            if url.path() == "/" {
-                return Err(Error::StorageError(
-                    "cowardly refusing to open / with sled".to_string(),
-                ));
-            }
-
-            // TODO: expose other parameters as URL parameters?
-
-            Box::new(if url.path().is_empty() {
-                SledPathInfoService::new_temporary()
-                    .map_err(|e| Error::StorageError(e.to_string()))?
-            } else {
-                SledPathInfoService::new(url.path())
-                    .map_err(|e| Error::StorageError(e.to_string()))?
-            })
-        }
-        "nix+http" | "nix+https" => {
-            // Stringify the URL and remove the nix+ prefix.
-            // We can't use `url.set_scheme(rest)`, as it disallows
-            // setting something http(s) that previously wasn't.
-            let new_url = Url::parse(url.to_string().strip_prefix("nix+").unwrap()).unwrap();
-
-            let mut nix_http_path_info_service =
-                NixHTTPPathInfoService::new(new_url, blob_service, directory_service);
-
-            let pairs = &url.query_pairs();
-            for (k, v) in pairs.into_iter() {
-                if k == "trusted-public-keys" {
-                    let pubkey_strs: Vec<_> = v.split_ascii_whitespace().collect();
-
-                    let mut pubkeys: Vec<narinfo::PubKey> = Vec::with_capacity(pubkey_strs.len());
-                    for pubkey_str in pubkey_strs {
-                        pubkeys.push(narinfo::PubKey::parse(pubkey_str).map_err(|e| {
-                            Error::StorageError(format!("invalid public key: {e}"))
-                        })?);
-                    }
-
-                    nix_http_path_info_service.set_public_keys(pubkeys);
-                }
-            }
-
-            Box::new(nix_http_path_info_service)
-        }
-        scheme if scheme.starts_with("grpc+") => {
-            // schemes starting with grpc+ go to the GRPCPathInfoService.
-            //   That's normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts.
-            // - In the case of unix sockets, there must be a path, but may not be a host.
-            // - In the case of non-unix sockets, there must be a host, but no path.
-            // Constructing the channel is handled by tvix_castore::channel::from_url.
-            let client =
-                PathInfoServiceClient::new(tvix_castore::tonic::channel_from_url(&url).await?);
-            Box::new(GRPCPathInfoService::from_client(client))
-        }
-        #[cfg(feature = "cloud")]
-        "bigtable" => {
-            use super::bigtable::BigtableParameters;
-            use super::BigtablePathInfoService;
-
-            // parse the instance name from the hostname.
-            let instance_name = url
-                .host_str()
-                .ok_or_else(|| Error::StorageError("instance name missing".into()))?
-                .to_string();
-
-            // … but add it to the query string now, so we just need to parse that.
-            url.query_pairs_mut()
-                .append_pair("instance_name", &instance_name);
-
-            let params: BigtableParameters = serde_qs::from_str(url.query().unwrap_or_default())
-                .map_err(|e| Error::InvalidRequest(format!("failed to parse parameters: {}", e)))?;
-
-            Box::new(
-                BigtablePathInfoService::connect(params)
-                    .await
-                    .map_err(|e| Error::StorageError(e.to_string()))?,
-            )
-        }
-        _ => Err(Error::StorageError(format!(
-            "unknown scheme: {}",
-            url.scheme()
-        )))?,
-    };
+    let path_info_service_config = with_registry(&REG, || {
+        <DeserializeWithRegistry<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>>>::try_from(
+            url,
+        )
+    })?
+    .0;
+    let path_info_service = path_info_service_config
+        .build("anonymous", context.unwrap_or(&CompositionContext::blank()))
+        .await?;
 
     Ok(path_info_service)
 }
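
As a sketch, the simplest call uses the in-memory backend, which needs no
composition context:

    let svc = from_addr("memory://", None).await?;
    assert!(svc.get([0; 20]).await?.is_none());
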
@@ -145,18 +58,18 @@ pub async fn from_addr(
 #[cfg(test)]
 mod tests {
     use super::from_addr;
+    use crate::composition::{Composition, DeserializeWithRegistry, ServiceBuilder};
     use lazy_static::lazy_static;
     use rstest::rstest;
-    use std::sync::Arc;
     use tempfile::TempDir;
-    use tvix_castore::{
-        blobservice::{BlobService, MemoryBlobService},
-        directoryservice::{DirectoryService, MemoryDirectoryService},
-    };
+    use tvix_castore::blobservice::{BlobService, MemoryBlobServiceConfig};
+    use tvix_castore::directoryservice::{DirectoryService, MemoryDirectoryServiceConfig};
 
     lazy_static! {
         static ref TMPDIR_SLED_1: TempDir = TempDir::new().unwrap();
         static ref TMPDIR_SLED_2: TempDir = TempDir::new().unwrap();
+        static ref TMPDIR_REDB_1: TempDir = TempDir::new().unwrap();
+        static ref TMPDIR_REDB_2: TempDir = TempDir::new().unwrap();
     }
 
     // the gRPC tests below don't fail, because we connect lazily.
@@ -182,6 +95,14 @@ mod tests {
     #[case::memory_invalid_root_path("memory:///", false)]
     /// This sets a memory url path to "/foo", which is invalid.
     #[case::memory_invalid_root_path_foo("memory:///foo", false)]
+    /// redb with a host and a valid path, which should fail.
+    #[case::redb_invalid_host_with_valid_path(&format!("redb://foo.example{}", &TMPDIR_REDB_1.path().join("bar").to_str().unwrap()), false)]
+    /// redb with / as path, which should fail.
+    #[case::redb_invalid_root("redb:///", false)]
+    /// This configures redb with a valid path, which should succeed.
+    #[case::redb_valid_path(&format!("redb://{}", &TMPDIR_REDB_2.path().join("foo").to_str().unwrap()), true)]
+    /// redb using the in-memory backend, which should succeed.
+    #[case::redb_valid_in_memory("redb://", true)]
     /// Correct Scheme for the cache.nixos.org binary cache.
     #[case::correct_nix_https("nix+https://cache.nixos.org", true)]
     /// Correct Scheme for the cache.nixos.org binary cache (HTTP URL).
@@ -221,11 +142,19 @@ mod tests {
     )]
     #[tokio::test]
     async fn test_from_addr_tokio(#[case] uri_str: &str, #[case] exp_succeed: bool) {
-        let blob_service: Arc<dyn BlobService> = Arc::from(MemoryBlobService::default());
-        let directory_service: Arc<dyn DirectoryService> =
-            Arc::from(MemoryDirectoryService::default());
-
-        let resp = from_addr(uri_str, blob_service, directory_service).await;
+        let mut comp = Composition::default();
+        comp.extend(vec![(
+            "default".into(),
+            DeserializeWithRegistry(Box::new(MemoryBlobServiceConfig {})
+                as Box<dyn ServiceBuilder<Output = dyn BlobService>>),
+        )]);
+        comp.extend(vec![(
+            "default".into(),
+            DeserializeWithRegistry(Box::new(MemoryDirectoryServiceConfig {})
+                as Box<dyn ServiceBuilder<Output = dyn DirectoryService>>),
+        )]);
+
+        let resp = from_addr(uri_str, Some(&comp.context())).await;
 
         if exp_succeed {
             resp.expect("should succeed");
diff --git a/tvix/store/src/pathinfoservice/fs/mod.rs b/tvix/store/src/pathinfoservice/fs/mod.rs
index aa64b1c01f16..1f7fa8a8afce 100644
--- a/tvix/store/src/pathinfoservice/fs/mod.rs
+++ b/tvix/store/src/pathinfoservice/fs/mod.rs
@@ -1,10 +1,10 @@
 use futures::stream::BoxStream;
 use futures::StreamExt;
+use nix_compat::store_path::StorePathRef;
 use tonic::async_trait;
 use tvix_castore::fs::{RootNodes, TvixStoreFs};
-use tvix_castore::proto as castorepb;
-use tvix_castore::Error;
 use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService};
+use tvix_castore::{Error, Node, PathComponent};
 
 use super::PathInfoService;
 
@@ -48,8 +48,8 @@ impl<T> RootNodes for RootNodesWrapper<T>
 where
     T: AsRef<dyn PathInfoService> + Send + Sync,
 {
-    async fn get_by_basename(&self, name: &[u8]) -> Result<Option<castorepb::node::Node>, Error> {
-        let Ok(store_path) = nix_compat::store_path::StorePath::from_bytes(name) else {
+    async fn get_by_basename(&self, name: &PathComponent) -> Result<Option<Node>, Error> {
+        let Ok(store_path) = StorePathRef::from_bytes(name.as_ref()) else {
             return Ok(None);
         };
 
@@ -59,22 +59,31 @@ where
             .get(*store_path.digest())
             .await?
             .map(|path_info| {
-                path_info
+                let node = path_info
                     .node
+                    .as_ref()
                     .expect("missing root node")
-                    .node
-                    .expect("empty node")
-            }))
+                    .to_owned();
+
+                match node.into_name_and_node() {
+                    Ok((_name, node)) => Ok(node),
+                    Err(e) => Err(Error::StorageError(e.to_string())),
+                }
+            })
+            .transpose()?)
     }
 
-    fn list(&self) -> BoxStream<Result<castorepb::node::Node, Error>> {
+    fn list(&self) -> BoxStream<Result<(PathComponent, Node), Error>> {
         Box::pin(self.0.as_ref().list().map(|result| {
-            result.map(|path_info| {
-                path_info
+            result.and_then(|path_info| {
+                let node = path_info
                     .node
+                    .as_ref()
                     .expect("missing root node")
-                    .node
-                    .expect("empty node")
+                    .to_owned();
+
+                node.into_name_and_node()
+                    .map_err(|e| Error::StorageError(e.to_string()))
             })
         }))
     }
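
Both methods rely on the same proto-node split; in isolation it looks like this
(a sketch; from_name_and_node appears further down in this diff, and
into_name_and_node is its inverse):

    let proto_node = tvix_castore::proto::Node::from_name_and_node("".into(), node);
    let (name, node) = proto_node.into_name_and_node()?; // (PathComponent, Node)
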
diff --git a/tvix/store/src/pathinfoservice/grpc.rs b/tvix/store/src/pathinfoservice/grpc.rs
index 93d2d67c31dc..7510ccd911f0 100644
--- a/tvix/store/src/pathinfoservice/grpc.rs
+++ b/tvix/store/src/pathinfoservice/grpc.rs
@@ -6,30 +6,40 @@ use crate::{
 use async_stream::try_stream;
 use futures::stream::BoxStream;
 use nix_compat::nixbase32;
-use tonic::{async_trait, transport::Channel, Code};
-use tracing::instrument;
-use tvix_castore::{proto as castorepb, Error};
+use std::sync::Arc;
+use tonic::{async_trait, Code};
+use tracing::{instrument, Span};
+use tracing_indicatif::span_ext::IndicatifSpanExt;
+use tvix_castore::composition::{CompositionContext, ServiceBuilder};
+use tvix_castore::Error;
+use tvix_castore::Node;
 
 /// Connects to a (remote) tvix-store PathInfoService over gRPC.
 #[derive(Clone)]
-pub struct GRPCPathInfoService {
+pub struct GRPCPathInfoService<T> {
     /// The internal reference to a gRPC client.
     /// Cloning it is cheap, and it internally handles concurrent requests.
-    grpc_client: proto::path_info_service_client::PathInfoServiceClient<Channel>,
+    grpc_client: proto::path_info_service_client::PathInfoServiceClient<T>,
 }
 
-impl GRPCPathInfoService {
+impl<T> GRPCPathInfoService<T> {
     /// construct a [GRPCPathInfoService] from a [proto::path_info_service_client::PathInfoServiceClient].
     /// panics if called outside the context of a tokio runtime.
     pub fn from_client(
-        grpc_client: proto::path_info_service_client::PathInfoServiceClient<Channel>,
+        grpc_client: proto::path_info_service_client::PathInfoServiceClient<T>,
     ) -> Self {
         Self { grpc_client }
     }
 }
 
 #[async_trait]
-impl PathInfoService for GRPCPathInfoService {
+impl<T> PathInfoService for GRPCPathInfoService<T>
+where
+    T: tonic::client::GrpcService<tonic::body::BoxBody> + Send + Sync + Clone + 'static,
+    T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static,
+    <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send,
+    T::Future: Send,
+{
     #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))]
     async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
         let path_info = self
@@ -106,18 +116,26 @@ impl PathInfoService for GRPCPathInfoService {
 }
 
 #[async_trait]
-impl NarCalculationService for GRPCPathInfoService {
-    #[instrument(level = "trace", skip_all, fields(root_node = ?root_node))]
-    async fn calculate_nar(
-        &self,
-        root_node: &castorepb::node::Node,
-    ) -> Result<(u64, [u8; 32]), Error> {
+impl<T> NarCalculationService for GRPCPathInfoService<T>
+where
+    T: tonic::client::GrpcService<tonic::body::BoxBody> + Send + Sync + Clone + 'static,
+    T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static,
+    <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send,
+    T::Future: Send,
+{
+    #[instrument(level = "trace", skip_all, fields(root_node = ?root_node, indicatif.pb_show=1))]
+    async fn calculate_nar(&self, root_node: &Node) -> Result<(u64, [u8; 32]), Error> {
+        let span = Span::current();
+        span.pb_set_message("Waiting for NAR calculation");
+        span.pb_start();
+
         let path_info = self
             .grpc_client
             .clone()
-            .calculate_nar(castorepb::Node {
-                node: Some(root_node.clone()),
-            })
+            .calculate_nar(tvix_castore::proto::Node::from_name_and_node(
+                "".into(),
+                root_node.to_owned(),
+            ))
             .await
             .map_err(|e| Error::StorageError(e.to_string()))?
             .into_inner();
@@ -132,9 +150,44 @@ impl NarCalculationService for GRPCPathInfoService {
     }
 }
 
+#[derive(serde::Deserialize, Debug)]
+#[serde(deny_unknown_fields)]
+pub struct GRPCPathInfoServiceConfig {
+    url: String,
+}
+
+impl TryFrom<url::Url> for GRPCPathInfoServiceConfig {
+    type Error = Box<dyn std::error::Error + Send + Sync>;
+    fn try_from(url: url::Url) -> Result<Self, Self::Error> {
+        // The scheme is normally grpc+unix for unix sockets, and grpc+http(s) for
+        // the HTTP counterparts.
+        // - In the case of unix sockets, there must be a path, but no host.
+        // - In the case of non-unix sockets, there must be a host, but no path.
+        // Constructing the channel is handled by tvix_castore::channel::from_url.
+        Ok(GRPCPathInfoServiceConfig {
+            url: url.to_string(),
+        })
+    }
+}
+
+#[async_trait]
+impl ServiceBuilder for GRPCPathInfoServiceConfig {
+    type Output = dyn PathInfoService;
+    async fn build<'a>(
+        &'a self,
+        _instance_name: &str,
+        _context: &CompositionContext,
+    ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
+        let client = proto::path_info_service_client::PathInfoServiceClient::new(
+            tvix_castore::tonic::channel_from_url(&self.url.parse()?).await?,
+        );
+        Ok(Arc::new(GRPCPathInfoService::from_client(client)))
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use crate::pathinfoservice::tests::make_grpc_path_info_service_client;
+    use crate::pathinfoservice::PathInfoService;
     use crate::tests::fixtures;
 
     /// This ensures connecting via gRPC works as expected.
diff --git a/tvix/store/src/pathinfoservice/lru.rs b/tvix/store/src/pathinfoservice/lru.rs
index da674f497ad6..695c04636089 100644
--- a/tvix/store/src/pathinfoservice/lru.rs
+++ b/tvix/store/src/pathinfoservice/lru.rs
@@ -9,6 +9,7 @@ use tonic::async_trait;
 use tracing::instrument;
 
 use crate::proto::PathInfo;
+use tvix_castore::composition::{CompositionContext, ServiceBuilder};
 use tvix_castore::Error;
 
 use super::PathInfoService;
@@ -60,6 +61,34 @@ impl PathInfoService for LruPathInfoService {
     }
 }
 
+#[derive(serde::Deserialize, Debug)]
+#[serde(deny_unknown_fields)]
+pub struct LruPathInfoServiceConfig {
+    capacity: NonZeroUsize,
+}
+
+impl TryFrom<url::Url> for LruPathInfoServiceConfig {
+    type Error = Box<dyn std::error::Error + Send + Sync>;
+    fn try_from(_url: url::Url) -> Result<Self, Self::Error> {
+        Err(Error::StorageError(
+            "Instantiating a LruPathInfoService from a url is not supported".into(),
+        )
+        .into())
+    }
+}
+
+#[async_trait]
+impl ServiceBuilder for LruPathInfoServiceConfig {
+    type Output = dyn PathInfoService;
+    async fn build<'a>(
+        &'a self,
+        _instance_name: &str,
+        _context: &CompositionContext,
+    ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
+        Ok(Arc::new(LruPathInfoService::with_capacity(self.capacity)))
+    }
+}
+
 #[cfg(test)]
 mod test {
     use std::num::NonZeroUsize;
@@ -79,8 +108,17 @@ mod test {
             let mut p = PATHINFO_1.clone();
             let root_node = p.node.as_mut().unwrap();
             if let castorepb::Node { node: Some(node) } = root_node {
-                let n = node.to_owned();
-                *node = n.rename("11111111111111111111111111111111-dummy2".into());
+                match node {
+                    castorepb::node::Node::Directory(n) => {
+                        n.name = "11111111111111111111111111111111-dummy2".into()
+                    }
+                    castorepb::node::Node::File(n) => {
+                        n.name = "11111111111111111111111111111111-dummy2".into()
+                    }
+                    castorepb::node::Node::Symlink(n) => {
+                        n.name = "11111111111111111111111111111111-dummy2".into()
+                    }
+                }
             } else {
                 unreachable!()
             }
diff --git a/tvix/store/src/pathinfoservice/memory.rs b/tvix/store/src/pathinfoservice/memory.rs
index 3de3221df27e..3fabd239c7b1 100644
--- a/tvix/store/src/pathinfoservice/memory.rs
+++ b/tvix/store/src/pathinfoservice/memory.rs
@@ -7,6 +7,7 @@ use std::{collections::HashMap, sync::Arc};
 use tokio::sync::RwLock;
 use tonic::async_trait;
 use tracing::instrument;
+use tvix_castore::composition::{CompositionContext, ServiceBuilder};
 use tvix_castore::Error;
 
 #[derive(Default)]
@@ -59,3 +60,30 @@ impl PathInfoService for MemoryPathInfoService {
         })
     }
 }
+
+#[derive(serde::Deserialize, Debug)]
+#[serde(deny_unknown_fields)]
+pub struct MemoryPathInfoServiceConfig {}
+
+impl TryFrom<url::Url> for MemoryPathInfoServiceConfig {
+    type Error = Box<dyn std::error::Error + Send + Sync>;
+    fn try_from(url: url::Url) -> Result<Self, Self::Error> {
+        // memory doesn't support host or path in the URL.
+        if url.has_host() || !url.path().is_empty() {
+            return Err(Error::StorageError("invalid url".to_string()).into());
+        }
+        Ok(MemoryPathInfoServiceConfig {})
+    }
+}
+
+#[async_trait]
+impl ServiceBuilder for MemoryPathInfoServiceConfig {
+    type Output = dyn PathInfoService;
+    async fn build<'a>(
+        &'a self,
+        _instance_name: &str,
+        _context: &CompositionContext,
+    ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
+        Ok(Arc::new(MemoryPathInfoService::default()))
+    }
+}
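For context, a minimal sketch of the in-memory service in isolation, assuming an async context and the PathInfoService trait in scope:

let svc = MemoryPathInfoService::default();
// An all-zero output-hash digest was never inserted,
// so lookup yields Ok(None) rather than an error.
assert!(svc.get([0u8; 20]).await.unwrap().is_none());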
diff --git a/tvix/store/src/pathinfoservice/mod.rs b/tvix/store/src/pathinfoservice/mod.rs
index 574bcc0b8b88..06ff74b519d8 100644
--- a/tvix/store/src/pathinfoservice/mod.rs
+++ b/tvix/store/src/pathinfoservice/mod.rs
@@ -4,6 +4,8 @@ mod grpc;
 mod lru;
 mod memory;
 mod nix_http;
+mod redb;
+mod signing_wrapper;
 mod sled;
 
 #[cfg(any(feature = "fuse", feature = "virtiofs"))]
@@ -14,22 +16,31 @@ mod tests;
 
 use futures::stream::BoxStream;
 use tonic::async_trait;
+use tvix_castore::composition::{Registry, ServiceBuilder};
 use tvix_castore::Error;
 
+use crate::nar::NarCalculationService;
 use crate::proto::PathInfo;
 
-pub use self::combinators::Cache as CachePathInfoService;
+pub use self::combinators::{
+    Cache as CachePathInfoService, CacheConfig as CachePathInfoServiceConfig,
+};
 pub use self::from_addr::from_addr;
-pub use self::grpc::GRPCPathInfoService;
-pub use self::lru::LruPathInfoService;
-pub use self::memory::MemoryPathInfoService;
-pub use self::nix_http::NixHTTPPathInfoService;
-pub use self::sled::SledPathInfoService;
+pub use self::grpc::{GRPCPathInfoService, GRPCPathInfoServiceConfig};
+pub use self::lru::{LruPathInfoService, LruPathInfoServiceConfig};
+pub use self::memory::{MemoryPathInfoService, MemoryPathInfoServiceConfig};
+pub use self::nix_http::{NixHTTPPathInfoService, NixHTTPPathInfoServiceConfig};
+pub use self::redb::{RedbPathInfoService, RedbPathInfoServiceConfig};
+pub use self::signing_wrapper::{KeyFileSigningPathInfoServiceConfig, SigningPathInfoService};
+pub use self::sled::{SledPathInfoService, SledPathInfoServiceConfig};
+
+#[cfg(test)]
+pub(crate) use self::signing_wrapper::test_signing_service;
 
 #[cfg(feature = "cloud")]
 mod bigtable;
 #[cfg(feature = "cloud")]
-pub use self::bigtable::BigtablePathInfoService;
+pub use self::bigtable::{BigtableParameters, BigtablePathInfoService};
 
 #[cfg(any(feature = "fuse", feature = "virtiofs"))]
 pub use self::fs::make_fs;
@@ -52,12 +63,16 @@ pub trait PathInfoService: Send + Sync {
     /// Rust doesn't support this as a generic in traits yet. This is the same thing that
     /// [async_trait] generates, but for streams instead of futures.
     fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>>;
+
+    fn nar_calculation_service(&self) -> Option<Box<dyn NarCalculationService>> {
+        None
+    }
 }
 
 #[async_trait]
 impl<A> PathInfoService for A
 where
-    A: AsRef<dyn PathInfoService> + Send + Sync,
+    A: AsRef<dyn PathInfoService> + Send + Sync + 'static,
 {
     async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
         self.as_ref().get(digest).await
@@ -71,3 +86,21 @@ where
         self.as_ref().list()
     }
 }
+
+/// Registers the builtin PathInfoService implementations with the registry
+pub(crate) fn register_pathinfo_services(reg: &mut Registry) {
+    reg.register::<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>, CachePathInfoServiceConfig>("cache");
+    reg.register::<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>, GRPCPathInfoServiceConfig>("grpc");
+    reg.register::<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>, LruPathInfoServiceConfig>("lru");
+    reg.register::<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>, MemoryPathInfoServiceConfig>("memory");
+    reg.register::<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>, NixHTTPPathInfoServiceConfig>("nix");
+    reg.register::<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>, SledPathInfoServiceConfig>("sled");
+    reg.register::<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>, RedbPathInfoServiceConfig>("redb");
+    reg.register::<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>, KeyFileSigningPathInfoServiceConfig>("keyfile-signing");
+    #[cfg(feature = "cloud")]
+    {
+        reg.register::<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>, BigtableParameters>(
+            "bigtable",
+        );
+    }
+}
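The same registration call is the extension point for additional backends; a hypothetical sketch (MyConfig and the "my-backend" tag are illustrative, not part of this change):

// MyConfig: a serde::Deserialize type with a ServiceBuilder impl whose
// Output is dyn PathInfoService; "my-backend" is the tag used in configs.
reg.register::<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>, MyConfig>("my-backend");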
diff --git a/tvix/store/src/pathinfoservice/nix_http.rs b/tvix/store/src/pathinfoservice/nix_http.rs
index cccd4805c6ca..5f1eed1a0a9f 100644
--- a/tvix/store/src/pathinfoservice/nix_http.rs
+++ b/tvix/store/src/pathinfoservice/nix_http.rs
@@ -1,3 +1,5 @@
+use super::PathInfoService;
+use crate::{nar::ingest_nar_and_hash, proto::PathInfo};
 use futures::{stream::BoxStream, TryStreamExt};
 use nix_compat::{
     narinfo::{self, NarInfo},
@@ -5,19 +7,15 @@ use nix_compat::{
     nixhash::NixHash,
 };
 use reqwest::StatusCode;
-use sha2::Digest;
-use std::io::{self, Write};
-use tokio::io::{AsyncRead, BufReader};
-use tokio_util::io::InspectReader;
+use std::sync::Arc;
+use tokio::io::{self, AsyncRead};
 use tonic::async_trait;
 use tracing::{debug, instrument, warn};
+use tvix_castore::composition::{CompositionContext, ServiceBuilder};
 use tvix_castore::{
     blobservice::BlobService, directoryservice::DirectoryService, proto as castorepb, Error,
 };
-
-use crate::proto::PathInfo;
-
-use super::PathInfoService;
+use url::Url;
 
 /// NixHTTPPathInfoService acts as a bridge in between the Nix HTTP Binary cache
 /// protocol provided by Nix binary caches such as cache.nixos.org, and the Tvix
@@ -36,21 +34,23 @@ use super::PathInfoService;
 /// TODO: what about reading from nix-cache-info?
 pub struct NixHTTPPathInfoService<BS, DS> {
     base_url: url::Url,
-    http_client: reqwest::Client,
+    http_client: reqwest_middleware::ClientWithMiddleware,
 
     blob_service: BS,
     directory_service: DS,
 
     /// An optional list of [narinfo::PubKey].
     /// If set, the .narinfo files received need to have correct signature by at least one of these.
-    public_keys: Option<Vec<narinfo::PubKey>>,
+    public_keys: Option<Vec<narinfo::VerifyingKey>>,
 }
 
 impl<BS, DS> NixHTTPPathInfoService<BS, DS> {
     pub fn new(base_url: url::Url, blob_service: BS, directory_service: DS) -> Self {
         Self {
             base_url,
-            http_client: reqwest::Client::new(),
+            http_client: reqwest_middleware::ClientBuilder::new(reqwest::Client::new())
+                .with(tvix_tracing::propagate::reqwest::tracing_middleware())
+                .build(),
             blob_service,
             directory_service,
 
@@ -59,7 +59,7 @@ impl<BS, DS> NixHTTPPathInfoService<BS, DS> {
     }
 
     /// Configures [Self] to validate NARInfo fingerprints with the public keys passed.
-    pub fn set_public_keys(&mut self, public_keys: Vec<narinfo::PubKey>) {
+    pub fn set_public_keys(&mut self, public_keys: Vec<narinfo::VerifyingKey>) {
         self.public_keys = Some(public_keys);
     }
 }
@@ -178,9 +178,9 @@ where
         }));
 
         // handle decompression, depending on the compression field.
-        let r: Box<dyn AsyncRead + Send + Unpin> = match narinfo.compression {
-            Some("none") => Box::new(r) as Box<dyn AsyncRead + Send + Unpin>,
-            Some("bzip2") | None => Box::new(async_compression::tokio::bufread::BzDecoder::new(r))
+        let mut r: Box<dyn AsyncRead + Send + Unpin> = match narinfo.compression {
+            None => Box::new(r) as Box<dyn AsyncRead + Send + Unpin>,
+            Some("bzip2") => Box::new(async_compression::tokio::bufread::BzDecoder::new(r))
                 as Box<dyn AsyncRead + Send + Unpin>,
             Some("gzip") => Box::new(async_compression::tokio::bufread::GzipDecoder::new(r))
                 as Box<dyn AsyncRead + Send + Unpin>,
@@ -194,19 +194,8 @@ where
                 )));
             }
         };
-        let mut nar_hash = sha2::Sha256::new();
-        let mut nar_size = 0;
-
-        // Assemble NarHash and NarSize as we read bytes.
-        let r = InspectReader::new(r, |b| {
-            nar_size += b.len() as u64;
-            nar_hash.write_all(b).unwrap();
-        });
-
-        // HACK: InspectReader doesn't implement AsyncBufRead, but neither do our decompressors.
-        let mut r = BufReader::new(r);
 
-        let root_node = crate::nar::ingest_nar(
+        let (root_node, nar_hash, nar_size) = ingest_nar_and_hash(
             self.blob_service.clone(),
             self.directory_service.clone(),
             &mut r,
@@ -226,7 +215,6 @@ where
                 "NarSize mismatch".to_string(),
             ))?;
         }
-        let nar_hash: [u8; 32] = nar_hash.finalize().into();
         if narinfo.nar_hash != nar_hash {
             warn!(
                 narinfo.nar_hash = %NixHash::Sha256(narinfo.nar_hash),
@@ -240,10 +228,10 @@ where
         }
 
         Ok(Some(PathInfo {
-            node: Some(castorepb::Node {
-                // set the name of the root node to the digest-name of the store path.
-                node: Some(root_node.rename(narinfo.store_path.to_string().to_owned().into())),
-            }),
+            node: Some(castorepb::Node::from_name_and_node(
+                narinfo.store_path.to_string().into(),
+                root_node,
+            )),
             references: pathinfo.references,
             narinfo: pathinfo.narinfo,
         }))
@@ -264,3 +252,71 @@ where
         }))
     }
 }
+
+#[derive(serde::Deserialize)]
+pub struct NixHTTPPathInfoServiceConfig {
+    base_url: String,
+    blob_service: String,
+    directory_service: String,
+    #[serde(default)]
+    /// An optional list of [narinfo::VerifyingKey] strings.
+    /// If set, the .narinfo files received need a correct signature from at least one of these.
+    public_keys: Option<Vec<String>>,
+}
+
+impl TryFrom<Url> for NixHTTPPathInfoServiceConfig {
+    type Error = Box<dyn std::error::Error + Send + Sync>;
+    fn try_from(url: Url) -> Result<Self, Self::Error> {
+        let mut public_keys: Option<Vec<String>> = None;
+        for (_, v) in url
+            .query_pairs()
+            .into_iter()
+            .filter(|(k, _)| k == "trusted-public-keys")
+        {
+            public_keys
+                .get_or_insert(Default::default())
+                .extend(v.split_ascii_whitespace().map(ToString::to_string));
+        }
+        Ok(NixHTTPPathInfoServiceConfig {
+            // Stringify the URL and remove the nix+ prefix.
+            // We can't use `url.set_scheme(rest)`, as it refuses to
+            // change a non-http(s) scheme into an http(s) one.
+            base_url: url.to_string().strip_prefix("nix+").unwrap().to_string(),
+            blob_service: "default".to_string(),
+            directory_service: "default".to_string(),
+            public_keys,
+        })
+    }
+}
+
+#[async_trait]
+impl ServiceBuilder for NixHTTPPathInfoServiceConfig {
+    type Output = dyn PathInfoService;
+    async fn build<'a>(
+        &'a self,
+        _instance_name: &str,
+        context: &CompositionContext,
+    ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
+        let (blob_service, directory_service) = futures::join!(
+            context.resolve(self.blob_service.clone()),
+            context.resolve(self.directory_service.clone())
+        );
+        let mut svc = NixHTTPPathInfoService::new(
+            Url::parse(&self.base_url)?,
+            blob_service?,
+            directory_service?,
+        );
+        if let Some(public_keys) = &self.public_keys {
+            svc.set_public_keys(
+                public_keys
+                    .iter()
+                    .map(|pubkey_str| {
+                        narinfo::VerifyingKey::parse(pubkey_str)
+                            .map_err(|e| Error::StorageError(format!("invalid public key: {e}")))
+                    })
+                    .collect::<Result<Vec<_>, Error>>()?,
+            );
+        }
+        Ok(Arc::new(svc))
+    }
+}
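A sketch of the URL form this TryFrom impl accepts; the trusted-public-keys value here is the well-known cache.nixos.org key, and the field contents are described in comments since they are private:

use url::Url;

let url = Url::parse(
    "nix+https://cache.nixos.org?trusted-public-keys=cache.nixos.org-1:6NCHdD59X431o0gWypbMrAURkbJ16ZPMQFGspcDShjY=",
)
.unwrap();
let cfg = NixHTTPPathInfoServiceConfig::try_from(url).unwrap();
// base_url is the same URL with the nix+ prefix stripped, blob_service and
// directory_service point at "default", and public_keys holds the key string,
// parsed into a narinfo::VerifyingKey when build() runs.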
diff --git a/tvix/store/src/pathinfoservice/redb.rs b/tvix/store/src/pathinfoservice/redb.rs
new file mode 100644
index 000000000000..bd0e0fc2b686
--- /dev/null
+++ b/tvix/store/src/pathinfoservice/redb.rs
@@ -0,0 +1,218 @@
+use super::PathInfoService;
+use crate::proto::PathInfo;
+use data_encoding::BASE64;
+use futures::{stream::BoxStream, StreamExt};
+use prost::Message;
+use redb::{Database, ReadableTable, TableDefinition};
+use std::{path::PathBuf, sync::Arc};
+use tokio_stream::wrappers::ReceiverStream;
+use tonic::async_trait;
+use tracing::{instrument, warn};
+use tvix_castore::{
+    composition::{CompositionContext, ServiceBuilder},
+    Error,
+};
+
+const PATHINFO_TABLE: TableDefinition<[u8; 20], Vec<u8>> = TableDefinition::new("pathinfo");
+
+/// PathInfoService implementation using redb under the hood.
+/// redb stores all of its data in a single file, with a K/V table mapping a store path's
+/// output hash to its corresponding protobuf-encoded PathInfo.
+pub struct RedbPathInfoService {
+    // We wrap db in an Arc to be able to move it into spawn_blocking,
+    // as discussed in https://github.com/cberner/redb/issues/789
+    db: Arc<Database>,
+}
+
+impl RedbPathInfoService {
+    /// Constructs a new instance using the specified file system path for
+    /// storage.
+    pub async fn new(path: PathBuf) -> Result<Self, Error> {
+        if path == PathBuf::from("/") {
+            return Err(Error::StorageError(
+                "cowardly refusing to open / with redb".to_string(),
+            ));
+        }
+
+        let db = tokio::task::spawn_blocking(|| -> Result<_, redb::Error> {
+            let db = redb::Database::create(path)?;
+            create_schema(&db)?;
+            Ok(db)
+        })
+        .await??;
+
+        Ok(Self { db: Arc::new(db) })
+    }
+
+    /// Constructs a new instance using the in-memory backend.
+    pub fn new_temporary() -> Result<Self, Error> {
+        let db =
+            redb::Database::builder().create_with_backend(redb::backends::InMemoryBackend::new())?;
+
+        create_schema(&db)?;
+
+        Ok(Self { db: Arc::new(db) })
+    }
+}
+
+/// Ensures all tables are present.
+/// Opens a write transaction and calls open_table on PATHINFO_TABLE, which will
+/// create it if not present.
+fn create_schema(db: &redb::Database) -> Result<(), redb::Error> {
+    let txn = db.begin_write()?;
+    txn.open_table(PATHINFO_TABLE)?;
+    txn.commit()?;
+
+    Ok(())
+}
+
+#[async_trait]
+impl PathInfoService for RedbPathInfoService {
+    #[instrument(level = "trace", skip_all, fields(path_info.digest = BASE64.encode(&digest)))]
+    async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
+        let db = self.db.clone();
+
+        tokio::task::spawn_blocking({
+            move || {
+                let txn = db.begin_read()?;
+                let table = txn.open_table(PATHINFO_TABLE)?;
+                match table.get(digest)? {
+                    Some(pathinfo_bytes) => Ok(Some(
+                        PathInfo::decode(pathinfo_bytes.value().as_slice()).map_err(|e| {
+                            warn!(err=%e, "failed to decode stored PathInfo");
+                            Error::StorageError("failed to decode stored PathInfo".to_string())
+                        })?,
+                    )),
+                    None => Ok(None),
+                }
+            }
+        })
+        .await?
+    }
+
+    #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node))]
+    async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> {
+        // Call validate on the received PathInfo message.
+        let store_path = path_info
+            .validate()
+            .map_err(|e| {
+                warn!(err=%e, "failed to validate PathInfo");
+                Error::StorageError("failed to validate PathInfo".to_string())
+            })?
+            .to_owned();
+
+        let path_info_encoded = path_info.encode_to_vec();
+        let db = self.db.clone();
+
+        tokio::task::spawn_blocking({
+            move || -> Result<(), Error> {
+                let txn = db.begin_write()?;
+                {
+                    let mut table = txn.open_table(PATHINFO_TABLE)?;
+                    table
+                        .insert(store_path.digest(), path_info_encoded)
+                        .map_err(|e| {
+                            warn!(err=%e, "failed to insert PathInfo");
+                            Error::StorageError("failed to insert PathInfo".to_string())
+                        })?;
+                }
+                Ok(txn.commit()?)
+            }
+        })
+        .await??;
+
+        Ok(path_info)
+    }
+
+    fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
+        let db = self.db.clone();
+        let (tx, rx) = tokio::sync::mpsc::channel(50);
+
+        // Spawn a blocking task which writes all PathInfos to tx.
+        tokio::task::spawn_blocking({
+            move || -> Result<(), Error> {
+                let read_txn = db.begin_read()?;
+                let table = read_txn.open_table(PATHINFO_TABLE)?;
+
+                for elem in table.iter()? {
+                    let elem = elem?;
+                    tokio::runtime::Handle::current()
+                        .block_on(tx.send(Ok(
+                            PathInfo::decode(elem.1.value().as_slice()).map_err(|e| {
+                                warn!(err=%e, "invalid PathInfo");
+                                Error::StorageError("invalid PathInfo".to_string())
+                            })?,
+                        )))
+                        .map_err(|e| Error::StorageError(e.to_string()))?;
+                }
+
+                Ok(())
+            }
+        });
+
+        ReceiverStream::from(rx).boxed()
+    }
+}
+
+#[derive(serde::Deserialize)]
+#[serde(deny_unknown_fields)]
+pub struct RedbPathInfoServiceConfig {
+    is_temporary: bool,
+    #[serde(default)]
+    /// Required when is_temporary = false.
+    path: Option<PathBuf>,
+}
+
+impl TryFrom<url::Url> for RedbPathInfoServiceConfig {
+    type Error = Box<dyn std::error::Error + Send + Sync>;
+    fn try_from(url: url::Url) -> Result<Self, Self::Error> {
+        // redb doesn't support a host, and a path can be provided (otherwise it'll live in memory only).
+        if url.has_host() {
+            return Err(Error::StorageError("no host allowed".to_string()).into());
+        }
+
+        Ok(if url.path().is_empty() {
+            RedbPathInfoServiceConfig {
+                is_temporary: true,
+                path: None,
+            }
+        } else {
+            RedbPathInfoServiceConfig {
+                is_temporary: false,
+                path: Some(url.path().into()),
+            }
+        })
+    }
+}
+
+#[async_trait]
+impl ServiceBuilder for RedbPathInfoServiceConfig {
+    type Output = dyn PathInfoService;
+    async fn build<'a>(
+        &'a self,
+        _instance_name: &str,
+        _context: &CompositionContext,
+    ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
+        match self {
+            RedbPathInfoServiceConfig {
+                is_temporary: true,
+                path: None,
+            } => Ok(Arc::new(RedbPathInfoService::new_temporary()?)),
+            RedbPathInfoServiceConfig {
+                is_temporary: true,
+                path: Some(_),
+            } => Err(
+                Error::StorageError("Temporary RedbPathInfoService can not have path".into())
+                    .into(),
+            ),
+            RedbPathInfoServiceConfig {
+                is_temporary: false,
+                path: None,
+            } => Err(Error::StorageError("RedbPathInfoService is missing path".into()).into()),
+            RedbPathInfoServiceConfig {
+                is_temporary: false,
+                path: Some(path),
+            } => Ok(Arc::new(RedbPathInfoService::new(path.to_owned()).await?)),
+        }
+    }
+}
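A short sketch of the two constructors added above; the path is an arbitrary example, and the `?` operator assumes a surrounding function with a compatible error type:

// Persistent: creates (or opens) the single-file database and its schema.
let svc = RedbPathInfoService::new("/var/lib/tvix/pathinfo.redb".into()).await?;

// Temporary: in-memory backend, handy for tests.
let tmp = RedbPathInfoService::new_temporary()?;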
diff --git a/tvix/store/src/pathinfoservice/signing_wrapper.rs b/tvix/store/src/pathinfoservice/signing_wrapper.rs
new file mode 100644
index 000000000000..7f754ec49849
--- /dev/null
+++ b/tvix/store/src/pathinfoservice/signing_wrapper.rs
@@ -0,0 +1,204 @@
+//! This module provides a [PathInfoService] implementation that signs narinfos
+
+use super::PathInfoService;
+use crate::proto::PathInfo;
+use futures::stream::BoxStream;
+use std::path::PathBuf;
+use std::sync::Arc;
+use tonic::async_trait;
+
+use tvix_castore::composition::{CompositionContext, ServiceBuilder};
+
+use tvix_castore::Error;
+
+use nix_compat::narinfo::{parse_keypair, SigningKey};
+use nix_compat::nixbase32;
+use tracing::{instrument, warn};
+
+#[cfg(test)]
+use super::MemoryPathInfoService;
+
+/// PathInfoService that wraps an inner [PathInfoService]. When put is called, it
+/// extracts the underlying narinfo and signs it using a [SigningKey]. For the moment,
+/// only [ed25519::signature::Signer<ed25519::Signature>] is available, using a keyfile
+/// (see [KeyFileSigningPathInfoServiceConfig] for more information). However, the
+/// implementation is generic (see the [nix_compat::narinfo::SigningKey] documentation).
+///
+/// The [PathInfo] with the added signature is then put into the inner [PathInfoService].
+///
+/// The service signs the [PathInfo] **only if it has a narinfo attribute**.
+pub struct SigningPathInfoService<T, S> {
+    /// The inner [PathInfoService]
+    inner: T,
+    /// The key to sign narinfos
+    signing_key: Arc<SigningKey<S>>,
+}
+
+impl<T, S> SigningPathInfoService<T, S> {
+    pub fn new(inner: T, signing_key: Arc<SigningKey<S>>) -> Self {
+        Self { inner, signing_key }
+    }
+}
+
+#[async_trait]
+impl<T, S> PathInfoService for SigningPathInfoService<T, S>
+where
+    T: PathInfoService,
+    S: ed25519::signature::Signer<ed25519::Signature> + Sync + Send,
+{
+    #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))]
+    async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
+        self.inner.get(digest).await
+    }
+
+    async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> {
+        let store_path = path_info.validate().map_err(|e| {
+            warn!(err=%e, "invalid PathInfo");
+            Error::StorageError(e.to_string())
+        })?;
+        let root_node = path_info.node.clone();
+        // If we have a narinfo, sign it; otherwise pass it through to the inner PathInfoService.
+        let path_info_to_put = match path_info.to_narinfo(store_path.as_ref()) {
+            Some(mut nar_info) => {
+                nar_info.add_signature(self.signing_key.as_ref());
+                let mut signed_path_info = PathInfo::from(&nar_info);
+                signed_path_info.node = root_node;
+                signed_path_info
+            }
+            None => path_info,
+        };
+        self.inner.put(path_info_to_put).await
+    }
+
+    fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
+        self.inner.list()
+    }
+}
+
+/// [ServiceBuilder] implementation that builds a [SigningPathInfoService] signing narinfos
+/// with a keyfile. The keyfile is parsed using [parse_keypair]; the expected format is the
+/// Nix one (see `nix-store --generate-binary-cache-key` for more information).
+#[derive(serde::Deserialize)]
+pub struct KeyFileSigningPathInfoServiceConfig {
+    /// Inner [PathInfoService], will be resolved using a [CompositionContext].
+    pub inner: String,
+    /// Path to the keyfile in the Nix format. It will be accessed once, when building the service.
+    pub keyfile: PathBuf,
+}
+
+impl TryFrom<url::Url> for KeyFileSigningPathInfoServiceConfig {
+    type Error = Box<dyn std::error::Error + Send + Sync>;
+    fn try_from(_url: url::Url) -> Result<Self, Self::Error> {
+        Err(Error::StorageError(
+            "Instantiating a SigningPathInfoService from a url is not supported".into(),
+        )
+        .into())
+    }
+}
+
+#[async_trait]
+impl ServiceBuilder for KeyFileSigningPathInfoServiceConfig {
+    type Output = dyn PathInfoService;
+    async fn build<'a>(
+        &'a self,
+        _instance_name: &str,
+        context: &CompositionContext,
+    ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
+        let inner = context.resolve(self.inner.clone()).await?;
+        let signing_key = Arc::new(
+            parse_keypair(tokio::fs::read_to_string(&self.keyfile).await?.trim())
+                .map_err(|e| Error::StorageError(e.to_string()))?
+                .0,
+        );
+        Ok(Arc::new(SigningPathInfoService { inner, signing_key }))
+    }
+}
+
+#[cfg(test)]
+pub(crate) fn test_signing_service() -> Arc<dyn PathInfoService> {
+    let memory_svc: Arc<dyn PathInfoService> = Arc::new(MemoryPathInfoService::default());
+    Arc::new(SigningPathInfoService {
+        inner: memory_svc,
+        signing_key: Arc::new(
+            parse_keypair(DUMMY_KEYPAIR)
+                .expect("DUMMY_KEYPAIR to be valid")
+                .0,
+        ),
+    })
+}
+
+#[cfg(test)]
+pub const DUMMY_KEYPAIR: &str = "do.not.use:sGPzxuK5WvWPraytx+6sjtaff866sYlfvErE6x0hFEhy5eqe7OVZ8ZMqZ/ME/HaRdKGNGvJkyGKXYTaeA6lR3A==";
+#[cfg(test)]
+pub const DUMMY_VERIFYING_KEY: &str = "do.not.use:cuXqnuzlWfGTKmfzBPx2kXShjRryZMhil2E2ngOpUdw=";
+
+#[cfg(test)]
+mod test {
+    use crate::{
+        pathinfoservice::PathInfoService,
+        proto::PathInfo,
+        tests::fixtures::{DUMMY_PATH, PATH_INFO_WITH_NARINFO},
+    };
+    use nix_compat::narinfo::VerifyingKey;
+
+    use lazy_static::lazy_static;
+    use nix_compat::store_path::StorePath;
+
+    lazy_static! {
+        static ref PATHINFO_1: PathInfo = PATH_INFO_WITH_NARINFO.clone();
+        static ref PATHINFO_1_DIGEST: [u8; 20] = [0; 20];
+    }
+
+    #[tokio::test]
+    async fn put_and_verify_signature() {
+        let svc = super::test_signing_service();
+
+        // pathinfo_1 should not be there ...
+        assert!(svc
+            .get(*PATHINFO_1_DIGEST)
+            .await
+            .expect("no error")
+            .is_none());
+
+        // ... and not be signed
+        assert!(PATHINFO_1.narinfo.clone().unwrap().signatures.is_empty());
+
+        // insert it
+        svc.put(PATHINFO_1.clone()).await.expect("no error");
+
+        // now it should be there ...
+        let signed = svc
+            .get(*PATHINFO_1_DIGEST)
+            .await
+            .expect("no error")
+            .unwrap();
+
+        // and signed
+        let narinfo = signed
+            .to_narinfo(
+                StorePath::from_bytes(DUMMY_PATH.as_bytes()).expect("DUMMY_PATH to be parsed"),
+            )
+            .expect("no error");
+        let fp = narinfo.fingerprint();
+
+        // load our keypair from the fixtures
+        let (signing_key, _verifying_key) =
+            super::parse_keypair(super::DUMMY_KEYPAIR).expect("must succeed");
+
+        // ensure the signature is added
+        let new_sig = narinfo
+            .signatures
+            .last()
+            .expect("The retrieved narinfo to be signed");
+        assert_eq!(signing_key.name(), *new_sig.name());
+
+        // verify the new signature against the verifying key
+        let verifying_key =
+            VerifyingKey::parse(super::DUMMY_VERIFYING_KEY).expect("parsing dummy verifying key");
+
+        assert!(
+            verifying_key.verify(&fp, new_sig),
+            "expect signature to be valid"
+        );
+    }
+}
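To wrap an arbitrary inner store outside the composition machinery, the constructor added above can be used directly; a sketch, with key material read from a Nix-format keyfile (the keyfile path is an arbitrary example):

use nix_compat::narinfo::parse_keypair;
use std::sync::Arc;

// `inner` is any PathInfoService.
let pair = std::fs::read_to_string("/run/secrets/cache-key").expect("readable keyfile");
let (signing_key, _verifying_key) = parse_keypair(pair.trim()).expect("valid keypair");
let svc = SigningPathInfoService::new(inner, Arc::new(signing_key));
// put() now attaches a signature to PathInfos carrying a narinfo;
// get() and list() pass through to `inner` unchanged.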
diff --git a/tvix/store/src/pathinfoservice/sled.rs b/tvix/store/src/pathinfoservice/sled.rs
index eb3cf2ff1b88..837eb9d079e1 100644
--- a/tvix/store/src/pathinfoservice/sled.rs
+++ b/tvix/store/src/pathinfoservice/sled.rs
@@ -5,9 +5,10 @@ use futures::stream::BoxStream;
 use nix_compat::nixbase32;
 use prost::Message;
 use std::path::Path;
+use std::sync::Arc;
 use tonic::async_trait;
-use tracing::instrument;
-use tracing::warn;
+use tracing::{instrument, warn};
+use tvix_castore::composition::{CompositionContext, ServiceBuilder};
 use tvix_castore::Error;
 
 /// SledPathInfoService stores PathInfo in a [sled](https://github.com/spacejam/sled).
@@ -20,6 +21,12 @@ pub struct SledPathInfoService {
 
 impl SledPathInfoService {
     pub fn new<P: AsRef<Path>>(p: P) -> Result<Self, sled::Error> {
+        if p.as_ref() == Path::new("/") {
+            return Err(sled::Error::Unsupported(
+                "cowardly refusing to open / with sled".to_string(),
+            ));
+        }
+
         let config = sled::Config::default()
             .use_compression(false) // is a required parameter
             .path(p);
@@ -115,3 +122,69 @@ impl PathInfoService for SledPathInfoService {
         })
     }
 }
+
+#[derive(serde::Deserialize)]
+#[serde(deny_unknown_fields)]
+pub struct SledPathInfoServiceConfig {
+    is_temporary: bool,
+    #[serde(default)]
+    /// Required when is_temporary = false.
+    path: Option<String>,
+}
+
+impl TryFrom<url::Url> for SledPathInfoServiceConfig {
+    type Error = Box<dyn std::error::Error + Send + Sync>;
+    fn try_from(url: url::Url) -> Result<Self, Self::Error> {
+        // sled doesn't support host, and a path can be provided (otherwise
+        // it'll live in memory only).
+        if url.has_host() {
+            return Err(Error::StorageError("no host allowed".to_string()).into());
+        }
+
+        // TODO: expose compression and other parameters as URL parameters?
+
+        Ok(if url.path().is_empty() {
+            SledPathInfoServiceConfig {
+                is_temporary: true,
+                path: None,
+            }
+        } else {
+            SledPathInfoServiceConfig {
+                is_temporary: false,
+                path: Some(url.path().to_string()),
+            }
+        })
+    }
+}
+
+#[async_trait]
+impl ServiceBuilder for SledPathInfoServiceConfig {
+    type Output = dyn PathInfoService;
+    async fn build<'a>(
+        &'a self,
+        _instance_name: &str,
+        _context: &CompositionContext,
+    ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
+        match self {
+            SledPathInfoServiceConfig {
+                is_temporary: true,
+                path: None,
+            } => Ok(Arc::new(SledPathInfoService::new_temporary()?)),
+            SledPathInfoServiceConfig {
+                is_temporary: true,
+                path: Some(_),
+            } => Err(
+                Error::StorageError("Temporary SledPathInfoService can not have path".into())
+                    .into(),
+            ),
+            SledPathInfoServiceConfig {
+                is_temporary: false,
+                path: None,
+            } => Err(Error::StorageError("SledPathInfoService is missing path".into()).into()),
+            SledPathInfoServiceConfig {
+                is_temporary: false,
+                path: Some(path),
+            } => Ok(Arc::new(SledPathInfoService::new(path)?)),
+        }
+    }
+}
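For reference, how the TryFrom<url::Url> impl above maps URLs onto configs (comments-only sketch):

// sled://               -> SledPathInfoServiceConfig { is_temporary: true,  path: None }
// sled:///tmp/pi.sled   -> SledPathInfoServiceConfig { is_temporary: false, path: Some("/tmp/pi.sled".into()) }
// sled://somehost/foo   -> Err("no host allowed")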
diff --git a/tvix/store/src/pathinfoservice/tests/mod.rs b/tvix/store/src/pathinfoservice/tests/mod.rs
index 26166d1b75ca..26e3a1f311e4 100644
--- a/tvix/store/src/pathinfoservice/tests/mod.rs
+++ b/tvix/store/src/pathinfoservice/tests/mod.rs
@@ -2,64 +2,38 @@
 //! We use [rstest] and [rstest_reuse] to provide all services we want to test
 //! against, and then apply this template to all test functions.
 
+use futures::TryStreamExt;
 use rstest::*;
 use rstest_reuse::{self, *};
-use std::sync::Arc;
-use tvix_castore::proto as castorepb;
-use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService};
 
 use super::PathInfoService;
+use crate::pathinfoservice::redb::RedbPathInfoService;
+use crate::pathinfoservice::MemoryPathInfoService;
+use crate::pathinfoservice::SledPathInfoService;
 use crate::proto::PathInfo;
 use crate::tests::fixtures::DUMMY_PATH_DIGEST;
+use tvix_castore::proto as castorepb;
+
+use crate::pathinfoservice::test_signing_service;
 
 mod utils;
 pub use self::utils::make_grpc_path_info_service_client;
 
-/// Convenience type alias batching all three services together.
-#[allow(clippy::upper_case_acronyms)]
-type BSDSPS = (
-    Arc<dyn BlobService>,
-    Arc<dyn DirectoryService>,
-    Box<dyn PathInfoService>,
-);
-
-/// Creates a PathInfoService using a new Memory{Blob,Directory}Service.
-/// We return a 3-tuple containing all of them, as some tests want to interact
-/// with all three.
-pub async fn make_path_info_service(uri: &str) -> BSDSPS {
-    let blob_service: Arc<dyn BlobService> = tvix_castore::blobservice::from_addr("memory://")
-        .await
-        .unwrap()
-        .into();
-    let directory_service: Arc<dyn DirectoryService> =
-        tvix_castore::directoryservice::from_addr("memory://")
-            .await
-            .unwrap()
-            .into();
-
-    (
-        blob_service.clone(),
-        directory_service.clone(),
-        crate::pathinfoservice::from_addr(uri, blob_service, directory_service)
-            .await
-            .unwrap(),
-    )
-}
+#[cfg(all(feature = "cloud", feature = "integration"))]
+use self::utils::make_bigtable_path_info_service;
 
 #[template]
 #[rstest]
-#[case::memory(make_path_info_service("memory://").await)]
-#[case::grpc(make_grpc_path_info_service_client().await)]
-#[case::sled(make_path_info_service("sled://").await)]
-#[cfg_attr(all(feature = "cloud",feature="integration"), case::bigtable(make_path_info_service("bigtable://instance-1?project_id=project-1&table_name=table-1&family_name=cf1").await))]
-pub fn path_info_services(
-    #[case] services: (
-        impl BlobService,
-        impl DirectoryService,
-        impl PathInfoService,
-    ),
-) {
-}
+#[case::memory(MemoryPathInfoService::default())]
+#[case::grpc({
+    let (_, _, svc) = make_grpc_path_info_service_client().await;
+    svc
+})]
+#[case::sled(SledPathInfoService::new_temporary().unwrap())]
+#[case::redb(RedbPathInfoService::new_temporary().unwrap())]
+#[case::signing(test_signing_service())]
+#[cfg_attr(all(feature = "cloud",feature="integration"), case::bigtable(make_bigtable_path_info_service().await))]
+pub fn path_info_services(#[case] svc: impl PathInfoService) {}
 
 // FUTUREWORK: add more tests rejecting invalid PathInfo messages.
 // A subset of them should also ensure references to other PathInfos, or
@@ -68,9 +42,8 @@ pub fn path_info_services(
 /// Trying to get a non-existent PathInfo should return Ok(None).
 #[apply(path_info_services)]
 #[tokio::test]
-async fn not_found(services: BSDSPS) {
-    let (_, _, path_info_service) = services;
-    assert!(path_info_service
+async fn not_found(svc: impl PathInfoService) {
+    assert!(svc
         .get(DUMMY_PATH_DIGEST)
         .await
         .expect("must succeed")
@@ -80,9 +53,7 @@ async fn not_found(services: BSDSPS) {
 /// Put a PathInfo into the store, get it back.
 #[apply(path_info_services)]
 #[tokio::test]
-async fn put_get(services: BSDSPS) {
-    let (_, _, path_info_service) = services;
-
+async fn put_get(svc: impl PathInfoService) {
     let path_info = PathInfo {
         node: Some(castorepb::Node {
             node: Some(castorepb::node::Node::Symlink(castorepb::SymlinkNode {
@@ -94,20 +65,21 @@ async fn put_get(services: BSDSPS) {
     };
 
     // insert
-    let resp = path_info_service
-        .put(path_info.clone())
-        .await
-        .expect("must succeed");
+    let resp = svc.put(path_info.clone()).await.expect("must succeed");
 
     // expect the returned PathInfo to be equal (for now)
     // in the future, some stores might add additional fields/signatures.
     assert_eq!(path_info, resp);
 
     // get it back
-    let resp = path_info_service
-        .get(DUMMY_PATH_DIGEST)
-        .await
-        .expect("must succeed");
+    let resp = svc.get(DUMMY_PATH_DIGEST).await.expect("must succeed");
+
+    assert_eq!(Some(path_info.clone()), resp);
+
+    // Ensure the listing endpoint works, and returns the same path_info.
+    // FUTUREWORK: split this, some impls might (rightfully) not support listing
+    let pathinfos: Vec<PathInfo> = svc.list().try_collect().await.expect("must succeed");
 
-    assert_eq!(Some(path_info), resp);
+    // We should get a single pathinfo back, the one we inserted.
+    assert_eq!(vec![path_info], pathinfos);
 }
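With the template reduced to a single service parameter, a backend-agnostic test now takes this shape (my_property is a placeholder name):

#[apply(path_info_services)]
#[tokio::test]
async fn my_property(svc: impl PathInfoService) {
    // Exercise `svc` against the shared PathInfoService contract here.
}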
diff --git a/tvix/store/src/pathinfoservice/tests/utils.rs b/tvix/store/src/pathinfoservice/tests/utils.rs
index 30c5902b610f..8b192e303b89 100644
--- a/tvix/store/src/pathinfoservice/tests/utils.rs
+++ b/tvix/store/src/pathinfoservice/tests/utils.rs
@@ -1,6 +1,8 @@
 use std::sync::Arc;
 
+use hyper_util::rt::TokioIo;
 use tonic::transport::{Endpoint, Server, Uri};
+use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService};
 
 use crate::{
     nar::{NarCalculationService, SimpleRenderer},
@@ -15,7 +17,11 @@ use crate::{
 /// Constructs and returns a gRPC PathInfoService.
 /// We also return memory-based {Blob,Directory}Service,
 /// as the consumer of this function accepts a 3-tuple.
-pub async fn make_grpc_path_info_service_client() -> super::BSDSPS {
+pub async fn make_grpc_path_info_service_client() -> (
+    impl BlobService,
+    impl DirectoryService,
+    GRPCPathInfoService<tonic::transport::Channel>,
+) {
     let (left, right) = tokio::io::duplex(64);
 
     let blob_service = blob_service();
@@ -47,18 +53,27 @@ pub async fn make_grpc_path_info_service_client() -> super::BSDSPS {
     // Create a client, connecting to the right side. The URI is unused.
     let mut maybe_right = Some(right);
 
-    let path_info_service = Box::new(GRPCPathInfoService::from_client(
-        PathInfoServiceClient::new(
-            Endpoint::try_from("http://[::]:50051")
-                .unwrap()
-                .connect_with_connector(tower::service_fn(move |_: Uri| {
-                    let right = maybe_right.take().unwrap();
-                    async move { Ok::<_, std::io::Error>(right) }
-                }))
-                .await
-                .unwrap(),
-        ),
+    let path_info_service = GRPCPathInfoService::from_client(PathInfoServiceClient::new(
+        Endpoint::try_from("http://[::]:50051")
+            .unwrap()
+            .connect_with_connector(tower::service_fn(move |_: Uri| {
+                let right = maybe_right.take().unwrap();
+                async move { Ok::<_, std::io::Error>(TokioIo::new(right)) }
+            }))
+            .await
+            .unwrap(),
     ));
 
     (blob_service, directory_service, path_info_service)
 }
+
+#[cfg(all(feature = "cloud", feature = "integration"))]
+pub(crate) async fn make_bigtable_path_info_service(
+) -> crate::pathinfoservice::BigtablePathInfoService {
+    use crate::pathinfoservice::bigtable::BigtableParameters;
+    use crate::pathinfoservice::BigtablePathInfoService;
+
+    BigtablePathInfoService::connect(BigtableParameters::default_for_tests())
+        .await
+        .unwrap()
+}
diff --git a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs
index 68f557567629..60da73012df7 100644
--- a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs
+++ b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs
@@ -74,24 +74,19 @@ where
         &self,
         request: Request<castorepb::Node>,
     ) -> Result<Response<proto::CalculateNarResponse>> {
-        match request.into_inner().node {
-            None => Err(Status::invalid_argument("no root node sent")),
-            Some(root_node) => {
-                if let Err(e) = root_node.validate() {
-                    warn!(err = %e, "invalid root node");
-                    Err(Status::invalid_argument("invalid root node"))?
-                }
+        let (_, root_node) = request.into_inner().into_name_and_node().map_err(|e| {
+            warn!(err = %e, "invalid root node");
+            Status::invalid_argument("invalid root node")
+        })?;
 
-                match self.nar_calculation_service.calculate_nar(&root_node).await {
-                    Ok((nar_size, nar_sha256)) => Ok(Response::new(proto::CalculateNarResponse {
-                        nar_size,
-                        nar_sha256: nar_sha256.to_vec().into(),
-                    })),
-                    Err(e) => {
-                        warn!(err = %e, "error during NAR calculation");
-                        Err(e.into())
-                    }
-                }
+        match self.nar_calculation_service.calculate_nar(&root_node).await {
+            Ok((nar_size, nar_sha256)) => Ok(Response::new(proto::CalculateNarResponse {
+                nar_size,
+                nar_sha256: nar_sha256.to_vec().into(),
+            })),
+            Err(e) => {
+                warn!(err = %e, "error during NAR calculation");
+                Err(e.into())
             }
         }
     }
diff --git a/tvix/store/src/proto/mod.rs b/tvix/store/src/proto/mod.rs
index a09839c8bdf9..f3ea4b196946 100644
--- a/tvix/store/src/proto/mod.rs
+++ b/tvix/store/src/proto/mod.rs
@@ -9,7 +9,7 @@ use nix_compat::{
     store_path::{self, StorePathRef},
 };
 use thiserror::Error;
-use tvix_castore::proto::{self as castorepb, NamedNode, ValidateNodeError};
+use tvix_castore::DirectoryError;
 
 mod grpc_pathinfoservice_wrapper;
 
@@ -39,7 +39,7 @@ pub enum ValidatePathInfoError {
 
     /// Node fails validation
     #[error("Invalid root node: {:?}", .0.to_string())]
-    InvalidRootNode(ValidateNodeError),
+    InvalidRootNode(DirectoryError),
 
     /// Invalid node name encountered. Root nodes in PathInfos have more strict name requirements
     #[error("Failed to parse {} as StorePath: {1}", .0.to_str_lossy())]
@@ -87,7 +87,7 @@ impl PathInfo {
     /// validate performs some checks on the PathInfo struct,
     /// Returning either a [store_path::StorePath] of the root node, or a
     /// [ValidatePathInfoError].
-    pub fn validate(&self) -> Result<store_path::StorePathRef<'_>, ValidatePathInfoError> {
+    pub fn validate(&self) -> Result<store_path::StorePath<String>, ValidatePathInfoError> {
         // ensure the references have the right number of bytes.
         for (i, reference) in self.references.iter().enumerate() {
             if reference.len() != store_path::DIGEST_SIZE {
@@ -118,7 +118,7 @@ impl PathInfo {
             // parse references in reference_names.
             for (i, reference_name_str) in narinfo.reference_names.iter().enumerate() {
                 // ensure they parse as a (non-absolute) store path
-                let reference_names_store_path = store_path::StorePath::from_bytes(
+                let reference_names_store_path = store_path::StorePathRef::from_bytes(
                     reference_name_str.as_bytes(),
                 )
                 .map_err(|_| {
@@ -158,14 +158,20 @@ impl PathInfo {
 
         // Ensure there is a (root) node present, and it properly parses to a [store_path::StorePath].
         let root_nix_path = match &self.node {
-            None | Some(castorepb::Node { node: None }) => {
-                Err(ValidatePathInfoError::NoNodePresent)?
-            }
-            Some(castorepb::Node { node: Some(node) }) => {
-                node.validate()
+            None => Err(ValidatePathInfoError::NoNodePresent)?,
+            Some(node) => {
+                // NOTE: We could use a non-allocating PathComponent here,
+                // so this could return a StorePathRef.
+                // However, as this will soon get refactored away to stricter
+                // types anyway, there's no point.
+                let (name, _node) = node
+                    .clone()
+                    .into_name_and_node()
                     .map_err(ValidatePathInfoError::InvalidRootNode)?;
+
                 // parse the name of the node itself and return
-                parse_node_name_root(node.get_name(), ValidatePathInfoError::InvalidNodeName)?
+                parse_node_name_root(name.as_ref(), ValidatePathInfoError::InvalidNodeName)?
+                    .to_owned()
             }
         };
 
@@ -219,7 +225,7 @@ impl PathInfo {
                 .signatures
                 .iter()
                 .map(|sig| {
-                    nix_compat::narinfo::Signature::new(
+                    nix_compat::narinfo::SignatureRef::new(
                         &sig.name,
                         // This shouldn't pass validation
                         sig.data[..].try_into().expect("invalid signature len"),
@@ -260,24 +266,19 @@ impl TryFrom<&nar_info::Ca> for nix_compat::nixhash::CAHash {
 
     fn try_from(value: &nar_info::Ca) -> Result<Self, Self::Error> {
         Ok(match value.r#type {
-            typ if typ == nar_info::ca::Hash::FlatMd5 as i32 => {
-                Self::Flat(NixHash::Md5(value.digest[..].try_into().map_err(|_| {
-                    ConvertCAError::InvalidReferenceDigestLen(value.digest.len(), "FlatMd5")
-                })?))
-            }
-            typ if typ == nar_info::ca::Hash::FlatSha1 as i32 => {
-                Self::Flat(NixHash::Sha1(value.digest[..].try_into().map_err(
-                    |_| ConvertCAError::InvalidReferenceDigestLen(value.digest.len(), "FlatSha1"),
+            typ if typ == nar_info::ca::Hash::NarSha256 as i32 => {
+                Self::Nar(NixHash::Sha256(value.digest[..].try_into().map_err(
+                    |_| ConvertCAError::InvalidReferenceDigestLen(value.digest.len(), "NarSha256"),
                 )?))
             }
-            typ if typ == nar_info::ca::Hash::FlatSha256 as i32 => {
-                Self::Flat(NixHash::Sha256(value.digest[..].try_into().map_err(
-                    |_| ConvertCAError::InvalidReferenceDigestLen(value.digest.len(), "FlatSha256"),
+            typ if typ == nar_info::ca::Hash::NarSha1 as i32 => {
+                Self::Nar(NixHash::Sha1(value.digest[..].try_into().map_err(
+                    |_| ConvertCAError::InvalidReferenceDigestLen(value.digest.len(), "NarSha1"),
                 )?))
             }
-            typ if typ == nar_info::ca::Hash::FlatSha512 as i32 => Self::Flat(NixHash::Sha512(
+            typ if typ == nar_info::ca::Hash::NarSha512 as i32 => Self::Nar(NixHash::Sha512(
                 Box::new(value.digest[..].try_into().map_err(|_| {
-                    ConvertCAError::InvalidReferenceDigestLen(value.digest.len(), "FlatSha512")
+                    ConvertCAError::InvalidReferenceDigestLen(value.digest.len(), "NarSha512")
                 })?),
             )),
             typ if typ == nar_info::ca::Hash::NarMd5 as i32 => {
@@ -285,19 +286,29 @@ impl TryFrom<&nar_info::Ca> for nix_compat::nixhash::CAHash {
                     ConvertCAError::InvalidReferenceDigestLen(value.digest.len(), "NarMd5")
                 })?))
             }
-            typ if typ == nar_info::ca::Hash::NarSha1 as i32 => {
-                Self::Nar(NixHash::Sha1(value.digest[..].try_into().map_err(
-                    |_| ConvertCAError::InvalidReferenceDigestLen(value.digest.len(), "NarSha1"),
+            typ if typ == nar_info::ca::Hash::TextSha256 as i32 => {
+                Self::Text(value.digest[..].try_into().map_err(|_| {
+                    ConvertCAError::InvalidReferenceDigestLen(value.digest.len(), "TextSha256")
+                })?)
+            }
+            typ if typ == nar_info::ca::Hash::FlatSha1 as i32 => {
+                Self::Flat(NixHash::Sha1(value.digest[..].try_into().map_err(
+                    |_| ConvertCAError::InvalidReferenceDigestLen(value.digest.len(), "FlatSha1"),
                 )?))
             }
-            typ if typ == nar_info::ca::Hash::NarSha256 as i32 => {
-                Self::Nar(NixHash::Sha256(value.digest[..].try_into().map_err(
-                    |_| ConvertCAError::InvalidReferenceDigestLen(value.digest.len(), "NarSha256"),
+            typ if typ == nar_info::ca::Hash::FlatMd5 as i32 => {
+                Self::Flat(NixHash::Md5(value.digest[..].try_into().map_err(|_| {
+                    ConvertCAError::InvalidReferenceDigestLen(value.digest.len(), "FlatMd5")
+                })?))
+            }
+            typ if typ == nar_info::ca::Hash::FlatSha256 as i32 => {
+                Self::Flat(NixHash::Sha256(value.digest[..].try_into().map_err(
+                    |_| ConvertCAError::InvalidReferenceDigestLen(value.digest.len(), "FlatSha256"),
                 )?))
             }
-            typ if typ == nar_info::ca::Hash::NarSha512 as i32 => Self::Nar(NixHash::Sha512(
+            typ if typ == nar_info::ca::Hash::FlatSha512 as i32 => Self::Flat(NixHash::Sha512(
                 Box::new(value.digest[..].try_into().map_err(|_| {
-                    ConvertCAError::InvalidReferenceDigestLen(value.digest.len(), "NarSha512")
+                    ConvertCAError::InvalidReferenceDigestLen(value.digest.len(), "FlatSha512")
                 })?),
             )),
             typ => return Err(ConvertCAError::UnknownHashType(typ)),
@@ -349,7 +360,7 @@ impl From<&nix_compat::narinfo::NarInfo<'_>> for NarInfo {
             signatures,
             reference_names: value.references.iter().map(|r| r.to_string()).collect(),
             deriver: value.deriver.as_ref().map(|sp| StorePath {
-                name: sp.name().to_owned(),
+                name: (*sp.name()).to_owned(),
                 digest: Bytes::copy_from_slice(sp.digest()),
             }),
             ca: value.ca.as_ref().map(|ca| ca.into()),
diff --git a/tvix/store/src/proto/tests/pathinfo.rs b/tvix/store/src/proto/tests/pathinfo.rs
index 4d0834878d7c..f5ecc798bbfb 100644
--- a/tvix/store/src/proto/tests/pathinfo.rs
+++ b/tvix/store/src/proto/tests/pathinfo.rs
@@ -3,17 +3,18 @@ use crate::tests::fixtures::*;
 use bytes::Bytes;
 use data_encoding::BASE64;
 use nix_compat::nixbase32;
-use nix_compat::store_path::{self, StorePathRef};
+use nix_compat::store_path::{self, StorePath, StorePathRef};
 use rstest::rstest;
 use tvix_castore::proto as castorepb;
+use tvix_castore::{DirectoryError, ValidateNodeError};
 
 #[rstest]
 #[case::no_node(None, Err(ValidatePathInfoError::NoNodePresent))]
-#[case::no_node_2(Some(castorepb::Node { node: None}), Err(ValidatePathInfoError::NoNodePresent))]
+#[case::no_node_2(Some(castorepb::Node { node: None}), Err(ValidatePathInfoError::InvalidRootNode(DirectoryError::NoNodeSet)))]
 
 fn validate_pathinfo(
     #[case] node: Option<castorepb::Node>,
-    #[case] exp_result: Result<StorePathRef, ValidatePathInfoError>,
+    #[case] exp_result: Result<StorePath<String>, ValidatePathInfoError>,
 ) {
     // construct the PathInfo object
     let p = PathInfo {
@@ -22,9 +23,6 @@ fn validate_pathinfo(
     };
 
     assert_eq!(exp_result, p.validate());
-
-    let err = p.validate().expect_err("validation should fail");
-    assert!(matches!(err, ValidatePathInfoError::NoNodePresent));
 }
 
 #[rstest]
@@ -32,12 +30,12 @@ fn validate_pathinfo(
         name: DUMMY_PATH.into(),
         digest: DUMMY_DIGEST.clone().into(),
         size: 0,
-}, Ok(StorePathRef::from_bytes(DUMMY_PATH.as_bytes()).unwrap()))]
+}, Ok(StorePath::from_bytes(DUMMY_PATH.as_bytes()).unwrap()))]
 #[case::invalid_digest_length(castorepb::DirectoryNode {
         name: DUMMY_PATH.into(),
         digest: Bytes::new(),
         size: 0,
-}, Err(ValidatePathInfoError::InvalidRootNode(castorepb::ValidateNodeError::InvalidDigestLen(0))))]
+}, Err(ValidatePathInfoError::InvalidRootNode(DirectoryError::InvalidNode(DUMMY_PATH.into(), ValidateNodeError::InvalidDigestLen(0)))))]
 #[case::invalid_node_name_no_storepath(castorepb::DirectoryNode {
         name: "invalid".into(),
         digest: DUMMY_DIGEST.clone().into(),
@@ -48,7 +46,7 @@ fn validate_pathinfo(
 )))]
 fn validate_directory(
     #[case] directory_node: castorepb::DirectoryNode,
-    #[case] exp_result: Result<StorePathRef, ValidatePathInfoError>,
+    #[case] exp_result: Result<StorePath<String>, ValidatePathInfoError>,
 ) {
     // construct the PathInfo object
     let p = PathInfo {
@@ -68,7 +66,7 @@ fn validate_directory(
         size: 0,
         executable: false,
     },
-    Ok(StorePathRef::from_bytes(DUMMY_PATH.as_bytes()).unwrap())
+    Ok(StorePath::from_bytes(DUMMY_PATH.as_bytes()).unwrap())
 )]
 #[case::invalid_digest_len(
     castorepb::FileNode {
@@ -76,7 +74,7 @@ fn validate_directory(
         digest: Bytes::new(),
         ..Default::default()
     },
-    Err(ValidatePathInfoError::InvalidRootNode(castorepb::ValidateNodeError::InvalidDigestLen(0)))
+    Err(ValidatePathInfoError::InvalidRootNode(DirectoryError::InvalidNode(DUMMY_PATH.into(), ValidateNodeError::InvalidDigestLen(0))))
 )]
 #[case::invalid_node_name(
     castorepb::FileNode {
@@ -91,7 +89,7 @@ fn validate_directory(
 )]
 fn validate_file(
     #[case] file_node: castorepb::FileNode,
-    #[case] exp_result: Result<StorePathRef, ValidatePathInfoError>,
+    #[case] exp_result: Result<StorePath<String>, ValidatePathInfoError>,
 ) {
     // construct the PathInfo object
     let p = PathInfo {
@@ -109,7 +107,7 @@ fn validate_file(
         name: DUMMY_PATH.into(),
         target: "foo".into(),
     },
-    Ok(StorePathRef::from_bytes(DUMMY_PATH.as_bytes()).unwrap())
+    Ok(StorePath::from_bytes(DUMMY_PATH.as_bytes()).unwrap())
 )]
 #[case::invalid_node_name(
     castorepb::SymlinkNode {
@@ -123,7 +121,7 @@ fn validate_file(
 )]
 fn validate_symlink(
     #[case] symlink_node: castorepb::SymlinkNode,
-    #[case] exp_result: Result<StorePathRef, ValidatePathInfoError>,
+    #[case] exp_result: Result<StorePath<String>, ValidatePathInfoError>,
 ) {
     // construct the PathInfo object
     let p = PathInfo {
@@ -228,24 +226,28 @@ fn validate_inconsistent_narinfo_reference_name_digest() {
 /// Create a node with an empty symlink target, and ensure it fails validation.
 #[test]
 fn validate_symlink_empty_target_invalid() {
-    let node = castorepb::node::Node::Symlink(castorepb::SymlinkNode {
-        name: "foo".into(),
-        target: "".into(),
-    });
-
-    node.validate().expect_err("must fail validation");
+    castorepb::Node {
+        node: Some(castorepb::node::Node::Symlink(castorepb::SymlinkNode {
+            name: "foo".into(),
+            target: "".into(),
+        })),
+    }
+    .into_name_and_node()
+    .expect_err("must fail validation");
 }
 
 /// Create a node with a symlink target including null bytes, and ensure it
 /// fails validation.
 #[test]
 fn validate_symlink_target_null_byte_invalid() {
-    let node = castorepb::node::Node::Symlink(castorepb::SymlinkNode {
-        name: "foo".into(),
-        target: "foo\0".into(),
-    });
-
-    node.validate().expect_err("must fail validation");
+    castorepb::Node {
+        node: Some(castorepb::node::Node::Symlink(castorepb::SymlinkNode {
+            name: "foo".into(),
+            target: "foo\0".into(),
+        })),
+    }
+    .into_name_and_node()
+    .expect_err("must fail validation");
 }
 
 /// Create a PathInfo with a correct deriver field and ensure it succeeds.
diff --git a/tvix/store/src/tests/fixtures.rs b/tvix/store/src/tests/fixtures.rs
index 1c8359a2c0c7..48cc365365a9 100644
--- a/tvix/store/src/tests/fixtures.rs
+++ b/tvix/store/src/tests/fixtures.rs
@@ -1,11 +1,13 @@
 use lazy_static::lazy_static;
-use rstest::*;
+use rstest::{self, *};
+use rstest_reuse::*;
+use std::io;
 use std::sync::Arc;
 pub use tvix_castore::fixtures::*;
 use tvix_castore::{
     blobservice::{BlobService, MemoryBlobService},
     directoryservice::{DirectoryService, MemoryDirectoryService},
-    proto as castorepb,
+    proto as castorepb, Node,
 };
 
 use crate::proto::{
@@ -17,6 +19,10 @@ pub const DUMMY_PATH: &str = "00000000000000000000000000000000-dummy";
 pub const DUMMY_PATH_DIGEST: [u8; 20] = [0; 20];
 
 lazy_static! {
+    pub static ref CASTORE_NODE_SYMLINK: Node = Node::Symlink {
+        target: "/nix/store/somewhereelse".try_into().unwrap(),
+    };
+
     /// The NAR representation of a symlink pointing to `/nix/store/somewhereelse`
     pub static ref NAR_CONTENTS_SYMLINK: Vec<u8> = vec![
         13, 0, 0, 0, 0, 0, 0, 0, b'n', b'i', b'x', b'-', b'a', b'r', b'c', b'h', b'i', b'v', b'e', b'-', b'1', 0,
@@ -31,6 +37,12 @@ lazy_static! {
         1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0 // ")"
     ];
 
+    pub static ref CASTORE_NODE_HELLOWORLD: Node = Node::File {
+        digest: HELLOWORLD_BLOB_DIGEST.clone(),
+        size: HELLOWORLD_BLOB_CONTENTS.len() as u64,
+        executable: false,
+    };
+
     /// The NAR representation of a regular file with the contents "Hello World!"
     pub static ref NAR_CONTENTS_HELLOWORLD: Vec<u8> = vec![
         13, 0, 0, 0, 0, 0, 0, 0, b'n', b'i', b'x', b'-', b'a', b'r', b'c', b'h', b'i', b'v', b'e', b'-', b'1', 0,
@@ -44,6 +56,22 @@ lazy_static! {
         1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0 // ")"
     ];
 
+    pub static ref CASTORE_NODE_TOO_BIG: Node = Node::File {
+        digest: HELLOWORLD_BLOB_DIGEST.clone(),
+        size: 42, // <- note the wrong size here!
+        executable: false,
+    };
+    pub static ref CASTORE_NODE_TOO_SMALL: Node = Node::File {
+        digest: HELLOWORLD_BLOB_DIGEST.clone(),
+        size: 2, // <- note the wrong size here!
+        executable: false,
+    };
+
+    pub static ref CASTORE_NODE_COMPLICATED: Node = Node::Directory {
+        digest: DIRECTORY_COMPLICATED.digest(),
+        size: DIRECTORY_COMPLICATED.size(),
+    };
+
     /// The NAR representation of a more complicated directory structure.
     pub static ref NAR_CONTENTS_COMPLICATED: Vec<u8> = vec![
         13, 0, 0, 0, 0, 0, 0, 0, b'n', b'i', b'x', b'-', b'a', b'r', b'c', b'h', b'i', b'v', b'e', b'-', b'1', 0,
@@ -137,6 +165,46 @@ pub(crate) fn blob_service() -> Arc<dyn BlobService> {
 }
 
 #[fixture]
+pub(crate) async fn blob_service_with_contents() -> Arc<dyn BlobService> {
+    let blob_service = Arc::from(MemoryBlobService::default());
+    for (blob_contents, blob_digest) in [
+        (EMPTY_BLOB_CONTENTS, &*EMPTY_BLOB_DIGEST),
+        (HELLOWORLD_BLOB_CONTENTS, &*HELLOWORLD_BLOB_DIGEST),
+    ] {
+        // insert each blob into the store
+        let mut writer = blob_service.open_write().await;
+        tokio::io::copy(&mut io::Cursor::new(blob_contents), &mut writer)
+            .await
+            .unwrap();
+        assert_eq!(blob_digest.clone(), writer.close().await.unwrap());
+    }
+    blob_service
+}
+
+#[fixture]
 pub(crate) fn directory_service() -> Arc<dyn DirectoryService> {
     Arc::from(MemoryDirectoryService::default())
 }
+
+#[fixture]
+pub(crate) async fn directory_service_with_contents() -> Arc<dyn DirectoryService> {
+    let directory_service = Arc::from(MemoryDirectoryService::default());
+    for directory in [&*DIRECTORY_WITH_KEEP, &*DIRECTORY_COMPLICATED] {
+        directory_service.put(directory.clone()).await.unwrap();
+    }
+    directory_service
+}
+
+#[template]
+#[rstest]
+#[case::symlink    (&*CASTORE_NODE_SYMLINK,     Ok(Ok(&*NAR_CONTENTS_SYMLINK)))]
+#[case::helloworld (&*CASTORE_NODE_HELLOWORLD,  Ok(Ok(&*NAR_CONTENTS_HELLOWORLD)))]
+#[case::too_big    (&*CASTORE_NODE_TOO_BIG,     Ok(Err(io::ErrorKind::UnexpectedEof)))]
+#[case::too_small  (&*CASTORE_NODE_TOO_SMALL,   Ok(Err(io::ErrorKind::InvalidInput)))]
+#[case::complicated(&*CASTORE_NODE_COMPLICATED, Ok(Ok(&*NAR_CONTENTS_COMPLICATED)))]
+fn castore_fixtures_template(
+    #[case] test_input: &Node,
+    #[case] test_output: Result<Result<&Vec<u8>, io::ErrorKind>, crate::nar::RenderError>,
+) {
+}
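The #[template] above is rstest_reuse's mechanism for sharing one case table across several tests: each #[apply] site re-expands all cases. The nested Result in test_output encodes two failure layers, the outer one for constructing the renderer or reader, the inner io::ErrorKind for failures while streaming. A minimal, self-contained sketch of the template/apply mechanism, with toy names unrelated to the fixtures above:

    use rstest::rstest;
    use rstest_reuse::{self, apply, template};

    // the template only declares the case table; its body stays empty
    #[template]
    #[rstest]
    #[case(2, 4)]
    #[case(3, 9)]
    fn squares(#[case] input: u32, #[case] expected: u32) {}

    // each #[apply] site gets one generated test per case
    #[apply(squares)]
    fn it_squares(#[case] input: u32, #[case] expected: u32) {
        assert_eq!(input * input, expected);
    }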
diff --git a/tvix/store/src/tests/mod.rs b/tvix/store/src/tests/mod.rs
index 1e7fc3f6b451..c7e6fee0cc6c 100644
--- a/tvix/store/src/tests/mod.rs
+++ b/tvix/store/src/tests/mod.rs
@@ -1,2 +1,3 @@
 pub mod fixtures;
 mod nar_renderer;
+mod nar_renderer_seekable;
diff --git a/tvix/store/src/tests/nar_renderer.rs b/tvix/store/src/tests/nar_renderer.rs
index 8bfb5a72bb2f..6314ba6ccc49 100644
--- a/tvix/store/src/tests/nar_renderer.rs
+++ b/tvix/store/src/tests/nar_renderer.rs
@@ -1,40 +1,13 @@
-use crate::nar::calculate_size_and_sha256;
 use crate::nar::write_nar;
-use crate::tests::fixtures::blob_service;
-use crate::tests::fixtures::directory_service;
 use crate::tests::fixtures::*;
 use rstest::*;
-use sha2::{Digest, Sha256};
+use rstest_reuse::*;
 use std::io;
 use std::sync::Arc;
 use tokio::io::sink;
 use tvix_castore::blobservice::BlobService;
 use tvix_castore::directoryservice::DirectoryService;
-use tvix_castore::proto as castorepb;
-
-#[rstest]
-#[tokio::test]
-async fn single_symlink(
-    blob_service: Arc<dyn BlobService>,
-    directory_service: Arc<dyn DirectoryService>,
-) {
-    let mut buf: Vec<u8> = vec![];
-
-    write_nar(
-        &mut buf,
-        &castorepb::node::Node::Symlink(castorepb::SymlinkNode {
-            name: "doesntmatter".into(),
-            target: "/nix/store/somewhereelse".into(),
-        }),
-        // don't put anything in the stores, as we don't actually do any requests.
-        blob_service,
-        directory_service,
-    )
-    .await
-    .expect("must succeed");
-
-    assert_eq!(buf, NAR_CONTENTS_SYMLINK.to_vec());
-}
+use tvix_castore::Node;
 
 /// Make sure the NARRenderer fails if a referred blob doesn't exist.
 #[rstest]
@@ -45,12 +18,7 @@ async fn single_file_missing_blob(
 ) {
     let e = write_nar(
         sink(),
-        &castorepb::node::Node::File(castorepb::FileNode {
-            name: "doesntmatter".into(),
-            digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
-            size: HELLOWORLD_BLOB_CONTENTS.len() as u64,
-            executable: false,
-        }),
+        &CASTORE_NODE_HELLOWORLD,
         // the blobservice is empty intentionally, to provoke the error.
         blob_service,
         directory_service,
@@ -66,163 +34,44 @@ async fn single_file_missing_blob(
     }
 }
 
-/// Make sure the NAR Renderer fails if the returned blob meta has another size
-/// than specified in the proto node.
-#[rstest]
-#[tokio::test]
-async fn single_file_wrong_blob_size(
-    blob_service: Arc<dyn BlobService>,
-    directory_service: Arc<dyn DirectoryService>,
-) {
-    // insert blob into the store
-    let mut writer = blob_service.open_write().await;
-    tokio::io::copy(
-        &mut io::Cursor::new(HELLOWORLD_BLOB_CONTENTS.to_vec()),
-        &mut writer,
-    )
-    .await
-    .unwrap();
-    assert_eq!(
-        HELLOWORLD_BLOB_DIGEST.clone(),
-        writer.close().await.unwrap()
-    );
-
-    // Test with a root FileNode of a too big size
-    let e = write_nar(
-        sink(),
-        &castorepb::node::Node::File(castorepb::FileNode {
-            name: "doesntmatter".into(),
-            digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
-            size: 42, // <- note the wrong size here!
-            executable: false,
-        }),
-        blob_service.clone(),
-        directory_service.clone(),
-    )
-    .await
-    .expect_err("must fail");
-
-    match e {
-        crate::nar::RenderError::NARWriterError(e) => {
-            assert_eq!(io::ErrorKind::UnexpectedEof, e.kind());
-        }
-        _ => panic!("unexpected error: {:?}", e),
-    }
-
-    // Test with a root FileNode of a too small size
-    let e = write_nar(
-        sink(),
-        &castorepb::node::Node::File(castorepb::FileNode {
-            name: "doesntmatter".into(),
-            digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
-            size: 2, // <- note the wrong size here!
-            executable: false,
-        }),
-        blob_service,
-        directory_service,
-    )
-    .await
-    .expect_err("must fail");
-
-    match e {
-        crate::nar::RenderError::NARWriterError(e) => {
-            assert_eq!(io::ErrorKind::InvalidInput, e.kind());
-        }
-        _ => panic!("unexpected error: {:?}", e),
-    }
-}
-
-#[rstest]
-#[tokio::test]
-async fn single_file(
-    blob_service: Arc<dyn BlobService>,
-    directory_service: Arc<dyn DirectoryService>,
-) {
-    // insert blob into the store
-    let mut writer = blob_service.open_write().await;
-    tokio::io::copy(&mut io::Cursor::new(HELLOWORLD_BLOB_CONTENTS), &mut writer)
-        .await
-        .unwrap();
-
-    assert_eq!(
-        HELLOWORLD_BLOB_DIGEST.clone(),
-        writer.close().await.unwrap()
-    );
-
-    let mut buf: Vec<u8> = vec![];
-
-    write_nar(
-        &mut buf,
-        &castorepb::node::Node::File(castorepb::FileNode {
-            name: "doesntmatter".into(),
-            digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
-            size: HELLOWORLD_BLOB_CONTENTS.len() as u64,
-            executable: false,
-        }),
-        blob_service,
-        directory_service,
-    )
-    .await
-    .expect("must succeed");
-
-    assert_eq!(buf, NAR_CONTENTS_HELLOWORLD.to_vec());
-}
-
-#[rstest]
+#[apply(castore_fixtures_template)]
 #[tokio::test]
-async fn test_complicated(
-    blob_service: Arc<dyn BlobService>,
-    directory_service: Arc<dyn DirectoryService>,
+async fn write_nar_cases(
+    #[future] blob_service_with_contents: Arc<dyn BlobService>,
+    #[future] directory_service_with_contents: Arc<dyn DirectoryService>,
+    #[case] test_input: &Node,
+    #[case] test_output: Result<Result<&Vec<u8>, io::ErrorKind>, crate::nar::RenderError>,
 ) {
-    // put all data into the stores.
-    // insert blob into the store
-    let mut writer = blob_service.open_write().await;
-    tokio::io::copy(&mut io::Cursor::new(EMPTY_BLOB_CONTENTS), &mut writer)
-        .await
-        .unwrap();
-    assert_eq!(EMPTY_BLOB_DIGEST.clone(), writer.close().await.unwrap());
-
-    // insert directories
-    directory_service
-        .put(DIRECTORY_WITH_KEEP.clone())
-        .await
-        .unwrap();
-    directory_service
-        .put(DIRECTORY_COMPLICATED.clone())
-        .await
-        .unwrap();
+    let blob_service = blob_service_with_contents.await;
+    let directory_service = directory_service_with_contents.await;
 
     let mut buf: Vec<u8> = vec![];
-
-    write_nar(
+    let read_result = write_nar(
         &mut buf,
-        &castorepb::node::Node::Directory(castorepb::DirectoryNode {
-            name: "doesntmatter".into(),
-            digest: DIRECTORY_COMPLICATED.digest().into(),
-            size: DIRECTORY_COMPLICATED.size(),
-        }),
-        blob_service.clone(),
-        directory_service.clone(),
-    )
-    .await
-    .expect("must succeed");
-
-    assert_eq!(buf, NAR_CONTENTS_COMPLICATED.to_vec());
-
-    // ensure calculate_nar does return the correct sha256 digest and sum.
-    let (nar_size, nar_digest) = calculate_size_and_sha256(
-        &castorepb::node::Node::Directory(castorepb::DirectoryNode {
-            name: "doesntmatter".into(),
-            digest: DIRECTORY_COMPLICATED.digest().into(),
-            size: DIRECTORY_COMPLICATED.size(),
-        }),
+        test_input,
+        // the stores are pre-populated by the *_with_contents fixtures above.
         blob_service,
         directory_service,
     )
-    .await
-    .expect("must succeed");
+    .await;
 
-    assert_eq!(NAR_CONTENTS_COMPLICATED.len() as u64, nar_size);
-    let d = Sha256::digest(NAR_CONTENTS_COMPLICATED.clone());
-    assert_eq!(d.as_slice(), nar_digest);
+    match (read_result, test_output) {
+        (Ok(_), Err(_)) => panic!("rendering should have failed but succeeded"),
+        (Ok(_), Ok(Err(_))) => panic!("rendering should have failed but succeeded"),
+        (Err(err), Ok(Ok(_))) => {
+            panic!("rendering should have succeeded but failed: {}", err)
+        }
+        (Err(render_err), Err(expected_err)) => {
+            assert_eq!(format!("{}", render_err), format!("{}", expected_err));
+        }
+        (Err(render_err), Ok(Err(expected_err))) => {
+            let crate::nar::RenderError::NARWriterError(e) = render_err else {
+                panic!("expected nar writer error")
+            };
+            assert_eq!(e.kind(), expected_err);
+        }
+        (Ok(_n), Ok(Ok(expected_read_result))) => {
+            assert_eq!(buf, expected_read_result.to_vec());
+        }
+    }
 }
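The deleted single_symlink test lives on as the symlink case of the template; spelled out against the new fixtures, the equivalent direct call is:

    let mut buf: Vec<u8> = vec![];
    write_nar(
        &mut buf,
        &CASTORE_NODE_SYMLINK,
        blob_service,      // may be empty: a symlink triggers no blob requests
        directory_service, // may be empty: no directories are referenced
    )
    .await
    .expect("must succeed");
    assert_eq!(buf, NAR_CONTENTS_SYMLINK.to_vec());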
diff --git a/tvix/store/src/tests/nar_renderer_seekable.rs b/tvix/store/src/tests/nar_renderer_seekable.rs
new file mode 100644
index 000000000000..233b95d0b036
--- /dev/null
+++ b/tvix/store/src/tests/nar_renderer_seekable.rs
@@ -0,0 +1,111 @@
+use crate::nar::seekable::Reader;
+use crate::tests::fixtures::blob_service_with_contents as blob_service;
+use crate::tests::fixtures::directory_service_with_contents as directory_service;
+use crate::tests::fixtures::*;
+use rstest::*;
+use rstest_reuse::*;
+use std::io;
+use std::pin::Pin;
+use std::sync::Arc;
+use tokio::io::{AsyncReadExt, AsyncSeek, AsyncSeekExt};
+use tvix_castore::blobservice::BlobService;
+use tvix_castore::directoryservice::DirectoryService;
+use tvix_castore::Node;
+
+#[apply(castore_fixtures_template)]
+#[tokio::test]
+async fn read_to_end(
+    #[future] blob_service: Arc<dyn BlobService>,
+    #[future] directory_service: Arc<dyn DirectoryService>,
+    #[case] test_input: &Node,
+    #[case] test_output: Result<Result<&Vec<u8>, io::ErrorKind>, crate::nar::RenderError>,
+) {
+    let reader_result = Reader::new(
+        test_input.clone(),
+        // the stores are pre-populated via the *_with_contents fixtures.
+        blob_service.await,
+        directory_service.await,
+    )
+    .await;
+
+    match (reader_result, test_output) {
+        (Ok(_), Err(_)) => panic!("creating reader should have failed but succeeded"),
+        (Err(err), Ok(_)) => panic!("creating reader should have succeeded but failed: {}", err),
+        (Err(reader_err), Err(expected_err)) => {
+            assert_eq!(format!("{}", reader_err), format!("{}", expected_err));
+        }
+        (Ok(mut reader), Ok(expected_read_result)) => {
+            let mut buf: Vec<u8> = vec![];
+            let read_result = reader.read_to_end(&mut buf).await;
+
+            match (read_result, expected_read_result) {
+                (Ok(_), Err(_)) => panic!("read_to_end should have failed but succeeded"),
+                (Err(err), Ok(_)) => {
+                    panic!("read_to_end should have succeeded but failed: {}", err)
+                }
+                (Err(read_err), Err(expected_read_err)) => {
+                    assert_eq!(read_err.kind(), expected_read_err);
+                }
+                (Ok(_n), Ok(expected_read_result)) => {
+                    assert_eq!(buf, expected_read_result.to_vec());
+                }
+            }
+        }
+    }
+}
+
+#[rstest]
+#[tokio::test]
+/// Check that the Reader does not allow starting a seek while another seek is
+/// still in progress. If this were not prevented, futures could pile up on the heap.
+async fn seek_twice(
+    #[future] blob_service: Arc<dyn BlobService>,
+    #[future] directory_service: Arc<dyn DirectoryService>,
+) {
+    let mut reader = Reader::new(
+        CASTORE_NODE_COMPLICATED.clone(),
+        // the stores are pre-populated via the *_with_contents fixtures.
+        blob_service.await,
+        directory_service.await,
+    )
+    .await
+    .expect("must succeed");
+
+    Pin::new(&mut reader)
+        .start_seek(io::SeekFrom::Start(1))
+        .expect("must succeed");
+    let seek_err = Pin::new(&mut reader)
+        .start_seek(io::SeekFrom::Start(2))
+        .expect_err("must fail");
+
+    assert_eq!(seek_err.kind(), io::ErrorKind::Other);
+    assert_eq!(seek_err.to_string(), "Already seeking".to_string());
+}
+
+#[rstest]
+#[tokio::test]
+async fn seek(
+    #[future] blob_service: Arc<dyn BlobService>,
+    #[future] directory_service: Arc<dyn DirectoryService>,
+) {
+    let mut reader = Reader::new(
+        CASTORE_NODE_HELLOWORLD.clone(),
+        // the stores are pre-populated via the *_with_contents fixtures.
+        blob_service.await,
+        directory_service.await,
+    )
+    .await
+    .expect("must succeed");
+
+    let mut buf = [0u8; 10];
+
+    for position in [
+        io::SeekFrom::Start(0x65), // Just before the file contents
+        io::SeekFrom::Start(0x68), // Seek backwards, into the file contents
+        io::SeekFrom::Start(0x70), // Just before the end of the file contents
+    ] {
+        let n = reader.seek(position).await.expect("seek") as usize;
+        reader.read_exact(&mut buf).await.expect("read_exact");
+        assert_eq!(NAR_CONTENTS_HELLOWORLD[n..n + 10], buf);
+    }
+}
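The "Already seeking" guard that seek_twice exercises amounts to rejecting start_seek while a previous seek future is still pending. A minimal sketch of that state machine (an assumption about the Reader's internals, not a copy of them):

    use std::io;

    enum SeekState {
        Idle,
        Seeking(u64), // target offset of the in-flight seek
    }

    // reject a second seek while one is already in flight
    fn start_seek(state: &mut SeekState, target: u64) -> io::Result<()> {
        if matches!(state, SeekState::Seeking(_)) {
            return Err(io::Error::new(io::ErrorKind::Other, "Already seeking"));
        }
        *state = SeekState::Seeking(target);
        Ok(())
    }

poll_complete would reset the state back to Idle once the in-flight seek finishes; tokio's AsyncSeekExt::seek drives exactly that start_seek/poll_complete pair.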
diff --git a/tvix/store/src/utils.rs b/tvix/store/src/utils.rs
index e6e42f6ec4cf..1385ece39f8a 100644
--- a/tvix/store/src/utils.rs
+++ b/tvix/store/src/utils.rs
@@ -1,48 +1,214 @@
-use std::sync::Arc;
 use std::{
+    collections::HashMap,
     pin::Pin,
+    sync::Arc,
     task::{self, Poll},
 };
 use tokio::io::{self, AsyncWrite};
 
-use tvix_castore::{
-    blobservice::{self, BlobService},
-    directoryservice::{self, DirectoryService},
-};
+use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService};
+use url::Url;
 
+use crate::composition::{
+    with_registry, Composition, DeserializeWithRegistry, ServiceBuilder, REG,
+};
 use crate::nar::{NarCalculationService, SimpleRenderer};
-use crate::pathinfoservice::{self, PathInfoService};
+use crate::pathinfoservice::PathInfoService;
+
+#[derive(serde::Deserialize, Default)]
+pub struct CompositionConfigs {
+    pub blobservices:
+        HashMap<String, DeserializeWithRegistry<Box<dyn ServiceBuilder<Output = dyn BlobService>>>>,
+    pub directoryservices: HashMap<
+        String,
+        DeserializeWithRegistry<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>>,
+    >,
+    pub pathinfoservices: HashMap<
+        String,
+        DeserializeWithRegistry<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>>,
+    >,
+}
+
+/// Provides a set of clap arguments to configure tvix-[ca]store services.
+///
+/// This particular variant has defaults tailored for use cases accessing data
+/// locally, like the `tvix-store daemon` command.
+#[derive(clap::Parser, Clone)]
+pub struct ServiceUrls {
+    #[arg(
+        long,
+        env,
+        default_value = "objectstore+file:///var/lib/tvix-store/blobs.object_store"
+    )]
+    blob_service_addr: String,
+
+    #[arg(
+        long,
+        env,
+        default_value = "redb:///var/lib/tvix-store/directories.redb"
+    )]
+    directory_service_addr: String,
+
+    #[arg(long, env, default_value = "redb:///var/lib/tvix-store/pathinfo.redb")]
+    path_info_service_addr: String,
+
+    /// Path to a TOML file describing the way the services should be composed.
+    /// Experimental, as the format is not final.
+    /// If specified, the other service addrs are ignored.
+    #[cfg(feature = "xp-store-composition")]
+    #[arg(long, env)]
+    experimental_store_composition: Option<String>,
+}
+
+/// Provides a set of clap arguments to configure tvix-[ca]store services.
+///
+/// This particular variant has defaults tailored for use cases accessing data
+/// from another running tvix daemon, via gRPC.
+#[derive(clap::Parser, Clone)]
+pub struct ServiceUrlsGrpc {
+    #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
+    blob_service_addr: String,
+
+    #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
+    directory_service_addr: String,
+
+    #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
+    path_info_service_addr: String,
+
+    #[cfg(feature = "xp-store-composition")]
+    #[arg(long, env)]
+    experimental_store_composition: Option<String>,
+}
+
+/// Provides a set of clap arguments to configure tvix-[ca]store services.
+///
+/// This particular variant has defaults tailored for use cases keeping all
+/// data in memory.
+/// It's currently used in tvix-cli, as we don't care about persistence there
+/// yet, and using anything else might make some perf output harder to interpret.
+#[derive(clap::Parser, Clone)]
+pub struct ServiceUrlsMemory {
+    #[arg(long, env, default_value = "memory://")]
+    blob_service_addr: String,
+
+    #[arg(long, env, default_value = "memory://")]
+    directory_service_addr: String,
+
+    #[arg(long, env, default_value = "memory://")]
+    path_info_service_addr: String,
+
+    #[cfg(feature = "xp-store-composition")]
+    #[arg(long, env)]
+    experimental_store_composition: Option<String>,
+}
+
+impl From<ServiceUrlsGrpc> for ServiceUrls {
+    fn from(urls: ServiceUrlsGrpc) -> ServiceUrls {
+        ServiceUrls {
+            blob_service_addr: urls.blob_service_addr,
+            directory_service_addr: urls.directory_service_addr,
+            path_info_service_addr: urls.path_info_service_addr,
+            #[cfg(feature = "xp-store-composition")]
+            experimental_store_composition: urls.experimental_store_composition,
+        }
+    }
+}
+
+impl From<ServiceUrlsMemory> for ServiceUrls {
+    fn from(urls: ServiceUrlsMemory) -> ServiceUrls {
+        ServiceUrls {
+            blob_service_addr: urls.blob_service_addr,
+            directory_service_addr: urls.directory_service_addr,
+            path_info_service_addr: urls.path_info_service_addr,
+            #[cfg(feature = "xp-store-composition")]
+            experimental_store_composition: urls.experimental_store_composition,
+        }
+    }
+}
+
+pub async fn addrs_to_configs(
+    urls: impl Into<ServiceUrls>,
+) -> Result<CompositionConfigs, Box<dyn std::error::Error + Send + Sync>> {
+    let urls: ServiceUrls = urls.into();
+
+    #[cfg(feature = "xp-store-composition")]
+    if let Some(conf_path) = urls.experimental_store_composition {
+        let conf_text = tokio::fs::read_to_string(conf_path).await?;
+        return Ok(with_registry(&REG, || toml::from_str(&conf_text))?);
+    }
+
+    let mut configs: CompositionConfigs = Default::default();
+
+    let blob_service_url = Url::parse(&urls.blob_service_addr)?;
+    let directory_service_url = Url::parse(&urls.directory_service_addr)?;
+    let path_info_service_url = Url::parse(&urls.path_info_service_addr)?;
+
+    configs.blobservices.insert(
+        "default".into(),
+        with_registry(&REG, || blob_service_url.try_into())?,
+    );
+    configs.directoryservices.insert(
+        "default".into(),
+        with_registry(&REG, || directory_service_url.try_into())?,
+    );
+    configs.pathinfoservices.insert(
+        "default".into(),
+        with_registry(&REG, || path_info_service_url.try_into())?,
+    );
+
+    Ok(configs)
+}
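With xp-store-composition enabled, the composition file short-circuits the three addr flags entirely. A hypothetical composition file, parsed the same way as above (the per-service `type = "memory"` keys are an assumption about the registered ServiceBuilder formats, not a documented schema):

    // hypothetical config; only the section names mirror CompositionConfigs
    let conf_text = r#"
        [blobservices.default]
        type = "memory"

        [directoryservices.default]
        type = "memory"

        [pathinfoservices.default]
        type = "memory"
    "#;
    let configs: CompositionConfigs = with_registry(&REG, || toml::from_str(conf_text))?;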
 
 /// Construct the store handles from their addrs.
 pub async fn construct_services(
-    blob_service_addr: impl AsRef<str>,
-    directory_service_addr: impl AsRef<str>,
-    path_info_service_addr: impl AsRef<str>,
-) -> std::io::Result<(
-    Arc<dyn BlobService>,
-    Arc<dyn DirectoryService>,
-    Box<dyn PathInfoService>,
-    Box<dyn NarCalculationService>,
-)> {
-    let blob_service: Arc<dyn BlobService> = blobservice::from_addr(blob_service_addr.as_ref())
-        .await?
-        .into();
-    let directory_service: Arc<dyn DirectoryService> =
-        directoryservice::from_addr(directory_service_addr.as_ref())
-            .await?
-            .into();
-    let path_info_service = pathinfoservice::from_addr(
-        path_info_service_addr.as_ref(),
-        blob_service.clone(),
-        directory_service.clone(),
-    )
-    .await?;
-
-    // TODO: grpc client also implements NarCalculationService
-    let nar_calculation_service = Box::new(SimpleRenderer::new(
-        blob_service.clone(),
-        directory_service.clone(),
-    )) as Box<dyn NarCalculationService>;
+    urls: impl Into<ServiceUrls>,
+) -> Result<
+    (
+        Arc<dyn BlobService>,
+        Arc<dyn DirectoryService>,
+        Arc<dyn PathInfoService>,
+        Box<dyn NarCalculationService>,
+    ),
+    Box<dyn std::error::Error + Send + Sync>,
+> {
+    let configs = addrs_to_configs(urls).await?;
+    construct_services_from_configs(configs).await
+}
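A hedged usage sketch of the new signature: feeding the all-memory defaults through clap (parse_from with only a binary name picks up the memory:// defaults), then destructuring the four handles:

    use clap::Parser;

    async fn setup() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        // no CLI args given, so every *_service_addr falls back to memory://
        let urls = ServiceUrlsMemory::parse_from(["tvix"]);
        let (blob_service, directory_service, path_info_service, nar_calculation_service) =
            construct_services(urls).await?;
        let _ = (blob_service, directory_service, path_info_service, nar_calculation_service);
        Ok(())
    }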
+
+/// Construct the store handles from a set of composition configs.
+pub async fn construct_services_from_configs(
+    configs: CompositionConfigs,
+) -> Result<
+    (
+        Arc<dyn BlobService>,
+        Arc<dyn DirectoryService>,
+        Arc<dyn PathInfoService>,
+        Box<dyn NarCalculationService>,
+    ),
+    Box<dyn std::error::Error + Send + Sync>,
+> {
+    let mut comp = Composition::default();
+
+    comp.extend(configs.blobservices);
+    comp.extend(configs.directoryservices);
+    comp.extend(configs.pathinfoservices);
+
+    let blob_service: Arc<dyn BlobService> = comp.build("default").await?;
+    let directory_service: Arc<dyn DirectoryService> = comp.build("default").await?;
+    let path_info_service: Arc<dyn PathInfoService> = comp.build("default").await?;
+
+    // HACK: The grpc client also implements NarCalculationService, and we
+    // really want to use it (otherwise we'd need to fetch everything again for hashing).
+    // Until we've revamped store composition and config, detect this special case here.
+    let nar_calculation_service: Box<dyn NarCalculationService> = path_info_service
+        .nar_calculation_service()
+        .unwrap_or_else(|| {
+            Box::new(SimpleRenderer::new(
+                blob_service.clone(),
+                directory_service.clone(),
+            ))
+        });
 
     Ok((
         blob_service,