diff options
Diffstat (limited to 'tvix/glue/src/tvix_store_io.rs')
-rw-r--r-- | tvix/glue/src/tvix_store_io.rs | 519 |
1 files changed, 236 insertions, 283 deletions
diff --git a/tvix/glue/src/tvix_store_io.rs b/tvix/glue/src/tvix_store_io.rs index 10a5902785..7b8ef3ff0a 100644 --- a/tvix/glue/src/tvix_store_io.rs +++ b/tvix/glue/src/tvix_store_io.rs @@ -1,15 +1,11 @@ //! This module provides an implementation of EvalIO talking to tvix-store. -use async_recursion::async_recursion; use bytes::Bytes; -use futures::Stream; use futures::{StreamExt, TryStreamExt}; use nix_compat::nixhash::NixHash; -use nix_compat::store_path::{build_ca_path, StorePathRef}; +use nix_compat::store_path::StorePathRef; use nix_compat::{nixhash::CAHash, store_path::StorePath}; use sha2::{Digest, Sha256}; -use std::marker::Unpin; -use std::rc::Rc; use std::{ cell::RefCell, collections::BTreeSet, @@ -18,21 +14,22 @@ use std::{ sync::Arc, }; use tokio_util::io::SyncIoBridge; -use tracing::{error, instrument, warn, Level}; +use tracing::{error, info, instrument, warn, Level}; use tvix_build::buildservice::BuildService; -use tvix_eval::{ErrorKind, EvalIO, FileType, StdIO}; +use tvix_castore::proto::node::Node; +use tvix_eval::{EvalIO, FileType, StdIO}; +use tvix_store::nar::NarCalculationService; use tvix_store::utils::AsyncIoBridge; -use walkdir::DirEntry; use tvix_castore::{ blobservice::BlobService, directoryservice::{self, DirectoryService}, - proto::{node::Node, FileNode, NamedNode}, + proto::NamedNode, B3Digest, }; use tvix_store::{pathinfoservice::PathInfoService, proto::PathInfo}; -use crate::builtins::FetcherError; +use crate::fetchers::Fetcher; use crate::known_paths::KnownPaths; use crate::tvix_build::derivation_to_build_request; @@ -52,15 +49,26 @@ use crate::tvix_build::derivation_to_build_request; /// implementation of "Tvix Store IO" which does not necessarily bring the concept of blob service, /// directory service or path info service. pub struct TvixStoreIO { - blob_service: Arc<dyn BlobService>, - directory_service: Arc<dyn DirectoryService>, - // This is public so builtins can put PathInfos directly. + // This is public so helper functions can interact with the stores directly. + pub(crate) blob_service: Arc<dyn BlobService>, + pub(crate) directory_service: Arc<dyn DirectoryService>, pub(crate) path_info_service: Arc<dyn PathInfoService>, + pub(crate) nar_calculation_service: Arc<dyn NarCalculationService>, + std_io: StdIO, #[allow(dead_code)] build_service: Arc<dyn BuildService>, pub(crate) tokio_handle: tokio::runtime::Handle, - http_client: reqwest::Client, + + #[allow(clippy::type_complexity)] + pub(crate) fetcher: Fetcher< + Arc<dyn BlobService>, + Arc<dyn DirectoryService>, + Arc<dyn PathInfoService>, + Arc<dyn NarCalculationService>, + >, + + // Paths known how to produce, by building or fetching. pub(crate) known_paths: RefCell<KnownPaths>, } @@ -69,17 +77,24 @@ impl TvixStoreIO { blob_service: Arc<dyn BlobService>, directory_service: Arc<dyn DirectoryService>, path_info_service: Arc<dyn PathInfoService>, + nar_calculation_service: Arc<dyn NarCalculationService>, build_service: Arc<dyn BuildService>, tokio_handle: tokio::runtime::Handle, ) -> Self { Self { - blob_service, - directory_service, - path_info_service, + blob_service: blob_service.clone(), + directory_service: directory_service.clone(), + path_info_service: path_info_service.clone(), + nar_calculation_service: nar_calculation_service.clone(), std_io: StdIO {}, build_service, tokio_handle, - http_client: reqwest::Client::new(), + fetcher: Fetcher::new( + blob_service, + directory_service, + path_info_service, + nar_calculation_service, + ), known_paths: Default::default(), } } @@ -91,7 +106,6 @@ impl TvixStoreIO { /// /// In case there is no PathInfo yet, this means we need to build it /// (which currently is stubbed out still). - #[async_recursion(?Send)] #[instrument(skip(self, store_path), fields(store_path=%store_path), ret(level = Level::TRACE), err)] async fn store_path_to_node( &self, @@ -121,189 +135,212 @@ impl TvixStoreIO { // it for things like <nixpkgs> pointing to a store path. // In the future, these things will (need to) have PathInfo. None => { - // The store path doesn't exist yet, so we need to build it. - warn!("triggering build"); - - // Look up the derivation for this output path. - let (drv_path, drv) = { - let known_paths = self.known_paths.borrow(); - match known_paths.get_drv_path_for_output_path(store_path) { - Some(drv_path) => ( - drv_path.to_owned(), - known_paths.get_drv_by_drvpath(drv_path).unwrap().to_owned(), - ), - None => { - warn!(store_path=%store_path, "no drv found"); - // let StdIO take over - return Ok(None); - } + // The store path doesn't exist yet, so we need to fetch or build it. + // We check for fetches first, as we might have both native + // fetchers and FODs in KnownPaths, and prefer the former. + + let maybe_fetch = self + .known_paths + .borrow() + .get_fetch_for_output_path(store_path); + + match maybe_fetch { + Some((name, fetch)) => { + info!(?fetch, "triggering lazy fetch"); + let (sp, root_node) = self + .fetcher + .ingest_and_persist(&name, fetch) + .await + .map_err(|e| { + std::io::Error::new(std::io::ErrorKind::InvalidData, e) + })?; + + debug_assert_eq!( + sp.to_string(), + store_path.to_string(), + "store path returned from fetcher should match" + ); + + root_node } - }; - - // derivation_to_build_request needs castore nodes for all inputs. - // Provide them, which means, here is where we recursively build - // all dependencies. - #[allow(clippy::mutable_key_type)] - let input_nodes: BTreeSet<Node> = - futures::stream::iter(drv.input_derivations.iter()) - .map(|(input_drv_path, output_names)| { - // look up the derivation object - let input_drv = { - let known_paths = self.known_paths.borrow(); - known_paths - .get_drv_by_drvpath(input_drv_path) - .unwrap_or_else(|| panic!("{} not found", input_drv_path)) - .to_owned() + None => { + // Look up the derivation for this output path. + let (drv_path, drv) = { + let known_paths = self.known_paths.borrow(); + match known_paths.get_drv_path_for_output_path(store_path) { + Some(drv_path) => ( + drv_path.to_owned(), + known_paths.get_drv_by_drvpath(drv_path).unwrap().to_owned(), + ), + None => { + warn!(store_path=%store_path, "no drv found"); + // let StdIO take over + return Ok(None); + } + } + }; + + warn!("triggering build"); + + // derivation_to_build_request needs castore nodes for all inputs. + // Provide them, which means, here is where we recursively build + // all dependencies. + #[allow(clippy::mutable_key_type)] + let input_nodes: BTreeSet<Node> = + futures::stream::iter(drv.input_derivations.iter()) + .map(|(input_drv_path, output_names)| { + // look up the derivation object + let input_drv = { + let known_paths = self.known_paths.borrow(); + known_paths + .get_drv_by_drvpath(input_drv_path) + .unwrap_or_else(|| { + panic!("{} not found", input_drv_path) + }) + .to_owned() + }; + + // convert output names to actual paths + let output_paths: Vec<StorePath> = output_names + .iter() + .map(|output_name| { + input_drv + .outputs + .get(output_name) + .expect("missing output_name") + .path + .as_ref() + .expect("missing output path") + .clone() + }) + .collect(); + // For each output, ask for the castore node. + // We're in a per-derivation context, so if they're + // not built yet they'll all get built together. + // If they don't need to build, we can however still + // substitute all in parallel (if they don't need to + // be built) - so we turn this into a stream of streams. + // It's up to the builder to deduplicate same build requests. + futures::stream::iter(output_paths.into_iter()).map( + |output_path| async move { + let node = self + .store_path_to_node(&output_path, Path::new("")) + .await?; + + if let Some(node) = node { + Ok(node) + } else { + Err(io::Error::other("no node produced")) + } + }, + ) + }) + .flatten() + .buffer_unordered(10) // TODO: make configurable + .try_collect() + .await?; + + // TODO: check if input sources are sufficiently dealth with, + // I think yes, they must be imported into the store by other + // operations, so dealt with in the Some(…) match arm + + // synthesize the build request. + let build_request = derivation_to_build_request(&drv, input_nodes)?; + + // create a build + let build_result = self + .build_service + .as_ref() + .do_build(build_request) + .await + .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?; + + // TODO: refscan? + + // For each output, insert a PathInfo. + for output in &build_result.outputs { + let root_node = output.node.as_ref().expect("invalid root node"); + + // calculate the nar representation + let (nar_size, nar_sha256) = self + .nar_calculation_service + .calculate_nar(root_node) + .await?; + + // assemble the PathInfo to persist + let path_info = PathInfo { + node: Some(tvix_castore::proto::Node { + node: Some(root_node.clone()), + }), + references: vec![], // TODO: refscan + narinfo: Some(tvix_store::proto::NarInfo { + nar_size, + nar_sha256: Bytes::from(nar_sha256.to_vec()), + signatures: vec![], + reference_names: vec![], // TODO: refscan + deriver: Some(tvix_store::proto::StorePath { + name: drv_path + .name() + .strip_suffix(".drv") + .expect("missing .drv suffix") + .to_string(), + digest: drv_path.digest().to_vec().into(), + }), + ca: drv.fod_digest().map( + |fod_digest| -> tvix_store::proto::nar_info::Ca { + (&CAHash::Nar(nix_compat::nixhash::NixHash::Sha256( + fod_digest, + ))) + .into() + }, + ), + }), }; - // convert output names to actual paths - let output_paths: Vec<StorePath> = output_names - .iter() - .map(|output_name| { - input_drv - .outputs - .get(output_name) - .expect("missing output_name") - .path - .as_ref() - .expect("missing output path") - .clone() - }) - .collect(); - // For each output, ask for the castore node. - // We're in a per-derivation context, so if they're - // not built yet they'll all get built together. - // If they don't need to build, we can however still - // substitute all in parallel (if they don't need to - // be built) - so we turn this into a stream of streams. - // It's up to the builder to deduplicate same build requests. - futures::stream::iter(output_paths.into_iter()).map( - |output_path| async move { - let node = self - .store_path_to_node(&output_path, Path::new("")) - .await?; - - if let Some(node) = node { - Ok(node) - } else { - Err(io::Error::other("no node produced")) - } - }, - ) - }) - .flatten() - .buffer_unordered(10) // TODO: make configurable - .try_collect() - .await?; - - // TODO: check if input sources are sufficiently dealth with, - // I think yes, they must be imported into the store by other - // operations, so dealt with in the Some(…) match arm - - // synthesize the build request. - let build_request = derivation_to_build_request(&drv, input_nodes)?; - - // create a build - let build_result = self - .build_service - .as_ref() - .do_build(build_request) - .await - .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?; - - // TODO: refscan? - - // For each output, insert a PathInfo. - for output in &build_result.outputs { - let root_node = output.node.as_ref().expect("invalid root node"); - - // calculate the nar representation - let (nar_size, nar_sha256) = - self.path_info_service.calculate_nar(root_node).await?; - - // assemble the PathInfo to persist - let path_info = PathInfo { - node: Some(tvix_castore::proto::Node { - node: Some(root_node.clone()), - }), - references: vec![], // TODO: refscan - narinfo: Some(tvix_store::proto::NarInfo { - nar_size, - nar_sha256: Bytes::from(nar_sha256.to_vec()), - signatures: vec![], - reference_names: vec![], // TODO: refscan - deriver: Some(tvix_store::proto::StorePath { - name: drv_path - .name() - .strip_suffix(".drv") - .expect("missing .drv suffix") - .to_string(), - digest: drv_path.digest().to_vec().into(), - }), - ca: drv.fod_digest().map( - |fod_digest| -> tvix_store::proto::nar_info::Ca { - (&CAHash::Nar(nix_compat::nixhash::NixHash::Sha256(fod_digest))) - .into() - }, - ), - }), - }; - - self.path_info_service - .put(path_info) - .await - .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?; - } + self.path_info_service + .put(path_info) + .await + .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?; + } - // find the output for the store path requested - build_result - .outputs - .into_iter() - .find(|output_node| { - output_node.node.as_ref().expect("invalid node").get_name() - == store_path.to_string().as_bytes() - }) - .expect("build didn't produce the store path") - .node - .expect("invalid node") + // find the output for the store path requested + build_result + .outputs + .into_iter() + .find(|output_node| { + output_node.node.as_ref().expect("invalid node").get_name() + == store_path.to_string().as_bytes() + }) + .expect("build didn't produce the store path") + .node + .expect("invalid node") + } + } } }; // now with the root_node and sub_path, descend to the node requested. + // We convert sub_path to the castore model here. + let sub_path = tvix_castore::PathBuf::from_host_path(sub_path, true)?; + directoryservice::descend_to(&self.directory_service, root_node, sub_path) .await .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e)) } - /// This forwards the ingestion to the [`tvix_castore::import::ingest_entries`], - /// passing the blob_service and directory_service that's used. - /// The error is mapped to std::io::Error for simplicity. - pub(crate) async fn ingest_entries<S>(&self, entries_stream: S) -> io::Result<Node> - where - S: Stream<Item = DirEntry> + Unpin, - { - tvix_castore::import::ingest_entries( - &self.blob_service, - &self.directory_service, - entries_stream, - ) - .await - .map_err(|err| std::io::Error::new(io::ErrorKind::Other, err)) - } - pub(crate) async fn node_to_path_info( &self, name: &str, path: &Path, ca: CAHash, root_node: Node, - ) -> io::Result<(PathInfo, StorePath)> { + ) -> io::Result<(PathInfo, NixHash, StorePath)> { // Ask the PathInfoService for the NAR size and sha256 // We always need it no matter what is the actual hash mode // because the path info construct a narinfo which *always* // require a SHA256 of the NAR representation and the NAR size. let (nar_size, nar_sha256) = self - .path_info_service + .nar_calculation_service .as_ref() .calculate_nar(&root_node) .await?; @@ -326,7 +363,11 @@ impl TvixStoreIO { let path_info = tvix_store::import::derive_nar_ca_path_info(nar_size, nar_sha256, Some(ca), root_node); - Ok((path_info, output_path.to_owned())) + Ok(( + path_info, + NixHash::Sha256(nar_sha256), + output_path.to_owned(), + )) } pub(crate) async fn register_node_in_path_info_service( @@ -336,7 +377,7 @@ impl TvixStoreIO { ca: CAHash, root_node: Node, ) -> io::Result<StorePath> { - let (path_info, output_path) = self.node_to_path_info(name, path, ca, root_node).await?; + let (path_info, _, output_path) = self.node_to_path_info(name, path, ca, root_node).await?; let _path_info = self.path_info_service.as_ref().put(path_info).await?; Ok(output_path) @@ -370,88 +411,6 @@ impl TvixStoreIO { .await? .is_some()) } - - pub async fn fetch_url( - &self, - url: &str, - name: &str, - hash: Option<&NixHash>, - ) -> Result<StorePath, ErrorKind> { - let resp = self - .http_client - .get(url) - .send() - .await - .map_err(FetcherError::from)?; - let mut sha = Sha256::new(); - let mut data = tokio_util::io::StreamReader::new( - resp.bytes_stream() - .inspect_ok(|data| { - sha.update(data); - }) - .map_err(|e| { - let e = e.without_url(); - warn!(%e, "failed to get response body"); - io::Error::new(io::ErrorKind::BrokenPipe, e.to_string()) - }), - ); - - let mut blob = self.blob_service.open_write().await; - let size = tokio::io::copy(&mut data, blob.as_mut()).await?; - let blob_digest = blob.close().await?; - let got = NixHash::Sha256(sha.finalize().into()); - - let hash = CAHash::Flat(if let Some(wanted) = hash { - if *wanted != got { - return Err(FetcherError::HashMismatch { - url: url.to_owned(), - wanted: wanted.clone(), - got, - } - .into()); - } - wanted.clone() - } else { - got - }); - - let path = build_ca_path(name, &hash, Vec::<String>::new(), false) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; - let node = Node::File(FileNode { - name: path.to_string().into(), - digest: blob_digest.into(), - size, - executable: false, - }); - - let (nar_size, nar_sha256) = self - .path_info_service - .calculate_nar(&node) - .await - .map_err(|e| ErrorKind::TvixError(Rc::new(e)))?; - - let path_info = PathInfo { - node: Some(tvix_castore::proto::Node { - node: Some(node.clone()), - }), - references: vec![], - narinfo: Some(tvix_store::proto::NarInfo { - nar_size, - nar_sha256: nar_sha256.to_vec().into(), - signatures: vec![], - reference_names: vec![], - deriver: None, /* ? */ - ca: Some((&hash).into()), - }), - }; - - self.path_info_service - .put(path_info) - .await - .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?; - - Ok(path.to_owned()) - } } impl EvalIO for TvixStoreIO { @@ -486,13 +445,13 @@ impl EvalIO for TvixStoreIO { .tokio_handle .block_on(async { self.store_path_to_node(&store_path, &sub_path).await })? { - // depending on the node type, treat read_to_string differently + // depending on the node type, treat open differently match node { Node::Directory(_) => { // This would normally be a io::ErrorKind::IsADirectory (still unstable) Err(io::Error::new( io::ErrorKind::Unsupported, - format!("tried to read directory at {:?} to string", path), + format!("tried to open directory at {:?}", path), )) } Node::File(file_node) => { @@ -531,7 +490,7 @@ impl EvalIO for TvixStoreIO { } Node::Symlink(_symlink_node) => Err(io::Error::new( io::ErrorKind::Unsupported, - "read_to_string for symlinks is unsupported", + "open for symlinks is unsupported", ))?, } } else { @@ -622,6 +581,7 @@ impl EvalIO for TvixStoreIO { &self.blob_service, &self.directory_service, &self.path_info_service, + &self.nar_calculation_service, ) .await })?; @@ -642,12 +602,8 @@ mod tests { use bstr::ByteSlice; use tempfile::TempDir; use tvix_build::buildservice::DummyBuildService; - use tvix_castore::{ - blobservice::{BlobService, MemoryBlobService}, - directoryservice::{DirectoryService, MemoryDirectoryService}, - }; use tvix_eval::{EvalIO, EvaluationResult}; - use tvix_store::pathinfoservice::MemoryPathInfoService; + use tvix_store::utils::construct_services; use super::TvixStoreIO; use crate::builtins::{add_derivation_builtins, add_fetcher_builtins, add_import_builtins}; @@ -656,22 +612,19 @@ mod tests { /// Takes care of setting up the evaluator so it knows about the // `derivation` builtin. fn eval(str: &str) -> EvaluationResult { - let blob_service = Arc::new(MemoryBlobService::default()) as Arc<dyn BlobService>; - let directory_service = - Arc::new(MemoryDirectoryService::default()) as Arc<dyn DirectoryService>; - let path_info_service = Arc::new(MemoryPathInfoService::new( - blob_service.clone(), - directory_service.clone(), - )); - - let runtime = tokio::runtime::Runtime::new().unwrap(); + let tokio_runtime = tokio::runtime::Runtime::new().unwrap(); + let (blob_service, directory_service, path_info_service, nar_calculation_service) = + tokio_runtime + .block_on(async { construct_services("memory://", "memory://", "memory://").await }) + .unwrap(); let io = Rc::new(TvixStoreIO::new( - blob_service.clone(), - directory_service.clone(), - path_info_service, + blob_service, + directory_service, + path_info_service.into(), + nar_calculation_service.into(), Arc::<DummyBuildService>::default(), - runtime.handle().clone(), + tokio_runtime.handle().clone(), )); let mut eval = tvix_eval::Evaluation::new(io.clone() as Rc<dyn EvalIO>, true); |