about summary refs log tree commit diff
path: root/tvix/castore/src/import/blobs.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/castore/src/import/blobs.rs')
-rw-r--r--tvix/castore/src/import/blobs.rs177
1 files changed, 177 insertions, 0 deletions
diff --git a/tvix/castore/src/import/blobs.rs b/tvix/castore/src/import/blobs.rs
new file mode 100644
index 0000000000..8135d871d6
--- /dev/null
+++ b/tvix/castore/src/import/blobs.rs
@@ -0,0 +1,177 @@
+use std::{
+    io::{Cursor, Write},
+    sync::Arc,
+};
+
+use tokio::{
+    io::AsyncRead,
+    sync::Semaphore,
+    task::{JoinError, JoinSet},
+};
+use tokio_util::io::InspectReader;
+
+use crate::{blobservice::BlobService, B3Digest, Path, PathBuf};
+
+/// Files smaller than this threshold, in bytes, are uploaded to the [BlobService] in the
+/// background.
+///
+/// This is a u32 since we acquire a weighted semaphore using the size of the blob.
+/// [Semaphore::acquire_many_owned] takes a u32, so we need to ensure the size of
+/// the blob can be represented using a u32 and will not cause an overflow.
+const CONCURRENT_BLOB_UPLOAD_THRESHOLD: u32 = 1024 * 1024;
+
+/// The maximum amount of bytes allowed to be buffered in memory to perform async blob uploads.
+const MAX_BUFFER_SIZE: usize = 128 * 1024 * 1024;
+
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+    #[error("unable to read blob contents for {0}: {1}")]
+    BlobRead(PathBuf, std::io::Error),
+
+    // FUTUREWORK: proper error for blob finalize
+    #[error("unable to finalize blob {0}: {1}")]
+    BlobFinalize(PathBuf, std::io::Error),
+
+    #[error("unexpected size for {path} wanted: {wanted} got: {got}")]
+    UnexpectedSize {
+        path: PathBuf,
+        wanted: u64,
+        got: u64,
+    },
+
+    #[error("blob upload join error: {0}")]
+    JoinError(#[from] JoinError),
+}
+
+/// The concurrent blob uploader provides a mechanism for concurrently uploading small blobs.
+/// This is useful when ingesting from sources like tarballs and archives which each blob entry
+/// must be read sequentially. Ingesting many small blobs sequentially becomes slow due to
+/// round trip time with the blob service. The concurrent blob uploader will buffer small
+/// blobs in memory and upload them to the blob service in the background.
+///
+/// Once all blobs have been uploaded, make sure to call [ConcurrentBlobUploader::join] to wait
+/// for all background jobs to complete and check for any errors.
+pub struct ConcurrentBlobUploader<BS> {
+    blob_service: BS,
+    upload_tasks: JoinSet<Result<(), Error>>,
+    upload_semaphore: Arc<Semaphore>,
+}
+
+impl<BS> ConcurrentBlobUploader<BS>
+where
+    BS: BlobService + Clone + 'static,
+{
+    /// Creates a new concurrent blob uploader which uploads blobs to the provided
+    /// blob service.
+    pub fn new(blob_service: BS) -> Self {
+        Self {
+            blob_service,
+            upload_tasks: JoinSet::new(),
+            upload_semaphore: Arc::new(Semaphore::new(MAX_BUFFER_SIZE)),
+        }
+    }
+
+    /// Uploads a blob to the blob service. If the blob is small enough it will be read to a buffer
+    /// and uploaded in the background.
+    /// This will read the entirety of the provided reader unless an error occurs, even if blobs
+    /// are uploaded in the background..
+    pub async fn upload<R>(
+        &mut self,
+        path: &Path,
+        expected_size: u64,
+        mut r: R,
+    ) -> Result<B3Digest, Error>
+    where
+        R: AsyncRead + Unpin,
+    {
+        if expected_size < CONCURRENT_BLOB_UPLOAD_THRESHOLD as u64 {
+            let mut buffer = Vec::with_capacity(expected_size as usize);
+            let mut hasher = blake3::Hasher::new();
+            let mut reader = InspectReader::new(&mut r, |bytes| {
+                hasher.write_all(bytes).unwrap();
+            });
+
+            let permit = self
+                .upload_semaphore
+                .clone()
+                // This cast is safe because ensure the header_size is less than
+                // CONCURRENT_BLOB_UPLOAD_THRESHOLD which is a u32.
+                .acquire_many_owned(expected_size as u32)
+                .await
+                .unwrap();
+            let size = tokio::io::copy(&mut reader, &mut buffer)
+                .await
+                .map_err(|e| Error::BlobRead(path.into(), e))?;
+            let digest: B3Digest = hasher.finalize().as_bytes().into();
+
+            if size != expected_size {
+                return Err(Error::UnexpectedSize {
+                    path: path.into(),
+                    wanted: expected_size,
+                    got: size,
+                });
+            }
+
+            self.upload_tasks.spawn({
+                let blob_service = self.blob_service.clone();
+                let expected_digest = digest.clone();
+                let path = path.to_owned();
+                let r = Cursor::new(buffer);
+                async move {
+                    let digest = upload_blob(&blob_service, &path, expected_size, r).await?;
+
+                    assert_eq!(digest, expected_digest, "Tvix bug: blob digest mismatch");
+
+                    // Make sure we hold the permit until we finish writing the blob
+                    // to the [BlobService].
+                    drop(permit);
+                    Ok(())
+                }
+            });
+
+            return Ok(digest);
+        }
+
+        upload_blob(&self.blob_service, path, expected_size, r).await
+    }
+
+    /// Waits for all background upload jobs to complete, returning any upload errors.
+    pub async fn join(mut self) -> Result<(), Error> {
+        while let Some(result) = self.upload_tasks.join_next().await {
+            result??;
+        }
+        Ok(())
+    }
+}
+
+async fn upload_blob<BS, R>(
+    blob_service: &BS,
+    path: &Path,
+    expected_size: u64,
+    mut r: R,
+) -> Result<B3Digest, Error>
+where
+    BS: BlobService,
+    R: AsyncRead + Unpin,
+{
+    let mut writer = blob_service.open_write().await;
+
+    let size = tokio::io::copy(&mut r, &mut writer)
+        .await
+        .map_err(|e| Error::BlobRead(path.into(), e))?;
+
+    let digest = writer
+        .close()
+        .await
+        .map_err(|e| Error::BlobFinalize(path.into(), e))?;
+
+    if size != expected_size {
+        return Err(Error::UnexpectedSize {
+            path: path.into(),
+            wanted: expected_size,
+            got: size,
+        });
+    }
+
+    Ok(digest)
+}