From da6cbb4a459d02111c44a67d3d0dd7e654abff23 Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Wed, 13 Sep 2023 14:20:21 +0200 Subject: refactor(tvix/store/blobsvc): make BlobStore async We previously kept the trait of a BlobService sync. This however had some annoying consequences: - It became more and more complicated to track when we're in a context with an async runtime in the context or not, producing bugs like https://b.tvl.fyi/issues/304 - The sync trait shielded away async clients from async worloads, requiring manual block_on code inside the gRPC client code, and spawn_blocking calls in consumers of the trait, even if they were async (like the gRPC server) - We had to write our own custom glue code (SyncReadIntoAsyncRead) to convert a sync io::Read into a tokio::io::AsyncRead, which already existed in tokio internally, but upstream ia hesitant to expose. This now makes the BlobService trait async (via the async_trait macro, like we already do in various gRPC parts), and replaces the sync readers and writers with their async counterparts. Tests interacting with a BlobService now need to have an async runtime available, the easiest way for this is to mark the test functions with the tokio::test macro, allowing us to directly .await in the test function. In places where we don't have an async runtime available from context (like tvix-cli), we can pass one down explicitly. Now that we don't provide a sync interface anymore, the (sync) FUSE library now holds a pointer to a tokio runtime handle, and needs to at least have 2 threads available when talking to a blob service (which is why some of the tests now use the multi_thread flavor). The FUSE tests got a bit more verbose, as we couldn't use the setup_and_mount function accepting a callback anymore. We can hopefully move some of the test fixture setup to rstest in the future to make this less repetitive. Co-Authored-By: Connor Brewster Change-Id: Ia0501b606e32c852d0108de9c9016b21c94a3c05 Reviewed-on: https://cl.tvl.fyi/c/depot/+/9329 Reviewed-by: Connor Brewster Tested-by: BuildkiteCI Reviewed-by: raitobezarius --- tvix/store/src/proto/grpc_blobservice_wrapper.rs | 53 ++++++++++-------------- 1 file changed, 21 insertions(+), 32 deletions(-) (limited to 'tvix/store/src/proto/grpc_blobservice_wrapper.rs') diff --git a/tvix/store/src/proto/grpc_blobservice_wrapper.rs b/tvix/store/src/proto/grpc_blobservice_wrapper.rs index 2d8c396539d8..8bd3083c17ad 100644 --- a/tvix/store/src/proto/grpc_blobservice_wrapper.rs +++ b/tvix/store/src/proto/grpc_blobservice_wrapper.rs @@ -1,4 +1,6 @@ -use crate::{blobservice::BlobService, proto::sync_read_into_async_read::SyncReadIntoAsyncRead}; +use crate::blobservice::BlobService; +use core::pin::pin; +use futures::TryFutureExt; use std::{ collections::VecDeque, io, @@ -6,7 +8,6 @@ use std::{ pin::Pin, sync::Arc, }; -use tokio::task; use tokio_stream::StreamExt; use tokio_util::io::ReaderStream; use tonic::{async_trait, Request, Response, Status, Streaming}; @@ -103,7 +104,7 @@ impl super::blob_service_server::BlobService for GRPCBlobServiceWrapper { return Err(Status::internal("not implemented")); } - match self.blob_service.has(&req_digest) { + match self.blob_service.has(&req_digest).await { Ok(true) => Ok(Response::new(super::BlobMeta::default())), Ok(false) => Err(Status::not_found(format!("blob {} not found", &req_digest))), Err(e) => Err(e.into()), @@ -122,13 +123,8 @@ impl super::blob_service_server::BlobService for GRPCBlobServiceWrapper { .try_into() .map_err(|_e| Status::invalid_argument("invalid digest length"))?; - match self.blob_service.open_read(&req_digest) { + match self.blob_service.open_read(&req_digest).await { Ok(Some(reader)) => { - let async_reader: SyncReadIntoAsyncRead< - _, - BytesMutWithDefaultCapacity<{ 100 * 1024 }>, - > = reader.into(); - fn stream_mapper( x: Result, ) -> Result { @@ -138,7 +134,7 @@ impl super::blob_service_server::BlobService for GRPCBlobServiceWrapper { } } - let chunks_stream = ReaderStream::new(async_reader).map(stream_mapper); + let chunks_stream = ReaderStream::new(reader).map(stream_mapper); Ok(Response::new(Box::pin(chunks_stream))) } Ok(None) => Err(Status::not_found(format!("blob {} not found", &req_digest))), @@ -158,35 +154,28 @@ impl super::blob_service_server::BlobService for GRPCBlobServiceWrapper { .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e)) }); - let data_reader = tokio_util::io::StreamReader::new(data_stream); - - // prepare a writer, which we'll use in the blocking task below. - let mut writer = self.blob_service.open_write(); + let mut data_reader = tokio_util::io::StreamReader::new(data_stream); - let result = task::spawn_blocking(move || -> Result { - // construct a sync reader to the data - let mut reader = tokio_util::io::SyncIoBridge::new(data_reader); + let mut blob_writer = pin!(self.blob_service.open_write().await); - io::copy(&mut reader, &mut writer).map_err(|e| { + tokio::io::copy(&mut data_reader, &mut blob_writer) + .await + .map_err(|e| { warn!("error copying: {}", e); Status::internal("error copying") })?; - let digest = writer - .close() - .map_err(|e| { - warn!("error closing stream: {}", e); - Status::internal("error closing stream") - })? - .to_vec(); - - Ok(super::PutBlobResponse { - digest: digest.into(), + let digest = blob_writer + .close() + .map_err(|e| { + warn!("error closing stream: {}", e); + Status::internal("error closing stream") }) - }) - .await - .map_err(|_| Status::internal("failed to wait for task"))??; + .await? + .to_vec(); - Ok(Response::new(result)) + Ok(Response::new(super::PutBlobResponse { + digest: digest.into(), + })) } } -- cgit 1.4.1