about summary refs log tree commit diff
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--tvix/Cargo.lock1
-rw-r--r--tvix/Cargo.nix4
-rw-r--r--tvix/castore/Cargo.toml1
-rw-r--r--tvix/castore/src/import/fs.rs51
-rw-r--r--tvix/castore/src/import/mod.rs7
-rw-r--r--tvix/glue/src/fetchers/mod.rs6
-rw-r--r--tvix/store/src/bin/tvix-store.rs14
-rw-r--r--tvix/tracing/src/lib.rs8
8 files changed, 52 insertions, 40 deletions
diff --git a/tvix/Cargo.lock b/tvix/Cargo.lock
index b4ee59031ba9..b4faaad0d298 100644
--- a/tvix/Cargo.lock
+++ b/tvix/Cargo.lock
@@ -4251,6 +4251,7 @@ dependencies = [
  "tower",
  "tracing",
  "tracing-indicatif",
+ "tvix-tracing",
  "url",
  "vhost",
  "vhost-user-backend",
diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix
index cd8ec667f36d..68e37a3fa045 100644
--- a/tvix/Cargo.nix
+++ b/tvix/Cargo.nix
@@ -13383,6 +13383,10 @@ rec {
             packageId = "tracing-indicatif";
           }
           {
+            name = "tvix-tracing";
+            packageId = "tvix-tracing";
+          }
+          {
             name = "url";
             packageId = "url";
           }
diff --git a/tvix/castore/Cargo.toml b/tvix/castore/Cargo.toml
index e82a6d12421d..ea4c598fe884 100644
--- a/tvix/castore/Cargo.toml
+++ b/tvix/castore/Cargo.toml
@@ -29,6 +29,7 @@ tonic = "0.11.0"
 tower = "0.4.13"
 tracing = "0.1.37"
 tracing-indicatif = "0.3.6"
+tvix-tracing = { path = "../tracing" }
 url = "2.4.0"
 walkdir = "2.4.0"
 zstd = "0.13.0"
diff --git a/tvix/castore/src/import/fs.rs b/tvix/castore/src/import/fs.rs
index 265d7723554b..dc7821b8101e 100644
--- a/tvix/castore/src/import/fs.rs
+++ b/tvix/castore/src/import/fs.rs
@@ -6,6 +6,8 @@ use std::fs::FileType;
 use std::os::unix::ffi::OsStringExt;
 use std::os::unix::fs::MetadataExt;
 use std::os::unix::fs::PermissionsExt;
+use tokio::io::BufReader;
+use tokio_util::io::InspectReader;
 use tracing::instrument;
 use tracing::Span;
 use tracing_indicatif::span_ext::IndicatifSpanExt;
@@ -28,7 +30,7 @@ use super::IngestionError;
 ///
 /// This function will walk the filesystem using `walkdir` and will consume
 /// `O(#number of entries)` space.
-#[instrument(skip(blob_service, directory_service), fields(path), err)]
+#[instrument(skip(blob_service, directory_service), fields(path, indicatif.pb_show=1), err)]
 pub async fn ingest_path<BS, DS, P>(
     blob_service: BS,
     directory_service: DS,
@@ -39,7 +41,10 @@ where
     BS: BlobService + Clone,
     DS: DirectoryService,
 {
-    Span::current().pb_start();
+    let span = Span::current();
+    span.pb_set_message(&format!("Ingesting {:?}", path));
+    span.pb_start();
+
     let iter = WalkDir::new(path.as_ref())
         .follow_links(false)
         .follow_root_links(false)
@@ -49,11 +54,12 @@ where
     let entries = dir_entries_to_ingestion_stream(blob_service, iter, path.as_ref());
     ingest_entries(
         directory_service,
-        entries.inspect(|e| {
-            if let Ok(e) = e {
-                let s = Span::current();
-                s.pb_inc(1);
-                s.pb_set_message(&format!("Ingesting {}", e.path()));
+        entries.inspect({
+            let span = span.clone();
+            move |e| {
+                if e.is_ok() {
+                    span.pb_inc(1)
+                }
             }
         }),
     )
@@ -151,7 +157,7 @@ where
 }
 
 /// Uploads the file at the provided [Path] the the [BlobService].
-#[instrument(skip(blob_service), fields(path), err)]
+#[instrument(skip(blob_service), fields(path, indicatif.pb_show=1), err)]
 async fn upload_blob<BS>(
     blob_service: BS,
     path: impl AsRef<std::path::Path>,
@@ -159,16 +165,29 @@ async fn upload_blob<BS>(
 where
     BS: BlobService,
 {
-    let mut file = match tokio::fs::File::open(path.as_ref()).await {
-        Ok(file) => file,
-        Err(e) => return Err(Error::BlobRead(path.as_ref().to_path_buf(), e)),
-    };
+    let span = Span::current();
+    span.pb_set_style(&tvix_tracing::PB_TRANSFER_STYLE);
+    span.pb_set_message(&format!("Uploading blob for {:?}", path.as_ref()));
+    span.pb_start();
 
-    let mut writer = blob_service.open_write().await;
+    let file = tokio::fs::File::open(path.as_ref())
+        .await
+        .map_err(|e| Error::BlobRead(path.as_ref().to_path_buf(), e))?;
 
-    if let Err(e) = tokio::io::copy(&mut file, &mut writer).await {
-        return Err(Error::BlobRead(path.as_ref().to_path_buf(), e));
-    };
+    let metadata = file
+        .metadata()
+        .await
+        .map_err(|e| Error::Stat(path.as_ref().to_path_buf(), e))?;
+
+    span.pb_set_length(metadata.len());
+    let reader = InspectReader::new(file, |d| {
+        span.pb_inc(d.len() as u64);
+    });
+
+    let mut writer = blob_service.open_write().await;
+    tokio::io::copy(&mut BufReader::new(reader), &mut writer)
+        .await
+        .map_err(|e| Error::BlobRead(path.as_ref().to_path_buf(), e))?;
 
     let digest = writer
         .close()
diff --git a/tvix/castore/src/import/mod.rs b/tvix/castore/src/import/mod.rs
index c57c5bcada20..a9ac0be6b064 100644
--- a/tvix/castore/src/import/mod.rs
+++ b/tvix/castore/src/import/mod.rs
@@ -14,9 +14,6 @@ use crate::proto::FileNode;
 use crate::proto::SymlinkNode;
 use crate::B3Digest;
 use futures::{Stream, StreamExt};
-use tracing::Span;
-use tracing_indicatif::span_ext::IndicatifSpanExt;
-
 use tracing::Level;
 
 use std::collections::HashMap;
@@ -46,7 +43,7 @@ pub mod fs;
 /// map and upload it to the [DirectoryService] through a lazily created [DirectoryPutter].
 ///
 /// On success, returns the root node.
-#[instrument(skip_all, fields(indicatif.pb_show=1), ret(level = Level::TRACE), err)]
+#[instrument(skip_all, ret(level = Level::TRACE), err)]
 pub async fn ingest_entries<DS, S, E>(
     directory_service: DS,
     mut entries: S,
@@ -60,8 +57,6 @@ where
     let mut directories: HashMap<PathBuf, Directory> = HashMap::default();
     let mut maybe_directory_putter: Option<Box<dyn DirectoryPutter>> = None;
 
-    Span::current().pb_start();
-
     let root_node = loop {
         let mut entry = entries
             .next()
diff --git a/tvix/glue/src/fetchers/mod.rs b/tvix/glue/src/fetchers/mod.rs
index aee79b2e7504..6cce569e9799 100644
--- a/tvix/glue/src/fetchers/mod.rs
+++ b/tvix/glue/src/fetchers/mod.rs
@@ -222,7 +222,7 @@ impl<BS, DS, PS, NS> Fetcher<BS, DS, PS, NS> {
                 .await?;
 
                 span.pb_set_length(f.metadata().await?.len());
-                span.pb_set_style(&tvix_tracing::PB_PROGRESS_STYLE);
+                span.pb_set_style(&tvix_tracing::PB_TRANSFER_STYLE);
                 span.pb_start();
                 Ok(Box::new(tokio::io::BufReader::new(InspectReader::new(
                     f,
@@ -236,9 +236,9 @@ impl<BS, DS, PS, NS> Fetcher<BS, DS, PS, NS> {
 
                 if let Some(content_length) = resp.content_length() {
                     span.pb_set_length(content_length);
-                    span.pb_set_style(&tvix_tracing::PB_PROGRESS_STYLE);
+                    span.pb_set_style(&tvix_tracing::PB_TRANSFER_STYLE);
                 } else {
-                    span.pb_set_style(&tvix_tracing::PB_SPINNER_STYLE);
+                    span.pb_set_style(&tvix_tracing::PB_TRANSFER_STYLE);
                 }
                 span.pb_start();
 
diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs
index 9e81e59f12b9..2a039fbebed7 100644
--- a/tvix/store/src/bin/tvix-store.rs
+++ b/tvix/store/src/bin/tvix-store.rs
@@ -192,7 +192,7 @@ fn default_threads() -> usize {
         .unwrap_or(4)
 }
 
-#[instrument(fields(indicatif.pb_show=1), skip(cli))]
+#[instrument(skip_all)]
 async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error>> {
     match cli.command {
         Commands::Daemon {
@@ -270,19 +270,9 @@ async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error>> {
             let nar_calculation_service: Arc<dyn NarCalculationService> =
                 nar_calculation_service.into();
 
-            let root_span = {
-                let s = Span::current();
-                s.pb_set_style(&tvix_tracing::PB_PROGRESS_STYLE);
-                s.pb_set_message("Importing paths");
-                s.pb_set_length(paths.len() as u64);
-                s.pb_start();
-                s
-            };
-
             let tasks = paths
                 .into_iter()
                 .map(|path| {
-                    let paths_span = root_span.clone();
                     tokio::task::spawn({
                         let blob_service = blob_service.clone();
                         let directory_service = directory_service.clone();
@@ -305,7 +295,6 @@ async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error>> {
                                     println!("{}", output_path.to_absolute_path());
                                 }
                             }
-                            paths_span.pb_inc(1);
                         }
                     })
                 })
@@ -512,7 +501,6 @@ async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error>> {
 }
 
 #[tokio::main]
-#[instrument(fields(indicatif.pb_show=1))]
 async fn main() -> Result<(), Box<dyn std::error::Error>> {
     let cli = Cli::parse();
 
diff --git a/tvix/tracing/src/lib.rs b/tvix/tracing/src/lib.rs
index 08e100781073..ecf31e1eb031 100644
--- a/tvix/tracing/src/lib.rs
+++ b/tvix/tracing/src/lib.rs
@@ -18,11 +18,15 @@ use tracing_tracy::TracyLayer;
 
 lazy_static! {
     pub static ref PB_PROGRESS_STYLE: ProgressStyle = ProgressStyle::with_template(
-        "{span_child_prefix}{bar:30} {wide_msg} [{elapsed_precise}]  {pos:>7}/{len:7}"
+        "{span_child_prefix} {wide_msg} {bar:10} ({elapsed}) {pos:>7}/{len:7}"
+    )
+    .expect("invalid progress template");
+    pub static ref PB_TRANSFER_STYLE: ProgressStyle = ProgressStyle::with_template(
+        "{span_child_prefix} {wide_msg} {binary_bytes:>7}/{binary_total_bytes:7}@{decimal_bytes_per_sec} ({elapsed}) {bar:10} "
     )
     .expect("invalid progress template");
     pub static ref PB_SPINNER_STYLE: ProgressStyle = ProgressStyle::with_template(
-        "{span_child_prefix}{spinner} {wide_msg} [{elapsed_precise}]  {pos:>7}/{len:7}"
+        "{span_child_prefix}{spinner} {wide_msg} ({elapsed}) {pos:>7}/{len:7}"
     )
     .expect("invalid progress template");
 }