about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--tvix/castore/src/blobservice/chunked_reader.rs270
1 files changed, 139 insertions, 131 deletions
diff --git a/tvix/castore/src/blobservice/chunked_reader.rs b/tvix/castore/src/blobservice/chunked_reader.rs
index f3c73948f809..6ca0d2003226 100644
--- a/tvix/castore/src/blobservice/chunked_reader.rs
+++ b/tvix/castore/src/blobservice/chunked_reader.rs
@@ -9,27 +9,145 @@ use tracing::warn;
 use crate::B3Digest;
 use std::{cmp::Ordering, pin::Pin, task::Poll};
 
-use super::BlobService;
+use super::{BlobReader, BlobService};
 
-/// Supports reading a blob in a chunked fashion.
-/// Takes a list of blake3 digest for individual chunks (and their sizes).
-/// It internally keeps:
-/// - a reference to the blob service, used to fetch chunks
-/// - a the list of all chunks (chunk start offset, chunk len, chunk digest)
+pin_project! {
+    /// ChunkedReader provides a chunk-aware [BlobReader], so allows reading and
+    /// seeking into a blob.
+    /// It internally holds a [ChunkedBlob], which is storing chunk information
+    /// able to emit a reader seeked to a specific position whenever we need to seek.
+    pub struct ChunkedReader<BS> {
+        chunked_blob: ChunkedBlob<BS>,
+
+        #[pin]
+        r: Box<dyn AsyncRead + Unpin + Send>,
+
+        pos: u64,
+    }
+}
+
+impl<BS> ChunkedReader<BS>
+where
+    BS: AsRef<dyn BlobService> + Clone + 'static + Send,
+{
+    /// Construct a new [ChunkedReader], by retrieving a list of chunks (their
+    /// blake3 digests and chunk sizes)
+    pub fn from_chunks(chunks_it: impl Iterator<Item = (B3Digest, u64)>, blob_service: BS) -> Self {
+        let chunked_blob = ChunkedBlob::from_iter(chunks_it, blob_service);
+        let r = chunked_blob.reader_skipped_offset(0);
+
+        Self {
+            chunked_blob,
+            r,
+            pos: 0,
+        }
+    }
+}
+
+/// ChunkedReader implements BlobReader.
+impl<BS> BlobReader for ChunkedReader<BS> where BS: Send + Clone + 'static + AsRef<dyn BlobService> {}
+
+impl<BS> tokio::io::AsyncRead for ChunkedReader<BS>
+where
+    BS: AsRef<dyn BlobService> + Clone + 'static,
+{
+    fn poll_read(
+        self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+        buf: &mut tokio::io::ReadBuf<'_>,
+    ) -> std::task::Poll<std::io::Result<()>> {
+        // The amount of data read can be determined by the increase
+        // in the length of the slice returned by `ReadBuf::filled`.
+        let filled_before = buf.filled().len();
+
+        let this = self.project();
+        match this.r.poll_read(cx, buf) {
+            Poll::Ready(a) => {
+                let bytes_read = buf.filled().len() - filled_before;
+                *this.pos += bytes_read as u64;
+
+                Poll::Ready(a)
+            }
+            Poll::Pending => Poll::Pending,
+        }
+    }
+}
+
+impl<BS> tokio::io::AsyncSeek for ChunkedReader<BS>
+where
+    BS: AsRef<dyn BlobService> + Clone + Send + 'static,
+{
+    fn start_seek(self: Pin<&mut Self>, position: std::io::SeekFrom) -> std::io::Result<()> {
+        let total_len = self.chunked_blob.blob_length();
+        let current_pos = self.pos;
+        let this = self.project();
+        let pos: &mut u64 = this.pos;
+        let mut r: Pin<&mut Box<dyn AsyncRead + Send + Unpin>> = this.r;
+
+        let new_position: u64 = match position {
+            std::io::SeekFrom::Start(from_start) => from_start,
+            std::io::SeekFrom::End(from_end) => {
+                // note from_end is i64, not u64, so this is usually negative.
+                total_len.checked_add_signed(from_end).ok_or_else(|| {
+                    std::io::Error::new(
+                        std::io::ErrorKind::InvalidInput,
+                        "over/underflow while seeking",
+                    )
+                })?
+            }
+            std::io::SeekFrom::Current(from_current) => {
+                // note from_end is i64, not u64, so this can be positive or negative.
+                current_pos
+                    .checked_add_signed(from_current)
+                    .ok_or_else(|| {
+                        std::io::Error::new(
+                            std::io::ErrorKind::InvalidInput,
+                            "over/underflow while seeking",
+                        )
+                    })?
+            }
+        };
+
+        // ensure the new position still is inside the file.
+        if new_position > total_len {
+            Err(std::io::Error::new(
+                std::io::ErrorKind::InvalidInput,
+                "seeked beyond EOF",
+            ))?
+        }
+
+        // Update the position and the internal reader.
+        *pos = new_position;
+        *r = this.chunked_blob.reader_skipped_offset(new_position);
+
+        Ok(())
+    }
+
+    fn poll_complete(
+        self: Pin<&mut Self>,
+        _cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<std::io::Result<u64>> {
+        std::task::Poll::Ready(Ok(self.pos))
+    }
+}
+
+/// Holds a list of blake3 digest for individual chunks (and their sizes).
+/// Is able to construct a Reader that seeked to a certain offset, which
+/// is useful to construct a BlobReader (that implements AsyncSeek).
 /// - the current chunk index, and a Custor<Vec<u8>> holding the data of that chunk.
-pub struct ChunkedBlob<BS> {
+struct ChunkedBlob<BS> {
     blob_service: BS,
     chunks: Vec<(u64, u64, B3Digest)>,
 }
 
 impl<BS> ChunkedBlob<BS>
 where
-    BS: AsRef<dyn BlobService> + Clone + 'static,
+    BS: AsRef<dyn BlobService> + Clone + 'static + Send,
 {
-    /// Constructs a new ChunkedBlobReader from a list of blake 3 digests of
-    /// chunks and their sizes.
+    /// Constructs [Self] from a list of blake3 digests of chunks and their
+    /// sizes, and a reference to a blob service.
     /// Initializing it with an empty list is disallowed.
-    pub fn from_iter(chunks_it: impl Iterator<Item = (B3Digest, u64)>, blob_service: BS) -> Self {
+    fn from_iter(chunks_it: impl Iterator<Item = (B3Digest, u64)>, blob_service: BS) -> Self {
         let mut chunks = Vec::new();
         let mut offset: u64 = 0;
 
@@ -50,7 +168,7 @@ where
     }
 
     /// Returns the length of the blob.
-    pub fn blob_length(&self) -> u64 {
+    fn blob_length(&self) -> u64 {
         self.chunks
             .last()
             .map(|(chunk_offset, chunk_size, _)| chunk_offset + chunk_size)
@@ -78,7 +196,9 @@ where
     /// It internally assembles a stream reading from each chunk (skipping over
     /// chunks containing irrelevant data).
     /// From the first relevant chunk, the irrelevant bytes are skipped too.
-    pub fn reader_skipped_offset(&self, offset: u64) -> Box<dyn AsyncRead + Unpin> {
+    /// The returned boxed thing does not implement AsyncSeek on its own, but
+    /// ChunkedReader does.
+    fn reader_skipped_offset(&self, offset: u64) -> Box<dyn tokio::io::AsyncRead + Send + Unpin> {
         if offset == self.blob_length() {
             return Box::new(std::io::Cursor::new(vec![]));
         }
@@ -130,117 +250,6 @@ where
     }
 }
 
-pin_project! {
-    /// Wraps the underlying ChunkedBlob and exposes a AsyncRead and AsyncSeek.
-    pub struct ChunkedReader<BS> {
-        chunked_blob: ChunkedBlob<BS>,
-
-        #[pin]
-        r: Box<dyn AsyncRead + Unpin>,
-
-        pos: u64,
-    }
-}
-
-impl<BS> ChunkedReader<BS>
-where
-    BS: AsRef<dyn BlobService> + Clone + 'static,
-{
-    pub fn from_chunked_blob(chunked_blob: ChunkedBlob<BS>) -> Self {
-        let r = chunked_blob.reader_skipped_offset(0);
-
-        Self {
-            chunked_blob,
-            r,
-            pos: 0,
-        }
-    }
-}
-
-impl<BS> tokio::io::AsyncRead for ChunkedReader<BS>
-where
-    BS: AsRef<dyn BlobService> + Clone + 'static,
-{
-    fn poll_read(
-        self: std::pin::Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-        buf: &mut tokio::io::ReadBuf<'_>,
-    ) -> std::task::Poll<std::io::Result<()>> {
-        // The amount of data read can be determined by the increase
-        // in the length of the slice returned by `ReadBuf::filled`.
-        let filled_before = buf.filled().len();
-
-        let this = self.project();
-        match this.r.poll_read(cx, buf) {
-            Poll::Ready(a) => {
-                let bytes_read = buf.filled().len() - filled_before;
-                *this.pos += bytes_read as u64;
-
-                Poll::Ready(a)
-            }
-            Poll::Pending => Poll::Pending,
-        }
-    }
-}
-
-impl<BS> tokio::io::AsyncSeek for ChunkedReader<BS>
-where
-    BS: AsRef<dyn BlobService> + Clone + 'static,
-{
-    fn start_seek(self: Pin<&mut Self>, position: std::io::SeekFrom) -> std::io::Result<()> {
-        let total_len = self.chunked_blob.blob_length();
-        let current_pos = self.pos;
-        let this = self.project();
-        let pos: &mut u64 = this.pos;
-        let mut r: Pin<&mut Box<dyn AsyncRead + Unpin>> = this.r;
-
-        let new_position: u64 = match position {
-            std::io::SeekFrom::Start(from_start) => from_start,
-            std::io::SeekFrom::End(from_end) => {
-                // note from_end is i64, not u64, so this is usually negative.
-                total_len.checked_add_signed(from_end).ok_or_else(|| {
-                    std::io::Error::new(
-                        std::io::ErrorKind::InvalidInput,
-                        "over/underflow while seeking",
-                    )
-                })?
-            }
-            std::io::SeekFrom::Current(from_current) => {
-                // note from_end is i64, not u64, so this can be positive or negative.
-                current_pos
-                    .checked_add_signed(from_current)
-                    .ok_or_else(|| {
-                        std::io::Error::new(
-                            std::io::ErrorKind::InvalidInput,
-                            "over/underflow while seeking",
-                        )
-                    })?
-            }
-        };
-
-        // ensure the new position still is inside the file.
-        if new_position > total_len {
-            Err(std::io::Error::new(
-                std::io::ErrorKind::InvalidInput,
-                "seeked beyond EOF",
-            ))?
-        }
-
-        // Update the position and the internal reader.
-        *pos = new_position;
-        *r = this.chunked_blob.reader_skipped_offset(new_position);
-
-        Ok(())
-    }
-
-    fn poll_complete(
-        self: Pin<&mut Self>,
-        _cx: &mut std::task::Context<'_>,
-    ) -> std::task::Poll<std::io::Result<u64>> {
-        std::task::Poll::Ready(Ok(self.pos))
-    }
-}
-
 #[cfg(test)]
 mod test {
     use std::{io::SeekFrom, sync::Arc};
@@ -370,8 +379,8 @@ mod test {
     #[tokio::test]
     async fn test_read() {
         let blob_service = gen_blobservice_blob1().await;
-        let cb = ChunkedBlob::from_iter(BLOB_1_LIST.clone().into_iter(), blob_service);
-        let mut chunked_reader = ChunkedReader::from_chunked_blob(cb);
+        let mut chunked_reader =
+            ChunkedReader::from_chunks(BLOB_1_LIST.clone().into_iter(), blob_service);
 
         // read all data
         let mut buf = Vec::new();
@@ -389,8 +398,8 @@ mod test {
     #[tokio::test]
     async fn test_seek() {
         let blob_service = gen_blobservice_blob1().await;
-        let cb = ChunkedBlob::from_iter(BLOB_1_LIST.clone().into_iter(), blob_service);
-        let mut chunked_reader = ChunkedReader::from_chunked_blob(cb);
+        let mut chunked_reader =
+            ChunkedReader::from_chunks(BLOB_1_LIST.clone().into_iter(), blob_service);
 
         // seek to the end
         // expect to read 0 bytes
@@ -456,9 +465,8 @@ mod test {
             bw.close().await.expect("close blobwriter");
         }
 
-        let cb = ChunkedBlob::from_iter(BLOB_1_LIST.clone().into_iter(), blob_service);
-
-        let mut chunked_reader = ChunkedReader::from_chunked_blob(cb);
+        let mut chunked_reader =
+            ChunkedReader::from_chunks(BLOB_1_LIST.clone().into_iter(), blob_service);
 
         // read a bit from the front (5 bytes out of 6 available)
         let mut buf = [0b0; 5];