about summary refs log tree commit diff
path: root/tvix/nix-compat
diff options
context:
space:
mode:
authorBrian Olsen <brian@maven-group.org>2024-11-03T19·42+0100
committerclbot <clbot@tvl.fyi>2024-11-04T20·02+0000
commitb88579ade41244b09555bbb68296033fc300043f (patch)
tree4bfeb72e232a711e1d632a907a80b6fb4d0d6ba0 /tvix/nix-compat
parent6582fa69f15c8337cb3a16e74062037a7278020f (diff)
feat(tvix/nix-compat): Add nix serialization support r/8893
This change implements the serialization part that is needed to
implement the nix daemon protocol. Previously was add deserialization
and derivers for that and this then adds the other part of that equation
so that you can write types that can then be read using deserialization.

Change-Id: I2917de634980a93822a4f5a8ad38897b9ce16d89
Reviewed-on: https://cl.tvl.fyi/c/depot/+/12729
Autosubmit: Brian Olsen <me@griff.name>
Reviewed-by: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Diffstat (limited to 'tvix/nix-compat')
-rw-r--r--tvix/nix-compat/Cargo.toml1
-rw-r--r--tvix/nix-compat/src/nix_daemon/mod.rs1
-rw-r--r--tvix/nix-compat/src/nix_daemon/ser/bytes.rs89
-rw-r--r--tvix/nix-compat/src/nix_daemon/ser/collections.rs94
-rw-r--r--tvix/nix-compat/src/nix_daemon/ser/display.rs8
-rw-r--r--tvix/nix-compat/src/nix_daemon/ser/int.rs108
-rw-r--r--tvix/nix-compat/src/nix_daemon/ser/mock.rs672
-rw-r--r--tvix/nix-compat/src/nix_daemon/ser/mod.rs124
-rw-r--r--tvix/nix-compat/src/nix_daemon/ser/writer.rs308
-rw-r--r--tvix/nix-compat/src/wire/bytes/mod.rs2
10 files changed, 1406 insertions, 1 deletions
diff --git a/tvix/nix-compat/Cargo.toml b/tvix/nix-compat/Cargo.toml
index f430a5461829..cbbf97175d14 100644
--- a/tvix/nix-compat/Cargo.toml
+++ b/tvix/nix-compat/Cargo.toml
@@ -43,6 +43,7 @@ futures = { workspace = true }
 hex-literal = { workspace = true }
 mimalloc = { workspace = true }
 pretty_assertions = { workspace = true }
+proptest = { workspace = true, features = ["std", "alloc", "tempfile"] }
 rstest = { workspace = true }
 serde_json = { workspace = true }
 smol_str = { workspace = true }
diff --git a/tvix/nix-compat/src/nix_daemon/mod.rs b/tvix/nix-compat/src/nix_daemon/mod.rs
index 11413e85fd1b..a943b279f891 100644
--- a/tvix/nix-compat/src/nix_daemon/mod.rs
+++ b/tvix/nix-compat/src/nix_daemon/mod.rs
@@ -4,3 +4,4 @@ mod protocol_version;
 pub use protocol_version::ProtocolVersion;
 
 pub mod de;
