about summary refs log tree commit diff
path: root/tvix/tools/crunch-v2/src
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2024-01-25T12·47+0200
committerclbot <clbot@tvl.fyi>2024-01-27T18·40+0000
commitb38be028d96ce107439f3323026270228a871a13 (patch)
tree991bc6341714a3896645240309d88dc20784f90e /tvix/tools/crunch-v2/src
parent4f22203a3aecd070881ae9b4eabc47532d948f01 (diff)
feat(tvix/tools/crunch-v2): add CLI args r/7453
Use clap derive to make the input and output files configurable, as well
as the chunk size parameters.

Change-Id: I02b29126f3bd2c13ba2c6e7e0aa4ff048ff803ed
Reviewed-on: https://cl.tvl.fyi/c/depot/+/10691
Autosubmit: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Reviewed-by: edef <edef@edef.eu>
Diffstat (limited to 'tvix/tools/crunch-v2/src')
-rw-r--r--tvix/tools/crunch-v2/src/bin/extract.rs34
-rw-r--r--tvix/tools/crunch-v2/src/lib.rs7
-rw-r--r--tvix/tools/crunch-v2/src/main.rs71
3 files changed, 80 insertions, 32 deletions
diff --git a/tvix/tools/crunch-v2/src/bin/extract.rs b/tvix/tools/crunch-v2/src/bin/extract.rs
index 8da8df707a0e..416d201f4e04 100644
--- a/tvix/tools/crunch-v2/src/bin/extract.rs
+++ b/tvix/tools/crunch-v2/src/bin/extract.rs
@@ -5,13 +5,12 @@
 //! They are concatenated without any additional structure, so nothing but the chunk list is preserved.
 
 use anyhow::Result;
+use clap::Parser;
 use indicatif::{ProgressBar, ProgressStyle};
 use std::fs::File;
+use std::path::PathBuf;
 
