diff options
-rw-r--r-- | tvix/castore/src/blobservice/chunked_reader.rs | 487 | ||||
-rw-r--r-- | tvix/castore/src/blobservice/mod.rs | 2 |
2 files changed, 489 insertions, 0 deletions
diff --git a/tvix/castore/src/blobservice/chunked_reader.rs b/tvix/castore/src/blobservice/chunked_reader.rs new file mode 100644 index 000000000000..f3c73948f809 --- /dev/null +++ b/tvix/castore/src/blobservice/chunked_reader.rs @@ -0,0 +1,487 @@ +use data_encoding::BASE64; +use futures::TryStreamExt; +use pin_project_lite::pin_project; +use tokio::io::{AsyncRead, AsyncSeekExt}; +use tokio_stream::StreamExt; +use tokio_util::io::{ReaderStream, StreamReader}; +use tracing::warn; + +use crate::B3Digest; +use std::{cmp::Ordering, pin::Pin, task::Poll}; + +use super::BlobService; + +/// Supports reading a blob in a chunked fashion. +/// Takes a list of blake3 digest for individual chunks (and their sizes). +/// It internally keeps: +/// - a reference to the blob service, used to fetch chunks +/// - a the list of all chunks (chunk start offset, chunk len, chunk digest) +/// - the current chunk index, and a Custor<Vec<u8>> holding the data of that chunk. +pub struct ChunkedBlob<BS> { + blob_service: BS, + chunks: Vec<(u64, u64, B3Digest)>, +} + +impl<BS> ChunkedBlob<BS> +where + BS: AsRef<dyn BlobService> + Clone + 'static, +{ + /// Constructs a new ChunkedBlobReader from a list of blake 3 digests of + /// chunks and their sizes. + /// Initializing it with an empty list is disallowed. + pub fn from_iter(chunks_it: impl Iterator<Item = (B3Digest, u64)>, blob_service: BS) -> Self { + let mut chunks = Vec::new(); + let mut offset: u64 = 0; + + for (chunk_digest, chunk_size) in chunks_it { + chunks.push((offset, chunk_size, chunk_digest)); + offset += chunk_size; + } + + assert!( + !chunks.is_empty(), + "Chunks must be provided, don't use this for blobs without chunks" + ); + + Self { + blob_service, + chunks, + } + } + + /// Returns the length of the blob. + pub fn blob_length(&self) -> u64 { + self.chunks + .last() + .map(|(chunk_offset, chunk_size, _)| chunk_offset + chunk_size) + .unwrap_or(0) + } + + /// For a given position pos, return the chunk containing the data. + /// In case this would range outside the blob, None is returned. + fn get_chunk_idx_for_position(&self, pos: u64) -> Option<usize> { + // FUTUREWORK: benchmark when to use linear search, binary_search and BTreeSet + self.chunks + .binary_search_by(|(chunk_start_pos, chunk_size, _)| { + if chunk_start_pos + chunk_size <= pos { + Ordering::Less + } else if *chunk_start_pos > pos { + Ordering::Greater + } else { + Ordering::Equal + } + }) + .ok() + } + + /// Returns a stream of bytes of the data in that blob. + /// It internally assembles a stream reading from each chunk (skipping over + /// chunks containing irrelevant data). + /// From the first relevant chunk, the irrelevant bytes are skipped too. + pub fn reader_skipped_offset(&self, offset: u64) -> Box<dyn AsyncRead + Unpin> { + if offset == self.blob_length() { + return Box::new(std::io::Cursor::new(vec![])); + } + // construct a stream of all chunks starting with the given offset + let start_chunk_idx = self + .get_chunk_idx_for_position(offset) + .expect("outside of blob"); + // It's ok to panic here, we can only reach this by seeking, and seeking should already reject out-of-file seeking. + + let skip_first_chunk_bytes = (offset - self.chunks[start_chunk_idx].0) as usize; + + let blob_service = self.blob_service.clone(); + let chunks: Vec<_> = self.chunks[start_chunk_idx..].to_vec(); + let readers_stream = tokio_stream::iter(chunks).map( + move |(_chunk_start_offset, _chunk_size, chunk_digest)| { + let chunk_digest = chunk_digest.to_owned(); + let blob_service = blob_service.clone(); + async move { + let mut blob_reader = blob_service + .as_ref() + .open_read(&chunk_digest.to_owned()) + .await? + .ok_or_else(|| { + warn!( + chunk.digest = BASE64.encode(chunk_digest.as_slice()), + "chunk not found" + ); + std::io::Error::new(std::io::ErrorKind::NotFound, "chunk not found") + })?; + + if skip_first_chunk_bytes > 0 { + blob_reader + .seek(std::io::SeekFrom::Start(skip_first_chunk_bytes as u64)) + .await?; + } + Ok::<_, std::io::Error>(blob_reader) + } + }, + ); + + // convert the stream of readers to a stream of streams of byte chunks + let bytes_streams = readers_stream.then(|elem| async { elem.await.map(ReaderStream::new) }); + + // flatten into one stream of byte chunks + let bytes_stream = bytes_streams.try_flatten(); + + // convert into AsyncRead + Box::new(StreamReader::new(Box::pin(bytes_stream))) + } +} + +pin_project! { + /// Wraps the underlying ChunkedBlob and exposes a AsyncRead and AsyncSeek. + pub struct ChunkedReader<BS> { + chunked_blob: ChunkedBlob<BS>, + + #[pin] + r: Box<dyn AsyncRead + Unpin>, + + pos: u64, + } +} + +impl<BS> ChunkedReader<BS> +where + BS: AsRef<dyn BlobService> + Clone + 'static, +{ + pub fn from_chunked_blob(chunked_blob: ChunkedBlob<BS>) -> Self { + let r = chunked_blob.reader_skipped_offset(0); + + Self { + chunked_blob, + r, + pos: 0, + } + } +} + +impl<BS> tokio::io::AsyncRead for ChunkedReader<BS> +where + BS: AsRef<dyn BlobService> + Clone + 'static, +{ + fn poll_read( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll<std::io::Result<()>> { + // The amount of data read can be determined by the increase + // in the length of the slice returned by `ReadBuf::filled`. + let filled_before = buf.filled().len(); + + let this = self.project(); + match this.r.poll_read(cx, buf) { + Poll::Ready(a) => { + let bytes_read = buf.filled().len() - filled_before; + *this.pos += bytes_read as u64; + + Poll::Ready(a) + } + Poll::Pending => Poll::Pending, + } + } +} + +impl<BS> tokio::io::AsyncSeek for ChunkedReader<BS> +where + BS: AsRef<dyn BlobService> + Clone + 'static, +{ + fn start_seek(self: Pin<&mut Self>, position: std::io::SeekFrom) -> std::io::Result<()> { + let total_len = self.chunked_blob.blob_length(); + let current_pos = self.pos; + let this = self.project(); + let pos: &mut u64 = this.pos; + let mut r: Pin<&mut Box<dyn AsyncRead + Unpin>> = this.r; + + let new_position: u64 = match position { + std::io::SeekFrom::Start(from_start) => from_start, + std::io::SeekFrom::End(from_end) => { + // note from_end is i64, not u64, so this is usually negative. + total_len.checked_add_signed(from_end).ok_or_else(|| { + std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "over/underflow while seeking", + ) + })? + } + std::io::SeekFrom::Current(from_current) => { + // note from_end is i64, not u64, so this can be positive or negative. + current_pos + .checked_add_signed(from_current) + .ok_or_else(|| { + std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "over/underflow while seeking", + ) + })? + } + }; + + // ensure the new position still is inside the file. + if new_position > total_len { + Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "seeked beyond EOF", + ))? + } + + // Update the position and the internal reader. + *pos = new_position; + *r = this.chunked_blob.reader_skipped_offset(new_position); + + Ok(()) + } + + fn poll_complete( + self: Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll<std::io::Result<u64>> { + std::task::Poll::Ready(Ok(self.pos)) + } +} + +#[cfg(test)] +mod test { + use std::{io::SeekFrom, sync::Arc}; + + use crate::{ + blobservice::{chunked_reader::ChunkedReader, BlobService, MemoryBlobService}, + B3Digest, + }; + use hex_literal::hex; + use lazy_static::lazy_static; + use tokio::io::{AsyncReadExt, AsyncSeekExt}; + + const CHUNK_1: [u8; 2] = hex!("0001"); + const CHUNK_2: [u8; 4] = hex!("02030405"); + const CHUNK_3: [u8; 1] = hex!("06"); + const CHUNK_4: [u8; 2] = hex!("0708"); + const CHUNK_5: [u8; 7] = hex!("090a0b0c0d0e0f"); + + lazy_static! { + // `[ 0 1 ] [ 2 3 4 5 ] [ 6 ] [ 7 8 ] [ 9 10 11 12 13 14 15 ]` + pub static ref CHUNK_1_DIGEST: B3Digest = blake3::hash(&CHUNK_1).as_bytes().into(); + pub static ref CHUNK_2_DIGEST: B3Digest = blake3::hash(&CHUNK_2).as_bytes().into(); + pub static ref CHUNK_3_DIGEST: B3Digest = blake3::hash(&CHUNK_3).as_bytes().into(); + pub static ref CHUNK_4_DIGEST: B3Digest = blake3::hash(&CHUNK_4).as_bytes().into(); + pub static ref CHUNK_5_DIGEST: B3Digest = blake3::hash(&CHUNK_5).as_bytes().into(); + pub static ref BLOB_1_LIST: [(B3Digest, u64); 5] = [ + (CHUNK_1_DIGEST.clone(), 2), + (CHUNK_2_DIGEST.clone(), 4), + (CHUNK_3_DIGEST.clone(), 1), + (CHUNK_4_DIGEST.clone(), 2), + (CHUNK_5_DIGEST.clone(), 7), + ]; + } + + use super::ChunkedBlob; + + /// ensure the start offsets are properly calculated. + #[test] + fn from_iter() { + let cb = ChunkedBlob::from_iter( + BLOB_1_LIST.clone().into_iter(), + Arc::new(MemoryBlobService::default()) as Arc<dyn BlobService>, + ); + + assert_eq!( + cb.chunks, + Vec::from_iter([ + (0, 2, CHUNK_1_DIGEST.clone()), + (2, 4, CHUNK_2_DIGEST.clone()), + (6, 1, CHUNK_3_DIGEST.clone()), + (7, 2, CHUNK_4_DIGEST.clone()), + (9, 7, CHUNK_5_DIGEST.clone()), + ]) + ); + } + + /// ensure ChunkedBlob can't be used with an empty list of chunks + #[test] + #[should_panic] + fn from_iter_empty() { + ChunkedBlob::from_iter( + [].into_iter(), + Arc::new(MemoryBlobService::default()) as Arc<dyn BlobService>, + ); + } + + /// ensure the right chunk is selected + #[test] + fn chunk_idx_for_position() { + let cb = ChunkedBlob::from_iter( + BLOB_1_LIST.clone().into_iter(), + Arc::new(MemoryBlobService::default()) as Arc<dyn BlobService>, + ); + + assert_eq!(Some(0), cb.get_chunk_idx_for_position(0), "start of blob"); + + assert_eq!( + Some(0), + cb.get_chunk_idx_for_position(1), + "middle of first chunk" + ); + assert_eq!( + Some(1), + cb.get_chunk_idx_for_position(2), + "beginning of second chunk" + ); + + assert_eq!( + Some(4), + cb.get_chunk_idx_for_position(15), + "right before the end of the blob" + ); + assert_eq!( + None, + cb.get_chunk_idx_for_position(16), + "right outside the blob" + ); + assert_eq!( + None, + cb.get_chunk_idx_for_position(100), + "way outside the blob" + ); + } + + /// returns a blobservice with all chunks in BLOB_1 present. + async fn gen_blobservice_blob1() -> Arc<dyn BlobService> { + let blob_service = Arc::new(MemoryBlobService::default()) as Arc<dyn BlobService>; + + // seed blob service with all chunks + for blob_contents in [ + CHUNK_1.to_vec(), + CHUNK_2.to_vec(), + CHUNK_3.to_vec(), + CHUNK_4.to_vec(), + CHUNK_5.to_vec(), + ] { + let mut bw = blob_service.open_write().await; + tokio::io::copy(&mut std::io::Cursor::new(blob_contents), &mut bw) + .await + .expect("writing blob"); + bw.close().await.expect("close blobwriter"); + } + + blob_service + } + + #[tokio::test] + async fn test_read() { + let blob_service = gen_blobservice_blob1().await; + let cb = ChunkedBlob::from_iter(BLOB_1_LIST.clone().into_iter(), blob_service); + let mut chunked_reader = ChunkedReader::from_chunked_blob(cb); + + // read all data + let mut buf = Vec::new(); + tokio::io::copy(&mut chunked_reader, &mut buf) + .await + .expect("copy"); + + assert_eq!( + hex!("000102030405060708090a0b0c0d0e0f").to_vec(), + buf, + "read data must match" + ); + } + + #[tokio::test] + async fn test_seek() { + let blob_service = gen_blobservice_blob1().await; + let cb = ChunkedBlob::from_iter(BLOB_1_LIST.clone().into_iter(), blob_service); + let mut chunked_reader = ChunkedReader::from_chunked_blob(cb); + + // seek to the end + // expect to read 0 bytes + { + chunked_reader + .seek(SeekFrom::End(0)) + .await + .expect("seek to end"); + + let mut buf = Vec::new(); + chunked_reader + .read_to_end(&mut buf) + .await + .expect("read to end"); + + assert_eq!(hex!("").to_vec(), buf); + } + + // seek one bytes before the end + { + chunked_reader.seek(SeekFrom::End(-1)).await.expect("seek"); + + let mut buf = Vec::new(); + chunked_reader + .read_to_end(&mut buf) + .await + .expect("read to end"); + + assert_eq!(hex!("0f").to_vec(), buf); + } + + // seek back three bytes, but using relative positioning + // read two bytes + { + chunked_reader + .seek(SeekFrom::Current(-3)) + .await + .expect("seek"); + + let mut buf = [0b0; 2]; + chunked_reader + .read_exact(&mut buf) + .await + .expect("read exact"); + + assert_eq!(hex!("0d0e"), buf); + } + } + + // seeds a blob service with only the first two chunks, reads a bit in the + // front (which succeeds), but then tries to seek past and read more (which + // should fail). + #[tokio::test] + async fn test_read_missing_chunks() { + let blob_service = Arc::new(MemoryBlobService::default()) as Arc<dyn BlobService>; + + for blob_contents in [CHUNK_1.to_vec(), CHUNK_2.to_vec()] { + let mut bw = blob_service.open_write().await; + tokio::io::copy(&mut std::io::Cursor::new(blob_contents), &mut bw) + .await + .expect("writing blob"); + + bw.close().await.expect("close blobwriter"); + } + + let cb = ChunkedBlob::from_iter(BLOB_1_LIST.clone().into_iter(), blob_service); + + let mut chunked_reader = ChunkedReader::from_chunked_blob(cb); + + // read a bit from the front (5 bytes out of 6 available) + let mut buf = [0b0; 5]; + chunked_reader + .read_exact(&mut buf) + .await + .expect("read exact"); + + assert_eq!(hex!("0001020304"), buf); + + // seek 2 bytes forward, into an area where we don't have chunks + chunked_reader + .seek(SeekFrom::Current(2)) + .await + .expect("seek"); + + let mut buf = Vec::new(); + chunked_reader + .read_to_end(&mut buf) + .await + .expect_err("must fail"); + + // FUTUREWORK: check semantics on errorkinds. Should this be InvalidData + // or NotFound? + } +} diff --git a/tvix/castore/src/blobservice/mod.rs b/tvix/castore/src/blobservice/mod.rs index fa6b87926ba1..eeb8b9649c1a 100644 --- a/tvix/castore/src/blobservice/mod.rs +++ b/tvix/castore/src/blobservice/mod.rs @@ -4,6 +4,7 @@ use tonic::async_trait; use crate::proto::stat_blob_response::ChunkMeta; use crate::B3Digest; +mod chunked_reader; mod combinator; mod from_addr; mod grpc; @@ -15,6 +16,7 @@ mod sled; #[cfg(test)] mod tests; +pub use self::chunked_reader::ChunkedReader; pub use self::combinator::CombinedBlobService; pub use self::from_addr::from_addr; pub use self::grpc::GRPCBlobService; |