about summary refs log tree commit diff
path: root/tvix/glue/src/decompression.rs
diff options
context:
space:
mode:
author Florian Klink <flokli@flokli.de> 2024-04-26T15·58+0300
committer flokli <flokli@flokli.de> 2024-04-29T13·11+0000
commit 26b77b2cf3138e4db236bf243b718f3de23b0529 (patch)
tree cf68332103b7dad516e57cf3fc559c71eb02c1fb /tvix/glue/src/decompression.rs
parent 69e4a7881843478916fc2b62d55bba66e393ad38 (diff)
refactor(tvix/glue): move decompression into fetchers/ subdir r/8026
This is specifically used for the fetcher code (only).
Moving it to there for now.

Change-Id: I1e1d0541b85340ef4ff3a4c6b3fa99b51853f539
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11532
Reviewed-by: edef <edef@edef.eu>
Autosubmit: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Diffstat (limited to 'tvix/glue/src/decompression.rs')
-rw-r--r-- tvix/glue/src/decompression.rs 222
1 files changed, 0 insertions, 222 deletions
diff --git a/tvix/glue/src/decompression.rs b/tvix/glue/src/decompression.rs
deleted file mode 100644
index 11dc9d98352a..000000000000
--- a/tvix/glue/src/decompression.rs
+++ /dev/null
@@ -1,222 +0,0 @@
-#![allow(dead_code)] // TODO
-
-use std::{
-    io, mem,
-    pin::Pin,
-    task::{Context, Poll},
-};
-
-use async_compression::tokio::bufread::{BzDecoder, GzipDecoder, XzDecoder};
-use futures::ready;
-use pin_project::pin_project;
-use tokio::io::{AsyncBufRead, AsyncRead, BufReader, ReadBuf};
-
-const GZIP_MAGIC: [u8; 2] = [0x1f, 0x8b];
-const BZIP2_MAGIC: [u8; 3] = *b"BZh";
-const XZ_MAGIC: [u8; 6] = [0xfd, 0x37, 0x7a, 0x58, 0x5a, 0x00];
-const BYTES_NEEDED: usize = 6;
-
-#[derive(Debug, Clone, Copy)]
-enum Algorithm {
-    Gzip,
-    Bzip2,
-    Xz,
-}
-
-impl Algorithm {
-    fn from_magic(magic: &[u8]) -> Option<Self> {
-        if magic.starts_with(&GZIP_MAGIC) {
-            Some(Self::Gzip)
-        } else if magic.starts_with(&BZIP2_MAGIC) {
-            Some(Self::Bzip2)
-        } else if magic.starts_with(&XZ_MAGIC) {
-            Some(Self::Xz)
-        } else {
-            None
-        }
-    }
-}
-
-#[pin_project]
-struct WithPreexistingBuffer<R> {
-    buffer: Vec<u8>,
-    #[pin]
-    inner: R,
-}
-
-impl<R> AsyncRead for WithPreexistingBuffer<R>
-where
-    R: AsyncRead,
-{
-    fn poll_read(
-        self: Pin<&mut Self>,
-        cx: &mut Context<'_>,
-        buf: &mut ReadBuf<'_>,
-    ) -> Poll<io::Result<()>> {
-        let this = self.project();
-        if !this.buffer.is_empty() {
-            // TODO: check if the buffer fits first
-            buf.put_slice(this.buffer);
-            this.buffer.clear();
-        }
-        this.inner.poll_read(cx, buf)
-    }
-}
-
-#[pin_project(project = DecompressedReaderInnerProj)]
-enum DecompressedReaderInner<R> {
-    Unknown {
-        buffer: Vec<u8>,
-        #[pin]
-        inner: Option<R>,
-    },
-    Gzip(#[pin] GzipDecoder<BufReader<WithPreexistingBuffer<R>>>),
-    Bzip2(#[pin] BzDecoder<BufReader<WithPreexistingBuffer<R>>>),
-    Xz(#[pin] XzDecoder<BufReader<WithPreexistingBuffer<R>>>),
-}
-
-impl<R> DecompressedReaderInner<R>
-where
-    R: AsyncBufRead,
-{
-    fn switch_to(&mut self, algorithm: Algorithm) {
-        let (buffer, inner) = match self {
-            DecompressedReaderInner::Unknown { buffer, inner } => {
-                (mem::take(buffer), inner.take().unwrap())
-            }
-            DecompressedReaderInner::Gzip(_)
-            | DecompressedReaderInner::Bzip2(_)
-            | DecompressedReaderInner::Xz(_) => unreachable!(),
-        };
-        let inner = BufReader::new(WithPreexistingBuffer { buffer, inner });
-
-        *self = match algorithm {
-            Algorithm::Gzip => Self::Gzip(GzipDecoder::new(inner)),
-            Algorithm::Bzip2 => Self::Bzip2(BzDecoder::new(inner)),
-            Algorithm::Xz => Self::Xz(XzDecoder::new(inner)),
-        }
-    }
-}
-
-impl<R> AsyncRead for DecompressedReaderInner<R>
-where
-    R: AsyncBufRead,
-{
-    fn poll_read(
-        self: Pin<&mut Self>,
-        cx: &mut Context<'_>,
-        buf: &mut ReadBuf<'_>,
-    ) -> Poll<io::Result<()>> {
-        match self.project() {
-            DecompressedReaderInnerProj::Unknown { .. } => {
-                unreachable!("Can't call poll_read on Unknown")
-            }
-            DecompressedReaderInnerProj::Gzip(inner) => inner.poll_read(cx, buf),
-            DecompressedReaderInnerProj::Bzip2(inner) => inner.poll_read(cx, buf),
-            DecompressedReaderInnerProj::Xz(inner) => inner.poll_read(cx, buf),
-        }
-    }
-}
-
-#[pin_project]
-pub struct DecompressedReader<R> {
-    #[pin]
-    inner: DecompressedReaderInner<R>,
-    switch_to: Option<Algorithm>,
-}
-
-impl<R> DecompressedReader<R> {
-    pub fn new(inner: R) -> Self {
-        Self {
-            inner: DecompressedReaderInner::Unknown {
-                buffer: vec![0; BYTES_NEEDED],
-                inner: Some(inner),
-            },
-            switch_to: None,
-        }
-    }
-}
-
-impl<R> AsyncRead for DecompressedReader<R>
-where
-    R: AsyncBufRead + Unpin,
-{
-    fn poll_read(
-        self: Pin<&mut Self>,
-        cx: &mut Context<'_>,
-        buf: &mut ReadBuf<'_>,
-    ) -> Poll<io::Result<()>> {
-        let mut this = self.project();
-        let (buffer, inner) = match this.inner.as_mut().project() {
-            DecompressedReaderInnerProj::Gzip(inner) => return inner.poll_read(cx, buf),
-            DecompressedReaderInnerProj::Bzip2(inner) => return inner.poll_read(cx, buf),
-            DecompressedReaderInnerProj::Xz(inner) => return inner.poll_read(cx, buf),
-            DecompressedReaderInnerProj::Unknown { buffer, inner } => (buffer, inner),
-        };
-
-        let mut our_buf = ReadBuf::new(buffer);
-        if let Err(e) = ready!(inner.as_pin_mut().unwrap().poll_read(cx, &mut our_buf)) {
-            return Poll::Ready(Err(e));
-        }
-
-        let data = our_buf.filled();
-        if data.len() >= BYTES_NEEDED {
-            if let Some(algorithm) = Algorithm::from_magic(data) {
-                this.inner.as_mut().switch_to(algorithm);
-            } else {
-                return Poll::Ready(Err(io::Error::new(
-                    io::ErrorKind::InvalidData,
-                    "tar data not gz, bzip2, or xz compressed",
-                )));
-            }
-            this.inner.poll_read(cx, buf)
-        } else {
-            cx.waker().wake_by_ref();
-            Poll::Pending
-        }
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use std::path::Path;
-
-    use async_compression::tokio::bufread::GzipEncoder;
-    use futures::TryStreamExt;
-    use rstest::rstest;
-    use tokio::io::{AsyncReadExt, BufReader};
-    use tokio_tar::Archive;
-
-    use super::*;
-
-    #[tokio::test]
-    async fn gzip() {
-        let data = b"abcdefghijk";
-        let mut enc = GzipEncoder::new(&data[..]);
-        let mut gzipped = vec![];
-        enc.read_to_end(&mut gzipped).await.unwrap();
-
-        let mut reader = DecompressedReader::new(BufReader::new(&gzipped[..]));
-        let mut round_tripped = vec![];
-        reader.read_to_end(&mut round_tripped).await.unwrap();
-
-        assert_eq!(data[..], round_tripped[..]);
-    }
-
-    #[rstest]
-    #[case::gzip(include_bytes!("tests/blob.tar.gz"))]
-    #[case::bzip2(include_bytes!("tests/blob.tar.bz2"))]
-    #[case::xz(include_bytes!("tests/blob.tar.xz"))]
-    #[tokio::test]
-    async fn compressed_tar(#[case] data: &[u8]) {
-        let reader = DecompressedReader::new(BufReader::new(data));
-        let mut archive = Archive::new(reader);
-        let mut entries: Vec<_> = archive.entries().unwrap().try_collect().await.unwrap();
-
-        assert_eq!(entries.len(), 1);
-        assert_eq!(entries[0].path().unwrap().as_ref(), Path::new("empty"));
-        let mut data = String::new();
-        entries[0].read_to_string(&mut data).await.unwrap();
-        assert_eq!(data, "");
-    }
-}