diff options
Diffstat (limited to 'tvix/glue')
-rw-r--r-- | tvix/glue/Cargo.toml | 7 | ||||
-rw-r--r-- | tvix/glue/src/decompression.rs | 221 | ||||
-rw-r--r-- | tvix/glue/src/lib.rs | 1 | ||||
-rw-r--r-- | tvix/glue/src/tests/blob.tar.bz2 | bin | 0 -> 116 bytes | |||
-rw-r--r-- | tvix/glue/src/tests/blob.tar.gz | bin | 0 -> 116 bytes | |||
-rw-r--r-- | tvix/glue/src/tests/blob.tar.xz | bin | 0 -> 172 bytes |
6 files changed, 229 insertions, 0 deletions
diff --git a/tvix/glue/Cargo.toml b/tvix/glue/Cargo.toml index f4ebfe490688..1ec3832d917f 100644 --- a/tvix/glue/Cargo.toml +++ b/tvix/glue/Cargo.toml @@ -9,7 +9,9 @@ bstr = "1.6.0" bytes = "1.4.0" data-encoding = "2.3.3" futures = "0.3.30" +magic = "0.16.2" nix-compat = { path = "../nix-compat" } +pin-project = "1.1" reqwest = { version = "0.11.22", features = ["rustls-tls-native-roots"], default-features = false } tvix-build = { path = "../build", default-features = false, features = []} tvix-eval = { path = "../eval" } @@ -17,6 +19,7 @@ tvix-castore = { path = "../castore" } tvix-store = { path = "../store", default-features = false, features = []} tracing = "0.1.37" tokio = "1.28.0" +tokio-tar = "0.3.1" tokio-util = { version = "0.7.9", features = ["io", "io-util", "compat"] } thiserror = "1.0.38" serde = "1.0.195" @@ -24,6 +27,10 @@ serde_json = "1.0" sha2 = "0.10.8" walkdir = "2.4.0" +[dependencies.async-compression] +version = "0.4.6" +features = ["tokio", "gzip", "bzip2", "xz"] + [dependencies.wu-manber] git = "https://github.com/tvlfyi/wu-manber.git" diff --git a/tvix/glue/src/decompression.rs b/tvix/glue/src/decompression.rs new file mode 100644 index 000000000000..7e526932e717 --- /dev/null +++ b/tvix/glue/src/decompression.rs @@ -0,0 +1,221 @@ +#![allow(dead_code)] // TODO + +use std::{ + io, mem, + pin::Pin, + task::{Context, Poll}, +}; + +use async_compression::tokio::bufread::{BzDecoder, GzipDecoder, XzDecoder}; +use futures::ready; +use pin_project::pin_project; +use tokio::io::{AsyncBufRead, AsyncRead, BufReader, ReadBuf}; + +const GZIP_MAGIC: [u8; 2] = [0x1f, 0x8b]; +const BZIP2_MAGIC: [u8; 3] = *b"BZh"; +const XZ_MAGIC: [u8; 6] = [0xfd, 0x37, 0x7a, 0x58, 0x5a, 0x00]; +const BYTES_NEEDED: usize = 6; + +#[derive(Debug, Clone, Copy)] +enum Algorithm { + Gzip, + Bzip2, + Xz, +} + +impl Algorithm { + fn from_magic(magic: &[u8]) -> Option<Self> { + if magic.starts_with(&GZIP_MAGIC) { + Some(Self::Gzip) + } else if magic.starts_with(&BZIP2_MAGIC) { + Some(Self::Bzip2) + } else if magic.starts_with(&XZ_MAGIC) { + Some(Self::Xz) + } else { + None + } + } +} + +#[pin_project] +struct WithPreexistingBuffer<R> { + buffer: Vec<u8>, + #[pin] + inner: R, +} + +impl<R> AsyncRead for WithPreexistingBuffer<R> +where + R: AsyncRead, +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll<io::Result<()>> { + let this = self.project(); + if !this.buffer.is_empty() { + // TODO: check if the buffer fits first + buf.put_slice(this.buffer); + this.buffer.clear(); + } + this.inner.poll_read(cx, buf) + } +} + +#[pin_project(project = DecompressedReaderInnerProj)] +enum DecompressedReaderInner<R> { + Unknown { + buffer: Vec<u8>, + #[pin] + inner: Option<R>, + }, + Gzip(#[pin] GzipDecoder<BufReader<WithPreexistingBuffer<R>>>), + Bzip2(#[pin] BzDecoder<BufReader<WithPreexistingBuffer<R>>>), + Xz(#[pin] XzDecoder<BufReader<WithPreexistingBuffer<R>>>), +} + +impl<R> DecompressedReaderInner<R> +where + R: AsyncBufRead, +{ + fn switch_to(&mut self, algorithm: Algorithm) { + let (buffer, inner) = match self { + DecompressedReaderInner::Unknown { buffer, inner } => { + (mem::take(buffer), inner.take().unwrap()) + } + DecompressedReaderInner::Gzip(_) + | DecompressedReaderInner::Bzip2(_) + | DecompressedReaderInner::Xz(_) => unreachable!(), + }; + let inner = BufReader::new(WithPreexistingBuffer { buffer, inner }); + + *self = match algorithm { + Algorithm::Gzip => Self::Gzip(GzipDecoder::new(inner)), + Algorithm::Bzip2 => Self::Bzip2(BzDecoder::new(inner)), + Algorithm::Xz => Self::Xz(XzDecoder::new(inner)), + } + } +} + +impl<R> AsyncRead for DecompressedReaderInner<R> +where + R: AsyncBufRead, +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll<io::Result<()>> { + match self.project() { + DecompressedReaderInnerProj::Unknown { .. } => { + unreachable!("Can't call poll_read on Unknown") + } + DecompressedReaderInnerProj::Gzip(inner) => inner.poll_read(cx, buf), + DecompressedReaderInnerProj::Bzip2(inner) => inner.poll_read(cx, buf), + DecompressedReaderInnerProj::Xz(inner) => inner.poll_read(cx, buf), + } + } +} + +#[pin_project] +pub struct DecompressedReader<R> { + #[pin] + inner: DecompressedReaderInner<R>, + switch_to: Option<Algorithm>, +} + +impl<R> DecompressedReader<R> { + pub fn new(inner: R) -> Self { + Self { + inner: DecompressedReaderInner::Unknown { + buffer: vec![0; BYTES_NEEDED], + inner: Some(inner), + }, + switch_to: None, + } + } +} + +impl<R> AsyncRead for DecompressedReader<R> +where + R: AsyncBufRead + Unpin, +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll<io::Result<()>> { + let mut this = self.project(); + let (buffer, inner) = match this.inner.as_mut().project() { + DecompressedReaderInnerProj::Gzip(inner) => return inner.poll_read(cx, buf), + DecompressedReaderInnerProj::Bzip2(inner) => return inner.poll_read(cx, buf), + DecompressedReaderInnerProj::Xz(inner) => return inner.poll_read(cx, buf), + DecompressedReaderInnerProj::Unknown { buffer, inner } => (buffer, inner), + }; + + let mut our_buf = ReadBuf::new(buffer); + if let Err(e) = ready!(inner.as_pin_mut().unwrap().poll_read(cx, &mut our_buf)) { + return Poll::Ready(Err(e)); + } + + let data = our_buf.filled(); + if data.len() >= BYTES_NEEDED { + if let Some(algorithm) = Algorithm::from_magic(data) { + this.inner.as_mut().switch_to(algorithm); + } else { + return Poll::Ready(Err(io::Error::new( + io::ErrorKind::InvalidData, + "tar data not gz, bzip2, or xz compressed", + ))); + } + this.inner.poll_read(cx, buf) + } else { + cx.waker().wake_by_ref(); + Poll::Pending + } + } +} + +#[cfg(test)] +mod tests { + use std::path::Path; + + use async_compression::tokio::bufread::GzipEncoder; + use futures::TryStreamExt; + use test_case::test_case; + use tokio::io::{AsyncReadExt, BufReader}; + use tokio_tar::Archive; + + use super::*; + + #[tokio::test] + async fn gzip() { + let data = b"abcdefghijk"; + let mut enc = GzipEncoder::new(&data[..]); + let mut gzipped = vec![]; + enc.read_to_end(&mut gzipped).await.unwrap(); + + let mut reader = DecompressedReader::new(BufReader::new(&gzipped[..])); + let mut round_tripped = vec![]; + reader.read_to_end(&mut round_tripped).await.unwrap(); + + assert_eq!(data[..], round_tripped[..]); + } + + #[test_case(include_bytes!("tests/blob.tar.gz"); "gzip")] + #[test_case(include_bytes!("tests/blob.tar.bz2"); "bzip2")] + #[test_case(include_bytes!("tests/blob.tar.xz"); "xz")] + #[tokio::test] + async fn compressed_tar(data: &[u8]) { + let reader = DecompressedReader::new(BufReader::new(data)); + let mut archive = Archive::new(reader); + let mut entries: Vec<_> = archive.entries().unwrap().try_collect().await.unwrap(); + + assert_eq!(entries.len(), 1); + assert_eq!(entries[0].path().unwrap().as_ref(), Path::new("empty")); + let mut data = String::new(); + entries[0].read_to_string(&mut data).await.unwrap(); + assert_eq!(data, ""); + } +} diff --git a/tvix/glue/src/lib.rs b/tvix/glue/src/lib.rs index b2f586ce52d6..f04d5ec3a0f2 100644 --- a/tvix/glue/src/lib.rs +++ b/tvix/glue/src/lib.rs @@ -5,6 +5,7 @@ pub mod tvix_build; pub mod tvix_io; pub mod tvix_store_io; +mod decompression; #[cfg(test)] mod tests; diff --git a/tvix/glue/src/tests/blob.tar.bz2 b/tvix/glue/src/tests/blob.tar.bz2 new file mode 100644 index 000000000000..d74b9139127f --- /dev/null +++ b/tvix/glue/src/tests/blob.tar.bz2 Binary files differdiff --git a/tvix/glue/src/tests/blob.tar.gz b/tvix/glue/src/tests/blob.tar.gz new file mode 100644 index 000000000000..c2bae55078d7 --- /dev/null +++ b/tvix/glue/src/tests/blob.tar.gz Binary files differdiff --git a/tvix/glue/src/tests/blob.tar.xz b/tvix/glue/src/tests/blob.tar.xz new file mode 100644 index 000000000000..324a99d89549 --- /dev/null +++ b/tvix/glue/src/tests/blob.tar.xz Binary files differ |