about summary refs log tree commit diff
path: root/tvix/glue/src/tvix_store_io.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/glue/src/tvix_store_io.rs')
-rw-r--r--tvix/glue/src/tvix_store_io.rs105
1 files changed, 66 insertions, 39 deletions
diff --git a/tvix/glue/src/tvix_store_io.rs b/tvix/glue/src/tvix_store_io.rs
index 7b8ef3ff0a..4e5488067f 100644
--- a/tvix/glue/src/tvix_store_io.rs
+++ b/tvix/glue/src/tvix_store_io.rs
@@ -1,11 +1,9 @@
 //! This module provides an implementation of EvalIO talking to tvix-store.
-
 use bytes::Bytes;
 use futures::{StreamExt, TryStreamExt};
 use nix_compat::nixhash::NixHash;
 use nix_compat::store_path::StorePathRef;
 use nix_compat::{nixhash::CAHash, store_path::StorePath};
-use sha2::{Digest, Sha256};
 use std::{
     cell::RefCell,
     collections::BTreeSet,
@@ -14,12 +12,12 @@ use std::{
     sync::Arc,
 };
 use tokio_util::io::SyncIoBridge;
-use tracing::{error, info, instrument, warn, Level};
+use tracing::{error, instrument, warn, Level, Span};
+use tracing_indicatif::span_ext::IndicatifSpanExt;
 use tvix_build::buildservice::BuildService;
 use tvix_castore::proto::node::Node;
 use tvix_eval::{EvalIO, FileType, StdIO};
 use tvix_store::nar::NarCalculationService;
-use tvix_store::utils::AsyncIoBridge;
 
 use tvix_castore::{
     blobservice::BlobService,
@@ -106,7 +104,7 @@ impl TvixStoreIO {
     ///
     /// In case there is no PathInfo yet, this means we need to build it
     /// (which currently is stubbed out still).
-    #[instrument(skip(self, store_path), fields(store_path=%store_path), ret(level = Level::TRACE), err)]
+    #[instrument(skip(self, store_path), fields(store_path=%store_path, indicatif.pb_show=1), ret(level = Level::TRACE), err)]
     async fn store_path_to_node(
         &self,
         store_path: &StorePath,
@@ -138,7 +136,8 @@ impl TvixStoreIO {
                 // The store path doesn't exist yet, so we need to fetch or build it.
                 // We check for fetches first, as we might have both native
                 // fetchers and FODs in KnownPaths, and prefer the former.
-
+                // This will also find [Fetch] synthesized from
+                // `builtin:fetchurl` Derivations.
                 let maybe_fetch = self
                     .known_paths
                     .borrow()
@@ -146,7 +145,6 @@ impl TvixStoreIO {
 
                 match maybe_fetch {
                     Some((name, fetch)) => {
-                        info!(?fetch, "triggering lazy fetch");
                         let (sp, root_node) = self
                             .fetcher
                             .ingest_and_persist(&name, fetch)
@@ -156,9 +154,9 @@ impl TvixStoreIO {
                         })?;
 
                         debug_assert_eq!(
-                            sp.to_string(),
-                            store_path.to_string(),
-                            "store path returned from fetcher should match"
+                            sp.to_absolute_path(),
+                            store_path.as_ref().to_absolute_path(),
+                            "store path returned from fetcher must match store path we have in fetchers"
                         );
 
                         root_node
@@ -179,14 +177,16 @@ impl TvixStoreIO {
                                 }
                             }
                         };
-
-                        warn!("triggering build");
+                        let span = Span::current();
+                        span.pb_start();
+                        span.pb_set_style(&tvix_tracing::PB_SPINNER_STYLE);
+                        span.pb_set_message(&format!("⏳Waiting for inputs {}", &store_path));
 
                         // derivation_to_build_request needs castore nodes for all inputs.
                         // Provide them, which means, here is where we recursively build
                         // all dependencies.
                         #[allow(clippy::mutable_key_type)]
-                        let input_nodes: BTreeSet<Node> =
+                        let mut input_nodes: BTreeSet<Node> =
                             futures::stream::iter(drv.input_derivations.iter())
                                 .map(|(input_drv_path, output_names)| {
                                     // look up the derivation object
@@ -236,9 +236,34 @@ impl TvixStoreIO {
                                     )
                                 })
                                 .flatten()
-                                .buffer_unordered(10) // TODO: make configurable
+                                .buffer_unordered(
+                                    1, /* TODO: increase again once we prevent redundant fetches */
+                                ) // TODO: make configurable
+                                .try_collect()
+                                .await?;
+
+                        // add input sources
+                        // FUTUREWORK: merge these who things together
+                        #[allow(clippy::mutable_key_type)]
+                        let input_nodes_input_sources: BTreeSet<Node> =
+                            futures::stream::iter(drv.input_sources.iter())
+                                .then(|input_source| {
+                                    Box::pin(async {
+                                        let node = self
+                                            .store_path_to_node(input_source, Path::new(""))
+                                            .await?;
+                                        if let Some(node) = node {
+                                            Ok(node)
+                                        } else {
+                                            Err(io::Error::other("no node produced"))
+                                        }
+                                    })
+                                })
                                 .try_collect()
                                 .await?;
+                        input_nodes.extend(input_nodes_input_sources);
+
+                        span.pb_set_message(&format!("🔨Building {}", &store_path));
 
                         // TODO: check if input sources are sufficiently dealth with,
                         // I think yes, they must be imported into the store by other
@@ -255,7 +280,7 @@ impl TvixStoreIO {
                             .await
                             .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?;
 
-                        // TODO: refscan?
+                        // TODO: refscan
 
                         // For each output, insert a PathInfo.
                         for output in &build_result.outputs {
@@ -332,7 +357,7 @@ impl TvixStoreIO {
         &self,
         name: &str,
         path: &Path,
-        ca: CAHash,
+        ca: &CAHash,
         root_node: Node,
     ) -> io::Result<(PathInfo, NixHash, StorePath)> {
         // Ask the PathInfoService for the NAR size and sha256
@@ -347,7 +372,7 @@ impl TvixStoreIO {
 
         // Calculate the output path. This might still fail, as some names are illegal.
         let output_path =
-            nix_compat::store_path::build_ca_path(name, &ca, Vec::<String>::new(), false).map_err(
+            nix_compat::store_path::build_ca_path(name, ca, Vec::<String>::new(), false).map_err(
                 |_| {
                     std::io::Error::new(
                         std::io::ErrorKind::InvalidData,
@@ -374,7 +399,7 @@ impl TvixStoreIO {
         &self,
         name: &str,
         path: &Path,
-        ca: CAHash,
+        ca: &CAHash,
         root_node: Node,
     ) -> io::Result<StorePath> {
         let (path_info, _, output_path) = self.node_to_path_info(name, path, ca, root_node).await?;
@@ -383,26 +408,6 @@ impl TvixStoreIO {
         Ok(output_path)
     }
 
-    /// Transforms a BLAKE-3 digest into a SHA256 digest
-    /// by re-hashing the whole file.
-    pub(crate) async fn blob_to_sha256_hash(&self, blob_digest: B3Digest) -> io::Result<[u8; 32]> {
-        let mut reader = self
-            .blob_service
-            .open_read(&blob_digest)
-            .await?
-            .ok_or_else(|| {
-                io::Error::new(
-                    io::ErrorKind::NotFound,
-                    format!("blob represented by digest: '{}' not found", blob_digest),
-                )
-            })?;
-        // It is fine to use `AsyncIoBridge` here because hashing is not actually I/O.
-        let mut hasher = AsyncIoBridge(Sha256::new());
-
-        tokio::io::copy(&mut reader, &mut hasher).await?;
-        Ok(hasher.0.finalize().into())
-    }
-
     pub async fn store_path_exists<'a>(&'a self, store_path: StorePathRef<'a>) -> io::Result<bool> {
         Ok(self
             .path_info_service
@@ -421,7 +426,7 @@ impl EvalIO for TvixStoreIO {
         {
             if self
                 .tokio_handle
-                .block_on(async { self.store_path_to_node(&store_path, &sub_path).await })?
+                .block_on(self.store_path_to_node(&store_path, &sub_path))?
                 .is_some()
             {
                 Ok(true)
@@ -505,6 +510,28 @@ impl EvalIO for TvixStoreIO {
     }
 
     #[instrument(skip(self), ret(level = Level::TRACE), err)]
+    fn file_type(&self, path: &Path) -> io::Result<FileType> {
+        if let Ok((store_path, sub_path)) =
+            StorePath::from_absolute_path_full(&path.to_string_lossy())
+        {
+            if let Some(node) = self
+                .tokio_handle
+                .block_on(async { self.store_path_to_node(&store_path, &sub_path).await })?
+            {
+                match node {
+                    Node::Directory(_) => Ok(FileType::Directory),
+                    Node::File(_) => Ok(FileType::Regular),
+                    Node::Symlink(_) => Ok(FileType::Symlink),
+                }
+            } else {
+                self.std_io.file_type(path)
+            }
+        } else {
+            self.std_io.file_type(path)
+        }
+    }
+
+    #[instrument(skip(self), ret(level = Level::TRACE), err)]
     fn read_dir(&self, path: &Path) -> io::Result<Vec<(bytes::Bytes, FileType)>> {
         if let Ok((store_path, sub_path)) =
             StorePath::from_absolute_path_full(&path.to_string_lossy())