use data_encoding::BASE64;
use futures::TryStreamExt;
use pin_project_lite::pin_project;
use tokio::io::{AsyncRead, AsyncSeekExt};
use tokio_stream::StreamExt;
use tokio_util::io::{ReaderStream, StreamReader};
use tracing::warn;
use crate::B3Digest;
use std::{cmp::Ordering, pin::Pin, task::Poll};
use super::BlobService;
/// Supports reading a blob in a chunked fashion.
/// Constructed from a list of blake3 digests of the individual chunks (and
/// their sizes).
/// It internally keeps:
/// - a handle to the blob service, used to fetch chunks
/// - the list of all chunks (chunk start offset, chunk len, chunk digest)
///
/// The struct itself holds no read position; readers starting at a given
/// offset are assembled on demand by `reader_skipped_offset`.
pub struct ChunkedBlob<BS> {
/// Handle to the blob service the individual chunks are opened from.
blob_service: BS,
/// One `(chunk start offset, chunk len, chunk digest)` entry per chunk, in
/// the order the chunks were supplied at construction time.
chunks: Vec<(u64, u64, B3Digest)>,
}
impl<BS> ChunkedBlob<BS>
where
    BS: AsRef<dyn BlobService> + Clone + 'static,
{
    /// Constructs a new [ChunkedBlob] from a list of blake3 digests of chunks
    /// and their sizes.
    ///
    /// The start offset of every chunk inside the blob is computed here by
    /// summing up the sizes of all preceding chunks.
    ///
    /// # Panics
    ///
    /// Initializing it with an empty list is disallowed and panics.
    pub fn from_iter(chunks_it: impl Iterator<Item = (B3Digest, u64)>, blob_service: BS) -> Self {
        let mut chunks = Vec::new();
        let mut offset: u64 = 0;

        for (chunk_digest, chunk_size) in chunks_it {
            chunks.push((offset, chunk_size, chunk_digest));
            offset += chunk_size;
        }

        assert!(
            !chunks.is_empty(),
            "Chunks must be provided, don't use this for blobs without chunks"
        );

        Self {
            blob_service,
            chunks,
        }
    }

    /// Returns the length of the blob, which is the end offset of its last
    /// chunk.
    pub fn blob_length(&self) -> u64 {
        self.chunks
            .last()
            .map_or(0, |(chunk_offset, chunk_size, _)| chunk_offset + chunk_size)
    }

    /// For a given position pos, return the index of the chunk containing the
    /// data at that position.
    /// In case this would range outside the blob, None is returned.
    fn get_chunk_idx_for_position(&self, pos: u64) -> Option<usize> {
        // FUTUREWORK: benchmark when to use linear search, binary_search and BTreeSet
        self.chunks
            .binary_search_by(|(chunk_start_pos, chunk_size, _)| {
                if chunk_start_pos + chunk_size <= pos {
                    // chunk ends at or before pos
                    Ordering::Less
                } else if *chunk_start_pos > pos {
                    // chunk starts after pos
                    Ordering::Greater
                } else {
                    // chunk_start_pos <= pos < chunk_start_pos + chunk_size
                    Ordering::Equal
                }
            })
            .ok()
    }

    /// Returns a reader over the data of the blob, starting at `offset`.
    ///
    /// It internally assembles a stream reading from each chunk (skipping over
    /// chunks containing irrelevant data).
    /// From the first relevant chunk, the irrelevant bytes are skipped too;
    /// all following chunks are read from their start.
    ///
    /// A chunk that cannot be found in the blob service surfaces as a
    /// [std::io::ErrorKind::NotFound] error while reading.
    ///
    /// # Panics
    ///
    /// Panics if `offset` is larger than [Self::blob_length]. An offset equal
    /// to the blob length is fine and yields an empty reader.
    pub fn reader_skipped_offset(&self, offset: u64) -> Box<dyn AsyncRead + Unpin> {
        if offset == self.blob_length() {
            return Box::new(std::io::Cursor::new(vec![]));
        }

        // Find the chunk containing the requested offset.
        // It's ok to panic here, we can only reach this by seeking, and seeking
        // should already reject out-of-file seeking.
        let start_chunk_idx = self
            .get_chunk_idx_for_position(offset)
            .expect("outside of blob");

        // Number of bytes to discard from the front of the first chunk. Kept
        // as u64 so it cannot be truncated on targets with a narrower usize.
        let skip_first_chunk_bytes = offset - self.chunks[start_chunk_idx].0;

        let blob_service = self.blob_service.clone();
        let chunks: Vec<_> = self.chunks[start_chunk_idx..].to_vec();

        // construct a stream of readers for all chunks starting with the one
        // containing the given offset
        let readers_stream = tokio_stream::iter(chunks.into_iter().enumerate()).map(
            move |(nth_chunk, (_chunk_start_offset, _chunk_size, chunk_digest))| {
                let blob_service = blob_service.clone();
                async move {
                    let mut blob_reader = blob_service
                        .as_ref()
                        .open_read(&chunk_digest)
                        .await?
                        .ok_or_else(|| {
                            warn!(
                                chunk.digest = BASE64.encode(chunk_digest.as_slice()),
                                "chunk not found"
                            );
                            std::io::Error::new(std::io::ErrorKind::NotFound, "chunk not found")
                        })?;

                    // Only the first chunk of the stream is read from the
                    // middle. Seeking in any later chunk would drop data that
                    // belongs to the requested range.
                    if nth_chunk == 0 && skip_first_chunk_bytes > 0 {
                        blob_reader
                            .seek(std::io::SeekFrom::Start(skip_first_chunk_bytes))
                            .await?;
                    }
                    Ok::<_, std::io::Error>(blob_reader)
                }
            },
        );

        // convert the stream of readers to a stream of streams of byte chunks
        let bytes_streams = readers_stream.then(|elem| async { elem.await.map(ReaderStream::new) });

        // flatten into one stream of byte chunks
        let bytes_stream = bytes_streams.try_flatten();

        // convert into AsyncRead
        Box::new(StreamReader::new(Box::pin(bytes_stream)))
    }
}
pin_project! {
/// Wraps the underlying ChunkedBlob and exposes an AsyncRead and AsyncSeek
/// on top of it, keeping track of the current position inside the blob.
pub struct ChunkedReader<BS> {
// The chunk list and blob service used to (re)build the inner reader.
chunked_blob: ChunkedBlob<BS>,
// The reader currently serving data; replaced wholesale on every seek.
#[pin]
r: Box<dyn AsyncRead + Unpin>,
// Current position inside the blob: advanced by reads, set by seeks.
pos: u64,
}
}
impl<BS> ChunkedReader<BS>
where
    BS: AsRef<dyn BlobService> + Clone + 'static,
{
    /// Builds a reader over the given [ChunkedBlob], positioned at the very
    /// start of the blob.
    pub fn from_chunked_blob(chunked_blob: ChunkedBlob<BS>) -> Self {
        // The inner reader has to be created before `chunked_blob` is moved
        // into the struct; fields are initialised in the order written.
        Self {
            r: chunked_blob.reader_skipped_offset(0),
            chunked_blob,
            pos: 0,
        }
    }
}
impl<BS> tokio::io::AsyncRead for ChunkedReader<BS>
where
BS: AsRef<dyn BlobService> + Clone + 'static,
{
fn poll_read(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<std::io::Result<()>> {
// The amount of data read can be determined by the increase
// in the length of the slice returned by `ReadBuf::filled`.
let filled_before = buf.filled().len();
let this = self.project();
match this.r.poll_read(cx, buf) {
Poll::Ready(a) => {
let bytes_read = buf.filled().len() - filled_before;
*this.pos += bytes_read as u64;
Poll::Ready(a)
}
Poll::Pending => Poll::Pending,
}
}
}
impl<BS> tokio::io::AsyncSeek for ChunkedReader<BS>
where
    BS: AsRef<dyn BlobService> + Clone + 'static,
{
    /// Resolves `position` to an absolute offset, rejects anything that
    /// over/underflows or lies beyond the end of the blob, and replaces the
    /// inner reader with one starting at the new offset.
    fn start_seek(self: Pin<&mut Self>, position: std::io::SeekFrom) -> std::io::Result<()> {
        let this = self.project();
        let blob_len = this.chunked_blob.blob_length();
        let old_pos = *this.pos;

        // Shared error for offsets that cannot be represented as u64.
        let out_of_range = || {
            std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "over/underflow while seeking",
            )
        };

        let target: u64 = match position {
            std::io::SeekFrom::Start(absolute) => absolute,
            // The delta is an i64 relative to the end of the blob, so it is
            // usually zero or negative.
            std::io::SeekFrom::End(delta) => blob_len
                .checked_add_signed(delta)
                .ok_or_else(out_of_range)?,
            // The delta is an i64 relative to the current position, so it can
            // be positive or negative.
            std::io::SeekFrom::Current(delta) => old_pos
                .checked_add_signed(delta)
                .ok_or_else(out_of_range)?,
        };

        // The new position must still be inside the blob (or exactly at EOF).
        if target > blob_len {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "seeked beyond EOF",
            ));
        }

        // Record the new position, then swap in a reader starting there.
        *this.pos = target;
        let mut reader: Pin<&mut Box<dyn AsyncRead + Unpin>> = this.r;
        reader.set(this.chunked_blob.reader_skipped_offset(target));

        Ok(())
    }

    /// Seeking finishes synchronously in `start_seek`, so this only reports
    /// the current position.
    fn poll_complete(
        self: Pin<&mut Self>,
        _cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<std::io::Result<u64>> {
        std::task::Poll::Ready(Ok(self.pos))
    }
}
#[cfg(test)]
mod test {
    use std::{io::SeekFrom, sync::Arc};

