diff options
Diffstat (limited to 'tvix/tools/crunch-v2/src')
-rw-r--r-- | tvix/tools/crunch-v2/src/bin/extract.rs | 155 | ||||
-rw-r--r-- | tvix/tools/crunch-v2/src/lib.rs | 3 | ||||
-rw-r--r-- | tvix/tools/crunch-v2/src/main.rs | 309 | ||||
-rw-r--r-- | tvix/tools/crunch-v2/src/remote.rs | 211 |
4 files changed, 678 insertions, 0 deletions
diff --git a/tvix/tools/crunch-v2/src/bin/extract.rs b/tvix/tools/crunch-v2/src/bin/extract.rs new file mode 100644 index 000000000000..416d201f4e04 --- /dev/null +++ b/tvix/tools/crunch-v2/src/bin/extract.rs @@ -0,0 +1,155 @@ +//! This tool lossily converts a Sled database produced by crunch-v2 into a Parquet file for analysis. +//! The resulting `crunch.parquet` has columns file_hash`, `nar_hash`, and `chunk`. +//! The first two are SHA-256 hashes of the compressed file and the NAR it decompresses to. +//! `chunk` is a struct array corresponding to [crunch_v2::proto::Chunk] messages. +//! They are concatenated without any additional structure, so nothing but the chunk list is preserved. + +use anyhow::Result; +use clap::Parser; +use indicatif::{ProgressBar, ProgressStyle}; +use std::fs::File; +use std::path::PathBuf; + +use crunch_v2::proto::{self, path::Node}; +use prost::Message; + +use polars::{ + chunked_array::builder::AnonymousOwnedListBuilder, + prelude::{ + df, BinaryChunkedBuilder, ChunkedBuilder, DataFrame, DataType, Field, ListBuilderTrait, + NamedFrom, ParquetWriter, PrimitiveChunkedBuilder, Series, UInt32Type, + }, + series::IntoSeries, +}; + +#[derive(Parser)] +struct Args { + /// Path to the sled database that's read from. + #[clap(default_value = "crunch.db")] + infile: PathBuf, + + /// Path to the resulting parquet file that's written. + #[clap(default_value = "crunch.parquet")] + outfile: PathBuf, +} + +fn main() -> Result<()> { + let args = Args::parse(); + + let w = ParquetWriter::new(File::create(args.outfile)?); + + let db: sled::Db = sled::open(&args.infile).unwrap(); + let files_tree: sled::Tree = db.open_tree("files").unwrap(); + + let progress = + ProgressBar::new(files_tree.len() as u64).with_style(ProgressStyle::with_template( + "{elapsed_precise}/{duration_precise} {wide_bar} {pos}/{len}", + )?); + + let mut frame = FrameBuilder::new(); + for entry in &files_tree { + let (file_hash, pb) = entry?; + frame.push( + file_hash[..].try_into().unwrap(), + proto::Path::decode(&pb[..])?, + ); + progress.inc(1); + } + + w.finish(&mut frame.finish())?; + + Ok(()) +} + +struct FrameBuilder { + file_hash: BinaryChunkedBuilder, + nar_hash: BinaryChunkedBuilder, + chunk: AnonymousOwnedListBuilder, +} + +impl FrameBuilder { + fn new() -> Self { + Self { + file_hash: BinaryChunkedBuilder::new("file_hash", 0, 0), + nar_hash: BinaryChunkedBuilder::new("nar_hash", 0, 0), + chunk: AnonymousOwnedListBuilder::new( + "chunk", + 0, + Some(DataType::Struct(vec![ + Field::new("hash", DataType::Binary), + Field::new("size", DataType::UInt32), + Field::new("size_compressed", DataType::UInt32), + ])), + ), + } + } + + fn push(&mut self, file_hash: [u8; 32], pb: proto::Path) { + self.file_hash.append_value(&file_hash[..]); + self.nar_hash.append_value(pb.nar_hash); + self.chunk + .append_series(&ChunkFrameBuilder::new(pb.node.unwrap())) + .unwrap(); + } + + fn finish(mut self) -> DataFrame { + df! { + "file_hash" => self.file_hash.finish().into_series(), + "nar_hash" => self.nar_hash.finish().into_series(), + "chunk" => self.chunk.finish().into_series() + } + .unwrap() + } +} + +struct ChunkFrameBuilder { + hash: BinaryChunkedBuilder, + size: PrimitiveChunkedBuilder<UInt32Type>, + size_compressed: PrimitiveChunkedBuilder<UInt32Type>, +} + +impl ChunkFrameBuilder { + fn new(node: proto::path::Node) -> Series { + let mut this = Self { + hash: BinaryChunkedBuilder::new("hash", 0, 0), + size: PrimitiveChunkedBuilder::new("size", 0), + size_compressed: PrimitiveChunkedBuilder::new("size_compressed", 0), + }; + + this.push(node); + this.finish() + } + + fn push(&mut self, node: Node) { + match node { + Node::Directory(node) => { + for node in node.files { + self.push(Node::File(node)); + } + + for node in node.directories { + self.push(Node::Directory(node)); + } + } + Node::File(node) => { + for chunk in node.chunks { + self.hash.append_value(&chunk.hash); + self.size.append_value(chunk.size); + self.size_compressed.append_value(chunk.size_compressed); + } + } + Node::Symlink(_) => {} + } + } + + fn finish(self) -> Series { + df! { + "hash" => self.hash.finish().into_series(), + "size" => self.size.finish().into_series(), + "size_compressed" => self.size_compressed.finish().into_series() + } + .unwrap() + .into_struct("chunk") + .into_series() + } +} diff --git a/tvix/tools/crunch-v2/src/lib.rs b/tvix/tools/crunch-v2/src/lib.rs new file mode 100644 index 000000000000..09ea2e75d5a3 --- /dev/null +++ b/tvix/tools/crunch-v2/src/lib.rs @@ -0,0 +1,3 @@ +pub mod proto { + include!(concat!(env!("OUT_DIR"), "/tvix.flatstore.v1.rs")); +} diff --git a/tvix/tools/crunch-v2/src/main.rs b/tvix/tools/crunch-v2/src/main.rs new file mode 100644 index 000000000000..a5d538f6beac --- /dev/null +++ b/tvix/tools/crunch-v2/src/main.rs @@ -0,0 +1,309 @@ +//! This is a tool for ingesting subsets of cache.nixos.org into its own flattened castore format. +//! Currently, produced chunks are not preserved, and this purely serves as a way of measuring +//! compression/deduplication ratios for various chunking and compression parameters. +//! +//! NARs to be ingested are read from `ingest.parquet`, and filtered by an SQL expression provided as a program argument. +//! The `file_hash` column should contain SHA-256 hashes of the compressed data, corresponding to the `FileHash` narinfo field. +//! The `compression` column should contain either `"bzip2"` or `"xz"`, corresponding to the `Compression` narinfo field. +//! Additional columns are ignored, but can be used by the SQL filter expression. +//! +//! flatstore protobufs are written to a sled database named `crunch.db`, addressed by file hash. + +use crunch_v2::proto; + +mod remote; + +use anyhow::Result; +use clap::Parser; +use futures::{stream, StreamExt, TryStreamExt}; +use indicatif::{ProgressBar, ProgressStyle}; +use std::{ + io::{self, BufRead, Read, Write}, + path::PathBuf, + ptr, +}; + +use polars::{ + prelude::{col, LazyFrame, ScanArgsParquet}, + sql::sql_expr, +}; + +use fastcdc::v2020::{ChunkData, StreamCDC}; +use nix_compat::nar::reader as nar; + +use digest::Digest; +use prost::Message; +use sha2::Sha256; + +#[derive(Parser)] +struct Args { + /// Path to an existing parquet file. + /// The `file_hash` column should contain SHA-256 hashes of the compressed + /// data, corresponding to the `FileHash` narinfo field. + /// The `compression` column should contain either `"bzip2"` or `"xz"`, + /// corresponding to the `Compression` narinfo field. + /// Additional columns are ignored, but can be used by the SQL filter expression. + #[clap(long, default_value = "ingest.parquet")] + infile: PathBuf, + + /// Filter expression to filter elements in the parquet file for. + filter: String, + + /// Average chunk size for FastCDC, in KiB. + /// min value is half, max value double of that number. + #[clap(long, default_value_t = 256)] + avg_chunk_size: u32, + + /// Path to the sled database where results are written to (flatstore + /// protobufs, addressed by file hash). + #[clap(long, default_value = "crunch.db")] + outfile: PathBuf, +} + +#[tokio::main] +async fn main() -> Result<()> { + let args = Args::parse(); + + let filter = sql_expr(args.filter)?; + let avg_chunk_size = args.avg_chunk_size * 1024; + + let df = LazyFrame::scan_parquet(&args.infile, ScanArgsParquet::default())? + .filter(filter) + .select([col("file_hash"), col("compression")]) + .drop_nulls(None) + .collect()?; + + let progress = ProgressBar::new(df.height() as u64).with_style(ProgressStyle::with_template( + "{elapsed_precise}/{duration_precise} {wide_bar} {pos}/{len}", + )?); + + let file_hash = df + .column("file_hash")? + .binary()? + .into_iter() + .map(|h| -> [u8; 32] { h.unwrap().try_into().unwrap() }); + + let compression = df + .column("compression")? + .utf8()? + .into_iter() + .map(|c| c.unwrap()); + + let db: sled::Db = sled::open(args.outfile).unwrap(); + let files_tree = db.open_tree("files").unwrap(); + + let res = stream::iter(file_hash.zip(compression)) + .map(Ok) + .try_for_each_concurrent(Some(16), |(file_hash, compression)| { + let progress = progress.clone(); + let files_tree = files_tree.clone(); + async move { + if files_tree.contains_key(&file_hash)? { + progress.inc(1); + return Ok(()); + } + + let reader = remote::nar(file_hash, compression).await?; + + tokio::task::spawn_blocking(move || { + let mut reader = Sha256Reader::from(reader); + + let path = + ingest(nar::open(&mut reader)?, vec![], avg_chunk_size).map(|node| { + proto::Path { + nar_hash: reader.finalize().as_slice().into(), + node: Some(node), + } + })?; + + files_tree.insert(file_hash, path.encode_to_vec())?; + progress.inc(1); + + Ok::<_, anyhow::Error>(()) + }) + .await? + } + }) + .await; + + let flush = files_tree.flush_async().await; + + res?; + flush?; + + Ok(()) +} + +fn ingest(node: nar::Node, name: Vec<u8>, avg_chunk_size: u32) -> Result<proto::path::Node> { + match node { + nar::Node::Symlink { target } => Ok(proto::path::Node::Symlink(proto::SymlinkNode { + name, + target, + })), + + nar::Node::Directory(mut reader) => { + let mut directories = vec![]; + let mut files = vec![]; + let mut symlinks = vec![]; + + while let Some(node) = reader.next()? { + match ingest(node.node, node.name, avg_chunk_size)? { + proto::path::Node::Directory(node) => { + directories.push(node); + } + proto::path::Node::File(node) => { + files.push(node); + } + proto::path::Node::Symlink(node) => { + symlinks.push(node); + } + } + } + + Ok(proto::path::Node::Directory(proto::DirectoryNode { + name, + directories, + files, + symlinks, + })) + } + + nar::Node::File { executable, reader } => { + let mut reader = B3Reader::from(reader); + let mut chunks = vec![]; + + for chunk in StreamCDC::new( + &mut reader, + avg_chunk_size / 2, + avg_chunk_size, + avg_chunk_size * 2, + ) { + let ChunkData { + length: size, data, .. + } = chunk?; + + let hash = blake3::hash(&data); + let size_compressed = zstd_size(&data, 9); + + chunks.push(proto::Chunk { + hash: hash.as_bytes().as_slice().into(), + size: size.try_into().unwrap(), + size_compressed: size_compressed.try_into().unwrap(), + }); + } + + Ok(proto::path::Node::File(proto::FileNode { + name, + hash: reader.finalize().as_bytes().as_slice().into(), + chunks, + executable, + })) + } + } +} + +struct Sha256Reader<R> { + inner: R, + hasher: Sha256, + buf: *const [u8], +} + +const ZERO_BUF: *const [u8] = ptr::slice_from_raw_parts(1 as *const u8, 0); + +unsafe impl<R: Send> Send for Sha256Reader<R> {} + +impl<R> From<R> for Sha256Reader<R> { + fn from(value: R) -> Self { + Self { + inner: value, + hasher: Sha256::new(), + buf: ZERO_BUF, + } + } +} + +impl<R: Read> Read for Sha256Reader<R> { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + self.buf = ZERO_BUF; + let n = self.inner.read(buf)?; + self.hasher.update(&buf[..n]); + Ok(n) + } +} + +impl<R: BufRead> BufRead for Sha256Reader<R> { + fn fill_buf(&mut self) -> io::Result<&[u8]> { + self.buf = ZERO_BUF; + let buf = self.inner.fill_buf()?; + self.buf = buf as *const [u8]; + Ok(buf) + } + + fn consume(&mut self, amt: usize) { + // UNSAFETY: This assumes that `R::consume` doesn't invalidate the buffer. + // That's not a sound assumption in general, though it is likely to hold. + // TODO(edef): refactor this codebase to write a fresh NAR for verification purposes + // we already buffer full chunks, so there's no pressing need to reuse the input buffers + unsafe { + let (head, buf) = (*self.buf).split_at(amt); + self.buf = buf as *const [u8]; + self.hasher.update(head); + self.inner.consume(amt); + } + } +} + +impl<R> Sha256Reader<R> { + fn finalize(self) -> [u8; 32] { + self.hasher.finalize().into() + } +} + +struct B3Reader<R> { + inner: R, + hasher: blake3::Hasher, +} + +impl<R> From<R> for B3Reader<R> { + fn from(value: R) -> Self { + Self { + inner: value, + hasher: blake3::Hasher::new(), + } + } +} + +impl<R: Read> Read for B3Reader<R> { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + let n = self.inner.read(buf)?; + self.hasher.update(&buf[..n]); + Ok(n) + } +} + +impl<R> B3Reader<R> { + fn finalize(self) -> blake3::Hash { + self.hasher.finalize() + } +} + +fn zstd_size(data: &[u8], level: i32) -> u64 { + let mut w = zstd::Encoder::new(CountingWriter::default(), level).unwrap(); + w.write_all(&data).unwrap(); + let CountingWriter(size) = w.finish().unwrap(); + size +} + +#[derive(Default)] +struct CountingWriter(u64); + +impl Write for CountingWriter { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + self.0 += buf.len() as u64; + Ok(buf.len()) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} diff --git a/tvix/tools/crunch-v2/src/remote.rs b/tvix/tools/crunch-v2/src/remote.rs new file mode 100644 index 000000000000..93952ecd737f --- /dev/null +++ b/tvix/tools/crunch-v2/src/remote.rs @@ -0,0 +1,211 @@ +use std::{ + cmp, + io::{self, BufRead, BufReader, Read}, + pin::Pin, + task::{self, Poll}, +}; + +use anyhow::{bail, Result}; +use bytes::{Buf, Bytes}; +use futures::{future::BoxFuture, Future, FutureExt, Stream, StreamExt}; +use lazy_static::lazy_static; +use tokio::runtime::Handle; + +use nix_compat::nixbase32; + +use rusoto_core::{ByteStream, Region}; +use rusoto_s3::{GetObjectOutput, GetObjectRequest, S3Client, S3}; + +use bzip2::read::BzDecoder; +use xz2::read::XzDecoder; + +lazy_static! { + static ref S3_CLIENT: S3Client = S3Client::new(Region::UsEast1); +} + +const BUCKET: &str = "nix-cache"; + +pub async fn nar( + file_hash: [u8; 32], + compression: &str, +) -> Result<Box<BufReader<dyn Read + Send>>> { + let (extension, decompress): (&'static str, fn(_) -> Box<_>) = match compression { + "bzip2" => ("bz2", decompress_bz2), + "xz" => ("xz", decompress_xz), + _ => bail!("unknown compression: {compression}"), + }; + + Ok(decompress( + FileStream::new(FileKey { + file_hash, + extension, + }) + .await? + .into(), + )) +} + +fn decompress_xz(reader: FileStreamReader) -> Box<BufReader<dyn Read + Send>> { + Box::new(BufReader::new(XzDecoder::new(reader))) +} + +fn decompress_bz2(reader: FileStreamReader) -> Box<BufReader<dyn Read + Send>> { + Box::new(BufReader::new(BzDecoder::new(reader))) +} + +struct FileStreamReader { + inner: FileStream, + buffer: Bytes, +} + +impl From<FileStream> for FileStreamReader { + fn from(value: FileStream) -> Self { + FileStreamReader { + inner: value, + buffer: Bytes::new(), + } + } +} + +impl Read for FileStreamReader { + fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> { + let src = self.fill_buf()?; + let n = cmp::min(src.len(), dst.len()); + dst[..n].copy_from_slice(&src[..n]); + self.consume(n); + Ok(n) + } +} + +impl BufRead for FileStreamReader { + fn fill_buf(&mut self) -> io::Result<&[u8]> { + if !self.buffer.is_empty() { + return Ok(&self.buffer); + } + + self.buffer = Handle::current() + .block_on(self.inner.next()) + .transpose()? + .unwrap_or_default(); + + Ok(&self.buffer) + } + + fn consume(&mut self, cnt: usize) { + self.buffer.advance(cnt); + } +} + +struct FileKey { + file_hash: [u8; 32], + extension: &'static str, +} + +impl FileKey { + fn get( + &self, + offset: u64, + e_tag: Option<&str>, + ) -> impl Future<Output = io::Result<GetObjectOutput>> + Send + 'static { + let input = GetObjectRequest { + bucket: BUCKET.to_string(), + key: format!( + "nar/{}.nar.{}", + nixbase32::encode(&self.file_hash), + self.extension + ), + if_match: e_tag.map(str::to_owned), + range: Some(format!("bytes {}-", offset + 1)), + ..Default::default() + }; + + async { + S3_CLIENT + .get_object(input) + .await + .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) + } + } +} + +struct FileStream { + key: FileKey, + e_tag: String, + offset: u64, + length: u64, + inner: FileStreamState, +} + +enum FileStreamState { + Response(BoxFuture<'static, io::Result<GetObjectOutput>>), + Body(ByteStream), + Eof, +} + +impl FileStream { + pub async fn new(key: FileKey) -> io::Result<Self> { + let resp = key.get(0, None).await?; + + Ok(FileStream { + key, + e_tag: resp.e_tag.unwrap(), + offset: 0, + length: resp.content_length.unwrap().try_into().unwrap(), + inner: FileStreamState::Body(resp.body.unwrap()), + }) + } +} + +macro_rules! poll { + ($expr:expr) => { + match $expr { + Poll::Pending => { + return Poll::Pending; + } + Poll::Ready(value) => value, + } + }; +} + +impl Stream for FileStream { + type Item = io::Result<Bytes>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Option<Self::Item>> { + let this = self.get_mut(); + + let chunk = loop { + match &mut this.inner { + FileStreamState::Response(resp) => match poll!(resp.poll_unpin(cx)) { + Err(err) => { + this.inner = FileStreamState::Eof; + return Poll::Ready(Some(Err(err))); + } + Ok(resp) => { + this.inner = FileStreamState::Body(resp.body.unwrap()); + } + }, + FileStreamState::Body(body) => match poll!(body.poll_next_unpin(cx)) { + None | Some(Err(_)) => { + this.inner = FileStreamState::Response( + this.key.get(this.offset, Some(&this.e_tag)).boxed(), + ); + } + Some(Ok(chunk)) => { + break chunk; + } + }, + FileStreamState::Eof => { + return Poll::Ready(None); + } + } + }; + + this.offset += chunk.len() as u64; + + if this.offset >= this.length { + this.inner = FileStreamState::Eof; + } + + Poll::Ready(Some(Ok(chunk))) + } +} |