diff options
-rw-r--r-- | tvix/tracing/src/lib.rs | 61 |
1 files changed, 29 insertions, 32 deletions
diff --git a/tvix/tracing/src/lib.rs b/tvix/tracing/src/lib.rs index 3362818bb180..2c73b53a4a00 100644 --- a/tvix/tracing/src/lib.rs +++ b/tvix/tracing/src/lib.rs @@ -53,7 +53,7 @@ pub enum Error { Init(#[from] tracing_subscriber::util::TryInitError), #[error(transparent)] - MpscSend(#[from] mpsc::error::SendError<Option<oneshot::Sender<()>>>), + MpscSend(#[from] mpsc::error::SendError<oneshot::Sender<()>>), #[error(transparent)] OneshotRecv(#[from] oneshot::error::RecvError), @@ -61,7 +61,10 @@ pub enum Error { #[derive(Clone)] pub struct TracingHandle { - tx: Option<mpsc::Sender<Option<oneshot::Sender<()>>>>, + #[cfg(feature = "otlp")] + /// A channel that can be sent to whenever traces/metrics should be flushed. + /// Once flushing is finished, the sent oneshot::Sender will get triggered. + flush_tx: Option<mpsc::Sender<oneshot::Sender<()>>>, stdout_writer: IndicatifWriter<writer::Stdout>, stderr_writer: IndicatifWriter<writer::Stderr>, @@ -89,18 +92,18 @@ impl TracingHandle { /// This will flush possible attached tracing providers, e.g. otlp exported, if enabled. /// If there is none enabled this will result in a noop. /// - /// It will not wait until the flush is complete, but you can pass in an oneshot::Sender which - /// will receive a message once the flush is completed. - pub async fn flush(&self, msg: Option<oneshot::Sender<()>>) -> Result<(), Error> { - if let Some(tx) = &self.tx { - Ok(tx.send(msg).await?) - } else { - // If we have a message passed in we need to notify the receiver - if let Some(tx) = msg { - let _ = tx.send(()); - } - Ok(()) + /// It will wait until the flush is complete. + pub async fn flush(&self) -> Result<(), Error> { + #[cfg(feature = "otlp")] + if let Some(flush_tx) = &self.flush_tx { + let (tx, rx) = oneshot::channel(); + // Request the flush. + flush_tx.send(tx).await?; + + // Wait for it to be done. + rx.await?; } + Ok(()) } /// This will flush all all attached tracing providers and will wait until the flush is completed. @@ -110,10 +113,7 @@ impl TracingHandle { /// If you correctly need to shutdown tracing on ctrl_c use [force_shutdown](#method.force_shutdown) /// otherwise you will get otlp errors. pub async fn shutdown(&self) -> Result<(), Error> { - let (tx, rx) = tokio::sync::oneshot::channel(); - self.flush(Some(tx)).await?; - rx.await?; - Ok(()) + self.flush().await } /// This will flush all all attached tracing providers and will wait until the flush is completed. @@ -124,9 +124,7 @@ impl TracingHandle { /// If you need to shutdown tracing on a regular exit, you should use the [shutdown](#method.shutdown) /// method. pub async fn force_shutdown(&self) -> Result<(), Error> { - let (tx, rx) = tokio::sync::oneshot::channel(); - self.flush(Some(tx)).await?; - rx.await?; + self.flush().await?; #[cfg(feature = "otlp")] { @@ -221,7 +219,8 @@ impl TracingBuilder { #[cfg(feature = "tracy")] let layered = layered.and_then(TracyLayer::default()); - let mut tx: Option<mpsc::Sender<Option<oneshot::Sender<()>>>> = None; + #[cfg(feature = "otlp")] + let mut flush_tx: Option<mpsc::Sender<oneshot::Sender<()>>> = None; // Setup otlp if a service_name is configured #[cfg(feature = "otlp")] @@ -231,7 +230,7 @@ impl TracingBuilder { opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new()); let (tracer, sender) = gen_otlp_tracer(service_name.to_string()); - tx = Some(sender); + flush_tx = Some(sender); // Create a tracing layer with the configured tracer Some(tracing_opentelemetry::layer().with_tracer(tracer)) } else { @@ -254,7 +253,8 @@ impl TracingBuilder { .try_init()?; Ok(TracingHandle { - tx, + #[cfg(feature = "otlp")] + flush_tx, stdout_writer, stderr_writer, }) @@ -268,7 +268,7 @@ fn gen_otlp_tracer( service_name: String, ) -> ( impl Tracer + tracing_opentelemetry::PreSampledTracer, - mpsc::Sender<Option<oneshot::Sender<()>>>, + mpsc::Sender<oneshot::Sender<()>>, ) { let tracer_provider = opentelemetry_otlp::new_pipeline() .tracing() @@ -306,17 +306,16 @@ fn gen_otlp_tracer( .install_batch(opentelemetry_sdk::runtime::Tokio) .expect("Failed to install batch exporter using Tokio"); - // Trace provider is need for later use like flushing the provider. - // Needs to be kept around for each message to rx we need to handle. + // tracer_provider needs to be kept around so we can request flushes later. let tracer = tracer_provider.tracer("tvix"); // Set up a channel for flushing trace providers later - let (tx, mut rx) = mpsc::channel::<Option<oneshot::Sender<()>>>(16); + let (flush_tx, mut flush_rx) = mpsc::channel::<oneshot::Sender<()>>(16); // Spawning a task that listens on rx for any message. Once we receive a message we // correctly call flush on the tracer_provider. tokio::spawn(async move { - while let Some(m) = rx.recv().await { + while let Some(m) = flush_rx.recv().await { // Because of a bug within otlp we currently have to use spawn_blocking // otherwise will calling `force_flush` block forever, especially if the // tool was closed with ctrl_c. See @@ -326,11 +325,9 @@ fn gen_otlp_tracer( move || tracer_provider.force_flush() }) .await; - if let Some(tx) = m { - let _ = tx.send(()); - } + let _ = m.send(()); } }); - (tracer, tx) + (tracer, flush_tx) } |