about summary refs log tree commit diff
path: root/tvix/castore/src
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/castore/src')
-rw-r--r--tvix/castore/src/import/archive.rs351
-rw-r--r--tvix/castore/src/import/error.rs11
-rw-r--r--tvix/castore/src/import/mod.rs5
3 files changed, 239 insertions, 128 deletions
diff --git a/tvix/castore/src/import/archive.rs b/tvix/castore/src/import/archive.rs
index d0ae3c67411c..eab695d6507b 100644
--- a/tvix/castore/src/import/archive.rs
+++ b/tvix/castore/src/import/archive.rs
@@ -1,21 +1,17 @@
-#[cfg(target_family = "unix")]
-use std::os::unix::ffi::OsStrExt;
-use std::{
-    collections::HashMap,
-    path::{Path, PathBuf},
-};
+use std::{collections::HashMap, path::PathBuf};
 
+use petgraph::graph::{DiGraph, NodeIndex};
+use petgraph::visit::{DfsPostOrder, EdgeRef};
+use petgraph::Direction;
 use tokio::io::AsyncRead;
 use tokio_stream::StreamExt;
 use tokio_tar::Archive;
-use tracing::{instrument, Level};
+use tracing::{instrument, warn, Level};
 
-use crate::{
-    blobservice::BlobService,
-    directoryservice::{DirectoryPutter, DirectoryService},
-    import::Error,
-    proto::{node::Node, Directory, DirectoryNode, FileNode, SymlinkNode},
-};
+use crate::blobservice::BlobService;
+use crate::directoryservice::DirectoryService;
+use crate::import::{ingest_entries, Error, IngestionEntry};
+use crate::proto::node::Node;
 
 /// Ingests elements from the given tar [`Archive`] into a the passed [`BlobService`] and
 /// [`DirectoryService`].
@@ -35,23 +31,13 @@ where
     // contents and entries to meet the requires of the castore.
 
     // In the first phase, collect up all the regular files and symlinks.
