diff options
author | edef <edef@edef.eu> | 2024-04-29T16·44+0000 |
---|---|---|
committer | edef <edef@edef.eu> | 2024-05-04T21·45+0000 |
commit | 08feea4817a5827db478d4202e58290d547f8cde (patch) | |
tree | 2404ac04900e414f1218e8ebd2a1c29d5d62caab /tvix/nix-compat/src/nar/reader/async/mod.rs | |
parent | 343e176bec6251b53006331e752c285d50c7167f (diff) |
feat(nix-compat/nar/reader): async support r/8075
This is a first cut at the async NAR reader, with some rough edges. Poisoning is left unimplemented for now, pending future work. Change-Id: Ifaafe0581a5e0e165a13357b909fb441f7bd8bab Reviewed-on: https://cl.tvl.fyi/c/depot/+/11524 Reviewed-by: flokli <flokli@flokli.de> Tested-by: BuildkiteCI
Diffstat (limited to 'tvix/nix-compat/src/nar/reader/async/mod.rs')
-rw-r--r-- | tvix/nix-compat/src/nar/reader/async/mod.rs | 166 |
1 files changed, 166 insertions, 0 deletions
diff --git a/tvix/nix-compat/src/nar/reader/async/mod.rs b/tvix/nix-compat/src/nar/reader/async/mod.rs new file mode 100644 index 000000000000..aaf00faf4441 --- /dev/null +++ b/tvix/nix-compat/src/nar/reader/async/mod.rs @@ -0,0 +1,166 @@ +use std::{ + pin::Pin, + task::{self, Poll}, +}; + +use tokio::io::{self, AsyncBufRead, AsyncRead, ErrorKind::InvalidData}; + +// Required reading for understanding this module. +use crate::{ + nar::{self, wire::PadPar}, + wire::{self, BytesReader}, +}; + +mod read; +#[cfg(test)] +mod test; + +pub type Reader<'a> = dyn AsyncBufRead + Unpin + Send + 'a; + +/// Start reading a NAR file from `reader`. +pub async fn open<'a, 'r>(reader: &'a mut Reader<'r>) -> io::Result<Node<'a, 'r>> { + read::token(reader, &nar::wire::TOK_NAR).await?; + Node::new(reader).await +} + +pub enum Node<'a, 'r: 'a> { + Symlink { + target: Vec<u8>, + }, + File { + executable: bool, + reader: FileReader<'a, 'r>, + }, + Directory(DirReader<'a, 'r>), +} + +impl<'a, 'r: 'a> Node<'a, 'r> { + /// Start reading a [Node], matching the next [wire::Node]. + /// + /// Reading the terminating [wire::TOK_PAR] is done immediately for [Node::Symlink], + /// but is otherwise left to [DirReader] or [BytesReader]. + async fn new(reader: &'a mut Reader<'r>) -> io::Result<Self> { + Ok(match read::tag(reader).await? { + nar::wire::Node::Sym => { + let target = wire::read_bytes(reader, 1..=nar::wire::MAX_TARGET_LEN).await?; + + if target.contains(&0) { + return Err(InvalidData.into()); + } + + read::token(reader, &nar::wire::TOK_PAR).await?; + + Node::Symlink { target } + } + tag @ (nar::wire::Node::Reg | nar::wire::Node::Exe) => Node::File { + executable: tag == nar::wire::Node::Exe, + reader: FileReader { + inner: BytesReader::new_internal(reader, ..).await?, + }, + }, + nar::wire::Node::Dir => Node::Directory(DirReader::new(reader)), + }) + } +} + +/// File contents, readable through the [AsyncRead] trait. +/// +/// It comes with some caveats: +/// * You must always read the entire file, unless you intend to abandon the entire archive reader. +/// * You must abandon the entire archive reader upon the first error. +/// +/// It's fine to read exactly `reader.len()` bytes without ever seeing an explicit EOF. +pub struct FileReader<'a, 'r> { + inner: BytesReader<&'a mut Reader<'r>, PadPar>, +} + +impl<'a, 'r> FileReader<'a, 'r> { + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + pub fn len(&self) -> u64 { + self.inner.len() + } +} + +impl<'a, 'r> AsyncRead for FileReader<'a, 'r> { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut task::Context, + buf: &mut io::ReadBuf, + ) -> Poll<io::Result<()>> { + Pin::new(&mut self.get_mut().inner).poll_read(cx, buf) + } +} + +/// A directory iterator, yielding a sequence of [Node]s. +/// It must be fully consumed before reading further from the [DirReader] that produced it, if any. +pub struct DirReader<'a, 'r> { + reader: &'a mut Reader<'r>, + /// Previous directory entry name. + /// We have to hang onto this to enforce name monotonicity. + prev_name: Option<Vec<u8>>, +} + +pub struct Entry<'a, 'r> { + pub name: Vec<u8>, + pub node: Node<'a, 'r>, +} + +impl<'a, 'r> DirReader<'a, 'r> { + fn new(reader: &'a mut Reader<'r>) -> Self { + Self { + reader, + prev_name: None, + } + } + + /// Read the next [Entry] from the directory. + /// + /// We explicitly don't implement [Iterator], since treating this as + /// a regular Rust iterator will surely lead you astray. + /// + /// * You must always consume the entire iterator, unless you abandon the entire archive reader. + /// * You must abandon the entire archive reader on the first error. + /// * You must abandon the directory reader upon the first [None]. + /// * Even if you know the amount of elements up front, you must keep reading until you encounter [None]. + pub async fn next(&mut self) -> io::Result<Option<Entry<'_, 'r>>> { + // COME FROM the previous iteration: if we've already read an entry, + // read its terminating TOK_PAR here. + if self.prev_name.is_some() { + read::token(self.reader, &nar::wire::TOK_PAR).await?; + } + + if let nar::wire::Entry::None = read::tag(self.reader).await? { + return Ok(None); + } + + let name = wire::read_bytes(self.reader, 1..=nar::wire::MAX_NAME_LEN).await?; + + if name.contains(&0) || name.contains(&b'/') || name == b"." || name == b".." { + return Err(InvalidData.into()); + } + + // Enforce strict monotonicity of directory entry names. + match &mut self.prev_name { + None => { + self.prev_name = Some(name.clone()); + } + Some(prev_name) => { + if *prev_name >= name { + return Err(InvalidData.into()); + } + + name[..].clone_into(prev_name); + } + } + + read::token(self.reader, &nar::wire::TOK_NOD).await?; + + Ok(Some(Entry { + name, + node: Node::new(self.reader).await?, + })) + } +} |