diff options
Diffstat (limited to 'tvix/glue/src/tvix_store_io.rs')
-rw-r--r-- | tvix/glue/src/tvix_store_io.rs | 249 |
1 files changed, 146 insertions, 103 deletions
diff --git a/tvix/glue/src/tvix_store_io.rs b/tvix/glue/src/tvix_store_io.rs index 4e5488067f8f..8a11e293f0ac 100644 --- a/tvix/glue/src/tvix_store_io.rs +++ b/tvix/glue/src/tvix_store_io.rs @@ -4,9 +4,9 @@ use futures::{StreamExt, TryStreamExt}; use nix_compat::nixhash::NixHash; use nix_compat::store_path::StorePathRef; use nix_compat::{nixhash::CAHash, store_path::StorePath}; +use std::collections::BTreeMap; use std::{ cell::RefCell, - collections::BTreeSet, io, path::{Path, PathBuf}, sync::Arc, @@ -15,15 +15,13 @@ use tokio_util::io::SyncIoBridge; use tracing::{error, instrument, warn, Level, Span}; use tracing_indicatif::span_ext::IndicatifSpanExt; use tvix_build::buildservice::BuildService; -use tvix_castore::proto::node::Node; use tvix_eval::{EvalIO, FileType, StdIO}; use tvix_store::nar::NarCalculationService; use tvix_castore::{ blobservice::BlobService, directoryservice::{self, DirectoryService}, - proto::NamedNode, - B3Digest, + Node, }; use tvix_store::{pathinfoservice::PathInfoService, proto::PathInfo}; @@ -67,7 +65,7 @@ pub struct TvixStoreIO { >, // Paths known how to produce, by building or fetching. - pub(crate) known_paths: RefCell<KnownPaths>, + pub known_paths: RefCell<KnownPaths>, } impl TvixStoreIO { @@ -107,7 +105,7 @@ impl TvixStoreIO { #[instrument(skip(self, store_path), fields(store_path=%store_path, indicatif.pb_show=1), ret(level = Level::TRACE), err)] async fn store_path_to_node( &self, - store_path: &StorePath, + store_path: &StorePath<String>, sub_path: &Path, ) -> io::Result<Option<Node>> { // Find the root node for the store_path. @@ -122,7 +120,22 @@ impl TvixStoreIO { .await? { // if we have a PathInfo, we know there will be a root_node (due to validation) - Some(path_info) => path_info.node.expect("no node").node.expect("no node"), + // TODO: use stricter typed BuildRequest here. + Some(path_info) => { + let (name, node) = path_info + .node + .expect("no node") + .into_name_and_node() + .expect("invalid node"); + + assert_eq!( + store_path.to_string().as_bytes(), + name.as_ref(), + "returned node basename must match requested store path" + ); + + node + } // If there's no PathInfo found, this normally means we have to // trigger the build (and insert into PathInfoService, after // reference scanning). @@ -185,8 +198,7 @@ impl TvixStoreIO { // derivation_to_build_request needs castore nodes for all inputs. // Provide them, which means, here is where we recursively build // all dependencies. - #[allow(clippy::mutable_key_type)] - let mut input_nodes: BTreeSet<Node> = + let mut inputs: BTreeMap<Bytes, Node> = futures::stream::iter(drv.input_derivations.iter()) .map(|(input_drv_path, output_names)| { // look up the derivation object @@ -201,7 +213,7 @@ impl TvixStoreIO { }; // convert output names to actual paths - let output_paths: Vec<StorePath> = output_names + let output_paths: Vec<StorePath<String>> = output_names .iter() .map(|output_name| { input_drv @@ -214,6 +226,7 @@ impl TvixStoreIO { .clone() }) .collect(); + // For each output, ask for the castore node. // We're in a per-derivation context, so if they're // not built yet they'll all get built together. @@ -228,7 +241,7 @@ impl TvixStoreIO { .await?; if let Some(node) = node { - Ok(node) + Ok((output_path.to_string().into(), node)) } else { Err(io::Error::other("no node produced")) } @@ -242,26 +255,29 @@ impl TvixStoreIO { .try_collect() .await?; - // add input sources // FUTUREWORK: merge these who things together - #[allow(clippy::mutable_key_type)] - let input_nodes_input_sources: BTreeSet<Node> = + // add input sources + let input_sources: BTreeMap<_, _> = futures::stream::iter(drv.input_sources.iter()) .then(|input_source| { - Box::pin(async { - let node = self - .store_path_to_node(input_source, Path::new("")) - .await?; - if let Some(node) = node { - Ok(node) - } else { - Err(io::Error::other("no node produced")) + Box::pin({ + let input_source = input_source.clone(); + async move { + let node = self + .store_path_to_node(&input_source, Path::new("")) + .await?; + if let Some(node) = node { + Ok((input_source.to_string().into(), node)) + } else { + Err(io::Error::other("no node produced")) + } } }) }) .try_collect() .await?; - input_nodes.extend(input_nodes_input_sources); + + inputs.extend(input_sources); span.pb_set_message(&format!("🔨Building {}", &store_path)); @@ -270,7 +286,7 @@ impl TvixStoreIO { // operations, so dealt with in the Some(…) match arm // synthesize the build request. - let build_request = derivation_to_build_request(&drv, input_nodes)?; + let build_request = derivation_to_build_request(&drv, inputs)?; // create a build let build_result = self @@ -280,29 +296,71 @@ impl TvixStoreIO { .await .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?; - // TODO: refscan + // Maps from the index in refscan_needles to the full store path + // Used to map back to the actual store path from the found needles + // Importantly, this must match the order of the needles generated in derivation_to_build_request + let refscan_needles = + crate::tvix_build::get_refscan_needles(&drv).collect::<Vec<_>>(); // For each output, insert a PathInfo. - for output in &build_result.outputs { - let root_node = output.node.as_ref().expect("invalid root node"); + for ((output, output_needles), drv_output) in build_result + .outputs + .iter() + .zip(build_result.outputs_needles.iter()) + .zip(drv.outputs.iter()) + { + let (_, output_node) = output + .clone() + .into_name_bytes_and_node() + .expect("invalid node"); + + let output_needles: Vec<_> = output_needles + .needles + .iter() + // Map each output needle index back to the refscan_needle + .map(|idx| { + refscan_needles + .get(*idx as usize) + .ok_or(std::io::Error::new( + std::io::ErrorKind::Other, + "invalid build response", + )) + }) + .collect::<Result<_, std::io::Error>>()?; // calculate the nar representation let (nar_size, nar_sha256) = self .nar_calculation_service - .calculate_nar(root_node) + .calculate_nar(&output_node) .await?; // assemble the PathInfo to persist let path_info = PathInfo { - node: Some(tvix_castore::proto::Node { - node: Some(root_node.clone()), - }), - references: vec![], // TODO: refscan + node: Some(tvix_castore::proto::Node::from_name_and_node( + drv_output + .1 + .path + .as_ref() + .ok_or(std::io::Error::new( + std::io::ErrorKind::Other, + "missing output store path", + ))? + .to_string() + .into(), + output_node, + )), + references: output_needles + .iter() + .map(|path| Bytes::from(path.digest().as_slice().to_vec())) + .collect(), narinfo: Some(tvix_store::proto::NarInfo { nar_size, nar_sha256: Bytes::from(nar_sha256.to_vec()), signatures: vec![], - reference_names: vec![], // TODO: refscan + reference_names: output_needles + .iter() + .map(|path| path.to_string()) + .collect(), deriver: Some(tvix_store::proto::StorePath { name: drv_path .name() @@ -329,16 +387,17 @@ impl TvixStoreIO { } // find the output for the store path requested + let s = store_path.to_string(); + build_result .outputs .into_iter() - .find(|output_node| { - output_node.node.as_ref().expect("invalid node").get_name() - == store_path.to_string().as_bytes() + .map(|e| e.into_name_and_node().expect("invalid node")) + .find(|(output_name, _output_node)| { + output_name.as_ref() == s.as_bytes() }) .expect("build didn't produce the store path") - .node - .expect("invalid node") + .1 } } } @@ -353,13 +412,13 @@ impl TvixStoreIO { .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e)) } - pub(crate) async fn node_to_path_info( + pub(crate) async fn node_to_path_info<'a>( &self, - name: &str, + name: &'a str, path: &Path, ca: &CAHash, root_node: Node, - ) -> io::Result<(PathInfo, NixHash, StorePath)> { + ) -> io::Result<(PathInfo, NixHash, StorePathRef<'a>)> { // Ask the PathInfoService for the NAR size and sha256 // We always need it no matter what is the actual hash mode // because the path info construct a narinfo which *always* @@ -381,27 +440,27 @@ impl TvixStoreIO { }, )?; - // assemble a new root_node with a name that is derived from the nar hash. - let root_node = root_node.rename(output_path.to_string().into_bytes().into()); - tvix_store::import::log_node(&root_node, path); + tvix_store::import::log_node(name.as_bytes(), &root_node, path); - let path_info = - tvix_store::import::derive_nar_ca_path_info(nar_size, nar_sha256, Some(ca), root_node); + // construct a PathInfo + let path_info = tvix_store::import::derive_nar_ca_path_info( + nar_size, + nar_sha256, + Some(ca), + output_path.to_string().into(), + root_node, + ); - Ok(( - path_info, - NixHash::Sha256(nar_sha256), - output_path.to_owned(), - )) + Ok((path_info, NixHash::Sha256(nar_sha256), output_path)) } - pub(crate) async fn register_node_in_path_info_service( + pub(crate) async fn register_node_in_path_info_service<'a>( &self, - name: &str, + name: &'a str, path: &Path, ca: &CAHash, root_node: Node, - ) -> io::Result<StorePath> { + ) -> io::Result<StorePathRef<'a>> { let (path_info, _, output_path) = self.node_to_path_info(name, path, ca, root_node).await?; let _path_info = self.path_info_service.as_ref().put(path_info).await?; @@ -426,7 +485,7 @@ impl EvalIO for TvixStoreIO { { if self .tokio_handle - .block_on(self.store_path_to_node(&store_path, &sub_path))? + .block_on(self.store_path_to_node(&store_path, sub_path))? .is_some() { Ok(true) @@ -448,30 +507,18 @@ impl EvalIO for TvixStoreIO { { if let Some(node) = self .tokio_handle - .block_on(async { self.store_path_to_node(&store_path, &sub_path).await })? + .block_on(async { self.store_path_to_node(&store_path, sub_path).await })? { // depending on the node type, treat open differently match node { - Node::Directory(_) => { + Node::Directory { .. } => { // This would normally be a io::ErrorKind::IsADirectory (still unstable) Err(io::Error::new( io::ErrorKind::Unsupported, format!("tried to open directory at {:?}", path), )) } - Node::File(file_node) => { - let digest: B3Digest = - file_node.digest.clone().try_into().map_err(|_e| { - error!( - file_node = ?file_node, - "invalid digest" - ); - io::Error::new( - io::ErrorKind::InvalidData, - format!("invalid digest length in file node: {:?}", file_node), - ) - })?; - + Node::File { digest, .. } => { self.tokio_handle.block_on(async { let resp = self.blob_service.as_ref().open_read(&digest).await?; match resp { @@ -493,7 +540,7 @@ impl EvalIO for TvixStoreIO { } }) } - Node::Symlink(_symlink_node) => Err(io::Error::new( + Node::Symlink { .. } => Err(io::Error::new( io::ErrorKind::Unsupported, "open for symlinks is unsupported", ))?, @@ -516,12 +563,12 @@ impl EvalIO for TvixStoreIO { { if let Some(node) = self .tokio_handle - .block_on(async { self.store_path_to_node(&store_path, &sub_path).await })? + .block_on(async { self.store_path_to_node(&store_path, sub_path).await })? { match node { - Node::Directory(_) => Ok(FileType::Directory), - Node::File(_) => Ok(FileType::Regular), - Node::Symlink(_) => Ok(FileType::Symlink), + Node::Directory { .. } => Ok(FileType::Directory), + Node::File { .. } => Ok(FileType::Regular), + Node::Symlink { .. } => Ok(FileType::Symlink), } } else { self.std_io.file_type(path) @@ -538,31 +585,21 @@ impl EvalIO for TvixStoreIO { { if let Some(node) = self .tokio_handle - .block_on(async { self.store_path_to_node(&store_path, &sub_path).await })? + .block_on(async { self.store_path_to_node(&store_path, sub_path).await })? { match node { - Node::Directory(directory_node) => { + Node::Directory { digest, .. } => { // fetch the Directory itself. - let digest: B3Digest = - directory_node.digest.clone().try_into().map_err(|_e| { - io::Error::new( - io::ErrorKind::InvalidData, - format!( - "invalid digest length in directory node: {:?}", - directory_node - ), - ) - })?; - - if let Some(directory) = self.tokio_handle.block_on(async { - self.directory_service.as_ref().get(&digest).await + if let Some(directory) = self.tokio_handle.block_on({ + let digest = digest.clone(); + async move { self.directory_service.as_ref().get(&digest).await } })? { let mut children: Vec<(bytes::Bytes, FileType)> = Vec::new(); - for node in directory.nodes() { + for (name, node) in directory.into_nodes() { children.push(match node { - Node::Directory(e) => (e.name, FileType::Directory), - Node::File(e) => (e.name, FileType::Regular), - Node::Symlink(e) => (e.name, FileType::Symlink), + Node::Directory { .. } => (name.into(), FileType::Directory), + Node::File { .. } => (name.clone().into(), FileType::Regular), + Node::Symlink { .. } => (name.into(), FileType::Symlink), }) } Ok(children) @@ -579,14 +616,14 @@ impl EvalIO for TvixStoreIO { ))? } } - Node::File(_file_node) => { + Node::File { .. } => { // This would normally be a io::ErrorKind::NotADirectory (still unstable) Err(io::Error::new( io::ErrorKind::Unsupported, "tried to readdir path {:?}, which is a file", ))? } - Node::Symlink(_symlink_node) => Err(io::Error::new( + Node::Symlink { .. } => Err(io::Error::new( io::ErrorKind::Unsupported, "read_dir for symlinks is unsupported", ))?, @@ -627,10 +664,11 @@ mod tests { use std::{path::Path, rc::Rc, sync::Arc}; use bstr::ByteSlice; + use clap::Parser; use tempfile::TempDir; use tvix_build::buildservice::DummyBuildService; use tvix_eval::{EvalIO, EvaluationResult}; - use tvix_store::utils::construct_services; + use tvix_store::utils::{construct_services, ServiceUrlsMemory}; use super::TvixStoreIO; use crate::builtins::{add_derivation_builtins, add_fetcher_builtins, add_import_builtins}; @@ -642,22 +680,27 @@ mod tests { let tokio_runtime = tokio::runtime::Runtime::new().unwrap(); let (blob_service, directory_service, path_info_service, nar_calculation_service) = tokio_runtime - .block_on(async { construct_services("memory://", "memory://", "memory://").await }) + .block_on(async { + construct_services(ServiceUrlsMemory::parse_from(std::iter::empty::<&str>())) + .await + }) .unwrap(); let io = Rc::new(TvixStoreIO::new( blob_service, directory_service, - path_info_service.into(), + path_info_service, nar_calculation_service.into(), Arc::<DummyBuildService>::default(), tokio_runtime.handle().clone(), )); - let mut eval = tvix_eval::Evaluation::new(io.clone() as Rc<dyn EvalIO>, true); - add_derivation_builtins(&mut eval, io.clone()); - add_fetcher_builtins(&mut eval, io.clone()); - add_import_builtins(&mut eval, io); + let mut eval_builder = + tvix_eval::Evaluation::builder(io.clone() as Rc<dyn EvalIO>).enable_import(); + eval_builder = add_derivation_builtins(eval_builder, Rc::clone(&io)); + eval_builder = add_fetcher_builtins(eval_builder, Rc::clone(&io)); + eval_builder = add_import_builtins(eval_builder, io); + let eval = eval_builder.build(); // run the evaluation itself. eval.evaluate(str, None) |