about summary refs log tree commit diff
path: root/tvix/store/src/blobservice/dumb_seeker.rs
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2023-06-30T14·08+0200
committerclbot <clbot@tvl.fyi>2023-07-21T18·52+0000
commit7613e2e76972554ee2a5ae1397f8b5ca84f4f729 (patch)
tree366554a7137abbd63971e8c517a0926fcffc56a7 /tvix/store/src/blobservice/dumb_seeker.rs
parent42dc18353d99453bc0f83492f9f5bc4796f4cc4c (diff)
feat(tvix/store/blobservice): implement seek r/6434
For memory and sled, it's trivial, as we already have a Cursor<Vec<u8>>.
For gRPC, we simply reject going backwards, and skip n bytes for now.

Once the gRPC protocol gets support for offsets and verified streaming,
this can be improved.

Change-Id: I734066a514aed287ea3db64bfb1680911ac1eeb0
Reviewed-on: https://cl.tvl.fyi/c/depot/+/8885
Autosubmit: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Reviewed-by: tazjin <tazjin@tvl.su>
Diffstat (limited to 'tvix/store/src/blobservice/dumb_seeker.rs')
-rw-r--r--tvix/store/src/blobservice/dumb_seeker.rs93
1 files changed, 93 insertions, 0 deletions
diff --git a/tvix/store/src/blobservice/dumb_seeker.rs b/tvix/store/src/blobservice/dumb_seeker.rs
new file mode 100644
index 000000000000..5548ea0bd33d
--- /dev/null
+++ b/tvix/store/src/blobservice/dumb_seeker.rs
@@ -0,0 +1,93 @@
+use std::io;
+
+use super::BlobReader;
+
+/// This implements [io::Seek] for and [io::Read] by simply skipping over some
+/// bytes, keeping track of the position.
+/// It fails whenever you try to seek backwards.
+pub struct DumbSeeker<R: io::Read> {
+    r: R,
+    pos: u64,
+}
+
+impl<R: io::Read> DumbSeeker<R> {
+    pub fn new(r: R) -> Self {
+        DumbSeeker { r, pos: 0 }
+    }
+}
+
+impl<R: io::Read> io::Read for DumbSeeker<R> {
+    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+        let bytes_read = self.r.read(buf)?;
+
+        self.pos += bytes_read as u64;
+
+        Ok(bytes_read)
+    }
+}
+
+impl<R: io::Read> io::Seek for DumbSeeker<R> {
+    fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
+        let absolute_offset: u64 = match pos {
+            io::SeekFrom::Start(start_offset) => {
+                if start_offset < self.pos {
+                    return Err(io::Error::new(
+                        io::ErrorKind::Unsupported,
+                        format!("can't seek backwards ({} -> {})", self.pos, start_offset),
+                    ));
+                } else {
+                    start_offset
+                }
+            }
+            // we don't know the total size, can't support this.
+            io::SeekFrom::End(_end_offset) => {
+                return Err(io::Error::new(
+                    io::ErrorKind::Unsupported,
+                    "can't seek from end",
+                ));
+            }
+            io::SeekFrom::Current(relative_offset) => {
+                if relative_offset < 0 {
+                    return Err(io::Error::new(
+                        io::ErrorKind::Unsupported,
+                        "can't seek backwards relative to current position",
+                    ));
+                } else {
+                    self.pos + relative_offset as u64
+                }
+            }
+        };
+
+        // we already know absolute_offset is larger than self.pos
+        debug_assert!(
+            absolute_offset > self.pos,
+            "absolute_offset is larger than self.pos"
+        );
+
+        // calculate bytes to skip
+        let bytes_to_skip: u64 = absolute_offset - self.pos;
+
+        // discard these bytes. We can't use take() as it requires ownership of
+        // self.r, but we only have &mut self.
+        let mut buf = [0; 1024];
+        let mut bytes_skipped: u64 = 0;
+        while bytes_skipped < bytes_to_skip {
+            let len = std::cmp::min(bytes_to_skip - bytes_skipped, buf.len() as u64);
+            match self.r.read(&mut buf[..len as usize]) {
+                Ok(0) => break,
+                Ok(n) => bytes_skipped += n as u64,
+                Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
+                Err(e) => return Err(e),
+            }
+        }
+        debug_assert_eq!(bytes_to_skip, bytes_skipped);
+
+        self.pos = absolute_offset;
+
+        // return the new position from the start of the stream
+        Ok(absolute_offset)
+    }
+}
+
+/// A Cursor<Vec<u8>> can be used as a BlobReader.
+impl<R: io::Read + Send + 'static> BlobReader for DumbSeeker<R> {}