Diffstat (limited to 'tvix/tools/crunch-v2/src/main.rs')
-rw-r--r--  tvix/tools/crunch-v2/src/main.rs | 71
1 file changed, 55 insertions(+), 16 deletions(-)
diff --git a/tvix/tools/crunch-v2/src/main.rs b/tvix/tools/crunch-v2/src/main.rs
index 2aa2939fa1b4..a5d538f6beac 100644
--- a/tvix/tools/crunch-v2/src/main.rs
+++ b/tvix/tools/crunch-v2/src/main.rs
@@ -9,16 +9,17 @@
 //!
 //! flatstore protobufs are written to a sled database named `crunch.db`, addressed by file hash.
 
-use crunch_v2::{proto, FILES};
+use crunch_v2::proto;
 
 mod remote;
 
 use anyhow::Result;
+use clap::Parser;
 use futures::{stream, StreamExt, TryStreamExt};
 use indicatif::{ProgressBar, ProgressStyle};
 use std::{
-    env,
     io::{self, BufRead, Read, Write},
+    path::PathBuf,
     ptr,
 };
@@ -34,13 +35,39 @@
 use digest::Digest;
 use prost::Message;
 use sha2::Sha256;
 
+#[derive(Parser)]
+struct Args {
+    /// Path to an existing parquet file.
+    /// The `file_hash` column should contain SHA-256 hashes of the compressed
+    /// data, corresponding to the `FileHash` narinfo field.
+    /// The `compression` column should contain either `"bzip2"` or `"xz"`,
+    /// corresponding to the `Compression` narinfo field.
+    /// Additional columns are ignored, but can be used by the SQL filter expression.
+    #[clap(long, default_value = "ingest.parquet")]
+    infile: PathBuf,
+
+    /// Filter expression to filter elements in the parquet file for.
+    filter: String,
+
+    /// Average chunk size for FastCDC, in KiB.
+    /// min value is half, max value double of that number.
+    #[clap(long, default_value_t = 256)]
+    avg_chunk_size: u32,
+
+    /// Path to the sled database where results are written to (flatstore
+    /// protobufs, addressed by file hash).
+    #[clap(long, default_value = "crunch.db")]
+    outfile: PathBuf,
+}
+
 #[tokio::main]
 async fn main() -> Result<()> {
-    let mut args = env::args();
-    args.next().unwrap();
+    let args = Args::parse();
 
-    let filter = sql_expr(args.next().unwrap())?;
-    let df = LazyFrame::scan_parquet("ingest.parquet", ScanArgsParquet::default())?
+    let filter = sql_expr(args.filter)?;
+    let avg_chunk_size = args.avg_chunk_size * 1024;
+
+    let df = LazyFrame::scan_parquet(&args.infile, ScanArgsParquet::default())?
         .filter(filter)
         .select([col("file_hash"), col("compression")])
         .drop_nulls(None)
@@ -62,12 +89,16 @@
         .into_iter()
         .map(|c| c.unwrap());
 
+    let db: sled::Db = sled::open(args.outfile).unwrap();
+    let files_tree = db.open_tree("files").unwrap();
+
     let res = stream::iter(file_hash.zip(compression))
         .map(Ok)
         .try_for_each_concurrent(Some(16), |(file_hash, compression)| {
             let progress = progress.clone();
+            let files_tree = files_tree.clone();
             async move {
-                if FILES.contains_key(&file_hash)? {
+                if files_tree.contains_key(&file_hash)? {
                     progress.inc(1);
                     return Ok(());
                 }
@@ -77,12 +108,15 @@
                 tokio::task::spawn_blocking(move || {
                     let mut reader = Sha256Reader::from(reader);
 
-                    let path = ingest(nar::open(&mut reader)?, vec![]).map(|node| proto::Path {
-                        nar_hash: reader.finalize().as_slice().into(),
-                        node: Some(node),
-                    })?;
+                    let path =
+                        ingest(nar::open(&mut reader)?, vec![], avg_chunk_size).map(|node| {
+                            proto::Path {
+                                nar_hash: reader.finalize().as_slice().into(),
+                                node: Some(node),
+                            }
+                        })?;
 
-                    FILES.insert(file_hash, path.encode_to_vec())?;
+                    files_tree.insert(file_hash, path.encode_to_vec())?;
                     progress.inc(1);
 
                     Ok::<_, anyhow::Error>(())
@@ -92,7 +126,7 @@
         })
         .await;
 
-    let flush = crunch_v2::FILES.flush_async().await;
+    let flush = files_tree.flush_async().await;
 
     res?;
     flush?;
@@ -100,7 +134,7 @@
     Ok(())
 }
 
-fn ingest(node: nar::Node, name: Vec<u8>) -> Result<proto::path::Node> {
+fn ingest(node: nar::Node, name: Vec<u8>, avg_chunk_size: u32) -> Result<proto::path::Node> {
     match node {
         nar::Node::Symlink { target } => Ok(proto::path::Node::Symlink(proto::SymlinkNode {
             name,
@@ -113,7 +147,7 @@
         let mut symlinks = vec![];
 
         while let Some(node) = reader.next()? {
-            match ingest(node.node, node.name)? {
+            match ingest(node.node, node.name, avg_chunk_size)? {
                 proto::path::Node::Directory(node) => {
                     directories.push(node);
                 }
@@ -138,7 +172,12 @@
         let mut reader = B3Reader::from(reader);
         let mut chunks = vec![];
 
-        for chunk in StreamCDC::new(&mut reader, 1 << 17, 1 << 18, 1 << 19) {
+        for chunk in StreamCDC::new(
+            &mut reader,
+            avg_chunk_size / 2,
+            avg_chunk_size,
+            avg_chunk_size * 2,
+        ) {
             let ChunkData {
                 length: size, data, ..
             } = chunk?;
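
For context on the new argument handling: clap's derive API turns each field's doc comment into --help text, `default_value` parses a string through the field's value parser, and `default_value_t` takes an already-typed default. A minimal sketch of the same pattern, trimmed to a hypothetical two-field struct (assumes the clap `derive` feature is enabled):

use clap::Parser;
use std::path::PathBuf;

/// Hypothetical trimmed-down variant of the Args struct above; field doc
/// comments become the --help descriptions.
#[derive(Parser)]
struct Args {
    /// Path to an existing parquet file.
    #[clap(long, default_value = "ingest.parquet")]
    infile: PathBuf,

    /// Average chunk size for FastCDC, in KiB.
    #[clap(long, default_value_t = 256)]
    avg_chunk_size: u32,
}

fn main() {
    let args = Args::parse();
    println!("{}: avg {} KiB", args.infile.display(), args.avg_chunk_size);
}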
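The storage change replaces the process-global `crunch_v2::FILES` tree with a database opened at the `--outfile` path and a named `"files"` tree. sled trees are cheap to clone and safe to share across tasks, which is why the closure clones `files_tree` per file. A minimal synchronous sketch of the same access pattern (`store_path` and `payload` are hypothetical names for illustration):

use std::path::Path;

// One database at the --outfile path; entries addressed by file hash
// inside a named tree, mirroring the code above.
fn store_path(db_path: &Path, file_hash: &[u8], payload: Vec<u8>) -> sled::Result<()> {
    let db: sled::Db = sled::open(db_path)?;
    let files_tree = db.open_tree("files")?;

    // Skip files that a previous (possibly interrupted) run already crunched.
    if !files_tree.contains_key(file_hash)? {
        files_tree.insert(file_hash, payload)?;
    }

    // The async code above uses flush_async().await instead.
    files_tree.flush()?;
    Ok(())
}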
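The chunking change parameterizes the previously hard-coded FastCDC bounds `1 << 17, 1 << 18, 1 << 19` (128/256/512 KiB) as `avg / 2, avg, avg * 2`, so the default `--avg-chunk-size` of 256 KiB leaves behavior unchanged. A minimal sketch, assuming the `fastcdc` crate's v2020 streaming API (which matches the `StreamCDC`/`ChunkData` names used here) and a hypothetical all-zero input:

use fastcdc::v2020::StreamCDC;
use std::io::Cursor;

fn main() {
    // --avg-chunk-size is given in KiB; main() above multiplies by 1024 once.
    let avg_chunk_size: u32 = 256 * 1024;

    // Bounds derived as in the diff; with the default this is exactly the
    // previously hard-coded 1 << 17 / 1 << 18 / 1 << 19.
    let (min, avg, max) = (avg_chunk_size / 2, avg_chunk_size, avg_chunk_size * 2);
    assert_eq!((min, avg, max), (1 << 17, 1 << 18, 1 << 19));

    // Hypothetical 4 MiB input, just to drive the chunker.
    let data = vec![0u8; 4 << 20];
    for chunk in StreamCDC::new(Cursor::new(data), min, avg, max) {
        let chunk = chunk.expect("chunking failed");
        println!("chunk at offset {}, {} bytes", chunk.offset, chunk.length);
    }
}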