Diffstat (limited to 'users/edef/fetchroots/src/main.rs')
 users/edef/fetchroots/src/main.rs | 257 +++++++++++++++++++++++++++++++++++++
 1 file changed, 257 insertions(+), 0 deletions(-)
diff --git a/users/edef/fetchroots/src/main.rs b/users/edef/fetchroots/src/main.rs
new file mode 100644
index 000000000000..842b719c2ac9
--- /dev/null
+++ b/users/edef/fetchroots/src/main.rs
@@ -0,0 +1,257 @@
+//! Fetch all[^1] GC roots from releases.nixos.org into a `roots.parquet` file.
+//!
+//! The resulting Parquet file has three columns:
+//!
+//!  * `key` (`String`): the release, e.g. `nixos/22.11-small/nixos-22.11.513.563dc6476b8`
+//!  * `timestamp` (`Datetime`): the last-modified timestamp of the GC roots file for this release
+//!  * `store_path_hash` (`List[Binary]`): the hash parts of the store paths rooted by this release
+//!
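+//! As a rough sketch, the output can be read back with Polars (assuming a
+//! build with its `parquet` feature; `read_roots` is just an illustrative
+//! name):
+//!
+//! ```ignore
+//! use polars::prelude::*;
+//! use std::fs::File;
+//!
+//! fn read_roots() -> PolarsResult<DataFrame> {
+//!     // Load the roots.parquet written by this tool into a DataFrame.
+//!     ParquetReader::new(File::open("roots.parquet")?).finish()
+//! }
+//! ```
+//!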
+//! [^1]: some roots are truly ancient, and aren't compatible with Nix 1.x
+
+use anyhow::Result;
+use std::{
+    collections::BTreeMap,
+    fs::File,
+    io::{BufRead, Read},
+    sync::Arc,
+    time::SystemTime,
+};
+
+use aws_config::Region;
+use aws_sdk_s3::operation::get_object::builders::GetObjectFluentBuilder;
+use bytes::{Buf, Bytes};
+use bytes_utils::SegmentedBuf;
+use chrono::{DateTime, Utc};
+use nix_compat::nixbase32;
+use polars::prelude::*;
+use tokio::{
+    sync::Semaphore,
+    task::{block_in_place, JoinSet},
+};
+
+#[derive(Debug)]
+struct Meta {
+    format: Format,
+    e_tag: String,
+    last_modified: DateTime<Utc>,
+}
+
+#[tokio::main]
+async fn main() {
+    let sdk_config = aws_config::load_defaults(aws_config::BehaviorVersion::v2023_11_09())
+        .await
+        .into_builder()
+        .region(Region::from_static("eu-west-1"))
+        .build();
+
+    let s3 = aws_sdk_s3::Client::new(&sdk_config);
+
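+    // List the whole bucket and record, for each release, the most preferred
+    // roots file format we saw, along with its ETag and last-modified time.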
+    let mut keys: BTreeMap<String, Meta> = {
+        let pages = s3
+            .list_objects_v2()
+            .bucket("nix-releases")
+            .into_paginator()
+            .send()
+            .try_collect()
+            .await
+            .unwrap();
+
+        let objects = pages.into_iter().flat_map(|page| {
+            assert_eq!(page.prefix().unwrap_or_default(), "");
+            assert!(page.common_prefixes.is_none());
+            page.contents.unwrap_or_default()
+        });
+
+        let mut prev_key = String::new();
+        objects
+            .filter_map(|obj| {
+                let key = obj.key().unwrap();
+
+                assert!(&*prev_key < key);
+                key.clone_into(&mut prev_key);
+
+                let (key, tail) = key.rsplit_once('/')?;
+                // Our preference order (MANIFEST < MANIFEST.bz2 < store-paths.xz)
+                // happens to match lexicographical order, and listings are
+                // returned in lexicographical order, so collecting into the
+                // BTreeMap leaves the most preferred format for each release.
+                let format = match tail {
+                    "MANIFEST" => Format::Manifest,
+                    "MANIFEST.bz2" => Format::ManifestBz,
+                    "store-paths.xz" => Format::StorePathsXz,
+                    _ => return None,
+                };
+
+                Some((
+                    key.to_owned(),
+                    Meta {
+                        format,
+                        e_tag: obj.e_tag.unwrap(),
+                        last_modified: SystemTime::try_from(obj.last_modified.unwrap())
+                            .unwrap()
+                            .into(),
+                    },
+                ))
+            })
+            .collect()
+    };
+
+    // These releases are so old they don't even use nixbase32 store paths.
+    for key in [
+        "nix/nix-0.6",
+        "nix/nix-0.6.1",
+        "nix/nix-0.7",
+        "nix/nix-0.8",
+        "nixpkgs/nixpkgs-0.5",
+        "nixpkgs/nixpkgs-0.5.1",
+        "nixpkgs/nixpkgs-0.6",
+        "nixpkgs/nixpkgs-0.7",
+        "nixpkgs/nixpkgs-0.8",
+        "nixpkgs/nixpkgs-0.9",
+        "nixpkgs/nixpkgs-0.10",
+        "nixpkgs/nixpkgs-0.11",
+    ] {
+        assert!(keys.remove(key).is_some());
+    }
+
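+    // Fetch and parse each release's roots file concurrently, allowing at
+    // most 16 S3 requests in flight at once.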
+    let mut js = JoinSet::new();
+    let sem = Arc::new(Semaphore::new(16));
+
+    let bar = indicatif::ProgressBar::new(keys.len() as u64);
+    for (root, meta) in keys {
+        let sem = sem.clone();
+        let s3 = s3.clone();
+
+        js.spawn(async move {
+            let _permit = sem.acquire().await.unwrap();
+
+            let body = get_object(
+                s3.get_object()
+                    .bucket("nix-releases")
+                    .key(format!("{root}/{}", meta.format.as_str()))
+                    .if_match(meta.e_tag),
+            )
+            .await
+            .unwrap()
+            .reader();
+
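+            // Decompressing and parsing is CPU-bound, so run it under
+            // block_in_place to avoid stalling this runtime worker thread.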
+            let ph_array = block_in_place(|| meta.format.to_ph_array(body).rechunk());
+            df! {
+                "key" => [root],
+                "timestamp" => [meta.last_modified.naive_utc()],
+                "store_path_hash" => ph_array.into_series().implode().unwrap()
+            }
+            .unwrap()
+        });
+    }
+
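+    // Stream each finished DataFrame straight into roots.parquet, so the
+    // whole dataset never has to sit in memory at once.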
+    let mut writer = ParquetWriter::new(File::create("roots.parquet").unwrap())
+        .batched(&Schema::from_iter([
+            Field::new("key", DataType::String),
+            Field::new(
+                "timestamp",
+                DataType::Datetime(TimeUnit::Milliseconds, None),
+            ),
+            Field::new(
+                "store_path_hash",
+                DataType::List(Box::new(DataType::Binary)),
+            ),
+        ]))
+        .unwrap();
+
+    while let Some(df) = js.join_next().await.transpose().unwrap() {
+        block_in_place(|| writer.write_batch(&df)).unwrap();
+        bar.inc(1);
+    }
+
+    writer.finish().unwrap();
+}
+
+#[derive(Debug)]
+enum Format {
+    Manifest,
+    ManifestBz,
+    StorePathsXz,
+}
+
+impl Format {
+    fn as_str(&self) -> &'static str {
+        match self {
+            Format::Manifest => "MANIFEST",
+            Format::ManifestBz => "MANIFEST.bz2",
+            Format::StorePathsXz => "store-paths.xz",
+        }
+    }
+
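+    /// Parse a roots file body into the 20-byte hash parts of the store
+    /// paths it lists.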
+    fn to_ph_array(&self, mut body: impl BufRead) -> BinaryChunked {
+        match self {
+            Format::Manifest | Format::ManifestBz => {
+                let mut buf = String::new();
+                match self {
+                    Format::Manifest => {
+                        body.read_to_string(&mut buf).unwrap();
+                    }
+                    Format::ManifestBz => {
+                        bzip2::bufread::BzDecoder::new(body)
+                            .read_to_string(&mut buf)
+                            .unwrap();
+                    }
+                    _ => unreachable!(),
+                }
+
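+                // A v3 manifest starts with a fixed version header, followed
+                // by `{ ... }` blocks (some prefixed with `patch `), each of
+                // which opens with a `StorePath: /nix/store/...` line.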
+                let buf = buf
+                    .strip_prefix("version {\n  ManifestVersion: 3\n}\n")
+                    .unwrap();
+
+                BinaryChunked::from_iter_values(
+                    "store_path_hash",
+                    buf.split_terminator("}\n").map(|chunk| -> [u8; 20] {
+                        let chunk = chunk.strip_prefix("patch ").unwrap_or(chunk);
+                        let line = chunk.strip_prefix("{\n  StorePath: /nix/store/").unwrap();
+                        nixbase32::decode_fixed(&line[..32]).unwrap()
+                    }),
+                )
+            }
+            Format::StorePathsXz => {
+                let mut buf = String::new();
+                xz2::bufread::XzDecoder::new(body)
+                    .read_to_string(&mut buf)
+                    .unwrap();
+
+                BinaryChunked::from_iter_values(
+                    "store_path_hash",
+                    buf.split_terminator('\n').map(|line| -> [u8; 20] {
+                        let line = line.strip_prefix("/nix/store/").unwrap();
+                        nixbase32::decode_fixed(&line[..32]).unwrap()
+                    }),
+                )
+            }
+        }
+    }
+}
+
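+/// Fetch an S3 object in full, resuming with `Range` requests whenever the
+/// body stream ends early. The request must have `if_match` set, so that all
+/// retries read the same version of the object.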
+async fn get_object(request: GetObjectFluentBuilder) -> Result<SegmentedBuf<Bytes>> {
+    // If we don't constrain the ETag, the object could change between the
+    // initial request and a ranged retry and we'd stitch together bytes from
+    // two different versions (read skew).
+    assert!(request.get_if_match().is_some(), "if_match must be set");
+
+    let mut buf: SegmentedBuf<Bytes> = SegmentedBuf::new();
+    let mut resp = request.clone().send().await?;
+    let content_length: usize = resp.content_length.unwrap().try_into().unwrap();
+
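+    // Keep pulling chunks until we have Content-Length bytes, issuing a new
+    // ranged request from the current offset whenever the stream cuts out.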
+    loop {
+        while let Ok(Some(chunk)) = resp.body.try_next().await {
+            buf.push(chunk);
+        }
+
+        if buf.remaining() >= content_length {
+            assert_eq!(buf.remaining(), content_length, "got excess bytes");
+            break Ok(buf);
+        }
+
+        resp = request
+            .clone()
+            .range(format!("bytes={}-", buf.remaining()))
+            .send()
+            .await?;
+
+        assert_ne!(resp.content_range, None);
+    }
+}