Diffstat (limited to 'tvix/tools/crunch-v2/src/main.rs')
-rw-r--r-- | tvix/tools/crunch-v2/src/main.rs | 309
1 file changed, 0 insertions, 309 deletions
diff --git a/tvix/tools/crunch-v2/src/main.rs b/tvix/tools/crunch-v2/src/main.rs
deleted file mode 100644
index 5be8c28e293f..000000000000
--- a/tvix/tools/crunch-v2/src/main.rs
+++ /dev/null
@@ -1,309 +0,0 @@
-//! This is a tool for ingesting subsets of cache.nixos.org into its own flattened castore format.
-//! Currently, produced chunks are not preserved, and this purely serves as a way of measuring
-//! compression/deduplication ratios for various chunking and compression parameters.
-//!
-//! NARs to be ingested are read from `ingest.parquet`, and filtered by an SQL expression provided as a program argument.
-//! The `file_hash` column should contain SHA-256 hashes of the compressed data, corresponding to the `FileHash` narinfo field.
-//! The `compression` column should contain either `"bzip2"` or `"xz"`, corresponding to the `Compression` narinfo field.
-//! Additional columns are ignored, but can be used by the SQL filter expression.
-//!
-//! flatstore protobufs are written to a sled database named `crunch.db`, addressed by file hash.
-
-use crunch_v2::proto;
-
-mod remote;
-
-use anyhow::Result;
-use clap::Parser;
-use futures::{stream, StreamExt, TryStreamExt};
-use indicatif::{ProgressBar, ProgressStyle};
-use std::{
-    io::{self, BufRead, Read, Write},
-    path::PathBuf,
-    ptr,
-};
-
-use polars::{
-    prelude::{col, LazyFrame, ScanArgsParquet},
-    sql::sql_expr,
-};
-
-use fastcdc::v2020::{ChunkData, StreamCDC};
-use nix_compat::nar::reader as nar;
-
-use digest::Digest;
-use prost::Message;
-use sha2::Sha256;
-
-#[derive(Parser)]
-struct Args {
-    /// Path to an existing parquet file.
-    /// The `file_hash` column should contain SHA-256 hashes of the compressed
-    /// data, corresponding to the `FileHash` narinfo field.
-    /// The `compression` column should contain either `"bzip2"` or `"xz"`,
-    /// corresponding to the `Compression` narinfo field.
-    /// Additional columns are ignored, but can be used by the SQL filter expression.
-    #[clap(long, default_value = "ingest.parquet")]
-    infile: PathBuf,
-
-    /// Filter expression to filter elements in the parquet file for.
-    filter: String,
-
-    /// Average chunk size for FastCDC, in KiB.
-    /// min value is half, max value double of that number.
-    #[clap(long, default_value_t = 256)]
-    avg_chunk_size: u32,
-
-    /// Path to the sled database where results are written to (flatstore
-    /// protobufs, addressed by file hash).
-    #[clap(long, default_value = "crunch.db")]
-    outfile: PathBuf,
-}
-
-#[tokio::main]
-async fn main() -> Result<()> {
-    let args = Args::parse();
-
-    let filter = sql_expr(args.filter)?;
-    let avg_chunk_size = args.avg_chunk_size * 1024;
-
-    let df = LazyFrame::scan_parquet(&args.infile, ScanArgsParquet::default())?
-        .filter(filter)
-        .select([col("file_hash"), col("compression")])
-        .drop_nulls(None)
-        .collect()?;
-
-    let progress = ProgressBar::new(df.height() as u64).with_style(ProgressStyle::with_template(
-        "{elapsed_precise}/{duration_precise} {wide_bar} {pos}/{len}",
-    )?);
-
-    let file_hash = df
-        .column("file_hash")?
-        .binary()?
-        .into_iter()
-        .map(|h| -> [u8; 32] { h.unwrap().try_into().unwrap() });
-
-    let compression = df
-        .column("compression")?
-        .utf8()?
-        .into_iter()
-        .map(|c| c.unwrap());
-
-    let db: sled::Db = sled::open(args.outfile).unwrap();
-    let files_tree = db.open_tree("files").unwrap();
-
-    let res = stream::iter(file_hash.zip(compression))
-        .map(Ok)
-        .try_for_each_concurrent(Some(16), |(file_hash, compression)| {
-            let progress = progress.clone();
-            let files_tree = files_tree.clone();
-            async move {
-                if files_tree.contains_key(&file_hash)? {
-                    progress.inc(1);
-                    return Ok(());
-                }
-
-                let reader = remote::nar(file_hash, compression).await?;
-
-                tokio::task::spawn_blocking(move || {
-                    let mut reader = Sha256Reader::from(reader);
-
-                    let path =
-                        ingest(nar::open(&mut reader)?, vec![], avg_chunk_size).map(|node| {
-                            proto::Path {
-                                nar_hash: reader.finalize().as_slice().into(),
-                                node: Some(node),
-                            }
-                        })?;
-
-                    files_tree.insert(file_hash, path.encode_to_vec())?;
-                    progress.inc(1);
-
-                    Ok::<_, anyhow::Error>(())
-                })
-                .await?
-            }
-        })
-        .await;
-
-    let flush = files_tree.flush_async().await;
-
-    res?;
-    flush?;
-
-    Ok(())
-}
-
-fn ingest(node: nar::Node, name: Vec<u8>, avg_chunk_size: u32) -> Result<proto::path::Node> {
-    match node {
-        nar::Node::Symlink { target } => Ok(proto::path::Node::Symlink(proto::SymlinkNode {
-            name,
-            target,
-        })),
-
-        nar::Node::Directory(mut reader) => {
-            let mut directories = vec![];
-            let mut files = vec![];
-            let mut symlinks = vec![];
-
-            while let Some(node) = reader.next()? {
-                match ingest(node.node, node.name.to_owned(), avg_chunk_size)? {
-                    proto::path::Node::Directory(node) => {
-                        directories.push(node);
-                    }
-                    proto::path::Node::File(node) => {
-                        files.push(node);
-                    }
-                    proto::path::Node::Symlink(node) => {
-                        symlinks.push(node);
-                    }
-                }
-            }
-
-            Ok(proto::path::Node::Directory(proto::DirectoryNode {
-                name,
-                directories,
-                files,
-                symlinks,
-            }))
-        }
-
-        nar::Node::File { executable, reader } => {
-            let mut reader = B3Reader::from(reader);
-            let mut chunks = vec![];
-
-            for chunk in StreamCDC::new(
-                &mut reader,
-                avg_chunk_size / 2,
-                avg_chunk_size,
-                avg_chunk_size * 2,
-            ) {
-                let ChunkData {
-                    length: size, data, ..
-                } = chunk?;
-
-                let hash = blake3::hash(&data);
-                let size_compressed = zstd_size(&data, 9);
-
-                chunks.push(proto::Chunk {
-                    hash: hash.as_bytes().as_slice().into(),
-                    size: size.try_into().unwrap(),
-                    size_compressed: size_compressed.try_into().unwrap(),
-                });
-            }
-
-            Ok(proto::path::Node::File(proto::FileNode {
-                name,
-                hash: reader.finalize().as_bytes().as_slice().into(),
-                chunks,
-                executable,
-            }))
-        }
-    }
-}
-
-struct Sha256Reader<R> {
-    inner: R,
-    hasher: Sha256,
-    buf: *const [u8],
-}
-
-const ZERO_BUF: *const [u8] = ptr::slice_from_raw_parts(1 as *const u8, 0);
-
-unsafe impl<R: Send> Send for Sha256Reader<R> {}
-
-impl<R> From<R> for Sha256Reader<R> {
-    fn from(value: R) -> Self {
-        Self {
-            inner: value,
-            hasher: Sha256::new(),
-            buf: ZERO_BUF,
-        }
-    }
-}
-
-impl<R: Read> Read for Sha256Reader<R> {
-    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
-        self.buf = ZERO_BUF;
-        let n = self.inner.read(buf)?;
-        self.hasher.update(&buf[..n]);
-        Ok(n)
-    }
-}
-
-impl<R: BufRead> BufRead for Sha256Reader<R> {
-    fn fill_buf(&mut self) -> io::Result<&[u8]> {
-        self.buf = ZERO_BUF;
-        let buf = self.inner.fill_buf()?;
-        self.buf = buf as *const [u8];
-        Ok(buf)
-    }
-
-    fn consume(&mut self, amt: usize) {
-        // UNSAFETY: This assumes that `R::consume` doesn't invalidate the buffer.
-        // That's not a sound assumption in general, though it is likely to hold.
-        // TODO(edef): refactor this codebase to write a fresh NAR for verification purposes
-        // we already buffer full chunks, so there's no pressing need to reuse the input buffers
-        unsafe {
-            let (head, buf) = (*self.buf).split_at(amt);
-            self.buf = buf as *const [u8];
-            self.hasher.update(head);
-            self.inner.consume(amt);
-        }
-    }
-}
-
-impl<R> Sha256Reader<R> {
-    fn finalize(self) -> [u8; 32] {
-        self.hasher.finalize().into()
-    }
-}
-
-struct B3Reader<R> {
-    inner: R,
-    hasher: blake3::Hasher,
-}
-
-impl<R> From<R> for B3Reader<R> {
-    fn from(value: R) -> Self {
-        Self {
-            inner: value,
-            hasher: blake3::Hasher::new(),
-        }
-    }
-}
-
-impl<R: Read> Read for B3Reader<R> {
-    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
-        let n = self.inner.read(buf)?;
-        self.hasher.update(&buf[..n]);
-        Ok(n)
-    }
-}
-
-impl<R> B3Reader<R> {
-    fn finalize(self) -> blake3::Hash {
-        self.hasher.finalize()
-    }
-}
-
-fn zstd_size(data: &[u8], level: i32) -> u64 {
-    let mut w = zstd::Encoder::new(CountingWriter::default(), level).unwrap();
-    w.write_all(&data).unwrap();
-    let CountingWriter(size) = w.finish().unwrap();
-    size
-}
-
-#[derive(Default)]
-struct CountingWriter(u64);
-
-impl Write for CountingWriter {
-    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
-        self.0 += buf.len() as u64;
-        Ok(buf.len())
-    }
-
-    fn flush(&mut self) -> io::Result<()> {
-        Ok(())
-    }
-}
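
Editor's note: the module doc comment above says the produced chunks only exist to measure compression/deduplication ratios. As an illustration of what that measurement could look like downstream, the sketch below walks the `files` tree of `crunch.db`, decodes each stored `proto::Path`, and totals chunk sizes, counting each unique BLAKE3 chunk hash once. This analysis program is not part of crunch-v2; it assumes the `proto::Path` / `DirectoryNode` / `FileNode` / `Chunk` shapes visible in the deleted code, and the numeric field widths are a guess.

// Editorial sketch: reads crunch.db produced by the tool above and reports
// raw, zstd-compressed, and deduplicated-compressed chunk totals.
use std::collections::HashSet;

use anyhow::Result;
use prost::Message;

use crunch_v2::proto;

#[derive(Default)]
struct Stats {
    raw: u64,
    compressed: u64,
    deduplicated: u64,
}

fn main() -> Result<()> {
    let db: sled::Db = sled::open("crunch.db")?;
    let files_tree = db.open_tree("files")?;

    let mut seen: HashSet<Vec<u8>> = HashSet::new();
    let mut stats = Stats::default();

    for entry in files_tree.iter() {
        let (_file_hash, value) = entry?;
        let path = proto::Path::decode(&value[..])?;
        match path.node {
            Some(proto::path::Node::Directory(dir)) => visit_dir(&dir, &mut seen, &mut stats),
            Some(proto::path::Node::File(file)) => visit_file(&file, &mut seen, &mut stats),
            _ => {}
        }
    }

    println!(
        "raw: {} B, zstd: {} B, unique zstd: {} B",
        stats.raw, stats.compressed, stats.deduplicated
    );
    Ok(())
}

// Recurse through the flattened directory structure, summing per-chunk sizes.
fn visit_dir(dir: &proto::DirectoryNode, seen: &mut HashSet<Vec<u8>>, stats: &mut Stats) {
    for d in &dir.directories {
        visit_dir(d, seen, stats);
    }
    for f in &dir.files {
        visit_file(f, seen, stats);
    }
}

fn visit_file(file: &proto::FileNode, seen: &mut HashSet<Vec<u8>>, stats: &mut Stats) {
    for chunk in &file.chunks {
        stats.raw += chunk.size as u64;
        stats.compressed += chunk.size_compressed as u64;
        // Chunks are content-addressed by BLAKE3 hash, so a repeated hash
        // would be deduplicated in a real store; count it only once here.
        if seen.insert(chunk.hash.to_vec()) {
            stats.deduplicated += chunk.size_compressed as u64;
        }
    }
}

Dividing the unique compressed total by the raw total then gives the combined deduplication-plus-zstd ratio for the chosen chunking parameters.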
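Editor's note: the UNSAFETY/TODO comments in `Sha256Reader::consume` spell out the assumption behind keeping a raw-pointer snapshot of the `fill_buf` result. Purely for comparison, and not part of the original code, here is a minimal safe sketch of a hashing `BufRead` wrapper that keeps an owned copy of the buffered bytes instead, paying one extra copy per refill in exchange for soundness:

// Editorial sketch of a safe alternative; the deleted tool uses the
// raw-pointer variant shown in the diff above instead.
use std::io::{self, BufRead, Read};

use sha2::{Digest, Sha256};

struct SafeSha256Reader<R> {
    inner: R,
    hasher: Sha256,
    // Owned copy of the bytes most recently returned by fill_buf, so that
    // consume() can hash the consumed prefix without holding a raw pointer.
    last: Vec<u8>,
}

impl<R> SafeSha256Reader<R> {
    fn new(inner: R) -> Self {
        Self {
            inner,
            hasher: Sha256::new(),
            last: Vec::new(),
        }
    }

    fn finalize(self) -> [u8; 32] {
        self.hasher.finalize().into()
    }
}

impl<R: Read> Read for SafeSha256Reader<R> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // Direct reads bypass the BufRead path: hash them here and drop any
        // stale copy of the buffered data.
        self.last.clear();
        let n = self.inner.read(buf)?;
        self.hasher.update(&buf[..n]);
        Ok(n)
    }
}

impl<R: BufRead> BufRead for SafeSha256Reader<R> {
    fn fill_buf(&mut self) -> io::Result<&[u8]> {
        let buf = self.inner.fill_buf()?;
        self.last.clear();
        self.last.extend_from_slice(buf);
        Ok(buf)
    }

    fn consume(&mut self, amt: usize) {
        // Per the BufRead contract, amt is bounded by the slice returned from
        // the last fill_buf, so this indexing stays in range.
        self.hasher.update(&self.last[..amt]);
        self.last.drain(..amt);
        self.inner.consume(amt);
    }
}

Whether the extra copy matters in practice is the performance question the original sidesteps with the pointer trick; the sketch only illustrates the safe alternative the TODO gestures toward.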