about summary refs log tree commit diff
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2023-02-27T14·48+0100
committerflokli <flokli@flokli.de>2023-03-10T10·58+0000
commit535e1b15ab457c9015baebf14bd7b50a38edadb4 (patch)
treefb53e725fbd5543efa458f3ebc3d0f81731e17c1
parentb29d1ae372bb0794cc8425ced7986b3d059a2be5 (diff)
fix(tvix/store/proto/grpc_blobservice_wrapper): buffer recv data r/5933
While we don't want to keep all of the data in memory, we want to
feed a reasonably-enough buffer to the chunking function, to prevent
unnecessarily trying to chunk over and over again.

Change-Id: I5bbe2d55e8c1c63f8f7ce343889d374b528b559e
Reviewed-on: https://cl.tvl.fyi/c/depot/+/8160
Tested-by: BuildkiteCI
Reviewed-by: raitobezarius <tvl@lahfa.xyz>
-rw-r--r--tvix/store/src/proto/grpc_blobservice_wrapper.rs21
1 files changed, 14 insertions, 7 deletions
diff --git a/tvix/store/src/proto/grpc_blobservice_wrapper.rs b/tvix/store/src/proto/grpc_blobservice_wrapper.rs
index 52d0670307..eea837608b 100644
--- a/tvix/store/src/proto/grpc_blobservice_wrapper.rs
+++ b/tvix/store/src/proto/grpc_blobservice_wrapper.rs
@@ -1,6 +1,6 @@
 use crate::{blobservice::BlobService, chunkservice::ChunkService, Error};
 use data_encoding::BASE64;
-use std::io::Write;
+use std::io::{BufWriter, Write};
 use tokio::{sync::mpsc::channel, task};
 use tokio_stream::wrappers::ReceiverStream;
 use tonic::{async_trait, Request, Response, Status, Streaming};
@@ -165,14 +165,21 @@ impl<
     ) -> Result<Response<super::PutBlobResponse>, Status> {
         let mut req_inner = request.into_inner();
 
+        // instantiate a [BlobWriter] to write all data received with a client,
+        // but wrap it in a pretty large (1MiB) [BufWriter] to prevent
+        // excessive useless chunk attempts.
         let mut blob_writer = crate::BlobWriter::new(&self.chunk_service);
-
-        // receive data from the client, and keep writing it to the blob writer.
-        while let Some(blob_chunk) = req_inner.message().await? {
-            if let Err(e) = blob_writer.write_all(&blob_chunk.data) {
-                error!(e=%e,"unable to write blob data");
-                return Err(Status::internal("unable to write blob data"));
+        {
+            let mut blob_writer_buffered = BufWriter::with_capacity(1024 * 1024, &mut blob_writer);
+
+            // receive data from the client, and write them all to the blob_writer.
+            while let Some(blob_chunk) = req_inner.message().await? {
+                if let Err(e) = blob_writer_buffered.write_all(&blob_chunk.data) {
+                    error!(e=%e,"unable to write blob data");
+                    return Err(Status::internal("unable to write blob data"));
+                }
             }
+            blob_writer_buffered.flush()?;
         }
 
         // run finalize