kanidmd_core/
lib.rs

1//! These contain the server "cores". These are able to startup the server
2//! (bootstrap) to a running state and then execute tasks. This is where modules
//! are logically ordered based on their dependencies for execution. Some of these
4//! are task-only i.e. reindexing, and some of these launch the server into a
5//! fully operational state (https, ldap, etc).
6//!
7//! Generally, this is the "entry point" where the server begins to run, and
8//! the entry point for all client traffic which is then directed to the
9//! various `actors`.
10
11#![deny(warnings)]
12#![warn(unused_extern_crates)]
13#![warn(unused_imports)]
14#![deny(clippy::todo)]
15#![deny(clippy::unimplemented)]
16#![deny(clippy::unwrap_used)]
17#![deny(clippy::expect_used)]
18#![deny(clippy::panic)]
19#![deny(clippy::unreachable)]
20#![deny(clippy::await_holding_lock)]
21#![deny(clippy::needless_pass_by_value)]
22#![deny(clippy::trivially_copy_pass_by_ref)]
23#![deny(clippy::indexing_slicing)]
24
25#[macro_use]
26extern crate tracing;
27#[macro_use]
28extern crate kanidmd_lib;
29
30mod actors;
31pub mod admin;
32pub mod config;
33mod crypto;
34mod https;
35mod interval;
36mod ldaps;
37mod repl;
38mod tcp;
39mod utils;
40
41use crate::actors::{QueryServerReadV1, QueryServerWriteV1};
42use crate::admin::AdminActor;
43use crate::config::Configuration;
44use crate::interval::IntervalActor;
45use crate::utils::touch_file_or_quit;
46use compact_jwt::{JwsHs256Signer, JwsSigner};
47use crypto_glue::{
48    s256::{Sha256, Sha256Output},
49    traits::Digest,
50};
51use kanidm_proto::backup::BackupCompression;
52use kanidm_proto::config::ServerRole;
53use kanidm_proto::internal::OperationError;
54use kanidm_proto::scim_v1::client::ScimAssertGeneric;
55use kanidmd_lib::be::{Backend, BackendConfig, BackendTransaction};
56use kanidmd_lib::idm::ldap::LdapServer;
57use kanidmd_lib::prelude::*;
58use kanidmd_lib::schema::Schema;
59use kanidmd_lib::status::StatusActor;
60use kanidmd_lib::value::CredentialType;
61use regex::Regex;
62use std::collections::BTreeSet;
63use std::fmt::{Display, Formatter};
64use std::path::Path;
65use std::path::PathBuf;
66use std::sync::Arc;
67use std::sync::LazyLock;
68use tokio::sync::broadcast;
69use tokio::task;
70
71#[cfg(not(target_family = "windows"))]
72use libc::umask;
73
74// === internal setup helpers
75
76fn setup_backend(config: &Configuration, schema: &Schema) -> Result<Backend, OperationError> {
77    setup_backend_vacuum(config, schema, false)
78}
79
80fn setup_backend_vacuum(
81    config: &Configuration,
82    schema: &Schema,
83    vacuum: bool,
84) -> Result<Backend, OperationError> {
85    // Limit the scope of the schema txn.
86    // let schema_txn = task::block_on(schema.write());
87    let schema_txn = schema.write();
88    let idxmeta = schema_txn.reload_idxmeta();
89
90    let pool_size: u32 = config.threads as u32;
91
92    let cfg = BackendConfig::new(
93        config.db_path.as_deref(),
94        pool_size,
95        config.db_fs_type.unwrap_or_default(),
96        config.db_arc_size,
97    );
98
99    Backend::new(cfg, idxmeta, vacuum)
100}
101
102// TODO #54: We could move most of the be/schema/qs setup and startup
103// outside of this call, then pass in "what we need" in a cloneable
104// form, this way we could have separate Idm vs Qs threads, and dedicated
105// threads for write vs read
106async fn setup_qs_idms(
107    be: Backend,
108    schema: Schema,
109    config: &Configuration,
110) -> Result<(QueryServer, IdmServer, IdmServerDelayed, IdmServerAudit), OperationError> {
111    let curtime = duration_from_epoch_now();
112    // Create a query_server implementation
113    let query_server = QueryServer::new(be, schema, config.domain.clone(), curtime)?;
114
115    // TODO #62: Should the IDM parts be broken out to the IdmServer?
116    // What's important about this initial setup here is that it also triggers
117    // the schema and acp reload, so they are now configured correctly!
118    // Initialise the schema core.
119    //
120    // Now search for the schema itself, and validate that the system
121    // in memory matches the BE on disk, and that it's syntactically correct.
122    // Write it out if changes are needed.
123    query_server
124        .initialise_helper(curtime, DOMAIN_TGT_LEVEL)
125        .await?;
126
127    // We generate a SINGLE idms only!
128    let is_integration_test = config.integration_test_config.is_some();
129    let (idms, idms_delayed, idms_audit) = IdmServer::new(
130        query_server.clone(),
131        &config.origin,
132        is_integration_test,
133        curtime,
134    )
135    .await?;
136
137    Ok((query_server, idms, idms_delayed, idms_audit))
138}
139
140async fn setup_qs(
141    be: Backend,
142    schema: Schema,
143    config: &Configuration,
144) -> Result<QueryServer, OperationError> {
145    let curtime = duration_from_epoch_now();
146    // Create a query_server implementation
147    let query_server = QueryServer::new(be, schema, config.domain.clone(), curtime)?;
148
149    // TODO #62: Should the IDM parts be broken out to the IdmServer?
150    // What's important about this initial setup here is that it also triggers
151    // the schema and acp reload, so they are now configured correctly!
152    // Initialise the schema core.
153    //
154    // Now search for the schema itself, and validate that the system
155    // in memory matches the BE on disk, and that it's syntactically correct.
156    // Write it out if changes are needed.
157    query_server
158        .initialise_helper(curtime, DOMAIN_TGT_LEVEL)
159        .await?;
160
161    Ok(query_server)
162}
163
// Shared setup for the dbscan_* tools: build the in-memory schema and open
// the backend from $config. Exits the process if the schema cannot be
// built; on a backend failure the `return` exits the *calling function*.
macro_rules! dbscan_setup_be {
    (
        $config:expr
    ) => {{
        let schema = match Schema::new() {
            Ok(s) => s,
            Err(e) => {
                error!("Failed to setup in memory schema: {:?}", e);
                std::process::exit(1);
            }
        };

        match setup_backend($config, &schema) {
            Ok(be) => be,
            Err(e) => {
                error!("Failed to setup BE: {:?}", e);
                // NOTE: returns from the function that invoked the macro.
                return;
            }
        }
    }};
}
185
186pub fn dbscan_list_indexes_core(config: &Configuration) {
187    let be = dbscan_setup_be!(config);
188    let mut be_rotxn = match be.read() {
189        Ok(txn) => txn,
190        Err(err) => {
191            error!(?err, "Unable to proceed, backend read transaction failure.");
192            return;
193        }
194    };
195
196    match be_rotxn.list_indexes() {
197        Ok(mut idx_list) => {
198            idx_list.sort_unstable();
199            idx_list.iter().for_each(|idx_name| {
200                println!("{idx_name}");
201            })
202        }
203        Err(e) => {
204            error!("Failed to retrieve index list: {:?}", e);
205        }
206    };
207}
208
209pub fn dbscan_list_id2entry_core(config: &Configuration) {
210    let be = dbscan_setup_be!(config);
211    let mut be_rotxn = match be.read() {
212        Ok(txn) => txn,
213        Err(err) => {
214            error!(?err, "Unable to proceed, backend read transaction failure.");
215            return;
216        }
217    };
218
219    match be_rotxn.list_id2entry() {
220        Ok(mut id_list) => {
221            id_list.sort_unstable_by_key(|k| k.0);
222            id_list.iter().for_each(|(id, value)| {
223                println!("{id:>8}: {value}");
224            })
225        }
226        Err(e) => {
227            error!("Failed to retrieve id2entry list: {:?}", e);
228        }
229    };
230}
231
// Opens the backend to validate the database, but no analysis is
// implemented yet.
pub fn dbscan_list_index_analysis_core(config: &Configuration) {
    let _be = dbscan_setup_be!(config);
    // TBD in after slopes merge.
}
236
237pub fn dbscan_list_index_core(config: &Configuration, index_name: &str) {
238    let be = dbscan_setup_be!(config);
239    let mut be_rotxn = match be.read() {
240        Ok(txn) => txn,
241        Err(err) => {
242            error!(?err, "Unable to proceed, backend read transaction failure.");
243            return;
244        }
245    };
246
247    match be_rotxn.list_index_content(index_name) {
248        Ok(mut idx_list) => {
249            idx_list.sort_unstable_by(|a, b| a.0.cmp(&b.0));
250            idx_list.iter().for_each(|(key, value)| {
251                println!("{key:>50}: {value:?}");
252            })
253        }
254        Err(e) => {
255            error!("Failed to retrieve index list: {:?}", e);
256        }
257    };
258}
259
260pub fn dbscan_get_id2entry_core(config: &Configuration, id: u64) {
261    let be = dbscan_setup_be!(config);
262    let mut be_rotxn = match be.read() {
263        Ok(txn) => txn,
264        Err(err) => {
265            error!(?err, "Unable to proceed, backend read transaction failure.");
266            return;
267        }
268    };
269
270    match be_rotxn.get_id2entry(id) {
271        Ok((id, value)) => println!("{id:>8}: {value}"),
272        Err(e) => {
273            error!("Failed to retrieve id2entry value: {:?}", e);
274        }
275    };
276}
277
278pub fn dbscan_quarantine_id2entry_core(config: &Configuration, id: u64) {
279    let be = dbscan_setup_be!(config);
280    let mut be_wrtxn = match be.write() {
281        Ok(txn) => txn,
282        Err(err) => {
283            error!(
284                ?err,
285                "Unable to proceed, backend write transaction failure."
286            );
287            return;
288        }
289    };
290
291    match be_wrtxn
292        .quarantine_entry(id)
293        .and_then(|_| be_wrtxn.commit())
294    {
295        Ok(()) => {
296            println!("quarantined - {id:>8}")
297        }
298        Err(e) => {
299            error!("Failed to quarantine id2entry value: {:?}", e);
300        }
301    };
302}
303
304pub fn dbscan_list_quarantined_core(config: &Configuration) {
305    let be = dbscan_setup_be!(config);
306    let mut be_rotxn = match be.read() {
307        Ok(txn) => txn,
308        Err(err) => {
309            error!(?err, "Unable to proceed, backend read transaction failure.");
310            return;
311        }
312    };
313
314    match be_rotxn.list_quarantined() {
315        Ok(mut id_list) => {
316            id_list.sort_unstable_by_key(|k| k.0);
317            id_list.iter().for_each(|(id, value)| {
318                println!("{id:>8}: {value}");
319            })
320        }
321        Err(e) => {
322            error!("Failed to retrieve id2entry list: {:?}", e);
323        }
324    };
325}
326
327pub fn dbscan_restore_quarantined_core(config: &Configuration, id: u64) {
328    let be = dbscan_setup_be!(config);
329    let mut be_wrtxn = match be.write() {
330        Ok(txn) => txn,
331        Err(err) => {
332            error!(
333                ?err,
334                "Unable to proceed, backend write transaction failure."
335            );
336            return;
337        }
338    };
339
340    match be_wrtxn
341        .restore_quarantined(id)
342        .and_then(|_| be_wrtxn.commit())
343    {
344        Ok(()) => {
345            println!("restored - {id:>8}")
346        }
347        Err(e) => {
348            error!("Failed to restore quarantined id2entry value: {:?}", e);
349        }
350    };
351}
352
353pub fn backup_server_core(config: &Configuration, dst_path: Option<&Path>) {
354    let schema = match Schema::new() {
355        Ok(s) => s,
356        Err(e) => {
357            error!("Failed to setup in memory schema: {:?}", e);
358            std::process::exit(1);
359        }
360    };
361
362    let be = match setup_backend(config, &schema) {
363        Ok(be) => be,
364        Err(e) => {
365            error!("Failed to setup BE: {:?}", e);
366            return;
367        }
368    };
369
370    let mut be_ro_txn = match be.read() {
371        Ok(txn) => txn,
372        Err(err) => {
373            error!(?err, "Unable to proceed, backend read transaction failure.");
374            return;
375        }
376    };
377
378    let compression = match config.online_backup.as_ref() {
379        Some(backup_config) => backup_config.compression,
380        None => BackupCompression::default(),
381    };
382
383    if let Some(dst_path) = dst_path {
384        if dst_path.exists() {
385            error!(
386                "backup file {} already exists, will not overwrite it.",
387                dst_path.display()
388            );
389            return;
390        }
391
392        let output = match std::fs::File::create(dst_path) {
393            Ok(output) => output,
394            Err(err) => {
395                error!(?err, "File::create error creating {}", dst_path.display());
396                return;
397            }
398        };
399
400        match be_ro_txn.backup(output, compression) {
401            Ok(_) => info!("Backup success!"),
402            Err(e) => {
403                error!("Backup failed: {:?}", e);
404                std::process::exit(1);
405            }
406        };
407    } else {
408        // No path set, default to stdout
409        let stdout = std::io::stdout().lock();
410
411        match be_ro_txn.backup(stdout, compression) {
412            Ok(_) => info!("Backup success!"),
413            Err(e) => {
414                error!("Backup failed: {:?}", e);
415                std::process::exit(1);
416            }
417        };
418    };
419    // Let the txn abort, even on success.
420}
421
422pub async fn restore_server_core(config: &Configuration, dst_path: &Path) {
423    // If it's an in memory database, we don't need to touch anything
424    if let Some(db_path) = config.db_path.as_ref() {
425        touch_file_or_quit(db_path);
426    }
427
428    // First, we provide the in-memory schema so that core attrs are indexed correctly.
429    let schema = match Schema::new() {
430        Ok(s) => s,
431        Err(e) => {
432            error!("Failed to setup in memory schema: {:?}", e);
433            std::process::exit(1);
434        }
435    };
436
437    let be = match setup_backend(config, &schema) {
438        Ok(be) => be,
439        Err(e) => {
440            error!("Failed to setup backend: {:?}", e);
441            return;
442        }
443    };
444
445    let mut be_wr_txn = match be.write() {
446        Ok(txn) => txn,
447        Err(err) => {
448            error!(
449                ?err,
450                "Unable to proceed, backend write transaction failure."
451            );
452            return;
453        }
454    };
455
456    let compression = BackupCompression::identify_file(dst_path);
457
458    let input = match std::fs::File::open(dst_path) {
459        Ok(output) => output,
460        Err(err) => {
461            error!(?err, "File::open error reading {}", dst_path.display());
462            return;
463        }
464    };
465
466    let r = be_wr_txn
467        .restore(input, compression)
468        .and_then(|_| be_wr_txn.commit());
469
470    if r.is_err() {
471        error!("Failed to restore database: {:?}", r);
472        std::process::exit(1);
473    }
474    info!("Database loaded successfully");
475
476    reindex_inner(be, schema, config).await;
477
478    info!("✅ Restore Success!");
479}
480
481pub async fn reindex_server_core(config: &Configuration) {
482    info!("Start Index Phase 1 ...");
483    // First, we provide the in-memory schema so that core attrs are indexed correctly.
484    let schema = match Schema::new() {
485        Ok(s) => s,
486        Err(e) => {
487            error!("Failed to setup in memory schema: {:?}", e);
488            std::process::exit(1);
489        }
490    };
491
492    let be = match setup_backend(config, &schema) {
493        Ok(be) => be,
494        Err(e) => {
495            error!("Failed to setup BE: {:?}", e);
496            return;
497        }
498    };
499
500    reindex_inner(be, schema, config).await;
501
502    info!("✅ Reindex Success!");
503}
504
505async fn reindex_inner(be: Backend, schema: Schema, config: &Configuration) {
506    // Reindex only the core schema attributes to bootstrap the process.
507    let mut be_wr_txn = match be.write() {
508        Ok(txn) => txn,
509        Err(err) => {
510            error!(
511                ?err,
512                "Unable to proceed, backend write transaction failure."
513            );
514            return;
515        }
516    };
517
518    let r = be_wr_txn.reindex(true).and_then(|_| be_wr_txn.commit());
519
520    // Now that's done, setup a minimal qs and reindex from that.
521    if r.is_err() {
522        error!("Failed to reindex database: {:?}", r);
523        std::process::exit(1);
524    }
525    info!("Index Phase 1 Success!");
526
527    info!("Attempting to init query server ...");
528
529    let (qs, _idms, _idms_delayed, _idms_audit) = match setup_qs_idms(be, schema, config).await {
530        Ok(t) => t,
531        Err(e) => {
532            error!("Unable to setup query server or idm server -> {:?}", e);
533            return;
534        }
535    };
536    info!("Init Query Server Success!");
537
538    info!("Start Index Phase 2 ...");
539
540    let Ok(mut qs_write) = qs.write(duration_from_epoch_now()).await else {
541        error!("Unable to acquire write transaction");
542        return;
543    };
544    let r = qs_write.reindex(true).and_then(|_| qs_write.commit());
545
546    match r {
547        Ok(_) => info!("Index Phase 2 Success!"),
548        Err(e) => {
549            error!("Reindex failed: {:?}", e);
550            std::process::exit(1);
551        }
552    };
553}
554
555pub fn vacuum_server_core(config: &Configuration) {
556    let schema = match Schema::new() {
557        Ok(s) => s,
558        Err(e) => {
559            eprintln!("Failed to setup in memory schema: {e:?}");
560            std::process::exit(1);
561        }
562    };
563
564    // The schema doesn't matter here. Vacuum is run as part of db open to avoid
565    // locking.
566    let r = setup_backend_vacuum(config, &schema, true);
567
568    match r {
569        Ok(_) => eprintln!("Vacuum Success!"),
570        Err(e) => {
571            eprintln!("Vacuum failed: {e:?}");
572            std::process::exit(1);
573        }
574    };
575}
576
577pub async fn domain_rename_core(config: &Configuration) {
578    let schema = match Schema::new() {
579        Ok(s) => s,
580        Err(e) => {
581            eprintln!("Failed to setup in memory schema: {e:?}");
582            std::process::exit(1);
583        }
584    };
585
586    // Start the backend.
587    let be = match setup_backend(config, &schema) {
588        Ok(be) => be,
589        Err(e) => {
590            error!("Failed to setup BE: {:?}", e);
591            return;
592        }
593    };
594
595    // Setup the qs, and perform any migrations and changes we may have.
596    let qs = match setup_qs(be, schema, config).await {
597        Ok(t) => t,
598        Err(e) => {
599            error!("Unable to setup query server -> {:?}", e);
600            return;
601        }
602    };
603
604    let new_domain_name = config.domain.as_str();
605
606    // make sure we're actually changing the domain name...
607    match qs.read().await.map(|qs| qs.get_domain_name().to_string()) {
608        Ok(old_domain_name) => {
609            admin_info!(?old_domain_name, ?new_domain_name);
610            if old_domain_name == new_domain_name {
611                admin_info!("Domain name not changing, stopping.");
612                return;
613            }
614            admin_debug!(
615                "Domain name is changing from {:?} to {:?}",
616                old_domain_name,
617                new_domain_name
618            );
619        }
620        Err(e) => {
621            admin_error!("Failed to query domain name, quitting! -> {:?}", e);
622            return;
623        }
624    }
625
626    let Ok(mut qs_write) = qs.write(duration_from_epoch_now()).await else {
627        error!("Unable to acquire write transaction");
628        return;
629    };
630    let r = qs_write
631        .danger_domain_rename(new_domain_name)
632        .and_then(|_| qs_write.commit());
633
634    match r {
635        Ok(_) => info!("Domain Rename Success!"),
636        Err(e) => {
637            error!("Domain Rename Failed - Rollback has occurred: {:?}", e);
638            std::process::exit(1);
639        }
640    };
641}
642
643pub async fn verify_server_core(config: &Configuration) {
644    let curtime = duration_from_epoch_now();
645    // setup the qs - without initialise!
646    let schema_mem = match Schema::new() {
647        Ok(sc) => sc,
648        Err(e) => {
649            error!("Failed to setup in memory schema: {:?}", e);
650            return;
651        }
652    };
653    // Setup the be
654    let be = match setup_backend(config, &schema_mem) {
655        Ok(be) => be,
656        Err(e) => {
657            error!("Failed to setup BE: {:?}", e);
658            return;
659        }
660    };
661
662    let server = match QueryServer::new(be, schema_mem, config.domain.clone(), curtime) {
663        Ok(qs) => qs,
664        Err(err) => {
665            error!(?err, "Failed to setup query server");
666            return;
667        }
668    };
669
670    // Run verifications.
671    let r = server.verify().await;
672
673    if r.is_empty() {
674        eprintln!("Verification passed!");
675        std::process::exit(0);
676    } else {
677        for er in r {
678            error!("{:?}", er);
679        }
680        std::process::exit(1);
681    }
682
683    // Now add IDM server verifications?
684}
685
686pub fn cert_generate_core(config: &Configuration) {
687    // Get the cert root
688
689    let (tls_key_path, tls_chain_path) = match &config.tls_config {
690        Some(tls_config) => (tls_config.key.as_path(), tls_config.chain.as_path()),
691        None => {
692            error!("Unable to find TLS configuration");
693            std::process::exit(1);
694        }
695    };
696
697    if tls_key_path.exists() && tls_chain_path.exists() {
698        info!(
699            "TLS key and chain already exist - remove them first if you intend to regenerate these"
700        );
701        return;
702    }
703
704    let origin_domain = match config.origin.domain() {
705        Some(val) => val,
706        None => {
707            error!("origin does not contain a valid domain");
708            std::process::exit(1);
709        }
710    };
711
712    let cert_root = match tls_key_path.parent() {
713        Some(parent) => parent,
714        None => {
715            error!("Unable to find parent directory of {:?}", tls_key_path);
716            std::process::exit(1);
717        }
718    };
719
720    let ca_cert = cert_root.join("ca.pem");
721    let ca_key = cert_root.join("cakey.pem");
722    let tls_cert_path = cert_root.join("cert.pem");
723
724    let ca_handle = if !ca_cert.exists() || !ca_key.exists() {
725        // Generate the CA again.
726        let ca_handle = match crypto::build_ca(None) {
727            Ok(ca_handle) => ca_handle,
728            Err(e) => {
729                error!(err = ?e, "Failed to build CA");
730                std::process::exit(1);
731            }
732        };
733
734        if crypto::write_ca(ca_key, ca_cert, &ca_handle).is_err() {
735            error!("Failed to write CA");
736            std::process::exit(1);
737        }
738
739        ca_handle
740    } else {
741        match crypto::load_ca(ca_key, ca_cert) {
742            Ok(ca_handle) => ca_handle,
743            Err(_) => {
744                error!("Failed to load CA");
745                std::process::exit(1);
746            }
747        }
748    };
749
750    if !tls_key_path.exists() || !tls_chain_path.exists() || !tls_cert_path.exists() {
751        // Generate the cert from the ca.
752        let cert_handle = match crypto::build_cert(origin_domain, &ca_handle, None, None) {
753            Ok(cert_handle) => cert_handle,
754            Err(e) => {
755                error!(err = ?e, "Failed to build certificate");
756                std::process::exit(1);
757            }
758        };
759
760        if crypto::write_cert(tls_key_path, tls_chain_path, tls_cert_path, &cert_handle).is_err() {
761            error!("Failed to write certificates");
762            std::process::exit(1);
763        }
764    }
765    info!("certificate generation complete");
766}
767
768static MIGRATION_PATH_RE: LazyLock<Regex> = LazyLock::new(|| {
769    #[allow(clippy::expect_used)]
770    Regex::new("^\\d\\d-.*\\.h?json$").expect("Invalid SPN regex found")
771});
772
/// A single migration file staged for application.
struct ScimMigration {
    // Filesystem path the migration was loaded from (used in error reports).
    path: PathBuf,
    // SHA-256 digest of the raw file content.
    hash: Sha256Output,
    // The parsed SCIM assertions contained in the file.
    assertions: ScimAssertGeneric,
}
778
779#[instrument(
780    level = "info",
781    fields(uuid = ?eventid),
782    skip_all,
783)]
784async fn migration_apply(
785    eventid: Uuid,
786    server_write_ref: &'static QueryServerWriteV1,
787    migration_path: &Path,
788) {
789    let mut dir_ents = match tokio::fs::read_dir(migration_path).await {
790        Ok(dir_ents) => dir_ents,
791        Err(err) => {
792            error!(?err, "Unable to read migration directory.");
793            let diag = kanidm_lib_file_permissions::diagnose_path(migration_path);
794            info!(%diag);
795            return;
796        }
797    };
798
799    let mut migration_paths = Vec::with_capacity(8);
800
801    loop {
802        match dir_ents.next_entry().await {
803            Ok(Some(dir_ent)) => migration_paths.push(dir_ent.path()),
804            Ok(None) => {
805                // Complete,
806                break;
807            }
808            Err(err) => {
809                error!(?err, "Unable to read directory entries.");
810                return;
811            }
812        }
813    }
814
815    // Filter these.
816
817    let mut migration_paths: Vec<_> = migration_paths.into_iter()
818        .filter(|path| {
819            if !path.is_file() {
820                info!(path = %path.display(), "ignoring path that is not a file.");
821                return false;
822            }
823
824            let Some(file_name) = path.file_name().and_then(std::ffi::OsStr::to_str) else {
825                info!(path = %path.display(), "ignoring path that has no file name, or is not a valid utf-8 file name.");
826                return false;
827            };
828
829            if !MIGRATION_PATH_RE.is_match(file_name) {
830                info!(path = %path.display(), "ignoring file that does not match naming pattern.");
831                info!("expected pattern 'XX-NAME.json' where XX are two numbers, followed by a hypen, with the file extension .json");
832                return false;
833            }
834
835            true
836        })
837        .collect();
838
839    migration_paths.sort_unstable();
840    let mut migrations = Vec::with_capacity(migration_paths.len());
841
842    for migration_path in migration_paths {
843        info!(path = %migration_path.display(), "examining migration");
844
845        let migration_content = match tokio::fs::read(&migration_path).await {
846            Ok(bytes) => bytes,
847            Err(err) => {
848                error!(?err, "Unable to read migration - it will be ignored.");
849                let diag = kanidm_lib_file_permissions::diagnose_path(&migration_path);
850                info!(%diag);
851                continue;
852            }
853        };
854
855        // Is it valid json?
856        let assertions: ScimAssertGeneric = match serde_hjson::from_slice(&migration_content) {
857            Ok(assertions) => assertions,
858            Err(err) => {
859                error!(?err, path = %migration_path.display(), "Invalid JSON SCIM Assertion");
860                continue;
861            }
862        };
863
864        // Hash the content.
865        let mut hasher = Sha256::new();
866        hasher.update(&migration_content);
867        let migration_hash: Sha256Output = hasher.finalize();
868
869        migrations.push(ScimMigration {
870            path: migration_path,
871            hash: migration_hash,
872            assertions,
873        });
874    }
875
876    let mut migration_ids = BTreeSet::new();
877    for migration in &migrations {
878        // BTreeSet returns false on duplicate value insertion.
879        if !migration_ids.insert(migration.assertions.id) {
880            error!(path = %migration.path.display(), uuid = ?migration.assertions.id, "Duplicate migration UUID found, refusing to proceed!!! All migrations must have a unique ID!!!");
881            return;
882        }
883    }
884
885    // Okay, we're setup to go - apply them all. Note that we do these
886    // separately, each migration occurs in its own transaction.
887    for ScimMigration {
888        path,
889        hash,
890        assertions,
891    } in migrations
892    {
893        if let Err(err) = server_write_ref
894            .handle_scim_migration_apply(eventid, assertions, hash)
895            .await
896        {
897            error!(?err, path = %path.display(), "Failed to apply migration");
898        };
899    }
900}
901
/// Control messages broadcast to all running core tasks.
#[derive(Clone, Debug)]
pub enum CoreAction {
    /// Ask every task to stop so the server can exit cleanly.
    Shutdown,
    /// Ask tasks that support it to reload their state.
    Reload,
}
907
/// Names for the long-running tasks spawned by the core server, used to
/// label their join handles so shutdown can report which task failed.
pub(crate) enum TaskName {
    AdminSocket,
    AuditdActor,
    BackupActor,
    DelayedActionActor,
    HttpsServer,
    IntervalActor,
    LdapActor,
    Replication,
    TlsAcceptorReload,
    MigrationReload,
}
920
921impl Display for TaskName {
922    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
923        write!(
924            f,
925            "{}",
926            match self {
927                TaskName::AdminSocket => "Admin Socket",
928                TaskName::AuditdActor => "Auditd Actor",
929                TaskName::BackupActor => "Backup Actor",
930                TaskName::DelayedActionActor => "Delayed Action Actor",
931                TaskName::HttpsServer => "HTTPS Server",
932                TaskName::IntervalActor => "Interval Actor",
933                TaskName::LdapActor => "LDAP Acceptor Actor",
934                TaskName::Replication => "Replication",
935                TaskName::TlsAcceptorReload => "TlsAcceptor Reload Monitor",
936                TaskName::MigrationReload => "Migration Reload Monitor",
937            }
938        )
939    }
940}
941
/// Handle to a running server core: a broadcast sender used to control the
/// spawned tasks, plus their join handles.
pub struct CoreHandle {
    // Set once `shutdown()` has joined all tasks; checked in Drop to warn
    // about unclean shutdowns.
    clean_shutdown: bool,
    tx: broadcast::Sender<CoreAction>,
    /// This stores a name for the handle, and the handle itself so we can tell which failed/succeeded at the end.
    handles: Vec<(TaskName, task::JoinHandle<()>)>,
}
948
949impl CoreHandle {
950    pub fn subscribe(&mut self) -> broadcast::Receiver<CoreAction> {
951        self.tx.subscribe()
952    }
953
954    pub async fn shutdown(&mut self) {
955        if self.tx.send(CoreAction::Shutdown).is_err() {
956            eprintln!("No receivers acked shutdown request. Treating as unclean.");
957            return;
958        }
959
960        // Wait on the handles.
961        while let Some((handle_name, handle)) = self.handles.pop() {
962            if let Err(error) = handle.await {
963                eprintln!("Task {handle_name} failed to finish: {error:?}");
964            }
965        }
966
967        self.clean_shutdown = true;
968    }
969
970    pub async fn reload(&mut self) {
971        if self.tx.send(CoreAction::Reload).is_err() {
972            eprintln!("No receivers acked reload request.");
973        }
974    }
975}
976
impl Drop for CoreHandle {
    fn drop(&mut self) {
        // Warn loudly if the handle is dropped without `shutdown()` having
        // completed - spawned tasks may not have been joined.
        if !self.clean_shutdown {
            eprintln!("⚠️  UNCLEAN SHUTDOWN OCCURRED ⚠️ ");
        }
        // Can't enable yet until we clean up unix_int cache layer test
        // debug_assert!(self.clean_shutdown);
    }
}
986
/// Bootstrap the server to a running state and return a `CoreHandle` that
/// can later be used to `reload` or `shutdown` it.
///
/// In dependency order this wires up: TLS, the in-memory schema, the backend,
/// the query/idm servers, the delayed-action and audit-log tasks, the
/// migration and TLS-acceptor reload monitors, interval (timed) tasks, and
/// the optional LDAP, replication, HTTPS and admin-socket services.
///
/// When `config_test` is true the configuration is validated and servers are
/// constructed, but the HTTPS listener (and replication sockets) are not
/// started.
///
/// Returns `Err(())` after logging the cause if any stage fails.
pub async fn create_server_core(
    config: Configuration,
    config_test: bool,
) -> Result<CoreHandle, ()> {
    // Until this point, we probably want to write to the log macro fns.
    // Channel used to fan out CoreAction::{Shutdown, Reload} to every task.
    let (broadcast_tx, mut broadcast_rx) = broadcast::channel(4);

    if config.integration_test_config.is_some() {
        warn!("RUNNING IN INTEGRATION TEST MODE.");
        warn!("IF YOU SEE THIS IN PRODUCTION YOU MUST CONTACT SUPPORT IMMEDIATELY.");
    } else if config.tls_config.is_none() {
        // TLS is great! We won't run without it.
        error!("Running without TLS is not supported! Quitting!");
        return Err(());
    }

    info!(
        "Starting kanidm with {}configuration: {}",
        if config_test { "TEST " } else { "" },
        config
    );
    // Setup umask, so that everything we touch or create is secure.
    // 0o0027 removes group-write and all "other" permissions on new files.
    #[cfg(not(target_family = "windows"))]
    unsafe {
        umask(0o0027)
    };

    // Similar, create a stats task which aggregates statistics from the
    // server as they come in.
    let status_ref = StatusActor::start();

    // Setup TLS (if any)
    let maybe_tls_acceptor = match crypto::setup_tls(&config.tls_config) {
        Ok(tls_acc) => tls_acc,
        Err(err) => {
            error!(?err, "Failed to configure TLS acceptor");
            return Err(());
        }
    };

    let schema = match Schema::new() {
        Ok(s) => s,
        Err(e) => {
            error!("Failed to setup in memory schema: {:?}", e);
            return Err(());
        }
    };

    // Setup the be for the qs.
    let be = match setup_backend(&config, &schema) {
        Ok(be) => be,
        Err(e) => {
            error!("Failed to setup BE -> {:?}", e);
            return Err(());
        }
    };
    // Start the IDM server.
    let (_qs, idms, mut idms_delayed, mut idms_audit) =
        match setup_qs_idms(be, schema, &config).await {
            Ok(t) => t,
            Err(e) => {
                error!("Unable to setup query server or idm server -> {:?}", e);
                return Err(());
            }
        };

    // Extract any configuration from the IDMS that we may need.
    // For now we just do this per run, but we need to extract this from the db later.
    let jws_signer = match JwsHs256Signer::generate_hs256() {
        Ok(k) => k.set_sign_option_embed_kid(false),
        Err(e) => {
            error!("Unable to setup jws signer -> {:?}", e);
            return Err(());
        }
    };

    // Any pre-start tasks here.
    // For integration tests: recover the admin/idm_admin account passwords,
    // widen admin's group membership, and relax the credential policy, all
    // in a single write transaction committed at the end.
    if let Some(itc) = &config.integration_test_config {
        let Ok(mut idms_prox_write) = idms.proxy_write(duration_from_epoch_now()).await else {
            error!("Unable to acquire write transaction");
            return Err(());
        };
        // We need to set the admin pw.
        match idms_prox_write.recover_account(&itc.admin_user, Some(&itc.admin_password)) {
            Ok(_) => {}
            Err(e) => {
                error!(
                    "Unable to configure INTEGRATION TEST {} account -> {:?}",
                    &itc.admin_user, e
                );
                return Err(());
            }
        };
        // set the idm_admin account password
        match idms_prox_write.recover_account(&itc.idm_admin_user, Some(&itc.idm_admin_password)) {
            Ok(_) => {}
            Err(e) => {
                error!(
                    "Unable to configure INTEGRATION TEST {} account -> {:?}",
                    &itc.idm_admin_user, e
                );
                return Err(());
            }
        };

        // Add admin to idm_admins to allow tests more flexibility wrt to permissions.
        // This way our default access controls can be stricter to prevent lateral
        // movement.
        match idms_prox_write.qs_write.internal_modify_uuid(
            UUID_IDM_ADMINS,
            &ModifyList::new_append(Attribute::Member, Value::Refer(UUID_ADMIN)),
        ) {
            Ok(_) => {}
            Err(e) => {
                error!(
                    "Unable to configure INTEGRATION TEST admin as member of idm_admins -> {:?}",
                    e
                );
                return Err(());
            }
        };

        // Allow any credential type for all persons so tests don't have to
        // satisfy the stricter production minimum.
        match idms_prox_write.qs_write.internal_modify_uuid(
            UUID_IDM_ALL_PERSONS,
            &ModifyList::new_purge_and_set(
                Attribute::CredentialTypeMinimum,
                CredentialType::Any.into(),
            ),
        ) {
            Ok(_) => {}
            Err(e) => {
                error!(
                    "Unable to configure INTEGRATION TEST default credential policy -> {:?}",
                    e
                );
                return Err(());
            }
        };

        match idms_prox_write.commit() {
            Ok(_) => {}
            Err(e) => {
                error!("Unable to commit INTEGRATION TEST setup -> {:?}", e);
                return Err(());
            }
        }
    }

    let ldap = match LdapServer::new(&idms).await {
        Ok(l) => l,
        Err(e) => {
            error!("Unable to start LdapServer -> {:?}", e);
            return Err(());
        }
    };

    // Arc the idms and ldap
    let idms_arc = Arc::new(idms);
    let ldap_arc = Arc::new(ldap);

    // Pass it to the actor for threading.
    // Start the read query server with the given be path: future config
    let server_read_ref = QueryServerReadV1::start_static(idms_arc.clone(), ldap_arc.clone());

    // Create the server async write entry point.
    let server_write_ref = QueryServerWriteV1::start_static(idms_arc.clone());

    // Task that drains delayed actions in batches and forwards them to the
    // write server; exits when the channel closes or on Shutdown.
    // Note: this consumes the original `broadcast_rx` receiver.
    let delayed_handle = task::spawn(async move {
        let mut buffer = Vec::with_capacity(DELAYED_ACTION_BATCH_SIZE);
        loop {
            tokio::select! {
                added = idms_delayed.recv_many(&mut buffer) => {
                    if added == 0 {
                        // Channel has closed, stop the task.
                        break
                    }
                    server_write_ref.handle_delayedaction(&mut buffer).await;
                }
                Ok(action) = broadcast_rx.recv() => {
                    match action {
                        CoreAction::Shutdown => break,
                        CoreAction::Reload => {},
                    }
                }
            }
        }
        info!("Stopped {}", TaskName::DelayedActionActor);
    });

    // Fresh receiver for the audit task (the previous one was moved into the
    // delayed action task above). The same shadowing pattern repeats for each
    // spawned task below.
    let mut broadcast_rx = broadcast_tx.subscribe();

    // Task that serialises audit events to JSON and emits them via the log.
    let auditd_handle = task::spawn(async move {
        loop {
            tokio::select! {
                Ok(action) = broadcast_rx.recv() => {
                    match action {
                        CoreAction::Shutdown => break,
                        CoreAction::Reload => {},
                    }
                }
                audit_event = idms_audit.audit_rx().recv() => {
                    match serde_json::to_string(&audit_event) {
                        Ok(audit_event) => {
                            warn!(%audit_event);
                        }
                        Err(e) => {
                            // Fall back to the Debug representation if the
                            // event could not be serialised.
                            error!(err=?e, "Unable to process audit event to json.");
                            warn!(?audit_event, json=false);
                        }
                    }

                }
            }
        }
        info!("Stopped {}", TaskName::AuditdActor);
    });

    // Run the migrations *once*, only in production though.
    // KANIDM_SERVER_MIGRATION_PATH is baked in at compile time via env!().
    let migration_path = config
        .migration_path
        .clone()
        .unwrap_or(PathBuf::from(env!("KANIDM_SERVER_MIGRATION_PATH")));

    if config.integration_test_config.is_none() {
        let eventid = Uuid::new_v4();
        migration_apply(eventid, server_write_ref, migration_path.as_path()).await;
    }

    // Setup the Migration Reload Trigger.
    // Re-applies migrations from disk whenever a Reload action is broadcast.
    let mut broadcast_rx = broadcast_tx.subscribe();
    let migration_reload_handle = task::spawn(async move {
        loop {
            tokio::select! {
                Ok(action) = broadcast_rx.recv() => {
                    match action {
                        CoreAction::Shutdown => break,
                        CoreAction::Reload => {
                            // Read the migrations.
                            // Apply them.
                            let eventid = Uuid::new_v4();
                            migration_apply(
                                eventid,
                                server_write_ref,
                                migration_path.as_path(),
                            ).await;

                            info!("Migration reload complete");
                        },
                    }
                }
            }
        }
        info!("Stopped {}", TaskName::MigrationReload);
    });

    // Setup a TLS Acceptor Reload trigger.
    // On Reload this rebuilds the acceptor from the on-disk TLS config and
    // broadcasts the new acceptor to the LDAP/HTTPS listeners below.

    let mut broadcast_rx = broadcast_tx.subscribe();
    let tls_config = config.tls_config.clone();

    let (tls_acceptor_reload_tx, _tls_acceptor_reload_rx) = broadcast::channel(1);
    let tls_acceptor_reload_tx_c = tls_acceptor_reload_tx.clone();

    let tls_acceptor_reload_handle = task::spawn(async move {
        loop {
            tokio::select! {
                Ok(action) = broadcast_rx.recv() => {
                    match action {
                        CoreAction::Shutdown => break,
                        CoreAction::Reload => {
                            let tls_acceptor = match crypto::setup_tls(&tls_config) {
                                Ok(Some(tls_acc)) => tls_acc,
                                Ok(None) => {
                                    warn!("TLS not configured, ignoring reload request.");
                                    continue;
                                }
                                Err(err) => {
                                    error!(?err, "Failed to configure and reload TLS acceptor");
                                    continue;
                                }
                            };

                            // We don't log here as the receivers will notify when they have completed
                            // the reload.
                            if tls_acceptor_reload_tx_c.send(tls_acceptor).is_err() {
                                error!("TLS acceptor did not accept the reload, the server may have failed!");
                            };
                            info!("TLS acceptor reload notification sent");
                        },
                    }
                }
            }
        }
        info!("Stopped {}", TaskName::TlsAcceptorReload);
    });

    // Setup timed events associated to the write thread
    let interval_handle = IntervalActor::start(server_write_ref, broadcast_tx.subscribe());
    // Setup timed events associated to the read thread
    let maybe_backup_handle = match &config.online_backup {
        Some(online_backup_config) => {
            if online_backup_config.enabled {
                let handle = IntervalActor::start_online_backup(
                    server_read_ref,
                    online_backup_config,
                    broadcast_tx.subscribe(),
                )?;
                Some(handle)
            } else {
                debug!("Backups disabled");
                None
            }
        }
        None => {
            debug!("Online backup not requested, skipping");
            None
        }
    };

    // If we have been requested to init LDAP, configure it now.
    let maybe_ldap_acceptor_handles = match &config.ldapbindaddress {
        Some(la) => {
            let opt_ldap_ssl_acceptor = maybe_tls_acceptor.clone();

            let h = ldaps::create_ldap_server(
                la,
                opt_ldap_ssl_acceptor,
                server_read_ref,
                &broadcast_tx,
                &tls_acceptor_reload_tx,
                config.ldap_client_address_info.trusted_tcp_info(),
            )
            .await?;
            Some(h)
        }
        None => {
            debug!("LDAP not requested, skipping");
            None
        }
    };

    // If we have replication configured, setup the listener with its initial replication
    // map (if any).
    let (maybe_repl_handle, maybe_repl_ctrl_tx) = match &config.repl_config {
        Some(rc) => {
            if !config_test {
                // ⚠️  only start the sockets and listeners in non-config-test modes.
                let (h, repl_ctrl_tx) =
                    repl::create_repl_server(idms_arc.clone(), rc, broadcast_tx.subscribe())
                        .await?;
                (Some(h), Some(repl_ctrl_tx))
            } else {
                (None, None)
            }
        }
        None => {
            debug!("Replication not requested, skipping");
            (None, None)
        }
    };

    let maybe_http_acceptor_handles = if config_test {
        admin_info!("This config rocks! 🪨 ");
        None
    } else {
        let handles: Vec<task::JoinHandle<()>> = https::create_https_server(
            config.clone(),
            jws_signer,
            status_ref,
            server_write_ref,
            server_read_ref,
            broadcast_tx.clone(),
            maybe_tls_acceptor,
            &tls_acceptor_reload_tx,
        )
        .await
        .inspect_err(|err| {
            error!(?err, "Failed to start HTTPS server");
        })?;

        if config.role != ServerRole::WriteReplicaNoUI {
            admin_info!("ready to rock! 🪨  UI available at: {}", config.origin);
        } else {
            admin_info!("ready to rock! 🪨 ");
        }
        Some(handles)
    };

    // If we are NOT in integration test mode, start the admin socket now
    let maybe_admin_sock_handle = if config.integration_test_config.is_none() {
        let broadcast_tx_ = broadcast_tx.clone();

        let admin_handle = AdminActor::create_admin_sock(
            config.adminbindpath.as_str(),
            server_write_ref,
            server_read_ref,
            broadcast_tx_,
            maybe_repl_ctrl_tx,
        )
        .await?;

        Some(admin_handle)
    } else {
        None
    };

    // Collect every task handle (named, so shutdown can report which task
    // failed to join). Registration order matters only for shutdown-join
    // order in CoreHandle::shutdown.
    let mut handles: Vec<(TaskName, task::JoinHandle<()>)> = vec![
        (TaskName::IntervalActor, interval_handle),
        (TaskName::DelayedActionActor, delayed_handle),
        (TaskName::AuditdActor, auditd_handle),
        (TaskName::TlsAcceptorReload, tls_acceptor_reload_handle),
        (TaskName::MigrationReload, migration_reload_handle),
    ];

    if let Some(backup_handle) = maybe_backup_handle {
        handles.push((TaskName::BackupActor, backup_handle))
    }

    if let Some(admin_sock_handle) = maybe_admin_sock_handle {
        handles.push((TaskName::AdminSocket, admin_sock_handle))
    }

    if let Some(ldap_handles) = maybe_ldap_acceptor_handles {
        for ldap_handle in ldap_handles {
            handles.push((TaskName::LdapActor, ldap_handle))
        }
    }

    if let Some(http_handles) = maybe_http_acceptor_handles {
        for http_handle in http_handles {
            handles.push((TaskName::HttpsServer, http_handle))
        }
    }

    if let Some(repl_handle) = maybe_repl_handle {
        handles.push((TaskName::Replication, repl_handle))
    }

    // clean_shutdown starts false; CoreHandle::shutdown flips it to true
    // once all tasks have been joined.
    Ok(CoreHandle {
        clean_shutdown: false,
        tx: broadcast_tx,
        handles,
    })
}