about summary refs log tree commit diff
path: root/tvix/store/src/blobservice/dumb_seeker.rs
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2023-09-13T12·20+0200
committerflokli <flokli@flokli.de>2023-09-18T10·33+0000
commitda6cbb4a459d02111c44a67d3d0dd7e654abff23 (patch)
tree5efce82d3d9aea94cf6d3712a3fdbb7d168e4552 /tvix/store/src/blobservice/dumb_seeker.rs
parent3de96017640b6dc25f1544a1bafd4b370bb1cea0 (diff)
refactor(tvix/store/blobsvc): make BlobStore async r/6606
We previously kept the trait of a BlobService sync.

This however had some annoying consequences:

 - It became more and more complicated to track whether we're in a
   context with an async runtime available or not, producing bugs like
   https://b.tvl.fyi/issues/304
 - The sync trait shielded away async clients from async workloads,
   requiring manual block_on code inside the gRPC client code, and
   spawn_blocking calls in consumers of the trait, even if they were
   async (like the gRPC server)
 - We had to write our own custom glue code (SyncReadIntoAsyncRead)
   to convert a sync io::Read into a tokio::io::AsyncRead, which already
   existed in tokio internally, but upstream is hesitant to expose.

This now makes the BlobService trait async (via the async_trait macro,
like we already do in various gRPC parts), and replaces the sync readers
and writers with their async counterparts.

Tests interacting with a BlobService now need to have an async runtime
available, the easiest way for this is to mark the test functions
with the tokio::test macro, allowing us to directly .await in the test
function.

In places where we don't have an async runtime available from context
(like tvix-cli), we can pass one down explicitly.

Now that we don't provide a sync interface anymore, the (sync) FUSE
library now holds a pointer to a tokio runtime handle, and needs to have
at least 2 threads available when talking to a blob service (which is
why some of the tests now use the multi_thread flavor).

The FUSE tests got a bit more verbose, as we couldn't use the
setup_and_mount function accepting a callback anymore. We can hopefully
move some of the test fixture setup to rstest in the future to make this
less repetitive.

Co-Authored-By: Connor Brewster <cbrewster@hey.com>
Change-Id: Ia0501b606e32c852d0108de9c9016b21c94a3c05
Reviewed-on: https://cl.tvl.fyi/c/depot/+/9329
Reviewed-by: Connor Brewster <cbrewster@hey.com>
Tested-by: BuildkiteCI
Reviewed-by: raitobezarius <tvl@lahfa.xyz>
Diffstat (limited to 'tvix/store/src/blobservice/dumb_seeker.rs')
-rw-r--r--tvix/store/src/blobservice/dumb_seeker.rs110
1 files changed, 0 insertions, 110 deletions
diff --git a/tvix/store/src/blobservice/dumb_seeker.rs b/tvix/store/src/blobservice/dumb_seeker.rs
deleted file mode 100644
index 6df4eb57f5f1..000000000000
--- a/tvix/store/src/blobservice/dumb_seeker.rs
+++ /dev/null
@@ -1,110 +0,0 @@
-use std::io;
-
-use tracing::{debug, instrument};
-
-use super::BlobReader;
-
-/// This implements [io::Seek] for and [io::Read] by simply skipping over some
-/// bytes, keeping track of the position.
-/// It fails whenever you try to seek backwards.
-pub struct DumbSeeker<R: io::Read> {
-    r: R,
-    pos: u64,
-}
-
-impl<R: io::Read> DumbSeeker<R> {
-    pub fn new(r: R) -> Self {
-        DumbSeeker { r, pos: 0 }
-    }
-}
-
-impl<R: io::Read> io::Read for DumbSeeker<R> {
-    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
-        let bytes_read = self.r.read(buf)?;
-
-        self.pos += bytes_read as u64;
-
-        Ok(bytes_read)
-    }
-}
-
-impl<R: io::Read> io::Seek for DumbSeeker<R> {
-    #[instrument(skip(self))]
-    fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
-        let absolute_offset: u64 = match pos {
-            io::SeekFrom::Start(start_offset) => {
-                if start_offset < self.pos {
-                    return Err(io::Error::new(
-                        io::ErrorKind::Unsupported,
-                        format!("can't seek backwards ({} -> {})", self.pos, start_offset),
-                    ));
-                } else {
-                    start_offset
-                }
-            }
-            // we don't know the total size, can't support this.
-            io::SeekFrom::End(_end_offset) => {
-                return Err(io::Error::new(
-                    io::ErrorKind::Unsupported,
-                    "can't seek from end",
-                ));
-            }
-            io::SeekFrom::Current(relative_offset) => {
-                if relative_offset < 0 {
-                    return Err(io::Error::new(
-                        io::ErrorKind::Unsupported,
-                        "can't seek backwards relative to current position",
-                    ));
-                } else {
-                    self.pos + relative_offset as u64
-                }
-            }
-        };
-
-        debug!(absolute_offset=?absolute_offset, "seek");
-
-        // we already know absolute_offset is larger than self.pos
-        debug_assert!(
-            absolute_offset >= self.pos,
-            "absolute_offset {} is larger than self.pos {}",
-            absolute_offset,
-            self.pos
-        );
-
-        // calculate bytes to skip
-        let bytes_to_skip: u64 = absolute_offset - self.pos;
-
-        // discard these bytes. We can't use take() as it requires ownership of
-        // self.r, but we only have &mut self.
-        let mut buf = [0; 1024];
-        let mut bytes_skipped: u64 = 0;
-        while bytes_skipped < bytes_to_skip {
-            let len = std::cmp::min(bytes_to_skip - bytes_skipped, buf.len() as u64);
-            match self.r.read(&mut buf[..len as usize]) {
-                Ok(0) => break,
-                Ok(n) => bytes_skipped += n as u64,
-                Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
-                Err(e) => return Err(e),
-            }
-        }
-
-        // This will fail when seeking past the end of self.r
-        if bytes_to_skip != bytes_skipped {
-            return Err(std::io::Error::new(
-                std::io::ErrorKind::UnexpectedEof,
-                format!(
-                    "tried to skip {} bytes, but only was able to skip {} until reaching EOF",
-                    bytes_to_skip, bytes_skipped
-                ),
-            ));
-        }
-
-        self.pos = absolute_offset;
-
-        // return the new position from the start of the stream
-        Ok(absolute_offset)
-    }
-}
-
-/// A Cursor<Vec<u8>> can be used as a BlobReader.
-impl<R: io::Read + Send + 'static> BlobReader for DumbSeeker<R> {}