about summary refs log tree commit diff
path: root/tvix/tools/crunch-v2/src
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/tools/crunch-v2/src')
-rw-r--r--tvix/tools/crunch-v2/src/bin/extract.rs155
-rw-r--r--tvix/tools/crunch-v2/src/lib.rs3
-rw-r--r--tvix/tools/crunch-v2/src/main.rs309
-rw-r--r--tvix/tools/crunch-v2/src/remote.rs211
4 files changed, 678 insertions, 0 deletions
diff --git a/tvix/tools/crunch-v2/src/bin/extract.rs b/tvix/tools/crunch-v2/src/bin/extract.rs
new file mode 100644
index 0000000000..416d201f4e
--- /dev/null
+++ b/tvix/tools/crunch-v2/src/bin/extract.rs
@@ -0,0 +1,155 @@
+//! This tool lossily converts a Sled database produced by crunch-v2 into a Parquet file for analysis.
+//! The resulting `crunch.parquet` has columns file_hash`, `nar_hash`, and `chunk`.
+//! The first two are SHA-256 hashes of the compressed file and the NAR it decompresses to.
+//! `chunk` is a struct array corresponding to [crunch_v2::proto::Chunk] messages.
+//! They are concatenated without any additional structure, so nothing but the chunk list is preserved.
+
+use anyhow::Result;
+use clap::Parser;
+use indicatif::{ProgressBar, ProgressStyle};
+use std::fs::File;
+use std::path::PathBuf;
+
+use crunch_v2::proto::{self, path::Node};
+use prost::Message;
+
+use polars::{
+    chunked_array::builder::AnonymousOwnedListBuilder,
+    prelude::{
+        df, BinaryChunkedBuilder, ChunkedBuilder, DataFrame, DataType, Field, ListBuilderTrait,
+        NamedFrom, ParquetWriter, PrimitiveChunkedBuilder, Series, UInt32Type,
+    },
+    series::IntoSeries,
+};
+
+#[derive(Parser)]
+struct Args {
+    /// Path to the sled database that's read from.
+    #[clap(default_value = "crunch.db")]
+    infile: PathBuf,
+
+    /// Path to the resulting parquet file that's written.
+    #[clap(default_value = "crunch.parquet")]
+    outfile: PathBuf,
+}
+
+fn main() -> Result<()> {
+    let args = Args::parse();
+
+    let w = ParquetWriter::new(File::create(args.outfile)?);
+
+    let db: sled::Db = sled::open(&args.infile).unwrap();
+    let files_tree: sled::Tree = db.open_tree("files").unwrap();
+
+    let progress =
+        ProgressBar::new(files_tree.len() as u64).with_style(ProgressStyle::with_template(
+            "{elapsed_precise}/{duration_precise} {wide_bar} {pos}/{len}",
+        )?);
+
+    let mut frame = FrameBuilder::new();
+    for entry in &files_tree {
+        let (file_hash, pb) = entry?;
+        frame.push(
+            file_hash[..].try_into().unwrap(),
+            proto::Path::decode(&pb[..])?,
+        );
+        progress.inc(1);
+    }
+
+    w.finish(&mut frame.finish())?;
+
+    Ok(())
+}
+
+struct FrameBuilder {
+    file_hash: BinaryChunkedBuilder,
+    nar_hash: BinaryChunkedBuilder,
+    chunk: AnonymousOwnedListBuilder,
+}
+
+impl FrameBuilder {
+    fn new() -> Self {
+        Self {
+            file_hash: BinaryChunkedBuilder::new("file_hash", 0, 0),
+            nar_hash: BinaryChunkedBuilder::new("nar_hash", 0, 0),
+            chunk: AnonymousOwnedListBuilder::new(
+                "chunk",
+                0,
+                Some(DataType::Struct(vec![
+                    Field::new("hash", DataType::Binary),
+                    Field::new("size", DataType::UInt32),
+                    Field::new("size_compressed", DataType::UInt32),
+                ])),
+            ),
+        }
+    }
+
+    fn push(&mut self, file_hash: [u8; 32], pb: proto::Path) {
+        self.file_hash.append_value(&file_hash[..]);
+        self.nar_hash.append_value(pb.nar_hash);
+        self.chunk
+            .append_series(&ChunkFrameBuilder::new(pb.node.unwrap()))
+            .unwrap();
+    }
+
+    fn finish(mut self) -> DataFrame {
+        df! {
+            "file_hash" => self.file_hash.finish().into_series(),
+            "nar_hash" => self.nar_hash.finish().into_series(),
+            "chunk" => self.chunk.finish().into_series()
+        }
+        .unwrap()
+    }
+}
+
+struct ChunkFrameBuilder {
+    hash: BinaryChunkedBuilder,
+    size: PrimitiveChunkedBuilder<UInt32Type>,
+    size_compressed: PrimitiveChunkedBuilder<UInt32Type>,
+}
+
+impl ChunkFrameBuilder {
+    fn new(node: proto::path::Node) -> Series {
+        let mut this = Self {
+            hash: BinaryChunkedBuilder::new("hash", 0, 0),
+            size: PrimitiveChunkedBuilder::new("size", 0),
+            size_compressed: PrimitiveChunkedBuilder::new("size_compressed", 0),
+        };
+
+        this.push(node);
+        this.finish()
+    }
+
+    fn push(&mut self, node: Node) {
+        match node {
+            Node::Directory(node) => {
+                for node in node.files {
+                    self.push(Node::File(node));
+                }
+
+                for node in node.directories {
+                    self.push(Node::Directory(node));
+                }
+            }
+            Node::File(node) => {
+                for chunk in node.chunks {
+                    self.hash.append_value(&chunk.hash);
+                    self.size.append_value(chunk.size);
+                    self.size_compressed.append_value(chunk.size_compressed);
+                }
+            }
+            Node::Symlink(_) => {}
+        }
+    }
+
+    fn finish(self) -> Series {
+        df! {
+            "hash" => self.hash.finish().into_series(),
+            "size" => self.size.finish().into_series(),
+            "size_compressed" => self.size_compressed.finish().into_series()
+        }
+        .unwrap()
+        .into_struct("chunk")
+        .into_series()
+    }
+}
diff --git a/tvix/tools/crunch-v2/src/lib.rs b/tvix/tools/crunch-v2/src/lib.rs
new file mode 100644
index 0000000000..09ea2e75d5
--- /dev/null
+++ b/tvix/tools/crunch-v2/src/lib.rs
@@ -0,0 +1,3 @@
+pub mod proto {
+    include!(concat!(env!("OUT_DIR"), "/tvix.flatstore.v1.rs"));
+}
diff --git a/tvix/tools/crunch-v2/src/main.rs b/tvix/tools/crunch-v2/src/main.rs
new file mode 100644
index 0000000000..5be8c28e29
--- /dev/null
+++ b/tvix/tools/crunch-v2/src/main.rs
@@ -0,0 +1,309 @@
+//! This is a tool for ingesting subsets of cache.nixos.org into its own flattened castore format.
+//! Currently, produced chunks are not preserved, and this purely serves as a way of measuring
+//! compression/deduplication ratios for various chunking and compression parameters.
+//!
+//! NARs to be ingested are read from `ingest.parquet`, and filtered by an SQL expression provided as a program argument.
+//! The `file_hash` column should contain SHA-256 hashes of the compressed data, corresponding to the `FileHash` narinfo field.
+//! The `compression` column should contain either `"bzip2"` or `"xz"`, corresponding to the `Compression` narinfo field.
+//! Additional columns are ignored, but can be used by the SQL filter expression.
+//!
+//! flatstore protobufs are written to a sled database named `crunch.db`, addressed by file hash.
+
+use crunch_v2::proto;
+
+mod remote;
+
+use anyhow::Result;
+use clap::Parser;
+use futures::{stream, StreamExt, TryStreamExt};
+use indicatif::{ProgressBar, ProgressStyle};
+use std::{
+    io::{self, BufRead, Read, Write},
+    path::PathBuf,
+    ptr,
+};
+
+use polars::{
+    prelude::{col, LazyFrame, ScanArgsParquet},
+    sql::sql_expr,
+};
+
+use fastcdc::v2020::{ChunkData, StreamCDC};
+use nix_compat::nar::reader as nar;
+
+use digest::Digest;
+use prost::Message;
+use sha2::Sha256;
+
+#[derive(Parser)]
+struct Args {
+    /// Path to an existing parquet file.
+    /// The `file_hash` column should contain SHA-256 hashes of the compressed
+    /// data, corresponding to the `FileHash` narinfo field.
+    /// The `compression` column should contain either `"bzip2"` or `"xz"`,
+    /// corresponding to the `Compression` narinfo field.
+    /// Additional columns are ignored, but can be used by the SQL filter expression.
+    #[clap(long, default_value = "ingest.parquet")]
+    infile: PathBuf,
+
+    /// Filter expression to filter elements in the parquet file for.
+    filter: String,
+
+    /// Average chunk size for FastCDC, in KiB.
+    /// min value is half, max value double of that number.
+    #[clap(long, default_value_t = 256)]
+    avg_chunk_size: u32,
+
+    /// Path to the sled database where results are written to (flatstore
+    /// protobufs, addressed by file hash).
+    #[clap(long, default_value = "crunch.db")]
+    outfile: PathBuf,
+}
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    let args = Args::parse();
+
+    let filter = sql_expr(args.filter)?;
+    let avg_chunk_size = args.avg_chunk_size * 1024;
+
+    let df = LazyFrame::scan_parquet(&args.infile, ScanArgsParquet::default())?
+        .filter(filter)
+        .select([col("file_hash"), col("compression")])
+        .drop_nulls(None)
+        .collect()?;
+
+    let progress = ProgressBar::new(df.height() as u64).with_style(ProgressStyle::with_template(
+        "{elapsed_precise}/{duration_precise} {wide_bar} {pos}/{len}",
+    )?);
+
+    let file_hash = df
+        .column("file_hash")?
+        .binary()?
+        .into_iter()
+        .map(|h| -> [u8; 32] { h.unwrap().try_into().unwrap() });
+
+    let compression = df
+        .column("compression")?
+        .utf8()?
+        .into_iter()
+        .map(|c| c.unwrap());
+
+    let db: sled::Db = sled::open(args.outfile).unwrap();
+    let files_tree = db.open_tree("files").unwrap();
+
+    let res = stream::iter(file_hash.zip(compression))
+        .map(Ok)
+        .try_for_each_concurrent(Some(16), |(file_hash, compression)| {
+            let progress = progress.clone();
+            let files_tree = files_tree.clone();
+            async move {
+                if files_tree.contains_key(&file_hash)? {
+                    progress.inc(1);
+                    return Ok(());
+                }
+
+                let reader = remote::nar(file_hash, compression).await?;
+
+                tokio::task::spawn_blocking(move || {
+                    let mut reader = Sha256Reader::from(reader);
+
+                    let path =
+                        ingest(nar::open(&mut reader)?, vec![], avg_chunk_size).map(|node| {
+                            proto::Path {
+                                nar_hash: reader.finalize().as_slice().into(),
+                                node: Some(node),
+                            }
+                        })?;
+
+                    files_tree.insert(file_hash, path.encode_to_vec())?;
+                    progress.inc(1);
+
+                    Ok::<_, anyhow::Error>(())
+                })
+                .await?
+            }
+        })
+        .await;
+
+    let flush = files_tree.flush_async().await;
+
+    res?;
+    flush?;
+
+    Ok(())
+}
+
+fn ingest(node: nar::Node, name: Vec<u8>, avg_chunk_size: u32) -> Result<proto::path::Node> {
+    match node {
+        nar::Node::Symlink { target } => Ok(proto::path::Node::Symlink(proto::SymlinkNode {
+            name,
+            target,
+        })),
+
+        nar::Node::Directory(mut reader) => {
+            let mut directories = vec![];
+            let mut files = vec![];
+            let mut symlinks = vec![];
+
+            while let Some(node) = reader.next()? {
+                match ingest(node.node, node.name.to_owned(), avg_chunk_size)? {
+                    proto::path::Node::Directory(node) => {
+                        directories.push(node);
+                    }
+                    proto::path::Node::File(node) => {
+                        files.push(node);
+                    }
+                    proto::path::Node::Symlink(node) => {
+                        symlinks.push(node);
+                    }
+                }
+            }
+
+            Ok(proto::path::Node::Directory(proto::DirectoryNode {
+                name,
+                directories,
+                files,
+                symlinks,
+            }))
+        }
+
+        nar::Node::File { executable, reader } => {
+            let mut reader = B3Reader::from(reader);
+            let mut chunks = vec![];
+
+            for chunk in StreamCDC::new(
+                &mut reader,
+                avg_chunk_size / 2,
+                avg_chunk_size,
+                avg_chunk_size * 2,
+            ) {
+                let ChunkData {
+                    length: size, data, ..
+                } = chunk?;
+
+                let hash = blake3::hash(&data);
+                let size_compressed = zstd_size(&data, 9);
+
+                chunks.push(proto::Chunk {
+                    hash: hash.as_bytes().as_slice().into(),
+                    size: size.try_into().unwrap(),
+                    size_compressed: size_compressed.try_into().unwrap(),
+                });
+            }
+
+            Ok(proto::path::Node::File(proto::FileNode {
+                name,
+                hash: reader.finalize().as_bytes().as_slice().into(),
+                chunks,
+                executable,
+            }))
+        }
+    }
+}
+
+struct Sha256Reader<R> {
+    inner: R,
+    hasher: Sha256,
+    buf: *const [u8],
+}
+
+const ZERO_BUF: *const [u8] = ptr::slice_from_raw_parts(1 as *const u8, 0);
+
+unsafe impl<R: Send> Send for Sha256Reader<R> {}
+
+impl<R> From<R> for Sha256Reader<R> {
+    fn from(value: R) -> Self {
+        Self {
+            inner: value,
+            hasher: Sha256::new(),
+            buf: ZERO_BUF,
+        }
+    }
+}
+
+impl<R: Read> Read for Sha256Reader<R> {
+    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+        self.buf = ZERO_BUF;
+        let n = self.inner.read(buf)?;
+        self.hasher.update(&buf[..n]);
+        Ok(n)
+    }
+}
+
+impl<R: BufRead> BufRead for Sha256Reader<R> {
+    fn fill_buf(&mut self) -> io::Result<&[u8]> {
+        self.buf = ZERO_BUF;
+        let buf = self.inner.fill_buf()?;
+        self.buf = buf as *const [u8];
+        Ok(buf)
+    }
+
+    fn consume(&mut self, amt: usize) {
+        // UNSAFETY: This assumes that `R::consume` doesn't invalidate the buffer.
+        // That's not a sound assumption in general, though it is likely to hold.
+        // TODO(edef): refactor this codebase to write a fresh NAR for verification purposes
+        // we already buffer full chunks, so there's no pressing need to reuse the input buffers
+        unsafe {
+            let (head, buf) = (*self.buf).split_at(amt);
+            self.buf = buf as *const [u8];
+            self.hasher.update(head);
+            self.inner.consume(amt);
+        }
+    }
+}
+
+impl<R> Sha256Reader<R> {
+    fn finalize(self) -> [u8; 32] {
+        self.hasher.finalize().into()
+    }
+}
+
+struct B3Reader<R> {
+    inner: R,
+    hasher: blake3::Hasher,
+}
+
+impl<R> From<R> for B3Reader<R> {
+    fn from(value: R) -> Self {
+        Self {
+            inner: value,
+            hasher: blake3::Hasher::new(),
+        }
+    }
+}
+
+impl<R: Read> Read for B3Reader<R> {
+    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+        let n = self.inner.read(buf)?;
+        self.hasher.update(&buf[..n]);
+        Ok(n)
+    }
+}
+
+impl<R> B3Reader<R> {
+    fn finalize(self) -> blake3::Hash {
+        self.hasher.finalize()
+    }
+}
+
+fn zstd_size(data: &[u8], level: i32) -> u64 {
+    let mut w = zstd::Encoder::new(CountingWriter::default(), level).unwrap();
+    w.write_all(&data).unwrap();
+    let CountingWriter(size) = w.finish().unwrap();
+    size
+}
+
+#[derive(Default)]
+struct CountingWriter(u64);
+
+impl Write for CountingWriter {
+    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+        self.0 += buf.len() as u64;
+        Ok(buf.len())
+    }
+
+    fn flush(&mut self) -> io::Result<()> {
+        Ok(())
+    }
+}
diff --git a/tvix/tools/crunch-v2/src/remote.rs b/tvix/tools/crunch-v2/src/remote.rs
new file mode 100644
index 0000000000..93952ecd73
--- /dev/null
+++ b/tvix/tools/crunch-v2/src/remote.rs
@@ -0,0 +1,211 @@
+use std::{
+    cmp,
+    io::{self, BufRead, BufReader, Read},
+    pin::Pin,
+    task::{self, Poll},
+};
+
+use anyhow::{bail, Result};
+use bytes::{Buf, Bytes};
+use futures::{future::BoxFuture, Future, FutureExt, Stream, StreamExt};
+use lazy_static::lazy_static;
+use tokio::runtime::Handle;
+
+use nix_compat::nixbase32;
+
+use rusoto_core::{ByteStream, Region};
+use rusoto_s3::{GetObjectOutput, GetObjectRequest, S3Client, S3};
+
+use bzip2::read::BzDecoder;
+use xz2::read::XzDecoder;
+
+lazy_static! {
+    static ref S3_CLIENT: S3Client = S3Client::new(Region::UsEast1);
+}
+
+const BUCKET: &str = "nix-cache";
+
+pub async fn nar(
+    file_hash: [u8; 32],
+    compression: &str,
+) -> Result<Box<BufReader<dyn Read + Send>>> {
+    let (extension, decompress): (&'static str, fn(_) -> Box<_>) = match compression {
+        "bzip2" => ("bz2", decompress_bz2),
+        "xz" => ("xz", decompress_xz),
+        _ => bail!("unknown compression: {compression}"),
+    };
+
+    Ok(decompress(
+        FileStream::new(FileKey {
+            file_hash,
+            extension,
+        })
+        .await?
+        .into(),
+    ))
+}
+
+fn decompress_xz(reader: FileStreamReader) -> Box<BufReader<dyn Read + Send>> {
+    Box::new(BufReader::new(XzDecoder::new(reader)))
+}
+
+fn decompress_bz2(reader: FileStreamReader) -> Box<BufReader<dyn Read + Send>> {
+    Box::new(BufReader::new(BzDecoder::new(reader)))
+}
+
+struct FileStreamReader {
+    inner: FileStream,
+    buffer: Bytes,
+}
+
+impl From<FileStream> for FileStreamReader {
+    fn from(value: FileStream) -> Self {
+        FileStreamReader {
+            inner: value,
+            buffer: Bytes::new(),
+        }
+    }
+}
+
+impl Read for FileStreamReader {
+    fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
+        let src = self.fill_buf()?;
+        let n = cmp::min(src.len(), dst.len());
+        dst[..n].copy_from_slice(&src[..n]);
+        self.consume(n);
+        Ok(n)
+    }
+}
+
+impl BufRead for FileStreamReader {
+    fn fill_buf(&mut self) -> io::Result<&[u8]> {
+        if !self.buffer.is_empty() {
+            return Ok(&self.buffer);
+        }
+
+        self.buffer = Handle::current()
+            .block_on(self.inner.next())
+            .transpose()?
+            .unwrap_or_default();
+
+        Ok(&self.buffer)
+    }
+
+    fn consume(&mut self, cnt: usize) {
+        self.buffer.advance(cnt);
+    }
+}
+
+struct FileKey {
+    file_hash: [u8; 32],
+    extension: &'static str,
+}
+
+impl FileKey {
+    fn get(
+        &self,
+        offset: u64,
+        e_tag: Option<&str>,
+    ) -> impl Future<Output = io::Result<GetObjectOutput>> + Send + 'static {
+        let input = GetObjectRequest {
+            bucket: BUCKET.to_string(),
+            key: format!(
+                "nar/{}.nar.{}",
+                nixbase32::encode(&self.file_hash),
+                self.extension
+            ),
+            if_match: e_tag.map(str::to_owned),
+            range: Some(format!("bytes {}-", offset + 1)),
+            ..Default::default()
+        };
+
+        async {
+            S3_CLIENT
+                .get_object(input)
+                .await
+                .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
+        }
+    }
+}
+
+struct FileStream {
+    key: FileKey,
+    e_tag: String,
+    offset: u64,
+    length: u64,
+    inner: FileStreamState,
+}
+
+enum FileStreamState {
+    Response(BoxFuture<'static, io::Result<GetObjectOutput>>),
+    Body(ByteStream),
+    Eof,
+}
+
+impl FileStream {
+    pub async fn new(key: FileKey) -> io::Result<Self> {
+        let resp = key.get(0, None).await?;
+
+        Ok(FileStream {
+            key,
+            e_tag: resp.e_tag.unwrap(),
+            offset: 0,
+            length: resp.content_length.unwrap().try_into().unwrap(),
+            inner: FileStreamState::Body(resp.body.unwrap()),
+        })
+    }
+}
+
+macro_rules! poll {
+    ($expr:expr) => {
+        match $expr {
+            Poll::Pending => {
+                return Poll::Pending;
+            }
+            Poll::Ready(value) => value,
+        }
+    };
+}
+
+impl Stream for FileStream {
+    type Item = io::Result<Bytes>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Option<Self::Item>> {
+        let this = self.get_mut();
+
+        let chunk = loop {
+            match &mut this.inner {
+                FileStreamState::Response(resp) => match poll!(resp.poll_unpin(cx)) {
+                    Err(err) => {
+                        this.inner = FileStreamState::Eof;
+                        return Poll::Ready(Some(Err(err)));
+                    }
+                    Ok(resp) => {
+                        this.inner = FileStreamState::Body(resp.body.unwrap());
+                    }
+                },
+                FileStreamState::Body(body) => match poll!(body.poll_next_unpin(cx)) {
+                    None | Some(Err(_)) => {
+                        this.inner = FileStreamState::Response(
+                            this.key.get(this.offset, Some(&this.e_tag)).boxed(),
+                        );
+                    }
+                    Some(Ok(chunk)) => {
+                        break chunk;
+                    }
+                },
+                FileStreamState::Eof => {
+                    return Poll::Ready(None);
+                }
+            }
+        };
+
+        this.offset += chunk.len() as u64;
+
+        if this.offset >= this.length {
+            this.inner = FileStreamState::Eof;
+        }
+
+        Poll::Ready(Some(Ok(chunk)))
+    }
+}