about summary refs log tree commit diff
path: root/tvix/castore/src/import/fs.rs
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2024-04-30T15·48+0300
committerclbot <clbot@tvl.fyi>2024-04-30T17·12+0000
commitc9d3946cb583631bc2ca4a1343f054f7ee64a626 (patch)
tree358102f0d0f938ea01ba905d7672c609995aadfe /tvix/castore/src/import/fs.rs
parent77546d734efe704f52a4c89b5159cb2d98d5a8aa (diff)
refactor(tvix/castore/import): restructure error types r/8048
Have ingest_entries return an Error type with only three kinds:

 - Error while uploading a specific Directory
 - Error while finalizing the directory upload
 - Error from the producer

Move all ingestion method-specific errors to the individual
implementations.

Change-Id: I2a015cb7ebc96d084cbe2b809f40d1b53a15daf3
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11557
Autosubmit: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Reviewed-by: Connor Brewster <cbrewster@hey.com>
Diffstat (limited to 'tvix/castore/src/import/fs.rs')
-rw-r--r--tvix/castore/src/import/fs.rs39
1 files changed, 30 insertions, 9 deletions
diff --git a/tvix/castore/src/import/fs.rs b/tvix/castore/src/import/fs.rs
index 6eab245f5500..9a0bb5855a90 100644
--- a/tvix/castore/src/import/fs.rs
+++ b/tvix/castore/src/import/fs.rs
@@ -2,10 +2,12 @@
 
 use futures::stream::BoxStream;
 use futures::StreamExt;
+use std::fs::FileType;
 use std::os::unix::ffi::OsStringExt;
 use std::os::unix::fs::MetadataExt;
 use std::os::unix::fs::PermissionsExt;
 use std::path::Path;
+use std::path::PathBuf;
 use tracing::instrument;
 use walkdir::DirEntry;
 use walkdir::WalkDir;
@@ -16,8 +18,8 @@ use crate::proto::node::Node;
 use crate::B3Digest;
 
 use super::ingest_entries;
-use super::Error;
 use super::IngestionEntry;
+use super::IngestionError;
 
 /// Ingests the contents at a given path into the tvix store, interacting with a [BlobService] and
 /// [DirectoryService]. It returns the root node or an error.
@@ -31,7 +33,7 @@ pub async fn ingest_path<BS, DS, P>(
     blob_service: BS,
     directory_service: DS,
     path: P,
-) -> Result<Node, Error>
+) -> Result<Node, IngestionError<Error>>
 where
     P: AsRef<Path> + std::fmt::Debug,
     BS: BlobService + Clone,
@@ -73,7 +75,7 @@ where
                         Ok(dir_entry) => {
                             dir_entry_to_ingestion_entry(blob_service, &dir_entry, prefix).await
                         }
-                        Err(e) => Err(Error::UnableToStat(
+                        Err(e) => Err(Error::Stat(
                             prefix.to_path_buf(),
                             e.into_io_error().expect("walkdir err must be some"),
                         )),
@@ -109,7 +111,7 @@ where
         Ok(IngestionEntry::Dir { path })
     } else if file_type.is_symlink() {
         let target = std::fs::read_link(entry.path())
-            .map_err(|e| Error::UnableToStat(entry.path().to_path_buf(), e))?
+            .map_err(|e| Error::Stat(entry.path().to_path_buf(), e))?
             .into_os_string()
             .into_vec();
 
@@ -117,7 +119,7 @@ where
     } else if file_type.is_file() {
         let metadata = entry
             .metadata()
-            .map_err(|e| Error::UnableToStat(entry.path().to_path_buf(), e.into()))?;
+            .map_err(|e| Error::Stat(entry.path().to_path_buf(), e.into()))?;
 
         let digest = upload_blob(blob_service, entry.path().to_path_buf()).await?;
 
@@ -130,7 +132,7 @@ where
             digest,
         })
     } else {
-        return Err(Error::UnsupportedFileType(path, file_type));
+        return Err(Error::FileType(path, file_type));
     }
 }
 
@@ -142,19 +144,38 @@ where
 {
     let mut file = match tokio::fs::File::open(path.as_ref()).await {
         Ok(file) => file,
-        Err(e) => return Err(Error::UnableToRead(path.as_ref().to_path_buf(), e)),
+        Err(e) => return Err(Error::BlobRead(path.as_ref().to_path_buf(), e)),
     };
 
     let mut writer = blob_service.open_write().await;
 
     if let Err(e) = tokio::io::copy(&mut file, &mut writer).await {
-        return Err(Error::UnableToRead(path.as_ref().to_path_buf(), e));
+        return Err(Error::BlobRead(path.as_ref().to_path_buf(), e));
     };
 
     let digest = writer
         .close()
         .await
-        .map_err(|e| Error::UnableToRead(path.as_ref().to_path_buf(), e))?;
+        .map_err(|e| Error::BlobFinalize(path.as_ref().to_path_buf(), e))?;
 
     Ok(digest)
 }
+
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+    #[error("unsupported file type at {0}: {1:?}")]
+    FileType(PathBuf, FileType),
+
+    #[error("unable to stat {0}: {1}")]
+    Stat(PathBuf, std::io::Error),
+
+    #[error("unable to open {0}: {1}")]
+    Open(PathBuf, std::io::Error),
+
+    #[error("unable to read {0}: {1}")]
+    BlobRead(PathBuf, std::io::Error),
+
+    // TODO: proper error for blob finalize
+    #[error("unable to finalize blob {0}: {1}")]
+    BlobFinalize(PathBuf, std::io::Error),
+}