diff options
Diffstat (limited to 'tvix')
-rw-r--r-- | tvix/Cargo.lock | 5 | ||||
-rw-r--r-- | tvix/Cargo.nix | 11 | ||||
-rw-r--r-- | tvix/store/Cargo.toml | 1 | ||||
-rw-r--r-- | tvix/store/src/proto/grpc_blobservice_wrapper.rs | 73 |
4 files changed, 50 insertions, 40 deletions
diff --git a/tvix/Cargo.lock b/tvix/Cargo.lock index c5b34604bddf..3381f6a8d485 100644 --- a/tvix/Cargo.lock +++ b/tvix/Cargo.lock @@ -2458,9 +2458,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.4" +version = "0.7.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0bb2e075f03b3d66d8d8785356224ba688d2906a371015e225beeb65ca92c740" +checksum = "5427d89453009325de0d8f342c9490009f76e999cb7672d77e46267448f7e6b2" dependencies = [ "bytes", "futures-core", @@ -2804,6 +2804,7 @@ dependencies = [ "thiserror", "tokio", "tokio-stream", + "tokio-util", "tonic", "tonic-build", "tonic-mock", diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix index 2d130f42c8a4..363db976dd6c 100644 --- a/tvix/Cargo.nix +++ b/tvix/Cargo.nix @@ -7004,9 +7004,9 @@ rec { }; "tokio-util" = rec { crateName = "tokio-util"; - version = "0.7.4"; + version = "0.7.7"; edition = "2018"; - sha256 = "0h67jb56bsxy4pi1a41pda8d52569ci5clvqv3c6cg9vy1sy1chb"; + sha256 = "1cp6yx4789j6gvbp4xnbk7lpd7q0j2a2qd4g1pg2b4q0afadh9sl"; authors = [ "Tokio Contributors <team@tokio.rs>" ]; @@ -7062,7 +7062,7 @@ rec { "time" = [ "tokio/time" "slab" ]; "tracing" = [ "dep:tracing" ]; }; - resolvedDefaultFeatures = [ "codec" "default" "tracing" ]; + resolvedDefaultFeatures = [ "codec" "default" "io" "io-util" "tracing" ]; }; "toml" = rec { crateName = "toml"; @@ -8357,6 +8357,11 @@ rec { packageId = "tokio-stream"; } { + name = "tokio-util"; + packageId = "tokio-util"; + features = [ "io" "io-util" ]; + } + { name = "tonic"; packageId = "tonic"; } diff --git a/tvix/store/Cargo.toml b/tvix/store/Cargo.toml index ac495b63b83d..793c75a9d84e 100644 --- a/tvix/store/Cargo.toml +++ b/tvix/store/Cargo.toml @@ -26,6 +26,7 @@ tonic = "0.8.2" tracing = "0.1.37" tracing-subscriber = { version = "0.3.16", features = ["json"] } walkdir = "2.3.2" +tokio-util = { version = "0.7.7", features = ["io", "io-util"] } [dependencies.tonic-reflection] optional = true diff --git a/tvix/store/src/proto/grpc_blobservice_wrapper.rs b/tvix/store/src/proto/grpc_blobservice_wrapper.rs index 72eb6fe17727..f4d42503e598 100644 --- a/tvix/store/src/proto/grpc_blobservice_wrapper.rs +++ b/tvix/store/src/proto/grpc_blobservice_wrapper.rs @@ -1,14 +1,15 @@ +use std::collections::VecDeque; + use crate::{ blobservice::BlobService, - chunkservice::{update_hasher, ChunkService}, + chunkservice::{read_all_and_chunk, update_hasher, ChunkService}, Error, }; use data_encoding::BASE64; -use std::io::{BufWriter, Write}; use tokio::{sync::mpsc::channel, task}; -use tokio_stream::wrappers::ReceiverStream; +use tokio_stream::{wrappers::ReceiverStream, StreamExt}; use tonic::{async_trait, Request, Response, Status, Streaming}; -use tracing::{debug, error, instrument, warn}; +use tracing::{debug, instrument, warn}; pub struct GRPCBlobServiceWrapper<BS: BlobService, CS: ChunkService> { blob_service: BS, @@ -163,38 +164,40 @@ impl< &self, request: Request<Streaming<super::BlobChunk>>, ) -> Result<Response<super::PutBlobResponse>, Status> { - let mut req_inner = request.into_inner(); - - // instantiate a [BlobWriter] to write all data received with a client, - // but wrap it in a pretty large (1MiB) [BufWriter] to prevent - // excessive useless chunk attempts. - let mut blob_writer = crate::BlobWriter::new(&self.chunk_service); + let req_inner = request.into_inner(); + + let data_stream = req_inner.map(|x| { + x.map(|x| VecDeque::from(x.data)) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e)) + }); + + let data_reader = tokio_util::io::StreamReader::new(data_stream); + + // TODO: can we get rid of this clone? + let chunk_service = self.chunk_service.clone(); + + let (blob_digest, blob_meta) = + task::spawn_blocking(move || -> Result<(Vec<u8>, super::BlobMeta), Error> { + // feed read_all_and_chunk a (sync) reader to the data retrieved from the stream. + read_all_and_chunk( + &chunk_service, + tokio_util::io::SyncIoBridge::new(data_reader), + ) + }) + .await + .map_err(|e| Status::internal(e.to_string()))??; + + // upload blobmeta if not there yet + if self + .blob_service + .stat(&super::StatBlobRequest { + digest: blob_digest.to_vec(), + include_chunks: false, + include_bao: false, + })? + .is_none() { - let mut blob_writer_buffered = BufWriter::with_capacity(1024 * 1024, &mut blob_writer); - - // receive data from the client, and write them all to the blob_writer. - while let Some(blob_chunk) = req_inner.message().await? { - if let Err(e) = blob_writer_buffered.write_all(&blob_chunk.data) { - error!(e=%e,"unable to write blob data"); - return Err(Status::internal("unable to write blob data")); - } - } - blob_writer_buffered.flush()?; - } - - // run finalize - let (blob_digest, blob_meta) = blob_writer - .finalize() - .map_err(|_| Status::internal("unable to finalize blob"))?; - - // check if we have the received blob in the [BlobService] already. - let resp = self.blob_service.stat(&super::StatBlobRequest { - digest: blob_digest.to_vec(), - ..Default::default() - })?; - - // if not, store. - if resp.is_none() { + // upload blobmeta self.blob_service.put(&blob_digest, blob_meta)?; } |