about summary refs log tree commit diff
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2024-11-17T23·00+0200
committerflokli <flokli@flokli.de>2024-11-18T22·10+0000
commita01760fceed240de7787d95e384e83294af1350f (patch)
treeb1650fa8209560cedcbb849a87ec9f1353e3bf0a
parent9656686fb3767d2e111132be856bc97e93c01b2c (diff)
refactor(tvix/tracing): simplify flushing channel r/8942
The TracingHandle::flush function allowed a user to pass in their
own (optional) oneshot::Sender<()> to get notified once the flush is
completed, but that's making things unnecessary complicated.

By simply having the flush() function await the flush, we make its
interface more intuitive, and callsites (only inside tvix-tracing itself
so far) simpler.

We can also remove the Option around the oneshot::Sender entirely, as we
now always call it with that.

For some more clarity, we can remove the channel from the struct fields
entirely if otlp support isn't compiled in.

Change-Id: I0870b9e8e88c6be6494a9c201c1c70b87e0f0810
Reviewed-on: https://cl.tvl.fyi/c/depot/+/12801
Reviewed-by: flokli <flokli@flokli.de>
Reviewed-by: Marijan Petričević <marijan.petricevic94@gmail.com>
Tested-by: BuildkiteCI
-rw-r--r--tvix/tracing/src/lib.rs61
1 files changed, 29 insertions, 32 deletions
diff --git a/tvix/tracing/src/lib.rs b/tvix/tracing/src/lib.rs
index 3362818bb180..2c73b53a4a00 100644
--- a/tvix/tracing/src/lib.rs
+++ b/tvix/tracing/src/lib.rs
@@ -53,7 +53,7 @@ pub enum Error {
     Init(#[from] tracing_subscriber::util::TryInitError),
 
     #[error(transparent)]
-    MpscSend(#[from] mpsc::error::SendError<Option<oneshot::Sender<()>>>),
+    MpscSend(#[from] mpsc::error::SendError<oneshot::Sender<()>>),
 
     #[error(transparent)]
     OneshotRecv(#[from] oneshot::error::RecvError),
@@ -61,7 +61,10 @@ pub enum Error {
 
 #[derive(Clone)]
 pub struct TracingHandle {
-    tx: Option<mpsc::Sender<Option<oneshot::Sender<()>>>>,
+    #[cfg(feature = "otlp")]
+    /// A channel that can be sent to whenever traces/metrics should be flushed.
+    /// Once flushing is finished, the sent oneshot::Sender will get triggered.
+    flush_tx: Option<mpsc::Sender<oneshot::Sender<()>>>,
 
     stdout_writer: IndicatifWriter<writer::Stdout>,
     stderr_writer: IndicatifWriter<writer::Stderr>,
@@ -89,18 +92,18 @@ impl TracingHandle {
     /// This will flush possible attached tracing providers, e.g. otlp exported, if enabled.
     /// If there is none enabled this will result in a noop.
     ///
-    /// It will not wait until the flush is complete, but you can pass in an oneshot::Sender which
-    /// will receive a message once the flush is completed.
-    pub async fn flush(&self, msg: Option<oneshot::Sender<()>>) -> Result<(), Error> {
-        if let Some(tx) = &self.tx {
-            Ok(tx.send(msg).await?)
-        } else {
-            // If we have a message passed in we need to notify the receiver
-            if let Some(tx) = msg {
-                let _ = tx.send(());
-            }
-            Ok(())
+    /// It will wait until the flush is complete.
+    pub async fn flush(&self) -> Result<(), Error> {
+        #[cfg(feature = "otlp")]
+        if let Some(flush_tx) = &self.flush_tx {
+            let (tx, rx) = oneshot::channel();
+            // Request the flush.
+            flush_tx.send(tx).await?;
+
+            // Wait for it to be done.
+            rx.await?;
         }
+        Ok(())
     }
 
     /// This will flush all all attached tracing providers and will wait until the flush is completed.
@@ -110,10 +113,7 @@ impl TracingHandle {
     /// If you correctly need to shutdown tracing on ctrl_c use [force_shutdown](#method.force_shutdown)
     /// otherwise you will get otlp errors.
     pub async fn shutdown(&self) -> Result<(), Error> {
-        let (tx, rx) = tokio::sync::oneshot::channel();
-        self.flush(Some(tx)).await?;
-        rx.await?;
-        Ok(())
+        self.flush().await
     }
 
     /// This will flush all all attached tracing providers and will wait until the flush is completed.
@@ -124,9 +124,7 @@ impl TracingHandle {
     /// If you need to shutdown tracing on a regular exit, you should use the [shutdown](#method.shutdown)
     /// method.
     pub async fn force_shutdown(&self) -> Result<(), Error> {
-        let (tx, rx) = tokio::sync::oneshot::channel();
-        self.flush(Some(tx)).await?;
-        rx.await?;
+        self.flush().await?;
 
         #[cfg(feature = "otlp")]
         {
@@ -221,7 +219,8 @@ impl TracingBuilder {
         #[cfg(feature = "tracy")]
         let layered = layered.and_then(TracyLayer::default());
 
-        let mut tx: Option<mpsc::Sender<Option<oneshot::Sender<()>>>> = None;
+        #[cfg(feature = "otlp")]
+        let mut flush_tx: Option<mpsc::Sender<oneshot::Sender<()>>> = None;
 
         // Setup otlp if a service_name is configured
         #[cfg(feature = "otlp")]
@@ -231,7 +230,7 @@ impl TracingBuilder {
                 opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new());
 
                 let (tracer, sender) = gen_otlp_tracer(service_name.to_string());
-                tx = Some(sender);
+                flush_tx = Some(sender);
                 // Create a tracing layer with the configured tracer
                 Some(tracing_opentelemetry::layer().with_tracer(tracer))
             } else {
@@ -254,7 +253,8 @@ impl TracingBuilder {
             .try_init()?;
 
         Ok(TracingHandle {
-            tx,
+            #[cfg(feature = "otlp")]
+            flush_tx,
             stdout_writer,
             stderr_writer,
         })
@@ -268,7 +268,7 @@ fn gen_otlp_tracer(
     service_name: String,
 ) -> (
     impl Tracer + tracing_opentelemetry::PreSampledTracer,
-    mpsc::Sender<Option<oneshot::Sender<()>>>,
+    mpsc::Sender<oneshot::Sender<()>>,
 ) {
     let tracer_provider = opentelemetry_otlp::new_pipeline()
         .tracing()
@@ -306,17 +306,16 @@ fn gen_otlp_tracer(
         .install_batch(opentelemetry_sdk::runtime::Tokio)
         .expect("Failed to install batch exporter using Tokio");
 
-    // Trace provider is need for later use like flushing the provider.
-    // Needs to be kept around for each message to rx we need to handle.
+    // tracer_provider needs to be kept around so we can request flushes later.
     let tracer = tracer_provider.tracer("tvix");
 
     // Set up a channel for flushing trace providers later
-    let (tx, mut rx) = mpsc::channel::<Option<oneshot::Sender<()>>>(16);
+    let (flush_tx, mut flush_rx) = mpsc::channel::<oneshot::Sender<()>>(16);
 
     // Spawning a task that listens on rx for any message. Once we receive a message we
     // correctly call flush on the tracer_provider.
     tokio::spawn(async move {
-        while let Some(m) = rx.recv().await {
+        while let Some(m) = flush_rx.recv().await {
             // Because of a bug within otlp we currently have to use spawn_blocking
             // otherwise will calling `force_flush` block forever, especially if the
             // tool was closed with ctrl_c. See
@@ -326,11 +325,9 @@ fn gen_otlp_tracer(
                 move || tracer_provider.force_flush()
             })
             .await;
-            if let Some(tx) = m {
-                let _ = tx.send(());
-            }
+            let _ = m.send(());
         }
     });
 
-    (tracer, tx)
+    (tracer, flush_tx)
 }