diff options
-rw-r--r-- | tvix/store/src/blobservice/dumb_seeker.rs | 93 | ||||
-rw-r--r-- | tvix/store/src/blobservice/grpc.rs | 9 | ||||
-rw-r--r-- | tvix/store/src/blobservice/memory.rs | 4 | ||||
-rw-r--r-- | tvix/store/src/blobservice/mod.rs | 9 | ||||
-rw-r--r-- | tvix/store/src/blobservice/sled.rs | 4 | ||||
-rw-r--r-- | tvix/store/src/blobservice/tests.rs | 132 |
6 files changed, 240 insertions, 11 deletions
diff --git a/tvix/store/src/blobservice/dumb_seeker.rs b/tvix/store/src/blobservice/dumb_seeker.rs new file mode 100644 index 000000000000..5548ea0bd33d --- /dev/null +++ b/tvix/store/src/blobservice/dumb_seeker.rs @@ -0,0 +1,93 @@ +use std::io; + +use super::BlobReader; + +/// This implements [io::Seek] for and [io::Read] by simply skipping over some +/// bytes, keeping track of the position. +/// It fails whenever you try to seek backwards. +pub struct DumbSeeker<R: io::Read> { + r: R, + pos: u64, +} + +impl<R: io::Read> DumbSeeker<R> { + pub fn new(r: R) -> Self { + DumbSeeker { r, pos: 0 } + } +} + +impl<R: io::Read> io::Read for DumbSeeker<R> { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + let bytes_read = self.r.read(buf)?; + + self.pos += bytes_read as u64; + + Ok(bytes_read) + } +} + +impl<R: io::Read> io::Seek for DumbSeeker<R> { + fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> { + let absolute_offset: u64 = match pos { + io::SeekFrom::Start(start_offset) => { + if start_offset < self.pos { + return Err(io::Error::new( + io::ErrorKind::Unsupported, + format!("can't seek backwards ({} -> {})", self.pos, start_offset), + )); + } else { + start_offset + } + } + // we don't know the total size, can't support this. + io::SeekFrom::End(_end_offset) => { + return Err(io::Error::new( + io::ErrorKind::Unsupported, + "can't seek from end", + )); + } + io::SeekFrom::Current(relative_offset) => { + if relative_offset < 0 { + return Err(io::Error::new( + io::ErrorKind::Unsupported, + "can't seek backwards relative to current position", + )); + } else { + self.pos + relative_offset as u64 + } + } + }; + + // we already know absolute_offset is larger than self.pos + debug_assert!( + absolute_offset > self.pos, + "absolute_offset is larger than self.pos" + ); + + // calculate bytes to skip + let bytes_to_skip: u64 = absolute_offset - self.pos; + + // discard these bytes. We can't use take() as it requires ownership of + // self.r, but we only have &mut self. + let mut buf = [0; 1024]; + let mut bytes_skipped: u64 = 0; + while bytes_skipped < bytes_to_skip { + let len = std::cmp::min(bytes_to_skip - bytes_skipped, buf.len() as u64); + match self.r.read(&mut buf[..len as usize]) { + Ok(0) => break, + Ok(n) => bytes_skipped += n as u64, + Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} + Err(e) => return Err(e), + } + } + debug_assert_eq!(bytes_to_skip, bytes_skipped); + + self.pos = absolute_offset; + + // return the new position from the start of the stream + Ok(absolute_offset) + } +} + +/// A Cursor<Vec<u8>> can be used as a BlobReader. +impl<R: io::Read + Send + 'static> BlobReader for DumbSeeker<R> {} diff --git a/tvix/store/src/blobservice/grpc.rs b/tvix/store/src/blobservice/grpc.rs index a0006eb72f9e..96e2869a4feb 100644 --- a/tvix/store/src/blobservice/grpc.rs +++ b/tvix/store/src/blobservice/grpc.rs @@ -1,4 +1,4 @@ -use super::{BlobService, BlobWriter}; +use super::{dumb_seeker::DumbSeeker, BlobReader, BlobService, BlobWriter}; use crate::{proto, B3Digest}; use futures::sink::{SinkExt, SinkMapErr}; use std::{collections::VecDeque, io}; @@ -114,10 +114,7 @@ impl BlobService for GRPCBlobService { // On success, this returns a Ok(Some(io::Read)), which can be used to read // the contents of the Blob, identified by the digest. - fn open_read( - &self, - digest: &B3Digest, - ) -> Result<Option<Box<dyn io::Read + Send>>, crate::Error> { + fn open_read(&self, digest: &B3Digest) -> Result<Option<Box<dyn BlobReader>>, crate::Error> { // Get a new handle to the gRPC client, and copy the digest. let mut grpc_client = self.grpc_client.clone(); let digest = digest.clone(); @@ -155,7 +152,7 @@ impl BlobService for GRPCBlobService { // Use SyncIoBridge to turn it into a sync Read. let sync_reader = tokio_util::io::SyncIoBridge::new(data_reader); - Ok(Some(Box::new(sync_reader))) + Ok(Some(Box::new(DumbSeeker::new(sync_reader)))) } Err(e) if e.code() == Code::NotFound => Ok(None), Err(e) => Err(crate::Error::StorageError(e.to_string())), diff --git a/tvix/store/src/blobservice/memory.rs b/tvix/store/src/blobservice/memory.rs index 8963deefd42b..fa2826fe3112 100644 --- a/tvix/store/src/blobservice/memory.rs +++ b/tvix/store/src/blobservice/memory.rs @@ -5,7 +5,7 @@ use std::{ }; use tracing::{instrument, warn}; -use super::{BlobService, BlobWriter}; +use super::{BlobReader, BlobService, BlobWriter}; use crate::{B3Digest, Error}; #[derive(Clone, Default)] @@ -36,7 +36,7 @@ impl BlobService for MemoryBlobService { Ok(db.contains_key(digest)) } - fn open_read(&self, digest: &B3Digest) -> Result<Option<Box<dyn io::Read + Send>>, Error> { + fn open_read(&self, digest: &B3Digest) -> Result<Option<Box<dyn BlobReader>>, Error> { let db = self.db.read().unwrap(); match db.get(digest).map(|x| Cursor::new(x.clone())) { diff --git a/tvix/store/src/blobservice/mod.rs b/tvix/store/src/blobservice/mod.rs index c159ce95a544..027b34eb4c6f 100644 --- a/tvix/store/src/blobservice/mod.rs +++ b/tvix/store/src/blobservice/mod.rs @@ -2,6 +2,7 @@ use std::io; use crate::{B3Digest, Error}; +mod dumb_seeker; mod from_addr; mod grpc; mod memory; @@ -30,7 +31,7 @@ pub trait BlobService: Send + Sync { fn has(&self, digest: &B3Digest) -> Result<bool, Error>; /// Request a blob from the store, by its content hash. - fn open_read(&self, digest: &B3Digest) -> Result<Option<Box<dyn io::Read + Send>>, Error>; + fn open_read(&self, digest: &B3Digest) -> Result<Option<Box<dyn BlobReader>>, Error>; /// Insert a new blob into the store. Returns a [BlobWriter], which /// implements [io::Write] and a [BlobWriter::close]. @@ -46,3 +47,9 @@ pub trait BlobWriter: io::Write + Send + Sync + 'static { /// Closing a already-closed BlobWriter is a no-op. fn close(&mut self) -> Result<B3Digest, Error>; } + +/// A [io::Read] that also allows seeking. +pub trait BlobReader: io::Read + io::Seek + Send + 'static {} + +/// A Cursor<Vec<u8>> can be used as a BlobReader. +impl BlobReader for io::Cursor<Vec<u8>> {} diff --git a/tvix/store/src/blobservice/sled.rs b/tvix/store/src/blobservice/sled.rs index bff50ff3eb4a..67897cb94a24 100644 --- a/tvix/store/src/blobservice/sled.rs +++ b/tvix/store/src/blobservice/sled.rs @@ -1,4 +1,4 @@ -use super::{BlobService, BlobWriter}; +use super::{BlobReader, BlobService, BlobWriter}; use crate::{B3Digest, Error}; use std::{ io::{self, Cursor}, @@ -65,7 +65,7 @@ impl BlobService for SledBlobService { } #[instrument(skip(self), fields(blob.digest=%digest))] - fn open_read(&self, digest: &B3Digest) -> Result<Option<Box<dyn io::Read + Send>>, Error> { + fn open_read(&self, digest: &B3Digest) -> Result<Option<Box<dyn BlobReader>>, Error> { match self.db.get(digest.to_vec()) { Ok(None) => Ok(None), Ok(Some(data)) => Ok(Some(Box::new(Cursor::new(data[..].to_vec())))), diff --git a/tvix/store/src/blobservice/tests.rs b/tvix/store/src/blobservice/tests.rs index 2b1c2c166596..cc43ce5b180a 100644 --- a/tvix/store/src/blobservice/tests.rs +++ b/tvix/store/src/blobservice/tests.rs @@ -84,3 +84,135 @@ fn put_has_get(blob_service: impl BlobService, blob_contents: &[u8], blob_digest assert_eq!(blob_contents, buf, "read blob contents must match"); } + +/// Put a blob in the store, and seek inside it a bit. +#[test_case(gen_memory_blob_service(); "memory")] +#[test_case(gen_sled_blob_service(); "sled")] +fn put_seek(blob_service: impl BlobService) { + let mut w = blob_service.open_write(); + + io::copy(&mut io::Cursor::new(&fixtures::BLOB_B.to_vec()), &mut w).expect("copy must succeed"); + w.close().expect("close must succeed"); + + // open a blob for reading + let mut r = blob_service + .open_read(&fixtures::BLOB_B_DIGEST) + .expect("open_read must succeed") + .expect("must be some"); + + let mut pos: u64 = 0; + + // read the first 10 bytes, they must match the data in the fixture. + { + let mut buf = [0; 10]; + r.read_exact(&mut buf).expect("must succeed"); + + assert_eq!( + &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()], + buf, + "expected first 10 bytes to match" + ); + + pos += buf.len() as u64; + } + // seek by 0 bytes, using SeekFrom::Start. + let p = r + .seek(io::SeekFrom::Start(pos as u64)) + .expect("must not fail"); + assert_eq!(pos, p); + + // read the next 10 bytes, they must match the data in the fixture. + { + let mut buf = [0; 10]; + r.read_exact(&mut buf).expect("must succeed"); + + assert_eq!( + &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()], + buf, + "expected data to match" + ); + + pos += buf.len() as u64; + } + + // seek by 5 bytes, using SeekFrom::Start. + let p = r + .seek(io::SeekFrom::Start(pos as u64 + 5)) + .expect("must not fail"); + pos += 5; + assert_eq!(pos, p); + + // read the next 10 bytes, they must match the data in the fixture. + { + let mut buf = [0; 10]; + r.read_exact(&mut buf).expect("must succeed"); + + assert_eq!( + &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()], + buf, + "expected data to match" + ); + + pos += buf.len() as u64; + } + + // seek by 12345 bytes, using SeekFrom:: + let p = r.seek(io::SeekFrom::Current(12345)).expect("must not fail"); + pos += 12345; + assert_eq!(pos, p); + + // read the next 10 bytes, they must match the data in the fixture. + { + let mut buf = [0; 10]; + r.read_exact(&mut buf).expect("must succeed"); + + assert_eq!( + &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()], + buf, + "expected data to match" + ); + + #[allow(unused_assignments)] + { + pos += buf.len() as u64; + } + } + + // seeking to the end is okay… + let p = r + .seek(io::SeekFrom::Start(fixtures::BLOB_B.len() as u64)) + .expect("must not fail"); + pos = fixtures::BLOB_B.len() as u64; + assert_eq!(pos, p); + + { + // but it returns no more data. + let mut buf: Vec<u8> = Vec::new(); + r.read_to_end(&mut buf).expect("must not fail"); + assert!(buf.is_empty(), "expected no more data to be read"); + } + + // seeking past the end… + match r.seek(io::SeekFrom::Start(fixtures::BLOB_B.len() as u64 + 1)) { + // should either be ok, but then return 0 bytes. + // this matches the behaviour or a Cursor<Vec<u8>>. + Ok(_pos) => { + let mut buf: Vec<u8> = Vec::new(); + r.read_to_end(&mut buf).expect("must not fail"); + assert!(buf.is_empty(), "expected no more data to be read"); + } + // or not be okay. + Err(_) => {} + } + + // TODO: this is only broken for the gRPC version + // We expect seeking backwards or relative to the end to fail. + // r.seek(io::SeekFrom::Current(-1)) + // .expect_err("SeekFrom::Current(-1) expected to fail"); + + // r.seek(io::SeekFrom::Start(pos - 1)) + // .expect_err("SeekFrom::Start(pos-1) expected to fail"); + + // r.seek(io::SeekFrom::End(0)) + // .expect_err("SeekFrom::End(_) expected to fail"); +} |