diff options
author | Florian Klink <flokli@flokli.de> | 2024-10-15T13·16+0300 |
---|---|---|
committer | flokli <flokli@flokli.de> | 2024-10-19T09·37+0000 |
commit | d52d889f2bf6ff0aa329c3058120140da2debaf7 (patch) | |
tree | 5ca5005e19098f1a192be907407c65326059c26d /tvix/store/src/bin/tvix-store.rs | |
parent | 3fda90602d3de7a720149f090422c4da9d12d31d (diff) |
refactor(tvix): make indicatif.pb_show=1 more explicit r/8837
This pushes generating spans with pb_show up to the caller. They usually have more context on how to present things, if at all. Change-Id: Icfcaa64a8a57dce50c0261f2d06e7c051e3946c2 Reviewed-on: https://cl.tvl.fyi/c/depot/+/12657 Autosubmit: flokli <flokli@flokli.de> Tested-by: BuildkiteCI Reviewed-by: raitobezarius <tvl@lahfa.xyz>
Diffstat (limited to 'tvix/store/src/bin/tvix-store.rs')
-rw-r--r-- | tvix/store/src/bin/tvix-store.rs | 139 |
1 files changed, 107 insertions, 32 deletions
diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs index 6cc7c39ab74e..1aed0a8e4453 100644 --- a/tvix/store/src/bin/tvix-store.rs +++ b/tvix/store/src/bin/tvix-store.rs @@ -1,9 +1,11 @@ use clap::Parser; use clap::Subcommand; -use futures::future::try_join_all; use futures::StreamExt; use futures::TryStreamExt; +use nix_compat::nix_daemon::de::Error; +use nix_compat::nixhash::CAHash; +use nix_compat::nixhash::NixHash; use nix_compat::{path_info::ExportedPathInfo, store_path::StorePath}; use serde::Deserialize; use serde::Serialize; @@ -12,11 +14,13 @@ use std::sync::Arc; use tonic::transport::Server; use tower::ServiceBuilder; use tower_http::trace::{DefaultMakeSpan, TraceLayer}; -use tracing::{info, info_span, instrument, Level, Span}; -use tracing_indicatif::span_ext::IndicatifSpanExt as _; +use tracing::{debug, info, info_span, instrument, warn, Instrument, Level, Span}; +use tracing_indicatif::span_ext::IndicatifSpanExt; use tvix_castore::import::fs::ingest_path; +use tvix_store::import::path_to_name; use tvix_store::nar::NarCalculationService; use tvix_store::utils::{ServiceUrls, ServiceUrlsGrpc}; +use tvix_tracing::TracingHandle; use tvix_castore::proto::blob_service_server::BlobServiceServer; use tvix_castore::proto::directory_service_server::DirectoryServiceServer; @@ -159,7 +163,10 @@ fn default_threads() -> usize { } #[instrument(skip_all)] -async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { +async fn run_cli( + cli: Cli, + tracing_handle: TracingHandle, +) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { match cli.command { Commands::Daemon { listen_args, @@ -242,37 +249,105 @@ async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error + Send + Sync let nar_calculation_service: Arc<dyn NarCalculationService> = nar_calculation_service.into(); - let tasks = paths + // For each path passed, construct the name, or bail out if it's invalid. + let paths_and_names = paths .into_iter() - .map(|path| { - tokio::task::spawn({ - let blob_service = blob_service.clone(); - let directory_service = directory_service.clone(); - let path_info_service = path_info_service.clone(); - let nar_calculation_service = nar_calculation_service.clone(); - - async move { - if let Ok(name) = tvix_store::import::path_to_name(&path) { - let resp = tvix_store::import::import_path_as_nar_ca( - &path, - name, - blob_service, - directory_service, - path_info_service, - nar_calculation_service, - ) - .await; - if let Ok(path_info) = resp { - // If the import was successful, print the path to stdout. - println!("{}", path_info.store_path.to_absolute_path()); - } + .map(|p| match path_to_name(&p) { + Ok(name) => { + let name = name.to_owned(); + Ok((p, name)) + } + Err(e) => Err(e), + }) + .collect::<Result<Vec<_>, _>>()?; + + let imports_span = + info_span!("import paths", "indicatif.pb_show" = tracing::field::Empty); + imports_span.pb_set_message("Importing"); + imports_span.pb_set_length(paths_and_names.len() as u64); + imports_span.pb_set_style(&tvix_tracing::PB_PROGRESS_STYLE); + imports_span.pb_start(); + + futures::stream::iter(paths_and_names) + .map(|(path, name)| { + let blob_service = blob_service.clone(); + let directory_service = directory_service.clone(); + let path_info_service = path_info_service.clone(); + let nar_calculation_service = nar_calculation_service.clone(); + let imports_span = imports_span.clone(); + let tracing_handle = tracing_handle.clone(); + + async move { + let span = Span::current(); + span.pb_set_style(&tvix_tracing::PB_SPINNER_STYLE); + span.pb_set_message(&format!("Ingesting {:?}", path)); + span.pb_start(); + + // Ingest the contents at the given path into castore. + let root_node = ingest_path::<_, _, _, &[u8]>( + blob_service, + directory_service, + &path, + None, + ) + .await + .map_err(std::io::Error::custom)?; + + span.pb_set_message(&format!("NAR Calculation for {:?}", path)); + + // Ask for the NAR size and sha256 + let (nar_size, nar_sha256) = + nar_calculation_service.calculate_nar(&root_node).await?; + + // Calculate the output path. This might still fail, as some names are illegal. + // FUTUREWORK: express the `name` at the type level to be valid and check for this earlier. + let ca = CAHash::Nar(NixHash::Sha256(nar_sha256)); + let output_path: StorePath<String> = + nix_compat::store_path::build_ca_path::<&str, _, _>( + &name, + &ca, + [], + false, + ) + .map_err(|e| { + warn!(err=%e, "unable to build CA path"); + std::io::Error::custom(e) + })?; + + // Construct and insert PathInfo + match path_info_service + .as_ref() + .put(PathInfo { + store_path: output_path.to_owned(), + node: root_node, + // There's no reference scanning on imported paths + references: vec![], + nar_size, + nar_sha256, + signatures: vec![], + deriver: None, + ca: Some(ca), + }) + .await + { + // If the import was successful, print the path to stdout. + Ok(path_info) => { + use std::io::Write; + debug!(store_path=%path_info.store_path.to_absolute_path(), "imported path"); + writeln!(&mut tracing_handle.get_stdout_writer(), "{}", path_info.store_path.to_absolute_path())?; + imports_span.pb_inc(1); + Ok(()) + } + Err(e) => { + warn!(?path, err=%e, "failed to import"); + Err(e) } } - }) + }.instrument(info_span!("import path", "indicatif.pb_show" = tracing::field::Empty)) }) - .collect::<Vec<_>>(); - - try_join_all(tasks).await?; + .buffer_unordered(50) + .try_collect() + .await?; } Commands::Copy { service_addrs, @@ -465,7 +540,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> { } Ok(()) }, - res = run_cli(cli) => { + res = run_cli(cli, tracing_handle.clone()) => { if let Err(e) = tracing_handle.shutdown().await { eprintln!("failed to shutdown tracing: {e}"); } |