about summary refs log tree commit diff
path: root/tvix/castore/src/import/archive.rs
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2024-04-30T15·48+0300
committerclbot <clbot@tvl.fyi>2024-04-30T17·12+0000
commitc9d3946cb583631bc2ca4a1343f054f7ee64a626 (patch)
tree358102f0d0f938ea01ba905d7672c609995aadfe /tvix/castore/src/import/archive.rs
parent77546d734efe704f52a4c89b5159cb2d98d5a8aa (diff)
refactor(tvix/castore/import): restructure error types r/8048
Have ingest_entries return an Error type with only three kinds:

 - Error while uploading a specific Directory
 - Error while finalizing the directory upload
 - Error from the producer

Move all ingestion method-specific errors to the individual
implementations.

Change-Id: I2a015cb7ebc96d084cbe2b809f40d1b53a15daf3
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11557
Autosubmit: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Reviewed-by: Connor Brewster <cbrewster@hey.com>
Diffstat (limited to 'tvix/castore/src/import/archive.rs')
-rw-r--r--tvix/castore/src/import/archive.rs82
1 files changed, 62 insertions, 20 deletions
diff --git a/tvix/castore/src/import/archive.rs b/tvix/castore/src/import/archive.rs
index a3f63be49098..2da8b945d948 100644
--- a/tvix/castore/src/import/archive.rs
+++ b/tvix/castore/src/import/archive.rs
@@ -17,7 +17,7 @@ use tracing::{instrument, warn, Level};
 
 use crate::blobservice::BlobService;
 use crate::directoryservice::DirectoryService;
-use crate::import::{ingest_entries, Error as ImportError, IngestionEntry};
+use crate::import::{ingest_entries, IngestionEntry, IngestionError};
 use crate::proto::node::Node;
 use crate::B3Digest;
 
