diff options
Diffstat (limited to 'tvix/store/src/blobreader.rs')
-rw-r--r-- | tvix/store/src/blobreader.rs | 376 |
1 files changed, 0 insertions, 376 deletions
diff --git a/tvix/store/src/blobreader.rs b/tvix/store/src/blobreader.rs deleted file mode 100644 index ff0efdf15237..000000000000 --- a/tvix/store/src/blobreader.rs +++ /dev/null @@ -1,376 +0,0 @@ -use std::io::{self, Cursor, Read, Write}; - -use data_encoding::BASE64; - -use crate::{chunkservice::ChunkService, proto}; - -/// BlobReader implements reading of a blob, by querying individual chunks. -/// -/// It doesn't talk to BlobService, but assumes something has already fetched -/// blob_meta already. -pub struct BlobReader<'a, CS: ChunkService> { - // used to look up chunks - chunk_service: &'a CS, - - // internal iterator over chunk hashes and their sizes - chunks_iter: std::vec::IntoIter<proto::blob_meta::ChunkMeta>, - - // If a chunk was partially read (if buf.len() < chunk.size), - // a cursor to its contents are stored here. - current_chunk: Option<Cursor<Vec<u8>>>, -} - -impl<'a, CS: ChunkService> BlobReader<'a, CS> { - pub fn open(chunk_service: &'a CS, blob_meta: proto::BlobMeta) -> Self { - Self { - chunk_service, - chunks_iter: blob_meta.chunks.into_iter(), - current_chunk: None, - } - } - - /// reads (up to n bytes) from the current chunk into buf (if there is - /// a chunk). - /// - /// If it arrives at the end of the chunk, sets it back to None. - /// Returns a [std::io::Result<usize>] of the bytes read from the chunk. - fn read_from_current_chunk<W: std::io::Write>( - &mut self, - m: usize, - buf: &mut W, - ) -> std::io::Result<usize> { - // If there's still something in partial_chunk, read from there - // (up to m: usize bytes) and return the number of bytes read. - if let Some(current_chunk) = &mut self.current_chunk { - let result = io::copy(&mut current_chunk.take(m as u64), buf); - - match result { - Ok(n) => { - // if we were not able to read all off m bytes, - // this means we arrived at the end of the chunk. - if n < m as u64 { - self.current_chunk = None - } - - // n can never be > m, so downcasting this to usize is ok. - Ok(n as usize) - } - Err(e) => Err(e), - } - } else { - Ok(0) - } - } -} - -impl<CS: ChunkService> std::io::Read for BlobReader<'_, CS> { - fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> { - let read_max = buf.len(); - let mut bytes_read = 0_usize; - let mut buf_w = std::io::BufWriter::new(buf); - - // read up to buf.len() bytes into buf, by reading from the current - // chunk and subsequent ones. - loop { - // try to fill buf with bytes from the current chunk - // (if there's still one) - let n = self.read_from_current_chunk(read_max - bytes_read, &mut buf_w)?; - bytes_read += n; - - // We want to make sure we don't accidentially read past more than - // we're allowed to. - assert!(bytes_read <= read_max); - - // buf is entirerly filled, we're done. - if bytes_read == read_max { - buf_w.flush()?; - break Ok(bytes_read); - } - - // Otherwise, bytes_read is < read_max, so we could still write - // more to buf. - // Check if we have more chunks to read from. - match self.chunks_iter.next() { - // No more chunks, we're done. - None => { - buf_w.flush()?; - return Ok(bytes_read); - } - // There's another chunk to visit, fetch its contents - Some(chunk_meta) => { - let chunk_meta_digest: [u8; 32] = - chunk_meta.digest.clone().try_into().map_err(|_e| { - std::io::Error::new( - io::ErrorKind::InvalidData, - format!( - "chunk in chunkmeta has wrong digest size, expected 32, got {}", - chunk_meta.digest.len(), - ), - ) - })?; - match self.chunk_service.get(&chunk_meta_digest) { - // Fetch successful, put it into `self.current_chunk` and restart the loop. - Ok(Some(chunk_data)) => { - // make sure the size matches what chunk_meta says as well. - if chunk_data.len() as u32 != chunk_meta.size { - break Err(std::io::Error::new( - io::ErrorKind::InvalidData, - format!( - "chunk_service returned chunk with wrong size for {}, expected {}, got {}", - BASE64.encode(&chunk_meta.digest), chunk_meta.size, chunk_data.len() - ) - )); - } - self.current_chunk = Some(Cursor::new(chunk_data)); - } - // Chunk requested does not exist - Ok(None) => { - break Err(std::io::Error::new( - io::ErrorKind::NotFound, - format!("chunk {} not found", BASE64.encode(&chunk_meta.digest)), - )) - } - // Error occured while fetching the next chunk, propagate the error from the chunk service - Err(e) => { - break Err(std::io::Error::new(io::ErrorKind::InvalidData, e)); - } - } - } - } - } - } -} - -#[cfg(test)] -mod tests { - use super::BlobReader; - use crate::chunkservice::ChunkService; - use crate::proto; - use crate::tests::fixtures::DUMMY_DATA_1; - use crate::tests::fixtures::DUMMY_DATA_2; - use crate::tests::fixtures::DUMMY_DIGEST; - use crate::tests::utils::gen_chunk_service; - use std::io::Cursor; - use std::io::Read; - use std::io::Write; - - #[test] - /// reading from a blobmeta with zero chunks should produce zero bytes. - fn empty_blobmeta() -> anyhow::Result<()> { - let chunk_service = gen_chunk_service(); - - let blobmeta = proto::BlobMeta { - chunks: vec![], - inline_bao: vec![], - }; - - let mut blob_reader = BlobReader::open(&chunk_service, blobmeta); - let mut buf = Cursor::new(Vec::new()); - - let res = std::io::copy(&mut blob_reader, &mut buf); - - assert_eq!(0, res.unwrap()); - - Ok(()) - } - - #[test] - /// trying to read something where the chunk doesn't exist should fail - fn missing_chunk_fail() -> anyhow::Result<()> { - let chunk_service = gen_chunk_service(); - - let blobmeta = proto::BlobMeta { - chunks: vec![proto::blob_meta::ChunkMeta { - digest: DUMMY_DIGEST.to_vec(), - size: 42, - }], - inline_bao: vec![], - }; - - let mut blob_reader = BlobReader::open(&chunk_service, blobmeta); - let mut buf = Cursor::new(Vec::new()); - - let res = std::io::copy(&mut blob_reader, &mut buf); - - assert!(res.is_err()); - - Ok(()) - } - - #[test] - /// read something containing the single (empty) chunk - fn empty_chunk() -> anyhow::Result<()> { - let chunk_service = gen_chunk_service(); - - // insert a single chunk - let dgst = chunk_service.put(vec![]).expect("must succeed"); - - // assemble a blobmeta - let blobmeta = proto::BlobMeta { - chunks: vec![proto::blob_meta::ChunkMeta { - digest: dgst.to_vec(), - size: 0, - }], - inline_bao: vec![], - }; - - let mut blob_reader = BlobReader::open(&chunk_service, blobmeta); - - let mut buf: Vec<u8> = Vec::new(); - - let res = - std::io::copy(&mut blob_reader, &mut Cursor::new(&mut buf)).expect("must succeed"); - - assert_eq!(res, 0, "number of bytes read must match"); - assert!(buf.is_empty(), "buf must be empty"); - - Ok(()) - } - - /// read something which contains a single chunk - #[test] - fn single_chunk() -> anyhow::Result<()> { - let chunk_service = gen_chunk_service(); - - // insert a single chunk - let dgst = chunk_service - .put(DUMMY_DATA_1.clone()) - .expect("must succeed"); - - // assemble a blobmeta - let blobmeta = proto::BlobMeta { - chunks: vec![proto::blob_meta::ChunkMeta { - digest: dgst.to_vec(), - size: 3, - }], - inline_bao: vec![], - }; - - let mut blob_reader = BlobReader::open(&chunk_service, blobmeta); - - let mut buf: Vec<u8> = Vec::new(); - - let res = - std::io::copy(&mut blob_reader, &mut Cursor::new(&mut buf)).expect("must succeed"); - - assert_eq!(res, 3, "number of bytes read must match"); - assert_eq!(DUMMY_DATA_1[..], buf[..], "data read must match"); - - Ok(()) - } - - /// read something referring to a chunk, but with wrong size - #[test] - fn wrong_size_fail() -> anyhow::Result<()> { - let chunk_service = gen_chunk_service(); - - // insert chunks - let dgst_1 = chunk_service - .put(DUMMY_DATA_1.clone()) - .expect("must succeed"); - - // assemble a blobmeta - let blobmeta = proto::BlobMeta { - chunks: vec![proto::blob_meta::ChunkMeta { - digest: dgst_1.to_vec(), - size: 42, - }], - inline_bao: vec![], - }; - - let mut blob_reader = BlobReader::open(&chunk_service, blobmeta); - - let mut buf: Vec<u8> = Vec::new(); - - let res = std::io::copy(&mut blob_reader, &mut Cursor::new(&mut buf)); - - assert!(res.is_err(), "reading must fail"); - - Ok(()) - } - - /// read something referring to multiple chunks - #[test] - fn multiple_chunks() -> anyhow::Result<()> { - let chunk_service = gen_chunk_service(); - - // insert chunks - let dgst_1 = chunk_service - .put(DUMMY_DATA_1.clone()) - .expect("must succeed"); - let dgst_2 = chunk_service - .put(DUMMY_DATA_2.clone()) - .expect("must succeed"); - - // assemble a blobmeta - let blobmeta = proto::BlobMeta { - chunks: vec![ - proto::blob_meta::ChunkMeta { - digest: dgst_1.to_vec(), - size: 3, - }, - proto::blob_meta::ChunkMeta { - digest: dgst_2.to_vec(), - size: 2, - }, - proto::blob_meta::ChunkMeta { - digest: dgst_1.to_vec(), - size: 3, - }, - ], - inline_bao: vec![], - }; - - // assemble ecpected data - let mut expected_data: Vec<u8> = Vec::new(); - expected_data.extend_from_slice(&DUMMY_DATA_1[..]); - expected_data.extend_from_slice(&DUMMY_DATA_2[..]); - expected_data.extend_from_slice(&DUMMY_DATA_1[..]); - - // read via io::copy - { - let mut blob_reader = BlobReader::open(&chunk_service, blobmeta.clone()); - - let mut buf: Vec<u8> = Vec::new(); - - let res = - std::io::copy(&mut blob_reader, &mut Cursor::new(&mut buf)).expect("must succeed"); - - assert_eq!(8, res, "number of bytes read must match"); - - assert_eq!(expected_data[..], buf[..], "data read must match"); - } - - // now read the same thing again, but not via io::copy, but individually - { - let mut blob_reader = BlobReader::open(&chunk_service, blobmeta); - - let mut buf: Vec<u8> = Vec::new(); - let mut cursor = Cursor::new(&mut buf); - - let mut bytes_read = 0; - - loop { - let mut smallbuf = [0xff; 1]; - match blob_reader.read(&mut smallbuf) { - Ok(n) => { - if n == 0 { - break; - } - let w_b = cursor.write(&smallbuf).unwrap(); - assert_eq!(n, w_b); - bytes_read += w_b; - } - Err(_) => { - panic!("error occured during read"); - } - } - } - - assert_eq!(8, bytes_read, "number of bytes read must match"); - assert_eq!(expected_data[..], buf[..], "data read must match"); - } - - Ok(()) - } -} |