// kanidmd_core/src/lib.rs

1//! These contain the server "cores". These are able to startup the server
2//! (bootstrap) to a running state and then execute tasks. This is where modules
//! are logically ordered based on their dependencies for execution. Some of these
4//! are task-only i.e. reindexing, and some of these launch the server into a
5//! fully operational state (https, ldap, etc).
6//!
7//! Generally, this is the "entry point" where the server begins to run, and
8//! the entry point for all client traffic which is then directed to the
9//! various `actors`.
10
11#![deny(warnings)]
12#![warn(unused_extern_crates)]
13#![warn(unused_imports)]
14#![deny(clippy::todo)]
15#![deny(clippy::unimplemented)]
16#![deny(clippy::unwrap_used)]
17#![deny(clippy::expect_used)]
18#![deny(clippy::panic)]
19#![deny(clippy::unreachable)]
20#![deny(clippy::await_holding_lock)]
21#![deny(clippy::needless_pass_by_value)]
22#![deny(clippy::trivially_copy_pass_by_ref)]
23#![deny(clippy::indexing_slicing)]
24
25#[macro_use]
26extern crate tracing;
27#[macro_use]
28extern crate kanidmd_lib;
29
30mod actors;
31pub mod admin;
32pub mod config;
33mod crypto;
34mod https;
35mod interval;
36mod ldaps;
37mod repl;
38mod tcp;
39mod utils;
40
41use crate::actors::{QueryServerReadV1, QueryServerWriteV1};
42use crate::admin::AdminActor;
43use crate::config::{Configuration, ServerRole};
44use crate::interval::IntervalActor;
45use crate::utils::touch_file_or_quit;
46use compact_jwt::{JwsHs256Signer, JwsSigner};
47use crypto_glue::{
48    s256::{Sha256, Sha256Output},
49    traits::Digest,
50};
51use kanidm_proto::backup::BackupCompression;
52use kanidm_proto::internal::OperationError;
53use kanidm_proto::scim_v1::client::ScimAssertGeneric;
54use kanidmd_lib::be::{Backend, BackendConfig, BackendTransaction};
55use kanidmd_lib::idm::ldap::LdapServer;
56use kanidmd_lib::prelude::*;
57use kanidmd_lib::schema::Schema;
58use kanidmd_lib::status::StatusActor;
59use kanidmd_lib::value::CredentialType;
60use regex::Regex;
61use std::collections::BTreeSet;
62use std::fmt::{Display, Formatter};
63use std::path::Path;
64use std::path::PathBuf;
65use std::sync::Arc;
66use std::sync::LazyLock;
67use tokio::sync::broadcast;
68use tokio::task;
69
70#[cfg(not(target_family = "windows"))]
71use libc::umask;
72
73// === internal setup helpers
74
75fn setup_backend(config: &Configuration, schema: &Schema) -> Result<Backend, OperationError> {
76    setup_backend_vacuum(config, schema, false)
77}
78
79fn setup_backend_vacuum(
80    config: &Configuration,
81    schema: &Schema,
82    vacuum: bool,
83) -> Result<Backend, OperationError> {
84    // Limit the scope of the schema txn.
85    // let schema_txn = task::block_on(schema.write());
86    let schema_txn = schema.write();
87    let idxmeta = schema_txn.reload_idxmeta();
88
89    let pool_size: u32 = config.threads as u32;
90
91    let cfg = BackendConfig::new(
92        config.db_path.as_deref(),
93        pool_size,
94        config.db_fs_type.unwrap_or_default(),
95        config.db_arc_size,
96    );
97
98    Backend::new(cfg, idxmeta, vacuum)
99}
100
101// TODO #54: We could move most of the be/schema/qs setup and startup
102// outside of this call, then pass in "what we need" in a cloneable
103// form, this way we could have separate Idm vs Qs threads, and dedicated
104// threads for write vs read
105async fn setup_qs_idms(
106    be: Backend,
107    schema: Schema,
108    config: &Configuration,
109) -> Result<(QueryServer, IdmServer, IdmServerDelayed, IdmServerAudit), OperationError> {
110    let curtime = duration_from_epoch_now();
111    // Create a query_server implementation
112    let query_server = QueryServer::new(be, schema, config.domain.clone(), curtime)?;
113
114    // TODO #62: Should the IDM parts be broken out to the IdmServer?
115    // What's important about this initial setup here is that it also triggers
116    // the schema and acp reload, so they are now configured correctly!
117    // Initialise the schema core.
118    //
119    // Now search for the schema itself, and validate that the system
120    // in memory matches the BE on disk, and that it's syntactically correct.
121    // Write it out if changes are needed.
122    query_server
123        .initialise_helper(curtime, DOMAIN_TGT_LEVEL)
124        .await?;
125
126    // We generate a SINGLE idms only!
127    let is_integration_test = config.integration_test_config.is_some();
128    let (idms, idms_delayed, idms_audit) = IdmServer::new(
129        query_server.clone(),
130        &config.origin,
131        is_integration_test,
132        curtime,
133    )
134    .await?;
135
136    Ok((query_server, idms, idms_delayed, idms_audit))
137}
138
139async fn setup_qs(
140    be: Backend,
141    schema: Schema,
142    config: &Configuration,
143) -> Result<QueryServer, OperationError> {
144    let curtime = duration_from_epoch_now();
145    // Create a query_server implementation
146    let query_server = QueryServer::new(be, schema, config.domain.clone(), curtime)?;
147
148    // TODO #62: Should the IDM parts be broken out to the IdmServer?
149    // What's important about this initial setup here is that it also triggers
150    // the schema and acp reload, so they are now configured correctly!
151    // Initialise the schema core.
152    //
153    // Now search for the schema itself, and validate that the system
154    // in memory matches the BE on disk, and that it's syntactically correct.
155    // Write it out if changes are needed.
156    query_server
157        .initialise_helper(curtime, DOMAIN_TGT_LEVEL)
158        .await?;
159
160    Ok(query_server)
161}
162
/// Build a backend for the dbscan_* tools: constructs the in-memory schema
/// and opens the backend. Exits the process if the schema cannot be built,
/// and `return`s from the *calling* function if the backend fails to open -
/// so this macro must only be used inside `fn(..) -> ()` tool entry points.
macro_rules! dbscan_setup_be {
    (
        $config:expr
    ) => {{
        let schema = match Schema::new() {
            Ok(s) => s,
            Err(e) => {
                error!("Failed to setup in memory schema: {:?}", e);
                // A broken in-memory schema is unrecoverable.
                std::process::exit(1);
            }
        };

        match setup_backend($config, &schema) {
            Ok(be) => be,
            Err(e) => {
                error!("Failed to setup BE: {:?}", e);
                // NOTE: early-returns from the function invoking the macro.
                return;
            }
        }
    }};
}
184
185pub fn dbscan_list_indexes_core(config: &Configuration) {
186    let be = dbscan_setup_be!(config);
187    let mut be_rotxn = match be.read() {
188        Ok(txn) => txn,
189        Err(err) => {
190            error!(?err, "Unable to proceed, backend read transaction failure.");
191            return;
192        }
193    };
194
195    match be_rotxn.list_indexes() {
196        Ok(mut idx_list) => {
197            idx_list.sort_unstable();
198            idx_list.iter().for_each(|idx_name| {
199                println!("{idx_name}");
200            })
201        }
202        Err(e) => {
203            error!("Failed to retrieve index list: {:?}", e);
204        }
205    };
206}
207
208pub fn dbscan_list_id2entry_core(config: &Configuration) {
209    let be = dbscan_setup_be!(config);
210    let mut be_rotxn = match be.read() {
211        Ok(txn) => txn,
212        Err(err) => {
213            error!(?err, "Unable to proceed, backend read transaction failure.");
214            return;
215        }
216    };
217
218    match be_rotxn.list_id2entry() {
219        Ok(mut id_list) => {
220            id_list.sort_unstable_by_key(|k| k.0);
221            id_list.iter().for_each(|(id, value)| {
222                println!("{id:>8}: {value}");
223            })
224        }
225        Err(e) => {
226            error!("Failed to retrieve id2entry list: {:?}", e);
227        }
228    };
229}
230
231pub fn dbscan_list_index_analysis_core(config: &Configuration) {
232    let _be = dbscan_setup_be!(config);
233    // TBD in after slopes merge.
234}
235
236pub fn dbscan_list_index_core(config: &Configuration, index_name: &str) {
237    let be = dbscan_setup_be!(config);
238    let mut be_rotxn = match be.read() {
239        Ok(txn) => txn,
240        Err(err) => {
241            error!(?err, "Unable to proceed, backend read transaction failure.");
242            return;
243        }
244    };
245
246    match be_rotxn.list_index_content(index_name) {
247        Ok(mut idx_list) => {
248            idx_list.sort_unstable_by(|a, b| a.0.cmp(&b.0));
249            idx_list.iter().for_each(|(key, value)| {
250                println!("{key:>50}: {value:?}");
251            })
252        }
253        Err(e) => {
254            error!("Failed to retrieve index list: {:?}", e);
255        }
256    };
257}
258
259pub fn dbscan_get_id2entry_core(config: &Configuration, id: u64) {
260    let be = dbscan_setup_be!(config);
261    let mut be_rotxn = match be.read() {
262        Ok(txn) => txn,
263        Err(err) => {
264            error!(?err, "Unable to proceed, backend read transaction failure.");
265            return;
266        }
267    };
268
269    match be_rotxn.get_id2entry(id) {
270        Ok((id, value)) => println!("{id:>8}: {value}"),
271        Err(e) => {
272            error!("Failed to retrieve id2entry value: {:?}", e);
273        }
274    };
275}
276
277pub fn dbscan_quarantine_id2entry_core(config: &Configuration, id: u64) {
278    let be = dbscan_setup_be!(config);
279    let mut be_wrtxn = match be.write() {
280        Ok(txn) => txn,
281        Err(err) => {
282            error!(
283                ?err,
284                "Unable to proceed, backend write transaction failure."
285            );
286            return;
287        }
288    };
289
290    match be_wrtxn
291        .quarantine_entry(id)
292        .and_then(|_| be_wrtxn.commit())
293    {
294        Ok(()) => {
295            println!("quarantined - {id:>8}")
296        }
297        Err(e) => {
298            error!("Failed to quarantine id2entry value: {:?}", e);
299        }
300    };
301}
302
303pub fn dbscan_list_quarantined_core(config: &Configuration) {
304    let be = dbscan_setup_be!(config);
305    let mut be_rotxn = match be.read() {
306        Ok(txn) => txn,
307        Err(err) => {
308            error!(?err, "Unable to proceed, backend read transaction failure.");
309            return;
310        }
311    };
312
313    match be_rotxn.list_quarantined() {
314        Ok(mut id_list) => {
315            id_list.sort_unstable_by_key(|k| k.0);
316            id_list.iter().for_each(|(id, value)| {
317                println!("{id:>8}: {value}");
318            })
319        }
320        Err(e) => {
321            error!("Failed to retrieve id2entry list: {:?}", e);
322        }
323    };
324}
325
326pub fn dbscan_restore_quarantined_core(config: &Configuration, id: u64) {
327    let be = dbscan_setup_be!(config);
328    let mut be_wrtxn = match be.write() {
329        Ok(txn) => txn,
330        Err(err) => {
331            error!(
332                ?err,
333                "Unable to proceed, backend write transaction failure."
334            );
335            return;
336        }
337    };
338
339    match be_wrtxn
340        .restore_quarantined(id)
341        .and_then(|_| be_wrtxn.commit())
342    {
343        Ok(()) => {
344            println!("restored - {id:>8}")
345        }
346        Err(e) => {
347            error!("Failed to restore quarantined id2entry value: {:?}", e);
348        }
349    };
350}
351
352pub fn backup_server_core(config: &Configuration, dst_path: Option<&Path>) {
353    let schema = match Schema::new() {
354        Ok(s) => s,
355        Err(e) => {
356            error!("Failed to setup in memory schema: {:?}", e);
357            std::process::exit(1);
358        }
359    };
360
361    let be = match setup_backend(config, &schema) {
362        Ok(be) => be,
363        Err(e) => {
364            error!("Failed to setup BE: {:?}", e);
365            return;
366        }
367    };
368
369    let mut be_ro_txn = match be.read() {
370        Ok(txn) => txn,
371        Err(err) => {
372            error!(?err, "Unable to proceed, backend read transaction failure.");
373            return;
374        }
375    };
376
377    let compression = match config.online_backup.as_ref() {
378        Some(backup_config) => backup_config.compression,
379        None => BackupCompression::default(),
380    };
381
382    if let Some(dst_path) = dst_path {
383        if dst_path.exists() {
384            error!(
385                "backup file {} already exists, will not overwrite it.",
386                dst_path.display()
387            );
388            return;
389        }
390
391        let output = match std::fs::File::create(dst_path) {
392            Ok(output) => output,
393            Err(err) => {
394                error!(?err, "File::create error creating {}", dst_path.display());
395                return;
396            }
397        };
398
399        match be_ro_txn.backup(output, compression) {
400            Ok(_) => info!("Backup success!"),
401            Err(e) => {
402                error!("Backup failed: {:?}", e);
403                std::process::exit(1);
404            }
405        };
406    } else {
407        // No path set, default to stdout
408        let stdout = std::io::stdout().lock();
409
410        match be_ro_txn.backup(stdout, compression) {
411            Ok(_) => info!("Backup success!"),
412            Err(e) => {
413                error!("Backup failed: {:?}", e);
414                std::process::exit(1);
415            }
416        };
417    };
418    // Let the txn abort, even on success.
419}
420
421pub async fn restore_server_core(config: &Configuration, dst_path: &Path) {
422    // If it's an in memory database, we don't need to touch anything
423    if let Some(db_path) = config.db_path.as_ref() {
424        touch_file_or_quit(db_path);
425    }
426
427    // First, we provide the in-memory schema so that core attrs are indexed correctly.
428    let schema = match Schema::new() {
429        Ok(s) => s,
430        Err(e) => {
431            error!("Failed to setup in memory schema: {:?}", e);
432            std::process::exit(1);
433        }
434    };
435
436    let be = match setup_backend(config, &schema) {
437        Ok(be) => be,
438        Err(e) => {
439            error!("Failed to setup backend: {:?}", e);
440            return;
441        }
442    };
443
444    let mut be_wr_txn = match be.write() {
445        Ok(txn) => txn,
446        Err(err) => {
447            error!(
448                ?err,
449                "Unable to proceed, backend write transaction failure."
450            );
451            return;
452        }
453    };
454
455    let compression = BackupCompression::identify_file(dst_path);
456
457    let input = match std::fs::File::open(dst_path) {
458        Ok(output) => output,
459        Err(err) => {
460            error!(?err, "File::open error reading {}", dst_path.display());
461            return;
462        }
463    };
464
465    let r = be_wr_txn
466        .restore(input, compression)
467        .and_then(|_| be_wr_txn.commit());
468
469    if r.is_err() {
470        error!("Failed to restore database: {:?}", r);
471        std::process::exit(1);
472    }
473    info!("Database loaded successfully");
474
475    reindex_inner(be, schema, config).await;
476
477    info!("✅ Restore Success!");
478}
479
480pub async fn reindex_server_core(config: &Configuration) {
481    info!("Start Index Phase 1 ...");
482    // First, we provide the in-memory schema so that core attrs are indexed correctly.
483    let schema = match Schema::new() {
484        Ok(s) => s,
485        Err(e) => {
486            error!("Failed to setup in memory schema: {:?}", e);
487            std::process::exit(1);
488        }
489    };
490
491    let be = match setup_backend(config, &schema) {
492        Ok(be) => be,
493        Err(e) => {
494            error!("Failed to setup BE: {:?}", e);
495            return;
496        }
497    };
498
499    reindex_inner(be, schema, config).await;
500
501    info!("✅ Reindex Success!");
502}
503
504async fn reindex_inner(be: Backend, schema: Schema, config: &Configuration) {
505    // Reindex only the core schema attributes to bootstrap the process.
506    let mut be_wr_txn = match be.write() {
507        Ok(txn) => txn,
508        Err(err) => {
509            error!(
510                ?err,
511                "Unable to proceed, backend write transaction failure."
512            );
513            return;
514        }
515    };
516
517    let r = be_wr_txn.reindex(true).and_then(|_| be_wr_txn.commit());
518
519    // Now that's done, setup a minimal qs and reindex from that.
520    if r.is_err() {
521        error!("Failed to reindex database: {:?}", r);
522        std::process::exit(1);
523    }
524    info!("Index Phase 1 Success!");
525
526    info!("Attempting to init query server ...");
527
528    let (qs, _idms, _idms_delayed, _idms_audit) = match setup_qs_idms(be, schema, config).await {
529        Ok(t) => t,
530        Err(e) => {
531            error!("Unable to setup query server or idm server -> {:?}", e);
532            return;
533        }
534    };
535    info!("Init Query Server Success!");
536
537    info!("Start Index Phase 2 ...");
538
539    let Ok(mut qs_write) = qs.write(duration_from_epoch_now()).await else {
540        error!("Unable to acquire write transaction");
541        return;
542    };
543    let r = qs_write.reindex(true).and_then(|_| qs_write.commit());
544
545    match r {
546        Ok(_) => info!("Index Phase 2 Success!"),
547        Err(e) => {
548            error!("Reindex failed: {:?}", e);
549            std::process::exit(1);
550        }
551    };
552}
553
554pub fn vacuum_server_core(config: &Configuration) {
555    let schema = match Schema::new() {
556        Ok(s) => s,
557        Err(e) => {
558            eprintln!("Failed to setup in memory schema: {e:?}");
559            std::process::exit(1);
560        }
561    };
562
563    // The schema doesn't matter here. Vacuum is run as part of db open to avoid
564    // locking.
565    let r = setup_backend_vacuum(config, &schema, true);
566
567    match r {
568        Ok(_) => eprintln!("Vacuum Success!"),
569        Err(e) => {
570            eprintln!("Vacuum failed: {e:?}");
571            std::process::exit(1);
572        }
573    };
574}
575
576pub async fn domain_rename_core(config: &Configuration) {
577    let schema = match Schema::new() {
578        Ok(s) => s,
579        Err(e) => {
580            eprintln!("Failed to setup in memory schema: {e:?}");
581            std::process::exit(1);
582        }
583    };
584
585    // Start the backend.
586    let be = match setup_backend(config, &schema) {
587        Ok(be) => be,
588        Err(e) => {
589            error!("Failed to setup BE: {:?}", e);
590            return;
591        }
592    };
593
594    // Setup the qs, and perform any migrations and changes we may have.
595    let qs = match setup_qs(be, schema, config).await {
596        Ok(t) => t,
597        Err(e) => {
598            error!("Unable to setup query server -> {:?}", e);
599            return;
600        }
601    };
602
603    let new_domain_name = config.domain.as_str();
604
605    // make sure we're actually changing the domain name...
606    match qs.read().await.map(|qs| qs.get_domain_name().to_string()) {
607        Ok(old_domain_name) => {
608            admin_info!(?old_domain_name, ?new_domain_name);
609            if old_domain_name == new_domain_name {
610                admin_info!("Domain name not changing, stopping.");
611                return;
612            }
613            admin_debug!(
614                "Domain name is changing from {:?} to {:?}",
615                old_domain_name,
616                new_domain_name
617            );
618        }
619        Err(e) => {
620            admin_error!("Failed to query domain name, quitting! -> {:?}", e);
621            return;
622        }
623    }
624
625    let Ok(mut qs_write) = qs.write(duration_from_epoch_now()).await else {
626        error!("Unable to acquire write transaction");
627        return;
628    };
629    let r = qs_write
630        .danger_domain_rename(new_domain_name)
631        .and_then(|_| qs_write.commit());
632
633    match r {
634        Ok(_) => info!("Domain Rename Success!"),
635        Err(e) => {
636            error!("Domain Rename Failed - Rollback has occurred: {:?}", e);
637            std::process::exit(1);
638        }
639    };
640}
641
642pub async fn verify_server_core(config: &Configuration) {
643    let curtime = duration_from_epoch_now();
644    // setup the qs - without initialise!
645    let schema_mem = match Schema::new() {
646        Ok(sc) => sc,
647        Err(e) => {
648            error!("Failed to setup in memory schema: {:?}", e);
649            return;
650        }
651    };
652    // Setup the be
653    let be = match setup_backend(config, &schema_mem) {
654        Ok(be) => be,
655        Err(e) => {
656            error!("Failed to setup BE: {:?}", e);
657            return;
658        }
659    };
660
661    let server = match QueryServer::new(be, schema_mem, config.domain.clone(), curtime) {
662        Ok(qs) => qs,
663        Err(err) => {
664            error!(?err, "Failed to setup query server");
665            return;
666        }
667    };
668
669    // Run verifications.
670    let r = server.verify().await;
671
672    if r.is_empty() {
673        eprintln!("Verification passed!");
674        std::process::exit(0);
675    } else {
676        for er in r {
677            error!("{:?}", er);
678        }
679        std::process::exit(1);
680    }
681
682    // Now add IDM server verifications?
683}
684
/// Generate a self-signed certificate chain for the configured TLS paths.
///
/// Reuses an existing on-disk CA (`ca.pem` / `cakey.pem` beside the key)
/// when present, otherwise builds and writes a fresh one, then issues a
/// server certificate for the origin's domain. Exits the process on any
/// failure; returns early if key and chain already exist.
pub fn cert_generate_core(config: &Configuration) {
    // Get the cert root

    let (tls_key_path, tls_chain_path) = match &config.tls_config {
        Some(tls_config) => (tls_config.key.as_path(), tls_config.chain.as_path()),
        None => {
            error!("Unable to find TLS configuration");
            std::process::exit(1);
        }
    };

    // Never silently clobber an existing deployment's key material.
    if tls_key_path.exists() && tls_chain_path.exists() {
        info!(
            "TLS key and chain already exist - remove them first if you intend to regenerate these"
        );
        return;
    }

    let origin_domain = match config.origin.domain() {
        Some(val) => val,
        None => {
            error!("origin does not contain a valid domain");
            std::process::exit(1);
        }
    };

    // The CA and cert files are placed alongside the configured key path.
    let cert_root = match tls_key_path.parent() {
        Some(parent) => parent,
        None => {
            error!("Unable to find parent directory of {:?}", tls_key_path);
            std::process::exit(1);
        }
    };

    let ca_cert = cert_root.join("ca.pem");
    let ca_key = cert_root.join("cakey.pem");
    let tls_cert_path = cert_root.join("cert.pem");

    // If either CA file is missing, (re)generate the pair; otherwise load
    // the existing CA so issued certs chain to it.
    let ca_handle = if !ca_cert.exists() || !ca_key.exists() {
        // Generate the CA again.
        let ca_handle = match crypto::build_ca() {
            Ok(ca_handle) => ca_handle,
            Err(e) => {
                error!(err = ?e, "Failed to build CA");
                std::process::exit(1);
            }
        };

        if crypto::write_ca(ca_key, ca_cert, &ca_handle).is_err() {
            error!("Failed to write CA");
            std::process::exit(1);
        }

        ca_handle
    } else {
        match crypto::load_ca(ca_key, ca_cert) {
            Ok(ca_handle) => ca_handle,
            Err(_) => {
                error!("Failed to load CA");
                std::process::exit(1);
            }
        }
    };

    // Only issue a new server cert if any part of key/chain/cert is absent.
    if !tls_key_path.exists() || !tls_chain_path.exists() || !tls_cert_path.exists() {
        // Generate the cert from the ca.
        let cert_handle = match crypto::build_cert(origin_domain, &ca_handle) {
            Ok(cert_handle) => cert_handle,
            Err(e) => {
                error!(err = ?e, "Failed to build certificate");
                std::process::exit(1);
            }
        };

        if crypto::write_cert(tls_key_path, tls_chain_path, tls_cert_path, &cert_handle).is_err() {
            error!("Failed to write certificates");
            std::process::exit(1);
        }
    }
    info!("certificate generation complete");
}
766
767static MIGRATION_PATH_RE: LazyLock<Regex> = LazyLock::new(|| {
768    #[allow(clippy::expect_used)]
769    Regex::new("^\\d\\d-.*\\.h?json$").expect("Invalid SPN regex found")
770});
771
/// A migration file staged for application.
struct ScimMigration {
    /// Where the migration file was read from (used in log output).
    path: PathBuf,
    /// SHA-256 of the raw file content, passed along when the migration is
    /// applied.
    hash: Sha256Output,
    /// The parsed SCIM assertions contained in the file.
    assertions: ScimAssertGeneric,
}
777
778#[instrument(
779    level = "info",
780    fields(uuid = ?eventid),
781    skip_all,
782)]
783async fn migration_apply(
784    eventid: Uuid,
785    server_write_ref: &'static QueryServerWriteV1,
786    migration_path: &Path,
787) {
788    if !migration_path.exists() {
789        info!(migration_path = %migration_path.display(), "Migration path does not exist - migrations will be skipped.");
790        return;
791    }
792
793    let mut dir_ents = match tokio::fs::read_dir(migration_path).await {
794        Ok(dir_ents) => dir_ents,
795        Err(err) => {
796            error!(?err, "Unable to read migration directory.");
797            let diag = kanidm_lib_file_permissions::diagnose_path(migration_path);
798            info!(%diag);
799            return;
800        }
801    };
802
803    let mut migration_paths = Vec::with_capacity(8);
804
805    loop {
806        match dir_ents.next_entry().await {
807            Ok(Some(dir_ent)) => migration_paths.push(dir_ent.path()),
808            Ok(None) => {
809                // Complete,
810                break;
811            }
812            Err(err) => {
813                error!(?err, "Unable to read directory entries.");
814                return;
815            }
816        }
817    }
818
819    // Filter these.
820
821    let mut migration_paths: Vec<_> = migration_paths.into_iter()
822        .filter(|path| {
823            if !path.is_file() {
824                info!(path = %path.display(), "ignoring path that is not a file.");
825                return false;
826            }
827
828            let Some(file_name) = path.file_name().and_then(std::ffi::OsStr::to_str) else {
829                info!(path = %path.display(), "ignoring path that has no file name, or is not a valid utf-8 file name.");
830                return false;
831            };
832
833            if !MIGRATION_PATH_RE.is_match(file_name) {
834                info!(path = %path.display(), "ignoring file that does not match naming pattern.");
835                info!("expected pattern 'XX-NAME.json' where XX are two numbers, followed by a hypen, with the file extension .json");
836                return false;
837            }
838
839            true
840        })
841        .collect();
842
843    migration_paths.sort_unstable();
844    let mut migrations = Vec::with_capacity(migration_paths.len());
845
846    for migration_path in migration_paths {
847        info!(path = %migration_path.display(), "examining migration");
848
849        let migration_content = match tokio::fs::read(&migration_path).await {
850            Ok(bytes) => bytes,
851            Err(err) => {
852                error!(?err, "Unable to read migration - it will be ignored.");
853                let diag = kanidm_lib_file_permissions::diagnose_path(&migration_path);
854                info!(%diag);
855                continue;
856            }
857        };
858
859        // Is it valid json?
860        let assertions: ScimAssertGeneric = match serde_hjson::from_slice(&migration_content) {
861            Ok(assertions) => assertions,
862            Err(err) => {
863                error!(?err, path = %migration_path.display(), "Invalid JSON SCIM Assertion");
864                continue;
865            }
866        };
867
868        // Hash the content.
869        let mut hasher = Sha256::new();
870        hasher.update(&migration_content);
871        let migration_hash: Sha256Output = hasher.finalize();
872
873        migrations.push(ScimMigration {
874            path: migration_path,
875            hash: migration_hash,
876            assertions,
877        });
878    }
879
880    let mut migration_ids = BTreeSet::new();
881    for migration in &migrations {
882        // BTreeSet returns false on duplicate value insertion.
883        if !migration_ids.insert(migration.assertions.id) {
884            error!(path = %migration.path.display(), uuid = ?migration.assertions.id, "Duplicate migration UUID found, refusing to proceed!!! All migrations must have a unique ID!!!");
885            return;
886        }
887    }
888
889    // Okay, we're setup to go - apply them all. Note that we do these
890    // separately, each migration occurs in its own transaction.
891    for ScimMigration {
892        path,
893        hash,
894        assertions,
895    } in migrations
896    {
897        if let Err(err) = server_write_ref
898            .handle_scim_migration_apply(eventid, assertions, hash)
899            .await
900        {
901            error!(?err, path = %path.display(), "Failed to apply migration");
902        };
903    }
904}
905
/// Control actions broadcast from the core handle to all long-running
/// server tasks.
#[derive(Clone, Debug)]
pub enum CoreAction {
    /// Ask every task to stop so the server can shut down.
    Shutdown,
    /// Ask subscribed tasks to reload their state (consumed by the reload
    /// monitor tasks - see `TaskName`).
    Reload,
}
911
/// Identifies each long-running task spawned by the core, so shutdown can
/// report which task failed or succeeded (see `CoreHandle::handles`).
pub(crate) enum TaskName {
    AdminSocket,
    AuditdActor,
    BackupActor,
    DelayedActionActor,
    HttpsServer,
    IntervalActor,
    LdapActor,
    Replication,
    TlsAcceptorReload,
    MigrationReload,
}
924
925impl Display for TaskName {
926    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
927        write!(
928            f,
929            "{}",
930            match self {
931                TaskName::AdminSocket => "Admin Socket",
932                TaskName::AuditdActor => "Auditd Actor",
933                TaskName::BackupActor => "Backup Actor",
934                TaskName::DelayedActionActor => "Delayed Action Actor",
935                TaskName::HttpsServer => "HTTPS Server",
936                TaskName::IntervalActor => "Interval Actor",
937                TaskName::LdapActor => "LDAP Acceptor Actor",
938                TaskName::Replication => "Replication",
939                TaskName::TlsAcceptorReload => "TlsAcceptor Reload Monitor",
940                TaskName::MigrationReload => "Migration Reload Monitor",
941            }
942        )
943    }
944}
945
/// Handle to a running server core: broadcasts control actions to the
/// spawned tasks and joins them during shutdown.
pub struct CoreHandle {
    // Set once `shutdown()` has joined every task; checked in Drop to warn
    // about unclean termination.
    clean_shutdown: bool,
    // Broadcast side of the CoreAction channel all tasks subscribe to.
    tx: broadcast::Sender<CoreAction>,
    /// This stores a name for the handle, and the handle itself so we can tell which failed/succeeded at the end.
    handles: Vec<(TaskName, task::JoinHandle<()>)>,
}
952
impl CoreHandle {
    /// Obtain a fresh receiver for core actions (shutdown / reload).
    pub fn subscribe(&mut self) -> broadcast::Receiver<CoreAction> {
        self.tx.subscribe()
    }

