diff options
-rw-r--r-- | tvix/Cargo.lock | 1 | ||||
-rw-r--r-- | tvix/Cargo.nix | 4 | ||||
-rw-r--r-- | tvix/castore/Cargo.toml | 1 | ||||
-rw-r--r-- | tvix/castore/src/import/fs.rs | 51 | ||||
-rw-r--r-- | tvix/castore/src/import/mod.rs | 7 | ||||
-rw-r--r-- | tvix/glue/src/fetchers/mod.rs | 6 | ||||
-rw-r--r-- | tvix/store/src/bin/tvix-store.rs | 14 | ||||
-rw-r--r-- | tvix/tracing/src/lib.rs | 8 |
8 files changed, 52 insertions, 40 deletions
diff --git a/tvix/Cargo.lock b/tvix/Cargo.lock index b4ee59031ba9..b4faaad0d298 100644 --- a/tvix/Cargo.lock +++ b/tvix/Cargo.lock @@ -4251,6 +4251,7 @@ dependencies = [ "tower", "tracing", "tracing-indicatif", + "tvix-tracing", "url", "vhost", "vhost-user-backend", diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix index cd8ec667f36d..68e37a3fa045 100644 --- a/tvix/Cargo.nix +++ b/tvix/Cargo.nix @@ -13383,6 +13383,10 @@ rec { packageId = "tracing-indicatif"; } { + name = "tvix-tracing"; + packageId = "tvix-tracing"; + } + { name = "url"; packageId = "url"; } diff --git a/tvix/castore/Cargo.toml b/tvix/castore/Cargo.toml index e82a6d12421d..ea4c598fe884 100644 --- a/tvix/castore/Cargo.toml +++ b/tvix/castore/Cargo.toml @@ -29,6 +29,7 @@ tonic = "0.11.0" tower = "0.4.13" tracing = "0.1.37" tracing-indicatif = "0.3.6" +tvix-tracing = { path = "../tracing" } url = "2.4.0" walkdir = "2.4.0" zstd = "0.13.0" diff --git a/tvix/castore/src/import/fs.rs b/tvix/castore/src/import/fs.rs index 265d7723554b..dc7821b8101e 100644 --- a/tvix/castore/src/import/fs.rs +++ b/tvix/castore/src/import/fs.rs @@ -6,6 +6,8 @@ use std::fs::FileType; use std::os::unix::ffi::OsStringExt; use std::os::unix::fs::MetadataExt; use std::os::unix::fs::PermissionsExt; +use tokio::io::BufReader; +use tokio_util::io::InspectReader; use tracing::instrument; use tracing::Span; use tracing_indicatif::span_ext::IndicatifSpanExt; @@ -28,7 +30,7 @@ use super::IngestionError; /// /// This function will walk the filesystem using `walkdir` and will consume /// `O(#number of entries)` space. -#[instrument(skip(blob_service, directory_service), fields(path), err)] +#[instrument(skip(blob_service, directory_service), fields(path, indicatif.pb_show=1), err)] pub async fn ingest_path<BS, DS, P>( blob_service: BS, directory_service: DS, @@ -39,7 +41,10 @@ where BS: BlobService + Clone, DS: DirectoryService, { - Span::current().pb_start(); + let span = Span::current(); + span.pb_set_message(&format!("Ingesting {:?}", path)); + span.pb_start(); + let iter = WalkDir::new(path.as_ref()) .follow_links(false) .follow_root_links(false) @@ -49,11 +54,12 @@ where let entries = dir_entries_to_ingestion_stream(blob_service, iter, path.as_ref()); ingest_entries( directory_service, - entries.inspect(|e| { - if let Ok(e) = e { - let s = Span::current(); - s.pb_inc(1); - s.pb_set_message(&format!("Ingesting {}", e.path())); + entries.inspect({ + let span = span.clone(); + move |e| { + if e.is_ok() { + span.pb_inc(1) + } } }), ) @@ -151,7 +157,7 @@ where } /// Uploads the file at the provided [Path] the the [BlobService]. -#[instrument(skip(blob_service), fields(path), err)] +#[instrument(skip(blob_service), fields(path, indicatif.pb_show=1), err)] async fn upload_blob<BS>( blob_service: BS, path: impl AsRef<std::path::Path>, @@ -159,16 +165,29 @@ async fn upload_blob<BS>( where BS: BlobService, { - let mut file = match tokio::fs::File::open(path.as_ref()).await { - Ok(file) => file, - Err(e) => return Err(Error::BlobRead(path.as_ref().to_path_buf(), e)), - }; + let span = Span::current(); + span.pb_set_style(&tvix_tracing::PB_TRANSFER_STYLE); + span.pb_set_message(&format!("Uploading blob for {:?}", path.as_ref())); + span.pb_start(); - let mut writer = blob_service.open_write().await; + let file = tokio::fs::File::open(path.as_ref()) + .await + .map_err(|e| Error::BlobRead(path.as_ref().to_path_buf(), e))?; - if let Err(e) = tokio::io::copy(&mut file, &mut writer).await { - return Err(Error::BlobRead(path.as_ref().to_path_buf(), e)); - }; + let metadata = file + .metadata() + .await + .map_err(|e| Error::Stat(path.as_ref().to_path_buf(), e))?; + + span.pb_set_length(metadata.len()); + let reader = InspectReader::new(file, |d| { + span.pb_inc(d.len() as u64); + }); + + let mut writer = blob_service.open_write().await; + tokio::io::copy(&mut BufReader::new(reader), &mut writer) + .await + .map_err(|e| Error::BlobRead(path.as_ref().to_path_buf(), e))?; let digest = writer .close() diff --git a/tvix/castore/src/import/mod.rs b/tvix/castore/src/import/mod.rs index c57c5bcada20..a9ac0be6b064 100644 --- a/tvix/castore/src/import/mod.rs +++ b/tvix/castore/src/import/mod.rs @@ -14,9 +14,6 @@ use crate::proto::FileNode; use crate::proto::SymlinkNode; use crate::B3Digest; use futures::{Stream, StreamExt}; -use tracing::Span; -use tracing_indicatif::span_ext::IndicatifSpanExt; - use tracing::Level; use std::collections::HashMap; @@ -46,7 +43,7 @@ pub mod fs; /// map and upload it to the [DirectoryService] through a lazily created [DirectoryPutter]. /// /// On success, returns the root node. -#[instrument(skip_all, fields(indicatif.pb_show=1), ret(level = Level::TRACE), err)] +#[instrument(skip_all, ret(level = Level::TRACE), err)] pub async fn ingest_entries<DS, S, E>( directory_service: DS, mut entries: S, @@ -60,8 +57,6 @@ where let mut directories: HashMap<PathBuf, Directory> = HashMap::default(); let mut maybe_directory_putter: Option<Box<dyn DirectoryPutter>> = None; - Span::current().pb_start(); - let root_node = loop { let mut entry = entries .next() diff --git a/tvix/glue/src/fetchers/mod.rs b/tvix/glue/src/fetchers/mod.rs index aee79b2e7504..6cce569e9799 100644 --- a/tvix/glue/src/fetchers/mod.rs +++ b/tvix/glue/src/fetchers/mod.rs @@ -222,7 +222,7 @@ impl<BS, DS, PS, NS> Fetcher<BS, DS, PS, NS> { .await?; span.pb_set_length(f.metadata().await?.len()); - span.pb_set_style(&tvix_tracing::PB_PROGRESS_STYLE); + span.pb_set_style(&tvix_tracing::PB_TRANSFER_STYLE); span.pb_start(); Ok(Box::new(tokio::io::BufReader::new(InspectReader::new( f, @@ -236,9 +236,9 @@ impl<BS, DS, PS, NS> Fetcher<BS, DS, PS, NS> { if let Some(content_length) = resp.content_length() { span.pb_set_length(content_length); - span.pb_set_style(&tvix_tracing::PB_PROGRESS_STYLE); + span.pb_set_style(&tvix_tracing::PB_TRANSFER_STYLE); } else { - span.pb_set_style(&tvix_tracing::PB_SPINNER_STYLE); + span.pb_set_style(&tvix_tracing::PB_TRANSFER_STYLE); } span.pb_start(); diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs index 9e81e59f12b9..2a039fbebed7 100644 --- a/tvix/store/src/bin/tvix-store.rs +++ b/tvix/store/src/bin/tvix-store.rs @@ -192,7 +192,7 @@ fn default_threads() -> usize { .unwrap_or(4) } -#[instrument(fields(indicatif.pb_show=1), skip(cli))] +#[instrument(skip_all)] async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error>> { match cli.command { Commands::Daemon { @@ -270,19 +270,9 @@ async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error>> { let nar_calculation_service: Arc<dyn NarCalculationService> = nar_calculation_service.into(); - let root_span = { - let s = Span::current(); - s.pb_set_style(&tvix_tracing::PB_PROGRESS_STYLE); - s.pb_set_message("Importing paths"); - s.pb_set_length(paths.len() as u64); - s.pb_start(); - s - }; - let tasks = paths .into_iter() .map(|path| { - let paths_span = root_span.clone(); tokio::task::spawn({ let blob_service = blob_service.clone(); let directory_service = directory_service.clone(); @@ -305,7 +295,6 @@ async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error>> { println!("{}", output_path.to_absolute_path()); } } - paths_span.pb_inc(1); } }) }) @@ -512,7 +501,6 @@ async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error>> { } #[tokio::main] -#[instrument(fields(indicatif.pb_show=1))] async fn main() -> Result<(), Box<dyn std::error::Error>> { let cli = Cli::parse(); diff --git a/tvix/tracing/src/lib.rs b/tvix/tracing/src/lib.rs index 08e100781073..ecf31e1eb031 100644 --- a/tvix/tracing/src/lib.rs +++ b/tvix/tracing/src/lib.rs @@ -18,11 +18,15 @@ use tracing_tracy::TracyLayer; lazy_static! { pub static ref PB_PROGRESS_STYLE: ProgressStyle = ProgressStyle::with_template( - "{span_child_prefix}{bar:30} {wide_msg} [{elapsed_precise}] {pos:>7}/{len:7}" + "{span_child_prefix} {wide_msg} {bar:10} ({elapsed}) {pos:>7}/{len:7}" + ) + .expect("invalid progress template"); + pub static ref PB_TRANSFER_STYLE: ProgressStyle = ProgressStyle::with_template( + "{span_child_prefix} {wide_msg} {binary_bytes:>7}/{binary_total_bytes:7}@{decimal_bytes_per_sec} ({elapsed}) {bar:10} " ) .expect("invalid progress template"); pub static ref PB_SPINNER_STYLE: ProgressStyle = ProgressStyle::with_template( - "{span_child_prefix}{spinner} {wide_msg} [{elapsed_precise}] {pos:>7}/{len:7}" + "{span_child_prefix}{spinner} {wide_msg} ({elapsed}) {pos:>7}/{len:7}" ) .expect("invalid progress template"); } |