    use crate::{
        blobservice::{chunked_reader::ChunkedReader, BlobService, MemoryBlobService},
        B3Digest,
    };
    use hex_literal::hex;
    use lazy_static::lazy_static;
    use tokio::io::{AsyncReadExt, AsyncSeekExt};

    use super::ChunkedBlob;

    const CHUNK_1: [u8; 2] = hex!("0001");
    const CHUNK_2: [u8; 4] = hex!("02030405");
    const CHUNK_3: [u8; 1] = hex!("06");
    const CHUNK_4: [u8; 2] = hex!("0708");
    const CHUNK_5: [u8; 7] = hex!("090a0b0c0d0e0f");

    lazy_static! {
        // Layout of the test blob, by byte value:
        // `[ 0 1 ] [ 2 3 4 5 ] [ 6 ] [ 7 8 ] [ 9 10 11 12 13 14 15 ]`
        pub static ref CHUNK_1_DIGEST: B3Digest = blake3::hash(&CHUNK_1).as_bytes().into();
        pub static ref CHUNK_2_DIGEST: B3Digest = blake3::hash(&CHUNK_2).as_bytes().into();
        pub static ref CHUNK_3_DIGEST: B3Digest = blake3::hash(&CHUNK_3).as_bytes().into();
        pub static ref CHUNK_4_DIGEST: B3Digest = blake3::hash(&CHUNK_4).as_bytes().into();
        pub static ref CHUNK_5_DIGEST: B3Digest = blake3::hash(&CHUNK_5).as_bytes().into();
        pub static ref BLOB_1_LIST: [(B3Digest, u64); 5] = [
            (CHUNK_1_DIGEST.clone(), 2),
            (CHUNK_2_DIGEST.clone(), 4),
            (CHUNK_3_DIGEST.clone(), 1),
            (CHUNK_4_DIGEST.clone(), 2),
            (CHUNK_5_DIGEST.clone(), 7),
        ];
    }

