diff options
Diffstat (limited to 'tvix/nix-compat/src/wire')
-rw-r--r-- | tvix/nix-compat/src/wire/bytes/reader/mod.rs (renamed from tvix/nix-compat/src/wire/bytes/reader.rs) | 320 | ||||
-rw-r--r-- | tvix/nix-compat/src/wire/bytes/reader/trailer.rs | 175 |
2 files changed, 317 insertions, 178 deletions
diff --git a/tvix/nix-compat/src/wire/bytes/reader.rs b/tvix/nix-compat/src/wire/bytes/reader/mod.rs index 18a8c6c686f0..8d4eab78f370 100644 --- a/tvix/nix-compat/src/wire/bytes/reader.rs +++ b/tvix/nix-compat/src/wire/bytes/reader/mod.rs @@ -1,12 +1,13 @@ use std::{ io, - ops::{Bound, RangeBounds, RangeInclusive}, + ops::{Bound, RangeBounds}, pin::Pin, task::{self, ready, Poll}, }; -use tokio::io::AsyncRead; +use tokio::io::{AsyncRead, ReadBuf}; -use super::{padding_len, BytesPacketPosition, LEN_SIZE}; +use trailer::TrailerReader; +mod trailer; /// Reads a "bytes wire packet" from the underlying reader. /// The format is the same as in [crate::wire::bytes::read_bytes], @@ -19,21 +20,34 @@ use super::{padding_len, BytesPacketPosition, LEN_SIZE}; /// /// Internally, it will first read over the size packet, filling payload_size, /// ensuring it fits allowed_size, then return payload data. -/// It will only signal EOF (returning `Ok(())` without filling the buffer anymore) -/// when all padding has been successfully consumed too. /// -/// This also means, it's important for a user to always read to the end, -/// and not just call read_exact - otherwise it might not skip over the -/// padding, and return garbage when reading the next packet. +/// It will not return the final bytes before all padding has been successfully +/// consumed as well, but the full length of the reader must be consumed. /// /// In case of an error due to size constraints, or in case of not reading /// all the way to the end (and getting a EOF), the underlying reader is no /// longer usable and might return garbage. pub struct BytesReader<R> { - inner: R, - allowed_size: RangeInclusive<u64>, - payload_size: [u8; 8], - state: BytesPacketPosition, + state: State<R>, +} + +#[derive(Debug)] +enum State<R> { + Size { + reader: Option<R>, + /// Minimum length (inclusive) + user_len_min: u64, + /// Maximum length (inclusive) + user_len_max: u64, + filled: u8, + buf: [u8; 8], + }, + Body { + reader: Option<R>, + consumed: u64, + user_len: u64, + }, + Trailer(TrailerReader<R>), } impl<R> BytesReader<R> @@ -41,7 +55,7 @@ where R: AsyncRead + Unpin, { /// Constructs a new BytesReader, using the underlying passed reader. - pub fn new<S: RangeBounds<u64>>(r: R, allowed_size: S) -> Self { + pub fn new<S: RangeBounds<u64>>(reader: R, allowed_size: S) -> Self { let user_len_min = match allowed_size.start_bound() { Bound::Included(&n) => n, Bound::Excluded(&n) => n.saturating_add(1), @@ -55,181 +69,148 @@ where }; Self { - inner: r, - allowed_size: user_len_min..=user_len_max, - payload_size: [0; 8], - state: BytesPacketPosition::Size(0), + state: State::Size { + reader: Some(reader), + user_len_min, + user_len_max, + filled: 0, + buf: [0; 8], + }, } } /// Construct a new BytesReader with a known, and already-read size. - pub fn with_size(r: R, size: u64) -> Self { + pub fn with_size(reader: R, size: u64) -> Self { Self { - inner: r, - allowed_size: size..=size, - payload_size: u64::to_le_bytes(size), - state: if size != 0 { - BytesPacketPosition::Payload(0) - } else { - BytesPacketPosition::Padding(0) + state: State::Body { + reader: Some(reader), + consumed: 0, + user_len: size, }, } } } -/// Returns an error if the passed usize is 0. -#[inline] -fn ensure_nonzero_bytes_read(bytes_read: usize) -> Result<usize, io::Error> { - if bytes_read == 0 { - Err(io::Error::new( - io::ErrorKind::UnexpectedEof, - "underlying reader returned EOF", - )) - } else { - Ok(bytes_read) - } -} -impl<R> AsyncRead for BytesReader<R> -where - R: AsyncRead + Unpin, -{ +impl<R: AsyncRead + Unpin> AsyncRead for BytesReader<R> { fn poll_read( self: Pin<&mut Self>, cx: &mut task::Context, - buf: &mut tokio::io::ReadBuf, + buf: &mut ReadBuf, ) -> Poll<io::Result<()>> { - let this = self.get_mut(); + let this = &mut self.get_mut().state; - // Use a loop, so we can deal with (multiple) state transitions. loop { - match this.state { - BytesPacketPosition::Size(LEN_SIZE) => { - // used in case an invalid size was signalled. - Err(io::Error::new( - io::ErrorKind::InvalidData, - "signalled package size not in allowed range", - ))? - } - BytesPacketPosition::Size(pos) => { - // try to read more of the size field. - // We wrap a ReadBuf around this.payload_size here, and set_filled. - let mut read_buf = tokio::io::ReadBuf::new(&mut this.payload_size); - read_buf.advance(pos); - ready!(Pin::new(&mut this.inner).poll_read(cx, &mut read_buf))?; - - ensure_nonzero_bytes_read(read_buf.filled().len() - pos)?; - - let total_size_read = read_buf.filled().len(); - if total_size_read == LEN_SIZE { - // If the entire payload size was read, parse it - let payload_size = u64::from_le_bytes(this.payload_size); - - if !this.allowed_size.contains(&payload_size) { - // If it's not in the allowed - // range, transition to failure mode - // `BytesPacketPosition::Size(LEN_SIZE)`, where only - // an error is returned. - this.state = BytesPacketPosition::Size(LEN_SIZE) - } else if payload_size == 0 { - // If the payload size is 0, move on to reading padding directly. - this.state = BytesPacketPosition::Padding(0) - } else { - // Else, transition to reading the payload. - this.state = BytesPacketPosition::Payload(0) - } - } else { - // If we still need to read more of payload size, update - // our position in the state. - this.state = BytesPacketPosition::Size(total_size_read) + match this { + State::Size { + reader, + user_len_min, + user_len_max, + filled: 8, + buf, + } => { + let reader = reader.take().unwrap(); + + let data_len = u64::from_le_bytes(*buf); + if data_len < *user_len_min || data_len > *user_len_max { + return Err(io::Error::new(io::ErrorKind::InvalidData, "invalid size")) + .into(); } + + *this = State::Body { + reader: Some(reader), + consumed: 0, + user_len: data_len, + }; } - BytesPacketPosition::Payload(pos) => { - let signalled_size = u64::from_le_bytes(this.payload_size); - // We don't enter this match arm at all if we're expecting empty payload - debug_assert!(signalled_size > 0, "signalled size must be larger than 0"); - - // Read from the underlying reader into buf - // We cap the ReadBuf to the size of the payload, as we - // don't want to leak padding to the caller. - let bytes_read = ensure_nonzero_bytes_read({ - // Reducing these two u64 to usize on 32bits is fine - we - // only care about not reading too much, not too less. - let mut limited_buf = buf.take((signalled_size - pos) as usize); - ready!(Pin::new(&mut this.inner).poll_read(cx, &mut limited_buf))?; - limited_buf.filled().len() - })?; - - // SAFETY: we just did populate this, but through limited_buf. - unsafe { buf.assume_init(bytes_read) } - buf.advance(bytes_read); - - if pos + bytes_read as u64 == signalled_size { - // If we now read all payload, transition to padding - // state. - this.state = BytesPacketPosition::Padding(0); - } else { - // if we didn't read everything yet, update our position - // in the state. - this.state = BytesPacketPosition::Payload(pos + bytes_read as u64); + State::Size { + reader, + filled, + buf, + .. + } => { + let reader = reader.as_mut().unwrap(); + + let mut read_buf = ReadBuf::new(&mut buf[..]); + read_buf.advance(*filled as usize); + ready!(Pin::new(reader).poll_read(cx, &mut read_buf))?; + + let new_filled = read_buf.filled().len() as u8; + if *filled == new_filled { + return Err(io::ErrorKind::UnexpectedEof.into()).into(); } - // We return from poll_read here. - // This is important, as any error (or even Pending) from - // the underlying reader on the next read (be it padding or - // payload) would require us to roll back buf, as generally - // a AsyncRead::poll_read may not advance the buffer in case - // of a nonsuccessful read. - // It can't be misinterpreted as EOF, as we definitely *did* - // write something into buf if we come to here (we pass - // `ensure_nonzero_bytes_read`). - return Ok(()).into(); + *filled = new_filled; } - BytesPacketPosition::Padding(pos) => { - // Consume whatever padding is left, ensuring it's all null - // bytes. Only return `Ready(Ok(()))` once we're past the - // padding (or in cases where polling the inner reader - // returns `Poll::Pending`). - let signalled_size = u64::from_le_bytes(this.payload_size); - let total_padding_len = padding_len(signalled_size) as usize; - - let padding_len_remaining = total_padding_len - pos; - if padding_len_remaining != 0 { - // create a buffer only accepting the number of remaining padding bytes. - let mut buf = [0; 8]; - let mut padding_buf = tokio::io::ReadBuf::new(&mut buf); - let mut padding_buf = padding_buf.take(padding_len_remaining); - - // read into padding_buf. - ready!(Pin::new(&mut this.inner).poll_read(cx, &mut padding_buf))?; - let bytes_read = ensure_nonzero_bytes_read(padding_buf.filled().len())?; - - this.state = BytesPacketPosition::Padding(pos + bytes_read); - - // ensure the bytes are not null bytes - if !padding_buf.filled().iter().all(|e| *e == b'\0') { - return Err(io::Error::new( - io::ErrorKind::InvalidData, - "padding is not all zeroes", - )) - .into(); - } - - // if we still have padding to read, run the loop again. + State::Body { + reader, + consumed, + user_len, + } => { + let body_len = *user_len & !7; + let remaining = body_len - *consumed; + + let reader = if remaining == 0 { + let reader = reader.take().unwrap(); + let user_len = (*user_len & 7) as u8; + *this = State::Trailer(TrailerReader::new(reader, user_len)); continue; + } else { + reader.as_mut().unwrap() + }; + + let mut bytes_read = 0; + ready!(with_limited(buf, remaining, |buf| { + let ret = Pin::new(reader).poll_read(cx, buf); + bytes_read = buf.initialized().len(); + ret + }))?; + + *consumed += bytes_read as u64; + + return if bytes_read != 0 { + Ok(()) + } else { + Err(io::ErrorKind::UnexpectedEof.into()) } - // return EOF - return Ok(()).into(); + .into(); + } + State::Trailer(reader) => { + return Pin::new(reader).poll_read(cx, buf); } } } } } +/// Make a limited version of `buf`, consisting only of up to `n` bytes of the unfilled section, and call `f` with it. +/// After `f` returns, we propagate the filled cursor advancement back to `buf`. +fn with_limited<R>(buf: &mut ReadBuf, n: u64, f: impl FnOnce(&mut ReadBuf) -> R) -> R { + let mut nbuf = buf.take(n.try_into().unwrap_or(usize::MAX)); + let ptr = nbuf.initialized().as_ptr(); + let ret = f(&mut nbuf); + + // SAFETY: `ReadBuf::take` only returns the *unfilled* section of `buf`, + // so anything filled is new, initialized data. + // + // We verify that `nbuf` still points to the same buffer, + // so we're sure it hasn't been swapped out. + unsafe { + // ensure our buffer hasn't been swapped out + assert_eq!(nbuf.initialized().as_ptr(), ptr); + + let n = nbuf.filled().len(); + buf.assume_init(n); + buf.advance(n); + } + + ret +} + #[cfg(test)] mod tests { use std::time::Duration; - use crate::wire::bytes::write_bytes; + use crate::wire::bytes::{padding_len, write_bytes}; use hex_literal::hex; use lazy_static::lazy_static; use rstest::rstest; @@ -390,13 +371,12 @@ mod tests { ); } - /// Start a 9 bytes payload packet, but return an error at various stages *after* the actual payload. - /// read_exact with a 9 bytes buffer is expected to succeed, but any further - /// read, as well as read_to_end are expected to fail. + /// Start a 9 bytes payload packet, but don't supply the necessary padding. + /// This is expected to always fail before returning the final data. #[rstest] #[case::before_padding(8 + 9)] #[case::during_padding(8 + 9 + 2)] - #[case::after_padding(8 + 9 + padding_len(9) as usize)] + #[case::after_padding(8 + 9 + padding_len(9) as usize - 1)] #[tokio::test] async fn read_9b_eof_after_payload(#[case] offset: usize) { let payload = &hex!("FF0102030405060708"); @@ -405,28 +385,12 @@ mod tests { .build(); let mut r = BytesReader::new(&mut mock, ..MAX_LEN); - let mut buf = [0; 9]; - // read_exact of the payload will succeed, but a subsequent read will + // read_exact of the payload *body* will succeed, but a subsequent read will // return UnexpectedEof error. - r.read_exact(&mut buf).await.expect("should succeed"); - assert_eq!( - r.read_exact(&mut buf[4..=4]) - .await - .expect_err("must fail") - .kind(), - std::io::ErrorKind::UnexpectedEof - ); - - // read_to_end will fail. - let mut mock = Builder::new() - .read(&produce_packet_bytes(payload).await[..8 + payload.len()]) - .build(); - - let mut r = BytesReader::new(&mut mock, ..MAX_LEN); - let mut buf = Vec::new(); + assert_eq!(r.read_exact(&mut [0; 8]).await.unwrap(), 8); assert_eq!( - r.read_to_end(&mut buf).await.expect_err("must fail").kind(), + r.read_exact(&mut [0]).await.unwrap_err().kind(), std::io::ErrorKind::UnexpectedEof ); } diff --git a/tvix/nix-compat/src/wire/bytes/reader/trailer.rs b/tvix/nix-compat/src/wire/bytes/reader/trailer.rs new file mode 100644 index 000000000000..d2b867c2c338 --- /dev/null +++ b/tvix/nix-compat/src/wire/bytes/reader/trailer.rs @@ -0,0 +1,175 @@ +use std::{ + pin::Pin, + task::{self, ready, Poll}, +}; + +use tokio::io::{self, AsyncRead, ReadBuf}; + +#[derive(Debug)] +pub enum TrailerReader<R> { + Reading { + reader: R, + user_len: u8, + filled: u8, + buf: [u8; 8], + }, + Releasing { + off: u8, + len: u8, + buf: [u8; 8], + }, + Done, +} + +impl<R: AsyncRead + Unpin> TrailerReader<R> { + pub fn new(reader: R, user_len: u8) -> Self { + if user_len == 0 { + return Self::Done; + } + + assert!(user_len < 8, "payload in trailer must be less than 8 bytes"); + Self::Reading { + reader, + user_len, + filled: 0, + buf: [0; 8], + } + } +} + +impl<R: AsyncRead + Unpin> AsyncRead for TrailerReader<R> { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut task::Context, + user_buf: &mut ReadBuf, + ) -> Poll<io::Result<()>> { + let this = self.get_mut(); + + loop { + match this { + &mut Self::Reading { + reader: _, + user_len, + filled: 8, + buf, + } => { + *this = Self::Releasing { + off: 0, + len: user_len, + buf, + }; + } + Self::Reading { + reader, + user_len, + filled, + buf, + } => { + let mut read_buf = ReadBuf::new(&mut buf[..]); + read_buf.advance(*filled as usize); + ready!(Pin::new(reader).poll_read(cx, &mut read_buf))?; + + let new_filled = read_buf.filled().len() as u8; + if *filled == new_filled { + return Err(io::ErrorKind::UnexpectedEof.into()).into(); + } + + *filled = new_filled; + + // ensure the padding is all zeroes + if (u64::from_le_bytes(*buf) >> (*user_len * 8)) != 0 { + return Err(io::ErrorKind::InvalidData.into()).into(); + } + } + Self::Releasing { off: 8, .. } => { + *this = Self::Done; + } + Self::Releasing { off, len, buf } => { + assert_ne!(user_buf.remaining(), 0); + + let buf = &buf[*off as usize..*len as usize]; + let buf = &buf[..usize::min(buf.len(), user_buf.remaining())]; + + user_buf.put_slice(buf); + *off += buf.len() as u8; + + break; + } + Self::Done => break, + } + } + + Ok(()).into() + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + use tokio::io::AsyncReadExt; + + use super::*; + + #[tokio::test] + async fn unexpected_eof() { + let reader = tokio_test::io::Builder::new() + .read(&[0xed]) + .wait(Duration::ZERO) + .read(&[0xef, 0x00]) + .build(); + + let mut reader = TrailerReader::new(reader, 2); + + let mut buf = vec![]; + assert_eq!( + reader.read_to_end(&mut buf).await.unwrap_err().kind(), + io::ErrorKind::UnexpectedEof + ); + } + + #[tokio::test] + async fn invalid_padding() { + let reader = tokio_test::io::Builder::new() + .read(&[0xed]) + .wait(Duration::ZERO) + .read(&[0xef, 0x01, 0x00]) + .wait(Duration::ZERO) + .build(); + + let mut reader = TrailerReader::new(reader, 2); + + let mut buf = vec![]; + assert_eq!( + reader.read_to_end(&mut buf).await.unwrap_err().kind(), + io::ErrorKind::InvalidData + ); + } + + #[tokio::test] + async fn success() { + let reader = tokio_test::io::Builder::new() + .read(&[0xed]) + .wait(Duration::ZERO) + .read(&[0xef, 0x00]) + .wait(Duration::ZERO) + .read(&[0x00, 0x00, 0x00, 0x00, 0x00]) + .build(); + + let mut reader = TrailerReader::new(reader, 2); + + let mut buf = vec![]; + reader.read_to_end(&mut buf).await.unwrap(); + + assert_eq!(buf, &[0xed, 0xef]); + } + + #[tokio::test] + async fn no_padding() { + let reader = tokio_test::io::Builder::new().build(); + let mut reader = TrailerReader::new(reader, 0); + + let mut buf = vec![]; + reader.read_to_end(&mut buf).await.unwrap(); + assert!(buf.is_empty()); + } +} |