kanidmd_core/lib.rs

//! These contain the server "cores". These are able to start up the server
//! (bootstrap) to a running state and then execute tasks. This is where modules
//! are logically ordered based on their dependencies for execution. Some of these
//! are task-only, i.e. reindexing, and some of these launch the server into a
//! fully operational state (https, ldap, etc.).
//!
//! Generally, this is the "entry point" where the server begins to run, and
//! the entry point for all client traffic, which is then directed to the
//! various `actors`.

#![deny(warnings)]
#![warn(unused_extern_crates)]
#![warn(unused_imports)]
#![deny(clippy::todo)]
#![deny(clippy::unimplemented)]
#![deny(clippy::unwrap_used)]
#![deny(clippy::expect_used)]
#![deny(clippy::panic)]
#![deny(clippy::unreachable)]
#![deny(clippy::await_holding_lock)]
#![deny(clippy::needless_pass_by_value)]
#![deny(clippy::trivially_copy_pass_by_ref)]

#[macro_use]
extern crate tracing;
#[macro_use]
extern crate kanidmd_lib;

mod actors;
pub mod admin;
pub mod config;
mod crypto;
mod https;
mod interval;
mod ldaps;
mod repl;
mod tcp;
mod utils;

use crate::actors::{QueryServerReadV1, QueryServerWriteV1};
use crate::admin::AdminActor;
use crate::config::Configuration;
use crate::interval::IntervalActor;
use crate::utils::touch_file_or_quit;
use compact_jwt::{JwsHs256Signer, JwsSigner};
use kanidm_proto::backup::BackupCompression;
use kanidm_proto::config::ServerRole;
use kanidm_proto::internal::OperationError;
use kanidmd_lib::be::{Backend, BackendConfig, BackendTransaction};
use kanidmd_lib::idm::ldap::LdapServer;
use kanidmd_lib::prelude::*;
use kanidmd_lib::schema::Schema;
use kanidmd_lib::status::StatusActor;
use kanidmd_lib::value::CredentialType;
#[cfg(not(target_family = "windows"))]
use libc::umask;
use std::fmt::{Display, Formatter};
use std::path::Path;
use std::sync::Arc;
use tokio::sync::broadcast;
use tokio::sync::Notify;
use tokio::task;

// === internal setup helpers

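/// Build the [`Backend`] for the given configuration, deriving the index
/// metadata from the in-memory schema. Does not perform a vacuum on open.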
fn setup_backend(config: &Configuration, schema: &Schema) -> Result<Backend, OperationError> {
    setup_backend_vacuum(config, schema, false)
}

fn setup_backend_vacuum(
    config: &Configuration,
    schema: &Schema,
    vacuum: bool,
) -> Result<Backend, OperationError> {
    // Limit the scope of the schema txn.
    // let schema_txn = task::block_on(schema.write());
    let schema_txn = schema.write();
    let idxmeta = schema_txn.reload_idxmeta();

    let pool_size: u32 = config.threads as u32;

    let cfg = BackendConfig::new(
        config.db_path.as_deref(),
        pool_size,
        config.db_fs_type.unwrap_or_default(),
        config.db_arc_size,
    );

    Backend::new(cfg, idxmeta, vacuum)
}

// TODO #54: We could move most of the be/schema/qs setup and startup
// outside of this call, then pass in "what we need" in a cloneable
// form, this way we could have separate Idm vs Qs threads, and dedicated
// threads for write vs read
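/// Construct the `QueryServer` and `IdmServer` over an existing backend and
/// schema, running the initialise/migration path to `DOMAIN_TGT_LEVEL` before
/// returning the servers along with their delayed-action and audit channels.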
async fn setup_qs_idms(
    be: Backend,
    schema: Schema,
    config: &Configuration,
) -> Result<(QueryServer, IdmServer, IdmServerDelayed, IdmServerAudit), OperationError> {
    let curtime = duration_from_epoch_now();
    // Create a query_server implementation
    let query_server = QueryServer::new(be, schema, config.domain.clone(), curtime)?;

    // TODO #62: Should the IDM parts be broken out to the IdmServer?
    // What's important about this initial setup here is that it also triggers
    // the schema and acp reload, so they are now configured correctly!
    // Initialise the schema core.
    //
    // Now search for the schema itself, and validate that the system
    // in memory matches the BE on disk, and that it's syntactically correct.
    // Write it out if changes are needed.
    query_server
        .initialise_helper(curtime, DOMAIN_TGT_LEVEL)
        .await?;

    // We generate a SINGLE idms only!
    let is_integration_test = config.integration_test_config.is_some();
    let (idms, idms_delayed, idms_audit) = IdmServer::new(
        query_server.clone(),
        &config.origin,
        is_integration_test,
        curtime,
    )
    .await?;

    Ok((query_server, idms, idms_delayed, idms_audit))
}

async fn setup_qs(
    be: Backend,
    schema: Schema,
    config: &Configuration,
) -> Result<QueryServer, OperationError> {
    let curtime = duration_from_epoch_now();
    // Create a query_server implementation
    let query_server = QueryServer::new(be, schema, config.domain.clone(), curtime)?;

    // TODO #62: Should the IDM parts be broken out to the IdmServer?
    // What's important about this initial setup here is that it also triggers
    // the schema and acp reload, so they are now configured correctly!
    // Initialise the schema core.
    //
    // Now search for the schema itself, and validate that the system
    // in memory matches the BE on disk, and that it's syntactically correct.
    // Write it out if changes are needed.
    query_server
        .initialise_helper(curtime, DOMAIN_TGT_LEVEL)
        .await?;

    Ok(query_server)
}

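/// Shared setup for the dbscan_* tools: build the in-memory schema and open
/// the backend, exiting the process or returning early on failure.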
macro_rules! dbscan_setup_be {
    (
        $config:expr
    ) => {{
        let schema = match Schema::new() {
            Ok(s) => s,
            Err(e) => {
                error!("Failed to setup in memory schema: {:?}", e);
                std::process::exit(1);
            }
        };

        match setup_backend($config, &schema) {
            Ok(be) => be,
            Err(e) => {
                error!("Failed to setup BE: {:?}", e);
                return;
            }
        }
    }};
}

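/// List the names of all indexes in the database, sorted, one per line.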
pub fn dbscan_list_indexes_core(config: &Configuration) {
    let be = dbscan_setup_be!(config);
    let mut be_rotxn = match be.read() {
        Ok(txn) => txn,
        Err(err) => {
            error!(?err, "Unable to proceed, backend read transaction failure.");
            return;
        }
    };

    match be_rotxn.list_indexes() {
        Ok(mut idx_list) => {
            idx_list.sort_unstable();
            idx_list.iter().for_each(|idx_name| {
                println!("{idx_name}");
            })
        }
        Err(e) => {
            error!("Failed to retrieve index list: {:?}", e);
        }
    };
}

pub fn dbscan_list_id2entry_core(config: &Configuration) {
    let be = dbscan_setup_be!(config);
    let mut be_rotxn = match be.read() {
        Ok(txn) => txn,
        Err(err) => {
            error!(?err, "Unable to proceed, backend read transaction failure.");
            return;
        }
    };

    match be_rotxn.list_id2entry() {
        Ok(mut id_list) => {
            id_list.sort_unstable_by_key(|k| k.0);
            id_list.iter().for_each(|(id, value)| {
                println!("{id:>8}: {value}");
            })
        }
        Err(e) => {
            error!("Failed to retrieve id2entry list: {:?}", e);
        }
    };
}

pub fn dbscan_list_index_analysis_core(config: &Configuration) {
    let _be = dbscan_setup_be!(config);
    // TBD after the slopes merge.
}

pub fn dbscan_list_index_core(config: &Configuration, index_name: &str) {
    let be = dbscan_setup_be!(config);
    let mut be_rotxn = match be.read() {
        Ok(txn) => txn,
        Err(err) => {
            error!(?err, "Unable to proceed, backend read transaction failure.");
            return;
        }
    };

    match be_rotxn.list_index_content(index_name) {
        Ok(mut idx_list) => {
            idx_list.sort_unstable_by(|a, b| a.0.cmp(&b.0));
            idx_list.iter().for_each(|(key, value)| {
                println!("{key:>50}: {value:?}");
            })
        }
        Err(e) => {
            error!("Failed to retrieve index list: {:?}", e);
        }
    };
}

pub fn dbscan_get_id2entry_core(config: &Configuration, id: u64) {
    let be = dbscan_setup_be!(config);
    let mut be_rotxn = match be.read() {
        Ok(txn) => txn,
        Err(err) => {
            error!(?err, "Unable to proceed, backend read transaction failure.");
            return;
        }
    };

    match be_rotxn.get_id2entry(id) {
        Ok((id, value)) => println!("{id:>8}: {value}"),
        Err(e) => {
            error!("Failed to retrieve id2entry value: {:?}", e);
        }
    };
}

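/// Quarantine the entry with the given id2entry id and commit the change.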
pub fn dbscan_quarantine_id2entry_core(config: &Configuration, id: u64) {
    let be = dbscan_setup_be!(config);
    let mut be_wrtxn = match be.write() {
        Ok(txn) => txn,
        Err(err) => {
            error!(
                ?err,
                "Unable to proceed, backend write transaction failure."
            );
            return;
        }
    };

    match be_wrtxn
        .quarantine_entry(id)
        .and_then(|_| be_wrtxn.commit())
    {
        Ok(()) => {
            println!("quarantined - {id:>8}")
        }
        Err(e) => {
            error!("Failed to quarantine id2entry value: {:?}", e);
        }
    };
}

pub fn dbscan_list_quarantined_core(config: &Configuration) {
    let be = dbscan_setup_be!(config);
    let mut be_rotxn = match be.read() {
        Ok(txn) => txn,
        Err(err) => {
            error!(?err, "Unable to proceed, backend read transaction failure.");
            return;
        }
    };

    match be_rotxn.list_quarantined() {
        Ok(mut id_list) => {
            id_list.sort_unstable_by_key(|k| k.0);
            id_list.iter().for_each(|(id, value)| {
                println!("{id:>8}: {value}");
            })
        }
        Err(e) => {
            error!("Failed to retrieve id2entry list: {:?}", e);
        }
    };
}

pub fn dbscan_restore_quarantined_core(config: &Configuration, id: u64) {
    let be = dbscan_setup_be!(config);
    let mut be_wrtxn = match be.write() {
        Ok(txn) => txn,
        Err(err) => {
            error!(
                ?err,
                "Unable to proceed, backend write transaction failure."
            );
            return;
        }
    };

    match be_wrtxn
        .restore_quarantined(id)
        .and_then(|_| be_wrtxn.commit())
    {
        Ok(()) => {
            println!("restored - {id:>8}")
        }
        Err(e) => {
            error!("Failed to restore quarantined id2entry value: {:?}", e);
        }
    };
}

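/// Back up the database to `dst_path` from a read transaction, using the
/// compression configured for online backups (or the default when none is
/// configured). Exits the process if the backup fails.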
pub fn backup_server_core(config: &Configuration, dst_path: &Path) {
    let schema = match Schema::new() {
        Ok(s) => s,
        Err(e) => {
            error!("Failed to setup in memory schema: {:?}", e);
            std::process::exit(1);
        }
    };

    let be = match setup_backend(config, &schema) {
        Ok(be) => be,
        Err(e) => {
            error!("Failed to setup BE: {:?}", e);
            return;
        }
    };

    let mut be_ro_txn = match be.read() {
        Ok(txn) => txn,
        Err(err) => {
            error!(?err, "Unable to proceed, backend read transaction failure.");
            return;
        }
    };

    let compression = match config.online_backup.as_ref() {
        Some(backup_config) => backup_config.compression,
        None => BackupCompression::default(),
    };

    match be_ro_txn.backup(dst_path, compression) {
        Ok(_) => info!("Backup success!"),
        Err(e) => {
            error!("Backup failed: {:?}", e);
            std::process::exit(1);
        }
    };
    // Let the txn abort, even on success.
}

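/// Restore the database from the archive at `dst_path`, then reindex it.
/// Exits the process if the restore or the reindex fails.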
pub async fn restore_server_core(config: &Configuration, dst_path: &Path) {
    // If it's an in memory database, we don't need to touch anything
    if let Some(db_path) = config.db_path.as_ref() {
        touch_file_or_quit(db_path);
    }

    // First, we provide the in-memory schema so that core attrs are indexed correctly.
    let schema = match Schema::new() {
        Ok(s) => s,
        Err(e) => {
            error!("Failed to setup in memory schema: {:?}", e);
            std::process::exit(1);
        }
    };

    let be = match setup_backend(config, &schema) {
        Ok(be) => be,
        Err(e) => {
            error!("Failed to setup backend: {:?}", e);
            return;
        }
    };

    let mut be_wr_txn = match be.write() {
        Ok(txn) => txn,
        Err(err) => {
            error!(
                ?err,
                "Unable to proceed, backend write transaction failure."
            );
            return;
        }
    };
    let r = be_wr_txn.restore(dst_path).and_then(|_| be_wr_txn.commit());

    if r.is_err() {
        error!("Failed to restore database: {:?}", r);
        std::process::exit(1);
    }
    info!("Database loaded successfully");

    reindex_inner(be, schema, config).await;

    info!("✅ Restore Success!");
}

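/// Rebuild all indexes: phase 1 reindexes the core schema attributes directly
/// on the backend, then phase 2 brings up a minimal query server and reindexes
/// again with the full schema loaded.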
pub async fn reindex_server_core(config: &Configuration) {
    info!("Start Index Phase 1 ...");
    // First, we provide the in-memory schema so that core attrs are indexed correctly.
    let schema = match Schema::new() {
        Ok(s) => s,
        Err(e) => {
            error!("Failed to setup in memory schema: {:?}", e);
            std::process::exit(1);
        }
    };

    let be = match setup_backend(config, &schema) {
        Ok(be) => be,
        Err(e) => {
            error!("Failed to setup BE: {:?}", e);
            return;
        }
    };

    reindex_inner(be, schema, config).await;

    info!("✅ Reindex Success!");
}

async fn reindex_inner(be: Backend, schema: Schema, config: &Configuration) {
    // Reindex only the core schema attributes to bootstrap the process.
    let mut be_wr_txn = match be.write() {
        Ok(txn) => txn,
        Err(err) => {
            error!(
                ?err,
                "Unable to proceed, backend write transaction failure."
            );
            return;
        }
    };

    let r = be_wr_txn.reindex(true).and_then(|_| be_wr_txn.commit());

    // Now that's done, setup a minimal qs and reindex from that.
    if r.is_err() {
        error!("Failed to reindex database: {:?}", r);
        std::process::exit(1);
    }
    info!("Index Phase 1 Success!");

    info!("Attempting to init query server ...");

    let (qs, _idms, _idms_delayed, _idms_audit) = match setup_qs_idms(be, schema, config).await {
        Ok(t) => t,
        Err(e) => {
            error!("Unable to setup query server or idm server -> {:?}", e);
            return;
        }
    };
    info!("Init Query Server Success!");

    info!("Start Index Phase 2 ...");

    let Ok(mut qs_write) = qs.write(duration_from_epoch_now()).await else {
        error!("Unable to acquire write transaction");
        return;
    };
    let r = qs_write.reindex(true).and_then(|_| qs_write.commit());

    match r {
        Ok(_) => info!("Index Phase 2 Success!"),
        Err(e) => {
            error!("Reindex failed: {:?}", e);
            std::process::exit(1);
        }
    };
}

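/// Vacuum the database. The vacuum runs as part of opening the backend, so no
/// query server is started.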
pub fn vacuum_server_core(config: &Configuration) {
    let schema = match Schema::new() {
        Ok(s) => s,
        Err(e) => {
            eprintln!("Failed to setup in memory schema: {e:?}");
            std::process::exit(1);
        }
    };

    // The schema doesn't matter here. Vacuum is run as part of db open to avoid
    // locking.
    let r = setup_backend_vacuum(config, &schema, true);

    match r {
        Ok(_) => eprintln!("Vacuum Success!"),
        Err(e) => {
            eprintln!("Vacuum failed: {e:?}");
            std::process::exit(1);
        }
    };
}

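/// Rename the server's domain to the value in the configuration. Stops early
/// if the name is unchanged, and exits the process if the rename fails.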
pub async fn domain_rename_core(config: &Configuration) {
    let schema = match Schema::new() {
        Ok(s) => s,
        Err(e) => {
            eprintln!("Failed to setup in memory schema: {e:?}");
            std::process::exit(1);
        }
    };

    // Start the backend.
    let be = match setup_backend(config, &schema) {
        Ok(be) => be,
        Err(e) => {
            error!("Failed to setup BE: {:?}", e);
            return;
        }
    };

    // Setup the qs, and perform any migrations and changes we may have.
    let qs = match setup_qs(be, schema, config).await {
        Ok(t) => t,
        Err(e) => {
            error!("Unable to setup query server -> {:?}", e);
            return;
        }
    };

    let new_domain_name = config.domain.as_str();

    // make sure we're actually changing the domain name...
    match qs.read().await.map(|qs| qs.get_domain_name().to_string()) {
        Ok(old_domain_name) => {
            admin_info!(?old_domain_name, ?new_domain_name);
            if old_domain_name == new_domain_name {
                admin_info!("Domain name not changing, stopping.");
                return;
            }
            admin_debug!(
                "Domain name is changing from {:?} to {:?}",
                old_domain_name,
                new_domain_name
            );
        }
        Err(e) => {
            admin_error!("Failed to query domain name, quitting! -> {:?}", e);
            return;
        }
    }

    let Ok(mut qs_write) = qs.write(duration_from_epoch_now()).await else {
        error!("Unable to acquire write transaction");
        return;
    };
    let r = qs_write
        .danger_domain_rename(new_domain_name)
        .and_then(|_| qs_write.commit());

    match r {
        Ok(_) => info!("Domain Rename Success!"),
        Err(e) => {
            error!("Domain Rename Failed - Rollback has occurred: {:?}", e);
            std::process::exit(1);
        }
    };
}

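/// Run the query server verification checks against the database, reporting
/// any failures and exiting non-zero if any are found.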
pub async fn verify_server_core(config: &Configuration) {
    let curtime = duration_from_epoch_now();
    // setup the qs - without initialise!
    let schema_mem = match Schema::new() {
        Ok(sc) => sc,
        Err(e) => {
            error!("Failed to setup in memory schema: {:?}", e);
            return;
        }
    };
    // Setup the be
    let be = match setup_backend(config, &schema_mem) {
        Ok(be) => be,
        Err(e) => {
            error!("Failed to setup BE: {:?}", e);
            return;
        }
    };

    let server = match QueryServer::new(be, schema_mem, config.domain.clone(), curtime) {
        Ok(qs) => qs,
        Err(err) => {
            error!(?err, "Failed to setup query server");
            return;
        }
    };

    // Run verifications.
    let r = server.verify().await;

    if r.is_empty() {
        eprintln!("Verification passed!");
        std::process::exit(0);
    } else {
        for er in r {
            error!("{:?}", er);
        }
        std::process::exit(1);
    }

    // Now add IDM server verifications?
}

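/// Generate a self-signed CA (if required) and a server certificate for the
/// configured origin, unless the TLS key and chain already exist.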
pub fn cert_generate_core(config: &Configuration) {
    // Get the cert root

    let (tls_key_path, tls_chain_path) = match &config.tls_config {
        Some(tls_config) => (tls_config.key.as_path(), tls_config.chain.as_path()),
        None => {
            error!("Unable to find TLS configuration");
            std::process::exit(1);
        }
    };

    if tls_key_path.exists() && tls_chain_path.exists() {
        info!(
            "TLS key and chain already exist - remove them first if you intend to regenerate these"
        );
        return;
    }

    let origin_domain = match config.origin.domain() {
        Some(val) => val,
        None => {
            error!("origin does not contain a valid domain");
            std::process::exit(1);
        }
    };

    let cert_root = match tls_key_path.parent() {
        Some(parent) => parent,
        None => {
            error!("Unable to find parent directory of {:?}", tls_key_path);
            std::process::exit(1);
        }
    };

    let ca_cert = cert_root.join("ca.pem");
    let ca_key = cert_root.join("cakey.pem");
    let tls_cert_path = cert_root.join("cert.pem");

    let ca_handle = if !ca_cert.exists() || !ca_key.exists() {
        // Generate the CA again.
        let ca_handle = match crypto::build_ca(None) {
            Ok(ca_handle) => ca_handle,
            Err(e) => {
                error!(err = ?e, "Failed to build CA");
                std::process::exit(1);
            }
        };

        if crypto::write_ca(ca_key, ca_cert, &ca_handle).is_err() {
            error!("Failed to write CA");
            std::process::exit(1);
        }

        ca_handle
    } else {
        match crypto::load_ca(ca_key, ca_cert) {
            Ok(ca_handle) => ca_handle,
            Err(_) => {
                error!("Failed to load CA");
                std::process::exit(1);
            }
        }
    };

    if !tls_key_path.exists() || !tls_chain_path.exists() || !tls_cert_path.exists() {
        // Generate the cert from the ca.
        let cert_handle = match crypto::build_cert(origin_domain, &ca_handle, None, None) {
            Ok(cert_handle) => cert_handle,
            Err(e) => {
                error!(err = ?e, "Failed to build certificate");
                std::process::exit(1);
            }
        };

        if crypto::write_cert(tls_key_path, tls_chain_path, tls_cert_path, &cert_handle).is_err() {
            error!("Failed to write certificates");
            std::process::exit(1);
        }
    }
    info!("certificate generation complete");
}

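/// Actions that can be broadcast to all running core tasks. Currently the only
/// action is an orderly shutdown.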
#[derive(Clone, Debug)]
pub enum CoreAction {
    Shutdown,
}

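/// Names of the long-running tasks spawned by [`create_server_core`], used to
/// report which tasks succeeded or failed during shutdown.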
pub(crate) enum TaskName {
    AdminSocket,
    AuditdActor,
    BackupActor,
    DelayedActionActor,
    HttpsServer,
    IntervalActor,
    LdapActor,
    Replication,
    TlsAcceptorReload,
}

impl Display for TaskName {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "{}",
            match self {
                TaskName::AdminSocket => "Admin Socket",
                TaskName::AuditdActor => "Auditd Actor",
                TaskName::BackupActor => "Backup Actor",
                TaskName::DelayedActionActor => "Delayed Action Actor",
                TaskName::HttpsServer => "HTTPS Server",
                TaskName::IntervalActor => "Interval Actor",
                TaskName::LdapActor => "LDAP Acceptor Actor",
                TaskName::Replication => "Replication",
                TaskName::TlsAcceptorReload => "TlsAcceptor Reload Monitor",
            }
        )
    }
}

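/// A handle to a running server core. It allows callers to subscribe to
/// [`CoreAction`] broadcasts, request a TLS acceptor reload, and perform an
/// orderly shutdown that awaits every spawned task.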
pub struct CoreHandle {
    clean_shutdown: bool,
    tx: broadcast::Sender<CoreAction>,
    tls_acceptor_reload_notify: Arc<Notify>,
    /// This stores a name for the handle, and the handle itself so we can tell which failed/succeeded at the end.
    handles: Vec<(TaskName, task::JoinHandle<()>)>,
}

impl CoreHandle {
    pub fn subscribe(&mut self) -> broadcast::Receiver<CoreAction> {
        self.tx.subscribe()
    }

    pub async fn shutdown(&mut self) {
        if self.tx.send(CoreAction::Shutdown).is_err() {
            eprintln!("No receivers acked shutdown request. Treating as unclean.");
            return;
        }

        // Wait on the handles.
        while let Some((handle_name, handle)) = self.handles.pop() {
            if let Err(error) = handle.await {
                eprintln!("Task {handle_name} failed to finish: {error:?}");
            }
        }

        self.clean_shutdown = true;
    }

    pub async fn tls_acceptor_reload(&mut self) {
        self.tls_acceptor_reload_notify.notify_one()
    }
}

impl Drop for CoreHandle {
    fn drop(&mut self) {
        if !self.clean_shutdown {
            eprintln!("⚠️  UNCLEAN SHUTDOWN OCCURRED ⚠️ ");
        }
        // Can't enable yet until we clean up unix_int cache layer test
        // debug_assert!(self.clean_shutdown);
    }
}

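/// Start the server core: open the backend, bring up the query and IDM
/// servers, and spawn the supporting tasks (interval, delayed action, audit,
/// TLS reload, and optionally backup, LDAP, replication, HTTPS and the admin
/// socket). Returns a [`CoreHandle`] used to shut everything down. When
/// `config_test` is true the configuration is exercised but the HTTPS and
/// replication listeners are not started.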
pub async fn create_server_core(
    config: Configuration,
    config_test: bool,
) -> Result<CoreHandle, ()> {
    // Until this point, we probably want to write to the log macro fns.
    let (broadcast_tx, mut broadcast_rx) = broadcast::channel(4);

    if config.integration_test_config.is_some() {
        warn!("RUNNING IN INTEGRATION TEST MODE.");
        warn!("IF YOU SEE THIS IN PRODUCTION YOU MUST CONTACT SUPPORT IMMEDIATELY.");
    } else if config.tls_config.is_none() {
        // TLS is great! We won't run without it.
        error!("Running without TLS is not supported! Quitting!");
        return Err(());
    }

    info!(
        "Starting kanidm with {}configuration: {}",
        if config_test { "TEST " } else { "" },
        config
    );
    // Setup umask, so that everything we touch or create is secure.
    #[cfg(not(target_family = "windows"))]
    unsafe {
        umask(0o0027)
    };

    // Similarly, create a stats task which aggregates statistics from the
    // server as they come in.
    let status_ref = StatusActor::start();

    // Setup TLS (if any)
    let maybe_tls_acceptor = match crypto::setup_tls(&config.tls_config) {
        Ok(tls_acc) => tls_acc,
        Err(err) => {
            error!(?err, "Failed to configure TLS acceptor");
            return Err(());
        }
    };

    let schema = match Schema::new() {
        Ok(s) => s,
        Err(e) => {
            error!("Failed to setup in memory schema: {:?}", e);
            return Err(());
        }
    };

    // Setup the be for the qs.
    let be = match setup_backend(&config, &schema) {
        Ok(be) => be,
        Err(e) => {
            error!("Failed to setup BE -> {:?}", e);
            return Err(());
        }
    };
    // Start the IDM server.
    let (_qs, idms, mut idms_delayed, mut idms_audit) =
        match setup_qs_idms(be, schema, &config).await {
            Ok(t) => t,
            Err(e) => {
                error!("Unable to setup query server or idm server -> {:?}", e);
                return Err(());
            }
        };

    // Extract any configuration from the IDMS that we may need.
    // For now we just do this per run, but we need to extract this from the db later.
    let jws_signer = match JwsHs256Signer::generate_hs256() {
        Ok(k) => k.set_sign_option_embed_kid(false),
        Err(e) => {
            error!("Unable to setup jws signer -> {:?}", e);
            return Err(());
        }
    };

    // Any pre-start tasks here.
    if let Some(itc) = &config.integration_test_config {
        let Ok(mut idms_prox_write) = idms.proxy_write(duration_from_epoch_now()).await else {
            error!("Unable to acquire write transaction");
            return Err(());
        };
        // We need to set the admin pw.
        match idms_prox_write.recover_account(&itc.admin_user, Some(&itc.admin_password)) {
            Ok(_) => {}
            Err(e) => {
                error!(
                    "Unable to configure INTEGRATION TEST {} account -> {:?}",
                    &itc.admin_user, e
                );
                return Err(());
            }
        };
        // set the idm_admin account password
        match idms_prox_write.recover_account(&itc.idm_admin_user, Some(&itc.idm_admin_password)) {
            Ok(_) => {}
            Err(e) => {
                error!(
                    "Unable to configure INTEGRATION TEST {} account -> {:?}",
                    &itc.idm_admin_user, e
                );
                return Err(());
            }
        };

        // Add admin to idm_admins to allow tests more flexibility wrt permissions.
        // This way our default access controls can be stricter to prevent lateral
        // movement.
        match idms_prox_write.qs_write.internal_modify_uuid(
            UUID_IDM_ADMINS,
            &ModifyList::new_append(Attribute::Member, Value::Refer(UUID_ADMIN)),
        ) {
            Ok(_) => {}
            Err(e) => {
                error!(
                    "Unable to configure INTEGRATION TEST admin as member of idm_admins -> {:?}",
                    e
                );
                return Err(());
            }
        };

        match idms_prox_write.qs_write.internal_modify_uuid(
            UUID_IDM_ALL_PERSONS,
            &ModifyList::new_purge_and_set(
                Attribute::CredentialTypeMinimum,
                CredentialType::Any.into(),
            ),
        ) {
            Ok(_) => {}
            Err(e) => {
                error!(
                    "Unable to configure INTEGRATION TEST default credential policy -> {:?}",
                    e
                );
                return Err(());
            }
        };

        match idms_prox_write.commit() {
            Ok(_) => {}
            Err(e) => {
                error!("Unable to commit INTEGRATION TEST setup -> {:?}", e);
                return Err(());
            }
        }
    }

    let ldap = match LdapServer::new(&idms).await {
        Ok(l) => l,
        Err(e) => {
            error!("Unable to start LdapServer -> {:?}", e);
            return Err(());
        }
    };

    // Arc the idms and ldap
    let idms_arc = Arc::new(idms);
    let ldap_arc = Arc::new(ldap);

    // Pass it to the actor for threading.
    // Start the read query server with the given be path: future config
    let server_read_ref = QueryServerReadV1::start_static(idms_arc.clone(), ldap_arc.clone());

    // Create the server async write entry point.
    let server_write_ref = QueryServerWriteV1::start_static(idms_arc.clone());

    let delayed_handle = task::spawn(async move {
        let mut buffer = Vec::with_capacity(DELAYED_ACTION_BATCH_SIZE);
        loop {
            tokio::select! {
                added = idms_delayed.recv_many(&mut buffer) => {
                    if added == 0 {
                        // Channel has closed, stop the task.
                        break
                    }
                    server_write_ref.handle_delayedaction(&mut buffer).await;
                }
                Ok(action) = broadcast_rx.recv() => {
                    match action {
                        CoreAction::Shutdown => break,
                    }
                }
            }
        }
        info!("Stopped {}", TaskName::DelayedActionActor);
    });

    let mut broadcast_rx = broadcast_tx.subscribe();

    let auditd_handle = task::spawn(async move {
        loop {
            tokio::select! {
                Ok(action) = broadcast_rx.recv() => {
                    match action {
                        CoreAction::Shutdown => break,
                    }
                }
                audit_event = idms_audit.audit_rx().recv() => {
                    match serde_json::to_string(&audit_event) {
                        Ok(audit_event) => {
                            warn!(%audit_event);
                        }
                        Err(e) => {
                            error!(err=?e, "Unable to process audit event to json.");
                            warn!(?audit_event, json=false);
                        }
                    }

                }
            }
        }
        info!("Stopped {}", TaskName::AuditdActor);
    });

    // Setup a TLS Acceptor Reload trigger.

    let mut broadcast_rx = broadcast_tx.subscribe();
    let tls_acceptor_reload_notify = Arc::new(Notify::new());
    let tls_acceptor_reload_task_notify = tls_acceptor_reload_notify.clone();
    let tls_config = config.tls_config.clone();

    let (tls_acceptor_reload_tx, _tls_acceptor_reload_rx) = broadcast::channel(1);
    let tls_acceptor_reload_tx_c = tls_acceptor_reload_tx.clone();

    let tls_acceptor_reload_handle = task::spawn(async move {
        loop {
            tokio::select! {
                Ok(action) = broadcast_rx.recv() => {
                    match action {
                        CoreAction::Shutdown => break,
                    }
                }
                _ = tls_acceptor_reload_task_notify.notified() => {
                    let tls_acceptor = match crypto::setup_tls(&tls_config) {
                        Ok(Some(tls_acc)) => tls_acc,
                        Ok(None) => {
                            warn!("TLS not configured, ignoring reload request.");
                            continue;
                        }
                        Err(err) => {
                            error!(?err, "Failed to configure and reload TLS acceptor");
                            continue;
                        }
                    };

                    // We don't log here as the receivers will notify when they have completed
                    // the reload.
                    if tls_acceptor_reload_tx_c.send(tls_acceptor).is_err() {
                        error!("tls acceptor did not accept the reload, the server may have failed!");
                    };
                    info!("tls acceptor reload notification sent");
                }
            }
        }
        info!("Stopped {}", TaskName::TlsAcceptorReload);
    });

    // Setup timed events associated to the write thread
    let interval_handle = IntervalActor::start(server_write_ref, broadcast_tx.subscribe());
    // Setup timed events associated to the read thread
    let maybe_backup_handle = match &config.online_backup {
        Some(online_backup_config) => {
            if online_backup_config.enabled {
                let handle = IntervalActor::start_online_backup(
                    server_read_ref,
                    online_backup_config,
                    broadcast_tx.subscribe(),
                )?;
                Some(handle)
            } else {
                debug!("Backups disabled");
                None
            }
        }
        None => {
            debug!("Online backup not requested, skipping");
            None
        }
    };

    // If we have been requested to init LDAP, configure it now.
    let maybe_ldap_acceptor_handles = match &config.ldapbindaddress {
        Some(la) => {
            let opt_ldap_ssl_acceptor = maybe_tls_acceptor.clone();

            let h = ldaps::create_ldap_server(
                la,
                opt_ldap_ssl_acceptor,
                server_read_ref,
                &broadcast_tx,
                &tls_acceptor_reload_tx,
                config.ldap_client_address_info.trusted_tcp_info(),
            )
            .await?;
            Some(h)
        }
        None => {
            debug!("LDAP not requested, skipping");
            None
        }
    };

    // If we have replication configured, setup the listener with its initial replication
    // map (if any).
    let (maybe_repl_handle, maybe_repl_ctrl_tx) = match &config.repl_config {
        Some(rc) => {
            if !config_test {
                // ⚠️  only start the sockets and listeners in non-config-test modes.
                let (h, repl_ctrl_tx) =
                    repl::create_repl_server(idms_arc.clone(), rc, broadcast_tx.subscribe())
                        .await?;
                (Some(h), Some(repl_ctrl_tx))
            } else {
                (None, None)
            }
        }
        None => {
            debug!("Replication not requested, skipping");
            (None, None)
        }
    };

    let maybe_http_acceptor_handles = if config_test {
        admin_info!("This config rocks! 🪨 ");
        None
    } else {
        let handles: Vec<task::JoinHandle<()>> = https::create_https_server(
            config.clone(),
            jws_signer,
            status_ref,
            server_write_ref,
            server_read_ref,
            broadcast_tx.clone(),
            maybe_tls_acceptor,
            &tls_acceptor_reload_tx,
        )
        .await
        .inspect_err(|err| {
            error!(?err, "Failed to start HTTPS server");
        })?;

        if config.role != ServerRole::WriteReplicaNoUI {
            admin_info!("ready to rock! 🪨  UI available at: {}", config.origin);
        } else {
            admin_info!("ready to rock! 🪨 ");
        }
        Some(handles)
    };

    // If we are NOT in integration test mode, start the admin socket now
    let maybe_admin_sock_handle = if config.integration_test_config.is_none() {
        let broadcast_rx = broadcast_tx.subscribe();

        let admin_handle = AdminActor::create_admin_sock(
            config.adminbindpath.as_str(),
            server_write_ref,
            server_read_ref,
            broadcast_rx,
            maybe_repl_ctrl_tx,
        )
        .await?;

        Some(admin_handle)
    } else {
        None
    };

    let mut handles: Vec<(TaskName, task::JoinHandle<()>)> = vec![
        (TaskName::IntervalActor, interval_handle),
        (TaskName::DelayedActionActor, delayed_handle),
        (TaskName::AuditdActor, auditd_handle),
        (TaskName::TlsAcceptorReload, tls_acceptor_reload_handle),
    ];

    if let Some(backup_handle) = maybe_backup_handle {
        handles.push((TaskName::BackupActor, backup_handle))
    }

    if let Some(admin_sock_handle) = maybe_admin_sock_handle {
        handles.push((TaskName::AdminSocket, admin_sock_handle))
    }

    if let Some(ldap_handles) = maybe_ldap_acceptor_handles {
        for ldap_handle in ldap_handles {
            handles.push((TaskName::LdapActor, ldap_handle))
        }
    }

    if let Some(http_handles) = maybe_http_acceptor_handles {
        for http_handle in http_handles {
            handles.push((TaskName::HttpsServer, http_handle))
        }
    }

    if let Some(repl_handle) = maybe_repl_handle {
        handles.push((TaskName::Replication, repl_handle))
    }

    Ok(CoreHandle {
        clean_shutdown: false,
        tls_acceptor_reload_notify,
        tx: broadcast_tx,
        handles,
    })
}