diff options
author | Florian Klink <flokli@flokli.de> | 2024-06-13T07·26+0300 |
---|---|---|
committer | clbot <clbot@tvl.fyi> | 2024-06-13T16·16+0000 |
commit | 99c5a2e8bcb83e595d2c12bf7296338b1687f9de (patch) | |
tree | de564c7c639f696f5d4a435c4fa910b9114ab2fd /tvix/glue/src/fetchers/mod.rs | |
parent | 7ee55c293cc47a6b3bebacd4d991aa9aa9cc7287 (diff) |
feat(tvix/glue): report progress on all fetches, use progress bars r/8259
This should also report progress on fetches which we couldn't delay until actually having to IO into them, like `builtins.fetchurl` calls without a upfront-provided hash. While at it, upgrade the progress spinners to progress bars, which increment if we know the size of the fetch. Change-Id: Ic3f332286d8bc2177f3d994ba25b165728d4b702 Reviewed-on: https://cl.tvl.fyi/c/depot/+/11797 Autosubmit: flokli <flokli@flokli.de> Tested-by: BuildkiteCI Reviewed-by: aspen <root@gws.fyi>
Diffstat (limited to 'tvix/glue/src/fetchers/mod.rs')
-rw-r--r-- | tvix/glue/src/fetchers/mod.rs | 45 |
1 files changed, 38 insertions, 7 deletions
diff --git a/tvix/glue/src/fetchers/mod.rs b/tvix/glue/src/fetchers/mod.rs index 376a4cca634c..aee79b2e7504 100644 --- a/tvix/glue/src/fetchers/mod.rs +++ b/tvix/glue/src/fetchers/mod.rs @@ -8,7 +8,8 @@ use sha1::Sha1; use sha2::{digest::Output, Digest, Sha256, Sha512}; use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader}; use tokio_util::io::{InspectReader, InspectWriter}; -use tracing::warn; +use tracing::{instrument, warn, Span}; +use tracing_indicatif::span_ext::IndicatifSpanExt; use tvix_castore::{ blobservice::BlobService, directoryservice::DirectoryService, @@ -196,10 +197,18 @@ impl<BS, DS, PS, NS> Fetcher<BS, DS, PS, NS> { /// Constructs a HTTP request to the passed URL, and returns a AsyncReadBuf to it. /// In case the URI uses the file:// scheme, use tokio::fs to open it. + #[instrument(skip_all, fields(url, indicatif.pb_show=1), err)] async fn download( &self, url: Url, ) -> Result<Box<dyn AsyncBufRead + Unpin + Send>, FetcherError> { + let span = Span::current(); + span.pb_set_message(&format!( + "📡Fetching {}", + // TOOD: maybe shorten + redact_url(&url) + )); + match url.scheme() { "file" => { let f = tokio::fs::File::open(url.to_file_path().map_err(|_| { @@ -211,16 +220,38 @@ impl<BS, DS, PS, NS> Fetcher<BS, DS, PS, NS> { )) })?) .await?; - Ok(Box::new(tokio::io::BufReader::new(f))) + + span.pb_set_length(f.metadata().await?.len()); + span.pb_set_style(&tvix_tracing::PB_PROGRESS_STYLE); + span.pb_start(); + Ok(Box::new(tokio::io::BufReader::new(InspectReader::new( + f, + move |d| { + span.pb_inc(d.len() as u64); + }, + )))) } _ => { let resp = self.http_client.get(url).send().await?; + + if let Some(content_length) = resp.content_length() { + span.pb_set_length(content_length); + span.pb_set_style(&tvix_tracing::PB_PROGRESS_STYLE); + } else { + span.pb_set_style(&tvix_tracing::PB_SPINNER_STYLE); + } + span.pb_start(); + Ok(Box::new(tokio_util::io::StreamReader::new( - resp.bytes_stream().map_err(|e| { - let e = e.without_url(); - warn!(%e, "failed to get response body"); - std::io::Error::new(std::io::ErrorKind::BrokenPipe, e) - }), + resp.bytes_stream() + .inspect_ok(move |d| { + span.pb_inc(d.len() as u64); + }) + .map_err(|e| { + let e = e.without_url(); + warn!(%e, "failed to get response body"); + std::io::Error::new(std::io::ErrorKind::BrokenPipe, e) + }), ))) } } |