//! Weave resolves a list of roots from `releases.parquet` against `narinfo.parquet`,
//! and then uses the reference graph from the accompanying `narinfo-references.parquet`
//! produced by `swizzle` to collect the closure of the roots.
//!
//! They are written to `live_idxs.parquet`, which only has one column, representing
//! the row numbers in `narinfo.parquet` corresponding to live paths.
use anyhow::Result;
use hashbrown::{hash_table, HashTable};
use rayon::prelude::*;
use rustc_hash::FxHashSet;
use std::{
collections::BTreeMap,
fs::File,
ops::Index,
sync::atomic::{AtomicU32, Ordering},
};
use tracing::{info_span, warn};
use tracing_indicatif::span_ext::IndicatifSpanExt;
use polars::{
datatypes::StaticArray,
export::arrow::{array::UInt32Array, offset::OffsetsBuffer},
lazy::dsl::col,
prelude::*,
};
use weave::{as_fixed_binary, hash64, INDEX_NULL};
#[tracing::instrument]
fn main() -> Result<()> {
let _tracing = tvix_tracing::TracingBuilder::default()
.enable_progressbar()
.build()?;
let roots: PathSet32 = {
let span = info_span!("parse_roots", indicatif.pb_show = tracing::field::Empty).entered();
span.pb_set_message("parse roots");
span.pb_start();
as_fixed_binary::<20>(
LazyFrame::scan_parquet("releases.parquet", ScanArgsParquet::default())?
.explode([col("store_path_hash")])
.select([col("store_path_hash")])
.collect()?
.column("store_path_hash")?
.binary()?,
)
.flatten()
.collect()
};
{
let span = info_span!("resolve_roots", indicatif.pb_show = tracing::field::Empty).entered();
span.pb_set_message("resolve roots");
span.pb_start();
weave::load_ph_array()?
.into_par_iter()
.enumerate()
.for_each(|(idx, h)| {
if let Some(idx_slot) = roots.find(h) {
assert_eq!(
idx_slot.swap(idx as u32, Ordering::Relaxed),
INDEX_NULL,
"duplicate entry"
);
}
});
}
let mut todo = FxHashSet::default();
todo.reserve(roots.len());
{
let mut unknown_roots = 0usize;
for (_, idx) in roots.table {
let idx = idx.into_inner();
if idx == INDEX_NULL {
unknown_roots += 1;
continue;
}
todo.insert(idx);
}
if unknown_roots != 0 {
warn!("skipping {unknown_roots} unknown roots");
}
}
let ri_array;
let ri_array = {
let span = info_span!(
"load_reference_idxs",
indicatif.pb_show = tracing::field::Empty
)
.entered();
span.pb_set_message("load reference_idxs");
span.pb_start();
ri_array = ParquetReader::new(File::open("narinfo-references.parquet")?)
.finish()?
.column("reference_idxs")?
.list()?
.clone();
ChunkedList::new(ri_array.downcast_iter().map(|chunk| {
(
chunk.offsets(),
chunk
.values()
.as_any()
.downcast_ref::<UInt32Array>()
.unwrap()
.as_slice()
.unwrap(),
)
}))
};
let mut seen = todo.clone();
{
let span = info_span!("mark", indicatif.pb_show = tracing::field::Empty).entered();
span.pb_set_message("marking");
span.pb_set_style(&tvix_tracing::PB_PROGRESS_STYLE);
while !todo.is_empty() {
span.pb_set_length(seen.len() as u64);
span.pb_set_position(seen.len().saturating_sub(todo.len()) as u64);
todo = todo
.par_iter()
.flat_map(|&parent| {
if parent == INDEX_NULL {
return FxHashSet::default();
}
ri_array[parent as usize]
.iter()
.cloned()
.filter(|child| !seen.contains(child))
.collect::<FxHashSet<u32>>()
})
.collect();
for &index in &todo {
seen.insert(index);
}
}
span.pb_set_length(seen.len() as u64);
span.pb_set_position(seen.len() as u64);
if seen.remove(&INDEX_NULL) {
warn!("WARNING: missing edges");
}
}
let seen = {
let span = info_span!("gather_live", indicatif.pb_show = tracing::field::Empty).entered();
span.pb_set_message("gathering live set");
let mut seen: Vec<u32> = seen.into_iter().collect();
seen.par_sort();
seen
};
{
let span = info_span!("write_output", indicatif.pb_show = tracing::field::Empty).entered();
span.pb_set_message("writing output");
span.pb_start();
ParquetWriter::new(File::create("live_idxs.parquet")?).finish(&mut df! {
"live_idx" => seen,
}?)?;
}
Ok(())
}
struct PathSet32 {
table: HashTable<([u8; 20], AtomicU32)>,
}
impl PathSet32 {
fn with_capacity(capacity: usize) -> Self {
Self {
table: HashTable::with_capacity(capacity),
}
}
fn insert(&mut self, value: &[u8; 20]) -> bool {
let hash = hash64(value);
match self
.table
.entry(hash, |(x, _)| x == value, |(x, _)| hash64(x))
{
hash_table::Entry::Occupied(_) => false,
hash_table::Entry::Vacant(entry) => {
entry.insert((*value, AtomicU32::new(INDEX_NULL)));
true
}
}
}
fn find(&self, value: &[u8; 20]) -> Option<&AtomicU32> {
let hash = hash64(value);
self.table
.find(hash, |(x, _)| x == value)
.as_ref()
.map(|(_, x)| x)
}
fn len(&self) -> usize {
self.table.len()
}
}
impl<'a> FromIterator<&'a [u8; 20]> for PathSet32 {
fn from_iter<T: IntoIterator<Item = &'a [u8; 20]>>(iter: T) -> Self {
let iter = iter.into_iter();
let mut this = Self::with_capacity(iter.size_hint().0);
for item in iter {
this.insert(item);
}
this.table.shrink_to_fit(|(x, _)| hash64(x));
this
}
}
struct ChunkedList<'a, T> {
by_offset: BTreeMap<usize, (&'a OffsetsBuffer<i64>, &'a [T])>,
}
impl<'a, T> ChunkedList<'a, T> {
fn new(chunks: impl IntoIterator<Item = (&'a OffsetsBuffer<i64>, &'a [T])>) -> Self {
let mut next_offset = 0usize;
ChunkedList {
by_offset: chunks
.into_iter()
.map(|(offsets, values)| {
let offset = next_offset;
next_offset = next_offset.checked_add(offsets.len_proxy()).unwrap();
(offset, (offsets, values))
})
.collect(),
}
}
}
impl<'a, T> Index<usize> for ChunkedList<'a, T> {
type Output = [T];
fn index(&self, index: usize) -> &Self::Output {
let (&base, &(offsets, values)) = self.by_offset.range(..=index).next_back().unwrap();
let (start, end) = offsets.start_end(index - base);
&values[start..end]
}
}