about summary refs log tree commit diff
path: root/tvix/store/src
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/store/src')
-rw-r--r--tvix/store/src/bin/tvix-store.rs617
-rw-r--r--tvix/store/src/import.rs183
-rw-r--r--tvix/store/src/lib.rs14
-rw-r--r--tvix/store/src/nar/import.rs237
-rw-r--r--tvix/store/src/nar/mod.rs52
-rw-r--r--tvix/store/src/nar/renderer.rs222
-rw-r--r--tvix/store/src/pathinfoservice/bigtable.rs412
-rw-r--r--tvix/store/src/pathinfoservice/combinators.rs111
-rw-r--r--tvix/store/src/pathinfoservice/from_addr.rs236
-rw-r--r--tvix/store/src/pathinfoservice/fs/mod.rs81
-rw-r--r--tvix/store/src/pathinfoservice/grpc.rs154
-rw-r--r--tvix/store/src/pathinfoservice/lru.rs128
-rw-r--r--tvix/store/src/pathinfoservice/memory.rs61
-rw-r--r--tvix/store/src/pathinfoservice/mod.rs73
-rw-r--r--tvix/store/src/pathinfoservice/nix_http.rs266
-rw-r--r--tvix/store/src/pathinfoservice/sled.rs117
-rw-r--r--tvix/store/src/pathinfoservice/tests/mod.rs72
-rw-r--r--tvix/store/src/pathinfoservice/tests/utils.rs75
-rw-r--r--tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs124
-rw-r--r--tvix/store/src/proto/mod.rs374
-rw-r--r--tvix/store/src/proto/tests/mod.rs1
-rw-r--r--tvix/store/src/proto/tests/pathinfo.rs431
-rw-r--r--tvix/store/src/tests/fixtures.rs142
-rw-r--r--tvix/store/src/tests/mod.rs2
-rw-r--r--tvix/store/src/tests/nar_renderer.rs228
-rw-r--r--tvix/store/src/utils.rs78
26 files changed, 4491 insertions, 0 deletions
diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs
new file mode 100644
index 0000000000..2a2d6fe6f7
--- /dev/null
+++ b/tvix/store/src/bin/tvix-store.rs
@@ -0,0 +1,617 @@
+use clap::Parser;
+use clap::Subcommand;
+
+use futures::future::try_join_all;
+use futures::StreamExt;
+use futures::TryStreamExt;
+use indicatif::ProgressStyle;
+use nix_compat::path_info::ExportedPathInfo;
+use serde::Deserialize;
+use serde::Serialize;
+use std::path::PathBuf;
+use std::sync::Arc;
+use tokio_listener::Listener;
+use tokio_listener::SystemOptions;
+use tokio_listener::UserOptions;
+use tonic::transport::Server;
+use tracing::info;
+use tracing::info_span;
+use tracing::instrument;
+use tracing::Level;
+use tracing::Span;
+use tracing_indicatif::filter::IndicatifFilter;
+use tracing_indicatif::{span_ext::IndicatifSpanExt, IndicatifLayer};
+use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer};
+use tvix_castore::import::fs::ingest_path;
+use tvix_store::nar::NarCalculationService;
+use tvix_store::proto::NarInfo;
+use tvix_store::proto::PathInfo;
+
+use tvix_castore::proto::blob_service_server::BlobServiceServer;
+use tvix_castore::proto::directory_service_server::DirectoryServiceServer;
+use tvix_castore::proto::GRPCBlobServiceWrapper;
+use tvix_castore::proto::GRPCDirectoryServiceWrapper;
+use tvix_store::pathinfoservice::PathInfoService;
+use tvix_store::proto::path_info_service_server::PathInfoServiceServer;
+use tvix_store::proto::GRPCPathInfoServiceWrapper;
+
+use lazy_static::lazy_static;
+
+// FUTUREWORK: move this to tracing crate
+lazy_static! {
+    pub static ref PB_PROGRESS_STYLE: ProgressStyle = ProgressStyle::with_template(
+        "{span_child_prefix}{bar:30} {wide_msg} [{elapsed_precise}]  {pos:>7}/{len:7}"
+    )
+    .expect("invalid progress template");
+    pub static ref PB_SPINNER_STYLE: ProgressStyle = ProgressStyle::with_template(
+        "{span_child_prefix}{spinner} {wide_msg} [{elapsed_precise}]  {pos:>7}/{len:7}"
+    )
+    .expect("invalid progress template");
+}
+
+#[cfg(any(feature = "fuse", feature = "virtiofs"))]
+use tvix_store::pathinfoservice::make_fs;
+
+#[cfg(feature = "fuse")]
+use tvix_castore::fs::fuse::FuseDaemon;
+
+#[cfg(feature = "otlp")]
+use opentelemetry::KeyValue;
+#[cfg(feature = "otlp")]
+use opentelemetry_sdk::{
+    resource::{ResourceDetector, SdkProvidedResourceDetector},
+    trace::BatchConfig,
+    Resource,
+};
+
+#[cfg(feature = "virtiofs")]
+use tvix_castore::fs::virtiofs::start_virtiofs_daemon;
+
+#[cfg(feature = "tonic-reflection")]
+use tvix_castore::proto::FILE_DESCRIPTOR_SET as CASTORE_FILE_DESCRIPTOR_SET;
+#[cfg(feature = "tonic-reflection")]
+use tvix_store::proto::FILE_DESCRIPTOR_SET;
+
+#[derive(Parser)]
+#[command(author, version, about, long_about = None)]
+struct Cli {
+    /// Whether to configure OTLP. Set --otlp=false to disable.
+    #[arg(long, default_missing_value = "true", default_value = "true", num_args(0..=1), require_equals(true), action(clap::ArgAction::Set))]
+    otlp: bool,
+
+    /// A global log level to use when printing logs.
+    /// It's also possible to set `RUST_LOG` according to
+    /// `tracing_subscriber::filter::EnvFilter`, which will always have
+    /// priority.
+    #[arg(long)]
+    log_level: Option<Level>,
+
+    #[command(subcommand)]
+    command: Commands,
+}
+
+#[derive(Subcommand)]
+enum Commands {
+    /// Runs the tvix-store daemon.
+    Daemon {
+        #[arg(long, short = 'l')]
+        listen_address: Option<String>,
+
+        #[arg(
+            long,
+            env,
+            default_value = "objectstore+file:///var/lib/tvix-store/blobs.object_store"
+        )]
+        blob_service_addr: String,
+
+        #[arg(
+            long,
+            env,
+            default_value = "sled:///var/lib/tvix-store/directories.sled"
+        )]
+        directory_service_addr: String,
+
+        #[arg(long, env, default_value = "sled:///var/lib/tvix-store/pathinfo.sled")]
+        path_info_service_addr: String,
+    },
+    /// Imports a list of paths into the store, print the store path for each of them.
+    Import {
+        #[clap(value_name = "PATH")]
+        paths: Vec<PathBuf>,
+
+        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
+        blob_service_addr: String,
+
+        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
+        directory_service_addr: String,
+
+        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
+        path_info_service_addr: String,
+    },
+
+    /// Copies a list of store paths on the system into tvix-store.
+    Copy {
+        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
+        blob_service_addr: String,
+
+        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
+        directory_service_addr: String,
+
+        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
+        path_info_service_addr: String,
+
+        /// A path pointing to a JSON file produced by the Nix
+        /// `__structuredAttrs` containing reference graph information provided
+        /// by the `exportReferencesGraph` feature.
+        ///
+        /// This can be used to invoke tvix-store inside a Nix derivation
+        /// copying to a Tvix store (or outside, if the JSON file is copied
+        /// out).
+        ///
+        /// Currently limited to the `closure` key inside that JSON file.
+        #[arg(value_name = "NIX_ATTRS_JSON_FILE", env = "NIX_ATTRS_JSON_FILE")]
+        reference_graph_path: PathBuf,
+    },
+    /// Mounts a tvix-store at the given mountpoint
+    #[cfg(feature = "fuse")]
+    Mount {
+        #[clap(value_name = "PATH")]
+        dest: PathBuf,
+
+        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
+        blob_service_addr: String,
+
+        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
+        directory_service_addr: String,
+
+        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
+        path_info_service_addr: String,
+
+        /// Number of FUSE threads to spawn.
+        #[arg(long, env, default_value_t = default_threads())]
+        threads: usize,
+
+        #[arg(long, env, default_value_t = false)]
+        /// Whether to configure the mountpoint with allow_other.
+        /// Requires /etc/fuse.conf to contain the `user_allow_other`
+        /// option, configured via `programs.fuse.userAllowOther` on NixOS.
+        allow_other: bool,
+
+        /// Whether to list elements at the root of the mount point.
+        /// This is useful if your PathInfoService doesn't provide an
+        /// (exhaustive) listing.
+        #[clap(long, short, action)]
+        list_root: bool,
+
+        #[arg(long, default_value_t = true)]
+        /// Whether to expose blob and directory digests as extended attributes.
+        show_xattr: bool,
+    },
+    /// Starts a tvix-store virtiofs daemon at the given socket path.
+    #[cfg(feature = "virtiofs")]
+    #[command(name = "virtiofs")]
+    VirtioFs {
+        #[clap(value_name = "PATH")]
+        socket: PathBuf,
+
+        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
+        blob_service_addr: String,
+
+        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
+        directory_service_addr: String,
+
+        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
+        path_info_service_addr: String,
+
+        /// Whether to list elements at the root of the mount point.
+        /// This is useful if your PathInfoService doesn't provide an
+        /// (exhaustive) listing.
+        #[clap(long, short, action)]
+        list_root: bool,
+
+        #[arg(long, default_value_t = true)]
+        /// Whether to expose blob and directory digests as extended attributes.
+        show_xattr: bool,
+    },
+}
+
+#[cfg(all(feature = "fuse", not(target_os = "macos")))]
+fn default_threads() -> usize {
+    std::thread::available_parallelism()
+        .map(|threads| threads.into())
+        .unwrap_or(4)
+}
+// On MacFUSE only a single channel will receive ENODEV when the file system is
+// unmounted and so all the other channels will block forever.
+// See https://github.com/osxfuse/osxfuse/issues/974
+#[cfg(all(feature = "fuse", target_os = "macos"))]
+fn default_threads() -> usize {
+    1
+}
+
+#[tokio::main]
+#[instrument(fields(indicatif.pb_show=1))]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+    let cli = Cli::parse();
+
+    // configure log settings
+    let level = cli.log_level.unwrap_or(Level::INFO);
+
+    let indicatif_layer = IndicatifLayer::new().with_progress_style(PB_SPINNER_STYLE.clone());
+
+    // Set up the tracing subscriber.
+    let subscriber = tracing_subscriber::registry()
+        .with(
+            tracing_subscriber::fmt::Layer::new()
+                .with_writer(indicatif_layer.get_stderr_writer())
+                .compact()
+                .with_filter(
+                    EnvFilter::builder()
+                        .with_default_directive(level.into())
+                        .from_env()
+                        .expect("invalid RUST_LOG"),
+                ),
+        )
+        .with(indicatif_layer.with_filter(
+            // only show progress for spans with indicatif.pb_show field being set
+            IndicatifFilter::new(false),
+        ));
+
+    // Add the otlp layer (when otlp is enabled, and it's not disabled in the CLI)
+    // then init the registry.
+    // If the feature is feature-flagged out, just init without adding the layer.
+    // It's necessary to do this separately, as every with() call chains the
+    // layer into the type of the registry.
+    #[cfg(feature = "otlp")]
+    {
+        let subscriber = if cli.otlp {
+            let tracer = opentelemetry_otlp::new_pipeline()
+                .tracing()
+                .with_exporter(opentelemetry_otlp::new_exporter().tonic())
+                .with_batch_config(BatchConfig::default())
+                .with_trace_config(opentelemetry_sdk::trace::config().with_resource({
+                    // use SdkProvidedResourceDetector.detect to detect resources,
+                    // but replace the default service name with our default.
+                    // https://github.com/open-telemetry/opentelemetry-rust/issues/1298
+                    let resources =
+                        SdkProvidedResourceDetector.detect(std::time::Duration::from_secs(0));
+                    // SdkProvidedResourceDetector currently always sets
+                    // `service.name`, but we don't like its default.
+                    if resources.get("service.name".into()).unwrap() == "unknown_service".into() {
+                        resources.merge(&Resource::new([KeyValue::new(
+                            "service.name",
+                            "tvix.store",
+                        )]))
+                    } else {
+                        resources
+                    }
+                }))
+                .install_batch(opentelemetry_sdk::runtime::Tokio)?;
+
+            // Create a tracing layer with the configured tracer
+            let layer = tracing_opentelemetry::layer().with_tracer(tracer);
+
+            subscriber.with(Some(layer))
+        } else {
+            subscriber.with(None)
+        };
+
+        subscriber.try_init()?;
+    }
+
+    // Init the registry (when otlp is not enabled)
+    #[cfg(not(feature = "otlp"))]
+    {
+        subscriber.try_init()?;
+    }
+
+    match cli.command {
+        Commands::Daemon {
+            listen_address,
+            blob_service_addr,
+            directory_service_addr,
+            path_info_service_addr,
+        } => {
+            // initialize stores
+            let (blob_service, directory_service, path_info_service, nar_calculation_service) =
+                tvix_store::utils::construct_services(
+                    blob_service_addr,
+                    directory_service_addr,
+                    path_info_service_addr,
+                )
+                .await?;
+
+            let listen_address = listen_address
+                .unwrap_or_else(|| "[::]:8000".to_string())
+                .parse()
+                .unwrap();
+
+            let mut server = Server::builder();
+
+            #[allow(unused_mut)]
+            let mut router = server
+                .add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::new(
+                    blob_service,
+                )))
+                .add_service(DirectoryServiceServer::new(
+                    GRPCDirectoryServiceWrapper::new(directory_service),
+                ))
+                .add_service(PathInfoServiceServer::new(GRPCPathInfoServiceWrapper::new(
+                    Arc::from(path_info_service),
+                    nar_calculation_service,
+                )));
+
+            #[cfg(feature = "tonic-reflection")]
+            {
+                let reflection_svc = tonic_reflection::server::Builder::configure()
+                    .register_encoded_file_descriptor_set(CASTORE_FILE_DESCRIPTOR_SET)
+                    .register_encoded_file_descriptor_set(FILE_DESCRIPTOR_SET)
+                    .build()?;
+                router = router.add_service(reflection_svc);
+            }
+
+            info!(listen_address=%listen_address, "starting daemon");
+
+            let listener = Listener::bind(
+                &listen_address,
+                &SystemOptions::default(),
+                &UserOptions::default(),
+            )
+            .await?;
+
+            router.serve_with_incoming(listener).await?;
+        }
+        Commands::Import {
+            paths,
+            blob_service_addr,
+            directory_service_addr,
+            path_info_service_addr,
+        } => {
+            // FUTUREWORK: allow flat for single files?
+            let (blob_service, directory_service, path_info_service, nar_calculation_service) =
+                tvix_store::utils::construct_services(
+                    blob_service_addr,
+                    directory_service_addr,
+                    path_info_service_addr,
+                )
+                .await?;
+
+            // Arc PathInfoService and NarCalculationService, as we clone it .
+            let path_info_service: Arc<dyn PathInfoService> = path_info_service.into();
+            let nar_calculation_service: Arc<dyn NarCalculationService> =
+                nar_calculation_service.into();
+
+            let root_span = {
+                let s = Span::current();
+                s.pb_set_style(&PB_PROGRESS_STYLE);
+                s.pb_set_message("Importing paths");
+                s.pb_set_length(paths.len() as u64);
+                s.pb_start();
+                s
+            };
+
+            let tasks = paths
+                .into_iter()
+                .map(|path| {
+                    let paths_span = root_span.clone();
+                    tokio::task::spawn({
+                        let blob_service = blob_service.clone();
+                        let directory_service = directory_service.clone();
+                        let path_info_service = path_info_service.clone();
+                        let nar_calculation_service = nar_calculation_service.clone();
+
+                        async move {
+                            if let Ok(name) = tvix_store::import::path_to_name(&path) {
+                                let resp = tvix_store::import::import_path_as_nar_ca(
+                                    &path,
+                                    name,
+                                    blob_service,
+                                    directory_service,
+                                    path_info_service,
+                                    nar_calculation_service,
+                                )
+                                .await;
+                                if let Ok(output_path) = resp {
+                                    // If the import was successful, print the path to stdout.
+                                    println!("{}", output_path.to_absolute_path());
+                                }
+                            }
+                            paths_span.pb_inc(1);
+                        }
+                    })
+                })
+                .collect::<Vec<_>>();
+
+            try_join_all(tasks).await?;
+        }
+        Commands::Copy {
+            blob_service_addr,
+            directory_service_addr,
+            path_info_service_addr,
+            reference_graph_path,
+        } => {
+            let (blob_service, directory_service, path_info_service, _nar_calculation_service) =
+                tvix_store::utils::construct_services(
+                    blob_service_addr,
+                    directory_service_addr,
+                    path_info_service_addr,
+                )
+                .await?;
+
+            // Parse the file at reference_graph_path.
+            let reference_graph_json = tokio::fs::read(&reference_graph_path).await?;
+
+            #[derive(Deserialize, Serialize)]
+            struct ReferenceGraph<'a> {
+                #[serde(borrow)]
+                closure: Vec<ExportedPathInfo<'a>>,
+            }
+
+            let reference_graph: ReferenceGraph<'_> =
+                serde_json::from_slice(reference_graph_json.as_slice())?;
+
+            // Arc the PathInfoService, as we clone it .
+            let path_info_service: Arc<dyn PathInfoService> = path_info_service.into();
+
+            let lookups_span = info_span!(
+                "lookup pathinfos",
+                "indicatif.pb_show" = tracing::field::Empty
+            );
+            lookups_span.pb_set_length(reference_graph.closure.len() as u64);
+            lookups_span.pb_set_style(&PB_PROGRESS_STYLE);
+            lookups_span.pb_start();
+
+            // From our reference graph, lookup all pathinfos that might exist.
+            let elems: Vec<_> = futures::stream::iter(reference_graph.closure)
+                .map(|elem| {
+                    let path_info_service = path_info_service.clone();
+                    async move {
+                        let resp = path_info_service
+                            .get(*elem.path.digest())
+                            .await
+                            .map(|resp| (elem, resp));
+
+                        Span::current().pb_inc(1);
+                        resp
+                    }
+                })
+                .buffer_unordered(50)
+                // Filter out all that are already uploaded.
+                // TODO: check if there's a better combinator for this
+                .try_filter_map(|(elem, path_info)| {
+                    std::future::ready(if path_info.is_none() {
+                        Ok(Some(elem))
+                    } else {
+                        Ok(None)
+                    })
+                })
+                .try_collect()
+                .await?;
+
+            // Run ingest_path on all of them.
+            let uploads: Vec<_> = futures::stream::iter(elems)
+                .map(|elem| {
+                    // Map to a future returning the root node, alongside with the closure info.
+                    let blob_service = blob_service.clone();
+                    let directory_service = directory_service.clone();
+                    async move {
+                        // Ingest the given path.
+
+                        ingest_path(
+                            blob_service,
+                            directory_service,
+                            PathBuf::from(elem.path.to_absolute_path()),
+                        )
+                        .await
+                        .map(|root_node| (elem, root_node))
+                    }
+                })
+                .buffer_unordered(10)
+                .try_collect()
+                .await?;
+
+            // Insert them into the PathInfoService.
+            // FUTUREWORK: do this properly respecting the reference graph.
+            for (elem, root_node) in uploads {
+                // Create and upload a PathInfo pointing to the root_node,
+                // annotated with information we have from the reference graph.
+                let path_info = PathInfo {
+                    node: Some(tvix_castore::proto::Node {
+                        node: Some(root_node),
+                    }),
+                    references: Vec::from_iter(
+                        elem.references.iter().map(|e| e.digest().to_vec().into()),
+                    ),
+                    narinfo: Some(NarInfo {
+                        nar_size: elem.nar_size,
+                        nar_sha256: elem.nar_sha256.to_vec().into(),
+                        signatures: vec![],
+                        reference_names: Vec::from_iter(
+                            elem.references.iter().map(|e| e.to_string()),
+                        ),
+                        deriver: None,
+                        ca: None,
+                    }),
+                };
+
+                path_info_service.put(path_info).await?;
+            }
+        }
+        #[cfg(feature = "fuse")]
+        Commands::Mount {
+            dest,
+            blob_service_addr,
+            directory_service_addr,
+            path_info_service_addr,
+            list_root,
+            threads,
+            allow_other,
+            show_xattr,
+        } => {
+            let (blob_service, directory_service, path_info_service, _nar_calculation_service) =
+                tvix_store::utils::construct_services(
+                    blob_service_addr,
+                    directory_service_addr,
+                    path_info_service_addr,
+                )
+                .await?;
+
+            let mut fuse_daemon = tokio::task::spawn_blocking(move || {
+                let fs = make_fs(
+                    blob_service,
+                    directory_service,
+                    Arc::from(path_info_service),
+                    list_root,
+                    show_xattr,
+                );
+                info!(mount_path=?dest, "mounting");
+
+                FuseDaemon::new(fs, &dest, threads, allow_other)
+            })
+            .await??;
+
+            // grab a handle to unmount the file system, and register a signal
+            // handler.
+            tokio::spawn(async move {
+                tokio::signal::ctrl_c().await.unwrap();
+                info!("interrupt received, unmounting…");
+                tokio::task::spawn_blocking(move || fuse_daemon.unmount()).await??;
+                info!("unmount occured, terminating…");
+                Ok::<_, std::io::Error>(())
+            })
+            .await??;
+        }
+        #[cfg(feature = "virtiofs")]
+        Commands::VirtioFs {
+            socket,
+            blob_service_addr,
+            directory_service_addr,
+            path_info_service_addr,
+            list_root,
+            show_xattr,
+        } => {
+            let (blob_service, directory_service, path_info_service, _nar_calculation_service) =
+                tvix_store::utils::construct_services(
+                    blob_service_addr,
+                    directory_service_addr,
+                    path_info_service_addr,
+                )
+                .await?;
+
+            tokio::task::spawn_blocking(move || {
+                let fs = make_fs(
+                    blob_service,
+                    directory_service,
+                    Arc::from(path_info_service),
+                    list_root,
+                    show_xattr,
+                );
+                info!(socket_path=?socket, "starting virtiofs-daemon");
+
+                start_virtiofs_daemon(fs, socket)
+            })
+            .await??;
+        }
+    };
+    Ok(())
+}
diff --git a/tvix/store/src/import.rs b/tvix/store/src/import.rs
new file mode 100644
index 0000000000..888380bca9
--- /dev/null
+++ b/tvix/store/src/import.rs
@@ -0,0 +1,183 @@
+use std::path::Path;
+use tracing::{debug, instrument};
+use tvix_castore::{
+    blobservice::BlobService, directoryservice::DirectoryService, import::fs::ingest_path,
+    proto::node::Node, B3Digest,
+};
+
+use nix_compat::{
+    nixhash::{CAHash, NixHash},
+    store_path::{self, StorePath},
+};
+
+use crate::{
+    nar::NarCalculationService,
+    pathinfoservice::PathInfoService,
+    proto::{nar_info, NarInfo, PathInfo},
+};
+
+impl From<CAHash> for nar_info::Ca {
+    fn from(value: CAHash) -> Self {
+        let hash_type: nar_info::ca::Hash = (&value).into();
+        let digest: bytes::Bytes = value.hash().to_string().into();
+        nar_info::Ca {
+            r#type: hash_type.into(),
+            digest,
+        }
+    }
+}
+
+pub fn log_node(node: &Node, path: &Path) {
+    match node {
+        Node::Directory(directory_node) => {
+            debug!(
+                path = ?path,
+                name = ?directory_node.name,
+                digest = %B3Digest::try_from(directory_node.digest.clone()).unwrap(),
+                "import successful",
+            )
+        }
+        Node::File(file_node) => {
+            debug!(
+                path = ?path,
+                name = ?file_node.name,
+                digest = %B3Digest::try_from(file_node.digest.clone()).unwrap(),
+                "import successful"
+            )
+        }
+        Node::Symlink(symlink_node) => {
+            debug!(
+                path = ?path,
+                name = ?symlink_node.name,
+                target = ?symlink_node.target,
+                "import successful"
+            )
+        }
+    }
+}
+
+/// Transform a path into its base name and returns an [`std::io::Error`] if it is `..` or if the
+/// basename is not valid unicode.
+#[inline]
+pub fn path_to_name(path: &Path) -> std::io::Result<&str> {
+    path.file_name()
+        .and_then(|file_name| file_name.to_str())
+        .ok_or_else(|| {
+            std::io::Error::new(
+                std::io::ErrorKind::InvalidInput,
+                "path must not be .. and the basename valid unicode",
+            )
+        })
+}
+
+/// Takes the NAR size, SHA-256 of the NAR representation, the root node and optionally
+/// a CA hash information.
+///
+/// Returns the path information object for a NAR-style object.
+///
+/// This [`PathInfo`] can be further filled for signatures, deriver or verified for the expected
+/// hashes.
+#[inline]
+pub fn derive_nar_ca_path_info(
+    nar_size: u64,
+    nar_sha256: [u8; 32],
+    ca: Option<CAHash>,
+    root_node: Node,
+) -> PathInfo {
+    // assemble the [crate::proto::PathInfo] object.
+    PathInfo {
+        node: Some(tvix_castore::proto::Node {
+            node: Some(root_node),
+        }),
+        // There's no reference scanning on path contents ingested like this.
+        references: vec![],
+        narinfo: Some(NarInfo {
+            nar_size,
+            nar_sha256: nar_sha256.to_vec().into(),
+            signatures: vec![],
+            reference_names: vec![],
+            deriver: None,
+            ca: ca.map(|ca_hash| ca_hash.into()),
+        }),
+    }
+}
+
+/// Ingest the given path `path` and register the resulting output path in the
+/// [`PathInfoService`] as a recursive fixed output NAR.
+#[instrument(skip_all, fields(store_name=name, path=?path), err)]
+pub async fn import_path_as_nar_ca<BS, DS, PS, NS, P>(
+    path: P,
+    name: &str,
+    blob_service: BS,
+    directory_service: DS,
+    path_info_service: PS,
+    nar_calculation_service: NS,
+) -> Result<StorePath, std::io::Error>
+where
+    P: AsRef<Path> + std::fmt::Debug,
+    BS: BlobService + Clone,
+    DS: DirectoryService,
+    PS: AsRef<dyn PathInfoService>,
+    NS: NarCalculationService,
+{
+    let root_node = ingest_path(blob_service, directory_service, path.as_ref())
+        .await
+        .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
+
+    // Ask for the NAR size and sha256
+    let (nar_size, nar_sha256) = nar_calculation_service.calculate_nar(&root_node).await?;
+
+    // Calculate the output path. This might still fail, as some names are illegal.
+    // FUTUREWORK: express the `name` at the type level to be valid and move the conversion
+    // at the caller level.
+    let output_path = store_path::build_nar_based_store_path(&nar_sha256, name).map_err(|_| {
+        std::io::Error::new(
+            std::io::ErrorKind::InvalidData,
+            format!("invalid name: {}", name),
+        )
+    })?;
+
+    // assemble a new root_node with a name that is derived from the nar hash.
+    let root_node = root_node.rename(output_path.to_string().into_bytes().into());
+    log_node(&root_node, path.as_ref());
+
+    let path_info = derive_nar_ca_path_info(
+        nar_size,
+        nar_sha256,
+        Some(CAHash::Nar(NixHash::Sha256(nar_sha256))),
+        root_node,
+    );
+
+    // This new [`PathInfo`] that we get back from there might contain additional signatures or
+    // information set by the service itself. In this function, we silently swallow it because
+    // callers doesn't really need it.
+    let _path_info = path_info_service.as_ref().put(path_info).await?;
+
+    Ok(output_path.to_owned())
+}
+
+#[cfg(test)]
+mod tests {
+    use std::{ffi::OsStr, path::PathBuf};
+
+    use crate::import::path_to_name;
+    use rstest::rstest;
+
+    #[rstest]
+    #[case::simple_path("a/b/c", "c")]
+    #[case::simple_path_containing_dotdot("a/b/../c", "c")]
+    #[case::path_containing_multiple_dotdot("a/b/../c/d/../e", "e")]
+
+    fn test_path_to_name(#[case] path: &str, #[case] expected_name: &str) {
+        let path: PathBuf = path.into();
+        assert_eq!(path_to_name(&path).expect("must succeed"), expected_name);
+    }
+
+    #[rstest]
+    #[case::path_ending_in_dotdot(b"a/b/..")]
+    #[case::non_unicode_path(b"\xf8\xa1\xa1\xa1\xa1")]
+    fn test_invalid_path_to_name(#[case] invalid_path: &[u8]) {
+        let path: PathBuf = unsafe { OsStr::from_encoded_bytes_unchecked(invalid_path) }.into();
+        path_to_name(&path).expect_err("must fail");
+    }
+}
diff --git a/tvix/store/src/lib.rs b/tvix/store/src/lib.rs
new file mode 100644
index 0000000000..8c32aaf885
--- /dev/null
+++ b/tvix/store/src/lib.rs
@@ -0,0 +1,14 @@
+pub mod import;
+pub mod nar;
+pub mod pathinfoservice;
+pub mod proto;
+pub mod utils;
+
+#[cfg(test)]
+mod tests;
+
+// That's what the rstest_reuse README asks us do, and fails about being unable
+// to find rstest_reuse in crate root.
+#[cfg(test)]
+#[allow(clippy::single_component_path_imports)]
+use rstest_reuse;
diff --git a/tvix/store/src/nar/import.rs b/tvix/store/src/nar/import.rs
new file mode 100644
index 0000000000..36122d419d
--- /dev/null
+++ b/tvix/store/src/nar/import.rs
@@ -0,0 +1,237 @@
+use nix_compat::nar::reader::r#async as nar_reader;
+use tokio::{io::AsyncBufRead, sync::mpsc, try_join};
+use tvix_castore::{
+    blobservice::BlobService,
+    directoryservice::DirectoryService,
+    import::{
+        blobs::{self, ConcurrentBlobUploader},
+        ingest_entries, IngestionEntry, IngestionError,
+    },
+    proto::{node::Node, NamedNode},
+    PathBuf,
+};
+
+/// Ingests the contents from a [AsyncRead] providing NAR into the tvix store,
+/// interacting with a [BlobService] and [DirectoryService].
+/// It returns the castore root node or an error.
+pub async fn ingest_nar<R, BS, DS>(
+    blob_service: BS,
+    directory_service: DS,
+    r: &mut R,
+) -> Result<Node, IngestionError<Error>>
+where
+    R: AsyncBufRead + Unpin + Send,
+    BS: BlobService + Clone + 'static,
+    DS: DirectoryService,
+{
+    // open the NAR for reading.
+    // The NAR reader emits nodes in DFS preorder.
+    let root_node = nar_reader::open(r).await.map_err(Error::IO)?;
+
+    let (tx, rx) = mpsc::channel(1);
+    let rx = tokio_stream::wrappers::ReceiverStream::new(rx);
+
+    let produce = async move {
+        let mut blob_uploader = ConcurrentBlobUploader::new(blob_service);
+
+        let res = produce_nar_inner(
+            &mut blob_uploader,
+            root_node,
+            "root".parse().unwrap(), // HACK: the root node sent to ingest_entries may not be ROOT.
+            tx.clone(),
+        )
+        .await;
+
+        if let Err(err) = blob_uploader.join().await {
+            tx.send(Err(err.into()))
+                .await
+                .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)))?;
+        }
+
+        tx.send(res)
+            .await
+            .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)))?;
+
+        Ok(())
+    };
+
+    let consume = ingest_entries(directory_service, rx);
+
+    let (_, node) = try_join!(produce, consume)?;
+
+    // remove the fake "root" name again
+    debug_assert_eq!(&node.get_name(), b"root");
+    Ok(node.rename("".into()))
+}
+
+async fn produce_nar_inner<BS>(
+    blob_uploader: &mut ConcurrentBlobUploader<BS>,
+    node: nar_reader::Node<'_, '_>,
+    path: PathBuf,
+    tx: mpsc::Sender<Result<IngestionEntry, Error>>,
+) -> Result<IngestionEntry, Error>
+where
+    BS: BlobService + Clone + 'static,
+{
+    Ok(match node {
+        nar_reader::Node::Symlink { target } => IngestionEntry::Symlink { path, target },
+        nar_reader::Node::File {
+            executable,
+            mut reader,
+        } => {
+            let size = reader.len();
+            let digest = blob_uploader.upload(&path, size, &mut reader).await?;
+
+            IngestionEntry::Regular {
+                path,
+                size,
+                executable,
+                digest,
+            }
+        }
+        nar_reader::Node::Directory(mut dir_reader) => {
+            while let Some(entry) = dir_reader.next().await? {
+                let mut path = path.clone();
+
+                // valid NAR names are valid castore names
+                path.try_push(entry.name)
+                    .expect("Tvix bug: failed to join name");
+
+                let entry = Box::pin(produce_nar_inner(
+                    blob_uploader,
+                    entry.node,
+                    path,
+                    tx.clone(),
+                ))
+                .await?;
+
+                tx.send(Ok(entry)).await.map_err(|e| {
+                    Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e))
+                })?;
+            }
+
+            IngestionEntry::Dir { path }
+        }
+    })
+}
+
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+    #[error(transparent)]
+    IO(#[from] std::io::Error),
+
+    #[error(transparent)]
+    BlobUpload(#[from] blobs::Error),
+}
+
+#[cfg(test)]
+mod test {
+    use crate::nar::ingest_nar;
+    use std::io::Cursor;
+    use std::sync::Arc;
+
+    use rstest::*;
+    use tokio_stream::StreamExt;
+    use tvix_castore::blobservice::BlobService;
+    use tvix_castore::directoryservice::DirectoryService;
+    use tvix_castore::fixtures::{
+        DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP, EMPTY_BLOB_DIGEST, HELLOWORLD_BLOB_CONTENTS,
+        HELLOWORLD_BLOB_DIGEST,
+    };
+    use tvix_castore::proto as castorepb;
+
+    use crate::tests::fixtures::{
+        blob_service, directory_service, NAR_CONTENTS_COMPLICATED, NAR_CONTENTS_HELLOWORLD,
+        NAR_CONTENTS_SYMLINK,
+    };
+
+    #[rstest]
+    #[tokio::test]
+    async fn single_symlink(
+        blob_service: Arc<dyn BlobService>,
+        directory_service: Arc<dyn DirectoryService>,
+    ) {
+        let root_node = ingest_nar(
+            blob_service,
+            directory_service,
+            &mut Cursor::new(&NAR_CONTENTS_SYMLINK.clone()),
+        )
+        .await
+        .expect("must parse");
+
+        assert_eq!(
+            castorepb::node::Node::Symlink(castorepb::SymlinkNode {
+                name: "".into(), // name must be empty
+                target: "/nix/store/somewhereelse".into(),
+            }),
+            root_node
+        );
+    }
+
+    #[rstest]
+    #[tokio::test]
+    async fn single_file(
+        blob_service: Arc<dyn BlobService>,
+        directory_service: Arc<dyn DirectoryService>,
+    ) {
+        let root_node = ingest_nar(
+            blob_service.clone(),
+            directory_service,
+            &mut Cursor::new(&NAR_CONTENTS_HELLOWORLD.clone()),
+        )
+        .await
+        .expect("must parse");
+
+        assert_eq!(
+            castorepb::node::Node::File(castorepb::FileNode {
+                name: "".into(), // name must be empty
+                digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
+                size: HELLOWORLD_BLOB_CONTENTS.len() as u64,
+                executable: false,
+            }),
+            root_node
+        );
+
+        // blobservice must contain the blob
+        assert!(blob_service.has(&HELLOWORLD_BLOB_DIGEST).await.unwrap());
+    }
+
+    #[rstest]
+    #[tokio::test]
+    async fn complicated(
+        blob_service: Arc<dyn BlobService>,
+        directory_service: Arc<dyn DirectoryService>,
+    ) {
+        let root_node = ingest_nar(
+            blob_service.clone(),
+            directory_service.clone(),
+            &mut Cursor::new(&NAR_CONTENTS_COMPLICATED.clone()),
+        )
+        .await
+        .expect("must parse");
+
+        assert_eq!(
+            castorepb::node::Node::Directory(castorepb::DirectoryNode {
+                name: "".into(), // name must be empty
+                digest: DIRECTORY_COMPLICATED.digest().into(),
+                size: DIRECTORY_COMPLICATED.size(),
+            }),
+            root_node,
+        );
+
+        // blobservice must contain the blob
+        assert!(blob_service.has(&EMPTY_BLOB_DIGEST).await.unwrap());
+
+        // directoryservice must contain the directories, at least with get_recursive.
+        let resp: Result<Vec<castorepb::Directory>, _> = directory_service
+            .get_recursive(&DIRECTORY_COMPLICATED.digest())
+            .collect()
+            .await;
+
+        let directories = resp.unwrap();
+
+        assert_eq!(2, directories.len());
+        assert_eq!(DIRECTORY_COMPLICATED.clone(), directories[0]);
+        assert_eq!(DIRECTORY_WITH_KEEP.clone(), directories[1]);
+    }
+}
diff --git a/tvix/store/src/nar/mod.rs b/tvix/store/src/nar/mod.rs
new file mode 100644
index 0000000000..164748a655
--- /dev/null
+++ b/tvix/store/src/nar/mod.rs
@@ -0,0 +1,52 @@
+use tonic::async_trait;
+use tvix_castore::B3Digest;
+
+mod import;
+mod renderer;
+pub use import::ingest_nar;
+pub use renderer::calculate_size_and_sha256;
+pub use renderer::write_nar;
+pub use renderer::SimpleRenderer;
+use tvix_castore::proto as castorepb;
+
+#[async_trait]
+pub trait NarCalculationService: Send + Sync {
+    /// Return the nar size and nar sha256 digest for a given root node.
+    /// This can be used to calculate NAR-based output paths.
+    async fn calculate_nar(
+        &self,
+        root_node: &castorepb::node::Node,
+    ) -> Result<(u64, [u8; 32]), tvix_castore::Error>;
+}
+
+#[async_trait]
+impl<A> NarCalculationService for A
+where
+    A: AsRef<dyn NarCalculationService> + Send + Sync,
+{
+    async fn calculate_nar(
+        &self,
+        root_node: &castorepb::node::Node,
+    ) -> Result<(u64, [u8; 32]), tvix_castore::Error> {
+        self.as_ref().calculate_nar(root_node).await
+    }
+}
+
+/// Errors that can encounter while rendering NARs.
+#[derive(Debug, thiserror::Error)]
+pub enum RenderError {
+    #[error("failure talking to a backing store client: {0}")]
+    StoreError(#[source] std::io::Error),
+
+    #[error("unable to find directory {}, referred from {:?}", .0, .1)]
+    DirectoryNotFound(B3Digest, bytes::Bytes),
+
+    #[error("unable to find blob {}, referred from {:?}", .0, .1)]
+    BlobNotFound(B3Digest, bytes::Bytes),
+
+    #[error("unexpected size in metadata for blob {}, referred from {:?} returned, expected {}, got {}", .0, .1, .2, .3)]
+    UnexpectedBlobMeta(B3Digest, bytes::Bytes, u32, u32),
+
+    #[error("failure using the NAR writer: {0}")]
+    NARWriterError(std::io::Error),
+}
diff --git a/tvix/store/src/nar/renderer.rs b/tvix/store/src/nar/renderer.rs
new file mode 100644
index 0000000000..efd67671db
--- /dev/null
+++ b/tvix/store/src/nar/renderer.rs
@@ -0,0 +1,222 @@
+use crate::utils::AsyncIoBridge;
+
+use super::{NarCalculationService, RenderError};
+use count_write::CountWrite;
+use nix_compat::nar::writer::r#async as nar_writer;
+use sha2::{Digest, Sha256};
+use tokio::io::{self, AsyncWrite, BufReader};
+use tonic::async_trait;
+use tvix_castore::{
+    blobservice::BlobService,
+    directoryservice::DirectoryService,
+    proto::{self as castorepb, NamedNode},
+};
+
+pub struct SimpleRenderer<BS, DS> {
+    blob_service: BS,
+    directory_service: DS,
+}
+
+impl<BS, DS> SimpleRenderer<BS, DS> {
+    pub fn new(blob_service: BS, directory_service: DS) -> Self {
+        Self {
+            blob_service,
+            directory_service,
+        }
+    }
+}
+
+#[async_trait]
+impl<BS, DS> NarCalculationService for SimpleRenderer<BS, DS>
+where
+    BS: BlobService + Clone,
+    DS: DirectoryService + Clone,
+{
+    async fn calculate_nar(
+        &self,
+        root_node: &castorepb::node::Node,
+    ) -> Result<(u64, [u8; 32]), tvix_castore::Error> {
+        calculate_size_and_sha256(
+            root_node,
+            self.blob_service.clone(),
+            self.directory_service.clone(),
+        )
+        .await
+        .map_err(|e| tvix_castore::Error::StorageError(format!("failed rendering nar: {}", e)))
+    }
+}
+
+/// Invoke [write_nar], and return the size and sha256 digest of the produced
+/// NAR output.
+pub async fn calculate_size_and_sha256<BS, DS>(
+    root_node: &castorepb::node::Node,
+    blob_service: BS,
+    directory_service: DS,
+) -> Result<(u64, [u8; 32]), RenderError>
+where
+    BS: BlobService + Send,
+    DS: DirectoryService + Send,
+{
+    let mut h = Sha256::new();
+    let mut cw = CountWrite::from(&mut h);
+
+    write_nar(
+        // The hasher doesn't speak async. It doesn't
+        // actually do any I/O, so it's fine to wrap.
+        AsyncIoBridge(&mut cw),
+        root_node,
+        blob_service,
+        directory_service,
+    )
+    .await?;
+
+    Ok((cw.count(), h.finalize().into()))
+}
+
+/// Accepts a [castorepb::node::Node] pointing to the root of a (store) path,
+/// and uses the passed blob_service and directory_service to perform the
+/// necessary lookups as it traverses the structure.
+/// The contents in NAR serialization are writen to the passed [AsyncWrite].
+pub async fn write_nar<W, BS, DS>(
+    mut w: W,
+    proto_root_node: &castorepb::node::Node,
+    blob_service: BS,
+    directory_service: DS,
+) -> Result<(), RenderError>
+where
+    W: AsyncWrite + Unpin + Send,
+    BS: BlobService + Send,
+    DS: DirectoryService + Send,
+{
+    // Initialize NAR writer
+    let nar_root_node = nar_writer::open(&mut w)
+        .await
+        .map_err(RenderError::NARWriterError)?;
+
+    walk_node(
+        nar_root_node,
+        proto_root_node,
+        blob_service,
+        directory_service,
+    )
+    .await?;
+
+    Ok(())
+}
+
+/// Process an intermediate node in the structure.
+/// This consumes the node.
+async fn walk_node<BS, DS>(
+    nar_node: nar_writer::Node<'_, '_>,
+    proto_node: &castorepb::node::Node,
+    blob_service: BS,
+    directory_service: DS,
+) -> Result<(BS, DS), RenderError>
+where
+    BS: BlobService + Send,
+    DS: DirectoryService + Send,
+{
+    match proto_node {
+        castorepb::node::Node::Symlink(proto_symlink_node) => {
+            nar_node
+                .symlink(&proto_symlink_node.target)
+                .await
+                .map_err(RenderError::NARWriterError)?;
+        }
+        castorepb::node::Node::File(proto_file_node) => {
+            let digest_len = proto_file_node.digest.len();
+            let digest = proto_file_node.digest.clone().try_into().map_err(|_| {
+                RenderError::StoreError(io::Error::new(
+                    io::ErrorKind::Other,
+                    format!("invalid digest len {} in file node", digest_len),
+                ))
+            })?;
+
+            let mut blob_reader = match blob_service
+                .open_read(&digest)
+                .await
+                .map_err(RenderError::StoreError)?
+            {
+                Some(blob_reader) => Ok(BufReader::new(blob_reader)),
+                None => Err(RenderError::NARWriterError(io::Error::new(
+                    io::ErrorKind::NotFound,
+                    format!("blob with digest {} not found", &digest),
+                ))),
+            }?;
+
+            nar_node
+                .file(
+                    proto_file_node.executable,
+                    proto_file_node.size,
+                    &mut blob_reader,
+                )
+                .await
+                .map_err(RenderError::NARWriterError)?;
+        }
+        castorepb::node::Node::Directory(proto_directory_node) => {
+            let digest_len = proto_directory_node.digest.len();
+            let digest = proto_directory_node
+                .digest
+                .clone()
+                .try_into()
+                .map_err(|_| {
+                    RenderError::StoreError(io::Error::new(
+                        io::ErrorKind::InvalidData,
+                        format!("invalid digest len {} in directory node", digest_len),
+                    ))
+                })?;
+
+            // look it up with the directory service
+            match directory_service
+                .get(&digest)
+                .await
+                .map_err(|e| RenderError::StoreError(e.into()))?
+            {
+                // if it's None, that's an error!
+                None => Err(RenderError::DirectoryNotFound(
+                    digest,
+                    proto_directory_node.name.clone(),
+                ))?,
+                Some(proto_directory) => {
+                    // start a directory node
+                    let mut nar_node_directory = nar_node
+                        .directory()
+                        .await
+                        .map_err(RenderError::NARWriterError)?;
+
+                    // We put blob_service, directory_service back here whenever we come up from
+                    // the recursion.
+                    let mut blob_service = blob_service;
+                    let mut directory_service = directory_service;
+
+                    // for each node in the directory, create a new entry with its name,
+                    // and then recurse on that entry.
+                    for proto_node in proto_directory.nodes() {
+                        let child_node = nar_node_directory
+                            .entry(proto_node.get_name())
+                            .await
+                            .map_err(RenderError::NARWriterError)?;
+
+                        (blob_service, directory_service) = Box::pin(walk_node(
+                            child_node,
+                            &proto_node,
+                            blob_service,
+                            directory_service,
+                        ))
+                        .await?;
+                    }
+
+                    // close the directory
+                    nar_node_directory
+                        .close()
+                        .await
+                        .map_err(RenderError::NARWriterError)?;
+
+                    return Ok((blob_service, directory_service));
+                }
+            }
+        }
+    }
+
+    Ok((blob_service, directory_service))
+}
diff --git a/tvix/store/src/pathinfoservice/bigtable.rs b/tvix/store/src/pathinfoservice/bigtable.rs
new file mode 100644
index 0000000000..707a686c0a
--- /dev/null
+++ b/tvix/store/src/pathinfoservice/bigtable.rs
@@ -0,0 +1,412 @@
+use super::PathInfoService;
+use crate::proto;
+use crate::proto::PathInfo;
+use async_stream::try_stream;
+use bigtable_rs::{bigtable, google::bigtable::v2 as bigtable_v2};
+use bytes::Bytes;
+use data_encoding::HEXLOWER;
+use futures::stream::BoxStream;
+use nix_compat::nixbase32;
+use prost::Message;
+use serde::{Deserialize, Serialize};
+use serde_with::{serde_as, DurationSeconds};
+use tonic::async_trait;
+use tracing::{instrument, trace};
+use tvix_castore::Error;
+
+/// There should not be more than 10 MiB in a single cell.
+/// https://cloud.google.com/bigtable/docs/schema-design#cells
+const CELL_SIZE_LIMIT: u64 = 10 * 1024 * 1024;
+
+/// Provides a [DirectoryService] implementation using
+/// [Bigtable](https://cloud.google.com/bigtable/docs/)
+/// as an underlying K/V store.
+///
+/// # Data format
+/// We use Bigtable as a plain K/V store.
+/// The row key is the digest of the store path, in hexlower.
+/// Inside the row, we currently have a single column/cell, again using the
+/// hexlower store path digest.
+/// Its value is the PathInfo message, serialized in canonical protobuf.
+/// We currently only populate this column.
+///
+/// Listing is ranging over all rows, and calculate_nar is returning a
+/// "unimplemented" error.
+#[derive(Clone)]
+pub struct BigtablePathInfoService {
+    client: bigtable::BigTable,
+    params: BigtableParameters,
+
+    #[cfg(test)]
+    #[allow(dead_code)]
+    /// Holds the temporary directory containing the unix socket, and the
+    /// spawned emulator process.
+    emulator: std::sync::Arc<(tempfile::TempDir, async_process::Child)>,
+}
+
+/// Represents configuration of [BigtablePathInfoService].
+/// This currently conflates both connect parameters and data model/client
+/// behaviour parameters.
+#[serde_as]
+#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
+pub struct BigtableParameters {
+    project_id: String,
+    instance_name: String,
+    #[serde(default)]
+    is_read_only: bool,
+    #[serde(default = "default_channel_size")]
+    channel_size: usize,
+
+    #[serde_as(as = "Option<DurationSeconds<String>>")]
+    #[serde(default = "default_timeout")]
+    timeout: Option<std::time::Duration>,
+    table_name: String,
+    family_name: String,
+
+    #[serde(default = "default_app_profile_id")]
+    app_profile_id: String,
+}
+
+impl BigtableParameters {
+    #[cfg(test)]
+    pub fn default_for_tests() -> Self {
+        Self {
+            project_id: "project-1".into(),
+            instance_name: "instance-1".into(),
+            is_read_only: false,
+            channel_size: default_channel_size(),
+            timeout: default_timeout(),
+            table_name: "table-1".into(),
+            family_name: "cf1".into(),
+            app_profile_id: default_app_profile_id(),
+        }
+    }
+}
+
+fn default_app_profile_id() -> String {
+    "default".to_owned()
+}
+
+fn default_channel_size() -> usize {
+    4
+}
+
+fn default_timeout() -> Option<std::time::Duration> {
+    Some(std::time::Duration::from_secs(4))
+}
+
+impl BigtablePathInfoService {
+    #[cfg(not(test))]
+    pub async fn connect(params: BigtableParameters) -> Result<Self, bigtable::Error> {
+        let connection = bigtable::BigTableConnection::new(
+            &params.project_id,
+            &params.instance_name,
+            params.is_read_only,
+            params.channel_size,
+            params.timeout,
+        )
+        .await?;
+
+        Ok(Self {
+            client: connection.client(),
+            params,
+        })
+    }
+
+    #[cfg(test)]
+    pub async fn connect(params: BigtableParameters) -> Result<Self, bigtable::Error> {
+        use std::time::Duration;
+
+        use async_process::{Command, Stdio};
+        use tempfile::TempDir;
+        use tokio_retry::{strategy::ExponentialBackoff, Retry};
+
+        let tmpdir = TempDir::new().unwrap();
+
+        let socket_path = tmpdir.path().join("cbtemulator.sock");
+
+        let emulator_process = Command::new("cbtemulator")
+            .arg("-address")
+            .arg(socket_path.clone())
+            .stderr(Stdio::piped())
+            .stdout(Stdio::piped())
+            .kill_on_drop(true)
+            .spawn()
+            .expect("failed to spawn emulator");
+
+        Retry::spawn(
+            ExponentialBackoff::from_millis(20)
+                .max_delay(Duration::from_secs(1))
+                .take(3),
+            || async {
+                if socket_path.exists() {
+                    Ok(())
+                } else {
+                    Err(())
+                }
+            },
+        )
+        .await
+        .expect("failed to wait for socket");
+
+        // populate the emulator
+        for cmd in &[
+            vec!["createtable", &params.table_name],
+            vec!["createfamily", &params.table_name, &params.family_name],
+        ] {
+            Command::new("cbt")
+                .args({
+                    let mut args = vec![
+                        "-instance",
+                        &params.instance_name,
+                        "-project",
+                        &params.project_id,
+                    ];
+                    args.extend_from_slice(cmd);
+                    args
+                })
+                .env(
+                    "BIGTABLE_EMULATOR_HOST",
+                    format!("unix://{}", socket_path.to_string_lossy()),
+                )
+                .output()
+                .await
+                .expect("failed to run cbt setup command");
+        }
+
+        let connection = bigtable_rs::bigtable::BigTableConnection::new_with_emulator(
+            &format!("unix://{}", socket_path.to_string_lossy()),
+            &params.project_id,
+            &params.instance_name,
+            false,
+            None,
+        )?;
+
+        Ok(Self {
+            client: connection.client(),
+            params,
+            emulator: (tmpdir, emulator_process).into(),
+        })
+    }
+}
+
+/// Derives the row/column key for a given output path.
+/// We use hexlower encoding, also because it can't be misinterpreted as RE2.
+fn derive_pathinfo_key(digest: &[u8; 20]) -> String {
+    HEXLOWER.encode(digest)
+}
+
+#[async_trait]
+impl PathInfoService for BigtablePathInfoService {
+    #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))]
+    async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
+        let mut client = self.client.clone();
+        let path_info_key = derive_pathinfo_key(&digest);
+
+        let request = bigtable_v2::ReadRowsRequest {
+            app_profile_id: self.params.app_profile_id.to_string(),
+            table_name: client.get_full_table_name(&self.params.table_name),
+            rows_limit: 1,
+            rows: Some(bigtable_v2::RowSet {
+                row_keys: vec![path_info_key.clone().into()],
+                row_ranges: vec![],
+            }),
+            // Filter selected family name, and column qualifier matching the digest.
+            // The latter is to ensure we don't fail once we start adding more metadata.
+            filter: Some(bigtable_v2::RowFilter {
+                filter: Some(bigtable_v2::row_filter::Filter::Chain(
+                    bigtable_v2::row_filter::Chain {
+                        filters: vec![
+                            bigtable_v2::RowFilter {
+                                filter: Some(
+                                    bigtable_v2::row_filter::Filter::FamilyNameRegexFilter(
+                                        self.params.family_name.to_string(),
+                                    ),
+                                ),
+                            },
+                            bigtable_v2::RowFilter {
+                                filter: Some(
+                                    bigtable_v2::row_filter::Filter::ColumnQualifierRegexFilter(
+                                        path_info_key.clone().into(),
+                                    ),
+                                ),
+                            },
+                        ],
+                    },
+                )),
+            }),
+            ..Default::default()
+        };
+
+        let mut response = client
+            .read_rows(request)
+            .await
+            .map_err(|e| Error::StorageError(format!("unable to read rows: {}", e)))?;
+
+        if response.len() != 1 {
+            if response.len() > 1 {
+                // This shouldn't happen, we limit number of rows to 1
+                return Err(Error::StorageError(
+                    "got more than one row from bigtable".into(),
+                ));
+            }
+            // else, this is simply a "not found".
+            return Ok(None);
+        }
+
+        let (row_key, mut cells) = response.pop().unwrap();
+        if row_key != path_info_key.as_bytes() {
+            // This shouldn't happen, we requested this row key.
+            return Err(Error::StorageError(
+                "got wrong row key from bigtable".into(),
+            ));
+        }
+
+        let cell = cells
+            .pop()
+            .ok_or_else(|| Error::StorageError("found no cells".into()))?;
+
+        // Ensure there's only one cell (so no more left after the pop())
+        // This shouldn't happen, We filter out other cells in our query.
+        if !cells.is_empty() {
+            return Err(Error::StorageError(
+                "more than one cell returned from bigtable".into(),
+            ));
+        }
+
+        // We also require the qualifier to be correct in the filter above,
+        // so this shouldn't happen.
+        if path_info_key.as_bytes() != cell.qualifier {
+            return Err(Error::StorageError("unexpected cell qualifier".into()));
+        }
+
+        // Try to parse the value into a PathInfo message
+        let path_info = proto::PathInfo::decode(Bytes::from(cell.value))
+            .map_err(|e| Error::StorageError(format!("unable to decode pathinfo proto: {}", e)))?;
+
+        let store_path = path_info
+            .validate()
+            .map_err(|e| Error::StorageError(format!("invalid PathInfo: {}", e)))?;
+
+        if store_path.digest() != &digest {
+            return Err(Error::StorageError("PathInfo has unexpected digest".into()));
+        }
+
+        Ok(Some(path_info))
+    }
+
+    #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node))]
+    async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> {
+        let store_path = path_info
+            .validate()
+            .map_err(|e| Error::InvalidRequest(format!("pathinfo failed validation: {}", e)))?;
+
+        let mut client = self.client.clone();
+        let path_info_key = derive_pathinfo_key(store_path.digest());
+
+        let data = path_info.encode_to_vec();
+        if data.len() as u64 > CELL_SIZE_LIMIT {
+            return Err(Error::StorageError(
+                "PathInfo exceeds cell limit on Bigtable".into(),
+            ));
+        }
+
+        let resp = client
+            .check_and_mutate_row(bigtable_v2::CheckAndMutateRowRequest {
+                table_name: client.get_full_table_name(&self.params.table_name),
+                app_profile_id: self.params.app_profile_id.to_string(),
+                row_key: path_info_key.clone().into(),
+                predicate_filter: Some(bigtable_v2::RowFilter {
+                    filter: Some(bigtable_v2::row_filter::Filter::ColumnQualifierRegexFilter(
+                        path_info_key.clone().into(),
+                    )),
+                }),
+                // If the column was already found, do nothing.
+                true_mutations: vec![],
+                // Else, do the insert.
+                false_mutations: vec![
+                    // https://cloud.google.com/bigtable/docs/writes
+                    bigtable_v2::Mutation {
+                        mutation: Some(bigtable_v2::mutation::Mutation::SetCell(
+                            bigtable_v2::mutation::SetCell {
+                                family_name: self.params.family_name.to_string(),
+                                column_qualifier: path_info_key.clone().into(),
+                                timestamp_micros: -1, // use server time to fill timestamp
+                                value: data,
+                            },
+                        )),
+                    },
+                ],
+            })
+            .await
+            .map_err(|e| Error::StorageError(format!("unable to mutate rows: {}", e)))?;
+
+        if resp.predicate_matched {
+            trace!("already existed")
+        }
+
+        Ok(path_info)
+    }
+
+    fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
+        let mut client = self.client.clone();
+
+        let request = bigtable_v2::ReadRowsRequest {
+            app_profile_id: self.params.app_profile_id.to_string(),
+            table_name: client.get_full_table_name(&self.params.table_name),
+            filter: Some(bigtable_v2::RowFilter {
+                filter: Some(bigtable_v2::row_filter::Filter::FamilyNameRegexFilter(
+                    self.params.family_name.to_string(),
+                )),
+            }),
+            ..Default::default()
+        };
+
+        let stream = try_stream! {
+            // TODO: add pagination, we don't want to hold all of this in memory.
+            let response = client
+                .read_rows(request)
+                .await
+                .map_err(|e| Error::StorageError(format!("unable to read rows: {}", e)))?;
+
+            for (row_key, mut cells) in response {
+                let cell = cells
+                    .pop()
+                    .ok_or_else(|| Error::StorageError("found no cells".into()))?;
+
+                // The cell must have the same qualifier as the row key
+                if row_key != cell.qualifier {
+                    Err(Error::StorageError("unexpected cell qualifier".into()))?;
+                }
+
+                // Ensure there's only one cell (so no more left after the pop())
+                // This shouldn't happen, We filter out other cells in our query.
+                if !cells.is_empty() {
+
+                    Err(Error::StorageError(
+                        "more than one cell returned from bigtable".into(),
+                    ))?
+                }
+
+                // Try to parse the value into a PathInfo message.
+                let path_info = proto::PathInfo::decode(Bytes::from(cell.value))
+                    .map_err(|e| Error::StorageError(format!("unable to decode pathinfo proto: {}", e)))?;
+
+                // Validate the containing PathInfo, ensure its StorePath digest
+                // matches row key.
+                let store_path = path_info
+                    .validate()
+                    .map_err(|e| Error::StorageError(format!("invalid PathInfo: {}", e)))?;
+
+                if store_path.digest().as_slice() != row_key.as_slice() {
+                    Err(Error::StorageError("PathInfo has unexpected digest".into()))?
+                }
+
+
+                yield path_info
+            }
+        };
+
+        Box::pin(stream)
+    }
+}
diff --git a/tvix/store/src/pathinfoservice/combinators.rs b/tvix/store/src/pathinfoservice/combinators.rs
new file mode 100644
index 0000000000..664144ef49
--- /dev/null
+++ b/tvix/store/src/pathinfoservice/combinators.rs
@@ -0,0 +1,111 @@
+use crate::proto::PathInfo;
+use futures::stream::BoxStream;
+use nix_compat::nixbase32;
+use tonic::async_trait;
+use tracing::{debug, instrument};
+use tvix_castore::Error;
+
+use super::PathInfoService;
+
+/// Asks near first, if not found, asks far.
+/// If found in there, returns it, and *inserts* it into
+/// near.
+/// There is no negative cache.
+/// Inserts and listings are not implemented for now.
+pub struct Cache<PS1, PS2> {
+    near: PS1,
+    far: PS2,
+}
+
+impl<PS1, PS2> Cache<PS1, PS2> {
+    pub fn new(near: PS1, far: PS2) -> Self {
+        Self { near, far }
+    }
+}
+
+#[async_trait]
+impl<PS1, PS2> PathInfoService for Cache<PS1, PS2>
+where
+    PS1: PathInfoService,
+    PS2: PathInfoService,
+{
+    #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))]
+    async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
+        match self.near.get(digest).await? {
+            Some(path_info) => {
+                debug!("serving from cache");
+                Ok(Some(path_info))
+            }
+            None => {
+                debug!("not found in near, asking remote…");
+                match self.far.get(digest).await? {
+                    None => Ok(None),
+                    Some(path_info) => {
+                        debug!("found in remote, adding to cache");
+                        self.near.put(path_info.clone()).await?;
+                        Ok(Some(path_info))
+                    }
+                }
+            }
+        }
+    }
+
+    async fn put(&self, _path_info: PathInfo) -> Result<PathInfo, Error> {
+        Err(Error::StorageError("unimplemented".to_string()))
+    }
+
+    fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
+        Box::pin(tokio_stream::once(Err(Error::StorageError(
+            "unimplemented".to_string(),
+        ))))
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use std::num::NonZeroUsize;
+
+    use crate::{
+        pathinfoservice::{LruPathInfoService, MemoryPathInfoService, PathInfoService},
+        tests::fixtures::PATH_INFO_WITH_NARINFO,
+    };
+
+    const PATH_INFO_DIGEST: [u8; 20] = [0; 20];
+
+    /// Helper function setting up an instance of a "far" and "near"
+    /// PathInfoService.
+    async fn create_pathinfoservice() -> super::Cache<LruPathInfoService, MemoryPathInfoService> {
+        // Create an instance of a "far" PathInfoService.
+        let far = MemoryPathInfoService::default();
+
+        // … and an instance of a "near" PathInfoService.
+        let near = LruPathInfoService::with_capacity(NonZeroUsize::new(1).unwrap());
+
+        // create a Pathinfoservice combining the two and return it.
+        super::Cache::new(near, far)
+    }
+
+    /// Getting from the far backend is gonna insert it into the near one.
+    #[tokio::test]
+    async fn test_populate_cache() {
+        let svc = create_pathinfoservice().await;
+
+        // query the PathInfo, things should not be there.
+        assert!(svc.get(PATH_INFO_DIGEST).await.unwrap().is_none());
+
+        // insert it into the far one.
+        svc.far.put(PATH_INFO_WITH_NARINFO.clone()).await.unwrap();
+
+        // now try getting it again, it should succeed.
+        assert_eq!(
+            Some(PATH_INFO_WITH_NARINFO.clone()),
+            svc.get(PATH_INFO_DIGEST).await.unwrap()
+        );
+
+        // peek near, it should now be there.
+        assert_eq!(
+            Some(PATH_INFO_WITH_NARINFO.clone()),
+            svc.near.get(PATH_INFO_DIGEST).await.unwrap()
+        );
+    }
+}
diff --git a/tvix/store/src/pathinfoservice/from_addr.rs b/tvix/store/src/pathinfoservice/from_addr.rs
new file mode 100644
index 0000000000..455909e7f2
--- /dev/null
+++ b/tvix/store/src/pathinfoservice/from_addr.rs
@@ -0,0 +1,236 @@
+use crate::proto::path_info_service_client::PathInfoServiceClient;
+
+use super::{
+    GRPCPathInfoService, MemoryPathInfoService, NixHTTPPathInfoService, PathInfoService,
+    SledPathInfoService,
+};
+
+use nix_compat::narinfo;
+use std::sync::Arc;
+use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService, Error};
+use url::Url;
+
+/// Constructs a new instance of a [PathInfoService] from an URI.
+///
+/// The following URIs are supported:
+/// - `memory:`
+///   Uses a in-memory implementation.
+/// - `sled:`
+///   Uses a in-memory sled implementation.
+/// - `sled:///absolute/path/to/somewhere`
+///   Uses sled, using a path on the disk for persistency. Can be only opened
+///   from one process at the same time.
+/// - `nix+https://cache.nixos.org?trusted-public-keys=cache.nixos.org-1:6NCHdD59X431o0gWypbMrAURkbJ16ZPMQFGspcDShjY=`
+///   Exposes the Nix binary cache as a PathInfoService, ingesting NARs into the
+///   {Blob,Directory}Service. You almost certainly want to use this with some cache.
+///   The `trusted-public-keys` URL parameter can be provided, which will then
+///   enable signature verification.
+/// - `grpc+unix:///absolute/path/to/somewhere`
+///   Connects to a local tvix-store gRPC service via Unix socket.
+/// - `grpc+http://host:port`, `grpc+https://host:port`
+///    Connects to a (remote) tvix-store gRPC service.
+///
+/// As the [PathInfoService] needs to talk to [BlobService] and [DirectoryService],
+/// these also need to be passed in.
+pub async fn from_addr(
+    uri: &str,
+    blob_service: Arc<dyn BlobService>,
+    directory_service: Arc<dyn DirectoryService>,
+) -> Result<Box<dyn PathInfoService>, Error> {
+    #[allow(unused_mut)]
+    let mut url =
+        Url::parse(uri).map_err(|e| Error::StorageError(format!("unable to parse url: {}", e)))?;
+
+    let path_info_service: Box<dyn PathInfoService> = match url.scheme() {
+        "memory" => {
+            // memory doesn't support host or path in the URL.
+            if url.has_host() || !url.path().is_empty() {
+                return Err(Error::StorageError("invalid url".to_string()));
+            }
+            Box::<MemoryPathInfoService>::default()
+        }
+        "sled" => {
+            // sled doesn't support host, and a path can be provided (otherwise
+            // it'll live in memory only).
+            if url.has_host() {
+                return Err(Error::StorageError("no host allowed".to_string()));
+            }
+
+            if url.path() == "/" {
+                return Err(Error::StorageError(
+                    "cowardly refusing to open / with sled".to_string(),
+                ));
+            }
+
+            // TODO: expose other parameters as URL parameters?
+
+            Box::new(if url.path().is_empty() {
+                SledPathInfoService::new_temporary()
+                    .map_err(|e| Error::StorageError(e.to_string()))?
+            } else {
+                SledPathInfoService::new(url.path())
+                    .map_err(|e| Error::StorageError(e.to_string()))?
+            })
+        }
+        "nix+http" | "nix+https" => {
+            // Stringify the URL and remove the nix+ prefix.
+            // We can't use `url.set_scheme(rest)`, as it disallows
+            // setting something http(s) that previously wasn't.
+            let new_url = Url::parse(url.to_string().strip_prefix("nix+").unwrap()).unwrap();
+
+            let mut nix_http_path_info_service =
+                NixHTTPPathInfoService::new(new_url, blob_service, directory_service);
+
+            let pairs = &url.query_pairs();
+            for (k, v) in pairs.into_iter() {
+                if k == "trusted-public-keys" {
+                    let pubkey_strs: Vec<_> = v.split_ascii_whitespace().collect();
+
+                    let mut pubkeys: Vec<narinfo::PubKey> = Vec::with_capacity(pubkey_strs.len());
+                    for pubkey_str in pubkey_strs {
+                        pubkeys.push(narinfo::PubKey::parse(pubkey_str).map_err(|e| {
+                            Error::StorageError(format!("invalid public key: {e}"))
+                        })?);
+                    }
+
+                    nix_http_path_info_service.set_public_keys(pubkeys);
+                }
+            }
+
+            Box::new(nix_http_path_info_service)
+        }
+        scheme if scheme.starts_with("grpc+") => {
+            // schemes starting with grpc+ go to the GRPCPathInfoService.
+            //   That's normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts.
+            // - In the case of unix sockets, there must be a path, but may not be a host.
+            // - In the case of non-unix sockets, there must be a host, but no path.
+            // Constructing the channel is handled by tvix_castore::channel::from_url.
+            let client =
+                PathInfoServiceClient::new(tvix_castore::tonic::channel_from_url(&url).await?);
+            Box::new(GRPCPathInfoService::from_client(client))
+        }
+        #[cfg(feature = "cloud")]
+        "bigtable" => {
+            use super::bigtable::BigtableParameters;
+            use super::BigtablePathInfoService;
+
+            // parse the instance name from the hostname.
+            let instance_name = url
+                .host_str()
+                .ok_or_else(|| Error::StorageError("instance name missing".into()))?
+                .to_string();
+
+            // … but add it to the query string now, so we just need to parse that.
+            url.query_pairs_mut()
+                .append_pair("instance_name", &instance_name);
+
+            let params: BigtableParameters = serde_qs::from_str(url.query().unwrap_or_default())
+                .map_err(|e| Error::InvalidRequest(format!("failed to parse parameters: {}", e)))?;
+
+            Box::new(
+                BigtablePathInfoService::connect(params)
+                    .await
+                    .map_err(|e| Error::StorageError(e.to_string()))?,
+            )
+        }
+        _ => Err(Error::StorageError(format!(
+            "unknown scheme: {}",
+            url.scheme()
+        )))?,
+    };
+
+    Ok(path_info_service)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::from_addr;
+    use lazy_static::lazy_static;
+    use rstest::rstest;
+    use std::sync::Arc;
+    use tempfile::TempDir;
+    use tvix_castore::{
+        blobservice::{BlobService, MemoryBlobService},
+        directoryservice::{DirectoryService, MemoryDirectoryService},
+    };
+
+    lazy_static! {
+        static ref TMPDIR_SLED_1: TempDir = TempDir::new().unwrap();
+        static ref TMPDIR_SLED_2: TempDir = TempDir::new().unwrap();
+    }
+
+    // the gRPC tests below don't fail, because we connect lazily.
+
+    #[rstest]
+    /// This uses a unsupported scheme.
+    #[case::unsupported_scheme("http://foo.example/test", false)]
+    /// This configures sled in temporary mode.
+    #[case::sled_temporary("sled://", true)]
+    /// This configures sled with /, which should fail.
+    #[case::sled_invalid_root("sled:///", false)]
+    /// This configures sled with a host, not path, which should fail.
+    #[case::sled_invalid_host("sled://foo.example", false)]
+    /// This configures sled with a valid path path, which should succeed.
+    #[case::sled_valid_path(&format!("sled://{}", &TMPDIR_SLED_1.path().to_str().unwrap()), true)]
+    /// This configures sled with a host, and a valid path path, which should fail.
+    #[case::sled_invalid_host_with_valid_path(&format!("sled://foo.example{}", &TMPDIR_SLED_2.path().to_str().unwrap()), false)]
+    /// This correctly sets the scheme, and doesn't set a path.
+    #[case::memory_valid("memory://", true)]
+    /// This sets a memory url host to `foo`
+    #[case::memory_invalid_host("memory://foo", false)]
+    /// This sets a memory url path to "/", which is invalid.
+    #[case::memory_invalid_root_path("memory:///", false)]
+    /// This sets a memory url path to "/foo", which is invalid.
+    #[case::memory_invalid_root_path_foo("memory:///foo", false)]
+    /// Correct Scheme for the cache.nixos.org binary cache.
+    #[case::correct_nix_https("nix+https://cache.nixos.org", true)]
+    /// Correct Scheme for the cache.nixos.org binary cache (HTTP URL).
+    #[case::correct_nix_http("nix+http://cache.nixos.org", true)]
+    /// Correct Scheme for Nix HTTP Binary cache, with a subpath.
+    #[case::correct_nix_http_with_subpath("nix+http://192.0.2.1/foo", true)]
+    /// Correct Scheme for Nix HTTP Binary cache, with a subpath and port.
+    #[case::correct_nix_http_with_subpath_and_port("nix+http://[::1]:8080/foo", true)]
+    /// Correct Scheme for the cache.nixos.org binary cache, and correct trusted public key set
+    #[case::correct_nix_https_with_trusted_public_key("nix+https://cache.nixos.org?trusted-public-keys=cache.nixos.org-1:6NCHdD59X431o0gWypbMrAURkbJ16ZPMQFGspcDShjY=", true)]
+    /// Correct Scheme for the cache.nixos.org binary cache, and two correct trusted public keys set
+    #[case::correct_nix_https_with_two_trusted_public_keys("nix+https://cache.nixos.org?trusted-public-keys=cache.nixos.org-1:6NCHdD59X431o0gWypbMrAURkbJ16ZPMQFGspcDShjY=%20foo:jp4fCEx9tBEId/L0ZsVJ26k0wC0fu7vJqLjjIGFkup8=", true)]
+    /// Correct scheme to connect to a unix socket.
+    #[case::grpc_valid_unix_socket("grpc+unix:///path/to/somewhere", true)]
+    /// Correct scheme for unix socket, but setting a host too, which is invalid.
+    #[case::grpc_invalid_unix_socket_and_host("grpc+unix://host.example/path/to/somewhere", false)]
+    /// Correct scheme to connect to localhost, with port 12345
+    #[case::grpc_valid_ipv6_localhost_port_12345("grpc+http://[::1]:12345", true)]
+    /// Correct scheme to connect to localhost over http, without specifying a port.
+    #[case::grpc_valid_http_host_without_port("grpc+http://localhost", true)]
+    /// Correct scheme to connect to localhost over http, without specifying a port.
+    #[case::grpc_valid_https_host_without_port("grpc+https://localhost", true)]
+    /// Correct scheme to connect to localhost over http, but with additional path, which is invalid.
+    #[case::grpc_invalid_host_and_path("grpc+http://localhost/some-path", false)]
+    /// A valid example for Bigtable.
+    #[cfg_attr(
+        all(feature = "cloud", feature = "integration"),
+        case::bigtable_valid(
+            "bigtable://instance-1?project_id=project-1&table_name=table-1&family_name=cf1",
+            true
+        )
+    )]
+    /// An invalid example for Bigtable, missing fields
+    #[cfg_attr(
+        all(feature = "cloud", feature = "integration"),
+        case::bigtable_invalid_missing_fields("bigtable://instance-1", false)
+    )]
+    #[tokio::test]
+    async fn test_from_addr_tokio(#[case] uri_str: &str, #[case] exp_succeed: bool) {
+        let blob_service: Arc<dyn BlobService> = Arc::from(MemoryBlobService::default());
+        let directory_service: Arc<dyn DirectoryService> =
+            Arc::from(MemoryDirectoryService::default());
+
+        let resp = from_addr(uri_str, blob_service, directory_service).await;
+
+        if exp_succeed {
+            resp.expect("should succeed");
+        } else {
+            assert!(resp.is_err(), "should fail");
+        }
+    }
+}
diff --git a/tvix/store/src/pathinfoservice/fs/mod.rs b/tvix/store/src/pathinfoservice/fs/mod.rs
new file mode 100644
index 0000000000..aa64b1c01f
--- /dev/null
+++ b/tvix/store/src/pathinfoservice/fs/mod.rs
@@ -0,0 +1,81 @@
+use futures::stream::BoxStream;
+use futures::StreamExt;
+use tonic::async_trait;
+use tvix_castore::fs::{RootNodes, TvixStoreFs};
+use tvix_castore::proto as castorepb;
+use tvix_castore::Error;
+use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService};
+
+use super::PathInfoService;
+
+/// Helper to construct a [TvixStoreFs] from a [BlobService], [DirectoryService]
+/// and [PathInfoService].
+/// This avoids users to have to interact with the wrapper struct directly, as
+/// it leaks into the type signature of TvixStoreFS.
+pub fn make_fs<BS, DS, PS>(
+    blob_service: BS,
+    directory_service: DS,
+    path_info_service: PS,
+    list_root: bool,
+    show_xattr: bool,
+) -> TvixStoreFs<BS, DS, RootNodesWrapper<PS>>
+where
+    BS: AsRef<dyn BlobService> + Send + Clone + 'static,
+    DS: AsRef<dyn DirectoryService> + Send + Clone + 'static,
+    PS: AsRef<dyn PathInfoService> + Send + Sync + Clone + 'static,
+{
+    TvixStoreFs::new(
+        blob_service,
+        directory_service,
+        RootNodesWrapper(path_info_service),
+        list_root,
+        show_xattr,
+    )
+}
+
+/// Wrapper to satisfy Rust's orphan rules for trait implementations, as
+/// RootNodes is coming from the [tvix-castore] crate.
+#[doc(hidden)]
+#[derive(Clone, Debug)]
+pub struct RootNodesWrapper<T>(pub(crate) T);
+
+/// Implements root node lookup for any [PathInfoService]. This represents a flat
+/// directory structure like /nix/store where each entry in the root filesystem
+/// directory corresponds to a CA node.
+#[cfg(any(feature = "fuse", feature = "virtiofs"))]
+#[async_trait]
+impl<T> RootNodes for RootNodesWrapper<T>
+where
+    T: AsRef<dyn PathInfoService> + Send + Sync,
+{
+    async fn get_by_basename(&self, name: &[u8]) -> Result<Option<castorepb::node::Node>, Error> {
+        let Ok(store_path) = nix_compat::store_path::StorePath::from_bytes(name) else {
+            return Ok(None);
+        };
+
+        Ok(self
+            .0
+            .as_ref()
+            .get(*store_path.digest())
+            .await?
+            .map(|path_info| {
+                path_info
+                    .node
+                    .expect("missing root node")
+                    .node
+                    .expect("empty node")
+            }))
+    }
+
+    fn list(&self) -> BoxStream<Result<castorepb::node::Node, Error>> {
+        Box::pin(self.0.as_ref().list().map(|result| {
+            result.map(|path_info| {
+                path_info
+                    .node
+                    .expect("missing root node")
+                    .node
+                    .expect("empty node")
+            })
+        }))
+    }
+}
diff --git a/tvix/store/src/pathinfoservice/grpc.rs b/tvix/store/src/pathinfoservice/grpc.rs
new file mode 100644
index 0000000000..f6a356cf18
--- /dev/null
+++ b/tvix/store/src/pathinfoservice/grpc.rs
@@ -0,0 +1,154 @@
+use super::PathInfoService;
+use crate::{
+    nar::NarCalculationService,
+    proto::{self, ListPathInfoRequest, PathInfo},
+};
+use async_stream::try_stream;
+use futures::stream::BoxStream;
+use nix_compat::nixbase32;
+use tonic::{async_trait, transport::Channel, Code};
+use tracing::instrument;
+use tvix_castore::{proto as castorepb, Error};
+
+/// Connects to a (remote) tvix-store PathInfoService over gRPC.
+#[derive(Clone)]
+pub struct GRPCPathInfoService {
+    /// The internal reference to a gRPC client.
+    /// Cloning it is cheap, and it internally handles concurrent requests.
+    grpc_client: proto::path_info_service_client::PathInfoServiceClient<Channel>,
+}
+
+impl GRPCPathInfoService {
+    /// construct a [GRPCPathInfoService] from a [proto::path_info_service_client::PathInfoServiceClient].
+    /// panics if called outside the context of a tokio runtime.
+    pub fn from_client(
+        grpc_client: proto::path_info_service_client::PathInfoServiceClient<Channel>,
+    ) -> Self {
+        Self { grpc_client }
+    }
+}
+
+#[async_trait]
+impl PathInfoService for GRPCPathInfoService {
+    #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))]
+    async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
+        let path_info = self
+            .grpc_client
+            .clone()
+            .get(proto::GetPathInfoRequest {
+                by_what: Some(proto::get_path_info_request::ByWhat::ByOutputHash(
+                    digest.to_vec().into(),
+                )),
+            })
+            .await;
+
+        match path_info {
+            Ok(path_info) => {
+                let path_info = path_info.into_inner();
+
+                path_info
+                    .validate()
+                    .map_err(|e| Error::StorageError(format!("invalid pathinfo: {}", e)))?;
+
+                Ok(Some(path_info))
+            }
+            Err(e) if e.code() == Code::NotFound => Ok(None),
+            Err(e) => Err(Error::StorageError(e.to_string())),
+        }
+    }
+
+    #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node))]
+    async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> {
+        let path_info = self
+            .grpc_client
+            .clone()
+            .put(path_info)
+            .await
+            .map_err(|e| Error::StorageError(e.to_string()))?
+            .into_inner();
+
+        Ok(path_info)
+    }
+
+    #[instrument(level = "trace", skip_all)]
+    fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
+        let mut grpc_client = self.grpc_client.clone();
+
+        let stream = try_stream! {
+            let resp = grpc_client.list(ListPathInfoRequest::default()).await;
+
+            let mut stream = resp.map_err(|e| Error::StorageError(e.to_string()))?.into_inner();
+
+            loop {
+                match stream.message().await {
+                    Ok(o) => match o {
+                        Some(pathinfo) => {
+                            // validate the pathinfo
+                            if let Err(e) = pathinfo.validate() {
+                                Err(Error::StorageError(format!(
+                                    "pathinfo {:?} failed validation: {}",
+                                    pathinfo, e
+                                )))?;
+                            }
+                            yield pathinfo
+                        }
+                        None => {
+                            return;
+                        },
+                    },
+                    Err(e) => Err(Error::StorageError(e.to_string()))?,
+                }
+            }
+        };
+
+        Box::pin(stream)
+    }
+}
+
+#[async_trait]
+impl NarCalculationService for GRPCPathInfoService {
+    #[instrument(level = "trace", skip_all, fields(root_node = ?root_node))]
+    async fn calculate_nar(
+        &self,
+        root_node: &castorepb::node::Node,
+    ) -> Result<(u64, [u8; 32]), Error> {
+        let path_info = self
+            .grpc_client
+            .clone()
+            .calculate_nar(castorepb::Node {
+                node: Some(root_node.clone()),
+            })
+            .await
+            .map_err(|e| Error::StorageError(e.to_string()))?
+            .into_inner();
+
+        let nar_sha256: [u8; 32] = path_info
+            .nar_sha256
+            .to_vec()
+            .try_into()
+            .map_err(|_e| Error::StorageError("invalid digest length".to_string()))?;
+
+        Ok((path_info.nar_size, nar_sha256))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::pathinfoservice::tests::make_grpc_path_info_service_client;
+    use crate::pathinfoservice::PathInfoService;
+    use crate::tests::fixtures;
+
+    /// This ensures connecting via gRPC works as expected.
+    #[tokio::test]
+    async fn test_valid_unix_path_ping_pong() {
+        let (_blob_service, _directory_service, path_info_service) =
+            make_grpc_path_info_service_client().await;
+
+        let path_info = path_info_service
+            .get(fixtures::DUMMY_PATH_DIGEST)
+            .await
+            .expect("must not be error");
+
+        assert!(path_info.is_none());
+    }
+}
diff --git a/tvix/store/src/pathinfoservice/lru.rs b/tvix/store/src/pathinfoservice/lru.rs
new file mode 100644
index 0000000000..da674f497a
--- /dev/null
+++ b/tvix/store/src/pathinfoservice/lru.rs
@@ -0,0 +1,128 @@
+use async_stream::try_stream;
+use futures::stream::BoxStream;
+use lru::LruCache;
+use nix_compat::nixbase32;
+use std::num::NonZeroUsize;
+use std::sync::Arc;
+use tokio::sync::RwLock;
+use tonic::async_trait;
+use tracing::instrument;
+
+use crate::proto::PathInfo;
+use tvix_castore::Error;
+
+use super::PathInfoService;
+
+pub struct LruPathInfoService {
+    lru: Arc<RwLock<LruCache<[u8; 20], PathInfo>>>,
+}
+
+impl LruPathInfoService {
+    pub fn with_capacity(capacity: NonZeroUsize) -> Self {
+        Self {
+            lru: Arc::new(RwLock::new(LruCache::new(capacity))),
+        }
+    }
+}
+
+#[async_trait]
+impl PathInfoService for LruPathInfoService {
+    #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))]
+    async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
+        Ok(self.lru.write().await.get(&digest).cloned())
+    }
+
+    #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node))]
+    async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> {
+        // call validate
+        let store_path = path_info
+            .validate()
+            .map_err(|e| Error::InvalidRequest(format!("invalid PathInfo: {}", e)))?;
+
+        self.lru
+            .write()
+            .await
+            .put(*store_path.digest(), path_info.clone());
+
+        Ok(path_info)
+    }
+
+    fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
+        let lru = self.lru.clone();
+        Box::pin(try_stream! {
+            let lru = lru.read().await;
+            let it = lru.iter();
+
+            for (_k,v) in it {
+                yield v.clone()
+            }
+        })
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use std::num::NonZeroUsize;
+
+    use crate::{
+        pathinfoservice::{LruPathInfoService, PathInfoService},
+        proto::PathInfo,
+        tests::fixtures::PATH_INFO_WITH_NARINFO,
+    };
+    use lazy_static::lazy_static;
+    use tvix_castore::proto as castorepb;
+
+    lazy_static! {
+        static ref PATHINFO_1: PathInfo = PATH_INFO_WITH_NARINFO.clone();
+        static ref PATHINFO_1_DIGEST: [u8; 20] = [0; 20];
+        static ref PATHINFO_2: PathInfo = {
+            let mut p = PATHINFO_1.clone();
+            let root_node = p.node.as_mut().unwrap();
+            if let castorepb::Node { node: Some(node) } = root_node {
+                let n = node.to_owned();
+                *node = n.rename("11111111111111111111111111111111-dummy2".into());
+            } else {
+                unreachable!()
+            }
+            p
+        };
+        static ref PATHINFO_2_DIGEST: [u8; 20] = *(PATHINFO_2.validate().unwrap()).digest();
+    }
+
+    #[tokio::test]
+    async fn evict() {
+        let svc = LruPathInfoService::with_capacity(NonZeroUsize::new(1).unwrap());
+
+        // pathinfo_1 should not be there
+        assert!(svc
+            .get(*PATHINFO_1_DIGEST)
+            .await
+            .expect("no error")
+            .is_none());
+
+        // insert it
+        svc.put(PATHINFO_1.clone()).await.expect("no error");
+
+        // now it should be there.
+        assert_eq!(
+            Some(PATHINFO_1.clone()),
+            svc.get(*PATHINFO_1_DIGEST).await.expect("no error")
+        );
+
+        // insert pathinfo_2. This will evict pathinfo 1
+        svc.put(PATHINFO_2.clone()).await.expect("no error");
+
+        // now pathinfo 2 should be there.
+        assert_eq!(
+            Some(PATHINFO_2.clone()),
+            svc.get(*PATHINFO_2_DIGEST).await.expect("no error")
+        );
+
+        // … but pathinfo 1 not anymore.
+        assert!(svc
+            .get(*PATHINFO_1_DIGEST)
+            .await
+            .expect("no error")
+            .is_none());
+    }
+}
diff --git a/tvix/store/src/pathinfoservice/memory.rs b/tvix/store/src/pathinfoservice/memory.rs
new file mode 100644
index 0000000000..3de3221df2
--- /dev/null
+++ b/tvix/store/src/pathinfoservice/memory.rs
@@ -0,0 +1,61 @@
+use super::PathInfoService;
+use crate::proto::PathInfo;
+use async_stream::try_stream;
+use futures::stream::BoxStream;
+use nix_compat::nixbase32;
+use std::{collections::HashMap, sync::Arc};
+use tokio::sync::RwLock;
+use tonic::async_trait;
+use tracing::instrument;
+use tvix_castore::Error;
+
+#[derive(Default)]
+pub struct MemoryPathInfoService {
+    db: Arc<RwLock<HashMap<[u8; 20], PathInfo>>>,
+}
+
+#[async_trait]
+impl PathInfoService for MemoryPathInfoService {
+    #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))]
+    async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
+        let db = self.db.read().await;
+
+        match db.get(&digest) {
+            None => Ok(None),
+            Some(path_info) => Ok(Some(path_info.clone())),
+        }
+    }
+
+    #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node))]
+    async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> {
+        // Call validate on the received PathInfo message.
+        match path_info.validate() {
+            Err(e) => Err(Error::InvalidRequest(format!(
+                "failed to validate PathInfo: {}",
+                e
+            ))),
+
+            // In case the PathInfo is valid, and we were able to extract a NixPath, store it in the database.
+            // This overwrites existing PathInfo objects.
+            Ok(nix_path) => {
+                let mut db = self.db.write().await;
+                db.insert(*nix_path.digest(), path_info.clone());
+
+                Ok(path_info)
+            }
+        }
+    }
+
+    fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
+        let db = self.db.clone();
+
+        Box::pin(try_stream! {
+            let db = db.read().await;
+            let it = db.iter();
+
+            for (_k, v) in it {
+                yield v.clone()
+            }
+        })
+    }
+}
diff --git a/tvix/store/src/pathinfoservice/mod.rs b/tvix/store/src/pathinfoservice/mod.rs
new file mode 100644
index 0000000000..574bcc0b8b
--- /dev/null
+++ b/tvix/store/src/pathinfoservice/mod.rs
@@ -0,0 +1,73 @@
+mod combinators;
+mod from_addr;
+mod grpc;
+mod lru;
+mod memory;
+mod nix_http;
+mod sled;
+
+#[cfg(any(feature = "fuse", feature = "virtiofs"))]
+mod fs;
+
+#[cfg(test)]
+mod tests;
+
+use futures::stream::BoxStream;
+use tonic::async_trait;
+use tvix_castore::Error;
+
+use crate::proto::PathInfo;
+
+pub use self::combinators::Cache as CachePathInfoService;
+pub use self::from_addr::from_addr;
+pub use self::grpc::GRPCPathInfoService;
+pub use self::lru::LruPathInfoService;
+pub use self::memory::MemoryPathInfoService;
+pub use self::nix_http::NixHTTPPathInfoService;
+pub use self::sled::SledPathInfoService;
+
+#[cfg(feature = "cloud")]
+mod bigtable;
+#[cfg(feature = "cloud")]
+pub use self::bigtable::BigtablePathInfoService;
+
+#[cfg(any(feature = "fuse", feature = "virtiofs"))]
+pub use self::fs::make_fs;
+
+/// The base trait all PathInfo services need to implement.
+#[async_trait]
+pub trait PathInfoService: Send + Sync {
+    /// Retrieve a PathInfo message by the output digest.
+    async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error>;
+
+    /// Store a PathInfo message. Implementations MUST call validate and reject
+    /// invalid messages.
+    async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error>;
+
+    /// Iterate over all PathInfo objects in the store.
+    /// Implementations can decide to disallow listing.
+    ///
+    /// This returns a pinned, boxed stream. The pinning allows for it to be polled easily,
+    /// and the box allows different underlying stream implementations to be returned since
+    /// Rust doesn't support this as a generic in traits yet. This is the same thing that
+    /// [async_trait] generates, but for streams instead of futures.
+    fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>>;
+}
+
+#[async_trait]
+impl<A> PathInfoService for A
+where
+    A: AsRef<dyn PathInfoService> + Send + Sync,
+{
+    async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
+        self.as_ref().get(digest).await
+    }
+
+    async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> {
+        self.as_ref().put(path_info).await
+    }
+
+    fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
+        self.as_ref().list()
+    }
+}
diff --git a/tvix/store/src/pathinfoservice/nix_http.rs b/tvix/store/src/pathinfoservice/nix_http.rs
new file mode 100644
index 0000000000..cccd4805c6
--- /dev/null
+++ b/tvix/store/src/pathinfoservice/nix_http.rs
@@ -0,0 +1,266 @@
+use futures::{stream::BoxStream, TryStreamExt};
+use nix_compat::{
+    narinfo::{self, NarInfo},
+    nixbase32,
+    nixhash::NixHash,
+};
+use reqwest::StatusCode;
+use sha2::Digest;
+use std::io::{self, Write};
+use tokio::io::{AsyncRead, BufReader};
+use tokio_util::io::InspectReader;
+use tonic::async_trait;
+use tracing::{debug, instrument, warn};
+use tvix_castore::{
+    blobservice::BlobService, directoryservice::DirectoryService, proto as castorepb, Error,
+};
+
+use crate::proto::PathInfo;
+
+use super::PathInfoService;
+
+/// NixHTTPPathInfoService acts as a bridge in between the Nix HTTP Binary cache
+/// protocol provided by Nix binary caches such as cache.nixos.org, and the Tvix
+/// Store Model.
+/// It implements the [PathInfoService] trait in an interesting way:
+/// Every [PathInfoService::get] fetches the .narinfo and referred NAR file,
+/// inserting components into a [BlobService] and [DirectoryService], then
+/// returning a [PathInfo] struct with the root.
+///
+/// Due to this being quite a costly operation, clients are expected to layer
+/// this service with store composition, so they're only ingested once.
+///
+/// The client is expected to be (indirectly) using the same [BlobService] and
+/// [DirectoryService], so able to fetch referred Directories and Blobs.
+/// [PathInfoService::put] is not implemented and returns an error if called.
+/// TODO: what about reading from nix-cache-info?
+pub struct NixHTTPPathInfoService<BS, DS> {
+    base_url: url::Url,
+    http_client: reqwest::Client,
+
+    blob_service: BS,
+    directory_service: DS,
+
+    /// An optional list of [narinfo::PubKey].
+    /// If set, the .narinfo files received need to have correct signature by at least one of these.
+    public_keys: Option<Vec<narinfo::PubKey>>,
+}
+
+impl<BS, DS> NixHTTPPathInfoService<BS, DS> {
+    pub fn new(base_url: url::Url, blob_service: BS, directory_service: DS) -> Self {
+        Self {
+            base_url,
+            http_client: reqwest::Client::new(),
+            blob_service,
+            directory_service,
+
+            public_keys: None,
+        }
+    }
+
+    /// Configures [Self] to validate NARInfo fingerprints with the public keys passed.
+    pub fn set_public_keys(&mut self, public_keys: Vec<narinfo::PubKey>) {
+        self.public_keys = Some(public_keys);
+    }
+}
+
+#[async_trait]
+impl<BS, DS> PathInfoService for NixHTTPPathInfoService<BS, DS>
+where
+    BS: AsRef<dyn BlobService> + Send + Sync + Clone + 'static,
+    DS: AsRef<dyn DirectoryService> + Send + Sync + Clone + 'static,
+{
+    #[instrument(skip_all, err, fields(path.digest=nixbase32::encode(&digest)))]
+    async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
+        let narinfo_url = self
+            .base_url
+            .join(&format!("{}.narinfo", nixbase32::encode(&digest)))
+            .map_err(|e| {
+                warn!(e = %e, "unable to join URL");
+                io::Error::new(io::ErrorKind::InvalidInput, "unable to join url")
+            })?;
+
+        debug!(narinfo_url= %narinfo_url, "constructed NARInfo url");
+
+        let resp = self
+            .http_client
+            .get(narinfo_url)
+            .send()
+            .await
+            .map_err(|e| {
+                warn!(e=%e,"unable to send NARInfo request");
+                io::Error::new(
+                    io::ErrorKind::InvalidInput,
+                    "unable to send NARInfo request",
+                )
+            })?;
+
+        // In the case of a 404, return a NotFound.
+        // We also return a NotFound in case of a 403 - this is to match the behaviour as Nix,
+        // when querying nix-cache.s3.amazonaws.com directly, rather than cache.nixos.org.
+        if resp.status() == StatusCode::NOT_FOUND || resp.status() == StatusCode::FORBIDDEN {
+            return Ok(None);
+        }
+
+        let narinfo_str = resp.text().await.map_err(|e| {
+            warn!(e=%e,"unable to decode response as string");
+            io::Error::new(
+                io::ErrorKind::InvalidData,
+                "unable to decode response as string",
+            )
+        })?;
+
+        // parse the received narinfo
+        let narinfo = NarInfo::parse(&narinfo_str).map_err(|e| {
+            warn!(e=%e,"unable to parse response as NarInfo");
+            io::Error::new(
+                io::ErrorKind::InvalidData,
+                "unable to parse response as NarInfo",
+            )
+        })?;
+
+        // if [self.public_keys] is set, ensure there's at least one valid signature.
+        if let Some(public_keys) = &self.public_keys {
+            let fingerprint = narinfo.fingerprint();
+
+            if !public_keys.iter().any(|pubkey| {
+                narinfo
+                    .signatures
+                    .iter()
+                    .any(|sig| pubkey.verify(&fingerprint, sig))
+            }) {
+                warn!("no valid signature found");
+                Err(io::Error::new(
+                    io::ErrorKind::InvalidData,
+                    "no valid signature found",
+                ))?;
+            }
+        }
+
+        // Convert to a (sparse) PathInfo. We still need to populate the node field,
+        // and for this we need to download the NAR file.
+        // FUTUREWORK: Keep some database around mapping from narsha256 to
+        // (unnamed) rootnode, so we can use that (and the name from the
+        // StorePath) and avoid downloading the same NAR a second time.
+        let pathinfo: PathInfo = (&narinfo).into();
+
+        // create a request for the NAR file itself.
+        let nar_url = self.base_url.join(narinfo.url).map_err(|e| {
+            warn!(e = %e, "unable to join URL");
+            io::Error::new(io::ErrorKind::InvalidInput, "unable to join url")
+        })?;
+        debug!(nar_url= %nar_url, "constructed NAR url");
+
+        let resp = self
+            .http_client
+            .get(nar_url.clone())
+            .send()
+            .await
+            .map_err(|e| {
+                warn!(e=%e,"unable to send NAR request");
+                io::Error::new(io::ErrorKind::InvalidInput, "unable to send NAR request")
+            })?;
+
+        // if the request is not successful, return an error.
+        if !resp.status().is_success() {
+            return Err(Error::StorageError(format!(
+                "unable to retrieve NAR at {}, status {}",
+                nar_url,
+                resp.status()
+            )));
+        }
+
+        // get a reader of the response body.
+        let r = tokio_util::io::StreamReader::new(resp.bytes_stream().map_err(|e| {
+            let e = e.without_url();
+            warn!(e=%e, "failed to get response body");
+            io::Error::new(io::ErrorKind::BrokenPipe, e.to_string())
+        }));
+
+        // handle decompression, depending on the compression field.
+        let r: Box<dyn AsyncRead + Send + Unpin> = match narinfo.compression {
+            Some("none") => Box::new(r) as Box<dyn AsyncRead + Send + Unpin>,
+            Some("bzip2") | None => Box::new(async_compression::tokio::bufread::BzDecoder::new(r))
+                as Box<dyn AsyncRead + Send + Unpin>,
+            Some("gzip") => Box::new(async_compression::tokio::bufread::GzipDecoder::new(r))
+                as Box<dyn AsyncRead + Send + Unpin>,
+            Some("xz") => Box::new(async_compression::tokio::bufread::XzDecoder::new(r))
+                as Box<dyn AsyncRead + Send + Unpin>,
+            Some("zstd") => Box::new(async_compression::tokio::bufread::ZstdDecoder::new(r))
+                as Box<dyn AsyncRead + Send + Unpin>,
+            Some(comp_str) => {
+                return Err(Error::StorageError(format!(
+                    "unsupported compression: {comp_str}"
+                )));
+            }
+        };
+        let mut nar_hash = sha2::Sha256::new();
+        let mut nar_size = 0;
+
+        // Assemble NarHash and NarSize as we read bytes.
+        let r = InspectReader::new(r, |b| {
+            nar_size += b.len() as u64;
+            nar_hash.write_all(b).unwrap();
+        });
+
+        // HACK: InspectReader doesn't implement AsyncBufRead, but neither do our decompressors.
+        let mut r = BufReader::new(r);
+
+        let root_node = crate::nar::ingest_nar(
+            self.blob_service.clone(),
+            self.directory_service.clone(),
+            &mut r,
+        )
+        .await
+        .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
+
+        // ensure the ingested narhash and narsize do actually match.
+        if narinfo.nar_size != nar_size {
+            warn!(
+                narinfo.nar_size = narinfo.nar_size,
+                http.nar_size = nar_size,
+                "NarSize mismatch"
+            );
+            Err(io::Error::new(
+                io::ErrorKind::InvalidData,
+                "NarSize mismatch".to_string(),
+            ))?;
+        }
+        let nar_hash: [u8; 32] = nar_hash.finalize().into();
+        if narinfo.nar_hash != nar_hash {
+            warn!(
+                narinfo.nar_hash = %NixHash::Sha256(narinfo.nar_hash),
+                http.nar_hash = %NixHash::Sha256(nar_hash),
+                "NarHash mismatch"
+            );
+            Err(io::Error::new(
+                io::ErrorKind::InvalidData,
+                "NarHash mismatch".to_string(),
+            ))?;
+        }
+
+        Ok(Some(PathInfo {
+            node: Some(castorepb::Node {
+                // set the name of the root node to the digest-name of the store path.
+                node: Some(root_node.rename(narinfo.store_path.to_string().to_owned().into())),
+            }),
+            references: pathinfo.references,
+            narinfo: pathinfo.narinfo,
+        }))
+    }
+
+    #[instrument(skip_all, fields(path_info=?_path_info))]
+    async fn put(&self, _path_info: PathInfo) -> Result<PathInfo, Error> {
+        Err(Error::InvalidRequest(
+            "put not supported for this backend".to_string(),
+        ))
+    }
+
+    fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
+        Box::pin(futures::stream::once(async {
+            Err(Error::InvalidRequest(
+                "list not supported for this backend".to_string(),
+            ))
+        }))
+    }
+}
diff --git a/tvix/store/src/pathinfoservice/sled.rs b/tvix/store/src/pathinfoservice/sled.rs
new file mode 100644
index 0000000000..eb3cf2ff1b
--- /dev/null
+++ b/tvix/store/src/pathinfoservice/sled.rs
@@ -0,0 +1,117 @@
+use super::PathInfoService;
+use crate::proto::PathInfo;
+use async_stream::try_stream;
+use futures::stream::BoxStream;
+use nix_compat::nixbase32;
+use prost::Message;
+use std::path::Path;
+use tonic::async_trait;
+use tracing::instrument;
+use tracing::warn;
+use tvix_castore::Error;
+
+/// SledPathInfoService stores PathInfo in a [sled](https://github.com/spacejam/sled).
+///
+/// The PathInfo messages are stored as encoded protos, and keyed by their output hash,
+/// as that's currently the only request type available.
+pub struct SledPathInfoService {
+    db: sled::Db,
+}
+
+impl SledPathInfoService {
+    pub fn new<P: AsRef<Path>>(p: P) -> Result<Self, sled::Error> {
+        let config = sled::Config::default()
+            .use_compression(false) // is a required parameter
+            .path(p);
+        let db = config.open()?;
+
+        Ok(Self { db })
+    }
+
+    pub fn new_temporary() -> Result<Self, sled::Error> {
+        let config = sled::Config::default().temporary(true);
+        let db = config.open()?;
+
+        Ok(Self { db })
+    }
+}
+
+#[async_trait]
+impl PathInfoService for SledPathInfoService {
+    #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))]
+    async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
+        let resp = tokio::task::spawn_blocking({
+            let db = self.db.clone();
+            move || db.get(digest.as_slice())
+        })
+        .await?
+        .map_err(|e| {
+            warn!("failed to retrieve PathInfo: {}", e);
+            Error::StorageError(format!("failed to retrieve PathInfo: {}", e))
+        })?;
+        match resp {
+            None => Ok(None),
+            Some(data) => {
+                let path_info = PathInfo::decode(&*data).map_err(|e| {
+                    warn!("failed to decode stored PathInfo: {}", e);
+                    Error::StorageError(format!("failed to decode stored PathInfo: {}", e))
+                })?;
+                Ok(Some(path_info))
+            }
+        }
+    }
+
+    #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node))]
+    async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> {
+        // Call validate on the received PathInfo message.
+        let store_path = path_info
+            .validate()
+            .map_err(|e| Error::InvalidRequest(format!("failed to validate PathInfo: {}", e)))?;
+
+        // In case the PathInfo is valid, we were able to parse a StorePath.
+        // Store it in the database, keyed by its digest.
+        // This overwrites existing PathInfo objects.
+        tokio::task::spawn_blocking({
+            let db = self.db.clone();
+            let k = *store_path.digest();
+            let data = path_info.encode_to_vec();
+            move || db.insert(k, data)
+        })
+        .await?
+        .map_err(|e| {
+            warn!("failed to insert PathInfo: {}", e);
+            Error::StorageError(format! {
+                "failed to insert PathInfo: {}", e
+            })
+        })?;
+
+        Ok(path_info)
+    }
+
+    fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
+        let db = self.db.clone();
+        let mut it = db.iter().values();
+
+        Box::pin(try_stream! {
+            // Don't block the executor while waiting for .next(), so wrap that
+            // in a spawn_blocking call.
+            // We need to pass around it to be able to reuse it.
+            while let (Some(elem), new_it) = tokio::task::spawn_blocking(move || {
+                (it.next(), it)
+            }).await? {
+                it = new_it;
+                let data = elem.map_err(|e| {
+                    warn!("failed to retrieve PathInfo: {}", e);
+                    Error::StorageError(format!("failed to retrieve PathInfo: {}", e))
+                })?;
+
+                let path_info = PathInfo::decode(&*data).map_err(|e| {
+                    warn!("failed to decode stored PathInfo: {}", e);
+                    Error::StorageError(format!("failed to decode stored PathInfo: {}", e))
+                })?;
+
+                yield path_info
+            }
+        })
+    }
+}
diff --git a/tvix/store/src/pathinfoservice/tests/mod.rs b/tvix/store/src/pathinfoservice/tests/mod.rs
new file mode 100644
index 0000000000..061655e4ba
--- /dev/null
+++ b/tvix/store/src/pathinfoservice/tests/mod.rs
@@ -0,0 +1,72 @@
+//! This contains test scenarios that a given [PathInfoService] needs to pass.
+//! We use [rstest] and [rstest_reuse] to provide all services we want to test
+//! against, and then apply this template to all test functions.
+
+use rstest::*;
+use rstest_reuse::{self, *};
+
+use super::PathInfoService;
+use crate::pathinfoservice::MemoryPathInfoService;
+use crate::pathinfoservice::SledPathInfoService;
+use crate::proto::PathInfo;
+use crate::tests::fixtures::DUMMY_PATH_DIGEST;
+use tvix_castore::proto as castorepb;
+
+mod utils;
+pub use self::utils::make_grpc_path_info_service_client;
+
+#[cfg(all(feature = "cloud", feature = "integration"))]
+use self::utils::make_bigtable_path_info_service;
+
+#[template]
+#[rstest]
+#[case::memory(MemoryPathInfoService::default())]
+#[case::grpc({
+    let (_, _, svc) = make_grpc_path_info_service_client().await;
+    svc
+})]
+#[case::sled(SledPathInfoService::new_temporary().unwrap())]
+#[cfg_attr(all(feature = "cloud",feature="integration"), case::bigtable(make_bigtable_path_info_service().await))]
+pub fn path_info_services(#[case] svc: impl PathInfoService) {}
+
+// FUTUREWORK: add more tests rejecting invalid PathInfo messages.
+// A subset of them should also ensure references to other PathInfos, or
+// elements in {Blob,Directory}Service do exist.
+
+/// Trying to get a non-existent PathInfo should return Ok(None).
+#[apply(path_info_services)]
+#[tokio::test]
+async fn not_found(svc: impl PathInfoService) {
+    assert!(svc
+        .get(DUMMY_PATH_DIGEST)
+        .await
+        .expect("must succeed")
+        .is_none());
+}
+
+/// Put a PathInfo into the store, get it back.
+#[apply(path_info_services)]
+#[tokio::test]
+async fn put_get(svc: impl PathInfoService) {
+    let path_info = PathInfo {
+        node: Some(castorepb::Node {
+            node: Some(castorepb::node::Node::Symlink(castorepb::SymlinkNode {
+                name: "00000000000000000000000000000000-foo".into(),
+                target: "doesntmatter".into(),
+            })),
+        }),
+        ..Default::default()
+    };
+
+    // insert
+    let resp = svc.put(path_info.clone()).await.expect("must succeed");
+
+    // expect the returned PathInfo to be equal (for now)
+    // in the future, some stores might add additional fields/signatures.
+    assert_eq!(path_info, resp);
+
+    // get it back
+    let resp = svc.get(DUMMY_PATH_DIGEST).await.expect("must succeed");
+
+    assert_eq!(Some(path_info), resp);
+}
diff --git a/tvix/store/src/pathinfoservice/tests/utils.rs b/tvix/store/src/pathinfoservice/tests/utils.rs
new file mode 100644
index 0000000000..ee170468d1
--- /dev/null
+++ b/tvix/store/src/pathinfoservice/tests/utils.rs
@@ -0,0 +1,75 @@
+use std::sync::Arc;
+
+use tonic::transport::{Endpoint, Server, Uri};
+use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService};
+
+use crate::{
+    nar::{NarCalculationService, SimpleRenderer},
+    pathinfoservice::{GRPCPathInfoService, MemoryPathInfoService, PathInfoService},
+    proto::{
+        path_info_service_client::PathInfoServiceClient,
+        path_info_service_server::PathInfoServiceServer, GRPCPathInfoServiceWrapper,
+    },
+    tests::fixtures::{blob_service, directory_service},
+};
+
+/// Constructs and returns a gRPC PathInfoService.
+/// We also return memory-based {Blob,Directory}Service,
+/// as the consumer of this function accepts a 3-tuple.
+pub async fn make_grpc_path_info_service_client(
+) -> (impl BlobService, impl DirectoryService, GRPCPathInfoService) {
+    let (left, right) = tokio::io::duplex(64);
+
+    let blob_service = blob_service();
+    let directory_service = directory_service();
+
+    // spin up a server, which will only connect once, to the left side.
+    tokio::spawn({
+        let blob_service = blob_service.clone();
+        let directory_service = directory_service.clone();
+        async move {
+            let path_info_service: Arc<dyn PathInfoService> =
+                Arc::from(MemoryPathInfoService::default());
+            let nar_calculation_service =
+                Box::new(SimpleRenderer::new(blob_service, directory_service))
+                    as Box<dyn NarCalculationService>;
+
+            // spin up a new PathInfoService
+            let mut server = Server::builder();
+            let router = server.add_service(PathInfoServiceServer::new(
+                GRPCPathInfoServiceWrapper::new(path_info_service, nar_calculation_service),
+            ));
+
+            router
+                .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(left)))
+                .await
+        }
+    });
+
+    // Create a client, connecting to the right side. The URI is unused.
+    let mut maybe_right = Some(right);
+
+    let path_info_service = GRPCPathInfoService::from_client(PathInfoServiceClient::new(
+        Endpoint::try_from("http://[::]:50051")
+            .unwrap()
+            .connect_with_connector(tower::service_fn(move |_: Uri| {
+                let right = maybe_right.take().unwrap();
+                async move { Ok::<_, std::io::Error>(right) }
+            }))
+            .await
+            .unwrap(),
+    ));
+
+    (blob_service, directory_service, path_info_service)
+}
+
+#[cfg(all(feature = "cloud", feature = "integration"))]
+pub(crate) async fn make_bigtable_path_info_service(
+) -> crate::pathinfoservice::BigtablePathInfoService {
+    use crate::pathinfoservice::bigtable::BigtableParameters;
+    use crate::pathinfoservice::BigtablePathInfoService;
+
+    BigtablePathInfoService::connect(BigtableParameters::default_for_tests())
+        .await
+        .unwrap()
+}
diff --git a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs
new file mode 100644
index 0000000000..68f5575676
--- /dev/null
+++ b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs
@@ -0,0 +1,124 @@
+use crate::nar::{NarCalculationService, RenderError};
+use crate::pathinfoservice::PathInfoService;
+use crate::proto;
+use futures::{stream::BoxStream, TryStreamExt};
+use std::ops::Deref;
+use tonic::{async_trait, Request, Response, Result, Status};
+use tracing::{instrument, warn};
+use tvix_castore::proto as castorepb;
+
+pub struct GRPCPathInfoServiceWrapper<PS, NS> {
+    path_info_service: PS,
+    // FUTUREWORK: allow exposing without allowing listing
+    nar_calculation_service: NS,
+}
+
+impl<PS, NS> GRPCPathInfoServiceWrapper<PS, NS> {
+    pub fn new(path_info_service: PS, nar_calculation_service: NS) -> Self {
+        Self {
+            path_info_service,
+            nar_calculation_service,
+        }
+    }
+}
+
+#[async_trait]
+impl<PS, NS> proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWrapper<PS, NS>
+where
+    PS: Deref<Target = dyn PathInfoService> + Send + Sync + 'static,
+    NS: NarCalculationService + Send + Sync + 'static,
+{
+    type ListStream = BoxStream<'static, tonic::Result<proto::PathInfo, Status>>;
+
+    #[instrument(skip_all)]
+    async fn get(
+        &self,
+        request: Request<proto::GetPathInfoRequest>,
+    ) -> Result<Response<proto::PathInfo>> {
+        match request.into_inner().by_what {
+            None => Err(Status::unimplemented("by_what needs to be specified")),
+            Some(proto::get_path_info_request::ByWhat::ByOutputHash(output_digest)) => {
+                let digest: [u8; 20] = output_digest
+                    .to_vec()
+                    .try_into()
+                    .map_err(|_e| Status::invalid_argument("invalid output digest length"))?;
+                match self.path_info_service.get(digest).await {
+                    Ok(None) => Err(Status::not_found("PathInfo not found")),
+                    Ok(Some(path_info)) => Ok(Response::new(path_info)),
+                    Err(e) => {
+                        warn!(err = %e, "failed to get PathInfo");
+                        Err(e.into())
+                    }
+                }
+            }
+        }
+    }
+
+    #[instrument(skip_all)]
+    async fn put(&self, request: Request<proto::PathInfo>) -> Result<Response<proto::PathInfo>> {
+        let path_info = request.into_inner();
+
+        // Store the PathInfo in the client. Clients MUST validate the data
+        // they receive, so we don't validate additionally here.
+        match self.path_info_service.put(path_info).await {
+            Ok(path_info_new) => Ok(Response::new(path_info_new)),
+            Err(e) => {
+                warn!(err = %e, "failed to put PathInfo");
+                Err(e.into())
+            }
+        }
+    }
+
+    #[instrument(skip_all)]
+    async fn calculate_nar(
+        &self,
+        request: Request<castorepb::Node>,
+    ) -> Result<Response<proto::CalculateNarResponse>> {
+        match request.into_inner().node {
+            None => Err(Status::invalid_argument("no root node sent")),
+            Some(root_node) => {
+                if let Err(e) = root_node.validate() {
+                    warn!(err = %e, "invalid root node");
+                    Err(Status::invalid_argument("invalid root node"))?
+                }
+
+                match self.nar_calculation_service.calculate_nar(&root_node).await {
+                    Ok((nar_size, nar_sha256)) => Ok(Response::new(proto::CalculateNarResponse {
+                        nar_size,
+                        nar_sha256: nar_sha256.to_vec().into(),
+                    })),
+                    Err(e) => {
+                        warn!(err = %e, "error during NAR calculation");
+                        Err(e.into())
+                    }
+                }
+            }
+        }
+    }
+
+    #[instrument(skip_all, err)]
+    async fn list(
+        &self,
+        _request: Request<proto::ListPathInfoRequest>,
+    ) -> Result<Response<Self::ListStream>, Status> {
+        let stream = Box::pin(
+            self.path_info_service
+                .list()
+                .map_err(|e| Status::internal(e.to_string())),
+        );
+
+        Ok(Response::new(Box::pin(stream)))
+    }
+}
+
+impl From<RenderError> for tonic::Status {
+    fn from(value: RenderError) -> Self {
+        match value {
+            RenderError::BlobNotFound(_, _) => Self::not_found(value.to_string()),
+            RenderError::DirectoryNotFound(_, _) => Self::not_found(value.to_string()),
+            RenderError::NARWriterError(_) => Self::internal(value.to_string()),
+            RenderError::StoreError(_) => Self::internal(value.to_string()),
+            RenderError::UnexpectedBlobMeta(_, _, _, _) => Self::internal(value.to_string()),
+        }
+    }
+}
diff --git a/tvix/store/src/proto/mod.rs b/tvix/store/src/proto/mod.rs
new file mode 100644
index 0000000000..a09839c8bd
--- /dev/null
+++ b/tvix/store/src/proto/mod.rs
@@ -0,0 +1,374 @@
+#![allow(clippy::derive_partial_eq_without_eq, non_snake_case)]
+use bstr::ByteSlice;
+use bytes::Bytes;
+use data_encoding::BASE64;
+// https://github.com/hyperium/tonic/issues/1056
+use nix_compat::{
+    narinfo::Flags,
+    nixhash::{CAHash, NixHash},
+    store_path::{self, StorePathRef},
+};
+use thiserror::Error;
+use tvix_castore::proto::{self as castorepb, NamedNode, ValidateNodeError};
+
+mod grpc_pathinfoservice_wrapper;
+
+pub use grpc_pathinfoservice_wrapper::GRPCPathInfoServiceWrapper;
+
+tonic::include_proto!("tvix.store.v1");
+
+#[cfg(feature = "tonic-reflection")]
+/// Compiled file descriptors for implementing [gRPC
+/// reflection](https://github.com/grpc/grpc/blob/master/doc/server-reflection.md) with e.g.
+/// [`tonic_reflection`](https://docs.rs/tonic-reflection).
+pub const FILE_DESCRIPTOR_SET: &[u8] = tonic::include_file_descriptor_set!("tvix.store.v1");
+
+#[cfg(test)]
+mod tests;
+
+/// Errors that can occur during the validation of PathInfo messages.
+#[derive(Debug, Error, PartialEq)]
+pub enum ValidatePathInfoError {
+    /// Invalid length of a reference
+    #[error("Invalid length of digest at position {}, expected {}, got {}", .0, store_path::DIGEST_SIZE, .1)]
+    InvalidReferenceDigestLen(usize, usize),
+
+    /// No node present
+    #[error("No node present")]
+    NoNodePresent,
+
+    /// Node fails validation
+    #[error("Invalid root node: {:?}", .0.to_string())]
+    InvalidRootNode(ValidateNodeError),
+
+    /// Invalid node name encountered. Root nodes in PathInfos have more strict name requirements
+    #[error("Failed to parse {} as StorePath: {1}", .0.to_str_lossy())]
+    InvalidNodeName(Vec<u8>, store_path::Error),
+
+    /// The digest in narinfo.nar_sha256 has an invalid len.
+    #[error("Invalid narinfo.nar_sha256 length: expected {}, got {}", 32, .0)]
+    InvalidNarSha256DigestLen(usize),
+
+    /// The number of references in the narinfo.reference_names field does not match
+    /// the number of references in the .references field.
+    #[error("Inconsistent Number of References: {0} (references) vs {1} (narinfo)")]
+    InconsistentNumberOfReferences(usize, usize),
+
+    /// A string in narinfo.reference_names does not parse to a [store_path::StorePath].
+    #[error("Invalid reference_name at position {0}: {1}")]
+    InvalidNarinfoReferenceName(usize, String),
+
+    /// The digest in the parsed `.narinfo.reference_names[i]` does not match
+    /// the one in `.references[i]`.`
+    #[error("digest in reference_name at position {} does not match digest in PathInfo, expected {}, got {}", .0, BASE64.encode(.1), BASE64.encode(.2))]
+    InconsistentNarinfoReferenceNameDigest(
+        usize,
+        [u8; store_path::DIGEST_SIZE],
+        [u8; store_path::DIGEST_SIZE],
+    ),
+
+    /// The deriver field is invalid.
+    #[error("deriver field is invalid: {0}")]
+    InvalidDeriverField(store_path::Error),
+}
+
+/// Parses a root node name.
+///
+/// On success, this returns the parsed [store_path::StorePathRef].
+/// On error, it returns an error generated from the supplied constructor.
+fn parse_node_name_root<E>(
+    name: &[u8],
+    err: fn(Vec<u8>, store_path::Error) -> E,
+) -> Result<store_path::StorePathRef<'_>, E> {
+    store_path::StorePathRef::from_bytes(name).map_err(|e| err(name.to_vec(), e))
+}
+
+impl PathInfo {
+    /// validate performs some checks on the PathInfo struct,
+    /// Returning either a [store_path::StorePath] of the root node, or a
+    /// [ValidatePathInfoError].
+    pub fn validate(&self) -> Result<store_path::StorePathRef<'_>, ValidatePathInfoError> {
+        // ensure the references have the right number of bytes.
+        for (i, reference) in self.references.iter().enumerate() {
+            if reference.len() != store_path::DIGEST_SIZE {
+                return Err(ValidatePathInfoError::InvalidReferenceDigestLen(
+                    i,
+                    reference.len(),
+                ));
+            }
+        }
+
+        // If there is a narinfo field populated…
+        if let Some(narinfo) = &self.narinfo {
+            // ensure the nar_sha256 digest has the correct length.
+            if narinfo.nar_sha256.len() != 32 {
+                return Err(ValidatePathInfoError::InvalidNarSha256DigestLen(
+                    narinfo.nar_sha256.len(),
+                ));
+            }
+
+            // ensure the number of references there matches PathInfo.references count.
+            if narinfo.reference_names.len() != self.references.len() {
+                return Err(ValidatePathInfoError::InconsistentNumberOfReferences(
+                    self.references.len(),
+                    narinfo.reference_names.len(),
+                ));
+            }
+
+            // parse references in reference_names.
+            for (i, reference_name_str) in narinfo.reference_names.iter().enumerate() {
+                // ensure thy parse as (non-absolute) store path
+                let reference_names_store_path = store_path::StorePath::from_bytes(
+                    reference_name_str.as_bytes(),
+                )
+                .map_err(|_| {
+                    ValidatePathInfoError::InvalidNarinfoReferenceName(
+                        i,
+                        reference_name_str.to_owned(),
+                    )
+                })?;
+
+                // ensure their digest matches the one at self.references[i].
+                {
+                    // This is safe, because we ensured the proper length earlier already.
+                    let reference_digest = self.references[i].to_vec().try_into().unwrap();
+
+                    if reference_names_store_path.digest() != &reference_digest {
+                        return Err(
+                            ValidatePathInfoError::InconsistentNarinfoReferenceNameDigest(
+                                i,
+                                reference_digest,
+                                *reference_names_store_path.digest(),
+                            ),
+                        );
+                    }
+                }
+
+                // If the Deriver field is populated, ensure it parses to a
+                // [store_path::StorePath].
+                // We can't check for it to *not* end with .drv, as the .drv files produced by
+                // recursive Nix end with multiple .drv suffixes, and only one is popped when
+                // converting to this field.
+                if let Some(deriver) = &narinfo.deriver {
+                    store_path::StorePathRef::from_name_and_digest(&deriver.name, &deriver.digest)
+                        .map_err(ValidatePathInfoError::InvalidDeriverField)?;
+                }
+            }
+        }
+
+        // Ensure there is a (root) node present, and it properly parses to a [store_path::StorePath].
+        let root_nix_path = match &self.node {
+            None | Some(castorepb::Node { node: None }) => {
+                Err(ValidatePathInfoError::NoNodePresent)?
+            }
+            Some(castorepb::Node { node: Some(node) }) => {
+                node.validate()
+                    .map_err(ValidatePathInfoError::InvalidRootNode)?;
+                // parse the name of the node itself and return
+                parse_node_name_root(node.get_name(), ValidatePathInfoError::InvalidNodeName)?
+            }
+        };
+
+        // return the root nix path
+        Ok(root_nix_path)
+    }
+
+    /// With self and its store path name, this reconstructs a
+    /// [nix_compat::narinfo::NarInfo<'_>].
+    /// It can be used to validate Signatures, or get back a (sparse) NarInfo
+    /// struct to prepare writing it out.
+    ///
+    /// It assumes self to be validated first, and will only return None if the
+    /// `narinfo` field is unpopulated.
+    ///
+    /// It does very little allocation (a Vec each for `signatures` and
+    /// `references`), the rest points to data owned elsewhere.
+    ///
+    /// Keep in mind this is not able to reconstruct all data present in the
+    /// NarInfo<'_>, as some of it is not stored at all:
+    /// - the `system`, `file_hash` and `file_size` fields are set to `None`.
+    /// - the URL is set to an empty string.
+    /// - Compression is set to "none"
+    ///
+    /// If you want to render it out to a string and be able to parse it back
+    /// in, at least URL *must* be set again.
+    pub fn to_narinfo<'a>(
+        &'a self,
+        store_path: store_path::StorePathRef<'a>,
+    ) -> Option<nix_compat::narinfo::NarInfo<'_>> {
+        let narinfo = &self.narinfo.as_ref()?;
+
+        Some(nix_compat::narinfo::NarInfo {
+            flags: Flags::empty(),
+            store_path,
+            nar_hash: narinfo
+                .nar_sha256
+                .as_ref()
+                .try_into()
+                .expect("invalid narhash"),
+            nar_size: narinfo.nar_size,
+            references: narinfo
+                .reference_names
+                .iter()
+                .map(|ref_name| {
+                    // This shouldn't pass validation
+                    StorePathRef::from_bytes(ref_name.as_bytes()).expect("invalid reference")
+                })
+                .collect(),
+            signatures: narinfo
+                .signatures
+                .iter()
+                .map(|sig| {
+                    nix_compat::narinfo::Signature::new(
+                        &sig.name,
+                        // This shouldn't pass validation
+                        sig.data[..].try_into().expect("invalid signature len"),
+                    )
+                })
+                .collect(),
+            ca: narinfo
+                .ca
+                .as_ref()
+                .map(|ca| ca.try_into().expect("invalid ca")),
+            system: None,
+            deriver: narinfo.deriver.as_ref().map(|deriver| {
+                StorePathRef::from_name_and_digest(&deriver.name, &deriver.digest)
+                    .expect("invalid deriver")
+            }),
+            url: "",
+            compression: Some("none"),
+            file_hash: None,
+            file_size: None,
+        })
+    }
+}
+
+/// Errors that can occur when converting from a [nar_info::Ca] to a (stricter)
+/// [nix_compat::nixhash::CAHash].
+#[derive(Debug, Error, PartialEq)]
+pub enum ConvertCAError {
+    /// Invalid length of a reference
+    #[error("Invalid digest length '{0}' for type {1}")]
+    InvalidReferenceDigestLen(usize, &'static str),
+    /// Unknown Hash type
+    #[error("Unknown hash type: {0}")]
+    UnknownHashType(i32),
+}
+
+impl TryFrom<&nar_info::Ca> for nix_compat::nixhash::CAHash {
+    type Error = ConvertCAError;
+
+    fn try_from(value: &nar_info::Ca) -> Result<Self, Self::Error> {
+        Ok(match value.r#type {
+            typ if typ == nar_info::ca::Hash::FlatMd5 as i32 => {
+                Self::Flat(NixHash::Md5(value.digest[..].try_into().map_err(|_| {
+                    ConvertCAError::InvalidReferenceDigestLen(value.digest.len(), "FlatMd5")
+                })?))
+            }
+            typ if typ == nar_info::ca::Hash::FlatSha1 as i32 => {
+                Self::Flat(NixHash::Sha1(value.digest[..].try_into().map_err(
+                    |_| ConvertCAError::InvalidReferenceDigestLen(value.digest.len(), "FlatSha1"),
+                )?))
+            }
+            typ if typ == nar_info::ca::Hash::FlatSha256 as i32 => {
+                Self::Flat(NixHash::Sha256(value.digest[..].try_into().map_err(
+                    |_| ConvertCAError::InvalidReferenceDigestLen(value.digest.len(), "FlatSha256"),
+                )?))
+            }
+            typ if typ == nar_info::ca::Hash::FlatSha512 as i32 => Self::Flat(NixHash::Sha512(
+                Box::new(value.digest[..].try_into().map_err(|_| {
+                    ConvertCAError::InvalidReferenceDigestLen(value.digest.len(), "FlatSha512")
+                })?),
+            )),
+            typ if typ == nar_info::ca::Hash::NarMd5 as i32 => {
+                Self::Nar(NixHash::Md5(value.digest[..].try_into().map_err(|_| {
+                    ConvertCAError::InvalidReferenceDigestLen(value.digest.len(), "NarMd5")
+                })?))
+            }
+            typ if typ == nar_info::ca::Hash::NarSha1 as i32 => {
+                Self::Nar(NixHash::Sha1(value.digest[..].try_into().map_err(
+                    |_| ConvertCAError::InvalidReferenceDigestLen(value.digest.len(), "NarSha1"),
+                )?))
+            }
+            typ if typ == nar_info::ca::Hash::NarSha256 as i32 => {
+                Self::Nar(NixHash::Sha256(value.digest[..].try_into().map_err(
+                    |_| ConvertCAError::InvalidReferenceDigestLen(value.digest.len(), "NarSha256"),
+                )?))
+            }
+            typ if typ == nar_info::ca::Hash::NarSha512 as i32 => Self::Nar(NixHash::Sha512(
+                Box::new(value.digest[..].try_into().map_err(|_| {
+                    ConvertCAError::InvalidReferenceDigestLen(value.digest.len(), "NarSha512")
+                })?),
+            )),
+            typ => return Err(ConvertCAError::UnknownHashType(typ)),
+        })
+    }
+}
+
+impl From<&nix_compat::nixhash::CAHash> for nar_info::ca::Hash {
+    fn from(value: &nix_compat::nixhash::CAHash) -> Self {
+        match value {
+            CAHash::Flat(NixHash::Md5(_)) => nar_info::ca::Hash::FlatMd5,
+            CAHash::Flat(NixHash::Sha1(_)) => nar_info::ca::Hash::FlatSha1,
+            CAHash::Flat(NixHash::Sha256(_)) => nar_info::ca::Hash::FlatSha256,
+            CAHash::Flat(NixHash::Sha512(_)) => nar_info::ca::Hash::FlatSha512,
+            CAHash::Nar(NixHash::Md5(_)) => nar_info::ca::Hash::NarMd5,
+            CAHash::Nar(NixHash::Sha1(_)) => nar_info::ca::Hash::NarSha1,
+            CAHash::Nar(NixHash::Sha256(_)) => nar_info::ca::Hash::NarSha256,
+            CAHash::Nar(NixHash::Sha512(_)) => nar_info::ca::Hash::NarSha512,
+            CAHash::Text(_) => nar_info::ca::Hash::TextSha256,
+        }
+    }
+}
+
+impl From<&nix_compat::nixhash::CAHash> for nar_info::Ca {
+    fn from(value: &nix_compat::nixhash::CAHash) -> Self {
+        nar_info::Ca {
+            r#type: Into::<nar_info::ca::Hash>::into(value) as i32,
+            digest: value.hash().digest_as_bytes().to_vec().into(),
+        }
+    }
+}
+
+impl From<&nix_compat::narinfo::NarInfo<'_>> for NarInfo {
+    /// Converts from a NarInfo (returned from the NARInfo parser) to the proto-
+    /// level NarInfo struct.
+    fn from(value: &nix_compat::narinfo::NarInfo<'_>) -> Self {
+        let signatures = value
+            .signatures
+            .iter()
+            .map(|sig| nar_info::Signature {
+                name: sig.name().to_string(),
+                data: Bytes::copy_from_slice(sig.bytes()),
+            })
+            .collect();
+
+        NarInfo {
+            nar_size: value.nar_size,
+            nar_sha256: Bytes::copy_from_slice(&value.nar_hash),
+            signatures,
+            reference_names: value.references.iter().map(|r| r.to_string()).collect(),
+            deriver: value.deriver.as_ref().map(|sp| StorePath {
+                name: sp.name().to_owned(),
+                digest: Bytes::copy_from_slice(sp.digest()),
+            }),
+            ca: value.ca.as_ref().map(|ca| ca.into()),
+        }
+    }
+}
+
+impl From<&nix_compat::narinfo::NarInfo<'_>> for PathInfo {
+    /// Converts from a NarInfo (returned from the NARInfo parser) to a PathInfo
+    /// struct with the node set to None.
+    fn from(value: &nix_compat::narinfo::NarInfo<'_>) -> Self {
+        Self {
+            node: None,
+            references: value
+                .references
+                .iter()
+                .map(|x| Bytes::copy_from_slice(x.digest()))
+                .collect(),
+            narinfo: Some(value.into()),
+        }
+    }
+}
diff --git a/tvix/store/src/proto/tests/mod.rs b/tvix/store/src/proto/tests/mod.rs
new file mode 100644
index 0000000000..c9c6702027
--- /dev/null
+++ b/tvix/store/src/proto/tests/mod.rs
@@ -0,0 +1 @@
+mod pathinfo;
diff --git a/tvix/store/src/proto/tests/pathinfo.rs b/tvix/store/src/proto/tests/pathinfo.rs
new file mode 100644
index 0000000000..4d0834878d
--- /dev/null
+++ b/tvix/store/src/proto/tests/pathinfo.rs
@@ -0,0 +1,431 @@
+use crate::proto::{nar_info::Signature, NarInfo, PathInfo, ValidatePathInfoError};
+use crate::tests::fixtures::*;
+use bytes::Bytes;
+use data_encoding::BASE64;
+use nix_compat::nixbase32;
+use nix_compat::store_path::{self, StorePathRef};
+use rstest::rstest;
+use tvix_castore::proto as castorepb;
+
+#[rstest]
+#[case::no_node(None, Err(ValidatePathInfoError::NoNodePresent))]
+#[case::no_node_2(Some(castorepb::Node { node: None}), Err(ValidatePathInfoError::NoNodePresent))]
+
+fn validate_pathinfo(
+    #[case] node: Option<castorepb::Node>,
+    #[case] exp_result: Result<StorePathRef, ValidatePathInfoError>,
+) {
+    // construct the PathInfo object
+    let p = PathInfo {
+        node,
+        ..Default::default()
+    };
+
+    assert_eq!(exp_result, p.validate());
+
+    let err = p.validate().expect_err("validation should fail");
+    assert!(matches!(err, ValidatePathInfoError::NoNodePresent));
+}
+
+#[rstest]
+#[case::ok(castorepb::DirectoryNode {
+        name: DUMMY_PATH.into(),
+        digest: DUMMY_DIGEST.clone().into(),
+        size: 0,
+}, Ok(StorePathRef::from_bytes(DUMMY_PATH.as_bytes()).unwrap()))]
+#[case::invalid_digest_length(castorepb::DirectoryNode {
+        name: DUMMY_PATH.into(),
+        digest: Bytes::new(),
+        size: 0,
+}, Err(ValidatePathInfoError::InvalidRootNode(castorepb::ValidateNodeError::InvalidDigestLen(0))))]
+#[case::invalid_node_name_no_storepath(castorepb::DirectoryNode {
+        name: "invalid".into(),
+        digest: DUMMY_DIGEST.clone().into(),
+        size: 0,
+}, Err(ValidatePathInfoError::InvalidNodeName(
+        "invalid".into(),
+        store_path::Error::InvalidLength
+)))]
+fn validate_directory(
+    #[case] directory_node: castorepb::DirectoryNode,
+    #[case] exp_result: Result<StorePathRef, ValidatePathInfoError>,
+) {
+    // construct the PathInfo object
+    let p = PathInfo {
+        node: Some(castorepb::Node {
+            node: Some(castorepb::node::Node::Directory(directory_node)),
+        }),
+        ..Default::default()
+    };
+    assert_eq!(exp_result, p.validate());
+}
+
+#[rstest]
+#[case::ok(
+    castorepb::FileNode {
+        name: DUMMY_PATH.into(),
+        digest: DUMMY_DIGEST.clone().into(),
+        size: 0,
+        executable: false,
+    },
+    Ok(StorePathRef::from_bytes(DUMMY_PATH.as_bytes()).unwrap())
+)]
+#[case::invalid_digest_len(
+    castorepb::FileNode {
+        name: DUMMY_PATH.into(),
+        digest: Bytes::new(),
+        ..Default::default()
+    },
+    Err(ValidatePathInfoError::InvalidRootNode(castorepb::ValidateNodeError::InvalidDigestLen(0)))
+)]
+#[case::invalid_node_name(
+    castorepb::FileNode {
+        name: "invalid".into(),
+        digest: DUMMY_DIGEST.clone().into(),
+        ..Default::default()
+    },
+    Err(ValidatePathInfoError::InvalidNodeName(
+        "invalid".into(),
+        store_path::Error::InvalidLength
+    ))
+)]
+fn validate_file(
+    #[case] file_node: castorepb::FileNode,
+    #[case] exp_result: Result<StorePathRef, ValidatePathInfoError>,
+) {
+    // construct the PathInfo object
+    let p = PathInfo {
+        node: Some(castorepb::Node {
+            node: Some(castorepb::node::Node::File(file_node)),
+        }),
+        ..Default::default()
+    };
+    assert_eq!(exp_result, p.validate());
+}
+
+#[rstest]
+#[case::ok(
+    castorepb::SymlinkNode {
+        name: DUMMY_PATH.into(),
+        target: "foo".into(),
+    },
+    Ok(StorePathRef::from_bytes(DUMMY_PATH.as_bytes()).unwrap())
+)]
+#[case::invalid_node_name(
+    castorepb::SymlinkNode {
+        name: "invalid".into(),
+        target: "foo".into(),
+    },
+    Err(ValidatePathInfoError::InvalidNodeName(
+        "invalid".into(),
+        store_path::Error::InvalidLength
+    ))
+)]
+fn validate_symlink(
+    #[case] symlink_node: castorepb::SymlinkNode,
+    #[case] exp_result: Result<StorePathRef, ValidatePathInfoError>,
+) {
+    // construct the PathInfo object
+    let p = PathInfo {
+        node: Some(castorepb::Node {
+            node: Some(castorepb::node::Node::Symlink(symlink_node)),
+        }),
+        ..Default::default()
+    };
+    assert_eq!(exp_result, p.validate());
+}
+
+/// Ensure parsing a correct PathInfo without narinfo populated succeeds.
+#[test]
+fn validate_references_without_narinfo_ok() {
+    assert!(PATH_INFO_WITHOUT_NARINFO.validate().is_ok());
+}
+
+/// Ensure parsing a correct PathInfo with narinfo populated succeeds.
+#[test]
+fn validate_references_with_narinfo_ok() {
+    assert!(PATH_INFO_WITH_NARINFO.validate().is_ok());
+}
+
+/// Create a PathInfo with a wrong digest length in narinfo.nar_sha256, and
+/// ensure validation fails.
+#[test]
+fn validate_wrong_nar_sha256() {
+    let mut path_info = PATH_INFO_WITH_NARINFO.clone();
+    path_info.narinfo.as_mut().unwrap().nar_sha256 = vec![0xbe, 0xef].into();
+
+    match path_info.validate().expect_err("must_fail") {
+        ValidatePathInfoError::InvalidNarSha256DigestLen(2) => {}
+        e => panic!("unexpected error: {:?}", e),
+    };
+}
+
+/// Create a PathInfo with a wrong count of narinfo.reference_names,
+/// and ensure validation fails.
+#[test]
+fn validate_inconsistent_num_refs_fail() {
+    let mut path_info = PATH_INFO_WITH_NARINFO.clone();
+    path_info.narinfo.as_mut().unwrap().reference_names = vec![];
+
+    match path_info.validate().expect_err("must_fail") {
+        ValidatePathInfoError::InconsistentNumberOfReferences(1, 0) => {}
+        e => panic!("unexpected error: {:?}", e),
+    };
+}
+
+/// Create a PathInfo with a wrong digest length in references.
+#[test]
+fn validate_invalid_reference_digest_len() {
+    let mut path_info = PATH_INFO_WITHOUT_NARINFO.clone();
+    path_info.references.push(vec![0xff, 0xff].into());
+
+    match path_info.validate().expect_err("must fail") {
+        ValidatePathInfoError::InvalidReferenceDigestLen(
+            1, // position
+            2, // unexpected digest len
+        ) => {}
+        e => panic!("unexpected error: {:?}", e),
+    };
+}
+
+/// Create a PathInfo with a narinfo.reference_name[1] that is no valid store path.
+#[test]
+fn validate_invalid_narinfo_reference_name() {
+    let mut path_info = PATH_INFO_WITH_NARINFO.clone();
+
+    // This is invalid, as the store prefix is not part of reference_names.
+    path_info.narinfo.as_mut().unwrap().reference_names[0] =
+        "/nix/store/00000000000000000000000000000000-dummy".to_string();
+
+    match path_info.validate().expect_err("must fail") {
+        ValidatePathInfoError::InvalidNarinfoReferenceName(0, reference_name) => {
+            assert_eq!(
+                "/nix/store/00000000000000000000000000000000-dummy",
+                reference_name
+            );
+        }
+        e => panic!("unexpected error: {:?}", e),
+    }
+}
+
+/// Create a PathInfo with a narinfo.reference_name[0] that doesn't match references[0].
+#[test]
+fn validate_inconsistent_narinfo_reference_name_digest() {
+    let mut path_info = PATH_INFO_WITH_NARINFO.clone();
+
+    // mutate the first reference, they were all zeroes before
+    path_info.references[0] = vec![0xff; store_path::DIGEST_SIZE].into();
+
+    match path_info.validate().expect_err("must fail") {
+        ValidatePathInfoError::InconsistentNarinfoReferenceNameDigest(0, e_expected, e_actual) => {
+            assert_eq!(path_info.references[0][..], e_expected[..]);
+            assert_eq!(DUMMY_PATH_DIGEST, e_actual);
+        }
+        e => panic!("unexpected error: {:?}", e),
+    }
+}
+
+/// Create a node with an empty symlink target, and ensure it fails validation.
+#[test]
+fn validate_symlink_empty_target_invalid() {
+    let node = castorepb::node::Node::Symlink(castorepb::SymlinkNode {
+        name: "foo".into(),
+        target: "".into(),
+    });
+
+    node.validate().expect_err("must fail validation");
+}
+
+/// Create a node with a symlink target including null bytes, and ensure it
+/// fails validation.
+#[test]
+fn validate_symlink_target_null_byte_invalid() {
+    let node = castorepb::node::Node::Symlink(castorepb::SymlinkNode {
+        name: "foo".into(),
+        target: "foo\0".into(),
+    });
+
+    node.validate().expect_err("must fail validation");
+}
+
+/// Create a PathInfo with a correct deriver field and ensure it succeeds.
+#[test]
+fn validate_valid_deriver() {
+    let mut path_info = PATH_INFO_WITH_NARINFO.clone();
+
+    // add a valid deriver
+    let narinfo = path_info.narinfo.as_mut().unwrap();
+    narinfo.deriver = Some(crate::proto::StorePath {
+        name: "foo".to_string(),
+        digest: Bytes::from(DUMMY_PATH_DIGEST.as_slice()),
+    });
+
+    path_info.validate().expect("must validate");
+}
+
+/// Create a PathInfo with a broken deriver field and ensure it fails.
+#[test]
+fn validate_invalid_deriver() {
+    let mut path_info = PATH_INFO_WITH_NARINFO.clone();
+
+    // add a broken deriver (invalid digest)
+    let narinfo = path_info.narinfo.as_mut().unwrap();
+    narinfo.deriver = Some(crate::proto::StorePath {
+        name: "foo".to_string(),
+        digest: vec![].into(),
+    });
+
+    match path_info.validate().expect_err("must fail validation") {
+        ValidatePathInfoError::InvalidDeriverField(_) => {}
+        e => panic!("unexpected error: {:?}", e),
+    }
+}
+
+#[test]
+fn from_nixcompat_narinfo() {
+    let narinfo_parsed = nix_compat::narinfo::NarInfo::parse(
+        r#"StorePath: /nix/store/s66mzxpvicwk07gjbjfw9izjfa797vsw-hello-2.12.1
+URL: nar/1nhgq6wcggx0plpy4991h3ginj6hipsdslv4fd4zml1n707j26yq.nar.xz
+Compression: xz
+FileHash: sha256:1nhgq6wcggx0plpy4991h3ginj6hipsdslv4fd4zml1n707j26yq
+FileSize: 50088
+NarHash: sha256:0yzhigwjl6bws649vcs2asa4lbs8hg93hyix187gc7s7a74w5h80
+NarSize: 226488
+References: 3n58xw4373jp0ljirf06d8077j15pc4j-glibc-2.37-8 s66mzxpvicwk07gjbjfw9izjfa797vsw-hello-2.12.1
+Deriver: ib3sh3pcz10wsmavxvkdbayhqivbghlq-hello-2.12.1.drv
+Sig: cache.nixos.org-1:8ijECciSFzWHwwGVOIVYdp2fOIOJAfmzGHPQVwpktfTQJF6kMPPDre7UtFw3o+VqenC5P8RikKOAAfN7CvPEAg=="#).expect("must parse");
+
+    assert_eq!(
+        PathInfo {
+            node: None,
+            references: vec![
+                Bytes::copy_from_slice(&nixbase32::decode_fixed::<20>("3n58xw4373jp0ljirf06d8077j15pc4j").unwrap()),
+                Bytes::copy_from_slice(&nixbase32::decode_fixed::<20>("s66mzxpvicwk07gjbjfw9izjfa797vsw").unwrap()),
+            ],
+            narinfo: Some(
+                NarInfo {
+                    nar_size: 226488,
+                    nar_sha256: Bytes::copy_from_slice(
+                        &nixbase32::decode_fixed::<32>("0yzhigwjl6bws649vcs2asa4lbs8hg93hyix187gc7s7a74w5h80".as_bytes())
+                            .unwrap()
+                    ),
+                    signatures: vec![Signature {
+                        name: "cache.nixos.org-1".to_string(),
+                        data: BASE64.decode("8ijECciSFzWHwwGVOIVYdp2fOIOJAfmzGHPQVwpktfTQJF6kMPPDre7UtFw3o+VqenC5P8RikKOAAfN7CvPEAg==".as_bytes()).unwrap().into(),
+                    }],
+                    reference_names: vec![
+                        "3n58xw4373jp0ljirf06d8077j15pc4j-glibc-2.37-8".to_string(),
+                        "s66mzxpvicwk07gjbjfw9izjfa797vsw-hello-2.12.1".to_string()
+                    ],
+                    deriver: Some(crate::proto::StorePath {
+                        digest: Bytes::copy_from_slice(&nixbase32::decode_fixed::<20>("ib3sh3pcz10wsmavxvkdbayhqivbghlq").unwrap()),
+                        name: "hello-2.12.1".to_string(),
+                     }),
+                    ca: None,
+                }
+            )
+        },
+        (&narinfo_parsed).into(),
+    );
+}
+
+#[test]
+fn from_nixcompat_narinfo_fod() {
+    let narinfo_parsed = nix_compat::narinfo::NarInfo::parse(
+        r#"StorePath: /nix/store/pa10z4ngm0g83kx9mssrqzz30s84vq7k-hello-2.12.1.tar.gz
+URL: nar/1zjrhzhaizsrlsvdkqfl073vivmxcqnzkff4s50i0cdf541ary1r.nar.xz
+Compression: xz
+FileHash: sha256:1zjrhzhaizsrlsvdkqfl073vivmxcqnzkff4s50i0cdf541ary1r
+FileSize: 1033524
+NarHash: sha256:1lvqpbk2k1sb39z8jfxixf7p7v8sj4z6mmpa44nnmff3w1y6h8lh
+NarSize: 1033416
+References: 
+Deriver: dyivpmlaq2km6c11i0s6bi6mbsx0ylqf-hello-2.12.1.tar.gz.drv
+Sig: cache.nixos.org-1:ywnIG629nQZQhEr6/HLDrLT/mUEp5J1LC6NmWSlJRWL/nM7oGItJQUYWGLvYGhSQvHrhIuvMpjNmBNh/WWqCDg==
+CA: fixed:sha256:086vqwk2wl8zfs47sq2xpjc9k066ilmb8z6dn0q6ymwjzlm196cd"#
+    ).expect("must parse");
+
+    assert_eq!(
+        PathInfo {
+            node: None,
+            references: vec![],
+            narinfo: Some(
+                NarInfo {
+                    nar_size: 1033416,
+                    nar_sha256: Bytes::copy_from_slice(
+                        &nixbase32::decode_fixed::<32>(
+                            "1lvqpbk2k1sb39z8jfxixf7p7v8sj4z6mmpa44nnmff3w1y6h8lh"
+                        )
+                        .unwrap()
+                    ),
+                    signatures: vec![Signature {
+                        name: "cache.nixos.org-1".to_string(),
+                        data: BASE64
+                            .decode("ywnIG629nQZQhEr6/HLDrLT/mUEp5J1LC6NmWSlJRWL/nM7oGItJQUYWGLvYGhSQvHrhIuvMpjNmBNh/WWqCDg==".as_bytes())
+                            .unwrap()
+                            .into(),
+                    }],
+                    reference_names: vec![],
+                    deriver: Some(crate::proto::StorePath {
+                        digest: Bytes::copy_from_slice(
+                            &nixbase32::decode_fixed::<20>("dyivpmlaq2km6c11i0s6bi6mbsx0ylqf").unwrap()
+                        ),
+                        name: "hello-2.12.1.tar.gz".to_string(),
+                    }),
+                    ca: Some(crate::proto::nar_info::Ca {
+                        r#type: crate::proto::nar_info::ca::Hash::FlatSha256.into(),
+                        digest: Bytes::copy_from_slice(
+                            &nixbase32::decode_fixed::<32>(
+                                "086vqwk2wl8zfs47sq2xpjc9k066ilmb8z6dn0q6ymwjzlm196cd"
+                            )
+                            .unwrap()
+                        )
+                    }),
+                }
+            ),
+        },
+        (&narinfo_parsed).into()
+    );
+}
+
+/// Exercise .as_narinfo() on a PathInfo and ensure important fields are preserved..
+#[test]
+fn as_narinfo() {
+    let narinfo_parsed = nix_compat::narinfo::NarInfo::parse(
+        r#"StorePath: /nix/store/pa10z4ngm0g83kx9mssrqzz30s84vq7k-hello-2.12.1.tar.gz
+URL: nar/1zjrhzhaizsrlsvdkqfl073vivmxcqnzkff4s50i0cdf541ary1r.nar.xz
+Compression: xz
+FileHash: sha256:1zjrhzhaizsrlsvdkqfl073vivmxcqnzkff4s50i0cdf541ary1r
+FileSize: 1033524
+NarHash: sha256:1lvqpbk2k1sb39z8jfxixf7p7v8sj4z6mmpa44nnmff3w1y6h8lh
+NarSize: 1033416
+References: 
+Deriver: dyivpmlaq2km6c11i0s6bi6mbsx0ylqf-hello-2.12.1.tar.gz.drv
+Sig: cache.nixos.org-1:ywnIG629nQZQhEr6/HLDrLT/mUEp5J1LC6NmWSlJRWL/nM7oGItJQUYWGLvYGhSQvHrhIuvMpjNmBNh/WWqCDg==
+CA: fixed:sha256:086vqwk2wl8zfs47sq2xpjc9k066ilmb8z6dn0q6ymwjzlm196cd"#
+    ).expect("must parse");
+
+    let path_info: PathInfo = (&narinfo_parsed).into();
+
+    let mut narinfo_returned = path_info
+        .to_narinfo(
+            StorePathRef::from_bytes(b"pa10z4ngm0g83kx9mssrqzz30s84vq7k-hello-2.12.1.tar.gz")
+                .expect("invalid storepath"),
+        )
+        .expect("must be some");
+    narinfo_returned.url = "some.nar";
+
+    assert_eq!(
+        r#"StorePath: /nix/store/pa10z4ngm0g83kx9mssrqzz30s84vq7k-hello-2.12.1.tar.gz
+URL: some.nar
+Compression: none
+NarHash: sha256:1lvqpbk2k1sb39z8jfxixf7p7v8sj4z6mmpa44nnmff3w1y6h8lh
+NarSize: 1033416
+References: 
+Deriver: dyivpmlaq2km6c11i0s6bi6mbsx0ylqf-hello-2.12.1.tar.gz.drv
+Sig: cache.nixos.org-1:ywnIG629nQZQhEr6/HLDrLT/mUEp5J1LC6NmWSlJRWL/nM7oGItJQUYWGLvYGhSQvHrhIuvMpjNmBNh/WWqCDg==
+CA: fixed:sha256:086vqwk2wl8zfs47sq2xpjc9k066ilmb8z6dn0q6ymwjzlm196cd
+"#,
+        narinfo_returned.to_string(),
+    );
+}
diff --git a/tvix/store/src/tests/fixtures.rs b/tvix/store/src/tests/fixtures.rs
new file mode 100644
index 0000000000..1c8359a2c0
--- /dev/null
+++ b/tvix/store/src/tests/fixtures.rs
@@ -0,0 +1,142 @@
+use lazy_static::lazy_static;
+use rstest::*;
+use std::sync::Arc;
+pub use tvix_castore::fixtures::*;
+use tvix_castore::{
+    blobservice::{BlobService, MemoryBlobService},
+    directoryservice::{DirectoryService, MemoryDirectoryService},
+    proto as castorepb,
+};
+
+use crate::proto::{
+    nar_info::{ca, Ca},
+    NarInfo, PathInfo,
+};
+
+pub const DUMMY_PATH: &str = "00000000000000000000000000000000-dummy";
+pub const DUMMY_PATH_DIGEST: [u8; 20] = [0; 20];
+
+lazy_static! {
+    /// The NAR representation of a symlink pointing to `/nix/store/somewhereelse`
+    pub static ref NAR_CONTENTS_SYMLINK: Vec<u8> = vec![
+        13, 0, 0, 0, 0, 0, 0, 0, b'n', b'i', b'x', b'-', b'a', b'r', b'c', b'h', b'i', b'v', b'e', b'-', b'1', 0,
+        0, 0, // "nix-archive-1"
+        1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "("
+        4, 0, 0, 0, 0, 0, 0, 0, b't', b'y', b'p', b'e', 0, 0, 0, 0, // "type"
+        7, 0, 0, 0, 0, 0, 0, 0, b's', b'y', b'm', b'l', b'i', b'n', b'k', 0, // "symlink"
+        6, 0, 0, 0, 0, 0, 0, 0, b't', b'a', b'r', b'g', b'e', b't', 0, 0, // target
+        24, 0, 0, 0, 0, 0, 0, 0, b'/', b'n', b'i', b'x', b'/', b's', b't', b'o', b'r', b'e', b'/', b's', b'o',
+        b'm', b'e', b'w', b'h', b'e', b'r', b'e', b'e', b'l', b's',
+        b'e', // "/nix/store/somewhereelse"
+        1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0 // ")"
+    ];
+
+    /// The NAR representation of a regular file with the contents "Hello World!"
+    pub static ref NAR_CONTENTS_HELLOWORLD: Vec<u8> = vec![
+        13, 0, 0, 0, 0, 0, 0, 0, b'n', b'i', b'x', b'-', b'a', b'r', b'c', b'h', b'i', b'v', b'e', b'-', b'1', 0,
+        0, 0, // "nix-archive-1"
+        1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "("
+        4, 0, 0, 0, 0, 0, 0, 0, b't', b'y', b'p', b'e', 0, 0, 0, 0, // "type"
+        7, 0, 0, 0, 0, 0, 0, 0, b'r', b'e', b'g', b'u', b'l', b'a', b'r', 0, // "regular"
+        8, 0, 0, 0, 0, 0, 0, 0, b'c', b'o', b'n', b't', b'e', b'n', b't', b's', // "contents"
+        12, 0, 0, 0, 0, 0, 0, 0, b'H', b'e', b'l', b'l', b'o', b' ', b'W', b'o', b'r', b'l', b'd', b'!', 0, 0,
+        0, 0, // "Hello World!"
+        1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0 // ")"
+    ];
+
+    /// The NAR representation of a more complicated directory structure.
+    pub static ref NAR_CONTENTS_COMPLICATED: Vec<u8> = vec![
+        13, 0, 0, 0, 0, 0, 0, 0, b'n', b'i', b'x', b'-', b'a', b'r', b'c', b'h', b'i', b'v', b'e', b'-', b'1', 0,
+        0, 0, // "nix-archive-1"
+        1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "("
+        4, 0, 0, 0, 0, 0, 0, 0, b't', b'y', b'p', b'e', 0, 0, 0, 0, // "type"
+        9, 0, 0, 0, 0, 0, 0, 0, b'd', b'i', b'r', b'e', b'c', b't', b'o', b'r', b'y', 0, 0, 0, 0, 0, 0, 0, // "directory"
+        5, 0, 0, 0, 0, 0, 0, 0, b'e', b'n', b't', b'r', b'y', 0, 0, 0, // "entry"
+        1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "("
+        4, 0, 0, 0, 0, 0, 0, 0, b'n', b'a', b'm', b'e', 0, 0, 0, 0, // "name"
+        5, 0, 0, 0, 0, 0, 0, 0, b'.', b'k', b'e', b'e', b'p', 0, 0, 0, // ".keep"
+        4, 0, 0, 0, 0, 0, 0, 0, b'n', b'o', b'd', b'e', 0, 0, 0, 0, // "node"
+        1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "("
+        4, 0, 0, 0, 0, 0, 0, 0, b't', b'y', b'p', b'e', 0, 0, 0, 0, // "type"
+        7, 0, 0, 0, 0, 0, 0, 0, b'r', b'e', b'g', b'u', b'l', b'a', b'r', 0, // "regular"
+        8, 0, 0, 0, 0, 0, 0, 0, b'c', b'o', b'n', b't', b'e', b'n', b't', b's', // "contents"
+        0, 0, 0, 0, 0, 0, 0, 0, // ""
+        1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0, // ")"
+        1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0, // ")"
+        5, 0, 0, 0, 0, 0, 0, 0, b'e', b'n', b't', b'r', b'y', 0, 0, 0, // "entry"
+        1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "("
+        4, 0, 0, 0, 0, 0, 0, 0, b'n', b'a', b'm', b'e', 0, 0, 0, 0, // "name"
+        2, 0, 0, 0, 0, 0, 0, 0, b'a', b'a', 0, 0, 0, 0, 0, 0, // "aa"
+        4, 0, 0, 0, 0, 0, 0, 0, b'n', b'o', b'd', b'e', 0, 0, 0, 0, // "node"
+        1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "("
+        4, 0, 0, 0, 0, 0, 0, 0, b't', b'y', b'p', b'e', 0, 0, 0, 0, // "type"
+        7, 0, 0, 0, 0, 0, 0, 0, b's', b'y', b'm', b'l', b'i', b'n', b'k', 0, // "symlink"
+        6, 0, 0, 0, 0, 0, 0, 0, b't', b'a', b'r', b'g', b'e', b't', 0, 0, // target
+        24, 0, 0, 0, 0, 0, 0, 0, b'/', b'n', b'i', b'x', b'/', b's', b't', b'o', b'r', b'e', b'/', b's', b'o',
+        b'm', b'e', b'w', b'h', b'e', b'r', b'e', b'e', b'l', b's',
+        b'e', // "/nix/store/somewhereelse"
+        1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0, // ")"
+        1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0, // ")"
+        5, 0, 0, 0, 0, 0, 0, 0, b'e', b'n', b't', b'r', b'y', 0, 0, 0, // "entry"
+        1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "("
+        4, 0, 0, 0, 0, 0, 0, 0, b'n', b'a', b'm', b'e', 0, 0, 0, 0, // "name"
+        4, 0, 0, 0, 0, 0, 0, 0, b'k', b'e', b'e', b'p', 0, 0, 0, 0, // "keep"
+        4, 0, 0, 0, 0, 0, 0, 0, b'n', b'o', b'd', b'e', 0, 0, 0, 0, // "node"
+        1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "("
+        4, 0, 0, 0, 0, 0, 0, 0, b't', b'y', b'p', b'e', 0, 0, 0, 0, // "type"
+        9, 0, 0, 0, 0, 0, 0, 0, b'd', b'i', b'r', b'e', b'c', b't', b'o', b'r', b'y', 0, 0, 0, 0, 0, 0, 0, // "directory"
+        5, 0, 0, 0, 0, 0, 0, 0, b'e', b'n', b't', b'r', b'y', 0, 0, 0, // "entry"
+        1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "("
+        4, 0, 0, 0, 0, 0, 0, 0, b'n', b'a', b'm', b'e', 0, 0, 0, 0, // "name"
+        5, 0, 0, 0, 0, 0, 0, 0, 46, 107, 101, 101, 112, 0, 0, 0, // ".keep"
+        4, 0, 0, 0, 0, 0, 0, 0, 110, 111, 100, 101, 0, 0, 0, 0, // "node"
+        1, 0, 0, 0, 0, 0, 0, 0, b'(', 0, 0, 0, 0, 0, 0, 0, // "("
+        4, 0, 0, 0, 0, 0, 0, 0, b't', b'y', b'p', b'e', 0, 0, 0, 0, // "type"
+        7, 0, 0, 0, 0, 0, 0, 0, b'r', b'e', b'g', b'u', b'l', b'a', b'r', 0, // "regular"
+        8, 0, 0, 0, 0, 0, 0, 0, b'c', b'o', b'n', b't', b'e', b'n', b't', b's', // "contents"
+        0, 0, 0, 0, 0, 0, 0, 0, // ""
+        1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0, // ")"
+        1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0, // ")"
+        1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0, // ")"
+        1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0, // ")"
+        1, 0, 0, 0, 0, 0, 0, 0, b')', 0, 0, 0, 0, 0, 0, 0, // ")"
+    ];
+
+    /// A PathInfo message without .narinfo populated.
+    pub static ref PATH_INFO_WITHOUT_NARINFO : PathInfo = PathInfo {
+        node: Some(castorepb::Node {
+            node: Some(castorepb::node::Node::Directory(castorepb::DirectoryNode {
+                name: DUMMY_PATH.into(),
+                digest: DUMMY_DIGEST.clone().into(),
+                size: 0,
+            })),
+        }),
+        references: vec![DUMMY_PATH_DIGEST.as_slice().into()],
+        narinfo: None,
+    };
+
+    /// A PathInfo message with .narinfo populated.
+    /// The references in `narinfo.reference_names` aligns with what's in
+    /// `references`.
+    pub static ref PATH_INFO_WITH_NARINFO : PathInfo = PathInfo {
+        narinfo: Some(NarInfo {
+            nar_size: 0,
+            nar_sha256: DUMMY_DIGEST.clone().into(),
+            signatures: vec![],
+            reference_names: vec![DUMMY_PATH.to_string()],
+            deriver: None,
+            ca: Some(Ca { r#type: ca::Hash::NarSha256.into(), digest:  DUMMY_DIGEST.clone().into() })
+        }),
+      ..PATH_INFO_WITHOUT_NARINFO.clone()
+    };
+}
+
+#[fixture]
+pub(crate) fn blob_service() -> Arc<dyn BlobService> {
+    Arc::from(MemoryBlobService::default())
+}
+
+#[fixture]
+pub(crate) fn directory_service() -> Arc<dyn DirectoryService> {
+    Arc::from(MemoryDirectoryService::default())
+}
diff --git a/tvix/store/src/tests/mod.rs b/tvix/store/src/tests/mod.rs
new file mode 100644
index 0000000000..1e7fc3f6b4
--- /dev/null
+++ b/tvix/store/src/tests/mod.rs
@@ -0,0 +1,2 @@
+pub mod fixtures;
+mod nar_renderer;
diff --git a/tvix/store/src/tests/nar_renderer.rs b/tvix/store/src/tests/nar_renderer.rs
new file mode 100644
index 0000000000..8bfb5a72bb
--- /dev/null
+++ b/tvix/store/src/tests/nar_renderer.rs
@@ -0,0 +1,228 @@
+use crate::nar::calculate_size_and_sha256;
+use crate::nar::write_nar;
+use crate::tests::fixtures::blob_service;
+use crate::tests::fixtures::directory_service;
+use crate::tests::fixtures::*;
+use rstest::*;
+use sha2::{Digest, Sha256};
+use std::io;
+use std::sync::Arc;
+use tokio::io::sink;
+use tvix_castore::blobservice::BlobService;
+use tvix_castore::directoryservice::DirectoryService;
+use tvix_castore::proto as castorepb;
+
+#[rstest]
+#[tokio::test]
+async fn single_symlink(
+    blob_service: Arc<dyn BlobService>,
+    directory_service: Arc<dyn DirectoryService>,
+) {
+    let mut buf: Vec<u8> = vec![];
+
+    write_nar(
+        &mut buf,
+        &castorepb::node::Node::Symlink(castorepb::SymlinkNode {
+            name: "doesntmatter".into(),
+            target: "/nix/store/somewhereelse".into(),
+        }),
+        // don't put anything in the stores, as we don't actually do any requests.
+        blob_service,
+        directory_service,
+    )
+    .await
+    .expect("must succeed");
+
+    assert_eq!(buf, NAR_CONTENTS_SYMLINK.to_vec());
+}
+
+/// Make sure the NARRenderer fails if a referred blob doesn't exist.
+#[rstest]
+#[tokio::test]
+async fn single_file_missing_blob(
+    blob_service: Arc<dyn BlobService>,
+    directory_service: Arc<dyn DirectoryService>,
+) {
+    let e = write_nar(
+        sink(),
+        &castorepb::node::Node::File(castorepb::FileNode {
+            name: "doesntmatter".into(),
+            digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
+            size: HELLOWORLD_BLOB_CONTENTS.len() as u64,
+            executable: false,
+        }),
+        // the blobservice is empty intentionally, to provoke the error.
+        blob_service,
+        directory_service,
+    )
+    .await
+    .expect_err("must fail");
+
+    match e {
+        crate::nar::RenderError::NARWriterError(e) => {
+            assert_eq!(io::ErrorKind::NotFound, e.kind());
+        }
+        _ => panic!("unexpected error: {:?}", e),
+    }
+}
+
+/// Make sure the NAR Renderer fails if the returned blob meta has another size
+/// than specified in the proto node.
+#[rstest]
+#[tokio::test]
+async fn single_file_wrong_blob_size(
+    blob_service: Arc<dyn BlobService>,
+    directory_service: Arc<dyn DirectoryService>,
+) {
+    // insert blob into the store
+    let mut writer = blob_service.open_write().await;
+    tokio::io::copy(
+        &mut io::Cursor::new(HELLOWORLD_BLOB_CONTENTS.to_vec()),
+        &mut writer,
+    )
+    .await
+    .unwrap();
+    assert_eq!(
+        HELLOWORLD_BLOB_DIGEST.clone(),
+        writer.close().await.unwrap()
+    );
+
+    // Test with a root FileNode of a too big size
+    let e = write_nar(
+        sink(),
+        &castorepb::node::Node::File(castorepb::FileNode {
+            name: "doesntmatter".into(),
+            digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
+            size: 42, // <- note the wrong size here!
+            executable: false,
+        }),
+        blob_service.clone(),
+        directory_service.clone(),
+    )
+    .await
+    .expect_err("must fail");
+
+    match e {
+        crate::nar::RenderError::NARWriterError(e) => {
+            assert_eq!(io::ErrorKind::UnexpectedEof, e.kind());
+        }
+        _ => panic!("unexpected error: {:?}", e),
+    }
+
+    // Test with a root FileNode of a too small size
+    let e = write_nar(
+        sink(),
+        &castorepb::node::Node::File(castorepb::FileNode {
+            name: "doesntmatter".into(),
+            digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
+            size: 2, // <- note the wrong size here!
+            executable: false,
+        }),
+        blob_service,
+        directory_service,
+    )
+    .await
+    .expect_err("must fail");
+
+    match e {
+        crate::nar::RenderError::NARWriterError(e) => {
+            assert_eq!(io::ErrorKind::InvalidInput, e.kind());
+        }
+        _ => panic!("unexpected error: {:?}", e),
+    }
+}
+
+#[rstest]
+#[tokio::test]
+async fn single_file(
+    blob_service: Arc<dyn BlobService>,
+    directory_service: Arc<dyn DirectoryService>,
+) {
+    // insert blob into the store
+    let mut writer = blob_service.open_write().await;
+    tokio::io::copy(&mut io::Cursor::new(HELLOWORLD_BLOB_CONTENTS), &mut writer)
+        .await
+        .unwrap();
+
+    assert_eq!(
+        HELLOWORLD_BLOB_DIGEST.clone(),
+        writer.close().await.unwrap()
+    );
+
+    let mut buf: Vec<u8> = vec![];
+
+    write_nar(
+        &mut buf,
+        &castorepb::node::Node::File(castorepb::FileNode {
+            name: "doesntmatter".into(),
+            digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
+            size: HELLOWORLD_BLOB_CONTENTS.len() as u64,
+            executable: false,
+        }),
+        blob_service,
+        directory_service,
+    )
+    .await
+    .expect("must succeed");
+
+    assert_eq!(buf, NAR_CONTENTS_HELLOWORLD.to_vec());
+}
+
+#[rstest]
+#[tokio::test]
+async fn test_complicated(
+    blob_service: Arc<dyn BlobService>,
+    directory_service: Arc<dyn DirectoryService>,
+) {
+    // put all data into the stores.
+    // insert blob into the store
+    let mut writer = blob_service.open_write().await;
+    tokio::io::copy(&mut io::Cursor::new(EMPTY_BLOB_CONTENTS), &mut writer)
+        .await
+        .unwrap();
+    assert_eq!(EMPTY_BLOB_DIGEST.clone(), writer.close().await.unwrap());
+
+    // insert directories
+    directory_service
+        .put(DIRECTORY_WITH_KEEP.clone())
+        .await
+        .unwrap();
+    directory_service
+        .put(DIRECTORY_COMPLICATED.clone())
+        .await
+        .unwrap();
+
+    let mut buf: Vec<u8> = vec![];
+
+    write_nar(
+        &mut buf,
+        &castorepb::node::Node::Directory(castorepb::DirectoryNode {
+            name: "doesntmatter".into(),
+            digest: DIRECTORY_COMPLICATED.digest().into(),
+            size: DIRECTORY_COMPLICATED.size(),
+        }),
+        blob_service.clone(),
+        directory_service.clone(),
+    )
+    .await
+    .expect("must succeed");
+
+    assert_eq!(buf, NAR_CONTENTS_COMPLICATED.to_vec());
+
+    // ensure calculate_nar does return the correct sha256 digest and sum.
+    let (nar_size, nar_digest) = calculate_size_and_sha256(
+        &castorepb::node::Node::Directory(castorepb::DirectoryNode {
+            name: "doesntmatter".into(),
+            digest: DIRECTORY_COMPLICATED.digest().into(),
+            size: DIRECTORY_COMPLICATED.size(),
+        }),
+        blob_service,
+        directory_service,
+    )
+    .await
+    .expect("must succeed");
+
+    assert_eq!(NAR_CONTENTS_COMPLICATED.len() as u64, nar_size);
+    let d = Sha256::digest(NAR_CONTENTS_COMPLICATED.clone());
+    assert_eq!(d.as_slice(), nar_digest);
+}
diff --git a/tvix/store/src/utils.rs b/tvix/store/src/utils.rs
new file mode 100644
index 0000000000..e6e42f6ec4
--- /dev/null
+++ b/tvix/store/src/utils.rs
@@ -0,0 +1,78 @@
+use std::sync::Arc;
+use std::{
+    pin::Pin,
+    task::{self, Poll},
+};
+use tokio::io::{self, AsyncWrite};
+
+use tvix_castore::{
+    blobservice::{self, BlobService},
+    directoryservice::{self, DirectoryService},
+};
+
+use crate::nar::{NarCalculationService, SimpleRenderer};
+use crate::pathinfoservice::{self, PathInfoService};
+
+/// Construct the store handles from their addrs.
+pub async fn construct_services(
+    blob_service_addr: impl AsRef<str>,
+    directory_service_addr: impl AsRef<str>,
+    path_info_service_addr: impl AsRef<str>,
+) -> std::io::Result<(
+    Arc<dyn BlobService>,
+    Arc<dyn DirectoryService>,
+    Box<dyn PathInfoService>,
+    Box<dyn NarCalculationService>,
+)> {
+    let blob_service: Arc<dyn BlobService> = blobservice::from_addr(blob_service_addr.as_ref())
+        .await?
+        .into();
+    let directory_service: Arc<dyn DirectoryService> =
+        directoryservice::from_addr(directory_service_addr.as_ref())
+            .await?
+            .into();
+    let path_info_service = pathinfoservice::from_addr(
+        path_info_service_addr.as_ref(),
+        blob_service.clone(),
+        directory_service.clone(),
+    )
+    .await?;
+
+    // TODO: grpc client also implements NarCalculationService
+    let nar_calculation_service = Box::new(SimpleRenderer::new(
+        blob_service.clone(),
+        directory_service.clone(),
+    )) as Box<dyn NarCalculationService>;
+
+    Ok((
+        blob_service,
+        directory_service,
+        path_info_service,
+        nar_calculation_service,
+    ))
+}
+
+/// The inverse of [tokio_util::io::SyncIoBridge].
+/// Don't use this with anything that actually does blocking I/O.
+pub struct AsyncIoBridge<T>(pub T);
+
+impl<W: std::io::Write + Unpin> AsyncWrite for AsyncIoBridge<W> {
+    fn poll_write(
+        self: Pin<&mut Self>,
+        _cx: &mut task::Context<'_>,
+        buf: &[u8],
+    ) -> Poll<io::Result<usize>> {
+        Poll::Ready(self.get_mut().0.write(buf))
+    }
+
+    fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
+        Poll::Ready(self.get_mut().0.flush())
+    }
+
+    fn poll_shutdown(
+        self: Pin<&mut Self>,
+        _cx: &mut task::Context<'_>,
+    ) -> Poll<Result<(), io::Error>> {
+        Poll::Ready(Ok(()))
+    }
+}