about summary refs log blame commit diff
path: root/users/edef/fetchroots/src/main.rs
blob: 842b719c2ac993cdee3e99fa290594dd91798203 (plain) (tree)
































































































































































































































































                                                                                              
//! Fetch all[^1] GC roots from releases.nixos.org into a `roots.parquet` file.
//!
//! The resulting Parquet has three columns:
//!
//!  * `key` (`String`): the release, e.g. `nixos/22.11-small/nixos-22.11.513.563dc6476b8`
//!  * `timestamp` (`DateTime`): the timestamp of the GC roots file for this release
//!  * `store_path_hash` (`List[Binary]`): hash part of the store paths rooted by this release
//!
//! [^1]: some roots are truly ancient, and aren't compatible with Nix 1.x

use anyhow::Result;
use std::{
    collections::BTreeMap,
    fs::File,
    io::{BufRead, Read},
    sync::Arc,
    time::SystemTime,
};

use aws_config::Region;
use aws_sdk_s3::operation::get_object::builders::GetObjectFluentBuilder;
use bytes::{Buf, Bytes};
use bytes_utils::SegmentedBuf;
use chrono::{DateTime, Utc};
use nix_compat::nixbase32;
use polars::prelude::*;
use tokio::{
    sync::Semaphore,
    task::{block_in_place, JoinSet},
};

/// What the bucket listing tells us about the file chosen for one release.
#[derive(Debug)]
struct Meta {
    /// Which of the recognised listing files this entry refers to.
    format: Format,
    /// ETag reported by the bucket listing; sent as `If-Match` when the object is downloaded.
    e_tag: String,
    /// Last-modified time reported by the bucket listing; becomes the `timestamp` column.
    last_modified: DateTime<Utc>,
}

#[tokio::main]
async fn main() {
    let sdk_config = aws_config::load_defaults(aws_config::BehaviorVersion::v2023_11_09())
        .await
        .into_builder()
        .region(Region::from_static("eu-west-1"))
        .build();

    let s3 = aws_sdk_s3::Client::new(&sdk_config);

    let mut keys: BTreeMap<String, Meta> = {
        let pages = s3
            .list_objects_v2()
            .bucket("nix-releases")
            .into_paginator()
            .send()
            .try_collect()
            .await
            .unwrap();

        let objects = pages.into_iter().flat_map(|page| {
            assert_eq!(page.prefix().unwrap_or_default(), "");
            assert!(page.common_prefixes.is_none());
            page.contents.unwrap_or_default()
        });

        let mut prev_key = String::new();
        objects
            .filter_map(|obj| {
                let key = obj.key().unwrap();

                assert!(&*prev_key < key);
                key.clone_into(&mut prev_key);

                let (key, tail) = key.rsplit_once('/')?;
                // Our preference order happens to match lexicographical order,
                // and listings are returned in lexicographical order.
                let format = match tail {
                    "MANIFEST" => Format::Manifest,
                    "MANIFEST.bz2" => Format::ManifestBz,
                    "store-paths.xz" => Format::StorePathsXz,
                    _ => return None,
                };

                Some((
                    key.to_owned(),
                    Meta {
                        format,
                        e_tag: obj.e_tag.unwrap(),
                        last_modified: SystemTime::try_from(obj.last_modified.unwrap())
                            .unwrap()
                            .into(),
                    },
                ))
            })
            .collect()
    };

    // These releases are so old they don't even use nixbase32 store paths.
    for key in [
        "nix/nix-0.6",
        "nix/nix-0.6.1",
        "nix/nix-0.7",
        "nix/nix-0.8",
        "nixpkgs/nixpkgs-0.5",
        "nixpkgs/nixpkgs-0.5.1",
        "nixpkgs/nixpkgs-0.6",
        "nixpkgs/nixpkgs-0.7",
        "nixpkgs/nixpkgs-0.8",
        "nixpkgs/nixpkgs-0.9",
        "nixpkgs/nixpkgs-0.10",
        "nixpkgs/nixpkgs-0.11",
    ] {
        assert!(keys.remove(key).is_some());
    }

    let mut js = JoinSet::new();
    let sem = Arc::new(Semaphore::new(16));

    let bar = indicatif::ProgressBar::new(keys.len() as u64);
    for (root, meta) in keys {
        let sem = sem.clone();
        let s3 = s3.clone();

        js.spawn(async move {
            let _permit = sem.acquire().await.unwrap();

            let body = get_object(
                s3.get_object()
                    .bucket("nix-releases")
                    .key(format!("{root}/{}", meta.format.as_str()))
                    .if_match(meta.e_tag),
            )
            .await
            .unwrap()
            .reader();

            let ph_array = block_in_place(|| meta.format.to_ph_array(body).rechunk());
            df! {
                "key" => [root],
                "timestamp" => [meta.last_modified.naive_utc()],
                "store_path_hash" => ph_array.into_series().implode().unwrap()
            }
            .unwrap()
        });
    }

    let mut writer = ParquetWriter::new(File::create("roots.parquet").unwrap())
        .batched(&Schema::from_iter([
            Field::new("key", DataType::String),
            Field::new(
                "timestamp",
                DataType::Datetime(TimeUnit::Milliseconds, None),
            ),
            Field::new(
                "store_path_hash",
                DataType::List(Box::new(DataType::Binary)),
            ),
        ]))
        .unwrap();

    while let Some(df) = js.join_next().await.transpose().unwrap() {
        block_in_place(|| writer.write_batch(&df)).unwrap();
        bar.inc(1);
    }

    writer.finish().unwrap();
}

