From a01760fceed240de7787d95e384e83294af1350f Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Mon, 18 Nov 2024 01:00:18 +0200 Subject: refactor(tvix/tracing): simplify flushing channel MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The TracingHandle::flush function allowed a user to pass in their own (optional) oneshot::Sender<()> to get notified once the flush is completed, but that's making things unnecessary complicated. By simply having the flush() function await the flush, we make its interface more intuitive, and callsites (only inside tvix-tracing itself so far) simpler. We can also remove the Option around the oneshot::Sender entirely, as we now always call it with that. For some more clarity, we can remove the channel from the struct fields entirely if otlp support isn't compiled in. Change-Id: I0870b9e8e88c6be6494a9c201c1c70b87e0f0810 Reviewed-on: https://cl.tvl.fyi/c/depot/+/12801 Reviewed-by: flokli Reviewed-by: Marijan Petričević Tested-by: BuildkiteCI --- tvix/tracing/src/lib.rs | 61 +++++++++++++++++++++++-------------------------- 1 file changed, 29 insertions(+), 32 deletions(-) diff --git a/tvix/tracing/src/lib.rs b/tvix/tracing/src/lib.rs index 3362818bb180..2c73b53a4a00 100644 --- a/tvix/tracing/src/lib.rs +++ b/tvix/tracing/src/lib.rs @@ -53,7 +53,7 @@ pub enum Error { Init(#[from] tracing_subscriber::util::TryInitError), #[error(transparent)] - MpscSend(#[from] mpsc::error::SendError>>), + MpscSend(#[from] mpsc::error::SendError>), #[error(transparent)] OneshotRecv(#[from] oneshot::error::RecvError), @@ -61,7 +61,10 @@ pub enum Error { #[derive(Clone)] pub struct TracingHandle { - tx: Option>>>, + #[cfg(feature = "otlp")] + /// A channel that can be sent to whenever traces/metrics should be flushed. + /// Once flushing is finished, the sent oneshot::Sender will get triggered. + flush_tx: Option>>, stdout_writer: IndicatifWriter, stderr_writer: IndicatifWriter, @@ -89,18 +92,18 @@ impl TracingHandle { /// This will flush possible attached tracing providers, e.g. otlp exported, if enabled. /// If there is none enabled this will result in a noop. /// - /// It will not wait until the flush is complete, but you can pass in an oneshot::Sender which - /// will receive a message once the flush is completed. - pub async fn flush(&self, msg: Option>) -> Result<(), Error> { - if let Some(tx) = &self.tx { - Ok(tx.send(msg).await?) - } else { - // If we have a message passed in we need to notify the receiver - if let Some(tx) = msg { - let _ = tx.send(()); - } - Ok(()) + /// It will wait until the flush is complete. + pub async fn flush(&self) -> Result<(), Error> { + #[cfg(feature = "otlp")] + if let Some(flush_tx) = &self.flush_tx { + let (tx, rx) = oneshot::channel(); + // Request the flush. + flush_tx.send(tx).await?; + + // Wait for it to be done. + rx.await?; } + Ok(()) } /// This will flush all all attached tracing providers and will wait until the flush is completed. @@ -110,10 +113,7 @@ impl TracingHandle { /// If you correctly need to shutdown tracing on ctrl_c use [force_shutdown](#method.force_shutdown) /// otherwise you will get otlp errors. pub async fn shutdown(&self) -> Result<(), Error> { - let (tx, rx) = tokio::sync::oneshot::channel(); - self.flush(Some(tx)).await?; - rx.await?; - Ok(()) + self.flush().await } /// This will flush all all attached tracing providers and will wait until the flush is completed. @@ -124,9 +124,7 @@ impl TracingHandle { /// If you need to shutdown tracing on a regular exit, you should use the [shutdown](#method.shutdown) /// method. pub async fn force_shutdown(&self) -> Result<(), Error> { - let (tx, rx) = tokio::sync::oneshot::channel(); - self.flush(Some(tx)).await?; - rx.await?; + self.flush().await?; #[cfg(feature = "otlp")] { @@ -221,7 +219,8 @@ impl TracingBuilder { #[cfg(feature = "tracy")] let layered = layered.and_then(TracyLayer::default()); - let mut tx: Option>>> = None; + #[cfg(feature = "otlp")] + let mut flush_tx: Option>> = None; // Setup otlp if a service_name is configured #[cfg(feature = "otlp")] @@ -231,7 +230,7 @@ impl TracingBuilder { opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new()); let (tracer, sender) = gen_otlp_tracer(service_name.to_string()); - tx = Some(sender); + flush_tx = Some(sender); // Create a tracing layer with the configured tracer Some(tracing_opentelemetry::layer().with_tracer(tracer)) } else { @@ -254,7 +253,8 @@ impl TracingBuilder { .try_init()?; Ok(TracingHandle { - tx, + #[cfg(feature = "otlp")] + flush_tx, stdout_writer, stderr_writer, }) @@ -268,7 +268,7 @@ fn gen_otlp_tracer( service_name: String, ) -> ( impl Tracer + tracing_opentelemetry::PreSampledTracer, - mpsc::Sender>>, + mpsc::Sender>, ) { let tracer_provider = opentelemetry_otlp::new_pipeline() .tracing() @@ -306,17 +306,16 @@ fn gen_otlp_tracer( .install_batch(opentelemetry_sdk::runtime::Tokio) .expect("Failed to install batch exporter using Tokio"); - // Trace provider is need for later use like flushing the provider. - // Needs to be kept around for each message to rx we need to handle. + // tracer_provider needs to be kept around so we can request flushes later. let tracer = tracer_provider.tracer("tvix"); // Set up a channel for flushing trace providers later - let (tx, mut rx) = mpsc::channel::>>(16); + let (flush_tx, mut flush_rx) = mpsc::channel::>(16); // Spawning a task that listens on rx for any message. Once we receive a message we // correctly call flush on the tracer_provider. tokio::spawn(async move { - while let Some(m) = rx.recv().await { + while let Some(m) = flush_rx.recv().await { // Because of a bug within otlp we currently have to use spawn_blocking // otherwise will calling `force_flush` block forever, especially if the // tool was closed with ctrl_c. See @@ -326,11 +325,9 @@ fn gen_otlp_tracer( move || tracer_provider.force_flush() }) .await; - if let Some(tx) = m { - let _ = tx.send(()); - } + let _ = m.send(()); } }); - (tracer, tx) + (tracer, flush_tx) } -- cgit 1.4.1