diff options
-rw-r--r-- | tvix/castore/src/import/archive.rs | 351 | ||||
-rw-r--r-- | tvix/castore/src/import/error.rs | 11 | ||||
-rw-r--r-- | tvix/castore/src/import/mod.rs | 5 |
3 files changed, 239 insertions, 128 deletions
diff --git a/tvix/castore/src/import/archive.rs b/tvix/castore/src/import/archive.rs index d0ae3c67411c..eab695d6507b 100644 --- a/tvix/castore/src/import/archive.rs +++ b/tvix/castore/src/import/archive.rs @@ -1,21 +1,17 @@ -#[cfg(target_family = "unix")] -use std::os::unix::ffi::OsStrExt; -use std::{ - collections::HashMap, - path::{Path, PathBuf}, -}; +use std::{collections::HashMap, path::PathBuf}; +use petgraph::graph::{DiGraph, NodeIndex}; +use petgraph::visit::{DfsPostOrder, EdgeRef}; +use petgraph::Direction; use tokio::io::AsyncRead; use tokio_stream::StreamExt; use tokio_tar::Archive; -use tracing::{instrument, Level}; +use tracing::{instrument, warn, Level}; -use crate::{ - blobservice::BlobService, - directoryservice::{DirectoryPutter, DirectoryService}, - import::Error, - proto::{node::Node, Directory, DirectoryNode, FileNode, SymlinkNode}, -}; +use crate::blobservice::BlobService; +use crate::directoryservice::DirectoryService; +use crate::import::{ingest_entries, Error, IngestionEntry}; +use crate::proto::node::Node; /// Ingests elements from the given tar [`Archive`] into a the passed [`BlobService`] and /// [`DirectoryService`]. @@ -35,23 +31,13 @@ where // contents and entries to meet the requires of the castore. // In the first phase, collect up all the regular files and symlinks. - let mut paths = HashMap::new(); - let mut entries = archive.entries().map_err(Error::Archive)?; - while let Some(mut entry) = entries.try_next().await.map_err(Error::Archive)? { - let path = entry.path().map_err(Error::Archive)?.into_owned(); - let name = path - .file_name() - .ok_or_else(|| { - Error::Archive(std::io::Error::new( - std::io::ErrorKind::InvalidInput, - "invalid filename in archive", - )) - })? - .as_bytes() - .to_vec() - .into(); - - let node = match entry.header().entry_type() { + let mut nodes = IngestionEntryGraph::new(); + + let mut entries_iter = archive.entries().map_err(Error::Archive)?; + while let Some(mut entry) = entries_iter.try_next().await.map_err(Error::Archive)? { + let path: PathBuf = entry.path().map_err(Error::Archive)?.into(); + + let entry = match entry.header().entry_type() { tokio_tar::EntryType::Regular | tokio_tar::EntryType::GNUSparse | tokio_tar::EntryType::Continuous => { @@ -62,140 +48,257 @@ where .await .map_err(Error::Archive)?; let digest = writer.close().await.map_err(Error::Archive)?; - Node::File(FileNode { - name, - digest: digest.into(), + + IngestionEntry::Regular { + path, size, executable: entry.header().mode().map_err(Error::Archive)? & 64 != 0, - }) + digest, + } } - tokio_tar::EntryType::Symlink => Node::Symlink(SymlinkNode { - name, + tokio_tar::EntryType::Symlink => IngestionEntry::Symlink { target: entry .link_name() .map_err(Error::Archive)? - .expect("symlink missing target") - .as_os_str() - .as_bytes() - .to_vec() + .ok_or_else(|| Error::MissingSymlinkTarget(path.clone()))? .into(), - }), + path, + }, // Push a bogus directory marker so we can make sure this directoy gets // created. We don't know the digest and size until after reading the full // tarball. - tokio_tar::EntryType::Directory => Node::Directory(DirectoryNode { - name, - digest: Default::default(), - size: 0, - }), + tokio_tar::EntryType::Directory => IngestionEntry::Dir { path: path.clone() }, tokio_tar::EntryType::XGlobalHeader | tokio_tar::EntryType::XHeader => continue, entry_type => return Err(Error::UnsupportedTarEntry(path, entry_type)), }; - paths.insert(path, node); + nodes.add(entry)?; } - // In the second phase, construct all of the directories. - - // Collect into a list and then sort so all entries in the same directory - // are next to each other. - // We can detect boundaries between each directories to determine - // when to construct or push directory entries. - let mut ordered_paths = paths.into_iter().collect::<Vec<_>>(); - ordered_paths.sort_by(|a, b| a.0.cmp(&b.0)); - - let mut directory_putter = directory_service.as_ref().put_multiple_start(); - - // Start with an initial directory at the root. - let mut dir_stack = vec![(PathBuf::from(""), Directory::default())]; - - async fn pop_directory( - dir_stack: &mut Vec<(PathBuf, Directory)>, - directory_putter: &mut Box<dyn DirectoryPutter>, - ) -> Result<DirectoryNode, Error> { - let (path, directory) = dir_stack.pop().unwrap(); - - directory - .validate() - .map_err(|e| Error::InvalidDirectory(path.to_path_buf(), e))?; - - let dir_node = DirectoryNode { - name: path - .file_name() - .unwrap_or_default() - .as_bytes() - .to_vec() - .into(), - digest: directory.digest().into(), - size: directory.size(), + ingest_entries( + directory_service, + futures::stream::iter(nodes.finalize()?.into_iter().map(Ok)), + ) + .await +} + +/// Keep track of the directory structure of a file tree being ingested. This is used +/// for ingestion sources which do not provide any ordering or uniqueness guarantees +/// like tarballs. +/// +/// If we ingest multiple entries with the same paths and both entries are not directories, +/// the newer entry will replace the latter entry, disconnecting the old node's children +/// from the graph. +/// +/// Once all nodes are ingested a call to [IngestionEntryGraph::finalize] will return +/// a list of entries compute by performaing a DFS post order traversal of the graph +/// from the top-level directory entry. +/// +/// This expects the directory structure to contain a single top-level directory entry. +/// An error is returned if this is not the case and ingestion will fail. +struct IngestionEntryGraph { + graph: DiGraph<IngestionEntry, ()>, + path_to_index: HashMap<PathBuf, NodeIndex>, + root_node: Option<NodeIndex>, +} + +impl Default for IngestionEntryGraph { + fn default() -> Self { + Self::new() + } +} + +impl IngestionEntryGraph { + /// Creates a new ingestion entry graph. + pub fn new() -> Self { + IngestionEntryGraph { + graph: DiGraph::new(), + path_to_index: HashMap::new(), + root_node: None, + } + } + + /// Adds a new entry to the graph. Parent directories are automatically inserted. + /// If a node exists in the graph with the same name as the new entry and both the old + /// and new nodes are not directories, the node is replaced and is disconnected from its + /// children. + pub fn add(&mut self, entry: IngestionEntry) -> Result<NodeIndex, Error> { + let path = entry.path().to_path_buf(); + + let index = match self.path_to_index.get(entry.path()) { + Some(&index) => { + // If either the old entry or new entry are not directories, we'll replace the old + // entry. + if !entry.is_dir() || !self.get_node(index).is_dir() { + self.replace_node(index, entry); + } + + index + } + None => self.graph.add_node(entry), }; - if let Some((_, parent)) = dir_stack.last_mut() { - parent.directories.push(dir_node.clone()); + // A path with 1 component is the root node + if path.components().count() == 1 { + // We expect archives to contain a single root node, if there is another root node + // entry with a different path name, this is unsupported. + if let Some(root_node) = self.root_node { + if self.get_node(root_node).path() != path { + return Err(Error::UnexpectedNumberOfTopLevelEntries); + } + } + + self.root_node = Some(index) + } else if let Some(parent_path) = path.parent() { + // Recursively add the parent node until it hits the root node. + let parent_index = self.add(IngestionEntry::Dir { + path: parent_path.to_path_buf(), + })?; + + // Insert an edge from the parent directory to the child entry. + self.graph.add_edge(parent_index, index, ()); } - directory_putter.put(directory).await?; + self.path_to_index.insert(path, index); - Ok(dir_node) + Ok(index) } - fn push_directories(path: &Path, dir_stack: &mut Vec<(PathBuf, Directory)>) { - if path == dir_stack.last().unwrap().0 { - return; + /// Traverses the graph in DFS post order and collects the entries into a [Vec<IngestionEntry>]. + /// + /// Unreachable parts of the graph are not included in the result. + pub fn finalize(self) -> Result<Vec<IngestionEntry>, Error> { + // There must be a root node. + let Some(root_node_index) = self.root_node else { + return Err(Error::UnexpectedNumberOfTopLevelEntries); + }; + + // The root node must be a directory. + if !self.get_node(root_node_index).is_dir() { + return Err(Error::UnexpectedNumberOfTopLevelEntries); } - if let Some(parent) = path.parent() { - push_directories(parent, dir_stack); + + let mut traversal = DfsPostOrder::new(&self.graph, root_node_index); + let mut nodes = Vec::with_capacity(self.graph.node_count()); + while let Some(node_index) = traversal.next(&self.graph) { + nodes.push(self.get_node(node_index).clone()); } - dir_stack.push((path.to_path_buf(), Directory::default())); + + Ok(nodes) } - for (path, node) in ordered_paths.into_iter() { - // Pop stack until the top dir is an ancestor of this entry. - loop { - let top = dir_stack.last().unwrap(); - if path.ancestors().any(|ancestor| ancestor == top.0) { - break; - } + /// Replaces the node with the specified entry. The node's children are disconnected. + /// + /// This should never be called if both the old and new nodes are directories. + fn replace_node(&mut self, index: NodeIndex, new_entry: IngestionEntry) { + let entry = self + .graph + .node_weight_mut(index) + .expect("Tvix bug: missing node entry"); - pop_directory(&mut dir_stack, &mut directory_putter).await?; - } + debug_assert!(!(entry.is_dir() && new_entry.is_dir())); + + // Replace the node itself. + warn!( + "saw duplicate entry in archive at path {:?}. old: {:?} new: {:?}", + entry.path(), + &entry, + &new_entry + ); + *entry = new_entry; - // For directories, just ensure the directory node exists. - if let Node::Directory(_) = node { - push_directories(&path, &mut dir_stack); - continue; + // Remove any outgoing edges to disconnect the old node's children. + let edges = self + .graph + .edges_directed(index, Direction::Outgoing) + .map(|edge| edge.id()) + .collect::<Vec<_>>(); + for edge in edges { + self.graph.remove_edge(edge); } + } - // Push all ancestor directories onto the stack. - push_directories(path.parent().unwrap(), &mut dir_stack); + fn get_node(&self, index: NodeIndex) -> &IngestionEntry { + self.graph + .node_weight(index) + .expect("Tvix bug: missing node entry") + } +} - let top = dir_stack.last_mut().unwrap(); - debug_assert_eq!(Some(top.0.as_path()), path.parent()); +#[cfg(test)] +mod test { + use crate::import::IngestionEntry; + use crate::B3Digest; - match node { - Node::File(n) => top.1.files.push(n), - Node::Symlink(n) => top.1.symlinks.push(n), - // We already handled directories above. - Node::Directory(_) => unreachable!(), - } + use super::{Error, IngestionEntryGraph}; + + use lazy_static::lazy_static; + use rstest::rstest; + + lazy_static! { + pub static ref EMPTY_DIGEST: B3Digest = blake3::hash(&[]).as_bytes().into(); + pub static ref DIR_A: IngestionEntry = IngestionEntry::Dir { path: "a".into() }; + pub static ref DIR_B: IngestionEntry = IngestionEntry::Dir { path: "b".into() }; + pub static ref DIR_A_B: IngestionEntry = IngestionEntry::Dir { path: "a/b".into() }; + pub static ref FILE_A: IngestionEntry = IngestionEntry::Regular { + path: "a".into(), + size: 0, + executable: false, + digest: EMPTY_DIGEST.clone(), + }; + pub static ref FILE_A_B: IngestionEntry = IngestionEntry::Regular { + path: "a/b".into(), + size: 0, + executable: false, + digest: EMPTY_DIGEST.clone(), + }; + pub static ref FILE_A_B_C: IngestionEntry = IngestionEntry::Regular { + path: "a/b/c".into(), + size: 0, + executable: false, + digest: EMPTY_DIGEST.clone(), + }; } - let mut root_node = None; - while !dir_stack.is_empty() { - // If the root directory only has 1 directory entry, we return the child entry - // instead... weeeee - if dir_stack.len() == 1 && dir_stack.last().unwrap().1.directories.len() == 1 { - break; + #[rstest] + #[case::implicit_directories(&[&*FILE_A_B_C], &[&*FILE_A_B_C, &*DIR_A_B, &*DIR_A])] + #[case::explicit_directories(&[&*DIR_A, &*DIR_A_B, &*FILE_A_B_C], &[&*FILE_A_B_C, &*DIR_A_B, &*DIR_A])] + #[case::inaccesible_tree(&[&*DIR_A, &*DIR_A_B, &*FILE_A_B], &[&*FILE_A_B, &*DIR_A])] + fn node_ingestion_success( + #[case] in_entries: &[&IngestionEntry], + #[case] exp_entries: &[&IngestionEntry], + ) { + let mut nodes = IngestionEntryGraph::new(); + + for entry in in_entries { + nodes.add((*entry).clone()).expect("failed to add entry"); } - root_node = Some(pop_directory(&mut dir_stack, &mut directory_putter).await?); + + let entries = nodes.finalize().expect("invalid entries"); + + let exp_entries: Vec<IngestionEntry> = + exp_entries.iter().map(|entry| (*entry).clone()).collect(); + + assert_eq!(entries, exp_entries); } - let root_node = root_node.expect("no root node"); - let root_digest = directory_putter.close().await?; + #[rstest] + #[case::no_top_level_entries(&[], Error::UnexpectedNumberOfTopLevelEntries)] + #[case::multiple_top_level_dirs(&[&*DIR_A, &*DIR_B], Error::UnexpectedNumberOfTopLevelEntries)] + #[case::top_level_file_entry(&[&*FILE_A], Error::UnexpectedNumberOfTopLevelEntries)] + fn node_ingestion_error(#[case] in_entries: &[&IngestionEntry], #[case] exp_error: Error) { + let mut nodes = IngestionEntryGraph::new(); - debug_assert_eq!(root_digest.as_slice(), &root_node.digest); + let result = (|| { + for entry in in_entries { + nodes.add((*entry).clone())?; + } + nodes.finalize() + })(); - Ok(Node::Directory(root_node)) + let error = result.expect_err("expected error"); + assert_eq!(error.to_string(), exp_error.to_string()); + } } diff --git a/tvix/castore/src/import/error.rs b/tvix/castore/src/import/error.rs index 18c71aa235b8..8cd4f95ffb52 100644 --- a/tvix/castore/src/import/error.rs +++ b/tvix/castore/src/import/error.rs @@ -1,6 +1,6 @@ use std::{fs::FileType, path::PathBuf}; -use crate::{proto::ValidateDirectoryError, Error as CastoreError}; +use crate::Error as CastoreError; #[derive(Debug, thiserror::Error)] pub enum Error { @@ -25,11 +25,14 @@ pub enum Error { #[error("unsupported file {0} type: {1:?}")] UnsupportedFileType(PathBuf, FileType), - #[error("invalid directory contents {0}: {1}")] - InvalidDirectory(PathBuf, ValidateDirectoryError), - #[error("unsupported tar entry {0} type: {1:?}")] UnsupportedTarEntry(PathBuf, tokio_tar::EntryType), + + #[error("symlink missing target {0}")] + MissingSymlinkTarget(PathBuf), + + #[error("unexpected number of top level directory entries")] + UnexpectedNumberOfTopLevelEntries, } impl From<CastoreError> for Error { diff --git a/tvix/castore/src/import/mod.rs b/tvix/castore/src/import/mod.rs index 09d5b8d06ea1..e9fdc750f8c1 100644 --- a/tvix/castore/src/import/mod.rs +++ b/tvix/castore/src/import/mod.rs @@ -199,6 +199,7 @@ where Ok(digest) } +#[derive(Debug, Clone, Eq, PartialEq)] pub enum IngestionEntry { Regular { path: PathBuf, @@ -228,4 +229,8 @@ impl IngestionEntry { IngestionEntry::Unknown { path, .. } => path, } } + + fn is_dir(&self) -> bool { + matches!(self, IngestionEntry::Dir { .. }) + } } |