diff options
author | Florian Klink <flokli@flokli.de> | 2023-03-10T13·23+0100 |
---|---|---|
committer | flokli <flokli@flokli.de> | 2023-03-11T12·54+0000 |
commit | ead113cdfcc1598ec2fbed4108f2c820aaa7f231 (patch) | |
tree | 20330becb0ec7a7fa752bf0ccd6a41c817e1f9ce /tvix | |
parent | ceb9d670bf94dec47df2d49fe9ece6bf293efd27 (diff) |
feat(tvix/store/import): use StreamCDC instead of blobwriter r/5950
This seems to be way faster. Change-Id: Ica7cee95d108c51fe67365f07366634ddbbfa060 Reviewed-on: https://cl.tvl.fyi/c/depot/+/8246 Reviewed-by: raitobezarius <tvl@lahfa.xyz> Reviewed-by: tazjin <tazjin@tvl.su> Autosubmit: flokli <flokli@flokli.de> Tested-by: BuildkiteCI
Diffstat (limited to 'tvix')
-rw-r--r-- | tvix/store/src/import.rs | 44 |
1 files changed, 35 insertions, 9 deletions
diff --git a/tvix/store/src/import.rs b/tvix/store/src/import.rs index 5449634cc9f5..bd911cc2e81d 100644 --- a/tvix/store/src/import.rs +++ b/tvix/store/src/import.rs @@ -1,10 +1,9 @@ -use crate::{proto, BlobWriter}; +use crate::{chunkservice::upload_chunk, proto}; use std::{ collections::HashMap, fmt::Debug, fs, fs::File, - io::BufReader, os::unix::prelude::PermissionsExt, path::{Path, PathBuf}, }; @@ -115,18 +114,45 @@ fn process_entry<BS: BlobService, CS: ChunkService + std::marker::Sync, DS: Dire // hash the file contents, upload chunks if not there yet let (blob_digest, blob_meta) = { - let mut blob_writer = BlobWriter::new(chunk_service); - let file = File::open(entry_path.clone()) .map_err(|e| Error::UnableToOpen(entry_path.clone(), e))?; - let mut file_reader = BufReader::new(file); + let mut blob_meta = proto::BlobMeta::default(); + let mut blob_hasher = blake3::Hasher::new(); + + // TODO: play with chunking sizes + let chunker_avg_size = 64 * 1024; + let chunker_min_size = chunker_avg_size / 4; + let chunker_max_size = chunker_avg_size * 4; + + let chunker = fastcdc::v2020::StreamCDC::new( + Box::new(file), + chunker_min_size, + chunker_avg_size, + chunker_max_size, + ); + + for chunking_result in chunker { + let chunk = chunking_result.unwrap(); + // TODO: convert to error::UnableToRead - std::io::copy(&mut file_reader, &mut blob_writer) - .map_err(|e| Error::UnableToRead(entry_path, e))?; + let chunk_len = chunk.data.len() as u32; - // TODO: handle errors - blob_writer.finalize().unwrap() + // update calculate blob hash, and use rayon if data is > 128KiB. + if chunk_len > 128 * 1024 { + blob_hasher.update_rayon(&chunk.data); + } else { + blob_hasher.update(&chunk.data); + } + + let chunk_digest = upload_chunk(chunk_service, chunk.data)?; + + blob_meta.chunks.push(proto::blob_meta::ChunkMeta { + digest: chunk_digest, + size: chunk_len, + }); + } + (blob_hasher.finalize().as_bytes().to_vec(), blob_meta) }; // upload blobmeta if not there yet |