about summary refs log tree commit diff
path: root/tvix/store/src/bin/tvix-store.rs
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2024-10-15T13·16+0300
committerflokli <flokli@flokli.de>2024-10-19T09·37+0000
commitd52d889f2bf6ff0aa329c3058120140da2debaf7 (patch)
tree5ca5005e19098f1a192be907407c65326059c26d /tvix/store/src/bin/tvix-store.rs
parent3fda90602d3de7a720149f090422c4da9d12d31d (diff)
refactor(tvix): make indicatif.pb_show=1 more explicit r/8837
This pushes generating spans with pb_show up to the caller.
They usually have more context on how to present things, if at all.

Change-Id: Icfcaa64a8a57dce50c0261f2d06e7c051e3946c2
Reviewed-on: https://cl.tvl.fyi/c/depot/+/12657
Autosubmit: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Reviewed-by: raitobezarius <tvl@lahfa.xyz>
Diffstat (limited to 'tvix/store/src/bin/tvix-store.rs')
-rw-r--r--tvix/store/src/bin/tvix-store.rs139
1 files changed, 107 insertions, 32 deletions
diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs
index 6cc7c39ab74e..1aed0a8e4453 100644
--- a/tvix/store/src/bin/tvix-store.rs
+++ b/tvix/store/src/bin/tvix-store.rs
@@ -1,9 +1,11 @@
 use clap::Parser;
 use clap::Subcommand;
 
-use futures::future::try_join_all;
 use futures::StreamExt;
 use futures::TryStreamExt;
+use nix_compat::nix_daemon::de::Error;
+use nix_compat::nixhash::CAHash;
+use nix_compat::nixhash::NixHash;
 use nix_compat::{path_info::ExportedPathInfo, store_path::StorePath};
 use serde::Deserialize;
 use serde::Serialize;
@@ -12,11 +14,13 @@ use std::sync::Arc;
 use tonic::transport::Server;
 use tower::ServiceBuilder;
 use tower_http::trace::{DefaultMakeSpan, TraceLayer};
-use tracing::{info, info_span, instrument, Level, Span};
-use tracing_indicatif::span_ext::IndicatifSpanExt as _;
+use tracing::{debug, info, info_span, instrument, warn, Instrument, Level, Span};
+use tracing_indicatif::span_ext::IndicatifSpanExt;
 use tvix_castore::import::fs::ingest_path;
+use tvix_store::import::path_to_name;
 use tvix_store::nar::NarCalculationService;
 use tvix_store::utils::{ServiceUrls, ServiceUrlsGrpc};
+use tvix_tracing::TracingHandle;
 
 use tvix_castore::proto::blob_service_server::BlobServiceServer;
 use tvix_castore::proto::directory_service_server::DirectoryServiceServer;
@@ -159,7 +163,10 @@ fn default_threads() -> usize {
 }
 
 #[instrument(skip_all)]
