kanidmd_core/lib.rs

//! These contain the server "cores". These are able to start up the server
//! (bootstrap) to a running state and then execute tasks. This is where modules
//! are logically ordered based on their dependencies for execution. Some of these
//! are task-only, i.e. reindexing, and some of these launch the server into a
//! fully operational state (https, ldap, etc).
//!
//! Generally, this is the "entry point" where the server begins to run, and
//! the entry point for all client traffic, which is then directed to the
//! various `actors`.
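//!
//! A minimal, illustrative sketch of how a launcher might drive this module
//! (assumed usage only - the real daemon binary also handles configuration
//! parsing, logging and signal handling):
//!
//! ```ignore
//! // Boot the server into a fully operational state, then shut it down
//! // cleanly when the environment asks us to stop.
//! let mut core_handle = create_server_core(config, false).await?;
//! // ... wait for a shutdown signal ...
//! core_handle.shutdown().await;
//! ```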

#![deny(warnings)]
#![warn(unused_extern_crates)]
#![warn(unused_imports)]
#![deny(clippy::todo)]
#![deny(clippy::unimplemented)]
#![deny(clippy::unwrap_used)]
#![deny(clippy::expect_used)]
#![deny(clippy::panic)]
#![deny(clippy::unreachable)]
#![deny(clippy::await_holding_lock)]
#![deny(clippy::needless_pass_by_value)]
#![deny(clippy::trivially_copy_pass_by_ref)]

#[macro_use]
extern crate tracing;
#[macro_use]
extern crate kanidmd_lib;

mod actors;
pub mod admin;
pub mod config;
mod crypto;
mod https;
mod interval;
mod ldaps;
mod repl;
mod utils;

use crate::actors::{QueryServerReadV1, QueryServerWriteV1};
use crate::admin::AdminActor;
use crate::config::{Configuration, ServerRole};
use crate::interval::IntervalActor;
use crate::utils::touch_file_or_quit;
use compact_jwt::{JwsHs256Signer, JwsSigner};
use kanidm_proto::internal::OperationError;
use kanidmd_lib::be::{Backend, BackendConfig, BackendTransaction};
use kanidmd_lib::idm::ldap::LdapServer;
use kanidmd_lib::prelude::*;
use kanidmd_lib::schema::Schema;
use kanidmd_lib::status::StatusActor;
use kanidmd_lib::value::CredentialType;
#[cfg(not(target_family = "windows"))]
use libc::umask;
use std::fmt::{Display, Formatter};
use std::path::Path;
use std::sync::Arc;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use tokio::sync::Notify;
use tokio::task;

// === internal setup helpers

fn setup_backend(config: &Configuration, schema: &Schema) -> Result<Backend, OperationError> {
    setup_backend_vacuum(config, schema, false)
}

fn setup_backend_vacuum(
    config: &Configuration,
    schema: &Schema,
    vacuum: bool,
) -> Result<Backend, OperationError> {
    // Limit the scope of the schema txn.
    // let schema_txn = task::block_on(schema.write());
    let schema_txn = schema.write();
    let idxmeta = schema_txn.reload_idxmeta();

    let pool_size: u32 = config.threads as u32;

    let cfg = BackendConfig::new(
        config.db_path.as_deref(),
        pool_size,
        config.db_fs_type.unwrap_or_default(),
        config.db_arc_size,
    );

    Backend::new(cfg, idxmeta, vacuum)
}

// TODO #54: We could move most of the be/schema/qs setup and startup
// outside of this call, then pass in "what we need" in a cloneable
// form, this way we could have separate Idm vs Qs threads, and dedicated
// threads for write vs read
async fn setup_qs_idms(
    be: Backend,
    schema: Schema,
    config: &Configuration,
) -> Result<(QueryServer, IdmServer, IdmServerDelayed, IdmServerAudit), OperationError> {
    let curtime = duration_from_epoch_now();
    // Create a query_server implementation
    let query_server = QueryServer::new(be, schema, config.domain.clone(), curtime)?;

    // TODO #62: Should the IDM parts be broken out to the IdmServer?
    // What's important about this initial setup here is that it also triggers
    // the schema and acp reload, so they are now configured correctly!
    // Initialise the schema core.
    //
    // Now search for the schema itself, and validate that the system
    // in memory matches the BE on disk, and that it's syntactically correct.
    // Write it out if changes are needed.
    query_server
        .initialise_helper(curtime, DOMAIN_TGT_LEVEL)
        .await?;

    // We generate a SINGLE idms only!
    let is_integration_test = config.integration_test_config.is_some();
    let (idms, idms_delayed, idms_audit) = IdmServer::new(
        query_server.clone(),
        &config.origin,
        is_integration_test,
        curtime,
    )
    .await?;

    Ok((query_server, idms, idms_delayed, idms_audit))
}

async fn setup_qs(
    be: Backend,
    schema: Schema,
    config: &Configuration,
) -> Result<QueryServer, OperationError> {
    let curtime = duration_from_epoch_now();
    // Create a query_server implementation
    let query_server = QueryServer::new(be, schema, config.domain.clone(), curtime)?;

    // TODO #62: Should the IDM parts be broken out to the IdmServer?
    // What's important about this initial setup here is that it also triggers
    // the schema and acp reload, so they are now configured correctly!
    // Initialise the schema core.
    //
    // Now search for the schema itself, and validate that the system
    // in memory matches the BE on disk, and that it's syntactically correct.
    // Write it out if changes are needed.
    query_server
        .initialise_helper(curtime, DOMAIN_TGT_LEVEL)
        .await?;

    Ok(query_server)
}

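// Shared setup for the dbscan_* helpers below: build the in-memory schema and
// open the backend. On schema failure the process exits; on backend failure the
// calling function returns early.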
macro_rules! dbscan_setup_be {
    (
        $config:expr
    ) => {{
        let schema = match Schema::new() {
            Ok(s) => s,
            Err(e) => {
                error!("Failed to setup in memory schema: {:?}", e);
                std::process::exit(1);
            }
        };

        match setup_backend($config, &schema) {
            Ok(be) => be,
            Err(e) => {
                error!("Failed to setup BE: {:?}", e);
                return;
            }
        }
    }};
}

pub fn dbscan_list_indexes_core(config: &Configuration) {
    let be = dbscan_setup_be!(config);
    let mut be_rotxn = match be.read() {
        Ok(txn) => txn,
        Err(err) => {
            error!(?err, "Unable to proceed, backend read transaction failure.");
            return;
        }
    };

    match be_rotxn.list_indexes() {
        Ok(mut idx_list) => {
            idx_list.sort_unstable();
            idx_list.iter().for_each(|idx_name| {
                println!("{}", idx_name);
            })
        }
        Err(e) => {
            error!("Failed to retrieve index list: {:?}", e);
        }
    };
}

pub fn dbscan_list_id2entry_core(config: &Configuration) {
    let be = dbscan_setup_be!(config);
    let mut be_rotxn = match be.read() {
        Ok(txn) => txn,
        Err(err) => {
            error!(?err, "Unable to proceed, backend read transaction failure.");
            return;
        }
    };

    match be_rotxn.list_id2entry() {
        Ok(mut id_list) => {
            id_list.sort_unstable_by_key(|k| k.0);
            id_list.iter().for_each(|(id, value)| {
                println!("{:>8}: {}", id, value);
            })
        }
        Err(e) => {
            error!("Failed to retrieve id2entry list: {:?}", e);
        }
    };
}

pub fn dbscan_list_index_analysis_core(config: &Configuration) {
    let _be = dbscan_setup_be!(config);
    // TBD after the slopes merge.
}

pub fn dbscan_list_index_core(config: &Configuration, index_name: &str) {
    let be = dbscan_setup_be!(config);
    let mut be_rotxn = match be.read() {
        Ok(txn) => txn,
        Err(err) => {
            error!(?err, "Unable to proceed, backend read transaction failure.");
            return;
        }
    };

    match be_rotxn.list_index_content(index_name) {
        Ok(mut idx_list) => {
            idx_list.sort_unstable_by(|a, b| a.0.cmp(&b.0));
            idx_list.iter().for_each(|(key, value)| {
                println!("{:>50}: {:?}", key, value);
            })
        }
        Err(e) => {
            error!("Failed to retrieve index list: {:?}", e);
        }
    };
}

pub fn dbscan_get_id2entry_core(config: &Configuration, id: u64) {
    let be = dbscan_setup_be!(config);
    let mut be_rotxn = match be.read() {
        Ok(txn) => txn,
        Err(err) => {
            error!(?err, "Unable to proceed, backend read transaction failure.");
            return;
        }
    };

    match be_rotxn.get_id2entry(id) {
        Ok((id, value)) => println!("{:>8}: {}", id, value),
        Err(e) => {
            error!("Failed to retrieve id2entry value: {:?}", e);
        }
    };
}

pub fn dbscan_quarantine_id2entry_core(config: &Configuration, id: u64) {
    let be = dbscan_setup_be!(config);
    let mut be_wrtxn = match be.write() {
        Ok(txn) => txn,
        Err(err) => {
            error!(
                ?err,
                "Unable to proceed, backend write transaction failure."
            );
            return;
        }
    };

    match be_wrtxn
        .quarantine_entry(id)
        .and_then(|_| be_wrtxn.commit())
    {
        Ok(()) => {
            println!("quarantined - {:>8}", id)
        }
        Err(e) => {
            error!("Failed to quarantine id2entry value: {:?}", e);
        }
    };
}

pub fn dbscan_list_quarantined_core(config: &Configuration) {
    let be = dbscan_setup_be!(config);
    let mut be_rotxn = match be.read() {
        Ok(txn) => txn,
        Err(err) => {
            error!(?err, "Unable to proceed, backend read transaction failure.");
            return;
        }
    };

    match be_rotxn.list_quarantined() {
        Ok(mut id_list) => {
            id_list.sort_unstable_by_key(|k| k.0);
            id_list.iter().for_each(|(id, value)| {
                println!("{:>8}: {}", id, value);
            })
        }
        Err(e) => {
            error!("Failed to retrieve id2entry list: {:?}", e);
        }
    };
}

pub fn dbscan_restore_quarantined_core(config: &Configuration, id: u64) {
    let be = dbscan_setup_be!(config);
    let mut be_wrtxn = match be.write() {
        Ok(txn) => txn,
        Err(err) => {
            error!(
                ?err,
                "Unable to proceed, backend write transaction failure."
            );
            return;
        }
    };

    match be_wrtxn
        .restore_quarantined(id)
        .and_then(|_| be_wrtxn.commit())
    {
        Ok(()) => {
            println!("restored - {:>8}", id)
        }
        Err(e) => {
            error!("Failed to restore quarantined id2entry value: {:?}", e);
        }
    };
}

pub fn backup_server_core(config: &Configuration, dst_path: &Path) {
    let schema = match Schema::new() {
        Ok(s) => s,
        Err(e) => {
            error!("Failed to setup in memory schema: {:?}", e);
            std::process::exit(1);
        }
    };

    let be = match setup_backend(config, &schema) {
        Ok(be) => be,
        Err(e) => {
            error!("Failed to setup BE: {:?}", e);
            return;
        }
    };

    let mut be_ro_txn = match be.read() {
        Ok(txn) => txn,
        Err(err) => {
            error!(?err, "Unable to proceed, backend read transaction failure.");
            return;
        }
    };

    let r = be_ro_txn.backup(dst_path);
    match r {
        Ok(_) => info!("Backup success!"),
        Err(e) => {
            error!("Backup failed: {:?}", e);
            std::process::exit(1);
        }
    };
    // Let the txn abort, even on success.
}

pub async fn restore_server_core(config: &Configuration, dst_path: &Path) {
    // If it's an in memory database, we don't need to touch anything
    if let Some(db_path) = config.db_path.as_ref() {
        touch_file_or_quit(db_path);
    }

    // First, we provide the in-memory schema so that core attrs are indexed correctly.
    let schema = match Schema::new() {
        Ok(s) => s,
        Err(e) => {
            error!("Failed to setup in memory schema: {:?}", e);
            std::process::exit(1);
        }
    };

    let be = match setup_backend(config, &schema) {
        Ok(be) => be,
        Err(e) => {
            error!("Failed to setup backend: {:?}", e);
            return;
        }
    };

    let mut be_wr_txn = match be.write() {
        Ok(txn) => txn,
        Err(err) => {
            error!(
                ?err,
                "Unable to proceed, backend write transaction failure."
            );
            return;
        }
    };
    let r = be_wr_txn.restore(dst_path).and_then(|_| be_wr_txn.commit());

    if r.is_err() {
        error!("Failed to restore database: {:?}", r);
        std::process::exit(1);
    }
    info!("Database loaded successfully");

    reindex_inner(be, schema, config).await;

    info!("✅ Restore Success!");
}

pub async fn reindex_server_core(config: &Configuration) {
    info!("Start Index Phase 1 ...");
    // First, we provide the in-memory schema so that core attrs are indexed correctly.
    let schema = match Schema::new() {
        Ok(s) => s,
        Err(e) => {
            error!("Failed to setup in memory schema: {:?}", e);
            std::process::exit(1);
        }
    };

    let be = match setup_backend(config, &schema) {
        Ok(be) => be,
        Err(e) => {
            error!("Failed to setup BE: {:?}", e);
            return;
        }
    };

    reindex_inner(be, schema, config).await;

    info!("✅ Reindex Success!");
}

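// Reindexing runs in two phases. Phase 1 reindexes through a raw backend write
// using only the in-memory (core) schema, which is enough to make the schema
// entries themselves queryable again. Phase 2 then boots a minimal query
// server/idm server pair, so the full schema is loaded from the database, and
// reindexes once more to rebuild the remaining indexes.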
async fn reindex_inner(be: Backend, schema: Schema, config: &Configuration) {
    // Reindex only the core schema attributes to bootstrap the process.
    let mut be_wr_txn = match be.write() {
        Ok(txn) => txn,
        Err(err) => {
            error!(
                ?err,
                "Unable to proceed, backend write transaction failure."
            );
            return;
        }
    };

    let r = be_wr_txn.reindex(true).and_then(|_| be_wr_txn.commit());

    // Now that's done, setup a minimal qs and reindex from that.
    if r.is_err() {
        error!("Failed to reindex database: {:?}", r);
        std::process::exit(1);
    }
    info!("Index Phase 1 Success!");

    info!("Attempting to init query server ...");

    let (qs, _idms, _idms_delayed, _idms_audit) = match setup_qs_idms(be, schema, config).await {
        Ok(t) => t,
        Err(e) => {
            error!("Unable to setup query server or idm server -> {:?}", e);
            return;
        }
    };
    info!("Init Query Server Success!");

    info!("Start Index Phase 2 ...");

    let Ok(mut qs_write) = qs.write(duration_from_epoch_now()).await else {
        error!("Unable to acquire write transaction");
        return;
    };
    let r = qs_write.reindex(true).and_then(|_| qs_write.commit());

    match r {
        Ok(_) => info!("Index Phase 2 Success!"),
        Err(e) => {
            error!("Reindex failed: {:?}", e);
            std::process::exit(1);
        }
    };
}

pub fn vacuum_server_core(config: &Configuration) {
    let schema = match Schema::new() {
        Ok(s) => s,
        Err(e) => {
            eprintln!("Failed to setup in memory schema: {:?}", e);
            std::process::exit(1);
        }
    };

    // The schema doesn't matter here. Vacuum is run as part of db open to avoid
    // locking.
    let r = setup_backend_vacuum(config, &schema, true);

    match r {
        Ok(_) => eprintln!("Vacuum Success!"),
        Err(e) => {
            eprintln!("Vacuum failed: {:?}", e);
            std::process::exit(1);
        }
    };
}

pub async fn domain_rename_core(config: &Configuration) {
    let schema = match Schema::new() {
        Ok(s) => s,
        Err(e) => {
            eprintln!("Failed to setup in memory schema: {:?}", e);
            std::process::exit(1);
        }
    };

    // Start the backend.
    let be = match setup_backend(config, &schema) {
        Ok(be) => be,
        Err(e) => {
            error!("Failed to setup BE: {:?}", e);
            return;
        }
    };

    // Setup the qs, and perform any migrations and changes we may have.
    let qs = match setup_qs(be, schema, config).await {
        Ok(t) => t,
        Err(e) => {
            error!("Unable to setup query server -> {:?}", e);
            return;
        }
    };

    let new_domain_name = config.domain.as_str();

    // make sure we're actually changing the domain name...
    match qs.read().await.map(|qs| qs.get_domain_name().to_string()) {
        Ok(old_domain_name) => {
            admin_info!(?old_domain_name, ?new_domain_name);
            if old_domain_name == new_domain_name {
                admin_info!("Domain name not changing, stopping.");
                return;
            }
            admin_debug!(
                "Domain name is changing from {:?} to {:?}",
                old_domain_name,
                new_domain_name
            );
        }
        Err(e) => {
            admin_error!("Failed to query domain name, quitting! -> {:?}", e);
            return;
        }
    }

    let Ok(mut qs_write) = qs.write(duration_from_epoch_now()).await else {
        error!("Unable to acquire write transaction");
        return;
    };
    let r = qs_write
        .danger_domain_rename(new_domain_name)
        .and_then(|_| qs_write.commit());

    match r {
        Ok(_) => info!("Domain Rename Success!"),
        Err(e) => {
            error!("Domain Rename Failed - Rollback has occurred: {:?}", e);
            std::process::exit(1);
        }
    };
}

pub async fn verify_server_core(config: &Configuration) {
    let curtime = duration_from_epoch_now();
    // setup the qs - without initialise!
    let schema_mem = match Schema::new() {
        Ok(sc) => sc,
        Err(e) => {
            error!("Failed to setup in memory schema: {:?}", e);
            return;
        }
    };
    // Setup the be
    let be = match setup_backend(config, &schema_mem) {
        Ok(be) => be,
        Err(e) => {
            error!("Failed to setup BE: {:?}", e);
            return;
        }
    };

    let server = match QueryServer::new(be, schema_mem, config.domain.clone(), curtime) {
        Ok(qs) => qs,
        Err(err) => {
            error!(?err, "Failed to setup query server");
            return;
        }
    };

    // Run verifications.
    let r = server.verify().await;

    if r.is_empty() {
        eprintln!("Verification passed!");
        std::process::exit(0);
    } else {
        for er in r {
            error!("{:?}", er);
        }
        std::process::exit(1);
    }

    // Now add IDM server verifications?
}

pub fn cert_generate_core(config: &Configuration) {
    // Get the cert root

    let (tls_key_path, tls_chain_path) = match &config.tls_config {
        Some(tls_config) => (tls_config.key.as_path(), tls_config.chain.as_path()),
        None => {
            error!("Unable to find TLS configuration");
            std::process::exit(1);
        }
    };

    if tls_key_path.exists() && tls_chain_path.exists() {
        info!(
            "TLS key and chain already exist - remove them first if you intend to regenerate these"
        );
        return;
    }

    let origin_domain = match config.origin.domain() {
        Some(val) => val,
        None => {
            error!("origin does not contain a valid domain");
            std::process::exit(1);
        }
    };

    let cert_root = match tls_key_path.parent() {
        Some(parent) => parent,
        None => {
            error!("Unable to find parent directory of {:?}", tls_key_path);
            std::process::exit(1);
        }
    };

    let ca_cert = cert_root.join("ca.pem");
    let ca_key = cert_root.join("cakey.pem");
    let tls_cert_path = cert_root.join("cert.pem");

    let ca_handle = if !ca_cert.exists() || !ca_key.exists() {
        // Generate the CA again.
        let ca_handle = match crypto::build_ca(None) {
            Ok(ca_handle) => ca_handle,
            Err(e) => {
                error!(err = ?e, "Failed to build CA");
                std::process::exit(1);
            }
        };

        if crypto::write_ca(ca_key, ca_cert, &ca_handle).is_err() {
            error!("Failed to write CA");
            std::process::exit(1);
        }

        ca_handle
    } else {
        match crypto::load_ca(ca_key, ca_cert) {
            Ok(ca_handle) => ca_handle,
            Err(_) => {
                error!("Failed to load CA");
                std::process::exit(1);
            }
        }
    };

    if !tls_key_path.exists() || !tls_chain_path.exists() || !tls_cert_path.exists() {
        // Generate the cert from the ca.
        let cert_handle = match crypto::build_cert(origin_domain, &ca_handle, None, None) {
            Ok(cert_handle) => cert_handle,
            Err(e) => {
                error!(err = ?e, "Failed to build certificate");
                std::process::exit(1);
            }
        };

        if crypto::write_cert(tls_key_path, tls_chain_path, tls_cert_path, &cert_handle).is_err() {
            error!("Failed to write certificates");
            std::process::exit(1);
        }
    }
    info!("certificate generation complete");
}

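/// Actions that can be broadcast to every running server task (currently only
/// a clean shutdown request).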
#[derive(Clone, Debug)]
pub enum CoreAction {
    Shutdown,
}

pub(crate) enum TaskName {
    AdminSocket,
    AuditdActor,
    BackupActor,
    DelayedActionActor,
    HttpsServer,
    IntervalActor,
    LdapActor,
    Replication,
    TlsAcceptorReload,
}

impl Display for TaskName {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "{}",
            match self {
                TaskName::AdminSocket => "Admin Socket",
                TaskName::AuditdActor => "Auditd Actor",
                TaskName::BackupActor => "Backup Actor",
                TaskName::DelayedActionActor => "Delayed Action Actor",
                TaskName::HttpsServer => "HTTPS Server",
                TaskName::IntervalActor => "Interval Actor",
                TaskName::LdapActor => "LDAP Acceptor Actor",
                TaskName::Replication => "Replication",
                TaskName::TlsAcceptorReload => "TlsAcceptor Reload Monitor",
            }
        )
    }
}

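/// A handle to a running server core. It owns the shutdown broadcast channel
/// and the join handles of all spawned tasks, so a launcher can subscribe to
/// shutdown events and wait for every task to finish. A hedged usage sketch
/// (the real launcher also reacts to OS signals):
///
/// ```ignore
/// core_handle.shutdown().await; // broadcast Shutdown, then await all tasks
/// ```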
pub struct CoreHandle {
    clean_shutdown: bool,
    tx: broadcast::Sender<CoreAction>,
    tls_acceptor_reload_notify: Arc<Notify>,
    /// This stores a name for the handle, and the handle itself so we can tell which failed/succeeded at the end.
    handles: Vec<(TaskName, task::JoinHandle<()>)>,
}

impl CoreHandle {
    pub fn subscribe(&mut self) -> broadcast::Receiver<CoreAction> {
        self.tx.subscribe()
    }

    pub async fn shutdown(&mut self) {
        if self.tx.send(CoreAction::Shutdown).is_err() {
            eprintln!("No receivers acked shutdown request. Treating as unclean.");
            return;
        }

        // Wait on the handles.
        while let Some((handle_name, handle)) = self.handles.pop() {
            if let Err(error) = handle.await {
                eprintln!("Task {} failed to finish: {:?}", handle_name, error);
            }
        }

        self.clean_shutdown = true;
    }

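    /// Ask the TLS acceptor reload task to rebuild the acceptor from the
    /// configured key and chain paths, e.g. after certificates have been
    /// renewed on disk. A launcher could (illustratively) call this when it
    /// receives SIGHUP.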
    pub async fn tls_acceptor_reload(&mut self) {
        self.tls_acceptor_reload_notify.notify_one()
    }
}

impl Drop for CoreHandle {
    fn drop(&mut self) {
        if !self.clean_shutdown {
            eprintln!("⚠️  UNCLEAN SHUTDOWN OCCURRED ⚠️ ");
        }
        // Can't enable yet until we clean up unix_int cache layer test
        // debug_assert!(self.clean_shutdown);
    }
}

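/// Start the server "core": boot the backend, query server and idm server, run
/// any pre-start tasks, then spawn the long-running tasks (interval, delayed
/// action, auditd, TLS reload, and the optional backup, LDAP, HTTPS,
/// replication and admin socket handlers). When `config_test` is true the
/// HTTPS and replication listeners are not started. Returns a [`CoreHandle`]
/// that owns the spawned tasks and can shut them down.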
pub async fn create_server_core(
    config: Configuration,
    config_test: bool,
) -> Result<CoreHandle, ()> {
    // Until this point, we probably want to write to the log macro fns.
    let (broadcast_tx, mut broadcast_rx) = broadcast::channel(4);

    if config.integration_test_config.is_some() {
        warn!("RUNNING IN INTEGRATION TEST MODE.");
        warn!("IF YOU SEE THIS IN PRODUCTION YOU MUST CONTACT SUPPORT IMMEDIATELY.");
    } else if config.tls_config.is_none() {
        // TLS is great! We won't run without it.
        error!("Running without TLS is not supported! Quitting!");
        return Err(());
    }

    info!(
        "Starting kanidm with {}configuration: {}",
        if config_test { "TEST " } else { "" },
        config
    );
    // Set the umask so that everything we touch or create is secure:
    // 0o0027 removes group write and all access for others.
    #[cfg(not(target_family = "windows"))]
    unsafe {
        umask(0o0027)
    };

    // Similarly, create a stats task which aggregates statistics from the
    // server as they come in.
    let status_ref = StatusActor::start();

    // Setup TLS (if any)
    let maybe_tls_acceptor = match crypto::setup_tls(&config.tls_config) {
        Ok(tls_acc) => tls_acc,
        Err(err) => {
            error!(?err, "Failed to configure TLS acceptor");
            return Err(());
        }
    };

    let schema = match Schema::new() {
        Ok(s) => s,
        Err(e) => {
            error!("Failed to setup in memory schema: {:?}", e);
            return Err(());
        }
    };

    // Setup the be for the qs.
    let be = match setup_backend(&config, &schema) {
        Ok(be) => be,
        Err(e) => {
            error!("Failed to setup BE -> {:?}", e);
            return Err(());
        }
    };
    // Start the IDM server.
    let (_qs, idms, mut idms_delayed, mut idms_audit) =
        match setup_qs_idms(be, schema, &config).await {
            Ok(t) => t,
            Err(e) => {
                error!("Unable to setup query server or idm server -> {:?}", e);
                return Err(());
            }
        };

    // Extract any configuration from the IDMS that we may need.
    // For now we just do this per run, but we need to extract this from the db later.
    let jws_signer = match JwsHs256Signer::generate_hs256() {
        Ok(k) => k.set_sign_option_embed_kid(false),
        Err(e) => {
            error!("Unable to setup jws signer -> {:?}", e);
            return Err(());
        }
    };

    // Any pre-start tasks here.
    if let Some(itc) = &config.integration_test_config {
        let Ok(mut idms_prox_write) = idms.proxy_write(duration_from_epoch_now()).await else {
            error!("Unable to acquire write transaction");
            return Err(());
        };
        // We need to set the admin pw.
        match idms_prox_write.recover_account(&itc.admin_user, Some(&itc.admin_password)) {
            Ok(_) => {}
            Err(e) => {
                error!(
                    "Unable to configure INTEGRATION TEST {} account -> {:?}",
                    &itc.admin_user, e
                );
                return Err(());
            }
        };
        // set the idm_admin account password
        match idms_prox_write.recover_account(&itc.idm_admin_user, Some(&itc.idm_admin_password)) {
            Ok(_) => {}
            Err(e) => {
                error!(
                    "Unable to configure INTEGRATION TEST {} account -> {:?}",
                    &itc.idm_admin_user, e
                );
                return Err(());
            }
        };

        // Add admin to idm_admins to allow tests more flexibility wrt permissions.
        // This way our default access controls can be stricter to prevent lateral
        // movement.
        match idms_prox_write.qs_write.internal_modify_uuid(
            UUID_IDM_ADMINS,
            &ModifyList::new_append(Attribute::Member, Value::Refer(UUID_ADMIN)),
        ) {
            Ok(_) => {}
            Err(e) => {
                error!(
                    "Unable to configure INTEGRATION TEST admin as member of idm_admins -> {:?}",
                    e
                );
                return Err(());
            }
        };

        match idms_prox_write.qs_write.internal_modify_uuid(
            UUID_IDM_ALL_PERSONS,
            &ModifyList::new_purge_and_set(
                Attribute::CredentialTypeMinimum,
                CredentialType::Any.into(),
            ),
        ) {
            Ok(_) => {}
            Err(e) => {
                error!(
                    "Unable to configure INTEGRATION TEST default credential policy -> {:?}",
                    e
                );
                return Err(());
            }
        };

        match idms_prox_write.commit() {
            Ok(_) => {}
            Err(e) => {
                error!("Unable to commit INTEGRATION TEST setup -> {:?}", e);
                return Err(());
            }
        }
    }

    let ldap = match LdapServer::new(&idms).await {
        Ok(l) => l,
        Err(e) => {
            error!("Unable to start LdapServer -> {:?}", e);
            return Err(());
        }
    };

    // Arc the idms and ldap
    let idms_arc = Arc::new(idms);
    let ldap_arc = Arc::new(ldap);

    // Pass it to the actor for threading.
    // Start the read query server with the given be path: future config
    let server_read_ref = QueryServerReadV1::start_static(idms_arc.clone(), ldap_arc.clone());

    // Create the server async write entry point.
    let server_write_ref = QueryServerWriteV1::start_static(idms_arc.clone());

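    // Spawn the delayed action task. It drains delayed actions from the idm
    // server in batches and hands them to the write actor, stopping when the
    // channel closes or a shutdown is broadcast.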
    let delayed_handle = task::spawn(async move {
        let mut buffer = Vec::with_capacity(DELAYED_ACTION_BATCH_SIZE);
        loop {
            tokio::select! {
                added = idms_delayed.recv_many(&mut buffer) => {
                    if added == 0 {
                        // Channel has closed, stop the task.
                        break
                    }
                    server_write_ref.handle_delayedaction(&mut buffer).await;
                }
                Ok(action) = broadcast_rx.recv() => {
                    match action {
                        CoreAction::Shutdown => break,
                    }
                }
            }
        }
        info!("Stopped {}", TaskName::DelayedActionActor);
    });

    let mut broadcast_rx = broadcast_tx.subscribe();

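    // Spawn the audit event task. It serialises incoming audit events to JSON
    // and emits them at the warn log level, falling back to debug formatting
    // if serialisation fails.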
    let auditd_handle = task::spawn(async move {
        loop {
            tokio::select! {
                Ok(action) = broadcast_rx.recv() => {
                    match action {
                        CoreAction::Shutdown => break,
                    }
                }
                audit_event = idms_audit.audit_rx().recv() => {
                    match serde_json::to_string(&audit_event) {
                        Ok(audit_event) => {
                            warn!(%audit_event);
                        }
                        Err(e) => {
                            error!(err=?e, "Unable to process audit event to json.");
                            warn!(?audit_event, json=false);
                        }
                    }

                }
            }
        }
        info!("Stopped {}", TaskName::AuditdActor);
    });

    // Setup a TLS Acceptor Reload trigger.

    let mut broadcast_rx = broadcast_tx.subscribe();
    let tls_acceptor_reload_notify = Arc::new(Notify::new());
    let tls_acceptor_reload_task_notify = tls_acceptor_reload_notify.clone();
    let tls_config = config.tls_config.clone();

    let ldap_configured = config.ldapbindaddress.is_some();
    let (ldap_tls_acceptor_reload_tx, ldap_tls_acceptor_reload_rx) = mpsc::channel(1);
    let (http_tls_acceptor_reload_tx, http_tls_acceptor_reload_rx) = mpsc::channel(1);

    let tls_acceptor_reload_handle = task::spawn(async move {
        loop {
            tokio::select! {
                Ok(action) = broadcast_rx.recv() => {
                    match action {
                        CoreAction::Shutdown => break,
                    }
                }
                _ = tls_acceptor_reload_task_notify.notified() => {
                    let tls_acceptor = match crypto::setup_tls(&tls_config) {
                        Ok(Some(tls_acc)) => tls_acc,
                        Ok(None) => {
                            warn!("TLS not configured, ignoring reload request.");
                            continue;
                        }
                        Err(err) => {
                            error!(?err, "Failed to configure and reload TLS acceptor");
                            continue;
                        }
                    };

                    // We don't log here as the receivers will notify when they have completed
                    // the reload.
                    if ldap_configured &&
                        ldap_tls_acceptor_reload_tx.send(tls_acceptor.clone()).await.is_err() {
                            error!("ldap tls acceptor did not accept the reload, the server may have failed!");
                        };
                    if http_tls_acceptor_reload_tx.send(tls_acceptor.clone()).await.is_err() {
                        error!("http tls acceptor did not accept the reload, the server may have failed!");
                        break;
                    };
                }
            }
        }
        info!("Stopped {}", TaskName::TlsAcceptorReload);
    });

    // Setup timed events associated to the write thread
    let interval_handle = IntervalActor::start(server_write_ref, broadcast_tx.subscribe());
    // Setup timed events associated to the read thread
    let maybe_backup_handle = match &config.online_backup {
        Some(online_backup_config) => {
            if online_backup_config.enabled {
                let handle = IntervalActor::start_online_backup(
                    server_read_ref,
                    online_backup_config,
                    broadcast_tx.subscribe(),
                )?;
                Some(handle)
            } else {
                debug!("Backups disabled");
                None
            }
        }
        None => {
            debug!("Online backup not requested, skipping");
            None
        }
    };

    // If we have been requested to init LDAP, configure it now.
    let maybe_ldap_acceptor_handle = match &config.ldapbindaddress {
        Some(la) => {
            let opt_ldap_ssl_acceptor = maybe_tls_acceptor.clone();

            let h = ldaps::create_ldap_server(
                la.as_str(),
                opt_ldap_ssl_acceptor,
                server_read_ref,
                broadcast_tx.subscribe(),
                ldap_tls_acceptor_reload_rx,
                config.ldap_client_address_info.trusted_proxy_v2(),
            )
            .await?;
            Some(h)
        }
        None => {
            debug!("LDAP not requested, skipping");
            None
        }
    };

    // If we have replication configured, setup the listener with its initial replication
    // map (if any).
    let (maybe_repl_handle, maybe_repl_ctrl_tx) = match &config.repl_config {
        Some(rc) => {
            if !config_test {
                // ⚠️  only start the sockets and listeners in non-config-test modes.
                let (h, repl_ctrl_tx) =
                    repl::create_repl_server(idms_arc.clone(), rc, broadcast_tx.subscribe())
                        .await?;
                (Some(h), Some(repl_ctrl_tx))
            } else {
                (None, None)
            }
        }
        None => {
            debug!("Replication not requested, skipping");
            (None, None)
        }
    };

    let maybe_http_acceptor_handle = if config_test {
        admin_info!("This config rocks! 🪨 ");
        None
    } else {
        let h: task::JoinHandle<()> = match https::create_https_server(
            config.clone(),
            jws_signer,
            status_ref,
            server_write_ref,
            server_read_ref,
            broadcast_tx.clone(),
            maybe_tls_acceptor,
            http_tls_acceptor_reload_rx,
        )
        .await
        {
            Ok(h) => h,
            Err(e) => {
                error!("Failed to start HTTPS server -> {:?}", e);
                return Err(());
            }
        };
        if config.role != ServerRole::WriteReplicaNoUI {
            admin_info!("ready to rock! 🪨  UI available at: {}", config.origin);
        } else {
            admin_info!("ready to rock! 🪨 ");
        }
        Some(h)
    };

    // If we are NOT in integration test mode, start the admin socket now
    let maybe_admin_sock_handle = if config.integration_test_config.is_none() {
        let broadcast_rx = broadcast_tx.subscribe();

        let admin_handle = AdminActor::create_admin_sock(
            config.adminbindpath.as_str(),
            server_write_ref,
            server_read_ref,
            broadcast_rx,
            maybe_repl_ctrl_tx,
        )
        .await?;

        Some(admin_handle)
    } else {
        None
    };

    let mut handles: Vec<(TaskName, task::JoinHandle<()>)> = vec![
        (TaskName::IntervalActor, interval_handle),
        (TaskName::DelayedActionActor, delayed_handle),
        (TaskName::AuditdActor, auditd_handle),
        (TaskName::TlsAcceptorReload, tls_acceptor_reload_handle),
    ];

    if let Some(backup_handle) = maybe_backup_handle {
        handles.push((TaskName::BackupActor, backup_handle))
    }

    if let Some(admin_sock_handle) = maybe_admin_sock_handle {
        handles.push((TaskName::AdminSocket, admin_sock_handle))
    }

    if let Some(ldap_handle) = maybe_ldap_acceptor_handle {
        handles.push((TaskName::LdapActor, ldap_handle))
    }

    if let Some(http_handle) = maybe_http_acceptor_handle {
        handles.push((TaskName::HttpsServer, http_handle))
    }

    if let Some(repl_handle) = maybe_repl_handle {
        handles.push((TaskName::Replication, repl_handle))
    }

    Ok(CoreHandle {
        clean_shutdown: false,
        tls_acceptor_reload_notify,
        tx: broadcast_tx,
        handles,
    })
}