about summary refs log tree commit diff
diff options
context:
space:
mode:
authorConnor Brewster <cbrewster@hey.com>2024-05-18T18·25-0500
committerConnor Brewster <cbrewster@hey.com>2024-05-20T15·21+0000
commitb0aaff25fa68a7b55b870b59024ca2b40c658f33 (patch)
treec7cf430a1c59cd1b66ec6ac2608930cde19d86bf
parentbc42c355cf8d83120b16214f3a1a17f67851d157 (diff)
refactor(tvix/castore): extract concurrent blob uploader r/8158
The archive ingester has a mechanism for concurrently uploading small
blobs to the blob service in order to hide round trip latency with the
blob service when ingesting many small blobs.

Other ingestion sources like NARs also need a similar mechanism, this
extracts the concurrent blob uploading mechanism into its own struct to
make it more reusable.

Change-Id: I05020419ff4b9ad5829fbfb5cd08d36db983b8c0
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11693
Tested-by: BuildkiteCI
Reviewed-by: flokli <flokli@flokli.de>
-rw-r--r--tvix/castore/src/import/archive.rs109
-rw-r--r--tvix/castore/src/import/blobs.rs177
-rw-r--r--tvix/castore/src/import/mod.rs1
3 files changed, 190 insertions, 97 deletions
diff --git a/tvix/castore/src/import/archive.rs b/tvix/castore/src/import/archive.rs
index 0ebb4a236117..cd5b1290e031 100644
--- a/tvix/castore/src/import/archive.rs
+++ b/tvix/castore/src/import/archive.rs
@@ -1,38 +1,23 @@
 //! Imports from an archive (tarballs)
 
 use std::collections::HashMap;
-use std::io::{Cursor, Write};
-use std::sync::Arc;
 
 use petgraph::graph::{DiGraph, NodeIndex};
 use petgraph::visit::{DfsPostOrder, EdgeRef};
 use petgraph::Direction;
 use tokio::io::AsyncRead;
-use tokio::sync::Semaphore;
-use tokio::task::JoinSet;
 use tokio_stream::StreamExt;
 use tokio_tar::Archive;
-use tokio_util::io::InspectReader;
 use tracing::{instrument, warn, Level};
 
 use crate::blobservice::BlobService;
 use crate::directoryservice::DirectoryService;
 use crate::import::{ingest_entries, IngestionEntry, IngestionError};
 use crate::proto::node::Node;
-use crate::B3Digest;
 
-type TarPathBuf = std::path::PathBuf;
-
-/// Files smaller than this threshold, in bytes, are uploaded to the [BlobService] in the
-/// background.
-///
-/// This is a u32 since we acquire a weighted semaphore using the size of the blob.
-/// [Semaphore::acquire_many_owned] takes a u32, so we need to ensure the size of
-/// the blob can be represented using a u32 and will not cause an overflow.
-const CONCURRENT_BLOB_UPLOAD_THRESHOLD: u32 = 1024 * 1024;
+use super::blobs::{self, ConcurrentBlobUploader};
 
