diff options
author | Florian Klink <flokli@flokli.de> | 2024-01-25T12·47+0200 |
---|---|---|
committer | clbot <clbot@tvl.fyi> | 2024-01-27T18·40+0000 |
commit | b38be028d96ce107439f3323026270228a871a13 (patch) | |
tree | 991bc6341714a3896645240309d88dc20784f90e /tvix/tools/crunch-v2/src | |
parent | 4f22203a3aecd070881ae9b4eabc47532d948f01 (diff) |
feat(tvix/tools/crunch-v2): add CLI args r/7453
Use clap derive to make the input and output files configurable, as well as the chunk size parameters. Change-Id: I02b29126f3bd2c13ba2c6e7e0aa4ff048ff803ed Reviewed-on: https://cl.tvl.fyi/c/depot/+/10691 Autosubmit: flokli <flokli@flokli.de> Tested-by: BuildkiteCI Reviewed-by: edef <edef@edef.eu>
Diffstat (limited to 'tvix/tools/crunch-v2/src')
-rw-r--r-- | tvix/tools/crunch-v2/src/bin/extract.rs | 34 | ||||
-rw-r--r-- | tvix/tools/crunch-v2/src/lib.rs | 7 | ||||
-rw-r--r-- | tvix/tools/crunch-v2/src/main.rs | 71 |
3 files changed, 80 insertions, 32 deletions
diff --git a/tvix/tools/crunch-v2/src/bin/extract.rs b/tvix/tools/crunch-v2/src/bin/extract.rs index 8da8df707a0e..416d201f4e04 100644 --- a/tvix/tools/crunch-v2/src/bin/extract.rs +++ b/tvix/tools/crunch-v2/src/bin/extract.rs @@ -5,13 +5,12 @@ //! They are concatenated without any additional structure, so nothing but the chunk list is preserved. use anyhow::Result; +use clap::Parser; use indicatif::{ProgressBar, ProgressStyle}; use std::fs::File; +use std::path::PathBuf; -use crunch_v2::{ - proto::{self, path::Node}, - FILES, -}; +use crunch_v2::proto::{self, path::Node}; use prost::Message; use polars::{ @@ -23,15 +22,32 @@ use polars::{ series::IntoSeries, }; +#[derive(Parser)] +struct Args { + /// Path to the sled database that's read from. + #[clap(default_value = "crunch.db")] + infile: PathBuf, + + /// Path to the resulting parquet file that's written. + #[clap(default_value = "crunch.parquet")] + outfile: PathBuf, +} + fn main() -> Result<()> { - let w = ParquetWriter::new(File::create("crunch.parquet")?); + let args = Args::parse(); + + let w = ParquetWriter::new(File::create(args.outfile)?); + + let db: sled::Db = sled::open(&args.infile).unwrap(); + let files_tree: sled::Tree = db.open_tree("files").unwrap(); - let progress = ProgressBar::new(FILES.len() as u64).with_style(ProgressStyle::with_template( - "{elapsed_precise}/{duration_precise} {wide_bar} {pos}/{len}", - )?); + let progress = + ProgressBar::new(files_tree.len() as u64).with_style(ProgressStyle::with_template( + "{elapsed_precise}/{duration_precise} {wide_bar} {pos}/{len}", + )?); let mut frame = FrameBuilder::new(); - for entry in &*FILES { + for entry in &files_tree { let (file_hash, pb) = entry?; frame.push( file_hash[..].try_into().unwrap(), diff --git a/tvix/tools/crunch-v2/src/lib.rs b/tvix/tools/crunch-v2/src/lib.rs index 0f84e84f1772..09ea2e75d5a3 100644 --- a/tvix/tools/crunch-v2/src/lib.rs +++ b/tvix/tools/crunch-v2/src/lib.rs @@ -1,10 +1,3 @@ -use lazy_static::lazy_static; - pub mod proto { include!(concat!(env!("OUT_DIR"), "/tvix.flatstore.v1.rs")); } - -lazy_static! { - static ref DB: sled::Db = sled::open("crunch.db").unwrap(); - pub static ref FILES: sled::Tree = DB.open_tree("files").unwrap(); -} diff --git a/tvix/tools/crunch-v2/src/main.rs b/tvix/tools/crunch-v2/src/main.rs index 2aa2939fa1b4..a5d538f6beac 100644 --- a/tvix/tools/crunch-v2/src/main.rs +++ b/tvix/tools/crunch-v2/src/main.rs @@ -9,16 +9,17 @@ //! //! flatstore protobufs are written to a sled database named `crunch.db`, addressed by file hash. -use crunch_v2::{proto, FILES}; +use crunch_v2::proto; mod remote; use anyhow::Result; +use clap::Parser; use futures::{stream, StreamExt, TryStreamExt}; use indicatif::{ProgressBar, ProgressStyle}; use std::{ - env, io::{self, BufRead, Read, Write}, + path::PathBuf, ptr, }; @@ -34,13 +35,39 @@ use digest::Digest; use prost::Message; use sha2::Sha256; +#[derive(Parser)] +struct Args { + /// Path to an existing parquet file. + /// The `file_hash` column should contain SHA-256 hashes of the compressed + /// data, corresponding to the `FileHash` narinfo field. + /// The `compression` column should contain either `"bzip2"` or `"xz"`, + /// corresponding to the `Compression` narinfo field. + /// Additional columns are ignored, but can be used by the SQL filter expression. + #[clap(long, default_value = "ingest.parquet")] + infile: PathBuf, + + /// Filter expression to filter elements in the parquet file for. + filter: String, + + /// Average chunk size for FastCDC, in KiB. + /// min value is half, max value double of that number. + #[clap(long, default_value_t = 256)] + avg_chunk_size: u32, + + /// Path to the sled database where results are written to (flatstore + /// protobufs, addressed by file hash). + #[clap(long, default_value = "crunch.db")] + outfile: PathBuf, +} + #[tokio::main] async fn main() -> Result<()> { - let mut args = env::args(); - args.next().unwrap(); + let args = Args::parse(); - let filter = sql_expr(args.next().unwrap())?; - let df = LazyFrame::scan_parquet("ingest.parquet", ScanArgsParquet::default())? + let filter = sql_expr(args.filter)?; + let avg_chunk_size = args.avg_chunk_size * 1024; + + let df = LazyFrame::scan_parquet(&args.infile, ScanArgsParquet::default())? .filter(filter) .select([col("file_hash"), col("compression")]) .drop_nulls(None) @@ -62,12 +89,16 @@ async fn main() -> Result<()> { .into_iter() .map(|c| c.unwrap()); + let db: sled::Db = sled::open(args.outfile).unwrap(); + let files_tree = db.open_tree("files").unwrap(); + let res = stream::iter(file_hash.zip(compression)) .map(Ok) .try_for_each_concurrent(Some(16), |(file_hash, compression)| { let progress = progress.clone(); + let files_tree = files_tree.clone(); async move { - if FILES.contains_key(&file_hash)? { + if files_tree.contains_key(&file_hash)? { progress.inc(1); return Ok(()); } @@ -77,12 +108,15 @@ async fn main() -> Result<()> { tokio::task::spawn_blocking(move || { let mut reader = Sha256Reader::from(reader); - let path = ingest(nar::open(&mut reader)?, vec![]).map(|node| proto::Path { - nar_hash: reader.finalize().as_slice().into(), - node: Some(node), - })?; + let path = + ingest(nar::open(&mut reader)?, vec![], avg_chunk_size).map(|node| { + proto::Path { + nar_hash: reader.finalize().as_slice().into(), + node: Some(node), + } + })?; - FILES.insert(file_hash, path.encode_to_vec())?; + files_tree.insert(file_hash, path.encode_to_vec())?; progress.inc(1); Ok::<_, anyhow::Error>(()) @@ -92,7 +126,7 @@ async fn main() -> Result<()> { }) .await; - let flush = crunch_v2::FILES.flush_async().await; + let flush = files_tree.flush_async().await; res?; flush?; @@ -100,7 +134,7 @@ async fn main() -> Result<()> { Ok(()) } -fn ingest(node: nar::Node, name: Vec<u8>) -> Result<proto::path::Node> { +fn ingest(node: nar::Node, name: Vec<u8>, avg_chunk_size: u32) -> Result<proto::path::Node> { match node { nar::Node::Symlink { target } => Ok(proto::path::Node::Symlink(proto::SymlinkNode { name, @@ -113,7 +147,7 @@ fn ingest(node: nar::Node, name: Vec<u8>) -> Result<proto::path::Node> { let mut symlinks = vec![]; while let Some(node) = reader.next()? { - match ingest(node.node, node.name)? { + match ingest(node.node, node.name, avg_chunk_size)? { proto::path::Node::Directory(node) => { directories.push(node); } @@ -138,7 +172,12 @@ fn ingest(node: nar::Node, name: Vec<u8>) -> Result<proto::path::Node> { let mut reader = B3Reader::from(reader); let mut chunks = vec![]; - for chunk in StreamCDC::new(&mut reader, 1 << 17, 1 << 18, 1 << 19) { + for chunk in StreamCDC::new( + &mut reader, + avg_chunk_size / 2, + avg_chunk_size, + avg_chunk_size * 2, + ) { let ChunkData { length: size, data, .. } = chunk?; |