1#![deny(warnings)]
12#![warn(unused_extern_crates)]
13#![warn(unused_imports)]
14#![deny(clippy::todo)]
15#![deny(clippy::unimplemented)]
16#![deny(clippy::unwrap_used)]
17#![deny(clippy::expect_used)]
18#![deny(clippy::panic)]
19#![deny(clippy::unreachable)]
20#![deny(clippy::await_holding_lock)]
21#![deny(clippy::needless_pass_by_value)]
22#![deny(clippy::trivially_copy_pass_by_ref)]
23#![deny(clippy::indexing_slicing)]
24
25#[macro_use]
26extern crate tracing;
27#[macro_use]
28extern crate kanidmd_lib;
29
30mod actors;
31pub mod admin;
32pub mod config;
33mod crypto;
34mod https;
35mod interval;
36mod ldaps;
37mod repl;
38mod tcp;
39mod utils;
40
41use crate::actors::{QueryServerReadV1, QueryServerWriteV1};
42use crate::admin::AdminActor;
43use crate::config::Configuration;
44use crate::interval::IntervalActor;
45use crate::utils::touch_file_or_quit;
46use compact_jwt::{JwsHs256Signer, JwsSigner};
47use crypto_glue::{
48 s256::{Sha256, Sha256Output},
49 traits::Digest,
50};
51use kanidm_proto::backup::BackupCompression;
52use kanidm_proto::config::ServerRole;
53use kanidm_proto::internal::OperationError;
54use kanidm_proto::scim_v1::client::ScimAssertGeneric;
55use kanidmd_lib::be::{Backend, BackendConfig, BackendTransaction};
56use kanidmd_lib::idm::ldap::LdapServer;
57use kanidmd_lib::prelude::*;
58use kanidmd_lib::schema::Schema;
59use kanidmd_lib::status::StatusActor;
60use kanidmd_lib::value::CredentialType;
61use regex::Regex;
62use std::collections::BTreeSet;
63use std::fmt::{Display, Formatter};
64use std::path::Path;
65use std::path::PathBuf;
66use std::sync::Arc;
67use std::sync::LazyLock;
68use tokio::sync::broadcast;
69use tokio::task;
70
71#[cfg(not(target_family = "windows"))]
72use libc::umask;
73
74fn setup_backend(config: &Configuration, schema: &Schema) -> Result<Backend, OperationError> {
77 setup_backend_vacuum(config, schema, false)
78}
79
80fn setup_backend_vacuum(
81 config: &Configuration,
82 schema: &Schema,
83 vacuum: bool,
84) -> Result<Backend, OperationError> {
85 let schema_txn = schema.write();
88 let idxmeta = schema_txn.reload_idxmeta();
89
90 let pool_size: u32 = config.threads as u32;
91
92 let cfg = BackendConfig::new(
93 config.db_path.as_deref(),
94 pool_size,
95 config.db_fs_type.unwrap_or_default(),
96 config.db_arc_size,
97 );
98
99 Backend::new(cfg, idxmeta, vacuum)
100}
101
102async fn setup_qs_idms(
107 be: Backend,
108 schema: Schema,
109 config: &Configuration,
110) -> Result<(QueryServer, IdmServer, IdmServerDelayed, IdmServerAudit), OperationError> {
111 let curtime = duration_from_epoch_now();
112 let query_server = QueryServer::new(be, schema, config.domain.clone(), curtime)?;
114
115 query_server
124 .initialise_helper(curtime, DOMAIN_TGT_LEVEL)
125 .await?;
126
127 let is_integration_test = config.integration_test_config.is_some();
129 let (idms, idms_delayed, idms_audit) = IdmServer::new(
130 query_server.clone(),
131 &config.origin,
132 is_integration_test,
133 curtime,
134 )
135 .await?;
136
137 Ok((query_server, idms, idms_delayed, idms_audit))
138}
139
140async fn setup_qs(
141 be: Backend,
142 schema: Schema,
143 config: &Configuration,
144) -> Result<QueryServer, OperationError> {
145 let curtime = duration_from_epoch_now();
146 let query_server = QueryServer::new(be, schema, config.domain.clone(), curtime)?;
148
149 query_server
158 .initialise_helper(curtime, DOMAIN_TGT_LEVEL)
159 .await?;
160
161 Ok(query_server)
162}
163
/// Build an in-memory schema and open the backend for the `dbscan_*` commands.
///
/// Expands to a `Backend` value. A schema construction failure terminates the
/// process with exit code 1. A backend failure is logged and then `return`s
/// from the *calling* function, so the macro is only usable inside functions
/// that return `()`.
macro_rules! dbscan_setup_be {
    (
        $config:expr
    ) => {{
        let schema = Schema::new().unwrap_or_else(|e| {
            error!("Failed to setup in memory schema: {:?}", e);
            std::process::exit(1)
        });

        match setup_backend($config, &schema) {
            Err(e) => {
                error!("Failed to setup BE: {:?}", e);
                return;
            }
            Ok(backend) => backend,
        }
    }};
}
185
186pub fn dbscan_list_indexes_core(config: &Configuration) {
187 let be = dbscan_setup_be!(config);
188 let mut be_rotxn = match be.read() {
189 Ok(txn) => txn,
190 Err(err) => {
191 error!(?err, "Unable to proceed, backend read transaction failure.");
192 return;
193 }
194 };
195
196 match be_rotxn.list_indexes() {
197 Ok(mut idx_list) => {
198 idx_list.sort_unstable();
199 idx_list.iter().for_each(|idx_name| {
200 println!("{idx_name}");
201 })
202 }
203 Err(e) => {
204 error!("Failed to retrieve index list: {:?}", e);
205 }
206 };
207}
208
209pub fn dbscan_list_id2entry_core(config: &Configuration) {
210 let be = dbscan_setup_be!(config);
211 let mut be_rotxn = match be.read() {
212 Ok(txn) => txn,
213 Err(err) => {
214 error!(?err, "Unable to proceed, backend read transaction failure.");
215 return;
216 }
217 };
218
219 match be_rotxn.list_id2entry() {
220 Ok(mut id_list) => {
221 id_list.sort_unstable_by_key(|k| k.0);
222 id_list.iter().for_each(|(id, value)| {
223 println!("{id:>8}: {value}");
224 })
225 }
226 Err(e) => {
227 error!("Failed to retrieve id2entry list: {:?}", e);
228 }
229 };
230}
231
232pub fn dbscan_list_index_analysis_core(config: &Configuration) {
233 let _be = dbscan_setup_be!(config);
234 }
236
237pub fn dbscan_list_index_core(config: &Configuration, index_name: &str) {
238 let be = dbscan_setup_be!(config);
239 let mut be_rotxn = match be.read() {
240 Ok(txn) => txn,
241 Err(err) => {
242 error!(?err, "Unable to proceed, backend read transaction failure.");
243 return;
244 }
245 };
246
247 match be_rotxn.list_index_content(index_name) {
248 Ok(mut idx_list) => {
249 idx_list.sort_unstable_by(|a, b| a.0.cmp(&b.0));
250 idx_list.iter().for_each(|(key, value)| {
251 println!("{key:>50}: {value:?}");
252 })
253 }
254 Err(e) => {
255 error!("Failed to retrieve index list: {:?}", e);
256 }
257 };
258}
259
260pub fn dbscan_get_id2entry_core(config: &Configuration, id: u64) {
261 let be = dbscan_setup_be!(config);
262 let mut be_rotxn = match be.read() {
263 Ok(txn) => txn,
264 Err(err) => {
265 error!(?err, "Unable to proceed, backend read transaction failure.");
266 return;
267 }
268 };
269
270 match be_rotxn.get_id2entry(id) {
271 Ok((id, value)) => println!("{id:>8}: {value}"),
272 Err(e) => {
273 error!("Failed to retrieve id2entry value: {:?}", e);
274 }
275 };
276}
277
278pub fn dbscan_quarantine_id2entry_core(config: &Configuration, id: u64) {
279 let be = dbscan_setup_be!(config);
280 let mut be_wrtxn = match be.write() {
281 Ok(txn) => txn,
282 Err(err) => {
283 error!(
284 ?err,
285 "Unable to proceed, backend write transaction failure."
286 );
287 return;
288 }
289 };
290
291 match be_wrtxn
292 .quarantine_entry(id)
293 .and_then(|_| be_wrtxn.commit())
294 {
295 Ok(()) => {
296 println!("quarantined - {id:>8}")
297 }
298 Err(e) => {
299 error!("Failed to quarantine id2entry value: {:?}", e);
300 }
301 };
302}
303
304pub fn dbscan_list_quarantined_core(config: &Configuration) {
305 let be = dbscan_setup_be!(config);
306 let mut be_rotxn = match be.read() {
307 Ok(txn) => txn,
308 Err(err) => {
309 error!(?err, "Unable to proceed, backend read transaction failure.");
310 return;
311 }
312 };
313
314 match be_rotxn.list_quarantined() {
315 Ok(mut id_list) => {
316 id_list.sort_unstable_by_key(|k| k.0);
317 id_list.iter().for_each(|(id, value)| {
318 println!("{id:>8}: {value}");
319 })
320 }
321 Err(e) => {
322 error!("Failed to retrieve id2entry list: {:?}", e);
323 }
324 };
325}
326
327pub fn dbscan_restore_quarantined_core(config: &Configuration, id: u64) {
328 let be = dbscan_setup_be!(config);
329 let mut be_wrtxn = match be.write() {
330 Ok(txn) => txn,
331 Err(err) => {
332 error!(
333 ?err,
334 "Unable to proceed, backend write transaction failure."
335 );
336 return;
337 }
338 };
339
340 match be_wrtxn
341 .restore_quarantined(id)
342 .and_then(|_| be_wrtxn.commit())
343 {
344 Ok(()) => {
345 println!("restored - {id:>8}")
346 }
347 Err(e) => {
348 error!("Failed to restore quarantined id2entry value: {:?}", e);
349 }
350 };
351}
352
353pub fn backup_server_core(config: &Configuration, dst_path: Option<&Path>) {
354 let schema = match Schema::new() {
355 Ok(s) => s,
356 Err(e) => {
357 error!("Failed to setup in memory schema: {:?}", e);
358 std::process::exit(1);
359 }
360 };
361
362 let be = match setup_backend(config, &schema) {
363 Ok(be) => be,
364 Err(e) => {
365 error!("Failed to setup BE: {:?}", e);
366 return;
367 }
368 };
369
370 let mut be_ro_txn = match be.read() {
371 Ok(txn) => txn,
372 Err(err) => {
373 error!(?err, "Unable to proceed, backend read transaction failure.");
374 return;
375 }
376 };
377
378 let compression = match config.online_backup.as_ref() {
379 Some(backup_config) => backup_config.compression,
380 None => BackupCompression::default(),
381 };
382
383 if let Some(dst_path) = dst_path {
384 if dst_path.exists() {
385 error!(
386 "backup file {} already exists, will not overwrite it.",
387 dst_path.display()
388 );
389 return;
390 }
391
392 let output = match std::fs::File::create(dst_path) {
393 Ok(output) => output,
394 Err(err) => {
395 error!(?err, "File::create error creating {}", dst_path.display());
396 return;
397 }
398 };
399
400 match be_ro_txn.backup(output, compression) {
401 Ok(_) => info!("Backup success!"),
402 Err(e) => {
403 error!("Backup failed: {:?}", e);
404 std::process::exit(1);
405 }
406 };
407 } else {
408 let stdout = std::io::stdout().lock();
410
411 match be_ro_txn.backup(stdout, compression) {
412 Ok(_) => info!("Backup success!"),
413 Err(e) => {
414 error!("Backup failed: {:?}", e);
415 std::process::exit(1);
416 }
417 };
418 };
419 }
421
422pub async fn restore_server_core(config: &Configuration, dst_path: &Path) {
423 if let Some(db_path) = config.db_path.as_ref() {
425 touch_file_or_quit(db_path);
426 }
427
428 let schema = match Schema::new() {
430 Ok(s) => s,
431 Err(e) => {
432 error!("Failed to setup in memory schema: {:?}", e);
433 std::process::exit(1);
434 }
435 };
436
437 let be = match setup_backend(config, &schema) {
438 Ok(be) => be,
439 Err(e) => {
440 error!("Failed to setup backend: {:?}", e);
441 return;
442 }
443 };
444
445 let mut be_wr_txn = match be.write() {
446 Ok(txn) => txn,
447 Err(err) => {
448 error!(
449 ?err,
450 "Unable to proceed, backend write transaction failure."
451 );
452 return;
453 }
454 };
455
456 let compression = BackupCompression::identify_file(dst_path);
457
458 let input = match std::fs::File::open(dst_path) {
459 Ok(output) => output,
460 Err(err) => {
461 error!(?err, "File::open error reading {}", dst_path.display());
462 return;
463 }
464 };
465
466 let r = be_wr_txn
467 .restore(input, compression)
468 .and_then(|_| be_wr_txn.commit());
469
470 if r.is_err() {
471 error!("Failed to restore database: {:?}", r);
472 std::process::exit(1);
473 }
474 info!("Database loaded successfully");
475
476 reindex_inner(be, schema, config).await;
477
478 info!("✅ Restore Success!");
479}
480
481pub async fn reindex_server_core(config: &Configuration) {
482 info!("Start Index Phase 1 ...");
483 let schema = match Schema::new() {
485 Ok(s) => s,
486 Err(e) => {
487 error!("Failed to setup in memory schema: {:?}", e);
488 std::process::exit(1);
489 }
490 };
491
492 let be = match setup_backend(config, &schema) {
493 Ok(be) => be,
494 Err(e) => {
495 error!("Failed to setup BE: {:?}", e);
496 return;
497 }
498 };
499
500 reindex_inner(be, schema, config).await;
501
502 info!("✅ Reindex Success!");
503}
504
505async fn reindex_inner(be: Backend, schema: Schema, config: &Configuration) {
506 let mut be_wr_txn = match be.write() {
508 Ok(txn) => txn,
509 Err(err) => {
510 error!(
511 ?err,
512 "Unable to proceed, backend write transaction failure."
513 );
514 return;
515 }
516 };
517
518 let r = be_wr_txn.reindex(true).and_then(|_| be_wr_txn.commit());
519
520 if r.is_err() {
522 error!("Failed to reindex database: {:?}", r);
523 std::process::exit(1);
524 }
525 info!("Index Phase 1 Success!");
526
527 info!("Attempting to init query server ...");
528
529 let (qs, _idms, _idms_delayed, _idms_audit) = match setup_qs_idms(be, schema, config).await {
530 Ok(t) => t,
531 Err(e) => {
532 error!("Unable to setup query server or idm server -> {:?}", e);
533 return;
534 }
535 };
536 info!("Init Query Server Success!");
537
538 info!("Start Index Phase 2 ...");
539
540 let Ok(mut qs_write) = qs.write(duration_from_epoch_now()).await else {
541 error!("Unable to acquire write transaction");
542 return;
543 };
544 let r = qs_write.reindex(true).and_then(|_| qs_write.commit());
545
546 match r {
547 Ok(_) => info!("Index Phase 2 Success!"),
548 Err(e) => {
549 error!("Reindex failed: {:?}", e);
550 std::process::exit(1);
551 }
552 };
553}
554
555pub fn vacuum_server_core(config: &Configuration) {
556 let schema = match Schema::new() {
557 Ok(s) => s,
558 Err(e) => {
559 eprintln!("Failed to setup in memory schema: {e:?}");
560 std::process::exit(1);
561 }
562 };
563
564 let r = setup_backend_vacuum(config, &schema, true);
567
568 match r {
569 Ok(_) => eprintln!("Vacuum Success!"),
570 Err(e) => {
571 eprintln!("Vacuum failed: {e:?}");
572 std::process::exit(1);
573 }
574 };
575}
576
577pub async fn domain_rename_core(config: &Configuration) {
578 let schema = match Schema::new() {
579 Ok(s) => s,
580 Err(e) => {
581 eprintln!("Failed to setup in memory schema: {e:?}");
582 std::process::exit(1);
583 }
584 };
585
586 let be = match setup_backend(config, &schema) {
588 Ok(be) => be,
589 Err(e) => {
590 error!("Failed to setup BE: {:?}", e);
591 return;
592 }
593 };
594
595 let qs = match setup_qs(be, schema, config).await {
597 Ok(t) => t,
598 Err(e) => {
599 error!("Unable to setup query server -> {:?}", e);
600 return;
601 }
602 };
603
604 let new_domain_name = config.domain.as_str();
605
606 match qs.read().await.map(|qs| qs.get_domain_name().to_string()) {
608 Ok(old_domain_name) => {
609 admin_info!(?old_domain_name, ?new_domain_name);
610 if old_domain_name == new_domain_name {
611 admin_info!("Domain name not changing, stopping.");
612 return;
613 }
614 admin_debug!(
615 "Domain name is changing from {:?} to {:?}",
616 old_domain_name,
617 new_domain_name
618 );
619 }
620 Err(e) => {
621 admin_error!("Failed to query domain name, quitting! -> {:?}", e);
622 return;
623 }
624 }
625
626 let Ok(mut qs_write) = qs.write(duration_from_epoch_now()).await else {
627 error!("Unable to acquire write transaction");
628 return;
629 };
630 let r = qs_write
631 .danger_domain_rename(new_domain_name)
632 .and_then(|_| qs_write.commit());
633
634 match r {
635 Ok(_) => info!("Domain Rename Success!"),
636 Err(e) => {
637 error!("Domain Rename Failed - Rollback has occurred: {:?}", e);
638 std::process::exit(1);
639 }
640 };
641}
642
643pub async fn verify_server_core(config: &Configuration) {
644 let curtime = duration_from_epoch_now();
645 let schema_mem = match Schema::new() {
647 Ok(sc) => sc,
648 Err(e) => {
649 error!("Failed to setup in memory schema: {:?}", e);
650 return;
651 }
652 };
653 let be = match setup_backend(config, &schema_mem) {
655 Ok(be) => be,
656 Err(e) => {
657 error!("Failed to setup BE: {:?}", e);
658 return;
659 }
660 };
661
662 let server = match QueryServer::new(be, schema_mem, config.domain.clone(), curtime) {
663 Ok(qs) => qs,
664 Err(err) => {
665 error!(?err, "Failed to setup query server");
666 return;
667 }
668 };
669
670 let r = server.verify().await;
672
673 if r.is_empty() {
674 eprintln!("Verification passed!");
675 std::process::exit(0);
676 } else {
677 for er in r {
678 error!("{:?}", er);
679 }
680 std::process::exit(1);
681 }
682
683 }
685
686pub fn cert_generate_core(config: &Configuration) {
687 let (tls_key_path, tls_chain_path) = match &config.tls_config {
690 Some(tls_config) => (tls_config.key.as_path(), tls_config.chain.as_path()),
691 None => {
692 error!("Unable to find TLS configuration");
693 std::process::exit(1);
694 }
695 };
696
697 if tls_key_path.exists() && tls_chain_path.exists() {
698 info!(
699 "TLS key and chain already exist - remove them first if you intend to regenerate these"
700 );
701 return;
702 }
703
704 let origin_domain = match config.origin.domain() {
705 Some(val) => val,
706 None => {
707 error!("origin does not contain a valid domain");
708 std::process::exit(1);
709 }
710 };
711
712 let cert_root = match tls_key_path.parent() {
713 Some(parent) => parent,
714 None => {
715 error!("Unable to find parent directory of {:?}", tls_key_path);
716 std::process::exit(1);
717 }
718 };
719
720 let ca_cert = cert_root.join("ca.pem");
721 let ca_key = cert_root.join("cakey.pem");
722 let tls_cert_path = cert_root.join("cert.pem");
723
724 let ca_handle = if !ca_cert.exists() || !ca_key.exists() {
725 let ca_handle = match crypto::build_ca(None) {
727 Ok(ca_handle) => ca_handle,
728 Err(e) => {
729 error!(err = ?e, "Failed to build CA");
730 std::process::exit(1);
731 }
732 };
733
734 if crypto::write_ca(ca_key, ca_cert, &ca_handle).is_err() {
735 error!("Failed to write CA");
736 std::process::exit(1);
737 }
738
739 ca_handle
740 } else {
741 match crypto::load_ca(ca_key, ca_cert) {
742 Ok(ca_handle) => ca_handle,
743 Err(_) => {
744 error!("Failed to load CA");
745 std::process::exit(1);
746 }
747 }
748 };
749
750 if !tls_key_path.exists() || !tls_chain_path.exists() || !tls_cert_path.exists() {
751 let cert_handle = match crypto::build_cert(origin_domain, &ca_handle, None, None) {
753 Ok(cert_handle) => cert_handle,
754 Err(e) => {
755 error!(err = ?e, "Failed to build certificate");
756 std::process::exit(1);
757 }
758 };
759
760 if crypto::write_cert(tls_key_path, tls_chain_path, tls_cert_path, &cert_handle).is_err() {
761 error!("Failed to write certificates");
762 std::process::exit(1);
763 }
764 }
765 info!("certificate generation complete");
766}
767
768static MIGRATION_PATH_RE: LazyLock<Regex> = LazyLock::new(|| {
769 #[allow(clippy::expect_used)]
770 Regex::new("^\\d\\d-.*\\.h?json$").expect("Invalid SPN regex found")
771});
772
/// A migration file that was read and parsed by `migration_apply`.
struct ScimMigration {
    /// Path of the file the migration was read from; used in log messages.
    path: PathBuf,
    /// SHA-256 digest of the raw file content.
    hash: Sha256Output,
    /// The assertions parsed from the file content.
    assertions: ScimAssertGeneric,
}
778
779#[instrument(
780 level = "info",
781 fields(uuid = ?eventid),
782 skip_all,
783)]
784async fn migration_apply(
785 eventid: Uuid,
786 server_write_ref: &'static QueryServerWriteV1,
787 migration_path: &Path,
788) {
789 if !migration_path.exists() {
790 info!(migration_path = %migration_path.display(), "Migration path does not exist - migrations will be skipped.");
791 return;
792 }
793
794 let mut dir_ents = match tokio::fs::read_dir(migration_path).await {
795 Ok(dir_ents) => dir_ents,
796 Err(err) => {
797 error!(?err, "Unable to read migration directory.");
798 let diag = kanidm_lib_file_permissions::diagnose_path(migration_path);
799 info!(%diag);
800 return;
801 }
802 };
803
804 let mut migration_paths = Vec::with_capacity(8);
805
806 loop {
807 match dir_ents.next_entry().await {
808 Ok(Some(dir_ent)) => migration_paths.push(dir_ent.path()),
809 Ok(None) => {
810 break;
812 }
813 Err(err) => {
814 error!(?err, "Unable to read directory entries.");
815 return;
816 }
817 }
818 }
819
820 let mut migration_paths: Vec<_> = migration_paths.into_iter()
823 .filter(|path| {
824 if !path.is_file() {
825 info!(path = %path.display(), "ignoring path that is not a file.");
826 return false;
827 }
828
829 let Some(file_name) = path.file_name().and_then(std::ffi::OsStr::to_str) else {
830 info!(path = %path.display(), "ignoring path that has no file name, or is not a valid utf-8 file name.");
831 return false;
832 };
833
834 if !MIGRATION_PATH_RE.is_match(file_name) {
835 info!(path = %path.display(), "ignoring file that does not match naming pattern.");
836 info!("expected pattern 'XX-NAME.json' where XX are two numbers, followed by a hypen, with the file extension .json");
837 return false;
838 }
839
840 true
841 })
842 .collect();
843
844 migration_paths.sort_unstable();
845 let mut migrations = Vec::with_capacity(migration_paths.len());
846
847 for migration_path in migration_paths {
848 info!(path = %migration_path.display(), "examining migration");
849
850 let migration_content = match tokio::fs::read(&migration_path).await {
851 Ok(bytes) => bytes,
852 Err(err) => {
853 error!(?err, "Unable to read migration - it will be ignored.");
854 let diag = kanidm_lib_file_permissions::diagnose_path(&migration_path);
855 info!(%diag);
856 continue;
857 }
858 };
859
860 let assertions: ScimAssertGeneric = match serde_hjson::from_slice(&migration_content) {
862 Ok(assertions) => assertions,
863 Err(err) => {
864 error!(?err, path = %migration_path.display(), "Invalid JSON SCIM Assertion");
865 continue;
866 }
867 };
868
869 let mut hasher = Sha256::new();
871 hasher.update(&migration_content);
872 let migration_hash: Sha256Output = hasher.finalize();
873
874 migrations.push(ScimMigration {
875 path: migration_path,
876 hash: migration_hash,
877 assertions,
878 });
879 }
880
881 let mut migration_ids = BTreeSet::new();
882 for migration in &migrations {
883 if !migration_ids.insert(migration.assertions.id) {
885 error!(path = %migration.path.display(), uuid = ?migration.assertions.id, "Duplicate migration UUID found, refusing to proceed!!! All migrations must have a unique ID!!!");
886 return;
887 }
888 }
889
890 for ScimMigration {
893 path,
894 hash,
895 assertions,
896 } in migrations
897 {
898 if let Err(err) = server_write_ref
899 .handle_scim_migration_apply(eventid, assertions, hash)
900 .await
901 {
902 error!(?err, path = %path.display(), "Failed to apply migration");
903 };
904 }
905}
906
/// Control messages broadcast from a [`CoreHandle`] to the server's
/// background tasks.
#[derive(Clone, Debug)]
pub enum CoreAction {
    /// Ask every task to leave its loop and finish.
    Shutdown,
    /// Ask tasks to reload; in this file the migration and TLS acceptor
    /// monitors act on it and the other task loops ignore it.
    Reload,
}
912
/// Identifies a background task so it can be named in log output.
pub(crate) enum TaskName {
    AdminSocket,
    AuditdActor,
    BackupActor,
    DelayedActionActor,
    HttpsServer,
    IntervalActor,
    LdapActor,
    Replication,
    TlsAcceptorReload,
    MigrationReload,
}

