about summary refs log tree commit diff
path: root/tvix/tracing/src/lib.rs
blob: 3362818bb1804d69096d031d3d2b845743958877 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
use indicatif::ProgressStyle;
use std::sync::LazyLock;
use tokio::sync::{mpsc, oneshot};
use tracing::level_filters::LevelFilter;
use tracing_indicatif::{
    filter::IndicatifFilter, util::FilteredFormatFields, writer, IndicatifLayer, IndicatifWriter,
};
use tracing_subscriber::{
    layer::{Identity, SubscriberExt},
    util::SubscriberInitExt,
    EnvFilter, Layer, Registry,
};

#[cfg(feature = "otlp")]
use opentelemetry::{
    trace::{Tracer, TracerProvider},
    KeyValue,
};
#[cfg(feature = "otlp")]
use opentelemetry_sdk::{
    propagation::TraceContextPropagator,
    resource::{ResourceDetector, SdkProvidedResourceDetector},
    trace::BatchConfigBuilder,
    Resource,
};
#[cfg(feature = "tracy")]
use tracing_tracy::TracyLayer;

pub mod propagate;

/// Compiles one of the hard-coded progress bar templates of this crate.
///
/// The templates are string literals in this file, so a parse failure is a programming error
/// and results in a panic.
fn style_from_template(template: &str) -> ProgressStyle {
    ProgressStyle::with_template(template).expect("invalid progress template")
}

/// Progress style rendering a bar together with the elapsed time and a `pos/len` counter.
pub static PB_PROGRESS_STYLE: LazyLock<ProgressStyle> = LazyLock::new(|| {
    style_from_template("{span_child_prefix} {wide_msg} {bar:10} ({elapsed}) {pos:>7}/{len:7}")
});

/// Progress style for transfers: renders transferred/total bytes, the throughput, the elapsed
/// time and a bar.
pub static PB_TRANSFER_STYLE: LazyLock<ProgressStyle> = LazyLock::new(|| {
    style_from_template(
        "{span_child_prefix} {wide_msg} {binary_bytes:>7}/{binary_total_bytes:7}@{decimal_bytes_per_sec} ({elapsed}) {bar:10} "
    )
});

/// Progress style rendering a spinner together with the elapsed time and a `pos/len` counter.
///
/// This is the style the progress bar layer is set up with in
/// `TracingBuilder::build_with_additional`.
pub static PB_SPINNER_STYLE: LazyLock<ProgressStyle> = LazyLock::new(|| {
    style_from_template("{span_child_prefix}{spinner} {wide_msg} ({elapsed}) {pos:>7}/{len:7}")
});