    /// returns a fresh, empty in-memory blob service.
    fn empty_blobservice() -> Arc<dyn BlobService> {
        Arc::new(MemoryBlobService::default())
    }

    /// builds the ChunkedBlob described by BLOB_1_LIST on top of the given
    /// blob service.
    fn blob1(blob_service: Arc<dyn BlobService>) -> ChunkedBlob<Arc<dyn BlobService>> {
        ChunkedBlob::from_iter(BLOB_1_LIST.iter().cloned(), blob_service)
    }

    /// writes each of the given byte vectors into the blob service as its own
    /// blob.
    async fn upload_blobs(
        blob_service: &Arc<dyn BlobService>,
        blobs: impl IntoIterator<Item = Vec<u8>>,
    ) {
        for contents in blobs {
            let mut bw = blob_service.open_write().await;
            tokio::io::copy(&mut std::io::Cursor::new(contents), &mut bw)
                .await
                .expect("writing blob");
            bw.close().await.expect("close blobwriter");
        }
    }

    /// ensure the start offsets are properly calculated.
    #[test]
    fn from_iter() {
        let cb = blob1(empty_blobservice());

        let expected = vec![
            (0, 2, CHUNK_1_DIGEST.clone()),
            (2, 4, CHUNK_2_DIGEST.clone()),
            (6, 1, CHUNK_3_DIGEST.clone()),
            (7, 2, CHUNK_4_DIGEST.clone()),
            (9, 7, CHUNK_5_DIGEST.clone()),
        ];
        assert_eq!(cb.chunks, expected);
    }

