kanidmd_core/
lib.rs

1//! These contain the server "cores". These are able to startup the server
2//! (bootstrap) to a running state and then execute tasks. This is where modules
3//! are logically ordered based on their dependencies for execution. Some of these
4//! are task-only i.e. reindexing, and some of these launch the server into a
5//! fully operational state (https, ldap, etc).
6//!
7//! Generally, this is the "entry point" where the server begins to run, and
8//! the entry point for all client traffic which is then directed to the
9//! various `actors`.
10
11#![deny(warnings)]
12#![warn(unused_extern_crates)]
13#![warn(unused_imports)]
14#![deny(clippy::todo)]
15#![deny(clippy::unimplemented)]
16#![deny(clippy::unwrap_used)]
17#![deny(clippy::expect_used)]
18#![deny(clippy::panic)]
19#![deny(clippy::unreachable)]
20#![deny(clippy::await_holding_lock)]
21#![deny(clippy::needless_pass_by_value)]
22#![deny(clippy::trivially_copy_pass_by_ref)]
23#![deny(clippy::indexing_slicing)]
24
25#[macro_use]
26extern crate tracing;
27#[macro_use]
28extern crate kanidmd_lib;
29
30mod actors;
31pub mod admin;
32pub mod config;
33mod crypto;
34mod https;
35mod interval;
36mod ldaps;
37mod repl;
38mod tcp;
39mod utils;
40
41use crate::actors::{QueryServerReadV1, QueryServerWriteV1};
42use crate::admin::AdminActor;
43use crate::config::Configuration;
44use crate::interval::IntervalActor;
45use crate::utils::touch_file_or_quit;
46use compact_jwt::{JwsHs256Signer, JwsSigner};
47use crypto_glue::{
48    s256::{Sha256, Sha256Output},
49    traits::Digest,
50};
51use kanidm_proto::backup::BackupCompression;
52use kanidm_proto::config::ServerRole;
53use kanidm_proto::internal::OperationError;
54use kanidm_proto::scim_v1::client::ScimAssertGeneric;
55use kanidmd_lib::be::{Backend, BackendConfig, BackendTransaction};
56use kanidmd_lib::idm::ldap::LdapServer;
57use kanidmd_lib::prelude::*;
58use kanidmd_lib::schema::Schema;
59use kanidmd_lib::status::StatusActor;
60use kanidmd_lib::value::CredentialType;
61use regex::Regex;
62use std::collections::BTreeSet;
63use std::fmt::{Display, Formatter};
64use std::path::Path;
65use std::path::PathBuf;
66use std::sync::Arc;
67use std::sync::LazyLock;
68use tokio::sync::broadcast;
69use tokio::task;
70
71#[cfg(not(target_family = "windows"))]
72use libc::umask;
73
74// === internal setup helpers
75
76fn setup_backend(config: &Configuration, schema: &Schema) -> Result<Backend, OperationError> {
77    setup_backend_vacuum(config, schema, false)
78}
79
80fn setup_backend_vacuum(
81    config: &Configuration,
82    schema: &Schema,
83    vacuum: bool,
84) -> Result<Backend, OperationError> {
85    // Limit the scope of the schema txn.
86    // let schema_txn = task::block_on(schema.write());
87    let schema_txn = schema.write();
88    let idxmeta = schema_txn.reload_idxmeta();
89
90    let pool_size: u32 = config.threads as u32;
91
92    let cfg = BackendConfig::new(
93        config.db_path.as_deref(),
94        pool_size,
95        config.db_fs_type.unwrap_or_default(),
96        config.db_arc_size,
97    );
98
99    Backend::new(cfg, idxmeta, vacuum)
100}
101
102// TODO #54: We could move most of the be/schema/qs setup and startup
103// outside of this call, then pass in "what we need" in a cloneable
104// form, this way we could have separate Idm vs Qs threads, and dedicated
105// threads for write vs read
106async fn setup_qs_idms(
107    be: Backend,
108    schema: Schema,
109    config: &Configuration,
110) -> Result<(QueryServer, IdmServer, IdmServerDelayed, IdmServerAudit), OperationError> {
111    let curtime = duration_from_epoch_now();
112    // Create a query_server implementation
113    let query_server = QueryServer::new(be, schema, config.domain.clone(), curtime)?;
114
115    // TODO #62: Should the IDM parts be broken out to the IdmServer?
116    // What's important about this initial setup here is that it also triggers
117    // the schema and acp reload, so they are now configured correctly!
118    // Initialise the schema core.
119    //
120    // Now search for the schema itself, and validate that the system
121    // in memory matches the BE on disk, and that it's syntactically correct.
122    // Write it out if changes are needed.
123    query_server
124        .initialise_helper(curtime, DOMAIN_TGT_LEVEL)
125        .await?;
126
127    // We generate a SINGLE idms only!
128    let is_integration_test = config.integration_test_config.is_some();
129    let (idms, idms_delayed, idms_audit) = IdmServer::new(
130        query_server.clone(),
131        &config.origin,
132        is_integration_test,
133        curtime,
134    )
135    .await?;
136
137    Ok((query_server, idms, idms_delayed, idms_audit))
138}
139
140async fn setup_qs(
141    be: Backend,
142    schema: Schema,
143    config: &Configuration,
144) -> Result<QueryServer, OperationError> {
145    let curtime = duration_from_epoch_now();
146    // Create a query_server implementation
147    let query_server = QueryServer::new(be, schema, config.domain.clone(), curtime)?;
148
149    // TODO #62: Should the IDM parts be broken out to the IdmServer?
150    // What's important about this initial setup here is that it also triggers
151    // the schema and acp reload, so they are now configured correctly!
152    // Initialise the schema core.
153    //
154    // Now search for the schema itself, and validate that the system
155    // in memory matches the BE on disk, and that it's syntactically correct.
156    // Write it out if changes are needed.
157    query_server
158        .initialise_helper(curtime, DOMAIN_TGT_LEVEL)
159        .await?;
160
161    Ok(query_server)
162}
163
/// Shared setup for the `dbscan_*` subcommands: build the in-memory schema and
/// open the backend. Exits the process on schema failure, and `return`s from
/// the *calling* function on backend failure (which is why this is a macro,
/// not a function).
macro_rules! dbscan_setup_be {
    (
        $config:expr
    ) => {{
        let schema = match Schema::new() {
            Ok(s) => s,
            Err(e) => {
                error!("Failed to setup in memory schema: {:?}", e);
                std::process::exit(1);
            }
        };

        match setup_backend($config, &schema) {
            Ok(be) => be,
            Err(e) => {
                error!("Failed to setup BE: {:?}", e);
                // Early-returns from the caller.
                return;
            }
        }
    }};
}
185
186pub fn dbscan_list_indexes_core(config: &Configuration) {
187    let be = dbscan_setup_be!(config);
188    let mut be_rotxn = match be.read() {
189        Ok(txn) => txn,
190        Err(err) => {
191            error!(?err, "Unable to proceed, backend read transaction failure.");
192            return;
193        }
194    };
195
196    match be_rotxn.list_indexes() {
197        Ok(mut idx_list) => {
198            idx_list.sort_unstable();
199            idx_list.iter().for_each(|idx_name| {
200                println!("{idx_name}");
201            })
202        }
203        Err(e) => {
204            error!("Failed to retrieve index list: {:?}", e);
205        }
206    };
207}
208
209pub fn dbscan_list_id2entry_core(config: &Configuration) {
210    let be = dbscan_setup_be!(config);
211    let mut be_rotxn = match be.read() {
212        Ok(txn) => txn,
213        Err(err) => {
214            error!(?err, "Unable to proceed, backend read transaction failure.");
215            return;
216        }
217    };
218
219    match be_rotxn.list_id2entry() {
220        Ok(mut id_list) => {
221            id_list.sort_unstable_by_key(|k| k.0);
222            id_list.iter().for_each(|(id, value)| {
223                println!("{id:>8}: {value}");
224            })
225        }
226        Err(e) => {
227            error!("Failed to retrieve id2entry list: {:?}", e);
228        }
229    };
230}
231
/// `dbscan list-index-analysis`: currently a stub. Opening the backend still
/// validates the database/config, but no analysis is performed yet.
pub fn dbscan_list_index_analysis_core(config: &Configuration) {
    let _be = dbscan_setup_be!(config);
    // TBD in after slopes merge.
}
236
237pub fn dbscan_list_index_core(config: &Configuration, index_name: &str) {
238    let be = dbscan_setup_be!(config);
239    let mut be_rotxn = match be.read() {
240        Ok(txn) => txn,
241        Err(err) => {
242            error!(?err, "Unable to proceed, backend read transaction failure.");
243            return;
244        }
245    };
246
247    match be_rotxn.list_index_content(index_name) {
248        Ok(mut idx_list) => {
249            idx_list.sort_unstable_by(|a, b| a.0.cmp(&b.0));
250            idx_list.iter().for_each(|(key, value)| {
251                println!("{key:>50}: {value:?}");
252            })
253        }
254        Err(e) => {
255            error!("Failed to retrieve index list: {:?}", e);
256        }
257    };
258}
259
260pub fn dbscan_get_id2entry_core(config: &Configuration, id: u64) {
261    let be = dbscan_setup_be!(config);
262    let mut be_rotxn = match be.read() {
263        Ok(txn) => txn,
264        Err(err) => {
265            error!(?err, "Unable to proceed, backend read transaction failure.");
266            return;
267        }
268    };
269
270    match be_rotxn.get_id2entry(id) {
271        Ok((id, value)) => println!("{id:>8}: {value}"),
272        Err(e) => {
273            error!("Failed to retrieve id2entry value: {:?}", e);
274        }
275    };
276}
277
278pub fn dbscan_quarantine_id2entry_core(config: &Configuration, id: u64) {
279    let be = dbscan_setup_be!(config);
280    let mut be_wrtxn = match be.write() {
281        Ok(txn) => txn,
282        Err(err) => {
283            error!(
284                ?err,
285                "Unable to proceed, backend write transaction failure."
286            );
287            return;
288        }
289    };
290
291    match be_wrtxn
292        .quarantine_entry(id)
293        .and_then(|_| be_wrtxn.commit())
294    {
295        Ok(()) => {
296            println!("quarantined - {id:>8}")
297        }
298        Err(e) => {
299            error!("Failed to quarantine id2entry value: {:?}", e);
300        }
301    };
302}
303
304pub fn dbscan_list_quarantined_core(config: &Configuration) {
305    let be = dbscan_setup_be!(config);
306    let mut be_rotxn = match be.read() {
307        Ok(txn) => txn,
308        Err(err) => {
309            error!(?err, "Unable to proceed, backend read transaction failure.");
310            return;
311        }
312    };
313
314    match be_rotxn.list_quarantined() {
315        Ok(mut id_list) => {
316            id_list.sort_unstable_by_key(|k| k.0);
317            id_list.iter().for_each(|(id, value)| {
318                println!("{id:>8}: {value}");
319            })
320        }
321        Err(e) => {
322            error!("Failed to retrieve id2entry list: {:?}", e);
323        }
324    };
325}
326
327pub fn dbscan_restore_quarantined_core(config: &Configuration, id: u64) {
328    let be = dbscan_setup_be!(config);
329    let mut be_wrtxn = match be.write() {
330        Ok(txn) => txn,
331        Err(err) => {
332            error!(
333                ?err,
334                "Unable to proceed, backend write transaction failure."
335            );
336            return;
337        }
338    };
339
340    match be_wrtxn
341        .restore_quarantined(id)
342        .and_then(|_| be_wrtxn.commit())
343    {
344        Ok(()) => {
345            println!("restored - {id:>8}")
346        }
347        Err(e) => {
348            error!("Failed to restore quarantined id2entry value: {:?}", e);
349        }
350    };
351}
352
/// Back up the database to `dst_path`, or to stdout when no path is given.
///
/// Refuses to overwrite an existing backup file. Compression is taken from
/// the online-backup configuration when present, otherwise the default.
/// A failed backup exits the process with status 1; setup failures only log
/// and return. The read transaction is never committed - backup is read-only.
pub fn backup_server_core(config: &Configuration, dst_path: Option<&Path>) {
    let schema = match Schema::new() {
        Ok(s) => s,
        Err(e) => {
            error!("Failed to setup in memory schema: {:?}", e);
            std::process::exit(1);
        }
    };

    let be = match setup_backend(config, &schema) {
        Ok(be) => be,
        Err(e) => {
            error!("Failed to setup BE: {:?}", e);
            return;
        }
    };

    // A read transaction is sufficient - backup does not mutate the database.
    let mut be_ro_txn = match be.read() {
        Ok(txn) => txn,
        Err(err) => {
            error!(?err, "Unable to proceed, backend read transaction failure.");
            return;
        }
    };

    // Prefer the compression configured for online backups, if any.
    let compression = match config.online_backup.as_ref() {
        Some(backup_config) => backup_config.compression,
        None => BackupCompression::default(),
    };

    if let Some(dst_path) = dst_path {
        if dst_path.exists() {
            error!(
                "backup file {} already exists, will not overwrite it.",
                dst_path.display()
            );
            return;
        }

        let output = match std::fs::File::create(dst_path) {
            Ok(output) => output,
            Err(err) => {
                error!(?err, "File::create error creating {}", dst_path.display());
                return;
            }
        };

        match be_ro_txn.backup(output, compression) {
            Ok(_) => info!("Backup success!"),
            Err(e) => {
                error!("Backup failed: {:?}", e);
                std::process::exit(1);
            }
        };
    } else {
        // No path set, default to stdout
        let stdout = std::io::stdout().lock();

        match be_ro_txn.backup(stdout, compression) {
            Ok(_) => info!("Backup success!"),
            Err(e) => {
                error!("Backup failed: {:?}", e);
                std::process::exit(1);
            }
        };
    };
    // Let the txn abort, even on success.
}
421
/// Restore the database from the backup file at `dst_path`, then reindex.
///
/// The backup's compression is identified from the file itself (see
/// `BackupCompression::identify_file`). A failed restore exits the process
/// with status 1; setup failures only log and return. A full reindex (via
/// [`reindex_inner`]) runs after a successful restore so the indexes match
/// the restored data.
pub async fn restore_server_core(config: &Configuration, dst_path: &Path) {
    // If it's an in memory database, we don't need to touch anything
    if let Some(db_path) = config.db_path.as_ref() {
        touch_file_or_quit(db_path);
    }

    // First, we provide the in-memory schema so that core attrs are indexed correctly.
    let schema = match Schema::new() {
        Ok(s) => s,
        Err(e) => {
            error!("Failed to setup in memory schema: {:?}", e);
            std::process::exit(1);
        }
    };

    let be = match setup_backend(config, &schema) {
        Ok(be) => be,
        Err(e) => {
            error!("Failed to setup backend: {:?}", e);
            return;
        }
    };

    let mut be_wr_txn = match be.write() {
        Ok(txn) => txn,
        Err(err) => {
            error!(
                ?err,
                "Unable to proceed, backend write transaction failure."
            );
            return;
        }
    };

    let compression = BackupCompression::identify_file(dst_path);

    let input = match std::fs::File::open(dst_path) {
        Ok(output) => output,
        Err(err) => {
            error!(?err, "File::open error reading {}", dst_path.display());
            return;
        }
    };

    // Restore then commit - both must succeed for the database to be loaded.
    let r = be_wr_txn
        .restore(input, compression)
        .and_then(|_| be_wr_txn.commit());

    if r.is_err() {
        error!("Failed to restore database: {:?}", r);
        std::process::exit(1);
    }
    info!("Database loaded successfully");

    reindex_inner(be, schema, config).await;

    info!("✅ Restore Success!");
}
480
481pub async fn reindex_server_core(config: &Configuration) {
482    info!("Start Index Phase 1 ...");
483    // First, we provide the in-memory schema so that core attrs are indexed correctly.
484    let schema = match Schema::new() {
485        Ok(s) => s,
486        Err(e) => {
487            error!("Failed to setup in memory schema: {:?}", e);
488            std::process::exit(1);
489        }
490    };
491
492    let be = match setup_backend(config, &schema) {
493        Ok(be) => be,
494        Err(e) => {
495            error!("Failed to setup BE: {:?}", e);
496            return;
497        }
498    };
499
500    reindex_inner(be, schema, config).await;
501
502    info!("✅ Reindex Success!");
503}
504
/// Shared reindex implementation used by reindex and restore.
///
/// Phase 1: reindex at the backend level using only the bootstrap schema.
/// Phase 2: bring up a query server + IDM (loading the stored schema) and
/// reindex again through it. Either phase failing exits the process with
/// status 1; setup failures only log and return.
async fn reindex_inner(be: Backend, schema: Schema, config: &Configuration) {
    // Reindex only the core schema attributes to bootstrap the process.
    let mut be_wr_txn = match be.write() {
        Ok(txn) => txn,
        Err(err) => {
            error!(
                ?err,
                "Unable to proceed, backend write transaction failure."
            );
            return;
        }
    };

    // Reindex then commit - both must succeed.
    let r = be_wr_txn.reindex(true).and_then(|_| be_wr_txn.commit());

    // Now that's done, setup a minimal qs and reindex from that.
    if r.is_err() {
        error!("Failed to reindex database: {:?}", r);
        std::process::exit(1);
    }
    info!("Index Phase 1 Success!");

    info!("Attempting to init query server ...");

    let (qs, _idms, _idms_delayed, _idms_audit) = match setup_qs_idms(be, schema, config).await {
        Ok(t) => t,
        Err(e) => {
            error!("Unable to setup query server or idm server -> {:?}", e);
            return;
        }
    };
    info!("Init Query Server Success!");

    info!("Start Index Phase 2 ...");

    let Ok(mut qs_write) = qs.write(duration_from_epoch_now()).await else {
        error!("Unable to acquire write transaction");
        return;
    };
    // Second pass with the full schema loaded.
    let r = qs_write.reindex(true).and_then(|_| qs_write.commit());

    match r {
        Ok(_) => info!("Index Phase 2 Success!"),
        Err(e) => {
            error!("Reindex failed: {:?}", e);
            std::process::exit(1);
        }
    };
}
554
555pub fn vacuum_server_core(config: &Configuration) {
556    let schema = match Schema::new() {
557        Ok(s) => s,
558        Err(e) => {
559            eprintln!("Failed to setup in memory schema: {e:?}");
560            std::process::exit(1);
561        }
562    };
563
564    // The schema doesn't matter here. Vacuum is run as part of db open to avoid
565    // locking.
566    let r = setup_backend_vacuum(config, &schema, true);
567
568    match r {
569        Ok(_) => eprintln!("Vacuum Success!"),
570        Err(e) => {
571            eprintln!("Vacuum failed: {e:?}");
572            std::process::exit(1);
573        }
574    };
575}
576
/// Rename the server's domain to the value currently in `config.domain`.
///
/// Refuses to run when the configured name matches the stored one. On
/// rename/commit failure a rollback has already occurred and the process
/// exits with status 1; setup failures only log and return.
pub async fn domain_rename_core(config: &Configuration) {
    let schema = match Schema::new() {
        Ok(s) => s,
        Err(e) => {
            eprintln!("Failed to setup in memory schema: {e:?}");
            std::process::exit(1);
        }
    };

    // Start the backend.
    let be = match setup_backend(config, &schema) {
        Ok(be) => be,
        Err(e) => {
            error!("Failed to setup BE: {:?}", e);
            return;
        }
    };

    // Setup the qs, and perform any migrations and changes we may have.
    let qs = match setup_qs(be, schema, config).await {
        Ok(t) => t,
        Err(e) => {
            error!("Unable to setup query server -> {:?}", e);
            return;
        }
    };

    let new_domain_name = config.domain.as_str();

    // make sure we're actually changing the domain name...
    match qs.read().await.map(|qs| qs.get_domain_name().to_string()) {
        Ok(old_domain_name) => {
            admin_info!(?old_domain_name, ?new_domain_name);
            if old_domain_name == new_domain_name {
                admin_info!("Domain name not changing, stopping.");
                return;
            }
            admin_debug!(
                "Domain name is changing from {:?} to {:?}",
                old_domain_name,
                new_domain_name
            );
        }
        Err(e) => {
            admin_error!("Failed to query domain name, quitting! -> {:?}", e);
            return;
        }
    }

    let Ok(mut qs_write) = qs.write(duration_from_epoch_now()).await else {
        error!("Unable to acquire write transaction");
        return;
    };
    // Rename then commit - both must succeed.
    let r = qs_write
        .danger_domain_rename(new_domain_name)
        .and_then(|_| qs_write.commit());

    match r {
        Ok(_) => info!("Domain Rename Success!"),
        Err(e) => {
            error!("Domain Rename Failed - Rollback has occurred: {:?}", e);
            std::process::exit(1);
        }
    };
}
642
643pub async fn verify_server_core(config: &Configuration) {
644    let curtime = duration_from_epoch_now();
645    // setup the qs - without initialise!
646    let schema_mem = match Schema::new() {
647        Ok(sc) => sc,
648        Err(e) => {
649            error!("Failed to setup in memory schema: {:?}", e);
650            return;
651        }
652    };
653    // Setup the be
654    let be = match setup_backend(config, &schema_mem) {
655        Ok(be) => be,
656        Err(e) => {
657            error!("Failed to setup BE: {:?}", e);
658            return;
659        }
660    };
661
662    let server = match QueryServer::new(be, schema_mem, config.domain.clone(), curtime) {
663        Ok(qs) => qs,
664        Err(err) => {
665            error!(?err, "Failed to setup query server");
666            return;
667        }
668    };
669
670    // Run verifications.
671    let r = server.verify().await;
672
673    if r.is_empty() {
674        eprintln!("Verification passed!");
675        std::process::exit(0);
676    } else {
677        for er in r {
678            error!("{:?}", er);
679        }
680        std::process::exit(1);
681    }
682
683    // Now add IDM server verifications?
684}
685
/// Generate a self-signed CA and TLS certificate chain for the server.
///
/// Paths come from the TLS configuration; if both the key and chain already
/// exist, nothing is done. The CA (ca.pem/cakey.pem) is re-used if present in
/// the same directory as the TLS key, otherwise it is created first. A server
/// certificate for the origin's domain is then issued from the CA. Any
/// failure exits the process with status 1.
pub fn cert_generate_core(config: &Configuration) {
    // Get the cert root

    let (tls_key_path, tls_chain_path) = match &config.tls_config {
        Some(tls_config) => (tls_config.key.as_path(), tls_config.chain.as_path()),
        None => {
            error!("Unable to find TLS configuration");
            std::process::exit(1);
        }
    };

    if tls_key_path.exists() && tls_chain_path.exists() {
        info!(
            "TLS key and chain already exist - remove them first if you intend to regenerate these"
        );
        return;
    }

    // The certificate's subject is derived from the origin URL's domain.
    let origin_domain = match config.origin.domain() {
        Some(val) => val,
        None => {
            error!("origin does not contain a valid domain");
            std::process::exit(1);
        }
    };

    // All generated material lives next to the configured TLS key.
    let cert_root = match tls_key_path.parent() {
        Some(parent) => parent,
        None => {
            error!("Unable to find parent directory of {:?}", tls_key_path);
            std::process::exit(1);
        }
    };

    let ca_cert = cert_root.join("ca.pem");
    let ca_key = cert_root.join("cakey.pem");
    let tls_cert_path = cert_root.join("cert.pem");

    // Re-use the existing CA if both its files are present, else (re)create it.
    let ca_handle = if !ca_cert.exists() || !ca_key.exists() {
        // Generate the CA again.
        let ca_handle = match crypto::build_ca(None) {
            Ok(ca_handle) => ca_handle,
            Err(e) => {
                error!(err = ?e, "Failed to build CA");
                std::process::exit(1);
            }
        };

        if crypto::write_ca(ca_key, ca_cert, &ca_handle).is_err() {
            error!("Failed to write CA");
            std::process::exit(1);
        }

        ca_handle
    } else {
        match crypto::load_ca(ca_key, ca_cert) {
            Ok(ca_handle) => ca_handle,
            Err(_) => {
                error!("Failed to load CA");
                std::process::exit(1);
            }
        }
    };

    // Issue the server certificate if any part of the chain is missing.
    if !tls_key_path.exists() || !tls_chain_path.exists() || !tls_cert_path.exists() {
        // Generate the cert from the ca.
        let cert_handle = match crypto::build_cert(origin_domain, &ca_handle, None, None) {
            Ok(cert_handle) => cert_handle,
            Err(e) => {
                error!(err = ?e, "Failed to build certificate");
                std::process::exit(1);
            }
        };

        if crypto::write_cert(tls_key_path, tls_chain_path, tls_cert_path, &cert_handle).is_err() {
            error!("Failed to write certificates");
            std::process::exit(1);
        }
    }
    info!("certificate generation complete");
}
767
768static MIGRATION_PATH_RE: LazyLock<Regex> = LazyLock::new(|| {
769    #[allow(clippy::expect_used)]
770    Regex::new("^\\d\\d-.*\\.h?json$").expect("Invalid SPN regex found")
771});
772
/// A migration file loaded from disk, ready to be applied.
struct ScimMigration {
    // Where the migration was read from - used for error reporting.
    path: PathBuf,
    // SHA-256 of the raw file bytes; passed along when the migration is applied.
    hash: Sha256Output,
    // The parsed SCIM assertion set from the file.
    assertions: ScimAssertGeneric,
}
778
/// Load, validate and apply every SCIM migration file in `migration_path`.
///
/// Files must be regular files named `NN-name.json` (or `.hjson`) where `NN`
/// is a two digit ordering prefix; anything else is skipped with a log line.
/// Surviving files are sorted lexically (so the prefix sets apply order),
/// parsed as generic SCIM assertions and hashed (SHA-256 of the raw bytes).
/// Duplicate assertion IDs abort the whole run. Each migration is then
/// applied separately through `server_write_ref`; a failure is logged but
/// does not stop the remaining migrations.
#[instrument(
    level = "info",
    fields(uuid = ?eventid),
    skip_all,
)]
async fn migration_apply(
    eventid: Uuid,
    server_write_ref: &'static QueryServerWriteV1,
    migration_path: &Path,
) {
    // A missing directory is not an error - there is simply nothing to apply.
    if !migration_path.exists() {
        info!(migration_path = %migration_path.display(), "Migration path does not exist - migrations will be skipped.");
        return;
    }

    let mut dir_ents = match tokio::fs::read_dir(migration_path).await {
        Ok(dir_ents) => dir_ents,
        Err(err) => {
            error!(?err, "Unable to read migration directory.");
            // Emit a permissions diagnosis to help the admin fix the path.
            let diag = kanidm_lib_file_permissions::diagnose_path(migration_path);
            info!(%diag);
            return;
        }
    };

    let mut migration_paths = Vec::with_capacity(8);

    // Collect every entry first, so we can filter and sort as a whole below.
    loop {
        match dir_ents.next_entry().await {
            Ok(Some(dir_ent)) => migration_paths.push(dir_ent.path()),
            Ok(None) => {
                // Complete,
                break;
            }
            Err(err) => {
                error!(?err, "Unable to read directory entries.");
                return;
            }
        }
    }

    // Filter these - keep only regular files whose utf-8 name matches the
    // migration naming pattern.

    let mut migration_paths: Vec<_> = migration_paths.into_iter()
        .filter(|path| {
            if !path.is_file() {
                info!(path = %path.display(), "ignoring path that is not a file.");
                return false;
            }

            let Some(file_name) = path.file_name().and_then(std::ffi::OsStr::to_str) else {
                info!(path = %path.display(), "ignoring path that has no file name, or is not a valid utf-8 file name.");
                return false;
            };

            if !MIGRATION_PATH_RE.is_match(file_name) {
                info!(path = %path.display(), "ignoring file that does not match naming pattern.");
                info!("expected pattern 'XX-NAME.json' where XX are two numbers, followed by a hypen, with the file extension .json");
                return false;
            }

            true
        })
        .collect();

    // Lexical sort - the two digit prefix defines the application order.
    migration_paths.sort_unstable();
    let mut migrations = Vec::with_capacity(migration_paths.len());

    for migration_path in migration_paths {
        info!(path = %migration_path.display(), "examining migration");

        let migration_content = match tokio::fs::read(&migration_path).await {
            Ok(bytes) => bytes,
            Err(err) => {
                error!(?err, "Unable to read migration - it will be ignored.");
                let diag = kanidm_lib_file_permissions::diagnose_path(&migration_path);
                info!(%diag);
                continue;
            }
        };

        // Is it valid json? (hjson is accepted, hence serde_hjson.)
        let assertions: ScimAssertGeneric = match serde_hjson::from_slice(&migration_content) {
            Ok(assertions) => assertions,
            Err(err) => {
                error!(?err, path = %migration_path.display(), "Invalid JSON SCIM Assertion");
                continue;
            }
        };

        // Hash the raw content so the applied migration can be identified later.
        let mut hasher = Sha256::new();
        hasher.update(&migration_content);
        let migration_hash: Sha256Output = hasher.finalize();

        migrations.push(ScimMigration {
            path: migration_path,
            hash: migration_hash,
            assertions,
        });
    }

    // Refuse to proceed if any two migrations share an assertion ID.
    let mut migration_ids = BTreeSet::new();
    for migration in &migrations {
        // BTreeSet returns false on duplicate value insertion.
        if !migration_ids.insert(migration.assertions.id) {
            error!(path = %migration.path.display(), uuid = ?migration.assertions.id, "Duplicate migration UUID found, refusing to proceed!!! All migrations must have a unique ID!!!");
            return;
        }
    }

    // Okay, we're setup to go - apply them all. Note that we do these
    // separately, each migration occurs in its own transaction.
    for ScimMigration {
        path,
        hash,
        assertions,
    } in migrations
    {
        if let Err(err) = server_write_ref
            .handle_scim_migration_apply(eventid, assertions, hash)
            .await
        {
            error!(?err, path = %path.display(), "Failed to apply migration");
        };
    }
}
906
/// Lifecycle events broadcast from the core handle to all long-running tasks.
#[derive(Clone, Debug)]
pub enum CoreAction {
    /// Stop all tasks so the server can exit.
    Shutdown,
    /// Ask tasks that support it to re-load their state.
    Reload,
}
912
/// Names of the long-running tasks spawned by the core. Stored alongside the
/// join handles so shutdown can report which task failed or succeeded.
pub(crate) enum TaskName {
    AdminSocket,
    AuditdActor,
    BackupActor,
    DelayedActionActor,
    HttpsServer,
    IntervalActor,
    LdapActor,
    Replication,
    TlsAcceptorReload,
    MigrationReload,
}
925
926impl Display for TaskName {
927    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
928        write!(
929            f,
930            "{}",
931            match self {
932                TaskName::AdminSocket => "Admin Socket",
933                TaskName::AuditdActor => "Auditd Actor",
934                TaskName::BackupActor => "Backup Actor",
935                TaskName::DelayedActionActor => "Delayed Action Actor",
936                TaskName::HttpsServer => "HTTPS Server",
937                TaskName::IntervalActor => "Interval Actor",
938                TaskName::LdapActor => "LDAP Acceptor Actor",
939                TaskName::Replication => "Replication",
940                TaskName::TlsAcceptorReload => "TlsAcceptor Reload Monitor",
941                TaskName::MigrationReload => "Migration Reload Monitor",
942            }
943        )
944    }
945}
946
/// Handle to a running server core: the lifecycle broadcast channel plus the
/// join handles of every spawned task.
pub struct CoreHandle {
    /// Set once `shutdown()` has joined every task successfully.
    clean_shutdown: bool,
    /// Broadcast sender used to signal [`CoreAction`]s to subscribed tasks.
    tx: broadcast::Sender<CoreAction>,
    /// This stores a name for the handle, and the handle itself so we can tell which failed/succeeded at the end.
    handles: Vec<(TaskName, task::JoinHandle<()>)>,
}
953
954impl CoreHandle {
955    pub fn subscribe(&mut self) -> broadcast::Receiver<CoreAction> {
956        self.tx.subscribe()
957    }
958
959    pub async fn shutdown(&mut self) {
960        if self.tx.send(CoreAction::Shutdown).is_err() {
961            eprintln!("No receivers acked shutdown request. Treating as unclean.");
962            return;
963        }
964
965        // Wait on the handles.
966        while let Some((handle_name, handle)) = self.handles.pop() {
967            debug!("Waiting for {handle_name} ...");
968            if let Err(error) = handle.await {
969                eprintln!("Task {handle_name} failed to finish: {error:?}");
970            }
971        }
972
973        self.clean_shutdown = true;
974    }
975
976    pub async fn reload(&mut self) {
977        if self.tx.send(CoreAction::Reload).is_err() {
978            eprintln!("No receivers acked reload request.");
979        }
980    }
981}
982
impl Drop for CoreHandle {
    /// Warn loudly if the handle is dropped without a completed `shutdown()`.
    fn drop(&mut self) {
        if !self.clean_shutdown {
            eprintln!("⚠️  UNCLEAN SHUTDOWN OCCURRED ⚠️ ");
        }
        // Can't enable yet until we clean up unix_int cache layer test
        // debug_assert!(self.clean_shutdown);
    }
}
992
993pub async fn create_server_core(
994    config: Configuration,
995    config_test: bool,
996) -> Result<CoreHandle, ()> {
997    // Until this point, we probably want to write to the log macro fns.
998    let (broadcast_tx, mut broadcast_rx) = broadcast::channel(4);
999
1000    if config.integration_test_config.is_some() {
1001        warn!("RUNNING IN INTEGRATION TEST MODE.");
1002        warn!("IF YOU SEE THIS IN PRODUCTION YOU MUST CONTACT SUPPORT IMMEDIATELY.");
1003    } else if config.tls_config.is_none() {
1004        // TLS is great! We won't run without it.
1005        error!("Running without TLS is not supported! Quitting!");
1006        return Err(());
1007    }
1008
1009    info!(
1010        "Starting kanidm with {}configuration: {}",
1011        if config_test { "TEST " } else { "" },
1012        config
1013    );
1014    // Setup umask, so that every we touch or create is secure.
1015    #[cfg(not(target_family = "windows"))]
1016    unsafe {
1017        umask(0o0027)
1018    };
1019
1020    // Similar, create a stats task which aggregates statistics from the
1021    // server as they come in.
1022    let status_ref = StatusActor::start();
1023
1024    // Setup TLS (if any)
1025    let maybe_tls_acceptor = match crypto::setup_tls(&config.tls_config) {
1026        Ok(tls_acc) => tls_acc,
1027        Err(err) => {
1028            error!(?err, "Failed to configure TLS acceptor");
1029            return Err(());
1030        }
1031    };
1032
1033    let schema = match Schema::new() {
1034        Ok(s) => s,
1035        Err(e) => {
1036            error!("Failed to setup in memory schema: {:?}", e);
1037            return Err(());
1038        }
1039    };
1040
1041    // Setup the be for the qs.
1042    let be = match setup_backend(&config, &schema) {
1043        Ok(be) => be,
1044        Err(e) => {
1045            error!("Failed to setup BE -> {:?}", e);
1046            return Err(());
1047        }
1048    };
1049    // Start the IDM server.
1050    let (_qs, idms, mut idms_delayed, mut idms_audit) =
1051        match setup_qs_idms(be, schema, &config).await {
1052            Ok(t) => t,
1053            Err(e) => {
1054                error!("Unable to setup query server or idm server -> {:?}", e);
1055                return Err(());
1056            }
1057        };
1058
1059    // Extract any configuration from the IDMS that we may need.
1060    // For now we just do this per run, but we need to extract this from the db later.
1061    let jws_signer = match JwsHs256Signer::generate_hs256() {
1062        Ok(k) => k.set_sign_option_embed_kid(false),
1063        Err(e) => {
1064            error!("Unable to setup jws signer -> {:?}", e);
1065            return Err(());
1066        }
1067    };
1068
1069    // Any pre-start tasks here.
1070    if let Some(itc) = &config.integration_test_config {
1071        let Ok(mut idms_prox_write) = idms.proxy_write(duration_from_epoch_now()).await else {
1072            error!("Unable to acquire write transaction");
1073            return Err(());
1074        };
1075        // We need to set the admin pw.
1076        match idms_prox_write.recover_account(&itc.admin_user, Some(&itc.admin_password)) {
1077            Ok(_) => {}
1078            Err(e) => {
1079                error!(
1080                    "Unable to configure INTEGRATION TEST {} account -> {:?}",
1081                    &itc.admin_user, e
1082                );
1083                return Err(());
1084            }
1085        };
1086        // set the idm_admin account password
1087        match idms_prox_write.recover_account(&itc.idm_admin_user, Some(&itc.idm_admin_password)) {
1088            Ok(_) => {}
1089            Err(e) => {
1090                error!(
1091                    "Unable to configure INTEGRATION TEST {} account -> {:?}",
1092                    &itc.idm_admin_user, e
1093                );
1094                return Err(());
1095            }
1096        };
1097
1098        // Add admin to idm_admins to allow tests more flexibility wrt to permissions.
1099        // This way our default access controls can be stricter to prevent lateral
1100        // movement.
1101        match idms_prox_write.qs_write.internal_modify_uuid(
1102            UUID_IDM_ADMINS,
1103            &ModifyList::new_append(Attribute::Member, Value::Refer(UUID_ADMIN)),
1104        ) {
1105            Ok(_) => {}
1106            Err(e) => {
1107                error!(
1108                    "Unable to configure INTEGRATION TEST admin as member of idm_admins -> {:?}",
1109                    e
1110                );
1111                return Err(());
1112            }
1113        };
1114
1115        match idms_prox_write.qs_write.internal_modify_uuid(
1116            UUID_IDM_ALL_PERSONS,
1117            &ModifyList::new_purge_and_set(
1118                Attribute::CredentialTypeMinimum,
1119                CredentialType::Any.into(),
1120            ),
1121        ) {
1122            Ok(_) => {}
1123            Err(e) => {
1124                error!(
1125                    "Unable to configure INTEGRATION TEST default credential policy -> {:?}",
1126                    e
1127                );
1128                return Err(());
1129            }
1130        };
1131
1132        match idms_prox_write.commit() {
1133            Ok(_) => {}
1134            Err(e) => {
1135                error!("Unable to commit INTEGRATION TEST setup -> {:?}", e);
1136                return Err(());
1137            }
1138        }
1139    }
1140
1141    let ldap = match LdapServer::new(&idms).await {
1142        Ok(l) => l,
1143        Err(e) => {
1144            error!("Unable to start LdapServer -> {:?}", e);
1145            return Err(());
1146        }
1147    };
1148
1149    // Arc the idms and ldap
1150    let idms_arc = Arc::new(idms);
1151    let ldap_arc = Arc::new(ldap);
1152
1153    // Pass it to the actor for threading.
1154    // Start the read query server with the given be path: future config
1155    let server_read_ref = QueryServerReadV1::start_static(idms_arc.clone(), ldap_arc.clone());
1156
1157    // Create the server async write entry point.
1158    let server_write_ref = QueryServerWriteV1::start_static(idms_arc.clone());
1159
1160    let delayed_handle = task::spawn(async move {
1161        let mut buffer = Vec::with_capacity(DELAYED_ACTION_BATCH_SIZE);
1162        loop {
1163            tokio::select! {
1164                added = idms_delayed.recv_many(&mut buffer) => {
1165                    if added == 0 {
1166                        // Channel has closed, stop the task.
1167                        break
1168                    }
1169                    server_write_ref.handle_delayedaction(&mut buffer).await;
1170                }
1171                Ok(action) = broadcast_rx.recv() => {
1172                    match action {
1173                        CoreAction::Shutdown => break,
1174                        CoreAction::Reload => {},
1175                    }
1176                }
1177            }
1178        }
1179        info!("Stopped {}", TaskName::DelayedActionActor);
1180    });
1181
1182    let mut broadcast_rx = broadcast_tx.subscribe();
1183
1184    let auditd_handle = task::spawn(async move {
1185        loop {
1186            tokio::select! {
1187                Ok(action) = broadcast_rx.recv() => {
1188                    match action {
1189                        CoreAction::Shutdown => break,
1190                        CoreAction::Reload => {},
1191                    }
1192                }
1193                audit_event = idms_audit.audit_rx().recv() => {
1194                    match serde_json::to_string(&audit_event) {
1195                        Ok(audit_event) => {
1196                            warn!(%audit_event);
1197                        }
1198                        Err(e) => {
1199                            error!(err=?e, "Unable to process audit event to json.");
1200                            warn!(?audit_event, json=false);
1201                        }
1202                    }
1203
1204                }
1205            }
1206        }
1207        info!("Stopped {}", TaskName::AuditdActor);
1208    });
1209
1210    // Run the migrations *once*, only in production though.
1211    let migration_path = config
1212        .migration_path
1213        .clone()
1214        .unwrap_or(PathBuf::from(env!("KANIDM_SERVER_MIGRATION_PATH")));
1215
1216    if config.integration_test_config.is_none() {
1217        let eventid = Uuid::new_v4();
1218        migration_apply(eventid, server_write_ref, migration_path.as_path()).await;
1219    }
1220
1221    // Setup the Migration Reload Trigger.
1222    let mut broadcast_rx = broadcast_tx.subscribe();
1223    let migration_reload_handle = task::spawn(async move {
1224        loop {
1225            tokio::select! {
1226                Ok(action) = broadcast_rx.recv() => {
1227                    match action {
1228                        CoreAction::Shutdown => break,
1229                        CoreAction::Reload => {
1230                            // Read the migrations.
1231                            // Apply them.
1232                            let eventid = Uuid::new_v4();
1233                            migration_apply(
1234                                eventid,
1235                                server_write_ref,
1236                                migration_path.as_path(),
1237                            ).await;
1238
1239                            info!("Migration reload complete");
1240                        },
1241                    }
1242                }
1243            }
1244        }
1245        info!("Stopped {}", TaskName::MigrationReload);
1246    });
1247
1248    // Setup a TLS Acceptor Reload trigger.
1249
1250    let mut broadcast_rx = broadcast_tx.subscribe();
1251    let tls_config = config.tls_config.clone();
1252
1253    let (tls_acceptor_reload_tx, _tls_acceptor_reload_rx) = broadcast::channel(1);
1254    let tls_acceptor_reload_tx_c = tls_acceptor_reload_tx.clone();
1255
1256    let tls_acceptor_reload_handle = task::spawn(async move {
1257        loop {
1258            tokio::select! {
1259                Ok(action) = broadcast_rx.recv() => {
1260                    match action {
1261                        CoreAction::Shutdown => break,
1262                        CoreAction::Reload => {
1263                            let tls_acceptor = match crypto::setup_tls(&tls_config) {
1264                                Ok(Some(tls_acc)) => tls_acc,
1265                                Ok(None) => {
1266                                    warn!("TLS not configured, ignoring reload request.");
1267                                    continue;
1268                                }
1269                                Err(err) => {
1270                                    error!(?err, "Failed to configure and reload TLS acceptor");
1271                                    continue;
1272                                }
1273                            };
1274
1275                            // We don't log here as the receivers will notify when they have completed
1276                            // the reload.
1277                            if tls_acceptor_reload_tx_c.send(tls_acceptor).is_err() {
1278                                error!("TLS acceptor did not accept the reload, the server may have failed!");
1279                            };
1280                            info!("TLS acceptor reload notification sent");
1281                        },
1282                    }
1283                }
1284            }
1285        }
1286        info!("Stopped {}", TaskName::TlsAcceptorReload);
1287    });
1288
1289    // Setup timed events associated to the write thread
1290    let interval_handle = IntervalActor::start(server_write_ref, broadcast_tx.subscribe());
1291    // Setup timed events associated to the read thread
1292    let maybe_backup_handle = match &config.online_backup {
1293        Some(online_backup_config) => {
1294            if online_backup_config.enabled {
1295                let handle = IntervalActor::start_online_backup(
1296                    server_read_ref,
1297                    online_backup_config,
1298                    broadcast_tx.subscribe(),
1299                )?;
1300                Some(handle)
1301            } else {
1302                debug!("Backups disabled");
1303                None
1304            }
1305        }
1306        None => {
1307            debug!("Online backup not requested, skipping");
1308            None
1309        }
1310    };
1311
1312    // If we have been requested to init LDAP, configure it now.
1313    let maybe_ldap_acceptor_handles = match &config.ldapbindaddress {
1314        Some(la) => {
1315            let opt_ldap_ssl_acceptor = maybe_tls_acceptor.clone();
1316
1317            let h = ldaps::create_ldap_server(
1318                la,
1319                opt_ldap_ssl_acceptor,
1320                server_read_ref,
1321                &broadcast_tx,
1322                &tls_acceptor_reload_tx,
1323                config.ldap_client_address_info.trusted_tcp_info(),
1324            )
1325            .await?;
1326            Some(h)
1327        }
1328        None => {
1329            debug!("LDAP not requested, skipping");
1330            None
1331        }
1332    };
1333
1334    // If we have replication configured, setup the listener with its initial replication
1335    // map (if any).
1336    let (maybe_repl_handle, maybe_repl_ctrl_tx) = match &config.repl_config {
1337        Some(rc) => {
1338            if !config_test {
1339                // ⚠️  only start the sockets and listeners in non-config-test modes.
1340                let (h, repl_ctrl_tx) =
1341                    repl::create_repl_server(idms_arc.clone(), rc, broadcast_tx.subscribe())
1342                        .await?;
1343                (Some(h), Some(repl_ctrl_tx))
1344            } else {
1345                (None, None)
1346            }
1347        }
1348        None => {
1349            debug!("Replication not requested, skipping");
1350            (None, None)
1351        }
1352    };
1353
1354    let maybe_http_acceptor_handles = if config_test {
1355        admin_info!("This config rocks! 🪨 ");
1356        None
1357    } else {
1358        let handles: Vec<task::JoinHandle<()>> = https::create_https_server(
1359            config.clone(),
1360            jws_signer,
1361            status_ref,
1362            server_write_ref,
1363            server_read_ref,
1364            broadcast_tx.clone(),
1365            maybe_tls_acceptor,
1366            &tls_acceptor_reload_tx,
1367        )
1368        .await
1369        .inspect_err(|err| {
1370            error!(?err, "Failed to start HTTPS server");
1371        })?;
1372
1373        if config.role != ServerRole::WriteReplicaNoUI {
1374            admin_info!("ready to rock! 🪨  UI available at: {}", config.origin);
1375        } else {
1376            admin_info!("ready to rock! 🪨 ");
1377        }
1378        Some(handles)
1379    };
1380
1381    // If we are NOT in integration test mode, start the admin socket now
1382    let maybe_admin_sock_handle = if config.integration_test_config.is_none() {
1383        let broadcast_tx_ = broadcast_tx.clone();
1384
1385        let admin_handle = AdminActor::create_admin_sock(
1386            config.adminbindpath.as_str(),
1387            server_write_ref,
1388            server_read_ref,
1389            broadcast_tx_,
1390            maybe_repl_ctrl_tx,
1391        )
1392        .await?;
1393
1394        Some(admin_handle)
1395    } else {
1396        None
1397    };
1398
1399    let mut handles: Vec<(TaskName, task::JoinHandle<()>)> = vec![
1400        (TaskName::IntervalActor, interval_handle),
1401        (TaskName::DelayedActionActor, delayed_handle),
1402        (TaskName::AuditdActor, auditd_handle),
1403        (TaskName::TlsAcceptorReload, tls_acceptor_reload_handle),
1404        (TaskName::MigrationReload, migration_reload_handle),
1405    ];
1406
1407    if let Some(backup_handle) = maybe_backup_handle {
1408        handles.push((TaskName::BackupActor, backup_handle))
1409    }
1410
1411    if let Some(admin_sock_handle) = maybe_admin_sock_handle {
1412        handles.push((TaskName::AdminSocket, admin_sock_handle))
1413    }
1414
1415    if let Some(ldap_handles) = maybe_ldap_acceptor_handles {
1416        for ldap_handle in ldap_handles {
1417            handles.push((TaskName::LdapActor, ldap_handle))
1418        }
1419    }
1420
1421    if let Some(http_handles) = maybe_http_acceptor_handles {
1422        for http_handle in http_handles {
1423            handles.push((TaskName::HttpsServer, http_handle))
1424        }
1425    }
1426
1427    if let Some(repl_handle) = maybe_repl_handle {
1428        handles.push((TaskName::Replication, repl_handle))
1429    }
1430
1431    Ok(CoreHandle {
1432        clean_shutdown: false,
1433        tx: broadcast_tx,
1434        handles,
1435    })
1436}