about summary refs log tree commit diff
path: root/tvix/castore/src/import/archive.rs
//! Imports from an archive (tarballs)

use std::collections::HashMap;

use petgraph::graph::{DiGraph, NodeIndex};
use petgraph::visit::{DfsPostOrder, EdgeRef};
use petgraph::Direction;
use tokio::io::AsyncRead;
use tokio_stream::StreamExt;
use tokio_tar::Archive;
use tracing::{instrument, warn, Level};

use crate::blobservice::BlobService;
use crate::directoryservice::DirectoryService;
use crate::import::{ingest_entries, IngestionEntry, IngestionError};
use crate::Node;

use super::blobs::{self, ConcurrentBlobUploader};

type TarPathBuf = std::path::PathBuf;

#[derive(Debug, thiserror::Error)]
pub enum Error {
    #[error("unable to construct stream of entries: {0}")]
    Entries(std::io::Error),

    #[error("unable to read next entry: {0}")]
    NextEntry(std::io::Error),

    #[error("unable to read path for entry: {0}")]
    PathRead(std::io::Error),

    #[error("unable to convert path {0} for entry: {1}")]
    PathConvert(TarPathBuf, std::io::Error),

    #[error("unable to read size field for {0}: {1}")]
    Size(TarPathBuf, std::io::Error),

    #[error("unable to read mode field for {0}: {1}")]
    Mode(TarPathBuf, std::io::Error),

    #[error("unable to read link name field for {0}: {1}")]
    LinkName(TarPathBuf, std::io::Error),

    #[error("unsupported tar entry {0} type: {1:?}")]
    EntryType(TarPathBuf, tokio_tar::EntryType),

    #[error("symlink missing target {0}")]
    MissingSymlinkTarget(TarPathBuf),

    #[error("unexpected number of top level directory entries")]
    UnexpectedNumberOfTopLevelEntries,

    #[error(transparent)]
    BlobUploadError(#[from] blobs::Error),
}

/// Ingests elements from the given tar [`Archive`] into a the passed [`BlobService`] and
/// [`DirectoryService`].
#[instrument(skip_all, ret(level = Level::TRACE), err)]
pub async fn ingest_archive<BS, DS, R>(
    blob_service: BS,
    directory_service: DS,
    mut archive: Archive<R>,
) -> Result<Node, IngestionError<Error>>
where
    BS: BlobService + Clone + 'static,
    DS: DirectoryService,
    R: AsyncRead + Unpin,
{
    // Since tarballs can have entries in any arbitrary order, we need to
    // buffer all of the directory metadata so we can reorder directory
    // contents and entries to meet the requires of the castore.

    // In the first phase, collect up all the regular files and symlinks.
    let mut nodes = IngestionEntryGraph::new();

    let mut blob_uploader = ConcurrentBlobUploader::new(blob_service);

    let mut entries_iter = archive.entries().map_err(Error::Entries)?;
    while let Some(mut entry) = entries_iter.try_next().await.map_err(Error::NextEntry)? {
        let tar_path: TarPathBuf = entry.path().map_err(Error::PathRead)?.into();

        // construct a castore PathBuf, which we use in the produced IngestionEntry.
        let path = crate::path::PathBuf::from_host_path(tar_path.as_path(), true)
            .map_err(|e| Error::PathConvert(tar_path.clone(), e))?;

        let header = entry.header();
        let entry = match header.entry_type() {
            tokio_tar::EntryType::Regular
            | tokio_tar::EntryType::GNUSparse
            | tokio_tar::EntryType::Continuous => {
                let size = header
                    .size()
                    .map_err(|e| Error::Size(tar_path.clone(), e))?;

                let digest = blob_uploader
                    .upload(&path, size, &mut entry)
                    .await
                    .map_err(Error::BlobUploadError)?;

                let executable = entry
                    .header()
                    .mode()
                    .map_err(|e| Error::Mode(tar_path, e))?
                    & 64
                    != 0;

                IngestionEntry::Regular {
                    path,
                    size,
                    executable,
                    digest,
                }
            }
            tokio_tar::EntryType::Symlink => IngestionEntry::Symlink {
                target: entry
                    .link_name()
                    .map_err(|e| Error::LinkName(tar_path.clone(), e))?
                    .ok_or_else(|| Error::MissingSymlinkTarget(tar_path.clone()))?
                    .into_owned()
                    .into_os_string()
                    .into_encoded_bytes(),
                path,
            },
            // Push a bogus directory marker so we can make sure this directoy gets
            // created. We don't know the digest and size until after reading the full
            // tarball.
            tokio_tar::EntryType::Directory => IngestionEntry::Dir { path },

            tokio_tar::EntryType::XGlobalHeader | tokio_tar::EntryType::XHeader => continue,

            entry_type => return Err(Error::EntryType(tar_path, entry_type).into()),
        };

        nodes.add(entry)?;
    }

    blob_uploader.join().await.map_err(Error::BlobUploadError)?;

    let root_node = ingest_entries(
        directory_service,
        futures::stream::iter(nodes.finalize()?.into_iter().map(Ok)),
    )
    .await?;

    Ok(root_node)
}

/// Keep track of the directory structure of a file tree being ingested. This is used
/// for ingestion sources which do not provide any ordering or uniqueness guarantees
/// like tarballs.
///
/// If we ingest multiple entries with the same paths and both entries are not directories,
/// the newer entry will replace the latter entry, disconnecting the old node's children
/// from the graph.
///
/// Once all nodes are ingested a call to [IngestionEntryGraph::finalize] will return
/// a list of entries compute by performaing a DFS post order traversal of the graph
/// from the top-level directory entry.
///
/// This expects the directory structure to contain a single top-level directory entry.
/// An error is returned if this is not the case and ingestion will fail.
struct IngestionEntryGraph {
    graph: DiGraph<IngestionEntry, ()>,
    path_to_index: HashMap<crate::path::PathBuf, NodeIndex>,
    root_node: Option<NodeIndex>,
}

impl Default for IngestionEntryGraph {
    fn default() -> Self {
        Self::new()
    }
}

impl IngestionEntryGraph {
    /// Creates a new ingestion entry graph.
    pub fn new() -> Self {
        IngestionEntryGraph {
            graph: DiGraph::new(),
            path_to_index: HashMap::new(),
            root_node: None,
        }
    }

