diff options
Diffstat (limited to 'tvix/castore/src')
-rw-r--r-- | tvix/castore/src/import/archive.rs | 109 | ||||
-rw-r--r-- | tvix/castore/src/import/blobs.rs | 177 | ||||
-rw-r--r-- | tvix/castore/src/import/mod.rs | 1 |
3 files changed, 190 insertions, 97 deletions
diff --git a/tvix/castore/src/import/archive.rs b/tvix/castore/src/import/archive.rs index 0ebb4a236117..cd5b1290e031 100644 --- a/tvix/castore/src/import/archive.rs +++ b/tvix/castore/src/import/archive.rs @@ -1,38 +1,23 @@ //! Imports from an archive (tarballs) use std::collections::HashMap; -use std::io::{Cursor, Write}; -use std::sync::Arc; use petgraph::graph::{DiGraph, NodeIndex}; use petgraph::visit::{DfsPostOrder, EdgeRef}; use petgraph::Direction; use tokio::io::AsyncRead; -use tokio::sync::Semaphore; -use tokio::task::JoinSet; use tokio_stream::StreamExt; use tokio_tar::Archive; -use tokio_util::io::InspectReader; use tracing::{instrument, warn, Level}; use crate::blobservice::BlobService; use crate::directoryservice::DirectoryService; use crate::import::{ingest_entries, IngestionEntry, IngestionError}; use crate::proto::node::Node; -use crate::B3Digest; -type TarPathBuf = std::path::PathBuf; - -/// Files smaller than this threshold, in bytes, are uploaded to the [BlobService] in the -/// background. -/// -/// This is a u32 since we acquire a weighted semaphore using the size of the blob. -/// [Semaphore::acquire_many_owned] takes a u32, so we need to ensure the size of -/// the blob can be represented using a u32 and will not cause an overflow. -const CONCURRENT_BLOB_UPLOAD_THRESHOLD: u32 = 1024 * 1024; +use super::blobs::{self, ConcurrentBlobUploader}; -/// The maximum amount of bytes allowed to be buffered in memory to perform async blob uploads. -const MAX_TARBALL_BUFFER_SIZE: usize = 128 * 1024 * 1024; +type TarPathBuf = std::path::PathBuf; #[derive(Debug, thiserror::Error)] pub enum Error { @@ -57,13 +42,6 @@ pub enum Error { #[error("unable to read link name field for {0}: {1}")] LinkName(TarPathBuf, std::io::Error), - #[error("unable to read blob contents for {0}: {1}")] - BlobRead(TarPathBuf, std::io::Error), - - // FUTUREWORK: proper error for blob finalize - #[error("unable to finalize blob {0}: {1}")] - BlobFinalize(TarPathBuf, std::io::Error), - #[error("unsupported tar entry {0} type: {1:?}")] EntryType(TarPathBuf, tokio_tar::EntryType), @@ -72,6 +50,9 @@ pub enum Error { #[error("unexpected number of top level directory entries")] UnexpectedNumberOfTopLevelEntries, + + #[error(transparent)] + BlobUploadError(#[from] blobs::Error), } /// Ingests elements from the given tar [`Archive`] into a the passed [`BlobService`] and @@ -94,8 +75,7 @@ where // In the first phase, collect up all the regular files and symlinks. let mut nodes = IngestionEntryGraph::new(); - let semaphore = Arc::new(Semaphore::new(MAX_TARBALL_BUFFER_SIZE)); - let mut async_blob_uploads: JoinSet<Result<(), Error>> = JoinSet::new(); + let mut blob_uploader = ConcurrentBlobUploader::new(blob_service); let mut entries_iter = archive.entries().map_err(Error::Entries)?; while let Some(mut entry) = entries_iter.try_next().await.map_err(Error::NextEntry)? { @@ -110,77 +90,14 @@ where tokio_tar::EntryType::Regular | tokio_tar::EntryType::GNUSparse | tokio_tar::EntryType::Continuous => { - let header_size = header + let size = header .size() .map_err(|e| Error::Size(tar_path.clone(), e))?; - // If the blob is small enough, read it off the wire, compute the digest, - // and upload it to the [BlobService] in the background. - let (size, digest) = if header_size <= CONCURRENT_BLOB_UPLOAD_THRESHOLD as u64 { - let mut buffer = Vec::with_capacity(header_size as usize); - let mut hasher = blake3::Hasher::new(); - let mut reader = InspectReader::new(&mut entry, |bytes| { - hasher.write_all(bytes).unwrap(); - }); - - // Ensure that we don't buffer into memory until we've acquired a permit. - // This prevents consuming too much memory when performing concurrent - // blob uploads. - let permit = semaphore - .clone() - // This cast is safe because ensure the header_size is less than - // CONCURRENT_BLOB_UPLOAD_THRESHOLD which is a u32. - .acquire_many_owned(header_size as u32) - .await - .unwrap(); - let size = tokio::io::copy(&mut reader, &mut buffer) - .await - .map_err(|e| Error::Size(tar_path.clone(), e))?; - - let digest: B3Digest = hasher.finalize().as_bytes().into(); - - { - let blob_service = blob_service.clone(); - let digest = digest.clone(); - async_blob_uploads.spawn({ - let tar_path = tar_path.clone(); - async move { - let mut writer = blob_service.open_write().await; - - tokio::io::copy(&mut Cursor::new(buffer), &mut writer) - .await - .map_err(|e| Error::BlobRead(tar_path.clone(), e))?; - - let blob_digest = writer - .close() - .await - .map_err(|e| Error::BlobFinalize(tar_path, e))?; - - assert_eq!(digest, blob_digest, "Tvix bug: blob digest mismatch"); - - // Make sure we hold the permit until we finish writing the blob - // to the [BlobService]. - drop(permit); - Ok(()) - } - }); - } - - (size, digest) - } else { - let mut writer = blob_service.open_write().await; - - let size = tokio::io::copy(&mut entry, &mut writer) - .await - .map_err(|e| Error::BlobRead(tar_path.clone(), e))?; - - let digest = writer - .close() - .await - .map_err(|e| Error::BlobFinalize(tar_path.clone(), e))?; - - (size, digest) - }; + let digest = blob_uploader + .upload(&path, size, &mut entry) + .await + .map_err(Error::BlobUploadError)?; let executable = entry .header() @@ -219,9 +136,7 @@ where nodes.add(entry)?; } - while let Some(result) = async_blob_uploads.join_next().await { - result.expect("task panicked")?; - } + blob_uploader.join().await.map_err(Error::BlobUploadError)?; let root_node = ingest_entries( directory_service, diff --git a/tvix/castore/src/import/blobs.rs b/tvix/castore/src/import/blobs.rs new file mode 100644 index 000000000000..8135d871d6c0 --- /dev/null +++ b/tvix/castore/src/import/blobs.rs @@ -0,0 +1,177 @@ +use std::{ + io::{Cursor, Write}, + sync::Arc, +}; + +use tokio::{ + io::AsyncRead, + sync::Semaphore, + task::{JoinError, JoinSet}, +}; +use tokio_util::io::InspectReader; + +use crate::{blobservice::BlobService, B3Digest, Path, PathBuf}; + +/// Files smaller than this threshold, in bytes, are uploaded to the [BlobService] in the +/// background. +/// +/// This is a u32 since we acquire a weighted semaphore using the size of the blob. +/// [Semaphore::acquire_many_owned] takes a u32, so we need to ensure the size of +/// the blob can be represented using a u32 and will not cause an overflow. +const CONCURRENT_BLOB_UPLOAD_THRESHOLD: u32 = 1024 * 1024; + +/// The maximum amount of bytes allowed to be buffered in memory to perform async blob uploads. +const MAX_BUFFER_SIZE: usize = 128 * 1024 * 1024; + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("unable to read blob contents for {0}: {1}")] + BlobRead(PathBuf, std::io::Error), + + // FUTUREWORK: proper error for blob finalize + #[error("unable to finalize blob {0}: {1}")] + BlobFinalize(PathBuf, std::io::Error), + + #[error("unexpected size for {path} wanted: {wanted} got: {got}")] + UnexpectedSize { + path: PathBuf, + wanted: u64, + got: u64, + }, + + #[error("blob upload join error: {0}")] + JoinError(#[from] JoinError), +} + +/// The concurrent blob uploader provides a mechanism for concurrently uploading small blobs. +/// This is useful when ingesting from sources like tarballs and archives which each blob entry +/// must be read sequentially. Ingesting many small blobs sequentially becomes slow due to +/// round trip time with the blob service. The concurrent blob uploader will buffer small +/// blobs in memory and upload them to the blob service in the background. +/// +/// Once all blobs have been uploaded, make sure to call [ConcurrentBlobUploader::join] to wait +/// for all background jobs to complete and check for any errors. +pub struct ConcurrentBlobUploader<BS> { + blob_service: BS, + upload_tasks: JoinSet<Result<(), Error>>, + upload_semaphore: Arc<Semaphore>, +} + +impl<BS> ConcurrentBlobUploader<BS> +where + BS: BlobService + Clone + 'static, +{ + /// Creates a new concurrent blob uploader which uploads blobs to the provided + /// blob service. + pub fn new(blob_service: BS) -> Self { + Self { + blob_service, + upload_tasks: JoinSet::new(), + upload_semaphore: Arc::new(Semaphore::new(MAX_BUFFER_SIZE)), + } + } + + /// Uploads a blob to the blob service. If the blob is small enough it will be read to a buffer + /// and uploaded in the background. + /// This will read the entirety of the provided reader unless an error occurs, even if blobs + /// are uploaded in the background.. + pub async fn upload<R>( + &mut self, + path: &Path, + expected_size: u64, + mut r: R, + ) -> Result<B3Digest, Error> + where + R: AsyncRead + Unpin, + { + if expected_size < CONCURRENT_BLOB_UPLOAD_THRESHOLD as u64 { + let mut buffer = Vec::with_capacity(expected_size as usize); + let mut hasher = blake3::Hasher::new(); + let mut reader = InspectReader::new(&mut r, |bytes| { + hasher.write_all(bytes).unwrap(); + }); + + let permit = self + .upload_semaphore + .clone() + // This cast is safe because ensure the header_size is less than + // CONCURRENT_BLOB_UPLOAD_THRESHOLD which is a u32. + .acquire_many_owned(expected_size as u32) + .await + .unwrap(); + let size = tokio::io::copy(&mut reader, &mut buffer) + .await + .map_err(|e| Error::BlobRead(path.into(), e))?; + let digest: B3Digest = hasher.finalize().as_bytes().into(); + + if size != expected_size { + return Err(Error::UnexpectedSize { + path: path.into(), + wanted: expected_size, + got: size, + }); + } + + self.upload_tasks.spawn({ + let blob_service = self.blob_service.clone(); + let expected_digest = digest.clone(); + let path = path.to_owned(); + let r = Cursor::new(buffer); + async move { + let digest = upload_blob(&blob_service, &path, expected_size, r).await?; + + assert_eq!(digest, expected_digest, "Tvix bug: blob digest mismatch"); + + // Make sure we hold the permit until we finish writing the blob + // to the [BlobService]. + drop(permit); + Ok(()) + } + }); + + return Ok(digest); + } + + upload_blob(&self.blob_service, path, expected_size, r).await + } + + /// Waits for all background upload jobs to complete, returning any upload errors. + pub async fn join(mut self) -> Result<(), Error> { + while let Some(result) = self.upload_tasks.join_next().await { + result??; + } + Ok(()) + } +} + +async fn upload_blob<BS, R>( + blob_service: &BS, + path: &Path, + expected_size: u64, + mut r: R, +) -> Result<B3Digest, Error> +where + BS: BlobService, + R: AsyncRead + Unpin, +{ + let mut writer = blob_service.open_write().await; + + let size = tokio::io::copy(&mut r, &mut writer) + .await + .map_err(|e| Error::BlobRead(path.into(), e))?; + + let digest = writer + .close() + .await + .map_err(|e| Error::BlobFinalize(path.into(), e))?; + + if size != expected_size { + return Err(Error::UnexpectedSize { + path: path.into(), + wanted: expected_size, + got: size, + }); + } + + Ok(digest) +} diff --git a/tvix/castore/src/import/mod.rs b/tvix/castore/src/import/mod.rs index e8b27e469c3d..4223fe538756 100644 --- a/tvix/castore/src/import/mod.rs +++ b/tvix/castore/src/import/mod.rs @@ -24,6 +24,7 @@ mod error; pub use error::IngestionError; pub mod archive; +pub mod blobs; pub mod fs; /// Ingests [IngestionEntry] from the given stream into a the passed [DirectoryService]. |