about summary refs log tree commit diff
path: root/users/edef/weave/src
diff options
context:
space:
mode:
author edef <edef@edef.eu> 2024-10-19T19·23+0000
committer edef <edef@edef.eu> 2024-10-20T21·22+0000
commit 84a82f6f41b422f7f76e44e3705ee509b6c6eaf6 (patch)
tree e9db1e7d323cd4be497d13632f8c6ad772736053 /users/edef/weave/src
parent b3f0e25fbc29278381bdb4408285aa8b7d345d50 (diff)
feat(users/edef/weave): use tracing_indicatif for progress r/8849
Progress bars :3

Change-Id: I770d0f8381521b6efc8b38c0db4d59c771887fee
Reviewed-on: https://cl.tvl.fyi/c/depot/+/12673
Tested-by: BuildkiteCI
Reviewed-by: flokli <flokli@flokli.de>
Diffstat (limited to 'users/edef/weave/src')
-rw-r--r-- users/edef/weave/src/bin/swizzle.rs 79
-rw-r--r-- users/edef/weave/src/lib.rs 10
-rw-r--r-- users/edef/weave/src/main.rs 172
3 files changed, 160 insertions, 101 deletions
diff --git a/users/edef/weave/src/bin/swizzle.rs b/users/edef/weave/src/bin/swizzle.rs
index bcea3edfada9..840a53454f57 100644
--- a/users/edef/weave/src/bin/swizzle.rs
+++ b/users/edef/weave/src/bin/swizzle.rs
@@ -36,17 +36,27 @@ use polars::{
     lazy::dsl::{col, SpecialEq},
     prelude::*,
 };
+use tracing::info_span;
+use tracing_indicatif::span_ext::IndicatifSpanExt as _;
 
-use weave::{as_fixed_binary, hash64, leak, load_ph_array, DONE, INDEX_NULL};
+use weave::{as_fixed_binary, hash64, leak, load_ph_array, INDEX_NULL};
 
