about summary refs log tree commit diff
path: root/tvix
diff options
context:
space:
mode:
authorSimon Hauser <simon.hauser@helsinki-systems.de>2024-06-10T15·49+0200
committerclbot <clbot@tvl.fyi>2024-06-14T09·34+0000
commitfa7ed39bf480fd50c488ce54daad7322ced73aab (patch)
tree560e0bc8045168c11dade7cf93204262215349be /tvix
parent5077ca70deb8ca8e84abb9608e08bf4485d3ec4b (diff)
feat(tvix/tracing): correctly close otlp on exit r/8271
Provide a new interface for forcing a flush of otlp traces and use this
interface to shutdown otlp prior to exiting tvix-store, either if the
tool was stopped with a SIGTERM or ended regularly.
This also fixes an issue where traces were not even exported if for
example we just imported 10 paths and never even emitted more than 256
traces. The implementation uses a mpsc channel so a flush can be done
without having to wait for it to complete. If you want to wait for a
flush to complete you can provide a oneshot channel which will receive a
message once flushing is complete.

Because of a otlp bug `force_flush` as well as
`shutdown_tracer_provider` need to be executed using `spawn_blocking`
otherwise the function will deadlock. See
https://github.com/open-telemetry/opentelemetry-rust/issues/1395#issuecomment-1953280335

