about summary refs log tree commit diff
path: root/tvix
diff options
context:
space:
mode:
Diffstat (limited to 'tvix')
-rw-r--r--tvix/store/src/blobwriter.rs149
-rw-r--r--tvix/store/src/errors.rs10
-rw-r--r--tvix/store/src/lib.rs2
-rw-r--r--tvix/store/src/proto/grpc_blobservice_wrapper.rs87
4 files changed, 172 insertions, 76 deletions
diff --git a/tvix/store/src/blobwriter.rs b/tvix/store/src/blobwriter.rs
new file mode 100644
index 000000000000..8cb09ecc6d3c
--- /dev/null
+++ b/tvix/store/src/blobwriter.rs
@@ -0,0 +1,149 @@
+use crate::chunkservice::ChunkService;
+use crate::{proto, Error};
+use tracing::{debug, instrument};
+
+pub struct BlobWriter<'a, CS: ChunkService> {
+    chunk_service: &'a CS,
+
+    blob_hasher: blake3::Hasher,
+
+    blob_meta: proto::BlobMeta,
+
+    // filled with data from previous writes that didn't end up producing a full chunk.
+    buf: Vec<u8>,
+}
+
+impl<'a, CS: ChunkService> BlobWriter<'a, CS> {
+    pub fn new(chunk_service: &'a CS) -> Self {
+        Self {
+            chunk_service,
+            blob_hasher: blake3::Hasher::new(),
+            blob_meta: proto::BlobMeta::default(),
+            buf: vec![],
+        }
+    }
+
+    // Return the digest of the blob, as well as the blobmeta containing info of all the chunks,
+    // or an error if there's still bytes left to be flushed.
+    // In case there was still data to be written a last unfinalized chunk,
+    // it's written as well.
+    #[instrument(skip(self))]
+    pub fn finalize(&mut self) -> Result<(Vec<u8>, proto::BlobMeta), Error> {
+        // If there's something still left in self.buf, upload this as the last
+        // chunk to the chunk service and record it in BlobMeta.
+        if !self.buf.is_empty() {
+            // Also upload the last chunk (what's left in `self.buf`) to the chunk
+            // service and record it in BlobMeta.
+            let buf_len = self.buf.len() as u32;
+            let chunk_digest = self.upload_chunk(self.buf.clone())?;
+
+            self.blob_meta.chunks.push(proto::blob_meta::ChunkMeta {
+                digest: chunk_digest,
+                size: buf_len,
+            });
+
+            self.buf.clear();
+        }
+        return Ok((
+            self.blob_hasher.finalize().as_bytes().to_vec(),
+            self.blob_meta.clone(),
+        ));
+    }
+
+    // upload a chunk to the chunk service, and return its digest (or an error) when done.
+    #[instrument(skip(self, chunk_data))]
+    fn upload_chunk(&mut self, chunk_data: Vec<u8>) -> Result<Vec<u8>, Error> {
+        let mut hasher = blake3::Hasher::new();
+        if chunk_data.len() >= 128 * 1024 {
+            hasher.update_rayon(&chunk_data);
+        } else {
+            hasher.update(&chunk_data);
+        }
+        let digest = hasher.finalize();
+
+        if self.chunk_service.has(digest.as_bytes())? {
+            debug!("already has chunk, skipping");
+        }
+        let digest_resp = self.chunk_service.put(chunk_data)?;
+
+        assert_eq!(digest_resp, digest.as_bytes());
+
+        Ok(digest.as_bytes().to_vec())
+    }
+}
+
+/// This chunks up all data written using fastcdc, uploads all chunks to the
+// [ChunkService], and fills a [proto::BlobMeta] linking to these chunks.
+impl<CS: ChunkService> std::io::Write for BlobWriter<'_, CS> {
+    fn write(&mut self, input_buf: &[u8]) -> std::io::Result<usize> {
+        // calculate input_buf.len(), we need to return that later.
+        let input_buf_len = input_buf.len();
+
+        // update calculate blob hash, and use rayon if data is > 128KiB.
+        if input_buf.len() > 128 * 1024 {
+            self.blob_hasher.update_rayon(input_buf);
+        } else {
+            self.blob_hasher.update(input_buf);
+        }
+
+        // prepend buf with existing data (from self.buf)
+        let buf: Vec<u8> = {
+            let mut b = Vec::new();
+            b.append(&mut self.buf);
+            b.append(&mut input_buf.to_vec());
+            b
+        };
+
+        // TODO: play with chunking sizes
+        let chunker_avg_size = 64 * 1024;
+        let chunker_min_size = chunker_avg_size / 4;
+        let chunker_max_size = chunker_avg_size * 4;
+
+        // initialize a chunker with the buffer
+        let chunker = fastcdc::v2020::FastCDC::new(
+            &buf,
+            chunker_min_size,
+            chunker_avg_size,
+            chunker_max_size,
+        );
+
+        // ask the chunker for cutting points in the buffer.
+        let mut start_pos = 0_usize;
+        let rest = loop {
+            // ask the chunker for the next cutting point.
+            let (_fp, end_pos) = chunker.cut(start_pos, buf.len() - start_pos);
+
+            // whenever the last cut point is pointing to the end of the buffer,
+            // keep that chunk left in there.
+            // We don't know if the chunker decided to cut here simply because it was
+            // at the end of the buffer, or if it would also cut if there
+            // were more data.
+            //
+            // Split off all previous chunks and keep this chunk data in the buffer.
+            if end_pos == buf.len() {
+                break buf[start_pos..].to_vec();
+            }
+
+            // Upload that chunk to the chunk service and record it in BlobMeta.
+            // TODO: make upload_chunk async and upload concurrently?
+            let chunk_data = &buf[start_pos..end_pos];
+            let chunk_digest = self.upload_chunk(chunk_data.to_vec())?;
+
+            self.blob_meta.chunks.push(proto::blob_meta::ChunkMeta {
+                digest: chunk_digest,
+                size: chunk_data.len() as u32,
+            });
+
+            // move start_pos over the processed chunk.
+            start_pos = end_pos;
+        };
+
+        self.buf = rest;
+
+        Ok(input_buf_len)
+    }
+
+    fn flush(&mut self) -> std::io::Result<()> {
+        Ok(())
+    }
+}
diff --git a/tvix/store/src/errors.rs b/tvix/store/src/errors.rs
index 25e87c8aa239..36a4320cb3e6 100644
--- a/tvix/store/src/errors.rs
+++ b/tvix/store/src/errors.rs
@@ -26,3 +26,13 @@ impl From<Error> for Status {
         }
     }
 }
