From 313899c291f0295506c275418e570b39b4a5f079 Mon Sep 17 00:00:00 2001 From: edef Date: Thu, 17 Oct 2024 13:26:01 +0000 Subject: refactor(users/edef/weave/swizzle): use polars streaming This vastly reduces the memory requirements, so we can run in ~40G RAM. Change-Id: I4952a780df294bd852a8b4682ba2fd59b9bae675 Reviewed-on: https://cl.tvl.fyi/c/depot/+/12667 Reviewed-by: flokli Tested-by: BuildkiteCI --- users/edef/weave/src/bin/swizzle.rs | 99 +++++++++++++++++-------------------- users/edef/weave/src/lib.rs | 20 +++++++- 2 files changed, 62 insertions(+), 57 deletions(-) (limited to 'users/edef/weave/src') diff --git a/users/edef/weave/src/bin/swizzle.rs b/users/edef/weave/src/bin/swizzle.rs index 68c18581268a..bcea3edfada9 100644 --- a/users/edef/weave/src/bin/swizzle.rs +++ b/users/edef/weave/src/bin/swizzle.rs @@ -32,21 +32,21 @@ use anyhow::Result; use hashbrown::HashTable; -use polars::prelude::*; -use rayon::prelude::*; -use std::fs::File; -use tokio::runtime::Runtime; +use polars::{ + lazy::dsl::{col, SpecialEq}, + prelude::*, +}; -use weave::{as_fixed_binary, hash64, load_ph_array, DONE, INDEX_NULL}; +use weave::{as_fixed_binary, hash64, leak, load_ph_array, DONE, INDEX_NULL}; fn main() -> Result<()> { - let ph_array = load_ph_array()?; + let ph_array: &'static [[u8; 20]] = leak(load_ph_array()?); // TODO(edef): re-parallelise this // We originally parallelised on chunks, but ph_array is only a single chunk, due to how Parquet loading works. // TODO(edef): outline the 64-bit hash prefix? 
it's an indirection, but it saves ~2G of memory eprint!("… build index\r"); - let ph_map: HashTable<(u64, u32)> = { + let ph_map: &'static HashTable<(u64, u32)> = { let mut ph_map = HashTable::with_capacity(ph_array.len()); for (offset, item) in ph_array.iter().enumerate() { @@ -55,59 +55,48 @@ fn main() -> Result<()> { ph_map.insert_unique(hash, (hash, offset), |&(hash, _)| hash); } - ph_map + &*Box::leak(Box::new(ph_map)) }; eprintln!("{DONE}"); - eprint!("… swizzle references\r"); - let mut pq = ParquetReader::new(File::open("narinfo.parquet")?) - .with_columns(Some(vec!["references".into()])) - .batched(1 << 16)?; - - let mut reference_idxs = - Series::new_empty("reference_idxs", &DataType::List(DataType::UInt32.into())); - - let mut bounce = vec![]; - let runtime = Runtime::new()?; - while let Some(batches) = runtime.block_on(pq.next_batches(48))? { - batches - .into_par_iter() - .map(|df| -> ListChunked { - df.column("references") - .unwrap() - .list() - .unwrap() - .apply_to_inner(&|series: Series| -> PolarsResult<Series> { - let series = series.binary()?; - let mut out: Vec<u32> = Vec::with_capacity(series.len()); - - out.extend(as_fixed_binary::<20>(series).flat_map(|xs| xs).map(|key| { - let hash = hash64(&key); - ph_map - .find(hash, |&(candidate_hash, candidate_index)| { - candidate_hash == hash - && &ph_array[candidate_index as usize] == key - }) - .map(|&(_, index)| index) - .unwrap_or(INDEX_NULL) - })); - - Ok(Series::from_vec("reference_idxs", out)) - }) - .unwrap() + let ph_to_idx = |key: &[u8; 20]| -> u32 { + let hash = hash64(key); + ph_map + .find(hash, |&(candidate_hash, candidate_index)| { + candidate_hash == hash && &ph_array[candidate_index as usize] == key }) - .collect_into_vec(&mut bounce); - - for batch in bounce.drain(..)
{ - reference_idxs.append(&batch.into_series())?; - } - } - eprintln!("{DONE}"); + .map(|&(_, index)| index) + .unwrap_or(INDEX_NULL) + }; - eprint!("… writing output\r"); - ParquetWriter::new(File::create("narinfo-references.parquet")?).finish(&mut df! { - "reference_idxs" => reference_idxs, - }?)?; + eprint!("… swizzle references\r"); + LazyFrame::scan_parquet("narinfo.parquet", ScanArgsParquet::default())? + .with_column( + col("references") + .map( + move |series: Series| -> PolarsResult<Option<Series>> { + Ok(Some( + series + .list()? + .apply_to_inner(&|series: Series| -> PolarsResult<Series> { + let series = series.binary()?; + let mut out: Vec<u32> = Vec::with_capacity(series.len()); + out.extend(as_fixed_binary(series).flatten().map(ph_to_idx)); + Ok(Series::from_vec("reference_idxs", out)) + })? + .into_series(), + )) + }, + SpecialEq::from_type(DataType::List(DataType::UInt32.into())), + ) + .alias("reference_idxs"), + ) + .select([col("reference_idxs")]) + .with_streaming(true) + .sink_parquet( + "narinfo-references.parquet".into(), + ParquetWriteOptions::default(), + )?; eprintln!("{DONE}"); Ok(()) diff --git a/users/edef/weave/src/lib.rs b/users/edef/weave/src/lib.rs index db3d07e7de07..4ccd566ca52d 100644 --- a/users/edef/weave/src/lib.rs +++ b/users/edef/weave/src/lib.rs @@ -1,7 +1,13 @@ use anyhow::Result; -use owning_ref::ArcRef; +use owning_ref::{ArcRef, OwningRef}; use rayon::prelude::*; -use std::{fs::File, ops::Range, slice}; +use std::{ + fs::File, + mem, + ops::{Deref, Range}, + slice, + sync::Arc, +}; use polars::{ datatypes::BinaryChunked, @@ -24,6 +30,16 @@ pub fn hash64(h: &[u8; 20]) -> u64 { u64::from_ne_bytes(buf) } +pub fn leak<O, T: ?Sized>(r: OwningRef<Arc<O>, T>) -> &'static T { + // SAFETY: Either `ptr` points into the `Arc`, which lives until `r` is dropped, + // or it points at something else entirely which lives at least as long.
+ unsafe { + let ptr: *const T = r.deref(); + mem::forget(r); + &*ptr + } +} + /// Read a dense `store_path_hash` array from `narinfo.parquet`, /// returning it as an owned [FixedBytes]. pub fn load_ph_array() -> Result<FixedBytes<20>> { -- cgit 1.4.1