From bdda10a2f54974053b1d78b801469f076e90a6da Mon Sep 17 00:00:00 2001 From: edef Date: Sat, 4 Nov 2023 10:09:20 +0000 Subject: feat(tvix/tools/turbofetch): init Change-Id: I2efa6f94f57e812c52371256a4e62d1d54ff5057 Reviewed-on: https://cl.tvl.fyi/c/depot/+/9925 Reviewed-by: flokli Tested-by: BuildkiteCI --- tvix/tools/turbofetch/src/buffer.rs | 87 ++++++++++++++ tvix/tools/turbofetch/src/lib.rs | 103 +++++++++++++++++ tvix/tools/turbofetch/src/main.rs | 220 ++++++++++++++++++++++++++++++++++++ 3 files changed, 410 insertions(+) create mode 100644 tvix/tools/turbofetch/src/buffer.rs create mode 100644 tvix/tools/turbofetch/src/lib.rs create mode 100644 tvix/tools/turbofetch/src/main.rs (limited to 'tvix/tools/turbofetch/src') diff --git a/tvix/tools/turbofetch/src/buffer.rs b/tvix/tools/turbofetch/src/buffer.rs new file mode 100644 index 000000000000..13207f562184 --- /dev/null +++ b/tvix/tools/turbofetch/src/buffer.rs @@ -0,0 +1,87 @@ +use magic_buffer::MagicBuffer; +use std::cell::Cell; + +/// Buffer is a FIFO queue for bytes, built on a ring buffer. +/// It always provides contiguous slices for both the readable and writable parts, +/// using an underlying buffer that is "mirrored" in virtual memory. +pub struct Buffer { + buffer: MagicBuffer, + /// first readable byte + head: Cell, + /// first writable byte + tail: usize, +} + +// SAFETY: MagicBuffer isn't bound to a thread, and neither are any of the other fields. +// MagicBuffer ought to be Send+Sync itself, upstream PR at https://github.com/sklose/magic-buffer/pull/4 +unsafe impl Send for Buffer {} + +impl Buffer { + /// Allocate a fresh buffer, with the specified capacity. + /// The buffer can contain at most `capacity - 1` bytes. + /// The capacity must be a power of two, and at least [Buffer::min_len]. + pub fn new(capacity: usize) -> Buffer { + Buffer { + // MagicBuffer::new verifies that `capacity` is a power of two, + // and at least MagicBuffer::min_len(). + buffer: MagicBuffer::new(capacity).unwrap(), + // `head == tail` means the buffer is empty. + // In order to ensure that this remains unambiguous, + // the buffer can only be filled with capacity-1 bytes. + head: Cell::new(0), + tail: 0, + } + } + + /// Returns the minimum buffer capacity. + /// This depends on the operating system and architecture. + pub fn min_capacity() -> usize { + MagicBuffer::min_len() + } + + /// Return the capacity of the buffer. + /// This is equal to `self.data().len() + self.space().len() + 1`. + pub fn capacity(&self) -> usize { + self.buffer.len() + } + + /// Return the valid, readable data in the buffer. + pub fn data(&self) -> &[u8] { + let len = self.buffer.len(); + let head = self.head.get(); + + if head <= self.tail { + &self.buffer[head..self.tail] + } else { + &self.buffer[head..self.tail + len] + } + } + + /// Mark `read_len` bytes of the readable data as consumed, freeing the space. + pub fn consume(&self, read_len: usize) { + debug_assert!(read_len <= self.data().len()); + let mut head = self.head.get(); + head += read_len; + head &= self.buffer.len() - 1; + self.head.set(head); + } + + /// Return the empty, writable space in the buffer. + pub fn space(&mut self) -> &mut [u8] { + let len = self.buffer.len(); + let head = self.head.get(); + + if head <= self.tail { + &mut self.buffer[self.tail..head + len - 1] + } else { + &mut self.buffer[self.tail..head - 1] + } + } + + /// Mark `written_len` bytes of the writable space as valid, readable data. + pub fn commit(&mut self, written_len: usize) { + debug_assert!(written_len <= self.space().len()); + self.tail += written_len; + self.tail &= self.buffer.len() - 1; + } +} diff --git a/tvix/tools/turbofetch/src/lib.rs b/tvix/tools/turbofetch/src/lib.rs new file mode 100644 index 000000000000..4b62fa4d75e7 --- /dev/null +++ b/tvix/tools/turbofetch/src/lib.rs @@ -0,0 +1,103 @@ +use std::{mem::MaybeUninit, str}; +use tokio::io::{self, AsyncRead, AsyncReadExt}; + +pub use buffer::Buffer; +mod buffer; + +/// Read as much data into `buffer` as possible. +/// Returns [io::ErrorKind::OutOfMemory] if the buffer is already full. +async fn slurp(buffer: &mut Buffer, sock: &mut (impl AsyncRead + Unpin)) -> io::Result<()> { + match buffer.space() { + [] => Err(io::Error::new(io::ErrorKind::OutOfMemory, "buffer filled")), + buf => { + let n = sock.read(buf).await?; + if n == 0 { + return Err(io::ErrorKind::UnexpectedEof.into()); + } + buffer.commit(n); + + Ok(()) + } + } +} + +fn get_content_length(headers: &[httparse::Header]) -> io::Result { + for header in headers { + if header.name == "Transfer-Encoding" { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "Transfer-Encoding is unsupported", + )); + } + + if header.name == "Content-Length" { + return str::from_utf8(header.value) + .ok() + .and_then(|v| v.parse().ok()) + .ok_or_else(|| { + io::Error::new(io::ErrorKind::InvalidData, "invalid Content-Length") + }); + } + } + + Err(io::Error::new( + io::ErrorKind::InvalidData, + "Content-Length missing", + )) +} + +/// Read an HTTP response from `sock` using `buffer`, returning the response body. +/// Returns an error if anything but 200 OK is received. +/// +/// The buffer must have enough space to contain the entire response body. +/// If there is not enough space, [io::ErrorKind::OutOfMemory] is returned. +/// +/// The HTTP response must use `Content-Length`, without `Transfer-Encoding`. +pub async fn parse_response<'a>( + sock: &mut (impl AsyncRead + Unpin), + buffer: &'a mut Buffer, +) -> io::Result<&'a [u8]> { + let body_len = loop { + let mut headers = [MaybeUninit::uninit(); 16]; + let mut response = httparse::Response::new(&mut []); + let status = httparse::ParserConfig::default() + .parse_response_with_uninit_headers(&mut response, buffer.data(), &mut headers) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; + + if let httparse::Status::Complete(n) = status { + buffer.consume(n); + + let code = response.code.unwrap(); + if code != 200 { + return Err(io::Error::new( + io::ErrorKind::Other, + format!("HTTP response {code}"), + )); + } + + break get_content_length(response.headers)?; + } + + slurp(buffer, sock).await?; + }; + + let buf_len = buffer.space().len() + buffer.data().len(); + + if body_len > buf_len as u64 { + return Err(io::Error::new( + io::ErrorKind::OutOfMemory, + "HTTP response body does not fit in buffer", + )); + } + + let body_len = body_len as usize; + + while buffer.data().len() < body_len { + slurp(buffer, sock).await?; + } + + let data = buffer.data(); + buffer.consume(body_len); + + Ok(&data[..body_len]) +} diff --git a/tvix/tools/turbofetch/src/main.rs b/tvix/tools/turbofetch/src/main.rs new file mode 100644 index 000000000000..4b3a50eb3941 --- /dev/null +++ b/tvix/tools/turbofetch/src/main.rs @@ -0,0 +1,220 @@ +//! turbofetch is a high-performance bulk S3 object aggregator. +//! +//! It operates on two S3 buckets: a source bucket (nix-cache), and a +//! work bucket defined at runtime. The work bucket contains a job file +//! consisting of concatenated 32-character keys, representing narinfo +//! files in the source bucket, without the `.narinfo` suffix or any +//! other separators. +//! +//! Each run of turbofetch processes a half-open range of indices from the +//! job file, and outputs a zstd stream of concatenated objects, without +//! additional separators and in no particular order. These segment files +//! are written into the work bucket, named for the range of indices they +//! cover. `/narinfo.zst/000000000c380d40-000000000c385b60` covers the 20k +//! objects `[0xc380d40, 0xc385b60) = [205000000, 205020000)`. Empirically, +//! segment files of 20k objects achieve a compression ratio of 4.7x. +//! +//! Reassembly is left to narinfo2parquet, which interprets StorePath lines. +//! +//! TODO(edef): any retries/error handling whatsoever +//! Currently, it fails an entire range if anything goes wrong, and doesn't +//! write any output. + +use bytes::Bytes; +use futures::{stream::FuturesUnordered, Stream, TryStreamExt}; +use rusoto_core::ByteStream; +use rusoto_s3::{GetObjectRequest, PutObjectRequest, S3Client, S3}; +use serde::Deserialize; +use std::{io::Write, mem, ops::Range, ptr}; +use tokio::{ + io::{self, AsyncReadExt, AsyncWriteExt}, + net::TcpStream, +}; + +/// Fetch a group of keys, streaming concatenated chunks as they arrive from S3. +/// `keys` must be a slice from the job file. Any network error at all fails the +/// entire batch, and there is no rate limiting. +fn fetch(keys: &[[u8; 32]]) -> impl Stream> { + // S3 supports only HTTP/1.1, but we can ease the pain somewhat by using + // HTTP pipelining. It terminates the TCP connection after receiving 100 + // requests, so we chunk the keys up accordingly, and make one connection + // for each chunk. + keys.chunks(100) + .map(|chunk| { + const PREFIX: &[u8] = b"GET /nix-cache/"; + const SUFFIX: &[u8] = b".narinfo HTTP/1.1\nHost: s3.amazonaws.com\n\n"; + const LENGTH: usize = PREFIX.len() + 32 + SUFFIX.len(); + + let mut request = Vec::with_capacity(LENGTH * 100); + for key in chunk { + request.extend_from_slice(PREFIX); + request.extend_from_slice(key); + request.extend_from_slice(SUFFIX); + } + + (request, chunk.len()) + }) + .map(|(request, n)| async move { + let (mut read, mut write) = TcpStream::connect("s3.amazonaws.com:80") + .await? + .into_split(); + + let _handle = tokio::spawn(async move { + let request = request; + write.write_all(&request).await + }); + + let mut buffer = turbofetch::Buffer::new(512 * 1024); + let mut bodies = vec![]; + + for _ in 0..n { + let body = turbofetch::parse_response(&mut read, &mut buffer).await?; + bodies.extend_from_slice(body); + } + + Ok::<_, io::Error>(Bytes::from(bodies)) + }) + .collect::>() +} + +/// Retrieve a range of keys from the job file. +async fn get_range( + s3: &'static S3Client, + bucket: String, + key: String, + range: Range, +) -> io::Result> { + let resp = s3 + .get_object(GetObjectRequest { + bucket, + key, + range: Some(format!("bytes={}-{}", range.start * 32, range.end * 32 - 1)), + ..GetObjectRequest::default() + }) + .await + .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + + let mut body = vec![]; + resp.body + .ok_or(io::ErrorKind::InvalidData)? + .into_async_read() + .read_to_end(&mut body) + .await?; + + let body = exact_chunks(body.into_boxed_slice()).ok_or(io::ErrorKind::InvalidData)?; + + Ok(body) +} + +fn exact_chunks(mut buf: Box<[u8]>) -> Option> { + // SAFETY: We ensure that `buf.len()` is a multiple of 32, and there are no alignment requirements. + unsafe { + let ptr = buf.as_mut_ptr(); + let len = buf.len(); + + if len % 32 != 0 { + return None; + } + + let ptr = ptr as *mut [u8; 32]; + let len = len / 32; + mem::forget(buf); + + Some(Box::from_raw(ptr::slice_from_raw_parts_mut(ptr, len))) + } +} + +// TODO(edef): factor this out into a separate entry point +#[tokio::main(flavor = "current_thread")] +async fn main() -> Result<(), lambda_runtime::Error> { + let s3 = S3Client::new(rusoto_core::Region::UsEast1); + let s3 = &*Box::leak(Box::new(s3)); + + tracing_subscriber::fmt() + .json() + .with_max_level(tracing::Level::INFO) + // this needs to be set to remove duplicated information in the log. + .with_current_span(false) + // this needs to be set to false, otherwise ANSI color codes will + // show up in a confusing manner in CloudWatch logs. + .with_ansi(false) + // disabling time is handy because CloudWatch will add the ingestion time. + .without_time() + // remove the name of the function from every log entry + .with_target(false) + .init(); + + lambda_runtime::run(lambda_runtime::service_fn(|event| func(s3, event))).await +} + +/// Lambda request body +#[derive(Debug, Deserialize)] +struct Params { + work_bucket: String, + job_file: String, + start: u64, + end: u64, +} + +#[tracing::instrument(skip(s3, event), fields(req_id = %event.context.request_id))] +async fn func( + s3: &'static S3Client, + event: lambda_runtime::LambdaEvent< + aws_lambda_events::lambda_function_urls::LambdaFunctionUrlRequest, + >, +) -> Result<&'static str, lambda_runtime::Error> { + let mut params = event.payload.body.ok_or("no body")?; + + if event.payload.is_base64_encoded { + params = String::from_utf8(data_encoding::BASE64.decode(params.as_bytes())?)?; + } + + let params: Params = serde_json::from_str(¶ms)?; + + if params.start >= params.end { + return Err("nope".into()); + } + + let keys = get_range( + s3, + params.work_bucket.clone(), + params.job_file.to_owned(), + params.start..params.end, + ) + .await?; + + let zchunks = fetch(&keys) + .try_fold( + Box::new(zstd::Encoder::new(vec![], zstd::DEFAULT_COMPRESSION_LEVEL).unwrap()), + |mut w, buf| { + w.write_all(&buf).unwrap(); + async { Ok(w) } + }, + ) + .await?; + + let zchunks = to_byte_stream(zchunks.finish().unwrap()); + + tracing::info!("we got to put_object"); + + s3.put_object(PutObjectRequest { + bucket: params.work_bucket, + key: format!("narinfo.zst/{:016x}-{:016x}", params.start, params.end), + body: Some(zchunks), + ..Default::default() + }) + .await + .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + + tracing::info!("… and it worked!"); + + Ok("OK") +} + +fn to_byte_stream(buffer: Vec) -> ByteStream { + let size_hint = buffer.len(); + ByteStream::new_with_size( + futures::stream::once(async { Ok(buffer.into()) }), + size_hint, + ) +} -- cgit 1.4.1