diff options
-rw-r--r-- | tvix/Cargo.lock | 2 | ||||
-rw-r--r-- | tvix/Cargo.nix | 9 | ||||
-rw-r--r-- | tvix/build/src/bin/tvix-build.rs | 2 | ||||
-rw-r--r-- | tvix/cli/src/main.rs | 5 | ||||
-rw-r--r-- | tvix/store/src/bin/tvix-store.rs | 55 | ||||
-rw-r--r-- | tvix/tracing/Cargo.toml | 2 | ||||
-rw-r--r-- | tvix/tracing/src/lib.rs | 232 |
7 files changed, 242 insertions, 65 deletions
diff --git a/tvix/Cargo.lock b/tvix/Cargo.lock index 15e098d0f157..e8c434e2de3a 100644 --- a/tvix/Cargo.lock +++ b/tvix/Cargo.lock @@ -4372,6 +4372,8 @@ dependencies = [ "opentelemetry", "opentelemetry-otlp", "opentelemetry_sdk", + "thiserror", + "tokio", "tracing", "tracing-indicatif", "tracing-opentelemetry", diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix index 360747b8d283..4a0f755fe071 100644 --- a/tvix/Cargo.nix +++ b/tvix/Cargo.nix @@ -14008,6 +14008,15 @@ rec { features = [ "rt-tokio" ]; } { + name = "thiserror"; + packageId = "thiserror"; + } + { + name = "tokio"; + packageId = "tokio"; + features = [ "sync" "rt" ]; + } + { name = "tracing"; packageId = "tracing"; features = [ "max_level_trace" "release_max_level_debug" ]; diff --git a/tvix/build/src/bin/tvix-build.rs b/tvix/build/src/bin/tvix-build.rs index 07085a433cfa..c96145ebb479 100644 --- a/tvix/build/src/bin/tvix-build.rs +++ b/tvix/build/src/bin/tvix-build.rs @@ -54,7 +54,7 @@ enum Commands { async fn main() -> Result<(), Box<dyn std::error::Error>> { let cli = Cli::parse(); - tvix_tracing::init(cli.log_level)?; + let _ = tvix_tracing::TracingBuilder::default().level(cli.log_level); match cli.command { Commands::Daemon { diff --git a/tvix/cli/src/main.rs b/tvix/cli/src/main.rs index 5e0b8c41819d..3ec45bc37250 100644 --- a/tvix/cli/src/main.rs +++ b/tvix/cli/src/main.rs @@ -279,7 +279,10 @@ fn lint(code: &str, path: Option<PathBuf>, args: &Args) -> bool { fn main() { let args = Args::parse(); - tvix_tracing::init(args.log_level).expect("unable to set up tracing subscriber"); + let _ = tvix_tracing::TracingBuilder::default() + .level(args.log_level) + .build() + .expect("unable to set up tracing subscriber"); let tokio_runtime = tokio::runtime::Runtime::new().expect("failed to setup tokio runtime"); let io_handle = init_io_handle(&tokio_runtime, &args); diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs index 03c699b893cd..dadfa114f72b 100644 --- a/tvix/store/src/bin/tvix-store.rs +++ b/tvix/store/src/bin/tvix-store.rs @@ -199,25 +199,8 @@ fn default_threads() -> usize { 1 } -#[tokio::main] -#[instrument(fields(indicatif.pb_show=1))] -async fn main() -> Result<(), Box<dyn std::error::Error>> { - let cli = Cli::parse(); - - #[cfg(feature = "otlp")] - { - if cli.otlp { - tvix_tracing::init_with_otlp(cli.log_level, "tvix.store")?; - } else { - tvix_tracing::init(cli.log_level)?; - } - } - - #[cfg(not(feature = "otlp"))] - { - tvix_tracing::init(cli.log_level)?; - } - +#[instrument(fields(indicatif.pb_show=1), skip(cli))] +async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error>> { match cli.command { Commands::Daemon { listen_address, @@ -528,3 +511,37 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { }; Ok(()) } + +#[tokio::main] +#[instrument(fields(indicatif.pb_show=1))] +async fn main() -> Result<(), Box<dyn std::error::Error>> { + let cli = Cli::parse(); + + let tracing_handle = { + let mut builder = tvix_tracing::TracingBuilder::default(); + builder = builder.level(cli.log_level); + #[cfg(feature = "otlp")] + { + if cli.otlp { + builder = builder.enable_otlp("tvix.store"); + } + } + builder.build()? + }; + + tokio::select! { + res = tokio::signal::ctrl_c() => { + res?; + if let Err(e) = tracing_handle.force_shutdown().await { + eprintln!("failed to shutdown tracing: {e}"); + } + Ok(()) + }, + res = run_cli(cli) => { + if let Err(e) = tracing_handle.shutdown().await { + eprintln!("failed to shutdown tracing: {e}"); + } + res + } + } +} diff --git a/tvix/tracing/Cargo.toml b/tvix/tracing/Cargo.toml index f65c9eba6cab..41654d00c02f 100644 --- a/tvix/tracing/Cargo.toml +++ b/tvix/tracing/Cargo.toml @@ -9,6 +9,8 @@ tracing = { version = "0.1.40", features = ["max_level_trace", "release_max_leve tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } indicatif = "0.17.8" tracing-indicatif = "0.3.6" +tokio = { version = "1.32.0" , features = ["sync", "rt"] } +thiserror = "1.0.38" tracing-opentelemetry = { version = "0.23.0", optional = true } opentelemetry = { version = "0.22.0", optional = true } diff --git a/tvix/tracing/src/lib.rs b/tvix/tracing/src/lib.rs index 8ad90835338e..312c9a5a9a62 100644 --- a/tvix/tracing/src/lib.rs +++ b/tvix/tracing/src/lib.rs @@ -1,5 +1,6 @@ use indicatif::ProgressStyle; use lazy_static::lazy_static; +use tokio::sync::{mpsc, oneshot}; use tracing::Level; use tracing_indicatif::{filter::IndicatifFilter, IndicatifLayer}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer}; @@ -24,20 +25,136 @@ lazy_static! { .expect("invalid progress template"); } -// using a macro_rule here because of the complex return type -macro_rules! init_base_subscriber { - ($level: expr) => {{ - let indicatif_layer = IndicatifLayer::new().with_progress_style(PB_SPINNER_STYLE.clone()); +#[derive(thiserror::Error, Debug)] +pub enum Error { + #[error(transparent)] + Init(#[from] tracing_subscriber::util::TryInitError), + + #[error(transparent)] + MpscSend(#[from] mpsc::error::SendError<Option<oneshot::Sender<()>>>), + + #[error(transparent)] + OneshotRecv(#[from] oneshot::error::RecvError), +} + +#[derive(Clone)] +pub struct TracingHandle { + tx: Option<mpsc::Sender<Option<oneshot::Sender<()>>>>, +} + +impl TracingHandle { + /// This will flush possible attached tracing providers, e.g. otlp exported, if enabled. + /// If there is none enabled this will result in a noop. + /// + /// It will not wait until the flush is complete, but you can pass in an oneshot::Sender which + /// will receive a message once the flush is completed. + pub async fn flush(&self, msg: Option<oneshot::Sender<()>>) -> Result<(), Error> { + if let Some(tx) = &self.tx { + Ok(tx.send(msg).await?) + } else { + // If we have a message passed in we need to notify the receiver + if let Some(tx) = msg { + let _ = tx.send(()); + } + Ok(()) + } + } + /// This will flush all all attached tracing providers and will wait until the flush is completed. + /// If no tracing providers like otlp are attached then this will be a noop. + /// + /// This should only be called on a regular shutdown. + /// If you correctly need to shutdown tracing on ctrl_c use [force_shutdown](#method.force_shutdown) + /// otherwise you will get otlp errors. + pub async fn shutdown(&self) -> Result<(), Error> { + let (tx, rx) = tokio::sync::oneshot::channel(); + self.flush(Some(tx)).await?; + rx.await?; + Ok(()) + } + + /// This will flush all all attached tracing providers and will wait until the flush is completed. + /// After this it will do some other necessary cleanup. + /// If no tracing providers like otlp are attached then this will be a noop. + /// + /// This should only be used if the tool received an ctrl_c otherwise you will get otlp errors. + /// If you need to shutdown tracing on a regular exit, you should use the [shutdown](#method.shutdown) + /// method. + pub async fn force_shutdown(&self) -> Result<(), Error> { + let (tx, rx) = tokio::sync::oneshot::channel(); + self.flush(Some(tx)).await?; + rx.await?; + + #[cfg(feature = "otlp")] + { + // Because of a bug within otlp we currently have to use spawn_blocking otherwise + // calling `shutdown_tracer_provider` can block forever. See + // https://github.com/open-telemetry/opentelemetry-rust/issues/1395#issuecomment-1953280335 + // + // This still throws an error, if the tool exits regularly: "OpenTelemetry trace error + // occurred. oneshot canceled", but not having this leads to errors if we cancel with + // ctrl_c. + // So this should right now only be used on ctrl_c, for a regular exit use the + // [shutdown](#shutdown) method + let _ = tokio::task::spawn_blocking(move || { + opentelemetry::global::shutdown_tracer_provider(); + }) + .await; + } + + Ok(()) + } +} + +pub struct TracingBuilder { + level: Level, + + #[cfg(feature = "otlp")] + service_name: Option<&'static str>, +} + +impl Default for TracingBuilder { + fn default() -> Self { + TracingBuilder { + level: Level::INFO, + + #[cfg(feature = "otlp")] + service_name: None, + } + } +} + +impl TracingBuilder { + /// Set the log level of the stderr writer, RUST_LOG still has a higher priority over this + pub fn level(mut self, level: Level) -> TracingBuilder { + self.level = level; + self + } + + #[cfg(feature = "otlp")] + /// Enable otlp by setting a custom service_name + pub fn enable_otlp(mut self, service_name: &'static str) -> TracingBuilder { + self.service_name = Some(service_name); + self + } + + /// This will setup tracing based on the configuration passed in. + /// It will setup a stderr writer with the provided log level as filter (RUST_LOG still has a + /// higher priority over the configured value) + /// + /// It will also configure otlp if the feature is enabled and a service_name was provided. It + /// will then correctly setup a channel which is later used for flushing the provider. + pub fn build(self) -> Result<TracingHandle, Error> { // Set up the tracing subscriber. - tracing_subscriber::registry() + let indicatif_layer = IndicatifLayer::new().with_progress_style(PB_SPINNER_STYLE.clone()); + let subscriber = tracing_subscriber::registry() .with( tracing_subscriber::fmt::Layer::new() .with_writer(indicatif_layer.get_stderr_writer()) .compact() .with_filter( EnvFilter::builder() - .with_default_directive($level.into()) + .with_default_directive(self.level.into()) .from_env() .expect("invalid RUST_LOG"), ), @@ -45,46 +162,73 @@ macro_rules! init_base_subscriber { .with(indicatif_layer.with_filter( // only show progress for spans with indicatif.pb_show field being set IndicatifFilter::new(false), - )) - }}; -} + )); -pub fn init(level: Level) -> Result<(), tracing_subscriber::util::TryInitError> { - init_base_subscriber!(level).try_init() -} + // Setup otlp if a service_name is configured + #[cfg(feature = "otlp")] + { + if let Some(service_name) = self.service_name { + let tracer = opentelemetry_otlp::new_pipeline() + .tracing() + .with_exporter(opentelemetry_otlp::new_exporter().tonic()) + .with_batch_config(BatchConfig::default()) + .with_trace_config(opentelemetry_sdk::trace::config().with_resource({ + // use SdkProvidedResourceDetector.detect to detect resources, + // but replace the default service name with our default. + // https://github.com/open-telemetry/opentelemetry-rust/issues/1298 + let resources = + SdkProvidedResourceDetector.detect(std::time::Duration::from_secs(0)); + // SdkProvidedResourceDetector currently always sets + // `service.name`, but we don't like its default. + if resources.get("service.name".into()).unwrap() == "unknown_service".into() + { + resources.merge(&Resource::new([KeyValue::new( + "service.name", + service_name, + )])) + } else { + resources + } + })) + .install_batch(opentelemetry_sdk::runtime::Tokio) + .expect("Failed to install batch exporter using Tokio"); -#[cfg(feature = "otlp")] -pub fn init_with_otlp( - level: Level, - service_name: &'static str, -) -> Result<(), tracing_subscriber::util::TryInitError> { - let subscriber = init_base_subscriber!(level); - - let tracer = opentelemetry_otlp::new_pipeline() - .tracing() - .with_exporter(opentelemetry_otlp::new_exporter().tonic()) - .with_batch_config(BatchConfig::default()) - .with_trace_config(opentelemetry_sdk::trace::config().with_resource({ - // use SdkProvidedResourceDetector.detect to detect resources, - // but replace the default service name with our default. - // https://github.com/open-telemetry/opentelemetry-rust/issues/1298 - let resources = SdkProvidedResourceDetector.detect(std::time::Duration::from_secs(0)); - // SdkProvidedResourceDetector currently always sets - // `service.name`, but we don't like its default. - if resources.get("service.name".into()).unwrap() == "unknown_service".into() { - resources.merge(&Resource::new([KeyValue::new( - "service.name", - service_name, - )])) - } else { - resources - } - })) - .install_batch(opentelemetry_sdk::runtime::Tokio) - .expect("Failed to install tokio runtime"); + // Trace provider is need for later use like flushing the provider. + // Needs to be kept around for each message to rx we need to handle. + let tracer_provider = tracer + .provider() + .expect("Failed to get the tracer provider"); - // Create a tracing layer with the configured tracer - let layer = tracing_opentelemetry::layer().with_tracer(tracer); + // Set up a channel for flushing trace providers later + let (tx, mut rx) = mpsc::channel::<Option<oneshot::Sender<()>>>(16); + + // Spawning a task that listens on rx for any message. Once we receive a message we + // correctly call flush on the tracer_provider. + tokio::spawn(async move { + while let Some(m) = rx.recv().await { + // Because of a bug within otlp we currently have to use spawn_blocking + // otherwise will calling `force_flush` block forever, especially if the + // tool was closed with ctrl_c. See + // https://github.com/open-telemetry/opentelemetry-rust/issues/1395#issuecomment-1953280335 + let _ = tokio::task::spawn_blocking({ + let tracer_provider = tracer_provider.clone(); + move || tracer_provider.force_flush() + }) + .await; + if let Some(tx) = m { + let _ = tx.send(()); + } + } + }); + + // Create a tracing layer with the configured tracer + let layer = tracing_opentelemetry::layer().with_tracer(tracer); + subscriber.with(Some(layer)).try_init()?; + return Ok(TracingHandle { tx: Some(tx) }); + } + } - subscriber.with(Some(layer)).try_init() + subscriber.try_init()?; + Ok(TracingHandle { tx: None }) + } } |