about summary refs log tree commit diff
path: root/tvix
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2023-03-01T17·30+0100
committerflokli <flokli@flokli.de>2023-03-10T11·53+0000
commit510927e43a950e727c31e100f1e88f0d8a80b6b9 (patch)
tree918fcd3fcee62b0bafc9637c8116faa8680eb935 /tvix
parent2ef60282b61a61496e642021f0ab8eab7569bbaa (diff)
feat(tvix/store): use rayon to upload chunks concurrently r/5939
Look at the data that's written to us, and upload all chunks but the
rest in parallel, using rayon. This required moving `upload_chunk`
outside the struct, and accepting a ChunkService to use for upload
(which it was previously getting from `self.chunk_service`).

This doesn't speed up things too much for now, because things are still
mostly linear.

Change-Id: Id785b5705c3392214d2da1a5b6a182bcf5048c8d
Reviewed-on: https://cl.tvl.fyi/c/depot/+/8195
Autosubmit: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Reviewed-by: raitobezarius <tvl@lahfa.xyz>
Diffstat (limited to 'tvix')
-rw-r--r--tvix/Cargo.lock1
-rw-r--r--tvix/Cargo.nix4
-rw-r--r--tvix/store/Cargo.toml3
-rw-r--r--tvix/store/src/blobwriter.rs93
-rw-r--r--tvix/store/src/import.rs4
5 files changed, 66 insertions, 39 deletions
diff --git a/tvix/Cargo.lock b/tvix/Cargo.lock
index cba7166ce878..de4d832b8954 100644
--- a/tvix/Cargo.lock
+++ b/tvix/Cargo.lock
@@ -2796,6 +2796,7 @@ dependencies = [
  "nix-compat",
  "prost",
  "prost-build",
+ "rayon",
  "sha2 0.10.6",
  "sled",
  "tempfile",
diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix
index 20751f596a3f..7167a3cc1370 100644
--- a/tvix/Cargo.nix
+++ b/tvix/Cargo.nix
@@ -8331,6 +8331,10 @@ rec {
             packageId = "prost";
           }
           {
+            name = "rayon";
+            packageId = "rayon";
+          }
+          {
             name = "sha2";
             packageId = "sha2 0.10.6";
           }
diff --git a/tvix/store/Cargo.toml b/tvix/store/Cargo.toml
index 09a62bc79fba..65d3420ab7d3 100644
--- a/tvix/store/Cargo.toml
+++ b/tvix/store/Cargo.toml
@@ -14,10 +14,11 @@ count-write = "0.1.0"
 data-encoding = "2.3.3"
 fastcdc = "3.0.0"
 lazy_static = "1.4.0"
+nix-compat = { path = "../nix-compat" }
 prost = "0.11.2"
+rayon = "1.6.1"
 sha2 = "0.10.6"
 sled = { version = "0.34.7", features = ["compression"] }
-nix-compat = { path = "../nix-compat" }
 thiserror = "1.0.38"
 tokio-stream = "0.1.11"
 tokio = { version = "1.23.0", features = ["rt-multi-thread"] }
diff --git a/tvix/store/src/blobwriter.rs b/tvix/store/src/blobwriter.rs
index 8cb09ecc6d3c..9b4dcb8f21ef 100644
--- a/tvix/store/src/blobwriter.rs
+++ b/tvix/store/src/blobwriter.rs
@@ -1,5 +1,6 @@
 use crate::chunkservice::ChunkService;
 use crate::{proto, Error};
+use rayon::prelude::*;
 use tracing::{debug, instrument};
 
 pub struct BlobWriter<'a, CS: ChunkService> {
@@ -13,6 +14,31 @@ pub struct BlobWriter<'a, CS: ChunkService> {
     buf: Vec<u8>,
 }
 
