diff options
Diffstat (limited to 'tvix/store/src/proto/grpc_blobservice_wrapper.rs')
-rw-r--r-- | tvix/store/src/proto/grpc_blobservice_wrapper.rs | 53 |
1 files changed, 21 insertions, 32 deletions
diff --git a/tvix/store/src/proto/grpc_blobservice_wrapper.rs b/tvix/store/src/proto/grpc_blobservice_wrapper.rs index 2d8c396539d8..8bd3083c17ad 100644 --- a/tvix/store/src/proto/grpc_blobservice_wrapper.rs +++ b/tvix/store/src/proto/grpc_blobservice_wrapper.rs @@ -1,4 +1,6 @@ -use crate::{blobservice::BlobService, proto::sync_read_into_async_read::SyncReadIntoAsyncRead}; +use crate::blobservice::BlobService; +use core::pin::pin; +use futures::TryFutureExt; use std::{ collections::VecDeque, io, @@ -6,7 +8,6 @@ use std::{ pin::Pin, sync::Arc, }; -use tokio::task; use tokio_stream::StreamExt; use tokio_util::io::ReaderStream; use tonic::{async_trait, Request, Response, Status, Streaming}; @@ -103,7 +104,7 @@ impl super::blob_service_server::BlobService for GRPCBlobServiceWrapper { return Err(Status::internal("not implemented")); } - match self.blob_service.has(&req_digest) { + match self.blob_service.has(&req_digest).await { Ok(true) => Ok(Response::new(super::BlobMeta::default())), Ok(false) => Err(Status::not_found(format!("blob {} not found", &req_digest))), Err(e) => Err(e.into()), @@ -122,13 +123,8 @@ impl super::blob_service_server::BlobService for GRPCBlobServiceWrapper { .try_into() .map_err(|_e| Status::invalid_argument("invalid digest length"))?; - match self.blob_service.open_read(&req_digest) { + match self.blob_service.open_read(&req_digest).await { Ok(Some(reader)) => { - let async_reader: SyncReadIntoAsyncRead< - _, - BytesMutWithDefaultCapacity<{ 100 * 1024 }>, - > = reader.into(); - fn stream_mapper( x: Result<bytes::Bytes, io::Error>, ) -> Result<super::BlobChunk, Status> { @@ -138,7 +134,7 @@ impl super::blob_service_server::BlobService for GRPCBlobServiceWrapper { } } - let chunks_stream = ReaderStream::new(async_reader).map(stream_mapper); + let chunks_stream = ReaderStream::new(reader).map(stream_mapper); Ok(Response::new(Box::pin(chunks_stream))) } Ok(None) => Err(Status::not_found(format!("blob {} not found", &req_digest))), @@ -158,35 +154,28 @@ impl super::blob_service_server::BlobService for GRPCBlobServiceWrapper { .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e)) }); - let data_reader = tokio_util::io::StreamReader::new(data_stream); - - // prepare a writer, which we'll use in the blocking task below. - let mut writer = self.blob_service.open_write(); + let mut data_reader = tokio_util::io::StreamReader::new(data_stream); - let result = task::spawn_blocking(move || -> Result<super::PutBlobResponse, Status> { - // construct a sync reader to the data - let mut reader = tokio_util::io::SyncIoBridge::new(data_reader); + let mut blob_writer = pin!(self.blob_service.open_write().await); - io::copy(&mut reader, &mut writer).map_err(|e| { + tokio::io::copy(&mut data_reader, &mut blob_writer) + .await + .map_err(|e| { warn!("error copying: {}", e); Status::internal("error copying") })?; - let digest = writer - .close() - .map_err(|e| { - warn!("error closing stream: {}", e); - Status::internal("error closing stream") - })? - .to_vec(); - - Ok(super::PutBlobResponse { - digest: digest.into(), + let digest = blob_writer + .close() + .map_err(|e| { + warn!("error closing stream: {}", e); + Status::internal("error closing stream") }) - }) - .await - .map_err(|_| Status::internal("failed to wait for task"))??; + .await? + .to_vec(); - Ok(Response::new(result)) + Ok(Response::new(super::PutBlobResponse { + digest: digest.into(), + })) } } |