diff options
Diffstat (limited to 'tvix/castore/src/import/archive.rs')
-rw-r--r-- | tvix/castore/src/import/archive.rs | 188 |
1 files changed, 82 insertions, 106 deletions
diff --git a/tvix/castore/src/import/archive.rs b/tvix/castore/src/import/archive.rs index adcfb871d5..cd5b1290e0 100644 --- a/tvix/castore/src/import/archive.rs +++ b/tvix/castore/src/import/archive.rs @@ -1,51 +1,58 @@ -use std::io::{Cursor, Write}; -use std::sync::Arc; -use std::{collections::HashMap, path::PathBuf}; +//! Imports from an archive (tarballs) + +use std::collections::HashMap; use petgraph::graph::{DiGraph, NodeIndex}; use petgraph::visit::{DfsPostOrder, EdgeRef}; use petgraph::Direction; use tokio::io::AsyncRead; -use tokio::sync::Semaphore; -use tokio::task::JoinSet; use tokio_stream::StreamExt; use tokio_tar::Archive; -use tokio_util::io::InspectReader; use tracing::{instrument, warn, Level}; use crate::blobservice::BlobService; use crate::directoryservice::DirectoryService; -use crate::import::{ingest_entries, Error as ImportError, IngestionEntry}; +use crate::import::{ingest_entries, IngestionEntry, IngestionError}; use crate::proto::node::Node; -use crate::B3Digest; -/// Files smaller than this threshold, in bytes, are uploaded to the [BlobService] in the -/// background. -/// -/// This is a u32 since we acquire a weighted semaphore using the size of the blob. -/// [Semaphore::acquire_many_owned] takes a u32, so we need to ensure the size of -/// the blob can be represented using a u32 and will not cause an overflow. -const CONCURRENT_BLOB_UPLOAD_THRESHOLD: u32 = 1024 * 1024; +use super::blobs::{self, ConcurrentBlobUploader}; -/// The maximum amount of bytes allowed to be buffered in memory to perform async blob uploads. -const MAX_TARBALL_BUFFER_SIZE: usize = 128 * 1024 * 1024; +type TarPathBuf = std::path::PathBuf; #[derive(Debug, thiserror::Error)] pub enum Error { - #[error("error reading archive entry: {0}")] - Io(#[from] std::io::Error), + #[error("unable to construct stream of entries: {0}")] + Entries(std::io::Error), + + #[error("unable to read next entry: {0}")] + NextEntry(std::io::Error), + + #[error("unable to read path for entry: {0}")] + PathRead(std::io::Error), + + #[error("unable to convert path {0} for entry: {1}")] + PathConvert(TarPathBuf, std::io::Error), + + #[error("unable to read size field for {0}: {1}")] + Size(TarPathBuf, std::io::Error), + + #[error("unable to read mode field for {0}: {1}")] + Mode(TarPathBuf, std::io::Error), + + #[error("unable to read link name field for {0}: {1}")] + LinkName(TarPathBuf, std::io::Error), #[error("unsupported tar entry {0} type: {1:?}")] - UnsupportedTarEntry(PathBuf, tokio_tar::EntryType), + EntryType(TarPathBuf, tokio_tar::EntryType), #[error("symlink missing target {0}")] - MissingSymlinkTarget(PathBuf), + MissingSymlinkTarget(TarPathBuf), #[error("unexpected number of top level directory entries")] UnexpectedNumberOfTopLevelEntries, - #[error("failed to import into castore {0}")] - Import(#[from] ImportError), + #[error(transparent)] + BlobUploadError(#[from] blobs::Error), } /// Ingests elements from the given tar [`Archive`] into a the passed [`BlobService`] and @@ -55,10 +62,10 @@ pub async fn ingest_archive<BS, DS, R>( blob_service: BS, directory_service: DS, mut archive: Archive<R>, -) -> Result<Node, Error> +) -> Result<Node, IngestionError<Error>> where BS: BlobService + Clone + 'static, - DS: AsRef<dyn DirectoryService>, + DS: DirectoryService, R: AsyncRead + Unpin, { // Since tarballs can have entries in any arbitrary order, we need to @@ -68,105 +75,68 @@ where // In the first phase, collect up all the regular files and symlinks. let mut nodes = IngestionEntryGraph::new(); - let semaphore = Arc::new(Semaphore::new(MAX_TARBALL_BUFFER_SIZE)); - let mut async_blob_uploads: JoinSet<Result<(), Error>> = JoinSet::new(); + let mut blob_uploader = ConcurrentBlobUploader::new(blob_service); + + let mut entries_iter = archive.entries().map_err(Error::Entries)?; + while let Some(mut entry) = entries_iter.try_next().await.map_err(Error::NextEntry)? { + let tar_path: TarPathBuf = entry.path().map_err(Error::PathRead)?.into(); - let mut entries_iter = archive.entries()?; - while let Some(mut entry) = entries_iter.try_next().await? { - let path: PathBuf = entry.path()?.into(); + // construct a castore PathBuf, which we use in the produced IngestionEntry. + let path = crate::path::PathBuf::from_host_path(tar_path.as_path(), true) + .map_err(|e| Error::PathConvert(tar_path.clone(), e))?; let header = entry.header(); let entry = match header.entry_type() { tokio_tar::EntryType::Regular | tokio_tar::EntryType::GNUSparse | tokio_tar::EntryType::Continuous => { - let header_size = header.size()?; - - // If the blob is small enough, read it off the wire, compute the digest, - // and upload it to the [BlobService] in the background. - let (size, digest) = if header_size <= CONCURRENT_BLOB_UPLOAD_THRESHOLD as u64 { - let mut buffer = Vec::with_capacity(header_size as usize); - let mut hasher = blake3::Hasher::new(); - let mut reader = InspectReader::new(&mut entry, |bytes| { - hasher.write_all(bytes).unwrap(); - }); - - // Ensure that we don't buffer into memory until we've acquired a permit. - // This prevents consuming too much memory when performing concurrent - // blob uploads. - let permit = semaphore - .clone() - // This cast is safe because ensure the header_size is less than - // CONCURRENT_BLOB_UPLOAD_THRESHOLD which is a u32. - .acquire_many_owned(header_size as u32) - .await - .unwrap(); - let size = tokio::io::copy(&mut reader, &mut buffer).await?; - - let digest: B3Digest = hasher.finalize().as_bytes().into(); - - { - let blob_service = blob_service.clone(); - let digest = digest.clone(); - async_blob_uploads.spawn({ - async move { - let mut writer = blob_service.open_write().await; - - tokio::io::copy(&mut Cursor::new(buffer), &mut writer).await?; - - let blob_digest = writer.close().await?; - - assert_eq!(digest, blob_digest, "Tvix bug: blob digest mismatch"); - - // Make sure we hold the permit until we finish writing the blob - // to the [BlobService]. - drop(permit); - Ok(()) - } - }); - } - - (size, digest) - } else { - let mut writer = blob_service.open_write().await; - - let size = tokio::io::copy(&mut entry, &mut writer).await?; - - let digest = writer.close().await?; - - (size, digest) - }; + let size = header + .size() + .map_err(|e| Error::Size(tar_path.clone(), e))?; + + let digest = blob_uploader + .upload(&path, size, &mut entry) + .await + .map_err(Error::BlobUploadError)?; + + let executable = entry + .header() + .mode() + .map_err(|e| Error::Mode(tar_path, e))? + & 64 + != 0; IngestionEntry::Regular { path, size, - executable: entry.header().mode()? & 64 != 0, + executable, digest, } } tokio_tar::EntryType::Symlink => IngestionEntry::Symlink { target: entry - .link_name()? - .ok_or_else(|| Error::MissingSymlinkTarget(path.clone()))? - .into(), + .link_name() + .map_err(|e| Error::LinkName(tar_path.clone(), e))? + .ok_or_else(|| Error::MissingSymlinkTarget(tar_path.clone()))? + .into_owned() + .into_os_string() + .into_encoded_bytes(), path, }, // Push a bogus directory marker so we can make sure this directoy gets // created. We don't know the digest and size until after reading the full // tarball. - tokio_tar::EntryType::Directory => IngestionEntry::Dir { path: path.clone() }, + tokio_tar::EntryType::Directory => IngestionEntry::Dir { path }, tokio_tar::EntryType::XGlobalHeader | tokio_tar::EntryType::XHeader => continue, - entry_type => return Err(Error::UnsupportedTarEntry(path, entry_type)), + entry_type => return Err(Error::EntryType(tar_path, entry_type).into()), }; nodes.add(entry)?; } - while let Some(result) = async_blob_uploads.join_next().await { - result.expect("task panicked")?; - } + blob_uploader.join().await.map_err(Error::BlobUploadError)?; let root_node = ingest_entries( directory_service, @@ -193,7 +163,7 @@ where /// An error is returned if this is not the case and ingestion will fail. struct IngestionEntryGraph { graph: DiGraph<IngestionEntry, ()>, - path_to_index: HashMap<PathBuf, NodeIndex>, + path_to_index: HashMap<crate::path::PathBuf, NodeIndex>, root_node: Option<NodeIndex>, } @@ -218,7 +188,7 @@ impl IngestionEntryGraph { /// and new nodes are not directories, the node is replaced and is disconnected from its /// children. pub fn add(&mut self, entry: IngestionEntry) -> Result<NodeIndex, Error> { - let path = entry.path().to_path_buf(); + let path = entry.path().to_owned(); let index = match self.path_to_index.get(entry.path()) { Some(&index) => { @@ -233,12 +203,12 @@ impl IngestionEntryGraph { None => self.graph.add_node(entry), }; - // A path with 1 component is the root node + // for archives, a path with 1 component is the root node if path.components().count() == 1 { // We expect archives to contain a single root node, if there is another root node // entry with a different path name, this is unsupported. if let Some(root_node) = self.root_node { - if self.get_node(root_node).path() != path { + if self.get_node(root_node).path() != path.as_ref() { return Err(Error::UnexpectedNumberOfTopLevelEntries); } } @@ -247,7 +217,7 @@ impl IngestionEntryGraph { } else if let Some(parent_path) = path.parent() { // Recursively add the parent node until it hits the root node. let parent_index = self.add(IngestionEntry::Dir { - path: parent_path.to_path_buf(), + path: parent_path.to_owned(), })?; // Insert an edge from the parent directory to the child entry. @@ -332,23 +302,29 @@ mod test { lazy_static! { pub static ref EMPTY_DIGEST: B3Digest = blake3::hash(&[]).as_bytes().into(); - pub static ref DIR_A: IngestionEntry = IngestionEntry::Dir { path: "a".into() }; - pub static ref DIR_B: IngestionEntry = IngestionEntry::Dir { path: "b".into() }; - pub static ref DIR_A_B: IngestionEntry = IngestionEntry::Dir { path: "a/b".into() }; + pub static ref DIR_A: IngestionEntry = IngestionEntry::Dir { + path: "a".parse().unwrap() + }; + pub static ref DIR_B: IngestionEntry = IngestionEntry::Dir { + path: "b".parse().unwrap() + }; + pub static ref DIR_A_B: IngestionEntry = IngestionEntry::Dir { + path: "a/b".parse().unwrap() + }; pub static ref FILE_A: IngestionEntry = IngestionEntry::Regular { - path: "a".into(), + path: "a".parse().unwrap(), size: 0, executable: false, digest: EMPTY_DIGEST.clone(), }; pub static ref FILE_A_B: IngestionEntry = IngestionEntry::Regular { - path: "a/b".into(), + path: "a/b".parse().unwrap(), size: 0, executable: false, digest: EMPTY_DIGEST.clone(), }; pub static ref FILE_A_B_C: IngestionEntry = IngestionEntry::Regular { - path: "a/b/c".into(), + path: "a/b/c".parse().unwrap(), size: 0, executable: false, digest: EMPTY_DIGEST.clone(), |