about summary refs log tree commit diff
path: root/tvix/glue
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/glue')
-rw-r--r--tvix/glue/Cargo.toml7
-rw-r--r--tvix/glue/src/decompression.rs221
-rw-r--r--tvix/glue/src/lib.rs1
-rw-r--r--tvix/glue/src/tests/blob.tar.bz2bin0 -> 116 bytes
-rw-r--r--tvix/glue/src/tests/blob.tar.gzbin0 -> 116 bytes
-rw-r--r--tvix/glue/src/tests/blob.tar.xzbin0 -> 172 bytes
6 files changed, 229 insertions, 0 deletions
diff --git a/tvix/glue/Cargo.toml b/tvix/glue/Cargo.toml
index f4ebfe490688..1ec3832d917f 100644
--- a/tvix/glue/Cargo.toml
+++ b/tvix/glue/Cargo.toml
@@ -9,7 +9,9 @@ bstr = "1.6.0"
 bytes = "1.4.0"
 data-encoding = "2.3.3"
 futures = "0.3.30"
+magic = "0.16.2"
 nix-compat = { path = "../nix-compat" }
+pin-project = "1.1"
 reqwest = { version = "0.11.22", features = ["rustls-tls-native-roots"], default-features = false }
 tvix-build = { path = "../build", default-features = false, features = []}
 tvix-eval = { path = "../eval" }
@@ -17,6 +19,7 @@ tvix-castore = { path = "../castore" }
 tvix-store = { path = "../store", default-features = false, features = []}
 tracing = "0.1.37"
 tokio = "1.28.0"
+tokio-tar = "0.3.1"
 tokio-util = { version = "0.7.9", features = ["io", "io-util", "compat"] }
 thiserror = "1.0.38"
 serde = "1.0.195"
@@ -24,6 +27,10 @@ serde_json = "1.0"
 sha2 = "0.10.8"
 walkdir = "2.4.0"
 
+[dependencies.async-compression]
+version = "0.4.6"
+features = ["tokio", "gzip", "bzip2", "xz"]
+
 [dependencies.wu-manber]
 git = "https://github.com/tvlfyi/wu-manber.git"
 
