about summary refs log tree commit diff
path: root/tvix/store
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/store')
-rw-r--r--tvix/store/Cargo.toml44
-rw-r--r--tvix/store/default.nix38
-rw-r--r--tvix/store/docs/api.md4
-rw-r--r--tvix/store/src/bin/tvix-store.rs291
-rw-r--r--tvix/store/src/import.rs81
-rw-r--r--tvix/store/src/nar/import.rs393
-rw-r--r--tvix/store/src/nar/mod.rs29
-rw-r--r--tvix/store/src/nar/renderer.rs90
-rw-r--r--tvix/store/src/pathinfoservice/bigtable.rs412
-rw-r--r--tvix/store/src/pathinfoservice/combinators.rs111
-rw-r--r--tvix/store/src/pathinfoservice/from_addr.rs95
-rw-r--r--tvix/store/src/pathinfoservice/fs/mod.rs2
-rw-r--r--tvix/store/src/pathinfoservice/grpc.rs144
-rw-r--r--tvix/store/src/pathinfoservice/lru.rs128
-rw-r--r--tvix/store/src/pathinfoservice/memory.rs67
-rw-r--r--tvix/store/src/pathinfoservice/mod.rs25
-rw-r--r--tvix/store/src/pathinfoservice/nix_http.rs185
-rw-r--r--tvix/store/src/pathinfoservice/sled.rs170
-rw-r--r--tvix/store/src/pathinfoservice/tests/mod.rs82
-rw-r--r--tvix/store/src/pathinfoservice/tests/utils.rs45
-rw-r--r--tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs25
-rw-r--r--tvix/store/src/proto/mod.rs30
-rw-r--r--tvix/store/src/proto/tests/pathinfo.rs127
-rw-r--r--tvix/store/src/tests/fixtures.rs10
-rw-r--r--tvix/store/src/utils.rs47
25 files changed, 1659 insertions, 1016 deletions
diff --git a/tvix/store/Cargo.toml b/tvix/store/Cargo.toml
index 367e63c21e..dc6126724f 100644
--- a/tvix/store/Cargo.toml
+++ b/tvix/store/Cargo.toml
@@ -5,6 +5,7 @@ edition = "2021"
 
 [dependencies]
 anyhow = "1.0.68"
+async-compression = { version = "0.4.9", features = ["tokio", "bzip2", "gzip", "xz", "zstd"]}
 async-stream = "0.3.5"
 blake3 = { version = "1.3.1", features = ["rayon", "std"] }
 bstr = "1.6.0"
@@ -17,47 +18,64 @@ lazy_static = "1.4.0"
 nix-compat = { path = "../nix-compat", features = ["async"] }
 pin-project-lite = "0.2.13"
 prost = "0.12.1"
-opentelemetry = { version = "0.21.0", optional = true}
-opentelemetry-otlp = { version = "0.14.0", optional = true }
-opentelemetry_sdk = { version = "0.21.0", features = ["rt-tokio"], optional = true}
+serde = { version = "1.0.197", features = [ "derive" ] }
+serde_json = "1.0"
+serde_with = "3.7.0"
+serde_qs = "0.12.0"
 sha2 = "0.10.6"
 sled = { version = "0.34.7" }
 thiserror = "1.0.38"
 tokio = { version = "1.32.0", features = ["fs", "macros", "net", "rt", "rt-multi-thread", "signal"] }
-tokio-listener = { version = "0.3.2", features = [ "tonic011" ] }
+tokio-listener = { version = "0.4.1", features = [ "tonic011" ] }
 tokio-stream = { version = "0.1.14", features = ["fs"] }
 tokio-util = { version = "0.7.9", features = ["io", "io-util", "compat"] }
 tonic = { version = "0.11.0", features = ["tls", "tls-roots"] }
 tower = "0.4.13"
-tracing = "0.1.37"
-tracing-opentelemetry = "0.22.0"
-tracing-subscriber = { version = "0.3.16", features = ["env-filter", "json"] }
 tvix-castore = { path = "../castore" }
 url = "2.4.0"
 walkdir = "2.4.0"
-async-recursion = "1.0.5"
 reqwest = { version = "0.11.22", features = ["rustls-tls-native-roots", "stream"], default-features = false }
-xz2 = "0.1.7"
+lru = "0.12.3"
+parking_lot = "0.12.2"
+tvix-tracing = { path = "../tracing" }
+tracing = "0.1.40"
+tracing-indicatif = "0.3.6"
 
 [dependencies.tonic-reflection]
 optional = true
 version = "0.11.0"
 
+[dependencies.bigtable_rs]
+optional = true
+# https://github.com/liufuyang/bigtable_rs/pull/72
+git = "https://github.com/flokli/bigtable_rs"
+rev = "0af404741dfc40eb9fa99cf4d4140a09c5c20df7"
+
 [build-dependencies]
 prost-build = "0.12.1"
 tonic-build = "0.11.0"
 
 [dev-dependencies]
-rstest = "0.18.2"
+async-process = "2.1.0"
+rstest = "0.19.0"
 rstest_reuse = "0.6.0"
-test-case = "3.3.1"
 tempfile = "3.3.0"
 tokio-retry = "0.3.0"
 
 [features]
 default = ["cloud", "fuse", "otlp", "tonic-reflection"]
-cloud = ["tvix-castore/cloud"]
+cloud = [
+  "dep:bigtable_rs",
+  "tvix-castore/cloud"
+]
 fuse = ["tvix-castore/fuse"]
-otlp = ["dep:opentelemetry", "dep:opentelemetry-otlp", "dep:opentelemetry_sdk"]
+otlp = ["tvix-tracing/otlp"]
 tonic-reflection = ["dep:tonic-reflection", "tvix-castore/tonic-reflection"]
 virtiofs = ["tvix-castore/virtiofs"]
+# Whether to run the integration tests.
+# Requires the following packages in $PATH:
+# cbtemulator, google-cloud-bigtable-tool
+integration = []
+
+[lints]
+workspace = true
diff --git a/tvix/store/default.nix b/tvix/store/default.nix
index 2c07cdf2b3..78b499114c 100644
--- a/tvix/store/default.nix
+++ b/tvix/store/default.nix
@@ -1,4 +1,4 @@
-{ depot, pkgs, ... }:
+{ depot, pkgs, lib, ... }:
 
 let
   mkImportCheck = p: expectedPath: {
@@ -22,17 +22,35 @@ let
   };
 in
 
