about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--tvix/store/src/import.rs44
1 files changed, 35 insertions, 9 deletions
diff --git a/tvix/store/src/import.rs b/tvix/store/src/import.rs
index 5449634cc9f5..bd911cc2e81d 100644
--- a/tvix/store/src/import.rs
+++ b/tvix/store/src/import.rs
@@ -1,10 +1,9 @@
-use crate::{proto, BlobWriter};
+use crate::{chunkservice::upload_chunk, proto};
 use std::{
     collections::HashMap,
     fmt::Debug,
     fs,
     fs::File,
-    io::BufReader,
     os::unix::prelude::PermissionsExt,
     path::{Path, PathBuf},
 };
@@ -115,18 +114,45 @@ fn process_entry<BS: BlobService, CS: ChunkService + std::marker::Sync, DS: Dire
 
         // hash the file contents, upload chunks if not there yet
         let (blob_digest, blob_meta) = {
-            let mut blob_writer = BlobWriter::new(chunk_service);
-
             let file = File::open(entry_path.clone())
                 .map_err(|e| Error::UnableToOpen(entry_path.clone(), e))?;
 
-            let mut file_reader = BufReader::new(file);
+            let mut blob_meta = proto::BlobMeta::default();
+            let mut blob_hasher = blake3::Hasher::new();
+
+            // TODO: play with chunking sizes
+            let chunker_avg_size = 64 * 1024;
+            let chunker_min_size = chunker_avg_size / 4;
+            let chunker_max_size = chunker_avg_size * 4;
+
+            let chunker = fastcdc::v2020::StreamCDC::new(
+                Box::new(file),
+                chunker_min_size,
+                chunker_avg_size,
+                chunker_max_size,
+            );
+
+            for chunking_result in chunker {
+                let chunk = chunking_result.unwrap();
+                // TODO: convert to error::UnableToRead
 
-            std::io::copy(&mut file_reader, &mut blob_writer)
-                .map_err(|e| Error::UnableToRead(entry_path, e))?;
+                let chunk_len = chunk.data.len() as u32;
 
-            // TODO: handle errors
-            blob_writer.finalize().unwrap()
+                // update calculate blob hash, and use rayon if data is > 128KiB.
+                if chunk_len > 128 * 1024 {
+                    blob_hasher.update_rayon(&chunk.data);
+                } else {
+                    blob_hasher.update(&chunk.data);
+                }
+
+                let chunk_digest = upload_chunk(chunk_service, chunk.data)?;
+
+                blob_meta.chunks.push(proto::blob_meta::ChunkMeta {
+                    digest: chunk_digest,
+                    size: chunk_len,
+                });
+            }
+            (blob_hasher.finalize().as_bytes().to_vec(), blob_meta)
         };
 
         // upload blobmeta if not there yet