diff options
author | Florian Klink <flokli@flokli.de> | 2023-02-27T14·48+0100 |
---|---|---|
committer | flokli <flokli@flokli.de> | 2023-03-10T10·58+0000 |
commit | 535e1b15ab457c9015baebf14bd7b50a38edadb4 (patch) | |
tree | fb53e725fbd5543efa458f3ebc3d0f81731e17c1 /tvix | |
parent | b29d1ae372bb0794cc8425ced7986b3d059a2be5 (diff) |
fix(tvix/store/proto/grpc_blobservice_wrapper): buffer recv data r/5933
While we don't want to keep all of the data in memory, we want to feed a reasonably-enough buffer to the chunking function, to prevent unnecessarily trying to chunk over and over again. Change-Id: I5bbe2d55e8c1c63f8f7ce343889d374b528b559e Reviewed-on: https://cl.tvl.fyi/c/depot/+/8160 Tested-by: BuildkiteCI Reviewed-by: raitobezarius <tvl@lahfa.xyz>
Diffstat (limited to 'tvix')
-rw-r--r-- | tvix/store/src/proto/grpc_blobservice_wrapper.rs | 21 |
1 files changed, 14 insertions, 7 deletions
diff --git a/tvix/store/src/proto/grpc_blobservice_wrapper.rs b/tvix/store/src/proto/grpc_blobservice_wrapper.rs index 52d067030799..eea837608b55 100644 --- a/tvix/store/src/proto/grpc_blobservice_wrapper.rs +++ b/tvix/store/src/proto/grpc_blobservice_wrapper.rs @@ -1,6 +1,6 @@ use crate::{blobservice::BlobService, chunkservice::ChunkService, Error}; use data_encoding::BASE64; -use std::io::Write; +use std::io::{BufWriter, Write}; use tokio::{sync::mpsc::channel, task}; use tokio_stream::wrappers::ReceiverStream; use tonic::{async_trait, Request, Response, Status, Streaming}; @@ -165,14 +165,21 @@ impl< ) -> Result<Response<super::PutBlobResponse>, Status> { let mut req_inner = request.into_inner(); + // instantiate a [BlobWriter] to write all data received with a client, + // but wrap it in a pretty large (1MiB) [BufWriter] to prevent + // excessive useless chunk attempts. let mut blob_writer = crate::BlobWriter::new(&self.chunk_service); - - // receive data from the client, and keep writing it to the blob writer. - while let Some(blob_chunk) = req_inner.message().await? { - if let Err(e) = blob_writer.write_all(&blob_chunk.data) { - error!(e=%e,"unable to write blob data"); - return Err(Status::internal("unable to write blob data")); + { + let mut blob_writer_buffered = BufWriter::with_capacity(1024 * 1024, &mut blob_writer); + + // receive data from the client, and write them all to the blob_writer. + while let Some(blob_chunk) = req_inner.message().await? { + if let Err(e) = blob_writer_buffered.write_all(&blob_chunk.data) { + error!(e=%e,"unable to write blob data"); + return Err(Status::internal("unable to write blob data")); + } } + blob_writer_buffered.flush()?; } // run finalize |