+// upload a chunk to the chunk service, and return its digest (or an error) when done.
+#[instrument(skip_all)]
+fn upload_chunk<CS: ChunkService>(
+    chunk_service: &CS,
+    chunk_data: Vec<u8>,
+) -> Result<Vec<u8>, Error> {
+    let mut hasher = blake3::Hasher::new();
+    // TODO: benchmark this number and factor it out
+    if chunk_data.len() >= 128 * 1024 {
+        hasher.update_rayon(&chunk_data);
+    } else {
+        hasher.update(&chunk_data);
+    }
+    let digest = hasher.finalize();
+
+    if chunk_service.has(digest.as_bytes())? {
+        debug!("already has chunk, skipping");
+    }
+    let digest_resp = chunk_service.put(chunk_data)?;
+
+    assert_eq!(digest_resp, digest.as_bytes());
+
+    Ok(digest.as_bytes().to_vec())
+}
+
 impl<'a, CS: ChunkService> BlobWriter<'a, CS> {
     pub fn new(chunk_service: &'a CS) -> Self {
         Self {
@@ -35,7 +61,7 @@ impl<'a, CS: ChunkService> BlobWriter<'a, CS> {
             // Also upload the last chunk (what's left in `self.buf`) to the chunk
             // service and record it in BlobMeta.
             let buf_len = self.buf.len() as u32;
-            let chunk_digest = self.upload_chunk(self.buf.clone())?;
+            let chunk_digest = upload_chunk(self.chunk_service, self.buf.clone())?;
 
             self.blob_meta.chunks.push(proto::blob_meta::ChunkMeta {
                 digest: chunk_digest,
@@ -49,32 +75,11 @@ impl<'a, CS: ChunkService> BlobWriter<'a, CS> {
             self.blob_meta.clone(),
         ));
     }
-
-    // upload a chunk to the chunk service, and return its digest (or an error) when done.
-    #[instrument(skip(self, chunk_data))]
-    fn upload_chunk(&mut self, chunk_data: Vec<u8>) -> Result<Vec<u8>, Error> {
-        let mut hasher = blake3::Hasher::new();
-        if chunk_data.len() >= 128 * 1024 {
-            hasher.update_rayon(&chunk_data);
-        } else {
-            hasher.update(&chunk_data);
-        }
-        let digest = hasher.finalize();
-
-        if self.chunk_service.has(digest.as_bytes())? {
-            debug!("already has chunk, skipping");
-        }
-        let digest_resp = self.chunk_service.put(chunk_data)?;
-
-        assert_eq!(digest_resp, digest.as_bytes());
-
-        Ok(digest.as_bytes().to_vec())
-    }
 }
 
 /// This chunks up all data written using fastcdc, uploads all chunks to the
 // [ChunkService], and fills a [proto::BlobMeta] linking to these chunks.
-impl<CS: ChunkService> std::io::Write for BlobWriter<'_, CS> {
+impl<CS: ChunkService + std::marker::Sync> std::io::Write for BlobWriter<'_, CS> {
     fn write(&mut self, input_buf: &[u8]) -> std::io::Result<usize> {
         // calculate input_buf.len(), we need to return that later.
         let input_buf_len = input_buf.len();
@@ -107,6 +112,9 @@ impl<CS: ChunkService> std::io::Write for BlobWriter<'_, CS> {
             chunker_max_size,
         );
 
+        // assemble a list of byte slices to be uploaded
+        let mut chunk_slices: Vec<&[u8]> = Vec::new();
+
         // ask the chunker for cutting points in the buffer.
         let mut start_pos = 0_usize;
         let rest = loop {
@@ -114,31 +122,44 @@ impl<CS: ChunkService> std::io::Write for BlobWriter<'_, CS> {
             let (_fp, end_pos) = chunker.cut(start_pos, buf.len() - start_pos);
 
             // whenever the last cut point is pointing to the end of the buffer,
-            // keep that chunk left in there.
+            // return that from the loop.
             // We don't know if the chunker decided to cut here simply because it was
             // at the end of the buffer, or if it would also cut if there
             // were more data.
             //
             // Split off all previous chunks and keep this chunk data in the buffer.
             if end_pos == buf.len() {
-                break buf[start_pos..].to_vec();
+                break &buf[start_pos..];
             }
 
-            // Upload that chunk to the chunk service and record it in BlobMeta.
-            // TODO: make upload_chunk async and upload concurrently?
-            let chunk_data = &buf[start_pos..end_pos];
-            let chunk_digest = self.upload_chunk(chunk_data.to_vec())?;
-
-            self.blob_meta.chunks.push(proto::blob_meta::ChunkMeta {
-                digest: chunk_digest,
-                size: chunk_data.len() as u32,
-            });
+            // if it's an intermediate chunk, add it to chunk_slices.
+            // We'll later upload all of them in batch.
+            chunk_slices.push(&buf[start_pos..end_pos]);
 
-            // move start_pos over the processed chunk.
+            // advance start_pos over the processed chunk.
             start_pos = end_pos;
         };
 
-        self.buf = rest;
+        // Upload all chunks to the chunk service and map them to a ChunkMeta
+        let blob_meta_chunks: Vec<Result<proto::blob_meta::ChunkMeta, Error>> = chunk_slices
+            .into_par_iter()
+            .map(|chunk_slice| {
+                let chunk_service = self.chunk_service.clone();
+                let chunk_digest = upload_chunk(chunk_service.clone(), chunk_slice.to_vec())?;
+
+                Ok(proto::blob_meta::ChunkMeta {
+                    digest: chunk_digest,
+                    size: chunk_slice.len() as u32,
+                })
+            })
+            .collect();
+
+        self.blob_meta.chunks = blob_meta_chunks
+            .into_iter()
+            .collect::<Result<Vec<proto::blob_meta::ChunkMeta>, Error>>()?;
+
+        // update buf to point to the rest we didn't upload.
+        self.buf = rest.to_vec();
 
         Ok(input_buf_len)
     }
diff --git a/tvix/store/src/import.rs b/tvix/store/src/import.rs
index e71798a6eb4c..5449634cc9f5 100644
--- a/tvix/store/src/import.rs
+++ b/tvix/store/src/import.rs
@@ -58,7 +58,7 @@ impl From<super::Error> for Error {
 //
 // It assumes the caller adds returned nodes to the directories it assembles.
 #[instrument(skip_all, fields(entry.file_type=?&entry.file_type(),entry.path=?entry.path()))]
-fn process_entry<BS: BlobService, CS: ChunkService, DS: DirectoryService>(
+fn process_entry<BS: BlobService, CS: ChunkService + std::marker::Sync, DS: DirectoryService>(
     blob_service: &mut BS,
     chunk_service: &mut CS,
     directory_service: &mut DS,
@@ -167,7 +167,7 @@ fn process_entry<BS: BlobService, CS: ChunkService, DS: DirectoryService>(
 #[instrument(skip(blob_service, chunk_service, directory_service), fields(path=?p))]
 pub fn import_path<
     BS: BlobService,
-    CS: ChunkService,
+    CS: ChunkService + std::marker::Sync,
     DS: DirectoryService,
     P: AsRef<Path> + Debug,
 >(