diff options
author | Florian Klink <flokli@flokli.de> | 2024-06-16T16·22+0300 |
---|---|---|
committer | flokli <flokli@flokli.de> | 2024-06-17T12·57+0000 |
commit | 28b692fd507b5a32add96e3694bd1f2959bd9608 (patch) | |
tree | 945ed9c6feafb45a7fc1b366b7d5254064213134 /tvix/castore/src/import | |
parent | cfab953094c6da5a919ea968f2bd7753035005c7 (diff) |
feat(tvix/tvix-store): improve progress bars r/8289
Don't show an empty spinner for daemon commands. Move the bar to the right, so the text is better aligned between spinner progress and bar progress styles. Generally, push progress bars a bit more down to the place where we can track progress. This includes adding one in the upload_blob span. Introduce another progress style template for transfers, which interprets the counter as bytes (not just a plain integer), and also a data rate. Use it for here and in the fetching code, and also make the progress bar itself a bit less wide. Change-Id: I15c2ea3d2b24b5186cec19cd3dbd706638497f40 Reviewed-on: https://cl.tvl.fyi/c/depot/+/11845 Tested-by: BuildkiteCI Autosubmit: flokli <flokli@flokli.de> Reviewed-by: Simon Hauser <simon.hauser@helsinki-systems.de>
Diffstat (limited to 'tvix/castore/src/import')
-rw-r--r-- | tvix/castore/src/import/fs.rs | 51 | ||||
-rw-r--r-- | tvix/castore/src/import/mod.rs | 7 |
2 files changed, 36 insertions, 22 deletions
diff --git a/tvix/castore/src/import/fs.rs b/tvix/castore/src/import/fs.rs index 265d7723554b..dc7821b8101e 100644 --- a/tvix/castore/src/import/fs.rs +++ b/tvix/castore/src/import/fs.rs @@ -6,6 +6,8 @@ use std::fs::FileType; use std::os::unix::ffi::OsStringExt; use std::os::unix::fs::MetadataExt; use std::os::unix::fs::PermissionsExt; +use tokio::io::BufReader; +use tokio_util::io::InspectReader; use tracing::instrument; use tracing::Span; use tracing_indicatif::span_ext::IndicatifSpanExt; @@ -28,7 +30,7 @@ use super::IngestionError; /// /// This function will walk the filesystem using `walkdir` and will consume /// `O(#number of entries)` space. -#[instrument(skip(blob_service, directory_service), fields(path), err)] +#[instrument(skip(blob_service, directory_service), fields(path, indicatif.pb_show=1), err)] pub async fn ingest_path<BS, DS, P>( blob_service: BS, directory_service: DS, @@ -39,7 +41,10 @@ where BS: BlobService + Clone, DS: DirectoryService, { - Span::current().pb_start(); + let span = Span::current(); + span.pb_set_message(&format!("Ingesting {:?}", path)); + span.pb_start(); + let iter = WalkDir::new(path.as_ref()) .follow_links(false) .follow_root_links(false) @@ -49,11 +54,12 @@ where let entries = dir_entries_to_ingestion_stream(blob_service, iter, path.as_ref()); ingest_entries( directory_service, - entries.inspect(|e| { - if let Ok(e) = e { - let s = Span::current(); - s.pb_inc(1); - s.pb_set_message(&format!("Ingesting {}", e.path())); + entries.inspect({ + let span = span.clone(); + move |e| { + if e.is_ok() { + span.pb_inc(1) + } } }), ) @@ -151,7 +157,7 @@ where } /// Uploads the file at the provided [Path] the the [BlobService]. -#[instrument(skip(blob_service), fields(path), err)] +#[instrument(skip(blob_service), fields(path, indicatif.pb_show=1), err)] async fn upload_blob<BS>( blob_service: BS, path: impl AsRef<std::path::Path>, @@ -159,16 +165,29 @@ async fn upload_blob<BS>( where BS: BlobService, { - let mut file = match tokio::fs::File::open(path.as_ref()).await { - Ok(file) => file, - Err(e) => return Err(Error::BlobRead(path.as_ref().to_path_buf(), e)), - }; + let span = Span::current(); + span.pb_set_style(&tvix_tracing::PB_TRANSFER_STYLE); + span.pb_set_message(&format!("Uploading blob for {:?}", path.as_ref())); + span.pb_start(); - let mut writer = blob_service.open_write().await; + let file = tokio::fs::File::open(path.as_ref()) + .await + .map_err(|e| Error::BlobRead(path.as_ref().to_path_buf(), e))?; - if let Err(e) = tokio::io::copy(&mut file, &mut writer).await { - return Err(Error::BlobRead(path.as_ref().to_path_buf(), e)); - }; + let metadata = file + .metadata() + .await + .map_err(|e| Error::Stat(path.as_ref().to_path_buf(), e))?; + + span.pb_set_length(metadata.len()); + let reader = InspectReader::new(file, |d| { + span.pb_inc(d.len() as u64); + }); + + let mut writer = blob_service.open_write().await; + tokio::io::copy(&mut BufReader::new(reader), &mut writer) + .await + .map_err(|e| Error::BlobRead(path.as_ref().to_path_buf(), e))?; let digest = writer .close() diff --git a/tvix/castore/src/import/mod.rs b/tvix/castore/src/import/mod.rs index c57c5bcada20..a9ac0be6b064 100644 --- a/tvix/castore/src/import/mod.rs +++ b/tvix/castore/src/import/mod.rs @@ -14,9 +14,6 @@ use crate::proto::FileNode; use crate::proto::SymlinkNode; use crate::B3Digest; use futures::{Stream, StreamExt}; -use tracing::Span; -use tracing_indicatif::span_ext::IndicatifSpanExt; - use tracing::Level; use std::collections::HashMap; @@ -46,7 +43,7 @@ pub mod fs; /// map and upload it to the [DirectoryService] through a lazily created [DirectoryPutter]. /// /// On success, returns the root node. -#[instrument(skip_all, fields(indicatif.pb_show=1), ret(level = Level::TRACE), err)] +#[instrument(skip_all, ret(level = Level::TRACE), err)] pub async fn ingest_entries<DS, S, E>( directory_service: DS, mut entries: S, @@ -60,8 +57,6 @@ where let mut directories: HashMap<PathBuf, Directory> = HashMap::default(); let mut maybe_directory_putter: Option<Box<dyn DirectoryPutter>> = None; - Span::current().pb_start(); - let root_node = loop { let mut entry = entries .next() |