diff options
Diffstat (limited to 'users/edef/fetchroots/src/main.rs')
-rw-r--r-- | users/edef/fetchroots/src/main.rs | 257 |
1 file changed, 257 insertions, 0 deletions
//! Fetch all[^1] GC roots from releases.nixos.org into a `roots.parquet` file.
//!
//! The resulting Parquet has three columns:
//!
//! * `key` (`String`): the release, eg `nixos/22.11-small/nixos-22.11.513.563dc6476b8`
//! * `timestamp` (`DateTime`): the timestamp of the GC roots file for this release
//! * `store_path_hash` (`List[Binary]`): hash part of the store paths rooted by this release
//!
//! [^1]: some roots are truly ancient, and aren't compatible with Nix 1.x

use anyhow::Result;
use std::{
    collections::BTreeMap,
    fs::File,
    io::{BufRead, Read},
    sync::Arc,
    time::SystemTime,
};

use aws_config::Region;
use aws_sdk_s3::operation::get_object::builders::GetObjectFluentBuilder;
use bytes::{Buf, Bytes};
use bytes_utils::SegmentedBuf;
use chrono::{DateTime, Utc};
use nix_compat::nixbase32;
use polars::prelude::*;
use tokio::{
    sync::Semaphore,
    task::{block_in_place, JoinSet},
};

/// Per-release metadata gathered from the S3 listing, used to fetch and
/// decode that release's roots file later.
#[derive(Debug)]
struct Meta {
    // Which flavour of roots file this release provides (see `Format`).
    format: Format,
    // ETag of the object at listing time; the download is pinned to it with
    // `If-Match` so we never mix bytes from two object versions.
    e_tag: String,
    // Last-modified timestamp of the roots file, written to the `timestamp` column.
    last_modified: DateTime<Utc>,
}

#[tokio::main]
async fn main() {
    let sdk_config = aws_config::load_defaults(aws_config::BehaviorVersion::v2023_11_09())
        .await
        .into_builder()
        // The nix-releases bucket lives in eu-west-1.
        .region(Region::from_static("eu-west-1"))
        .build();

    let s3 = aws_sdk_s3::Client::new(&sdk_config);

    // Enumerate the whole bucket and keep, for each release prefix, one
    // roots-file candidate. Map key is the release (the S3 key minus its
    // final `/`-separated component).
    let mut keys: BTreeMap<String, Meta> = {
        let pages = s3
            .list_objects_v2()
            .bucket("nix-releases")
            .into_paginator()
            .send()
            .try_collect()
            .await
            .unwrap();

        let objects = pages.into_iter().flat_map(|page| {
            // We list without a prefix or delimiter, so no page should carry
            // a prefix or common-prefix groupings.
            assert_eq!(page.prefix().unwrap_or_default(), "");
            assert!(page.common_prefixes.is_none());
            page.contents.unwrap_or_default()
        });

        // Track the previous key to assert the listing really is strictly
        // ascending — the dedup logic below relies on that ordering.
        let mut prev_key = String::new();
        objects
            .filter_map(|obj| {
                let key = obj.key().unwrap();

                assert!(&*prev_key < key);
                key.clone_into(&mut prev_key);

                let (key, tail) = key.rsplit_once('/')?;
                // Our preference order happens to match lexicographical order,
                // and listings are returned in lexicographical order.
                // (Collecting into the map lets a later entry for the same
                // release overwrite an earlier one, so e.g. `store-paths.xz`
                // wins over `MANIFEST` when both exist.)
                let format = match tail {
                    "MANIFEST" => Format::Manifest,
                    "MANIFEST.bz2" => Format::ManifestBz,
                    "store-paths.xz" => Format::StorePathsXz,
                    _ => return None,
                };

                Some((
                    key.to_owned(),
                    Meta {
                        format,
                        e_tag: obj.e_tag.unwrap(),
                        last_modified: SystemTime::try_from(obj.last_modified.unwrap())
                            .unwrap()
                            .into(),
                    },
                ))
            })
            .collect()
    };

    // These releases are so old they don't even use nixbase32 store paths.
    // The asserts double as a sanity check that the listing still contains
    // exactly these keys; a rename upstream would surface here.
    for key in [
        "nix/nix-0.6",
        "nix/nix-0.6.1",
        "nix/nix-0.7",
        "nix/nix-0.8",
        "nixpkgs/nixpkgs-0.5",
        "nixpkgs/nixpkgs-0.5.1",
        "nixpkgs/nixpkgs-0.6",
        "nixpkgs/nixpkgs-0.7",
        "nixpkgs/nixpkgs-0.8",
        "nixpkgs/nixpkgs-0.9",
        "nixpkgs/nixpkgs-0.10",
        "nixpkgs/nixpkgs-0.11",
    ] {
        assert!(keys.remove(key).is_some());
    }

    // Fetch and decode all roots files concurrently, at most 16 in flight.
    let mut js = JoinSet::new();
    let sem = Arc::new(Semaphore::new(16));

    let bar = indicatif::ProgressBar::new(keys.len() as u64);
    for (root, meta) in keys {
        let sem = sem.clone();
        let s3 = s3.clone();

        js.spawn(async move {
            let _permit = sem.acquire().await.unwrap();

            let body = get_object(
                s3.get_object()
                    .bucket("nix-releases")
                    .key(format!("{root}/{}", meta.format.as_str()))
                    .if_match(meta.e_tag),
            )
            .await
            .unwrap()
            .reader();

            // Decoding/parsing is CPU-bound, so tell tokio we're blocking.
            let ph_array = block_in_place(|| meta.format.to_ph_array(body).rechunk());
            // One single-row DataFrame per release; the hashes are imploded
            // into a list so they fit in a `List[Binary]` column.
            df! {
                "key" => [root],
                "timestamp" => [meta.last_modified.naive_utc()],
                "store_path_hash" => ph_array.into_series().implode().unwrap()
            }
            .unwrap()
        });
    }

    // Batched writer with an explicit schema matching the df! above.
    let mut writer = ParquetWriter::new(File::create("roots.parquet").unwrap())
        .batched(&Schema::from_iter([
            Field::new("key", DataType::String),
            Field::new(
                "timestamp",
                DataType::Datetime(TimeUnit::Milliseconds, None),
            ),
            Field::new(
                "store_path_hash",
                DataType::List(Box::new(DataType::Binary)),
            ),
        ]))
        .unwrap();

    // Stream rows out as tasks finish; completion order (not release order)
    // determines row order in the output file.
    while let Some(df) = js.join_next().await.transpose().unwrap() {
        block_in_place(|| writer.write_batch(&df)).unwrap();
        bar.inc(1);
    }

    writer.finish().unwrap();
}