-(depot.tvix.crates.workspaceMembers.tvix-store.build.override {
+(depot.tvix.crates.workspaceMembers.tvix-store.build.override (old: {
   runTests = true;
   testPreRun = ''
-    export SSL_CERT_FILE=${pkgs.cacert}/etc/ssl/certs/ca-bundle.crt;
+    export SSL_CERT_FILE=${pkgs.cacert}/etc/ssl/certs/ca-bundle.crt
   '';
-
-  # virtiofs feature currently fails to build on Darwin.
-  # we however can ship it for non-darwin.
-  features = if pkgs.stdenv.isDarwin then [ "default" ] else [ "default" "virtiofs" ];
-}).overrideAttrs (_: {
-  meta.ci.extraSteps = {
-    import-docs = (mkImportCheck "tvix/store/docs" ./docs);
+  features = old.features
+    # virtiofs feature currently fails to build on Darwin
+    ++ lib.optional pkgs.stdenv.isLinux "virtiofs";
+})).overrideAttrs (old: rec {
+  meta.ci = {
+    targets = [ "integration-tests" ] ++ lib.filter (x: lib.hasPrefix "with-features" x || x == "no-features") (lib.attrNames passthru);
+    extraSteps.import-docs = (mkImportCheck "tvix/store/docs" ./docs);
+  };
+  passthru = (depot.tvix.utils.mkFeaturePowerset {
+    inherit (old) crateName;
+    features = ([ "cloud" "fuse" "otlp" "tonic-reflection" ]
+      # virtiofs feature currently fails to build on Darwin
+      ++ lib.optional pkgs.stdenv.isLinux "virtiofs");
+    override.testPreRun = ''
+      export SSL_CERT_FILE=${pkgs.cacert}/etc/ssl/certs/ca-bundle.crt
+    '';
+  }) // {
+    integration-tests = depot.tvix.crates.workspaceMembers.${old.crateName}.build.override (old: {
+      runTests = true;
+      testPreRun = ''
+        export SSL_CERT_FILE=${pkgs.cacert}/etc/ssl/certs/ca-bundle.crt;
+        export PATH="$PATH:${pkgs.lib.makeBinPath [ pkgs.cbtemulator pkgs.google-cloud-bigtable-tool ]}"
+      '';
+      features = old.features ++ [ "integration" ];
+    });
   };
 })
diff --git a/tvix/store/docs/api.md b/tvix/store/docs/api.md
index c1dacc89a5..c5a5c477aa 100644
--- a/tvix/store/docs/api.md
+++ b/tvix/store/docs/api.md
@@ -5,7 +5,7 @@ This document outlines the design of the API exposed by tvix-castore and tvix-
 store, as well as other implementations of this store protocol.
 
 This document is meant to be read side-by-side with
-[castore.md](../../tvix-castore/docs/castore.md) which describes the data model
+[castore.md](../../castore/docs/data-model.md) which describes the data model
 in more detail.
 
 The store API has four main consumers:
@@ -218,7 +218,7 @@ This is useful for people running a Tvix-only system, or running builds on a
 In a system with Nix installed, we can't simply manually "extract" things to
 `/nix/store`, as Nix assumes to own all writes to this location.
 In these use cases, we're probably better off exposing a tvix-store as a local
-binary cache (that's what `//tvix/nar-bridge` does).
+binary cache (that's what `//tvix/nar-bridge-go` does).
 
 Assuming we are in an environment where we control `/nix/store` exclusively, a
 "realize to disk" would either "extract" things from the `tvix-store` to a
diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs
index 27a67b7c91..03c699b893 100644
--- a/tvix/store/src/bin/tvix-store.rs
+++ b/tvix/store/src/bin/tvix-store.rs
@@ -2,17 +2,23 @@ use clap::Parser;
 use clap::Subcommand;
 
 use futures::future::try_join_all;
+use futures::StreamExt;
+use futures::TryStreamExt;
+use nix_compat::path_info::ExportedPathInfo;
+use serde::Deserialize;
+use serde::Serialize;
 use std::path::PathBuf;
 use std::sync::Arc;
 use tokio_listener::Listener;
 use tokio_listener::SystemOptions;
 use tokio_listener::UserOptions;
 use tonic::transport::Server;
-use tracing::info;
-use tracing::Level;
-use tracing_subscriber::EnvFilter;
-use tracing_subscriber::Layer;
-use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
+use tracing::{info, info_span, instrument, Level, Span};
+use tracing_indicatif::span_ext::IndicatifSpanExt as _;
+use tvix_castore::import::fs::ingest_path;
+use tvix_store::nar::NarCalculationService;
+use tvix_store::proto::NarInfo;
+use tvix_store::proto::PathInfo;
 
 use tvix_castore::proto::blob_service_server::BlobServiceServer;
 use tvix_castore::proto::directory_service_server::DirectoryServiceServer;
@@ -28,15 +34,6 @@ use tvix_store::pathinfoservice::make_fs;
 #[cfg(feature = "fuse")]
 use tvix_castore::fs::fuse::FuseDaemon;
 
-#[cfg(feature = "otlp")]
-use opentelemetry::KeyValue;
-#[cfg(feature = "otlp")]
-use opentelemetry_sdk::{
-    resource::{ResourceDetector, SdkProvidedResourceDetector},
-    trace::BatchConfig,
-    Resource,
-};
-
 #[cfg(feature = "virtiofs")]
 use tvix_castore::fs::virtiofs::start_virtiofs_daemon;
 
@@ -48,10 +45,6 @@ use tvix_store::proto::FILE_DESCRIPTOR_SET;
 #[derive(Parser)]
 #[command(author, version, about, long_about = None)]
 struct Cli {
-    /// Whether to log in JSON
-    #[arg(long)]
-    json: bool,
-
     /// Whether to configure OTLP. Set --otlp=false to disable.
     #[arg(long, default_missing_value = "true", default_value = "true", num_args(0..=1), require_equals(true), action(clap::ArgAction::Set))]
     otlp: bool,
@@ -60,8 +53,8 @@ struct Cli {
     /// It's also possible to set `RUST_LOG` according to
     /// `tracing_subscriber::filter::EnvFilter`, which will always have
     /// priority.
-    #[arg(long)]
-    log_level: Option<Level>,
+    #[arg(long, default_value_t=Level::INFO)]
+    log_level: Level,
 
     #[command(subcommand)]
     command: Commands,
@@ -74,7 +67,11 @@ enum Commands {
         #[arg(long, short = 'l')]
         listen_address: Option<String>,
 
-        #[arg(long, env, default_value = "sled:///var/lib/tvix-store/blobs.sled")]
+        #[arg(
+            long,
+            env,
+            default_value = "objectstore+file:///var/lib/tvix-store/blobs.object_store"
+        )]
         blob_service_addr: String,
 
         #[arg(
@@ -101,6 +98,30 @@ enum Commands {
         #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
         path_info_service_addr: String,
     },
+
+    /// Copies a list of store paths on the system into tvix-store.
+    Copy {
+        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
+        blob_service_addr: String,
+
+        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
+        directory_service_addr: String,
+
+        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
+        path_info_service_addr: String,
+
+        /// A path pointing to a JSON file produced by the Nix
+        /// `__structuredAttrs` containing reference graph information provided
+        /// by the `exportReferencesGraph` feature.
+        ///
+        /// This can be used to invoke tvix-store inside a Nix derivation
+        /// copying to a Tvix store (or outside, if the JSON file is copied
+        /// out).
+        ///
+        /// Currently limited to the `closure` key inside that JSON file.
+        #[arg(value_name = "NIX_ATTRS_JSON_FILE", env = "NIX_ATTRS_JSON_FILE")]
+        reference_graph_path: PathBuf,
+    },
     /// Mounts a tvix-store at the given mountpoint
     #[cfg(feature = "fuse")]
     Mount {
@@ -131,6 +152,10 @@ enum Commands {
         /// (exhaustive) listing.
         #[clap(long, short, action)]
         list_root: bool,
+
+        #[arg(long, default_value_t = true)]
+        /// Whether to expose blob and directory digests as extended attributes.
+        show_xattr: bool,
     },
     /// Starts a tvix-store virtiofs daemon at the given socket path.
     #[cfg(feature = "virtiofs")]
@@ -153,6 +178,10 @@ enum Commands {
         /// (exhaustive) listing.
         #[clap(long, short, action)]
         list_root: bool,
+
+        #[arg(long, default_value_t = true)]
+        /// Whether to expose blob and directory digests as extended attributes.
+        show_xattr: bool,
     },
 }
 
@@ -171,87 +200,22 @@ fn default_threads() -> usize {
 }
 
 #[tokio::main]
+#[instrument(fields(indicatif.pb_show=1))]
 async fn main() -> Result<(), Box<dyn std::error::Error>> {
     let cli = Cli::parse();
 
-    // configure log settings
-    let level = cli.log_level.unwrap_or(Level::INFO);
-
-    // Set up the tracing subscriber.
-    let subscriber = tracing_subscriber::registry()
-        .with(
-            cli.json.then_some(
-                tracing_subscriber::fmt::Layer::new()
-                    .with_writer(std::io::stderr)
-                    .json()
-                    .with_filter(
-                        EnvFilter::builder()
-                            .with_default_directive(level.into())
-                            .from_env()
-                            .expect("invalid RUST_LOG"),
-                    ),
-            ),
-        )
-        .with(
-            (!cli.json).then_some(
-                tracing_subscriber::fmt::Layer::new()
-                    .with_writer(std::io::stderr)
-                    .pretty()
-                    .with_filter(
-                        EnvFilter::builder()
-                            .with_default_directive(level.into())
-                            .from_env()
-                            .expect("invalid RUST_LOG"),
-                    ),
-            ),
-        );
-
-    // Add the otlp layer (when otlp is enabled, and it's not disabled in the CLI)
-    // then init the registry.
-    // If the feature is feature-flagged out, just init without adding the layer.
-    // It's necessary to do this separately, as every with() call chains the
-    // layer into the type of the registry.
     #[cfg(feature = "otlp")]
     {
-        let subscriber = if cli.otlp {
-            let tracer = opentelemetry_otlp::new_pipeline()
-                .tracing()
-                .with_exporter(opentelemetry_otlp::new_exporter().tonic())
-                .with_batch_config(BatchConfig::default())
-                .with_trace_config(opentelemetry_sdk::trace::config().with_resource({
-                    // use SdkProvidedResourceDetector.detect to detect resources,
-                    // but replace the default service name with our default.
-                    // https://github.com/open-telemetry/opentelemetry-rust/issues/1298
-                    let resources =
-                        SdkProvidedResourceDetector.detect(std::time::Duration::from_secs(0));
-                    // SdkProvidedResourceDetector currently always sets
-                    // `service.name`, but we don't like its default.
-                    if resources.get("service.name".into()).unwrap() == "unknown_service".into() {
-                        resources.merge(&Resource::new([KeyValue::new(
-                            "service.name",
-                            "tvix.store",
-                        )]))
-                    } else {
-                        resources
-                    }
-                }))
-                .install_batch(opentelemetry_sdk::runtime::Tokio)?;
-
-            // Create a tracing layer with the configured tracer
-            let layer = tracing_opentelemetry::layer().with_tracer(tracer);
-
-            subscriber.with(Some(layer))
+        if cli.otlp {
+            tvix_tracing::init_with_otlp(cli.log_level, "tvix.store")?;
         } else {
-            subscriber.with(None)
-        };
-
-        subscriber.try_init()?;
+            tvix_tracing::init(cli.log_level)?;
+        }
     }
 
-    // Init the registry (when otlp is not enabled)
     #[cfg(not(feature = "otlp"))]
     {
-        subscriber.try_init()?;
+        tvix_tracing::init(cli.log_level)?;
     }
 
     match cli.command {
@@ -262,7 +226,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
             path_info_service_addr,
         } => {
             // initialize stores
-            let (blob_service, directory_service, path_info_service) =
+            let (blob_service, directory_service, path_info_service, nar_calculation_service) =
                 tvix_store::utils::construct_services(
                     blob_service_addr,
                     directory_service_addr,
@@ -287,6 +251,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
                 ))
                 .add_service(PathInfoServiceServer::new(GRPCPathInfoServiceWrapper::new(
                     Arc::from(path_info_service),
+                    nar_calculation_service,
                 )));
 
             #[cfg(feature = "tonic-reflection")]
@@ -316,7 +281,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
             path_info_service_addr,
         } => {
             // FUTUREWORK: allow flat for single files?
-            let (blob_service, directory_service, path_info_service) =
+            let (blob_service, directory_service, path_info_service, nar_calculation_service) =
                 tvix_store::utils::construct_services(
                     blob_service_addr,
                     directory_service_addr,
@@ -324,16 +289,29 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
                 )
                 .await?;
 
-            // Arc the PathInfoService, as we clone it .
+            // Arc PathInfoService and NarCalculationService, as we clone it .
             let path_info_service: Arc<dyn PathInfoService> = path_info_service.into();
+            let nar_calculation_service: Arc<dyn NarCalculationService> =
+                nar_calculation_service.into();
+
+            let root_span = {
+                let s = Span::current();
+                s.pb_set_style(&tvix_tracing::PB_PROGRESS_STYLE);
+                s.pb_set_message("Importing paths");
+                s.pb_set_length(paths.len() as u64);
+                s.pb_start();
+                s
+            };
 
             let tasks = paths
                 .into_iter()
                 .map(|path| {
+                    let paths_span = root_span.clone();
                     tokio::task::spawn({
                         let blob_service = blob_service.clone();
                         let directory_service = directory_service.clone();
                         let path_info_service = path_info_service.clone();
+                        let nar_calculation_service = nar_calculation_service.clone();
 
                         async move {
                             if let Ok(name) = tvix_store::import::path_to_name(&path) {
@@ -343,6 +321,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
                                     blob_service,
                                     directory_service,
                                     path_info_service,
+                                    nar_calculation_service,
                                 )
                                 .await;
                                 if let Ok(output_path) = resp {
@@ -350,6 +329,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
                                     println!("{}", output_path.to_absolute_path());
                                 }
                             }
+                            paths_span.pb_inc(1);
                         }
                     })
                 })
@@ -357,6 +337,119 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
 
             try_join_all(tasks).await?;
         }
+        Commands::Copy {
+            blob_service_addr,
+            directory_service_addr,
+            path_info_service_addr,
+            reference_graph_path,
+        } => {
+            let (blob_service, directory_service, path_info_service, _nar_calculation_service) =
+                tvix_store::utils::construct_services(
+                    blob_service_addr,
+                    directory_service_addr,
+                    path_info_service_addr,
+                )
+                .await?;
+
+            // Parse the file at reference_graph_path.
+            let reference_graph_json = tokio::fs::read(&reference_graph_path).await?;
+
+            #[derive(Deserialize, Serialize)]
+            struct ReferenceGraph<'a> {
+                #[serde(borrow)]
+                closure: Vec<ExportedPathInfo<'a>>,
+            }
+
+            let reference_graph: ReferenceGraph<'_> =
+                serde_json::from_slice(reference_graph_json.as_slice())?;
+
+            // Arc the PathInfoService, as we clone it .
+            let path_info_service: Arc<dyn PathInfoService> = path_info_service.into();
+
+            let lookups_span = info_span!(
+                "lookup pathinfos",
+                "indicatif.pb_show" = tracing::field::Empty
+            );
+            lookups_span.pb_set_length(reference_graph.closure.len() as u64);
+            lookups_span.pb_set_style(&tvix_tracing::PB_PROGRESS_STYLE);
+            lookups_span.pb_start();
+
+            // From our reference graph, lookup all pathinfos that might exist.
+            let elems: Vec<_> = futures::stream::iter(reference_graph.closure)
+                .map(|elem| {
+                    let path_info_service = path_info_service.clone();
+                    async move {
+                        let resp = path_info_service
+                            .get(*elem.path.digest())
+                            .await
+                            .map(|resp| (elem, resp));
+
+                        Span::current().pb_inc(1);
+                        resp
+                    }
+                })
+                .buffer_unordered(50)
+                // Filter out all that are already uploaded.
+                // TODO: check if there's a better combinator for this
+                .try_filter_map(|(elem, path_info)| {
+                    std::future::ready(if path_info.is_none() {
+                        Ok(Some(elem))
+                    } else {
+                        Ok(None)
+                    })
+                })
+                .try_collect()
+                .await?;
+
+            // Run ingest_path on all of them.
+            let uploads: Vec<_> = futures::stream::iter(elems)
+                .map(|elem| {
+                    // Map to a future returning the root node, alongside with the closure info.
+                    let blob_service = blob_service.clone();
+                    let directory_service = directory_service.clone();
+                    async move {
+                        // Ingest the given path.
+
+                        ingest_path(
+                            blob_service,
+                            directory_service,
+                            PathBuf::from(elem.path.to_absolute_path()),
+                        )
+                        .await
+                        .map(|root_node| (elem, root_node))
+                    }
+                })
+                .buffer_unordered(10)
+                .try_collect()
+                .await?;
+
+            // Insert them into the PathInfoService.
+            // FUTUREWORK: do this properly respecting the reference graph.
+            for (elem, root_node) in uploads {
+                // Create and upload a PathInfo pointing to the root_node,
+                // annotated with information we have from the reference graph.
+                let path_info = PathInfo {
+                    node: Some(tvix_castore::proto::Node {
+                        node: Some(root_node),
+                    }),
+                    references: Vec::from_iter(
+                        elem.references.iter().map(|e| e.digest().to_vec().into()),
+                    ),
+                    narinfo: Some(NarInfo {
+                        nar_size: elem.nar_size,
+                        nar_sha256: elem.nar_sha256.to_vec().into(),
+                        signatures: vec![],
+                        reference_names: Vec::from_iter(
+                            elem.references.iter().map(|e| e.to_string()),
+                        ),
+                        deriver: None,
+                        ca: None,
+                    }),
+                };
+
+                path_info_service.put(path_info).await?;
+            }
+        }
         #[cfg(feature = "fuse")]
         Commands::Mount {
             dest,
@@ -366,8 +459,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
             list_root,
             threads,
             allow_other,
+            show_xattr,
         } => {
-            let (blob_service, directory_service, path_info_service) =
+            let (blob_service, directory_service, path_info_service, _nar_calculation_service) =
                 tvix_store::utils::construct_services(
                     blob_service_addr,
                     directory_service_addr,
@@ -381,6 +475,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
                     directory_service,
                     Arc::from(path_info_service),
                     list_root,
+                    show_xattr,
                 );
                 info!(mount_path=?dest, "mounting");
 
@@ -406,8 +501,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
             directory_service_addr,
             path_info_service_addr,
             list_root,
+            show_xattr,
         } => {
-            let (blob_service, directory_service, path_info_service) =
+            let (blob_service, directory_service, path_info_service, _nar_calculation_service) =
                 tvix_store::utils::construct_services(
                     blob_service_addr,
                     directory_service_addr,
@@ -421,6 +517,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
                     directory_service,
                     Arc::from(path_info_service),
                     list_root,
+                    show_xattr,
                 );
                 info!(socket_path=?socket, "starting virtiofs-daemon");
 
diff --git a/tvix/store/src/import.rs b/tvix/store/src/import.rs
index f2e7bdfe5d..888380bca9 100644
--- a/tvix/store/src/import.rs
+++ b/tvix/store/src/import.rs
@@ -1,16 +1,32 @@
 use std::path::Path;
 use tracing::{debug, instrument};
 use tvix_castore::{
-    blobservice::BlobService, directoryservice::DirectoryService, proto::node::Node, B3Digest,
+    blobservice::BlobService, directoryservice::DirectoryService, import::fs::ingest_path,
+    proto::node::Node, B3Digest,
 };
 
-use nix_compat::store_path::{self, StorePath};
+use nix_compat::{
+    nixhash::{CAHash, NixHash},
+    store_path::{self, StorePath},
+};
 
 use crate::{
+    nar::NarCalculationService,
     pathinfoservice::PathInfoService,
     proto::{nar_info, NarInfo, PathInfo},
 };
 
+impl From<CAHash> for nar_info::Ca {
+    fn from(value: CAHash) -> Self {
+        let hash_type: nar_info::ca::Hash = (&value).into();
+        let digest: bytes::Bytes = value.hash().to_string().into();
+        nar_info::Ca {
+            r#type: hash_type.into(),
+            digest,
+        }
+    }
+}
+
 pub fn log_node(node: &Node, path: &Path) {
     match node {
         Node::Directory(directory_node) => {
@@ -54,13 +70,20 @@ pub fn path_to_name(path: &Path) -> std::io::Result<&str> {
         })
 }
 
-/// Takes the NAR size, SHA-256 of the NAR representation and the root node.
-/// Returns the path information object for a content addressed NAR-style (recursive) object.
+/// Takes the NAR size, SHA-256 of the NAR representation, the root node and optionally
+/// a CA hash information.
+///
+/// Returns the path information object for a NAR-style object.
 ///
 /// This [`PathInfo`] can be further filled for signatures, deriver or verified for the expected
 /// hashes.
 #[inline]
-pub fn derive_nar_ca_path_info(nar_size: u64, nar_sha256: [u8; 32], root_node: Node) -> PathInfo {
+pub fn derive_nar_ca_path_info(
+    nar_size: u64,
+    nar_sha256: [u8; 32],
+    ca: Option<CAHash>,
+    root_node: Node,
+) -> PathInfo {
     // assemble the [crate::proto::PathInfo] object.
     PathInfo {
         node: Some(tvix_castore::proto::Node {
@@ -74,10 +97,7 @@ pub fn derive_nar_ca_path_info(nar_size: u64, nar_sha256: [u8; 32], root_node: N
             signatures: vec![],
             reference_names: vec![],
             deriver: None,
-            ca: Some(nar_info::Ca {
-                r#type: nar_info::ca::Hash::NarSha256.into(),
-                digest: nar_sha256.to_vec().into(),
-            }),
+            ca: ca.map(|ca_hash| ca_hash.into()),
         }),
     }
 }
@@ -85,24 +105,27 @@ pub fn derive_nar_ca_path_info(nar_size: u64, nar_sha256: [u8; 32], root_node: N
 /// Ingest the given path `path` and register the resulting output path in the
 /// [`PathInfoService`] as a recursive fixed output NAR.
 #[instrument(skip_all, fields(store_name=name, path=?path), err)]
-pub async fn import_path_as_nar_ca<BS, DS, PS, P>(
+pub async fn import_path_as_nar_ca<BS, DS, PS, NS, P>(
     path: P,
     name: &str,
     blob_service: BS,
     directory_service: DS,
     path_info_service: PS,
+    nar_calculation_service: NS,
 ) -> Result<StorePath, std::io::Error>
 where
     P: AsRef<Path> + std::fmt::Debug,
-    BS: AsRef<dyn BlobService> + Clone,
-    DS: AsRef<dyn DirectoryService>,
+    BS: BlobService + Clone,
+    DS: DirectoryService,
     PS: AsRef<dyn PathInfoService>,
+    NS: NarCalculationService,
 {
-    let root_node =
-        tvix_castore::import::ingest_path(blob_service, directory_service, &path).await?;
+    let root_node = ingest_path(blob_service, directory_service, path.as_ref())
+        .await
+        .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
 
-    // Ask the PathInfoService for the NAR size and sha256
-    let (nar_size, nar_sha256) = path_info_service.as_ref().calculate_nar(&root_node).await?;
+    // Ask for the NAR size and sha256
+    let (nar_size, nar_sha256) = nar_calculation_service.calculate_nar(&root_node).await?;
 
     // Calculate the output path. This might still fail, as some names are illegal.
     // FUTUREWORK: express the `name` at the type level to be valid and move the conversion
@@ -118,7 +141,12 @@ where
     let root_node = root_node.rename(output_path.to_string().into_bytes().into());
     log_node(&root_node, path.as_ref());
 
-    let path_info = derive_nar_ca_path_info(nar_size, nar_sha256, root_node);
+    let path_info = derive_nar_ca_path_info(
+        nar_size,
+        nar_sha256,
+        Some(CAHash::Nar(NixHash::Sha256(nar_sha256))),
+        root_node,
+    );
 
     // This new [`PathInfo`] that we get back from there might contain additional signatures or
     // information set by the service itself. In this function, we silently swallow it because
@@ -133,21 +161,22 @@ mod tests {
     use std::{ffi::OsStr, path::PathBuf};
 
     use crate::import::path_to_name;
-    use test_case::test_case;
+    use rstest::rstest;
 
-    #[test_case("a/b/c", "c"; "simple path")]
-    #[test_case("a/b/../c", "c"; "simple path containing ..")]
-    #[test_case("a/b/../c/d/../e", "e"; "path containing multiple ..")]
+    #[rstest]
+    #[case::simple_path("a/b/c", "c")]
+    #[case::simple_path_containing_dotdot("a/b/../c", "c")]
+    #[case::path_containing_multiple_dotdot("a/b/../c/d/../e", "e")]
 
-    fn test_path_to_name(path: &str, expected_name: &str) {
+    fn test_path_to_name(#[case] path: &str, #[case] expected_name: &str) {
         let path: PathBuf = path.into();
         assert_eq!(path_to_name(&path).expect("must succeed"), expected_name);
     }
 
-    #[test_case(b"a/b/.."; "path ending in ..")]
-    #[test_case(b"\xf8\xa1\xa1\xa1\xa1"; "non unicode path")]
-
-    fn test_invalid_path_to_name(invalid_path: &[u8]) {
+    #[rstest]
+    #[case::path_ending_in_dotdot(b"a/b/..")]
+    #[case::non_unicode_path(b"\xf8\xa1\xa1\xa1\xa1")]
+    fn test_invalid_path_to_name(#[case] invalid_path: &[u8]) {
         let path: PathBuf = unsafe { OsStr::from_encoded_bytes_unchecked(invalid_path) }.into();
         path_to_name(&path).expect_err("must fail");
     }
diff --git a/tvix/store/src/nar/import.rs b/tvix/store/src/nar/import.rs
index 6f4dcdea5d..32c2f4e580 100644
--- a/tvix/store/src/nar/import.rs
+++ b/tvix/store/src/nar/import.rs
@@ -1,225 +1,171 @@
-use bytes::Bytes;
-use nix_compat::nar;
-use std::io::{self, BufRead};
-use tokio_util::io::SyncIoBridge;
-use tracing::warn;
+use nix_compat::nar::reader::r#async as nar_reader;
+use sha2::Digest;
+use tokio::{
+    io::{AsyncBufRead, AsyncRead},
+    sync::mpsc,
+    try_join,
+};
 use tvix_castore::{
     blobservice::BlobService,
-    directoryservice::{DirectoryPutter, DirectoryService},
-    proto::{self as castorepb},
-    B3Digest,
+    directoryservice::DirectoryService,
+    import::{
+        blobs::{self, ConcurrentBlobUploader},
+        ingest_entries, IngestionEntry, IngestionError,
+    },
+    proto::{node::Node, NamedNode},
+    PathBuf,
 };
 
-/// Accepts a reader providing a NAR.
-/// Will traverse it, uploading blobs to the given [BlobService], and
-/// directories to the given [DirectoryService].
-/// On success, the root node is returned.
-/// This function is not async (because the NAR reader is not)
-/// and calls [tokio::task::block_in_place] when interacting with backing
-/// services, so make sure to only call this with spawn_blocking.
-pub fn read_nar<R, BS, DS>(
-    r: &mut R,
+/// Ingests the contents from a [AsyncRead] providing NAR into the tvix store,
+/// interacting with a [BlobService] and [DirectoryService].
+/// Returns the castore root node, as well as the sha256 and size of the NAR
+/// contents ingested.
+pub async fn ingest_nar_and_hash<R, BS, DS>(
     blob_service: BS,
     directory_service: DS,
-) -> io::Result<castorepb::node::Node>
+    r: &mut R,
+) -> Result<(Node, [u8; 32], u64), IngestionError<Error>>
 where
-    R: BufRead + Send,
-    BS: AsRef<dyn BlobService>,
-    DS: AsRef<dyn DirectoryService>,
+    R: AsyncRead + Unpin + Send,
+    BS: BlobService + Clone + 'static,
+    DS: DirectoryService,
 {
-    let handle = tokio::runtime::Handle::current();
-
-    let directory_putter = directory_service.as_ref().put_multiple_start();
-
-    let node = nix_compat::nar::reader::open(r)?;
-    let (root_node, mut directory_putter, _) = process_node(
-        handle.clone(),
-        "".into(), // this is the root node, it has an empty name
-        node,
-        &blob_service,
-        directory_putter,
-    )?;
-
-    // In case the root node points to a directory, we need to close
-    // [directory_putter], and ensure the digest we got back from there matches
-    // what the root node is pointing to.
-    if let castorepb::node::Node::Directory(ref directory_node) = root_node {
-        // Close directory_putter to make sure all directories have been inserted.
-        let directory_putter_digest =
-            handle.block_on(handle.spawn(async move { directory_putter.close().await }))??;
-        let root_directory_node_digest: B3Digest =
-            directory_node.digest.clone().try_into().unwrap();
-
-        if directory_putter_digest != root_directory_node_digest {
-            warn!(
-                root_directory_node_digest = %root_directory_node_digest,
-                directory_putter_digest =%directory_putter_digest,
-                "directory digest mismatch",
-            );
-            return Err(io::Error::new(
-                io::ErrorKind::Other,
-                "directory digest mismatch",
-            ));
-        }
-    }
-    // In case it's not a Directory, [directory_putter] doesn't need to be
-    // closed (as we didn't end up uploading anything).
-    // It can just be dropped, as documented in its trait.
+    let mut nar_hash = sha2::Sha256::new();
+    let mut nar_size = 0;
 
-    Ok(root_node)
-}
+    // Assemble NarHash and NarSize as we read bytes.
+    let r = tokio_util::io::InspectReader::new(r, |b| {
+        nar_size += b.len() as u64;
+        use std::io::Write;
+        nar_hash.write_all(b).unwrap();
+    });
 
-/// This is called on a [nar::reader::Node] and returns a [castorepb::node::Node].
-/// It does so by handling all three kinds, and recursing for directories.
-///
-/// [DirectoryPutter] is passed around, so a single instance of it can be used,
-/// which is sufficient, as this reads through the whole NAR linerarly.
-fn process_node<BS>(
-    handle: tokio::runtime::Handle,
-    name: bytes::Bytes,
-    node: nar::reader::Node,
-    blob_service: BS,
-    directory_putter: Box<dyn DirectoryPutter>,
-) -> io::Result<(castorepb::node::Node, Box<dyn DirectoryPutter>, BS)>
-where
-    BS: AsRef<dyn BlobService>,
-{
-    Ok(match node {
-        nar::reader::Node::Symlink { target } => (
-            castorepb::node::Node::Symlink(castorepb::SymlinkNode {
-                name,
-                target: target.into(),
-            }),
-            directory_putter,
-            blob_service,
-        ),
-        nar::reader::Node::File { executable, reader } => (
-            castorepb::node::Node::File(process_file_reader(
-                handle,
-                name,
-                reader,
-                executable,
-                &blob_service,
-            )?),
-            directory_putter,
-            blob_service,
-        ),
-        nar::reader::Node::Directory(dir_reader) => {
-            let (directory_node, directory_putter, blob_service_back) =
-                process_dir_reader(handle, name, dir_reader, blob_service, directory_putter)?;
-
-            (
-                castorepb::node::Node::Directory(directory_node),
-                directory_putter,
-                blob_service_back,
-            )
-        }
-    })
+    // HACK: InspectReader doesn't implement AsyncBufRead.
+    // See if this can be propagated through and we can then require our input
+    // reader to be buffered too.
+    let mut r = tokio::io::BufReader::new(r);
+
+    let root_node = ingest_nar(blob_service, directory_service, &mut r).await?;
+
+    Ok((root_node, nar_hash.finalize().into(), nar_size))
 }
 
-/// Given a name and [nar::reader::FileReader], this ingests the file into the
-/// passed [BlobService] and returns a [castorepb::FileNode].
-fn process_file_reader<BS>(
-    handle: tokio::runtime::Handle,
-    name: Bytes,
-    mut file_reader: nar::reader::FileReader,
-    executable: bool,
+/// Ingests the contents from a [AsyncRead] providing NAR into the tvix store,
+/// interacting with a [BlobService] and [DirectoryService].
+/// It returns the castore root node or an error.
+pub async fn ingest_nar<R, BS, DS>(
     blob_service: BS,
-) -> io::Result<castorepb::FileNode>
+    directory_service: DS,
+    r: &mut R,
+) -> Result<Node, IngestionError<Error>>
 where
-    BS: AsRef<dyn BlobService>,
+    R: AsyncBufRead + Unpin + Send,
+    BS: BlobService + Clone + 'static,
+    DS: DirectoryService,
 {
-    // store the length. If we read any other length, reading will fail.
-    let expected_len = file_reader.len();
+    // open the NAR for reading.
+    // The NAR reader emits nodes in DFS preorder.
+    let root_node = nar_reader::open(r).await.map_err(Error::IO)?;
 
-    // prepare writing a new blob.
-    let blob_writer = handle.block_on(async { blob_service.as_ref().open_write().await });
+    let (tx, rx) = mpsc::channel(1);
+    let rx = tokio_stream::wrappers::ReceiverStream::new(rx);
 
-    // write the blob.
-    let mut blob_writer = {
-        let mut dst = SyncIoBridge::new(blob_writer);
+    let produce = async move {
+        let mut blob_uploader = ConcurrentBlobUploader::new(blob_service);
 
-        file_reader.copy(&mut dst)?;
-        dst.shutdown()?;
+        let res = produce_nar_inner(
+            &mut blob_uploader,
+            root_node,
+            "root".parse().unwrap(), // HACK: the root node sent to ingest_entries may not be ROOT.
+            tx.clone(),
+        )
+        .await;
+
+        if let Err(err) = blob_uploader.join().await {
+            tx.send(Err(err.into()))
+                .await
+                .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)))?;
+        }
+
+        tx.send(res)
+            .await
+            .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)))?;
 
-        // return back the blob_writer
-        dst.into_inner()
+        Ok(())
     };
 
-    // close the blob_writer, retrieve the digest.
-    let blob_digest = handle.block_on(async { blob_writer.close().await })?;
+    let consume = ingest_entries(directory_service, rx);
 
-    Ok(castorepb::FileNode {
-        name,
-        digest: blob_digest.into(),
-        size: expected_len,
-        executable,
-    })
+    let (_, node) = try_join!(produce, consume)?;
+
+    // remove the fake "root" name again
+    debug_assert_eq!(&node.get_name(), b"root");
+    Ok(node.rename("".into()))
 }
 
-/// Given a name and [nar::reader::DirReader], this returns a [castorepb::DirectoryNode].
-/// It uses [process_node] to iterate over all children.
-///
-/// [DirectoryPutter] is passed around, so a single instance of it can be used,
-/// which is sufficient, as this reads through the whole NAR linerarly.
-fn process_dir_reader<BS>(
-    handle: tokio::runtime::Handle,
-    name: Bytes,
-    mut dir_reader: nar::reader::DirReader,
-    blob_service: BS,
-    directory_putter: Box<dyn DirectoryPutter>,
-) -> io::Result<(castorepb::DirectoryNode, Box<dyn DirectoryPutter>, BS)>
+async fn produce_nar_inner<BS>(
+    blob_uploader: &mut ConcurrentBlobUploader<BS>,
+    node: nar_reader::Node<'_, '_>,
+    path: PathBuf,
+    tx: mpsc::Sender<Result<IngestionEntry, Error>>,
+) -> Result<IngestionEntry, Error>
 where
-    BS: AsRef<dyn BlobService>,
+    BS: BlobService + Clone + 'static,
 {
-    let mut directory = castorepb::Directory::default();
-
-    let mut directory_putter = directory_putter;
-    let mut blob_service = blob_service;
-    while let Some(entry) = dir_reader.next()? {
-        let (node, directory_putter_back, blob_service_back) = process_node(
-            handle.clone(),
-            entry.name.into(),
-            entry.node,
-            blob_service,
-            directory_putter,
-        )?;
-
-        blob_service = blob_service_back;
-        directory_putter = directory_putter_back;
-
-        match node {
-            castorepb::node::Node::Directory(node) => directory.directories.push(node),
-            castorepb::node::Node::File(node) => directory.files.push(node),
-            castorepb::node::Node::Symlink(node) => directory.symlinks.push(node),
+    Ok(match node {
+        nar_reader::Node::Symlink { target } => IngestionEntry::Symlink { path, target },
+        nar_reader::Node::File {
+            executable,
+            mut reader,
+        } => {
+            let size = reader.len();
+            let digest = blob_uploader.upload(&path, size, &mut reader).await?;
+
+            IngestionEntry::Regular {
+                path,
+                size,
+                executable,
+                digest,
+            }
         }
-    }
+        nar_reader::Node::Directory(mut dir_reader) => {
+            while let Some(entry) = dir_reader.next().await? {
+                let mut path = path.clone();
+
+                // valid NAR names are valid castore names
+                path.try_push(entry.name)
+                    .expect("Tvix bug: failed to join name");
+
+                let entry = Box::pin(produce_nar_inner(
+                    blob_uploader,
+                    entry.node,
+                    path,
+                    tx.clone(),
+                ))
+                .await?;
+
+                tx.send(Ok(entry)).await.map_err(|e| {
+                    Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e))
+                })?;
+            }
+
+            IngestionEntry::Dir { path }
+        }
+    })
+}
 
-    // calculate digest and size.
-    let directory_digest = directory.digest();
-    let directory_size = directory.size();
-
-    // upload the directory. This is a bit more verbose, as we want to get back
-    // directory_putter for later reuse.
-    let directory_putter = handle.block_on(handle.spawn(async move {
-        directory_putter.put(directory).await?;
-        Ok::<_, io::Error>(directory_putter)
-    }))??;
-
-    Ok((
-        castorepb::DirectoryNode {
-            name,
-            digest: directory_digest.into(),
-            size: directory_size,
-        },
-        directory_putter,
-        blob_service,
-    ))
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+    #[error(transparent)]
+    IO(#[from] std::io::Error),
+
+    #[error(transparent)]
+    BlobUpload(#[from] blobs::Error),
 }
 
 #[cfg(test)]
 mod test {
-    use crate::nar::read_nar;
+    use crate::nar::ingest_nar;
     use std::io::Cursor;
     use std::sync::Arc;
 
@@ -244,19 +190,13 @@ mod test {
         blob_service: Arc<dyn BlobService>,
         directory_service: Arc<dyn DirectoryService>,
     ) {
-        let handle = tokio::runtime::Handle::current();
-
-        let root_node = handle
-            .spawn_blocking(|| {
-                read_nar(
-                    &mut Cursor::new(&NAR_CONTENTS_SYMLINK.clone()),
-                    blob_service,
-                    directory_service,
-                )
-            })
-            .await
-            .unwrap()
-            .expect("must parse");
+        let root_node = ingest_nar(
+            blob_service,
+            directory_service,
+            &mut Cursor::new(&NAR_CONTENTS_SYMLINK.clone()),
+        )
+        .await
+        .expect("must parse");
 
         assert_eq!(
             castorepb::node::Node::Symlink(castorepb::SymlinkNode {
@@ -273,22 +213,13 @@ mod test {
         blob_service: Arc<dyn BlobService>,
         directory_service: Arc<dyn DirectoryService>,
     ) {
-        let handle = tokio::runtime::Handle::current();
-
-        let root_node = handle
-            .spawn_blocking({
-                let blob_service = blob_service.clone();
-                move || {
-                    read_nar(
-                        &mut Cursor::new(&NAR_CONTENTS_HELLOWORLD.clone()),
-                        blob_service,
-                        directory_service,
-                    )
-                }
-            })
-            .await
-            .unwrap()
-            .expect("must parse");
+        let root_node = ingest_nar(
+            blob_service.clone(),
+            directory_service,
+            &mut Cursor::new(&NAR_CONTENTS_HELLOWORLD.clone()),
+        )
+        .await
+        .expect("must parse");
 
         assert_eq!(
             castorepb::node::Node::File(castorepb::FileNode {
@@ -310,23 +241,13 @@ mod test {
         blob_service: Arc<dyn BlobService>,
         directory_service: Arc<dyn DirectoryService>,
     ) {
-        let handle = tokio::runtime::Handle::current();
-
-        let root_node = handle
-            .spawn_blocking({
-                let blob_service = blob_service.clone();
-                let directory_service = directory_service.clone();
-                || {
-                    read_nar(
-                        &mut Cursor::new(&NAR_CONTENTS_COMPLICATED.clone()),
-                        blob_service,
-                        directory_service,
-                    )
-                }
-            })
-            .await
-            .unwrap()
-            .expect("must parse");
+        let root_node = ingest_nar(
+            blob_service.clone(),
+            directory_service.clone(),
+            &mut Cursor::new(&NAR_CONTENTS_COMPLICATED.clone()),
+        )
+        .await
+        .expect("must parse");
 
         assert_eq!(
             castorepb::node::Node::Directory(castorepb::DirectoryNode {
diff --git a/tvix/store/src/nar/mod.rs b/tvix/store/src/nar/mod.rs
index 49bb92fb0f..8cbb091f1a 100644
--- a/tvix/store/src/nar/mod.rs
+++ b/tvix/store/src/nar/mod.rs
@@ -1,10 +1,37 @@
+use tonic::async_trait;
 use tvix_castore::B3Digest;
 
 mod import;
 mod renderer;
-pub use import::read_nar;
+pub use import::ingest_nar;
+pub use import::ingest_nar_and_hash;
 pub use renderer::calculate_size_and_sha256;
 pub use renderer::write_nar;
+pub use renderer::SimpleRenderer;
+use tvix_castore::proto as castorepb;
+
+#[async_trait]
+pub trait NarCalculationService: Send + Sync {
+    /// Return the nar size and nar sha256 digest for a given root node.
+    /// This can be used to calculate NAR-based output paths.
+    async fn calculate_nar(
+        &self,
+        root_node: &castorepb::node::Node,
+    ) -> Result<(u64, [u8; 32]), tvix_castore::Error>;
+}
+
+#[async_trait]
+impl<A> NarCalculationService for A
+where
+    A: AsRef<dyn NarCalculationService> + Send + Sync,
+{
+    async fn calculate_nar(
+        &self,
+        root_node: &castorepb::node::Node,
+    ) -> Result<(u64, [u8; 32]), tvix_castore::Error> {
+        self.as_ref().calculate_nar(root_node).await
+    }
+}
 
 /// Errors that can encounter while rendering NARs.
 #[derive(Debug, thiserror::Error)]
diff --git a/tvix/store/src/nar/renderer.rs b/tvix/store/src/nar/renderer.rs
index 9ac363ff57..efd67671db 100644
--- a/tvix/store/src/nar/renderer.rs
+++ b/tvix/store/src/nar/renderer.rs
@@ -1,20 +1,51 @@
-use super::RenderError;
-use async_recursion::async_recursion;
+use crate::utils::AsyncIoBridge;
+
+use super::{NarCalculationService, RenderError};
 use count_write::CountWrite;
 use nix_compat::nar::writer::r#async as nar_writer;
 use sha2::{Digest, Sha256};
-use std::{
-    pin::Pin,
-    task::{self, Poll},
-};
 use tokio::io::{self, AsyncWrite, BufReader};
-use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
+use tonic::async_trait;
 use tvix_castore::{
     blobservice::BlobService,
     directoryservice::DirectoryService,
     proto::{self as castorepb, NamedNode},
 };
 
+pub struct SimpleRenderer<BS, DS> {
+    blob_service: BS,
+    directory_service: DS,
+}
+
+impl<BS, DS> SimpleRenderer<BS, DS> {
+    pub fn new(blob_service: BS, directory_service: DS) -> Self {
+        Self {
+            blob_service,
+            directory_service,
+        }
+    }
+}
+
+#[async_trait]
+impl<BS, DS> NarCalculationService for SimpleRenderer<BS, DS>
+where
+    BS: BlobService + Clone,
+    DS: DirectoryService + Clone,
+{
+    async fn calculate_nar(
+        &self,
+        root_node: &castorepb::node::Node,
+    ) -> Result<(u64, [u8; 32]), tvix_castore::Error> {
+        calculate_size_and_sha256(
+            root_node,
+            self.blob_service.clone(),
+            self.directory_service.clone(),
+        )
+        .await
+        .map_err(|e| tvix_castore::Error::StorageError(format!("failed rendering nar: {}", e)))
+    }
+}
+
 /// Invoke [write_nar], and return the size and sha256 digest of the produced
 /// NAR output.
 pub async fn calculate_size_and_sha256<BS, DS>(
@@ -42,37 +73,12 @@ where
     Ok((cw.count(), h.finalize().into()))
 }
 
-/// The inverse of [tokio_util::io::SyncIoBridge].
-/// Don't use this with anything that actually does blocking I/O.
-struct AsyncIoBridge<T>(T);
-
-impl<W: std::io::Write + Unpin> AsyncWrite for AsyncIoBridge<W> {
-    fn poll_write(
-        self: Pin<&mut Self>,
-        _cx: &mut task::Context<'_>,
-        buf: &[u8],
-    ) -> Poll<io::Result<usize>> {
-        Poll::Ready(self.get_mut().0.write(buf))
-    }
-
-    fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
-        Poll::Ready(self.get_mut().0.flush())
-    }
-
-    fn poll_shutdown(
-        self: Pin<&mut Self>,
-        _cx: &mut task::Context<'_>,
-    ) -> Poll<Result<(), io::Error>> {
-        Poll::Ready(Ok(()))
-    }
-}
-
 /// Accepts a [castorepb::node::Node] pointing to the root of a (store) path,
 /// and uses the passed blob_service and directory_service to perform the
 /// necessary lookups as it traverses the structure.
 /// The contents in NAR serialization are writen to the passed [AsyncWrite].
 pub async fn write_nar<W, BS, DS>(
-    w: W,
+    mut w: W,
     proto_root_node: &castorepb::node::Node,
     blob_service: BS,
     directory_service: DS,
@@ -83,7 +89,6 @@ where
     DS: DirectoryService + Send,
 {
     // Initialize NAR writer
-    let mut w = w.compat_write();
     let nar_root_node = nar_writer::open(&mut w)
         .await
         .map_err(RenderError::NARWriterError)?;
@@ -101,9 +106,8 @@ where
 
 /// Process an intermediate node in the structure.
 /// This consumes the node.
-#[async_recursion]
 async fn walk_node<BS, DS>(
-    nar_node: nar_writer::Node<'async_recursion, '_>,
+    nar_node: nar_writer::Node<'_, '_>,
     proto_node: &castorepb::node::Node,
     blob_service: BS,
     directory_service: DS,
@@ -128,7 +132,7 @@ where
                 ))
             })?;
 
-            let blob_reader = match blob_service
+            let mut blob_reader = match blob_service
                 .open_read(&digest)
                 .await
                 .map_err(RenderError::StoreError)?
@@ -144,7 +148,7 @@ where
                 .file(
                     proto_file_node.executable,
                     proto_file_node.size,
-                    &mut blob_reader.compat(),
+                    &mut blob_reader,
                 )
                 .await
                 .map_err(RenderError::NARWriterError)?;
@@ -193,9 +197,13 @@ where
                             .await
                             .map_err(RenderError::NARWriterError)?;
 
-                        (blob_service, directory_service) =
-                            walk_node(child_node, &proto_node, blob_service, directory_service)
-                                .await?;
+                        (blob_service, directory_service) = Box::pin(walk_node(
+                            child_node,
+                            &proto_node,
+                            blob_service,
+                            directory_service,
+                        ))
+                        .await?;
                     }
 
                     // close the directory
diff --git a/tvix/store/src/pathinfoservice/bigtable.rs b/tvix/store/src/pathinfoservice/bigtable.rs
new file mode 100644
index 0000000000..707a686c0a
--- /dev/null
+++ b/tvix/store/src/pathinfoservice/bigtable.rs
@@ -0,0 +1,412 @@
+use super::PathInfoService;
+use crate::proto;
+use crate::proto::PathInfo;
+use async_stream::try_stream;
+use bigtable_rs::{bigtable, google::bigtable::v2 as bigtable_v2};
+use bytes::Bytes;
+use data_encoding::HEXLOWER;
+use futures::stream::BoxStream;
+use nix_compat::nixbase32;
+use prost::Message;
+use serde::{Deserialize, Serialize};
+use serde_with::{serde_as, DurationSeconds};
+use tonic::async_trait;
+use tracing::{instrument, trace};
+use tvix_castore::Error;
+
+/// There should not be more than 10 MiB in a single cell.
+/// https://cloud.google.com/bigtable/docs/schema-design#cells
+const CELL_SIZE_LIMIT: u64 = 10 * 1024 * 1024;
+
+/// Provides a [DirectoryService] implementation using
+/// [Bigtable](https://cloud.google.com/bigtable/docs/)
+/// as an underlying K/V store.
+///
+/// # Data format
+/// We use Bigtable as a plain K/V store.
+/// The row key is the digest of the store path, in hexlower.
+/// Inside the row, we currently have a single column/cell, again using the
+/// hexlower store path digest.
+/// Its value is the PathInfo message, serialized in canonical protobuf.
+/// We currently only populate this column.
+///
+/// Listing is ranging over all rows, and calculate_nar is returning a
+/// "unimplemented" error.
+#[derive(Clone)]
+pub struct BigtablePathInfoService {
+    client: bigtable::BigTable,
+    params: BigtableParameters,
+
+    #[cfg(test)]
+    #[allow(dead_code)]
+    /// Holds the temporary directory containing the unix socket, and the
+    /// spawned emulator process.
+    emulator: std::sync::Arc<(tempfile::TempDir, async_process::Child)>,
+}
+
+/// Represents configuration of [BigtablePathInfoService].
+/// This currently conflates both connect parameters and data model/client
+/// behaviour parameters.
+#[serde_as]
+#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
+pub struct BigtableParameters {
+    project_id: String,
+    instance_name: String,
+    #[serde(default)]
+    is_read_only: bool,
+    #[serde(default = "default_channel_size")]
+    channel_size: usize,
+
+    #[serde_as(as = "Option<DurationSeconds<String>>")]
+    #[serde(default = "default_timeout")]
+    timeout: Option<std::time::Duration>,
+    table_name: String,
+    family_name: String,
+
+    #[serde(default = "default_app_profile_id")]
+    app_profile_id: String,
+}
+
+impl BigtableParameters {
+    #[cfg(test)]
+    pub fn default_for_tests() -> Self {
+        Self {
+            project_id: "project-1".into(),
+            instance_name: "instance-1".into(),
+            is_read_only: false,
+            channel_size: default_channel_size(),
+            timeout: default_timeout(),
+            table_name: "table-1".into(),
+            family_name: "cf1".into(),
+            app_profile_id: default_app_profile_id(),
+        }
+    }
+}
+
+fn default_app_profile_id() -> String {
+    "default".to_owned()
+}
+
+fn default_channel_size() -> usize {
+    4
+}
+
+fn default_timeout() -> Option<std::time::Duration> {
+    Some(std::time::Duration::from_secs(4))
+}
+
+impl BigtablePathInfoService {
+    #[cfg(not(test))]
+    pub async fn connect(params: BigtableParameters) -> Result<Self, bigtable::Error> {
+        let connection = bigtable::BigTableConnection::new(
+            &params.project_id,
+            &params.instance_name,
+            params.is_read_only,
+            params.channel_size,
+            params.timeout,
+        )
+        .await?;
+
+        Ok(Self {
+            client: connection.client(),
+            params,
+        })
+    }
+
+    #[cfg(test)]
+    pub async fn connect(params: BigtableParameters) -> Result<Self, bigtable::Error> {
+        use std::time::Duration;
+
+        use async_process::{Command, Stdio};
+        use tempfile::TempDir;
+        use tokio_retry::{strategy::ExponentialBackoff, Retry};
+
+        let tmpdir = TempDir::new().unwrap();
+
+        let socket_path = tmpdir.path().join("cbtemulator.sock");
+
+        let emulator_process = Command::new("cbtemulator")
+            .arg("-address")
+            .arg(socket_path.clone())
+            .stderr(Stdio::piped())
+            .stdout(Stdio::piped())
+            .kill_on_drop(true)
+            .spawn()
+            .expect("failed to spawn emulator");
+
+        Retry::spawn(
+            ExponentialBackoff::from_millis(20)
+                .max_delay(Duration::from_secs(1))
+                .take(3),
+            || async {
+                if socket_path.exists() {
+                    Ok(())
+                } else {
+                    Err(())
+                }
+            },
+        )
+        .await
+        .expect("failed to wait for socket");
+
+        // populate the emulator
+        for cmd in &[
+            vec!["createtable", &params.table_name],
+            vec!["createfamily", &params.table_name, &params.family_name],
+        ] {
+            Command::new("cbt")
+                .args({
+                    let mut args = vec![
+                        "-instance",
+                        &params.instance_name,
+                        "-project",
+                        &params.project_id,
+                    ];
+                    args.extend_from_slice(cmd);
+                    args
+                })
+                .env(
+                    "BIGTABLE_EMULATOR_HOST",
+                    format!("unix://{}", socket_path.to_string_lossy()),
+                )
+                .output()
+                .await
+                .expect("failed to run cbt setup command");
+        }
+
+        let connection = bigtable_rs::bigtable::BigTableConnection::new_with_emulator(
+            &format!("unix://{}", socket_path.to_string_lossy()),
+            &params.project_id,
+            &params.instance_name,
+            false,
+            None,
+        )?;
+
+        Ok(Self {
+            client: connection.client(),
+            params,
+            emulator: (tmpdir, emulator_process).into(),
+        })
+    }
+}
+
+/// Derives the row/column key for a given output path.
+/// We use hexlower encoding, also because it can't be misinterpreted as RE2.
+fn derive_pathinfo_key(digest: &[u8; 20]) -> String {
+    HEXLOWER.encode(digest)
+}
+
+#[async_trait]
+impl PathInfoService for BigtablePathInfoService {
+    #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))]
+    async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
+        let mut client = self.client.clone();
+        let path_info_key = derive_pathinfo_key(&digest);
+
+        let request = bigtable_v2::ReadRowsRequest {
+            app_profile_id: self.params.app_profile_id.to_string(),
+            table_name: client.get_full_table_name(&self.params.table_name),
+            rows_limit: 1,
+            rows: Some(bigtable_v2::RowSet {
+                row_keys: vec![path_info_key.clone().into()],
+                row_ranges: vec![],
+            }),
+            // Filter selected family name, and column qualifier matching the digest.
+            // The latter is to ensure we don't fail once we start adding more metadata.
+            filter: Some(bigtable_v2::RowFilter {
+                filter: Some(bigtable_v2::row_filter::Filter::Chain(
+                    bigtable_v2::row_filter::Chain {
+                        filters: vec![
+                            bigtable_v2::RowFilter {
+                                filter: Some(
+                                    bigtable_v2::row_filter::Filter::FamilyNameRegexFilter(
+                                        self.params.family_name.to_string(),
+                                    ),
+                                ),
+                            },
+                            bigtable_v2::RowFilter {
+                                filter: Some(
+                                    bigtable_v2::row_filter::Filter::ColumnQualifierRegexFilter(
+                                        path_info_key.clone().into(),
+                                    ),
+                                ),
+                            },
+                        ],
+                    },
+                )),
+            }),
+            ..Default::default()
+        };
+
+        let mut response = client
+            .read_rows(request)
+            .await
+            .map_err(|e| Error::StorageError(format!("unable to read rows: {}", e)))?;
+
+        if response.len() != 1 {
+            if response.len() > 1 {
+                // This shouldn't happen, we limit number of rows to 1
+                return Err(Error::StorageError(
+                    "got more than one row from bigtable".into(),
+                ));
+            }
+            // else, this is simply a "not found".
+            return Ok(None);
+        }
+
+        let (row_key, mut cells) = response.pop().unwrap();
+        if row_key != path_info_key.as_bytes() {
+            // This shouldn't happen, we requested this row key.
+            return Err(Error::StorageError(
+                "got wrong row key from bigtable".into(),
+            ));
+        }
+
+        let cell = cells
+            .pop()
+            .ok_or_else(|| Error::StorageError("found no cells".into()))?;
+
+        // Ensure there's only one cell (so no more left after the pop())
+        // This shouldn't happen, We filter out other cells in our query.
+        if !cells.is_empty() {
+            return Err(Error::StorageError(
+                "more than one cell returned from bigtable".into(),
+            ));
+        }
+
+        // We also require the qualifier to be correct in the filter above,
+        // so this shouldn't happen.
+        if path_info_key.as_bytes() != cell.qualifier {
+            return Err(Error::StorageError("unexpected cell qualifier".into()));
+        }
+
+        // Try to parse the value into a PathInfo message
+        let path_info = proto::PathInfo::decode(Bytes::from(cell.value))
+            .map_err(|e| Error::StorageError(format!("unable to decode pathinfo proto: {}", e)))?;
+
+        let store_path = path_info
+            .validate()
+            .map_err(|e| Error::StorageError(format!("invalid PathInfo: {}", e)))?;
+
+        if store_path.digest() != &digest {
+            return Err(Error::StorageError("PathInfo has unexpected digest".into()));
+        }
+
+        Ok(Some(path_info))
+    }
+
+    #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node))]
+    async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> {
+        let store_path = path_info
+            .validate()
+            .map_err(|e| Error::InvalidRequest(format!("pathinfo failed validation: {}", e)))?;
+
+        let mut client = self.client.clone();
+        let path_info_key = derive_pathinfo_key(store_path.digest());
+
+        let data = path_info.encode_to_vec();
+        if data.len() as u64 > CELL_SIZE_LIMIT {
+            return Err(Error::StorageError(
+                "PathInfo exceeds cell limit on Bigtable".into(),
+            ));
+        }
+
+        let resp = client
+            .check_and_mutate_row(bigtable_v2::CheckAndMutateRowRequest {
+                table_name: client.get_full_table_name(&self.params.table_name),
+                app_profile_id: self.params.app_profile_id.to_string(),
+                row_key: path_info_key.clone().into(),
+                predicate_filter: Some(bigtable_v2::RowFilter {
+                    filter: Some(bigtable_v2::row_filter::Filter::ColumnQualifierRegexFilter(
+                        path_info_key.clone().into(),
+                    )),
+                }),
+                // If the column was already found, do nothing.
+                true_mutations: vec![],
+                // Else, do the insert.
+                false_mutations: vec![
+                    // https://cloud.google.com/bigtable/docs/writes
+                    bigtable_v2::Mutation {
+                        mutation: Some(bigtable_v2::mutation::Mutation::SetCell(
+                            bigtable_v2::mutation::SetCell {
+                                family_name: self.params.family_name.to_string(),
+                                column_qualifier: path_info_key.clone().into(),
+                                timestamp_micros: -1, // use server time to fill timestamp
+                                value: data,
+                            },
+                        )),
+                    },
+                ],
+            })
+            .await
+            .map_err(|e| Error::StorageError(format!("unable to mutate rows: {}", e)))?;
+
+        if resp.predicate_matched {
+            trace!("already existed")
+        }
+
+        Ok(path_info)
+    }
+
+    fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
+        let mut client = self.client.clone();
+
+        let request = bigtable_v2::ReadRowsRequest {
+            app_profile_id: self.params.app_profile_id.to_string(),
+            table_name: client.get_full_table_name(&self.params.table_name),
+            filter: Some(bigtable_v2::RowFilter {
+                filter: Some(bigtable_v2::row_filter::Filter::FamilyNameRegexFilter(
+                    self.params.family_name.to_string(),
+                )),
+            }),
+            ..Default::default()
+        };
+
+        let stream = try_stream! {
+            // TODO: add pagination, we don't want to hold all of this in memory.
+            let response = client
+                .read_rows(request)
+                .await
+                .map_err(|e| Error::StorageError(format!("unable to read rows: {}", e)))?;
+
+            for (row_key, mut cells) in response {
+                let cell = cells
+                    .pop()
+                    .ok_or_else(|| Error::StorageError("found no cells".into()))?;
+
+                // The cell must have the same qualifier as the row key
+                if row_key != cell.qualifier {
+                    Err(Error::StorageError("unexpected cell qualifier".into()))?;
+                }
+
+                // Ensure there's only one cell (so no more left after the pop())
+                // This shouldn't happen, We filter out other cells in our query.
+                if !cells.is_empty() {
+
+                    Err(Error::StorageError(
+                        "more than one cell returned from bigtable".into(),
+                    ))?
+                }
+
+                // Try to parse the value into a PathInfo message.
+                let path_info = proto::PathInfo::decode(Bytes::from(cell.value))
+                    .map_err(|e| Error::StorageError(format!("unable to decode pathinfo proto: {}", e)))?;
+
+                // Validate the containing PathInfo, ensure its StorePath digest
+                // matches row key.
+                let store_path = path_info
+                    .validate()
+                    .map_err(|e| Error::StorageError(format!("invalid PathInfo: {}", e)))?;
+
+                if store_path.digest().as_slice() != row_key.as_slice() {
+                    Err(Error::StorageError("PathInfo has unexpected digest".into()))?
+                }
+
+
+                yield path_info
+            }
+        };
+
+        Box::pin(stream)
+    }
+}
diff --git a/tvix/store/src/pathinfoservice/combinators.rs b/tvix/store/src/pathinfoservice/combinators.rs
new file mode 100644
index 0000000000..664144ef49
--- /dev/null
+++ b/tvix/store/src/pathinfoservice/combinators.rs
@@ -0,0 +1,111 @@
+use crate::proto::PathInfo;
+use futures::stream::BoxStream;
+use nix_compat::nixbase32;
+use tonic::async_trait;
+use tracing::{debug, instrument};
+use tvix_castore::Error;
+
+use super::PathInfoService;
+
+/// Asks near first, if not found, asks far.
+/// If found in there, returns it, and *inserts* it into
+/// near.
+/// There is no negative cache.
+/// Inserts and listings are not implemented for now.
+pub struct Cache<PS1, PS2> {
+    near: PS1,
+    far: PS2,
+}
+
+impl<PS1, PS2> Cache<PS1, PS2> {
+    pub fn new(near: PS1, far: PS2) -> Self {
+        Self { near, far }
+    }
+}
+
+#[async_trait]
+impl<PS1, PS2> PathInfoService for Cache<PS1, PS2>
+where
+    PS1: PathInfoService,
+    PS2: PathInfoService,
+{
+    #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))]
+    async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
+        match self.near.get(digest).await? {
+            Some(path_info) => {
+                debug!("serving from cache");
+                Ok(Some(path_info))
+            }
+            None => {
+                debug!("not found in near, asking remoteโ€ฆ");
+                match self.far.get(digest).await? {
+                    None => Ok(None),
+                    Some(path_info) => {
+                        debug!("found in remote, adding to cache");
+                        self.near.put(path_info.clone()).await?;
+                        Ok(Some(path_info))
+                    }
+                }
+            }
+        }
+    }
+
+    async fn put(&self, _path_info: PathInfo) -> Result<PathInfo, Error> {
+        Err(Error::StorageError("unimplemented".to_string()))
+    }
+
+    fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
+        Box::pin(tokio_stream::once(Err(Error::StorageError(
+            "unimplemented".to_string(),
+        ))))
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use std::num::NonZeroUsize;
+
+    use crate::{
+        pathinfoservice::{LruPathInfoService, MemoryPathInfoService, PathInfoService},
+        tests::fixtures::PATH_INFO_WITH_NARINFO,
+    };
+
+    const PATH_INFO_DIGEST: [u8; 20] = [0; 20];
+
+    /// Helper function setting up an instance of a "far" and "near"
+    /// PathInfoService.
+    async fn create_pathinfoservice() -> super::Cache<LruPathInfoService, MemoryPathInfoService> {
+        // Create an instance of a "far" PathInfoService.
+        let far = MemoryPathInfoService::default();
+
+        // โ€ฆ and an instance of a "near" PathInfoService.
+        let near = LruPathInfoService::with_capacity(NonZeroUsize::new(1).unwrap());
+
+        // create a Pathinfoservice combining the two and return it.
+        super::Cache::new(near, far)
+    }
+
+    /// Getting from the far backend is gonna insert it into the near one.
+    #[tokio::test]
+    async fn test_populate_cache() {
+        let svc = create_pathinfoservice().await;
+
+        // query the PathInfo, things should not be there.
+        assert!(svc.get(PATH_INFO_DIGEST).await.unwrap().is_none());
+
+        // insert it into the far one.
+        svc.far.put(PATH_INFO_WITH_NARINFO.clone()).await.unwrap();
+
+        // now try getting it again, it should succeed.
+        assert_eq!(
+            Some(PATH_INFO_WITH_NARINFO.clone()),
+            svc.get(PATH_INFO_DIGEST).await.unwrap()
+        );
+
+        // peek near, it should now be there.
+        assert_eq!(
+            Some(PATH_INFO_WITH_NARINFO.clone()),
+            svc.near.get(PATH_INFO_DIGEST).await.unwrap()
+        );
+    }
+}
diff --git a/tvix/store/src/pathinfoservice/from_addr.rs b/tvix/store/src/pathinfoservice/from_addr.rs
index 6109054d7a..455909e7f2 100644
--- a/tvix/store/src/pathinfoservice/from_addr.rs
+++ b/tvix/store/src/pathinfoservice/from_addr.rs
@@ -37,7 +37,8 @@ pub async fn from_addr(
     blob_service: Arc<dyn BlobService>,
     directory_service: Arc<dyn DirectoryService>,
 ) -> Result<Box<dyn PathInfoService>, Error> {
-    let url =
+    #[allow(unused_mut)]
+    let mut url =
         Url::parse(uri).map_err(|e| Error::StorageError(format!("unable to parse url: {}", e)))?;
 
     let path_info_service: Box<dyn PathInfoService> = match url.scheme() {
@@ -46,7 +47,7 @@ pub async fn from_addr(
             if url.has_host() || !url.path().is_empty() {
                 return Err(Error::StorageError("invalid url".to_string()));
             }
-            Box::new(MemoryPathInfoService::new(blob_service, directory_service))
+            Box::<MemoryPathInfoService>::default()
         }
         "sled" => {
             // sled doesn't support host, and a path can be provided (otherwise
@@ -64,10 +65,10 @@ pub async fn from_addr(
             // TODO: expose other parameters as URL parameters?
 
             Box::new(if url.path().is_empty() {
-                SledPathInfoService::new_temporary(blob_service, directory_service)
+                SledPathInfoService::new_temporary()
                     .map_err(|e| Error::StorageError(e.to_string()))?
             } else {
-                SledPathInfoService::new(url.path(), blob_service, directory_service)
+                SledPathInfoService::new(url.path())
                     .map_err(|e| Error::StorageError(e.to_string()))?
             })
         }
@@ -108,6 +109,30 @@ pub async fn from_addr(
                 PathInfoServiceClient::new(tvix_castore::tonic::channel_from_url(&url).await?);
             Box::new(GRPCPathInfoService::from_client(client))
         }
+        #[cfg(feature = "cloud")]
+        "bigtable" => {
+            use super::bigtable::BigtableParameters;
+            use super::BigtablePathInfoService;
+
+            // parse the instance name from the hostname.
+            let instance_name = url
+                .host_str()
+                .ok_or_else(|| Error::StorageError("instance name missing".into()))?
+                .to_string();
+
+            // โ€ฆ but add it to the query string now, so we just need to parse that.
+            url.query_pairs_mut()
+                .append_pair("instance_name", &instance_name);
+
+            let params: BigtableParameters = serde_qs::from_str(url.query().unwrap_or_default())
+                .map_err(|e| Error::InvalidRequest(format!("failed to parse parameters: {}", e)))?;
+
+            Box::new(
+                BigtablePathInfoService::connect(params)
+                    .await
+                    .map_err(|e| Error::StorageError(e.to_string()))?,
+            )
+        }
         _ => Err(Error::StorageError(format!(
             "unknown scheme: {}",
             url.scheme()
@@ -121,9 +146,9 @@ pub async fn from_addr(
 mod tests {
     use super::from_addr;
     use lazy_static::lazy_static;
+    use rstest::rstest;
     use std::sync::Arc;
     use tempfile::TempDir;
-    use test_case::test_case;
     use tvix_castore::{
         blobservice::{BlobService, MemoryBlobService},
         directoryservice::{DirectoryService, MemoryDirectoryService},
@@ -136,52 +161,66 @@ mod tests {
 
     // the gRPC tests below don't fail, because we connect lazily.
 
+    #[rstest]
     /// This uses a unsupported scheme.
-    #[test_case("http://foo.example/test", false; "unsupported scheme")]
+    #[case::unsupported_scheme("http://foo.example/test", false)]
     /// This configures sled in temporary mode.
-    #[test_case("sled://", true; "sled valid temporary")]
+    #[case::sled_temporary("sled://", true)]
     /// This configures sled with /, which should fail.
-    #[test_case("sled:///", false; "sled invalid root")]
+    #[case::sled_invalid_root("sled:///", false)]
     /// This configures sled with a host, not path, which should fail.
-    #[test_case("sled://foo.example", false; "sled invalid host")]
+    #[case::sled_invalid_host("sled://foo.example", false)]
     /// This configures sled with a valid path path, which should succeed.
-    #[test_case(&format!("sled://{}", &TMPDIR_SLED_1.path().to_str().unwrap()), true; "sled valid path")]
+    #[case::sled_valid_path(&format!("sled://{}", &TMPDIR_SLED_1.path().to_str().unwrap()), true)]
     /// This configures sled with a host, and a valid path path, which should fail.
-    #[test_case(&format!("sled://foo.example{}", &TMPDIR_SLED_2.path().to_str().unwrap()), false; "sled invalid host with valid path")]
+    #[case::sled_invalid_host_with_valid_path(&format!("sled://foo.example{}", &TMPDIR_SLED_2.path().to_str().unwrap()), false)]
     /// This correctly sets the scheme, and doesn't set a path.
-    #[test_case("memory://", true; "memory valid")]
+    #[case::memory_valid("memory://", true)]
     /// This sets a memory url host to `foo`
-    #[test_case("memory://foo", false; "memory invalid host")]
+    #[case::memory_invalid_host("memory://foo", false)]
     /// This sets a memory url path to "/", which is invalid.
-    #[test_case("memory:///", false; "memory invalid root path")]
+    #[case::memory_invalid_root_path("memory:///", false)]
     /// This sets a memory url path to "/foo", which is invalid.
-    #[test_case("memory:///foo", false; "memory invalid root path foo")]
+    #[case::memory_invalid_root_path_foo("memory:///foo", false)]
     /// Correct Scheme for the cache.nixos.org binary cache.
-    #[test_case("nix+https://cache.nixos.org", true; "correct nix+https")]
+    #[case::correct_nix_https("nix+https://cache.nixos.org", true)]
     /// Correct Scheme for the cache.nixos.org binary cache (HTTP URL).
-    #[test_case("nix+http://cache.nixos.org", true; "correct nix+http")]
+    #[case::correct_nix_http("nix+http://cache.nixos.org", true)]
     /// Correct Scheme for Nix HTTP Binary cache, with a subpath.
-    #[test_case("nix+http://192.0.2.1/foo", true; "correct nix http with subpath")]
+    #[case::correct_nix_http_with_subpath("nix+http://192.0.2.1/foo", true)]
     /// Correct Scheme for Nix HTTP Binary cache, with a subpath and port.
-    #[test_case("nix+http://[::1]:8080/foo", true; "correct nix http with subpath and port")]
+    #[case::correct_nix_http_with_subpath_and_port("nix+http://[::1]:8080/foo", true)]
     /// Correct Scheme for the cache.nixos.org binary cache, and correct trusted public key set
-    #[test_case("nix+https://cache.nixos.org?trusted-public-keys=cache.nixos.org-1:6NCHdD59X431o0gWypbMrAURkbJ16ZPMQFGspcDShjY=", true; "correct nix+https with trusted-public-key")]
+    #[case::correct_nix_https_with_trusted_public_key("nix+https://cache.nixos.org?trusted-public-keys=cache.nixos.org-1:6NCHdD59X431o0gWypbMrAURkbJ16ZPMQFGspcDShjY=", true)]
     /// Correct Scheme for the cache.nixos.org binary cache, and two correct trusted public keys set
-    #[test_case("nix+https://cache.nixos.org?trusted-public-keys=cache.nixos.org-1:6NCHdD59X431o0gWypbMrAURkbJ16ZPMQFGspcDShjY=%20foo:jp4fCEx9tBEId/L0ZsVJ26k0wC0fu7vJqLjjIGFkup8=", true; "correct nix+https with two trusted-public-key")]
+    #[case::correct_nix_https_with_two_trusted_public_keys("nix+https://cache.nixos.org?trusted-public-keys=cache.nixos.org-1:6NCHdD59X431o0gWypbMrAURkbJ16ZPMQFGspcDShjY=%20foo:jp4fCEx9tBEId/L0ZsVJ26k0wC0fu7vJqLjjIGFkup8=", true)]
     /// Correct scheme to connect to a unix socket.
-    #[test_case("grpc+unix:///path/to/somewhere", true; "grpc valid unix socket")]
+    #[case::grpc_valid_unix_socket("grpc+unix:///path/to/somewhere", true)]
     /// Correct scheme for unix socket, but setting a host too, which is invalid.
-    #[test_case("grpc+unix://host.example/path/to/somewhere", false; "grpc invalid unix socket and host")]
+    #[case::grpc_invalid_unix_socket_and_host("grpc+unix://host.example/path/to/somewhere", false)]
     /// Correct scheme to connect to localhost, with port 12345
-    #[test_case("grpc+http://[::1]:12345", true; "grpc valid IPv6 localhost port 12345")]
+    #[case::grpc_valid_ipv6_localhost_port_12345("grpc+http://[::1]:12345", true)]
     /// Correct scheme to connect to localhost over http, without specifying a port.
-    #[test_case("grpc+http://localhost", true; "grpc valid http host without port")]
+    #[case::grpc_valid_http_host_without_port("grpc+http://localhost", true)]
     /// Correct scheme to connect to localhost over http, without specifying a port.
-    #[test_case("grpc+https://localhost", true; "grpc valid https host without port")]
+    #[case::grpc_valid_https_host_without_port("grpc+https://localhost", true)]
     /// Correct scheme to connect to localhost over http, but with additional path, which is invalid.
-    #[test_case("grpc+http://localhost/some-path", false; "grpc valid invalid host and path")]
+    #[case::grpc_invalid_host_and_path("grpc+http://localhost/some-path", false)]
+    /// A valid example for Bigtable.
+    #[cfg_attr(
+        all(feature = "cloud", feature = "integration"),
+        case::bigtable_valid(
+            "bigtable://instance-1?project_id=project-1&table_name=table-1&family_name=cf1",
+            true
+        )
+    )]
+    /// An invalid example for Bigtable, missing fields
+    #[cfg_attr(
+        all(feature = "cloud", feature = "integration"),
+        case::bigtable_invalid_missing_fields("bigtable://instance-1", false)
+    )]
     #[tokio::test]
-    async fn test_from_addr_tokio(uri_str: &str, exp_succeed: bool) {
+    async fn test_from_addr_tokio(#[case] uri_str: &str, #[case] exp_succeed: bool) {
         let blob_service: Arc<dyn BlobService> = Arc::from(MemoryBlobService::default());
         let directory_service: Arc<dyn DirectoryService> =
             Arc::from(MemoryDirectoryService::default());
diff --git a/tvix/store/src/pathinfoservice/fs/mod.rs b/tvix/store/src/pathinfoservice/fs/mod.rs
index 45d59fd0bc..aa64b1c01f 100644
--- a/tvix/store/src/pathinfoservice/fs/mod.rs
+++ b/tvix/store/src/pathinfoservice/fs/mod.rs
@@ -17,6 +17,7 @@ pub fn make_fs<BS, DS, PS>(
     directory_service: DS,
     path_info_service: PS,
     list_root: bool,
+    show_xattr: bool,
 ) -> TvixStoreFs<BS, DS, RootNodesWrapper<PS>>
 where
     BS: AsRef<dyn BlobService> + Send + Clone + 'static,
@@ -28,6 +29,7 @@ where
         directory_service,
         RootNodesWrapper(path_info_service),
         list_root,
+        show_xattr,
     )
 }
 
diff --git a/tvix/store/src/pathinfoservice/grpc.rs b/tvix/store/src/pathinfoservice/grpc.rs
index 7d740429cf..f6a356cf18 100644
--- a/tvix/store/src/pathinfoservice/grpc.rs
+++ b/tvix/store/src/pathinfoservice/grpc.rs
@@ -1,8 +1,13 @@
 use super::PathInfoService;
-use crate::proto::{self, ListPathInfoRequest, PathInfo};
+use crate::{
+    nar::NarCalculationService,
+    proto::{self, ListPathInfoRequest, PathInfo},
+};
 use async_stream::try_stream;
 use futures::stream::BoxStream;
+use nix_compat::nixbase32;
 use tonic::{async_trait, transport::Channel, Code};
+use tracing::instrument;
 use tvix_castore::{proto as castorepb, Error};
 
 /// Connects to a (remote) tvix-store PathInfoService over gRPC.
@@ -25,6 +30,7 @@ impl GRPCPathInfoService {
 
 #[async_trait]
 impl PathInfoService for GRPCPathInfoService {
+    #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))]
     async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
         let path_info = self
             .grpc_client
@@ -51,6 +57,7 @@ impl PathInfoService for GRPCPathInfoService {
         }
     }
 
+    #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node))]
     async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> {
         let path_info = self
             .grpc_client
@@ -63,29 +70,7 @@ impl PathInfoService for GRPCPathInfoService {
         Ok(path_info)
     }
 
-    async fn calculate_nar(
-        &self,
-        root_node: &castorepb::node::Node,
-    ) -> Result<(u64, [u8; 32]), Error> {
-        let path_info = self
-            .grpc_client
-            .clone()
-            .calculate_nar(castorepb::Node {
-                node: Some(root_node.clone()),
-            })
-            .await
-            .map_err(|e| Error::StorageError(e.to_string()))?
-            .into_inner();
-
-        let nar_sha256: [u8; 32] = path_info
-            .nar_sha256
-            .to_vec()
-            .try_into()
-            .map_err(|_e| Error::StorageError("invalid digest length".to_string()))?;
-
-        Ok((path_info.nar_size, nar_sha256))
-    }
-
+    #[instrument(level = "trace", skip_all)]
     fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
         let mut grpc_client = self.grpc_client.clone();
 
@@ -120,88 +105,47 @@ impl PathInfoService for GRPCPathInfoService {
     }
 }
 
+#[async_trait]
+impl NarCalculationService for GRPCPathInfoService {
+    #[instrument(level = "trace", skip_all, fields(root_node = ?root_node))]
+    async fn calculate_nar(
+        &self,
+        root_node: &castorepb::node::Node,
+    ) -> Result<(u64, [u8; 32]), Error> {
+        let path_info = self
+            .grpc_client
+            .clone()
+            .calculate_nar(castorepb::Node {
+                node: Some(root_node.clone()),
+            })
+            .await
+            .map_err(|e| Error::StorageError(e.to_string()))?
+            .into_inner();
+
+        let nar_sha256: [u8; 32] = path_info
+            .nar_sha256
+            .to_vec()
+            .try_into()
+            .map_err(|_e| Error::StorageError("invalid digest length".to_string()))?;
+
+        Ok((path_info.nar_size, nar_sha256))
+    }
+}
+
 #[cfg(test)]
 mod tests {
-    use std::sync::Arc;
-    use std::time::Duration;
-
-    use rstest::*;
-    use tempfile::TempDir;
-    use tokio::net::UnixListener;
-    use tokio_retry::strategy::ExponentialBackoff;
-    use tokio_retry::Retry;
-    use tokio_stream::wrappers::UnixListenerStream;
-    use tvix_castore::blobservice::BlobService;
-    use tvix_castore::directoryservice::DirectoryService;
-
-    use crate::pathinfoservice::MemoryPathInfoService;
-    use crate::proto::path_info_service_client::PathInfoServiceClient;
-    use crate::proto::GRPCPathInfoServiceWrapper;
-    use crate::tests::fixtures::{self, blob_service, directory_service};
-
-    use super::GRPCPathInfoService;
-    use super::PathInfoService;
+    use crate::pathinfoservice::tests::make_grpc_path_info_service_client;
+    use crate::pathinfoservice::PathInfoService;
+    use crate::tests::fixtures;
 
     /// This ensures connecting via gRPC works as expected.
-    #[rstest]
     #[tokio::test]
-    async fn test_valid_unix_path_ping_pong(
-        blob_service: Arc<dyn BlobService>,
-        directory_service: Arc<dyn DirectoryService>,
-    ) {
-        let tmpdir = TempDir::new().unwrap();
-        let socket_path = tmpdir.path().join("daemon");
-
-        let path_clone = socket_path.clone();
-
-        // Spin up a server
-        tokio::spawn(async {
-            let uds = UnixListener::bind(path_clone).unwrap();
-            let uds_stream = UnixListenerStream::new(uds);
-
-            // spin up a new server
-            let mut server = tonic::transport::Server::builder();
-            let router = server.add_service(
-                crate::proto::path_info_service_server::PathInfoServiceServer::new(
-                    GRPCPathInfoServiceWrapper::new(Box::new(MemoryPathInfoService::new(
-                        blob_service,
-                        directory_service,
-                    ))
-                        as Box<dyn PathInfoService>),
-                ),
-            );
-            router.serve_with_incoming(uds_stream).await
-        });
-
-        // wait for the socket to be created
-        Retry::spawn(
-            ExponentialBackoff::from_millis(20).max_delay(Duration::from_secs(10)),
-            || async {
-                if socket_path.exists() {
-                    Ok(())
-                } else {
-                    Err(())
-                }
-            },
-        )
-        .await
-        .expect("failed to wait for socket");
-
-        // prepare a client
-        let grpc_client = {
-            let url = url::Url::parse(&format!("grpc+unix://{}", socket_path.display()))
-                .expect("must parse");
-            let client = PathInfoServiceClient::new(
-                tvix_castore::tonic::channel_from_url(&url)
-                    .await
-                    .expect("must succeed"),
-            );
-
-            GRPCPathInfoService::from_client(client)
-        };
+    async fn test_valid_unix_path_ping_pong() {
+        let (_blob_service, _directory_service, path_info_service) =
+            make_grpc_path_info_service_client().await;
 
-        let path_info = grpc_client
-            .get(fixtures::DUMMY_OUTPUT_HASH)
+        let path_info = path_info_service
+            .get(fixtures::DUMMY_PATH_DIGEST)
             .await
             .expect("must not be error");
 
diff --git a/tvix/store/src/pathinfoservice/lru.rs b/tvix/store/src/pathinfoservice/lru.rs
new file mode 100644
index 0000000000..da674f497a
--- /dev/null
+++ b/tvix/store/src/pathinfoservice/lru.rs
@@ -0,0 +1,128 @@
+use async_stream::try_stream;
+use futures::stream::BoxStream;
+use lru::LruCache;
+use nix_compat::nixbase32;
+use std::num::NonZeroUsize;
+use std::sync::Arc;
+use tokio::sync::RwLock;
+use tonic::async_trait;
+use tracing::instrument;
+
+use crate::proto::PathInfo;
+use tvix_castore::Error;
+
+use super::PathInfoService;
+
+pub struct LruPathInfoService {
+    lru: Arc<RwLock<LruCache<[u8; 20], PathInfo>>>,
+}
+
+impl LruPathInfoService {
+    pub fn with_capacity(capacity: NonZeroUsize) -> Self {
+        Self {
+            lru: Arc::new(RwLock::new(LruCache::new(capacity))),
+        }
+    }
+}
+
+#[async_trait]
+impl PathInfoService for LruPathInfoService {
+    #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))]
+    async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
+        Ok(self.lru.write().await.get(&digest).cloned())
+    }
+
+    #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node))]
+    async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> {
+        // call validate
+        let store_path = path_info
+            .validate()
+            .map_err(|e| Error::InvalidRequest(format!("invalid PathInfo: {}", e)))?;
+
+        self.lru
+            .write()
+            .await
+            .put(*store_path.digest(), path_info.clone());
+
+        Ok(path_info)
+    }
+
+    fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
+        let lru = self.lru.clone();
+        Box::pin(try_stream! {
+            let lru = lru.read().await;
+            let it = lru.iter();
+
+            for (_k,v) in it {
+                yield v.clone()
+            }
+        })
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use std::num::NonZeroUsize;
+
+    use crate::{
+        pathinfoservice::{LruPathInfoService, PathInfoService},
+        proto::PathInfo,
+        tests::fixtures::PATH_INFO_WITH_NARINFO,
+    };
+    use lazy_static::lazy_static;
+    use tvix_castore::proto as castorepb;
+
+    lazy_static! {
+        static ref PATHINFO_1: PathInfo = PATH_INFO_WITH_NARINFO.clone();
+        static ref PATHINFO_1_DIGEST: [u8; 20] = [0; 20];
+        static ref PATHINFO_2: PathInfo = {
+            let mut p = PATHINFO_1.clone();
+            let root_node = p.node.as_mut().unwrap();
+            if let castorepb::Node { node: Some(node) } = root_node {
+                let n = node.to_owned();
+                *node = n.rename("11111111111111111111111111111111-dummy2".into());
+            } else {
+                unreachable!()
+            }
+            p
+        };
+        static ref PATHINFO_2_DIGEST: [u8; 20] = *(PATHINFO_2.validate().unwrap()).digest();
+    }
+
+    #[tokio::test]
+    async fn evict() {
+        let svc = LruPathInfoService::with_capacity(NonZeroUsize::new(1).unwrap());
+
+        // pathinfo_1 should not be there
+        assert!(svc
+            .get(*PATHINFO_1_DIGEST)
+            .await
+            .expect("no error")
+            .is_none());
+
+        // insert it
+        svc.put(PATHINFO_1.clone()).await.expect("no error");
+
+        // now it should be there.
+        assert_eq!(
+            Some(PATHINFO_1.clone()),
+            svc.get(*PATHINFO_1_DIGEST).await.expect("no error")
+        );
+
+        // insert pathinfo_2. This will evict pathinfo 1
+        svc.put(PATHINFO_2.clone()).await.expect("no error");
+
+        // now pathinfo 2 should be there.
+        assert_eq!(
+            Some(PATHINFO_2.clone()),
+            svc.get(*PATHINFO_2_DIGEST).await.expect("no error")
+        );
+
+        // โ€ฆ but pathinfo 1 not anymore.
+        assert!(svc
+            .get(*PATHINFO_1_DIGEST)
+            .await
+            .expect("no error")
+            .is_none());
+    }
+}
diff --git a/tvix/store/src/pathinfoservice/memory.rs b/tvix/store/src/pathinfoservice/memory.rs
index f8435dbbf8..3de3221df2 100644
--- a/tvix/store/src/pathinfoservice/memory.rs
+++ b/tvix/store/src/pathinfoservice/memory.rs
@@ -1,40 +1,24 @@
 use super::PathInfoService;
-use crate::{nar::calculate_size_and_sha256, proto::PathInfo};
-use futures::stream::{iter, BoxStream};
-use std::{
-    collections::HashMap,
-    sync::{Arc, RwLock},
-};
+use crate::proto::PathInfo;
+use async_stream::try_stream;
+use futures::stream::BoxStream;
+use nix_compat::nixbase32;
+use std::{collections::HashMap, sync::Arc};
+use tokio::sync::RwLock;
 use tonic::async_trait;
-use tvix_castore::proto as castorepb;
+use tracing::instrument;
 use tvix_castore::Error;
-use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService};
 
-pub struct MemoryPathInfoService<BS, DS> {
+#[derive(Default)]
+pub struct MemoryPathInfoService {
     db: Arc<RwLock<HashMap<[u8; 20], PathInfo>>>,
-
-    blob_service: BS,
-    directory_service: DS,
-}
-
-impl<BS, DS> MemoryPathInfoService<BS, DS> {
-    pub fn new(blob_service: BS, directory_service: DS) -> Self {
-        Self {
-            db: Default::default(),
-            blob_service,
-            directory_service,
-        }
-    }
 }
 
 #[async_trait]
-impl<BS, DS> PathInfoService for MemoryPathInfoService<BS, DS>
-where
-    BS: AsRef<dyn BlobService> + Send + Sync,
-    DS: AsRef<dyn DirectoryService> + Send + Sync,
-{
+impl PathInfoService for MemoryPathInfoService {
+    #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))]
     async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
-        let db = self.db.read().unwrap();
+        let db = self.db.read().await;
 
         match db.get(&digest) {
             None => Ok(None),
@@ -42,6 +26,7 @@ where
         }
     }
 
+    #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node))]
     async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> {
         // Call validate on the received PathInfo message.
         match path_info.validate() {
@@ -53,7 +38,7 @@ where
             // In case the PathInfo is valid, and we were able to extract a NixPath, store it in the database.
             // This overwrites existing PathInfo objects.
             Ok(nix_path) => {
-                let mut db = self.db.write().unwrap();
+                let mut db = self.db.write().await;
                 db.insert(*nix_path.digest(), path_info.clone());
 
                 Ok(path_info)
@@ -61,24 +46,16 @@ where
         }
     }
 
-    async fn calculate_nar(
-        &self,
-        root_node: &castorepb::node::Node,
-    ) -> Result<(u64, [u8; 32]), Error> {
-        calculate_size_and_sha256(root_node, &self.blob_service, &self.directory_service)
-            .await
-            .map_err(|e| Error::StorageError(e.to_string()))
-    }
-
     fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
-        let db = self.db.read().unwrap();
+        let db = self.db.clone();
 
-        // Copy all elements into a list.
-        // This is a bit ugly, because we can't have db escape the lifetime
-        // of this function, but elements need to be returned owned anyways, and this in-
-        // memory impl is only for testing purposes anyways.
-        let items: Vec<_> = db.iter().map(|(_k, v)| Ok(v.clone())).collect();
+        Box::pin(try_stream! {
+            let db = db.read().await;
+            let it = db.iter();
 
-        Box::pin(iter(items))
+            for (_k, v) in it {
+                yield v.clone()
+            }
+        })
     }
 }
diff --git a/tvix/store/src/pathinfoservice/mod.rs b/tvix/store/src/pathinfoservice/mod.rs
index c334c8dc56..574bcc0b8b 100644
--- a/tvix/store/src/pathinfoservice/mod.rs
+++ b/tvix/store/src/pathinfoservice/mod.rs
@@ -1,5 +1,7 @@
+mod combinators;
 mod from_addr;
 mod grpc;
+mod lru;
 mod memory;
 mod nix_http;
 mod sled;
@@ -12,17 +14,23 @@ mod tests;
 
 use futures::stream::BoxStream;
 use tonic::async_trait;
-use tvix_castore::proto as castorepb;
 use tvix_castore::Error;
 
 use crate::proto::PathInfo;
 
+pub use self::combinators::Cache as CachePathInfoService;
 pub use self::from_addr::from_addr;
 pub use self::grpc::GRPCPathInfoService;
+pub use self::lru::LruPathInfoService;
 pub use self::memory::MemoryPathInfoService;
 pub use self::nix_http::NixHTTPPathInfoService;
 pub use self::sled::SledPathInfoService;
 
+#[cfg(feature = "cloud")]
+mod bigtable;
+#[cfg(feature = "cloud")]
+pub use self::bigtable::BigtablePathInfoService;
+
 #[cfg(any(feature = "fuse", feature = "virtiofs"))]
 pub use self::fs::make_fs;
 
@@ -36,14 +44,6 @@ pub trait PathInfoService: Send + Sync {
     /// invalid messages.
     async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error>;
 
-    /// Return the nar size and nar sha256 digest for a given root node.
-    /// This can be used to calculate NAR-based output paths,
-    /// and implementations are encouraged to cache it.
-    async fn calculate_nar(
-        &self,
-        root_node: &castorepb::node::Node,
-    ) -> Result<(u64, [u8; 32]), Error>;
-
     /// Iterate over all PathInfo objects in the store.
     /// Implementations can decide to disallow listing.
     ///
@@ -67,13 +67,6 @@ where
         self.as_ref().put(path_info).await
     }
 
-    async fn calculate_nar(
-        &self,
-        root_node: &castorepb::node::Node,
-    ) -> Result<(u64, [u8; 32]), Error> {
-        self.as_ref().calculate_nar(root_node).await
-    }
-
     fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
         self.as_ref().list()
     }
diff --git a/tvix/store/src/pathinfoservice/nix_http.rs b/tvix/store/src/pathinfoservice/nix_http.rs
index bdb0e2c3cb..1dd7da4831 100644
--- a/tvix/store/src/pathinfoservice/nix_http.rs
+++ b/tvix/store/src/pathinfoservice/nix_http.rs
@@ -1,6 +1,5 @@
-use std::io::{self, BufRead, Read, Write};
-
-use data_encoding::BASE64;
+use super::PathInfoService;
+use crate::{nar::ingest_nar_and_hash, proto::PathInfo};
 use futures::{stream::BoxStream, TryStreamExt};
 use nix_compat::{
     narinfo::{self, NarInfo},
@@ -8,17 +7,13 @@ use nix_compat::{
     nixhash::NixHash,
 };
 use reqwest::StatusCode;
-use sha2::{digest::FixedOutput, Digest, Sha256};
+use tokio::io::{self, AsyncRead};
 use tonic::async_trait;
 use tracing::{debug, instrument, warn};
 use tvix_castore::{
     blobservice::BlobService, directoryservice::DirectoryService, proto as castorepb, Error,
 };
 
-use crate::proto::PathInfo;
-
-use super::PathInfoService;
-
 /// NixHTTPPathInfoService acts as a bridge in between the Nix HTTP Binary cache
 /// protocol provided by Nix binary caches such as cache.nixos.org, and the Tvix
 /// Store Model.
@@ -32,8 +27,7 @@ use super::PathInfoService;
 ///
 /// The client is expected to be (indirectly) using the same [BlobService] and
 /// [DirectoryService], so able to fetch referred Directories and Blobs.
-/// [PathInfoService::put] and [PathInfoService::calculate_nar] are not
-/// implemented and return an error if called.
+/// [PathInfoService::put] is not implemented and returns an error if called.
 /// TODO: what about reading from nix-cache-info?
 pub struct NixHTTPPathInfoService<BS, DS> {
     base_url: url::Url,
@@ -71,7 +65,7 @@ where
     BS: AsRef<dyn BlobService> + Send + Sync + Clone + 'static,
     DS: AsRef<dyn DirectoryService> + Send + Sync + Clone + 'static,
 {
-    #[instrument(skip_all, err, fields(path.digest=BASE64.encode(&digest)))]
+    #[instrument(skip_all, err, fields(path.digest=nixbase32::encode(&digest)))]
     async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
         let narinfo_url = self
             .base_url
@@ -171,85 +165,71 @@ where
             )));
         }
 
-        // get an AsyncRead of the response body.
-        let async_r = tokio_util::io::StreamReader::new(resp.bytes_stream().map_err(|e| {
+        // get a reader of the response body.
+        let r = tokio_util::io::StreamReader::new(resp.bytes_stream().map_err(|e| {
             let e = e.without_url();
             warn!(e=%e, "failed to get response body");
             io::Error::new(io::ErrorKind::BrokenPipe, e.to_string())
         }));
-        let sync_r = tokio_util::io::SyncIoBridge::new(async_r);
 
-        // handle decompression, by wrapping the reader.
-        let sync_r: Box<dyn BufRead + Send> = match narinfo.compression {
-            Some("none") => Box::new(sync_r),
-            Some("xz") => Box::new(io::BufReader::new(xz2::read::XzDecoder::new(sync_r))),
-            Some(comp) => {
-                return Err(Error::InvalidRequest(
-                    format!("unsupported compression: {}", comp).to_string(),
-                ))
-            }
-            None => {
-                return Err(Error::InvalidRequest(
-                    "unsupported compression: bzip2".to_string(),
-                ))
+        // handle decompression, depending on the compression field.
+        let mut r: Box<dyn AsyncRead + Send + Unpin> = match narinfo.compression {
+            Some("none") => Box::new(r) as Box<dyn AsyncRead + Send + Unpin>,
+            Some("bzip2") | None => Box::new(async_compression::tokio::bufread::BzDecoder::new(r))
+                as Box<dyn AsyncRead + Send + Unpin>,
+            Some("gzip") => Box::new(async_compression::tokio::bufread::GzipDecoder::new(r))
+                as Box<dyn AsyncRead + Send + Unpin>,
+            Some("xz") => Box::new(async_compression::tokio::bufread::XzDecoder::new(r))
+                as Box<dyn AsyncRead + Send + Unpin>,
+            Some("zstd") => Box::new(async_compression::tokio::bufread::ZstdDecoder::new(r))
+                as Box<dyn AsyncRead + Send + Unpin>,
+            Some(comp_str) => {
+                return Err(Error::StorageError(format!(
+                    "unsupported compression: {comp_str}"
+                )));
             }
         };
 
-        let res = tokio::task::spawn_blocking({
-            let blob_service = self.blob_service.clone();
-            let directory_service = self.directory_service.clone();
-            move || -> io::Result<_> {
-                // Wrap the reader once more, so we can calculate NarSize and NarHash
-                let mut sync_r = io::BufReader::new(NarReader::from(sync_r));
-                let root_node = crate::nar::read_nar(&mut sync_r, blob_service, directory_service)?;
-
-                let (_, nar_hash, nar_size) = sync_r.into_inner().into_inner();
-
-                Ok((root_node, nar_hash, nar_size))
-            }
-        })
+        let (root_node, nar_hash, nar_size) = ingest_nar_and_hash(
+            self.blob_service.clone(),
+            self.directory_service.clone(),
+            &mut r,
+        )
         .await
-        .unwrap();
-
-        match res {
-            Ok((root_node, nar_hash, nar_size)) => {
-                // ensure the ingested narhash and narsize do actually match.
-                if narinfo.nar_size != nar_size {
-                    warn!(
-                        narinfo.nar_size = narinfo.nar_size,
-                        http.nar_size = nar_size,
-                        "NARSize mismatch"
-                    );
-                    Err(io::Error::new(
-                        io::ErrorKind::InvalidData,
-                        "NarSize mismatch".to_string(),
-                    ))?;
-                }
-                if narinfo.nar_hash != nar_hash {
-                    warn!(
-                        narinfo.nar_hash = %NixHash::Sha256(narinfo.nar_hash),
-                        http.nar_hash = %NixHash::Sha256(nar_hash),
-                        "NarHash mismatch"
-                    );
-                    Err(io::Error::new(
-                        io::ErrorKind::InvalidData,
-                        "NarHash mismatch".to_string(),
-                    ))?;
-                }
-
-                Ok(Some(PathInfo {
-                    node: Some(castorepb::Node {
-                        // set the name of the root node to the digest-name of the store path.
-                        node: Some(
-                            root_node.rename(narinfo.store_path.to_string().to_owned().into()),
-                        ),
-                    }),
-                    references: pathinfo.references,
-                    narinfo: pathinfo.narinfo,
-                }))
-            }
-            Err(e) => Err(e.into()),
+        .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
+
+        // ensure the ingested narhash and narsize do actually match.
+        if narinfo.nar_size != nar_size {
+            warn!(
+                narinfo.nar_size = narinfo.nar_size,
+                http.nar_size = nar_size,
+                "NarSize mismatch"
+            );
+            Err(io::Error::new(
+                io::ErrorKind::InvalidData,
+                "NarSize mismatch".to_string(),
+            ))?;
         }
+        if narinfo.nar_hash != nar_hash {
+            warn!(
+                narinfo.nar_hash = %NixHash::Sha256(narinfo.nar_hash),
+                http.nar_hash = %NixHash::Sha256(nar_hash),
+                "NarHash mismatch"
+            );
+            Err(io::Error::new(
+                io::ErrorKind::InvalidData,
+                "NarHash mismatch".to_string(),
+            ))?;
+        }
+
+        Ok(Some(PathInfo {
+            node: Some(castorepb::Node {
+                // set the name of the root node to the digest-name of the store path.
+                node: Some(root_node.rename(narinfo.store_path.to_string().to_owned().into())),
+            }),
+            references: pathinfo.references,
+            narinfo: pathinfo.narinfo,
+        }))
     }
 
     #[instrument(skip_all, fields(path_info=?_path_info))]
@@ -259,16 +239,6 @@ where
         ))
     }
 
-    #[instrument(skip_all, fields(root_node=?root_node))]
-    async fn calculate_nar(
-        &self,
-        root_node: &castorepb::node::Node,
-    ) -> Result<(u64, [u8; 32]), Error> {
-        Err(Error::InvalidRequest(
-            "calculate_nar not supported for this backend".to_string(),
-        ))
-    }
-
     fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
         Box::pin(futures::stream::once(async {
             Err(Error::InvalidRequest(
@@ -277,38 +247,3 @@ where
         }))
     }
 }
-
-/// Small helper reader implementing [std::io::Read].
-/// It can be used to wrap another reader, counts the number of bytes read
-/// and the sha256 digest of the contents.
-struct NarReader<R: Read> {
-    r: R,
-
-    sha256: sha2::Sha256,
-    bytes_read: u64,
-}
-
-impl<R: Read> NarReader<R> {
-    pub fn from(inner: R) -> Self {
-        Self {
-            r: inner,
-            sha256: Sha256::new(),
-            bytes_read: 0,
-        }
-    }
-
-    /// Returns the (remaining) inner reader, the sha256 digest and the number of bytes read.
-    pub fn into_inner(self) -> (R, [u8; 32], u64) {
-        (self.r, self.sha256.finalize_fixed().into(), self.bytes_read)
-    }
-}
-
-impl<R: Read> Read for NarReader<R> {
-    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
-        self.r.read(buf).map(|n| {
-            self.bytes_read += n as u64;
-            self.sha256.write_all(&buf[..n]).unwrap();
-            n
-        })
-    }
-}
diff --git a/tvix/store/src/pathinfoservice/sled.rs b/tvix/store/src/pathinfoservice/sled.rs
index 7b6d7fd7ab..96ade18169 100644
--- a/tvix/store/src/pathinfoservice/sled.rs
+++ b/tvix/store/src/pathinfoservice/sled.rs
@@ -1,140 +1,116 @@
 use super::PathInfoService;
-use crate::nar::calculate_size_and_sha256;
 use crate::proto::PathInfo;
-use futures::stream::iter;
+use async_stream::try_stream;
 use futures::stream::BoxStream;
+use nix_compat::nixbase32;
 use prost::Message;
 use std::path::Path;
 use tonic::async_trait;
-use tracing::warn;
-use tvix_castore::proto as castorepb;
-use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService, Error};
+use tracing::{instrument, warn};
+use tvix_castore::Error;
 
 /// SledPathInfoService stores PathInfo in a [sled](https://github.com/spacejam/sled).
 ///
 /// The PathInfo messages are stored as encoded protos, and keyed by their output hash,
 /// as that's currently the only request type available.
-pub struct SledPathInfoService<BS, DS> {
+pub struct SledPathInfoService {
     db: sled::Db,
-
-    blob_service: BS,
-    directory_service: DS,
 }
 
-impl<BS, DS> SledPathInfoService<BS, DS> {
-    pub fn new<P: AsRef<Path>>(
-        p: P,
-        blob_service: BS,
-        directory_service: DS,
-    ) -> Result<Self, sled::Error> {
+impl SledPathInfoService {
+    pub fn new<P: AsRef<Path>>(p: P) -> Result<Self, sled::Error> {
         let config = sled::Config::default()
             .use_compression(false) // is a required parameter
             .path(p);
         let db = config.open()?;
 
-        Ok(Self {
-            db,
-            blob_service,
-            directory_service,
-        })
+        Ok(Self { db })
     }
 
-    pub fn new_temporary(blob_service: BS, directory_service: DS) -> Result<Self, sled::Error> {
+    pub fn new_temporary() -> Result<Self, sled::Error> {
         let config = sled::Config::default().temporary(true);
         let db = config.open()?;
 
-        Ok(Self {
-            db,
-            blob_service,
-            directory_service,
-        })
+        Ok(Self { db })
     }
 }
 
 #[async_trait]
-impl<BS, DS> PathInfoService for SledPathInfoService<BS, DS>
-where
-    BS: AsRef<dyn BlobService> + Send + Sync,
-    DS: AsRef<dyn DirectoryService> + Send + Sync,
-{
+impl PathInfoService for SledPathInfoService {
+    #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))]
     async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
-        match self.db.get(digest) {
-            Ok(None) => Ok(None),
-            Ok(Some(data)) => match PathInfo::decode(&*data) {
-                Ok(path_info) => Ok(Some(path_info)),
-                Err(e) => {
+        let resp = tokio::task::spawn_blocking({
+            let db = self.db.clone();
+            move || db.get(digest.as_slice())
+        })
+        .await?
+        .map_err(|e| {
+            warn!("failed to retrieve PathInfo: {}", e);
+            Error::StorageError(format!("failed to retrieve PathInfo: {}", e))
+        })?;
+        match resp {
+            None => Ok(None),
+            Some(data) => {
+                let path_info = PathInfo::decode(&*data).map_err(|e| {
                     warn!("failed to decode stored PathInfo: {}", e);
-                    Err(Error::StorageError(format!(
-                        "failed to decode stored PathInfo: {}",
-                        e
-                    )))
-                }
-            },
-            Err(e) => {
-                warn!("failed to retrieve PathInfo: {}", e);
-                Err(Error::StorageError(format!(
-                    "failed to retrieve PathInfo: {}",
-                    e
-                )))
+                    Error::StorageError(format!("failed to decode stored PathInfo: {}", e))
+                })?;
+                Ok(Some(path_info))
             }
         }
     }
 
+    #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node))]
     async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> {
         // Call validate on the received PathInfo message.
-        match path_info.validate() {
-            Err(e) => Err(Error::InvalidRequest(format!(
-                "failed to validate PathInfo: {}",
-                e
-            ))),
-            // In case the PathInfo is valid, and we were able to extract a NixPath, store it in the database.
-            // This overwrites existing PathInfo objects.
-            Ok(nix_path) => match self
-                .db
-                .insert(*nix_path.digest(), path_info.encode_to_vec())
-            {
-                Ok(_) => Ok(path_info),
-                Err(e) => {
-                    warn!("failed to insert PathInfo: {}", e);
-                    Err(Error::StorageError(format! {
-                        "failed to insert PathInfo: {}", e
-                    }))
-                }
-            },
-        }
-    }
+        let store_path = path_info
+            .validate()
+            .map_err(|e| Error::InvalidRequest(format!("failed to validate PathInfo: {}", e)))?;
+
+        // In case the PathInfo is valid, we were able to parse a StorePath.
+        // Store it in the database, keyed by its digest.
+        // This overwrites existing PathInfo objects.
+        tokio::task::spawn_blocking({
+            let db = self.db.clone();
+            let k = *store_path.digest();
+            let data = path_info.encode_to_vec();
+            move || db.insert(k, data)
+        })
+        .await?
+        .map_err(|e| {
+            warn!("failed to insert PathInfo: {}", e);
+            Error::StorageError(format! {
+                "failed to insert PathInfo: {}", e
+            })
+        })?;
 
-    async fn calculate_nar(
-        &self,
-        root_node: &castorepb::node::Node,
-    ) -> Result<(u64, [u8; 32]), Error> {
-        calculate_size_and_sha256(root_node, &self.blob_service, &self.directory_service)
-            .await
-            .map_err(|e| Error::StorageError(e.to_string()))
+        Ok(path_info)
     }
 
     fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
-        Box::pin(iter(self.db.iter().values().map(|v| match v {
-            Ok(data) => {
-                // we retrieved some bytes
-                match PathInfo::decode(&*data) {
-                    Ok(path_info) => Ok(path_info),
-                    Err(e) => {
-                        warn!("failed to decode stored PathInfo: {}", e);
-                        Err(Error::StorageError(format!(
-                            "failed to decode stored PathInfo: {}",
-                            e
-                        )))
-                    }
-                }
-            }
-            Err(e) => {
-                warn!("failed to retrieve PathInfo: {}", e);
-                Err(Error::StorageError(format!(
-                    "failed to retrieve PathInfo: {}",
-                    e
-                )))
+        let db = self.db.clone();
+        let mut it = db.iter().values();
+
+        Box::pin(try_stream! {
+            // Don't block the executor while waiting for .next(), so wrap that
+            // in a spawn_blocking call.
+            // We need to pass around it to be able to reuse it.
+            while let (Some(elem), new_it) = tokio::task::spawn_blocking(move || {
+                (it.next(), it)
+            }).await? {
+                it = new_it;
+                let data = elem.map_err(|e| {
+                    warn!("failed to retrieve PathInfo: {}", e);
+                    Error::StorageError(format!("failed to retrieve PathInfo: {}", e))
+                })?;
+
+                let path_info = PathInfo::decode(&*data).map_err(|e| {
+                    warn!("failed to decode stored PathInfo: {}", e);
+                    Error::StorageError(format!("failed to decode stored PathInfo: {}", e))
+                })?;
+
+                yield path_info
             }
-        })))
+        })
     }
 }
diff --git a/tvix/store/src/pathinfoservice/tests/mod.rs b/tvix/store/src/pathinfoservice/tests/mod.rs
index a3035d094d..061655e4ba 100644
--- a/tvix/store/src/pathinfoservice/tests/mod.rs
+++ b/tvix/store/src/pathinfoservice/tests/mod.rs
@@ -4,61 +4,30 @@
 
 use rstest::*;
 use rstest_reuse::{self, *};
-use std::sync::Arc;
-use tvix_castore::proto as castorepb;
-use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService};
 
 use super::PathInfoService;
+use crate::pathinfoservice::MemoryPathInfoService;
+use crate::pathinfoservice::SledPathInfoService;
 use crate::proto::PathInfo;
-use crate::tests::fixtures::DUMMY_OUTPUT_HASH;
+use crate::tests::fixtures::DUMMY_PATH_DIGEST;
+use tvix_castore::proto as castorepb;
 
 mod utils;
-use self::utils::make_grpc_path_info_service_client;
-
-/// Convenience type alias batching all three servives together.
-#[allow(clippy::upper_case_acronyms)]
-type BSDSPS = (
-    Arc<dyn BlobService>,
-    Arc<dyn DirectoryService>,
-    Box<dyn PathInfoService>,
-);
-
-/// Creates a PathInfoService using a new Memory{Blob,Directory}Service.
-/// We return a 3-tuple containing all of them, as some tests want to interact
-/// with all three.
-pub async fn make_path_info_service(uri: &str) -> BSDSPS {
-    let blob_service: Arc<dyn BlobService> = tvix_castore::blobservice::from_addr("memory://")
-        .await
-        .unwrap()
-        .into();
-    let directory_service: Arc<dyn DirectoryService> =
-        tvix_castore::directoryservice::from_addr("memory://")
-            .await
-            .unwrap()
-            .into();
+pub use self::utils::make_grpc_path_info_service_client;
 
-    (
-        blob_service.clone(),
-        directory_service.clone(),
-        crate::pathinfoservice::from_addr(uri, blob_service, directory_service)
-            .await
-            .unwrap(),
-    )
-}
+#[cfg(all(feature = "cloud", feature = "integration"))]
+use self::utils::make_bigtable_path_info_service;
 
 #[template]
 #[rstest]
-#[case::memory(make_path_info_service("memory://").await)]
-#[case::grpc(make_grpc_path_info_service_client().await)]
-#[case::sled(make_path_info_service("sled://").await)]
-pub fn path_info_services(
-    #[case] services: (
-        impl BlobService,
-        impl DirectoryService,
-        impl PathInfoService,
-    ),
-) {
-}
+#[case::memory(MemoryPathInfoService::default())]
+#[case::grpc({
+    let (_, _, svc) = make_grpc_path_info_service_client().await;
+    svc
+})]
+#[case::sled(SledPathInfoService::new_temporary().unwrap())]
+#[cfg_attr(all(feature = "cloud",feature="integration"), case::bigtable(make_bigtable_path_info_service().await))]
+pub fn path_info_services(#[case] svc: impl PathInfoService) {}
 
 // FUTUREWORK: add more tests rejecting invalid PathInfo messages.
 // A subset of them should also ensure references to other PathInfos, or
@@ -67,10 +36,9 @@ pub fn path_info_services(
 /// Trying to get a non-existent PathInfo should return Ok(None).
 #[apply(path_info_services)]
 #[tokio::test]
-async fn not_found(services: BSDSPS) {
-    let (_, _, path_info_service) = services;
-    assert!(path_info_service
-        .get(DUMMY_OUTPUT_HASH)
+async fn not_found(svc: impl PathInfoService) {
+    assert!(svc
+        .get(DUMMY_PATH_DIGEST)
         .await
         .expect("must succeed")
         .is_none());
@@ -79,9 +47,7 @@ async fn not_found(services: BSDSPS) {
 /// Put a PathInfo into the store, get it back.
 #[apply(path_info_services)]
 #[tokio::test]
-async fn put_get(services: BSDSPS) {
-    let (_, _, path_info_service) = services;
-
+async fn put_get(svc: impl PathInfoService) {
     let path_info = PathInfo {
         node: Some(castorepb::Node {
             node: Some(castorepb::node::Node::Symlink(castorepb::SymlinkNode {
@@ -93,20 +59,14 @@ async fn put_get(services: BSDSPS) {
     };
 
     // insert
-    let resp = path_info_service
-        .put(path_info.clone())
-        .await
-        .expect("must succeed");
+    let resp = svc.put(path_info.clone()).await.expect("must succeed");
 
     // expect the returned PathInfo to be equal (for now)
     // in the future, some stores might add additional fields/signatures.
     assert_eq!(path_info, resp);
 
     // get it back
-    let resp = path_info_service
-        .get(DUMMY_OUTPUT_HASH)
-        .await
-        .expect("must succeed");
+    let resp = svc.get(DUMMY_PATH_DIGEST).await.expect("must succeed");
 
     assert_eq!(Some(path_info), resp);
 }
diff --git a/tvix/store/src/pathinfoservice/tests/utils.rs b/tvix/store/src/pathinfoservice/tests/utils.rs
index 31ec57aade..ee170468d1 100644
--- a/tvix/store/src/pathinfoservice/tests/utils.rs
+++ b/tvix/store/src/pathinfoservice/tests/utils.rs
@@ -1,8 +1,10 @@
 use std::sync::Arc;
 
 use tonic::transport::{Endpoint, Server, Uri};
+use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService};
 
 use crate::{
+    nar::{NarCalculationService, SimpleRenderer},
     pathinfoservice::{GRPCPathInfoService, MemoryPathInfoService, PathInfoService},
     proto::{
         path_info_service_client::PathInfoServiceClient,
@@ -14,7 +16,8 @@ use crate::{
 /// Constructs and returns a gRPC PathInfoService.
 /// We also return memory-based {Blob,Directory}Service,
 /// as the consumer of this function accepts a 3-tuple.
-pub async fn make_grpc_path_info_service_client() -> super::BSDSPS {
+pub async fn make_grpc_path_info_service_client(
+) -> (impl BlobService, impl DirectoryService, GRPCPathInfoService) {
     let (left, right) = tokio::io::duplex(64);
 
     let blob_service = blob_service();
@@ -26,12 +29,15 @@ pub async fn make_grpc_path_info_service_client() -> super::BSDSPS {
         let directory_service = directory_service.clone();
         async move {
             let path_info_service: Arc<dyn PathInfoService> =
-                Arc::from(MemoryPathInfoService::new(blob_service, directory_service));
+                Arc::from(MemoryPathInfoService::default());
+            let nar_calculation_service =
+                Box::new(SimpleRenderer::new(blob_service, directory_service))
+                    as Box<dyn NarCalculationService>;
 
-            // spin up a new DirectoryService
+            // spin up a new PathInfoService
             let mut server = Server::builder();
             let router = server.add_service(PathInfoServiceServer::new(
-                GRPCPathInfoServiceWrapper::new(path_info_service),
+                GRPCPathInfoServiceWrapper::new(path_info_service, nar_calculation_service),
             ));
 
             router
@@ -43,18 +49,27 @@ pub async fn make_grpc_path_info_service_client() -> super::BSDSPS {
     // Create a client, connecting to the right side. The URI is unused.
     let mut maybe_right = Some(right);
 
-    let path_info_service = Box::new(GRPCPathInfoService::from_client(
-        PathInfoServiceClient::new(
-            Endpoint::try_from("http://[::]:50051")
-                .unwrap()
-                .connect_with_connector(tower::service_fn(move |_: Uri| {
-                    let right = maybe_right.take().unwrap();
-                    async move { Ok::<_, std::io::Error>(right) }
-                }))
-                .await
-                .unwrap(),
-        ),
+    let path_info_service = GRPCPathInfoService::from_client(PathInfoServiceClient::new(
+        Endpoint::try_from("http://[::]:50051")
+            .unwrap()
+            .connect_with_connector(tower::service_fn(move |_: Uri| {
+                let right = maybe_right.take().unwrap();
+                async move { Ok::<_, std::io::Error>(right) }
+            }))
+            .await
+            .unwrap(),
     ));
 
     (blob_service, directory_service, path_info_service)
 }
+
+#[cfg(all(feature = "cloud", feature = "integration"))]
+pub(crate) async fn make_bigtable_path_info_service(
+) -> crate::pathinfoservice::BigtablePathInfoService {
+    use crate::pathinfoservice::bigtable::BigtableParameters;
+    use crate::pathinfoservice::BigtablePathInfoService;
+
+    BigtablePathInfoService::connect(BigtableParameters::default_for_tests())
+        .await
+        .unwrap()
+}
diff --git a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs
index 9f45818227..68f5575676 100644
--- a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs
+++ b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs
@@ -1,4 +1,4 @@
-use crate::nar::RenderError;
+use crate::nar::{NarCalculationService, RenderError};
 use crate::pathinfoservice::PathInfoService;
 use crate::proto;
 use futures::{stream::BoxStream, TryStreamExt};
@@ -7,23 +7,26 @@ use tonic::{async_trait, Request, Response, Result, Status};
 use tracing::{instrument, warn};
 use tvix_castore::proto as castorepb;
 
-pub struct GRPCPathInfoServiceWrapper<PS> {
-    inner: PS,
+pub struct GRPCPathInfoServiceWrapper<PS, NS> {
+    path_info_service: PS,
     // FUTUREWORK: allow exposing without allowing listing
+    nar_calculation_service: NS,
 }
 
-impl<PS> GRPCPathInfoServiceWrapper<PS> {
-    pub fn new(path_info_service: PS) -> Self {
+impl<PS, NS> GRPCPathInfoServiceWrapper<PS, NS> {
+    pub fn new(path_info_service: PS, nar_calculation_service: NS) -> Self {
         Self {
-            inner: path_info_service,
+            path_info_service,
+            nar_calculation_service,
         }
     }
 }
 
 #[async_trait]
-impl<PS> proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWrapper<PS>
+impl<PS, NS> proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWrapper<PS, NS>
 where
     PS: Deref<Target = dyn PathInfoService> + Send + Sync + 'static,
+    NS: NarCalculationService + Send + Sync + 'static,
 {
     type ListStream = BoxStream<'static, tonic::Result<proto::PathInfo, Status>>;
 
@@ -39,7 +42,7 @@ where
                     .to_vec()
                     .try_into()
                     .map_err(|_e| Status::invalid_argument("invalid output digest length"))?;
-                match self.inner.get(digest).await {
+                match self.path_info_service.get(digest).await {
                     Ok(None) => Err(Status::not_found("PathInfo not found")),
                     Ok(Some(path_info)) => Ok(Response::new(path_info)),
                     Err(e) => {
@@ -57,7 +60,7 @@ where
 
         // Store the PathInfo in the client. Clients MUST validate the data
         // they receive, so we don't validate additionally here.
-        match self.inner.put(path_info).await {
+        match self.path_info_service.put(path_info).await {
             Ok(path_info_new) => Ok(Response::new(path_info_new)),
             Err(e) => {
                 warn!(err = %e, "failed to put PathInfo");
@@ -79,7 +82,7 @@ where
                     Err(Status::invalid_argument("invalid root node"))?
                 }
 
-                match self.inner.calculate_nar(&root_node).await {
+                match self.nar_calculation_service.calculate_nar(&root_node).await {
                     Ok((nar_size, nar_sha256)) => Ok(Response::new(proto::CalculateNarResponse {
                         nar_size,
                         nar_sha256: nar_sha256.to_vec().into(),
@@ -99,7 +102,7 @@ where
         _request: Request<proto::ListPathInfoRequest>,
     ) -> Result<Response<Self::ListStream>, Status> {
         let stream = Box::pin(
-            self.inner
+            self.path_info_service
                 .list()
                 .map_err(|e| Status::internal(e.to_string())),
         );
diff --git a/tvix/store/src/proto/mod.rs b/tvix/store/src/proto/mod.rs
index 10fde33527..a09839c8bd 100644
--- a/tvix/store/src/proto/mod.rs
+++ b/tvix/store/src/proto/mod.rs
@@ -74,23 +74,20 @@ pub enum ValidatePathInfoError {
 
 /// Parses a root node name.
 ///
-/// On success, this returns the parsed [store_path::StorePath].
+/// On success, this returns the parsed [store_path::StorePathRef].
 /// On error, it returns an error generated from the supplied constructor.
 fn parse_node_name_root<E>(
     name: &[u8],
     err: fn(Vec<u8>, store_path::Error) -> E,
-) -> Result<store_path::StorePath, E> {
-    match store_path::StorePath::from_bytes(name) {
-        Ok(np) => Ok(np),
-        Err(e) => Err(err(name.to_vec(), e)),
-    }
+) -> Result<store_path::StorePathRef<'_>, E> {
+    store_path::StorePathRef::from_bytes(name).map_err(|e| err(name.to_vec(), e))
 }
 
 impl PathInfo {
     /// validate performs some checks on the PathInfo struct,
     /// Returning either a [store_path::StorePath] of the root node, or a
     /// [ValidatePathInfoError].
-    pub fn validate(&self) -> Result<store_path::StorePath, ValidatePathInfoError> {
+    pub fn validate(&self) -> Result<store_path::StorePathRef<'_>, ValidatePathInfoError> {
         // ensure the references have the right number of bytes.
         for (i, reference) in self.references.iter().enumerate() {
             if reference.len() != store_path::DIGEST_SIZE {
@@ -154,8 +151,7 @@ impl PathInfo {
                 // converting to this field.
                 if let Some(deriver) = &narinfo.deriver {
                     store_path::StorePathRef::from_name_and_digest(&deriver.name, &deriver.digest)
-                        .map_err(ValidatePathInfoError::InvalidDeriverField)?
-                        .to_owned();
+                        .map_err(ValidatePathInfoError::InvalidDeriverField)?;
                 }
             }
         }
@@ -177,12 +173,16 @@ impl PathInfo {
         Ok(root_nix_path)
     }
 
-    /// With self and a given StorePathRef, this reconstructs a
+    /// With self and its store path name, this reconstructs a
     /// [nix_compat::narinfo::NarInfo<'_>].
     /// It can be used to validate Signatures, or get back a (sparse) NarInfo
     /// struct to prepare writing it out.
     ///
-    /// This doesn't allocate any new data.
+    /// It assumes self to be validated first, and will only return None if the
+    /// `narinfo` field is unpopulated.
+    ///
+    /// It does very little allocation (a Vec each for `signatures` and
+    /// `references`), the rest points to data owned elsewhere.
     ///
     /// Keep in mind this is not able to reconstruct all data present in the
     /// NarInfo<'_>, as some of it is not stored at all:
@@ -192,7 +192,7 @@ impl PathInfo {
     ///
     /// If you want to render it out to a string and be able to parse it back
     /// in, at least URL *must* be set again.
-    pub fn as_narinfo<'a>(
+    pub fn to_narinfo<'a>(
         &'a self,
         store_path: store_path::StorePathRef<'a>,
     ) -> Option<nix_compat::narinfo::NarInfo<'_>> {
@@ -201,7 +201,11 @@ impl PathInfo {
         Some(nix_compat::narinfo::NarInfo {
             flags: Flags::empty(),
             store_path,
-            nar_hash: narinfo.nar_sha256.to_vec().try_into().unwrap(),
+            nar_hash: narinfo
+                .nar_sha256
+                .as_ref()
+                .try_into()
+                .expect("invalid narhash"),
             nar_size: narinfo.nar_size,
             references: narinfo
                 .reference_names
diff --git a/tvix/store/src/proto/tests/pathinfo.rs b/tvix/store/src/proto/tests/pathinfo.rs
index dca74dc92f..4d0834878d 100644
--- a/tvix/store/src/proto/tests/pathinfo.rs
+++ b/tvix/store/src/proto/tests/pathinfo.rs
@@ -3,97 +3,82 @@ use crate::tests::fixtures::*;
 use bytes::Bytes;
 use data_encoding::BASE64;
 use nix_compat::nixbase32;
-use nix_compat::store_path::{self, StorePath, StorePathRef};
-use std::str::FromStr;
-use test_case::test_case;
+use nix_compat::store_path::{self, StorePathRef};
+use rstest::rstest;
 use tvix_castore::proto as castorepb;
 
-#[test_case(
-    None,
-    Err(ValidatePathInfoError::NoNodePresent) ;
-    "No node"
-)]
-#[test_case(
-    Some(castorepb::Node { node: None }),
-    Err(ValidatePathInfoError::NoNodePresent);
-    "No node 2"
-)]
-fn validate_no_node(
-    t_node: Option<castorepb::Node>,
-    t_result: Result<StorePath, ValidatePathInfoError>,
+#[rstest]
+#[case::no_node(None, Err(ValidatePathInfoError::NoNodePresent))]
+#[case::no_node_2(Some(castorepb::Node { node: None}), Err(ValidatePathInfoError::NoNodePresent))]
+
+fn validate_pathinfo(
+    #[case] node: Option<castorepb::Node>,
+    #[case] exp_result: Result<StorePathRef, ValidatePathInfoError>,
 ) {
     // construct the PathInfo object
     let p = PathInfo {
-        node: t_node,
+        node,
         ..Default::default()
     };
-    assert_eq!(t_result, p.validate());
+
+    assert_eq!(exp_result, p.validate());
+
+    let err = p.validate().expect_err("validation should fail");
+    assert!(matches!(err, ValidatePathInfoError::NoNodePresent));
 }
 
-#[test_case(
-    castorepb::DirectoryNode {
-        name: DUMMY_NAME.into(),
+#[rstest]
+#[case::ok(castorepb::DirectoryNode {
+        name: DUMMY_PATH.into(),
         digest: DUMMY_DIGEST.clone().into(),
         size: 0,
-    },
-    Ok(StorePath::from_str(DUMMY_NAME).expect("must succeed"));
-    "ok"
-)]
-#[test_case(
-    castorepb::DirectoryNode {
-        name: DUMMY_NAME.into(),
+}, Ok(StorePathRef::from_bytes(DUMMY_PATH.as_bytes()).unwrap()))]
+#[case::invalid_digest_length(castorepb::DirectoryNode {
+        name: DUMMY_PATH.into(),
         digest: Bytes::new(),
         size: 0,
-    },
-    Err(ValidatePathInfoError::InvalidRootNode(castorepb::ValidateNodeError::InvalidDigestLen(0)));
-    "invalid digest length"
-)]
-#[test_case(
-    castorepb::DirectoryNode {
+}, Err(ValidatePathInfoError::InvalidRootNode(castorepb::ValidateNodeError::InvalidDigestLen(0))))]
+#[case::invalid_node_name_no_storepath(castorepb::DirectoryNode {
         name: "invalid".into(),
         digest: DUMMY_DIGEST.clone().into(),
         size: 0,
-    },
-    Err(ValidatePathInfoError::InvalidNodeName(
+}, Err(ValidatePathInfoError::InvalidNodeName(
         "invalid".into(),
         store_path::Error::InvalidLength
-    ));
-    "invalid node name"
-)]
+)))]
 fn validate_directory(
-    t_directory_node: castorepb::DirectoryNode,
-    t_result: Result<StorePath, ValidatePathInfoError>,
+    #[case] directory_node: castorepb::DirectoryNode,
+    #[case] exp_result: Result<StorePathRef, ValidatePathInfoError>,
 ) {
     // construct the PathInfo object
     let p = PathInfo {
         node: Some(castorepb::Node {
-            node: Some(castorepb::node::Node::Directory(t_directory_node)),
+            node: Some(castorepb::node::Node::Directory(directory_node)),
         }),
         ..Default::default()
     };
-    assert_eq!(t_result, p.validate());
+    assert_eq!(exp_result, p.validate());
 }
 
-#[test_case(
+#[rstest]
+#[case::ok(
     castorepb::FileNode {
-        name: DUMMY_NAME.into(),
+        name: DUMMY_PATH.into(),
         digest: DUMMY_DIGEST.clone().into(),
         size: 0,
         executable: false,
     },
-    Ok(StorePath::from_str(DUMMY_NAME).expect("must succeed"));
-    "ok"
+    Ok(StorePathRef::from_bytes(DUMMY_PATH.as_bytes()).unwrap())
 )]
-#[test_case(
+#[case::invalid_digest_len(
     castorepb::FileNode {
-        name: DUMMY_NAME.into(),
+        name: DUMMY_PATH.into(),
         digest: Bytes::new(),
         ..Default::default()
     },
-    Err(ValidatePathInfoError::InvalidRootNode(castorepb::ValidateNodeError::InvalidDigestLen(0)));
-    "invalid digest length"
+    Err(ValidatePathInfoError::InvalidRootNode(castorepb::ValidateNodeError::InvalidDigestLen(0)))
 )]
-#[test_case(
+#[case::invalid_node_name(
     castorepb::FileNode {
         name: "invalid".into(),
         digest: DUMMY_DIGEST.clone().into(),
@@ -102,32 +87,31 @@ fn validate_directory(
     Err(ValidatePathInfoError::InvalidNodeName(
         "invalid".into(),
         store_path::Error::InvalidLength
-    ));
-    "invalid node name"
+    ))
 )]
 fn validate_file(
-    t_file_node: castorepb::FileNode,
-    t_result: Result<StorePath, ValidatePathInfoError>,
+    #[case] file_node: castorepb::FileNode,
+    #[case] exp_result: Result<StorePathRef, ValidatePathInfoError>,
 ) {
     // construct the PathInfo object
     let p = PathInfo {
         node: Some(castorepb::Node {
-            node: Some(castorepb::node::Node::File(t_file_node)),
+            node: Some(castorepb::node::Node::File(file_node)),
         }),
         ..Default::default()
     };
-    assert_eq!(t_result, p.validate());
+    assert_eq!(exp_result, p.validate());
 }
 
-#[test_case(
+#[rstest]
+#[case::ok(
     castorepb::SymlinkNode {
-        name: DUMMY_NAME.into(),
+        name: DUMMY_PATH.into(),
         target: "foo".into(),
     },
-    Ok(StorePath::from_str(DUMMY_NAME).expect("must succeed"));
-    "ok"
+    Ok(StorePathRef::from_bytes(DUMMY_PATH.as_bytes()).unwrap())
 )]
-#[test_case(
+#[case::invalid_node_name(
     castorepb::SymlinkNode {
         name: "invalid".into(),
         target: "foo".into(),
@@ -135,21 +119,20 @@ fn validate_file(
     Err(ValidatePathInfoError::InvalidNodeName(
         "invalid".into(),
         store_path::Error::InvalidLength
-    ));
-    "invalid node name"
+    ))
 )]
 fn validate_symlink(
-    t_symlink_node: castorepb::SymlinkNode,
-    t_result: Result<StorePath, ValidatePathInfoError>,
+    #[case] symlink_node: castorepb::SymlinkNode,
+    #[case] exp_result: Result<StorePathRef, ValidatePathInfoError>,
 ) {
     // construct the PathInfo object
     let p = PathInfo {
         node: Some(castorepb::Node {
-            node: Some(castorepb::node::Node::Symlink(t_symlink_node)),
+            node: Some(castorepb::node::Node::Symlink(symlink_node)),
         }),
         ..Default::default()
     };
-    assert_eq!(t_result, p.validate());
+    assert_eq!(exp_result, p.validate());
 }
 
 /// Ensure parsing a correct PathInfo without narinfo populated succeeds.
@@ -236,7 +219,7 @@ fn validate_inconsistent_narinfo_reference_name_digest() {
     match path_info.validate().expect_err("must fail") {
         ValidatePathInfoError::InconsistentNarinfoReferenceNameDigest(0, e_expected, e_actual) => {
             assert_eq!(path_info.references[0][..], e_expected[..]);
-            assert_eq!(DUMMY_OUTPUT_HASH, e_actual);
+            assert_eq!(DUMMY_PATH_DIGEST, e_actual);
         }
         e => panic!("unexpected error: {:?}", e),
     }
@@ -274,7 +257,7 @@ fn validate_valid_deriver() {
     let narinfo = path_info.narinfo.as_mut().unwrap();
     narinfo.deriver = Some(crate::proto::StorePath {
         name: "foo".to_string(),
-        digest: Bytes::from(DUMMY_OUTPUT_HASH.as_slice()),
+        digest: Bytes::from(DUMMY_PATH_DIGEST.as_slice()),
     });
 
     path_info.validate().expect("must validate");
@@ -425,7 +408,7 @@ CA: fixed:sha256:086vqwk2wl8zfs47sq2xpjc9k066ilmb8z6dn0q6ymwjzlm196cd"#
     let path_info: PathInfo = (&narinfo_parsed).into();
 
     let mut narinfo_returned = path_info
-        .as_narinfo(
+        .to_narinfo(
             StorePathRef::from_bytes(b"pa10z4ngm0g83kx9mssrqzz30s84vq7k-hello-2.12.1.tar.gz")
                 .expect("invalid storepath"),
         )
diff --git a/tvix/store/src/tests/fixtures.rs b/tvix/store/src/tests/fixtures.rs
index 500ac0aa5b..1c8359a2c0 100644
--- a/tvix/store/src/tests/fixtures.rs
+++ b/tvix/store/src/tests/fixtures.rs
@@ -13,8 +13,8 @@ use crate::proto::{
     NarInfo, PathInfo,
 };
 
-pub const DUMMY_NAME: &str = "00000000000000000000000000000000-dummy";
-pub const DUMMY_OUTPUT_HASH: [u8; 20] = [0; 20];
+pub const DUMMY_PATH: &str = "00000000000000000000000000000000-dummy";
+pub const DUMMY_PATH_DIGEST: [u8; 20] = [0; 20];
 
 lazy_static! {
     /// The NAR representation of a symlink pointing to `/nix/store/somewhereelse`
@@ -106,12 +106,12 @@ lazy_static! {
     pub static ref PATH_INFO_WITHOUT_NARINFO : PathInfo = PathInfo {
         node: Some(castorepb::Node {
             node: Some(castorepb::node::Node::Directory(castorepb::DirectoryNode {
-                name: DUMMY_NAME.into(),
+                name: DUMMY_PATH.into(),
                 digest: DUMMY_DIGEST.clone().into(),
                 size: 0,
             })),
         }),
-        references: vec![DUMMY_OUTPUT_HASH.as_slice().into()],
+        references: vec![DUMMY_PATH_DIGEST.as_slice().into()],
         narinfo: None,
     };
 
@@ -123,7 +123,7 @@ lazy_static! {
             nar_size: 0,
             nar_sha256: DUMMY_DIGEST.clone().into(),
             signatures: vec![],
-            reference_names: vec![DUMMY_NAME.to_string()],
+            reference_names: vec![DUMMY_PATH.to_string()],
             deriver: None,
             ca: Some(Ca { r#type: ca::Hash::NarSha256.into(), digest:  DUMMY_DIGEST.clone().into() })
         }),
diff --git a/tvix/store/src/utils.rs b/tvix/store/src/utils.rs
index 041a9e683d..e6e42f6ec4 100644
--- a/tvix/store/src/utils.rs
+++ b/tvix/store/src/utils.rs
@@ -1,13 +1,19 @@
 use std::sync::Arc;
+use std::{
+    pin::Pin,
+    task::{self, Poll},
+};
+use tokio::io::{self, AsyncWrite};
 
 use tvix_castore::{
     blobservice::{self, BlobService},
     directoryservice::{self, DirectoryService},
 };
 
+use crate::nar::{NarCalculationService, SimpleRenderer};
 use crate::pathinfoservice::{self, PathInfoService};
 
-/// Construct the three store handles from their addrs.
+/// Construct the store handles from their addrs.
 pub async fn construct_services(
     blob_service_addr: impl AsRef<str>,
     directory_service_addr: impl AsRef<str>,
@@ -16,6 +22,7 @@ pub async fn construct_services(
     Arc<dyn BlobService>,
     Arc<dyn DirectoryService>,
     Box<dyn PathInfoService>,
+    Box<dyn NarCalculationService>,
 )> {
     let blob_service: Arc<dyn BlobService> = blobservice::from_addr(blob_service_addr.as_ref())
         .await?
@@ -31,5 +38,41 @@ pub async fn construct_services(
     )
     .await?;
 
-    Ok((blob_service, directory_service, path_info_service))
+    // TODO: grpc client also implements NarCalculationService
+    let nar_calculation_service = Box::new(SimpleRenderer::new(
+        blob_service.clone(),
+        directory_service.clone(),
+    )) as Box<dyn NarCalculationService>;
+
+    Ok((
+        blob_service,
+        directory_service,
+        path_info_service,
+        nar_calculation_service,
+    ))
+}
+
+/// The inverse of [tokio_util::io::SyncIoBridge].
+/// Don't use this with anything that actually does blocking I/O.
+pub struct AsyncIoBridge<T>(pub T);
+
+impl<W: std::io::Write + Unpin> AsyncWrite for AsyncIoBridge<W> {
+    fn poll_write(
+        self: Pin<&mut Self>,
+        _cx: &mut task::Context<'_>,
+        buf: &[u8],
+    ) -> Poll<io::Result<usize>> {
+        Poll::Ready(self.get_mut().0.write(buf))
+    }
+
+    fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
+        Poll::Ready(self.get_mut().0.flush())
+    }
+
+    fn poll_shutdown(
+        self: Pin<&mut Self>,
+        _cx: &mut task::Context<'_>,
+    ) -> Poll<Result<(), io::Error>> {
+        Poll::Ready(Ok(()))
+    }
 }