From d25f047b9d12bab692e61f6bbdc4c8d673914f34 Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Fri, 14 Jun 2024 13:54:24 +0300 Subject: refactor(tvix/tracing): move otlp setup into helper function Having all this in the main control flow makes it a bit hard to read. Moving it into a helper function makes it a bit cleaner. Change-Id: Ibdb739dbd1e013b4f8c4aaf9b036a6bd556a1871 Reviewed-on: https://cl.tvl.fyi/c/depot/+/11814 Autosubmit: flokli Reviewed-by: Simon Hauser Tested-by: BuildkiteCI --- tvix/tracing/src/lib.rs | 145 ++++++++++++++++++++++++++---------------------- 1 file changed, 78 insertions(+), 67 deletions(-) (limited to 'tvix/tracing/src/lib.rs') diff --git a/tvix/tracing/src/lib.rs b/tvix/tracing/src/lib.rs index c8c1385c3fba..36bd7cec10c0 100644 --- a/tvix/tracing/src/lib.rs +++ b/tvix/tracing/src/lib.rs @@ -6,7 +6,7 @@ use tracing_indicatif::{filter::IndicatifFilter, IndicatifLayer}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer}; #[cfg(feature = "otlp")] -use opentelemetry::KeyValue; +use opentelemetry::{trace::Tracer, KeyValue}; #[cfg(feature = "otlp")] use opentelemetry_sdk::{ resource::{ResourceDetector, SdkProvidedResourceDetector}, @@ -170,72 +170,7 @@ impl TracingBuilder { #[cfg(feature = "otlp")] { if let Some(service_name) = self.service_name { - let tracer = opentelemetry_otlp::new_pipeline() - .tracing() - .with_exporter(opentelemetry_otlp::new_exporter().tonic()) - .with_batch_config( - BatchConfigBuilder::default() - // the default values for `max_export_batch_size` is set to 512, which we will fill - // pretty quickly, which will then result in an export. We want to make sure that - // the export is only done once the schedule is met and not as soon as 512 spans - // are collected. - .with_max_export_batch_size(4096) - // analog to default config `max_export_batch_size * 4` - .with_max_queue_size(4096 * 4) - // only force an export to the otlp collector every 10 seconds to reduce the amount - // of error messages if an otlp collector is not available - .with_scheduled_delay(std::time::Duration::from_secs(10)) - .build(), - ) - .with_trace_config(opentelemetry_sdk::trace::config().with_resource({ - // use SdkProvidedResourceDetector.detect to detect resources, - // but replace the default service name with our default. - // https://github.com/open-telemetry/opentelemetry-rust/issues/1298 - let resources = - SdkProvidedResourceDetector.detect(std::time::Duration::from_secs(0)); - // SdkProvidedResourceDetector currently always sets - // `service.name`, but we don't like its default. - if resources.get("service.name".into()).unwrap() == "unknown_service".into() - { - resources.merge(&Resource::new([KeyValue::new( - "service.name", - service_name, - )])) - } else { - resources - } - })) - .install_batch(opentelemetry_sdk::runtime::Tokio) - .expect("Failed to install batch exporter using Tokio"); - - // Trace provider is need for later use like flushing the provider. - // Needs to be kept around for each message to rx we need to handle. - let tracer_provider = tracer - .provider() - .expect("Failed to get the tracer provider"); - - // Set up a channel for flushing trace providers later - let (tx, mut rx) = mpsc::channel::>>(16); - - // Spawning a task that listens on rx for any message. Once we receive a message we - // correctly call flush on the tracer_provider. - tokio::spawn(async move { - while let Some(m) = rx.recv().await { - // Because of a bug within otlp we currently have to use spawn_blocking - // otherwise will calling `force_flush` block forever, especially if the - // tool was closed with ctrl_c. See - // https://github.com/open-telemetry/opentelemetry-rust/issues/1395#issuecomment-1953280335 - let _ = tokio::task::spawn_blocking({ - let tracer_provider = tracer_provider.clone(); - move || tracer_provider.force_flush() - }) - .await; - if let Some(tx) = m { - let _ = tx.send(()); - } - } - }); - + let (tracer, tx) = gen_otlp_tracer(service_name.to_string()); // Create a tracing layer with the configured tracer let layer = tracing_opentelemetry::layer().with_tracer(tracer); subscriber.with(Some(layer)).try_init()?; @@ -247,3 +182,79 @@ impl TracingBuilder { Ok(TracingHandle { tx: None }) } } + +/// Returns an OTLP tracer, and the TX part of a channel, which can be used +/// to request flushes (and signal back the completion of the flush). +#[cfg(feature = "otlp")] +fn gen_otlp_tracer( + service_name: String, +) -> ( + impl Tracer + tracing_opentelemetry::PreSampledTracer, + mpsc::Sender>>, +) { + let tracer = opentelemetry_otlp::new_pipeline() + .tracing() + .with_exporter(opentelemetry_otlp::new_exporter().tonic()) + .with_batch_config( + BatchConfigBuilder::default() + // the default values for `max_export_batch_size` is set to 512, which we will fill + // pretty quickly, which will then result in an export. We want to make sure that + // the export is only done once the schedule is met and not as soon as 512 spans + // are collected. + .with_max_export_batch_size(4096) + // analog to default config `max_export_batch_size * 4` + .with_max_queue_size(4096 * 4) + // only force an export to the otlp collector every 10 seconds to reduce the amount + // of error messages if an otlp collector is not available + .with_scheduled_delay(std::time::Duration::from_secs(10)) + .build(), + ) + .with_trace_config(opentelemetry_sdk::trace::config().with_resource({ + // use SdkProvidedResourceDetector.detect to detect resources, + // but replace the default service name with our default. + // https://github.com/open-telemetry/opentelemetry-rust/issues/1298 + let resources = SdkProvidedResourceDetector.detect(std::time::Duration::from_secs(0)); + // SdkProvidedResourceDetector currently always sets + // `service.name`, but we don't like its default. + if resources.get("service.name".into()).unwrap() == "unknown_service".into() { + resources.merge(&Resource::new([KeyValue::new( + "service.name", + service_name, + )])) + } else { + resources + } + })) + .install_batch(opentelemetry_sdk::runtime::Tokio) + .expect("Failed to install batch exporter using Tokio"); + + // Trace provider is need for later use like flushing the provider. + // Needs to be kept around for each message to rx we need to handle. + let tracer_provider = tracer + .provider() + .expect("Failed to get the tracer provider"); + + // Set up a channel for flushing trace providers later + let (tx, mut rx) = mpsc::channel::>>(16); + + // Spawning a task that listens on rx for any message. Once we receive a message we + // correctly call flush on the tracer_provider. + tokio::spawn(async move { + while let Some(m) = rx.recv().await { + // Because of a bug within otlp we currently have to use spawn_blocking + // otherwise will calling `force_flush` block forever, especially if the + // tool was closed with ctrl_c. See + // https://github.com/open-telemetry/opentelemetry-rust/issues/1395#issuecomment-1953280335 + let _ = tokio::task::spawn_blocking({ + let tracer_provider = tracer_provider.clone(); + move || tracer_provider.force_flush() + }) + .await; + if let Some(tx) = m { + let _ = tx.send(()); + } + } + }); + + (tracer, tx) +} -- cgit 1.4.1