diff options
Diffstat (limited to 'tvix/tracing/src/lib.rs')
-rw-r--r-- | tvix/tracing/src/lib.rs | 106 |
1 files changed, 67 insertions, 39 deletions
diff --git a/tvix/tracing/src/lib.rs b/tvix/tracing/src/lib.rs index 2c73b53a4a00..c7e4e275157d 100644 --- a/tvix/tracing/src/lib.rs +++ b/tvix/tracing/src/lib.rs @@ -7,7 +7,7 @@ use tracing_indicatif::{ }; use tracing_subscriber::{ layer::{Identity, SubscriberExt}, - util::SubscriberInitExt, + util::SubscriberInitExt as _, EnvFilter, Layer, Registry, }; @@ -20,7 +20,6 @@ use opentelemetry::{ use opentelemetry_sdk::{ propagation::TraceContextPropagator, resource::{ResourceDetector, SdkProvidedResourceDetector}, - trace::BatchConfigBuilder, Resource, }; #[cfg(feature = "tracy")] @@ -261,50 +260,76 @@ impl TracingBuilder { } } +#[cfg(feature = "otlp")] +fn gen_resources(service_name: String) -> Resource { + // use SdkProvidedResourceDetector.detect to detect resources, + // but replace the default service name with our default. + // https://github.com/open-telemetry/opentelemetry-rust/issues/1298 + + let resources = SdkProvidedResourceDetector.detect(std::time::Duration::from_secs(0)); + // SdkProvidedResourceDetector currently always sets + // `service.name`, but we don't like its default. + if resources.get("service.name".into()).unwrap() == "unknown_service".into() { + resources.merge(&Resource::new([KeyValue::new( + opentelemetry_semantic_conventions::resource::SERVICE_NAME, + service_name, + )])) + } else { + resources + } +} + +/// Returns an OTLP tracer, and the TX part of a channel, which can be used +/// to request flushes (and signal back the completion of the flush). +#[cfg(feature = "otlp")] +fn gen_tracer_provider( + service_name: String, +) -> Result<opentelemetry_sdk::trace::TracerProvider, opentelemetry::trace::TraceError> { + use opentelemetry_otlp::SpanExporter; + use opentelemetry_sdk::{runtime, trace::TracerProvider}; + + let exporter = SpanExporter::builder().with_tonic().build()?; + + let tracer_provider = TracerProvider::builder() + .with_batch_exporter(exporter, runtime::Tokio) + .with_config( + opentelemetry_sdk::trace::Config::default().with_resource(gen_resources(service_name)), + ) + .build(); + + // Unclear how to configure this + // let batch_config = BatchConfigBuilder::default() + // // the default values for `max_export_batch_size` is set to 512, which we will fill + // // pretty quickly, which will then result in an export. We want to make sure that + // // the export is only done once the schedule is met and not as soon as 512 spans + // // are collected. + // .with_max_export_batch_size(4096) + // // analog to default config `max_export_batch_size * 4` + // .with_max_queue_size(4096 * 4) + // // only force an export to the otlp collector every 10 seconds to reduce the amount + // // of error messages if an otlp collector is not available + // .with_scheduled_delay(std::time::Duration::from_secs(10)) + // .build(); + + // use opentelemetry_sdk::trace::BatchSpanProcessor; + // let batch_span_processor = BatchSpanProcessor::builder(exporter, runtime::Tokio) + // .with_batch_config(batch_config) + // .build(); + + Ok(tracer_provider) +} + /// Returns an OTLP tracer, and the TX part of a channel, which can be used /// to request flushes (and signal back the completion of the flush). #[cfg(feature = "otlp")] fn gen_otlp_tracer( service_name: String, ) -> ( - impl Tracer + tracing_opentelemetry::PreSampledTracer, + impl Tracer + tracing_opentelemetry::PreSampledTracer + 'static, mpsc::Sender<oneshot::Sender<()>>, ) { - let tracer_provider = opentelemetry_otlp::new_pipeline() - .tracing() - .with_exporter(opentelemetry_otlp::new_exporter().tonic()) - .with_batch_config( - BatchConfigBuilder::default() - // the default values for `max_export_batch_size` is set to 512, which we will fill - // pretty quickly, which will then result in an export. We want to make sure that - // the export is only done once the schedule is met and not as soon as 512 spans - // are collected. - .with_max_export_batch_size(4096) - // analog to default config `max_export_batch_size * 4` - .with_max_queue_size(4096 * 4) - // only force an export to the otlp collector every 10 seconds to reduce the amount - // of error messages if an otlp collector is not available - .with_scheduled_delay(std::time::Duration::from_secs(10)) - .build(), - ) - .with_trace_config(opentelemetry_sdk::trace::Config::default().with_resource({ - // use SdkProvidedResourceDetector.detect to detect resources, - // but replace the default service name with our default. - // https://github.com/open-telemetry/opentelemetry-rust/issues/1298 - let resources = SdkProvidedResourceDetector.detect(std::time::Duration::from_secs(0)); - // SdkProvidedResourceDetector currently always sets - // `service.name`, but we don't like its default. - if resources.get("service.name".into()).unwrap() == "unknown_service".into() { - resources.merge(&Resource::new([KeyValue::new( - "service.name", - service_name, - )])) - } else { - resources - } - })) - .install_batch(opentelemetry_sdk::runtime::Tokio) - .expect("Failed to install batch exporter using Tokio"); + let tracer_provider = + gen_tracer_provider(service_name.clone()).expect("Unable to configure trace provider"); // tracer_provider needs to be kept around so we can request flushes later. let tracer = tracer_provider.tracer("tvix"); @@ -322,7 +347,10 @@ fn gen_otlp_tracer( // https://github.com/open-telemetry/opentelemetry-rust/issues/1395#issuecomment-1953280335 let _ = tokio::task::spawn_blocking({ let tracer_provider = tracer_provider.clone(); - move || tracer_provider.force_flush() + + move || { + tracer_provider.force_flush(); + } }) .await; let _ = m.send(()); |