    /// ensure ChunkedBlob can't be used with an empty list of chunks
    #[test]
    #[should_panic]
    fn from_iter_empty() {
        ChunkedBlob::from_iter([].into_iter(), empty_blobservice());
    }

    /// ensure the right chunk is selected
    #[test]
    fn chunk_idx_for_position() {
        let cb = blob1(empty_blobservice());

        // (position, expected chunk index, description)
        let cases: [(u64, Option<usize>, &str); 6] = [
            (0, Some(0), "start of blob"),
            (1, Some(0), "middle of first chunk"),
            (2, Some(1), "beginning of second chunk"),
            (15, Some(4), "right before the end of the blob"),
            (16, None, "right outside the blob"),
            (100, None, "way outside the blob"),
        ];

        for (pos, expected, description) in cases {
            assert_eq!(
                expected,
                cb.get_chunk_idx_for_position(pos),
                "{}",
                description
            );
        }
    }

    /// returns a blobservice with all chunks in BLOB_1 present.
    async fn gen_blobservice_blob1() -> Arc<dyn BlobService> {
        let blob_service = empty_blobservice();

        // seed blob service with all chunks
        upload_blobs(
            &blob_service,
            [
                CHUNK_1.to_vec(),
                CHUNK_2.to_vec(),
                CHUNK_3.to_vec(),
                CHUNK_4.to_vec(),
                CHUNK_5.to_vec(),
            ],
        )
        .await;

        blob_service
    }

    #[tokio::test]
    async fn test_read() {
        let mut chunked_reader =
            ChunkedReader::from_chunked_blob(blob1(gen_blobservice_blob1().await));

        // read all data
        let mut contents = Vec::new();
        tokio::io::copy(&mut chunked_reader, &mut contents)
            .await
            .expect("copy");

        assert_eq!(
            hex!("000102030405060708090a0b0c0d0e0f").to_vec(),
            contents,
            "read data must match"
        );
    }

    #[tokio::test]
    async fn test_seek() {
        let mut chunked_reader =
            ChunkedReader::from_chunked_blob(blob1(gen_blobservice_blob1().await));

        // seek to the end
        // expect to read 0 bytes
        chunked_reader
            .seek(SeekFrom::End(0))
            .await
            .expect("seek to end");
        let mut at_end = Vec::new();
        chunked_reader
            .read_to_end(&mut at_end)
            .await
            .expect("read to end");
        assert_eq!(Vec::<u8>::new(), at_end);

        // seek one byte before the end
        chunked_reader.seek(SeekFrom::End(-1)).await.expect("seek");
        let mut last_byte = Vec::new();
        chunked_reader
            .read_to_end(&mut last_byte)
            .await
            .expect("read to end");
        assert_eq!(hex!("0f").to_vec(), last_byte);

        // seek back three bytes, but using relative positioning
        // read two bytes
        chunked_reader
            .seek(SeekFrom::Current(-3))
            .await
            .expect("seek");
        let mut two_bytes = [0u8; 2];
        chunked_reader
            .read_exact(&mut two_bytes)
            .await
            .expect("read exact");
        assert_eq!(hex!("0d0e"), two_bytes);
    }

    // seeds a blob service with only the first two chunks, reads a bit in the
    // front (which succeeds), but then tries to seek past and read more (which
    // should fail).
    #[tokio::test]
    async fn test_read_missing_chunks() {
        let blob_service = empty_blobservice();
        upload_blobs(&blob_service, [CHUNK_1.to_vec(), CHUNK_2.to_vec()]).await;

        let mut chunked_reader = ChunkedReader::from_chunked_blob(blob1(blob_service));

        // read a bit from the front (5 bytes out of 6 available)
        let mut front = [0u8; 5];
        chunked_reader
            .read_exact(&mut front)
            .await
            .expect("read exact");
        assert_eq!(hex!("0001020304"), front);

        // seek 2 bytes forward, into an area where we don't have chunks
        chunked_reader
            .seek(SeekFrom::Current(2))
            .await
            .expect("seek");

        let mut rest = Vec::new();
        chunked_reader
            .read_to_end(&mut rest)
            .await
            .expect_err("must fail");

        // FUTUREWORK: check semantics on errorkinds. Should this be InvalidData
        // or NotFound?
    }
}