Diffstat (limited to 'tvix/store/src/chunkservice/util.rs')
-rw-r--r-- | tvix/store/src/chunkservice/util.rs | 43
1 file changed, 41 insertions(+), 2 deletions(-)
diff --git a/tvix/store/src/chunkservice/util.rs b/tvix/store/src/chunkservice/util.rs
index 2897d4e58e94..761a58a646a3 100644
--- a/tvix/store/src/chunkservice/util.rs
+++ b/tvix/store/src/chunkservice/util.rs
@@ -1,7 +1,7 @@
+use crate::{proto, Error};
+use std::io::Read;
 use tracing::{debug, instrument};
 
-use crate::Error;
-
 use super::ChunkService;
 
 /// uploads a chunk to a chunk service, and returns its digest (or an error) when done.
@@ -24,6 +24,45 @@ pub fn upload_chunk<CS: ChunkService>(
     Ok(digest.as_bytes().to_vec())
 }
 
+/// reads through a reader, writes chunks to a [ChunkService] and returns a
+/// [proto::BlobMeta] pointing to all the chunks.
+#[instrument(skip_all, err)]
+pub fn read_all_and_chunk<CS: ChunkService, R: Read>(
+    chunk_service: &CS,
+    r: R,
+) -> Result<(Vec<u8>, proto::BlobMeta), Error> {
+    let mut blob_meta = proto::BlobMeta::default();
+
+    // hash the file contents, upload chunks if not there yet
+    let mut blob_hasher = blake3::Hasher::new();
+
+    // TODO: play with chunking sizes
+    let chunker_avg_size = 64 * 1024;
+    let chunker_min_size = chunker_avg_size / 4;
+    let chunker_max_size = chunker_avg_size * 4;
+
+    let chunker =
+        fastcdc::v2020::StreamCDC::new(r, chunker_min_size, chunker_avg_size, chunker_max_size);
+
+    for chunking_result in chunker {
+        let chunk = chunking_result.unwrap();
+        // TODO: convert to error::UnableToRead
+
+        let chunk_len = chunk.data.len() as u32;
+
+        // update calculate blob hash
+        update_hasher(&mut blob_hasher, &chunk.data);
+
+        let chunk_digest = upload_chunk(chunk_service, chunk.data)?;
+
+        blob_meta.chunks.push(proto::blob_meta::ChunkMeta {
+            digest: chunk_digest,
+            size: chunk_len,
+        });
+    }
+    Ok((blob_hasher.finalize().as_bytes().to_vec(), blob_meta))
+}
+
 /// updates a given hasher with more data. Uses rayon if the data is
 /// sufficiently big.
 ///
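The new read_all_and_chunk cuts the input stream into content-defined chunks with FastCDC (16 KiB minimum, 64 KiB average, 256 KiB maximum, derived as avg / 4 and avg * 4), uploads each chunk via upload_chunk, and feeds every byte through a BLAKE3 hasher along the way, so the caller gets back both the digest of the whole blob and a BlobMeta describing its chunks. Below is a minimal usage sketch, not part of this commit: it assumes the crate is importable as tvix_store, that read_all_and_chunk and an in-memory MemoryChunkService implementing Default are re-exported from the chunkservice module (adjust the names to whatever your tree provides), and adds a small hexdigest helper so no extra hex crate is needed.

use std::io::Cursor;

use tvix_store::chunkservice::{read_all_and_chunk, MemoryChunkService};

/// renders a byte slice as lowercase hex, purely for printing.
fn hexdigest(bytes: &[u8]) -> String {
    bytes.iter().map(|b| format!("{:02x}", b)).collect()
}

fn main() -> Result<(), tvix_store::Error> {
    // hypothetical in-memory ChunkService; any implementation works the same.
    let chunk_service = MemoryChunkService::default();

    // anything implementing std::io::Read can be chunked; here, a 1 MiB buffer.
    let data = vec![0x42u8; 1024 * 1024];
    let (blob_digest, blob_meta) = read_all_and_chunk(&chunk_service, Cursor::new(data))?;

    // blob_digest is the BLAKE3 digest of the whole blob; blob_meta lists
    // the (digest, size) pairs of the individual chunks, in input order.
    println!("blob digest: {}", hexdigest(&blob_digest));
    for chunk in &blob_meta.chunks {
        println!("  chunk {} ({} bytes)", hexdigest(&chunk.digest), chunk.size);
    }
    Ok(())
}

One consequence of the chosen parameters worth noting: FastCDC never emits a chunk smaller than the 16 KiB minimum, so any blob at or below that size comes back as a single chunk whose digest differs from the blob digest only in that one is a chunk-level and the other a blob-level BLAKE3 hash of the same bytes.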