diff options
-rw-r--r-- | tvix/store/src/proto/grpc_blobservice_wrapper.rs | 179 | ||||
-rw-r--r-- | tvix/store/src/proto/tests/grpc_blobservice.rs | 33 |
2 files changed, 139 insertions, 73 deletions
diff --git a/tvix/store/src/proto/grpc_blobservice_wrapper.rs b/tvix/store/src/proto/grpc_blobservice_wrapper.rs index 2516b5d3f933..4ae3093492f7 100644 --- a/tvix/store/src/proto/grpc_blobservice_wrapper.rs +++ b/tvix/store/src/proto/grpc_blobservice_wrapper.rs @@ -17,6 +17,27 @@ impl<BS: BlobService, CS: ChunkService> GRPCBlobServiceWrapper<BS, CS> { chunk_service, } } + + // upload the chunk to the chunk service, and return its digest (or an error) when done. + #[instrument(skip(chunk_service))] + fn upload_chunk(chunk_service: CS, chunk_data: Vec<u8>) -> Result<Vec<u8>, Error> { + let mut hasher = blake3::Hasher::new(); + if chunk_data.len() >= 128 * 1024 { + hasher.update_rayon(&chunk_data); + } else { + hasher.update(&chunk_data); + } + let digest = hasher.finalize(); + + if chunk_service.has(digest.as_bytes())? { + debug!("already has chunk, skipping"); + } + let digest_resp = chunk_service.put(chunk_data)?; + + assert_eq!(digest_resp, digest.as_bytes()); + + Ok(digest.as_bytes().to_vec()) + } } #[async_trait] @@ -143,89 +164,101 @@ impl< ) -> Result<Response<super::PutBlobResponse>, Status> { let mut req_inner = request.into_inner(); - // TODO: for now, we collect all Chunks into a large Vec<u8>, and then - // pass it to a (content-defined) Chunker. - // This is because the fastcdc crate currently operates on byte slices, - // not on something implementing [std::io::Read]. - // (see https://github.com/nlfiedler/fastcdc-rs/issues/17) - - let mut blob_contents: Vec<u8> = Vec::new(); - - while let Some(mut blob_chunk) = req_inner.message().await? { - blob_contents.append(&mut blob_chunk.data); - } - - // initialize a new chunker - // TODO: play with chunking sizes - let chunker = fastcdc::v2020::FastCDC::new( - &blob_contents, - 64 * 1024 / 4, // min - 64 * 1024, // avg - 64 * 1024 * 4, // max - ); - - // initialize blake3 hashers. chunk_hasher is used and reset for each - // chunk, blob_hasher calculates the hash of the whole blob. - let mut chunk_hasher = blake3::Hasher::new(); + // initialize a blake3 hasher calculating the hash of the whole blob. let mut blob_hasher = blake3::Hasher::new(); // start a BlobMeta, which we'll fill while looping over the chunks let mut blob_meta = super::BlobMeta::default(); - // loop over all the chunks - for chunk in chunker { - // extract the data itself - let chunk_data: Vec<u8> = - blob_contents[chunk.offset..chunk.offset + chunk.length].to_vec(); - - // calculate the digest of that chunk - chunk_hasher.update(&chunk_data); - let chunk_digest = chunk_hasher.finalize(); - chunk_hasher.reset(); - - // also update blob_hasher - blob_hasher.update(&chunk_data); - - // check if chunk is already in db, and if not, insert. - match self.chunk_service.has(chunk_digest.as_bytes()) { - Err(e) => { - return Err(Error::StorageError(format!( - "unable to check if chunk {} exists: {}", - BASE64.encode(chunk_digest.as_bytes()), - e - )) - .into()); - } - Ok(has_chunk) => { - if !has_chunk { - if let Err(e) = self.chunk_service.put(chunk_data.to_vec()) { - return Err(Error::StorageError(format!( - "unable to store chunk {}: {}", - BASE64.encode(chunk_digest.as_bytes()), - e - )) - .into()); - } - } - } + // is filled with bytes received from the client. + let mut buf: Vec<u8> = vec![]; + + // This reads data from the client, chunks it up using fastcdc, + // uploads all chunks to the [ChunkService], and fills a + // [super::BlobMeta] linking to these chunks. + while let Some(blob_chunk) = req_inner.message().await? { + // calculate blob hash, and use rayon if data is > 128KiB. + if blob_chunk.data.len() > 128 * 1024 { + blob_hasher.update_rayon(&blob_chunk.data); + } else { + blob_hasher.update(&blob_chunk.data); } - // add chunk to blobmeta - blob_meta.chunks.push(super::blob_meta::ChunkMeta { - digest: chunk_digest.as_bytes().to_vec(), - size: chunk.length as u32, - }); + // extend buf with the newly received data + buf.append(&mut blob_chunk.data.clone()); + + // TODO: play with chunking sizes + let chunker_avg_size = 64 * 1024; + let chunker_min_size = chunker_avg_size / 4; + let chunker_max_size = chunker_avg_size * 4; + + // initialize a chunker with the current buffer + let chunker = fastcdc::v2020::FastCDC::new( + &buf, + chunker_min_size, + chunker_avg_size, + chunker_max_size, + ); + + // ask the chunker for cutting points in the buffer. + let mut start_pos = 0 as usize; + buf = loop { + // ask the chunker for the next cutting point. + let (_fp, end_pos) = chunker.cut(start_pos, buf.len() - start_pos); + + // whenever the last cut point is pointing to the end of the buffer, + // keep that chunk left in there. + // We don't know if the chunker decided to cut here simply because it was + // at the end of the buffer, or if it would also cut if there + // were more data. + // + // Split off all previous chunks and keep this chunk data in the buffer. + if end_pos == buf.len() { + break buf.split_off(start_pos); + } + + // Upload that chunk to the chunk service and record it in BlobMeta. + // TODO: make upload_chunk async and upload concurrently? + let chunk_data = &buf[start_pos..end_pos]; + let chunk_digest = + Self::upload_chunk(self.chunk_service.clone(), chunk_data.to_vec())?; + + blob_meta.chunks.push(super::blob_meta::ChunkMeta { + digest: chunk_digest, + size: chunk_data.len() as u32, + }); + + // move start_pos over the processed chunk. + start_pos = end_pos; + } } - // done reading data, finalize blob_hasher and insert blobmeta. - let blob_digest = blob_hasher.finalize(); + // Also upload the last chunk (what's left in `buf`) to the chunk + // service and record it in BlobMeta. + let buf_len = buf.len() as u32; + let chunk_digest = Self::upload_chunk(self.chunk_service.clone(), buf)?; - // TODO: don't store if we already have it (potentially with different chunking) - match self.blob_service.put(blob_digest.as_bytes(), blob_meta) { - Ok(()) => Ok(Response::new(super::PutBlobResponse { - digest: blob_digest.as_bytes().to_vec(), - })), - Err(e) => Err(e.into()), + blob_meta.chunks.push(super::blob_meta::ChunkMeta { + digest: chunk_digest, + size: buf_len, + }); + + let blob_digest = blob_hasher.finalize().as_bytes().to_vec(); + + // check if we have the received blob in the [BlobService] already. + let resp = self.blob_service.stat(&super::StatBlobRequest { + digest: blob_digest.to_vec(), + ..Default::default() + })?; + + // if not, store. + if resp.is_none() { + self.blob_service.put(&blob_digest, blob_meta)?; } + + // return to client. + Ok(Response::new(super::PutBlobResponse { + digest: blob_digest, + })) } } diff --git a/tvix/store/src/proto/tests/grpc_blobservice.rs b/tvix/store/src/proto/tests/grpc_blobservice.rs index e616d141f1a8..cd8ae3e2bf8b 100644 --- a/tvix/store/src/proto/tests/grpc_blobservice.rs +++ b/tvix/store/src/proto/tests/grpc_blobservice.rs @@ -166,6 +166,39 @@ async fn put_read_stat_large() { } assert_eq!(BLOB_B.len() as u32, size_in_stat); + // Chunks are chunked up the same way we would do locally, when initializing the chunker with the same values. + // TODO: make the chunker config better accessible, so we don't need to synchronize this. + { + let chunker_avg_size = 64 * 1024; + let chunker_min_size = chunker_avg_size / 4; + let chunker_max_size = chunker_avg_size * 4; + + // initialize a chunker with the current buffer + let blob_b = BLOB_B.to_vec(); + let chunker = fastcdc::v2020::FastCDC::new( + &blob_b, + chunker_min_size, + chunker_avg_size, + chunker_max_size, + ); + + let mut num_chunks = 0; + for (i, chunk) in chunker.enumerate() { + assert_eq!( + resp.chunks[i].size, chunk.length as u32, + "expected locally-chunked chunk length to match stat response" + ); + + num_chunks += 1; + } + + assert_eq!( + resp.chunks.len(), + num_chunks, + "expected number of chunks to match" + ); + } + // Reading the whole blob by its digest via the read() interface should succeed. { let resp = service |