diff options
author | Florian Klink <flokli@flokli.de> | 2023-09-13T12·20+0200 |
---|---|---|
committer | flokli <flokli@flokli.de> | 2023-09-18T10·33+0000 |
commit | da6cbb4a459d02111c44a67d3d0dd7e654abff23 (patch) | |
tree | 5efce82d3d9aea94cf6d3712a3fdbb7d168e4552 /tvix/store/src/proto/sync_read_into_async_read.rs | |
parent | 3de96017640b6dc25f1544a1bafd4b370bb1cea0 (diff) |
refactor(tvix/store/blobsvc): make BlobStore async r/6606
We previously kept the trait of a BlobService sync. This however had some annoying consequences: - It became more and more complicated to track whether we're in a context with an async runtime available or not, producing bugs like https://b.tvl.fyi/issues/304 - The sync trait shielded away async clients from async workloads, requiring manual block_on code inside the gRPC client code, and spawn_blocking calls in consumers of the trait, even if they were async (like the gRPC server) - We had to write our own custom glue code (SyncReadIntoAsyncRead) to convert a sync io::Read into a tokio::io::AsyncRead, which already existed in tokio internally, but upstream is hesitant to expose. This now makes the BlobService trait async (via the async_trait macro, like we already do in various gRPC parts), and replaces the sync readers and writers with their async counterparts. Tests interacting with a BlobService now need to have an async runtime available; the easiest way for this is to mark the test functions with the tokio::test macro, allowing us to directly .await in the test function. In places where we don't have an async runtime available from context (like tvix-cli), we can pass one down explicitly. Now that we don't provide a sync interface anymore, the (sync) FUSE library now holds a pointer to a tokio runtime handle, and needs to have at least 2 threads available when talking to a blob service (which is why some of the tests now use the multi_thread flavor). The FUSE tests got a bit more verbose, as we couldn't use the setup_and_mount function accepting a callback anymore. We can hopefully move some of the test fixture setup to rstest in the future to make this less repetitive. Co-Authored-By: Connor Brewster <cbrewster@hey.com> Change-Id: Ia0501b606e32c852d0108de9c9016b21c94a3c05 Reviewed-on: https://cl.tvl.fyi/c/depot/+/9329 Reviewed-by: Connor Brewster <cbrewster@hey.com> Tested-by: BuildkiteCI Reviewed-by: raitobezarius <tvl@lahfa.xyz>
Diffstat (limited to 'tvix/store/src/proto/sync_read_into_async_read.rs')
-rw-r--r-- | tvix/store/src/proto/sync_read_into_async_read.rs | 158 |
1 files changed, 0 insertions, 158 deletions
diff --git a/tvix/store/src/proto/sync_read_into_async_read.rs b/tvix/store/src/proto/sync_read_into_async_read.rs deleted file mode 100644 index 0a0ef019781c..000000000000 --- a/tvix/store/src/proto/sync_read_into_async_read.rs +++ /dev/null @@ -1,158 +0,0 @@ -use bytes::Buf; -use core::task::Poll::Ready; -use futures::ready; -use futures::Future; -use std::io; -use std::io::Read; -use std::pin::Pin; -use std::sync::Arc; -use std::task::Context; -use std::task::Poll; -use tokio::io::AsyncRead; -use tokio::runtime::Handle; -use tokio::sync::Mutex; -use tokio::task::JoinHandle; - -#[derive(Debug)] -enum State<Buf: bytes::Buf + bytes::BufMut> { - Idle(Option<Buf>), - Busy(JoinHandle<(io::Result<usize>, Buf)>), -} - -use State::{Busy, Idle}; - -/// Use a [`SyncReadIntoAsyncRead`] to asynchronously read from a -/// synchronous API. -#[derive(Debug)] -pub struct SyncReadIntoAsyncRead<R: Read + Send, Buf: bytes::Buf + bytes::BufMut> { - state: Mutex<State<Buf>>, - reader: Arc<Mutex<R>>, - rt: Handle, -} - -impl<R: Read + Send, Buf: bytes::Buf + bytes::BufMut> SyncReadIntoAsyncRead<R, Buf> { - /// This must be called from within a Tokio runtime context, or else it will panic. - #[track_caller] - pub fn new(rt: Handle, reader: R) -> Self { - Self { - rt, - state: State::Idle(None).into(), - reader: Arc::new(reader.into()), - } - } - - /// This must be called from within a Tokio runtime context, or else it will panic. - pub fn new_with_reader(readable: R) -> Self { - Self::new(Handle::current(), readable) - } -} - -/// Repeats operations that are interrupted. -macro_rules! 
uninterruptibly { - ($e:expr) => {{ - loop { - match $e { - Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} - res => break res, - } - } - }}; -} - -impl< - R: Read + Send + 'static + std::marker::Unpin, - Buf: bytes::Buf + bytes::BufMut + Send + Default + std::marker::Unpin + 'static, - > AsyncRead for SyncReadIntoAsyncRead<R, Buf> -{ - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - dst: &mut tokio::io::ReadBuf<'_>, - ) -> Poll<io::Result<()>> { - let me = self.get_mut(); - // Do we need this mutex? - let state = me.state.get_mut(); - - loop { - match state { - Idle(ref mut buf_cell) => { - let mut buf = buf_cell.take().unwrap_or_default(); - - if buf.has_remaining() { - // Here, we will split the `buf` into `[..dst.remaining()... ; rest ]` - // The `rest` is stuffed into the `buf_cell` for further poll_read. - // The other is completely consumed into the unfilled destination. - // `rest` can be empty. - let mut adjusted_src = - buf.copy_to_bytes(std::cmp::min(buf.remaining(), dst.remaining())); - let copied_size = adjusted_src.remaining(); - adjusted_src.copy_to_slice(dst.initialize_unfilled_to(copied_size)); - dst.set_filled(copied_size); - *buf_cell = Some(buf); - return Ready(Ok(())); - } - - let reader = me.reader.clone(); - *state = Busy(me.rt.spawn_blocking(move || { - let result = uninterruptibly!(reader.blocking_lock().read( - // SAFETY: `reader.read` will *ONLY* write initialized bytes - // and never *READ* uninitialized bytes - // inside this buffer. - // - // Furthermore, casting the slice as `*mut [u8]` - // is safe because it has the same layout. - // - // Finally, the pointer obtained is valid and owned - // by `buf` only as we have a valid mutable reference - // to it, it is valid for write. 
- // - // Here, we copy an nightly API: https://doc.rust-lang.org/stable/src/core/mem/maybe_uninit.rs.html#994-998 - unsafe { - &mut *(buf.chunk_mut().as_uninit_slice_mut() - as *mut [std::mem::MaybeUninit<u8>] - as *mut [u8]) - } - )); - - if let Ok(n) = result { - // SAFETY: given we initialize `n` bytes, we can move `n` bytes - // forward. - unsafe { - buf.advance_mut(n); - } - } - - (result, buf) - })); - } - Busy(ref mut rx) => { - let (result, mut buf) = ready!(Pin::new(rx).poll(cx))?; - - match result { - Ok(n) => { - if n > 0 { - let remaining = std::cmp::min(n, dst.remaining()); - let mut adjusted_src = buf.copy_to_bytes(remaining); - adjusted_src.copy_to_slice(dst.initialize_unfilled_to(remaining)); - dst.advance(remaining); - } - *state = Idle(Some(buf)); - return Ready(Ok(())); - } - Err(e) => { - *state = Idle(None); - return Ready(Err(e)); - } - } - } - } - } - } -} - -impl<R: Read + Send, Buf: bytes::Buf + bytes::BufMut> From<R> for SyncReadIntoAsyncRead<R, Buf> { - /// This must be called from within a Tokio runtime context, or else it will panic. - fn from(value: R) -> Self { - Self::new_with_reader(value) - } -} |