Diffstat (limited to 'tvix/glue')
-rw-r--r--  tvix/glue/src/builtins/fetchers.rs  21
-rw-r--r--  tvix/glue/src/builtins/import.rs     2
-rw-r--r--  tvix/glue/src/tvix_store_io.rs      93
3 files changed, 87 insertions, 29 deletions
diff --git a/tvix/glue/src/builtins/fetchers.rs b/tvix/glue/src/builtins/fetchers.rs
index cbb57532f6b3..d5735b7d09a7 100644
--- a/tvix/glue/src/builtins/fetchers.rs
+++ b/tvix/glue/src/builtins/fetchers.rs
@@ -175,12 +175,21 @@ async fn fetch(
         }
     }
 
-    let hash = args.hash.as_ref().map(|h| h.hash());
-    let store_path = Rc::clone(&state).tokio_handle.block_on(state.fetch_url(
-        &args.url,
-        &args.name,
-        hash.as_deref(),
-    ))?;
+    let ca = args.hash;
+    let store_path = Rc::clone(&state).tokio_handle.block_on(async move {
+        match mode {
+            FetchMode::Url => {
+                state
+                    .fetch_url(
+                        &args.url,
+                        &args.name,
+                        ca.as_ref().map(|c| c.hash().into_owned()).as_ref(),
+                    )
+                    .await
+            }
+            FetchMode::Tarball => state.fetch_tarball(&args.url, &args.name, ca).await,
+        }
+    })?;
 
     Ok(string_from_store_path(store_path.as_ref()).into())
 }
diff --git a/tvix/glue/src/builtins/import.rs b/tvix/glue/src/builtins/import.rs
index 639095c459e0..df3d2178696d 100644
--- a/tvix/glue/src/builtins/import.rs
+++ b/tvix/glue/src/builtins/import.rs
@@ -205,7 +205,7 @@ mod import_builtins {
         };
 
         let obtained_hash = ca.hash().clone().into_owned();
