From 54609e8c17e60c1a3feaea5430887a6cc6bdce0f Mon Sep 17 00:00:00 2001 From: Aspen Smith Date: Fri, 23 Feb 2024 14:42:52 -0500 Subject: feat(tvix/glue): Add AsyncRead wrapper to decompress streams Add a new AsyncRead wrapper, DecompressedReader, that wraps an underlying AsyncRead, but sniffs the magic bytes at the start of the stream to determine which compression format is being used out of the three that are supported by builtins.fetchTarball, and switches to the correct decompression algorithm adapter dynamically. This will be used in the implementation of builtins.fetchTarball Change-Id: I892a4683d5c93e67d4c173f3d21199bdc6605922 Reviewed-on: https://cl.tvl.fyi/c/depot/+/11019 Reviewed-by: flokli Tested-by: BuildkiteCI --- tvix/glue/Cargo.toml | 7 ++ tvix/glue/src/decompression.rs | 221 +++++++++++++++++++++++++++++++++++++++ tvix/glue/src/lib.rs | 1 + tvix/glue/src/tests/blob.tar.bz2 | Bin 0 -> 116 bytes tvix/glue/src/tests/blob.tar.gz | Bin 0 -> 116 bytes tvix/glue/src/tests/blob.tar.xz | Bin 0 -> 172 bytes 6 files changed, 229 insertions(+) create mode 100644 tvix/glue/src/decompression.rs create mode 100644 tvix/glue/src/tests/blob.tar.bz2 create mode 100644 tvix/glue/src/tests/blob.tar.gz create mode 100644 tvix/glue/src/tests/blob.tar.xz (limited to 'tvix/glue') diff --git a/tvix/glue/Cargo.toml b/tvix/glue/Cargo.toml index f4ebfe490688..1ec3832d917f 100644 --- a/tvix/glue/Cargo.toml +++ b/tvix/glue/Cargo.toml @@ -9,7 +9,9 @@ bstr = "1.6.0" bytes = "1.4.0" data-encoding = "2.3.3" futures = "0.3.30" +magic = "0.16.2" nix-compat = { path = "../nix-compat" } +pin-project = "1.1" reqwest = { version = "0.11.22", features = ["rustls-tls-native-roots"], default-features = false } tvix-build = { path = "../build", default-features = false, features = []} tvix-eval = { path = "../eval" } @@ -17,6 +19,7 @@ tvix-castore = { path = "../castore" } tvix-store = { path = "../store", default-features = false, features = []} tracing = "0.1.37" tokio = "1.28.0" +tokio-tar = "0.3.1" tokio-util = { version = "0.7.9", features = ["io", "io-util", "compat"] } thiserror = "1.0.38" serde = "1.0.195" @@ -24,6 +27,10 @@ serde_json = "1.0" sha2 = "0.10.8" walkdir = "2.4.0" +[dependencies.async-compression] +version = "0.4.6" +features = ["tokio", "gzip", "bzip2", "xz"] + [dependencies.wu-manber] git = "https://github.com/tvlfyi/wu-manber.git" diff --git a/tvix/glue/src/decompression.rs b/tvix/glue/src/decompression.rs new file mode 100644 index 000000000000..7e526932e717 --- /dev/null +++ b/tvix/glue/src/decompression.rs @@ -0,0 +1,221 @@ +#![allow(dead_code)] // TODO + +use std::{ + io, mem, + pin::Pin, + task::{Context, Poll}, +}; + +use async_compression::tokio::bufread::{BzDecoder, GzipDecoder, XzDecoder}; +use futures::ready; +use pin_project::pin_project; +use tokio::io::{AsyncBufRead, AsyncRead, BufReader, ReadBuf}; + +const GZIP_MAGIC: [u8; 2] = [0x1f, 0x8b]; +const BZIP2_MAGIC: [u8; 3] = *b"BZh"; +const XZ_MAGIC: [u8; 6] = [0xfd, 0x37, 0x7a, 0x58, 0x5a, 0x00]; +const BYTES_NEEDED: usize = 6; + +#[derive(Debug, Clone, Copy)] +enum Algorithm { + Gzip, + Bzip2, + Xz, +} + +impl Algorithm { + fn from_magic(magic: &[u8]) -> Option { + if magic.starts_with(&GZIP_MAGIC) { + Some(Self::Gzip) + } else if magic.starts_with(&BZIP2_MAGIC) { + Some(Self::Bzip2) + } else if magic.starts_with(&XZ_MAGIC) { + Some(Self::Xz) + } else { + None + } + } +} + +#[pin_project] +struct WithPreexistingBuffer { + buffer: Vec, + #[pin] + inner: R, +} + +impl AsyncRead for WithPreexistingBuffer +where + R: AsyncRead, +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + let this = self.project(); + if !this.buffer.is_empty() { + // TODO: check if the buffer fits first + buf.put_slice(this.buffer); + this.buffer.clear(); + } + this.inner.poll_read(cx, buf) + } +} + +#[pin_project(project = DecompressedReaderInnerProj)] +enum DecompressedReaderInner { + Unknown { + buffer: Vec, + #[pin] + inner: Option, + }, + Gzip(#[pin] GzipDecoder>>), + Bzip2(#[pin] BzDecoder>>), + Xz(#[pin] XzDecoder>>), +} + +impl DecompressedReaderInner +where + R: AsyncBufRead, +{ + fn switch_to(&mut self, algorithm: Algorithm) { + let (buffer, inner) = match self { + DecompressedReaderInner::Unknown { buffer, inner } => { + (mem::take(buffer), inner.take().unwrap()) + } + DecompressedReaderInner::Gzip(_) + | DecompressedReaderInner::Bzip2(_) + | DecompressedReaderInner::Xz(_) => unreachable!(), + }; + let inner = BufReader::new(WithPreexistingBuffer { buffer, inner }); + + *self = match algorithm { + Algorithm::Gzip => Self::Gzip(GzipDecoder::new(inner)), + Algorithm::Bzip2 => Self::Bzip2(BzDecoder::new(inner)), + Algorithm::Xz => Self::Xz(XzDecoder::new(inner)), + } + } +} + +impl AsyncRead for DecompressedReaderInner +where + R: AsyncBufRead, +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + match self.project() { + DecompressedReaderInnerProj::Unknown { .. } => { + unreachable!("Can't call poll_read on Unknown") + } + DecompressedReaderInnerProj::Gzip(inner) => inner.poll_read(cx, buf), + DecompressedReaderInnerProj::Bzip2(inner) => inner.poll_read(cx, buf), + DecompressedReaderInnerProj::Xz(inner) => inner.poll_read(cx, buf), + } + } +} + +#[pin_project] +pub struct DecompressedReader { + #[pin] + inner: DecompressedReaderInner, + switch_to: Option, +} + +impl DecompressedReader { + pub fn new(inner: R) -> Self { + Self { + inner: DecompressedReaderInner::Unknown { + buffer: vec![0; BYTES_NEEDED], + inner: Some(inner), + }, + switch_to: None, + } + } +} + +impl AsyncRead for DecompressedReader +where + R: AsyncBufRead + Unpin, +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + let mut this = self.project(); + let (buffer, inner) = match this.inner.as_mut().project() { + DecompressedReaderInnerProj::Gzip(inner) => return inner.poll_read(cx, buf), + DecompressedReaderInnerProj::Bzip2(inner) => return inner.poll_read(cx, buf), + DecompressedReaderInnerProj::Xz(inner) => return inner.poll_read(cx, buf), + DecompressedReaderInnerProj::Unknown { buffer, inner } => (buffer, inner), + }; + + let mut our_buf = ReadBuf::new(buffer); + if let Err(e) = ready!(inner.as_pin_mut().unwrap().poll_read(cx, &mut our_buf)) { + return Poll::Ready(Err(e)); + } + + let data = our_buf.filled(); + if data.len() >= BYTES_NEEDED { + if let Some(algorithm) = Algorithm::from_magic(data) { + this.inner.as_mut().switch_to(algorithm); + } else { + return Poll::Ready(Err(io::Error::new( + io::ErrorKind::InvalidData, + "tar data not gz, bzip2, or xz compressed", + ))); + } + this.inner.poll_read(cx, buf) + } else { + cx.waker().wake_by_ref(); + Poll::Pending + } + } +} + +#[cfg(test)] +mod tests { + use std::path::Path; + + use async_compression::tokio::bufread::GzipEncoder; + use futures::TryStreamExt; + use test_case::test_case; + use tokio::io::{AsyncReadExt, BufReader}; + use tokio_tar::Archive; + + use super::*; + + #[tokio::test] + async fn gzip() { + let data = b"abcdefghijk"; + let mut enc = GzipEncoder::new(&data[..]); + let mut gzipped = vec![]; + enc.read_to_end(&mut gzipped).await.unwrap(); + + let mut reader = DecompressedReader::new(BufReader::new(&gzipped[..])); + let mut round_tripped = vec![]; + reader.read_to_end(&mut round_tripped).await.unwrap(); + + assert_eq!(data[..], round_tripped[..]); + } + + #[test_case(include_bytes!("tests/blob.tar.gz"); "gzip")] + #[test_case(include_bytes!("tests/blob.tar.bz2"); "bzip2")] + #[test_case(include_bytes!("tests/blob.tar.xz"); "xz")] + #[tokio::test] + async fn compressed_tar(data: &[u8]) { + let reader = DecompressedReader::new(BufReader::new(data)); + let mut archive = Archive::new(reader); + let mut entries: Vec<_> = archive.entries().unwrap().try_collect().await.unwrap(); + + assert_eq!(entries.len(), 1); + assert_eq!(entries[0].path().unwrap().as_ref(), Path::new("empty")); + let mut data = String::new(); + entries[0].read_to_string(&mut data).await.unwrap(); + assert_eq!(data, ""); + } +} diff --git a/tvix/glue/src/lib.rs b/tvix/glue/src/lib.rs index b2f586ce52d6..f04d5ec3a0f2 100644 --- a/tvix/glue/src/lib.rs +++ b/tvix/glue/src/lib.rs @@ -5,6 +5,7 @@ pub mod tvix_build; pub mod tvix_io; pub mod tvix_store_io; +mod decompression; #[cfg(test)] mod tests; diff --git a/tvix/glue/src/tests/blob.tar.bz2 b/tvix/glue/src/tests/blob.tar.bz2 new file mode 100644 index 000000000000..d74b9139127f Binary files /dev/null and b/tvix/glue/src/tests/blob.tar.bz2 differ diff --git a/tvix/glue/src/tests/blob.tar.gz b/tvix/glue/src/tests/blob.tar.gz new file mode 100644 index 000000000000..c2bae55078d7 Binary files /dev/null and b/tvix/glue/src/tests/blob.tar.gz differ diff --git a/tvix/glue/src/tests/blob.tar.xz b/tvix/glue/src/tests/blob.tar.xz new file mode 100644 index 000000000000..324a99d89549 Binary files /dev/null and b/tvix/glue/src/tests/blob.tar.xz differ -- cgit 1.4.1