diff options
Diffstat (limited to 'tvix/tools/weave/src')
-rw-r--r-- | tvix/tools/weave/src/bin/swizzle.rs | 114 | ||||
-rw-r--r-- | tvix/tools/weave/src/bytes.rs | 27 | ||||
-rw-r--r-- | tvix/tools/weave/src/lib.rs | 102 | ||||
-rw-r--r-- | tvix/tools/weave/src/main.rs | 216 |
4 files changed, 459 insertions, 0 deletions
diff --git a/tvix/tools/weave/src/bin/swizzle.rs b/tvix/tools/weave/src/bin/swizzle.rs new file mode 100644 index 000000000000..68c18581268a --- /dev/null +++ b/tvix/tools/weave/src/bin/swizzle.rs @@ -0,0 +1,114 @@ +//! Swizzle reads a `narinfo.parquet` file, usually produced by `narinfo2parquet`. +//! +//! It swizzles the reference list, ie it converts the references from absolute, +//! global identifiers (store path hashes) to indices into the `store_path_hash` +//! column (ie, row numbers), so that we can later walk the reference graph +//! efficiently. +//! +//! Path hashes are represented as non-null, 20-byte `Binary` values. +//! The indices are represented as 32-bit unsigned integers, with in-band nulls +//! represented by [INDEX_NULL] (the all-1 bit pattern), to permit swizzling +//! partial datasets. +//! +//! In essence, it converts from names to pointers, so that `weave` can simply +//! chase pointers to trace the live set. This replaces an `O(log(n))` lookup +//! with `O(1)` indexing, and produces a much denser representation that actually +//! fits in memory. +//! +//! The in-memory representation is at least 80% smaller, and the indices compress +//! well in Parquet due to both temporal locality of reference and the power law +//! distribution of reference "popularity". +//! +//! Only two columns are read from `narinfo.parquet`: +//! +//! * `store_path_hash :: PathHash` +//! * `references :: List[PathHash]` +//! +//! Output is written to `narinfo-references.parquet` in the form of a single +//! `List[u32]` column, `reference_idxs`. +//! +//! This file is inherently bound to the corresponding `narinfo.parquet`, +//! since it essentially contains pointers into this file. + +use anyhow::Result; +use hashbrown::HashTable; +use polars::prelude::*; +use rayon::prelude::*; +use std::fs::File; +use tokio::runtime::Runtime; + +use weave::{as_fixed_binary, hash64, load_ph_array, DONE, INDEX_NULL}; + +fn main() -> Result<()> { + let ph_array = load_ph_array()?; + + // TODO(edef): re-parallelise this + // We originally parallelised on chunks, but ph_array is only a single chunk, due to how Parquet loading works. + // TODO(edef): outline the 64-bit hash prefix? it's an indirection, but it saves ~2G of memory + eprint!("… build index\r"); + let ph_map: HashTable<(u64, u32)> = { + let mut ph_map = HashTable::with_capacity(ph_array.len()); + + for (offset, item) in ph_array.iter().enumerate() { + let offset = offset as u32; + let hash = hash64(item); + ph_map.insert_unique(hash, (hash, offset), |&(hash, _)| hash); + } + + ph_map + }; + eprintln!("{DONE}"); + + eprint!("… swizzle references\r"); + let mut pq = ParquetReader::new(File::open("narinfo.parquet")?) + .with_columns(Some(vec!["references".into()])) + .batched(1 << 16)?; + + let mut reference_idxs = + Series::new_empty("reference_idxs", &DataType::List(DataType::UInt32.into())); + + let mut bounce = vec![]; + let runtime = Runtime::new()?; + while let Some(batches) = runtime.block_on(pq.next_batches(48))? { + batches + .into_par_iter() + .map(|df| -> ListChunked { + df.column("references") + .unwrap() + .list() + .unwrap() + .apply_to_inner(&|series: Series| -> PolarsResult<Series> { + let series = series.binary()?; + let mut out: Vec<u32> = Vec::with_capacity(series.len()); + + out.extend(as_fixed_binary::<20>(series).flat_map(|xs| xs).map(|key| { + let hash = hash64(&key); + ph_map + .find(hash, |&(candidate_hash, candidate_index)| { + candidate_hash == hash + && &ph_array[candidate_index as usize] == key + }) + .map(|&(_, index)| index) + .unwrap_or(INDEX_NULL) + })); + + Ok(Series::from_vec("reference_idxs", out)) + }) + .unwrap() + }) + .collect_into_vec(&mut bounce); + + for batch in bounce.drain(..) { + reference_idxs.append(&batch.into_series())?; + } + } + eprintln!("{DONE}"); + + eprint!("… writing output\r"); + ParquetWriter::new(File::create("narinfo-references.parquet")?).finish(&mut df! { + "reference_idxs" => reference_idxs, + }?)?; + eprintln!("{DONE}"); + + Ok(()) +} diff --git a/tvix/tools/weave/src/bytes.rs b/tvix/tools/weave/src/bytes.rs new file mode 100644 index 000000000000..c6dc2ebb4492 --- /dev/null +++ b/tvix/tools/weave/src/bytes.rs @@ -0,0 +1,27 @@ +use owning_ref::{OwningRef, StableAddress}; +use polars::export::arrow::buffer::Buffer; +use std::ops::Deref; + +/// An shared `[[u8; N]]` backed by a Polars [Buffer]. +pub type FixedBytes<const N: usize> = OwningRef<Bytes, [[u8; N]]>; + +/// Wrapper struct to make [Buffer] implement [StableAddress]. +/// TODO(edef): upstream the `impl` +pub struct Bytes(pub Buffer<u8>); + +/// SAFETY: [Buffer] is always an Arc+Vec indirection. +unsafe impl StableAddress for Bytes {} + +impl Bytes { + pub fn map<U: ?Sized>(self, f: impl FnOnce(&[u8]) -> &U) -> OwningRef<Self, U> { + OwningRef::new(self).map(f) + } +} + +impl Deref for Bytes { + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + &*self.0 + } +} diff --git a/tvix/tools/weave/src/lib.rs b/tvix/tools/weave/src/lib.rs new file mode 100644 index 000000000000..12a86f9fb002 --- /dev/null +++ b/tvix/tools/weave/src/lib.rs @@ -0,0 +1,102 @@ +use anyhow::Result; +use rayon::prelude::*; +use std::{fs::File, slice}; + +use polars::{ + datatypes::BinaryChunked, + export::arrow::array::BinaryArray, + prelude::{ParquetReader, SerReader}, +}; + +pub use crate::bytes::*; +mod bytes; + +pub const INDEX_NULL: u32 = !0; +pub const DONE: &str = "\u{2714}"; + +/// A terrific hash function, turning 20 bytes of cryptographic hash +/// into 8 bytes of cryptographic hash. +pub fn hash64(h: &[u8; 20]) -> u64 { + let mut buf = [0; 8]; + buf.copy_from_slice(&h[..8]); + u64::from_ne_bytes(buf) +} + +/// Read a dense `store_path_hash` array from `narinfo.parquet`, +/// returning it as an owned [FixedBytes]. +pub fn load_ph_array() -> Result<FixedBytes<20>> { + eprint!("… load store_path_hash\r"); + // TODO(edef): this could use a further pushdown, since polars is more hindrance than help here + // We know this has to fit in memory (we can't mmap it without further encoding constraints), + // and we want a single `Vec<[u8; 20]>` of the data. + let ph_array = into_fixed_binary_rechunk::<20>( + ParquetReader::new(File::open("narinfo.parquet").unwrap()) + .with_columns(Some(vec!["store_path_hash".into()])) + .set_rechunk(true) + .finish()? + .column("store_path_hash")? + .binary()?, + ); + + u32::try_from(ph_array.len()).expect("dataset exceeds 2^32"); + eprintln!("{DONE}"); + + Ok(ph_array) +} + +/// Iterator over `&[[u8; N]]` from a dense [BinaryChunked]. +pub fn as_fixed_binary<const N: usize>( + chunked: &BinaryChunked, +) -> impl Iterator<Item = &[[u8; N]]> + DoubleEndedIterator { + chunked.downcast_iter().map(|array| { + assert_fixed_dense::<N>(array); + exact_chunks(array.values()).unwrap() + }) +} + +/// Convert a dense [BinaryChunked] into a single chunk as [FixedBytes], +/// without taking a reference to the offsets array and validity bitmap. +fn into_fixed_binary_rechunk<const N: usize>(chunked: &BinaryChunked) -> FixedBytes<N> { + let chunked = chunked.rechunk(); + let mut iter = chunked.downcast_iter(); + let array = iter.next().unwrap(); + + assert_fixed_dense::<N>(array); + Bytes(array.values().clone()).map(|buf| exact_chunks(buf).unwrap()) +} + +/// Ensures that the supplied Arrow array consists of densely packed bytestrings of length `N`. +/// In other words, ensure that it is free of nulls, and that the offsets have a fixed stride of `N`. +fn assert_fixed_dense<const N: usize>(array: &BinaryArray<i64>) { + let null_count = array.validity().map_or(0, |bits| bits.unset_bits()); + if null_count > 0 { + panic!("null values present"); + } + + let length_check = array + .offsets() + .as_slice() + .par_windows(2) + .all(|w| (w[1] - w[0]) == N as i64); + + if !length_check { + panic!("lengths are inconsistent"); + } +} + +fn exact_chunks<const K: usize>(buf: &[u8]) -> Option<&[[u8; K]]> { + // SAFETY: We ensure that `buf.len()` is a multiple of K, and there are no alignment requirements. + unsafe { + let ptr = buf.as_ptr(); + let len = buf.len(); + + if len % K != 0 { + return None; + } + + let ptr = ptr as *mut [u8; K]; + let len = len / K; + + Some(slice::from_raw_parts(ptr, len)) + } +} diff --git a/tvix/tools/weave/src/main.rs b/tvix/tools/weave/src/main.rs new file mode 100644 index 000000000000..e8a1990a0df3 --- /dev/null +++ b/tvix/tools/weave/src/main.rs @@ -0,0 +1,216 @@ +//! Weave resolves a list of roots from `nixpkgs.roots` against `narinfo.parquet`, +//! and then uses the reference graph from the accompanying `narinfo-references.parquet` +//! produced by `swizzle` to collect the closure of the roots. +//! +//! They are written to `live_idxs.parquet`, which only has one column, representing +//! the row numbers in `narinfo.parquet` corresponding to live paths. + +use anyhow::Result; +use hashbrown::{hash_table, HashTable}; +use nix_compat::nixbase32; +use rayon::prelude::*; +use std::{ + collections::{BTreeMap, HashSet}, + fs::{self, File}, + ops::Index, + sync::atomic::{AtomicU32, Ordering}, +}; + +use polars::{ + datatypes::StaticArray, + export::arrow::{array::UInt32Array, offset::OffsetsBuffer}, + prelude::*, +}; + +use weave::{hash64, DONE, INDEX_NULL}; + +fn main() -> Result<()> { + eprint!("… parse roots\r"); + let roots: PathSet32 = { + let mut roots = Vec::new(); + fs::read("nixpkgs.roots")? + .par_chunks_exact(32 + 1) + .map(|e| nixbase32::decode_fixed::<20>(&e[0..32]).unwrap()) + .collect_into_vec(&mut roots); + + roots.iter().collect() + }; + eprintln!("{DONE}"); + + { + let ph_array = weave::load_ph_array()?; + + eprint!("… resolve roots\r"); + ph_array.par_iter().enumerate().for_each(|(idx, h)| { + if let Some(idx_slot) = roots.find(h) { + idx_slot + .compare_exchange(INDEX_NULL, idx as u32, Ordering::SeqCst, Ordering::SeqCst) + .expect("duplicate entry"); + } + }); + eprintln!("{DONE}"); + } + + let mut todo = HashSet::with_capacity(roots.len()); + { + let mut unknown_roots = 0usize; + for (_, idx) in roots.table { + let idx = idx.into_inner(); + if idx == INDEX_NULL { + unknown_roots += 1; + continue; + } + todo.insert(idx); + } + println!("skipping {unknown_roots} unknown roots"); + } + + eprint!("… load reference_idxs\r"); + let ri_array = ParquetReader::new(File::open("narinfo-references.parquet")?) + .finish()? + .column("reference_idxs")? + .list()? + .clone(); + + let ri_array = { + ChunkedList::new(ri_array.downcast_iter().map(|chunk| { + ( + chunk.offsets(), + chunk + .values() + .as_any() + .downcast_ref::<UInt32Array>() + .unwrap() + .as_slice() + .unwrap(), + ) + })) + }; + eprintln!("{DONE}"); + + let mut seen = todo.clone(); + while !todo.is_empty() { + println!("todo: {} seen: {}", todo.len(), seen.len()); + + todo = todo + .par_iter() + .flat_map(|&parent| { + if parent == INDEX_NULL { + return vec![]; + } + + ri_array[parent as usize] + .iter() + .cloned() + .filter(|child| !seen.contains(child)) + .collect::<Vec<u32>>() + }) + .collect(); + + for &index in &todo { + seen.insert(index); + } + } + + println!("done: {} paths", seen.len()); + + if seen.remove(&INDEX_NULL) { + println!("WARNING: missing edges"); + } + + eprint!("… gathering live set\r"); + let mut seen: Vec<u32> = seen.into_iter().collect(); + seen.par_sort(); + eprintln!("{DONE}"); + + eprint!("… writing output\r"); + ParquetWriter::new(File::create("live_idxs.parquet")?).finish(&mut df! { + "live_idx" => seen, + }?)?; + eprintln!("{DONE}"); + + Ok(()) +} + +struct PathSet32 { + table: HashTable<([u8; 20], AtomicU32)>, +} + +impl PathSet32 { + fn with_capacity(capacity: usize) -> Self { + Self { + table: HashTable::with_capacity(capacity), + } + } + + fn insert(&mut self, value: &[u8; 20]) -> bool { + let hash = hash64(value); + + match self + .table + .entry(hash, |(x, _)| x == value, |(x, _)| hash64(x)) + { + hash_table::Entry::Occupied(_) => false, + hash_table::Entry::Vacant(entry) => { + entry.insert((*value, AtomicU32::new(INDEX_NULL))); + true + } + } + } + + fn find(&self, value: &[u8; 20]) -> Option<&AtomicU32> { + let hash = hash64(value); + self.table + .find(hash, |(x, _)| x == value) + .as_ref() + .map(|(_, x)| x) + } + + fn len(&self) -> usize { + self.table.len() + } +} + +impl<'a> FromIterator<&'a [u8; 20]> for PathSet32 { + fn from_iter<T: IntoIterator<Item = &'a [u8; 20]>>(iter: T) -> Self { + let iter = iter.into_iter(); + let mut this = Self::with_capacity(iter.size_hint().0); + + for item in iter { + this.insert(item); + } + + this + } +} + +struct ChunkedList<'a, T> { + by_offset: BTreeMap<usize, (&'a OffsetsBuffer<i64>, &'a [T])>, +} + +impl<'a, T> ChunkedList<'a, T> { + fn new(chunks: impl IntoIterator<Item = (&'a OffsetsBuffer<i64>, &'a [T])>) -> Self { + let mut next_offset = 0usize; + ChunkedList { + by_offset: chunks + .into_iter() + .map(|(offsets, values)| { + let offset = next_offset; + next_offset = next_offset.checked_add(offsets.len_proxy()).unwrap(); + + (offset, (offsets, values)) + }) + .collect(), + } + } +} + +impl<'a, T> Index<usize> for ChunkedList<'a, T> { + type Output = [T]; + + fn index(&self, index: usize) -> &Self::Output { + let (&base, &(offsets, values)) = self.by_offset.range(..=index).next_back().unwrap(); + let (start, end) = offsets.start_end(index - base); + &values[start..end] + } +} |