diff options
Diffstat (limited to 'users/edef/weave/src')
-rw-r--r-- | users/edef/weave/src/bin/swizzle.rs | 79 | ||||
-rw-r--r-- | users/edef/weave/src/lib.rs | 10 | ||||
-rw-r--r-- | users/edef/weave/src/main.rs | 172 |
3 files changed, 160 insertions, 101 deletions
diff --git a/users/edef/weave/src/bin/swizzle.rs b/users/edef/weave/src/bin/swizzle.rs index bcea3edfada9..840a53454f57 100644 --- a/users/edef/weave/src/bin/swizzle.rs +++ b/users/edef/weave/src/bin/swizzle.rs @@ -36,17 +36,27 @@ use polars::{ lazy::dsl::{col, SpecialEq}, prelude::*, }; +use tracing::info_span; +use tracing_indicatif::span_ext::IndicatifSpanExt as _; -use weave::{as_fixed_binary, hash64, leak, load_ph_array, DONE, INDEX_NULL}; +use weave::{as_fixed_binary, hash64, leak, load_ph_array, INDEX_NULL}; +#[tracing::instrument] fn main() -> Result<()> { + let _tracing = tvix_tracing::TracingBuilder::default() + .enable_progressbar() + .build()?; + let ph_array: &'static [[u8; 20]] = leak(load_ph_array()?); // TODO(edef): re-parallelise this // We originally parallelised on chunks, but ph_array is only a single chunk, due to how Parquet loading works. // TODO(edef): outline the 64-bit hash prefix? it's an indirection, but it saves ~2G of memory - eprint!("… build index\r"); let ph_map: &'static HashTable<(u64, u32)> = { + let span = info_span!("ph_map", indicatif.pb_show = tracing::field::Empty).entered(); + span.pb_set_message("build index"); + span.pb_start(); + let mut ph_map = HashTable::with_capacity(ph_array.len()); for (offset, item) in ph_array.iter().enumerate() { @@ -57,7 +67,6 @@ fn main() -> Result<()> { &*Box::leak(Box::new(ph_map)) }; - eprintln!("{DONE}"); let ph_to_idx = |key: &[u8; 20]| -> u32 { let hash = hash64(key); @@ -69,35 +78,41 @@ fn main() -> Result<()> { .unwrap_or(INDEX_NULL) }; - eprint!("… swizzle references\r"); - LazyFrame::scan_parquet("narinfo.parquet", ScanArgsParquet::default())? - .with_column( - col("references") - .map( - move |series: Series| -> PolarsResult<Option<Series>> { - Ok(Some( - series - .list()? - .apply_to_inner(&|series: Series| -> PolarsResult<Series> { - let series = series.binary()?; - let mut out: Vec<u32> = Vec::with_capacity(series.len()); - out.extend(as_fixed_binary(series).flatten().map(ph_to_idx)); - Ok(Series::from_vec("reference_idxs", out)) - })? - .into_series(), - )) - }, - SpecialEq::from_type(DataType::List(DataType::UInt32.into())), - ) - .alias("reference_idxs"), - ) - .select([col("reference_idxs")]) - .with_streaming(true) - .sink_parquet( - "narinfo-references.parquet".into(), - ParquetWriteOptions::default(), - )?; - eprintln!("{DONE}"); + { + let span = info_span!("swizzle_refs", indicatif.pb_show = tracing::field::Empty).entered(); + span.pb_set_message("swizzle references"); + span.pb_start(); + + LazyFrame::scan_parquet("narinfo.parquet", ScanArgsParquet::default())? + .with_column( + col("references") + .map( + move |series: Series| -> PolarsResult<Option<Series>> { + Ok(Some( + series + .list()? + .apply_to_inner(&|series: Series| -> PolarsResult<Series> { + let series = series.binary()?; + let mut out: Vec<u32> = Vec::with_capacity(series.len()); + out.extend( + as_fixed_binary(series).flatten().map(ph_to_idx), + ); + Ok(Series::from_vec("reference_idxs", out)) + })? + .into_series(), + )) + }, + SpecialEq::from_type(DataType::List(DataType::UInt32.into())), + ) + .alias("reference_idxs"), + ) + .select([col("reference_idxs")]) + .with_streaming(true) + .sink_parquet( + "narinfo-references.parquet".into(), + ParquetWriteOptions::default(), + )?; + }; Ok(()) } diff --git a/users/edef/weave/src/lib.rs b/users/edef/weave/src/lib.rs index 4ccd566ca52d..ff5bf9232676 100644 --- a/users/edef/weave/src/lib.rs +++ b/users/edef/weave/src/lib.rs @@ -8,6 +8,7 @@ use std::{ slice, sync::Arc, }; +use tracing_indicatif::span_ext::IndicatifSpanExt as _; use polars::{ datatypes::BinaryChunked, @@ -20,7 +21,6 @@ pub type FixedBytes<const N: usize> = ArcRef<'static, polars::export::arrow::buffer::Bytes<u8>, [[u8; N]]>; pub const INDEX_NULL: u32 = !0; -pub const DONE: &str = "\u{2714}"; /// A terrific hash function, turning 20 bytes of cryptographic hash /// into 8 bytes of cryptographic hash. @@ -42,8 +42,13 @@ pub fn leak<O, T: ?Sized>(r: OwningRef<Arc<O>, T>) -> &T { /// Read a dense `store_path_hash` array from `narinfo.parquet`, /// returning it as an owned [FixedBytes]. +#[tracing::instrument(fields(indicatif.pb_show = tracing::field::Empty))] pub fn load_ph_array() -> Result<FixedBytes<20>> { - eprint!("… load store_path_hash\r"); + let span = tracing::Span::current(); + + span.pb_set_message("load store_path_hash"); + span.pb_start(); + // TODO(edef): this could use a further pushdown, since polars is more hindrance than help here // We know this has to fit in memory (we can't mmap it without further encoding constraints), // and we want a single `Vec<[u8; 20]>` of the data. @@ -57,7 +62,6 @@ pub fn load_ph_array() -> Result<FixedBytes<20>> { ); u32::try_from(ph_array.len()).expect("dataset exceeds 2^32"); - eprintln!("{DONE}"); Ok(ph_array) } diff --git a/users/edef/weave/src/main.rs b/users/edef/weave/src/main.rs index b86992c279d1..b1dd27d8c1af 100644 --- a/users/edef/weave/src/main.rs +++ b/users/edef/weave/src/main.rs @@ -15,6 +15,8 @@ use std::{ ops::Index, sync::atomic::{AtomicU32, Ordering}, }; +use tracing::{info_span, warn}; +use tracing_indicatif::span_ext::IndicatifSpanExt; use polars::{ datatypes::StaticArray, @@ -23,36 +25,48 @@ use polars::{ prelude::*, }; -use weave::{as_fixed_binary, hash64, DONE, INDEX_NULL}; +use weave::{as_fixed_binary, hash64, INDEX_NULL}; +#[tracing::instrument] fn main() -> Result<()> { - eprint!("… parse roots\r"); - let roots: PathSet32 = as_fixed_binary::<20>( - LazyFrame::scan_parquet("releases.parquet", ScanArgsParquet::default())? - .explode([col("store_path_hash")]) - .select([col("store_path_hash")]) - .collect()? - .column("store_path_hash")? - .binary()?, - ) - .flatten() - .collect(); - eprintln!("{DONE}"); + let _tracing = tvix_tracing::TracingBuilder::default() + .enable_progressbar() + .build()?; + + let roots: PathSet32 = { + let span = info_span!("parse_roots", indicatif.pb_show = tracing::field::Empty).entered(); + span.pb_set_message("parse roots"); + span.pb_start(); + + as_fixed_binary::<20>( + LazyFrame::scan_parquet("releases.parquet", ScanArgsParquet::default())? + .explode([col("store_path_hash")]) + .select([col("store_path_hash")]) + .collect()? + .column("store_path_hash")? + .binary()?, + ) + .flatten() + .collect() + }; { - let ph_array = weave::load_ph_array()?; - - eprint!("… resolve roots\r"); - ph_array.par_iter().enumerate().for_each(|(idx, h)| { - if let Some(idx_slot) = roots.find(h) { - assert_eq!( - idx_slot.swap(idx as u32, Ordering::Relaxed), - INDEX_NULL, - "duplicate entry" - ); - } - }); - eprintln!("{DONE}"); + let span = info_span!("resolve_roots", indicatif.pb_show = tracing::field::Empty).entered(); + span.pb_set_message("resolve roots"); + span.pb_start(); + + weave::load_ph_array()? + .into_par_iter() + .enumerate() + .for_each(|(idx, h)| { + if let Some(idx_slot) = roots.find(h) { + assert_eq!( + idx_slot.swap(idx as u32, Ordering::Relaxed), + INDEX_NULL, + "duplicate entry" + ); + } + }); } let mut todo = FxHashSet::default(); @@ -67,17 +81,28 @@ fn main() -> Result<()> { } todo.insert(idx); } - println!("skipping {unknown_roots} unknown roots"); - } - eprint!("… load reference_idxs\r"); - let ri_array = ParquetReader::new(File::open("narinfo-references.parquet")?) - .finish()? - .column("reference_idxs")? - .list()? - .clone(); + if unknown_roots != 0 { + warn!("skipping {unknown_roots} unknown roots"); + } + } + let ri_array; let ri_array = { + let span = info_span!( + "load_reference_idxs", + indicatif.pb_show = tracing::field::Empty + ) + .entered(); + span.pb_set_message("load reference_idxs"); + span.pb_start(); + + ri_array = ParquetReader::new(File::open("narinfo-references.parquet")?) + .finish()? + .column("reference_idxs")? + .list()? + .clone(); + ChunkedList::new(ri_array.downcast_iter().map(|chunk| { ( chunk.offsets(), @@ -91,48 +116,63 @@ fn main() -> Result<()> { ) })) }; - eprintln!("{DONE}"); let mut seen = todo.clone(); - while !todo.is_empty() { - println!("todo: {} seen: {}", todo.len(), seen.len()); - - todo = todo - .par_iter() - .flat_map(|&parent| { - if parent == INDEX_NULL { - return FxHashSet::default(); - } + { + let span = info_span!("mark", indicatif.pb_show = tracing::field::Empty).entered(); + span.pb_set_message("marking"); + span.pb_set_style(&tvix_tracing::PB_PROGRESS_STYLE); + + while !todo.is_empty() { + span.pb_set_length(seen.len() as u64); + span.pb_set_position(seen.len().saturating_sub(todo.len()) as u64); + + todo = todo + .par_iter() + .flat_map(|&parent| { + if parent == INDEX_NULL { + return FxHashSet::default(); + } + + ri_array[parent as usize] + .iter() + .cloned() + .filter(|child| !seen.contains(child)) + .collect::<FxHashSet<u32>>() + }) + .collect(); + + for &index in &todo { + seen.insert(index); + } + } - ri_array[parent as usize] - .iter() - .cloned() - .filter(|child| !seen.contains(child)) - .collect::<FxHashSet<u32>>() - }) - .collect(); + span.pb_set_length(seen.len() as u64); + span.pb_set_position(seen.len() as u64); - for &index in &todo { - seen.insert(index); + if seen.remove(&INDEX_NULL) { + warn!("WARNING: missing edges"); } } - println!("done: {} paths", seen.len()); + let seen = { + let span = info_span!("gather_live", indicatif.pb_show = tracing::field::Empty).entered(); + span.pb_set_message("gathering live set"); - if seen.remove(&INDEX_NULL) { - println!("WARNING: missing edges"); - } + let mut seen: Vec<u32> = seen.into_iter().collect(); + seen.par_sort(); + seen + }; - eprint!("… gathering live set\r"); - let mut seen: Vec<u32> = seen.into_iter().collect(); - seen.par_sort(); - eprintln!("{DONE}"); + { + let span = info_span!("write_output", indicatif.pb_show = tracing::field::Empty).entered(); + span.pb_set_message("writing output"); + span.pb_start(); - eprint!("… writing output\r"); - ParquetWriter::new(File::create("live_idxs.parquet")?).finish(&mut df! { - "live_idx" => seen, - }?)?; - eprintln!("{DONE}"); + ParquetWriter::new(File::create("live_idxs.parquet")?).finish(&mut df! { + "live_idx" => seen, + }?)?; + } Ok(()) } |