Diffstat (limited to 'tvix/tools/turbofetch/src')
-rw-r--r--  tvix/tools/turbofetch/src/buffer.rs |  83
-rw-r--r--  tvix/tools/turbofetch/src/lib.rs    | 103
-rw-r--r--  tvix/tools/turbofetch/src/main.rs   | 220
3 files changed, 0 insertions(+), 406 deletions(-)
diff --git a/tvix/tools/turbofetch/src/buffer.rs b/tvix/tools/turbofetch/src/buffer.rs
deleted file mode 100644
index d6ff93e3cfe7..000000000000
--- a/tvix/tools/turbofetch/src/buffer.rs
+++ /dev/null
@@ -1,83 +0,0 @@
-use magic_buffer::MagicBuffer;
-use std::cell::Cell;
-
-/// Buffer is a FIFO queue for bytes, built on a ring buffer.
-/// It always provides contiguous slices for both the readable and writable parts,
-/// using an underlying buffer that is "mirrored" in virtual memory.
-pub struct Buffer {
-    buffer: MagicBuffer,
-    /// first readable byte
-    head: Cell<usize>,
-    /// first writable byte
-    tail: usize,
-}
-
-impl Buffer {
-    /// Allocate a fresh buffer, with the specified capacity.
-    /// The buffer can contain at most `capacity - 1` bytes.
-    /// The capacity must be a power of two, and at least [Buffer::min_capacity].
-    pub fn new(capacity: usize) -> Buffer {
-        Buffer {
-            // MagicBuffer::new verifies that `capacity` is a power of two,
-            // and at least MagicBuffer::min_len().
-            buffer: MagicBuffer::new(capacity).unwrap(),
-            // `head == tail` means the buffer is empty.
-            // In order to ensure that this remains unambiguous,
-            // the buffer can only be filled with capacity-1 bytes.
-            head: Cell::new(0),
-            tail: 0,
-        }
-    }
-
-    /// Returns the minimum buffer capacity.
-    /// This depends on the operating system and architecture.
-    pub fn min_capacity() -> usize {
-        MagicBuffer::min_len()
-    }
-
-    /// Return the capacity of the buffer.
-    /// This is equal to `self.data().len() + self.space().len() + 1`.
-    pub fn capacity(&self) -> usize {
-        self.buffer.len()
-    }
-
-    /// Return the valid, readable data in the buffer.
-    pub fn data(&self) -> &[u8] {
-        let len = self.buffer.len();
-        let head = self.head.get();
-
-        if head <= self.tail {
-            &self.buffer[head..self.tail]
-        } else {
-            &self.buffer[head..self.tail + len]
-        }
-    }
-
-    /// Mark `read_len` bytes of the readable data as consumed, freeing the space.
-    pub fn consume(&self, read_len: usize) {
-        debug_assert!(read_len <= self.data().len());
-        let mut head = self.head.get();
-        head += read_len;
-        head &= self.buffer.len() - 1;
-        self.head.set(head);
-    }
-
-    /// Return the empty, writable space in the buffer.
-    pub fn space(&mut self) -> &mut [u8] {
-        let len = self.buffer.len();
-        let head = self.head.get();
-
-        if head <= self.tail {
-            &mut self.buffer[self.tail..head + len - 1]
-        } else {
-            &mut self.buffer[self.tail..head - 1]
-        }
-    }
-
-    /// Mark `written_len` bytes of the writable space as valid, readable data.
-    pub fn commit(&mut self, written_len: usize) {
-        debug_assert!(written_len <= self.space().len());
-        self.tail += written_len;
-        self.tail &= self.buffer.len() - 1;
-    }
-}
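
The Buffer deleted above is a mirrored ring buffer: MagicBuffer maps the same memory twice, back to back, so `data()` and `space()` can always return contiguous slices even when the contents wrap around the end of the allocation. A minimal sketch of the call pattern, not part of the deleted code, assuming 64 KiB satisfies `Buffer::min_capacity()` on the target platform:

    use turbofetch::Buffer;

    fn roundtrip() {
        // Capacity must be a power of two; 64 KiB is assumed to be
        // at least Buffer::min_capacity() here.
        let mut buf = Buffer::new(64 * 1024);

        // Write into the contiguous writable slice, then publish it.
        buf.space()[..5].copy_from_slice(b"hello");
        buf.commit(5);

        // Read the contiguous readable slice, then mark it consumed.
        assert_eq!(buf.data(), b"hello");
        buf.consume(5);
        assert!(buf.data().is_empty());
    }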
diff --git a/tvix/tools/turbofetch/src/lib.rs b/tvix/tools/turbofetch/src/lib.rs
deleted file mode 100644
index 4b62fa4d75e7..000000000000
--- a/tvix/tools/turbofetch/src/lib.rs
+++ /dev/null
@@ -1,103 +0,0 @@
-use std::{mem::MaybeUninit, str};
-use tokio::io::{self, AsyncRead, AsyncReadExt};
-
-pub use buffer::Buffer;
-mod buffer;
-
-/// Read more data from `sock` into `buffer`, using a single `read` call.
-/// Returns [io::ErrorKind::OutOfMemory] if the buffer is already full.
-async fn slurp(buffer: &mut Buffer, sock: &mut (impl AsyncRead + Unpin)) -> io::Result<()> {
-    match buffer.space() {
-        [] => Err(io::Error::new(io::ErrorKind::OutOfMemory, "buffer filled")),
-        buf => {
-            let n = sock.read(buf).await?;
-            if n == 0 {
-                return Err(io::ErrorKind::UnexpectedEof.into());
-            }
-            buffer.commit(n);
-
-            Ok(())
-        }
-    }
-}
-
-fn get_content_length(headers: &[httparse::Header]) -> io::Result<u64> {
-    for header in headers {
-        if header.name == "Transfer-Encoding" {
-            return Err(io::Error::new(
-                io::ErrorKind::InvalidData,
-                "Transfer-Encoding is unsupported",
-            ));
-        }
-
-        if header.name == "Content-Length" {
-            return str::from_utf8(header.value)
-                .ok()
-                .and_then(|v| v.parse().ok())
-                .ok_or_else(|| {
-                    io::Error::new(io::ErrorKind::InvalidData, "invalid Content-Length")
-                });
-        }
-    }
-
-    Err(io::Error::new(
-        io::ErrorKind::InvalidData,
-        "Content-Length missing",
-    ))
-}
-
-/// Read an HTTP response from `sock` using `buffer`, returning the response body.
-/// Returns an error if anything but 200 OK is received.
-///
-/// The buffer must have enough space to contain the entire response body.
-/// If there is not enough space, [io::ErrorKind::OutOfMemory] is returned.
-///
-/// The HTTP response must use `Content-Length`, without `Transfer-Encoding`.
-pub async fn parse_response<'a>(
-    sock: &mut (impl AsyncRead + Unpin),
-    buffer: &'a mut Buffer,
-) -> io::Result<&'a [u8]> {
-    let body_len = loop {
-        let mut headers = [MaybeUninit::uninit(); 16];
-        let mut response = httparse::Response::new(&mut []);
-        let status = httparse::ParserConfig::default()
-            .parse_response_with_uninit_headers(&mut response, buffer.data(), &mut headers)
-            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
-
-        if let httparse::Status::Complete(n) = status {
-            buffer.consume(n);
-
-            let code = response.code.unwrap();
-            if code != 200 {
-                return Err(io::Error::new(
-                    io::ErrorKind::Other,
-                    format!("HTTP response {code}"),
-                ));
-            }
-
-            break get_content_length(response.headers)?;
-        }
-
-        slurp(buffer, sock).await?;
-    };
-
-    let buf_len = buffer.space().len() + buffer.data().len();
-
-    if body_len > buf_len as u64 {
-        return Err(io::Error::new(
-            io::ErrorKind::OutOfMemory,
-            "HTTP response body does not fit in buffer",
-        ));
-    }
-
-    let body_len = body_len as usize;
-
-    while buffer.data().len() < body_len {
-        slurp(buffer, sock).await?;
-    }
-
-    let data = buffer.data();
-    buffer.consume(body_len);
-
-    Ok(&data[..body_len])
-}
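
parse_response above consumes exactly one response per call and returns the body as a slice borrowed from the buffer; that is what makes the HTTP/1.1 pipelining in main.rs cheap, since a single buffer is reused for every response on a connection. A sketch of that pattern, with `read_bodies` as an illustrative helper not taken from the deleted code:

    use tokio::io::AsyncRead;
    use turbofetch::{parse_response, Buffer};

    /// Drain `n` pipelined responses from one connection, reusing a
    /// single buffer, and concatenate the bodies.
    async fn read_bodies(
        sock: &mut (impl AsyncRead + Unpin),
        n: usize,
    ) -> std::io::Result<Vec<u8>> {
        let mut buffer = Buffer::new(512 * 1024);
        let mut out = Vec::new();
        for _ in 0..n {
            let body = parse_response(sock, &mut buffer).await?;
            out.extend_from_slice(body);
        }
        Ok(out)
    }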
diff --git a/tvix/tools/turbofetch/src/main.rs b/tvix/tools/turbofetch/src/main.rs
deleted file mode 100644
index 4b3a50eb3941..000000000000
--- a/tvix/tools/turbofetch/src/main.rs
+++ /dev/null
@@ -1,220 +0,0 @@
-//! turbofetch is a high-performance bulk S3 object aggregator.
-//!
-//! It operates on two S3 buckets: a source bucket (nix-cache), and a
-//! work bucket defined at runtime. The work bucket contains a job file
-//! consisting of concatenated 32-character keys, representing narinfo
-//! files in the source bucket, without the `.narinfo` suffix or any
-//! other separators.
-//!
-//! Each run of turbofetch processes a half-open range of indices from the
-//! job file, and outputs a zstd stream of concatenated objects, without
-//! additional separators and in no particular order. These segment files
-//! are written into the work bucket, named for the range of indices they
-//! cover. `/narinfo.zst/000000000c380d40-000000000c385b60` covers the 20k
-//! objects `[0xc380d40, 0xc385b60) = [205000000, 205020000)`. Empirically,
-//! segment files of 20k objects achieve a compression ratio of 4.7x.
-//!
-//! Reassembly is left to narinfo2parquet, which interprets StorePath lines.
-//!
-//! TODO(edef): any retries/error handling whatsoever
-//! Currently, it fails an entire range if anything goes wrong, and doesn't
-//! write any output.
-
-use bytes::Bytes;
-use futures::{stream::FuturesUnordered, Stream, TryStreamExt};
-use rusoto_core::ByteStream;
-use rusoto_s3::{GetObjectRequest, PutObjectRequest, S3Client, S3};
-use serde::Deserialize;
-use std::{io::Write, mem, ops::Range, ptr};
-use tokio::{
-    io::{self, AsyncReadExt, AsyncWriteExt},
-    net::TcpStream,
-};
-
-/// Fetch a group of keys, streaming concatenated chunks as they arrive from S3.
-/// `keys` must be a slice from the job file. Any network error at all fails the
-/// entire batch, and there is no rate limiting.
-fn fetch(keys: &[[u8; 32]]) -> impl Stream<Item = io::Result<Bytes>> {
-    // S3 supports only HTTP/1.1, but we can ease the pain somewhat by using
-    // HTTP pipelining. It terminates the TCP connection after receiving 100
-    // requests, so we chunk the keys up accordingly, and make one connection
-    // for each chunk.
-    keys.chunks(100)
-        .map(|chunk| {
-            const PREFIX: &[u8] = b"GET /nix-cache/";
-            const SUFFIX: &[u8] = b".narinfo HTTP/1.1\nHost: s3.amazonaws.com\n\n";
-            const LENGTH: usize = PREFIX.len() + 32 + SUFFIX.len();
-
-            let mut request = Vec::with_capacity(LENGTH * 100);
-            for key in chunk {
-                request.extend_from_slice(PREFIX);
-                request.extend_from_slice(key);
-                request.extend_from_slice(SUFFIX);
-            }
-
-            (request, chunk.len())
-        })
-        .map(|(request, n)| async move {
-            let (mut read, mut write) = TcpStream::connect("s3.amazonaws.com:80")
-                .await?
-                .into_split();
-
-            let _handle = tokio::spawn(async move {
-                write.write_all(&request).await
-            });
-
-            let mut buffer = turbofetch::Buffer::new(512 * 1024);
-            let mut bodies = vec![];
-
-            for _ in 0..n {
-                let body = turbofetch::parse_response(&mut read, &mut buffer).await?;
-                bodies.extend_from_slice(body);
-            }
-
-            Ok::<_, io::Error>(Bytes::from(bodies))
-        })
-        .collect::<FuturesUnordered<_>>()
-}
-
-/// Retrieve a range of keys from the job file.
-async fn get_range(
-    s3: &'static S3Client,
-    bucket: String,
-    key: String,
-    range: Range<u64>,
-) -> io::Result<Box<[[u8; 32]]>> {
-    let resp = s3
-        .get_object(GetObjectRequest {
-            bucket,
-            key,
-            range: Some(format!("bytes={}-{}", range.start * 32, range.end * 32 - 1)),
-            ..GetObjectRequest::default()
-        })
-        .await
-        .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
-
-    let mut body = vec![];
-    resp.body
-        .ok_or(io::ErrorKind::InvalidData)?
-        .into_async_read()
-        .read_to_end(&mut body)
-        .await?;
-
-    let body = exact_chunks(body.into_boxed_slice()).ok_or(io::ErrorKind::InvalidData)?;
-
-    Ok(body)
-}
-
-fn exact_chunks(mut buf: Box<[u8]>) -> Option<Box<[[u8; 32]]>> {
-    // SAFETY: We ensure that `buf.len()` is a multiple of 32, and there are no alignment requirements.
-    unsafe {
-        let ptr = buf.as_mut_ptr();
-        let len = buf.len();
-
-        if len % 32 != 0 {
-            return None;
-        }
-
-        let ptr = ptr as *mut [u8; 32];
-        let len = len / 32;
-        mem::forget(buf);
-
-        Some(Box::from_raw(ptr::slice_from_raw_parts_mut(ptr, len)))
-    }
-}
-
-// TODO(edef): factor this out into a separate entry point
-#[tokio::main(flavor = "current_thread")]
-async fn main() -> Result<(), lambda_runtime::Error> {
-    let s3 = S3Client::new(rusoto_core::Region::UsEast1);
-    let s3 = &*Box::leak(Box::new(s3));
-
-    tracing_subscriber::fmt()
-        .json()
-        .with_max_level(tracing::Level::INFO)
-        // this needs to be set to remove duplicated information in the log.
-        .with_current_span(false)
-        // this needs to be set to false, otherwise ANSI color codes will
-        // show up in a confusing manner in CloudWatch logs.
-        .with_ansi(false)
-        // disabling time is handy because CloudWatch will add the ingestion time.
-        .without_time()
-        // remove the name of the function from every log entry
-        .with_target(false)
-        .init();
-
-    lambda_runtime::run(lambda_runtime::service_fn(|event| func(s3, event))).await
-}
-
-/// Lambda request body
-#[derive(Debug, Deserialize)]
-struct Params {
-    work_bucket: String,
-    job_file: String,
-    start: u64,
-    end: u64,
-}
-
-#[tracing::instrument(skip(s3, event), fields(req_id = %event.context.request_id))]
-async fn func(
-    s3: &'static S3Client,
-    event: lambda_runtime::LambdaEvent<
-        aws_lambda_events::lambda_function_urls::LambdaFunctionUrlRequest,
-    >,
-) -> Result<&'static str, lambda_runtime::Error> {
-    let mut params = event.payload.body.ok_or("no body")?;
-
-    if event.payload.is_base64_encoded {
-        params = String::from_utf8(data_encoding::BASE64.decode(params.as_bytes())?)?;
-    }
-
-    let params: Params = serde_json::from_str(&params)?;
-
-    if params.start >= params.end {
-        return Err("nope".into());
-    }
-
-    let keys = get_range(
-        s3,
-        params.work_bucket.clone(),
-        params.job_file.to_owned(),
-        params.start..params.end,
-    )
-    .await?;
-
-    let zchunks = fetch(&keys)
-        .try_fold(
-            Box::new(zstd::Encoder::new(vec![], zstd::DEFAULT_COMPRESSION_LEVEL).unwrap()),
-            |mut w, buf| {
-                w.write_all(&buf).unwrap();
-                async { Ok(w) }
-            },
-        )
-        .await?;
-
-    let zchunks = to_byte_stream(zchunks.finish().unwrap());
-
-    tracing::info!("we got to put_object");
-
-    s3.put_object(PutObjectRequest {
-        bucket: params.work_bucket,
-        key: format!("narinfo.zst/{:016x}-{:016x}", params.start, params.end),
-        body: Some(zchunks),
-        ..Default::default()
-    })
-    .await
-    .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
-
-    tracing::info!("… and it worked!");
-
-    Ok("OK")
-}
-
-fn to_byte_stream(buffer: Vec<u8>) -> ByteStream {
-    let size_hint = buffer.len();
-    ByteStream::new_with_size(
-        futures::stream::once(async { Ok(buffer.into()) }),
-        size_hint,
-    )
-}