//! Fetch all[^1] GC roots from releases.nixos.org into a `roots.parquet` file.
//!
//! The resulting Parquet has three columns:
//!
//! * `key` (`String`): the release, eg `nixos/22.11-small/nixos-22.11.513.563dc6476b8`
//! * `timestamp` (`DateTime`): the timestamp of the GC roots file for this release
//! * `store_path_hash` (`List[Binary]`): hash part of the store paths rooted by this release
//!
//! [^1]: some roots are truly ancient, and aren't compatible with Nix 1.x

use anyhow::Result;
use std::{
    collections::BTreeMap,
    fs::File,
    io::{BufRead, Read},
    sync::Arc,
    time::SystemTime,
};

use aws_config::Region;
use aws_sdk_s3::operation::get_object::builders::GetObjectFluentBuilder;
use bytes::{Buf, Bytes};
use bytes_utils::SegmentedBuf;
use chrono::{DateTime, Utc};
use nix_compat::nixbase32;
use polars::prelude::*;
use tokio::{
    sync::Semaphore,
    task::{block_in_place, JoinSet},
};

#[derive(Debug)]
struct Meta {
    format: Format,
    e_tag: String,
    last_modified: DateTime<Utc>,
}

#[tokio::main]
async fn main() {
    let sdk_config = aws_config::load_defaults(aws_config::BehaviorVersion::v2023_11_09())
        .await
        .into_builder()
        .region(Region::from_static("eu-west-1"))
        .build();

    let s3 = aws_sdk_s3::Client::new(&sdk_config);

    let mut keys: BTreeMap<String, Meta> = {
        let pages = s3
            .list_objects_v2()
            .bucket("nix-releases")
            .into_paginator()
            .send()
            .try_collect()
            .await
            .unwrap();

        let objects = pages.into_iter().flat_map(|page| {
            assert_eq!(page.prefix().unwrap_or_default(), "");
            assert!(page.common_prefixes.is_none());
            page.contents.unwrap_or_default()
        });

        let mut prev_key = String::new();
        objects
            .filter_map(|obj| {
                let key = obj.key().unwrap();
                assert!(&*prev_key < key);
                key.clone_into(&mut prev_key);

                let (key, tail) = key.rsplit_once('/')?;

                // Our preference order happens to match lexicographical order,
                // and listings are returned in lexicographical order.
                let format = match tail {
                    "MANIFEST" => Format::Manifest,
                    "MANIFEST.bz2" => Format::ManifestBz,
                    "store-paths.xz" => Format::StorePathsXz,
                    _ => return None,
                };

                Some((
                    key.to_owned(),
                    Meta {
                        format,
                        e_tag: obj.e_tag.unwrap(),
                        last_modified: SystemTime::try_from(obj.last_modified.unwrap())
                            .unwrap()
                            .into(),
                    },
                ))
            })
            .collect()
    };

    // These releases are so old they don't even use nixbase32 store paths.
    for key in [
        "nix/nix-0.6",
        "nix/nix-0.6.1",
        "nix/nix-0.7",
        "nix/nix-0.8",
        "nixpkgs/nixpkgs-0.5",
        "nixpkgs/nixpkgs-0.5.1",
        "nixpkgs/nixpkgs-0.6",
        "nixpkgs/nixpkgs-0.7",
        "nixpkgs/nixpkgs-0.8",
        "nixpkgs/nixpkgs-0.9",
        "nixpkgs/nixpkgs-0.10",
        "nixpkgs/nixpkgs-0.11",
    ] {
        assert!(keys.remove(key).is_some());
    }

    let mut js = JoinSet::new();
    let sem = Arc::new(Semaphore::new(16));
    let bar = indicatif::ProgressBar::new(keys.len() as u64);
    for (root, meta) in keys {
        let sem = sem.clone();
        let s3 = s3.clone();
        js.spawn(async move {
            let _permit = sem.acquire().await.unwrap();

            let body = get_object(
                s3.get_object()
                    .bucket("nix-releases")
                    .key(format!("{root}/{}", meta.format.as_str()))
                    .if_match(meta.e_tag),
            )
            .await
            .unwrap()
            .reader();

            let ph_array = block_in_place(|| meta.format.to_ph_array(body).rechunk());
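            // Each release becomes a single row: `implode` collapses the hash
            // column into one List[Binary] cell, so `store_path_hash` holds
            // every rooted path hash for this release.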
{ "key" => [root], "timestamp" => [meta.last_modified.naive_utc()], "store_path_hash" => ph_array.into_series().implode().unwrap() } .unwrap() }); } let mut writer = ParquetWriter::new(File::create("roots.parquet").unwrap()) .batched(&Schema::from_iter([ Field::new("key", DataType::String), Field::new( "timestamp", DataType::Datetime(TimeUnit::Milliseconds, None), ), Field::new( "store_path_hash", DataType::List(Box::new(DataType::Binary)), ), ])) .unwrap(); while let Some(df) = js.join_next().await.transpose().unwrap() { block_in_place(|| writer.write_batch(&df)).unwrap(); bar.inc(1); } writer.finish().unwrap(); } #[derive(Debug)] enum Format { Manifest, ManifestBz, StorePathsXz, } impl Format { fn as_str(&self) -> &'static str { match self { Format::Manifest => "MANIFEST", Format::ManifestBz => "MANIFEST.bz2", Format::StorePathsXz => "store-paths.xz", } } fn to_ph_array(&self, mut body: impl BufRead) -> BinaryChunked { match self { Format::Manifest | Format::ManifestBz => { let mut buf = String::new(); match self { Format::Manifest => { body.read_to_string(&mut buf).unwrap(); } Format::ManifestBz => { bzip2::bufread::BzDecoder::new(body) .read_to_string(&mut buf) .unwrap(); } _ => unreachable!(), } let buf = buf .strip_prefix("version {\n ManifestVersion: 3\n}\n") .unwrap(); BinaryChunked::from_iter_values( "store_path_hash", buf.split_terminator("}\n").map(|chunk| -> [u8; 20] { let chunk = chunk.strip_prefix("patch ").unwrap_or(chunk); let line = chunk.strip_prefix("{\n StorePath: /nix/store/").unwrap(); nixbase32::decode_fixed(&line[..32]).unwrap() }), ) } Format::StorePathsXz => { let mut buf = String::new(); xz2::bufread::XzDecoder::new(body) .read_to_string(&mut buf) .unwrap(); BinaryChunked::from_iter_values( "store_path_hash", buf.split_terminator('\n').map(|line| -> [u8; 20] { let line = line.strip_prefix("/nix/store/").unwrap(); nixbase32::decode_fixed(&line[..32]).unwrap() }), ) } } } } async fn get_object(request: GetObjectFluentBuilder) -> Result<SegmentedBuf<Bytes>> { // if we don't constrain the ETag, we might experience read skew assert!(request.get_if_match().is_some(), "if_match must be set"); let mut buf: SegmentedBuf<Bytes> = SegmentedBuf::new(); let mut resp = request.clone().send().await?; let content_length: usize = resp.content_length.unwrap().try_into().unwrap(); loop { while let Ok(Some(chunk)) = resp.body.try_next().await { buf.push(chunk); } if buf.remaining() >= content_length { assert_eq!(buf.remaining(), content_length, "got excess bytes"); break Ok(buf); } resp = request .clone() .range(format!("bytes={}-", buf.remaining())) .send() .await?; assert_ne!(resp.content_range, None); } }