about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--tvix/castore/src/import/fs.rs25
-rw-r--r--tvix/store/src/bin/tvix-store.rs139
-rw-r--r--tvix/store/src/nar/renderer.rs9
3 files changed, 126 insertions, 47 deletions
diff --git a/tvix/castore/src/import/fs.rs b/tvix/castore/src/import/fs.rs
index 5708579baad3..78eac6f6128d 100644
--- a/tvix/castore/src/import/fs.rs
+++ b/tvix/castore/src/import/fs.rs
@@ -8,7 +8,9 @@ use std::os::unix::fs::MetadataExt;
 use std::os::unix::fs::PermissionsExt;
 use tokio::io::BufReader;
 use tokio_util::io::InspectReader;
+use tracing::info_span;
 use tracing::instrument;
+use tracing::Instrument;
 use tracing::Span;
 use tracing_indicatif::span_ext::IndicatifSpanExt;
 use walkdir::DirEntry;
@@ -30,7 +32,11 @@ use super::IngestionError;
 ///
 /// This function will walk the filesystem using `walkdir` and will consume
 /// `O(#number of entries)` space.
-#[instrument(skip(blob_service, directory_service, reference_scanner), fields(path, indicatif.pb_show=1), err)]
+#[instrument(
+    skip(blob_service, directory_service, reference_scanner),
+    fields(path),
+    err
+)]
 pub async fn ingest_path<BS, DS, P, P2>(
     blob_service: BS,
     directory_service: DS,
@@ -44,8 +50,6 @@ where
     P2: AsRef<[u8]> + Send + Sync,
 {
     let span = Span::current();
-    span.pb_set_message(&format!("Ingesting {:?}", path));
-    span.pb_start();
 
     let iter = WalkDir::new(path.as_ref())
         .follow_links(false)
@@ -158,8 +162,15 @@ where
             .metadata()
             .map_err(|e| Error::Stat(entry.path().to_path_buf(), e.into()))?;
 
-        let digest =
-            upload_blob(blob_service, entry.path().to_path_buf(), reference_scanner).await?;
+        let digest = upload_blob(blob_service, entry.path().to_path_buf(), reference_scanner)
+            .instrument({
+                let span = info_span!("upload_blob", "indicatif.pb_show" = tracing::field::Empty);
+                span.pb_set_message(&format!("Uploading blob for {:?}", fs_path));
+                span.pb_set_style(&tvix_tracing::PB_TRANSFER_STYLE);
+
+                span
+            })
+            .await?;
 
         Ok(IngestionEntry::Regular {
             path,
@@ -175,7 +186,7 @@ where
 }
 
 /// Uploads the file at the provided [Path] the the [BlobService].
-#[instrument(skip(blob_service, reference_scanner), fields(path, indicatif.pb_show=1), err)]
+#[instrument(skip(blob_service, reference_scanner), fields(path), err)]
 async fn upload_blob<BS, P>(
     blob_service: BS,
     path: impl AsRef<std::path::Path>,
@@ -186,8 +197,6 @@ where
     P: AsRef<[u8]>,
 {
     let span = Span::current();
-    span.pb_set_style(&tvix_tracing::PB_TRANSFER_STYLE);
-    span.pb_set_message(&format!("Uploading blob for {:?}", path.as_ref()));
     span.pb_start();
 
     let file = tokio::fs::File::open(path.as_ref())
diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs
index 6cc7c39ab74e..1aed0a8e4453 100644
--- a/tvix/store/src/bin/tvix-store.rs
+++ b/tvix/store/src/bin/tvix-store.rs
@@ -1,9 +1,11 @@
 use clap::Parser;
 use clap::Subcommand;
 
-use futures::future::try_join_all;
 use futures::StreamExt;
 use futures::TryStreamExt;
+use nix_compat::nix_daemon::de::Error;
+use nix_compat::nixhash::CAHash;
+use nix_compat::nixhash::NixHash;
 use nix_compat::{path_info::ExportedPathInfo, store_path::StorePath};
 use serde::Deserialize;
 use serde::Serialize;
@@ -12,11 +14,13 @@ use std::sync::Arc;
 use tonic::transport::Server;
 use tower::ServiceBuilder;
 use tower_http::trace::{DefaultMakeSpan, TraceLayer};
-use tracing::{info, info_span, instrument, Level, Span};
-use tracing_indicatif::span_ext::IndicatifSpanExt as _;
+use tracing::{debug, info, info_span, instrument, warn, Instrument, Level, Span};
+use tracing_indicatif::span_ext::IndicatifSpanExt;
 use tvix_castore::import::fs::ingest_path;
+use tvix_store::import::path_to_name;
 use tvix_store::nar::NarCalculationService;
 use tvix_store::utils::{ServiceUrls, ServiceUrlsGrpc};
+use tvix_tracing::TracingHandle;
 
 use tvix_castore::proto::blob_service_server::BlobServiceServer;
 use tvix_castore::proto::directory_service_server::DirectoryServiceServer;
@@ -159,7 +163,10 @@ fn default_threads() -> usize {
 }
 
 #[instrument(skip_all)]
-async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
+async fn run_cli(
+    cli: Cli,
+    tracing_handle: TracingHandle,
+) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
     match cli.command {
         Commands::Daemon {
             listen_args,
@@ -242,37 +249,105 @@ async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error + Send + Sync
             let nar_calculation_service: Arc<dyn NarCalculationService> =
                 nar_calculation_service.into();
 
-            let tasks = paths
+            // For each path passed, construct the name, or bail out if it's invalid.
+            let paths_and_names = paths
                 .into_iter()
-                .map(|path| {
-                    tokio::task::spawn({
-                        let blob_service = blob_service.clone();
-                        let directory_service = directory_service.clone();
-                        let path_info_service = path_info_service.clone();
-                        let nar_calculation_service = nar_calculation_service.clone();
-
-                        async move {
-                            if let Ok(name) = tvix_store::import::path_to_name(&path) {
-                                let resp = tvix_store::import::import_path_as_nar_ca(
-                                    &path,
-                                    name,
-                                    blob_service,
-                                    directory_service,
-                                    path_info_service,
-                                    nar_calculation_service,
-                                )
-                                .await;
-                                if let Ok(path_info) = resp {
-                                    // If the import was successful, print the path to stdout.
-                                    println!("{}", path_info.store_path.to_absolute_path());
-                                }
+                .map(|p| match path_to_name(&p) {
+                    Ok(name) => {
+                        let name = name.to_owned();
+                        Ok((p, name))
+                    }
+                    Err(e) => Err(e),
+                })
+                .collect::<Result<Vec<_>, _>>()?;
+
+            let imports_span =
+                info_span!("import paths", "indicatif.pb_show" = tracing::field::Empty);
+            imports_span.pb_set_message("Importing");
+            imports_span.pb_set_length(paths_and_names.len() as u64);
+            imports_span.pb_set_style(&tvix_tracing::PB_PROGRESS_STYLE);
+            imports_span.pb_start();
+
+            futures::stream::iter(paths_and_names)
+                .map(|(path, name)| {
+                    let blob_service = blob_service.clone();
+                    let directory_service = directory_service.clone();
+                    let path_info_service = path_info_service.clone();
+                    let nar_calculation_service = nar_calculation_service.clone();
+                    let imports_span = imports_span.clone();
+                    let tracing_handle = tracing_handle.clone();
+
+                    async move {
+                        let span = Span::current();
+                        span.pb_set_style(&tvix_tracing::PB_SPINNER_STYLE);
+                        span.pb_set_message(&format!("Ingesting {:?}", path));
+                        span.pb_start();
+
+                        // Ingest the contents at the given path into castore.
+                        let root_node = ingest_path::<_, _, _, &[u8]>(
+                            blob_service,
+                            directory_service,
+                            &path,
+                            None,
+                        )
+                        .await
+                        .map_err(std::io::Error::custom)?;
+
+                        span.pb_set_message(&format!("NAR Calculation for {:?}", path));
+
+                        // Ask for the NAR size and sha256
+                        let (nar_size, nar_sha256) =
+                            nar_calculation_service.calculate_nar(&root_node).await?;
+
+                        // Calculate the output path. This might still fail, as some names are illegal.
+                        // FUTUREWORK: express the `name` at the type level to be valid and check for this earlier.
+                        let ca = CAHash::Nar(NixHash::Sha256(nar_sha256));
+                        let output_path: StorePath<String> =
+                            nix_compat::store_path::build_ca_path::<&str, _, _>(
+                                &name,
+                                &ca,
+                                [],
+                                false,
+                            )
+                            .map_err(|e| {
+                                warn!(err=%e, "unable to build CA path");
+                                std::io::Error::custom(e)
+                            })?;
+
+                        // Construct and insert PathInfo
+                        match path_info_service
+                            .as_ref()
+                            .put(PathInfo {
+                                store_path: output_path.to_owned(),
+                                node: root_node,
+                                // There's no reference scanning on imported paths
+                                references: vec![],
+                                nar_size,
+                                nar_sha256,
+                                signatures: vec![],
+                                deriver: None,
+                                ca: Some(ca),
+                            })
+                            .await
+                        {
+                            // If the import was successful, print the path to stdout.
+                            Ok(path_info) => {
+                                use std::io::Write;
+                                debug!(store_path=%path_info.store_path.to_absolute_path(), "imported path");
+                                writeln!(&mut tracing_handle.get_stdout_writer(), "{}", path_info.store_path.to_absolute_path())?;
+                                imports_span.pb_inc(1);
+                                Ok(())
+                            }
+                            Err(e) => {
+                                warn!(?path, err=%e, "failed to import");
+                                Err(e)
                             }
                         }
-                    })
+                    }.instrument(info_span!("import path", "indicatif.pb_show" = tracing::field::Empty))
                 })
-                .collect::<Vec<_>>();
-
-            try_join_all(tasks).await?;
+                .buffer_unordered(50)
+                .try_collect()
+                .await?;
         }
         Commands::Copy {
             service_addrs,
@@ -465,7 +540,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
             }
             Ok(())
         },
-        res = run_cli(cli) => {
+        res = run_cli(cli, tracing_handle.clone()) => {
             if let Err(e) = tracing_handle.shutdown().await {
                 eprintln!("failed to shutdown tracing: {e}");
             }
diff --git a/tvix/store/src/nar/renderer.rs b/tvix/store/src/nar/renderer.rs
index afd85f267c6c..07cdc4b1e31f 100644
--- a/tvix/store/src/nar/renderer.rs
+++ b/tvix/store/src/nar/renderer.rs
@@ -6,8 +6,7 @@ use nix_compat::nar::writer::r#async as nar_writer;
 use sha2::{Digest, Sha256};
 use tokio::io::{self, AsyncWrite, BufReader};
 use tonic::async_trait;
-use tracing::{instrument, Span};
-use tracing_indicatif::span_ext::IndicatifSpanExt;
+use tracing::instrument;
 use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService, Node};
 
 pub struct SimpleRenderer<BS, DS> {
@@ -46,7 +45,7 @@ where
 
 /// Invoke [write_nar], and return the size and sha256 digest of the produced
 /// NAR output.
-#[instrument(skip_all, fields(indicatif.pb_show=1))]
+#[instrument(skip_all)]
 pub async fn calculate_size_and_sha256<BS, DS>(
     root_node: &Node,
     blob_service: BS,
@@ -59,10 +58,6 @@ where
     let mut h = Sha256::new();
     let mut cw = CountWrite::from(&mut h);
 
-    let span = Span::current();
-    span.pb_set_message("Calculating NAR");
-    span.pb_start();
-
     write_nar(
         // The hasher doesn't speak async. It doesn't
         // actually do any I/O, so it's fine to wrap.