about summary refs log tree commit diff
path: root/tvix/nix-compat/src/nar/reader/async/read.rs
diff options
context:
space:
mode:
authoredef <edef@edef.eu>2024-04-29T16·44+0000
committeredef <edef@edef.eu>2024-05-04T21·45+0000
commit08feea4817a5827db478d4202e58290d547f8cde (patch)
tree2404ac04900e414f1218e8ebd2a1c29d5d62caab /tvix/nix-compat/src/nar/reader/async/read.rs
parent343e176bec6251b53006331e752c285d50c7167f (diff)
feat(nix-compat/nar/reader): async support r/8075
This is a first cut at the async NAR reader, with some rough edges.
Poisoning is left unimplemented for now, pending future work.

Change-Id: Ifaafe0581a5e0e165a13357b909fb441f7bd8bab
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11524
Reviewed-by: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Diffstat (limited to 'tvix/nix-compat/src/nar/reader/async/read.rs')
-rw-r--r--tvix/nix-compat/src/nar/reader/async/read.rs69
1 files changed, 69 insertions, 0 deletions
diff --git a/tvix/nix-compat/src/nar/reader/async/read.rs b/tvix/nix-compat/src/nar/reader/async/read.rs
new file mode 100644
index 000000000000..2adf894922c5
--- /dev/null
+++ b/tvix/nix-compat/src/nar/reader/async/read.rs
@@ -0,0 +1,69 @@
+use tokio::io::{
+    self, AsyncReadExt,
+    ErrorKind::{InvalidData, UnexpectedEof},
+};
+
+use crate::nar::wire::Tag;
+
+use super::Reader;
+
+/// Consume a known token from the reader.
+pub async fn token<const N: usize>(reader: &mut Reader<'_>, token: &[u8; N]) -> io::Result<()> {
+    let mut buf = [0u8; N];
+
+    // This implements something similar to [AsyncReadExt::read_exact], but verifies that
+    // the input data matches the token while we read it. These two slices respectively
+    // represent the remaining token to be verified, and the remaining input buffer.
+    let mut token = &token[..];
+    let mut buf = &mut buf[..];
+
+    while !token.is_empty() {
+        match reader.read(buf).await? {
+            0 => {
+                return Err(UnexpectedEof.into());
+            }
+            n => {
+                let (t, b);
+                (t, token) = token.split_at(n);
+                (b, buf) = buf.split_at_mut(n);
+
+                if t != b {
+                    return Err(InvalidData.into());
+                }
+            }
+        }
+    }
+
+    Ok(())
+}
+
+/// Consume a [Tag] from the reader.
+pub async fn tag<T: Tag>(reader: &mut Reader<'_>) -> io::Result<T> {
+    let mut buf = T::make_buf();
+    let buf = buf.as_mut();
+
+    // first read the known minimum length…
+    reader.read_exact(&mut buf[..T::MIN]).await?;
+
+    // then decide which tag we're expecting
+    let tag = T::from_u8(buf[T::OFF]).ok_or(InvalidData)?;
+    let (head, tail) = tag.as_bytes().split_at(T::MIN);
+
+    // make sure what we've read so far is valid
+    if buf[..T::MIN] != *head {
+        return Err(InvalidData.into());
+    }
+
+    // …then read the rest, if any
+    if !tail.is_empty() {
+        let rest = tail.len();
+        reader.read_exact(&mut buf[..rest]).await?;
+
+        // and make sure it's what we expect
+        if buf[..rest] != *tail {
+            return Err(InvalidData.into());
+        }
+    }
+
+    Ok(tag)
+}