diff options
Diffstat (limited to 'tvix/glue/src/tvix_store_io.rs')
-rw-r--r-- | tvix/glue/src/tvix_store_io.rs | 192 |
1 files changed, 180 insertions, 12 deletions
diff --git a/tvix/glue/src/tvix_store_io.rs b/tvix/glue/src/tvix_store_io.rs index 330a7d3f9be8..a64e2d4c7bf0 100644 --- a/tvix/glue/src/tvix_store_io.rs +++ b/tvix/glue/src/tvix_store_io.rs @@ -1,8 +1,15 @@ //! This module provides an implementation of EvalIO talking to tvix-store. -use nix_compat::store_path::StorePath; +use async_recursion::async_recursion; +use bytes::Bytes; +use futures::{StreamExt, TryStreamExt}; +use nix_compat::{ + nixhash::CAHash, + store_path::{StorePath, StorePathRef}, +}; use std::{ cell::RefCell, + collections::BTreeSet, io, path::{Path, PathBuf}, sync::Arc, @@ -15,12 +22,13 @@ use tvix_eval::{EvalIO, FileType, StdIO}; use tvix_castore::{ blobservice::BlobService, directoryservice::{self, DirectoryService}, - proto::node::Node, + proto::{node::Node, NamedNode}, B3Digest, }; -use tvix_store::pathinfoservice::PathInfoService; +use tvix_store::{pathinfoservice::PathInfoService, proto::PathInfo}; use crate::known_paths::KnownPaths; +use crate::tvix_build::derivation_to_build_request; /// Implements [EvalIO], asking given [PathInfoService], [DirectoryService] /// and [BlobService]. @@ -74,12 +82,18 @@ impl TvixStoreIO { /// /// In case there is no PathInfo yet, this means we need to build it /// (which currently is stubbed out still). + #[async_recursion(?Send)] #[instrument(skip(self, store_path), fields(store_path=%store_path), ret(level = Level::TRACE), err)] async fn store_path_to_node( &self, store_path: &StorePath, sub_path: &Path, ) -> io::Result<Option<Node>> { + // Find the root node for the store_path. + // It asks the PathInfoService first, but in case there was a Derivation + // produced that would build it, fall back to triggering the build. + // To populate the input nodes, it might recursively trigger builds of + // its dependencies too. let root_node = match self .path_info_service .as_ref() @@ -88,19 +102,173 @@ impl TvixStoreIO { { // if we have a PathInfo, we know there will be a root_node (due to validation) Some(path_info) => path_info.node.expect("no node").node.expect("no node"), - // If there's no PathInfo found, we didn't build that path yet. - // and have to trigger the build (and probably insert into the - // PathInfoService (which requires refscan)) - // FUTUREWORK: We don't do builds yet, so log a warning and let - // std_io take over. - // In the future, not getting a root node means a failed build! + // If there's no PathInfo found, this normally means we have to + // trigger the build (and insert into PathInfoService, after + // reference scanning). + // However, as Tvix is (currently) not managing /nix/store itself, + // we return Ok(None) to let std_io take over. + // While reading from store paths that are not known to Tvix during + // that evaluation clearly is an impurity, we still need to support + // it for things like <nixpkgs> pointing to a store path. + // In the future, these things will (need to) have PathInfo. None => { - warn!("would trigger build, skipping"); - return Ok(None); + // The store path doesn't exist yet, so we need to build it. + warn!("triggering build"); + + // Look up the derivation for this output path. + let (drv_path, drv) = { + let known_paths = self.known_paths.borrow(); + match known_paths.get_drv_path_for_output_path(store_path) { + Some(drv_path) => ( + drv_path.to_owned(), + known_paths.get_drv_by_drvpath(drv_path).unwrap().to_owned(), + ), + None => { + warn!(store_path=%store_path, "no drv found"); + // let StdIO take over + return Ok(None); + } + } + }; + + // derivation_to_build_request needs castore nodes for all inputs. + // Provide them, which means, here is where we recursively build + // all dependencies. + #[allow(clippy::mutable_key_type)] + let input_nodes: BTreeSet<Node> = + futures::stream::iter(drv.input_derivations.iter()) + .map(|(input_drv_path, output_names)| { + // since Derivation is validated, we know this can be parsed. + let input_drv_path = + StorePathRef::from_absolute_path(input_drv_path.as_bytes()) + .expect("invalid drv path") + .to_owned(); + + // look up the derivation object + let input_drv = { + let known_paths = self.known_paths.borrow(); + known_paths + .get_drv_by_drvpath(&input_drv_path) + .unwrap_or_else(|| panic!("{} not found", input_drv_path)) + .to_owned() + }; + + // convert output names to actual paths + let output_paths: Vec<StorePath> = output_names + .iter() + .map(|output_name| { + let output_path = &input_drv + .outputs + .get(output_name) + .expect("missing output_name") + .path; + + // since Derivation is validated, we this can be parsed. + StorePathRef::from_absolute_path(output_path.as_bytes()) + .expect("invalid output path") + .to_owned() + }) + .collect(); + // For each output, ask for the castore node. + // We're in a per-derivation context, so if they're + // not built yet they'll all get built together. + // If they don't need to build, we can however still + // substitute all in parallel (if they don't need to + // be built) - so we turn this into a stream of streams. + // It's up to the builder to deduplicate same build requests. + futures::stream::iter(output_paths.into_iter()).map( + |output_path| async move { + let node = self + .store_path_to_node(&output_path, Path::new("")) + .await?; + + if let Some(node) = node { + Ok(node) + } else { + Err(io::Error::other("no node produced")) + } + }, + ) + }) + .flatten() + .buffer_unordered(10) // TODO: make configurable + .try_collect() + .await?; + + // TODO: check if input sources are sufficiently dealth with, + // I think yes, they must be imported into the store by other + // operations, so dealt with in the Some(…) match arm + + // synthesize the build request. + let build_request = derivation_to_build_request(&drv, input_nodes)?; + + // create a build + let build_result = self + .build_service + .as_ref() + .do_build(build_request) + .await + .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?; + + // TODO: refscan? + + // For each output, insert a PathInfo. + for output in &build_result.outputs { + let root_node = output.node.as_ref().expect("invalid root node"); + + // calculate the nar representation + let (nar_size, nar_sha256) = + self.path_info_service.calculate_nar(root_node).await?; + + // assemble the PathInfo to persist + let path_info = PathInfo { + node: Some(tvix_castore::proto::Node { + node: Some(root_node.clone()), + }), + references: vec![], // TODO: refscan + narinfo: Some(tvix_store::proto::NarInfo { + nar_size, + nar_sha256: Bytes::from(nar_sha256.to_vec()), + signatures: vec![], + reference_names: vec![], // TODO: refscan + deriver: Some(tvix_store::proto::StorePath { + name: drv_path + .name() + .strip_suffix(".drv") + .expect("missing .drv suffix") + .to_string(), + digest: drv_path.digest().to_vec().into(), + }), + ca: drv.fod_digest().map( + |fod_digest| -> tvix_store::proto::nar_info::Ca { + (&CAHash::Nar(nix_compat::nixhash::NixHash::Sha256(fod_digest))) + .into() + }, + ), + }), + }; + + self.path_info_service + .put(path_info) + .await + .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?; + } + + // find the output for the store path requested + build_result + .outputs + .into_iter() + .find(|output_node| { + output_node.node.as_ref().expect("invalid node").get_name() + == store_path.to_string().as_bytes() + }) + .expect("build didn't produce the store path") + .node + .expect("invalid node") } }; - // with the root_node and sub_path, descend to the node requested. + // now with the root_node and sub_path, descend to the node requested. directoryservice::descend_to(&self.directory_service, root_node, sub_path) .await .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e)) |