about summary refs log tree commit diff
path: root/tvix/store/src/proto/grpc_blobservice_wrapper.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/store/src/proto/grpc_blobservice_wrapper.rs')
-rw-r--r--tvix/store/src/proto/grpc_blobservice_wrapper.rs179
1 files changed, 106 insertions, 73 deletions
diff --git a/tvix/store/src/proto/grpc_blobservice_wrapper.rs b/tvix/store/src/proto/grpc_blobservice_wrapper.rs
index 2516b5d3f933..4ae3093492f7 100644
--- a/tvix/store/src/proto/grpc_blobservice_wrapper.rs
+++ b/tvix/store/src/proto/grpc_blobservice_wrapper.rs
@@ -17,6 +17,27 @@ impl<BS: BlobService, CS: ChunkService> GRPCBlobServiceWrapper<BS, CS> {
             chunk_service,
         }
     }
+
+    // upload the chunk to the chunk service, and return its digest (or an error) when done.
+    #[instrument(skip(chunk_service))]
+    fn upload_chunk(chunk_service: CS, chunk_data: Vec<u8>) -> Result<Vec<u8>, Error> {
+        let mut hasher = blake3::Hasher::new();
+        if chunk_data.len() >= 128 * 1024 {
+            hasher.update_rayon(&chunk_data);
+        } else {
+            hasher.update(&chunk_data);
+        }
+        let digest = hasher.finalize();
+
+        if chunk_service.has(digest.as_bytes())? {
+            debug!("already has chunk, skipping");
+        }
+        let digest_resp = chunk_service.put(chunk_data)?;
+
+        assert_eq!(digest_resp, digest.as_bytes());
+
+        Ok(digest.as_bytes().to_vec())
+    }
 }
 
 #[async_trait]
