about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--tvix/glue/src/builtins/fetchers.rs3
-rw-r--r--tvix/glue/src/fetchers/mod.rs45
-rw-r--r--tvix/glue/src/tvix_store_io.rs7
3 files changed, 41 insertions, 14 deletions
diff --git a/tvix/glue/src/builtins/fetchers.rs b/tvix/glue/src/builtins/fetchers.rs
index 08a882586e9b..d95287aeca3a 100644
--- a/tvix/glue/src/builtins/fetchers.rs
+++ b/tvix/glue/src/builtins/fetchers.rs
@@ -7,7 +7,6 @@ use crate::{
 };
 use nix_compat::nixhash;
 use std::rc::Rc;
-use tracing::info;
 use tvix_eval::builtin_macros::builtins;
 use tvix_eval::generators::Gen;
 use tvix_eval::generators::GenCo;
@@ -112,8 +111,6 @@ pub(crate) mod fetcher_builtins {
             }
             None => {
                 // If we don't have enough info, do the fetch now.
-                info!(?fetch, "triggering required fetch");
-
                 let (store_path, _root_node) = state
                     .tokio_handle
                     .block_on(async { state.fetcher.ingest_and_persist(&name, fetch).await })
diff --git a/tvix/glue/src/fetchers/mod.rs b/tvix/glue/src/fetchers/mod.rs
index 376a4cca634c..aee79b2e7504 100644
--- a/tvix/glue/src/fetchers/mod.rs
+++ b/tvix/glue/src/fetchers/mod.rs
@@ -8,7 +8,8 @@ use sha1::Sha1;
 use sha2::{digest::Output, Digest, Sha256, Sha512};
 use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader};
 use tokio_util::io::{InspectReader, InspectWriter};
-use tracing::warn;
+use tracing::{instrument, warn, Span};
+use tracing_indicatif::span_ext::IndicatifSpanExt;
 use tvix_castore::{
     blobservice::BlobService,
     directoryservice::DirectoryService,
@@ -196,10 +197,18 @@ impl<BS, DS, PS, NS> Fetcher<BS, DS, PS, NS> {
 
     /// Constructs a HTTP request to the passed URL, and returns a AsyncReadBuf to it.
     /// In case the URI uses the file:// scheme, use tokio::fs to open it.
+    #[instrument(skip_all, fields(url, indicatif.pb_show=1), err)]
     async fn download(
         &self,
         url: Url,
     ) -> Result<Box<dyn AsyncBufRead + Unpin + Send>, FetcherError> {
+        let span = Span::current();
+        span.pb_set_message(&format!(
+            "📡Fetching {}",
+            // TOOD: maybe shorten
+            redact_url(&url)
+        ));
+
         match url.scheme() {
             "file" => {
                 let f = tokio::fs::File::open(url.to_file_path().map_err(|_| {
@@ -211,16 +220,38 @@ impl<BS, DS, PS, NS> Fetcher<BS, DS, PS, NS> {
                     ))
                 })?)
                 .await?;
-                Ok(Box::new(tokio::io::BufReader::new(f)))
+
+                span.pb_set_length(f.metadata().await?.len());
+                span.pb_set_style(&tvix_tracing::PB_PROGRESS_STYLE);
+                span.pb_start();
+                Ok(Box::new(tokio::io::BufReader::new(InspectReader::new(
+                    f,
+                    move |d| {
+                        span.pb_inc(d.len() as u64);
+                    },
+                ))))
             }
             _ => {
                 let resp = self.http_client.get(url).send().await?;
+
+                if let Some(content_length) = resp.content_length() {
+                    span.pb_set_length(content_length);
+                    span.pb_set_style(&tvix_tracing::PB_PROGRESS_STYLE);
+                } else {
+                    span.pb_set_style(&tvix_tracing::PB_SPINNER_STYLE);
+                }
+                span.pb_start();
+
                 Ok(Box::new(tokio_util::io::StreamReader::new(
-                    resp.bytes_stream().map_err(|e| {
-                        let e = e.without_url();
-                        warn!(%e, "failed to get response body");
-                        std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)
-                    }),
+                    resp.bytes_stream()
+                        .inspect_ok(move |d| {
+                            span.pb_inc(d.len() as u64);
+                        })
+                        .map_err(|e| {
+                            let e = e.without_url();
+                            warn!(%e, "failed to get response body");
+                            std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)
+                        }),
                 )))
             }
         }
diff --git a/tvix/glue/src/tvix_store_io.rs b/tvix/glue/src/tvix_store_io.rs
index 8ea964d18650..ad9e4a8ef897 100644
--- a/tvix/glue/src/tvix_store_io.rs
+++ b/tvix/glue/src/tvix_store_io.rs
@@ -135,9 +135,6 @@ impl TvixStoreIO {
             // it for things like <nixpkgs> pointing to a store path.
             // In the future, these things will (need to) have PathInfo.
             None => {
-                let span = Span::current();
-                span.pb_start();
-                span.pb_set_style(&tvix_tracing::PB_SPINNER_STYLE);
                 // The store path doesn't exist yet, so we need to fetch or build it.
                 // We check for fetches first, as we might have both native
                 // fetchers and FODs in KnownPaths, and prefer the former.
@@ -150,7 +147,6 @@ impl TvixStoreIO {
 
                 match maybe_fetch {
                     Some((name, fetch)) => {
-                        span.pb_set_message(&format!("⏳Fetching {}", &store_path));
                         let (sp, root_node) = self
                             .fetcher
                             .ingest_and_persist(&name, fetch)
@@ -183,6 +179,9 @@ impl TvixStoreIO {
                                 }
                             }
                         };
+                        let span = Span::current();
+                        span.pb_start();
+                        span.pb_set_style(&tvix_tracing::PB_SPINNER_STYLE);
                         span.pb_set_message(&format!("🔨Building {}", &store_path));
 
                         // derivation_to_build_request needs castore nodes for all inputs.