about summary refs log tree commit diff
path: root/tvix/glue/src/tvix_store_io.rs
diff options
Diffstat (limited to 'tvix/glue/src/tvix_store_io.rs')
1 files changed, 236 insertions, 283 deletions
diff --git a/tvix/glue/src/tvix_store_io.rs b/tvix/glue/src/tvix_store_io.rs
index 10a5902785..7b8ef3ff0a 100644
--- a/tvix/glue/src/tvix_store_io.rs
+++ b/tvix/glue/src/tvix_store_io.rs
@@ -1,15 +1,11 @@
 //! This module provides an implementation of EvalIO talking to tvix-store.
-use async_recursion::async_recursion;
 use bytes::Bytes;
-use futures::Stream;
 use futures::{StreamExt, TryStreamExt};
 use nix_compat::nixhash::NixHash;
-use nix_compat::store_path::{build_ca_path, StorePathRef};
+use nix_compat::store_path::StorePathRef;
 use nix_compat::{nixhash::CAHash, store_path::StorePath};
 use sha2::{Digest, Sha256};
-use std::marker::Unpin;
-use std::rc::Rc;
 use std::{
@@ -18,21 +14,22 @@ use std::{
 use tokio_util::io::SyncIoBridge;
-use tracing::{error, instrument, warn, Level};
+use tracing::{error, info, instrument, warn, Level};
 use tvix_build::buildservice::BuildService;
-use tvix_eval::{ErrorKind, EvalIO, FileType, StdIO};
+use tvix_castore::proto::node::Node;
+use tvix_eval::{EvalIO, FileType, StdIO};
+use tvix_store::nar::NarCalculationService;
 use tvix_store::utils::AsyncIoBridge;
-use walkdir::DirEntry;
 use tvix_castore::{
     directoryservice::{self, DirectoryService},
-    proto::{node::Node, FileNode, NamedNode},
+    proto::NamedNode,
 use tvix_store::{pathinfoservice::PathInfoService, proto::PathInfo};
-use crate::builtins::FetcherError;
+use crate::fetchers::Fetcher;
 use crate::known_paths::KnownPaths;
 use crate::tvix_build::derivation_to_build_request;
@@ -52,15 +49,26 @@ use crate::tvix_build::derivation_to_build_request;
 /// implementation of "Tvix Store IO" which does not necessarily bring the concept of blob service,
 /// directory service or path info service.
 pub struct TvixStoreIO {
-    blob_service: Arc<dyn BlobService>,
-    directory_service: Arc<dyn DirectoryService>,
-    // This is public so builtins can put PathInfos directly.
+    // This is public so helper functions can interact with the stores directly.
+    pub(crate) blob_service: Arc<dyn BlobService>,
+    pub(crate) directory_service: Arc<dyn DirectoryService>,
     pub(crate) path_info_service: Arc<dyn PathInfoService>,
+    pub(crate) nar_calculation_service: Arc<dyn NarCalculationService>,
     std_io: StdIO,
     build_service: Arc<dyn BuildService>,
     pub(crate) tokio_handle: tokio::runtime::Handle,
-    http_client: reqwest::Client,
+    #[allow(clippy::type_complexity)]
+    pub(crate) fetcher: Fetcher<
+        Arc<dyn BlobService>,
+        Arc<dyn DirectoryService>,
+        Arc<dyn PathInfoService>,
+        Arc<dyn NarCalculationService>,
+    >,
+    // Paths known how to produce, by building or fetching.
     pub(crate) known_paths: RefCell<KnownPaths>,
@@ -69,17 +77,24 @@ impl TvixStoreIO {
         blob_service: Arc<dyn BlobService>,
         directory_service: Arc<dyn DirectoryService>,
         path_info_service: Arc<dyn PathInfoService>,
+        nar_calculation_service: Arc<dyn NarCalculationService>,
         build_service: Arc<dyn BuildService>,
         tokio_handle: tokio::runtime::Handle,
     ) -> Self {
         Self {
-            blob_service,
-            directory_service,
-            path_info_service,
+            blob_service: blob_service.clone(),
+            directory_service: directory_service.clone(),
+            path_info_service: path_info_service.clone(),
+            nar_calculation_service: nar_calculation_service.clone(),
             std_io: StdIO {},
-            http_client: reqwest::Client::new(),
+            fetcher: Fetcher::new(
+                blob_service,
+                directory_service,
+                path_info_service,
+                nar_calculation_service,
+            ),
             known_paths: Default::default(),
@@ -91,7 +106,6 @@ impl TvixStoreIO {
     /// In case there is no PathInfo yet, this means we need to build it
     /// (which currently is stubbed out still).
-    #[async_recursion(?Send)]
     #[instrument(skip(self, store_path), fields(store_path=%store_path), ret(level = Level::TRACE), err)]
     async fn store_path_to_node(
@@ -121,189 +135,212 @@ impl TvixStoreIO {
             // it for things like <nixpkgs> pointing to a store path.
             // In the future, these things will (need to) have PathInfo.
             None => {
-                // The store path doesn't exist yet, so we need to build it.
-                warn!("triggering build");
-                // Look up the derivation for this output path.
-                let (drv_path, drv) = {
-                    let known_paths = self.known_paths.borrow();
-                    match known_paths.get_drv_path_for_output_path(store_path) {
-                        Some(drv_path) => (
-                            drv_path.to_owned(),
-                            known_paths.get_drv_by_drvpath(drv_path).unwrap().to_owned(),
-                        ),
-                        None => {
-                            warn!(store_path=%store_path, "no drv found");
-                            // let StdIO take over
-                            return Ok(None);
-                        }
+                // The store path doesn't exist yet, so we need to fetch or build it.
+                // We check for fetches first, as we might have both native
+                // fetchers and FODs in KnownPaths, and prefer the former.
+                let maybe_fetch = self
+                    .known_paths
+                    .borrow()
+                    .get_fetch_for_output_path(store_path);
+                match maybe_fetch {
+                    Some((name, fetch)) => {
+                        info!(?fetch, "triggering lazy fetch");
+                        let (sp, root_node) = self
+                            .fetcher
+                            .ingest_and_persist(&name, fetch)
+                            .await
+                            .map_err(|e| {
+                            std::io::Error::new(std::io::ErrorKind::InvalidData, e)
+                        })?;
+                        debug_assert_eq!(
+                            sp.to_string(),
+                            store_path.to_string(),
+                            "store path returned from fetcher should match"
+                        );
+                        root_node
-                };
-                // derivation_to_build_request needs castore nodes for all inputs.
-                // Provide them, which means, here is where we recursively build
-                // all dependencies.
-                #[allow(clippy::mutable_key_type)]
-                let input_nodes: BTreeSet<Node> =
-                    futures::stream::iter(drv.input_derivations.iter())
-                        .map(|(input_drv_path, output_names)| {
-                            // look up the derivation object
-                            let input_drv = {
-                                let known_paths = self.known_paths.borrow();
-                                known_paths
-                                    .get_drv_by_drvpath(input_drv_path)
-                                    .unwrap_or_else(|| panic!("{} not found", input_drv_path))
-                                    .to_owned()
+                    None => {
+                        // Look up the derivation for this output path.
+                        let (drv_path, drv) = {
+                            let known_paths = self.known_paths.borrow();
+                            match known_paths.get_drv_path_for_output_path(store_path) {
+                                Some(drv_path) => (
+                                    drv_path.to_owned(),
+                                    known_paths.get_drv_by_drvpath(drv_path).unwrap().to_owned(),
+                                ),
+                                None => {
+                                    warn!(store_path=%store_path, "no drv found");
+                                    // let StdIO take over
+                                    return Ok(None);
+                                }
+                            }
+                        };
+                        warn!("triggering build");
+                        // derivation_to_build_request needs castore nodes for all inputs.
+                        // Provide them, which means, here is where we recursively build
+                        // all dependencies.
+                        #[allow(clippy::mutable_key_type)]
+                        let input_nodes: BTreeSet<Node> =
+                            futures::stream::iter(drv.input_derivations.iter())
+                                .map(|(input_drv_path, output_names)| {
+                                    // look up the derivation object
+                                    let input_drv = {
+                                        let known_paths = self.known_paths.borrow();
+                                        known_paths
+                                            .get_drv_by_drvpath(input_drv_path)
+                                            .unwrap_or_else(|| {
+                                                panic!("{} not found", input_drv_path)
+                                            })
+                                            .to_owned()
+                                    };
+                                    // convert output names to actual paths
+                                    let output_paths: Vec<StorePath> = output_names
+                                        .iter()
+                                        .map(|output_name| {
+                                            input_drv
+                                                .outputs
+                                                .get(output_name)
+                                                .expect("missing output_name")
+                                                .path
+                                                .as_ref()
+                                                .expect("missing output path")
+                                                .clone()
+                                        })
+                                        .collect();
+                                    // For each output, ask for the castore node.
+                                    // We're in a per-derivation context, so if they're
+                                    // not built yet they'll all get built together.
+                                    // If they don't need to build, we can however still
+                                    // substitute all in parallel (if they don't need to
+                                    // be built) - so we turn this into a stream of streams.
+                                    // It's up to the builder to deduplicate same build requests.
+                                    futures::stream::iter(output_paths.into_iter()).map(
+                                        |output_path| async move {
+                                            let node = self
+                                                .store_path_to_node(&output_path, Path::new(""))
+                                                .await?;
+                                            if let Some(node) = node {
+                                                Ok(node)
+                                            } else {
+                                                Err(io::Error::other("no node produced"))
+                                            }
+                                        },
+                                    )
+                                })
+                                .flatten()
+                                .buffer_unordered(10) // TODO: make configurable
+                                .try_collect()
+                                .await?;
+                        // TODO: check if input sources are sufficiently dealth with,
+                        // I think yes, they must be imported into the store by other
+                        // operations, so dealt with in the Some(…) match arm
+                        // synthesize the build request.
+                        let build_request = derivation_to_build_request(&drv, input_nodes)?;
+                        // create a build
+                        let build_result = self
+                            .build_service
+                            .as_ref()
+                            .do_build(build_request)
+                            .await
+                            .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?;
+                        // TODO: refscan?
+                        // For each output, insert a PathInfo.
+                        for output in &build_result.outputs {
+                            let root_node = output.node.as_ref().expect("invalid root node");
+                            // calculate the nar representation
+                            let (nar_size, nar_sha256) = self
+                                .nar_calculation_service
+                                .calculate_nar(root_node)
+                                .await?;
+                            // assemble the PathInfo to persist
+                            let path_info = PathInfo {
+                                node: Some(tvix_castore::proto::Node {
+                                    node: Some(root_node.clone()),
+                                }),
+                                references: vec![], // TODO: refscan
+                                narinfo: Some(tvix_store::proto::NarInfo {
+                                    nar_size,
+                                    nar_sha256: Bytes::from(nar_sha256.to_vec()),
+                                    signatures: vec![],
+                                    reference_names: vec![], // TODO: refscan
+                                    deriver: Some(tvix_store::proto::StorePath {
+                                        name: drv_path
+                                            .name()
+                                            .strip_suffix(".drv")
+                                            .expect("missing .drv suffix")
+                                            .to_string(),
+                                        digest: drv_path.digest().to_vec().into(),
+                                    }),
+                                    ca: drv.fod_digest().map(
+                                        |fod_digest| -> tvix_store::proto::nar_info::Ca {
+                                            (&CAHash::Nar(nix_compat::nixhash::NixHash::Sha256(
+                                                fod_digest,
+                                            )))
+                                                .into()
+                                        },
+                                    ),
+                                }),
-                            // convert output names to actual paths
-                            let output_paths: Vec<StorePath> = output_names
-                                .iter()
-                                .map(|output_name| {
-                                    input_drv
-                                        .outputs
-                                        .get(output_name)
-                                        .expect("missing output_name")
-                                        .path
-                                        .as_ref()
-                                        .expect("missing output path")
-                                        .clone()
-                                })
-                                .collect();
-                            // For each output, ask for the castore node.
-                            // We're in a per-derivation context, so if they're
-                            // not built yet they'll all get built together.
-                            // If they don't need to build, we can however still
-                            // substitute all in parallel (if they don't need to
-                            // be built) - so we turn this into a stream of streams.
-                            // It's up to the builder to deduplicate same build requests.
-                            futures::stream::iter(output_paths.into_iter()).map(
-                                |output_path| async move {
-                                    let node = self
-                                        .store_path_to_node(&output_path, Path::new(""))
-                                        .await?;
-                                    if let Some(node) = node {
-                                        Ok(node)
-                                    } else {
-                                        Err(io::Error::other("no node produced"))
-                                    }
-                                },
-                            )
-                        })
-                        .flatten()
-                        .buffer_unordered(10) // TODO: make configurable
-                        .try_collect()
-                        .await?;
-                // TODO: check if input sources are sufficiently dealth with,
-                // I think yes, they must be imported into the store by other
-                // operations, so dealt with in the Some(…) match arm
-                // synthesize the build request.
-                let build_request = derivation_to_build_request(&drv, input_nodes)?;
-                // create a build
-                let build_result = self
-                    .build_service
-                    .as_ref()
-                    .do_build(build_request)
-                    .await
-                    .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?;
-                // TODO: refscan?
-                // For each output, insert a PathInfo.
-                for output in &build_result.outputs {
-                    let root_node = output.node.as_ref().expect("invalid root node");
-                    // calculate the nar representation
-                    let (nar_size, nar_sha256) =
-                        self.path_info_service.calculate_nar(root_node).await?;
-                    // assemble the PathInfo to persist
-                    let path_info = PathInfo {
-                        node: Some(tvix_castore::proto::Node {
-                            node: Some(root_node.clone()),
-                        }),
-                        references: vec![], // TODO: refscan
-                        narinfo: Some(tvix_store::proto::NarInfo {
-                            nar_size,
-                            nar_sha256: Bytes::from(nar_sha256.to_vec()),
-                            signatures: vec![],
-                            reference_names: vec![], // TODO: refscan
-                            deriver: Some(tvix_store::proto::StorePath {
-                                name: drv_path
-                                    .name()
-                                    .strip_suffix(".drv")
-                                    .expect("missing .drv suffix")
-                                    .to_string(),
-                                digest: drv_path.digest().to_vec().into(),
-                            }),
-                            ca: drv.fod_digest().map(
-                                |fod_digest| -> tvix_store::proto::nar_info::Ca {
-                                    (&CAHash::Nar(nix_compat::nixhash::NixHash::Sha256(fod_digest)))
-                                        .into()
-                                },
-                            ),
-                        }),
-                    };
-                    self.path_info_service
-                        .put(path_info)
-                        .await
-                        .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?;
-                }
+                            self.path_info_service
+                                .put(path_info)
+                                .await
+                                .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?;
+                        }
-                // find the output for the store path requested
-                build_result
-                    .outputs
-                    .into_iter()
-                    .find(|output_node| {
-                        output_node.node.as_ref().expect("invalid node").get_name()
-                            == store_path.to_string().as_bytes()
-                    })
-                    .expect("build didn't produce the store path")
-                    .node
-                    .expect("invalid node")
+                        // find the output for the store path requested
+                        build_result
+                            .outputs
+                            .into_iter()
+                            .find(|output_node| {
+                                output_node.node.as_ref().expect("invalid node").get_name()
+                                    == store_path.to_string().as_bytes()
+                            })
+                            .expect("build didn't produce the store path")
+                            .node
+                            .expect("invalid node")
+                    }
+                }
         // now with the root_node and sub_path, descend to the node requested.
+        // We convert sub_path to the castore model here.
+        let sub_path = tvix_castore::PathBuf::from_host_path(sub_path, true)?;
         directoryservice::descend_to(&self.directory_service, root_node, sub_path)
             .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))
-    /// This forwards the ingestion to the [`tvix_castore::import::ingest_entries`],
-    /// passing the blob_service and directory_service that's used.
-    /// The error is mapped to std::io::Error for simplicity.
-    pub(crate) async fn ingest_entries<S>(&self, entries_stream: S) -> io::Result<Node>
-    where
-        S: Stream<Item = DirEntry> + Unpin,
-    {
-        tvix_castore::import::ingest_entries(
-            &self.blob_service,
-            &self.directory_service,
-            entries_stream,
-        )
-        .await
-        .map_err(|err| std::io::Error::new(io::ErrorKind::Other, err))
-    }
     pub(crate) async fn node_to_path_info(
         name: &str,
         path: &Path,
         ca: CAHash,
         root_node: Node,
-    ) -> io::Result<(PathInfo, StorePath)> {
+    ) -> io::Result<(PathInfo, NixHash, StorePath)> {
         // Ask the PathInfoService for the NAR size and sha256
         // We always need it no matter what is the actual hash mode
         // because the path info construct a narinfo which *always*
         // require a SHA256 of the NAR representation and the NAR size.
         let (nar_size, nar_sha256) = self
-            .path_info_service
+            .nar_calculation_service
@@ -326,7 +363,11 @@ impl TvixStoreIO {
         let path_info =
             tvix_store::import::derive_nar_ca_path_info(nar_size, nar_sha256, Some(ca), root_node);
-        Ok((path_info, output_path.to_owned()))
+        Ok((
+            path_info,
+            NixHash::Sha256(nar_sha256),
+            output_path.to_owned(),
+        ))
     pub(crate) async fn register_node_in_path_info_service(
@@ -336,7 +377,7 @@ impl TvixStoreIO {
         ca: CAHash,
         root_node: Node,
     ) -> io::Result<StorePath> {
-        let (path_info, output_path) = self.node_to_path_info(name, path, ca, root_node).await?;
+        let (path_info, _, output_path) = self.node_to_path_info(name, path, ca, root_node).await?;
         let _path_info = self.path_info_service.as_ref().put(path_info).await?;
@@ -370,88 +411,6 @@ impl TvixStoreIO {
-    pub async fn fetch_url(
-        &self,
-        url: &str,
-        name: &str,
-        hash: Option<&NixHash>,
-    ) -> Result<StorePath, ErrorKind> {
-        let resp = self
-            .http_client
-            .get(url)
-            .send()
-            .await
-            .map_err(FetcherError::from)?;
-        let mut sha = Sha256::new();
-        let mut data = tokio_util::io::StreamReader::new(
-            resp.bytes_stream()
-                .inspect_ok(|data| {
-                    sha.update(data);
-                })
-                .map_err(|e| {
-                    let e = e.without_url();
-                    warn!(%e, "failed to get response body");
-                    io::Error::new(io::ErrorKind::BrokenPipe, e.to_string())
-                }),
-        );
-        let mut blob = self.blob_service.open_write().await;
-        let size = tokio::io::copy(&mut data, blob.as_mut()).await?;
-        let blob_digest = blob.close().await?;
-        let got = NixHash::Sha256(sha.finalize().into());
-        let hash = CAHash::Flat(if let Some(wanted) = hash {
-            if *wanted != got {
-                return Err(FetcherError::HashMismatch {
-                    url: url.to_owned(),
-                    wanted: wanted.clone(),
-                    got,
-                }
-                .into());
-            }
-            wanted.clone()
-        } else {
-            got
-        });
-        let path = build_ca_path(name, &hash, Vec::<String>::new(), false)
-            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
-        let node = Node::File(FileNode {
-            name: path.to_string().into(),
-            digest: blob_digest.into(),
-            size,
-            executable: false,
-        });
-        let (nar_size, nar_sha256) = self
-            .path_info_service
-            .calculate_nar(&node)
-            .await
-            .map_err(|e| ErrorKind::TvixError(Rc::new(e)))?;
-        let path_info = PathInfo {
-            node: Some(tvix_castore::proto::Node {
-                node: Some(node.clone()),
-            }),
-            references: vec![],
-            narinfo: Some(tvix_store::proto::NarInfo {
-                nar_size,
-                nar_sha256: nar_sha256.to_vec().into(),
-                signatures: vec![],
-                reference_names: vec![],
-                deriver: None, /* ? */
-                ca: Some((&hash).into()),
-            }),
-        };
-        self.path_info_service
-            .put(path_info)
-            .await
-            .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?;
-        Ok(path.to_owned())
-    }
 impl EvalIO for TvixStoreIO {
@@ -486,13 +445,13 @@ impl EvalIO for TvixStoreIO {
                 .block_on(async { self.store_path_to_node(&store_path, &sub_path).await })?
-                // depending on the node type, treat read_to_string differently
+                // depending on the node type, treat open differently
                 match node {
                     Node::Directory(_) => {
                         // This would normally be a io::ErrorKind::IsADirectory (still unstable)
-                            format!("tried to read directory at {:?} to string", path),
+                            format!("tried to open directory at {:?}", path),
                     Node::File(file_node) => {
@@ -531,7 +490,7 @@ impl EvalIO for TvixStoreIO {
                     Node::Symlink(_symlink_node) => Err(io::Error::new(
-                        "read_to_string for symlinks is unsupported",
+                        "open for symlinks is unsupported",
             } else {
@@ -622,6 +581,7 @@ impl EvalIO for TvixStoreIO {
+                &self.nar_calculation_service,
@@ -642,12 +602,8 @@ mod tests {
     use bstr::ByteSlice;
     use tempfile::TempDir;
     use tvix_build::buildservice::DummyBuildService;
-    use tvix_castore::{
-        blobservice::{BlobService, MemoryBlobService},
-        directoryservice::{DirectoryService, MemoryDirectoryService},
-    };
     use tvix_eval::{EvalIO, EvaluationResult};
-    use tvix_store::pathinfoservice::MemoryPathInfoService;
+    use tvix_store::utils::construct_services;
     use super::TvixStoreIO;
     use crate::builtins::{add_derivation_builtins, add_fetcher_builtins, add_import_builtins};
@@ -656,22 +612,19 @@ mod tests {
     /// Takes care of setting up the evaluator so it knows about the
     // `derivation` builtin.
     fn eval(str: &str) -> EvaluationResult {
-        let blob_service = Arc::new(MemoryBlobService::default()) as Arc<dyn BlobService>;
-        let directory_service =
-            Arc::new(MemoryDirectoryService::default()) as Arc<dyn DirectoryService>;
-        let path_info_service = Arc::new(MemoryPathInfoService::new(
-            blob_service.clone(),
-            directory_service.clone(),
-        ));
-        let runtime = tokio::runtime::Runtime::new().unwrap();
+        let tokio_runtime = tokio::runtime::Runtime::new().unwrap();
+        let (blob_service, directory_service, path_info_service, nar_calculation_service) =
+            tokio_runtime
+                .block_on(async { construct_services("memory://", "memory://", "memory://").await })
+                .unwrap();
         let io = Rc::new(TvixStoreIO::new(
-            blob_service.clone(),
-            directory_service.clone(),
-            path_info_service,
+            blob_service,
+            directory_service,
+            path_info_service.into(),
+            nar_calculation_service.into(),
-            runtime.handle().clone(),
+            tokio_runtime.handle().clone(),
         let mut eval = tvix_eval::Evaluation::new(io.clone() as Rc<dyn EvalIO>, true);