#[cfg(target_family = "unix")] use std::os::unix::ffi::OsStrExt; use std::{ collections::HashMap, path::{Path, PathBuf}, }; use tokio::io::AsyncRead; use tokio_stream::StreamExt; use tokio_tar::Archive; use tracing::{instrument, Level}; use crate::{ blobservice::BlobService, directoryservice::{DirectoryPutter, DirectoryService}, import::Error, proto::{node::Node, Directory, DirectoryNode, FileNode, SymlinkNode}, }; /// Ingests elements from the given tar [`Archive`] into a the passed [`BlobService`] and /// [`DirectoryService`]. #[instrument(skip_all, ret(level = Level::TRACE), err)] pub async fn ingest_archive<'a, BS, DS, R>( blob_service: BS, directory_service: DS, mut archive: Archive, ) -> Result where BS: AsRef + Clone, DS: AsRef, R: AsyncRead + Unpin, { // Since tarballs can have entries in any arbitrary order, we need to // buffer all of the directory metadata so we can reorder directory // contents and entries to meet the requires of the castore. // In the first phase, collect up all the regular files and symlinks. let mut paths = HashMap::new(); let mut entries = archive.entries().map_err(Error::Archive)?; while let Some(mut entry) = entries.try_next().await.map_err(Error::Archive)? { let path = entry.path().map_err(Error::Archive)?.into_owned(); let name = path .file_name() .ok_or_else(|| { Error::Archive(std::io::Error::new( std::io::ErrorKind::InvalidInput, "invalid filename in archive", )) })? .as_bytes() .to_vec() .into(); let node = match entry.header().entry_type() { tokio_tar::EntryType::Regular | tokio_tar::EntryType::GNUSparse | tokio_tar::EntryType::Continuous => { // TODO: If the same path is overwritten in the tarball, we may leave // an unreferenced blob after uploading. let mut writer = blob_service.as_ref().open_write().await; let size = tokio::io::copy(&mut entry, &mut writer) .await .map_err(Error::Archive)?; let digest = writer.close().await.map_err(Error::Archive)?; Node::File(FileNode { name, digest: digest.into(), size, executable: entry.header().mode().map_err(Error::Archive)? & 64 != 0, }) } tokio_tar::EntryType::Symlink => Node::Symlink(SymlinkNode { name, target: entry .link_name() .map_err(Error::Archive)? .expect("symlink missing target") .as_os_str() .as_bytes() .to_vec() .into(), }), // Push a bogus directory marker so we can make sure this directoy gets // created. We don't know the digest and size until after reading the full // tarball. tokio_tar::EntryType::Directory => Node::Directory(DirectoryNode { name, digest: Default::default(), size: 0, }), tokio_tar::EntryType::XGlobalHeader | tokio_tar::EntryType::XHeader => continue, entry_type => return Err(Error::UnsupportedTarEntry(path, entry_type)), }; paths.insert(path, node); } // In the second phase, construct all of the directories. // Collect into a list and then sort so all entries in the same directory // are next to each other. // We can detect boundaries between each directories to determine // when to construct or push directory entries. let mut ordered_paths = paths.into_iter().collect::>(); ordered_paths.sort_by(|a, b| a.0.cmp(&b.0)); let mut directory_putter = directory_service.as_ref().put_multiple_start(); // Start with an initial directory at the root. let mut dir_stack = vec![(PathBuf::from(""), Directory::default())]; async fn pop_directory( dir_stack: &mut Vec<(PathBuf, Directory)>, directory_putter: &mut Box, ) -> Result { let (path, directory) = dir_stack.pop().unwrap(); directory .validate() .map_err(|e| Error::InvalidDirectory(path.to_path_buf(), e))?; let dir_node = DirectoryNode { name: path .file_name() .unwrap_or_default() .as_bytes() .to_vec() .into(), digest: directory.digest().into(), size: directory.size(), }; if let Some((_, parent)) = dir_stack.last_mut() { parent.directories.push(dir_node.clone()); } directory_putter.put(directory).await?; Ok(dir_node) } fn push_directories(path: &Path, dir_stack: &mut Vec<(PathBuf, Directory)>) { if path == dir_stack.last().unwrap().0 { return; } if let Some(parent) = path.parent() { push_directories(parent, dir_stack); } dir_stack.push((path.to_path_buf(), Directory::default())); } for (path, node) in ordered_paths.into_iter() { // Pop stack until the top dir is an ancestor of this entry. loop { let top = dir_stack.last().unwrap(); if path.ancestors().any(|ancestor| ancestor == top.0) { break; } pop_directory(&mut dir_stack, &mut directory_putter).await?; } // For directories, just ensure the directory node exists. if let Node::Directory(_) = node { push_directories(&path, &mut dir_stack); continue; } // Push all ancestor directories onto the stack. push_directories(path.parent().unwrap(), &mut dir_stack); let top = dir_stack.last_mut().unwrap(); debug_assert_eq!(Some(top.0.as_path()), path.parent()); match node { Node::File(n) => top.1.files.push(n), Node::Symlink(n) => top.1.symlinks.push(n), // We already handled directories above. Node::Directory(_) => unreachable!(), } } let mut root_node = None; while !dir_stack.is_empty() { // If the root directory only has 1 directory entry, we return the child entry // instead... weeeee if dir_stack.len() == 1 && dir_stack.last().unwrap().1.directories.len() == 1 { break; } root_node = Some(pop_directory(&mut dir_stack, &mut directory_putter).await?); } let root_node = root_node.expect("no root node"); let root_digest = directory_putter.close().await?; debug_assert_eq!(root_digest.as_slice(), &root_node.digest); Ok(Node::Directory(root_node)) }