about summary refs log tree commit diff
path: root/tvix/castore/src/import
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/castore/src/import')
-rw-r--r--tvix/castore/src/import/archive.rs201
-rw-r--r--tvix/castore/src/import/error.rs11
-rw-r--r--tvix/castore/src/import/mod.rs2
3 files changed, 212 insertions, 2 deletions
diff --git a/tvix/castore/src/import/archive.rs b/tvix/castore/src/import/archive.rs
new file mode 100644
index 000000000000..d0ae3c67411c
--- /dev/null
+++ b/tvix/castore/src/import/archive.rs
@@ -0,0 +1,201 @@
+#[cfg(target_family = "unix")]
+use std::os::unix::ffi::OsStrExt;
+use std::{
+    collections::HashMap,
+    path::{Path, PathBuf},
+};
+
+use tokio::io::AsyncRead;
+use tokio_stream::StreamExt;
+use tokio_tar::Archive;
+use tracing::{instrument, Level};
+
+use crate::{
+    blobservice::BlobService,
+    directoryservice::{DirectoryPutter, DirectoryService},
+    import::Error,
+    proto::{node::Node, Directory, DirectoryNode, FileNode, SymlinkNode},
+};
+
+/// Ingests elements from the given tar [`Archive`] into a the passed [`BlobService`] and
+/// [`DirectoryService`].
+#[instrument(skip_all, ret(level = Level::TRACE), err)]
+pub async fn ingest_archive<'a, BS, DS, R>(
+    blob_service: BS,
+    directory_service: DS,
+    mut archive: Archive<R>,
+) -> Result<Node, Error>
+where
+    BS: AsRef<dyn BlobService> + Clone,
+    DS: AsRef<dyn DirectoryService>,
+    R: AsyncRead + Unpin,
+{
+    // Since tarballs can have entries in any arbitrary order, we need to
+    // buffer all of the directory metadata so we can reorder directory
+    // contents and entries to meet the requires of the castore.
+
+    // In the first phase, collect up all the regular files and symlinks.
+    let mut paths = HashMap::new();
+    let mut entries = archive.entries().map_err(Error::Archive)?;
+    while let Some(mut entry) = entries.try_next().await.map_err(Error::Archive)? {
+        let path = entry.path().map_err(Error::Archive)?.into_owned();
+        let name = path
+            .file_name()
+            .ok_or_else(|| {
+                Error::Archive(std::io::Error::new(
+                    std::io::ErrorKind::InvalidInput,
+                    "invalid filename in archive",
+                ))
+            })?
+            .as_bytes()
+            .to_vec()
+            .into();
+
+        let node = match entry.header().entry_type() {
+            tokio_tar::EntryType::Regular
+            | tokio_tar::EntryType::GNUSparse
+            | tokio_tar::EntryType::Continuous => {
+                // TODO: If the same path is overwritten in the tarball, we may leave
+                // an unreferenced blob after uploading.
+                let mut writer = blob_service.as_ref().open_write().await;
+                let size = tokio::io::copy(&mut entry, &mut writer)
+                    .await
+                    .map_err(Error::Archive)?;
+                let digest = writer.close().await.map_err(Error::Archive)?;
+                Node::File(FileNode {
+                    name,
+                    digest: digest.into(),
+                    size,
+                    executable: entry.header().mode().map_err(Error::Archive)? & 64 != 0,
+                })
+            }
+            tokio_tar::EntryType::Symlink => Node::Symlink(SymlinkNode {
+                name,
+                target: entry
+                    .link_name()
+                    .map_err(Error::Archive)?
+                    .expect("symlink missing target")
+                    .as_os_str()
+                    .as_bytes()
+                    .to_vec()
+                    .into(),
+            }),
+            // Push a bogus directory marker so we can make sure this directoy gets
+            // created. We don't know the digest and size until after reading the full
+            // tarball.
+            tokio_tar::EntryType::Directory => Node::Directory(DirectoryNode {
+                name,
+                digest: Default::default(),
+                size: 0,
+            }),
+
+            tokio_tar::EntryType::XGlobalHeader | tokio_tar::EntryType::XHeader => continue,
+
+            entry_type => return Err(Error::UnsupportedTarEntry(path, entry_type)),
+        };
+
+        paths.insert(path, node);
+    }
+
+    // In the second phase, construct all of the directories.
+
+    // Collect into a list and then sort so all entries in the same directory
+    // are next to each other.
+    // We can detect boundaries between each directories to determine
+    // when to construct or push directory entries.
+    let mut ordered_paths = paths.into_iter().collect::<Vec<_>>();
+    ordered_paths.sort_by(|a, b| a.0.cmp(&b.0));
+
+    let mut directory_putter = directory_service.as_ref().put_multiple_start();
+
+    // Start with an initial directory at the root.
+    let mut dir_stack = vec![(PathBuf::from(""), Directory::default())];
+
+    async fn pop_directory(
+        dir_stack: &mut Vec<(PathBuf, Directory)>,
+        directory_putter: &mut Box<dyn DirectoryPutter>,
+    ) -> Result<DirectoryNode, Error> {
+        let (path, directory) = dir_stack.pop().unwrap();
+
+        directory
+            .validate()
+            .map_err(|e| Error::InvalidDirectory(path.to_path_buf(), e))?;
+
+        let dir_node = DirectoryNode {
+            name: path
+                .file_name()
+                .unwrap_or_default()
+                .as_bytes()
+                .to_vec()
+                .into(),
+            digest: directory.digest().into(),
+            size: directory.size(),
+        };
+
+        if let Some((_, parent)) = dir_stack.last_mut() {
+            parent.directories.push(dir_node.clone());
+        }
+
+        directory_putter.put(directory).await?;
+
+        Ok(dir_node)
+    }
+
+    fn push_directories(path: &Path, dir_stack: &mut Vec<(PathBuf, Directory)>) {
+        if path == dir_stack.last().unwrap().0 {
+            return;
+        }
+        if let Some(parent) = path.parent() {
+            push_directories(parent, dir_stack);
+        }
+        dir_stack.push((path.to_path_buf(), Directory::default()));
+    }
+
+    for (path, node) in ordered_paths.into_iter() {
+        // Pop stack until the top dir is an ancestor of this entry.
+        loop {
+            let top = dir_stack.last().unwrap();
+            if path.ancestors().any(|ancestor| ancestor == top.0) {
+                break;
+            }
+
+            pop_directory(&mut dir_stack, &mut directory_putter).await?;
+        }
+
+        // For directories, just ensure the directory node exists.
+        if let Node::Directory(_) = node {
+            push_directories(&path, &mut dir_stack);
+            continue;
+        }
+
+        // Push all ancestor directories onto the stack.
+        push_directories(path.parent().unwrap(), &mut dir_stack);
+
+        let top = dir_stack.last_mut().unwrap();
+        debug_assert_eq!(Some(top.0.as_path()), path.parent());
+
+        match node {
+            Node::File(n) => top.1.files.push(n),
+            Node::Symlink(n) => top.1.symlinks.push(n),
+            // We already handled directories above.
+            Node::Directory(_) => unreachable!(),
+        }
+    }
+
+    let mut root_node = None;
+    while !dir_stack.is_empty() {
+        // If the root directory only has 1 directory entry, we return the child entry
+        // instead... weeeee
+        if dir_stack.len() == 1 && dir_stack.last().unwrap().1.directories.len() == 1 {
+            break;
+        }
+        root_node = Some(pop_directory(&mut dir_stack, &mut directory_putter).await?);
+    }
+    let root_node = root_node.expect("no root node");
+
+    let root_digest = directory_putter.close().await?;
+
+    debug_assert_eq!(root_digest.as_slice(), &root_node.digest);
+
+    Ok(Node::Directory(root_node))
+}
diff --git a/tvix/castore/src/import/error.rs b/tvix/castore/src/import/error.rs
index 15dd0664deaa..18c71aa235b8 100644
--- a/tvix/castore/src/import/error.rs
+++ b/tvix/castore/src/import/error.rs
@@ -1,6 +1,6 @@
 use std::{fs::FileType, path::PathBuf};
 
