1#![deny(warnings)]
12#![warn(unused_extern_crates)]
13#![warn(unused_imports)]
14#![deny(clippy::todo)]
15#![deny(clippy::unimplemented)]
16#![deny(clippy::unwrap_used)]
17#![deny(clippy::expect_used)]
18#![deny(clippy::panic)]
19#![deny(clippy::unreachable)]
20#![deny(clippy::await_holding_lock)]
21#![deny(clippy::needless_pass_by_value)]
22#![deny(clippy::trivially_copy_pass_by_ref)]
23#![deny(clippy::indexing_slicing)]
24
25#[macro_use]
26extern crate tracing;
27#[macro_use]
28extern crate kanidmd_lib;
29
30mod actors;
31pub mod admin;
32pub mod config;
33mod crypto;
34mod https;
35mod interval;
36mod ldaps;
37mod repl;
38mod tcp;
39mod utils;
40
41use crate::actors::{QueryServerReadV1, QueryServerWriteV1};
42use crate::admin::AdminActor;
43use crate::config::{Configuration, ServerRole};
44use crate::interval::IntervalActor;
45use crate::utils::touch_file_or_quit;
46use compact_jwt::{JwsHs256Signer, JwsSigner};
47use crypto_glue::{
48 s256::{Sha256, Sha256Output},
49 traits::Digest,
50};
51use kanidm_proto::backup::BackupCompression;
52use kanidm_proto::internal::OperationError;
53use kanidm_proto::scim_v1::client::ScimAssertGeneric;
54use kanidmd_lib::be::{Backend, BackendConfig, BackendTransaction};
55use kanidmd_lib::idm::ldap::LdapServer;
56use kanidmd_lib::prelude::*;
57use kanidmd_lib::schema::Schema;
58use kanidmd_lib::status::StatusActor;
59use kanidmd_lib::value::CredentialType;
60use regex::Regex;
61use std::collections::BTreeSet;
62use std::fmt::{Display, Formatter};
63use std::path::Path;
64use std::path::PathBuf;
65use std::sync::Arc;
66use std::sync::LazyLock;
67use tokio::sync::broadcast;
68use tokio::task;
69
70#[cfg(not(target_family = "windows"))]
71use libc::umask;
72
73fn setup_backend(config: &Configuration, schema: &Schema) -> Result<Backend, OperationError> {
76 setup_backend_vacuum(config, schema, false)
77}
78
79fn setup_backend_vacuum(
80 config: &Configuration,
81 schema: &Schema,
82 vacuum: bool,
83) -> Result<Backend, OperationError> {
84 let schema_txn = schema.write();
87 let idxmeta = schema_txn.reload_idxmeta();
88
89 let pool_size: u32 = config.threads as u32;
90
91 let cfg = BackendConfig::new(
92 config.db_path.as_deref(),
93 pool_size,
94 config.db_fs_type.unwrap_or_default(),
95 config.db_arc_size,
96 );
97
98 Backend::new(cfg, idxmeta, vacuum)
99}
100
101async fn setup_qs_idms(
106 be: Backend,
107 schema: Schema,
108 config: &Configuration,
109) -> Result<(QueryServer, IdmServer, IdmServerDelayed, IdmServerAudit), OperationError> {
110 let curtime = duration_from_epoch_now();
111 let query_server = QueryServer::new(be, schema, config.domain.clone(), curtime)?;
113
114 query_server
123 .initialise_helper(curtime, DOMAIN_TGT_LEVEL)
124 .await?;
125
126 let is_integration_test = config.integration_test_config.is_some();
128 let (idms, idms_delayed, idms_audit) = IdmServer::new(
129 query_server.clone(),
130 &config.origin,
131 is_integration_test,
132 curtime,
133 )
134 .await?;
135
136 Ok((query_server, idms, idms_delayed, idms_audit))
137}
138
139async fn setup_qs(
140 be: Backend,
141 schema: Schema,
142 config: &Configuration,
143) -> Result<QueryServer, OperationError> {
144 let curtime = duration_from_epoch_now();
145 let query_server = QueryServer::new(be, schema, config.domain.clone(), curtime)?;
147
148 query_server
157 .initialise_helper(curtime, DOMAIN_TGT_LEVEL)
158 .await?;
159
160 Ok(query_server)
161}
162
/// Shared setup for the `dbscan_*` CLI tools: build an in-memory schema and
/// open the backend from `$config`, evaluating to the `Backend`.
///
/// NOTE: on schema failure the *process* exits; on backend failure the bare
/// `return` makes the *calling function* return - so this macro may only be
/// used inside functions returning `()`.
macro_rules! dbscan_setup_be {
    (
        $config:expr
    ) => {{
        let schema = match Schema::new() {
            Ok(s) => s,
            Err(e) => {
                error!("Failed to setup in memory schema: {:?}", e);
                std::process::exit(1);
            }
        };

        match setup_backend($config, &schema) {
            Ok(be) => be,
            Err(e) => {
                error!("Failed to setup BE: {:?}", e);
                return;
            }
        }
    }};
}
184
185pub fn dbscan_list_indexes_core(config: &Configuration) {
186 let be = dbscan_setup_be!(config);
187 let mut be_rotxn = match be.read() {
188 Ok(txn) => txn,
189 Err(err) => {
190 error!(?err, "Unable to proceed, backend read transaction failure.");
191 return;
192 }
193 };
194
195 match be_rotxn.list_indexes() {
196 Ok(mut idx_list) => {
197 idx_list.sort_unstable();
198 idx_list.iter().for_each(|idx_name| {
199 println!("{idx_name}");
200 })
201 }
202 Err(e) => {
203 error!("Failed to retrieve index list: {:?}", e);
204 }
205 };
206}
207
208pub fn dbscan_list_id2entry_core(config: &Configuration) {
209 let be = dbscan_setup_be!(config);
210 let mut be_rotxn = match be.read() {
211 Ok(txn) => txn,
212 Err(err) => {
213 error!(?err, "Unable to proceed, backend read transaction failure.");
214 return;
215 }
216 };
217
218 match be_rotxn.list_id2entry() {
219 Ok(mut id_list) => {
220 id_list.sort_unstable_by_key(|k| k.0);
221 id_list.iter().for_each(|(id, value)| {
222 println!("{id:>8}: {value}");
223 })
224 }
225 Err(e) => {
226 error!("Failed to retrieve id2entry list: {:?}", e);
227 }
228 };
229}
230
/// CLI debug tool stub for index analysis.
///
/// Currently this only opens the backend (validating that the configuration
/// and database can be loaded) - no analysis is performed yet.
pub fn dbscan_list_index_analysis_core(config: &Configuration) {
    let _be = dbscan_setup_be!(config);
}
235
236pub fn dbscan_list_index_core(config: &Configuration, index_name: &str) {
237 let be = dbscan_setup_be!(config);
238 let mut be_rotxn = match be.read() {
239 Ok(txn) => txn,
240 Err(err) => {
241 error!(?err, "Unable to proceed, backend read transaction failure.");
242 return;
243 }
244 };
245
246 match be_rotxn.list_index_content(index_name) {
247 Ok(mut idx_list) => {
248 idx_list.sort_unstable_by(|a, b| a.0.cmp(&b.0));
249 idx_list.iter().for_each(|(key, value)| {
250 println!("{key:>50}: {value:?}");
251 })
252 }
253 Err(e) => {
254 error!("Failed to retrieve index list: {:?}", e);
255 }
256 };
257}
258
259pub fn dbscan_get_id2entry_core(config: &Configuration, id: u64) {
260 let be = dbscan_setup_be!(config);
261 let mut be_rotxn = match be.read() {
262 Ok(txn) => txn,
263 Err(err) => {
264 error!(?err, "Unable to proceed, backend read transaction failure.");
265 return;
266 }
267 };
268
269 match be_rotxn.get_id2entry(id) {
270 Ok((id, value)) => println!("{id:>8}: {value}"),
271 Err(e) => {
272 error!("Failed to retrieve id2entry value: {:?}", e);
273 }
274 };
275}
276
277pub fn dbscan_quarantine_id2entry_core(config: &Configuration, id: u64) {
278 let be = dbscan_setup_be!(config);
279 let mut be_wrtxn = match be.write() {
280 Ok(txn) => txn,
281 Err(err) => {
282 error!(
283 ?err,
284 "Unable to proceed, backend write transaction failure."
285 );
286 return;
287 }
288 };
289
290 match be_wrtxn
291 .quarantine_entry(id)
292 .and_then(|_| be_wrtxn.commit())
293 {
294 Ok(()) => {
295 println!("quarantined - {id:>8}")
296 }
297 Err(e) => {
298 error!("Failed to quarantine id2entry value: {:?}", e);
299 }
300 };
301}
302
303pub fn dbscan_list_quarantined_core(config: &Configuration) {
304 let be = dbscan_setup_be!(config);
305 let mut be_rotxn = match be.read() {
306 Ok(txn) => txn,
307 Err(err) => {
308 error!(?err, "Unable to proceed, backend read transaction failure.");
309 return;
310 }
311 };
312
313 match be_rotxn.list_quarantined() {
314 Ok(mut id_list) => {
315 id_list.sort_unstable_by_key(|k| k.0);
316 id_list.iter().for_each(|(id, value)| {
317 println!("{id:>8}: {value}");
318 })
319 }
320 Err(e) => {
321 error!("Failed to retrieve id2entry list: {:?}", e);
322 }
323 };
324}
325
326pub fn dbscan_restore_quarantined_core(config: &Configuration, id: u64) {
327 let be = dbscan_setup_be!(config);
328 let mut be_wrtxn = match be.write() {
329 Ok(txn) => txn,
330 Err(err) => {
331 error!(
332 ?err,
333 "Unable to proceed, backend write transaction failure."
334 );
335 return;
336 }
337 };
338
339 match be_wrtxn
340 .restore_quarantined(id)
341 .and_then(|_| be_wrtxn.commit())
342 {
343 Ok(()) => {
344 println!("restored - {id:>8}")
345 }
346 Err(e) => {
347 error!("Failed to restore quarantined id2entry value: {:?}", e);
348 }
349 };
350}
351
352pub fn backup_server_core(config: &Configuration, dst_path: Option<&Path>) {
353 let schema = match Schema::new() {
354 Ok(s) => s,
355 Err(e) => {
356 error!("Failed to setup in memory schema: {:?}", e);
357 std::process::exit(1);
358 }
359 };
360
361 let be = match setup_backend(config, &schema) {
362 Ok(be) => be,
363 Err(e) => {
364 error!("Failed to setup BE: {:?}", e);
365 return;
366 }
367 };
368
369 let mut be_ro_txn = match be.read() {
370 Ok(txn) => txn,
371 Err(err) => {
372 error!(?err, "Unable to proceed, backend read transaction failure.");
373 return;
374 }
375 };
376
377 let compression = match config.online_backup.as_ref() {
378 Some(backup_config) => backup_config.compression,
379 None => BackupCompression::default(),
380 };
381
382 if let Some(dst_path) = dst_path {
383 if dst_path.exists() {
384 error!(
385 "backup file {} already exists, will not overwrite it.",
386 dst_path.display()
387 );
388 return;
389 }
390
391 let output = match std::fs::File::create(dst_path) {
392 Ok(output) => output,
393 Err(err) => {
394 error!(?err, "File::create error creating {}", dst_path.display());
395 return;
396 }
397 };
398
399 match be_ro_txn.backup(output, compression) {
400 Ok(_) => info!("Backup success!"),
401 Err(e) => {
402 error!("Backup failed: {:?}", e);
403 std::process::exit(1);
404 }
405 };
406 } else {
407 let stdout = std::io::stdout().lock();
409
410 match be_ro_txn.backup(stdout, compression) {
411 Ok(_) => info!("Backup success!"),
412 Err(e) => {
413 error!("Backup failed: {:?}", e);
414 std::process::exit(1);
415 }
416 };
417 };
418 }
420
/// Restore the database from a backup file at `dst_path`, then reindex.
///
/// The process exits non-zero if the restore itself fails; backend/schema
/// setup failures return early after logging.
pub async fn restore_server_core(config: &Configuration, dst_path: &Path) {
    // Ensure the target database file exists and is writable before we start.
    if let Some(db_path) = config.db_path.as_ref() {
        touch_file_or_quit(db_path);
    }

    let schema = match Schema::new() {
        Ok(s) => s,
        Err(e) => {
            error!("Failed to setup in memory schema: {:?}", e);
            std::process::exit(1);
        }
    };

    let be = match setup_backend(config, &schema) {
        Ok(be) => be,
        Err(e) => {
            error!("Failed to setup backend: {:?}", e);
            return;
        }
    };

    let mut be_wr_txn = match be.write() {
        Ok(txn) => txn,
        Err(err) => {
            error!(
                ?err,
                "Unable to proceed, backend write transaction failure."
            );
            return;
        }
    };

    // Detect the compression format from the backup file itself.
    let compression = BackupCompression::identify_file(dst_path);

    let input = match std::fs::File::open(dst_path) {
        Ok(output) => output,
        Err(err) => {
            error!(?err, "File::open error reading {}", dst_path.display());
            return;
        }
    };

    // Restore, committing only on success.
    let r = be_wr_txn
        .restore(input, compression)
        .and_then(|_| be_wr_txn.commit());

    if r.is_err() {
        error!("Failed to restore database: {:?}", r);
        std::process::exit(1);
    }
    info!("Database loaded successfully");

    // A restored database must be fully reindexed before use.
    reindex_inner(be, schema, config).await;

    info!("✅ Restore Success!");
}
479
480pub async fn reindex_server_core(config: &Configuration) {
481 info!("Start Index Phase 1 ...");
482 let schema = match Schema::new() {
484 Ok(s) => s,
485 Err(e) => {
486 error!("Failed to setup in memory schema: {:?}", e);
487 std::process::exit(1);
488 }
489 };
490
491 let be = match setup_backend(config, &schema) {
492 Ok(be) => be,
493 Err(e) => {
494 error!("Failed to setup BE: {:?}", e);
495 return;
496 }
497 };
498
499 reindex_inner(be, schema, config).await;
500
501 info!("✅ Reindex Success!");
502}
503
/// Shared two-phase reindex used by `reindex_server_core` and
/// `restore_server_core`.
///
/// Phase 1 reindexes at the raw backend level; phase 2 brings up the full
/// query/IDM server stack (which also runs initialisation via
/// `setup_qs_idms`) and reindexes again through the query server. Reindex
/// failures exit the process; setup failures return after logging.
async fn reindex_inner(be: Backend, schema: Schema, config: &Configuration) {
    // Phase 1: raw backend reindex inside a single write transaction.
    let mut be_wr_txn = match be.write() {
        Ok(txn) => txn,
        Err(err) => {
            error!(
                ?err,
                "Unable to proceed, backend write transaction failure."
            );
            return;
        }
    };

    // Commit only if the reindex succeeded.
    let r = be_wr_txn.reindex(true).and_then(|_| be_wr_txn.commit());

    if r.is_err() {
        error!("Failed to reindex database: {:?}", r);
        std::process::exit(1);
    }
    info!("Index Phase 1 Success!");

    info!("Attempting to init query server ...");

    let (qs, _idms, _idms_delayed, _idms_audit) = match setup_qs_idms(be, schema, config).await {
        Ok(t) => t,
        Err(e) => {
            error!("Unable to setup query server or idm server -> {:?}", e);
            return;
        }
    };
    info!("Init Query Server Success!");

    info!("Start Index Phase 2 ...");

    // Phase 2: reindex through the query server layer.
    let Ok(mut qs_write) = qs.write(duration_from_epoch_now()).await else {
        error!("Unable to acquire write transaction");
        return;
    };
    let r = qs_write.reindex(true).and_then(|_| qs_write.commit());

    match r {
        Ok(_) => info!("Index Phase 2 Success!"),
        Err(e) => {
            error!("Reindex failed: {:?}", e);
            std::process::exit(1);
        }
    };
}
553
554pub fn vacuum_server_core(config: &Configuration) {
555 let schema = match Schema::new() {
556 Ok(s) => s,
557 Err(e) => {
558 eprintln!("Failed to setup in memory schema: {e:?}");
559 std::process::exit(1);
560 }
561 };
562
563 let r = setup_backend_vacuum(config, &schema, true);
566
567 match r {
568 Ok(_) => eprintln!("Vacuum Success!"),
569 Err(e) => {
570 eprintln!("Vacuum failed: {e:?}");
571 std::process::exit(1);
572 }
573 };
574}
575
/// Rename the server's domain to the name currently set in `config`.
///
/// Reads the existing domain name from the database first: if it already
/// matches the configured name this is a no-op. On rename failure the change
/// is rolled back and the process exits non-zero.
pub async fn domain_rename_core(config: &Configuration) {
    let schema = match Schema::new() {
        Ok(s) => s,
        Err(e) => {
            eprintln!("Failed to setup in memory schema: {e:?}");
            std::process::exit(1);
        }
    };

    let be = match setup_backend(config, &schema) {
        Ok(be) => be,
        Err(e) => {
            error!("Failed to setup BE: {:?}", e);
            return;
        }
    };

    // Only the query server is needed - the IDM stack is not required for a
    // rename.
    let qs = match setup_qs(be, schema, config).await {
        Ok(t) => t,
        Err(e) => {
            error!("Unable to setup query server -> {:?}", e);
            return;
        }
    };

    let new_domain_name = config.domain.as_str();

    // Read the current domain name and skip the rename if nothing changes.
    match qs.read().await.map(|qs| qs.get_domain_name().to_string()) {
        Ok(old_domain_name) => {
            admin_info!(?old_domain_name, ?new_domain_name);
            if old_domain_name == new_domain_name {
                admin_info!("Domain name not changing, stopping.");
                return;
            }
            admin_debug!(
                "Domain name is changing from {:?} to {:?}",
                old_domain_name,
                new_domain_name
            );
        }
        Err(e) => {
            admin_error!("Failed to query domain name, quitting! -> {:?}", e);
            return;
        }
    }

    let Ok(mut qs_write) = qs.write(duration_from_epoch_now()).await else {
        error!("Unable to acquire write transaction");
        return;
    };
    // Commit only if the rename succeeded.
    let r = qs_write
        .danger_domain_rename(new_domain_name)
        .and_then(|_| qs_write.commit());

    match r {
        Ok(_) => info!("Domain Rename Success!"),
        Err(e) => {
            error!("Domain Rename Failed - Rollback has occurred: {:?}", e);
            std::process::exit(1);
        }
    };
}
641
642pub async fn verify_server_core(config: &Configuration) {
643 let curtime = duration_from_epoch_now();
644 let schema_mem = match Schema::new() {
646 Ok(sc) => sc,
647 Err(e) => {
648 error!("Failed to setup in memory schema: {:?}", e);
649 return;
650 }
651 };
652 let be = match setup_backend(config, &schema_mem) {
654 Ok(be) => be,
655 Err(e) => {
656 error!("Failed to setup BE: {:?}", e);
657 return;
658 }
659 };
660
661 let server = match QueryServer::new(be, schema_mem, config.domain.clone(), curtime) {
662 Ok(qs) => qs,
663 Err(err) => {
664 error!(?err, "Failed to setup query server");
665 return;
666 }
667 };
668
669 let r = server.verify().await;
671
672 if r.is_empty() {
673 eprintln!("Verification passed!");
674 std::process::exit(0);
675 } else {
676 for er in r {
677 error!("{:?}", er);
678 }
679 std::process::exit(1);
680 }
681
682 }
684
/// Generate a development/self-signed TLS setup for the configured origin.
///
/// Builds (or reloads) a CA in the directory containing the configured key
/// path, then issues a key, certificate, and chain for the origin domain.
/// Returns early if the key and chain already exist; exits the process on
/// any unrecoverable error.
pub fn cert_generate_core(config: &Configuration) {
    let (tls_key_path, tls_chain_path) = match &config.tls_config {
        Some(tls_config) => (tls_config.key.as_path(), tls_config.chain.as_path()),
        None => {
            error!("Unable to find TLS configuration");
            std::process::exit(1);
        }
    };

    // Never overwrite existing key material.
    if tls_key_path.exists() && tls_chain_path.exists() {
        info!(
            "TLS key and chain already exist - remove them first if you intend to regenerate these"
        );
        return;
    }

    let origin_domain = match config.origin.domain() {
        Some(val) => val,
        None => {
            error!("origin does not contain a valid domain");
            std::process::exit(1);
        }
    };

    // All generated material lives alongside the configured key path.
    let cert_root = match tls_key_path.parent() {
        Some(parent) => parent,
        None => {
            error!("Unable to find parent directory of {:?}", tls_key_path);
            std::process::exit(1);
        }
    };

    let ca_cert = cert_root.join("ca.pem");
    let ca_key = cert_root.join("cakey.pem");
    let tls_cert_path = cert_root.join("cert.pem");

    // Create the CA if it doesn't exist yet, otherwise reuse it so that
    // previously issued certificates keep the same root of trust.
    let ca_handle = if !ca_cert.exists() || !ca_key.exists() {
        let ca_handle = match crypto::build_ca() {
            Ok(ca_handle) => ca_handle,
            Err(e) => {
                error!(err = ?e, "Failed to build CA");
                std::process::exit(1);
            }
        };

        if crypto::write_ca(ca_key, ca_cert, &ca_handle).is_err() {
            error!("Failed to write CA");
            std::process::exit(1);
        }

        ca_handle
    } else {
        match crypto::load_ca(ca_key, ca_cert) {
            Ok(ca_handle) => ca_handle,
            Err(_) => {
                error!("Failed to load CA");
                std::process::exit(1);
            }
        }
    };

    // Issue the server certificate only if any of the three outputs are
    // missing.
    if !tls_key_path.exists() || !tls_chain_path.exists() || !tls_cert_path.exists() {
        let cert_handle = match crypto::build_cert(origin_domain, &ca_handle) {
            Ok(cert_handle) => cert_handle,
            Err(e) => {
                error!(err = ?e, "Failed to build certificate");
                std::process::exit(1);
            }
        };

        if crypto::write_cert(tls_key_path, tls_chain_path, tls_cert_path, &cert_handle).is_err() {
            error!("Failed to write certificates");
            std::process::exit(1);
        }
    }
    info!("certificate generation complete");
}
766
767static MIGRATION_PATH_RE: LazyLock<Regex> = LazyLock::new(|| {
768 #[allow(clippy::expect_used)]
769 Regex::new("^\\d\\d-.*\\.h?json$").expect("Invalid SPN regex found")
770});
771
/// A single migration file staged for application.
struct ScimMigration {
    // Filesystem location the migration was read from (used in error logs).
    path: PathBuf,
    // SHA-256 digest of the raw file bytes, passed along when applying.
    hash: Sha256Output,
    // The parsed SCIM assertion document from the file.
    assertions: ScimAssertGeneric,
}
777
#[instrument(
    level = "info",
    fields(uuid = ?eventid),
    skip_all,
)]
/// Discover, parse, validate and apply SCIM migration files from
/// `migration_path`.
///
/// Files must match `MIGRATION_PATH_RE` (`NN-name.json`/`.hjson`); they are
/// applied in sorted file-name order. Unreadable or unparseable files are
/// skipped with a log; a duplicate migration UUID aborts the whole run.
async fn migration_apply(
    eventid: Uuid,
    server_write_ref: &'static QueryServerWriteV1,
    migration_path: &Path,
) {
    // A missing directory is not an error - deployments without migrations
    // simply skip this step.
    if !migration_path.exists() {
        info!(migration_path = %migration_path.display(), "Migration path does not exist - migrations will be skipped.");
        return;
    }

    let mut dir_ents = match tokio::fs::read_dir(migration_path).await {
        Ok(dir_ents) => dir_ents,
        Err(err) => {
            error!(?err, "Unable to read migration directory.");
            // Emit a permissions diagnosis to help the admin fix the path.
            let diag = kanidm_lib_file_permissions::diagnose_path(migration_path);
            info!(%diag);
            return;
        }
    };

    let mut migration_paths = Vec::with_capacity(8);

    // Collect every directory entry; any read error aborts the run.
    loop {
        match dir_ents.next_entry().await {
            Ok(Some(dir_ent)) => migration_paths.push(dir_ent.path()),
            Ok(None) => {
                break;
            }
            Err(err) => {
                error!(?err, "Unable to read directory entries.");
                return;
            }
        }
    }

    // Keep only regular files whose names match the migration pattern.
    let mut migration_paths: Vec<_> = migration_paths.into_iter()
        .filter(|path| {
            if !path.is_file() {
                info!(path = %path.display(), "ignoring path that is not a file.");
                return false;
            }

            let Some(file_name) = path.file_name().and_then(std::ffi::OsStr::to_str) else {
                info!(path = %path.display(), "ignoring path that has no file name, or is not a valid utf-8 file name.");
                return false;
            };

            if !MIGRATION_PATH_RE.is_match(file_name) {
                info!(path = %path.display(), "ignoring file that does not match naming pattern.");
                info!("expected pattern 'XX-NAME.json' where XX are two numbers, followed by a hypen, with the file extension .json");
                return false;
            }

            true
        })
        .collect();

    // Apply migrations in lexicographic file-name order (the NN- prefix
    // gives admins explicit ordering control).
    migration_paths.sort_unstable();
    let mut migrations = Vec::with_capacity(migration_paths.len());

    for migration_path in migration_paths {
        info!(path = %migration_path.display(), "examining migration");

        let migration_content = match tokio::fs::read(&migration_path).await {
            Ok(bytes) => bytes,
            Err(err) => {
                error!(?err, "Unable to read migration - it will be ignored.");
                let diag = kanidm_lib_file_permissions::diagnose_path(&migration_path);
                info!(%diag);
                continue;
            }
        };

        // hjson parsing also accepts plain json content.
        let assertions: ScimAssertGeneric = match serde_hjson::from_slice(&migration_content) {
            Ok(assertions) => assertions,
            Err(err) => {
                error!(?err, path = %migration_path.display(), "Invalid JSON SCIM Assertion");
                continue;
            }
        };

        // Hash the raw bytes so the server can track what was applied.
        let mut hasher = Sha256::new();
        hasher.update(&migration_content);
        let migration_hash: Sha256Output = hasher.finalize();

        migrations.push(ScimMigration {
            path: migration_path,
            hash: migration_hash,
            assertions,
        });
    }

    // Every migration must carry a unique UUID - refuse to apply anything if
    // a duplicate is detected.
    let mut migration_ids = BTreeSet::new();
    for migration in &migrations {
        if !migration_ids.insert(migration.assertions.id) {
            error!(path = %migration.path.display(), uuid = ?migration.assertions.id, "Duplicate migration UUID found, refusing to proceed!!! All migrations must have a unique ID!!!");
            return;
        }
    }

    // Apply each migration; individual failures are logged but do not stop
    // the remaining migrations.
    for ScimMigration {
        path,
        hash,
        assertions,
    } in migrations
    {
        if let Err(err) = server_write_ref
            .handle_scim_migration_apply(eventid, assertions, hash)
            .await
        {
            error!(?err, path = %path.display(), "Failed to apply migration");
        };
    }
}
905
/// Control messages broadcast to every long-running server task.
#[derive(Clone, Debug)]
pub enum CoreAction {
    // Ask all tasks to stop.
    Shutdown,
    // Ask tasks that support it to reload state (e.g. the TLS acceptor and
    // migration tasks react to this).
    Reload,
}
911
/// Labels for the long-running tasks spawned by `create_server_core`, used
/// for logging and shutdown reporting.
pub(crate) enum TaskName {
    AdminSocket,
    AuditdActor,
    BackupActor,
    DelayedActionActor,
    HttpsServer,
    IntervalActor,
    LdapActor,
    Replication,
    TlsAcceptorReload,
    MigrationReload,
}
924
925impl Display for TaskName {
926 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
927 write!(
928 f,
929 "{}",
930 match self {
931 TaskName::AdminSocket => "Admin Socket",
932 TaskName::AuditdActor => "Auditd Actor",
933 TaskName::BackupActor => "Backup Actor",
934 TaskName::DelayedActionActor => "Delayed Action Actor",
935 TaskName::HttpsServer => "HTTPS Server",
936 TaskName::IntervalActor => "Interval Actor",
937 TaskName::LdapActor => "LDAP Acceptor Actor",
938 TaskName::Replication => "Replication",
939 TaskName::TlsAcceptorReload => "TlsAcceptor Reload Monitor",
940 TaskName::MigrationReload => "Migration Reload Monitor",
941 }
942 )
943 }
944}
945
/// Handle to a running server core: owns the control-broadcast sender and
/// the join handles of every spawned task.
pub struct CoreHandle {
    // Set by shutdown() once all tasks are joined; checked in Drop to warn
    // about unclean shutdowns.
    clean_shutdown: bool,
    // Broadcast channel carrying CoreAction (Shutdown/Reload) to all tasks.
    tx: broadcast::Sender<CoreAction>,
    // (task label, join handle) for every spawned task.
    handles: Vec<(TaskName, task::JoinHandle<()>)>,
}
952
953impl CoreHandle {
954 pub fn subscribe(&mut self) -> broadcast::Receiver<CoreAction> {
955 self.tx.subscribe()
956 }
957
958 pub async fn shutdown(&mut self) {
959 if self.tx.send(CoreAction::Shutdown).is_err() {
960 eprintln!("No receivers acked shutdown request. Treating as unclean.");
961 return;
962 }
963
964 while let Some((handle_name, handle)) = self.handles.pop() {
966 debug!("Waiting for {handle_name} ...");
967 if let Err(error) = handle.await {
968 eprintln!("Task {handle_name} failed to finish: {error:?}");
969 }
970 }
971
972 self.clean_shutdown = true;
973 }
974
975 pub async fn reload(&mut self) {
976 if self.tx.send(CoreAction::Reload).is_err() {
977 eprintln!("No receivers acked reload request.");
978 }
979 }
980}
981
982impl Drop for CoreHandle {
983 fn drop(&mut self) {
984 if !self.clean_shutdown {
985 eprintln!("⚠️ UNCLEAN SHUTDOWN OCCURRED ⚠️ ");
986 }
987 }
990}
991
/// Bring up the full kanidm server core: backend, query/IDM servers, and all
/// long-running tasks (delayed actions, audit, migrations, TLS reload,
/// intervals, backup, LDAP, replication, HTTPS, admin socket).
///
/// When `config_test` is true the configuration is validated and servers are
/// constructed, but the HTTPS frontend and replication are not started.
/// Returns a [`CoreHandle`] owning the broadcast sender and all task handles.
/// Errors are logged and surfaced only as `Err(())`.
pub async fn create_server_core(
    config: Configuration,
    config_test: bool,
) -> Result<CoreHandle, ()> {
    // Channel for CoreAction control messages; each task subscribes its own
    // receiver. This first receiver is moved into the delayed-action task.
    let (broadcast_tx, mut broadcast_rx) = broadcast::channel(4);

    // TLS is mandatory outside of integration test mode.
    if config.integration_test_config.is_some() {
        warn!("RUNNING IN INTEGRATION TEST MODE.");
        warn!("IF YOU SEE THIS IN PRODUCTION YOU MUST CONTACT SUPPORT IMMEDIATELY.");
    } else if config.tls_config.is_none() {
        error!("Running without TLS is not supported! Quitting!");
        return Err(());
    }

    info!(
        "Starting kanidm with {}configuration: {}",
        if config_test { "TEST " } else { "" },
        config
    );
    // SAFETY: umask only changes this process's file mode creation mask and
    // is safe to call with any mask value.
    #[cfg(not(target_family = "windows"))]
    unsafe {
        umask(0o0027)
    };

    // Status monitoring of the server.
    let status_ref = StatusActor::start();

    // Build the TLS acceptor up-front; None is allowed (integration tests).
    let maybe_tls_acceptor = match crypto::setup_tls(&config.tls_config) {
        Ok(tls_acc) => tls_acc,
        Err(err) => {
            error!(?err, "Failed to configure TLS acceptor");
            return Err(());
        }
    };

    let schema = match Schema::new() {
        Ok(s) => s,
        Err(e) => {
            error!("Failed to setup in memory schema: {:?}", e);
            return Err(());
        }
    };

    let be = match setup_backend(&config, &schema) {
        Ok(be) => be,
        Err(e) => {
            error!("Failed to setup BE -> {:?}", e);
            return Err(());
        }
    };
    // The direct query server handle is unused here - access is via idms.
    let (_qs, idms, mut idms_delayed, mut idms_audit) =
        match setup_qs_idms(be, schema, &config).await {
            Ok(t) => t,
            Err(e) => {
                error!("Unable to setup query server or idm server -> {:?}", e);
                return Err(());
            }
        };

    // Signer used by the HTTPS layer; kid embedding disabled.
    let jws_signer = match JwsHs256Signer::generate_hs256() {
        Ok(k) => k.set_sign_option_embed_kid(false),
        Err(e) => {
            error!("Unable to setup jws signer -> {:?}", e);
            return Err(());
        }
    };

    // Integration test mode: recover the admin accounts, join admin to
    // idm_admins, and relax the credential type policy, in one write txn.
    if let Some(itc) = &config.integration_test_config {
        let Ok(mut idms_prox_write) = idms.proxy_write(duration_from_epoch_now()).await else {
            error!("Unable to acquire write transaction");
            return Err(());
        };
        match idms_prox_write.recover_account(&itc.admin_user, Some(&itc.admin_password)) {
            Ok(_) => {}
            Err(e) => {
                error!(
                    "Unable to configure INTEGRATION TEST {} account -> {:?}",
                    &itc.admin_user, e
                );
                return Err(());
            }
        };
        match idms_prox_write.recover_account(&itc.idm_admin_user, Some(&itc.idm_admin_password)) {
            Ok(_) => {}
            Err(e) => {
                error!(
                    "Unable to configure INTEGRATION TEST {} account -> {:?}",
                    &itc.idm_admin_user, e
                );
                return Err(());
            }
        };

        match idms_prox_write.qs_write.internal_modify_uuid(
            UUID_IDM_ADMINS,
            &ModifyList::new_append(Attribute::Member, Value::Refer(UUID_ADMIN)),
        ) {
            Ok(_) => {}
            Err(e) => {
                error!(
                    "Unable to configure INTEGRATION TEST admin as member of idm_admins -> {:?}",
                    e
                );
                return Err(());
            }
        };

        match idms_prox_write.qs_write.internal_modify_uuid(
            UUID_IDM_ALL_PERSONS,
            &ModifyList::new_purge_and_set(
                Attribute::CredentialTypeMinimum,
                CredentialType::Any.into(),
            ),
        ) {
            Ok(_) => {}
            Err(e) => {
                error!(
                    "Unable to configure INTEGRATION TEST default credential policy -> {:?}",
                    e
                );
                return Err(());
            }
        };

        match idms_prox_write.commit() {
            Ok(_) => {}
            Err(e) => {
                error!("Unable to commit INTEGRATION TEST setup -> {:?}", e);
                return Err(());
            }
        }
    }

    let ldap = match LdapServer::new(&idms).await {
        Ok(l) => l,
        Err(e) => {
            error!("Unable to start LdapServer -> {:?}", e);
            return Err(());
        }
    };

    // Shared ownership for the read/write actor frontends.
    let idms_arc = Arc::new(idms);
    let ldap_arc = Arc::new(ldap);

    let server_read_ref = QueryServerReadV1::start_static(idms_arc.clone(), ldap_arc.clone());

    let server_write_ref = QueryServerWriteV1::start_static(idms_arc.clone());

    // Task: drain delayed actions in batches until the channel closes or a
    // shutdown is broadcast.
    let delayed_handle = task::spawn(async move {
        let mut buffer = Vec::with_capacity(DELAYED_ACTION_BATCH_SIZE);
        loop {
            tokio::select! {
                added = idms_delayed.recv_many(&mut buffer) => {
                    if added == 0 {
                        // Channel closed - no more delayed actions will come.
                        break
                    }
                    server_write_ref.handle_delayedaction(&mut buffer).await;
                }
                Ok(action) = broadcast_rx.recv() => {
                    match action {
                        CoreAction::Shutdown => break,
                        CoreAction::Reload => {},
                    }
                }
            }
        }
        info!("Stopped {}", TaskName::DelayedActionActor);
    });

    let mut broadcast_rx = broadcast_tx.subscribe();

    // Task: serialise audit events to json and emit them via the log.
    let auditd_handle = task::spawn(async move {
        loop {
            tokio::select! {
                Ok(action) = broadcast_rx.recv() => {
                    match action {
                        CoreAction::Shutdown => break,
                        CoreAction::Reload => {},
                    }
                }
                audit_event = idms_audit.audit_rx().recv() => {
                    match serde_json::to_string(&audit_event) {
                        Ok(audit_event) => {
                            warn!(%audit_event);
                        }
                        Err(e) => {
                            error!(err=?e, "Unable to process audit event to json.");
                            warn!(?audit_event, json=false);
                        }
                    }

                }
            }
        }
        info!("Stopped {}", TaskName::AuditdActor);
    });

    // Migration directory: configured path, or the compile-time default.
    let migration_path = config
        .migration_path
        .clone()
        .unwrap_or(PathBuf::from(env!("KANIDM_SERVER_MIGRATION_PATH")));

    // Apply migrations once at startup (skipped in integration test mode).
    if config.integration_test_config.is_none() {
        let eventid = Uuid::new_v4();
        migration_apply(eventid, server_write_ref, migration_path.as_path()).await;
    }

    let mut broadcast_rx = broadcast_tx.subscribe();
    // Task: re-apply migrations whenever a Reload is broadcast.
    let migration_reload_handle = task::spawn(async move {
        loop {
            tokio::select! {
                Ok(action) = broadcast_rx.recv() => {
                    match action {
                        CoreAction::Shutdown => break,
                        CoreAction::Reload => {
                            let eventid = Uuid::new_v4();
                            migration_apply(
                                eventid,
                                server_write_ref,
                                migration_path.as_path(),
                            ).await;

                            info!("Migration reload complete");
                        },
                    }
                }
            }
        }
        info!("Stopped {}", TaskName::MigrationReload);
    });

    let mut broadcast_rx = broadcast_tx.subscribe();
    let tls_config = config.tls_config.clone();

    // Secondary channel used to push rebuilt TLS acceptors to the listeners.
    let (tls_acceptor_reload_tx, _tls_acceptor_reload_rx) = broadcast::channel(1);
    let tls_acceptor_reload_tx_c = tls_acceptor_reload_tx.clone();

    // Task: on Reload, rebuild the TLS acceptor from disk and broadcast it.
    let tls_acceptor_reload_handle = task::spawn(async move {
        loop {
            tokio::select! {
                Ok(action) = broadcast_rx.recv() => {
                    match action {
                        CoreAction::Shutdown => break,
                        CoreAction::Reload => {
                            let tls_acceptor = match crypto::setup_tls(&tls_config) {
                                Ok(Some(tls_acc)) => tls_acc,
                                Ok(None) => {
                                    warn!("TLS not configured, ignoring reload request.");
                                    continue;
                                }
                                Err(err) => {
                                    error!(?err, "Failed to configure and reload TLS acceptor");
                                    continue;
                                }
                            };

                            if tls_acceptor_reload_tx_c.send(tls_acceptor).is_err() {
                                error!("TLS acceptor did not accept the reload, the server may have failed!");
                            };
                            info!("TLS acceptor reload notification sent");
                        },
                    }
                }
            }
        }
        info!("Stopped {}", TaskName::TlsAcceptorReload);
    });

    // Periodic maintenance task.
    let interval_handle = IntervalActor::start(server_write_ref, broadcast_tx.subscribe());
    // Optional scheduled online backup task.
    let maybe_backup_handle = match &config.online_backup {
        Some(online_backup_config) => {
            if online_backup_config.enabled {
                let handle = IntervalActor::start_online_backup(
                    server_read_ref,
                    online_backup_config,
                    broadcast_tx.subscribe(),
                )?;
                Some(handle)
            } else {
                debug!("Backups disabled");
                None
            }
        }
        None => {
            debug!("Online backup not requested, skipping");
            None
        }
    };

    // Optional LDAP listener(s).
    let maybe_ldap_acceptor_handles = match &config.ldapbindaddress {
        Some(la) => {
            let opt_ldap_ssl_acceptor = maybe_tls_acceptor.clone();

            let h = ldaps::create_ldap_server(
                la,
                opt_ldap_ssl_acceptor,
                server_read_ref,
                &broadcast_tx,
                &tls_acceptor_reload_tx,
                config.ldap_client_address_info.trusted_tcp_info(),
            )
            .await?;
            Some(h)
        }
        None => {
            debug!("LDAP not requested, skipping");
            None
        }
    };

    // Optional replication - never started during a config test.
    let (maybe_repl_handle, maybe_repl_ctrl_tx) = match &config.repl_config {
        Some(rc) => {
            if !config_test {
                let (h, repl_ctrl_tx) =
                    repl::create_repl_server(idms_arc.clone(), rc, broadcast_tx.subscribe())
                        .await?;
                (Some(h), Some(repl_ctrl_tx))
            } else {
                (None, None)
            }
        }
        None => {
            debug!("Replication not requested, skipping");
            (None, None)
        }
    };

    // HTTPS frontend - skipped entirely in config test mode.
    let maybe_http_acceptor_handles = if config_test {
        admin_info!("This config rocks! 🪨 ");
        None
    } else {
        let handles: Vec<task::JoinHandle<()>> = https::create_https_server(
            config.clone(),
            jws_signer,
            status_ref,
            server_write_ref,
            server_read_ref,
            broadcast_tx.clone(),
            maybe_tls_acceptor,
            &tls_acceptor_reload_tx,
        )
        .await
        .inspect_err(|err| {
            error!(?err, "Failed to start HTTPS server");
        })?;

        if config.role != ServerRole::WriteReplicaNoUI {
            admin_info!("ready to rock! 🪨 UI available at: {}", config.origin);
        } else {
            admin_info!("ready to rock! 🪨 ");
        }
        Some(handles)
    };

    // Unix admin socket - skipped in integration test mode.
    let maybe_admin_sock_handle = if config.integration_test_config.is_none() {
        let broadcast_tx_ = broadcast_tx.clone();

        let admin_handle = AdminActor::create_admin_sock(
            config.adminbindpath.as_str(),
            server_write_ref,
            server_read_ref,
            broadcast_tx_,
            maybe_repl_ctrl_tx,
        )
        .await?;

        Some(admin_handle)
    } else {
        None
    };

    // Collect every spawned task so CoreHandle::shutdown can join them all.
    let mut handles: Vec<(TaskName, task::JoinHandle<()>)> = vec![
        (TaskName::IntervalActor, interval_handle),
        (TaskName::DelayedActionActor, delayed_handle),
        (TaskName::AuditdActor, auditd_handle),
        (TaskName::TlsAcceptorReload, tls_acceptor_reload_handle),
        (TaskName::MigrationReload, migration_reload_handle),
    ];

    if let Some(backup_handle) = maybe_backup_handle {
        handles.push((TaskName::BackupActor, backup_handle))
    }

    if let Some(admin_sock_handle) = maybe_admin_sock_handle {
        handles.push((TaskName::AdminSocket, admin_sock_handle))
    }

    if let Some(ldap_handles) = maybe_ldap_acceptor_handles {
        for ldap_handle in ldap_handles {
            handles.push((TaskName::LdapActor, ldap_handle))
        }
    }

    if let Some(http_handles) = maybe_http_acceptor_handles {
        for http_handle in http_handles {
            handles.push((TaskName::HttpsServer, http_handle))
        }
    }

    if let Some(repl_handle) = maybe_repl_handle {
        handles.push((TaskName::Replication, repl_handle))
    }

    Ok(CoreHandle {
        clean_shutdown: false,
        tx: broadcast_tx,
        handles,
    })
}