+pub mod ser;
diff --git a/tvix/nix-compat/src/nix_daemon/ser/bytes.rs b/tvix/nix-compat/src/nix_daemon/ser/bytes.rs
new file mode 100644
index 000000000000..19494934ff32
--- /dev/null
+++ b/tvix/nix-compat/src/nix_daemon/ser/bytes.rs
@@ -0,0 +1,89 @@
+use bytes::Bytes;
+
+use super::{NixSerialize, NixWrite};
+
+impl NixSerialize for Bytes {
+    async fn serialize<W>(&self, writer: &mut W) -> Result<(), W::Error>
+    where
+        W: NixWrite,
+    {
+        writer.write_slice(self).await
+    }
+}
+
+impl<'a> NixSerialize for &'a [u8] {
+    async fn serialize<W>(&self, writer: &mut W) -> Result<(), W::Error>
+    where
+        W: NixWrite,
+    {
+        writer.write_slice(self).await
+    }
+}
+
+impl NixSerialize for String {
+    async fn serialize<W>(&self, writer: &mut W) -> Result<(), W::Error>
+    where
+        W: NixWrite,
+    {
+        writer.write_slice(self.as_bytes()).await
+    }
+}
+
+impl NixSerialize for str {
+    async fn serialize<W>(&self, writer: &mut W) -> Result<(), W::Error>
+    where
+        W: NixWrite,
+    {
+        writer.write_slice(self.as_bytes()).await
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use hex_literal::hex;
+    use rstest::rstest;
+    use tokio::io::AsyncWriteExt as _;
+    use tokio_test::io::Builder;
+
+    use crate::nix_daemon::ser::{NixWrite, NixWriter};
+
+    #[rstest]
+    #[case::empty("", &hex!("0000 0000 0000 0000"))]
+    #[case::one(")", &hex!("0100 0000 0000 0000 2900 0000 0000 0000"))]
+    #[case::two("it", &hex!("0200 0000 0000 0000 6974 0000 0000 0000"))]
+    #[case::three("tea", &hex!("0300 0000 0000 0000 7465 6100 0000 0000"))]
+    #[case::four("were", &hex!("0400 0000 0000 0000 7765 7265 0000 0000"))]
+    #[case::five("where", &hex!("0500 0000 0000 0000 7768 6572 6500 0000"))]
+    #[case::six("unwrap", &hex!("0600 0000 0000 0000 756E 7772 6170 0000"))]
+    #[case::seven("where's", &hex!("0700 0000 0000 0000 7768 6572 6527 7300"))]
+    #[case::aligned("read_tea", &hex!("0800 0000 0000 0000 7265 6164 5F74 6561"))]
+    #[case::more_bytes("read_tess", &hex!("0900 0000 0000 0000 7265 6164 5F74 6573 7300 0000 0000 0000"))]
+    #[case::utf8("The quick brown 🦊 jumps over 13 lazy 🐶.", &hex!("2D00 0000 0000 0000  5468 6520 7175 6963  6b20 6272 6f77 6e20  f09f a68a 206a 756d  7073 206f 7665 7220  3133 206c 617a 7920  f09f 90b6 2e00 0000"))]
+    #[tokio::test]
+    async fn test_write_str(#[case] value: &str, #[case] data: &[u8]) {
+        let mock = Builder::new().write(data).build();
+        let mut writer = NixWriter::new(mock);
+        writer.write_value(value).await.unwrap();
+        writer.flush().await.unwrap();
+    }
+
+    #[rstest]
+    #[case::empty("", &hex!("0000 0000 0000 0000"))]
+    #[case::one(")", &hex!("0100 0000 0000 0000 2900 0000 0000 0000"))]
+    #[case::two("it", &hex!("0200 0000 0000 0000 6974 0000 0000 0000"))]
+    #[case::three("tea", &hex!("0300 0000 0000 0000 7465 6100 0000 0000"))]
+    #[case::four("were", &hex!("0400 0000 0000 0000 7765 7265 0000 0000"))]
+    #[case::five("where", &hex!("0500 0000 0000 0000 7768 6572 6500 0000"))]
+    #[case::six("unwrap", &hex!("0600 0000 0000 0000 756E 7772 6170 0000"))]
+    #[case::seven("where's", &hex!("0700 0000 0000 0000 7768 6572 6527 7300"))]
+    #[case::aligned("read_tea", &hex!("0800 0000 0000 0000 7265 6164 5F74 6561"))]
+    #[case::more_bytes("read_tess", &hex!("0900 0000 0000 0000 7265 6164 5F74 6573 7300 0000 0000 0000"))]
+    #[case::utf8("The quick brown 🦊 jumps over 13 lazy 🐶.", &hex!("2D00 0000 0000 0000  5468 6520 7175 6963  6b20 6272 6f77 6e20  f09f a68a 206a 756d  7073 206f 7665 7220  3133 206c 617a 7920  f09f 90b6 2e00 0000"))]
+    #[tokio::test]
+    async fn test_write_string(#[case] value: &str, #[case] data: &[u8]) {
+        let mock = Builder::new().write(data).build();
+        let mut writer = NixWriter::new(mock);
+        writer.write_value(&value.to_string()).await.unwrap();
+        writer.flush().await.unwrap();
+    }
+}
diff --git a/tvix/nix-compat/src/nix_daemon/ser/collections.rs b/tvix/nix-compat/src/nix_daemon/ser/collections.rs
new file mode 100644
index 000000000000..70c32e1c79ac
--- /dev/null
+++ b/tvix/nix-compat/src/nix_daemon/ser/collections.rs
@@ -0,0 +1,94 @@
+use std::collections::BTreeMap;
+use std::future::Future;
+
+use super::{NixSerialize, NixWrite};
+
+impl<T> NixSerialize for Vec<T>
+where
+    T: NixSerialize + Send + Sync,
+{
+    #[allow(clippy::manual_async_fn)]
+    fn serialize<W>(&self, writer: &mut W) -> impl Future<Output = Result<(), W::Error>> + Send
+    where
+        W: NixWrite,
+    {
+        async move {
+            writer.write_value(&self.len()).await?;
+            for value in self.iter() {
+                writer.write_value(value).await?;
+            }
+            Ok(())
+        }
+    }
+}
+
+impl<K, V> NixSerialize for BTreeMap<K, V>
+where
+    K: NixSerialize + Ord + Send + Sync,
+    V: NixSerialize + Send + Sync,
+{
+    #[allow(clippy::manual_async_fn)]
+    fn serialize<W>(&self, writer: &mut W) -> impl Future<Output = Result<(), W::Error>> + Send
+    where
+        W: NixWrite,
+    {
+        async move {
+            writer.write_value(&self.len()).await?;
+            for (key, value) in self.iter() {
+                writer.write_value(key).await?;
+                writer.write_value(value).await?;
+            }
+            Ok(())
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use std::collections::BTreeMap;
+    use std::fmt;
+
+    use hex_literal::hex;
+    use rstest::rstest;
+    use tokio::io::AsyncWriteExt as _;
+    use tokio_test::io::Builder;
+
+    use crate::nix_daemon::ser::{NixSerialize, NixWrite, NixWriter};
+
+    #[rstest]
+    #[case::empty(vec![], &hex!("0000 0000 0000 0000"))]
+    #[case::one(vec![0x29], &hex!("0100 0000 0000 0000 2900 0000 0000 0000"))]
+    #[case::two(vec![0x7469, 10], &hex!("0200 0000 0000 0000 6974 0000 0000 0000 0A00 0000 0000 0000"))]
+    #[tokio::test]
+    async fn test_write_small_vec(#[case] value: Vec<usize>, #[case] data: &[u8]) {
+        let mock = Builder::new().write(data).build();
+        let mut writer = NixWriter::new(mock);
+        writer.write_value(&value).await.unwrap();
+        writer.flush().await.unwrap();
+    }
+
+    fn empty_map() -> BTreeMap<usize, u64> {
+        BTreeMap::new()
+    }
+    macro_rules! map {
+        ($($key:expr => $value:expr),*) => {{
+            let mut ret = BTreeMap::new();
+            $(ret.insert($key, $value);)*
+            ret
+        }};
+    }
+
+    #[rstest]
+    #[case::empty(empty_map(), &hex!("0000 0000 0000 0000"))]
+    #[case::one(map![0x7469usize => 10u64], &hex!("0100 0000 0000 0000 6974 0000 0000 0000 0A00 0000 0000 0000"))]
+    #[tokio::test]
+    async fn test_write_small_btree_map<E>(#[case] value: E, #[case] data: &[u8])
+    where
+        E: NixSerialize + Send + PartialEq + fmt::Debug,
+    {
+        let mock = Builder::new().write(data).build();
+        let mut writer = NixWriter::new(mock);
+        writer.write_value(&value).await.unwrap();
+        writer.flush().await.unwrap();
+    }
+}
diff --git a/tvix/nix-compat/src/nix_daemon/ser/display.rs b/tvix/nix-compat/src/nix_daemon/ser/display.rs
new file mode 100644
index 000000000000..a3438d50d8ff
--- /dev/null
+++ b/tvix/nix-compat/src/nix_daemon/ser/display.rs
@@ -0,0 +1,8 @@
+use nix_compat_derive::nix_serialize_remote;
+
+use crate::nixhash;
+
+nix_serialize_remote!(
+    #[nix(display)]
+    nixhash::HashAlgo
+);
diff --git a/tvix/nix-compat/src/nix_daemon/ser/int.rs b/tvix/nix-compat/src/nix_daemon/ser/int.rs
new file mode 100644
index 000000000000..1be06442e322
--- /dev/null
+++ b/tvix/nix-compat/src/nix_daemon/ser/int.rs
@@ -0,0 +1,108 @@
+#[cfg(feature = "nix-compat-derive")]
+use nix_compat_derive::nix_serialize_remote;
+
+use super::{Error, NixSerialize, NixWrite};
+
+impl NixSerialize for u64 {
+    async fn serialize<W>(&self, writer: &mut W) -> Result<(), W::Error>
+    where
+        W: NixWrite,
+    {
+        writer.write_number(*self).await
+    }
+}
+
+impl NixSerialize for usize {
+    async fn serialize<W>(&self, writer: &mut W) -> Result<(), W::Error>
+    where
+        W: NixWrite,
+    {
+        let v = (*self).try_into().map_err(W::Error::unsupported_data)?;
+        writer.write_number(v).await
+    }
+}
+
+#[cfg(feature = "nix-compat-derive")]
+nix_serialize_remote!(
+    #[nix(into = "u64")]
+    u8
+);
+#[cfg(feature = "nix-compat-derive")]
+nix_serialize_remote!(
+    #[nix(into = "u64")]
+    u16
+);
+#[cfg(feature = "nix-compat-derive")]
+nix_serialize_remote!(
+    #[nix(into = "u64")]
+    u32
+);
+
+impl NixSerialize for bool {
+    async fn serialize<W>(&self, writer: &mut W) -> Result<(), W::Error>
+    where
+        W: NixWrite,
+    {
+        if *self {
+            writer.write_number(1).await
+        } else {
+            writer.write_number(0).await
+        }
+    }
+}
+
+impl NixSerialize for i64 {
+    async fn serialize<W>(&self, writer: &mut W) -> Result<(), W::Error>
+    where
+        W: NixWrite,
+    {
+        writer.write_number(*self as u64).await
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use hex_literal::hex;
+    use rstest::rstest;
+    use tokio::io::AsyncWriteExt as _;
+    use tokio_test::io::Builder;
+
+    use crate::nix_daemon::ser::{NixWrite, NixWriter};
+
+    #[rstest]
+    #[case::simple_false(false, &hex!("0000 0000 0000 0000"))]
+    #[case::simple_true(true, &hex!("0100 0000 0000 0000"))]
+    #[tokio::test]
+    async fn test_write_bool(#[case] value: bool, #[case] expected: &[u8]) {
+        let mock = Builder::new().write(expected).build();
+        let mut writer = NixWriter::new(mock);
+        writer.write_value(&value).await.unwrap();
+        writer.flush().await.unwrap();
+    }
+
+    #[rstest]
+    #[case::zero(0, &hex!("0000 0000 0000 0000"))]
+    #[case::one(1, &hex!("0100 0000 0000 0000"))]
+    #[case::other(0x563412, &hex!("1234 5600 0000 0000"))]
+    #[case::max_value(u64::MAX, &hex!("FFFF FFFF FFFF FFFF"))]
+    #[tokio::test]
+    async fn test_write_u64(#[case] value: u64, #[case] expected: &[u8]) {
+        let mock = Builder::new().write(expected).build();
+        let mut writer = NixWriter::new(mock);
+        writer.write_value(&value).await.unwrap();
+        writer.flush().await.unwrap();
+    }
+
+    #[rstest]
+    #[case::zero(0, &hex!("0000 0000 0000 0000"))]
+    #[case::one(1, &hex!("0100 0000 0000 0000"))]
+    #[case::other(0x563412, &hex!("1234 5600 0000 0000"))]
+    #[case::max_value(usize::MAX, &usize::MAX.to_le_bytes())]
+    #[tokio::test]
+    async fn test_write_usize(#[case] value: usize, #[case] expected: &[u8]) {
+        let mock = Builder::new().write(expected).build();
+        let mut writer = NixWriter::new(mock);
+        writer.write_value(&value).await.unwrap();
+        writer.flush().await.unwrap();
+    }
+}
diff --git a/tvix/nix-compat/src/nix_daemon/ser/mock.rs b/tvix/nix-compat/src/nix_daemon/ser/mock.rs
new file mode 100644
index 000000000000..1319a8da3228
--- /dev/null
+++ b/tvix/nix-compat/src/nix_daemon/ser/mock.rs
@@ -0,0 +1,672 @@
+use std::collections::VecDeque;
+use std::fmt;
+use std::io;
+use std::thread;
+
+#[cfg(test)]
+use ::proptest::prelude::TestCaseError;
+use thiserror::Error;
+
+use crate::nix_daemon::ProtocolVersion;
+
+use super::NixWrite;
+
+#[derive(Debug, Error, PartialEq, Eq, Clone)]
+pub enum Error {
+    #[error("custom error '{0}'")]
+    Custom(String),
+    #[error("unsupported data error '{0}'")]
+    UnsupportedData(String),
+    #[error("Invalid enum: {0}")]
+    InvalidEnum(String),
+    #[error("IO error {0} '{1}'")]
+    IO(io::ErrorKind, String),
+    #[error("wrong write: expected {0} got {1}")]
+    WrongWrite(OperationType, OperationType),
+    #[error("unexpected write: got an extra {0}")]
+    ExtraWrite(OperationType),
+    #[error("got an unexpected number {0} in write_number")]
+    UnexpectedNumber(u64),
+    #[error("got an unexpected slice '{0:?}' in write_slice")]
+    UnexpectedSlice(Vec<u8>),
+    #[error("got an unexpected display '{0:?}' in write_slice")]
+    UnexpectedDisplay(String),
+}
+
+impl Error {
+    pub fn unexpected_write_number(expected: OperationType) -> Error {
+        Error::WrongWrite(expected, OperationType::WriteNumber)
+    }
+
+    pub fn extra_write_number() -> Error {
+        Error::ExtraWrite(OperationType::WriteNumber)
+    }
+
+    pub fn unexpected_write_slice(expected: OperationType) -> Error {
+        Error::WrongWrite(expected, OperationType::WriteSlice)
+    }
+
+    pub fn unexpected_write_display(expected: OperationType) -> Error {
+        Error::WrongWrite(expected, OperationType::WriteDisplay)
+    }
+}
+
+impl super::Error for Error {
+    fn custom<T: fmt::Display>(msg: T) -> Self {
+        Self::Custom(msg.to_string())
+    }
+
+    fn io_error(err: std::io::Error) -> Self {
+        Self::IO(err.kind(), err.to_string())
+    }
+
+    fn unsupported_data<T: fmt::Display>(msg: T) -> Self {
+        Self::UnsupportedData(msg.to_string())
+    }
+
+    fn invalid_enum<T: fmt::Display>(msg: T) -> Self {
+        Self::InvalidEnum(msg.to_string())
+    }
+}
+
+#[allow(clippy::enum_variant_names)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub enum OperationType {
+    WriteNumber,
+    WriteSlice,
+    WriteDisplay,
+}
+
+impl fmt::Display for OperationType {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self {
+            Self::WriteNumber => write!(f, "write_number"),
+            Self::WriteSlice => write!(f, "write_slice"),
+            Self::WriteDisplay => write!(f, "write_display"),
+        }
+    }
+}
+
+#[allow(clippy::enum_variant_names)]
+#[derive(Debug, Clone, PartialEq, Eq)]
+enum Operation {
+    WriteNumber(u64, Result<(), Error>),
+    WriteSlice(Vec<u8>, Result<(), Error>),
+    WriteDisplay(String, Result<(), Error>),
+}
+
+impl From<Operation> for OperationType {
+    fn from(value: Operation) -> Self {
+        match value {
+            Operation::WriteNumber(_, _) => OperationType::WriteNumber,
+            Operation::WriteSlice(_, _) => OperationType::WriteSlice,
+            Operation::WriteDisplay(_, _) => OperationType::WriteDisplay,
+        }
+    }
+}
+
+pub struct Builder {
+    version: ProtocolVersion,
+    ops: VecDeque<Operation>,
+}
+
+impl Builder {
+    pub fn new() -> Builder {
+        Builder {
+            version: Default::default(),
+            ops: VecDeque::new(),
+        }
+    }
+
+    pub fn version<V: Into<ProtocolVersion>>(&mut self, version: V) -> &mut Self {
+        self.version = version.into();
+        self
+    }
+
+    pub fn write_number(&mut self, value: u64) -> &mut Self {
+        self.ops.push_back(Operation::WriteNumber(value, Ok(())));
+        self
+    }
+
+    pub fn write_number_error(&mut self, value: u64, err: Error) -> &mut Self {
+        self.ops.push_back(Operation::WriteNumber(value, Err(err)));
+        self
+    }
+
+    pub fn write_slice(&mut self, value: &[u8]) -> &mut Self {
+        self.ops
+            .push_back(Operation::WriteSlice(value.to_vec(), Ok(())));
+        self
+    }
+
+    pub fn write_slice_error(&mut self, value: &[u8], err: Error) -> &mut Self {
+        self.ops
+            .push_back(Operation::WriteSlice(value.to_vec(), Err(err)));
+        self
+    }
+
+    pub fn write_display<D>(&mut self, value: D) -> &mut Self
+    where
+        D: fmt::Display,
+    {
+        let msg = value.to_string();
+        self.ops.push_back(Operation::WriteDisplay(msg, Ok(())));
+        self
+    }
+
+    pub fn write_display_error<D>(&mut self, value: D, err: Error) -> &mut Self
+    where
+        D: fmt::Display,
+    {
+        let msg = value.to_string();
+        self.ops.push_back(Operation::WriteDisplay(msg, Err(err)));
+        self
+    }
+
+    #[cfg(test)]
+    fn write_operation_type(&mut self, op: OperationType) -> &mut Self {
+        match op {
+            OperationType::WriteNumber => self.write_number(10),
+            OperationType::WriteSlice => self.write_slice(b"testing"),
+            OperationType::WriteDisplay => self.write_display("testing"),
+        }
+    }
+
+    #[cfg(test)]
+    fn write_operation(&mut self, op: &Operation) -> &mut Self {
+        match op {
+            Operation::WriteNumber(value, Ok(_)) => self.write_number(*value),
+            Operation::WriteNumber(value, Err(Error::UnexpectedNumber(_))) => {
+                self.write_number(*value)
+            }
+            Operation::WriteNumber(_, Err(Error::ExtraWrite(OperationType::WriteNumber))) => self,
+            Operation::WriteNumber(_, Err(Error::WrongWrite(op, OperationType::WriteNumber))) => {
+                self.write_operation_type(*op)
+            }
+            Operation::WriteNumber(value, Err(Error::Custom(msg))) => {
+                self.write_number_error(*value, Error::Custom(msg.clone()))
+            }
+            Operation::WriteNumber(value, Err(Error::IO(kind, msg))) => {
+                self.write_number_error(*value, Error::IO(*kind, msg.clone()))
+            }
+            Operation::WriteSlice(value, Ok(_)) => self.write_slice(value),
+            Operation::WriteSlice(value, Err(Error::UnexpectedSlice(_))) => self.write_slice(value),
+            Operation::WriteSlice(_, Err(Error::ExtraWrite(OperationType::WriteSlice))) => self,
+            Operation::WriteSlice(_, Err(Error::WrongWrite(op, OperationType::WriteSlice))) => {
+                self.write_operation_type(*op)
+            }
+            Operation::WriteSlice(value, Err(Error::Custom(msg))) => {
+                self.write_slice_error(value, Error::Custom(msg.clone()))
+            }
+            Operation::WriteSlice(value, Err(Error::IO(kind, msg))) => {
+                self.write_slice_error(value, Error::IO(*kind, msg.clone()))
+            }
+            Operation::WriteDisplay(value, Ok(_)) => self.write_display(value),
+            Operation::WriteDisplay(value, Err(Error::Custom(msg))) => {
+                self.write_display_error(value, Error::Custom(msg.clone()))
+            }
+            Operation::WriteDisplay(value, Err(Error::IO(kind, msg))) => {
+                self.write_display_error(value, Error::IO(*kind, msg.clone()))
+            }
+            Operation::WriteDisplay(value, Err(Error::UnexpectedDisplay(_))) => {
+                self.write_display(value)
+            }
+            Operation::WriteDisplay(_, Err(Error::ExtraWrite(OperationType::WriteDisplay))) => self,
+            Operation::WriteDisplay(_, Err(Error::WrongWrite(op, OperationType::WriteDisplay))) => {
+                self.write_operation_type(*op)
+            }
+            s => panic!("Invalid operation {:?}", s),
+        }
+    }
+
+    pub fn build(&mut self) -> Mock {
+        Mock {
+            version: self.version,
+            ops: self.ops.clone(),
+        }
+    }
+}
+
+impl Default for Builder {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+pub struct Mock {
+    version: ProtocolVersion,
+    ops: VecDeque<Operation>,
+}
+
+impl Mock {
+    #[cfg(test)]
+    #[allow(dead_code)]
+    async fn assert_operation(&mut self, op: Operation) {
+        match op {
+            Operation::WriteNumber(_, Err(Error::UnexpectedNumber(value))) => {
+                assert_eq!(
+                    self.write_number(value).await,
+                    Err(Error::UnexpectedNumber(value))
+                );
+            }
+            Operation::WriteNumber(value, res) => {
+                assert_eq!(self.write_number(value).await, res);
+            }
+            Operation::WriteSlice(_, ref res @ Err(Error::UnexpectedSlice(ref value))) => {
+                assert_eq!(self.write_slice(value).await, res.clone());
+            }
+            Operation::WriteSlice(value, res) => {
+                assert_eq!(self.write_slice(&value).await, res);
+            }
+            Operation::WriteDisplay(_, ref res @ Err(Error::UnexpectedDisplay(ref value))) => {
+                assert_eq!(self.write_display(value).await, res.clone());
+            }
+            Operation::WriteDisplay(value, res) => {
+                assert_eq!(self.write_display(value).await, res);
+            }
+        }
+    }
+
+    #[cfg(test)]
+    async fn prop_assert_operation(&mut self, op: Operation) -> Result<(), TestCaseError> {
+        use ::proptest::prop_assert_eq;
+
+        match op {
+            Operation::WriteNumber(_, Err(Error::UnexpectedNumber(value))) => {
+                prop_assert_eq!(
+                    self.write_number(value).await,
+                    Err(Error::UnexpectedNumber(value))
+                );
+            }
+            Operation::WriteNumber(value, res) => {
+                prop_assert_eq!(self.write_number(value).await, res);
+            }
+            Operation::WriteSlice(_, ref res @ Err(Error::UnexpectedSlice(ref value))) => {
+                prop_assert_eq!(self.write_slice(value).await, res.clone());
+            }
+            Operation::WriteSlice(value, res) => {
+                prop_assert_eq!(self.write_slice(&value).await, res);
+            }
+            Operation::WriteDisplay(_, ref res @ Err(Error::UnexpectedDisplay(ref value))) => {
+                prop_assert_eq!(self.write_display(&value).await, res.clone());
+            }
+            Operation::WriteDisplay(value, res) => {
+                prop_assert_eq!(self.write_display(&value).await, res);
+            }
+        }
+        Ok(())
+    }
+}
+
+impl NixWrite for Mock {
+    type Error = Error;
+
+    fn version(&self) -> ProtocolVersion {
+        self.version
+    }
+
+    async fn write_number(&mut self, value: u64) -> Result<(), Self::Error> {
+        match self.ops.pop_front() {
+            Some(Operation::WriteNumber(expected, ret)) => {
+                if value != expected {
+                    return Err(Error::UnexpectedNumber(value));
+                }
+                ret
+            }
+            Some(op) => Err(Error::unexpected_write_number(op.into())),
+            _ => Err(Error::ExtraWrite(OperationType::WriteNumber)),
+        }
+    }
+
+    async fn write_slice(&mut self, buf: &[u8]) -> Result<(), Self::Error> {
+        match self.ops.pop_front() {
+            Some(Operation::WriteSlice(expected, ret)) => {
+                if buf != expected {
+                    return Err(Error::UnexpectedSlice(buf.to_vec()));
+                }
+                ret
+            }
+            Some(op) => Err(Error::unexpected_write_slice(op.into())),
+            _ => Err(Error::ExtraWrite(OperationType::WriteSlice)),
+        }
+    }
+
+    async fn write_display<D>(&mut self, msg: D) -> Result<(), Self::Error>
+    where
+        D: fmt::Display + Send,
+        Self: Sized,
+    {
+        let value = msg.to_string();
+        match self.ops.pop_front() {
+            Some(Operation::WriteDisplay(expected, ret)) => {
+                if value != expected {
+                    return Err(Error::UnexpectedDisplay(value));
+                }
+                ret
+            }
+            Some(op) => Err(Error::unexpected_write_display(op.into())),
+            _ => Err(Error::ExtraWrite(OperationType::WriteDisplay)),
+        }
+    }
+}
+
+impl Drop for Mock {
+    fn drop(&mut self) {
+        // No need to panic again
+        if thread::panicking() {
+            return;
+        }
+        if let Some(op) = self.ops.front() {
+            panic!("reader dropped with {op:?} operation still unread")
+        }
+    }
+}
+
+#[cfg(test)]
+mod proptest {
+    use std::io;
+
+    use proptest::{
+        prelude::{any, Arbitrary, BoxedStrategy, Just, Strategy},
+        prop_oneof,
+    };
+
+    use super::{Error, Operation, OperationType};
+
+    pub fn arb_write_number_operation() -> impl Strategy<Value = Operation> {
+        (
+            any::<u64>(),
+            prop_oneof![
+                Just(Ok(())),
+                any::<u64>().prop_map(|v| Err(Error::UnexpectedNumber(v))),
+                Just(Err(Error::WrongWrite(
+                    OperationType::WriteSlice,
+                    OperationType::WriteNumber
+                ))),
+                Just(Err(Error::WrongWrite(
+                    OperationType::WriteDisplay,
+                    OperationType::WriteNumber
+                ))),
+                any::<String>().prop_map(|s| Err(Error::Custom(s))),
+                (any::<io::ErrorKind>(), any::<String>())
+                    .prop_map(|(kind, msg)| Err(Error::IO(kind, msg))),
+            ],
+        )
+            .prop_filter("same number", |(v, res)| match res {
+                Err(Error::UnexpectedNumber(exp_v)) => v != exp_v,
+                _ => true,
+            })
+            .prop_map(|(v, res)| Operation::WriteNumber(v, res))
+    }
+
+    pub fn arb_write_slice_operation() -> impl Strategy<Value = Operation> {
+        (
+            any::<Vec<u8>>(),
+            prop_oneof![
+                Just(Ok(())),
+                any::<Vec<u8>>().prop_map(|v| Err(Error::UnexpectedSlice(v))),
+                Just(Err(Error::WrongWrite(
+                    OperationType::WriteNumber,
+                    OperationType::WriteSlice
+                ))),
+                Just(Err(Error::WrongWrite(
+                    OperationType::WriteDisplay,
+                    OperationType::WriteSlice
+                ))),
+                any::<String>().prop_map(|s| Err(Error::Custom(s))),
+                (any::<io::ErrorKind>(), any::<String>())
+                    .prop_map(|(kind, msg)| Err(Error::IO(kind, msg))),
+            ],
+        )
+            .prop_filter("same slice", |(v, res)| match res {
+                Err(Error::UnexpectedSlice(exp_v)) => v != exp_v,
+                _ => true,
+            })
+            .prop_map(|(v, res)| Operation::WriteSlice(v, res))
+    }
+
+    #[allow(dead_code)]
+    pub fn arb_extra_write() -> impl Strategy<Value = Operation> {
+        prop_oneof![
+            any::<u64>().prop_map(|msg| {
+                Operation::WriteNumber(msg, Err(Error::ExtraWrite(OperationType::WriteNumber)))
+            }),
+            any::<Vec<u8>>().prop_map(|msg| {
+                Operation::WriteSlice(msg, Err(Error::ExtraWrite(OperationType::WriteSlice)))
+            }),
+            any::<String>().prop_map(|msg| {
+                Operation::WriteDisplay(msg, Err(Error::ExtraWrite(OperationType::WriteDisplay)))
+            }),
+        ]
+    }
+
+    pub fn arb_write_display_operation() -> impl Strategy<Value = Operation> {
+        (
+            any::<String>(),
+            prop_oneof![
+                Just(Ok(())),
+                any::<String>().prop_map(|v| Err(Error::UnexpectedDisplay(v))),
+                Just(Err(Error::WrongWrite(
+                    OperationType::WriteNumber,
+                    OperationType::WriteDisplay
+                ))),
+                Just(Err(Error::WrongWrite(
+                    OperationType::WriteSlice,
+                    OperationType::WriteDisplay
+                ))),
+                any::<String>().prop_map(|s| Err(Error::Custom(s))),
+                (any::<io::ErrorKind>(), any::<String>())
+                    .prop_map(|(kind, msg)| Err(Error::IO(kind, msg))),
+            ],
+        )
+            .prop_filter("same string", |(v, res)| match res {
+                Err(Error::UnexpectedDisplay(exp_v)) => v != exp_v,
+                _ => true,
+            })
+            .prop_map(|(v, res)| Operation::WriteDisplay(v, res))
+    }
+
+    pub fn arb_operation() -> impl Strategy<Value = Operation> {
+        prop_oneof![
+            arb_write_number_operation(),
+            arb_write_slice_operation(),
+            arb_write_display_operation(),
+        ]
+    }
+
+    impl Arbitrary for Operation {
+        type Parameters = ();
+        type Strategy = BoxedStrategy<Operation>;
+
+        fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
+            arb_operation().boxed()
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use hex_literal::hex;
+    use proptest::prelude::any;
+    use proptest::prelude::TestCaseError;
+    use proptest::proptest;
+
+    use crate::nix_daemon::ser::mock::proptest::arb_extra_write;
+    use crate::nix_daemon::ser::mock::Operation;
+    use crate::nix_daemon::ser::mock::OperationType;
+    use crate::nix_daemon::ser::Error as _;
+    use crate::nix_daemon::ser::NixWrite;
+
+    use super::{Builder, Error};
+
+    #[tokio::test]
+    async fn write_number() {
+        let mut mock = Builder::new().write_number(10).build();
+        mock.write_number(10).await.unwrap();
+    }
+
+    #[tokio::test]
+    async fn write_number_error() {
+        let mut mock = Builder::new()
+            .write_number_error(10, Error::custom("bad number"))
+            .build();
+        assert_eq!(
+            Err(Error::custom("bad number")),
+            mock.write_number(10).await
+        );
+    }
+
+    #[tokio::test]
+    async fn write_number_unexpected() {
+        let mut mock = Builder::new().write_slice(b"").build();
+        assert_eq!(
+            Err(Error::unexpected_write_number(OperationType::WriteSlice)),
+            mock.write_number(11).await
+        );
+    }
+
+    #[tokio::test]
+    async fn write_number_unexpected_number() {
+        let mut mock = Builder::new().write_number(10).build();
+        assert_eq!(
+            Err(Error::UnexpectedNumber(11)),
+            mock.write_number(11).await
+        );
+    }
+
+    #[tokio::test]
+    async fn extra_write_number() {
+        let mut mock = Builder::new().build();
+        assert_eq!(
+            Err(Error::ExtraWrite(OperationType::WriteNumber)),
+            mock.write_number(11).await
+        );
+    }
+
+    #[tokio::test]
+    async fn write_slice() {
+        let mut mock = Builder::new()
+            .write_slice(&[])
+            .write_slice(&hex!("0000 1234 5678 9ABC DEFF"))
+            .build();
+        mock.write_slice(&[]).await.expect("write_slice empty");
+        mock.write_slice(&hex!("0000 1234 5678 9ABC DEFF"))
+            .await
+            .expect("write_slice");
+    }
+
+    #[tokio::test]
+    async fn write_slice_error() {
+        let mut mock = Builder::new()
+            .write_slice_error(&[], Error::custom("bad slice"))
+            .build();
+        assert_eq!(Err(Error::custom("bad slice")), mock.write_slice(&[]).await);
+    }
+
+    #[tokio::test]
+    async fn write_slice_unexpected() {
+        let mut mock = Builder::new().write_number(10).build();
+        assert_eq!(
+            Err(Error::unexpected_write_slice(OperationType::WriteNumber)),
+            mock.write_slice(b"").await
+        );
+    }
+
+    #[tokio::test]
+    async fn write_slice_unexpected_slice() {
+        let mut mock = Builder::new().write_slice(b"").build();
+        assert_eq!(
+            Err(Error::UnexpectedSlice(b"bad slice".to_vec())),
+            mock.write_slice(b"bad slice").await
+        );
+    }
+
+    #[tokio::test]
+    async fn extra_write_slice() {
+        let mut mock = Builder::new().build();
+        assert_eq!(
+            Err(Error::ExtraWrite(OperationType::WriteSlice)),
+            mock.write_slice(b"extra slice").await
+        );
+    }
+
+    #[tokio::test]
+    async fn write_display() {
+        let mut mock = Builder::new().write_display("testing").build();
+        mock.write_display("testing").await.unwrap();
+    }
+
+    #[tokio::test]
+    async fn write_display_error() {
+        let mut mock = Builder::new()
+            .write_display_error("testing", Error::custom("bad number"))
+            .build();
+        assert_eq!(
+            Err(Error::custom("bad number")),
+            mock.write_display("testing").await
+        );
+    }
+
+    #[tokio::test]
+    async fn write_display_unexpected() {
+        let mut mock = Builder::new().write_number(10).build();
+        assert_eq!(
+            Err(Error::unexpected_write_display(OperationType::WriteNumber)),
+            mock.write_display("").await
+        );
+    }
+
+    #[tokio::test]
+    async fn write_display_unexpected_display() {
+        let mut mock = Builder::new().write_display("").build();
+        assert_eq!(
+            Err(Error::UnexpectedDisplay("bad display".to_string())),
+            mock.write_display("bad display").await
+        );
+    }
+
+    #[tokio::test]
+    async fn extra_write_display() {
+        let mut mock = Builder::new().build();
+        assert_eq!(
+            Err(Error::ExtraWrite(OperationType::WriteDisplay)),
+            mock.write_display("extra slice").await
+        );
+    }
+
+    #[test]
+    #[should_panic]
+    fn operations_left() {
+        let _ = Builder::new().write_number(10).build();
+    }
+
+    #[test]
+    fn proptest_mock() {
+        let rt = tokio::runtime::Builder::new_current_thread()
+            .enable_all()
+            .build()
+            .unwrap();
+        proptest!(|(
+            operations in any::<Vec<Operation>>(),
+            extra_operations in proptest::collection::vec(arb_extra_write(), 0..3)
+            )| {
+            rt.block_on(async {
+                let mut builder = Builder::new();
+                for op in operations.iter() {
+                    builder.write_operation(op);
+                }
+                for op in extra_operations.iter() {
+                    builder.write_operation(op);
+                }
+                let mut mock = builder.build();
+                for op in operations {
+                    mock.prop_assert_operation(op).await?;
+                }
+                for op in extra_operations {
+                    mock.prop_assert_operation(op).await?;
+                }
+                Ok(()) as Result<(), TestCaseError>
+            })?;
+        });
+    }
+}
diff --git a/tvix/nix-compat/src/nix_daemon/ser/mod.rs b/tvix/nix-compat/src/nix_daemon/ser/mod.rs
new file mode 100644
index 000000000000..5860226f39eb
--- /dev/null
+++ b/tvix/nix-compat/src/nix_daemon/ser/mod.rs
@@ -0,0 +1,124 @@
+use std::error::Error as StdError;
+use std::future::Future;
+use std::{fmt, io};
+
+use super::ProtocolVersion;
+
+mod bytes;
+mod collections;
+#[cfg(feature = "nix-compat-derive")]
+mod display;
+mod int;
+#[cfg(any(test, feature = "test"))]
+pub mod mock;
+mod writer;
+
+pub use writer::{NixWriter, NixWriterBuilder};
+
+pub trait Error: Sized + StdError {
+    fn custom<T: fmt::Display>(msg: T) -> Self;
+
+    fn io_error(err: std::io::Error) -> Self {
+        Self::custom(format_args!("There was an I/O error {}", err))
+    }
+
+    fn unsupported_data<T: fmt::Display>(msg: T) -> Self {
+        Self::custom(msg)
+    }
+
+    fn invalid_enum<T: fmt::Display>(msg: T) -> Self {
+        Self::custom(msg)
+    }
+}
+
+impl Error for io::Error {
+    fn custom<T: fmt::Display>(msg: T) -> Self {
+        io::Error::new(io::ErrorKind::Other, msg.to_string())
+    }
+
+    fn io_error(err: std::io::Error) -> Self {
+        err
+    }
+
+    fn unsupported_data<T: fmt::Display>(msg: T) -> Self {
+        io::Error::new(io::ErrorKind::InvalidData, msg.to_string())
+    }
+}
+
+pub trait NixWrite: Send {
+    type Error: Error;
+
+    /// Some types are serialized differently depending on the version
+    /// of the protocol and so this can be used for implementing that.
+    fn version(&self) -> ProtocolVersion;
+
+    /// Write a single u64 to the protocol.
+    fn write_number(&mut self, value: u64) -> impl Future<Output = Result<(), Self::Error>> + Send;
+
+    /// Write a slice of bytes to the protocol.
+    fn write_slice(&mut self, buf: &[u8]) -> impl Future<Output = Result<(), Self::Error>> + Send;
+
+    /// Write a value that implements `std::fmt::Display` to the protocol.
+    /// The protocol uses many small string formats and instead of allocating
+    /// a `String` each time we want to write one an implementation of `NixWrite`
+    /// can instead use `Display` to dump these formats to a reusable buffer.
+    fn write_display<D>(&mut self, msg: D) -> impl Future<Output = Result<(), Self::Error>> + Send
+    where
+        D: fmt::Display + Send,
+        Self: Sized,
+    {
+        async move {
+            let s = msg.to_string();
+            self.write_slice(s.as_bytes()).await
+        }
+    }
+
+    /// Write a value to the protocol.
+    /// Uses `NixSerialize::serialize` to write the value.
+    fn write_value<V>(&mut self, value: &V) -> impl Future<Output = Result<(), Self::Error>> + Send
+    where
+        V: NixSerialize + Send + ?Sized,
+        Self: Sized,
+    {
+        value.serialize(self)
+    }
+}
+
+impl<T: NixWrite> NixWrite for &mut T {
+    type Error = T::Error;
+
+    fn version(&self) -> ProtocolVersion {
+        (**self).version()
+    }
+
+    fn write_number(&mut self, value: u64) -> impl Future<Output = Result<(), Self::Error>> + Send {
+        (**self).write_number(value)
+    }
+
+    fn write_slice(&mut self, buf: &[u8]) -> impl Future<Output = Result<(), Self::Error>> + Send {
+        (**self).write_slice(buf)
+    }
+
+    fn write_display<D>(&mut self, msg: D) -> impl Future<Output = Result<(), Self::Error>> + Send
+    where
+        D: fmt::Display + Send,
+        Self: Sized,
+    {
+        (**self).write_display(msg)
+    }
+
+    fn write_value<V>(&mut self, value: &V) -> impl Future<Output = Result<(), Self::Error>> + Send
+    where
+        V: NixSerialize + Send + ?Sized,
+        Self: Sized,
+    {
+        (**self).write_value(value)
+    }
+}
+
+pub trait NixSerialize {
+    /// Write a value to the writer.
+    fn serialize<W>(&self, writer: &mut W) -> impl Future<Output = Result<(), W::Error>> + Send
+    where
+        W: NixWrite;
+}
diff --git a/tvix/nix-compat/src/nix_daemon/ser/writer.rs b/tvix/nix-compat/src/nix_daemon/ser/writer.rs
new file mode 100644
index 000000000000..87e30580af34
--- /dev/null
+++ b/tvix/nix-compat/src/nix_daemon/ser/writer.rs
@@ -0,0 +1,308 @@
+use std::fmt::{self, Write as _};
+use std::future::poll_fn;
+use std::io::{self, Cursor};
+use std::pin::Pin;
+use std::task::{ready, Context, Poll};
+
+use bytes::{Buf, BufMut, BytesMut};
+use pin_project_lite::pin_project;
+use tokio::io::{AsyncWrite, AsyncWriteExt};
+
+use crate::nix_daemon::ProtocolVersion;
+use crate::wire::padding_len;
+use crate::wire::EMPTY_BYTES;
+
+use super::{Error, NixWrite};
+
+pub struct NixWriterBuilder {
+    buf: Option<BytesMut>,
+    reserved_buf_size: usize,
+    max_buf_size: usize,
+    version: ProtocolVersion,
+}
+
+impl Default for NixWriterBuilder {
+    fn default() -> Self {
+        Self {
+            buf: Default::default(),
+            reserved_buf_size: 8192,
+            max_buf_size: 8192,
+            version: Default::default(),
+        }
+    }
+}
+
+impl NixWriterBuilder {
+    pub fn set_buffer(mut self, buf: BytesMut) -> Self {
+        self.buf = Some(buf);
+        self
+    }
+
+    pub fn set_reserved_buf_size(mut self, size: usize) -> Self {
+        self.reserved_buf_size = size;
+        self
+    }
+
+    pub fn set_max_buf_size(mut self, size: usize) -> Self {
+        self.max_buf_size = size;
+        self
+    }
+
+    pub fn set_version(mut self, version: ProtocolVersion) -> Self {
+        self.version = version;
+        self
+    }
+
+    pub fn build<W>(self, writer: W) -> NixWriter<W> {
+        let buf = self
+            .buf
+            .unwrap_or_else(|| BytesMut::with_capacity(self.max_buf_size));
+        NixWriter {
+            buf,
+            inner: writer,
+            reserved_buf_size: self.reserved_buf_size,
+            max_buf_size: self.max_buf_size,
+            version: self.version,
+        }
+    }
+}
+
+pin_project! {
+    pub struct NixWriter<W> {
+        #[pin]
+        inner: W,
+        buf: BytesMut,
+        reserved_buf_size: usize,
+        max_buf_size: usize,
+        version: ProtocolVersion,
+    }
+}
+
+impl NixWriter<Cursor<Vec<u8>>> {
+    pub fn builder() -> NixWriterBuilder {
+        NixWriterBuilder::default()
+    }
+}
+
+impl<W> NixWriter<W>
+where
+    W: AsyncWriteExt,
+{
+    pub fn new(writer: W) -> NixWriter<W> {
+        NixWriter::builder().build(writer)
+    }
+
+    pub fn buffer(&self) -> &[u8] {
+        &self.buf[..]
+    }
+
+    pub fn set_version(&mut self, version: ProtocolVersion) {
+        self.version = version;
+    }
+
+    /// Remaining capacity in internal buffer
+    pub fn remaining_mut(&self) -> usize {
+        self.buf.capacity() - self.buf.len()
+    }
+
+    fn poll_flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
+        let mut this = self.project();
+        while !this.buf.is_empty() {
+            let n = ready!(this.inner.as_mut().poll_write(cx, &this.buf[..]))?;
+            if n == 0 {
+                return Poll::Ready(Err(io::Error::new(
+                    io::ErrorKind::WriteZero,
+                    "failed to write the buffer",
+                )));
+            }
+            this.buf.advance(n);
+        }
+        Poll::Ready(Ok(()))
+    }
+}
+
+impl<W> NixWriter<W>
+where
+    W: AsyncWriteExt + Unpin,
+{
+    async fn flush_buf(&mut self) -> Result<(), io::Error> {
+        let mut s = Pin::new(self);
+        poll_fn(move |cx| s.as_mut().poll_flush_buf(cx)).await
+    }
+}
+
+impl<W> AsyncWrite for NixWriter<W>
+where
+    W: AsyncWrite,
+{
+    fn poll_write(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        buf: &[u8],
+    ) -> Poll<Result<usize, io::Error>> {
+        // Flush
+        if self.remaining_mut() < buf.len() {
+            ready!(self.as_mut().poll_flush_buf(cx))?;
+        }
+        let this = self.project();
+        if buf.len() > this.buf.capacity() {
+            this.inner.poll_write(cx, buf)
+        } else {
+            this.buf.put_slice(buf);
+            Poll::Ready(Ok(buf.len()))
+        }
+    }
+
+    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
+        ready!(self.as_mut().poll_flush_buf(cx))?;
+        self.project().inner.poll_flush(cx)
+    }
+
+    fn poll_shutdown(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Result<(), io::Error>> {
+        ready!(self.as_mut().poll_flush_buf(cx))?;
+        self.project().inner.poll_shutdown(cx)
+    }
+}
+
+impl<W> NixWrite for NixWriter<W>
+where
+    W: AsyncWrite + Send + Unpin,
+{
+    type Error = io::Error;
+
+    fn version(&self) -> ProtocolVersion {
+        self.version
+    }
+
+    async fn write_number(&mut self, value: u64) -> Result<(), Self::Error> {
+        let mut buf = [0u8; 8];
+        BufMut::put_u64_le(&mut &mut buf[..], value);
+        self.write_all(&buf).await
+    }
+
+    async fn write_slice(&mut self, buf: &[u8]) -> Result<(), Self::Error> {
+        let padding = padding_len(buf.len() as u64) as usize;
+        self.write_value(&buf.len()).await?;
+        self.write_all(buf).await?;
+        if padding > 0 {
+            self.write_all(&EMPTY_BYTES[..padding]).await
+        } else {
+            Ok(())
+        }
+    }
+
+    async fn write_display<D>(&mut self, msg: D) -> Result<(), Self::Error>
+    where
+        D: fmt::Display + Send,
+        Self: Sized,
+    {
+        // Ensure that buffer has space for at least reserved_buf_size bytes
+        if self.remaining_mut() < self.reserved_buf_size && !self.buf.is_empty() {
+            self.flush_buf().await?;
+        }
+        let offset = self.buf.len();
+        self.buf.put_u64_le(0);
+        if let Err(err) = write!(self.buf, "{}", msg) {
+            self.buf.truncate(offset);
+            return Err(Self::Error::unsupported_data(err));
+        }
+        let len = self.buf.len() - offset - 8;
+        BufMut::put_u64_le(&mut &mut self.buf[offset..(offset + 8)], len as u64);
+        let padding = padding_len(len as u64) as usize;
+        self.write_all(&EMPTY_BYTES[..padding]).await
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use std::time::Duration;
+
+    use hex_literal::hex;
+    use rstest::rstest;
+    use tokio::io::AsyncWriteExt as _;
+    use tokio_test::io::Builder;
+
+    use crate::nix_daemon::ser::NixWrite;
+
+    use super::NixWriter;
+
+    #[rstest]
+    #[case(1, &hex!("0100 0000 0000 0000"))]
+    #[case::evil(666, &hex!("9A02 0000 0000 0000"))]
+    #[case::max(u64::MAX, &hex!("FFFF FFFF FFFF FFFF"))]
+    #[tokio::test]
+    async fn test_write_number(#[case] number: u64, #[case] buf: &[u8]) {
+        let mock = Builder::new().write(buf).build();
+        let mut writer = NixWriter::new(mock);
+
+        writer.write_number(number).await.unwrap();
+        assert_eq!(writer.buffer(), buf);
+        writer.flush().await.unwrap();
+        assert_eq!(writer.buffer(), b"");
+    }
+
+    #[rstest]
+    #[case::empty(b"", &hex!("0000 0000 0000 0000"))]
+    #[case::one(b")", &hex!("0100 0000 0000 0000 2900 0000 0000 0000"))]
+    #[case::two(b"it", &hex!("0200 0000 0000 0000 6974 0000 0000 0000"))]
+    #[case::three(b"tea", &hex!("0300 0000 0000 0000 7465 6100 0000 0000"))]
+    #[case::four(b"were", &hex!("0400 0000 0000 0000 7765 7265 0000 0000"))]
+    #[case::five(b"where", &hex!("0500 0000 0000 0000 7768 6572 6500 0000"))]
+    #[case::six(b"unwrap", &hex!("0600 0000 0000 0000 756E 7772 6170 0000"))]
+    #[case::seven(b"where's", &hex!("0700 0000 0000 0000 7768 6572 6527 7300"))]
+    #[case::aligned(b"read_tea", &hex!("0800 0000 0000 0000 7265 6164 5F74 6561"))]
+    #[case::more_bytes(b"read_tess", &hex!("0900 0000 0000 0000 7265 6164 5F74 6573 7300 0000 0000 0000"))]
+    #[tokio::test]
+    async fn test_write_slice(
+        #[case] value: &[u8],
+        #[case] buf: &[u8],
+        #[values(1, 2, 3, 4, 5, 6, 7, 8, 9, 1024)] chunks_size: usize,
+        #[values(1, 2, 3, 4, 5, 6, 7, 8, 9, 1024)] buf_size: usize,
+    ) {
+        let mut builder = Builder::new();
+        for chunk in buf.chunks(chunks_size) {
+            builder.write(chunk);
+            builder.wait(Duration::ZERO);
+        }
+        let mock = builder.build();
+        let mut writer = NixWriter::builder().set_max_buf_size(buf_size).build(mock);
+
+        writer.write_slice(value).await.unwrap();
+        writer.flush().await.unwrap();
+        assert_eq!(writer.buffer(), b"");
+    }
+
+    #[rstest]
+    #[case::empty("", &hex!("0000 0000 0000 0000"))]
+    #[case::one(")", &hex!("0100 0000 0000 0000 2900 0000 0000 0000"))]
+    #[case::two("it", &hex!("0200 0000 0000 0000 6974 0000 0000 0000"))]
+    #[case::three("tea", &hex!("0300 0000 0000 0000 7465 6100 0000 0000"))]
+    #[case::four("were", &hex!("0400 0000 0000 0000 7765 7265 0000 0000"))]
+    #[case::five("where", &hex!("0500 0000 0000 0000 7768 6572 6500 0000"))]
+    #[case::six("unwrap", &hex!("0600 0000 0000 0000 756E 7772 6170 0000"))]
+    #[case::seven("where's", &hex!("0700 0000 0000 0000 7768 6572 6527 7300"))]
+    #[case::aligned("read_tea", &hex!("0800 0000 0000 0000 7265 6164 5F74 6561"))]
+    #[case::more_bytes("read_tess", &hex!("0900 0000 0000 0000 7265 6164 5F74 6573 7300 0000 0000 0000"))]
+    #[tokio::test]
+    async fn test_write_display(
+        #[case] value: &str,
+        #[case] buf: &[u8],
+        #[values(1, 2, 3, 4, 5, 6, 7, 8, 9, 1024)] chunks_size: usize,
+    ) {
+        let mut builder = Builder::new();
+        for chunk in buf.chunks(chunks_size) {
+            builder.write(chunk);
+            builder.wait(Duration::ZERO);
+        }
+        let mock = builder.build();
+        let mut writer = NixWriter::builder().build(mock);
+
+        writer.write_display(value).await.unwrap();
+        assert_eq!(writer.buffer(), buf);
+        writer.flush().await.unwrap();
+        assert_eq!(writer.buffer(), b"");
+    }
+}
diff --git a/tvix/nix-compat/src/wire/bytes/mod.rs b/tvix/nix-compat/src/wire/bytes/mod.rs
index 74adfb49b6a4..9b981fbbd2c0 100644
--- a/tvix/nix-compat/src/wire/bytes/mod.rs
+++ b/tvix/nix-compat/src/wire/bytes/mod.rs
@@ -181,7 +181,7 @@ pub async fn write_bytes<W: AsyncWriteExt + Unpin, B: AsRef<[u8]>>(
 
 /// Computes the number of bytes we should add to len (a length in
 /// bytes) to be aligned on 64 bits (8 bytes).
-fn padding_len(len: u64) -> u8 {
+pub(crate) fn padding_len(len: u64) -> u8 {
     let aligned = len.wrapping_add(7) & !7;
     aligned.wrapping_sub(len) as u8
 }