diff options
Diffstat (limited to 'tvix/glue/src/tvix_store_io.rs')
-rw-r--r-- | tvix/glue/src/tvix_store_io.rs | 773 |
1 files changed, 348 insertions, 425 deletions
diff --git a/tvix/glue/src/tvix_store_io.rs b/tvix/glue/src/tvix_store_io.rs index 10a59027852f..67a88e13c54b 100644 --- a/tvix/glue/src/tvix_store_io.rs +++ b/tvix/glue/src/tvix_store_io.rs @@ -1,38 +1,28 @@ //! This module provides an implementation of EvalIO talking to tvix-store. - -use async_recursion::async_recursion; -use bytes::Bytes; -use futures::Stream; use futures::{StreamExt, TryStreamExt}; -use nix_compat::nixhash::NixHash; -use nix_compat::store_path::{build_ca_path, StorePathRef}; use nix_compat::{nixhash::CAHash, store_path::StorePath}; -use sha2::{Digest, Sha256}; -use std::marker::Unpin; -use std::rc::Rc; +use std::collections::BTreeMap; use std::{ cell::RefCell, - collections::BTreeSet, io, path::{Path, PathBuf}, sync::Arc, }; use tokio_util::io::SyncIoBridge; -use tracing::{error, instrument, warn, Level}; +use tracing::{error, instrument, warn, Level, Span}; +use tracing_indicatif::span_ext::IndicatifSpanExt; use tvix_build::buildservice::BuildService; -use tvix_eval::{ErrorKind, EvalIO, FileType, StdIO}; -use tvix_store::utils::AsyncIoBridge; -use walkdir::DirEntry; +use tvix_eval::{EvalIO, FileType, StdIO}; +use tvix_store::nar::NarCalculationService; use tvix_castore::{ blobservice::BlobService, directoryservice::{self, DirectoryService}, - proto::{node::Node, FileNode, NamedNode}, - B3Digest, + Node, }; -use tvix_store::{pathinfoservice::PathInfoService, proto::PathInfo}; +use tvix_store::pathinfoservice::{PathInfo, PathInfoService}; -use crate::builtins::FetcherError; +use crate::fetchers::Fetcher; use crate::known_paths::KnownPaths; use crate::tvix_build::derivation_to_build_request; @@ -52,16 +42,27 @@ use crate::tvix_build::derivation_to_build_request; /// implementation of "Tvix Store IO" which does not necessarily bring the concept of blob service, /// directory service or path info service. pub struct TvixStoreIO { - blob_service: Arc<dyn BlobService>, - directory_service: Arc<dyn DirectoryService>, - // This is public so builtins can put PathInfos directly. + // This is public so helper functions can interact with the stores directly. + pub(crate) blob_service: Arc<dyn BlobService>, + pub(crate) directory_service: Arc<dyn DirectoryService>, pub(crate) path_info_service: Arc<dyn PathInfoService>, + pub(crate) nar_calculation_service: Arc<dyn NarCalculationService>, + std_io: StdIO, #[allow(dead_code)] build_service: Arc<dyn BuildService>, pub(crate) tokio_handle: tokio::runtime::Handle, - http_client: reqwest::Client, - pub(crate) known_paths: RefCell<KnownPaths>, + + #[allow(clippy::type_complexity)] + pub(crate) fetcher: Fetcher< + Arc<dyn BlobService>, + Arc<dyn DirectoryService>, + Arc<dyn PathInfoService>, + Arc<dyn NarCalculationService>, + >, + + // Paths known how to produce, by building or fetching. + pub known_paths: RefCell<KnownPaths>, } impl TvixStoreIO { @@ -69,17 +70,24 @@ impl TvixStoreIO { blob_service: Arc<dyn BlobService>, directory_service: Arc<dyn DirectoryService>, path_info_service: Arc<dyn PathInfoService>, + nar_calculation_service: Arc<dyn NarCalculationService>, build_service: Arc<dyn BuildService>, tokio_handle: tokio::runtime::Handle, ) -> Self { Self { - blob_service, - directory_service, - path_info_service, + blob_service: blob_service.clone(), + directory_service: directory_service.clone(), + path_info_service: path_info_service.clone(), + nar_calculation_service: nar_calculation_service.clone(), std_io: StdIO {}, build_service, tokio_handle, - http_client: reqwest::Client::new(), + fetcher: Fetcher::new( + blob_service, + directory_service, + path_info_service, + nar_calculation_service, + ), known_paths: Default::default(), } } @@ -91,11 +99,10 @@ impl TvixStoreIO { /// /// In case there is no PathInfo yet, this means we need to build it /// (which currently is stubbed out still). - #[async_recursion(?Send)] - #[instrument(skip(self, store_path), fields(store_path=%store_path), ret(level = Level::TRACE), err)] + #[instrument(skip(self, store_path), fields(store_path=%store_path, indicatif.pb_show=1), ret(level = Level::TRACE), err(level = Level::TRACE))] async fn store_path_to_node( &self, - store_path: &StorePath, + store_path: &StorePath<String>, sub_path: &Path, ) -> io::Result<Option<Node>> { // Find the root node for the store_path. @@ -109,8 +116,8 @@ impl TvixStoreIO { .get(*store_path.digest()) .await? { - // if we have a PathInfo, we know there will be a root_node (due to validation) - Some(path_info) => path_info.node.expect("no node").node.expect("no node"), + // TODO: use stricter typed BuildRequest here. + Some(path_info) => path_info.node, // If there's no PathInfo found, this normally means we have to // trigger the build (and insert into PathInfoService, after // reference scanning). @@ -121,348 +128,270 @@ impl TvixStoreIO { // it for things like <nixpkgs> pointing to a store path. // In the future, these things will (need to) have PathInfo. None => { - // The store path doesn't exist yet, so we need to build it. - warn!("triggering build"); - - // Look up the derivation for this output path. - let (drv_path, drv) = { - let known_paths = self.known_paths.borrow(); - match known_paths.get_drv_path_for_output_path(store_path) { - Some(drv_path) => ( - drv_path.to_owned(), - known_paths.get_drv_by_drvpath(drv_path).unwrap().to_owned(), - ), - None => { - warn!(store_path=%store_path, "no drv found"); - // let StdIO take over - return Ok(None); - } + // The store path doesn't exist yet, so we need to fetch or build it. + // We check for fetches first, as we might have both native + // fetchers and FODs in KnownPaths, and prefer the former. + // This will also find [Fetch] synthesized from + // `builtin:fetchurl` Derivations. + let maybe_fetch = self + .known_paths + .borrow() + .get_fetch_for_output_path(store_path); + + match maybe_fetch { + Some((name, fetch)) => { + let (sp, root_node) = self + .fetcher + .ingest_and_persist(&name, fetch) + .await + .map_err(|e| { + std::io::Error::new(std::io::ErrorKind::InvalidData, e) + })?; + + debug_assert_eq!( + sp.to_absolute_path(), + store_path.as_ref().to_absolute_path(), + "store path returned from fetcher must match store path we have in fetchers" + ); + + root_node } - }; - - // derivation_to_build_request needs castore nodes for all inputs. - // Provide them, which means, here is where we recursively build - // all dependencies. - #[allow(clippy::mutable_key_type)] - let input_nodes: BTreeSet<Node> = - futures::stream::iter(drv.input_derivations.iter()) - .map(|(input_drv_path, output_names)| { - // look up the derivation object - let input_drv = { - let known_paths = self.known_paths.borrow(); - known_paths - .get_drv_by_drvpath(input_drv_path) - .unwrap_or_else(|| panic!("{} not found", input_drv_path)) - .to_owned() - }; - - // convert output names to actual paths - let output_paths: Vec<StorePath> = output_names + None => { + // Look up the derivation for this output path. + let (drv_path, drv) = { + let known_paths = self.known_paths.borrow(); + match known_paths.get_drv_path_for_output_path(store_path) { + Some(drv_path) => ( + drv_path.to_owned(), + known_paths.get_drv_by_drvpath(drv_path).unwrap().to_owned(), + ), + None => { + warn!(store_path=%store_path, "no drv found"); + // let StdIO take over + return Ok(None); + } + } + }; + let span = Span::current(); + span.pb_start(); + span.pb_set_style(&tvix_tracing::PB_SPINNER_STYLE); + span.pb_set_message(&format!("⏳Waiting for inputs {}", &store_path)); + + // derivation_to_build_request needs castore nodes for all inputs. + // Provide them, which means, here is where we recursively build + // all dependencies. + let mut inputs: BTreeMap<StorePath<String>, Node> = + futures::stream::iter(drv.input_derivations.iter()) + .map(|(input_drv_path, output_names)| { + // look up the derivation object + let input_drv = { + let known_paths = self.known_paths.borrow(); + known_paths + .get_drv_by_drvpath(input_drv_path) + .unwrap_or_else(|| { + panic!("{} not found", input_drv_path) + }) + .to_owned() + }; + + // convert output names to actual paths + let output_paths: Vec<StorePath<String>> = output_names + .iter() + .map(|output_name| { + input_drv + .outputs + .get(output_name) + .expect("missing output_name") + .path + .as_ref() + .expect("missing output path") + .clone() + }) + .collect(); + + // For each output, ask for the castore node. + // We're in a per-derivation context, so if they're + // not built yet they'll all get built together. + // If they don't need to build, we can however still + // substitute all in parallel (if they don't need to + // be built) - so we turn this into a stream of streams. + // It's up to the builder to deduplicate same build requests. + futures::stream::iter(output_paths.into_iter()).map( + |output_path| async move { + let node = self + .store_path_to_node(&output_path, Path::new("")) + .await?; + + if let Some(node) = node { + Ok((output_path, node)) + } else { + Err(io::Error::other("no node produced")) + } + }, + ) + }) + .flatten() + .buffer_unordered( + 1, /* TODO: increase again once we prevent redundant fetches */ + ) // TODO: make configurable + .try_collect() + .await?; + + // FUTUREWORK: merge these who things together + // add input sources + let input_sources: BTreeMap<_, _> = + futures::stream::iter(drv.input_sources.iter()) + .then(|input_source| { + Box::pin({ + let input_source = input_source.clone(); + async move { + let node = self + .store_path_to_node(&input_source, Path::new("")) + .await?; + if let Some(node) = node { + Ok((input_source, node)) + } else { + Err(io::Error::other("no node produced")) + } + } + }) + }) + .try_collect() + .await?; + + inputs.extend(input_sources); + + span.pb_set_message(&format!("🔨Building {}", &store_path)); + + // TODO: check if input sources are sufficiently dealth with, + // I think yes, they must be imported into the store by other + // operations, so dealt with in the Some(…) match arm + + // synthesize the build request. + let build_request = derivation_to_build_request(&drv, inputs)?; + + // create a build + let build_result = self + .build_service + .as_ref() + .do_build(build_request) + .await + .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?; + + // Maps from the index in refscan_needles to the full store path + // Used to map back to the actual store path from the found needles + // Importantly, this must match the order of the needles generated in derivation_to_build_request + let refscan_needles = + crate::tvix_build::get_refscan_needles(&drv).collect::<Vec<_>>(); + + // For each output, insert a PathInfo. + for ((output, output_needles), drv_output) in build_result + .outputs + .iter() + .zip(build_result.outputs_needles.iter()) + .zip(drv.outputs.iter()) + { + let output_node = output + .clone() + .try_into_anonymous_node() + .expect("invalid node"); + + let output_needles: Vec<_> = output_needles + .needles .iter() - .map(|output_name| { - input_drv - .outputs - .get(output_name) - .expect("missing output_name") - .path - .as_ref() - .expect("missing output path") - .clone() + // Map each output needle index back to the refscan_needle + .map(|idx| { + refscan_needles + .get(*idx as usize) + .ok_or(std::io::Error::new( + std::io::ErrorKind::Other, + "invalid build response", + )) }) - .collect(); - // For each output, ask for the castore node. - // We're in a per-derivation context, so if they're - // not built yet they'll all get built together. - // If they don't need to build, we can however still - // substitute all in parallel (if they don't need to - // be built) - so we turn this into a stream of streams. - // It's up to the builder to deduplicate same build requests. - futures::stream::iter(output_paths.into_iter()).map( - |output_path| async move { - let node = self - .store_path_to_node(&output_path, Path::new("")) - .await?; - - if let Some(node) = node { - Ok(node) - } else { - Err(io::Error::other("no node produced")) - } - }, - ) - }) - .flatten() - .buffer_unordered(10) // TODO: make configurable - .try_collect() - .await?; - - // TODO: check if input sources are sufficiently dealth with, - // I think yes, they must be imported into the store by other - // operations, so dealt with in the Some(…) match arm - - // synthesize the build request. - let build_request = derivation_to_build_request(&drv, input_nodes)?; - - // create a build - let build_result = self - .build_service - .as_ref() - .do_build(build_request) - .await - .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?; - - // TODO: refscan? - - // For each output, insert a PathInfo. - for output in &build_result.outputs { - let root_node = output.node.as_ref().expect("invalid root node"); - - // calculate the nar representation - let (nar_size, nar_sha256) = - self.path_info_service.calculate_nar(root_node).await?; - - // assemble the PathInfo to persist - let path_info = PathInfo { - node: Some(tvix_castore::proto::Node { - node: Some(root_node.clone()), - }), - references: vec![], // TODO: refscan - narinfo: Some(tvix_store::proto::NarInfo { - nar_size, - nar_sha256: Bytes::from(nar_sha256.to_vec()), - signatures: vec![], - reference_names: vec![], // TODO: refscan - deriver: Some(tvix_store::proto::StorePath { - name: drv_path - .name() - .strip_suffix(".drv") - .expect("missing .drv suffix") - .to_string(), - digest: drv_path.digest().to_vec().into(), - }), - ca: drv.fod_digest().map( - |fod_digest| -> tvix_store::proto::nar_info::Ca { - (&CAHash::Nar(nix_compat::nixhash::NixHash::Sha256(fod_digest))) - .into() - }, - ), - }), - }; - - self.path_info_service - .put(path_info) - .await - .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?; - } + .collect::<Result<_, std::io::Error>>()?; + + // calculate the nar representation + let (nar_size, nar_sha256) = self + .nar_calculation_service + .calculate_nar(&output_node) + .await?; + + // assemble the PathInfo to persist + let path_info = PathInfo { + store_path: drv_output + .1 + .path + .as_ref() + .ok_or(std::io::Error::new( + std::io::ErrorKind::Other, + "Tvix bug: missing output store path", + ))? + .to_owned(), + node: output_node, + references: output_needles + .iter() + .map(|s| (**s).to_owned()) + .collect(), + nar_size, + nar_sha256, + signatures: vec![], + deriver: Some( + StorePath::from_name_and_digest_fixed( + drv_path + .name() + .strip_suffix(".drv") + .expect("missing .drv suffix"), + *drv_path.digest(), + ) + .expect( + "Tvix bug: StorePath without .drv suffix must be valid", + ), + ), + ca: drv.fod_digest().map(|fod_digest| { + CAHash::Nar(nix_compat::nixhash::NixHash::Sha256(fod_digest)) + }), + }; + + self.path_info_service + .put(path_info) + .await + .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?; + } - // find the output for the store path requested - build_result - .outputs - .into_iter() - .find(|output_node| { - output_node.node.as_ref().expect("invalid node").get_name() - == store_path.to_string().as_bytes() - }) - .expect("build didn't produce the store path") - .node - .expect("invalid node") + // find the output for the store path requested + let s = store_path.to_string(); + + build_result + .outputs + .into_iter() + .map(|e| e.try_into_name_and_node().expect("invalid node")) + .find(|(output_name, _output_node)| { + output_name.as_ref() == s.as_bytes() + }) + .expect("build didn't produce the store path") + .1 + } + } } }; // now with the root_node and sub_path, descend to the node requested. + // We convert sub_path to the castore model here. + let sub_path = tvix_castore::PathBuf::from_host_path(sub_path, true)?; + directoryservice::descend_to(&self.directory_service, root_node, sub_path) .await .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e)) } - - /// This forwards the ingestion to the [`tvix_castore::import::ingest_entries`], - /// passing the blob_service and directory_service that's used. - /// The error is mapped to std::io::Error for simplicity. - pub(crate) async fn ingest_entries<S>(&self, entries_stream: S) -> io::Result<Node> - where - S: Stream<Item = DirEntry> + Unpin, - { - tvix_castore::import::ingest_entries( - &self.blob_service, - &self.directory_service, - entries_stream, - ) - .await - .map_err(|err| std::io::Error::new(io::ErrorKind::Other, err)) - } - - pub(crate) async fn node_to_path_info( - &self, - name: &str, - path: &Path, - ca: CAHash, - root_node: Node, - ) -> io::Result<(PathInfo, StorePath)> { - // Ask the PathInfoService for the NAR size and sha256 - // We always need it no matter what is the actual hash mode - // because the path info construct a narinfo which *always* - // require a SHA256 of the NAR representation and the NAR size. - let (nar_size, nar_sha256) = self - .path_info_service - .as_ref() - .calculate_nar(&root_node) - .await?; - - // Calculate the output path. This might still fail, as some names are illegal. - let output_path = - nix_compat::store_path::build_ca_path(name, &ca, Vec::<String>::new(), false).map_err( - |_| { - std::io::Error::new( - std::io::ErrorKind::InvalidData, - format!("invalid name: {}", name), - ) - }, - )?; - - // assemble a new root_node with a name that is derived from the nar hash. - let root_node = root_node.rename(output_path.to_string().into_bytes().into()); - tvix_store::import::log_node(&root_node, path); - - let path_info = - tvix_store::import::derive_nar_ca_path_info(nar_size, nar_sha256, Some(ca), root_node); - - Ok((path_info, output_path.to_owned())) - } - - pub(crate) async fn register_node_in_path_info_service( - &self, - name: &str, - path: &Path, - ca: CAHash, - root_node: Node, - ) -> io::Result<StorePath> { - let (path_info, output_path) = self.node_to_path_info(name, path, ca, root_node).await?; - let _path_info = self.path_info_service.as_ref().put(path_info).await?; - - Ok(output_path) - } - - /// Transforms a BLAKE-3 digest into a SHA256 digest - /// by re-hashing the whole file. - pub(crate) async fn blob_to_sha256_hash(&self, blob_digest: B3Digest) -> io::Result<[u8; 32]> { - let mut reader = self - .blob_service - .open_read(&blob_digest) - .await? - .ok_or_else(|| { - io::Error::new( - io::ErrorKind::NotFound, - format!("blob represented by digest: '{}' not found", blob_digest), - ) - })?; - // It is fine to use `AsyncIoBridge` here because hashing is not actually I/O. - let mut hasher = AsyncIoBridge(Sha256::new()); - - tokio::io::copy(&mut reader, &mut hasher).await?; - Ok(hasher.0.finalize().into()) - } - - pub async fn store_path_exists<'a>(&'a self, store_path: StorePathRef<'a>) -> io::Result<bool> { - Ok(self - .path_info_service - .as_ref() - .get(*store_path.digest()) - .await? - .is_some()) - } - - pub async fn fetch_url( - &self, - url: &str, - name: &str, - hash: Option<&NixHash>, - ) -> Result<StorePath, ErrorKind> { - let resp = self - .http_client - .get(url) - .send() - .await - .map_err(FetcherError::from)?; - let mut sha = Sha256::new(); - let mut data = tokio_util::io::StreamReader::new( - resp.bytes_stream() - .inspect_ok(|data| { - sha.update(data); - }) - .map_err(|e| { - let e = e.without_url(); - warn!(%e, "failed to get response body"); - io::Error::new(io::ErrorKind::BrokenPipe, e.to_string()) - }), - ); - - let mut blob = self.blob_service.open_write().await; - let size = tokio::io::copy(&mut data, blob.as_mut()).await?; - let blob_digest = blob.close().await?; - let got = NixHash::Sha256(sha.finalize().into()); - - let hash = CAHash::Flat(if let Some(wanted) = hash { - if *wanted != got { - return Err(FetcherError::HashMismatch { - url: url.to_owned(), - wanted: wanted.clone(), - got, - } - .into()); - } - wanted.clone() - } else { - got - }); - - let path = build_ca_path(name, &hash, Vec::<String>::new(), false) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; - let node = Node::File(FileNode { - name: path.to_string().into(), - digest: blob_digest.into(), - size, - executable: false, - }); - - let (nar_size, nar_sha256) = self - .path_info_service - .calculate_nar(&node) - .await - .map_err(|e| ErrorKind::TvixError(Rc::new(e)))?; - - let path_info = PathInfo { - node: Some(tvix_castore::proto::Node { - node: Some(node.clone()), - }), - references: vec![], - narinfo: Some(tvix_store::proto::NarInfo { - nar_size, - nar_sha256: nar_sha256.to_vec().into(), - signatures: vec![], - reference_names: vec![], - deriver: None, /* ? */ - ca: Some((&hash).into()), - }), - }; - - self.path_info_service - .put(path_info) - .await - .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?; - - Ok(path.to_owned()) - } } impl EvalIO for TvixStoreIO { #[instrument(skip(self), ret(level = Level::TRACE), err)] fn path_exists(&self, path: &Path) -> io::Result<bool> { - if let Ok((store_path, sub_path)) = - StorePath::from_absolute_path_full(&path.to_string_lossy()) - { + if let Ok((store_path, sub_path)) = StorePath::from_absolute_path_full(path) { if self .tokio_handle - .block_on(async { self.store_path_to_node(&store_path, &sub_path).await })? + .block_on(self.store_path_to_node(&store_path, sub_path))? .is_some() { Ok(true) @@ -479,35 +408,21 @@ impl EvalIO for TvixStoreIO { #[instrument(skip(self), err)] fn open(&self, path: &Path) -> io::Result<Box<dyn io::Read>> { - if let Ok((store_path, sub_path)) = - StorePath::from_absolute_path_full(&path.to_string_lossy()) - { + if let Ok((store_path, sub_path)) = StorePath::from_absolute_path_full(path) { if let Some(node) = self .tokio_handle - .block_on(async { self.store_path_to_node(&store_path, &sub_path).await })? + .block_on(async { self.store_path_to_node(&store_path, sub_path).await })? { - // depending on the node type, treat read_to_string differently + // depending on the node type, treat open differently match node { - Node::Directory(_) => { + Node::Directory { .. } => { // This would normally be a io::ErrorKind::IsADirectory (still unstable) Err(io::Error::new( io::ErrorKind::Unsupported, - format!("tried to read directory at {:?} to string", path), + format!("tried to open directory at {:?}", path), )) } - Node::File(file_node) => { - let digest: B3Digest = - file_node.digest.clone().try_into().map_err(|_e| { - error!( - file_node = ?file_node, - "invalid digest" - ); - io::Error::new( - io::ErrorKind::InvalidData, - format!("invalid digest length in file node: {:?}", file_node), - ) - })?; - + Node::File { digest, .. } => { self.tokio_handle.block_on(async { let resp = self.blob_service.as_ref().open_read(&digest).await?; match resp { @@ -529,9 +444,9 @@ impl EvalIO for TvixStoreIO { } }) } - Node::Symlink(_symlink_node) => Err(io::Error::new( + Node::Symlink { .. } => Err(io::Error::new( io::ErrorKind::Unsupported, - "read_to_string for symlinks is unsupported", + "open for symlinks is unsupported", ))?, } } else { @@ -546,37 +461,45 @@ impl EvalIO for TvixStoreIO { } #[instrument(skip(self), ret(level = Level::TRACE), err)] + fn file_type(&self, path: &Path) -> io::Result<FileType> { + if let Ok((store_path, sub_path)) = StorePath::from_absolute_path_full(path) { + if let Some(node) = self + .tokio_handle + .block_on(async { self.store_path_to_node(&store_path, sub_path).await })? + { + match node { + Node::Directory { .. } => Ok(FileType::Directory), + Node::File { .. } => Ok(FileType::Regular), + Node::Symlink { .. } => Ok(FileType::Symlink), + } + } else { + self.std_io.file_type(path) + } + } else { + self.std_io.file_type(path) + } + } + + #[instrument(skip(self), ret(level = Level::TRACE), err)] fn read_dir(&self, path: &Path) -> io::Result<Vec<(bytes::Bytes, FileType)>> { - if let Ok((store_path, sub_path)) = - StorePath::from_absolute_path_full(&path.to_string_lossy()) - { + if let Ok((store_path, sub_path)) = StorePath::from_absolute_path_full(path) { if let Some(node) = self .tokio_handle - .block_on(async { self.store_path_to_node(&store_path, &sub_path).await })? + .block_on(async { self.store_path_to_node(&store_path, sub_path).await })? { match node { - Node::Directory(directory_node) => { + Node::Directory { digest, .. } => { // fetch the Directory itself. - let digest: B3Digest = - directory_node.digest.clone().try_into().map_err(|_e| { - io::Error::new( - io::ErrorKind::InvalidData, - format!( - "invalid digest length in directory node: {:?}", - directory_node - ), - ) - })?; - - if let Some(directory) = self.tokio_handle.block_on(async { - self.directory_service.as_ref().get(&digest).await + if let Some(directory) = self.tokio_handle.block_on({ + let digest = digest.clone(); + async move { self.directory_service.as_ref().get(&digest).await } })? { let mut children: Vec<(bytes::Bytes, FileType)> = Vec::new(); - for node in directory.nodes() { + for (name, node) in directory.into_nodes() { children.push(match node { - Node::Directory(e) => (e.name, FileType::Directory), - Node::File(e) => (e.name, FileType::Regular), - Node::Symlink(e) => (e.name, FileType::Symlink), + Node::Directory { .. } => (name.into(), FileType::Directory), + Node::File { .. } => (name.clone().into(), FileType::Regular), + Node::Symlink { .. } => (name.into(), FileType::Symlink), }) } Ok(children) @@ -593,14 +516,14 @@ impl EvalIO for TvixStoreIO { ))? } } - Node::File(_file_node) => { + Node::File { .. } => { // This would normally be a io::ErrorKind::NotADirectory (still unstable) Err(io::Error::new( io::ErrorKind::Unsupported, "tried to readdir path {:?}, which is a file", ))? } - Node::Symlink(_symlink_node) => Err(io::Error::new( + Node::Symlink { .. } => Err(io::Error::new( io::ErrorKind::Unsupported, "read_dir for symlinks is unsupported", ))?, @@ -615,18 +538,19 @@ impl EvalIO for TvixStoreIO { #[instrument(skip(self), ret(level = Level::TRACE), err)] fn import_path(&self, path: &Path) -> io::Result<PathBuf> { - let output_path = self.tokio_handle.block_on(async { + let path_info = self.tokio_handle.block_on({ tvix_store::import::import_path_as_nar_ca( path, tvix_store::import::path_to_name(path)?, &self.blob_service, &self.directory_service, &self.path_info_service, + &self.nar_calculation_service, ) - .await })?; - Ok(output_path.to_absolute_path().into()) + // From the returned PathInfo, extract the store path and return it. + Ok(path_info.store_path.to_absolute_path().into()) } #[instrument(skip(self), ret(level = Level::TRACE))] @@ -640,14 +564,11 @@ mod tests { use std::{path::Path, rc::Rc, sync::Arc}; use bstr::ByteSlice; + use clap::Parser; use tempfile::TempDir; use tvix_build::buildservice::DummyBuildService; - use tvix_castore::{ - blobservice::{BlobService, MemoryBlobService}, - directoryservice::{DirectoryService, MemoryDirectoryService}, - }; use tvix_eval::{EvalIO, EvaluationResult}; - use tvix_store::pathinfoservice::MemoryPathInfoService; + use tvix_store::utils::{construct_services, ServiceUrlsMemory}; use super::TvixStoreIO; use crate::builtins::{add_derivation_builtins, add_fetcher_builtins, add_import_builtins}; @@ -656,28 +577,30 @@ mod tests { /// Takes care of setting up the evaluator so it knows about the // `derivation` builtin. fn eval(str: &str) -> EvaluationResult { - let blob_service = Arc::new(MemoryBlobService::default()) as Arc<dyn BlobService>; - let directory_service = - Arc::new(MemoryDirectoryService::default()) as Arc<dyn DirectoryService>; - let path_info_service = Arc::new(MemoryPathInfoService::new( - blob_service.clone(), - directory_service.clone(), - )); - - let runtime = tokio::runtime::Runtime::new().unwrap(); + let tokio_runtime = tokio::runtime::Runtime::new().unwrap(); + let (blob_service, directory_service, path_info_service, nar_calculation_service) = + tokio_runtime + .block_on(async { + construct_services(ServiceUrlsMemory::parse_from(std::iter::empty::<&str>())) + .await + }) + .unwrap(); let io = Rc::new(TvixStoreIO::new( - blob_service.clone(), - directory_service.clone(), + blob_service, + directory_service, path_info_service, + nar_calculation_service.into(), Arc::<DummyBuildService>::default(), - runtime.handle().clone(), + tokio_runtime.handle().clone(), )); - let mut eval = tvix_eval::Evaluation::new(io.clone() as Rc<dyn EvalIO>, true); - add_derivation_builtins(&mut eval, io.clone()); - add_fetcher_builtins(&mut eval, io.clone()); - add_import_builtins(&mut eval, io); + let mut eval_builder = + tvix_eval::Evaluation::builder(io.clone() as Rc<dyn EvalIO>).enable_import(); + eval_builder = add_derivation_builtins(eval_builder, Rc::clone(&io)); + eval_builder = add_fetcher_builtins(eval_builder, Rc::clone(&io)); + eval_builder = add_import_builtins(eval_builder, io); + let eval = eval_builder.build(); // run the evaluation itself. eval.evaluate(str, None) |