Diffstat (limited to 'tvix/glue/src/tvix_store_io.rs')
-rw-r--r--  tvix/glue/src/tvix_store_io.rs | 773
1 file changed, 348 insertions(+), 425 deletions(-)
diff --git a/tvix/glue/src/tvix_store_io.rs b/tvix/glue/src/tvix_store_io.rs
index 10a59027852f..67a88e13c54b 100644
--- a/tvix/glue/src/tvix_store_io.rs
+++ b/tvix/glue/src/tvix_store_io.rs
@@ -1,38 +1,28 @@
 //! This module provides an implementation of EvalIO talking to tvix-store.
-
-use async_recursion::async_recursion;
-use bytes::Bytes;
-use futures::Stream;
 use futures::{StreamExt, TryStreamExt};
-use nix_compat::nixhash::NixHash;
-use nix_compat::store_path::{build_ca_path, StorePathRef};
 use nix_compat::{nixhash::CAHash, store_path::StorePath};
-use sha2::{Digest, Sha256};
-use std::marker::Unpin;
-use std::rc::Rc;
+use std::collections::BTreeMap;
 use std::{
     cell::RefCell,
-    collections::BTreeSet,
     io,
     path::{Path, PathBuf},
     sync::Arc,
 };
 use tokio_util::io::SyncIoBridge;
-use tracing::{error, instrument, warn, Level};
+use tracing::{error, instrument, warn, Level, Span};
+use tracing_indicatif::span_ext::IndicatifSpanExt;
 use tvix_build::buildservice::BuildService;
-use tvix_eval::{ErrorKind, EvalIO, FileType, StdIO};
-use tvix_store::utils::AsyncIoBridge;
-use walkdir::DirEntry;
+use tvix_eval::{EvalIO, FileType, StdIO};
+use tvix_store::nar::NarCalculationService;
 
 use tvix_castore::{
     blobservice::BlobService,
     directoryservice::{self, DirectoryService},
-    proto::{node::Node, FileNode, NamedNode},
-    B3Digest,
+    Node,
 };
-use tvix_store::{pathinfoservice::PathInfoService, proto::PathInfo};
+use tvix_store::pathinfoservice::{PathInfo, PathInfoService};
 
-use crate::builtins::FetcherError;
+use crate::fetchers::Fetcher;
 use crate::known_paths::KnownPaths;
 use crate::tvix_build::derivation_to_build_request;
 
