sketching/
otel.rs

1use std::{str::FromStr, time::Duration};
2
3use opentelemetry_otlp::{Protocol, WithExportConfig};
4
5use opentelemetry::{global, trace::TracerProvider as _, KeyValue};
6
7use opentelemetry_sdk::{
8    trace::{Sampler, SdkTracerProvider},
9    Resource,
10};
11use tracing::Subscriber;
12use tracing_core::Level;
13
14use tracing_subscriber::{filter::Directive, prelude::*, EnvFilter, Registry};
15
/// Limit on the number of events kept per span; handed to
/// `with_max_events_per_span` when the tracer provider is built.
pub const MAX_EVENTS_PER_SPAN: u32 = 64 * 1024;
/// Limit on the number of attributes kept per span; handed to
/// `with_max_attributes_per_span` when the tracer provider is built.
pub const MAX_ATTRIBUTES_PER_SPAN: u32 = 128;
18
19use opentelemetry_semantic_conventions::{
20    attribute::{DEPLOYMENT_ENVIRONMENT_NAME, SERVICE_NAME, SERVICE_VERSION},
21    SCHEMA_URL,
22};
23
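// TODO: metrics initialisation is disabled for now and is expected to come back later;
// the previous implementation is kept below for reference.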
25// #[allow(dead_code)]
26// pub fn init_metrics() -> metrics::Result<MeterProvider> {
27//     let export_config = opentelemetry_otlp::ExportConfig {
28//         endpoint: "http://localhost:4318/v1/metrics".to_string(),
29//         ..opentelemetry_otlp::ExportConfig::default()
30//     };
31//     opentelemetry_otlp::new_pipeline()
32//         .metrics(opentelemetry_sdk::runtime::Tokio)
33//         .with_exporter(
34//             opentelemetry_otlp::new_exporter()
35//                 .http()
36//                 .with_export_config(export_config),
37//         )
38//         .build()
39// }
40
41/// This does all the startup things for the logging pipeline
42pub fn start_logging_pipeline(
43    otlp_endpoint: &Option<String>,
44    log_filter: crate::LogLevel,
45    service_name: &'static str,
46) -> Result<(Option<SdkTracerProvider>, Box<dyn Subscriber + Send + Sync>), String> {
47    let forest_filter: EnvFilter = EnvFilter::builder()
48        .with_default_directive(log_filter.into())
49        .from_env_lossy();
50
51    // TODO: work out how to do metrics things
52    match otlp_endpoint {
53        Some(endpoint) => {
54            // adding these filters because when you close out the process the OTLP comms layer is NOISY
55            let forest_filter = forest_filter
56                .add_directive(
57                    Directive::from_str("tonic=info").expect("Failed to set tonic logging to info"),
58                )
59                .add_directive(
60                    Directive::from_str("h2=info").expect("Failed to set h2 logging to info"),
61                )
62                .add_directive(
63                    Directive::from_str("hyper=info").expect("Failed to set hyper logging to info"),
64                );
65            let forest_layer = tracing_forest::ForestLayer::default().with_filter(forest_filter);
66            let t_filter: EnvFilter = EnvFilter::builder()
67                .with_default_directive(log_filter.into())
68                .from_env_lossy();
69
70            let otlp_exporter = opentelemetry_otlp::SpanExporter::builder()
71                .with_tonic()
72                .with_endpoint(endpoint)
73                .with_protocol(Protocol::HttpBinary)
74                .with_timeout(Duration::from_secs(5))
75                .build()
76                .map_err(|err| err.to_string())?;
77
78            // this env var gets set at build time, if we can pull it, add it to the metadata
79            let git_rev = match option_env!("KANIDM_PKG_COMMIT_REV") {
80                Some(rev) => format!("-{rev}"),
81                None => "".to_string(),
82            };
83
84            let version = format!("{}{}", env!("CARGO_PKG_VERSION"), git_rev);
85            let hostname = gethostname::gethostname();
86            let hostname = hostname.to_string_lossy();
87            let hostname = hostname.to_lowercase();
88
89            let resource = Resource::builder()
90                .with_schema_url(
91                    [
92                        // TODO: it'd be really nice to be able to set the instance ID here, from the server UUID so we know *which* instance on this host is logging
93                        KeyValue::new(SERVICE_NAME, service_name),
94                        KeyValue::new(SERVICE_VERSION, version),
95                        KeyValue::new(DEPLOYMENT_ENVIRONMENT_NAME, hostname),
96                    ],
97                    SCHEMA_URL,
98                )
99                .build();
100
101            let provider = opentelemetry_sdk::trace::TracerProviderBuilder::default()
102                .with_batch_exporter(otlp_exporter)
103                // we want *everything!*
104                .with_sampler(Sampler::AlwaysOn)
105                .with_max_events_per_span(MAX_EVENTS_PER_SPAN)
106                .with_max_attributes_per_span(MAX_ATTRIBUTES_PER_SPAN)
107                .with_resource(resource)
108                .build();
109
110            let provider_handle = provider.clone();
111
112            global::set_tracer_provider(provider.clone());
113            provider.tracer("tracing-otel-subscriber");
114            use tracing_opentelemetry::OpenTelemetryLayer;
115
116            let registry = tracing_subscriber::registry()
117                .with(
118                    tracing_subscriber::filter::LevelFilter::from_level(Level::INFO)
119                        .with_filter(t_filter),
120                )
121                // .with(MetricsLayer::new(meter_provider.clone()))
122                .with(forest_layer)
123                .with(OpenTelemetryLayer::new(
124                    provider.tracer("tracing-otel-subscriber"),
125                ));
126
127            Ok((Some(provider_handle), Box::new(registry)))
128        }
129        None => {
130            let forest_layer = tracing_forest::ForestLayer::default().with_filter(forest_filter);
131            Ok((None, Box::new(Registry::default().with(forest_layer))))
132        }
133    }
134}
135
/// This helps with cleanly shutting down the tracing/logging providers when done,
/// so we don't lose traces.
///
/// Holds the optional [`SdkTracerProvider`]. When the guard is dropped, the provider
/// (if there is one) is taken out and shut down, and the outcome is written to stderr.
pub struct TracingPipelineGuard(pub Option<SdkTracerProvider>);
139
140impl Drop for TracingPipelineGuard {
141    fn drop(&mut self) {
142        if let Some(provider) = self.0.take() {
143            if let Err(err) = provider.shutdown() {
144                eprintln!("Error shutting down logging pipeline: {}", err);
145            } else {
146                eprintln!("Logging pipeline completed shutdown");
147            }
148        }
149    }
150}