about summary refs log tree commit diff
path: root/tvix/store/src
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2023-03-10T22·24+0100
committerclbot <clbot@tvl.fyi>2023-03-11T14·12+0000
commitb049b88d2d7bcb9caef158ffdf9cd931c62d2511 (patch)
tree904030b3aaddf712ddc3259ecdd3a872d7afc3ee /tvix/store/src
parent2dc93f8de26ba15106fe8a086bb85ca50c09860a (diff)
refactor(tvix/store): factor out hash update into function r/5952
We're using this in a bunch of places. Let's move it into a helper
function.

Change-Id: I118fba35f6d343704520ba37280e4ca52a61da44
Reviewed-on: https://cl.tvl.fyi/c/depot/+/8251
Autosubmit: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Reviewed-by: raitobezarius <tvl@lahfa.xyz>
Diffstat (limited to 'tvix/store/src')
-rw-r--r--tvix/store/src/chunkservice/mod.rs1
-rw-r--r--tvix/store/src/chunkservice/util.rs28
-rw-r--r--tvix/store/src/import.rs13
-rw-r--r--tvix/store/src/proto/grpc_blobservice_wrapper.rs12
4 files changed, 35 insertions, 19 deletions
diff --git a/tvix/store/src/chunkservice/mod.rs b/tvix/store/src/chunkservice/mod.rs
index 725ed2014e5b..60bef3765d1b 100644
--- a/tvix/store/src/chunkservice/mod.rs
+++ b/tvix/store/src/chunkservice/mod.rs
@@ -7,6 +7,7 @@ use crate::Error;
 
 pub use self::memory::MemoryChunkService;
 pub use self::sled::SledChunkService;
+pub use self::util::update_hasher;
 pub use self::util::upload_chunk;
 
 /// The base trait all ChunkService services need to implement.
diff --git a/tvix/store/src/chunkservice/util.rs b/tvix/store/src/chunkservice/util.rs
index fe8e4b350fe4..2897d4e58e94 100644
--- a/tvix/store/src/chunkservice/util.rs
+++ b/tvix/store/src/chunkservice/util.rs
@@ -11,12 +11,7 @@ pub fn upload_chunk<CS: ChunkService>(
     chunk_data: Vec<u8>,
 ) -> Result<Vec<u8>, Error> {
     let mut hasher = blake3::Hasher::new();
-    // TODO: benchmark this number and factor it out
-    if chunk_data.len() >= 128 * 1024 {
-        hasher.update_rayon(&chunk_data);
-    } else {
-        hasher.update(&chunk_data);
-    }
+    update_hasher(&mut hasher, &chunk_data);
     let digest = hasher.finalize();
 
     if chunk_service.has(digest.as_bytes())? {
@@ -28,3 +23,24 @@ pub fn upload_chunk<CS: ChunkService>(
 
     Ok(digest.as_bytes().to_vec())
 }
+
+/// updates a given hasher with more data. Uses rayon if the data is
+/// sufficiently big.
+///
+/// From the docs:
+///
+/// To get any performance benefit from multithreading, the input buffer needs
+/// to be large. As a rule of thumb on x86_64, update_rayon is slower than
+/// update for inputs under 128 KiB. That threshold varies quite a lot across
+/// different processors, and it’s important to benchmark your specific use
+/// case.
+///
+/// We didn't benchmark yet, so these numbers might need tweaking.
+#[instrument(skip_all)]
+pub fn update_hasher(hasher: &mut blake3::Hasher, data: &[u8]) {
+    if data.len() > 128 * 1024 {
+        hasher.update_rayon(data);
+    } else {
+        hasher.update(data);
+    }
+}
diff --git a/tvix/store/src/import.rs b/tvix/store/src/import.rs
index bd911cc2e81d..564363f1ff1a 100644
--- a/tvix/store/src/import.rs
+++ b/tvix/store/src/import.rs
@@ -1,4 +1,7 @@
-use crate::{chunkservice::upload_chunk, proto};
+use crate::{
+    chunkservice::{update_hasher, upload_chunk},
+    proto,
+};
 use std::{
     collections::HashMap,
     fmt::Debug,
@@ -138,12 +141,8 @@ fn process_entry<BS: BlobService, CS: ChunkService + std::marker::Sync, DS: Dire
 
                 let chunk_len = chunk.data.len() as u32;
 
-                // update calculate blob hash, and use rayon if data is > 128KiB.
-                if chunk_len > 128 * 1024 {
-                    blob_hasher.update_rayon(&chunk.data);
-                } else {
-                    blob_hasher.update(&chunk.data);
-                }
+                // update calculate blob hash
+                update_hasher(&mut blob_hasher, &chunk.data);
 
                 let chunk_digest = upload_chunk(chunk_service, chunk.data)?;
 
diff --git a/tvix/store/src/proto/grpc_blobservice_wrapper.rs b/tvix/store/src/proto/grpc_blobservice_wrapper.rs
index eea837608b55..72eb6fe17727 100644
--- a/tvix/store/src/proto/grpc_blobservice_wrapper.rs
+++ b/tvix/store/src/proto/grpc_blobservice_wrapper.rs
@@ -1,4 +1,8 @@
-use crate::{blobservice::BlobService, chunkservice::ChunkService, Error};
+use crate::{
+    blobservice::BlobService,
+    chunkservice::{update_hasher, ChunkService},
+    Error,
+};
 use data_encoding::BASE64;
 use std::io::{BufWriter, Write};
 use tokio::{sync::mpsc::channel, task};
@@ -23,11 +27,7 @@ impl<BS: BlobService, CS: ChunkService> GRPCBlobServiceWrapper<BS, CS> {
     #[instrument(skip(chunk_service))]
     fn upload_chunk(chunk_service: CS, chunk_data: Vec<u8>) -> Result<Vec<u8>, Error> {
         let mut hasher = blake3::Hasher::new();
-        if chunk_data.len() >= 128 * 1024 {
-            hasher.update_rayon(&chunk_data);
-        } else {
-            hasher.update(&chunk_data);
-        }
+        update_hasher(&mut hasher, &chunk_data);
         let digest = hasher.finalize();
 
         if chunk_service.has(digest.as_bytes())? {