diff options
author | Florian Klink <flokli@flokli.de> | 2023-09-13T12·20+0200 |
---|---|---|
committer | flokli <flokli@flokli.de> | 2023-09-18T10·33+0000 |
commit | da6cbb4a459d02111c44a67d3d0dd7e654abff23 (patch) | |
tree | 5efce82d3d9aea94cf6d3712a3fdbb7d168e4552 /tvix/store/src/blobservice/dumb_seeker.rs | |
parent | 3de96017640b6dc25f1544a1bafd4b370bb1cea0 (diff) |
refactor(tvix/store/blobsvc): make BlobStore async r/6606
We previously kept the trait of a BlobService sync. This however had some annoying consequences: - It became more and more complicated to track whether we're in a context with an async runtime or not, producing bugs like https://b.tvl.fyi/issues/304 - The sync trait shielded away async clients from async workloads, requiring manual block_on code inside the gRPC client code, and spawn_blocking calls in consumers of the trait, even if they were async (like the gRPC server) - We had to write our own custom glue code (SyncReadIntoAsyncRead) to convert a sync io::Read into a tokio::io::AsyncRead, which already existed in tokio internally, but upstream is hesitant to expose. This now makes the BlobService trait async (via the async_trait macro, like we already do in various gRPC parts), and replaces the sync readers and writers with their async counterparts. Tests interacting with a BlobService now need to have an async runtime available; the easiest way to do this is to mark the test functions with the tokio::test macro, allowing us to directly .await in the test function. In places where we don't have an async runtime available from context (like tvix-cli), we can pass one down explicitly. Now that we don't provide a sync interface anymore, the (sync) FUSE library now holds a pointer to a tokio runtime handle, and needs to have at least 2 threads available when talking to a blob service (which is why some of the tests now use the multi_thread flavor). The FUSE tests got a bit more verbose, as we couldn't use the setup_and_mount function accepting a callback anymore. We can hopefully move some of the test fixture setup to rstest in the future to make this less repetitive. Co-Authored-By: Connor Brewster <cbrewster@hey.com> Change-Id: Ia0501b606e32c852d0108de9c9016b21c94a3c05 Reviewed-on: https://cl.tvl.fyi/c/depot/+/9329 Reviewed-by: Connor Brewster <cbrewster@hey.com> Tested-by: BuildkiteCI Reviewed-by: raitobezarius <tvl@lahfa.xyz>
Diffstat (limited to 'tvix/store/src/blobservice/dumb_seeker.rs')
-rw-r--r-- | tvix/store/src/blobservice/dumb_seeker.rs | 110 |
1 files changed, 0 insertions, 110 deletions
diff --git a/tvix/store/src/blobservice/dumb_seeker.rs b/tvix/store/src/blobservice/dumb_seeker.rs deleted file mode 100644 index 6df4eb57f5f1..000000000000 --- a/tvix/store/src/blobservice/dumb_seeker.rs +++ /dev/null @@ -1,110 +0,0 @@ -use std::io; - -use tracing::{debug, instrument}; - -use super::BlobReader; - -/// This implements [io::Seek] for and [io::Read] by simply skipping over some -/// bytes, keeping track of the position. -/// It fails whenever you try to seek backwards. -pub struct DumbSeeker<R: io::Read> { - r: R, - pos: u64, -} - -impl<R: io::Read> DumbSeeker<R> { - pub fn new(r: R) -> Self { - DumbSeeker { r, pos: 0 } - } -} - -impl<R: io::Read> io::Read for DumbSeeker<R> { - fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { - let bytes_read = self.r.read(buf)?; - - self.pos += bytes_read as u64; - - Ok(bytes_read) - } -} - -impl<R: io::Read> io::Seek for DumbSeeker<R> { - #[instrument(skip(self))] - fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> { - let absolute_offset: u64 = match pos { - io::SeekFrom::Start(start_offset) => { - if start_offset < self.pos { - return Err(io::Error::new( - io::ErrorKind::Unsupported, - format!("can't seek backwards ({} -> {})", self.pos, start_offset), - )); - } else { - start_offset - } - } - // we don't know the total size, can't support this. 
- io::SeekFrom::End(_end_offset) => { - return Err(io::Error::new( - io::ErrorKind::Unsupported, - "can't seek from end", - )); - } - io::SeekFrom::Current(relative_offset) => { - if relative_offset < 0 { - return Err(io::Error::new( - io::ErrorKind::Unsupported, - "can't seek backwards relative to current position", - )); - } else { - self.pos + relative_offset as u64 - } - } - }; - - debug!(absolute_offset=?absolute_offset, "seek"); - - // we already know absolute_offset is larger than self.pos - debug_assert!( - absolute_offset >= self.pos, - "absolute_offset {} is larger than self.pos {}", - absolute_offset, - self.pos - ); - - // calculate bytes to skip - let bytes_to_skip: u64 = absolute_offset - self.pos; - - // discard these bytes. We can't use take() as it requires ownership of - // self.r, but we only have &mut self. - let mut buf = [0; 1024]; - let mut bytes_skipped: u64 = 0; - while bytes_skipped < bytes_to_skip { - let len = std::cmp::min(bytes_to_skip - bytes_skipped, buf.len() as u64); - match self.r.read(&mut buf[..len as usize]) { - Ok(0) => break, - Ok(n) => bytes_skipped += n as u64, - Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} - Err(e) => return Err(e), - } - } - - // This will fail when seeking past the end of self.r - if bytes_to_skip != bytes_skipped { - return Err(std::io::Error::new( - std::io::ErrorKind::UnexpectedEof, - format!( - "tried to skip {} bytes, but only was able to skip {} until reaching EOF", - bytes_to_skip, bytes_skipped - ), - )); - } - - self.pos = absolute_offset; - - // return the new position from the start of the stream - Ok(absolute_offset) - } -} - -/// A Cursor<Vec<u8>> can be used as a BlobReader. -impl<R: io::Read + Send + 'static> BlobReader for DumbSeeker<R> {} |