about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--tvix/store/src/blobreader.rs384
-rw-r--r--tvix/store/src/lib.rs2
2 files changed, 386 insertions, 0 deletions
diff --git a/tvix/store/src/blobreader.rs b/tvix/store/src/blobreader.rs
new file mode 100644
index 000000000000..47ddfae03c7b
--- /dev/null
+++ b/tvix/store/src/blobreader.rs
@@ -0,0 +1,384 @@
+use std::io::{self, Cursor, Read, Write};
+
+use data_encoding::BASE64;
+
+use crate::{chunkservice::ChunkService, proto};
+
+/// BlobReader implements reading of a blob, by querying individual chunks.
+///
+/// It doesn't talk to BlobService, but assumes something has already fetched
+/// blob_meta already.
+pub struct BlobReader<'a, CS: ChunkService> {
+    // used to look up chunks
+    chunk_service: &'a CS,
+
+    // internal iterator over chunk hashes and their sizes
+    chunks_iter: std::vec::IntoIter<proto::blob_meta::ChunkMeta>,
+
+    // If a chunk was partially read (if buf.len() < chunk.size),
+    // a cursor to its contents are stored here.
+    current_chunk: Option<Cursor<Vec<u8>>>,
+}
+
+impl<'a, CS: ChunkService> BlobReader<'a, CS> {
+    pub fn open(chunk_service: &'a CS, blob_meta: proto::BlobMeta) -> Self {
+        Self {
+            chunk_service,
+            chunks_iter: blob_meta.chunks.into_iter(),
+            current_chunk: None,
+        }
+    }
+
+    /// reads (up to n bytes) from the current chunk into buf (if there is
+    /// a chunk).
+    ///
+    /// If it arrives at the end of the chunk, sets it back to None.
+    /// Returns a io::Result<usize> of the bytes read from the chunk.
+    fn read_from_current_chunk<W: std::io::Write>(
+        &mut self,
+        m: usize,
+        buf: &mut W,
+    ) -> std::io::Result<usize> {
+        // If there's still something in partial_chunk, read from there
+        // (up to m: usize bytes) and return the number of bytes read.
+        if let Some(current_chunk) = &mut self.current_chunk {
+            let result = io::copy(&mut current_chunk.take(m as u64), buf);
+
+            match result {
+                Ok(n) => {
+                    // if we were not able to read all off m bytes,
+                    // this means we arrived at the end of the chunk.
+                    if n < m as u64 {
+                        self.current_chunk = None
+                    }
+
+                    // n can never be > m, so downcasting this to usize is ok.
+                    Ok(n as usize)
+                }
+                Err(e) => Err(e),
+            }
+        } else {
+            Ok(0)
+        }
+    }
+}
+
+impl<CS: ChunkService> std::io::Read for BlobReader<'_, CS> {
+    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
+        let read_max = buf.len();
+        let mut bytes_read = 0_usize;
+        let mut buf_w = std::io::BufWriter::new(buf);
+
+        // read up to buf.len() bytes into buf, by reading from the current
+        // chunk and subsequent ones.
+        loop {
+            // try to fill buf with bytes from the current chunk
+            // (if there's still one)
+            let n = self.read_from_current_chunk(read_max - bytes_read, &mut buf_w)?;
+            bytes_read += n;
+
+            // We want to make sure we don't accidentially read past more than
+            // we're allowed to.
+            assert!(bytes_read <= read_max);
+
+            // buf is entirerly filled, we're done.
+            if bytes_read == read_max {
+                buf_w.flush()?;
+                break Ok(bytes_read);
+            }
+
+            // Otherwise, bytes_read is < read_max, so we could still write
+            // more to buf.
+            // Check if we have more chunks to read from.
+            match self.chunks_iter.next() {
+                // No more chunks, we're done.
+                None => {
+                    buf_w.flush()?;
+                    return Ok(bytes_read);
+                }
+                // There's another chunk to visit, fetch its contents
+                Some(chunk_meta) => match self.chunk_service.get(&chunk_meta.digest) {
+                    // Fetch successful, put it into `self.current_chunk` and restart the loop.
+                    Ok(Some(chunk_data)) => {
+                        // make sure the size matches what chunk_meta says as well.
+                        if chunk_data.len() as u32 != chunk_meta.size {
+                            break Err(std::io::Error::new(
+                                io::ErrorKind::InvalidData,
+                                format!(
+                                    "chunk_service returned chunk with wrong size for {}, expected {}, got {}",
+                                    BASE64.encode(&chunk_meta.digest), chunk_meta.size, chunk_data.len()
+                                )
+                            ));
+                        }
+                        self.current_chunk = Some(Cursor::new(chunk_data));
+                    }
+                    // Chunk requested does not exist
+                    Ok(None) => {
+                        break Err(std::io::Error::new(
+                            io::ErrorKind::NotFound,
+                            format!("chunk {} not found", BASE64.encode(&chunk_meta.digest)),
+                        ))
+                    }
+                    // Error occured while fetching the next chunk, propagate the error from the chunk service
+                    Err(e) => {
+                        break Err(std::io::Error::new(io::ErrorKind::InvalidData, e));
+                    }
+                },
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::BlobReader;
+    use crate::chunkservice::ChunkService;
+    use crate::chunkservice::SledChunkService;
+    use crate::proto;
+    use lazy_static::lazy_static;
+    use std::io::Cursor;
+    use std::io::Read;
+    use std::io::Write;
+    use std::path::PathBuf;
+    use tempfile::TempDir;
+
+    lazy_static! {
+        static ref DUMMY_DIGEST: Vec<u8> = vec![
+            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+            0x00, 0x00, 0x00, 0x00,
+        ];
+        static ref DUMMY_DATA_1: Vec<u8> = vec![0x01, 0x02, 0x03];
+        static ref DUMMY_DATA_2: Vec<u8> = vec![0x04, 0x05];
+    }
+
+    fn gen_chunk_service(p: PathBuf) -> impl ChunkService {
+        SledChunkService::new(p.join("chunks")).unwrap()
+    }
+
+    #[test]
+    /// reading from a blobmeta with zero chunks should produce zero bytes.
+    fn empty_blobmeta() -> anyhow::Result<()> {
+        let tmpdir = TempDir::new()?;
+        let chunk_service = gen_chunk_service(tmpdir.path().to_path_buf());
+
+        let blobmeta = proto::BlobMeta {
+            chunks: vec![],
+            inline_bao: vec![],
+        };
+
+        let mut blob_reader = BlobReader::open(&chunk_service, blobmeta);
+        let mut buf = Cursor::new(Vec::new());
+
+        let res = std::io::copy(&mut blob_reader, &mut buf);
+
+        assert_eq!(0, res.unwrap());
+
+        Ok(())
+    }
+
+    #[test]
+    /// trying to read something where the chunk doesn't exist should fail
+    fn missing_chunk_fail() -> anyhow::Result<()> {
+        let tmpdir = TempDir::new()?;
+        let chunk_service = gen_chunk_service(tmpdir.path().to_path_buf());
+
+        let blobmeta = proto::BlobMeta {
+            chunks: vec![proto::blob_meta::ChunkMeta {
+                digest: DUMMY_DIGEST.to_vec(),
+                size: 42,
+            }],
+            inline_bao: vec![],
+        };
+
+        let mut blob_reader = BlobReader::open(&chunk_service, blobmeta);
+        let mut buf = Cursor::new(Vec::new());
+
+        let res = std::io::copy(&mut blob_reader, &mut buf);
+
+        assert!(res.is_err());
+
+        Ok(())
+    }
+
+    #[test]
+    /// read something containing the single (empty) chunk
+    fn empty_chunk() -> anyhow::Result<()> {
+        let tmpdir = TempDir::new()?;
+        let chunk_service = gen_chunk_service(tmpdir.path().to_path_buf());
+
+        // insert a single chunk
+        let dgst = chunk_service.put(vec![]).expect("must succeed");
+
+        // assemble a blobmeta
+        let blobmeta = proto::BlobMeta {
+            chunks: vec![proto::blob_meta::ChunkMeta {
+                digest: dgst,
+                size: 0,
+            }],
+            inline_bao: vec![],
+        };
+
+        let mut blob_reader = BlobReader::open(&chunk_service, blobmeta);
+
+        let mut buf: Vec<u8> = Vec::new();
+
+        let res =
+            std::io::copy(&mut blob_reader, &mut Cursor::new(&mut buf)).expect("must succeed");
+
+        assert_eq!(res, 0, "number of bytes read must match");
+        assert!(buf.is_empty(), "buf must be empty");
+
+        Ok(())
+    }
+
+    /// read something which contains a single chunk
+    #[test]
+    fn single_chunk() -> anyhow::Result<()> {
+        let tmpdir = TempDir::new()?;
+        let chunk_service = gen_chunk_service(tmpdir.path().to_path_buf());
+
+        // insert a single chunk
+        let dgst = chunk_service
+            .put(DUMMY_DATA_1.clone())
+            .expect("must succeed");
+
+        // assemble a blobmeta
+        let blobmeta = proto::BlobMeta {
+            chunks: vec![proto::blob_meta::ChunkMeta {
+                digest: dgst,
+                size: 3,
+            }],
+            inline_bao: vec![],
+        };
+
+        let mut blob_reader = BlobReader::open(&chunk_service, blobmeta);
+
+        let mut buf: Vec<u8> = Vec::new();
+
+        let res =
+            std::io::copy(&mut blob_reader, &mut Cursor::new(&mut buf)).expect("must succeed");
+
+        assert_eq!(res, 3, "number of bytes read must match");
+        assert_eq!(DUMMY_DATA_1[..], buf[..], "data read must match");
+
+        Ok(())
+    }
+
+    /// read something referring to a chunk, but with wrong size
+    #[test]
+    fn wrong_size_fail() -> anyhow::Result<()> {
+        let tmpdir = TempDir::new()?;
+        let chunk_service = gen_chunk_service(tmpdir.path().to_path_buf());
+
+        // insert chunks
+        let dgst_1 = chunk_service
+            .put(DUMMY_DATA_1.clone())
+            .expect("must succeed");
+
+        // assemble a blobmeta
+        let blobmeta = proto::BlobMeta {
+            chunks: vec![proto::blob_meta::ChunkMeta {
+                digest: dgst_1,
+                size: 42,
+            }],
+            inline_bao: vec![],
+        };
+
+        let mut blob_reader = BlobReader::open(&chunk_service, blobmeta);
+
+        let mut buf: Vec<u8> = Vec::new();
+
+        let res = std::io::copy(&mut blob_reader, &mut Cursor::new(&mut buf));
+
+        assert!(res.is_err(), "reading must fail");
+
+        Ok(())
+    }
+
+    /// read something referring to multiple chunks
+    #[test]
+    fn multiple_chunks() -> anyhow::Result<()> {
+        let tmpdir = TempDir::new()?;
+        let chunk_service = gen_chunk_service(tmpdir.path().to_path_buf());
+
+        // insert chunks
+        let dgst_1 = chunk_service
+            .put(DUMMY_DATA_1.clone())
+            .expect("must succeed");
+        let dgst_2 = chunk_service
+            .put(DUMMY_DATA_2.clone())
+            .expect("must succeed");
+
+        // assemble a blobmeta
+        let blobmeta = proto::BlobMeta {
+            chunks: vec![
+                proto::blob_meta::ChunkMeta {
+                    digest: dgst_1.clone(),
+                    size: 3,
+                },
+                proto::blob_meta::ChunkMeta {
+                    digest: dgst_2,
+                    size: 2,
+                },
+                proto::blob_meta::ChunkMeta {
+                    digest: dgst_1,
+                    size: 3,
+                },
+            ],
+            inline_bao: vec![],
+        };
+
+        // assemble ecpected data
+        let mut expected_data: Vec<u8> = Vec::new();
+        expected_data.extend_from_slice(&DUMMY_DATA_1[..]);
+        expected_data.extend_from_slice(&DUMMY_DATA_2[..]);
+        expected_data.extend_from_slice(&DUMMY_DATA_1[..]);
+
+        // read via io::copy
+        {
+            let mut blob_reader = BlobReader::open(&chunk_service, blobmeta.clone());
+
+            let mut buf: Vec<u8> = Vec::new();
+
+            let res =
+                std::io::copy(&mut blob_reader, &mut Cursor::new(&mut buf)).expect("must succeed");
+
+            assert_eq!(8, res, "number of bytes read must match");
+
+            assert_eq!(expected_data[..], buf[..], "data read must match");
+        }
+
+        // now read the same thing again, but not via io::copy, but individually
+        {
+            let mut blob_reader = BlobReader::open(&chunk_service, blobmeta);
+
+            let mut buf: Vec<u8> = Vec::new();
+            let mut cursor = Cursor::new(&mut buf);
+
+            let mut bytes_read = 0;
+
+            loop {
+                let mut smallbuf = [0xff; 1];
+                match blob_reader.read(&mut smallbuf) {
+                    Ok(n) => {
+                        if n == 0 {
+                            break;
+                        }
+                        let w_b = cursor.write(&smallbuf).unwrap();
+                        assert_eq!(n, w_b);
+                        bytes_read += w_b;
+                    }
+                    Err(_) => {
+                        panic!("error occured during read");
+                    }
+                }
+            }
+
+            assert_eq!(8, bytes_read, "number of bytes read must match");
+            assert_eq!(expected_data[..], buf[..], "data read must match");
+        }
+
+        Ok(())
+    }
+}
diff --git a/tvix/store/src/lib.rs b/tvix/store/src/lib.rs
index aa0fcd0c9619..12250a9c0d51 100644
--- a/tvix/store/src/lib.rs
+++ b/tvix/store/src/lib.rs
@@ -1,11 +1,13 @@
 pub mod client;
 
+mod blobreader;
 mod errors;
 
 pub mod blobservice;
 pub mod chunkservice;
 pub mod proto;
 
+pub use blobreader::BlobReader;
 pub mod dummy_blob_service;
 pub mod sled_directory_service;
 pub mod sled_path_info_service;