kanidmd_core/
lib.rs

1//! These contain the server "cores". These are able to startup the server
2//! (bootstrap) to a running state and then execute tasks. This is where modules
//! are logically ordered based on their dependencies for execution. Some of these
4//! are task-only i.e. reindexing, and some of these launch the server into a
5//! fully operational state (https, ldap, etc).
6//!
7//! Generally, this is the "entry point" where the server begins to run, and
8//! the entry point for all client traffic which is then directed to the
9//! various `actors`.
10
11#![deny(warnings)]
12#![warn(unused_extern_crates)]
13#![warn(unused_imports)]
14#![deny(clippy::todo)]
15#![deny(clippy::unimplemented)]
16#![deny(clippy::unwrap_used)]
17#![deny(clippy::expect_used)]
18#![deny(clippy::panic)]
19#![deny(clippy::unreachable)]
20#![deny(clippy::await_holding_lock)]
21#![deny(clippy::needless_pass_by_value)]
22#![deny(clippy::trivially_copy_pass_by_ref)]
23#![deny(clippy::indexing_slicing)]
24
25#[macro_use]
26extern crate tracing;
27#[macro_use]
28extern crate kanidmd_lib;
29
30mod actors;
31pub mod admin;
32pub mod config;
33mod crypto;
34mod https;
35mod interval;
36mod ldaps;
37mod repl;
38mod tcp;
39mod utils;
40
41use crate::actors::{QueryServerReadV1, QueryServerWriteV1};
42use crate::admin::AdminActor;
43use crate::config::Configuration;
44use crate::interval::IntervalActor;
45use crate::utils::touch_file_or_quit;
46use compact_jwt::{JwsHs256Signer, JwsSigner};
47use kanidm_proto::backup::BackupCompression;
48use kanidm_proto::config::ServerRole;
49use kanidm_proto::internal::OperationError;
50use kanidmd_lib::be::{Backend, BackendConfig, BackendTransaction};
51use kanidmd_lib::idm::ldap::LdapServer;
52use kanidmd_lib::prelude::*;
53use kanidmd_lib::schema::Schema;
54use kanidmd_lib::status::StatusActor;
55use kanidmd_lib::value::CredentialType;
56#[cfg(not(target_family = "windows"))]
57use libc::umask;
58use std::fmt::{Display, Formatter};
59use std::path::Path;
60use std::sync::Arc;
61use tokio::sync::broadcast;
62use tokio::sync::Notify;
63use tokio::task;
64
65// === internal setup helpers
66
67fn setup_backend(config: &Configuration, schema: &Schema) -> Result<Backend, OperationError> {
68    setup_backend_vacuum(config, schema, false)
69}
70
71fn setup_backend_vacuum(
72    config: &Configuration,
73    schema: &Schema,
74    vacuum: bool,
75) -> Result<Backend, OperationError> {
76    // Limit the scope of the schema txn.
77    // let schema_txn = task::block_on(schema.write());
78    let schema_txn = schema.write();
79    let idxmeta = schema_txn.reload_idxmeta();
80
81    let pool_size: u32 = config.threads as u32;
82
83    let cfg = BackendConfig::new(
84        config.db_path.as_deref(),
85        pool_size,
86        config.db_fs_type.unwrap_or_default(),
87        config.db_arc_size,
88    );
89
90    Backend::new(cfg, idxmeta, vacuum)
91}
92
93// TODO #54: We could move most of the be/schema/qs setup and startup
94// outside of this call, then pass in "what we need" in a cloneable
95// form, this way we could have separate Idm vs Qs threads, and dedicated
96// threads for write vs read
97async fn setup_qs_idms(
98    be: Backend,
99    schema: Schema,
100    config: &Configuration,
101) -> Result<(QueryServer, IdmServer, IdmServerDelayed, IdmServerAudit), OperationError> {
102    let curtime = duration_from_epoch_now();
103    // Create a query_server implementation
104    let query_server = QueryServer::new(be, schema, config.domain.clone(), curtime)?;
105
106    // TODO #62: Should the IDM parts be broken out to the IdmServer?
107    // What's important about this initial setup here is that it also triggers
108    // the schema and acp reload, so they are now configured correctly!
109    // Initialise the schema core.
110    //
111    // Now search for the schema itself, and validate that the system
112    // in memory matches the BE on disk, and that it's syntactically correct.
113    // Write it out if changes are needed.
114    query_server
115        .initialise_helper(curtime, DOMAIN_TGT_LEVEL)
116        .await?;
117
118    // We generate a SINGLE idms only!
119    let is_integration_test = config.integration_test_config.is_some();
120    let (idms, idms_delayed, idms_audit) = IdmServer::new(
121        query_server.clone(),
122        &config.origin,
123        is_integration_test,
124        curtime,
125    )
126    .await?;
127
128    Ok((query_server, idms, idms_delayed, idms_audit))
129}
130
131async fn setup_qs(
132    be: Backend,
133    schema: Schema,
134    config: &Configuration,
135) -> Result<QueryServer, OperationError> {
136    let curtime = duration_from_epoch_now();
137    // Create a query_server implementation
138    let query_server = QueryServer::new(be, schema, config.domain.clone(), curtime)?;
139
140    // TODO #62: Should the IDM parts be broken out to the IdmServer?
141    // What's important about this initial setup here is that it also triggers
142    // the schema and acp reload, so they are now configured correctly!
143    // Initialise the schema core.
144    //
145    // Now search for the schema itself, and validate that the system
146    // in memory matches the BE on disk, and that it's syntactically correct.
147    // Write it out if changes are needed.
148    query_server
149        .initialise_helper(curtime, DOMAIN_TGT_LEVEL)
150        .await?;
151
152    Ok(query_server)
153}
154
/// Shared setup for the dbscan subcommands: build the in-memory schema and
/// open the backend. Exits the process if the schema cannot be built, and
/// `return`s from the *calling* function if the backend cannot be opened.
macro_rules! dbscan_setup_be {
    (
        $config:expr
    ) => {{
        // The in-memory schema provides the index metadata the backend
        // needs on open.
        let schema = match Schema::new() {
            Ok(s) => s,
            Err(e) => {
                error!("Failed to setup in memory schema: {:?}", e);
                std::process::exit(1);
            }
        };

        // NOTE: because this is a macro, `return` exits the caller here.
        match setup_backend($config, &schema) {
            Ok(be) => be,
            Err(e) => {
                error!("Failed to setup BE: {:?}", e);
                return;
            }
        }
    }};
}
176
177pub fn dbscan_list_indexes_core(config: &Configuration) {
178    let be = dbscan_setup_be!(config);
179    let mut be_rotxn = match be.read() {
180        Ok(txn) => txn,
181        Err(err) => {
182            error!(?err, "Unable to proceed, backend read transaction failure.");
183            return;
184        }
185    };
186
187    match be_rotxn.list_indexes() {
188        Ok(mut idx_list) => {
189            idx_list.sort_unstable();
190            idx_list.iter().for_each(|idx_name| {
191                println!("{idx_name}");
192            })
193        }
194        Err(e) => {
195            error!("Failed to retrieve index list: {:?}", e);
196        }
197    };
198}
199
200pub fn dbscan_list_id2entry_core(config: &Configuration) {
201    let be = dbscan_setup_be!(config);
202    let mut be_rotxn = match be.read() {
203        Ok(txn) => txn,
204        Err(err) => {
205            error!(?err, "Unable to proceed, backend read transaction failure.");
206            return;
207        }
208    };
209
210    match be_rotxn.list_id2entry() {
211        Ok(mut id_list) => {
212            id_list.sort_unstable_by_key(|k| k.0);
213            id_list.iter().for_each(|(id, value)| {
214                println!("{id:>8}: {value}");
215            })
216        }
217        Err(e) => {
218            error!("Failed to retrieve id2entry list: {:?}", e);
219        }
220    };
221}
222
/// dbscan: placeholder for index analysis reporting.
pub fn dbscan_list_index_analysis_core(config: &Configuration) {
    // Opening the backend still validates that the db is accessible, even
    // though no analysis is produced yet.
    let _be = dbscan_setup_be!(config);
    // TBD in after slopes merge.
}
227
228pub fn dbscan_list_index_core(config: &Configuration, index_name: &str) {
229    let be = dbscan_setup_be!(config);
230    let mut be_rotxn = match be.read() {
231        Ok(txn) => txn,
232        Err(err) => {
233            error!(?err, "Unable to proceed, backend read transaction failure.");
234            return;
235        }
236    };
237
238    match be_rotxn.list_index_content(index_name) {
239        Ok(mut idx_list) => {
240            idx_list.sort_unstable_by(|a, b| a.0.cmp(&b.0));
241            idx_list.iter().for_each(|(key, value)| {
242                println!("{key:>50}: {value:?}");
243            })
244        }
245        Err(e) => {
246            error!("Failed to retrieve index list: {:?}", e);
247        }
248    };
249}
250
251pub fn dbscan_get_id2entry_core(config: &Configuration, id: u64) {
252    let be = dbscan_setup_be!(config);
253    let mut be_rotxn = match be.read() {
254        Ok(txn) => txn,
255        Err(err) => {
256            error!(?err, "Unable to proceed, backend read transaction failure.");
257            return;
258        }
259    };
260
261    match be_rotxn.get_id2entry(id) {
262        Ok((id, value)) => println!("{id:>8}: {value}"),
263        Err(e) => {
264            error!("Failed to retrieve id2entry value: {:?}", e);
265        }
266    };
267}
268
269pub fn dbscan_quarantine_id2entry_core(config: &Configuration, id: u64) {
270    let be = dbscan_setup_be!(config);
271    let mut be_wrtxn = match be.write() {
272        Ok(txn) => txn,
273        Err(err) => {
274            error!(
275                ?err,
276                "Unable to proceed, backend write transaction failure."
277            );
278            return;
279        }
280    };
281
282    match be_wrtxn
283        .quarantine_entry(id)
284        .and_then(|_| be_wrtxn.commit())
285    {
286        Ok(()) => {
287            println!("quarantined - {id:>8}")
288        }
289        Err(e) => {
290            error!("Failed to quarantine id2entry value: {:?}", e);
291        }
292    };
293}
294
295pub fn dbscan_list_quarantined_core(config: &Configuration) {
296    let be = dbscan_setup_be!(config);
297    let mut be_rotxn = match be.read() {
298        Ok(txn) => txn,
299        Err(err) => {
300            error!(?err, "Unable to proceed, backend read transaction failure.");
301            return;
302        }
303    };
304
305    match be_rotxn.list_quarantined() {
306        Ok(mut id_list) => {
307            id_list.sort_unstable_by_key(|k| k.0);
308            id_list.iter().for_each(|(id, value)| {
309                println!("{id:>8}: {value}");
310            })
311        }
312        Err(e) => {
313            error!("Failed to retrieve id2entry list: {:?}", e);
314        }
315    };
316}
317
318pub fn dbscan_restore_quarantined_core(config: &Configuration, id: u64) {
319    let be = dbscan_setup_be!(config);
320    let mut be_wrtxn = match be.write() {
321        Ok(txn) => txn,
322        Err(err) => {
323            error!(
324                ?err,
325                "Unable to proceed, backend write transaction failure."
326            );
327            return;
328        }
329    };
330
331    match be_wrtxn
332        .restore_quarantined(id)
333        .and_then(|_| be_wrtxn.commit())
334    {
335        Ok(()) => {
336            println!("restored - {id:>8}")
337        }
338        Err(e) => {
339            error!("Failed to restore quarantined id2entry value: {:?}", e);
340        }
341    };
342}
343
344pub fn backup_server_core(config: &Configuration, dst_path: &Path) {
345    let schema = match Schema::new() {
346        Ok(s) => s,
347        Err(e) => {
348            error!("Failed to setup in memory schema: {:?}", e);
349            std::process::exit(1);
350        }
351    };
352
353    let be = match setup_backend(config, &schema) {
354        Ok(be) => be,
355        Err(e) => {
356            error!("Failed to setup BE: {:?}", e);
357            return;
358        }
359    };
360
361    let mut be_ro_txn = match be.read() {
362        Ok(txn) => txn,
363        Err(err) => {
364            error!(?err, "Unable to proceed, backend read transaction failure.");
365            return;
366        }
367    };
368
369    let compression = match config.online_backup.as_ref() {
370        Some(backup_config) => backup_config.compression,
371        None => BackupCompression::default(),
372    };
373
374    match be_ro_txn.backup(dst_path, compression) {
375        Ok(_) => info!("Backup success!"),
376        Err(e) => {
377            error!("Backup failed: {:?}", e);
378            std::process::exit(1);
379        }
380    };
381    // Let the txn abort, even on success.
382}
383
384pub async fn restore_server_core(config: &Configuration, dst_path: &Path) {
385    // If it's an in memory database, we don't need to touch anything
386    if let Some(db_path) = config.db_path.as_ref() {
387        touch_file_or_quit(db_path);
388    }
389
390    // First, we provide the in-memory schema so that core attrs are indexed correctly.
391    let schema = match Schema::new() {
392        Ok(s) => s,
393        Err(e) => {
394            error!("Failed to setup in memory schema: {:?}", e);
395            std::process::exit(1);
396        }
397    };
398
399    let be = match setup_backend(config, &schema) {
400        Ok(be) => be,
401        Err(e) => {
402            error!("Failed to setup backend: {:?}", e);
403            return;
404        }
405    };
406
407    let mut be_wr_txn = match be.write() {
408        Ok(txn) => txn,
409        Err(err) => {
410            error!(
411                ?err,
412                "Unable to proceed, backend write transaction failure."
413            );
414            return;
415        }
416    };
417    let r = be_wr_txn.restore(dst_path).and_then(|_| be_wr_txn.commit());
418
419    if r.is_err() {
420        error!("Failed to restore database: {:?}", r);
421        std::process::exit(1);
422    }
423    info!("Database loaded successfully");
424
425    reindex_inner(be, schema, config).await;
426
427    info!("✅ Restore Success!");
428}
429
430pub async fn reindex_server_core(config: &Configuration) {
431    info!("Start Index Phase 1 ...");
432    // First, we provide the in-memory schema so that core attrs are indexed correctly.
433    let schema = match Schema::new() {
434        Ok(s) => s,
435        Err(e) => {
436            error!("Failed to setup in memory schema: {:?}", e);
437            std::process::exit(1);
438        }
439    };
440
441    let be = match setup_backend(config, &schema) {
442        Ok(be) => be,
443        Err(e) => {
444            error!("Failed to setup BE: {:?}", e);
445            return;
446        }
447    };
448
449    reindex_inner(be, schema, config).await;
450
451    info!("✅ Reindex Success!");
452}
453
454async fn reindex_inner(be: Backend, schema: Schema, config: &Configuration) {
455    // Reindex only the core schema attributes to bootstrap the process.
456    let mut be_wr_txn = match be.write() {
457        Ok(txn) => txn,
458        Err(err) => {
459            error!(
460                ?err,
461                "Unable to proceed, backend write transaction failure."
462            );
463            return;
464        }
465    };
466
467    let r = be_wr_txn.reindex(true).and_then(|_| be_wr_txn.commit());
468
469    // Now that's done, setup a minimal qs and reindex from that.
470    if r.is_err() {
471        error!("Failed to reindex database: {:?}", r);
472        std::process::exit(1);
473    }
474    info!("Index Phase 1 Success!");
475
476    info!("Attempting to init query server ...");
477
478    let (qs, _idms, _idms_delayed, _idms_audit) = match setup_qs_idms(be, schema, config).await {
479        Ok(t) => t,
480        Err(e) => {
481            error!("Unable to setup query server or idm server -> {:?}", e);
482            return;
483        }
484    };
485    info!("Init Query Server Success!");
486
487    info!("Start Index Phase 2 ...");
488
489    let Ok(mut qs_write) = qs.write(duration_from_epoch_now()).await else {
490        error!("Unable to acquire write transaction");
491        return;
492    };
493    let r = qs_write.reindex(true).and_then(|_| qs_write.commit());
494
495    match r {
496        Ok(_) => info!("Index Phase 2 Success!"),
497        Err(e) => {
498            error!("Reindex failed: {:?}", e);
499            std::process::exit(1);
500        }
501    };
502}
503
504pub fn vacuum_server_core(config: &Configuration) {
505    let schema = match Schema::new() {
506        Ok(s) => s,
507        Err(e) => {
508            eprintln!("Failed to setup in memory schema: {e:?}");
509            std::process::exit(1);
510        }
511    };
512
513    // The schema doesn't matter here. Vacuum is run as part of db open to avoid
514    // locking.
515    let r = setup_backend_vacuum(config, &schema, true);
516
517    match r {
518        Ok(_) => eprintln!("Vacuum Success!"),
519        Err(e) => {
520            eprintln!("Vacuum failed: {e:?}");
521            std::process::exit(1);
522        }
523    };
524}
525
526pub async fn domain_rename_core(config: &Configuration) {
527    let schema = match Schema::new() {
528        Ok(s) => s,
529        Err(e) => {
530            eprintln!("Failed to setup in memory schema: {e:?}");
531            std::process::exit(1);
532        }
533    };
534
535    // Start the backend.
536    let be = match setup_backend(config, &schema) {
537        Ok(be) => be,
538        Err(e) => {
539            error!("Failed to setup BE: {:?}", e);
540            return;
541        }
542    };
543
544    // Setup the qs, and perform any migrations and changes we may have.
545    let qs = match setup_qs(be, schema, config).await {
546        Ok(t) => t,
547        Err(e) => {
548            error!("Unable to setup query server -> {:?}", e);
549            return;
550        }
551    };
552
553    let new_domain_name = config.domain.as_str();
554
555    // make sure we're actually changing the domain name...
556    match qs.read().await.map(|qs| qs.get_domain_name().to_string()) {
557        Ok(old_domain_name) => {
558            admin_info!(?old_domain_name, ?new_domain_name);
559            if old_domain_name == new_domain_name {
560                admin_info!("Domain name not changing, stopping.");
561                return;
562            }
563            admin_debug!(
564                "Domain name is changing from {:?} to {:?}",
565                old_domain_name,
566                new_domain_name
567            );
568        }
569        Err(e) => {
570            admin_error!("Failed to query domain name, quitting! -> {:?}", e);
571            return;
572        }
573    }
574
575    let Ok(mut qs_write) = qs.write(duration_from_epoch_now()).await else {
576        error!("Unable to acquire write transaction");
577        return;
578    };
579    let r = qs_write
580        .danger_domain_rename(new_domain_name)
581        .and_then(|_| qs_write.commit());
582
583    match r {
584        Ok(_) => info!("Domain Rename Success!"),
585        Err(e) => {
586            error!("Domain Rename Failed - Rollback has occurred: {:?}", e);
587            std::process::exit(1);
588        }
589    };
590}
591
592pub async fn verify_server_core(config: &Configuration) {
593    let curtime = duration_from_epoch_now();
594    // setup the qs - without initialise!
595    let schema_mem = match Schema::new() {
596        Ok(sc) => sc,
597        Err(e) => {
598            error!("Failed to setup in memory schema: {:?}", e);
599            return;
600        }
601    };
602    // Setup the be
603    let be = match setup_backend(config, &schema_mem) {
604        Ok(be) => be,
605        Err(e) => {
606            error!("Failed to setup BE: {:?}", e);
607            return;
608        }
609    };
610
611    let server = match QueryServer::new(be, schema_mem, config.domain.clone(), curtime) {
612        Ok(qs) => qs,
613        Err(err) => {
614            error!(?err, "Failed to setup query server");
615            return;
616        }
617    };
618
619    // Run verifications.
620    let r = server.verify().await;
621
622    if r.is_empty() {
623        eprintln!("Verification passed!");
624        std::process::exit(0);
625    } else {
626        for er in r {
627            error!("{:?}", er);
628        }
629        std::process::exit(1);
630    }
631
632    // Now add IDM server verifications?
633}
634
635pub fn cert_generate_core(config: &Configuration) {
636    // Get the cert root
637
638    let (tls_key_path, tls_chain_path) = match &config.tls_config {
639        Some(tls_config) => (tls_config.key.as_path(), tls_config.chain.as_path()),
640        None => {
641            error!("Unable to find TLS configuration");
642            std::process::exit(1);
643        }
644    };
645
646    if tls_key_path.exists() && tls_chain_path.exists() {
647        info!(
648            "TLS key and chain already exist - remove them first if you intend to regenerate these"
649        );
650        return;
651    }
652
653    let origin_domain = match config.origin.domain() {
654        Some(val) => val,
655        None => {
656            error!("origin does not contain a valid domain");
657            std::process::exit(1);
658        }
659    };
660
661    let cert_root = match tls_key_path.parent() {
662        Some(parent) => parent,
663        None => {
664            error!("Unable to find parent directory of {:?}", tls_key_path);
665            std::process::exit(1);
666        }
667    };
668
669    let ca_cert = cert_root.join("ca.pem");
670    let ca_key = cert_root.join("cakey.pem");
671    let tls_cert_path = cert_root.join("cert.pem");
672
673    let ca_handle = if !ca_cert.exists() || !ca_key.exists() {
674        // Generate the CA again.
675        let ca_handle = match crypto::build_ca(None) {
676            Ok(ca_handle) => ca_handle,
677            Err(e) => {
678                error!(err = ?e, "Failed to build CA");
679                std::process::exit(1);
680            }
681        };
682
683        if crypto::write_ca(ca_key, ca_cert, &ca_handle).is_err() {
684            error!("Failed to write CA");
685            std::process::exit(1);
686        }
687
688        ca_handle
689    } else {
690        match crypto::load_ca(ca_key, ca_cert) {
691            Ok(ca_handle) => ca_handle,
692            Err(_) => {
693                error!("Failed to load CA");
694                std::process::exit(1);
695            }
696        }
697    };
698
699    if !tls_key_path.exists() || !tls_chain_path.exists() || !tls_cert_path.exists() {
700        // Generate the cert from the ca.
701        let cert_handle = match crypto::build_cert(origin_domain, &ca_handle, None, None) {
702            Ok(cert_handle) => cert_handle,
703            Err(e) => {
704                error!(err = ?e, "Failed to build certificate");
705                std::process::exit(1);
706            }
707        };
708
709        if crypto::write_cert(tls_key_path, tls_chain_path, tls_cert_path, &cert_handle).is_err() {
710            error!("Failed to write certificates");
711            std::process::exit(1);
712        }
713    }
714    info!("certificate generation complete");
715}
716
/// Actions broadcast to all long-running server tasks.
#[derive(Clone, Debug)]
pub enum CoreAction {
    /// Request that the receiving task shut down cleanly.
    Shutdown,
}
721
/// Identifies each long-running task spawned by the server core, so
/// shutdown can report which tasks finished or failed.
pub(crate) enum TaskName {
    AdminSocket,
    AuditdActor,
    BackupActor,
    DelayedActionActor,
    HttpsServer,
    IntervalActor,
    LdapActor,
    Replication,
    TlsAcceptorReload,
}
733
734impl Display for TaskName {
735    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
736        write!(
737            f,
738            "{}",
739            match self {
740                TaskName::AdminSocket => "Admin Socket",
741                TaskName::AuditdActor => "Auditd Actor",
742                TaskName::BackupActor => "Backup Actor",
743                TaskName::DelayedActionActor => "Delayed Action Actor",
744                TaskName::HttpsServer => "HTTPS Server",
745                TaskName::IntervalActor => "Interval Actor",
746                TaskName::LdapActor => "LDAP Acceptor Actor",
747                TaskName::Replication => "Replication",
748                TaskName::TlsAcceptorReload => "TlsAcceptor Reload Monitor",
749            }
750        )
751    }
752}
753
/// Handle to a running server core: lets callers subscribe to the shutdown
/// broadcast, trigger a TLS acceptor reload, and await task shutdown.
pub struct CoreHandle {
    // Set once `shutdown` completes; checked in `Drop` to warn about
    // unclean termination.
    clean_shutdown: bool,
    // Broadcast channel used to deliver `CoreAction`s to all tasks.
    tx: broadcast::Sender<CoreAction>,
    // Notifier used to ask the TLS acceptor task to re-read certificates.
    tls_acceptor_reload_notify: Arc<Notify>,
    /// This stores a name for the handle, and the handle itself so we can tell which failed/succeeded at the end.
    handles: Vec<(TaskName, task::JoinHandle<()>)>,
}
761
impl CoreHandle {
    /// Subscribe to the core action broadcast (e.g. to observe shutdown).
    pub fn subscribe(&mut self) -> broadcast::Receiver<CoreAction> {
        self.tx.subscribe()
    }

