diff options
-rw-r--r-- | tvix/Cargo.lock | 1 | ||||
-rw-r--r-- | tvix/Cargo.nix | 4 | ||||
-rw-r--r-- | tvix/castore/Cargo.toml | 1 | ||||
-rw-r--r-- | tvix/castore/src/import/archive.rs | 201 | ||||
-rw-r--r-- | tvix/castore/src/import/error.rs | 11 | ||||
-rw-r--r-- | tvix/castore/src/import/mod.rs | 2 | ||||
-rw-r--r-- | tvix/glue/src/builtins/fetchers.rs | 21 | ||||
-rw-r--r-- | tvix/glue/src/builtins/import.rs | 2 | ||||
-rw-r--r-- | tvix/glue/src/tvix_store_io.rs | 93 | ||||
-rw-r--r-- | users/picnoir/tvix-daemon/Cargo.nix | 3 |
10 files changed, 306 insertions, 33 deletions
diff --git a/tvix/Cargo.lock b/tvix/Cargo.lock index df7c0dbfbe76..364fbed662af 100644 --- a/tvix/Cargo.lock +++ b/tvix/Cargo.lock @@ -4292,6 +4292,7 @@ dependencies = [ "tokio", "tokio-retry", "tokio-stream", + "tokio-tar", "tokio-util", "tonic 0.11.0", "tonic-build", diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix index 38cb770921e7..9efdb7dee94e 100644 --- a/tvix/Cargo.nix +++ b/tvix/Cargo.nix @@ -13656,6 +13656,10 @@ rec { features = [ "fs" "net" ]; } { + name = "tokio-tar"; + packageId = "tokio-tar"; + } + { name = "tokio-util"; packageId = "tokio-util"; features = [ "io" "io-util" ]; diff --git a/tvix/castore/Cargo.toml b/tvix/castore/Cargo.toml index b68922b7ce07..f54bb2ddb5b4 100644 --- a/tvix/castore/Cargo.toml +++ b/tvix/castore/Cargo.toml @@ -22,6 +22,7 @@ sled = { version = "0.34.7" } thiserror = "1.0.38" tokio-stream = { version = "0.1.14", features = ["fs", "net"] } tokio-util = { version = "0.7.9", features = ["io", "io-util"] } +tokio-tar = "0.3.1" tokio = { version = "1.32.0", features = ["fs", "macros", "net", "rt", "rt-multi-thread", "signal"] } tonic = "0.11.0" tower = "0.4.13" diff --git a/tvix/castore/src/import/archive.rs b/tvix/castore/src/import/archive.rs new file mode 100644 index 000000000000..d0ae3c67411c --- /dev/null +++ b/tvix/castore/src/import/archive.rs @@ -0,0 +1,201 @@ +#[cfg(target_family = "unix")] +use std::os::unix::ffi::OsStrExt; +use std::{ + collections::HashMap, + path::{Path, PathBuf}, +}; + +use tokio::io::AsyncRead; +use tokio_stream::StreamExt; +use tokio_tar::Archive; +use tracing::{instrument, Level}; + +use crate::{ + blobservice::BlobService, + directoryservice::{DirectoryPutter, DirectoryService}, + import::Error, + proto::{node::Node, Directory, DirectoryNode, FileNode, SymlinkNode}, +}; + +/// Ingests elements from the given tar [`Archive`] into a the passed [`BlobService`] and +/// [`DirectoryService`]. +#[instrument(skip_all, ret(level = Level::TRACE), err)] +pub async fn ingest_archive<'a, BS, DS, R>( + blob_service: BS, + directory_service: DS, + mut archive: Archive<R>, +) -> Result<Node, Error> +where + BS: AsRef<dyn BlobService> + Clone, + DS: AsRef<dyn DirectoryService>, + R: AsyncRead + Unpin, +{ + // Since tarballs can have entries in any arbitrary order, we need to + // buffer all of the directory metadata so we can reorder directory + // contents and entries to meet the requires of the castore. + + // In the first phase, collect up all the regular files and symlinks. + let mut paths = HashMap::new(); + let mut entries = archive.entries().map_err(Error::Archive)?; + while let Some(mut entry) = entries.try_next().await.map_err(Error::Archive)? { + let path = entry.path().map_err(Error::Archive)?.into_owned(); + let name = path + .file_name() + .ok_or_else(|| { + Error::Archive(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "invalid filename in archive", + )) + })? + .as_bytes() + .to_vec() + .into(); + + let node = match entry.header().entry_type() { + tokio_tar::EntryType::Regular + | tokio_tar::EntryType::GNUSparse + | tokio_tar::EntryType::Continuous => { + // TODO: If the same path is overwritten in the tarball, we may leave + // an unreferenced blob after uploading. + let mut writer = blob_service.as_ref().open_write().await; + let size = tokio::io::copy(&mut entry, &mut writer) + .await + .map_err(Error::Archive)?; + let digest = writer.close().await.map_err(Error::Archive)?; + Node::File(FileNode { + name, + digest: digest.into(), + size, + executable: entry.header().mode().map_err(Error::Archive)? & 64 != 0, + }) + } + tokio_tar::EntryType::Symlink => Node::Symlink(SymlinkNode { + name, + target: entry + .link_name() + .map_err(Error::Archive)? + .expect("symlink missing target") + .as_os_str() + .as_bytes() + .to_vec() + .into(), + }), + // Push a bogus directory marker so we can make sure this directoy gets + // created. We don't know the digest and size until after reading the full + // tarball. + tokio_tar::EntryType::Directory => Node::Directory(DirectoryNode { + name, + digest: Default::default(), + size: 0, + }), + + tokio_tar::EntryType::XGlobalHeader | tokio_tar::EntryType::XHeader => continue, + + entry_type => return Err(Error::UnsupportedTarEntry(path, entry_type)), + }; + + paths.insert(path, node); + } + + // In the second phase, construct all of the directories. + + // Collect into a list and then sort so all entries in the same directory + // are next to each other. + // We can detect boundaries between each directories to determine + // when to construct or push directory entries. + let mut ordered_paths = paths.into_iter().collect::<Vec<_>>(); + ordered_paths.sort_by(|a, b| a.0.cmp(&b.0)); + + let mut directory_putter = directory_service.as_ref().put_multiple_start(); + + // Start with an initial directory at the root. + let mut dir_stack = vec![(PathBuf::from(""), Directory::default())]; + + async fn pop_directory( + dir_stack: &mut Vec<(PathBuf, Directory)>, + directory_putter: &mut Box<dyn DirectoryPutter>, + ) -> Result<DirectoryNode, Error> { + let (path, directory) = dir_stack.pop().unwrap(); + + directory + .validate() + .map_err(|e| Error::InvalidDirectory(path.to_path_buf(), e))?; + + let dir_node = DirectoryNode { + name: path + .file_name() + .unwrap_or_default() + .as_bytes() + .to_vec() + .into(), + digest: directory.digest().into(), + size: directory.size(), + }; + + if let Some((_, parent)) = dir_stack.last_mut() { + parent.directories.push(dir_node.clone()); + } + + directory_putter.put(directory).await?; + + Ok(dir_node) + } + + fn push_directories(path: &Path, dir_stack: &mut Vec<(PathBuf, Directory)>) { + if path == dir_stack.last().unwrap().0 { + return; + } + if let Some(parent) = path.parent() { + push_directories(parent, dir_stack); + } + dir_stack.push((path.to_path_buf(), Directory::default())); + } + + for (path, node) in ordered_paths.into_iter() { + // Pop stack until the top dir is an ancestor of this entry. + loop { + let top = dir_stack.last().unwrap(); + if path.ancestors().any(|ancestor| ancestor == top.0) { + break; + } + + pop_directory(&mut dir_stack, &mut directory_putter).await?; + } + + // For directories, just ensure the directory node exists. + if let Node::Directory(_) = node { + push_directories(&path, &mut dir_stack); + continue; + } + + // Push all ancestor directories onto the stack. + push_directories(path.parent().unwrap(), &mut dir_stack); + + let top = dir_stack.last_mut().unwrap(); + debug_assert_eq!(Some(top.0.as_path()), path.parent()); + + match node { + Node::File(n) => top.1.files.push(n), + Node::Symlink(n) => top.1.symlinks.push(n), + // We already handled directories above. + Node::Directory(_) => unreachable!(), + } + } + + let mut root_node = None; + while !dir_stack.is_empty() { + // If the root directory only has 1 directory entry, we return the child entry + // instead... weeeee + if dir_stack.len() == 1 && dir_stack.last().unwrap().1.directories.len() == 1 { + break; + } + root_node = Some(pop_directory(&mut dir_stack, &mut directory_putter).await?); + } + let root_node = root_node.expect("no root node"); + + let root_digest = directory_putter.close().await?; + + debug_assert_eq!(root_digest.as_slice(), &root_node.digest); + + Ok(Node::Directory(root_node)) +} diff --git a/tvix/castore/src/import/error.rs b/tvix/castore/src/import/error.rs index 15dd0664deaa..18c71aa235b8 100644 --- a/tvix/castore/src/import/error.rs +++ b/tvix/castore/src/import/error.rs @@ -1,6 +1,6 @@ use std::{fs::FileType, path::PathBuf}; -use crate::Error as CastoreError; +use crate::{proto::ValidateDirectoryError, Error as CastoreError}; #[derive(Debug, thiserror::Error)] pub enum Error { @@ -19,8 +19,17 @@ pub enum Error { #[error("unable to read {0}: {1}")] UnableToRead(PathBuf, std::io::Error), + #[error("error reading from archive: {0}")] + Archive(std::io::Error), + #[error("unsupported file {0} type: {1:?}")] UnsupportedFileType(PathBuf, FileType), + + #[error("invalid directory contents {0}: {1}")] + InvalidDirectory(PathBuf, ValidateDirectoryError), + + #[error("unsupported tar entry {0} type: {1:?}")] + UnsupportedTarEntry(PathBuf, tokio_tar::EntryType), } impl From<CastoreError> for Error { diff --git a/tvix/castore/src/import/mod.rs b/tvix/castore/src/import/mod.rs index ff27c0fcfd2b..c5887685bbdb 100644 --- a/tvix/castore/src/import/mod.rs +++ b/tvix/castore/src/import/mod.rs @@ -1,4 +1,3 @@ -//! Deals with ingesting contents into castore. //! The main library function here is [ingest_entries], receiving a stream of //! [IngestionEntry]. //! @@ -32,6 +31,7 @@ use tracing::instrument; mod error; pub use error::Error; +pub mod archive; pub mod fs; /// Ingests [IngestionEntry] from the given stream into a the passed [DirectoryService]. diff --git a/tvix/glue/src/builtins/fetchers.rs b/tvix/glue/src/builtins/fetchers.rs index cbb57532f6b3..d5735b7d09a7 100644 --- a/tvix/glue/src/builtins/fetchers.rs +++ b/tvix/glue/src/builtins/fetchers.rs @@ -175,12 +175,21 @@ async fn fetch( } } - let hash = args.hash.as_ref().map(|h| h.hash()); - let store_path = Rc::clone(&state).tokio_handle.block_on(state.fetch_url( - &args.url, - &args.name, - hash.as_deref(), - ))?; + let ca = args.hash; + let store_path = Rc::clone(&state).tokio_handle.block_on(async move { + match mode { + FetchMode::Url => { + state + .fetch_url( + &args.url, + &args.name, + ca.as_ref().map(|c| c.hash().into_owned()).as_ref(), + ) + .await + } + FetchMode::Tarball => state.fetch_tarball(&args.url, &args.name, ca).await, + } + })?; Ok(string_from_store_path(store_path.as_ref()).into()) } diff --git a/tvix/glue/src/builtins/import.rs b/tvix/glue/src/builtins/import.rs index 639095c459e0..df3d2178696d 100644 --- a/tvix/glue/src/builtins/import.rs +++ b/tvix/glue/src/builtins/import.rs @@ -205,7 +205,7 @@ mod import_builtins { }; let obtained_hash = ca.hash().clone().into_owned(); - let (path_info, output_path) = state.tokio_handle.block_on(async { + let (path_info, _hash, output_path) = state.tokio_handle.block_on(async { state .node_to_path_info(name.as_ref(), path.as_ref(), ca, root_node) .await diff --git a/tvix/glue/src/tvix_store_io.rs b/tvix/glue/src/tvix_store_io.rs index f0f2f5cf918b..8f44d2fe834d 100644 --- a/tvix/glue/src/tvix_store_io.rs +++ b/tvix/glue/src/tvix_store_io.rs @@ -15,9 +15,11 @@ use std::{ path::{Path, PathBuf}, sync::Arc, }; -use tokio_util::io::SyncIoBridge; +use tokio::io::AsyncBufRead; +use tokio_util::io::{InspectReader, SyncIoBridge}; use tracing::{error, instrument, warn, Level}; use tvix_build::buildservice::BuildService; +use tvix_castore::import::archive::ingest_archive; use tvix_castore::import::fs::dir_entry_iter_to_ingestion_stream; use tvix_eval::{ErrorKind, EvalIO, FileType, StdIO}; use tvix_store::utils::AsyncIoBridge; @@ -32,6 +34,7 @@ use tvix_castore::{ use tvix_store::{pathinfoservice::PathInfoService, proto::PathInfo}; use crate::builtins::FetcherError; +use crate::decompression::DecompressedReader; use crate::known_paths::KnownPaths; use crate::tvix_build::derivation_to_build_request; @@ -298,7 +301,7 @@ impl TvixStoreIO { path: &Path, ca: CAHash, root_node: Node, - ) -> io::Result<(PathInfo, StorePath)> { + ) -> io::Result<(PathInfo, NixHash, StorePath)> { // Ask the PathInfoService for the NAR size and sha256 // We always need it no matter what is the actual hash mode // because the path info construct a narinfo which *always* @@ -327,7 +330,11 @@ impl TvixStoreIO { let path_info = tvix_store::import::derive_nar_ca_path_info(nar_size, nar_sha256, Some(ca), root_node); - Ok((path_info, output_path.to_owned())) + Ok(( + path_info, + NixHash::Sha256(nar_sha256), + output_path.to_owned(), + )) } pub(crate) async fn register_node_in_path_info_service( @@ -337,7 +344,7 @@ impl TvixStoreIO { ca: CAHash, root_node: Node, ) -> io::Result<StorePath> { - let (path_info, output_path) = self.node_to_path_info(name, path, ca, root_node).await?; + let (path_info, _, output_path) = self.node_to_path_info(name, path, ca, root_node).await?; let _path_info = self.path_info_service.as_ref().put(path_info).await?; Ok(output_path) @@ -372,33 +379,34 @@ impl TvixStoreIO { .is_some()) } - pub async fn fetch_url( - &self, - url: &str, - name: &str, - hash: Option<&NixHash>, - ) -> Result<StorePath, ErrorKind> { + async fn download<'a>(&self, url: &str) -> Result<impl AsyncBufRead + Unpin + 'a, ErrorKind> { let resp = self .http_client .get(url) .send() .await .map_err(FetcherError::from)?; - let mut sha = Sha256::new(); - let mut data = tokio_util::io::StreamReader::new( - resp.bytes_stream() - .inspect_ok(|data| { - sha.update(data); - }) - .map_err(|e| { - let e = e.without_url(); - warn!(%e, "failed to get response body"); - io::Error::new(io::ErrorKind::BrokenPipe, e.to_string()) - }), - ); + Ok(tokio_util::io::StreamReader::new( + resp.bytes_stream().map_err(|e| { + let e = e.without_url(); + warn!(%e, "failed to get response body"); + io::Error::new(io::ErrorKind::BrokenPipe, e.to_string()) + }), + )) + } + pub async fn fetch_url( + &self, + url: &str, + name: &str, + hash: Option<&NixHash>, + ) -> Result<StorePath, ErrorKind> { + let mut sha = Sha256::new(); + let data = self.download(url).await?; + let mut data = InspectReader::new(data, |b| sha.update(b)); let mut blob = self.blob_service.open_write().await; let size = tokio::io::copy(&mut data, blob.as_mut()).await?; + drop(data); let blob_digest = blob.close().await?; let got = NixHash::Sha256(sha.finalize().into()); @@ -453,6 +461,47 @@ impl TvixStoreIO { Ok(path.to_owned()) } + + pub async fn fetch_tarball( + &self, + url: &str, + name: &str, + ca: Option<CAHash>, + ) -> Result<StorePath, ErrorKind> { + let data = self.download(url).await?; + let data = DecompressedReader::new(data); + let archive = tokio_tar::Archive::new(data); + let node = ingest_archive(&self.blob_service, &self.directory_service, archive) + .await + .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?; + + let (path_info, got, output_path) = self + .node_to_path_info( + name, + Path::new(""), + ca.clone().expect("TODO: support unspecified CA hash"), + node, + ) + .await?; + + if let Some(wanted) = &ca { + if *wanted.hash() != got { + return Err(FetcherError::HashMismatch { + url: url.to_owned(), + wanted: wanted.hash().into_owned(), + got, + } + .into()); + } + } + + self.path_info_service + .put(path_info) + .await + .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?; + + Ok(output_path) + } } impl EvalIO for TvixStoreIO { diff --git a/users/picnoir/tvix-daemon/Cargo.nix b/users/picnoir/tvix-daemon/Cargo.nix index d73a65c82121..2382027f9b13 100644 --- a/users/picnoir/tvix-daemon/Cargo.nix +++ b/users/picnoir/tvix-daemon/Cargo.nix @@ -2366,8 +2366,7 @@ rec { } ]; features = { - "async" = [ "futures-util" ]; - "futures-util" = [ "dep:futures-util" ]; + "async" = [ "tokio" ]; "pin-project-lite" = [ "dep:pin-project-lite" ]; "tokio" = [ "dep:tokio" ]; "wire" = [ "tokio" "pin-project-lite" ]; |