sketching/
pipeline.rs

1use opentelemetry::{global, trace::TracerProvider as _, KeyValue};
2use opentelemetry_otlp::{
3    tonic_types::metadata::MetadataMap, Protocol, WithExportConfig, WithTonicConfig,
4};
5use opentelemetry_sdk::{
6    trace::{Sampler, SdkTracerProvider},
7    Resource,
8};
9use opentelemetry_semantic_conventions::{
10    attribute::{DEPLOYMENT_ENVIRONMENT_NAME, SERVICE_VERSION},
11    SCHEMA_URL,
12};
13use std::str::FromStr;
14use std::time::Duration;
15use tracing::Subscriber;
16use tracing_core::Level;
17use tracing_subscriber::{filter::Directive, prelude::*, EnvFilter, Registry};
18
/// Upper bound on events kept per span (64 KiB entries); passed to the
/// tracer provider builder in `start_logging_pipeline`.
const MAX_EVENTS_PER_SPAN: u32 = 1 << 16;
/// Upper bound on attributes kept per span; passed to the tracer provider
/// builder in `start_logging_pipeline`.
const MAX_ATTRIBUTES_PER_SPAN: u32 = 128;
21
22/// This does all the startup things for the logging pipeline
23pub fn start_logging_pipeline(
24    otlp_endpoint: &Option<String>,
25    log_filter: crate::LogLevel,
26) -> Result<(Option<SdkTracerProvider>, Box<dyn Subscriber + Send + Sync>), String> {
27    // Always force the event span to be generated at the correct level, regardless
28    // of what the user set.
29    let kanidmd_core_directives = [
30        Directive::from_str("kanidmd_core::https::trace=info")
31            .map_err(|err| format!("Invalid directive during log setup: {}", err))?,
32        Directive::from_str("kanidmd_core::https::middleware=info")
33            .map_err(|err| format!("Invalid directive during log setup: {}", err))?,
34    ];
35
36    let mut logging_filter = EnvFilter::builder()
37        .with_default_directive(log_filter.into())
38        .parse("")
39        .map_err(|err| format!("Failed to create OTEL logging filter: {}", err))?;
40    for directive in kanidmd_core_directives.iter() {
41        logging_filter = logging_filter.add_directive(directive.clone());
42    }
43    logging_filter = logging_filter
44        // tell the tonic/grpc/h2 layers to trace at warn, so we can see connectivity issues
45        .add_directive(
46            Directive::from_str("tonic=warn")
47                .map_err(|err| format!("Failed to set tonic logging to warn: {}", err))?,
48        )
49        .add_directive(
50            Directive::from_str("hyper=warn")
51                .map_err(|err| format!("Failed to set hyper logging to warn: {}", err))?,
52        )
53        .add_directive(
54            Directive::from_str("h2=warn")
55                .map_err(|err| format!("Failed to set h2 logging to warn: {}", err))?,
56        )
57        .add_directive(
58            Directive::from_str("h2::proto::streams::prioritize=warn").map_err(|err| {
59                format!(
60                    "Failed to set h2::proto::streams::prioritize logging to warn: {}",
61                    err
62                )
63            })?,
64        );
65
66    eprintln!(
67        "Logging filter initialized: {:?}",
68        logging_filter.to_string()
69    );
70
71    // TODO: work out how to do metrics things
72    if let Some(endpoint) = otlp_endpoint {
73        eprintln!("Starting OTLP logging pipeline endpoint={}", endpoint);
74
75        // setup metadata so we can auth to third-party services
76        let mut tonic_metadata = MetadataMap::new();
77
78        if let Some(headers) = std::env::var_os("OTEL_EXPORTER_OTLP_HEADERS") {
79            let headers = headers.to_string_lossy();
80            for header in headers.split(',') {
81                if let Some((key, value)) = header.split_once('=') {
82                    if !key.is_empty() && key.eq_ignore_ascii_case("authorization") {
83                        if let Ok(header_value) = tonic::metadata::MetadataValue::from_str(value) {
84                            tonic_metadata.insert("authorization", header_value);
85                        } else {
86                            eprintln!(
87                            "Warning: could not parse OTEL_EXPORTER_OTLP_HEADERS environment variable, skipping this: {}", header
88                        );
89                        };
90                    }
91                }
92            }
93        }
94        let otlp_exporter = opentelemetry_otlp::SpanExporter::builder()
95            .with_tonic()
96            .with_endpoint(endpoint)
97            .with_metadata(tonic_metadata)
98            .with_protocol(Protocol::HttpBinary)
99            .with_timeout(Duration::from_secs(5))
100            .build()
101            .map_err(|err| err.to_string())?;
102
103        // this env var gets set at build time, if we can pull it, add it to the metadata
104        let git_rev = match option_env!("KANIDM_PKG_COMMIT_REV") {
105            Some(rev) => format!("-{rev}"),
106            None => "".to_string(),
107        };
108
109        let version = format!("{}{}", env!("CARGO_PKG_VERSION"), git_rev);
110        let hostname = gethostname::gethostname();
111        let hostname = hostname.to_string_lossy();
112        let hostname = hostname.to_lowercase();
113
114        let mut resource = Resource::builder().with_schema_url(
115            [
116                // TODO: it'd be really nice to be able to set the instance ID here, from the server UUID so we know *which* instance on this host is logging
117                KeyValue::new(SERVICE_VERSION, version),
118                KeyValue::new(DEPLOYMENT_ENVIRONMENT_NAME, hostname),
119            ],
120            SCHEMA_URL,
121        );
122
123        // only set the service name if it's not already set in the environment because the SDK defaults to "unknown_service"
124        if std::env::var("OTEL_SERVICE_NAME").is_err() {
125            resource = resource.with_service_name("kanidmd");
126        }
127        let resource = resource.build();
128
129        let provider = opentelemetry_sdk::trace::TracerProviderBuilder::default()
130            .with_batch_exporter(otlp_exporter)
131            // we want *everything!*
132            .with_sampler(Sampler::AlwaysOn)
133            .with_max_events_per_span(MAX_EVENTS_PER_SPAN)
134            .with_max_attributes_per_span(MAX_ATTRIBUTES_PER_SPAN)
135            .with_resource(resource)
136            .build();
137
138        let provider_handle = provider.clone();
139
140        global::set_tracer_provider(provider.clone());
141        provider.tracer("tracing-otel-subscriber");
142        use tracing_opentelemetry::OpenTelemetryLayer;
143
144        let registry = tracing_subscriber::registry()
145            .with(
146                tracing_subscriber::filter::LevelFilter::from_level(Level::INFO)
147                    .with_filter(logging_filter.clone()),
148            )
149            .with(
150                OpenTelemetryLayer::new(provider.tracer("tracing-otel-subscriber"))
151                    .with_filter(logging_filter),
152            );
153
154        Ok((Some(provider_handle), Box::new(registry)))
155    } else {
156        let forest_layer = tracing_forest::ForestLayer::default().with_filter(logging_filter);
157        Ok((None, Box::new(Registry::default().with(forest_layer))))
158    }
159}
160
161/// This helps with cleanly shutting down the tracing/logging providers when done,
162/// so we don't lose traces.
163pub struct TracingPipelineGuard(pub Option<SdkTracerProvider>);
164
165impl Drop for TracingPipelineGuard {
166    fn drop(&mut self) {
167        if let Some(provider) = self.0.take() {
168            if let Err(err) = provider.shutdown() {
169                eprintln!("Error shutting down logging pipeline: {}", err);
170            } else {
171                eprintln!("Logging pipeline completed shutdown");
172            }
173        }
174    }
175}