about summary refs log tree commit diff
path: root/tvix/tools/crunch-v2/src/main.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/tools/crunch-v2/src/main.rs')
-rw-r--r--tvix/tools/crunch-v2/src/main.rs71
1 files changed, 55 insertions, 16 deletions
diff --git a/tvix/tools/crunch-v2/src/main.rs b/tvix/tools/crunch-v2/src/main.rs
index 2aa2939fa1b4..a5d538f6beac 100644
--- a/tvix/tools/crunch-v2/src/main.rs
+++ b/tvix/tools/crunch-v2/src/main.rs
@@ -9,16 +9,17 @@
 //!
 //! flatstore protobufs are written to a sled database named `crunch.db`, addressed by file hash.
 
-use crunch_v2::{proto, FILES};
+use crunch_v2::proto;
 
 mod remote;
 
 use anyhow::Result;
+use clap::Parser;
 use futures::{stream, StreamExt, TryStreamExt};
 use indicatif::{ProgressBar, ProgressStyle};
 use std::{
-    env,
     io::{self, BufRead, Read, Write},
+    path::PathBuf,
     ptr,
 };
 
@@ -34,13 +35,39 @@ use digest::Digest;
 use prost::Message;
 use sha2::Sha256;
 
+#[derive(Parser)]
+struct Args {
+    /// Path to an existing parquet file.
+    /// The `file_hash` column should contain SHA-256 hashes of the compressed
+    /// data, corresponding to the `FileHash` narinfo field.
+    /// The `compression` column should contain either `"bzip2"` or `"xz"`,
+    /// corresponding to the `Compression` narinfo field.
+    /// Additional columns are ignored, but can be used by the SQL filter expression.
+    #[clap(long, default_value = "ingest.parquet")]
+    infile: PathBuf,
+
+    /// Filter expression to filter elements in the parquet file for.
+    filter: String,
+
+    /// Average chunk size for FastCDC, in KiB.
+    /// min value is half, max value double of that number.
+    #[clap(long, default_value_t = 256)]
+    avg_chunk_size: u32,
+
+    /// Path to the sled database where results are written to (flatstore
+    /// protobufs, addressed by file hash).
+    #[clap(long, default_value = "crunch.db")]
+    outfile: PathBuf,
+}
+
 #[tokio::main]
 async fn main() -> Result<()> {
-    let mut args = env::args();
-    args.next().unwrap();
+    let args = Args::parse();
 
-    let filter = sql_expr(args.next().unwrap())?;
-    let df = LazyFrame::scan_parquet("ingest.parquet", ScanArgsParquet::default())?
+    let filter = sql_expr(args.filter)?;
+    let avg_chunk_size = args.avg_chunk_size * 1024;
+
+    let df = LazyFrame::scan_parquet(&args.infile, ScanArgsParquet::default())?
         .filter(filter)
         .select([col("file_hash"), col("compression")])
         .drop_nulls(None)
@@ -62,12 +89,16 @@ async fn main() -> Result<()> {
         .into_iter()
         .map(|c| c.unwrap());
 
+    let db: sled::Db = sled::open(args.outfile).unwrap();
+    let files_tree = db.open_tree("files").unwrap();
+
     let res = stream::iter(file_hash.zip(compression))
         .map(Ok)
         .try_for_each_concurrent(Some(16), |(file_hash, compression)| {
             let progress = progress.clone();
+            let files_tree = files_tree.clone();
             async move {
-                if FILES.contains_key(&file_hash)? {
+                if files_tree.contains_key(&file_hash)? {
                     progress.inc(1);
                     return Ok(());
                 }
@@ -77,12 +108,15 @@ async fn main() -> Result<()> {
                 tokio::task::spawn_blocking(move || {
                     let mut reader = Sha256Reader::from(reader);
 
-                    let path = ingest(nar::open(&mut reader)?, vec![]).map(|node| proto::Path {
-                        nar_hash: reader.finalize().as_slice().into(),
-                        node: Some(node),
-                    })?;
+                    let path =
+                        ingest(nar::open(&mut reader)?, vec![], avg_chunk_size).map(|node| {
+                            proto::Path {
+                                nar_hash: reader.finalize().as_slice().into(),
+                                node: Some(node),
+                            }
+                        })?;
 
-                    FILES.insert(file_hash, path.encode_to_vec())?;
+                    files_tree.insert(file_hash, path.encode_to_vec())?;
                     progress.inc(1);
 
                     Ok::<_, anyhow::Error>(())
@@ -92,7 +126,7 @@ async fn main() -> Result<()> {
         })
         .await;
 
-    let flush = crunch_v2::FILES.flush_async().await;
+    let flush = files_tree.flush_async().await;
 
     res?;
     flush?;
@@ -100,7 +134,7 @@ async fn main() -> Result<()> {
     Ok(())
 }
 
-fn ingest(node: nar::Node, name: Vec<u8>) -> Result<proto::path::Node> {
+fn ingest(node: nar::Node, name: Vec<u8>, avg_chunk_size: u32) -> Result<proto::path::Node> {
     match node {
         nar::Node::Symlink { target } => Ok(proto::path::Node::Symlink(proto::SymlinkNode {
             name,
@@ -113,7 +147,7 @@ fn ingest(node: nar::Node, name: Vec<u8>) -> Result<proto::path::Node> {
             let mut symlinks = vec![];
 
             while let Some(node) = reader.next()? {
-                match ingest(node.node, node.name)? {
+                match ingest(node.node, node.name, avg_chunk_size)? {
                     proto::path::Node::Directory(node) => {
                         directories.push(node);
                     }
@@ -138,7 +172,12 @@ fn ingest(node: nar::Node, name: Vec<u8>) -> Result<proto::path::Node> {
             let mut reader = B3Reader::from(reader);
             let mut chunks = vec![];
 
-            for chunk in StreamCDC::new(&mut reader, 1 << 17, 1 << 18, 1 << 19) {
+            for chunk in StreamCDC::new(
+                &mut reader,
+                avg_chunk_size / 2,
+                avg_chunk_size,
+                avg_chunk_size * 2,
+            ) {
                 let ChunkData {
                     length: size, data, ..
                 } = chunk?;