about summary refs log tree commit diff
path: root/tvix
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2024-03-02T16·00+0200
committerflokli <flokli@flokli.de>2024-03-03T11·22+0000
commit7bebf492ec7742b8a24c695d77fa64fdf654ab19 (patch)
tree059b9651dd54ae0eb8e57f9db5d7e82baf199de3 /tvix
parent1608f935aad2696f38231ef779bffc1f5ac31fec (diff)
refactor(tvix/castore/blobsvc/chunked_reader): refactor, document r/7631
The public-consumable thing here is ChunkedReader, not ChunkedBlob.

ChunkedBlob is a helper that can be used to get a new AsyncRead, but
not AsyncSeek. It is used internally by ChunkedReader whenever the
client seeks.

Make this more obvious, by extending the documentation, and putting
ChunkedReader at the top of this file.

Also make ChunkedBlob and its methods private, and give ChunkedReader a
more useful constructor (from_chunks, instead of from_chunked_blob).

Change-Id: I2399867591df923faa73927b924e7c116ad98dc0
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11079
Tested-by: BuildkiteCI
Reviewed-by: Brian Olsen <me@griff.name>
Reviewed-by: Connor Brewster <cbrewster@hey.com>
Diffstat (limited to 'tvix')
-rw-r--r--tvix/castore/src/blobservice/chunked_reader.rs270
1 files changed, 139 insertions, 131 deletions
diff --git a/tvix/castore/src/blobservice/chunked_reader.rs b/tvix/castore/src/blobservice/chunked_reader.rs
index f3c73948f809..6ca0d2003226 100644
--- a/tvix/castore/src/blobservice/chunked_reader.rs
+++ b/tvix/castore/src/blobservice/chunked_reader.rs
@@ -9,27 +9,145 @@ use tracing::warn;
 use crate::B3Digest;
 use std::{cmp::Ordering, pin::Pin, task::Poll};
 
-use super::BlobService;
+use super::{BlobReader, BlobService};
 
