diff options
author | edef <edef@edef.eu> | 2024-10-17T13·26+0000 |
---|---|---|
committer | edef <edef@edef.eu> | 2024-10-19T17·01+0000 |
commit | 313899c291f0295506c275418e570b39b4a5f079 (patch) | |
tree | 0a4c737f7c54082eaf3957072693115e29fabcd1 /users/edef/weave/src/bin/swizzle.rs | |
parent | bdc2891053c28a5739c4be55f4816d362a5f08e2 (diff) |
refactor(users/edef/weave/swizzle): use polars streaming r/8844
This vastly reduces the memory requirements, so we can run in ~40G RAM. Change-Id: I4952a780df294bd852a8b4682ba2fd59b9bae675 Reviewed-on: https://cl.tvl.fyi/c/depot/+/12667 Reviewed-by: flokli <flokli@flokli.de> Tested-by: BuildkiteCI
Diffstat (limited to 'users/edef/weave/src/bin/swizzle.rs')
-rw-r--r-- | users/edef/weave/src/bin/swizzle.rs | 99 |
1 files changed, 44 insertions, 55 deletions
diff --git a/users/edef/weave/src/bin/swizzle.rs b/users/edef/weave/src/bin/swizzle.rs index 68c18581268a..bcea3edfada9 100644 --- a/users/edef/weave/src/bin/swizzle.rs +++ b/users/edef/weave/src/bin/swizzle.rs @@ -32,21 +32,21 @@ use anyhow::Result; use hashbrown::HashTable; -use polars::prelude::*; -use rayon::prelude::*; -use std::fs::File; -use tokio::runtime::Runtime; +use polars::{ + lazy::dsl::{col, SpecialEq}, + prelude::*, +}; -use weave::{as_fixed_binary, hash64, load_ph_array, DONE, INDEX_NULL}; +use weave::{as_fixed_binary, hash64, leak, load_ph_array, DONE, INDEX_NULL}; fn main() -> Result<()> { - let ph_array = load_ph_array()?; + let ph_array: &'static [[u8; 20]] = leak(load_ph_array()?); // TODO(edef): re-parallelise this // We originally parallelised on chunks, but ph_array is only a single chunk, due to how Parquet loading works. // TODO(edef): outline the 64-bit hash prefix? it's an indirection, but it saves ~2G of memory eprint!("… build index\r"); - let ph_map: HashTable<(u64, u32)> = { + let ph_map: &'static HashTable<(u64, u32)> = { let mut ph_map = HashTable::with_capacity(ph_array.len()); for (offset, item) in ph_array.iter().enumerate() { @@ -55,59 +55,48 @@ fn main() -> Result<()> { ph_map.insert_unique(hash, (hash, offset), |&(hash, _)| hash); } - ph_map + &*Box::leak(Box::new(ph_map)) }; eprintln!("{DONE}"); - eprint!("… swizzle references\r"); - let mut pq = ParquetReader::new(File::open("narinfo.parquet")?) - .with_columns(Some(vec!["references".into()])) - .batched(1 << 16)?; - - let mut reference_idxs = - Series::new_empty("reference_idxs", &DataType::List(DataType::UInt32.into())); - - let mut bounce = vec![]; - let runtime = Runtime::new()?; - while let Some(batches) = runtime.block_on(pq.next_batches(48))? { - batches - .into_par_iter() - .map(|df| -> ListChunked { - df.column("references") - .unwrap() - .list() - .unwrap() - .apply_to_inner(&|series: Series| -> PolarsResult<Series> { - let series = series.binary()?; - let mut out: Vec<u32> = Vec::with_capacity(series.len()); - - out.extend(as_fixed_binary::<20>(series).flat_map(|xs| xs).map(|key| { - let hash = hash64(&key); - ph_map - .find(hash, |&(candidate_hash, candidate_index)| { - candidate_hash == hash - && &ph_array[candidate_index as usize] == key - }) - .map(|&(_, index)| index) - .unwrap_or(INDEX_NULL) - })); - - Ok(Series::from_vec("reference_idxs", out)) - }) - .unwrap() + let ph_to_idx = |key: &[u8; 20]| -> u32 { + let hash = hash64(key); + ph_map + .find(hash, |&(candidate_hash, candidate_index)| { + candidate_hash == hash && &ph_array[candidate_index as usize] == key }) - .collect_into_vec(&mut bounce); - - for batch in bounce.drain(..) { - reference_idxs.append(&batch.into_series())?; - } - } - eprintln!("{DONE}"); + .map(|&(_, index)| index) + .unwrap_or(INDEX_NULL) + }; - eprint!("… writing output\r"); - ParquetWriter::new(File::create("narinfo-references.parquet")?).finish(&mut df! { - "reference_idxs" => reference_idxs, - }?)?; + eprint!("… swizzle references\r"); + LazyFrame::scan_parquet("narinfo.parquet", ScanArgsParquet::default())? + .with_column( + col("references") + .map( + move |series: Series| -> PolarsResult<Option<Series>> { + Ok(Some( + series + .list()? + .apply_to_inner(&|series: Series| -> PolarsResult<Series> { + let series = series.binary()?; + let mut out: Vec<u32> = Vec::with_capacity(series.len()); + out.extend(as_fixed_binary(series).flatten().map(ph_to_idx)); + Ok(Series::from_vec("reference_idxs", out)) + })? + .into_series(), + )) + }, + SpecialEq::from_type(DataType::List(DataType::UInt32.into())), + ) + .alias("reference_idxs"), + ) + .select([col("reference_idxs")]) + .with_streaming(true) + .sink_parquet( + "narinfo-references.parquet".into(), + ParquetWriteOptions::default(), + )?; eprintln!("{DONE}"); Ok(()) |