-    let mut paths = HashMap::new();
-    let mut entries = archive.entries().map_err(Error::Archive)?;
-    while let Some(mut entry) = entries.try_next().await.map_err(Error::Archive)? {
-        let path = entry.path().map_err(Error::Archive)?.into_owned();
-        let name = path
-            .file_name()
-            .ok_or_else(|| {
-                Error::Archive(std::io::Error::new(
-                    std::io::ErrorKind::InvalidInput,
-                    "invalid filename in archive",
-                ))
-            })?
-            .as_bytes()
-            .to_vec()
-            .into();
-
-        let node = match entry.header().entry_type() {
+    let mut nodes = IngestionEntryGraph::new();
+
+    let mut entries_iter = archive.entries().map_err(Error::Archive)?;
+    while let Some(mut entry) = entries_iter.try_next().await.map_err(Error::Archive)? {
+        let path: PathBuf = entry.path().map_err(Error::Archive)?.into();
+
+        let entry = match entry.header().entry_type() {
             tokio_tar::EntryType::Regular
             | tokio_tar::EntryType::GNUSparse
             | tokio_tar::EntryType::Continuous => {
@@ -62,140 +48,257 @@ where
                     .await
                     .map_err(Error::Archive)?;
                 let digest = writer.close().await.map_err(Error::Archive)?;
-                Node::File(FileNode {
-                    name,
-                    digest: digest.into(),
+
+                IngestionEntry::Regular {
+                    path,
                     size,
                     executable: entry.header().mode().map_err(Error::Archive)? & 64 != 0,
-                })
+                    digest,
+                }
             }
-            tokio_tar::EntryType::Symlink => Node::Symlink(SymlinkNode {
-                name,
+            tokio_tar::EntryType::Symlink => IngestionEntry::Symlink {
                 target: entry
                     .link_name()
                     .map_err(Error::Archive)?
-                    .expect("symlink missing target")
-                    .as_os_str()
-                    .as_bytes()
-                    .to_vec()
+                    .ok_or_else(|| Error::MissingSymlinkTarget(path.clone()))?
                     .into(),
-            }),
+                path,
+            },
             // Push a bogus directory marker so we can make sure this directoy gets
             // created. We don't know the digest and size until after reading the full
             // tarball.
-            tokio_tar::EntryType::Directory => Node::Directory(DirectoryNode {
-                name,
-                digest: Default::default(),
-                size: 0,
-            }),
+            tokio_tar::EntryType::Directory => IngestionEntry::Dir { path: path.clone() },
 
             tokio_tar::EntryType::XGlobalHeader | tokio_tar::EntryType::XHeader => continue,
 
             entry_type => return Err(Error::UnsupportedTarEntry(path, entry_type)),
         };
 
-        paths.insert(path, node);
+        nodes.add(entry)?;
     }
 
-    // In the second phase, construct all of the directories.
-
-    // Collect into a list and then sort so all entries in the same directory
-    // are next to each other.
-    // We can detect boundaries between each directories to determine
-    // when to construct or push directory entries.
-    let mut ordered_paths = paths.into_iter().collect::<Vec<_>>();
-    ordered_paths.sort_by(|a, b| a.0.cmp(&b.0));
-
-    let mut directory_putter = directory_service.as_ref().put_multiple_start();
-
-    // Start with an initial directory at the root.
-    let mut dir_stack = vec![(PathBuf::from(""), Directory::default())];
-
-    async fn pop_directory(
-        dir_stack: &mut Vec<(PathBuf, Directory)>,
-        directory_putter: &mut Box<dyn DirectoryPutter>,
-    ) -> Result<DirectoryNode, Error> {
-        let (path, directory) = dir_stack.pop().unwrap();
-
-        directory
-            .validate()
-            .map_err(|e| Error::InvalidDirectory(path.to_path_buf(), e))?;
-
-        let dir_node = DirectoryNode {
-            name: path
-                .file_name()
-                .unwrap_or_default()
-                .as_bytes()
-                .to_vec()
-                .into(),
-            digest: directory.digest().into(),
-            size: directory.size(),
+    ingest_entries(
+        directory_service,
+        futures::stream::iter(nodes.finalize()?.into_iter().map(Ok)),
+    )
+    .await
+}
+
+/// Keep track of the directory structure of a file tree being ingested. This is used
+/// for ingestion sources which do not provide any ordering or uniqueness guarantees
+/// like tarballs.
+///
+/// If we ingest multiple entries with the same paths and both entries are not directories,
+/// the newer entry will replace the latter entry, disconnecting the old node's children
+/// from the graph.
+///
+/// Once all nodes are ingested a call to [IngestionEntryGraph::finalize] will return
+/// a list of entries compute by performaing a DFS post order traversal of the graph
+/// from the top-level directory entry.
+///
+/// This expects the directory structure to contain a single top-level directory entry.
+/// An error is returned if this is not the case and ingestion will fail.
+struct IngestionEntryGraph {
+    graph: DiGraph<IngestionEntry, ()>,
+    path_to_index: HashMap<PathBuf, NodeIndex>,
+    root_node: Option<NodeIndex>,
+}
+
+impl Default for IngestionEntryGraph {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl IngestionEntryGraph {
+    /// Creates a new ingestion entry graph.
+    pub fn new() -> Self {
+        IngestionEntryGraph {
+            graph: DiGraph::new(),
+            path_to_index: HashMap::new(),
+            root_node: None,
+        }
+    }
+
+    /// Adds a new entry to the graph. Parent directories are automatically inserted.
+    /// If a node exists in the graph with the same name as the new entry and both the old
+    /// and new nodes are not directories, the node is replaced and is disconnected from its
+    /// children.
+    pub fn add(&mut self, entry: IngestionEntry) -> Result<NodeIndex, Error> {
+        let path = entry.path().to_path_buf();
+
+        let index = match self.path_to_index.get(entry.path()) {
+            Some(&index) => {
+                // If either the old entry or new entry are not directories, we'll replace the old
+                // entry.
+                if !entry.is_dir() || !self.get_node(index).is_dir() {
+                    self.replace_node(index, entry);
+                }
+
+                index
+            }
+            None => self.graph.add_node(entry),
         };
 
-        if let Some((_, parent)) = dir_stack.last_mut() {
-            parent.directories.push(dir_node.clone());
+        // A path with 1 component is the root node
+        if path.components().count() == 1 {
+            // We expect archives to contain a single root node, if there is another root node
+            // entry with a different path name, this is unsupported.
+            if let Some(root_node) = self.root_node {
+                if self.get_node(root_node).path() != path {
+                    return Err(Error::UnexpectedNumberOfTopLevelEntries);
+                }
+            }
+
+            self.root_node = Some(index)
+        } else if let Some(parent_path) = path.parent() {
+            // Recursively add the parent node until it hits the root node.
+            let parent_index = self.add(IngestionEntry::Dir {
+                path: parent_path.to_path_buf(),
+            })?;
+
+            // Insert an edge from the parent directory to the child entry.
+            self.graph.add_edge(parent_index, index, ());
         }
 
-        directory_putter.put(directory).await?;
+        self.path_to_index.insert(path, index);
 
-        Ok(dir_node)
+        Ok(index)
     }
 
-    fn push_directories(path: &Path, dir_stack: &mut Vec<(PathBuf, Directory)>) {
-        if path == dir_stack.last().unwrap().0 {
-            return;
+    /// Traverses the graph in DFS post order and collects the entries into a [Vec<IngestionEntry>].
+    ///
+    /// Unreachable parts of the graph are not included in the result.
+    pub fn finalize(self) -> Result<Vec<IngestionEntry>, Error> {
+        // There must be a root node.
+        let Some(root_node_index) = self.root_node else {
+            return Err(Error::UnexpectedNumberOfTopLevelEntries);
+        };
+
+        // The root node must be a directory.
+        if !self.get_node(root_node_index).is_dir() {
+            return Err(Error::UnexpectedNumberOfTopLevelEntries);
         }
-        if let Some(parent) = path.parent() {
-            push_directories(parent, dir_stack);
+
+        let mut traversal = DfsPostOrder::new(&self.graph, root_node_index);
+        let mut nodes = Vec::with_capacity(self.graph.node_count());
+        while let Some(node_index) = traversal.next(&self.graph) {
+            nodes.push(self.get_node(node_index).clone());
         }
-        dir_stack.push((path.to_path_buf(), Directory::default()));
+
+        Ok(nodes)
     }
 
-    for (path, node) in ordered_paths.into_iter() {
-        // Pop stack until the top dir is an ancestor of this entry.
-        loop {
-            let top = dir_stack.last().unwrap();
-            if path.ancestors().any(|ancestor| ancestor == top.0) {
-                break;
-            }
+    /// Replaces the node with the specified entry. The node's children are disconnected.
+    ///
+    /// This should never be called if both the old and new nodes are directories.
+    fn replace_node(&mut self, index: NodeIndex, new_entry: IngestionEntry) {
+        let entry = self
+            .graph
+            .node_weight_mut(index)
+            .expect("Tvix bug: missing node entry");
 
-            pop_directory(&mut dir_stack, &mut directory_putter).await?;
-        }
+        debug_assert!(!(entry.is_dir() && new_entry.is_dir()));
+
+        // Replace the node itself.
+        warn!(
+            "saw duplicate entry in archive at path {:?}. old: {:?} new: {:?}",
+            entry.path(),
+            &entry,
+            &new_entry
+        );
+        *entry = new_entry;
 
-        // For directories, just ensure the directory node exists.
-        if let Node::Directory(_) = node {
-            push_directories(&path, &mut dir_stack);
-            continue;
+        // Remove any outgoing edges to disconnect the old node's children.
+        let edges = self
+            .graph
+            .edges_directed(index, Direction::Outgoing)
+            .map(|edge| edge.id())
+            .collect::<Vec<_>>();
+        for edge in edges {
+            self.graph.remove_edge(edge);
         }
+    }
 
-        // Push all ancestor directories onto the stack.
-        push_directories(path.parent().unwrap(), &mut dir_stack);
+    fn get_node(&self, index: NodeIndex) -> &IngestionEntry {
+        self.graph
+            .node_weight(index)
+            .expect("Tvix bug: missing node entry")
+    }
+}
 
-        let top = dir_stack.last_mut().unwrap();
-        debug_assert_eq!(Some(top.0.as_path()), path.parent());
+#[cfg(test)]
+mod test {
+    use crate::import::IngestionEntry;
+    use crate::B3Digest;
 
-        match node {
-            Node::File(n) => top.1.files.push(n),
-            Node::Symlink(n) => top.1.symlinks.push(n),
-            // We already handled directories above.
-            Node::Directory(_) => unreachable!(),
-        }
+    use super::{Error, IngestionEntryGraph};
+
+    use lazy_static::lazy_static;
+    use rstest::rstest;
+
+    lazy_static! {
+        pub static ref EMPTY_DIGEST: B3Digest = blake3::hash(&[]).as_bytes().into();
+        pub static ref DIR_A: IngestionEntry = IngestionEntry::Dir { path: "a".into() };
+        pub static ref DIR_B: IngestionEntry = IngestionEntry::Dir { path: "b".into() };
+        pub static ref DIR_A_B: IngestionEntry = IngestionEntry::Dir { path: "a/b".into() };
+        pub static ref FILE_A: IngestionEntry = IngestionEntry::Regular {
+            path: "a".into(),
+            size: 0,
+            executable: false,
+            digest: EMPTY_DIGEST.clone(),
+        };
+        pub static ref FILE_A_B: IngestionEntry = IngestionEntry::Regular {
+            path: "a/b".into(),
+            size: 0,
+            executable: false,
+            digest: EMPTY_DIGEST.clone(),
+        };
+        pub static ref FILE_A_B_C: IngestionEntry = IngestionEntry::Regular {
+            path: "a/b/c".into(),
+            size: 0,
+            executable: false,
+            digest: EMPTY_DIGEST.clone(),
+        };
     }
 
-    let mut root_node = None;
-    while !dir_stack.is_empty() {
-        // If the root directory only has 1 directory entry, we return the child entry
-        // instead... weeeee
-        if dir_stack.len() == 1 && dir_stack.last().unwrap().1.directories.len() == 1 {
-            break;
+    #[rstest]
+    #[case::implicit_directories(&[&*FILE_A_B_C], &[&*FILE_A_B_C, &*DIR_A_B, &*DIR_A])]
+    #[case::explicit_directories(&[&*DIR_A, &*DIR_A_B, &*FILE_A_B_C], &[&*FILE_A_B_C, &*DIR_A_B, &*DIR_A])]
+    #[case::inaccesible_tree(&[&*DIR_A, &*DIR_A_B, &*FILE_A_B], &[&*FILE_A_B, &*DIR_A])]
+    fn node_ingestion_success(
+        #[case] in_entries: &[&IngestionEntry],
+        #[case] exp_entries: &[&IngestionEntry],
+    ) {
+        let mut nodes = IngestionEntryGraph::new();
+
+        for entry in in_entries {
+            nodes.add((*entry).clone()).expect("failed to add entry");
         }
-        root_node = Some(pop_directory(&mut dir_stack, &mut directory_putter).await?);
+
+        let entries = nodes.finalize().expect("invalid entries");
+
+        let exp_entries: Vec<IngestionEntry> =
+            exp_entries.iter().map(|entry| (*entry).clone()).collect();
+
+        assert_eq!(entries, exp_entries);
     }
-    let root_node = root_node.expect("no root node");
 
-    let root_digest = directory_putter.close().await?;
+    #[rstest]
+    #[case::no_top_level_entries(&[], Error::UnexpectedNumberOfTopLevelEntries)]
+    #[case::multiple_top_level_dirs(&[&*DIR_A, &*DIR_B], Error::UnexpectedNumberOfTopLevelEntries)]
+    #[case::top_level_file_entry(&[&*FILE_A], Error::UnexpectedNumberOfTopLevelEntries)]
+    fn node_ingestion_error(#[case] in_entries: &[&IngestionEntry], #[case] exp_error: Error) {
+        let mut nodes = IngestionEntryGraph::new();
 
-    debug_assert_eq!(root_digest.as_slice(), &root_node.digest);
+        let result = (|| {
+            for entry in in_entries {
+                nodes.add((*entry).clone())?;
+            }
+            nodes.finalize()
+        })();
 
-    Ok(Node::Directory(root_node))
+        let error = result.expect_err("expected error");
+        assert_eq!(error.to_string(), exp_error.to_string());
+    }
 }
diff --git a/tvix/castore/src/import/error.rs b/tvix/castore/src/import/error.rs
index 18c71aa235b8..8cd4f95ffb52 100644
--- a/tvix/castore/src/import/error.rs
+++ b/tvix/castore/src/import/error.rs
@@ -1,6 +1,6 @@
 use std::{fs::FileType, path::PathBuf};
 
-use crate::{proto::ValidateDirectoryError, Error as CastoreError};
+use crate::Error as CastoreError;
 
 #[derive(Debug, thiserror::Error)]
 pub enum Error {
@@ -25,11 +25,14 @@ pub enum Error {
     #[error("unsupported file {0} type: {1:?}")]
     UnsupportedFileType(PathBuf, FileType),
 
-    #[error("invalid directory contents {0}: {1}")]
-    InvalidDirectory(PathBuf, ValidateDirectoryError),
-
     #[error("unsupported tar entry {0} type: {1:?}")]
     UnsupportedTarEntry(PathBuf, tokio_tar::EntryType),
+
+    #[error("symlink missing target {0}")]
+    MissingSymlinkTarget(PathBuf),
+
+    #[error("unexpected number of top level directory entries")]
+    UnexpectedNumberOfTopLevelEntries,
 }
 
 impl From<CastoreError> for Error {
diff --git a/tvix/castore/src/import/mod.rs b/tvix/castore/src/import/mod.rs
index 09d5b8d06ea1..e9fdc750f8c1 100644
--- a/tvix/castore/src/import/mod.rs
+++ b/tvix/castore/src/import/mod.rs
@@ -199,6 +199,7 @@ where
     Ok(digest)
 }
 
+#[derive(Debug, Clone, Eq, PartialEq)]
 pub enum IngestionEntry {
     Regular {
         path: PathBuf,
@@ -228,4 +229,8 @@ impl IngestionEntry {
             IngestionEntry::Unknown { path, .. } => path,
         }
     }
+
+    fn is_dir(&self) -> bool {
+        matches!(self, IngestionEntry::Dir { .. })
+    }
 }