#[cfg(target_family = "unix")]
use std::os::unix::ffi::OsStrExt;
use std::{
    collections::HashMap,
    path::{Path, PathBuf},
};

use tokio::io::AsyncRead;
use tokio_stream::StreamExt;
use tokio_tar::Archive;
use tracing::{instrument, Level};

use crate::{
    blobservice::BlobService,
    directoryservice::{DirectoryPutter, DirectoryService},
    import::Error,
    proto::{node::Node, Directory, DirectoryNode, FileNode, SymlinkNode},
};

/// Ingests elements from the given tar [`Archive`] into the passed
/// [`BlobService`] and [`DirectoryService`].
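///
/// A minimal usage sketch (hypothetical: assumes the caller already holds
/// `blob_service` / `directory_service` handles and an `AsyncRead` yielding
/// an uncompressed tar stream):
///
/// ```ignore
/// let archive = tokio_tar::Archive::new(tar_reader);
/// let root_node = ingest_archive(blob_service, directory_service, archive).await?;
/// ```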
#[instrument(skip_all, ret(level = Level::TRACE), err)]
pub async fn ingest_archive<'a, BS, DS, R>(
    blob_service: BS,
    directory_service: DS,
    mut archive: Archive<R>,
) -> Result<Node, Error>
where
    BS: AsRef<dyn BlobService> + Clone,
    DS: AsRef<dyn DirectoryService>,
    R: AsyncRead + Unpin,
{
    // Since tarballs can have entries in any arbitrary order, we need to
    // buffer all of the directory metadata so we can reorder directory
    // contents and entries to meet the requirements of the castore.
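    //
    // For example, an archive may list `foo/bar.txt` before the `foo/`
    // directory entry itself, while the castore needs a directory's children
    // (and their digests) before the parent directory can be uploaded.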

    // In the first phase, collect up all the regular files and symlinks.
    let mut paths = HashMap::new();
    let mut entries = archive.entries().map_err(Error::Archive)?;
    while let Some(mut entry) = entries.try_next().await.map_err(Error::Archive)? {
        let path = entry.path().map_err(Error::Archive)?.into_owned();
        let name = path
            .file_name()
            .ok_or_else(|| {
                Error::Archive(std::io::Error::new(
                    std::io::ErrorKind::InvalidInput,
                    "invalid filename in archive",
                ))
            })?
            .as_bytes()
            .to_vec()
            .into();

        let node = match entry.header().entry_type() {
            tokio_tar::EntryType::Regular
            | tokio_tar::EntryType::GNUSparse
            | tokio_tar::EntryType::Continuous => {
                // TODO: If the same path is overwritten in the tarball, we may leave
                // an unreferenced blob after uploading.
                let mut writer = blob_service.as_ref().open_write().await;
                let size = tokio::io::copy(&mut entry, &mut writer)
                    .await
                    .map_err(Error::Archive)?;
                let digest = writer.close().await.map_err(Error::Archive)?;
                Node::File(FileNode {
                    name,
                    digest: digest.into(),
                    size,
                    // The file is executable if the owner-execute bit (0o100)
                    // is set in the tar header's mode field.
                    executable: entry.header().mode().map_err(Error::Archive)? & 0o100 != 0,
                })
            }
            tokio_tar::EntryType::Symlink => Node::Symlink(SymlinkNode {
                name,
                target: entry
                    .link_name()
                    .map_err(Error::Archive)?
                    .ok_or_else(|| {
                        Error::Archive(std::io::Error::new(
                            std::io::ErrorKind::InvalidInput,
                            "symlink entry missing target",
                        ))
                    })?
                    .as_os_str()
                    .as_bytes()
                    .to_vec()
                    .into(),
            }),
            // Push a bogus directory marker so we can make sure this directory
            // gets created. We don't know its digest and size until after
            // reading the full tarball.
            tokio_tar::EntryType::Directory => Node::Directory(DirectoryNode {
                name,
                digest: Default::default(),
                size: 0,
            }),

            tokio_tar::EntryType::XGlobalHeader | tokio_tar::EntryType::XHeader => continue,

            entry_type => return Err(Error::UnsupportedTarEntry(path, entry_type)),
        };

        paths.insert(path, node);
    }

    // In the second phase, construct all of the directories.

    // Collect into a list and sort by path, so every directory's subtree
    // forms one contiguous run. The boundaries between those runs tell us
    // when a directory is complete and can be constructed and uploaded.
    let mut ordered_paths = paths.into_iter().collect::<Vec<_>>();
    ordered_paths.sort_by(|a, b| a.0.cmp(&b.0));
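    // For example, the paths "a", "a/b", "a/b/f.txt", "a/g.txt", "z.txt"
    // sort into exactly that order: every directory's subtree is contiguous,
    // so an entry that is no longer under the directory on top of the stack
    // signals that the directory is finished.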

    let mut directory_putter = directory_service.as_ref().put_multiple_start();

    // Start with an initial directory at the root. Its empty path is an
    // ancestor of every entry path, so the stack below never runs empty.
    let mut dir_stack = vec![(PathBuf::from(""), Directory::default())];

    async fn pop_directory(
        dir_stack: &mut Vec<(PathBuf, Directory)>,
        directory_putter: &mut Box<dyn DirectoryPutter>,
    ) -> Result<DirectoryNode, Error> {
        let (path, directory) = dir_stack.pop().unwrap();

        directory
            .validate()
            .map_err(|e| Error::InvalidDirectory(path.to_path_buf(), e))?;

        let dir_node = DirectoryNode {
            name: path
                .file_name()
                .unwrap_or_default()
                .as_bytes()
                .to_vec()
                .into(),
            digest: directory.digest().into(),
            size: directory.size(),
        };

        if let Some((_, parent)) = dir_stack.last_mut() {
            parent.directories.push(dir_node.clone());
        }

        directory_putter.put(directory).await?;

        Ok(dir_node)
    }

    fn push_directories(path: &Path, dir_stack: &mut Vec<(PathBuf, Directory)>) {
        if path == dir_stack.last().unwrap().0 {
            return;
        }
        if let Some(parent) = path.parent() {
            push_directories(parent, dir_stack);
        }
        dir_stack.push((path.to_path_buf(), Directory::default()));
    }
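    // Sketch of the stack discipline, for the sorted entries
    // "a", "a/b", "a/b/f.txt", "z.txt" (a hypothetical example):
    //   - "a" and "a/b" each push a new (path, Directory) frame;
    //   - "a/b/f.txt" is recorded as a file in the top frame "a/b";
    //   - "z.txt" is not under "a/b", so "a/b" and then "a" are popped,
    //     finalized and uploaded, before "z.txt" is recorded in the root.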

    for (path, node) in ordered_paths.into_iter() {
        // Pop stack until the top dir is an ancestor of this entry.
        loop {
            let top = dir_stack.last().unwrap();
            if path.ancestors().any(|ancestor| ancestor == top.0) {
                break;
            }

            pop_directory(&mut dir_stack, &mut directory_putter).await?;
        }

        // For directories, just ensure the directory node exists.
        if let Node::Directory(_) = node {
            push_directories(&path, &mut dir_stack);
            continue;
        }

        // Push all ancestor directories onto the stack.
        push_directories(path.parent().unwrap(), &mut dir_stack);

        let top = dir_stack.last_mut().unwrap();
        debug_assert_eq!(Some(top.0.as_path()), path.parent());

        match node {
            Node::File(n) => top.1.files.push(n),
            Node::Symlink(n) => top.1.symlinks.push(n),
            // We already handled directories above.
            Node::Directory(_) => unreachable!(),
        }
    }

    let mut root_node = None;
    while !dir_stack.is_empty() {
        // If the root directory would contain nothing but a single directory
        // entry, return that child entry instead, since tarballs commonly
        // wrap all of their contents in one top-level directory.
        if dir_stack.len() == 1 {
            let root = &dir_stack.last().unwrap().1;
            if root.directories.len() == 1 && root.files.is_empty() && root.symlinks.is_empty() {
                break;
            }
        }
        root_node = Some(pop_directory(&mut dir_stack, &mut directory_putter).await?);
    }
    let root_node = root_node.expect("no root node");

    let root_digest = directory_putter.close().await?;

    debug_assert_eq!(root_digest.as_slice(), &root_node.digest);

    Ok(Node::Directory(root_node))
}