/// The kinds of store path listing this tool knows how to read for a release.
#[derive(Debug)]
enum Format {
    /// The `MANIFEST` object, read as plain text.
    Manifest,
    /// The `MANIFEST.bz2` object: the manifest text, bzip2-compressed.
    ManifestBz,
    /// The `store-paths.xz` object: xz-compressed text with one store path per line.
    StorePathsXz,
}

impl Format {
    /// Returns the object name (the part after the final `/`) used for this format.
    fn as_str(&self) -> &'static str {
        match *self {
            Self::Manifest => "MANIFEST",
            Self::ManifestBz => "MANIFEST.bz2",
            Self::StorePathsXz => "store-paths.xz",
        }
    }

    /// Reads `body` to the end, decompressing it as required by the format, and returns
    /// the 20-byte hash part of every store path it lists, in file order.
    ///
    /// # Panics
    ///
    /// Panics if reading or decompression fails, if the content is not valid UTF-8, if a
    /// manifest does not start with the version 3 header, or if any entry does not begin
    /// with a `/nix/store/` path carrying a 32-character nixbase32 hash.
    fn to_ph_array(&self, mut body: impl BufRead) -> BinaryChunked {
        /// Decodes the 32 nixbase32 characters that directly follow `/nix/store/`.
        fn hash_part(after_store_dir: &str) -> [u8; 20] {
            nixbase32::decode_fixed(&after_store_dir[..32]).unwrap()
        }

        // Step 1: get the whole listing as text.
        let mut text = String::new();
        match self {
            Self::Manifest => body.read_to_string(&mut text),
            Self::ManifestBz => bzip2::bufread::BzDecoder::new(body).read_to_string(&mut text),
            Self::StorePathsXz => xz2::bufread::XzDecoder::new(body).read_to_string(&mut text),
        }
        .unwrap();

        // Step 2: pull the hash part out of every entry.
        match self {
            Self::Manifest | Self::ManifestBz => {
                let entries = text
                    .strip_prefix("version {\n  ManifestVersion: 3\n}\n")
                    .unwrap();

                BinaryChunked::from_iter_values(
                    "store_path_hash",
                    entries.split_terminator("}\n").map(|entry| -> [u8; 20] {
                        // Entries are either `{ ... }` or `patch { ... }` blocks.
                        let entry = entry.strip_prefix("patch ").unwrap_or(entry);
                        let rest = entry.strip_prefix("{\n  StorePath: /nix/store/").unwrap();
                        hash_part(rest)
                    }),
                )
            }
            Self::StorePathsXz => BinaryChunked::from_iter_values(
                "store_path_hash",
                text.split_terminator('\n').map(|path| -> [u8; 20] {
                    hash_part(path.strip_prefix("/nix/store/").unwrap())
                }),
            ),
        }
    }
}

async fn get_object(request: GetObjectFluentBuilder) -> Result<SegmentedBuf<Bytes>> {
    // if we don't constrain the ETag, we might experience read skew
    assert!(request.get_if_match().is_some(), "if_match must be set");

    let mut buf: SegmentedBuf<Bytes> = SegmentedBuf::new();
    let mut resp = request.clone().send().await?;
    let content_length: usize = resp.content_length.unwrap().try_into().unwrap();

    loop {
        while let Ok(Some(chunk)) = resp.body.try_next().await {
            buf.push(chunk);
        }

        if buf.remaining() >= content_length {
            assert_eq!(buf.remaining(), content_length, "got excess bytes");
            break Ok(buf);
        }

        resp = request
            .clone()
            .range(format!("bytes={}-", buf.remaining()))
            .send()
            .await?;

        assert_ne!(resp.content_range, None);
    }
}