about summary refs log tree commit diff
path: root/users/edef/weave/src/main.rs
diff options
context:
space:
mode:
authoredef <edef@edef.eu>2024-10-19T19·23+0000
committeredef <edef@edef.eu>2024-10-20T21·22+0000
commit84a82f6f41b422f7f76e44e3705ee509b6c6eaf6 (patch)
treee9db1e7d323cd4be497d13632f8c6ad772736053 /users/edef/weave/src/main.rs
parentb3f0e25fbc29278381bdb4408285aa8b7d345d50 (diff)
feat(users/edef/weave): use tracing_indicatif for progress r/8849
Progress bars :3

Change-Id: I770d0f8381521b6efc8b38c0db4d59c771887fee
Reviewed-on: https://cl.tvl.fyi/c/depot/+/12673
Tested-by: BuildkiteCI
Reviewed-by: flokli <flokli@flokli.de>
Diffstat (limited to 'users/edef/weave/src/main.rs')
-rw-r--r--users/edef/weave/src/main.rs172
1 files changed, 106 insertions, 66 deletions
diff --git a/users/edef/weave/src/main.rs b/users/edef/weave/src/main.rs
index b86992c279d1..b1dd27d8c1af 100644
--- a/users/edef/weave/src/main.rs
+++ b/users/edef/weave/src/main.rs
@@ -15,6 +15,8 @@ use std::{
     ops::Index,
     sync::atomic::{AtomicU32, Ordering},
 };
+use tracing::{info_span, warn};
+use tracing_indicatif::span_ext::IndicatifSpanExt;
 
 use polars::{
     datatypes::StaticArray,
@@ -23,36 +25,48 @@ use polars::{
     prelude::*,
 };
 
-use weave::{as_fixed_binary, hash64, DONE, INDEX_NULL};
+use weave::{as_fixed_binary, hash64, INDEX_NULL};
 
+#[tracing::instrument]
 fn main() -> Result<()> {
-    eprint!("… parse roots\r");
-    let roots: PathSet32 = as_fixed_binary::<20>(
-        LazyFrame::scan_parquet("releases.parquet", ScanArgsParquet::default())?
-            .explode([col("store_path_hash")])
-            .select([col("store_path_hash")])
-            .collect()?
-            .column("store_path_hash")?
-            .binary()?,
-    )
-    .flatten()
-    .collect();
-    eprintln!("{DONE}");
+    let _tracing = tvix_tracing::TracingBuilder::default()
+        .enable_progressbar()
+        .build()?;
+
+    let roots: PathSet32 = {
+        let span = info_span!("parse_roots", indicatif.pb_show = tracing::field::Empty).entered();
+        span.pb_set_message("parse roots");
+        span.pb_start();
+
+        as_fixed_binary::<20>(
+            LazyFrame::scan_parquet("releases.parquet", ScanArgsParquet::default())?
+                .explode([col("store_path_hash")])
+                .select([col("store_path_hash")])
+                .collect()?
+                .column("store_path_hash")?
+                .binary()?,
+        )
+        .flatten()
+        .collect()
+    };
 
     {
-        let ph_array = weave::load_ph_array()?;
-
-        eprint!("… resolve roots\r");
-        ph_array.par_iter().enumerate().for_each(|(idx, h)| {
-            if let Some(idx_slot) = roots.find(h) {
-                assert_eq!(
-                    idx_slot.swap(idx as u32, Ordering::Relaxed),
-                    INDEX_NULL,
-                    "duplicate entry"
-                );
-            }
-        });
-        eprintln!("{DONE}");
+        let span = info_span!("resolve_roots", indicatif.pb_show = tracing::field::Empty).entered();
+        span.pb_set_message("resolve roots");
+        span.pb_start();
+
+        weave::load_ph_array()?
+            .into_par_iter()
+            .enumerate()
+            .for_each(|(idx, h)| {
+                if let Some(idx_slot) = roots.find(h) {
+                    assert_eq!(
+                        idx_slot.swap(idx as u32, Ordering::Relaxed),
+                        INDEX_NULL,
+                        "duplicate entry"
+                    );
+                }
+            });
     }
 
     let mut todo = FxHashSet::default();
@@ -67,17 +81,28 @@ fn main() -> Result<()> {
             }
             todo.insert(idx);
         }
-        println!("skipping {unknown_roots} unknown roots");
-    }
 
