about summary refs log tree commit diff
diff options
context:
space:
mode:
authoredef <edef@edef.eu>2024-05-08T08·02+0000
committeredef <edef@edef.eu>2024-05-08T15·30+0000
commit17a7dac94f29d8151ecebfbf11d5b83cf0c4f415 (patch)
tree37dfb357b6d7b5a1f9695cb0a5518dbc22f26988
parentb68ef475ee551c8efae41d4b901fd9648f9b8570 (diff)
feat(nix-compat/wire/bytes): read bytes into an existing buffer r/8094
For small bytestrings (like NAR names), we can read into a preallocated
fixed-size buffer, instead of allocating a Vec every time.

Change-Id: Id8da9e9cea99c814361230c0ec02606b731c79a3
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11606
Tested-by: BuildkiteCI
Reviewed-by: flokli <flokli@flokli.de>
-rw-r--r--tvix/nix-compat/src/wire/bytes/mod.rs61
1 files changed, 60 insertions, 1 deletions
diff --git a/tvix/nix-compat/src/wire/bytes/mod.rs b/tvix/nix-compat/src/wire/bytes/mod.rs
index db794b810f35..f48b5000a51d 100644
--- a/tvix/nix-compat/src/wire/bytes/mod.rs
+++ b/tvix/nix-compat/src/wire/bytes/mod.rs
@@ -1,8 +1,9 @@
 use std::{
     io::{Error, ErrorKind},
+    mem::MaybeUninit,
     ops::RangeInclusive,
 };
-use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
+use tokio::io::{self, AsyncReadExt, AsyncWriteExt, ReadBuf};
 
 pub(crate) mod reader;
 pub use reader::BytesReader;
@@ -81,6 +82,64 @@ where
     Ok(buf)
 }
 
+#[allow(dead_code)]
+pub(crate) async fn read_bytes_buf<'a, const N: usize, R: ?Sized>(
+    reader: &mut R,
+    buf: &'a mut [MaybeUninit<u8>; N],
+    allowed_size: RangeInclusive<usize>,
+) -> io::Result<&'a [u8]>
+where
+    R: AsyncReadExt + Unpin,
+{
+    assert_eq!(N % 8, 0);
+    assert!(*allowed_size.end() <= N);
+
+    let len = reader.read_u64_le().await?;
+    let len: usize = len
+        .try_into()
+        .ok()
+        .filter(|len| allowed_size.contains(len))
+        .ok_or_else(|| {
+            io::Error::new(
+                io::ErrorKind::InvalidData,
+                "signalled package size not in allowed range",
+            )
+        })?;
+
+    let buf_len = (len + 7) & !7;
+    let buf = {
+        let mut read_buf = ReadBuf::uninit(&mut buf[..buf_len]);
+
+        while read_buf.filled().len() < buf_len {
+            reader.read_buf(&mut read_buf).await?;
+        }
+
+        // ReadBuf::filled does not pass the underlying buffer's lifetime through,
+        // so we must make a trip to hell.
+        //
+        // SAFETY: `read_buf` is filled up to `buf_len`, and we verify that it is
+        // still pointing at the same underlying buffer.
+        unsafe {
+            assert_eq!(read_buf.filled().as_ptr(), buf.as_ptr() as *const u8);
+            assume_init_bytes(&buf[..buf_len])
+        }
+    };
+
+    if buf[len..buf_len].iter().any(|&b| b != 0) {
+        return Err(io::Error::new(
+            io::ErrorKind::InvalidData,
+            "padding is not all zeroes",
+        ));
+    }
+
+    Ok(&buf[..len])
+}
+
+/// SAFETY: The bytes have to actually be initialized.
+unsafe fn assume_init_bytes(slice: &[MaybeUninit<u8>]) -> &[u8] {
+    &*(slice as *const [MaybeUninit<u8>] as *const [u8])
+}
+
 /// Read a "bytes wire packet" of from the AsyncRead and tries to parse as string.
 /// Internally uses [read_bytes].
 /// Rejects reading more than `allowed_size` bytes of payload.