    /// Broadcast a shutdown request, then wait for every registered task
    /// to finish. The shutdown is only marked clean if the broadcast was
    /// received by at least one task and all handles were awaited.
    pub async fn shutdown(&mut self) {
        if self.tx.send(CoreAction::Shutdown).is_err() {
            eprintln!("No receivers acked shutdown request. Treating as unclean.");
            return;
        }

        // Wait on the handles, most recently registered first.
        while let Some((handle_name, handle)) = self.handles.pop() {
            if let Err(error) = handle.await {
                eprintln!("Task {handle_name} failed to finish: {error:?}");
            }
        }

        self.clean_shutdown = true;
    }

    /// Wake the TLS acceptor reload task so it re-reads certificates.
    pub async fn tls_acceptor_reload(&mut self) {
        self.tls_acceptor_reload_notify.notify_one()
    }
}
787
788impl Drop for CoreHandle {
789    fn drop(&mut self) {
790        if !self.clean_shutdown {
791            eprintln!("⚠️  UNCLEAN SHUTDOWN OCCURRED ⚠️ ");
792        }
793        // Can't enable yet until we clean up unix_int cache layer test
794        // debug_assert!(self.clean_shutdown);
795    }
796}
797
798pub async fn create_server_core(
799    config: Configuration,
800    config_test: bool,
801) -> Result<CoreHandle, ()> {
802    // Until this point, we probably want to write to the log macro fns.
803    let (broadcast_tx, mut broadcast_rx) = broadcast::channel(4);
804
805    if config.integration_test_config.is_some() {
806        warn!("RUNNING IN INTEGRATION TEST MODE.");
807        warn!("IF YOU SEE THIS IN PRODUCTION YOU MUST CONTACT SUPPORT IMMEDIATELY.");
808    } else if config.tls_config.is_none() {
809        // TLS is great! We won't run without it.
810        error!("Running without TLS is not supported! Quitting!");
811        return Err(());
812    }
813
814    info!(
815        "Starting kanidm with {}configuration: {}",
816        if config_test { "TEST " } else { "" },
817        config
818    );
819    // Setup umask, so that every we touch or create is secure.
820    #[cfg(not(target_family = "windows"))]
821    unsafe {
822        umask(0o0027)
823    };
824
825    // Similar, create a stats task which aggregates statistics from the
826    // server as they come in.
827    let status_ref = StatusActor::start();
828
829    // Setup TLS (if any)
830    let maybe_tls_acceptor = match crypto::setup_tls(&config.tls_config) {
831        Ok(tls_acc) => tls_acc,
832        Err(err) => {
833            error!(?err, "Failed to configure TLS acceptor");
834            return Err(());
835        }
836    };
837
838    let schema = match Schema::new() {
839        Ok(s) => s,
840        Err(e) => {
841            error!("Failed to setup in memory schema: {:?}", e);
842            return Err(());
843        }
844    };
845
846    // Setup the be for the qs.
847    let be = match setup_backend(&config, &schema) {
848        Ok(be) => be,
849        Err(e) => {
850            error!("Failed to setup BE -> {:?}", e);
851            return Err(());
852        }
853    };
854    // Start the IDM server.
855    let (_qs, idms, mut idms_delayed, mut idms_audit) =
856        match setup_qs_idms(be, schema, &config).await {
857            Ok(t) => t,
858            Err(e) => {
859                error!("Unable to setup query server or idm server -> {:?}", e);
860                return Err(());
861            }
862        };
863
864    // Extract any configuration from the IDMS that we may need.
865    // For now we just do this per run, but we need to extract this from the db later.
866    let jws_signer = match JwsHs256Signer::generate_hs256() {
867        Ok(k) => k.set_sign_option_embed_kid(false),
868        Err(e) => {
869            error!("Unable to setup jws signer -> {:?}", e);
870            return Err(());
871        }
872    };
873
874    // Any pre-start tasks here.
875    if let Some(itc) = &config.integration_test_config {
876        let Ok(mut idms_prox_write) = idms.proxy_write(duration_from_epoch_now()).await else {
877            error!("Unable to acquire write transaction");
878            return Err(());
879        };
880        // We need to set the admin pw.
881        match idms_prox_write.recover_account(&itc.admin_user, Some(&itc.admin_password)) {
882            Ok(_) => {}
883            Err(e) => {
884                error!(
885                    "Unable to configure INTEGRATION TEST {} account -> {:?}",
886                    &itc.admin_user, e
887                );
888                return Err(());
889            }
890        };
891        // set the idm_admin account password
892        match idms_prox_write.recover_account(&itc.idm_admin_user, Some(&itc.idm_admin_password)) {
893            Ok(_) => {}
894            Err(e) => {
895                error!(
896                    "Unable to configure INTEGRATION TEST {} account -> {:?}",
897                    &itc.idm_admin_user, e
898                );
899                return Err(());
900            }
901        };
902
903        // Add admin to idm_admins to allow tests more flexibility wrt to permissions.
904        // This way our default access controls can be stricter to prevent lateral
905        // movement.
906        match idms_prox_write.qs_write.internal_modify_uuid(
907            UUID_IDM_ADMINS,
908            &ModifyList::new_append(Attribute::Member, Value::Refer(UUID_ADMIN)),
909        ) {
910            Ok(_) => {}
911            Err(e) => {
912                error!(
913                    "Unable to configure INTEGRATION TEST admin as member of idm_admins -> {:?}",
914                    e
915                );
916                return Err(());
917            }
918        };
919
920        match idms_prox_write.qs_write.internal_modify_uuid(
921            UUID_IDM_ALL_PERSONS,
922            &ModifyList::new_purge_and_set(
923                Attribute::CredentialTypeMinimum,
924                CredentialType::Any.into(),
925            ),
926        ) {
927            Ok(_) => {}
928            Err(e) => {
929                error!(
930                    "Unable to configure INTEGRATION TEST default credential policy -> {:?}",
931                    e
932                );
933                return Err(());
934            }
935        };
936
937        match idms_prox_write.commit() {
938            Ok(_) => {}
939            Err(e) => {
940                error!("Unable to commit INTEGRATION TEST setup -> {:?}", e);
941                return Err(());
942            }
943        }
944    }
945
946    let ldap = match LdapServer::new(&idms).await {
947        Ok(l) => l,
948        Err(e) => {
949            error!("Unable to start LdapServer -> {:?}", e);
950            return Err(());
951        }
952    };
953
954    // Arc the idms and ldap
955    let idms_arc = Arc::new(idms);
956    let ldap_arc = Arc::new(ldap);
957
958    // Pass it to the actor for threading.
959    // Start the read query server with the given be path: future config
960    let server_read_ref = QueryServerReadV1::start_static(idms_arc.clone(), ldap_arc.clone());
961
962    // Create the server async write entry point.
963    let server_write_ref = QueryServerWriteV1::start_static(idms_arc.clone());
964
    // Background task: drain delayed actions from the idms channel in batches
    // and hand them to the write actor, until shutdown or channel close.
    let delayed_handle = task::spawn(async move {
        let mut buffer = Vec::with_capacity(DELAYED_ACTION_BATCH_SIZE);
        loop {
            tokio::select! {
                added = idms_delayed.recv_many(&mut buffer) => {
                    if added == 0 {
                        // Channel has closed, stop the task.
                        break
                    }
                    // NOTE(review): buffer is passed as &mut — presumably the
                    // handler drains it for reuse; confirm in the actor impl.
                    server_write_ref.handle_delayedaction(&mut buffer).await;
                }
                Ok(action) = broadcast_rx.recv() => {
                    match action {
                        CoreAction::Shutdown => break,
                    }
                }
            }
        }
        info!("Stopped {}", TaskName::DelayedActionActor);
    });
985
986    let mut broadcast_rx = broadcast_tx.subscribe();
987
988    let auditd_handle = task::spawn(async move {
989        loop {
990            tokio::select! {
991                Ok(action) = broadcast_rx.recv() => {
992                    match action {
993                        CoreAction::Shutdown => break,
994                    }
995                }
996                audit_event = idms_audit.audit_rx().recv() => {
997                    match serde_json::to_string(&audit_event) {
998                        Ok(audit_event) => {
999                            warn!(%audit_event);
1000                        }
1001                        Err(e) => {
1002                            error!(err=?e, "Unable to process audit event to json.");
1003                            warn!(?audit_event, json=false);
1004                        }
1005                    }
1006
1007                }
1008            }
1009        }
1010        info!("Stopped {}", TaskName::AuditdActor);
1011    });
1012
1013    // Setup a TLS Acceptor Reload trigger.
1014
1015    let mut broadcast_rx = broadcast_tx.subscribe();
1016    let tls_acceptor_reload_notify = Arc::new(Notify::new());
1017    let tls_accepter_reload_task_notify = tls_acceptor_reload_notify.clone();
1018    let tls_config = config.tls_config.clone();
1019
1020    let (tls_acceptor_reload_tx, _tls_acceptor_reload_rx) = broadcast::channel(1);
1021    let tls_acceptor_reload_tx_c = tls_acceptor_reload_tx.clone();
1022
1023    let tls_acceptor_reload_handle = task::spawn(async move {
1024        loop {
1025            tokio::select! {
1026                Ok(action) = broadcast_rx.recv() => {
1027                    match action {
1028                        CoreAction::Shutdown => break,
1029                    }
1030                }
1031                _ = tls_accepter_reload_task_notify.notified() => {
1032                    let tls_acceptor = match crypto::setup_tls(&tls_config) {
1033                        Ok(Some(tls_acc)) => tls_acc,
1034                        Ok(None) => {
1035                            warn!("TLS not configured, ignoring reload request.");
1036                            continue;
1037                        }
1038                        Err(err) => {
1039                            error!(?err, "Failed to configure and reload TLS acceptor");
1040                            continue;
1041                        }
1042                    };
1043
1044                    // We don't log here as the receivers will notify when they have completed
1045                    // the reload.
1046                    if tls_acceptor_reload_tx_c.send(tls_acceptor).is_err() {
1047                        error!("tls acceptor did not accept the reload, the server may have failed!");
1048                    };
1049                    info!("tls acceptor reload notification sent");
1050                }
1051            }
1052        }
1053        info!("Stopped {}", TaskName::TlsAcceptorReload);
1054    });
1055
    // Setup timed events associated to the write thread
    let interval_handle = IntervalActor::start(server_write_ref, broadcast_tx.subscribe());
    // Setup timed events associated to the read thread.
    // Online backups run only when a backup config exists AND it is enabled;
    // a failure to start the backup actor aborts server startup (`?`).
    let maybe_backup_handle = match &config.online_backup {
        Some(online_backup_config) => {
            if online_backup_config.enabled {
                let handle = IntervalActor::start_online_backup(
                    server_read_ref,
                    online_backup_config,
                    broadcast_tx.subscribe(),
                )?;
                Some(handle)
            } else {
                debug!("Backups disabled");
                None
            }
        }
        None => {
            debug!("Online backup not requested, skipping");
            None
        }
    };
1078
    // If we have been requested to init LDAP, configure it now.
    let maybe_ldap_acceptor_handles = match &config.ldapbindaddress {
        Some(la) => {
            // Clone of the acceptor configured earlier — presumably None when
            // TLS is disabled; ldaps decides how to handle that.
            let opt_ldap_ssl_acceptor = maybe_tls_acceptor.clone();

            // The ldap listener also subscribes to TLS reloads via
            // tls_acceptor_reload_tx. Failure to bind aborts startup (`?`).
            let h = ldaps::create_ldap_server(
                la,
                opt_ldap_ssl_acceptor,
                server_read_ref,
                &broadcast_tx,
                &tls_acceptor_reload_tx,
                config.ldap_client_address_info.trusted_tcp_info(),
            )
            .await?;
            Some(h)
        }
        None => {
            debug!("LDAP not requested, skipping");
            None
        }
    };
1100
    // If we have replication configured, setup the listener with its initial replication
    // map (if any). Also yields a control channel used later by the admin socket.
    let (maybe_repl_handle, maybe_repl_ctrl_tx) = match &config.repl_config {
        Some(rc) => {
            if !config_test {
                // ⚠️  only start the sockets and listeners in non-config-test modes.
                let (h, repl_ctrl_tx) =
                    repl::create_repl_server(idms_arc.clone(), rc, broadcast_tx.subscribe())
                        .await?;
                (Some(h), Some(repl_ctrl_tx))
            } else {
                // config-test: validate configuration only, bind nothing.
                (None, None)
            }
        }
        None => {
            debug!("Replication not requested, skipping");
            (None, None)
        }
    };
1120
    // In config-test mode we only validate — no HTTPS listeners are started.
    let maybe_http_acceptor_handles = if config_test {
        admin_info!("This config rocks! 🪨 ");
        None
    } else {
        // Start the HTTPS server; it shares the write/read actors and the TLS
        // acceptor + reload channel. Failure here aborts startup.
        let handles: Vec<task::JoinHandle<()>> = https::create_https_server(
            config.clone(),
            jws_signer,
            status_ref,
            server_write_ref,
            server_read_ref,
            broadcast_tx.clone(),
            maybe_tls_acceptor,
            &tls_acceptor_reload_tx,
        )
        .await
        .inspect_err(|err| {
            error!(?err, "Failed to start HTTPS server");
        })?;

        // Only advertise the UI origin when this role actually serves a UI.
        if config.role != ServerRole::WriteReplicaNoUI {
            admin_info!("ready to rock! 🪨  UI available at: {}", config.origin);
        } else {
            admin_info!("ready to rock! 🪨 ");
        }
        Some(handles)
    };
1147
    // If we are NOT in integration test mode, start the admin socket now.
    // It gets both query actors plus the replication control channel (if any).
    let maybe_admin_sock_handle = if config.integration_test_config.is_none() {
        let broadcast_rx = broadcast_tx.subscribe();

        let admin_handle = AdminActor::create_admin_sock(
            config.adminbindpath.as_str(),
            server_write_ref,
            server_read_ref,
            broadcast_rx,
            maybe_repl_ctrl_tx,
        )
        .await?;

        Some(admin_handle)
    } else {
        None
    };
1165
1166    let mut handles: Vec<(TaskName, task::JoinHandle<()>)> = vec![
1167        (TaskName::IntervalActor, interval_handle),
1168        (TaskName::DelayedActionActor, delayed_handle),
1169        (TaskName::AuditdActor, auditd_handle),
1170        (TaskName::TlsAcceptorReload, tls_acceptor_reload_handle),
1171    ];
1172
1173    if let Some(backup_handle) = maybe_backup_handle {
1174        handles.push((TaskName::BackupActor, backup_handle))
1175    }
1176
1177    if let Some(admin_sock_handle) = maybe_admin_sock_handle {
1178        handles.push((TaskName::AdminSocket, admin_sock_handle))
1179    }
1180
1181    if let Some(ldap_handles) = maybe_ldap_acceptor_handles {
1182        for ldap_handle in ldap_handles {
1183            handles.push((TaskName::LdapActor, ldap_handle))
1184        }
1185    }
1186
1187    if let Some(http_handles) = maybe_http_acceptor_handles {
1188        for http_handle in http_handles {
1189            handles.push((TaskName::HttpsServer, http_handle))
1190        }
1191    }
1192
1193    if let Some(repl_handle) = maybe_repl_handle {
1194        handles.push((TaskName::Replication, repl_handle))
1195    }
1196
1197    Ok(CoreHandle {
1198        clean_shutdown: false,
1199        tls_acceptor_reload_notify,
1200        tx: broadcast_tx,
1201        handles,
1202    })
1203}