diff options
-rw-r--r-- | tvix/store/src/chunkservice/mod.rs | 1 | ||||
-rw-r--r-- | tvix/store/src/chunkservice/util.rs | 28 | ||||
-rw-r--r-- | tvix/store/src/import.rs | 13 | ||||
-rw-r--r-- | tvix/store/src/proto/grpc_blobservice_wrapper.rs | 12 |
4 files changed, 35 insertions, 19 deletions
diff --git a/tvix/store/src/chunkservice/mod.rs b/tvix/store/src/chunkservice/mod.rs index 725ed2014e5b..60bef3765d1b 100644 --- a/tvix/store/src/chunkservice/mod.rs +++ b/tvix/store/src/chunkservice/mod.rs @@ -7,6 +7,7 @@ use crate::Error; pub use self::memory::MemoryChunkService; pub use self::sled::SledChunkService; +pub use self::util::update_hasher; pub use self::util::upload_chunk; /// The base trait all ChunkService services need to implement. diff --git a/tvix/store/src/chunkservice/util.rs b/tvix/store/src/chunkservice/util.rs index fe8e4b350fe4..2897d4e58e94 100644 --- a/tvix/store/src/chunkservice/util.rs +++ b/tvix/store/src/chunkservice/util.rs @@ -11,12 +11,7 @@ pub fn upload_chunk<CS: ChunkService>( chunk_data: Vec<u8>, ) -> Result<Vec<u8>, Error> { let mut hasher = blake3::Hasher::new(); - // TODO: benchmark this number and factor it out - if chunk_data.len() >= 128 * 1024 { - hasher.update_rayon(&chunk_data); - } else { - hasher.update(&chunk_data); - } + update_hasher(&mut hasher, &chunk_data); let digest = hasher.finalize(); if chunk_service.has(digest.as_bytes())? { @@ -28,3 +23,24 @@ pub fn upload_chunk<CS: ChunkService>( Ok(digest.as_bytes().to_vec()) } + +/// updates a given hasher with more data. Uses rayon if the data is +/// sufficiently big. +/// +/// From the docs: +/// +/// To get any performance benefit from multithreading, the input buffer needs +/// to be large. As a rule of thumb on x86_64, update_rayon is slower than +/// update for inputs under 128 KiB. That threshold varies quite a lot across +/// different processors, and it’s important to benchmark your specific use +/// case. +/// +/// We didn't benchmark yet, so these numbers might need tweaking. +#[instrument(skip_all)] +pub fn update_hasher(hasher: &mut blake3::Hasher, data: &[u8]) { + if data.len() > 128 * 1024 { + hasher.update_rayon(data); + } else { + hasher.update(data); + } +} diff --git a/tvix/store/src/import.rs b/tvix/store/src/import.rs index bd911cc2e81d..564363f1ff1a 100644 --- a/tvix/store/src/import.rs +++ b/tvix/store/src/import.rs @@ -1,4 +1,7 @@ -use crate::{chunkservice::upload_chunk, proto}; +use crate::{ + chunkservice::{update_hasher, upload_chunk}, + proto, +}; use std::{ collections::HashMap, fmt::Debug, @@ -138,12 +141,8 @@ fn process_entry<BS: BlobService, CS: ChunkService + std::marker::Sync, DS: Dire let chunk_len = chunk.data.len() as u32; - // update calculate blob hash, and use rayon if data is > 128KiB. - if chunk_len > 128 * 1024 { - blob_hasher.update_rayon(&chunk.data); - } else { - blob_hasher.update(&chunk.data); - } + // update calculate blob hash + update_hasher(&mut blob_hasher, &chunk.data); let chunk_digest = upload_chunk(chunk_service, chunk.data)?; diff --git a/tvix/store/src/proto/grpc_blobservice_wrapper.rs b/tvix/store/src/proto/grpc_blobservice_wrapper.rs index eea837608b55..72eb6fe17727 100644 --- a/tvix/store/src/proto/grpc_blobservice_wrapper.rs +++ b/tvix/store/src/proto/grpc_blobservice_wrapper.rs @@ -1,4 +1,8 @@ -use crate::{blobservice::BlobService, chunkservice::ChunkService, Error}; +use crate::{ + blobservice::BlobService, + chunkservice::{update_hasher, ChunkService}, + Error, +}; use data_encoding::BASE64; use std::io::{BufWriter, Write}; use tokio::{sync::mpsc::channel, task}; @@ -23,11 +27,7 @@ impl<BS: BlobService, CS: ChunkService> GRPCBlobServiceWrapper<BS, CS> { #[instrument(skip(chunk_service))] fn upload_chunk(chunk_service: CS, chunk_data: Vec<u8>) -> Result<Vec<u8>, Error> { let mut hasher = blake3::Hasher::new(); - if chunk_data.len() >= 128 * 1024 { - hasher.update_rayon(&chunk_data); - } else { - hasher.update(&chunk_data); - } + update_hasher(&mut hasher, &chunk_data); let digest = hasher.finalize(); if chunk_service.has(digest.as_bytes())? { |