//! Weave resolves a list of roots from `releases.parquet` against `narinfo.parquet`, //! and then uses the reference graph from the accompanying `narinfo-references.parquet` //! produced by `swizzle` to collect the closure of the roots. //! //! They are written to `live_idxs.parquet`, which only has one column, representing //! the row numbers in `narinfo.parquet` corresponding to live paths. use anyhow::Result; use hashbrown::{hash_table, HashTable}; use rayon::prelude::*; use rustc_hash::FxHashSet; use std::{ collections::BTreeMap, fs::File, ops::Index, sync::atomic::{AtomicU32, Ordering}, }; use tracing::{info_span, warn}; use tracing_indicatif::span_ext::IndicatifSpanExt; use polars::{ datatypes::StaticArray, export::arrow::{array::UInt32Array, offset::OffsetsBuffer}, lazy::dsl::col, prelude::*, }; use weave::{as_fixed_binary, hash64, INDEX_NULL}; #[tracing::instrument] fn main() -> Result<()> { let _tracing = tvix_tracing::TracingBuilder::default() .enable_progressbar() .build()?; let roots: PathSet32 = { let span = info_span!("parse_roots", indicatif.pb_show = tracing::field::Empty).entered(); span.pb_set_message("parse roots"); span.pb_start(); as_fixed_binary::<20>( LazyFrame::scan_parquet("releases.parquet", ScanArgsParquet::default())? .explode([col("store_path_hash")]) .select([col("store_path_hash")]) .collect()? .column("store_path_hash")? .binary()?, ) .flatten() .collect() }; { let span = info_span!("resolve_roots", indicatif.pb_show = tracing::field::Empty).entered(); span.pb_set_message("resolve roots"); span.pb_start(); weave::load_ph_array()? .into_par_iter() .enumerate() .for_each(|(idx, h)| { if let Some(idx_slot) = roots.find(h) { assert_eq!( idx_slot.swap(idx as u32, Ordering::Relaxed), INDEX_NULL, "duplicate entry" ); } }); } let mut todo = FxHashSet::default(); todo.reserve(roots.len()); { let mut unknown_roots = 0usize; for (_, idx) in roots.table { let idx = idx.into_inner(); if idx == INDEX_NULL { unknown_roots += 1; continue; } todo.insert(idx); } if unknown_roots != 0 { warn!("skipping {unknown_roots} unknown roots"); } } let ri_array; let ri_array = { let span = info_span!( "load_reference_idxs", indicatif.pb_show = tracing::field::Empty ) .entered(); span.pb_set_message("load reference_idxs"); span.pb_start(); ri_array = ParquetReader::new(File::open("narinfo-references.parquet")?) .finish()? .column("reference_idxs")? .list()? .clone(); ChunkedList::new(ri_array.downcast_iter().map(|chunk| { ( chunk.offsets(), chunk .values() .as_any() .downcast_ref::<UInt32Array>() .unwrap() .as_slice() .unwrap(), ) })) }; let mut seen = todo.clone(); { let span = info_span!("mark", indicatif.pb_show = tracing::field::Empty).entered(); span.pb_set_message("marking"); span.pb_set_style(&tvix_tracing::PB_PROGRESS_STYLE); while !todo.is_empty() { span.pb_set_length(seen.len() as u64); span.pb_set_position(seen.len().saturating_sub(todo.len()) as u64); todo = todo .par_iter() .flat_map(|&parent| { if parent == INDEX_NULL { return FxHashSet::default(); } ri_array[parent as usize] .iter() .cloned() .filter(|child| !seen.contains(child)) .collect::<FxHashSet<u32>>() }) .collect(); for &index in &todo { seen.insert(index); } } span.pb_set_length(seen.len() as u64); span.pb_set_position(seen.len() as u64); if seen.remove(&INDEX_NULL) { warn!("WARNING: missing edges"); } } let seen = { let span = info_span!("gather_live", indicatif.pb_show = tracing::field::Empty).entered(); span.pb_set_message("gathering live set"); let mut seen: Vec<u32> = seen.into_iter().collect(); seen.par_sort(); seen }; { let span = info_span!("write_output", indicatif.pb_show = tracing::field::Empty).entered(); span.pb_set_message("writing output"); span.pb_start(); ParquetWriter::new(File::create("live_idxs.parquet")?).finish(&mut df! { "live_idx" => seen, }?)?; } Ok(()) } struct PathSet32 { table: HashTable<([u8; 20], AtomicU32)>, } impl PathSet32 { fn with_capacity(capacity: usize) -> Self { Self { table: HashTable::with_capacity(capacity), } } fn insert(&mut self, value: &[u8; 20]) -> bool { let hash = hash64(value); match self .table .entry(hash, |(x, _)| x == value, |(x, _)| hash64(x)) { hash_table::Entry::Occupied(_) => false, hash_table::Entry::Vacant(entry) => { entry.insert((*value, AtomicU32::new(INDEX_NULL))); true } } } fn find(&self, value: &[u8; 20]) -> Option<&AtomicU32> { let hash = hash64(value); self.table .find(hash, |(x, _)| x == value) .as_ref() .map(|(_, x)| x) } fn len(&self) -> usize { self.table.len() } } impl<'a> FromIterator<&'a [u8; 20]> for PathSet32 { fn from_iter<T: IntoIterator<Item = &'a [u8; 20]>>(iter: T) -> Self { let iter = iter.into_iter(); let mut this = Self::with_capacity(iter.size_hint().0); for item in iter { this.insert(item); } this.table.shrink_to_fit(|(x, _)| hash64(x)); this } } struct ChunkedList<'a, T> { by_offset: BTreeMap<usize, (&'a OffsetsBuffer<i64>, &'a [T])>, } impl<'a, T> ChunkedList<'a, T> { fn new(chunks: impl IntoIterator<Item = (&'a OffsetsBuffer<i64>, &'a [T])>) -> Self { let mut next_offset = 0usize; ChunkedList { by_offset: chunks .into_iter() .map(|(offsets, values)| { let offset = next_offset; next_offset = next_offset.checked_add(offsets.len_proxy()).unwrap(); (offset, (offsets, values)) }) .collect(), } } } impl<'a, T> Index<usize> for ChunkedList<'a, T> { type Output = [T]; fn index(&self, index: usize) -> &Self::Output { let (&base, &(offsets, values)) = self.by_offset.range(..=index).next_back().unwrap(); let (start, end) = offsets.start_end(index - base); &values[start..end] } }