From d8ab140d2505aa1669bc9378012d736dfa19cac4 Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Sat, 18 Feb 2023 20:44:58 +0100 Subject: feat(tvix/store): do not buffer blob data Use the FastCDC::cut function to ask fastcd for cutting points as we receive the data. Make sure to keep the last chunk in the temporary buffer, as we might not actually cut at the end. Also, use rayon to calculate the blake3 hash if the input data is > 128KiB. Change-Id: I6195f3b74eac5516965cb12d8d026aa720c8b891 Reviewed-on: https://cl.tvl.fyi/c/depot/+/8135 Reviewed-by: raitobezarius Tested-by: BuildkiteCI --- tvix/store/src/proto/tests/grpc_blobservice.rs | 33 ++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) (limited to 'tvix/store/src/proto/tests/grpc_blobservice.rs') diff --git a/tvix/store/src/proto/tests/grpc_blobservice.rs b/tvix/store/src/proto/tests/grpc_blobservice.rs index e616d141f1a8..cd8ae3e2bf8b 100644 --- a/tvix/store/src/proto/tests/grpc_blobservice.rs +++ b/tvix/store/src/proto/tests/grpc_blobservice.rs @@ -166,6 +166,39 @@ async fn put_read_stat_large() { } assert_eq!(BLOB_B.len() as u32, size_in_stat); + // Chunks are chunked up the same way we would do locally, when initializing the chunker with the same values. + // TODO: make the chunker config better accessible, so we don't need to synchronize this. + { + let chunker_avg_size = 64 * 1024; + let chunker_min_size = chunker_avg_size / 4; + let chunker_max_size = chunker_avg_size * 4; + + // initialize a chunker with the current buffer + let blob_b = BLOB_B.to_vec(); + let chunker = fastcdc::v2020::FastCDC::new( + &blob_b, + chunker_min_size, + chunker_avg_size, + chunker_max_size, + ); + + let mut num_chunks = 0; + for (i, chunk) in chunker.enumerate() { + assert_eq!( + resp.chunks[i].size, chunk.length as u32, + "expected locally-chunked chunk length to match stat response" + ); + + num_chunks += 1; + } + + assert_eq!( + resp.chunks.len(), + num_chunks, + "expected number of chunks to match" + ); + } + // Reading the whole blob by its digest via the read() interface should succeed. { let resp = service -- cgit 1.4.1