-    eprint!("… load reference_idxs\r");
-    let ri_array = ParquetReader::new(File::open("narinfo-references.parquet")?)
-        .finish()?
-        .column("reference_idxs")?
-        .list()?
-        .clone();
+        if unknown_roots != 0 {
+            warn!("skipping {unknown_roots} unknown roots");
+        }
+    }
 
+    let ri_array;
     let ri_array = {
+        let span = info_span!(
+            "load_reference_idxs",
+            indicatif.pb_show = tracing::field::Empty
+        )
+        .entered();
+        span.pb_set_message("load reference_idxs");
+        span.pb_start();
+
+        ri_array = ParquetReader::new(File::open("narinfo-references.parquet")?)
+            .finish()?
+            .column("reference_idxs")?
+            .list()?
+            .clone();
+
         ChunkedList::new(ri_array.downcast_iter().map(|chunk| {
             (
                 chunk.offsets(),
@@ -91,48 +116,63 @@ fn main() -> Result<()> {
             )
         }))
     };
-    eprintln!("{DONE}");
 
     let mut seen = todo.clone();
-    while !todo.is_empty() {
-        println!("todo: {} seen: {}", todo.len(), seen.len());
-
-        todo = todo
-            .par_iter()
-            .flat_map(|&parent| {
-                if parent == INDEX_NULL {
-                    return FxHashSet::default();
-                }
+    {
+        let span = info_span!("mark", indicatif.pb_show = tracing::field::Empty).entered();
+        span.pb_set_message("marking");
+        span.pb_set_style(&tvix_tracing::PB_PROGRESS_STYLE);
+
+        while !todo.is_empty() {
+            span.pb_set_length(seen.len() as u64);
+            span.pb_set_position(seen.len().saturating_sub(todo.len()) as u64);
+
+            todo = todo
+                .par_iter()
+                .flat_map(|&parent| {
+                    if parent == INDEX_NULL {
+                        return FxHashSet::default();
+                    }
+
+                    ri_array[parent as usize]
+                        .iter()
+                        .cloned()
+                        .filter(|child| !seen.contains(child))
+                        .collect::<FxHashSet<u32>>()
+                })
+                .collect();
+
+            for &index in &todo {
+                seen.insert(index);
+            }
+        }
 
-                ri_array[parent as usize]
-                    .iter()
-                    .cloned()
-                    .filter(|child| !seen.contains(child))
-                    .collect::<FxHashSet<u32>>()
-            })
-            .collect();
+        span.pb_set_length(seen.len() as u64);
+        span.pb_set_position(seen.len() as u64);
 
-        for &index in &todo {
-            seen.insert(index);
+        if seen.remove(&INDEX_NULL) {
+            warn!("WARNING: missing edges");
         }
     }
 
-    println!("done: {} paths", seen.len());
+    let seen = {
+        let span = info_span!("gather_live", indicatif.pb_show = tracing::field::Empty).entered();
+        span.pb_set_message("gathering live set");
 
-    if seen.remove(&INDEX_NULL) {
-        println!("WARNING: missing edges");
-    }
+        let mut seen: Vec<u32> = seen.into_iter().collect();
+        seen.par_sort();
+        seen
+    };
 
-    eprint!("… gathering live set\r");
-    let mut seen: Vec<u32> = seen.into_iter().collect();
-    seen.par_sort();
-    eprintln!("{DONE}");
+    {
+        let span = info_span!("write_output", indicatif.pb_show = tracing::field::Empty).entered();
+        span.pb_set_message("writing output");
+        span.pb_start();
 
-    eprint!("… writing output\r");
-    ParquetWriter::new(File::create("live_idxs.parquet")?).finish(&mut df! {
-        "live_idx" => seen,
-    }?)?;
-    eprintln!("{DONE}");
+        ParquetWriter::new(File::create("live_idxs.parquet")?).finish(&mut df! {
+            "live_idx" => seen,
+        }?)?;
+    }
 
     Ok(())
 }