diff options
Diffstat (limited to 'tvix/glue/src/builtins/import.rs')
-rw-r--r-- | tvix/glue/src/builtins/import.rs | 428 |
1 files changed, 223 insertions, 205 deletions
diff --git a/tvix/glue/src/builtins/import.rs b/tvix/glue/src/builtins/import.rs index 273be08ef7b6..83b91165c09e 100644 --- a/tvix/glue/src/builtins/import.rs +++ b/tvix/glue/src/builtins/import.rs @@ -1,8 +1,9 @@ //! Implements builtins used to import paths in the store. -use crate::builtins::errors::ImportError; +use crate::tvix_store_io::TvixStoreIO; use std::path::Path; use tvix_castore::import::ingest_entries; +use tvix_castore::Node; use tvix_eval::{ builtin_macros::builtins, generators::{self, GenCo}, @@ -16,7 +17,7 @@ async fn filtered_ingest( co: GenCo, path: &Path, filter: Option<&Value>, -) -> Result<tvix_castore::proto::node::Node, ErrorKind> { +) -> Result<Node, ErrorKind> { let mut entries: Vec<walkdir::DirEntry> = vec![]; let mut it = walkdir::WalkDir::new(path) .follow_links(false) @@ -88,10 +89,11 @@ async fn filtered_ingest( let dir_entries = entries.into_iter().rev().map(Ok); state.tokio_handle.block_on(async { - let entries = tvix_castore::import::fs::dir_entries_to_ingestion_stream( + let entries = tvix_castore::import::fs::dir_entries_to_ingestion_stream::<'_, _, _, &[u8]>( &state.blob_service, dir_entries, path, + None, // TODO re-scan ); ingest_entries(&state.directory_service, entries) .await @@ -104,174 +106,134 @@ async fn filtered_ingest( #[builtins(state = "Rc<TvixStoreIO>")] mod import_builtins { - use std::os::unix::ffi::OsStrExt; - use std::rc::Rc; - use super::*; + use crate::builtins::ImportError; use crate::tvix_store_io::TvixStoreIO; + use bstr::ByteSlice; use nix_compat::nixhash::{CAHash, NixHash}; - use nix_compat::store_path::StorePath; + use nix_compat::store_path::{build_ca_path, StorePathRef}; use sha2::Digest; + use std::rc::Rc; use tokio::io::AsyncWriteExt; - use tvix_castore::proto::node::Node; - use tvix_castore::proto::FileNode; use tvix_eval::builtins::coerce_value_to_path; use tvix_eval::generators::Gen; use tvix_eval::{generators::GenCo, ErrorKind, Value}; use tvix_eval::{FileType, NixContextElement, NixString}; + use tvix_store::path_info::PathInfo; - #[builtin("path")] - async fn builtin_path( + // This is a helper used by both builtins.path and builtins.filterSource. + async fn import_helper( state: Rc<TvixStoreIO>, co: GenCo, - args: Value, + path: std::path::PathBuf, + name: Option<&Value>, + filter: Option<&Value>, + recursive_ingestion: bool, + expected_sha256: Option<[u8; 32]>, ) -> Result<Value, ErrorKind> { - let args = args.to_attrs()?; - let path = args.select_required("path")?; - let path = - match coerce_value_to_path(&co, generators::request_force(&co, path.clone()).await) - .await? - { - Ok(path) => path, - Err(cek) => return Ok(cek.into()), - }; - let name: String = if let Some(name) = args.select("name") { - generators::request_force(&co, name.clone()) + let name: String = match name { + Some(name) => generators::request_force(&co, name.clone()) .await .to_str()? .as_bstr() - .to_string() - } else { - tvix_store::import::path_to_name(&path) + .to_string(), + None => tvix_store::import::path_to_name(&path) .expect("Failed to derive the default name out of the path") - .to_string() + .to_string(), }; - let filter = args.select("filter"); - let recursive_ingestion = args - .select("recursive") - .map(|r| r.as_bool()) - .transpose()? - .unwrap_or(true); // Yes, yes, Nix, by default, puts `recursive = true;`. - let expected_sha256 = args - .select("sha256") - .map(|h| { - h.to_str().and_then(|expected| { - let expected = expected.into_bstring().to_string(); - // TODO: ensure that we fail if this is not a valid str. - nix_compat::nixhash::from_str(&expected, None).map_err(|_err| { - // TODO: a better error would be nice, we use - // DerivationError::InvalidOutputHash usually for derivation construction. - // This is not a derivation construction, should we move it outside and - // generalize? - ErrorKind::TypeError { - expected: "sha256", - actual: "not a sha256", - } - }) - }) - }) - .transpose()?; - - // Check if the path points to a regular file. - // If it does, the filter function is never executed. - // TODO: follow symlinks and check their type instead - let (root_node, ca_hash) = match state.file_type(path.as_ref())? { + // As a first step, we ingest the contents, and get back a root node, + // and optionally the sha256 a flat file. + let (root_node, ca) = match std::fs::metadata(&path)?.file_type().into() { + // Check if the path points to a regular file. + // If it does, the filter function is never executed, and we copy to the blobservice directly. + // If recursive is false, we need to calculate the sha256 digest of the raw contents, + // as that affects the output path calculation. FileType::Regular => { - let mut file = state.open(path.as_ref())?; - // This is a single file, copy it to the blobservice directly. - let mut hash = sha2::Sha256::new(); + let mut file = state.open(&path)?; + + let mut flat_sha256 = (!recursive_ingestion).then(sha2::Sha256::new); let mut blob_size = 0; + let mut blob_writer = state .tokio_handle .block_on(async { state.blob_service.open_write().await }); - let mut buf = [0u8; 4096]; + // read piece by piece and write to blob_writer. + // This is a bit manual due to EvalIO being sync, while everything else async. + { + let mut buf = [0u8; 4096]; - loop { - // read bytes into buffer, break out if EOF - let len = file.read(&mut buf)?; - if len == 0 { - break; - } - blob_size += len as u64; + loop { + // read bytes into buffer, break out if EOF + let len = file.read(&mut buf)?; + if len == 0 { + break; + } + blob_size += len as u64; - let data = &buf[0..len]; + let data = &buf[0..len]; - // add to blobwriter - state - .tokio_handle - .block_on(async { blob_writer.write_all(data).await })?; + // add to blobwriter + state + .tokio_handle + .block_on(async { blob_writer.write_all(data).await })?; - // update the sha256 hash function. We can skip that if we're not using it. - if !recursive_ingestion { - hash.update(data); + // update blob_sha256 if needed. + if let Some(h) = flat_sha256.as_mut() { + h.update(data) + } } } - // close the blob writer, get back the b3 digest. - let blob_digest = state - .tokio_handle - .block_on(async { blob_writer.close().await })?; - - let root_node = Node::File(FileNode { - // The name gets set further down, while constructing the PathInfo. - name: "".into(), - digest: blob_digest.into(), - size: blob_size, - executable: false, - }); - - let ca_hash = if recursive_ingestion { - let (_nar_size, nar_sha256) = state - .tokio_handle - .block_on(async { - state - .nar_calculation_service - .as_ref() - .calculate_nar(&root_node) - .await - }) - .map_err(|e| tvix_eval::ErrorKind::TvixError(Rc::new(e)))?; - CAHash::Nar(NixHash::Sha256(nar_sha256)) - } else { - CAHash::Flat(NixHash::Sha256(hash.finalize().into())) - }; - - (root_node, ca_hash) + // close the blob writer, construct the root node and the blob_sha256 (later used for output path calculation) + ( + Node::File { + digest: state + .tokio_handle + .block_on(async { blob_writer.close().await })?, + size: blob_size, + executable: false, + }, + { + // If non-recursive ingestion is requested… + if let Some(flat_sha256) = flat_sha256 { + let actual_sha256 = flat_sha256.finalize().into(); + + // compare the recorded flat hash with an upfront one if provided. + if let Some(expected_sha256) = expected_sha256 { + if actual_sha256 != expected_sha256 { + return Err(ImportError::HashMismatch( + path, + NixHash::Sha256(expected_sha256), + NixHash::Sha256(actual_sha256), + ) + .into()); + } + } + + Some(CAHash::Flat(NixHash::Sha256(actual_sha256))) + } else { + None + } + }, + ) } - FileType::Directory => { - if !recursive_ingestion { - return Err(ImportError::FlatImportOfNonFile( - path.to_string_lossy().to_string(), - ))?; - } - - // do the filtered ingest - let root_node = filtered_ingest(state.clone(), co, path.as_ref(), filter).await?; - - // calculate the NAR sha256 - let (_nar_size, nar_sha256) = state - .tokio_handle - .block_on(async { - state - .nar_calculation_service - .as_ref() - .calculate_nar(&root_node) - .await - }) - .map_err(|e| tvix_eval::ErrorKind::TvixError(Rc::new(e)))?; - - let ca_hash = CAHash::Nar(NixHash::Sha256(nar_sha256)); - - (root_node, ca_hash) + FileType::Directory if !recursive_ingestion => { + return Err(ImportError::FlatImportOfNonFile(path))? } + + // do the filtered ingest + FileType::Directory => ( + filtered_ingest(state.clone(), co, path.as_ref(), filter).await?, + None, + ), FileType::Symlink => { // FUTUREWORK: Nix follows a symlink if it's at the root, // except if it's not resolve-able (NixOS/nix#7761).i return Err(tvix_eval::ErrorKind::IO { - path: Some(path.to_path_buf()), + path: Some(path), error: Rc::new(std::io::Error::new( std::io::ErrorKind::Unsupported, "builtins.path pointing to a symlink is ill-defined.", @@ -280,7 +242,7 @@ mod import_builtins { } FileType::Unknown => { return Err(tvix_eval::ErrorKind::IO { - path: Some(path.to_path_buf()), + path: Some(path), error: Rc::new(std::io::Error::new( std::io::ErrorKind::Unsupported, "unsupported file type", @@ -289,32 +251,67 @@ mod import_builtins { } }; - let (path_info, _hash, output_path) = state.tokio_handle.block_on(async { - state - .node_to_path_info(name.as_ref(), path.as_ref(), &ca_hash, root_node) - .await - })?; - - if let Some(expected_sha256) = expected_sha256 { - if *ca_hash.hash() != expected_sha256 { - Err(ImportError::HashMismatch( - path.to_string_lossy().to_string(), - expected_sha256, - ca_hash.hash().into_owned(), - ))?; + // Calculate the NAR sha256. + let (nar_size, nar_sha256) = state + .tokio_handle + .block_on(async { + state + .nar_calculation_service + .as_ref() + .calculate_nar(&root_node) + .await + }) + .map_err(|e| tvix_eval::ErrorKind::TvixError(Rc::new(e)))?; + + // Calculate the CA hash for the recursive cases, this is only already + // `Some(_)` for flat ingestion. + let ca = match ca { + None => { + // If an upfront-expected NAR hash was specified, compare. + if let Some(expected_nar_sha256) = expected_sha256 { + if expected_nar_sha256 != nar_sha256 { + return Err(ImportError::HashMismatch( + path, + NixHash::Sha256(expected_nar_sha256), + NixHash::Sha256(nar_sha256), + ) + .into()); + } + } + CAHash::Nar(NixHash::Sha256(nar_sha256)) } - } + Some(ca) => ca, + }; + + let store_path = build_ca_path(&name, &ca, Vec::<&str>::new(), false) + .map_err(|e| tvix_eval::ErrorKind::TvixError(Rc::new(e)))?; - state + let path_info = state .tokio_handle - .block_on(async { state.path_info_service.as_ref().put(path_info).await }) + .block_on(async { + state + .path_info_service + .as_ref() + .put(PathInfo { + store_path, + node: root_node, + // There's no reference scanning on path contents ingested like this. + references: vec![], + nar_size, + nar_sha256, + signatures: vec![], + deriver: None, + ca: Some(ca), + }) + .await + }) .map_err(|e| tvix_eval::ErrorKind::IO { - path: Some(path.to_path_buf()), + path: Some(path), error: Rc::new(e.into()), })?; // We need to attach context to the final output path. - let outpath = output_path.to_absolute_path(); + let outpath = path_info.store_path.to_absolute_path(); Ok( NixString::new_context_from(NixContextElement::Plain(outpath.clone()).into(), outpath) @@ -322,45 +319,72 @@ mod import_builtins { ) } - #[builtin("filterSource")] - async fn builtin_filter_source( + #[builtin("path")] + async fn builtin_path( state: Rc<TvixStoreIO>, co: GenCo, - #[lazy] filter: Value, - path: Value, + args: Value, ) -> Result<Value, ErrorKind> { - let p = path.to_path()?; - let root_node = filtered_ingest(Rc::clone(&state), co, &p, Some(&filter)).await?; - let name = tvix_store::import::path_to_name(&p)?; + let args = args.to_attrs()?; - let outpath = state - .tokio_handle - .block_on(async { - let (_, nar_sha256) = state - .nar_calculation_service - .as_ref() - .calculate_nar(&root_node) - .await?; + let path = match coerce_value_to_path( + &co, + generators::request_force(&co, args.select_required("path")?.clone()).await, + ) + .await? + { + Ok(path) => path, + Err(cek) => return Ok(cek.into()), + }; - state - .register_node_in_path_info_service( - name, - &p, - &CAHash::Nar(NixHash::Sha256(nar_sha256)), - root_node, - ) - .await + let filter = args.select("filter"); + + // Construct a sha256 hasher, which is needed for flat ingestion. + let recursive_ingestion = args + .select("recursive") + .map(|r| r.as_bool()) + .transpose()? + .unwrap_or(true); // Yes, yes, Nix, by default, puts `recursive = true;`. + + let expected_sha256 = args + .select("sha256") + .map(|h| { + h.to_str().and_then(|expected| { + match nix_compat::nixhash::from_str(expected.to_str()?, Some("sha256")) { + Ok(NixHash::Sha256(digest)) => Ok(digest), + Ok(_) => unreachable!(), + Err(e) => Err(ErrorKind::InvalidHash(e.to_string())), + } + }) }) - .map_err(|err| ErrorKind::IO { - path: Some(p.to_path_buf()), - error: err.into(), - })? - .to_absolute_path(); + .transpose()?; - Ok( - NixString::new_context_from(NixContextElement::Plain(outpath.clone()).into(), outpath) - .into(), + import_helper( + state, + co, + path, + args.select("name"), + filter, + recursive_ingestion, + expected_sha256, ) + .await + } + + #[builtin("filterSource")] + async fn builtin_filter_source( + state: Rc<TvixStoreIO>, + co: GenCo, + #[lazy] filter: Value, + path: Value, + ) -> Result<Value, ErrorKind> { + let path = + match coerce_value_to_path(&co, generators::request_force(&co, path).await).await? { + Ok(path) => path, + Err(cek) => return Ok(cek.into()), + }; + + import_helper(state, co, path, None, Some(&filter), true, None).await } #[builtin("storePath")] @@ -369,39 +393,33 @@ mod import_builtins { co: GenCo, path: Value, ) -> Result<Value, ErrorKind> { - let p = std::str::from_utf8(match &path { - Value::String(s) => s.as_bytes(), - Value::Path(p) => p.as_os_str().as_bytes(), + let p = match &path { + Value::String(s) => Path::new(s.as_bytes().to_os_str()?), + Value::Path(p) => p.as_path(), _ => { return Err(ErrorKind::TypeError { expected: "string or path", actual: path.type_of(), }) } - })?; - - let path_exists = if let Ok((store_path, sub_path)) = StorePath::from_absolute_path_full(p) - { - if !sub_path.as_os_str().is_empty() { - false - } else { - state.store_path_exists(store_path.as_ref()).await? - } - } else { - false }; - if !path_exists { - return Err(ImportError::PathNotInStore(p.into()).into()); - } + // For this builtin, the path needs to start with an absolute store path. + let (store_path, _sub_path) = StorePathRef::from_absolute_path_full(p) + .map_err(|_e| ImportError::PathNotAbsoluteOrInvalid(p.to_path_buf()))?; - Ok(Value::String(NixString::new_context_from( - [NixContextElement::Plain(p.into())].into(), - p, - ))) + if state.path_exists(p)? { + Ok(Value::String(NixString::new_context_from( + [NixContextElement::Plain(store_path.to_absolute_path())].into(), + p.as_os_str().as_encoded_bytes(), + ))) + } else { + Err(ErrorKind::IO { + path: Some(p.to_path_buf()), + error: Rc::new(std::io::ErrorKind::NotFound.into()), + }) + } } } pub use import_builtins::builtins as import_builtins; - -use crate::tvix_store_io::TvixStoreIO; |