about summary refs log tree commit diff
path: root/tvix
diff options
context:
space:
mode:
Diffstat (limited to 'tvix')
-rw-r--r--tvix/Cargo.lock5
-rw-r--r--tvix/Cargo.nix11
-rw-r--r--tvix/store/Cargo.toml1
-rw-r--r--tvix/store/src/proto/grpc_blobservice_wrapper.rs73
4 files changed, 50 insertions, 40 deletions
diff --git a/tvix/Cargo.lock b/tvix/Cargo.lock
index c5b34604bddf..3381f6a8d485 100644
--- a/tvix/Cargo.lock
+++ b/tvix/Cargo.lock
@@ -2458,9 +2458,9 @@ dependencies = [
 
 [[package]]
 name = "tokio-util"
-version = "0.7.4"
+version = "0.7.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0bb2e075f03b3d66d8d8785356224ba688d2906a371015e225beeb65ca92c740"
+checksum = "5427d89453009325de0d8f342c9490009f76e999cb7672d77e46267448f7e6b2"
 dependencies = [
  "bytes",
  "futures-core",
@@ -2804,6 +2804,7 @@ dependencies = [
  "thiserror",
  "tokio",
  "tokio-stream",
+ "tokio-util",
  "tonic",
  "tonic-build",
  "tonic-mock",
diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix
index 2d130f42c8a4..363db976dd6c 100644
--- a/tvix/Cargo.nix
+++ b/tvix/Cargo.nix
@@ -7004,9 +7004,9 @@ rec {
       };
       "tokio-util" = rec {
         crateName = "tokio-util";
-        version = "0.7.4";
+        version = "0.7.7";
         edition = "2018";
-        sha256 = "0h67jb56bsxy4pi1a41pda8d52569ci5clvqv3c6cg9vy1sy1chb";
+        sha256 = "1cp6yx4789j6gvbp4xnbk7lpd7q0j2a2qd4g1pg2b4q0afadh9sl";
         authors = [
           "Tokio Contributors <team@tokio.rs>"
         ];
@@ -7062,7 +7062,7 @@ rec {
           "time" = [ "tokio/time" "slab" ];
           "tracing" = [ "dep:tracing" ];
         };
-        resolvedDefaultFeatures = [ "codec" "default" "tracing" ];
+        resolvedDefaultFeatures = [ "codec" "default" "io" "io-util" "tracing" ];
       };
       "toml" = rec {
         crateName = "toml";
@@ -8357,6 +8357,11 @@ rec {
             packageId = "tokio-stream";
           }
           {
+            name = "tokio-util";
+            packageId = "tokio-util";
+            features = [ "io" "io-util" ];
+          }
+          {
             name = "tonic";
             packageId = "tonic";
           }
diff --git a/tvix/store/Cargo.toml b/tvix/store/Cargo.toml
index ac495b63b83d..793c75a9d84e 100644
--- a/tvix/store/Cargo.toml
+++ b/tvix/store/Cargo.toml
@@ -26,6 +26,7 @@ tonic = "0.8.2"
 tracing = "0.1.37"
 tracing-subscriber = { version = "0.3.16", features = ["json"] }
 walkdir = "2.3.2"
+tokio-util = { version = "0.7.7", features = ["io", "io-util"] }
 
 [dependencies.tonic-reflection]
 optional = true
diff --git a/tvix/store/src/proto/grpc_blobservice_wrapper.rs b/tvix/store/src/proto/grpc_blobservice_wrapper.rs
index 72eb6fe17727..f4d42503e598 100644
--- a/tvix/store/src/proto/grpc_blobservice_wrapper.rs
+++ b/tvix/store/src/proto/grpc_blobservice_wrapper.rs
@@ -1,14 +1,15 @@
+use std::collections::VecDeque;
+
 use crate::{
     blobservice::BlobService,
-    chunkservice::{update_hasher, ChunkService},
+    chunkservice::{read_all_and_chunk, update_hasher, ChunkService},
     Error,
 };
 use data_encoding::BASE64;
-use std::io::{BufWriter, Write};
 use tokio::{sync::mpsc::channel, task};
-use tokio_stream::wrappers::ReceiverStream;
+use tokio_stream::{wrappers::ReceiverStream, StreamExt};
 use tonic::{async_trait, Request, Response, Status, Streaming};
-use tracing::{debug, error, instrument, warn};
+use tracing::{debug, instrument, warn};
 
 pub struct GRPCBlobServiceWrapper<BS: BlobService, CS: ChunkService> {
     blob_service: BS,
@@ -163,38 +164,40 @@ impl<
         &self,
         request: Request<Streaming<super::BlobChunk>>,
     ) -> Result<Response<super::PutBlobResponse>, Status> {
-        let mut req_inner = request.into_inner();
-
-        // instantiate a [BlobWriter] to write all data received with a client,
-        // but wrap it in a pretty large (1MiB) [BufWriter] to prevent
-        // excessive useless chunk attempts.
-        let mut blob_writer = crate::BlobWriter::new(&self.chunk_service);
+        let req_inner = request.into_inner();
+
+        let data_stream = req_inner.map(|x| {
+            x.map(|x| VecDeque::from(x.data))
+                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))
+        });
+
+        let data_reader = tokio_util::io::StreamReader::new(data_stream);
+
+        // TODO: can we get rid of this clone?
+        let chunk_service = self.chunk_service.clone();
+
+        let (blob_digest, blob_meta) =
+            task::spawn_blocking(move || -> Result<(Vec<u8>, super::BlobMeta), Error> {
+                // feed read_all_and_chunk a (sync) reader to the data retrieved from the stream.
+                read_all_and_chunk(
+                    &chunk_service,
+                    tokio_util::io::SyncIoBridge::new(data_reader),
+                )
+            })
+            .await
+            .map_err(|e| Status::internal(e.to_string()))??;
+
+        // upload blobmeta if not there yet
+        if self
+            .blob_service
+            .stat(&super::StatBlobRequest {
+                digest: blob_digest.to_vec(),
+                include_chunks: false,
+                include_bao: false,
+            })?
+            .is_none()
         {
-            let mut blob_writer_buffered = BufWriter::with_capacity(1024 * 1024, &mut blob_writer);
-
-            // receive data from the client, and write them all to the blob_writer.
-            while let Some(blob_chunk) = req_inner.message().await? {
-                if let Err(e) = blob_writer_buffered.write_all(&blob_chunk.data) {
-                    error!(e=%e,"unable to write blob data");
-                    return Err(Status::internal("unable to write blob data"));
-                }
-            }
-            blob_writer_buffered.flush()?;
-        }
-
-        // run finalize
-        let (blob_digest, blob_meta) = blob_writer
-            .finalize()
-            .map_err(|_| Status::internal("unable to finalize blob"))?;
-
-        // check if we have the received blob in the [BlobService] already.
-        let resp = self.blob_service.stat(&super::StatBlobRequest {
-            digest: blob_digest.to_vec(),
-            ..Default::default()
-        })?;
-
-        // if not, store.
-        if resp.is_none() {
+            // upload blobmeta
             self.blob_service.put(&blob_digest, blob_meta)?;
         }