+
+// TODO: this should probably go somewhere else?
+impl From<Error> for std::io::Error {
+    fn from(value: Error) -> Self {
+        match value {
+            Error::InvalidRequest(msg) => Self::new(std::io::ErrorKind::InvalidInput, msg),
+            Error::StorageError(msg) => Self::new(std::io::ErrorKind::Other, msg),
+        }
+    }
+}
diff --git a/tvix/store/src/lib.rs b/tvix/store/src/lib.rs
index f294e39ad41a..c345cb9b1009 100644
--- a/tvix/store/src/lib.rs
+++ b/tvix/store/src/lib.rs
@@ -1,4 +1,5 @@
 mod blobreader;
+mod blobwriter;
 mod errors;
 
 pub mod blobservice;
@@ -9,6 +10,7 @@ pub mod pathinfoservice;
 pub mod proto;
 
 pub use blobreader::BlobReader;
+pub use blobwriter::BlobWriter;
 pub use errors::Error;
 
 #[cfg(test)]
diff --git a/tvix/store/src/proto/grpc_blobservice_wrapper.rs b/tvix/store/src/proto/grpc_blobservice_wrapper.rs
index 4ae3093492f7..52d067030799 100644
--- a/tvix/store/src/proto/grpc_blobservice_wrapper.rs
+++ b/tvix/store/src/proto/grpc_blobservice_wrapper.rs
@@ -1,9 +1,10 @@
 use crate::{blobservice::BlobService, chunkservice::ChunkService, Error};
 use data_encoding::BASE64;
+use std::io::Write;
 use tokio::{sync::mpsc::channel, task};
 use tokio_stream::wrappers::ReceiverStream;
 use tonic::{async_trait, Request, Response, Status, Streaming};