-use crate::Error as CastoreError;
+use crate::{proto::ValidateDirectoryError, Error as CastoreError};
 
 #[derive(Debug, thiserror::Error)]
 pub enum Error {
@@ -19,8 +19,17 @@ pub enum Error {
     #[error("unable to read {0}: {1}")]
     UnableToRead(PathBuf, std::io::Error),
 
+    #[error("error reading from archive: {0}")]
+    Archive(std::io::Error),
+
     #[error("unsupported file {0} type: {1:?}")]
     UnsupportedFileType(PathBuf, FileType),
+
+    #[error("invalid directory contents {0}: {1}")]
+    InvalidDirectory(PathBuf, ValidateDirectoryError),
+
+    #[error("unsupported tar entry {0} type: {1:?}")]
+    UnsupportedTarEntry(PathBuf, tokio_tar::EntryType),
 }
 
 impl From<CastoreError> for Error {
diff --git a/tvix/castore/src/import/mod.rs b/tvix/castore/src/import/mod.rs
index ff27c0fcfd2b..c5887685bbdb 100644
--- a/tvix/castore/src/import/mod.rs
+++ b/tvix/castore/src/import/mod.rs
@@ -1,4 +1,3 @@
-//! Deals with ingesting contents into castore.
 //! The main library function here is [ingest_entries], receiving a stream of
 //! [IngestionEntry].
 //!
@@ -32,6 +31,7 @@ use tracing::instrument;
 mod error;
 pub use error::Error;
 
+pub mod archive;
 pub mod fs;
 
 /// Ingests [IngestionEntry] from the given stream into a the passed [DirectoryService].