about summary refs log tree commit diff
path: root/tvix
diff options
context:
space:
mode:
Diffstat (limited to 'tvix')
-rw-r--r--tvix/Cargo.nix11
-rw-r--r--tvix/castore/Cargo.toml4
-rw-r--r--tvix/castore/default.nix17
-rw-r--r--tvix/castore/src/directoryservice/bigtable.rs2
-rw-r--r--tvix/castore/src/directoryservice/from_addr.rs6
-rw-r--r--tvix/castore/src/directoryservice/tests/mod.rs2
-rw-r--r--tvix/castore/src/directoryservice/traverse.rs66
-rw-r--r--tvix/castore/src/fs/inodes.rs14
-rw-r--r--tvix/castore/src/fs/virtiofs.rs1
-rw-r--r--tvix/castore/src/import/archive.rs133
-rw-r--r--tvix/castore/src/import/error.rs43
-rw-r--r--tvix/castore/src/import/fs.rs91
-rw-r--r--tvix/castore/src/import/mod.rs85
-rw-r--r--tvix/castore/src/lib.rs3
-rw-r--r--tvix/castore/src/path.rs446
-rw-r--r--tvix/castore/src/proto/mod.rs2
-rw-r--r--tvix/cli/src/main.rs9
-rw-r--r--tvix/crate-hashes.json4
-rw-r--r--tvix/eval/docs/bindings.md133
-rw-r--r--tvix/eval/src/builtins/impure.rs2
-rw-r--r--tvix/eval/src/vm/mod.rs2
-rw-r--r--tvix/eval/tests/nix_oracle.rs9
-rw-r--r--tvix/glue/src/builtins/derivation.rs84
-rw-r--r--tvix/glue/src/builtins/errors.rs6
-rw-r--r--tvix/glue/src/builtins/import.rs4
-rw-r--r--tvix/glue/src/builtins/mod.rs10
-rw-r--r--tvix/glue/src/fetchers/decompression.rs (renamed from tvix/glue/src/decompression.rs)6
-rw-r--r--tvix/glue/src/fetchers/mod.rs (renamed from tvix/glue/src/fetchers.rs)38
-rw-r--r--tvix/glue/src/known_paths.rs2
-rw-r--r--tvix/glue/src/lib.rs1
-rw-r--r--tvix/glue/src/tests/mod.rs9
-rw-r--r--tvix/glue/src/tvix_store_io.rs3
-rw-r--r--tvix/nar-bridge/pkg/http/narinfo_get.go61
-rw-r--r--tvix/nix-compat/src/derivation/mod.rs45
-rw-r--r--tvix/nix-compat/src/derivation/tests/mod.rs32
-rw-r--r--tvix/nix-compat/src/nar/reader/async/mod.rs166
-rw-r--r--tvix/nix-compat/src/nar/reader/async/read.rs69
-rw-r--r--tvix/nix-compat/src/nar/reader/async/test.rs310
-rw-r--r--tvix/nix-compat/src/nar/reader/mod.rs243
-rw-r--r--tvix/nix-compat/src/nar/reader/test.rs272
-rw-r--r--tvix/nix-compat/src/nar/wire/mod.rs17
-rw-r--r--tvix/nix-compat/src/nar/wire/tag.rs1
-rw-r--r--tvix/nix-compat/src/nix_daemon/worker_protocol.rs73
-rw-r--r--tvix/nix-compat/src/store_path/mod.rs18
-rw-r--r--tvix/nix-compat/src/wire/bytes/mod.rs103
-rw-r--r--tvix/nix-compat/src/wire/bytes/reader.rs464
-rw-r--r--tvix/nix-compat/src/wire/bytes/reader/mod.rs447
-rw-r--r--tvix/nix-compat/src/wire/bytes/reader/trailer.rs198
-rw-r--r--tvix/nix-compat/src/wire/bytes/writer.rs18
-rw-r--r--tvix/nix-compat/src/wire/mod.rs3
-rw-r--r--tvix/nix-compat/src/wire/primitive.rs74
-rw-r--r--tvix/shell.nix2
-rw-r--r--tvix/store/Cargo.toml4
-rw-r--r--tvix/store/default.nix14
-rw-r--r--tvix/store/src/bin/tvix-store.rs46
-rw-r--r--tvix/store/src/import.rs6
-rw-r--r--tvix/store/src/nar/import.rs36
-rw-r--r--tvix/store/src/pathinfoservice/bigtable.rs2
-rw-r--r--tvix/store/src/pathinfoservice/from_addr.rs4
-rw-r--r--tvix/store/src/pathinfoservice/sled.rs99
-rw-r--r--tvix/store/src/pathinfoservice/tests/mod.rs2
61 files changed, 2873 insertions, 1204 deletions
diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix
index 51d47b05e3..715d08d345 100644
--- a/tvix/Cargo.nix
+++ b/tvix/Cargo.nix
@@ -1,4 +1,4 @@
-# This file was @generated by crate2nix 0.13.0 with the command:
+# This file was @generated by crate2nix 0.14.0 with the command:
 #   "generate" "--all-features"
 # See https://github.com/kolloch/crate2nix for more info.
 
@@ -13771,7 +13771,7 @@ rec {
           "tonic-reflection" = [ "dep:tonic-reflection" ];
           "virtiofs" = [ "fs" "dep:vhost" "dep:vhost-user-backend" "dep:virtio-queue" "dep:vm-memory" "dep:vmm-sys-util" "dep:virtio-bindings" "fuse-backend-rs?/vhost-user-fs" "fuse-backend-rs?/virtiofs" ];
         };
