diff options
Diffstat (limited to 'tvix/castore/src/import')
-rw-r--r-- | tvix/castore/src/import/archive.rs | 201 | ||||
-rw-r--r-- | tvix/castore/src/import/error.rs | 11 | ||||
-rw-r--r-- | tvix/castore/src/import/mod.rs | 2 |
3 files changed, 212 insertions, 2 deletions
diff --git a/tvix/castore/src/import/archive.rs b/tvix/castore/src/import/archive.rs new file mode 100644 index 000000000000..d0ae3c67411c --- /dev/null +++ b/tvix/castore/src/import/archive.rs @@ -0,0 +1,201 @@ +#[cfg(target_family = "unix")] +use std::os::unix::ffi::OsStrExt; +use std::{ + collections::HashMap, + path::{Path, PathBuf}, +}; + +use tokio::io::AsyncRead; +use tokio_stream::StreamExt; +use tokio_tar::Archive; +use tracing::{instrument, Level}; + +use crate::{ + blobservice::BlobService, + directoryservice::{DirectoryPutter, DirectoryService}, + import::Error, + proto::{node::Node, Directory, DirectoryNode, FileNode, SymlinkNode}, +}; + +/// Ingests elements from the given tar [`Archive`] into a the passed [`BlobService`] and +/// [`DirectoryService`]. +#[instrument(skip_all, ret(level = Level::TRACE), err)] +pub async fn ingest_archive<'a, BS, DS, R>( + blob_service: BS, + directory_service: DS, + mut archive: Archive<R>, +) -> Result<Node, Error> +where + BS: AsRef<dyn BlobService> + Clone, + DS: AsRef<dyn DirectoryService>, + R: AsyncRead + Unpin, +{ + // Since tarballs can have entries in any arbitrary order, we need to + // buffer all of the directory metadata so we can reorder directory + // contents and entries to meet the requires of the castore. + + // In the first phase, collect up all the regular files and symlinks. + let mut paths = HashMap::new(); + let mut entries = archive.entries().map_err(Error::Archive)?; + while let Some(mut entry) = entries.try_next().await.map_err(Error::Archive)? { + let path = entry.path().map_err(Error::Archive)?.into_owned(); + let name = path + .file_name() + .ok_or_else(|| { + Error::Archive(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "invalid filename in archive", + )) + })? + .as_bytes() + .to_vec() + .into(); + + let node = match entry.header().entry_type() { + tokio_tar::EntryType::Regular + | tokio_tar::EntryType::GNUSparse + | tokio_tar::EntryType::Continuous => { + // TODO: If the same path is overwritten in the tarball, we may leave + // an unreferenced blob after uploading. + let mut writer = blob_service.as_ref().open_write().await; + let size = tokio::io::copy(&mut entry, &mut writer) + .await + .map_err(Error::Archive)?; + let digest = writer.close().await.map_err(Error::Archive)?; + Node::File(FileNode { + name, + digest: digest.into(), + size, + executable: entry.header().mode().map_err(Error::Archive)? & 64 != 0, + }) + } + tokio_tar::EntryType::Symlink => Node::Symlink(SymlinkNode { + name, + target: entry + .link_name() + .map_err(Error::Archive)? + .expect("symlink missing target") + .as_os_str() + .as_bytes() + .to_vec() + .into(), + }), + // Push a bogus directory marker so we can make sure this directoy gets + // created. We don't know the digest and size until after reading the full + // tarball. + tokio_tar::EntryType::Directory => Node::Directory(DirectoryNode { + name, + digest: Default::default(), + size: 0, + }), + + tokio_tar::EntryType::XGlobalHeader | tokio_tar::EntryType::XHeader => continue, + + entry_type => return Err(Error::UnsupportedTarEntry(path, entry_type)), + }; + + paths.insert(path, node); + } + + // In the second phase, construct all of the directories. + + // Collect into a list and then sort so all entries in the same directory + // are next to each other. + // We can detect boundaries between each directories to determine + // when to construct or push directory entries. + let mut ordered_paths = paths.into_iter().collect::<Vec<_>>(); + ordered_paths.sort_by(|a, b| a.0.cmp(&b.0)); + + let mut directory_putter = directory_service.as_ref().put_multiple_start(); + + // Start with an initial directory at the root. + let mut dir_stack = vec![(PathBuf::from(""), Directory::default())]; + + async fn pop_directory( + dir_stack: &mut Vec<(PathBuf, Directory)>, + directory_putter: &mut Box<dyn DirectoryPutter>, + ) -> Result<DirectoryNode, Error> { + let (path, directory) = dir_stack.pop().unwrap(); + + directory + .validate() + .map_err(|e| Error::InvalidDirectory(path.to_path_buf(), e))?; + + let dir_node = DirectoryNode { + name: path + .file_name() + .unwrap_or_default() + .as_bytes() + .to_vec() + .into(), + digest: directory.digest().into(), + size: directory.size(), + }; + + if let Some((_, parent)) = dir_stack.last_mut() { + parent.directories.push(dir_node.clone()); + } + + directory_putter.put(directory).await?; + + Ok(dir_node) + } + + fn push_directories(path: &Path, dir_stack: &mut Vec<(PathBuf, Directory)>) { + if path == dir_stack.last().unwrap().0 { + return; + } + if let Some(parent) = path.parent() { + push_directories(parent, dir_stack); + } + dir_stack.push((path.to_path_buf(), Directory::default())); + } + + for (path, node) in ordered_paths.into_iter() { + // Pop stack until the top dir is an ancestor of this entry. + loop { + let top = dir_stack.last().unwrap(); + if path.ancestors().any(|ancestor| ancestor == top.0) { + break; + } + + pop_directory(&mut dir_stack, &mut directory_putter).await?; + } + + // For directories, just ensure the directory node exists. + if let Node::Directory(_) = node { + push_directories(&path, &mut dir_stack); + continue; + } + + // Push all ancestor directories onto the stack. + push_directories(path.parent().unwrap(), &mut dir_stack); + + let top = dir_stack.last_mut().unwrap(); + debug_assert_eq!(Some(top.0.as_path()), path.parent()); + + match node { + Node::File(n) => top.1.files.push(n), + Node::Symlink(n) => top.1.symlinks.push(n), + // We already handled directories above. + Node::Directory(_) => unreachable!(), + } + } + + let mut root_node = None; + while !dir_stack.is_empty() { + // If the root directory only has 1 directory entry, we return the child entry + // instead... weeeee + if dir_stack.len() == 1 && dir_stack.last().unwrap().1.directories.len() == 1 { + break; + } + root_node = Some(pop_directory(&mut dir_stack, &mut directory_putter).await?); + } + let root_node = root_node.expect("no root node"); + + let root_digest = directory_putter.close().await?; + + debug_assert_eq!(root_digest.as_slice(), &root_node.digest); + + Ok(Node::Directory(root_node)) +} diff --git a/tvix/castore/src/import/error.rs b/tvix/castore/src/import/error.rs index 15dd0664deaa..18c71aa235b8 100644 --- a/tvix/castore/src/import/error.rs +++ b/tvix/castore/src/import/error.rs @@ -1,6 +1,6 @@ use std::{fs::FileType, path::PathBuf}; -use crate::Error as CastoreError; +use crate::{proto::ValidateDirectoryError, Error as CastoreError}; #[derive(Debug, thiserror::Error)] pub enum Error { @@ -19,8 +19,17 @@ pub enum Error { #[error("unable to read {0}: {1}")] UnableToRead(PathBuf, std::io::Error), + #[error("error reading from archive: {0}")] + Archive(std::io::Error), + #[error("unsupported file {0} type: {1:?}")] UnsupportedFileType(PathBuf, FileType), + + #[error("invalid directory contents {0}: {1}")] + InvalidDirectory(PathBuf, ValidateDirectoryError), + + #[error("unsupported tar entry {0} type: {1:?}")] + UnsupportedTarEntry(PathBuf, tokio_tar::EntryType), } impl From<CastoreError> for Error { diff --git a/tvix/castore/src/import/mod.rs b/tvix/castore/src/import/mod.rs index ff27c0fcfd2b..c5887685bbdb 100644 --- a/tvix/castore/src/import/mod.rs +++ b/tvix/castore/src/import/mod.rs @@ -1,4 +1,3 @@ -//! Deals with ingesting contents into castore. //! The main library function here is [ingest_entries], receiving a stream of //! [IngestionEntry]. //! @@ -32,6 +31,7 @@ use tracing::instrument; mod error; pub use error::Error; +pub mod archive; pub mod fs; /// Ingests [IngestionEntry] from the given stream into a the passed [DirectoryService]. |