From cecb5e295a7ca1d1c8eea273afc9a03434b78cf8 Mon Sep 17 00:00:00 2001 From: Ryan Lahfa Date: Tue, 9 Jan 2024 00:16:52 +0100 Subject: feat(tvix/eval): implement `builtins.path` Now, it supports almost everything except `recursive = false;`, i.e. `flat`-ingestion because we have no knob exposed in the tvix store import side to do it. This has been tested to work. Change-Id: I2e9da10ceccdfbf45b43c532077ed45d6306aa98 Reviewed-on: https://cl.tvl.fyi/c/depot/+/10597 Tested-by: BuildkiteCI Autosubmit: raitobezarius Reviewed-by: flokli --- tvix/glue/src/builtins/errors.rs | 16 ++++ tvix/glue/src/builtins/import.rs | 122 ++++++++++++++++++++++++++- tvix/glue/src/builtins/mod.rs | 178 ++++++++++++++++++++++++++++++++++++++- tvix/glue/src/tvix_store_io.rs | 21 +++++ tvix/store/src/nar/renderer.rs | 31 +------ tvix/store/src/utils.rs | 30 +++++++ 6 files changed, 367 insertions(+), 31 deletions(-) (limited to 'tvix') diff --git a/tvix/glue/src/builtins/errors.rs b/tvix/glue/src/builtins/errors.rs index c753a125e030..53351cf902e7 100644 --- a/tvix/glue/src/builtins/errors.rs +++ b/tvix/glue/src/builtins/errors.rs @@ -53,3 +53,19 @@ impl From for tvix_eval::ErrorKind { tvix_eval::ErrorKind::TvixError(Rc::new(err)) } } + +/// Errors related to `builtins.path` and `builtins.filterSource`, +/// a.k.a. "importing" builtins. +#[derive(Debug, Error)] +pub enum ImportError { + #[error("non-file '{0}' cannot be imported in 'flat' mode")] + FlatImportOfNonFile(String), + #[error("hash mismatch at ingestion of '{0}', expected: '{1}', got: '{2}'")] + HashMismatch(String, NixHash, NixHash), +} + +impl From for tvix_eval::ErrorKind { + fn from(err: ImportError) -> Self { + tvix_eval::ErrorKind::TvixError(Rc::new(err)) + } +} diff --git a/tvix/glue/src/builtins/import.rs b/tvix/glue/src/builtins/import.rs index 8c9efc679171..fd791b8091fd 100644 --- a/tvix/glue/src/builtins/import.rs +++ b/tvix/glue/src/builtins/import.rs @@ -1,11 +1,12 @@ //! Implements builtins used to import paths in the store. +use crate::builtins::errors::ImportError; use futures::pin_mut; use std::path::Path; use tvix_eval::{ builtin_macros::builtins, generators::{self, GenCo}, - ErrorKind, Value, + ErrorKind, EvalIO, Value, }; use std::rc::Rc; @@ -123,8 +124,127 @@ mod import_builtins { use tvix_eval::generators::Gen; use tvix_eval::{generators::GenCo, ErrorKind, Value}; + use tvix_castore::B3Digest; + use crate::tvix_store_io::TvixStoreIO; + #[builtin("path")] + async fn builtin_path( + state: Rc, + co: GenCo, + args: Value, + ) -> Result { + let args = args.to_attrs()?; + let path = args.select_required("path")?; + let path = generators::request_force(&co, path.clone()) + .await + .to_path()?; + let name: String = if let Some(name) = args.select("name") { + generators::request_force(&co, name.clone()) + .await + .to_str()? + .as_bstr() + .to_string() + } else { + tvix_store::import::path_to_name(&path) + .expect("Failed to derive the default name out of the path") + .to_string() + }; + let filter = args.select("filter"); + let recursive_ingestion = args + .select("recursive") + .map(|r| r.as_bool()) + .transpose()? + .unwrap_or(true); // Yes, yes, Nix, by default, puts `recursive = true;`. + let expected_sha256 = args + .select("sha256") + .map(|h| { + h.to_str().and_then(|expected| { + let expected = expected.into_bstring().to_string(); + // TODO: ensure that we fail if this is not a valid str. + nix_compat::nixhash::from_str(&expected, None).map_err(|_err| { + // TODO: a better error would be nice, we use + // DerivationError::InvalidOutputHash usually for derivation construction. + // This is not a derivation construction, should we move it outside and + // generalize? + ErrorKind::TypeError { + expected: "sha256", + actual: "not a sha256", + } + }) + }) + }) + .transpose()?; + + // FUTUREWORK(performance): this reads the file instead of using a stat-like + // system call to the file, this degrades very badly on large files. + if !recursive_ingestion && state.read_to_end(path.as_ref()).is_err() { + Err(ImportError::FlatImportOfNonFile( + path.to_string_lossy().to_string(), + ))?; + } + + let root_node = filtered_ingest(state.clone(), co, path.as_ref(), filter).await?; + let ca: CAHash = if recursive_ingestion { + CAHash::Nar(NixHash::Sha256(state.tokio_handle.block_on(async { + Ok::<_, tvix_eval::ErrorKind>( + state + .path_info_service + .as_ref() + .calculate_nar(&root_node) + .await + .map_err(|e| ErrorKind::TvixError(Rc::new(e)))? + .1, + ) + })?)) + } else { + let digest: B3Digest = match root_node { + tvix_castore::proto::node::Node::File(ref fnode) => { + // It's already validated. + fnode.digest.clone().try_into().unwrap() + } + // We cannot hash anything else than file in flat import mode. + _ => { + return Err(ImportError::FlatImportOfNonFile( + path.to_string_lossy().to_string(), + ) + .into()) + } + }; + + // FUTUREWORK: avoid hashing again. + CAHash::Flat(NixHash::Sha256( + state + .tokio_handle + .block_on(async { state.blob_to_sha256_hash(digest).await })?, + )) + }; + + let obtained_hash = ca.hash().clone().into_owned(); + let (path_info, output_path) = state.tokio_handle.block_on(async { + state + .node_to_path_info(name.as_ref(), path.as_ref(), ca, root_node) + .await + })?; + + if let Some(expected_sha256) = expected_sha256 { + if obtained_hash != expected_sha256 { + Err(ImportError::HashMismatch( + path.to_string_lossy().to_string(), + expected_sha256, + obtained_hash, + ))?; + } + } + + let _: tvix_store::proto::PathInfo = state.tokio_handle.block_on(async { + // This is necessary to cause the coercion of the error type. + Ok::<_, std::io::Error>(state.path_info_service.as_ref().put(path_info).await?) + })?; + + Ok(output_path.to_absolute_path().into()) + } + #[builtin("filterSource")] async fn builtin_filter_source( state: Rc, diff --git a/tvix/glue/src/builtins/mod.rs b/tvix/glue/src/builtins/mod.rs index 138a52633da8..7cfe9f90a621 100644 --- a/tvix/glue/src/builtins/mod.rs +++ b/tvix/glue/src/builtins/mod.rs @@ -10,7 +10,7 @@ mod fetchers; mod import; mod utils; -pub use errors::{DerivationError, FetcherError}; +pub use errors::{DerivationError, FetcherError, ImportError}; /// Adds derivation-related builtins to the passed [tvix_eval::Evaluation]. /// @@ -438,6 +438,182 @@ mod tests { assert!(eval_result.errors.is_empty(), "errors should be empty"); } + // Space is an illegal character. + #[test_case( + r#"(builtins.path { name = "valid-name"; path = @fixtures + "/te st"; recursive = true; })"#, + true + )] + // Space is still an illegal character. + #[test_case( + r#"(builtins.path { name = "invalid name"; path = @fixtures + "/te st"; recursive = true; })"#, + false + )] + fn builtins_path_recursive_rename(code: &str, success: bool) { + // populate the fixtures dir + let temp = TempDir::new().expect("create temporary directory"); + let p = temp.path().join("import_fixtures"); + + // create the fixtures directory. + // We produce them at runtime rather than shipping it inside the source + // tree, as git can't model certain things - like directories without any + // items. + { + fs::create_dir(&p).expect("creating import_fixtures"); + fs::write(p.join("te st"), "").expect("creating `/te st`"); + } + // replace @fixtures with the temporary path containing the fixtures + let code_replaced = code.replace("@fixtures", &p.to_string_lossy()); + + let eval_result = eval(&code_replaced); + + let value = eval_result.value; + + if success { + match value.expect("expected successful evaluation on legal rename") { + tvix_eval::Value::String(s) => { + assert_eq!( + "/nix/store/nd5z11x7zjqqz44rkbhc6v7yifdkn659-valid-name", + s.as_bstr() + ); + } + v => panic!("unexpected value type: {:?}", v), + } + } else { + assert!(value.is_none(), "unexpected success on illegal store paths"); + } + } + + // Space is an illegal character. + #[test_case( + r#"(builtins.path { name = "valid-name"; path = @fixtures + "/te st"; recursive = false; })"#, + true + )] + // Space is still an illegal character. + #[test_case( + r#"(builtins.path { name = "invalid name"; path = @fixtures + "/te st"; recursive = false; })"#, + false + )] + // The non-recursive variant passes explicitly `recursive = false;` + fn builtins_path_nonrecursive_rename(code: &str, success: bool) { + // populate the fixtures dir + let temp = TempDir::new().expect("create temporary directory"); + let p = temp.path().join("import_fixtures"); + + // create the fixtures directory. + // We produce them at runtime rather than shipping it inside the source + // tree, as git can't model certain things - like directories without any + // items. + { + fs::create_dir(&p).expect("creating import_fixtures"); + fs::write(p.join("te st"), "").expect("creating `/te st`"); + } + // replace @fixtures with the temporary path containing the fixtures + let code_replaced = code.replace("@fixtures", &p.to_string_lossy()); + + let eval_result = eval(&code_replaced); + + let value = eval_result.value; + + if success { + match value.expect("expected successful evaluation on legal rename") { + tvix_eval::Value::String(s) => { + assert_eq!( + "/nix/store/il2rmfbqgs37rshr8w7x64hd4d3b4bsa-valid-name", + s.as_bstr() + ); + } + v => panic!("unexpected value type: {:?}", v), + } + } else { + assert!(value.is_none(), "unexpected success on illegal store paths"); + } + } + + #[test_case( + r#"(builtins.path { name = "valid-name"; path = @fixtures + "/te st"; recursive = false; sha256 = "sha256-47DEQpj8HBSa+/TImW+5JCeuQeRkm5NMpJWZG3hSuFU="; })"#, + true + )] + #[test_case( + r#"(builtins.path { name = "valid-name"; path = @fixtures + "/te st"; recursive = true; sha256 = "sha256-47DEQpj8HBSa+/TImW+5JCeuQeRkm5NMpJWZG3hSuFU="; })"#, + false + )] + #[test_case( + r#"(builtins.path { name = "valid-name"; path = @fixtures + "/te st"; recursive = true; sha256 = "sha256-d6xi4mKdjkX2JFicDIv5niSzpyI0m/Hnm8GGAIU04kY="; })"#, + true + )] + #[test_case( + r#"(builtins.path { name = "valid-name"; path = @fixtures + "/te st"; recursive = false; sha256 = "sha256-d6xi4mKdjkX2JFicDIv5niSzpyI0m/Hnm8GGAIU04kY="; })"#, + false + )] + fn builtins_path_fod_locking(code: &str, success: bool) { + // populate the fixtures dir + let temp = TempDir::new().expect("create temporary directory"); + let p = temp.path().join("import_fixtures"); + + // create the fixtures directory. + // We produce them at runtime rather than shipping it inside the source + // tree, as git can't model certain things - like directories without any + // items. + { + fs::create_dir(&p).expect("creating import_fixtures"); + fs::write(p.join("te st"), "").expect("creating `/te st`"); + } + // replace @fixtures with the temporary path containing the fixtures + let code_replaced = code.replace("@fixtures", &p.to_string_lossy()); + + let eval_result = eval(&code_replaced); + + let value = eval_result.value; + + if success { + assert!( + value.is_some(), + "expected successful evaluation on legal rename and valid FOD sha256" + ); + } else { + assert!(value.is_none(), "unexpected success on invalid FOD sha256"); + } + } + + #[test_case( + r#"(builtins.path { name = "valid-path"; path = @fixtures + "/te st dir"; filter = _: _: true; })"#, + "/nix/store/i28jmi4fwym4fw3flkrkp2mdxx50pdy0-valid-path" + )] + #[test_case( + r#"(builtins.path { name = "valid-path"; path = @fixtures + "/te st dir"; filter = _: _: false; })"#, + "/nix/store/pwza2ij9gk1fmzhbjnynmfv2mq2sgcap-valid-path" + )] + fn builtins_path_filter(code: &str, expected_outpath: &str) { + // populate the fixtures dir + let temp = TempDir::new().expect("create temporary directory"); + let p = temp.path().join("import_fixtures"); + + // create the fixtures directory. + // We produce them at runtime rather than shipping it inside the source + // tree, as git can't model certain things - like directories without any + // items. + { + fs::create_dir(&p).expect("creating import_fixtures"); + fs::create_dir(p.join("te st dir")).expect("creating `/te st dir`"); + fs::write(p.join("te st dir").join("test"), "").expect("creating `/te st dir/test`"); + } + // replace @fixtures with the temporary path containing the fixtures + let code_replaced = code.replace("@fixtures", &p.to_string_lossy()); + + let eval_result = eval(&code_replaced); + + let value = eval_result.value.expect("must succeed"); + + match value { + tvix_eval::Value::String(s) => { + assert_eq!(expected_outpath, s.as_bstr()); + } + _ => panic!("unexpected value type: {:?}", value), + } + + assert!(eval_result.errors.is_empty(), "errors should be empty"); + } + // All tests filter out some unsupported (not representable in castore) nodes, confirming // invalid, but filtered-out nodes don't prevent ingestion of a path. #[cfg(target_family = "unix")] diff --git a/tvix/glue/src/tvix_store_io.rs b/tvix/glue/src/tvix_store_io.rs index bfdab0846165..7b675bfc7d88 100644 --- a/tvix/glue/src/tvix_store_io.rs +++ b/tvix/glue/src/tvix_store_io.rs @@ -21,6 +21,7 @@ use tokio::io::AsyncReadExt; use tracing::{error, instrument, warn, Level}; use tvix_build::buildservice::BuildService; use tvix_eval::{ErrorKind, EvalIO, FileType, StdIO}; +use tvix_store::utils::AsyncIoBridge; use walkdir::DirEntry; use tvix_castore::{ @@ -341,6 +342,26 @@ impl TvixStoreIO { Ok(output_path) } + /// Transforms a BLAKE-3 digest into a SHA256 digest + /// by re-hashing the whole file. + pub(crate) async fn blob_to_sha256_hash(&self, blob_digest: B3Digest) -> io::Result<[u8; 32]> { + let mut reader = self + .blob_service + .open_read(&blob_digest) + .await? + .ok_or_else(|| { + io::Error::new( + io::ErrorKind::NotFound, + format!("blob represented by digest: '{}' not found", blob_digest), + ) + })?; + // It is fine to use `AsyncIoBridge` here because hashing is not actually I/O. + let mut hasher = AsyncIoBridge(Sha256::new()); + + tokio::io::copy(&mut reader, &mut hasher).await?; + Ok(hasher.0.finalize().into()) + } + pub async fn store_path_exists<'a>(&'a self, store_path: StorePathRef<'a>) -> io::Result { Ok(self .path_info_service diff --git a/tvix/store/src/nar/renderer.rs b/tvix/store/src/nar/renderer.rs index 9ac363ff578d..36d184f3b6c5 100644 --- a/tvix/store/src/nar/renderer.rs +++ b/tvix/store/src/nar/renderer.rs @@ -1,12 +1,10 @@ +use crate::utils::AsyncIoBridge; + use super::RenderError; use async_recursion::async_recursion; use count_write::CountWrite; use nix_compat::nar::writer::r#async as nar_writer; use sha2::{Digest, Sha256}; -use std::{ - pin::Pin, - task::{self, Poll}, -}; use tokio::io::{self, AsyncWrite, BufReader}; use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt}; use tvix_castore::{ @@ -42,31 +40,6 @@ where Ok((cw.count(), h.finalize().into())) } -/// The inverse of [tokio_util::io::SyncIoBridge]. -/// Don't use this with anything that actually does blocking I/O. -struct AsyncIoBridge(T); - -impl AsyncWrite for AsyncIoBridge { - fn poll_write( - self: Pin<&mut Self>, - _cx: &mut task::Context<'_>, - buf: &[u8], - ) -> Poll> { - Poll::Ready(self.get_mut().0.write(buf)) - } - - fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll> { - Poll::Ready(self.get_mut().0.flush()) - } - - fn poll_shutdown( - self: Pin<&mut Self>, - _cx: &mut task::Context<'_>, - ) -> Poll> { - Poll::Ready(Ok(())) - } -} - /// Accepts a [castorepb::node::Node] pointing to the root of a (store) path, /// and uses the passed blob_service and directory_service to perform the /// necessary lookups as it traverses the structure. diff --git a/tvix/store/src/utils.rs b/tvix/store/src/utils.rs index 041a9e683d59..0b171377bde1 100644 --- a/tvix/store/src/utils.rs +++ b/tvix/store/src/utils.rs @@ -1,4 +1,9 @@ use std::sync::Arc; +use std::{ + pin::Pin, + task::{self, Poll}, +}; +use tokio::io::{self, AsyncWrite}; use tvix_castore::{ blobservice::{self, BlobService}, @@ -33,3 +38,28 @@ pub async fn construct_services( Ok((blob_service, directory_service, path_info_service)) } + +/// The inverse of [tokio_util::io::SyncIoBridge]. +/// Don't use this with anything that actually does blocking I/O. +pub struct AsyncIoBridge(pub T); + +impl AsyncWrite for AsyncIoBridge { + fn poll_write( + self: Pin<&mut Self>, + _cx: &mut task::Context<'_>, + buf: &[u8], + ) -> Poll> { + Poll::Ready(self.get_mut().0.write(buf)) + } + + fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll> { + Poll::Ready(self.get_mut().0.flush()) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + _cx: &mut task::Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } +} -- cgit 1.4.1