1#![deny(warnings)]
12#![warn(unused_extern_crates)]
13#![warn(unused_imports)]
14#![deny(clippy::todo)]
15#![deny(clippy::unimplemented)]
16#![deny(clippy::unwrap_used)]
17#![deny(clippy::expect_used)]
18#![deny(clippy::panic)]
19#![deny(clippy::unreachable)]
20#![deny(clippy::await_holding_lock)]
21#![deny(clippy::needless_pass_by_value)]
22#![deny(clippy::trivially_copy_pass_by_ref)]
23#![deny(clippy::indexing_slicing)]
24
25#[macro_use]
26extern crate tracing;
27#[macro_use]
28extern crate kanidmd_lib;
29
30mod actors;
31pub mod admin;
32pub mod config;
33mod crypto;
34mod https;
35mod interval;
36mod ldaps;
37mod repl;
38mod tcp;
39mod utils;
40
41use crate::actors::{QueryServerReadV1, QueryServerWriteV1};
42use crate::admin::AdminActor;
43use crate::config::Configuration;
44use crate::interval::IntervalActor;
45use crate::utils::touch_file_or_quit;
46use compact_jwt::{JwsHs256Signer, JwsSigner};
47use crypto_glue::{
48 s256::{Sha256, Sha256Output},
49 traits::Digest,
50};
51use kanidm_proto::backup::BackupCompression;
52use kanidm_proto::config::ServerRole;
53use kanidm_proto::internal::OperationError;
54use kanidm_proto::scim_v1::client::ScimAssertGeneric;
55use kanidmd_lib::be::{Backend, BackendConfig, BackendTransaction};
56use kanidmd_lib::idm::ldap::LdapServer;
57use kanidmd_lib::prelude::*;
58use kanidmd_lib::schema::Schema;
59use kanidmd_lib::status::StatusActor;
60use kanidmd_lib::value::CredentialType;
61use regex::Regex;
62use std::collections::BTreeSet;
63use std::fmt::{Display, Formatter};
64use std::path::Path;
65use std::path::PathBuf;
66use std::sync::Arc;
67use std::sync::LazyLock;
68use tokio::sync::broadcast;
69use tokio::task;
70
71#[cfg(not(target_family = "windows"))]
72use libc::umask;
73
74fn setup_backend(config: &Configuration, schema: &Schema) -> Result<Backend, OperationError> {
77 setup_backend_vacuum(config, schema, false)
78}
79
80fn setup_backend_vacuum(
81 config: &Configuration,
82 schema: &Schema,
83 vacuum: bool,
84) -> Result<Backend, OperationError> {
85 let schema_txn = schema.write();
88 let idxmeta = schema_txn.reload_idxmeta();
89
90 let pool_size: u32 = config.threads as u32;
91
92 let cfg = BackendConfig::new(
93 config.db_path.as_deref(),
94 pool_size,
95 config.db_fs_type.unwrap_or_default(),
96 config.db_arc_size,
97 );
98
99 Backend::new(cfg, idxmeta, vacuum)
100}
101
102async fn setup_qs_idms(
107 be: Backend,
108 schema: Schema,
109 config: &Configuration,
110) -> Result<(QueryServer, IdmServer, IdmServerDelayed, IdmServerAudit), OperationError> {
111 let curtime = duration_from_epoch_now();
112 let query_server = QueryServer::new(be, schema, config.domain.clone(), curtime)?;
114
115 query_server
124 .initialise_helper(curtime, DOMAIN_TGT_LEVEL)
125 .await?;
126
127 let is_integration_test = config.integration_test_config.is_some();
129 let (idms, idms_delayed, idms_audit) = IdmServer::new(
130 query_server.clone(),
131 &config.origin,
132 is_integration_test,
133 curtime,
134 )
135 .await?;
136
137 Ok((query_server, idms, idms_delayed, idms_audit))
138}
139
140async fn setup_qs(
141 be: Backend,
142 schema: Schema,
143 config: &Configuration,
144) -> Result<QueryServer, OperationError> {
145 let curtime = duration_from_epoch_now();
146 let query_server = QueryServer::new(be, schema, config.domain.clone(), curtime)?;
148
149 query_server
158 .initialise_helper(curtime, DOMAIN_TGT_LEVEL)
159 .await?;
160
161 Ok(query_server)
162}
163
// Builds an in-memory schema and a backend for the dbscan helpers, evaluating
// to the backend on success.
//
// A schema construction failure terminates the process with exit code 1. A
// backend setup failure logs the error and executes a bare `return`, so the
// macro can only be expanded inside functions that return `()`.
macro_rules! dbscan_setup_be {
    (
        $config:expr
    ) => {{
        let schema = match Schema::new() {
            Ok(s) => s,
            Err(e) => {
                error!("Failed to setup in memory schema: {:?}", e);
                std::process::exit(1);
            }
        };

        match setup_backend($config, &schema) {
            Ok(be) => be,
            Err(e) => {
                error!("Failed to setup BE: {:?}", e);
                // Returns from the function the macro was expanded in.
                return;
            }
        }
    }};
}
185
186pub fn dbscan_list_indexes_core(config: &Configuration) {
187 let be = dbscan_setup_be!(config);
188 let mut be_rotxn = match be.read() {
189 Ok(txn) => txn,
190 Err(err) => {
191 error!(?err, "Unable to proceed, backend read transaction failure.");
192 return;
193 }
194 };
195
196 match be_rotxn.list_indexes() {
197 Ok(mut idx_list) => {
198 idx_list.sort_unstable();
199 idx_list.iter().for_each(|idx_name| {
200 println!("{idx_name}");
201 })
202 }
203 Err(e) => {
204 error!("Failed to retrieve index list: {:?}", e);
205 }
206 };
207}
208
209pub fn dbscan_list_id2entry_core(config: &Configuration) {
210 let be = dbscan_setup_be!(config);
211 let mut be_rotxn = match be.read() {
212 Ok(txn) => txn,
213 Err(err) => {
214 error!(?err, "Unable to proceed, backend read transaction failure.");
215 return;
216 }
217 };
218
219 match be_rotxn.list_id2entry() {
220 Ok(mut id_list) => {
221 id_list.sort_unstable_by_key(|k| k.0);
222 id_list.iter().for_each(|(id, value)| {
223 println!("{id:>8}: {value}");
224 })
225 }
226 Err(e) => {
227 error!("Failed to retrieve id2entry list: {:?}", e);
228 }
229 };
230}
231
/// Placeholder for index analysis: currently it only sets up the backend via
/// `dbscan_setup_be!` and discards it without producing any output.
pub fn dbscan_list_index_analysis_core(config: &Configuration) {
    let _be = dbscan_setup_be!(config);
}
236
237pub fn dbscan_list_index_core(config: &Configuration, index_name: &str) {
238 let be = dbscan_setup_be!(config);
239 let mut be_rotxn = match be.read() {
240 Ok(txn) => txn,
241 Err(err) => {
242 error!(?err, "Unable to proceed, backend read transaction failure.");
243 return;
244 }
245 };
246
247 match be_rotxn.list_index_content(index_name) {
248 Ok(mut idx_list) => {
249 idx_list.sort_unstable_by(|a, b| a.0.cmp(&b.0));
250 idx_list.iter().for_each(|(key, value)| {
251 println!("{key:>50}: {value:?}");
252 })
253 }
254 Err(e) => {
255 error!("Failed to retrieve index list: {:?}", e);
256 }
257 };
258}
259
260pub fn dbscan_get_id2entry_core(config: &Configuration, id: u64) {
261 let be = dbscan_setup_be!(config);
262 let mut be_rotxn = match be.read() {
263 Ok(txn) => txn,
264 Err(err) => {
265 error!(?err, "Unable to proceed, backend read transaction failure.");
266 return;
267 }
268 };
269
270 match be_rotxn.get_id2entry(id) {
271 Ok((id, value)) => println!("{id:>8}: {value}"),
272 Err(e) => {
273 error!("Failed to retrieve id2entry value: {:?}", e);
274 }
275 };
276}
277
278pub fn dbscan_quarantine_id2entry_core(config: &Configuration, id: u64) {
279 let be = dbscan_setup_be!(config);
280 let mut be_wrtxn = match be.write() {
281 Ok(txn) => txn,
282 Err(err) => {
283 error!(
284 ?err,
285 "Unable to proceed, backend write transaction failure."
286 );
287 return;
288 }
289 };
290
291 match be_wrtxn
292 .quarantine_entry(id)
293 .and_then(|_| be_wrtxn.commit())
294 {
295 Ok(()) => {
296 println!("quarantined - {id:>8}")
297 }
298 Err(e) => {
299 error!("Failed to quarantine id2entry value: {:?}", e);
300 }
301 };
302}
303
304pub fn dbscan_list_quarantined_core(config: &Configuration) {
305 let be = dbscan_setup_be!(config);
306 let mut be_rotxn = match be.read() {
307 Ok(txn) => txn,
308 Err(err) => {
309 error!(?err, "Unable to proceed, backend read transaction failure.");
310 return;
311 }
312 };
313
314 match be_rotxn.list_quarantined() {
315 Ok(mut id_list) => {
316 id_list.sort_unstable_by_key(|k| k.0);
317 id_list.iter().for_each(|(id, value)| {
318 println!("{id:>8}: {value}");
319 })
320 }
321 Err(e) => {
322 error!("Failed to retrieve id2entry list: {:?}", e);
323 }
324 };
325}
326
327pub fn dbscan_restore_quarantined_core(config: &Configuration, id: u64) {
328 let be = dbscan_setup_be!(config);
329 let mut be_wrtxn = match be.write() {
330 Ok(txn) => txn,
331 Err(err) => {
332 error!(
333 ?err,
334 "Unable to proceed, backend write transaction failure."
335 );
336 return;
337 }
338 };
339
340 match be_wrtxn
341 .restore_quarantined(id)
342 .and_then(|_| be_wrtxn.commit())
343 {
344 Ok(()) => {
345 println!("restored - {id:>8}")
346 }
347 Err(e) => {
348 error!("Failed to restore quarantined id2entry value: {:?}", e);
349 }
350 };
351}
352
353pub fn backup_server_core(config: &Configuration, dst_path: Option<&Path>) {
354 let schema = match Schema::new() {
355 Ok(s) => s,
356 Err(e) => {
357 error!("Failed to setup in memory schema: {:?}", e);
358 std::process::exit(1);
359 }
360 };
361
362 let be = match setup_backend(config, &schema) {
363 Ok(be) => be,
364 Err(e) => {
365 error!("Failed to setup BE: {:?}", e);
366 return;
367 }
368 };
369
370 let mut be_ro_txn = match be.read() {
371 Ok(txn) => txn,
372 Err(err) => {
373 error!(?err, "Unable to proceed, backend read transaction failure.");
374 return;
375 }
376 };
377
378 let compression = match config.online_backup.as_ref() {
379 Some(backup_config) => backup_config.compression,
380 None => BackupCompression::default(),
381 };
382
383 if let Some(dst_path) = dst_path {
384 if dst_path.exists() {
385 error!(
386 "backup file {} already exists, will not overwrite it.",
387 dst_path.display()
388 );
389 return;
390 }
391
392 let output = match std::fs::File::create(dst_path) {
393 Ok(output) => output,
394 Err(err) => {
395 error!(?err, "File::create error creating {}", dst_path.display());
396 return;
397 }
398 };
399
400 match be_ro_txn.backup(output, compression) {
401 Ok(_) => info!("Backup success!"),
402 Err(e) => {
403 error!("Backup failed: {:?}", e);
404 std::process::exit(1);
405 }
406 };
407 } else {
408 let stdout = std::io::stdout().lock();
410
411 match be_ro_txn.backup(stdout, compression) {
412 Ok(_) => info!("Backup success!"),
413 Err(e) => {
414 error!("Backup failed: {:?}", e);
415 std::process::exit(1);
416 }
417 };
418 };
419 }
421
422pub async fn restore_server_core(config: &Configuration, dst_path: &Path) {
423 if let Some(db_path) = config.db_path.as_ref() {
425 touch_file_or_quit(db_path);
426 }
427
428 let schema = match Schema::new() {
430 Ok(s) => s,
431 Err(e) => {
432 error!("Failed to setup in memory schema: {:?}", e);
433 std::process::exit(1);
434 }
435 };
436
437 let be = match setup_backend(config, &schema) {
438 Ok(be) => be,
439 Err(e) => {
440 error!("Failed to setup backend: {:?}", e);
441 return;
442 }
443 };
444
445 let mut be_wr_txn = match be.write() {
446 Ok(txn) => txn,
447 Err(err) => {
448 error!(
449 ?err,
450 "Unable to proceed, backend write transaction failure."
451 );
452 return;
453 }
454 };
455
456 let compression = BackupCompression::identify_file(dst_path);
457
458 let input = match std::fs::File::open(dst_path) {
459 Ok(output) => output,
460 Err(err) => {
461 error!(?err, "File::open error reading {}", dst_path.display());
462 return;
463 }
464 };
465
466 let r = be_wr_txn
467 .restore(input, compression)
468 .and_then(|_| be_wr_txn.commit());
469
470 if r.is_err() {
471 error!("Failed to restore database: {:?}", r);
472 std::process::exit(1);
473 }
474 info!("Database loaded successfully");
475
476 reindex_inner(be, schema, config).await;
477
478 info!("✅ Restore Success!");
479}
480
481pub async fn reindex_server_core(config: &Configuration) {
482 info!("Start Index Phase 1 ...");
483 let schema = match Schema::new() {
485 Ok(s) => s,
486 Err(e) => {
487 error!("Failed to setup in memory schema: {:?}", e);
488 std::process::exit(1);
489 }
490 };
491
492 let be = match setup_backend(config, &schema) {
493 Ok(be) => be,
494 Err(e) => {
495 error!("Failed to setup BE: {:?}", e);
496 return;
497 }
498 };
499
500 reindex_inner(be, schema, config).await;
501
502 info!("✅ Reindex Success!");
503}
504
505async fn reindex_inner(be: Backend, schema: Schema, config: &Configuration) {
506 let mut be_wr_txn = match be.write() {
508 Ok(txn) => txn,
509 Err(err) => {
510 error!(
511 ?err,
512 "Unable to proceed, backend write transaction failure."
513 );
514 return;
515 }
516 };
517
518 let r = be_wr_txn.reindex(true).and_then(|_| be_wr_txn.commit());
519
520 if r.is_err() {
522 error!("Failed to reindex database: {:?}", r);
523 std::process::exit(1);
524 }
525 info!("Index Phase 1 Success!");
526
527 info!("Attempting to init query server ...");
528
529 let (qs, _idms, _idms_delayed, _idms_audit) = match setup_qs_idms(be, schema, config).await {
530 Ok(t) => t,
531 Err(e) => {
532 error!("Unable to setup query server or idm server -> {:?}", e);
533 return;
534 }
535 };
536 info!("Init Query Server Success!");
537
538 info!("Start Index Phase 2 ...");
539
540 let Ok(mut qs_write) = qs.write(duration_from_epoch_now()).await else {
541 error!("Unable to acquire write transaction");
542 return;
543 };
544 let r = qs_write.reindex(true).and_then(|_| qs_write.commit());
545
546 match r {
547 Ok(_) => info!("Index Phase 2 Success!"),
548 Err(e) => {
549 error!("Reindex failed: {:?}", e);
550 std::process::exit(1);
551 }
552 };
553}
554
555pub fn vacuum_server_core(config: &Configuration) {
556 let schema = match Schema::new() {
557 Ok(s) => s,
558 Err(e) => {
559 eprintln!("Failed to setup in memory schema: {e:?}");
560 std::process::exit(1);
561 }
562 };
563
564 let r = setup_backend_vacuum(config, &schema, true);
567
568 match r {
569 Ok(_) => eprintln!("Vacuum Success!"),
570 Err(e) => {
571 eprintln!("Vacuum failed: {e:?}");
572 std::process::exit(1);
573 }
574 };
575}
576
577pub async fn domain_rename_core(config: &Configuration) {
578 let schema = match Schema::new() {
579 Ok(s) => s,
580 Err(e) => {
581 eprintln!("Failed to setup in memory schema: {e:?}");
582 std::process::exit(1);
583 }
584 };
585
586 let be = match setup_backend(config, &schema) {
588 Ok(be) => be,
589 Err(e) => {
590 error!("Failed to setup BE: {:?}", e);
591 return;
592 }
593 };
594
595 let qs = match setup_qs(be, schema, config).await {
597 Ok(t) => t,
598 Err(e) => {
599 error!("Unable to setup query server -> {:?}", e);
600 return;
601 }
602 };
603
604 let new_domain_name = config.domain.as_str();
605
606 match qs.read().await.map(|qs| qs.get_domain_name().to_string()) {
608 Ok(old_domain_name) => {
609 admin_info!(?old_domain_name, ?new_domain_name);
610 if old_domain_name == new_domain_name {
611 admin_info!("Domain name not changing, stopping.");
612 return;
613 }
614 admin_debug!(
615 "Domain name is changing from {:?} to {:?}",
616 old_domain_name,
617 new_domain_name
618 );
619 }
620 Err(e) => {
621 admin_error!("Failed to query domain name, quitting! -> {:?}", e);
622 return;
623 }
624 }
625
626 let Ok(mut qs_write) = qs.write(duration_from_epoch_now()).await else {
627 error!("Unable to acquire write transaction");
628 return;
629 };
630 let r = qs_write
631 .danger_domain_rename(new_domain_name)
632 .and_then(|_| qs_write.commit());
633
634 match r {
635 Ok(_) => info!("Domain Rename Success!"),
636 Err(e) => {
637 error!("Domain Rename Failed - Rollback has occurred: {:?}", e);
638 std::process::exit(1);
639 }
640 };
641}
642
643pub async fn verify_server_core(config: &Configuration) {
644 let curtime = duration_from_epoch_now();
645 let schema_mem = match Schema::new() {
647 Ok(sc) => sc,
648 Err(e) => {
649 error!("Failed to setup in memory schema: {:?}", e);
650 return;
651 }
652 };
653 let be = match setup_backend(config, &schema_mem) {
655 Ok(be) => be,
656 Err(e) => {
657 error!("Failed to setup BE: {:?}", e);
658 return;
659 }
660 };
661
662 let server = match QueryServer::new(be, schema_mem, config.domain.clone(), curtime) {
663 Ok(qs) => qs,
664 Err(err) => {
665 error!(?err, "Failed to setup query server");
666 return;
667 }
668 };
669
670 let r = server.verify().await;
672
673 if r.is_empty() {
674 eprintln!("Verification passed!");
675 std::process::exit(0);
676 } else {
677 for er in r {
678 error!("{:?}", er);
679 }
680 std::process::exit(1);
681 }
682
683 }
685
686pub fn cert_generate_core(config: &Configuration) {
687 let (tls_key_path, tls_chain_path) = match &config.tls_config {
690 Some(tls_config) => (tls_config.key.as_path(), tls_config.chain.as_path()),
691 None => {
692 error!("Unable to find TLS configuration");
693 std::process::exit(1);
694 }
695 };
696
697 if tls_key_path.exists() && tls_chain_path.exists() {
698 info!(
699 "TLS key and chain already exist - remove them first if you intend to regenerate these"
700 );
701 return;
702 }
703
704 let origin_domain = match config.origin.domain() {
705 Some(val) => val,
706 None => {
707 error!("origin does not contain a valid domain");
708 std::process::exit(1);
709 }
710 };
711
712 let cert_root = match tls_key_path.parent() {
713 Some(parent) => parent,
714 None => {
715 error!("Unable to find parent directory of {:?}", tls_key_path);
716 std::process::exit(1);
717 }
718 };
719
720 let ca_cert = cert_root.join("ca.pem");
721 let ca_key = cert_root.join("cakey.pem");
722 let tls_cert_path = cert_root.join("cert.pem");
723
724 let ca_handle = if !ca_cert.exists() || !ca_key.exists() {
725 let ca_handle = match crypto::build_ca(None) {
727 Ok(ca_handle) => ca_handle,
728 Err(e) => {
729 error!(err = ?e, "Failed to build CA");
730 std::process::exit(1);
731 }
732 };
733
734 if crypto::write_ca(ca_key, ca_cert, &ca_handle).is_err() {
735 error!("Failed to write CA");
736 std::process::exit(1);
737 }
738
739 ca_handle
740 } else {
741 match crypto::load_ca(ca_key, ca_cert) {
742 Ok(ca_handle) => ca_handle,
743 Err(_) => {
744 error!("Failed to load CA");
745 std::process::exit(1);
746 }
747 }
748 };
749
750 if !tls_key_path.exists() || !tls_chain_path.exists() || !tls_cert_path.exists() {
751 let cert_handle = match crypto::build_cert(origin_domain, &ca_handle, None, None) {
753 Ok(cert_handle) => cert_handle,
754 Err(e) => {
755 error!(err = ?e, "Failed to build certificate");
756 std::process::exit(1);
757 }
758 };
759
760 if crypto::write_cert(tls_key_path, tls_chain_path, tls_cert_path, &cert_handle).is_err() {
761 error!("Failed to write certificates");
762 std::process::exit(1);
763 }
764 }
765 info!("certificate generation complete");
766}
767
768static MIGRATION_PATH_RE: LazyLock<Regex> = LazyLock::new(|| {
769 #[allow(clippy::expect_used)]
770 Regex::new("^\\d\\d-.*\\.h?json$").expect("Invalid SPN regex found")
771});
772
/// A migration file that has been read and parsed, ready to be applied.
struct ScimMigration {
    /// Location of the file the migration was loaded from.
    path: PathBuf,
    /// SHA-256 digest of the raw file content.
    hash: Sha256Output,
    /// The SCIM assertions parsed from the file.
    assertions: ScimAssertGeneric,
}
778
779#[instrument(
780 level = "info",
781 fields(uuid = ?eventid),
782 skip_all,
783)]
784async fn migration_apply(
785 eventid: Uuid,
786 server_write_ref: &'static QueryServerWriteV1,
787 migration_path: &Path,
788) {
789 let mut dir_ents = match tokio::fs::read_dir(migration_path).await {
790 Ok(dir_ents) => dir_ents,
791 Err(err) => {
792 error!(?err, "Unable to read migration directory.");
793 let diag = kanidm_lib_file_permissions::diagnose_path(migration_path);
794 info!(%diag);
795 return;
796 }
797 };
798
799 let mut migration_paths = Vec::with_capacity(8);
800
801 loop {
802 match dir_ents.next_entry().await {
803 Ok(Some(dir_ent)) => migration_paths.push(dir_ent.path()),
804 Ok(None) => {
805 break;
807 }
808 Err(err) => {
809 error!(?err, "Unable to read directory entries.");
810 return;
811 }
812 }
813 }
814
815 let mut migration_paths: Vec<_> = migration_paths.into_iter()
818 .filter(|path| {
819 if !path.is_file() {
820 info!(path = %path.display(), "ignoring path that is not a file.");
821 return false;
822 }
823
824 let Some(file_name) = path.file_name().and_then(std::ffi::OsStr::to_str) else {
825 info!(path = %path.display(), "ignoring path that has no file name, or is not a valid utf-8 file name.");
826 return false;
827 };
828
829 if !MIGRATION_PATH_RE.is_match(file_name) {
830 info!(path = %path.display(), "ignoring file that does not match naming pattern.");
831 info!("expected pattern 'XX-NAME.json' where XX are two numbers, followed by a hypen, with the file extension .json");
832 return false;
833 }
834
835 true
836 })
837 .collect();
838
839 migration_paths.sort_unstable();
840 let mut migrations = Vec::with_capacity(migration_paths.len());
841
842 for migration_path in migration_paths {
843 info!(path = %migration_path.display(), "examining migration");
844
845 let migration_content = match tokio::fs::read(&migration_path).await {
846 Ok(bytes) => bytes,
847 Err(err) => {
848 error!(?err, "Unable to read migration - it will be ignored.");
849 let diag = kanidm_lib_file_permissions::diagnose_path(&migration_path);
850 info!(%diag);
851 continue;
852 }
853 };
854
855 let assertions: ScimAssertGeneric = match serde_hjson::from_slice(&migration_content) {
857 Ok(assertions) => assertions,
858 Err(err) => {
859 error!(?err, path = %migration_path.display(), "Invalid JSON SCIM Assertion");
860 continue;
861 }
862 };
863
864 let mut hasher = Sha256::new();
866 hasher.update(&migration_content);
867 let migration_hash: Sha256Output = hasher.finalize();
868
869 migrations.push(ScimMigration {
870 path: migration_path,
871 hash: migration_hash,
872 assertions,
873 });
874 }
875
876 let mut migration_ids = BTreeSet::new();
877 for migration in &migrations {
878 if !migration_ids.insert(migration.assertions.id) {
880 error!(path = %migration.path.display(), uuid = ?migration.assertions.id, "Duplicate migration UUID found, refusing to proceed!!! All migrations must have a unique ID!!!");
881 return;
882 }
883 }
884
885 for ScimMigration {
888 path,
889 hash,
890 assertions,
891 } in migrations
892 {
893 if let Err(err) = server_write_ref
894 .handle_scim_migration_apply(eventid, assertions, hash)
895 .await
896 {
897 error!(?err, path = %path.display(), "Failed to apply migration");
898 };
899 }
900}
901
/// Control messages sent over the broadcast channel to the server's
/// background tasks.
#[derive(Clone, Debug)]
pub enum CoreAction {
    /// Asks the receiving tasks to stop.
    Shutdown,
    /// Asks the receiving tasks to reload (for example TLS material and
    /// migrations).
    Reload,
}
907
/// Identifies a background task of the server core; the `Display` form is the
/// human readable label used in log and shutdown messages.
pub(crate) enum TaskName {
    AdminSocket,
    AuditdActor,
    BackupActor,
    DelayedActionActor,
    HttpsServer,
    IntervalActor,
    LdapActor,
    Replication,
    TlsAcceptorReload,
    MigrationReload,
}

