From de727bccf99a1dcce2bb335e56af02f80e462dbc Mon Sep 17 00:00:00 2001 From: Aspen Smith Date: Fri, 23 Feb 2024 10:09:20 -0500 Subject: feat(tvix/glue): Implement builtins.fetchurl Implement the fetchurl builtin, and lay the groundwork for implementing the fetchTarball builtin (which works very similarly, and is implemented using almost the same code in C++ nix). An overview of how this works: 1. First, we check if the store path that *would* result from the download already exists in the store - if it does, we just return that 2. If we need to download the URL, TvixStoreIO has an `http_client: reqwest::Client` field now which we use to make the request 3. As we're downloading the blob, we hash the data incrementally into a SHA256 hasher 4. We compare the hash against the expected hash (if any) and bail out if it doesn't match 5. Finally, we put the blob in the store and return the store path Since the logic is very similar, this commit also implements a *chunk* of `fetchTarball` (though the actual implementation will likely include a refactor to some of the code reuse here). The main thing that's missing here is caching of downloaded blobs when fetchurl is called without a hash - I've opened b/381 to track the TODO there. Adding the `SSL_CERT_FILE` here is necessary to teach reqwest how to load it during tests - see 1c16dee20 (feat(tvix/store): use reqwests' rustls-native-roots feature, 2024-03-03) for more info. 
Change-Id: I83c4abbc7c0c3bfe92461917e23d6d3430fbf137 Reviewed-on: https://cl.tvl.fyi/c/depot/+/11017 Tested-by: BuildkiteCI Reviewed-by: flokli Autosubmit: aspen --- tvix/glue/src/tvix_store_io.rs | 107 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 103 insertions(+), 4 deletions(-) (limited to 'tvix/glue/src/tvix_store_io.rs') diff --git a/tvix/glue/src/tvix_store_io.rs b/tvix/glue/src/tvix_store_io.rs index c09f0098e43e..30ab97c0ca03 100644 --- a/tvix/glue/src/tvix_store_io.rs +++ b/tvix/glue/src/tvix_store_io.rs @@ -4,7 +4,12 @@ use async_recursion::async_recursion; use bytes::Bytes; use futures::Stream; use futures::{StreamExt, TryStreamExt}; +use nix_compat::nixhash::NixHash; +use nix_compat::store_path::{build_ca_path, StorePathRef}; use nix_compat::{nixhash::CAHash, store_path::StorePath}; +use sha2::{Digest, Sha256}; +use std::marker::Unpin; +use std::rc::Rc; use std::{ cell::RefCell, collections::BTreeSet, @@ -15,17 +20,18 @@ use std::{ use tokio::io::AsyncReadExt; use tracing::{error, instrument, warn, Level}; use tvix_build::buildservice::BuildService; -use tvix_eval::{EvalIO, FileType, StdIO}; +use tvix_eval::{ErrorKind, EvalIO, FileType, StdIO}; use walkdir::DirEntry; use tvix_castore::{ blobservice::BlobService, directoryservice::{self, DirectoryService}, - proto::{node::Node, NamedNode}, + proto::{node::Node, FileNode, NamedNode}, B3Digest, }; use tvix_store::{pathinfoservice::PathInfoService, proto::PathInfo}; +use crate::builtins::FetcherError; use crate::known_paths::KnownPaths; use crate::tvix_build::derivation_to_build_request; @@ -51,7 +57,8 @@ pub struct TvixStoreIO { std_io: StdIO, #[allow(dead_code)] build_service: Arc<dyn BuildService>, - tokio_handle: tokio::runtime::Handle, + pub(crate) tokio_handle: tokio::runtime::Handle, + http_client: reqwest::Client, pub(crate) known_paths: RefCell<KnownPaths>, } @@ -70,6 +77,7 @@ impl TvixStoreIO { std_io: StdIO {}, build_service, tokio_handle, + http_client: reqwest::Client::new(), known_paths: Default::default(), }
} @@ -278,7 +286,7 @@ impl TvixStoreIO { /// with a [`tokio::runtime::Handle::block_on`] call for synchronicity. pub(crate) fn ingest_entries_sync<S>(&self, entries_stream: S) -> io::Result<Node> where - S: Stream<Item = DirEntry> + std::marker::Unpin, + S: Stream<Item = DirEntry> + Unpin, { self.tokio_handle.block_on(async move { tvix_castore::import::ingest_entries( @@ -346,6 +354,97 @@ impl TvixStoreIO { .await }) } + + pub async fn store_path_exists<'a>(&'a self, store_path: StorePathRef<'a>) -> io::Result<bool> { + Ok(self + .path_info_service + .as_ref() + .get(*store_path.digest()) + .await? + .is_some()) + } + + pub async fn fetch_url( + &self, + url: &str, + name: &str, + hash: Option<&NixHash>, + ) -> Result<StorePath, ErrorKind> { + let resp = self + .http_client + .get(url) + .send() + .await + .map_err(FetcherError::from)?; + let mut sha = Sha256::new(); + let mut data = tokio_util::io::StreamReader::new( + resp.bytes_stream() + .inspect_ok(|data| { + sha.update(data); + }) + .map_err(|e| { + let e = e.without_url(); + warn!(%e, "failed to get response body"); + io::Error::new(io::ErrorKind::BrokenPipe, e.to_string()) + }), + ); + + let mut blob = self.blob_service.open_write().await; + let size = tokio::io::copy(&mut data, blob.as_mut()).await?; + let blob_digest = blob.close().await?; + let got = NixHash::Sha256(sha.finalize().into()); + + let hash = CAHash::Flat(if let Some(wanted) = hash { + if *wanted != got { + return Err(FetcherError::HashMismatch { + url: url.to_owned(), + wanted: wanted.clone(), + got, + } + .into()); + } + wanted.clone() + } else { + got + }); + + let path = build_ca_path(name, &hash, Vec::<String>::new(), false) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + let node = Node::File(FileNode { + name: path.to_string().into(), + digest: blob_digest.into(), + size, + executable: false, + }); + + let (nar_size, nar_sha256) = self + .path_info_service + .calculate_nar(&node) + .await + .map_err(|e| ErrorKind::TvixError(Rc::new(e)))?; + + let path_info = PathInfo { + node: 
Some(tvix_castore::proto::Node { + node: Some(node.clone()), + }), + references: vec![], + narinfo: Some(tvix_store::proto::NarInfo { + nar_size, + nar_sha256: nar_sha256.to_vec().into(), + signatures: vec![], + reference_names: vec![], + deriver: None, /* ? */ + ca: Some((&hash).into()), + }), + }; + + self.path_info_service + .put(path_info) + .await + .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?; + + Ok(path.to_owned()) + } } impl EvalIO for TvixStoreIO { -- cgit 1.4.1