about summary refs log tree commit diff
path: root/tvix
diff options
context:
space:
mode:
authorConnor Brewster <cbrewster@hey.com>2024-04-21T22·40-0500
committerclbot <clbot@tvl.fyi>2024-04-24T15·41+0000
commitd2e67f021ecb54435d5ac57aa431364417eb59fa (patch)
tree84e23282225675365b64d7415fd2793e95be5aeb /tvix
parent79698c470cf9e043204741b727ce34041fcb1e32 (diff)
refactor(tvix/castore): add separate Error enum for archives r/8002
The `Error` enum for the `imports` crate has both filesystem and archive
specific errors and was starting to get messy.

This adds a separate `Error` enum for archive-specific errors and then
keeps a single `Archive` variant in the top-level import `Error` for all
archive errors.

Change-Id: I4cd0746c864e5ec50b1aa68c0630ef9cd05176c7
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11498
Tested-by: BuildkiteCI
Autosubmit: Connor Brewster <cbrewster@hey.com>
Reviewed-by: flokli <flokli@flokli.de>
Diffstat (limited to 'tvix')
-rw-r--r--tvix/castore/src/import/archive.rs55
-rw-r--r--tvix/castore/src/import/error.rs12
-rw-r--r--tvix/glue/src/builtins/errors.rs3
3 files changed, 37 insertions, 33 deletions
diff --git a/tvix/castore/src/import/archive.rs b/tvix/castore/src/import/archive.rs
index 84e280fafb..adcfb871d5 100644
--- a/tvix/castore/src/import/archive.rs
+++ b/tvix/castore/src/import/archive.rs
@@ -15,7 +15,7 @@ use tracing::{instrument, warn, Level};
 
 use crate::blobservice::BlobService;
 use crate::directoryservice::DirectoryService;
-use crate::import::{ingest_entries, Error, IngestionEntry};
+use crate::import::{ingest_entries, Error as ImportError, IngestionEntry};
 use crate::proto::node::Node;
 use crate::B3Digest;
 
