about summary refs log tree commit diff
path: root/tvix/store/src/chunkservice/util.rs
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2023-03-11T20·14+0100
committerclbot <clbot@tvl.fyi>2023-03-13T08·46+0000
commit7ffb2676ee6d96f382d138d638b4e2a6a3f6841d (patch)
treeb9d45fc799d3ca1613d8191c1dad9da541857425 /tvix/store/src/chunkservice/util.rs
parent2fe7192dbc8787884934cb7623c0c85d12def7f8 (diff)
refactor(tvix/store): add read_all_and_chunk method r/5958
This moves the logic from src/import.rs that

 - reads over the contents of a file
 - chunks them up and uploads individual chunks
 - keeps track of the uploaded chunks in a BlobMeta structure
 - returns the hash of the blob and the BlobMeta structure

… into a generic read_all_and_chunk function in
src/chunkservice/util.rs.

It will work on anything implementing io::Read, not just files, which
will help us in a bit.

Change-Id: I53bf628114b73ee2e515bdae29974571ea2b6f6f
Reviewed-on: https://cl.tvl.fyi/c/depot/+/8259
Reviewed-by: tazjin <tazjin@tvl.su>
Tested-by: BuildkiteCI
Reviewed-by: raitobezarius <tvl@lahfa.xyz>
Autosubmit: flokli <flokli@flokli.de>
Diffstat (limited to 'tvix/store/src/chunkservice/util.rs')
-rw-r--r--tvix/store/src/chunkservice/util.rs43
1 files changed, 41 insertions, 2 deletions
diff --git a/tvix/store/src/chunkservice/util.rs b/tvix/store/src/chunkservice/util.rs
index 2897d4e58e94..761a58a646a3 100644
--- a/tvix/store/src/chunkservice/util.rs
+++ b/tvix/store/src/chunkservice/util.rs
@@ -1,7 +1,7 @@
+use crate::{proto, Error};
+use std::io::Read;
 use tracing::{debug, instrument};
 
-use crate::Error;
-
 use super::ChunkService;
 
 /// uploads a chunk to a chunk service, and returns its digest (or an error) when done.
@@ -24,6 +24,45 @@ pub fn upload_chunk<CS: ChunkService>(
     Ok(digest.as_bytes().to_vec())
 }
 
+/// reads through a reader, writes chunks to a [ChunkService] and returns a
+/// [proto::BlobMeta] pointing to all the chunks.
+#[instrument(skip_all, err)]
+pub fn read_all_and_chunk<CS: ChunkService, R: Read>(
+    chunk_service: &CS,
+    r: R,
+) -> Result<(Vec<u8>, proto::BlobMeta), Error> {
+    let mut blob_meta = proto::BlobMeta::default();
+
+    // hash the file contents, upload chunks if not there yet
+    let mut blob_hasher = blake3::Hasher::new();
+
+    // TODO: play with chunking sizes
+    let chunker_avg_size = 64 * 1024;
+    let chunker_min_size = chunker_avg_size / 4;
+    let chunker_max_size = chunker_avg_size * 4;
+
+    let chunker =
+        fastcdc::v2020::StreamCDC::new(r, chunker_min_size, chunker_avg_size, chunker_max_size);
+
+    for chunking_result in chunker {
+        let chunk = chunking_result.unwrap();
+        // TODO: convert to error::UnableToRead
+
+        let chunk_len = chunk.data.len() as u32;
+
+        // update calculate blob hash
+        update_hasher(&mut blob_hasher, &chunk.data);
+
+        let chunk_digest = upload_chunk(chunk_service, chunk.data)?;
+
+        blob_meta.chunks.push(proto::blob_meta::ChunkMeta {
+            digest: chunk_digest,
+            size: chunk_len,
+        });
+    }
+    Ok((blob_hasher.finalize().as_bytes().to_vec(), blob_meta))
+}
+
 /// updates a given hasher with more data. Uses rayon if the data is
 /// sufficiently big.
 ///