use crate::{proto, Error}; use std::io::Read; use tracing::{debug, instrument}; use super::ChunkService; /// uploads a chunk to a chunk service, and returns its digest (or an error) when done. #[instrument(skip_all, err)] pub fn upload_chunk<CS: ChunkService>( chunk_service: &CS, chunk_data: Vec<u8>, ) -> Result<Vec<u8>, Error> { let mut hasher = blake3::Hasher::new(); update_hasher(&mut hasher, &chunk_data); let digest = hasher.finalize(); if chunk_service.has(digest.as_bytes())? { debug!("already has chunk, skipping"); } let digest_resp = chunk_service.put(chunk_data)?; assert_eq!(digest_resp, digest.as_bytes()); Ok(digest.as_bytes().to_vec()) } /// reads through a reader, writes chunks to a [ChunkService] and returns a /// [proto::BlobMeta] pointing to all the chunks. #[instrument(skip_all, err)] pub fn read_all_and_chunk<CS: ChunkService, R: Read>( chunk_service: &CS, r: R, ) -> Result<(Vec<u8>, proto::BlobMeta), Error> { let mut blob_meta = proto::BlobMeta::default(); // hash the file contents, upload chunks if not there yet let mut blob_hasher = blake3::Hasher::new(); // TODO: play with chunking sizes let chunker_avg_size = 64 * 1024; let chunker_min_size = chunker_avg_size / 4; let chunker_max_size = chunker_avg_size * 4; let chunker = fastcdc::v2020::StreamCDC::new(r, chunker_min_size, chunker_avg_size, chunker_max_size); for chunking_result in chunker { let chunk = chunking_result.unwrap(); // TODO: convert to error::UnableToRead let chunk_len = chunk.data.len() as u32; // update calculate blob hash update_hasher(&mut blob_hasher, &chunk.data); let chunk_digest = upload_chunk(chunk_service, chunk.data)?; blob_meta.chunks.push(proto::blob_meta::ChunkMeta { digest: chunk_digest, size: chunk_len, }); } Ok((blob_hasher.finalize().as_bytes().to_vec(), blob_meta)) } /// updates a given hasher with more data. Uses rayon if the data is /// sufficiently big. /// /// From the docs: /// /// To get any performance benefit from multithreading, the input buffer needs /// to be large. As a rule of thumb on x86_64, update_rayon is slower than /// update for inputs under 128 KiB. That threshold varies quite a lot across /// different processors, and it’s important to benchmark your specific use /// case. /// /// We didn't benchmark yet, so these numbers might need tweaking. #[instrument(skip_all)] pub fn update_hasher(hasher: &mut blake3::Hasher, data: &[u8]) { if data.len() > 128 * 1024 { hasher.update_rayon(data); } else { hasher.update(data); } }