diff options
Diffstat (limited to 'users/edef/weave/src/bin/swizzle.rs')
-rw-r--r-- | users/edef/weave/src/bin/swizzle.rs | 79 |
1 files changed, 47 insertions, 32 deletions
diff --git a/users/edef/weave/src/bin/swizzle.rs b/users/edef/weave/src/bin/swizzle.rs index bcea3edfada9..840a53454f57 100644 --- a/users/edef/weave/src/bin/swizzle.rs +++ b/users/edef/weave/src/bin/swizzle.rs @@ -36,17 +36,27 @@ use polars::{ lazy::dsl::{col, SpecialEq}, prelude::*, }; +use tracing::info_span; +use tracing_indicatif::span_ext::IndicatifSpanExt as _; -use weave::{as_fixed_binary, hash64, leak, load_ph_array, DONE, INDEX_NULL}; +use weave::{as_fixed_binary, hash64, leak, load_ph_array, INDEX_NULL}; +#[tracing::instrument] fn main() -> Result<()> { + let _tracing = tvix_tracing::TracingBuilder::default() + .enable_progressbar() + .build()?; + let ph_array: &'static [[u8; 20]] = leak(load_ph_array()?); // TODO(edef): re-parallelise this // We originally parallelised on chunks, but ph_array is only a single chunk, due to how Parquet loading works. // TODO(edef): outline the 64-bit hash prefix? it's an indirection, but it saves ~2G of memory - eprint!("… build index\r"); let ph_map: &'static HashTable<(u64, u32)> = { + let span = info_span!("ph_map", indicatif.pb_show = tracing::field::Empty).entered(); + span.pb_set_message("build index"); + span.pb_start(); + let mut ph_map = HashTable::with_capacity(ph_array.len()); for (offset, item) in ph_array.iter().enumerate() { @@ -57,7 +67,6 @@ fn main() -> Result<()> { &*Box::leak(Box::new(ph_map)) }; - eprintln!("{DONE}"); let ph_to_idx = |key: &[u8; 20]| -> u32 { let hash = hash64(key); @@ -69,35 +78,41 @@ fn main() -> Result<()> { .unwrap_or(INDEX_NULL) }; - eprint!("… swizzle references\r"); - LazyFrame::scan_parquet("narinfo.parquet", ScanArgsParquet::default())? - .with_column( - col("references") - .map( - move |series: Series| -> PolarsResult<Option<Series>> { - Ok(Some( - series - .list()? - .apply_to_inner(&|series: Series| -> PolarsResult<Series> { - let series = series.binary()?; - let mut out: Vec<u32> = Vec::with_capacity(series.len()); - out.extend(as_fixed_binary(series).flatten().map(ph_to_idx)); - Ok(Series::from_vec("reference_idxs", out)) - })? - .into_series(), - )) - }, - SpecialEq::from_type(DataType::List(DataType::UInt32.into())), - ) - .alias("reference_idxs"), - ) - .select([col("reference_idxs")]) - .with_streaming(true) - .sink_parquet( - "narinfo-references.parquet".into(), - ParquetWriteOptions::default(), - )?; - eprintln!("{DONE}"); + { + let span = info_span!("swizzle_refs", indicatif.pb_show = tracing::field::Empty).entered(); + span.pb_set_message("swizzle references"); + span.pb_start(); + + LazyFrame::scan_parquet("narinfo.parquet", ScanArgsParquet::default())? + .with_column( + col("references") + .map( + move |series: Series| -> PolarsResult<Option<Series>> { + Ok(Some( + series + .list()? + .apply_to_inner(&|series: Series| -> PolarsResult<Series> { + let series = series.binary()?; + let mut out: Vec<u32> = Vec::with_capacity(series.len()); + out.extend( + as_fixed_binary(series).flatten().map(ph_to_idx), + ); + Ok(Series::from_vec("reference_idxs", out)) + })? + .into_series(), + )) + }, + SpecialEq::from_type(DataType::List(DataType::UInt32.into())), + ) + .alias("reference_idxs"), + ) + .select([col("reference_idxs")]) + .with_streaming(true) + .sink_parquet( + "narinfo-references.parquet".into(), + ParquetWriteOptions::default(), + )?; + }; Ok(()) } |