about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--tvix/tracing/src/lib.rs61
1 files changed, 29 insertions, 32 deletions
diff --git a/tvix/tracing/src/lib.rs b/tvix/tracing/src/lib.rs
index 3362818bb180..2c73b53a4a00 100644
--- a/tvix/tracing/src/lib.rs
+++ b/tvix/tracing/src/lib.rs
@@ -53,7 +53,7 @@ pub enum Error {
     Init(#[from] tracing_subscriber::util::TryInitError),
 
     #[error(transparent)]
-    MpscSend(#[from] mpsc::error::SendError<Option<oneshot::Sender<()>>>),
+    MpscSend(#[from] mpsc::error::SendError<oneshot::Sender<()>>),
 
     #[error(transparent)]
     OneshotRecv(#[from] oneshot::error::RecvError),
@@ -61,7 +61,10 @@ pub enum Error {
 
 #[derive(Clone)]
 pub struct TracingHandle {
-    tx: Option<mpsc::Sender<Option<oneshot::Sender<()>>>>,
+    #[cfg(feature = "otlp")]
+    /// A channel that can be sent to whenever traces/metrics should be flushed.
+    /// Once flushing is finished, the sent oneshot::Sender will get triggered.
+    flush_tx: Option<mpsc::Sender<oneshot::Sender<()>>>,
 
     stdout_writer: IndicatifWriter<writer::Stdout>,
     stderr_writer: IndicatifWriter<writer::Stderr>,
@@ -89,18 +92,18 @@ impl TracingHandle {
     /// This will flush possible attached tracing providers, e.g. otlp exported, if enabled.
     /// If there is none enabled this will result in a noop.
     ///
-    /// It will not wait until the flush is complete, but you can pass in an oneshot::Sender which
-    /// will receive a message once the flush is completed.
-    pub async fn flush(&self, msg: Option<oneshot::Sender<()>>) -> Result<(), Error> {
-        if let Some(tx) = &self.tx {
-            Ok(tx.send(msg).await?)
-        } else {
-            // If we have a message passed in we need to notify the receiver
-            if let Some(tx) = msg {
-                let _ = tx.send(());
-            }
-            Ok(())
+    /// It will wait until the flush is complete.
+    pub async fn flush(&self) -> Result<(), Error> {
+        #[cfg(feature = "otlp")]
+        if let Some(flush_tx) = &self.flush_tx {
+            let (tx, rx) = oneshot::channel();
+            // Request the flush.
+            flush_tx.send(tx).await?;
+
+            // Wait for it to be done.
+            rx.await?;
         }
+        Ok(())
     }
 
     /// This will flush all all attached tracing providers and will wait until the flush is completed.
@@ -110,10 +113,7 @@ impl TracingHandle {
     /// If you correctly need to shutdown tracing on ctrl_c use [force_shutdown](#method.force_shutdown)
     /// otherwise you will get otlp errors.
     pub async fn shutdown(&self) -> Result<(), Error> {
-        let (tx, rx) = tokio::sync::oneshot::channel();
-        self.flush(Some(tx)).await?;
-        rx.await?;
-        Ok(())
+        self.flush().await
     }
 
     /// This will flush all all attached tracing providers and will wait until the flush is completed.
@@ -124,9 +124,7 @@ impl TracingHandle {
     /// If you need to shutdown tracing on a regular exit, you should use the [shutdown](#method.shutdown)
     /// method.
     pub async fn force_shutdown(&self) -> Result<(), Error> {
-        let (tx, rx) = tokio::sync::oneshot::channel();
-        self.flush(Some(tx)).await?;
-        rx.await?;
+        self.flush().await?;
 
         #[cfg(feature = "otlp")]
         {
@@ -221,7 +219,8 @@ impl TracingBuilder {
         #[cfg(feature = "tracy")]
         let layered = layered.and_then(TracyLayer::default());
 
-        let mut tx: Option<mpsc::Sender<Option<oneshot::Sender<()>>>> = None;
+        #[cfg(feature = "otlp")]
+        let mut flush_tx: Option<mpsc::Sender<oneshot::Sender<()>>> = None;
 
         // Setup otlp if a service_name is configured
         #[cfg(feature = "otlp")]
@@ -231,7 +230,7 @@ impl TracingBuilder {
                 opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new());
 
                 let (tracer, sender) = gen_otlp_tracer(service_name.to_string());
-                tx = Some(sender);
+                flush_tx = Some(sender);
                 // Create a tracing layer with the configured tracer
                 Some(tracing_opentelemetry::layer().with_tracer(tracer))
             } else {
@@ -254,7 +253,8 @@ impl TracingBuilder {
             .try_init()?;
 
         Ok(TracingHandle {
-            tx,
+            #[cfg(feature = "otlp")]
+            flush_tx,
             stdout_writer,
             stderr_writer,
         })
@@ -268,7 +268,7 @@ fn gen_otlp_tracer(
     service_name: String,
 ) -> (
     impl Tracer + tracing_opentelemetry::PreSampledTracer,
-    mpsc::Sender<Option<oneshot::Sender<()>>>,
+    mpsc::Sender<oneshot::Sender<()>>,
 ) {
     let tracer_provider = opentelemetry_otlp::new_pipeline()
         .tracing()
@@ -306,17 +306,16 @@ fn gen_otlp_tracer(
         .install_batch(opentelemetry_sdk::runtime::Tokio)
         .expect("Failed to install batch exporter using Tokio");
 
-    // Trace provider is need for later use like flushing the provider.
-    // Needs to be kept around for each message to rx we need to handle.
+    // tracer_provider needs to be kept around so we can request flushes later.
     let tracer = tracer_provider.tracer("tvix");
 
     // Set up a channel for flushing trace providers later
-    let (tx, mut rx) = mpsc::channel::<Option<oneshot::Sender<()>>>(16);
+    let (flush_tx, mut flush_rx) = mpsc::channel::<oneshot::Sender<()>>(16);
 
     // Spawning a task that listens on rx for any message. Once we receive a message we
     // correctly call flush on the tracer_provider.
     tokio::spawn(async move {
-        while let Some(m) = rx.recv().await {
+        while let Some(m) = flush_rx.recv().await {
             // Because of a bug within otlp we currently have to use spawn_blocking
             // otherwise will calling `force_flush` block forever, especially if the
             // tool was closed with ctrl_c. See
@@ -326,11 +325,9 @@ fn gen_otlp_tracer(
                 move || tracer_provider.force_flush()
             })
             .await;
-            if let Some(tx) = m {
-                let _ = tx.send(());
-            }
+            let _ = m.send(());
         }
     });
 
-    (tracer, tx)
+    (tracer, flush_tx)
 }