@@ -34,20 +34,39 @@ const MAX_TARBALL_BUFFER_SIZE: usize = 128 * 1024 * 1024;
 
 #[derive(Debug, thiserror::Error)]
 pub enum Error {
-    #[error("error reading archive entry: {0}")]
-    Io(#[from] std::io::Error),
+    #[error("unable to construct stream of entries: {0}")]
+    Entries(std::io::Error),
+
+    #[error("unable to read next entry: {0}")]
+    NextEntry(std::io::Error),
+
+    #[error("unable to read path for entry: {0}")]
+    Path(std::io::Error),
+
+    #[error("unable to read size field for {0}: {1}")]
+    Size(PathBuf, std::io::Error),
+
+    #[error("unable to read mode field for {0}: {1}")]
+    Mode(PathBuf, std::io::Error),
+
+    #[error("unable to read link name field for {0}: {1}")]
+    LinkName(PathBuf, std::io::Error),
+
+    #[error("unable to read blob contents for {0}: {1}")]
+    BlobRead(PathBuf, std::io::Error),
+
+    // TODO: proper error for blob finalize
+    #[error("unable to finalize blob {0}: {1}")]
+    BlobFinalize(PathBuf, std::io::Error),
 
     #[error("unsupported tar entry {0} type: {1:?}")]
-    UnsupportedTarEntry(PathBuf, tokio_tar::EntryType),
+    EntryType(PathBuf, tokio_tar::EntryType),
 
     #[error("symlink missing target {0}")]
     MissingSymlinkTarget(PathBuf),
 
     #[error("unexpected number of top level directory entries")]
     UnexpectedNumberOfTopLevelEntries,
-
-    #[error("failed to import into castore {0}")]
-    Import(#[from] ImportError),
 }
 
 /// Ingests elements from the given tar [`Archive`] into a the passed [`BlobService`] and
@@ -57,7 +76,7 @@ pub async fn ingest_archive<BS, DS, R>(
     blob_service: BS,
     directory_service: DS,
     mut archive: Archive<R>,
-) -> Result<Node, Error>
+) -> Result<Node, IngestionError<Error>>
 where
     BS: BlobService + Clone + 'static,
     DS: AsRef<dyn DirectoryService>,
@@ -73,16 +92,18 @@ where
     let semaphore = Arc::new(Semaphore::new(MAX_TARBALL_BUFFER_SIZE));
     let mut async_blob_uploads: JoinSet<Result<(), Error>> = JoinSet::new();
 
-    let mut entries_iter = archive.entries()?;
-    while let Some(mut entry) = entries_iter.try_next().await? {
-        let path: PathBuf = entry.path()?.into();
+    let mut entries_iter = archive.entries().map_err(Error::Entries)?;
+    while let Some(mut entry) = entries_iter.try_next().await.map_err(Error::NextEntry)? {
+        let path: PathBuf = entry.path().map_err(Error::Path)?.into();
 
         let header = entry.header();
         let entry = match header.entry_type() {
             tokio_tar::EntryType::Regular
             | tokio_tar::EntryType::GNUSparse
             | tokio_tar::EntryType::Continuous => {
-                let header_size = header.size()?;
+                let header_size = header
+                    .size()
+                    .map_err(|e| Error::Size(path.to_path_buf(), e))?;
 
                 // If the blob is small enough, read it off the wire, compute the digest,
                 // and upload it to the [BlobService] in the background.
@@ -103,7 +124,9 @@ where
                         .acquire_many_owned(header_size as u32)
                         .await
                         .unwrap();
-                    let size = tokio::io::copy(&mut reader, &mut buffer).await?;
+                    let size = tokio::io::copy(&mut reader, &mut buffer)
+                        .await
+                        .map_err(|e| Error::Size(path.to_path_buf(), e))?;
 
                     let digest: B3Digest = hasher.finalize().as_bytes().into();
 
@@ -111,12 +134,18 @@ where
                         let blob_service = blob_service.clone();
                         let digest = digest.clone();
                         async_blob_uploads.spawn({
+                            let path = path.clone();
                             async move {
                                 let mut writer = blob_service.open_write().await;
 
-                                tokio::io::copy(&mut Cursor::new(buffer), &mut writer).await?;
+                                tokio::io::copy(&mut Cursor::new(buffer), &mut writer)
+                                    .await
+                                    .map_err(|e| Error::BlobRead(path.to_path_buf(), e))?;
 
-                                let blob_digest = writer.close().await?;
+                                let blob_digest = writer
+                                    .close()
+                                    .await
+                                    .map_err(|e| Error::BlobFinalize(path.to_path_buf(), e))?;
 
                                 assert_eq!(digest, blob_digest, "Tvix bug: blob digest mismatch");
 
@@ -132,23 +161,36 @@ where
                 } else {
                     let mut writer = blob_service.open_write().await;
 
-                    let size = tokio::io::copy(&mut entry, &mut writer).await?;
+                    let size = tokio::io::copy(&mut entry, &mut writer)
+                        .await
+                        .map_err(|e| Error::BlobRead(path.to_path_buf(), e))?;
 
-                    let digest = writer.close().await?;
+                    let digest = writer
+                        .close()
+                        .await
+                        .map_err(|e| Error::BlobFinalize(path.to_path_buf(), e))?;
 
                     (size, digest)
                 };
 
+                let executable = entry
+                    .header()
+                    .mode()
+                    .map_err(|e| Error::Mode(path.to_path_buf(), e))?
+                    & 64
+                    != 0;
+
                 IngestionEntry::Regular {
                     path,
                     size,
-                    executable: entry.header().mode()? & 64 != 0,
+                    executable,
                     digest,
                 }
             }
             tokio_tar::EntryType::Symlink => IngestionEntry::Symlink {
                 target: entry
-                    .link_name()?
+                    .link_name()
+                    .map_err(|e| Error::LinkName(path.to_path_buf(), e))?
                     .ok_or_else(|| Error::MissingSymlinkTarget(path.clone()))?
                     .into_owned()
                     .into_os_string()
@@ -162,7 +204,7 @@ where
 
             tokio_tar::EntryType::XGlobalHeader | tokio_tar::EntryType::XHeader => continue,
 
-            entry_type => return Err(Error::UnsupportedTarEntry(path, entry_type)),
+            entry_type => return Err(Error::EntryType(path, entry_type).into()),
         };
 
         nodes.add(entry)?;