diff options
author | Vincent Ambo <mail@tazj.in> | 2024-10-12T23·00+0300 |
---|---|---|
committer | clbot <clbot@tvl.fyi> | 2024-10-12T23·17+0000 |
commit | cb032b250e7b46ebc28038c8efb48f6a9c0ac75b (patch) | |
tree | 1a49250019a952e7ec1e0bc65f4213596c4f2992 /users/edef/narinfo2parquet/src/main.rs | |
parent | bb34210a6457b1d4d433531d5788c4e819f3eb11 (diff) |
chore(tvix/tools): move narinfo2parquet to //users/edef r/8801
This is not a core Tvix tool, it's a tool that uses a Tvix component. Change-Id: I81d2b2374da23489df0097dcabb8295c82652fc1 Reviewed-on: https://cl.tvl.fyi/c/depot/+/12606 Reviewed-by: edef <edef@edef.eu> Tested-by: BuildkiteCI Autosubmit: tazjin <tazjin@tvl.su>
Diffstat (limited to 'users/edef/narinfo2parquet/src/main.rs')
-rw-r--r-- | users/edef/narinfo2parquet/src/main.rs | 264 |
1 files changed, 264 insertions, 0 deletions
diff --git a/users/edef/narinfo2parquet/src/main.rs b/users/edef/narinfo2parquet/src/main.rs new file mode 100644 index 000000000000..ea3d39af5503 --- /dev/null +++ b/users/edef/narinfo2parquet/src/main.rs @@ -0,0 +1,264 @@ +//! narinfo2parquet operates on a narinfo.zst directory produced by turbofetch. +//! It takes the name of a segment file in `narinfo.zst` and writes a Parquet file +//! with the same name into the `narinfo.pq` directory. +//! +//! Run it under GNU Parallel for parallelism: +//! ```shell +//! mkdir narinfo.pq && ls narinfo.zst | parallel --bar 'narinfo2parquet {}' +//! ``` + +use anyhow::{bail, Context, Result}; +use jemallocator::Jemalloc; +use nix_compat::{ + narinfo::{self, NarInfo}, + nixbase32, +}; +use polars::{io::parquet::ParquetWriter, prelude::*}; +use std::{ + fs::{self, File}, + io::{self, BufRead, BufReader, Read}, + path::Path, +}; +use tempfile_fast::PersistableTempFile; + +#[global_allocator] +static GLOBAL: Jemalloc = Jemalloc; + +fn main() -> Result<()> { + let file_name = std::env::args().nth(1).expect("file name missing"); + let input_path = Path::new("narinfo.zst").join(&file_name); + let output_path = Path::new("narinfo.pq").join(&file_name); + + match fs::metadata(&output_path) { + Err(e) if e.kind() == io::ErrorKind::NotFound => {} + Err(e) => bail!(e), + Ok(_) => bail!("output path already exists: {output_path:?}"), + } + + let reader = File::open(input_path).and_then(zstd::Decoder::new)?; + let mut frame = FrameBuilder::default(); + + for_each(reader, |s| { + let entry = NarInfo::parse(&s).context("couldn't parse entry:\n{s}")?; + frame.push(&entry); + Ok(()) + })?; + + let mut frame = frame.finish(); + let mut writer = PersistableTempFile::new_in(output_path.parent().unwrap())?; + + ParquetWriter::new(&mut writer) + .with_compression(ParquetCompression::Gzip(None)) + .with_statistics(true) + .finish(frame.align_chunks())?; + + writer + .persist_noclobber(output_path) + .map_err(|e| e.error) + .context("couldn't commit output file")?; + + Ok(()) +} + +fn for_each(reader: impl Read, mut f: impl FnMut(&str) -> Result<()>) -> Result<()> { + let mut reader = BufReader::new(reader); + let mut group = String::new(); + loop { + let prev_len = group.len(); + + if prev_len > 1024 * 1024 { + bail!("excessively large segment"); + } + + reader.read_line(&mut group)?; + let (prev, line) = group.split_at(prev_len); + + // EOF + if line.is_empty() { + break; + } + + // skip empty line + if line == "\n" { + group.pop().unwrap(); + continue; + } + + if !prev.is_empty() && line.starts_with("StorePath:") { + f(prev)?; + group.drain(..prev_len); + } + } + + if !group.is_empty() { + f(&group)?; + } + + Ok(()) +} + +/// [FrameBuilder] builds a [DataFrame] out of [NarInfo]s. +/// The exact format is still in flux. +/// +/// # Example +/// +/// ```no_run +/// |narinfos: &[NarInfo]| -> DataFrame { +/// let frame_builder = FrameBuilder::default(); +/// narinfos.for_each(|n| frame_builder.push(n)); +/// frame_builder.finish() +/// } +/// ``` +struct FrameBuilder { + store_path_hash_str: StringChunkedBuilder, + store_path_hash: BinaryChunkedBuilder, + store_path_name: StringChunkedBuilder, + deriver_hash_str: StringChunkedBuilder, + deriver_hash: BinaryChunkedBuilder, + deriver_name: StringChunkedBuilder, + nar_hash: BinaryChunkedBuilder, + nar_size: PrimitiveChunkedBuilder<UInt64Type>, + references: ListBinaryChunkedBuilder, + ca_algo: CategoricalChunkedBuilder<'static>, + ca_hash: BinaryChunkedBuilder, + signature: BinaryChunkedBuilder, + file_hash: BinaryChunkedBuilder, + file_size: PrimitiveChunkedBuilder<UInt64Type>, + compression: CategoricalChunkedBuilder<'static>, + quirk_references_out_of_order: BooleanChunkedBuilder, + quirk_nar_hash_hex: BooleanChunkedBuilder, +} + +impl Default for FrameBuilder { + fn default() -> Self { + Self { + store_path_hash_str: StringChunkedBuilder::new("store_path_hash_str", 0, 0), + store_path_hash: BinaryChunkedBuilder::new("store_path_hash", 0, 0), + store_path_name: StringChunkedBuilder::new("store_path_name", 0, 0), + deriver_hash_str: StringChunkedBuilder::new("deriver_hash_str", 0, 0), + deriver_hash: BinaryChunkedBuilder::new("deriver_hash", 0, 0), + deriver_name: StringChunkedBuilder::new("deriver_name", 0, 0), + nar_hash: BinaryChunkedBuilder::new("nar_hash", 0, 0), + nar_size: PrimitiveChunkedBuilder::new("nar_size", 0), + references: ListBinaryChunkedBuilder::new("references", 0, 0), + signature: BinaryChunkedBuilder::new("signature", 0, 0), + ca_algo: CategoricalChunkedBuilder::new("ca_algo", 0, CategoricalOrdering::Lexical), + ca_hash: BinaryChunkedBuilder::new("ca_hash", 0, 0), + file_hash: BinaryChunkedBuilder::new("file_hash", 0, 0), + file_size: PrimitiveChunkedBuilder::new("file_size", 0), + compression: CategoricalChunkedBuilder::new( + "compression", + 0, + CategoricalOrdering::Lexical, + ), + quirk_references_out_of_order: BooleanChunkedBuilder::new( + "quirk_references_out_of_order", + 0, + ), + quirk_nar_hash_hex: BooleanChunkedBuilder::new("quirk_nar_hash_hex", 0), + } + } +} + +impl FrameBuilder { + fn push(&mut self, entry: &NarInfo) { + self.store_path_hash_str + .append_value(nixbase32::encode(entry.store_path.digest())); + self.store_path_hash.append_value(entry.store_path.digest()); + self.store_path_name.append_value(entry.store_path.name()); + + if let Some(deriver) = &entry.deriver { + self.deriver_hash_str + .append_value(nixbase32::encode(deriver.digest())); + self.deriver_hash.append_value(deriver.digest()); + self.deriver_name.append_value(deriver.name()); + } else { + self.deriver_hash_str.append_null(); + self.deriver_hash.append_null(); + self.deriver_name.append_null(); + } + + self.nar_hash.append_value(&entry.nar_hash); + self.nar_size.append_value(entry.nar_size); + + self.references + .append_values_iter(entry.references.iter().map(|r| r.digest().as_slice())); + + assert!(entry.signatures.len() <= 1); + self.signature + .append_option(entry.signatures.get(0).map(|sig| { + assert_eq!(sig.name(), &"cache.nixos.org-1"); + sig.bytes() + })); + + if let Some(ca) = &entry.ca { + self.ca_algo.append_value(ca.algo_str()); + self.ca_hash.append_value(ca.hash().digest_as_bytes()); + } else { + self.ca_algo.append_null(); + self.ca_hash.append_null(); + } + + let file_hash = entry.file_hash.as_ref().unwrap(); + let file_size = entry.file_size.unwrap(); + + self.file_hash.append_value(file_hash); + self.file_size.append_value(file_size); + + let (compression, extension) = match entry.compression { + Some("bzip2") => ("bzip2", "bz2"), + Some("xz") => ("xz", "xz"), + Some("zstd") => ("zstd", "zst"), + x => panic!("unknown compression algorithm: {x:?}"), + }; + + self.compression.append_value(compression); + + let mut file_name = nixbase32::encode(file_hash); + file_name.push_str(".nar."); + file_name.push_str(extension); + + assert_eq!(entry.url.strip_prefix("nar/").unwrap(), file_name); + + { + use narinfo::Flags; + + self.quirk_references_out_of_order + .append_value(entry.flags.contains(Flags::REFERENCES_OUT_OF_ORDER)); + + self.quirk_nar_hash_hex + .append_value(entry.flags.contains(Flags::NAR_HASH_HEX)); + + let quirks = Flags::REFERENCES_OUT_OF_ORDER | Flags::NAR_HASH_HEX; + let unknown_flags = entry.flags.difference(quirks); + + assert!( + unknown_flags.is_empty(), + "rejecting flags: {unknown_flags:?}" + ); + } + } + + fn finish(mut self) -> DataFrame { + df! { + "store_path_hash_str" => self.store_path_hash_str.finish().into_series(), + "store_path_hash" => self.store_path_hash.finish().into_series(), + "store_path_name" => self.store_path_name.finish().into_series(), + "deriver_hash_str" => self.deriver_hash_str.finish().into_series(), + "deriver_hash" => self.deriver_hash.finish().into_series(), + "deriver_name" => self.deriver_name.finish().into_series(), + "nar_hash" => self.nar_hash.finish().into_series(), + "nar_size" => self.nar_size.finish().into_series(), + "references" => self.references.finish().into_series(), + "signature" => self.signature.finish().into_series(), + "ca_algo" => self.ca_algo.finish().into_series(), + "ca_hash" => self.ca_hash.finish().into_series(), + "file_hash" => self.file_hash.finish().into_series(), + "file_size" => self.file_size.finish().into_series(), + "compression" => self.compression.finish().into_series(), + "quirk_references_out_of_order" => self.quirk_references_out_of_order.finish().into_series(), + "quirk_nar_hash_hex" => self.quirk_nar_hash_hex.finish().into_series() + } + .unwrap() + } +} |