about summary refs log tree commit diff
path: root/tvix/store/src/bin/tvix-store.rs
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2024-04-17T16·44+0300
committerflokli <flokli@flokli.de>2024-06-06T09·43+0000
commit20513e7a52fc81f05cf31c183203e1929ee13464 (patch)
tree7346aed381e29f3117fa8c12348226762e88e1d0 /tvix/store/src/bin/tvix-store.rs
parent9b77ce9f8facbb5018618738dc55f4f38c269a28 (diff)
feat(tvix/store/bin): add progress bar infrastructure r/8220
This adds the tracing-indicatif crate, and configures it as a layer in
our tracing_subscriber pipeline to emit progress for every span that's
configured so.

It also moves from using std::io::stderr to write logs to using their
writer, to avoid clobbering output.

Progress bar styles are defined in a lazy_static, moving this into a
general tracing is left for later.

This adds some usage of this to the `imports` and `copy` commands.

The output can still be improved a bit - we should  probably split each
task up into a smaller (instrumented) helper functions, so we can create
a progress bar for each task.

Change-Id: I59a1915aa4e0caa89c911632dec59c4cbeba1b89
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11747
Reviewed-by: flokli <flokli@flokli.de>
Reviewed-by: Simon Hauser <simon.hauser@helsinki-systems.de>
Tested-by: BuildkiteCI
Autosubmit: flokli <flokli@flokli.de>
Diffstat (limited to 'tvix/store/src/bin/tvix-store.rs')
-rw-r--r--tvix/store/src/bin/tvix-store.rs80
1 files changed, 64 insertions, 16 deletions
diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs
index 906d0ab5206a..2a2d6fe6f7d0 100644
--- a/tvix/store/src/bin/tvix-store.rs
+++ b/tvix/store/src/bin/tvix-store.rs
@@ -4,6 +4,7 @@ use clap::Subcommand;
 use futures::future::try_join_all;
 use futures::StreamExt;
 use futures::TryStreamExt;
+use indicatif::ProgressStyle;
 use nix_compat::path_info::ExportedPathInfo;
 use serde::Deserialize;
 use serde::Serialize;
@@ -14,10 +15,13 @@ use tokio_listener::SystemOptions;
 use tokio_listener::UserOptions;
 use tonic::transport::Server;
 use tracing::info;
+use tracing::info_span;
+use tracing::instrument;
 use tracing::Level;
-use tracing_subscriber::EnvFilter;
-use tracing_subscriber::Layer;
-use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
+use tracing::Span;
+use tracing_indicatif::filter::IndicatifFilter;
+use tracing_indicatif::{span_ext::IndicatifSpanExt, IndicatifLayer};
+use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer};
 use tvix_castore::import::fs::ingest_path;
 use tvix_store::nar::NarCalculationService;
 use tvix_store::proto::NarInfo;
@@ -31,6 +35,20 @@ use tvix_store::pathinfoservice::PathInfoService;
 use tvix_store::proto::path_info_service_server::PathInfoServiceServer;
 use tvix_store::proto::GRPCPathInfoServiceWrapper;
 
+use lazy_static::lazy_static;
+
+// FUTUREWORK: move this to tracing crate
+lazy_static! {
+    pub static ref PB_PROGRESS_STYLE: ProgressStyle = ProgressStyle::with_template(
+        "{span_child_prefix}{bar:30} {wide_msg} [{elapsed_precise}]  {pos:>7}/{len:7}"
+    )
+    .expect("invalid progress template");
+    pub static ref PB_SPINNER_STYLE: ProgressStyle = ProgressStyle::with_template(
+        "{span_child_prefix}{spinner} {wide_msg} [{elapsed_precise}]  {pos:>7}/{len:7}"
+    )
+    .expect("invalid progress template");
+}
+
 #[cfg(any(feature = "fuse", feature = "virtiofs"))]
 use tvix_store::pathinfoservice::make_fs;
 
@@ -212,24 +230,32 @@ fn default_threads() -> usize {
 }
 
 #[tokio::main]
+#[instrument(fields(indicatif.pb_show=1))]
 async fn main() -> Result<(), Box<dyn std::error::Error>> {
     let cli = Cli::parse();
 
     // configure log settings
     let level = cli.log_level.unwrap_or(Level::INFO);
 
+    let indicatif_layer = IndicatifLayer::new().with_progress_style(PB_SPINNER_STYLE.clone());
+
     // Set up the tracing subscriber.
-    let subscriber = tracing_subscriber::registry().with(
-        tracing_subscriber::fmt::Layer::new()
-            .with_writer(std::io::stderr)
-            .compact()
-            .with_filter(
-                EnvFilter::builder()
-                    .with_default_directive(level.into())
-                    .from_env()
-                    .expect("invalid RUST_LOG"),
-            ),
-    );
+    let subscriber = tracing_subscriber::registry()
+        .with(
+            tracing_subscriber::fmt::Layer::new()
+                .with_writer(indicatif_layer.get_stderr_writer())
+                .compact()
+                .with_filter(
+                    EnvFilter::builder()
+                        .with_default_directive(level.into())
+                        .from_env()
+                        .expect("invalid RUST_LOG"),
+                ),
+        )
+        .with(indicatif_layer.with_filter(
+            // only show progress for spans with indicatif.pb_show field being set
+            IndicatifFilter::new(false),
+        ));
 
     // Add the otlp layer (when otlp is enabled, and it's not disabled in the CLI)
     // then init the registry.
@@ -355,9 +381,19 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
             let nar_calculation_service: Arc<dyn NarCalculationService> =
                 nar_calculation_service.into();
 
+            let root_span = {
+                let s = Span::current();
+                s.pb_set_style(&PB_PROGRESS_STYLE);
+                s.pb_set_message("Importing paths");
+                s.pb_set_length(paths.len() as u64);
+                s.pb_start();
+                s
+            };
+
             let tasks = paths
                 .into_iter()
                 .map(|path| {
+                    let paths_span = root_span.clone();
                     tokio::task::spawn({
                         let blob_service = blob_service.clone();
                         let directory_service = directory_service.clone();
@@ -380,6 +416,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
                                     println!("{}", output_path.to_absolute_path());
                                 }
                             }
+                            paths_span.pb_inc(1);
                         }
                     })
                 })
@@ -416,15 +453,26 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
             // Arc the PathInfoService, as we clone it .
             let path_info_service: Arc<dyn PathInfoService> = path_info_service.into();
 
+            let lookups_span = info_span!(
+                "lookup pathinfos",
+                "indicatif.pb_show" = tracing::field::Empty
+            );
+            lookups_span.pb_set_length(reference_graph.closure.len() as u64);
+            lookups_span.pb_set_style(&PB_PROGRESS_STYLE);
+            lookups_span.pb_start();
+
             // From our reference graph, lookup all pathinfos that might exist.
             let elems: Vec<_> = futures::stream::iter(reference_graph.closure)
                 .map(|elem| {
                     let path_info_service = path_info_service.clone();
                     async move {
-                        path_info_service
+                        let resp = path_info_service
                             .get(*elem.path.digest())
                             .await
-                            .map(|resp| (elem, resp))
+                            .map(|resp| (elem, resp));
+
+                        Span::current().pb_inc(1);
+                        resp
                     }
                 })
                 .buffer_unordered(50)