+name = "tvix-store"
+version = "0.1.0"
+edition = "2021"
+anyhow = "1.0.68"
+async-compression = { version = "0.4.9", features = ["tokio", "bzip2", "gzip", "xz", "zstd"]}
+async-stream = "0.3.5"
+blake3 = { version = "1.3.1", features = ["rayon", "std"] }
+bstr = "1.6.0"
+bytes = "1.4.0"
+clap = { version = "4.0", features = ["derive", "env"] }
+count-write = "0.1.0"
+data-encoding = "2.3.3"
+futures = "0.3.30"
+lazy_static = "1.4.0"
+nix-compat = { path = "../nix-compat", features = ["async"] }
+pin-project-lite = "0.2.13"
+prost = "0.12.1"
+opentelemetry = { version = "0.22.0", optional = true}
+opentelemetry-otlp = { version = "0.15.0", optional = true }
+opentelemetry_sdk = { version = "0.22.1", features = ["rt-tokio"], optional = true}
+serde = { version = "1.0.197", features = [ "derive" ] }
+serde_json = "1.0"
+serde_with = "3.7.0"
+serde_qs = "0.12.0"
+sha2 = "0.10.6"
+sled = { version = "0.34.7" }
+thiserror = "1.0.38"
+tokio = { version = "1.32.0", features = ["fs", "macros", "net", "rt", "rt-multi-thread", "signal"] }
+tokio-listener = { version = "0.4.1", features = [ "tonic011" ] }
+tokio-stream = { version = "0.1.14", features = ["fs"] }
+tokio-util = { version = "0.7.9", features = ["io", "io-util", "compat"] }
+tonic = { version = "0.11.0", features = ["tls", "tls-roots"] }
+tower = "0.4.13"
+tracing = "0.1.37"
+tracing-opentelemetry = "0.23.0"
+tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
+tvix-castore = { path = "../castore" }
+url = "2.4.0"
+walkdir = "2.4.0"
+reqwest = { version = "0.11.22", features = ["rustls-tls-native-roots", "stream"], default-features = false }
+lru = "0.12.3"
+parking_lot = "0.12.2"
+indicatif = "0.17.8"
+tracing-indicatif = "0.3.6"
+optional = true
+version = "0.11.0"
+optional = true
+# https://github.com/liufuyang/bigtable_rs/pull/72
+git = "https://github.com/flokli/bigtable_rs"
+rev = "0af404741dfc40eb9fa99cf4d4140a09c5c20df7"
+prost-build = "0.12.1"
+tonic-build = "0.11.0"
+async-process = "2.1.0"
+rstest = "0.19.0"
+rstest_reuse = "0.6.0"
+tempfile = "3.3.0"
+tokio-retry = "0.3.0"
+default = ["cloud", "fuse", "otlp", "tonic-reflection"]
+cloud = [
+  "dep:bigtable_rs",
+  "tvix-castore/cloud"
+fuse = ["tvix-castore/fuse"]
+otlp = ["dep:opentelemetry", "dep:opentelemetry-otlp", "dep:opentelemetry_sdk"]
+tonic-reflection = ["dep:tonic-reflection", "tvix-castore/tonic-reflection"]
+virtiofs = ["tvix-castore/virtiofs"]
+# Whether to run the integration tests.
+# Requires the following packages in $PATH:
+# cbtemulator, google-cloud-bigtable-tool
+integration = []
+workspace = true
diff --git a/tvix/store/README.md b/tvix/store/README.md
new file mode 100644
index 0000000000..a9d29671d8
--- /dev/null
+++ b/tvix/store/README.md
@@ -0,0 +1,63 @@
+# //tvix/store
+This contains the code hosting the tvix-store.
+For the local store, Nix realizes files on the filesystem in `/nix/store` (and
+maintains some metadata in a SQLite database). For "remote stores", it
+communicates this metadata in NAR (Nix ARchive) and NARInfo format.
+Compared to the Nix model, `tvix-store` stores data on a much more granular
+level than that, which provides more deduplication possibilities, and more
+granular copying.
+However, enough information is preserved to still be able to render NAR and
+NARInfo when needed.
+## More Information
+The store consists out of two different gRPC services, `tvix.castore.v1` for
+the low-level content-addressed bits, and `tvix.store.v1` for the Nix and
+`StorePath`-specific bits.
+Check the `protos/` subfolder both here and in `castore` for the definition of
+the exact RPC methods and messages.
+## Interacting with the GRPC service manually
+The shell environment in `//tvix` provides `evans`, which is an interactive
+REPL-based gPRC client.
+You can use it to connect to a `tvix-store` and call the various RPC methods.
+$ cargo run -- daemon &
+$ evans --host localhost --port 8000 -r repl
+  ______
+ |  ____|
+ | |__    __   __   __ _   _ __    ___
+ |  __|   \ \ / /  / _. | | '_ \  / __|
+ | |____   \ V /  | (_| | | | | | \__ \
+ |______|   \_/    \__,_| |_| |_| |___/
+ more expressive universal gRPC client
+localhost:8000> package tvix.castore.v1
+tvix.castore.v1@localhost:8000> service BlobService
+tvix.castore.v1.BlobService@localhost:8000> call Put --bytes-from-file
+data (TYPE_BYTES) => /run/current-system/system
+  "digest": "KOM3/IHEx7YfInAnlJpAElYezq0Sxn9fRz7xuClwNfA="
+tvix.castore.v1.BlobService@localhost:8000> call Read --bytes-as-base64
+digest (TYPE_BYTES) => KOM3/IHEx7YfInAnlJpAElYezq0Sxn9fRz7xuClwNfA=
+  "data": "eDg2XzY0LWxpbnV4"
+$ echo eDg2XzY0LWxpbnV4 | base64 -d
+Thanks to `tvix-store` providing gRPC Server Reflection (with `reflection`
+feature), you don't need to point `evans` to the `.proto` files.
diff --git a/tvix/store/build.rs b/tvix/store/build.rs
new file mode 100644
index 0000000000..809fa29578
--- /dev/null
+++ b/tvix/store/build.rs
@@ -0,0 +1,38 @@
+use std::io::Result;
+fn main() -> Result<()> {
+    #[allow(unused_mut)]
+    let mut builder = tonic_build::configure();
+    #[cfg(feature = "tonic-reflection")]
+    {
+        let out_dir = std::path::PathBuf::from(std::env::var("OUT_DIR").unwrap());
+        let descriptor_path = out_dir.join("tvix.store.v1.bin");
+        builder = builder.file_descriptor_set_path(descriptor_path);
+    };
+    // https://github.com/hyperium/tonic/issues/908
+    let mut config = prost_build::Config::new();
+    config.bytes(["."]);
+    config.extern_path(".tvix.castore.v1", "::tvix_castore::proto");
+    builder
+        .build_server(true)
+        .build_client(true)
+        .emit_rerun_if_changed(false)
+        .compile_with_config(
+            config,
+            &[
+                "tvix/store/protos/pathinfo.proto",
+                "tvix/store/protos/rpc_pathinfo.proto",
+            ],
+            // If we are in running `cargo build` manually, using `../..` works fine,
+            // but in case we run inside a nix build, we need to instead point PROTO_ROOT
+            // to a sparseTree containing that structure.
+            &[match std::env::var_os("PROTO_ROOT") {
+                Some(proto_root) => proto_root.to_str().unwrap().to_owned(),
+                None => "../..".to_string(),
+            }],
+        )
diff --git a/tvix/store/default.nix b/tvix/store/default.nix
new file mode 100644
index 0000000000..78b499114c
--- /dev/null
+++ b/tvix/store/default.nix
@@ -0,0 +1,56 @@
+{ depot, pkgs, lib, ... }:
+  mkImportCheck = p: expectedPath: {
+    label = ":nix :import ${p} with tvix-store import";
+    needsOutput = true;
+    command = pkgs.writeShellScript "tvix-import-check" ''
+      export BLOB_SERVICE_ADDR=memory://
+      export DIRECTORY_SERVICE_ADDR=memory://
+      export PATH_INFO_SERVICE_ADDR=memory://
+      TVIX_STORE_OUTPUT=$(result/bin/tvix-store import ${p})
+      EXPECTED='${/* the vebatim expected Tvix output: */expectedPath}'
+      echo "tvix-store output: ''${TVIX_STORE_OUTPUT}"
+      if [ "$TVIX_STORE_OUTPUT" != "$EXPECTED" ]; then
+        echo "Correct would have been ''${EXPECTED}"
+        exit 1
+      fi
+      echo "Output was correct."
+    '';
+  };
+(depot.tvix.crates.workspaceMembers.tvix-store.build.override (old: {
+  runTests = true;
+  testPreRun = ''
+    export SSL_CERT_FILE=${pkgs.cacert}/etc/ssl/certs/ca-bundle.crt
+  '';
+  features = old.features
+    # virtiofs feature currently fails to build on Darwin
+    ++ lib.optional pkgs.stdenv.isLinux "virtiofs";
+})).overrideAttrs (old: rec {
+  meta.ci = {
+    targets = [ "integration-tests" ] ++ lib.filter (x: lib.hasPrefix "with-features" x || x == "no-features") (lib.attrNames passthru);
+    extraSteps.import-docs = (mkImportCheck "tvix/store/docs" ./docs);
+  };
+  passthru = (depot.tvix.utils.mkFeaturePowerset {
+    inherit (old) crateName;
+    features = ([ "cloud" "fuse" "otlp" "tonic-reflection" ]
+      # virtiofs feature currently fails to build on Darwin
+      ++ lib.optional pkgs.stdenv.isLinux "virtiofs");
+    override.testPreRun = ''
+      export SSL_CERT_FILE=${pkgs.cacert}/etc/ssl/certs/ca-bundle.crt
+    '';
+  }) // {
+    integration-tests = depot.tvix.crates.workspaceMembers.${old.crateName}.build.override (old: {
+      runTests = true;
+      testPreRun = ''
+        export SSL_CERT_FILE=${pkgs.cacert}/etc/ssl/certs/ca-bundle.crt;
+        export PATH="$PATH:${pkgs.lib.makeBinPath [ pkgs.cbtemulator pkgs.google-cloud-bigtable-tool ]}"
+      '';
+      features = old.features ++ [ "integration" ];
+    });
+  };
diff --git a/tvix/store/docs/api.md b/tvix/store/docs/api.md
new file mode 100644
index 0000000000..01e72671a7
--- /dev/null
+++ b/tvix/store/docs/api.md
@@ -0,0 +1,288 @@
+tvix-[ca]store API
+This document outlines the design of the API exposed by tvix-castore and tvix-
+store, as well as other implementations of this store protocol.
+This document is meant to be read side-by-side with
+[castore.md](../../tvix-castore/docs/castore.md) which describes the data model
+in more detail.
+The store API has four main consumers:
+1. The evaluator (or more correctly, the CLI/coordinator, in the Tvix
+   case) communicates with the store to:
+   * Upload files and directories (e.g. from `builtins.path`, or `src = ./path`
+     Nix expressions).
+   * Read files from the store where necessary (e.g. when `nixpkgs` is
+     located in the store, or for IFD).
+2. The builder communicates with the store to:
+   * Upload files and directories after a build, to persist build artifacts in
+     the store.
+3. Tvix clients (such as users that have Tvix installed, or, depending
+   on perspective, builder environments) expect the store to
+   "materialise" on disk to provide a directory layout with store
+   paths.
+4. Stores may communicate with other stores, to substitute already built store
+   paths, i.e. a store acts as a binary cache for other stores.
+The store API attempts to reuse parts of its API between these three
+consumers by making similarities explicit in the protocol. This leads
+to a protocol that is slightly more complex than a simple "file
+upload/download"-system, but at significantly greater efficiency, both in terms
+of deduplication opportunities as well as granularity.
+## The Store model
+Contents inside a tvix-store can be grouped into three different message types:
+ * Blobs
+ * Directories
+ * PathInfo (see further down)
+(check `castore.md` for more detailed field descriptions)
+### Blobs
+A blob object contains the literal file contents of regular (or executable)
+### Directory
+A directory object describes the direct children of a directory.
+It contains:
+ - name of child (regular or executable) files, and their [blake3][blake3] hash.
+ - name of child symlinks, and their target (as string)
+ - name of child directories, and their [blake3][blake3] hash (forming a Merkle DAG)
+### Content-addressed Store Model
+For example, lets consider a directory layout like this, with some
+imaginary hashes of file contents:
+├── file-1.txt        hash: 5891b5b522d5df086d0ff0b110fb
+└── nested
+    └── file-2.txt    hash: abc6fd595fc079d3114d4b71a4d8
+A hash for the *directory* `nested` can be created by creating the `Directory`
+  "directories": [],
+  "files": [{
+    "name": "file-2.txt",
+    "digest": "abc6fd595fc079d3114d4b71a4d8",
+    "size": 123,
+  }],
+  "symlink": [],
+And then hashing a serialised form of that data structure. We use the blake3
+hash of the canonical protobuf representation. Let's assume the hash was
+To create the directory object one layer up, we now refer to our `nested`
+directory object in `directories`, and to `file-1.txt` in `files`:
+  "directories": [{
+    "name": "nested",
+    "digest": "ff0029485729bcde993720749232",
+    "size": 1,
+  }],
+  "files": [{
+    "name": "file-1.txt",
+    "digest": "5891b5b522d5df086d0ff0b110fb",
+    "size": 124,
+  }]
+This Merkle DAG of Directory objects, and flat store of blobs can be used to
+describe any file/directory/symlink inside a store path. Due to its content-
+addressed nature, it'll automatically deduplicate (re-)used (sub)directories,
+and allow substitution from any (untrusted) source.
+The thing that's now only missing is the metadata to map/"mount" from the
+content-addressed world to a physical path.
+### PathInfo
+As most paths in the Nix store currently are input-addressed [^input-addressed],
+and the `tvix-castore` data model is also not intrinsically using NAR hashes,
+we need something mapping from an input-addressed "output path hash" (or a Nix-
+specific content-addressed path) to the contents in the `tvix-castore` world.
+That's what `PathInfo` provides. It embeds the root node (Directory, File or
+Symlink) at a given store path.
+The root nodes' `name` field is populated with the (base)name inside
+`/nix/store`, so `xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx-pname-1.2.3`.
+The `PathInfo` message also stores references to other store paths, and some
+more NARInfo-specific metadata (signatures, narhash, narsize).
+## API overview
+There's three different services:
+### BlobService
+`BlobService` can be used to store and retrieve blobs of data, used to host
+regular file contents.
+It is content-addressed, using [blake3][blake3]
+as a hashing function.
+As blake3 is a tree hash, there's an opportunity to do
+[verified streaming][bao] of parts of the file,
+which doesn't need to trust any more information than the root hash itself.
+Future extensions of the `BlobService` protocol will enable this.
+### DirectoryService
+`DirectoryService` allows lookups (and uploads) of `Directory` messages, and
+whole reference graphs of them.
+### PathInfoService
+The PathInfo service provides lookups from a store path hash to a `PathInfo`
+## Example flows
+Below there are some common use cases of tvix-store, and how the different
+services are used.
+###  Upload files and directories
+This is needed for `builtins.path` or `src = ./path` in Nix expressions (A), as
+well as for uploading build artifacts to a store (B).
+The path specified needs to be (recursively, BFS-style) traversed.
+ * All file contents need to be hashed with blake3, and submitted to the
+   *BlobService* if not already present.
+   A reference to them needs to be added to the parent Directory object that's
+   constructed.
+ * All symlinks need to be added to the parent directory they reside in.
+ * Whenever a Directory has been fully traversed, it needs to be uploaded to
+   the *DirectoryService* and a reference to it needs to be added to the parent
+   Directory object.
+Most of the hashing / directory traversal/uploading can happen in parallel,
+as long as Directory objects only refer to Directory objects and Blobs that
+have already been uploaded.
+When reaching the root, a `PathInfo` object needs to be constructed.
+ * In the case of content-addressed paths (A), the name of the root node is
+   based on the NAR representation of the contents.
+   It might make sense to be able to offload the NAR calculation to the store,
+   which can cache it.
+ * In the case of build artifacts (B), the output path is input-addressed and
+   known upfront.
+Contrary to Nix, this has the advantage of not having to upload a lot of things
+to the store that didn't change.
+### Reading files from the store from the evaluator
+This is the case when `nixpkgs` is located in the store, or IFD in general.
+The store client asks the `PathInfoService` for the `PathInfo` of the output
+path in the request, and looks at the root node.
+If something other than the root of the store path is requested, like for
+example `maintainers/maintainer-list.nix`, the root_node Directory is inspected
+and potentially a chain of `Directory` objects requested from
+*DirectoryService*. [^n+1query].
+When the desired file is reached, the *BlobService* can be used to read the
+contents of this file, and return it back to the evaluator.
+FUTUREWORK: define how importing from symlinks should/does work.
+Contrary to Nix, this has the advantage of not having to copy all of the
+contents of a store path to the evaluating machine, but really only fetching
+the files the evaluator currently cares about.
+### Materializing store paths on disk
+This is useful for people running a Tvix-only system, or running builds on a
+"Tvix remote builder" in its own mount namespace.
+In a system with Nix installed, we can't simply manually "extract" things to
+`/nix/store`, as Nix assumes to own all writes to this location.
+In these use cases, we're probably better off exposing a tvix-store as a local
+binary cache (that's what `//tvix/nar-bridge-go` does).
+Assuming we are in an environment where we control `/nix/store` exclusively, a
+"realize to disk" would either "extract" things from the `tvix-store` to a
+filesystem, or expose a `FUSE`/`virtio-fs` filesystem.
+The latter is already implemented, and particularly interesting for (remote)
+build workloads, as build inputs can be realized on-demand, which saves copying
+around a lot of never- accessed files.
+In both cases, the API interactions are similar.
+ * The *PathInfoService* is asked for the `PathInfo` of the requested store path.
+ * If everything should be "extracted", the *DirectoryService* is asked for all
+   `Directory` objects in the closure, the file structure is created, all Blobs
+   are downloaded and placed in their corresponding location and all symlinks
+   are created accordingly.
+ * If this is a FUSE filesystem, we can decide to only request a subset,
+   similar to the "Reading files from the store from the evaluator" use case,
+   even though it might make sense to keep all Directory objects around.
+   (See the caveat in "Trust model" though!)
+### Stores communicating with other stores
+The gRPC API exposed by the tvix-store allows composing multiple stores, and
+implementing some caching strategies, that store clients don't need to be aware
+ * For example, a caching strategy could have a fast local tvix-store, that's
+   asked first and filled with data from a slower remote tvix-store.
+ * Multiple stores could be asked for the same data, and whatever store returns
+   the right data first wins.
+## Trust model / Distribution
+As already described above, the only non-content-addressed service is the
+`PathInfo` service.
+This means, all other messages (such as `Blob` and `Directory` messages) can be
+substituted from many different, untrusted sources/mirrors, which will make
+plugging in additional substitution strategies like IPFS, local network
+neighbors super simple. That's also why it's living in the `tvix-castore` crate.
+As for `PathInfo`, we don't specify an additional signature mechanism yet, but
+carry the NAR-based signatures from Nix along.
+This means, if we don't trust a remote `PathInfo` object, we currently need to
+"stream" the NAR representation to validate these signatures.
+However, the slow part is downloading of NAR files, and considering we have
+more granularity available, we might only need to download some small blobs,
+rather than a whole NAR file.
+A future signature mechanism, that is only signing (parts of) the `PathInfo`
+message, which only points to content-addressed data will enable verified
+partial access into a store path, opening up opportunities for lazy filesystem
+access etc.
+[blake3]: https://github.com/BLAKE3-team/BLAKE3
+[bao]: https://github.com/oconnor663/bao
+[^input-addressed]: Nix hashes the A-Term representation of a .drv, after doing
+                    some replacements on refered Input Derivations to calculate
+                    output paths.
+[^n+1query]: This would expose an N+1 query problem. However it's not a problem
+             in practice, as there's usually always a "local" caching store in
+             the loop, and *DirectoryService* supports a recursive lookup for
+             all `Directory` children of a `Directory`
diff --git a/tvix/store/protos/LICENSE b/tvix/store/protos/LICENSE
new file mode 100644
index 0000000000..2034ada6fd
--- /dev/null
+++ b/tvix/store/protos/LICENSE
@@ -0,0 +1,21 @@
+Copyright © The Tvix Authors
+Permission is hereby granted, free of charge, to any person obtaining
+a copy of this software and associated documentation files (the
+“Software”), to deal in the Software without restriction, including
+without limitation the rights to use, copy, modify, merge, publish,
+distribute, sublicense, and/or sell copies of the Software, and to
+permit persons to whom the Software is furnished to do so, subject to
+the following conditions:
+The above copyright notice and this permission notice shall be
+included in all copies or substantial portions of the Software.
diff --git a/tvix/store/protos/default.nix b/tvix/store/protos/default.nix
new file mode 100644
index 0000000000..56345d9338
--- /dev/null
+++ b/tvix/store/protos/default.nix
@@ -0,0 +1,55 @@
+{ depot, pkgs, ... }:
+  protos = depot.nix.sparseTree {
+    name = "store-protos";
+    root = depot.path.origSrc;
+    paths = [
+      # We need to include castore.proto (only), as it's referred.
+      ../../castore/protos/castore.proto
+      ./pathinfo.proto
+      ./rpc_pathinfo.proto
+      ../../../buf.yaml
+      ../../../buf.gen.yaml
+    ];
+  };
+depot.nix.readTree.drvTargets {
+  inherit protos;
+  # Lints and ensures formatting of the proto files.
+  check = pkgs.stdenv.mkDerivation {
+    name = "proto-check";
+    src = protos;
+    nativeBuildInputs = [
+      pkgs.buf
+    ];
+    buildPhase = ''
+      export HOME=$TMPDIR
+      buf lint
+      buf format -d --exit-code
+      touch $out
+    '';
+  };
+  # Produces the golang bindings.
+  go-bindings = pkgs.stdenv.mkDerivation {
+    name = "go-bindings";
+    src = protos;
+    nativeBuildInputs = [
+      pkgs.buf
+      pkgs.protoc-gen-go
+      pkgs.protoc-gen-go-grpc
+    ];
+    buildPhase = ''
+      export HOME=$TMPDIR
+      buf generate
+      mkdir -p $out
+      cp tvix/store/protos/*.pb.go $out/
+    '';
+  };
diff --git a/tvix/store/protos/pathinfo.proto b/tvix/store/protos/pathinfo.proto
new file mode 100644
index 0000000000..b03e7e938e
--- /dev/null
+++ b/tvix/store/protos/pathinfo.proto
@@ -0,0 +1,128 @@
+// SPDX-License-Identifier: MIT
+// Copyright © 2022 The Tvix Authors
+syntax = "proto3";
+package tvix.store.v1;
+import "tvix/castore/protos/castore.proto";
+option go_package = "code.tvl.fyi/tvix/store-go;storev1";
+// PathInfo shows information about a Nix Store Path.
+// That's a single element inside /nix/store.
+message PathInfo {
+  // The path can be a directory, file or symlink.
+  tvix.castore.v1.Node node = 1;
+  // List of references (output path hashes)
+  // This really is the raw *bytes*, after decoding nixbase32, and not a
+  // base32-encoded string.
+  repeated bytes references = 2;
+  // see below.
+  NARInfo narinfo = 3;
+// Represents a path in the Nix store (a direct child of STORE_DIR).
+// It is commonly formatted by a nixbase32-encoding the digest, and
+// concatenating the name, separated by a `-`.
+message StorePath {
+  // The string after digest and `-`.
+  string name = 1;
+  // The digest (20 bytes).
+  bytes digest = 2;
+// Nix C++ uses NAR (Nix Archive) as a format to transfer store paths,
+// and stores metadata and signatures in NARInfo files.
+// Store all these attributes in a separate message.
+// This is useful to render .narinfo files to clients, or to preserve/validate
+// these signatures.
+// As verifying these signatures requires the whole NAR file to be synthesized,
+// moving to another signature scheme is desired.
+// Even then, it still makes sense to hold this data, for old clients.
+message NARInfo {
+  // This represents a (parsed) signature line in a .narinfo file.
+  message Signature {
+    string name = 1;
+    bytes data = 2;
+  }
+  // This size of the NAR file, in bytes.
+  uint64 nar_size = 1;
+  // The sha256 of the NAR file representation.
+  bytes nar_sha256 = 2;
+  // The signatures in a .narinfo file.
+  repeated Signature signatures = 3;
+  // A list of references. To validate .narinfo signatures, a fingerprint needs
+  // to be constructed.
+  // This fingerprint doesn't just contain the hashes of the output paths of all
+  // references (like PathInfo.references), but their whole (base)names, so we
+  // need to keep them somewhere.
+  repeated string reference_names = 4;
+  // The StorePath of the .drv file producing this output.
+  // The .drv suffix is omitted in its `name` field.
+  StorePath deriver = 5;
+  // The CA field in the .narinfo.
+  // Its textual representations seen in the wild are one of the following:
+  //  - `fixed:r:sha256:1gcky5hlf5vqfzpyhihydmm54grhc94mcs8w7xr8613qsqb1v2j6`
+  //    fixed-output derivations using "recursive" `outputHashMode`.
+  //  - `fixed:sha256:19xqkh72crbcba7flwxyi3n293vav6d7qkzkh2v4zfyi4iia8vj8
+  //    fixed-output derivations using "flat" `outputHashMode`
+  //  - `text:sha256:19xqkh72crbcba7flwxyi3n293vav6d7qkzkh2v4zfyi4iia8vj8`
+  //    Text hashing, used for uploaded .drv files and outputs produced by
+  //    builtins.toFile.
+  //
+  // Semantically, they can be split into the following components:
+  //  - "content address prefix". Currently, "fixed" and "text" are supported.
+  //  - "hash mode". Currently, "flat" and "recursive" are supported.
+  //  - "hash type". The underlying hash function used.
+  //    Currently, sha1, md5, sha256, sha512.
+  //  - "digest". The digest itself.
+  //
+  // There are some restrictions on the possible combinations.
+  // For example, `text` and `fixed:recursive` always imply sha256.
+  //
+  // We use an enum to encode the possible combinations, and optimize for the
+  // common case, `fixed:recursive`, identified as `NAR_SHA256`.
+  CA ca = 6;
+  message CA {
+    enum Hash {
+      // produced when uploading fixed-output store paths using NAR-based
+      // hashing (`outputHashMode = "recursive"`).
+      NAR_SHA256 = 0;
+      NAR_SHA1 = 1;
+      NAR_SHA512 = 2;
+      NAR_MD5 = 3;
+      // Produced when uploading .drv files or outputs produced by
+      // builtins.toFile.
+      // Produces equivalent digests as FLAT_SHA256, but is a separate
+      // hashing type in Nix, affecting output path calculation.
+      TEXT_SHA256 = 4;
+      // Produced when using fixed-output derivations with
+      // `outputHashMode = "flat"`.
+      FLAT_SHA1 = 5;
+      FLAT_MD5 = 6;
+      FLAT_SHA256 = 7;
+      FLAT_SHA512 = 8;
+      // TODO: what happens in Rust if we introduce a new enum kind here?
+    }
+    // The hashing type used.
+    Hash type = 1;
+    // The digest, in raw bytes.
+    bytes digest = 2;
+  }
diff --git a/tvix/store/protos/rpc_pathinfo.proto b/tvix/store/protos/rpc_pathinfo.proto
new file mode 100644
index 0000000000..c1c91658ad
--- /dev/null
+++ b/tvix/store/protos/rpc_pathinfo.proto
@@ -0,0 +1,76 @@
+// SPDX-License-Identifier: MIT
+// Copyright © 2022 The Tvix Authors
+syntax = "proto3";
+package tvix.store.v1;
+import "tvix/castore/protos/castore.proto";
+import "tvix/store/protos/pathinfo.proto";
+option go_package = "code.tvl.fyi/tvix/store-go;storev1";
+service PathInfoService {
+  // Return a PathInfo message matching the criteria specified in the
+  // GetPathInfoRequest message.
+  rpc Get(GetPathInfoRequest) returns (PathInfo);
+  // Upload a PathInfo object to the remote end. It MUST not return until the
+  // PathInfo object has been written on the the remote end.
+  //
+  // The remote end MAY check if a potential DirectoryNode has already been
+  // uploaded.
+  //
+  // Uploading clients SHOULD obviously not steer other machines to try to
+  // substitute before from the remote end before having finished uploading
+  // PathInfo, Directories and Blobs.
+  // The returned PathInfo object MAY contain additional narinfo signatures, but
+  // is otherwise left untouched.
+  rpc Put(PathInfo) returns (PathInfo);
+  // Calculate the NAR representation of the contents specified by the
+  // root_node. The calculation SHOULD be cached server-side for subsequent
+  // requests.
+  //
+  // All references (to blobs or Directory messages) MUST already exist in the
+  // store.
+  //
+  // The method can be used to produce a Nix fixed-output path, which contains
+  // the (compressed) sha256 of the NAR content representation in the root_node
+  // name (suffixed with the name).
+  //
+  // It can also be used to calculate arbitrary NAR hashes of output paths, in
+  // case a legacy Nix Binary Cache frontend is provided.
+  rpc CalculateNAR(tvix.castore.v1.Node) returns (CalculateNARResponse);
+  // Return a stream of PathInfo messages matching the criteria specified in
+  // ListPathInfoRequest.
+  rpc List(ListPathInfoRequest) returns (stream PathInfo);
+// The parameters that can be used to lookup a (single) PathInfo object.
+// Currently, only a lookup by output hash is supported.
+message GetPathInfoRequest {
+  oneof by_what {
+    // The output hash of a nix path (20 bytes).
+    // This is the nixbase32-decoded portion of a Nix output path, so to substitute
+    // /nix/store/xm35nga2g20mz5sm5l6n8v3bdm86yj83-cowsay-3.04
+    // this field would contain nixbase32dec("xm35nga2g20mz5sm5l6n8v3bdm86yj83").
+    bytes by_output_hash = 1;
+  }
+// The parameters that can be used to lookup (multiple) PathInfo objects.
+// Currently no filtering is possible, all objects are returned.
+message ListPathInfoRequest {}
+// CalculateNARResponse is the response returned by the CalculateNAR request.
+// It contains the size of the NAR representation (in bytes), and the sha56
+// digest.
+message CalculateNARResponse {
+  // This size of the NAR file, in bytes.
+  uint64 nar_size = 1;
+  // The sha256 of the NAR file representation.
+  bytes nar_sha256 = 2;
diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs
new file mode 100644
index 0000000000..2a2d6fe6f7
--- /dev/null
+++ b/tvix/store/src/bin/tvix-store.rs
@@ -0,0 +1,617 @@
+use clap::Parser;
+use clap::Subcommand;
+use futures::future::try_join_all;
+use futures::StreamExt;
+use futures::TryStreamExt;
+use indicatif::ProgressStyle;
+use nix_compat::path_info::ExportedPathInfo;
+use serde::Deserialize;
+use serde::Serialize;
+use std::path::PathBuf;
+use std::sync::Arc;
+use tokio_listener::Listener;
+use tokio_listener::SystemOptions;
+use tokio_listener::UserOptions;
+use tonic::transport::Server;
+use tracing::info;
+use tracing::info_span;
+use tracing::instrument;
+use tracing::Level;
+use tracing::Span;
+use tracing_indicatif::filter::IndicatifFilter;
+use tracing_indicatif::{span_ext::IndicatifSpanExt, IndicatifLayer};
+use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer};
+use tvix_castore::import::fs::ingest_path;
+use tvix_store::nar::NarCalculationService;
+use tvix_store::proto::NarInfo;
+use tvix_store::proto::PathInfo;
+use tvix_castore::proto::blob_service_server::BlobServiceServer;
+use tvix_castore::proto::directory_service_server::DirectoryServiceServer;
+use tvix_castore::proto::GRPCBlobServiceWrapper;
+use tvix_castore::proto::GRPCDirectoryServiceWrapper;
+use tvix_store::pathinfoservice::PathInfoService;
+use tvix_store::proto::path_info_service_server::PathInfoServiceServer;
+use tvix_store::proto::GRPCPathInfoServiceWrapper;
+use lazy_static::lazy_static;
+// FUTUREWORK: move this to tracing crate
+lazy_static! {
+    pub static ref PB_PROGRESS_STYLE: ProgressStyle = ProgressStyle::with_template(
+        "{span_child_prefix}{bar:30} {wide_msg} [{elapsed_precise}]  {pos:>7}/{len:7}"
+    )
+    .expect("invalid progress template");
+    pub static ref PB_SPINNER_STYLE: ProgressStyle = ProgressStyle::with_template(
+        "{span_child_prefix}{spinner} {wide_msg} [{elapsed_precise}]  {pos:>7}/{len:7}"
+    )
+    .expect("invalid progress template");
+#[cfg(any(feature = "fuse", feature = "virtiofs"))]
+use tvix_store::pathinfoservice::make_fs;
+#[cfg(feature = "fuse")]
+use tvix_castore::fs::fuse::FuseDaemon;
+#[cfg(feature = "otlp")]
+use opentelemetry::KeyValue;
+#[cfg(feature = "otlp")]
+use opentelemetry_sdk::{
+    resource::{ResourceDetector, SdkProvidedResourceDetector},
+    trace::BatchConfig,
+    Resource,
+#[cfg(feature = "virtiofs")]
+use tvix_castore::fs::virtiofs::start_virtiofs_daemon;
+#[cfg(feature = "tonic-reflection")]
+#[cfg(feature = "tonic-reflection")]
+use tvix_store::proto::FILE_DESCRIPTOR_SET;
+#[command(author, version, about, long_about = None)]
+struct Cli {
+    /// Whether to configure OTLP. Set --otlp=false to disable.
+    #[arg(long, default_missing_value = "true", default_value = "true", num_args(0..=1), require_equals(true), action(clap::ArgAction::Set))]
+    otlp: bool,
+    /// A global log level to use when printing logs.
+    /// It's also possible to set `RUST_LOG` according to
+    /// `tracing_subscriber::filter::EnvFilter`, which will always have
+    /// priority.
+    #[arg(long)]
+    log_level: Option<Level>,
+    #[command(subcommand)]
+    command: Commands,
+enum Commands {
+    /// Runs the tvix-store daemon.
+    Daemon {
+        #[arg(long, short = 'l')]
+        listen_address: Option<String>,
+        #[arg(
+            long,
+            env,
+            default_value = "objectstore+file:///var/lib/tvix-store/blobs.object_store"
+        )]
+        blob_service_addr: String,
+        #[arg(
+            long,
+            env,
+            default_value = "sled:///var/lib/tvix-store/directories.sled"
+        )]
+        directory_service_addr: String,
+        #[arg(long, env, default_value = "sled:///var/lib/tvix-store/pathinfo.sled")]
+        path_info_service_addr: String,
+    },
+    /// Imports a list of paths into the store, print the store path for each of them.
+    Import {
+        #[clap(value_name = "PATH")]
+        paths: Vec<PathBuf>,
+        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
+        blob_service_addr: String,
+        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
+        directory_service_addr: String,
+        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
+        path_info_service_addr: String,
+    },
+    /// Copies a list of store paths on the system into tvix-store.
+    Copy {
+        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
+        blob_service_addr: String,
+        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
+        directory_service_addr: String,
+        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
+        path_info_service_addr: String,
+        /// A path pointing to a JSON file produced by the Nix
+        /// `__structuredAttrs` containing reference graph information provided
+        /// by the `exportReferencesGraph` feature.
+        ///
+        /// This can be used to invoke tvix-store inside a Nix derivation
+        /// copying to a Tvix store (or outside, if the JSON file is copied
+        /// out).
+        ///
+        /// Currently limited to the `closure` key inside that JSON file.
+        #[arg(value_name = "NIX_ATTRS_JSON_FILE", env = "NIX_ATTRS_JSON_FILE")]
+        reference_graph_path: PathBuf,
+    },
+    /// Mounts a tvix-store at the given mountpoint
+    #[cfg(feature = "fuse")]
+    Mount {
+        #[clap(value_name = "PATH")]
+        dest: PathBuf,
+        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
+        blob_service_addr: String,
+        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
+        directory_service_addr: String,
+        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
+        path_info_service_addr: String,
+        /// Number of FUSE threads to spawn.
+        #[arg(long, env, default_value_t = default_threads())]
+        threads: usize,
+        #[arg(long, env, default_value_t = false)]
+        /// Whether to configure the mountpoint with allow_other.
+        /// Requires /etc/fuse.conf to contain the `user_allow_other`
+        /// option, configured via `programs.fuse.userAllowOther` on NixOS.
+        allow_other: bool,
+        /// Whether to list elements at the root of the mount point.
+        /// This is useful if your PathInfoService doesn't provide an
+        /// (exhaustive) listing.
+        #[clap(long, short, action)]
+        list_root: bool,
+        #[arg(long, default_value_t = true)]
+        /// Whether to expose blob and directory digests as extended attributes.
+        show_xattr: bool,
+    },
+    /// Starts a tvix-store virtiofs daemon at the given socket path.
+    #[cfg(feature = "virtiofs")]
+    #[command(name = "virtiofs")]
+    VirtioFs {
+        #[clap(value_name = "PATH")]
+        socket: PathBuf,
+        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
+        blob_service_addr: String,
+        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
+        directory_service_addr: String,
+        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
+        path_info_service_addr: String,
+        /// Whether to list elements at the root of the mount point.
+        /// This is useful if your PathInfoService doesn't provide an
+        /// (exhaustive) listing.
+        #[clap(long, short, action)]
+        list_root: bool,
+        #[arg(long, default_value_t = true)]
+        /// Whether to expose blob and directory digests as extended attributes.
+        show_xattr: bool,
+    },
+#[cfg(all(feature = "fuse", not(target_os = "macos")))]
+fn default_threads() -> usize {
+    std::thread::available_parallelism()
+        .map(|threads| threads.into())
+        .unwrap_or(4)
+// On MacFUSE only a single channel will receive ENODEV when the file system is
+// unmounted and so all the other channels will block forever.
+// See https://github.com/osxfuse/osxfuse/issues/974
+#[cfg(all(feature = "fuse", target_os = "macos"))]
+fn default_threads() -> usize {
+    1
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+    let cli = Cli::parse();
+    // configure log settings
+    let level = cli.log_level.unwrap_or(Level::INFO);
+    let indicatif_layer = IndicatifLayer::new().with_progress_style(PB_SPINNER_STYLE.clone());
+    // Set up the tracing subscriber.
+    let subscriber = tracing_subscriber::registry()
+        .with(
+            tracing_subscriber::fmt::Layer::new()
+                .with_writer(indicatif_layer.get_stderr_writer())
+                .compact()
+                .with_filter(
+                    EnvFilter::builder()
+                        .with_default_directive(level.into())
+                        .from_env()
+                        .expect("invalid RUST_LOG"),
+                ),
+        )
+        .with(indicatif_layer.with_filter(
+            // only show progress for spans with indicatif.pb_show field being set
+            IndicatifFilter::new(false),
+        ));
+    // Add the otlp layer (when otlp is enabled, and it's not disabled in the CLI)
+    // then init the registry.
+    // If the feature is feature-flagged out, just init without adding the layer.
+    // It's necessary to do this separately, as every with() call chains the
+    // layer into the type of the registry.
+    #[cfg(feature = "otlp")]
+    {
+        let subscriber = if cli.otlp {
+            let tracer = opentelemetry_otlp::new_pipeline()
+                .tracing()
+                .with_exporter(opentelemetry_otlp::new_exporter().tonic())
+                .with_batch_config(BatchConfig::default())
+                .with_trace_config(opentelemetry_sdk::trace::config().with_resource({
+                    // use SdkProvidedResourceDetector.detect to detect resources,
+                    // but replace the default service name with our default.
+                    // https://github.com/open-telemetry/opentelemetry-rust/issues/1298
+                    let resources =
+                        SdkProvidedResourceDetector.detect(std::time::Duration::from_secs(0));
+                    // SdkProvidedResourceDetector currently always sets
+                    // `service.name`, but we don't like its default.
+                    if resources.get("service.name".into()).unwrap() == "unknown_service".into() {
+                        resources.merge(&Resource::new([KeyValue::new(
+                            "service.name",
+                            "tvix.store",
+                        )]))
+                    } else {
+                        resources
+                    }
+                }))
+                .install_batch(opentelemetry_sdk::runtime::Tokio)?;
+            // Create a tracing layer with the configured tracer
+            let layer = tracing_opentelemetry::layer().with_tracer(tracer);
+            subscriber.with(Some(layer))
+        } else {
+            subscriber.with(None)
+        };
+        subscriber.try_init()?;
+    }
+    // Init the registry (when otlp is not enabled)
+    #[cfg(not(feature = "otlp"))]
+    {
+        subscriber.try_init()?;
+    }
+    match cli.command {
+        Commands::Daemon {
+            listen_address,
+            blob_service_addr,
+            directory_service_addr,
+            path_info_service_addr,
+        } => {
+            // initialize stores
+            let (blob_service, directory_service, path_info_service, nar_calculation_service) =
+                tvix_store::utils::construct_services(
+                    blob_service_addr,
+                    directory_service_addr,
+                    path_info_service_addr,
+                )
+                .await?;
+            let listen_address = listen_address
+                .unwrap_or_else(|| "[::]:8000".to_string())
+                .parse()
+                .unwrap();
+            let mut server = Server::builder();
+            #[allow(unused_mut)]
+            let mut router = server
+                .add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::new(
+                    blob_service,
+                )))
+                .add_service(DirectoryServiceServer::new(
+                    GRPCDirectoryServiceWrapper::new(directory_service),
+                ))
+                .add_service(PathInfoServiceServer::new(GRPCPathInfoServiceWrapper::new(
+                    Arc::from(path_info_service),
+                    nar_calculation_service,
+                )));
+            #[cfg(feature = "tonic-reflection")]
+            {
+                let reflection_svc = tonic_reflection::server::Builder::configure()
+                    .register_encoded_file_descriptor_set(CASTORE_FILE_DESCRIPTOR_SET)
+                    .register_encoded_file_descriptor_set(FILE_DESCRIPTOR_SET)
+                    .build()?;
+                router = router.add_service(reflection_svc);
+            }
+            info!(listen_address=%listen_address, "starting daemon");
+            let listener = Listener::bind(
+                &listen_address,
+                &SystemOptions::default(),
+                &UserOptions::default(),
+            )
+            .await?;
+            router.serve_with_incoming(listener).await?;
+        }
+        Commands::Import {
+            paths,
+            blob_service_addr,
+            directory_service_addr,
+            path_info_service_addr,
+        } => {
+            // FUTUREWORK: allow flat for single files?
+            let (blob_service, directory_service, path_info_service, nar_calculation_service) =
+                tvix_store::utils::construct_services(
+                    blob_service_addr,
+                    directory_service_addr,
+                    path_info_service_addr,
+                )
+                .await?;
+            // Arc PathInfoService and NarCalculationService, as we clone it .
+            let path_info_service: Arc<dyn PathInfoService> = path_info_service.into();
+            let nar_calculation_service: Arc<dyn NarCalculationService> =
+                nar_calculation_service.into();
+            let root_span = {
+                let s = Span::current();
+                s.pb_set_style(&PB_PROGRESS_STYLE);
+                s.pb_set_message("Importing paths");
+                s.pb_set_length(paths.len() as u64);
+                s.pb_start();
+                s
+            };
+            let tasks = paths
+                .into_iter()
+                .map(|path| {
+                    let paths_span = root_span.clone();
+                    tokio::task::spawn({
+                        let blob_service = blob_service.clone();
+                        let directory_service = directory_service.clone();
+                        let path_info_service = path_info_service.clone();
+                        let nar_calculation_service = nar_calculation_service.clone();
+                        async move {
+                            if let Ok(name) = tvix_store::import::path_to_name(&path) {
+                                let resp = tvix_store::import::import_path_as_nar_ca(
+                                    &path,
+                                    name,
+                                    blob_service,
+                                    directory_service,
+                                    path_info_service,
+                                    nar_calculation_service,
+                                )
+                                .await;
+                                if let Ok(output_path) = resp {
+                                    // If the import was successful, print the path to stdout.
+                                    println!("{}", output_path.to_absolute_path());
+                                }
+                            }
+                            paths_span.pb_inc(1);
+                        }
+                    })
+                })
+                .collect::<Vec<_>>();
+            try_join_all(tasks).await?;
+        }
+        Commands::Copy {
+            blob_service_addr,
+            directory_service_addr,
+            path_info_service_addr,
+            reference_graph_path,
+        } => {
+            let (blob_service, directory_service, path_info_service, _nar_calculation_service) =
+                tvix_store::utils::construct_services(
+                    blob_service_addr,
+                    directory_service_addr,
+                    path_info_service_addr,
+                )
+                .await?;
+            // Parse the file at reference_graph_path.
+            let reference_graph_json = tokio::fs::read(&reference_graph_path).await?;
+            #[derive(Deserialize, Serialize)]
+            struct ReferenceGraph<'a> {
+                #[serde(borrow)]
+                closure: Vec<ExportedPathInfo<'a>>,
+            }
+            let reference_graph: ReferenceGraph<'_> =
+                serde_json::from_slice(reference_graph_json.as_slice())?;
+            // Arc the PathInfoService, as we clone it .
+            let path_info_service: Arc<dyn PathInfoService> = path_info_service.into();
+            let lookups_span = info_span!(
+                "lookup pathinfos",
+                "indicatif.pb_show" = tracing::field::Empty
+            );
+            lookups_span.pb_set_length(reference_graph.closure.len() as u64);
+            lookups_span.pb_set_style(&PB_PROGRESS_STYLE);
+            lookups_span.pb_start();
+            // From our reference graph, lookup all pathinfos that might exist.
+            let elems: Vec<_> = futures::stream::iter(reference_graph.closure)
+                .map(|elem| {
+                    let path_info_service = path_info_service.clone();
+                    async move {
+                        let resp = path_info_service
+                            .get(*elem.path.digest())
+                            .await
+                            .map(|resp| (elem, resp));
+                        Span::current().pb_inc(1);
+                        resp
+                    }
+                })
+                .buffer_unordered(50)
+                // Filter out all that are already uploaded.
+                // TODO: check if there's a better combinator for this
+                .try_filter_map(|(elem, path_info)| {
+                    std::future::ready(if path_info.is_none() {
+                        Ok(Some(elem))
+                    } else {
+                        Ok(None)
+                    })
+                })
+                .try_collect()
+                .await?;
+            // Run ingest_path on all of them.
+            let uploads: Vec<_> = futures::stream::iter(elems)
+                .map(|elem| {
+                    // Map to a future returning the root node, alongside with the closure info.
+                    let blob_service = blob_service.clone();
+                    let directory_service = directory_service.clone();
+                    async move {
+                        // Ingest the given path.
+                        ingest_path(
+                            blob_service,
+                            directory_service,
+                            PathBuf::from(elem.path.to_absolute_path()),
+                        )
+                        .await
+                        .map(|root_node| (elem, root_node))
+                    }
+                })
+                .buffer_unordered(10)
+                .try_collect()
+                .await?;
+            // Insert them into the PathInfoService.
+            // FUTUREWORK: do this properly respecting the reference graph.
+            for (elem, root_node) in uploads {
+                // Create and upload a PathInfo pointing to the root_node,
+                // annotated with information we have from the reference graph.
+                let path_info = PathInfo {
+                    node: Some(tvix_castore::proto::Node {
+                        node: Some(root_node),
+                    }),
+                    references: Vec::from_iter(
+                        elem.references.iter().map(|e| e.digest().to_vec().into()),
+                    ),
+                    narinfo: Some(NarInfo {
+                        nar_size: elem.nar_size,
+                        nar_sha256: elem.nar_sha256.to_vec().into(),
+                        signatures: vec![],
+                        reference_names: Vec::from_iter(
+                            elem.references.iter().map(|e| e.to_string()),
+                        ),
+                        deriver: None,
+                        ca: None,
+                    }),
+                };
+                path_info_service.put(path_info).await?;
+            }
+        }
+        #[cfg(feature = "fuse")]
+        Commands::Mount {
+            dest,
+            blob_service_addr,
+            directory_service_addr,
+            path_info_service_addr,
+            list_root,
+            threads,
+            allow_other,
+            show_xattr,
+        } => {
+            let (blob_service, directory_service, path_info_service, _nar_calculation_service) =
+                tvix_store::utils::construct_services(
+                    blob_service_addr,
+                    directory_service_addr,
+                    path_info_service_addr,
+                )
+                .await?;
+            let mut fuse_daemon = tokio::task::spawn_blocking(move || {
+                let fs = make_fs(
+                    blob_service,
+                    directory_service,
+                    Arc::from(path_info_service),
+                    list_root,
+                    show_xattr,
+                );
+                info!(mount_path=?dest, "mounting");
+                FuseDaemon::new(fs, &dest, threads, allow_other)
+            })
+            .await??;
+            // grab a handle to unmount the file system, and register a signal
+            // handler.
+            tokio::spawn(async move {
+                tokio::signal::ctrl_c().await.unwrap();
+                info!("interrupt received, unmounting…");
+                tokio::task::spawn_blocking(move || fuse_daemon.unmount()).await??;
+                info!("unmount occured, terminating…");
+                Ok::<_, std::io::Error>(())
+            })
+            .await??;
+        }
+        #[cfg(feature = "virtiofs")]
+        Commands::VirtioFs {
+            socket,
+            blob_service_addr,
+            directory_service_addr,
+            path_info_service_addr,
+            list_root,
+            show_xattr,
+        } => {
+            let (blob_service, directory_service, path_info_service, _nar_calculation_service) =
+                tvix_store::utils::construct_services(
+                    blob_service_addr,
+                    directory_service_addr,
+                    path_info_service_addr,
+                )
+                .await?;
+            tokio::task::spawn_blocking(move || {
+                let fs = make_fs(
+                    blob_service,
+                    directory_service,
+                    Arc::from(path_info_service),
+                    list_root,
+                    show_xattr,
+                );
+                info!(socket_path=?socket, "starting virtiofs-daemon");
+                start_virtiofs_daemon(fs, socket)
+            })
+            .await??;
+        }
+    };
+    Ok(())
diff --git a/tvix/store/src/import.rs b/tvix/store/src/import.rs
new file mode 100644
index 0000000000..888380bca9
--- /dev/null
+++ b/tvix/store/src/import.rs
@@ -0,0 +1,183 @@
+use std::path::Path;
+use tracing::{debug, instrument};
+use tvix_castore::{
+    blobservice::BlobService, directoryservice::DirectoryService, import::fs::ingest_path,
+    proto::node::Node, B3Digest,
+use nix_compat::{
+    nixhash::{CAHash, NixHash},
+    store_path::{self, StorePath},
+use crate::{
+    nar::NarCalculationService,
+    pathinfoservice::PathInfoService,
+    proto::{nar_info, NarInfo, PathInfo},
+impl From<CAHash> for nar_info::Ca {
+    fn from(value: CAHash) -> Self {
+        let hash_type: nar_info::ca::Hash = (&value).into();
+        let digest: bytes::Bytes = value.hash().to_string().into();
+        nar_info::Ca {
+            r#type: hash_type.into(),
+            digest,
+        }
+    }
+pub fn log_node(node: &Node, path: &Path) {
+    match node {
+        Node::Directory(directory_node) => {
+            debug!(
+                path = ?path,
+                name = ?directory_node.name,
+                digest = %B3Digest::try_from(directory_node.digest.clone()).unwrap(),
+                "import successful",
+            )
+        }
+        Node::File(file_node) => {
+            debug!(
+                path = ?path,
+                name = ?file_node.name,
+                digest = %B3Digest::try_from(file_node.digest.clone()).unwrap(),
+                "import successful"
+            )
+        }
+        Node::Symlink(symlink_node) => {
+            debug!(
+                path = ?path,
+                name = ?symlink_node.name,
+                target = ?symlink_node.target,
+                "import successful"
+            )
+        }
+    }
+/// Transform a path into its base name and returns an [`std::io::Error`] if it is `..` or if the
+/// basename is not valid unicode.
+pub fn path_to_name(path: &Path) -> std::io::Result<&str> {
+    path.file_name()
+        .and_then(|file_name| file_name.to_str())
+        .ok_or_else(|| {
+            std::io::Error::new(
+                std::io::ErrorKind::InvalidInput,
+                "path must not be .. and the basename valid unicode",
+            )
+        })
+/// Takes the NAR size, SHA-256 of the NAR representation, the root node and optionally
+/// a CA hash information.
+/// Returns the path information object for a NAR-style object.
+/// This [`PathInfo`] can be further filled for signatures, deriver or verified for the expected
+/// hashes.
+pub fn derive_nar_ca_path_info(
+    nar_size: u64,
+    nar_sha256: [u8; 32],
+    ca: Option<CAHash>,
+    root_node: Node,
+) -> PathInfo {
+    // assemble the [crate::proto::PathInfo] object.
+    PathInfo {
+        node: Some(tvix_castore::proto::Node {
+            node: Some(root_node),
+        }),
+        // There's no reference scanning on path contents ingested like this.
+        references: vec![],
+        narinfo: Some(NarInfo {
+            nar_size,
+            nar_sha256: nar_sha256.to_vec().into(),
+            signatures: vec![],
+            reference_names: vec![],
+            deriver: None,
+            ca: ca.map(|ca_hash| ca_hash.into()),
+        }),
+    }
+/// Ingest the given path `path` and register the resulting output path in the
+/// [`PathInfoService`] as a recursive fixed output NAR.
+#[instrument(skip_all, fields(store_name=name, path=?path), err)]
+pub async fn import_path_as_nar_ca<BS, DS, PS, NS, P>(
+    path: P,
+    name: &str,
+    blob_service: BS,
+    directory_service: DS,
+    path_info_service: PS,
+    nar_calculation_service: NS,
+) -> Result<StorePath, std::io::Error>
+    P: AsRef<Path> + std::fmt::Debug,
+    BS: BlobService + Clone,
+    DS: DirectoryService,
+    PS: AsRef<dyn PathInfoService>,
+    NS: NarCalculationService,
+    let root_node = ingest_path(blob_service, directory_service, path.as_ref())
+        .await
+        .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
+    // Ask for the NAR size and sha256
+    let (nar_size, nar_sha256) = nar_calculation_service.calculate_nar(&root_node).await?;
+    // Calculate the output path. This might still fail, as some names are illegal.
+    // FUTUREWORK: express the `name` at the type level to be valid and move the conversion
+    // at the caller level.
+    let output_path = store_path::build_nar_based_store_path(&nar_sha256, name).map_err(|_| {
+        std::io::Error::new(
+            std::io::ErrorKind::InvalidData,
+            format!("invalid name: {}", name),
+        )
+    })?;
+    // assemble a new root_node with a name that is derived from the nar hash.
+    let root_node = root_node.rename(output_path.to_string().into_bytes().into());
+    log_node(&root_node, path.as_ref());
+    let path_info = derive_nar_ca_path_info(
+        nar_size,
+        nar_sha256,
+        Some(CAHash::Nar(NixHash::Sha256(nar_sha256))),
+        root_node,
+    );
+    // This new [`PathInfo`] that we get back from there might contain additional signatures or
+    // information set by the service itself. In this function, we silently swallow it because
+    // callers doesn't really need it.
+    let _path_info = path_info_service.as_ref().put(path_info).await?;
+    Ok(output_path.to_owned())
+mod tests {
+    use std::{ffi::OsStr, path::PathBuf};
+    use crate::import::path_to_name;
+    use rstest::rstest;
+    #[rstest]
+    #[case::simple_path("a/b/c", "c")]
+    #[case::simple_path_containing_dotdot("a/b/../c", "c")]
+    #[case::path_containing_multiple_dotdot("a/b/../c/d/../e", "e")]
+    fn test_path_to_name(#[case] path: &str, #[case] expected_name: &str) {
+        let path: PathBuf = path.into();
+        assert_eq!(path_to_name(&path).expect("must succeed"), expected_name);
+    }
+    #[rstest]
+    #[case::path_ending_in_dotdot(b"a/b/..")]
+    #[case::non_unicode_path(b"\xf8\xa1\xa1\xa1\xa1")]
+    fn test_invalid_path_to_name(#[case] invalid_path: &[u8]) {
+        let path: PathBuf = unsafe { OsStr::from_encoded_bytes_unchecked(invalid_path) }.into();
+        path_to_name(&path).expect_err("must fail");
+    }
diff --git a/tvix/store/src/lib.rs b/tvix/store/src/lib.rs
new file mode 100644
index 0000000000..8c32aaf885
--- /dev/null
+++ b/tvix/store/src/lib.rs
@@ -0,0 +1,14 @@
+pub mod import;
+pub mod nar;
+pub mod pathinfoservice;
+pub mod proto;
+pub mod utils;
+mod tests;
+// That's what the rstest_reuse README asks us do, and fails about being unable
+// to find rstest_reuse in crate root.
+use rstest_reuse;
diff --git a/tvix/store/src/nar/import.rs b/tvix/store/src/nar/import.rs
new file mode 100644
index 0000000000..36122d419d
--- /dev/null
+++ b/tvix/store/src/nar/import.rs
@@ -0,0 +1,237 @@
+use nix_compat::nar::reader::r#async as nar_reader;
+use tokio::{io::AsyncBufRead, sync::mpsc, try_join};
+use tvix_castore::{
+    blobservice::BlobService,
+    directoryservice::DirectoryService,
+    import::{
+        blobs::{self, ConcurrentBlobUploader},
+        ingest_entries, IngestionEntry, IngestionError,
+    },
+    proto::{node::Node, NamedNode},
+    PathBuf,
+/// Ingests the contents from a [AsyncRead] providing NAR into the tvix store,
+/// interacting with a [BlobService] and [DirectoryService].
+/// It returns the castore root node or an error.
+pub async fn ingest_nar<R, BS, DS>(
+    blob_service: BS,
+    directory_service: DS,
+    r: &mut R,
+) -> Result<Node, IngestionError<Error>>
+    R: AsyncBufRead + Unpin + Send,
+    BS: BlobService + Clone + 'static,
+    DS: DirectoryService,
+    // open the NAR for reading.
+    // The NAR reader emits nodes in DFS preorder.
+    let root_node = nar_reader::open(r).await.map_err(Error::IO)?;
+    let (tx, rx) = mpsc::channel(1);
+    let rx = tokio_stream::wrappers::ReceiverStream::new(rx);
+    let produce = async move {
+        let mut blob_uploader = ConcurrentBlobUploader::new(blob_service);
+        let res = produce_nar_inner(
+            &mut blob_uploader,
+            root_node,
+            "root".parse().unwrap(), // HACK: the root node sent to ingest_entries may not be ROOT.
+            tx.clone(),
+        )
+        .await;
+        if let Err(err) = blob_uploader.join().await {
+            tx.send(Err(err.into()))
+                .await
+                .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)))?;
+        }
+        tx.send(res)
+            .await
+            .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)))?;
+        Ok(())
+    };
+    let consume = ingest_entries(directory_service, rx);
+    let (_, node) = try_join!(produce, consume)?;
+    // remove the fake "root" name again
+    debug_assert_eq!(&node.get_name(), b"root");
+    Ok(node.rename("".into()))
+async fn produce_nar_inner<BS>(
+    blob_uploader: &mut ConcurrentBlobUploader<BS>,
+    node: nar_reader::Node<'_, '_>,
+    path: PathBuf,
+    tx: mpsc::Sender<Result<IngestionEntry, Error>>,
+) -> Result<IngestionEntry, Error>
+    BS: BlobService + Clone + 'static,
+    Ok(match node {
+        nar_reader::Node::Symlink { target } => IngestionEntry::Symlink { path, target },
+        nar_reader::Node::File {
+            executable,
+            mut reader,
+        } => {
+            let size = reader.len();
+            let digest = blob_uploader.upload(&path, size, &mut reader).await?;
+            IngestionEntry::Regular {
+                path,
+                size,
+                executable,
+                digest,
+            }
+        }
+        nar_reader::Node::Directory(mut dir_reader) => {
+            while let Some(entry) = dir_reader.next().await? {
+                let mut path = path.clone();
+                // valid NAR names are valid castore names
+                path.try_push(entry.name)
+                    .expect("Tvix bug: failed to join name");
+                let entry = Box::pin(produce_nar_inner(
+                    blob_uploader,
+                    entry.node,
+                    path,
+                    tx.clone(),
+                ))
+                .await?;
+                tx.send(Ok(entry)).await.map_err(|e| {
+                    Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e))
+                })?;
+            }
+            IngestionEntry::Dir { path }
+        }
+    })
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+    #[error(transparent)]
+    IO(#[from] std::io::Error),
+    #[error(transparent)]
+    BlobUpload(#[from] blobs::Error),
+mod test {
+    use crate::nar::ingest_nar;
+    use std::io::Cursor;
+    use std::sync::Arc;
+    use rstest::*;
+    use tokio_stream::StreamExt;
+    use tvix_castore::blobservice::BlobService;
+    use tvix_castore::directoryservice::DirectoryService;
+    use tvix_castore::fixtures::{
+    };
+    use tvix_castore::proto as castorepb;
+    use crate::tests::fixtures::{
+        blob_service, directory_service, NAR_CONTENTS_COMPLICATED, NAR_CONTENTS_HELLOWORLD,
+    };
+    #[rstest]
+    #[tokio::test]
+    async fn single_symlink(
+        blob_service: Arc<dyn BlobService>,
+        directory_service: Arc<dyn DirectoryService>,
+    ) {
+        let root_node = ingest_nar(
+            blob_service,
+            directory_service,
+            &mut Cursor::new(&NAR_CONTENTS_SYMLINK.clone()),
+        )
+        .await
+        .expect("must parse");
+        assert_eq!(
+            castorepb::node::Node::Symlink(castorepb::SymlinkNode {
+                name: "".into(), // name must be empty
+                target: "/nix/store/somewhereelse".into(),
+            }),
+            root_node
+        );
+    }
+    #[rstest]
+    #[tokio::test]
+    async fn single_file(
+        blob_service: Arc<dyn BlobService>,
+        directory_service: Arc<dyn DirectoryService>,
+    ) {
+        let root_node = ingest_nar(
+            blob_service.clone(),
+            directory_service,
+            &mut Cursor::new(&NAR_CONTENTS_HELLOWORLD.clone()),
+        )
+        .await
+        .expect("must parse");
+        assert_eq!(
+            castorepb::node::Node::File(castorepb::FileNode {
+                name: "".into(), // name must be empty
+                digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
+                size: HELLOWORLD_BLOB_CONTENTS.len() as u64,
+                executable: false,
+            }),
+            root_node
+        );
+        // blobservice must contain the blob
+        assert!(blob_service.has(&HELLOWORLD_BLOB_DIGEST).await.unwrap());
+    }
+    #[rstest]
+    #[tokio::test]
+    async fn complicated(
+        blob_service: Arc<dyn BlobService>,
+        directory_service: Arc<dyn DirectoryService>,
+    ) {
+        let root_node = ingest_nar(
+            blob_service.clone(),
+            directory_service.clone(),
+            &mut Cursor::new(&NAR_CONTENTS_COMPLICATED.clone()),
+        )
+        .await
+        .expect("must parse");
+        assert_eq!(
+            castorepb::node::Node::Directory(castorepb::DirectoryNode {
+                name: "".into(), // name must be empty
+                digest: DIRECTORY_COMPLICATED.digest().into(),
+                size: DIRECTORY_COMPLICATED.size(),
+            }),
+            root_node,
+        );
+        // blobservice must contain the blob
+        assert!(blob_service.has(&EMPTY_BLOB_DIGEST).await.unwrap());
+        // directoryservice must contain the directories, at least with get_recursive.
+        let resp: Result<Vec<castorepb::Directory>, _> = directory_service
+            .get_recursive(&DIRECTORY_COMPLICATED.digest())
+            .collect()
+            .await;
+        let directories = resp.unwrap();
+        assert_eq!(2, directories.len());
+        assert_eq!(DIRECTORY_COMPLICATED.clone(), directories[0]);
+        assert_eq!(DIRECTORY_WITH_KEEP.clone(), directories[1]);
+    }
diff --git a/tvix/store/src/nar/mod.rs b/tvix/store/src/nar/mod.rs
new file mode 100644
index 0000000000..164748a655
--- /dev/null
+++ b/tvix/store/src/nar/mod.rs
@@ -0,0 +1,52 @@
+use tonic::async_trait;
+use tvix_castore::B3Digest;
+mod import;
+mod renderer;
+pub use import::ingest_nar;
+pub use renderer::calculate_size_and_sha256;
+pub use renderer::write_nar;
+pub use renderer::SimpleRenderer;
+use tvix_castore::proto as castorepb;
+pub trait NarCalculationService: Send + Sync {
+    /// Return the nar size and nar sha256 digest for a given root node.
+    /// This can be used to calculate NAR-based output paths.
+    async fn calculate_nar(
+        &self,
+        root_node: &castorepb::node::Node,
+    ) -> Result<(u64, [u8; 32]), tvix_castore::Error>;
+impl<A> NarCalculationService for A
+    A: AsRef<dyn NarCalculationService> + Send + Sync,
+    async fn calculate_nar(
+        &self,
+        root_node: &castorepb::node::Node,
+    ) -> Result<(u64, [u8; 32]), tvix_castore::Error> {
+        self.as_ref().calculate_nar(root_node).await
+    }
+/// Errors that can encounter while rendering NARs.
+#[derive(Debug, thiserror::Error)]
+pub enum RenderError {
+    #[error("failure talking to a backing store client: {0}")]
+    StoreError(#[source] std::io::Error),
+    #[error("unable to find directory {}, referred from {:?}", .0, .1)]
+    DirectoryNotFound(B3Digest, bytes::Bytes),
+    #[error("unable to find blob {}, referred from {:?}", .0, .1)]
+    BlobNotFound(B3Digest, bytes::Bytes),
+    #[error("unexpected size in metadata for blob {}, referred from {:?} returned, expected {}, got {}", .0, .1, .2, .3)]
+    UnexpectedBlobMeta(B3Digest, bytes::Bytes, u32, u32),
+    #[error("failure using the NAR writer: {0}")]
+    NARWriterError(std::io::Error),
diff --git a/tvix/store/src/nar/renderer.rs b/tvix/store/src/nar/renderer.rs
new file mode 100644
index 0000000000..efd67671db
--- /dev/null
+++ b/tvix/store/src/nar/renderer.rs
@@ -0,0 +1,222 @@
+use crate::utils::AsyncIoBridge;
+use super::{NarCalculationService, RenderError};
+use count_write::CountWrite;
+use nix_compat::nar::writer::r#async as nar_writer;
+use sha2::{Digest, Sha256};
+use tokio::io::{self, AsyncWrite, BufReader};
+use tonic::async_trait;
+use tvix_castore::{
+    blobservice::BlobService,
+    directoryservice::DirectoryService,
+    proto::{self as castorepb, NamedNode},
+pub struct SimpleRenderer<BS, DS> {
+    blob_service: BS,
+    directory_service: DS,
+impl<BS, DS> SimpleRenderer<BS, DS> {
+    pub fn new(blob_service: BS, directory_service: DS) -> Self {
+        Self {
+            blob_service,
+            directory_service,
+        }
+    }
+impl<BS, DS> NarCalculationService for SimpleRenderer<BS, DS>
+    BS: BlobService + Clone,
+    DS: DirectoryService + Clone,
+    async fn calculate_nar(
+        &self,
+        root_node: &castorepb::node::Node,
+    ) -> Result<(u64, [u8; 32]), tvix_castore::Error> {
+        calculate_size_and_sha256(
+            root_node,
+            self.blob_service.clone(),
+            self.directory_service.clone(),
+        )
+        .await
+        .map_err(|e| tvix_castore::Error::StorageError(format!("failed rendering nar: {}", e)))
+    }
+/// Invoke [write_nar], and return the size and sha256 digest of the produced
+/// NAR output.
+pub async fn calculate_size_and_sha256<BS, DS>(
+    root_node: &castorepb::node::Node,
+    blob_service: BS,
+    directory_service: DS,
+) -> Result<(u64, [u8; 32]), RenderError>
+    BS: BlobService + Send,
+    DS: DirectoryService + Send,
+    let mut h = Sha256::new();
+    let mut cw = CountWrite::from(&mut h);
+    write_nar(
+        // The hasher doesn't speak async. It doesn't
+        // actually do any I/O, so it's fine to wrap.
+        AsyncIoBridge(&mut cw),
+        root_node,
+        blob_service,
+        directory_service,
+    )
+    .await?;
+    Ok((cw.count(), h.finalize().into()))
+/// Accepts a [castorepb::node::Node] pointing to the root of a (store) path,
+/// and uses the passed blob_service and directory_service to perform the
+/// necessary lookups as it traverses the structure.
+/// The contents in NAR serialization are writen to the passed [AsyncWrite].
+pub async fn write_nar<W, BS, DS>(
+    mut w: W,
+    proto_root_node: &castorepb::node::Node,
+    blob_service: BS,
+    directory_service: DS,
+) -> Result<(), RenderError>
+    W: AsyncWrite + Unpin + Send,
+    BS: BlobService + Send,
+    DS: DirectoryService + Send,
+    // Initialize NAR writer
+    let nar_root_node = nar_writer::open(&mut w)
+        .await
+        .map_err(RenderError::NARWriterError)?;
+    walk_node(
+        nar_root_node,
+        proto_root_node,
+        blob_service,
+        directory_service,
+    )
+    .await?;
+    Ok(())
+/// Process an intermediate node in the structure.
+/// This consumes the node.
+async fn walk_node<BS, DS>(
+    nar_node: nar_writer::Node<'_, '_>,
+    proto_node: &castorepb::node::Node,
+    blob_service: BS,
+    directory_service: DS,
+) -> Result<(BS, DS), RenderError>
+    BS: BlobService + Send,
+    DS: DirectoryService + Send,
+    match proto_node {
+        castorepb::node::Node::Symlink(proto_symlink_node) => {
+            nar_node
+                .symlink(&proto_symlink_node.target)
+                .await
+                .map_err(RenderError::NARWriterError)?;
+        }
+        castorepb::node::Node::File(proto_file_node) => {
+            let digest_len = proto_file_node.digest.len();
+            let digest = proto_file_node.digest.clone().try_into().map_err(|_| {
+                RenderError::StoreError(io::Error::new(
+                    io::ErrorKind::Other,
+                    format!("invalid digest len {} in file node", digest_len),
+                ))
+            })?;
+            let mut blob_reader = match blob_service
+                .open_read(&digest)
+                .await
+                .map_err(RenderError::StoreError)?
+            {
+                Some(blob_reader) => Ok(BufReader::new(blob_reader)),
+                None => Err(RenderError::NARWriterError(io::Error::new(
+                    io::ErrorKind::NotFound,
+                    format!("blob with digest {} not found", &digest),
+                ))),
+            }?;
+            nar_node
+                .file(
+                    proto_file_node.executable,
+                    proto_file_node.size,
+                    &mut blob_reader,
+                )
+                .await
+                .map_err(RenderError::NARWriterError)?;
+        }
+        castorepb::node::Node::Directory(proto_directory_node) => {
+            let digest_len = proto_directory_node.digest.len();
+            let digest = proto_directory_node
+                .digest
+                .clone()
+                .try_into()
+                .map_err(|_| {
+                    RenderError::StoreError(io::Error::new(
+                        io::ErrorKind::InvalidData,
+                        format!("invalid digest len {} in directory node", digest_len),
+                    ))
+                })?;
+            // look it up with the directory service
+            match directory_service
+                .get(&digest)
+                .await
+                .map_err(|e| RenderError::StoreError(e.into()))?
+            {
+                // if it's None, that's an error!
+                None => Err(RenderError::DirectoryNotFound(
+                    digest,
+                    proto_directory_node.name.clone(),
+                ))?,
+                Some(proto_directory) => {
+                    // start a directory node
+                    let mut nar_node_directory = nar_node
+                        .directory()
+                        .await
+                        .map_err(RenderError::NARWriterError)?;
+                    // We put blob_service, directory_service back here whenever we come up from
+                    // the recursion.
+                    let mut blob_service = blob_service;
+                    let mut directory_service = directory_service;
+                    // for each node in the directory, create a new entry with its name,
+                    // and then recurse on that entry.
+                    for proto_node in proto_directory.nodes() {
+                        let child_node = nar_node_directory
+                            .entry(proto_node.get_name())
+                            .await
+                            .map_err(RenderError::NARWriterError)?;
+                        (blob_service, directory_service) = Box::pin(walk_node(
+                            child_node,
+                            &proto_node,
+                            blob_service,
+                            directory_service,
+                        ))
+                        .await?;
+                    }
+                    // close the directory
+                    nar_node_directory
+                        .close()
+                        .await
+                        .map_err(RenderError::NARWriterError)?;
+                    return Ok((blob_service, directory_service));
+                }
+            }
+        }
+    }
+    Ok((blob_service, directory_service))
diff --git a/tvix/store/src/pathinfoservice/bigtable.rs b/tvix/store/src/pathinfoservice/bigtable.rs
new file mode 100644
index 0000000000..707a686c0a
--- /dev/null
+++ b/tvix/store/src/pathinfoservice/bigtable.rs
@@ -0,0 +1,412 @@
+use super::PathInfoService;
+use crate::proto;
+use crate::proto::PathInfo;
+use async_stream::try_stream;
+use bigtable_rs::{bigtable, google::bigtable::v2 as bigtable_v2};
+use bytes::Bytes;
+use data_encoding::HEXLOWER;
+use futures::stream::BoxStream;
+use nix_compat::nixbase32;
+use prost::Message;
+use serde::{Deserialize, Serialize};
+use serde_with::{serde_as, DurationSeconds};
+use tonic::async_trait;
+use tracing::{instrument, trace};
+use tvix_castore::Error;
+/// There should not be more than 10 MiB in a single cell.
+/// https://cloud.google.com/bigtable/docs/schema-design#cells
+const CELL_SIZE_LIMIT: u64 = 10 * 1024 * 1024;
+/// Provides a [DirectoryService] implementation using
+/// [Bigtable](https://cloud.google.com/bigtable/docs/)
+/// as an underlying K/V store.
+/// # Data format
+/// We use Bigtable as a plain K/V store.
+/// The row key is the digest of the store path, in hexlower.
+/// Inside the row, we currently have a single column/cell, again using the
+/// hexlower store path digest.
+/// Its value is the PathInfo message, serialized in canonical protobuf.
+/// We currently only populate this column.
+/// Listing is ranging over all rows, and calculate_nar is returning a
+/// "unimplemented" error.
+pub struct BigtablePathInfoService {
+    client: bigtable::BigTable,
+    params: BigtableParameters,
+    #[cfg(test)]
+    #[allow(dead_code)]
+    /// Holds the temporary directory containing the unix socket, and the
+    /// spawned emulator process.
+    emulator: std::sync::Arc<(tempfile::TempDir, async_process::Child)>,
+/// Represents configuration of [BigtablePathInfoService].
+/// This currently conflates both connect parameters and data model/client
+/// behaviour parameters.
+#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
+pub struct BigtableParameters {
+    project_id: String,
+    instance_name: String,
+    #[serde(default)]
+    is_read_only: bool,
+    #[serde(default = "default_channel_size")]
+    channel_size: usize,
+    #[serde_as(as = "Option<DurationSeconds<String>>")]
+    #[serde(default = "default_timeout")]
+    timeout: Option<std::time::Duration>,
+    table_name: String,
+    family_name: String,
+    #[serde(default = "default_app_profile_id")]
+    app_profile_id: String,
+impl BigtableParameters {
+    #[cfg(test)]
+    pub fn default_for_tests() -> Self {
+        Self {
+            project_id: "project-1".into(),
+            instance_name: "instance-1".into(),
+            is_read_only: false,
+            channel_size: default_channel_size(),
+            timeout: default_timeout(),
+            table_name: "table-1".into(),
+            family_name: "cf1".into(),
+            app_profile_id: default_app_profile_id(),
+        }
+    }
+fn default_app_profile_id() -> String {
+    "default".to_owned()
+fn default_channel_size() -> usize {
+    4
+fn default_timeout() -> Option<std::time::Duration> {
+    Some(std::time::Duration::from_secs(4))
+impl BigtablePathInfoService {
+    #[cfg(not(test))]
+    pub async fn connect(params: BigtableParameters) -> Result<Self, bigtable::Error> {
+        let connection = bigtable::BigTableConnection::new(
+            &params.project_id,
+            &params.instance_name,
+            params.is_read_only,
+            params.channel_size,
+            params.timeout,
+        )
+        .await?;
+        Ok(Self {
+            client: connection.client(),
+            params,
+        })
+    }
+    #[cfg(test)]
+    pub async fn connect(params: BigtableParameters) -> Result<Self, bigtable::Error> {
+        use std::time::Duration;
+        use async_process::{Command, Stdio};
+        use tempfile::TempDir;
+        use tokio_retry::{strategy::ExponentialBackoff, Retry};
+        let tmpdir = TempDir::new().unwrap();
+        let socket_path = tmpdir.path().join("cbtemulator.sock");
+        let emulator_process = Command::new("cbtemulator")
+            .arg("-address")
+            .arg(socket_path.clone())
+            .stderr(Stdio::piped())
+            .stdout(Stdio::piped())
+            .kill_on_drop(true)
+            .spawn()
+            .expect("failed to spawn emulator");
+        Retry::spawn(
+            ExponentialBackoff::from_millis(20)
+                .max_delay(Duration::from_secs(1))
+                .take(3),
+            || async {
+                if socket_path.exists() {
+                    Ok(())
+                } else {
+                    Err(())
+                }
+            },
+        )
+        .await
+        .expect("failed to wait for socket");
+        // populate the emulator
+        for cmd in &[
+            vec!["createtable", &params.table_name],
+            vec!["createfamily", &params.table_name, &params.family_name],
+        ] {
+            Command::new("cbt")
+                .args({
+                    let mut args = vec![
+                        "-instance",
+                        &params.instance_name,
+                        "-project",
+                        &params.project_id,
+                    ];
+                    args.extend_from_slice(cmd);
+                    args
+                })
+                .env(
+                    "BIGTABLE_EMULATOR_HOST",
+                    format!("unix://{}", socket_path.to_string_lossy()),
+                )
+                .output()
+                .await
+                .expect("failed to run cbt setup command");
+        }
+        let connection = bigtable_rs::bigtable::BigTableConnection::new_with_emulator(
+            &format!("unix://{}", socket_path.to_string_lossy()),
+            &params.project_id,
+            &params.instance_name,
+            false,
+            None,
+        )?;
+        Ok(Self {
+            client: connection.client(),
+            params,
+            emulator: (tmpdir, emulator_process).into(),
+        })
+    }
+/// Derives the row/column key for a given output path.
+/// We use hexlower encoding, also because it can't be misinterpreted as RE2.
+fn derive_pathinfo_key(digest: &[u8; 20]) -> String {
+    HEXLOWER.encode(digest)
+impl PathInfoService for BigtablePathInfoService {
+    #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))]
+    async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
+        let mut client = self.client.clone();
+        let path_info_key = derive_pathinfo_key(&digest);
+        let request = bigtable_v2::ReadRowsRequest {
+            app_profile_id: self.params.app_profile_id.to_string(),
+            table_name: client.get_full_table_name(&self.params.table_name),
+            rows_limit: 1,
+            rows: Some(bigtable_v2::RowSet {
+                row_keys: vec![path_info_key.clone().into()],
+                row_ranges: vec![],
+            }),
+            // Filter selected family name, and column qualifier matching the digest.
+            // The latter is to ensure we don't fail once we start adding more metadata.
+            filter: Some(bigtable_v2::RowFilter {
+                filter: Some(bigtable_v2::row_filter::Filter::Chain(
+                    bigtable_v2::row_filter::Chain {
+                        filters: vec![
+                            bigtable_v2::RowFilter {
+                                filter: Some(
+                                    bigtable_v2::row_filter::Filter::FamilyNameRegexFilter(
+                                        self.params.family_name.to_string(),
+                                    ),
+                                ),
+                            },
+                            bigtable_v2::RowFilter {
+                                filter: Some(
+                                    bigtable_v2::row_filter::Filter::ColumnQualifierRegexFilter(
+                                        path_info_key.clone().into(),
+                                    ),
+                                ),
+                            },
+                        ],
+                    },
+                )),
+            }),
+            ..Default::default()
+        };
+        let mut response = client
+            .read_rows(request)
+            .await
+            .map_err(|e| Error::StorageError(format!("unable to read rows: {}", e)))?;
+        if response.len() != 1 {
+            if response.len() > 1 {
+                // This shouldn't happen, we limit number of rows to 1
+                return Err(Error::StorageError(
+                    "got more than one row from bigtable".into(),
+                ));
+            }
+            // else, this is simply a "not found".
+            return Ok(None);
+        }
+        let (row_key, mut cells) = response.pop().unwrap();
+        if row_key != path_info_key.as_bytes() {
+            // This shouldn't happen, we requested this row key.
+            return Err(Error::StorageError(
+                "got wrong row key from bigtable".into(),
+            ));
+        }
+        let cell = cells
+            .pop()
+            .ok_or_else(|| Error::StorageError("found no cells".into()))?;
+        // Ensure there's only one cell (so no more left after the pop())
+        // This shouldn't happen, We filter out other cells in our query.
+        if !cells.is_empty() {
+            return Err(Error::StorageError(
+                "more than one cell returned from bigtable".into(),
+            ));
+        }
+        // We also require the qualifier to be correct in the filter above,
+        // so this shouldn't happen.
+        if path_info_key.as_bytes() != cell.qualifier {
+            return Err(Error::StorageError("unexpected cell qualifier".into()));
+        }
+        // Try to parse the value into a PathInfo message
+        let path_info = proto::PathInfo::decode(Bytes::from(cell.value))
+            .map_err(|e| Error::StorageError(format!("unable to decode pathinfo proto: {}", e)))?;
+        let store_path = path_info
+            .validate()
+            .map_err(|e| Error::StorageError(format!("invalid PathInfo: {}", e)))?;
+        if store_path.digest() != &digest {
+            return Err(Error::StorageError("PathInfo has unexpected digest".into()));
+        }
+        Ok(Some(path_info))
+    }
+    #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node))]
+    async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> {
+        let store_path = path_info
+            .validate()
+            .map_err(|e| Error::InvalidRequest(format!("pathinfo failed validation: {}", e)))?;
+        let mut client = self.client.clone();
+        let path_info_key = derive_pathinfo_key(store_path.digest());
+        let data = path_info.encode_to_vec();
+        if data.len() as u64 > CELL_SIZE_LIMIT {
+            return Err(Error::StorageError(
+                "PathInfo exceeds cell limit on Bigtable".into(),
+            ));
+        }
+        let resp = client
+            .check_and_mutate_row(bigtable_v2::CheckAndMutateRowRequest {
+                table_name: client.get_full_table_name(&self.params.table_name),
+                app_profile_id: self.params.app_profile_id.to_string(),
+                row_key: path_info_key.clone().into(),
+                predicate_filter: Some(bigtable_v2::RowFilter {
+                    filter: Some(bigtable_v2::row_filter::Filter::ColumnQualifierRegexFilter(
+                        path_info_key.clone().into(),
+                    )),
+                }),
+                // If the column was already found, do nothing.
+                true_mutations: vec![],
+                // Else, do the insert.
+                false_mutations: vec![
+                    // https://cloud.google.com/bigtable/docs/writes
+                    bigtable_v2::Mutation {
+                        mutation: Some(bigtable_v2::mutation::Mutation::SetCell(
+                            bigtable_v2::mutation::SetCell {
+                                family_name: self.params.family_name.to_string(),
+                                column_qualifier: path_info_key.clone().into(),
+                                timestamp_micros: -1, // use server time to fill timestamp
+                                value: data,
+                            },
+                        )),
+                    },
+                ],
+            })
+            .await
+            .map_err(|e| Error::StorageError(format!("unable to mutate rows: {}", e)))?;
+        if resp.predicate_matched {
+            trace!("already existed")
+        }
+        Ok(path_info)
+    }
+    fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
+        let mut client = self.client.clone();
+        let request = bigtable_v2::ReadRowsRequest {
+            app_profile_id: self.params.app_profile_id.to_string(),
+            table_name: client.get_full_table_name(&self.params.table_name),
+            filter: Some(bigtable_v2::RowFilter {
+                filter: Some(bigtable_v2::row_filter::Filter::FamilyNameRegexFilter(
+                    self.params.family_name.to_string(),
+                )),
+            }),
+            ..Default::default()
+        };
+        let stream = try_stream! {
+            // TODO: add pagination, we don't want to hold all of this in memory.
+            let response = client
+                .read_rows(request)
+                .await
+                .map_err(|e| Error::StorageError(format!("unable to read rows: {}", e)))?;
+            for (row_key, mut cells) in response {
+                let cell = cells
+                    .pop()
+                    .ok_or_else(|| Error::StorageError("found no cells".into()))?;
+                // The cell must have the same qualifier as the row key
+                if row_key != cell.qualifier {
+                    Err(Error::StorageError("unexpected cell qualifier".into()))?;
+                }
+                // Ensure there's only one cell (so no more left after the pop())
+                // This shouldn't happen, We filter out other cells in our query.
+                if !cells.is_empty() {
+                    Err(Error::StorageError(
+                        "more than one cell returned from bigtable".into(),
+                    ))?
+                }
+                // Try to parse the value into a PathInfo message.
+                let path_info = proto::PathInfo::decode(Bytes::from(cell.value))
+                    .map_err(|e| Error::StorageError(format!("unable to decode pathinfo proto: {}", e)))?;
+                // Validate the containing PathInfo, ensure its StorePath digest
+                // matches row key.
+                let store_path = path_info
+                    .validate()
+                    .map_err(|e| Error::StorageError(format!("invalid PathInfo: {}", e)))?;
+                if store_path.digest().as_slice() != row_key.as_slice() {
+                    Err(Error::StorageError("PathInfo has unexpected digest".into()))?
+                }
+                yield path_info
+            }
+        };
+        Box::pin(stream)
+    }
diff --git a/tvix/store/src/pathinfoservice/combinators.rs b/tvix/store/src/pathinfoservice/combinators.rs
new file mode 100644
index 0000000000..664144ef49
--- /dev/null
+++ b/tvix/store/src/pathinfoservice/combinators.rs
@@ -0,0 +1,111 @@
+use crate::proto::PathInfo;
+use futures::stream::BoxStream;
+use nix_compat::nixbase32;
+use tonic::async_trait;
+use tracing::{debug, instrument};
+use tvix_castore::Error;
+use super::PathInfoService;
+/// Asks near first, if not found, asks far.
+/// If found in there, returns it, and *inserts* it into
+/// near.
+/// There is no negative cache.
+/// Inserts and listings are not implemented for now.
+pub struct Cache<PS1, PS2> {
+    near: PS1,
+    far: PS2,
+impl<PS1, PS2> Cache<PS1, PS2> {
+    pub fn new(near: PS1, far: PS2) -> Self {
+        Self { near, far }
+    }
+impl<PS1, PS2> PathInfoService for Cache<PS1, PS2>
+    PS1: PathInfoService,
+    PS2: PathInfoService,
+    #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))]
+    async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
+        match self.near.get(digest).await? {
+            Some(path_info) => {
+                debug!("serving from cache");
+                Ok(Some(path_info))
+            }
+            None => {
+                debug!("not found in near, asking remote…");
+                match self.far.get(digest).await? {
+                    None => Ok(None),
+                    Some(path_info) => {
+                        debug!("found in remote, adding to cache");
+                        self.near.put(path_info.clone()).await?;
+                        Ok(Some(path_info))
+                    }
+                }
+            }
+        }
+    }
+    async fn put(&self, _path_info: PathInfo) -> Result<PathInfo, Error> {
+        Err(Error::StorageError("unimplemented".to_string()))
+    }
+    fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
+        Box::pin(tokio_stream::once(Err(Error::StorageError(
+            "unimplemented".to_string(),
+        ))))
+    }
+mod test {
+    use std::num::NonZeroUsize;
+    use crate::{
+        pathinfoservice::{LruPathInfoService, MemoryPathInfoService, PathInfoService},
+        tests::fixtures::PATH_INFO_WITH_NARINFO,
+    };
+    const PATH_INFO_DIGEST: [u8; 20] = [0; 20];
+    /// Helper function setting up an instance of a "far" and "near"
+    /// PathInfoService.
+    async fn create_pathinfoservice() -> super::Cache<LruPathInfoService, MemoryPathInfoService> {
+        // Create an instance of a "far" PathInfoService.
+        let far = MemoryPathInfoService::default();
+        // … and an instance of a "near" PathInfoService.
+        let near = LruPathInfoService::with_capacity(NonZeroUsize::new(1).unwrap());
+        // create a Pathinfoservice combining the two and return it.
+        super::Cache::new(near, far)
+    }
+    /// Getting from the far backend is gonna insert it into the near one.
+    #[tokio::test]
+    async fn test_populate_cache() {
+        let svc = create_pathinfoservice().await;
+        // query the PathInfo, things should not be there.
+        assert!(svc.get(PATH_INFO_DIGEST).await.unwrap().is_none());
+        // insert it into the far one.
+        svc.far.put(PATH_INFO_WITH_NARINFO.clone()).await.unwrap();
+        // now try getting it again, it should succeed.
+        assert_eq!(
+            Some(PATH_INFO_WITH_NARINFO.clone()),
+            svc.get(PATH_INFO_DIGEST).await.unwrap()
+        );
+        // peek near, it should now be there.
+        assert_eq!(
+            Some(PATH_INFO_WITH_NARINFO.clone()),
+            svc.near.get(PATH_INFO_DIGEST).await.unwrap()
+        );
+    }
diff --git a/tvix/store/src/pathinfoservice/from_addr.rs b/tvix/store/src/pathinfoservice/from_addr.rs
new file mode 100644
index 0000000000..455909e7f2
--- /dev/null
+++ b/tvix/store/src/pathinfoservice/from_addr.rs
@@ -0,0 +1,236 @@
+use crate::proto::path_info_service_client::PathInfoServiceClient;
+use super::{
+    GRPCPathInfoService, MemoryPathInfoService, NixHTTPPathInfoService, PathInfoService,
+    SledPathInfoService,
+use nix_compat::narinfo;
+use std::sync::Arc;
+use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService, Error};
+use url::Url;
+/// Constructs a new instance of a [PathInfoService] from an URI.
+/// The following URIs are supported:
+/// - `memory:`
+///   Uses a in-memory implementation.
+/// - `sled:`
+///   Uses a in-memory sled implementation.
+/// - `sled:///absolute/path/to/somewhere`
+///   Uses sled, using a path on the disk for persistency. Can be only opened
+///   from one process at the same time.
+/// - `nix+https://cache.nixos.org?trusted-public-keys=cache.nixos.org-1:6NCHdD59X431o0gWypbMrAURkbJ16ZPMQFGspcDShjY=`
+///   Exposes the Nix binary cache as a PathInfoService, ingesting NARs into the
+///   {Blob,Directory}Service. You almost certainly want to use this with some cache.
+///   The `trusted-public-keys` URL parameter can be provided, which will then
+///   enable signature verification.
+/// - `grpc+unix:///absolute/path/to/somewhere`
+///   Connects to a local tvix-store gRPC service via Unix socket.
+/// - `grpc+http://host:port`, `grpc+https://host:port`
+///    Connects to a (remote) tvix-store gRPC service.
+/// As the [PathInfoService] needs to talk to [BlobService] and [DirectoryService],
+/// these also need to be passed in.
+pub async fn from_addr(
+    uri: &str,
+    blob_service: Arc<dyn BlobService>,
+    directory_service: Arc<dyn DirectoryService>,
+) -> Result<Box<dyn PathInfoService>, Error> {
+    #[allow(unused_mut)]
+    let mut url =
+        Url::parse(uri).map_err(|e| Error::StorageError(format!("unable to parse url: {}", e)))?;
+    let path_info_service: Box<dyn PathInfoService> = match url.scheme() {
+        "memory" => {
+            // memory doesn't support host or path in the URL.
+            if url.has_host() || !url.path().is_empty() {
+                return Err(Error::StorageError("invalid url".to_string()));
+            }
+            Box::<MemoryPathInfoService>::default()
+        }
+        "sled" => {
+            // sled doesn't support host, and a path can be provided (otherwise
+            // it'll live in memory only).
+            if url.has_host() {
+                return Err(Error::StorageError("no host allowed".to_string()));
+            }
+            if url.path() == "/" {
+                return Err(Error::StorageError(
+                    "cowardly refusing to open / with sled".to_string(),
+                ));
+            }
+            // TODO: expose other parameters as URL parameters?
+            Box::new(if url.path().is_empty() {
+                SledPathInfoService::new_temporary()
+                    .map_err(|e| Error::StorageError(e.to_string()))?
+            } else {
+                SledPathInfoService::new(url.path())
+                    .map_err(|e| Error::StorageError(e.to_string()))?
+            })
+        }
+        "nix+http" | "nix+https" => {
+            // Stringify the URL and remove the nix+ prefix.
+            // We can't use `url.set_scheme(rest)`, as it disallows
+            // setting something http(s) that previously wasn't.
+            let new_url = Url::parse(url.to_string().strip_prefix("nix+").unwrap()).unwrap();
+            let mut nix_http_path_info_service =
+                NixHTTPPathInfoService::new(new_url, blob_service, directory_service);
+            let pairs = &url.query_pairs();
+            for (k, v) in pairs.into_iter() {
+                if k == "trusted-public-keys" {
+                    let pubkey_strs: Vec<_> = v.split_ascii_whitespace().collect();
+                    let mut pubkeys: Vec<narinfo::PubKey> = Vec::with_capacity(pubkey_strs.len());
+                    for pubkey_str in pubkey_strs {
+                        pubkeys.push(narinfo::PubKey::parse(pubkey_str).map_err(|e| {
+                            Error::StorageError(format!("invalid public key: {e}"))
+                        })?);
+                    }
+                    nix_http_path_info_service.set_public_keys(pubkeys);
+                }
+            }
+            Box::new(nix_http_path_info_service)
+        }
+        scheme if scheme.starts_with("grpc+") => {
+            // schemes starting with grpc+ go to the GRPCPathInfoService.
+            //   That's normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts.
+            // - In the case of unix sockets, there must be a path, but may not be a host.
+            // - In the case of non-unix sockets, there must be a host, but no path.
+            // Constructing the channel is handled by tvix_castore::channel::from_url.
+            let client =
+                PathInfoServiceClient::new(tvix_castore::tonic::channel_from_url(&url).await?);
+            Box::new(GRPCPathInfoService::from_client(client))
+        }
+        #[cfg(feature = "cloud")]
+        "bigtable" => {
+            use super::bigtable::BigtableParameters;
+            use super::BigtablePathInfoService;
+            // parse the instance name from the hostname.
+            let instance_name = url
+                .host_str()
+                .ok_or_else(|| Error::StorageError("instance name missing".into()))?
+                .to_string();
+            // … but add it to the query string now, so we just need to parse that.
+            url.query_pairs_mut()
+                .append_pair("instance_name", &instance_name);
+            let params: BigtableParameters = serde_qs::from_str(url.query().unwrap_or_default())
+                .map_err(|e| Error::InvalidRequest(format!("failed to parse parameters: {}", e)))?;
+            Box::new(
+                BigtablePathInfoService::connect(params)
+                    .await
+                    .map_err(|e| Error::StorageError(e.to_string()))?,
+            )
+        }
+        _ => Err(Error::StorageError(format!(
+            "unknown scheme: {}",
+            url.scheme()
+        )))?,
+    };
+    Ok(path_info_service)
+mod tests {
+    use super::from_addr;
+    use lazy_static::lazy_static;
+    use rstest::rstest;
+    use std::sync::Arc;
+    use tempfile::TempDir;
+    use tvix_castore::{
+        blobservice::{BlobService, MemoryBlobService},
+        directoryservice::{DirectoryService, MemoryDirectoryService},
+    };
+    lazy_static! {
+        static ref TMPDIR_SLED_1: TempDir = TempDir::new().unwrap();
+        static ref TMPDIR_SLED_2: TempDir = TempDir::new().unwrap();
+    }
+    // the gRPC tests below don't fail, because we connect lazily.
+    #[rstest]
+    /// This uses a unsupported scheme.
+    #[case::unsupported_scheme("http://foo.example/test", false)]
+    /// This configures sled in temporary mode.
+    #[case::sled_temporary("sled://", true)]
+    /// This configures sled with /, which should fail.
+    #[case::sled_invalid_root("sled:///", false)]
+    /// This configures sled with a host, not path, which should fail.
+    #[case::sled_invalid_host("sled://foo.example", false)]
+    /// This configures sled with a valid path path, which should succeed.
+    #[case::sled_valid_path(&format!("sled://{}", &TMPDIR_SLED_1.path().to_str().unwrap()), true)]
+    /// This configures sled with a host, and a valid path path, which should fail.
+    #[case::sled_invalid_host_with_valid_path(&format!("sled://foo.example{}", &TMPDIR_SLED_2.path().to_str().unwrap()), false)]
+    /// This correctly sets the scheme, and doesn't set a path.
+    #[case::memory_valid("memory://", true)]
+    /// This sets a memory url host to `foo`
+    #[case::memory_invalid_host("memory://foo", false)]
+    /// This sets a memory url path to "/", which is invalid.
+    #[case::memory_invalid_root_path("memory:///", false)]
+    /// This sets a memory url path to "/foo", which is invalid.
+    #[case::memory_invalid_root_path_foo("memory:///foo", false)]
+    /// Correct Scheme for the cache.nixos.org binary cache.
+    #[case::correct_nix_https("nix+https://cache.nixos.org", true)]
+    /// Correct Scheme for the cache.nixos.org binary cache (HTTP URL).
+    #[case::correct_nix_http("nix+http://cache.nixos.org", true)]
+    /// Correct Scheme for Nix HTTP Binary cache, with a subpath.
+    #[case::correct_nix_http_with_subpath("nix+", true)]
+    /// Correct Scheme for Nix HTTP Binary cache, with a subpath and port.
+    #[case::correct_nix_http_with_subpath_and_port("nix+http://[::1]:8080/foo", true)]
+    /// Correct Scheme for the cache.nixos.org binary cache, and correct trusted public key set
+    #[case::correct_nix_https_with_trusted_public_key("nix+https://cache.nixos.org?trusted-public-keys=cache.nixos.org-1:6NCHdD59X431o0gWypbMrAURkbJ16ZPMQFGspcDShjY=", true)]
+    /// Correct Scheme for the cache.nixos.org binary cache, and two correct trusted public keys set
+    #[case::correct_nix_https_with_two_trusted_public_keys("nix+https://cache.nixos.org?trusted-public-keys=cache.nixos.org-1:6NCHdD59X431o0gWypbMrAURkbJ16ZPMQFGspcDShjY=%20foo:jp4fCEx9tBEId/L0ZsVJ26k0wC0fu7vJqLjjIGFkup8=", true)]
+    /// Correct scheme to connect to a unix socket.
+    #[case::grpc_valid_unix_socket("grpc+unix:///path/to/somewhere", true)]
+    /// Correct scheme for unix socket, but setting a host too, which is invalid.
+    #[case::grpc_invalid_unix_socket_and_host("grpc+unix://host.example/path/to/somewhere", false)]
+    /// Correct scheme to connect to localhost, with port 12345
+    #[case::grpc_valid_ipv6_localhost_port_12345("grpc+http://[::1]:12345", true)]
+    /// Correct scheme to connect to localhost over http, without specifying a port.
+    #[case::grpc_valid_http_host_without_port("grpc+http://localhost", true)]
+    /// Correct scheme to connect to localhost over http, without specifying a port.
+    #[case::grpc_valid_https_host_without_port("grpc+https://localhost", true)]
+    /// Correct scheme to connect to localhost over http, but with additional path, which is invalid.
+    #[case::grpc_invalid_host_and_path("grpc+http://localhost/some-path", false)]
+    /// A valid example for Bigtable.
+    #[cfg_attr(
+        all(feature = "cloud", feature = "integration"),
+        case::bigtable_valid(
+            "bigtable://instance-1?project_id=project-1&table_name=table-1&family_name=cf1",
+            true
+        )
+    )]
+    /// An invalid example for Bigtable, missing fields
+    #[cfg_attr(
+        all(feature = "cloud", feature = "integration"),
+        case::bigtable_invalid_missing_fields("bigtable://instance-1", false)
+    )]
+    #[tokio::test]
+    async fn test_from_addr_tokio(#[case] uri_str: &str, #[case] exp_succeed: bool) {
+        let blob_service: Arc<dyn BlobService> = Arc::from(MemoryBlobService::default());
+        let directory_service: Arc<dyn DirectoryService> =
+            Arc::from(MemoryDirectoryService::default());
+        let resp = from_addr(uri_str, blob_service, directory_service).await;
+        if exp_succeed {
+            resp.expect("should succeed");
+        } else {
+            assert!(resp.is_err(), "should fail");
+        }
+    }
diff --git a/tvix/store/src/pathinfoservice/fs/mod.rs b/tvix/store/src/pathinfoservice/fs/mod.rs
new file mode 100644
index 0000000000..aa64b1c01f
--- /dev/null
+++ b/tvix/store/src/pathinfoservice/fs/mod.rs
@@ -0,0 +1,81 @@
+use futures::stream::BoxStream;
+use futures::StreamExt;
+use tonic::async_trait;
+use tvix_castore::fs::{RootNodes, TvixStoreFs};
+use tvix_castore::proto as castorepb;
+use tvix_castore::Error;
+use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService};
+use super::PathInfoService;
+/// Helper to construct a [TvixStoreFs] from a [BlobService], [DirectoryService]
+/// and [PathInfoService].
+/// This avoids users to have to interact with the wrapper struct directly, as
+/// it leaks into the type signature of TvixStoreFS.
+pub fn make_fs<BS, DS, PS>(
+    blob_service: BS,
+    directory_service: DS,
+    path_info_service: PS,
+    list_root: bool,
+    show_xattr: bool,
+) -> TvixStoreFs<BS, DS, RootNodesWrapper<PS>>
+    BS: AsRef<dyn BlobService> + Send + Clone + 'static,
+    DS: AsRef<dyn DirectoryService> + Send + Clone + 'static,
+    PS: AsRef<dyn PathInfoService> + Send + Sync + Clone + 'static,
+    TvixStoreFs::new(
+        blob_service,
+        directory_service,
+        RootNodesWrapper(path_info_service),
+        list_root,
+        show_xattr,
+    )
+/// Wrapper to satisfy Rust's orphan rules for trait implementations, as
+/// RootNodes is coming from the [tvix-castore] crate.
+#[derive(Clone, Debug)]
+pub struct RootNodesWrapper<T>(pub(crate) T);
+/// Implements root node lookup for any [PathInfoService]. This represents a flat
+/// directory structure like /nix/store where each entry in the root filesystem
+/// directory corresponds to a CA node.
+#[cfg(any(feature = "fuse", feature = "virtiofs"))]
+impl<T> RootNodes for RootNodesWrapper<T>
+    T: AsRef<dyn PathInfoService> + Send + Sync,
+    async fn get_by_basename(&self, name: &[u8]) -> Result<Option<castorepb::node::Node>, Error> {
+        let Ok(store_path) = nix_compat::store_path::StorePath::from_bytes(name) else {
+            return Ok(None);
+        };
+        Ok(self
+            .0
+            .as_ref()
+            .get(*store_path.digest())
+            .await?
+            .map(|path_info| {
+                path_info
+                    .node
+                    .expect("missing root node")
+                    .node
+                    .expect("empty node")
+            }))
+    }
+    fn list(&self) -> BoxStream<Result<castorepb::node::Node, Error>> {
+        Box::pin(self.0.as_ref().list().map(|result| {
+            result.map(|path_info| {
+                path_info
+                    .node
+                    .expect("missing root node")
+                    .node
+                    .expect("empty node")
+            })
+        }))
+    }
diff --git a/tvix/store/src/pathinfoservice/grpc.rs b/tvix/store/src/pathinfoservice/grpc.rs
new file mode 100644
index 0000000000..f6a356cf18
--- /dev/null
+++ b/tvix/store/src/pathinfoservice/grpc.rs
@@ -0,0 +1,154 @@
+use super::PathInfoService;
+use crate::{
+    nar::NarCalculationService,
+    proto::{self, ListPathInfoRequest, PathInfo},
+use async_stream::try_stream;
+use futures::stream::BoxStream;
+use nix_compat::nixbase32;
+use tonic::{async_trait, transport::Channel, Code};
+use tracing::instrument;
+use tvix_castore::{proto as castorepb, Error};
+/// Connects to a (remote) tvix-store PathInfoService over gRPC.
+pub struct GRPCPathInfoService {
+    /// The internal reference to a gRPC client.
+    /// Cloning it is cheap, and it internally handles concurrent requests.
+    grpc_client: proto::path_info_service_client::PathInfoServiceClient<Channel>,
+impl GRPCPathInfoService {
+    /// construct a [GRPCPathInfoService] from a [proto::path_info_service_client::PathInfoServiceClient].
+    /// panics if called outside the context of a tokio runtime.
+    pub fn from_client(
+        grpc_client: proto::path_info_service_client::PathInfoServiceClient<Channel>,
+    ) -> Self {
+        Self { grpc_client }
+    }
+impl PathInfoService for GRPCPathInfoService {
+    #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))]
+    async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
+        let path_info = self
+            .grpc_client
+            .clone()
+            .get(proto::GetPathInfoRequest {
+                by_what: Some(proto::get_path_info_request::ByWhat::ByOutputHash(
+                    digest.to_vec().into(),
+                )),
+            })
+            .await;
+        match path_info {
+            Ok(path_info) => {
+                let path_info = path_info.into_inner();
+                path_info
+                    .validate()
+                    .map_err(|e| Error::StorageError(format!("invalid pathinfo: {}", e)))?;
+                Ok(Some(path_info))
+            }
+            Err(e) if e.code() == Code::NotFound => Ok(None),
+            Err(e) => Err(Error::StorageError(e.to_string())),
+        }
+    }
+    #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node))]
+    async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> {
+        let path_info = self
+            .grpc_client
+            .clone()
+            .put(path_info)
+            .await
+            .map_err(|e| Error::StorageError(e.to_string()))?
+            .into_inner();
+        Ok(path_info)
+    }
+    #[instrument(level = "trace", skip_all)]
+    fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
+        let mut grpc_client = self.grpc_client.clone();
+        let stream = try_stream! {
+            let resp = grpc_client.list(ListPathInfoRequest::default()).await;
+            let mut stream = resp.map_err(|e| Error::StorageError(e.to_string()))?.into_inner();
+            loop {
+                match stream.message().await {
+                    Ok(o) => match o {
+                        Some(pathinfo) => {
+                            // validate the pathinfo
+                            if let Err(e) = pathinfo.validate() {
+                                Err(Error::StorageError(format!(
+                                    "pathinfo {:?} failed validation: {}",
+                                    pathinfo, e
+                                )))?;
+                            }
+                            yield pathinfo
+                        }
+                        None => {
+                            return;
+                        },
+                    },
+                    Err(e) => Err(Error::StorageError(e.to_string()))?,
+                }
+            }
+        };
+        Box::pin(stream)
+    }
+impl NarCalculationService for GRPCPathInfoService {
+    #[instrument(level = "trace", skip_all, fields(root_node = ?root_node))]
+    async fn calculate_nar(
+        &self,
+        root_node: &castorepb::node::Node,
+    ) -> Result<(u64, [u8; 32]), Error> {
+        let path_info = self
+            .grpc_client
+            .clone()
+            .calculate_nar(castorepb::Node {
+                node: Some(root_node.clone()),
+            })
+            .await
+            .map_err(|e| Error::StorageError(e.to_string()))?
+            .into_inner();
+        let nar_sha256: [u8; 32] = path_info
+            .nar_sha256
+            .to_vec()
+            .try_into()
+            .map_err(|_e| Error::StorageError("invalid digest length".to_string()))?;
+        Ok((path_info.nar_size, nar_sha256))
+    }
+mod tests {
+    use crate::pathinfoservice::tests::make_grpc_path_info_service_client;
+    use crate::pathinfoservice::PathInfoService;
+    use crate::tests::fixtures;
+    /// This ensures connecting via gRPC works as expected.
+    #[tokio::test]
+    async fn test_valid_unix_path_ping_pong() {
+        let (_blob_service, _directory_service, path_info_service) =
+            make_grpc_path_info_service_client().await;
+        let path_info = path_info_service
+            .get(fixtures::DUMMY_PATH_DIGEST)
+            .await
+            .expect("must not be error");
+        assert!(path_info.is_none());
+    }
diff --git a/tvix/store/src/pathinfoservice/lru.rs b/tvix/store/src/pathinfoservice/lru.rs
new file mode 100644
index 0000000000..da674f497a
--- /dev/null
+++ b/tvix/store/src/pathinfoservice/lru.rs
@@ -0,0 +1,128 @@
+use async_stream::try_stream;
+use futures::stream::BoxStream;
+use lru::LruCache;
+use nix_compat::nixbase32;
+use std::num::NonZeroUsize;
+use std::sync::Arc;
+use tokio::sync::RwLock;
+use tonic::async_trait;
+use tracing::instrument;
+use crate::proto::PathInfo;
+use tvix_castore::Error;
+use super::PathInfoService;
+pub struct LruPathInfoService {
+    lru: Arc<RwLock<LruCache<[u8; 20], PathInfo>>>,
+impl LruPathInfoService {
+    pub fn with_capacity(capacity: NonZeroUsize) -> Self {
+        Self {
+            lru: Arc::new(RwLock::new(LruCache::new(capacity))),
+        }
+    }
+impl PathInfoService for LruPathInfoService {
+    #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))]
+    async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
+        Ok(self.lru.write().await.get(&digest).cloned())
+    }
+    #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node))]
+    async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> {
+        // call validate
+        let store_path = path_info
+            .validate()
+            .map_err(|e| Error::InvalidRequest(format!("invalid PathInfo: {}", e)))?;
+        self.lru
+            .write()
+            .await
+            .put(*store_path.digest(), path_info.clone());
+        Ok(path_info)
+    }
+    fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
+        let lru = self.lru.clone();
+        Box::pin(try_stream! {
+            let lru = lru.read().await;
+            let it = lru.iter();
+            for (_k,v) in it {
+                yield v.clone()
+            }
+        })
+    }
+mod test {
+    use std::num::NonZeroUsize;
+    use crate::{
+        pathinfoservice::{LruPathInfoService, PathInfoService},
+        proto::PathInfo,
+        tests::fixtures::PATH_INFO_WITH_NARINFO,
+    };
+    use lazy_static::lazy_static;
+    use tvix_castore::proto as castorepb;
+    lazy_static! {
+        static ref PATHINFO_1: PathInfo = PATH_INFO_WITH_NARINFO.clone();
+        static ref PATHINFO_1_DIGEST: [u8; 20] = [0; 20];
+        static ref PATHINFO_2: PathInfo = {
+            let mut p = PATHINFO_1.clone();
+            let root_node = p.node.as_mut().unwrap();
+            if let castorepb::Node { node: Some(node) } = root_node {
+                let n = node.to_owned();
+                *node = n.rename("11111111111111111111111111111111-dummy2".into());
+            } else {
+                unreachable!()
+            }
+            p
+        };
+        static ref PATHINFO_2_DIGEST: [u8; 20] = *(PATHINFO_2.validate().unwrap()).digest();
+    }
+    #[tokio::test]
+    async fn evict() {
+        let svc = LruPathInfoService::with_capacity(NonZeroUsize::new(1).unwrap());
+        // pathinfo_1 should not be there
+        assert!(svc
+            .get(*PATHINFO_1_DIGEST)
+            .await
+            .expect("no error")
+            .is_none());
+        // insert it
+        svc.put(PATHINFO_1.clone()).await.expect("no error");
+        // now it should be there.
+        assert_eq!(
+            Some(PATHINFO_1.clone()),
+            svc.get(*PATHINFO_1_DIGEST).await.expect("no error")
+        );
+        // insert pathinfo_2. This will evict pathinfo 1
+        svc.put(PATHINFO_2.clone()).await.expect("no error");
+        // now pathinfo 2 should be there.
+        assert_eq!(
+            Some(PATHINFO_2.clone()),
+            svc.get(*PATHINFO_2_DIGEST).await.expect("no error")
+        );
+        // … but pathinfo 1 not anymore.
+        assert!(svc
+            .get(*PATHINFO_1_DIGEST)
+            .await
+            .expect("no error")
+            .is_none());
+    }
diff --git a/tvix/store/src/pathinfoservice/memory.rs b/tvix/store/src/pathinfoservice/memory.rs
new file mode 100644
index 0000000000..3de3221df2
--- /dev/null
+++ b/tvix/store/src/pathinfoservice/memory.rs
@@ -0,0 +1,61 @@
+use super::PathInfoService;
+use crate::proto::PathInfo;
+use async_stream::try_stream;
+use futures::stream::BoxStream;
+use nix_compat::nixbase32;
+use std::{collections::HashMap, sync::Arc};
+use tokio::sync::RwLock;
+use tonic::async_trait;
+use tracing::instrument;
+use tvix_castore::Error;
+pub struct MemoryPathInfoService {
+    db: Arc<RwLock<HashMap<[u8; 20], PathInfo>>>,
+impl PathInfoService for MemoryPathInfoService {
+    #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))]
+    async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
+        let db = self.db.read().await;
+        match db.get(&digest) {
+            None => Ok(None),
+            Some(path_info) => Ok(Some(path_info.clone())),
+        }
+    }
+    #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node))]
+    async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> {
+        // Call validate on the received PathInfo message.
+        match path_info.validate() {
+            Err(e) => Err(Error::InvalidRequest(format!(
+                "failed to validate PathInfo: {}",
+                e
+            ))),
+            // In case the PathInfo is valid, and we were able to extract a NixPath, store it in the database.
+            // This overwrites existing PathInfo objects.
+            Ok(nix_path) => {
+                let mut db = self.db.write().await;
+                db.insert(*nix_path.digest(), path_info.clone());
+                Ok(path_info)
+            }
+        }
+    }
+    fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
+        let db = self.db.clone();
+        Box::pin(try_stream! {
+            let db = db.read().await;
+            let it = db.iter();
+            for (_k, v) in it {
+                yield v.clone()
+            }
+        })
+    }
diff --git a/tvix/store/src/pathinfoservice/mod.rs b/tvix/store/src/pathinfoservice/mod.rs
new file mode 100644
index 0000000000..574bcc0b8b
--- /dev/null
+++ b/tvix/store/src/pathinfoservice/mod.rs
@@ -0,0 +1,73 @@
+mod combinators;
+mod from_addr;
+mod grpc;
+mod lru;
+mod memory;
+mod nix_http;
+mod sled;
+#[cfg(any(feature = "fuse", feature = "virtiofs"))]
+mod fs;
+mod tests;
+use futures::stream::BoxStream;
+use tonic::async_trait;
+use tvix_castore::Error;
+use crate::proto::PathInfo;
+pub use self::combinators::Cache as CachePathInfoService;
+pub use self::from_addr::from_addr;
+pub use self::grpc::GRPCPathInfoService;
+pub use self::lru::LruPathInfoService;
+pub use self::memory::MemoryPathInfoService;
+pub use self::nix_http::NixHTTPPathInfoService;
+pub use self::sled::SledPathInfoService;
+#[cfg(feature = "cloud")]
+mod bigtable;
+#[cfg(feature = "cloud")]
+pub use self::bigtable::BigtablePathInfoService;
+#[cfg(any(feature = "fuse", feature = "virtiofs"))]
+pub use self::fs::make_fs;
+/// The base trait all PathInfo services need to implement.
+pub trait PathInfoService: Send + Sync {
+    /// Retrieve a PathInfo message by the output digest.
+    async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error>;
+    /// Store a PathInfo message. Implementations MUST call validate and reject
+    /// invalid messages.
+    async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error>;
+    /// Iterate over all PathInfo objects in the store.
+    /// Implementations can decide to disallow listing.
+    ///
+    /// This returns a pinned, boxed stream. The pinning allows for it to be polled easily,
+    /// and the box allows different underlying stream implementations to be returned since
+    /// Rust doesn't support this as a generic in traits yet. This is the same thing that
+    /// [async_trait] generates, but for streams instead of futures.
+    fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>>;
+impl<A> PathInfoService for A
+    A: AsRef<dyn PathInfoService> + Send + Sync,
+    async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
+        self.as_ref().get(digest).await
+    }
+    async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> {
+        self.as_ref().put(path_info).await
+    }
+    fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
+        self.as_ref().list()
+    }
diff --git a/tvix/store/src/pathinfoservice/nix_http.rs b/tvix/store/src/pathinfoservice/nix_http.rs
new file mode 100644
index 0000000000..cccd4805c6
--- /dev/null
+++ b/tvix/store/src/pathinfoservice/nix_http.rs
@@ -0,0 +1,266 @@
+use futures::{stream::BoxStream, TryStreamExt};
+use nix_compat::{
+    narinfo::{self, NarInfo},
+    nixbase32,
+    nixhash::NixHash,
+use reqwest::StatusCode;
+use sha2::Digest;
+use std::io::{self, Write};
+use tokio::io::{AsyncRead, BufReader};
+use tokio_util::io::InspectReader;
+use tonic::async_trait;
+use tracing::{debug, instrument, warn};
+use tvix_castore::{
+    blobservice::BlobService, directoryservice::DirectoryService, proto as castorepb, Error,
+use crate::proto::PathInfo;
+use super::PathInfoService;
+/// NixHTTPPathInfoService acts as a bridge in between the Nix HTTP Binary cache
+/// protocol provided by Nix binary caches such as cache.nixos.org, and the Tvix
+/// Store Model.
+/// It implements the [PathInfoService] trait in an interesting way:
+/// Every [PathInfoService::get] fetches the .narinfo and referred NAR file,
+/// inserting components into a [BlobService] and [DirectoryService], then
+/// returning a [PathInfo] struct with the root.
+/// Due to this being quite a costly operation, clients are expected to layer
+/// this service with store composition, so they're only ingested once.
+/// The client is expected to be (indirectly) using the same [BlobService] and
+/// [DirectoryService], so able to fetch referred Directories and Blobs.
+/// [PathInfoService::put] is not implemented and returns an error if called.
+/// TODO: what about reading from nix-cache-info?
+pub struct NixHTTPPathInfoService<BS, DS> {
+    base_url: url::Url,
+    http_client: reqwest::Client,
+    blob_service: BS,
+    directory_service: DS,
+    /// An optional list of [narinfo::PubKey].
+    /// If set, the .narinfo files received need to have correct signature by at least one of these.
+    public_keys: Option<Vec<narinfo::PubKey>>,
+impl<BS, DS> NixHTTPPathInfoService<BS, DS> {
+    pub fn new(base_url: url::Url, blob_service: BS, directory_service: DS) -> Self {
+        Self {
+            base_url,
+            http_client: reqwest::Client::new(),
+            blob_service,
+            directory_service,
+            public_keys: None,
+        }
+    }
+    /// Configures [Self] to validate NARInfo fingerprints with the public keys passed.
+    pub fn set_public_keys(&mut self, public_keys: Vec<narinfo::PubKey>) {
+        self.public_keys = Some(public_keys);
+    }
+impl<BS, DS> PathInfoService for NixHTTPPathInfoService<BS, DS>
+    BS: AsRef<dyn BlobService> + Send + Sync + Clone + 'static,
+    DS: AsRef<dyn DirectoryService> + Send + Sync + Clone + 'static,
+    #[instrument(skip_all, err, fields(path.digest=nixbase32::encode(&digest)))]
+    async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
+        let narinfo_url = self
+            .base_url
+            .join(&format!("{}.narinfo", nixbase32::encode(&digest)))
+            .map_err(|e| {
+                warn!(e = %e, "unable to join URL");
+                io::Error::new(io::ErrorKind::InvalidInput, "unable to join url")
+            })?;
+        debug!(narinfo_url= %narinfo_url, "constructed NARInfo url");
+        let resp = self
+            .http_client
+            .get(narinfo_url)
+            .send()
+            .await
+            .map_err(|e| {
+                warn!(e=%e,"unable to send NARInfo request");
+                io::Error::new(
+                    io::ErrorKind::InvalidInput,
+                    "unable to send NARInfo request",
+                )
+            })?;
+        // In the case of a 404, return a NotFound.
+        // We also return a NotFound in case of a 403 - this is to match the behaviour as Nix,
+        // when querying nix-cache.s3.amazonaws.com directly, rather than cache.nixos.org.
+        if resp.status() == StatusCode::NOT_FOUND || resp.status() == StatusCode::FORBIDDEN {
+            return Ok(None);
+        }
+        let narinfo_str = resp.text().await.map_err(|e| {
+            warn!(e=%e,"unable to decode response as string");
+            io::Error::new(
+                io::ErrorKind::InvalidData,
+                "unable to decode response as string",
+            )
+        })?;
+        // parse the received narinfo
+        let narinfo = NarInfo::parse(&narinfo_str).map_err(|e| {
+            warn!(e=%e,"unable to parse response as NarInfo");
+            io::Error::new(
+                io::ErrorKind::InvalidData,
+                "unable to parse response as NarInfo",
+            )
+        })?;
+        // if [self.public_keys] is set, ensure there's at least one valid signature.
+        if let Some(public_keys) = &self.public_keys {
+            let fingerprint = narinfo.fingerprint();
+            if !public_keys.iter().any(|pubkey| {
+                narinfo
+                    .signatures
+                    .iter()
+                    .any(|sig| pubkey.verify(&fingerprint, sig))
+            }) {
+                warn!("no valid signature found");
+                Err(io::Error::new(
+                    io::ErrorKind::InvalidData,
+                    "no valid signature found",
+                ))?;
+            }
+        }
+        // Convert to a (sparse) PathInfo. We still need to populate the node field,
+        // and for this we need to download the NAR file.
+        // FUTUREWORK: Keep some database around mapping from narsha256 to
+        // (unnamed) rootnode, so we can use that (and the name from the
+        // StorePath) and avoid downloading the same NAR a second time.
+        let pathinfo: PathInfo = (&narinfo).into();
+        // create a request for the NAR file itself.
+        let nar_url = self.base_url.join(narinfo.url).map_err(|e| {
+            warn!(e = %e, "unable to join URL");
+            io::Error::new(io::ErrorKind::InvalidInput, "unable to join url")
+        })?;
+        debug!(nar_url= %nar_url, "constructed NAR url");
+        let resp = self
+            .http_client
+            .get(nar_url.clone())
+            .send()
+            .await
+            .map_err(|e| {
+                warn!(e=%e,"unable to send NAR request");
+                io::Error::new(io::ErrorKind::InvalidInput, "unable to send NAR request")
+            })?;
+        // if the request is not successful, return an error.
+        if !resp.status().is_success() {
+            return Err(Error::StorageError(format!(
+                "unable to retrieve NAR at {}, status {}",
+                nar_url,
+                resp.status()
+            )));
+        }
+        // get a reader of the response body.
+        let r = tokio_util::io::StreamReader::new(resp.bytes_stream().map_err(|e| {
+            let e = e.without_url();
+            warn!(e=%e, "failed to get response body");
+            io::Error::new(io::ErrorKind::BrokenPipe, e.to_string())
+        }));
+        // handle decompression, depending on the compression field.
+        let r: Box<dyn AsyncRead + Send + Unpin> = match narinfo.compression {
+            Some("none") => Box::new(r) as Box<dyn AsyncRead + Send + Unpin>,
+            Some("bzip2") | None => Box::new(async_compression::tokio::bufread::BzDecoder::new(r))
+                as Box<dyn AsyncRead + Send + Unpin>,
+            Some("gzip") => Box::new(async_compression::tokio::bufread::GzipDecoder::new(r))
+                as Box<dyn AsyncRead + Send + Unpin>,
+            Some("xz") => Box::new(async_compression::tokio::bufread::XzDecoder::new(r))
+                as Box<dyn AsyncRead + Send + Unpin>,
+            Some("zstd") => Box::new(async_compression::tokio::bufread::ZstdDecoder::new(r))
+                as Box<dyn AsyncRead + Send + Unpin>,
+            Some(comp_str) => {
+                return Err(Error::StorageError(format!(
+                    "unsupported compression: {comp_str}"
+                )));
+            }
+        };
+        let mut nar_hash = sha2::Sha256::new();
+        let mut nar_size = 0;
+        // Assemble NarHash and NarSize as we read bytes.
+        let r = InspectReader::new(r, |b| {
+            nar_size += b.len() as u64;
+            nar_hash.write_all(b).unwrap();
+        });
+        // HACK: InspectReader doesn't implement AsyncBufRead, but neither do our decompressors.
+        let mut r = BufReader::new(r);
+        let root_node = crate::nar::ingest_nar(
+            self.blob_service.clone(),
+            self.directory_service.clone(),
+            &mut r,
+        )
+        .await
+        .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
+        // ensure the ingested narhash and narsize do actually match.
+        if narinfo.nar_size != nar_size {
+            warn!(
+                narinfo.nar_size = narinfo.nar_size,
+                http.nar_size = nar_size,
+                "NarSize mismatch"
+            );
+            Err(io::Error::new(
+                io::ErrorKind::InvalidData,
+                "NarSize mismatch".to_string(),
+            ))?;
+        }
+        let nar_hash: [u8; 32] = nar_hash.finalize().into();
+        if narinfo.nar_hash != nar_hash {
+            warn!(
+                narinfo.nar_hash = %NixHash::Sha256(narinfo.nar_hash),
+                http.nar_hash = %NixHash::Sha256(nar_hash),
+                "NarHash mismatch"
+            );
+            Err(io::Error::new(
+                io::ErrorKind::InvalidData,
+                "NarHash mismatch".to_string(),
+            ))?;
+        }
+        Ok(Some(PathInfo {
+            node: Some(castorepb::Node {
+                // set the name of the root node to the digest-name of the store path.
+                node: Some(root_node.rename(narinfo.store_path.to_string().to_owned().into())),
+            }),
+            references: pathinfo.references,
+            narinfo: pathinfo.narinfo,
+        }))
+    }
+    #[instrument(skip_all, fields(path_info=?_path_info))]
+    async fn put(&self, _path_info: PathInfo) -> Result<PathInfo, Error> {
+        Err(Error::InvalidRequest(
+            "put not supported for this backend".to_string(),
+        ))
+    }
+    fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
+        Box::pin(futures::stream::once(async {
+            Err(Error::InvalidRequest(
+                "list not supported for this backend".to_string(),
+            ))
+        }))
+    }
diff --git a/tvix/store/src/pathinfoservice/sled.rs b/tvix/store/src/pathinfoservice/sled.rs
new file mode 100644
index 0000000000..eb3cf2ff1b
--- /dev/null
+++ b/tvix/store/src/pathinfoservice/sled.rs
@@ -0,0 +1,117 @@
+use super::PathInfoService;
+use crate::proto::PathInfo;
+use async_stream::try_stream;
+use futures::stream::BoxStream;
+use nix_compat::nixbase32;
+use prost::Message;
+use std::path::Path;
+use tonic::async_trait;
+use tracing::instrument;
+use tracing::warn;
+use tvix_castore::Error;
+/// SledPathInfoService stores PathInfo in a [sled](https://github.com/spacejam/sled).
+/// The PathInfo messages are stored as encoded protos, and keyed by their output hash,
+/// as that's currently the only request type available.
+pub struct SledPathInfoService {
+    db: sled::Db,
+impl SledPathInfoService {
+    pub fn new<P: AsRef<Path>>(p: P) -> Result<Self, sled::Error> {
+        let config = sled::Config::default()
+            .use_compression(false) // is a required parameter
+            .path(p);
+        let db = config.open()?;
+        Ok(Self { db })
+    }
+    pub fn new_temporary() -> Result<Self, sled::Error> {
+        let config = sled::Config::default().temporary(true);
+        let db = config.open()?;
+        Ok(Self { db })
+    }
+impl PathInfoService for SledPathInfoService {
+    #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))]
+    async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
+        let resp = tokio::task::spawn_blocking({
+            let db = self.db.clone();
+            move || db.get(digest.as_slice())
+        })
+        .await?
+        .map_err(|e| {
+            warn!("failed to retrieve PathInfo: {}", e);
+            Error::StorageError(format!("failed to retrieve PathInfo: {}", e))
+        })?;
+        match resp {
+            None => Ok(None),
+            Some(data) => {
+                let path_info = PathInfo::decode(&*data).map_err(|e| {
+                    warn!("failed to decode stored PathInfo: {}", e);
+                    Error::StorageError(format!("failed to decode stored PathInfo: {}", e))
+                })?;
+                Ok(Some(path_info))
+            }
+        }
+    }
+    #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node))]
+    async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> {
+        // Call validate on the received PathInfo message.
+        let store_path = path_info
+            .validate()
+            .map_err(|e| Error::InvalidRequest(format!("failed to validate PathInfo: {}", e)))?;
+        // In case the PathInfo is valid, we were able to parse a StorePath.
+        // Store it in the database, keyed by its digest.
+        // This overwrites existing PathInfo objects.
+        tokio::task::spawn_blocking({
+            let db = self.db.clone();
+            let k = *store_path.digest();
+            let data = path_info.encode_to_vec();
+            move || db.insert(k, data)
+        })
+        .await?
+        .map_err(|e| {
+            warn!("failed to insert PathInfo: {}", e);
+            Error::StorageError(format! {
+                "failed to insert PathInfo: {}", e
+            })
+        })?;
+        Ok(path_info)
+    }
+    fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
+        let db = self.db.clone();
+        let mut it = db.iter().values();
+        Box::pin(try_stream! {
+            // Don't block the executor while waiting for .next(), so wrap that
+            // in a spawn_blocking call.
+            // We need to pass around it to be able to reuse it.
+            while let (Some(elem), new_it) = tokio::task::spawn_blocking(move || {
+                (it.next(), it)
+            }).await? {
+                it = new_it;
+                let data = elem.map_err(|e| {
+                    warn!("failed to retrieve PathInfo: {}", e);
+                    Error::StorageError(format!("failed to retrieve PathInfo: {}", e))
+                })?;
+                let path_info = PathInfo::decode(&*data).map_err(|e| {
+                    warn!("failed to decode stored PathInfo: {}", e);
+                    Error::StorageError(format!("failed to decode stored PathInfo: {}", e))
+                })?;
+                yield path_info
+            }
+        })
+    }
diff --git a/tvix/store/src/pathinfoservice/tests/mod.rs b/tvix/store/src/pathinfoservice/tests/mod.rs
new file mode 100644
index 0000000000..061655e4ba
--- /dev/null
+++ b/tvix/store/src/pathinfoservice/tests/mod.rs
@@ -0,0 +1,72 @@
+//! This contains test scenarios that a given [PathInfoService] needs to pass.
+//! We use [rstest] and [rstest_reuse] to provide all services we want to test
+//! against, and then apply this template to all test functions.
+use rstest::*;
+use rstest_reuse::{self, *};
+use super::PathInfoService;
+use crate::pathinfoservice::MemoryPathInfoService;
+use crate::pathinfoservice::SledPathInfoService;
+use crate::proto::PathInfo;
+use crate::tests::fixtures::DUMMY_PATH_DIGEST;
+use tvix_castore::proto as castorepb;
+mod utils;
+pub use self::utils::make_grpc_path_info_service_client;
+#[cfg(all(feature = "cloud", feature = "integration"))]
+use self::utils::make_bigtable_path_info_service;
+    let (_, _, svc) = make_grpc_path_info_service_client().await;
+    svc
+#[cfg_attr(all(feature = "cloud",feature="integration"), case::bigtable(make_bigtable_path_info_service().await))]
+pub fn path_info_services(#[case] svc: impl PathInfoService) {}
+// FUTUREWORK: add more tests rejecting invalid PathInfo messages.
+// A subset of them should also ensure references to other PathInfos, or
+// elements in {Blob,Directory}Service do exist.
+/// Trying to get a non-existent PathInfo should return Ok(None).
+async fn not_found(svc: impl PathInfoService) {
+    assert!(svc
+        .get(DUMMY_PATH_DIGEST)
+        .await
+        .expect("must succeed")
+        .is_none());
+/// Put a PathInfo into the store, get it back.
+async fn put_get(svc: impl PathInfoService) {
+    let path_info = PathInfo {
+        node: Some(castorepb::Node {
+            node: Some(castorepb::node::Node::Symlink(castorepb::SymlinkNode {
+                name: "00000000000000000000000000000000-foo".into(),
+                target: "doesntmatter".into(),
+            })),
+        }),
+        ..Default::default()
+    };
+    // insert
+    let resp = svc.put(path_info.clone()).await.expect("must succeed");
+    // expect the returned PathInfo to be equal (for now)
+    // in the future, some stores might add additional fields/signatures.
+    assert_eq!(path_info, resp);
+    // get it back
+    let resp = svc.get(DUMMY_PATH_DIGEST).await.expect("must succeed");
+    assert_eq!(Some(path_info), resp);
diff --git a/tvix/store/src/pathinfoservice/tests/utils.rs b/tvix/store/src/pathinfoservice/tests/utils.rs
new file mode 100644
index 0000000000..ee170468d1
--- /dev/null
+++ b/tvix/store/src/pathinfoservice/tests/utils.rs
@@ -0,0 +1,75 @@
+use std::sync::Arc;
+use tonic::transport::{Endpoint, Server, Uri};
+use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService};
+use crate::{
+    nar::{NarCalculationService, SimpleRenderer},
+    pathinfoservice::{GRPCPathInfoService, MemoryPathInfoService, PathInfoService},
+    proto::{
+        path_info_service_client::PathInfoServiceClient,
+        path_info_service_server::PathInfoServiceServer, GRPCPathInfoServiceWrapper,
+    },
+    tests::fixtures::{blob_service, directory_service},
+/// Constructs and returns a gRPC PathInfoService.
+/// We also return memory-based {Blob,Directory}Service,
+/// as the consumer of this function accepts a 3-tuple.
+pub async fn make_grpc_path_info_service_client(
+) -> (impl BlobService, impl DirectoryService, GRPCPathInfoService) {
+    let (left, right) = tokio::io::duplex(64);
+    let blob_service = blob_service();
+    let directory_service = directory_service();
+    // spin up a server, which will only connect once, to the left side.
+    tokio::spawn({
+        let blob_service = blob_service.clone();
+        let directory_service = directory_service.clone();
+        async move {
+            let path_info_service: Arc<dyn PathInfoService> =
+                Arc::from(MemoryPathInfoService::default());
+            let nar_calculation_service =
+                Box::new(SimpleRenderer::new(blob_service, directory_service))
+                    as Box<dyn NarCalculationService>;
+            // spin up a new PathInfoService
+            let mut server = Server::builder();
+            let router = server.add_service(PathInfoServiceServer::new(
+                GRPCPathInfoServiceWrapper::new(path_info_service, nar_calculation_service),
+            ));
+            router
+                .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(left)))
+                .await
+        }
+    });
+    // Create a client, connecting to the right side. The URI is unused.
+    let mut maybe_right = Some(right);
+    let path_info_service = GRPCPathInfoService::from_client(PathInfoServiceClient::new(
+        Endpoint::try_from("http://[::]:50051")
+            .unwrap()
+            .connect_with_connector(tower::service_fn(move |_: Uri| {
+                let right = maybe_right.take().unwrap();
+                async move { Ok::<_, std::io::Error>(right) }
+            }))
+            .await
+            .unwrap(),
+    ));
+    (blob_service, directory_service, path_info_service)
+#[cfg(all(feature = "cloud", feature = "integration"))]
+pub(crate) async fn make_bigtable_path_info_service(
+) -> crate::pathinfoservice::BigtablePathInfoService {
+    use crate::pathinfoservice::bigtable::BigtableParameters;
+    use crate::pathinfoservice::BigtablePathInfoService;
+    BigtablePathInfoService::connect(BigtableParameters::default_for_tests())
+        .await
+        .unwrap()
diff --git a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs
new file mode 100644
index 0000000000..68f5575676
--- /dev/null
+++ b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs
@@ -0,0 +1,124 @@
+use crate::nar::{NarCalculationService, RenderError};
+use crate::pathinfoservice::PathInfoService;
+use crate::proto;
+use futures::{stream::BoxStream, TryStreamExt};
+use std::ops::Deref;
+use tonic::{async_trait, Request, Response, Result, Status};
+use tracing::{instrument, warn};
+use tvix_castore::proto as castorepb;
+pub struct GRPCPathInfoServiceWrapper<PS, NS> {
+    path_info_service: PS,
+    // FUTUREWORK: allow exposing without allowing listing
+    nar_calculation_service: NS,
+impl<PS, NS> GRPCPathInfoServiceWrapper<PS, NS> {
+    pub fn new(path_info_service: PS, nar_calculation_service: NS) -> Self {
+        Self {
+            path_info_service,
+            nar_calculation_service,
+        }
+    }
+impl<PS, NS> proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWrapper<PS, NS>
+    PS: Deref<Target = dyn PathInfoService> + Send + Sync + 'static,
+    NS: NarCalculationService + Send + Sync + 'static,
+    type ListStream = BoxStream<'static, tonic::Result<proto::PathInfo, Status>>;
+    #[instrument(skip_all)]
+    async fn get(
+        &self,
+        request: Request<proto::GetPathInfoRequest>,
+    ) -> Result<Response<proto::PathInfo>> {
+        match request.into_inner().by_what {
+            None => Err(Status::unimplemented("by_what needs to be specified")),
+            Some(proto::get_path_info_request::ByWhat::ByOutputHash(output_digest)) => {
+                let digest: [u8; 20] = output_digest
+                    .to_vec()
+                    .try_into()
+                    .map_err(|_e| Status::invalid_argument("invalid output digest length"))?;
+                match self.path_info_service.get(digest).await {
+                    Ok(None) => Err(Status::not_found("PathInfo not found")),
+                    Ok(Some(path_info)) => Ok(Response::new(path_info)),
+                    Err(e) => {
+                        warn!(err = %e, "failed to get PathInfo");
+                        Err(e.into())
+                    }
+                }
+            }
+        }
+    }
+    #[instrument(skip_all)]
+    async fn put(&self, request: Request<proto::PathInfo>) -> Result<Response<proto::PathInfo>> {
+        let path_info = request.into_inner();
+        // Store the PathInfo in the client. Clients MUST validate the data
+        // they receive, so we don't validate additionally here.
+        match self.path_info_service.put(path_info).await {
+            Ok(path_info_new) => Ok(Response::new(path_info_new)),
+            Err(e) => {
+                warn!(err = %e, "failed to put PathInfo");
+                Err(e.into())
+            }
+        }
+    }
+    #[instrument(skip_all)]
+    async fn calculate_nar(
+        &self,
+        request: Request<castorepb::Node>,
+    ) -> Result<Response<proto::CalculateNarResponse>> {
+        match request.into_inner().node {
+            None => Err(Status::invalid_argument("no root node sent")),
+            Some(root_node) => {
+                if let Err(e) = root_node.validate() {
+                    warn!(err = %e, "invalid root node");
+                    Err(Status::invalid_argument("invalid root node"))?
+                }
+                match self.nar_calculation_service.calculate_nar(&root_node).await {
+                    Ok((nar_size, nar_sha256)) => Ok(Response::new(proto::CalculateNarResponse {
+                        nar_size,
+                        nar_sha256: nar_sha256.to_vec().into(),
+                    })),
+                    Err(e) => {
+                        warn!(err = %e, "error during NAR calculation");
+                        Err(e.into())
+                    }
+                }
+            }
+        }
+    }
+    #[instrument(skip_all, err)]
+    async fn list(
+        &self,
+        _request: Request<proto::ListPathInfoRequest>,
+    ) -> Result<Response<Self::ListStream>, Status> {
+        let stream = Box::pin(
+            self.path_info_service
+                .list()
+                .map_err(|e| Status::internal(e.to_string())),
+        );
+        Ok(Response::new(Box::pin(stream)))
+    }
+impl From<RenderError> for tonic::Status {
+    fn from(value: RenderError) -> Self {
+        match value {
+            RenderError::BlobNotFound(_, _) => Self::not_found(value.to_string()),
+            RenderError::DirectoryNotFound(_, _) => Self::not_found(value.to_string()),
+            RenderError::NARWriterError(_) => Self::internal(value.to_string()),
+            RenderError::StoreError(_) => Self::internal(value.to_string()),
+            RenderError::UnexpectedBlobMeta(_, _, _, _) => Self::internal(value.to_string()),
+        }
+    }
diff --git a/tvix/store/src/proto/mod.rs b/tvix/store/src/proto/mod.rs
new file mode 100644
index 0000000000..a09839c8bd
--- /dev/null
+++ b/tvix/store/src/proto/mod.rs
@@ -0,0 +1,374 @@
+#![allow(clippy::derive_partial_eq_without_eq, non_snake_case)]
+use bstr::ByteSlice;
+use bytes::Bytes;
+use data_encoding::BASE64;
+// https://github.com/hyperium/tonic/issues/1056
+use nix_compat::{
+    narinfo::Flags,
+    nixhash::{CAHash, NixHash},
+    store_path::{self, StorePathRef},
+use thiserror::Error;
+use tvix_castore::proto::{self as castorepb, NamedNode, ValidateNodeError};
+mod grpc_pathinfoservice_wrapper;
+pub use grpc_pathinfoservice_wrapper::GRPCPathInfoServiceWrapper;
+#[cfg(feature = "tonic-reflection")]
+/// Compiled file descriptors for implementing [gRPC
+/// reflection](https://github.com/grpc/grpc/blob/master/doc/server-reflection.md) with e.g.
+/// [`tonic_reflection`](https://docs.rs/tonic-reflection).
+pub const FILE_DESCRIPTOR_SET: &[u8] = tonic::include_file_descriptor_set!("tvix.store.v1");
+mod tests;
+/// Errors that can occur during the validation of PathInfo messages.
+#[derive(Debug, Error, PartialEq)]
+pub enum ValidatePathInfoError {
+    /// Invalid length of a reference
+    #[error("Invalid length of digest at position {}, expected {}, got {}", .0, store_path::DIGEST_SIZE, .1)]
+    InvalidReferenceDigestLen(usize, usize),
+    /// No node present
+    #[error("No node present")]
+    NoNodePresent,
+    /// Node fails validation
+    #[error("Invalid root node: {:?}", .0.to_string())]
+    InvalidRootNode(ValidateNodeError),
+    /// Invalid node name encountered. Root nodes in PathInfos have more strict name requirements
+    #[error("Failed to parse {} as StorePath: {1}", .0.to_str_lossy())]
+    InvalidNodeName(Vec<u8>, store_path::Error),
+    /// The digest in narinfo.nar_sha256 has an invalid len.
+    #[error("Invalid narinfo.nar_sha256 length: expected {}, got {}", 32, .0)]
+    InvalidNarSha256DigestLen(usize),
+    /// The number of references in the narinfo.reference_names field does not match
+    /// the number of references in the .references field.
+    #[error("Inconsistent Number of References: {0} (references) vs {1} (narinfo)")]
+    InconsistentNumberOfReferences(usize, usize),
+    /// A string in narinfo.reference_names does not parse to a [store_path::StorePath].
+    #[error("Invalid reference_name at position {0}: {1}")]
+    InvalidNarinfoReferenceName(usize, String),
+    /// The digest in the parsed `.narinfo.reference_names[i]` does not match
+    /// the one in `.references[i]`.`
+    #[error("digest in reference_name at position {} does not match digest in PathInfo, expected {}, got {}", .0, BASE64.encode(.1), BASE64.encode(.2))]
+    InconsistentNarinfoReferenceNameDigest(
+        usize,
+        [u8; store_path::DIGEST_SIZE],
+        [u8; store_path::DIGEST_SIZE],
+    ),
+    /// The deriver field is invalid.
+    #[error("deriver field is invalid: {0}")]
+    InvalidDeriverField(store_path::Error),
+/// Parses a root node name.
+/// On success, this returns the parsed [store_path::StorePathRef].
+/// On error, it returns an error generated from the supplied constructor.
+fn parse_node_name_root<E>(
+    name: &[u8],
+    err: fn(Vec<u8>, store_path::Error) -> E,
+) -> Result<store_path::StorePathRef<'_>, E> {
+    store_path::StorePathRef::from_bytes(name).map_err(|e| err(name.to_vec(), e))
+impl PathInfo {
+    /// validate performs some checks on the PathInfo struct,
+    /// Returning either a [store_path::StorePath] of the root node, or a
+    /// [ValidatePathInfoError].
+    pub fn validate(&self) -> Result<store_path::StorePathRef<'_>, ValidatePathInfoError> {
+        // ensure the references have the right number of bytes.
+        for (i, reference) in self.references.iter().enumerate() {
+            if reference.len() != store_path::DIGEST_SIZE {
+                return Err(ValidatePathInfoError::InvalidReferenceDigestLen(
+                    i,
+                    reference.len(),
+                ));
+            }
+        }
+        // If there is a narinfo field populated…
+        if let Some(narinfo) = &self.narinfo {
+            // ensure the nar_sha256 digest has the correct length.
+            if narinfo.nar_sha256.len() != 32 {
+                return Err(ValidatePathInfoError::InvalidNarSha256DigestLen(
+                    narinfo.nar_sha256.len(),
+                ));
+            }
+            // ensure the number of references there matches PathInfo.references count.
+            if narinfo.reference_names.len() != self.references.len() {
+                return Err(ValidatePathInfoError::InconsistentNumberOfReferences(
+                    self.references.len(),
+                    narinfo.reference_names.len(),
+                ));
+            }
+            // parse references in reference_names.
+            for (i, reference_name_str) in narinfo.reference_names.iter().enumerate() {
+                // ensure thy parse as (non-absolute) store path
+                let reference_names_store_path = store_path::StorePath::from_bytes(
+                    reference_name_str.as_bytes(),
+                )
+                .map_err(|_| {
+                    ValidatePathInfoError::InvalidNarinfoReferenceName(
+                        i,
+                        reference_name_str.to_owned(),
+                    )
+                })?;
+                // ensure their digest matches the one at self.references[i].
+                {
+                    // This is safe, because we ensured the proper length earlier already.
+                    let reference_digest = self.references[i].to_vec().try_into().unwrap();
+                    if reference_names_store_path.digest() != &reference_digest {
+                        return Err(
+                            ValidatePathInfoError::InconsistentNarinfoReferenceNameDigest(
+                                i,
+                                reference_digest,
+                                *reference_names_store_path.digest(),
+                            ),
+                        );
+                    }
+                }
+                // If the Deriver field is populated, ensure it parses to a
+                // [store_path::StorePath].
+                // We can't check for it to *not* end with .drv, as the .drv files produced by
+                // recursive Nix end with multiple .drv suffixes, and only one is popped when
+                // converting to this field.
+                if let Some(deriver) = &narinfo.deriver {
+                    store_path::StorePathRef::from_name_and_digest(&deriver.name, &deriver.digest)
+                        .map_err(ValidatePathInfoError::InvalidDeriverField)?;
+                }
+            }
+        }
+        // Ensure there is a (root) node present, and it properly parses to a [store_path::StorePath].
+        let root_nix_path = match &self.node {
+            None | Some(castorepb::Node { node: None }) => {
+                Err(ValidatePathInfoError::NoNodePresent)?
+            }
+            Some(castorepb::Node { node: Some(node) }) => {
+                node.validate()
+                    .map_err(ValidatePathInfoError::InvalidRootNode)?;
+                // parse the name of the node itself and return
+                parse_node_name_root(node.get_name(), ValidatePathInfoError::InvalidNodeName)?
+            }
+        };
+        // return the root nix path
+        Ok(root_nix_path)
+    }
+    /// With self and its store path name, this reconstructs a
+    /// [nix_compat::narinfo::NarInfo<'_>].
+    /// It can be used to validate Signatures, or get back a (sparse) NarInfo
+    /// struct to prepare writing it out.
+    ///
+    /// It assumes self to be validated first, and will only return None if the
+    /// `narinfo` field is unpopulated.
+    ///
+    /// It does very little allocation (a Vec each for `signatures` and
+    /// `references`), the rest points to data owned elsewhere.
+    ///
+    /// Keep in mind this is not able to reconstruct all data present in the
+    /// NarInfo<'_>, as some of it is not stored at all:
+    /// - the `system`, `file_hash` and `file_size` fields are set to `None`.
+    /// - the URL is set to an empty string.
+    /// - Compression is set to "none"
+    ///
+    /// If you want to render it out to a string and be able to parse it back
+    /// in, at least URL *must* be set again.
+    pub fn to_narinfo<'a>(
+        &'a self,
+        store_path: store_path::StorePathRef<'a>,
+    ) -> Option<nix_compat::narinfo::NarInfo<'_>> {
+        let narinfo = &self.narinfo.as_ref()?;
+        Some(nix_compat::narinfo::NarInfo {
+            flags: Flags::empty(),
+            store_path,
+            nar_hash: narinfo
+                .nar_sha256
+                .as_ref()
+                .try_into()
+                .expect("invalid narhash"),
+            nar_size: narinfo.nar_size,
+            references: narinfo
+                .reference_names
+                .iter()
+                .map(|ref_name| {
+                    // This shouldn't pass validation
+                    StorePathRef::from_bytes(ref_name.as_bytes()).expect("invalid reference")
+                })
+                .collect(),
+            signatures: narinfo
+                .signatures
+                .iter()
+                .map(|sig| {
+                    nix_compat::narinfo::Signature::new(
+                        &sig.name,
+                        // This shouldn't pass validation
+                        sig.data[..].try_into().expect("invalid signature len"),
+                    )
+                })
+                .collect(),
+            ca: narinfo
+                .ca
+                .as_ref()
+                .map(|ca| ca.try_into().expect("invalid ca")),
+            system: None,
+            deriver: narinfo.deriver.as_ref().map(|deriver| {
+                StorePathRef::from_name_and_digest(&deriver.name, &deriver.digest)
+                    .expect("invalid deriver")
+            }),
+            url: "",
+            compression: Some("none"),
+            file_hash: None,
+            file_size: None,
+        })
+    }
+/// Errors that can occur when converting from a [nar_info::Ca] to a (stricter)
+/// [nix_compat::nixhash::CAHash].
+#[derive(Debug, Error, PartialEq)]
+pub enum ConvertCAError {
+    /// Invalid length of a reference
+    #[error("Invalid digest length '{0}' for type {1}")]
+    InvalidReferenceDigestLen(usize, &'static str),
+    /// Unknown Hash type
+    #[error("Unknown hash type: {0}")]
+    UnknownHashType(i32),
+impl TryFrom<&nar_info::Ca> for nix_compat::nixhash::CAHash {
+    type Error = ConvertCAError;
+    fn try_from(value: &nar_info::Ca) -> Result<Self, Self::Error> {
+        Ok(match value.r#type {
+            typ if typ == nar_info::ca::Hash::FlatMd5 as i32 => {
+                Self::Flat(NixHash::Md5(value.digest[..].try_into().map_err(|_| {
+                    ConvertCAError::InvalidReferenceDigestLen(value.digest.len(), "FlatMd5")
+                })?))
+            }
+            typ if typ == nar_info::ca::Hash::FlatSha1 as i32 => {
+                Self::Flat(NixHash::Sha1(value.digest[..].try_into().map_err(
+                    |_| ConvertCAError::InvalidReferenceDigestLen(value.digest.len(), "FlatSha1"),
+                )?))
+            }
+            typ if typ == nar_info::ca::Hash::FlatSha256 as i32 => {
+                Self::Flat(NixHash::Sha256(value.digest[..].try_into().map_err(
+                    |_| ConvertCAError::InvalidReferenceDigestLen(value.digest.len(), "FlatSha256"),
+                )?))
+            }
+            typ if typ == nar_info::ca::Hash::FlatSha512 as i32 => Self::Flat(NixHash::Sha512(
+                Box::new(value.digest[..].try_into().map_err(|_| {
+                    ConvertCAError::InvalidReferenceDigestLen(value.digest.len(), "FlatSha512")
+                })?),
+            )),
+            typ if typ == nar_info::ca::Hash::NarMd5 as i32 => {
+                Self::Nar(NixHash::Md5(value.digest[..].try_into().map_err(|_| {
+                    ConvertCAError::InvalidReferenceDigestLen(value.digest.len(), "NarMd5")
+                })?))
+            }
+            typ if typ == nar_info::ca::Hash::NarSha1 as i32 => {
+                Self::Nar(NixHash::Sha1(value.digest[..].try_into().map_err(
+                    |_| ConvertCAError::InvalidReferenceDigestLen(value.digest.len(), "NarSha1"),
+                )?))
+            }
+            typ if typ == nar_info::ca::Hash::NarSha256 as i32 => {
+                Self::Nar(NixHash::Sha256(value.digest[..].try_into().map_err(
+                    |_| ConvertCAError::InvalidReferenceDigestLen(value.digest.len(), "NarSha256"),
+                )?))
+            }
+            typ if typ == nar_info::ca::Hash::NarSha512 as i32 => Self::Nar(NixHash::Sha512(
+                Box::new(value.digest[..].try_into().map_err(|_| {
+                    ConvertCAError::InvalidReferenceDigestLen(value.digest.len(), "NarSha512")
+                })?),
+            )),
+            typ => return Err(ConvertCAError::UnknownHashType(typ)),
+        })
+    }
+impl From<&nix_compat::nixhash::CAHash> for nar_info::ca::Hash {
+    fn from(value: &nix_compat::nixhash::CAHash) -> Self {
+        match value {
+            CAHash::Flat(NixHash::Md5(_)) => nar_info::ca::Hash::FlatMd5,
+            CAHash::Flat(NixHash::Sha1(_)) => nar_info::ca::Hash::FlatSha1,
+            CAHash::Flat(NixHash::Sha256(_)) => nar_info::ca::Hash::FlatSha256,
+            CAHash::Flat(NixHash::Sha512(_)) => nar_info::ca::Hash::FlatSha512,
+            CAHash::Nar(NixHash::Md5(_)) => nar_info::ca::Hash::NarMd5,
+            CAHash::Nar(NixHash::Sha1(_)) => nar_info::ca::Hash::NarSha1,
+            CAHash::Nar(NixHash::Sha256(_)) => nar_info::ca::Hash::NarSha256,
+            CAHash::Nar(NixHash::Sha512(_)) => nar_info::ca::Hash::NarSha512,
+            CAHash::Text(_) => nar_info::ca::Hash::TextSha256,
+        }
+    }
+impl From<&nix_compat::nixhash::CAHash> for nar_info::Ca {
+    fn from(value: &nix_compat::nixhash::CAHash) -> Self {
+        nar_info::Ca {
+            r#type: Into::<nar_info::ca::Hash>::into(value) as i32,
+            digest: value.hash().digest_as_bytes().to_vec().into(),
+        }
+    }
+impl From<&nix_compat::narinfo::NarInfo<'_>> for NarInfo {
+    /// Converts from a NarInfo (returned from the NARInfo parser) to the proto-
+    /// level NarInfo struct.
+    fn from(value: &nix_compat::narinfo::NarInfo<'_>) -> Self {
+        let signatures = value
+            .signatures
+            .iter()
+            .map(|sig| nar_info::Signature {
+                name: sig.name().to_string(),
+                data: Bytes::copy_from_slice(sig.bytes()),
+            })
+            .collect();
+        NarInfo {
+            nar_size: value.nar_size,
+            nar_sha256: Bytes::copy_from_slice(&value.nar_hash),
+            signatures,
+            reference_names: value.references.iter().map(|r| r.to_string()).collect(),
+            deriver: value.deriver.as_ref().map(|sp| StorePath {
+                name: sp.name().to_owned(),
+                digest: Bytes::copy_from_slice(sp.digest()),
+            }),
+            ca: value.ca.as_ref().map(|ca| ca.into()),
+        }
+    }
+impl From<&nix_compat::narinfo::NarInfo<'_>> for PathInfo {
+    /// Converts from a NarInfo (returned from the NARInfo parser) to a PathInfo
+    /// struct with the node set to None.
+    fn from(value: &nix_compat::narinfo::NarInfo<'_>) -> Self {
+        Self {
+            node: None,
+            references: value
+                .references
+                .iter()
+                .map(|x| Bytes::copy_from_slice(x.digest()))
+                .collect(),
+            narinfo: Some(value.into()),
+        }
+    }
diff --git a/tvix/store/src/proto/tests/mod.rs b/tvix/store/src/proto/tests/mod.rs
new file mode 100644
index 0000000000..c9c6702027
--- /dev/null
+++ b/tvix/store/src/proto/tests/mod.rs
@@ -0,0 +1 @@
+mod pathinfo;
diff --git a/tvix/store/src/proto/tests/pathinfo.rs b/tvix/store/src/proto/tests/pathinfo.rs
new file mode 100644
index 0000000000..4d0834878d
--- /dev/null
+++ b/tvix/store/src/proto/tests/pathinfo.rs
@@ -0,0 +1,431 @@
+use crate::proto::{nar_info::Signature, NarInfo, PathInfo, ValidatePathInfoError};
+use crate::tests::fixtures::*;
+use bytes::Bytes;
+use data_encoding::BASE64;
+use nix_compat::nixbase32;
+use nix_compat::store_path::{self, StorePathRef};
+use rstest::rstest;
+use tvix_castore::proto as castorepb;
+#[case::no_node(None, Err(ValidatePathInfoError::NoNodePresent))]
+#[case::no_node_2(Some(castorepb::Node { node: None}), Err(ValidatePathInfoError::NoNodePresent))]
+fn validate_pathinfo(
+    #[case] node: Option<castorepb::Node>,
+    #[case] exp_result: Result<StorePathRef, ValidatePathInfoError>,
+) {
+    // construct the PathInfo object
+    let p = PathInfo {
+        node,
+        ..Default::default()
+    };
+    assert_eq!(exp_result, p.validate());
+    let err = p.validate().expect_err("validation should fail");
+    assert!(matches!(err, ValidatePathInfoError::NoNodePresent));
+#[case::ok(castorepb::DirectoryNode {
+        name: DUMMY_PATH.into(),
+        digest: DUMMY_DIGEST.clone().into(),
+        size: 0,
+}, Ok(StorePathRef::from_bytes(DUMMY_PATH.as_bytes()).unwrap()))]
+#[case::invalid_digest_length(castorepb::DirectoryNode {
+        name: DUMMY_PATH.into(),
+        digest: Bytes::new(),
+        size: 0,
+}, Err(ValidatePathInfoError::InvalidRootNode(castorepb::ValidateNodeError::InvalidDigestLen(0))))]
+#[case::invalid_node_name_no_storepath(castorepb::DirectoryNode {
+        name: "invalid".into(),
+        digest: DUMMY_DIGEST.clone().into(),
+        size: 0,
+}, Err(ValidatePathInfoError::InvalidNodeName(
+        "invalid".into(),
+        store_path::Error::InvalidLength
+fn validate_directory(
+    #[case] directory_node: castorepb::DirectoryNode,
+    #[case] exp_result: Result<StorePathRef, ValidatePathInfoError>,
+) {
+    // construct the PathInfo object
+    let p = PathInfo {
+        node: Some(castorepb::Node {
+            node: Some(castorepb::node::Node::Directory(directory_node)),
+        }),
+        ..Default::default()
+    };
+    assert_eq!(exp_result, p.validate());
+    castorepb::FileNode {
+        name: DUMMY_PATH.into(),
+        digest: DUMMY_DIGEST.clone().into(),
+        size: 0,
+        executable: false,
+    },
+    Ok(StorePathRef::from_bytes(DUMMY_PATH.as_bytes()).unwrap())
+    castorepb::FileNode {
+        name: DUMMY_PATH.into(),
+        digest: Bytes::new(),
+        ..Default::default()
+    },
+    Err(ValidatePathInfoError::InvalidRootNode(castorepb::ValidateNodeError::InvalidDigestLen(0)))
+    castorepb::FileNode {
+        name: "invalid".into(),
+        digest: DUMMY_DIGEST.clone().into(),
+        ..Default::default()
+    },
+    Err(ValidatePathInfoError::InvalidNodeName(
+        "invalid".into(),
+        store_path::Error::InvalidLength
+    ))
+fn validate_file(
+    #[case] file_node: castorepb::FileNode,
+    #[case] exp_result: Result<StorePathRef, ValidatePathInfoError>,
+) {
+    // construct the PathInfo object
+    let p = PathInfo {
+        node: Some(castorepb::Node {
+            node: Some(castorepb::node::Node::File(file_node)),
+        }),
+        ..Default::default()
+    };
+    assert_eq!(exp_result, p.validate());
+    castorepb::SymlinkNode {
+        name: DUMMY_PATH.into(),
+        target: "foo".into(),
+    },
+    Ok(StorePathRef::from_bytes(DUMMY_PATH.as_bytes()).unwrap())
+    castorepb::SymlinkNode {
+        name: "invalid".into(),
+        target: "foo".into(),
+    },
+    Err(ValidatePathInfoError::InvalidNodeName(
+        "invalid".into(),
+        store_path::Error::InvalidLength
+    ))
+fn validate_symlink(
+    #[case] symlink_node: castorepb::SymlinkNode,
+    #[case] exp_result: Result<StorePathRef, ValidatePathInfoError>,
+) {
+    // construct the PathInfo object
+    let p = PathInfo {
+        node: Some(castorepb::Node {
+            node: Some(castorepb::node::Node::Symlink(symlink_node)),
+        }),
+        ..Default::default()
+    };
+    assert_eq!(exp_result, p.validate());
+/// Ensure parsing a correct PathInfo without narinfo populated succeeds.
+fn validate_references_without_narinfo_ok() {
+    assert!(PATH_INFO_WITHOUT_NARINFO.validate().is_ok());
+/// Ensure parsing a correct PathInfo with narinfo populated succeeds.
+fn validate_references_with_narinfo_ok() {
+    assert!(PATH_INFO_WITH_NARINFO.validate().is_ok());
+/// Create a PathInfo with a wrong digest length in narinfo.nar_sha256, and
+/// ensure validation fails.
+fn validate_wrong_nar_sha256() {
+    let mut path_info = PATH_INFO_WITH_NARINFO.clone();
+    path_info.narinfo.as_mut().unwrap().nar_sha256 = vec![0xbe, 0xef].into();
+    match path_info.validate().expect_err("must_fail") {
+        ValidatePathInfoError::InvalidNarSha256DigestLen(2) => {}
+        e => panic!("unexpected error: {:?}", e),
+    };
+/// Create a PathInfo with a wrong count of narinfo.reference_names,
+/// and ensure validation fails.
+fn validate_inconsistent_num_refs_fail() {
+    let mut path_info = PATH_INFO_WITH_NARINFO.clone();
+    path_info.narinfo.as_mut().unwrap().reference_names = vec![];
+    match path_info.validate().expect_err("must_fail") {
+        ValidatePathInfoError::InconsistentNumberOfReferences(1, 0) => {}
+        e => panic!("unexpected error: {:?}", e),
+    };
+/// Create a PathInfo with a wrong digest length in references.
+fn validate_invalid_reference_digest_len() {
+    let mut path_info = PATH_INFO_WITHOUT_NARINFO.clone();
+    path_info.references.push(vec![0xff, 0xff].into());
+    match path_info.validate().expect_err("must fail") {
+        ValidatePathInfoError::InvalidReferenceDigestLen(
+            1, // position
+            2, // unexpected digest len
+        ) => {}
+        e => panic!("unexpected error: {:?}", e),
+    };
+/// Create a PathInfo with a narinfo.reference_name[1] that is no valid store path.
+fn validate_invalid_narinfo_reference_name() {
+    let mut path_info = PATH_INFO_WITH_NARINFO.clone();
+    // This is invalid, as the store prefix is not part of reference_names.
+    path_info.narinfo.as_mut().unwrap().reference_names[0] =
+        "/nix/store/00000000000000000000000000000000-dummy".to_string();
+    match path_info.validate().expect_err("must fail") {
+        ValidatePathInfoError::InvalidNarinfoReferenceName(0, reference_name) => {
+            assert_eq!(
+                "/nix/store/00000000000000000000000000000000-dummy",
+                reference_name
+            );
+        }
+        e => panic!("unexpected error: {:?}", e),
+    }
+/// Create a PathInfo with a narinfo.reference_name[0] that doesn't match references[0].
+fn validate_inconsistent_narinfo_reference_name_digest() {
+    let mut path_info = PATH_INFO_WITH_NARINFO.clone();
+    // mutate the first reference, they were all zeroes before
+    path_info.references[0] = vec![0xff; store_path::DIGEST_SIZE].into();
+    match path_info.validate().expect_err("must fail") {
+        ValidatePathInfoError::InconsistentNarinfoReferenceNameDigest(0, e_expected, e_actual) => {
+            assert_eq!(path_info.references[0][..], e_expected[..]);
+            assert_eq!(DUMMY_PATH_DIGEST, e_actual);
+        }
+        e => panic!("unexpected error: {:?}", e),
+    }
+/// Create a node with an empty symlink target, and ensure it fails validation.
+fn validate_symlink_empty_target_invalid() {
+    let node = castorepb::node::Node::Symlink(castorepb::SymlinkNode {
+        name: "foo".into(),
+        target: "".into(),
+    });
+    node.validate().expect_err("must fail validation");
+/// Create a node with a symlink target including null bytes, and ensure it
+/// fails validation.
+fn validate_symlink_target_null_byte_invalid() {
+    let node = castorepb::node::Node::Symlink(castorepb::SymlinkNode {
+        name: "foo".into(),
+        target: "foo\0".into(),
+    });
+    node.validate().expect_err("must fail validation");
+/// Create a PathInfo with a correct deriver field and ensure it succeeds.
+fn validate_valid_deriver() {
+    let mut path_info = PATH_INFO_WITH_NARINFO.clone();
+    // add a valid deriver
+    let narinfo = path_info.narinfo.as_mut().unwrap();
+    narinfo.deriver = Some(crate::proto::StorePath {
+        name: "foo".to_string(),
+        digest: Bytes::from(DUMMY_PATH_DIGEST.as_slice()),
+    });
+    path_info.validate().expect("must validate");
+/// Create a PathInfo with a broken deriver field and ensure it fails.
+fn validate_invalid_deriver() {
+    let mut path_info = PATH_INFO_WITH_NARINFO.clone();
+    // add a broken deriver (invalid digest)
+    let narinfo = path_info.narinfo.as_mut().unwrap();
+    narinfo.deriver = Some(crate::proto::StorePath {
+        name: "foo".to_string(),
+        digest: vec![].into(),
+    });
+    match path_info.validate().expect_err("must fail validation") {
+        ValidatePathInfoError::InvalidDeriverField(_) => {}
+        e => panic!("unexpected error: {:?}", e),
+    }
+fn from_nixcompat_narinfo() {
+    let narinfo_parsed = nix_compat::narinfo::NarInfo::parse(
+        r#"StorePath: /nix/store/s66mzxpvicwk07gjbjfw9izjfa797vsw-hello-2.12.1
+URL: nar/1nhgq6wcggx0plpy4991h3ginj6hipsdslv4fd4zml1n707j26yq.nar.xz
+Compression: xz
+FileHash: sha256:1nhgq6wcggx0plpy4991h3ginj6hipsdslv4fd4zml1n707j26yq
+FileSize: 50088
+NarHash: sha256:0yzhigwjl6bws649vcs2asa4lbs8hg93hyix187gc7s7a74w5h80
+NarSize: 226488
+References: 3n58xw4373jp0ljirf06d8077j15pc4j-glibc-2.37-8 s66mzxpvicwk07gjbjfw9izjfa797vsw-hello-2.12.1
+Deriver: ib3sh3pcz10wsmavxvkdbayhqivbghlq-hello-2.12.1.drv
+Sig: cache.nixos.org-1:8ijECciSFzWHwwGVOIVYdp2fOIOJAfmzGHPQVwpktfTQJF6kMPPDre7UtFw3o+VqenC5P8RikKOAAfN7CvPEAg=="#).expect("must parse");
+    assert_eq!(
+        PathInfo {
+            node: None,
+            references: vec![
+                Bytes::copy_from_slice(&nixbase32::decode_fixed::<20>("3n58xw4373jp0ljirf06d8077j15pc4j").unwrap()),
+                Bytes::copy_from_slice(&nixbase32::decode_fixed::<20>("s66mzxpvicwk07gjbjfw9izjfa797vsw").unwrap()),
+            ],
+            narinfo: Some(
+                NarInfo {
+                    nar_size: 226488,
+                    nar_sha256: Bytes::copy_from_slice(
+                        &nixbase32::decode_fixed::<32>("0yzhigwjl6bws649vcs2asa4lbs8hg93hyix187gc7s7a74w5h80".as_bytes())
+                            .unwrap()
+                    ),
+                    signatures: vec![Signature {
+                        name: "cache.nixos.org-1".to_string(),
+                        data: BASE64.decode("8ijECciSFzWHwwGVOIVYdp2fOIOJAfmzGHPQVwpktfTQJF6kMPPDre7UtFw3o+VqenC5P8RikKOAAfN7CvPEAg==".as_bytes()).unwrap().into(),
+                    }],
+                    reference_names: vec![
+                        "3n58xw4373jp0ljirf06d8077j15pc4j-glibc-2.37-8".to_string(),
+                        "s66mzxpvicwk07gjbjfw9izjfa797vsw-hello-2.12.1".to_string()
+                    ],
+                    deriver: Some(crate::proto::StorePath {
+                        digest: Bytes::copy_from_slice(&nixbase32::decode_fixed::<20>("ib3sh3pcz10wsmavxvkdbayhqivbghlq").unwrap()),
+                        name: "hello-2.12.1".to_string(),
+                     }),
+                    ca: None,
+                }
+            )
+        },
+        (&narinfo_parsed).into(),
+    );
+fn from_nixcompat_narinfo_fod() {
+    let narinfo_parsed = nix_compat::narinfo::NarInfo::parse(
+        r#"StorePath: /nix/store/pa10z4ngm0g83kx9mssrqzz30s84vq7k-hello-2.12.1.tar.gz
+URL: nar/1zjrhzhaizsrlsvdkqfl073vivmxcqnzkff4s50i0cdf541ary1r.nar.xz
+Compression: xz
+FileHash: sha256:1zjrhzhaizsrlsvdkqfl073vivmxcqnzkff4s50i0cdf541ary1r
+FileSize: 1033524
+NarHash: sha256:1lvqpbk2k1sb39z8jfxixf7p7v8sj4z6mmpa44nnmff3w1y6h8lh
+NarSize: 1033416
+Deriver: dyivpmlaq2km6c11i0s6bi6mbsx0ylqf-hello-2.12.1.tar.gz.drv
+Sig: cache.nixos.org-1:ywnIG629nQZQhEr6/HLDrLT/mUEp5J1LC6NmWSlJRWL/nM7oGItJQUYWGLvYGhSQvHrhIuvMpjNmBNh/WWqCDg==
+CA: fixed:sha256:086vqwk2wl8zfs47sq2xpjc9k066ilmb8z6dn0q6ymwjzlm196cd"#
+    ).expect("must parse");
+    assert_eq!(
+        PathInfo {
+            node: None,
+            references: vec![],
+            narinfo: Some(
+                NarInfo {
+                    nar_size: 1033416,
+                    nar_sha256: Bytes::copy_from_slice(
+                        &nixbase32::decode_fixed::<32>(
+                            "1lvqpbk2k1sb39z8jfxixf7p7v8sj4z6mmpa44nnmff3w1y6h8lh"
+                        )
+                        .unwrap()
+                    ),
+                    signatures: vec![Signature {
+                        name: "cache.nixos.org-1".to_string(),
+                        data: BASE64
+                            .decode("ywnIG629nQZQhEr6/HLDrLT/mUEp5J1LC6NmWSlJRWL/nM7oGItJQUYWGLvYGhSQvHrhIuvMpjNmBNh/WWqCDg==".as_bytes())
+                            .unwrap()
+                            .into(),
+                    }],
+                    reference_names: vec![],
+                    deriver: Some(crate::proto::StorePath {
+                        digest: Bytes::copy_from_slice(
+                            &nixbase32::decode_fixed::<20>("dyivpmlaq2km6c11i0s6bi6mbsx0ylqf").unwrap()
+                        ),
+                        name: "hello-2.12.1.tar.gz".to_string(),
+                    }),
+                    ca: Some(crate::proto::nar_info::Ca {
+                        r#type: crate::proto::nar_info::ca::Hash::FlatSha256.into(),
+                        digest: Bytes::copy_from_slice(
+                            &nixbase32::decode_fixed::<32>(
+                                "086vqwk2wl8zfs47sq2xpjc9k066ilmb8z6dn0q6ymwjzlm196cd"
+                            )
+                            .unwrap()
+                        )
+                    }),
+                }
+            ),
+        },
+        (&narinfo_parsed).into()
+    );
+/// Exercise .as_narinfo() on a PathInfo and ensure important fields are preserved..
+fn as_narinfo() {
+    let narinfo_parsed = nix_compat::narinfo::NarInfo::parse(
+        r#"StorePath: /nix/store/pa10z4ngm0g83kx9mssrqzz30s84vq7k-hello-2.12.1.tar.gz
+URL: nar/1zjrhzhaizsrlsvdkqfl073vivmxcqnzkff4s50i0cdf541ary1r.nar.xz
+Compression: xz
+FileHash: sha256:1zjrhzhaizsrlsvdkqfl073vivmxcqnzkff4s50i0cdf541ary1r
+FileSize: 1033524
+NarHash: sha256:1lvqpbk2k1sb39z8jfxixf7p7v8sj4z6mmpa44nnmff3w1y6h8lh
+NarSize: 1033416
+Deriver: dyivpmlaq2km6c11i0s6bi6mbsx0ylqf-hello-2.12.1.tar.gz.drv
+Sig: cache.nixos.org-1:ywnIG629nQZQhEr6/HLDrLT/mUEp5J1LC6NmWSlJRWL/nM7oGItJQUYWGLvYGhSQvHrhIuvMpjNmBNh/WWqCDg==
+CA: fixed:sha256:086vqwk2wl8zfs47sq2xpjc9k066ilmb8z6dn0q6ymwjzlm196cd"#
+    ).expect("must parse");
+    let path_info: PathInfo = (&narinfo_parsed).into();
+    let mut narinfo_returned = path_info
+        .to_narinfo(
+            StorePathRef::from_bytes(b"pa10z4ngm0g83kx9mssrqzz30s84vq7k-hello-2.12.1.tar.gz")
+                .expect("invalid storepath"),
+        )
+        .expect("must be some");
+    narinfo_returned.url = "some.nar";
+    assert_eq!(
+        r#"StorePath: /nix/store/pa10z4ngm0g83kx9mssrqzz30s84vq7k-hello-2.12.1.tar.gz
+URL: some.nar
+Compression: none
+NarHash: sha256:1lvqpbk2k1sb39z8jfxixf7p7v8sj4z6mmpa44nnmff3w1y6h8lh
+NarSize: 1033416
+Deriver: dyivpmlaq2km6c11i0s6bi6mbsx0ylqf-hello-2.12.1.tar.gz.drv
+Sig: cache.nixos.org-1:ywnIG629nQZQhEr6/HLDrLT/mUEp5J1LC6NmWSlJRWL/nM7oGItJQUYWGLvYGhSQvHrhIuvMpjNmBNh/WWqCDg==
+CA: fixed:sha256:086vqwk2wl8zfs47sq2xpjc9k066ilmb8z6dn0q6ymwjzlm196cd
+        narinfo_returned.to_string(),
+    );
diff --git a/tvix/store/src/tests/fixtures.rs b/tvix/store/src/tests/fixtures.rs
new file mode 100644
index 0000000000..1c8359a2c0
--- /dev/null
+++ b/tvix/store/src/tests/fixtures.rs
@@ -0,0 +1,142 @@
+use lazy_static::lazy_static;
+use rstest::*;
+use std::sync::Arc;
+pub use tvix_castore::fixtures::*;
+use tvix_castore::{
+    blobservice::{BlobService, MemoryBlobService},
+    directoryservice::{DirectoryService, MemoryDirectoryService},
+    proto as castorepb,
+use crate::proto::{
+    nar_info::{ca, Ca},
+    NarInfo, PathInfo,
+pub const DUMMY_PATH: &str = "00000000000000000000000000000000-dummy";
+pub const DUMMY_PATH_DIGEST: [u8; 20] = [0; 20];
+lazy_static! {
+    /// The NAR representation of a symlink pointing to `/nix/store/somewhereelse`
+    pub static ref NAR_CONTENTS_SYMLINK: Vec<u8> = vec![
+        13, 0, 0, 0, 0, 0, 0, 0, b'n', b'i', b'x', b'-', b'a', b'r', b'c', b'h', b'i', b'v', b'e', b'-', b'1', 0,
+        0, 0, // "nix-archive-1"
+        1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "("
+        4, 0, 0, 0, 0, 0, 0, 0, b't', b'y', b'p', b'e', 0, 0, 0, 0, // "type"
+        7, 0, 0, 0, 0, 0, 0, 0, b's', b'y', b'm', b'l', b'i', b'n', b'k', 0, // "symlink"
+        6, 0, 0, 0, 0, 0, 0, 0, b't', b'a', b'r', b'g', b'e', b't', 0, 0, // target
+        24, 0, 0, 0, 0, 0, 0, 0, b'/', b'n', b'i', b'x', b'/', b's', b't', b'o', b'r', b'e', b'/', b's', b'o',
+        b'm', b'e', b'w', b'h', b'e', b'r', b'e', b'e', b'l', b's',
+        b'e', // "/nix/store/somewhereelse"
+        1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0 // ")"
+    ];
+    /// The NAR representation of a regular file with the contents "Hello World!"
+    pub static ref NAR_CONTENTS_HELLOWORLD: Vec<u8> = vec![
+        13, 0, 0, 0, 0, 0, 0, 0, b'n', b'i', b'x', b'-', b'a', b'r', b'c', b'h', b'i', b'v', b'e', b'-', b'1', 0,
+        0, 0, // "nix-archive-1"
+        1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "("
+        4, 0, 0, 0, 0, 0, 0, 0, b't', b'y', b'p', b'e', 0, 0, 0, 0, // "type"
+        7, 0, 0, 0, 0, 0, 0, 0, b'r', b'e', b'g', b'u', b'l', b'a', b'r', 0, // "regular"
+        8, 0, 0, 0, 0, 0, 0, 0, b'c', b'o', b'n', b't', b'e', b'n', b't', b's', // "contents"
+        12, 0, 0, 0, 0, 0, 0, 0, b'H', b'e', b'l', b'l', b'o', b' ', b'W', b'o', b'r', b'l', b'd', b'!', 0, 0,
+        0, 0, // "Hello World!"
+        1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0 // ")"
+    ];
+    /// The NAR representation of a more complicated directory structure.
+    pub static ref NAR_CONTENTS_COMPLICATED: Vec<u8> = vec![
+        13, 0, 0, 0, 0, 0, 0, 0, b'n', b'i', b'x', b'-', b'a', b'r', b'c', b'h', b'i', b'v', b'e', b'-', b'1', 0,
+        0, 0, // "nix-archive-1"
+        1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "("
+        4, 0, 0, 0, 0, 0, 0, 0, b't', b'y', b'p', b'e', 0, 0, 0, 0, // "type"
+        9, 0, 0, 0, 0, 0, 0, 0, b'd', b'i', b'r', b'e', b'c', b't', b'o', b'r', b'y', 0, 0, 0, 0, 0, 0, 0, // "directory"
+        5, 0, 0, 0, 0, 0, 0, 0, b'e', b'n', b't', b'r', b'y', 0, 0, 0, // "entry"
+        1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "("
+        4, 0, 0, 0, 0, 0, 0, 0, b'n', b'a', b'm', b'e', 0, 0, 0, 0, // "name"
+        5, 0, 0, 0, 0, 0, 0, 0, b'.', b'k', b'e', b'e', b'p', 0, 0, 0, // ".keep"
+        4, 0, 0, 0, 0, 0, 0, 0, b'n', b'o', b'd', b'e', 0, 0, 0, 0, // "node"
+        1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "("
+        4, 0, 0, 0, 0, 0, 0, 0, b't', b'y', b'p', b'e', 0, 0, 0, 0, // "type"
+        7, 0, 0, 0, 0, 0, 0, 0, b'r', b'e', b'g', b'u', b'l', b'a', b'r', 0, // "regular"
+        8, 0, 0, 0, 0, 0, 0, 0, b'c', b'o', b'n', b't', b'e', b'n', b't', b's', // "contents"
+        0, 0, 0, 0, 0, 0, 0, 0, // ""
+        1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0, // ")"
+        1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0, // ")"
+        5, 0, 0, 0, 0, 0, 0, 0, b'e', b'n', b't', b'r', b'y', 0, 0, 0, // "entry"
+        1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "("
+        4, 0, 0, 0, 0, 0, 0, 0, b'n', b'a', b'm', b'e', 0, 0, 0, 0, // "name"
+        2, 0, 0, 0, 0, 0, 0, 0, b'a', b'a', 0, 0, 0, 0, 0, 0, // "aa"
+        4, 0, 0, 0, 0, 0, 0, 0, b'n', b'o', b'd', b'e', 0, 0, 0, 0, // "node"
+        1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "("
+        4, 0, 0, 0, 0, 0, 0, 0, b't', b'y', b'p', b'e', 0, 0, 0, 0, // "type"
+        7, 0, 0, 0, 0, 0, 0, 0, b's', b'y', b'm', b'l', b'i', b'n', b'k', 0, // "symlink"
+        6, 0, 0, 0, 0, 0, 0, 0, b't', b'a', b'r', b'g', b'e', b't', 0, 0, // target
+        24, 0, 0, 0, 0, 0, 0, 0, b'/', b'n', b'i', b'x', b'/', b's', b't', b'o', b'r', b'e', b'/', b's', b'o',
+        b'm', b'e', b'w', b'h', b'e', b'r', b'e', b'e', b'l', b's',
+        b'e', // "/nix/store/somewhereelse"
+        1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0, // ")"
+        1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0, // ")"
+        5, 0, 0, 0, 0, 0, 0, 0, b'e', b'n', b't', b'r', b'y', 0, 0, 0, // "entry"
+        1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "("
+        4, 0, 0, 0, 0, 0, 0, 0, b'n', b'a', b'm', b'e', 0, 0, 0, 0, // "name"
+        4, 0, 0, 0, 0, 0, 0, 0, b'k', b'e', b'e', b'p', 0, 0, 0, 0, // "keep"
+        4, 0, 0, 0, 0, 0, 0, 0, b'n', b'o', b'd', b'e', 0, 0, 0, 0, // "node"
+        1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "("
+        4, 0, 0, 0, 0, 0, 0, 0, b't', b'y', b'p', b'e', 0, 0, 0, 0, // "type"
+        9, 0, 0, 0, 0, 0, 0, 0, b'd', b'i', b'r', b'e', b'c', b't', b'o', b'r', b'y', 0, 0, 0, 0, 0, 0, 0, // "directory"
+        5, 0, 0, 0, 0, 0, 0, 0, b'e', b'n', b't', b'r', b'y', 0, 0, 0, // "entry"
+        1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "("
+        4, 0, 0, 0, 0, 0, 0, 0, b'n', b'a', b'm', b'e', 0, 0, 0, 0, // "name"
+        5, 0, 0, 0, 0, 0, 0, 0, 46, 107, 101, 101, 112, 0, 0, 0, // ".keep"
+        4, 0, 0, 0, 0, 0, 0, 0, 110, 111, 100, 101, 0, 0, 0, 0, // "node"
+        1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "("
+        4, 0, 0, 0, 0, 0, 0, 0, b't', b'y', b'p', b'e', 0, 0, 0, 0, // "type"
+        7, 0, 0, 0, 0, 0, 0, 0, b'r', b'e', b'g', b'u', b'l', b'a', b'r', 0, // "regular"
+        8, 0, 0, 0, 0, 0, 0, 0, b'c', b'o', b'n', b't', b'e', b'n', b't', b's', // "contents"
+        0, 0, 0, 0, 0, 0, 0, 0, // ""
+        1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0, // ")"
+        1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0, // ")"
+        1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0, // ")"
+        1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0, // ")"
+        1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0, // ")"
+    ];
+    /// A PathInfo message without .narinfo populated.
+    pub static ref PATH_INFO_WITHOUT_NARINFO : PathInfo = PathInfo {
+        node: Some(castorepb::Node {
+            node: Some(castorepb::node::Node::Directory(castorepb::DirectoryNode {
+                name: DUMMY_PATH.into(),
+                digest: DUMMY_DIGEST.clone().into(),
+                size: 0,
+            })),
+        }),
+        references: vec![DUMMY_PATH_DIGEST.as_slice().into()],
+        narinfo: None,
+    };
+    /// A PathInfo message with .narinfo populated.
+    /// The references in `narinfo.reference_names` aligns with what's in
+    /// `references`.
+    pub static ref PATH_INFO_WITH_NARINFO : PathInfo = PathInfo {
+        narinfo: Some(NarInfo {
+            nar_size: 0,
+            nar_sha256: DUMMY_DIGEST.clone().into(),
+            signatures: vec![],
+            reference_names: vec![DUMMY_PATH.to_string()],
+            deriver: None,
+            ca: Some(Ca { r#type: ca::Hash::NarSha256.into(), digest:  DUMMY_DIGEST.clone().into() })
+        }),
+    };
+pub(crate) fn blob_service() -> Arc<dyn BlobService> {
+    Arc::from(MemoryBlobService::default())
+pub(crate) fn directory_service() -> Arc<dyn DirectoryService> {
+    Arc::from(MemoryDirectoryService::default())
diff --git a/tvix/store/src/tests/mod.rs b/tvix/store/src/tests/mod.rs
new file mode 100644
index 0000000000..1e7fc3f6b4
--- /dev/null
+++ b/tvix/store/src/tests/mod.rs
@@ -0,0 +1,2 @@
+pub mod fixtures;
+mod nar_renderer;
diff --git a/tvix/store/src/tests/nar_renderer.rs b/tvix/store/src/tests/nar_renderer.rs
new file mode 100644
index 0000000000..8bfb5a72bb
--- /dev/null
+++ b/tvix/store/src/tests/nar_renderer.rs
@@ -0,0 +1,228 @@
+use crate::nar::calculate_size_and_sha256;
+use crate::nar::write_nar;
+use crate::tests::fixtures::blob_service;
+use crate::tests::fixtures::directory_service;
+use crate::tests::fixtures::*;
+use rstest::*;
+use sha2::{Digest, Sha256};
+use std::io;
+use std::sync::Arc;
+use tokio::io::sink;
+use tvix_castore::blobservice::BlobService;
+use tvix_castore::directoryservice::DirectoryService;
+use tvix_castore::proto as castorepb;
+async fn single_symlink(
+    blob_service: Arc<dyn BlobService>,
+    directory_service: Arc<dyn DirectoryService>,
+) {
+    let mut buf: Vec<u8> = vec![];
+    write_nar(
+        &mut buf,
+        &castorepb::node::Node::Symlink(castorepb::SymlinkNode {
+            name: "doesntmatter".into(),
+            target: "/nix/store/somewhereelse".into(),
+        }),
+        // don't put anything in the stores, as we don't actually do any requests.
+        blob_service,
+        directory_service,
+    )
+    .await
+    .expect("must succeed");
+    assert_eq!(buf, NAR_CONTENTS_SYMLINK.to_vec());
+/// Make sure the NARRenderer fails if a referred blob doesn't exist.
+async fn single_file_missing_blob(
+    blob_service: Arc<dyn BlobService>,
+    directory_service: Arc<dyn DirectoryService>,
+) {
+    let e = write_nar(
+        sink(),
+        &castorepb::node::Node::File(castorepb::FileNode {
+            name: "doesntmatter".into(),
+            digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
+            size: HELLOWORLD_BLOB_CONTENTS.len() as u64,
+            executable: false,
+        }),
+        // the blobservice is empty intentionally, to provoke the error.
+        blob_service,
+        directory_service,
+    )
+    .await
+    .expect_err("must fail");
+    match e {
+        crate::nar::RenderError::NARWriterError(e) => {
+            assert_eq!(io::ErrorKind::NotFound, e.kind());
+        }
+        _ => panic!("unexpected error: {:?}", e),
+    }
+/// Make sure the NAR Renderer fails if the returned blob meta has another size
+/// than specified in the proto node.
+async fn single_file_wrong_blob_size(
+    blob_service: Arc<dyn BlobService>,
+    directory_service: Arc<dyn DirectoryService>,
+) {
+    // insert blob into the store
+    let mut writer = blob_service.open_write().await;
+    tokio::io::copy(
+        &mut io::Cursor::new(HELLOWORLD_BLOB_CONTENTS.to_vec()),
+        &mut writer,
+    )
+    .await
+    .unwrap();
+    assert_eq!(
+        HELLOWORLD_BLOB_DIGEST.clone(),
+        writer.close().await.unwrap()
+    );
+    // Test with a root FileNode of a too big size
+    let e = write_nar(
+        sink(),
+        &castorepb::node::Node::File(castorepb::FileNode {
+            name: "doesntmatter".into(),
+            digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
+            size: 42, // <- note the wrong size here!
+            executable: false,
+        }),
+        blob_service.clone(),
+        directory_service.clone(),
+    )
+    .await
+    .expect_err("must fail");
+    match e {
+        crate::nar::RenderError::NARWriterError(e) => {
+            assert_eq!(io::ErrorKind::UnexpectedEof, e.kind());
+        }
+        _ => panic!("unexpected error: {:?}", e),
+    }
+    // Test with a root FileNode of a too small size
+    let e = write_nar(
+        sink(),
+        &castorepb::node::Node::File(castorepb::FileNode {
+            name: "doesntmatter".into(),
+            digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
+            size: 2, // <- note the wrong size here!
+            executable: false,
+        }),
+        blob_service,
+        directory_service,
+    )
+    .await
+    .expect_err("must fail");
+    match e {
+        crate::nar::RenderError::NARWriterError(e) => {
+            assert_eq!(io::ErrorKind::InvalidInput, e.kind());
+        }
+        _ => panic!("unexpected error: {:?}", e),
+    }
+async fn single_file(
+    blob_service: Arc<dyn BlobService>,
+    directory_service: Arc<dyn DirectoryService>,
+) {
+    // insert blob into the store
+    let mut writer = blob_service.open_write().await;
+    tokio::io::copy(&mut io::Cursor::new(HELLOWORLD_BLOB_CONTENTS), &mut writer)
+        .await
+        .unwrap();
+    assert_eq!(
+        HELLOWORLD_BLOB_DIGEST.clone(),
+        writer.close().await.unwrap()
+    );
+    let mut buf: Vec<u8> = vec![];
+    write_nar(
+        &mut buf,
+        &castorepb::node::Node::File(castorepb::FileNode {
+            name: "doesntmatter".into(),
+            digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
+            size: HELLOWORLD_BLOB_CONTENTS.len() as u64,
+            executable: false,
+        }),
+        blob_service,
+        directory_service,
+    )
+    .await
+    .expect("must succeed");
+    assert_eq!(buf, NAR_CONTENTS_HELLOWORLD.to_vec());
+async fn test_complicated(
+    blob_service: Arc<dyn BlobService>,
+    directory_service: Arc<dyn DirectoryService>,
+) {
+    // put all data into the stores.
+    // insert blob into the store
+    let mut writer = blob_service.open_write().await;
+    tokio::io::copy(&mut io::Cursor::new(EMPTY_BLOB_CONTENTS), &mut writer)
+        .await
+        .unwrap();
+    assert_eq!(EMPTY_BLOB_DIGEST.clone(), writer.close().await.unwrap());
+    // insert directories
+    directory_service
+        .put(DIRECTORY_WITH_KEEP.clone())
+        .await
+        .unwrap();
+    directory_service
+        .put(DIRECTORY_COMPLICATED.clone())
+        .await
+        .unwrap();
+    let mut buf: Vec<u8> = vec![];
+    write_nar(
+        &mut buf,
+        &castorepb::node::Node::Directory(castorepb::DirectoryNode {
+            name: "doesntmatter".into(),
+            digest: DIRECTORY_COMPLICATED.digest().into(),
+            size: DIRECTORY_COMPLICATED.size(),
+        }),
+        blob_service.clone(),
+        directory_service.clone(),
+    )
+    .await
+    .expect("must succeed");
+    assert_eq!(buf, NAR_CONTENTS_COMPLICATED.to_vec());
+    // ensure calculate_nar does return the correct sha256 digest and sum.
+    let (nar_size, nar_digest) = calculate_size_and_sha256(
+        &castorepb::node::Node::Directory(castorepb::DirectoryNode {
+            name: "doesntmatter".into(),
+            digest: DIRECTORY_COMPLICATED.digest().into(),
+            size: DIRECTORY_COMPLICATED.size(),
+        }),
+        blob_service,
+        directory_service,
+    )
+    .await
+    .expect("must succeed");
+    assert_eq!(NAR_CONTENTS_COMPLICATED.len() as u64, nar_size);
+    let d = Sha256::digest(NAR_CONTENTS_COMPLICATED.clone());
+    assert_eq!(d.as_slice(), nar_digest);
diff --git a/tvix/store/src/utils.rs b/tvix/store/src/utils.rs
new file mode 100644
index 0000000000..e6e42f6ec4
--- /dev/null
+++ b/tvix/store/src/utils.rs
@@ -0,0 +1,78 @@
+use std::sync::Arc;
+use std::{
+    pin::Pin,
+    task::{self, Poll},
+use tokio::io::{self, AsyncWrite};
+use tvix_castore::{
+    blobservice::{self, BlobService},
+    directoryservice::{self, DirectoryService},
+use crate::nar::{NarCalculationService, SimpleRenderer};
+use crate::pathinfoservice::{self, PathInfoService};
+/// Construct the store handles from their addrs.
+pub async fn construct_services(
+    blob_service_addr: impl AsRef<str>,
+    directory_service_addr: impl AsRef<str>,
+    path_info_service_addr: impl AsRef<str>,
+) -> std::io::Result<(
+    Arc<dyn BlobService>,
+    Arc<dyn DirectoryService>,
+    Box<dyn PathInfoService>,
+    Box<dyn NarCalculationService>,
+)> {
+    let blob_service: Arc<dyn BlobService> = blobservice::from_addr(blob_service_addr.as_ref())
+        .await?
+        .into();
+    let directory_service: Arc<dyn DirectoryService> =
+        directoryservice::from_addr(directory_service_addr.as_ref())
+            .await?
+            .into();
+    let path_info_service = pathinfoservice::from_addr(
+        path_info_service_addr.as_ref(),
+        blob_service.clone(),
+        directory_service.clone(),
+    )
+    .await?;
+    // TODO: grpc client also implements NarCalculationService
+    let nar_calculation_service = Box::new(SimpleRenderer::new(
+        blob_service.clone(),
+        directory_service.clone(),
+    )) as Box<dyn NarCalculationService>;
+    Ok((
+        blob_service,
+        directory_service,
+        path_info_service,
+        nar_calculation_service,
+    ))
+/// The inverse of [tokio_util::io::SyncIoBridge].
+/// Don't use this with anything that actually does blocking I/O.
+pub struct AsyncIoBridge<T>(pub T);
+impl<W: std::io::Write + Unpin> AsyncWrite for AsyncIoBridge<W> {
+    fn poll_write(
+        self: Pin<&mut Self>,
+        _cx: &mut task::Context<'_>,
+        buf: &[u8],
+    ) -> Poll<io::Result<usize>> {
+        Poll::Ready(self.get_mut().0.write(buf))
+    }
+    fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
+        Poll::Ready(self.get_mut().0.flush())
+    }
+    fn poll_shutdown(
+        self: Pin<&mut Self>,
+        _cx: &mut task::Context<'_>,
+    ) -> Poll<Result<(), io::Error>> {
+        Poll::Ready(Ok(()))
+    }