//! Swizzle reads a `narinfo.parquet` file, usually produced by `narinfo2parquet`.
//!
//! It swizzles the reference list, ie it converts the references from absolute,
//! global identifiers (store path hashes) to indices into the `store_path_hash`
//! column (ie, row numbers), so that we can later walk the reference graph
//! efficiently.
//!
//! Path hashes are represented as non-null, 20-byte `Binary` values.
//! The indices are represented as 32-bit unsigned integers, with in-band nulls
//! represented by [INDEX_NULL] (the all-1 bit pattern), to permit swizzling
//! partial datasets.
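//!
//! For example (hypothetical values), a path whose two references sit at rows
//! 1 and 7 of the `store_path_hash` column is swizzled to `[1, 7]`, while a
//! reference whose hash is absent from the dataset becomes [INDEX_NULL]:
//!
//! ```text
//! references     = [hash(row 1), hash(row 7), hash(absent)]
//! reference_idxs = [1, 7, 4294967295]
//! ```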
//!
//! In essence, it converts from names to pointers, so that `weave` can simply
//! chase pointers to trace the live set. This replaces an `O(log(n))` lookup
//! with `O(1)` indexing, and produces a much denser representation that actually
//! fits in memory.
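//!
//! A minimal sketch of the kind of traversal this enables (illustrative only,
//! not weave's actual implementation or types):
//!
//! ```rust,ignore
//! /// Mark everything reachable from `roots`, chasing row indices directly.
//! fn trace(reference_idxs: &[Vec<u32>], roots: &[u32], live: &mut [bool]) {
//!     let mut stack: Vec<u32> = roots.to_vec();
//!     while let Some(idx) = stack.pop() {
//!         if std::mem::replace(&mut live[idx as usize], true) {
//!             continue; // already visited
//!         }
//!         // O(1) indexing per edge; INDEX_NULL marks references outside the dataset.
//!         stack.extend(
//!             reference_idxs[idx as usize]
//!                 .iter()
//!                 .copied()
//!                 .filter(|&i| i != INDEX_NULL),
//!         );
//!     }
//! }
//! ```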
//!
//! The in-memory representation is at least 80% smaller, and the indices compress
//! well in Parquet due to both temporal locality of reference and the power law
//! distribution of reference "popularity".
//!
//! Only two columns are read from `narinfo.parquet`:
//!
//!  * `store_path_hash :: PathHash`
//!  * `references :: List[PathHash]`
//!
//! Output is written to `narinfo-references.parquet` in the form of a single
//! `List[u32]` column, `reference_idxs`.
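//!
//! A sketch of reading the output back with polars (illustrative, not part of
//! this tool):
//!
//! ```rust,ignore
//! let df = LazyFrame::scan_parquet("narinfo-references.parquet", ScanArgsParquet::default())?
//!     .collect()?;
//! let reference_idxs = df.column("reference_idxs")?.list()?;
//! ```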
//!
//! This file is inherently bound to the corresponding `narinfo.parquet`,
//! since it essentially contains pointers into that file.

use anyhow::Result;
use hashbrown::HashTable;
use polars::{
    lazy::dsl::{col, SpecialEq},
    prelude::*,
};
use tracing::info_span;
use tracing_indicatif::span_ext::IndicatifSpanExt as _;

use weave::{as_fixed_binary, hash64, leak, load_ph_array, INDEX_NULL};

#[tracing::instrument]
fn main() -> Result<()> {
    let _tracing = tvix_tracing::TracingBuilder::default()
        .enable_progressbar()
        .build()?;

    let ph_array: &'static [[u8; 20]] = leak(load_ph_array()?);

    // TODO(edef): re-parallelise this
    // We originally parallelised on chunks, but ph_array is only a single chunk, due to how Parquet loading works.
    // TODO(edef): outline the 64-bit hash prefix? it's an indirection, but it saves ~2G of memory
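    // Index from store path hash to row number: each entry is (hash64 of the
    // 20-byte path hash, row index into ph_array). Lookups re-check the full
    // 20-byte value in ph_array, so 64-bit collisions are handled.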
    let ph_map: &'static HashTable<(u64, u32)> = {
        let span = info_span!("ph_map", indicatif.pb_show = tracing::field::Empty).entered();
        span.pb_set_message("build index");
        span.pb_start();

        let mut ph_map = HashTable::with_capacity(ph_array.len());

        for (offset, item) in ph_array.iter().enumerate() {
            let offset = offset as u32;
            let hash = hash64(item);
            ph_map.insert_unique(hash, (hash, offset), |&(hash, _)| hash);
        }

        &*Box::leak(Box::new(ph_map))
    };

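    // Resolve a 20-byte store path hash to its row number in `ph_array`,
    // falling back to INDEX_NULL when the hash isn't in the dataset.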
    let ph_to_idx = |key: &[u8; 20]| -> u32 {
        let hash = hash64(key);
        ph_map
            .find(hash, |&(candidate_hash, candidate_index)| {
                candidate_hash == hash && &ph_array[candidate_index as usize] == key
            })
            .map(|&(_, index)| index)
            .unwrap_or(INDEX_NULL)
    };

    {
        let span = info_span!("swizzle_refs", indicatif.pb_show = tracing::field::Empty).entered();
        span.pb_set_message("swizzle references");
        span.pb_start();

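        // Stream `references` out of narinfo.parquet, mapping every 20-byte hash
        // in each reference list to its row index, and sink the resulting
        // List[u32] column to narinfo-references.parquet.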
        LazyFrame::scan_parquet("narinfo.parquet", ScanArgsParquet::default())?
            .with_column(
                col("references")
                    .map(
                        move |series: Series| -> PolarsResult<Option<Series>> {
                            Ok(Some(
                                series
                                    .list()?
                                    .apply_to_inner(&|series: Series| -> PolarsResult<Series> {
                                        let series = series.binary()?;
                                        let mut out: Vec<u32> = Vec::with_capacity(series.len());
                                        out.extend(
                                            as_fixed_binary(series).flatten().map(ph_to_idx),
                                        );
                                        Ok(Series::from_vec("reference_idxs", out))
                                    })?
                                    .into_series(),
                            ))
                        },
                        SpecialEq::from_type(DataType::List(DataType::UInt32.into())),
                    )
                    .alias("reference_idxs"),
            )
            .select([col("reference_idxs")])
            .with_streaming(true)
            .sink_parquet(
                "narinfo-references.parquet".into(),
                ParquetWriteOptions::default(),
            )?;
    };

    Ok(())
}