impl Display for TaskName {
    /// Writes the human-readable task label; formatter width and padding
    /// flags are not applied.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::AdminSocket => "Admin Socket",
            Self::AuditdActor => "Auditd Actor",
            Self::BackupActor => "Backup Actor",
            Self::DelayedActionActor => "Delayed Action Actor",
            Self::HttpsServer => "HTTPS Server",
            Self::IntervalActor => "Interval Actor",
            Self::LdapActor => "LDAP Acceptor Actor",
            Self::Replication => "Replication",
            Self::TlsAcceptorReload => "TlsAcceptor Reload Monitor",
            Self::MigrationReload => "Migration Reload Monitor",
        };
        f.write_str(label)
    }
}
946
/// Handle to a running server core, used to broadcast control actions and to
/// wait for its background tasks.
pub struct CoreHandle {
    /// Set to `true` only after `shutdown` has awaited every task handle;
    /// checked on drop to warn about an unclean shutdown.
    clean_shutdown: bool,
    /// Sender side of the broadcast channel carrying [`CoreAction`] messages.
    tx: broadcast::Sender<CoreAction>,
    /// Join handles of the spawned tasks, each paired with its display name.
    handles: Vec<(TaskName, task::JoinHandle<()>)>,
}
953
954impl CoreHandle {
955 pub fn subscribe(&mut self) -> broadcast::Receiver<CoreAction> {
956 self.tx.subscribe()
957 }
958
959 pub async fn shutdown(&mut self) {
960 if self.tx.send(CoreAction::Shutdown).is_err() {
961 eprintln!("No receivers acked shutdown request. Treating as unclean.");
962 return;
963 }
964
965 while let Some((handle_name, handle)) = self.handles.pop() {
967 debug!("Waiting for {handle_name} ...");
968 if let Err(error) = handle.await {
969 eprintln!("Task {handle_name} failed to finish: {error:?}");
970 }
971 }
972
973 self.clean_shutdown = true;
974 }
975
976 pub async fn reload(&mut self) {
977 if self.tx.send(CoreAction::Reload).is_err() {
978 eprintln!("No receivers acked reload request.");
979 }
980 }
981}
982
983impl Drop for CoreHandle {
984 fn drop(&mut self) {
985 if !self.clean_shutdown {
986 eprintln!("⚠️ UNCLEAN SHUTDOWN OCCURRED ⚠️ ");
987 }
988 }
991}
992
993pub async fn create_server_core(
994 config: Configuration,
995 config_test: bool,
996) -> Result<CoreHandle, ()> {
997 let (broadcast_tx, mut broadcast_rx) = broadcast::channel(4);
999
1000 if config.integration_test_config.is_some() {
1001 warn!("RUNNING IN INTEGRATION TEST MODE.");
1002 warn!("IF YOU SEE THIS IN PRODUCTION YOU MUST CONTACT SUPPORT IMMEDIATELY.");
1003 } else if config.tls_config.is_none() {
1004 error!("Running without TLS is not supported! Quitting!");
1006 return Err(());
1007 }
1008
1009 info!(
1010 "Starting kanidm with {}configuration: {}",
1011 if config_test { "TEST " } else { "" },
1012 config
1013 );
1014 #[cfg(not(target_family = "windows"))]
1016 unsafe {
1017 umask(0o0027)
1018 };
1019
1020 let status_ref = StatusActor::start();
1023
1024 let maybe_tls_acceptor = match crypto::setup_tls(&config.tls_config) {
1026 Ok(tls_acc) => tls_acc,
1027 Err(err) => {
1028 error!(?err, "Failed to configure TLS acceptor");
1029 return Err(());
1030 }
1031 };
1032
1033 let schema = match Schema::new() {
1034 Ok(s) => s,
1035 Err(e) => {
1036 error!("Failed to setup in memory schema: {:?}", e);
1037 return Err(());
1038 }
1039 };
1040
1041 let be = match setup_backend(&config, &schema) {
1043 Ok(be) => be,
1044 Err(e) => {
1045 error!("Failed to setup BE -> {:?}", e);
1046 return Err(());
1047 }
1048 };
1049 let (_qs, idms, mut idms_delayed, mut idms_audit) =
1051 match setup_qs_idms(be, schema, &config).await {
1052 Ok(t) => t,
1053 Err(e) => {
1054 error!("Unable to setup query server or idm server -> {:?}", e);
1055 return Err(());
1056 }
1057 };
1058
1059 let jws_signer = match JwsHs256Signer::generate_hs256() {
1062 Ok(k) => k.set_sign_option_embed_kid(false),
1063 Err(e) => {
1064 error!("Unable to setup jws signer -> {:?}", e);
1065 return Err(());
1066 }
1067 };
1068
1069 if let Some(itc) = &config.integration_test_config {
1071 let Ok(mut idms_prox_write) = idms.proxy_write(duration_from_epoch_now()).await else {
1072 error!("Unable to acquire write transaction");
1073 return Err(());
1074 };
1075 match idms_prox_write.recover_account(&itc.admin_user, Some(&itc.admin_password)) {
1077 Ok(_) => {}
1078 Err(e) => {
1079 error!(
1080 "Unable to configure INTEGRATION TEST {} account -> {:?}",
1081 &itc.admin_user, e
1082 );
1083 return Err(());
1084 }
1085 };
1086 match idms_prox_write.recover_account(&itc.idm_admin_user, Some(&itc.idm_admin_password)) {
1088 Ok(_) => {}
1089 Err(e) => {
1090 error!(
1091 "Unable to configure INTEGRATION TEST {} account -> {:?}",
1092 &itc.idm_admin_user, e
1093 );
1094 return Err(());
1095 }
1096 };
1097
1098 match idms_prox_write.qs_write.internal_modify_uuid(
1102 UUID_IDM_ADMINS,
1103 &ModifyList::new_append(Attribute::Member, Value::Refer(UUID_ADMIN)),
1104 ) {
1105 Ok(_) => {}
1106 Err(e) => {
1107 error!(
1108 "Unable to configure INTEGRATION TEST admin as member of idm_admins -> {:?}",
1109 e
1110 );
1111 return Err(());
1112 }
1113 };
1114
1115 match idms_prox_write.qs_write.internal_modify_uuid(
1116 UUID_IDM_ALL_PERSONS,
1117 &ModifyList::new_purge_and_set(
1118 Attribute::CredentialTypeMinimum,
1119 CredentialType::Any.into(),
1120 ),
1121 ) {
1122 Ok(_) => {}
1123 Err(e) => {
1124 error!(
1125 "Unable to configure INTEGRATION TEST default credential policy -> {:?}",
1126 e
1127 );
1128 return Err(());
1129 }
1130 };
1131
1132 match idms_prox_write.commit() {
1133 Ok(_) => {}
1134 Err(e) => {
1135 error!("Unable to commit INTEGRATION TEST setup -> {:?}", e);
1136 return Err(());
1137 }
1138 }
1139 }
1140
1141 let ldap = match LdapServer::new(&idms).await {
1142 Ok(l) => l,
1143 Err(e) => {
1144 error!("Unable to start LdapServer -> {:?}", e);
1145 return Err(());
1146 }
1147 };
1148
1149 let idms_arc = Arc::new(idms);
1151 let ldap_arc = Arc::new(ldap);
1152
1153 let server_read_ref = QueryServerReadV1::start_static(idms_arc.clone(), ldap_arc.clone());
1156
1157 let server_write_ref = QueryServerWriteV1::start_static(idms_arc.clone());
1159
1160 let delayed_handle = task::spawn(async move {
1161 let mut buffer = Vec::with_capacity(DELAYED_ACTION_BATCH_SIZE);
1162 loop {
1163 tokio::select! {
1164 added = idms_delayed.recv_many(&mut buffer) => {
1165 if added == 0 {
1166 break
1168 }
1169 server_write_ref.handle_delayedaction(&mut buffer).await;
1170 }
1171 Ok(action) = broadcast_rx.recv() => {
1172 match action {
1173 CoreAction::Shutdown => break,
1174 CoreAction::Reload => {},
1175 }
1176 }
1177 }
1178 }
1179 info!("Stopped {}", TaskName::DelayedActionActor);
1180 });
1181
1182 let mut broadcast_rx = broadcast_tx.subscribe();
1183
1184 let auditd_handle = task::spawn(async move {
1185 loop {
1186 tokio::select! {
1187 Ok(action) = broadcast_rx.recv() => {
1188 match action {
1189 CoreAction::Shutdown => break,
1190 CoreAction::Reload => {},
1191 }
1192 }
1193 audit_event = idms_audit.audit_rx().recv() => {
1194 match serde_json::to_string(&audit_event) {
1195 Ok(audit_event) => {
1196 warn!(%audit_event);
1197 }
1198 Err(e) => {
1199 error!(err=?e, "Unable to process audit event to json.");
1200 warn!(?audit_event, json=false);
1201 }
1202 }
1203
1204 }
1205 }
1206 }
1207 info!("Stopped {}", TaskName::AuditdActor);
1208 });
1209
1210 let migration_path = config
1212 .migration_path
1213 .clone()
1214 .unwrap_or(PathBuf::from(env!("KANIDM_SERVER_MIGRATION_PATH")));
1215
1216 if config.integration_test_config.is_none() {
1217 let eventid = Uuid::new_v4();
1218 migration_apply(eventid, server_write_ref, migration_path.as_path()).await;
1219 }
1220
1221 let mut broadcast_rx = broadcast_tx.subscribe();
1223 let migration_reload_handle = task::spawn(async move {
1224 loop {
1225 tokio::select! {
1226 Ok(action) = broadcast_rx.recv() => {
1227 match action {
1228 CoreAction::Shutdown => break,
1229 CoreAction::Reload => {
1230 let eventid = Uuid::new_v4();
1233 migration_apply(
1234 eventid,
1235 server_write_ref,
1236 migration_path.as_path(),
1237 ).await;
1238
1239 info!("Migration reload complete");
1240 },
1241 }
1242 }
1243 }
1244 }
1245 info!("Stopped {}", TaskName::MigrationReload);
1246 });
1247
1248 let mut broadcast_rx = broadcast_tx.subscribe();
1251 let tls_config = config.tls_config.clone();
1252
1253 let (tls_acceptor_reload_tx, _tls_acceptor_reload_rx) = broadcast::channel(1);
1254 let tls_acceptor_reload_tx_c = tls_acceptor_reload_tx.clone();
1255
1256 let tls_acceptor_reload_handle = task::spawn(async move {
1257 loop {
1258 tokio::select! {
1259 Ok(action) = broadcast_rx.recv() => {
1260 match action {
1261 CoreAction::Shutdown => break,
1262 CoreAction::Reload => {
1263 let tls_acceptor = match crypto::setup_tls(&tls_config) {
1264 Ok(Some(tls_acc)) => tls_acc,
1265 Ok(None) => {
1266 warn!("TLS not configured, ignoring reload request.");
1267 continue;
1268 }
1269 Err(err) => {
1270 error!(?err, "Failed to configure and reload TLS acceptor");
1271 continue;
1272 }
1273 };
1274
1275 if tls_acceptor_reload_tx_c.send(tls_acceptor).is_err() {
1278 error!("TLS acceptor did not accept the reload, the server may have failed!");
1279 };
1280 info!("TLS acceptor reload notification sent");
1281 },
1282 }
1283 }
1284 }
1285 }
1286 info!("Stopped {}", TaskName::TlsAcceptorReload);
1287 });
1288
1289 let interval_handle = IntervalActor::start(server_write_ref, broadcast_tx.subscribe());
1291 let maybe_backup_handle = match &config.online_backup {
1293 Some(online_backup_config) => {
1294 if online_backup_config.enabled {
1295 let handle = IntervalActor::start_online_backup(
1296 server_read_ref,
1297 online_backup_config,
1298 broadcast_tx.subscribe(),
1299 )?;
1300 Some(handle)
1301 } else {
1302 debug!("Backups disabled");
1303 None
1304 }
1305 }
1306 None => {
1307 debug!("Online backup not requested, skipping");
1308 None
1309 }
1310 };
1311
1312 let maybe_ldap_acceptor_handles = match &config.ldapbindaddress {
1314 Some(la) => {
1315 let opt_ldap_ssl_acceptor = maybe_tls_acceptor.clone();
1316
1317 let h = ldaps::create_ldap_server(
1318 la,
1319 opt_ldap_ssl_acceptor,
1320 server_read_ref,
1321 &broadcast_tx,
1322 &tls_acceptor_reload_tx,
1323 config.ldap_client_address_info.trusted_tcp_info(),
1324 )
1325 .await?;
1326 Some(h)
1327 }
1328 None => {
1329 debug!("LDAP not requested, skipping");
1330 None
1331 }
1332 };
1333
1334 let (maybe_repl_handle, maybe_repl_ctrl_tx) = match &config.repl_config {
1337 Some(rc) => {
1338 if !config_test {
1339 let (h, repl_ctrl_tx) =
1341 repl::create_repl_server(idms_arc.clone(), rc, broadcast_tx.subscribe())
1342 .await?;
1343 (Some(h), Some(repl_ctrl_tx))
1344 } else {
1345 (None, None)
1346 }
1347 }
1348 None => {
1349 debug!("Replication not requested, skipping");
1350 (None, None)
1351 }
1352 };
1353
1354 let maybe_http_acceptor_handles = if config_test {
1355 admin_info!("This config rocks! 🪨 ");
1356 None
1357 } else {
1358 let handles: Vec<task::JoinHandle<()>> = https::create_https_server(
1359 config.clone(),
1360 jws_signer,
1361 status_ref,
1362 server_write_ref,
1363 server_read_ref,
1364 broadcast_tx.clone(),
1365 maybe_tls_acceptor,
1366 &tls_acceptor_reload_tx,
1367 )
1368 .await
1369 .inspect_err(|err| {
1370 error!(?err, "Failed to start HTTPS server");
1371 })?;
1372
1373 if config.role != ServerRole::WriteReplicaNoUI {
1374 admin_info!("ready to rock! 🪨 UI available at: {}", config.origin);
1375 } else {
1376 admin_info!("ready to rock! 🪨 ");
1377 }
1378 Some(handles)
1379 };
1380
1381 let maybe_admin_sock_handle = if config.integration_test_config.is_none() {
1383 let broadcast_tx_ = broadcast_tx.clone();
1384
1385 let admin_handle = AdminActor::create_admin_sock(
1386 config.adminbindpath.as_str(),
1387 server_write_ref,
1388 server_read_ref,
1389 broadcast_tx_,
1390 maybe_repl_ctrl_tx,
1391 )
1392 .await?;
1393
1394 Some(admin_handle)
1395 } else {
1396 None
1397 };
1398
1399 let mut handles: Vec<(TaskName, task::JoinHandle<()>)> = vec![
1400 (TaskName::IntervalActor, interval_handle),
1401 (TaskName::DelayedActionActor, delayed_handle),
1402 (TaskName::AuditdActor, auditd_handle),
1403 (TaskName::TlsAcceptorReload, tls_acceptor_reload_handle),
1404 (TaskName::MigrationReload, migration_reload_handle),
1405 ];
1406
1407 if let Some(backup_handle) = maybe_backup_handle {
1408 handles.push((TaskName::BackupActor, backup_handle))
1409 }
1410
1411 if let Some(admin_sock_handle) = maybe_admin_sock_handle {
1412 handles.push((TaskName::AdminSocket, admin_sock_handle))
1413 }
1414
1415 if let Some(ldap_handles) = maybe_ldap_acceptor_handles {
1416 for ldap_handle in ldap_handles {
1417 handles.push((TaskName::LdapActor, ldap_handle))
1418 }
1419 }
1420
1421 if let Some(http_handles) = maybe_http_acceptor_handles {
1422 for http_handle in http_handles {
1423 handles.push((TaskName::HttpsServer, http_handle))
1424 }
1425 }
1426
1427 if let Some(repl_handle) = maybe_repl_handle {
1428 handles.push((TaskName::Replication, repl_handle))
1429 }
1430
1431 Ok(CoreHandle {
1432 clean_shutdown: false,
1433 tx: broadcast_tx,
1434 handles,
1435 })
1436}