| author    | edef <edef@edef.eu>                              | 2024-01-17T16·04+0000 |
|-----------|--------------------------------------------------|-----------------------|
| committer | edef <edef@edef.eu>                              | 2024-01-27T18·23+0000 |
| commit    | 4f22203a3aecd070881ae9b4eabc47532d948f01 (patch)  |                       |
| tree      | 39e493d8d4fcf6362dd4e25be8bcb64fe0b0e25e /tvix/tools/crunch-v2/src/remote.rs | |
| parent    | e0a1c03b2471271fc96690b3dc5dd5423f93fa42 (diff)   |                       |
feat(tvix/tools/crunch-v2): init r/7452
This is a tool for ingesting subsets of cache.nixos.org into its own
flattened castore format. Currently, produced chunks are not preserved,
and this purely serves as a way of measuring compression/deduplication
ratios for various chunking and compression parameters.

Change-Id: I3983af02a66f7837d76874ee0fc8b2fab62ac17e
Reviewed-on: https://cl.tvl.fyi/c/depot/+/10486
Tested-by: BuildkiteCI
Reviewed-by: flokli <flokli@flokli.de>
Diffstat (limited to 'tvix/tools/crunch-v2/src/remote.rs')
-rw-r--r-- | tvix/tools/crunch-v2/src/remote.rs | 211 |
1 file changed, 211 insertions, 0 deletions
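The heart of the new module is the async `nar` entry point in the diff below, which hands back a blocking `Read` over the decompressed NAR. As a rough usage sketch (the `crunch_v2::remote` import path, the all-zero hash, and a `tokio`/`anyhow` dependency in the caller are all assumptions for illustration, not part of this commit):

```rust
use std::io::Read;

use crunch_v2::remote::nar; // assumed import path for the new module

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Placeholder hash for illustration; a real caller takes the file hash
    // (and the compression method) from a parsed narinfo.
    let file_hash = [0u8; 32];

    let mut reader = nar(file_hash, "xz").await?;

    // The returned reader drives the S3 stream via Handle::block_on, so it
    // must be consumed on a blocking thread, not on an async worker.
    let size = tokio::task::spawn_blocking(move || -> anyhow::Result<usize> {
        let mut buf = Vec::new();
        reader.read_to_end(&mut buf)?;
        Ok(buf.len())
    })
    .await??;

    println!("decompressed NAR is {size} bytes");
    Ok(())
}
```

The `spawn_blocking` hop matters: `FileStreamReader::fill_buf` (see the diff) calls `Handle::block_on`, which panics if invoked from within an async execution context.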
```diff
diff --git a/tvix/tools/crunch-v2/src/remote.rs b/tvix/tools/crunch-v2/src/remote.rs
new file mode 100644
index 000000000000..93952ecd737f
--- /dev/null
+++ b/tvix/tools/crunch-v2/src/remote.rs
@@ -0,0 +1,211 @@
+use std::{
+    cmp,
+    io::{self, BufRead, BufReader, Read},
+    pin::Pin,
+    task::{self, Poll},
+};
+
+use anyhow::{bail, Result};
+use bytes::{Buf, Bytes};
+use futures::{future::BoxFuture, Future, FutureExt, Stream, StreamExt};
+use lazy_static::lazy_static;
+use tokio::runtime::Handle;
+
+use nix_compat::nixbase32;
+
+use rusoto_core::{ByteStream, Region};
+use rusoto_s3::{GetObjectOutput, GetObjectRequest, S3Client, S3};
+
+use bzip2::read::BzDecoder;
+use xz2::read::XzDecoder;
+
+lazy_static! {
+    static ref S3_CLIENT: S3Client = S3Client::new(Region::UsEast1);
+}
+
+const BUCKET: &str = "nix-cache";
+
+pub async fn nar(
+    file_hash: [u8; 32],
+    compression: &str,
+) -> Result<Box<BufReader<dyn Read + Send>>> {
+    let (extension, decompress): (&'static str, fn(_) -> Box<_>) = match compression {
+        "bzip2" => ("bz2", decompress_bz2),
+        "xz" => ("xz", decompress_xz),
+        _ => bail!("unknown compression: {compression}"),
+    };
+
+    Ok(decompress(
+        FileStream::new(FileKey {
+            file_hash,
+            extension,
+        })
+        .await?
+        .into(),
+    ))
+}
+
+fn decompress_xz(reader: FileStreamReader) -> Box<BufReader<dyn Read + Send>> {
+    Box::new(BufReader::new(XzDecoder::new(reader)))
+}
+
+fn decompress_bz2(reader: FileStreamReader) -> Box<BufReader<dyn Read + Send>> {
+    Box::new(BufReader::new(BzDecoder::new(reader)))
+}
+
+struct FileStreamReader {
+    inner: FileStream,
+    buffer: Bytes,
+}
+
+impl From<FileStream> for FileStreamReader {
+    fn from(value: FileStream) -> Self {
+        FileStreamReader {
+            inner: value,
+            buffer: Bytes::new(),
+        }
+    }
+}
+
+impl Read for FileStreamReader {
+    fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
+        let src = self.fill_buf()?;
+        let n = cmp::min(src.len(), dst.len());
+        dst[..n].copy_from_slice(&src[..n]);
+        self.consume(n);
+        Ok(n)
+    }
+}
+
+impl BufRead for FileStreamReader {
+    fn fill_buf(&mut self) -> io::Result<&[u8]> {
+        if !self.buffer.is_empty() {
+            return Ok(&self.buffer);
+        }
+
+        self.buffer = Handle::current()
+            .block_on(self.inner.next())
+            .transpose()?
+            .unwrap_or_default();
+
+        Ok(&self.buffer)
+    }
+
+    fn consume(&mut self, cnt: usize) {
+        self.buffer.advance(cnt);
+    }
+}
+
+struct FileKey {
+    file_hash: [u8; 32],
+    extension: &'static str,
+}
+
+impl FileKey {
+    fn get(
+        &self,
+        offset: u64,
+        e_tag: Option<&str>,
+    ) -> impl Future<Output = io::Result<GetObjectOutput>> + Send + 'static {
+        let input = GetObjectRequest {
+            bucket: BUCKET.to_string(),
+            key: format!(
+                "nar/{}.nar.{}",
+                nixbase32::encode(&self.file_hash),
+                self.extension
+            ),
+            if_match: e_tag.map(str::to_owned),
+            range: Some(format!("bytes {}-", offset + 1)),
+            ..Default::default()
+        };
+
+        async {
+            S3_CLIENT
+                .get_object(input)
+                .await
+                .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
+        }
+    }
+}
+
+struct FileStream {
+    key: FileKey,
+    e_tag: String,
+    offset: u64,
+    length: u64,
+    inner: FileStreamState,
+}
+
+enum FileStreamState {
+    Response(BoxFuture<'static, io::Result<GetObjectOutput>>),
+    Body(ByteStream),
+    Eof,
+}
+
+impl FileStream {
+    pub async fn new(key: FileKey) -> io::Result<Self> {
+        let resp = key.get(0, None).await?;
+
+        Ok(FileStream {
+            key,
+            e_tag: resp.e_tag.unwrap(),
+            offset: 0,
+            length: resp.content_length.unwrap().try_into().unwrap(),
+            inner: FileStreamState::Body(resp.body.unwrap()),
+        })
+    }
+}
+
+macro_rules! poll {
+    ($expr:expr) => {
+        match $expr {
+            Poll::Pending => {
+                return Poll::Pending;
+            }
+            Poll::Ready(value) => value,
+        }
+    };
+}
+
+impl Stream for FileStream {
+    type Item = io::Result<Bytes>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Option<Self::Item>> {
+        let this = self.get_mut();
+
+        let chunk = loop {
+            match &mut this.inner {
+                FileStreamState::Response(resp) => match poll!(resp.poll_unpin(cx)) {
+                    Err(err) => {
+                        this.inner = FileStreamState::Eof;
+                        return Poll::Ready(Some(Err(err)));
+                    }
+                    Ok(resp) => {
+                        this.inner = FileStreamState::Body(resp.body.unwrap());
+                    }
+                },
+                FileStreamState::Body(body) => match poll!(body.poll_next_unpin(cx)) {
+                    None | Some(Err(_)) => {
+                        this.inner = FileStreamState::Response(
+                            this.key.get(this.offset, Some(&this.e_tag)).boxed(),
+                        );
+                    }
+                    Some(Ok(chunk)) => {
+                        break chunk;
+                    }
+                },
+                FileStreamState::Eof => {
+                    return Poll::Ready(None);
+                }
+            }
+        };
+
+        this.offset += chunk.len() as u64;
+
+        if this.offset >= this.length {
+            this.inner = FileStreamState::Eof;
+        }
+
+        Poll::Ready(Some(Ok(chunk)))
+    }
+}
```