about summary refs log tree commit diff
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2023-06-30T14·08+0200
committerclbot <clbot@tvl.fyi>2023-07-21T18·52+0000
commit7613e2e76972554ee2a5ae1397f8b5ca84f4f729 (patch)
tree366554a7137abbd63971e8c517a0926fcffc56a7
parent42dc18353d99453bc0f83492f9f5bc4796f4cc4c (diff)
feat(tvix/store/blobservice): implement seek r/6434
For memory and sled, it's trivial, as we already have a Cursor<Vec<u8>>.
For gRPC, we simply reject going backwards, and skip n bytes for now.

Once the gRPC protocol gets support for offsets and verified streaming,
this can be improved.

Change-Id: I734066a514aed287ea3db64bfb1680911ac1eeb0
Reviewed-on: https://cl.tvl.fyi/c/depot/+/8885
Autosubmit: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Reviewed-by: tazjin <tazjin@tvl.su>
-rw-r--r--tvix/store/src/blobservice/dumb_seeker.rs93
-rw-r--r--tvix/store/src/blobservice/grpc.rs9
-rw-r--r--tvix/store/src/blobservice/memory.rs4
-rw-r--r--tvix/store/src/blobservice/mod.rs9
-rw-r--r--tvix/store/src/blobservice/sled.rs4
-rw-r--r--tvix/store/src/blobservice/tests.rs132
6 files changed, 240 insertions, 11 deletions
diff --git a/tvix/store/src/blobservice/dumb_seeker.rs b/tvix/store/src/blobservice/dumb_seeker.rs
new file mode 100644
index 0000000000..5548ea0bd3
--- /dev/null
+++ b/tvix/store/src/blobservice/dumb_seeker.rs
@@ -0,0 +1,93 @@
+use std::io;
+
+use super::BlobReader;
+
+/// This implements [io::Seek] for and [io::Read] by simply skipping over some
+/// bytes, keeping track of the position.
+/// It fails whenever you try to seek backwards.
+pub struct DumbSeeker<R: io::Read> {
+    r: R,
+    pos: u64,
+}
+
+impl<R: io::Read> DumbSeeker<R> {
+    pub fn new(r: R) -> Self {
+        DumbSeeker { r, pos: 0 }
+    }
+}
+
+impl<R: io::Read> io::Read for DumbSeeker<R> {
+    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+        let bytes_read = self.r.read(buf)?;
+
+        self.pos += bytes_read as u64;
+
+        Ok(bytes_read)
+    }
+}
+
+impl<R: io::Read> io::Seek for DumbSeeker<R> {
+    fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
+        let absolute_offset: u64 = match pos {
+            io::SeekFrom::Start(start_offset) => {
+                if start_offset < self.pos {
+                    return Err(io::Error::new(
+                        io::ErrorKind::Unsupported,
+                        format!("can't seek backwards ({} -> {})", self.pos, start_offset),
+                    ));
+                } else {
+                    start_offset
+                }
+            }
+            // we don't know the total size, can't support this.
+            io::SeekFrom::End(_end_offset) => {
+                return Err(io::Error::new(
+                    io::ErrorKind::Unsupported,
+                    "can't seek from end",
+                ));
+            }
+            io::SeekFrom::Current(relative_offset) => {
+                if relative_offset < 0 {
+                    return Err(io::Error::new(
+                        io::ErrorKind::Unsupported,
+                        "can't seek backwards relative to current position",
+                    ));
+                } else {
+                    self.pos + relative_offset as u64
+                }
+            }
+        };
+
+        // we already know absolute_offset is larger than self.pos
+        debug_assert!(
+            absolute_offset > self.pos,
+            "absolute_offset is larger than self.pos"
+        );
+
+        // calculate bytes to skip
+        let bytes_to_skip: u64 = absolute_offset - self.pos;
+
+        // discard these bytes. We can't use take() as it requires ownership of
+        // self.r, but we only have &mut self.
+        let mut buf = [0; 1024];
+        let mut bytes_skipped: u64 = 0;
+        while bytes_skipped < bytes_to_skip {
+            let len = std::cmp::min(bytes_to_skip - bytes_skipped, buf.len() as u64);
+            match self.r.read(&mut buf[..len as usize]) {
+                Ok(0) => break,
+                Ok(n) => bytes_skipped += n as u64,
+                Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
+                Err(e) => return Err(e),
+            }
+        }
+        debug_assert_eq!(bytes_to_skip, bytes_skipped);
+
+        self.pos = absolute_offset;
+
+        // return the new position from the start of the stream
+        Ok(absolute_offset)
+    }
+}
+
+/// A Cursor<Vec<u8>> can be used as a BlobReader.
+impl<R: io::Read + Send + 'static> BlobReader for DumbSeeker<R> {}
diff --git a/tvix/store/src/blobservice/grpc.rs b/tvix/store/src/blobservice/grpc.rs
index a0006eb72f..96e2869a4f 100644
--- a/tvix/store/src/blobservice/grpc.rs
+++ b/tvix/store/src/blobservice/grpc.rs
@@ -1,4 +1,4 @@
-use super::{BlobService, BlobWriter};
+use super::{dumb_seeker::DumbSeeker, BlobReader, BlobService, BlobWriter};
 use crate::{proto, B3Digest};
 use futures::sink::{SinkExt, SinkMapErr};
 use std::{collections::VecDeque, io};
