diff options
Diffstat (limited to 'tvix/castore/src/import/archive.rs')
-rw-r--r-- | tvix/castore/src/import/archive.rs | 82 |
1 file changed, 62 insertions, 20 deletions
diff --git a/tvix/castore/src/import/archive.rs b/tvix/castore/src/import/archive.rs index a3f63be49098..2da8b945d948 100644 --- a/tvix/castore/src/import/archive.rs +++ b/tvix/castore/src/import/archive.rs @@ -17,7 +17,7 @@ use tracing::{instrument, warn, Level}; use crate::blobservice::BlobService; use crate::directoryservice::DirectoryService; -use crate::import::{ingest_entries, Error as ImportError, IngestionEntry}; +use crate::import::{ingest_entries, IngestionEntry, IngestionError}; use crate::proto::node::Node; use crate::B3Digest; @@ -34,20 +34,39 @@ const MAX_TARBALL_BUFFER_SIZE: usize = 128 * 1024 * 1024; #[derive(Debug, thiserror::Error)] pub enum Error { - #[error("error reading archive entry: {0}")] - Io(#[from] std::io::Error), + #[error("unable to construct stream of entries: {0}")] + Entries(std::io::Error), + + #[error("unable to read next entry: {0}")] + NextEntry(std::io::Error), + + #[error("unable to read path for entry: {0}")] + Path(std::io::Error), + + #[error("unable to read size field for {0}: {1}")] + Size(PathBuf, std::io::Error), + + #[error("unable to read mode field for {0}: {1}")] + Mode(PathBuf, std::io::Error), + + #[error("unable to read link name field for {0}: {1}")] + LinkName(PathBuf, std::io::Error), + + #[error("unable to read blob contents for {0}: {1}")] + BlobRead(PathBuf, std::io::Error), + + // TODO: proper error for blob finalize + #[error("unable to finalize blob {0}: {1}")] + BlobFinalize(PathBuf, std::io::Error), #[error("unsupported tar entry {0} type: {1:?}")] - UnsupportedTarEntry(PathBuf, tokio_tar::EntryType), + EntryType(PathBuf, tokio_tar::EntryType), #[error("symlink missing target {0}")] MissingSymlinkTarget(PathBuf), #[error("unexpected number of top level directory entries")] UnexpectedNumberOfTopLevelEntries, - - #[error("failed to import into castore {0}")] - Import(#[from] ImportError), } /// Ingests elements from the given tar [`Archive`] into the passed [`BlobService`] and @@ -57,7 +76,7 @@ pub 
async fn ingest_archive<BS, DS, R>( blob_service: BS, directory_service: DS, mut archive: Archive<R>, -) -> Result<Node, Error> +) -> Result<Node, IngestionError<Error>> where BS: BlobService + Clone + 'static, DS: AsRef<dyn DirectoryService>, @@ -73,16 +92,18 @@ where let semaphore = Arc::new(Semaphore::new(MAX_TARBALL_BUFFER_SIZE)); let mut async_blob_uploads: JoinSet<Result<(), Error>> = JoinSet::new(); - let mut entries_iter = archive.entries()?; - while let Some(mut entry) = entries_iter.try_next().await? { - let path: PathBuf = entry.path()?.into(); + let mut entries_iter = archive.entries().map_err(Error::Entries)?; + while let Some(mut entry) = entries_iter.try_next().await.map_err(Error::NextEntry)? { + let path: PathBuf = entry.path().map_err(Error::Path)?.into(); let header = entry.header(); let entry = match header.entry_type() { tokio_tar::EntryType::Regular | tokio_tar::EntryType::GNUSparse | tokio_tar::EntryType::Continuous => { - let header_size = header.size()?; + let header_size = header + .size() + .map_err(|e| Error::Size(path.to_path_buf(), e))?; // If the blob is small enough, read it off the wire, compute the digest, // and upload it to the [BlobService] in the background. 
@@ -103,7 +124,9 @@ where .acquire_many_owned(header_size as u32) .await .unwrap(); - let size = tokio::io::copy(&mut reader, &mut buffer).await?; + let size = tokio::io::copy(&mut reader, &mut buffer) + .await + .map_err(|e| Error::Size(path.to_path_buf(), e))?; let digest: B3Digest = hasher.finalize().as_bytes().into(); @@ -111,12 +134,18 @@ where let blob_service = blob_service.clone(); let digest = digest.clone(); async_blob_uploads.spawn({ + let path = path.clone(); async move { let mut writer = blob_service.open_write().await; - tokio::io::copy(&mut Cursor::new(buffer), &mut writer).await?; + tokio::io::copy(&mut Cursor::new(buffer), &mut writer) + .await + .map_err(|e| Error::BlobRead(path.to_path_buf(), e))?; - let blob_digest = writer.close().await?; + let blob_digest = writer + .close() + .await + .map_err(|e| Error::BlobFinalize(path.to_path_buf(), e))?; assert_eq!(digest, blob_digest, "Tvix bug: blob digest mismatch"); @@ -132,23 +161,36 @@ where } else { let mut writer = blob_service.open_write().await; - let size = tokio::io::copy(&mut entry, &mut writer).await?; + let size = tokio::io::copy(&mut entry, &mut writer) + .await + .map_err(|e| Error::BlobRead(path.to_path_buf(), e))?; - let digest = writer.close().await?; + let digest = writer + .close() + .await + .map_err(|e| Error::BlobFinalize(path.to_path_buf(), e))?; (size, digest) }; + let executable = entry + .header() + .mode() + .map_err(|e| Error::Mode(path.to_path_buf(), e))? + & 64 + != 0; + IngestionEntry::Regular { path, size, - executable: entry.header().mode()? & 64 != 0, + executable, digest, } } tokio_tar::EntryType::Symlink => IngestionEntry::Symlink { target: entry - .link_name()? + .link_name() + .map_err(|e| Error::LinkName(path.to_path_buf(), e))? .ok_or_else(|| Error::MissingSymlinkTarget(path.clone()))? 
.into_owned() .into_os_string() @@ -162,7 +204,7 @@ where tokio_tar::EntryType::XGlobalHeader | tokio_tar::EntryType::XHeader => continue, - entry_type => return Err(Error::UnsupportedTarEntry(path, entry_type)), + entry_type => return Err(Error::EntryType(path, entry_type).into()), }; nodes.add(entry)?; |