author     Florian Klink <flokli@flokli.de>    2023-02-18T19·44+0100
committer  flokli <flokli@flokli.de>           2023-03-10T10·58+0000
commit     d8ab140d2505aa1669bc9378012d736dfa19cac4 (patch)
tree       04b4614d499b5e005abdcc9ab0470932e114308b /tvix/store/src/proto/grpc_blobservice_wrapper.rs
parent     a40d2dcdcd453e44d53d44b4c1471f3b503c7cd6 (diff)
feat(tvix/store): do not buffer blob data r/5926
Use the FastCDC::cut function to ask fastcdc for cutting points as we
receive the data. Make sure to keep the last chunk in the temporary
buffer, as we might not actually cut at the end.
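
As a rough sketch of that pattern (illustrative only: the function name
is made up here, and the chunk sizes just mirror the ones used below):

    // Cut as many chunks as possible out of `buf`, returning the cut
    // chunks plus whatever has to stay buffered until more data (or
    // EOF) arrives.
    fn drain_chunks(buf: Vec<u8>) -> (Vec<Vec<u8>>, Vec<u8>) {
        let chunker =
            fastcdc::v2020::FastCDC::new(&buf, 16 * 1024, 64 * 1024, 256 * 1024);
        let mut chunks = Vec::new();
        let mut start_pos = 0;
        loop {
            // cut returns the chunk's gear hash fingerprint and the
            // offset one past its end.
            let (_fp, end_pos) = chunker.cut(start_pos, buf.len() - start_pos);
            if end_pos == buf.len() {
                // This cut might only exist because the buffer ended
                // here, so keep the trailing data buffered.
                return (chunks, buf[start_pos..].to_vec());
            }
            chunks.push(buf[start_pos..end_pos].to_vec());
            start_pos = end_pos;
        }
    }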

Also, use rayon to calculate the blake3 hash if the input data is
> 128KiB.
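
The same dispatch in miniature (`data` is a stand-in for the received
bytes; update_rayon requires the blake3 crate's "rayon" feature):

    fn blob_digest(data: &[u8]) -> blake3::Hash {
        let mut hasher = blake3::Hasher::new();
        // the blake3 docs suggest multithreading only pays off from
        // roughly 128 KiB of input onwards.
        if data.len() >= 128 * 1024 {
            hasher.update_rayon(data);
        } else {
            hasher.update(data);
        }
        hasher.finalize()
    }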

Change-Id: I6195f3b74eac5516965cb12d8d026aa720c8b891
Reviewed-on: https://cl.tvl.fyi/c/depot/+/8135
Reviewed-by: raitobezarius <tvl@lahfa.xyz>
Tested-by: BuildkiteCI
Diffstat (limited to 'tvix/store/src/proto/grpc_blobservice_wrapper.rs')
-rw-r--r--  tvix/store/src/proto/grpc_blobservice_wrapper.rs  179
1 file changed, 106 insertions(+), 73 deletions(-)
diff --git a/tvix/store/src/proto/grpc_blobservice_wrapper.rs b/tvix/store/src/proto/grpc_blobservice_wrapper.rs
index 2516b5d3f9..4ae3093492 100644
--- a/tvix/store/src/proto/grpc_blobservice_wrapper.rs
+++ b/tvix/store/src/proto/grpc_blobservice_wrapper.rs
@@ -17,6 +17,27 @@ impl<BS: BlobService, CS: ChunkService> GRPCBlobServiceWrapper<BS, CS> {
             chunk_service,
         }
     }
+
+    // upload the chunk to the chunk service, and return its digest (or an error) when done.
+    #[instrument(skip(chunk_service))]
+    fn upload_chunk(chunk_service: CS, chunk_data: Vec<u8>) -> Result<Vec<u8>, Error> {
+        let mut hasher = blake3::Hasher::new();
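+        // per the blake3 docs, hashing with rayon only pays off for
+        // inputs of roughly 128 KiB and up.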
+        if chunk_data.len() >= 128 * 1024 {
+            hasher.update_rayon(&chunk_data);
+        } else {
+            hasher.update(&chunk_data);
+        }
+        let digest = hasher.finalize();
+
+        // if the chunk service already has this chunk, skip the upload.
+        if chunk_service.has(digest.as_bytes())? {
+            debug!("already has chunk, skipping");
+            return Ok(digest.as_bytes().to_vec());
+        }
+        let digest_resp = chunk_service.put(chunk_data)?;
+
+        assert_eq!(digest_resp, digest.as_bytes());
+
+        Ok(digest.as_bytes().to_vec())
+    }
 }
 
 #[async_trait]
@@ -143,89 +164,101 @@ impl<
     ) -> Result<Response<super::PutBlobResponse>, Status> {
         let mut req_inner = request.into_inner();
 
-        // TODO: for now, we collect all Chunks into a large Vec<u8>, and then
-        // pass it to a (content-defined) Chunker.
-        // This is because the fastcdc crate currently operates on byte slices,
-        // not on something implementing [std::io::Read].
-        // (see https://github.com/nlfiedler/fastcdc-rs/issues/17)
-
-        let mut blob_contents: Vec<u8> = Vec::new();
-
-        while let Some(mut blob_chunk) = req_inner.message().await? {
-            blob_contents.append(&mut blob_chunk.data);
-        }
-
-        // initialize a new chunker
-        // TODO: play with chunking sizes
-        let chunker = fastcdc::v2020::FastCDC::new(
-            &blob_contents,
-            64 * 1024 / 4, // min
-            64 * 1024,     // avg
-            64 * 1024 * 4, // max
-        );
-
-        // initialize blake3 hashers. chunk_hasher is used and reset for each
-        // chunk, blob_hasher calculates the hash of the whole blob.
-        let mut chunk_hasher = blake3::Hasher::new();
+        // initialize a blake3 hasher calculating the hash of the whole blob.
         let mut blob_hasher = blake3::Hasher::new();
 
         // start a BlobMeta, which we'll fill while looping over the chunks
         let mut blob_meta = super::BlobMeta::default();
 
-        // loop over all the chunks
-        for chunk in chunker {
-            // extract the data itself
-            let chunk_data: Vec<u8> =
-                blob_contents[chunk.offset..chunk.offset + chunk.length].to_vec();
-
-            // calculate the digest of that chunk
-            chunk_hasher.update(&chunk_data);
-            let chunk_digest = chunk_hasher.finalize();
-            chunk_hasher.reset();
-
-            // also update blob_hasher
-            blob_hasher.update(&chunk_data);
-
-            // check if chunk is already in db, and if not, insert.
-            match self.chunk_service.has(chunk_digest.as_bytes()) {
-                Err(e) => {
-                    return Err(Error::StorageError(format!(
-                        "unable to check if chunk {} exists: {}",
-                        BASE64.encode(chunk_digest.as_bytes()),
-                        e
-                    ))
-                    .into());
-                }
-                Ok(has_chunk) => {
-                    if !has_chunk {
-                        if let Err(e) = self.chunk_service.put(chunk_data.to_vec()) {
-                            return Err(Error::StorageError(format!(
-                                "unable to store chunk {}: {}",
-                                BASE64.encode(chunk_digest.as_bytes()),
-                                e
-                            ))
-                            .into());
-                        }
-                    }
-                }
+        // `buf` is filled with bytes received from the client.
+        let mut buf: Vec<u8> = vec![];
+
+        // This reads data from the client, chunks it up using fastcdc,
+        // uploads all chunks to the [ChunkService], and fills a
+        // [super::BlobMeta] linking to these chunks.
+        while let Some(blob_chunk) = req_inner.message().await? {
+            // calculate blob hash, and use rayon if data is >= 128KiB.
+            if blob_chunk.data.len() >= 128 * 1024 {
+                blob_hasher.update_rayon(&blob_chunk.data);
+            } else {
+                blob_hasher.update(&blob_chunk.data);
             }
 
-            // add chunk to blobmeta
-            blob_meta.chunks.push(super::blob_meta::ChunkMeta {
-                digest: chunk_digest.as_bytes().to_vec(),
-                size: chunk.length as u32,
-            });
+            // extend buf with the newly received data
+            buf.extend_from_slice(&blob_chunk.data);
+
+            // TODO: play with chunking sizes
+            let chunker_avg_size = 64 * 1024;
+            let chunker_min_size = chunker_avg_size / 4;
+            let chunker_max_size = chunker_avg_size * 4;
+
+            // initialize a chunker with the current buffer
+            let chunker = fastcdc::v2020::FastCDC::new(
+                &buf,
+                chunker_min_size,
+                chunker_avg_size,
+                chunker_max_size,
+            );
+
+            // ask the chunker for cutting points in the buffer.
+            let mut start_pos: usize = 0;
+            buf = loop {
+                // ask the chunker for the next cutting point.
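+                // (cut returns the chunk's gear hash fingerprint and
+                // the offset one past its end.)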
+                let (_fp, end_pos) = chunker.cut(start_pos, buf.len() - start_pos);
+
+                // Whenever the cut point is at the very end of the buffer,
+                // keep that chunk in there: we don't know whether the
+                // chunker decided to cut simply because it ran out of
+                // data, or whether it would also cut if there were more
+                // data.
+                //
+                // Split off all previous chunks and keep this chunk's
+                // data in the buffer.
+                if end_pos == buf.len() {
+                    break buf.split_off(start_pos);
+                }
+
+                // Upload that chunk to the chunk service and record it in BlobMeta.
+                // TODO: make upload_chunk async and upload concurrently?
+                let chunk_data = &buf[start_pos..end_pos];
+                let chunk_digest =
+                    Self::upload_chunk(self.chunk_service.clone(), chunk_data.to_vec())?;
+
+                blob_meta.chunks.push(super::blob_meta::ChunkMeta {
+                    digest: chunk_digest,
+                    size: chunk_data.len() as u32,
+                });
+
+                // move start_pos over the processed chunk.
+                start_pos = end_pos;
+            }
         }
 
-        // done reading data, finalize blob_hasher and insert blobmeta.
-        let blob_digest = blob_hasher.finalize();
+        // Also upload the last chunk (what's left in `buf`) to the chunk
+        // service and record it in BlobMeta.
+        let buf_len = buf.len() as u32;
+        let chunk_digest = Self::upload_chunk(self.chunk_service.clone(), buf)?;
 
-        // TODO: don't store if we already have it (potentially with different chunking)
-        match self.blob_service.put(blob_digest.as_bytes(), blob_meta) {
-            Ok(()) => Ok(Response::new(super::PutBlobResponse {
-                digest: blob_digest.as_bytes().to_vec(),
-            })),
-            Err(e) => Err(e.into()),
+        blob_meta.chunks.push(super::blob_meta::ChunkMeta {
+            digest: chunk_digest,
+            size: buf_len,
+        });
+
+        let blob_digest = blob_hasher.finalize().as_bytes().to_vec();
+
+        // check if we have the received blob in the [BlobService] already.
+        let resp = self.blob_service.stat(&super::StatBlobRequest {
+            digest: blob_digest.to_vec(),
+            ..Default::default()
+        })?;
+
+        // if not, store.
+        if resp.is_none() {
+            self.blob_service.put(&blob_digest, blob_meta)?;
         }
+
+        // return to client.
+        Ok(Response::new(super::PutBlobResponse {
+            digest: blob_digest,
+        }))
     }
 }