@@ -114,10 +114,7 @@ impl BlobService for GRPCBlobService {
 
     // On success, this returns a Ok(Some(io::Read)), which can be used to read
     // the contents of the Blob, identified by the digest.
-    fn open_read(
-        &self,
-        digest: &B3Digest,
-    ) -> Result<Option<Box<dyn io::Read + Send>>, crate::Error> {
+    fn open_read(&self, digest: &B3Digest) -> Result<Option<Box<dyn BlobReader>>, crate::Error> {
         // Get a new handle to the gRPC client, and copy the digest.
         let mut grpc_client = self.grpc_client.clone();
         let digest = digest.clone();
@@ -155,7 +152,7 @@ impl BlobService for GRPCBlobService {
 
                 // Use SyncIoBridge to turn it into a sync Read.
                 let sync_reader = tokio_util::io::SyncIoBridge::new(data_reader);
-                Ok(Some(Box::new(sync_reader)))
+                Ok(Some(Box::new(DumbSeeker::new(sync_reader))))
             }
             Err(e) if e.code() == Code::NotFound => Ok(None),
             Err(e) => Err(crate::Error::StorageError(e.to_string())),
diff --git a/tvix/store/src/blobservice/memory.rs b/tvix/store/src/blobservice/memory.rs
index 8963deefd4..fa2826fe31 100644
--- a/tvix/store/src/blobservice/memory.rs
+++ b/tvix/store/src/blobservice/memory.rs
@@ -5,7 +5,7 @@ use std::{
 };
 use tracing::{instrument, warn};
 
-use super::{BlobService, BlobWriter};
+use super::{BlobReader, BlobService, BlobWriter};
 use crate::{B3Digest, Error};
 
 #[derive(Clone, Default)]
@@ -36,7 +36,7 @@ impl BlobService for MemoryBlobService {
         Ok(db.contains_key(digest))
     }
 
-    fn open_read(&self, digest: &B3Digest) -> Result<Option<Box<dyn io::Read + Send>>, Error> {
+    fn open_read(&self, digest: &B3Digest) -> Result<Option<Box<dyn BlobReader>>, Error> {
         let db = self.db.read().unwrap();
 
         match db.get(digest).map(|x| Cursor::new(x.clone())) {
diff --git a/tvix/store/src/blobservice/mod.rs b/tvix/store/src/blobservice/mod.rs
index c159ce95a5..027b34eb4c 100644
--- a/tvix/store/src/blobservice/mod.rs
+++ b/tvix/store/src/blobservice/mod.rs
@@ -2,6 +2,7 @@ use std::io;
 
 use crate::{B3Digest, Error};
 
+mod dumb_seeker;
 mod from_addr;
 mod grpc;
 mod memory;
@@ -30,7 +31,7 @@ pub trait BlobService: Send + Sync {
     fn has(&self, digest: &B3Digest) -> Result<bool, Error>;
 
     /// Request a blob from the store, by its content hash.
-    fn open_read(&self, digest: &B3Digest) -> Result<Option<Box<dyn io::Read + Send>>, Error>;
+    fn open_read(&self, digest: &B3Digest) -> Result<Option<Box<dyn BlobReader>>, Error>;
 
     /// Insert a new blob into the store. Returns a [BlobWriter], which
     /// implements [io::Write] and a [BlobWriter::close].
@@ -46,3 +47,9 @@ pub trait BlobWriter: io::Write + Send + Sync + 'static {
     /// Closing a already-closed BlobWriter is a no-op.
     fn close(&mut self) -> Result<B3Digest, Error>;
 }
+
+/// A [io::Read] that also allows seeking.
+pub trait BlobReader: io::Read + io::Seek + Send + 'static {}
+
+/// A Cursor<Vec<u8>> can be used as a BlobReader.
+impl BlobReader for io::Cursor<Vec<u8>> {}
diff --git a/tvix/store/src/blobservice/sled.rs b/tvix/store/src/blobservice/sled.rs
index bff50ff3eb..67897cb94a 100644
--- a/tvix/store/src/blobservice/sled.rs
+++ b/tvix/store/src/blobservice/sled.rs
@@ -1,4 +1,4 @@
-use super::{BlobService, BlobWriter};
+use super::{BlobReader, BlobService, BlobWriter};
 use crate::{B3Digest, Error};
 use std::{
     io::{self, Cursor},
@@ -65,7 +65,7 @@ impl BlobService for SledBlobService {
     }
 
     #[instrument(skip(self), fields(blob.digest=%digest))]
-    fn open_read(&self, digest: &B3Digest) -> Result<Option<Box<dyn io::Read + Send>>, Error> {
+    fn open_read(&self, digest: &B3Digest) -> Result<Option<Box<dyn BlobReader>>, Error> {
         match self.db.get(digest.to_vec()) {
             Ok(None) => Ok(None),
             Ok(Some(data)) => Ok(Some(Box::new(Cursor::new(data[..].to_vec())))),
diff --git a/tvix/store/src/blobservice/tests.rs b/tvix/store/src/blobservice/tests.rs
index 2b1c2c1665..cc43ce5b18 100644
--- a/tvix/store/src/blobservice/tests.rs
+++ b/tvix/store/src/blobservice/tests.rs
@@ -84,3 +84,135 @@ fn put_has_get(blob_service: impl BlobService, blob_contents: &[u8], blob_digest
 
     assert_eq!(blob_contents, buf, "read blob contents must match");
 }
+
+/// Put a blob in the store, and seek inside it a bit.
+#[test_case(gen_memory_blob_service(); "memory")]
+#[test_case(gen_sled_blob_service(); "sled")]
+fn put_seek(blob_service: impl BlobService) {
+    let mut w = blob_service.open_write();
+
+    io::copy(&mut io::Cursor::new(&fixtures::BLOB_B.to_vec()), &mut w).expect("copy must succeed");
+    w.close().expect("close must succeed");
+
+    // open a blob for reading
+    let mut r = blob_service
+        .open_read(&fixtures::BLOB_B_DIGEST)
+        .expect("open_read must succeed")
+        .expect("must be some");
+
+    let mut pos: u64 = 0;
+
+    // read the first 10 bytes, they must match the data in the fixture.
+    {
+        let mut buf = [0; 10];
+        r.read_exact(&mut buf).expect("must succeed");
+
+        assert_eq!(
+            &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()],
+            buf,
+            "expected first 10 bytes to match"
+        );
+
+        pos += buf.len() as u64;
+    }
+    // seek by 0 bytes, using SeekFrom::Start.
+    let p = r
+        .seek(io::SeekFrom::Start(pos as u64))
+        .expect("must not fail");
+    assert_eq!(pos, p);
+
+    // read the next 10 bytes, they must match the data in the fixture.
+    {
+        let mut buf = [0; 10];
+        r.read_exact(&mut buf).expect("must succeed");
+
+        assert_eq!(
+            &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()],
+            buf,
+            "expected data to match"
+        );
+
+        pos += buf.len() as u64;
+    }
+
+    // seek by 5 bytes, using SeekFrom::Start.
+    let p = r
+        .seek(io::SeekFrom::Start(pos as u64 + 5))
+        .expect("must not fail");
+    pos += 5;
+    assert_eq!(pos, p);
+
+    // read the next 10 bytes, they must match the data in the fixture.
+    {
+        let mut buf = [0; 10];
+        r.read_exact(&mut buf).expect("must succeed");
+
+        assert_eq!(
+            &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()],
+            buf,
+            "expected data to match"
+        );
+
+        pos += buf.len() as u64;
+    }
+
+    // seek by 12345 bytes, using SeekFrom::
+    let p = r.seek(io::SeekFrom::Current(12345)).expect("must not fail");
+    pos += 12345;
+    assert_eq!(pos, p);
+
+    // read the next 10 bytes, they must match the data in the fixture.
+    {
+        let mut buf = [0; 10];
+        r.read_exact(&mut buf).expect("must succeed");
+
+        assert_eq!(
+            &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()],
+            buf,
+            "expected data to match"
+        );
+
+        #[allow(unused_assignments)]
+        {
+            pos += buf.len() as u64;
+        }
+    }
+
+    // seeking to the end is okay…
+    let p = r
+        .seek(io::SeekFrom::Start(fixtures::BLOB_B.len() as u64))
+        .expect("must not fail");
+    pos = fixtures::BLOB_B.len() as u64;
+    assert_eq!(pos, p);
+
+    {
+        // but it returns no more data.
+        let mut buf: Vec<u8> = Vec::new();
+        r.read_to_end(&mut buf).expect("must not fail");
+        assert!(buf.is_empty(), "expected no more data to be read");
+    }
+
+    // seeking past the end…
+    match r.seek(io::SeekFrom::Start(fixtures::BLOB_B.len() as u64 + 1)) {
+        // should either be ok, but then return 0 bytes.
+        // this matches the behaviour or a Cursor<Vec<u8>>.
+        Ok(_pos) => {
+            let mut buf: Vec<u8> = Vec::new();
+            r.read_to_end(&mut buf).expect("must not fail");
+            assert!(buf.is_empty(), "expected no more data to be read");
+        }
+        // or not be okay.
+        Err(_) => {}
+    }
+
+    // TODO: this is only broken for the gRPC version
+    // We expect seeking backwards or relative to the end to fail.
+    // r.seek(io::SeekFrom::Current(-1))
+    //     .expect_err("SeekFrom::Current(-1) expected to fail");
+
+    // r.seek(io::SeekFrom::Start(pos - 1))
+    //     .expect_err("SeekFrom::Start(pos-1) expected to fail");
+
+    // r.seek(io::SeekFrom::End(0))
+    //     .expect_err("SeekFrom::End(_) expected to fail");
+}