use std::{collections::HashMap, path::PathBuf}; use petgraph::graph::{DiGraph, NodeIndex}; use petgraph::visit::{DfsPostOrder, EdgeRef}; use petgraph::Direction; use tokio::io::AsyncRead; use tokio_stream::StreamExt; use tokio_tar::Archive; use tracing::{instrument, warn, Level}; use crate::blobservice::BlobService; use crate::directoryservice::DirectoryService; use crate::import::{ingest_entries, Error, IngestionEntry}; use crate::proto::node::Node; /// Ingests elements from the given tar [`Archive`] into a the passed [`BlobService`] and /// [`DirectoryService`]. #[instrument(skip_all, ret(level = Level::TRACE), err)] pub async fn ingest_archive<'a, BS, DS, R>( blob_service: BS, directory_service: DS, mut archive: Archive, ) -> Result where BS: AsRef + Clone, DS: AsRef, R: AsyncRead + Unpin, { // Since tarballs can have entries in any arbitrary order, we need to // buffer all of the directory metadata so we can reorder directory // contents and entries to meet the requires of the castore. // In the first phase, collect up all the regular files and symlinks. let mut nodes = IngestionEntryGraph::new(); let mut entries_iter = archive.entries().map_err(Error::Archive)?; while let Some(mut entry) = entries_iter.try_next().await.map_err(Error::Archive)? { let path: PathBuf = entry.path().map_err(Error::Archive)?.into(); let entry = match entry.header().entry_type() { tokio_tar::EntryType::Regular | tokio_tar::EntryType::GNUSparse | tokio_tar::EntryType::Continuous => { // TODO: If the same path is overwritten in the tarball, we may leave // an unreferenced blob after uploading. let mut writer = blob_service.as_ref().open_write().await; let size = tokio::io::copy(&mut entry, &mut writer) .await .map_err(Error::Archive)?; let digest = writer.close().await.map_err(Error::Archive)?; IngestionEntry::Regular { path, size, executable: entry.header().mode().map_err(Error::Archive)? & 64 != 0, digest, } } tokio_tar::EntryType::Symlink => IngestionEntry::Symlink { target: entry .link_name() .map_err(Error::Archive)? .ok_or_else(|| Error::MissingSymlinkTarget(path.clone()))? .into(), path, }, // Push a bogus directory marker so we can make sure this directoy gets // created. We don't know the digest and size until after reading the full // tarball. tokio_tar::EntryType::Directory => IngestionEntry::Dir { path: path.clone() }, tokio_tar::EntryType::XGlobalHeader | tokio_tar::EntryType::XHeader => continue, entry_type => return Err(Error::UnsupportedTarEntry(path, entry_type)), }; nodes.add(entry)?; } ingest_entries( directory_service, futures::stream::iter(nodes.finalize()?.into_iter().map(Ok)), ) .await } /// Keep track of the directory structure of a file tree being ingested. This is used /// for ingestion sources which do not provide any ordering or uniqueness guarantees /// like tarballs. /// /// If we ingest multiple entries with the same paths and both entries are not directories, /// the newer entry will replace the latter entry, disconnecting the old node's children /// from the graph. /// /// Once all nodes are ingested a call to [IngestionEntryGraph::finalize] will return /// a list of entries compute by performaing a DFS post order traversal of the graph /// from the top-level directory entry. /// /// This expects the directory structure to contain a single top-level directory entry. /// An error is returned if this is not the case and ingestion will fail. struct IngestionEntryGraph { graph: DiGraph, path_to_index: HashMap, root_node: Option, } impl Default for IngestionEntryGraph { fn default() -> Self { Self::new() } } impl IngestionEntryGraph { /// Creates a new ingestion entry graph. pub fn new() -> Self { IngestionEntryGraph { graph: DiGraph::new(), path_to_index: HashMap::new(), root_node: None, } } /// Adds a new entry to the graph. Parent directories are automatically inserted. /// If a node exists in the graph with the same name as the new entry and both the old /// and new nodes are not directories, the node is replaced and is disconnected from its /// children. pub fn add(&mut self, entry: IngestionEntry) -> Result { let path = entry.path().to_path_buf(); let index = match self.path_to_index.get(entry.path()) { Some(&index) => { // If either the old entry or new entry are not directories, we'll replace the old // entry. if !entry.is_dir() || !self.get_node(index).is_dir() { self.replace_node(index, entry); } index } None => self.graph.add_node(entry), }; // A path with 1 component is the root node if path.components().count() == 1 { // We expect archives to contain a single root node, if there is another root node // entry with a different path name, this is unsupported. if let Some(root_node) = self.root_node { if self.get_node(root_node).path() != path { return Err(Error::UnexpectedNumberOfTopLevelEntries); } } self.root_node = Some(index) } else if let Some(parent_path) = path.parent() { // Recursively add the parent node until it hits the root node. let parent_index = self.add(IngestionEntry::Dir { path: parent_path.to_path_buf(), })?; // Insert an edge from the parent directory to the child entry. self.graph.add_edge(parent_index, index, ()); } self.path_to_index.insert(path, index); Ok(index) } /// Traverses the graph in DFS post order and collects the entries into a [Vec]. /// /// Unreachable parts of the graph are not included in the result. pub fn finalize(self) -> Result, Error> { // There must be a root node. let Some(root_node_index) = self.root_node else { return Err(Error::UnexpectedNumberOfTopLevelEntries); }; // The root node must be a directory. if !self.get_node(root_node_index).is_dir() { return Err(Error::UnexpectedNumberOfTopLevelEntries); } let mut traversal = DfsPostOrder::new(&self.graph, root_node_index); let mut nodes = Vec::with_capacity(self.graph.node_count()); while let Some(node_index) = traversal.next(&self.graph) { nodes.push(self.get_node(node_index).clone()); } Ok(nodes) } /// Replaces the node with the specified entry. The node's children are disconnected. /// /// This should never be called if both the old and new nodes are directories. fn replace_node(&mut self, index: NodeIndex, new_entry: IngestionEntry) { let entry = self .graph .node_weight_mut(index) .expect("Tvix bug: missing node entry"); debug_assert!(!(entry.is_dir() && new_entry.is_dir())); // Replace the node itself. warn!( "saw duplicate entry in archive at path {:?}. old: {:?} new: {:?}", entry.path(), &entry, &new_entry ); *entry = new_entry; // Remove any outgoing edges to disconnect the old node's children. let edges = self .graph .edges_directed(index, Direction::Outgoing) .map(|edge| edge.id()) .collect::>(); for edge in edges { self.graph.remove_edge(edge); } } fn get_node(&self, index: NodeIndex) -> &IngestionEntry { self.graph .node_weight(index) .expect("Tvix bug: missing node entry") } } #[cfg(test)] mod test { use crate::import::IngestionEntry; use crate::B3Digest; use super::{Error, IngestionEntryGraph}; use lazy_static::lazy_static; use rstest::rstest; lazy_static! { pub static ref EMPTY_DIGEST: B3Digest = blake3::hash(&[]).as_bytes().into(); pub static ref DIR_A: IngestionEntry = IngestionEntry::Dir { path: "a".into() }; pub static ref DIR_B: IngestionEntry = IngestionEntry::Dir { path: "b".into() }; pub static ref DIR_A_B: IngestionEntry = IngestionEntry::Dir { path: "a/b".into() }; pub static ref FILE_A: IngestionEntry = IngestionEntry::Regular { path: "a".into(), size: 0, executable: false, digest: EMPTY_DIGEST.clone(), }; pub static ref FILE_A_B: IngestionEntry = IngestionEntry::Regular { path: "a/b".into(), size: 0, executable: false, digest: EMPTY_DIGEST.clone(), }; pub static ref FILE_A_B_C: IngestionEntry = IngestionEntry::Regular { path: "a/b/c".into(), size: 0, executable: false, digest: EMPTY_DIGEST.clone(), }; } #[rstest] #[case::implicit_directories(&[&*FILE_A_B_C], &[&*FILE_A_B_C, &*DIR_A_B, &*DIR_A])] #[case::explicit_directories(&[&*DIR_A, &*DIR_A_B, &*FILE_A_B_C], &[&*FILE_A_B_C, &*DIR_A_B, &*DIR_A])] #[case::inaccesible_tree(&[&*DIR_A, &*DIR_A_B, &*FILE_A_B], &[&*FILE_A_B, &*DIR_A])] fn node_ingestion_success( #[case] in_entries: &[&IngestionEntry], #[case] exp_entries: &[&IngestionEntry], ) { let mut nodes = IngestionEntryGraph::new(); for entry in in_entries { nodes.add((*entry).clone()).expect("failed to add entry"); } let entries = nodes.finalize().expect("invalid entries"); let exp_entries: Vec = exp_entries.iter().map(|entry| (*entry).clone()).collect(); assert_eq!(entries, exp_entries); } #[rstest] #[case::no_top_level_entries(&[], Error::UnexpectedNumberOfTopLevelEntries)] #[case::multiple_top_level_dirs(&[&*DIR_A, &*DIR_B], Error::UnexpectedNumberOfTopLevelEntries)] #[case::top_level_file_entry(&[&*FILE_A], Error::UnexpectedNumberOfTopLevelEntries)] fn node_ingestion_error(#[case] in_entries: &[&IngestionEntry], #[case] exp_error: Error) { let mut nodes = IngestionEntryGraph::new(); let result = (|| { for entry in in_entries { nodes.add((*entry).clone())?; } nodes.finalize() })(); let error = result.expect_err("expected error"); assert_eq!(error.to_string(), exp_error.to_string()); } }