1use opentelemetry::{global, trace::TracerProvider as _, KeyValue};
2use opentelemetry_otlp::{
3 tonic_types::metadata::MetadataMap, Protocol, WithExportConfig, WithTonicConfig,
4};
5use opentelemetry_sdk::{
6 trace::{Sampler, SdkTracerProvider},
7 Resource,
8};
9use opentelemetry_semantic_conventions::{
10 attribute::{DEPLOYMENT_ENVIRONMENT_NAME, SERVICE_VERSION},
11 SCHEMA_URL,
12};
13use std::str::FromStr;
14use std::time::Duration;
15use tracing::Subscriber;
16use tracing_core::Level;
17use tracing_subscriber::{filter::Directive, prelude::*, EnvFilter, Registry};
18
/// Upper bound on the number of events kept per span; handed to
/// `with_max_events_per_span` when the OTLP tracer provider is built.
const MAX_EVENTS_PER_SPAN: u32 = 64 * 1024;
/// Upper bound on the number of attributes kept per span; handed to
/// `with_max_attributes_per_span` when the OTLP tracer provider is built.
const MAX_ATTRIBUTES_PER_SPAN: u32 = 128;
21
22pub fn start_logging_pipeline(
24 otlp_endpoint: &Option<String>,
25 log_filter: crate::LogLevel,
26) -> Result<(Option<SdkTracerProvider>, Box<dyn Subscriber + Send + Sync>), String> {
27 let kanidmd_core_directives = [
30 Directive::from_str("kanidmd_core::https::trace=info")
31 .map_err(|err| format!("Invalid directive during log setup: {}", err))?,
32 Directive::from_str("kanidmd_core::https::middleware=info")
33 .map_err(|err| format!("Invalid directive during log setup: {}", err))?,
34 ];
35
36 let mut logging_filter = EnvFilter::builder()
37 .with_default_directive(log_filter.into())
38 .parse("")
39 .map_err(|err| format!("Failed to create OTEL logging filter: {}", err))?;
40 for directive in kanidmd_core_directives.iter() {
41 logging_filter = logging_filter.add_directive(directive.clone());
42 }
43 logging_filter = logging_filter
44 .add_directive(
46 Directive::from_str("tonic=warn")
47 .map_err(|err| format!("Failed to set tonic logging to warn: {}", err))?,
48 )
49 .add_directive(
50 Directive::from_str("hyper=warn")
51 .map_err(|err| format!("Failed to set hyper logging to warn: {}", err))?,
52 )
53 .add_directive(
54 Directive::from_str("h2=warn")
55 .map_err(|err| format!("Failed to set h2 logging to warn: {}", err))?,
56 )
57 .add_directive(
58 Directive::from_str("h2::proto::streams::prioritize=warn").map_err(|err| {
59 format!(
60 "Failed to set h2::proto::streams::prioritize logging to warn: {}",
61 err
62 )
63 })?,
64 );
65
66 eprintln!(
67 "Logging filter initialized: {:?}",
68 logging_filter.to_string()
69 );
70
71 if let Some(endpoint) = otlp_endpoint {
73 eprintln!("Starting OTLP logging pipeline endpoint={}", endpoint);
74
75 let mut tonic_metadata = MetadataMap::new();
77
78 if let Some(headers) = std::env::var_os("OTEL_EXPORTER_OTLP_HEADERS") {
79 let headers = headers.to_string_lossy();
80 for header in headers.split(',') {
81 if let Some((key, value)) = header.split_once('=') {
82 if !key.is_empty() && key.eq_ignore_ascii_case("authorization") {
83 if let Ok(header_value) = tonic::metadata::MetadataValue::from_str(value) {
84 tonic_metadata.insert("authorization", header_value);
85 } else {
86 eprintln!(
87 "Warning: could not parse OTEL_EXPORTER_OTLP_HEADERS environment variable, skipping this: {}", header
88 );
89 };
90 }
91 }
92 }
93 }
94 let otlp_exporter = opentelemetry_otlp::SpanExporter::builder()
95 .with_tonic()
96 .with_endpoint(endpoint)
97 .with_metadata(tonic_metadata)
98 .with_protocol(Protocol::HttpBinary)
99 .with_timeout(Duration::from_secs(5))
100 .build()
101 .map_err(|err| err.to_string())?;
102
103 let git_rev = match option_env!("KANIDM_PKG_COMMIT_REV") {
105 Some(rev) => format!("-{rev}"),
106 None => "".to_string(),
107 };
108
109 let version = format!("{}{}", env!("CARGO_PKG_VERSION"), git_rev);
110 let hostname = gethostname::gethostname();
111 let hostname = hostname.to_string_lossy();
112 let hostname = hostname.to_lowercase();
113
114 let mut resource = Resource::builder().with_schema_url(
115 [
116 KeyValue::new(SERVICE_VERSION, version),
118 KeyValue::new(DEPLOYMENT_ENVIRONMENT_NAME, hostname),
119 ],
120 SCHEMA_URL,
121 );
122
123 if std::env::var("OTEL_SERVICE_NAME").is_err() {
125 resource = resource.with_service_name("kanidmd");
126 }
127 let resource = resource.build();
128
129 let provider = opentelemetry_sdk::trace::TracerProviderBuilder::default()
130 .with_batch_exporter(otlp_exporter)
131 .with_sampler(Sampler::AlwaysOn)
133 .with_max_events_per_span(MAX_EVENTS_PER_SPAN)
134 .with_max_attributes_per_span(MAX_ATTRIBUTES_PER_SPAN)
135 .with_resource(resource)
136 .build();
137
138 let provider_handle = provider.clone();
139
140 global::set_tracer_provider(provider.clone());
141 provider.tracer("tracing-otel-subscriber");
142 use tracing_opentelemetry::OpenTelemetryLayer;
143
144 let registry = tracing_subscriber::registry()
145 .with(
146 tracing_subscriber::filter::LevelFilter::from_level(Level::INFO)
147 .with_filter(logging_filter.clone()),
148 )
149 .with(
150 OpenTelemetryLayer::new(provider.tracer("tracing-otel-subscriber"))
151 .with_filter(logging_filter),
152 );
153
154 Ok((Some(provider_handle), Box::new(registry)))
155 } else {
156 let forest_layer = tracing_forest::ForestLayer::default().with_filter(logging_filter);
157 Ok((None, Box::new(Registry::default().with(forest_layer))))
158 }
159}
160
161pub struct TracingPipelineGuard(pub Option<SdkTracerProvider>);
164
165impl Drop for TracingPipelineGuard {
166 fn drop(&mut self) {
167 if let Some(provider) = self.0.take() {
168 if let Err(err) = provider.shutdown() {
169 eprintln!("Error shutting down logging pipeline: {}", err);
170 } else {
171 eprintln!("Logging pipeline completed shutdown");
172 }
173 }
174 }
175}