diff options
Diffstat (limited to 'tvix/glue/src/tvix_store_io.rs')
-rw-r--r-- | tvix/glue/src/tvix_store_io.rs | 105 |
1 files changed, 66 insertions, 39 deletions
diff --git a/tvix/glue/src/tvix_store_io.rs b/tvix/glue/src/tvix_store_io.rs index 7b8ef3ff0a..4e5488067f 100644 --- a/tvix/glue/src/tvix_store_io.rs +++ b/tvix/glue/src/tvix_store_io.rs @@ -1,11 +1,9 @@ //! This module provides an implementation of EvalIO talking to tvix-store. - use bytes::Bytes; use futures::{StreamExt, TryStreamExt}; use nix_compat::nixhash::NixHash; use nix_compat::store_path::StorePathRef; use nix_compat::{nixhash::CAHash, store_path::StorePath}; -use sha2::{Digest, Sha256}; use std::{ cell::RefCell, collections::BTreeSet, @@ -14,12 +12,12 @@ use std::{ sync::Arc, }; use tokio_util::io::SyncIoBridge; -use tracing::{error, info, instrument, warn, Level}; +use tracing::{error, instrument, warn, Level, Span}; +use tracing_indicatif::span_ext::IndicatifSpanExt; use tvix_build::buildservice::BuildService; use tvix_castore::proto::node::Node; use tvix_eval::{EvalIO, FileType, StdIO}; use tvix_store::nar::NarCalculationService; -use tvix_store::utils::AsyncIoBridge; use tvix_castore::{ blobservice::BlobService, @@ -106,7 +104,7 @@ impl TvixStoreIO { /// /// In case there is no PathInfo yet, this means we need to build it /// (which currently is stubbed out still). - #[instrument(skip(self, store_path), fields(store_path=%store_path), ret(level = Level::TRACE), err)] + #[instrument(skip(self, store_path), fields(store_path=%store_path, indicatif.pb_show=1), ret(level = Level::TRACE), err)] async fn store_path_to_node( &self, store_path: &StorePath, @@ -138,7 +136,8 @@ impl TvixStoreIO { // The store path doesn't exist yet, so we need to fetch or build it. // We check for fetches first, as we might have both native // fetchers and FODs in KnownPaths, and prefer the former. - + // This will also find [Fetch] synthesized from + // `builtin:fetchurl` Derivations. let maybe_fetch = self .known_paths .borrow() @@ -146,7 +145,6 @@ impl TvixStoreIO { match maybe_fetch { Some((name, fetch)) => { - info!(?fetch, "triggering lazy fetch"); let (sp, root_node) = self .fetcher .ingest_and_persist(&name, fetch) @@ -156,9 +154,9 @@ impl TvixStoreIO { })?; debug_assert_eq!( - sp.to_string(), - store_path.to_string(), - "store path returned from fetcher should match" + sp.to_absolute_path(), + store_path.as_ref().to_absolute_path(), + "store path returned from fetcher must match store path we have in fetchers" ); root_node @@ -179,14 +177,16 @@ impl TvixStoreIO { } } }; - - warn!("triggering build"); + let span = Span::current(); + span.pb_start(); + span.pb_set_style(&tvix_tracing::PB_SPINNER_STYLE); + span.pb_set_message(&format!("⏳Waiting for inputs {}", &store_path)); // derivation_to_build_request needs castore nodes for all inputs. // Provide them, which means, here is where we recursively build // all dependencies. #[allow(clippy::mutable_key_type)] - let input_nodes: BTreeSet<Node> = + let mut input_nodes: BTreeSet<Node> = futures::stream::iter(drv.input_derivations.iter()) .map(|(input_drv_path, output_names)| { // look up the derivation object @@ -236,9 +236,34 @@ impl TvixStoreIO { ) }) .flatten() - .buffer_unordered(10) // TODO: make configurable + .buffer_unordered( + 1, /* TODO: increase again once we prevent redundant fetches */ + ) // TODO: make configurable + .try_collect() + .await?; + + // add input sources + // FUTUREWORK: merge these who things together + #[allow(clippy::mutable_key_type)] + let input_nodes_input_sources: BTreeSet<Node> = + futures::stream::iter(drv.input_sources.iter()) + .then(|input_source| { + Box::pin(async { + let node = self + .store_path_to_node(input_source, Path::new("")) + .await?; + if let Some(node) = node { + Ok(node) + } else { + Err(io::Error::other("no node produced")) + } + }) + }) .try_collect() .await?; + input_nodes.extend(input_nodes_input_sources); + + span.pb_set_message(&format!("🔨Building {}", &store_path)); // TODO: check if input sources are sufficiently dealth with, // I think yes, they must be imported into the store by other @@ -255,7 +280,7 @@ impl TvixStoreIO { .await .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?; - // TODO: refscan? + // TODO: refscan // For each output, insert a PathInfo. for output in &build_result.outputs { @@ -332,7 +357,7 @@ impl TvixStoreIO { &self, name: &str, path: &Path, - ca: CAHash, + ca: &CAHash, root_node: Node, ) -> io::Result<(PathInfo, NixHash, StorePath)> { // Ask the PathInfoService for the NAR size and sha256 @@ -347,7 +372,7 @@ impl TvixStoreIO { // Calculate the output path. This might still fail, as some names are illegal. let output_path = - nix_compat::store_path::build_ca_path(name, &ca, Vec::<String>::new(), false).map_err( + nix_compat::store_path::build_ca_path(name, ca, Vec::<String>::new(), false).map_err( |_| { std::io::Error::new( std::io::ErrorKind::InvalidData, @@ -374,7 +399,7 @@ impl TvixStoreIO { &self, name: &str, path: &Path, - ca: CAHash, + ca: &CAHash, root_node: Node, ) -> io::Result<StorePath> { let (path_info, _, output_path) = self.node_to_path_info(name, path, ca, root_node).await?; @@ -383,26 +408,6 @@ impl TvixStoreIO { Ok(output_path) } - /// Transforms a BLAKE-3 digest into a SHA256 digest - /// by re-hashing the whole file. - pub(crate) async fn blob_to_sha256_hash(&self, blob_digest: B3Digest) -> io::Result<[u8; 32]> { - let mut reader = self - .blob_service - .open_read(&blob_digest) - .await? - .ok_or_else(|| { - io::Error::new( - io::ErrorKind::NotFound, - format!("blob represented by digest: '{}' not found", blob_digest), - ) - })?; - // It is fine to use `AsyncIoBridge` here because hashing is not actually I/O. - let mut hasher = AsyncIoBridge(Sha256::new()); - - tokio::io::copy(&mut reader, &mut hasher).await?; - Ok(hasher.0.finalize().into()) - } - pub async fn store_path_exists<'a>(&'a self, store_path: StorePathRef<'a>) -> io::Result<bool> { Ok(self .path_info_service @@ -421,7 +426,7 @@ impl EvalIO for TvixStoreIO { { if self .tokio_handle - .block_on(async { self.store_path_to_node(&store_path, &sub_path).await })? + .block_on(self.store_path_to_node(&store_path, &sub_path))? .is_some() { Ok(true) @@ -505,6 +510,28 @@ impl EvalIO for TvixStoreIO { } #[instrument(skip(self), ret(level = Level::TRACE), err)] + fn file_type(&self, path: &Path) -> io::Result<FileType> { + if let Ok((store_path, sub_path)) = + StorePath::from_absolute_path_full(&path.to_string_lossy()) + { + if let Some(node) = self + .tokio_handle + .block_on(async { self.store_path_to_node(&store_path, &sub_path).await })? + { + match node { + Node::Directory(_) => Ok(FileType::Directory), + Node::File(_) => Ok(FileType::Regular), + Node::Symlink(_) => Ok(FileType::Symlink), + } + } else { + self.std_io.file_type(path) + } + } else { + self.std_io.file_type(path) + } + } + + #[instrument(skip(self), ret(level = Level::TRACE), err)] fn read_dir(&self, path: &Path) -> io::Result<Vec<(bytes::Bytes, FileType)>> { if let Ok((store_path, sub_path)) = StorePath::from_absolute_path_full(&path.to_string_lossy()) |