about summary refs log tree commit diff
path: root/tvix/tools/turbofetch/src/lib.rs
diff options
context:
space:
mode:
authoredef <edef@edef.eu>2023-11-04T10·09+0000
committeredef <edef@edef.eu>2023-11-10T19·05+0000
commitbdda10a2f54974053b1d78b801469f076e90a6da (patch)
treeda10ab5e9f64b624fec66a9a0f1aaa94947afd59 /tvix/tools/turbofetch/src/lib.rs
parent9cd2e920653914d4ef7bec525614d101f3fdd207 (diff)
feat(tvix/tools/turbofetch): init r/6979
Change-Id: I2efa6f94f57e812c52371256a4e62d1d54ff5057
Reviewed-on: https://cl.tvl.fyi/c/depot/+/9925
Reviewed-by: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Diffstat (limited to 'tvix/tools/turbofetch/src/lib.rs')
-rw-r--r--tvix/tools/turbofetch/src/lib.rs103
1 files changed, 103 insertions, 0 deletions
diff --git a/tvix/tools/turbofetch/src/lib.rs b/tvix/tools/turbofetch/src/lib.rs
new file mode 100644
index 000000000000..4b62fa4d75e7
--- /dev/null
+++ b/tvix/tools/turbofetch/src/lib.rs
@@ -0,0 +1,103 @@
+use std::{mem::MaybeUninit, str};
+use tokio::io::{self, AsyncRead, AsyncReadExt};
+
+pub use buffer::Buffer;
+mod buffer;
+
+/// Read as much data into `buffer` as possible.
+/// Returns [io::ErrorKind::OutOfMemory] if the buffer is already full.
+async fn slurp(buffer: &mut Buffer, sock: &mut (impl AsyncRead + Unpin)) -> io::Result<()> {
+    match buffer.space() {
+        [] => Err(io::Error::new(io::ErrorKind::OutOfMemory, "buffer filled")),
+        buf => {
+            let n = sock.read(buf).await?;
+            if n == 0 {
+                return Err(io::ErrorKind::UnexpectedEof.into());
+            }
+            buffer.commit(n);
+
+            Ok(())
+        }
+    }
+}
+
+fn get_content_length(headers: &[httparse::Header]) -> io::Result<u64> {
+    for header in headers {
+        if header.name == "Transfer-Encoding" {
+            return Err(io::Error::new(
+                io::ErrorKind::InvalidData,
+                "Transfer-Encoding is unsupported",
+            ));
+        }
+
+        if header.name == "Content-Length" {
+            return str::from_utf8(header.value)
+                .ok()
+                .and_then(|v| v.parse().ok())
+                .ok_or_else(|| {
+                    io::Error::new(io::ErrorKind::InvalidData, "invalid Content-Length")
+                });
+        }
+    }
+
+    Err(io::Error::new(
+        io::ErrorKind::InvalidData,
+        "Content-Length missing",
+    ))
+}
+
+/// Read an HTTP response from `sock` using `buffer`, returning the response body.
+/// Returns an error if anything but 200 OK is received.
+///
+/// The buffer must have enough space to contain the entire response body.
+/// If there is not enough space, [io::ErrorKind::OutOfMemory] is returned.
+///
+/// The HTTP response must use `Content-Length`, without `Transfer-Encoding`.
+pub async fn parse_response<'a>(
+    sock: &mut (impl AsyncRead + Unpin),
+    buffer: &'a mut Buffer,
+) -> io::Result<&'a [u8]> {
+    let body_len = loop {
+        let mut headers = [MaybeUninit::uninit(); 16];
+        let mut response = httparse::Response::new(&mut []);
+        let status = httparse::ParserConfig::default()
+            .parse_response_with_uninit_headers(&mut response, buffer.data(), &mut headers)
+            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
+
+        if let httparse::Status::Complete(n) = status {
+            buffer.consume(n);
+
+            let code = response.code.unwrap();
+            if code != 200 {
+                return Err(io::Error::new(
+                    io::ErrorKind::Other,
+                    format!("HTTP response {code}"),
+                ));
+            }
+
+            break get_content_length(response.headers)?;
+        }
+
+        slurp(buffer, sock).await?;
+    };
+
+    let buf_len = buffer.space().len() + buffer.data().len();
+
+    if body_len > buf_len as u64 {
+        return Err(io::Error::new(
+            io::ErrorKind::OutOfMemory,
+            "HTTP response body does not fit in buffer",
+        ));
+    }
+
+    let body_len = body_len as usize;
+
+    while buffer.data().len() < body_len {
+        slurp(buffer, sock).await?;
+    }
+
+    let data = buffer.data();
+    buffer.consume(body_len);
+
+    Ok(&data[..body_len])
+}