    /// Broadcast a shutdown and wait for every registered task to finish.
    /// If no task is listening, the shutdown is treated as unclean.
    pub async fn shutdown(&mut self) {
        if self.tx.send(CoreAction::Shutdown).is_err() {
            eprintln!("No receivers acked shutdown request. Treating as unclean.");
            return;
        }

        // Wait on the handles, joining in reverse registration order.
        while let Some((handle_name, handle)) = self.handles.pop() {
            debug!("Waiting for {handle_name} ...");
            if let Err(error) = handle.await {
                eprintln!("Task {handle_name} failed to finish: {error:?}");
            }
        }

        self.clean_shutdown = true;
    }

    /// Broadcast a reload request to all subscribed tasks. Best effort -
    /// a missing receiver is reported but not treated as fatal.
    pub async fn reload(&mut self) {
        if self.tx.send(CoreAction::Reload).is_err() {
            eprintln!("No receivers acked reload request.");
        }
    }
}
981
impl Drop for CoreHandle {
    fn drop(&mut self) {
        // Warn loudly if the handle is dropped without a completed
        // `shutdown()` - spawned tasks may still be running.
        if !self.clean_shutdown {
            eprintln!("⚠️  UNCLEAN SHUTDOWN OCCURRED ⚠️ ");
        }
        // Can't enable yet until we clean up unix_int cache layer test
        // debug_assert!(self.clean_shutdown);
    }
}
991
/// Bootstrap the server "core" to a fully operational state.
///
/// Wires together the backend, in-memory schema and the query/IDM servers,
/// then spawns every long-running task: the delayed-action and audit actors,
/// interval (timed event) actors, the HTTPS and LDAP acceptors, replication,
/// the admin socket, and the TLS/migration reload monitors.
///
/// * `config` - the full server configuration to start from.
/// * `config_test` - when true, validate the configuration and build state,
///   but do not bind the HTTPS or replication listeners.
///
/// Returns a [`CoreHandle`] owning the task join handles and the broadcast
/// sender used to signal shutdown/reload, or `Err(())` on any startup
/// failure (the cause is logged before returning).
pub async fn create_server_core(
    config: Configuration,
    config_test: bool,
) -> Result<CoreHandle, ()> {
    // Until this point, we probably want to write to the log macro fns.
    // Channel that fans out CoreAction::{Shutdown, Reload} to every task;
    // each spawned task gets its own receiver via subscribe().
    let (broadcast_tx, mut broadcast_rx) = broadcast::channel(4);

    if config.integration_test_config.is_some() {
        warn!("RUNNING IN INTEGRATION TEST MODE.");
        warn!("IF YOU SEE THIS IN PRODUCTION YOU MUST CONTACT SUPPORT IMMEDIATELY.");
    } else if config.tls_config.is_none() {
        // TLS is great! We won't run without it.
        error!("Running without TLS is not supported! Quitting!");
        return Err(());
    }

    info!(
        "Starting kanidm with {}configuration: {}",
        if config_test { "TEST " } else { "" },
        config
    );
    // Set the umask so that everything we touch or create is secure.
    // SAFETY: umask only alters the process-wide file mode creation mask and
    // cannot violate memory safety; it is unsafe purely because it is FFI.
    #[cfg(not(target_family = "windows"))]
    unsafe {
        umask(0o0027)
    };

    // Similar, create a stats task which aggregates statistics from the
    // server as they come in.
    let status_ref = StatusActor::start();

    // Setup TLS (if any). May legitimately be None in integration test mode
    // (the tls_config check above only rejects None outside of tests).
    let maybe_tls_acceptor = match crypto::setup_tls(&config.tls_config) {
        Ok(tls_acc) => tls_acc,
        Err(err) => {
            error!(?err, "Failed to configure TLS acceptor");
            return Err(());
        }
    };

    let schema = match Schema::new() {
        Ok(s) => s,
        Err(e) => {
            error!("Failed to setup in memory schema: {:?}", e);
            return Err(());
        }
    };

    // Setup the be for the qs.
    let be = match setup_backend(&config, &schema) {
        Ok(be) => be,
        Err(e) => {
            error!("Failed to setup BE -> {:?}", e);
            return Err(());
        }
    };
    // Start the IDM server.
    let (_qs, idms, mut idms_delayed, mut idms_audit) =
        match setup_qs_idms(be, schema, &config).await {
            Ok(t) => t,
            Err(e) => {
                error!("Unable to setup query server or idm server -> {:?}", e);
                return Err(());
            }
        };

    // Extract any configuration from the IDMS that we may need.
    // For now we just do this per run, but we need to extract this from the db later.
    // NOTE(review): the signer is regenerated on every start, so anything it
    // signed will not verify across a restart - confirm this is intended.
    let jws_signer = match JwsHs256Signer::generate_hs256() {
        Ok(k) => k.set_sign_option_embed_kid(false),
        Err(e) => {
            error!("Unable to setup jws signer -> {:?}", e);
            return Err(());
        }
    };

    // Any pre-start tasks here.
    // In integration test mode, recover the admin accounts and loosen policy
    // in a single write transaction before anything else can run.
    if let Some(itc) = &config.integration_test_config {
        let Ok(mut idms_prox_write) = idms.proxy_write(duration_from_epoch_now()).await else {
            error!("Unable to acquire write transaction");
            return Err(());
        };
        // We need to set the admin pw.
        match idms_prox_write.recover_account(&itc.admin_user, Some(&itc.admin_password)) {
            Ok(_) => {}
            Err(e) => {
                error!(
                    "Unable to configure INTEGRATION TEST {} account -> {:?}",
                    &itc.admin_user, e
                );
                return Err(());
            }
        };
        // set the idm_admin account password
        match idms_prox_write.recover_account(&itc.idm_admin_user, Some(&itc.idm_admin_password)) {
            Ok(_) => {}
            Err(e) => {
                error!(
                    "Unable to configure INTEGRATION TEST {} account -> {:?}",
                    &itc.idm_admin_user, e
                );
                return Err(());
            }
        };

        // Add admin to idm_admins to allow tests more flexibility wrt to permissions.
        // This way our default access controls can be stricter to prevent lateral
        // movement.
        match idms_prox_write.qs_write.internal_modify_uuid(
            UUID_IDM_ADMINS,
            &ModifyList::new_append(Attribute::Member, Value::Refer(UUID_ADMIN)),
        ) {
            Ok(_) => {}
            Err(e) => {
                error!(
                    "Unable to configure INTEGRATION TEST admin as member of idm_admins -> {:?}",
                    e
                );
                return Err(());
            }
        };

        // Allow any credential type so tests aren't blocked by the default
        // credential policy minimum.
        match idms_prox_write.qs_write.internal_modify_uuid(
            UUID_IDM_ALL_PERSONS,
            &ModifyList::new_purge_and_set(
                Attribute::CredentialTypeMinimum,
                CredentialType::Any.into(),
            ),
        ) {
            Ok(_) => {}
            Err(e) => {
                error!(
                    "Unable to configure INTEGRATION TEST default credential policy -> {:?}",
                    e
                );
                return Err(());
            }
        };

        match idms_prox_write.commit() {
            Ok(_) => {}
            Err(e) => {
                error!("Unable to commit INTEGRATION TEST setup -> {:?}", e);
                return Err(());
            }
        }
    }

    let ldap = match LdapServer::new(&idms).await {
        Ok(l) => l,
        Err(e) => {
            error!("Unable to start LdapServer -> {:?}", e);
            return Err(());
        }
    };

    // Arc the idms and ldap
    let idms_arc = Arc::new(idms);
    let ldap_arc = Arc::new(ldap);

    // Pass it to the actor for threading.
    // Start the read query server with the given be path: future config
    let server_read_ref = QueryServerReadV1::start_static(idms_arc.clone(), ldap_arc.clone());

    // Create the server async write entry point.
    let server_write_ref = QueryServerWriteV1::start_static(idms_arc.clone());

    // Task that drains delayed actions in batches and hands them to the
    // write actor. Exits when the delayed-action channel closes (recv_many
    // returns 0) or when Shutdown is broadcast.
    let delayed_handle = task::spawn(async move {
        let mut buffer = Vec::with_capacity(DELAYED_ACTION_BATCH_SIZE);
        loop {
            tokio::select! {
                added = idms_delayed.recv_many(&mut buffer) => {
                    if added == 0 {
                        // Channel has closed, stop the task.
                        break
                    }
                    server_write_ref.handle_delayedaction(&mut buffer).await;
                }
                Ok(action) = broadcast_rx.recv() => {
                    match action {
                        CoreAction::Shutdown => break,
                        CoreAction::Reload => {},
                    }
                }
            }
        }
        info!("Stopped {}", TaskName::DelayedActionActor);
    });

    // The receiver above was moved into the delayed-action task; take a
    // fresh subscription for the audit task.
    let mut broadcast_rx = broadcast_tx.subscribe();

    // Task that serialises audit events to the log as JSON (falling back to
    // Debug formatting if serialisation fails).
    let auditd_handle = task::spawn(async move {
        loop {
            tokio::select! {
                Ok(action) = broadcast_rx.recv() => {
                    match action {
                        CoreAction::Shutdown => break,
                        CoreAction::Reload => {},
                    }
                }
                audit_event = idms_audit.audit_rx().recv() => {
                    match serde_json::to_string(&audit_event) {
                        Ok(audit_event) => {
                            warn!(%audit_event);
                        }
                        Err(e) => {
                            error!(err=?e, "Unable to process audit event to json.");
                            warn!(?audit_event, json=false);
                        }
                    }

                }
            }
        }
        info!("Stopped {}", TaskName::AuditdActor);
    });

    // Run the migrations *once*, only in production though.
    // KANIDM_SERVER_MIGRATION_PATH is baked in at compile time as the
    // default when no migration_path is configured.
    let migration_path = config
        .migration_path
        .clone()
        .unwrap_or(PathBuf::from(env!("KANIDM_SERVER_MIGRATION_PATH")));

    if config.integration_test_config.is_none() {
        let eventid = Uuid::new_v4();
        migration_apply(eventid, server_write_ref, migration_path.as_path()).await;
    }

    // Setup the Migration Reload Trigger. Re-applies migrations from the
    // same path whenever a Reload is broadcast.
    let mut broadcast_rx = broadcast_tx.subscribe();
    let migration_reload_handle = task::spawn(async move {
        loop {
            tokio::select! {
                Ok(action) = broadcast_rx.recv() => {
                    match action {
                        CoreAction::Shutdown => break,
                        CoreAction::Reload => {
                            // Read the migrations.
                            // Apply them.
                            let eventid = Uuid::new_v4();
                            migration_apply(
                                eventid,
                                server_write_ref,
                                migration_path.as_path(),
                            ).await;

                            info!("Migration reload complete");
                        },
                    }
                }
            }
        }
        info!("Stopped {}", TaskName::MigrationReload);
    });

    // Setup a TLS Acceptor Reload trigger. On Reload, rebuild the acceptor
    // from the (cloned) TLS config and push it to the HTTPS/LDAP listeners
    // over the dedicated tls_acceptor_reload channel.

    let mut broadcast_rx = broadcast_tx.subscribe();
    let tls_config = config.tls_config.clone();

    let (tls_acceptor_reload_tx, _tls_acceptor_reload_rx) = broadcast::channel(1);
    let tls_acceptor_reload_tx_c = tls_acceptor_reload_tx.clone();

    let tls_acceptor_reload_handle = task::spawn(async move {
        loop {
            tokio::select! {
                Ok(action) = broadcast_rx.recv() => {
                    match action {
                        CoreAction::Shutdown => break,
                        CoreAction::Reload => {
                            let tls_acceptor = match crypto::setup_tls(&tls_config) {
                                Ok(Some(tls_acc)) => tls_acc,
                                Ok(None) => {
                                    warn!("TLS not configured, ignoring reload request.");
                                    continue;
                                }
                                Err(err) => {
                                    error!(?err, "Failed to configure and reload TLS acceptor");
                                    continue;
                                }
                            };

                            // We don't log here as the receivers will notify when they have completed
                            // the reload.
                            if tls_acceptor_reload_tx_c.send(tls_acceptor).is_err() {
                                error!("TLS acceptor did not accept the reload, the server may have failed!");
                            };
                            info!("TLS acceptor reload notification sent");
                        },
                    }
                }
            }
        }
        info!("Stopped {}", TaskName::TlsAcceptorReload);
    });

    // Setup timed events associated to the write thread
    let interval_handle = IntervalActor::start(server_write_ref, broadcast_tx.subscribe());
    // Setup timed events associated to the read thread
    let maybe_backup_handle = match &config.online_backup {
        Some(online_backup_config) => {
            if online_backup_config.enabled {
                let handle = IntervalActor::start_online_backup(
                    server_read_ref,
                    online_backup_config,
                    broadcast_tx.subscribe(),
                )?;
                Some(handle)
            } else {
                debug!("Backups disabled");
                None
            }
        }
        None => {
            debug!("Online backup not requested, skipping");
            None
        }
    };

    // If we have been requested to init LDAP, configure it now.
    let maybe_ldap_acceptor_handles = match &config.ldapbindaddress {
        Some(la) => {
            let opt_ldap_ssl_acceptor = maybe_tls_acceptor.clone();

            let h = ldaps::create_ldap_server(
                la,
                opt_ldap_ssl_acceptor,
                server_read_ref,
                &broadcast_tx,
                &tls_acceptor_reload_tx,
                config.ldap_client_address_info.trusted_tcp_info(),
            )
            .await?;
            Some(h)
        }
        None => {
            debug!("LDAP not requested, skipping");
            None
        }
    };

    // If we have replication configured, setup the listener with its initial replication
    // map (if any).
    let (maybe_repl_handle, maybe_repl_ctrl_tx) = match &config.repl_config {
        Some(rc) => {
            if !config_test {
                // ⚠️  only start the sockets and listeners in non-config-test modes.
                let (h, repl_ctrl_tx) =
                    repl::create_repl_server(idms_arc.clone(), rc, broadcast_tx.subscribe())
                        .await?;
                (Some(h), Some(repl_ctrl_tx))
            } else {
                (None, None)
            }
        }
        None => {
            debug!("Replication not requested, skipping");
            (None, None)
        }
    };

    // In config-test mode we stop short of binding HTTPS; reaching this
    // point means the configuration was accepted.
    let maybe_http_acceptor_handles = if config_test {
        admin_info!("This config rocks! 🪨 ");
        None
    } else {
        let handles: Vec<task::JoinHandle<()>> = https::create_https_server(
            config.clone(),
            jws_signer,
            status_ref,
            server_write_ref,
            server_read_ref,
            broadcast_tx.clone(),
            maybe_tls_acceptor,
            &tls_acceptor_reload_tx,
        )
        .await
        .inspect_err(|err| {
            error!(?err, "Failed to start HTTPS server");
        })?;

        if config.role != ServerRole::WriteReplicaNoUI {
            admin_info!("ready to rock! 🪨  UI available at: {}", config.origin);
        } else {
            admin_info!("ready to rock! 🪨 ");
        }
        Some(handles)
    };

    // If we are NOT in integration test mode, start the admin socket now
    let maybe_admin_sock_handle = if config.integration_test_config.is_none() {
        let broadcast_tx_ = broadcast_tx.clone();

        let admin_handle = AdminActor::create_admin_sock(
            config.adminbindpath.as_str(),
            server_write_ref,
            server_read_ref,
            broadcast_tx_,
            maybe_repl_ctrl_tx,
        )
        .await?;

        Some(admin_handle)
    } else {
        None
    };

    // Collect every spawned task with its name so shutdown can report which
    // tasks failed to join.
    let mut handles: Vec<(TaskName, task::JoinHandle<()>)> = vec![
        (TaskName::IntervalActor, interval_handle),
        (TaskName::DelayedActionActor, delayed_handle),
        (TaskName::AuditdActor, auditd_handle),
        (TaskName::TlsAcceptorReload, tls_acceptor_reload_handle),
        (TaskName::MigrationReload, migration_reload_handle),
    ];

    if let Some(backup_handle) = maybe_backup_handle {
        handles.push((TaskName::BackupActor, backup_handle))
    }

    if let Some(admin_sock_handle) = maybe_admin_sock_handle {
        handles.push((TaskName::AdminSocket, admin_sock_handle))
    }

    if let Some(ldap_handles) = maybe_ldap_acceptor_handles {
        for ldap_handle in ldap_handles {
            handles.push((TaskName::LdapActor, ldap_handle))
        }
    }

    if let Some(http_handles) = maybe_http_acceptor_handles {
        for http_handle in http_handles {
            handles.push((TaskName::HttpsServer, http_handle))
        }
    }

    if let Some(repl_handle) = maybe_repl_handle {
        handles.push((TaskName::Replication, repl_handle))
    }

    // clean_shutdown starts false; CoreHandle::shutdown() flips it after all
    // handles are joined.
    Ok(CoreHandle {
        clean_shutdown: false,
        tx: broadcast_tx,
        handles,
    })
}