From b049b88d2d7bcb9caef158ffdf9cd931c62d2511 Mon Sep 17 00:00:00 2001
From: Florian Klink
Date: Fri, 10 Mar 2023 23:24:23 +0100
Subject: refactor(tvix/store): factor out hash update into function

We're using this in a bunch of places. Let's move it into a helper
function.

Change-Id: I118fba35f6d343704520ba37280e4ca52a61da44
Reviewed-on: https://cl.tvl.fyi/c/depot/+/8251
Autosubmit: flokli
Tested-by: BuildkiteCI
Reviewed-by: raitobezarius
---
 tvix/store/src/chunkservice/mod.rs               |  1 +
 tvix/store/src/chunkservice/util.rs              | 28 +++++++++++++++++++-----
 tvix/store/src/import.rs                         | 13 +++++------
 tvix/store/src/proto/grpc_blobservice_wrapper.rs | 12 +++++-----
 4 files changed, 35 insertions(+), 19 deletions(-)

(limited to 'tvix/store/src')

diff --git a/tvix/store/src/chunkservice/mod.rs b/tvix/store/src/chunkservice/mod.rs
index 725ed2014e5b..60bef3765d1b 100644
--- a/tvix/store/src/chunkservice/mod.rs
+++ b/tvix/store/src/chunkservice/mod.rs
@@ -7,6 +7,7 @@ use crate::Error;
 
 pub use self::memory::MemoryChunkService;
 pub use self::sled::SledChunkService;
+pub use self::util::update_hasher;
 pub use self::util::upload_chunk;
 
 /// The base trait all ChunkService services need to implement.
diff --git a/tvix/store/src/chunkservice/util.rs b/tvix/store/src/chunkservice/util.rs
index fe8e4b350fe4..2897d4e58e94 100644
--- a/tvix/store/src/chunkservice/util.rs
+++ b/tvix/store/src/chunkservice/util.rs
@@ -11,12 +11,7 @@ pub fn upload_chunk<CS: ChunkService>(
     chunk_data: Vec<u8>,
 ) -> Result<Vec<u8>, Error> {
     let mut hasher = blake3::Hasher::new();
-    // TODO: benchmark this number and factor it out
-    if chunk_data.len() >= 128 * 1024 {
-        hasher.update_rayon(&chunk_data);
-    } else {
-        hasher.update(&chunk_data);
-    }
+    update_hasher(&mut hasher, &chunk_data);
     let digest = hasher.finalize();
 
     if chunk_service.has(digest.as_bytes())? {
@@ -28,3 +23,24 @@
 
     Ok(digest.as_bytes().to_vec())
 }
+
+/// updates a given hasher with more data. Uses rayon if the data is
+/// sufficiently big.
+///
+/// From the docs:
+///
+/// To get any performance benefit from multithreading, the input buffer needs
+/// to be large. As a rule of thumb on x86_64, update_rayon is slower than
+/// update for inputs under 128 KiB. That threshold varies quite a lot across
+/// different processors, and it’s important to benchmark your specific use
+/// case.
+///
+/// We didn't benchmark yet, so these numbers might need tweaking.
+#[instrument(skip_all)]
+pub fn update_hasher(hasher: &mut blake3::Hasher, data: &[u8]) {
+    if data.len() > 128 * 1024 {
+        hasher.update_rayon(data);
+    } else {
+        hasher.update(data);
+    }
+}
diff --git a/tvix/store/src/import.rs b/tvix/store/src/import.rs
index bd911cc2e81d..564363f1ff1a 100644
--- a/tvix/store/src/import.rs
+++ b/tvix/store/src/import.rs
@@ -1,4 +1,7 @@
-use crate::{chunkservice::upload_chunk, proto};
+use crate::{
+    chunkservice::{update_hasher, upload_chunk},
+    proto,
+};
 use std::{
     collections::HashMap,
     fmt::Debug,
@@ -138,12 +141,8 @@ fn process_entry(
-            // update calculate blob hash, and use rayon if the chunk is > 128KiB.
-            if chunk_len > 128 * 1024 {
-                blob_hasher.update_rayon(&chunk.data);
-            } else {
-                blob_hasher.update(&chunk.data);
-            }
+            // update calculate blob hash
+            update_hasher(&mut blob_hasher, &chunk.data);
 
             let chunk_digest = upload_chunk(chunk_service, chunk.data)?;
diff --git a/tvix/store/src/proto/grpc_blobservice_wrapper.rs b/tvix/store/src/proto/grpc_blobservice_wrapper.rs
index eea837608b55..72eb6fe17727 100644
--- a/tvix/store/src/proto/grpc_blobservice_wrapper.rs
+++ b/tvix/store/src/proto/grpc_blobservice_wrapper.rs
@@ -1,4 +1,8 @@
-use crate::{blobservice::BlobService, chunkservice::ChunkService, Error};
+use crate::{
+    blobservice::BlobService,
+    chunkservice::{update_hasher, ChunkService},
+    Error,
+};
 use data_encoding::BASE64;
 use std::io::{BufWriter, Write};
 use tokio::{sync::mpsc::channel, task};
@@ -23,11 +27,7 @@ impl GRPCBlobServiceWrapper {
     #[instrument(skip(chunk_service))]
     fn upload_chunk(chunk_service: CS, chunk_data: Vec<u8>) -> Result<Vec<u8>, Error> {
         let mut hasher = blake3::Hasher::new();