-use tracing::{debug, instrument};
+use tracing::{debug, error, instrument, warn};
 
 pub struct GRPCBlobServiceWrapper<BS: BlobService, CS: ChunkService> {
     blob_service: BS,
@@ -164,86 +165,20 @@ impl<
     ) -> Result<Response<super::PutBlobResponse>, Status> {
         let mut req_inner = request.into_inner();
 
-        // initialize a blake3 hasher calculating the hash of the whole blob.
-        let mut blob_hasher = blake3::Hasher::new();
+        let mut blob_writer = crate::BlobWriter::new(&self.chunk_service);
 
-        // start a BlobMeta, which we'll fill while looping over the chunks
-        let mut blob_meta = super::BlobMeta::default();
-
-        // is filled with bytes received from the client.
-        let mut buf: Vec<u8> = vec![];
-
-        // This reads data from the client, chunks it up using fastcdc,
-        // uploads all chunks to the [ChunkService], and fills a
-        // [super::BlobMeta] linking to these chunks.
+        // receive data from the client, and keep writing it to the blob writer.
         while let Some(blob_chunk) = req_inner.message().await? {
-            // calculate blob hash, and use rayon if data is > 128KiB.
-            if blob_chunk.data.len() > 128 * 1024 {
-                blob_hasher.update_rayon(&blob_chunk.data);
-            } else {
-                blob_hasher.update(&blob_chunk.data);
-            }
-
-            // extend buf with the newly received data
-            buf.append(&mut blob_chunk.data.clone());
-
-            // TODO: play with chunking sizes
-            let chunker_avg_size = 64 * 1024;
-            let chunker_min_size = chunker_avg_size / 4;
-            let chunker_max_size = chunker_avg_size * 4;
-
-            // initialize a chunker with the current buffer
-            let chunker = fastcdc::v2020::FastCDC::new(
-                &buf,
-                chunker_min_size,
-                chunker_avg_size,
-                chunker_max_size,
-            );
-
-            // ask the chunker for cutting points in the buffer.
-            let mut start_pos = 0 as usize;
-            buf = loop {
-                // ask the chunker for the next cutting point.
-                let (_fp, end_pos) = chunker.cut(start_pos, buf.len() - start_pos);
-
-                // whenever the last cut point is pointing to the end of the buffer,
-                // keep that chunk left in there.
-                // We don't know if the chunker decided to cut here simply because it was
-                // at the end of the buffer, or if it would also cut if there
-                // were more data.
-                //
-                // Split off all previous chunks and keep this chunk data in the buffer.
-                if end_pos == buf.len() {
-                    break buf.split_off(start_pos);
-                }
-
-                // Upload that chunk to the chunk service and record it in BlobMeta.
-                // TODO: make upload_chunk async and upload concurrently?
-                let chunk_data = &buf[start_pos..end_pos];
-                let chunk_digest =
-                    Self::upload_chunk(self.chunk_service.clone(), chunk_data.to_vec())?;
-
-                blob_meta.chunks.push(super::blob_meta::ChunkMeta {
-                    digest: chunk_digest,
-                    size: chunk_data.len() as u32,
-                });
-
-                // move start_pos over the processed chunk.
-                start_pos = end_pos;
+            if let Err(e) = blob_writer.write_all(&blob_chunk.data) {
+                error!(e=%e,"unable to write blob data");
+                return Err(Status::internal("unable to write blob data"));
             }
         }
 
-        // Also upload the last chunk (what's left in `buf`) to the chunk
-        // service and record it in BlobMeta.
-        let buf_len = buf.len() as u32;
-        let chunk_digest = Self::upload_chunk(self.chunk_service.clone(), buf)?;
-
-        blob_meta.chunks.push(super::blob_meta::ChunkMeta {
-            digest: chunk_digest,
-            size: buf_len,
-        });
-
-        let blob_digest = blob_hasher.finalize().as_bytes().to_vec();
+        // run finalize
+        let (blob_digest, blob_meta) = blob_writer
+            .finalize()
+            .map_err(|_| Status::internal("unable to finalize blob"))?;
 
         // check if we have the received blob in the [BlobService] already.
         let resp = self.blob_service.stat(&super::StatBlobRequest {