about summary refs log tree commit diff
path: root/tvix
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2024-06-14T10·54+0300
committerclbot <clbot@tvl.fyi>2024-06-14T14·52+0000
commitd25f047b9d12bab692e61f6bbdc4c8d673914f34 (patch)
tree552373ed7b59a81f91ce027ed6e8a279388169eb /tvix
parent118e69d81da73a4d55c6ab3ba7e1cc924bd3f506 (diff)
refactor(tvix/tracing): move otlp setup into helper function r/8274
Having all this in the main control flow makes it a bit hard to read.
Moving it into a helper function makes it a bit cleaner.

Change-Id: Ibdb739dbd1e013b4f8c4aaf9b036a6bd556a1871
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11814
Autosubmit: flokli <flokli@flokli.de>
Reviewed-by: Simon Hauser <simon.hauser@helsinki-systems.de>
Tested-by: BuildkiteCI
Diffstat (limited to 'tvix')
-rw-r--r--tvix/tracing/src/lib.rs145
1 files changed, 78 insertions, 67 deletions
diff --git a/tvix/tracing/src/lib.rs b/tvix/tracing/src/lib.rs
index c8c1385c3fba..36bd7cec10c0 100644
--- a/tvix/tracing/src/lib.rs
+++ b/tvix/tracing/src/lib.rs
@@ -6,7 +6,7 @@ use tracing_indicatif::{filter::IndicatifFilter, IndicatifLayer};
 use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer};
 
 #[cfg(feature = "otlp")]
