diff options
Diffstat (limited to 'tvix/tools/narinfo2parquet/src/main.rs')
-rw-r--r-- | tvix/tools/narinfo2parquet/src/main.rs | 264 |
1 files changed, 0 insertions, 264 deletions
diff --git a/tvix/tools/narinfo2parquet/src/main.rs b/tvix/tools/narinfo2parquet/src/main.rs deleted file mode 100644 index ea3d39af5503..000000000000 --- a/tvix/tools/narinfo2parquet/src/main.rs +++ /dev/null @@ -1,264 +0,0 @@ -//! narinfo2parquet operates on a narinfo.zst directory produced by turbofetch. -//! It takes the name of a segment file in `narinfo.zst` and writes a Parquet file -//! with the same name into the `narinfo.pq` directory. -//! -//! Run it under GNU Parallel for parallelism: -//! ```shell -//! mkdir narinfo.pq && ls narinfo.zst | parallel --bar 'narinfo2parquet {}' -//! ``` - -use anyhow::{bail, Context, Result}; -use jemallocator::Jemalloc; -use nix_compat::{ - narinfo::{self, NarInfo}, - nixbase32, -}; -use polars::{io::parquet::ParquetWriter, prelude::*}; -use std::{ - fs::{self, File}, - io::{self, BufRead, BufReader, Read}, - path::Path, -}; -use tempfile_fast::PersistableTempFile; - -#[global_allocator] -static GLOBAL: Jemalloc = Jemalloc; - -fn main() -> Result<()> { - let file_name = std::env::args().nth(1).expect("file name missing"); - let input_path = Path::new("narinfo.zst").join(&file_name); - let output_path = Path::new("narinfo.pq").join(&file_name); - - match fs::metadata(&output_path) { - Err(e) if e.kind() == io::ErrorKind::NotFound => {} - Err(e) => bail!(e), - Ok(_) => bail!("output path already exists: {output_path:?}"), - } - - let reader = File::open(input_path).and_then(zstd::Decoder::new)?; - let mut frame = FrameBuilder::default(); - - for_each(reader, |s| { - let entry = NarInfo::parse(&s).context("couldn't parse entry:\n{s}")?; - frame.push(&entry); - Ok(()) - })?; - - let mut frame = frame.finish(); - let mut writer = PersistableTempFile::new_in(output_path.parent().unwrap())?; - - ParquetWriter::new(&mut writer) - .with_compression(ParquetCompression::Gzip(None)) - .with_statistics(true) - .finish(frame.align_chunks())?; - - writer - .persist_noclobber(output_path) - .map_err(|e| e.error) - .context("couldn't commit output file")?; - - Ok(()) -} - -fn for_each(reader: impl Read, mut f: impl FnMut(&str) -> Result<()>) -> Result<()> { - let mut reader = BufReader::new(reader); - let mut group = String::new(); - loop { - let prev_len = group.len(); - - if prev_len > 1024 * 1024 { - bail!("excessively large segment"); - } - - reader.read_line(&mut group)?; - let (prev, line) = group.split_at(prev_len); - - // EOF - if line.is_empty() { - break; - } - - // skip empty line - if line == "\n" { - group.pop().unwrap(); - continue; - } - - if !prev.is_empty() && line.starts_with("StorePath:") { - f(prev)?; - group.drain(..prev_len); - } - } - - if !group.is_empty() { - f(&group)?; - } - - Ok(()) -} - -/// [FrameBuilder] builds a [DataFrame] out of [NarInfo]s. -/// The exact format is still in flux. -/// -/// # Example -/// -/// ```no_run -/// |narinfos: &[NarInfo]| -> DataFrame { -/// let frame_builder = FrameBuilder::default(); -/// narinfos.for_each(|n| frame_builder.push(n)); -/// frame_builder.finish() -/// } -/// ``` -struct FrameBuilder { - store_path_hash_str: StringChunkedBuilder, - store_path_hash: BinaryChunkedBuilder, - store_path_name: StringChunkedBuilder, - deriver_hash_str: StringChunkedBuilder, - deriver_hash: BinaryChunkedBuilder, - deriver_name: StringChunkedBuilder, - nar_hash: BinaryChunkedBuilder, - nar_size: PrimitiveChunkedBuilder<UInt64Type>, - references: ListBinaryChunkedBuilder, - ca_algo: CategoricalChunkedBuilder<'static>, - ca_hash: BinaryChunkedBuilder, - signature: BinaryChunkedBuilder, - file_hash: BinaryChunkedBuilder, - file_size: PrimitiveChunkedBuilder<UInt64Type>, - compression: CategoricalChunkedBuilder<'static>, - quirk_references_out_of_order: BooleanChunkedBuilder, - quirk_nar_hash_hex: BooleanChunkedBuilder, -} - -impl Default for FrameBuilder { - fn default() -> Self { - Self { - store_path_hash_str: StringChunkedBuilder::new("store_path_hash_str", 0, 0), - store_path_hash: BinaryChunkedBuilder::new("store_path_hash", 0, 0), - store_path_name: StringChunkedBuilder::new("store_path_name", 0, 0), - deriver_hash_str: StringChunkedBuilder::new("deriver_hash_str", 0, 0), - deriver_hash: BinaryChunkedBuilder::new("deriver_hash", 0, 0), - deriver_name: StringChunkedBuilder::new("deriver_name", 0, 0), - nar_hash: BinaryChunkedBuilder::new("nar_hash", 0, 0), - nar_size: PrimitiveChunkedBuilder::new("nar_size", 0), - references: ListBinaryChunkedBuilder::new("references", 0, 0), - signature: BinaryChunkedBuilder::new("signature", 0, 0), - ca_algo: CategoricalChunkedBuilder::new("ca_algo", 0, CategoricalOrdering::Lexical), - ca_hash: BinaryChunkedBuilder::new("ca_hash", 0, 0), - file_hash: BinaryChunkedBuilder::new("file_hash", 0, 0), - file_size: PrimitiveChunkedBuilder::new("file_size", 0), - compression: CategoricalChunkedBuilder::new( - "compression", - 0, - CategoricalOrdering::Lexical, - ), - quirk_references_out_of_order: BooleanChunkedBuilder::new( - "quirk_references_out_of_order", - 0, - ), - quirk_nar_hash_hex: BooleanChunkedBuilder::new("quirk_nar_hash_hex", 0), - } - } -} - -impl FrameBuilder { - fn push(&mut self, entry: &NarInfo) { - self.store_path_hash_str - .append_value(nixbase32::encode(entry.store_path.digest())); - self.store_path_hash.append_value(entry.store_path.digest()); - self.store_path_name.append_value(entry.store_path.name()); - - if let Some(deriver) = &entry.deriver { - self.deriver_hash_str - .append_value(nixbase32::encode(deriver.digest())); - self.deriver_hash.append_value(deriver.digest()); - self.deriver_name.append_value(deriver.name()); - } else { - self.deriver_hash_str.append_null(); - self.deriver_hash.append_null(); - self.deriver_name.append_null(); - } - - self.nar_hash.append_value(&entry.nar_hash); - self.nar_size.append_value(entry.nar_size); - - self.references - .append_values_iter(entry.references.iter().map(|r| r.digest().as_slice())); - - assert!(entry.signatures.len() <= 1); - self.signature - .append_option(entry.signatures.get(0).map(|sig| { - assert_eq!(sig.name(), &"cache.nixos.org-1"); - sig.bytes() - })); - - if let Some(ca) = &entry.ca { - self.ca_algo.append_value(ca.algo_str()); - self.ca_hash.append_value(ca.hash().digest_as_bytes()); - } else { - self.ca_algo.append_null(); - self.ca_hash.append_null(); - } - - let file_hash = entry.file_hash.as_ref().unwrap(); - let file_size = entry.file_size.unwrap(); - - self.file_hash.append_value(file_hash); - self.file_size.append_value(file_size); - - let (compression, extension) = match entry.compression { - Some("bzip2") => ("bzip2", "bz2"), - Some("xz") => ("xz", "xz"), - Some("zstd") => ("zstd", "zst"), - x => panic!("unknown compression algorithm: {x:?}"), - }; - - self.compression.append_value(compression); - - let mut file_name = nixbase32::encode(file_hash); - file_name.push_str(".nar."); - file_name.push_str(extension); - - assert_eq!(entry.url.strip_prefix("nar/").unwrap(), file_name); - - { - use narinfo::Flags; - - self.quirk_references_out_of_order - .append_value(entry.flags.contains(Flags::REFERENCES_OUT_OF_ORDER)); - - self.quirk_nar_hash_hex - .append_value(entry.flags.contains(Flags::NAR_HASH_HEX)); - - let quirks = Flags::REFERENCES_OUT_OF_ORDER | Flags::NAR_HASH_HEX; - let unknown_flags = entry.flags.difference(quirks); - - assert!( - unknown_flags.is_empty(), - "rejecting flags: {unknown_flags:?}" - ); - } - } - - fn finish(mut self) -> DataFrame { - df! { - "store_path_hash_str" => self.store_path_hash_str.finish().into_series(), - "store_path_hash" => self.store_path_hash.finish().into_series(), - "store_path_name" => self.store_path_name.finish().into_series(), - "deriver_hash_str" => self.deriver_hash_str.finish().into_series(), - "deriver_hash" => self.deriver_hash.finish().into_series(), - "deriver_name" => self.deriver_name.finish().into_series(), - "nar_hash" => self.nar_hash.finish().into_series(), - "nar_size" => self.nar_size.finish().into_series(), - "references" => self.references.finish().into_series(), - "signature" => self.signature.finish().into_series(), - "ca_algo" => self.ca_algo.finish().into_series(), - "ca_hash" => self.ca_hash.finish().into_series(), - "file_hash" => self.file_hash.finish().into_series(), - "file_size" => self.file_size.finish().into_series(), - "compression" => self.compression.finish().into_series(), - "quirk_references_out_of_order" => self.quirk_references_out_of_order.finish().into_series(), - "quirk_nar_hash_hex" => self.quirk_nar_hash_hex.finish().into_series() - } - .unwrap() - } -} |