    /// Adds a new entry to the graph. Parent directories are automatically inserted.
    /// If a node exists in the graph with the same name as the new entry and both the old
    /// and new nodes are not directories, the node is replaced and is disconnected from its
    /// children.
    pub fn add(&mut self, entry: IngestionEntry) -> Result<NodeIndex, Error> {
        let path = entry.path().to_owned();

        let index = match self.path_to_index.get(entry.path()) {
            Some(&index) => {
                // If either the old entry or new entry are not directories, we'll replace the old
                // entry.
                if !entry.is_dir() || !self.get_node(index).is_dir() {
                    self.replace_node(index, entry);
                }

                index
            }
            None => self.graph.add_node(entry),
        };

        // for archives, a path with 1 component is the root node
        if path.components().count() == 1 {
            // We expect archives to contain a single root node, if there is another root node
            // entry with a different path name, this is unsupported.
            if let Some(root_node) = self.root_node {
                if self.get_node(root_node).path() != path.as_ref() {
                    return Err(Error::UnexpectedNumberOfTopLevelEntries);
                }
            }

            self.root_node = Some(index)
        } else if let Some(parent_path) = path.parent() {
            // Recursively add the parent node until it hits the root node.
            let parent_index = self.add(IngestionEntry::Dir {
                path: parent_path.to_owned(),
            })?;

            // Insert an edge from the parent directory to the child entry.
            self.graph.add_edge(parent_index, index, ());
        }

        self.path_to_index.insert(path, index);

        Ok(index)
    }

    /// Traverses the graph in DFS post order and collects the entries into a [Vec<IngestionEntry>].
    ///
    /// Unreachable parts of the graph are not included in the result.
    pub fn finalize(self) -> Result<Vec<IngestionEntry>, Error> {
        // There must be a root node.
        let Some(root_node_index) = self.root_node else {
            return Err(Error::UnexpectedNumberOfTopLevelEntries);
        };

        // The root node must be a directory.
        if !self.get_node(root_node_index).is_dir() {
            return Err(Error::UnexpectedNumberOfTopLevelEntries);
        }

        let mut traversal = DfsPostOrder::new(&self.graph, root_node_index);
        let mut nodes = Vec::with_capacity(self.graph.node_count());
        while let Some(node_index) = traversal.next(&self.graph) {
            nodes.push(self.get_node(node_index).clone());
        }

        Ok(nodes)
    }