@@ -30,6 +30,24 @@ const CONCURRENT_BLOB_UPLOAD_THRESHOLD: u32 = 1024 * 1024;
 /// The maximum amount of bytes allowed to be buffered in memory to perform async blob uploads.
 const MAX_TARBALL_BUFFER_SIZE: usize = 128 * 1024 * 1024;
 
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+    #[error("error reading archive entry: {0}")]
+    Io(#[from] std::io::Error),
+
+    #[error("unsupported tar entry {0} type: {1:?}")]
+    UnsupportedTarEntry(PathBuf, tokio_tar::EntryType),
+
+    #[error("symlink missing target {0}")]
+    MissingSymlinkTarget(PathBuf),
+
+    #[error("unexpected number of top level directory entries")]
+    UnexpectedNumberOfTopLevelEntries,
+
+    #[error("failed to import into castore {0}")]
+    Import(#[from] ImportError),
+}
+
 /// Ingests elements from the given tar [`Archive`] into the passed [`BlobService`] and
 /// [`DirectoryService`].
 #[instrument(skip_all, ret(level = Level::TRACE), err)]
@@ -53,16 +71,16 @@ where
     let semaphore = Arc::new(Semaphore::new(MAX_TARBALL_BUFFER_SIZE));
     let mut async_blob_uploads: JoinSet<Result<(), Error>> = JoinSet::new();
 
-    let mut entries_iter = archive.entries().map_err(Error::Archive)?;
-    while let Some(mut entry) = entries_iter.try_next().await.map_err(Error::Archive)? {
-        let path: PathBuf = entry.path().map_err(Error::Archive)?.into();
+    let mut entries_iter = archive.entries()?;
+    while let Some(mut entry) = entries_iter.try_next().await? {
+        let path: PathBuf = entry.path()?.into();
 
         let header = entry.header();
         let entry = match header.entry_type() {
             tokio_tar::EntryType::Regular
             | tokio_tar::EntryType::GNUSparse
             | tokio_tar::EntryType::Continuous => {
-                let header_size = header.size().map_err(Error::Archive)?;
+                let header_size = header.size()?;
 
                 // If the blob is small enough, read it off the wire, compute the digest,
                 // and upload it to the [BlobService] in the background.
@@ -83,9 +101,7 @@ where
                         .acquire_many_owned(header_size as u32)
                         .await
                         .unwrap();
-                    let size = tokio::io::copy(&mut reader, &mut buffer)
-                        .await
-                        .map_err(Error::Archive)?;
+                    let size = tokio::io::copy(&mut reader, &mut buffer).await?;
 
                     let digest: B3Digest = hasher.finalize().as_bytes().into();
 
@@ -96,11 +112,9 @@ where
                             async move {
                                 let mut writer = blob_service.open_write().await;
 
-                                tokio::io::copy(&mut Cursor::new(buffer), &mut writer)
-                                    .await
-                                    .map_err(Error::Archive)?;
+                                tokio::io::copy(&mut Cursor::new(buffer), &mut writer).await?;
 
-                                let blob_digest = writer.close().await.map_err(Error::Archive)?;
+                                let blob_digest = writer.close().await?;
 
                                 assert_eq!(digest, blob_digest, "Tvix bug: blob digest mismatch");
 
@@ -116,11 +130,9 @@ where
                 } else {
                     let mut writer = blob_service.open_write().await;
 
-                    let size = tokio::io::copy(&mut entry, &mut writer)
-                        .await
-                        .map_err(Error::Archive)?;
+                    let size = tokio::io::copy(&mut entry, &mut writer).await?;
 
-                    let digest = writer.close().await.map_err(Error::Archive)?;
+                    let digest = writer.close().await?;
 
                     (size, digest)
                 };
@@ -128,14 +140,13 @@ where
                 IngestionEntry::Regular {
                     path,
                     size,
-                    executable: entry.header().mode().map_err(Error::Archive)? & 64 != 0,
+                    executable: entry.header().mode()? & 64 != 0,
                     digest,
                 }
             }
             tokio_tar::EntryType::Symlink => IngestionEntry::Symlink {
                 target: entry
-                    .link_name()
-                    .map_err(Error::Archive)?
+                    .link_name()?
                     .ok_or_else(|| Error::MissingSymlinkTarget(path.clone()))?
                     .into(),
                 path,
@@ -157,11 +168,13 @@ where
         result.expect("task panicked")?;
     }
 
-    ingest_entries(
+    let root_node = ingest_entries(
         directory_service,
         futures::stream::iter(nodes.finalize()?.into_iter().map(Ok)),
     )
-    .await
+    .await?;
+
+    Ok(root_node)
 }
 
 /// Keep track of the directory structure of a file tree being ingested. This is used
diff --git a/tvix/castore/src/import/error.rs b/tvix/castore/src/import/error.rs
index 8cd4f95ffb..15dd0664de 100644
--- a/tvix/castore/src/import/error.rs
+++ b/tvix/castore/src/import/error.rs
@@ -19,20 +19,8 @@ pub enum Error {
     #[error("unable to read {0}: {1}")]
     UnableToRead(PathBuf, std::io::Error),
 
-    #[error("error reading from archive: {0}")]
-    Archive(std::io::Error),
-
     #[error("unsupported file {0} type: {1:?}")]
     UnsupportedFileType(PathBuf, FileType),
-
-    #[error("unsupported tar entry {0} type: {1:?}")]
-    UnsupportedTarEntry(PathBuf, tokio_tar::EntryType),
-
-    #[error("symlink missing target {0}")]
-    MissingSymlinkTarget(PathBuf),
-
-    #[error("unexpected number of top level directory entries")]
-    UnexpectedNumberOfTopLevelEntries,
 }
 
 impl From<CastoreError> for Error {
diff --git a/tvix/glue/src/builtins/errors.rs b/tvix/glue/src/builtins/errors.rs
index 5e36bc1a24..c05d366f13 100644
--- a/tvix/glue/src/builtins/errors.rs
+++ b/tvix/glue/src/builtins/errors.rs
@@ -54,6 +54,9 @@ pub enum FetcherError {
     #[error(transparent)]
     Import(#[from] tvix_castore::import::Error),
 
+    #[error(transparent)]
+    ImportArchive(#[from] tvix_castore::import::archive::Error),
+
     #[error("Error calculating store path for fetcher output: {0}")]
     StorePath(#[from] BuildStorePathError),
 }