-use crunch_v2::{
-    proto::{self, path::Node},
-    FILES,
-};
+use crunch_v2::proto::{self, path::Node};
 use prost::Message;
 
 use polars::{
@@ -23,15 +22,32 @@ use polars::{
     series::IntoSeries,
 };
 
+#[derive(Parser)]
+struct Args {
+    /// Path to the sled database that's read from.
+    #[clap(default_value = "crunch.db")]
+    infile: PathBuf,
+
+    /// Path to the resulting parquet file that's written.
+    #[clap(default_value = "crunch.parquet")]
+    outfile: PathBuf,
+}
+
 fn main() -> Result<()> {
-    let w = ParquetWriter::new(File::create("crunch.parquet")?);
+    let args = Args::parse();
+
+    let w = ParquetWriter::new(File::create(args.outfile)?);
+
+    let db: sled::Db = sled::open(&args.infile).unwrap();
+    let files_tree: sled::Tree = db.open_tree("files").unwrap();
 
-    let progress = ProgressBar::new(FILES.len() as u64).with_style(ProgressStyle::with_template(
-        "{elapsed_precise}/{duration_precise} {wide_bar} {pos}/{len}",
-    )?);
+    let progress =
+        ProgressBar::new(files_tree.len() as u64).with_style(ProgressStyle::with_template(
+            "{elapsed_precise}/{duration_precise} {wide_bar} {pos}/{len}",
+        )?);
 
     let mut frame = FrameBuilder::new();
-    for entry in &*FILES {
+    for entry in &files_tree {
         let (file_hash, pb) = entry?;
         frame.push(
             file_hash[..].try_into().unwrap(),
diff --git a/tvix/tools/crunch-v2/src/lib.rs b/tvix/tools/crunch-v2/src/lib.rs
index 0f84e84f1772..09ea2e75d5a3 100644
--- a/tvix/tools/crunch-v2/src/lib.rs
+++ b/tvix/tools/crunch-v2/src/lib.rs
@@ -1,10 +1,3 @@
-use lazy_static::lazy_static;
-
 pub mod proto {
     include!(concat!(env!("OUT_DIR"), "/tvix.flatstore.v1.rs"));
 }
-
-lazy_static! {
-    static ref DB: sled::Db = sled::open("crunch.db").unwrap();
-    pub static ref FILES: sled::Tree = DB.open_tree("files").unwrap();
-}
diff --git a/tvix/tools/crunch-v2/src/main.rs b/tvix/tools/crunch-v2/src/main.rs
index 2aa2939fa1b4..a5d538f6beac 100644
--- a/tvix/tools/crunch-v2/src/main.rs
+++ b/tvix/tools/crunch-v2/src/main.rs
@@ -9,16 +9,17 @@
 //!
 //! flatstore protobufs are written to a sled database named `crunch.db`, addressed by file hash.
 
-use crunch_v2::{proto, FILES};
+use crunch_v2::proto;
 
 mod remote;
 
 use anyhow::Result;
+use clap::Parser;
 use futures::{stream, StreamExt, TryStreamExt};
 use indicatif::{ProgressBar, ProgressStyle};
 use std::{
-    env,
     io::{self, BufRead, Read, Write},
+    path::PathBuf,
     ptr,
 };
 
@@ -34,13 +35,39 @@ use digest::Digest;
 use prost::Message;
 use sha2::Sha256;
 
+#[derive(Parser)]
+struct Args {
+    /// Path to an existing parquet file.
+    /// The `file_hash` column should contain SHA-256 hashes of the compressed
+    /// data, corresponding to the `FileHash` narinfo field.
+    /// The `compression` column should contain either `"bzip2"` or `"xz"`,
+    /// corresponding to the `Compression` narinfo field.
+    /// Additional columns are ignored, but can be used by the SQL filter expression.
+    #[clap(long, default_value = "ingest.parquet")]
+    infile: PathBuf,
+
+    /// Filter expression to filter elements in the parquet file for.
+    filter: String,
+
+    /// Average chunk size for FastCDC, in KiB.
+    /// min value is half, max value double of that number.
+    #[clap(long, default_value_t = 256)]
+    avg_chunk_size: u32,
+
+    /// Path to the sled database where results are written to (flatstore
+    /// protobufs, addressed by file hash).
+    #[clap(long, default_value = "crunch.db")]
+    outfile: PathBuf,
+}
+
 #[tokio::main]
 async fn main() -> Result<()> {
-    let mut args = env::args();
-    args.next().unwrap();
+    let args = Args::parse();
 
-    let filter = sql_expr(args.next().unwrap())?;
-    let df = LazyFrame::scan_parquet("ingest.parquet", ScanArgsParquet::default())?
+    let filter = sql_expr(args.filter)?;
+    let avg_chunk_size = args.avg_chunk_size * 1024;
+
+    let df = LazyFrame::scan_parquet(&args.infile, ScanArgsParquet::default())?
         .filter(filter)
         .select([col("file_hash"), col("compression")])
         .drop_nulls(None)
@@ -62,12 +89,16 @@ async fn main() -> Result<()> {
         .into_iter()
         .map(|c| c.unwrap());
 
+    let db: sled::Db = sled::open(args.outfile).unwrap();
+    let files_tree = db.open_tree("files").unwrap();
+
     let res = stream::iter(file_hash.zip(compression))
         .map(Ok)
         .try_for_each_concurrent(Some(16), |(file_hash, compression)| {
             let progress = progress.clone();
+            let files_tree = files_tree.clone();
             async move {
-                if FILES.contains_key(&file_hash)? {
+                if files_tree.contains_key(&file_hash)? {
                     progress.inc(1);
                     return Ok(());
                 }
@@ -77,12 +108,15 @@ async fn main() -> Result<()> {
                 tokio::task::spawn_blocking(move || {
                     let mut reader = Sha256Reader::from(reader);
 
-                    let path = ingest(nar::open(&mut reader)?, vec![]).map(|node| proto::Path {
-                        nar_hash: reader.finalize().as_slice().into(),
-                        node: Some(node),
-                    })?;
+                    let path =
+                        ingest(nar::open(&mut reader)?, vec![], avg_chunk_size).map(|node| {
+                            proto::Path {
+                                nar_hash: reader.finalize().as_slice().into(),
+                                node: Some(node),
+                            }
+                        })?;
 
-                    FILES.insert(file_hash, path.encode_to_vec())?;
+                    files_tree.insert(file_hash, path.encode_to_vec())?;
                     progress.inc(1);
 
                     Ok::<_, anyhow::Error>(())
@@ -92,7 +126,7 @@ async fn main() -> Result<()> {
         })
         .await;
 
-    let flush = crunch_v2::FILES.flush_async().await;
+    let flush = files_tree.flush_async().await;
 
     res?;
     flush?;
@@ -100,7 +134,7 @@ async fn main() -> Result<()> {
     Ok(())
 }
 
-fn ingest(node: nar::Node, name: Vec<u8>) -> Result<proto::path::Node> {
+fn ingest(node: nar::Node, name: Vec<u8>, avg_chunk_size: u32) -> Result<proto::path::Node> {
     match node {
         nar::Node::Symlink { target } => Ok(proto::path::Node::Symlink(proto::SymlinkNode {
             name,
@@ -113,7 +147,7 @@ fn ingest(node: nar::Node, name: Vec<u8>) -> Result<proto::path::Node> {
             let mut symlinks = vec![];
 
             while let Some(node) = reader.next()? {
-                match ingest(node.node, node.name)? {
+                match ingest(node.node, node.name, avg_chunk_size)? {
                     proto::path::Node::Directory(node) => {
                         directories.push(node);
                     }
@@ -138,7 +172,12 @@ fn ingest(node: nar::Node, name: Vec<u8>) -> Result<proto::path::Node> {
             let mut reader = B3Reader::from(reader);
             let mut chunks = vec![];
 
-            for chunk in StreamCDC::new(&mut reader, 1 << 17, 1 << 18, 1 << 19) {
+            for chunk in StreamCDC::new(
+                &mut reader,
+                avg_chunk_size / 2,
+                avg_chunk_size,
+                avg_chunk_size * 2,
+            ) {
                 let ChunkData {
                     length: size, data, ..
                 } = chunk?;