diff options
-rw-r--r-- | tvix/store/src/import.rs | 44 |
1 files changed, 35 insertions, 9 deletions
diff --git a/tvix/store/src/import.rs b/tvix/store/src/import.rs index 5449634cc9f5..bd911cc2e81d 100644 --- a/tvix/store/src/import.rs +++ b/tvix/store/src/import.rs @@ -1,10 +1,9 @@ -use crate::{proto, BlobWriter}; +use crate::{chunkservice::upload_chunk, proto}; use std::{ collections::HashMap, fmt::Debug, fs, fs::File, - io::BufReader, os::unix::prelude::PermissionsExt, path::{Path, PathBuf}, }; @@ -115,18 +114,45 @@ fn process_entry<BS: BlobService, CS: ChunkService + std::marker::Sync, DS: Dire // hash the file contents, upload chunks if not there yet let (blob_digest, blob_meta) = { - let mut blob_writer = BlobWriter::new(chunk_service); - let file = File::open(entry_path.clone()) .map_err(|e| Error::UnableToOpen(entry_path.clone(), e))?; - let mut file_reader = BufReader::new(file); + let mut blob_meta = proto::BlobMeta::default(); + let mut blob_hasher = blake3::Hasher::new(); + + // TODO: play with chunking sizes + let chunker_avg_size = 64 * 1024; + let chunker_min_size = chunker_avg_size / 4; + let chunker_max_size = chunker_avg_size * 4; + + let chunker = fastcdc::v2020::StreamCDC::new( + Box::new(file), + chunker_min_size, + chunker_avg_size, + chunker_max_size, + ); + + for chunking_result in chunker { + let chunk = chunking_result.unwrap(); + // TODO: convert to error::UnableToRead - std::io::copy(&mut file_reader, &mut blob_writer) - .map_err(|e| Error::UnableToRead(entry_path, e))?; + let chunk_len = chunk.data.len() as u32; - // TODO: handle errors - blob_writer.finalize().unwrap() + // update calculate blob hash, and use rayon if data is > 128KiB. + if chunk_len > 128 * 1024 { + blob_hasher.update_rayon(&chunk.data); + } else { + blob_hasher.update(&chunk.data); + } + + let chunk_digest = upload_chunk(chunk_service, chunk.data)?; + + blob_meta.chunks.push(proto::blob_meta::ChunkMeta { + digest: chunk_digest, + size: chunk_len, + }); + } + (blob_hasher.finalize().as_bytes().to_vec(), blob_meta) }; // upload blobmeta if not there yet |