diff options
author | Florian Klink <flokli@flokli.de> | 2023-03-11T20·14+0100 |
---|---|---|
committer | clbot <clbot@tvl.fyi> | 2023-03-13T08·46+0000 |
commit | 7ffb2676ee6d96f382d138d638b4e2a6a3f6841d (patch) | |
tree | b9d45fc799d3ca1613d8191c1dad9da541857425 /tvix/store/src | |
parent | 2fe7192dbc8787884934cb7623c0c85d12def7f8 (diff) |
refactor(tvix/store): add read_all_and_chunk method r/5958
This moves the logic from src/import.rs that - reads over the contents of a file - chunks them up and uploads individual chunks - keeps track of the uploaded chunks in a BlobMeta structure - returns the hash of the blob and the BlobMeta structure … into a generic read_all_and_chunk function in src/chunkservice/util.rs. It will work on anything implementing io::Read, not just files, which will help us in a bit. Change-Id: I53bf628114b73ee2e515bdae29974571ea2b6f6f Reviewed-on: https://cl.tvl.fyi/c/depot/+/8259 Reviewed-by: tazjin <tazjin@tvl.su> Tested-by: BuildkiteCI Reviewed-by: raitobezarius <tvl@lahfa.xyz> Autosubmit: flokli <flokli@flokli.de>
Diffstat (limited to 'tvix/store/src')
-rw-r--r-- | tvix/store/src/chunkservice/mod.rs | 1 | ||||
-rw-r--r-- | tvix/store/src/chunkservice/util.rs | 43 | ||||
-rw-r--r-- | tvix/store/src/import.rs | 45 |
3 files changed, 46 insertions, 43 deletions
diff --git a/tvix/store/src/chunkservice/mod.rs b/tvix/store/src/chunkservice/mod.rs index 60bef3765d1b..50365caccf4a 100644 --- a/tvix/store/src/chunkservice/mod.rs +++ b/tvix/store/src/chunkservice/mod.rs @@ -7,6 +7,7 @@ use crate::Error; pub use self::memory::MemoryChunkService; pub use self::sled::SledChunkService; +pub use self::util::read_all_and_chunk; pub use self::util::update_hasher; pub use self::util::upload_chunk; diff --git a/tvix/store/src/chunkservice/util.rs b/tvix/store/src/chunkservice/util.rs index 2897d4e58e94..761a58a646a3 100644 --- a/tvix/store/src/chunkservice/util.rs +++ b/tvix/store/src/chunkservice/util.rs @@ -1,7 +1,7 @@ +use crate::{proto, Error}; +use std::io::Read; use tracing::{debug, instrument}; -use crate::Error; - use super::ChunkService; /// uploads a chunk to a chunk service, and returns its digest (or an error) when done. @@ -24,6 +24,45 @@ pub fn upload_chunk<CS: ChunkService>( Ok(digest.as_bytes().to_vec()) } +/// reads through a reader, writes chunks to a [ChunkService] and returns a +/// [proto::BlobMeta] pointing to all the chunks. +#[instrument(skip_all, err)] +pub fn read_all_and_chunk<CS: ChunkService, R: Read>( + chunk_service: &CS, + r: R, +) -> Result<(Vec<u8>, proto::BlobMeta), Error> { + let mut blob_meta = proto::BlobMeta::default(); + + // hash the file contents, upload chunks if not there yet + let mut blob_hasher = blake3::Hasher::new(); + + // TODO: play with chunking sizes + let chunker_avg_size = 64 * 1024; + let chunker_min_size = chunker_avg_size / 4; + let chunker_max_size = chunker_avg_size * 4; + + let chunker = + fastcdc::v2020::StreamCDC::new(r, chunker_min_size, chunker_avg_size, chunker_max_size); + + for chunking_result in chunker { + let chunk = chunking_result.unwrap(); + // TODO: convert to error::UnableToRead + + let chunk_len = chunk.data.len() as u32; + + // update calculate blob hash + update_hasher(&mut blob_hasher, &chunk.data); + + let chunk_digest = upload_chunk(chunk_service, chunk.data)?; + + blob_meta.chunks.push(proto::blob_meta::ChunkMeta { + digest: chunk_digest, + size: chunk_len, + }); + } + Ok((blob_hasher.finalize().as_bytes().to_vec(), blob_meta)) +} + /// updates a given hasher with more data. Uses rayon if the data is /// sufficiently big. /// diff --git a/tvix/store/src/import.rs b/tvix/store/src/import.rs index d2974d93c030..1e12bd36e460 100644 --- a/tvix/store/src/import.rs +++ b/tvix/store/src/import.rs @@ -1,7 +1,4 @@ -use crate::{ - chunkservice::{update_hasher, upload_chunk}, - proto, -}; +use crate::{chunkservice::read_all_and_chunk, proto}; use std::{ collections::HashMap, fmt::Debug, @@ -115,44 +112,10 @@ fn process_entry<BS: BlobService, CS: ChunkService + std::marker::Sync, DS: Dire .metadata() .map_err(|e| Error::UnableToStat(entry_path.clone(), e.into()))?; - // hash the file contents, upload chunks if not there yet - let (blob_digest, blob_meta) = { - let file = File::open(entry_path.clone()) - .map_err(|e| Error::UnableToOpen(entry_path.clone(), e))?; - - let mut blob_meta = proto::BlobMeta::default(); - let mut blob_hasher = blake3::Hasher::new(); - - // TODO: play with chunking sizes - let chunker_avg_size = 64 * 1024; - let chunker_min_size = chunker_avg_size / 4; - let chunker_max_size = chunker_avg_size * 4; - - let chunker = fastcdc::v2020::StreamCDC::new( - file, - chunker_min_size, - chunker_avg_size, - chunker_max_size, - ); - - for chunking_result in chunker { - let chunk = chunking_result.unwrap(); - // TODO: convert to error::UnableToRead + let file = File::open(entry_path.clone()) + .map_err(|e| Error::UnableToOpen(entry_path.clone(), e))?; - let chunk_len = chunk.data.len() as u32; - - // update calculate blob hash - update_hasher(&mut blob_hasher, &chunk.data); - - let chunk_digest = upload_chunk(chunk_service, chunk.data)?; - - blob_meta.chunks.push(proto::blob_meta::ChunkMeta { - digest: chunk_digest, - size: chunk_len, - }); - } - (blob_hasher.finalize().as_bytes().to_vec(), blob_meta) - }; + let (blob_digest, blob_meta) = read_all_and_chunk(chunk_service, file)?; // upload blobmeta if not there yet if blob_service |