/// The three on-disk formats a release's GC roots can come in.
/// Declaration order mirrors the lexicographical preference order relied on
/// when deduplicating the bucket listing in `main`.
#[derive(Debug)]
enum Format {
    Manifest,
    ManifestBz,
    StorePathsXz,
}

impl Format {
    /// The S3 object name (final path component) for this format.
    fn as_str(&self) -> &'static str {
        match self {
            Format::Manifest => "MANIFEST",
            Format::ManifestBz => "MANIFEST.bz2",
            Format::StorePathsXz => "store-paths.xz",
        }
    }

    /// Decode a roots file body into a chunked array of 20-byte store-path
    /// hash digests (the nixbase32-decoded hash part of each store path).
    ///
    /// Panics on any decompression, format, or decode error — a malformed
    /// roots file is treated as fatal rather than skipped.
    fn to_ph_array(&self, mut body: impl BufRead) -> BinaryChunked {
        match self {
            Format::Manifest | Format::ManifestBz => {
                let mut buf = String::new();
                match self {
                    Format::Manifest => {
                        body.read_to_string(&mut buf).unwrap();
                    }
                    Format::ManifestBz => {
                        bzip2::bufread::BzDecoder::new(body)
                            .read_to_string(&mut buf)
                            .unwrap();
                    }
                    // Outer match already restricted us to the two manifest variants.
                    _ => unreachable!(),
                }

                // Only ManifestVersion 3 is supported; anything else panics here.
                let buf = buf
                    .strip_prefix("version {\n ManifestVersion: 3\n}\n")
                    .unwrap();

                BinaryChunked::from_iter_values(
                    "store_path_hash",
                    // Each manifest entry is a `{ ... }\n` block, optionally
                    // prefixed with `patch `; the store path's 32-char
                    // nixbase32 hash immediately follows `/nix/store/`.
                    buf.split_terminator("}\n").map(|chunk| -> [u8; 20] {
                        let chunk = chunk.strip_prefix("patch ").unwrap_or(chunk);
                        let line = chunk.strip_prefix("{\n StorePath: /nix/store/").unwrap();
                        nixbase32::decode_fixed(&line[..32]).unwrap()
                    }),
                )
            }
            Format::StorePathsXz => {
                let mut buf = String::new();
                xz2::bufread::XzDecoder::new(body)
                    .read_to_string(&mut buf)
                    .unwrap();

                BinaryChunked::from_iter_values(
                    "store_path_hash",
                    // One absolute store path per line; take the 32-char
                    // nixbase32 hash right after `/nix/store/`.
                    buf.split_terminator('\n').map(|line| -> [u8; 20] {
                        let line = line.strip_prefix("/nix/store/").unwrap();
                        nixbase32::decode_fixed(&line[..32]).unwrap()
                    }),
                )
            }
        }
    }
}

/// Download an S3 object in full, transparently resuming with `Range`
/// requests if the body stream is interrupted partway.
///
/// The request MUST have `If-Match` set (asserted below): resumption issues
/// fresh GETs, and pinning the ETag ensures all fetched ranges come from the
/// same object version (no read skew).
///
/// Returns the complete body as a `SegmentedBuf`; errors from `send()` are
/// propagated, while mid-body stream errors trigger a resume instead.
async fn get_object(request: GetObjectFluentBuilder) -> Result<SegmentedBuf<Bytes>> {
    // if we don't constrain the ETag, we might experience read skew
    assert!(request.get_if_match().is_some(), "if_match must be set");

    let mut buf: SegmentedBuf<Bytes> = SegmentedBuf::new();
    let mut resp = request.clone().send().await?;
    // Total expected size, taken from the first (un-ranged) response.
    let content_length: usize = resp.content_length.unwrap().try_into().unwrap();

    loop {
        // Drain the body until it ends or errors; a stream error simply
        // falls through to the resume path below.
        while let Ok(Some(chunk)) = resp.body.try_next().await {
            buf.push(chunk);
        }

        if buf.remaining() >= content_length {
            assert_eq!(buf.remaining(), content_length, "got excess bytes");
            break Ok(buf);
        }

        // Short read: re-request everything from the current offset onward.
        resp = request
            .clone()
            .range(format!("bytes={}-", buf.remaining()))
            .send()
            .await?;

        // A ranged request must be answered with a partial response;
        // a missing Content-Range would mean we'd duplicate bytes.
        assert_ne!(resp.content_range, None);
    }
}