kanidmd_core/
lib.rs

1//! These contain the server "cores". These are able to startup the server
2//! (bootstrap) to a running state and then execute tasks. This is where modules
3//! are logically ordered based on their depenedncies for execution. Some of these
4//! are task-only i.e. reindexing, and some of these launch the server into a
5//! fully operational state (https, ldap, etc).
6//!
7//! Generally, this is the "entry point" where the server begins to run, and
8//! the entry point for all client traffic which is then directed to the
9//! various `actors`.
10
11#![deny(warnings)]
12#![warn(unused_extern_crates)]
13#![warn(unused_imports)]
14#![deny(clippy::todo)]
15#![deny(clippy::unimplemented)]
16#![deny(clippy::unwrap_used)]
17#![deny(clippy::expect_used)]
18#![deny(clippy::panic)]
19#![deny(clippy::unreachable)]
20#![deny(clippy::await_holding_lock)]
21#![deny(clippy::needless_pass_by_value)]
22#![deny(clippy::trivially_copy_pass_by_ref)]
23
24#[macro_use]
25extern crate tracing;
26#[macro_use]
27extern crate kanidmd_lib;
28
29mod actors;
30pub mod admin;
31pub mod config;
32mod crypto;
33mod https;
34mod interval;
35mod ldaps;
36mod repl;
37mod utils;
38
39use crate::actors::{QueryServerReadV1, QueryServerWriteV1};
40use crate::admin::AdminActor;
41use crate::config::{Configuration, ServerRole};
42use crate::interval::IntervalActor;
43use crate::utils::touch_file_or_quit;
44use compact_jwt::{JwsHs256Signer, JwsSigner};
45use kanidm_proto::internal::OperationError;
46use kanidmd_lib::be::{Backend, BackendConfig, BackendTransaction};
47use kanidmd_lib::idm::ldap::LdapServer;
48use kanidmd_lib::prelude::*;
49use kanidmd_lib::schema::Schema;
50use kanidmd_lib::status::StatusActor;
51use kanidmd_lib::value::CredentialType;
52#[cfg(not(target_family = "windows"))]
53use libc::umask;
54use std::fmt::{Display, Formatter};
55use std::path::Path;
56use std::sync::Arc;
57use tokio::sync::broadcast;
58use tokio::sync::mpsc;
59use tokio::sync::Notify;
60use tokio::task;
61
62// === internal setup helpers
63
64fn setup_backend(config: &Configuration, schema: &Schema) -> Result<Backend, OperationError> {
65    setup_backend_vacuum(config, schema, false)
66}
67
68fn setup_backend_vacuum(
69    config: &Configuration,
70    schema: &Schema,
71    vacuum: bool,
72) -> Result<Backend, OperationError> {
73    // Limit the scope of the schema txn.
74    // let schema_txn = task::block_on(schema.write());
75    let schema_txn = schema.write();
76    let idxmeta = schema_txn.reload_idxmeta();
77
78    let pool_size: u32 = config.threads as u32;
79
80    let cfg = BackendConfig::new(
81        config.db_path.as_deref(),
82        pool_size,
83        config.db_fs_type.unwrap_or_default(),
84        config.db_arc_size,
85    );
86
87    Backend::new(cfg, idxmeta, vacuum)
88}
89
90// TODO #54: We could move most of the be/schema/qs setup and startup
91// outside of this call, then pass in "what we need" in a cloneable
92// form, this way we could have separate Idm vs Qs threads, and dedicated
93// threads for write vs read
94async fn setup_qs_idms(
95    be: Backend,
96    schema: Schema,
97    config: &Configuration,
98) -> Result<(QueryServer, IdmServer, IdmServerDelayed, IdmServerAudit), OperationError> {
99    let curtime = duration_from_epoch_now();
100    // Create a query_server implementation
101    let query_server = QueryServer::new(be, schema, config.domain.clone(), curtime)?;
102
103    // TODO #62: Should the IDM parts be broken out to the IdmServer?
104    // What's important about this initial setup here is that it also triggers
105    // the schema and acp reload, so they are now configured correctly!
106    // Initialise the schema core.
107    //
108    // Now search for the schema itself, and validate that the system
109    // in memory matches the BE on disk, and that it's syntactically correct.
110    // Write it out if changes are needed.
111    query_server
112        .initialise_helper(curtime, DOMAIN_TGT_LEVEL)
113        .await?;
114
115    // We generate a SINGLE idms only!
116    let is_integration_test = config.integration_test_config.is_some();
117    let (idms, idms_delayed, idms_audit) = IdmServer::new(
118        query_server.clone(),
119        &config.origin,
120        is_integration_test,
121        curtime,
122    )
123    .await?;
124
125    Ok((query_server, idms, idms_delayed, idms_audit))
126}
127
128async fn setup_qs(
129    be: Backend,
130    schema: Schema,
131    config: &Configuration,
132) -> Result<QueryServer, OperationError> {
133    let curtime = duration_from_epoch_now();
134    // Create a query_server implementation
135    let query_server = QueryServer::new(be, schema, config.domain.clone(), curtime)?;
136
137    // TODO #62: Should the IDM parts be broken out to the IdmServer?
138    // What's important about this initial setup here is that it also triggers
139    // the schema and acp reload, so they are now configured correctly!
140    // Initialise the schema core.
141    //
142    // Now search for the schema itself, and validate that the system
143    // in memory matches the BE on disk, and that it's syntactically correct.
144    // Write it out if changes are needed.
145    query_server
146        .initialise_helper(curtime, DOMAIN_TGT_LEVEL)
147        .await?;
148
149    Ok(query_server)
150}
151
152macro_rules! dbscan_setup_be {
153    (
154        $config:expr
155    ) => {{
156        let schema = match Schema::new() {
157            Ok(s) => s,
158            Err(e) => {
159                error!("Failed to setup in memory schema: {:?}", e);
160                std::process::exit(1);
161            }
162        };
163
164        match setup_backend($config, &schema) {
165            Ok(be) => be,
166            Err(e) => {
167                error!("Failed to setup BE: {:?}", e);
168                return;
169            }
170        }
171    }};
172}
173
174pub fn dbscan_list_indexes_core(config: &Configuration) {
175    let be = dbscan_setup_be!(config);
176    let mut be_rotxn = match be.read() {
177        Ok(txn) => txn,
178        Err(err) => {
179            error!(?err, "Unable to proceed, backend read transaction failure.");
180            return;
181        }
182    };
183
184    match be_rotxn.list_indexes() {
185        Ok(mut idx_list) => {
186            idx_list.sort_unstable();
187            idx_list.iter().for_each(|idx_name| {
188                println!("{}", idx_name);
189            })
190        }
191        Err(e) => {
192            error!("Failed to retrieve index list: {:?}", e);
193        }
194    };
195}
196
197pub fn dbscan_list_id2entry_core(config: &Configuration) {
198    let be = dbscan_setup_be!(config);
199    let mut be_rotxn = match be.read() {
200        Ok(txn) => txn,
201        Err(err) => {
202            error!(?err, "Unable to proceed, backend read transaction failure.");
203            return;
204        }
205    };
206
207    match be_rotxn.list_id2entry() {
208        Ok(mut id_list) => {
209            id_list.sort_unstable_by_key(|k| k.0);
210            id_list.iter().for_each(|(id, value)| {
211                println!("{:>8}: {}", id, value);
212            })
213        }
214        Err(e) => {
215            error!("Failed to retrieve id2entry list: {:?}", e);
216        }
217    };
218}
219
220pub fn dbscan_list_index_analysis_core(config: &Configuration) {
221    let _be = dbscan_setup_be!(config);
222    // TBD in after slopes merge.
223}
224
225pub fn dbscan_list_index_core(config: &Configuration, index_name: &str) {
226    let be = dbscan_setup_be!(config);
227    let mut be_rotxn = match be.read() {
228        Ok(txn) => txn,
229        Err(err) => {
230            error!(?err, "Unable to proceed, backend read transaction failure.");
231            return;
232        }
233    };
234
235    match be_rotxn.list_index_content(index_name) {
236        Ok(mut idx_list) => {
237            idx_list.sort_unstable_by(|a, b| a.0.cmp(&b.0));
238            idx_list.iter().for_each(|(key, value)| {
239                println!("{:>50}: {:?}", key, value);
240            })
241        }
242        Err(e) => {
243            error!("Failed to retrieve index list: {:?}", e);
244        }
245    };
246}
247
248pub fn dbscan_get_id2entry_core(config: &Configuration, id: u64) {
249    let be = dbscan_setup_be!(config);
250    let mut be_rotxn = match be.read() {
251        Ok(txn) => txn,
252        Err(err) => {
253            error!(?err, "Unable to proceed, backend read transaction failure.");
254            return;
255        }
256    };
257
258    match be_rotxn.get_id2entry(id) {
259        Ok((id, value)) => println!("{:>8}: {}", id, value),
260        Err(e) => {
261            error!("Failed to retrieve id2entry value: {:?}", e);
262        }
263    };
264}
265
266pub fn dbscan_quarantine_id2entry_core(config: &Configuration, id: u64) {
267    let be = dbscan_setup_be!(config);
268    let mut be_wrtxn = match be.write() {
269        Ok(txn) => txn,
270        Err(err) => {
271            error!(
272                ?err,
273                "Unable to proceed, backend write transaction failure."
274            );
275            return;
276        }
277    };
278
279    match be_wrtxn
280        .quarantine_entry(id)
281        .and_then(|_| be_wrtxn.commit())
282    {
283        Ok(()) => {
284            println!("quarantined - {:>8}", id)
285        }
286        Err(e) => {
287            error!("Failed to quarantine id2entry value: {:?}", e);
288        }
289    };
290}
291
292pub fn dbscan_list_quarantined_core(config: &Configuration) {
293    let be = dbscan_setup_be!(config);
294    let mut be_rotxn = match be.read() {
295        Ok(txn) => txn,
296        Err(err) => {
297            error!(?err, "Unable to proceed, backend read transaction failure.");
298            return;
299        }
300    };
301
302    match be_rotxn.list_quarantined() {
303        Ok(mut id_list) => {
304            id_list.sort_unstable_by_key(|k| k.0);
305            id_list.iter().for_each(|(id, value)| {
306                println!("{:>8}: {}", id, value);
307            })
308        }
309        Err(e) => {
310            error!("Failed to retrieve id2entry list: {:?}", e);
311        }
312    };
313}
314
315pub fn dbscan_restore_quarantined_core(config: &Configuration, id: u64) {
316    let be = dbscan_setup_be!(config);
317    let mut be_wrtxn = match be.write() {
318        Ok(txn) => txn,
319        Err(err) => {
320            error!(
321                ?err,
322                "Unable to proceed, backend write transaction failure."
323            );
324            return;
325        }
326    };
327
328    match be_wrtxn
329        .restore_quarantined(id)
330        .and_then(|_| be_wrtxn.commit())
331    {
332        Ok(()) => {
333            println!("restored - {:>8}", id)
334        }
335        Err(e) => {
336            error!("Failed to restore quarantined id2entry value: {:?}", e);
337        }
338    };
339}
340
341pub fn backup_server_core(config: &Configuration, dst_path: &Path) {
342    let schema = match Schema::new() {
343        Ok(s) => s,
344        Err(e) => {
345            error!("Failed to setup in memory schema: {:?}", e);
346            std::process::exit(1);
347        }
348    };
349
350    let be = match setup_backend(config, &schema) {
351        Ok(be) => be,
352        Err(e) => {
353            error!("Failed to setup BE: {:?}", e);
354            return;
355        }
356    };
357
358    let mut be_ro_txn = match be.read() {
359        Ok(txn) => txn,
360        Err(err) => {
361            error!(?err, "Unable to proceed, backend read transaction failure.");
362            return;
363        }
364    };
365
366    let r = be_ro_txn.backup(dst_path);
367    match r {
368        Ok(_) => info!("Backup success!"),
369        Err(e) => {
370            error!("Backup failed: {:?}", e);
371            std::process::exit(1);
372        }
373    };
374    // Let the txn abort, even on success.
375}
376
377pub async fn restore_server_core(config: &Configuration, dst_path: &Path) {
378    // If it's an in memory database, we don't need to touch anything
379    if let Some(db_path) = config.db_path.as_ref() {
380        touch_file_or_quit(db_path);
381    }
382
383    // First, we provide the in-memory schema so that core attrs are indexed correctly.
384    let schema = match Schema::new() {
385        Ok(s) => s,
386        Err(e) => {
387            error!("Failed to setup in memory schema: {:?}", e);
388            std::process::exit(1);
389        }
390    };
391
392    let be = match setup_backend(config, &schema) {
393        Ok(be) => be,
394        Err(e) => {
395            error!("Failed to setup backend: {:?}", e);
396            return;
397        }
398    };
399
400    let mut be_wr_txn = match be.write() {
401        Ok(txn) => txn,
402        Err(err) => {
403            error!(
404                ?err,
405                "Unable to proceed, backend write transaction failure."
406            );
407            return;
408        }
409    };
410    let r = be_wr_txn.restore(dst_path).and_then(|_| be_wr_txn.commit());
411
412    if r.is_err() {
413        error!("Failed to restore database: {:?}", r);
414        std::process::exit(1);
415    }
416    info!("Database loaded successfully");
417
418    reindex_inner(be, schema, config).await;
419
420    info!("✅ Restore Success!");
421}
422
423pub async fn reindex_server_core(config: &Configuration) {
424    info!("Start Index Phase 1 ...");
425    // First, we provide the in-memory schema so that core attrs are indexed correctly.
426    let schema = match Schema::new() {
427        Ok(s) => s,
428        Err(e) => {
429            error!("Failed to setup in memory schema: {:?}", e);
430            std::process::exit(1);
431        }
432    };
433
434    let be = match setup_backend(config, &schema) {
435        Ok(be) => be,
436        Err(e) => {
437            error!("Failed to setup BE: {:?}", e);
438            return;
439        }
440    };
441
442    reindex_inner(be, schema, config).await;
443
444    info!("✅ Reindex Success!");
445}
446
447async fn reindex_inner(be: Backend, schema: Schema, config: &Configuration) {
448    // Reindex only the core schema attributes to bootstrap the process.
449    let mut be_wr_txn = match be.write() {
450        Ok(txn) => txn,
451        Err(err) => {
452            error!(
453                ?err,
454                "Unable to proceed, backend write transaction failure."
455            );
456            return;
457        }
458    };
459
460    let r = be_wr_txn.reindex(true).and_then(|_| be_wr_txn.commit());
461
462    // Now that's done, setup a minimal qs and reindex from that.
463    if r.is_err() {
464        error!("Failed to reindex database: {:?}", r);
465        std::process::exit(1);
466    }
467    info!("Index Phase 1 Success!");
468
469    info!("Attempting to init query server ...");
470
471    let (qs, _idms, _idms_delayed, _idms_audit) = match setup_qs_idms(be, schema, config).await {
472        Ok(t) => t,
473        Err(e) => {
474            error!("Unable to setup query server or idm server -> {:?}", e);
475            return;
476        }
477    };
478    info!("Init Query Server Success!");
479
480    info!("Start Index Phase 2 ...");
481
482    let Ok(mut qs_write) = qs.write(duration_from_epoch_now()).await else {
483        error!("Unable to acquire write transaction");
484        return;
485    };
486    let r = qs_write.reindex(true).and_then(|_| qs_write.commit());
487
488    match r {
489        Ok(_) => info!("Index Phase 2 Success!"),
490        Err(e) => {
491            error!("Reindex failed: {:?}", e);
492            std::process::exit(1);
493        }
494    };
495}
496
497pub fn vacuum_server_core(config: &Configuration) {
498    let schema = match Schema::new() {
499        Ok(s) => s,
500        Err(e) => {
501            eprintln!("Failed to setup in memory schema: {:?}", e);
502            std::process::exit(1);
503        }
504    };
505
506    // The schema doesn't matter here. Vacuum is run as part of db open to avoid
507    // locking.
508    let r = setup_backend_vacuum(config, &schema, true);
509
510    match r {
511        Ok(_) => eprintln!("Vacuum Success!"),
512        Err(e) => {
513            eprintln!("Vacuum failed: {:?}", e);
514            std::process::exit(1);
515        }
516    };
517}
518
519pub async fn domain_rename_core(config: &Configuration) {
520    let schema = match Schema::new() {
521        Ok(s) => s,
522        Err(e) => {
523            eprintln!("Failed to setup in memory schema: {:?}", e);
524            std::process::exit(1);
525        }
526    };
527
528    // Start the backend.
529    let be = match setup_backend(config, &schema) {
530        Ok(be) => be,
531        Err(e) => {
532            error!("Failed to setup BE: {:?}", e);
533            return;
534        }
535    };
536
537    // Setup the qs, and perform any migrations and changes we may have.
538    let qs = match setup_qs(be, schema, config).await {
539        Ok(t) => t,
540        Err(e) => {
541            error!("Unable to setup query server -> {:?}", e);
542            return;
543        }
544    };
545
546    let new_domain_name = config.domain.as_str();
547
548    // make sure we're actually changing the domain name...
549    match qs.read().await.map(|qs| qs.get_domain_name().to_string()) {
550        Ok(old_domain_name) => {
551            admin_info!(?old_domain_name, ?new_domain_name);
552            if old_domain_name == new_domain_name {
553                admin_info!("Domain name not changing, stopping.");
554                return;
555            }
556            admin_debug!(
557                "Domain name is changing from {:?} to {:?}",
558                old_domain_name,
559                new_domain_name
560            );
561        }
562        Err(e) => {
563            admin_error!("Failed to query domain name, quitting! -> {:?}", e);
564            return;
565        }
566    }
567
568    let Ok(mut qs_write) = qs.write(duration_from_epoch_now()).await else {
569        error!("Unable to acquire write transaction");
570        return;
571    };
572    let r = qs_write
573        .danger_domain_rename(new_domain_name)
574        .and_then(|_| qs_write.commit());
575
576    match r {
577        Ok(_) => info!("Domain Rename Success!"),
578        Err(e) => {
579            error!("Domain Rename Failed - Rollback has occurred: {:?}", e);
580            std::process::exit(1);
581        }
582    };
583}
584
585pub async fn verify_server_core(config: &Configuration) {
586    let curtime = duration_from_epoch_now();
587    // setup the qs - without initialise!
588    let schema_mem = match Schema::new() {
589        Ok(sc) => sc,
590        Err(e) => {
591            error!("Failed to setup in memory schema: {:?}", e);
592            return;
593        }
594    };
595    // Setup the be
596    let be = match setup_backend(config, &schema_mem) {
597        Ok(be) => be,
598        Err(e) => {
599            error!("Failed to setup BE: {:?}", e);
600            return;
601        }
602    };
603
604    let server = match QueryServer::new(be, schema_mem, config.domain.clone(), curtime) {
605        Ok(qs) => qs,
606        Err(err) => {
607            error!(?err, "Failed to setup query server");
608            return;
609        }
610    };
611
612    // Run verifications.
613    let r = server.verify().await;
614
615    if r.is_empty() {
616        eprintln!("Verification passed!");
617        std::process::exit(0);
618    } else {
619        for er in r {
620            error!("{:?}", er);
621        }
622        std::process::exit(1);
623    }
624
625    // Now add IDM server verifications?
626}
627
628pub fn cert_generate_core(config: &Configuration) {
629    // Get the cert root
630
631    let (tls_key_path, tls_chain_path) = match &config.tls_config {
632        Some(tls_config) => (tls_config.key.as_path(), tls_config.chain.as_path()),
633        None => {
634            error!("Unable to find TLS configuration");
635            std::process::exit(1);
636        }
637    };
638
639    if tls_key_path.exists() && tls_chain_path.exists() {
640        info!(
641            "TLS key and chain already exist - remove them first if you intend to regenerate these"
642        );
643        return;
644    }
645
646    let origin = match Url::parse(&config.origin) {
647        Ok(url) => url,
648        Err(e) => {
649            error!(err = ?e, "Unable to parse origin URL - refusing to start. You must correct the value for origin. {:?}", config.origin);
650            std::process::exit(1);
651        }
652    };
653
654    let origin_domain = if let Some(d) = origin.domain() {
655        d
656    } else {
657        error!("origin does not contain a valid domain");
658        std::process::exit(1);
659    };
660
661    let cert_root = match tls_key_path.parent() {
662        Some(parent) => parent,
663        None => {
664            error!("Unable to find parent directory of {:?}", tls_key_path);
665            std::process::exit(1);
666        }
667    };
668
669    let ca_cert = cert_root.join("ca.pem");
670    let ca_key = cert_root.join("cakey.pem");
671    let tls_cert_path = cert_root.join("cert.pem");
672
673    let ca_handle = if !ca_cert.exists() || !ca_key.exists() {
674        // Generate the CA again.
675        let ca_handle = match crypto::build_ca(None) {
676            Ok(ca_handle) => ca_handle,
677            Err(e) => {
678                error!(err = ?e, "Failed to build CA");
679                std::process::exit(1);
680            }
681        };
682
683        if crypto::write_ca(ca_key, ca_cert, &ca_handle).is_err() {
684            error!("Failed to write CA");
685            std::process::exit(1);
686        }
687
688        ca_handle
689    } else {
690        match crypto::load_ca(ca_key, ca_cert) {
691            Ok(ca_handle) => ca_handle,
692            Err(_) => {
693                error!("Failed to load CA");
694                std::process::exit(1);
695            }
696        }
697    };
698
699    if !tls_key_path.exists() || !tls_chain_path.exists() || !tls_cert_path.exists() {
700        // Generate the cert from the ca.
701        let cert_handle = match crypto::build_cert(origin_domain, &ca_handle, None, None) {
702            Ok(cert_handle) => cert_handle,
703            Err(e) => {
704                error!(err = ?e, "Failed to build certificate");
705                std::process::exit(1);
706            }
707        };
708
709        if crypto::write_cert(tls_key_path, tls_chain_path, tls_cert_path, &cert_handle).is_err() {
710            error!("Failed to write certificates");
711            std::process::exit(1);
712        }
713    }
714    info!("certificate generation complete");
715}
716
717#[derive(Clone, Debug)]
718pub enum CoreAction {
719    Shutdown,
720}
721
722pub(crate) enum TaskName {
723    AdminSocket,
724    AuditdActor,
725    BackupActor,
726    DelayedActionActor,
727    HttpsServer,
728    IntervalActor,
729    LdapActor,
730    Replication,
731    TlsAcceptorReload,
732}
733
734impl Display for TaskName {
735    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
736        write!(
737            f,
738            "{}",
739            match self {
740                TaskName::AdminSocket => "Admin Socket",
741                TaskName::AuditdActor => "Auditd Actor",
742                TaskName::BackupActor => "Backup Actor",
743                TaskName::DelayedActionActor => "Delayed Action Actor",
744                TaskName::HttpsServer => "HTTPS Server",
745                TaskName::IntervalActor => "Interval Actor",
746                TaskName::LdapActor => "LDAP Acceptor Actor",
747                TaskName::Replication => "Replication",
748                TaskName::TlsAcceptorReload => "TlsAcceptor Reload Monitor",
749            }
750        )
751    }
752}
753
754pub struct CoreHandle {
755    clean_shutdown: bool,
756    tx: broadcast::Sender<CoreAction>,
757    tls_acceptor_reload_notify: Arc<Notify>,
758    /// This stores a name for the handle, and the handle itself so we can tell which failed/succeeded at the end.
759    handles: Vec<(TaskName, task::JoinHandle<()>)>,
760}
761
762impl CoreHandle {
763    pub fn subscribe(&mut self) -> broadcast::Receiver<CoreAction> {
764        self.tx.subscribe()
765    }
766
767    pub async fn shutdown(&mut self) {
768        if self.tx.send(CoreAction::Shutdown).is_err() {
769            eprintln!("No receivers acked shutdown request. Treating as unclean.");
770            return;
771        }
772
773        // Wait on the handles.
774        while let Some((handle_name, handle)) = self.handles.pop() {
775            if let Err(error) = handle.await {
776                eprintln!("Task {} failed to finish: {:?}", handle_name, error);
777            }
778        }
779
780        self.clean_shutdown = true;
781    }
782
783    pub async fn tls_acceptor_reload(&mut self) {
784        self.tls_acceptor_reload_notify.notify_one()
785    }
786}
787
788impl Drop for CoreHandle {
789    fn drop(&mut self) {
790        if !self.clean_shutdown {
791            eprintln!("⚠️  UNCLEAN SHUTDOWN OCCURRED ⚠️ ");
792        }
793        // Can't enable yet until we clean up unix_int cache layer test
794        // debug_assert!(self.clean_shutdown);
795    }
796}
797
798pub async fn create_server_core(
799    config: Configuration,
800    config_test: bool,
801) -> Result<CoreHandle, ()> {
802    // Until this point, we probably want to write to the log macro fns.
803    let (broadcast_tx, mut broadcast_rx) = broadcast::channel(4);
804
805    if config.integration_test_config.is_some() {
806        warn!("RUNNING IN INTEGRATION TEST MODE.");
807        warn!("IF YOU SEE THIS IN PRODUCTION YOU MUST CONTACT SUPPORT IMMEDIATELY.");
808    } else if config.tls_config.is_none() {
809        // TLS is great! We won't run without it.
810        error!("Running without TLS is not supported! Quitting!");
811        return Err(());
812    }
813
814    info!(
815        "Starting kanidm with {}configuration: {}",
816        if config_test { "TEST " } else { "" },
817        config
818    );
819    // Setup umask, so that every we touch or create is secure.
820    #[cfg(not(target_family = "windows"))]
821    unsafe {
822        umask(0o0027)
823    };
824
825    // Similar, create a stats task which aggregates statistics from the
826    // server as they come in.
827    let status_ref = StatusActor::start();
828
829    // Setup TLS (if any)
830    let maybe_tls_acceptor = match crypto::setup_tls(&config.tls_config) {
831        Ok(tls_acc) => tls_acc,
832        Err(err) => {
833            error!(?err, "Failed to configure TLS acceptor");
834            return Err(());
835        }
836    };
837
838    let schema = match Schema::new() {
839        Ok(s) => s,
840        Err(e) => {
841            error!("Failed to setup in memory schema: {:?}", e);
842            return Err(());
843        }
844    };
845
846    // Setup the be for the qs.
847    let be = match setup_backend(&config, &schema) {
848        Ok(be) => be,
849        Err(e) => {
850            error!("Failed to setup BE -> {:?}", e);
851            return Err(());
852        }
853    };
854    // Start the IDM server.
855    let (_qs, idms, mut idms_delayed, mut idms_audit) =
856        match setup_qs_idms(be, schema, &config).await {
857            Ok(t) => t,
858            Err(e) => {
859                error!("Unable to setup query server or idm server -> {:?}", e);
860                return Err(());
861            }
862        };
863
864    // Extract any configuration from the IDMS that we may need.
865    // For now we just do this per run, but we need to extract this from the db later.
866    let jws_signer = match JwsHs256Signer::generate_hs256() {
867        Ok(k) => k.set_sign_option_embed_kid(false),
868        Err(e) => {
869            error!("Unable to setup jws signer -> {:?}", e);
870            return Err(());
871        }
872    };
873
874    // Any pre-start tasks here.
875    if let Some(itc) = &config.integration_test_config {
876        let Ok(mut idms_prox_write) = idms.proxy_write(duration_from_epoch_now()).await else {
877            error!("Unable to acquire write transaction");
878            return Err(());
879        };
880        // We need to set the admin pw.
881        match idms_prox_write.recover_account(&itc.admin_user, Some(&itc.admin_password)) {
882            Ok(_) => {}
883            Err(e) => {
884                error!(
885                    "Unable to configure INTEGRATION TEST {} account -> {:?}",
886                    &itc.admin_user, e
887                );
888                return Err(());
889            }
890        };
891        // set the idm_admin account password
892        match idms_prox_write.recover_account(&itc.idm_admin_user, Some(&itc.idm_admin_password)) {
893            Ok(_) => {}
894            Err(e) => {
895                error!(
896                    "Unable to configure INTEGRATION TEST {} account -> {:?}",
897                    &itc.idm_admin_user, e
898                );
899                return Err(());
900            }
901        };
902
903        // Add admin to idm_admins to allow tests more flexibility wrt to permissions.
904        // This way our default access controls can be stricter to prevent lateral
905        // movement.
906        match idms_prox_write.qs_write.internal_modify_uuid(
907            UUID_IDM_ADMINS,
908            &ModifyList::new_append(Attribute::Member, Value::Refer(UUID_ADMIN)),
909        ) {
910            Ok(_) => {}
911            Err(e) => {
912                error!(
913                    "Unable to configure INTEGRATION TEST admin as member of idm_admins -> {:?}",
914                    e
915                );
916                return Err(());
917            }
918        };
919
920        match idms_prox_write.qs_write.internal_modify_uuid(
921            UUID_IDM_ALL_PERSONS,
922            &ModifyList::new_purge_and_set(
923                Attribute::CredentialTypeMinimum,
924                CredentialType::Any.into(),
925            ),
926        ) {
927            Ok(_) => {}
928            Err(e) => {
929                error!(
930                    "Unable to configure INTEGRATION TEST default credential policy -> {:?}",
931                    e
932                );
933                return Err(());
934            }
935        };
936
937        match idms_prox_write.commit() {
938            Ok(_) => {}
939            Err(e) => {
940                error!("Unable to commit INTEGRATION TEST setup -> {:?}", e);
941                return Err(());
942            }
943        }
944    }
945
946    let ldap = match LdapServer::new(&idms).await {
947        Ok(l) => l,
948        Err(e) => {
949            error!("Unable to start LdapServer -> {:?}", e);
950            return Err(());
951        }
952    };
953
954    // Arc the idms and ldap
955    let idms_arc = Arc::new(idms);
956    let ldap_arc = Arc::new(ldap);
957
958    // Pass it to the actor for threading.
959    // Start the read query server with the given be path: future config
960    let server_read_ref = QueryServerReadV1::start_static(idms_arc.clone(), ldap_arc.clone());
961
962    // Create the server async write entry point.
963    let server_write_ref = QueryServerWriteV1::start_static(idms_arc.clone());
964
965    let delayed_handle = task::spawn(async move {
966        let mut buffer = Vec::with_capacity(DELAYED_ACTION_BATCH_SIZE);
967        loop {
968            tokio::select! {
969                added = idms_delayed.recv_many(&mut buffer) => {
970                    if added == 0 {
971                        // Channel has closed, stop the task.
972                        break
973                    }
974                    server_write_ref.handle_delayedaction(&mut buffer).await;
975                }
976                Ok(action) = broadcast_rx.recv() => {
977                    match action {
978                        CoreAction::Shutdown => break,
979                    }
980                }
981            }
982        }
983        info!("Stopped {}", TaskName::DelayedActionActor);
984    });
985
986    let mut broadcast_rx = broadcast_tx.subscribe();
987
988    let auditd_handle = task::spawn(async move {
989        loop {
990            tokio::select! {
991                Ok(action) = broadcast_rx.recv() => {
992                    match action {
993                        CoreAction::Shutdown => break,
994                    }
995                }
996                audit_event = idms_audit.audit_rx().recv() => {
997                    match serde_json::to_string(&audit_event) {
998                        Ok(audit_event) => {
999                            warn!(%audit_event);
1000                        }
1001                        Err(e) => {
1002                            error!(err=?e, "Unable to process audit event to json.");
1003                            warn!(?audit_event, json=false);
1004                        }
1005                    }
1006
1007                }
1008            }
1009        }
1010        info!("Stopped {}", TaskName::AuditdActor);
1011    });
1012
1013    // Setup a TLS Acceptor Reload trigger.
1014
1015    let mut broadcast_rx = broadcast_tx.subscribe();
1016    let tls_acceptor_reload_notify = Arc::new(Notify::new());
1017    let tls_accepter_reload_task_notify = tls_acceptor_reload_notify.clone();
1018    let tls_config = config.tls_config.clone();
1019
1020    let ldap_configured = config.ldapbindaddress.is_some();
1021    let (ldap_tls_acceptor_reload_tx, ldap_tls_acceptor_reload_rx) = mpsc::channel(1);
1022    let (http_tls_acceptor_reload_tx, http_tls_acceptor_reload_rx) = mpsc::channel(1);
1023
1024    let tls_acceptor_reload_handle = task::spawn(async move {
1025        loop {
1026            tokio::select! {
1027                Ok(action) = broadcast_rx.recv() => {
1028                    match action {
1029                        CoreAction::Shutdown => break,
1030                    }
1031                }
1032                _ = tls_accepter_reload_task_notify.notified() => {
1033                    let tls_acceptor = match crypto::setup_tls(&tls_config) {
1034                        Ok(Some(tls_acc)) => tls_acc,
1035                        Ok(None) => {
1036                            warn!("TLS not configured, ignoring reload request.");
1037                            continue;
1038                        }
1039                        Err(err) => {
1040                            error!(?err, "Failed to configure and reload TLS acceptor");
1041                            continue;
1042                        }
1043                    };
1044
1045                    // We don't log here as the receivers will notify when they have completed
1046                    // the reload.
1047                    if ldap_configured &&
1048                        ldap_tls_acceptor_reload_tx.send(tls_acceptor.clone()).await.is_err() {
1049                            error!("ldap tls acceptor did not accept the reload, the server may have failed!");
1050                        };
1051                    if http_tls_acceptor_reload_tx.send(tls_acceptor.clone()).await.is_err() {
1052                        error!("http tls acceptor did not accept the reload, the server may have failed!");
1053                        break;
1054                    };
1055                }
1056            }
1057        }
1058        info!("Stopped {}", TaskName::TlsAcceptorReload);
1059    });
1060
1061    // Setup timed events associated to the write thread
1062    let interval_handle = IntervalActor::start(server_write_ref, broadcast_tx.subscribe());
1063    // Setup timed events associated to the read thread
1064    let maybe_backup_handle = match &config.online_backup {
1065        Some(online_backup_config) => {
1066            if online_backup_config.enabled {
1067                let handle = IntervalActor::start_online_backup(
1068                    server_read_ref,
1069                    online_backup_config,
1070                    broadcast_tx.subscribe(),
1071                )?;
1072                Some(handle)
1073            } else {
1074                debug!("Backups disabled");
1075                None
1076            }
1077        }
1078        None => {
1079            debug!("Online backup not requested, skipping");
1080            None
1081        }
1082    };
1083
1084    // If we have been requested to init LDAP, configure it now.
1085    let maybe_ldap_acceptor_handle = match &config.ldapbindaddress {
1086        Some(la) => {
1087            let opt_ldap_ssl_acceptor = maybe_tls_acceptor.clone();
1088
1089            let h = ldaps::create_ldap_server(
1090                la.as_str(),
1091                opt_ldap_ssl_acceptor,
1092                server_read_ref,
1093                broadcast_tx.subscribe(),
1094                ldap_tls_acceptor_reload_rx,
1095                config.ldap_client_address_info.trusted_proxy_v2(),
1096            )
1097            .await?;
1098            Some(h)
1099        }
1100        None => {
1101            debug!("LDAP not requested, skipping");
1102            None
1103        }
1104    };
1105
1106    // If we have replication configured, setup the listener with its initial replication
1107    // map (if any).
1108    let (maybe_repl_handle, maybe_repl_ctrl_tx) = match &config.repl_config {
1109        Some(rc) => {
1110            if !config_test {
1111                // ⚠️  only start the sockets and listeners in non-config-test modes.
1112                let (h, repl_ctrl_tx) =
1113                    repl::create_repl_server(idms_arc.clone(), rc, broadcast_tx.subscribe())
1114                        .await?;
1115                (Some(h), Some(repl_ctrl_tx))
1116            } else {
1117                (None, None)
1118            }
1119        }
1120        None => {
1121            debug!("Replication not requested, skipping");
1122            (None, None)
1123        }
1124    };
1125
1126    let maybe_http_acceptor_handle = if config_test {
1127        admin_info!("This config rocks! 🪨 ");
1128        None
1129    } else {
1130        let h: task::JoinHandle<()> = match https::create_https_server(
1131            config.clone(),
1132            jws_signer,
1133            status_ref,
1134            server_write_ref,
1135            server_read_ref,
1136            broadcast_tx.clone(),
1137            maybe_tls_acceptor,
1138            http_tls_acceptor_reload_rx,
1139        )
1140        .await
1141        {
1142            Ok(h) => h,
1143            Err(e) => {
1144                error!("Failed to start HTTPS server -> {:?}", e);
1145                return Err(());
1146            }
1147        };
1148        if config.role != ServerRole::WriteReplicaNoUI {
1149            admin_info!("ready to rock! 🪨  UI available at: {}", config.origin);
1150        } else {
1151            admin_info!("ready to rock! 🪨 ");
1152        }
1153        Some(h)
1154    };
1155
1156    // If we are NOT in integration test mode, start the admin socket now
1157    let maybe_admin_sock_handle = if config.integration_test_config.is_none() {
1158        let broadcast_rx = broadcast_tx.subscribe();
1159
1160        let admin_handle = AdminActor::create_admin_sock(
1161            config.adminbindpath.as_str(),
1162            server_write_ref,
1163            server_read_ref,
1164            broadcast_rx,
1165            maybe_repl_ctrl_tx,
1166        )
1167        .await?;
1168
1169        Some(admin_handle)
1170    } else {
1171        None
1172    };
1173
1174    let mut handles: Vec<(TaskName, task::JoinHandle<()>)> = vec![
1175        (TaskName::IntervalActor, interval_handle),
1176        (TaskName::DelayedActionActor, delayed_handle),
1177        (TaskName::AuditdActor, auditd_handle),
1178        (TaskName::TlsAcceptorReload, tls_acceptor_reload_handle),
1179    ];
1180
1181    if let Some(backup_handle) = maybe_backup_handle {
1182        handles.push((TaskName::BackupActor, backup_handle))
1183    }
1184
1185    if let Some(admin_sock_handle) = maybe_admin_sock_handle {
1186        handles.push((TaskName::AdminSocket, admin_sock_handle))
1187    }
1188
1189    if let Some(ldap_handle) = maybe_ldap_acceptor_handle {
1190        handles.push((TaskName::LdapActor, ldap_handle))
1191    }
1192
1193    if let Some(http_handle) = maybe_http_acceptor_handle {
1194        handles.push((TaskName::HttpsServer, http_handle))
1195    }
1196
1197    if let Some(repl_handle) = maybe_repl_handle {
1198        handles.push((TaskName::Replication, repl_handle))
1199    }
1200
1201    Ok(CoreHandle {
1202        clean_shutdown: false,
1203        tls_acceptor_reload_notify,
1204        tx: broadcast_tx,
1205        handles,
1206    })
1207}