diff options
Diffstat (limited to 'tvix/castore')
-rw-r--r-- | tvix/castore/src/import/archive.rs | 82 | ||||
-rw-r--r-- | tvix/castore/src/import/error.rs | 43 | ||||
-rw-r--r-- | tvix/castore/src/import/fs.rs | 39 | ||||
-rw-r--r-- | tvix/castore/src/import/mod.rs | 20 |
4 files changed, 119 insertions, 65 deletions
diff --git a/tvix/castore/src/import/archive.rs b/tvix/castore/src/import/archive.rs index a3f63be49098..2da8b945d948 100644 --- a/tvix/castore/src/import/archive.rs +++ b/tvix/castore/src/import/archive.rs @@ -17,7 +17,7 @@ use tracing::{instrument, warn, Level}; use crate::blobservice::BlobService; use crate::directoryservice::DirectoryService; -use crate::import::{ingest_entries, Error as ImportError, IngestionEntry}; +use crate::import::{ingest_entries, IngestionEntry, IngestionError}; use crate::proto::node::Node; use crate::B3Digest; @@ -34,20 +34,39 @@ const MAX_TARBALL_BUFFER_SIZE: usize = 128 * 1024 * 1024; #[derive(Debug, thiserror::Error)] pub enum Error { - #[error("error reading archive entry: {0}")] - Io(#[from] std::io::Error), + #[error("unable to construct stream of entries: {0}")] + Entries(std::io::Error), + + #[error("unable to read next entry: {0}")] + NextEntry(std::io::Error), + + #[error("unable to read path for entry: {0}")] + Path(std::io::Error), + + #[error("unable to read size field for {0}: {1}")] + Size(PathBuf, std::io::Error), + + #[error("unable to read mode field for {0}: {1}")] + Mode(PathBuf, std::io::Error), + + #[error("unable to read link name field for {0}: {1}")] + LinkName(PathBuf, std::io::Error), + + #[error("unable to read blob contents for {0}: {1}")] + BlobRead(PathBuf, std::io::Error), + + // TODO: proper error for blob finalize + #[error("unable to finalize blob {0}: {1}")] + BlobFinalize(PathBuf, std::io::Error), #[error("unsupported tar entry {0} type: {1:?}")] - UnsupportedTarEntry(PathBuf, tokio_tar::EntryType), + EntryType(PathBuf, tokio_tar::EntryType), #[error("symlink missing target {0}")] MissingSymlinkTarget(PathBuf), #[error("unexpected number of top level directory entries")] UnexpectedNumberOfTopLevelEntries, - - #[error("failed to import into castore {0}")] - Import(#[from] ImportError), } /// Ingests elements from the given tar [`Archive`] into a the passed [`BlobService`] and @@ -57,7 +76,7 @@ pub async fn ingest_archive<BS, DS, R>( blob_service: BS, directory_service: DS, mut archive: Archive<R>, -) -> Result<Node, Error> +) -> Result<Node, IngestionError<Error>> where BS: BlobService + Clone + 'static, DS: AsRef<dyn DirectoryService>, @@ -73,16 +92,18 @@ where let semaphore = Arc::new(Semaphore::new(MAX_TARBALL_BUFFER_SIZE)); let mut async_blob_uploads: JoinSet<Result<(), Error>> = JoinSet::new(); - let mut entries_iter = archive.entries()?; - while let Some(mut entry) = entries_iter.try_next().await? { - let path: PathBuf = entry.path()?.into(); + let mut entries_iter = archive.entries().map_err(Error::Entries)?; + while let Some(mut entry) = entries_iter.try_next().await.map_err(Error::NextEntry)? { + let path: PathBuf = entry.path().map_err(Error::Path)?.into(); let header = entry.header(); let entry = match header.entry_type() { tokio_tar::EntryType::Regular | tokio_tar::EntryType::GNUSparse | tokio_tar::EntryType::Continuous => { - let header_size = header.size()?; + let header_size = header + .size() + .map_err(|e| Error::Size(path.to_path_buf(), e))?; // If the blob is small enough, read it off the wire, compute the digest, // and upload it to the [BlobService] in the background. @@ -103,7 +124,9 @@ where .acquire_many_owned(header_size as u32) .await .unwrap(); - let size = tokio::io::copy(&mut reader, &mut buffer).await?; + let size = tokio::io::copy(&mut reader, &mut buffer) + .await + .map_err(|e| Error::Size(path.to_path_buf(), e))?; let digest: B3Digest = hasher.finalize().as_bytes().into(); @@ -111,12 +134,18 @@ where let blob_service = blob_service.clone(); let digest = digest.clone(); async_blob_uploads.spawn({ + let path = path.clone(); async move { let mut writer = blob_service.open_write().await; - tokio::io::copy(&mut Cursor::new(buffer), &mut writer).await?; + tokio::io::copy(&mut Cursor::new(buffer), &mut writer) + .await + .map_err(|e| Error::BlobRead(path.to_path_buf(), e))?; - let blob_digest = writer.close().await?; + let blob_digest = writer + .close() + .await + .map_err(|e| Error::BlobFinalize(path.to_path_buf(), e))?; assert_eq!(digest, blob_digest, "Tvix bug: blob digest mismatch"); @@ -132,23 +161,36 @@ where } else { let mut writer = blob_service.open_write().await; - let size = tokio::io::copy(&mut entry, &mut writer).await?; + let size = tokio::io::copy(&mut entry, &mut writer) + .await + .map_err(|e| Error::BlobRead(path.to_path_buf(), e))?; - let digest = writer.close().await?; + let digest = writer + .close() + .await + .map_err(|e| Error::BlobFinalize(path.to_path_buf(), e))?; (size, digest) }; + let executable = entry + .header() + .mode() + .map_err(|e| Error::Mode(path.to_path_buf(), e))? + & 64 + != 0; + IngestionEntry::Regular { path, size, - executable: entry.header().mode()? & 64 != 0, + executable, digest, } } tokio_tar::EntryType::Symlink => IngestionEntry::Symlink { target: entry - .link_name()? + .link_name() + .map_err(|e| Error::LinkName(path.to_path_buf(), e))? .ok_or_else(|| Error::MissingSymlinkTarget(path.clone()))? .into_owned() .into_os_string() @@ -162,7 +204,7 @@ where tokio_tar::EntryType::XGlobalHeader | tokio_tar::EntryType::XHeader => continue, - entry_type => return Err(Error::UnsupportedTarEntry(path, entry_type)), + entry_type => return Err(Error::EntryType(path, entry_type).into()), }; nodes.add(entry)?; diff --git a/tvix/castore/src/import/error.rs b/tvix/castore/src/import/error.rs index 15dd0664deaa..3c6689dce50c 100644 --- a/tvix/castore/src/import/error.rs +++ b/tvix/castore/src/import/error.rs @@ -1,39 +1,20 @@ -use std::{fs::FileType, path::PathBuf}; +use std::path::PathBuf; use crate::Error as CastoreError; +/// Represents all error types that emitted by ingest_entries. +/// It can represent errors uploading individual Directories and finalizing +/// the upload. +/// It also contains a generic error kind that'll carry ingestion-method +/// specific errors. #[derive(Debug, thiserror::Error)] -pub enum Error { +pub enum IngestionError<E: std::fmt::Display> { + #[error("error from producer: {0}")] + Producer(#[from] E), + #[error("failed to upload directory at {0}: {1}")] UploadDirectoryError(PathBuf, CastoreError), - #[error("invalid encoding encountered for entry {0:?}")] - InvalidEncoding(PathBuf), - - #[error("unable to stat {0}: {1}")] - UnableToStat(PathBuf, std::io::Error), - - #[error("unable to open {0}: {1}")] - UnableToOpen(PathBuf, std::io::Error), - - #[error("unable to read {0}: {1}")] - UnableToRead(PathBuf, std::io::Error), - - #[error("unsupported file {0} type: {1:?}")] - UnsupportedFileType(PathBuf, FileType), -} - -impl From<CastoreError> for Error { - fn from(value: CastoreError) -> Self { - match value { - CastoreError::InvalidRequest(_) => panic!("tvix bug"), - CastoreError::StorageError(_) => panic!("error"), - } - } -} - -impl From<Error> for std::io::Error { - fn from(value: Error) -> Self { - std::io::Error::new(std::io::ErrorKind::Other, value) - } + #[error("failed to finalize directory upload: {0}")] + FinalizeDirectoryUpload(CastoreError), } diff --git a/tvix/castore/src/import/fs.rs b/tvix/castore/src/import/fs.rs index 6eab245f5500..9a0bb5855a90 100644 --- a/tvix/castore/src/import/fs.rs +++ b/tvix/castore/src/import/fs.rs @@ -2,10 +2,12 @@ use futures::stream::BoxStream; use futures::StreamExt; +use std::fs::FileType; use std::os::unix::ffi::OsStringExt; use std::os::unix::fs::MetadataExt; use std::os::unix::fs::PermissionsExt; use std::path::Path; +use std::path::PathBuf; use tracing::instrument; use walkdir::DirEntry; use walkdir::WalkDir; @@ -16,8 +18,8 @@ use crate::proto::node::Node; use crate::B3Digest; use super::ingest_entries; -use super::Error; use super::IngestionEntry; +use super::IngestionError; /// Ingests the contents at a given path into the tvix store, interacting with a [BlobService] and /// [DirectoryService]. It returns the root node or an error. @@ -31,7 +33,7 @@ pub async fn ingest_path<BS, DS, P>( blob_service: BS, directory_service: DS, path: P, -) -> Result<Node, Error> +) -> Result<Node, IngestionError<Error>> where P: AsRef<Path> + std::fmt::Debug, BS: BlobService + Clone, @@ -73,7 +75,7 @@ where Ok(dir_entry) => { dir_entry_to_ingestion_entry(blob_service, &dir_entry, prefix).await } - Err(e) => Err(Error::UnableToStat( + Err(e) => Err(Error::Stat( prefix.to_path_buf(), e.into_io_error().expect("walkdir err must be some"), )), @@ -109,7 +111,7 @@ where Ok(IngestionEntry::Dir { path }) } else if file_type.is_symlink() { let target = std::fs::read_link(entry.path()) - .map_err(|e| Error::UnableToStat(entry.path().to_path_buf(), e))? + .map_err(|e| Error::Stat(entry.path().to_path_buf(), e))? .into_os_string() .into_vec(); @@ -117,7 +119,7 @@ where } else if file_type.is_file() { let metadata = entry .metadata() - .map_err(|e| Error::UnableToStat(entry.path().to_path_buf(), e.into()))?; + .map_err(|e| Error::Stat(entry.path().to_path_buf(), e.into()))?; let digest = upload_blob(blob_service, entry.path().to_path_buf()).await?; @@ -130,7 +132,7 @@ where digest, }) } else { - return Err(Error::UnsupportedFileType(path, file_type)); + return Err(Error::FileType(path, file_type)); } } @@ -142,19 +144,38 @@ where { let mut file = match tokio::fs::File::open(path.as_ref()).await { Ok(file) => file, - Err(e) => return Err(Error::UnableToRead(path.as_ref().to_path_buf(), e)), + Err(e) => return Err(Error::BlobRead(path.as_ref().to_path_buf(), e)), }; let mut writer = blob_service.open_write().await; if let Err(e) = tokio::io::copy(&mut file, &mut writer).await { - return Err(Error::UnableToRead(path.as_ref().to_path_buf(), e)); + return Err(Error::BlobRead(path.as_ref().to_path_buf(), e)); }; let digest = writer .close() .await - .map_err(|e| Error::UnableToRead(path.as_ref().to_path_buf(), e))?; + .map_err(|e| Error::BlobFinalize(path.as_ref().to_path_buf(), e))?; Ok(digest) } + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("unsupported file type at {0}: {1:?}")] + FileType(PathBuf, FileType), + + #[error("unable to stat {0}: {1}")] + Stat(PathBuf, std::io::Error), + + #[error("unable to open {0}: {1}")] + Open(PathBuf, std::io::Error), + + #[error("unable to read {0}: {1}")] + BlobRead(PathBuf, std::io::Error), + + // TODO: proper error for blob finalize + #[error("unable to finalize blob {0}: {1}")] + BlobFinalize(PathBuf, std::io::Error), +} diff --git a/tvix/castore/src/import/mod.rs b/tvix/castore/src/import/mod.rs index d2234846dcbc..bf21001822e0 100644 --- a/tvix/castore/src/import/mod.rs +++ b/tvix/castore/src/import/mod.rs @@ -26,7 +26,7 @@ use std::{ use tracing::instrument; mod error; -pub use error::Error; +pub use error::IngestionError; pub mod archive; pub mod fs; @@ -49,10 +49,14 @@ pub mod fs; /// /// On success, returns the root node. #[instrument(skip_all, ret(level = Level::TRACE), err)] -pub async fn ingest_entries<DS, S>(directory_service: DS, mut entries: S) -> Result<Node, Error> +pub async fn ingest_entries<DS, S, E>( + directory_service: DS, + mut entries: S, +) -> Result<Node, IngestionError<E>> where DS: AsRef<dyn DirectoryService>, - S: Stream<Item = Result<IngestionEntry, Error>> + Send + std::marker::Unpin, + S: Stream<Item = Result<IngestionEntry, E>> + Send + std::marker::Unpin, + E: std::error::Error, { // For a given path, this holds the [Directory] structs as they are populated. let mut directories: HashMap<PathBuf, Directory> = HashMap::default(); @@ -102,7 +106,10 @@ where maybe_directory_putter .get_or_insert_with(|| directory_service.as_ref().put_multiple_start()) .put(directory) - .await?; + .await + .map_err(|e| { + IngestionError::UploadDirectoryError(entry.path().to_path_buf(), e) + })?; Node::Directory(DirectoryNode { name, @@ -147,7 +154,10 @@ where // they're all persisted to the backend. if let Some(mut directory_putter) = maybe_directory_putter { #[cfg_attr(not(debug_assertions), allow(unused))] - let root_directory_digest = directory_putter.close().await?; + let root_directory_digest = directory_putter + .close() + .await + .map_err(|e| IngestionError::FinalizeDirectoryUpload(e))?; #[cfg(debug_assertions)] { |