about summary refs log tree commit diff
path: root/tvix/glue/src/fetchers
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/glue/src/fetchers')
-rw-r--r--tvix/glue/src/fetchers/mod.rs45
1 files changed, 38 insertions, 7 deletions
diff --git a/tvix/glue/src/fetchers/mod.rs b/tvix/glue/src/fetchers/mod.rs
index 376a4cca634c..aee79b2e7504 100644
--- a/tvix/glue/src/fetchers/mod.rs
+++ b/tvix/glue/src/fetchers/mod.rs
@@ -8,7 +8,8 @@ use sha1::Sha1;
 use sha2::{digest::Output, Digest, Sha256, Sha512};
 use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader};
 use tokio_util::io::{InspectReader, InspectWriter};
-use tracing::warn;
+use tracing::{instrument, warn, Span};
+use tracing_indicatif::span_ext::IndicatifSpanExt;
 use tvix_castore::{
     blobservice::BlobService,
     directoryservice::DirectoryService,
@@ -196,10 +197,18 @@ impl<BS, DS, PS, NS> Fetcher<BS, DS, PS, NS> {
 
     /// Constructs a HTTP request to the passed URL, and returns a AsyncReadBuf to it.
     /// In case the URI uses the file:// scheme, use tokio::fs to open it.
+    #[instrument(skip_all, fields(url, indicatif.pb_show=1), err)]
     async fn download(
         &self,
         url: Url,
     ) -> Result<Box<dyn AsyncBufRead + Unpin + Send>, FetcherError> {
+        let span = Span::current();
+        span.pb_set_message(&format!(
+            "📡Fetching {}",
+            // TOOD: maybe shorten
+            redact_url(&url)
+        ));
+
         match url.scheme() {
             "file" => {
                 let f = tokio::fs::File::open(url.to_file_path().map_err(|_| {
@@ -211,16 +220,38 @@ impl<BS, DS, PS, NS> Fetcher<BS, DS, PS, NS> {
                     ))
                 })?)
                 .await?;
-                Ok(Box::new(tokio::io::BufReader::new(f)))
+
+                span.pb_set_length(f.metadata().await?.len());
+                span.pb_set_style(&tvix_tracing::PB_PROGRESS_STYLE);
+                span.pb_start();
+                Ok(Box::new(tokio::io::BufReader::new(InspectReader::new(
+                    f,
+                    move |d| {
+                        span.pb_inc(d.len() as u64);
+                    },
+                ))))
             }
             _ => {
                 let resp = self.http_client.get(url).send().await?;
+
+                if let Some(content_length) = resp.content_length() {
+                    span.pb_set_length(content_length);
+                    span.pb_set_style(&tvix_tracing::PB_PROGRESS_STYLE);
+                } else {
+                    span.pb_set_style(&tvix_tracing::PB_SPINNER_STYLE);
+                }
+                span.pb_start();
+
                 Ok(Box::new(tokio_util::io::StreamReader::new(
-                    resp.bytes_stream().map_err(|e| {
-                        let e = e.without_url();
-                        warn!(%e, "failed to get response body");
-                        std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)
-                    }),
+                    resp.bytes_stream()
+                        .inspect_ok(move |d| {
+                            span.pb_inc(d.len() as u64);
+                        })
+                        .map_err(|e| {
+                            let e = e.without_url();
+                            warn!(%e, "failed to get response body");
+                            std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)
+                        }),
                 )))
             }
         }