@@ -143,89 +164,101 @@ impl<
     ) -> Result<Response<super::PutBlobResponse>, Status> {
         let mut req_inner = request.into_inner();
 
-        // TODO: for now, we collect all Chunks into a large Vec<u8>, and then
-        // pass it to a (content-defined) Chunker.
-        // This is because the fastcdc crate currently operates on byte slices,
-        // not on something implementing [std::io::Read].
-        // (see https://github.com/nlfiedler/fastcdc-rs/issues/17)
-
-        let mut blob_contents: Vec<u8> = Vec::new();
-
-        while let Some(mut blob_chunk) = req_inner.message().await? {
-            blob_contents.append(&mut blob_chunk.data);
-        }
-
-        // initialize a new chunker
-        // TODO: play with chunking sizes
-        let chunker = fastcdc::v2020::FastCDC::new(
-            &blob_contents,
-            64 * 1024 / 4, // min
-            64 * 1024,     // avg
-            64 * 1024 * 4, // max
-        );
-
-        // initialize blake3 hashers. chunk_hasher is used and reset for each
-        // chunk, blob_hasher calculates the hash of the whole blob.
-        let mut chunk_hasher = blake3::Hasher::new();
+        // initialize a blake3 hasher calculating the hash of the whole blob.
         let mut blob_hasher = blake3::Hasher::new();
 
         // start a BlobMeta, which we'll fill while looping over the chunks
         let mut blob_meta = super::BlobMeta::default();
 
-        // loop over all the chunks
-        for chunk in chunker {
-            // extract the data itself
-            let chunk_data: Vec<u8> =
-                blob_contents[chunk.offset..chunk.offset + chunk.length].to_vec();
-
-            // calculate the digest of that chunk
-            chunk_hasher.update(&chunk_data);
-            let chunk_digest = chunk_hasher.finalize();
-            chunk_hasher.reset();
-
-            // also update blob_hasher
-            blob_hasher.update(&chunk_data);
-
-            // check if chunk is already in db, and if not, insert.
-            match self.chunk_service.has(chunk_digest.as_bytes()) {
-                Err(e) => {
-                    return Err(Error::StorageError(format!(
-                        "unable to check if chunk {} exists: {}",
-                        BASE64.encode(chunk_digest.as_bytes()),
-                        e
-                    ))
-                    .into());
-                }
-                Ok(has_chunk) => {
-                    if !has_chunk {
-                        if let Err(e) = self.chunk_service.put(chunk_data.to_vec()) {
-                            return Err(Error::StorageError(format!(
-                                "unable to store chunk {}: {}",
-                                BASE64.encode(chunk_digest.as_bytes()),
-                                e
-                            ))
-                            .into());
-                        }
-                    }
-                }
+        // is filled with bytes received from the client.
+        let mut buf: Vec<u8> = vec![];
+
+        // This reads data from the client, chunks it up using fastcdc,
+        // uploads all chunks to the [ChunkService], and fills a
+        // [super::BlobMeta] linking to these chunks.
+        while let Some(blob_chunk) = req_inner.message().await? {
+            // calculate blob hash, and use rayon if data is > 128KiB.
+            if blob_chunk.data.len() > 128 * 1024 {
+                blob_hasher.update_rayon(&blob_chunk.data);
+            } else {
+                blob_hasher.update(&blob_chunk.data);
             }
 
-            // add chunk to blobmeta
-            blob_meta.chunks.push(super::blob_meta::ChunkMeta {
-                digest: chunk_digest.as_bytes().to_vec(),
-                size: chunk.length as u32,
-            });
+            // extend buf with the newly received data
+            buf.append(&mut blob_chunk.data.clone());
+
+            // TODO: play with chunking sizes
+            let chunker_avg_size = 64 * 1024;
+            let chunker_min_size = chunker_avg_size / 4;
+            let chunker_max_size = chunker_avg_size * 4;
+
+            // initialize a chunker with the current buffer
+            let chunker = fastcdc::v2020::FastCDC::new(
+                &buf,
+                chunker_min_size,
+                chunker_avg_size,
+                chunker_max_size,
+            );
+
+            // ask the chunker for cutting points in the buffer.
+            let mut start_pos = 0 as usize;
+            buf = loop {
+                // ask the chunker for the next cutting point.
+                let (_fp, end_pos) = chunker.cut(start_pos, buf.len() - start_pos);
+
+                // whenever the last cut point is pointing to the end of the buffer,
+                // keep that chunk left in there.
+                // We don't know if the chunker decided to cut here simply because it was
+                // at the end of the buffer, or if it would also cut if there
+                // were more data.
+                //
+                // Split off all previous chunks and keep this chunk data in the buffer.
+                if end_pos == buf.len() {
+                    break buf.split_off(start_pos);
+                }
+
+                // Upload that chunk to the chunk service and record it in BlobMeta.
+                // TODO: make upload_chunk async and upload concurrently?
+                let chunk_data = &buf[start_pos..end_pos];
+                let chunk_digest =
+                    Self::upload_chunk(self.chunk_service.clone(), chunk_data.to_vec())?;
+
+                blob_meta.chunks.push(super::blob_meta::ChunkMeta {
+                    digest: chunk_digest,
+                    size: chunk_data.len() as u32,
+                });
+
+                // move start_pos over the processed chunk.
+                start_pos = end_pos;
+            }
         }
 
-        // done reading data, finalize blob_hasher and insert blobmeta.
-        let blob_digest = blob_hasher.finalize();
+        // Also upload the last chunk (what's left in `buf`) to the chunk
+        // service and record it in BlobMeta.
+        let buf_len = buf.len() as u32;
+        let chunk_digest = Self::upload_chunk(self.chunk_service.clone(), buf)?;
 
-        // TODO: don't store if we already have it (potentially with different chunking)
-        match self.blob_service.put(blob_digest.as_bytes(), blob_meta) {
-            Ok(()) => Ok(Response::new(super::PutBlobResponse {
-                digest: blob_digest.as_bytes().to_vec(),
-            })),
-            Err(e) => Err(e.into()),
+        blob_meta.chunks.push(super::blob_meta::ChunkMeta {
+            digest: chunk_digest,
+            size: buf_len,
+        });
+
+        let blob_digest = blob_hasher.finalize().as_bytes().to_vec();
+
+        // check if we have the received blob in the [BlobService] already.
+        let resp = self.blob_service.stat(&super::StatBlobRequest {
+            digest: blob_digest.to_vec(),
+            ..Default::default()
+        })?;
+
+        // if not, store.
+        if resp.is_none() {
+            self.blob_service.put(&blob_digest, blob_meta)?;
         }
+
+        // return to client.
+        Ok(Response::new(super::PutBlobResponse {
+            digest: blob_digest,
+        }))
     }
 }