about summary refs log tree commit diff
path: root/tvix/castore
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2024-04-17T16·44+0300
committerflokli <flokli@flokli.de>2024-06-06T09·43+0000
commit20513e7a52fc81f05cf31c183203e1929ee13464 (patch)
tree7346aed381e29f3117fa8c12348226762e88e1d0 /tvix/castore
parent9b77ce9f8facbb5018618738dc55f4f38c269a28 (diff)
feat(tvix/store/bin): add progress bar infrastructure r/8220
This adds the tracing-indicatif crate, and configures it as a layer in
our tracing_subscriber pipeline to emit progress for every span that's
configured so.

It also moves from using std::io::stderr to write logs to using their
writer, to avoid clobbering output.

Progress bar styles are defined in a lazy_static, moving this into a
general tracing is left for later.

This adds some usage of this to the `imports` and `copy` commands.

The output can still be improved a bit - we should  probably split each
task up into a smaller (instrumented) helper functions, so we can create
a progress bar for each task.

Change-Id: I59a1915aa4e0caa89c911632dec59c4cbeba1b89
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11747
Reviewed-by: flokli <flokli@flokli.de>
Reviewed-by: Simon Hauser <simon.hauser@helsinki-systems.de>
Tested-by: BuildkiteCI
Autosubmit: flokli <flokli@flokli.de>
Diffstat (limited to 'tvix/castore')
-rw-r--r--tvix/castore/Cargo.toml2
-rw-r--r--tvix/castore/src/import/fs.rs15
-rw-r--r--tvix/castore/src/import/mod.rs6
3 files changed, 21 insertions, 2 deletions
diff --git a/tvix/castore/Cargo.toml b/tvix/castore/Cargo.toml
index f80c5813c141..40d4f24d9f75 100644
--- a/tvix/castore/Cargo.toml
+++ b/tvix/castore/Cargo.toml
@@ -14,6 +14,7 @@ data-encoding = "2.3.3"
 digest = "0.10.7"
 fastcdc = { version = "3.1.0", features = ["tokio"] }
 futures = "0.3.30"
+indicatif = "0.17.8"
 lazy_static = "1.4.0"
 object_store = { version = "0.9.1", features = ["http"] }
 parking_lot = "0.12.1"
@@ -28,6 +29,7 @@ tokio = { version = "1.32.0", features = ["fs", "macros", "net", "rt", "rt-multi
 tonic = "0.11.0"
 tower = "0.4.13"
 tracing = "0.1.37"
+tracing-indicatif = "0.3.6"
 url = "2.4.0"
 walkdir = "2.4.0"
 zstd = "0.13.0"
diff --git a/tvix/castore/src/import/fs.rs b/tvix/castore/src/import/fs.rs
index 9d3ecfe6ab7a..265d7723554b 100644
--- a/tvix/castore/src/import/fs.rs
+++ b/tvix/castore/src/import/fs.rs
@@ -7,6 +7,8 @@ use std::os::unix::ffi::OsStringExt;
 use std::os::unix::fs::MetadataExt;
 use std::os::unix::fs::PermissionsExt;
 use tracing::instrument;
+use tracing::Span;
+use tracing_indicatif::span_ext::IndicatifSpanExt;
 use walkdir::DirEntry;
 use walkdir::WalkDir;
 
@@ -37,6 +39,7 @@ where
     BS: BlobService + Clone,
     DS: DirectoryService,
 {
+    Span::current().pb_start();
     let iter = WalkDir::new(path.as_ref())
         .follow_links(false)
         .follow_root_links(false)
@@ -44,7 +47,17 @@ where
         .into_iter();
 
     let entries = dir_entries_to_ingestion_stream(blob_service, iter, path.as_ref());
-    ingest_entries(directory_service, entries).await
+    ingest_entries(
+        directory_service,
+        entries.inspect(|e| {
+            if let Ok(e) = e {
+                let s = Span::current();
+                s.pb_inc(1);
+                s.pb_set_message(&format!("Ingesting {}", e.path()));
+            }
+        }),
+    )
+    .await
 }
 
 /// Converts an iterator of [walkdir::DirEntry]s into a stream of ingestion entries.
diff --git a/tvix/castore/src/import/mod.rs b/tvix/castore/src/import/mod.rs
index 4223fe538756..c57c5bcada20 100644
--- a/tvix/castore/src/import/mod.rs
+++ b/tvix/castore/src/import/mod.rs
@@ -14,6 +14,8 @@ use crate::proto::FileNode;
 use crate::proto::SymlinkNode;
 use crate::B3Digest;
 use futures::{Stream, StreamExt};
+use tracing::Span;
+use tracing_indicatif::span_ext::IndicatifSpanExt;
 
 use tracing::Level;
 
@@ -44,7 +46,7 @@ pub mod fs;
 /// map and upload it to the [DirectoryService] through a lazily created [DirectoryPutter].
 ///
 /// On success, returns the root node.
-#[instrument(skip_all, ret(level = Level::TRACE), err)]
+#[instrument(skip_all, fields(indicatif.pb_show=1), ret(level = Level::TRACE), err)]
 pub async fn ingest_entries<DS, S, E>(
     directory_service: DS,
     mut entries: S,
@@ -58,6 +60,8 @@ where
     let mut directories: HashMap<PathBuf, Directory> = HashMap::default();
     let mut maybe_directory_putter: Option<Box<dyn DirectoryPutter>> = None;
 
+    Span::current().pb_start();
+
     let root_node = loop {
         let mut entry = entries
             .next()