diff options
-rw-r--r-- | tvix/Cargo.lock | 1 | ||||
-rw-r--r-- | tvix/Cargo.nix | 4 | ||||
-rw-r--r-- | tvix/store/Cargo.toml | 3 | ||||
-rw-r--r-- | tvix/store/src/blobwriter.rs | 93 | ||||
-rw-r--r-- | tvix/store/src/import.rs | 4 |
5 files changed, 66 insertions, 39 deletions
diff --git a/tvix/Cargo.lock b/tvix/Cargo.lock index cba7166ce878..de4d832b8954 100644 --- a/tvix/Cargo.lock +++ b/tvix/Cargo.lock @@ -2796,6 +2796,7 @@ dependencies = [ "nix-compat", "prost", "prost-build", + "rayon", "sha2 0.10.6", "sled", "tempfile", diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix index 20751f596a3f..7167a3cc1370 100644 --- a/tvix/Cargo.nix +++ b/tvix/Cargo.nix @@ -8331,6 +8331,10 @@ rec { packageId = "prost"; } { + name = "rayon"; + packageId = "rayon"; + } + { name = "sha2"; packageId = "sha2 0.10.6"; } diff --git a/tvix/store/Cargo.toml b/tvix/store/Cargo.toml index 09a62bc79fba..65d3420ab7d3 100644 --- a/tvix/store/Cargo.toml +++ b/tvix/store/Cargo.toml @@ -14,10 +14,11 @@ count-write = "0.1.0" data-encoding = "2.3.3" fastcdc = "3.0.0" lazy_static = "1.4.0" +nix-compat = { path = "../nix-compat" } prost = "0.11.2" +rayon = "1.6.1" sha2 = "0.10.6" sled = { version = "0.34.7", features = ["compression"] } -nix-compat = { path = "../nix-compat" } thiserror = "1.0.38" tokio-stream = "0.1.11" tokio = { version = "1.23.0", features = ["rt-multi-thread"] } diff --git a/tvix/store/src/blobwriter.rs b/tvix/store/src/blobwriter.rs index 8cb09ecc6d3c..9b4dcb8f21ef 100644 --- a/tvix/store/src/blobwriter.rs +++ b/tvix/store/src/blobwriter.rs @@ -1,5 +1,6 @@ use crate::chunkservice::ChunkService; use crate::{proto, Error}; +use rayon::prelude::*; use tracing::{debug, instrument}; pub struct BlobWriter<'a, CS: ChunkService> { @@ -13,6 +14,31 @@ pub struct BlobWriter<'a, CS: ChunkService> { buf: Vec<u8>, } +// upload a chunk to the chunk service, and return its digest (or an error) when done. +#[instrument(skip_all)] +fn upload_chunk<CS: ChunkService>( + chunk_service: &CS, + chunk_data: Vec<u8>, +) -> Result<Vec<u8>, Error> { + let mut hasher = blake3::Hasher::new(); + // TODO: benchmark this number and factor it out + if chunk_data.len() >= 128 * 1024 { + hasher.update_rayon(&chunk_data); + } else { + hasher.update(&chunk_data); + } + let digest = hasher.finalize(); + + if chunk_service.has(digest.as_bytes())? { + debug!("already has chunk, skipping"); + } + let digest_resp = chunk_service.put(chunk_data)?; + + assert_eq!(digest_resp, digest.as_bytes()); + + Ok(digest.as_bytes().to_vec()) +} + impl<'a, CS: ChunkService> BlobWriter<'a, CS> { pub fn new(chunk_service: &'a CS) -> Self { Self { @@ -35,7 +61,7 @@ impl<'a, CS: ChunkService> BlobWriter<'a, CS> { // Also upload the last chunk (what's left in `self.buf`) to the chunk // service and record it in BlobMeta. let buf_len = self.buf.len() as u32; - let chunk_digest = self.upload_chunk(self.buf.clone())?; + let chunk_digest = upload_chunk(self.chunk_service, self.buf.clone())?; self.blob_meta.chunks.push(proto::blob_meta::ChunkMeta { digest: chunk_digest, @@ -49,32 +75,11 @@ impl<'a, CS: ChunkService> BlobWriter<'a, CS> { self.blob_meta.clone(), )); } - - // upload a chunk to the chunk service, and return its digest (or an error) when done. - #[instrument(skip(self, chunk_data))] - fn upload_chunk(&mut self, chunk_data: Vec<u8>) -> Result<Vec<u8>, Error> { - let mut hasher = blake3::Hasher::new(); - if chunk_data.len() >= 128 * 1024 { - hasher.update_rayon(&chunk_data); - } else { - hasher.update(&chunk_data); - } - let digest = hasher.finalize(); - - if self.chunk_service.has(digest.as_bytes())? { - debug!("already has chunk, skipping"); - } - let digest_resp = self.chunk_service.put(chunk_data)?; - - assert_eq!(digest_resp, digest.as_bytes()); - - Ok(digest.as_bytes().to_vec()) - } } /// This chunks up all data written using fastcdc, uploads all chunks to the // [ChunkService], and fills a [proto::BlobMeta] linking to these chunks. -impl<CS: ChunkService> std::io::Write for BlobWriter<'_, CS> { +impl<CS: ChunkService + std::marker::Sync> std::io::Write for BlobWriter<'_, CS> { fn write(&mut self, input_buf: &[u8]) -> std::io::Result<usize> { // calculate input_buf.len(), we need to return that later. let input_buf_len = input_buf.len(); @@ -107,6 +112,9 @@ impl<CS: ChunkService> std::io::Write for BlobWriter<'_, CS> { chunker_max_size, ); + // assemble a list of byte slices to be uploaded + let mut chunk_slices: Vec<&[u8]> = Vec::new(); + // ask the chunker for cutting points in the buffer. let mut start_pos = 0_usize; let rest = loop { @@ -114,31 +122,44 @@ impl<CS: ChunkService> std::io::Write for BlobWriter<'_, CS> { let (_fp, end_pos) = chunker.cut(start_pos, buf.len() - start_pos); // whenever the last cut point is pointing to the end of the buffer, - // keep that chunk left in there. + // return that from the loop. // We don't know if the chunker decided to cut here simply because it was // at the end of the buffer, or if it would also cut if there // were more data. // // Split off all previous chunks and keep this chunk data in the buffer. if end_pos == buf.len() { - break buf[start_pos..].to_vec(); + break &buf[start_pos..]; } - // Upload that chunk to the chunk service and record it in BlobMeta. - // TODO: make upload_chunk async and upload concurrently? - let chunk_data = &buf[start_pos..end_pos]; - let chunk_digest = self.upload_chunk(chunk_data.to_vec())?; - - self.blob_meta.chunks.push(proto::blob_meta::ChunkMeta { - digest: chunk_digest, - size: chunk_data.len() as u32, - }); + // if it's an intermediate chunk, add it to chunk_slices. + // We'll later upload all of them in batch. + chunk_slices.push(&buf[start_pos..end_pos]); - // move start_pos over the processed chunk. + // advance start_pos over the processed chunk. start_pos = end_pos; }; - self.buf = rest; + // Upload all chunks to the chunk service and map them to a ChunkMeta + let blob_meta_chunks: Vec<Result<proto::blob_meta::ChunkMeta, Error>> = chunk_slices + .into_par_iter() + .map(|chunk_slice| { + let chunk_service = self.chunk_service.clone(); + let chunk_digest = upload_chunk(chunk_service.clone(), chunk_slice.to_vec())?; + + Ok(proto::blob_meta::ChunkMeta { + digest: chunk_digest, + size: chunk_slice.len() as u32, + }) + }) + .collect(); + + self.blob_meta.chunks = blob_meta_chunks + .into_iter() + .collect::<Result<Vec<proto::blob_meta::ChunkMeta>, Error>>()?; + + // update buf to point to the rest we didn't upload. + self.buf = rest.to_vec(); Ok(input_buf_len) } diff --git a/tvix/store/src/import.rs b/tvix/store/src/import.rs index e71798a6eb4c..5449634cc9f5 100644 --- a/tvix/store/src/import.rs +++ b/tvix/store/src/import.rs @@ -58,7 +58,7 @@ impl From<super::Error> for Error { // // It assumes the caller adds returned nodes to the directories it assembles. #[instrument(skip_all, fields(entry.file_type=?&entry.file_type(),entry.path=?entry.path()))] -fn process_entry<BS: BlobService, CS: ChunkService, DS: DirectoryService>( +fn process_entry<BS: BlobService, CS: ChunkService + std::marker::Sync, DS: DirectoryService>( blob_service: &mut BS, chunk_service: &mut CS, directory_service: &mut DS, @@ -167,7 +167,7 @@ fn process_entry<BS: BlobService, CS: ChunkService, DS: DirectoryService>( #[instrument(skip(blob_service, chunk_service, directory_service), fields(path=?p))] pub fn import_path< BS: BlobService, - CS: ChunkService, + CS: ChunkService + std::marker::Sync, DS: DirectoryService, P: AsRef<Path> + Debug, >( |