about summary refs log tree commit diff
path: root/tvix/castore/src/import/archive.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/castore/src/import/archive.rs')
-rw-r--r--tvix/castore/src/import/archive.rs109
1 files changed, 12 insertions, 97 deletions
diff --git a/tvix/castore/src/import/archive.rs b/tvix/castore/src/import/archive.rs
index 0ebb4a236117..cd5b1290e031 100644
--- a/tvix/castore/src/import/archive.rs
+++ b/tvix/castore/src/import/archive.rs
@@ -1,38 +1,23 @@
 //! Imports from an archive (tarballs)
 
 use std::collections::HashMap;
-use std::io::{Cursor, Write};
-use std::sync::Arc;
 
 use petgraph::graph::{DiGraph, NodeIndex};
 use petgraph::visit::{DfsPostOrder, EdgeRef};
 use petgraph::Direction;
 use tokio::io::AsyncRead;
-use tokio::sync::Semaphore;
-use tokio::task::JoinSet;
 use tokio_stream::StreamExt;
 use tokio_tar::Archive;
-use tokio_util::io::InspectReader;
 use tracing::{instrument, warn, Level};
 
 use crate::blobservice::BlobService;
 use crate::directoryservice::DirectoryService;
 use crate::import::{ingest_entries, IngestionEntry, IngestionError};
 use crate::proto::node::Node;
-use crate::B3Digest;
 
-type TarPathBuf = std::path::PathBuf;
-
-/// Files smaller than this threshold, in bytes, are uploaded to the [BlobService] in the
-/// background.
-///
-/// This is a u32 since we acquire a weighted semaphore using the size of the blob.
-/// [Semaphore::acquire_many_owned] takes a u32, so we need to ensure the size of
-/// the blob can be represented using a u32 and will not cause an overflow.
-const CONCURRENT_BLOB_UPLOAD_THRESHOLD: u32 = 1024 * 1024;
+use super::blobs::{self, ConcurrentBlobUploader};
 
-/// The maximum amount of bytes allowed to be buffered in memory to perform async blob uploads.
-const MAX_TARBALL_BUFFER_SIZE: usize = 128 * 1024 * 1024;
+type TarPathBuf = std::path::PathBuf;
 
 #[derive(Debug, thiserror::Error)]
 pub enum Error {
@@ -57,13 +42,6 @@ pub enum Error {
     #[error("unable to read link name field for {0}: {1}")]
     LinkName(TarPathBuf, std::io::Error),
 
-    #[error("unable to read blob contents for {0}: {1}")]
-    BlobRead(TarPathBuf, std::io::Error),
-
-    // FUTUREWORK: proper error for blob finalize
-    #[error("unable to finalize blob {0}: {1}")]
-    BlobFinalize(TarPathBuf, std::io::Error),
-
     #[error("unsupported tar entry {0} type: {1:?}")]
     EntryType(TarPathBuf, tokio_tar::EntryType),
 
@@ -72,6 +50,9 @@ pub enum Error {
 
     #[error("unexpected number of top level directory entries")]
     UnexpectedNumberOfTopLevelEntries,
+
+    #[error(transparent)]
+    BlobUploadError(#[from] blobs::Error),
 }
 
 /// Ingests elements from the given tar [`Archive`] into a the passed [`BlobService`] and
@@ -94,8 +75,7 @@ where
     // In the first phase, collect up all the regular files and symlinks.
     let mut nodes = IngestionEntryGraph::new();
 
-    let semaphore = Arc::new(Semaphore::new(MAX_TARBALL_BUFFER_SIZE));
-    let mut async_blob_uploads: JoinSet<Result<(), Error>> = JoinSet::new();
+    let mut blob_uploader = ConcurrentBlobUploader::new(blob_service);
 
     let mut entries_iter = archive.entries().map_err(Error::Entries)?;
     while let Some(mut entry) = entries_iter.try_next().await.map_err(Error::NextEntry)? {
@@ -110,77 +90,14 @@ where
             tokio_tar::EntryType::Regular
             | tokio_tar::EntryType::GNUSparse
             | tokio_tar::EntryType::Continuous => {
-                let header_size = header
+                let size = header
                     .size()
                     .map_err(|e| Error::Size(tar_path.clone(), e))?;
 
-                // If the blob is small enough, read it off the wire, compute the digest,
-                // and upload it to the [BlobService] in the background.
-                let (size, digest) = if header_size <= CONCURRENT_BLOB_UPLOAD_THRESHOLD as u64 {
-                    let mut buffer = Vec::with_capacity(header_size as usize);
-                    let mut hasher = blake3::Hasher::new();
-                    let mut reader = InspectReader::new(&mut entry, |bytes| {
-                        hasher.write_all(bytes).unwrap();
-                    });
-
-                    // Ensure that we don't buffer into memory until we've acquired a permit.
-                    // This prevents consuming too much memory when performing concurrent
-                    // blob uploads.
-                    let permit = semaphore
-                        .clone()
-                        // This cast is safe because ensure the header_size is less than
-                        // CONCURRENT_BLOB_UPLOAD_THRESHOLD which is a u32.
-                        .acquire_many_owned(header_size as u32)
-                        .await
-                        .unwrap();
-                    let size = tokio::io::copy(&mut reader, &mut buffer)
-                        .await
-                        .map_err(|e| Error::Size(tar_path.clone(), e))?;
-
-                    let digest: B3Digest = hasher.finalize().as_bytes().into();
-
-                    {
-                        let blob_service = blob_service.clone();
-                        let digest = digest.clone();
-                        async_blob_uploads.spawn({
-                            let tar_path = tar_path.clone();
-                            async move {
-                                let mut writer = blob_service.open_write().await;
-
-                                tokio::io::copy(&mut Cursor::new(buffer), &mut writer)
-                                    .await
-                                    .map_err(|e| Error::BlobRead(tar_path.clone(), e))?;
-
-                                let blob_digest = writer
-                                    .close()
-                                    .await
-                                    .map_err(|e| Error::BlobFinalize(tar_path, e))?;
-
-                                assert_eq!(digest, blob_digest, "Tvix bug: blob digest mismatch");
-
-                                // Make sure we hold the permit until we finish writing the blob
-                                // to the [BlobService].
-                                drop(permit);
-                                Ok(())
-                            }
-                        });
-                    }
-
-                    (size, digest)
-                } else {
-                    let mut writer = blob_service.open_write().await;
-
-                    let size = tokio::io::copy(&mut entry, &mut writer)
-                        .await
-                        .map_err(|e| Error::BlobRead(tar_path.clone(), e))?;
-
-                    let digest = writer
-                        .close()
-                        .await
-                        .map_err(|e| Error::BlobFinalize(tar_path.clone(), e))?;
-
-                    (size, digest)
-                };
+                let digest = blob_uploader
+                    .upload(&path, size, &mut entry)
+                    .await
+                    .map_err(Error::BlobUploadError)?;
 
                 let executable = entry
                     .header()
@@ -219,9 +136,7 @@ where
         nodes.add(entry)?;
     }
 
-    while let Some(result) = async_blob_uploads.join_next().await {
-        result.expect("task panicked")?;
-    }
+    blob_uploader.join().await.map_err(Error::BlobUploadError)?;
 
     let root_node = ingest_entries(
         directory_service,