    /// Replaces the node with the specified entry. The node's children are disconnected.
    ///
    /// This should never be called if both the old and new nodes are directories.
    fn replace_node(&mut self, index: NodeIndex, new_entry: IngestionEntry) {
        let entry = self
            .graph
            .node_weight_mut(index)
            .expect("Tvix bug: missing node entry");

        debug_assert!(!(entry.is_dir() && new_entry.is_dir()));

        // Replace the node itself.
        warn!(
            "saw duplicate entry in archive at path {:?}. old: {:?} new: {:?}",
            entry.path(),
            &entry,
            &new_entry
        );
        *entry = new_entry;

        // Remove any outgoing edges to disconnect the old node's children.
        let edges = self
            .graph
            .edges_directed(index, Direction::Outgoing)
            .map(|edge| edge.id())
            .collect::<Vec<_>>();
        for edge in edges {
            self.graph.remove_edge(edge);
        }
    }

    fn get_node(&self, index: NodeIndex) -> &IngestionEntry {
        self.graph
            .node_weight(index)
            .expect("Tvix bug: missing node entry")
    }
}

#[cfg(test)]
mod test {
    use crate::import::IngestionEntry;
    use crate::B3Digest;

    use super::{Error, IngestionEntryGraph};

    use lazy_static::lazy_static;
    use rstest::rstest;

    lazy_static! {
        pub static ref EMPTY_DIGEST: B3Digest = blake3::hash(&[]).as_bytes().into();
        pub static ref DIR_A: IngestionEntry = IngestionEntry::Dir {
            path: "a".parse().unwrap()
        };
        pub static ref DIR_B: IngestionEntry = IngestionEntry::Dir {
            path: "b".parse().unwrap()
        };
        pub static ref DIR_A_B: IngestionEntry = IngestionEntry::Dir {
            path: "a/b".parse().unwrap()
        };
        pub static ref FILE_A: IngestionEntry = IngestionEntry::Regular {
            path: "a".parse().unwrap(),
            size: 0,
            executable: false,
            digest: EMPTY_DIGEST.clone(),
        };
        pub static ref FILE_A_B: IngestionEntry = IngestionEntry::Regular {
            path: "a/b".parse().unwrap(),
            size: 0,
            executable: false,
            digest: EMPTY_DIGEST.clone(),
        };
        pub static ref FILE_A_B_C: IngestionEntry = IngestionEntry::Regular {
            path: "a/b/c".parse().unwrap(),
            size: 0,
            executable: false,
            digest: EMPTY_DIGEST.clone(),
        };
    }

    #[rstest]
    #[case::implicit_directories(&[&*FILE_A_B_C], &[&*FILE_A_B_C, &*DIR_A_B, &*DIR_A])]
    #[case::explicit_directories(&[&*DIR_A, &*DIR_A_B, &*FILE_A_B_C], &[&*FILE_A_B_C, &*DIR_A_B, &*DIR_A])]
    #[case::inaccesible_tree(&[&*DIR_A, &*DIR_A_B, &*FILE_A_B], &[&*FILE_A_B, &*DIR_A])]
    fn node_ingestion_success(
        #[case] in_entries: &[&IngestionEntry],
        #[case] exp_entries: &[&IngestionEntry],
    ) {
        let mut nodes = IngestionEntryGraph::new();

        for entry in in_entries {
            nodes.add((*entry).clone()).expect("failed to add entry");
        }

        let entries = nodes.finalize().expect("invalid entries");

        let exp_entries: Vec<IngestionEntry> =
            exp_entries.iter().map(|entry| (*entry).clone()).collect();

        assert_eq!(entries, exp_entries);
    }

    #[rstest]
    #[case::no_top_level_entries(&[], Error::UnexpectedNumberOfTopLevelEntries)]
    #[case::multiple_top_level_dirs(&[&*DIR_A, &*DIR_B], Error::UnexpectedNumberOfTopLevelEntries)]
    #[case::top_level_file_entry(&[&*FILE_A], Error::UnexpectedNumberOfTopLevelEntries)]
    fn node_ingestion_error(#[case] in_entries: &[&IngestionEntry], #[case] exp_error: Error) {
        let mut nodes = IngestionEntryGraph::new();

        let result = (|| {
            for entry in in_entries {
                nodes.add((*entry).clone())?;
            }
            nodes.finalize()
        })();

        let error = result.expect_err("expected error");
        assert_eq!(error.to_string(), exp_error.to_string());
    }
}