about summary refs log tree commit diff
path: root/tvix/store/src
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/store/src')
-rw-r--r--tvix/store/src/bin/tvix-store.rs64
-rw-r--r--tvix/store/src/import.rs15
-rw-r--r--tvix/store/src/nar/import.rs366
-rw-r--r--tvix/store/src/nar/mod.rs28
-rw-r--r--tvix/store/src/nar/renderer.rs51
-rw-r--r--tvix/store/src/pathinfoservice/bigtable.rs31
-rw-r--r--tvix/store/src/pathinfoservice/combinators.rs111
-rw-r--r--tvix/store/src/pathinfoservice/from_addr.rs10
-rw-r--r--tvix/store/src/pathinfoservice/grpc.rs142
-rw-r--r--tvix/store/src/pathinfoservice/lru.rs128
-rw-r--r--tvix/store/src/pathinfoservice/memory.rs67
-rw-r--r--tvix/store/src/pathinfoservice/mod.rs20
-rw-r--r--tvix/store/src/pathinfoservice/nix_http.rs196
-rw-r--r--tvix/store/src/pathinfoservice/sled.rs169
-rw-r--r--tvix/store/src/pathinfoservice/tests/mod.rs79
-rw-r--r--tvix/store/src/pathinfoservice/tests/utils.rs45
-rw-r--r--tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs25
-rw-r--r--tvix/store/src/utils.rs17
18 files changed, 788 insertions, 776 deletions
diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs
index 8262a9e98c..906d0ab520 100644
--- a/tvix/store/src/bin/tvix-store.rs
+++ b/tvix/store/src/bin/tvix-store.rs
@@ -19,6 +19,7 @@ use tracing_subscriber::EnvFilter;
 use tracing_subscriber::Layer;
 use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
 use tvix_castore::import::fs::ingest_path;
+use tvix_store::nar::NarCalculationService;
 use tvix_store::proto::NarInfo;
 use tvix_store::proto::PathInfo;
 
@@ -56,10 +57,6 @@ use tvix_store::proto::FILE_DESCRIPTOR_SET;
 #[derive(Parser)]
 #[command(author, version, about, long_about = None)]
 struct Cli {
-    /// Whether to log in JSON
-    #[arg(long)]
-    json: bool,
-
     /// Whether to configure OTLP. Set --otlp=false to disable.
     #[arg(long, default_missing_value = "true", default_value = "true", num_args(0..=1), require_equals(true), action(clap::ArgAction::Set))]
     otlp: bool,
@@ -82,7 +79,11 @@ enum Commands {
         #[arg(long, short = 'l')]
         listen_address: Option<String>,
 
-        #[arg(long, env, default_value = "sled:///var/lib/tvix-store/blobs.sled")]
+        #[arg(
+            long,
+            env,
+            default_value = "objectstore+file:///var/lib/tvix-store/blobs.object_store"
+        )]
         blob_service_addr: String,
 
         #[arg(
@@ -218,33 +219,17 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
     let level = cli.log_level.unwrap_or(Level::INFO);
 
     // Set up the tracing subscriber.
-    let subscriber = tracing_subscriber::registry()
-        .with(
-            cli.json.then_some(
-                tracing_subscriber::fmt::Layer::new()
-                    .with_writer(std::io::stderr)
-                    .json()
-                    .with_filter(
-                        EnvFilter::builder()
-                            .with_default_directive(level.into())
-                            .from_env()
-                            .expect("invalid RUST_LOG"),
-                    ),
+    let subscriber = tracing_subscriber::registry().with(
+        tracing_subscriber::fmt::Layer::new()
+            .with_writer(std::io::stderr)
+            .compact()
+            .with_filter(
+                EnvFilter::builder()
+                    .with_default_directive(level.into())
+                    .from_env()
+                    .expect("invalid RUST_LOG"),
             ),
-        )
-        .with(
-            (!cli.json).then_some(
-                tracing_subscriber::fmt::Layer::new()
-                    .with_writer(std::io::stderr)
-                    .pretty()
-                    .with_filter(
-                        EnvFilter::builder()
-                            .with_default_directive(level.into())
-                            .from_env()
-                            .expect("invalid RUST_LOG"),
-                    ),
-            ),
-        );
+    );
 
     // Add the otlp layer (when otlp is enabled, and it's not disabled in the CLI)
     // then init the registry.
@@ -302,7 +287,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
             path_info_service_addr,
         } => {
             // initialize stores
-            let (blob_service, directory_service, path_info_service) =
+            let (blob_service, directory_service, path_info_service, nar_calculation_service) =
                 tvix_store::utils::construct_services(
                     blob_service_addr,
                     directory_service_addr,
@@ -327,6 +312,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
                 ))
                 .add_service(PathInfoServiceServer::new(GRPCPathInfoServiceWrapper::new(
                     Arc::from(path_info_service),
+                    nar_calculation_service,
                 )));
 
             #[cfg(feature = "tonic-reflection")]