/// Errors returned while setting up tracing or while talking to the flush task.
///
/// All variants are transparent wrappers, so `Display` and `source()` are forwarded to the
/// wrapped error.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// Installing the subscriber via `try_init()` failed
    /// (see `TracingBuilder::build_with_additional`).
    #[error(transparent)]
    Init(#[from] tracing_subscriber::util::TryInitError),

    /// Sending a flush request over the channel held by `TracingHandle` failed
    /// (see `TracingHandle::flush`).
    #[error(transparent)]
    MpscSend(#[from] mpsc::error::SendError<Option<oneshot::Sender<()>>>),

    /// Waiting for the completion notification of a flush failed
    /// (see `TracingHandle::shutdown` and `TracingHandle::force_shutdown`).
    #[error(transparent)]
    OneshotRecv(#[from] oneshot::error::RecvError),
}

/// Handle to the tracing setup, returned by `TracingBuilder::build` and
/// `TracingBuilder::build_with_additional`.
///
/// It is used to request flushes/shutdown of attached tracing providers and to obtain
/// stdout/stderr writers that cooperate with active progress bars.
#[derive(Clone)]
pub struct TracingHandle {
    /// Sending half of the flush request channel. `build_with_additional` only sets this to
    /// `Some` when the `otlp` feature is enabled and a service name was configured; otherwise
    /// it stays `None` and flushing is a noop.
    tx: Option<mpsc::Sender<Option<oneshot::Sender<()>>>>,

    /// Writers obtained from the `IndicatifLayer` created in `build_with_additional`.
    stdout_writer: IndicatifWriter<writer::Stdout>,
    stderr_writer: IndicatifWriter<writer::Stderr>,
}

impl TracingHandle {
    /// Hands out a writer for [std::io::Stdout] whose output will not be clobbered by active
    /// progress bars.
    ///
    /// Prefer `writeln!(handle.get_stdout_writer(), ...)` over `println!(...)`.
    pub fn get_stdout_writer(&self) -> IndicatifWriter<writer::Stdout> {
        // Cloning is cheap, the writer is only a wrapper over an `Arc`.
        self.stdout_writer.clone()
    }

    /// Hands out a writer for [std::io::Stderr] whose output will not be clobbered by active
    /// progress bars.
    ///
    /// Prefer `writeln!(handle.get_stderr_writer(), ...)` over `eprintln!(...)`.
    pub fn get_stderr_writer(&self) -> IndicatifWriter<writer::Stderr> {
        // Cloning is cheap, the writer is only a wrapper over an `Arc`.
        self.stderr_writer.clone()
    }

    /// Requests a flush of the attached tracing providers (e.g. the otlp exporter, if enabled).
    /// Without an attached provider this is a noop.
    ///
    /// This does not wait for the flush to complete. To get notified about completion, pass in
    /// a `oneshot::Sender`; it receives a message once the flush is done. If no provider is
    /// attached, the sender is notified immediately.
    ///
    /// # Errors
    ///
    /// Returns [`Error::MpscSend`] if the request could not be sent to the flush task.
    pub async fn flush(&self, msg: Option<oneshot::Sender<()>>) -> Result<(), Error> {
        let Some(flush_tx) = self.tx.as_ref() else {
            // Nothing to flush, but a waiting receiver still has to be notified.
            if let Some(done) = msg {
                let _ = done.send(());
            }
            return Ok(());
        };

        flush_tx.send(msg).await?;
        Ok(())
    }

    /// Flushes all attached tracing providers and waits until the flush is completed.
    /// If no tracing providers like otlp are attached, this is a noop.
    ///
    /// Only call this on a regular shutdown. To shut down tracing after a ctrl_c use
    /// [`force_shutdown`](Self::force_shutdown) instead, otherwise you will get otlp errors.
    ///
    /// # Errors
    ///
    /// Returns [`Error::MpscSend`] if the flush request could not be sent and
    /// [`Error::OneshotRecv`] if the completion notification never arrives.
    pub async fn shutdown(&self) -> Result<(), Error> {
        let (done_tx, done_rx) = tokio::sync::oneshot::channel();
        self.flush(Some(done_tx)).await?;
        done_rx.await?;
        Ok(())
    }

    /// Flushes all attached tracing providers, waits until the flush is completed and then
    /// performs the remaining cleanup.
    /// If no tracing providers like otlp are attached, this is a noop.
    ///
    /// Only use this if the tool received a ctrl_c, otherwise you will get otlp errors.
    /// For a regular exit use [`shutdown`](Self::shutdown).
    ///
    /// # Errors
    ///
    /// Returns [`Error::MpscSend`] if the flush request could not be sent and
    /// [`Error::OneshotRecv`] if the completion notification never arrives.
    pub async fn force_shutdown(&self) -> Result<(), Error> {
        // The first half is exactly the flush-and-wait sequence of a regular shutdown.
        self.shutdown().await?;

        #[cfg(feature = "otlp")]
        {
            // `shutdown_tracer_provider` can block forever when called directly, due to a bug
            // within otlp, hence the detour through `spawn_blocking`. See
            // https://github.com/open-telemetry/opentelemetry-rust/issues/1395#issuecomment-1953280335
            //
            // On a regular exit this still produces the error "OpenTelemetry trace error
            // occurred. oneshot canceled", but leaving it out leads to errors when cancelling
            // with ctrl_c. That is why this method is meant for ctrl_c only, while a regular
            // exit should go through `shutdown`.
            let provider_shutdown = tokio::task::spawn_blocking(|| {
                opentelemetry::global::shutdown_tracer_provider();
            });
            let _ = provider_shutdown.await;
        }

        Ok(())
    }
}

/// Builder to configure and install the global tracing subscriber.
///
/// Configure it with the `enable_*` methods, then call [`build`](Self::build) or
/// [`build_with_additional`](Self::build_with_additional) to obtain a [`TracingHandle`].
#[must_use = "Don't forget to call build() to enable tracing."]
#[derive(Default)]
pub struct TracingBuilder {
    /// Whether the progress bar layer gets installed, disabled by default.
    progress_bar: bool,

    /// Service name used for otlp; otlp is only set up if this is `Some`.
    #[cfg(feature = "otlp")]
    service_name: Option<&'static str>,
}

impl TracingBuilder {
    /// Enable otlp by setting a custom service_name
    #[cfg(feature = "otlp")]
    pub fn enable_otlp(mut self, service_name: &'static str) -> TracingBuilder {
        self.service_name = Some(service_name);
        self
    }

    /// Enable progress bar layer, default is disabled
    pub fn enable_progressbar(mut self) -> TracingBuilder {
        self.progress_bar = true;
        self
    }

    /// This will setup tracing based on the configuration passed in.
    /// It will setup a stderr writer output layer and configure EnvFilter to honor RUST_LOG.
    /// The EnvFilter will be applied to all configured layers, also otlp.
    ///
    /// It will also configure otlp if the feature is enabled and a service_name was provided. It
    /// will then correctly setup a channel which is later used for flushing the provider.
    ///
    /// See [`build_with_additional`](Self::build_with_additional) for the error and panic
    /// conditions, they are the same.
    pub fn build(self) -> Result<TracingHandle, Error> {
        self.build_with_additional(Identity::new())
    }

    /// Similar to `build()` but allows passing in an additional tracing [`Layer`].
    ///
    /// This method is generic over the `Layer` to avoid the runtime cost of dynamic dispatch.
    /// While it only allows passing a single `Layer`, it can be composed of multiple ones:
    ///
    /// ```ignore
    /// build_with_additional(
    ///   fmt::layer()
    ///     .and_then(some_other_layer)
    ///     .and_then(yet_another_layer)
    ///     .with_filter(my_filter)
    /// )
    /// ```
    ///
    /// # Errors
    ///
    /// Returns [`Error::Init`] if installing the subscriber via `try_init()` fails.
    ///
    /// # Panics
    ///
    /// Panics if the filter directives taken from the environment (`RUST_LOG`) are rejected
    /// by `EnvFilter`. With otlp enabled and a service name set, it also panics if the batch
    /// exporter cannot be installed. In that case a flush task is spawned via `tokio::spawn`,
    /// so this presumably has to be called from within a Tokio runtime.
    ///
    /// [`Layer`]: tracing_subscriber::layer::Layer
    pub fn build_with_additional<L>(self, additional_layer: L) -> Result<TracingHandle, Error>
    where
        L: Layer<Registry> + Send + Sync + 'static,
    {
        // Set up the tracing subscriber.
        let indicatif_layer = IndicatifLayer::new().with_progress_style(PB_SPINNER_STYLE.clone());
        let stdout_writer = indicatif_layer.get_stdout_writer();
        let stderr_writer = indicatif_layer.get_stderr_writer();

        let layered = tracing_subscriber::fmt::Layer::new()
            .fmt_fields(FilteredFormatFields::new(
                tracing_subscriber::fmt::format::DefaultFields::new(),
                // keep the internal marker field out of the formatted log output
                |field| field.name() != "indicatif.pb_show",
            ))
            .with_writer(indicatif_layer.get_stderr_writer())
            .compact()
            // `None` (progress bar disabled) adds no progress bar layer
            .and_then(self.progress_bar.then(|| {
                indicatif_layer.with_filter(
                    // only show progress for spans with indicatif.pb_show field being set
                    IndicatifFilter::new(false),
                )
            }));
        #[cfg(feature = "tracy")]
        let layered = layered.and_then(TracyLayer::default());

        // Only the otlp setup below assigns to `tx`. Without the feature it is never mutated,
        // so the `unused_mut` lint has to be silenced for that configuration.
        #[cfg_attr(not(feature = "otlp"), allow(unused_mut))]
        let mut tx: Option<mpsc::Sender<Option<oneshot::Sender<()>>>> = None;

        // Setup otlp if a service_name is configured
        #[cfg(feature = "otlp")]
        let layered = layered.and_then({
            if let Some(service_name) = self.service_name {
                // register a text map propagator for trace propagation
                opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new());

                let (tracer, sender) = gen_otlp_tracer(service_name.to_string());
                tx = Some(sender);
                // Create a tracing layer with the configured tracer
                Some(tracing_opentelemetry::layer().with_tracer(tracer))
            } else {
                None
            }
        });

        // The filter wraps everything composed so far, so it also applies to the otlp layer.
        let layered = layered.with_filter(
            EnvFilter::builder()
                .with_default_directive(LevelFilter::INFO.into())
                .from_env()
                .expect("invalid RUST_LOG"),
        );

        tracing_subscriber::registry()
            // TODO: if additional_layer has global filters, there is a risk that it will disable the "default" ones,
            // while it could be solved by registering `additional_layer` last, it requires boxing `additional_layer`.
            .with(additional_layer)
            .with(layered)
            .try_init()?;

        Ok(TracingHandle {
            tx,
            stdout_writer,
            stderr_writer,
        })
    }
}

/// Sets up the OTLP pipeline for `service_name` and spawns the task serving flush requests.
///
/// Returns the OTLP tracer together with the TX part of a channel. Every message sent over
/// that channel triggers a flush of the tracer provider; if the message carries a
/// `oneshot::Sender`, it gets notified once the flush attempt has finished.
#[cfg(feature = "otlp")]
fn gen_otlp_tracer(
    service_name: String,
) -> (
    impl Tracer + tracing_opentelemetry::PreSampledTracer,
    mpsc::Sender<Option<oneshot::Sender<()>>>,
) {
    let batch_config = BatchConfigBuilder::default()
        // `max_export_batch_size` defaults to 512, which fills up pretty quickly and then
        // results in an export. Exports should only happen once the schedule is met and not
        // as soon as 512 spans are collected.
        .with_max_export_batch_size(4096)
        // analogous to the default config: `max_export_batch_size * 4`
        .with_max_queue_size(4096 * 4)
        // only force an export to the otlp collector every 10 seconds, which reduces the
        // amount of error messages if no otlp collector is available
        .with_scheduled_delay(std::time::Duration::from_secs(10))
        .build();

    // Let SdkProvidedResourceDetector detect the resources, but swap its default service name
    // for ours.
    // https://github.com/open-telemetry/opentelemetry-rust/issues/1298
    let resource = {
        let detected = SdkProvidedResourceDetector.detect(std::time::Duration::from_secs(0));
        // SdkProvidedResourceDetector currently always sets `service.name`, but its default
        // is not what we want.
        if detected.get("service.name".into()).unwrap() == "unknown_service".into() {
            detected.merge(&Resource::new([KeyValue::new("service.name", service_name)]))
        } else {
            detected
        }
    };

    let tracer_provider = opentelemetry_otlp::new_pipeline()
        .tracing()
        .with_exporter(opentelemetry_otlp::new_exporter().tonic())
        .with_batch_config(batch_config)
        .with_trace_config(opentelemetry_sdk::trace::Config::default().with_resource(resource))
        .install_batch(opentelemetry_sdk::runtime::Tokio)
        .expect("Failed to install batch exporter using Tokio");

    // The provider itself is still needed afterwards for flushing, so it moves into the
    // task below and is cloned for every request handled there.
    let tracer = tracer_provider.tracer("tvix");

    // Channel over which flushes of the tracer provider are requested later on.
    let (flush_tx, mut flush_rx) = mpsc::channel::<Option<oneshot::Sender<()>>>(16);

    // The task waits for requests on the channel and flushes the tracer provider for each
    // one it receives.
    tokio::spawn(async move {
        while let Some(request) = flush_rx.recv().await {
            // Calling `force_flush` directly can block forever because of a bug within otlp,
            // especially if the tool was closed with ctrl_c, so it runs via `spawn_blocking`.
            // See
            // https://github.com/open-telemetry/opentelemetry-rust/issues/1395#issuecomment-1953280335
            let provider = tracer_provider.clone();
            let _ = tokio::task::spawn_blocking(move || provider.force_flush()).await;

            // Signal completion to the requester, if it asked for it.
            if let Some(done) = request {
                let _ = done.send(());
            }
        }
    });

    (tracer, flush_tx)
}