about summary refs log tree commit diff
path: root/tvix/castore
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/castore')
-rw-r--r--tvix/castore/Cargo.toml1
-rw-r--r--tvix/castore/src/import/fs.rs51
-rw-r--r--tvix/castore/src/import/mod.rs7
3 files changed, 37 insertions, 22 deletions
diff --git a/tvix/castore/Cargo.toml b/tvix/castore/Cargo.toml
index e82a6d12421d..ea4c598fe884 100644
--- a/tvix/castore/Cargo.toml
+++ b/tvix/castore/Cargo.toml
@@ -29,6 +29,7 @@ tonic = "0.11.0"
 tower = "0.4.13"
 tracing = "0.1.37"
 tracing-indicatif = "0.3.6"
+tvix-tracing = { path = "../tracing" }
 url = "2.4.0"
 walkdir = "2.4.0"
 zstd = "0.13.0"
diff --git a/tvix/castore/src/import/fs.rs b/tvix/castore/src/import/fs.rs
index 265d7723554b..dc7821b8101e 100644
--- a/tvix/castore/src/import/fs.rs
+++ b/tvix/castore/src/import/fs.rs
@@ -6,6 +6,8 @@ use std::fs::FileType;
 use std::os::unix::ffi::OsStringExt;
 use std::os::unix::fs::MetadataExt;
 use std::os::unix::fs::PermissionsExt;
+use tokio::io::BufReader;
+use tokio_util::io::InspectReader;
 use tracing::instrument;
 use tracing::Span;
 use tracing_indicatif::span_ext::IndicatifSpanExt;
@@ -28,7 +30,7 @@ use super::IngestionError;
 ///
 /// This function will walk the filesystem using `walkdir` and will consume
 /// `O(#number of entries)` space.
-#[instrument(skip(blob_service, directory_service), fields(path), err)]
+#[instrument(skip(blob_service, directory_service), fields(path, indicatif.pb_show=1), err)]
 pub async fn ingest_path<BS, DS, P>(
     blob_service: BS,
     directory_service: DS,
@@ -39,7 +41,10 @@ where
     BS: BlobService + Clone,
     DS: DirectoryService,
 {
-    Span::current().pb_start();
+    let span = Span::current();
+    span.pb_set_message(&format!("Ingesting {:?}", path));
+    span.pb_start();
+
     let iter = WalkDir::new(path.as_ref())
         .follow_links(false)
         .follow_root_links(false)
@@ -49,11 +54,12 @@ where
     let entries = dir_entries_to_ingestion_stream(blob_service, iter, path.as_ref());
     ingest_entries(
         directory_service,
-        entries.inspect(|e| {
-            if let Ok(e) = e {
-                let s = Span::current();
-                s.pb_inc(1);
-                s.pb_set_message(&format!("Ingesting {}", e.path()));
+        entries.inspect({
+            let span = span.clone();
+            move |e| {
+                if e.is_ok() {
+                    span.pb_inc(1)
+                }
             }
         }),
     )
@@ -151,7 +157,7 @@ where
 }
 
 /// Uploads the file at the provided [Path] the the [BlobService].
-#[instrument(skip(blob_service), fields(path), err)]
+#[instrument(skip(blob_service), fields(path, indicatif.pb_show=1), err)]
 async fn upload_blob<BS>(
     blob_service: BS,
     path: impl AsRef<std::path::Path>,
@@ -159,16 +165,29 @@ async fn upload_blob<BS>(
 where
     BS: BlobService,
 {
-    let mut file = match tokio::fs::File::open(path.as_ref()).await {
-        Ok(file) => file,
-        Err(e) => return Err(Error::BlobRead(path.as_ref().to_path_buf(), e)),
-    };
+    let span = Span::current();
+    span.pb_set_style(&tvix_tracing::PB_TRANSFER_STYLE);
+    span.pb_set_message(&format!("Uploading blob for {:?}", path.as_ref()));
+    span.pb_start();
 
-    let mut writer = blob_service.open_write().await;
+    let file = tokio::fs::File::open(path.as_ref())
+        .await
+        .map_err(|e| Error::BlobRead(path.as_ref().to_path_buf(), e))?;
 
-    if let Err(e) = tokio::io::copy(&mut file, &mut writer).await {
-        return Err(Error::BlobRead(path.as_ref().to_path_buf(), e));
-    };
+    let metadata = file
+        .metadata()
+        .await
+        .map_err(|e| Error::Stat(path.as_ref().to_path_buf(), e))?;
+
+    span.pb_set_length(metadata.len());
+    let reader = InspectReader::new(file, |d| {
+        span.pb_inc(d.len() as u64);
+    });
+
+    let mut writer = blob_service.open_write().await;
+    tokio::io::copy(&mut BufReader::new(reader), &mut writer)
+        .await
+        .map_err(|e| Error::BlobRead(path.as_ref().to_path_buf(), e))?;
 
     let digest = writer
         .close()
diff --git a/tvix/castore/src/import/mod.rs b/tvix/castore/src/import/mod.rs
index c57c5bcada20..a9ac0be6b064 100644
--- a/tvix/castore/src/import/mod.rs
+++ b/tvix/castore/src/import/mod.rs
@@ -14,9 +14,6 @@ use crate::proto::FileNode;
 use crate::proto::SymlinkNode;
 use crate::B3Digest;
 use futures::{Stream, StreamExt};
-use tracing::Span;
-use tracing_indicatif::span_ext::IndicatifSpanExt;
-
 use tracing::Level;
 
 use std::collections::HashMap;
@@ -46,7 +43,7 @@ pub mod fs;
 /// map and upload it to the [DirectoryService] through a lazily created [DirectoryPutter].
 ///
 /// On success, returns the root node.
-#[instrument(skip_all, fields(indicatif.pb_show=1), ret(level = Level::TRACE), err)]
+#[instrument(skip_all, ret(level = Level::TRACE), err)]
 pub async fn ingest_entries<DS, S, E>(
     directory_service: DS,
     mut entries: S,
@@ -60,8 +57,6 @@ where
     let mut directories: HashMap<PathBuf, Directory> = HashMap::default();
     let mut maybe_directory_putter: Option<Box<dyn DirectoryPutter>> = None;
 
-    Span::current().pb_start();
-
     let root_node = loop {
         let mut entry = entries
             .next()