about summary refs log tree commit diff
path: root/tvix/tools/narinfo2parquet/src
diff options
context:
space:
mode:
authoredef <edef@edef.eu>2023-11-08T04·44+0000
committeredef <edef@edef.eu>2023-11-10T19·35+0000
commit8d02928b1493087b7f9e458411416ed268ce2112 (patch)
treefd615a5a4be1bc5d4353574ff192f1e3c911fca0 /tvix/tools/narinfo2parquet/src
parent45de341794b95db2e8af6dec6a8fe279cfb6c67c (diff)
feat(tvix/tools/narinfo2parquet): init r/6988
Convert turbofetch output to queryable Parquet.

Change-Id: I076f5a431f8aab8cfe7d973bdc9fe019cebde111
Reviewed-on: https://cl.tvl.fyi/c/depot/+/9989
Reviewed-by: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Diffstat (limited to 'tvix/tools/narinfo2parquet/src')
-rw-r--r--tvix/tools/narinfo2parquet/src/main.rs260
1 files changed, 260 insertions, 0 deletions
diff --git a/tvix/tools/narinfo2parquet/src/main.rs b/tvix/tools/narinfo2parquet/src/main.rs
new file mode 100644
index 000000000000..50eab0a79675
--- /dev/null
+++ b/tvix/tools/narinfo2parquet/src/main.rs
@@ -0,0 +1,260 @@
+//! narinfo2parquet operates on a narinfo.zst directory produced by turbofetch.
+//! It takes the name of a segment file in `narinfo.zst` and writes a Parquet file
+//! with the same name into the `narinfo.pq` directory.
+//!
+//! Run it under GNU Parallel for parallelism:
+//! ```shell
+//! mkdir narinfo.pq && ls narinfo.zst | parallel --bar 'narinfo2parquet {}'
+//! ```
+
+use anyhow::{bail, Context, Result};
+use jemallocator::Jemalloc;
+use nix_compat::{
+    narinfo::{self, NarInfo},
+    nixbase32,
+    nixhash::{CAHash, NixHash},
+};
+use polars::{io::parquet::ParquetWriter, prelude::*};
+use std::{
+    fs::{self, File},
+    io::{self, BufRead, BufReader, Read},
+    path::Path,
+};
+use tempfile_fast::PersistableTempFile;
+
+#[global_allocator]
+static GLOBAL: Jemalloc = Jemalloc;
+
+fn main() -> Result<()> {
+    let file_name = std::env::args().nth(1).expect("file name missing");
+    let input_path = Path::new("narinfo.zst").join(&file_name);
+    let output_path = Path::new("narinfo.pq").join(&file_name);
+
+    match fs::metadata(&output_path) {
+        Err(e) if e.kind() == io::ErrorKind::NotFound => {}
+        Err(e) => bail!(e),
+        Ok(_) => bail!("output path already exists: {output_path:?}"),
+    }
+
+    let reader = File::open(input_path).and_then(zstd::Decoder::new)?;
+    let mut frame = FrameBuilder::default();
+
+    for_each(reader, |s| {
+        let entry = NarInfo::parse(&s).context("couldn't parse entry:\n{s}")?;
+        frame.push(&entry);
+        Ok(())
+    })?;
+
+    let mut frame = frame.finish();
+    let mut writer = PersistableTempFile::new_in(output_path.parent().unwrap())?;
+
+    ParquetWriter::new(&mut writer)
+        .with_compression(ParquetCompression::Gzip(None))
+        .with_statistics(true)
+        .finish(frame.align_chunks())?;
+
+    writer
+        .persist_noclobber(output_path)
+        .map_err(|e| e.error)
+        .context("couldn't commit output file")?;
+
+    Ok(())
+}
+
+fn for_each(reader: impl Read, mut f: impl FnMut(&str) -> Result<()>) -> Result<()> {
+    let mut reader = BufReader::new(reader);
+    let mut group = String::new();
+    loop {
+        let prev_len = group.len();
+
+        if prev_len > 1024 * 1024 {
+            bail!("excessively large segment");
+        }
+
+        reader.read_line(&mut group)?;
+        let (prev, line) = group.split_at(prev_len);
+
+        // EOF
+        if line.is_empty() {
+            break;
+        }
+
+        // skip empty line
+        if line == "\n" {
+            group.pop().unwrap();
+            continue;
+        }
+
+        if !prev.is_empty() && line.starts_with("StorePath:") {
+            f(prev)?;
+            group.drain(..prev_len);
+        }
+    }
+
+    if !group.is_empty() {
+        f(&group)?;
+    }
+
+    Ok(())
+}
+
+/// [FrameBuilder] builds a [DataFrame] out of [NarInfo]s.
+/// The exact format is still in flux.
+///
+/// # Example
+///
+/// ```no_run
+/// |narinfos: &[NarInfo]| -> DataFrame {
+///     let frame_builder = FrameBuilder::default();
+///     narinfos.for_each(|n| frame_builder.push(n));
+///     frame_builder.finish()
+/// }
+/// ```
+struct FrameBuilder {
+    store_path_hash_str: Utf8ChunkedBuilder,
+    store_path_hash: BinaryChunkedBuilder,
+    store_path_name: Utf8ChunkedBuilder,
+    nar_hash: BinaryChunkedBuilder,
+    nar_size: PrimitiveChunkedBuilder<UInt64Type>,
+    references: ListBinaryChunkedBuilder,
+    ca_algo: CategoricalChunkedBuilder<'static>,
+    ca_hash: BinaryChunkedBuilder,
+    signature: BinaryChunkedBuilder,
+    file_hash: BinaryChunkedBuilder,
+    file_size: PrimitiveChunkedBuilder<UInt64Type>,
+    compression: CategoricalChunkedBuilder<'static>,
+    quirk_references_out_of_order: BooleanChunkedBuilder,
+    quirk_nar_hash_hex: BooleanChunkedBuilder,
+}
+
+impl Default for FrameBuilder {
+    fn default() -> Self {
+        Self {
+            store_path_hash_str: Utf8ChunkedBuilder::new("store_path_hash_str", 0, 0),
+            store_path_hash: BinaryChunkedBuilder::new("store_path_hash", 0, 0),
+            store_path_name: Utf8ChunkedBuilder::new("store_path_name", 0, 0),
+            nar_hash: BinaryChunkedBuilder::new("nar_hash", 0, 0),
+            nar_size: PrimitiveChunkedBuilder::new("nar_size", 0),
+            references: ListBinaryChunkedBuilder::new("references", 0, 0),
+            signature: BinaryChunkedBuilder::new("signature", 0, 0),
+            ca_algo: CategoricalChunkedBuilder::new("ca_algo", 0),
+            ca_hash: BinaryChunkedBuilder::new("ca_hash", 0, 0),
+            file_hash: BinaryChunkedBuilder::new("file_hash", 0, 0),
+            file_size: PrimitiveChunkedBuilder::new("file_size", 0),
+            compression: CategoricalChunkedBuilder::new("compression", 0),
+            quirk_references_out_of_order: BooleanChunkedBuilder::new(
+                "quirk_references_out_of_order",
+                0,
+            ),
+            quirk_nar_hash_hex: BooleanChunkedBuilder::new("quirk_nar_hash_hex", 0),
+        }
+    }
+}
+
+impl FrameBuilder {
+    fn push(&mut self, entry: &NarInfo) {
+        self.store_path_hash_str
+            .append_value(nixbase32::encode(entry.store_path.digest()));
+
+        self.store_path_hash.append_value(entry.store_path.digest());
+        self.store_path_name.append_value(entry.store_path.name());
+
+        self.nar_hash.append_value(&entry.nar_hash);
+        self.nar_size.append_value(entry.nar_size);
+
+        self.references
+            .append_values_iter(entry.references.iter().map(|r| r.digest().as_slice()));
+
+        assert!(entry.signatures.len() <= 1);
+        self.signature
+            .append_option(entry.signatures.get(0).map(|sig| {
+                assert_eq!(sig.name(), "cache.nixos.org-1");
+                sig.bytes()
+            }));
+
+        if let Some(ca) = &entry.ca {
+            // decompose the CAHash into algo and hash parts
+            // TODO(edef): move this into CAHash
+            let (algo, hash) = match ca {
+                CAHash::Flat(h) => match h {
+                    NixHash::Md5(h) => ("fixed:md5", &h[..]),
+                    NixHash::Sha1(h) => ("fixed:sha1", &h[..]),
+                    NixHash::Sha256(h) => ("fixed:sha256", &h[..]),
+                    NixHash::Sha512(h) => ("fixed:sha512", &h[..]),
+                },
+                CAHash::Nar(h) => match h {
+                    NixHash::Md5(h) => ("fixed:r:md5", &h[..]),
+                    NixHash::Sha1(h) => ("fixed:r:sha1", &h[..]),
+                    NixHash::Sha256(h) => ("fixed:r:sha256", &h[..]),
+                    NixHash::Sha512(h) => ("fixed:r:sha512", &h[..]),
+                },
+                CAHash::Text(h) => ("text:sha256", &h[..]),
+            };
+
+            self.ca_algo.append_value(algo);
+            self.ca_hash.append_value(hash);
+        } else {
+            self.ca_algo.append_null();
+            self.ca_hash.append_null();
+        }
+
+        let file_hash = entry.file_hash.as_ref().unwrap();
+        let file_size = entry.file_size.unwrap();
+
+        self.file_hash.append_value(file_hash);
+        self.file_size.append_value(file_size);
+
+        let (compression, extension) = match entry.compression {
+            Some("bzip2") => ("bzip2", "bz2"),
+            Some("xz") => ("xz", "xz"),
+            Some("zstd") => ("zstd", "zst"),
+            x => panic!("unknown compression algorithm: {x:?}"),
+        };
+
+        self.compression.append_value(compression);
+
+        let mut file_name = nixbase32::encode(file_hash);
+        file_name.push_str(".nar.");
+        file_name.push_str(extension);
+
+        assert_eq!(entry.url.strip_prefix("nar/").unwrap(), file_name);
+
+        {
+            use narinfo::Flags;
+
+            self.quirk_references_out_of_order
+                .append_value(entry.flags.contains(Flags::REFERENCES_OUT_OF_ORDER));
+
+            self.quirk_nar_hash_hex
+                .append_value(entry.flags.contains(Flags::NAR_HASH_HEX));
+
+            let quirks = Flags::REFERENCES_OUT_OF_ORDER | Flags::NAR_HASH_HEX;
+            let unknown_flags = entry.flags.difference(quirks);
+
+            assert!(
+                unknown_flags.is_empty(),
+                "rejecting flags: {unknown_flags:?}"
+            );
+        }
+    }
+
+    fn finish(mut self) -> DataFrame {
+        df! {
+            "store_path_hash_str" => self.store_path_hash_str.finish().into_series(),
+            "store_path_hash" => self.store_path_hash.finish().into_series(),
+            "store_path_name" => self.store_path_name.finish().into_series(),
+            "nar_hash" => self.nar_hash.finish().into_series(),
+            "nar_size" => self.nar_size.finish().into_series(),
+            "references" => self.references.finish().into_series(),
+            "signature" => self.signature.finish().into_series(),
+            "ca_algo" => self.ca_algo.finish().into_series(),
+            "ca_hash" => self.ca_hash.finish().into_series(),
+            "file_hash" => self.file_hash.finish().into_series(),
+            "file_size" => self.file_size.finish().into_series(),
+            "compression" => self.compression.finish().into_series(),
+            "quirk_references_out_of_order" => self.quirk_references_out_of_order.finish().into_series(),
+            "quirk_nar_hash_hex" => self.quirk_nar_hash_hex.finish().into_series()
+        }
+        .unwrap()
+    }
+}