kanidmd_core/
lib.rs

1//! These contain the server "cores". These are able to startup the server
2//! (bootstrap) to a running state and then execute tasks. This is where modules
3//! are logically ordered based on their depenedncies for execution. Some of these
4//! are task-only i.e. reindexing, and some of these launch the server into a
5//! fully operational state (https, ldap, etc).
6//!
7//! Generally, this is the "entry point" where the server begins to run, and
8//! the entry point for all client traffic which is then directed to the
9//! various `actors`.
10
11#![deny(warnings)]
12#![warn(unused_extern_crates)]
13#![warn(unused_imports)]
14#![deny(clippy::todo)]
15#![deny(clippy::unimplemented)]
16#![deny(clippy::unwrap_used)]
17#![deny(clippy::expect_used)]
18#![deny(clippy::panic)]
19#![deny(clippy::unreachable)]
20#![deny(clippy::await_holding_lock)]
21#![deny(clippy::needless_pass_by_value)]
22#![deny(clippy::trivially_copy_pass_by_ref)]
23
24#[macro_use]
25extern crate tracing;
26#[macro_use]
27extern crate kanidmd_lib;
28
29mod actors;
30pub mod admin;
31pub mod config;
32mod crypto;
33mod https;
34mod interval;
35mod ldaps;
36mod repl;
37mod utils;
38
39use crate::actors::{QueryServerReadV1, QueryServerWriteV1};
40use crate::admin::AdminActor;
41use crate::config::{Configuration, ServerRole};
42use crate::interval::IntervalActor;
43use crate::utils::touch_file_or_quit;
44use compact_jwt::{JwsHs256Signer, JwsSigner};
45use kanidm_proto::backup::BackupCompression;
46use kanidm_proto::internal::OperationError;
47use kanidmd_lib::be::{Backend, BackendConfig, BackendTransaction};
48use kanidmd_lib::idm::ldap::LdapServer;
49use kanidmd_lib::prelude::*;
50use kanidmd_lib::schema::Schema;
51use kanidmd_lib::status::StatusActor;
52use kanidmd_lib::value::CredentialType;
53#[cfg(not(target_family = "windows"))]
54use libc::umask;
55use std::fmt::{Display, Formatter};
56use std::path::Path;
57use std::sync::Arc;
58use tokio::sync::broadcast;
59use tokio::sync::mpsc;
60use tokio::sync::Notify;
61use tokio::task;
62
63// === internal setup helpers
64
65fn setup_backend(config: &Configuration, schema: &Schema) -> Result<Backend, OperationError> {
66    setup_backend_vacuum(config, schema, false)
67}
68
69fn setup_backend_vacuum(
70    config: &Configuration,
71    schema: &Schema,
72    vacuum: bool,
73) -> Result<Backend, OperationError> {
74    // Limit the scope of the schema txn.
75    // let schema_txn = task::block_on(schema.write());
76    let schema_txn = schema.write();
77    let idxmeta = schema_txn.reload_idxmeta();
78
79    let pool_size: u32 = config.threads as u32;
80
81    let cfg = BackendConfig::new(
82        config.db_path.as_deref(),
83        pool_size,
84        config.db_fs_type.unwrap_or_default(),
85        config.db_arc_size,
86    );
87
88    Backend::new(cfg, idxmeta, vacuum)
89}
90
91// TODO #54: We could move most of the be/schema/qs setup and startup
92// outside of this call, then pass in "what we need" in a cloneable
93// form, this way we could have separate Idm vs Qs threads, and dedicated
94// threads for write vs read
95async fn setup_qs_idms(
96    be: Backend,
97    schema: Schema,
98    config: &Configuration,
99) -> Result<(QueryServer, IdmServer, IdmServerDelayed, IdmServerAudit), OperationError> {
100    let curtime = duration_from_epoch_now();
101    // Create a query_server implementation
102    let query_server = QueryServer::new(be, schema, config.domain.clone(), curtime)?;
103
104    // TODO #62: Should the IDM parts be broken out to the IdmServer?
105    // What's important about this initial setup here is that it also triggers
106    // the schema and acp reload, so they are now configured correctly!
107    // Initialise the schema core.
108    //
109    // Now search for the schema itself, and validate that the system
110    // in memory matches the BE on disk, and that it's syntactically correct.
111    // Write it out if changes are needed.
112    query_server
113        .initialise_helper(curtime, DOMAIN_TGT_LEVEL)
114        .await?;
115
116    // We generate a SINGLE idms only!
117    let is_integration_test = config.integration_test_config.is_some();
118    let (idms, idms_delayed, idms_audit) = IdmServer::new(
119        query_server.clone(),
120        &config.origin,
121        is_integration_test,
122        curtime,
123    )
124    .await?;
125
126    Ok((query_server, idms, idms_delayed, idms_audit))
127}
128
129async fn setup_qs(
130    be: Backend,
131    schema: Schema,
132    config: &Configuration,
133) -> Result<QueryServer, OperationError> {
134    let curtime = duration_from_epoch_now();
135    // Create a query_server implementation
136    let query_server = QueryServer::new(be, schema, config.domain.clone(), curtime)?;
137
138    // TODO #62: Should the IDM parts be broken out to the IdmServer?
139    // What's important about this initial setup here is that it also triggers
140    // the schema and acp reload, so they are now configured correctly!
141    // Initialise the schema core.
142    //
143    // Now search for the schema itself, and validate that the system
144    // in memory matches the BE on disk, and that it's syntactically correct.
145    // Write it out if changes are needed.
146    query_server
147        .initialise_helper(curtime, DOMAIN_TGT_LEVEL)
148        .await?;
149
150    Ok(query_server)
151}
152
153macro_rules! dbscan_setup_be {
154    (
155        $config:expr
156    ) => {{
157        let schema = match Schema::new() {
158            Ok(s) => s,
159            Err(e) => {
160                error!("Failed to setup in memory schema: {:?}", e);
161                std::process::exit(1);
162            }
163        };
164
165        match setup_backend($config, &schema) {
166            Ok(be) => be,
167            Err(e) => {
168                error!("Failed to setup BE: {:?}", e);
169                return;
170            }
171        }
172    }};
173}
174
175pub fn dbscan_list_indexes_core(config: &Configuration) {
176    let be = dbscan_setup_be!(config);
177    let mut be_rotxn = match be.read() {
178        Ok(txn) => txn,
179        Err(err) => {
180            error!(?err, "Unable to proceed, backend read transaction failure.");
181            return;
182        }
183    };
184
185    match be_rotxn.list_indexes() {
186        Ok(mut idx_list) => {
187            idx_list.sort_unstable();
188            idx_list.iter().for_each(|idx_name| {
189                println!("{idx_name}");
190            })
191        }
192        Err(e) => {
193            error!("Failed to retrieve index list: {:?}", e);
194        }
195    };
196}
197
198pub fn dbscan_list_id2entry_core(config: &Configuration) {
199    let be = dbscan_setup_be!(config);
200    let mut be_rotxn = match be.read() {
201        Ok(txn) => txn,
202        Err(err) => {
203            error!(?err, "Unable to proceed, backend read transaction failure.");
204            return;
205        }
206    };
207
208    match be_rotxn.list_id2entry() {
209        Ok(mut id_list) => {
210            id_list.sort_unstable_by_key(|k| k.0);
211            id_list.iter().for_each(|(id, value)| {
212                println!("{id:>8}: {value}");
213            })
214        }
215        Err(e) => {
216            error!("Failed to retrieve id2entry list: {:?}", e);
217        }
218    };
219}
220
221pub fn dbscan_list_index_analysis_core(config: &Configuration) {
222    let _be = dbscan_setup_be!(config);
223    // TBD in after slopes merge.
224}
225
226pub fn dbscan_list_index_core(config: &Configuration, index_name: &str) {
227    let be = dbscan_setup_be!(config);
228    let mut be_rotxn = match be.read() {
229        Ok(txn) => txn,
230        Err(err) => {
231            error!(?err, "Unable to proceed, backend read transaction failure.");
232            return;
233        }
234    };
235
236    match be_rotxn.list_index_content(index_name) {
237        Ok(mut idx_list) => {
238            idx_list.sort_unstable_by(|a, b| a.0.cmp(&b.0));
239            idx_list.iter().for_each(|(key, value)| {
240                println!("{key:>50}: {value:?}");
241            })
242        }
243        Err(e) => {
244            error!("Failed to retrieve index list: {:?}", e);
245        }
246    };
247}
248
249pub fn dbscan_get_id2entry_core(config: &Configuration, id: u64) {
250    let be = dbscan_setup_be!(config);
251    let mut be_rotxn = match be.read() {
252        Ok(txn) => txn,
253        Err(err) => {
254            error!(?err, "Unable to proceed, backend read transaction failure.");
255            return;
256        }
257    };
258
259    match be_rotxn.get_id2entry(id) {
260        Ok((id, value)) => println!("{id:>8}: {value}"),
261        Err(e) => {
262            error!("Failed to retrieve id2entry value: {:?}", e);
263        }
264    };
265}
266
267pub fn dbscan_quarantine_id2entry_core(config: &Configuration, id: u64) {
268    let be = dbscan_setup_be!(config);
269    let mut be_wrtxn = match be.write() {
270        Ok(txn) => txn,
271        Err(err) => {
272            error!(
273                ?err,
274                "Unable to proceed, backend write transaction failure."
275            );
276            return;
277        }
278    };
279
280    match be_wrtxn
281        .quarantine_entry(id)
282        .and_then(|_| be_wrtxn.commit())
283    {
284        Ok(()) => {
285            println!("quarantined - {id:>8}")
286        }
287        Err(e) => {
288            error!("Failed to quarantine id2entry value: {:?}", e);
289        }
290    };
291}
292
293pub fn dbscan_list_quarantined_core(config: &Configuration) {
294    let be = dbscan_setup_be!(config);
295    let mut be_rotxn = match be.read() {
296        Ok(txn) => txn,
297        Err(err) => {
298            error!(?err, "Unable to proceed, backend read transaction failure.");
299            return;
300        }
301    };
302
303    match be_rotxn.list_quarantined() {
304        Ok(mut id_list) => {
305            id_list.sort_unstable_by_key(|k| k.0);
306            id_list.iter().for_each(|(id, value)| {
307                println!("{id:>8}: {value}");
308            })
309        }
310        Err(e) => {
311            error!("Failed to retrieve id2entry list: {:?}", e);
312        }
313    };
314}
315
316pub fn dbscan_restore_quarantined_core(config: &Configuration, id: u64) {
317    let be = dbscan_setup_be!(config);
318    let mut be_wrtxn = match be.write() {
319        Ok(txn) => txn,
320        Err(err) => {
321            error!(
322                ?err,
323                "Unable to proceed, backend write transaction failure."
324            );
325            return;
326        }
327    };
328
329    match be_wrtxn
330        .restore_quarantined(id)
331        .and_then(|_| be_wrtxn.commit())
332    {
333        Ok(()) => {
334            println!("restored - {id:>8}")
335        }
336        Err(e) => {
337            error!("Failed to restore quarantined id2entry value: {:?}", e);
338        }
339    };
340}
341
342pub fn backup_server_core(config: &Configuration, dst_path: &Path) {
343    let schema = match Schema::new() {
344        Ok(s) => s,
345        Err(e) => {
346            error!("Failed to setup in memory schema: {:?}", e);
347            std::process::exit(1);
348        }
349    };
350
351    let be = match setup_backend(config, &schema) {
352        Ok(be) => be,
353        Err(e) => {
354            error!("Failed to setup BE: {:?}", e);
355            return;
356        }
357    };
358
359    let mut be_ro_txn = match be.read() {
360        Ok(txn) => txn,
361        Err(err) => {
362            error!(?err, "Unable to proceed, backend read transaction failure.");
363            return;
364        }
365    };
366
367    let compression = match config.online_backup.as_ref() {
368        Some(backup_config) => backup_config.compression,
369        None => BackupCompression::default(),
370    };
371
372    match be_ro_txn.backup(dst_path, compression) {
373        Ok(_) => info!("Backup success!"),
374        Err(e) => {
375            error!("Backup failed: {:?}", e);
376            std::process::exit(1);
377        }
378    };
379    // Let the txn abort, even on success.
380}
381
382pub async fn restore_server_core(config: &Configuration, dst_path: &Path) {
383    // If it's an in memory database, we don't need to touch anything
384    if let Some(db_path) = config.db_path.as_ref() {
385        touch_file_or_quit(db_path);
386    }
387
388    // First, we provide the in-memory schema so that core attrs are indexed correctly.
389    let schema = match Schema::new() {
390        Ok(s) => s,
391        Err(e) => {
392            error!("Failed to setup in memory schema: {:?}", e);
393            std::process::exit(1);
394        }
395    };
396
397    let be = match setup_backend(config, &schema) {
398        Ok(be) => be,
399        Err(e) => {
400            error!("Failed to setup backend: {:?}", e);
401            return;
402        }
403    };
404
405    let mut be_wr_txn = match be.write() {
406        Ok(txn) => txn,
407        Err(err) => {
408            error!(
409                ?err,
410                "Unable to proceed, backend write transaction failure."
411            );
412            return;
413        }
414    };
415    let r = be_wr_txn.restore(dst_path).and_then(|_| be_wr_txn.commit());
416
417    if r.is_err() {
418        error!("Failed to restore database: {:?}", r);
419        std::process::exit(1);
420    }
421    info!("Database loaded successfully");
422
423    reindex_inner(be, schema, config).await;
424
425    info!("✅ Restore Success!");
426}
427
428pub async fn reindex_server_core(config: &Configuration) {
429    info!("Start Index Phase 1 ...");
430    // First, we provide the in-memory schema so that core attrs are indexed correctly.
431    let schema = match Schema::new() {
432        Ok(s) => s,
433        Err(e) => {
434            error!("Failed to setup in memory schema: {:?}", e);
435            std::process::exit(1);
436        }
437    };
438
439    let be = match setup_backend(config, &schema) {
440        Ok(be) => be,
441        Err(e) => {
442            error!("Failed to setup BE: {:?}", e);
443            return;
444        }
445    };
446
447    reindex_inner(be, schema, config).await;
448
449    info!("✅ Reindex Success!");
450}
451
452async fn reindex_inner(be: Backend, schema: Schema, config: &Configuration) {
453    // Reindex only the core schema attributes to bootstrap the process.
454    let mut be_wr_txn = match be.write() {
455        Ok(txn) => txn,
456        Err(err) => {
457            error!(
458                ?err,
459                "Unable to proceed, backend write transaction failure."
460            );
461            return;
462        }
463    };
464
465    let r = be_wr_txn.reindex(true).and_then(|_| be_wr_txn.commit());
466
467    // Now that's done, setup a minimal qs and reindex from that.
468    if r.is_err() {
469        error!("Failed to reindex database: {:?}", r);
470        std::process::exit(1);
471    }
472    info!("Index Phase 1 Success!");
473
474    info!("Attempting to init query server ...");
475
476    let (qs, _idms, _idms_delayed, _idms_audit) = match setup_qs_idms(be, schema, config).await {
477        Ok(t) => t,
478        Err(e) => {
479            error!("Unable to setup query server or idm server -> {:?}", e);
480            return;
481        }
482    };
483    info!("Init Query Server Success!");
484
485    info!("Start Index Phase 2 ...");
486
487    let Ok(mut qs_write) = qs.write(duration_from_epoch_now()).await else {
488        error!("Unable to acquire write transaction");
489        return;
490    };
491    let r = qs_write.reindex(true).and_then(|_| qs_write.commit());
492
493    match r {
494        Ok(_) => info!("Index Phase 2 Success!"),
495        Err(e) => {
496            error!("Reindex failed: {:?}", e);
497            std::process::exit(1);
498        }
499    };
500}
501
502pub fn vacuum_server_core(config: &Configuration) {
503    let schema = match Schema::new() {
504        Ok(s) => s,
505        Err(e) => {
506            eprintln!("Failed to setup in memory schema: {e:?}");
507            std::process::exit(1);
508        }
509    };
510
511    // The schema doesn't matter here. Vacuum is run as part of db open to avoid
512    // locking.
513    let r = setup_backend_vacuum(config, &schema, true);
514
515    match r {
516        Ok(_) => eprintln!("Vacuum Success!"),
517        Err(e) => {
518            eprintln!("Vacuum failed: {e:?}");
519            std::process::exit(1);
520        }
521    };
522}
523
524pub async fn domain_rename_core(config: &Configuration) {
525    let schema = match Schema::new() {
526        Ok(s) => s,
527        Err(e) => {
528            eprintln!("Failed to setup in memory schema: {e:?}");
529            std::process::exit(1);
530        }
531    };
532
533    // Start the backend.
534    let be = match setup_backend(config, &schema) {
535        Ok(be) => be,
536        Err(e) => {
537            error!("Failed to setup BE: {:?}", e);
538            return;
539        }
540    };
541
542    // Setup the qs, and perform any migrations and changes we may have.
543    let qs = match setup_qs(be, schema, config).await {
544        Ok(t) => t,
545        Err(e) => {
546            error!("Unable to setup query server -> {:?}", e);
547            return;
548        }
549    };
550
551    let new_domain_name = config.domain.as_str();
552
553    // make sure we're actually changing the domain name...
554    match qs.read().await.map(|qs| qs.get_domain_name().to_string()) {
555        Ok(old_domain_name) => {
556            admin_info!(?old_domain_name, ?new_domain_name);
557            if old_domain_name == new_domain_name {
558                admin_info!("Domain name not changing, stopping.");
559                return;
560            }
561            admin_debug!(
562                "Domain name is changing from {:?} to {:?}",
563                old_domain_name,
564                new_domain_name
565            );
566        }
567        Err(e) => {
568            admin_error!("Failed to query domain name, quitting! -> {:?}", e);
569            return;
570        }
571    }
572
573    let Ok(mut qs_write) = qs.write(duration_from_epoch_now()).await else {
574        error!("Unable to acquire write transaction");
575        return;
576    };
577    let r = qs_write
578        .danger_domain_rename(new_domain_name)
579        .and_then(|_| qs_write.commit());
580
581    match r {
582        Ok(_) => info!("Domain Rename Success!"),
583        Err(e) => {
584            error!("Domain Rename Failed - Rollback has occurred: {:?}", e);
585            std::process::exit(1);
586        }
587    };
588}
589
590pub async fn verify_server_core(config: &Configuration) {
591    let curtime = duration_from_epoch_now();
592    // setup the qs - without initialise!
593    let schema_mem = match Schema::new() {
594        Ok(sc) => sc,
595        Err(e) => {
596            error!("Failed to setup in memory schema: {:?}", e);
597            return;
598        }
599    };
600    // Setup the be
601    let be = match setup_backend(config, &schema_mem) {
602        Ok(be) => be,
603        Err(e) => {
604            error!("Failed to setup BE: {:?}", e);
605            return;
606        }
607    };
608
609    let server = match QueryServer::new(be, schema_mem, config.domain.clone(), curtime) {
610        Ok(qs) => qs,
611        Err(err) => {
612            error!(?err, "Failed to setup query server");
613            return;
614        }
615    };
616
617    // Run verifications.
618    let r = server.verify().await;
619
620    if r.is_empty() {
621        eprintln!("Verification passed!");
622        std::process::exit(0);
623    } else {
624        for er in r {
625            error!("{:?}", er);
626        }
627        std::process::exit(1);
628    }
629
630    // Now add IDM server verifications?
631}
632
633pub fn cert_generate_core(config: &Configuration) {
634    // Get the cert root
635
636    let (tls_key_path, tls_chain_path) = match &config.tls_config {
637        Some(tls_config) => (tls_config.key.as_path(), tls_config.chain.as_path()),
638        None => {
639            error!("Unable to find TLS configuration");
640            std::process::exit(1);
641        }
642    };
643
644    if tls_key_path.exists() && tls_chain_path.exists() {
645        info!(
646            "TLS key and chain already exist - remove them first if you intend to regenerate these"
647        );
648        return;
649    }
650
651    let origin_domain = match config.origin.domain() {
652        Some(val) => val,
653        None => {
654            error!("origin does not contain a valid domain");
655            std::process::exit(1);
656        }
657    };
658
659    let cert_root = match tls_key_path.parent() {
660        Some(parent) => parent,
661        None => {
662            error!("Unable to find parent directory of {:?}", tls_key_path);
663            std::process::exit(1);
664        }
665    };
666
667    let ca_cert = cert_root.join("ca.pem");
668    let ca_key = cert_root.join("cakey.pem");
669    let tls_cert_path = cert_root.join("cert.pem");
670
671    let ca_handle = if !ca_cert.exists() || !ca_key.exists() {
672        // Generate the CA again.
673        let ca_handle = match crypto::build_ca(None) {
674            Ok(ca_handle) => ca_handle,
675            Err(e) => {
676                error!(err = ?e, "Failed to build CA");
677                std::process::exit(1);
678            }
679        };
680
681        if crypto::write_ca(ca_key, ca_cert, &ca_handle).is_err() {
682            error!("Failed to write CA");
683            std::process::exit(1);
684        }
685
686        ca_handle
687    } else {
688        match crypto::load_ca(ca_key, ca_cert) {
689            Ok(ca_handle) => ca_handle,
690            Err(_) => {
691                error!("Failed to load CA");
692                std::process::exit(1);
693            }
694        }
695    };
696
697    if !tls_key_path.exists() || !tls_chain_path.exists() || !tls_cert_path.exists() {
698        // Generate the cert from the ca.
699        let cert_handle = match crypto::build_cert(origin_domain, &ca_handle, None, None) {
700            Ok(cert_handle) => cert_handle,
701            Err(e) => {
702                error!(err = ?e, "Failed to build certificate");
703                std::process::exit(1);
704            }
705        };
706
707        if crypto::write_cert(tls_key_path, tls_chain_path, tls_cert_path, &cert_handle).is_err() {
708            error!("Failed to write certificates");
709            std::process::exit(1);
710        }
711    }
712    info!("certificate generation complete");
713}
714
715#[derive(Clone, Debug)]
716pub enum CoreAction {
717    Shutdown,
718}
719
720pub(crate) enum TaskName {
721    AdminSocket,
722    AuditdActor,
723    BackupActor,
724    DelayedActionActor,
725    HttpsServer,
726    IntervalActor,
727    LdapActor,
728    Replication,
729    TlsAcceptorReload,
730}
731
732impl Display for TaskName {
733    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
734        write!(
735            f,
736            "{}",
737            match self {
738                TaskName::AdminSocket => "Admin Socket",
739                TaskName::AuditdActor => "Auditd Actor",
740                TaskName::BackupActor => "Backup Actor",
741                TaskName::DelayedActionActor => "Delayed Action Actor",
742                TaskName::HttpsServer => "HTTPS Server",
743                TaskName::IntervalActor => "Interval Actor",
744                TaskName::LdapActor => "LDAP Acceptor Actor",
745                TaskName::Replication => "Replication",
746                TaskName::TlsAcceptorReload => "TlsAcceptor Reload Monitor",
747            }
748        )
749    }
750}
751
752pub struct CoreHandle {
753    clean_shutdown: bool,
754    tx: broadcast::Sender<CoreAction>,
755    tls_acceptor_reload_notify: Arc<Notify>,
756    /// This stores a name for the handle, and the handle itself so we can tell which failed/succeeded at the end.
757    handles: Vec<(TaskName, task::JoinHandle<()>)>,
758}
759
760impl CoreHandle {
761    pub fn subscribe(&mut self) -> broadcast::Receiver<CoreAction> {
762        self.tx.subscribe()
763    }
764
765    pub async fn shutdown(&mut self) {
766        if self.tx.send(CoreAction::Shutdown).is_err() {
767            eprintln!("No receivers acked shutdown request. Treating as unclean.");
768            return;
769        }
770
771        // Wait on the handles.
772        while let Some((handle_name, handle)) = self.handles.pop() {
773            if let Err(error) = handle.await {
774                eprintln!("Task {handle_name} failed to finish: {error:?}");
775            }
776        }
777
778        self.clean_shutdown = true;
779    }
780
781    pub async fn tls_acceptor_reload(&mut self) {
782        self.tls_acceptor_reload_notify.notify_one()
783    }
784}
785
786impl Drop for CoreHandle {
787    fn drop(&mut self) {
788        if !self.clean_shutdown {
789            eprintln!("⚠️  UNCLEAN SHUTDOWN OCCURRED ⚠️ ");
790        }
791        // Can't enable yet until we clean up unix_int cache layer test
792        // debug_assert!(self.clean_shutdown);
793    }
794}
795
796pub async fn create_server_core(
797    config: Configuration,
798    config_test: bool,
799) -> Result<CoreHandle, ()> {
800    // Until this point, we probably want to write to the log macro fns.
801    let (broadcast_tx, mut broadcast_rx) = broadcast::channel(4);
802
803    if config.integration_test_config.is_some() {
804        warn!("RUNNING IN INTEGRATION TEST MODE.");
805        warn!("IF YOU SEE THIS IN PRODUCTION YOU MUST CONTACT SUPPORT IMMEDIATELY.");
806    } else if config.tls_config.is_none() {
807        // TLS is great! We won't run without it.
808        error!("Running without TLS is not supported! Quitting!");
809        return Err(());
810    }
811
812    info!(
813        "Starting kanidm with {}configuration: {}",
814        if config_test { "TEST " } else { "" },
815        config
816    );
817    // Setup umask, so that every we touch or create is secure.
818    #[cfg(not(target_family = "windows"))]
819    unsafe {
820        umask(0o0027)
821    };
822
823    // Similar, create a stats task which aggregates statistics from the
824    // server as they come in.
825    let status_ref = StatusActor::start();
826
827    // Setup TLS (if any)
828    let maybe_tls_acceptor = match crypto::setup_tls(&config.tls_config) {
829        Ok(tls_acc) => tls_acc,
830        Err(err) => {
831            error!(?err, "Failed to configure TLS acceptor");
832            return Err(());
833        }
834    };
835
836    let schema = match Schema::new() {
837        Ok(s) => s,
838        Err(e) => {
839            error!("Failed to setup in memory schema: {:?}", e);
840            return Err(());
841        }
842    };
843
844    // Setup the be for the qs.
845    let be = match setup_backend(&config, &schema) {
846        Ok(be) => be,
847        Err(e) => {
848            error!("Failed to setup BE -> {:?}", e);
849            return Err(());
850        }
851    };
852    // Start the IDM server.
853    let (_qs, idms, mut idms_delayed, mut idms_audit) =
854        match setup_qs_idms(be, schema, &config).await {
855            Ok(t) => t,
856            Err(e) => {
857                error!("Unable to setup query server or idm server -> {:?}", e);
858                return Err(());
859            }
860        };
861
862    // Extract any configuration from the IDMS that we may need.
863    // For now we just do this per run, but we need to extract this from the db later.
864    let jws_signer = match JwsHs256Signer::generate_hs256() {
865        Ok(k) => k.set_sign_option_embed_kid(false),
866        Err(e) => {
867            error!("Unable to setup jws signer -> {:?}", e);
868            return Err(());
869        }
870    };
871
872    // Any pre-start tasks here.
873    if let Some(itc) = &config.integration_test_config {
874        let Ok(mut idms_prox_write) = idms.proxy_write(duration_from_epoch_now()).await else {
875            error!("Unable to acquire write transaction");
876            return Err(());
877        };
878        // We need to set the admin pw.
879        match idms_prox_write.recover_account(&itc.admin_user, Some(&itc.admin_password)) {
880            Ok(_) => {}
881            Err(e) => {
882                error!(
883                    "Unable to configure INTEGRATION TEST {} account -> {:?}",
884                    &itc.admin_user, e
885                );
886                return Err(());
887            }
888        };
889        // set the idm_admin account password
890        match idms_prox_write.recover_account(&itc.idm_admin_user, Some(&itc.idm_admin_password)) {
891            Ok(_) => {}
892            Err(e) => {
893                error!(
894                    "Unable to configure INTEGRATION TEST {} account -> {:?}",
895                    &itc.idm_admin_user, e
896                );
897                return Err(());
898            }
899        };
900
901        // Add admin to idm_admins to allow tests more flexibility wrt to permissions.
902        // This way our default access controls can be stricter to prevent lateral
903        // movement.
904        match idms_prox_write.qs_write.internal_modify_uuid(
905            UUID_IDM_ADMINS,
906            &ModifyList::new_append(Attribute::Member, Value::Refer(UUID_ADMIN)),
907        ) {
908            Ok(_) => {}
909            Err(e) => {
910                error!(
911                    "Unable to configure INTEGRATION TEST admin as member of idm_admins -> {:?}",
912                    e
913                );
914                return Err(());
915            }
916        };
917
918        match idms_prox_write.qs_write.internal_modify_uuid(
919            UUID_IDM_ALL_PERSONS,
920            &ModifyList::new_purge_and_set(
921                Attribute::CredentialTypeMinimum,
922                CredentialType::Any.into(),
923            ),
924        ) {
925            Ok(_) => {}
926            Err(e) => {
927                error!(
928                    "Unable to configure INTEGRATION TEST default credential policy -> {:?}",
929                    e
930                );
931                return Err(());
932            }
933        };
934
935        match idms_prox_write.commit() {
936            Ok(_) => {}
937            Err(e) => {
938                error!("Unable to commit INTEGRATION TEST setup -> {:?}", e);
939                return Err(());
940            }
941        }
942    }
943
944    let ldap = match LdapServer::new(&idms).await {
945        Ok(l) => l,
946        Err(e) => {
947            error!("Unable to start LdapServer -> {:?}", e);
948            return Err(());
949        }
950    };
951
952    // Arc the idms and ldap
953    let idms_arc = Arc::new(idms);
954    let ldap_arc = Arc::new(ldap);
955
956    // Pass it to the actor for threading.
957    // Start the read query server with the given be path: future config
958    let server_read_ref = QueryServerReadV1::start_static(idms_arc.clone(), ldap_arc.clone());
959
960    // Create the server async write entry point.
961    let server_write_ref = QueryServerWriteV1::start_static(idms_arc.clone());
962
963    let delayed_handle = task::spawn(async move {
964        let mut buffer = Vec::with_capacity(DELAYED_ACTION_BATCH_SIZE);
965        loop {
966            tokio::select! {
967                added = idms_delayed.recv_many(&mut buffer) => {
968                    if added == 0 {
969                        // Channel has closed, stop the task.
970                        break
971                    }
972                    server_write_ref.handle_delayedaction(&mut buffer).await;
973                }
974                Ok(action) = broadcast_rx.recv() => {
975                    match action {
976                        CoreAction::Shutdown => break,
977                    }
978                }
979            }
980        }
981        info!("Stopped {}", TaskName::DelayedActionActor);
982    });
983
984    let mut broadcast_rx = broadcast_tx.subscribe();
985
986    let auditd_handle = task::spawn(async move {
987        loop {
988            tokio::select! {
989                Ok(action) = broadcast_rx.recv() => {
990                    match action {
991                        CoreAction::Shutdown => break,
992                    }
993                }
994                audit_event = idms_audit.audit_rx().recv() => {
995                    match serde_json::to_string(&audit_event) {
996                        Ok(audit_event) => {
997                            warn!(%audit_event);
998                        }
999                        Err(e) => {
1000                            error!(err=?e, "Unable to process audit event to json.");
1001                            warn!(?audit_event, json=false);
1002                        }
1003                    }
1004
1005                }
1006            }
1007        }
1008        info!("Stopped {}", TaskName::AuditdActor);
1009    });
1010
1011    // Setup a TLS Acceptor Reload trigger.
1012
1013    let mut broadcast_rx = broadcast_tx.subscribe();
1014    let tls_acceptor_reload_notify = Arc::new(Notify::new());
1015    let tls_accepter_reload_task_notify = tls_acceptor_reload_notify.clone();
1016    let tls_config = config.tls_config.clone();
1017
1018    let ldap_configured = config.ldapbindaddress.is_some();
1019    let (ldap_tls_acceptor_reload_tx, ldap_tls_acceptor_reload_rx) = mpsc::channel(1);
1020    let (http_tls_acceptor_reload_tx, http_tls_acceptor_reload_rx) = mpsc::channel(1);
1021
1022    let tls_acceptor_reload_handle = task::spawn(async move {
1023        loop {
1024            tokio::select! {
1025                Ok(action) = broadcast_rx.recv() => {
1026                    match action {
1027                        CoreAction::Shutdown => break,
1028                    }
1029                }
1030                _ = tls_accepter_reload_task_notify.notified() => {
1031                    let tls_acceptor = match crypto::setup_tls(&tls_config) {
1032                        Ok(Some(tls_acc)) => tls_acc,
1033                        Ok(None) => {
1034                            warn!("TLS not configured, ignoring reload request.");
1035                            continue;
1036                        }
1037                        Err(err) => {
1038                            error!(?err, "Failed to configure and reload TLS acceptor");
1039                            continue;
1040                        }
1041                    };
1042
1043                    // We don't log here as the receivers will notify when they have completed
1044                    // the reload.
1045                    if ldap_configured &&
1046                        ldap_tls_acceptor_reload_tx.send(tls_acceptor.clone()).await.is_err() {
1047                            error!("ldap tls acceptor did not accept the reload, the server may have failed!");
1048                        };
1049                    if http_tls_acceptor_reload_tx.send(tls_acceptor.clone()).await.is_err() {
1050                        error!("http tls acceptor did not accept the reload, the server may have failed!");
1051                        break;
1052                    };
1053                }
1054            }
1055        }
1056        info!("Stopped {}", TaskName::TlsAcceptorReload);
1057    });
1058
1059    // Setup timed events associated to the write thread
1060    let interval_handle = IntervalActor::start(server_write_ref, broadcast_tx.subscribe());
1061    // Setup timed events associated to the read thread
1062    let maybe_backup_handle = match &config.online_backup {
1063        Some(online_backup_config) => {
1064            if online_backup_config.enabled {
1065                let handle = IntervalActor::start_online_backup(
1066                    server_read_ref,
1067                    online_backup_config,
1068                    broadcast_tx.subscribe(),
1069                )?;
1070                Some(handle)
1071            } else {
1072                debug!("Backups disabled");
1073                None
1074            }
1075        }
1076        None => {
1077            debug!("Online backup not requested, skipping");
1078            None
1079        }
1080    };
1081
1082    // If we have been requested to init LDAP, configure it now.
1083    let maybe_ldap_acceptor_handle = match &config.ldapbindaddress {
1084        Some(la) => {
1085            let opt_ldap_ssl_acceptor = maybe_tls_acceptor.clone();
1086
1087            let h = ldaps::create_ldap_server(
1088                la.as_str(),
1089                opt_ldap_ssl_acceptor,
1090                server_read_ref,
1091                broadcast_tx.subscribe(),
1092                ldap_tls_acceptor_reload_rx,
1093                config.ldap_client_address_info.trusted_proxy_v2(),
1094            )
1095            .await?;
1096            Some(h)
1097        }
1098        None => {
1099            debug!("LDAP not requested, skipping");
1100            None
1101        }
1102    };
1103
1104    // If we have replication configured, setup the listener with its initial replication
1105    // map (if any).
1106    let (maybe_repl_handle, maybe_repl_ctrl_tx) = match &config.repl_config {
1107        Some(rc) => {
1108            if !config_test {
1109                // ⚠️  only start the sockets and listeners in non-config-test modes.
1110                let (h, repl_ctrl_tx) =
1111                    repl::create_repl_server(idms_arc.clone(), rc, broadcast_tx.subscribe())
1112                        .await?;
1113                (Some(h), Some(repl_ctrl_tx))
1114            } else {
1115                (None, None)
1116            }
1117        }
1118        None => {
1119            debug!("Replication not requested, skipping");
1120            (None, None)
1121        }
1122    };
1123
1124    let maybe_http_acceptor_handle = if config_test {
1125        admin_info!("This config rocks! 🪨 ");
1126        None
1127    } else {
1128        let h: task::JoinHandle<()> = match https::create_https_server(
1129            config.clone(),
1130            jws_signer,
1131            status_ref,
1132            server_write_ref,
1133            server_read_ref,
1134            broadcast_tx.clone(),
1135            maybe_tls_acceptor,
1136            http_tls_acceptor_reload_rx,
1137        )
1138        .await
1139        {
1140            Ok(h) => h,
1141            Err(e) => {
1142                error!("Failed to start HTTPS server -> {:?}", e);
1143                return Err(());
1144            }
1145        };
1146        if config.role != ServerRole::WriteReplicaNoUI {
1147            admin_info!("ready to rock! 🪨  UI available at: {}", config.origin);
1148        } else {
1149            admin_info!("ready to rock! 🪨 ");
1150        }
1151        Some(h)
1152    };
1153
1154    // If we are NOT in integration test mode, start the admin socket now
1155    let maybe_admin_sock_handle = if config.integration_test_config.is_none() {
1156        let broadcast_rx = broadcast_tx.subscribe();
1157
1158        let admin_handle = AdminActor::create_admin_sock(
1159            config.adminbindpath.as_str(),
1160            server_write_ref,
1161            server_read_ref,
1162            broadcast_rx,
1163            maybe_repl_ctrl_tx,
1164        )
1165        .await?;
1166
1167        Some(admin_handle)
1168    } else {
1169        None
1170    };
1171
1172    let mut handles: Vec<(TaskName, task::JoinHandle<()>)> = vec![
1173        (TaskName::IntervalActor, interval_handle),
1174        (TaskName::DelayedActionActor, delayed_handle),
1175        (TaskName::AuditdActor, auditd_handle),
1176        (TaskName::TlsAcceptorReload, tls_acceptor_reload_handle),
1177    ];
1178
1179    if let Some(backup_handle) = maybe_backup_handle {
1180        handles.push((TaskName::BackupActor, backup_handle))
1181    }
1182
1183    if let Some(admin_sock_handle) = maybe_admin_sock_handle {
1184        handles.push((TaskName::AdminSocket, admin_sock_handle))
1185    }
1186
1187    if let Some(ldap_handle) = maybe_ldap_acceptor_handle {
1188        handles.push((TaskName::LdapActor, ldap_handle))
1189    }
1190
1191    if let Some(http_handle) = maybe_http_acceptor_handle {
1192        handles.push((TaskName::HttpsServer, http_handle))
1193    }
1194
1195    if let Some(repl_handle) = maybe_repl_handle {
1196        handles.push((TaskName::Replication, repl_handle))
1197    }
1198
1199    Ok(CoreHandle {
1200        clean_shutdown: false,
1201        tls_acceptor_reload_notify,
1202        tx: broadcast_tx,
1203        handles,
1204    })
1205}