Change-Id: I0a828391adfb1f72dc8305f62ced8cba0515847c
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11803
Reviewed-by: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Autosubmit: Simon Hauser <simon.hauser@helsinki-systems.de>
Diffstat (limited to 'tvix')
-rw-r--r--tvix/Cargo.lock2
-rw-r--r--tvix/Cargo.nix9
-rw-r--r--tvix/build/src/bin/tvix-build.rs2
-rw-r--r--tvix/cli/src/main.rs5
-rw-r--r--tvix/store/src/bin/tvix-store.rs55
-rw-r--r--tvix/tracing/Cargo.toml2
-rw-r--r--tvix/tracing/src/lib.rs232
7 files changed, 242 insertions, 65 deletions
diff --git a/tvix/Cargo.lock b/tvix/Cargo.lock
index 15e098d0f157..e8c434e2de3a 100644
--- a/tvix/Cargo.lock
+++ b/tvix/Cargo.lock
@@ -4372,6 +4372,8 @@ dependencies = [
  "opentelemetry",
  "opentelemetry-otlp",
  "opentelemetry_sdk",
+ "thiserror",
+ "tokio",
  "tracing",
  "tracing-indicatif",
  "tracing-opentelemetry",
diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix
index 360747b8d283..4a0f755fe071 100644
--- a/tvix/Cargo.nix
+++ b/tvix/Cargo.nix
@@ -14008,6 +14008,15 @@ rec {
             features = [ "rt-tokio" ];
           }
           {
+            name = "thiserror";
+            packageId = "thiserror";
+          }
+          {
+            name = "tokio";
+            packageId = "tokio";
+            features = [ "sync" "rt" ];
+          }
+          {
             name = "tracing";
             packageId = "tracing";
             features = [ "max_level_trace" "release_max_level_debug" ];
diff --git a/tvix/build/src/bin/tvix-build.rs b/tvix/build/src/bin/tvix-build.rs
index 07085a433cfa..c96145ebb479 100644
--- a/tvix/build/src/bin/tvix-build.rs
+++ b/tvix/build/src/bin/tvix-build.rs
@@ -54,7 +54,7 @@ enum Commands {
 async fn main() -> Result<(), Box<dyn std::error::Error>> {
     let cli = Cli::parse();
 
-    tvix_tracing::init(cli.log_level)?;
+    let _ = tvix_tracing::TracingBuilder::default().level(cli.log_level);
 
     match cli.command {
         Commands::Daemon {
diff --git a/tvix/cli/src/main.rs b/tvix/cli/src/main.rs
index 5e0b8c41819d..3ec45bc37250 100644
--- a/tvix/cli/src/main.rs
+++ b/tvix/cli/src/main.rs
@@ -279,7 +279,10 @@ fn lint(code: &str, path: Option<PathBuf>, args: &Args) -> bool {
 fn main() {
     let args = Args::parse();
 
-    tvix_tracing::init(args.log_level).expect("unable to set up tracing subscriber");
+    let _ = tvix_tracing::TracingBuilder::default()
+        .level(args.log_level)
+        .build()
+        .expect("unable to set up tracing subscriber");
     let tokio_runtime = tokio::runtime::Runtime::new().expect("failed to setup tokio runtime");
 
     let io_handle = init_io_handle(&tokio_runtime, &args);
diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs
index 03c699b893cd..dadfa114f72b 100644
--- a/tvix/store/src/bin/tvix-store.rs
+++ b/tvix/store/src/bin/tvix-store.rs
@@ -199,25 +199,8 @@ fn default_threads() -> usize {
     1
 }
 
-#[tokio::main]
-#[instrument(fields(indicatif.pb_show=1))]
-async fn main() -> Result<(), Box<dyn std::error::Error>> {
-    let cli = Cli::parse();
-
-    #[cfg(feature = "otlp")]
-    {
-        if cli.otlp {
-            tvix_tracing::init_with_otlp(cli.log_level, "tvix.store")?;
-        } else {
-            tvix_tracing::init(cli.log_level)?;
-        }
-    }
-
-    #[cfg(not(feature = "otlp"))]
-    {
-        tvix_tracing::init(cli.log_level)?;
-    }
-
+#[instrument(fields(indicatif.pb_show=1), skip(cli))]
+async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error>> {
     match cli.command {
         Commands::Daemon {
             listen_address,
@@ -528,3 +511,37 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
     };
     Ok(())
 }
+
+#[tokio::main]
+#[instrument(fields(indicatif.pb_show=1))]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+    let cli = Cli::parse();
+
+    let tracing_handle = {
+        let mut builder = tvix_tracing::TracingBuilder::default();
+        builder = builder.level(cli.log_level);
+        #[cfg(feature = "otlp")]
+        {
+            if cli.otlp {
+                builder = builder.enable_otlp("tvix.store");
+            }
+        }
+        builder.build()?
+    };
+
+    tokio::select! {
+        res = tokio::signal::ctrl_c() => {
+            res?;
+            if let Err(e) = tracing_handle.force_shutdown().await {
+                eprintln!("failed to shutdown tracing: {e}");
+            }
+            Ok(())
+        },
+        res = run_cli(cli) => {
+            if let Err(e) = tracing_handle.shutdown().await {
+                eprintln!("failed to shutdown tracing: {e}");
+            }
+            res
+        }
+    }
+}
diff --git a/tvix/tracing/Cargo.toml b/tvix/tracing/Cargo.toml
index f65c9eba6cab..41654d00c02f 100644
--- a/tvix/tracing/Cargo.toml
+++ b/tvix/tracing/Cargo.toml
@@ -9,6 +9,8 @@ tracing = { version = "0.1.40", features = ["max_level_trace", "release_max_leve
 tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
 indicatif = "0.17.8"
 tracing-indicatif = "0.3.6"
+tokio = { version = "1.32.0" , features = ["sync", "rt"] }
+thiserror = "1.0.38"
 
 tracing-opentelemetry = { version = "0.23.0", optional = true }
 opentelemetry = { version = "0.22.0", optional = true }
diff --git a/tvix/tracing/src/lib.rs b/tvix/tracing/src/lib.rs
index 8ad90835338e..312c9a5a9a62 100644
--- a/tvix/tracing/src/lib.rs
+++ b/tvix/tracing/src/lib.rs
@@ -1,5 +1,6 @@
 use indicatif::ProgressStyle;
 use lazy_static::lazy_static;
+use tokio::sync::{mpsc, oneshot};
 use tracing::Level;
 use tracing_indicatif::{filter::IndicatifFilter, IndicatifLayer};
 use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer};
@@ -24,20 +25,136 @@ lazy_static! {
     .expect("invalid progress template");
 }
 
-// using a macro_rule here because of the complex return type
-macro_rules! init_base_subscriber {
-    ($level: expr) => {{
-        let indicatif_layer = IndicatifLayer::new().with_progress_style(PB_SPINNER_STYLE.clone());
+#[derive(thiserror::Error, Debug)]
+pub enum Error {
+    #[error(transparent)]
+    Init(#[from] tracing_subscriber::util::TryInitError),
+
+    #[error(transparent)]
+    MpscSend(#[from] mpsc::error::SendError<Option<oneshot::Sender<()>>>),
+
+    #[error(transparent)]
+    OneshotRecv(#[from] oneshot::error::RecvError),
+}
+
+#[derive(Clone)]
+pub struct TracingHandle {
+    tx: Option<mpsc::Sender<Option<oneshot::Sender<()>>>>,
+}
+
+impl TracingHandle {
+    /// This will flush possible attached tracing providers, e.g. otlp exported, if enabled.
+    /// If there is none enabled this will result in a noop.
+    ///
+    /// It will not wait until the flush is complete, but you can pass in an oneshot::Sender which
+    /// will receive a message once the flush is completed.
+    pub async fn flush(&self, msg: Option<oneshot::Sender<()>>) -> Result<(), Error> {
+        if let Some(tx) = &self.tx {
+            Ok(tx.send(msg).await?)
+        } else {
+            // If we have a message passed in we need to notify the receiver
+            if let Some(tx) = msg {
+                let _ = tx.send(());
+            }
+            Ok(())
+        }
+    }
 
+    /// This will flush all all attached tracing providers and will wait until the flush is completed.
+    /// If no tracing providers like otlp are attached then this will be a noop.
+    ///
+    /// This should only be called on a regular shutdown.
+    /// If you correctly need to shutdown tracing on ctrl_c use [force_shutdown](#method.force_shutdown)
+    /// otherwise you will get otlp errors.
+    pub async fn shutdown(&self) -> Result<(), Error> {
+        let (tx, rx) = tokio::sync::oneshot::channel();
+        self.flush(Some(tx)).await?;
+        rx.await?;
+        Ok(())
+    }
+
+    /// This will flush all all attached tracing providers and will wait until the flush is completed.
+    /// After this it will do some other necessary cleanup.
+    /// If no tracing providers like otlp are attached then this will be a noop.
+    ///
+    /// This should only be used if the tool received an ctrl_c otherwise you will get otlp errors.
+    /// If you need to shutdown tracing on a regular exit, you should use the [shutdown](#method.shutdown)
+    /// method.
+    pub async fn force_shutdown(&self) -> Result<(), Error> {
+        let (tx, rx) = tokio::sync::oneshot::channel();
+        self.flush(Some(tx)).await?;
+        rx.await?;
+
+        #[cfg(feature = "otlp")]
+        {
+            // Because of a bug within otlp we currently have to use spawn_blocking otherwise
+            // calling `shutdown_tracer_provider` can block forever. See
+            // https://github.com/open-telemetry/opentelemetry-rust/issues/1395#issuecomment-1953280335
+            //
+            // This still throws an error, if the tool exits regularly: "OpenTelemetry trace error
+            // occurred. oneshot canceled", but not having this leads to errors if we cancel with
+            // ctrl_c.
+            // So this should right now only be used on ctrl_c, for a regular exit use the
+            // [shutdown](#shutdown) method
+            let _ = tokio::task::spawn_blocking(move || {
+                opentelemetry::global::shutdown_tracer_provider();
+            })
+            .await;
+        }
+
+        Ok(())
+    }
+}
+
+pub struct TracingBuilder {
+    level: Level,
+
+    #[cfg(feature = "otlp")]
+    service_name: Option<&'static str>,
+}
+
+impl Default for TracingBuilder {
+    fn default() -> Self {
+        TracingBuilder {
+            level: Level::INFO,
+
+            #[cfg(feature = "otlp")]
+            service_name: None,
+        }
+    }
+}
+
+impl TracingBuilder {
+    /// Set the log level of the stderr writer, RUST_LOG still has a higher priority over this
+    pub fn level(mut self, level: Level) -> TracingBuilder {
+        self.level = level;
+        self
+    }
+
+    #[cfg(feature = "otlp")]
+    /// Enable otlp by setting a custom service_name
+    pub fn enable_otlp(mut self, service_name: &'static str) -> TracingBuilder {
+        self.service_name = Some(service_name);
+        self
+    }
+
+    /// This will setup tracing based on the configuration passed in.
+    /// It will setup a stderr writer with the provided log level as filter (RUST_LOG still has a
+    /// higher priority over the configured value)
+    ///
+    /// It will also configure otlp if the feature is enabled and a service_name was provided. It
+    /// will then correctly setup a channel which is later used for flushing the provider.
+    pub fn build(self) -> Result<TracingHandle, Error> {
         // Set up the tracing subscriber.
-        tracing_subscriber::registry()
+        let indicatif_layer = IndicatifLayer::new().with_progress_style(PB_SPINNER_STYLE.clone());
+        let subscriber = tracing_subscriber::registry()
             .with(
                 tracing_subscriber::fmt::Layer::new()
                     .with_writer(indicatif_layer.get_stderr_writer())
                     .compact()
                     .with_filter(
                         EnvFilter::builder()
-                            .with_default_directive($level.into())
+                            .with_default_directive(self.level.into())
                             .from_env()
                             .expect("invalid RUST_LOG"),
                     ),
@@ -45,46 +162,73 @@ macro_rules! init_base_subscriber {
             .with(indicatif_layer.with_filter(
                 // only show progress for spans with indicatif.pb_show field being set
                 IndicatifFilter::new(false),
-            ))
-    }};
-}
+            ));
 
-pub fn init(level: Level) -> Result<(), tracing_subscriber::util::TryInitError> {
-    init_base_subscriber!(level).try_init()
-}
+        // Setup otlp if a service_name is configured
+        #[cfg(feature = "otlp")]
+        {
+            if let Some(service_name) = self.service_name {
+                let tracer = opentelemetry_otlp::new_pipeline()
+                    .tracing()
+                    .with_exporter(opentelemetry_otlp::new_exporter().tonic())
+                    .with_batch_config(BatchConfig::default())
+                    .with_trace_config(opentelemetry_sdk::trace::config().with_resource({
+                        // use SdkProvidedResourceDetector.detect to detect resources,
+                        // but replace the default service name with our default.
+                        // https://github.com/open-telemetry/opentelemetry-rust/issues/1298
+                        let resources =
+                            SdkProvidedResourceDetector.detect(std::time::Duration::from_secs(0));
+                        // SdkProvidedResourceDetector currently always sets
+                        // `service.name`, but we don't like its default.
+                        if resources.get("service.name".into()).unwrap() == "unknown_service".into()
+                        {
+                            resources.merge(&Resource::new([KeyValue::new(
+                                "service.name",
+                                service_name,
+                            )]))
+                        } else {
+                            resources
+                        }
+                    }))
+                    .install_batch(opentelemetry_sdk::runtime::Tokio)
+                    .expect("Failed to install batch exporter using Tokio");
 
-#[cfg(feature = "otlp")]
-pub fn init_with_otlp(
-    level: Level,
-    service_name: &'static str,
-) -> Result<(), tracing_subscriber::util::TryInitError> {
-    let subscriber = init_base_subscriber!(level);
-
-    let tracer = opentelemetry_otlp::new_pipeline()
-        .tracing()
-        .with_exporter(opentelemetry_otlp::new_exporter().tonic())
-        .with_batch_config(BatchConfig::default())
-        .with_trace_config(opentelemetry_sdk::trace::config().with_resource({
-            // use SdkProvidedResourceDetector.detect to detect resources,
-            // but replace the default service name with our default.
-            // https://github.com/open-telemetry/opentelemetry-rust/issues/1298
-            let resources = SdkProvidedResourceDetector.detect(std::time::Duration::from_secs(0));
-            // SdkProvidedResourceDetector currently always sets
-            // `service.name`, but we don't like its default.
-            if resources.get("service.name".into()).unwrap() == "unknown_service".into() {
-                resources.merge(&Resource::new([KeyValue::new(
-                    "service.name",
-                    service_name,
-                )]))
-            } else {
-                resources
-            }
-        }))
-        .install_batch(opentelemetry_sdk::runtime::Tokio)
-        .expect("Failed to install tokio runtime");
+                // Trace provider is need for later use like flushing the provider.
+                // Needs to be kept around for each message to rx we need to handle.
+                let tracer_provider = tracer
+                    .provider()
+                    .expect("Failed to get the tracer provider");
 
-    // Create a tracing layer with the configured tracer
-    let layer = tracing_opentelemetry::layer().with_tracer(tracer);
+                // Set up a channel for flushing trace providers later
+                let (tx, mut rx) = mpsc::channel::<Option<oneshot::Sender<()>>>(16);
+
+                // Spawning a task that listens on rx for any message. Once we receive a message we
+                // correctly call flush on the tracer_provider.
+                tokio::spawn(async move {
+                    while let Some(m) = rx.recv().await {
+                        // Because of a bug within otlp we currently have to use spawn_blocking
+                        // otherwise will calling `force_flush` block forever, especially if the
+                        // tool was closed with ctrl_c. See
+                        // https://github.com/open-telemetry/opentelemetry-rust/issues/1395#issuecomment-1953280335
+                        let _ = tokio::task::spawn_blocking({
+                            let tracer_provider = tracer_provider.clone();
+                            move || tracer_provider.force_flush()
+                        })
+                        .await;
+                        if let Some(tx) = m {
+                            let _ = tx.send(());
+                        }
+                    }
+                });
+
+                // Create a tracing layer with the configured tracer
+                let layer = tracing_opentelemetry::layer().with_tracer(tracer);
+                subscriber.with(Some(layer)).try_init()?;
+                return Ok(TracingHandle { tx: Some(tx) });
+            }
+        }
 
-    subscriber.with(Some(layer)).try_init()
+        subscriber.try_init()?;
+        Ok(TracingHandle { tx: None })
+    }
 }