diff options
Diffstat (limited to 'users/edef/weave/src/main.rs')
-rw-r--r-- | users/edef/weave/src/main.rs | 172 |
1 file changed, 106 insertions, 66 deletions
diff --git a/users/edef/weave/src/main.rs b/users/edef/weave/src/main.rs index b86992c279d1..b1dd27d8c1af 100644 --- a/users/edef/weave/src/main.rs +++ b/users/edef/weave/src/main.rs @@ -15,6 +15,8 @@ use std::{ ops::Index, sync::atomic::{AtomicU32, Ordering}, }; +use tracing::{info_span, warn}; +use tracing_indicatif::span_ext::IndicatifSpanExt; use polars::{ datatypes::StaticArray, @@ -23,36 +25,48 @@ use polars::{ prelude::*, }; -use weave::{as_fixed_binary, hash64, DONE, INDEX_NULL}; +use weave::{as_fixed_binary, hash64, INDEX_NULL}; +#[tracing::instrument] fn main() -> Result<()> { - eprint!("… parse roots\r"); - let roots: PathSet32 = as_fixed_binary::<20>( - LazyFrame::scan_parquet("releases.parquet", ScanArgsParquet::default())? - .explode([col("store_path_hash")]) - .select([col("store_path_hash")]) - .collect()? - .column("store_path_hash")? - .binary()?, - ) - .flatten() - .collect(); - eprintln!("{DONE}"); + let _tracing = tvix_tracing::TracingBuilder::default() + .enable_progressbar() + .build()?; + + let roots: PathSet32 = { + let span = info_span!("parse_roots", indicatif.pb_show = tracing::field::Empty).entered(); + span.pb_set_message("parse roots"); + span.pb_start(); + + as_fixed_binary::<20>( + LazyFrame::scan_parquet("releases.parquet", ScanArgsParquet::default())? + .explode([col("store_path_hash")]) + .select([col("store_path_hash")]) + .collect()? + .column("store_path_hash")? + .binary()?, + ) + .flatten() + .collect() + }; { - let ph_array = weave::load_ph_array()?; - - eprint!("… resolve roots\r"); - ph_array.par_iter().enumerate().for_each(|(idx, h)| { - if let Some(idx_slot) = roots.find(h) { - assert_eq!( - idx_slot.swap(idx as u32, Ordering::Relaxed), - INDEX_NULL, - "duplicate entry" - ); - } - }); - eprintln!("{DONE}"); + let span = info_span!("resolve_roots", indicatif.pb_show = tracing::field::Empty).entered(); + span.pb_set_message("resolve roots"); + span.pb_start(); + + weave::load_ph_array()? 
+ .into_par_iter() + .enumerate() + .for_each(|(idx, h)| { + if let Some(idx_slot) = roots.find(h) { + assert_eq!( + idx_slot.swap(idx as u32, Ordering::Relaxed), + INDEX_NULL, + "duplicate entry" + ); + } + }); } let mut todo = FxHashSet::default(); @@ -67,17 +81,28 @@ fn main() -> Result<()> { } todo.insert(idx); } - println!("skipping {unknown_roots} unknown roots"); - } - eprint!("… load reference_idxs\r"); - let ri_array = ParquetReader::new(File::open("narinfo-references.parquet")?) - .finish()? - .column("reference_idxs")? - .list()? - .clone(); + if unknown_roots != 0 { + warn!("skipping {unknown_roots} unknown roots"); + } + } + let ri_array; let ri_array = { + let span = info_span!( + "load_reference_idxs", + indicatif.pb_show = tracing::field::Empty + ) + .entered(); + span.pb_set_message("load reference_idxs"); + span.pb_start(); + + ri_array = ParquetReader::new(File::open("narinfo-references.parquet")?) + .finish()? + .column("reference_idxs")? + .list()? + .clone(); + ChunkedList::new(ri_array.downcast_iter().map(|chunk| { ( chunk.offsets(), @@ -91,48 +116,63 @@ fn main() -> Result<()> { ) })) }; - eprintln!("{DONE}"); let mut seen = todo.clone(); - while !todo.is_empty() { - println!("todo: {} seen: {}", todo.len(), seen.len()); - - todo = todo - .par_iter() - .flat_map(|&parent| { - if parent == INDEX_NULL { - return FxHashSet::default(); - } + { + let span = info_span!("mark", indicatif.pb_show = tracing::field::Empty).entered(); + span.pb_set_message("marking"); + span.pb_set_style(&tvix_tracing::PB_PROGRESS_STYLE); + + while !todo.is_empty() { + span.pb_set_length(seen.len() as u64); + span.pb_set_position(seen.len().saturating_sub(todo.len()) as u64); + + todo = todo + .par_iter() + .flat_map(|&parent| { + if parent == INDEX_NULL { + return FxHashSet::default(); + } + + ri_array[parent as usize] + .iter() + .cloned() + .filter(|child| !seen.contains(child)) + .collect::<FxHashSet<u32>>() + }) + .collect(); + + for &index in &todo { + 
seen.insert(index); + } + } - ri_array[parent as usize] - .iter() - .cloned() - .filter(|child| !seen.contains(child)) - .collect::<FxHashSet<u32>>() - }) - .collect(); + span.pb_set_length(seen.len() as u64); + span.pb_set_position(seen.len() as u64); - for &index in &todo { - seen.insert(index); + if seen.remove(&INDEX_NULL) { + warn!("WARNING: missing edges"); } } - println!("done: {} paths", seen.len()); + let seen = { + let span = info_span!("gather_live", indicatif.pb_show = tracing::field::Empty).entered(); + span.pb_set_message("gathering live set"); - if seen.remove(&INDEX_NULL) { - println!("WARNING: missing edges"); - } + let mut seen: Vec<u32> = seen.into_iter().collect(); + seen.par_sort(); + seen + }; - eprint!("… gathering live set\r"); - let mut seen: Vec<u32> = seen.into_iter().collect(); - seen.par_sort(); - eprintln!("{DONE}"); + { + let span = info_span!("write_output", indicatif.pb_show = tracing::field::Empty).entered(); + span.pb_set_message("writing output"); + span.pb_start(); - eprint!("… writing output\r"); - ParquetWriter::new(File::create("live_idxs.parquet")?).finish(&mut df! { - "live_idx" => seen, - }?)?; - eprintln!("{DONE}"); + ParquetWriter::new(File::create("live_idxs.parquet")?).finish(&mut df! { + "live_idx" => seen, + }?)?; + } Ok(()) } |