about summary refs log tree commit diff
path: root/tvix/castore
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/castore')
-rw-r--r--tvix/castore/src/import/archive.rs82
-rw-r--r--tvix/castore/src/import/error.rs43
-rw-r--r--tvix/castore/src/import/fs.rs39
-rw-r--r--tvix/castore/src/import/mod.rs20
4 files changed, 119 insertions, 65 deletions
diff --git a/tvix/castore/src/import/archive.rs b/tvix/castore/src/import/archive.rs
index a3f63be49098..2da8b945d948 100644
--- a/tvix/castore/src/import/archive.rs
+++ b/tvix/castore/src/import/archive.rs
@@ -17,7 +17,7 @@ use tracing::{instrument, warn, Level};
 
 use crate::blobservice::BlobService;
 use crate::directoryservice::DirectoryService;
-use crate::import::{ingest_entries, Error as ImportError, IngestionEntry};
+use crate::import::{ingest_entries, IngestionEntry, IngestionError};
 use crate::proto::node::Node;
 use crate::B3Digest;
 
@@ -34,20 +34,39 @@ const MAX_TARBALL_BUFFER_SIZE: usize = 128 * 1024 * 1024;
 
 #[derive(Debug, thiserror::Error)]
 pub enum Error {
-    #[error("error reading archive entry: {0}")]
-    Io(#[from] std::io::Error),
+    #[error("unable to construct stream of entries: {0}")]
+    Entries(std::io::Error),
+
+    #[error("unable to read next entry: {0}")]
+    NextEntry(std::io::Error),
+
+    #[error("unable to read path for entry: {0}")]
+    Path(std::io::Error),
+
+    #[error("unable to read size field for {0}: {1}")]
+    Size(PathBuf, std::io::Error),
+
+    #[error("unable to read mode field for {0}: {1}")]
+    Mode(PathBuf, std::io::Error),
+
+    #[error("unable to read link name field for {0}: {1}")]
+    LinkName(PathBuf, std::io::Error),
+
+    #[error("unable to read blob contents for {0}: {1}")]
+    BlobRead(PathBuf, std::io::Error),
+
+    // TODO: proper error for blob finalize
+    #[error("unable to finalize blob {0}: {1}")]
+    BlobFinalize(PathBuf, std::io::Error),
 
     #[error("unsupported tar entry {0} type: {1:?}")]
-    UnsupportedTarEntry(PathBuf, tokio_tar::EntryType),
+    EntryType(PathBuf, tokio_tar::EntryType),
 
     #[error("symlink missing target {0}")]
     MissingSymlinkTarget(PathBuf),
 
     #[error("unexpected number of top level directory entries")]
     UnexpectedNumberOfTopLevelEntries,
-
-    #[error("failed to import into castore {0}")]
-    Import(#[from] ImportError),
 }
 
 /// Ingests elements from the given tar [`Archive`] into a the passed [`BlobService`] and
@@ -57,7 +76,7 @@ pub async fn ingest_archive<BS, DS, R>(
     blob_service: BS,
     directory_service: DS,
     mut archive: Archive<R>,
-) -> Result<Node, Error>
+) -> Result<Node, IngestionError<Error>>
 where
     BS: BlobService + Clone + 'static,
     DS: AsRef<dyn DirectoryService>,
@@ -73,16 +92,18 @@ where
     let semaphore = Arc::new(Semaphore::new(MAX_TARBALL_BUFFER_SIZE));
     let mut async_blob_uploads: JoinSet<Result<(), Error>> = JoinSet::new();
 
-    let mut entries_iter = archive.entries()?;
-    while let Some(mut entry) = entries_iter.try_next().await? {
-        let path: PathBuf = entry.path()?.into();
+    let mut entries_iter = archive.entries().map_err(Error::Entries)?;
+    while let Some(mut entry) = entries_iter.try_next().await.map_err(Error::NextEntry)? {
+        let path: PathBuf = entry.path().map_err(Error::Path)?.into();
 
         let header = entry.header();
         let entry = match header.entry_type() {
             tokio_tar::EntryType::Regular
             | tokio_tar::EntryType::GNUSparse
             | tokio_tar::EntryType::Continuous => {
-                let header_size = header.size()?;
+                let header_size = header
+                    .size()
+                    .map_err(|e| Error::Size(path.to_path_buf(), e))?;
 
                 // If the blob is small enough, read it off the wire, compute the digest,
                 // and upload it to the [BlobService] in the background.
@@ -103,7 +124,9 @@ where
                         .acquire_many_owned(header_size as u32)
                         .await
                         .unwrap();
-                    let size = tokio::io::copy(&mut reader, &mut buffer).await?;
+                    let size = tokio::io::copy(&mut reader, &mut buffer)
+                        .await
+                        .map_err(|e| Error::Size(path.to_path_buf(), e))?;
 
                     let digest: B3Digest = hasher.finalize().as_bytes().into();
 
@@ -111,12 +134,18 @@ where
                         let blob_service = blob_service.clone();
                         let digest = digest.clone();
                         async_blob_uploads.spawn({
+                            let path = path.clone();
                             async move {
                                 let mut writer = blob_service.open_write().await;
 
-                                tokio::io::copy(&mut Cursor::new(buffer), &mut writer).await?;
+                                tokio::io::copy(&mut Cursor::new(buffer), &mut writer)
+                                    .await
+                                    .map_err(|e| Error::BlobRead(path.to_path_buf(), e))?;
 
-                                let blob_digest = writer.close().await?;
+                                let blob_digest = writer
+                                    .close()
+                                    .await
+                                    .map_err(|e| Error::BlobFinalize(path.to_path_buf(), e))?;
 
                                 assert_eq!(digest, blob_digest, "Tvix bug: blob digest mismatch");
 
@@ -132,23 +161,36 @@ where
                 } else {
                     let mut writer = blob_service.open_write().await;
 
-                    let size = tokio::io::copy(&mut entry, &mut writer).await?;
+                    let size = tokio::io::copy(&mut entry, &mut writer)
+                        .await
+                        .map_err(|e| Error::BlobRead(path.to_path_buf(), e))?;
 
-                    let digest = writer.close().await?;
+                    let digest = writer
+                        .close()
+                        .await
+                        .map_err(|e| Error::BlobFinalize(path.to_path_buf(), e))?;
 
                     (size, digest)
                 };
 
+                let executable = entry
+                    .header()
+                    .mode()
+                    .map_err(|e| Error::Mode(path.to_path_buf(), e))?
+                    & 64
+                    != 0;
+
                 IngestionEntry::Regular {
                     path,
                     size,
-                    executable: entry.header().mode()? & 64 != 0,
+                    executable,
                     digest,
                 }
             }
             tokio_tar::EntryType::Symlink => IngestionEntry::Symlink {
                 target: entry
-                    .link_name()?
+                    .link_name()
+                    .map_err(|e| Error::LinkName(path.to_path_buf(), e))?
                     .ok_or_else(|| Error::MissingSymlinkTarget(path.clone()))?
                     .into_owned()
                     .into_os_string()
@@ -162,7 +204,7 @@ where
 
             tokio_tar::EntryType::XGlobalHeader | tokio_tar::EntryType::XHeader => continue,
 
-            entry_type => return Err(Error::UnsupportedTarEntry(path, entry_type)),
+            entry_type => return Err(Error::EntryType(path, entry_type).into()),
         };
 
         nodes.add(entry)?;
diff --git a/tvix/castore/src/import/error.rs b/tvix/castore/src/import/error.rs
index 15dd0664deaa..3c6689dce50c 100644
--- a/tvix/castore/src/import/error.rs
+++ b/tvix/castore/src/import/error.rs
@@ -1,39 +1,20 @@
-use std::{fs::FileType, path::PathBuf};
+use std::path::PathBuf;
 
 use crate::Error as CastoreError;
 
+/// Represents all error types that emitted by ingest_entries.
+/// It can represent errors uploading individual Directories and finalizing
+/// the upload.
+/// It also contains a generic error kind that'll carry ingestion-method
+/// specific errors.
 #[derive(Debug, thiserror::Error)]
-pub enum Error {
+pub enum IngestionError<E: std::fmt::Display> {
+    #[error("error from producer: {0}")]
+    Producer(#[from] E),
+
     #[error("failed to upload directory at {0}: {1}")]
     UploadDirectoryError(PathBuf, CastoreError),
 
-    #[error("invalid encoding encountered for entry {0:?}")]
-    InvalidEncoding(PathBuf),
-
-    #[error("unable to stat {0}: {1}")]
-    UnableToStat(PathBuf, std::io::Error),
-
-    #[error("unable to open {0}: {1}")]
-    UnableToOpen(PathBuf, std::io::Error),
-
-    #[error("unable to read {0}: {1}")]
-    UnableToRead(PathBuf, std::io::Error),
-
-    #[error("unsupported file {0} type: {1:?}")]
-    UnsupportedFileType(PathBuf, FileType),
-}
-
-impl From<CastoreError> for Error {
-    fn from(value: CastoreError) -> Self {
-        match value {
-            CastoreError::InvalidRequest(_) => panic!("tvix bug"),
-            CastoreError::StorageError(_) => panic!("error"),
-        }
-    }
-}
-
-impl From<Error> for std::io::Error {
-    fn from(value: Error) -> Self {
-        std::io::Error::new(std::io::ErrorKind::Other, value)
-    }
+    #[error("failed to finalize directory upload: {0}")]
+    FinalizeDirectoryUpload(CastoreError),
 }
diff --git a/tvix/castore/src/import/fs.rs b/tvix/castore/src/import/fs.rs
index 6eab245f5500..9a0bb5855a90 100644
--- a/tvix/castore/src/import/fs.rs
+++ b/tvix/castore/src/import/fs.rs
@@ -2,10 +2,12 @@
 
 use futures::stream::BoxStream;
 use futures::StreamExt;
+use std::fs::FileType;
 use std::os::unix::ffi::OsStringExt;
 use std::os::unix::fs::MetadataExt;
 use std::os::unix::fs::PermissionsExt;
 use std::path::Path;
+use std::path::PathBuf;
 use tracing::instrument;
 use walkdir::DirEntry;
 use walkdir::WalkDir;
@@ -16,8 +18,8 @@ use crate::proto::node::Node;
 use crate::B3Digest;
 
 use super::ingest_entries;
-use super::Error;
 use super::IngestionEntry;
+use super::IngestionError;
 
 /// Ingests the contents at a given path into the tvix store, interacting with a [BlobService] and
 /// [DirectoryService]. It returns the root node or an error.
@@ -31,7 +33,7 @@ pub async fn ingest_path<BS, DS, P>(
     blob_service: BS,
     directory_service: DS,
     path: P,
-) -> Result<Node, Error>
+) -> Result<Node, IngestionError<Error>>
 where
     P: AsRef<Path> + std::fmt::Debug,
     BS: BlobService + Clone,
@@ -73,7 +75,7 @@ where
                         Ok(dir_entry) => {
                             dir_entry_to_ingestion_entry(blob_service, &dir_entry, prefix).await
                         }
-                        Err(e) => Err(Error::UnableToStat(
+                        Err(e) => Err(Error::Stat(
                             prefix.to_path_buf(),
                             e.into_io_error().expect("walkdir err must be some"),
                         )),
@@ -109,7 +111,7 @@ where
         Ok(IngestionEntry::Dir { path })
     } else if file_type.is_symlink() {
         let target = std::fs::read_link(entry.path())
-            .map_err(|e| Error::UnableToStat(entry.path().to_path_buf(), e))?
+            .map_err(|e| Error::Stat(entry.path().to_path_buf(), e))?
             .into_os_string()
             .into_vec();
 
@@ -117,7 +119,7 @@ where
     } else if file_type.is_file() {
         let metadata = entry
             .metadata()
-            .map_err(|e| Error::UnableToStat(entry.path().to_path_buf(), e.into()))?;
+            .map_err(|e| Error::Stat(entry.path().to_path_buf(), e.into()))?;
 
         let digest = upload_blob(blob_service, entry.path().to_path_buf()).await?;
 
@@ -130,7 +132,7 @@ where
             digest,
         })
     } else {
-        return Err(Error::UnsupportedFileType(path, file_type));
+        return Err(Error::FileType(path, file_type));
     }
 }
 
@@ -142,19 +144,38 @@ where
 {
     let mut file = match tokio::fs::File::open(path.as_ref()).await {
         Ok(file) => file,
-        Err(e) => return Err(Error::UnableToRead(path.as_ref().to_path_buf(), e)),
+        Err(e) => return Err(Error::BlobRead(path.as_ref().to_path_buf(), e)),
     };
 
     let mut writer = blob_service.open_write().await;
 
     if let Err(e) = tokio::io::copy(&mut file, &mut writer).await {
-        return Err(Error::UnableToRead(path.as_ref().to_path_buf(), e));
+        return Err(Error::BlobRead(path.as_ref().to_path_buf(), e));
     };
 
     let digest = writer
         .close()
         .await
-        .map_err(|e| Error::UnableToRead(path.as_ref().to_path_buf(), e))?;
+        .map_err(|e| Error::BlobFinalize(path.as_ref().to_path_buf(), e))?;
 
     Ok(digest)
 }
+
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+    #[error("unsupported file type at {0}: {1:?}")]
+    FileType(PathBuf, FileType),
+
+    #[error("unable to stat {0}: {1}")]
+    Stat(PathBuf, std::io::Error),
+
+    #[error("unable to open {0}: {1}")]
+    Open(PathBuf, std::io::Error),
+
+    #[error("unable to read {0}: {1}")]
+    BlobRead(PathBuf, std::io::Error),
+
+    // TODO: proper error for blob finalize
+    #[error("unable to finalize blob {0}: {1}")]
+    BlobFinalize(PathBuf, std::io::Error),
+}
diff --git a/tvix/castore/src/import/mod.rs b/tvix/castore/src/import/mod.rs
index d2234846dcbc..bf21001822e0 100644
--- a/tvix/castore/src/import/mod.rs
+++ b/tvix/castore/src/import/mod.rs
@@ -26,7 +26,7 @@ use std::{
 use tracing::instrument;
 
 mod error;
-pub use error::Error;
+pub use error::IngestionError;
 
 pub mod archive;
 pub mod fs;
@@ -49,10 +49,14 @@ pub mod fs;
 ///
 /// On success, returns the root node.
 #[instrument(skip_all, ret(level = Level::TRACE), err)]
-pub async fn ingest_entries<DS, S>(directory_service: DS, mut entries: S) -> Result<Node, Error>
+pub async fn ingest_entries<DS, S, E>(
+    directory_service: DS,
+    mut entries: S,
+) -> Result<Node, IngestionError<E>>
 where
     DS: AsRef<dyn DirectoryService>,
-    S: Stream<Item = Result<IngestionEntry, Error>> + Send + std::marker::Unpin,
+    S: Stream<Item = Result<IngestionEntry, E>> + Send + std::marker::Unpin,
+    E: std::error::Error,
 {
     // For a given path, this holds the [Directory] structs as they are populated.
     let mut directories: HashMap<PathBuf, Directory> = HashMap::default();
@@ -102,7 +106,10 @@ where
                 maybe_directory_putter
                     .get_or_insert_with(|| directory_service.as_ref().put_multiple_start())
                     .put(directory)
-                    .await?;
+                    .await
+                    .map_err(|e| {
+                        IngestionError::UploadDirectoryError(entry.path().to_path_buf(), e)
+                    })?;
 
                 Node::Directory(DirectoryNode {
                     name,
@@ -147,7 +154,10 @@ where
     // they're all persisted to the backend.
     if let Some(mut directory_putter) = maybe_directory_putter {
         #[cfg_attr(not(debug_assertions), allow(unused))]
-        let root_directory_digest = directory_putter.close().await?;
+        let root_directory_digest = directory_putter
+            .close()
+            .await
+            .map_err(|e| IngestionError::FinalizeDirectoryUpload(e))?;
 
         #[cfg(debug_assertions)]
         {