-        let (path_info, output_path) = state.tokio_handle.block_on(async {
+        let (path_info, _hash, output_path) = state.tokio_handle.block_on(async {
            state
                .node_to_path_info(name.as_ref(), path.as_ref(), ca, root_node)
                .await
diff --git a/tvix/glue/src/tvix_store_io.rs b/tvix/glue/src/tvix_store_io.rs
index f0f2f5cf918b..8f44d2fe834d 100644
--- a/tvix/glue/src/tvix_store_io.rs
+++ b/tvix/glue/src/tvix_store_io.rs
@@ -15,9 +15,11 @@ use std::{
     path::{Path, PathBuf},
     sync::Arc,
 };
-use tokio_util::io::SyncIoBridge;
+use tokio::io::AsyncBufRead;
+use tokio_util::io::{InspectReader, SyncIoBridge};
 use tracing::{error, instrument, warn, Level};
 use tvix_build::buildservice::BuildService;
+use tvix_castore::import::archive::ingest_archive;
 use tvix_castore::import::fs::dir_entry_iter_to_ingestion_stream;
 use tvix_eval::{ErrorKind, EvalIO, FileType, StdIO};
 use tvix_store::utils::AsyncIoBridge;
@@ -32,6 +34,7 @@ use tvix_castore::{
 use tvix_store::{pathinfoservice::PathInfoService, proto::PathInfo};
 
 use crate::builtins::FetcherError;
+use crate::decompression::DecompressedReader;
 use crate::known_paths::KnownPaths;
 use crate::tvix_build::derivation_to_build_request;
 
@@ -298,7 +301,7 @@ impl TvixStoreIO {
         path: &Path,
         ca: CAHash,
         root_node: Node,
-    ) -> io::Result<(PathInfo, StorePath)> {
+    ) -> io::Result<(PathInfo, NixHash, StorePath)> {
         // Ask the PathInfoService for the NAR size and sha256
         // We always need it no matter what is the actual hash mode
         // because the path info construct a narinfo which *always*
@@ -327,7 +330,11 @@ impl TvixStoreIO {
         let path_info =
             tvix_store::import::derive_nar_ca_path_info(nar_size, nar_sha256, Some(ca), root_node);
 
-        Ok((path_info, output_path.to_owned()))
+        Ok((
+            path_info,
+            NixHash::Sha256(nar_sha256),
+            output_path.to_owned(),
+        ))
     }
 
     pub(crate) async fn register_node_in_path_info_service(
@@ -337,7 +344,7 @@ impl TvixStoreIO {
         ca: CAHash,
         root_node: Node,
     ) -> io::Result<StorePath> {
-        let (path_info, output_path) = self.node_to_path_info(name, path, ca, root_node).await?;
+        let (path_info, _, output_path) = self.node_to_path_info(name, path, ca, root_node).await?;
         let _path_info = self.path_info_service.as_ref().put(path_info).await?;
 
         Ok(output_path)
@@ -372,33 +379,34 @@ impl TvixStoreIO {
             .is_some())
     }
 
-    pub async fn fetch_url(
-        &self,
-        url: &str,
-        name: &str,
-        hash: Option<&NixHash>,
-    ) -> Result<StorePath, ErrorKind> {
+    async fn download<'a>(&self, url: &str) -> Result<impl AsyncBufRead + Unpin + 'a, ErrorKind> {
         let resp = self
            .http_client
            .get(url)
            .send()
            .await
            .map_err(FetcherError::from)?;
-        let mut sha = Sha256::new();
-        let mut data = tokio_util::io::StreamReader::new(
-            resp.bytes_stream()
-                .inspect_ok(|data| {
-                    sha.update(data);
-                })
-                .map_err(|e| {
-                    let e = e.without_url();
-                    warn!(%e, "failed to get response body");
-                    io::Error::new(io::ErrorKind::BrokenPipe, e.to_string())
-                }),
-        );
+        Ok(tokio_util::io::StreamReader::new(
+            resp.bytes_stream().map_err(|e| {
+                let e = e.without_url();
+                warn!(%e, "failed to get response body");
+                io::Error::new(io::ErrorKind::BrokenPipe, e.to_string())
+            }),
+        ))
+    }
 
+    pub async fn fetch_url(
+        &self,
+        url: &str,
+        name: &str,
+        hash: Option<&NixHash>,
+    ) -> Result<StorePath, ErrorKind> {
+        let mut sha = Sha256::new();
+        let data = self.download(url).await?;
+        let mut data = InspectReader::new(data, |b| sha.update(b));
         let mut blob = self.blob_service.open_write().await;
         let size = tokio::io::copy(&mut data, blob.as_mut()).await?;
+        drop(data);
         let blob_digest = blob.close().await?;
 
         let got = NixHash::Sha256(sha.finalize().into());
@@ -453,6 +461,47 @@ impl TvixStoreIO {
 
         Ok(path.to_owned())
     }
+
+    pub async fn fetch_tarball(
+        &self,
+        url: &str,
+        name: &str,
+        ca: Option<CAHash>,
+    ) -> Result<StorePath, ErrorKind> {
+        let data = self.download(url).await?;
+        let data = DecompressedReader::new(data);
+        let archive = tokio_tar::Archive::new(data);
+        let node = ingest_archive(&self.blob_service, &self.directory_service, archive)
+            .await
+            .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?;
+
+        let (path_info, got, output_path) = self
+            .node_to_path_info(
+                name,
+                Path::new(""),
+                ca.clone().expect("TODO: support unspecified CA hash"),
+                node,
+            )
+            .await?;
+
+        if let Some(wanted) = &ca {
+            if *wanted.hash() != got {
+                return Err(FetcherError::HashMismatch {
+                    url: url.to_owned(),
+                    wanted: wanted.hash().into_owned(),
+                    got,
+                }
+                .into());
+            }
+        }
+
+        self.path_info_service
+            .put(path_info)
+            .await
+            .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?;
+
+        Ok(output_path)
+    }
 }
 
 impl EvalIO for TvixStoreIO {