-use opentelemetry::KeyValue;
+use opentelemetry::{trace::Tracer, KeyValue};
 #[cfg(feature = "otlp")]
 use opentelemetry_sdk::{
     resource::{ResourceDetector, SdkProvidedResourceDetector},
@@ -170,72 +170,7 @@ impl TracingBuilder {
         #[cfg(feature = "otlp")]
         {
             if let Some(service_name) = self.service_name {
-                let tracer = opentelemetry_otlp::new_pipeline()
-                    .tracing()
-                    .with_exporter(opentelemetry_otlp::new_exporter().tonic())
-                    .with_batch_config(
-                        BatchConfigBuilder::default()
-                            // the default values for `max_export_batch_size` is set to 512, which we will fill
-                            // pretty quickly, which will then result in an export. We want to make sure that
-                            // the export is only done once the schedule is met and not as soon as 512 spans
-                            // are collected.
-                            .with_max_export_batch_size(4096)
-                            // analog to default config `max_export_batch_size * 4`
-                            .with_max_queue_size(4096 * 4)
-                            // only force an export to the otlp collector every 10 seconds to reduce the amount
-                            // of error messages if an otlp collector is not available
-                            .with_scheduled_delay(std::time::Duration::from_secs(10))
-                            .build(),
-                    )
-                    .with_trace_config(opentelemetry_sdk::trace::config().with_resource({
-                        // use SdkProvidedResourceDetector.detect to detect resources,
-                        // but replace the default service name with our default.
-                        // https://github.com/open-telemetry/opentelemetry-rust/issues/1298
-                        let resources =
-                            SdkProvidedResourceDetector.detect(std::time::Duration::from_secs(0));
-                        // SdkProvidedResourceDetector currently always sets
-                        // `service.name`, but we don't like its default.
-                        if resources.get("service.name".into()).unwrap() == "unknown_service".into()
-                        {
-                            resources.merge(&Resource::new([KeyValue::new(
-                                "service.name",
-                                service_name,
-                            )]))
-                        } else {
-                            resources
-                        }
-                    }))
-                    .install_batch(opentelemetry_sdk::runtime::Tokio)
-                    .expect("Failed to install batch exporter using Tokio");
-
-                // Trace provider is need for later use like flushing the provider.
-                // Needs to be kept around for each message to rx we need to handle.
-                let tracer_provider = tracer
-                    .provider()
-                    .expect("Failed to get the tracer provider");
-
-                // Set up a channel for flushing trace providers later
-                let (tx, mut rx) = mpsc::channel::<Option<oneshot::Sender<()>>>(16);
-
-                // Spawning a task that listens on rx for any message. Once we receive a message we
-                // correctly call flush on the tracer_provider.
-                tokio::spawn(async move {
-                    while let Some(m) = rx.recv().await {
-                        // Because of a bug within otlp we currently have to use spawn_blocking
-                        // otherwise will calling `force_flush` block forever, especially if the
-                        // tool was closed with ctrl_c. See
-                        // https://github.com/open-telemetry/opentelemetry-rust/issues/1395#issuecomment-1953280335
-                        let _ = tokio::task::spawn_blocking({
-                            let tracer_provider = tracer_provider.clone();
-                            move || tracer_provider.force_flush()
-                        })
-                        .await;
-                        if let Some(tx) = m {
-                            let _ = tx.send(());
-                        }
-                    }
-                });
-
+                let (tracer, tx) = gen_otlp_tracer(service_name.to_string());
                 // Create a tracing layer with the configured tracer
                 let layer = tracing_opentelemetry::layer().with_tracer(tracer);
                 subscriber.with(Some(layer)).try_init()?;
@@ -247,3 +182,79 @@ impl TracingBuilder {
         Ok(TracingHandle { tx: None })
     }
 }
+
+/// Returns an OTLP tracer, and the TX part of a channel, which can be used
+/// to request flushes (and signal back the completion of the flush).
+#[cfg(feature = "otlp")]
+fn gen_otlp_tracer(
+    service_name: String,
+) -> (
+    impl Tracer + tracing_opentelemetry::PreSampledTracer,
+    mpsc::Sender<Option<oneshot::Sender<()>>>,
+) {
+    let tracer = opentelemetry_otlp::new_pipeline()
+        .tracing()
+        .with_exporter(opentelemetry_otlp::new_exporter().tonic())
+        .with_batch_config(
+            BatchConfigBuilder::default()
+                // the default values for `max_export_batch_size` is set to 512, which we will fill
+                // pretty quickly, which will then result in an export. We want to make sure that
+                // the export is only done once the schedule is met and not as soon as 512 spans
+                // are collected.
+                .with_max_export_batch_size(4096)
+                // analog to default config `max_export_batch_size * 4`
+                .with_max_queue_size(4096 * 4)
+                // only force an export to the otlp collector every 10 seconds to reduce the amount
+                // of error messages if an otlp collector is not available
+                .with_scheduled_delay(std::time::Duration::from_secs(10))
+                .build(),
+        )
+        .with_trace_config(opentelemetry_sdk::trace::config().with_resource({
+            // use SdkProvidedResourceDetector.detect to detect resources,
+            // but replace the default service name with our default.
+            // https://github.com/open-telemetry/opentelemetry-rust/issues/1298
+            let resources = SdkProvidedResourceDetector.detect(std::time::Duration::from_secs(0));
+            // SdkProvidedResourceDetector currently always sets
+            // `service.name`, but we don't like its default.
+            if resources.get("service.name".into()).unwrap() == "unknown_service".into() {
+                resources.merge(&Resource::new([KeyValue::new(
+                    "service.name",
+                    service_name,
+                )]))
+            } else {
+                resources
+            }
+        }))
+        .install_batch(opentelemetry_sdk::runtime::Tokio)
+        .expect("Failed to install batch exporter using Tokio");
+
+    // Trace provider is need for later use like flushing the provider.
+    // Needs to be kept around for each message to rx we need to handle.
+    let tracer_provider = tracer
+        .provider()
+        .expect("Failed to get the tracer provider");
+
+    // Set up a channel for flushing trace providers later
+    let (tx, mut rx) = mpsc::channel::<Option<oneshot::Sender<()>>>(16);
+
+    // Spawning a task that listens on rx for any message. Once we receive a message we
+    // correctly call flush on the tracer_provider.
+    tokio::spawn(async move {
+        while let Some(m) = rx.recv().await {
+            // Because of a bug within otlp we currently have to use spawn_blocking
+            // otherwise will calling `force_flush` block forever, especially if the
+            // tool was closed with ctrl_c. See
+            // https://github.com/open-telemetry/opentelemetry-rust/issues/1395#issuecomment-1953280335
+            let _ = tokio::task::spawn_blocking({
+                let tracer_provider = tracer_provider.clone();
+                move || tracer_provider.force_flush()
+            })
+            .await;
+            if let Some(tx) = m {
+                let _ = tx.send(());
+            }
+        }
+    });
+
+    (tracer, tx)
+}