-/// The maximum amount of bytes allowed to be buffered in memory to perform async blob uploads.
-const MAX_TARBALL_BUFFER_SIZE: usize = 128 * 1024 * 1024;
+type TarPathBuf = std::path::PathBuf;
 
 #[derive(Debug, thiserror::Error)]
 pub enum Error {
@@ -57,13 +42,6 @@ pub enum Error {
     #[error("unable to read link name field for {0}: {1}")]
     LinkName(TarPathBuf, std::io::Error),
 
-    #[error("unable to read blob contents for {0}: {1}")]
-    BlobRead(TarPathBuf, std::io::Error),
-
-    // FUTUREWORK: proper error for blob finalize
-    #[error("unable to finalize blob {0}: {1}")]
-    BlobFinalize(TarPathBuf, std::io::Error),
-
     #[error("unsupported tar entry {0} type: {1:?}")]
     EntryType(TarPathBuf, tokio_tar::EntryType),
 
@@ -72,6 +50,9 @@ pub enum Error {
 
     #[error("unexpected number of top level directory entries")]
     UnexpectedNumberOfTopLevelEntries,
+
+    #[error(transparent)]
+    BlobUploadError(#[from] blobs::Error),
 }
 
 /// Ingests elements from the given tar [`Archive`] into a the passed [`BlobService`] and
@@ -94,8 +75,7 @@ where
     // In the first phase, collect up all the regular files and symlinks.
     let mut nodes = IngestionEntryGraph::new();
 
-    let semaphore = Arc::new(Semaphore::new(MAX_TARBALL_BUFFER_SIZE));
-    let mut async_blob_uploads: JoinSet<Result<(), Error>> = JoinSet::new();
+    let mut blob_uploader = ConcurrentBlobUploader::new(blob_service);
 
     let mut entries_iter = archive.entries().map_err(Error::Entries)?;
     while let Some(mut entry) = entries_iter.try_next().await.map_err(Error::NextEntry)? {
@@ -110,77 +90,14 @@ where
             tokio_tar::EntryType::Regular
             | tokio_tar::EntryType::GNUSparse
             | tokio_tar::EntryType::Continuous => {
-                let header_size = header
+                let size = header
                     .size()
                     .map_err(|e| Error::Size(tar_path.clone(), e))?;
 
-                // If the blob is small enough, read it off the wire, compute the digest,
-                // and upload it to the [BlobService] in the background.
-                let (size, digest) = if header_size <= CONCURRENT_BLOB_UPLOAD_THRESHOLD as u64 {
-                    let mut buffer = Vec::with_capacity(header_size as usize);
-                    let mut hasher = blake3::Hasher::new();
-                    let mut reader = InspectReader::new(&mut entry, |bytes| {
-                        hasher.write_all(bytes).unwrap();
-                    });
-
-                    // Ensure that we don't buffer into memory until we've acquired a permit.
-                    // This prevents consuming too much memory when performing concurrent
-                    // blob uploads.
-                    let permit = semaphore
-                        .clone()
-                        // This cast is safe because ensure the header_size is less than
-                        // CONCURRENT_BLOB_UPLOAD_THRESHOLD which is a u32.
-                        .acquire_many_owned(header_size as u32)
-                        .await
-                        .unwrap();
-                    let size = tokio::io::copy(&mut reader, &mut buffer)
-                        .await
-                        .map_err(|e| Error::Size(tar_path.clone(), e))?;
-
-                    let digest: B3Digest = hasher.finalize().as_bytes().into();
-
-                    {
-                        let blob_service = blob_service.clone();
-                        let digest = digest.clone();
-                        async_blob_uploads.spawn({
-                            let tar_path = tar_path.clone();
-                            async move {
-                                let mut writer = blob_service.open_write().await;
-
-                                tokio::io::copy(&mut Cursor::new(buffer), &mut writer)
-                                    .await
-                                    .map_err(|e| Error::BlobRead(tar_path.clone(), e))?;
-
-                                let blob_digest = writer
-                                    .close()
-                                    .await
-                                    .map_err(|e| Error::BlobFinalize(tar_path, e))?;
-
-                                assert_eq!(digest, blob_digest, "Tvix bug: blob digest mismatch");
-
-                                // Make sure we hold the permit until we finish writing the blob
-                                // to the [BlobService].
-                                drop(permit);
-                                Ok(())
-                            }
-                        });
-                    }
-
-                    (size, digest)
-                } else {
-                    let mut writer = blob_service.open_write().await;
-
-                    let size = tokio::io::copy(&mut entry, &mut writer)
-                        .await
-                        .map_err(|e| Error::BlobRead(tar_path.clone(), e))?;
-
-                    let digest = writer
-                        .close()
-                        .await
-                        .map_err(|e| Error::BlobFinalize(tar_path.clone(), e))?;
-
-                    (size, digest)
-                };
+                let digest = blob_uploader
+                    .upload(&path, size, &mut entry)
+                    .await
+                    .map_err(Error::BlobUploadError)?;
 
                 let executable = entry
                     .header()
@@ -219,9 +136,7 @@ where
         nodes.add(entry)?;
     }
 
-    while let Some(result) = async_blob_uploads.join_next().await {
-        result.expect("task panicked")?;
-    }
+    blob_uploader.join().await.map_err(Error::BlobUploadError)?;
 
     let root_node = ingest_entries(
         directory_service,
diff --git a/tvix/castore/src/import/blobs.rs b/tvix/castore/src/import/blobs.rs
new file mode 100644
index 000000000000..8135d871d6c0
--- /dev/null
+++ b/tvix/castore/src/import/blobs.rs
@@ -0,0 +1,177 @@
+use std::{
+    io::{Cursor, Write},
+    sync::Arc,
+};
+
+use tokio::{
+    io::AsyncRead,
+    sync::Semaphore,
+    task::{JoinError, JoinSet},
+};
+use tokio_util::io::InspectReader;
+
+use crate::{blobservice::BlobService, B3Digest, Path, PathBuf};
+
+/// Files smaller than this threshold, in bytes, are uploaded to the [BlobService] in the
+/// background.
+///
+/// This is a u32 since we acquire a weighted semaphore using the size of the blob.
+/// [Semaphore::acquire_many_owned] takes a u32, so we need to ensure the size of
+/// the blob can be represented using a u32 and will not cause an overflow.
+const CONCURRENT_BLOB_UPLOAD_THRESHOLD: u32 = 1024 * 1024;
+
+/// The maximum amount of bytes allowed to be buffered in memory to perform async blob uploads.
+const MAX_BUFFER_SIZE: usize = 128 * 1024 * 1024;
+
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+    #[error("unable to read blob contents for {0}: {1}")]
+    BlobRead(PathBuf, std::io::Error),
+
+    // FUTUREWORK: proper error for blob finalize
+    #[error("unable to finalize blob {0}: {1}")]
+    BlobFinalize(PathBuf, std::io::Error),
+
+    #[error("unexpected size for {path} wanted: {wanted} got: {got}")]
+    UnexpectedSize {
+        path: PathBuf,
+        wanted: u64,
+        got: u64,
+    },
+
+    #[error("blob upload join error: {0}")]
+    JoinError(#[from] JoinError),
+}
+
+/// The concurrent blob uploader provides a mechanism for concurrently uploading small blobs.
+/// This is useful when ingesting from sources like tarballs and archives which each blob entry
+/// must be read sequentially. Ingesting many small blobs sequentially becomes slow due to
+/// round trip time with the blob service. The concurrent blob uploader will buffer small
+/// blobs in memory and upload them to the blob service in the background.
+///
+/// Once all blobs have been uploaded, make sure to call [ConcurrentBlobUploader::join] to wait
+/// for all background jobs to complete and check for any errors.
+pub struct ConcurrentBlobUploader<BS> {
+    blob_service: BS,
+    upload_tasks: JoinSet<Result<(), Error>>,
+    upload_semaphore: Arc<Semaphore>,
+}
+
+impl<BS> ConcurrentBlobUploader<BS>
+where
+    BS: BlobService + Clone + 'static,
+{
+    /// Creates a new concurrent blob uploader which uploads blobs to the provided
+    /// blob service.
+    pub fn new(blob_service: BS) -> Self {
+        Self {
+            blob_service,
+            upload_tasks: JoinSet::new(),
+            upload_semaphore: Arc::new(Semaphore::new(MAX_BUFFER_SIZE)),
+        }
+    }
+
+    /// Uploads a blob to the blob service. If the blob is small enough it will be read to a buffer
+    /// and uploaded in the background.
+    /// This will read the entirety of the provided reader unless an error occurs, even if blobs
+    /// are uploaded in the background..
+    pub async fn upload<R>(
+        &mut self,
+        path: &Path,
+        expected_size: u64,
+        mut r: R,
+    ) -> Result<B3Digest, Error>
+    where
+        R: AsyncRead + Unpin,
+    {
+        if expected_size < CONCURRENT_BLOB_UPLOAD_THRESHOLD as u64 {
+            let mut buffer = Vec::with_capacity(expected_size as usize);
+            let mut hasher = blake3::Hasher::new();
+            let mut reader = InspectReader::new(&mut r, |bytes| {
+                hasher.write_all(bytes).unwrap();
+            });
+
+            let permit = self
+                .upload_semaphore
+                .clone()
+                // This cast is safe because ensure the header_size is less than
+                // CONCURRENT_BLOB_UPLOAD_THRESHOLD which is a u32.
+                .acquire_many_owned(expected_size as u32)
+                .await
+                .unwrap();
+            let size = tokio::io::copy(&mut reader, &mut buffer)
+                .await
+                .map_err(|e| Error::BlobRead(path.into(), e))?;
+            let digest: B3Digest = hasher.finalize().as_bytes().into();
+
+            if size != expected_size {
+                return Err(Error::UnexpectedSize {
+                    path: path.into(),
+                    wanted: expected_size,
+                    got: size,
+                });
+            }
+
+            self.upload_tasks.spawn({
+                let blob_service = self.blob_service.clone();
+                let expected_digest = digest.clone();
+                let path = path.to_owned();
+                let r = Cursor::new(buffer);
+                async move {
+                    let digest = upload_blob(&blob_service, &path, expected_size, r).await?;
+
+                    assert_eq!(digest, expected_digest, "Tvix bug: blob digest mismatch");
+
+                    // Make sure we hold the permit until we finish writing the blob
+                    // to the [BlobService].
+                    drop(permit);
+                    Ok(())
+                }
+            });
+
+            return Ok(digest);
+        }
+
+        upload_blob(&self.blob_service, path, expected_size, r).await
+    }
+
+    /// Waits for all background upload jobs to complete, returning any upload errors.
+    pub async fn join(mut self) -> Result<(), Error> {
+        while let Some(result) = self.upload_tasks.join_next().await {
+            result??;
+        }
+        Ok(())
+    }
+}
+
+async fn upload_blob<BS, R>(
+    blob_service: &BS,
+    path: &Path,
+    expected_size: u64,
+    mut r: R,
+) -> Result<B3Digest, Error>
+where
+    BS: BlobService,
+    R: AsyncRead + Unpin,
+{
+    let mut writer = blob_service.open_write().await;
+
+    let size = tokio::io::copy(&mut r, &mut writer)
+        .await
+        .map_err(|e| Error::BlobRead(path.into(), e))?;
+
+    let digest = writer
+        .close()
+        .await
+        .map_err(|e| Error::BlobFinalize(path.into(), e))?;
+
+    if size != expected_size {
+        return Err(Error::UnexpectedSize {
+            path: path.into(),
+            wanted: expected_size,
+            got: size,
+        });
+    }
+
+    Ok(digest)
+}
diff --git a/tvix/castore/src/import/mod.rs b/tvix/castore/src/import/mod.rs
index e8b27e469c3d..4223fe538756 100644
--- a/tvix/castore/src/import/mod.rs
+++ b/tvix/castore/src/import/mod.rs
@@ -24,6 +24,7 @@ mod error;
 pub use error::IngestionError;
 
 pub mod archive;
+pub mod blobs;
 pub mod fs;
 
 /// Ingests [IngestionEntry] from the given stream into a the passed [DirectoryService].