diff options
-rw-r--r-- | tvix/castore/src/blobservice/chunked_reader.rs | 270 |
1 files changed, 139 insertions, 131 deletions
diff --git a/tvix/castore/src/blobservice/chunked_reader.rs b/tvix/castore/src/blobservice/chunked_reader.rs index f3c73948f809..6ca0d2003226 100644 --- a/tvix/castore/src/blobservice/chunked_reader.rs +++ b/tvix/castore/src/blobservice/chunked_reader.rs @@ -9,27 +9,145 @@ use tracing::warn; use crate::B3Digest; use std::{cmp::Ordering, pin::Pin, task::Poll}; -use super::BlobService; +use super::{BlobReader, BlobService}; -/// Supports reading a blob in a chunked fashion. -/// Takes a list of blake3 digest for individual chunks (and their sizes). -/// It internally keeps: -/// - a reference to the blob service, used to fetch chunks -/// - a the list of all chunks (chunk start offset, chunk len, chunk digest) +pin_project! { + /// ChunkedReader provides a chunk-aware [BlobReader], so allows reading and + /// seeking into a blob. + /// It internally holds a [ChunkedBlob], which is storing chunk information + /// able to emit a reader seeked to a specific position whenever we need to seek. + pub struct ChunkedReader<BS> { + chunked_blob: ChunkedBlob<BS>, + + #[pin] + r: Box<dyn AsyncRead + Unpin + Send>, + + pos: u64, + } +} + +impl<BS> ChunkedReader<BS> +where + BS: AsRef<dyn BlobService> + Clone + 'static + Send, +{ + /// Construct a new [ChunkedReader], by retrieving a list of chunks (their + /// blake3 digests and chunk sizes) + pub fn from_chunks(chunks_it: impl Iterator<Item = (B3Digest, u64)>, blob_service: BS) -> Self { + let chunked_blob = ChunkedBlob::from_iter(chunks_it, blob_service); + let r = chunked_blob.reader_skipped_offset(0); + + Self { + chunked_blob, + r, + pos: 0, + } + } +} + +/// ChunkedReader implements BlobReader. 
+impl<BS> BlobReader for ChunkedReader<BS> where BS: Send + Clone + 'static + AsRef<dyn BlobService> {} + +impl<BS> tokio::io::AsyncRead for ChunkedReader<BS> +where + BS: AsRef<dyn BlobService> + Clone + 'static, +{ + fn poll_read( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll<std::io::Result<()>> { + // The amount of data read can be determined by the increase + // in the length of the slice returned by `ReadBuf::filled`. + let filled_before = buf.filled().len(); + + let this = self.project(); + match this.r.poll_read(cx, buf) { + Poll::Ready(a) => { + let bytes_read = buf.filled().len() - filled_before; + *this.pos += bytes_read as u64; + + Poll::Ready(a) + } + Poll::Pending => Poll::Pending, + } + } +} + +impl<BS> tokio::io::AsyncSeek for ChunkedReader<BS> +where + BS: AsRef<dyn BlobService> + Clone + Send + 'static, +{ + fn start_seek(self: Pin<&mut Self>, position: std::io::SeekFrom) -> std::io::Result<()> { + let total_len = self.chunked_blob.blob_length(); + let current_pos = self.pos; + let this = self.project(); + let pos: &mut u64 = this.pos; + let mut r: Pin<&mut Box<dyn AsyncRead + Send + Unpin>> = this.r; + + let new_position: u64 = match position { + std::io::SeekFrom::Start(from_start) => from_start, + std::io::SeekFrom::End(from_end) => { + // note from_end is i64, not u64, so this is usually negative. + total_len.checked_add_signed(from_end).ok_or_else(|| { + std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "over/underflow while seeking", + ) + })? + } + std::io::SeekFrom::Current(from_current) => { + // note from_current is i64, not u64, so this can be positive or negative. + current_pos + .checked_add_signed(from_current) + .ok_or_else(|| { + std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "over/underflow while seeking", + ) + })? + } + }; + + // ensure the new position still is inside the file. 
+ if new_position > total_len { + Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "seeked beyond EOF", + ))? + } + + // Update the position and the internal reader. + *pos = new_position; + *r = this.chunked_blob.reader_skipped_offset(new_position); + + Ok(()) + } + + fn poll_complete( + self: Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll<std::io::Result<u64>> { + std::task::Poll::Ready(Ok(self.pos)) + } +} + +/// Holds a list of blake3 digest for individual chunks (and their sizes). +/// Is able to construct a Reader that seeked to a certain offset, which +/// is useful to construct a BlobReader (that implements AsyncSeek). /// - the current chunk index, and a Custor<Vec<u8>> holding the data of that chunk. -pub struct ChunkedBlob<BS> { +struct ChunkedBlob<BS> { blob_service: BS, chunks: Vec<(u64, u64, B3Digest)>, } impl<BS> ChunkedBlob<BS> where - BS: AsRef<dyn BlobService> + Clone + 'static, + BS: AsRef<dyn BlobService> + Clone + 'static + Send, { - /// Constructs a new ChunkedBlobReader from a list of blake 3 digests of - /// chunks and their sizes. + /// Constructs [Self] from a list of blake3 digests of chunks and their + /// sizes, and a reference to a blob service. /// Initializing it with an empty list is disallowed. - pub fn from_iter(chunks_it: impl Iterator<Item = (B3Digest, u64)>, blob_service: BS) -> Self { + fn from_iter(chunks_it: impl Iterator<Item = (B3Digest, u64)>, blob_service: BS) -> Self { let mut chunks = Vec::new(); let mut offset: u64 = 0; @@ -50,7 +168,7 @@ where } /// Returns the length of the blob. - pub fn blob_length(&self) -> u64 { + fn blob_length(&self) -> u64 { self.chunks .last() .map(|(chunk_offset, chunk_size, _)| chunk_offset + chunk_size) @@ -78,7 +196,9 @@ where /// It internally assembles a stream reading from each chunk (skipping over /// chunks containing irrelevant data). /// From the first relevant chunk, the irrelevant bytes are skipped too. 
- pub fn reader_skipped_offset(&self, offset: u64) -> Box<dyn AsyncRead + Unpin> { + /// The returned boxed thing does not implement AsyncSeek on its own, but + /// ChunkedReader does. + fn reader_skipped_offset(&self, offset: u64) -> Box<dyn tokio::io::AsyncRead + Send + Unpin> { if offset == self.blob_length() { return Box::new(std::io::Cursor::new(vec![])); } @@ -130,117 +250,6 @@ where } } -pin_project! { - /// Wraps the underlying ChunkedBlob and exposes a AsyncRead and AsyncSeek. - pub struct ChunkedReader<BS> { - chunked_blob: ChunkedBlob<BS>, - - #[pin] - r: Box<dyn AsyncRead + Unpin>, - - pos: u64, - } -} - -impl<BS> ChunkedReader<BS> -where - BS: AsRef<dyn BlobService> + Clone + 'static, -{ - pub fn from_chunked_blob(chunked_blob: ChunkedBlob<BS>) -> Self { - let r = chunked_blob.reader_skipped_offset(0); - - Self { - chunked_blob, - r, - pos: 0, - } - } -} - -impl<BS> tokio::io::AsyncRead for ChunkedReader<BS> -where - BS: AsRef<dyn BlobService> + Clone + 'static, -{ - fn poll_read( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &mut tokio::io::ReadBuf<'_>, - ) -> std::task::Poll<std::io::Result<()>> { - // The amount of data read can be determined by the increase - // in the length of the slice returned by `ReadBuf::filled`. 
- let filled_before = buf.filled().len(); - - let this = self.project(); - match this.r.poll_read(cx, buf) { - Poll::Ready(a) => { - let bytes_read = buf.filled().len() - filled_before; - *this.pos += bytes_read as u64; - - Poll::Ready(a) - } - Poll::Pending => Poll::Pending, - } - } -} - -impl<BS> tokio::io::AsyncSeek for ChunkedReader<BS> -where - BS: AsRef<dyn BlobService> + Clone + 'static, -{ - fn start_seek(self: Pin<&mut Self>, position: std::io::SeekFrom) -> std::io::Result<()> { - let total_len = self.chunked_blob.blob_length(); - let current_pos = self.pos; - let this = self.project(); - let pos: &mut u64 = this.pos; - let mut r: Pin<&mut Box<dyn AsyncRead + Unpin>> = this.r; - - let new_position: u64 = match position { - std::io::SeekFrom::Start(from_start) => from_start, - std::io::SeekFrom::End(from_end) => { - // note from_end is i64, not u64, so this is usually negative. - total_len.checked_add_signed(from_end).ok_or_else(|| { - std::io::Error::new( - std::io::ErrorKind::InvalidInput, - "over/underflow while seeking", - ) - })? - } - std::io::SeekFrom::Current(from_current) => { - // note from_end is i64, not u64, so this can be positive or negative. - current_pos - .checked_add_signed(from_current) - .ok_or_else(|| { - std::io::Error::new( - std::io::ErrorKind::InvalidInput, - "over/underflow while seeking", - ) - })? - } - }; - - // ensure the new position still is inside the file. - if new_position > total_len { - Err(std::io::Error::new( - std::io::ErrorKind::InvalidInput, - "seeked beyond EOF", - ))? - } - - // Update the position and the internal reader. 
- *pos = new_position; - *r = this.chunked_blob.reader_skipped_offset(new_position); - - Ok(()) - } - - fn poll_complete( - self: Pin<&mut Self>, - _cx: &mut std::task::Context<'_>, - ) -> std::task::Poll<std::io::Result<u64>> { - std::task::Poll::Ready(Ok(self.pos)) - } -} - #[cfg(test)] mod test { use std::{io::SeekFrom, sync::Arc}; @@ -370,8 +379,8 @@ mod test { #[tokio::test] async fn test_read() { let blob_service = gen_blobservice_blob1().await; - let cb = ChunkedBlob::from_iter(BLOB_1_LIST.clone().into_iter(), blob_service); - let mut chunked_reader = ChunkedReader::from_chunked_blob(cb); + let mut chunked_reader = + ChunkedReader::from_chunks(BLOB_1_LIST.clone().into_iter(), blob_service); // read all data let mut buf = Vec::new(); @@ -389,8 +398,8 @@ mod test { #[tokio::test] async fn test_seek() { let blob_service = gen_blobservice_blob1().await; - let cb = ChunkedBlob::from_iter(BLOB_1_LIST.clone().into_iter(), blob_service); - let mut chunked_reader = ChunkedReader::from_chunked_blob(cb); + let mut chunked_reader = + ChunkedReader::from_chunks(BLOB_1_LIST.clone().into_iter(), blob_service); // seek to the end // expect to read 0 bytes @@ -456,9 +465,8 @@ mod test { bw.close().await.expect("close blobwriter"); } - let cb = ChunkedBlob::from_iter(BLOB_1_LIST.clone().into_iter(), blob_service); - - let mut chunked_reader = ChunkedReader::from_chunked_blob(cb); + let mut chunked_reader = + ChunkedReader::from_chunks(BLOB_1_LIST.clone().into_iter(), blob_service); // read a bit from the front (5 bytes out of 6 available) let mut buf = [0b0; 5]; |