diff options
Diffstat (limited to 'tvix/store')
-rw-r--r-- | tvix/store/Cargo.toml | 1 | ||||
-rw-r--r-- | tvix/store/src/proto/grpc_blobservice_wrapper.rs | 73 |
2 files changed, 39 insertions, 35 deletions
diff --git a/tvix/store/Cargo.toml b/tvix/store/Cargo.toml index ac495b63b83d..793c75a9d84e 100644 --- a/tvix/store/Cargo.toml +++ b/tvix/store/Cargo.toml @@ -26,6 +26,7 @@ tonic = "0.8.2" tracing = "0.1.37" tracing-subscriber = { version = "0.3.16", features = ["json"] } walkdir = "2.3.2" +tokio-util = { version = "0.7.7", features = ["io", "io-util"] } [dependencies.tonic-reflection] optional = true diff --git a/tvix/store/src/proto/grpc_blobservice_wrapper.rs b/tvix/store/src/proto/grpc_blobservice_wrapper.rs index 72eb6fe17727..f4d42503e598 100644 --- a/tvix/store/src/proto/grpc_blobservice_wrapper.rs +++ b/tvix/store/src/proto/grpc_blobservice_wrapper.rs @@ -1,14 +1,15 @@ +use std::collections::VecDeque; + use crate::{ blobservice::BlobService, - chunkservice::{update_hasher, ChunkService}, + chunkservice::{read_all_and_chunk, update_hasher, ChunkService}, Error, }; use data_encoding::BASE64; -use std::io::{BufWriter, Write}; use tokio::{sync::mpsc::channel, task}; -use tokio_stream::wrappers::ReceiverStream; +use tokio_stream::{wrappers::ReceiverStream, StreamExt}; use tonic::{async_trait, Request, Response, Status, Streaming}; -use tracing::{debug, error, instrument, warn}; +use tracing::{debug, instrument, warn}; pub struct GRPCBlobServiceWrapper<BS: BlobService, CS: ChunkService> { blob_service: BS, @@ -163,38 +164,40 @@ impl< &self, request: Request<Streaming<super::BlobChunk>>, ) -> Result<Response<super::PutBlobResponse>, Status> { - let mut req_inner = request.into_inner(); - - // instantiate a [BlobWriter] to write all data received with a client, - // but wrap it in a pretty large (1MiB) [BufWriter] to prevent - // excessive useless chunk attempts. - let mut blob_writer = crate::BlobWriter::new(&self.chunk_service); + let req_inner = request.into_inner(); + + let data_stream = req_inner.map(|x| { + x.map(|x| VecDeque::from(x.data)) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e)) + }); + + let data_reader = tokio_util::io::StreamReader::new(data_stream); + + // TODO: can we get rid of this clone? + let chunk_service = self.chunk_service.clone(); + + let (blob_digest, blob_meta) = + task::spawn_blocking(move || -> Result<(Vec<u8>, super::BlobMeta), Error> { + // feed read_all_and_chunk a (sync) reader to the data retrieved from the stream. + read_all_and_chunk( + &chunk_service, + tokio_util::io::SyncIoBridge::new(data_reader), + ) + }) + .await + .map_err(|e| Status::internal(e.to_string()))??; + + // upload blobmeta if not there yet + if self + .blob_service + .stat(&super::StatBlobRequest { + digest: blob_digest.to_vec(), + include_chunks: false, + include_bao: false, + })? + .is_none() { - let mut blob_writer_buffered = BufWriter::with_capacity(1024 * 1024, &mut blob_writer); - - // receive data from the client, and write them all to the blob_writer. - while let Some(blob_chunk) = req_inner.message().await? { - if let Err(e) = blob_writer_buffered.write_all(&blob_chunk.data) { - error!(e=%e,"unable to write blob data"); - return Err(Status::internal("unable to write blob data")); - } - } - blob_writer_buffered.flush()?; - } - - // run finalize - let (blob_digest, blob_meta) = blob_writer - .finalize() - .map_err(|_| Status::internal("unable to finalize blob"))?; - - // check if we have the received blob in the [BlobService] already. - let resp = self.blob_service.stat(&super::StatBlobRequest { - digest: blob_digest.to_vec(), - ..Default::default() - })?; - - // if not, store. - if resp.is_none() { + // upload blobmeta self.blob_service.put(&blob_digest, blob_meta)?; } |