about summary refs log tree commit diff
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2024-06-16T16·22+0300
committerflokli <flokli@flokli.de>2024-06-17T12·57+0000
commit28b692fd507b5a32add96e3694bd1f2959bd9608 (patch)
tree945ed9c6feafb45a7fc1b366b7d5254064213134
parentcfab953094c6da5a919ea968f2bd7753035005c7 (diff)
feat(tvix/tvix-store): improve progress bars r/8289
Don't show an empty spinner for daemon commands.
Move the bar to the right, so the text is better aligned between spinner
progress and bar progress styles.

Generally, push progress bars a bit more down to the place where we can
track progress. This includes adding one in the upload_blob span.

Introduce another progress style template for transfers, which
interprets the counter as bytes (not just a plain integer), and also a data rate.
Use it for here and in the fetching code, and also make the progress bar
itself a bit less wide.

Change-Id: I15c2ea3d2b24b5186cec19cd3dbd706638497f40
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11845
Tested-by: BuildkiteCI
Autosubmit: flokli <flokli@flokli.de>
Reviewed-by: Simon Hauser <simon.hauser@helsinki-systems.de>
-rw-r--r--tvix/Cargo.lock1
-rw-r--r--tvix/Cargo.nix4
-rw-r--r--tvix/castore/Cargo.toml1
-rw-r--r--tvix/castore/src/import/fs.rs51
-rw-r--r--tvix/castore/src/import/mod.rs7
-rw-r--r--tvix/glue/src/fetchers/mod.rs6
-rw-r--r--tvix/store/src/bin/tvix-store.rs14
-rw-r--r--tvix/tracing/src/lib.rs8
8 files changed, 52 insertions, 40 deletions
diff --git a/tvix/Cargo.lock b/tvix/Cargo.lock
index b4ee59031b..b4faaad0d2 100644
--- a/tvix/Cargo.lock
+++ b/tvix/Cargo.lock
@@ -4251,6 +4251,7 @@ dependencies = [
  "tower",
  "tracing",
  "tracing-indicatif",
+ "tvix-tracing",
  "url",
  "vhost",
  "vhost-user-backend",
diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix
index cd8ec667f3..68e37a3fa0 100644
--- a/tvix/Cargo.nix
+++ b/tvix/Cargo.nix
@@ -13383,6 +13383,10 @@ rec {
             packageId = "tracing-indicatif";
           }
           {
+            name = "tvix-tracing";
+            packageId = "tvix-tracing";
+          }
+          {
             name = "url";
             packageId = "url";
           }
diff --git a/tvix/castore/Cargo.toml b/tvix/castore/Cargo.toml
index e82a6d1242..ea4c598fe8 100644
--- a/tvix/castore/Cargo.toml
+++ b/tvix/castore/Cargo.toml
@@ -29,6 +29,7 @@ tonic = "0.11.0"
 tower = "0.4.13"
 tracing = "0.1.37"
 tracing-indicatif = "0.3.6"
+tvix-tracing = { path = "../tracing" }
 url = "2.4.0"
 walkdir = "2.4.0"
 zstd = "0.13.0"
diff --git a/tvix/castore/src/import/fs.rs b/tvix/castore/src/import/fs.rs
index 265d772355..dc7821b810 100644
--- a/tvix/castore/src/import/fs.rs
+++ b/tvix/castore/src/import/fs.rs
@@ -6,6 +6,8 @@ use std::fs::FileType;
 use std::os::unix::ffi::OsStringExt;
 use std::os::unix::fs::MetadataExt;
 use std::os::unix::fs::PermissionsExt;
+use tokio::io::BufReader;
+use tokio_util::io::InspectReader;
 use tracing::instrument;
 use tracing::Span;
 use tracing_indicatif::span_ext::IndicatifSpanExt;
@@ -28,7 +30,7 @@ use super::IngestionError;
 ///
 /// This function will walk the filesystem using `walkdir` and will consume
 /// `O(#number of entries)` space.
-#[instrument(skip(blob_service, directory_service), fields(path), err)]
+#[instrument(skip(blob_service, directory_service), fields(path, indicatif.pb_show=1), err)]
 pub async fn ingest_path<BS, DS, P>(
     blob_service: BS,
     directory_service: DS,
@@ -39,7 +41,10 @@ where
     BS: BlobService + Clone,
     DS: DirectoryService,
 {
-    Span::current().pb_start();
+    let span = Span::current();
+    span.pb_set_message(&format!("Ingesting {:?}", path));
+    span.pb_start();
+
     let iter = WalkDir::new(path.as_ref())
         .follow_links(false)
         .follow_root_links(false)
@@ -49,11 +54,12 @@ where
     let entries = dir_entries_to_ingestion_stream(blob_service, iter, path.as_ref());
     ingest_entries(
         directory_service,
-        entries.inspect(|e| {
-            if let Ok(e) = e {
-                let s = Span::current();
-                s.pb_inc(1);
-                s.pb_set_message(&format!("Ingesting {}", e.path()));
+        entries.inspect({
+            let span = span.clone();
+            move |e| {
+                if e.is_ok() {
+                    span.pb_inc(1)
+                }
             }
         }),
     )
@@ -151,7 +157,7 @@ where
 }
 
 /// Uploads the file at the provided [Path] the the [BlobService].
-#[instrument(skip(blob_service), fields(path), err)]
+#[instrument(skip(blob_service), fields(path, indicatif.pb_show=1), err)]
 async fn upload_blob<BS>(
     blob_service: BS,
     path: impl AsRef<std::path::Path>,
@@ -159,16 +165,29 @@ async fn upload_blob<BS>(
 where
     BS: BlobService,
 {
-    let mut file = match tokio::fs::File::open(path.as_ref()).await {
-        Ok(file) => file,
-        Err(e) => return Err(Error::BlobRead(path.as_ref().to_path_buf(), e)),
-    };
+    let span = Span::current();
+    span.pb_set_style(&tvix_tracing::PB_TRANSFER_STYLE);
+    span.pb_set_message(&format!("Uploading blob for {:?}", path.as_ref()));
+    span.pb_start();
 
-    let mut writer = blob_service.open_write().await;
+    let file = tokio::fs::File::open(path.as_ref())
+        .await
+        .map_err(|e| Error::BlobRead(path.as_ref().to_path_buf(), e))?;
 
-    if let Err(e) = tokio::io::copy(&mut file, &mut writer).await {
-        return Err(Error::BlobRead(path.as_ref().to_path_buf(), e));
-    };
+    let metadata = file
+        .metadata()
+        .await
+        .map_err(|e| Error::Stat(path.as_ref().to_path_buf(), e))?;
+
+    span.pb_set_length(metadata.len());
+    let reader = InspectReader::new(file, |d| {
+        span.pb_inc(d.len() as u64);
+    });
+
+    let mut writer = blob_service.open_write().await;
+    tokio::io::copy(&mut BufReader::new(reader), &mut writer)
+        .await
+        .map_err(|e| Error::BlobRead(path.as_ref().to_path_buf(), e))?;
 
     let digest = writer
         .close()
diff --git a/tvix/castore/src/import/mod.rs b/tvix/castore/src/import/mod.rs
index c57c5bcada..a9ac0be6b0 100644
--- a/tvix/castore/src/import/mod.rs
+++ b/tvix/castore/src/import/mod.rs
@@ -14,9 +14,6 @@ use crate::proto::FileNode;
 use crate::proto::SymlinkNode;
 use crate::B3Digest;
 use futures::{Stream, StreamExt};
-use tracing::Span;
-use tracing_indicatif::span_ext::IndicatifSpanExt;
-
 use tracing::Level;
 
 use std::collections::HashMap;
@@ -46,7 +43,7 @@ pub mod fs;
 /// map and upload it to the [DirectoryService] through a lazily created [DirectoryPutter].
 ///
 /// On success, returns the root node.
-#[instrument(skip_all, fields(indicatif.pb_show=1), ret(level = Level::TRACE), err)]
+#[instrument(skip_all, ret(level = Level::TRACE), err)]
 pub async fn ingest_entries<DS, S, E>(
     directory_service: DS,
     mut entries: S,
@@ -60,8 +57,6 @@ where
     let mut directories: HashMap<PathBuf, Directory> = HashMap::default();
     let mut maybe_directory_putter: Option<Box<dyn DirectoryPutter>> = None;
 
-    Span::current().pb_start();
-
     let root_node = loop {
         let mut entry = entries
             .next()
diff --git a/tvix/glue/src/fetchers/mod.rs b/tvix/glue/src/fetchers/mod.rs
index aee79b2e75..6cce569e97 100644
--- a/tvix/glue/src/fetchers/mod.rs
+++ b/tvix/glue/src/fetchers/mod.rs
@@ -222,7 +222,7 @@ impl<BS, DS, PS, NS> Fetcher<BS, DS, PS, NS> {
                 .await?;
 
                 span.pb_set_length(f.metadata().await?.len());
-                span.pb_set_style(&tvix_tracing::PB_PROGRESS_STYLE);
+                span.pb_set_style(&tvix_tracing::PB_TRANSFER_STYLE);
                 span.pb_start();
                 Ok(Box::new(tokio::io::BufReader::new(InspectReader::new(
                     f,
@@ -236,9 +236,9 @@ impl<BS, DS, PS, NS> Fetcher<BS, DS, PS, NS> {
 
                 if let Some(content_length) = resp.content_length() {
                     span.pb_set_length(content_length);
-                    span.pb_set_style(&tvix_tracing::PB_PROGRESS_STYLE);
+                    span.pb_set_style(&tvix_tracing::PB_TRANSFER_STYLE);
                 } else {
-                    span.pb_set_style(&tvix_tracing::PB_SPINNER_STYLE);
+                    span.pb_set_style(&tvix_tracing::PB_TRANSFER_STYLE);
                 }
                 span.pb_start();
 
diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs
index 9e81e59f12..2a039fbebe 100644
--- a/tvix/store/src/bin/tvix-store.rs
+++ b/tvix/store/src/bin/tvix-store.rs
@@ -192,7 +192,7 @@ fn default_threads() -> usize {
         .unwrap_or(4)
 }
 
-#[instrument(fields(indicatif.pb_show=1), skip(cli))]
+#[instrument(skip_all)]
 async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error>> {
     match cli.command {
         Commands::Daemon {
@@ -270,19 +270,9 @@ async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error>> {
             let nar_calculation_service: Arc<dyn NarCalculationService> =
                 nar_calculation_service.into();
 
-            let root_span = {
-                let s = Span::current();
-                s.pb_set_style(&tvix_tracing::PB_PROGRESS_STYLE);
-                s.pb_set_message("Importing paths");
-                s.pb_set_length(paths.len() as u64);
-                s.pb_start();
-                s
-            };
-
             let tasks = paths
                 .into_iter()
                 .map(|path| {
-                    let paths_span = root_span.clone();
                     tokio::task::spawn({
                         let blob_service = blob_service.clone();
                         let directory_service = directory_service.clone();
@@ -305,7 +295,6 @@ async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error>> {
                                     println!("{}", output_path.to_absolute_path());
                                 }
                             }
-                            paths_span.pb_inc(1);
                         }
                     })
                 })
@@ -512,7 +501,6 @@ async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error>> {
 }
 
 #[tokio::main]
-#[instrument(fields(indicatif.pb_show=1))]
 async fn main() -> Result<(), Box<dyn std::error::Error>> {
     let cli = Cli::parse();
 
diff --git a/tvix/tracing/src/lib.rs b/tvix/tracing/src/lib.rs
index 08e1007810..ecf31e1eb0 100644
--- a/tvix/tracing/src/lib.rs
+++ b/tvix/tracing/src/lib.rs
@@ -18,11 +18,15 @@ use tracing_tracy::TracyLayer;
 
 lazy_static! {
     pub static ref PB_PROGRESS_STYLE: ProgressStyle = ProgressStyle::with_template(
-        "{span_child_prefix}{bar:30} {wide_msg} [{elapsed_precise}]  {pos:>7}/{len:7}"
+        "{span_child_prefix} {wide_msg} {bar:10} ({elapsed}) {pos:>7}/{len:7}"
+    )
+    .expect("invalid progress template");
+    pub static ref PB_TRANSFER_STYLE: ProgressStyle = ProgressStyle::with_template(
+        "{span_child_prefix} {wide_msg} {binary_bytes:>7}/{binary_total_bytes:7}@{decimal_bytes_per_sec} ({elapsed}) {bar:10} "
     )
     .expect("invalid progress template");
     pub static ref PB_SPINNER_STYLE: ProgressStyle = ProgressStyle::with_template(
-        "{span_child_prefix}{spinner} {wide_msg} [{elapsed_precise}]  {pos:>7}/{len:7}"
+        "{span_child_prefix}{spinner} {wide_msg} ({elapsed}) {pos:>7}/{len:7}"
     )
     .expect("invalid progress template");
 }