diff options
Diffstat (limited to 'tvix/glue/src/fetchers')
-rw-r--r-- | tvix/glue/src/fetchers/mod.rs | 45 |
1 files changed, 38 insertions, 7 deletions
diff --git a/tvix/glue/src/fetchers/mod.rs b/tvix/glue/src/fetchers/mod.rs index 376a4cca634c..aee79b2e7504 100644 --- a/tvix/glue/src/fetchers/mod.rs +++ b/tvix/glue/src/fetchers/mod.rs @@ -8,7 +8,8 @@ use sha1::Sha1; use sha2::{digest::Output, Digest, Sha256, Sha512}; use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader}; use tokio_util::io::{InspectReader, InspectWriter}; -use tracing::warn; +use tracing::{instrument, warn, Span}; +use tracing_indicatif::span_ext::IndicatifSpanExt; use tvix_castore::{ blobservice::BlobService, directoryservice::DirectoryService, @@ -196,10 +197,18 @@ impl<BS, DS, PS, NS> Fetcher<BS, DS, PS, NS> { /// Constructs a HTTP request to the passed URL, and returns a AsyncReadBuf to it. /// In case the URI uses the file:// scheme, use tokio::fs to open it. + #[instrument(skip_all, fields(url, indicatif.pb_show=1), err)] async fn download( &self, url: Url, ) -> Result<Box<dyn AsyncBufRead + Unpin + Send>, FetcherError> { + let span = Span::current(); + span.pb_set_message(&format!( + "📡Fetching {}", + // TOOD: maybe shorten + redact_url(&url) + )); + match url.scheme() { "file" => { let f = tokio::fs::File::open(url.to_file_path().map_err(|_| { @@ -211,16 +220,38 @@ impl<BS, DS, PS, NS> Fetcher<BS, DS, PS, NS> { )) })?) .await?; - Ok(Box::new(tokio::io::BufReader::new(f))) + + span.pb_set_length(f.metadata().await?.len()); + span.pb_set_style(&tvix_tracing::PB_PROGRESS_STYLE); + span.pb_start(); + Ok(Box::new(tokio::io::BufReader::new(InspectReader::new( + f, + move |d| { + span.pb_inc(d.len() as u64); + }, + )))) } _ => { let resp = self.http_client.get(url).send().await?; + + if let Some(content_length) = resp.content_length() { + span.pb_set_length(content_length); + span.pb_set_style(&tvix_tracing::PB_PROGRESS_STYLE); + } else { + span.pb_set_style(&tvix_tracing::PB_SPINNER_STYLE); + } + span.pb_start(); + Ok(Box::new(tokio_util::io::StreamReader::new( - resp.bytes_stream().map_err(|e| { - let e = e.without_url(); - warn!(%e, "failed to get response body"); - std::io::Error::new(std::io::ErrorKind::BrokenPipe, e) - }), + resp.bytes_stream() + .inspect_ok(move |d| { + span.pb_inc(d.len() as u64); + }) + .map_err(|e| { + let e = e.without_url(); + warn!(%e, "failed to get response body"); + std::io::Error::new(std::io::ErrorKind::BrokenPipe, e) + }), ))) } } |