@@ -356,7 +342,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
             path_info_service_addr,
         } => {
             // FUTUREWORK: allow flat for single files?
-            let (blob_service, directory_service, path_info_service) =
+            let (blob_service, directory_service, path_info_service, nar_calculation_service) =
                 tvix_store::utils::construct_services(
                     blob_service_addr,
                     directory_service_addr,
@@ -364,8 +350,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
                 )
                 .await?;
 
-            // Arc the PathInfoService, as we clone it .
+            // Arc PathInfoService and NarCalculationService, as we clone it .
             let path_info_service: Arc<dyn PathInfoService> = path_info_service.into();
+            let nar_calculation_service: Arc<dyn NarCalculationService> =
+                nar_calculation_service.into();
 
             let tasks = paths
                 .into_iter()
@@ -374,6 +362,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
                         let blob_service = blob_service.clone();
                         let directory_service = directory_service.clone();
                         let path_info_service = path_info_service.clone();
+                        let nar_calculation_service = nar_calculation_service.clone();
 
                         async move {
                             if let Ok(name) = tvix_store::import::path_to_name(&path) {
@@ -383,6 +372,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
                                     blob_service,
                                     directory_service,
                                     path_info_service,
+                                    nar_calculation_service,
                                 )
                                 .await;
                                 if let Ok(output_path) = resp {
@@ -403,7 +393,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
             path_info_service_addr,
             reference_graph_path,
         } => {
-            let (blob_service, directory_service, path_info_service) =
+            let (blob_service, directory_service, path_info_service, _nar_calculation_service) =
                 tvix_store::utils::construct_services(
                     blob_service_addr,
                     directory_service_addr,
@@ -510,7 +500,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
             allow_other,
             show_xattr,
         } => {
-            let (blob_service, directory_service, path_info_service) =
+            let (blob_service, directory_service, path_info_service, _nar_calculation_service) =
                 tvix_store::utils::construct_services(
                     blob_service_addr,
                     directory_service_addr,
@@ -552,7 +542,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
             list_root,
             show_xattr,
         } => {
-            let (blob_service, directory_service, path_info_service) =
+            let (blob_service, directory_service, path_info_service, _nar_calculation_service) =
                 tvix_store::utils::construct_services(
                     blob_service_addr,
                     directory_service_addr,
diff --git a/tvix/store/src/import.rs b/tvix/store/src/import.rs
index 7b6aeb824e..888380bca9 100644
--- a/tvix/store/src/import.rs
+++ b/tvix/store/src/import.rs
@@ -11,6 +11,7 @@ use nix_compat::{
 };
 
 use crate::{
+    nar::NarCalculationService,
     pathinfoservice::PathInfoService,
     proto::{nar_info, NarInfo, PathInfo},
 };
@@ -104,23 +105,27 @@ pub fn derive_nar_ca_path_info(
 /// Ingest the given path `path` and register the resulting output path in the
 /// [`PathInfoService`] as a recursive fixed output NAR.
 #[instrument(skip_all, fields(store_name=name, path=?path), err)]
-pub async fn import_path_as_nar_ca<BS, DS, PS, P>(
+pub async fn import_path_as_nar_ca<BS, DS, PS, NS, P>(
     path: P,
     name: &str,
     blob_service: BS,
     directory_service: DS,
     path_info_service: PS,
+    nar_calculation_service: NS,
 ) -> Result<StorePath, std::io::Error>
 where
     P: AsRef<Path> + std::fmt::Debug,
     BS: BlobService + Clone,
-    DS: AsRef<dyn DirectoryService>,
+    DS: DirectoryService,
     PS: AsRef<dyn PathInfoService>,
+    NS: NarCalculationService,
 {
-    let root_node = ingest_path(blob_service, directory_service, path.as_ref()).await?;
+    let root_node = ingest_path(blob_service, directory_service, path.as_ref())
+        .await
+        .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
 
-    // Ask the PathInfoService for the NAR size and sha256
-    let (nar_size, nar_sha256) = path_info_service.as_ref().calculate_nar(&root_node).await?;
+    // Ask for the NAR size and sha256
+    let (nar_size, nar_sha256) = nar_calculation_service.calculate_nar(&root_node).await?;
 
     // Calculate the output path. This might still fail, as some names are illegal.
     // FUTUREWORK: express the `name` at the type level to be valid and move the conversion
diff --git a/tvix/store/src/nar/import.rs b/tvix/store/src/nar/import.rs
index 6f4dcdea5d..36122d419d 100644
--- a/tvix/store/src/nar/import.rs
+++ b/tvix/store/src/nar/import.rs
@@ -1,225 +1,132 @@
-use bytes::Bytes;
-use nix_compat::nar;
-use std::io::{self, BufRead};
-use tokio_util::io::SyncIoBridge;
-use tracing::warn;
+use nix_compat::nar::reader::r#async as nar_reader;
+use tokio::{io::AsyncBufRead, sync::mpsc, try_join};
 use tvix_castore::{
     blobservice::BlobService,
-    directoryservice::{DirectoryPutter, DirectoryService},
-    proto::{self as castorepb},
-    B3Digest,
+    directoryservice::DirectoryService,
+    import::{
+        blobs::{self, ConcurrentBlobUploader},
+        ingest_entries, IngestionEntry, IngestionError,
+    },
+    proto::{node::Node, NamedNode},
+    PathBuf,
 };
 
-/// Accepts a reader providing a NAR.
-/// Will traverse it, uploading blobs to the given [BlobService], and
-/// directories to the given [DirectoryService].
-/// On success, the root node is returned.
-/// This function is not async (because the NAR reader is not)
-/// and calls [tokio::task::block_in_place] when interacting with backing
-/// services, so make sure to only call this with spawn_blocking.
-pub fn read_nar<R, BS, DS>(
-    r: &mut R,
+/// Ingests the contents from a [AsyncRead] providing NAR into the tvix store,
+/// interacting with a [BlobService] and [DirectoryService].
+/// It returns the castore root node or an error.
+pub async fn ingest_nar<R, BS, DS>(
     blob_service: BS,
     directory_service: DS,
-) -> io::Result<castorepb::node::Node>
+    r: &mut R,
+) -> Result<Node, IngestionError<Error>>
 where
-    R: BufRead + Send,
-    BS: AsRef<dyn BlobService>,
-    DS: AsRef<dyn DirectoryService>,
+    R: AsyncBufRead + Unpin + Send,
+    BS: BlobService + Clone + 'static,
+    DS: DirectoryService,
 {
-    let handle = tokio::runtime::Handle::current();
-
-    let directory_putter = directory_service.as_ref().put_multiple_start();
-
-    let node = nix_compat::nar::reader::open(r)?;
-    let (root_node, mut directory_putter, _) = process_node(
-        handle.clone(),
-        "".into(), // this is the root node, it has an empty name
-        node,
-        &blob_service,
-        directory_putter,
-    )?;
-
-    // In case the root node points to a directory, we need to close
-    // [directory_putter], and ensure the digest we got back from there matches
-    // what the root node is pointing to.
-    if let castorepb::node::Node::Directory(ref directory_node) = root_node {
-        // Close directory_putter to make sure all directories have been inserted.
-        let directory_putter_digest =
-            handle.block_on(handle.spawn(async move { directory_putter.close().await }))??;
-        let root_directory_node_digest: B3Digest =
-            directory_node.digest.clone().try_into().unwrap();
-
-        if directory_putter_digest != root_directory_node_digest {
-            warn!(
-                root_directory_node_digest = %root_directory_node_digest,
-                directory_putter_digest =%directory_putter_digest,
-                "directory digest mismatch",
-            );
-            return Err(io::Error::new(
-                io::ErrorKind::Other,
-                "directory digest mismatch",
-            ));
-        }
-    }
-    // In case it's not a Directory, [directory_putter] doesn't need to be
-    // closed (as we didn't end up uploading anything).
-    // It can just be dropped, as documented in its trait.
-
-    Ok(root_node)
-}
+    // open the NAR for reading.
+    // The NAR reader emits nodes in DFS preorder.
+    let root_node = nar_reader::open(r).await.map_err(Error::IO)?;
 
-/// This is called on a [nar::reader::Node] and returns a [castorepb::node::Node].
-/// It does so by handling all three kinds, and recursing for directories.
-///
-/// [DirectoryPutter] is passed around, so a single instance of it can be used,
-/// which is sufficient, as this reads through the whole NAR linerarly.
-fn process_node<BS>(
-    handle: tokio::runtime::Handle,
-    name: bytes::Bytes,
-    node: nar::reader::Node,
-    blob_service: BS,
-    directory_putter: Box<dyn DirectoryPutter>,
-) -> io::Result<(castorepb::node::Node, Box<dyn DirectoryPutter>, BS)>
-where
-    BS: AsRef<dyn BlobService>,
-{
-    Ok(match node {
-        nar::reader::Node::Symlink { target } => (
-            castorepb::node::Node::Symlink(castorepb::SymlinkNode {
-                name,
-                target: target.into(),
-            }),
-            directory_putter,
-            blob_service,
-        ),
-        nar::reader::Node::File { executable, reader } => (
-            castorepb::node::Node::File(process_file_reader(
-                handle,
-                name,
-                reader,
-                executable,
-                &blob_service,
-            )?),
-            directory_putter,
-            blob_service,
-        ),
-        nar::reader::Node::Directory(dir_reader) => {
-            let (directory_node, directory_putter, blob_service_back) =
-                process_dir_reader(handle, name, dir_reader, blob_service, directory_putter)?;
-
-            (
-                castorepb::node::Node::Directory(directory_node),
-                directory_putter,
-                blob_service_back,
-            )
-        }
-    })
-}
-
-/// Given a name and [nar::reader::FileReader], this ingests the file into the
-/// passed [BlobService] and returns a [castorepb::FileNode].
-fn process_file_reader<BS>(
-    handle: tokio::runtime::Handle,
-    name: Bytes,
-    mut file_reader: nar::reader::FileReader,
-    executable: bool,
-    blob_service: BS,
-) -> io::Result<castorepb::FileNode>
-where
-    BS: AsRef<dyn BlobService>,
-{
-    // store the length. If we read any other length, reading will fail.
-    let expected_len = file_reader.len();
+    let (tx, rx) = mpsc::channel(1);
+    let rx = tokio_stream::wrappers::ReceiverStream::new(rx);
 
-    // prepare writing a new blob.
-    let blob_writer = handle.block_on(async { blob_service.as_ref().open_write().await });
+    let produce = async move {
+        let mut blob_uploader = ConcurrentBlobUploader::new(blob_service);
 
-    // write the blob.
-    let mut blob_writer = {
-        let mut dst = SyncIoBridge::new(blob_writer);
+        let res = produce_nar_inner(
+            &mut blob_uploader,
+            root_node,
+            "root".parse().unwrap(), // HACK: the root node sent to ingest_entries may not be ROOT.
+            tx.clone(),
+        )
+        .await;
+
+        if let Err(err) = blob_uploader.join().await {
+            tx.send(Err(err.into()))
+                .await
+                .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)))?;
+        }
 
-        file_reader.copy(&mut dst)?;
-        dst.shutdown()?;
+        tx.send(res)
+            .await
+            .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)))?;
 
-        // return back the blob_writer
-        dst.into_inner()
+        Ok(())
     };
 
-    // close the blob_writer, retrieve the digest.
-    let blob_digest = handle.block_on(async { blob_writer.close().await })?;
+    let consume = ingest_entries(directory_service, rx);
 
-    Ok(castorepb::FileNode {
-        name,
-        digest: blob_digest.into(),
-        size: expected_len,
-        executable,
-    })
+    let (_, node) = try_join!(produce, consume)?;
+
+    // remove the fake "root" name again
+    debug_assert_eq!(&node.get_name(), b"root");
+    Ok(node.rename("".into()))
 }
 
-/// Given a name and [nar::reader::DirReader], this returns a [castorepb::DirectoryNode].
-/// It uses [process_node] to iterate over all children.
-///
-/// [DirectoryPutter] is passed around, so a single instance of it can be used,
-/// which is sufficient, as this reads through the whole NAR linerarly.
-fn process_dir_reader<BS>(
-    handle: tokio::runtime::Handle,
-    name: Bytes,
-    mut dir_reader: nar::reader::DirReader,
-    blob_service: BS,
-    directory_putter: Box<dyn DirectoryPutter>,
-) -> io::Result<(castorepb::DirectoryNode, Box<dyn DirectoryPutter>, BS)>
+async fn produce_nar_inner<BS>(
+    blob_uploader: &mut ConcurrentBlobUploader<BS>,
+    node: nar_reader::Node<'_, '_>,
+    path: PathBuf,
+    tx: mpsc::Sender<Result<IngestionEntry, Error>>,
+) -> Result<IngestionEntry, Error>
 where
-    BS: AsRef<dyn BlobService>,
+    BS: BlobService + Clone + 'static,
 {
-    let mut directory = castorepb::Directory::default();
-
-    let mut directory_putter = directory_putter;
-    let mut blob_service = blob_service;
-    while let Some(entry) = dir_reader.next()? {
-        let (node, directory_putter_back, blob_service_back) = process_node(
-            handle.clone(),
-            entry.name.into(),
-            entry.node,
-            blob_service,
-            directory_putter,
-        )?;
-
-        blob_service = blob_service_back;
-        directory_putter = directory_putter_back;
-
-        match node {
-            castorepb::node::Node::Directory(node) => directory.directories.push(node),
-            castorepb::node::Node::File(node) => directory.files.push(node),
-            castorepb::node::Node::Symlink(node) => directory.symlinks.push(node),
+    Ok(match node {
+        nar_reader::Node::Symlink { target } => IngestionEntry::Symlink { path, target },
+        nar_reader::Node::File {
+            executable,
+            mut reader,
+        } => {
+            let size = reader.len();
+            let digest = blob_uploader.upload(&path, size, &mut reader).await?;
+
+            IngestionEntry::Regular {
+                path,
+                size,
+                executable,
+                digest,
+            }
         }
-    }
+        nar_reader::Node::Directory(mut dir_reader) => {
+            while let Some(entry) = dir_reader.next().await? {
+                let mut path = path.clone();
+
+                // valid NAR names are valid castore names
+                path.try_push(entry.name)
+                    .expect("Tvix bug: failed to join name");
+
+                let entry = Box::pin(produce_nar_inner(
+                    blob_uploader,
+                    entry.node,
+                    path,
+                    tx.clone(),
+                ))
+                .await?;
+
+                tx.send(Ok(entry)).await.map_err(|e| {
+                    Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e))
+                })?;
+            }
+
+            IngestionEntry::Dir { path }
+        }
+    })
+}
 
-    // calculate digest and size.
-    let directory_digest = directory.digest();
-    let directory_size = directory.size();
-
-    // upload the directory. This is a bit more verbose, as we want to get back
-    // directory_putter for later reuse.
-    let directory_putter = handle.block_on(handle.spawn(async move {
-        directory_putter.put(directory).await?;
-        Ok::<_, io::Error>(directory_putter)
-    }))??;
-
-    Ok((
-        castorepb::DirectoryNode {
-            name,
-            digest: directory_digest.into(),
-            size: directory_size,
-        },
-        directory_putter,
-        blob_service,
-    ))
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+    #[error(transparent)]
+    IO(#[from] std::io::Error),
+
+    #[error(transparent)]
+    BlobUpload(#[from] blobs::Error),
 }
 
 #[cfg(test)]
 mod test {
-    use crate::nar::read_nar;
+    use crate::nar::ingest_nar;
     use std::io::Cursor;
     use std::sync::Arc;
 
@@ -244,19 +151,13 @@ mod test {
         blob_service: Arc<dyn BlobService>,
         directory_service: Arc<dyn DirectoryService>,
     ) {
-        let handle = tokio::runtime::Handle::current();
-
-        let root_node = handle
-            .spawn_blocking(|| {
-                read_nar(
-                    &mut Cursor::new(&NAR_CONTENTS_SYMLINK.clone()),
-                    blob_service,
-                    directory_service,
-                )
-            })
-            .await
-            .unwrap()
-            .expect("must parse");
+        let root_node = ingest_nar(
+            blob_service,
+            directory_service,
+            &mut Cursor::new(&NAR_CONTENTS_SYMLINK.clone()),
+        )
+        .await
+        .expect("must parse");
 
         assert_eq!(
             castorepb::node::Node::Symlink(castorepb::SymlinkNode {
@@ -273,22 +174,13 @@ mod test {
         blob_service: Arc<dyn BlobService>,
         directory_service: Arc<dyn DirectoryService>,
     ) {
-        let handle = tokio::runtime::Handle::current();
-
-        let root_node = handle
-            .spawn_blocking({
-                let blob_service = blob_service.clone();
-                move || {
-                    read_nar(
-                        &mut Cursor::new(&NAR_CONTENTS_HELLOWORLD.clone()),
-                        blob_service,
-                        directory_service,
-                    )
-                }
-            })
-            .await
-            .unwrap()
-            .expect("must parse");
+        let root_node = ingest_nar(
+            blob_service.clone(),
+            directory_service,
+            &mut Cursor::new(&NAR_CONTENTS_HELLOWORLD.clone()),
+        )
+        .await
+        .expect("must parse");
 
         assert_eq!(
             castorepb::node::Node::File(castorepb::FileNode {
@@ -310,23 +202,13 @@ mod test {
         blob_service: Arc<dyn BlobService>,
         directory_service: Arc<dyn DirectoryService>,
     ) {
-        let handle = tokio::runtime::Handle::current();
-
-        let root_node = handle
-            .spawn_blocking({
-                let blob_service = blob_service.clone();
-                let directory_service = directory_service.clone();
-                || {
-                    read_nar(
-                        &mut Cursor::new(&NAR_CONTENTS_COMPLICATED.clone()),
-                        blob_service,
-                        directory_service,
-                    )
-                }
-            })
-            .await
-            .unwrap()
-            .expect("must parse");
+        let root_node = ingest_nar(
+            blob_service.clone(),
+            directory_service.clone(),
+            &mut Cursor::new(&NAR_CONTENTS_COMPLICATED.clone()),
+        )
+        .await
+        .expect("must parse");
 
         assert_eq!(
             castorepb::node::Node::Directory(castorepb::DirectoryNode {
diff --git a/tvix/store/src/nar/mod.rs b/tvix/store/src/nar/mod.rs
index 49bb92fb0f..164748a655 100644
--- a/tvix/store/src/nar/mod.rs
+++ b/tvix/store/src/nar/mod.rs
@@ -1,10 +1,36 @@
+use tonic::async_trait;
 use tvix_castore::B3Digest;
 
 mod import;
 mod renderer;
-pub use import::read_nar;
+pub use import::ingest_nar;
 pub use renderer::calculate_size_and_sha256;
 pub use renderer::write_nar;
+pub use renderer::SimpleRenderer;
+use tvix_castore::proto as castorepb;
+
+#[async_trait]
+pub trait NarCalculationService: Send + Sync {
+    /// Return the nar size and nar sha256 digest for a given root node.
+    /// This can be used to calculate NAR-based output paths.
+    async fn calculate_nar(
+        &self,
+        root_node: &castorepb::node::Node,
+    ) -> Result<(u64, [u8; 32]), tvix_castore::Error>;
+}
+
+#[async_trait]
+impl<A> NarCalculationService for A
+where
+    A: AsRef<dyn NarCalculationService> + Send + Sync,
+{
+    async fn calculate_nar(
+        &self,
+        root_node: &castorepb::node::Node,
+    ) -> Result<(u64, [u8; 32]), tvix_castore::Error> {
+        self.as_ref().calculate_nar(root_node).await
+    }
+}
 
 /// Errors that can encounter while rendering NARs.
 #[derive(Debug, thiserror::Error)]
diff --git a/tvix/store/src/nar/renderer.rs b/tvix/store/src/nar/renderer.rs
index 0816b8e973..efd67671db 100644
--- a/tvix/store/src/nar/renderer.rs
+++ b/tvix/store/src/nar/renderer.rs
@@ -1,17 +1,51 @@
 use crate::utils::AsyncIoBridge;
 
-use super::RenderError;
-use async_recursion::async_recursion;
+use super::{NarCalculationService, RenderError};
 use count_write::CountWrite;
 use nix_compat::nar::writer::r#async as nar_writer;
 use sha2::{Digest, Sha256};
 use tokio::io::{self, AsyncWrite, BufReader};
+use tonic::async_trait;
 use tvix_castore::{
     blobservice::BlobService,
     directoryservice::DirectoryService,
     proto::{self as castorepb, NamedNode},
 };
 
+pub struct SimpleRenderer<BS, DS> {
+    blob_service: BS,
+    directory_service: DS,
+}
+
+impl<BS, DS> SimpleRenderer<BS, DS> {
+    pub fn new(blob_service: BS, directory_service: DS) -> Self {
+        Self {
+            blob_service,
+            directory_service,
+        }
+    }
+}
+
+#[async_trait]
+impl<BS, DS> NarCalculationService for SimpleRenderer<BS, DS>
+where
+    BS: BlobService + Clone,
+    DS: DirectoryService + Clone,
+{
+    async fn calculate_nar(
+        &self,
+        root_node: &castorepb::node::Node,
+    ) -> Result<(u64, [u8; 32]), tvix_castore::Error> {
+        calculate_size_and_sha256(
+            root_node,
+            self.blob_service.clone(),
+            self.directory_service.clone(),
+        )
+        .await
+        .map_err(|e| tvix_castore::Error::StorageError(format!("failed rendering nar: {}", e)))
+    }
+}
+
 /// Invoke [write_nar], and return the size and sha256 digest of the produced
 /// NAR output.
 pub async fn calculate_size_and_sha256<BS, DS>(
@@ -72,9 +106,8 @@ where
 
 /// Process an intermediate node in the structure.
 /// This consumes the node.
-#[async_recursion]
 async fn walk_node<BS, DS>(
-    nar_node: nar_writer::Node<'async_recursion, '_>,
+    nar_node: nar_writer::Node<'_, '_>,
     proto_node: &castorepb::node::Node,
     blob_service: BS,
     directory_service: DS,
@@ -164,9 +197,13 @@ where
                             .await
                             .map_err(RenderError::NARWriterError)?;
 
-                        (blob_service, directory_service) =
-                            walk_node(child_node, &proto_node, blob_service, directory_service)
-                                .await?;
+                        (blob_service, directory_service) = Box::pin(walk_node(
+                            child_node,
+                            &proto_node,
+                            blob_service,
+                            directory_service,
+                        ))
+                        .await?;
                     }
 
                     // close the directory
diff --git a/tvix/store/src/pathinfoservice/bigtable.rs b/tvix/store/src/pathinfoservice/bigtable.rs
index f49ef475eb..707a686c0a 100644
--- a/tvix/store/src/pathinfoservice/bigtable.rs
+++ b/tvix/store/src/pathinfoservice/bigtable.rs
@@ -6,12 +6,12 @@ use bigtable_rs::{bigtable, google::bigtable::v2 as bigtable_v2};
 use bytes::Bytes;
 use data_encoding::HEXLOWER;
 use futures::stream::BoxStream;
+use nix_compat::nixbase32;
 use prost::Message;
 use serde::{Deserialize, Serialize};
 use serde_with::{serde_as, DurationSeconds};
 use tonic::async_trait;
-use tracing::trace;
-use tvix_castore::proto as castorepb;
+use tracing::{instrument, trace};
 use tvix_castore::Error;
 
 /// There should not be more than 10 MiB in a single cell.
@@ -67,6 +67,22 @@ pub struct BigtableParameters {
     app_profile_id: String,
 }
 
+impl BigtableParameters {
+    #[cfg(test)]
+    pub fn default_for_tests() -> Self {
+        Self {
+            project_id: "project-1".into(),
+            instance_name: "instance-1".into(),
+            is_read_only: false,
+            channel_size: default_channel_size(),
+            timeout: default_timeout(),
+            table_name: "table-1".into(),
+            family_name: "cf1".into(),
+            app_profile_id: default_app_profile_id(),
+        }
+    }
+}
+
 fn default_app_profile_id() -> String {
     "default".to_owned()
 }
@@ -116,7 +132,7 @@ impl BigtablePathInfoService {
             .stdout(Stdio::piped())
             .kill_on_drop(true)
             .spawn()
-            .expect("failed to spwan emulator");
+            .expect("failed to spawn emulator");
 
         Retry::spawn(
             ExponentialBackoff::from_millis(20)
@@ -182,6 +198,7 @@ fn derive_pathinfo_key(digest: &[u8; 20]) -> String {
 
 #[async_trait]
 impl PathInfoService for BigtablePathInfoService {
+    #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))]
     async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
         let mut client = self.client.clone();
         let path_info_key = derive_pathinfo_key(&digest);
@@ -278,6 +295,7 @@ impl PathInfoService for BigtablePathInfoService {
         Ok(Some(path_info))
     }
 
+    #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node))]
     async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> {
         let store_path = path_info
             .validate()
@@ -330,13 +348,6 @@ impl PathInfoService for BigtablePathInfoService {
         Ok(path_info)
     }
 
-    async fn calculate_nar(
-        &self,
-        _root_node: &castorepb::node::Node,
-    ) -> Result<(u64, [u8; 32]), Error> {
-        return Err(Error::StorageError("unimplemented".into()));
-    }
-
     fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
         let mut client = self.client.clone();
 
diff --git a/tvix/store/src/pathinfoservice/combinators.rs b/tvix/store/src/pathinfoservice/combinators.rs
new file mode 100644
index 0000000000..664144ef49
--- /dev/null
+++ b/tvix/store/src/pathinfoservice/combinators.rs
@@ -0,0 +1,111 @@
+use crate::proto::PathInfo;
+use futures::stream::BoxStream;
+use nix_compat::nixbase32;
+use tonic::async_trait;
+use tracing::{debug, instrument};
+use tvix_castore::Error;
+
+use super::PathInfoService;
+
+/// Asks near first, if not found, asks far.
+/// If found in there, returns it, and *inserts* it into
+/// near.
+/// There is no negative cache.
+/// Inserts and listings are not implemented for now.
+pub struct Cache<PS1, PS2> {
+    near: PS1,
+    far: PS2,
+}
+
+impl<PS1, PS2> Cache<PS1, PS2> {
+    pub fn new(near: PS1, far: PS2) -> Self {
+        Self { near, far }
+    }
+}
+
+#[async_trait]
+impl<PS1, PS2> PathInfoService for Cache<PS1, PS2>
+where
+    PS1: PathInfoService,
+    PS2: PathInfoService,
+{
+    #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))]
+    async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
+        match self.near.get(digest).await? {
+            Some(path_info) => {
+                debug!("serving from cache");
+                Ok(Some(path_info))
+            }
+            None => {
+                debug!("not found in near, asking remote…");
+                match self.far.get(digest).await? {
+                    None => Ok(None),
+                    Some(path_info) => {
+                        debug!("found in remote, adding to cache");
+                        self.near.put(path_info.clone()).await?;
+                        Ok(Some(path_info))
+                    }
+                }
+            }
+        }
+    }
+
+    async fn put(&self, _path_info: PathInfo) -> Result<PathInfo, Error> {
+        Err(Error::StorageError("unimplemented".to_string()))
+    }
+
+    fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
+        Box::pin(tokio_stream::once(Err(Error::StorageError(
+            "unimplemented".to_string(),
+        ))))
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use std::num::NonZeroUsize;
+
+    use crate::{
+        pathinfoservice::{LruPathInfoService, MemoryPathInfoService, PathInfoService},
+        tests::fixtures::PATH_INFO_WITH_NARINFO,
+    };
+
+    const PATH_INFO_DIGEST: [u8; 20] = [0; 20];
+
+    /// Helper function setting up an instance of a "far" and "near"
+    /// PathInfoService.
+    async fn create_pathinfoservice() -> super::Cache<LruPathInfoService, MemoryPathInfoService> {
+        // Create an instance of a "far" PathInfoService.
+        let far = MemoryPathInfoService::default();
+
+        // … and an instance of a "near" PathInfoService.
+        let near = LruPathInfoService::with_capacity(NonZeroUsize::new(1).unwrap());
+
+        // create a Pathinfoservice combining the two and return it.
+        super::Cache::new(near, far)
+    }
+
+    /// Getting from the far backend is gonna insert it into the near one.
+    #[tokio::test]
+    async fn test_populate_cache() {
+        let svc = create_pathinfoservice().await;
+
+        // query the PathInfo, things should not be there.
+        assert!(svc.get(PATH_INFO_DIGEST).await.unwrap().is_none());
+
+        // insert it into the far one.
+        svc.far.put(PATH_INFO_WITH_NARINFO.clone()).await.unwrap();
+
+        // now try getting it again, it should succeed.
+        assert_eq!(
+            Some(PATH_INFO_WITH_NARINFO.clone()),
+            svc.get(PATH_INFO_DIGEST).await.unwrap()
+        );
+
+        // peek near, it should now be there.
+        assert_eq!(
+            Some(PATH_INFO_WITH_NARINFO.clone()),
+            svc.near.get(PATH_INFO_DIGEST).await.unwrap()
+        );
+    }
+}
diff --git a/tvix/store/src/pathinfoservice/from_addr.rs b/tvix/store/src/pathinfoservice/from_addr.rs
index 1ff822ad35..455909e7f2 100644
--- a/tvix/store/src/pathinfoservice/from_addr.rs
+++ b/tvix/store/src/pathinfoservice/from_addr.rs
@@ -47,7 +47,7 @@ pub async fn from_addr(
             if url.has_host() || !url.path().is_empty() {
                 return Err(Error::StorageError("invalid url".to_string()));
             }
-            Box::new(MemoryPathInfoService::new(blob_service, directory_service))
+            Box::<MemoryPathInfoService>::default()
         }
         "sled" => {
             // sled doesn't support host, and a path can be provided (otherwise
@@ -65,10 +65,10 @@ pub async fn from_addr(
             // TODO: expose other parameters as URL parameters?
 
             Box::new(if url.path().is_empty() {
-                SledPathInfoService::new_temporary(blob_service, directory_service)
+                SledPathInfoService::new_temporary()
                     .map_err(|e| Error::StorageError(e.to_string()))?
             } else {
-                SledPathInfoService::new(url.path(), blob_service, directory_service)
+                SledPathInfoService::new(url.path())
                     .map_err(|e| Error::StorageError(e.to_string()))?
             })
         }
@@ -208,7 +208,7 @@ mod tests {
     #[case::grpc_invalid_host_and_path("grpc+http://localhost/some-path", false)]
     /// A valid example for Bigtable.
     #[cfg_attr(
-        feature = "cloud",
+        all(feature = "cloud", feature = "integration"),
         case::bigtable_valid(
             "bigtable://instance-1?project_id=project-1&table_name=table-1&family_name=cf1",
             true
@@ -216,7 +216,7 @@ mod tests {
     )]
     /// An invalid example for Bigtable, missing fields
     #[cfg_attr(
-        feature = "cloud",
+        all(feature = "cloud", feature = "integration"),
         case::bigtable_invalid_missing_fields("bigtable://instance-1", false)
     )]
     #[tokio::test]
diff --git a/tvix/store/src/pathinfoservice/grpc.rs b/tvix/store/src/pathinfoservice/grpc.rs
index 1138ebdc19..f6a356cf18 100644
--- a/tvix/store/src/pathinfoservice/grpc.rs
+++ b/tvix/store/src/pathinfoservice/grpc.rs
@@ -1,8 +1,11 @@
 use super::PathInfoService;
-use crate::proto::{self, ListPathInfoRequest, PathInfo};
+use crate::{
+    nar::NarCalculationService,
+    proto::{self, ListPathInfoRequest, PathInfo},
+};
 use async_stream::try_stream;
-use data_encoding::BASE64;
 use futures::stream::BoxStream;
+use nix_compat::nixbase32;
 use tonic::{async_trait, transport::Channel, Code};
 use tracing::instrument;
 use tvix_castore::{proto as castorepb, Error};
@@ -27,7 +30,7 @@ impl GRPCPathInfoService {
 
 #[async_trait]
 impl PathInfoService for GRPCPathInfoService {
-    #[instrument(level = "trace", skip_all, fields(path_info.digest = BASE64.encode(&digest)))]
+    #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))]
     async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
         let path_info = self
             .grpc_client
@@ -67,30 +70,6 @@ impl PathInfoService for GRPCPathInfoService {
         Ok(path_info)
     }
 
-    #[instrument(level = "trace", skip_all, fields(root_node = ?root_node))]
-    async fn calculate_nar(
-        &self,
-        root_node: &castorepb::node::Node,
-    ) -> Result<(u64, [u8; 32]), Error> {
-        let path_info = self
-            .grpc_client
-            .clone()
-            .calculate_nar(castorepb::Node {
-                node: Some(root_node.clone()),
-            })
-            .await
-            .map_err(|e| Error::StorageError(e.to_string()))?
-            .into_inner();
-
-        let nar_sha256: [u8; 32] = path_info
-            .nar_sha256
-            .to_vec()
-            .try_into()
-            .map_err(|_e| Error::StorageError("invalid digest length".to_string()))?;
-
-        Ok((path_info.nar_size, nar_sha256))
-    }
-
     #[instrument(level = "trace", skip_all)]
     fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
         let mut grpc_client = self.grpc_client.clone();
@@ -126,87 +105,46 @@ impl PathInfoService for GRPCPathInfoService {
     }
 }
 
+#[async_trait]
+impl NarCalculationService for GRPCPathInfoService {
+    #[instrument(level = "trace", skip_all, fields(root_node = ?root_node))]
+    async fn calculate_nar(
+        &self,
+        root_node: &castorepb::node::Node,
+    ) -> Result<(u64, [u8; 32]), Error> {
+        let path_info = self
+            .grpc_client
+            .clone()
+            .calculate_nar(castorepb::Node {
+                node: Some(root_node.clone()),
+            })
+            .await
+            .map_err(|e| Error::StorageError(e.to_string()))?
+            .into_inner();
+
+        let nar_sha256: [u8; 32] = path_info
+            .nar_sha256
+            .to_vec()
+            .try_into()
+            .map_err(|_e| Error::StorageError("invalid digest length".to_string()))?;
+
+        Ok((path_info.nar_size, nar_sha256))
+    }
+}
+
 #[cfg(test)]
 mod tests {
-    use std::sync::Arc;
-    use std::time::Duration;
-
-    use rstest::*;
-    use tempfile::TempDir;
-    use tokio::net::UnixListener;
-    use tokio_retry::strategy::ExponentialBackoff;
-    use tokio_retry::Retry;
-    use tokio_stream::wrappers::UnixListenerStream;
-    use tvix_castore::blobservice::BlobService;
-    use tvix_castore::directoryservice::DirectoryService;
-
-    use crate::pathinfoservice::MemoryPathInfoService;
-    use crate::proto::path_info_service_client::PathInfoServiceClient;
-    use crate::proto::GRPCPathInfoServiceWrapper;
-    use crate::tests::fixtures::{self, blob_service, directory_service};
-
-    use super::GRPCPathInfoService;
-    use super::PathInfoService;
+    use crate::pathinfoservice::tests::make_grpc_path_info_service_client;
+    use crate::pathinfoservice::PathInfoService;
+    use crate::tests::fixtures;
 
     /// This ensures connecting via gRPC works as expected.
-    #[rstest]
     #[tokio::test]
-    async fn test_valid_unix_path_ping_pong(
-        blob_service: Arc<dyn BlobService>,
-        directory_service: Arc<dyn DirectoryService>,
-    ) {
-        let tmpdir = TempDir::new().unwrap();
-        let socket_path = tmpdir.path().join("daemon");
-
-        let path_clone = socket_path.clone();
-
-        // Spin up a server
-        tokio::spawn(async {
-            let uds = UnixListener::bind(path_clone).unwrap();
-            let uds_stream = UnixListenerStream::new(uds);
-
-            // spin up a new server
-            let mut server = tonic::transport::Server::builder();
-            let router = server.add_service(
-                crate::proto::path_info_service_server::PathInfoServiceServer::new(
-                    GRPCPathInfoServiceWrapper::new(Box::new(MemoryPathInfoService::new(
-                        blob_service,
-                        directory_service,
-                    ))
-                        as Box<dyn PathInfoService>),
-                ),
-            );
-            router.serve_with_incoming(uds_stream).await
-        });
-
-        // wait for the socket to be created
-        Retry::spawn(
-            ExponentialBackoff::from_millis(20).max_delay(Duration::from_secs(10)),
-            || async {
-                if socket_path.exists() {
-                    Ok(())
-                } else {
-                    Err(())
-                }
-            },
-        )
-        .await
-        .expect("failed to wait for socket");
-
-        // prepare a client
-        let grpc_client = {
-            let url = url::Url::parse(&format!("grpc+unix://{}", socket_path.display()))
-                .expect("must parse");
-            let client = PathInfoServiceClient::new(
-                tvix_castore::tonic::channel_from_url(&url)
-                    .await
-                    .expect("must succeed"),
-            );
-
-            GRPCPathInfoService::from_client(client)
-        };
+    async fn test_valid_unix_path_ping_pong() {
+        let (_blob_service, _directory_service, path_info_service) =
+            make_grpc_path_info_service_client().await;
 
-        let path_info = grpc_client
+        let path_info = path_info_service
             .get(fixtures::DUMMY_PATH_DIGEST)
             .await
             .expect("must not be error");
diff --git a/tvix/store/src/pathinfoservice/lru.rs b/tvix/store/src/pathinfoservice/lru.rs
new file mode 100644
index 0000000000..da674f497a
--- /dev/null
+++ b/tvix/store/src/pathinfoservice/lru.rs
@@ -0,0 +1,128 @@
+use async_stream::try_stream;
+use futures::stream::BoxStream;
+use lru::LruCache;
+use nix_compat::nixbase32;
+use std::num::NonZeroUsize;
+use std::sync::Arc;
+use tokio::sync::RwLock;
+use tonic::async_trait;
+use tracing::instrument;
+
+use crate::proto::PathInfo;
+use tvix_castore::Error;
+
+use super::PathInfoService;
+
+pub struct LruPathInfoService {
+    lru: Arc<RwLock<LruCache<[u8; 20], PathInfo>>>,
+}
+
+impl LruPathInfoService {
+    pub fn with_capacity(capacity: NonZeroUsize) -> Self {
+        Self {
+            lru: Arc::new(RwLock::new(LruCache::new(capacity))),
+        }
+    }
+}
+
+#[async_trait]
+impl PathInfoService for LruPathInfoService {
+    #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))]
+    async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
+        Ok(self.lru.write().await.get(&digest).cloned())
+    }
+
+    #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node))]
+    async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> {
+        // call validate
+        let store_path = path_info
+            .validate()
+            .map_err(|e| Error::InvalidRequest(format!("invalid PathInfo: {}", e)))?;
+
+        self.lru
+            .write()
+            .await
+            .put(*store_path.digest(), path_info.clone());
+
+        Ok(path_info)
+    }
+
+    fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
+        let lru = self.lru.clone();
+        Box::pin(try_stream! {
+            let lru = lru.read().await;
+            let it = lru.iter();
+
+            for (_k,v) in it {
+                yield v.clone()
+            }
+        })
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use std::num::NonZeroUsize;
+
+    use crate::{
+        pathinfoservice::{LruPathInfoService, PathInfoService},
+        proto::PathInfo,
+        tests::fixtures::PATH_INFO_WITH_NARINFO,
+    };
+    use lazy_static::lazy_static;
+    use tvix_castore::proto as castorepb;
+
+    lazy_static! {
+        static ref PATHINFO_1: PathInfo = PATH_INFO_WITH_NARINFO.clone();
+        static ref PATHINFO_1_DIGEST: [u8; 20] = [0; 20];
+        static ref PATHINFO_2: PathInfo = {
+            let mut p = PATHINFO_1.clone();
+            let root_node = p.node.as_mut().unwrap();
+            if let castorepb::Node { node: Some(node) } = root_node {
+                let n = node.to_owned();
+                *node = n.rename("11111111111111111111111111111111-dummy2".into());
+            } else {
+                unreachable!()
+            }
+            p
+        };
+        static ref PATHINFO_2_DIGEST: [u8; 20] = *(PATHINFO_2.validate().unwrap()).digest();
+    }
+
+    #[tokio::test]
+    async fn evict() {
+        let svc = LruPathInfoService::with_capacity(NonZeroUsize::new(1).unwrap());
+
+        // pathinfo_1 should not be there
+        assert!(svc
+            .get(*PATHINFO_1_DIGEST)
+            .await
+            .expect("no error")
+            .is_none());
+
+        // insert it
+        svc.put(PATHINFO_1.clone()).await.expect("no error");
+
+        // now it should be there.
+        assert_eq!(
+            Some(PATHINFO_1.clone()),
+            svc.get(*PATHINFO_1_DIGEST).await.expect("no error")
+        );
+
+        // insert pathinfo_2. This will evict pathinfo 1
+        svc.put(PATHINFO_2.clone()).await.expect("no error");
+
+        // now pathinfo 2 should be there.
+        assert_eq!(
+            Some(PATHINFO_2.clone()),
+            svc.get(*PATHINFO_2_DIGEST).await.expect("no error")
+        );
+
+        // … but pathinfo 1 not anymore.
+        assert!(svc
+            .get(*PATHINFO_1_DIGEST)
+            .await
+            .expect("no error")
+            .is_none());
+    }
+}
diff --git a/tvix/store/src/pathinfoservice/memory.rs b/tvix/store/src/pathinfoservice/memory.rs
index f8435dbbf8..3de3221df2 100644
--- a/tvix/store/src/pathinfoservice/memory.rs
+++ b/tvix/store/src/pathinfoservice/memory.rs
@@ -1,40 +1,24 @@
 use super::PathInfoService;
-use crate::{nar::calculate_size_and_sha256, proto::PathInfo};
-use futures::stream::{iter, BoxStream};
-use std::{
-    collections::HashMap,
-    sync::{Arc, RwLock},
-};
+use crate::proto::PathInfo;
+use async_stream::try_stream;
+use futures::stream::BoxStream;
+use nix_compat::nixbase32;
+use std::{collections::HashMap, sync::Arc};
+use tokio::sync::RwLock;
 use tonic::async_trait;
-use tvix_castore::proto as castorepb;
+use tracing::instrument;
 use tvix_castore::Error;
-use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService};
 
-pub struct MemoryPathInfoService<BS, DS> {
+#[derive(Default)]
+pub struct MemoryPathInfoService {
     db: Arc<RwLock<HashMap<[u8; 20], PathInfo>>>,
-
-    blob_service: BS,
-    directory_service: DS,
-}
-
-impl<BS, DS> MemoryPathInfoService<BS, DS> {
-    pub fn new(blob_service: BS, directory_service: DS) -> Self {
-        Self {
-            db: Default::default(),
-            blob_service,
-            directory_service,
-        }
-    }
 }
 
 #[async_trait]
-impl<BS, DS> PathInfoService for MemoryPathInfoService<BS, DS>
-where
-    BS: AsRef<dyn BlobService> + Send + Sync,
-    DS: AsRef<dyn DirectoryService> + Send + Sync,
-{
+impl PathInfoService for MemoryPathInfoService {
+    #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))]
     async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
-        let db = self.db.read().unwrap();
+        let db = self.db.read().await;
 
         match db.get(&digest) {
             None => Ok(None),
@@ -42,6 +26,7 @@ where
         }
     }
 
+    #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node))]
     async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> {
         // Call validate on the received PathInfo message.
         match path_info.validate() {
@@ -53,7 +38,7 @@ where
             // In case the PathInfo is valid, and we were able to extract a NixPath, store it in the database.
             // This overwrites existing PathInfo objects.
             Ok(nix_path) => {
-                let mut db = self.db.write().unwrap();
+                let mut db = self.db.write().await;
                 db.insert(*nix_path.digest(), path_info.clone());
 
                 Ok(path_info)
@@ -61,24 +46,16 @@ where
         }
     }
 
-    async fn calculate_nar(
-        &self,
-        root_node: &castorepb::node::Node,
-    ) -> Result<(u64, [u8; 32]), Error> {
-        calculate_size_and_sha256(root_node, &self.blob_service, &self.directory_service)
-            .await
-            .map_err(|e| Error::StorageError(e.to_string()))
-    }
-
     fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
-        let db = self.db.read().unwrap();
+        let db = self.db.clone();
 
-        // Copy all elements into a list.
-        // This is a bit ugly, because we can't have db escape the lifetime
-        // of this function, but elements need to be returned owned anyways, and this in-
-        // memory impl is only for testing purposes anyways.
-        let items: Vec<_> = db.iter().map(|(_k, v)| Ok(v.clone())).collect();
+        Box::pin(try_stream! {
+            let db = db.read().await;
+            let it = db.iter();
 
-        Box::pin(iter(items))
+            for (_k, v) in it {
+                yield v.clone()
+            }
+        })
     }
 }
diff --git a/tvix/store/src/pathinfoservice/mod.rs b/tvix/store/src/pathinfoservice/mod.rs
index c1a482bbb5..574bcc0b8b 100644
--- a/tvix/store/src/pathinfoservice/mod.rs
+++ b/tvix/store/src/pathinfoservice/mod.rs
@@ -1,5 +1,7 @@
+mod combinators;
 mod from_addr;
 mod grpc;
+mod lru;
 mod memory;
 mod nix_http;
 mod sled;
@@ -12,13 +14,14 @@ mod tests;
 
 use futures::stream::BoxStream;
 use tonic::async_trait;
-use tvix_castore::proto as castorepb;
 use tvix_castore::Error;
 
 use crate::proto::PathInfo;
 
+pub use self::combinators::Cache as CachePathInfoService;
 pub use self::from_addr::from_addr;
 pub use self::grpc::GRPCPathInfoService;
+pub use self::lru::LruPathInfoService;
 pub use self::memory::MemoryPathInfoService;
 pub use self::nix_http::NixHTTPPathInfoService;
 pub use self::sled::SledPathInfoService;
@@ -41,14 +44,6 @@ pub trait PathInfoService: Send + Sync {
     /// invalid messages.
     async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error>;
 
-    /// Return the nar size and nar sha256 digest for a given root node.
-    /// This can be used to calculate NAR-based output paths,
-    /// and implementations are encouraged to cache it.
-    async fn calculate_nar(
-        &self,
-        root_node: &castorepb::node::Node,
-    ) -> Result<(u64, [u8; 32]), Error>;
-
     /// Iterate over all PathInfo objects in the store.
     /// Implementations can decide to disallow listing.
     ///
@@ -72,13 +67,6 @@ where
         self.as_ref().put(path_info).await
     }
 
-    async fn calculate_nar(
-        &self,
-        root_node: &castorepb::node::Node,
-    ) -> Result<(u64, [u8; 32]), Error> {
-        self.as_ref().calculate_nar(root_node).await
-    }
-
     fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
         self.as_ref().list()
     }
diff --git a/tvix/store/src/pathinfoservice/nix_http.rs b/tvix/store/src/pathinfoservice/nix_http.rs
index bdb0e2c3cb..cccd4805c6 100644
--- a/tvix/store/src/pathinfoservice/nix_http.rs
+++ b/tvix/store/src/pathinfoservice/nix_http.rs
@@ -1,6 +1,3 @@
-use std::io::{self, BufRead, Read, Write};
-
-use data_encoding::BASE64;
 use futures::{stream::BoxStream, TryStreamExt};
 use nix_compat::{
     narinfo::{self, NarInfo},
@@ -8,7 +5,10 @@ use nix_compat::{
     nixhash::NixHash,
 };
 use reqwest::StatusCode;
-use sha2::{digest::FixedOutput, Digest, Sha256};
+use sha2::Digest;
+use std::io::{self, Write};
+use tokio::io::{AsyncRead, BufReader};
+use tokio_util::io::InspectReader;
 use tonic::async_trait;
 use tracing::{debug, instrument, warn};
 use tvix_castore::{
@@ -32,8 +32,7 @@ use super::PathInfoService;
 ///
 /// The client is expected to be (indirectly) using the same [BlobService] and
 /// [DirectoryService], so able to fetch referred Directories and Blobs.
-/// [PathInfoService::put] and [PathInfoService::calculate_nar] are not
-/// implemented and return an error if called.
+/// [PathInfoService::put] is not implemented and returns an error if called.
 /// TODO: what about reading from nix-cache-info?
 pub struct NixHTTPPathInfoService<BS, DS> {
     base_url: url::Url,
@@ -71,7 +70,7 @@ where
     BS: AsRef<dyn BlobService> + Send + Sync + Clone + 'static,
     DS: AsRef<dyn DirectoryService> + Send + Sync + Clone + 'static,
 {
-    #[instrument(skip_all, err, fields(path.digest=BASE64.encode(&digest)))]
+    #[instrument(skip_all, err, fields(path.digest=nixbase32::encode(&digest)))]
     async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
         let narinfo_url = self
             .base_url
@@ -171,85 +170,83 @@ where
             )));
         }
 
-        // get an AsyncRead of the response body.
-        let async_r = tokio_util::io::StreamReader::new(resp.bytes_stream().map_err(|e| {
+        // get a reader of the response body.
+        let r = tokio_util::io::StreamReader::new(resp.bytes_stream().map_err(|e| {
             let e = e.without_url();
             warn!(e=%e, "failed to get response body");
             io::Error::new(io::ErrorKind::BrokenPipe, e.to_string())
         }));
-        let sync_r = tokio_util::io::SyncIoBridge::new(async_r);
 
-        // handle decompression, by wrapping the reader.
-        let sync_r: Box<dyn BufRead + Send> = match narinfo.compression {
-            Some("none") => Box::new(sync_r),
-            Some("xz") => Box::new(io::BufReader::new(xz2::read::XzDecoder::new(sync_r))),
-            Some(comp) => {
-                return Err(Error::InvalidRequest(
-                    format!("unsupported compression: {}", comp).to_string(),
-                ))
-            }
-            None => {
-                return Err(Error::InvalidRequest(
-                    "unsupported compression: bzip2".to_string(),
-                ))
+        // handle decompression, depending on the compression field.
+        let r: Box<dyn AsyncRead + Send + Unpin> = match narinfo.compression {
+            Some("none") => Box::new(r) as Box<dyn AsyncRead + Send + Unpin>,
+            Some("bzip2") | None => Box::new(async_compression::tokio::bufread::BzDecoder::new(r))
+                as Box<dyn AsyncRead + Send + Unpin>,
+            Some("gzip") => Box::new(async_compression::tokio::bufread::GzipDecoder::new(r))
+                as Box<dyn AsyncRead + Send + Unpin>,
+            Some("xz") => Box::new(async_compression::tokio::bufread::XzDecoder::new(r))
+                as Box<dyn AsyncRead + Send + Unpin>,
+            Some("zstd") => Box::new(async_compression::tokio::bufread::ZstdDecoder::new(r))
+                as Box<dyn AsyncRead + Send + Unpin>,
+            Some(comp_str) => {
+                return Err(Error::StorageError(format!(
+                    "unsupported compression: {comp_str}"
+                )));
             }
         };
-
-        let res = tokio::task::spawn_blocking({
-            let blob_service = self.blob_service.clone();
-            let directory_service = self.directory_service.clone();
-            move || -> io::Result<_> {
-                // Wrap the reader once more, so we can calculate NarSize and NarHash
-                let mut sync_r = io::BufReader::new(NarReader::from(sync_r));
-                let root_node = crate::nar::read_nar(&mut sync_r, blob_service, directory_service)?;
-
-                let (_, nar_hash, nar_size) = sync_r.into_inner().into_inner();
-
-                Ok((root_node, nar_hash, nar_size))
-            }
-        })
+        let mut nar_hash = sha2::Sha256::new();
+        let mut nar_size = 0;
+
+        // Assemble NarHash and NarSize as we read bytes.
+        let r = InspectReader::new(r, |b| {
+            nar_size += b.len() as u64;
+            nar_hash.write_all(b).unwrap();
+        });
+
+        // HACK: InspectReader doesn't implement AsyncBufRead, but neither do our decompressors.
+        let mut r = BufReader::new(r);
+
+        let root_node = crate::nar::ingest_nar(
+            self.blob_service.clone(),
+            self.directory_service.clone(),
+            &mut r,
+        )
         .await
-        .unwrap();
-
-        match res {
-            Ok((root_node, nar_hash, nar_size)) => {
-                // ensure the ingested narhash and narsize do actually match.
-                if narinfo.nar_size != nar_size {
-                    warn!(
-                        narinfo.nar_size = narinfo.nar_size,
-                        http.nar_size = nar_size,
-                        "NARSize mismatch"
-                    );
-                    Err(io::Error::new(
-                        io::ErrorKind::InvalidData,
-                        "NarSize mismatch".to_string(),
-                    ))?;
-                }
-                if narinfo.nar_hash != nar_hash {
-                    warn!(
-                        narinfo.nar_hash = %NixHash::Sha256(narinfo.nar_hash),
-                        http.nar_hash = %NixHash::Sha256(nar_hash),
-                        "NarHash mismatch"
-                    );
-                    Err(io::Error::new(
-                        io::ErrorKind::InvalidData,
-                        "NarHash mismatch".to_string(),
-                    ))?;
-                }
-
-                Ok(Some(PathInfo {
-                    node: Some(castorepb::Node {
-                        // set the name of the root node to the digest-name of the store path.
-                        node: Some(
-                            root_node.rename(narinfo.store_path.to_string().to_owned().into()),
-                        ),
-                    }),
-                    references: pathinfo.references,
-                    narinfo: pathinfo.narinfo,
-                }))
-            }
-            Err(e) => Err(e.into()),
+        .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
+
+        // ensure the ingested narhash and narsize do actually match.
+        if narinfo.nar_size != nar_size {
+            warn!(
+                narinfo.nar_size = narinfo.nar_size,
+                http.nar_size = nar_size,
+                "NarSize mismatch"
+            );
+            Err(io::Error::new(
+                io::ErrorKind::InvalidData,
+                "NarSize mismatch".to_string(),
+            ))?;
+        }
+        let nar_hash: [u8; 32] = nar_hash.finalize().into();
+        if narinfo.nar_hash != nar_hash {
+            warn!(
+                narinfo.nar_hash = %NixHash::Sha256(narinfo.nar_hash),
+                http.nar_hash = %NixHash::Sha256(nar_hash),
+                "NarHash mismatch"
+            );
+            Err(io::Error::new(
+                io::ErrorKind::InvalidData,
+                "NarHash mismatch".to_string(),
+            ))?;
         }
+
+        Ok(Some(PathInfo {
+            node: Some(castorepb::Node {
+                // set the name of the root node to the digest-name of the store path.
+                node: Some(root_node.rename(narinfo.store_path.to_string().to_owned().into())),
+            }),
+            references: pathinfo.references,
+            narinfo: pathinfo.narinfo,
+        }))
     }
 
     #[instrument(skip_all, fields(path_info=?_path_info))]
@@ -259,16 +256,6 @@ where
         ))
     }
 
-    #[instrument(skip_all, fields(root_node=?root_node))]
-    async fn calculate_nar(
-        &self,
-        root_node: &castorepb::node::Node,
-    ) -> Result<(u64, [u8; 32]), Error> {
-        Err(Error::InvalidRequest(
-            "calculate_nar not supported for this backend".to_string(),
-        ))
-    }
-
     fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
         Box::pin(futures::stream::once(async {
             Err(Error::InvalidRequest(
@@ -277,38 +264,3 @@ where
         }))
     }
 }
-
-/// Small helper reader implementing [std::io::Read].
-/// It can be used to wrap another reader, counts the number of bytes read
-/// and the sha256 digest of the contents.
-struct NarReader<R: Read> {
-    r: R,
-
-    sha256: sha2::Sha256,
-    bytes_read: u64,
-}
-
-impl<R: Read> NarReader<R> {
-    pub fn from(inner: R) -> Self {
-        Self {
-            r: inner,
-            sha256: Sha256::new(),
-            bytes_read: 0,
-        }
-    }
-
-    /// Returns the (remaining) inner reader, the sha256 digest and the number of bytes read.
-    pub fn into_inner(self) -> (R, [u8; 32], u64) {
-        (self.r, self.sha256.finalize_fixed().into(), self.bytes_read)
-    }
-}
-
-impl<R: Read> Read for NarReader<R> {
-    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
-        self.r.read(buf).map(|n| {
-            self.bytes_read += n as u64;
-            self.sha256.write_all(&buf[..n]).unwrap();
-            n
-        })
-    }
-}
diff --git a/tvix/store/src/pathinfoservice/sled.rs b/tvix/store/src/pathinfoservice/sled.rs
index 7b6d7fd7ab..eb3cf2ff1b 100644
--- a/tvix/store/src/pathinfoservice/sled.rs
+++ b/tvix/store/src/pathinfoservice/sled.rs
@@ -1,140 +1,117 @@
 use super::PathInfoService;
-use crate::nar::calculate_size_and_sha256;
 use crate::proto::PathInfo;
-use futures::stream::iter;
+use async_stream::try_stream;
 use futures::stream::BoxStream;
+use nix_compat::nixbase32;
 use prost::Message;
 use std::path::Path;
 use tonic::async_trait;
+use tracing::instrument;
 use tracing::warn;
-use tvix_castore::proto as castorepb;
-use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService, Error};
+use tvix_castore::Error;
 
 /// SledPathInfoService stores PathInfo in a [sled](https://github.com/spacejam/sled).
 ///
 /// The PathInfo messages are stored as encoded protos, and keyed by their output hash,
 /// as that's currently the only request type available.
-pub struct SledPathInfoService<BS, DS> {
+pub struct SledPathInfoService {
     db: sled::Db,
-
-    blob_service: BS,
-    directory_service: DS,
 }
 
-impl<BS, DS> SledPathInfoService<BS, DS> {
-    pub fn new<P: AsRef<Path>>(
-        p: P,
-        blob_service: BS,
-        directory_service: DS,
-    ) -> Result<Self, sled::Error> {
+impl SledPathInfoService {
+    pub fn new<P: AsRef<Path>>(p: P) -> Result<Self, sled::Error> {
         let config = sled::Config::default()
             .use_compression(false) // is a required parameter
             .path(p);
         let db = config.open()?;
 
-        Ok(Self {
-            db,
-            blob_service,
-            directory_service,
-        })
+        Ok(Self { db })
     }
 
-    pub fn new_temporary(blob_service: BS, directory_service: DS) -> Result<Self, sled::Error> {
+    pub fn new_temporary() -> Result<Self, sled::Error> {
         let config = sled::Config::default().temporary(true);
         let db = config.open()?;
 
-        Ok(Self {
-            db,
-            blob_service,
-            directory_service,
-        })
+        Ok(Self { db })
     }
 }
 
 #[async_trait]
-impl<BS, DS> PathInfoService for SledPathInfoService<BS, DS>
-where
-    BS: AsRef<dyn BlobService> + Send + Sync,
-    DS: AsRef<dyn DirectoryService> + Send + Sync,
-{
+impl PathInfoService for SledPathInfoService {
+    #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))]
     async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
-        match self.db.get(digest) {
-            Ok(None) => Ok(None),
-            Ok(Some(data)) => match PathInfo::decode(&*data) {
-                Ok(path_info) => Ok(Some(path_info)),
-                Err(e) => {
+        let resp = tokio::task::spawn_blocking({
+            let db = self.db.clone();
+            move || db.get(digest.as_slice())
+        })
+        .await?
+        .map_err(|e| {
+            warn!("failed to retrieve PathInfo: {}", e);
+            Error::StorageError(format!("failed to retrieve PathInfo: {}", e))
+        })?;
+        match resp {
+            None => Ok(None),
+            Some(data) => {
+                let path_info = PathInfo::decode(&*data).map_err(|e| {
                     warn!("failed to decode stored PathInfo: {}", e);
-                    Err(Error::StorageError(format!(
-                        "failed to decode stored PathInfo: {}",
-                        e
-                    )))
-                }
-            },
-            Err(e) => {
-                warn!("failed to retrieve PathInfo: {}", e);
-                Err(Error::StorageError(format!(
-                    "failed to retrieve PathInfo: {}",
-                    e
-                )))
+                    Error::StorageError(format!("failed to decode stored PathInfo: {}", e))
+                })?;
+                Ok(Some(path_info))
             }
         }
     }
 
+    #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node))]
     async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> {
         // Call validate on the received PathInfo message.
-        match path_info.validate() {
-            Err(e) => Err(Error::InvalidRequest(format!(
-                "failed to validate PathInfo: {}",
-                e
-            ))),
-            // In case the PathInfo is valid, and we were able to extract a NixPath, store it in the database.
-            // This overwrites existing PathInfo objects.
-            Ok(nix_path) => match self
-                .db
-                .insert(*nix_path.digest(), path_info.encode_to_vec())
-            {
-                Ok(_) => Ok(path_info),
-                Err(e) => {
-                    warn!("failed to insert PathInfo: {}", e);
-                    Err(Error::StorageError(format! {
-                        "failed to insert PathInfo: {}", e
-                    }))
-                }
-            },
-        }
-    }
+        let store_path = path_info
+            .validate()
+            .map_err(|e| Error::InvalidRequest(format!("failed to validate PathInfo: {}", e)))?;
+
+        // In case the PathInfo is valid, we were able to parse a StorePath.
+        // Store it in the database, keyed by its digest.
+        // This overwrites existing PathInfo objects.
+        tokio::task::spawn_blocking({
+            let db = self.db.clone();
+            let k = *store_path.digest();
+            let data = path_info.encode_to_vec();
+            move || db.insert(k, data)
+        })
+        .await?
+        .map_err(|e| {
+            warn!("failed to insert PathInfo: {}", e);
+            Error::StorageError(format! {
+                "failed to insert PathInfo: {}", e
+            })
+        })?;
 
-    async fn calculate_nar(
-        &self,
-        root_node: &castorepb::node::Node,
-    ) -> Result<(u64, [u8; 32]), Error> {
-        calculate_size_and_sha256(root_node, &self.blob_service, &self.directory_service)
-            .await
-            .map_err(|e| Error::StorageError(e.to_string()))
+        Ok(path_info)
     }
 
     fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
-        Box::pin(iter(self.db.iter().values().map(|v| match v {
-            Ok(data) => {
-                // we retrieved some bytes
-                match PathInfo::decode(&*data) {
-                    Ok(path_info) => Ok(path_info),
-                    Err(e) => {
-                        warn!("failed to decode stored PathInfo: {}", e);
-                        Err(Error::StorageError(format!(
-                            "failed to decode stored PathInfo: {}",
-                            e
-                        )))
-                    }
-                }
-            }
-            Err(e) => {
-                warn!("failed to retrieve PathInfo: {}", e);
-                Err(Error::StorageError(format!(
-                    "failed to retrieve PathInfo: {}",
-                    e
-                )))
+        let db = self.db.clone();
+        let mut it = db.iter().values();
+
+        Box::pin(try_stream! {
+            // Don't block the executor while waiting for .next(), so wrap that
+            // in a spawn_blocking call.
+            // We need to pass around it to be able to reuse it.
+            while let (Some(elem), new_it) = tokio::task::spawn_blocking(move || {
+                (it.next(), it)
+            }).await? {
+                it = new_it;
+                let data = elem.map_err(|e| {
+                    warn!("failed to retrieve PathInfo: {}", e);
+                    Error::StorageError(format!("failed to retrieve PathInfo: {}", e))
+                })?;
+
+                let path_info = PathInfo::decode(&*data).map_err(|e| {
+                    warn!("failed to decode stored PathInfo: {}", e);
+                    Error::StorageError(format!("failed to decode stored PathInfo: {}", e))
+                })?;
+
+                yield path_info
             }
-        })))
+        })
     }
 }
diff --git a/tvix/store/src/pathinfoservice/tests/mod.rs b/tvix/store/src/pathinfoservice/tests/mod.rs
index c9b9a06377..061655e4ba 100644
--- a/tvix/store/src/pathinfoservice/tests/mod.rs
+++ b/tvix/store/src/pathinfoservice/tests/mod.rs
@@ -4,62 +4,30 @@
 
 use rstest::*;
 use rstest_reuse::{self, *};
-use std::sync::Arc;
-use tvix_castore::proto as castorepb;
-use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService};
 
 use super::PathInfoService;
+use crate::pathinfoservice::MemoryPathInfoService;
+use crate::pathinfoservice::SledPathInfoService;
 use crate::proto::PathInfo;
 use crate::tests::fixtures::DUMMY_PATH_DIGEST;
+use tvix_castore::proto as castorepb;
 
 mod utils;
-use self::utils::make_grpc_path_info_service_client;
-
-/// Convenience type alias batching all three servives together.
-#[allow(clippy::upper_case_acronyms)]
-type BSDSPS = (
-    Arc<dyn BlobService>,
-    Arc<dyn DirectoryService>,
-    Box<dyn PathInfoService>,
-);
+pub use self::utils::make_grpc_path_info_service_client;
 
-/// Creates a PathInfoService using a new Memory{Blob,Directory}Service.
-/// We return a 3-tuple containing all of them, as some tests want to interact
-/// with all three.
-pub async fn make_path_info_service(uri: &str) -> BSDSPS {
-    let blob_service: Arc<dyn BlobService> = tvix_castore::blobservice::from_addr("memory://")
-        .await
-        .unwrap()
-        .into();
-    let directory_service: Arc<dyn DirectoryService> =
-        tvix_castore::directoryservice::from_addr("memory://")
-            .await
-            .unwrap()
-            .into();
-
-    (
-        blob_service.clone(),
-        directory_service.clone(),
-        crate::pathinfoservice::from_addr(uri, blob_service, directory_service)
-            .await
-            .unwrap(),
-    )
-}
+#[cfg(all(feature = "cloud", feature = "integration"))]
+use self::utils::make_bigtable_path_info_service;
 
 #[template]
 #[rstest]
-#[case::memory(make_path_info_service("memory://").await)]
-#[case::grpc(make_grpc_path_info_service_client().await)]
-#[case::sled(make_path_info_service("sled://").await)]
-#[cfg_attr(feature = "cloud", case::bigtable(make_path_info_service("bigtable://instance-1?project_id=project-1&table_name=table-1&family_name=cf1").await))]
-pub fn path_info_services(
-    #[case] services: (
-        impl BlobService,
-        impl DirectoryService,
-        impl PathInfoService,
-    ),
-) {
-}
+#[case::memory(MemoryPathInfoService::default())]
+#[case::grpc({
+    let (_, _, svc) = make_grpc_path_info_service_client().await;
+    svc
+})]
+#[case::sled(SledPathInfoService::new_temporary().unwrap())]
+#[cfg_attr(all(feature = "cloud",feature="integration"), case::bigtable(make_bigtable_path_info_service().await))]
+pub fn path_info_services(#[case] svc: impl PathInfoService) {}
 
 // FUTUREWORK: add more tests rejecting invalid PathInfo messages.
 // A subset of them should also ensure references to other PathInfos, or
@@ -68,9 +36,8 @@ pub fn path_info_services(
 /// Trying to get a non-existent PathInfo should return Ok(None).
 #[apply(path_info_services)]
 #[tokio::test]
-async fn not_found(services: BSDSPS) {
-    let (_, _, path_info_service) = services;
-    assert!(path_info_service
+async fn not_found(svc: impl PathInfoService) {
+    assert!(svc
         .get(DUMMY_PATH_DIGEST)
         .await
         .expect("must succeed")
@@ -80,9 +47,7 @@ async fn not_found(services: BSDSPS) {
 /// Put a PathInfo into the store, get it back.
 #[apply(path_info_services)]
 #[tokio::test]
-async fn put_get(services: BSDSPS) {
-    let (_, _, path_info_service) = services;
-
+async fn put_get(svc: impl PathInfoService) {
     let path_info = PathInfo {
         node: Some(castorepb::Node {
             node: Some(castorepb::node::Node::Symlink(castorepb::SymlinkNode {
@@ -94,20 +59,14 @@ async fn put_get(services: BSDSPS) {
     };
 
     // insert
-    let resp = path_info_service
-        .put(path_info.clone())
-        .await
-        .expect("must succeed");
+    let resp = svc.put(path_info.clone()).await.expect("must succeed");
 
     // expect the returned PathInfo to be equal (for now)
     // in the future, some stores might add additional fields/signatures.
     assert_eq!(path_info, resp);
 
     // get it back
-    let resp = path_info_service
-        .get(DUMMY_PATH_DIGEST)
-        .await
-        .expect("must succeed");
+    let resp = svc.get(DUMMY_PATH_DIGEST).await.expect("must succeed");
 
     assert_eq!(Some(path_info), resp);
 }
diff --git a/tvix/store/src/pathinfoservice/tests/utils.rs b/tvix/store/src/pathinfoservice/tests/utils.rs
index 31ec57aade..ee170468d1 100644
--- a/tvix/store/src/pathinfoservice/tests/utils.rs
+++ b/tvix/store/src/pathinfoservice/tests/utils.rs
@@ -1,8 +1,10 @@
 use std::sync::Arc;
 
 use tonic::transport::{Endpoint, Server, Uri};
+use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService};
 
 use crate::{
+    nar::{NarCalculationService, SimpleRenderer},
     pathinfoservice::{GRPCPathInfoService, MemoryPathInfoService, PathInfoService},
     proto::{
         path_info_service_client::PathInfoServiceClient,
@@ -14,7 +16,8 @@ use crate::{
 /// Constructs and returns a gRPC PathInfoService.
 /// We also return memory-based {Blob,Directory}Service,
 /// as the consumer of this function accepts a 3-tuple.
-pub async fn make_grpc_path_info_service_client() -> super::BSDSPS {
+pub async fn make_grpc_path_info_service_client(
+) -> (impl BlobService, impl DirectoryService, GRPCPathInfoService) {
     let (left, right) = tokio::io::duplex(64);
 
     let blob_service = blob_service();
@@ -26,12 +29,15 @@ pub async fn make_grpc_path_info_service_client() -> super::BSDSPS {
         let directory_service = directory_service.clone();
         async move {
             let path_info_service: Arc<dyn PathInfoService> =
-                Arc::from(MemoryPathInfoService::new(blob_service, directory_service));
+                Arc::from(MemoryPathInfoService::default());
+            let nar_calculation_service =
+                Box::new(SimpleRenderer::new(blob_service, directory_service))
+                    as Box<dyn NarCalculationService>;
 
-            // spin up a new DirectoryService
+            // spin up a new PathInfoService
             let mut server = Server::builder();
             let router = server.add_service(PathInfoServiceServer::new(
-                GRPCPathInfoServiceWrapper::new(path_info_service),
+                GRPCPathInfoServiceWrapper::new(path_info_service, nar_calculation_service),
             ));
 
             router
@@ -43,18 +49,27 @@ pub async fn make_grpc_path_info_service_client() -> super::BSDSPS {
     // Create a client, connecting to the right side. The URI is unused.
     let mut maybe_right = Some(right);
 
-    let path_info_service = Box::new(GRPCPathInfoService::from_client(
-        PathInfoServiceClient::new(
-            Endpoint::try_from("http://[::]:50051")
-                .unwrap()
-                .connect_with_connector(tower::service_fn(move |_: Uri| {
-                    let right = maybe_right.take().unwrap();
-                    async move { Ok::<_, std::io::Error>(right) }
-                }))
-                .await
-                .unwrap(),
-        ),
+    let path_info_service = GRPCPathInfoService::from_client(PathInfoServiceClient::new(
+        Endpoint::try_from("http://[::]:50051")
+            .unwrap()
+            .connect_with_connector(tower::service_fn(move |_: Uri| {
+                let right = maybe_right.take().unwrap();
+                async move { Ok::<_, std::io::Error>(right) }
+            }))
+            .await
+            .unwrap(),
     ));
 
     (blob_service, directory_service, path_info_service)
 }
+
+#[cfg(all(feature = "cloud", feature = "integration"))]
+pub(crate) async fn make_bigtable_path_info_service(
+) -> crate::pathinfoservice::BigtablePathInfoService {
+    use crate::pathinfoservice::bigtable::BigtableParameters;
+    use crate::pathinfoservice::BigtablePathInfoService;
+
+    BigtablePathInfoService::connect(BigtableParameters::default_for_tests())
+        .await
+        .unwrap()
+}
diff --git a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs
index 9f45818227..68f5575676 100644
--- a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs
+++ b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs
@@ -1,4 +1,4 @@
-use crate::nar::RenderError;
+use crate::nar::{NarCalculationService, RenderError};
 use crate::pathinfoservice::PathInfoService;
 use crate::proto;
 use futures::{stream::BoxStream, TryStreamExt};
@@ -7,23 +7,26 @@ use tonic::{async_trait, Request, Response, Result, Status};
 use tracing::{instrument, warn};
 use tvix_castore::proto as castorepb;
 
-pub struct GRPCPathInfoServiceWrapper<PS> {
-    inner: PS,
+pub struct GRPCPathInfoServiceWrapper<PS, NS> {
+    path_info_service: PS,
     // FUTUREWORK: allow exposing without allowing listing
+    nar_calculation_service: NS,
 }
 
-impl<PS> GRPCPathInfoServiceWrapper<PS> {
-    pub fn new(path_info_service: PS) -> Self {
+impl<PS, NS> GRPCPathInfoServiceWrapper<PS, NS> {
+    pub fn new(path_info_service: PS, nar_calculation_service: NS) -> Self {
         Self {
-            inner: path_info_service,
+            path_info_service,
+            nar_calculation_service,
         }
     }
 }
 
 #[async_trait]
-impl<PS> proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWrapper<PS>
+impl<PS, NS> proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWrapper<PS, NS>
 where
     PS: Deref<Target = dyn PathInfoService> + Send + Sync + 'static,
+    NS: NarCalculationService + Send + Sync + 'static,
 {
     type ListStream = BoxStream<'static, tonic::Result<proto::PathInfo, Status>>;
 
@@ -39,7 +42,7 @@ where
                     .to_vec()
                     .try_into()
                     .map_err(|_e| Status::invalid_argument("invalid output digest length"))?;
-                match self.inner.get(digest).await {
+                match self.path_info_service.get(digest).await {
                     Ok(None) => Err(Status::not_found("PathInfo not found")),
                     Ok(Some(path_info)) => Ok(Response::new(path_info)),
                     Err(e) => {
@@ -57,7 +60,7 @@ where
 
         // Store the PathInfo in the client. Clients MUST validate the data
         // they receive, so we don't validate additionally here.
-        match self.inner.put(path_info).await {
+        match self.path_info_service.put(path_info).await {
             Ok(path_info_new) => Ok(Response::new(path_info_new)),
             Err(e) => {
                 warn!(err = %e, "failed to put PathInfo");
@@ -79,7 +82,7 @@ where
                     Err(Status::invalid_argument("invalid root node"))?
                 }
 
-                match self.inner.calculate_nar(&root_node).await {
+                match self.nar_calculation_service.calculate_nar(&root_node).await {
                     Ok((nar_size, nar_sha256)) => Ok(Response::new(proto::CalculateNarResponse {
                         nar_size,
                         nar_sha256: nar_sha256.to_vec().into(),
@@ -99,7 +102,7 @@ where
         _request: Request<proto::ListPathInfoRequest>,
     ) -> Result<Response<Self::ListStream>, Status> {
         let stream = Box::pin(
-            self.inner
+            self.path_info_service
                 .list()
                 .map_err(|e| Status::internal(e.to_string())),
         );
diff --git a/tvix/store/src/utils.rs b/tvix/store/src/utils.rs
index 0b171377bd..e6e42f6ec4 100644
--- a/tvix/store/src/utils.rs
+++ b/tvix/store/src/utils.rs
@@ -10,9 +10,10 @@ use tvix_castore::{
     directoryservice::{self, DirectoryService},
 };
 
+use crate::nar::{NarCalculationService, SimpleRenderer};
 use crate::pathinfoservice::{self, PathInfoService};
 
-/// Construct the three store handles from their addrs.
+/// Construct the store handles from their addrs.
 pub async fn construct_services(
     blob_service_addr: impl AsRef<str>,
     directory_service_addr: impl AsRef<str>,
@@ -21,6 +22,7 @@ pub async fn construct_services(
     Arc<dyn BlobService>,
     Arc<dyn DirectoryService>,
     Box<dyn PathInfoService>,
+    Box<dyn NarCalculationService>,
 )> {
     let blob_service: Arc<dyn BlobService> = blobservice::from_addr(blob_service_addr.as_ref())
         .await?
@@ -36,7 +38,18 @@ pub async fn construct_services(
     )
     .await?;
 
-    Ok((blob_service, directory_service, path_info_service))
+    // TODO: grpc client also implements NarCalculationService
+    let nar_calculation_service = Box::new(SimpleRenderer::new(
+        blob_service.clone(),
+        directory_service.clone(),
+    )) as Box<dyn NarCalculationService>;
+
+    Ok((
+        blob_service,
+        directory_service,
+        path_info_service,
+        nar_calculation_service,
+    ))
 }
 
 /// The inverse of [tokio_util::io::SyncIoBridge].