//! narinfo2parquet operates on a narinfo.zst directory produced by turbofetch.
//! It takes the name of a segment file in `narinfo.zst` and writes a Parquet file
//! with the same name into the `narinfo.pq` directory.
//!
//! Run it under GNU Parallel for parallelism:
//! ```shell
//! mkdir narinfo.pq && ls narinfo.zst | parallel --bar 'narinfo2parquet {}'
//! ```

use anyhow::{bail, Context, Result};
use jemallocator::Jemalloc;
use nix_compat::{
    narinfo::{self, NarInfo},
    nixbase32,
};
use polars::{io::parquet::ParquetWriter, prelude::*};
use std::{
    fs::{self, File},
    io::{self, BufRead, BufReader, Read},
    path::Path,
};
use tempfile_fast::PersistableTempFile;

#[global_allocator]
static GLOBAL: Jemalloc = Jemalloc;

fn main() -> Result<()> {
    let file_name = std::env::args().nth(1).expect("file name missing");
    let input_path = Path::new("narinfo.zst").join(&file_name);
    let output_path = Path::new("narinfo.pq").join(&file_name);

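    // Refuse to clobber an existing output file for this segment.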
    match fs::metadata(&output_path) {
        Err(e) if e.kind() == io::ErrorKind::NotFound => {}
        Err(e) => bail!(e),
        Ok(_) => bail!("output path already exists: {output_path:?}"),
    }

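    // Stream-decompress the zstd segment and parse each narinfo entry into the frame.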
    let reader = File::open(input_path).and_then(zstd::Decoder::new)?;
    let mut frame = FrameBuilder::default();

    for_each(reader, |s| {
        let entry = NarInfo::parse(s).with_context(|| format!("couldn't parse entry:\n{s}"))?;
        frame.push(&entry);
        Ok(())
    })?;

    let mut frame = frame.finish();
    let mut writer = PersistableTempFile::new_in(output_path.parent().unwrap())?;

    ParquetWriter::new(&mut writer)
        .with_compression(ParquetCompression::Gzip(None))
        .with_statistics(true)
        .finish(frame.align_chunks())?;

    writer
        .persist_noclobber(output_path)
        .map_err(|e| e.error)
        .context("couldn't commit output file")?;

    Ok(())
}

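/// Splits the concatenated narinfo text into individual entries and invokes `f`
/// on each of them. A line starting with `StorePath:` begins a new entry, and
/// empty lines between entries are dropped; entries larger than 1 MiB are rejected.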
fn for_each(reader: impl Read, mut f: impl FnMut(&str) -> Result<()>) -> Result<()> {
    let mut reader = BufReader::new(reader);
    let mut group = String::new();
    loop {
        let prev_len = group.len();

        if prev_len > 1024 * 1024 {
            bail!("excessively large segment");
        }

        reader.read_line(&mut group)?;
        let (prev, line) = group.split_at(prev_len);

        // EOF
        if line.is_empty() {
            break;
        }

        // skip empty line
        if line == "\n" {
            group.pop().unwrap();
            continue;
        }

        if !prev.is_empty() && line.starts_with("StorePath:") {
            f(prev)?;
            group.drain(..prev_len);
        }
    }

    if !group.is_empty() {
        f(&group)?;
    }

    Ok(())
}

/// [FrameBuilder] builds a [DataFrame] out of [NarInfo]s.
/// The exact format is still in flux.
///
/// # Example
///
/// ```no_run
/// |narinfos: &[NarInfo]| -> DataFrame {
///     let mut frame_builder = FrameBuilder::default();
///     narinfos.iter().for_each(|n| frame_builder.push(n));
///     frame_builder.finish()
/// }
/// ```
struct FrameBuilder {
    store_path_hash_str: StringChunkedBuilder,
    store_path_hash: BinaryChunkedBuilder,
    store_path_name: StringChunkedBuilder,
    deriver_hash_str: StringChunkedBuilder,
    deriver_hash: BinaryChunkedBuilder,
    deriver_name: StringChunkedBuilder,
    nar_hash: BinaryChunkedBuilder,
    nar_size: PrimitiveChunkedBuilder<UInt64Type>,
    references: ListBinaryChunkedBuilder,
    ca_algo: CategoricalChunkedBuilder<'static>,
    ca_hash: BinaryChunkedBuilder,
    signature: BinaryChunkedBuilder,
    file_hash: BinaryChunkedBuilder,
    file_size: PrimitiveChunkedBuilder<UInt64Type>,
    compression: CategoricalChunkedBuilder<'static>,
    quirk_references_out_of_order: BooleanChunkedBuilder,
    quirk_nar_hash_hex: BooleanChunkedBuilder,
}

impl Default for FrameBuilder {
    fn default() -> Self {
        Self {
            store_path_hash_str: StringChunkedBuilder::new("store_path_hash_str", 0, 0),
            store_path_hash: BinaryChunkedBuilder::new("store_path_hash", 0, 0),
            store_path_name: StringChunkedBuilder::new("store_path_name", 0, 0),
            deriver_hash_str: StringChunkedBuilder::new("deriver_hash_str", 0, 0),
            deriver_hash: BinaryChunkedBuilder::new("deriver_hash", 0, 0),
            deriver_name: StringChunkedBuilder::new("deriver_name", 0, 0),
            nar_hash: BinaryChunkedBuilder::new("nar_hash", 0, 0),
            nar_size: PrimitiveChunkedBuilder::new("nar_size", 0),
            references: ListBinaryChunkedBuilder::new("references", 0, 0),
            signature: BinaryChunkedBuilder::new("signature", 0, 0),
            ca_algo: CategoricalChunkedBuilder::new("ca_algo", 0, CategoricalOrdering::Lexical),
            ca_hash: BinaryChunkedBuilder::new("ca_hash", 0, 0),
            file_hash: BinaryChunkedBuilder::new("file_hash", 0, 0),
            file_size: PrimitiveChunkedBuilder::new("file_size", 0),
            compression: CategoricalChunkedBuilder::new(
                "compression",
                0,
                CategoricalOrdering::Lexical,
            ),
            quirk_references_out_of_order: BooleanChunkedBuilder::new(
                "quirk_references_out_of_order",
                0,
            ),
            quirk_nar_hash_hex: BooleanChunkedBuilder::new("quirk_nar_hash_hex", 0),
        }
    }
}

impl FrameBuilder {
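    /// Appends a single [NarInfo] entry as one row across all column builders.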
    fn push(&mut self, entry: &NarInfo) {
        self.store_path_hash_str
            .append_value(nixbase32::encode(entry.store_path.digest()));
        self.store_path_hash.append_value(entry.store_path.digest());
        self.store_path_name.append_value(entry.store_path.name());

        if let Some(deriver) = &entry.deriver {
            self.deriver_hash_str
                .append_value(nixbase32::encode(deriver.digest()));
            self.deriver_hash.append_value(deriver.digest());
            self.deriver_name.append_value(deriver.name());
        } else {
            self.deriver_hash_str.append_null();
            self.deriver_hash.append_null();
            self.deriver_name.append_null();
        }

        self.nar_hash.append_value(&entry.nar_hash);
        self.nar_size.append_value(entry.nar_size);

        self.references
            .append_values_iter(entry.references.iter().map(|r| r.digest().as_slice()));

        assert!(entry.signatures.len() <= 1);
        self.signature
            .append_option(entry.signatures.get(0).map(|sig| {
                assert_eq!(sig.name(), &"cache.nixos.org-1");
                sig.bytes()
            }));

        if let Some(ca) = &entry.ca {
            self.ca_algo.append_value(ca.algo_str());
            self.ca_hash.append_value(ca.hash().digest_as_bytes());
        } else {
            self.ca_algo.append_null();
            self.ca_hash.append_null();
        }

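        // FileHash and FileSize are treated as mandatory; entries without them
        // are considered a bug in the input.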
        let file_hash = entry.file_hash.as_ref().unwrap();
        let file_size = entry.file_size.unwrap();

        self.file_hash.append_value(file_hash);
        self.file_size.append_value(file_size);

        let (compression, extension) = match entry.compression {
            Some("bzip2") => ("bzip2", "bz2"),
            Some("xz") => ("xz", "xz"),
            Some("zstd") => ("zstd", "zst"),
            x => panic!("unknown compression algorithm: {x:?}"),
        };

        self.compression.append_value(compression);

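        // The URL is fully determined by the file hash and compression extension,
        // so verify it against the entry instead of storing it as a column.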
        let mut file_name = nixbase32::encode(file_hash);
        file_name.push_str(".nar.");
        file_name.push_str(extension);

        assert_eq!(entry.url.strip_prefix("nar/").unwrap(), file_name);

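        // Record the parser quirk flags as boolean columns and reject any flags
        // beyond the two known quirks.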
        {
            use narinfo::Flags;

            self.quirk_references_out_of_order
                .append_value(entry.flags.contains(Flags::REFERENCES_OUT_OF_ORDER));

            self.quirk_nar_hash_hex
                .append_value(entry.flags.contains(Flags::NAR_HASH_HEX));

            let quirks = Flags::REFERENCES_OUT_OF_ORDER | Flags::NAR_HASH_HEX;
            let unknown_flags = entry.flags.difference(quirks);

            assert!(
                unknown_flags.is_empty(),
                "rejecting flags: {unknown_flags:?}"
            );
        }
    }

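    /// Consumes the builder and assembles all columns into a [DataFrame].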
    fn finish(mut self) -> DataFrame {
        df! {
            "store_path_hash_str" => self.store_path_hash_str.finish().into_series(),
            "store_path_hash" => self.store_path_hash.finish().into_series(),
            "store_path_name" => self.store_path_name.finish().into_series(),
            "deriver_hash_str" => self.deriver_hash_str.finish().into_series(),
            "deriver_hash" => self.deriver_hash.finish().into_series(),
            "deriver_name" => self.deriver_name.finish().into_series(),
            "nar_hash" => self.nar_hash.finish().into_series(),
            "nar_size" => self.nar_size.finish().into_series(),
            "references" => self.references.finish().into_series(),
            "signature" => self.signature.finish().into_series(),
            "ca_algo" => self.ca_algo.finish().into_series(),
            "ca_hash" => self.ca_hash.finish().into_series(),
            "file_hash" => self.file_hash.finish().into_series(),
            "file_size" => self.file_size.finish().into_series(),
            "compression" => self.compression.finish().into_series(),
            "quirk_references_out_of_order" => self.quirk_references_out_of_order.finish().into_series(),
            "quirk_nar_hash_hex" => self.quirk_nar_hash_hex.finish().into_series()
        }
        .unwrap()
    }
}