about summary refs log tree commit diff
path: root/tvix/nix-compat/src
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/nix-compat/src')
-rw-r--r--tvix/nix-compat/src/derivation/mod.rs45
-rw-r--r--tvix/nix-compat/src/derivation/tests/mod.rs32
-rw-r--r--tvix/nix-compat/src/nar/mod.rs2
-rw-r--r--tvix/nix-compat/src/nar/reader/async/mod.rs173
-rw-r--r--tvix/nix-compat/src/nar/reader/async/read.rs69
-rw-r--r--tvix/nix-compat/src/nar/reader/async/test.rs310
-rw-r--r--tvix/nix-compat/src/nar/reader/mod.rs268
-rw-r--r--tvix/nix-compat/src/nar/reader/read.rs32
-rw-r--r--tvix/nix-compat/src/nar/reader/test.rs272
-rw-r--r--tvix/nix-compat/src/nar/wire/mod.rs17
-rw-r--r--tvix/nix-compat/src/nar/wire/tag.rs1
-rw-r--r--tvix/nix-compat/src/nix_daemon/worker_protocol.rs73
-rw-r--r--tvix/nix-compat/src/store_path/mod.rs31
-rw-r--r--tvix/nix-compat/src/wire/bytes/mod.rs169
-rw-r--r--tvix/nix-compat/src/wire/bytes/reader.rs464
-rw-r--r--tvix/nix-compat/src/wire/bytes/reader/mod.rs684
-rw-r--r--tvix/nix-compat/src/wire/bytes/reader/trailer.rs197
-rw-r--r--tvix/nix-compat/src/wire/bytes/writer.rs18
-rw-r--r--tvix/nix-compat/src/wire/mod.rs3
-rw-r--r--tvix/nix-compat/src/wire/primitive.rs74
20 files changed, 2141 insertions, 793 deletions
diff --git a/tvix/nix-compat/src/derivation/mod.rs b/tvix/nix-compat/src/derivation/mod.rs
index 07da127ed0..6e12e3ea86 100644
--- a/tvix/nix-compat/src/derivation/mod.rs
+++ b/tvix/nix-compat/src/derivation/mod.rs
@@ -188,11 +188,12 @@ impl Derivation {
     ///    `fixed:out:${algo}:${digest}:${fodPath}` string is hashed instead of
     ///    the A-Term.
     ///
-    /// If the derivation is not a fixed derivation, it's up to the caller of
-    /// this function to provide a lookup function to lookup these calculation
-    /// results of parent derivations at `fn_get_derivation_or_fod_hash` (by
-    /// drv path).
-    pub fn derivation_or_fod_hash<F>(&self, fn_get_derivation_or_fod_hash: F) -> [u8; 32]
+    /// It's up to the caller of this function to provide a (infallible) lookup
+    /// function to query [hash_derivation_modulo] of direct input derivations,
+    /// by their [StorePathRef].
+    /// It will only be called in case the derivation is not a fixed-output
+    /// derivation.
+    pub fn hash_derivation_modulo<F>(&self, fn_lookup_hash_derivation_modulo: F) -> [u8; 32]
     where
         F: Fn(&StorePathRef) -> [u8; 32],
     {
@@ -200,16 +201,16 @@ impl Derivation {
         // Non-Fixed-output derivations return the sha256 digest of the ATerm
         // notation, but with all input_derivation paths replaced by a recursive
         // call to this function.
-        // We use fn_get_derivation_or_fod_hash here, so callers can precompute this.
+        // We call [fn_lookup_hash_derivation_modulo] rather than recursing
+        // ourselves, so callers can precompute this.
         self.fod_digest().unwrap_or({
-            // For each input_derivation, look up the
-            // derivation_or_fod_hash, and replace the derivation path with
-            // it's HEXLOWER digest.
+            // For each input_derivation, look up the hash derivation modulo,
+            // and replace the derivation path in the aterm with it's HEXLOWER digest.
             let aterm_bytes = self.to_aterm_bytes_with_replacements(&BTreeMap::from_iter(
                 self.input_derivations
                     .iter()
                     .map(|(drv_path, output_names)| {
-                        let hash = fn_get_derivation_or_fod_hash(&drv_path.into());
+                        let hash = fn_lookup_hash_derivation_modulo(&drv_path.into());
 
                         (hash, output_names.to_owned())
                     }),
@@ -226,20 +227,22 @@ impl Derivation {
     /// and self.environment[$outputName] needs to be an empty string.
     ///
     /// Output path calculation requires knowledge of the
-    /// derivation_or_fod_hash [NixHash], which (in case of non-fixed-output
-    /// derivations) also requires knowledge of other hash_derivation_modulo
-    /// [NixHash]es.
+    /// [hash_derivation_modulo], which (in case of non-fixed-output
+    /// derivations) also requires knowledge of the [hash_derivation_modulo] of
+    /// input derivations (recursively).
     ///
-    /// We solve this by asking the caller of this function to provide the
-    /// hash_derivation_modulo of the current Derivation.
+    /// To avoid recursing and doing unnecessary calculation, we simply
+    /// ask the caller of this function to provide the result of the
+    /// [hash_derivation_modulo] call of the current [Derivation],
+    /// and leave it up to them to calculate it when needed.
     ///
-    /// On completion, self.environment[$outputName] and
-    /// self.outputs[$outputName].path are set to the calculated output path for all
+    /// On completion, `self.environment[$outputName]` and
+    /// `self.outputs[$outputName].path` are set to the calculated output path for all
     /// outputs.
     pub fn calculate_output_paths(
         &mut self,
         name: &str,
-        derivation_or_fod_hash: &[u8; 32],
+        hash_derivation_modulo: &[u8; 32],
     ) -> Result<(), DerivationError> {
         // The fingerprint and hash differs per output
         for (output_name, output) in self.outputs.iter_mut() {
@@ -250,14 +253,14 @@ impl Derivation {
 
             let path_name = output_path_name(name, output_name);
 
-            // For fixed output derivation we use the per-output info, otherwise we use the
-            // derivation hash.
+            // For fixed output derivation we use [build_ca_path], otherwise we
+            // use [build_output_path] with [hash_derivation_modulo].
             let abs_store_path = if let Some(ref hwm) = output.ca_hash {
                 build_ca_path(&path_name, hwm, Vec::<String>::new(), false).map_err(|e| {
                     DerivationError::InvalidOutputDerivationPath(output_name.to_string(), e)
                 })?
             } else {
-                build_output_path(derivation_or_fod_hash, output_name, &path_name).map_err(|e| {
+                build_output_path(hash_derivation_modulo, output_name, &path_name).map_err(|e| {
                     DerivationError::InvalidOutputDerivationPath(
                         output_name.to_string(),
                         store_path::BuildStorePathError::InvalidStorePath(e),
diff --git a/tvix/nix-compat/src/derivation/tests/mod.rs b/tvix/nix-compat/src/derivation/tests/mod.rs
index 63a65356bd..48d4e8926a 100644
--- a/tvix/nix-compat/src/derivation/tests/mod.rs
+++ b/tvix/nix-compat/src/derivation/tests/mod.rs
@@ -164,7 +164,7 @@ fn derivation_path(#[case] name: &str, #[case] expected_path: &str) {
 
 /// This trims all output paths from a Derivation struct,
 /// by setting outputs[$outputName].path and environment[$outputName] to the empty string.
-fn derivation_with_trimmed_output_paths(derivation: &Derivation) -> Derivation {
+fn derivation_without_output_paths(derivation: &Derivation) -> Derivation {
     let mut trimmed_env = derivation.environment.clone();
     let mut trimmed_outputs = derivation.outputs.clone();
 
@@ -191,13 +191,13 @@ fn derivation_with_trimmed_output_paths(derivation: &Derivation) -> Derivation {
 #[rstest]
 #[case::fixed_sha256("0hm2f1psjpcwg8fijsmr4wwxrx59s092-bar.drv", hex!("724f3e3634fce4cbbbd3483287b8798588e80280660b9a63fd13a1bc90485b33"))]
 #[case::fixed_sha1("ss2p4wmxijn652haqyd7dckxwl4c7hxx-bar.drv", hex!("c79aebd0ce3269393d4a1fde2cbd1d975d879b40f0bf40a48f550edc107fd5df"))]
-fn derivation_or_fod_hash(#[case] drv_path: &str, #[case] expected_digest: [u8; 32]) {
+fn hash_derivation_modulo_fixed(#[case] drv_path: &str, #[case] expected_digest: [u8; 32]) {
     // read in the fixture
     let json_bytes =
         fs::read(format!("{}/ok/{}.json", RESOURCES_PATHS, drv_path)).expect("unable to read JSON");
     let drv: Derivation = serde_json::from_slice(&json_bytes).expect("must deserialize");
 
-    let actual = drv.derivation_or_fod_hash(|_| panic!("must not be called"));
+    let actual = drv.hash_derivation_modulo(|_| panic!("must not be called"));
     assert_eq!(expected_digest, actual);
 }
 
@@ -224,13 +224,13 @@ fn output_paths(#[case] name: &str, #[case] drv_path_str: &str) {
     )
     .expect("must succeed");
 
-    // create a version with trimmed output paths, simulating we constructed
-    // the struct.
-    let mut derivation = derivation_with_trimmed_output_paths(&expected_derivation);
+    // create a version without output paths, simulating we constructed the
+    // struct.
+    let mut derivation = derivation_without_output_paths(&expected_derivation);
 
-    // calculate the derivation_or_fod_hash of derivation
+    // calculate the hash_derivation_modulo of Derivation
     // We don't expect the lookup function to be called for most derivations.
-    let calculated_derivation_or_fod_hash = derivation.derivation_or_fod_hash(|parent_drv_path| {
+    let actual_hash_derivation_modulo = derivation.hash_derivation_modulo(|parent_drv_path| {
         // 4wvvbi4jwn0prsdxb7vs673qa5h9gr7x-foo.drv may lookup /nix/store/0hm2f1psjpcwg8fijsmr4wwxrx59s092-bar.drv
         // ch49594n9avinrf8ip0aslidkc4lxkqv-foo.drv may lookup /nix/store/ss2p4wmxijn652haqyd7dckxwl4c7hxx-bar.drv
         if name == "foo"
@@ -255,9 +255,9 @@ fn output_paths(#[case] name: &str, #[case] drv_path_str: &str) {
 
             let drv: Derivation = serde_json::from_slice(&json_bytes).expect("must deserialize");
 
-            // calculate derivation_or_fod_hash for each parent.
+            // calculate hash_derivation_modulo for each parent.
             // This may not trigger subsequent requests, as both parents are FOD.
-            drv.derivation_or_fod_hash(|_| panic!("must not lookup"))
+            drv.hash_derivation_modulo(|_| panic!("must not lookup"))
         } else {
             // we only expect this to be called in the "foo" testcase, for the "bar derivations"
             panic!("may only be called for foo testcase on bar derivations");
@@ -265,7 +265,7 @@ fn output_paths(#[case] name: &str, #[case] drv_path_str: &str) {
     });
 
     derivation
-        .calculate_output_paths(name, &calculated_derivation_or_fod_hash)
+        .calculate_output_paths(name, &actual_hash_derivation_modulo)
         .unwrap();
 
     // The derivation should now look like it was before
@@ -343,7 +343,7 @@ fn output_path_construction() {
     // calculate bar output paths
     let bar_calc_result = bar_drv.calculate_output_paths(
         "bar",
-        &bar_drv.derivation_or_fod_hash(|_| panic!("is FOD, should not lookup")),
+        &bar_drv.hash_derivation_modulo(|_| panic!("is FOD, should not lookup")),
     );
     assert!(bar_calc_result.is_ok());
 
@@ -360,8 +360,8 @@ fn output_path_construction() {
     // now construct foo, which requires bar_drv
     // Note how we refer to the output path, drv name and replacement_str (with calculated output paths) of bar.
     let bar_output_path = &bar_drv.outputs.get("out").expect("must exist").path;
-    let bar_drv_derivation_or_fod_hash =
-        bar_drv.derivation_or_fod_hash(|_| panic!("is FOD, should not lookup"));
+    let bar_drv_hash_derivation_modulo =
+        bar_drv.hash_derivation_modulo(|_| panic!("is FOD, should not lookup"));
 
     let bar_drv_path = bar_drv
         .calculate_derivation_path("bar")
@@ -408,11 +408,11 @@ fn output_path_construction() {
     // calculate foo output paths
     let foo_calc_result = foo_drv.calculate_output_paths(
         "foo",
-        &foo_drv.derivation_or_fod_hash(|drv_path| {
+        &foo_drv.hash_derivation_modulo(|drv_path| {
             if drv_path.to_string() != "0hm2f1psjpcwg8fijsmr4wwxrx59s092-bar.drv" {
                 panic!("lookup called with unexpected drv_path: {}", drv_path);
             }
-            bar_drv_derivation_or_fod_hash
+            bar_drv_hash_derivation_modulo
         }),
     );
     assert!(foo_calc_result.is_ok());
diff --git a/tvix/nix-compat/src/nar/mod.rs b/tvix/nix-compat/src/nar/mod.rs
index 058977f4fc..c678d26ffb 100644
--- a/tvix/nix-compat/src/nar/mod.rs
+++ b/tvix/nix-compat/src/nar/mod.rs
@@ -1,4 +1,4 @@
-mod wire;
+pub(crate) mod wire;
 
 pub mod reader;
 pub mod writer;
diff --git a/tvix/nix-compat/src/nar/reader/async/mod.rs b/tvix/nix-compat/src/nar/reader/async/mod.rs
new file mode 100644
index 0000000000..0808fba38c
--- /dev/null
+++ b/tvix/nix-compat/src/nar/reader/async/mod.rs
@@ -0,0 +1,173 @@
+use std::{
+    mem::MaybeUninit,
+    pin::Pin,
+    task::{self, Poll},
+};
+
+use tokio::io::{self, AsyncBufRead, AsyncRead, ErrorKind::InvalidData};
+
+// Required reading for understanding this module.
+use crate::{
+    nar::{self, wire::PadPar},
+    wire::{self, BytesReader},
+};
+
+mod read;
+#[cfg(test)]
+mod test;
+
+pub type Reader<'a> = dyn AsyncBufRead + Unpin + Send + 'a;
+
+/// Start reading a NAR file from `reader`.
+pub async fn open<'a, 'r>(reader: &'a mut Reader<'r>) -> io::Result<Node<'a, 'r>> {
+    read::token(reader, &nar::wire::TOK_NAR).await?;
+    Node::new(reader).await
+}
+
+pub enum Node<'a, 'r: 'a> {
+    Symlink {
+        target: Vec<u8>,
+    },
+    File {
+        executable: bool,
+        reader: FileReader<'a, 'r>,
+    },
+    Directory(DirReader<'a, 'r>),
+}
+
+impl<'a, 'r: 'a> Node<'a, 'r> {
+    /// Start reading a [Node], matching the next [wire::Node].
+    ///
+    /// Reading the terminating [wire::TOK_PAR] is done immediately for [Node::Symlink],
+    /// but is otherwise left to [DirReader] or [BytesReader].
+    async fn new(reader: &'a mut Reader<'r>) -> io::Result<Self> {
+        Ok(match read::tag(reader).await? {
+            nar::wire::Node::Sym => {
+                let target = wire::read_bytes(reader, 1..=nar::wire::MAX_TARGET_LEN).await?;
+
+                if target.contains(&0) {
+                    return Err(InvalidData.into());
+                }
+
+                read::token(reader, &nar::wire::TOK_PAR).await?;
+
+                Node::Symlink { target }
+            }
+            tag @ (nar::wire::Node::Reg | nar::wire::Node::Exe) => Node::File {
+                executable: tag == nar::wire::Node::Exe,
+                reader: FileReader {
+                    inner: BytesReader::new_internal(reader, ..).await?,
+                },
+            },
+            nar::wire::Node::Dir => Node::Directory(DirReader::new(reader)),
+        })
+    }
+}
+
+/// File contents, readable through the [AsyncRead] trait.
+///
+/// It comes with some caveats:
+///  * You must always read the entire file, unless you intend to abandon the entire archive reader.
+///  * You must abandon the entire archive reader upon the first error.
+///
+/// It's fine to read exactly `reader.len()` bytes without ever seeing an explicit EOF.
+pub struct FileReader<'a, 'r> {
+    inner: BytesReader<&'a mut Reader<'r>, PadPar>,
+}
+
+impl<'a, 'r> FileReader<'a, 'r> {
+    pub fn is_empty(&self) -> bool {
+        self.len() == 0
+    }
+
+    pub fn len(&self) -> u64 {
+        self.inner.len()
+    }
+}
+
+impl<'a, 'r> AsyncRead for FileReader<'a, 'r> {
+    fn poll_read(
+        self: Pin<&mut Self>,
+        cx: &mut task::Context,
+        buf: &mut io::ReadBuf,
+    ) -> Poll<io::Result<()>> {
+        Pin::new(&mut self.get_mut().inner).poll_read(cx, buf)
+    }
+}
+
+impl<'a, 'r> AsyncBufRead for FileReader<'a, 'r> {
+    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<io::Result<&[u8]>> {
+        Pin::new(&mut self.get_mut().inner).poll_fill_buf(cx)
+    }
+
+    fn consume(self: Pin<&mut Self>, amt: usize) {
+        Pin::new(&mut self.get_mut().inner).consume(amt)
+    }
+}
+
+/// A directory iterator, yielding a sequence of [Node]s.
+/// It must be fully consumed before reading further from the [DirReader] that produced it, if any.
+pub struct DirReader<'a, 'r> {
+    reader: &'a mut Reader<'r>,
+    /// Previous directory entry name.
+    /// We have to hang onto this to enforce name monotonicity.
+    prev_name: Vec<u8>,
+}
+
+pub struct Entry<'a, 'r> {
+    pub name: &'a [u8],
+    pub node: Node<'a, 'r>,
+}
+
+impl<'a, 'r> DirReader<'a, 'r> {
+    fn new(reader: &'a mut Reader<'r>) -> Self {
+        Self {
+            reader,
+            prev_name: vec![],
+        }
+    }
+
+    /// Read the next [Entry] from the directory.
+    ///
+    /// We explicitly don't implement [Iterator], since treating this as
+    /// a regular Rust iterator will surely lead you astray.
+    ///
+    ///  * You must always consume the entire iterator, unless you abandon the entire archive reader.
+    ///  * You must abandon the entire archive reader on the first error.
+    ///  * You must abandon the directory reader upon the first [None].
+    ///  * Even if you know the amount of elements up front, you must keep reading until you encounter [None].
+    pub async fn next(&mut self) -> io::Result<Option<Entry<'_, 'r>>> {
+        // COME FROM the previous iteration: if we've already read an entry,
+        // read its terminating TOK_PAR here.
+        if !self.prev_name.is_empty() {
+            read::token(self.reader, &nar::wire::TOK_PAR).await?;
+        }
+
+        if let nar::wire::Entry::None = read::tag(self.reader).await? {
+            return Ok(None);
+        }
+
+        let mut name = [MaybeUninit::uninit(); nar::wire::MAX_NAME_LEN + 1];
+        let name =
+            wire::read_bytes_buf(self.reader, &mut name, 1..=nar::wire::MAX_NAME_LEN).await?;
+
+        if name.contains(&0) || name.contains(&b'/') || name == b"." || name == b".." {
+            return Err(InvalidData.into());
+        }
+
+        // Enforce strict monotonicity of directory entry names.
+        if &self.prev_name[..] >= name {
+            return Err(InvalidData.into());
+        }
+
+        self.prev_name.clear();
+        self.prev_name.extend_from_slice(name);
+
+        read::token(self.reader, &nar::wire::TOK_NOD).await?;
+
+        Ok(Some(Entry {
+            name: &self.prev_name,
+            node: Node::new(self.reader).await?,
+        }))
+    }
+}
diff --git a/tvix/nix-compat/src/nar/reader/async/read.rs b/tvix/nix-compat/src/nar/reader/async/read.rs
new file mode 100644
index 0000000000..2adf894922
--- /dev/null
+++ b/tvix/nix-compat/src/nar/reader/async/read.rs
@@ -0,0 +1,69 @@
+use tokio::io::{
+    self, AsyncReadExt,
+    ErrorKind::{InvalidData, UnexpectedEof},
+};
+
+use crate::nar::wire::Tag;
+
+use super::Reader;
+
+/// Consume a known token from the reader.
+pub async fn token<const N: usize>(reader: &mut Reader<'_>, token: &[u8; N]) -> io::Result<()> {
+    let mut buf = [0u8; N];
+
+    // This implements something similar to [AsyncReadExt::read_exact], but verifies that
+    // the input data matches the token while we read it. These two slices respectively
+    // represent the remaining token to be verified, and the remaining input buffer.
+    let mut token = &token[..];
+    let mut buf = &mut buf[..];
+
+    while !token.is_empty() {
+        match reader.read(buf).await? {
+            0 => {
+                return Err(UnexpectedEof.into());
+            }
+            n => {
+                let (t, b);
+                (t, token) = token.split_at(n);
+                (b, buf) = buf.split_at_mut(n);
+
+                if t != b {
+                    return Err(InvalidData.into());
+                }
+            }
+        }
+    }
+
+    Ok(())
+}
+
+/// Consume a [Tag] from the reader.
+pub async fn tag<T: Tag>(reader: &mut Reader<'_>) -> io::Result<T> {
+    let mut buf = T::make_buf();
+    let buf = buf.as_mut();
+
+    // first read the known minimum length…
+    reader.read_exact(&mut buf[..T::MIN]).await?;
+
+    // then decide which tag we're expecting
+    let tag = T::from_u8(buf[T::OFF]).ok_or(InvalidData)?;
+    let (head, tail) = tag.as_bytes().split_at(T::MIN);
+
+    // make sure what we've read so far is valid
+    if buf[..T::MIN] != *head {
+        return Err(InvalidData.into());
+    }
+
+    // …then read the rest, if any
+    if !tail.is_empty() {
+        let rest = tail.len();
+        reader.read_exact(&mut buf[..rest]).await?;
+
+        // and make sure it's what we expect
+        if buf[..rest] != *tail {
+            return Err(InvalidData.into());
+        }
+    }
+
+    Ok(tag)
+}
diff --git a/tvix/nix-compat/src/nar/reader/async/test.rs b/tvix/nix-compat/src/nar/reader/async/test.rs
new file mode 100644
index 0000000000..7bc1f8942f
--- /dev/null
+++ b/tvix/nix-compat/src/nar/reader/async/test.rs
@@ -0,0 +1,310 @@
+use tokio::io::AsyncReadExt;
+
+mod nar {
+    pub use crate::nar::reader::r#async as reader;
+}
+
+#[tokio::test]
+async fn symlink() {
+    let mut f = std::io::Cursor::new(include_bytes!("../../tests/symlink.nar"));
+    let node = nar::reader::open(&mut f).await.unwrap();
+
+    match node {
+        nar::reader::Node::Symlink { target } => {
+            assert_eq!(
+                &b"/nix/store/somewhereelse"[..],
+                &target,
+                "target must match"
+            );
+        }
+        _ => panic!("unexpected type"),
+    }
+}
+
+#[tokio::test]
+async fn file() {
+    let mut f = std::io::Cursor::new(include_bytes!("../../tests/helloworld.nar"));
+    let node = nar::reader::open(&mut f).await.unwrap();
+
+    match node {
+        nar::reader::Node::File {
+            executable,
+            mut reader,
+        } => {
+            assert!(!executable);
+            let mut buf = vec![];
+            reader
+                .read_to_end(&mut buf)
+                .await
+                .expect("read must succeed");
+            assert_eq!(&b"Hello World!"[..], &buf);
+        }
+        _ => panic!("unexpected type"),
+    }
+}
+
+#[tokio::test]
+async fn complicated() {
+    let mut f = std::io::Cursor::new(include_bytes!("../../tests/complicated.nar"));
+    let node = nar::reader::open(&mut f).await.unwrap();
+
+    match node {
+        nar::reader::Node::Directory(mut dir_reader) => {
+            // first entry is .keep, an empty regular file.
+            must_read_file(
+                ".keep",
+                dir_reader
+                    .next()
+                    .await
+                    .expect("next must succeed")
+                    .expect("must be some"),
+            )
+            .await;
+
+            // second entry is aa, a symlink to /nix/store/somewhereelse
+            must_be_symlink(
+                "aa",
+                "/nix/store/somewhereelse",
+                dir_reader
+                    .next()
+                    .await
+                    .expect("next must be some")
+                    .expect("must be some"),
+            );
+
+            {
+                // third entry is a directory called "keep"
+                let entry = dir_reader
+                    .next()
+                    .await
+                    .expect("next must be some")
+                    .expect("must be some");
+
+                assert_eq!(b"keep", entry.name);
+
+                match entry.node {
+                    nar::reader::Node::Directory(mut subdir_reader) => {
+                        {
+                            // first entry is .keep, an empty regular file.
+                            let entry = subdir_reader
+                                .next()
+                                .await
+                                .expect("next must succeed")
+                                .expect("must be some");
+
+                            must_read_file(".keep", entry).await;
+                        }
+
+                        // we must read the None
+                        assert!(
+                            subdir_reader
+                                .next()
+                                .await
+                                .expect("next must succeed")
+                                .is_none(),
+                            "keep directory contains only .keep"
+                        );
+                    }
+                    _ => panic!("unexpected type for keep/.keep"),
+                }
+            };
+
+            // reading more entries yields None (and we actually must read until this)
+            assert!(dir_reader.next().await.expect("must succeed").is_none());
+        }
+        _ => panic!("unexpected type"),
+    }
+}
+
+#[tokio::test]
+#[should_panic]
+#[ignore = "TODO: async poisoning"]
+async fn file_read_abandoned() {
+    let mut f = std::io::Cursor::new(include_bytes!("../../tests/complicated.nar"));
+    let node = nar::reader::open(&mut f).await.unwrap();
+
+    match node {
+        nar::reader::Node::Directory(mut dir_reader) => {
+            // first entry is .keep, an empty regular file.
+            {
+                let entry = dir_reader
+                    .next()
+                    .await
+                    .expect("next must succeed")
+                    .expect("must be some");
+
+                assert_eq!(b".keep", entry.name);
+                // don't bother to finish reading it.
+            };
+
+            // this should panic (not return an error), because we are meant to abandon the archive reader now.
+            assert!(dir_reader.next().await.expect("must succeed").is_none());
+        }
+        _ => panic!("unexpected type"),
+    }
+}
+
+#[tokio::test]
+#[should_panic]
+#[ignore = "TODO: async poisoning"]
+async fn dir_read_abandoned() {
+    let mut f = std::io::Cursor::new(include_bytes!("../../tests/complicated.nar"));
+    let node = nar::reader::open(&mut f).await.unwrap();
+
+    match node {
+        nar::reader::Node::Directory(mut dir_reader) => {
+            // first entry is .keep, an empty regular file.
+            must_read_file(
+                ".keep",
+                dir_reader
+                    .next()
+                    .await
+                    .expect("next must succeed")
+                    .expect("must be some"),
+            )
+            .await;
+
+            // second entry is aa, a symlink to /nix/store/somewhereelse
+            must_be_symlink(
+                "aa",
+                "/nix/store/somewhereelse",
+                dir_reader
+                    .next()
+                    .await
+                    .expect("next must be some")
+                    .expect("must be some"),
+            );
+
+            {
+                // third entry is a directory called "keep"
+                let entry = dir_reader
+                    .next()
+                    .await
+                    .expect("next must be some")
+                    .expect("must be some");
+
+                assert_eq!(b"keep", entry.name);
+
+                match entry.node {
+                    nar::reader::Node::Directory(_) => {
+                        // don't finish using it, which poisons the archive reader
+                    }
+                    _ => panic!("unexpected type for keep/.keep"),
+                }
+            };
+
+            // this should panic, because we didn't finish reading the child subdirectory
+            assert!(dir_reader.next().await.expect("must succeed").is_none());
+        }
+        _ => panic!("unexpected type"),
+    }
+}
+
+#[tokio::test]
+#[should_panic]
+#[ignore = "TODO: async poisoning"]
+async fn dir_read_after_none() {
+    let mut f = std::io::Cursor::new(include_bytes!("../../tests/complicated.nar"));
+    let node = nar::reader::open(&mut f).await.unwrap();
+
+    match node {
+        nar::reader::Node::Directory(mut dir_reader) => {
+            // first entry is .keep, an empty regular file.
+            must_read_file(
+                ".keep",
+                dir_reader
+                    .next()
+                    .await
+                    .expect("next must succeed")
+                    .expect("must be some"),
+            )
+            .await;
+
+            // second entry is aa, a symlink to /nix/store/somewhereelse
+            must_be_symlink(
+                "aa",
+                "/nix/store/somewhereelse",
+                dir_reader
+                    .next()
+                    .await
+                    .expect("next must be some")
+                    .expect("must be some"),
+            );
+
+            {
+                // third entry is a directory called "keep"
+                let entry = dir_reader
+                    .next()
+                    .await
+                    .expect("next must be some")
+                    .expect("must be some");
+
+                assert_eq!(b"keep", entry.name);
+
+                match entry.node {
+                    nar::reader::Node::Directory(mut subdir_reader) => {
+                        // first entry is .keep, an empty regular file.
+                        must_read_file(
+                            ".keep",
+                            subdir_reader
+                                .next()
+                                .await
+                                .expect("next must succeed")
+                                .expect("must be some"),
+                        )
+                        .await;
+
+                        // we must read the None
+                        assert!(
+                            subdir_reader
+                                .next()
+                                .await
+                                .expect("next must succeed")
+                                .is_none(),
+                            "keep directory contains only .keep"
+                        );
+                    }
+                    _ => panic!("unexpected type for keep/.keep"),
+                }
+            };
+
+            // reading more entries yields None (and we actually must read until this)
+            assert!(dir_reader.next().await.expect("must succeed").is_none());
+
+            // this should panic, because we already got a none so we're meant to stop.
+            dir_reader.next().await.unwrap();
+            unreachable!()
+        }
+        _ => panic!("unexpected type"),
+    }
+}
+
+async fn must_read_file(name: &'static str, entry: nar::reader::Entry<'_, '_>) {
+    assert_eq!(name.as_bytes(), entry.name);
+
+    match entry.node {
+        nar::reader::Node::File {
+            executable,
+            mut reader,
+        } => {
+            assert!(!executable);
+            assert_eq!(reader.read(&mut [0]).await.unwrap(), 0);
+        }
+        _ => panic!("unexpected type for {}", name),
+    }
+}
+
+fn must_be_symlink(
+    name: &'static str,
+    exp_target: &'static str,
+    entry: nar::reader::Entry<'_, '_>,
+) {
+    assert_eq!(name.as_bytes(), entry.name);
+
+    match entry.node {
+        nar::reader::Node::Symlink { target } => {
+            assert_eq!(exp_target.as_bytes(), &target);
+        }
+        _ => panic!("unexpected type for {}", name),
+    }
+}
diff --git a/tvix/nix-compat/src/nar/reader/mod.rs b/tvix/nix-compat/src/nar/reader/mod.rs
index fa7ddc77f9..7e9143c8f3 100644
--- a/tvix/nix-compat/src/nar/reader/mod.rs
+++ b/tvix/nix-compat/src/nar/reader/mod.rs
@@ -10,19 +10,50 @@ use std::io::{
     Read, Write,
 };
 
+#[cfg(not(debug_assertions))]
+use std::marker::PhantomData;
+
 // Required reading for understanding this module.
 use crate::nar::wire;
 
+#[cfg(all(feature = "async", feature = "wire"))]
+pub mod r#async;
+
 mod read;
 #[cfg(test)]
 mod test;
 
 pub type Reader<'a> = dyn BufRead + Send + 'a;
 
+struct ArchiveReader<'a, 'r> {
+    inner: &'a mut Reader<'r>,
+
+    /// In debug mode, also track when we need to abandon this archive reader.
+    /// The archive reader must be abandoned when:
+    ///   * An error is encountered at any point
+    ///   * A file or directory reader is dropped before being read entirely.
+    /// All of these checks vanish in release mode.
+    status: ArchiveReaderStatus<'a>,
+}
+
+macro_rules! try_or_poison {
+    ($it:expr, $ex:expr) => {
+        match $ex {
+            Ok(x) => x,
+            Err(e) => {
+                $it.status.poison();
+                return Err(e.into());
+            }
+        }
+    };
+}
 /// Start reading a NAR file from `reader`.
 pub fn open<'a, 'r>(reader: &'a mut Reader<'r>) -> io::Result<Node<'a, 'r>> {
     read::token(reader, &wire::TOK_NAR)?;
-    Node::new(reader)
+    Node::new(ArchiveReader {
+        inner: reader,
+        status: ArchiveReaderStatus::top(),
+    })
 }
 
 pub enum Node<'a, 'r> {
@@ -41,21 +72,24 @@ impl<'a, 'r> Node<'a, 'r> {
     ///
     /// Reading the terminating [wire::TOK_PAR] is done immediately for [Node::Symlink],
     /// but is otherwise left to [DirReader] or [FileReader].
-    fn new(reader: &'a mut Reader<'r>) -> io::Result<Self> {
-        Ok(match read::tag(reader)? {
+    fn new(mut reader: ArchiveReader<'a, 'r>) -> io::Result<Self> {
+        Ok(match read::tag(reader.inner)? {
             wire::Node::Sym => {
-                let target = read::bytes(reader, wire::MAX_TARGET_LEN)?;
+                let target =
+                    try_or_poison!(reader, read::bytes(reader.inner, wire::MAX_TARGET_LEN));
 
                 if target.is_empty() || target.contains(&0) {
+                    reader.status.poison();
                     return Err(InvalidData.into());
                 }
 
-                read::token(reader, &wire::TOK_PAR)?;
+                try_or_poison!(reader, read::token(reader.inner, &wire::TOK_PAR));
+                reader.status.ready_parent(); // Immediately allow reading from parent again
 
                 Node::Symlink { target }
             }
             tag @ (wire::Node::Reg | wire::Node::Exe) => {
-                let len = read::u64(reader)?;
+                let len = try_or_poison!(&mut reader, read::u64(reader.inner));
 
                 Node::File {
                     executable: tag == wire::Node::Exe,
@@ -74,10 +108,8 @@ impl<'a, 'r> Node<'a, 'r> {
 ///  * You must abandon the entire archive reader upon the first error.
 ///
 /// It's fine to read exactly `reader.len()` bytes without ever seeing an explicit EOF.
-///
-/// TODO(edef): enforce these in `#[cfg(debug_assertions)]`
 pub struct FileReader<'a, 'r> {
-    reader: &'a mut Reader<'r>,
+    reader: ArchiveReader<'a, 'r>,
     len: u64,
     /// Truncated original file length for padding computation.
     /// We only care about the 3 least significant bits; semantically, this is a u3.
@@ -87,12 +119,13 @@ pub struct FileReader<'a, 'r> {
 impl<'a, 'r> FileReader<'a, 'r> {
     /// Instantiate a new reader, starting after [wire::TOK_REG] or [wire::TOK_EXE].
     /// We handle the terminating [wire::TOK_PAR] on semantic EOF.
-    fn new(reader: &'a mut Reader<'r>, len: u64) -> io::Result<Self> {
+    fn new(mut reader: ArchiveReader<'a, 'r>, len: u64) -> io::Result<Self> {
         // For zero-length files, we have to read the terminating TOK_PAR
         // immediately, since FileReader::read may never be called; we've
         // already reached semantic EOF by definition.
         if len == 0 {
-            read::token(reader, &wire::TOK_PAR)?;
+            read::token(reader.inner, &wire::TOK_PAR)?;
+            reader.status.ready_parent();
         }
 
         Ok(Self {
@@ -121,9 +154,12 @@ impl FileReader<'_, '_> {
             return Ok(&[]);
         }
 
-        let mut buf = self.reader.fill_buf()?;
+        self.reader.check_correct();
+
+        let mut buf = try_or_poison!(self.reader, self.reader.inner.fill_buf());
 
         if buf.is_empty() {
+            self.reader.status.poison();
             return Err(UnexpectedEof.into());
         }
 
@@ -141,12 +177,14 @@ impl FileReader<'_, '_> {
             return Ok(());
         }
 
+        self.reader.check_correct();
+
         self.len = self
             .len
             .checked_sub(n as u64)
             .expect("consumed bytes past EOF");
 
-        self.reader.consume(n);
+        self.reader.inner.consume(n);
 
         if self.is_empty() {
             self.finish()?;
@@ -159,7 +197,7 @@ impl FileReader<'_, '_> {
     pub fn copy(&mut self, mut dst: impl Write) -> io::Result<()> {
         while !self.is_empty() {
             let buf = self.fill_buf()?;
-            let n = dst.write(buf)?;
+            let n = try_or_poison!(self.reader, dst.write(buf));
             self.consume(n)?;
         }
 
@@ -173,14 +211,17 @@ impl Read for FileReader<'_, '_> {
             return Ok(0);
         }
 
+        self.reader.check_correct();
+
         if buf.len() as u64 > self.len {
             buf = &mut buf[..self.len as usize];
         }
 
-        let n = self.reader.read(buf)?;
+        let n = try_or_poison!(self.reader, self.reader.inner.read(buf));
         self.len -= n as u64;
 
         if n == 0 {
+            self.reader.status.poison();
             return Err(UnexpectedEof.into());
         }
 
@@ -200,36 +241,42 @@ impl FileReader<'_, '_> {
 
         if pad != 0 {
             let mut buf = [0; 8];
-            self.reader.read_exact(&mut buf[pad..])?;
+            try_or_poison!(self.reader, self.reader.inner.read_exact(&mut buf[pad..]));
 
             if buf != [0; 8] {
+                self.reader.status.poison();
                 return Err(InvalidData.into());
             }
         }
 
-        read::token(self.reader, &wire::TOK_PAR)
+        try_or_poison!(self.reader, read::token(self.reader.inner, &wire::TOK_PAR));
+
+        // Done with reading this file, allow going back up the chain of readers
+        self.reader.status.ready_parent();
+
+        Ok(())
     }
 }
 
 /// A directory iterator, yielding a sequence of [Node]s.
 /// It must be fully consumed before reading further from the [DirReader] that produced it, if any.
 pub struct DirReader<'a, 'r> {
-    reader: &'a mut Reader<'r>,
+    reader: ArchiveReader<'a, 'r>,
     /// Previous directory entry name.
     /// We have to hang onto this to enforce name monotonicity.
-    prev_name: Option<Vec<u8>>,
+    prev_name: Vec<u8>,
 }
 
 pub struct Entry<'a, 'r> {
-    pub name: Vec<u8>,
+    pub name: &'a [u8],
     pub node: Node<'a, 'r>,
 }
 
 impl<'a, 'r> DirReader<'a, 'r> {
-    fn new(reader: &'a mut Reader<'r>) -> Self {
+    fn new(reader: ArchiveReader<'a, 'r>) -> Self {
         Self {
             reader,
-            prev_name: None,
+            prev_name: vec![],
         }
     }
 
@@ -242,23 +289,28 @@ impl<'a, 'r> DirReader<'a, 'r> {
     ///  * You must abandon the entire archive reader on the first error.
     ///  * You must abandon the directory reader upon the first [None].
     ///  * Even if you know the amount of elements up front, you must keep reading until you encounter [None].
-    ///
-    /// TODO(edef): enforce these in `#[cfg(debug_assertions)]`
     #[allow(clippy::should_implement_trait)]
-    pub fn next(&mut self) -> io::Result<Option<Entry>> {
+    pub fn next(&mut self) -> io::Result<Option<Entry<'_, 'r>>> {
+        self.reader.check_correct();
+
         // COME FROM the previous iteration: if we've already read an entry,
         // read its terminating TOK_PAR here.
-        if self.prev_name.is_some() {
-            read::token(self.reader, &wire::TOK_PAR)?;
+        if !self.prev_name.is_empty() {
+            try_or_poison!(self.reader, read::token(self.reader.inner, &wire::TOK_PAR));
         }
 
         // Determine if there are more entries to follow
-        if let wire::Entry::None = read::tag(self.reader)? {
+        if let wire::Entry::None = try_or_poison!(self.reader, read::tag(self.reader.inner)) {
             // We've reached the end of this directory.
+            self.reader.status.ready_parent();
             return Ok(None);
         }
 
-        let name = read::bytes(self.reader, wire::MAX_NAME_LEN)?;
+        let mut name = [0; wire::MAX_NAME_LEN + 1];
+        let name = try_or_poison!(
+            self.reader,
+            read::bytes_buf(self.reader.inner, &mut name, wire::MAX_NAME_LEN)
+        );
 
         if name.is_empty()
             || name.contains(&0)
@@ -266,28 +318,160 @@ impl<'a, 'r> DirReader<'a, 'r> {
             || name == b"."
             || name == b".."
         {
+            self.reader.status.poison();
             return Err(InvalidData.into());
         }
 
         // Enforce strict monotonicity of directory entry names.
-        match &mut self.prev_name {
-            None => {
-                self.prev_name = Some(name.clone());
+        if &self.prev_name[..] >= name {
+            self.reader.status.poison();
+            return Err(InvalidData.into());
+        }
+
+        self.prev_name.clear();
+        self.prev_name.extend_from_slice(name);
+
+        try_or_poison!(self.reader, read::token(self.reader.inner, &wire::TOK_NOD));
+
+        Ok(Some(Entry {
+            name: &self.prev_name,
+            // Don't need to worry about poisoning here: Node::new will do it for us if needed
+            node: Node::new(self.reader.child())?,
+        }))
+    }
+}
+
+/// We use a stack of statuses to:
+///   * Share poisoned state across all objects from the same underlying reader,
+///     so we can check they are abandoned when an error occurs
+///   * Make sure only the most recently created object is read from, and is fully exhausted
+///     before anything it was created from is used again.
+enum ArchiveReaderStatus<'a> {
+    #[cfg(not(debug_assertions))]
+    None(PhantomData<&'a ()>),
+    #[cfg(debug_assertions)]
+    StackTop { poisoned: bool, ready: bool },
+    #[cfg(debug_assertions)]
+    StackChild {
+        poisoned: &'a mut bool,
+        parent_ready: &'a mut bool,
+        ready: bool,
+    },
+}
+
+impl ArchiveReaderStatus<'_> {
+    fn top() -> Self {
+        #[cfg(debug_assertions)]
+        {
+            ArchiveReaderStatus::StackTop {
+                poisoned: false,
+                ready: true,
             }
-            Some(prev_name) => {
-                if *prev_name >= name {
-                    return Err(InvalidData.into());
-                }
+        }
 
-                name[..].clone_into(prev_name);
+        #[cfg(not(debug_assertions))]
+        ArchiveReaderStatus::None(PhantomData)
+    }
+
+    /// Poison all the objects sharing the same reader, to be used when an error occurs
+    fn poison(&mut self) {
+        match self {
+            #[cfg(not(debug_assertions))]
+            ArchiveReaderStatus::None(_) => {}
+            #[cfg(debug_assertions)]
+            ArchiveReaderStatus::StackTop { poisoned: x, .. } => *x = true,
+            #[cfg(debug_assertions)]
+            ArchiveReaderStatus::StackChild { poisoned: x, .. } => **x = true,
+        }
+    }
+
+    /// Mark the parent as ready, allowing it to be used again and preventing this reference to the reader being used again.
+    fn ready_parent(&mut self) {
+        match self {
+            #[cfg(not(debug_assertions))]
+            ArchiveReaderStatus::None(_) => {}
+            #[cfg(debug_assertions)]
+            ArchiveReaderStatus::StackTop { ready, .. } => {
+                *ready = false;
+            }
+            #[cfg(debug_assertions)]
+            ArchiveReaderStatus::StackChild {
+                ready,
+                parent_ready,
+                ..
+            } => {
+                *ready = false;
+                **parent_ready = true;
             }
+        };
+    }
+
+    fn poisoned(&self) -> bool {
+        match self {
+            #[cfg(not(debug_assertions))]
+            ArchiveReaderStatus::None(_) => false,
+            #[cfg(debug_assertions)]
+            ArchiveReaderStatus::StackTop { poisoned, .. } => *poisoned,
+            #[cfg(debug_assertions)]
+            ArchiveReaderStatus::StackChild { poisoned, .. } => **poisoned,
         }
+    }
 
-        read::token(self.reader, &wire::TOK_NOD)?;
+    fn ready(&self) -> bool {
+        match self {
+            #[cfg(not(debug_assertions))]
+            ArchiveReaderStatus::None(_) => true,
+            #[cfg(debug_assertions)]
+            ArchiveReaderStatus::StackTop { ready, .. } => *ready,
+            #[cfg(debug_assertions)]
+            ArchiveReaderStatus::StackChild { ready, .. } => *ready,
+        }
+    }
+}
 
-        Ok(Some(Entry {
-            name,
-            node: Node::new(&mut self.reader)?,
-        }))
+impl<'a, 'r> ArchiveReader<'a, 'r> {
+    /// Create a new child reader from this one.
+    /// In debug mode, this reader will panic if called before the new child is exhausted / calls `ready_parent`
+    fn child(&mut self) -> ArchiveReader<'_, 'r> {
+        ArchiveReader {
+            inner: self.inner,
+            #[cfg(not(debug_assertions))]
+            status: ArchiveReaderStatus::None(PhantomData),
+            #[cfg(debug_assertions)]
+            status: match &mut self.status {
+                ArchiveReaderStatus::StackTop { poisoned, ready } => {
+                    *ready = false;
+                    ArchiveReaderStatus::StackChild {
+                        poisoned,
+                        parent_ready: ready,
+                        ready: true,
+                    }
+                }
+                ArchiveReaderStatus::StackChild {
+                    poisoned, ready, ..
+                } => {
+                    *ready = false;
+                    ArchiveReaderStatus::StackChild {
+                        poisoned,
+                        parent_ready: ready,
+                        ready: true,
+                    }
+                }
+            },
+        }
+    }
+
+    /// Check the reader is in the correct status.
+    /// Only does anything when debug assertions are on.
+    #[inline(always)]
+    fn check_correct(&self) {
+        assert!(
+            !self.status.poisoned(),
+            "Archive reader used after it was meant to be abandoned!"
+        );
+        assert!(
+            self.status.ready(),
+            "Non-ready archive reader used! (Should've been reading from something else)"
+        );
     }
 }
diff --git a/tvix/nix-compat/src/nar/reader/read.rs b/tvix/nix-compat/src/nar/reader/read.rs
index 1ce1613764..9938581f2a 100644
--- a/tvix/nix-compat/src/nar/reader/read.rs
+++ b/tvix/nix-compat/src/nar/reader/read.rs
@@ -15,6 +15,38 @@ pub fn u64(reader: &mut Reader) -> io::Result<u64> {
     Ok(u64::from_le_bytes(buf))
 }
 
+/// Consume a byte string from the reader into a provided buffer,
+/// returning the data bytes.
+pub fn bytes_buf<'a, const N: usize>(
+    reader: &mut Reader,
+    buf: &'a mut [u8; N],
+    max_len: usize,
+) -> io::Result<&'a [u8]> {
+    assert_eq!(N % 8, 0);
+    assert!(max_len <= N);
+
+    // read the length, and reject excessively large values
+    let len = self::u64(reader)?;
+    if len > max_len as u64 {
+        return Err(InvalidData.into());
+    }
+    // we know the length fits in a usize now
+    let len = len as usize;
+
+    // read the data and padding into a buffer
+    let buf_len = (len + 7) & !7;
+    reader.read_exact(&mut buf[..buf_len])?;
+
+    // verify that the padding is all zeroes
+    for &b in &buf[len..buf_len] {
+        if b != 0 {
+            return Err(InvalidData.into());
+        }
+    }
+
+    Ok(&buf[..len])
+}
+
 /// Consume a byte string of up to `max_len` bytes from the reader.
 pub fn bytes(reader: &mut Reader, max_len: usize) -> io::Result<Vec<u8>> {
     assert!(max_len <= isize::MAX as usize);
diff --git a/tvix/nix-compat/src/nar/reader/test.rs b/tvix/nix-compat/src/nar/reader/test.rs
index fd0d6a9f5a..63e4fb289f 100644
--- a/tvix/nix-compat/src/nar/reader/test.rs
+++ b/tvix/nix-compat/src/nar/reader/test.rs
@@ -46,75 +46,233 @@ fn complicated() {
     match node {
         nar::reader::Node::Directory(mut dir_reader) => {
             // first entry is .keep, an empty regular file.
-            let entry = dir_reader
-                .next()
-                .expect("next must succeed")
-                .expect("must be some");
-
-            assert_eq!(&b".keep"[..], &entry.name);
-
-            match entry.node {
-                nar::reader::Node::File {
-                    executable,
-                    mut reader,
-                } => {
-                    assert!(!executable);
-                    assert_eq!(reader.read(&mut [0]).unwrap(), 0);
+            must_read_file(
+                ".keep",
+                dir_reader
+                    .next()
+                    .expect("next must succeed")
+                    .expect("must be some"),
+            );
+
+            // second entry is aa, a symlink to /nix/store/somewhereelse
+            must_be_symlink(
+                "aa",
+                "/nix/store/somewhereelse",
+                dir_reader
+                    .next()
+                    .expect("next must be some")
+                    .expect("must be some"),
+            );
+
+            {
+                // third entry is a directory called "keep"
+                let entry = dir_reader
+                    .next()
+                    .expect("next must be some")
+                    .expect("must be some");
+
+                assert_eq!(b"keep", entry.name);
+
+                match entry.node {
+                    nar::reader::Node::Directory(mut subdir_reader) => {
+                        {
+                            // first entry is .keep, an empty regular file.
+                            let entry = subdir_reader
+                                .next()
+                                .expect("next must succeed")
+                                .expect("must be some");
+
+                            must_read_file(".keep", entry);
+                        }
+
+                        // we must read the None
+                        assert!(
+                            subdir_reader.next().expect("next must succeed").is_none(),
+                            "keep directory contains only .keep"
+                        );
+                    }
+                    _ => panic!("unexpected type for keep/.keep"),
                 }
-                _ => panic!("unexpected type for .keep"),
-            }
+            };
+
+            // reading more entries yields None (and we actually must read until this)
+            assert!(dir_reader.next().expect("must succeed").is_none());
+        }
+        _ => panic!("unexpected type"),
+    }
+}
+
+#[test]
+#[should_panic]
+fn file_read_abandoned() {
+    let mut f = std::io::Cursor::new(include_bytes!("../tests/complicated.nar"));
+    let node = nar::reader::open(&mut f).unwrap();
+
+    match node {
+        nar::reader::Node::Directory(mut dir_reader) => {
+            // first entry is .keep, an empty regular file.
+            {
+                let entry = dir_reader
+                    .next()
+                    .expect("next must succeed")
+                    .expect("must be some");
+
+                assert_eq!(b".keep", entry.name);
+                // don't bother to finish reading it.
+            };
+
+            // this should panic (not return an error), because we are meant to abandon the archive reader now.
+            assert!(dir_reader.next().expect("must succeed").is_none());
+        }
+        _ => panic!("unexpected type"),
+    }
+}
+
+#[test]
+#[should_panic]
+fn dir_read_abandoned() {
+    let mut f = std::io::Cursor::new(include_bytes!("../tests/complicated.nar"));
+    let node = nar::reader::open(&mut f).unwrap();
+
+    match node {
+        nar::reader::Node::Directory(mut dir_reader) => {
+            // first entry is .keep, an empty regular file.
+            must_read_file(
+                ".keep",
+                dir_reader
+                    .next()
+                    .expect("next must succeed")
+                    .expect("must be some"),
+            );
 
             // second entry is aa, a symlink to /nix/store/somewhereelse
-            let entry = dir_reader
-                .next()
-                .expect("next must be some")
-                .expect("must be some");
+            must_be_symlink(
+                "aa",
+                "/nix/store/somewhereelse",
+                dir_reader
+                    .next()
+                    .expect("next must be some")
+                    .expect("must be some"),
+            );
 
-            assert_eq!(&b"aa"[..], &entry.name);
+            {
+                // third entry is a directory called "keep"
+                let entry = dir_reader
+                    .next()
+                    .expect("next must be some")
+                    .expect("must be some");
 
-            match entry.node {
-                nar::reader::Node::Symlink { target } => {
-                    assert_eq!(&b"/nix/store/somewhereelse"[..], &target);
+                assert_eq!(b"keep", entry.name);
+
+                match entry.node {
+                    nar::reader::Node::Directory(_) => {
+                        // don't finish using it, which poisons the archive reader
+                    }
+                    _ => panic!("unexpected type for keep/.keep"),
                 }
-                _ => panic!("unexpected type for aa"),
-            }
-
-            // third entry is a directory called "keep"
-            let entry = dir_reader
-                .next()
-                .expect("next must be some")
-                .expect("must be some");
-
-            assert_eq!(&b"keep"[..], &entry.name);
-
-            match entry.node {
-                nar::reader::Node::Directory(mut subdir_reader) => {
-                    // first entry is .keep, an empty regular file.
-                    let entry = subdir_reader
-                        .next()
-                        .expect("next must succeed")
-                        .expect("must be some");
-
-                    // … it contains a single .keep, an empty regular file.
-                    assert_eq!(&b".keep"[..], &entry.name);
-
-                    match entry.node {
-                        nar::reader::Node::File {
-                            executable,
-                            mut reader,
-                        } => {
-                            assert!(!executable);
-                            assert_eq!(reader.read(&mut [0]).unwrap(), 0);
-                        }
-                        _ => panic!("unexpected type for keep/.keep"),
+            };
+
+            // this should panic, because we didn't finish reading the child subdirectory
+            assert!(dir_reader.next().expect("must succeed").is_none());
+        }
+        _ => panic!("unexpected type"),
+    }
+}
+
+#[test]
+#[should_panic]
+fn dir_read_after_none() {
+    let mut f = std::io::Cursor::new(include_bytes!("../tests/complicated.nar"));
+    let node = nar::reader::open(&mut f).unwrap();
+
+    match node {
+        nar::reader::Node::Directory(mut dir_reader) => {
+            // first entry is .keep, an empty regular file.
+            must_read_file(
+                ".keep",
+                dir_reader
+                    .next()
+                    .expect("next must succeed")
+                    .expect("must be some"),
+            );
+
+            // second entry is aa, a symlink to /nix/store/somewhereelse
+            must_be_symlink(
+                "aa",
+                "/nix/store/somewhereelse",
+                dir_reader
+                    .next()
+                    .expect("next must be some")
+                    .expect("must be some"),
+            );
+
+            {
+                // third entry is a directory called "keep"
+                let entry = dir_reader
+                    .next()
+                    .expect("next must be some")
+                    .expect("must be some");
+
+                assert_eq!(b"keep", entry.name);
+
+                match entry.node {
+                    nar::reader::Node::Directory(mut subdir_reader) => {
+                        // first entry is .keep, an empty regular file.
+                        must_read_file(
+                            ".keep",
+                            subdir_reader
+                                .next()
+                                .expect("next must succeed")
+                                .expect("must be some"),
+                        );
+
+                        // we must read the None
+                        assert!(
+                            subdir_reader.next().expect("next must succeed").is_none(),
+                            "keep directory contains only .keep"
+                        );
                     }
+                    _ => panic!("unexpected type for keep/.keep"),
                 }
-                _ => panic!("unexpected type for keep/.keep"),
-            }
+            };
 
             // reading more entries yields None (and we actually must read until this)
             assert!(dir_reader.next().expect("must succeed").is_none());
+
+            // this should panic, because we already got a none so we're meant to stop.
+            dir_reader.next().unwrap();
+            unreachable!()
         }
         _ => panic!("unexpected type"),
     }
 }
+
+fn must_read_file(name: &'static str, entry: nar::reader::Entry<'_, '_>) {
+    assert_eq!(name.as_bytes(), entry.name);
+
+    match entry.node {
+        nar::reader::Node::File {
+            executable,
+            mut reader,
+        } => {
+            assert!(!executable);
+            assert_eq!(reader.read(&mut [0]).unwrap(), 0);
+        }
+        _ => panic!("unexpected type for {}", name),
+    }
+}
+
+fn must_be_symlink(
+    name: &'static str,
+    exp_target: &'static str,
+    entry: nar::reader::Entry<'_, '_>,
+) {
+    assert_eq!(name.as_bytes(), entry.name);
+
+    match entry.node {
+        nar::reader::Node::Symlink { target } => {
+            assert_eq!(exp_target.as_bytes(), &target);
+        }
+        _ => panic!("unexpected type for {}", name),
+    }
+}
diff --git a/tvix/nix-compat/src/nar/wire/mod.rs b/tvix/nix-compat/src/nar/wire/mod.rs
index b9e0212495..9e99b530ce 100644
--- a/tvix/nix-compat/src/nar/wire/mod.rs
+++ b/tvix/nix-compat/src/nar/wire/mod.rs
@@ -90,6 +90,23 @@ pub const TOK_DIR: [u8; 24] = *b"\x09\0\0\0\0\0\0\0directory\0\0\0\0\0\0\0";
 pub const TOK_ENT: [u8; 48] = *b"\x05\0\0\0\0\0\0\0entry\0\0\0\x01\0\0\0\0\0\0\0(\0\0\0\0\0\0\0\x04\0\0\0\0\0\0\0name\0\0\0\0";
 pub const TOK_NOD: [u8; 48] = *b"\x04\0\0\0\0\0\0\0node\0\0\0\0\x01\0\0\0\0\0\0\0(\0\0\0\0\0\0\0\x04\0\0\0\0\0\0\0type\0\0\0\0";
 pub const TOK_PAR: [u8; 16] = *b"\x01\0\0\0\0\0\0\0)\0\0\0\0\0\0\0";
+#[cfg(feature = "async")]
+const TOK_PAD_PAR: [u8; 24] = *b"\0\0\0\0\0\0\0\0\x01\0\0\0\0\0\0\0)\0\0\0\0\0\0\0";
+
+#[cfg(feature = "async")]
+#[derive(Debug)]
+pub(crate) enum PadPar {}
+
+#[cfg(feature = "async")]
+impl crate::wire::reader::Tag for PadPar {
+    const PATTERN: &'static [u8] = &TOK_PAD_PAR;
+
+    type Buf = [u8; 24];
+
+    fn make_buf() -> Self::Buf {
+        [0; 24]
+    }
+}
 
 #[test]
 fn tokens() {
diff --git a/tvix/nix-compat/src/nar/wire/tag.rs b/tvix/nix-compat/src/nar/wire/tag.rs
index 55b93f9985..4982a0d707 100644
--- a/tvix/nix-compat/src/nar/wire/tag.rs
+++ b/tvix/nix-compat/src/nar/wire/tag.rs
@@ -10,6 +10,7 @@ pub trait Tag: Sized {
     const MIN: usize;
 
     /// Minimal suitably sized buffer for reading the wire representation
+    ///
     /// HACK: This is a workaround for const generics limitations.
     type Buf: AsMut<[u8]> + Send;
 
diff --git a/tvix/nix-compat/src/nix_daemon/worker_protocol.rs b/tvix/nix-compat/src/nix_daemon/worker_protocol.rs
index 58a48d1bdd..7e3adc0db2 100644
--- a/tvix/nix-compat/src/nix_daemon/worker_protocol.rs
+++ b/tvix/nix-compat/src/nix_daemon/worker_protocol.rs
@@ -15,13 +15,34 @@ static WORKER_MAGIC_1: u64 = 0x6e697863; // "nixc"
 static WORKER_MAGIC_2: u64 = 0x6478696f; // "dxio"
 pub static STDERR_LAST: u64 = 0x616c7473; // "alts"
 
+/// | Nix version     | Protocol |
+/// |-----------------|----------|
+/// | 0.11            | 1.02     |
+/// | 0.12            | 1.04     |
+/// | 0.13            | 1.05     |
+/// | 0.14            | 1.05     |
+/// | 0.15            | 1.05     |
+/// | 0.16            | 1.06     |
+/// | 1.0             | 1.10     |
+/// | 1.1             | 1.11     |
+/// | 1.2             | 1.12     |
+/// | 1.3 - 1.5.3     | 1.13     |
+/// | 1.6 - 1.10      | 1.14     |
+/// | 1.11 - 1.11.16  | 1.15     |
+/// | 2.0 - 2.0.4     | 1.20     |
+/// | 2.1 - 2.3.18    | 1.21     |
+/// | 2.4 - 2.6.1     | 1.32     |
+/// | 2.7.0           | 1.33     |
+/// | 2.8.0 - 2.14.1  | 1.34     |
+/// | 2.15.0 - 2.19.4 | 1.35     |
+/// | 2.20.0 - 2.22.0 | 1.37     |
 static PROTOCOL_VERSION: ProtocolVersion = ProtocolVersion::from_parts(1, 37);
 
 /// Max length of a Nix setting name/value. In bytes.
 ///
 /// This value has been arbitrarily choosen after looking the nix.conf
 /// manpage. Don't hesitate to increase it if it's too limiting.
-pub static MAX_SETTING_SIZE: u64 = 1024;
+pub static MAX_SETTING_SIZE: usize = 1024;
 
 /// Worker Operation
 ///
@@ -131,30 +152,30 @@ pub async fn read_client_settings<R: AsyncReadExt + Unpin>(
     r: &mut R,
     client_version: ProtocolVersion,
 ) -> std::io::Result<ClientSettings> {
-    let keep_failed = wire::read_bool(r).await?;
-    let keep_going = wire::read_bool(r).await?;
-    let try_fallback = wire::read_bool(r).await?;
-    let verbosity_uint = wire::read_u64(r).await?;
+    let keep_failed = r.read_u64_le().await? != 0;
+    let keep_going = r.read_u64_le().await? != 0;
+    let try_fallback = r.read_u64_le().await? != 0;
+    let verbosity_uint = r.read_u64_le().await?;
     let verbosity = Verbosity::from_u64(verbosity_uint).ok_or_else(|| {
         Error::new(
             ErrorKind::InvalidData,
             format!("Can't convert integer {} to verbosity", verbosity_uint),
         )
     })?;
-    let max_build_jobs = wire::read_u64(r).await?;
-    let max_silent_time = wire::read_u64(r).await?;
-    _ = wire::read_u64(r).await?; // obsolete useBuildHook
-    let verbose_build = wire::read_bool(r).await?;
-    _ = wire::read_u64(r).await?; // obsolete logType
-    _ = wire::read_u64(r).await?; // obsolete printBuildTrace
-    let build_cores = wire::read_u64(r).await?;
-    let use_substitutes = wire::read_bool(r).await?;
+    let max_build_jobs = r.read_u64_le().await?;
+    let max_silent_time = r.read_u64_le().await?;
+    _ = r.read_u64_le().await?; // obsolete useBuildHook
+    let verbose_build = r.read_u64_le().await? != 0;
+    _ = r.read_u64_le().await?; // obsolete logType
+    _ = r.read_u64_le().await?; // obsolete printBuildTrace
+    let build_cores = r.read_u64_le().await?;
+    let use_substitutes = r.read_u64_le().await? != 0;
     let mut overrides = HashMap::new();
     if client_version.minor() >= 12 {
-        let num_overrides = wire::read_u64(r).await?;
+        let num_overrides = r.read_u64_le().await?;
         for _ in 0..num_overrides {
-            let name = wire::read_string(r, 0..MAX_SETTING_SIZE).await?;
-            let value = wire::read_string(r, 0..MAX_SETTING_SIZE).await?;
+            let name = wire::read_string(r, 0..=MAX_SETTING_SIZE).await?;
+            let value = wire::read_string(r, 0..=MAX_SETTING_SIZE).await?;
             overrides.insert(name, value);
         }
     }
@@ -197,17 +218,17 @@ pub async fn server_handshake_client<'a, RW: 'a>(
 where
     &'a mut RW: AsyncReadExt + AsyncWriteExt + Unpin,
 {
-    let worker_magic_1 = wire::read_u64(&mut conn).await?;
+    let worker_magic_1 = conn.read_u64_le().await?;
     if worker_magic_1 != WORKER_MAGIC_1 {
         Err(std::io::Error::new(
             ErrorKind::InvalidData,
             format!("Incorrect worker magic number received: {}", worker_magic_1),
         ))
     } else {
-        wire::write_u64(&mut conn, WORKER_MAGIC_2).await?;
-        wire::write_u64(&mut conn, PROTOCOL_VERSION.into()).await?;
+        conn.write_u64_le(WORKER_MAGIC_2).await?;
+        conn.write_u64_le(PROTOCOL_VERSION.into()).await?;
         conn.flush().await?;
-        let client_version = wire::read_u64(&mut conn).await?;
+        let client_version = conn.read_u64_le().await?;
         // Parse into ProtocolVersion.
         let client_version: ProtocolVersion = client_version
             .try_into()
@@ -220,14 +241,14 @@ where
         }
         if client_version.minor() >= 14 {
             // Obsolete CPU affinity.
-            let read_affinity = wire::read_u64(&mut conn).await?;
+            let read_affinity = conn.read_u64_le().await?;
             if read_affinity != 0 {
-                let _cpu_affinity = wire::read_u64(&mut conn).await?;
+                let _cpu_affinity = conn.read_u64_le().await?;
             };
         }
         if client_version.minor() >= 11 {
             // Obsolete reserveSpace
-            let _reserve_space = wire::read_u64(&mut conn).await?;
+            let _reserve_space = conn.read_u64_le().await?;
         }
         if client_version.minor() >= 33 {
             // Nix version. We're plain lying, we're not Nix, but eh…
@@ -245,7 +266,7 @@ where
 
 /// Read a worker [Operation] from the wire.
 pub async fn read_op<R: AsyncReadExt + Unpin>(r: &mut R) -> std::io::Result<Operation> {
-    let op_number = wire::read_u64(r).await?;
+    let op_number = r.read_u64_le().await?;
     Operation::from_u64(op_number).ok_or(Error::new(
         ErrorKind::InvalidData,
         format!("Invalid OP number {}", op_number),
@@ -278,8 +299,8 @@ where
     W: AsyncReadExt + AsyncWriteExt + Unpin,
 {
     match t {
-        Trust::Trusted => wire::write_u64(conn, 1).await,
-        Trust::NotTrusted => wire::write_u64(conn, 2).await,
+        Trust::Trusted => conn.write_u64_le(1).await,
+        Trust::NotTrusted => conn.write_u64_le(2).await,
     }
 }
 
diff --git a/tvix/nix-compat/src/store_path/mod.rs b/tvix/nix-compat/src/store_path/mod.rs
index ac9f1805e3..707c41a92d 100644
--- a/tvix/nix-compat/src/store_path/mod.rs
+++ b/tvix/nix-compat/src/store_path/mod.rs
@@ -56,7 +56,7 @@ pub enum Error {
 #[derive(Clone, Debug, PartialEq, Eq, Hash)]
 pub struct StorePath {
     digest: [u8; DIGEST_SIZE],
-    name: String,
+    name: Box<str>,
 }
 
 impl StorePath {
@@ -65,7 +65,7 @@ impl StorePath {
     }
 
     pub fn name(&self) -> &str {
-        self.name.as_ref()
+        &self.name
     }
 
     pub fn as_ref(&self) -> StorePathRef<'_> {
@@ -176,10 +176,7 @@ pub struct StorePathRef<'a> {
 
 impl<'a> From<&'a StorePath> for StorePathRef<'a> {
     fn from(&StorePath { digest, ref name }: &'a StorePath) -> Self {
-        StorePathRef {
-            digest,
-            name: name.as_ref(),
-        }
+        StorePathRef { digest, name }
     }
 }
 
@@ -209,7 +206,7 @@ impl<'a> StorePathRef<'a> {
     pub fn to_owned(&self) -> StorePath {
         StorePath {
             digest: self.digest,
-            name: self.name.to_owned(),
+            name: self.name.into(),
         }
     }
 
@@ -303,8 +300,7 @@ impl Serialize for StorePathRef<'_> {
     }
 }
 
-/// NAME_CHARS contains `true` for bytes that are valid in store path names,
-/// not accounting for '.' being permitted only past the first character.
+/// NAME_CHARS contains `true` for bytes that are valid in store path names.
 static NAME_CHARS: [bool; 256] = {
     let mut tbl = [false; 256];
     let mut c = 0;
@@ -332,10 +328,6 @@ pub(crate) fn validate_name(s: &(impl AsRef<[u8]> + ?Sized)) -> Result<&str, Err
         return Err(Error::InvalidLength);
     }
 
-    if s[0] == b'.' {
-        return Err(Error::InvalidName(s.to_vec(), 0));
-    }
-
     let mut valid = true;
     for &c in s {
         valid = valid && NAME_CHARS[c as usize];
@@ -399,7 +391,7 @@ mod tests {
 
         let expected_digest: [u8; DIGEST_SIZE] = hex!("8a12321522fd91efbd60ebb2481af88580f61600");
 
-        assert_eq!("net-tools-1.60_p20170221182432", nixpath.name);
+        assert_eq!("net-tools-1.60_p20170221182432", nixpath.name());
         assert_eq!(nixpath.digest, expected_digest);
 
         assert_eq!(example_nix_path_str, nixpath.to_string())
@@ -446,15 +438,18 @@ mod tests {
         }
     }
 
-    /// This is the store path rejected when `nix-store --add`'ing an
+    /// This is the store path *accepted* when `nix-store --add`'ing an
     /// empty `.gitignore` file.
     ///
-    /// Nix 2.4 accidentally dropped this behaviour, but this is considered a bug.
-    /// See https://github.com/NixOS/nix/pull/9095.
+    /// Nix 2.4 accidentally permitted this behaviour, but the revert came
+    /// too late to beat Hyrum's law. It is now considered permissible.
+    ///
+    /// https://github.com/NixOS/nix/pull/9095 (revert)
+    /// https://github.com/NixOS/nix/pull/9867 (revert-of-revert)
     #[test]
     fn starts_with_dot() {
         StorePath::from_bytes(b"fli4bwscgna7lpm7v5xgnjxrxh0yc7ra-.gitignore")
-            .expect_err("must fail");
+            .expect("must succeed");
     }
 
     #[test]
diff --git a/tvix/nix-compat/src/wire/bytes/mod.rs b/tvix/nix-compat/src/wire/bytes/mod.rs
index 9ec8b3fa04..2ed071e379 100644
--- a/tvix/nix-compat/src/wire/bytes/mod.rs
+++ b/tvix/nix-compat/src/wire/bytes/mod.rs
@@ -1,23 +1,21 @@
 use std::{
     io::{Error, ErrorKind},
-    ops::RangeBounds,
+    mem::MaybeUninit,
+    ops::RangeInclusive,
 };
-use tokio::io::{AsyncReadExt, AsyncWriteExt};
+use tokio::io::{self, AsyncReadExt, AsyncWriteExt, ReadBuf};
 
-mod reader;
+pub(crate) mod reader;
 pub use reader::BytesReader;
 mod writer;
 pub use writer::BytesWriter;
 
-use super::primitive;
-
 /// 8 null bytes, used to write out padding.
 const EMPTY_BYTES: &[u8; 8] = &[0u8; 8];
 
 /// The length of the size field, in bytes is always 8.
 const LEN_SIZE: usize = 8;
 
-#[allow(dead_code)]
 /// Read a "bytes wire packet" from the AsyncRead.
 /// Rejects reading more than `allowed_size` bytes of payload.
 ///
@@ -35,24 +33,29 @@ const LEN_SIZE: usize = 8;
 ///
 /// This buffers the entire payload into memory,
 /// a streaming version is available at [crate::wire::bytes::BytesReader].
-pub async fn read_bytes<R, S>(r: &mut R, allowed_size: S) -> std::io::Result<Vec<u8>>
+pub async fn read_bytes<R: ?Sized>(
+    r: &mut R,
+    allowed_size: RangeInclusive<usize>,
+) -> io::Result<Vec<u8>>
 where
     R: AsyncReadExt + Unpin,
-    S: RangeBounds<u64>,
 {
     // read the length field
-    let len = primitive::read_u64(r).await?;
-
-    if !allowed_size.contains(&len) {
-        return Err(std::io::Error::new(
-            std::io::ErrorKind::InvalidData,
-            "signalled package size not in allowed range",
-        ));
-    }
+    let len = r.read_u64_le().await?;
+    let len: usize = len
+        .try_into()
+        .ok()
+        .filter(|len| allowed_size.contains(len))
+        .ok_or_else(|| {
+            io::Error::new(
+                io::ErrorKind::InvalidData,
+                "signalled package size not in allowed range",
+            )
+        })?;
 
     // calculate the total length, including padding.
     // byte packets are padded to 8 byte blocks each.
-    let padded_len = padding_len(len) as u64 + (len as u64);
+    let padded_len = padding_len(len as u64) as u64 + (len as u64);
     let mut limited_reader = r.take(padded_len);
 
     let mut buf = Vec::new();
@@ -61,34 +64,87 @@ where
 
     // make sure we got exactly the number of bytes, and not less.
     if s as u64 != padded_len {
-        return Err(std::io::Error::new(
-            std::io::ErrorKind::InvalidData,
-            "got less bytes than expected",
-        ));
+        return Err(io::ErrorKind::UnexpectedEof.into());
     }
 
-    let (_content, padding) = buf.split_at(len as usize);
+    let (_content, padding) = buf.split_at(len);
 
     // ensure the padding is all zeroes.
-    if !padding.iter().all(|e| *e == b'\0') {
-        return Err(std::io::Error::new(
-            std::io::ErrorKind::InvalidData,
+    if padding.iter().any(|&b| b != 0) {
+        return Err(io::Error::new(
+            io::ErrorKind::InvalidData,
             "padding is not all zeroes",
         ));
     }
 
     // return the data without the padding
-    buf.truncate(len as usize);
+    buf.truncate(len);
     Ok(buf)
 }
 
+pub(crate) async fn read_bytes_buf<'a, const N: usize, R: ?Sized>(
+    reader: &mut R,
+    buf: &'a mut [MaybeUninit<u8>; N],
+    allowed_size: RangeInclusive<usize>,
+) -> io::Result<&'a [u8]>
+where
+    R: AsyncReadExt + Unpin,
+{
+    assert_eq!(N % 8, 0);
+    assert!(*allowed_size.end() <= N);
+
+    let len = reader.read_u64_le().await?;
+    let len: usize = len
+        .try_into()
+        .ok()
+        .filter(|len| allowed_size.contains(len))
+        .ok_or_else(|| {
+            io::Error::new(
+                io::ErrorKind::InvalidData,
+                "signalled package size not in allowed range",
+            )
+        })?;
+
+    let buf_len = (len + 7) & !7;
+    let buf = {
+        let mut read_buf = ReadBuf::uninit(&mut buf[..buf_len]);
+
+        while read_buf.filled().len() < buf_len {
+            reader.read_buf(&mut read_buf).await?;
+        }
+
+        // ReadBuf::filled does not pass the underlying buffer's lifetime through,
+        // so we must make a trip to hell.
+        //
+        // SAFETY: `read_buf` is filled up to `buf_len`, and we verify that it is
+        // still pointing at the same underlying buffer.
+        unsafe {
+            assert_eq!(read_buf.filled().as_ptr(), buf.as_ptr() as *const u8);
+            assume_init_bytes(&buf[..buf_len])
+        }
+    };
+
+    if buf[len..buf_len].iter().any(|&b| b != 0) {
+        return Err(io::Error::new(
+            io::ErrorKind::InvalidData,
+            "padding is not all zeroes",
+        ));
+    }
+
+    Ok(&buf[..len])
+}
+
+/// SAFETY: The bytes have to actually be initialized.
+unsafe fn assume_init_bytes(slice: &[MaybeUninit<u8>]) -> &[u8] {
+    &*(slice as *const [MaybeUninit<u8>] as *const [u8])
+}
+
 /// Read a "bytes wire packet" of from the AsyncRead and tries to parse as string.
 /// Internally uses [read_bytes].
 /// Rejects reading more than `allowed_size` bytes of payload.
-pub async fn read_string<R, S>(r: &mut R, allowed_size: S) -> std::io::Result<String>
+pub async fn read_string<R>(r: &mut R, allowed_size: RangeInclusive<usize>) -> io::Result<String>
 where
     R: AsyncReadExt + Unpin,
-    S: RangeBounds<u64>,
 {
     let bytes = read_bytes(r, allowed_size).await?;
     String::from_utf8(bytes).map_err(|e| Error::new(ErrorKind::InvalidData, e))
@@ -106,9 +162,9 @@ where
 pub async fn write_bytes<W: AsyncWriteExt + Unpin, B: AsRef<[u8]>>(
     w: &mut W,
     b: B,
-) -> std::io::Result<()> {
+) -> io::Result<()> {
     // write the size packet.
-    primitive::write_u64(w, b.as_ref().len() as u64).await?;
+    w.write_u64_le(b.as_ref().len() as u64).await?;
 
     // write the payload
     w.write_all(b.as_ref()).await?;
@@ -122,33 +178,10 @@ pub async fn write_bytes<W: AsyncWriteExt + Unpin, B: AsRef<[u8]>>(
 }
 
 /// Computes the number of bytes we should add to len (a length in
-/// bytes) to be alined on 64 bits (8 bytes).
+/// bytes) to be aligned on 64 bits (8 bytes).
 fn padding_len(len: u64) -> u8 {
-    let modulo = len % 8;
-    if modulo == 0 {
-        0
-    } else {
-        8 - modulo as u8
-    }
-}
-
-/// Models the position inside a "bytes wire packet" that the reader or writer
-/// is in.
-/// It can be in three different stages, inside size, payload or padding fields.
-/// The number tracks the number of bytes written inside the specific field.
-/// There shall be no ambiguous states, at the end of a stage we immediately
-/// move to the beginning of the next one:
-/// - Size(LEN_SIZE) must be expressed as Payload(0)
-/// - Payload(self.payload_len) must be expressed as Padding(0)
-/// There's one exception - Size(LEN_SIZE) in the reader represents a failure
-/// state we enter in case the allowed size doesn't match the allowed range.
-///
-/// Padding(padding_len) means we're at the end of the bytes wire packet.
-#[derive(Clone, Debug, PartialEq, Eq)]
-enum BytesPacketPosition {
-    Size(usize),
-    Payload(u64),
-    Padding(usize),
+    let aligned = len.wrapping_add(7) & !7;
+    aligned.wrapping_sub(len) as u8
 }
 
 #[cfg(test)]
@@ -160,7 +193,7 @@ mod tests {
 
     /// The maximum length of bytes packets we're willing to accept in the test
     /// cases.
-    const MAX_LEN: u64 = 1024;
+    const MAX_LEN: usize = 1024;
 
     #[tokio::test]
     async fn test_read_8_bytes() {
@@ -171,10 +204,7 @@ mod tests {
 
         assert_eq!(
             &12345678u64.to_le_bytes(),
-            read_bytes(&mut mock, 0u64..MAX_LEN)
-                .await
-                .unwrap()
-                .as_slice()
+            read_bytes(&mut mock, 0..=MAX_LEN).await.unwrap().as_slice()
         );
     }
 
@@ -187,10 +217,7 @@ mod tests {
 
         assert_eq!(
             hex!("010203040506070809"),
-            read_bytes(&mut mock, 0u64..MAX_LEN)
-                .await
-                .unwrap()
-                .as_slice()
+            read_bytes(&mut mock, 0..=MAX_LEN).await.unwrap().as_slice()
         );
     }
 
@@ -202,10 +229,7 @@ mod tests {
 
         assert_eq!(
             hex!(""),
-            read_bytes(&mut mock, 0u64..MAX_LEN)
-                .await
-                .unwrap()
-                .as_slice()
+            read_bytes(&mut mock, 0..=MAX_LEN).await.unwrap().as_slice()
         );
     }
 
@@ -215,7 +239,7 @@ mod tests {
     async fn test_read_reject_too_large() {
         let mut mock = Builder::new().read(&100u64.to_le_bytes()).build();
 
-        read_bytes(&mut mock, 10..10)
+        read_bytes(&mut mock, 10..=10)
             .await
             .expect_err("expect this to fail");
     }
@@ -251,4 +275,9 @@ mod tests {
             .build();
         assert_ok!(write_bytes(&mut mock, &input).await)
     }
+
+    #[test]
+    fn padding_len_u64_max() {
+        assert_eq!(padding_len(u64::MAX), 1);
+    }
 }
diff --git a/tvix/nix-compat/src/wire/bytes/reader.rs b/tvix/nix-compat/src/wire/bytes/reader.rs
deleted file mode 100644
index 9aea677645..0000000000
--- a/tvix/nix-compat/src/wire/bytes/reader.rs
+++ /dev/null
@@ -1,464 +0,0 @@
-use pin_project_lite::pin_project;
-use std::{
-    ops::RangeBounds,
-    task::{ready, Poll},
-};
-use tokio::io::AsyncRead;
-
-use super::{padding_len, BytesPacketPosition, LEN_SIZE};
-
-pin_project! {
-    /// Reads a "bytes wire packet" from the underlying reader.
-    /// The format is the same as in [crate::wire::bytes::read_bytes],
-    /// however this structure provides a [AsyncRead] interface,
-    /// allowing to not having to pass around the entire payload in memory.
-    ///
-    /// After being constructed with the underlying reader and an allowed size,
-    /// subsequent requests to poll_read will return payload data until the end
-    /// of the packet is reached.
-    ///
-    /// Internally, it will first read over the size packet, filling payload_size,
-    /// ensuring it fits allowed_size, then return payload data.
-    /// It will only signal EOF (returning `Ok(())` without filling the buffer anymore)
-    /// when all padding has been successfully consumed too.
-    ///
-    /// This also means, it's important for a user to always read to the end,
-    /// and not just call read_exact - otherwise it might not skip over the
-    /// padding, and return garbage when reading the next packet.
-    ///
-    /// In case of an error due to size constraints, or in case of not reading
-    /// all the way to the end (and getting a EOF), the underlying reader is no
-    /// longer usable and might return garbage.
-    pub struct BytesReader<R, S>
-    where
-    R: AsyncRead,
-    S: RangeBounds<u64>,
-
-    {
-        #[pin]
-        inner: R,
-
-        allowed_size: S,
-        payload_size: [u8; 8],
-        state: BytesPacketPosition,
-    }
-}
-
-impl<R, S> BytesReader<R, S>
-where
-    R: AsyncRead + Unpin,
-    S: RangeBounds<u64>,
-{
-    /// Constructs a new BytesReader, using the underlying passed reader.
-    pub fn new(r: R, allowed_size: S) -> Self {
-        Self {
-            inner: r,
-            allowed_size,
-            payload_size: [0; 8],
-            state: BytesPacketPosition::Size(0),
-        }
-    }
-}
-/// Returns an error if the passed usize is 0.
-#[inline]
-fn ensure_nonzero_bytes_read(bytes_read: usize) -> Result<usize, std::io::Error> {
-    if bytes_read == 0 {
-        Err(std::io::Error::new(
-            std::io::ErrorKind::UnexpectedEof,
-            "underlying reader returned EOF",
-        ))
-    } else {
-        Ok(bytes_read)
-    }
-}
-
-impl<R, S> AsyncRead for BytesReader<R, S>
-where
-    R: AsyncRead,
-    S: RangeBounds<u64>,
-{
-    fn poll_read(
-        self: std::pin::Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-        buf: &mut tokio::io::ReadBuf<'_>,
-    ) -> Poll<std::io::Result<()>> {
-        let mut this = self.project();
-
-        // Use a loop, so we can deal with (multiple) state transitions.
-        loop {
-            match *this.state {
-                BytesPacketPosition::Size(LEN_SIZE) => {
-                    // used in case an invalid size was signalled.
-                    Err(std::io::Error::new(
-                        std::io::ErrorKind::InvalidData,
-                        "signalled package size not in allowed range",
-                    ))?
-                }
-                BytesPacketPosition::Size(pos) => {
-                    // try to read more of the size field.
-                    // We wrap a ReadBuf around this.payload_size here, and set_filled.
-                    let mut read_buf = tokio::io::ReadBuf::new(this.payload_size);
-                    read_buf.advance(pos);
-                    ready!(this.inner.as_mut().poll_read(cx, &mut read_buf))?;
-
-                    ensure_nonzero_bytes_read(read_buf.filled().len() - pos)?;
-
-                    let total_size_read = read_buf.filled().len();
-                    if total_size_read == LEN_SIZE {
-                        // If the entire payload size was read, parse it
-                        let payload_size = u64::from_le_bytes(*this.payload_size);
-
-                        if !this.allowed_size.contains(&payload_size) {
-                            // If it's not in the allowed
-                            // range, transition to failure mode
-                            // `BytesPacketPosition::Size(LEN_SIZE)`, where only
-                            // an error is returned.
-                            *this.state = BytesPacketPosition::Size(LEN_SIZE)
-                        } else if payload_size == 0 {
-                            // If the payload size is 0, move on to reading padding directly.
-                            *this.state = BytesPacketPosition::Padding(0)
-                        } else {
-                            // Else, transition to reading the payload.
-                            *this.state = BytesPacketPosition::Payload(0)
-                        }
-                    } else {
-                        // If we still need to read more of payload size, update
-                        // our position in the state.
-                        *this.state = BytesPacketPosition::Size(total_size_read)
-                    }
-                }
-                BytesPacketPosition::Payload(pos) => {
-                    let signalled_size = u64::from_le_bytes(*this.payload_size);
-                    // We don't enter this match arm at all if we're expecting empty payload
-                    debug_assert!(signalled_size > 0, "signalled size must be larger than 0");
-
-                    // Read from the underlying reader into buf
-                    // We cap the ReadBuf to the size of the payload, as we
-                    // don't want to leak padding to the caller.
-                    let bytes_read = ensure_nonzero_bytes_read({
-                        // Reducing these two u64 to usize on 32bits is fine - we
-                        // only care about not reading too much, not too less.
-                        let mut limited_buf = buf.take((signalled_size - pos) as usize);
-                        ready!(this.inner.as_mut().poll_read(cx, &mut limited_buf))?;
-                        limited_buf.filled().len()
-                    })?;
-
-                    // SAFETY: we just did populate this, but through limited_buf.
-                    unsafe { buf.assume_init(bytes_read) }
-                    buf.advance(bytes_read);
-
-                    if pos + bytes_read as u64 == signalled_size {
-                        // If we now read all payload, transition to padding
-                        // state.
-                        *this.state = BytesPacketPosition::Padding(0);
-                    } else {
-                        // if we didn't read everything yet, update our position
-                        // in the state.
-                        *this.state = BytesPacketPosition::Payload(pos + bytes_read as u64);
-                    }
-
-                    // We return from poll_read here.
-                    // This is important, as any error (or even Pending) from
-                    // the underlying reader on the next read (be it padding or
-                    // payload) would require us to roll back buf, as generally
-                    // a AsyncRead::poll_read may not advance the buffer in case
-                    // of a nonsuccessful read.
-                    // It can't be misinterpreted as EOF, as we definitely *did*
-                    // write something into buf if we come to here (we pass
-                    // `ensure_nonzero_bytes_read`).
-                    return Ok(()).into();
-                }
-                BytesPacketPosition::Padding(pos) => {
-                    // Consume whatever padding is left, ensuring it's all null
-                    // bytes. Only return `Ready(Ok(()))` once we're past the
-                    // padding (or in cases where polling the inner reader
-                    // returns `Poll::Pending`).
-                    let signalled_size = u64::from_le_bytes(*this.payload_size);
-                    let total_padding_len = padding_len(signalled_size) as usize;
-
-                    let padding_len_remaining = total_padding_len - pos;
-                    if padding_len_remaining != 0 {
-                        // create a buffer only accepting the number of remaining padding bytes.
-                        let mut buf = [0; 8];
-                        let mut padding_buf = tokio::io::ReadBuf::new(&mut buf);
-                        let mut padding_buf = padding_buf.take(padding_len_remaining);
-
-                        // read into padding_buf.
-                        ready!(this.inner.as_mut().poll_read(cx, &mut padding_buf))?;
-                        let bytes_read = ensure_nonzero_bytes_read(padding_buf.filled().len())?;
-
-                        *this.state = BytesPacketPosition::Padding(pos + bytes_read);
-
-                        // ensure the bytes are not null bytes
-                        if !padding_buf.filled().iter().all(|e| *e == b'\0') {
-                            return Err(std::io::Error::new(
-                                std::io::ErrorKind::InvalidData,
-                                "padding is not all zeroes",
-                            ))
-                            .into();
-                        }
-
-                        // if we still have padding to read, run the loop again.
-                        continue;
-                    }
-                    // return EOF
-                    return Ok(()).into();
-                }
-            }
-        }
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use std::time::Duration;
-
-    use crate::wire::bytes::write_bytes;
-    use hex_literal::hex;
-    use lazy_static::lazy_static;
-    use rstest::rstest;
-    use tokio::io::AsyncReadExt;
-    use tokio_test::{assert_err, io::Builder};
-
-    use super::*;
-
-    /// The maximum length of bytes packets we're willing to accept in the test
-    /// cases.
-    const MAX_LEN: u64 = 1024;
-
-    lazy_static! {
-        pub static ref LARGE_PAYLOAD: Vec<u8> = (0..255).collect::<Vec<u8>>().repeat(4 * 1024);
-    }
-
-    /// Helper function, calling the (simpler) write_bytes with the payload.
-    /// We use this to create data we want to read from the wire.
-    async fn produce_packet_bytes(payload: &[u8]) -> Vec<u8> {
-        let mut exp = vec![];
-        write_bytes(&mut exp, payload).await.unwrap();
-        exp
-    }
-
-    /// Read bytes packets of various length, and ensure read_to_end returns the
-    /// expected payload.
-    #[rstest]
-    #[case::empty(&[])] // empty bytes packet
-    #[case::size_1b(&[0xff])] // 1 bytes payload
-    #[case::size_8b(&hex!("0001020304050607"))] // 8 bytes payload (no padding)
-    #[case::size_9b( &hex!("000102030405060708"))] // 9 bytes payload (7 bytes padding)
-    #[case::size_1m(LARGE_PAYLOAD.as_slice())] // larger bytes packet
-    #[tokio::test]
-    async fn read_payload_correct(#[case] payload: &[u8]) {
-        let mut mock = Builder::new()
-            .read(&produce_packet_bytes(payload).await)
-            .build();
-
-        let mut r = BytesReader::new(&mut mock, ..=LARGE_PAYLOAD.len() as u64);
-        let mut buf = Vec::new();
-        r.read_to_end(&mut buf).await.expect("must succeed");
-
-        assert_eq!(payload, &buf[..]);
-    }
-
-    /// Fail if the bytes packet is larger than allowed
-    #[tokio::test]
-    async fn read_bigger_than_allowed_fail() {
-        let payload = LARGE_PAYLOAD.as_slice();
-        let mut mock = Builder::new()
-            .read(&produce_packet_bytes(payload).await[0..8]) // We stop reading after the size packet
-            .build();
-
-        let mut r = BytesReader::new(&mut mock, ..2048);
-        let mut buf = Vec::new();
-        assert_err!(r.read_to_end(&mut buf).await);
-    }
-
-    /// Fail if the bytes packet is smaller than allowed
-    #[tokio::test]
-    async fn read_smaller_than_allowed_fail() {
-        let payload = &[0x00, 0x01, 0x02];
-        let mut mock = Builder::new()
-            .read(&produce_packet_bytes(payload).await[0..8]) // We stop reading after the size packet
-            .build();
-
-        let mut r = BytesReader::new(&mut mock, 1024..2048);
-        let mut buf = Vec::new();
-        assert_err!(r.read_to_end(&mut buf).await);
-    }
-
-    /// Fail if the padding is not all zeroes
-    #[tokio::test]
-    async fn read_fail_if_nonzero_padding() {
-        let payload = &[0x00, 0x01, 0x02];
-        let mut packet_bytes = produce_packet_bytes(payload).await;
-        // Flip some bits in the padding
-        packet_bytes[12] = 0xff;
-        let mut mock = Builder::new().read(&packet_bytes).build(); // We stop reading after the faulty bit
-
-        let mut r = BytesReader::new(&mut mock, ..MAX_LEN);
-        let mut buf = Vec::new();
-
-        r.read_to_end(&mut buf).await.expect_err("must fail");
-    }
-
-    /// Start a 9 bytes payload packet, but have the underlying reader return
-    /// EOF in the middle of the size packet (after 4 bytes).
-    /// We should get an unexpected EOF error, already when trying to read the
-    /// first byte (of payload)
-    #[tokio::test]
-    async fn read_9b_eof_during_size() {
-        let payload = &hex!("FF0102030405060708");
-        let mut mock = Builder::new()
-            .read(&produce_packet_bytes(payload).await[..4])
-            .build();
-
-        let mut r = BytesReader::new(&mut mock, ..MAX_LEN);
-        let mut buf = [0u8; 1];
-
-        assert_eq!(
-            r.read_exact(&mut buf).await.expect_err("must fail").kind(),
-            std::io::ErrorKind::UnexpectedEof
-        );
-
-        assert_eq!(&[0], &buf, "buffer should stay empty");
-    }
-
-    /// Start a 9 bytes payload packet, but have the underlying reader return
-    /// EOF in the middle of the payload (4 bytes into the payload).
-    /// We should get an unexpected EOF error, after reading the first 4 bytes
-    /// (successfully).
-    #[tokio::test]
-    async fn read_9b_eof_during_payload() {
-        let payload = &hex!("FF0102030405060708");
-        let mut mock = Builder::new()
-            .read(&produce_packet_bytes(payload).await[..8 + 4])
-            .build();
-
-        let mut r = BytesReader::new(&mut mock, ..MAX_LEN);
-        let mut buf = [0; 9];
-
-        r.read_exact(&mut buf[..4]).await.expect("must succeed");
-
-        assert_eq!(
-            r.read_exact(&mut buf[4..=4])
-                .await
-                .expect_err("must fail")
-                .kind(),
-            std::io::ErrorKind::UnexpectedEof
-        );
-    }
-
-    /// Start a 9 bytes payload packet, but return an error at various stages *after* the actual payload.
-    /// read_exact with a 9 bytes buffer is expected to succeed, but any further
-    /// read, as well as read_to_end are expected to fail.
-    #[rstest]
-    #[case::before_padding(8 + 9)]
-    #[case::during_padding(8 + 9 + 2)]
-    #[case::after_padding(8 + 9 + padding_len(9) as usize)]
-    #[tokio::test]
-    async fn read_9b_eof_after_payload(#[case] offset: usize) {
-        let payload = &hex!("FF0102030405060708");
-        let mut mock = Builder::new()
-            .read(&produce_packet_bytes(payload).await[..offset])
-            .build();
-
-        let mut r = BytesReader::new(&mut mock, ..MAX_LEN);
-        let mut buf = [0; 9];
-
-        // read_exact of the payload will succeed, but a subsequent read will
-        // return UnexpectedEof error.
-        r.read_exact(&mut buf).await.expect("should succeed");
-        assert_eq!(
-            r.read_exact(&mut buf[4..=4])
-                .await
-                .expect_err("must fail")
-                .kind(),
-            std::io::ErrorKind::UnexpectedEof
-        );
-
-        // read_to_end will fail.
-        let mut mock = Builder::new()
-            .read(&produce_packet_bytes(payload).await[..8 + payload.len()])
-            .build();
-
-        let mut r = BytesReader::new(&mut mock, ..MAX_LEN);
-        let mut buf = Vec::new();
-        assert_eq!(
-            r.read_to_end(&mut buf).await.expect_err("must fail").kind(),
-            std::io::ErrorKind::UnexpectedEof
-        );
-    }
-
-    /// Start a 9 bytes payload packet, but return an error after a certain position.
-    /// Ensure that error is propagated.
-    #[rstest]
-    #[case::during_size(4)]
-    #[case::before_payload(8)]
-    #[case::during_payload(8 + 4)]
-    #[case::before_padding(8 + 4)]
-    #[case::during_padding(8 + 9 + 2)]
-    #[tokio::test]
-    async fn propagate_error_from_reader(#[case] offset: usize) {
-        let payload = &hex!("FF0102030405060708");
-        let mut mock = Builder::new()
-            .read(&produce_packet_bytes(payload).await[..offset])
-            .read_error(std::io::Error::new(std::io::ErrorKind::Other, "foo"))
-            .build();
-
-        let mut r = BytesReader::new(&mut mock, ..MAX_LEN);
-        let mut buf = Vec::new();
-
-        let err = r.read_to_end(&mut buf).await.expect_err("must fail");
-        assert_eq!(
-            err.kind(),
-            std::io::ErrorKind::Other,
-            "error kind must match"
-        );
-
-        assert_eq!(
-            err.into_inner().unwrap().to_string(),
-            "foo",
-            "error payload must contain foo"
-        );
-    }
-
-    /// If there's an error right after the padding, we don't propagate it, as
-    /// we're done reading. We just return EOF.
-    #[tokio::test]
-    async fn no_error_after_eof() {
-        let payload = &hex!("FF0102030405060708");
-        let mut mock = Builder::new()
-            .read(&produce_packet_bytes(payload).await)
-            .read_error(std::io::Error::new(std::io::ErrorKind::Other, "foo"))
-            .build();
-
-        let mut r = BytesReader::new(&mut mock, ..MAX_LEN);
-        let mut buf = Vec::new();
-
-        r.read_to_end(&mut buf).await.expect("must succeed");
-        assert_eq!(buf.as_slice(), payload);
-    }
-
-    /// Introduce various stalls in various places of the packet, to ensure we
-    /// handle these cases properly, too.
-    #[rstest]
-    #[case::beginning(0)]
-    #[case::before_payload(8)]
-    #[case::during_payload(8 + 4)]
-    #[case::before_padding(8 + 4)]
-    #[case::during_padding(8 + 9 + 2)]
-    #[tokio::test]
-    async fn read_payload_correct_pending(#[case] offset: usize) {
-        let payload = &hex!("FF0102030405060708");
-        let mut mock = Builder::new()
-            .read(&produce_packet_bytes(payload).await[..offset])
-            .wait(Duration::from_nanos(0))
-            .read(&produce_packet_bytes(payload).await[offset..])
-            .build();
-
-        let mut r = BytesReader::new(&mut mock, ..=LARGE_PAYLOAD.len() as u64);
-        let mut buf = Vec::new();
-        r.read_to_end(&mut buf).await.expect("must succeed");
-
-        assert_eq!(payload, &buf[..]);
-    }
-}
diff --git a/tvix/nix-compat/src/wire/bytes/reader/mod.rs b/tvix/nix-compat/src/wire/bytes/reader/mod.rs
new file mode 100644
index 0000000000..6bd376c06f
--- /dev/null
+++ b/tvix/nix-compat/src/wire/bytes/reader/mod.rs
@@ -0,0 +1,684 @@
+use std::{
+    future::Future,
+    io,
+    num::NonZeroU64,
+    ops::RangeBounds,
+    pin::Pin,
+    task::{self, ready, Poll},
+};
+use tokio::io::{AsyncBufRead, AsyncRead, AsyncReadExt, ReadBuf};
+
+use trailer::{read_trailer, ReadTrailer, Trailer};
+
+#[doc(hidden)]
+pub use self::trailer::Pad;
+pub(crate) use self::trailer::Tag;
+mod trailer;
+
+/// Reads a "bytes wire packet" from the underlying reader.
+/// The format is the same as in [crate::wire::bytes::read_bytes],
+/// however this structure provides a [AsyncRead] interface,
+/// allowing to not having to pass around the entire payload in memory.
+///
+/// It is constructed by reading a size with [BytesReader::new],
+/// and yields payload data until the end of the packet is reached.
+///
+/// It will not return the final bytes before all padding has been successfully
+/// consumed as well, but the full length of the reader must be consumed.
+///
+/// If the data is not read all the way to the end, or an error is encountered,
+/// the underlying reader is no longer usable and might return garbage.
+#[derive(Debug)]
+#[allow(private_bounds)]
+pub struct BytesReader<R, T: Tag = Pad> {
+    state: State<R, T>,
+}
+
+/// Split the `user_len` into `body_len` and `tail_len`, which are respectively
+/// the non-terminal 8-byte blocks, and the ≤8 bytes of user data contained in
+/// the trailer block.
+#[inline(always)]
+fn split_user_len(user_len: NonZeroU64) -> (u64, u8) {
+    let n = user_len.get() - 1;
+    let body_len = n & !7;
+    let tail_len = (n & 7) as u8 + 1;
+    (body_len, tail_len)
+}
+
+#[derive(Debug)]
+enum State<R, T: Tag> {
+    /// Full 8-byte blocks are being read and released to the caller.
+    /// NOTE: The final 8-byte block is *always* part of the trailer.
+    Body {
+        reader: Option<R>,
+        consumed: u64,
+        /// The total length of all user data contained in both the body and trailer.
+        user_len: NonZeroU64,
+    },
+    /// The trailer is in the process of being read.
+    ReadTrailer(ReadTrailer<R, T>),
+    /// The trailer has been fully read and validated,
+    /// and data can now be released to the caller.
+    ReleaseTrailer { consumed: u8, data: Trailer },
+}
+
+impl<R> BytesReader<R>
+where
+    R: AsyncRead + Unpin,
+{
+    /// Constructs a new BytesReader, using the underlying passed reader.
+    pub async fn new<S: RangeBounds<u64>>(reader: R, allowed_size: S) -> io::Result<Self> {
+        BytesReader::new_internal(reader, allowed_size).await
+    }
+}
+
+#[allow(private_bounds)]
+impl<R, T: Tag> BytesReader<R, T>
+where
+    R: AsyncRead + Unpin,
+{
+    /// Constructs a new BytesReader, using the underlying passed reader.
+    pub(crate) async fn new_internal<S: RangeBounds<u64>>(
+        mut reader: R,
+        allowed_size: S,
+    ) -> io::Result<Self> {
+        let size = reader.read_u64_le().await?;
+
+        if !allowed_size.contains(&size) {
+            return Err(io::Error::new(io::ErrorKind::InvalidData, "invalid size"));
+        }
+
+        Ok(Self {
+            state: match NonZeroU64::new(size) {
+                Some(size) => State::Body {
+                    reader: Some(reader),
+                    consumed: 0,
+                    user_len: size,
+                },
+                None => State::ReleaseTrailer {
+                    consumed: 0,
+                    data: read_trailer::<R, T>(reader, 0).await?,
+                },
+            },
+        })
+    }
+
+    /// Returns whether there is any remaining data to be read.
+    pub fn is_empty(&self) -> bool {
+        self.len() == 0
+    }
+
+    /// Remaining data length, ie not including data already read.
+    ///
+    /// If the size has not been read yet, this is [None].
+    pub fn len(&self) -> u64 {
+        match self.state {
+            State::Body {
+                consumed, user_len, ..
+            } => user_len.get() - consumed,
+            State::ReadTrailer(ref fut) => fut.len() as u64,
+            State::ReleaseTrailer { consumed, ref data } => data.len() as u64 - consumed as u64,
+        }
+    }
+}
+
+#[allow(private_bounds)]
+impl<R: AsyncRead + Unpin, T: Tag> AsyncRead for BytesReader<R, T> {
+    fn poll_read(
+        mut self: Pin<&mut Self>,
+        cx: &mut task::Context,
+        buf: &mut ReadBuf,
+    ) -> Poll<io::Result<()>> {
+        let this = &mut self.state;
+
+        loop {
+            match this {
+                State::Body {
+                    reader,
+                    consumed,
+                    user_len,
+                } => {
+                    let (body_len, tail_len) = split_user_len(*user_len);
+                    let remaining = body_len - *consumed;
+
+                    let reader = if remaining == 0 {
+                        let reader = reader.take().unwrap();
+                        *this = State::ReadTrailer(read_trailer(reader, tail_len));
+                        continue;
+                    } else {
+                        Pin::new(reader.as_mut().unwrap())
+                    };
+
+                    let mut bytes_read = 0;
+                    ready!(with_limited(buf, remaining, |buf| {
+                        let ret = reader.poll_read(cx, buf);
+                        bytes_read = buf.initialized().len();
+                        ret
+                    }))?;
+
+                    *consumed += bytes_read as u64;
+
+                    return if bytes_read != 0 {
+                        Ok(())
+                    } else {
+                        Err(io::ErrorKind::UnexpectedEof.into())
+                    }
+                    .into();
+                }
+                State::ReadTrailer(fut) => {
+                    *this = State::ReleaseTrailer {
+                        consumed: 0,
+                        data: ready!(Pin::new(fut).poll(cx))?,
+                    };
+                }
+                State::ReleaseTrailer { consumed, data } => {
+                    let data = &data[*consumed as usize..];
+                    let data = &data[..usize::min(data.len(), buf.remaining())];
+
+                    buf.put_slice(data);
+                    *consumed += data.len() as u8;
+
+                    return Ok(()).into();
+                }
+            }
+        }
+    }
+}
+
+#[allow(private_bounds)]
+impl<R: AsyncBufRead + Unpin, T: Tag> AsyncBufRead for BytesReader<R, T> {
+    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<io::Result<&[u8]>> {
+        let this = &mut self.get_mut().state;
+
+        loop {
+            match this {
+                // This state comes *after* the following case,
+                // but we can't keep it in logical order because
+                // that would lengthen the borrow lifetime.
+                State::Body {
+                    reader,
+                    consumed,
+                    user_len,
+                } if {
+                    let (body_len, _) = split_user_len(*user_len);
+                    let remaining = body_len - *consumed;
+
+                    remaining == 0
+                } =>
+                {
+                    let reader = reader.take().unwrap();
+                    let (_, tail_len) = split_user_len(*user_len);
+
+                    *this = State::ReadTrailer(read_trailer(reader, tail_len));
+                }
+                State::Body {
+                    reader,
+                    consumed,
+                    user_len,
+                } => {
+                    let (body_len, _) = split_user_len(*user_len);
+                    let remaining = body_len - *consumed;
+
+                    let reader = Pin::new(reader.as_mut().unwrap());
+
+                    match ready!(reader.poll_fill_buf(cx))? {
+                        &[] => {
+                            return Err(io::ErrorKind::UnexpectedEof.into()).into();
+                        }
+                        mut buf => {
+                            if buf.len() as u64 > remaining {
+                                buf = &buf[..remaining as usize];
+                            }
+
+                            return Ok(buf).into();
+                        }
+                    }
+                }
+                State::ReadTrailer(fut) => {
+                    *this = State::ReleaseTrailer {
+                        consumed: 0,
+                        data: ready!(Pin::new(fut).poll(cx))?,
+                    };
+                }
+                State::ReleaseTrailer { consumed, data } => {
+                    return Ok(&data[*consumed as usize..]).into();
+                }
+            }
+        }
+    }
+
+    fn consume(mut self: Pin<&mut Self>, amt: usize) {
+        match &mut self.state {
+            State::Body {
+                reader,
+                consumed,
+                user_len,
+            } => {
+                let reader = Pin::new(reader.as_mut().unwrap());
+                let (body_len, _) = split_user_len(*user_len);
+
+                *consumed = consumed
+                    .checked_add(amt as u64)
+                    .filter(|&consumed| consumed <= body_len)
+                    .expect("consumed out of bounds");
+
+                reader.consume(amt);
+            }
+            State::ReadTrailer(_) => unreachable!(),
+            State::ReleaseTrailer { consumed, data } => {
+                *consumed = amt
+                    .checked_add(*consumed as usize)
+                    .filter(|&consumed| consumed <= data.len())
+                    .expect("consumed out of bounds") as u8;
+            }
+        }
+    }
+}
+
+/// Make a limited version of `buf`, consisting only of up to `n` bytes of the unfilled section, and call `f` with it.
+/// After `f` returns, we propagate the filled cursor advancement back to `buf`.
+fn with_limited<R>(buf: &mut ReadBuf, n: u64, f: impl FnOnce(&mut ReadBuf) -> R) -> R {
+    let mut nbuf = buf.take(n.try_into().unwrap_or(usize::MAX));
+    let ptr = nbuf.initialized().as_ptr();
+    let ret = f(&mut nbuf);
+
+    // SAFETY: `ReadBuf::take` only returns the *unfilled* section of `buf`,
+    // so anything filled is new, initialized data.
+    //
+    // We verify that `nbuf` still points to the same buffer,
+    // so we're sure it hasn't been swapped out.
+    unsafe {
+        // ensure our buffer hasn't been swapped out
+        assert_eq!(nbuf.initialized().as_ptr(), ptr);
+
+        let n = nbuf.filled().len();
+        buf.assume_init(n);
+        buf.advance(n);
+    }
+
+    ret
+}
+
+#[cfg(test)]
+mod tests {
+    use std::time::Duration;
+
+    use crate::wire::bytes::{padding_len, write_bytes};
+    use hex_literal::hex;
+    use lazy_static::lazy_static;
+    use rstest::rstest;
+    use tokio::io::{AsyncReadExt, BufReader};
+    use tokio_test::io::Builder;
+
+    use super::*;
+
+    /// The maximum length of bytes packets we're willing to accept in the test
+    /// cases.
+    const MAX_LEN: u64 = 1024;
+
+    lazy_static! {
+        pub static ref LARGE_PAYLOAD: Vec<u8> = (0..255).collect::<Vec<u8>>().repeat(4 * 1024);
+    }
+
+    /// Helper function, calling the (simpler) write_bytes with the payload.
+    /// We use this to create data we want to read from the wire.
+    async fn produce_packet_bytes(payload: &[u8]) -> Vec<u8> {
+        let mut exp = vec![];
+        write_bytes(&mut exp, payload).await.unwrap();
+        exp
+    }
+
+    /// Read bytes packets of various length, and ensure read_to_end returns the
+    /// expected payload.
+    #[rstest]
+    #[case::empty(&[])] // empty bytes packet
+    #[case::size_1b(&[0xff])] // 1 bytes payload
+    #[case::size_8b(&hex!("0001020304050607"))] // 8 bytes payload (no padding)
+    #[case::size_9b(&hex!("000102030405060708"))] // 9 bytes payload (7 bytes padding)
+    #[case::size_1m(LARGE_PAYLOAD.as_slice())] // larger bytes packet
+    #[tokio::test]
+    async fn read_payload_correct(#[case] payload: &[u8]) {
+        let mut mock = Builder::new()
+            .read(&produce_packet_bytes(payload).await)
+            .build();
+
+        let mut r = BytesReader::new(&mut mock, ..=LARGE_PAYLOAD.len() as u64)
+            .await
+            .unwrap();
+        let mut buf = Vec::new();
+        r.read_to_end(&mut buf).await.expect("must succeed");
+
+        assert_eq!(payload, &buf[..]);
+    }
+
+    /// Read bytes packets of various length, and ensure copy_buf reads the
+    /// expected payload.
+    #[rstest]
+    #[case::empty(&[])] // empty bytes packet
+    #[case::size_1b(&[0xff])] // 1 bytes payload
+    #[case::size_8b(&hex!("0001020304050607"))] // 8 bytes payload (no padding)
+    #[case::size_9b(&hex!("000102030405060708"))] // 9 bytes payload (7 bytes padding)
+    #[case::size_1m(LARGE_PAYLOAD.as_slice())] // larger bytes packet
+    #[tokio::test]
+    async fn read_payload_correct_readbuf(#[case] payload: &[u8]) {
+        let mut mock = BufReader::new(
+            Builder::new()
+                .read(&produce_packet_bytes(payload).await)
+                .build(),
+        );
+
+        let mut r = BytesReader::new(&mut mock, ..=LARGE_PAYLOAD.len() as u64)
+            .await
+            .unwrap();
+
+        let mut buf = Vec::new();
+        tokio::io::copy_buf(&mut r, &mut buf)
+            .await
+            .expect("copy_buf must succeed");
+
+        assert_eq!(payload, &buf[..]);
+    }
+
+    /// Fail if the bytes packet is larger than allowed
+    #[tokio::test]
+    async fn read_bigger_than_allowed_fail() {
+        let payload = LARGE_PAYLOAD.as_slice();
+        let mut mock = Builder::new()
+            .read(&produce_packet_bytes(payload).await[0..8]) // We stop reading after the size packet
+            .build();
+
+        assert_eq!(
+            BytesReader::new(&mut mock, ..2048)
+                .await
+                .unwrap_err()
+                .kind(),
+            io::ErrorKind::InvalidData
+        );
+    }
+
+    /// Fail if the bytes packet is smaller than allowed
+    #[tokio::test]
+    async fn read_smaller_than_allowed_fail() {
+        let payload = &[0x00, 0x01, 0x02];
+        let mut mock = Builder::new()
+            .read(&produce_packet_bytes(payload).await[0..8]) // We stop reading after the size packet
+            .build();
+
+        assert_eq!(
+            BytesReader::new(&mut mock, 1024..2048)
+                .await
+                .unwrap_err()
+                .kind(),
+            io::ErrorKind::InvalidData
+        );
+    }
+
+    /// Read the trailer immediately if there is no payload.
+    #[tokio::test]
+    async fn read_trailer_immediately() {
+        use crate::nar::wire::PadPar;
+
+        let mut mock = Builder::new()
+            .read(&[0; 8])
+            .read(&PadPar::PATTERN[8..])
+            .build();
+
+        BytesReader::<_, PadPar>::new_internal(&mut mock, ..)
+            .await
+            .unwrap();
+
+        // The mock reader will panic if dropped without reading all data.
+    }
+
+    /// Read the trailer even if we only read the exact payload size.
+    #[tokio::test]
+    async fn read_exact_trailer() {
+        use crate::nar::wire::PadPar;
+
+        let mut mock = Builder::new()
+            .read(&16u64.to_le_bytes())
+            .read(&[0x55; 16])
+            .read(&PadPar::PATTERN[8..])
+            .build();
+
+        let mut reader = BytesReader::<_, PadPar>::new_internal(&mut mock, ..)
+            .await
+            .unwrap();
+
+        let mut buf = [0; 16];
+        reader.read_exact(&mut buf).await.unwrap();
+        assert_eq!(buf, [0x55; 16]);
+
+        // The mock reader will panic if dropped without reading all data.
+    }
+
+    /// Fail if the padding is not all zeroes
+    #[tokio::test]
+    async fn read_fail_if_nonzero_padding() {
+        let payload = &[0x00, 0x01, 0x02];
+        let mut packet_bytes = produce_packet_bytes(payload).await;
+        // Flip some bits in the padding
+        packet_bytes[12] = 0xff;
+        let mut mock = Builder::new().read(&packet_bytes).build(); // We stop reading after the faulty bit
+
+        let mut r = BytesReader::new(&mut mock, ..MAX_LEN).await.unwrap();
+        let mut buf = Vec::new();
+
+        r.read_to_end(&mut buf).await.expect_err("must fail");
+    }
+
+    /// Start a 9 bytes payload packet, but have the underlying reader return
+    /// EOF in the middle of the size packet (after 4 bytes).
+    /// We should get an unexpected EOF error, already when trying to read the
+    /// first byte (of payload)
+    #[tokio::test]
+    async fn read_9b_eof_during_size() {
+        let payload = &hex!("FF0102030405060708");
+        let mut mock = Builder::new()
+            .read(&produce_packet_bytes(payload).await[..4])
+            .build();
+
+        assert_eq!(
+            BytesReader::new(&mut mock, ..MAX_LEN)
+                .await
+                .expect_err("must fail")
+                .kind(),
+            io::ErrorKind::UnexpectedEof
+        );
+    }
+
+    /// Start a 9 bytes payload packet, but have the underlying reader return
+    /// EOF in the middle of the payload (4 bytes into the payload).
+    /// We should get an unexpected EOF error, after reading the first 4 bytes
+    /// (successfully).
+    #[tokio::test]
+    async fn read_9b_eof_during_payload() {
+        let payload = &hex!("FF0102030405060708");
+        let mut mock = Builder::new()
+            .read(&produce_packet_bytes(payload).await[..8 + 4])
+            .build();
+
+        let mut r = BytesReader::new(&mut mock, ..MAX_LEN).await.unwrap();
+        let mut buf = [0; 9];
+
+        r.read_exact(&mut buf[..4]).await.expect("must succeed");
+
+        assert_eq!(
+            r.read_exact(&mut buf[4..=4])
+                .await
+                .expect_err("must fail")
+                .kind(),
+            std::io::ErrorKind::UnexpectedEof
+        );
+    }
+
+    /// Start a 9 bytes payload packet, but don't supply the necessary padding.
+    /// This is expected to always fail before returning the final data.
+    #[rstest]
+    #[case::before_padding(8 + 9)]
+    #[case::during_padding(8 + 9 + 2)]
+    #[case::after_padding(8 + 9 + padding_len(9) as usize - 1)]
+    #[tokio::test]
+    async fn read_9b_eof_after_payload(#[case] offset: usize) {
+        let payload = &hex!("FF0102030405060708");
+        let mut mock = Builder::new()
+            .read(&produce_packet_bytes(payload).await[..offset])
+            .build();
+
+        let mut r = BytesReader::new(&mut mock, ..MAX_LEN).await.unwrap();
+
+        // read_exact of the payload *body* will succeed, but a subsequent read will
+        // return UnexpectedEof error.
+        assert_eq!(r.read_exact(&mut [0; 8]).await.unwrap(), 8);
+        assert_eq!(
+            r.read_exact(&mut [0]).await.unwrap_err().kind(),
+            std::io::ErrorKind::UnexpectedEof
+        );
+    }
+
+    /// Start a 9 bytes payload packet, but return an error after a certain position.
+    /// Ensure that error is propagated.
+    #[rstest]
+    #[case::during_size(4)]
+    #[case::before_payload(8)]
+    #[case::during_payload(8 + 4)]
+    #[case::before_padding(8 + 4)]
+    #[case::during_padding(8 + 9 + 2)]
+    #[tokio::test]
+    async fn propagate_error_from_reader(#[case] offset: usize) {
+        let payload = &hex!("FF0102030405060708");
+        let mut mock = Builder::new()
+            .read(&produce_packet_bytes(payload).await[..offset])
+            .read_error(std::io::Error::new(std::io::ErrorKind::Other, "foo"))
+            .build();
+
+        // Either length reading or data reading can fail, depending on which test case we're in.
+        let err: io::Error = async {
+            let mut r = BytesReader::new(&mut mock, ..MAX_LEN).await?;
+            let mut buf = Vec::new();
+
+            r.read_to_end(&mut buf).await?;
+
+            Ok(())
+        }
+        .await
+        .expect_err("must fail");
+
+        assert_eq!(
+            err.kind(),
+            std::io::ErrorKind::Other,
+            "error kind must match"
+        );
+
+        assert_eq!(
+            err.into_inner().unwrap().to_string(),
+            "foo",
+            "error payload must contain foo"
+        );
+    }
+
+    /// Start a 9 bytes payload packet, but return an error after a certain position.
+    /// Ensure that error is propagated (AsyncReadBuf case)
+    #[rstest]
+    #[case::during_size(4)]
+    #[case::before_payload(8)]
+    #[case::during_payload(8 + 4)]
+    #[case::before_padding(8 + 4)]
+    #[case::during_padding(8 + 9 + 2)]
+    #[tokio::test]
+    async fn propagate_error_from_reader_buffered(#[case] offset: usize) {
+        let payload = &hex!("FF0102030405060708");
+        let mock = Builder::new()
+            .read(&produce_packet_bytes(payload).await[..offset])
+            .read_error(std::io::Error::new(std::io::ErrorKind::Other, "foo"))
+            .build();
+        let mut mock = BufReader::new(mock);
+
+        // Either length reading or data reading can fail, depending on which test case we're in.
+        let err: io::Error = async {
+            let mut r = BytesReader::new(&mut mock, ..MAX_LEN).await?;
+            let mut buf = Vec::new();
+
+            tokio::io::copy_buf(&mut r, &mut buf).await?;
+
+            Ok(())
+        }
+        .await
+        .expect_err("must fail");
+
+        assert_eq!(
+            err.kind(),
+            std::io::ErrorKind::Other,
+            "error kind must match"
+        );
+
+        assert_eq!(
+            err.into_inner().unwrap().to_string(),
+            "foo",
+            "error payload must contain foo"
+        );
+    }
+
+    /// If there's an error right after the padding, we don't propagate it, as
+    /// we're done reading. We just return EOF.
+    #[tokio::test]
+    async fn no_error_after_eof() {
+        let payload = &hex!("FF0102030405060708");
+        let mut mock = Builder::new()
+            .read(&produce_packet_bytes(payload).await)
+            .read_error(std::io::Error::new(std::io::ErrorKind::Other, "foo"))
+            .build();
+
+        let mut r = BytesReader::new(&mut mock, ..MAX_LEN).await.unwrap();
+        let mut buf = Vec::new();
+
+        r.read_to_end(&mut buf).await.expect("must succeed");
+        assert_eq!(buf.as_slice(), payload);
+    }
+
+    /// If there's an error right after the padding, we don't propagate it, as
+    /// we're done reading. We just return EOF.
+    #[tokio::test]
+    async fn no_error_after_eof_buffered() {
+        let payload = &hex!("FF0102030405060708");
+        let mock = Builder::new()
+            .read(&produce_packet_bytes(payload).await)
+            .read_error(std::io::Error::new(std::io::ErrorKind::Other, "foo"))
+            .build();
+        let mut mock = BufReader::new(mock);
+
+        let mut r = BytesReader::new(&mut mock, ..MAX_LEN).await.unwrap();
+        let mut buf = Vec::new();
+
+        tokio::io::copy_buf(&mut r, &mut buf)
+            .await
+            .expect("must succeed");
+        assert_eq!(buf.as_slice(), payload);
+    }
+
+    /// Introduce various stalls in various places of the packet, to ensure we
+    /// handle these cases properly, too.
+    #[rstest]
+    #[case::beginning(0)]
+    #[case::before_payload(8)]
+    #[case::during_payload(8 + 4)]
+    #[case::before_padding(8 + 4)]
+    #[case::during_padding(8 + 9 + 2)]
+    #[tokio::test]
+    async fn read_payload_correct_pending(#[case] offset: usize) {
+        let payload = &hex!("FF0102030405060708");
+        let mut mock = Builder::new()
+            .read(&produce_packet_bytes(payload).await[..offset])
+            .wait(Duration::from_nanos(0))
+            .read(&produce_packet_bytes(payload).await[offset..])
+            .build();
+
+        let mut r = BytesReader::new(&mut mock, ..=LARGE_PAYLOAD.len() as u64)
+            .await
+            .unwrap();
+        let mut buf = Vec::new();
+        r.read_to_end(&mut buf).await.expect("must succeed");
+
+        assert_eq!(payload, &buf[..]);
+    }
+}
diff --git a/tvix/nix-compat/src/wire/bytes/reader/trailer.rs b/tvix/nix-compat/src/wire/bytes/reader/trailer.rs
new file mode 100644
index 0000000000..3a5bb75e71
--- /dev/null
+++ b/tvix/nix-compat/src/wire/bytes/reader/trailer.rs
@@ -0,0 +1,197 @@
+use std::{
+    fmt::Debug,
+    future::Future,
+    marker::PhantomData,
+    ops::Deref,
+    pin::Pin,
+    task::{self, ready, Poll},
+};
+
+use tokio::io::{self, AsyncRead, ReadBuf};
+
+/// Trailer represents up to 8 bytes of data read as part of the trailer block(s)
+#[derive(Debug)]
+pub(crate) struct Trailer {
+    data_len: u8,
+    buf: [u8; 8],
+}
+
+impl Deref for Trailer {
+    type Target = [u8];
+
+    fn deref(&self) -> &Self::Target {
+        &self.buf[..self.data_len as usize]
+    }
+}
+
+/// Tag defines a "trailer tag": specific, fixed bytes that must follow wire data.
+pub(crate) trait Tag {
+    /// The expected suffix
+    ///
+    /// The first 8 bytes may be ignored, and it must be an 8-byte aligned size.
+    const PATTERN: &'static [u8];
+
+    /// Suitably sized buffer for reading [Self::PATTERN]
+    ///
+    /// HACK: This is a workaround for const generics limitations.
+    type Buf: AsRef<[u8]> + AsMut<[u8]> + Debug + Unpin;
+
+    /// Make an instance of [Self::Buf]
+    fn make_buf() -> Self::Buf;
+}
+
+#[derive(Debug)]
+pub enum Pad {}
+
+impl Tag for Pad {
+    const PATTERN: &'static [u8] = &[0; 8];
+
+    type Buf = [u8; 8];
+
+    fn make_buf() -> Self::Buf {
+        [0; 8]
+    }
+}
+
+#[derive(Debug)]
+pub(crate) struct ReadTrailer<R, T: Tag> {
+    reader: R,
+    data_len: u8,
+    filled: u8,
+    buf: T::Buf,
+    _phantom: PhantomData<fn(T) -> T>,
+}
+
+/// read_trailer returns a [Future] that reads a trailer with a given [Tag] from `reader`
+pub(crate) fn read_trailer<R: AsyncRead + Unpin, T: Tag>(
+    reader: R,
+    data_len: u8,
+) -> ReadTrailer<R, T> {
+    assert!(data_len <= 8, "payload in trailer must be <= 8 bytes");
+
+    let buf = T::make_buf();
+    assert_eq!(buf.as_ref().len(), T::PATTERN.len());
+    assert_eq!(T::PATTERN.len() % 8, 0);
+
+    ReadTrailer {
+        reader,
+        data_len,
+        filled: if data_len != 0 { 0 } else { 8 },
+        buf,
+        _phantom: PhantomData,
+    }
+}
+
+impl<R, T: Tag> ReadTrailer<R, T> {
+    pub fn len(&self) -> u8 {
+        self.data_len
+    }
+}
+
+impl<R: AsyncRead + Unpin, T: Tag> Future for ReadTrailer<R, T> {
+    type Output = io::Result<Trailer>;
+
+    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Self::Output> {
+        let this = &mut *self;
+
+        loop {
+            if this.filled >= this.data_len {
+                let check_range = || this.data_len as usize..this.filled as usize;
+
+                if this.buf.as_ref()[check_range()] != T::PATTERN[check_range()] {
+                    return Err(io::Error::new(
+                        io::ErrorKind::InvalidData,
+                        "invalid trailer",
+                    ))
+                    .into();
+                }
+            }
+
+            if this.filled as usize == T::PATTERN.len() {
+                let mut buf = [0; 8];
+                buf.copy_from_slice(&this.buf.as_ref()[..8]);
+
+                return Ok(Trailer {
+                    data_len: this.data_len,
+                    buf,
+                })
+                .into();
+            }
+
+            let mut buf = ReadBuf::new(this.buf.as_mut());
+            buf.advance(this.filled as usize);
+
+            ready!(Pin::new(&mut this.reader).poll_read(cx, &mut buf))?;
+
+            this.filled = {
+                let filled = buf.filled().len() as u8;
+
+                if filled == this.filled {
+                    return Err(io::ErrorKind::UnexpectedEof.into()).into();
+                }
+
+                filled
+            };
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::time::Duration;
+
+    use super::*;
+
+    #[tokio::test]
+    async fn unexpected_eof() {
+        let reader = tokio_test::io::Builder::new()
+            .read(&[0xed])
+            .wait(Duration::ZERO)
+            .read(&[0xef, 0x00])
+            .build();
+
+        assert_eq!(
+            read_trailer::<_, Pad>(reader, 2).await.unwrap_err().kind(),
+            io::ErrorKind::UnexpectedEof
+        );
+    }
+
+    #[tokio::test]
+    async fn invalid_padding() {
+        let reader = tokio_test::io::Builder::new()
+            .read(&[0xed])
+            .wait(Duration::ZERO)
+            .read(&[0xef, 0x01, 0x00])
+            .wait(Duration::ZERO)
+            .build();
+
+        assert_eq!(
+            read_trailer::<_, Pad>(reader, 2).await.unwrap_err().kind(),
+            io::ErrorKind::InvalidData
+        );
+    }
+
+    #[tokio::test]
+    async fn success() {
+        let reader = tokio_test::io::Builder::new()
+            .read(&[0xed])
+            .wait(Duration::ZERO)
+            .read(&[0xef, 0x00])
+            .wait(Duration::ZERO)
+            .read(&[0x00, 0x00, 0x00, 0x00, 0x00])
+            .build();
+
+        assert_eq!(
+            &*read_trailer::<_, Pad>(reader, 2).await.unwrap(),
+            &[0xed, 0xef]
+        );
+    }
+
+    #[tokio::test]
+    async fn no_padding() {
+        assert!(read_trailer::<_, Pad>(io::empty(), 0)
+            .await
+            .unwrap()
+            .is_empty());
+    }
+}
diff --git a/tvix/nix-compat/src/wire/bytes/writer.rs b/tvix/nix-compat/src/wire/bytes/writer.rs
index 347934b3dc..f5632771e9 100644
--- a/tvix/nix-compat/src/wire/bytes/writer.rs
+++ b/tvix/nix-compat/src/wire/bytes/writer.rs
@@ -3,7 +3,7 @@ use std::task::{ready, Poll};
 
 use tokio::io::AsyncWrite;
 
-use super::{padding_len, BytesPacketPosition, EMPTY_BYTES, LEN_SIZE};
+use super::{padding_len, EMPTY_BYTES, LEN_SIZE};
 
 pin_project! {
     /// Writes a "bytes wire packet" to the underlying writer.
@@ -41,6 +41,22 @@ pin_project! {
     }
 }
 
+/// Models the position inside a "bytes wire packet" that the writer is in.
+/// It can be in three different stages, inside size, payload or padding fields.
+/// The number tracks the number of bytes written inside the specific field.
+/// There shall be no ambiguous states, at the end of a stage we immediately
+/// move to the beginning of the next one:
+/// - Size(LEN_SIZE) must be expressed as Payload(0)
+/// - Payload(self.payload_len) must be expressed as Padding(0)
+///
+/// Padding(padding_len) means we're at the end of the bytes wire packet.
+#[derive(Clone, Debug, PartialEq, Eq)]
+enum BytesPacketPosition {
+    Size(usize),
+    Payload(u64),
+    Padding(usize),
+}
+
 impl<W> BytesWriter<W>
 where
     W: AsyncWrite,
diff --git a/tvix/nix-compat/src/wire/mod.rs b/tvix/nix-compat/src/wire/mod.rs
index 65c053d58e..a197e3a1f4 100644
--- a/tvix/nix-compat/src/wire/mod.rs
+++ b/tvix/nix-compat/src/wire/mod.rs
@@ -3,6 +3,3 @@
 
 mod bytes;
 pub use bytes::*;
-
-mod primitive;
-pub use primitive::*;
diff --git a/tvix/nix-compat/src/wire/primitive.rs b/tvix/nix-compat/src/wire/primitive.rs
deleted file mode 100644
index ee0f5fc427..0000000000
--- a/tvix/nix-compat/src/wire/primitive.rs
+++ /dev/null
@@ -1,74 +0,0 @@
-// SPDX-FileCopyrightText: 2023 embr <git@liclac.eu>
-//
-// SPDX-License-Identifier: EUPL-1.2
-
-use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
-
-#[allow(dead_code)]
-/// Read a u64 from the AsyncRead (little endian).
-pub async fn read_u64<R: AsyncReadExt + Unpin>(r: &mut R) -> std::io::Result<u64> {
-    r.read_u64_le().await
-}
-
-/// Write a u64 to the AsyncWrite (little endian).
-pub async fn write_u64<W: AsyncWrite + Unpin>(w: &mut W, v: u64) -> std::io::Result<()> {
-    w.write_u64_le(v).await
-}
-
-#[allow(dead_code)]
-/// Read a boolean from the AsyncRead, encoded as u64 (>0 is true).
-pub async fn read_bool<R: AsyncRead + Unpin>(r: &mut R) -> std::io::Result<bool> {
-    Ok(read_u64(r).await? > 0)
-}
-
-#[allow(dead_code)]
-/// Write a boolean to the AsyncWrite, encoded as u64 (>0 is true).
-pub async fn write_bool<W: AsyncWrite + Unpin>(w: &mut W, v: bool) -> std::io::Result<()> {
-    write_u64(w, if v { 1u64 } else { 0u64 }).await
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use tokio_test::io::Builder;
-
-    // Integers.
-    #[tokio::test]
-    async fn test_read_u64() {
-        let mut mock = Builder::new().read(&1234567890u64.to_le_bytes()).build();
-        assert_eq!(1234567890u64, read_u64(&mut mock).await.unwrap());
-    }
-    #[tokio::test]
-    async fn test_write_u64() {
-        let mut mock = Builder::new().write(&1234567890u64.to_le_bytes()).build();
-        write_u64(&mut mock, 1234567890).await.unwrap();
-    }
-
-    // Booleans.
-    #[tokio::test]
-    async fn test_read_bool_0() {
-        let mut mock = Builder::new().read(&0u64.to_le_bytes()).build();
-        assert!(!read_bool(&mut mock).await.unwrap());
-    }
-    #[tokio::test]
-    async fn test_read_bool_1() {
-        let mut mock = Builder::new().read(&1u64.to_le_bytes()).build();
-        assert!(read_bool(&mut mock).await.unwrap());
-    }
-    #[tokio::test]
-    async fn test_read_bool_2() {
-        let mut mock = Builder::new().read(&2u64.to_le_bytes()).build();
-        assert!(read_bool(&mut mock).await.unwrap());
-    }
-
-    #[tokio::test]
-    async fn test_write_bool_false() {
-        let mut mock = Builder::new().write(&0u64.to_le_bytes()).build();
-        write_bool(&mut mock, false).await.unwrap();
-    }
-    #[tokio::test]
-    async fn test_write_bool_true() {
-        let mut mock = Builder::new().write(&1u64.to_le_bytes()).build();
-        write_bool(&mut mock, true).await.unwrap();
-    }
-}