-/// Supports reading a blob in a chunked fashion.
-/// Takes a list of blake3 digest for individual chunks (and their sizes).
-/// It internally keeps:
-/// - a reference to the blob service, used to fetch chunks
-/// - a the list of all chunks (chunk start offset, chunk len, chunk digest)
+pin_project! {
+    /// ChunkedReader provides a chunk-aware [BlobReader], so allows reading and
+    /// seeking into a blob.
+    /// It internally holds a [ChunkedBlob], which is storing chunk information
+    /// able to emit a reader seeked to a specific position whenever we need to seek.
+    pub struct ChunkedReader<BS> {
+        chunked_blob: ChunkedBlob<BS>,
+
+        #[pin]
+        r: Box<dyn AsyncRead + Unpin + Send>,
+
+        pos: u64,
+    }
+}
+
+impl<BS> ChunkedReader<BS>
+where
+    BS: AsRef<dyn BlobService> + Clone + 'static + Send,
+{
+    /// Construct a new [ChunkedReader], by retrieving a list of chunks (their
+    /// blake3 digests and chunk sizes)
+    pub fn from_chunks(chunks_it: impl Iterator<Item = (B3Digest, u64)>, blob_service: BS) -> Self {
+        let chunked_blob = ChunkedBlob::from_iter(chunks_it, blob_service);
+        let r = chunked_blob.reader_skipped_offset(0);
+
+        Self {
+            chunked_blob,
+            r,
+            pos: 0,
+        }
+    }
+}
+
+/// ChunkedReader implements BlobReader.
+impl<BS> BlobReader for ChunkedReader<BS> where BS: Send + Clone + 'static + AsRef<dyn BlobService> {}
+
+impl<BS> tokio::io::AsyncRead for ChunkedReader<BS>
+where
+    BS: AsRef<dyn BlobService> + Clone + 'static,
+{
+    fn poll_read(
+        self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+        buf: &mut tokio::io::ReadBuf<'_>,
+    ) -> std::task::Poll<std::io::Result<()>> {
+        // The amount of data read can be determined by the increase
+        // in the length of the slice returned by `ReadBuf::filled`.
+        let filled_before = buf.filled().len();
+
+        let this = self.project();
+        match this.r.poll_read(cx, buf) {
+            Poll::Ready(a) => {
+                let bytes_read = buf.filled().len() - filled_before;
+                *this.pos += bytes_read as u64;
+
+                Poll::Ready(a)
+            }
+            Poll::Pending => Poll::Pending,
+        }
+    }
+}
+
+impl<BS> tokio::io::AsyncSeek for ChunkedReader<BS>
+where
+    BS: AsRef<dyn BlobService> + Clone + Send + 'static,
+{
+    fn start_seek(self: Pin<&mut Self>, position: std::io::SeekFrom) -> std::io::Result<()> {
+        let total_len = self.chunked_blob.blob_length();
+        let current_pos = self.pos;
+        let this = self.project();
+        let pos: &mut u64 = this.pos;
+        let mut r: Pin<&mut Box<dyn AsyncRead + Send + Unpin>> = this.r;
+
+        let new_position: u64 = match position {
+            std::io::SeekFrom::Start(from_start) => from_start,
+            std::io::SeekFrom::End(from_end) => {
+                // note from_end is i64, not u64, so this is usually negative.
+                total_len.checked_add_signed(from_end).ok_or_else(|| {
+                    std::io::Error::new(
+                        std::io::ErrorKind::InvalidInput,
+                        "over/underflow while seeking",
+                    )
+                })?
+            }
+            std::io::SeekFrom::Current(from_current) => {
+                // note from_end is i64, not u64, so this can be positive or negative.
+                current_pos
+                    .checked_add_signed(from_current)
+                    .ok_or_else(|| {
+                        std::io::Error::new(
+                            std::io::ErrorKind::InvalidInput,
+                            "over/underflow while seeking",
+                        )
+                    })?
+            }
+        };
+
+        // ensure the new position still is inside the file.
+        if new_position > total_len {
+            Err(std::io::Error::new(
+                std::io::ErrorKind::InvalidInput,
+                "seeked beyond EOF",
+            ))?
+        }
+
+        // Update the position and the internal reader.
+        *pos = new_position;
+        *r = this.chunked_blob.reader_skipped_offset(new_position);
+
+        Ok(())
+    }
+
+    fn poll_complete(
+        self: Pin<&mut Self>,
+        _cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<std::io::Result<u64>> {
+        std::task::Poll::Ready(Ok(self.pos))
+    }
+}
+
+/// Holds a list of blake3 digest for individual chunks (and their sizes).
+/// Is able to construct a Reader that seeked to a certain offset, which
+/// is useful to construct a BlobReader (that implements AsyncSeek).
 /// - the current chunk index, and a Custor<Vec<u8>> holding the data of that chunk.
-pub struct ChunkedBlob<BS> {
+struct ChunkedBlob<BS> {
     blob_service: BS,
     chunks: Vec<(u64, u64, B3Digest)>,
 }
 
 impl<BS> ChunkedBlob<BS>
 where
-    BS: AsRef<dyn BlobService> + Clone + 'static,
+    BS: AsRef<dyn BlobService> + Clone + 'static + Send,
 {
-    /// Constructs a new ChunkedBlobReader from a list of blake 3 digests of
-    /// chunks and their sizes.
+    /// Constructs [Self] from a list of blake3 digests of chunks and their
+    /// sizes, and a reference to a blob service.
     /// Initializing it with an empty list is disallowed.
-    pub fn from_iter(chunks_it: impl Iterator<Item = (B3Digest, u64)>, blob_service: BS) -> Self {
+    fn from_iter(chunks_it: impl Iterator<Item = (B3Digest, u64)>, blob_service: BS) -> Self {
         let mut chunks = Vec::new();
         let mut offset: u64 = 0;
 
@@ -50,7 +168,7 @@ where
     }
 
     /// Returns the length of the blob.
-    pub fn blob_length(&self) -> u64 {
+    fn blob_length(&self) -> u64 {
         self.chunks
             .last()
             .map(|(chunk_offset, chunk_size, _)| chunk_offset + chunk_size)
@@ -78,7 +196,9 @@ where
     /// It internally assembles a stream reading from each chunk (skipping over
     /// chunks containing irrelevant data).
     /// From the first relevant chunk, the irrelevant bytes are skipped too.
-    pub fn reader_skipped_offset(&self, offset: u64) -> Box<dyn AsyncRead + Unpin> {
+    /// The returned boxed thing does not implement AsyncSeek on its own, but
+    /// ChunkedReader does.
+    fn reader_skipped_offset(&self, offset: u64) -> Box<dyn tokio::io::AsyncRead + Send + Unpin> {
         if offset == self.blob_length() {
             return Box::new(std::io::Cursor::new(vec![]));
         }
@@ -130,117 +250,6 @@ where
     }
 }
 
-pin_project! {
-    /// Wraps the underlying ChunkedBlob and exposes a AsyncRead and AsyncSeek.
-    pub struct ChunkedReader<BS> {
-        chunked_blob: ChunkedBlob<BS>,
-
-        #[pin]
-        r: Box<dyn AsyncRead + Unpin>,
-
-        pos: u64,
-    }
-}
-
-impl<BS> ChunkedReader<BS>
-where
-    BS: AsRef<dyn BlobService> + Clone + 'static,
-{
-    pub fn from_chunked_blob(chunked_blob: ChunkedBlob<BS>) -> Self {
-        let r = chunked_blob.reader_skipped_offset(0);
-
-        Self {
-            chunked_blob,
-            r,
-            pos: 0,
-        }
-    }
-}
-
-impl<BS> tokio::io::AsyncRead for ChunkedReader<BS>
-where
-    BS: AsRef<dyn BlobService> + Clone + 'static,
-{
-    fn poll_read(
-        self: std::pin::Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-        buf: &mut tokio::io::ReadBuf<'_>,
-    ) -> std::task::Poll<std::io::Result<()>> {
-        // The amount of data read can be determined by the increase
-        // in the length of the slice returned by `ReadBuf::filled`.
-        let filled_before = buf.filled().len();
-
-        let this = self.project();
-        match this.r.poll_read(cx, buf) {
-            Poll::Ready(a) => {
-                let bytes_read = buf.filled().len() - filled_before;
-                *this.pos += bytes_read as u64;
-
-                Poll::Ready(a)
-            }
-            Poll::Pending => Poll::Pending,
-        }
-    }
-}
-
-impl<BS> tokio::io::AsyncSeek for ChunkedReader<BS>
-where
-    BS: AsRef<dyn BlobService> + Clone + 'static,
-{
-    fn start_seek(self: Pin<&mut Self>, position: std::io::SeekFrom) -> std::io::Result<()> {
-        let total_len = self.chunked_blob.blob_length();
-        let current_pos = self.pos;
-        let this = self.project();
-        let pos: &mut u64 = this.pos;
-        let mut r: Pin<&mut Box<dyn AsyncRead + Unpin>> = this.r;
-
-        let new_position: u64 = match position {
-            std::io::SeekFrom::Start(from_start) => from_start,
-            std::io::SeekFrom::End(from_end) => {
-                // note from_end is i64, not u64, so this is usually negative.
-                total_len.checked_add_signed(from_end).ok_or_else(|| {
-                    std::io::Error::new(
-                        std::io::ErrorKind::InvalidInput,
-                        "over/underflow while seeking",
-                    )
-                })?
-            }
-            std::io::SeekFrom::Current(from_current) => {
-                // note from_end is i64, not u64, so this can be positive or negative.
-                current_pos
-                    .checked_add_signed(from_current)
-                    .ok_or_else(|| {
-                        std::io::Error::new(
-                            std::io::ErrorKind::InvalidInput,
-                            "over/underflow while seeking",
-                        )
-                    })?
-            }
-        };
-
-        // ensure the new position still is inside the file.
-        if new_position > total_len {
-            Err(std::io::Error::new(
-                std::io::ErrorKind::InvalidInput,
-                "seeked beyond EOF",
-            ))?
-        }
-
-        // Update the position and the internal reader.
-        *pos = new_position;
-        *r = this.chunked_blob.reader_skipped_offset(new_position);
-
-        Ok(())
-    }
-
-    fn poll_complete(
-        self: Pin<&mut Self>,
-        _cx: &mut std::task::Context<'_>,
-    ) -> std::task::Poll<std::io::Result<u64>> {
-        std::task::Poll::Ready(Ok(self.pos))
-    }
-}
-
 #[cfg(test)]
 mod test {
     use std::{io::SeekFrom, sync::Arc};
@@ -370,8 +379,8 @@ mod test {
     #[tokio::test]
     async fn test_read() {
         let blob_service = gen_blobservice_blob1().await;
-        let cb = ChunkedBlob::from_iter(BLOB_1_LIST.clone().into_iter(), blob_service);
-        let mut chunked_reader = ChunkedReader::from_chunked_blob(cb);
+        let mut chunked_reader =
+            ChunkedReader::from_chunks(BLOB_1_LIST.clone().into_iter(), blob_service);
 
         // read all data
         let mut buf = Vec::new();
@@ -389,8 +398,8 @@ mod test {
     #[tokio::test]
     async fn test_seek() {
         let blob_service = gen_blobservice_blob1().await;
-        let cb = ChunkedBlob::from_iter(BLOB_1_LIST.clone().into_iter(), blob_service);
-        let mut chunked_reader = ChunkedReader::from_chunked_blob(cb);
+        let mut chunked_reader =
+            ChunkedReader::from_chunks(BLOB_1_LIST.clone().into_iter(), blob_service);
 
         // seek to the end
         // expect to read 0 bytes
@@ -456,9 +465,8 @@ mod test {
             bw.close().await.expect("close blobwriter");
         }
 
-        let cb = ChunkedBlob::from_iter(BLOB_1_LIST.clone().into_iter(), blob_service);
-
-        let mut chunked_reader = ChunkedReader::from_chunked_blob(cb);
+        let mut chunked_reader =
+            ChunkedReader::from_chunks(BLOB_1_LIST.clone().into_iter(), blob_service);
 
         // read a bit from the front (5 bytes out of 6 available)
         let mut buf = [0b0; 5];