@@ -52,16 +42,27 @@ use crate::tvix_build::derivation_to_build_request;
 /// implementation of "Tvix Store IO" which does not necessarily bring the concept of blob service,
 /// directory service or path info service.
 pub struct TvixStoreIO {
-    blob_service: Arc<dyn BlobService>,
-    directory_service: Arc<dyn DirectoryService>,
-    // This is public so builtins can put PathInfos directly.
+    // This is public so helper functions can interact with the stores directly.
+    pub(crate) blob_service: Arc<dyn BlobService>,
+    pub(crate) directory_service: Arc<dyn DirectoryService>,
     pub(crate) path_info_service: Arc<dyn PathInfoService>,
+    pub(crate) nar_calculation_service: Arc<dyn NarCalculationService>,
+
     std_io: StdIO,
     #[allow(dead_code)]
     build_service: Arc<dyn BuildService>,
     pub(crate) tokio_handle: tokio::runtime::Handle,
-    http_client: reqwest::Client,
-    pub(crate) known_paths: RefCell<KnownPaths>,
+
+    #[allow(clippy::type_complexity)]
+    pub(crate) fetcher: Fetcher<
+        Arc<dyn BlobService>,
+        Arc<dyn DirectoryService>,
+        Arc<dyn PathInfoService>,
+        Arc<dyn NarCalculationService>,
+    >,
+
+    // Paths we know how to produce, either by building or fetching.
+    pub known_paths: RefCell<KnownPaths>,
 }
 
 impl TvixStoreIO {
@@ -69,17 +70,24 @@ impl TvixStoreIO {
         blob_service: Arc<dyn BlobService>,
         directory_service: Arc<dyn DirectoryService>,
         path_info_service: Arc<dyn PathInfoService>,
+        nar_calculation_service: Arc<dyn NarCalculationService>,
         build_service: Arc<dyn BuildService>,
         tokio_handle: tokio::runtime::Handle,
     ) -> Self {
         Self {
-            blob_service,
-            directory_service,
-            path_info_service,
+            blob_service: blob_service.clone(),
+            directory_service: directory_service.clone(),
+            path_info_service: path_info_service.clone(),
+            nar_calculation_service: nar_calculation_service.clone(),
             std_io: StdIO {},
             build_service,
             tokio_handle,
-            http_client: reqwest::Client::new(),
+            fetcher: Fetcher::new(
+                blob_service,
+                directory_service,
+                path_info_service,
+                nar_calculation_service,
+            ),
             known_paths: Default::default(),
         }
     }
@@ -91,11 +99,10 @@ impl TvixStoreIO {
     ///
     /// In case there is no PathInfo yet, this means we need to build it
     /// (which currently is stubbed out still).
-    #[async_recursion(?Send)]
-    #[instrument(skip(self, store_path), fields(store_path=%store_path), ret(level = Level::TRACE), err)]
+    #[instrument(skip(self, store_path), fields(store_path=%store_path, indicatif.pb_show=1), ret(level = Level::TRACE), err(level = Level::TRACE))]
     async fn store_path_to_node(
         &self,
-        store_path: &StorePath,
+        store_path: &StorePath<String>,
         sub_path: &Path,
     ) -> io::Result<Option<Node>> {
         // Find the root node for the store_path.
@@ -109,8 +116,8 @@ impl TvixStoreIO {
             .get(*store_path.digest())
             .await?
         {
-            // if we have a PathInfo, we know there will be a root_node (due to validation)
-            Some(path_info) => path_info.node.expect("no node").node.expect("no node"),
+            // TODO: use stricter typed BuildRequest here.
+            Some(path_info) => path_info.node,
             // If there's no PathInfo found, this normally means we have to
             // trigger the build (and insert into PathInfoService, after
             // reference scanning).
@@ -121,348 +128,270 @@ impl TvixStoreIO {
             // it for things like <nixpkgs> pointing to a store path.
             // In the future, these things will (need to) have PathInfo.
             None => {
-                // The store path doesn't exist yet, so we need to build it.
-                warn!("triggering build");
-
-                // Look up the derivation for this output path.
-                let (drv_path, drv) = {
-                    let known_paths = self.known_paths.borrow();
-                    match known_paths.get_drv_path_for_output_path(store_path) {
-                        Some(drv_path) => (
-                            drv_path.to_owned(),
-                            known_paths.get_drv_by_drvpath(drv_path).unwrap().to_owned(),
-                        ),
-                        None => {
-                            warn!(store_path=%store_path, "no drv found");
-                            // let StdIO take over
-                            return Ok(None);
-                        }
+                // The store path doesn't exist yet, so we need to fetch or build it.
+                // We check for fetches first, as we might have both native
+                // fetchers and FODs in KnownPaths, and prefer the former.
+                // This will also find [Fetch] synthesized from
+                // `builtin:fetchurl` Derivations.
+                let maybe_fetch = self
+                    .known_paths
+                    .borrow()
+                    .get_fetch_for_output_path(store_path);
+
+                match maybe_fetch {
+                    Some((name, fetch)) => {
+                        let (sp, root_node) = self
+                            .fetcher
+                            .ingest_and_persist(&name, fetch)
+                            .await
+                            .map_err(|e| {
+                            std::io::Error::new(std::io::ErrorKind::InvalidData, e)
+                        })?;
+
+                        debug_assert_eq!(
+                            sp.to_absolute_path(),
+                            store_path.as_ref().to_absolute_path(),
+                            "store path returned from fetcher must match store path we have in fetchers"
+                        );
+
+                        root_node
                     }
-                };
-
-                // derivation_to_build_request needs castore nodes for all inputs.
-                // Provide them, which means, here is where we recursively build
-                // all dependencies.
-                #[allow(clippy::mutable_key_type)]
-                let input_nodes: BTreeSet<Node> =
-                    futures::stream::iter(drv.input_derivations.iter())
-                        .map(|(input_drv_path, output_names)| {
-                            // look up the derivation object
-                            let input_drv = {
-                                let known_paths = self.known_paths.borrow();
-                                known_paths
-                                    .get_drv_by_drvpath(input_drv_path)
-                                    .unwrap_or_else(|| panic!("{} not found", input_drv_path))
-                                    .to_owned()
-                            };
-
-                            // convert output names to actual paths
-                            let output_paths: Vec<StorePath> = output_names
+                    None => {
+                        // Look up the derivation for this output path.
+                        let (drv_path, drv) = {
+                            let known_paths = self.known_paths.borrow();
+                            match known_paths.get_drv_path_for_output_path(store_path) {
+                                Some(drv_path) => (
+                                    drv_path.to_owned(),
+                                    known_paths.get_drv_by_drvpath(drv_path).unwrap().to_owned(),
+                                ),
+                                None => {
+                                    warn!(store_path=%store_path, "no drv found");
+                                    // let StdIO take over
+                                    return Ok(None);
+                                }
+                            }
+                        };
+                        let span = Span::current();
+                        span.pb_start();
+                        span.pb_set_style(&tvix_tracing::PB_SPINNER_STYLE);
+                        span.pb_set_message(&format!("⏳Waiting for inputs {}", &store_path));
+
+                        // derivation_to_build_request needs castore nodes for all inputs.
+                        // Provide them, which means, here is where we recursively build
+                        // all dependencies.
+                        let mut inputs: BTreeMap<StorePath<String>, Node> =
+                            futures::stream::iter(drv.input_derivations.iter())
+                                .map(|(input_drv_path, output_names)| {
+                                    // look up the derivation object
+                                    let input_drv = {
+                                        let known_paths = self.known_paths.borrow();
+                                        known_paths
+                                            .get_drv_by_drvpath(input_drv_path)
+                                            .unwrap_or_else(|| {
+                                                panic!("{} not found", input_drv_path)
+                                            })
+                                            .to_owned()
+                                    };
+
+                                    // convert output names to actual paths
+                                    let output_paths: Vec<StorePath<String>> = output_names
+                                        .iter()
+                                        .map(|output_name| {
+                                            input_drv
+                                                .outputs
+                                                .get(output_name)
+                                                .expect("missing output_name")
+                                                .path
+                                                .as_ref()
+                                                .expect("missing output path")
+                                                .clone()
+                                        })
+                                        .collect();
+
+                                    // For each output, ask for the castore node.
+                                    // We're in a per-derivation context, so if they're
+                                    // not built yet they'll all get built together.
+                                    // If they don't need to build, we can however still
+                                    // substitute all in parallel (if they don't need to
+                                    // be built) - so we turn this into a stream of streams.
+                                    // It's up to the builder to deduplicate same build requests.
+                                    futures::stream::iter(output_paths.into_iter()).map(
+                                        |output_path| async move {
+                                            let node = self
+                                                .store_path_to_node(&output_path, Path::new(""))
+                                                .await?;
+
+                                            if let Some(node) = node {
+                                                Ok((output_path, node))
+                                            } else {
+                                                Err(io::Error::other("no node produced"))
+                                            }
+                                        },
+                                    )
+                                })
+                                .flatten()
+                                .buffer_unordered(
+                                    1, /* TODO: increase again once we prevent redundant fetches */
+                                ) // TODO: make configurable
+                                .try_collect()
+                                .await?;
+
+                        // FUTUREWORK: merge these two things together
+                        // add input sources
+                        let input_sources: BTreeMap<_, _> =
+                            futures::stream::iter(drv.input_sources.iter())
+                                .then(|input_source| {
+                                    Box::pin({
+                                        let input_source = input_source.clone();
+                                        async move {
+                                            let node = self
+                                                .store_path_to_node(&input_source, Path::new(""))
+                                                .await?;
+                                            if let Some(node) = node {
+                                                Ok((input_source, node))
+                                            } else {
+                                                Err(io::Error::other("no node produced"))
+                                            }
+                                        }
+                                    })
+                                })
+                                .try_collect()
+                                .await?;
+
+                        inputs.extend(input_sources);
+
+                        span.pb_set_message(&format!("🔨Building {}", &store_path));
+
+                        // TODO: check if input sources are sufficiently dealt with,
+                        // I think yes, they must be imported into the store by other
+                        // operations, so dealt with in the Some(…) match arm
+
+                        // synthesize the build request.
+                        let build_request = derivation_to_build_request(&drv, inputs)?;
+
+                        // create a build
+                        let build_result = self
+                            .build_service
+                            .as_ref()
+                            .do_build(build_request)
+                            .await
+                            .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?;
+
+                        // Maps from the index in refscan_needles to the full store path
+                        // Used to map back to the actual store path from the found needles
+                        // Importantly, this must match the order of the needles generated in derivation_to_build_request
+                        let refscan_needles =
+                            crate::tvix_build::get_refscan_needles(&drv).collect::<Vec<_>>();
+
+                        // For each output, insert a PathInfo.
+                        for ((output, output_needles), drv_output) in build_result
+                            .outputs
+                            .iter()
+                            .zip(build_result.outputs_needles.iter())
+                            .zip(drv.outputs.iter())
+                        {
+                            let output_node = output
+                                .clone()
+                                .try_into_anonymous_node()
+                                .expect("invalid node");
+
+                            let output_needles: Vec<_> = output_needles
+                                .needles
                                 .iter()
-                                .map(|output_name| {
-                                    input_drv
-                                        .outputs
-                                        .get(output_name)
-                                        .expect("missing output_name")
-                                        .path
-                                        .as_ref()
-                                        .expect("missing output path")
-                                        .clone()
+                                // Map each output needle index back to the refscan_needle
+                                .map(|idx| {
+                                    refscan_needles
+                                        .get(*idx as usize)
+                                        .ok_or(std::io::Error::new(
+                                            std::io::ErrorKind::Other,
+                                            "invalid build response",
+                                        ))
                                 })
-                                .collect();
-                            // For each output, ask for the castore node.
-                            // We're in a per-derivation context, so if they're
-                            // not built yet they'll all get built together.
-                            // If they don't need to build, we can however still
-                            // substitute all in parallel (if they don't need to
-                            // be built) - so we turn this into a stream of streams.
-                            // It's up to the builder to deduplicate same build requests.
-                            futures::stream::iter(output_paths.into_iter()).map(
-                                |output_path| async move {
-                                    let node = self
-                                        .store_path_to_node(&output_path, Path::new(""))
-                                        .await?;
-
-                                    if let Some(node) = node {
-                                        Ok(node)
-                                    } else {
-                                        Err(io::Error::other("no node produced"))
-                                    }
-                                },
-                            )
-                        })
-                        .flatten()
-                        .buffer_unordered(10) // TODO: make configurable
-                        .try_collect()
-                        .await?;
-
-                // TODO: check if input sources are sufficiently dealth with,
-                // I think yes, they must be imported into the store by other
-                // operations, so dealt with in the Some(…) match arm
-
-                // synthesize the build request.
-                let build_request = derivation_to_build_request(&drv, input_nodes)?;
-
-                // create a build
-                let build_result = self
-                    .build_service
-                    .as_ref()
-                    .do_build(build_request)
-                    .await
-                    .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?;
-
-                // TODO: refscan?
-
-                // For each output, insert a PathInfo.
-                for output in &build_result.outputs {
-                    let root_node = output.node.as_ref().expect("invalid root node");
-
-                    // calculate the nar representation
-                    let (nar_size, nar_sha256) =
-                        self.path_info_service.calculate_nar(root_node).await?;
-
-                    // assemble the PathInfo to persist
-                    let path_info = PathInfo {
-                        node: Some(tvix_castore::proto::Node {
-                            node: Some(root_node.clone()),
-                        }),
-                        references: vec![], // TODO: refscan
-                        narinfo: Some(tvix_store::proto::NarInfo {
-                            nar_size,
-                            nar_sha256: Bytes::from(nar_sha256.to_vec()),
-                            signatures: vec![],
-                            reference_names: vec![], // TODO: refscan
-                            deriver: Some(tvix_store::proto::StorePath {
-                                name: drv_path
-                                    .name()
-                                    .strip_suffix(".drv")
-                                    .expect("missing .drv suffix")
-                                    .to_string(),
-                                digest: drv_path.digest().to_vec().into(),
-                            }),
-                            ca: drv.fod_digest().map(
-                                |fod_digest| -> tvix_store::proto::nar_info::Ca {
-                                    (&CAHash::Nar(nix_compat::nixhash::NixHash::Sha256(fod_digest)))
-                                        .into()
-                                },
-                            ),
-                        }),
-                    };
-
-                    self.path_info_service
-                        .put(path_info)
-                        .await
-                        .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?;
-                }
+                                .collect::<Result<_, std::io::Error>>()?;
+
+                            // calculate the nar representation
+                            let (nar_size, nar_sha256) = self
+                                .nar_calculation_service
+                                .calculate_nar(&output_node)
+                                .await?;
+
+                            // assemble the PathInfo to persist
+                            let path_info = PathInfo {
+                                store_path: drv_output
+                                    .1
+                                    .path
+                                    .as_ref()
+                                    .ok_or(std::io::Error::new(
+                                        std::io::ErrorKind::Other,
+                                        "Tvix bug: missing output store path",
+                                    ))?
+                                    .to_owned(),
+                                node: output_node,
+                                references: output_needles
+                                    .iter()
+                                    .map(|s| (**s).to_owned())
+                                    .collect(),
+                                nar_size,
+                                nar_sha256,
+                                signatures: vec![],
+                                deriver: Some(
+                                    StorePath::from_name_and_digest_fixed(
+                                        drv_path
+                                            .name()
+                                            .strip_suffix(".drv")
+                                            .expect("missing .drv suffix"),
+                                        *drv_path.digest(),
+                                    )
+                                    .expect(
+                                        "Tvix bug: StorePath without .drv suffix must be valid",
+                                    ),
+                                ),
+                                ca: drv.fod_digest().map(|fod_digest| {
+                                    CAHash::Nar(nix_compat::nixhash::NixHash::Sha256(fod_digest))
+                                }),
+                            };
+
+                            self.path_info_service
+                                .put(path_info)
+                                .await
+                                .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?;
+                        }
 
-                // find the output for the store path requested
-                build_result
-                    .outputs
-                    .into_iter()
-                    .find(|output_node| {
-                        output_node.node.as_ref().expect("invalid node").get_name()
-                            == store_path.to_string().as_bytes()
-                    })
-                    .expect("build didn't produce the store path")
-                    .node
-                    .expect("invalid node")
+                        // find the output for the store path requested
+                        let s = store_path.to_string();
+
+                        build_result
+                            .outputs
+                            .into_iter()
+                            .map(|e| e.try_into_name_and_node().expect("invalid node"))
+                            .find(|(output_name, _output_node)| {
+                                output_name.as_ref() == s.as_bytes()
+                            })
+                            .expect("build didn't produce the store path")
+                            .1
+                    }
+                }
             }
         };
 
         // now with the root_node and sub_path, descend to the node requested.
+        // We convert sub_path to the castore model here.
+        let sub_path = tvix_castore::PathBuf::from_host_path(sub_path, true)?;
+
         directoryservice::descend_to(&self.directory_service, root_node, sub_path)
             .await
             .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))
     }
-
-    /// This forwards the ingestion to the [`tvix_castore::import::ingest_entries`],
-    /// passing the blob_service and directory_service that's used.
-    /// The error is mapped to std::io::Error for simplicity.
-    pub(crate) async fn ingest_entries<S>(&self, entries_stream: S) -> io::Result<Node>
-    where
-        S: Stream<Item = DirEntry> + Unpin,
-    {
-        tvix_castore::import::ingest_entries(
-            &self.blob_service,
-            &self.directory_service,
-            entries_stream,
-        )
-        .await
-        .map_err(|err| std::io::Error::new(io::ErrorKind::Other, err))
-    }
-
-    pub(crate) async fn node_to_path_info(
-        &self,
-        name: &str,
-        path: &Path,
-        ca: CAHash,
-        root_node: Node,
-    ) -> io::Result<(PathInfo, StorePath)> {
-        // Ask the PathInfoService for the NAR size and sha256
-        // We always need it no matter what is the actual hash mode
-        // because the path info construct a narinfo which *always*
-        // require a SHA256 of the NAR representation and the NAR size.
-        let (nar_size, nar_sha256) = self
-            .path_info_service
-            .as_ref()
-            .calculate_nar(&root_node)
-            .await?;
-
-        // Calculate the output path. This might still fail, as some names are illegal.
-        let output_path =
-            nix_compat::store_path::build_ca_path(name, &ca, Vec::<String>::new(), false).map_err(
-                |_| {
-                    std::io::Error::new(
-                        std::io::ErrorKind::InvalidData,
-                        format!("invalid name: {}", name),
-                    )
-                },
-            )?;
-
-        // assemble a new root_node with a name that is derived from the nar hash.
-        let root_node = root_node.rename(output_path.to_string().into_bytes().into());
-        tvix_store::import::log_node(&root_node, path);
-
-        let path_info =
-            tvix_store::import::derive_nar_ca_path_info(nar_size, nar_sha256, Some(ca), root_node);
-
-        Ok((path_info, output_path.to_owned()))
-    }
-
-    pub(crate) async fn register_node_in_path_info_service(
-        &self,
-        name: &str,
-        path: &Path,
-        ca: CAHash,
-        root_node: Node,
-    ) -> io::Result<StorePath> {
-        let (path_info, output_path) = self.node_to_path_info(name, path, ca, root_node).await?;
-        let _path_info = self.path_info_service.as_ref().put(path_info).await?;
-
-        Ok(output_path)
-    }
-
-    /// Transforms a BLAKE-3 digest into a SHA256 digest
-    /// by re-hashing the whole file.
-    pub(crate) async fn blob_to_sha256_hash(&self, blob_digest: B3Digest) -> io::Result<[u8; 32]> {
-        let mut reader = self
-            .blob_service
-            .open_read(&blob_digest)
-            .await?
-            .ok_or_else(|| {
-                io::Error::new(
-                    io::ErrorKind::NotFound,
-                    format!("blob represented by digest: '{}' not found", blob_digest),
-                )
-            })?;
-        // It is fine to use `AsyncIoBridge` here because hashing is not actually I/O.
-        let mut hasher = AsyncIoBridge(Sha256::new());
-
-        tokio::io::copy(&mut reader, &mut hasher).await?;
-        Ok(hasher.0.finalize().into())
-    }
-
-    pub async fn store_path_exists<'a>(&'a self, store_path: StorePathRef<'a>) -> io::Result<bool> {
-        Ok(self
-            .path_info_service
-            .as_ref()
-            .get(*store_path.digest())
-            .await?
-            .is_some())
-    }
-
-    pub async fn fetch_url(
-        &self,
-        url: &str,
-        name: &str,
-        hash: Option<&NixHash>,
-    ) -> Result<StorePath, ErrorKind> {
-        let resp = self
-            .http_client
-            .get(url)
-            .send()
-            .await
-            .map_err(FetcherError::from)?;
-        let mut sha = Sha256::new();
-        let mut data = tokio_util::io::StreamReader::new(
-            resp.bytes_stream()
-                .inspect_ok(|data| {
-                    sha.update(data);
-                })
-                .map_err(|e| {
-                    let e = e.without_url();
-                    warn!(%e, "failed to get response body");
-                    io::Error::new(io::ErrorKind::BrokenPipe, e.to_string())
-                }),
-        );
-
-        let mut blob = self.blob_service.open_write().await;
-        let size = tokio::io::copy(&mut data, blob.as_mut()).await?;
-        let blob_digest = blob.close().await?;
-        let got = NixHash::Sha256(sha.finalize().into());
-
-        let hash = CAHash::Flat(if let Some(wanted) = hash {
-            if *wanted != got {
-                return Err(FetcherError::HashMismatch {
-                    url: url.to_owned(),
-                    wanted: wanted.clone(),
-                    got,
-                }
-                .into());
-            }
-            wanted.clone()
-        } else {
-            got
-        });
-
-        let path = build_ca_path(name, &hash, Vec::<String>::new(), false)
-            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
-        let node = Node::File(FileNode {
-            name: path.to_string().into(),
-            digest: blob_digest.into(),
-            size,
-            executable: false,
-        });
-
-        let (nar_size, nar_sha256) = self
-            .path_info_service
-            .calculate_nar(&node)
-            .await
-            .map_err(|e| ErrorKind::TvixError(Rc::new(e)))?;
-
-        let path_info = PathInfo {
-            node: Some(tvix_castore::proto::Node {
-                node: Some(node.clone()),
-            }),
-            references: vec![],
-            narinfo: Some(tvix_store::proto::NarInfo {
-                nar_size,
-                nar_sha256: nar_sha256.to_vec().into(),
-                signatures: vec![],
-                reference_names: vec![],
-                deriver: None, /* ? */
-                ca: Some((&hash).into()),
-            }),
-        };
-
-        self.path_info_service
-            .put(path_info)
-            .await
-            .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?;
-
-        Ok(path.to_owned())
-    }
 }
 
 impl EvalIO for TvixStoreIO {
     #[instrument(skip(self), ret(level = Level::TRACE), err)]
     fn path_exists(&self, path: &Path) -> io::Result<bool> {
-        if let Ok((store_path, sub_path)) =
-            StorePath::from_absolute_path_full(&path.to_string_lossy())
-        {
+        if let Ok((store_path, sub_path)) = StorePath::from_absolute_path_full(path) {
             if self
                 .tokio_handle
-                .block_on(async { self.store_path_to_node(&store_path, &sub_path).await })?
+                .block_on(self.store_path_to_node(&store_path, sub_path))?
                 .is_some()
             {
                 Ok(true)
@@ -479,35 +408,21 @@ impl EvalIO for TvixStoreIO {
 
     #[instrument(skip(self), err)]
     fn open(&self, path: &Path) -> io::Result<Box<dyn io::Read>> {
-        if let Ok((store_path, sub_path)) =
-            StorePath::from_absolute_path_full(&path.to_string_lossy())
-        {
+        if let Ok((store_path, sub_path)) = StorePath::from_absolute_path_full(path) {
             if let Some(node) = self
                 .tokio_handle
-                .block_on(async { self.store_path_to_node(&store_path, &sub_path).await })?
+                .block_on(async { self.store_path_to_node(&store_path, sub_path).await })?
             {
-                // depending on the node type, treat read_to_string differently
+                // depending on the node type, treat open differently
                 match node {
-                    Node::Directory(_) => {
+                    Node::Directory { .. } => {
                         // This would normally be a io::ErrorKind::IsADirectory (still unstable)
                         Err(io::Error::new(
                             io::ErrorKind::Unsupported,
-                            format!("tried to read directory at {:?} to string", path),
+                            format!("tried to open directory at {:?}", path),
                         ))
                     }
-                    Node::File(file_node) => {
-                        let digest: B3Digest =
-                            file_node.digest.clone().try_into().map_err(|_e| {
-                                error!(
-                                    file_node = ?file_node,
-                                    "invalid digest"
-                                );
-                                io::Error::new(
-                                    io::ErrorKind::InvalidData,
-                                    format!("invalid digest length in file node: {:?}", file_node),
-                                )
-                            })?;
-
+                    Node::File { digest, .. } => {
                         self.tokio_handle.block_on(async {
                             let resp = self.blob_service.as_ref().open_read(&digest).await?;
                             match resp {
@@ -529,9 +444,9 @@ impl EvalIO for TvixStoreIO {
                             }
                         })
                     }
-                    Node::Symlink(_symlink_node) => Err(io::Error::new(
+                    Node::Symlink { .. } => Err(io::Error::new(
                         io::ErrorKind::Unsupported,
-                        "read_to_string for symlinks is unsupported",
+                        "open for symlinks is unsupported",
                     ))?,
                 }
             } else {
@@ -546,37 +461,45 @@ impl EvalIO for TvixStoreIO {
     }
 
     #[instrument(skip(self), ret(level = Level::TRACE), err)]
+    fn file_type(&self, path: &Path) -> io::Result<FileType> {
+        if let Ok((store_path, sub_path)) = StorePath::from_absolute_path_full(path) {
+            if let Some(node) = self
+                .tokio_handle
+                .block_on(async { self.store_path_to_node(&store_path, sub_path).await })?
+            {
+                match node {
+                    Node::Directory { .. } => Ok(FileType::Directory),
+                    Node::File { .. } => Ok(FileType::Regular),
+                    Node::Symlink { .. } => Ok(FileType::Symlink),
+                }
+            } else {
+                self.std_io.file_type(path)
+            }
+        } else {
+            self.std_io.file_type(path)
+        }
+    }
+
+    #[instrument(skip(self), ret(level = Level::TRACE), err)]
     fn read_dir(&self, path: &Path) -> io::Result<Vec<(bytes::Bytes, FileType)>> {
-        if let Ok((store_path, sub_path)) =
-            StorePath::from_absolute_path_full(&path.to_string_lossy())
-        {
+        if let Ok((store_path, sub_path)) = StorePath::from_absolute_path_full(path) {
             if let Some(node) = self
                 .tokio_handle
-                .block_on(async { self.store_path_to_node(&store_path, &sub_path).await })?
+                .block_on(async { self.store_path_to_node(&store_path, sub_path).await })?
             {
                 match node {
-                    Node::Directory(directory_node) => {
+                    Node::Directory { digest, .. } => {
                         // fetch the Directory itself.
-                        let digest: B3Digest =
-                            directory_node.digest.clone().try_into().map_err(|_e| {
-                                io::Error::new(
-                                    io::ErrorKind::InvalidData,
-                                    format!(
-                                        "invalid digest length in directory node: {:?}",
-                                        directory_node
-                                    ),
-                                )
-                            })?;
-
-                        if let Some(directory) = self.tokio_handle.block_on(async {
-                            self.directory_service.as_ref().get(&digest).await
+                        if let Some(directory) = self.tokio_handle.block_on({
+                            let digest = digest.clone();
+                            async move { self.directory_service.as_ref().get(&digest).await }
                         })? {
                             let mut children: Vec<(bytes::Bytes, FileType)> = Vec::new();
-                            for node in directory.nodes() {
+                            for (name, node) in directory.into_nodes() {
                                 children.push(match node {
-                                    Node::Directory(e) => (e.name, FileType::Directory),
-                                    Node::File(e) => (e.name, FileType::Regular),
-                                    Node::Symlink(e) => (e.name, FileType::Symlink),
+                                    Node::Directory { .. } => (name.into(), FileType::Directory),
+                                    Node::File { .. } => (name.clone().into(), FileType::Regular),
+                                    Node::Symlink { .. } => (name.into(), FileType::Symlink),
                                 })
                             }
                             Ok(children)
@@ -593,14 +516,14 @@ impl EvalIO for TvixStoreIO {
                             ))?
                         }
                     }
-                    Node::File(_file_node) => {
+                    Node::File { .. } => {
                         // This would normally be a io::ErrorKind::NotADirectory (still unstable)
                         Err(io::Error::new(
                             io::ErrorKind::Unsupported,
                             "tried to readdir path {:?}, which is a file",
                         ))?
                     }
-                    Node::Symlink(_symlink_node) => Err(io::Error::new(
+                    Node::Symlink { .. } => Err(io::Error::new(
                         io::ErrorKind::Unsupported,
                         "read_dir for symlinks is unsupported",
                     ))?,
@@ -615,18 +538,19 @@ impl EvalIO for TvixStoreIO {
 
     #[instrument(skip(self), ret(level = Level::TRACE), err)]
     fn import_path(&self, path: &Path) -> io::Result<PathBuf> {
-        let output_path = self.tokio_handle.block_on(async {
+        let path_info = self.tokio_handle.block_on({
             tvix_store::import::import_path_as_nar_ca(
                 path,
                 tvix_store::import::path_to_name(path)?,
                 &self.blob_service,
                 &self.directory_service,
                 &self.path_info_service,
+                &self.nar_calculation_service,
             )
-            .await
         })?;
 
-        Ok(output_path.to_absolute_path().into())
+        // From the returned PathInfo, extract the store path and return it.
+        Ok(path_info.store_path.to_absolute_path().into())
     }
 
     #[instrument(skip(self), ret(level = Level::TRACE))]
@@ -640,14 +564,11 @@ mod tests {
     use std::{path::Path, rc::Rc, sync::Arc};
 
     use bstr::ByteSlice;
+    use clap::Parser;
     use tempfile::TempDir;
     use tvix_build::buildservice::DummyBuildService;
-    use tvix_castore::{
-        blobservice::{BlobService, MemoryBlobService},
-        directoryservice::{DirectoryService, MemoryDirectoryService},
-    };
     use tvix_eval::{EvalIO, EvaluationResult};
-    use tvix_store::pathinfoservice::MemoryPathInfoService;
+    use tvix_store::utils::{construct_services, ServiceUrlsMemory};
 
     use super::TvixStoreIO;
     use crate::builtins::{add_derivation_builtins, add_fetcher_builtins, add_import_builtins};
@@ -656,28 +577,30 @@ mod tests {
     /// Takes care of setting up the evaluator so it knows about the
     // `derivation` builtin.
     fn eval(str: &str) -> EvaluationResult {
-        let blob_service = Arc::new(MemoryBlobService::default()) as Arc<dyn BlobService>;
-        let directory_service =
-            Arc::new(MemoryDirectoryService::default()) as Arc<dyn DirectoryService>;
-        let path_info_service = Arc::new(MemoryPathInfoService::new(
-            blob_service.clone(),
-            directory_service.clone(),
-        ));
-
-        let runtime = tokio::runtime::Runtime::new().unwrap();
+        let tokio_runtime = tokio::runtime::Runtime::new().unwrap();
+        let (blob_service, directory_service, path_info_service, nar_calculation_service) =
+            tokio_runtime
+                .block_on(async {
+                    construct_services(ServiceUrlsMemory::parse_from(std::iter::empty::<&str>()))
+                        .await
+                })
+                .unwrap();
 
         let io = Rc::new(TvixStoreIO::new(
-            blob_service.clone(),
-            directory_service.clone(),
+            blob_service,
+            directory_service,
             path_info_service,
+            nar_calculation_service.into(),
             Arc::<DummyBuildService>::default(),
-            runtime.handle().clone(),
+            tokio_runtime.handle().clone(),
         ));
-        let mut eval = tvix_eval::Evaluation::new(io.clone() as Rc<dyn EvalIO>, true);
 
-        add_derivation_builtins(&mut eval, io.clone());
-        add_fetcher_builtins(&mut eval, io.clone());
-        add_import_builtins(&mut eval, io);
+        let mut eval_builder =
+            tvix_eval::Evaluation::builder(io.clone() as Rc<dyn EvalIO>).enable_import();
+        eval_builder = add_derivation_builtins(eval_builder, Rc::clone(&io));
+        eval_builder = add_fetcher_builtins(eval_builder, Rc::clone(&io));
+        eval_builder = add_import_builtins(eval_builder, io);
+        let eval = eval_builder.build();
 
         // run the evaluation itself.
         eval.evaluate(str, None)