-async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
+async fn run_cli(
+    cli: Cli,
+    tracing_handle: TracingHandle,
+) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
     match cli.command {
         Commands::Daemon {
             listen_args,
@@ -242,37 +249,105 @@ async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error + Send + Sync
             let nar_calculation_service: Arc<dyn NarCalculationService> =
                 nar_calculation_service.into();
 
-            let tasks = paths
+            // For each path passed, construct the name, or bail out if it's invalid.
+            let paths_and_names = paths
                 .into_iter()
-                .map(|path| {
-                    tokio::task::spawn({
-                        let blob_service = blob_service.clone();
-                        let directory_service = directory_service.clone();
-                        let path_info_service = path_info_service.clone();
-                        let nar_calculation_service = nar_calculation_service.clone();
-
-                        async move {
-                            if let Ok(name) = tvix_store::import::path_to_name(&path) {
-                                let resp = tvix_store::import::import_path_as_nar_ca(
-                                    &path,
-                                    name,
-                                    blob_service,
-                                    directory_service,
-                                    path_info_service,
-                                    nar_calculation_service,
-                                )
-                                .await;
-                                if let Ok(path_info) = resp {
-                                    // If the import was successful, print the path to stdout.
-                                    println!("{}", path_info.store_path.to_absolute_path());
-                                }
+                .map(|p| match path_to_name(&p) {
+                    Ok(name) => {
+                        let name = name.to_owned();
+                        Ok((p, name))
+                    }
+                    Err(e) => Err(e),
+                })
+                .collect::<Result<Vec<_>, _>>()?;
+
+            let imports_span =
+                info_span!("import paths", "indicatif.pb_show" = tracing::field::Empty);
+            imports_span.pb_set_message("Importing");
+            imports_span.pb_set_length(paths_and_names.len() as u64);
+            imports_span.pb_set_style(&tvix_tracing::PB_PROGRESS_STYLE);
+            imports_span.pb_start();
+
+            futures::stream::iter(paths_and_names)
+                .map(|(path, name)| {
+                    let blob_service = blob_service.clone();
+                    let directory_service = directory_service.clone();
+                    let path_info_service = path_info_service.clone();
+                    let nar_calculation_service = nar_calculation_service.clone();
+                    let imports_span = imports_span.clone();
+                    let tracing_handle = tracing_handle.clone();
+
+                    async move {
+                        let span = Span::current();
+                        span.pb_set_style(&tvix_tracing::PB_SPINNER_STYLE);
+                        span.pb_set_message(&format!("Ingesting {:?}", path));
+                        span.pb_start();
+
+                        // Ingest the contents at the given path into castore.
+                        let root_node = ingest_path::<_, _, _, &[u8]>(
+                            blob_service,
+                            directory_service,
+                            &path,
+                            None,
+                        )
+                        .await
+                        .map_err(std::io::Error::custom)?;
+
+                        span.pb_set_message(&format!("NAR Calculation for {:?}", path));
+
+                        // Ask for the NAR size and sha256
+                        let (nar_size, nar_sha256) =
+                            nar_calculation_service.calculate_nar(&root_node).await?;
+
+                        // Calculate the output path. This might still fail, as some names are illegal.
+                        // FUTUREWORK: express the `name` at the type level to be valid and check for this earlier.
+                        let ca = CAHash::Nar(NixHash::Sha256(nar_sha256));
+                        let output_path: StorePath<String> =
+                            nix_compat::store_path::build_ca_path::<&str, _, _>(
+                                &name,
+                                &ca,
+                                [],
+                                false,
+                            )
+                            .map_err(|e| {
+                                warn!(err=%e, "unable to build CA path");
+                                std::io::Error::custom(e)
+                            })?;
+
+                        // Construct and insert PathInfo
+                        match path_info_service
+                            .as_ref()
+                            .put(PathInfo {
+                                store_path: output_path.to_owned(),
+                                node: root_node,
+                                // There's no reference scanning on imported paths
+                                references: vec![],
+                                nar_size,
+                                nar_sha256,
+                                signatures: vec![],
+                                deriver: None,
+                                ca: Some(ca),
+                            })
+                            .await
+                        {
+                            // If the import was successful, print the path to stdout.
+                            Ok(path_info) => {
+                                use std::io::Write;
+                                debug!(store_path=%path_info.store_path.to_absolute_path(), "imported path");
+                                writeln!(&mut tracing_handle.get_stdout_writer(), "{}", path_info.store_path.to_absolute_path())?;
+                                imports_span.pb_inc(1);
+                                Ok(())
+                            }
+                            Err(e) => {
+                                warn!(?path, err=%e, "failed to import");
+                                Err(e)
                             }
                         }
-                    })
+                    }.instrument(info_span!("import path", "indicatif.pb_show" = tracing::field::Empty))
                 })
-                .collect::<Vec<_>>();
-
-            try_join_all(tasks).await?;
+                .buffer_unordered(50)
+                .try_collect()
+                .await?;
         }
         Commands::Copy {
             service_addrs,
@@ -465,7 +540,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
             }
             Ok(())
         },
-        res = run_cli(cli) => {
+        res = run_cli(cli, tracing_handle.clone()) => {
             if let Err(e) = tracing_handle.shutdown().await {
                 eprintln!("failed to shutdown tracing: {e}");
             }