diff options
author | Florian Klink <flokli@flokli.de> | 2024-04-17T16·44+0300 |
---|---|---|
committer | flokli <flokli@flokli.de> | 2024-06-06T09·43+0000 |
commit | 20513e7a52fc81f05cf31c183203e1929ee13464 (patch) | |
tree | 7346aed381e29f3117fa8c12348226762e88e1d0 /tvix/store/src/bin/tvix-store.rs | |
parent | 9b77ce9f8facbb5018618738dc55f4f38c269a28 (diff) |
feat(tvix/store/bin): add progress bar infrastructure r/8220
This adds the tracing-indicatif crate, and configures it as a layer in our tracing_subscriber pipeline to emit progress for every span that's configured so. It also moves from using std::io::stderr to write logs to using their writer, to avoid clobbering output. Progress bar styles are defined in a lazy_static, moving this into a general tracing is left for later. This adds some usage of this to the `imports` and `copy` commands. The output can still be improved a bit - we should probably split each task up into a smaller (instrumented) helper functions, so we can create a progress bar for each task. Change-Id: I59a1915aa4e0caa89c911632dec59c4cbeba1b89 Reviewed-on: https://cl.tvl.fyi/c/depot/+/11747 Reviewed-by: flokli <flokli@flokli.de> Reviewed-by: Simon Hauser <simon.hauser@helsinki-systems.de> Tested-by: BuildkiteCI Autosubmit: flokli <flokli@flokli.de>
Diffstat (limited to 'tvix/store/src/bin/tvix-store.rs')
-rw-r--r-- | tvix/store/src/bin/tvix-store.rs | 80 |
1 files changed, 64 insertions, 16 deletions
diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs index 906d0ab5206a..2a2d6fe6f7d0 100644 --- a/tvix/store/src/bin/tvix-store.rs +++ b/tvix/store/src/bin/tvix-store.rs @@ -4,6 +4,7 @@ use clap::Subcommand; use futures::future::try_join_all; use futures::StreamExt; use futures::TryStreamExt; +use indicatif::ProgressStyle; use nix_compat::path_info::ExportedPathInfo; use serde::Deserialize; use serde::Serialize; @@ -14,10 +15,13 @@ use tokio_listener::SystemOptions; use tokio_listener::UserOptions; use tonic::transport::Server; use tracing::info; +use tracing::info_span; +use tracing::instrument; use tracing::Level; -use tracing_subscriber::EnvFilter; -use tracing_subscriber::Layer; -use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; +use tracing::Span; +use tracing_indicatif::filter::IndicatifFilter; +use tracing_indicatif::{span_ext::IndicatifSpanExt, IndicatifLayer}; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer}; use tvix_castore::import::fs::ingest_path; use tvix_store::nar::NarCalculationService; use tvix_store::proto::NarInfo; @@ -31,6 +35,20 @@ use tvix_store::pathinfoservice::PathInfoService; use tvix_store::proto::path_info_service_server::PathInfoServiceServer; use tvix_store::proto::GRPCPathInfoServiceWrapper; +use lazy_static::lazy_static; + +// FUTUREWORK: move this to tracing crate +lazy_static! { + pub static ref PB_PROGRESS_STYLE: ProgressStyle = ProgressStyle::with_template( + "{span_child_prefix}{bar:30} {wide_msg} [{elapsed_precise}] {pos:>7}/{len:7}" + ) + .expect("invalid progress template"); + pub static ref PB_SPINNER_STYLE: ProgressStyle = ProgressStyle::with_template( + "{span_child_prefix}{spinner} {wide_msg} [{elapsed_precise}] {pos:>7}/{len:7}" + ) + .expect("invalid progress template"); +} + #[cfg(any(feature = "fuse", feature = "virtiofs"))] use tvix_store::pathinfoservice::make_fs; @@ -212,24 +230,32 @@ fn default_threads() -> usize { } #[tokio::main] +#[instrument(fields(indicatif.pb_show=1))] async fn main() -> Result<(), Box<dyn std::error::Error>> { let cli = Cli::parse(); // configure log settings let level = cli.log_level.unwrap_or(Level::INFO); + let indicatif_layer = IndicatifLayer::new().with_progress_style(PB_SPINNER_STYLE.clone()); + // Set up the tracing subscriber. - let subscriber = tracing_subscriber::registry().with( - tracing_subscriber::fmt::Layer::new() - .with_writer(std::io::stderr) - .compact() - .with_filter( - EnvFilter::builder() - .with_default_directive(level.into()) - .from_env() - .expect("invalid RUST_LOG"), - ), - ); + let subscriber = tracing_subscriber::registry() + .with( + tracing_subscriber::fmt::Layer::new() + .with_writer(indicatif_layer.get_stderr_writer()) + .compact() + .with_filter( + EnvFilter::builder() + .with_default_directive(level.into()) + .from_env() + .expect("invalid RUST_LOG"), + ), + ) + .with(indicatif_layer.with_filter( + // only show progress for spans with indicatif.pb_show field being set + IndicatifFilter::new(false), + )); // Add the otlp layer (when otlp is enabled, and it's not disabled in the CLI) // then init the registry. @@ -355,9 +381,19 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { let nar_calculation_service: Arc<dyn NarCalculationService> = nar_calculation_service.into(); + let root_span = { + let s = Span::current(); + s.pb_set_style(&PB_PROGRESS_STYLE); + s.pb_set_message("Importing paths"); + s.pb_set_length(paths.len() as u64); + s.pb_start(); + s + }; + let tasks = paths .into_iter() .map(|path| { + let paths_span = root_span.clone(); tokio::task::spawn({ let blob_service = blob_service.clone(); let directory_service = directory_service.clone(); @@ -380,6 +416,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { println!("{}", output_path.to_absolute_path()); } } + paths_span.pb_inc(1); } }) }) @@ -416,15 +453,26 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { // Arc the PathInfoService, as we clone it . let path_info_service: Arc<dyn PathInfoService> = path_info_service.into(); + let lookups_span = info_span!( + "lookup pathinfos", + "indicatif.pb_show" = tracing::field::Empty + ); + lookups_span.pb_set_length(reference_graph.closure.len() as u64); + lookups_span.pb_set_style(&PB_PROGRESS_STYLE); + lookups_span.pb_start(); + // From our reference graph, lookup all pathinfos that might exist. let elems: Vec<_> = futures::stream::iter(reference_graph.closure) .map(|elem| { let path_info_service = path_info_service.clone(); async move { - path_info_service + let resp = path_info_service .get(*elem.path.digest()) .await - .map(|resp| (elem, resp)) + .map(|resp| (elem, resp)); + + Span::current().pb_inc(1); + resp } }) .buffer_unordered(50) |