about summary refs log tree commit diff
path: root/tvix/tools/weave/src
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/tools/weave/src')
-rw-r--r--tvix/tools/weave/src/bin/swizzle.rs114
-rw-r--r--tvix/tools/weave/src/bytes.rs27
-rw-r--r--tvix/tools/weave/src/lib.rs106
-rw-r--r--tvix/tools/weave/src/main.rs216
4 files changed, 463 insertions, 0 deletions
diff --git a/tvix/tools/weave/src/bin/swizzle.rs b/tvix/tools/weave/src/bin/swizzle.rs
new file mode 100644
index 0000000000..68c1858126
--- /dev/null
+++ b/tvix/tools/weave/src/bin/swizzle.rs
@@ -0,0 +1,114 @@
+//! Swizzle reads a `narinfo.parquet` file, usually produced by `narinfo2parquet`.
+//!
+//! It swizzles the reference list, ie it converts the references from absolute,
+//! global identifiers (store path hashes) to indices into the `store_path_hash`
+//! column (ie, row numbers), so that we can later walk the reference graph
+//! efficiently.
+//!
+//! Path hashes are represented as non-null, 20-byte `Binary` values.
+//! The indices are represented as 32-bit unsigned integers, with in-band nulls
+//! represented by [INDEX_NULL] (the all-1 bit pattern), to permit swizzling
+//! partial datasets.
+//!
+//! In essence, it converts from names to pointers, so that `weave` can simply
+//! chase pointers to trace the live set. This replaces an `O(log(n))` lookup
+//! with `O(1)` indexing, and produces a much denser representation that actually
+//! fits in memory.
+//!
+//! The in-memory representation is at least 80% smaller, and the indices compress
+//! well in Parquet due to both temporal locality of reference and the power law
+//! distribution of reference "popularity".
+//!
+//! Only two columns are read from `narinfo.parquet`:
+//!
+//!  * `store_path_hash :: PathHash`
+//!  * `references :: List[PathHash]`
+//!
+//! Output is written to `narinfo-references.parquet` in the form of a single
+//! `List[u32]` column, `reference_idxs`.
+//!
+//! This file is inherently bound to the corresponding `narinfo.parquet`,
+//! since it essentially contains pointers into this file.
+
+use anyhow::Result;
+use hashbrown::HashTable;
+use polars::prelude::*;
+use rayon::prelude::*;
+use std::fs::File;
+use tokio::runtime::Runtime;
+
+use weave::{as_fixed_binary, hash64, load_ph_array, DONE, INDEX_NULL};
+
+fn main() -> Result<()> {
+    let ph_array = load_ph_array()?;
+
+    // TODO(edef): re-parallelise this
+    // We originally parallelised on chunks, but ph_array is only a single chunk, due to how Parquet loading works.
+    // TODO(edef): outline the 64-bit hash prefix? it's an indirection, but it saves ~2G of memory
+    eprint!("… build index\r");
+    let ph_map: HashTable<(u64, u32)> = {
+        let mut ph_map = HashTable::with_capacity(ph_array.len());
+
+        for (offset, item) in ph_array.iter().enumerate() {
+            let offset = offset as u32;
+            let hash = hash64(item);
+            ph_map.insert_unique(hash, (hash, offset), |&(hash, _)| hash);
+        }
+
+        ph_map
+    };
+    eprintln!("{DONE}");
+
+    eprint!("… swizzle references\r");
+    let mut pq = ParquetReader::new(File::open("narinfo.parquet")?)
+        .with_columns(Some(vec!["references".into()]))
+        .batched(1 << 16)?;
+
+    let mut reference_idxs =
+        Series::new_empty("reference_idxs", &DataType::List(DataType::UInt32.into()));
+
+    let mut bounce = vec![];
+    let runtime = Runtime::new()?;
+    while let Some(batches) = runtime.block_on(pq.next_batches(48))? {
+        batches
+            .into_par_iter()
+            .map(|df| -> ListChunked {
+                df.column("references")
+                    .unwrap()
+                    .list()
+                    .unwrap()
+                    .apply_to_inner(&|series: Series| -> PolarsResult<Series> {
+                        let series = series.binary()?;
+                        let mut out: Vec<u32> = Vec::with_capacity(series.len());
+
+                        out.extend(as_fixed_binary::<20>(series).flat_map(|xs| xs).map(|key| {
+                            let hash = hash64(&key);
+                            ph_map
+                                .find(hash, |&(candidate_hash, candidate_index)| {
+                                    candidate_hash == hash
+                                        && &ph_array[candidate_index as usize] == key
+                                })
+                                .map(|&(_, index)| index)
+                                .unwrap_or(INDEX_NULL)
+                        }));
+
+                        Ok(Series::from_vec("reference_idxs", out))
+                    })
+                    .unwrap()
+            })
+            .collect_into_vec(&mut bounce);
+
+        for batch in bounce.drain(..) {
+            reference_idxs.append(&batch.into_series())?;
+        }
+    }
+    eprintln!("{DONE}");
+
+    eprint!("… writing output\r");
+    ParquetWriter::new(File::create("narinfo-references.parquet")?).finish(&mut df! {
+        "reference_idxs" => reference_idxs,
+    }?)?;
+    eprintln!("{DONE}");
+
+    Ok(())
+}
diff --git a/tvix/tools/weave/src/bytes.rs b/tvix/tools/weave/src/bytes.rs
new file mode 100644
index 0000000000..c6dc2ebb44
--- /dev/null
+++ b/tvix/tools/weave/src/bytes.rs
@@ -0,0 +1,27 @@
+use owning_ref::{OwningRef, StableAddress};
+use polars::export::arrow::buffer::Buffer;
+use std::ops::Deref;
+
+/// An shared `[[u8; N]]` backed by a Polars [Buffer].
+pub type FixedBytes<const N: usize> = OwningRef<Bytes, [[u8; N]]>;
+
+/// Wrapper struct to make [Buffer] implement [StableAddress].
+/// TODO(edef): upstream the `impl`
+pub struct Bytes(pub Buffer<u8>);
+
+/// SAFETY: [Buffer] is always an Arc+Vec indirection.
+unsafe impl StableAddress for Bytes {}
+
+impl Bytes {
+    pub fn map<U: ?Sized>(self, f: impl FnOnce(&[u8]) -> &U) -> OwningRef<Self, U> {
+        OwningRef::new(self).map(f)
+    }
+}
+
+impl Deref for Bytes {
+    type Target = [u8];
+
+    fn deref(&self) -> &Self::Target {
+        &*self.0
+    }
+}
diff --git a/tvix/tools/weave/src/lib.rs b/tvix/tools/weave/src/lib.rs
new file mode 100644
index 0000000000..bc2221bf5c
--- /dev/null
+++ b/tvix/tools/weave/src/lib.rs
@@ -0,0 +1,106 @@
+use anyhow::Result;
+use rayon::prelude::*;
+use std::{fs::File, ops::Range, slice};
+
+use polars::{
+    datatypes::BinaryChunked,
+    export::arrow::array::BinaryArray,
+    prelude::{ParquetReader, SerReader},
+};
+
+pub use crate::bytes::*;
+mod bytes;
+
+pub const INDEX_NULL: u32 = !0;
+pub const DONE: &str = "\u{2714}";
+
+/// A terrific hash function, turning 20 bytes of cryptographic hash
+/// into 8 bytes of cryptographic hash.
+pub fn hash64(h: &[u8; 20]) -> u64 {
+    let mut buf = [0; 8];
+    buf.copy_from_slice(&h[..8]);
+    u64::from_ne_bytes(buf)
+}
+
+/// Read a dense `store_path_hash` array from `narinfo.parquet`,
+/// returning it as an owned [FixedBytes].
+pub fn load_ph_array() -> Result<FixedBytes<20>> {
+    eprint!("… load store_path_hash\r");
+    // TODO(edef): this could use a further pushdown, since polars is more hindrance than help here
+    // We know this has to fit in memory (we can't mmap it without further encoding constraints),
+    // and we want a single `Vec<[u8; 20]>` of the data.
+    let ph_array = into_fixed_binary_rechunk::<20>(
+        ParquetReader::new(File::open("narinfo.parquet").unwrap())
+            .with_columns(Some(vec!["store_path_hash".into()]))
+            .set_rechunk(true)
+            .finish()?
+            .column("store_path_hash")?
+            .binary()?,
+    );
+
+    u32::try_from(ph_array.len()).expect("dataset exceeds 2^32");
+    eprintln!("{DONE}");
+
+    Ok(ph_array)
+}
+
+/// Iterator over `&[[u8; N]]` from a dense [BinaryChunked].
+pub fn as_fixed_binary<const N: usize>(
+    chunked: &BinaryChunked,
+) -> impl Iterator<Item = &[[u8; N]]> + DoubleEndedIterator {
+    chunked.downcast_iter().map(|array| {
+        let range = assert_fixed_dense::<N>(array);
+        exact_chunks(&array.values()[range]).unwrap()
+    })
+}
+
+/// Convert a dense [BinaryChunked] into a single chunk as [FixedBytes],
+/// without taking a reference to the offsets array and validity bitmap.
+fn into_fixed_binary_rechunk<const N: usize>(chunked: &BinaryChunked) -> FixedBytes<N> {
+    let chunked = chunked.rechunk();
+    let mut iter = chunked.downcast_iter();
+    let array = iter.next().unwrap();
+
+    let range = assert_fixed_dense::<N>(array);
+    Bytes(array.values().clone().sliced(range.start, range.len()))
+        .map(|buf| exact_chunks(buf).unwrap())
+}
+
+/// Ensures that the supplied Arrow array consists of densely packed bytestrings of length `N`.
+/// In other words, ensure that it is free of nulls, and that the offsets have a fixed stride of `N`.
+#[must_use = "only the range returned is guaranteed to be conformant"]
+fn assert_fixed_dense<const N: usize>(array: &BinaryArray<i64>) -> Range<usize> {
+    let null_count = array.validity().map_or(0, |bits| bits.unset_bits());
+    if null_count > 0 {
+        panic!("null values present");
+    }
+
+    let offsets = array.offsets();
+    let length_check = offsets
+        .as_slice()
+        .par_windows(2)
+        .all(|w| (w[1] - w[0]) == N as i64);
+
+    if !length_check {
+        panic!("lengths are inconsistent");
+    }
+
+    (*offsets.first() as usize)..(*offsets.last() as usize)
+}
+
+fn exact_chunks<const K: usize>(buf: &[u8]) -> Option<&[[u8; K]]> {
+    // SAFETY: We ensure that `buf.len()` is a multiple of K, and there are no alignment requirements.
+    unsafe {
+        let ptr = buf.as_ptr();
+        let len = buf.len();
+
+        if len % K != 0 {
+            return None;
+        }
+
+        let ptr = ptr as *mut [u8; K];
+        let len = len / K;
+
+        Some(slice::from_raw_parts(ptr, len))
+    }
+}
diff --git a/tvix/tools/weave/src/main.rs b/tvix/tools/weave/src/main.rs
new file mode 100644
index 0000000000..e8a1990a0d
--- /dev/null
+++ b/tvix/tools/weave/src/main.rs
@@ -0,0 +1,216 @@
+//! Weave resolves a list of roots from `nixpkgs.roots` against `narinfo.parquet`,
+//! and then uses the reference graph from the accompanying `narinfo-references.parquet`
+//! produced by `swizzle` to collect the closure of the roots.
+//!
+//! They are written to `live_idxs.parquet`, which only has one column, representing
+//! the row numbers in `narinfo.parquet` corresponding to live paths.
+
+use anyhow::Result;
+use hashbrown::{hash_table, HashTable};
+use nix_compat::nixbase32;
+use rayon::prelude::*;
+use std::{
+    collections::{BTreeMap, HashSet},
+    fs::{self, File},
+    ops::Index,
+    sync::atomic::{AtomicU32, Ordering},
+};
+
+use polars::{
+    datatypes::StaticArray,
+    export::arrow::{array::UInt32Array, offset::OffsetsBuffer},
+    prelude::*,
+};
+
+use weave::{hash64, DONE, INDEX_NULL};
+
+fn main() -> Result<()> {
+    eprint!("… parse roots\r");
+    let roots: PathSet32 = {
+        let mut roots = Vec::new();
+        fs::read("nixpkgs.roots")?
+            .par_chunks_exact(32 + 1)
+            .map(|e| nixbase32::decode_fixed::<20>(&e[0..32]).unwrap())
+            .collect_into_vec(&mut roots);
+
+        roots.iter().collect()
+    };
+    eprintln!("{DONE}");
+
+    {
+        let ph_array = weave::load_ph_array()?;
+
+        eprint!("… resolve roots\r");
+        ph_array.par_iter().enumerate().for_each(|(idx, h)| {
+            if let Some(idx_slot) = roots.find(h) {
+                idx_slot
+                    .compare_exchange(INDEX_NULL, idx as u32, Ordering::SeqCst, Ordering::SeqCst)
+                    .expect("duplicate entry");
+            }
+        });
+        eprintln!("{DONE}");
+    }
+
+    let mut todo = HashSet::with_capacity(roots.len());
+    {
+        let mut unknown_roots = 0usize;
+        for (_, idx) in roots.table {
+            let idx = idx.into_inner();
+            if idx == INDEX_NULL {
+                unknown_roots += 1;
+                continue;
+            }
+            todo.insert(idx);
+        }
+        println!("skipping {unknown_roots} unknown roots");
+    }
+
+    eprint!("… load reference_idxs\r");
+    let ri_array = ParquetReader::new(File::open("narinfo-references.parquet")?)
+        .finish()?
+        .column("reference_idxs")?
+        .list()?
+        .clone();
+
+    let ri_array = {
+        ChunkedList::new(ri_array.downcast_iter().map(|chunk| {
+            (
+                chunk.offsets(),
+                chunk
+                    .values()
+                    .as_any()
+                    .downcast_ref::<UInt32Array>()
+                    .unwrap()
+                    .as_slice()
+                    .unwrap(),
+            )
+        }))
+    };
+    eprintln!("{DONE}");
+
+    let mut seen = todo.clone();
+    while !todo.is_empty() {
+        println!("todo: {} seen: {}", todo.len(), seen.len());
+
+        todo = todo
+            .par_iter()
+            .flat_map(|&parent| {
+                if parent == INDEX_NULL {
+                    return vec![];
+                }
+
+                ri_array[parent as usize]
+                    .iter()
+                    .cloned()
+                    .filter(|child| !seen.contains(child))
+                    .collect::<Vec<u32>>()
+            })
+            .collect();
+
+        for &index in &todo {
+            seen.insert(index);
+        }
+    }
+
+    println!("done: {} paths", seen.len());
+
+    if seen.remove(&INDEX_NULL) {
+        println!("WARNING: missing edges");
+    }
+
+    eprint!("… gathering live set\r");
+    let mut seen: Vec<u32> = seen.into_iter().collect();
+    seen.par_sort();
+    eprintln!("{DONE}");
+
+    eprint!("… writing output\r");
+    ParquetWriter::new(File::create("live_idxs.parquet")?).finish(&mut df! {
+        "live_idx" => seen,
+    }?)?;
+    eprintln!("{DONE}");
+
+    Ok(())
+}
+
+struct PathSet32 {
+    table: HashTable<([u8; 20], AtomicU32)>,
+}
+
+impl PathSet32 {
+    fn with_capacity(capacity: usize) -> Self {
+        Self {
+            table: HashTable::with_capacity(capacity),
+        }
+    }
+
+    fn insert(&mut self, value: &[u8; 20]) -> bool {
+        let hash = hash64(value);
+
+        match self
+            .table
+            .entry(hash, |(x, _)| x == value, |(x, _)| hash64(x))
+        {
+            hash_table::Entry::Occupied(_) => false,
+            hash_table::Entry::Vacant(entry) => {
+                entry.insert((*value, AtomicU32::new(INDEX_NULL)));
+                true
+            }
+        }
+    }
+
+    fn find(&self, value: &[u8; 20]) -> Option<&AtomicU32> {
+        let hash = hash64(value);
+        self.table
+            .find(hash, |(x, _)| x == value)
+            .as_ref()
+            .map(|(_, x)| x)
+    }
+
+    fn len(&self) -> usize {
+        self.table.len()
+    }
+}
+
+impl<'a> FromIterator<&'a [u8; 20]> for PathSet32 {
+    fn from_iter<T: IntoIterator<Item = &'a [u8; 20]>>(iter: T) -> Self {
+        let iter = iter.into_iter();
+        let mut this = Self::with_capacity(iter.size_hint().0);
+
+        for item in iter {
+            this.insert(item);
+        }
+
+        this
+    }
+}
+
+struct ChunkedList<'a, T> {
+    by_offset: BTreeMap<usize, (&'a OffsetsBuffer<i64>, &'a [T])>,
+}
+
+impl<'a, T> ChunkedList<'a, T> {
+    fn new(chunks: impl IntoIterator<Item = (&'a OffsetsBuffer<i64>, &'a [T])>) -> Self {
+        let mut next_offset = 0usize;
+        ChunkedList {
+            by_offset: chunks
+                .into_iter()
+                .map(|(offsets, values)| {
+                    let offset = next_offset;
+                    next_offset = next_offset.checked_add(offsets.len_proxy()).unwrap();
+
+                    (offset, (offsets, values))
+                })
+                .collect(),
+        }
+    }
+}
+
+impl<'a, T> Index<usize> for ChunkedList<'a, T> {
+    type Output = [T];
+
+    fn index(&self, index: usize) -> &Self::Output {
+        let (&base, &(offsets, values)) = self.by_offset.range(..=index).next_back().unwrap();
+        let (start, end) = offsets.start_end(index - base);
+        &values[start..end]
+    }
+}