impl Display for TaskName {
    /// Writes the fixed label of the task; formatter width and padding
    /// options are not applied.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::AdminSocket => "Admin Socket",
            Self::AuditdActor => "Auditd Actor",
            Self::BackupActor => "Backup Actor",
            Self::DelayedActionActor => "Delayed Action Actor",
            Self::HttpsServer => "HTTPS Server",
            Self::IntervalActor => "Interval Actor",
            Self::LdapActor => "LDAP Acceptor Actor",
            Self::Replication => "Replication",
            Self::TlsAcceptorReload => "TlsAcceptor Reload Monitor",
            Self::MigrationReload => "Migration Reload Monitor",
        };
        f.write_str(label)
    }
}
941
/// Handle to a running server core: the control channel plus the join handles
/// of its tasks.
pub struct CoreHandle {
    /// Set to `true` only once `shutdown` has awaited every task; checked on drop.
    clean_shutdown: bool,
    /// Sender side of the broadcast channel carrying `CoreAction` messages.
    tx: broadcast::Sender<CoreAction>,
    /// Named join handles of the spawned tasks.
    handles: Vec<(TaskName, task::JoinHandle<()>)>,
}
948
949impl CoreHandle {
950 pub fn subscribe(&mut self) -> broadcast::Receiver<CoreAction> {
951 self.tx.subscribe()
952 }
953
954 pub async fn shutdown(&mut self) {
955 if self.tx.send(CoreAction::Shutdown).is_err() {
956 eprintln!("No receivers acked shutdown request. Treating as unclean.");
957 return;
958 }
959
960 while let Some((handle_name, handle)) = self.handles.pop() {
962 if let Err(error) = handle.await {
963 eprintln!("Task {handle_name} failed to finish: {error:?}");
964 }
965 }
966
967 self.clean_shutdown = true;
968 }
969
970 pub async fn reload(&mut self) {
971 if self.tx.send(CoreAction::Reload).is_err() {
972 eprintln!("No receivers acked reload request.");
973 }
974 }
975}
976
977impl Drop for CoreHandle {
978 fn drop(&mut self) {
979 if !self.clean_shutdown {
980 eprintln!("⚠️ UNCLEAN SHUTDOWN OCCURRED ⚠️ ");
981 }
982 }
985}
986
987pub async fn create_server_core(
988 config: Configuration,
989 config_test: bool,
990) -> Result<CoreHandle, ()> {
991 let (broadcast_tx, mut broadcast_rx) = broadcast::channel(4);
993
994 if config.integration_test_config.is_some() {
995 warn!("RUNNING IN INTEGRATION TEST MODE.");
996 warn!("IF YOU SEE THIS IN PRODUCTION YOU MUST CONTACT SUPPORT IMMEDIATELY.");
997 } else if config.tls_config.is_none() {
998 error!("Running without TLS is not supported! Quitting!");
1000 return Err(());
1001 }
1002
1003 info!(
1004 "Starting kanidm with {}configuration: {}",
1005 if config_test { "TEST " } else { "" },
1006 config
1007 );
1008 #[cfg(not(target_family = "windows"))]
1010 unsafe {
1011 umask(0o0027)
1012 };
1013
1014 let status_ref = StatusActor::start();
1017
1018 let maybe_tls_acceptor = match crypto::setup_tls(&config.tls_config) {
1020 Ok(tls_acc) => tls_acc,
1021 Err(err) => {
1022 error!(?err, "Failed to configure TLS acceptor");
1023 return Err(());
1024 }
1025 };
1026
1027 let schema = match Schema::new() {
1028 Ok(s) => s,
1029 Err(e) => {
1030 error!("Failed to setup in memory schema: {:?}", e);
1031 return Err(());
1032 }
1033 };
1034
1035 let be = match setup_backend(&config, &schema) {
1037 Ok(be) => be,
1038 Err(e) => {
1039 error!("Failed to setup BE -> {:?}", e);
1040 return Err(());
1041 }
1042 };
1043 let (_qs, idms, mut idms_delayed, mut idms_audit) =
1045 match setup_qs_idms(be, schema, &config).await {
1046 Ok(t) => t,
1047 Err(e) => {
1048 error!("Unable to setup query server or idm server -> {:?}", e);
1049 return Err(());
1050 }
1051 };
1052
1053 let jws_signer = match JwsHs256Signer::generate_hs256() {
1056 Ok(k) => k.set_sign_option_embed_kid(false),
1057 Err(e) => {
1058 error!("Unable to setup jws signer -> {:?}", e);
1059 return Err(());
1060 }
1061 };
1062
1063 if let Some(itc) = &config.integration_test_config {
1065 let Ok(mut idms_prox_write) = idms.proxy_write(duration_from_epoch_now()).await else {
1066 error!("Unable to acquire write transaction");
1067 return Err(());
1068 };
1069 match idms_prox_write.recover_account(&itc.admin_user, Some(&itc.admin_password)) {
1071 Ok(_) => {}
1072 Err(e) => {
1073 error!(
1074 "Unable to configure INTEGRATION TEST {} account -> {:?}",
1075 &itc.admin_user, e
1076 );
1077 return Err(());
1078 }
1079 };
1080 match idms_prox_write.recover_account(&itc.idm_admin_user, Some(&itc.idm_admin_password)) {
1082 Ok(_) => {}
1083 Err(e) => {
1084 error!(
1085 "Unable to configure INTEGRATION TEST {} account -> {:?}",
1086 &itc.idm_admin_user, e
1087 );
1088 return Err(());
1089 }
1090 };
1091
1092 match idms_prox_write.qs_write.internal_modify_uuid(
1096 UUID_IDM_ADMINS,
1097 &ModifyList::new_append(Attribute::Member, Value::Refer(UUID_ADMIN)),
1098 ) {
1099 Ok(_) => {}
1100 Err(e) => {
1101 error!(
1102 "Unable to configure INTEGRATION TEST admin as member of idm_admins -> {:?}",
1103 e
1104 );
1105 return Err(());
1106 }
1107 };
1108
1109 match idms_prox_write.qs_write.internal_modify_uuid(
1110 UUID_IDM_ALL_PERSONS,
1111 &ModifyList::new_purge_and_set(
1112 Attribute::CredentialTypeMinimum,
1113 CredentialType::Any.into(),
1114 ),
1115 ) {
1116 Ok(_) => {}
1117 Err(e) => {
1118 error!(
1119 "Unable to configure INTEGRATION TEST default credential policy -> {:?}",
1120 e
1121 );
1122 return Err(());
1123 }
1124 };
1125
1126 match idms_prox_write.commit() {
1127 Ok(_) => {}
1128 Err(e) => {
1129 error!("Unable to commit INTEGRATION TEST setup -> {:?}", e);
1130 return Err(());
1131 }
1132 }
1133 }
1134
1135 let ldap = match LdapServer::new(&idms).await {
1136 Ok(l) => l,
1137 Err(e) => {
1138 error!("Unable to start LdapServer -> {:?}", e);
1139 return Err(());
1140 }
1141 };
1142
1143 let idms_arc = Arc::new(idms);
1145 let ldap_arc = Arc::new(ldap);
1146
1147 let server_read_ref = QueryServerReadV1::start_static(idms_arc.clone(), ldap_arc.clone());
1150
1151 let server_write_ref = QueryServerWriteV1::start_static(idms_arc.clone());
1153
1154 let delayed_handle = task::spawn(async move {
1155 let mut buffer = Vec::with_capacity(DELAYED_ACTION_BATCH_SIZE);
1156 loop {
1157 tokio::select! {
1158 added = idms_delayed.recv_many(&mut buffer) => {
1159 if added == 0 {
1160 break
1162 }
1163 server_write_ref.handle_delayedaction(&mut buffer).await;
1164 }
1165 Ok(action) = broadcast_rx.recv() => {
1166 match action {
1167 CoreAction::Shutdown => break,
1168 CoreAction::Reload => {},
1169 }
1170 }
1171 }
1172 }
1173 info!("Stopped {}", TaskName::DelayedActionActor);
1174 });
1175
1176 let mut broadcast_rx = broadcast_tx.subscribe();
1177
1178 let auditd_handle = task::spawn(async move {
1179 loop {
1180 tokio::select! {
1181 Ok(action) = broadcast_rx.recv() => {
1182 match action {
1183 CoreAction::Shutdown => break,
1184 CoreAction::Reload => {},
1185 }
1186 }
1187 audit_event = idms_audit.audit_rx().recv() => {
1188 match serde_json::to_string(&audit_event) {
1189 Ok(audit_event) => {
1190 warn!(%audit_event);
1191 }
1192 Err(e) => {
1193 error!(err=?e, "Unable to process audit event to json.");
1194 warn!(?audit_event, json=false);
1195 }
1196 }
1197
1198 }
1199 }
1200 }
1201 info!("Stopped {}", TaskName::AuditdActor);
1202 });
1203
1204 let migration_path = config
1206 .migration_path
1207 .clone()
1208 .unwrap_or(PathBuf::from(env!("KANIDM_SERVER_MIGRATION_PATH")));
1209
1210 if config.integration_test_config.is_none() {
1211 let eventid = Uuid::new_v4();
1212 migration_apply(eventid, server_write_ref, migration_path.as_path()).await;
1213 }
1214
1215 let mut broadcast_rx = broadcast_tx.subscribe();
1217 let migration_reload_handle = task::spawn(async move {
1218 loop {
1219 tokio::select! {
1220 Ok(action) = broadcast_rx.recv() => {
1221 match action {
1222 CoreAction::Shutdown => break,
1223 CoreAction::Reload => {
1224 let eventid = Uuid::new_v4();
1227 migration_apply(
1228 eventid,
1229 server_write_ref,
1230 migration_path.as_path(),
1231 ).await;
1232
1233 info!("Migration reload complete");
1234 },
1235 }
1236 }
1237 }
1238 }
1239 info!("Stopped {}", TaskName::MigrationReload);
1240 });
1241
1242 let mut broadcast_rx = broadcast_tx.subscribe();
1245 let tls_config = config.tls_config.clone();
1246
1247 let (tls_acceptor_reload_tx, _tls_acceptor_reload_rx) = broadcast::channel(1);
1248 let tls_acceptor_reload_tx_c = tls_acceptor_reload_tx.clone();
1249
1250 let tls_acceptor_reload_handle = task::spawn(async move {
1251 loop {
1252 tokio::select! {
1253 Ok(action) = broadcast_rx.recv() => {
1254 match action {
1255 CoreAction::Shutdown => break,
1256 CoreAction::Reload => {
1257 let tls_acceptor = match crypto::setup_tls(&tls_config) {
1258 Ok(Some(tls_acc)) => tls_acc,
1259 Ok(None) => {
1260 warn!("TLS not configured, ignoring reload request.");
1261 continue;
1262 }
1263 Err(err) => {
1264 error!(?err, "Failed to configure and reload TLS acceptor");
1265 continue;
1266 }
1267 };
1268
1269 if tls_acceptor_reload_tx_c.send(tls_acceptor).is_err() {
1272 error!("TLS acceptor did not accept the reload, the server may have failed!");
1273 };
1274 info!("TLS acceptor reload notification sent");
1275 },
1276 }
1277 }
1278 }
1279 }
1280 info!("Stopped {}", TaskName::TlsAcceptorReload);
1281 });
1282
1283 let interval_handle = IntervalActor::start(server_write_ref, broadcast_tx.subscribe());
1285 let maybe_backup_handle = match &config.online_backup {
1287 Some(online_backup_config) => {
1288 if online_backup_config.enabled {
1289 let handle = IntervalActor::start_online_backup(
1290 server_read_ref,
1291 online_backup_config,
1292 broadcast_tx.subscribe(),
1293 )?;
1294 Some(handle)
1295 } else {
1296 debug!("Backups disabled");
1297 None
1298 }
1299 }
1300 None => {
1301 debug!("Online backup not requested, skipping");
1302 None
1303 }
1304 };
1305
1306 let maybe_ldap_acceptor_handles = match &config.ldapbindaddress {
1308 Some(la) => {
1309 let opt_ldap_ssl_acceptor = maybe_tls_acceptor.clone();
1310
1311 let h = ldaps::create_ldap_server(
1312 la,
1313 opt_ldap_ssl_acceptor,
1314 server_read_ref,
1315 &broadcast_tx,
1316 &tls_acceptor_reload_tx,
1317 config.ldap_client_address_info.trusted_tcp_info(),
1318 )
1319 .await?;
1320 Some(h)
1321 }
1322 None => {
1323 debug!("LDAP not requested, skipping");
1324 None
1325 }
1326 };
1327
1328 let (maybe_repl_handle, maybe_repl_ctrl_tx) = match &config.repl_config {
1331 Some(rc) => {
1332 if !config_test {
1333 let (h, repl_ctrl_tx) =
1335 repl::create_repl_server(idms_arc.clone(), rc, broadcast_tx.subscribe())
1336 .await?;
1337 (Some(h), Some(repl_ctrl_tx))
1338 } else {
1339 (None, None)
1340 }
1341 }
1342 None => {
1343 debug!("Replication not requested, skipping");
1344 (None, None)
1345 }
1346 };
1347
1348 let maybe_http_acceptor_handles = if config_test {
1349 admin_info!("This config rocks! 🪨 ");
1350 None
1351 } else {
1352 let handles: Vec<task::JoinHandle<()>> = https::create_https_server(
1353 config.clone(),
1354 jws_signer,
1355 status_ref,
1356 server_write_ref,
1357 server_read_ref,
1358 broadcast_tx.clone(),
1359 maybe_tls_acceptor,
1360 &tls_acceptor_reload_tx,
1361 )
1362 .await
1363 .inspect_err(|err| {
1364 error!(?err, "Failed to start HTTPS server");
1365 })?;
1366
1367 if config.role != ServerRole::WriteReplicaNoUI {
1368 admin_info!("ready to rock! 🪨 UI available at: {}", config.origin);
1369 } else {
1370 admin_info!("ready to rock! 🪨 ");
1371 }
1372 Some(handles)
1373 };
1374
1375 let maybe_admin_sock_handle = if config.integration_test_config.is_none() {
1377 let broadcast_tx_ = broadcast_tx.clone();
1378
1379 let admin_handle = AdminActor::create_admin_sock(
1380 config.adminbindpath.as_str(),
1381 server_write_ref,
1382 server_read_ref,
1383 broadcast_tx_,
1384 maybe_repl_ctrl_tx,
1385 )
1386 .await?;
1387
1388 Some(admin_handle)
1389 } else {
1390 None
1391 };
1392
1393 let mut handles: Vec<(TaskName, task::JoinHandle<()>)> = vec![
1394 (TaskName::IntervalActor, interval_handle),
1395 (TaskName::DelayedActionActor, delayed_handle),
1396 (TaskName::AuditdActor, auditd_handle),
1397 (TaskName::TlsAcceptorReload, tls_acceptor_reload_handle),
1398 (TaskName::MigrationReload, migration_reload_handle),
1399 ];
1400
1401 if let Some(backup_handle) = maybe_backup_handle {
1402 handles.push((TaskName::BackupActor, backup_handle))
1403 }
1404
1405 if let Some(admin_sock_handle) = maybe_admin_sock_handle {
1406 handles.push((TaskName::AdminSocket, admin_sock_handle))
1407 }
1408
1409 if let Some(ldap_handles) = maybe_ldap_acceptor_handles {
1410 for ldap_handle in ldap_handles {
1411 handles.push((TaskName::LdapActor, ldap_handle))
1412 }
1413 }
1414
1415 if let Some(http_handles) = maybe_http_acceptor_handles {
1416 for http_handle in http_handles {
1417 handles.push((TaskName::HttpsServer, http_handle))
1418 }
1419 }
1420
1421 if let Some(repl_handle) = maybe_repl_handle {
1422 handles.push((TaskName::Replication, repl_handle))
1423 }
1424
1425 Ok(CoreHandle {
1426 clean_shutdown: false,
1427 tx: broadcast_tx,
1428 handles,
1429 })
1430}