From 99c5a2e8bcb83e595d2c12bf7296338b1687f9de Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Thu, 13 Jun 2024 10:26:35 +0300 Subject: feat(tvix/glue): report progress on all fetches, use progress bars This should also report progress on fetches which we couldn't delay until actually having to IO into them, like `builtins.fetchurl` calls without a upfront-provided hash. While at it, upgrade the progress spinners to progress bars, which increment if we know the size of the fetch. Change-Id: Ic3f332286d8bc2177f3d994ba25b165728d4b702 Reviewed-on: https://cl.tvl.fyi/c/depot/+/11797 Autosubmit: flokli Tested-by: BuildkiteCI Reviewed-by: aspen --- tvix/glue/src/fetchers/mod.rs | 45 ++++++++++++++++++++++++++++++++++++------- 1 file changed, 38 insertions(+), 7 deletions(-) (limited to 'tvix/glue/src/fetchers/mod.rs') diff --git a/tvix/glue/src/fetchers/mod.rs b/tvix/glue/src/fetchers/mod.rs index 376a4cca634c..aee79b2e7504 100644 --- a/tvix/glue/src/fetchers/mod.rs +++ b/tvix/glue/src/fetchers/mod.rs @@ -8,7 +8,8 @@ use sha1::Sha1; use sha2::{digest::Output, Digest, Sha256, Sha512}; use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader}; use tokio_util::io::{InspectReader, InspectWriter}; -use tracing::warn; +use tracing::{instrument, warn, Span}; +use tracing_indicatif::span_ext::IndicatifSpanExt; use tvix_castore::{ blobservice::BlobService, directoryservice::DirectoryService, @@ -196,10 +197,18 @@ impl Fetcher { /// Constructs a HTTP request to the passed URL, and returns a AsyncReadBuf to it. /// In case the URI uses the file:// scheme, use tokio::fs to open it. + #[instrument(skip_all, fields(url, indicatif.pb_show=1), err)] async fn download( &self, url: Url, ) -> Result, FetcherError> { + let span = Span::current(); + span.pb_set_message(&format!( + "📡Fetching {}", + // TOOD: maybe shorten + redact_url(&url) + )); + match url.scheme() { "file" => { let f = tokio::fs::File::open(url.to_file_path().map_err(|_| { @@ -211,16 +220,38 @@ impl Fetcher { )) })?) .await?; - Ok(Box::new(tokio::io::BufReader::new(f))) + + span.pb_set_length(f.metadata().await?.len()); + span.pb_set_style(&tvix_tracing::PB_PROGRESS_STYLE); + span.pb_start(); + Ok(Box::new(tokio::io::BufReader::new(InspectReader::new( + f, + move |d| { + span.pb_inc(d.len() as u64); + }, + )))) } _ => { let resp = self.http_client.get(url).send().await?; + + if let Some(content_length) = resp.content_length() { + span.pb_set_length(content_length); + span.pb_set_style(&tvix_tracing::PB_PROGRESS_STYLE); + } else { + span.pb_set_style(&tvix_tracing::PB_SPINNER_STYLE); + } + span.pb_start(); + Ok(Box::new(tokio_util::io::StreamReader::new( - resp.bytes_stream().map_err(|e| { - let e = e.without_url(); - warn!(%e, "failed to get response body"); - std::io::Error::new(std::io::ErrorKind::BrokenPipe, e) - }), + resp.bytes_stream() + .inspect_ok(move |d| { + span.pb_inc(d.len() as u64); + }) + .map_err(|e| { + let e = e.without_url(); + warn!(%e, "failed to get response body"); + std::io::Error::new(std::io::ErrorKind::BrokenPipe, e) + }), ))) } } -- cgit 1.4.1