diff options
author | Florian Klink <flokli@flokli.de> | 2023-06-30T14·08+0200 |
---|---|---|
committer | clbot <clbot@tvl.fyi> | 2023-07-21T18·52+0000 |
commit | 7613e2e76972554ee2a5ae1397f8b5ca84f4f729 (patch) | |
tree | 366554a7137abbd63971e8c517a0926fcffc56a7 /tvix/store/src/blobservice/dumb_seeker.rs | |
parent | 42dc18353d99453bc0f83492f9f5bc4796f4cc4c (diff) |
feat(tvix/store/blobservice): implement seek r/6434
For memory and sled, it's trivial, as we already have a Cursor<Vec<u8>>. For gRPC, we simply reject going backwards, and skip n bytes for now. Once the gRPC protocol gets support for offsets and verified streaming, this can be improved. Change-Id: I734066a514aed287ea3db64bfb1680911ac1eeb0 Reviewed-on: https://cl.tvl.fyi/c/depot/+/8885 Autosubmit: flokli <flokli@flokli.de> Tested-by: BuildkiteCI Reviewed-by: tazjin <tazjin@tvl.su>
Diffstat (limited to 'tvix/store/src/blobservice/dumb_seeker.rs')
-rw-r--r-- | tvix/store/src/blobservice/dumb_seeker.rs | 93 |
1 files changed, 93 insertions, 0 deletions
diff --git a/tvix/store/src/blobservice/dumb_seeker.rs b/tvix/store/src/blobservice/dumb_seeker.rs new file mode 100644 index 000000000000..5548ea0bd33d --- /dev/null +++ b/tvix/store/src/blobservice/dumb_seeker.rs @@ -0,0 +1,93 @@ +use std::io; + +use super::BlobReader; + +/// This implements [io::Seek] for and [io::Read] by simply skipping over some +/// bytes, keeping track of the position. +/// It fails whenever you try to seek backwards. +pub struct DumbSeeker<R: io::Read> { + r: R, + pos: u64, +} + +impl<R: io::Read> DumbSeeker<R> { + pub fn new(r: R) -> Self { + DumbSeeker { r, pos: 0 } + } +} + +impl<R: io::Read> io::Read for DumbSeeker<R> { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + let bytes_read = self.r.read(buf)?; + + self.pos += bytes_read as u64; + + Ok(bytes_read) + } +} + +impl<R: io::Read> io::Seek for DumbSeeker<R> { + fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> { + let absolute_offset: u64 = match pos { + io::SeekFrom::Start(start_offset) => { + if start_offset < self.pos { + return Err(io::Error::new( + io::ErrorKind::Unsupported, + format!("can't seek backwards ({} -> {})", self.pos, start_offset), + )); + } else { + start_offset + } + } + // we don't know the total size, can't support this. + io::SeekFrom::End(_end_offset) => { + return Err(io::Error::new( + io::ErrorKind::Unsupported, + "can't seek from end", + )); + } + io::SeekFrom::Current(relative_offset) => { + if relative_offset < 0 { + return Err(io::Error::new( + io::ErrorKind::Unsupported, + "can't seek backwards relative to current position", + )); + } else { + self.pos + relative_offset as u64 + } + } + }; + + // we already know absolute_offset is larger than self.pos + debug_assert!( + absolute_offset > self.pos, + "absolute_offset is larger than self.pos" + ); + + // calculate bytes to skip + let bytes_to_skip: u64 = absolute_offset - self.pos; + + // discard these bytes. We can't use take() as it requires ownership of + // self.r, but we only have &mut self. + let mut buf = [0; 1024]; + let mut bytes_skipped: u64 = 0; + while bytes_skipped < bytes_to_skip { + let len = std::cmp::min(bytes_to_skip - bytes_skipped, buf.len() as u64); + match self.r.read(&mut buf[..len as usize]) { + Ok(0) => break, + Ok(n) => bytes_skipped += n as u64, + Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} + Err(e) => return Err(e), + } + } + debug_assert_eq!(bytes_to_skip, bytes_skipped); + + self.pos = absolute_offset; + + // return the new position from the start of the stream + Ok(absolute_offset) + } +} + +/// A Cursor<Vec<u8>> can be used as a BlobReader. +impl<R: io::Read + Send + 'static> BlobReader for DumbSeeker<R> {} |