diff options
author | Florian Klink <flokli@flokli.de> | 2024-03-02T16·00+0200 |
---|---|---|
committer | flokli <flokli@flokli.de> | 2024-03-03T11·22+0000 |
commit | 7bebf492ec7742b8a24c695d77fa64fdf654ab19 (patch) | |
tree | 059b9651dd54ae0eb8e57f9db5d7e82baf199de3 /tvix | |
parent | 1608f935aad2696f38231ef779bffc1f5ac31fec (diff) |
refactor(tvix/castore/blobsvc/chunked_reader): refactor, document r/7631
The public-consumable thing here is ChunkedReader, not ChunkedBlob. ChunkedBlob is a helper that can be used to get a new AsyncRead, but not AsyncSeek. It is used internally by ChunkedReader whenever the client seeks. Make this more obvious, by extending the documentation, and putting ChunkedReader at the top of this file. Also make ChunkedBlob and its methods private, and give ChunkedReader a more useful constructor (from_chunks, instead of from_chunked_blob). Change-Id: I2399867591df923faa73927b924e7c116ad98dc0 Reviewed-on: https://cl.tvl.fyi/c/depot/+/11079 Tested-by: BuildkiteCI Reviewed-by: Brian Olsen <me@griff.name> Reviewed-by: Connor Brewster <cbrewster@hey.com>
Diffstat (limited to 'tvix')
-rw-r--r-- | tvix/castore/src/blobservice/chunked_reader.rs | 270 |
1 files changed, 139 insertions, 131 deletions
diff --git a/tvix/castore/src/blobservice/chunked_reader.rs b/tvix/castore/src/blobservice/chunked_reader.rs index f3c73948f809..6ca0d2003226 100644 --- a/tvix/castore/src/blobservice/chunked_reader.rs +++ b/tvix/castore/src/blobservice/chunked_reader.rs @@ -9,27 +9,145 @@ use tracing::warn; use crate::B3Digest; use std::{cmp::Ordering, pin::Pin, task::Poll}; -use super::BlobService; +use super::{BlobReader, BlobService}; -/// Supports reading a blob in a chunked fashion. -/// Takes a list of blake3 digest for individual chunks (and their sizes). -/// It internally keeps: -/// - a reference to the blob service, used to fetch chunks -/// - a the list of all chunks (chunk start offset, chunk len, chunk digest) +pin_project! { + /// ChunkedReader provides a chunk-aware [BlobReader], so allows reading and + /// seeking into a blob. + /// It internally holds a [ChunkedBlob], which is storing chunk information + /// able to emit a reader seeked to a specific position whenever we need to seek. + pub struct ChunkedReader<BS> { + chunked_blob: ChunkedBlob<BS>, + + #[pin] + r: Box<dyn AsyncRead + Unpin + Send>, + + pos: u64, + } +} + +impl<BS> ChunkedReader<BS> +where + BS: AsRef<dyn BlobService> + Clone + 'static + Send, +{ + /// Construct a new [ChunkedReader], by retrieving a list of chunks (their + /// blake3 digests and chunk sizes) + pub fn from_chunks(chunks_it: impl Iterator<Item = (B3Digest, u64)>, blob_service: BS) -> Self { + let chunked_blob = ChunkedBlob::from_iter(chunks_it, blob_service); + let r = chunked_blob.reader_skipped_offset(0); + + Self { + chunked_blob, + r, + pos: 0, + } + } +} + +/// ChunkedReader implements BlobReader. +impl<BS> BlobReader for ChunkedReader<BS> where BS: Send + Clone + 'static + AsRef<dyn BlobService> {} + +impl<BS> tokio::io::AsyncRead for ChunkedReader<BS> +where + BS: AsRef<dyn BlobService> + Clone + 'static, +{ + fn poll_read( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll<std::io::Result<()>> { + // The amount of data read can be determined by the increase + // in the length of the slice returned by `ReadBuf::filled`. + let filled_before = buf.filled().len(); + + let this = self.project(); + match this.r.poll_read(cx, buf) { + Poll::Ready(a) => { + let bytes_read = buf.filled().len() - filled_before; + *this.pos += bytes_read as u64; + + Poll::Ready(a) + } + Poll::Pending => Poll::Pending, + } + } +} + +impl<BS> tokio::io::AsyncSeek for ChunkedReader<BS> +where + BS: AsRef<dyn BlobService> + Clone + Send + 'static, +{ + fn start_seek(self: Pin<&mut Self>, position: std::io::SeekFrom) -> std::io::Result<()> { + let total_len = self.chunked_blob.blob_length(); + let current_pos = self.pos; + let this = self.project(); + let pos: &mut u64 = this.pos; + let mut r: Pin<&mut Box<dyn AsyncRead + Send + Unpin>> = this.r; + + let new_position: u64 = match position { + std::io::SeekFrom::Start(from_start) => from_start, + std::io::SeekFrom::End(from_end) => { + // note from_end is i64, not u64, so this is usually negative. + total_len.checked_add_signed(from_end).ok_or_else(|| { + std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "over/underflow while seeking", + ) + })? + } + std::io::SeekFrom::Current(from_current) => { + // note from_end is i64, not u64, so this can be positive or negative. + current_pos + .checked_add_signed(from_current) + .ok_or_else(|| { + std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "over/underflow while seeking", + ) + })? + } + }; + + // ensure the new position still is inside the file. + if new_position > total_len { + Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "seeked beyond EOF", + ))? + } + + // Update the position and the internal reader. + *pos = new_position; + *r = this.chunked_blob.reader_skipped_offset(new_position); + + Ok(()) + } + + fn poll_complete( + self: Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll<std::io::Result<u64>> { + std::task::Poll::Ready(Ok(self.pos)) + } +} + +/// Holds a list of blake3 digest for individual chunks (and their sizes). +/// Is able to construct a Reader that seeked to a certain offset, which +/// is useful to construct a BlobReader (that implements AsyncSeek). /// - the current chunk index, and a Custor<Vec<u8>> holding the data of that chunk. -pub struct ChunkedBlob<BS> { +struct ChunkedBlob<BS> { blob_service: BS, chunks: Vec<(u64, u64, B3Digest)>, } impl<BS> ChunkedBlob<BS> where - BS: AsRef<dyn BlobService> + Clone + 'static, + BS: AsRef<dyn BlobService> + Clone + 'static + Send, { - /// Constructs a new ChunkedBlobReader from a list of blake 3 digests of - /// chunks and their sizes. + /// Constructs [Self] from a list of blake3 digests of chunks and their + /// sizes, and a reference to a blob service. /// Initializing it with an empty list is disallowed. - pub fn from_iter(chunks_it: impl Iterator<Item = (B3Digest, u64)>, blob_service: BS) -> Self { + fn from_iter(chunks_it: impl Iterator<Item = (B3Digest, u64)>, blob_service: BS) -> Self { let mut chunks = Vec::new(); let mut offset: u64 = 0; @@ -50,7 +168,7 @@ where } /// Returns the length of the blob. - pub fn blob_length(&self) -> u64 { + fn blob_length(&self) -> u64 { self.chunks .last() .map(|(chunk_offset, chunk_size, _)| chunk_offset + chunk_size) @@ -78,7 +196,9 @@ where /// It internally assembles a stream reading from each chunk (skipping over /// chunks containing irrelevant data). /// From the first relevant chunk, the irrelevant bytes are skipped too. - pub fn reader_skipped_offset(&self, offset: u64) -> Box<dyn AsyncRead + Unpin> { + /// The returned boxed thing does not implement AsyncSeek on its own, but + /// ChunkedReader does. + fn reader_skipped_offset(&self, offset: u64) -> Box<dyn tokio::io::AsyncRead + Send + Unpin> { if offset == self.blob_length() { return Box::new(std::io::Cursor::new(vec![])); } @@ -130,117 +250,6 @@ where } } -pin_project! { - /// Wraps the underlying ChunkedBlob and exposes a AsyncRead and AsyncSeek. - pub struct ChunkedReader<BS> { - chunked_blob: ChunkedBlob<BS>, - - #[pin] - r: Box<dyn AsyncRead + Unpin>, - - pos: u64, - } -} - -impl<BS> ChunkedReader<BS> -where - BS: AsRef<dyn BlobService> + Clone + 'static, -{ - pub fn from_chunked_blob(chunked_blob: ChunkedBlob<BS>) -> Self { - let r = chunked_blob.reader_skipped_offset(0); - - Self { - chunked_blob, - r, - pos: 0, - } - } -} - -impl<BS> tokio::io::AsyncRead for ChunkedReader<BS> -where - BS: AsRef<dyn BlobService> + Clone + 'static, -{ - fn poll_read( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &mut tokio::io::ReadBuf<'_>, - ) -> std::task::Poll<std::io::Result<()>> { - // The amount of data read can be determined by the increase - // in the length of the slice returned by `ReadBuf::filled`. - let filled_before = buf.filled().len(); - - let this = self.project(); - match this.r.poll_read(cx, buf) { - Poll::Ready(a) => { - let bytes_read = buf.filled().len() - filled_before; - *this.pos += bytes_read as u64; - - Poll::Ready(a) - } - Poll::Pending => Poll::Pending, - } - } -} - -impl<BS> tokio::io::AsyncSeek for ChunkedReader<BS> -where - BS: AsRef<dyn BlobService> + Clone + 'static, -{ - fn start_seek(self: Pin<&mut Self>, position: std::io::SeekFrom) -> std::io::Result<()> { - let total_len = self.chunked_blob.blob_length(); - let current_pos = self.pos; - let this = self.project(); - let pos: &mut u64 = this.pos; - let mut r: Pin<&mut Box<dyn AsyncRead + Unpin>> = this.r; - - let new_position: u64 = match position { - std::io::SeekFrom::Start(from_start) => from_start, - std::io::SeekFrom::End(from_end) => { - // note from_end is i64, not u64, so this is usually negative. - total_len.checked_add_signed(from_end).ok_or_else(|| { - std::io::Error::new( - std::io::ErrorKind::InvalidInput, - "over/underflow while seeking", - ) - })? - } - std::io::SeekFrom::Current(from_current) => { - // note from_end is i64, not u64, so this can be positive or negative. - current_pos - .checked_add_signed(from_current) - .ok_or_else(|| { - std::io::Error::new( - std::io::ErrorKind::InvalidInput, - "over/underflow while seeking", - ) - })? - } - }; - - // ensure the new position still is inside the file. - if new_position > total_len { - Err(std::io::Error::new( - std::io::ErrorKind::InvalidInput, - "seeked beyond EOF", - ))? - } - - // Update the position and the internal reader. - *pos = new_position; - *r = this.chunked_blob.reader_skipped_offset(new_position); - - Ok(()) - } - - fn poll_complete( - self: Pin<&mut Self>, - _cx: &mut std::task::Context<'_>, - ) -> std::task::Poll<std::io::Result<u64>> { - std::task::Poll::Ready(Ok(self.pos)) - } -} - #[cfg(test)] mod test { use std::{io::SeekFrom, sync::Arc}; @@ -370,8 +379,8 @@ mod test { #[tokio::test] async fn test_read() { let blob_service = gen_blobservice_blob1().await; - let cb = ChunkedBlob::from_iter(BLOB_1_LIST.clone().into_iter(), blob_service); - let mut chunked_reader = ChunkedReader::from_chunked_blob(cb); + let mut chunked_reader = + ChunkedReader::from_chunks(BLOB_1_LIST.clone().into_iter(), blob_service); // read all data let mut buf = Vec::new(); @@ -389,8 +398,8 @@ mod test { #[tokio::test] async fn test_seek() { let blob_service = gen_blobservice_blob1().await; - let cb = ChunkedBlob::from_iter(BLOB_1_LIST.clone().into_iter(), blob_service); - let mut chunked_reader = ChunkedReader::from_chunked_blob(cb); + let mut chunked_reader = + ChunkedReader::from_chunks(BLOB_1_LIST.clone().into_iter(), blob_service); // seek to the end // expect to read 0 bytes @@ -456,9 +465,8 @@ mod test { bw.close().await.expect("close blobwriter"); } - let cb = ChunkedBlob::from_iter(BLOB_1_LIST.clone().into_iter(), blob_service); - - let mut chunked_reader = ChunkedReader::from_chunked_blob(cb); + let mut chunked_reader = + ChunkedReader::from_chunks(BLOB_1_LIST.clone().into_iter(), blob_service); // read a bit from the front (5 bytes out of 6 available) let mut buf = [0b0; 5]; |