about summary refs log tree commit diff
path: root/tvix
diff options
context:
space:
mode:
Diffstat (limited to 'tvix')
-rw-r--r--tvix/store/src/chunkservice/mod.rs1
-rw-r--r--tvix/store/src/chunkservice/util.rs43
-rw-r--r--tvix/store/src/import.rs45
3 files changed, 46 insertions, 43 deletions
diff --git a/tvix/store/src/chunkservice/mod.rs b/tvix/store/src/chunkservice/mod.rs
index 60bef3765d1b..50365caccf4a 100644
--- a/tvix/store/src/chunkservice/mod.rs
+++ b/tvix/store/src/chunkservice/mod.rs
@@ -7,6 +7,7 @@ use crate::Error;
 
 pub use self::memory::MemoryChunkService;
 pub use self::sled::SledChunkService;
+pub use self::util::read_all_and_chunk;
 pub use self::util::update_hasher;
 pub use self::util::upload_chunk;
 
diff --git a/tvix/store/src/chunkservice/util.rs b/tvix/store/src/chunkservice/util.rs
index 2897d4e58e94..761a58a646a3 100644
--- a/tvix/store/src/chunkservice/util.rs
+++ b/tvix/store/src/chunkservice/util.rs
@@ -1,7 +1,7 @@
+use crate::{proto, Error};
+use std::io::Read;
 use tracing::{debug, instrument};
 
-use crate::Error;
-
 use super::ChunkService;
 
 /// uploads a chunk to a chunk service, and returns its digest (or an error) when done.
@@ -24,6 +24,45 @@ pub fn upload_chunk<CS: ChunkService>(
     Ok(digest.as_bytes().to_vec())
 }
 
+/// reads through a reader, writes chunks to a [ChunkService] and returns a
+/// [proto::BlobMeta] pointing to all the chunks.
+#[instrument(skip_all, err)]
+pub fn read_all_and_chunk<CS: ChunkService, R: Read>(
+    chunk_service: &CS,
+    r: R,
+) -> Result<(Vec<u8>, proto::BlobMeta), Error> {
+    let mut blob_meta = proto::BlobMeta::default();
+
+    // hash the file contents, upload chunks if not there yet
+    let mut blob_hasher = blake3::Hasher::new();
+
+    // TODO: play with chunking sizes
+    let chunker_avg_size = 64 * 1024;
+    let chunker_min_size = chunker_avg_size / 4;
+    let chunker_max_size = chunker_avg_size * 4;
+
+    let chunker =
+        fastcdc::v2020::StreamCDC::new(r, chunker_min_size, chunker_avg_size, chunker_max_size);
+
+    for chunking_result in chunker {
+        let chunk = chunking_result.unwrap();
+        // TODO: convert to error::UnableToRead
+
+        let chunk_len = chunk.data.len() as u32;
+
+        // update calculate blob hash
+        update_hasher(&mut blob_hasher, &chunk.data);
+
+        let chunk_digest = upload_chunk(chunk_service, chunk.data)?;
+
+        blob_meta.chunks.push(proto::blob_meta::ChunkMeta {
+            digest: chunk_digest,
+            size: chunk_len,
+        });
+    }
+    Ok((blob_hasher.finalize().as_bytes().to_vec(), blob_meta))
+}
+
 /// updates a given hasher with more data. Uses rayon if the data is
 /// sufficiently big.
 ///
diff --git a/tvix/store/src/import.rs b/tvix/store/src/import.rs
index d2974d93c030..1e12bd36e460 100644
--- a/tvix/store/src/import.rs
+++ b/tvix/store/src/import.rs
@@ -1,7 +1,4 @@
-use crate::{
-    chunkservice::{update_hasher, upload_chunk},
-    proto,
-};
+use crate::{chunkservice::read_all_and_chunk, proto};
 use std::{
     collections::HashMap,
     fmt::Debug,
@@ -115,44 +112,10 @@ fn process_entry<BS: BlobService, CS: ChunkService + std::marker::Sync, DS: Dire
             .metadata()
             .map_err(|e| Error::UnableToStat(entry_path.clone(), e.into()))?;
 
-        // hash the file contents, upload chunks if not there yet
-        let (blob_digest, blob_meta) = {
-            let file = File::open(entry_path.clone())
-                .map_err(|e| Error::UnableToOpen(entry_path.clone(), e))?;
-
-            let mut blob_meta = proto::BlobMeta::default();
-            let mut blob_hasher = blake3::Hasher::new();
-
-            // TODO: play with chunking sizes
-            let chunker_avg_size = 64 * 1024;
-            let chunker_min_size = chunker_avg_size / 4;
-            let chunker_max_size = chunker_avg_size * 4;
-
-            let chunker = fastcdc::v2020::StreamCDC::new(
-                file,
-                chunker_min_size,
-                chunker_avg_size,
-                chunker_max_size,
-            );
-
-            for chunking_result in chunker {
-                let chunk = chunking_result.unwrap();
-                // TODO: convert to error::UnableToRead
+        let file = File::open(entry_path.clone())
+            .map_err(|e| Error::UnableToOpen(entry_path.clone(), e))?;
 
-                let chunk_len = chunk.data.len() as u32;
-
-                // update calculate blob hash
-                update_hasher(&mut blob_hasher, &chunk.data);
-
-                let chunk_digest = upload_chunk(chunk_service, chunk.data)?;
-
-                blob_meta.chunks.push(proto::blob_meta::ChunkMeta {
-                    digest: chunk_digest,
-                    size: chunk_len,
-                });
-            }
-            (blob_hasher.finalize().as_bytes().to_vec(), blob_meta)
-        };
+        let (blob_digest, blob_meta) = read_all_and_chunk(chunk_service, file)?;
 
         // upload blobmeta if not there yet
         if blob_service