diff options
Diffstat (limited to 'tvix/castore')
-rw-r--r-- | tvix/castore/Cargo.toml | 1 | ||||
-rw-r--r-- | tvix/castore/src/import/fs.rs | 51 | ||||
-rw-r--r-- | tvix/castore/src/import/mod.rs | 7 |
3 files changed, 37 insertions, 22 deletions
diff --git a/tvix/castore/Cargo.toml b/tvix/castore/Cargo.toml index e82a6d12421d..ea4c598fe884 100644 --- a/tvix/castore/Cargo.toml +++ b/tvix/castore/Cargo.toml @@ -29,6 +29,7 @@ tonic = "0.11.0" tower = "0.4.13" tracing = "0.1.37" tracing-indicatif = "0.3.6" +tvix-tracing = { path = "../tracing" } url = "2.4.0" walkdir = "2.4.0" zstd = "0.13.0" diff --git a/tvix/castore/src/import/fs.rs b/tvix/castore/src/import/fs.rs index 265d7723554b..dc7821b8101e 100644 --- a/tvix/castore/src/import/fs.rs +++ b/tvix/castore/src/import/fs.rs @@ -6,6 +6,8 @@ use std::fs::FileType; use std::os::unix::ffi::OsStringExt; use std::os::unix::fs::MetadataExt; use std::os::unix::fs::PermissionsExt; +use tokio::io::BufReader; +use tokio_util::io::InspectReader; use tracing::instrument; use tracing::Span; use tracing_indicatif::span_ext::IndicatifSpanExt; @@ -28,7 +30,7 @@ use super::IngestionError; /// /// This function will walk the filesystem using `walkdir` and will consume /// `O(#number of entries)` space. -#[instrument(skip(blob_service, directory_service), fields(path), err)] +#[instrument(skip(blob_service, directory_service), fields(path, indicatif.pb_show=1), err)] pub async fn ingest_path<BS, DS, P>( blob_service: BS, directory_service: DS, @@ -39,7 +41,10 @@ where BS: BlobService + Clone, DS: DirectoryService, { - Span::current().pb_start(); + let span = Span::current(); + span.pb_set_message(&format!("Ingesting {:?}", path)); + span.pb_start(); + let iter = WalkDir::new(path.as_ref()) .follow_links(false) .follow_root_links(false) @@ -49,11 +54,12 @@ where let entries = dir_entries_to_ingestion_stream(blob_service, iter, path.as_ref()); ingest_entries( directory_service, - entries.inspect(|e| { - if let Ok(e) = e { - let s = Span::current(); - s.pb_inc(1); - s.pb_set_message(&format!("Ingesting {}", e.path())); + entries.inspect({ + let span = span.clone(); + move |e| { + if e.is_ok() { + span.pb_inc(1) + } } }), ) @@ -151,7 +157,7 @@ where } /// Uploads the file at the provided [Path] the the [BlobService]. -#[instrument(skip(blob_service), fields(path), err)] +#[instrument(skip(blob_service), fields(path, indicatif.pb_show=1), err)] async fn upload_blob<BS>( blob_service: BS, path: impl AsRef<std::path::Path>, @@ -159,16 +165,29 @@ async fn upload_blob<BS>( where BS: BlobService, { - let mut file = match tokio::fs::File::open(path.as_ref()).await { - Ok(file) => file, - Err(e) => return Err(Error::BlobRead(path.as_ref().to_path_buf(), e)), - }; + let span = Span::current(); + span.pb_set_style(&tvix_tracing::PB_TRANSFER_STYLE); + span.pb_set_message(&format!("Uploading blob for {:?}", path.as_ref())); + span.pb_start(); - let mut writer = blob_service.open_write().await; + let file = tokio::fs::File::open(path.as_ref()) + .await + .map_err(|e| Error::BlobRead(path.as_ref().to_path_buf(), e))?; - if let Err(e) = tokio::io::copy(&mut file, &mut writer).await { - return Err(Error::BlobRead(path.as_ref().to_path_buf(), e)); - }; + let metadata = file + .metadata() + .await + .map_err(|e| Error::Stat(path.as_ref().to_path_buf(), e))?; + + span.pb_set_length(metadata.len()); + let reader = InspectReader::new(file, |d| { + span.pb_inc(d.len() as u64); + }); + + let mut writer = blob_service.open_write().await; + tokio::io::copy(&mut BufReader::new(reader), &mut writer) + .await + .map_err(|e| Error::BlobRead(path.as_ref().to_path_buf(), e))?; let digest = writer .close() diff --git a/tvix/castore/src/import/mod.rs b/tvix/castore/src/import/mod.rs index c57c5bcada20..a9ac0be6b064 100644 --- a/tvix/castore/src/import/mod.rs +++ b/tvix/castore/src/import/mod.rs @@ -14,9 +14,6 @@ use crate::proto::FileNode; use crate::proto::SymlinkNode; use crate::B3Digest; use futures::{Stream, StreamExt}; -use tracing::Span; -use tracing_indicatif::span_ext::IndicatifSpanExt; - use tracing::Level; use std::collections::HashMap; @@ -46,7 +43,7 @@ pub mod fs; /// map and upload it to the [DirectoryService] through a lazily created [DirectoryPutter]. /// /// On success, returns the root node. -#[instrument(skip_all, fields(indicatif.pb_show=1), ret(level = Level::TRACE), err)] +#[instrument(skip_all, ret(level = Level::TRACE), err)] pub async fn ingest_entries<DS, S, E>( directory_service: DS, mut entries: S, @@ -60,8 +57,6 @@ where let mut directories: HashMap<PathBuf, Directory> = HashMap::default(); let mut maybe_directory_putter: Option<Box<dyn DirectoryPutter>> = None; - Span::current().pb_start(); - let root_node = loop { let mut entry = entries .next() |