-        resolvedDefaultFeatures = [ "cloud" "default" "fs" "fuse" "tonic-reflection" "virtiofs" ];
+        resolvedDefaultFeatures = [ "cloud" "default" "fs" "fuse" "integration" "tonic-reflection" "virtiofs" ];
       };
       "tvix-cli" = rec {
         crateName = "tvix-cli";
@@ -14489,7 +14489,7 @@ rec {
           "tonic-reflection" = [ "dep:tonic-reflection" "tvix-castore/tonic-reflection" ];
           "virtiofs" = [ "tvix-castore/virtiofs" ];
         };
-        resolvedDefaultFeatures = [ "cloud" "default" "fuse" "otlp" "tonic-reflection" "virtiofs" ];
+        resolvedDefaultFeatures = [ "cloud" "default" "fuse" "integration" "otlp" "tonic-reflection" "virtiofs" ];
       };
       "typenum" = rec {
         crateName = "typenum";
@@ -17085,8 +17085,9 @@ rec {
             # because we compiled those test binaries in the former and not the latter.
             # So all paths will expect source tree to be there and not in the build top directly.
             # For example: $NIX_BUILD_TOP := /build in general, if you ask yourself.
-            # TODO(raitobezarius): I believe there could be more edge cases if `crate.sourceRoot`
-            # do exist but it's very hard to reason about them, so let's wait until the first bug report.
+            # NOTE: There could be edge cases if `crate.sourceRoot` does exist but
+            # it's very hard to reason about them.
+            # Open a bug if you run into this!
             mkdir -p source/
             cd source/
 
diff --git a/tvix/castore/Cargo.toml b/tvix/castore/Cargo.toml
index f54bb2ddb5..2797ef08f2 100644
--- a/tvix/castore/Cargo.toml
+++ b/tvix/castore/Cargo.toml
@@ -112,3 +112,7 @@ virtiofs = [
 ]
 fuse = ["fs"]
 tonic-reflection = ["dep:tonic-reflection"]
+# Whether to run the integration tests.
+# Requires the following packages in $PATH:
+# cbtemulator, google-cloud-bigtable-tool
+integration = []
diff --git a/tvix/castore/default.nix b/tvix/castore/default.nix
index edc20ac79d..641d883760 100644
--- a/tvix/castore/default.nix
+++ b/tvix/castore/default.nix
@@ -1,12 +1,23 @@
 { depot, pkgs, ... }:
 
-depot.tvix.crates.workspaceMembers.tvix-castore.build.override {
+(depot.tvix.crates.workspaceMembers.tvix-castore.build.override {
   runTests = true;
   testPreRun = ''
     export SSL_CERT_FILE=${pkgs.cacert}/etc/ssl/certs/ca-bundle.crt;
-    export PATH="$PATH:${pkgs.lib.makeBinPath [pkgs.cbtemulator pkgs.google-cloud-bigtable-tool]}"
   '';
 
   # enable some optional features.
   features = [ "default" "cloud" ];
-}
+}).overrideAttrs (_: {
+  meta.ci.targets = [ "integration-tests" ];
+  passthru.integration-tests = depot.tvix.crates.workspaceMembers.tvix-castore.build.override {
+    runTests = true;
+    testPreRun = ''
+      export SSL_CERT_FILE=${pkgs.cacert}/etc/ssl/certs/ca-bundle.crt;
+      export PATH="$PATH:${pkgs.lib.makeBinPath [pkgs.cbtemulator pkgs.google-cloud-bigtable-tool]}"
+    '';
+
+    # enable some optional features.
+    features = [ "default" "cloud" "integration" ];
+  };
+})
diff --git a/tvix/castore/src/directoryservice/bigtable.rs b/tvix/castore/src/directoryservice/bigtable.rs
index bee2fb15ae..0fdb24628f 100644
--- a/tvix/castore/src/directoryservice/bigtable.rs
+++ b/tvix/castore/src/directoryservice/bigtable.rs
@@ -115,7 +115,7 @@ impl BigtableDirectoryService {
             .stdout(Stdio::piped())
             .kill_on_drop(true)
             .spawn()
-            .expect("failed to spwan emulator");
+            .expect("failed to spawn emulator");
 
         Retry::spawn(
             ExponentialBackoff::from_millis(20)
diff --git a/tvix/castore/src/directoryservice/from_addr.rs b/tvix/castore/src/directoryservice/from_addr.rs
index 31158d3a38..ae51df6376 100644
--- a/tvix/castore/src/directoryservice/from_addr.rs
+++ b/tvix/castore/src/directoryservice/from_addr.rs
@@ -144,7 +144,7 @@ mod tests {
     #[case::grpc_invalid_host_and_path("grpc+http://localhost/some-path", false)]
     /// A valid example for Bigtable
     #[cfg_attr(
-        feature = "cloud",
+        all(feature = "cloud", feature = "integration"),
         case::bigtable_valid_url(
             "bigtable://instance-1?project_id=project-1&table_name=table-1&family_name=cf1",
             true
@@ -152,7 +152,7 @@ mod tests {
     )]
     /// A valid example for Bigtable, specifying a custom channel size and timeout
     #[cfg_attr(
-        feature = "cloud",
+        all(feature = "cloud", feature = "integration"),
         case::bigtable_valid_url(
             "bigtable://instance-1?project_id=project-1&table_name=table-1&family_name=cf1&channel_size=10&timeout=10",
             true
@@ -160,7 +160,7 @@ mod tests {
     )]
     /// A invalid Bigtable example (missing fields)
     #[cfg_attr(
-        feature = "cloud",
+        all(feature = "cloud", feature = "integration"),
         case::bigtable_invalid_url("bigtable://instance-1", false)
     )]
     #[tokio::test]
diff --git a/tvix/castore/src/directoryservice/tests/mod.rs b/tvix/castore/src/directoryservice/tests/mod.rs
index 50c8a5c6d3..1b40d9feb0 100644
--- a/tvix/castore/src/directoryservice/tests/mod.rs
+++ b/tvix/castore/src/directoryservice/tests/mod.rs
@@ -26,7 +26,7 @@ use self::utils::make_grpc_directory_service_client;
 #[case::grpc(make_grpc_directory_service_client().await)]
 #[case::memory(directoryservice::from_addr("memory://").await.unwrap())]
 #[case::sled(directoryservice::from_addr("sled://").await.unwrap())]
-#[cfg_attr(feature = "cloud", case::bigtable(directoryservice::from_addr("bigtable://instance-1?project_id=project-1&table_name=table-1&family_name=cf1").await.unwrap()))]
+#[cfg_attr(all(feature = "cloud", feature = "integration"), case::bigtable(directoryservice::from_addr("bigtable://instance-1?project_id=project-1&table_name=table-1&family_name=cf1").await.unwrap()))]
 pub fn directory_services(#[case] directory_service: impl DirectoryService) {}
 
 /// Ensures asking for a directory that doesn't exist returns a Ok(None).
diff --git a/tvix/castore/src/directoryservice/traverse.rs b/tvix/castore/src/directoryservice/traverse.rs
index 573581edbd..8a668c868c 100644
--- a/tvix/castore/src/directoryservice/traverse.rs
+++ b/tvix/castore/src/directoryservice/traverse.rs
@@ -1,30 +1,20 @@
 use super::DirectoryService;
-use crate::{proto::NamedNode, B3Digest, Error};
-use std::os::unix::ffi::OsStrExt;
+use crate::{proto::NamedNode, B3Digest, Error, Path};
 use tracing::{instrument, warn};
 
 /// This descends from a (root) node to the given (sub)path, returning the Node
 /// at that path, or none, if there's nothing at that path.
-#[instrument(skip(directory_service))]
+#[instrument(skip(directory_service, path), fields(%path))]
 pub async fn descend_to<DS>(
     directory_service: DS,
     root_node: crate::proto::node::Node,
-    path: &std::path::Path,
+    path: impl AsRef<Path> + std::fmt::Display,
 ) -> Result<Option<crate::proto::node::Node>, Error>
 where
     DS: AsRef<dyn DirectoryService>,
 {
-    // strip a possible `/` prefix from the path.
-    let path = {
-        if path.starts_with("/") {
-            path.strip_prefix("/").unwrap()
-        } else {
-            path
-        }
-    };
-
     let mut cur_node = root_node;
-    let mut it = path.components();
+    let mut it = path.as_ref().components();
 
     loop {
         match it.next() {
@@ -59,9 +49,8 @@ where
                                 // look for first_component in the [Directory].
                                 // FUTUREWORK: as the nodes() iterator returns in a sorted fashion, we
                                 // could stop as soon as e.name is larger than the search string.
-                                let child_node = directory.nodes().find(|n| {
-                                    n.get_name() == first_component.as_os_str().as_bytes()
-                                });
+                                let child_node =
+                                    directory.nodes().find(|n| n.get_name() == first_component);
 
                                 match child_node {
                                     // child node not found means there's no such element inside the directory.
@@ -85,11 +74,10 @@ where
 
 #[cfg(test)]
 mod tests {
-    use std::path::PathBuf;
-
     use crate::{
         directoryservice,
         fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP},
+        PathBuf,
     };
 
     use super::descend_to;
@@ -132,7 +120,7 @@ mod tests {
             let resp = descend_to(
                 &directory_service,
                 node_directory_complicated.clone(),
-                &PathBuf::from(""),
+                "".parse::<PathBuf>().unwrap(),
             )
             .await
             .expect("must succeed");
@@ -145,7 +133,7 @@ mod tests {
             let resp = descend_to(
                 &directory_service,
                 node_directory_complicated.clone(),
-                &PathBuf::from("keep"),
+                "keep".parse::<PathBuf>().unwrap(),
             )
             .await
             .expect("must succeed");
@@ -158,7 +146,7 @@ mod tests {
             let resp = descend_to(
                 &directory_service,
                 node_directory_complicated.clone(),
-                &PathBuf::from("keep/.keep"),
+                "keep/.keep".parse::<PathBuf>().unwrap(),
             )
             .await
             .expect("must succeed");
@@ -166,25 +154,12 @@ mod tests {
             assert_eq!(Some(node_file_keep.clone()), resp);
         }
 
-        // traversal to `keep/.keep` should return the node for the .keep file
-        {
-            let resp = descend_to(
-                &directory_service,
-                node_directory_complicated.clone(),
-                &PathBuf::from("/keep/.keep"),
-            )
-            .await
-            .expect("must succeed");
-
-            assert_eq!(Some(node_file_keep), resp);
-        }
-
         // traversal to `void` should return None (doesn't exist)
         {
             let resp = descend_to(
                 &directory_service,
                 node_directory_complicated.clone(),
-                &PathBuf::from("void"),
+                "void".parse::<PathBuf>().unwrap(),
             )
             .await
             .expect("must succeed");
@@ -192,12 +167,12 @@ mod tests {
             assert_eq!(None, resp);
         }
 
-        // traversal to `void` should return None (doesn't exist)
+        // traversal to `v/oid` should return None (doesn't exist)
         {
             let resp = descend_to(
                 &directory_service,
                 node_directory_complicated.clone(),
-                &PathBuf::from("//v/oid"),
+                "v/oid".parse::<PathBuf>().unwrap(),
             )
             .await
             .expect("must succeed");
@@ -211,25 +186,12 @@ mod tests {
             let resp = descend_to(
                 &directory_service,
                 node_directory_complicated.clone(),
-                &PathBuf::from("keep/.keep/foo"),
+                "keep/.keep/foo".parse::<PathBuf>().unwrap(),
             )
             .await
             .expect("must succeed");
 
             assert_eq!(None, resp);
         }
-
-        // traversal to a subpath of '/' should return the root node.
-        {
-            let resp = descend_to(
-                &directory_service,
-                node_directory_complicated.clone(),
-                &PathBuf::from("/"),
-            )
-            .await
-            .expect("must succeed");
-
-            assert_eq!(Some(node_directory_complicated), resp);
-        }
     }
 }
diff --git a/tvix/castore/src/fs/inodes.rs b/tvix/castore/src/fs/inodes.rs
index c22bd4b2eb..bdd4595434 100644
--- a/tvix/castore/src/fs/inodes.rs
+++ b/tvix/castore/src/fs/inodes.rs
@@ -57,16 +57,18 @@ impl InodeData {
                     children.len() as u64
                 }
             },
-            mode: match self {
-                InodeData::Regular(_, _, false) => libc::S_IFREG | 0o444, // no-executable files
-                InodeData::Regular(_, _, true) => libc::S_IFREG | 0o555,  // executable files
-                InodeData::Symlink(_) => libc::S_IFLNK | 0o444,
-                InodeData::Directory(_) => libc::S_IFDIR | 0o555,
-            },
+            mode: self.as_fuse_type() | self.mode(),
             ..Default::default()
         }
     }
 
+    fn mode(&self) -> u32 {
+        match self {
+            InodeData::Regular(_, _, false) | InodeData::Symlink(_) => 0o444,
+            InodeData::Regular(_, _, true) | InodeData::Directory(_) => 0o555,
+        }
+    }
+
     pub fn as_fuse_entry(&self, inode: u64) -> fuse_backend_rs::api::filesystem::Entry {
         fuse_backend_rs::api::filesystem::Entry {
             inode,
diff --git a/tvix/castore/src/fs/virtiofs.rs b/tvix/castore/src/fs/virtiofs.rs
index 846270d285..d63e2f2bdd 100644
--- a/tvix/castore/src/fs/virtiofs.rs
+++ b/tvix/castore/src/fs/virtiofs.rs
@@ -34,6 +34,7 @@ enum Error {
     /// Invalid descriptor chain.
     InvalidDescriptorChain,
     /// Failed to handle filesystem requests.
+    #[allow(dead_code)]
     HandleRequests(fuse_backend_rs::Error),
     /// Failed to construct new vhost user daemon.
     NewDaemon,
diff --git a/tvix/castore/src/import/archive.rs b/tvix/castore/src/import/archive.rs
index adcfb871d5..0d21481d40 100644
--- a/tvix/castore/src/import/archive.rs
+++ b/tvix/castore/src/import/archive.rs
@@ -1,6 +1,8 @@
+//! Imports from an archive (tarballs)
+
+use std::collections::HashMap;
 use std::io::{Cursor, Write};
 use std::sync::Arc;
-use std::{collections::HashMap, path::PathBuf};
 
 use petgraph::graph::{DiGraph, NodeIndex};
 use petgraph::visit::{DfsPostOrder, EdgeRef};
@@ -15,10 +17,12 @@ use tracing::{instrument, warn, Level};
 
 use crate::blobservice::BlobService;
 use crate::directoryservice::DirectoryService;
-use crate::import::{ingest_entries, Error as ImportError, IngestionEntry};
+use crate::import::{ingest_entries, IngestionEntry, IngestionError};
 use crate::proto::node::Node;
 use crate::B3Digest;
 
+type TarPathBuf = std::path::PathBuf;
+
 /// Files smaller than this threshold, in bytes, are uploaded to the [BlobService] in the
 /// background.
 ///
@@ -32,20 +36,42 @@ const MAX_TARBALL_BUFFER_SIZE: usize = 128 * 1024 * 1024;
 
 #[derive(Debug, thiserror::Error)]
 pub enum Error {
-    #[error("error reading archive entry: {0}")]
-    Io(#[from] std::io::Error),
+    #[error("unable to construct stream of entries: {0}")]
+    Entries(std::io::Error),
+
+    #[error("unable to read next entry: {0}")]
+    NextEntry(std::io::Error),
+
+    #[error("unable to read path for entry: {0}")]
+    PathRead(std::io::Error),
+
+    #[error("unable to convert path {0} for entry: {1}")]
+    PathConvert(TarPathBuf, std::io::Error),
+
+    #[error("unable to read size field for {0}: {1}")]
+    Size(TarPathBuf, std::io::Error),
+
+    #[error("unable to read mode field for {0}: {1}")]
+    Mode(TarPathBuf, std::io::Error),
+
+    #[error("unable to read link name field for {0}: {1}")]
+    LinkName(TarPathBuf, std::io::Error),
+
+    #[error("unable to read blob contents for {0}: {1}")]
+    BlobRead(TarPathBuf, std::io::Error),
+
+    // FUTUREWORK: proper error for blob finalize
+    #[error("unable to finalize blob {0}: {1}")]
+    BlobFinalize(TarPathBuf, std::io::Error),
 
     #[error("unsupported tar entry {0} type: {1:?}")]
-    UnsupportedTarEntry(PathBuf, tokio_tar::EntryType),
+    EntryType(TarPathBuf, tokio_tar::EntryType),
 
     #[error("symlink missing target {0}")]
-    MissingSymlinkTarget(PathBuf),
+    MissingSymlinkTarget(TarPathBuf),
 
     #[error("unexpected number of top level directory entries")]
     UnexpectedNumberOfTopLevelEntries,
-
-    #[error("failed to import into castore {0}")]
-    Import(#[from] ImportError),
 }
 
 /// Ingests elements from the given tar [`Archive`] into a the passed [`BlobService`] and
@@ -55,10 +81,10 @@ pub async fn ingest_archive<BS, DS, R>(
     blob_service: BS,
     directory_service: DS,
     mut archive: Archive<R>,
-) -> Result<Node, Error>
+) -> Result<Node, IngestionError<Error>>
 where
     BS: BlobService + Clone + 'static,
-    DS: AsRef<dyn DirectoryService>,
+    DS: DirectoryService,
     R: AsyncRead + Unpin,
 {
     // Since tarballs can have entries in any arbitrary order, we need to
@@ -71,16 +97,22 @@ where
     let semaphore = Arc::new(Semaphore::new(MAX_TARBALL_BUFFER_SIZE));
     let mut async_blob_uploads: JoinSet<Result<(), Error>> = JoinSet::new();
 
-    let mut entries_iter = archive.entries()?;
-    while let Some(mut entry) = entries_iter.try_next().await? {
-        let path: PathBuf = entry.path()?.into();
+    let mut entries_iter = archive.entries().map_err(Error::Entries)?;
+    while let Some(mut entry) = entries_iter.try_next().await.map_err(Error::NextEntry)? {
+        let tar_path: TarPathBuf = entry.path().map_err(Error::PathRead)?.into();
+
+        // construct a castore PathBuf, which we use in the produced IngestionEntry.
+        let path = crate::path::PathBuf::from_host_path(tar_path.as_path(), true)
+            .map_err(|e| Error::PathConvert(tar_path.clone(), e))?;
 
         let header = entry.header();
         let entry = match header.entry_type() {
             tokio_tar::EntryType::Regular
             | tokio_tar::EntryType::GNUSparse
             | tokio_tar::EntryType::Continuous => {
-                let header_size = header.size()?;
+                let header_size = header
+                    .size()
+                    .map_err(|e| Error::Size(tar_path.clone(), e))?;
 
                 // If the blob is small enough, read it off the wire, compute the digest,
                 // and upload it to the [BlobService] in the background.
@@ -101,7 +133,9 @@ where
                         .acquire_many_owned(header_size as u32)
                         .await
                         .unwrap();
-                    let size = tokio::io::copy(&mut reader, &mut buffer).await?;
+                    let size = tokio::io::copy(&mut reader, &mut buffer)
+                        .await
+                        .map_err(|e| Error::Size(tar_path.clone(), e))?;
 
                     let digest: B3Digest = hasher.finalize().as_bytes().into();
 
@@ -109,12 +143,18 @@ where
                         let blob_service = blob_service.clone();
                         let digest = digest.clone();
                         async_blob_uploads.spawn({
+                            let tar_path = tar_path.clone();
                             async move {
                                 let mut writer = blob_service.open_write().await;
 
-                                tokio::io::copy(&mut Cursor::new(buffer), &mut writer).await?;
+                                tokio::io::copy(&mut Cursor::new(buffer), &mut writer)
+                                    .await
+                                    .map_err(|e| Error::BlobRead(tar_path.clone(), e))?;
 
-                                let blob_digest = writer.close().await?;
+                                let blob_digest = writer
+                                    .close()
+                                    .await
+                                    .map_err(|e| Error::BlobFinalize(tar_path, e))?;
 
                                 assert_eq!(digest, blob_digest, "Tvix bug: blob digest mismatch");
 
@@ -130,35 +170,50 @@ where
                 } else {
                     let mut writer = blob_service.open_write().await;
 
-                    let size = tokio::io::copy(&mut entry, &mut writer).await?;
+                    let size = tokio::io::copy(&mut entry, &mut writer)
+                        .await
+                        .map_err(|e| Error::BlobRead(tar_path.clone(), e))?;
 
-                    let digest = writer.close().await?;
+                    let digest = writer
+                        .close()
+                        .await
+                        .map_err(|e| Error::BlobFinalize(tar_path.clone(), e))?;
 
                     (size, digest)
                 };
 
+                let executable = entry
+                    .header()
+                    .mode()
+                    .map_err(|e| Error::Mode(tar_path, e))?
+                    & 64
+                    != 0;
+
                 IngestionEntry::Regular {
                     path,
                     size,
-                    executable: entry.header().mode()? & 64 != 0,
+                    executable,
                     digest,
                 }
             }
             tokio_tar::EntryType::Symlink => IngestionEntry::Symlink {
                 target: entry
-                    .link_name()?
-                    .ok_or_else(|| Error::MissingSymlinkTarget(path.clone()))?
-                    .into(),
+                    .link_name()
+                    .map_err(|e| Error::LinkName(tar_path.clone(), e))?
+                    .ok_or_else(|| Error::MissingSymlinkTarget(tar_path.clone()))?
+                    .into_owned()
+                    .into_os_string()
+                    .into_encoded_bytes(),
                 path,
             },
             // Push a bogus directory marker so we can make sure this directoy gets
             // created. We don't know the digest and size until after reading the full
             // tarball.
-            tokio_tar::EntryType::Directory => IngestionEntry::Dir { path: path.clone() },
+            tokio_tar::EntryType::Directory => IngestionEntry::Dir { path },
 
             tokio_tar::EntryType::XGlobalHeader | tokio_tar::EntryType::XHeader => continue,
 
-            entry_type => return Err(Error::UnsupportedTarEntry(path, entry_type)),
+            entry_type => return Err(Error::EntryType(tar_path, entry_type).into()),
         };
 
         nodes.add(entry)?;
@@ -193,7 +248,7 @@ where
 /// An error is returned if this is not the case and ingestion will fail.
 struct IngestionEntryGraph {
     graph: DiGraph<IngestionEntry, ()>,
-    path_to_index: HashMap<PathBuf, NodeIndex>,
+    path_to_index: HashMap<crate::path::PathBuf, NodeIndex>,
     root_node: Option<NodeIndex>,
 }
 
@@ -218,7 +273,7 @@ impl IngestionEntryGraph {
     /// and new nodes are not directories, the node is replaced and is disconnected from its
     /// children.
     pub fn add(&mut self, entry: IngestionEntry) -> Result<NodeIndex, Error> {
-        let path = entry.path().to_path_buf();
+        let path = entry.path().to_owned();
 
         let index = match self.path_to_index.get(entry.path()) {
             Some(&index) => {
@@ -238,7 +293,7 @@ impl IngestionEntryGraph {
             // We expect archives to contain a single root node, if there is another root node
             // entry with a different path name, this is unsupported.
             if let Some(root_node) = self.root_node {
-                if self.get_node(root_node).path() != path {
+                if self.get_node(root_node).path() != &path {
                     return Err(Error::UnexpectedNumberOfTopLevelEntries);
                 }
             }
@@ -247,7 +302,7 @@ impl IngestionEntryGraph {
         } else if let Some(parent_path) = path.parent() {
             // Recursively add the parent node until it hits the root node.
             let parent_index = self.add(IngestionEntry::Dir {
-                path: parent_path.to_path_buf(),
+                path: parent_path.to_owned(),
             })?;
 
             // Insert an edge from the parent directory to the child entry.
@@ -332,23 +387,29 @@ mod test {
 
     lazy_static! {
         pub static ref EMPTY_DIGEST: B3Digest = blake3::hash(&[]).as_bytes().into();
-        pub static ref DIR_A: IngestionEntry = IngestionEntry::Dir { path: "a".into() };
-        pub static ref DIR_B: IngestionEntry = IngestionEntry::Dir { path: "b".into() };
-        pub static ref DIR_A_B: IngestionEntry = IngestionEntry::Dir { path: "a/b".into() };
+        pub static ref DIR_A: IngestionEntry = IngestionEntry::Dir {
+            path: "a".parse().unwrap()
+        };
+        pub static ref DIR_B: IngestionEntry = IngestionEntry::Dir {
+            path: "b".parse().unwrap()
+        };
+        pub static ref DIR_A_B: IngestionEntry = IngestionEntry::Dir {
+            path: "a/b".parse().unwrap()
+        };
         pub static ref FILE_A: IngestionEntry = IngestionEntry::Regular {
-            path: "a".into(),
+            path: "a".parse().unwrap(),
             size: 0,
             executable: false,
             digest: EMPTY_DIGEST.clone(),
         };
         pub static ref FILE_A_B: IngestionEntry = IngestionEntry::Regular {
-            path: "a/b".into(),
+            path: "a/b".parse().unwrap(),
             size: 0,
             executable: false,
             digest: EMPTY_DIGEST.clone(),
         };
         pub static ref FILE_A_B_C: IngestionEntry = IngestionEntry::Regular {
-            path: "a/b/c".into(),
+            path: "a/b/c".parse().unwrap(),
             size: 0,
             executable: false,
             digest: EMPTY_DIGEST.clone(),
diff --git a/tvix/castore/src/import/error.rs b/tvix/castore/src/import/error.rs
index 15dd0664de..e3fba617e0 100644
--- a/tvix/castore/src/import/error.rs
+++ b/tvix/castore/src/import/error.rs
@@ -1,39 +1,20 @@
-use std::{fs::FileType, path::PathBuf};
+use super::PathBuf;
 
 use crate::Error as CastoreError;
 
+/// Represents all error types that emitted by ingest_entries.
+/// It can represent errors uploading individual Directories and finalizing
+/// the upload.
+/// It also contains a generic error kind that'll carry ingestion-method
+/// specific errors.
 #[derive(Debug, thiserror::Error)]
-pub enum Error {
+pub enum IngestionError<E: std::fmt::Display> {
+    #[error("error from producer: {0}")]
+    Producer(#[from] E),
+
     #[error("failed to upload directory at {0}: {1}")]
     UploadDirectoryError(PathBuf, CastoreError),
 
-    #[error("invalid encoding encountered for entry {0:?}")]
-    InvalidEncoding(PathBuf),
-
-    #[error("unable to stat {0}: {1}")]
-    UnableToStat(PathBuf, std::io::Error),
-
-    #[error("unable to open {0}: {1}")]
-    UnableToOpen(PathBuf, std::io::Error),
-
-    #[error("unable to read {0}: {1}")]
-    UnableToRead(PathBuf, std::io::Error),
-
-    #[error("unsupported file {0} type: {1:?}")]
-    UnsupportedFileType(PathBuf, FileType),
-}
-
-impl From<CastoreError> for Error {
-    fn from(value: CastoreError) -> Self {
-        match value {
-            CastoreError::InvalidRequest(_) => panic!("tvix bug"),
-            CastoreError::StorageError(_) => panic!("error"),
-        }
-    }
-}
-
-impl From<Error> for std::io::Error {
-    fn from(value: Error) -> Self {
-        std::io::Error::new(std::io::ErrorKind::Other, value)
-    }
+    #[error("failed to finalize directory upload: {0}")]
+    FinalizeDirectoryUpload(CastoreError),
 }
diff --git a/tvix/castore/src/import/fs.rs b/tvix/castore/src/import/fs.rs
index 6709d4a127..9d3ecfe6ab 100644
--- a/tvix/castore/src/import/fs.rs
+++ b/tvix/castore/src/import/fs.rs
@@ -1,8 +1,11 @@
+//! Import from a real filesystem.
+
 use futures::stream::BoxStream;
 use futures::StreamExt;
+use std::fs::FileType;
+use std::os::unix::ffi::OsStringExt;
 use std::os::unix::fs::MetadataExt;
 use std::os::unix::fs::PermissionsExt;
-use std::path::Path;
 use tracing::instrument;
 use walkdir::DirEntry;
 use walkdir::WalkDir;
@@ -10,13 +13,11 @@ use walkdir::WalkDir;
 use crate::blobservice::BlobService;
 use crate::directoryservice::DirectoryService;
 use crate::proto::node::Node;
+use crate::B3Digest;
 
 use super::ingest_entries;
-use super::upload_blob_at_path;
-use super::Error;
 use super::IngestionEntry;
-
-///! Imports that deal with a real filesystem.
+use super::IngestionError;
 
 /// Ingests the contents at a given path into the tvix store, interacting with a [BlobService] and
 /// [DirectoryService]. It returns the root node or an error.
@@ -30,11 +31,11 @@ pub async fn ingest_path<BS, DS, P>(
     blob_service: BS,
     directory_service: DS,
     path: P,
-) -> Result<Node, Error>
+) -> Result<Node, IngestionError<Error>>
 where
-    P: AsRef<Path> + std::fmt::Debug,
+    P: AsRef<std::path::Path> + std::fmt::Debug,
     BS: BlobService + Clone,
-    DS: AsRef<dyn DirectoryService>,
+    DS: DirectoryService,
 {
     let iter = WalkDir::new(path.as_ref())
         .follow_links(false)
@@ -55,13 +56,13 @@ where
 pub fn dir_entries_to_ingestion_stream<'a, BS, I>(
     blob_service: BS,
     iter: I,
-    root: &'a Path,
+    root: &'a std::path::Path,
 ) -> BoxStream<'a, Result<IngestionEntry, Error>>
 where
     BS: BlobService + Clone + 'a,
     I: Iterator<Item = Result<DirEntry, walkdir::Error>> + Send + 'a,
 {
-    let prefix = root.parent().unwrap_or_else(|| Path::new(""));
+    let prefix = root.parent().unwrap_or_else(|| std::path::Path::new(""));
 
     Box::pin(
         futures::stream::iter(iter)
@@ -72,7 +73,7 @@ where
                         Ok(dir_entry) => {
                             dir_entry_to_ingestion_entry(blob_service, &dir_entry, prefix).await
                         }
-                        Err(e) => Err(Error::UnableToStat(
+                        Err(e) => Err(Error::Stat(
                             prefix.to_path_buf(),
                             e.into_io_error().expect("walkdir err must be some"),
                         )),
@@ -91,32 +92,37 @@ where
 pub async fn dir_entry_to_ingestion_entry<BS>(
     blob_service: BS,
     entry: &DirEntry,
-    prefix: &Path,
+    prefix: &std::path::Path,
 ) -> Result<IngestionEntry, Error>
 where
     BS: BlobService,
 {
     let file_type = entry.file_type();
 
-    let path = entry
+    let fs_path = entry
         .path()
         .strip_prefix(prefix)
-        .expect("Tvix bug: failed to strip root path prefix")
-        .to_path_buf();
+        .expect("Tvix bug: failed to strip root path prefix");
+
+    // convert to castore PathBuf
+    let path = crate::path::PathBuf::from_host_path(fs_path, false)
+        .unwrap_or_else(|e| panic!("Tvix bug: walkdir direntry cannot be parsed: {}", e));
 
     if file_type.is_dir() {
         Ok(IngestionEntry::Dir { path })
     } else if file_type.is_symlink() {
         let target = std::fs::read_link(entry.path())
-            .map_err(|e| Error::UnableToStat(entry.path().to_path_buf(), e))?;
+            .map_err(|e| Error::Stat(entry.path().to_path_buf(), e))?
+            .into_os_string()
+            .into_vec();
 
         Ok(IngestionEntry::Symlink { path, target })
     } else if file_type.is_file() {
         let metadata = entry
             .metadata()
-            .map_err(|e| Error::UnableToStat(entry.path().to_path_buf(), e.into()))?;
+            .map_err(|e| Error::Stat(entry.path().to_path_buf(), e.into()))?;
 
-        let digest = upload_blob_at_path(blob_service, entry.path().to_path_buf()).await?;
+        let digest = upload_blob(blob_service, entry.path().to_path_buf()).await?;
 
         Ok(IngestionEntry::Regular {
             path,
@@ -127,6 +133,53 @@ where
             digest,
         })
     } else {
-        Ok(IngestionEntry::Unknown { path, file_type })
+        return Err(Error::FileType(fs_path.to_path_buf(), file_type));
     }
 }
+
+/// Uploads the file at the provided [Path] the the [BlobService].
+#[instrument(skip(blob_service), fields(path), err)]
+async fn upload_blob<BS>(
+    blob_service: BS,
+    path: impl AsRef<std::path::Path>,
+) -> Result<B3Digest, Error>
+where
+    BS: BlobService,
+{
+    let mut file = match tokio::fs::File::open(path.as_ref()).await {
+        Ok(file) => file,
+        Err(e) => return Err(Error::BlobRead(path.as_ref().to_path_buf(), e)),
+    };
+
+    let mut writer = blob_service.open_write().await;
+
+    if let Err(e) = tokio::io::copy(&mut file, &mut writer).await {
+        return Err(Error::BlobRead(path.as_ref().to_path_buf(), e));
+    };
+
+    let digest = writer
+        .close()
+        .await
+        .map_err(|e| Error::BlobFinalize(path.as_ref().to_path_buf(), e))?;
+
+    Ok(digest)
+}
+
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+    #[error("unsupported file type at {0}: {1:?}")]
+    FileType(std::path::PathBuf, FileType),
+
+    #[error("unable to stat {0}: {1}")]
+    Stat(std::path::PathBuf, std::io::Error),
+
+    #[error("unable to open {0}: {1}")]
+    Open(std::path::PathBuf, std::io::Error),
+
+    #[error("unable to read {0}: {1}")]
+    BlobRead(std::path::PathBuf, std::io::Error),
+
+    // TODO: proper error for blob finalize
+    #[error("unable to finalize blob {0}: {1}")]
+    BlobFinalize(std::path::PathBuf, std::io::Error),
+}
diff --git a/tvix/castore/src/import/mod.rs b/tvix/castore/src/import/mod.rs
index e9fdc750f8..7b42644a27 100644
--- a/tvix/castore/src/import/mod.rs
+++ b/tvix/castore/src/import/mod.rs
@@ -4,9 +4,9 @@
 //! Specific implementations, such as ingesting from the filesystem, live in
 //! child modules.
 
-use crate::blobservice::BlobService;
 use crate::directoryservice::DirectoryPutter;
 use crate::directoryservice::DirectoryService;
+use crate::path::PathBuf;
 use crate::proto::node::Node;
 use crate::proto::Directory;
 use crate::proto::DirectoryNode;
@@ -14,21 +14,14 @@ use crate::proto::FileNode;
 use crate::proto::SymlinkNode;
 use crate::B3Digest;
 use futures::{Stream, StreamExt};
-use std::fs::FileType;
 
 use tracing::Level;
 
-#[cfg(target_family = "unix")]
-use std::os::unix::ffi::OsStrExt;
-
-use std::{
-    collections::HashMap,
-    path::{Path, PathBuf},
-};
+use std::collections::HashMap;
 use tracing::instrument;
 
 mod error;
-pub use error::Error;
+pub use error::IngestionError;
 
 pub mod archive;
 pub mod fs;
@@ -51,10 +44,14 @@ pub mod fs;
 ///
 /// On success, returns the root node.
 #[instrument(skip_all, ret(level = Level::TRACE), err)]
-pub async fn ingest_entries<DS, S>(directory_service: DS, mut entries: S) -> Result<Node, Error>
+pub async fn ingest_entries<DS, S, E>(
+    directory_service: DS,
+    mut entries: S,
+) -> Result<Node, IngestionError<E>>
 where
-    DS: AsRef<dyn DirectoryService>,
-    S: Stream<Item = Result<IngestionEntry, Error>> + Send + std::marker::Unpin,
+    DS: DirectoryService,
+    S: Stream<Item = Result<IngestionEntry, E>> + Send + std::marker::Unpin,
+    E: std::error::Error,
 {
     // For a given path, this holds the [Directory] structs as they are populated.
     let mut directories: HashMap<PathBuf, Directory> = HashMap::default();
@@ -68,20 +65,11 @@ where
             // we break the loop manually.
             .expect("Tvix bug: unexpected end of stream")?;
 
-        debug_assert!(
-            entry
-                .path()
-                .components()
-                .all(|x| matches!(x, std::path::Component::Normal(_))),
-            "path may only contain normal components"
-        );
-
         let name = entry
             .path()
             .file_name()
             // If this is the root node, it will have an empty name.
             .unwrap_or_default()
-            .as_bytes()
             .to_owned()
             .into();
 
@@ -102,9 +90,12 @@ where
                 // If we don't have one yet (as that's the first one to upload),
                 // initialize the putter.
                 maybe_directory_putter
-                    .get_or_insert_with(|| directory_service.as_ref().put_multiple_start())
+                    .get_or_insert_with(|| directory_service.put_multiple_start())
                     .put(directory)
-                    .await?;
+                    .await
+                    .map_err(|e| {
+                        IngestionError::UploadDirectoryError(entry.path().to_owned(), e)
+                    })?;
 
                 Node::Directory(DirectoryNode {
                     name,
@@ -114,7 +105,7 @@ where
             }
             IngestionEntry::Symlink { ref target, .. } => Node::Symlink(SymlinkNode {
                 name,
-                target: target.as_os_str().as_bytes().to_owned().into(),
+                target: target.to_owned().into(),
             }),
             IngestionEntry::Regular {
                 size,
@@ -127,9 +118,6 @@ where
                 size: *size,
                 executable: *executable,
             }),
-            IngestionEntry::Unknown { path, file_type } => {
-                return Err(Error::UnsupportedFileType(path.clone(), *file_type));
-            }
         };
 
         if entry.path().components().count() == 1 {
@@ -138,7 +126,7 @@ where
 
         // record node in parent directory, creating a new [Directory] if not there yet.
         directories
-            .entry(entry.path().parent().unwrap().to_path_buf())
+            .entry(entry.path().parent().unwrap().to_owned())
             .or_default()
             .add(node);
     };
@@ -152,7 +140,10 @@ where
     // they're all persisted to the backend.
     if let Some(mut directory_putter) = maybe_directory_putter {
         #[cfg_attr(not(debug_assertions), allow(unused))]
-        let root_directory_digest = directory_putter.close().await?;
+        let root_directory_digest = directory_putter
+            .close()
+            .await
+            .map_err(|e| IngestionError::FinalizeDirectoryUpload(e))?;
 
         #[cfg(debug_assertions)]
         {
@@ -174,31 +165,6 @@ where
     Ok(root_node)
 }
 
-/// Uploads the file at the provided [Path] the the [BlobService].
-#[instrument(skip(blob_service), fields(path), err)]
-async fn upload_blob_at_path<BS>(blob_service: BS, path: PathBuf) -> Result<B3Digest, Error>
-where
-    BS: BlobService,
-{
-    let mut file = match tokio::fs::File::open(&path).await {
-        Ok(file) => file,
-        Err(e) => return Err(Error::UnableToRead(path, e)),
-    };
-
-    let mut writer = blob_service.open_write().await;
-
-    if let Err(e) = tokio::io::copy(&mut file, &mut writer).await {
-        return Err(Error::UnableToRead(path, e));
-    };
-
-    let digest = writer
-        .close()
-        .await
-        .map_err(|e| Error::UnableToRead(path, e))?;
-
-    Ok(digest)
-}
-
 #[derive(Debug, Clone, Eq, PartialEq)]
 pub enum IngestionEntry {
     Regular {
@@ -209,24 +175,19 @@ pub enum IngestionEntry {
     },
     Symlink {
         path: PathBuf,
-        target: PathBuf,
+        target: Vec<u8>,
     },
     Dir {
         path: PathBuf,
     },
-    Unknown {
-        path: PathBuf,
-        file_type: FileType,
-    },
 }
 
 impl IngestionEntry {
-    fn path(&self) -> &Path {
+    fn path(&self) -> &PathBuf {
         match self {
             IngestionEntry::Regular { path, .. } => path,
             IngestionEntry::Symlink { path, .. } => path,
             IngestionEntry::Dir { path } => path,
-            IngestionEntry::Unknown { path, .. } => path,
         }
     }
 
diff --git a/tvix/castore/src/lib.rs b/tvix/castore/src/lib.rs
index 1a7ac6b4b4..bdc533a8c5 100644
--- a/tvix/castore/src/lib.rs
+++ b/tvix/castore/src/lib.rs
@@ -9,6 +9,9 @@ pub mod fixtures;
 #[cfg(feature = "fs")]
 pub mod fs;
 
+mod path;
+pub use path::{Path, PathBuf};
+
 pub mod import;
 pub mod proto;
 pub mod tonic;
diff --git a/tvix/castore/src/path.rs b/tvix/castore/src/path.rs
new file mode 100644
index 0000000000..fcc2bd01fb
--- /dev/null
+++ b/tvix/castore/src/path.rs
@@ -0,0 +1,446 @@
+//! Contains data structures to deal with Paths in the tvix-castore model.
+
+use std::{
+    borrow::Borrow,
+    fmt::{self, Debug, Display},
+    mem,
+    ops::Deref,
+    str::FromStr,
+};
+
+use bstr::ByteSlice;
+
+use crate::proto::validate_node_name;
+
+/// Represents a Path in the castore model.
+/// These are always relative, and platform-independent, which distinguishes
+/// them from the ones provided in the standard library.
+#[derive(Eq, Hash, PartialEq)]
+#[repr(transparent)] // SAFETY: Representation has to match [u8]
+pub struct Path {
+    // As node names in the castore model cannot contain slashes,
+    // we use them as component separators here.
+    inner: [u8],
+}
+
+#[allow(dead_code)]
+impl Path {
+    // SAFETY: The empty path is valid.
+    pub const ROOT: &'static Path = unsafe { Path::from_bytes_unchecked(&[]) };
+
+    /// Convert a byte slice to a path, without checking validity.
+    const unsafe fn from_bytes_unchecked(bytes: &[u8]) -> &Path {
+        // SAFETY: &[u8] and &Path have the same representation.
+        unsafe { mem::transmute(bytes) }
+    }
+
+    fn from_bytes(bytes: &[u8]) -> Option<&Path> {
+        if !bytes.is_empty() {
+            // Ensure all components are valid castore node names.
+            for component in bytes.split_str(b"/") {
+                validate_node_name(component).ok()?;
+            }
+        }
+
+        // SAFETY: We have verified that the path contains no empty components.
+        Some(unsafe { Path::from_bytes_unchecked(bytes) })
+    }
+
+    pub fn into_boxed_bytes(self: Box<Path>) -> Box<[u8]> {
+        // SAFETY: Box<Path> and Box<[u8]> have the same representation.
+        unsafe { mem::transmute(self) }
+    }
+
+    /// Returns the path without its final component, if there is one.
+    ///
+    /// Note that the parent of a bare file name is [Path::ROOT].
+    /// [Path::ROOT] is the only path without a parent.
+    pub fn parent(&self) -> Option<&Path> {
+        // The root does not have a parent.
+        if self.inner.is_empty() {
+            return None;
+        }
+
+        Some(
+            if let Some((parent, _file_name)) = self.inner.rsplit_once_str(b"/") {
+                // SAFETY: The parent of a valid Path is a valid Path.
+                unsafe { Path::from_bytes_unchecked(parent) }
+            } else {
+                // The parent of a bare file name is the root.
+                Path::ROOT
+            },
+        )
+    }
+
+    /// Creates a PathBuf with `name` adjoined to self.
+    pub fn try_join(&self, name: &[u8]) -> Result<PathBuf, std::io::Error> {
+        let mut v = PathBuf::with_capacity(self.inner.len() + name.len() + 1);
+        v.inner.extend_from_slice(&self.inner);
+        v.try_push(name)?;
+
+        Ok(v)
+    }
+
+    /// Produces an iterator over the components of the path, which are
+    /// individual byte slices.
+    /// In case the path is empty, an empty iterator is returned.
+    pub fn components(&self) -> impl Iterator<Item = &[u8]> {
+        let mut iter = self.inner.split_str(&b"/");
+
+        // We don't want to return an empty element, consume it if it's the only one.
+        if self.inner.is_empty() {
+            let _ = iter.next();
+        }
+
+        iter
+    }
+
+    /// Returns the final component of the Path, if there is one.
+    pub fn file_name(&self) -> Option<&[u8]> {
+        self.components().last()
+    }
+
+    pub fn as_bytes(&self) -> &[u8] {
+        &self.inner
+    }
+}
+
+impl Debug for Path {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        Debug::fmt(self.inner.as_bstr(), f)
+    }
+}
+
+impl Display for Path {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        Display::fmt(self.inner.as_bstr(), f)
+    }
+}
+
+impl AsRef<Path> for Path {
+    fn as_ref(&self) -> &Path {
+        self
+    }
+}
+
+/// Represents a owned PathBuf in the castore model.
+/// These are always relative, and platform-independent, which distinguishes
+/// them from the ones provided in the standard library.
+#[derive(Clone, Default, Eq, Hash, PartialEq)]
+pub struct PathBuf {
+    inner: Vec<u8>,
+}
+
+impl Deref for PathBuf {
+    type Target = Path;
+
+    fn deref(&self) -> &Self::Target {
+        // SAFETY: PathBuf always contains a valid Path.
+        unsafe { Path::from_bytes_unchecked(&self.inner) }
+    }
+}
+
+impl AsRef<Path> for PathBuf {
+    fn as_ref(&self) -> &Path {
+        self
+    }
+}
+
+impl ToOwned for Path {
+    type Owned = PathBuf;
+
+    fn to_owned(&self) -> Self::Owned {
+        PathBuf {
+            inner: self.inner.to_owned(),
+        }
+    }
+}
+
+impl Borrow<Path> for PathBuf {
+    fn borrow(&self) -> &Path {
+        self
+    }
+}
+
+impl From<Box<Path>> for PathBuf {
+    fn from(value: Box<Path>) -> Self {
+        // SAFETY: Box<Path> is always a valid path.
+        unsafe { PathBuf::from_bytes_unchecked(value.into_boxed_bytes().into_vec()) }
+    }
+}
+
+impl From<&Path> for PathBuf {
+    fn from(value: &Path) -> Self {
+        value.to_owned()
+    }
+}
+
+impl FromStr for PathBuf {
+    type Err = std::io::Error;
+
+    fn from_str(s: &str) -> Result<PathBuf, Self::Err> {
+        Ok(Path::from_bytes(s.as_bytes())
+            .ok_or(std::io::ErrorKind::InvalidData)?
+            .to_owned())
+    }
+}
+
+impl Debug for PathBuf {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        Debug::fmt(&**self, f)
+    }
+}
+
+impl Display for PathBuf {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        Display::fmt(&**self, f)
+    }
+}
+
+impl PathBuf {
+    pub fn new() -> PathBuf {
+        Self::default()
+    }
+
+    pub fn with_capacity(capacity: usize) -> PathBuf {
+        // SAFETY: The empty path is a valid path.
+        Self {
+            inner: Vec::with_capacity(capacity),
+        }
+    }
+
+    /// Adjoins `name` to self.
+    pub fn try_push(&mut self, name: &[u8]) -> Result<(), std::io::Error> {
+        validate_node_name(name).map_err(|_| std::io::ErrorKind::InvalidData)?;
+
+        if !self.inner.is_empty() {
+            self.inner.push(b'/');
+        }
+
+        self.inner.extend_from_slice(name);
+
+        Ok(())
+    }
+
+    /// Convert a byte vector to a PathBuf, without checking validity.
+    unsafe fn from_bytes_unchecked(bytes: Vec<u8>) -> PathBuf {
+        PathBuf { inner: bytes }
+    }
+
+    /// Convert from a [&std::path::Path] to [Self].
+    ///
+    /// - Self uses `/` as path separator.
+    /// - Absolute paths are always rejected, are are these with custom prefixes.
+    /// - Repeated separators are deduplicated.
+    /// - Occurrences of `.` are normalized away.
+    /// - A trailing slash is normalized away.
+    ///
+    /// A `canonicalize_dotdot` boolean controls whether `..` will get
+    /// canonicalized if possible, or should return an error.
+    ///
+    /// For more exotic paths, this conversion might produce different results
+    /// on different platforms, due to different underlying byte
+    /// representations, which is why it's restricted to unix for now.
+    #[cfg(unix)]
+    pub fn from_host_path(
+        host_path: &std::path::Path,
+        canonicalize_dotdot: bool,
+    ) -> Result<Self, std::io::Error> {
+        let mut p = PathBuf::with_capacity(host_path.as_os_str().len());
+
+        for component in host_path.components() {
+            match component {
+                std::path::Component::Prefix(_) | std::path::Component::RootDir => {
+                    return Err(std::io::Error::new(
+                        std::io::ErrorKind::InvalidData,
+                        "found disallowed prefix or rootdir",
+                    ))
+                }
+                std::path::Component::CurDir => continue, // ignore
+                std::path::Component::ParentDir => {
+                    if canonicalize_dotdot {
+                        // Try popping the last element from the path being constructed.
+                        // FUTUREWORK: pop method?
+                        p = p
+                            .parent()
+                            .ok_or_else(|| {
+                                std::io::Error::new(
+                                    std::io::ErrorKind::InvalidData,
+                                    "found .. going too far up",
+                                )
+                            })?
+                            .to_owned();
+                    } else {
+                        return Err(std::io::Error::new(
+                            std::io::ErrorKind::InvalidData,
+                            "found disallowed ..",
+                        ));
+                    }
+                }
+                std::path::Component::Normal(s) => {
+                    // append the new component to the path being constructed.
+                    p.try_push(s.as_encoded_bytes()).map_err(|_| {
+                        std::io::Error::new(
+                            std::io::ErrorKind::InvalidData,
+                            "encountered invalid node in sub_path component",
+                        )
+                    })?
+                }
+            }
+        }
+
+        Ok(p)
+    }
+
+    pub fn into_boxed_path(self) -> Box<Path> {
+        // SAFETY: Box<[u8]> and Box<Path> have the same representation,
+        // and PathBuf always contains a valid Path.
+        unsafe { mem::transmute(self.inner.into_boxed_slice()) }
+    }
+
+    pub fn into_bytes(self) -> Vec<u8> {
+        self.inner
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::{Path, PathBuf};
+    use bstr::ByteSlice;
+    use rstest::rstest;
+
+    // TODO: add some manual tests including invalid UTF-8 (hard to express
+    // with rstest)
+
+    #[rstest]
+    #[case::empty("", 0)]
+    #[case("a", 1)]
+    #[case("a/b", 2)]
+    #[case("a/b/c", 3)]
+    // add two slightly more cursed variants.
+    // Technically nothing prevents us from representing this with castore,
+    // but maybe we want to disallow constructing paths like this as it's a
+    // bad idea.
+    #[case::cursed("C:\\a/b", 2)]
+    #[case::cursed("\\\\tvix-store", 1)]
+    pub fn from_str(#[case] s: &str, #[case] num_components: usize) {
+        let p: PathBuf = s.parse().expect("must parse");
+
+        assert_eq!(s.as_bytes(), p.as_bytes(), "inner bytes mismatch");
+        assert_eq!(
+            num_components,
+            p.components().count(),
+            "number of components mismatch"
+        );
+    }
+
+    #[rstest]
+    #[case::absolute("/a/b")]
+    #[case::two_forward_slashes_start("//a/b")]
+    #[case::two_forward_slashes_middle("a/b//c/d")]
+    #[case::trailing_slash("a/b/")]
+    #[case::dot(".")]
+    #[case::dotdot("..")]
+    #[case::dot_start("./a")]
+    #[case::dotdot_start("../a")]
+    #[case::dot_middle("a/./b")]
+    #[case::dotdot_middle("a/../b")]
+    #[case::dot_end("a/b/.")]
+    #[case::dotdot_end("a/b/..")]
+    #[case::null("fo\0o")]
+    pub fn from_str_fail(#[case] s: &str) {
+        s.parse::<PathBuf>().expect_err("must fail");
+    }
+
+    #[rstest]
+    #[case("foo", "")]
+    #[case("foo/bar", "foo")]
+    #[case("foo2/bar2", "foo2")]
+    #[case("foo/bar/baz", "foo/bar")]
+    pub fn parent(#[case] p: PathBuf, #[case] exp_parent: PathBuf) {
+        assert_eq!(Some(&*exp_parent), p.parent());
+    }
+
+    #[rstest]
+    pub fn no_parent() {
+        assert!(Path::ROOT.parent().is_none());
+    }
+
+    #[rstest]
+    #[case("a", "b", "a/b")]
+    #[case("a", "b", "a/b")]
+    pub fn join_push(#[case] mut p: PathBuf, #[case] name: &str, #[case] exp_p: PathBuf) {
+        assert_eq!(exp_p, p.try_join(name.as_bytes()).expect("join failed"));
+        p.try_push(name.as_bytes()).expect("push failed");
+        assert_eq!(exp_p, p);
+    }
+
+    #[rstest]
+    #[case("a", "/")]
+    #[case("a", "")]
+    #[case("a", "b/c")]
+    #[case("", "/")]
+    #[case("", "")]
+    #[case("", "b/c")]
+    #[case("", ".")]
+    #[case("", "..")]
+    pub fn join_push_fail(#[case] mut p: PathBuf, #[case] name: &str) {
+        p.try_join(name.as_bytes())
+            .expect_err("join succeeded unexpectedly");
+        p.try_push(name.as_bytes())
+            .expect_err("push succeeded unexpectedly");
+    }
+
+    #[rstest]
+    #[case::empty("", vec![])]
+    #[case("a", vec!["a"])]
+    #[case("a/b", vec!["a", "b"])]
+    #[case("a/b/c", vec!["a","b", "c"])]
+    pub fn components(#[case] p: PathBuf, #[case] exp_components: Vec<&str>) {
+        assert_eq!(
+            exp_components,
+            p.components()
+                .map(|x| x.to_str().unwrap())
+                .collect::<Vec<_>>()
+        );
+    }
+
+    #[rstest]
+    #[case::empty("", "", false)]
+    #[case::path("a", "a", false)]
+    #[case::path2("a/b", "a/b", false)]
+    #[case::double_slash_middle("a//b", "a/b", false)]
+    #[case::dot(".", "", false)]
+    #[case::dot_start("./a/b", "a/b", false)]
+    #[case::dot_middle("a/./b", "a/b", false)]
+    #[case::dot_end("a/b/.", "a/b", false)]
+    #[case::trailing_slash("a/b/", "a/b", false)]
+    #[case::dotdot_canonicalize("a/..", "", true)]
+    #[case::dotdot_canonicalize2("a/../b", "b", true)]
+    #[cfg_attr(unix, case::faux_prefix("\\\\nix-store", "\\\\nix-store", false))]
+    #[cfg_attr(unix, case::faux_letter("C:\\foo.txt", "C:\\foo.txt", false))]
+    pub fn from_host_path(
+        #[case] host_path: std::path::PathBuf,
+        #[case] exp_path: PathBuf,
+        #[case] canonicalize_dotdot: bool,
+    ) {
+        let p = PathBuf::from_host_path(&host_path, canonicalize_dotdot).expect("must succeed");
+
+        assert_eq!(exp_path, p);
+    }
+
+    #[rstest]
+    #[case::absolute("/", false)]
+    #[case::dotdot_root("..", false)]
+    #[case::dotdot_root_canonicalize("..", true)]
+    #[case::dotdot_root_no_canonicalize("a/..", false)]
+    #[case::invalid_name("foo/bar\0", false)]
+    // #[cfg_attr(windows, case::prefix("\\\\nix-store", false))]
+    // #[cfg_attr(windows, case::letter("C:\\foo.txt", false))]
+    pub fn from_host_path_fail(
+        #[case] host_path: std::path::PathBuf,
+        #[case] canonicalize_dotdot: bool,
+    ) {
+        PathBuf::from_host_path(&host_path, canonicalize_dotdot).expect_err("must fail");
+    }
+}
diff --git a/tvix/castore/src/proto/mod.rs b/tvix/castore/src/proto/mod.rs
index 39c1bcc6fa..5374e3ae5a 100644
--- a/tvix/castore/src/proto/mod.rs
+++ b/tvix/castore/src/proto/mod.rs
@@ -66,7 +66,7 @@ pub enum ValidateStatBlobResponseError {
 
 /// Checks a Node name for validity as an intermediate node.
 /// We disallow slashes, null bytes, '.', '..' and the empty string.
-fn validate_node_name(name: &[u8]) -> Result<(), ValidateNodeError> {
+pub(crate) fn validate_node_name(name: &[u8]) -> Result<(), ValidateNodeError> {
     if name.is_empty()
         || name == b".."
         || name == b"."
diff --git a/tvix/cli/src/main.rs b/tvix/cli/src/main.rs
index 436e895863..5635f446b9 100644
--- a/tvix/cli/src/main.rs
+++ b/tvix/cli/src/main.rs
@@ -5,6 +5,7 @@ use std::{fs, path::PathBuf};
 use tracing::Level;
 use tracing_subscriber::fmt::writer::MakeWriterExt;
 use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
+use tracing_subscriber::{EnvFilter, Layer};
 use tvix_build::buildservice;
 use tvix_eval::builtins::impure_builtins;
 use tvix_eval::observer::{DisassemblingObserver, TracingObserver};
@@ -229,7 +230,13 @@ fn main() {
     let subscriber = tracing_subscriber::registry().with(
         tracing_subscriber::fmt::Layer::new()
             .with_writer(std::io::stderr.with_max_level(level))
-            .pretty(),
+            .compact()
+            .with_filter(
+                EnvFilter::builder()
+                    .with_default_directive(level.into())
+                    .from_env()
+                    .expect("invalid RUST_LOG"),
+            ),
     );
     subscriber
         .try_init()
diff --git a/tvix/crate-hashes.json b/tvix/crate-hashes.json
index ca45e43176..2c1e740cb9 100644
--- a/tvix/crate-hashes.json
+++ b/tvix/crate-hashes.json
@@ -1,4 +1,4 @@
 {
-  "bigtable_rs 0.2.9 (git+https://github.com/flokli/bigtable_rs?rev=0af404741dfc40eb9fa99cf4d4140a09c5c20df7#0af404741dfc40eb9fa99cf4d4140a09c5c20df7)": "1njjam1lx2xlnm7a41lga8601vmjgqz0fvc77x24gd04pc7avxll",
-  "wu-manber 0.1.0 (git+https://github.com/tvlfyi/wu-manber.git#0d5b22bea136659f7de60b102a7030e0daaa503d)": "1zhk83lbq99xzyjwphv2qrb8f8qgfqwa5bbbvyzm0z0bljsjv0pd"
+  "git+https://github.com/flokli/bigtable_rs?rev=0af404741dfc40eb9fa99cf4d4140a09c5c20df7#0.2.9": "1njjam1lx2xlnm7a41lga8601vmjgqz0fvc77x24gd04pc7avxll",
+  "git+https://github.com/tvlfyi/wu-manber.git#wu-manber@0.1.0": "1zhk83lbq99xzyjwphv2qrb8f8qgfqwa5bbbvyzm0z0bljsjv0pd"
 }
\ No newline at end of file
diff --git a/tvix/eval/docs/bindings.md b/tvix/eval/docs/bindings.md
new file mode 100644
index 0000000000..2b062cb13d
--- /dev/null
+++ b/tvix/eval/docs/bindings.md
@@ -0,0 +1,133 @@
+Compilation of bindings
+=======================
+
+Compilation of Nix bindings is one of the most mind-bending parts of Nix
+evaluation. The implementation of just the compilation is currently almost 1000
+lines of code, excluding the various insane test cases we dreamt up for it.
+
+## What is a binding?
+
+In short, any attribute set or `let`-expression. Tvix currently does not treat
+formals in function parameters (e.g. `{ name ? "fred" }: ...`) the same as these
+bindings.
+
+They have two very difficult features:
+
+1. Keys can mutually refer to each other in `rec` sets or `let`-bindings,
+   including out of definition order.
+2. Attribute sets can be nested, and parts of one attribute set can be defined
+   in multiple separate bindings.
+
+Tvix resolves as much of this logic statically (i.e. at compile-time) as
+possible, but the procedure is quite complicated.
+
+## High-level concept
+
+The idea behind the way we compile bindings is to fully resolve nesting
+statically, and use the usual mechanisms (i.e. recursion/thunking/value
+capturing) for resolving dynamic values.
+
+This is done by compiling bindings in several phases:
+
+1. An initial compilation phase *only* for plain inherit statements (i.e.
+   `inherit name;`), *not* for namespaced inherits (i.e. `inherit (from)
+   name;`).
+
+2. A declaration-only phase, in which we use the compiler's scope tracking logic
+   to calculate the physical runtime stack indices (further referred to as
+   "stack slots" or just "slots") that all values will end up in.
+
+   In this phase, whenever we encounter a nested attribute set, it is merged
+   into a custom data structure that acts like a synthetic AST node.
+
+   This can be imagined similar to a rewrite like this:
+
+   ```nix
+   # initial code:
+   {
+       a.b = 1;
+       a.c = 2;
+   }
+
+   # rewritten form:
+   {
+       a = {
+           b = 1;
+           c = 2;
+       };
+   }
+   ```
+
+   The rewrite applies to attribute sets and `let`-bindings alike.
+
+   At the end of this phase, we know the stack slots of all namespaces for
+   inheriting from, all values inherited from them, and all values (and
+   optionall keys) of bindings at the current level.
+
+   Only statically known keys are actually merged, so any dynamic keys that
+   conflict will lead to a "key already defined" error at runtime.
+
+3. A compilation phase, in which all values (and, when necessary, keys) are
+   actually compiled. In this phase the custom data structure used for merging
+   is encountered when compiling values.
+
+   As this data structure acts like an AST node, the process begins recursively
+   for each nested attribute set.
+
+At the end of this process we have bytecode that leaves the required values (and
+optionally keys) on the stack. In the case of attribute sets, a final operation
+is emitted that constructs the actual attribute set structure at runtime. For
+`let`-bindings a final operation is emitted that removes these locals from the
+stack when the scope ends.
+
+## Moving parts
+
+WARNING: This documents the *current* implementation. If you only care about the
+conceptual aspects, see above.
+
+There's a few types involved:
+
+* `PeekableAttrs`: peekable iterator over an attribute path (e.g. `a.b.c`)
+* `BindingsKind`: enum defining the kind of bindings (attrs/recattrs/let)
+* `AttributeSet`: struct holding the bindings kind, the AST nodes with inherits
+  (both namespaced and not), and an internal representation of bindings
+  (essentially a vector of tuples of the peekable attrs and the expression to
+  compile for the value).
+* `Binding`: enum describing the kind of binding (namespaced inherit, attribute
+  set, plain binding of *any other value type*)
+* `KeySlot`: enum describing the location in which a key slot is placed at
+  runtime (nowhere, statically known value in a slot, dynamic value in a slot)
+* `TrackedBinding`: struct representing statically known information about a
+  single binding (its key slot, value slot and `Binding`)
+* `TrackedBindings`: vector of tracked bindings, which implements logic for
+  merging attribute sets together
+
+And quite a few methods on `Compiler`:
+
+* `compile_bindings`: entry point for compiling anything that looks like a
+  binding, this calls out to the functions below.
+* `compile_plain_inherits`: takes all inherits of a bindings node and compiles
+  the ones that are trivial to compile (i.e. just plain inherits without a
+  namespace). The `rnix` parser does not represent namespaced/plain inherits in
+  different nodes, so this function also aggregates the namespaced inherits and
+  returns them for further use
+* `declare_namespaced_inherits`: passes over all namespaced inherits and
+  declares them on the locals stack, as well as inserts them into the provided
+  `TrackedBindings`
+* `declare_bindings`: declares all regular key/value bindings in a bindings
+  scope, but without actually compiling their keys or values.
+
+  There's a lot of heavy lifting going on here:
+
+  1. It invokes the various pieces of logic responsible for merging nested
+     attribute sets together, creating intermediate data structures in the value
+     slots of bindings that can be recursively processed the same way.
+  2. It decides on the key slots of expressions based on the kind of bindings,
+     and the type of expression providing the key.
+* `bind_values`: runs the actual compilation of values. Notably this function is
+  responsible for recursively compiling merged attribute sets when it encounters
+  a `Binding::Set` (on which it invokes `compile_bindings` itself).
+
+In addition to these several methods (such as `compile_attr_set`,
+`compile_let_in`, ...) invoke the binding-kind specific logic and then call out
+to the functions above.
diff --git a/tvix/eval/src/builtins/impure.rs b/tvix/eval/src/builtins/impure.rs
index 18403fe5d8..c82b910f5f 100644
--- a/tvix/eval/src/builtins/impure.rs
+++ b/tvix/eval/src/builtins/impure.rs
@@ -37,7 +37,7 @@ mod impure_builtins {
             Ok(p) => p,
         };
         let r = generators::request_open_file(&co, path).await;
-        Ok(hash_nix_string(algo.to_str()?, r).map(Value::from)?)
+        hash_nix_string(algo.to_str()?, r).map(Value::from)
     }
 
     #[builtin("pathExists")]
diff --git a/tvix/eval/src/vm/mod.rs b/tvix/eval/src/vm/mod.rs
index c10b79cd99..5c244cc3ca 100644
--- a/tvix/eval/src/vm/mod.rs
+++ b/tvix/eval/src/vm/mod.rs
@@ -1148,7 +1148,7 @@ where
                     let mut captured_with_stack = frame
                         .upvalues
                         .with_stack()
-                        .map(Clone::clone)
+                        .cloned()
                         // ... or make an empty one if there isn't one already.
                         .unwrap_or_else(|| Vec::with_capacity(self.with_stack.len()));
 
diff --git a/tvix/eval/tests/nix_oracle.rs b/tvix/eval/tests/nix_oracle.rs
index 6bab75cfd9..5a5cc0a822 100644
--- a/tvix/eval/tests/nix_oracle.rs
+++ b/tvix/eval/tests/nix_oracle.rs
@@ -30,7 +30,14 @@ fn nix_eval(expr: &str, strictness: Strictness) -> String {
         .arg(format!("({expr})"))
         .env(
             "NIX_REMOTE",
-            format!("local?root={}", store_dir.path().display()),
+            format!(
+                "local?root={}",
+                store_dir
+                    .path()
+                    .canonicalize()
+                    .expect("valid path")
+                    .display()
+            ),
         )
         .output()
         .unwrap();
diff --git a/tvix/glue/src/builtins/derivation.rs b/tvix/glue/src/builtins/derivation.rs
index 8c7df96f91..a7742ae40a 100644
--- a/tvix/glue/src/builtins/derivation.rs
+++ b/tvix/glue/src/builtins/derivation.rs
@@ -457,55 +457,59 @@ pub(crate) mod derivation_builtins {
         drv.validate(false)
             .map_err(DerivationError::InvalidDerivation)?;
 
-        // Calculate the derivation_or_fod_hash for the current derivation.
-        // This one is still intermediate (so not added to known_paths)
-        let derivation_or_fod_hash_tmp = drv.derivation_or_fod_hash(|drv_path| {
-            known_paths
-                .get_hash_derivation_modulo(&drv_path.to_owned())
-                .unwrap_or_else(|| panic!("{} not found", drv_path))
-                .to_owned()
-        });
+        // Calculate the hash_derivation_modulo for the current derivation..
+        debug_assert!(
+            drv.outputs.values().all(|output| { output.path.is_none() }),
+            "outputs should still be unset"
+        );
 
         // Mutate the Derivation struct and set output paths
-        drv.calculate_output_paths(name, &derivation_or_fod_hash_tmp)
-            .map_err(DerivationError::InvalidDerivation)?;
+        drv.calculate_output_paths(
+            name,
+            // This one is still intermediate (so not added to known_paths),
+            // as the outputs are still unset.
+            &drv.hash_derivation_modulo(|drv_path| {
+                *known_paths
+                    .get_hash_derivation_modulo(&drv_path.to_owned())
+                    .unwrap_or_else(|| panic!("{} not found", drv_path))
+            }),
+        )
+        .map_err(DerivationError::InvalidDerivation)?;
 
         let drv_path = drv
             .calculate_derivation_path(name)
             .map_err(DerivationError::InvalidDerivation)?;
 
-        // TODO: avoid cloning
-        known_paths.add_derivation(drv_path.clone(), drv.clone());
-
-        let mut new_attrs: Vec<(String, NixString)> = drv
-            .outputs
-            .into_iter()
-            .map(|(name, output)| {
-                (
-                    name.clone(),
+        // Assemble the attrset to return from this builtin.
+        let out = Value::Attrs(Box::new(NixAttrs::from_iter(
+            drv.outputs
+                .iter()
+                .map(|(name, output)| {
+                    (
+                        name.clone(),
+                        NixString::new_context_from(
+                            NixContextElement::Single {
+                                name: name.clone(),
+                                derivation: drv_path.to_absolute_path(),
+                            }
+                            .into(),
+                            output.path.as_ref().unwrap().to_absolute_path(),
+                        ),
+                    )
+                })
+                .chain(std::iter::once((
+                    "drvPath".to_owned(),
                     NixString::new_context_from(
-                        NixContextElement::Single {
-                            name,
-                            derivation: drv_path.to_absolute_path(),
-                        }
-                        .into(),
-                        output.path.unwrap().to_absolute_path(),
+                        NixContextElement::Derivation(drv_path.to_absolute_path()).into(),
+                        drv_path.to_absolute_path(),
                     ),
-                )
-            })
-            .collect();
-
-        new_attrs.push((
-            "drvPath".to_string(),
-            NixString::new_context_from(
-                NixContextElement::Derivation(drv_path.to_absolute_path()).into(),
-                drv_path.to_absolute_path(),
-            ),
-        ));
-
-        Ok(Value::Attrs(Box::new(NixAttrs::from_iter(
-            new_attrs.into_iter(),
-        ))))
+                ))),
+        )));
+
+        // Register the Derivation in known_paths.
+        known_paths.add_derivation(drv_path, drv);
+
+        Ok(out)
     }
 
     #[builtin("toFile")]
diff --git a/tvix/glue/src/builtins/errors.rs b/tvix/glue/src/builtins/errors.rs
index c05d366f13..f6d5745c56 100644
--- a/tvix/glue/src/builtins/errors.rs
+++ b/tvix/glue/src/builtins/errors.rs
@@ -6,6 +6,7 @@ use nix_compat::{
 use reqwest::Url;
 use std::rc::Rc;
 use thiserror::Error;
+use tvix_castore::import;
 
 /// Errors related to derivation construction
 #[derive(Debug, Error)]
@@ -52,10 +53,7 @@ pub enum FetcherError {
     Io(#[from] std::io::Error),
 
     #[error(transparent)]
-    Import(#[from] tvix_castore::import::Error),
-
-    #[error(transparent)]
-    ImportArchive(#[from] tvix_castore::import::archive::Error),
+    Import(#[from] tvix_castore::import::IngestionError<import::archive::Error>),
 
     #[error("Error calculating store path for fetcher output: {0}")]
     StorePath(#[from] BuildStorePathError),
diff --git a/tvix/glue/src/builtins/import.rs b/tvix/glue/src/builtins/import.rs
index 6814781df3..219695b69f 100644
--- a/tvix/glue/src/builtins/import.rs
+++ b/tvix/glue/src/builtins/import.rs
@@ -95,9 +95,9 @@ async fn filtered_ingest(
         );
         ingest_entries(&state.directory_service, entries)
             .await
-            .map_err(|err| ErrorKind::IO {
+            .map_err(|e| ErrorKind::IO {
                 path: Some(path.to_path_buf()),
-                error: Rc::new(err.into()),
+                error: Rc::new(std::io::Error::new(std::io::ErrorKind::Other, e)),
             })
     })
 }
diff --git a/tvix/glue/src/builtins/mod.rs b/tvix/glue/src/builtins/mod.rs
index 4081489e0e..0c7bcc880a 100644
--- a/tvix/glue/src/builtins/mod.rs
+++ b/tvix/glue/src/builtins/mod.rs
@@ -739,6 +739,7 @@ mod tests {
         false
     )]
     fn builtins_filter_source_unsupported_files(#[case] code: &str, #[case] exp_success: bool) {
+        use nix::errno::Errno;
         use nix::sys::stat;
         use nix::unistd;
         use std::os::unix::net::UnixListener;
@@ -765,6 +766,15 @@ mod tests {
             stat::Mode::S_IRWXU,
             0,
         )
+        .inspect_err(|e| {
+            if *e == Errno::EPERM {
+                eprintln!(
+                    "\
+Missing permissions to create a character device node with mknod(2).
+Please run this test as root or set CAP_MKNOD."
+                );
+            }
+        })
         .expect("Failed to create a character device node");
 
         let code_replaced = code.replace("@fixtures", &temp.path().to_string_lossy());
diff --git a/tvix/glue/src/decompression.rs b/tvix/glue/src/fetchers/decompression.rs
index 11dc9d9835..f96fa60e34 100644
--- a/tvix/glue/src/decompression.rs
+++ b/tvix/glue/src/fetchers/decompression.rs
@@ -204,9 +204,9 @@ mod tests {
     }
 
     #[rstest]
-    #[case::gzip(include_bytes!("tests/blob.tar.gz"))]
-    #[case::bzip2(include_bytes!("tests/blob.tar.bz2"))]
-    #[case::xz(include_bytes!("tests/blob.tar.xz"))]
+    #[case::gzip(include_bytes!("../tests/blob.tar.gz"))]
+    #[case::bzip2(include_bytes!("../tests/blob.tar.bz2"))]
+    #[case::xz(include_bytes!("../tests/blob.tar.xz"))]
     #[tokio::test]
     async fn compressed_tar(#[case] data: &[u8]) {
         let reader = DecompressedReader::new(BufReader::new(data));
diff --git a/tvix/glue/src/fetchers.rs b/tvix/glue/src/fetchers/mod.rs
index 7560c447d8..342dfd84e8 100644
--- a/tvix/glue/src/fetchers.rs
+++ b/tvix/glue/src/fetchers/mod.rs
@@ -17,7 +17,10 @@ use tvix_castore::{
 use tvix_store::{pathinfoservice::PathInfoService, proto::PathInfo};
 use url::Url;
 
-use crate::{builtins::FetcherError, decompression::DecompressedReader};
+use crate::builtins::FetcherError;
+
+mod decompression;
+use decompression::DecompressedReader;
 
 /// Representing options for doing a fetch.
 #[derive(Clone, Eq, PartialEq)]
@@ -28,7 +31,8 @@ pub enum Fetch {
     URL(Url, Option<NixHash>),
 
     /// Fetch a tarball from the given URL and unpack.
-    /// The file must be a tape archive (.tar) compressed with gzip, bzip2 or xz.
+    /// The file must be a tape archive (.tar), optionally compressed with gzip,
+    /// bzip2 or xz.
     /// The top-level path component of the files in the tarball is removed,
     /// so it is best if the tarball contains a single directory at top level.
     /// Optionally, a sha256 digest can be provided to verify the unpacked
@@ -56,10 +60,10 @@ fn redact_url(url: &Url) -> Url {
 impl std::fmt::Debug for Fetch {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         match self {
-            Fetch::URL(url, nixhash) => {
+            Fetch::URL(url, exp_hash) => {
                 let url = redact_url(url);
-                if let Some(nixhash) = nixhash {
-                    write!(f, "URL [url: {}, exp_hash: Some({})]", &url, nixhash)
+                if let Some(exp_hash) = exp_hash {
+                    write!(f, "URL [url: {}, exp_hash: Some({})]", &url, exp_hash)
                 } else {
                     write!(f, "URL [url: {}, exp_hash: None]", &url)
                 }
@@ -168,8 +172,8 @@ async fn hash<D: Digest + std::io::Write>(
 
 impl<BS, DS, PS> Fetcher<BS, DS, PS>
 where
-    BS: AsRef<(dyn BlobService + 'static)> + Clone + Send + Sync + 'static,
-    DS: AsRef<(dyn DirectoryService + 'static)>,
+    BS: BlobService + Clone + 'static,
+    DS: DirectoryService + Clone,
     PS: PathInfoService,
 {
     /// Ingest the data from a specified [Fetch].
@@ -178,7 +182,7 @@ where
     /// didn't match the previously communicated hash contained inside the FetchArgs.
     pub async fn ingest(&self, fetch: Fetch) -> Result<(Node, CAHash, u64), FetcherError> {
         match fetch {
-            Fetch::URL(url, exp_nixhash) => {
+            Fetch::URL(url, exp_hash) => {
                 // Construct a AsyncRead reading from the data as its downloaded.
                 let mut r = self.download(url.clone()).await?;
 
@@ -188,7 +192,7 @@ where
                 // Copy the contents from the download reader to the blob writer.
                 // Calculate the digest of the file received, depending on the
                 // communicated expected hash (or sha256 if none provided).
-                let (actual_nixhash, blob_size) = match exp_nixhash
+                let (actual_hash, blob_size) = match exp_hash
                     .as_ref()
                     .map(NixHash::algo)
                     .unwrap_or_else(|| HashAlgo::Sha256)
@@ -209,12 +213,12 @@ where
                     )?,
                 };
 
-                if let Some(exp_nixhash) = exp_nixhash {
-                    if exp_nixhash != actual_nixhash {
+                if let Some(exp_hash) = exp_hash {
+                    if exp_hash != actual_hash {
                         return Err(FetcherError::HashMismatch {
                             url,
-                            wanted: exp_nixhash,
-                            got: actual_nixhash,
+                            wanted: exp_hash,
+                            got: actual_hash,
                         });
                     }
                 }
@@ -227,7 +231,7 @@ where
                         size: blob_size,
                         executable: false,
                     }),
-                    CAHash::Flat(actual_nixhash),
+                    CAHash::Flat(actual_hash),
                     blob_size,
                 ))
             }
@@ -243,7 +247,7 @@ where
                 // Ingest the archive, get the root node
                 let node = tvix_castore::import::archive::ingest_archive(
                     self.blob_service.clone(),
-                    &self.directory_service,
+                    self.directory_service.clone(),
                     archive,
                 )
                 .await?;
@@ -379,12 +383,12 @@ mod tests {
         #[test]
         fn fetchurl_store_path() {
             let url = Url::parse("https://raw.githubusercontent.com/aaptel/notmuch-extract-patch/f732a53e12a7c91a06755ebfab2007adc9b3063b/notmuch-extract-patch").unwrap();
-            let exp_nixhash = NixHash::Sha256(
+            let exp_hash = NixHash::Sha256(
                 nixbase32::decode_fixed("0nawkl04sj7psw6ikzay7kydj3dhd0fkwghcsf5rzaw4bmp4kbax")
                     .unwrap(),
             );
 
-            let fetch = Fetch::URL(url, Some(exp_nixhash));
+            let fetch = Fetch::URL(url, Some(exp_hash));
             assert_eq!(
                 "06qi00hylriyfm0nl827crgjvbax84mz-notmuch-extract-patch",
                 &fetch
diff --git a/tvix/glue/src/known_paths.rs b/tvix/glue/src/known_paths.rs
index c95065592b..290c9d5b69 100644
--- a/tvix/glue/src/known_paths.rs
+++ b/tvix/glue/src/known_paths.rs
@@ -73,7 +73,7 @@ impl KnownPaths {
         }
 
         // compute the hash derivation modulo
-        let hash_derivation_modulo = drv.derivation_or_fod_hash(|drv_path| {
+        let hash_derivation_modulo = drv.hash_derivation_modulo(|drv_path| {
             self.get_hash_derivation_modulo(&drv_path.to_owned())
                 .unwrap_or_else(|| panic!("{} not found", drv_path))
                 .to_owned()
diff --git a/tvix/glue/src/lib.rs b/tvix/glue/src/lib.rs
index 8528f09e52..2e5a3be103 100644
--- a/tvix/glue/src/lib.rs
+++ b/tvix/glue/src/lib.rs
@@ -6,7 +6,6 @@ pub mod tvix_build;
 pub mod tvix_io;
 pub mod tvix_store_io;
 
-mod decompression;
 #[cfg(test)]
 mod tests;
 
diff --git a/tvix/glue/src/tests/mod.rs b/tvix/glue/src/tests/mod.rs
index e66f484e3d..8e1572b6e3 100644
--- a/tvix/glue/src/tests/mod.rs
+++ b/tvix/glue/src/tests/mod.rs
@@ -14,6 +14,8 @@ use rstest::rstest;
 
 use crate::{
     builtins::{add_derivation_builtins, add_fetcher_builtins, add_import_builtins},
+    configure_nix_path,
+    tvix_io::TvixIO,
     tvix_store_io::TvixStoreIO,
 };
 
@@ -50,12 +52,17 @@ fn eval_test(code_path: PathBuf, expect_success: bool) {
         Arc::new(DummyBuildService::default()),
         tokio_runtime.handle().clone(),
     ));
-    let mut eval = tvix_eval::Evaluation::new(tvix_store_io.clone() as Rc<dyn EvalIO>, true);
+    // Wrap with TvixIO, so <nix/fetchurl.nix can be imported.
+    let mut eval = tvix_eval::Evaluation::new(
+        Box::new(TvixIO::new(tvix_store_io.clone() as Rc<dyn EvalIO>)) as Box<dyn EvalIO>,
+        true,
+    );
 
     eval.strict = true;
     add_derivation_builtins(&mut eval, tvix_store_io.clone());
     add_fetcher_builtins(&mut eval, tvix_store_io.clone());
     add_import_builtins(&mut eval, tvix_store_io.clone());
+    configure_nix_path(&mut eval, &None);
 
     let result = eval.evaluate(code, Some(code_path.clone()));
     let failed = match result.value {
diff --git a/tvix/glue/src/tvix_store_io.rs b/tvix/glue/src/tvix_store_io.rs
index 1f709906de..7daefffe82 100644
--- a/tvix/glue/src/tvix_store_io.rs
+++ b/tvix/glue/src/tvix_store_io.rs
@@ -305,6 +305,9 @@ impl TvixStoreIO {
         };
 
         // now with the root_node and sub_path, descend to the node requested.
+        // We convert sub_path to the castore model here.
+        let sub_path = tvix_castore::PathBuf::from_host_path(sub_path, true)?;
+
         directoryservice::descend_to(&self.directory_service, root_node, sub_path)
             .await
             .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))
diff --git a/tvix/nar-bridge/pkg/http/narinfo_get.go b/tvix/nar-bridge/pkg/http/narinfo_get.go
index 98d85744d8..d43cb58078 100644
--- a/tvix/nar-bridge/pkg/http/narinfo_get.go
+++ b/tvix/nar-bridge/pkg/http/narinfo_get.go
@@ -96,37 +96,42 @@ func renderNarinfo(
 }
 
 func registerNarinfoGet(s *Server) {
-	// GET $outHash.narinfo looks up the PathInfo from the tvix-store,
-	// and then render a .narinfo file to the client.
-	// It will keep the PathInfo in the lookup map,
-	// so a subsequent GET /nar/ $narhash.nar request can find it.
-	s.handler.Get("/{outputhash:^["+nixbase32.Alphabet+"]{32}}.narinfo", func(w http.ResponseWriter, r *http.Request) {
-		defer r.Body.Close()
-
-		ctx := r.Context()
-		log := log.WithField("outputhash", chi.URLParamFromCtx(ctx, "outputhash"))
-
-		// parse the output hash sent in the request URL
-		outputHash, err := nixbase32.DecodeString(chi.URLParamFromCtx(ctx, "outputhash"))
-		if err != nil {
-			log.WithError(err).Error("unable to decode output hash from url")
-			w.WriteHeader(http.StatusBadRequest)
-			_, err := w.Write([]byte("unable to decode output hash from url"))
+	// GET/HEAD $outHash.narinfo looks up the PathInfo from the tvix-store,
+	// and, if it's a GET request, render a .narinfo file to the client.
+	// In both cases it will keep the PathInfo in the lookup map,
+	// so a subsequent GET/HEAD /nar/ $narhash.nar request can find it.
+	genNarinfoHandler := func(isHead bool) func(w http.ResponseWriter, r *http.Request) {
+		return func(w http.ResponseWriter, r *http.Request) {
+			defer r.Body.Close()
+
+			ctx := r.Context()
+			log := log.WithField("outputhash", chi.URLParamFromCtx(ctx, "outputhash"))
+
+			// parse the output hash sent in the request URL
+			outputHash, err := nixbase32.DecodeString(chi.URLParamFromCtx(ctx, "outputhash"))
 			if err != nil {
-				log.WithError(err).Errorf("unable to write error message to client")
+				log.WithError(err).Error("unable to decode output hash from url")
+				w.WriteHeader(http.StatusBadRequest)
+				_, err := w.Write([]byte("unable to decode output hash from url"))
+				if err != nil {
+					log.WithError(err).Errorf("unable to write error message to client")
+				}
+
+				return
 			}
 
-			return
-		}
-
-		err = renderNarinfo(ctx, log, s.pathInfoServiceClient, &s.narDbMu, s.narDb, outputHash, w, false)
-		if err != nil {
-			if errors.Is(err, fs.ErrNotExist) {
-				w.WriteHeader(http.StatusNotFound)
-			} else {
-				log.WithError(err).Warn("unable to render narinfo")
-				w.WriteHeader(http.StatusInternalServerError)
+			err = renderNarinfo(ctx, log, s.pathInfoServiceClient, &s.narDbMu, s.narDb, outputHash, w, isHead)
+			if err != nil {
+				if errors.Is(err, fs.ErrNotExist) {
+					w.WriteHeader(http.StatusNotFound)
+				} else {
+					log.WithError(err).Warn("unable to render narinfo")
+					w.WriteHeader(http.StatusInternalServerError)
+				}
 			}
 		}
-	})
+	}
+
+	s.handler.Get("/{outputhash:^["+nixbase32.Alphabet+"]{32}}.narinfo", genNarinfoHandler(false))
+	s.handler.Head("/{outputhash:^["+nixbase32.Alphabet+"]{32}}.narinfo", genNarinfoHandler(true))
 }
diff --git a/tvix/nix-compat/src/derivation/mod.rs b/tvix/nix-compat/src/derivation/mod.rs
index 07da127ed0..6e12e3ea86 100644
--- a/tvix/nix-compat/src/derivation/mod.rs
+++ b/tvix/nix-compat/src/derivation/mod.rs
@@ -188,11 +188,12 @@ impl Derivation {
     ///    `fixed:out:${algo}:${digest}:${fodPath}` string is hashed instead of
     ///    the A-Term.
     ///
-    /// If the derivation is not a fixed derivation, it's up to the caller of
-    /// this function to provide a lookup function to lookup these calculation
-    /// results of parent derivations at `fn_get_derivation_or_fod_hash` (by
-    /// drv path).
-    pub fn derivation_or_fod_hash<F>(&self, fn_get_derivation_or_fod_hash: F) -> [u8; 32]
+    /// It's up to the caller of this function to provide a (infallible) lookup
+    /// function to query [hash_derivation_modulo] of direct input derivations,
+    /// by their [StorePathRef].
+    /// It will only be called in case the derivation is not a fixed-output
+    /// derivation.
+    pub fn hash_derivation_modulo<F>(&self, fn_lookup_hash_derivation_modulo: F) -> [u8; 32]
     where
         F: Fn(&StorePathRef) -> [u8; 32],
     {
@@ -200,16 +201,16 @@ impl Derivation {
         // Non-Fixed-output derivations return the sha256 digest of the ATerm
         // notation, but with all input_derivation paths replaced by a recursive
         // call to this function.
-        // We use fn_get_derivation_or_fod_hash here, so callers can precompute this.
+        // We call [fn_lookup_hash_derivation_modulo] rather than recursing
+        // ourselves, so callers can precompute this.
         self.fod_digest().unwrap_or({
-            // For each input_derivation, look up the
-            // derivation_or_fod_hash, and replace the derivation path with
-            // it's HEXLOWER digest.
+            // For each input_derivation, look up the hash derivation modulo,
+            // and replace the derivation path in the aterm with it's HEXLOWER digest.
             let aterm_bytes = self.to_aterm_bytes_with_replacements(&BTreeMap::from_iter(
                 self.input_derivations
                     .iter()
                     .map(|(drv_path, output_names)| {
-                        let hash = fn_get_derivation_or_fod_hash(&drv_path.into());
+                        let hash = fn_lookup_hash_derivation_modulo(&drv_path.into());
 
                         (hash, output_names.to_owned())
                     }),
@@ -226,20 +227,22 @@ impl Derivation {
     /// and self.environment[$outputName] needs to be an empty string.
     ///
     /// Output path calculation requires knowledge of the
-    /// derivation_or_fod_hash [NixHash], which (in case of non-fixed-output
-    /// derivations) also requires knowledge of other hash_derivation_modulo
-    /// [NixHash]es.
+    /// [hash_derivation_modulo], which (in case of non-fixed-output
+    /// derivations) also requires knowledge of the [hash_derivation_modulo] of
+    /// input derivations (recursively).
     ///
-    /// We solve this by asking the caller of this function to provide the
-    /// hash_derivation_modulo of the current Derivation.
+    /// To avoid recursing and doing unnecessary calculation, we simply
+    /// ask the caller of this function to provide the result of the
+    /// [hash_derivation_modulo] call of the current [Derivation],
+    /// and leave it up to them to calculate it when needed.
     ///
-    /// On completion, self.environment[$outputName] and
-    /// self.outputs[$outputName].path are set to the calculated output path for all
+    /// On completion, `self.environment[$outputName]` and
+    /// `self.outputs[$outputName].path` are set to the calculated output path for all
     /// outputs.
     pub fn calculate_output_paths(
         &mut self,
         name: &str,
-        derivation_or_fod_hash: &[u8; 32],
+        hash_derivation_modulo: &[u8; 32],
     ) -> Result<(), DerivationError> {
         // The fingerprint and hash differs per output
         for (output_name, output) in self.outputs.iter_mut() {
@@ -250,14 +253,14 @@ impl Derivation {
 
             let path_name = output_path_name(name, output_name);
 
-            // For fixed output derivation we use the per-output info, otherwise we use the
-            // derivation hash.
+            // For fixed output derivation we use [build_ca_path], otherwise we
+            // use [build_output_path] with [hash_derivation_modulo].
             let abs_store_path = if let Some(ref hwm) = output.ca_hash {
                 build_ca_path(&path_name, hwm, Vec::<String>::new(), false).map_err(|e| {
                     DerivationError::InvalidOutputDerivationPath(output_name.to_string(), e)
                 })?
             } else {
-                build_output_path(derivation_or_fod_hash, output_name, &path_name).map_err(|e| {
+                build_output_path(hash_derivation_modulo, output_name, &path_name).map_err(|e| {
                     DerivationError::InvalidOutputDerivationPath(
                         output_name.to_string(),
                         store_path::BuildStorePathError::InvalidStorePath(e),
diff --git a/tvix/nix-compat/src/derivation/tests/mod.rs b/tvix/nix-compat/src/derivation/tests/mod.rs
index 63a65356bd..48d4e8926a 100644
--- a/tvix/nix-compat/src/derivation/tests/mod.rs
+++ b/tvix/nix-compat/src/derivation/tests/mod.rs
@@ -164,7 +164,7 @@ fn derivation_path(#[case] name: &str, #[case] expected_path: &str) {
 
 /// This trims all output paths from a Derivation struct,
 /// by setting outputs[$outputName].path and environment[$outputName] to the empty string.
-fn derivation_with_trimmed_output_paths(derivation: &Derivation) -> Derivation {
+fn derivation_without_output_paths(derivation: &Derivation) -> Derivation {
     let mut trimmed_env = derivation.environment.clone();
     let mut trimmed_outputs = derivation.outputs.clone();
 
@@ -191,13 +191,13 @@ fn derivation_with_trimmed_output_paths(derivation: &Derivation) -> Derivation {
 #[rstest]
 #[case::fixed_sha256("0hm2f1psjpcwg8fijsmr4wwxrx59s092-bar.drv", hex!("724f3e3634fce4cbbbd3483287b8798588e80280660b9a63fd13a1bc90485b33"))]
 #[case::fixed_sha1("ss2p4wmxijn652haqyd7dckxwl4c7hxx-bar.drv", hex!("c79aebd0ce3269393d4a1fde2cbd1d975d879b40f0bf40a48f550edc107fd5df"))]
-fn derivation_or_fod_hash(#[case] drv_path: &str, #[case] expected_digest: [u8; 32]) {
+fn hash_derivation_modulo_fixed(#[case] drv_path: &str, #[case] expected_digest: [u8; 32]) {
     // read in the fixture
     let json_bytes =
         fs::read(format!("{}/ok/{}.json", RESOURCES_PATHS, drv_path)).expect("unable to read JSON");
     let drv: Derivation = serde_json::from_slice(&json_bytes).expect("must deserialize");
 
-    let actual = drv.derivation_or_fod_hash(|_| panic!("must not be called"));
+    let actual = drv.hash_derivation_modulo(|_| panic!("must not be called"));
     assert_eq!(expected_digest, actual);
 }
 
@@ -224,13 +224,13 @@ fn output_paths(#[case] name: &str, #[case] drv_path_str: &str) {
     )
     .expect("must succeed");
 
-    // create a version with trimmed output paths, simulating we constructed
-    // the struct.
-    let mut derivation = derivation_with_trimmed_output_paths(&expected_derivation);
+    // create a version without output paths, simulating we constructed the
+    // struct.
+    let mut derivation = derivation_without_output_paths(&expected_derivation);
 
-    // calculate the derivation_or_fod_hash of derivation
+    // calculate the hash_derivation_modulo of Derivation
     // We don't expect the lookup function to be called for most derivations.
-    let calculated_derivation_or_fod_hash = derivation.derivation_or_fod_hash(|parent_drv_path| {
+    let actual_hash_derivation_modulo = derivation.hash_derivation_modulo(|parent_drv_path| {
         // 4wvvbi4jwn0prsdxb7vs673qa5h9gr7x-foo.drv may lookup /nix/store/0hm2f1psjpcwg8fijsmr4wwxrx59s092-bar.drv
         // ch49594n9avinrf8ip0aslidkc4lxkqv-foo.drv may lookup /nix/store/ss2p4wmxijn652haqyd7dckxwl4c7hxx-bar.drv
         if name == "foo"
@@ -255,9 +255,9 @@ fn output_paths(#[case] name: &str, #[case] drv_path_str: &str) {
 
             let drv: Derivation = serde_json::from_slice(&json_bytes).expect("must deserialize");
 
-            // calculate derivation_or_fod_hash for each parent.
+            // calculate hash_derivation_modulo for each parent.
             // This may not trigger subsequent requests, as both parents are FOD.
-            drv.derivation_or_fod_hash(|_| panic!("must not lookup"))
+            drv.hash_derivation_modulo(|_| panic!("must not lookup"))
         } else {
             // we only expect this to be called in the "foo" testcase, for the "bar derivations"
             panic!("may only be called for foo testcase on bar derivations");
@@ -265,7 +265,7 @@ fn output_paths(#[case] name: &str, #[case] drv_path_str: &str) {
     });
 
     derivation
-        .calculate_output_paths(name, &calculated_derivation_or_fod_hash)
+        .calculate_output_paths(name, &actual_hash_derivation_modulo)
         .unwrap();
 
     // The derivation should now look like it was before
@@ -343,7 +343,7 @@ fn output_path_construction() {
     // calculate bar output paths
     let bar_calc_result = bar_drv.calculate_output_paths(
         "bar",
-        &bar_drv.derivation_or_fod_hash(|_| panic!("is FOD, should not lookup")),
+        &bar_drv.hash_derivation_modulo(|_| panic!("is FOD, should not lookup")),
     );
     assert!(bar_calc_result.is_ok());
 
@@ -360,8 +360,8 @@ fn output_path_construction() {
     // now construct foo, which requires bar_drv
     // Note how we refer to the output path, drv name and replacement_str (with calculated output paths) of bar.
     let bar_output_path = &bar_drv.outputs.get("out").expect("must exist").path;
-    let bar_drv_derivation_or_fod_hash =
-        bar_drv.derivation_or_fod_hash(|_| panic!("is FOD, should not lookup"));
+    let bar_drv_hash_derivation_modulo =
+        bar_drv.hash_derivation_modulo(|_| panic!("is FOD, should not lookup"));
 
     let bar_drv_path = bar_drv
         .calculate_derivation_path("bar")
@@ -408,11 +408,11 @@ fn output_path_construction() {
     // calculate foo output paths
     let foo_calc_result = foo_drv.calculate_output_paths(
         "foo",
-        &foo_drv.derivation_or_fod_hash(|drv_path| {
+        &foo_drv.hash_derivation_modulo(|drv_path| {
             if drv_path.to_string() != "0hm2f1psjpcwg8fijsmr4wwxrx59s092-bar.drv" {
                 panic!("lookup called with unexpected drv_path: {}", drv_path);
             }
-            bar_drv_derivation_or_fod_hash
+            bar_drv_hash_derivation_modulo
         }),
     );
     assert!(foo_calc_result.is_ok());
diff --git a/tvix/nix-compat/src/nar/reader/async/mod.rs b/tvix/nix-compat/src/nar/reader/async/mod.rs
new file mode 100644
index 0000000000..aaf00faf44
--- /dev/null
+++ b/tvix/nix-compat/src/nar/reader/async/mod.rs
@@ -0,0 +1,166 @@
+use std::{
+    pin::Pin,
+    task::{self, Poll},
+};
+
+use tokio::io::{self, AsyncBufRead, AsyncRead, ErrorKind::InvalidData};
+
+// Required reading for understanding this module.
+use crate::{
+    nar::{self, wire::PadPar},
+    wire::{self, BytesReader},
+};
+
+mod read;
+#[cfg(test)]
+mod test;
+
+pub type Reader<'a> = dyn AsyncBufRead + Unpin + Send + 'a;
+
+/// Start reading a NAR file from `reader`.
+pub async fn open<'a, 'r>(reader: &'a mut Reader<'r>) -> io::Result<Node<'a, 'r>> {
+    read::token(reader, &nar::wire::TOK_NAR).await?;
+    Node::new(reader).await
+}
+
+pub enum Node<'a, 'r: 'a> {
+    Symlink {
+        target: Vec<u8>,
+    },
+    File {
+        executable: bool,
+        reader: FileReader<'a, 'r>,
+    },
+    Directory(DirReader<'a, 'r>),
+}
+
+impl<'a, 'r: 'a> Node<'a, 'r> {
+    /// Start reading a [Node], matching the next [wire::Node].
+    ///
+    /// Reading the terminating [wire::TOK_PAR] is done immediately for [Node::Symlink],
+    /// but is otherwise left to [DirReader] or [BytesReader].
+    async fn new(reader: &'a mut Reader<'r>) -> io::Result<Self> {
+        Ok(match read::tag(reader).await? {
+            nar::wire::Node::Sym => {
+                let target = wire::read_bytes(reader, 1..=nar::wire::MAX_TARGET_LEN).await?;
+
+                if target.contains(&0) {
+                    return Err(InvalidData.into());
+                }
+
+                read::token(reader, &nar::wire::TOK_PAR).await?;
+
+                Node::Symlink { target }
+            }
+            tag @ (nar::wire::Node::Reg | nar::wire::Node::Exe) => Node::File {
+                executable: tag == nar::wire::Node::Exe,
+                reader: FileReader {
+                    inner: BytesReader::new_internal(reader, ..).await?,
+                },
+            },
+            nar::wire::Node::Dir => Node::Directory(DirReader::new(reader)),
+        })
+    }
+}
+
+/// File contents, readable through the [AsyncRead] trait.
+///
+/// It comes with some caveats:
+///  * You must always read the entire file, unless you intend to abandon the entire archive reader.
+///  * You must abandon the entire archive reader upon the first error.
+///
+/// It's fine to read exactly `reader.len()` bytes without ever seeing an explicit EOF.
+pub struct FileReader<'a, 'r> {
+    inner: BytesReader<&'a mut Reader<'r>, PadPar>,
+}
+
+impl<'a, 'r> FileReader<'a, 'r> {
+    pub fn is_empty(&self) -> bool {
+        self.len() == 0
+    }
+
+    pub fn len(&self) -> u64 {
+        self.inner.len()
+    }
+}
+
+impl<'a, 'r> AsyncRead for FileReader<'a, 'r> {
+    fn poll_read(
+        self: Pin<&mut Self>,
+        cx: &mut task::Context,
+        buf: &mut io::ReadBuf,
+    ) -> Poll<io::Result<()>> {
+        Pin::new(&mut self.get_mut().inner).poll_read(cx, buf)
+    }
+}
+
+/// A directory iterator, yielding a sequence of [Node]s.
+/// It must be fully consumed before reading further from the [DirReader] that produced it, if any.
+pub struct DirReader<'a, 'r> {
+    reader: &'a mut Reader<'r>,
+    /// Previous directory entry name.
+    /// We have to hang onto this to enforce name monotonicity.
+    prev_name: Option<Vec<u8>>,
+}
+
+pub struct Entry<'a, 'r> {
+    pub name: Vec<u8>,
+    pub node: Node<'a, 'r>,
+}
+
+impl<'a, 'r> DirReader<'a, 'r> {
+    fn new(reader: &'a mut Reader<'r>) -> Self {
+        Self {
+            reader,
+            prev_name: None,
+        }
+    }
+
+    /// Read the next [Entry] from the directory.
+    ///
+    /// We explicitly don't implement [Iterator], since treating this as
+    /// a regular Rust iterator will surely lead you astray.
+    ///
+    ///  * You must always consume the entire iterator, unless you abandon the entire archive reader.
+    ///  * You must abandon the entire archive reader on the first error.
+    ///  * You must abandon the directory reader upon the first [None].
+    ///  * Even if you know the amount of elements up front, you must keep reading until you encounter [None].
+    pub async fn next(&mut self) -> io::Result<Option<Entry<'_, 'r>>> {
+        // COME FROM the previous iteration: if we've already read an entry,
+        // read its terminating TOK_PAR here.
+        if self.prev_name.is_some() {
+            read::token(self.reader, &nar::wire::TOK_PAR).await?;
+        }
+
+        if let nar::wire::Entry::None = read::tag(self.reader).await? {
+            return Ok(None);
+        }
+
+        let name = wire::read_bytes(self.reader, 1..=nar::wire::MAX_NAME_LEN).await?;
+
+        if name.contains(&0) || name.contains(&b'/') || name == b"." || name == b".." {
+            return Err(InvalidData.into());
+        }
+
+        // Enforce strict monotonicity of directory entry names.
+        match &mut self.prev_name {
+            None => {
+                self.prev_name = Some(name.clone());
+            }
+            Some(prev_name) => {
+                if *prev_name >= name {
+                    return Err(InvalidData.into());
+                }
+
+                name[..].clone_into(prev_name);
+            }
+        }
+
+        read::token(self.reader, &nar::wire::TOK_NOD).await?;
+
+        Ok(Some(Entry {
+            name,
+            node: Node::new(self.reader).await?,
+        }))
+    }
+}
diff --git a/tvix/nix-compat/src/nar/reader/async/read.rs b/tvix/nix-compat/src/nar/reader/async/read.rs
new file mode 100644
index 0000000000..2adf894922
--- /dev/null
+++ b/tvix/nix-compat/src/nar/reader/async/read.rs
@@ -0,0 +1,69 @@
+use tokio::io::{
+    self, AsyncReadExt,
+    ErrorKind::{InvalidData, UnexpectedEof},
+};
+
+use crate::nar::wire::Tag;
+
+use super::Reader;
+
+/// Consume a known token from the reader.
+pub async fn token<const N: usize>(reader: &mut Reader<'_>, token: &[u8; N]) -> io::Result<()> {
+    let mut buf = [0u8; N];
+
+    // This implements something similar to [AsyncReadExt::read_exact], but verifies that
+    // the input data matches the token while we read it. These two slices respectively
+    // represent the remaining token to be verified, and the remaining input buffer.
+    let mut token = &token[..];
+    let mut buf = &mut buf[..];
+
+    while !token.is_empty() {
+        match reader.read(buf).await? {
+            0 => {
+                return Err(UnexpectedEof.into());
+            }
+            n => {
+                let (t, b);
+                (t, token) = token.split_at(n);
+                (b, buf) = buf.split_at_mut(n);
+
+                if t != b {
+                    return Err(InvalidData.into());
+                }
+            }
+        }
+    }
+
+    Ok(())
+}
+
+/// Consume a [Tag] from the reader.
+pub async fn tag<T: Tag>(reader: &mut Reader<'_>) -> io::Result<T> {
+    let mut buf = T::make_buf();
+    let buf = buf.as_mut();
+
+    // first read the known minimum lengthโ€ฆ
+    reader.read_exact(&mut buf[..T::MIN]).await?;
+
+    // then decide which tag we're expecting
+    let tag = T::from_u8(buf[T::OFF]).ok_or(InvalidData)?;
+    let (head, tail) = tag.as_bytes().split_at(T::MIN);
+
+    // make sure what we've read so far is valid
+    if buf[..T::MIN] != *head {
+        return Err(InvalidData.into());
+    }
+
+    // โ€ฆthen read the rest, if any
+    if !tail.is_empty() {
+        let rest = tail.len();
+        reader.read_exact(&mut buf[..rest]).await?;
+
+        // and make sure it's what we expect
+        if buf[..rest] != *tail {
+            return Err(InvalidData.into());
+        }
+    }
+
+    Ok(tag)
+}
diff --git a/tvix/nix-compat/src/nar/reader/async/test.rs b/tvix/nix-compat/src/nar/reader/async/test.rs
new file mode 100644
index 0000000000..58bb651fca
--- /dev/null
+++ b/tvix/nix-compat/src/nar/reader/async/test.rs
@@ -0,0 +1,310 @@
+use tokio::io::AsyncReadExt;
+
+mod nar {
+    pub use crate::nar::reader::r#async as reader;
+}
+
+#[tokio::test]
+async fn symlink() {
+    let mut f = std::io::Cursor::new(include_bytes!("../../tests/symlink.nar"));
+    let node = nar::reader::open(&mut f).await.unwrap();
+
+    match node {
+        nar::reader::Node::Symlink { target } => {
+            assert_eq!(
+                &b"/nix/store/somewhereelse"[..],
+                &target,
+                "target must match"
+            );
+        }
+        _ => panic!("unexpected type"),
+    }
+}
+
+#[tokio::test]
+async fn file() {
+    let mut f = std::io::Cursor::new(include_bytes!("../../tests/helloworld.nar"));
+    let node = nar::reader::open(&mut f).await.unwrap();
+
+    match node {
+        nar::reader::Node::File {
+            executable,
+            mut reader,
+        } => {
+            assert!(!executable);
+            let mut buf = vec![];
+            reader
+                .read_to_end(&mut buf)
+                .await
+                .expect("read must succeed");
+            assert_eq!(&b"Hello World!"[..], &buf);
+        }
+        _ => panic!("unexpected type"),
+    }
+}
+
+#[tokio::test]
+async fn complicated() {
+    let mut f = std::io::Cursor::new(include_bytes!("../../tests/complicated.nar"));
+    let node = nar::reader::open(&mut f).await.unwrap();
+
+    match node {
+        nar::reader::Node::Directory(mut dir_reader) => {
+            // first entry is .keep, an empty regular file.
+            must_read_file(
+                ".keep",
+                dir_reader
+                    .next()
+                    .await
+                    .expect("next must succeed")
+                    .expect("must be some"),
+            )
+            .await;
+
+            // second entry is aa, a symlink to /nix/store/somewhereelse
+            must_be_symlink(
+                "aa",
+                "/nix/store/somewhereelse",
+                dir_reader
+                    .next()
+                    .await
+                    .expect("next must be some")
+                    .expect("must be some"),
+            );
+
+            {
+                // third entry is a directory called "keep"
+                let entry = dir_reader
+                    .next()
+                    .await
+                    .expect("next must be some")
+                    .expect("must be some");
+
+                assert_eq!(&b"keep"[..], &entry.name);
+
+                match entry.node {
+                    nar::reader::Node::Directory(mut subdir_reader) => {
+                        {
+                            // first entry is .keep, an empty regular file.
+                            let entry = subdir_reader
+                                .next()
+                                .await
+                                .expect("next must succeed")
+                                .expect("must be some");
+
+                            must_read_file(".keep", entry).await;
+                        }
+
+                        // we must read the None
+                        assert!(
+                            subdir_reader
+                                .next()
+                                .await
+                                .expect("next must succeed")
+                                .is_none(),
+                            "keep directory contains only .keep"
+                        );
+                    }
+                    _ => panic!("unexpected type for keep/.keep"),
+                }
+            };
+
+            // reading more entries yields None (and we actually must read until this)
+            assert!(dir_reader.next().await.expect("must succeed").is_none());
+        }
+        _ => panic!("unexpected type"),
+    }
+}
+
+#[tokio::test]
+#[should_panic]
+#[ignore = "TODO: async poisoning"]
+async fn file_read_abandoned() {
+    let mut f = std::io::Cursor::new(include_bytes!("../../tests/complicated.nar"));
+    let node = nar::reader::open(&mut f).await.unwrap();
+
+    match node {
+        nar::reader::Node::Directory(mut dir_reader) => {
+            // first entry is .keep, an empty regular file.
+            {
+                let entry = dir_reader
+                    .next()
+                    .await
+                    .expect("next must succeed")
+                    .expect("must be some");
+
+                assert_eq!(&b".keep"[..], &entry.name);
+                // don't bother to finish reading it.
+            };
+
+            // this should panic (not return an error), because we are meant to abandon the archive reader now.
+            assert!(dir_reader.next().await.expect("must succeed").is_none());
+        }
+        _ => panic!("unexpected type"),
+    }
+}
+
+#[tokio::test]
+#[should_panic]
+#[ignore = "TODO: async poisoning"]
+async fn dir_read_abandoned() {
+    let mut f = std::io::Cursor::new(include_bytes!("../../tests/complicated.nar"));
+    let node = nar::reader::open(&mut f).await.unwrap();
+
+    match node {
+        nar::reader::Node::Directory(mut dir_reader) => {
+            // first entry is .keep, an empty regular file.
+            must_read_file(
+                ".keep",
+                dir_reader
+                    .next()
+                    .await
+                    .expect("next must succeed")
+                    .expect("must be some"),
+            )
+            .await;
+
+            // second entry is aa, a symlink to /nix/store/somewhereelse
+            must_be_symlink(
+                "aa",
+                "/nix/store/somewhereelse",
+                dir_reader
+                    .next()
+                    .await
+                    .expect("next must be some")
+                    .expect("must be some"),
+            );
+
+            {
+                // third entry is a directory called "keep"
+                let entry = dir_reader
+                    .next()
+                    .await
+                    .expect("next must be some")
+                    .expect("must be some");
+
+                assert_eq!(&b"keep"[..], &entry.name);
+
+                match entry.node {
+                    nar::reader::Node::Directory(_) => {
+                        // don't finish using it, which poisons the archive reader
+                    }
+                    _ => panic!("unexpected type for keep/.keep"),
+                }
+            };
+
+            // this should panic, because we didn't finish reading the child subdirectory
+            assert!(dir_reader.next().await.expect("must succeed").is_none());
+        }
+        _ => panic!("unexpected type"),
+    }
+}
+
+#[tokio::test]
+#[should_panic]
+#[ignore = "TODO: async poisoning"]
+async fn dir_read_after_none() {
+    let mut f = std::io::Cursor::new(include_bytes!("../../tests/complicated.nar"));
+    let node = nar::reader::open(&mut f).await.unwrap();
+
+    match node {
+        nar::reader::Node::Directory(mut dir_reader) => {
+            // first entry is .keep, an empty regular file.
+            must_read_file(
+                ".keep",
+                dir_reader
+                    .next()
+                    .await
+                    .expect("next must succeed")
+                    .expect("must be some"),
+            )
+            .await;
+
+            // second entry is aa, a symlink to /nix/store/somewhereelse
+            must_be_symlink(
+                "aa",
+                "/nix/store/somewhereelse",
+                dir_reader
+                    .next()
+                    .await
+                    .expect("next must be some")
+                    .expect("must be some"),
+            );
+
+            {
+                // third entry is a directory called "keep"
+                let entry = dir_reader
+                    .next()
+                    .await
+                    .expect("next must be some")
+                    .expect("must be some");
+
+                assert_eq!(&b"keep"[..], &entry.name);
+
+                match entry.node {
+                    nar::reader::Node::Directory(mut subdir_reader) => {
+                        // first entry is .keep, an empty regular file.
+                        must_read_file(
+                            ".keep",
+                            subdir_reader
+                                .next()
+                                .await
+                                .expect("next must succeed")
+                                .expect("must be some"),
+                        )
+                        .await;
+
+                        // we must read the None
+                        assert!(
+                            subdir_reader
+                                .next()
+                                .await
+                                .expect("next must succeed")
+                                .is_none(),
+                            "keep directory contains only .keep"
+                        );
+                    }
+                    _ => panic!("unexpected type for keep/.keep"),
+                }
+            };
+
+            // reading more entries yields None (and we actually must read until this)
+            assert!(dir_reader.next().await.expect("must succeed").is_none());
+
+            // this should panic, because we already got a none so we're meant to stop.
+            dir_reader.next().await.unwrap();
+            unreachable!()
+        }
+        _ => panic!("unexpected type"),
+    }
+}
+
+async fn must_read_file(name: &'static str, entry: nar::reader::Entry<'_, '_>) {
+    assert_eq!(name.as_bytes(), &entry.name);
+
+    match entry.node {
+        nar::reader::Node::File {
+            executable,
+            mut reader,
+        } => {
+            assert!(!executable);
+            assert_eq!(reader.read(&mut [0]).await.unwrap(), 0);
+        }
+        _ => panic!("unexpected type for {}", name),
+    }
+}
+
+fn must_be_symlink(
+    name: &'static str,
+    exp_target: &'static str,
+    entry: nar::reader::Entry<'_, '_>,
+) {
+    assert_eq!(name.as_bytes(), &entry.name);
+
+    match entry.node {
+        nar::reader::Node::Symlink { target } => {
+            assert_eq!(exp_target.as_bytes(), &target);
+        }
+        _ => panic!("unexpected type for {}", name),
+    }
+}
diff --git a/tvix/nix-compat/src/nar/reader/mod.rs b/tvix/nix-compat/src/nar/reader/mod.rs
index fa7ddc77f9..bddf175080 100644
--- a/tvix/nix-compat/src/nar/reader/mod.rs
+++ b/tvix/nix-compat/src/nar/reader/mod.rs
@@ -10,19 +10,50 @@ use std::io::{
     Read, Write,
 };
 
+#[cfg(not(debug_assertions))]
+use std::marker::PhantomData;
+
 // Required reading for understanding this module.
 use crate::nar::wire;
 
+#[cfg(feature = "async")]
+pub mod r#async;
+
 mod read;
 #[cfg(test)]
 mod test;
 
 pub type Reader<'a> = dyn BufRead + Send + 'a;
 
+struct ArchiveReader<'a, 'r> {
+    inner: &'a mut Reader<'r>,
+
+    /// In debug mode, also track when we need to abandon this archive reader.
+    /// The archive reader must be abandoned when:
+    ///   * An error is encountered at any point
+    ///   * A file or directory reader is dropped before being read entirely.
+    /// All of these checks vanish in release mode.
+    status: ArchiveReaderStatus<'a>,
+}
+
+macro_rules! try_or_poison {
+    ($it:expr, $ex:expr) => {
+        match $ex {
+            Ok(x) => x,
+            Err(e) => {
+                $it.status.poison();
+                return Err(e.into());
+            }
+        }
+    };
+}
 /// Start reading a NAR file from `reader`.
 pub fn open<'a, 'r>(reader: &'a mut Reader<'r>) -> io::Result<Node<'a, 'r>> {
     read::token(reader, &wire::TOK_NAR)?;
-    Node::new(reader)
+    Node::new(ArchiveReader {
+        inner: reader,
+        status: ArchiveReaderStatus::top(),
+    })
 }
 
 pub enum Node<'a, 'r> {
@@ -41,21 +72,24 @@ impl<'a, 'r> Node<'a, 'r> {
     ///
     /// Reading the terminating [wire::TOK_PAR] is done immediately for [Node::Symlink],
     /// but is otherwise left to [DirReader] or [FileReader].
-    fn new(reader: &'a mut Reader<'r>) -> io::Result<Self> {
-        Ok(match read::tag(reader)? {
+    fn new(mut reader: ArchiveReader<'a, 'r>) -> io::Result<Self> {
+        Ok(match read::tag(reader.inner)? {
             wire::Node::Sym => {
-                let target = read::bytes(reader, wire::MAX_TARGET_LEN)?;
+                let target =
+                    try_or_poison!(reader, read::bytes(reader.inner, wire::MAX_TARGET_LEN));
 
                 if target.is_empty() || target.contains(&0) {
+                    reader.status.poison();
                     return Err(InvalidData.into());
                 }
 
-                read::token(reader, &wire::TOK_PAR)?;
+                try_or_poison!(reader, read::token(reader.inner, &wire::TOK_PAR));
+                reader.status.ready_parent(); // Immediately allow reading from parent again
 
                 Node::Symlink { target }
             }
             tag @ (wire::Node::Reg | wire::Node::Exe) => {
-                let len = read::u64(reader)?;
+                let len = try_or_poison!(&mut reader, read::u64(reader.inner));
 
                 Node::File {
                     executable: tag == wire::Node::Exe,
@@ -74,10 +108,8 @@ impl<'a, 'r> Node<'a, 'r> {
 ///  * You must abandon the entire archive reader upon the first error.
 ///
 /// It's fine to read exactly `reader.len()` bytes without ever seeing an explicit EOF.
-///
-/// TODO(edef): enforce these in `#[cfg(debug_assertions)]`
 pub struct FileReader<'a, 'r> {
-    reader: &'a mut Reader<'r>,
+    reader: ArchiveReader<'a, 'r>,
     len: u64,
     /// Truncated original file length for padding computation.
     /// We only care about the 3 least significant bits; semantically, this is a u3.
@@ -87,12 +119,13 @@ pub struct FileReader<'a, 'r> {
 impl<'a, 'r> FileReader<'a, 'r> {
     /// Instantiate a new reader, starting after [wire::TOK_REG] or [wire::TOK_EXE].
     /// We handle the terminating [wire::TOK_PAR] on semantic EOF.
-    fn new(reader: &'a mut Reader<'r>, len: u64) -> io::Result<Self> {
+    fn new(mut reader: ArchiveReader<'a, 'r>, len: u64) -> io::Result<Self> {
         // For zero-length files, we have to read the terminating TOK_PAR
         // immediately, since FileReader::read may never be called; we've
         // already reached semantic EOF by definition.
         if len == 0 {
-            read::token(reader, &wire::TOK_PAR)?;
+            read::token(reader.inner, &wire::TOK_PAR)?;
+            reader.status.ready_parent();
         }
 
         Ok(Self {
@@ -121,9 +154,12 @@ impl FileReader<'_, '_> {
             return Ok(&[]);
         }
 
-        let mut buf = self.reader.fill_buf()?;
+        self.reader.check_correct();
+
+        let mut buf = try_or_poison!(self.reader, self.reader.inner.fill_buf());
 
         if buf.is_empty() {
+            self.reader.status.poison();
             return Err(UnexpectedEof.into());
         }
 
@@ -141,12 +177,14 @@ impl FileReader<'_, '_> {
             return Ok(());
         }
 
+        self.reader.check_correct();
+
         self.len = self
             .len
             .checked_sub(n as u64)
             .expect("consumed bytes past EOF");
 
-        self.reader.consume(n);
+        self.reader.inner.consume(n);
 
         if self.is_empty() {
             self.finish()?;
@@ -159,7 +197,7 @@ impl FileReader<'_, '_> {
     pub fn copy(&mut self, mut dst: impl Write) -> io::Result<()> {
         while !self.is_empty() {
             let buf = self.fill_buf()?;
-            let n = dst.write(buf)?;
+            let n = try_or_poison!(self.reader, dst.write(buf));
             self.consume(n)?;
         }
 
@@ -173,14 +211,17 @@ impl Read for FileReader<'_, '_> {
             return Ok(0);
         }
 
+        self.reader.check_correct();
+
         if buf.len() as u64 > self.len {
             buf = &mut buf[..self.len as usize];
         }
 
-        let n = self.reader.read(buf)?;
+        let n = try_or_poison!(self.reader, self.reader.inner.read(buf));
         self.len -= n as u64;
 
         if n == 0 {
+            self.reader.status.poison();
             return Err(UnexpectedEof.into());
         }
 
@@ -200,21 +241,27 @@ impl FileReader<'_, '_> {
 
         if pad != 0 {
             let mut buf = [0; 8];
-            self.reader.read_exact(&mut buf[pad..])?;
+            try_or_poison!(self.reader, self.reader.inner.read_exact(&mut buf[pad..]));
 
             if buf != [0; 8] {
+                self.reader.status.poison();
                 return Err(InvalidData.into());
             }
         }
 
-        read::token(self.reader, &wire::TOK_PAR)
+        try_or_poison!(self.reader, read::token(self.reader.inner, &wire::TOK_PAR));
+
+        // Done with reading this file, allow going back up the chain of readers
+        self.reader.status.ready_parent();
+
+        Ok(())
     }
 }
 
 /// A directory iterator, yielding a sequence of [Node]s.
 /// It must be fully consumed before reading further from the [DirReader] that produced it, if any.
 pub struct DirReader<'a, 'r> {
-    reader: &'a mut Reader<'r>,
+    reader: ArchiveReader<'a, 'r>,
     /// Previous directory entry name.
     /// We have to hang onto this to enforce name monotonicity.
     prev_name: Option<Vec<u8>>,
@@ -226,7 +273,7 @@ pub struct Entry<'a, 'r> {
 }
 
 impl<'a, 'r> DirReader<'a, 'r> {
-    fn new(reader: &'a mut Reader<'r>) -> Self {
+    fn new(reader: ArchiveReader<'a, 'r>) -> Self {
         Self {
             reader,
             prev_name: None,
@@ -242,23 +289,27 @@ impl<'a, 'r> DirReader<'a, 'r> {
     ///  * You must abandon the entire archive reader on the first error.
     ///  * You must abandon the directory reader upon the first [None].
     ///  * Even if you know the amount of elements up front, you must keep reading until you encounter [None].
-    ///
-    /// TODO(edef): enforce these in `#[cfg(debug_assertions)]`
     #[allow(clippy::should_implement_trait)]
-    pub fn next(&mut self) -> io::Result<Option<Entry>> {
+    pub fn next(&mut self) -> io::Result<Option<Entry<'_, 'r>>> {
+        self.reader.check_correct();
+
         // COME FROM the previous iteration: if we've already read an entry,
         // read its terminating TOK_PAR here.
         if self.prev_name.is_some() {
-            read::token(self.reader, &wire::TOK_PAR)?;
+            try_or_poison!(self.reader, read::token(self.reader.inner, &wire::TOK_PAR));
         }
 
         // Determine if there are more entries to follow
-        if let wire::Entry::None = read::tag(self.reader)? {
+        if let wire::Entry::None = try_or_poison!(self.reader, read::tag(self.reader.inner)) {
             // We've reached the end of this directory.
+            self.reader.status.ready_parent();
             return Ok(None);
         }
 
-        let name = read::bytes(self.reader, wire::MAX_NAME_LEN)?;
+        let name = try_or_poison!(
+            self.reader,
+            read::bytes(self.reader.inner, wire::MAX_NAME_LEN)
+        );
 
         if name.is_empty()
             || name.contains(&0)
@@ -266,6 +317,7 @@ impl<'a, 'r> DirReader<'a, 'r> {
             || name == b"."
             || name == b".."
         {
+            self.reader.status.poison();
             return Err(InvalidData.into());
         }
 
@@ -276,6 +328,7 @@ impl<'a, 'r> DirReader<'a, 'r> {
             }
             Some(prev_name) => {
                 if *prev_name >= name {
+                    self.reader.status.poison();
                     return Err(InvalidData.into());
                 }
 
@@ -283,11 +336,147 @@ impl<'a, 'r> DirReader<'a, 'r> {
             }
         }
 
-        read::token(self.reader, &wire::TOK_NOD)?;
+        try_or_poison!(self.reader, read::token(self.reader.inner, &wire::TOK_NOD));
 
         Ok(Some(Entry {
             name,
-            node: Node::new(&mut self.reader)?,
+            // Don't need to worry about poisoning here: Node::new will do it for us if needed
+            node: Node::new(self.reader.child())?,
         }))
     }
 }
+
+/// We use a stack of statuses to:
+///   * Share poisoned state across all objects from the same underlying reader,
+///     so we can check they are abandoned when an error occurs
+///   * Make sure only the most recently created object is read from, and is fully exhausted
+///     before anything it was created from is used again.
+enum ArchiveReaderStatus<'a> {
+    #[cfg(not(debug_assertions))]
+    None(PhantomData<&'a ()>),
+    #[cfg(debug_assertions)]
+    StackTop { poisoned: bool, ready: bool },
+    #[cfg(debug_assertions)]
+    StackChild {
+        poisoned: &'a mut bool,
+        parent_ready: &'a mut bool,
+        ready: bool,
+    },
+}
+
+impl ArchiveReaderStatus<'_> {
+    fn top() -> Self {
+        #[cfg(debug_assertions)]
+        {
+            ArchiveReaderStatus::StackTop {
+                poisoned: false,
+                ready: true,
+            }
+        }
+
+        #[cfg(not(debug_assertions))]
+        ArchiveReaderStatus::None(PhantomData)
+    }
+
+    /// Poison all the objects sharing the same reader, to be used when an error occurs
+    fn poison(&mut self) {
+        match self {
+            #[cfg(not(debug_assertions))]
+            ArchiveReaderStatus::None(_) => {}
+            #[cfg(debug_assertions)]
+            ArchiveReaderStatus::StackTop { poisoned: x, .. } => *x = true,
+            #[cfg(debug_assertions)]
+            ArchiveReaderStatus::StackChild { poisoned: x, .. } => **x = true,
+        }
+    }
+
+    /// Mark the parent as ready, allowing it to be used again and preventing this reference to the reader being used again.
+    fn ready_parent(&mut self) {
+        match self {
+            #[cfg(not(debug_assertions))]
+            ArchiveReaderStatus::None(_) => {}
+            #[cfg(debug_assertions)]
+            ArchiveReaderStatus::StackTop { ready, .. } => {
+                *ready = false;
+            }
+            #[cfg(debug_assertions)]
+            ArchiveReaderStatus::StackChild {
+                ready,
+                parent_ready,
+                ..
+            } => {
+                *ready = false;
+                **parent_ready = true;
+            }
+        };
+    }
+
+    fn poisoned(&self) -> bool {
+        match self {
+            #[cfg(not(debug_assertions))]
+            ArchiveReaderStatus::None(_) => false,
+            #[cfg(debug_assertions)]
+            ArchiveReaderStatus::StackTop { poisoned, .. } => *poisoned,
+            #[cfg(debug_assertions)]
+            ArchiveReaderStatus::StackChild { poisoned, .. } => **poisoned,
+        }
+    }
+
+    fn ready(&self) -> bool {
+        match self {
+            #[cfg(not(debug_assertions))]
+            ArchiveReaderStatus::None(_) => true,
+            #[cfg(debug_assertions)]
+            ArchiveReaderStatus::StackTop { ready, .. } => *ready,
+            #[cfg(debug_assertions)]
+            ArchiveReaderStatus::StackChild { ready, .. } => *ready,
+        }
+    }
+}
+
+impl<'a, 'r> ArchiveReader<'a, 'r> {
+    /// Create a new child reader from this one.
+    /// In debug mode, this reader will panic if called before the new child is exhausted / calls `ready_parent`
+    fn child(&mut self) -> ArchiveReader<'_, 'r> {
+        ArchiveReader {
+            inner: self.inner,
+            #[cfg(not(debug_assertions))]
+            status: ArchiveReaderStatus::None(PhantomData),
+            #[cfg(debug_assertions)]
+            status: match &mut self.status {
+                ArchiveReaderStatus::StackTop { poisoned, ready } => {
+                    *ready = false;
+                    ArchiveReaderStatus::StackChild {
+                        poisoned,
+                        parent_ready: ready,
+                        ready: true,
+                    }
+                }
+                ArchiveReaderStatus::StackChild {
+                    poisoned, ready, ..
+                } => {
+                    *ready = false;
+                    ArchiveReaderStatus::StackChild {
+                        poisoned,
+                        parent_ready: ready,
+                        ready: true,
+                    }
+                }
+            },
+        }
+    }
+
+    /// Check the reader is in the correct status.
+    /// Only does anything when debug assertions are on.
+    #[inline(always)]
+    fn check_correct(&self) {
+        assert!(
+            !self.status.poisoned(),
+            "Archive reader used after it was meant to be abandoned!"
+        );
+        assert!(
+            self.status.ready(),
+            "Non-ready archive reader used! (Should've been reading from something else)"
+        );
+    }
+}
diff --git a/tvix/nix-compat/src/nar/reader/test.rs b/tvix/nix-compat/src/nar/reader/test.rs
index fd0d6a9f5a..02dc4767c9 100644
--- a/tvix/nix-compat/src/nar/reader/test.rs
+++ b/tvix/nix-compat/src/nar/reader/test.rs
@@ -46,75 +46,233 @@ fn complicated() {
     match node {
         nar::reader::Node::Directory(mut dir_reader) => {
             // first entry is .keep, an empty regular file.
-            let entry = dir_reader
-                .next()
-                .expect("next must succeed")
-                .expect("must be some");
-
-            assert_eq!(&b".keep"[..], &entry.name);
-
-            match entry.node {
-                nar::reader::Node::File {
-                    executable,
-                    mut reader,
-                } => {
-                    assert!(!executable);
-                    assert_eq!(reader.read(&mut [0]).unwrap(), 0);
+            must_read_file(
+                ".keep",
+                dir_reader
+                    .next()
+                    .expect("next must succeed")
+                    .expect("must be some"),
+            );
+
+            // second entry is aa, a symlink to /nix/store/somewhereelse
+            must_be_symlink(
+                "aa",
+                "/nix/store/somewhereelse",
+                dir_reader
+                    .next()
+                    .expect("next must be some")
+                    .expect("must be some"),
+            );
+
+            {
+                // third entry is a directory called "keep"
+                let entry = dir_reader
+                    .next()
+                    .expect("next must be some")
+                    .expect("must be some");
+
+                assert_eq!(&b"keep"[..], &entry.name);
+
+                match entry.node {
+                    nar::reader::Node::Directory(mut subdir_reader) => {
+                        {
+                            // first entry is .keep, an empty regular file.
+                            let entry = subdir_reader
+                                .next()
+                                .expect("next must succeed")
+                                .expect("must be some");
+
+                            must_read_file(".keep", entry);
+                        }
+
+                        // we must read the None
+                        assert!(
+                            subdir_reader.next().expect("next must succeed").is_none(),
+                            "keep directory contains only .keep"
+                        );
+                    }
+                    _ => panic!("unexpected type for keep/.keep"),
                 }
-                _ => panic!("unexpected type for .keep"),
-            }
+            };
+
+            // reading more entries yields None (and we actually must read until this)
+            assert!(dir_reader.next().expect("must succeed").is_none());
+        }
+        _ => panic!("unexpected type"),
+    }
+}
+
+#[test]
+#[should_panic]
+fn file_read_abandoned() {
+    let mut f = std::io::Cursor::new(include_bytes!("../tests/complicated.nar"));
+    let node = nar::reader::open(&mut f).unwrap();
+
+    match node {
+        nar::reader::Node::Directory(mut dir_reader) => {
+            // first entry is .keep, an empty regular file.
+            {
+                let entry = dir_reader
+                    .next()
+                    .expect("next must succeed")
+                    .expect("must be some");
+
+                assert_eq!(&b".keep"[..], &entry.name);
+                // don't bother to finish reading it.
+            };
+
+            // this should panic (not return an error), because we are meant to abandon the archive reader now.
+            assert!(dir_reader.next().expect("must succeed").is_none());
+        }
+        _ => panic!("unexpected type"),
+    }
+}
+
+#[test]
+#[should_panic]
+fn dir_read_abandoned() {
+    let mut f = std::io::Cursor::new(include_bytes!("../tests/complicated.nar"));
+    let node = nar::reader::open(&mut f).unwrap();
+
+    match node {
+        nar::reader::Node::Directory(mut dir_reader) => {
+            // first entry is .keep, an empty regular file.
+            must_read_file(
+                ".keep",
+                dir_reader
+                    .next()
+                    .expect("next must succeed")
+                    .expect("must be some"),
+            );
 
             // second entry is aa, a symlink to /nix/store/somewhereelse
-            let entry = dir_reader
-                .next()
-                .expect("next must be some")
-                .expect("must be some");
+            must_be_symlink(
+                "aa",
+                "/nix/store/somewhereelse",
+                dir_reader
+                    .next()
+                    .expect("next must be some")
+                    .expect("must be some"),
+            );
 
-            assert_eq!(&b"aa"[..], &entry.name);
+            {
+                // third entry is a directory called "keep"
+                let entry = dir_reader
+                    .next()
+                    .expect("next must be some")
+                    .expect("must be some");
 
-            match entry.node {
-                nar::reader::Node::Symlink { target } => {
-                    assert_eq!(&b"/nix/store/somewhereelse"[..], &target);
+                assert_eq!(&b"keep"[..], &entry.name);
+
+                match entry.node {
+                    nar::reader::Node::Directory(_) => {
+                        // don't finish using it, which poisons the archive reader
+                    }
+                    _ => panic!("unexpected type for keep/.keep"),
                 }
-                _ => panic!("unexpected type for aa"),
-            }
-
-            // third entry is a directory called "keep"
-            let entry = dir_reader
-                .next()
-                .expect("next must be some")
-                .expect("must be some");
-
-            assert_eq!(&b"keep"[..], &entry.name);
-
-            match entry.node {
-                nar::reader::Node::Directory(mut subdir_reader) => {
-                    // first entry is .keep, an empty regular file.
-                    let entry = subdir_reader
-                        .next()
-                        .expect("next must succeed")
-                        .expect("must be some");
-
-                    // โ€ฆ it contains a single .keep, an empty regular file.
-                    assert_eq!(&b".keep"[..], &entry.name);
-
-                    match entry.node {
-                        nar::reader::Node::File {
-                            executable,
-                            mut reader,
-                        } => {
-                            assert!(!executable);
-                            assert_eq!(reader.read(&mut [0]).unwrap(), 0);
-                        }
-                        _ => panic!("unexpected type for keep/.keep"),
+            };
+
+            // this should panic, because we didn't finish reading the child subdirectory
+            assert!(dir_reader.next().expect("must succeed").is_none());
+        }
+        _ => panic!("unexpected type"),
+    }
+}
+
+#[test]
+#[should_panic]
+fn dir_read_after_none() {
+    let mut f = std::io::Cursor::new(include_bytes!("../tests/complicated.nar"));
+    let node = nar::reader::open(&mut f).unwrap();
+
+    match node {
+        nar::reader::Node::Directory(mut dir_reader) => {
+            // first entry is .keep, an empty regular file.
+            must_read_file(
+                ".keep",
+                dir_reader
+                    .next()
+                    .expect("next must succeed")
+                    .expect("must be some"),
+            );
+
+            // second entry is aa, a symlink to /nix/store/somewhereelse
+            must_be_symlink(
+                "aa",
+                "/nix/store/somewhereelse",
+                dir_reader
+                    .next()
+                    .expect("next must be some")
+                    .expect("must be some"),
+            );
+
+            {
+                // third entry is a directory called "keep"
+                let entry = dir_reader
+                    .next()
+                    .expect("next must be some")
+                    .expect("must be some");
+
+                assert_eq!(&b"keep"[..], &entry.name);
+
+                match entry.node {
+                    nar::reader::Node::Directory(mut subdir_reader) => {
+                        // first entry is .keep, an empty regular file.
+                        must_read_file(
+                            ".keep",
+                            subdir_reader
+                                .next()
+                                .expect("next must succeed")
+                                .expect("must be some"),
+                        );
+
+                        // we must read the None
+                        assert!(
+                            subdir_reader.next().expect("next must succeed").is_none(),
+                            "keep directory contains only .keep"
+                        );
                     }
+                    _ => panic!("unexpected type for keep/.keep"),
                 }
-                _ => panic!("unexpected type for keep/.keep"),
-            }
+            };
 
             // reading more entries yields None (and we actually must read until this)
             assert!(dir_reader.next().expect("must succeed").is_none());
+
+            // this should panic, because we already got a none so we're meant to stop.
+            dir_reader.next().unwrap();
+            unreachable!()
         }
         _ => panic!("unexpected type"),
     }
 }
+
+fn must_read_file(name: &'static str, entry: nar::reader::Entry<'_, '_>) {
+    assert_eq!(name.as_bytes(), &entry.name);
+
+    match entry.node {
+        nar::reader::Node::File {
+            executable,
+            mut reader,
+        } => {
+            assert!(!executable);
+            assert_eq!(reader.read(&mut [0]).unwrap(), 0);
+        }
+        _ => panic!("unexpected type for {}", name),
+    }
+}
+
+fn must_be_symlink(
+    name: &'static str,
+    exp_target: &'static str,
+    entry: nar::reader::Entry<'_, '_>,
+) {
+    assert_eq!(name.as_bytes(), &entry.name);
+
+    match entry.node {
+        nar::reader::Node::Symlink { target } => {
+            assert_eq!(exp_target.as_bytes(), &target);
+        }
+        _ => panic!("unexpected type for {}", name),
+    }
+}
diff --git a/tvix/nix-compat/src/nar/wire/mod.rs b/tvix/nix-compat/src/nar/wire/mod.rs
index b9e0212495..9e99b530ce 100644
--- a/tvix/nix-compat/src/nar/wire/mod.rs
+++ b/tvix/nix-compat/src/nar/wire/mod.rs
@@ -90,6 +90,23 @@ pub const TOK_DIR: [u8; 24] = *b"\x09\0\0\0\0\0\0\0directory\0\0\0\0\0\0\0";
 pub const TOK_ENT: [u8; 48] = *b"\x05\0\0\0\0\0\0\0entry\0\0\0\x01\0\0\0\0\0\0\0(\0\0\0\0\0\0\0\x04\0\0\0\0\0\0\0name\0\0\0\0";
 pub const TOK_NOD: [u8; 48] = *b"\x04\0\0\0\0\0\0\0node\0\0\0\0\x01\0\0\0\0\0\0\0(\0\0\0\0\0\0\0\x04\0\0\0\0\0\0\0type\0\0\0\0";
 pub const TOK_PAR: [u8; 16] = *b"\x01\0\0\0\0\0\0\0)\0\0\0\0\0\0\0";
+#[cfg(feature = "async")]
+const TOK_PAD_PAR: [u8; 24] = *b"\0\0\0\0\0\0\0\0\x01\0\0\0\0\0\0\0)\0\0\0\0\0\0\0";
+
+#[cfg(feature = "async")]
+#[derive(Debug)]
+pub(crate) enum PadPar {}
+
+#[cfg(feature = "async")]
+impl crate::wire::reader::Tag for PadPar {
+    const PATTERN: &'static [u8] = &TOK_PAD_PAR;
+
+    type Buf = [u8; 24];
+
+    fn make_buf() -> Self::Buf {
+        [0; 24]
+    }
+}
 
 #[test]
 fn tokens() {
diff --git a/tvix/nix-compat/src/nar/wire/tag.rs b/tvix/nix-compat/src/nar/wire/tag.rs
index 55b93f9985..4982a0d707 100644
--- a/tvix/nix-compat/src/nar/wire/tag.rs
+++ b/tvix/nix-compat/src/nar/wire/tag.rs
@@ -10,6 +10,7 @@ pub trait Tag: Sized {
     const MIN: usize;
 
     /// Minimal suitably sized buffer for reading the wire representation
+    ///
     /// HACK: This is a workaround for const generics limitations.
     type Buf: AsMut<[u8]> + Send;
 
diff --git a/tvix/nix-compat/src/nix_daemon/worker_protocol.rs b/tvix/nix-compat/src/nix_daemon/worker_protocol.rs
index 58a48d1bdd..7e3adc0db2 100644
--- a/tvix/nix-compat/src/nix_daemon/worker_protocol.rs
+++ b/tvix/nix-compat/src/nix_daemon/worker_protocol.rs
@@ -15,13 +15,34 @@ static WORKER_MAGIC_1: u64 = 0x6e697863; // "nixc"
 static WORKER_MAGIC_2: u64 = 0x6478696f; // "dxio"
 pub static STDERR_LAST: u64 = 0x616c7473; // "alts"
 
+/// | Nix version     | Protocol |
+/// |-----------------|----------|
+/// | 0.11            | 1.02     |
+/// | 0.12            | 1.04     |
+/// | 0.13            | 1.05     |
+/// | 0.14            | 1.05     |
+/// | 0.15            | 1.05     |
+/// | 0.16            | 1.06     |
+/// | 1.0             | 1.10     |
+/// | 1.1             | 1.11     |
+/// | 1.2             | 1.12     |
+/// | 1.3 - 1.5.3     | 1.13     |
+/// | 1.6 - 1.10      | 1.14     |
+/// | 1.11 - 1.11.16  | 1.15     |
+/// | 2.0 - 2.0.4     | 1.20     |
+/// | 2.1 - 2.3.18    | 1.21     |
+/// | 2.4 - 2.6.1     | 1.32     |
+/// | 2.7.0           | 1.33     |
+/// | 2.8.0 - 2.14.1  | 1.34     |
+/// | 2.15.0 - 2.19.4 | 1.35     |
+/// | 2.20.0 - 2.22.0 | 1.37     |
 static PROTOCOL_VERSION: ProtocolVersion = ProtocolVersion::from_parts(1, 37);
 
 /// Max length of a Nix setting name/value. In bytes.
 ///
 /// This value has been arbitrarily choosen after looking the nix.conf
 /// manpage. Don't hesitate to increase it if it's too limiting.
-pub static MAX_SETTING_SIZE: u64 = 1024;
+pub static MAX_SETTING_SIZE: usize = 1024;
 
 /// Worker Operation
 ///
@@ -131,30 +152,30 @@ pub async fn read_client_settings<R: AsyncReadExt + Unpin>(
     r: &mut R,
     client_version: ProtocolVersion,
 ) -> std::io::Result<ClientSettings> {
-    let keep_failed = wire::read_bool(r).await?;
-    let keep_going = wire::read_bool(r).await?;
-    let try_fallback = wire::read_bool(r).await?;
-    let verbosity_uint = wire::read_u64(r).await?;
+    let keep_failed = r.read_u64_le().await? != 0;
+    let keep_going = r.read_u64_le().await? != 0;
+    let try_fallback = r.read_u64_le().await? != 0;
+    let verbosity_uint = r.read_u64_le().await?;
     let verbosity = Verbosity::from_u64(verbosity_uint).ok_or_else(|| {
         Error::new(
             ErrorKind::InvalidData,
             format!("Can't convert integer {} to verbosity", verbosity_uint),
         )
     })?;
-    let max_build_jobs = wire::read_u64(r).await?;
-    let max_silent_time = wire::read_u64(r).await?;
-    _ = wire::read_u64(r).await?; // obsolete useBuildHook
-    let verbose_build = wire::read_bool(r).await?;
-    _ = wire::read_u64(r).await?; // obsolete logType
-    _ = wire::read_u64(r).await?; // obsolete printBuildTrace
-    let build_cores = wire::read_u64(r).await?;
-    let use_substitutes = wire::read_bool(r).await?;
+    let max_build_jobs = r.read_u64_le().await?;
+    let max_silent_time = r.read_u64_le().await?;
+    _ = r.read_u64_le().await?; // obsolete useBuildHook
+    let verbose_build = r.read_u64_le().await? != 0;
+    _ = r.read_u64_le().await?; // obsolete logType
+    _ = r.read_u64_le().await?; // obsolete printBuildTrace
+    let build_cores = r.read_u64_le().await?;
+    let use_substitutes = r.read_u64_le().await? != 0;
     let mut overrides = HashMap::new();
     if client_version.minor() >= 12 {
-        let num_overrides = wire::read_u64(r).await?;
+        let num_overrides = r.read_u64_le().await?;
         for _ in 0..num_overrides {
-            let name = wire::read_string(r, 0..MAX_SETTING_SIZE).await?;
-            let value = wire::read_string(r, 0..MAX_SETTING_SIZE).await?;
+            let name = wire::read_string(r, 0..=MAX_SETTING_SIZE).await?;
+            let value = wire::read_string(r, 0..=MAX_SETTING_SIZE).await?;
             overrides.insert(name, value);
         }
     }
@@ -197,17 +218,17 @@ pub async fn server_handshake_client<'a, RW: 'a>(
 where
     &'a mut RW: AsyncReadExt + AsyncWriteExt + Unpin,
 {
-    let worker_magic_1 = wire::read_u64(&mut conn).await?;
+    let worker_magic_1 = conn.read_u64_le().await?;
     if worker_magic_1 != WORKER_MAGIC_1 {
         Err(std::io::Error::new(
             ErrorKind::InvalidData,
             format!("Incorrect worker magic number received: {}", worker_magic_1),
         ))
     } else {
-        wire::write_u64(&mut conn, WORKER_MAGIC_2).await?;
-        wire::write_u64(&mut conn, PROTOCOL_VERSION.into()).await?;
+        conn.write_u64_le(WORKER_MAGIC_2).await?;
+        conn.write_u64_le(PROTOCOL_VERSION.into()).await?;
         conn.flush().await?;
-        let client_version = wire::read_u64(&mut conn).await?;
+        let client_version = conn.read_u64_le().await?;
         // Parse into ProtocolVersion.
         let client_version: ProtocolVersion = client_version
             .try_into()
@@ -220,14 +241,14 @@ where
         }
         if client_version.minor() >= 14 {
             // Obsolete CPU affinity.
-            let read_affinity = wire::read_u64(&mut conn).await?;
+            let read_affinity = conn.read_u64_le().await?;
             if read_affinity != 0 {
-                let _cpu_affinity = wire::read_u64(&mut conn).await?;
+                let _cpu_affinity = conn.read_u64_le().await?;
             };
         }
         if client_version.minor() >= 11 {
             // Obsolete reserveSpace
-            let _reserve_space = wire::read_u64(&mut conn).await?;
+            let _reserve_space = conn.read_u64_le().await?;
         }
         if client_version.minor() >= 33 {
             // Nix version. We're plain lying, we're not Nix, but ehโ€ฆ
@@ -245,7 +266,7 @@ where
 
 /// Read a worker [Operation] from the wire.
 pub async fn read_op<R: AsyncReadExt + Unpin>(r: &mut R) -> std::io::Result<Operation> {
-    let op_number = wire::read_u64(r).await?;
+    let op_number = r.read_u64_le().await?;
     Operation::from_u64(op_number).ok_or(Error::new(
         ErrorKind::InvalidData,
         format!("Invalid OP number {}", op_number),
@@ -278,8 +299,8 @@ where
     W: AsyncReadExt + AsyncWriteExt + Unpin,
 {
     match t {
-        Trust::Trusted => wire::write_u64(conn, 1).await,
-        Trust::NotTrusted => wire::write_u64(conn, 2).await,
+        Trust::Trusted => conn.write_u64_le(1).await,
+        Trust::NotTrusted => conn.write_u64_le(2).await,
     }
 }
 
diff --git a/tvix/nix-compat/src/store_path/mod.rs b/tvix/nix-compat/src/store_path/mod.rs
index ac9f1805e3..ff7ede77e1 100644
--- a/tvix/nix-compat/src/store_path/mod.rs
+++ b/tvix/nix-compat/src/store_path/mod.rs
@@ -303,8 +303,7 @@ impl Serialize for StorePathRef<'_> {
     }
 }
 
-/// NAME_CHARS contains `true` for bytes that are valid in store path names,
-/// not accounting for '.' being permitted only past the first character.
+/// NAME_CHARS contains `true` for bytes that are valid in store path names.
 static NAME_CHARS: [bool; 256] = {
     let mut tbl = [false; 256];
     let mut c = 0;
@@ -332,10 +331,6 @@ pub(crate) fn validate_name(s: &(impl AsRef<[u8]> + ?Sized)) -> Result<&str, Err
         return Err(Error::InvalidLength);
     }
 
-    if s[0] == b'.' {
-        return Err(Error::InvalidName(s.to_vec(), 0));
-    }
-
     let mut valid = true;
     for &c in s {
         valid = valid && NAME_CHARS[c as usize];
@@ -446,15 +441,18 @@ mod tests {
         }
     }
 
-    /// This is the store path rejected when `nix-store --add`'ing an
+    /// This is the store path *accepted* when `nix-store --add`'ing an
     /// empty `.gitignore` file.
     ///
-    /// Nix 2.4 accidentally dropped this behaviour, but this is considered a bug.
-    /// See https://github.com/NixOS/nix/pull/9095.
+    /// Nix 2.4 accidentally permitted this behaviour, but the revert came
+    /// too late to beat Hyrum's law. It is now considered permissible.
+    ///
+    /// https://github.com/NixOS/nix/pull/9095 (revert)
+    /// https://github.com/NixOS/nix/pull/9867 (revert-of-revert)
     #[test]
     fn starts_with_dot() {
         StorePath::from_bytes(b"fli4bwscgna7lpm7v5xgnjxrxh0yc7ra-.gitignore")
-            .expect_err("must fail");
+            .expect("must succeed");
     }
 
     #[test]
diff --git a/tvix/nix-compat/src/wire/bytes/mod.rs b/tvix/nix-compat/src/wire/bytes/mod.rs
index 9ec8b3fa04..5ed5e15a64 100644
--- a/tvix/nix-compat/src/wire/bytes/mod.rs
+++ b/tvix/nix-compat/src/wire/bytes/mod.rs
@@ -1,16 +1,14 @@
 use std::{
     io::{Error, ErrorKind},
-    ops::RangeBounds,
+    ops::RangeInclusive,
 };
 use tokio::io::{AsyncReadExt, AsyncWriteExt};
 
-mod reader;
+pub(crate) mod reader;
 pub use reader::BytesReader;
 mod writer;
 pub use writer::BytesWriter;
 
-use super::primitive;
-
 /// 8 null bytes, used to write out padding.
 const EMPTY_BYTES: &[u8; 8] = &[0u8; 8];
 
@@ -35,24 +33,29 @@ const LEN_SIZE: usize = 8;
 ///
 /// This buffers the entire payload into memory,
 /// a streaming version is available at [crate::wire::bytes::BytesReader].
-pub async fn read_bytes<R, S>(r: &mut R, allowed_size: S) -> std::io::Result<Vec<u8>>
+pub async fn read_bytes<R: ?Sized>(
+    r: &mut R,
+    allowed_size: RangeInclusive<usize>,
+) -> std::io::Result<Vec<u8>>
 where
     R: AsyncReadExt + Unpin,
-    S: RangeBounds<u64>,
 {
     // read the length field
-    let len = primitive::read_u64(r).await?;
-
-    if !allowed_size.contains(&len) {
-        return Err(std::io::Error::new(
-            std::io::ErrorKind::InvalidData,
-            "signalled package size not in allowed range",
-        ));
-    }
+    let len = r.read_u64_le().await?;
+    let len: usize = len
+        .try_into()
+        .ok()
+        .filter(|len| allowed_size.contains(len))
+        .ok_or_else(|| {
+            std::io::Error::new(
+                std::io::ErrorKind::InvalidData,
+                "signalled package size not in allowed range",
+            )
+        })?;
 
     // calculate the total length, including padding.
     // byte packets are padded to 8 byte blocks each.
-    let padded_len = padding_len(len) as u64 + (len as u64);
+    let padded_len = padding_len(len as u64) as u64 + (len as u64);
     let mut limited_reader = r.take(padded_len);
 
     let mut buf = Vec::new();
@@ -61,13 +64,10 @@ where
 
     // make sure we got exactly the number of bytes, and not less.
     if s as u64 != padded_len {
-        return Err(std::io::Error::new(
-            std::io::ErrorKind::InvalidData,
-            "got less bytes than expected",
-        ));
+        return Err(std::io::ErrorKind::UnexpectedEof.into());
     }
 
-    let (_content, padding) = buf.split_at(len as usize);
+    let (_content, padding) = buf.split_at(len);
 
     // ensure the padding is all zeroes.
     if !padding.iter().all(|e| *e == b'\0') {
@@ -78,17 +78,19 @@ where
     }
 
     // return the data without the padding
-    buf.truncate(len as usize);
+    buf.truncate(len);
     Ok(buf)
 }
 
 /// Read a "bytes wire packet" of from the AsyncRead and tries to parse as string.
 /// Internally uses [read_bytes].
 /// Rejects reading more than `allowed_size` bytes of payload.
-pub async fn read_string<R, S>(r: &mut R, allowed_size: S) -> std::io::Result<String>
+pub async fn read_string<R>(
+    r: &mut R,
+    allowed_size: RangeInclusive<usize>,
+) -> std::io::Result<String>
 where
     R: AsyncReadExt + Unpin,
-    S: RangeBounds<u64>,
 {
     let bytes = read_bytes(r, allowed_size).await?;
     String::from_utf8(bytes).map_err(|e| Error::new(ErrorKind::InvalidData, e))
@@ -108,7 +110,7 @@ pub async fn write_bytes<W: AsyncWriteExt + Unpin, B: AsRef<[u8]>>(
     b: B,
 ) -> std::io::Result<()> {
     // write the size packet.
-    primitive::write_u64(w, b.as_ref().len() as u64).await?;
+    w.write_u64_le(b.as_ref().len() as u64).await?;
 
     // write the payload
     w.write_all(b.as_ref()).await?;
@@ -122,33 +124,10 @@ pub async fn write_bytes<W: AsyncWriteExt + Unpin, B: AsRef<[u8]>>(
 }
 
 /// Computes the number of bytes we should add to len (a length in
-/// bytes) to be alined on 64 bits (8 bytes).
+/// bytes) to be aligned on 64 bits (8 bytes).
 fn padding_len(len: u64) -> u8 {
-    let modulo = len % 8;
-    if modulo == 0 {
-        0
-    } else {
-        8 - modulo as u8
-    }
-}
-
-/// Models the position inside a "bytes wire packet" that the reader or writer
-/// is in.
-/// It can be in three different stages, inside size, payload or padding fields.
-/// The number tracks the number of bytes written inside the specific field.
-/// There shall be no ambiguous states, at the end of a stage we immediately
-/// move to the beginning of the next one:
-/// - Size(LEN_SIZE) must be expressed as Payload(0)
-/// - Payload(self.payload_len) must be expressed as Padding(0)
-/// There's one exception - Size(LEN_SIZE) in the reader represents a failure
-/// state we enter in case the allowed size doesn't match the allowed range.
-///
-/// Padding(padding_len) means we're at the end of the bytes wire packet.
-#[derive(Clone, Debug, PartialEq, Eq)]
-enum BytesPacketPosition {
-    Size(usize),
-    Payload(u64),
-    Padding(usize),
+    let aligned = len.wrapping_add(7) & !7;
+    aligned.wrapping_sub(len) as u8
 }
 
 #[cfg(test)]
@@ -160,7 +139,7 @@ mod tests {
 
     /// The maximum length of bytes packets we're willing to accept in the test
     /// cases.
-    const MAX_LEN: u64 = 1024;
+    const MAX_LEN: usize = 1024;
 
     #[tokio::test]
     async fn test_read_8_bytes() {
@@ -171,10 +150,7 @@ mod tests {
 
         assert_eq!(
             &12345678u64.to_le_bytes(),
-            read_bytes(&mut mock, 0u64..MAX_LEN)
-                .await
-                .unwrap()
-                .as_slice()
+            read_bytes(&mut mock, 0..=MAX_LEN).await.unwrap().as_slice()
         );
     }
 
@@ -187,10 +163,7 @@ mod tests {
 
         assert_eq!(
             hex!("010203040506070809"),
-            read_bytes(&mut mock, 0u64..MAX_LEN)
-                .await
-                .unwrap()
-                .as_slice()
+            read_bytes(&mut mock, 0..=MAX_LEN).await.unwrap().as_slice()
         );
     }
 
@@ -202,10 +175,7 @@ mod tests {
 
         assert_eq!(
             hex!(""),
-            read_bytes(&mut mock, 0u64..MAX_LEN)
-                .await
-                .unwrap()
-                .as_slice()
+            read_bytes(&mut mock, 0..=MAX_LEN).await.unwrap().as_slice()
         );
     }
 
@@ -215,7 +185,7 @@ mod tests {
     async fn test_read_reject_too_large() {
         let mut mock = Builder::new().read(&100u64.to_le_bytes()).build();
 
-        read_bytes(&mut mock, 10..10)
+        read_bytes(&mut mock, 10..=10)
             .await
             .expect_err("expect this to fail");
     }
@@ -251,4 +221,9 @@ mod tests {
             .build();
         assert_ok!(write_bytes(&mut mock, &input).await)
     }
+
+    #[test]
+    fn padding_len_u64_max() {
+        assert_eq!(padding_len(u64::MAX), 1);
+    }
 }
diff --git a/tvix/nix-compat/src/wire/bytes/reader.rs b/tvix/nix-compat/src/wire/bytes/reader.rs
deleted file mode 100644
index 9aea677645..0000000000
--- a/tvix/nix-compat/src/wire/bytes/reader.rs
+++ /dev/null
@@ -1,464 +0,0 @@
-use pin_project_lite::pin_project;
-use std::{
-    ops::RangeBounds,
-    task::{ready, Poll},
-};
-use tokio::io::AsyncRead;
-
-use super::{padding_len, BytesPacketPosition, LEN_SIZE};
-
-pin_project! {
-    /// Reads a "bytes wire packet" from the underlying reader.
-    /// The format is the same as in [crate::wire::bytes::read_bytes],
-    /// however this structure provides a [AsyncRead] interface,
-    /// allowing to not having to pass around the entire payload in memory.
-    ///
-    /// After being constructed with the underlying reader and an allowed size,
-    /// subsequent requests to poll_read will return payload data until the end
-    /// of the packet is reached.
-    ///
-    /// Internally, it will first read over the size packet, filling payload_size,
-    /// ensuring it fits allowed_size, then return payload data.
-    /// It will only signal EOF (returning `Ok(())` without filling the buffer anymore)
-    /// when all padding has been successfully consumed too.
-    ///
-    /// This also means, it's important for a user to always read to the end,
-    /// and not just call read_exact - otherwise it might not skip over the
-    /// padding, and return garbage when reading the next packet.
-    ///
-    /// In case of an error due to size constraints, or in case of not reading
-    /// all the way to the end (and getting a EOF), the underlying reader is no
-    /// longer usable and might return garbage.
-    pub struct BytesReader<R, S>
-    where
-    R: AsyncRead,
-    S: RangeBounds<u64>,
-
-    {
-        #[pin]
-        inner: R,
-
-        allowed_size: S,
-        payload_size: [u8; 8],
-        state: BytesPacketPosition,
-    }
-}
-
-impl<R, S> BytesReader<R, S>
-where
-    R: AsyncRead + Unpin,
-    S: RangeBounds<u64>,
-{
-    /// Constructs a new BytesReader, using the underlying passed reader.
-    pub fn new(r: R, allowed_size: S) -> Self {
-        Self {
-            inner: r,
-            allowed_size,
-            payload_size: [0; 8],
-            state: BytesPacketPosition::Size(0),
-        }
-    }
-}
-/// Returns an error if the passed usize is 0.
-#[inline]
-fn ensure_nonzero_bytes_read(bytes_read: usize) -> Result<usize, std::io::Error> {
-    if bytes_read == 0 {
-        Err(std::io::Error::new(
-            std::io::ErrorKind::UnexpectedEof,
-            "underlying reader returned EOF",
-        ))
-    } else {
-        Ok(bytes_read)
-    }
-}
-
-impl<R, S> AsyncRead for BytesReader<R, S>
-where
-    R: AsyncRead,
-    S: RangeBounds<u64>,
-{
-    fn poll_read(
-        self: std::pin::Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-        buf: &mut tokio::io::ReadBuf<'_>,
-    ) -> Poll<std::io::Result<()>> {
-        let mut this = self.project();
-
-        // Use a loop, so we can deal with (multiple) state transitions.
-        loop {
-            match *this.state {
-                BytesPacketPosition::Size(LEN_SIZE) => {
-                    // used in case an invalid size was signalled.
-                    Err(std::io::Error::new(
-                        std::io::ErrorKind::InvalidData,
-                        "signalled package size not in allowed range",
-                    ))?
-                }
-                BytesPacketPosition::Size(pos) => {
-                    // try to read more of the size field.
-                    // We wrap a ReadBuf around this.payload_size here, and set_filled.
-                    let mut read_buf = tokio::io::ReadBuf::new(this.payload_size);
-                    read_buf.advance(pos);
-                    ready!(this.inner.as_mut().poll_read(cx, &mut read_buf))?;
-
-                    ensure_nonzero_bytes_read(read_buf.filled().len() - pos)?;
-
-                    let total_size_read = read_buf.filled().len();
-                    if total_size_read == LEN_SIZE {
-                        // If the entire payload size was read, parse it
-                        let payload_size = u64::from_le_bytes(*this.payload_size);
-
-                        if !this.allowed_size.contains(&payload_size) {
-                            // If it's not in the allowed
-                            // range, transition to failure mode
-                            // `BytesPacketPosition::Size(LEN_SIZE)`, where only
-                            // an error is returned.
-                            *this.state = BytesPacketPosition::Size(LEN_SIZE)
-                        } else if payload_size == 0 {
-                            // If the payload size is 0, move on to reading padding directly.
-                            *this.state = BytesPacketPosition::Padding(0)
-                        } else {
-                            // Else, transition to reading the payload.
-                            *this.state = BytesPacketPosition::Payload(0)
-                        }
-                    } else {
-                        // If we still need to read more of payload size, update
-                        // our position in the state.
-                        *this.state = BytesPacketPosition::Size(total_size_read)
-                    }
-                }
-                BytesPacketPosition::Payload(pos) => {
-                    let signalled_size = u64::from_le_bytes(*this.payload_size);
-                    // We don't enter this match arm at all if we're expecting empty payload
-                    debug_assert!(signalled_size > 0, "signalled size must be larger than 0");
-
-                    // Read from the underlying reader into buf
-                    // We cap the ReadBuf to the size of the payload, as we
-                    // don't want to leak padding to the caller.
-                    let bytes_read = ensure_nonzero_bytes_read({
-                        // Reducing these two u64 to usize on 32bits is fine - we
-                        // only care about not reading too much, not too less.
-                        let mut limited_buf = buf.take((signalled_size - pos) as usize);
-                        ready!(this.inner.as_mut().poll_read(cx, &mut limited_buf))?;
-                        limited_buf.filled().len()
-                    })?;
-
-                    // SAFETY: we just did populate this, but through limited_buf.
-                    unsafe { buf.assume_init(bytes_read) }
-                    buf.advance(bytes_read);
-
-                    if pos + bytes_read as u64 == signalled_size {
-                        // If we now read all payload, transition to padding
-                        // state.
-                        *this.state = BytesPacketPosition::Padding(0);
-                    } else {
-                        // if we didn't read everything yet, update our position
-                        // in the state.
-                        *this.state = BytesPacketPosition::Payload(pos + bytes_read as u64);
-                    }
-
-                    // We return from poll_read here.
-                    // This is important, as any error (or even Pending) from
-                    // the underlying reader on the next read (be it padding or
-                    // payload) would require us to roll back buf, as generally
-                    // a AsyncRead::poll_read may not advance the buffer in case
-                    // of a nonsuccessful read.
-                    // It can't be misinterpreted as EOF, as we definitely *did*
-                    // write something into buf if we come to here (we pass
-                    // `ensure_nonzero_bytes_read`).
-                    return Ok(()).into();
-                }
-                BytesPacketPosition::Padding(pos) => {
-                    // Consume whatever padding is left, ensuring it's all null
-                    // bytes. Only return `Ready(Ok(()))` once we're past the
-                    // padding (or in cases where polling the inner reader
-                    // returns `Poll::Pending`).
-                    let signalled_size = u64::from_le_bytes(*this.payload_size);
-                    let total_padding_len = padding_len(signalled_size) as usize;
-
-                    let padding_len_remaining = total_padding_len - pos;
-                    if padding_len_remaining != 0 {
-                        // create a buffer only accepting the number of remaining padding bytes.
-                        let mut buf = [0; 8];
-                        let mut padding_buf = tokio::io::ReadBuf::new(&mut buf);
-                        let mut padding_buf = padding_buf.take(padding_len_remaining);
-
-                        // read into padding_buf.
-                        ready!(this.inner.as_mut().poll_read(cx, &mut padding_buf))?;
-                        let bytes_read = ensure_nonzero_bytes_read(padding_buf.filled().len())?;
-
-                        *this.state = BytesPacketPosition::Padding(pos + bytes_read);
-
-                        // ensure the bytes are not null bytes
-                        if !padding_buf.filled().iter().all(|e| *e == b'\0') {
-                            return Err(std::io::Error::new(
-                                std::io::ErrorKind::InvalidData,
-                                "padding is not all zeroes",
-                            ))
-                            .into();
-                        }
-
-                        // if we still have padding to read, run the loop again.
-                        continue;
-                    }
-                    // return EOF
-                    return Ok(()).into();
-                }
-            }
-        }
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use std::time::Duration;
-
-    use crate::wire::bytes::write_bytes;
-    use hex_literal::hex;
-    use lazy_static::lazy_static;
-    use rstest::rstest;
-    use tokio::io::AsyncReadExt;
-    use tokio_test::{assert_err, io::Builder};
-
-    use super::*;
-
-    /// The maximum length of bytes packets we're willing to accept in the test
-    /// cases.
-    const MAX_LEN: u64 = 1024;
-
-    lazy_static! {
-        pub static ref LARGE_PAYLOAD: Vec<u8> = (0..255).collect::<Vec<u8>>().repeat(4 * 1024);
-    }
-
-    /// Helper function, calling the (simpler) write_bytes with the payload.
-    /// We use this to create data we want to read from the wire.
-    async fn produce_packet_bytes(payload: &[u8]) -> Vec<u8> {
-        let mut exp = vec![];
-        write_bytes(&mut exp, payload).await.unwrap();
-        exp
-    }
-
-    /// Read bytes packets of various length, and ensure read_to_end returns the
-    /// expected payload.
-    #[rstest]
-    #[case::empty(&[])] // empty bytes packet
-    #[case::size_1b(&[0xff])] // 1 bytes payload
-    #[case::size_8b(&hex!("0001020304050607"))] // 8 bytes payload (no padding)
-    #[case::size_9b( &hex!("000102030405060708"))] // 9 bytes payload (7 bytes padding)
-    #[case::size_1m(LARGE_PAYLOAD.as_slice())] // larger bytes packet
-    #[tokio::test]
-    async fn read_payload_correct(#[case] payload: &[u8]) {
-        let mut mock = Builder::new()
-            .read(&produce_packet_bytes(payload).await)
-            .build();
-
-        let mut r = BytesReader::new(&mut mock, ..=LARGE_PAYLOAD.len() as u64);
-        let mut buf = Vec::new();
-        r.read_to_end(&mut buf).await.expect("must succeed");
-
-        assert_eq!(payload, &buf[..]);
-    }
-
-    /// Fail if the bytes packet is larger than allowed
-    #[tokio::test]
-    async fn read_bigger_than_allowed_fail() {
-        let payload = LARGE_PAYLOAD.as_slice();
-        let mut mock = Builder::new()
-            .read(&produce_packet_bytes(payload).await[0..8]) // We stop reading after the size packet
-            .build();
-
-        let mut r = BytesReader::new(&mut mock, ..2048);
-        let mut buf = Vec::new();
-        assert_err!(r.read_to_end(&mut buf).await);
-    }
-
-    /// Fail if the bytes packet is smaller than allowed
-    #[tokio::test]
-    async fn read_smaller_than_allowed_fail() {
-        let payload = &[0x00, 0x01, 0x02];
-        let mut mock = Builder::new()
-            .read(&produce_packet_bytes(payload).await[0..8]) // We stop reading after the size packet
-            .build();
-
-        let mut r = BytesReader::new(&mut mock, 1024..2048);
-        let mut buf = Vec::new();
-        assert_err!(r.read_to_end(&mut buf).await);
-    }
-
-    /// Fail if the padding is not all zeroes
-    #[tokio::test]
-    async fn read_fail_if_nonzero_padding() {
-        let payload = &[0x00, 0x01, 0x02];
-        let mut packet_bytes = produce_packet_bytes(payload).await;
-        // Flip some bits in the padding
-        packet_bytes[12] = 0xff;
-        let mut mock = Builder::new().read(&packet_bytes).build(); // We stop reading after the faulty bit
-
-        let mut r = BytesReader::new(&mut mock, ..MAX_LEN);
-        let mut buf = Vec::new();
-
-        r.read_to_end(&mut buf).await.expect_err("must fail");
-    }
-
-    /// Start a 9 bytes payload packet, but have the underlying reader return
-    /// EOF in the middle of the size packet (after 4 bytes).
-    /// We should get an unexpected EOF error, already when trying to read the
-    /// first byte (of payload)
-    #[tokio::test]
-    async fn read_9b_eof_during_size() {
-        let payload = &hex!("FF0102030405060708");
-        let mut mock = Builder::new()
-            .read(&produce_packet_bytes(payload).await[..4])
-            .build();
-
-        let mut r = BytesReader::new(&mut mock, ..MAX_LEN);
-        let mut buf = [0u8; 1];
-
-        assert_eq!(
-            r.read_exact(&mut buf).await.expect_err("must fail").kind(),
-            std::io::ErrorKind::UnexpectedEof
-        );
-
-        assert_eq!(&[0], &buf, "buffer should stay empty");
-    }
-
-    /// Start a 9 bytes payload packet, but have the underlying reader return
-    /// EOF in the middle of the payload (4 bytes into the payload).
-    /// We should get an unexpected EOF error, after reading the first 4 bytes
-    /// (successfully).
-    #[tokio::test]
-    async fn read_9b_eof_during_payload() {
-        let payload = &hex!("FF0102030405060708");
-        let mut mock = Builder::new()
-            .read(&produce_packet_bytes(payload).await[..8 + 4])
-            .build();
-
-        let mut r = BytesReader::new(&mut mock, ..MAX_LEN);
-        let mut buf = [0; 9];
-
-        r.read_exact(&mut buf[..4]).await.expect("must succeed");
-
-        assert_eq!(
-            r.read_exact(&mut buf[4..=4])
-                .await
-                .expect_err("must fail")
-                .kind(),
-            std::io::ErrorKind::UnexpectedEof
-        );
-    }
-
-    /// Start a 9 bytes payload packet, but return an error at various stages *after* the actual payload.
-    /// read_exact with a 9 bytes buffer is expected to succeed, but any further
-    /// read, as well as read_to_end are expected to fail.
-    #[rstest]
-    #[case::before_padding(8 + 9)]
-    #[case::during_padding(8 + 9 + 2)]
-    #[case::after_padding(8 + 9 + padding_len(9) as usize)]
-    #[tokio::test]
-    async fn read_9b_eof_after_payload(#[case] offset: usize) {
-        let payload = &hex!("FF0102030405060708");
-        let mut mock = Builder::new()
-            .read(&produce_packet_bytes(payload).await[..offset])
-            .build();
-
-        let mut r = BytesReader::new(&mut mock, ..MAX_LEN);
-        let mut buf = [0; 9];
-
-        // read_exact of the payload will succeed, but a subsequent read will
-        // return UnexpectedEof error.
-        r.read_exact(&mut buf).await.expect("should succeed");
-        assert_eq!(
-            r.read_exact(&mut buf[4..=4])
-                .await
-                .expect_err("must fail")
-                .kind(),
-            std::io::ErrorKind::UnexpectedEof
-        );
-
-        // read_to_end will fail.
-        let mut mock = Builder::new()
-            .read(&produce_packet_bytes(payload).await[..8 + payload.len()])
-            .build();
-
-        let mut r = BytesReader::new(&mut mock, ..MAX_LEN);
-        let mut buf = Vec::new();
-        assert_eq!(
-            r.read_to_end(&mut buf).await.expect_err("must fail").kind(),
-            std::io::ErrorKind::UnexpectedEof
-        );
-    }
-
-    /// Start a 9 bytes payload packet, but return an error after a certain position.
-    /// Ensure that error is propagated.
-    #[rstest]
-    #[case::during_size(4)]
-    #[case::before_payload(8)]
-    #[case::during_payload(8 + 4)]
-    #[case::before_padding(8 + 4)]
-    #[case::during_padding(8 + 9 + 2)]
-    #[tokio::test]
-    async fn propagate_error_from_reader(#[case] offset: usize) {
-        let payload = &hex!("FF0102030405060708");
-        let mut mock = Builder::new()
-            .read(&produce_packet_bytes(payload).await[..offset])
-            .read_error(std::io::Error::new(std::io::ErrorKind::Other, "foo"))
-            .build();
-
-        let mut r = BytesReader::new(&mut mock, ..MAX_LEN);
-        let mut buf = Vec::new();
-
-        let err = r.read_to_end(&mut buf).await.expect_err("must fail");
-        assert_eq!(
-            err.kind(),
-            std::io::ErrorKind::Other,
-            "error kind must match"
-        );
-
-        assert_eq!(
-            err.into_inner().unwrap().to_string(),
-            "foo",
-            "error payload must contain foo"
-        );
-    }
-
-    /// If there's an error right after the padding, we don't propagate it, as
-    /// we're done reading. We just return EOF.
-    #[tokio::test]
-    async fn no_error_after_eof() {
-        let payload = &hex!("FF0102030405060708");
-        let mut mock = Builder::new()
-            .read(&produce_packet_bytes(payload).await)
-            .read_error(std::io::Error::new(std::io::ErrorKind::Other, "foo"))
-            .build();
-
-        let mut r = BytesReader::new(&mut mock, ..MAX_LEN);
-        let mut buf = Vec::new();
-
-        r.read_to_end(&mut buf).await.expect("must succeed");
-        assert_eq!(buf.as_slice(), payload);
-    }
-
-    /// Introduce various stalls in various places of the packet, to ensure we
-    /// handle these cases properly, too.
-    #[rstest]
-    #[case::beginning(0)]
-    #[case::before_payload(8)]
-    #[case::during_payload(8 + 4)]
-    #[case::before_padding(8 + 4)]
-    #[case::during_padding(8 + 9 + 2)]
-    #[tokio::test]
-    async fn read_payload_correct_pending(#[case] offset: usize) {
-        let payload = &hex!("FF0102030405060708");
-        let mut mock = Builder::new()
-            .read(&produce_packet_bytes(payload).await[..offset])
-            .wait(Duration::from_nanos(0))
-            .read(&produce_packet_bytes(payload).await[offset..])
-            .build();
-
-        let mut r = BytesReader::new(&mut mock, ..=LARGE_PAYLOAD.len() as u64);
-        let mut buf = Vec::new();
-        r.read_to_end(&mut buf).await.expect("must succeed");
-
-        assert_eq!(payload, &buf[..]);
-    }
-}
diff --git a/tvix/nix-compat/src/wire/bytes/reader/mod.rs b/tvix/nix-compat/src/wire/bytes/reader/mod.rs
new file mode 100644
index 0000000000..cd45f78a0c
--- /dev/null
+++ b/tvix/nix-compat/src/wire/bytes/reader/mod.rs
@@ -0,0 +1,447 @@
+use std::{
+    future::Future,
+    io,
+    ops::RangeBounds,
+    pin::Pin,
+    task::{self, ready, Poll},
+};
+use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf};
+
+use trailer::{read_trailer, ReadTrailer, Trailer};
+
+#[doc(hidden)]
+pub use self::trailer::Pad;
+pub(crate) use self::trailer::Tag;
+mod trailer;
+
+/// Reads a "bytes wire packet" from the underlying reader.
+/// The format is the same as in [crate::wire::bytes::read_bytes],
+/// however this structure provides a [AsyncRead] interface,
+/// allowing to not having to pass around the entire payload in memory.
+///
+/// It is constructed by reading a size with [BytesReader::new],
+/// and yields payload data until the end of the packet is reached.
+///
+/// It will not return the final bytes before all padding has been successfully
+/// consumed as well, but the full length of the reader must be consumed.
+///
+/// If the data is not read all the way to the end, or an error is encountered,
+/// the underlying reader is no longer usable and might return garbage.
+#[derive(Debug)]
+#[allow(private_bounds)]
+pub struct BytesReader<R, T: Tag = Pad> {
+    state: State<R, T>,
+}
+
+#[derive(Debug)]
+enum State<R, T: Tag> {
+    /// Full 8-byte blocks are being read and released to the caller.
+    Body {
+        reader: Option<R>,
+        consumed: u64,
+        /// The total length of all user data contained in both the body and trailer.
+        user_len: u64,
+    },
+    /// The trailer is in the process of being read.
+    ReadTrailer(ReadTrailer<R, T>),
+    /// The trailer has been fully read and validated,
+    /// and data can now be released to the caller.
+    ReleaseTrailer { consumed: u8, data: Trailer },
+}
+
+impl<R> BytesReader<R>
+where
+    R: AsyncRead + Unpin,
+{
+    /// Constructs a new BytesReader, using the underlying passed reader.
+    pub async fn new<S: RangeBounds<u64>>(reader: R, allowed_size: S) -> io::Result<Self> {
+        BytesReader::new_internal(reader, allowed_size).await
+    }
+}
+
+#[allow(private_bounds)]
+impl<R, T: Tag> BytesReader<R, T>
+where
+    R: AsyncRead + Unpin,
+{
+    /// Constructs a new BytesReader, using the underlying passed reader.
+    pub(crate) async fn new_internal<S: RangeBounds<u64>>(
+        mut reader: R,
+        allowed_size: S,
+    ) -> io::Result<Self> {
+        let size = reader.read_u64_le().await?;
+
+        if !allowed_size.contains(&size) {
+            return Err(io::Error::new(io::ErrorKind::InvalidData, "invalid size"));
+        }
+
+        Ok(Self {
+            state: State::Body {
+                reader: Some(reader),
+                consumed: 0,
+                user_len: size,
+            },
+        })
+    }
+
+    /// Returns whether there is any remaining data to be read.
+    pub fn is_empty(&self) -> bool {
+        self.len() == 0
+    }
+
+    /// Remaining data length, ie not including data already read.
+    ///
+    /// If the size has not been read yet, this is [None].
+    pub fn len(&self) -> u64 {
+        match self.state {
+            State::Body {
+                consumed, user_len, ..
+            } => user_len - consumed,
+            State::ReadTrailer(ref fut) => fut.len() as u64,
+            State::ReleaseTrailer { consumed, ref data } => data.len() as u64 - consumed as u64,
+        }
+    }
+}
+
+#[allow(private_bounds)]
+impl<R: AsyncRead + Unpin, T: Tag> AsyncRead for BytesReader<R, T> {
+    fn poll_read(
+        mut self: Pin<&mut Self>,
+        cx: &mut task::Context,
+        buf: &mut ReadBuf,
+    ) -> Poll<io::Result<()>> {
+        let this = &mut self.state;
+
+        loop {
+            match this {
+                State::Body {
+                    reader,
+                    consumed,
+                    user_len,
+                } => {
+                    let body_len = *user_len & !7;
+                    let remaining = body_len - *consumed;
+
+                    let reader = if remaining == 0 {
+                        let reader = reader.take().unwrap();
+                        let user_len = (*user_len & 7) as u8;
+                        *this = State::ReadTrailer(read_trailer(reader, user_len));
+                        continue;
+                    } else {
+                        reader.as_mut().unwrap()
+                    };
+
+                    let mut bytes_read = 0;
+                    ready!(with_limited(buf, remaining, |buf| {
+                        let ret = Pin::new(reader).poll_read(cx, buf);
+                        bytes_read = buf.initialized().len();
+                        ret
+                    }))?;
+
+                    *consumed += bytes_read as u64;
+
+                    return if bytes_read != 0 {
+                        Ok(())
+                    } else {
+                        Err(io::ErrorKind::UnexpectedEof.into())
+                    }
+                    .into();
+                }
+                State::ReadTrailer(fut) => {
+                    *this = State::ReleaseTrailer {
+                        consumed: 0,
+                        data: ready!(Pin::new(fut).poll(cx))?,
+                    };
+                }
+                State::ReleaseTrailer { consumed, data } => {
+                    let data = &data[*consumed as usize..];
+                    let data = &data[..usize::min(data.len(), buf.remaining())];
+
+                    buf.put_slice(data);
+                    *consumed += data.len() as u8;
+
+                    return Ok(()).into();
+                }
+            }
+        }
+    }
+}
+
+/// Make a limited version of `buf`, consisting only of up to `n` bytes of the unfilled section, and call `f` with it.
+/// After `f` returns, we propagate the filled cursor advancement back to `buf`.
+fn with_limited<R>(buf: &mut ReadBuf, n: u64, f: impl FnOnce(&mut ReadBuf) -> R) -> R {
+    let mut nbuf = buf.take(n.try_into().unwrap_or(usize::MAX));
+    let ptr = nbuf.initialized().as_ptr();
+    let ret = f(&mut nbuf);
+
+    // SAFETY: `ReadBuf::take` only returns the *unfilled* section of `buf`,
+    // so anything filled is new, initialized data.
+    //
+    // We verify that `nbuf` still points to the same buffer,
+    // so we're sure it hasn't been swapped out.
+    unsafe {
+        // ensure our buffer hasn't been swapped out
+        assert_eq!(nbuf.initialized().as_ptr(), ptr);
+
+        let n = nbuf.filled().len();
+        buf.assume_init(n);
+        buf.advance(n);
+    }
+
+    ret
+}
+
+#[cfg(test)]
+mod tests {
+    use std::time::Duration;
+
+    use crate::wire::bytes::{padding_len, write_bytes};
+    use hex_literal::hex;
+    use lazy_static::lazy_static;
+    use rstest::rstest;
+    use tokio::io::AsyncReadExt;
+    use tokio_test::io::Builder;
+
+    use super::*;
+
+    /// The maximum length of bytes packets we're willing to accept in the test
+    /// cases.
+    const MAX_LEN: u64 = 1024;
+
+    lazy_static! {
+        pub static ref LARGE_PAYLOAD: Vec<u8> = (0..255).collect::<Vec<u8>>().repeat(4 * 1024);
+    }
+
+    /// Helper function, calling the (simpler) write_bytes with the payload.
+    /// We use this to create data we want to read from the wire.
+    async fn produce_packet_bytes(payload: &[u8]) -> Vec<u8> {
+        let mut exp = vec![];
+        write_bytes(&mut exp, payload).await.unwrap();
+        exp
+    }
+
+    /// Read bytes packets of various length, and ensure read_to_end returns the
+    /// expected payload.
+    #[rstest]
+    #[case::empty(&[])] // empty bytes packet
+    #[case::size_1b(&[0xff])] // 1 bytes payload
+    #[case::size_8b(&hex!("0001020304050607"))] // 8 bytes payload (no padding)
+    #[case::size_9b(&hex!("000102030405060708"))] // 9 bytes payload (7 bytes padding)
+    #[case::size_1m(LARGE_PAYLOAD.as_slice())] // larger bytes packet
+    #[tokio::test]
+    async fn read_payload_correct(#[case] payload: &[u8]) {
+        let mut mock = Builder::new()
+            .read(&produce_packet_bytes(payload).await)
+            .build();
+
+        let mut r = BytesReader::new(&mut mock, ..=LARGE_PAYLOAD.len() as u64)
+            .await
+            .unwrap();
+        let mut buf = Vec::new();
+        r.read_to_end(&mut buf).await.expect("must succeed");
+
+        assert_eq!(payload, &buf[..]);
+    }
+
+    /// Fail if the bytes packet is larger than allowed
+    #[tokio::test]
+    async fn read_bigger_than_allowed_fail() {
+        let payload = LARGE_PAYLOAD.as_slice();
+        let mut mock = Builder::new()
+            .read(&produce_packet_bytes(payload).await[0..8]) // We stop reading after the size packet
+            .build();
+
+        assert_eq!(
+            BytesReader::new(&mut mock, ..2048)
+                .await
+                .unwrap_err()
+                .kind(),
+            io::ErrorKind::InvalidData
+        );
+    }
+
+    /// Fail if the bytes packet is smaller than allowed
+    #[tokio::test]
+    async fn read_smaller_than_allowed_fail() {
+        let payload = &[0x00, 0x01, 0x02];
+        let mut mock = Builder::new()
+            .read(&produce_packet_bytes(payload).await[0..8]) // We stop reading after the size packet
+            .build();
+
+        assert_eq!(
+            BytesReader::new(&mut mock, 1024..2048)
+                .await
+                .unwrap_err()
+                .kind(),
+            io::ErrorKind::InvalidData
+        );
+    }
+
+    /// Fail if the padding is not all zeroes
+    #[tokio::test]
+    async fn read_fail_if_nonzero_padding() {
+        let payload = &[0x00, 0x01, 0x02];
+        let mut packet_bytes = produce_packet_bytes(payload).await;
+        // Flip some bits in the padding
+        packet_bytes[12] = 0xff;
+        let mut mock = Builder::new().read(&packet_bytes).build(); // We stop reading after the faulty bit
+
+        let mut r = BytesReader::new(&mut mock, ..MAX_LEN).await.unwrap();
+        let mut buf = Vec::new();
+
+        r.read_to_end(&mut buf).await.expect_err("must fail");
+    }
+
+    /// Start a 9 bytes payload packet, but have the underlying reader return
+    /// EOF in the middle of the size packet (after 4 bytes).
+    /// We should get an unexpected EOF error, already when trying to read the
+    /// first byte (of payload)
+    #[tokio::test]
+    async fn read_9b_eof_during_size() {
+        let payload = &hex!("FF0102030405060708");
+        let mut mock = Builder::new()
+            .read(&produce_packet_bytes(payload).await[..4])
+            .build();
+
+        assert_eq!(
+            BytesReader::new(&mut mock, ..MAX_LEN)
+                .await
+                .expect_err("must fail")
+                .kind(),
+            io::ErrorKind::UnexpectedEof
+        );
+    }
+
+    /// Start a 9 bytes payload packet, but have the underlying reader return
+    /// EOF in the middle of the payload (4 bytes into the payload).
+    /// We should get an unexpected EOF error, after reading the first 4 bytes
+    /// (successfully).
+    #[tokio::test]
+    async fn read_9b_eof_during_payload() {
+        let payload = &hex!("FF0102030405060708");
+        let mut mock = Builder::new()
+            .read(&produce_packet_bytes(payload).await[..8 + 4])
+            .build();
+
+        let mut r = BytesReader::new(&mut mock, ..MAX_LEN).await.unwrap();
+        let mut buf = [0; 9];
+
+        r.read_exact(&mut buf[..4]).await.expect("must succeed");
+
+        assert_eq!(
+            r.read_exact(&mut buf[4..=4])
+                .await
+                .expect_err("must fail")
+                .kind(),
+            std::io::ErrorKind::UnexpectedEof
+        );
+    }
+
+    /// Start a 9 bytes payload packet, but don't supply the necessary padding.
+    /// This is expected to always fail before returning the final data.
+    #[rstest]
+    #[case::before_padding(8 + 9)]
+    #[case::during_padding(8 + 9 + 2)]
+    #[case::after_padding(8 + 9 + padding_len(9) as usize - 1)]
+    #[tokio::test]
+    async fn read_9b_eof_after_payload(#[case] offset: usize) {
+        let payload = &hex!("FF0102030405060708");
+        let mut mock = Builder::new()
+            .read(&produce_packet_bytes(payload).await[..offset])
+            .build();
+
+        let mut r = BytesReader::new(&mut mock, ..MAX_LEN).await.unwrap();
+
+        // read_exact of the payload *body* will succeed, but a subsequent read will
+        // return UnexpectedEof error.
+        assert_eq!(r.read_exact(&mut [0; 8]).await.unwrap(), 8);
+        assert_eq!(
+            r.read_exact(&mut [0]).await.unwrap_err().kind(),
+            std::io::ErrorKind::UnexpectedEof
+        );
+    }
+
+    /// Start a 9 bytes payload packet, but return an error after a certain position.
+    /// Ensure that error is propagated.
+    #[rstest]
+    #[case::during_size(4)]
+    #[case::before_payload(8)]
+    #[case::during_payload(8 + 4)]
+    #[case::before_padding(8 + 4)]
+    #[case::during_padding(8 + 9 + 2)]
+    #[tokio::test]
+    async fn propagate_error_from_reader(#[case] offset: usize) {
+        let payload = &hex!("FF0102030405060708");
+        let mut mock = Builder::new()
+            .read(&produce_packet_bytes(payload).await[..offset])
+            .read_error(std::io::Error::new(std::io::ErrorKind::Other, "foo"))
+            .build();
+
+        // Either length reading or data reading can fail, depending on which test case we're in.
+        let err: io::Error = async {
+            let mut r = BytesReader::new(&mut mock, ..MAX_LEN).await?;
+            let mut buf = Vec::new();
+
+            r.read_to_end(&mut buf).await?;
+
+            Ok(())
+        }
+        .await
+        .expect_err("must fail");
+
+        assert_eq!(
+            err.kind(),
+            std::io::ErrorKind::Other,
+            "error kind must match"
+        );
+
+        assert_eq!(
+            err.into_inner().unwrap().to_string(),
+            "foo",
+            "error payload must contain foo"
+        );
+    }
+
+    /// If there's an error right after the padding, we don't propagate it, as
+    /// we're done reading. We just return EOF.
+    #[tokio::test]
+    async fn no_error_after_eof() {
+        let payload = &hex!("FF0102030405060708");
+        let mut mock = Builder::new()
+            .read(&produce_packet_bytes(payload).await)
+            .read_error(std::io::Error::new(std::io::ErrorKind::Other, "foo"))
+            .build();
+
+        let mut r = BytesReader::new(&mut mock, ..MAX_LEN).await.unwrap();
+        let mut buf = Vec::new();
+
+        r.read_to_end(&mut buf).await.expect("must succeed");
+        assert_eq!(buf.as_slice(), payload);
+    }
+
+    /// Introduce various stalls in various places of the packet, to ensure we
+    /// handle these cases properly, too.
+    #[rstest]
+    #[case::beginning(0)]
+    #[case::before_payload(8)]
+    #[case::during_payload(8 + 4)]
+    #[case::before_padding(8 + 4)]
+    #[case::during_padding(8 + 9 + 2)]
+    #[tokio::test]
+    async fn read_payload_correct_pending(#[case] offset: usize) {
+        let payload = &hex!("FF0102030405060708");
+        let mut mock = Builder::new()
+            .read(&produce_packet_bytes(payload).await[..offset])
+            .wait(Duration::from_nanos(0))
+            .read(&produce_packet_bytes(payload).await[offset..])
+            .build();
+
+        let mut r = BytesReader::new(&mut mock, ..=LARGE_PAYLOAD.len() as u64)
+            .await
+            .unwrap();
+        let mut buf = Vec::new();
+        r.read_to_end(&mut buf).await.expect("must succeed");
+
+        assert_eq!(payload, &buf[..]);
+    }
+}
diff --git a/tvix/nix-compat/src/wire/bytes/reader/trailer.rs b/tvix/nix-compat/src/wire/bytes/reader/trailer.rs
new file mode 100644
index 0000000000..0b0c7b1355
--- /dev/null
+++ b/tvix/nix-compat/src/wire/bytes/reader/trailer.rs
@@ -0,0 +1,198 @@
+use std::{
+    fmt::Debug,
+    future::Future,
+    marker::PhantomData,
+    ops::Deref,
+    pin::Pin,
+    task::{self, ready, Poll},
+};
+
+use tokio::io::{self, AsyncRead, ReadBuf};
+
+/// Trailer represents up to 7 bytes of data read as part of the trailer block(s)
+#[derive(Debug)]
+pub(crate) struct Trailer {
+    data_len: u8,
+    buf: [u8; 7],
+}
+
+impl Deref for Trailer {
+    type Target = [u8];
+
+    fn deref(&self) -> &Self::Target {
+        &self.buf[..self.data_len as usize]
+    }
+}
+
+/// Tag defines a "trailer tag": specific, fixed bytes that must follow wire data.
+pub(crate) trait Tag {
+    /// The expected suffix
+    ///
+    /// The first 7 bytes may be ignored, and it must be an 8-byte aligned size.
+    const PATTERN: &'static [u8];
+
+    /// Suitably sized buffer for reading [Self::PATTERN]
+    ///
+    /// HACK: This is a workaround for const generics limitations.
+    type Buf: AsRef<[u8]> + AsMut<[u8]> + Debug + Unpin;
+
+    /// Make an instance of [Self::Buf]
+    fn make_buf() -> Self::Buf;
+}
+
+#[derive(Debug)]
+pub enum Pad {}
+
+impl Tag for Pad {
+    const PATTERN: &'static [u8] = &[0; 8];
+
+    type Buf = [u8; 8];
+
+    fn make_buf() -> Self::Buf {
+        [0; 8]
+    }
+}
+
+#[derive(Debug)]
+pub(crate) struct ReadTrailer<R, T: Tag> {
+    reader: R,
+    data_len: u8,
+    filled: u8,
+    buf: T::Buf,
+    _phantom: PhantomData<fn(T) -> T>,
+}
+
+/// read_trailer returns a [Future] that reads a trailer with a given [Tag] from `reader`
+pub(crate) fn read_trailer<R: AsyncRead + Unpin, T: Tag>(
+    reader: R,
+    data_len: u8,
+) -> ReadTrailer<R, T> {
+    assert!(data_len < 8, "payload in trailer must be less than 8 bytes");
+
+    let buf = T::make_buf();
+    assert_eq!(buf.as_ref().len(), T::PATTERN.len());
+    assert_eq!(T::PATTERN.len() % 8, 0);
+
+    ReadTrailer {
+        reader,
+        data_len,
+        filled: if data_len != 0 { 0 } else { 8 },
+        buf,
+        _phantom: PhantomData,
+    }
+}
+
+impl<R, T: Tag> ReadTrailer<R, T> {
+    pub fn len(&self) -> u8 {
+        self.data_len
+    }
+}
+
+impl<R: AsyncRead + Unpin, T: Tag> Future for ReadTrailer<R, T> {
+    type Output = io::Result<Trailer>;
+
+    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Self::Output> {
+        let this = &mut *self;
+
+        loop {
+            if this.filled >= this.data_len {
+                let check_range = || this.data_len as usize..this.filled as usize;
+
+                if this.buf.as_ref()[check_range()] != T::PATTERN[check_range()] {
+                    return Err(io::Error::new(
+                        io::ErrorKind::InvalidData,
+                        "invalid trailer",
+                    ))
+                    .into();
+                }
+            }
+
+            if this.filled as usize == T::PATTERN.len() {
+                let mut buf = [0; 7];
+                buf.copy_from_slice(&this.buf.as_ref()[..7]);
+
+                return Ok(Trailer {
+                    data_len: this.data_len,
+                    buf,
+                })
+                .into();
+            }
+
+            let mut buf = ReadBuf::new(this.buf.as_mut());
+            buf.advance(this.filled as usize);
+
+            ready!(Pin::new(&mut this.reader).poll_read(cx, &mut buf))?;
+
+            this.filled = {
+                let prev_filled = this.filled;
+                let filled = buf.filled().len() as u8;
+
+                if filled == prev_filled {
+                    return Err(io::ErrorKind::UnexpectedEof.into()).into();
+                }
+
+                filled
+            };
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::time::Duration;
+
+    use super::*;
+
+    #[tokio::test]
+    async fn unexpected_eof() {
+        let reader = tokio_test::io::Builder::new()
+            .read(&[0xed])
+            .wait(Duration::ZERO)
+            .read(&[0xef, 0x00])
+            .build();
+
+        assert_eq!(
+            read_trailer::<_, Pad>(reader, 2).await.unwrap_err().kind(),
+            io::ErrorKind::UnexpectedEof
+        );
+    }
+
+    #[tokio::test]
+    async fn invalid_padding() {
+        let reader = tokio_test::io::Builder::new()
+            .read(&[0xed])
+            .wait(Duration::ZERO)
+            .read(&[0xef, 0x01, 0x00])
+            .wait(Duration::ZERO)
+            .build();
+
+        assert_eq!(
+            read_trailer::<_, Pad>(reader, 2).await.unwrap_err().kind(),
+            io::ErrorKind::InvalidData
+        );
+    }
+
+    #[tokio::test]
+    async fn success() {
+        let reader = tokio_test::io::Builder::new()
+            .read(&[0xed])
+            .wait(Duration::ZERO)
+            .read(&[0xef, 0x00])
+            .wait(Duration::ZERO)
+            .read(&[0x00, 0x00, 0x00, 0x00, 0x00])
+            .build();
+
+        assert_eq!(
+            &*read_trailer::<_, Pad>(reader, 2).await.unwrap(),
+            &[0xed, 0xef]
+        );
+    }
+
+    #[tokio::test]
+    async fn no_padding() {
+        assert!(read_trailer::<_, Pad>(io::empty(), 0)
+            .await
+            .unwrap()
+            .is_empty());
+    }
+}
diff --git a/tvix/nix-compat/src/wire/bytes/writer.rs b/tvix/nix-compat/src/wire/bytes/writer.rs
index 347934b3dc..f5632771e9 100644
--- a/tvix/nix-compat/src/wire/bytes/writer.rs
+++ b/tvix/nix-compat/src/wire/bytes/writer.rs
@@ -3,7 +3,7 @@ use std::task::{ready, Poll};
 
 use tokio::io::AsyncWrite;
 
-use super::{padding_len, BytesPacketPosition, EMPTY_BYTES, LEN_SIZE};
+use super::{padding_len, EMPTY_BYTES, LEN_SIZE};
 
 pin_project! {
     /// Writes a "bytes wire packet" to the underlying writer.
@@ -41,6 +41,22 @@ pin_project! {
     }
 }
 
+/// Models the position inside a "bytes wire packet" that the writer is in.
+/// It can be in three different stages, inside size, payload or padding fields.
+/// The number tracks the number of bytes written inside the specific field.
+/// There shall be no ambiguous states, at the end of a stage we immediately
+/// move to the beginning of the next one:
+/// - Size(LEN_SIZE) must be expressed as Payload(0)
+/// - Payload(self.payload_len) must be expressed as Padding(0)
+///
+/// Padding(padding_len) means we're at the end of the bytes wire packet.
+#[derive(Clone, Debug, PartialEq, Eq)]
+enum BytesPacketPosition {
+    Size(usize),
+    Payload(u64),
+    Padding(usize),
+}
+
 impl<W> BytesWriter<W>
 where
     W: AsyncWrite,
diff --git a/tvix/nix-compat/src/wire/mod.rs b/tvix/nix-compat/src/wire/mod.rs
index 65c053d58e..a197e3a1f4 100644
--- a/tvix/nix-compat/src/wire/mod.rs
+++ b/tvix/nix-compat/src/wire/mod.rs
@@ -3,6 +3,3 @@
 
 mod bytes;
 pub use bytes::*;
-
-mod primitive;
-pub use primitive::*;
diff --git a/tvix/nix-compat/src/wire/primitive.rs b/tvix/nix-compat/src/wire/primitive.rs
deleted file mode 100644
index ee0f5fc427..0000000000
--- a/tvix/nix-compat/src/wire/primitive.rs
+++ /dev/null
@@ -1,74 +0,0 @@
-// SPDX-FileCopyrightText: 2023 embr <git@liclac.eu>
-//
-// SPDX-License-Identifier: EUPL-1.2
-
-use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
-
-#[allow(dead_code)]
-/// Read a u64 from the AsyncRead (little endian).
-pub async fn read_u64<R: AsyncReadExt + Unpin>(r: &mut R) -> std::io::Result<u64> {
-    r.read_u64_le().await
-}
-
-/// Write a u64 to the AsyncWrite (little endian).
-pub async fn write_u64<W: AsyncWrite + Unpin>(w: &mut W, v: u64) -> std::io::Result<()> {
-    w.write_u64_le(v).await
-}
-
-#[allow(dead_code)]
-/// Read a boolean from the AsyncRead, encoded as u64 (>0 is true).
-pub async fn read_bool<R: AsyncRead + Unpin>(r: &mut R) -> std::io::Result<bool> {
-    Ok(read_u64(r).await? > 0)
-}
-
-#[allow(dead_code)]
-/// Write a boolean to the AsyncWrite, encoded as u64 (>0 is true).
-pub async fn write_bool<W: AsyncWrite + Unpin>(w: &mut W, v: bool) -> std::io::Result<()> {
-    write_u64(w, if v { 1u64 } else { 0u64 }).await
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use tokio_test::io::Builder;
-
-    // Integers.
-    #[tokio::test]
-    async fn test_read_u64() {
-        let mut mock = Builder::new().read(&1234567890u64.to_le_bytes()).build();
-        assert_eq!(1234567890u64, read_u64(&mut mock).await.unwrap());
-    }
-    #[tokio::test]
-    async fn test_write_u64() {
-        let mut mock = Builder::new().write(&1234567890u64.to_le_bytes()).build();
-        write_u64(&mut mock, 1234567890).await.unwrap();
-    }
-
-    // Booleans.
-    #[tokio::test]
-    async fn test_read_bool_0() {
-        let mut mock = Builder::new().read(&0u64.to_le_bytes()).build();
-        assert!(!read_bool(&mut mock).await.unwrap());
-    }
-    #[tokio::test]
-    async fn test_read_bool_1() {
-        let mut mock = Builder::new().read(&1u64.to_le_bytes()).build();
-        assert!(read_bool(&mut mock).await.unwrap());
-    }
-    #[tokio::test]
-    async fn test_read_bool_2() {
-        let mut mock = Builder::new().read(&2u64.to_le_bytes()).build();
-        assert!(read_bool(&mut mock).await.unwrap());
-    }
-
-    #[tokio::test]
-    async fn test_write_bool_false() {
-        let mut mock = Builder::new().write(&0u64.to_le_bytes()).build();
-        write_bool(&mut mock, false).await.unwrap();
-    }
-    #[tokio::test]
-    async fn test_write_bool_true() {
-        let mut mock = Builder::new().write(&1u64.to_le_bytes()).build();
-        write_bool(&mut mock, true).await.unwrap();
-    }
-}
diff --git a/tvix/shell.nix b/tvix/shell.nix
index 422f1c8dd4..f0d8ab1657 100644
--- a/tvix/shell.nix
+++ b/tvix/shell.nix
@@ -29,12 +29,10 @@ pkgs.mkShell {
     pkgs.cargo
     pkgs.cargo-machete
     pkgs.cargo-expand
-    pkgs.cbtemulator
     pkgs.clippy
     pkgs.evans
     pkgs.fuse
     pkgs.go
-    pkgs.google-cloud-bigtable-tool
     pkgs.grpcurl
     pkgs.hyperfine
     pkgs.mdbook
diff --git a/tvix/store/Cargo.toml b/tvix/store/Cargo.toml
index b549eeb7f5..4c0e9be49a 100644
--- a/tvix/store/Cargo.toml
+++ b/tvix/store/Cargo.toml
@@ -74,3 +74,7 @@ fuse = ["tvix-castore/fuse"]
 otlp = ["dep:opentelemetry", "dep:opentelemetry-otlp", "dep:opentelemetry_sdk"]
 tonic-reflection = ["dep:tonic-reflection", "tvix-castore/tonic-reflection"]
 virtiofs = ["tvix-castore/virtiofs"]
+# Whether to run the integration tests.
+# Requires the following packages in $PATH:
+# cbtemulator, google-cloud-bigtable-tool
+integration = []
diff --git a/tvix/store/default.nix b/tvix/store/default.nix
index f30923ac27..ad47994f24 100644
--- a/tvix/store/default.nix
+++ b/tvix/store/default.nix
@@ -26,7 +26,6 @@ in
   runTests = true;
   testPreRun = ''
     export SSL_CERT_FILE=${pkgs.cacert}/etc/ssl/certs/ca-bundle.crt
-    export PATH="$PATH:${pkgs.lib.makeBinPath [pkgs.cbtemulator pkgs.google-cloud-bigtable-tool]}"
   '';
 
   # enable some optional features.
@@ -34,7 +33,20 @@ in
     # virtiofs feature currently fails to build on Darwin.
     ++ pkgs.lib.optional pkgs.stdenv.isLinux "virtiofs";
 }).overrideAttrs (_: {
+  meta.ci.targets = [ "integration-tests" ];
   meta.ci.extraSteps = {
     import-docs = (mkImportCheck "tvix/store/docs" ./docs);
   };
+  passthru.integration-tests = depot.tvix.crates.workspaceMembers.tvix-store.build.override {
+    runTests = true;
+    testPreRun = ''
+      export SSL_CERT_FILE=${pkgs.cacert}/etc/ssl/certs/ca-bundle.crt
+      export PATH="$PATH:${pkgs.lib.makeBinPath [pkgs.cbtemulator pkgs.google-cloud-bigtable-tool]}"
+    '';
+
+    # enable some optional features.
+    features = [ "default" "cloud" "integration" ]
+      # virtiofs feature currently fails to build on Darwin.
+      ++ pkgs.lib.optional pkgs.stdenv.isLinux "virtiofs";
+  };
 })
diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs
index 8262a9e98c..fa30501e78 100644
--- a/tvix/store/src/bin/tvix-store.rs
+++ b/tvix/store/src/bin/tvix-store.rs
@@ -56,10 +56,6 @@ use tvix_store::proto::FILE_DESCRIPTOR_SET;
 #[derive(Parser)]
 #[command(author, version, about, long_about = None)]
 struct Cli {
-    /// Whether to log in JSON
-    #[arg(long)]
-    json: bool,
-
     /// Whether to configure OTLP. Set --otlp=false to disable.
     #[arg(long, default_missing_value = "true", default_value = "true", num_args(0..=1), require_equals(true), action(clap::ArgAction::Set))]
     otlp: bool,
@@ -82,7 +78,11 @@ enum Commands {
         #[arg(long, short = 'l')]
         listen_address: Option<String>,
 
-        #[arg(long, env, default_value = "sled:///var/lib/tvix-store/blobs.sled")]
+        #[arg(
+            long,
+            env,
+            default_value = "objectstore+file:///var/lib/tvix-store/blobs.object_store"
+        )]
         blob_service_addr: String,
 
         #[arg(
@@ -218,33 +218,17 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
     let level = cli.log_level.unwrap_or(Level::INFO);
 
     // Set up the tracing subscriber.
-    let subscriber = tracing_subscriber::registry()
-        .with(
-            cli.json.then_some(
-                tracing_subscriber::fmt::Layer::new()
-                    .with_writer(std::io::stderr)
-                    .json()
-                    .with_filter(
-                        EnvFilter::builder()
-                            .with_default_directive(level.into())
-                            .from_env()
-                            .expect("invalid RUST_LOG"),
-                    ),
-            ),
-        )
-        .with(
-            (!cli.json).then_some(
-                tracing_subscriber::fmt::Layer::new()
-                    .with_writer(std::io::stderr)
-                    .pretty()
-                    .with_filter(
-                        EnvFilter::builder()
-                            .with_default_directive(level.into())
-                            .from_env()
-                            .expect("invalid RUST_LOG"),
-                    ),
+    let subscriber = tracing_subscriber::registry().with(
+        tracing_subscriber::fmt::Layer::new()
+            .with_writer(std::io::stderr)
+            .compact()
+            .with_filter(
+                EnvFilter::builder()
+                    .with_default_directive(level.into())
+                    .from_env()
+                    .expect("invalid RUST_LOG"),
             ),
-        );
+    );
 
     // Add the otlp layer (when otlp is enabled, and it's not disabled in the CLI)
     // then init the registry.
diff --git a/tvix/store/src/import.rs b/tvix/store/src/import.rs
index 7b6aeb824e..2331fd77ea 100644
--- a/tvix/store/src/import.rs
+++ b/tvix/store/src/import.rs
@@ -114,10 +114,12 @@ pub async fn import_path_as_nar_ca<BS, DS, PS, P>(
 where
     P: AsRef<Path> + std::fmt::Debug,
     BS: BlobService + Clone,
-    DS: AsRef<dyn DirectoryService>,
+    DS: DirectoryService,
     PS: AsRef<dyn PathInfoService>,
 {
-    let root_node = ingest_path(blob_service, directory_service, path.as_ref()).await?;
+    let root_node = ingest_path(blob_service, directory_service, path.as_ref())
+        .await
+        .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
 
     // Ask the PathInfoService for the NAR size and sha256
     let (nar_size, nar_sha256) = path_info_service.as_ref().calculate_nar(&root_node).await?;
diff --git a/tvix/store/src/nar/import.rs b/tvix/store/src/nar/import.rs
index 6f4dcdea5d..70f8137e89 100644
--- a/tvix/store/src/nar/import.rs
+++ b/tvix/store/src/nar/import.rs
@@ -24,19 +24,19 @@ pub fn read_nar<R, BS, DS>(
 ) -> io::Result<castorepb::node::Node>
 where
     R: BufRead + Send,
-    BS: AsRef<dyn BlobService>,
-    DS: AsRef<dyn DirectoryService>,
+    BS: BlobService + Clone,
+    DS: DirectoryService,
 {
     let handle = tokio::runtime::Handle::current();
 
-    let directory_putter = directory_service.as_ref().put_multiple_start();
+    let directory_putter = directory_service.put_multiple_start();
 
     let node = nix_compat::nar::reader::open(r)?;
-    let (root_node, mut directory_putter, _) = process_node(
+    let (root_node, mut directory_putter) = process_node(
         handle.clone(),
         "".into(), // this is the root node, it has an empty name
         node,
-        &blob_service,
+        blob_service,
         directory_putter,
     )?;
 
@@ -80,9 +80,9 @@ fn process_node<BS>(
     node: nar::reader::Node,
     blob_service: BS,
     directory_putter: Box<dyn DirectoryPutter>,
-) -> io::Result<(castorepb::node::Node, Box<dyn DirectoryPutter>, BS)>
+) -> io::Result<(castorepb::node::Node, Box<dyn DirectoryPutter>)>
 where
-    BS: AsRef<dyn BlobService>,
+    BS: BlobService + Clone,
 {
     Ok(match node {
         nar::reader::Node::Symlink { target } => (
@@ -91,7 +91,6 @@ where
                 target: target.into(),
             }),
             directory_putter,
-            blob_service,
         ),
         nar::reader::Node::File { executable, reader } => (
             castorepb::node::Node::File(process_file_reader(
@@ -99,19 +98,17 @@ where
                 name,
                 reader,
                 executable,
-                &blob_service,
+                blob_service,
             )?),
             directory_putter,
-            blob_service,
         ),
         nar::reader::Node::Directory(dir_reader) => {
-            let (directory_node, directory_putter, blob_service_back) =
+            let (directory_node, directory_putter) =
                 process_dir_reader(handle, name, dir_reader, blob_service, directory_putter)?;
 
             (
                 castorepb::node::Node::Directory(directory_node),
                 directory_putter,
-                blob_service_back,
             )
         }
     })
@@ -127,13 +124,13 @@ fn process_file_reader<BS>(
     blob_service: BS,
 ) -> io::Result<castorepb::FileNode>
 where
-    BS: AsRef<dyn BlobService>,
+    BS: BlobService,
 {
     // store the length. If we read any other length, reading will fail.
     let expected_len = file_reader.len();
 
     // prepare writing a new blob.
-    let blob_writer = handle.block_on(async { blob_service.as_ref().open_write().await });
+    let blob_writer = handle.block_on(async { blob_service.open_write().await });
 
     // write the blob.
     let mut blob_writer = {
@@ -168,24 +165,22 @@ fn process_dir_reader<BS>(
     mut dir_reader: nar::reader::DirReader,
     blob_service: BS,
     directory_putter: Box<dyn DirectoryPutter>,
-) -> io::Result<(castorepb::DirectoryNode, Box<dyn DirectoryPutter>, BS)>
+) -> io::Result<(castorepb::DirectoryNode, Box<dyn DirectoryPutter>)>
 where
-    BS: AsRef<dyn BlobService>,
+    BS: BlobService + Clone,
 {
     let mut directory = castorepb::Directory::default();
 
     let mut directory_putter = directory_putter;
-    let mut blob_service = blob_service;
     while let Some(entry) = dir_reader.next()? {
-        let (node, directory_putter_back, blob_service_back) = process_node(
+        let (node, directory_putter_back) = process_node(
             handle.clone(),
             entry.name.into(),
             entry.node,
-            blob_service,
+            blob_service.clone(),
             directory_putter,
         )?;
 
-        blob_service = blob_service_back;
         directory_putter = directory_putter_back;
 
         match node {
@@ -213,7 +208,6 @@ where
             size: directory_size,
         },
         directory_putter,
-        blob_service,
     ))
 }
 
diff --git a/tvix/store/src/pathinfoservice/bigtable.rs b/tvix/store/src/pathinfoservice/bigtable.rs
index f49ef475eb..6fb52abbfd 100644
--- a/tvix/store/src/pathinfoservice/bigtable.rs
+++ b/tvix/store/src/pathinfoservice/bigtable.rs
@@ -116,7 +116,7 @@ impl BigtablePathInfoService {
             .stdout(Stdio::piped())
             .kill_on_drop(true)
             .spawn()
-            .expect("failed to spwan emulator");
+            .expect("failed to spawn emulator");
 
         Retry::spawn(
             ExponentialBackoff::from_millis(20)
diff --git a/tvix/store/src/pathinfoservice/from_addr.rs b/tvix/store/src/pathinfoservice/from_addr.rs
index 1ff822ad35..f22884ca47 100644
--- a/tvix/store/src/pathinfoservice/from_addr.rs
+++ b/tvix/store/src/pathinfoservice/from_addr.rs
@@ -208,7 +208,7 @@ mod tests {
     #[case::grpc_invalid_host_and_path("grpc+http://localhost/some-path", false)]
     /// A valid example for Bigtable.
     #[cfg_attr(
-        feature = "cloud",
+        all(feature = "cloud", feature = "integration"),
         case::bigtable_valid(
             "bigtable://instance-1?project_id=project-1&table_name=table-1&family_name=cf1",
             true
@@ -216,7 +216,7 @@ mod tests {
     )]
     /// An invalid example for Bigtable, missing fields
     #[cfg_attr(
-        feature = "cloud",
+        all(feature = "cloud", feature = "integration"),
         case::bigtable_invalid_missing_fields("bigtable://instance-1", false)
     )]
     #[tokio::test]
diff --git a/tvix/store/src/pathinfoservice/sled.rs b/tvix/store/src/pathinfoservice/sled.rs
index 7b6d7fd7ab..0255c031e2 100644
--- a/tvix/store/src/pathinfoservice/sled.rs
+++ b/tvix/store/src/pathinfoservice/sled.rs
@@ -1,11 +1,13 @@
 use super::PathInfoService;
 use crate::nar::calculate_size_and_sha256;
 use crate::proto::PathInfo;
+use data_encoding::BASE64;
 use futures::stream::iter;
 use futures::stream::BoxStream;
 use prost::Message;
 use std::path::Path;
 use tonic::async_trait;
+use tracing::instrument;
 use tracing::warn;
 use tvix_castore::proto as castorepb;
 use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService, Error};
@@ -57,53 +59,46 @@ where
     BS: AsRef<dyn BlobService> + Send + Sync,
     DS: AsRef<dyn DirectoryService> + Send + Sync,
 {
+    #[instrument(level = "trace", skip_all, fields(path_info.digest = BASE64.encode(&digest)))]
     async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
-        match self.db.get(digest) {
-            Ok(None) => Ok(None),
-            Ok(Some(data)) => match PathInfo::decode(&*data) {
-                Ok(path_info) => Ok(Some(path_info)),
-                Err(e) => {
+        match self.db.get(digest).map_err(|e| {
+            warn!("failed to retrieve PathInfo: {}", e);
+            Error::StorageError(format!("failed to retrieve PathInfo: {}", e))
+        })? {
+            None => Ok(None),
+            Some(data) => {
+                let path_info = PathInfo::decode(&*data).map_err(|e| {
                     warn!("failed to decode stored PathInfo: {}", e);
-                    Err(Error::StorageError(format!(
-                        "failed to decode stored PathInfo: {}",
-                        e
-                    )))
-                }
-            },
-            Err(e) => {
-                warn!("failed to retrieve PathInfo: {}", e);
-                Err(Error::StorageError(format!(
-                    "failed to retrieve PathInfo: {}",
-                    e
-                )))
+                    Error::StorageError(format!("failed to decode stored PathInfo: {}", e))
+                })?;
+                Ok(Some(path_info))
             }
         }
     }
 
+    #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node))]
     async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> {
         // Call validate on the received PathInfo message.
-        match path_info.validate() {
-            Err(e) => Err(Error::InvalidRequest(format!(
-                "failed to validate PathInfo: {}",
-                e
-            ))),
-            // In case the PathInfo is valid, and we were able to extract a NixPath, store it in the database.
-            // This overwrites existing PathInfo objects.
-            Ok(nix_path) => match self
-                .db
-                .insert(*nix_path.digest(), path_info.encode_to_vec())
-            {
-                Ok(_) => Ok(path_info),
-                Err(e) => {
-                    warn!("failed to insert PathInfo: {}", e);
-                    Err(Error::StorageError(format! {
-                        "failed to insert PathInfo: {}", e
-                    }))
-                }
-            },
-        }
+        let store_path = path_info
+            .validate()
+            .map_err(|e| Error::InvalidRequest(format!("failed to validate PathInfo: {}", e)))?;
+
+        // In case the PathInfo is valid, we were able to parse a StorePath.
+        // Store it in the database, keyed by its digest.
+        // This overwrites existing PathInfo objects.
+        self.db
+            .insert(store_path.digest(), path_info.encode_to_vec())
+            .map_err(|e| {
+                warn!("failed to insert PathInfo: {}", e);
+                Error::StorageError(format! {
+                    "failed to insert PathInfo: {}", e
+                })
+            })?;
+
+        Ok(path_info)
     }
 
+    #[instrument(level = "trace", skip_all, fields(root_node = ?root_node))]
     async fn calculate_nar(
         &self,
         root_node: &castorepb::node::Node,
@@ -114,27 +109,17 @@ where
     }
 
     fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
-        Box::pin(iter(self.db.iter().values().map(|v| match v {
-            Ok(data) => {
-                // we retrieved some bytes
-                match PathInfo::decode(&*data) {
-                    Ok(path_info) => Ok(path_info),
-                    Err(e) => {
-                        warn!("failed to decode stored PathInfo: {}", e);
-                        Err(Error::StorageError(format!(
-                            "failed to decode stored PathInfo: {}",
-                            e
-                        )))
-                    }
-                }
-            }
-            Err(e) => {
+        Box::pin(iter(self.db.iter().values().map(|v| {
+            let data = v.map_err(|e| {
                 warn!("failed to retrieve PathInfo: {}", e);
-                Err(Error::StorageError(format!(
-                    "failed to retrieve PathInfo: {}",
-                    e
-                )))
-            }
+                Error::StorageError(format!("failed to retrieve PathInfo: {}", e))
+            })?;
+
+            let path_info = PathInfo::decode(&*data).map_err(|e| {
+                warn!("failed to decode stored PathInfo: {}", e);
+                Error::StorageError(format!("failed to decode stored PathInfo: {}", e))
+            })?;
+            Ok(path_info)
         })))
     }
 }
diff --git a/tvix/store/src/pathinfoservice/tests/mod.rs b/tvix/store/src/pathinfoservice/tests/mod.rs
index c9b9a06377..9719371592 100644
--- a/tvix/store/src/pathinfoservice/tests/mod.rs
+++ b/tvix/store/src/pathinfoservice/tests/mod.rs
@@ -51,7 +51,7 @@ pub async fn make_path_info_service(uri: &str) -> BSDSPS {
 #[case::memory(make_path_info_service("memory://").await)]
 #[case::grpc(make_grpc_path_info_service_client().await)]
 #[case::sled(make_path_info_service("sled://").await)]
-#[cfg_attr(feature = "cloud", case::bigtable(make_path_info_service("bigtable://instance-1?project_id=project-1&table_name=table-1&family_name=cf1").await))]
+#[cfg_attr(all(feature = "cloud",feature="integration"), case::bigtable(make_path_info_service("bigtable://instance-1?project_id=project-1&table_name=table-1&family_name=cf1").await))]
 pub fn path_info_services(
     #[case] services: (
         impl BlobService,