diff --git a/tvix/glue/src/decompression.rs b/tvix/glue/src/decompression.rs
new file mode 100644
index 000000000000..7e526932e717
--- /dev/null
+++ b/tvix/glue/src/decompression.rs
@@ -0,0 +1,221 @@
+#![allow(dead_code)] // TODO
+
+use std::{
+    io, mem,
+    pin::Pin,
+    task::{Context, Poll},
+};
+
+use async_compression::tokio::bufread::{BzDecoder, GzipDecoder, XzDecoder};
+use futures::ready;
+use pin_project::pin_project;
+use tokio::io::{AsyncBufRead, AsyncRead, BufReader, ReadBuf};
+
+const GZIP_MAGIC: [u8; 2] = [0x1f, 0x8b];
+const BZIP2_MAGIC: [u8; 3] = *b"BZh";
+const XZ_MAGIC: [u8; 6] = [0xfd, 0x37, 0x7a, 0x58, 0x5a, 0x00];
+const BYTES_NEEDED: usize = 6;
+
+#[derive(Debug, Clone, Copy)]
+enum Algorithm {
+    Gzip,
+    Bzip2,
+    Xz,
+}
+
+impl Algorithm {
+    fn from_magic(magic: &[u8]) -> Option<Self> {
+        if magic.starts_with(&GZIP_MAGIC) {
+            Some(Self::Gzip)
+        } else if magic.starts_with(&BZIP2_MAGIC) {
+            Some(Self::Bzip2)
+        } else if magic.starts_with(&XZ_MAGIC) {
+            Some(Self::Xz)
+        } else {
+            None
+        }
+    }
+}
+
+#[pin_project]
+struct WithPreexistingBuffer<R> {
+    buffer: Vec<u8>,
+    #[pin]
+    inner: R,
+}
+
+impl<R> AsyncRead for WithPreexistingBuffer<R>
+where
+    R: AsyncRead,
+{
+    fn poll_read(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        buf: &mut ReadBuf<'_>,
+    ) -> Poll<io::Result<()>> {
+        let this = self.project();
+        if !this.buffer.is_empty() {
+            // TODO: check if the buffer fits first
+            buf.put_slice(this.buffer);
+            this.buffer.clear();
+        }
+        this.inner.poll_read(cx, buf)
+    }
+}
+
+#[pin_project(project = DecompressedReaderInnerProj)]
+enum DecompressedReaderInner<R> {
+    Unknown {
+        buffer: Vec<u8>,
+        #[pin]
+        inner: Option<R>,
+    },
+    Gzip(#[pin] GzipDecoder<BufReader<WithPreexistingBuffer<R>>>),
+    Bzip2(#[pin] BzDecoder<BufReader<WithPreexistingBuffer<R>>>),
+    Xz(#[pin] XzDecoder<BufReader<WithPreexistingBuffer<R>>>),
+}
+
+impl<R> DecompressedReaderInner<R>
+where
+    R: AsyncBufRead,
+{
+    fn switch_to(&mut self, algorithm: Algorithm) {
+        let (buffer, inner) = match self {
+            DecompressedReaderInner::Unknown { buffer, inner } => {
+                (mem::take(buffer), inner.take().unwrap())
+            }
+            DecompressedReaderInner::Gzip(_)
+            | DecompressedReaderInner::Bzip2(_)
+            | DecompressedReaderInner::Xz(_) => unreachable!(),
+        };
+        let inner = BufReader::new(WithPreexistingBuffer { buffer, inner });
+
+        *self = match algorithm {
+            Algorithm::Gzip => Self::Gzip(GzipDecoder::new(inner)),
+            Algorithm::Bzip2 => Self::Bzip2(BzDecoder::new(inner)),
+            Algorithm::Xz => Self::Xz(XzDecoder::new(inner)),
+        }
+    }
+}
+
+impl<R> AsyncRead for DecompressedReaderInner<R>
+where
+    R: AsyncBufRead,
+{
+    fn poll_read(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        buf: &mut ReadBuf<'_>,
+    ) -> Poll<io::Result<()>> {
+        match self.project() {
+            DecompressedReaderInnerProj::Unknown { .. } => {
+                unreachable!("Can't call poll_read on Unknown")
+            }
+            DecompressedReaderInnerProj::Gzip(inner) => inner.poll_read(cx, buf),
+            DecompressedReaderInnerProj::Bzip2(inner) => inner.poll_read(cx, buf),
+            DecompressedReaderInnerProj::Xz(inner) => inner.poll_read(cx, buf),
+        }
+    }
+}
+
+#[pin_project]
+pub struct DecompressedReader<R> {
+    #[pin]
+    inner: DecompressedReaderInner<R>,
+    switch_to: Option<Algorithm>,
+}
+
+impl<R> DecompressedReader<R> {
+    pub fn new(inner: R) -> Self {
+        Self {
+            inner: DecompressedReaderInner::Unknown {
+                buffer: vec![0; BYTES_NEEDED],
+                inner: Some(inner),
+            },
+            switch_to: None,
+        }
+    }
+}
+
+impl<R> AsyncRead for DecompressedReader<R>
+where
+    R: AsyncBufRead + Unpin,
+{
+    fn poll_read(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        buf: &mut ReadBuf<'_>,
+    ) -> Poll<io::Result<()>> {
+        let mut this = self.project();
+        let (buffer, inner) = match this.inner.as_mut().project() {
+            DecompressedReaderInnerProj::Gzip(inner) => return inner.poll_read(cx, buf),
+            DecompressedReaderInnerProj::Bzip2(inner) => return inner.poll_read(cx, buf),
+            DecompressedReaderInnerProj::Xz(inner) => return inner.poll_read(cx, buf),
+            DecompressedReaderInnerProj::Unknown { buffer, inner } => (buffer, inner),
+        };
+
+        let mut our_buf = ReadBuf::new(buffer);
+        if let Err(e) = ready!(inner.as_pin_mut().unwrap().poll_read(cx, &mut our_buf)) {
+            return Poll::Ready(Err(e));
+        }
+
+        let data = our_buf.filled();
+        if data.len() >= BYTES_NEEDED {
+            if let Some(algorithm) = Algorithm::from_magic(data) {
+                this.inner.as_mut().switch_to(algorithm);
+            } else {
+                return Poll::Ready(Err(io::Error::new(
+                    io::ErrorKind::InvalidData,
+                    "tar data not gz, bzip2, or xz compressed",
+                )));
+            }
+            this.inner.poll_read(cx, buf)
+        } else {
+            cx.waker().wake_by_ref();
+            Poll::Pending
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::path::Path;
+
+    use async_compression::tokio::bufread::GzipEncoder;
+    use futures::TryStreamExt;
+    use test_case::test_case;
+    use tokio::io::{AsyncReadExt, BufReader};
+    use tokio_tar::Archive;
+
+    use super::*;
+
+    #[tokio::test]
+    async fn gzip() {
+        let data = b"abcdefghijk";
+        let mut enc = GzipEncoder::new(&data[..]);
+        let mut gzipped = vec![];
+        enc.read_to_end(&mut gzipped).await.unwrap();
+
+        let mut reader = DecompressedReader::new(BufReader::new(&gzipped[..]));
+        let mut round_tripped = vec![];
+        reader.read_to_end(&mut round_tripped).await.unwrap();
+
+        assert_eq!(data[..], round_tripped[..]);
+    }
+
+    #[test_case(include_bytes!("tests/blob.tar.gz"); "gzip")]
+    #[test_case(include_bytes!("tests/blob.tar.bz2"); "bzip2")]
+    #[test_case(include_bytes!("tests/blob.tar.xz"); "xz")]
+    #[tokio::test]
+    async fn compressed_tar(data: &[u8]) {
+        let reader = DecompressedReader::new(BufReader::new(data));
+        let mut archive = Archive::new(reader);
+        let mut entries: Vec<_> = archive.entries().unwrap().try_collect().await.unwrap();
+
+        assert_eq!(entries.len(), 1);
+        assert_eq!(entries[0].path().unwrap().as_ref(), Path::new("empty"));
+        let mut data = String::new();
+        entries[0].read_to_string(&mut data).await.unwrap();
+        assert_eq!(data, "");
+    }
+}
diff --git a/tvix/glue/src/lib.rs b/tvix/glue/src/lib.rs
index b2f586ce52d6..f04d5ec3a0f2 100644
--- a/tvix/glue/src/lib.rs
+++ b/tvix/glue/src/lib.rs
@@ -5,6 +5,7 @@ pub mod tvix_build;
 pub mod tvix_io;
 pub mod tvix_store_io;
 
+mod decompression;
 #[cfg(test)]
 mod tests;
 
diff --git a/tvix/glue/src/tests/blob.tar.bz2 b/tvix/glue/src/tests/blob.tar.bz2
new file mode 100644
index 000000000000..d74b9139127f
--- /dev/null
+++ b/tvix/glue/src/tests/blob.tar.bz2
Binary files differdiff --git a/tvix/glue/src/tests/blob.tar.gz b/tvix/glue/src/tests/blob.tar.gz
new file mode 100644
index 000000000000..c2bae55078d7
--- /dev/null
+++ b/tvix/glue/src/tests/blob.tar.gz
Binary files differdiff --git a/tvix/glue/src/tests/blob.tar.xz b/tvix/glue/src/tests/blob.tar.xz
new file mode 100644
index 000000000000..324a99d89549
--- /dev/null
+++ b/tvix/glue/src/tests/blob.tar.xz
Binary files differ