about summary refs log tree commit diff
path: root/tvix/store/src/chunkservice/util.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/store/src/chunkservice/util.rs')
-rw-r--r--tvix/store/src/chunkservice/util.rs43
1 files changed, 41 insertions, 2 deletions
diff --git a/tvix/store/src/chunkservice/util.rs b/tvix/store/src/chunkservice/util.rs
index 2897d4e58e94..761a58a646a3 100644
--- a/tvix/store/src/chunkservice/util.rs
+++ b/tvix/store/src/chunkservice/util.rs
@@ -1,7 +1,7 @@
+use crate::{proto, Error};
+use std::io::Read;
 use tracing::{debug, instrument};
 
-use crate::Error;
-
 use super::ChunkService;
 
 /// uploads a chunk to a chunk service, and returns its digest (or an error) when done.
@@ -24,6 +24,45 @@ pub fn upload_chunk<CS: ChunkService>(
     Ok(digest.as_bytes().to_vec())
 }
 
+/// Reads `r` to the end, splits it into content-defined chunks, uploads each
+/// chunk to `chunk_service`, and returns the blake3 digest of the whole blob
+/// together with a [proto::BlobMeta] listing every chunk's digest and size.
+///
+/// # Errors
+///
+/// Returns an [Error] if reading from `r` fails or if a chunk upload fails.
+#[instrument(skip_all, err)]
+pub fn read_all_and_chunk<CS: ChunkService, R: Read>(
+    chunk_service: &CS,
+    r: R,
+) -> Result<(Vec<u8>, proto::BlobMeta), Error> {
+    let mut blob_meta = proto::BlobMeta::default();
+    let mut blob_hasher = blake3::Hasher::new();
+
+    // TODO: play with chunking sizes
+    let chunker_avg_size = 64 * 1024;
+    let chunker_min_size = chunker_avg_size / 4;
+    let chunker_max_size = chunker_avg_size * 4;
+
+    let chunker =
+        fastcdc::v2020::StreamCDC::new(r, chunker_min_size, chunker_avg_size, chunker_max_size);
+
+    for chunking_result in chunker {
+        // a failing reader is reported to the caller instead of panicking
+        let chunk = chunking_result
+            .map_err(|e| Error::StorageError(format!("unable to read: {}", e)))?;
+        // fits in u32: chunks are at most chunker_max_size (256 KiB) bytes
+        let chunk_len = chunk.data.len() as u32;
+        update_hasher(&mut blob_hasher, &chunk.data);
+        let chunk_digest = upload_chunk(chunk_service, chunk.data)?;
+        blob_meta.chunks.push(proto::blob_meta::ChunkMeta {
+            digest: chunk_digest,
+            size: chunk_len,
+        });
+    }
+    Ok((blob_hasher.finalize().as_bytes().to_vec(), blob_meta))
+}
+
 /// updates a given hasher with more data. Uses rayon if the data is
 /// sufficiently big.
 ///