+#[tracing::instrument]
 fn main() -> Result<()> {
+    let _tracing = tvix_tracing::TracingBuilder::default()
+        .enable_progressbar()
+        .build()?;
+
     let ph_array: &'static [[u8; 20]] = leak(load_ph_array()?);
 
     // TODO(edef): re-parallelise this
     // We originally parallelised on chunks, but ph_array is only a single chunk, due to how Parquet loading works.
     // TODO(edef): outline the 64-bit hash prefix? it's an indirection, but it saves ~2G of memory
-    eprint!("… build index\r");
     let ph_map: &'static HashTable<(u64, u32)> = {
+        let span = info_span!("ph_map", indicatif.pb_show = tracing::field::Empty).entered();
+        span.pb_set_message("build index");
+        span.pb_start();
+
         let mut ph_map = HashTable::with_capacity(ph_array.len());
 
         for (offset, item) in ph_array.iter().enumerate() {
@@ -57,7 +67,6 @@ fn main() -> Result<()> {
 
         &*Box::leak(Box::new(ph_map))
     };
-    eprintln!("{DONE}");
 
     let ph_to_idx = |key: &[u8; 20]| -> u32 {
         let hash = hash64(key);
@@ -69,35 +78,41 @@ fn main() -> Result<()> {
             .unwrap_or(INDEX_NULL)
     };
 
-    eprint!("… swizzle references\r");
-    LazyFrame::scan_parquet("narinfo.parquet", ScanArgsParquet::default())?
-        .with_column(
-            col("references")
-                .map(
-                    move |series: Series| -> PolarsResult<Option<Series>> {
-                        Ok(Some(
-                            series
-                                .list()?
-                                .apply_to_inner(&|series: Series| -> PolarsResult<Series> {
-                                    let series = series.binary()?;
-                                    let mut out: Vec<u32> = Vec::with_capacity(series.len());
-                                    out.extend(as_fixed_binary(series).flatten().map(ph_to_idx));
-                                    Ok(Series::from_vec("reference_idxs", out))
-                                })?
-                                .into_series(),
-                        ))
-                    },
-                    SpecialEq::from_type(DataType::List(DataType::UInt32.into())),
-                )
-                .alias("reference_idxs"),
-        )
-        .select([col("reference_idxs")])
-        .with_streaming(true)
-        .sink_parquet(
-            "narinfo-references.parquet".into(),
-            ParquetWriteOptions::default(),
-        )?;
-    eprintln!("{DONE}");
+    {
+        let span = info_span!("swizzle_refs", indicatif.pb_show = tracing::field::Empty).entered();
+        span.pb_set_message("swizzle references");
+        span.pb_start();
+
+        LazyFrame::scan_parquet("narinfo.parquet", ScanArgsParquet::default())?
+            .with_column(
+                col("references")
+                    .map(
+                        move |series: Series| -> PolarsResult<Option<Series>> {
+                            Ok(Some(
+                                series
+                                    .list()?
+                                    .apply_to_inner(&|series: Series| -> PolarsResult<Series> {
+                                        let series = series.binary()?;
+                                        let mut out: Vec<u32> = Vec::with_capacity(series.len());
+                                        out.extend(
+                                            as_fixed_binary(series).flatten().map(ph_to_idx),
+                                        );
+                                        Ok(Series::from_vec("reference_idxs", out))
+                                    })?
+                                    .into_series(),
+                            ))
+                        },
+                        SpecialEq::from_type(DataType::List(DataType::UInt32.into())),
+                    )
+                    .alias("reference_idxs"),
+            )
+            .select([col("reference_idxs")])
+            .with_streaming(true)
+            .sink_parquet(
+                "narinfo-references.parquet".into(),
+                ParquetWriteOptions::default(),
+            )?;
+    };
 
     Ok(())
 }
diff --git a/users/edef/weave/src/lib.rs b/users/edef/weave/src/lib.rs
index 4ccd566ca52d..ff5bf9232676 100644
--- a/users/edef/weave/src/lib.rs
+++ b/users/edef/weave/src/lib.rs
@@ -8,6 +8,7 @@ use std::{
     slice,
     sync::Arc,
 };
+use tracing_indicatif::span_ext::IndicatifSpanExt as _;
 
 use polars::{
     datatypes::BinaryChunked,
@@ -20,7 +21,6 @@ pub type FixedBytes<const N: usize> =
     ArcRef<'static, polars::export::arrow::buffer::Bytes<u8>, [[u8; N]]>;
 
 pub const INDEX_NULL: u32 = !0;
-pub const DONE: &str = "\u{2714}";
 
 /// A terrific hash function, turning 20 bytes of cryptographic hash
 /// into 8 bytes of cryptographic hash.
@@ -42,8 +42,13 @@ pub fn leak<O, T: ?Sized>(r: OwningRef<Arc<O>, T>) -> &T {
 
 /// Read a dense `store_path_hash` array from `narinfo.parquet`,
 /// returning it as an owned [FixedBytes].
+#[tracing::instrument(fields(indicatif.pb_show = tracing::field::Empty))]
 pub fn load_ph_array() -> Result<FixedBytes<20>> {
-    eprint!("… load store_path_hash\r");
+    let span = tracing::Span::current();
+
+    span.pb_set_message("load store_path_hash");
+    span.pb_start();
+
     // TODO(edef): this could use a further pushdown, since polars is more hindrance than help here
     // We know this has to fit in memory (we can't mmap it without further encoding constraints),
     // and we want a single `Vec<[u8; 20]>` of the data.
@@ -57,7 +62,6 @@ pub fn load_ph_array() -> Result<FixedBytes<20>> {
     );
 
     u32::try_from(ph_array.len()).expect("dataset exceeds 2^32");
-    eprintln!("{DONE}");
 
     Ok(ph_array)
 }
diff --git a/users/edef/weave/src/main.rs b/users/edef/weave/src/main.rs
index b86992c279d1..b1dd27d8c1af 100644
--- a/users/edef/weave/src/main.rs
+++ b/users/edef/weave/src/main.rs
@@ -15,6 +15,8 @@ use std::{
     ops::Index,
     sync::atomic::{AtomicU32, Ordering},
 };
+use tracing::{info_span, warn};
+use tracing_indicatif::span_ext::IndicatifSpanExt;
 
 use polars::{
     datatypes::StaticArray,
@@ -23,36 +25,48 @@ use polars::{
     prelude::*,
 };
 
-use weave::{as_fixed_binary, hash64, DONE, INDEX_NULL};
+use weave::{as_fixed_binary, hash64, INDEX_NULL};
 
+#[tracing::instrument]
 fn main() -> Result<()> {
-    eprint!("… parse roots\r");
-    let roots: PathSet32 = as_fixed_binary::<20>(
-        LazyFrame::scan_parquet("releases.parquet", ScanArgsParquet::default())?
-            .explode([col("store_path_hash")])
-            .select([col("store_path_hash")])
-            .collect()?
-            .column("store_path_hash")?
-            .binary()?,
-    )
-    .flatten()
-    .collect();
-    eprintln!("{DONE}");
+    let _tracing = tvix_tracing::TracingBuilder::default()
+        .enable_progressbar()
+        .build()?;
+
+    let roots: PathSet32 = {
+        let span = info_span!("parse_roots", indicatif.pb_show = tracing::field::Empty).entered();
+        span.pb_set_message("parse roots");
+        span.pb_start();
+
+        as_fixed_binary::<20>(
+            LazyFrame::scan_parquet("releases.parquet", ScanArgsParquet::default())?
+                .explode([col("store_path_hash")])
+                .select([col("store_path_hash")])
+                .collect()?
+                .column("store_path_hash")?
+                .binary()?,
+        )
+        .flatten()
+        .collect()
+    };
 
     {
-        let ph_array = weave::load_ph_array()?;
-
-        eprint!("… resolve roots\r");
-        ph_array.par_iter().enumerate().for_each(|(idx, h)| {
-            if let Some(idx_slot) = roots.find(h) {
-                assert_eq!(
-                    idx_slot.swap(idx as u32, Ordering::Relaxed),
-                    INDEX_NULL,
-                    "duplicate entry"
-                );
-            }
-        });
-        eprintln!("{DONE}");
+        let span = info_span!("resolve_roots", indicatif.pb_show = tracing::field::Empty).entered();
+        span.pb_set_message("resolve roots");
+        span.pb_start();
+
+        weave::load_ph_array()?
+            .into_par_iter()
+            .enumerate()
+            .for_each(|(idx, h)| {
+                if let Some(idx_slot) = roots.find(h) {
+                    assert_eq!(
+                        idx_slot.swap(idx as u32, Ordering::Relaxed),
+                        INDEX_NULL,
+                        "duplicate entry"
+                    );
+                }
+            });
     }
 
     let mut todo = FxHashSet::default();
@@ -67,17 +81,28 @@ fn main() -> Result<()> {
             }
             todo.insert(idx);
         }
-        println!("skipping {unknown_roots} unknown roots");
-    }
 
-    eprint!("… load reference_idxs\r");
-    let ri_array = ParquetReader::new(File::open("narinfo-references.parquet")?)
-        .finish()?
-        .column("reference_idxs")?
-        .list()?
-        .clone();
+        if unknown_roots != 0 {
+            warn!("skipping {unknown_roots} unknown roots");
+        }
+    }
 
+    let ri_array;
     let ri_array = {
+        let span = info_span!(
+            "load_reference_idxs",
+            indicatif.pb_show = tracing::field::Empty
+        )
+        .entered();
+        span.pb_set_message("load reference_idxs");
+        span.pb_start();
+
+        ri_array = ParquetReader::new(File::open("narinfo-references.parquet")?)
+            .finish()?
+            .column("reference_idxs")?
+            .list()?
+            .clone();
+
         ChunkedList::new(ri_array.downcast_iter().map(|chunk| {
             (
                 chunk.offsets(),
@@ -91,48 +116,63 @@ fn main() -> Result<()> {
             )
         }))
     };
-    eprintln!("{DONE}");
 
     let mut seen = todo.clone();
-    while !todo.is_empty() {
-        println!("todo: {} seen: {}", todo.len(), seen.len());
-
-        todo = todo
-            .par_iter()
-            .flat_map(|&parent| {
-                if parent == INDEX_NULL {
-                    return FxHashSet::default();
-                }
+    {
+        let span = info_span!("mark", indicatif.pb_show = tracing::field::Empty).entered();
+        span.pb_set_message("marking");
+        span.pb_set_style(&tvix_tracing::PB_PROGRESS_STYLE);
+
+        while !todo.is_empty() {
+            span.pb_set_length(seen.len() as u64);
+            span.pb_set_position(seen.len().saturating_sub(todo.len()) as u64);
+
+            todo = todo
+                .par_iter()
+                .flat_map(|&parent| {
+                    if parent == INDEX_NULL {
+                        return FxHashSet::default();
+                    }
+
+                    ri_array[parent as usize]
+                        .iter()
+                        .cloned()
+                        .filter(|child| !seen.contains(child))
+                        .collect::<FxHashSet<u32>>()
+                })
+                .collect();
+
+            for &index in &todo {
+                seen.insert(index);
+            }
+        }
 
-                ri_array[parent as usize]
-                    .iter()
-                    .cloned()
-                    .filter(|child| !seen.contains(child))
-                    .collect::<FxHashSet<u32>>()
-            })
-            .collect();
+        span.pb_set_length(seen.len() as u64);
+        span.pb_set_position(seen.len() as u64);
 
-        for &index in &todo {
-            seen.insert(index);
+        if seen.remove(&INDEX_NULL) {
+            warn!("WARNING: missing edges");
         }
     }
 
-    println!("done: {} paths", seen.len());
+    let seen = {
+        let span = info_span!("gather_live", indicatif.pb_show = tracing::field::Empty).entered();
+        span.pb_set_message("gathering live set");
 
-    if seen.remove(&INDEX_NULL) {
-        println!("WARNING: missing edges");
-    }
+        let mut seen: Vec<u32> = seen.into_iter().collect();
+        seen.par_sort();
+        seen
+    };
 
-    eprint!("… gathering live set\r");
-    let mut seen: Vec<u32> = seen.into_iter().collect();
-    seen.par_sort();
-    eprintln!("{DONE}");
+    {
+        let span = info_span!("write_output", indicatif.pb_show = tracing::field::Empty).entered();
+        span.pb_set_message("writing output");
+        span.pb_start();
 
-    eprint!("… writing output\r");
-    ParquetWriter::new(File::create("live_idxs.parquet")?).finish(&mut df! {
-        "live_idx" => seen,
-    }?)?;
-    eprintln!("{DONE}");
+        ParquetWriter::new(File::create("live_idxs.parquet")?).finish(&mut df! {
+            "live_idx" => seen,
+        }?)?;
+    }
 
     Ok(())
 }