diff options
Diffstat (limited to 'tvix/store/src/proto')
-rw-r--r-- | tvix/store/src/proto/grpc_blobservice_wrapper.rs | 53 | ||||
-rw-r--r-- | tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs | 10 | ||||
-rw-r--r-- | tvix/store/src/proto/mod.rs | 2 | ||||
-rw-r--r-- | tvix/store/src/proto/sync_read_into_async_read.rs | 158 |
4 files changed, 27 insertions, 196 deletions
diff --git a/tvix/store/src/proto/grpc_blobservice_wrapper.rs b/tvix/store/src/proto/grpc_blobservice_wrapper.rs index 2d8c396539d8..8bd3083c17ad 100644 --- a/tvix/store/src/proto/grpc_blobservice_wrapper.rs +++ b/tvix/store/src/proto/grpc_blobservice_wrapper.rs @@ -1,4 +1,6 @@ -use crate::{blobservice::BlobService, proto::sync_read_into_async_read::SyncReadIntoAsyncRead}; +use crate::blobservice::BlobService; +use core::pin::pin; +use futures::TryFutureExt; use std::{ collections::VecDeque, io, @@ -6,7 +8,6 @@ use std::{ pin::Pin, sync::Arc, }; -use tokio::task; use tokio_stream::StreamExt; use tokio_util::io::ReaderStream; use tonic::{async_trait, Request, Response, Status, Streaming}; @@ -103,7 +104,7 @@ impl super::blob_service_server::BlobService for GRPCBlobServiceWrapper { return Err(Status::internal("not implemented")); } - match self.blob_service.has(&req_digest) { + match self.blob_service.has(&req_digest).await { Ok(true) => Ok(Response::new(super::BlobMeta::default())), Ok(false) => Err(Status::not_found(format!("blob {} not found", &req_digest))), Err(e) => Err(e.into()), @@ -122,13 +123,8 @@ impl super::blob_service_server::BlobService for GRPCBlobServiceWrapper { .try_into() .map_err(|_e| Status::invalid_argument("invalid digest length"))?; - match self.blob_service.open_read(&req_digest) { + match self.blob_service.open_read(&req_digest).await { Ok(Some(reader)) => { - let async_reader: SyncReadIntoAsyncRead< - _, - BytesMutWithDefaultCapacity<{ 100 * 1024 }>, - > = reader.into(); - fn stream_mapper( x: Result<bytes::Bytes, io::Error>, ) -> Result<super::BlobChunk, Status> { @@ -138,7 +134,7 @@ impl super::blob_service_server::BlobService for GRPCBlobServiceWrapper { } } - let chunks_stream = ReaderStream::new(async_reader).map(stream_mapper); + let chunks_stream = ReaderStream::new(reader).map(stream_mapper); Ok(Response::new(Box::pin(chunks_stream))) } Ok(None) => Err(Status::not_found(format!("blob {} not found", &req_digest))), @@ -158,35 +154,28 @@ impl super::blob_service_server::BlobService for GRPCBlobServiceWrapper { .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e)) }); - let data_reader = tokio_util::io::StreamReader::new(data_stream); - - // prepare a writer, which we'll use in the blocking task below. - let mut writer = self.blob_service.open_write(); + let mut data_reader = tokio_util::io::StreamReader::new(data_stream); - let result = task::spawn_blocking(move || -> Result<super::PutBlobResponse, Status> { - // construct a sync reader to the data - let mut reader = tokio_util::io::SyncIoBridge::new(data_reader); + let mut blob_writer = pin!(self.blob_service.open_write().await); - io::copy(&mut reader, &mut writer).map_err(|e| { + tokio::io::copy(&mut data_reader, &mut blob_writer) + .await + .map_err(|e| { warn!("error copying: {}", e); Status::internal("error copying") })?; - let digest = writer - .close() - .map_err(|e| { - warn!("error closing stream: {}", e); - Status::internal("error closing stream") - })? - .to_vec(); - - Ok(super::PutBlobResponse { - digest: digest.into(), + let digest = blob_writer + .close() + .map_err(|e| { + warn!("error closing stream: {}", e); + Status::internal("error closing stream") }) - }) - .await - .map_err(|_| Status::internal("failed to wait for task"))??; + .await? + .to_vec(); - Ok(Response::new(result)) + Ok(Response::new(super::PutBlobResponse { + digest: digest.into(), + })) } } diff --git a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs index 16a2fd51d0df..33861d9ffa4e 100644 --- a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs +++ b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs @@ -71,10 +71,12 @@ impl proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWra match request.into_inner().node { None => Err(Status::invalid_argument("no root node sent")), Some(root_node) => { - let (nar_size, nar_sha256) = self - .path_info_service - .calculate_nar(&root_node) - .expect("error during nar calculation"); // TODO: handle error + let path_info_service = self.path_info_service.clone(); + let (nar_size, nar_sha256) = + task::spawn_blocking(move || path_info_service.calculate_nar(&root_node)) + .await + .unwrap() + .expect("error during nar calculation"); // TODO: handle error Ok(Response::new(proto::CalculateNarResponse { nar_size, diff --git a/tvix/store/src/proto/mod.rs b/tvix/store/src/proto/mod.rs index 044769ce579d..97a2694ac3de 100644 --- a/tvix/store/src/proto/mod.rs +++ b/tvix/store/src/proto/mod.rs @@ -12,8 +12,6 @@ mod grpc_blobservice_wrapper; mod grpc_directoryservice_wrapper; mod grpc_pathinfoservice_wrapper; -mod sync_read_into_async_read; - pub use grpc_blobservice_wrapper::GRPCBlobServiceWrapper; pub use grpc_directoryservice_wrapper::GRPCDirectoryServiceWrapper; pub use grpc_pathinfoservice_wrapper::GRPCPathInfoServiceWrapper; diff --git a/tvix/store/src/proto/sync_read_into_async_read.rs b/tvix/store/src/proto/sync_read_into_async_read.rs deleted file mode 100644 index 0a0ef019781c..000000000000 --- a/tvix/store/src/proto/sync_read_into_async_read.rs +++ /dev/null @@ -1,158 +0,0 @@ -use bytes::Buf; -use core::task::Poll::Ready; -use futures::ready; -use futures::Future; -use std::io; -use std::io::Read; -use std::pin::Pin; -use std::sync::Arc; -use std::task::Context; -use std::task::Poll; -use tokio::io::AsyncRead; -use tokio::runtime::Handle; -use tokio::sync::Mutex; -use tokio::task::JoinHandle; - -#[derive(Debug)] -enum State<Buf: bytes::Buf + bytes::BufMut> { - Idle(Option<Buf>), - Busy(JoinHandle<(io::Result<usize>, Buf)>), -} - -use State::{Busy, Idle}; - -/// Use a [`SyncReadIntoAsyncRead`] to asynchronously read from a -/// synchronous API. -#[derive(Debug)] -pub struct SyncReadIntoAsyncRead<R: Read + Send, Buf: bytes::Buf + bytes::BufMut> { - state: Mutex<State<Buf>>, - reader: Arc<Mutex<R>>, - rt: Handle, -} - -impl<R: Read + Send, Buf: bytes::Buf + bytes::BufMut> SyncReadIntoAsyncRead<R, Buf> { - /// This must be called from within a Tokio runtime context, or else it will panic. - #[track_caller] - pub fn new(rt: Handle, reader: R) -> Self { - Self { - rt, - state: State::Idle(None).into(), - reader: Arc::new(reader.into()), - } - } - - /// This must be called from within a Tokio runtime context, or else it will panic. - pub fn new_with_reader(readable: R) -> Self { - Self::new(Handle::current(), readable) - } -} - -/// Repeats operations that are interrupted. -macro_rules! uninterruptibly { - ($e:expr) => {{ - loop { - match $e { - Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} - res => break res, - } - } - }}; -} - -impl< - R: Read + Send + 'static + std::marker::Unpin, - Buf: bytes::Buf + bytes::BufMut + Send + Default + std::marker::Unpin + 'static, - > AsyncRead for SyncReadIntoAsyncRead<R, Buf> -{ - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - dst: &mut tokio::io::ReadBuf<'_>, - ) -> Poll<io::Result<()>> { - let me = self.get_mut(); - // Do we need this mutex? - let state = me.state.get_mut(); - - loop { - match state { - Idle(ref mut buf_cell) => { - let mut buf = buf_cell.take().unwrap_or_default(); - - if buf.has_remaining() { - // Here, we will split the `buf` into `[..dst.remaining()... ; rest ]` - // The `rest` is stuffed into the `buf_cell` for further poll_read. - // The other is completely consumed into the unfilled destination. - // `rest` can be empty. - let mut adjusted_src = - buf.copy_to_bytes(std::cmp::min(buf.remaining(), dst.remaining())); - let copied_size = adjusted_src.remaining(); - adjusted_src.copy_to_slice(dst.initialize_unfilled_to(copied_size)); - dst.set_filled(copied_size); - *buf_cell = Some(buf); - return Ready(Ok(())); - } - - let reader = me.reader.clone(); - *state = Busy(me.rt.spawn_blocking(move || { - let result = uninterruptibly!(reader.blocking_lock().read( - // SAFETY: `reader.read` will *ONLY* write initialized bytes - // and never *READ* uninitialized bytes - // inside this buffer. - // - // Furthermore, casting the slice as `*mut [u8]` - // is safe because it has the same layout. - // - // Finally, the pointer obtained is valid and owned - // by `buf` only as we have a valid mutable reference - // to it, it is valid for write. - // - // Here, we copy an nightly API: https://doc.rust-lang.org/stable/src/core/mem/maybe_uninit.rs.html#994-998 - unsafe { - &mut *(buf.chunk_mut().as_uninit_slice_mut() - as *mut [std::mem::MaybeUninit<u8>] - as *mut [u8]) - } - )); - - if let Ok(n) = result { - // SAFETY: given we initialize `n` bytes, we can move `n` bytes - // forward. - unsafe { - buf.advance_mut(n); - } - } - - (result, buf) - })); - } - Busy(ref mut rx) => { - let (result, mut buf) = ready!(Pin::new(rx).poll(cx))?; - - match result { - Ok(n) => { - if n > 0 { - let remaining = std::cmp::min(n, dst.remaining()); - let mut adjusted_src = buf.copy_to_bytes(remaining); - adjusted_src.copy_to_slice(dst.initialize_unfilled_to(remaining)); - dst.advance(remaining); - } - *state = Idle(Some(buf)); - return Ready(Ok(())); - } - Err(e) => { - *state = Idle(None); - return Ready(Err(e)); - } - } - } - } - } - } -} - -impl<R: Read + Send, Buf: bytes::Buf + bytes::BufMut> From<R> for SyncReadIntoAsyncRead<R, Buf> { - /// This must be called from within a Tokio runtime context, or else it will panic. - fn from(value: R) -> Self { - Self::new_with_reader(value) - } -} |