about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--tvix/Cargo.lock2
-rw-r--r--tvix/Cargo.nix9
-rw-r--r--tvix/build/src/bin/tvix-build.rs2
-rw-r--r--tvix/cli/src/main.rs5
-rw-r--r--tvix/store/src/bin/tvix-store.rs55
-rw-r--r--tvix/tracing/Cargo.toml2
-rw-r--r--tvix/tracing/src/lib.rs232
7 files changed, 242 insertions, 65 deletions
diff --git a/tvix/Cargo.lock b/tvix/Cargo.lock
index 15e098d0f157..e8c434e2de3a 100644
--- a/tvix/Cargo.lock
+++ b/tvix/Cargo.lock
@@ -4372,6 +4372,8 @@ dependencies = [
  "opentelemetry",
  "opentelemetry-otlp",
  "opentelemetry_sdk",
+ "thiserror",
+ "tokio",
  "tracing",
  "tracing-indicatif",
  "tracing-opentelemetry",
diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix
index 360747b8d283..4a0f755fe071 100644
--- a/tvix/Cargo.nix
+++ b/tvix/Cargo.nix
@@ -14008,6 +14008,15 @@ rec {
             features = [ "rt-tokio" ];
           }
           {
+            name = "thiserror";
+            packageId = "thiserror";
+          }
+          {
+            name = "tokio";
+            packageId = "tokio";
+            features = [ "sync" "rt" ];
+          }
+          {
             name = "tracing";
             packageId = "tracing";
             features = [ "max_level_trace" "release_max_level_debug" ];
diff --git a/tvix/build/src/bin/tvix-build.rs b/tvix/build/src/bin/tvix-build.rs
index 07085a433cfa..c96145ebb479 100644
--- a/tvix/build/src/bin/tvix-build.rs
+++ b/tvix/build/src/bin/tvix-build.rs
@@ -54,7 +54,7 @@ enum Commands {
 async fn main() -> Result<(), Box<dyn std::error::Error>> {
     let cli = Cli::parse();
 
-    tvix_tracing::init(cli.log_level)?;
+    let _ = tvix_tracing::TracingBuilder::default().level(cli.log_level);
 
     match cli.command {
         Commands::Daemon {
diff --git a/tvix/cli/src/main.rs b/tvix/cli/src/main.rs
index 5e0b8c41819d..3ec45bc37250 100644
--- a/tvix/cli/src/main.rs
+++ b/tvix/cli/src/main.rs
@@ -279,7 +279,10 @@ fn lint(code: &str, path: Option<PathBuf>, args: &Args) -> bool {
 fn main() {
     let args = Args::parse();
 
-    tvix_tracing::init(args.log_level).expect("unable to set up tracing subscriber");
+    let _ = tvix_tracing::TracingBuilder::default()
+        .level(args.log_level)
+        .build()
+        .expect("unable to set up tracing subscriber");
     let tokio_runtime = tokio::runtime::Runtime::new().expect("failed to setup tokio runtime");
 
     let io_handle = init_io_handle(&tokio_runtime, &args);
diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs
index 03c699b893cd..dadfa114f72b 100644
--- a/tvix/store/src/bin/tvix-store.rs
+++ b/tvix/store/src/bin/tvix-store.rs
@@ -199,25 +199,8 @@ fn default_threads() -> usize {
     1
 }
 
-#[tokio::main]
-#[instrument(fields(indicatif.pb_show=1))]
-async fn main() -> Result<(), Box<dyn std::error::Error>> {
-    let cli = Cli::parse();
-
-    #[cfg(feature = "otlp")]
-    {
-        if cli.otlp {
-            tvix_tracing::init_with_otlp(cli.log_level, "tvix.store")?;
-        } else {
-            tvix_tracing::init(cli.log_level)?;
-        }
-    }
-
-    #[cfg(not(feature = "otlp"))]
-    {
-        tvix_tracing::init(cli.log_level)?;
-    }
-
+#[instrument(fields(indicatif.pb_show=1), skip(cli))]
+async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error>> {
     match cli.command {
         Commands::Daemon {
             listen_address,
@@ -528,3 +511,37 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
     };
     Ok(())
 }
+
+#[tokio::main]
+#[instrument(fields(indicatif.pb_show=1))]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+    let cli = Cli::parse();
+
+    let tracing_handle = {
+        let mut builder = tvix_tracing::TracingBuilder::default();
+        builder = builder.level(cli.log_level);
+        #[cfg(feature = "otlp")]
+        {
+            if cli.otlp {
+                builder = builder.enable_otlp("tvix.store");
+            }
+        }
+        builder.build()?
+    };
+
+    tokio::select! {
+        res = tokio::signal::ctrl_c() => {
+            res?;
+            if let Err(e) = tracing_handle.force_shutdown().await {
+                eprintln!("failed to shutdown tracing: {e}");
+            }
+            Ok(())
+        },
+        res = run_cli(cli) => {
+            if let Err(e) = tracing_handle.shutdown().await {
+                eprintln!("failed to shutdown tracing: {e}");
+            }
+            res
+        }
+    }
+}
diff --git a/tvix/tracing/Cargo.toml b/tvix/tracing/Cargo.toml
index f65c9eba6cab..41654d00c02f 100644
--- a/tvix/tracing/Cargo.toml
+++ b/tvix/tracing/Cargo.toml
@@ -9,6 +9,8 @@ tracing = { version = "0.1.40", features = ["max_level_trace", "release_max_leve
 tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
 indicatif = "0.17.8"
 tracing-indicatif = "0.3.6"
+tokio = { version = "1.32.0" , features = ["sync", "rt"] }
+thiserror = "1.0.38"
 
 tracing-opentelemetry = { version = "0.23.0", optional = true }
 opentelemetry = { version = "0.22.0", optional = true }
diff --git a/tvix/tracing/src/lib.rs b/tvix/tracing/src/lib.rs
index 8ad90835338e..312c9a5a9a62 100644
--- a/tvix/tracing/src/lib.rs
+++ b/tvix/tracing/src/lib.rs
@@ -1,5 +1,6 @@
 use indicatif::ProgressStyle;
 use lazy_static::lazy_static;
+use tokio::sync::{mpsc, oneshot};
 use tracing::Level;
 use tracing_indicatif::{filter::IndicatifFilter, IndicatifLayer};
 use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer};
@@ -24,20 +25,136 @@ lazy_static! {
     .expect("invalid progress template");
 }
 
-// using a macro_rule here because of the complex return type
-macro_rules! init_base_subscriber {
-    ($level: expr) => {{
-        let indicatif_layer = IndicatifLayer::new().with_progress_style(PB_SPINNER_STYLE.clone());
+#[derive(thiserror::Error, Debug)]
+pub enum Error {
+    #[error(transparent)]
+    Init(#[from] tracing_subscriber::util::TryInitError),
+
+    #[error(transparent)]
+    MpscSend(#[from] mpsc::error::SendError<Option<oneshot::Sender<()>>>),
+
+    #[error(transparent)]
+    OneshotRecv(#[from] oneshot::error::RecvError),
+}
+
+#[derive(Clone)]
+pub struct TracingHandle {
+    tx: Option<mpsc::Sender<Option<oneshot::Sender<()>>>>,
+}
+
+impl TracingHandle {
+    /// This will flush possible attached tracing providers, e.g. otlp exported, if enabled.
+    /// If there is none enabled this will result in a noop.
+    ///
+    /// It will not wait until the flush is complete, but you can pass in an oneshot::Sender which
+    /// will receive a message once the flush is completed.
+    pub async fn flush(&self, msg: Option<oneshot::Sender<()>>) -> Result<(), Error> {
+        if let Some(tx) = &self.tx {
+            Ok(tx.send(msg).await?)
+        } else {
+            // If we have a message passed in we need to notify the receiver
+            if let Some(tx) = msg {
+                let _ = tx.send(());
+            }
+            Ok(())
+        }
+    }
 
+    /// This will flush all all attached tracing providers and will wait until the flush is completed.
+    /// If no tracing providers like otlp are attached then this will be a noop.
+    ///
+    /// This should only be called on a regular shutdown.
+    /// If you correctly need to shutdown tracing on ctrl_c use [force_shutdown](#method.force_shutdown)
+    /// otherwise you will get otlp errors.
+    pub async fn shutdown(&self) -> Result<(), Error> {
+        let (tx, rx) = tokio::sync::oneshot::channel();
+        self.flush(Some(tx)).await?;
+        rx.await?;
+        Ok(())
+    }
+
+    /// This will flush all all attached tracing providers and will wait until the flush is completed.
+    /// After this it will do some other necessary cleanup.
+    /// If no tracing providers like otlp are attached then this will be a noop.
+    ///
+    /// This should only be used if the tool received an ctrl_c otherwise you will get otlp errors.
+    /// If you need to shutdown tracing on a regular exit, you should use the [shutdown](#method.shutdown)
+    /// method.
+    pub async fn force_shutdown(&self) -> Result<(), Error> {
+        let (tx, rx) = tokio::sync::oneshot::channel();
+        self.flush(Some(tx)).await?;
+        rx.await?;
+
+        #[cfg(feature = "otlp")]
+        {
+            // Because of a bug within otlp we currently have to use spawn_blocking otherwise
+            // calling `shutdown_tracer_provider` can block forever. See
+            // https://github.com/open-telemetry/opentelemetry-rust/issues/1395#issuecomment-1953280335
+            //
+            // This still throws an error, if the tool exits regularly: "OpenTelemetry trace error
+            // occurred. oneshot canceled", but not having this leads to errors if we cancel with
+            // ctrl_c.
+            // So this should right now only be used on ctrl_c, for a regular exit use the
+            // [shutdown](#shutdown) method
+            let _ = tokio::task::spawn_blocking(move || {
+                opentelemetry::global::shutdown_tracer_provider();
+            })
+            .await;
+        }
+
+        Ok(())
+    }
+}
+
+pub struct TracingBuilder {
+    level: Level,
+
+    #[cfg(feature = "otlp")]
+    service_name: Option<&'static str>,
+}
+
+impl Default for TracingBuilder {
+    fn default() -> Self {
+        TracingBuilder {
+            level: Level::INFO,
+
+            #[cfg(feature = "otlp")]
+            service_name: None,
+        }
+    }
+}
+
+impl TracingBuilder {
+    /// Set the log level of the stderr writer, RUST_LOG still has a higher priority over this
+    pub fn level(mut self, level: Level) -> TracingBuilder {
+        self.level = level;
+        self
+    }
+
+    #[cfg(feature = "otlp")]
+    /// Enable otlp by setting a custom service_name
+    pub fn enable_otlp(mut self, service_name: &'static str) -> TracingBuilder {
+        self.service_name = Some(service_name);
+        self
+    }
+
+    /// This will setup tracing based on the configuration passed in.
+    /// It will setup a stderr writer with the provided log level as filter (RUST_LOG still has a
+    /// higher priority over the configured value)
+    ///
+    /// It will also configure otlp if the feature is enabled and a service_name was provided. It
+    /// will then correctly setup a channel which is later used for flushing the provider.
+    pub fn build(self) -> Result<TracingHandle, Error> {
         // Set up the tracing subscriber.
-        tracing_subscriber::registry()
+        let indicatif_layer = IndicatifLayer::new().with_progress_style(PB_SPINNER_STYLE.clone());
+        let subscriber = tracing_subscriber::registry()
             .with(
                 tracing_subscriber::fmt::Layer::new()
                     .with_writer(indicatif_layer.get_stderr_writer())
                     .compact()
                     .with_filter(
                         EnvFilter::builder()
-                            .with_default_directive($level.into())
+                            .with_default_directive(self.level.into())
                             .from_env()
                             .expect("invalid RUST_LOG"),
                     ),
@@ -45,46 +162,73 @@ macro_rules! init_base_subscriber {
             .with(indicatif_layer.with_filter(
                 // only show progress for spans with indicatif.pb_show field being set
                 IndicatifFilter::new(false),
-            ))
-    }};
-}
+            ));
 
-pub fn init(level: Level) -> Result<(), tracing_subscriber::util::TryInitError> {
-    init_base_subscriber!(level).try_init()
-}
+        // Setup otlp if a service_name is configured
+        #[cfg(feature = "otlp")]
+        {
+            if let Some(service_name) = self.service_name {
+                let tracer = opentelemetry_otlp::new_pipeline()
+                    .tracing()
+                    .with_exporter(opentelemetry_otlp::new_exporter().tonic())
+                    .with_batch_config(BatchConfig::default())
+                    .with_trace_config(opentelemetry_sdk::trace::config().with_resource({
+                        // use SdkProvidedResourceDetector.detect to detect resources,
+                        // but replace the default service name with our default.
+                        // https://github.com/open-telemetry/opentelemetry-rust/issues/1298
+                        let resources =
+                            SdkProvidedResourceDetector.detect(std::time::Duration::from_secs(0));
+                        // SdkProvidedResourceDetector currently always sets
+                        // `service.name`, but we don't like its default.
+                        if resources.get("service.name".into()).unwrap() == "unknown_service".into()
+                        {
+                            resources.merge(&Resource::new([KeyValue::new(
+                                "service.name",
+                                service_name,
+                            )]))
+                        } else {
+                            resources
+                        }
+                    }))
+                    .install_batch(opentelemetry_sdk::runtime::Tokio)
+                    .expect("Failed to install batch exporter using Tokio");
 
-#[cfg(feature = "otlp")]
-pub fn init_with_otlp(
-    level: Level,
-    service_name: &'static str,
-) -> Result<(), tracing_subscriber::util::TryInitError> {
-    let subscriber = init_base_subscriber!(level);
-
-    let tracer = opentelemetry_otlp::new_pipeline()
-        .tracing()
-        .with_exporter(opentelemetry_otlp::new_exporter().tonic())
-        .with_batch_config(BatchConfig::default())
-        .with_trace_config(opentelemetry_sdk::trace::config().with_resource({
-            // use SdkProvidedResourceDetector.detect to detect resources,
-            // but replace the default service name with our default.
-            // https://github.com/open-telemetry/opentelemetry-rust/issues/1298
-            let resources = SdkProvidedResourceDetector.detect(std::time::Duration::from_secs(0));
-            // SdkProvidedResourceDetector currently always sets
-            // `service.name`, but we don't like its default.
-            if resources.get("service.name".into()).unwrap() == "unknown_service".into() {
-                resources.merge(&Resource::new([KeyValue::new(
-                    "service.name",
-                    service_name,
-                )]))
-            } else {
-                resources
-            }
-        }))
-        .install_batch(opentelemetry_sdk::runtime::Tokio)
-        .expect("Failed to install tokio runtime");
+                // Trace provider is need for later use like flushing the provider.
+                // Needs to be kept around for each message to rx we need to handle.
+                let tracer_provider = tracer
+                    .provider()
+                    .expect("Failed to get the tracer provider");
 
-    // Create a tracing layer with the configured tracer
-    let layer = tracing_opentelemetry::layer().with_tracer(tracer);
+                // Set up a channel for flushing trace providers later
+                let (tx, mut rx) = mpsc::channel::<Option<oneshot::Sender<()>>>(16);
+
+                // Spawning a task that listens on rx for any message. Once we receive a message we
+                // correctly call flush on the tracer_provider.
+                tokio::spawn(async move {
+                    while let Some(m) = rx.recv().await {
+                        // Because of a bug within otlp we currently have to use spawn_blocking
+                        // otherwise will calling `force_flush` block forever, especially if the
+                        // tool was closed with ctrl_c. See
+                        // https://github.com/open-telemetry/opentelemetry-rust/issues/1395#issuecomment-1953280335
+                        let _ = tokio::task::spawn_blocking({
+                            let tracer_provider = tracer_provider.clone();
+                            move || tracer_provider.force_flush()
+                        })
+                        .await;
+                        if let Some(tx) = m {
+                            let _ = tx.send(());
+                        }
+                    }
+                });
+
+                // Create a tracing layer with the configured tracer
+                let layer = tracing_opentelemetry::layer().with_tracer(tracer);
+                subscriber.with(Some(layer)).try_init()?;
+                return Ok(TracingHandle { tx: Some(tx) });
+            }
+        }
 
-    subscriber.with(Some(layer)).try_init()
+        subscriber.try_init()?;
+        Ok(TracingHandle { tx: None })
+    }
 }