about summary refs log tree commit diff
path: root/users/edef/weave/src/bin/swizzle.rs
diff options
context:
space:
mode:
Diffstat (limited to 'users/edef/weave/src/bin/swizzle.rs')
-rw-r--r--users/edef/weave/src/bin/swizzle.rs114
1 files changed, 114 insertions, 0 deletions
diff --git a/users/edef/weave/src/bin/swizzle.rs b/users/edef/weave/src/bin/swizzle.rs
new file mode 100644
index 000000000000..68c18581268a
--- /dev/null
+++ b/users/edef/weave/src/bin/swizzle.rs
@@ -0,0 +1,114 @@
+//! Swizzle reads a `narinfo.parquet` file, usually produced by `narinfo2parquet`.
+//!
+//! It swizzles the reference list, ie it converts the references from absolute,
+//! global identifiers (store path hashes) to indices into the `store_path_hash`
+//! column (ie, row numbers), so that we can later walk the reference graph
+//! efficiently.
+//!
+//! Path hashes are represented as non-null, 20-byte `Binary` values.
+//! The indices are represented as 32-bit unsigned integers, with in-band nulls
+//! represented by [INDEX_NULL] (the all-1 bit pattern), to permit swizzling
+//! partial datasets.
+//!
+//! In essence, it converts from names to pointers, so that `weave` can simply
+//! chase pointers to trace the live set. This replaces an `O(log(n))` lookup
+//! with `O(1)` indexing, and produces a much denser representation that actually
+//! fits in memory.
+//!
+//! The in-memory representation is at least 80% smaller, and the indices compress
+//! well in Parquet due to both temporal locality of reference and the power law
+//! distribution of reference "popularity".
+//!
+//! Only two columns are read from `narinfo.parquet`:
+//!
+//!  * `store_path_hash :: PathHash`
+//!  * `references :: List[PathHash]`
+//!
+//! Output is written to `narinfo-references.parquet` in the form of a single
+//! `List[u32]` column, `reference_idxs`.
+//!
+//! This file is inherently bound to the corresponding `narinfo.parquet`,
+//! since it essentially contains pointers into this file.
+
+use anyhow::Result;
+use hashbrown::HashTable;
+use polars::prelude::*;
+use rayon::prelude::*;
+use std::fs::File;
+use tokio::runtime::Runtime;
+
+use weave::{as_fixed_binary, hash64, load_ph_array, DONE, INDEX_NULL};
+
+fn main() -> Result<()> {
+    let ph_array = load_ph_array()?;
+
+    // TODO(edef): re-parallelise this
+    // We originally parallelised on chunks, but ph_array is only a single chunk, due to how Parquet loading works.
+    // TODO(edef): outline the 64-bit hash prefix? it's an indirection, but it saves ~2G of memory
+    eprint!("… build index\r");
+    let ph_map: HashTable<(u64, u32)> = {
+        let mut ph_map = HashTable::with_capacity(ph_array.len());
+
+        for (offset, item) in ph_array.iter().enumerate() {
+            let offset = offset as u32;
+            let hash = hash64(item);
+            ph_map.insert_unique(hash, (hash, offset), |&(hash, _)| hash);
+        }
+
+        ph_map
+    };
+    eprintln!("{DONE}");
+
+    eprint!("… swizzle references\r");
+    let mut pq = ParquetReader::new(File::open("narinfo.parquet")?)
+        .with_columns(Some(vec!["references".into()]))
+        .batched(1 << 16)?;
+
+    let mut reference_idxs =
+        Series::new_empty("reference_idxs", &DataType::List(DataType::UInt32.into()));
+
+    let mut bounce = vec![];
+    let runtime = Runtime::new()?;
+    while let Some(batches) = runtime.block_on(pq.next_batches(48))? {
+        batches
+            .into_par_iter()
+            .map(|df| -> ListChunked {
+                df.column("references")
+                    .unwrap()
+                    .list()
+                    .unwrap()
+                    .apply_to_inner(&|series: Series| -> PolarsResult<Series> {
+                        let series = series.binary()?;
+                        let mut out: Vec<u32> = Vec::with_capacity(series.len());
+
+                        out.extend(as_fixed_binary::<20>(series).flat_map(|xs| xs).map(|key| {
+                            let hash = hash64(&key);
+                            ph_map
+                                .find(hash, |&(candidate_hash, candidate_index)| {
+                                    candidate_hash == hash
+                                        && &ph_array[candidate_index as usize] == key
+                                })
+                                .map(|&(_, index)| index)
+                                .unwrap_or(INDEX_NULL)
+                        }));
+
+                        Ok(Series::from_vec("reference_idxs", out))
+                    })
+                    .unwrap()
+            })
+            .collect_into_vec(&mut bounce);
+
+        for batch in bounce.drain(..) {
+            reference_idxs.append(&batch.into_series())?;
+        }
+    }
+    eprintln!("{DONE}");
+
+    eprint!("… writing output\r");
+    ParquetWriter::new(File::create("narinfo-references.parquet")?).finish(&mut df! {
+        "reference_idxs" => reference_idxs,
+    }?)?;
+    eprintln!("{DONE}");
+
+    Ok(())
+}