about summary refs log tree commit diff
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2024-09-29T21·35+0200
committerflokli <flokli@flokli.de>2024-11-23T09·40+0000
commit5f670a2f67723436f020d96dbe74fe01db51ebf7 (patch)
tree34147bb27d15d03ca281b4e427e0b5428c2b234b
parent6f1d059c7daed0dbd53d2f17734262cd17d8a37d (diff)
feat(tvix/tracing): configure metrics support r/8951
This creates and registers a global meter provider, which uses the same
mechanism to get notified of flushes.

Change-Id: I856a67f0b282d494de3b2c2a1b79c06ae8ffe252
Reviewed-on: https://cl.tvl.fyi/c/depot/+/12556
Reviewed-by: Jonas Chevalier <zimbatm@zimbatm.com>
Reviewed-by: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
-rw-r--r--tvix/tracing/src/lib.rs94
1 files changed, 69 insertions, 25 deletions
diff --git a/tvix/tracing/src/lib.rs b/tvix/tracing/src/lib.rs
index c7e4e275157d..16a003c718fd 100644
--- a/tvix/tracing/src/lib.rs
+++ b/tvix/tracing/src/lib.rs
@@ -12,10 +12,7 @@ use tracing_subscriber::{
 };
 
 #[cfg(feature = "otlp")]
-use opentelemetry::{
-    trace::{Tracer, TracerProvider},
-    KeyValue,
-};
+use opentelemetry::{trace::Tracer, KeyValue};
 #[cfg(feature = "otlp")]
 use opentelemetry_sdk::{
     propagation::TraceContextPropagator,
@@ -228,8 +225,15 @@ impl TracingBuilder {
                 // register a text map propagator for trace propagation
                 opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new());
 
-                let (tracer, sender) = gen_otlp_tracer(service_name.to_string());
+                let (tracer, meter_provider, sender) =
+                    gen_otlp_tracer_meter_provider(service_name.to_string());
+
                 flush_tx = Some(sender);
+
+                // Register the returned meter provider as the global one.
+                // FUTUREWORK: store in the struct and provide getter instead?
+                opentelemetry::global::set_meter_provider(meter_provider);
+
                 // Create a tracing layer with the configured tracer
                 Some(tracing_opentelemetry::layer().with_tracer(tracer))
             } else {
@@ -319,17 +323,49 @@ fn gen_tracer_provider(
     Ok(tracer_provider)
 }
 
-/// Returns an OTLP tracer, and the TX part of a channel, which can be used
-/// to request flushes (and signal back the completion of the flush).
 #[cfg(feature = "otlp")]
-fn gen_otlp_tracer(
+fn gen_meter_provider(
+    service_name: String,
+) -> Result<opentelemetry_sdk::metrics::SdkMeterProvider, opentelemetry_sdk::metrics::MetricError> {
+    use std::time::Duration;
+
+    use opentelemetry_otlp::WithExportConfig;
+    use opentelemetry_sdk::{
+        metrics::{PeriodicReader, SdkMeterProvider},
+        runtime,
+    };
+    let exporter = opentelemetry_otlp::MetricExporter::builder()
+        .with_tonic()
+        .with_timeout(Duration::from_secs(10))
+        .build()?;
+
+    Ok(SdkMeterProvider::builder()
+        .with_reader(
+            PeriodicReader::builder(exporter, runtime::Tokio)
+                .with_interval(Duration::from_secs(3))
+                .with_timeout(Duration::from_secs(10))
+                .build(),
+        )
+        .with_resource(gen_resources(service_name))
+        .build())
+}
+
+/// Returns an OTLP tracer, and a meter provider, as well as the TX part
+/// of a channel, which can be used to request flushes (and signal back the
+/// completion of the flush).
+#[cfg(feature = "otlp")]
+fn gen_otlp_tracer_meter_provider(
     service_name: String,
 ) -> (
-    impl Tracer + tracing_opentelemetry::PreSampledTracer + 'static,
+    impl Tracer + tracing_opentelemetry::PreSampledTracer,
+    opentelemetry_sdk::metrics::SdkMeterProvider,
     mpsc::Sender<oneshot::Sender<()>>,
 ) {
+    use opentelemetry::trace::TracerProvider;
     let tracer_provider =
         gen_tracer_provider(service_name.clone()).expect("Unable to configure trace provider");
+    let meter_provider =
+        gen_meter_provider(service_name).expect("Unable to configure meter provider");
 
     // tracer_provider needs to be kept around so we can request flushes later.
     let tracer = tracer_provider.tracer("tvix");
@@ -339,23 +375,31 @@ fn gen_otlp_tracer(
 
     // Spawning a task that listens on rx for any message. Once we receive a message we
     // correctly call flush on the tracer_provider.
-    tokio::spawn(async move {
-        while let Some(m) = flush_rx.recv().await {
-            // Because of a bug within otlp we currently have to use spawn_blocking
-            // otherwise will calling `force_flush` block forever, especially if the
-            // tool was closed with ctrl_c. See
-            // https://github.com/open-telemetry/opentelemetry-rust/issues/1395#issuecomment-1953280335
-            let _ = tokio::task::spawn_blocking({
-                let tracer_provider = tracer_provider.clone();
-
-                move || {
-                    tracer_provider.force_flush();
-                }
-            })
-            .await;
-            let _ = m.send(());
+    tokio::spawn({
+        let meter_provider = meter_provider.clone();
+
+        async move {
+            while let Some(m) = flush_rx.recv().await {
+                // Because of a bug within otlp we currently have to use spawn_blocking
+                // otherwise will calling `force_flush` block forever, especially if the
+                // tool was closed with ctrl_c. See
+                // https://github.com/open-telemetry/opentelemetry-rust/issues/1395#issuecomment-1953280335
+                let _ = tokio::task::spawn_blocking({
+                    let tracer_provider = tracer_provider.clone();
+                    let meter_provider = meter_provider.clone();
+
+                    move || {
+                        tracer_provider.force_flush();
+                        if let Err(e) = meter_provider.force_flush() {
+                            eprintln!("failed to flush meter provider: {}", e);
+                        }
+                    }
+                })
+                .await;
+                let _ = m.send(());
+            }
         }
     });
 
-    (tracer, flush_tx)
+    (tracer, meter_provider, flush_tx)
 }