diff options
Diffstat (limited to 'tvix/tools/turbofetch/src')
-rw-r--r-- | tvix/tools/turbofetch/src/buffer.rs | 83 | ||||
-rw-r--r-- | tvix/tools/turbofetch/src/lib.rs | 103 | ||||
-rw-r--r-- | tvix/tools/turbofetch/src/main.rs | 220 |
3 files changed, 0 insertions, 406 deletions
diff --git a/tvix/tools/turbofetch/src/buffer.rs b/tvix/tools/turbofetch/src/buffer.rs deleted file mode 100644 index d6ff93e3cfe7..000000000000 --- a/tvix/tools/turbofetch/src/buffer.rs +++ /dev/null @@ -1,83 +0,0 @@ -use magic_buffer::MagicBuffer; -use std::cell::Cell; - -/// Buffer is a FIFO queue for bytes, built on a ring buffer. -/// It always provides contiguous slices for both the readable and writable parts, -/// using an underlying buffer that is "mirrored" in virtual memory. -pub struct Buffer { - buffer: MagicBuffer, - /// first readable byte - head: Cell<usize>, - /// first writable byte - tail: usize, -} - -impl Buffer { - /// Allocate a fresh buffer, with the specified capacity. - /// The buffer can contain at most `capacity - 1` bytes. - /// The capacity must be a power of two, and at least [Buffer::min_len]. - pub fn new(capacity: usize) -> Buffer { - Buffer { - // MagicBuffer::new verifies that `capacity` is a power of two, - // and at least MagicBuffer::min_len(). - buffer: MagicBuffer::new(capacity).unwrap(), - // `head == tail` means the buffer is empty. - // In order to ensure that this remains unambiguous, - // the buffer can only be filled with capacity-1 bytes. - head: Cell::new(0), - tail: 0, - } - } - - /// Returns the minimum buffer capacity. - /// This depends on the operating system and architecture. - pub fn min_capacity() -> usize { - MagicBuffer::min_len() - } - - /// Return the capacity of the buffer. - /// This is equal to `self.data().len() + self.space().len() + 1`. - pub fn capacity(&self) -> usize { - self.buffer.len() - } - - /// Return the valid, readable data in the buffer. - pub fn data(&self) -> &[u8] { - let len = self.buffer.len(); - let head = self.head.get(); - - if head <= self.tail { - &self.buffer[head..self.tail] - } else { - &self.buffer[head..self.tail + len] - } - } - - /// Mark `read_len` bytes of the readable data as consumed, freeing the space. - pub fn consume(&self, read_len: usize) { - debug_assert!(read_len <= self.data().len()); - let mut head = self.head.get(); - head += read_len; - head &= self.buffer.len() - 1; - self.head.set(head); - } - - /// Return the empty, writable space in the buffer. - pub fn space(&mut self) -> &mut [u8] { - let len = self.buffer.len(); - let head = self.head.get(); - - if head <= self.tail { - &mut self.buffer[self.tail..head + len - 1] - } else { - &mut self.buffer[self.tail..head - 1] - } - } - - /// Mark `written_len` bytes of the writable space as valid, readable data. - pub fn commit(&mut self, written_len: usize) { - debug_assert!(written_len <= self.space().len()); - self.tail += written_len; - self.tail &= self.buffer.len() - 1; - } -} diff --git a/tvix/tools/turbofetch/src/lib.rs b/tvix/tools/turbofetch/src/lib.rs deleted file mode 100644 index 4b62fa4d75e7..000000000000 --- a/tvix/tools/turbofetch/src/lib.rs +++ /dev/null @@ -1,103 +0,0 @@ -use std::{mem::MaybeUninit, str}; -use tokio::io::{self, AsyncRead, AsyncReadExt}; - -pub use buffer::Buffer; -mod buffer; - -/// Read as much data into `buffer` as possible. -/// Returns [io::ErrorKind::OutOfMemory] if the buffer is already full. -async fn slurp(buffer: &mut Buffer, sock: &mut (impl AsyncRead + Unpin)) -> io::Result<()> { - match buffer.space() { - [] => Err(io::Error::new(io::ErrorKind::OutOfMemory, "buffer filled")), - buf => { - let n = sock.read(buf).await?; - if n == 0 { - return Err(io::ErrorKind::UnexpectedEof.into()); - } - buffer.commit(n); - - Ok(()) - } - } -} - -fn get_content_length(headers: &[httparse::Header]) -> io::Result<u64> { - for header in headers { - if header.name == "Transfer-Encoding" { - return Err(io::Error::new( - io::ErrorKind::InvalidData, - "Transfer-Encoding is unsupported", - )); - } - - if header.name == "Content-Length" { - return str::from_utf8(header.value) - .ok() - .and_then(|v| v.parse().ok()) - .ok_or_else(|| { - io::Error::new(io::ErrorKind::InvalidData, "invalid Content-Length") - }); - } - } - - Err(io::Error::new( - io::ErrorKind::InvalidData, - "Content-Length missing", - )) -} - -/// Read an HTTP response from `sock` using `buffer`, returning the response body. -/// Returns an error if anything but 200 OK is received. -/// -/// The buffer must have enough space to contain the entire response body. -/// If there is not enough space, [io::ErrorKind::OutOfMemory] is returned. -/// -/// The HTTP response must use `Content-Length`, without `Transfer-Encoding`. -pub async fn parse_response<'a>( - sock: &mut (impl AsyncRead + Unpin), - buffer: &'a mut Buffer, -) -> io::Result<&'a [u8]> { - let body_len = loop { - let mut headers = [MaybeUninit::uninit(); 16]; - let mut response = httparse::Response::new(&mut []); - let status = httparse::ParserConfig::default() - .parse_response_with_uninit_headers(&mut response, buffer.data(), &mut headers) - .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; - - if let httparse::Status::Complete(n) = status { - buffer.consume(n); - - let code = response.code.unwrap(); - if code != 200 { - return Err(io::Error::new( - io::ErrorKind::Other, - format!("HTTP response {code}"), - )); - } - - break get_content_length(response.headers)?; - } - - slurp(buffer, sock).await?; - }; - - let buf_len = buffer.space().len() + buffer.data().len(); - - if body_len > buf_len as u64 { - return Err(io::Error::new( - io::ErrorKind::OutOfMemory, - "HTTP response body does not fit in buffer", - )); - } - - let body_len = body_len as usize; - - while buffer.data().len() < body_len { - slurp(buffer, sock).await?; - } - - let data = buffer.data(); - buffer.consume(body_len); - - Ok(&data[..body_len]) -} diff --git a/tvix/tools/turbofetch/src/main.rs b/tvix/tools/turbofetch/src/main.rs deleted file mode 100644 index 4b3a50eb3941..000000000000 --- a/tvix/tools/turbofetch/src/main.rs +++ /dev/null @@ -1,220 +0,0 @@ -//! turbofetch is a high-performance bulk S3 object aggregator. -//! -//! It operates on two S3 buckets: a source bucket (nix-cache), and a -//! work bucket defined at runtime. The work bucket contains a job file -//! consisting of concatenated 32-character keys, representing narinfo -//! files in the source bucket, without the `.narinfo` suffix or any -//! other separators. -//! -//! Each run of turbofetch processes a half-open range of indices from the -//! job file, and outputs a zstd stream of concatenated objects, without -//! additional separators and in no particular order. These segment files -//! are written into the work bucket, named for the range of indices they -//! cover. `/narinfo.zst/000000000c380d40-000000000c385b60` covers the 20k -//! objects `[0xc380d40, 0xc385b60) = [205000000, 205020000)`. Empirically, -//! segment files of 20k objects achieve a compression ratio of 4.7x. -//! -//! Reassembly is left to narinfo2parquet, which interprets StorePath lines. -//! -//! TODO(edef): any retries/error handling whatsoever -//! Currently, it fails an entire range if anything goes wrong, and doesn't -//! write any output. - -use bytes::Bytes; -use futures::{stream::FuturesUnordered, Stream, TryStreamExt}; -use rusoto_core::ByteStream; -use rusoto_s3::{GetObjectRequest, PutObjectRequest, S3Client, S3}; -use serde::Deserialize; -use std::{io::Write, mem, ops::Range, ptr}; -use tokio::{ - io::{self, AsyncReadExt, AsyncWriteExt}, - net::TcpStream, -}; - -/// Fetch a group of keys, streaming concatenated chunks as they arrive from S3. -/// `keys` must be a slice from the job file. Any network error at all fails the -/// entire batch, and there is no rate limiting. -fn fetch(keys: &[[u8; 32]]) -> impl Stream<Item = io::Result<Bytes>> { - // S3 supports only HTTP/1.1, but we can ease the pain somewhat by using - // HTTP pipelining. It terminates the TCP connection after receiving 100 - // requests, so we chunk the keys up accordingly, and make one connection - // for each chunk. - keys.chunks(100) - .map(|chunk| { - const PREFIX: &[u8] = b"GET /nix-cache/"; - const SUFFIX: &[u8] = b".narinfo HTTP/1.1\nHost: s3.amazonaws.com\n\n"; - const LENGTH: usize = PREFIX.len() + 32 + SUFFIX.len(); - - let mut request = Vec::with_capacity(LENGTH * 100); - for key in chunk { - request.extend_from_slice(PREFIX); - request.extend_from_slice(key); - request.extend_from_slice(SUFFIX); - } - - (request, chunk.len()) - }) - .map(|(request, n)| async move { - let (mut read, mut write) = TcpStream::connect("s3.amazonaws.com:80") - .await? - .into_split(); - - let _handle = tokio::spawn(async move { - let request = request; - write.write_all(&request).await - }); - - let mut buffer = turbofetch::Buffer::new(512 * 1024); - let mut bodies = vec![]; - - for _ in 0..n { - let body = turbofetch::parse_response(&mut read, &mut buffer).await?; - bodies.extend_from_slice(body); - } - - Ok::<_, io::Error>(Bytes::from(bodies)) - }) - .collect::<FuturesUnordered<_>>() -} - -/// Retrieve a range of keys from the job file. -async fn get_range( - s3: &'static S3Client, - bucket: String, - key: String, - range: Range<u64>, -) -> io::Result<Box<[[u8; 32]]>> { - let resp = s3 - .get_object(GetObjectRequest { - bucket, - key, - range: Some(format!("bytes={}-{}", range.start * 32, range.end * 32 - 1)), - ..GetObjectRequest::default() - }) - .await - .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; - - let mut body = vec![]; - resp.body - .ok_or(io::ErrorKind::InvalidData)? - .into_async_read() - .read_to_end(&mut body) - .await?; - - let body = exact_chunks(body.into_boxed_slice()).ok_or(io::ErrorKind::InvalidData)?; - - Ok(body) -} - -fn exact_chunks(mut buf: Box<[u8]>) -> Option<Box<[[u8; 32]]>> { - // SAFETY: We ensure that `buf.len()` is a multiple of 32, and there are no alignment requirements. - unsafe { - let ptr = buf.as_mut_ptr(); - let len = buf.len(); - - if len % 32 != 0 { - return None; - } - - let ptr = ptr as *mut [u8; 32]; - let len = len / 32; - mem::forget(buf); - - Some(Box::from_raw(ptr::slice_from_raw_parts_mut(ptr, len))) - } -} - -// TODO(edef): factor this out into a separate entry point -#[tokio::main(flavor = "current_thread")] -async fn main() -> Result<(), lambda_runtime::Error> { - let s3 = S3Client::new(rusoto_core::Region::UsEast1); - let s3 = &*Box::leak(Box::new(s3)); - - tracing_subscriber::fmt() - .json() - .with_max_level(tracing::Level::INFO) - // this needs to be set to remove duplicated information in the log. - .with_current_span(false) - // this needs to be set to false, otherwise ANSI color codes will - // show up in a confusing manner in CloudWatch logs. - .with_ansi(false) - // disabling time is handy because CloudWatch will add the ingestion time. - .without_time() - // remove the name of the function from every log entry - .with_target(false) - .init(); - - lambda_runtime::run(lambda_runtime::service_fn(|event| func(s3, event))).await -} - -/// Lambda request body -#[derive(Debug, Deserialize)] -struct Params { - work_bucket: String, - job_file: String, - start: u64, - end: u64, -} - -#[tracing::instrument(skip(s3, event), fields(req_id = %event.context.request_id))] -async fn func( - s3: &'static S3Client, - event: lambda_runtime::LambdaEvent< - aws_lambda_events::lambda_function_urls::LambdaFunctionUrlRequest, - >, -) -> Result<&'static str, lambda_runtime::Error> { - let mut params = event.payload.body.ok_or("no body")?; - - if event.payload.is_base64_encoded { - params = String::from_utf8(data_encoding::BASE64.decode(params.as_bytes())?)?; - } - - let params: Params = serde_json::from_str(¶ms)?; - - if params.start >= params.end { - return Err("nope".into()); - } - - let keys = get_range( - s3, - params.work_bucket.clone(), - params.job_file.to_owned(), - params.start..params.end, - ) - .await?; - - let zchunks = fetch(&keys) - .try_fold( - Box::new(zstd::Encoder::new(vec![], zstd::DEFAULT_COMPRESSION_LEVEL).unwrap()), - |mut w, buf| { - w.write_all(&buf).unwrap(); - async { Ok(w) } - }, - ) - .await?; - - let zchunks = to_byte_stream(zchunks.finish().unwrap()); - - tracing::info!("we got to put_object"); - - s3.put_object(PutObjectRequest { - bucket: params.work_bucket, - key: format!("narinfo.zst/{:016x}-{:016x}", params.start, params.end), - body: Some(zchunks), - ..Default::default() - }) - .await - .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; - - tracing::info!("… and it worked!"); - - Ok("OK") -} - -fn to_byte_stream(buffer: Vec<u8>) -> ByteStream { - let size_hint = buffer.len(); - ByteStream::new_with_size( - futures::stream::once(async { Ok(buffer.into()) }), - size_hint, - ) -} |