-rw-r--r--  tvix/Cargo.lock                        1
-rw-r--r--  tvix/Cargo.nix                         4
-rw-r--r--  tvix/castore/Cargo.toml                1
-rw-r--r--  tvix/castore/src/import/archive.rs   201
-rw-r--r--  tvix/castore/src/import/error.rs      11
-rw-r--r--  tvix/castore/src/import/mod.rs         2
-rw-r--r--  tvix/glue/src/builtins/fetchers.rs    21
-rw-r--r--  tvix/glue/src/builtins/import.rs       2
-rw-r--r--  tvix/glue/src/tvix_store_io.rs        93
-rw-r--r--  users/picnoir/tvix-daemon/Cargo.nix    3
10 files changed, 306 insertions, 33 deletions
diff --git a/tvix/Cargo.lock b/tvix/Cargo.lock
index df7c0dbfbe76..364fbed662af 100644
--- a/tvix/Cargo.lock
+++ b/tvix/Cargo.lock
@@ -4292,6 +4292,7 @@ dependencies = [
  "tokio",
  "tokio-retry",
  "tokio-stream",
+ "tokio-tar",
  "tokio-util",
  "tonic 0.11.0",
  "tonic-build",
diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix
index 38cb770921e7..9efdb7dee94e 100644
--- a/tvix/Cargo.nix
+++ b/tvix/Cargo.nix
@@ -13656,6 +13656,10 @@ rec {
             features = [ "fs" "net" ];
           }
           {
+            name = "tokio-tar";
+            packageId = "tokio-tar";
+          }
+          {
             name = "tokio-util";
             packageId = "tokio-util";
             features = [ "io" "io-util" ];
diff --git a/tvix/castore/Cargo.toml b/tvix/castore/Cargo.toml
index b68922b7ce07..f54bb2ddb5b4 100644
--- a/tvix/castore/Cargo.toml
+++ b/tvix/castore/Cargo.toml
@@ -22,6 +22,7 @@ sled = { version = "0.34.7" }
 thiserror = "1.0.38"
 tokio-stream = { version = "0.1.14", features = ["fs", "net"] }
 tokio-util = { version = "0.7.9", features = ["io", "io-util"] }
+tokio-tar = "0.3.1"
 tokio = { version = "1.32.0", features = ["fs", "macros", "net", "rt", "rt-multi-thread", "signal"] }
 tonic = "0.11.0"
 tower = "0.4.13"
diff --git a/tvix/castore/src/import/archive.rs b/tvix/castore/src/import/archive.rs
new file mode 100644
index 000000000000..d0ae3c67411c
--- /dev/null
+++ b/tvix/castore/src/import/archive.rs
@@ -0,0 +1,201 @@
+#[cfg(target_family = "unix")]
+use std::os::unix::ffi::OsStrExt;
+use std::{
+    collections::HashMap,
+    path::{Path, PathBuf},
+};
+
+use tokio::io::AsyncRead;
+use tokio_stream::StreamExt;
+use tokio_tar::Archive;
+use tracing::{instrument, Level};
+
+use crate::{
+    blobservice::BlobService,
+    directoryservice::{DirectoryPutter, DirectoryService},
+    import::Error,
+    proto::{node::Node, Directory, DirectoryNode, FileNode, SymlinkNode},
+};
+
+/// Ingests elements from the given tar [`Archive`] into the passed [`BlobService`] and
+/// [`DirectoryService`].
+#[instrument(skip_all, ret(level = Level::TRACE), err)]
+pub async fn ingest_archive<'a, BS, DS, R>(
+    blob_service: BS,
+    directory_service: DS,
+    mut archive: Archive<R>,
+) -> Result<Node, Error>
+where
+    BS: AsRef<dyn BlobService> + Clone,
+    DS: AsRef<dyn DirectoryService>,
+    R: AsyncRead + Unpin,
+{
+    // Since tarballs can have entries in arbitrary order, we need to
+    // buffer all of the directory metadata so we can reorder directory
+    // contents and entries to meet the requirements of the castore.
+
+    // In the first phase, collect up all the regular files and symlinks.
+    let mut paths = HashMap::new();
+    let mut entries = archive.entries().map_err(Error::Archive)?;
+    while let Some(mut entry) = entries.try_next().await.map_err(Error::Archive)? {
+        let path = entry.path().map_err(Error::Archive)?.into_owned();
+        let name = path
+            .file_name()
+            .ok_or_else(|| {
+                Error::Archive(std::io::Error::new(
+                    std::io::ErrorKind::InvalidInput,
+                    "invalid filename in archive",
+                ))
+            })?
+            .as_bytes()
+            .to_vec()
+            .into();
+
+        let node = match entry.header().entry_type() {
+            tokio_tar::EntryType::Regular
+            | tokio_tar::EntryType::GNUSparse
+            | tokio_tar::EntryType::Continuous => {
+                // TODO: If the same path is overwritten in the tarball, we may leave
+                // an unreferenced blob after uploading.
+                let mut writer = blob_service.as_ref().open_write().await;
+                let size = tokio::io::copy(&mut entry, &mut writer)
+                    .await
+                    .map_err(Error::Archive)?;
+                let digest = writer.close().await.map_err(Error::Archive)?;
+                Node::File(FileNode {
+                    name,
+                    digest: digest.into(),
+                    size,
+                    executable: entry.header().mode().map_err(Error::Archive)? & 64 != 0, // 64 = 0o100, the owner-execute bit
+                })
+            }
+            tokio_tar::EntryType::Symlink => Node::Symlink(SymlinkNode {
+                name,
+                target: entry
+                    .link_name()
+                    .map_err(Error::Archive)?
+                    .expect("symlink missing target")
+                    .as_os_str()
+                    .as_bytes()
+                    .to_vec()
+                    .into(),
+            }),
+            // Push a bogus directory marker so we can make sure this directory gets
+            // created. We don't know the digest and size until after reading the full
+            // tarball.
+            tokio_tar::EntryType::Directory => Node::Directory(DirectoryNode {
+                name,
+                digest: Default::default(),
+                size: 0,
+            }),
+
+            tokio_tar::EntryType::XGlobalHeader | tokio_tar::EntryType::XHeader => continue,
+
+            entry_type => return Err(Error::UnsupportedTarEntry(path, entry_type)),
+        };
+
+        paths.insert(path, node);
+    }
+
+    // In the second phase, construct all of the directories.
+
+    // Collect into a list and then sort so all entries in the same directory
+    // are next to each other.
+    // This lets us detect the boundaries between directories, and thus
+    // when to construct or push directory entries.
+    let mut ordered_paths = paths.into_iter().collect::<Vec<_>>();
+    ordered_paths.sort_by(|a, b| a.0.cmp(&b.0));
+
+    let mut directory_putter = directory_service.as_ref().put_multiple_start();
+
+    // Start with an initial directory at the root.
+    let mut dir_stack = vec![(PathBuf::from(""), Directory::default())];
+
+    async fn pop_directory(
+        dir_stack: &mut Vec<(PathBuf, Directory)>,
+        directory_putter: &mut Box<dyn DirectoryPutter>,
+    ) -> Result<DirectoryNode, Error> {
+        let (path, directory) = dir_stack.pop().unwrap();
+
+        directory
+            .validate()
+            .map_err(|e| Error::InvalidDirectory(path.to_path_buf(), e))?;
+
+        let dir_node = DirectoryNode {
+            name: path
+                .file_name()
+                .unwrap_or_default()
+                .as_bytes()
+                .to_vec()
+                .into(),
+            digest: directory.digest().into(),
+            size: directory.size(),
+        };
+
+        if let Some((_, parent)) = dir_stack.last_mut() {
+            parent.directories.push(dir_node.clone());
+        }
+
+        directory_putter.put(directory).await?;
+
+        Ok(dir_node)
+    }
+
+    fn push_directories(path: &Path, dir_stack: &mut Vec<(PathBuf, Directory)>) {
+        if path == dir_stack.last().unwrap().0 {
+            return;
+        }
+        if let Some(parent) = path.parent() {
+            push_directories(parent, dir_stack);
+        }
+        dir_stack.push((path.to_path_buf(), Directory::default()));
+    }
+
+    for (path, node) in ordered_paths.into_iter() {
+        // Pop stack until the top dir is an ancestor of this entry.
+        loop {
+            let top = dir_stack.last().unwrap();
+            if path.ancestors().any(|ancestor| ancestor == top.0) {
+                break;
+            }
+
+            pop_directory(&mut dir_stack, &mut directory_putter).await?;
+        }
+
+        // For directories, just ensure the directory node exists.
+        if let Node::Directory(_) = node {
+            push_directories(&path, &mut dir_stack);
+            continue;
+        }
+
+        // Push all ancestor directories onto the stack.
+        push_directories(path.parent().unwrap(), &mut dir_stack);
+
+        let top = dir_stack.last_mut().unwrap();
+        debug_assert_eq!(Some(top.0.as_path()), path.parent());
+
+        match node {
+            Node::File(n) => top.1.files.push(n),
+            Node::Symlink(n) => top.1.symlinks.push(n),
+            // We already handled directories above.
+            Node::Directory(_) => unreachable!(),
+        }
+    }
+
+    let mut root_node = None;
+    while !dir_stack.is_empty() {
+        // If the root directory only has one directory entry, we return that
+        // child entry instead of the root directory itself.
+        if dir_stack.len() == 1 && dir_stack.last().unwrap().1.directories.len() == 1 {
+            break;
+        }
+        root_node = Some(pop_directory(&mut dir_stack, &mut directory_putter).await?);
+    }
+    let root_node = root_node.expect("no root node");
+
+    let root_digest = directory_putter.close().await?;
+
+    debug_assert_eq!(root_digest.as_slice(), &root_node.digest);
+
+    Ok(Node::Directory(root_node))
+}
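
The second phase above rests on one invariant: after sorting, each directory's entries form a contiguous run, and every parent path sorts before its children, so a stack of open directories is enough to emit the tree bottom-up. The following standalone sketch (not part of the commit; the paths and printed output are purely illustrative) walks the same pop/push logic in isolation:

    use std::path::{Path, PathBuf};

    // Mirror of push_directories above: open `path` and any of its
    // ancestors that are not already on the stack.
    fn open_dirs(path: &Path, stack: &mut Vec<PathBuf>) {
        if path == stack.last().unwrap().as_path() {
            return;
        }
        if let Some(parent) = path.parent() {
            open_dirs(parent, stack);
        }
        stack.push(path.to_path_buf());
    }

    fn main() {
        // Tar entries may appear in any order; sorting restores a layout
        // where each directory's contents sit next to each other.
        let mut files = vec![
            PathBuf::from("a/b/one"),
            PathBuf::from("top"),
            PathBuf::from("a/two"),
            PathBuf::from("a/b/three"),
        ];
        files.sort();

        let mut stack = vec![PathBuf::from("")];
        for file in &files {
            // Pop (i.e. finalize) directories that cannot contain this entry.
            while !file.ancestors().any(|a| a == stack.last().unwrap().as_path()) {
                println!("finish {:?}", stack.pop().unwrap());
            }
            open_dirs(file.parent().unwrap(), &mut stack);
            println!("add {:?} to {:?}", file, stack.last().unwrap());
        }
        // Finalize whatever is still open; the root comes out last.
        while let Some(dir) = stack.pop() {
            println!("finish {:?}", dir);
        }
    }

Directories are printed strictly bottom-up ("a/b" before "a" before the root), which is exactly the order the castore's DirectoryPutter needs: a directory's digest and size are only known once all of its children have been put.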
diff --git a/tvix/castore/src/import/error.rs b/tvix/castore/src/import/error.rs
index 15dd0664deaa..18c71aa235b8 100644
--- a/tvix/castore/src/import/error.rs
+++ b/tvix/castore/src/import/error.rs
@@ -1,6 +1,6 @@
 use std::{fs::FileType, path::PathBuf};
 
-use crate::Error as CastoreError;
+use crate::{proto::ValidateDirectoryError, Error as CastoreError};
 
 #[derive(Debug, thiserror::Error)]
 pub enum Error {
@@ -19,8 +19,17 @@ pub enum Error {
     #[error("unable to read {0}: {1}")]
     UnableToRead(PathBuf, std::io::Error),
 
+    #[error("error reading from archive: {0}")]
+    Archive(std::io::Error),
+
     #[error("unsupported file {0} type: {1:?}")]
     UnsupportedFileType(PathBuf, FileType),
+
+    #[error("invalid directory contents {0}: {1}")]
+    InvalidDirectory(PathBuf, ValidateDirectoryError),
+
+    #[error("unsupported tar entry {0} type: {1:?}")]
+    UnsupportedTarEntry(PathBuf, tokio_tar::EntryType),
 }
 
 impl From<CastoreError> for Error {
diff --git a/tvix/castore/src/import/mod.rs b/tvix/castore/src/import/mod.rs
index ff27c0fcfd2b..c5887685bbdb 100644
--- a/tvix/castore/src/import/mod.rs
+++ b/tvix/castore/src/import/mod.rs
@@ -1,4 +1,3 @@
-//! Deals with ingesting contents into castore.
 //! The main library function here is [ingest_entries], receiving a stream of
 //! [IngestionEntry].
 //!
@@ -32,6 +31,7 @@ use tracing::instrument;
 mod error;
 pub use error::Error;
 
+pub mod archive;
 pub mod fs;
 
 /// Ingests [IngestionEntry] from the given stream into the passed [DirectoryService].
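
With the module exported here as pub mod archive, the new entry point becomes reachable from outside the crate. A minimal sketch of a call site follows; the in-memory service constructors are assumptions about tvix-castore's test utilities, and "example.tar" is a placeholder:

    use std::sync::Arc;

    use tokio_tar::Archive;
    use tvix_castore::blobservice::{BlobService, MemoryBlobService};
    use tvix_castore::directoryservice::{DirectoryService, MemoryDirectoryService};
    use tvix_castore::import::archive::ingest_archive;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Arc<dyn …> satisfies the AsRef<dyn …> + Clone bounds on ingest_archive.
        let blob_service: Arc<dyn BlobService> = Arc::new(MemoryBlobService::default());
        let dir_service: Arc<dyn DirectoryService> = Arc::new(MemoryDirectoryService::default());

        // Any AsyncRead + Unpin works; here, an uncompressed tarball on disk.
        let tar = tokio::fs::File::open("example.tar").await?;
        let root_node = ingest_archive(blob_service, dir_service, Archive::new(tar)).await?;

        println!("ingested root node: {:?}", root_node);
        Ok(())
    }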
diff --git a/tvix/glue/src/builtins/fetchers.rs b/tvix/glue/src/builtins/fetchers.rs
index cbb57532f6b3..d5735b7d09a7 100644
--- a/tvix/glue/src/builtins/fetchers.rs
+++ b/tvix/glue/src/builtins/fetchers.rs
@@ -175,12 +175,21 @@ async fn fetch(
         }
     }
 
-    let hash = args.hash.as_ref().map(|h| h.hash());
-    let store_path = Rc::clone(&state).tokio_handle.block_on(state.fetch_url(
-        &args.url,
-        &args.name,
-        hash.as_deref(),
-    ))?;
+    let ca = args.hash;
+    let store_path = Rc::clone(&state).tokio_handle.block_on(async move {
+        match mode {
+            FetchMode::Url => {
+                state
+                    .fetch_url(
+                        &args.url,
+                        &args.name,
+                        ca.as_ref().map(|c| c.hash().into_owned()).as_ref(),
+                    )
+                    .await
+            }
+            FetchMode::Tarball => state.fetch_tarball(&args.url, &args.name, ca).await,
+        }
+    })?;
 
     Ok(string_from_store_path(store_path.as_ref()).into())
 }
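
The chain ca.as_ref().map(|c| c.hash().into_owned()).as_ref() deserves a note: it converts an Option<CAHash> into the Option<&NixHash> that fetch_url expects, detouring through an owned NixHash because CAHash::hash() hands back a borrowed form (hence the into_owned()). A stripped-down sketch of the same idiom with stand-in types — Wrapper and inner are invented for illustration:

    use std::borrow::Cow;

    struct Wrapper(String);

    impl Wrapper {
        // Stand-in for CAHash::hash(), which hands back a Cow.
        fn inner(&self) -> Cow<'_, String> {
            Cow::Borrowed(&self.0)
        }
    }

    fn takes_ref(value: Option<&String>) {
        println!("{:?}", value);
    }

    fn main() {
        let wrapped: Option<Wrapper> = Some(Wrapper("some-hash".into()));

        // as_ref() keeps the Option alive, into_owned() detaches the value
        // from the temporary Cow, and the final as_ref() lends it back out.
        let owned: Option<String> = wrapped.as_ref().map(|w| w.inner().into_owned());
        takes_ref(owned.as_ref());
    }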
diff --git a/tvix/glue/src/builtins/import.rs b/tvix/glue/src/builtins/import.rs
index 639095c459e0..df3d2178696d 100644
--- a/tvix/glue/src/builtins/import.rs
+++ b/tvix/glue/src/builtins/import.rs
@@ -205,7 +205,7 @@ mod import_builtins {
         };
 
         let obtained_hash = ca.hash().clone().into_owned();
-        let (path_info, output_path) = state.tokio_handle.block_on(async {
+        let (path_info, _hash, output_path) = state.tokio_handle.block_on(async {
             state
                 .node_to_path_info(name.as_ref(), path.as_ref(), ca, root_node)
                 .await
diff --git a/tvix/glue/src/tvix_store_io.rs b/tvix/glue/src/tvix_store_io.rs
index f0f2f5cf918b..8f44d2fe834d 100644
--- a/tvix/glue/src/tvix_store_io.rs
+++ b/tvix/glue/src/tvix_store_io.rs
@@ -15,9 +15,11 @@ use std::{
     path::{Path, PathBuf},
     sync::Arc,
 };
-use tokio_util::io::SyncIoBridge;
+use tokio::io::AsyncBufRead;
+use tokio_util::io::{InspectReader, SyncIoBridge};
 use tracing::{error, instrument, warn, Level};
 use tvix_build::buildservice::BuildService;
+use tvix_castore::import::archive::ingest_archive;
 use tvix_castore::import::fs::dir_entry_iter_to_ingestion_stream;
 use tvix_eval::{ErrorKind, EvalIO, FileType, StdIO};
 use tvix_store::utils::AsyncIoBridge;
@@ -32,6 +34,7 @@ use tvix_castore::{
 use tvix_store::{pathinfoservice::PathInfoService, proto::PathInfo};
 
 use crate::builtins::FetcherError;
+use crate::decompression::DecompressedReader;
 use crate::known_paths::KnownPaths;
 use crate::tvix_build::derivation_to_build_request;
 
@@ -298,7 +301,7 @@ impl TvixStoreIO {
         path: &Path,
         ca: CAHash,
         root_node: Node,
-    ) -> io::Result<(PathInfo, StorePath)> {
+    ) -> io::Result<(PathInfo, NixHash, StorePath)> {
         // Ask the PathInfoService for the NAR size and sha256
         // We always need it no matter what is the actual hash mode
         // because the path info construct a narinfo which *always*
@@ -327,7 +330,11 @@ impl TvixStoreIO {
         let path_info =
             tvix_store::import::derive_nar_ca_path_info(nar_size, nar_sha256, Some(ca), root_node);
 
-        Ok((path_info, output_path.to_owned()))
+        Ok((
+            path_info,
+            NixHash::Sha256(nar_sha256),
+            output_path.to_owned(),
+        ))
     }
 
     pub(crate) async fn register_node_in_path_info_service(
@@ -337,7 +344,7 @@ impl TvixStoreIO {
         ca: CAHash,
         root_node: Node,
     ) -> io::Result<StorePath> {
-        let (path_info, output_path) = self.node_to_path_info(name, path, ca, root_node).await?;
+        let (path_info, _, output_path) = self.node_to_path_info(name, path, ca, root_node).await?;
         let _path_info = self.path_info_service.as_ref().put(path_info).await?;
 
         Ok(output_path)
@@ -372,33 +379,34 @@ impl TvixStoreIO {
             .is_some())
     }
 
-    pub async fn fetch_url(
-        &self,
-        url: &str,
-        name: &str,
-        hash: Option<&NixHash>,
-    ) -> Result<StorePath, ErrorKind> {
+    async fn download<'a>(&self, url: &str) -> Result<impl AsyncBufRead + Unpin + 'a, ErrorKind> {
         let resp = self
             .http_client
             .get(url)
             .send()
             .await
             .map_err(FetcherError::from)?;
-        let mut sha = Sha256::new();
-        let mut data = tokio_util::io::StreamReader::new(
-            resp.bytes_stream()
-                .inspect_ok(|data| {
-                    sha.update(data);
-                })
-                .map_err(|e| {
-                    let e = e.without_url();
-                    warn!(%e, "failed to get response body");
-                    io::Error::new(io::ErrorKind::BrokenPipe, e.to_string())
-                }),
-        );
+        Ok(tokio_util::io::StreamReader::new(
+            resp.bytes_stream().map_err(|e| {
+                let e = e.without_url();
+                warn!(%e, "failed to get response body");
+                io::Error::new(io::ErrorKind::BrokenPipe, e.to_string())
+            }),
+        ))
+    }
 
+    pub async fn fetch_url(
+        &self,
+        url: &str,
+        name: &str,
+        hash: Option<&NixHash>,
+    ) -> Result<StorePath, ErrorKind> {
+        let mut sha = Sha256::new();
+        let data = self.download(url).await?;
+        let mut data = InspectReader::new(data, |b| sha.update(b));
         let mut blob = self.blob_service.open_write().await;
         let size = tokio::io::copy(&mut data, blob.as_mut()).await?;
+        drop(data);
         let blob_digest = blob.close().await?;
         let got = NixHash::Sha256(sha.finalize().into());
 
@@ -453,6 +461,47 @@ impl TvixStoreIO {
 
         Ok(path.to_owned())
     }
+
+    pub async fn fetch_tarball(
+        &self,
+        url: &str,
+        name: &str,
+        ca: Option<CAHash>,
+    ) -> Result<StorePath, ErrorKind> {
+        let data = self.download(url).await?;
+        let data = DecompressedReader::new(data);
+        let archive = tokio_tar::Archive::new(data);
+        let node = ingest_archive(&self.blob_service, &self.directory_service, archive)
+            .await
+            .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?;
+
+        let (path_info, got, output_path) = self
+            .node_to_path_info(
+                name,
+                Path::new(""),
+                ca.clone().expect("TODO: support unspecified CA hash"),
+                node,
+            )
+            .await?;
+
+        if let Some(wanted) = &ca {
+            if *wanted.hash() != got {
+                return Err(FetcherError::HashMismatch {
+                    url: url.to_owned(),
+                    wanted: wanted.hash().into_owned(),
+                    got,
+                }
+                .into());
+            }
+        }
+
+        self.path_info_service
+            .put(path_info)
+            .await
+            .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?;
+
+        Ok(output_path)
+    }
 }
 
 impl EvalIO for TvixStoreIO {
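
The reworked fetch_url shows the pattern that replaces the old inspect_ok stream hook: tokio_util's InspectReader taps each chunk as it is read, so the SHA-256 is computed in the same pass that copies the body into blob storage. A self-contained sketch of just that pattern — the file name and the sink destination are placeholders:

    use sha2::{Digest, Sha256};
    use tokio_util::io::InspectReader;

    #[tokio::main]
    async fn main() -> std::io::Result<()> {
        let mut hasher = Sha256::new();
        let file = tokio::fs::File::open("payload.bin").await?;

        // Every chunk read through `tapped` is also fed to the hasher,
        // so no second pass over the data is needed.
        let mut tapped = InspectReader::new(file, |chunk| hasher.update(chunk));

        let mut sink = tokio::io::sink(); // stands in for the blob writer
        let size = tokio::io::copy(&mut tapped, &mut sink).await?;

        // Dropping the reader releases its borrow on `hasher`,
        // mirroring the explicit drop(data) in fetch_url above.
        drop(tapped);
        println!("copied {} bytes, sha256 = {:x}", size, hasher.finalize());
        Ok(())
    }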
diff --git a/users/picnoir/tvix-daemon/Cargo.nix b/users/picnoir/tvix-daemon/Cargo.nix
index d73a65c82121..2382027f9b13 100644
--- a/users/picnoir/tvix-daemon/Cargo.nix
+++ b/users/picnoir/tvix-daemon/Cargo.nix
@@ -2366,8 +2366,7 @@ rec {
           }
         ];
         features = {
-          "async" = [ "futures-util" ];
-          "futures-util" = [ "dep:futures-util" ];
+          "async" = [ "tokio" ];
           "pin-project-lite" = [ "dep:pin-project-lite" ];
           "tokio" = [ "dep:tokio" ];
           "wire" = [ "tokio" "pin-project-lite" ];