1#![deny(warnings)]
12#![warn(unused_extern_crates)]
13#![warn(unused_imports)]
14#![deny(clippy::todo)]
15#![deny(clippy::unimplemented)]
16#![deny(clippy::unwrap_used)]
17#![deny(clippy::expect_used)]
18#![deny(clippy::panic)]
19#![deny(clippy::unreachable)]
20#![deny(clippy::await_holding_lock)]
21#![deny(clippy::needless_pass_by_value)]
22#![deny(clippy::trivially_copy_pass_by_ref)]
23
24#[macro_use]
25extern crate tracing;
26#[macro_use]
27extern crate kanidmd_lib;
28
29mod actors;
30pub mod admin;
31pub mod config;
32mod crypto;
33mod https;
34mod interval;
35mod ldaps;
36mod repl;
37mod utils;
38
39use crate::actors::{QueryServerReadV1, QueryServerWriteV1};
40use crate::admin::AdminActor;
41use crate::config::{Configuration, ServerRole};
42use crate::interval::IntervalActor;
43use crate::utils::touch_file_or_quit;
44use compact_jwt::{JwsHs256Signer, JwsSigner};
45use kanidm_proto::internal::OperationError;
46use kanidmd_lib::be::{Backend, BackendConfig, BackendTransaction};
47use kanidmd_lib::idm::ldap::LdapServer;
48use kanidmd_lib::prelude::*;
49use kanidmd_lib::schema::Schema;
50use kanidmd_lib::status::StatusActor;
51use kanidmd_lib::value::CredentialType;
52#[cfg(not(target_family = "windows"))]
53use libc::umask;
54use std::fmt::{Display, Formatter};
55use std::path::Path;
56use std::sync::Arc;
57use tokio::sync::broadcast;
58use tokio::sync::mpsc;
59use tokio::sync::Notify;
60use tokio::task;
61
62fn setup_backend(config: &Configuration, schema: &Schema) -> Result<Backend, OperationError> {
65 setup_backend_vacuum(config, schema, false)
66}
67
68fn setup_backend_vacuum(
69 config: &Configuration,
70 schema: &Schema,
71 vacuum: bool,
72) -> Result<Backend, OperationError> {
73 let schema_txn = schema.write();
76 let idxmeta = schema_txn.reload_idxmeta();
77
78 let pool_size: u32 = config.threads as u32;
79
80 let cfg = BackendConfig::new(
81 config.db_path.as_deref(),
82 pool_size,
83 config.db_fs_type.unwrap_or_default(),
84 config.db_arc_size,
85 );
86
87 Backend::new(cfg, idxmeta, vacuum)
88}
89
90async fn setup_qs_idms(
95 be: Backend,
96 schema: Schema,
97 config: &Configuration,
98) -> Result<(QueryServer, IdmServer, IdmServerDelayed, IdmServerAudit), OperationError> {
99 let curtime = duration_from_epoch_now();
100 let query_server = QueryServer::new(be, schema, config.domain.clone(), curtime)?;
102
103 query_server
112 .initialise_helper(curtime, DOMAIN_TGT_LEVEL)
113 .await?;
114
115 let is_integration_test = config.integration_test_config.is_some();
117 let (idms, idms_delayed, idms_audit) = IdmServer::new(
118 query_server.clone(),
119 &config.origin,
120 is_integration_test,
121 curtime,
122 )
123 .await?;
124
125 Ok((query_server, idms, idms_delayed, idms_audit))
126}
127
128async fn setup_qs(
129 be: Backend,
130 schema: Schema,
131 config: &Configuration,
132) -> Result<QueryServer, OperationError> {
133 let curtime = duration_from_epoch_now();
134 let query_server = QueryServer::new(be, schema, config.domain.clone(), curtime)?;
136
137 query_server
146 .initialise_helper(curtime, DOMAIN_TGT_LEVEL)
147 .await?;
148
149 Ok(query_server)
150}
151
152macro_rules! dbscan_setup_be {
153 (
154 $config:expr
155 ) => {{
156 let schema = match Schema::new() {
157 Ok(s) => s,
158 Err(e) => {
159 error!("Failed to setup in memory schema: {:?}", e);
160 std::process::exit(1);
161 }
162 };
163
164 match setup_backend($config, &schema) {
165 Ok(be) => be,
166 Err(e) => {
167 error!("Failed to setup BE: {:?}", e);
168 return;
169 }
170 }
171 }};
172}
173
174pub fn dbscan_list_indexes_core(config: &Configuration) {
175 let be = dbscan_setup_be!(config);
176 let mut be_rotxn = match be.read() {
177 Ok(txn) => txn,
178 Err(err) => {
179 error!(?err, "Unable to proceed, backend read transaction failure.");
180 return;
181 }
182 };
183
184 match be_rotxn.list_indexes() {
185 Ok(mut idx_list) => {
186 idx_list.sort_unstable();
187 idx_list.iter().for_each(|idx_name| {
188 println!("{}", idx_name);
189 })
190 }
191 Err(e) => {
192 error!("Failed to retrieve index list: {:?}", e);
193 }
194 };
195}
196
197pub fn dbscan_list_id2entry_core(config: &Configuration) {
198 let be = dbscan_setup_be!(config);
199 let mut be_rotxn = match be.read() {
200 Ok(txn) => txn,
201 Err(err) => {
202 error!(?err, "Unable to proceed, backend read transaction failure.");
203 return;
204 }
205 };
206
207 match be_rotxn.list_id2entry() {
208 Ok(mut id_list) => {
209 id_list.sort_unstable_by_key(|k| k.0);
210 id_list.iter().for_each(|(id, value)| {
211 println!("{:>8}: {}", id, value);
212 })
213 }
214 Err(e) => {
215 error!("Failed to retrieve id2entry list: {:?}", e);
216 }
217 };
218}
219
220pub fn dbscan_list_index_analysis_core(config: &Configuration) {
221 let _be = dbscan_setup_be!(config);
222 }
224
225pub fn dbscan_list_index_core(config: &Configuration, index_name: &str) {
226 let be = dbscan_setup_be!(config);
227 let mut be_rotxn = match be.read() {
228 Ok(txn) => txn,
229 Err(err) => {
230 error!(?err, "Unable to proceed, backend read transaction failure.");
231 return;
232 }
233 };
234
235 match be_rotxn.list_index_content(index_name) {
236 Ok(mut idx_list) => {
237 idx_list.sort_unstable_by(|a, b| a.0.cmp(&b.0));
238 idx_list.iter().for_each(|(key, value)| {
239 println!("{:>50}: {:?}", key, value);
240 })
241 }
242 Err(e) => {
243 error!("Failed to retrieve index list: {:?}", e);
244 }
245 };
246}
247
248pub fn dbscan_get_id2entry_core(config: &Configuration, id: u64) {
249 let be = dbscan_setup_be!(config);
250 let mut be_rotxn = match be.read() {
251 Ok(txn) => txn,
252 Err(err) => {
253 error!(?err, "Unable to proceed, backend read transaction failure.");
254 return;
255 }
256 };
257
258 match be_rotxn.get_id2entry(id) {
259 Ok((id, value)) => println!("{:>8}: {}", id, value),
260 Err(e) => {
261 error!("Failed to retrieve id2entry value: {:?}", e);
262 }
263 };
264}
265
266pub fn dbscan_quarantine_id2entry_core(config: &Configuration, id: u64) {
267 let be = dbscan_setup_be!(config);
268 let mut be_wrtxn = match be.write() {
269 Ok(txn) => txn,
270 Err(err) => {
271 error!(
272 ?err,
273 "Unable to proceed, backend write transaction failure."
274 );
275 return;
276 }
277 };
278
279 match be_wrtxn
280 .quarantine_entry(id)
281 .and_then(|_| be_wrtxn.commit())
282 {
283 Ok(()) => {
284 println!("quarantined - {:>8}", id)
285 }
286 Err(e) => {
287 error!("Failed to quarantine id2entry value: {:?}", e);
288 }
289 };
290}
291
292pub fn dbscan_list_quarantined_core(config: &Configuration) {
293 let be = dbscan_setup_be!(config);
294 let mut be_rotxn = match be.read() {
295 Ok(txn) => txn,
296 Err(err) => {
297 error!(?err, "Unable to proceed, backend read transaction failure.");
298 return;
299 }
300 };
301
302 match be_rotxn.list_quarantined() {
303 Ok(mut id_list) => {
304 id_list.sort_unstable_by_key(|k| k.0);
305 id_list.iter().for_each(|(id, value)| {
306 println!("{:>8}: {}", id, value);
307 })
308 }
309 Err(e) => {
310 error!("Failed to retrieve id2entry list: {:?}", e);
311 }
312 };
313}
314
315pub fn dbscan_restore_quarantined_core(config: &Configuration, id: u64) {
316 let be = dbscan_setup_be!(config);
317 let mut be_wrtxn = match be.write() {
318 Ok(txn) => txn,
319 Err(err) => {
320 error!(
321 ?err,
322 "Unable to proceed, backend write transaction failure."
323 );
324 return;
325 }
326 };
327
328 match be_wrtxn
329 .restore_quarantined(id)
330 .and_then(|_| be_wrtxn.commit())
331 {
332 Ok(()) => {
333 println!("restored - {:>8}", id)
334 }
335 Err(e) => {
336 error!("Failed to restore quarantined id2entry value: {:?}", e);
337 }
338 };
339}
340
341pub fn backup_server_core(config: &Configuration, dst_path: &Path) {
342 let schema = match Schema::new() {
343 Ok(s) => s,
344 Err(e) => {
345 error!("Failed to setup in memory schema: {:?}", e);
346 std::process::exit(1);
347 }
348 };
349
350 let be = match setup_backend(config, &schema) {
351 Ok(be) => be,
352 Err(e) => {
353 error!("Failed to setup BE: {:?}", e);
354 return;
355 }
356 };
357
358 let mut be_ro_txn = match be.read() {
359 Ok(txn) => txn,
360 Err(err) => {
361 error!(?err, "Unable to proceed, backend read transaction failure.");
362 return;
363 }
364 };
365
366 let r = be_ro_txn.backup(dst_path);
367 match r {
368 Ok(_) => info!("Backup success!"),
369 Err(e) => {
370 error!("Backup failed: {:?}", e);
371 std::process::exit(1);
372 }
373 };
374 }
376
377pub async fn restore_server_core(config: &Configuration, dst_path: &Path) {
378 if let Some(db_path) = config.db_path.as_ref() {
380 touch_file_or_quit(db_path);
381 }
382
383 let schema = match Schema::new() {
385 Ok(s) => s,
386 Err(e) => {
387 error!("Failed to setup in memory schema: {:?}", e);
388 std::process::exit(1);
389 }
390 };
391
392 let be = match setup_backend(config, &schema) {
393 Ok(be) => be,
394 Err(e) => {
395 error!("Failed to setup backend: {:?}", e);
396 return;
397 }
398 };
399
400 let mut be_wr_txn = match be.write() {
401 Ok(txn) => txn,
402 Err(err) => {
403 error!(
404 ?err,
405 "Unable to proceed, backend write transaction failure."
406 );
407 return;
408 }
409 };
410 let r = be_wr_txn.restore(dst_path).and_then(|_| be_wr_txn.commit());
411
412 if r.is_err() {
413 error!("Failed to restore database: {:?}", r);
414 std::process::exit(1);
415 }
416 info!("Database loaded successfully");
417
418 reindex_inner(be, schema, config).await;
419
420 info!("✅ Restore Success!");
421}
422
423pub async fn reindex_server_core(config: &Configuration) {
424 info!("Start Index Phase 1 ...");
425 let schema = match Schema::new() {
427 Ok(s) => s,
428 Err(e) => {
429 error!("Failed to setup in memory schema: {:?}", e);
430 std::process::exit(1);
431 }
432 };
433
434 let be = match setup_backend(config, &schema) {
435 Ok(be) => be,
436 Err(e) => {
437 error!("Failed to setup BE: {:?}", e);
438 return;
439 }
440 };
441
442 reindex_inner(be, schema, config).await;
443
444 info!("✅ Reindex Success!");
445}
446
447async fn reindex_inner(be: Backend, schema: Schema, config: &Configuration) {
448 let mut be_wr_txn = match be.write() {
450 Ok(txn) => txn,
451 Err(err) => {
452 error!(
453 ?err,
454 "Unable to proceed, backend write transaction failure."
455 );
456 return;
457 }
458 };
459
460 let r = be_wr_txn.reindex(true).and_then(|_| be_wr_txn.commit());
461
462 if r.is_err() {
464 error!("Failed to reindex database: {:?}", r);
465 std::process::exit(1);
466 }
467 info!("Index Phase 1 Success!");
468
469 info!("Attempting to init query server ...");
470
471 let (qs, _idms, _idms_delayed, _idms_audit) = match setup_qs_idms(be, schema, config).await {
472 Ok(t) => t,
473 Err(e) => {
474 error!("Unable to setup query server or idm server -> {:?}", e);
475 return;
476 }
477 };
478 info!("Init Query Server Success!");
479
480 info!("Start Index Phase 2 ...");
481
482 let Ok(mut qs_write) = qs.write(duration_from_epoch_now()).await else {
483 error!("Unable to acquire write transaction");
484 return;
485 };
486 let r = qs_write.reindex(true).and_then(|_| qs_write.commit());
487
488 match r {
489 Ok(_) => info!("Index Phase 2 Success!"),
490 Err(e) => {
491 error!("Reindex failed: {:?}", e);
492 std::process::exit(1);
493 }
494 };
495}
496
497pub fn vacuum_server_core(config: &Configuration) {
498 let schema = match Schema::new() {
499 Ok(s) => s,
500 Err(e) => {
501 eprintln!("Failed to setup in memory schema: {:?}", e);
502 std::process::exit(1);
503 }
504 };
505
506 let r = setup_backend_vacuum(config, &schema, true);
509
510 match r {
511 Ok(_) => eprintln!("Vacuum Success!"),
512 Err(e) => {
513 eprintln!("Vacuum failed: {:?}", e);
514 std::process::exit(1);
515 }
516 };
517}
518
519pub async fn domain_rename_core(config: &Configuration) {
520 let schema = match Schema::new() {
521 Ok(s) => s,
522 Err(e) => {
523 eprintln!("Failed to setup in memory schema: {:?}", e);
524 std::process::exit(1);
525 }
526 };
527
528 let be = match setup_backend(config, &schema) {
530 Ok(be) => be,
531 Err(e) => {
532 error!("Failed to setup BE: {:?}", e);
533 return;
534 }
535 };
536
537 let qs = match setup_qs(be, schema, config).await {
539 Ok(t) => t,
540 Err(e) => {
541 error!("Unable to setup query server -> {:?}", e);
542 return;
543 }
544 };
545
546 let new_domain_name = config.domain.as_str();
547
548 match qs.read().await.map(|qs| qs.get_domain_name().to_string()) {
550 Ok(old_domain_name) => {
551 admin_info!(?old_domain_name, ?new_domain_name);
552 if old_domain_name == new_domain_name {
553 admin_info!("Domain name not changing, stopping.");
554 return;
555 }
556 admin_debug!(
557 "Domain name is changing from {:?} to {:?}",
558 old_domain_name,
559 new_domain_name
560 );
561 }
562 Err(e) => {
563 admin_error!("Failed to query domain name, quitting! -> {:?}", e);
564 return;
565 }
566 }
567
568 let Ok(mut qs_write) = qs.write(duration_from_epoch_now()).await else {
569 error!("Unable to acquire write transaction");
570 return;
571 };
572 let r = qs_write
573 .danger_domain_rename(new_domain_name)
574 .and_then(|_| qs_write.commit());
575
576 match r {
577 Ok(_) => info!("Domain Rename Success!"),
578 Err(e) => {
579 error!("Domain Rename Failed - Rollback has occurred: {:?}", e);
580 std::process::exit(1);
581 }
582 };
583}
584
585pub async fn verify_server_core(config: &Configuration) {
586 let curtime = duration_from_epoch_now();
587 let schema_mem = match Schema::new() {
589 Ok(sc) => sc,
590 Err(e) => {
591 error!("Failed to setup in memory schema: {:?}", e);
592 return;
593 }
594 };
595 let be = match setup_backend(config, &schema_mem) {
597 Ok(be) => be,
598 Err(e) => {
599 error!("Failed to setup BE: {:?}", e);
600 return;
601 }
602 };
603
604 let server = match QueryServer::new(be, schema_mem, config.domain.clone(), curtime) {
605 Ok(qs) => qs,
606 Err(err) => {
607 error!(?err, "Failed to setup query server");
608 return;
609 }
610 };
611
612 let r = server.verify().await;
614
615 if r.is_empty() {
616 eprintln!("Verification passed!");
617 std::process::exit(0);
618 } else {
619 for er in r {
620 error!("{:?}", er);
621 }
622 std::process::exit(1);
623 }
624
625 }
627
628pub fn cert_generate_core(config: &Configuration) {
629 let (tls_key_path, tls_chain_path) = match &config.tls_config {
632 Some(tls_config) => (tls_config.key.as_path(), tls_config.chain.as_path()),
633 None => {
634 error!("Unable to find TLS configuration");
635 std::process::exit(1);
636 }
637 };
638
639 if tls_key_path.exists() && tls_chain_path.exists() {
640 info!(
641 "TLS key and chain already exist - remove them first if you intend to regenerate these"
642 );
643 return;
644 }
645
646 let origin_domain = match config.origin.domain() {
647 Some(val) => val,
648 None => {
649 error!("origin does not contain a valid domain");
650 std::process::exit(1);
651 }
652 };
653
654 let cert_root = match tls_key_path.parent() {
655 Some(parent) => parent,
656 None => {
657 error!("Unable to find parent directory of {:?}", tls_key_path);
658 std::process::exit(1);
659 }
660 };
661
662 let ca_cert = cert_root.join("ca.pem");
663 let ca_key = cert_root.join("cakey.pem");
664 let tls_cert_path = cert_root.join("cert.pem");
665
666 let ca_handle = if !ca_cert.exists() || !ca_key.exists() {
667 let ca_handle = match crypto::build_ca(None) {
669 Ok(ca_handle) => ca_handle,
670 Err(e) => {
671 error!(err = ?e, "Failed to build CA");
672 std::process::exit(1);
673 }
674 };
675
676 if crypto::write_ca(ca_key, ca_cert, &ca_handle).is_err() {
677 error!("Failed to write CA");
678 std::process::exit(1);
679 }
680
681 ca_handle
682 } else {
683 match crypto::load_ca(ca_key, ca_cert) {
684 Ok(ca_handle) => ca_handle,
685 Err(_) => {
686 error!("Failed to load CA");
687 std::process::exit(1);
688 }
689 }
690 };
691
692 if !tls_key_path.exists() || !tls_chain_path.exists() || !tls_cert_path.exists() {
693 let cert_handle = match crypto::build_cert(origin_domain, &ca_handle, None, None) {
695 Ok(cert_handle) => cert_handle,
696 Err(e) => {
697 error!(err = ?e, "Failed to build certificate");
698 std::process::exit(1);
699 }
700 };
701
702 if crypto::write_cert(tls_key_path, tls_chain_path, tls_cert_path, &cert_handle).is_err() {
703 error!("Failed to write certificates");
704 std::process::exit(1);
705 }
706 }
707 info!("certificate generation complete");
708}
709
710#[derive(Clone, Debug)]
711pub enum CoreAction {
712 Shutdown,
713}
714
715pub(crate) enum TaskName {
716 AdminSocket,
717 AuditdActor,
718 BackupActor,
719 DelayedActionActor,
720 HttpsServer,
721 IntervalActor,
722 LdapActor,
723 Replication,
724 TlsAcceptorReload,
725}
726
727impl Display for TaskName {
728 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
729 write!(
730 f,
731 "{}",
732 match self {
733 TaskName::AdminSocket => "Admin Socket",
734 TaskName::AuditdActor => "Auditd Actor",
735 TaskName::BackupActor => "Backup Actor",
736 TaskName::DelayedActionActor => "Delayed Action Actor",
737 TaskName::HttpsServer => "HTTPS Server",
738 TaskName::IntervalActor => "Interval Actor",
739 TaskName::LdapActor => "LDAP Acceptor Actor",
740 TaskName::Replication => "Replication",
741 TaskName::TlsAcceptorReload => "TlsAcceptor Reload Monitor",
742 }
743 )
744 }
745}
746
747pub struct CoreHandle {
748 clean_shutdown: bool,
749 tx: broadcast::Sender<CoreAction>,
750 tls_acceptor_reload_notify: Arc<Notify>,
751 handles: Vec<(TaskName, task::JoinHandle<()>)>,
753}
754
755impl CoreHandle {
756 pub fn subscribe(&mut self) -> broadcast::Receiver<CoreAction> {
757 self.tx.subscribe()
758 }
759
760 pub async fn shutdown(&mut self) {
761 if self.tx.send(CoreAction::Shutdown).is_err() {
762 eprintln!("No receivers acked shutdown request. Treating as unclean.");
763 return;
764 }
765
766 while let Some((handle_name, handle)) = self.handles.pop() {
768 if let Err(error) = handle.await {
769 eprintln!("Task {} failed to finish: {:?}", handle_name, error);
770 }
771 }
772
773 self.clean_shutdown = true;
774 }
775
776 pub async fn tls_acceptor_reload(&mut self) {
777 self.tls_acceptor_reload_notify.notify_one()
778 }
779}
780
781impl Drop for CoreHandle {
782 fn drop(&mut self) {
783 if !self.clean_shutdown {
784 eprintln!("⚠️ UNCLEAN SHUTDOWN OCCURRED ⚠️ ");
785 }
786 }
789}
790
791pub async fn create_server_core(
792 config: Configuration,
793 config_test: bool,
794) -> Result<CoreHandle, ()> {
795 let (broadcast_tx, mut broadcast_rx) = broadcast::channel(4);
797
798 if config.integration_test_config.is_some() {
799 warn!("RUNNING IN INTEGRATION TEST MODE.");
800 warn!("IF YOU SEE THIS IN PRODUCTION YOU MUST CONTACT SUPPORT IMMEDIATELY.");
801 } else if config.tls_config.is_none() {
802 error!("Running without TLS is not supported! Quitting!");
804 return Err(());
805 }
806
807 info!(
808 "Starting kanidm with {}configuration: {}",
809 if config_test { "TEST " } else { "" },
810 config
811 );
812 #[cfg(not(target_family = "windows"))]
814 unsafe {
815 umask(0o0027)
816 };
817
818 let status_ref = StatusActor::start();
821
822 let maybe_tls_acceptor = match crypto::setup_tls(&config.tls_config) {
824 Ok(tls_acc) => tls_acc,
825 Err(err) => {
826 error!(?err, "Failed to configure TLS acceptor");
827 return Err(());
828 }
829 };
830
831 let schema = match Schema::new() {
832 Ok(s) => s,
833 Err(e) => {
834 error!("Failed to setup in memory schema: {:?}", e);
835 return Err(());
836 }
837 };
838
839 let be = match setup_backend(&config, &schema) {
841 Ok(be) => be,
842 Err(e) => {
843 error!("Failed to setup BE -> {:?}", e);
844 return Err(());
845 }
846 };
847 let (_qs, idms, mut idms_delayed, mut idms_audit) =
849 match setup_qs_idms(be, schema, &config).await {
850 Ok(t) => t,
851 Err(e) => {
852 error!("Unable to setup query server or idm server -> {:?}", e);
853 return Err(());
854 }
855 };
856
857 let jws_signer = match JwsHs256Signer::generate_hs256() {
860 Ok(k) => k.set_sign_option_embed_kid(false),
861 Err(e) => {
862 error!("Unable to setup jws signer -> {:?}", e);
863 return Err(());
864 }
865 };
866
867 if let Some(itc) = &config.integration_test_config {
869 let Ok(mut idms_prox_write) = idms.proxy_write(duration_from_epoch_now()).await else {
870 error!("Unable to acquire write transaction");
871 return Err(());
872 };
873 match idms_prox_write.recover_account(&itc.admin_user, Some(&itc.admin_password)) {
875 Ok(_) => {}
876 Err(e) => {
877 error!(
878 "Unable to configure INTEGRATION TEST {} account -> {:?}",
879 &itc.admin_user, e
880 );
881 return Err(());
882 }
883 };
884 match idms_prox_write.recover_account(&itc.idm_admin_user, Some(&itc.idm_admin_password)) {
886 Ok(_) => {}
887 Err(e) => {
888 error!(
889 "Unable to configure INTEGRATION TEST {} account -> {:?}",
890 &itc.idm_admin_user, e
891 );
892 return Err(());
893 }
894 };
895
896 match idms_prox_write.qs_write.internal_modify_uuid(
900 UUID_IDM_ADMINS,
901 &ModifyList::new_append(Attribute::Member, Value::Refer(UUID_ADMIN)),
902 ) {
903 Ok(_) => {}
904 Err(e) => {
905 error!(
906 "Unable to configure INTEGRATION TEST admin as member of idm_admins -> {:?}",
907 e
908 );
909 return Err(());
910 }
911 };
912
913 match idms_prox_write.qs_write.internal_modify_uuid(
914 UUID_IDM_ALL_PERSONS,
915 &ModifyList::new_purge_and_set(
916 Attribute::CredentialTypeMinimum,
917 CredentialType::Any.into(),
918 ),
919 ) {
920 Ok(_) => {}
921 Err(e) => {
922 error!(
923 "Unable to configure INTEGRATION TEST default credential policy -> {:?}",
924 e
925 );
926 return Err(());
927 }
928 };
929
930 match idms_prox_write.commit() {
931 Ok(_) => {}
932 Err(e) => {
933 error!("Unable to commit INTEGRATION TEST setup -> {:?}", e);
934 return Err(());
935 }
936 }
937 }
938
939 let ldap = match LdapServer::new(&idms).await {
940 Ok(l) => l,
941 Err(e) => {
942 error!("Unable to start LdapServer -> {:?}", e);
943 return Err(());
944 }
945 };
946
947 let idms_arc = Arc::new(idms);
949 let ldap_arc = Arc::new(ldap);
950
951 let server_read_ref = QueryServerReadV1::start_static(idms_arc.clone(), ldap_arc.clone());
954
955 let server_write_ref = QueryServerWriteV1::start_static(idms_arc.clone());
957
958 let delayed_handle = task::spawn(async move {
959 let mut buffer = Vec::with_capacity(DELAYED_ACTION_BATCH_SIZE);
960 loop {
961 tokio::select! {
962 added = idms_delayed.recv_many(&mut buffer) => {
963 if added == 0 {
964 break
966 }
967 server_write_ref.handle_delayedaction(&mut buffer).await;
968 }
969 Ok(action) = broadcast_rx.recv() => {
970 match action {
971 CoreAction::Shutdown => break,
972 }
973 }
974 }
975 }
976 info!("Stopped {}", TaskName::DelayedActionActor);
977 });
978
979 let mut broadcast_rx = broadcast_tx.subscribe();
980
981 let auditd_handle = task::spawn(async move {
982 loop {
983 tokio::select! {
984 Ok(action) = broadcast_rx.recv() => {
985 match action {
986 CoreAction::Shutdown => break,
987 }
988 }
989 audit_event = idms_audit.audit_rx().recv() => {
990 match serde_json::to_string(&audit_event) {
991 Ok(audit_event) => {
992 warn!(%audit_event);
993 }
994 Err(e) => {
995 error!(err=?e, "Unable to process audit event to json.");
996 warn!(?audit_event, json=false);
997 }
998 }
999
1000 }
1001 }
1002 }
1003 info!("Stopped {}", TaskName::AuditdActor);
1004 });
1005
1006 let mut broadcast_rx = broadcast_tx.subscribe();
1009 let tls_acceptor_reload_notify = Arc::new(Notify::new());
1010 let tls_accepter_reload_task_notify = tls_acceptor_reload_notify.clone();
1011 let tls_config = config.tls_config.clone();
1012
1013 let ldap_configured = config.ldapbindaddress.is_some();
1014 let (ldap_tls_acceptor_reload_tx, ldap_tls_acceptor_reload_rx) = mpsc::channel(1);
1015 let (http_tls_acceptor_reload_tx, http_tls_acceptor_reload_rx) = mpsc::channel(1);
1016
1017 let tls_acceptor_reload_handle = task::spawn(async move {
1018 loop {
1019 tokio::select! {
1020 Ok(action) = broadcast_rx.recv() => {
1021 match action {
1022 CoreAction::Shutdown => break,
1023 }
1024 }
1025 _ = tls_accepter_reload_task_notify.notified() => {
1026 let tls_acceptor = match crypto::setup_tls(&tls_config) {
1027 Ok(Some(tls_acc)) => tls_acc,
1028 Ok(None) => {
1029 warn!("TLS not configured, ignoring reload request.");
1030 continue;
1031 }
1032 Err(err) => {
1033 error!(?err, "Failed to configure and reload TLS acceptor");
1034 continue;
1035 }
1036 };
1037
1038 if ldap_configured &&
1041 ldap_tls_acceptor_reload_tx.send(tls_acceptor.clone()).await.is_err() {
1042 error!("ldap tls acceptor did not accept the reload, the server may have failed!");
1043 };
1044 if http_tls_acceptor_reload_tx.send(tls_acceptor.clone()).await.is_err() {
1045 error!("http tls acceptor did not accept the reload, the server may have failed!");
1046 break;
1047 };
1048 }
1049 }
1050 }
1051 info!("Stopped {}", TaskName::TlsAcceptorReload);
1052 });
1053
1054 let interval_handle = IntervalActor::start(server_write_ref, broadcast_tx.subscribe());
1056 let maybe_backup_handle = match &config.online_backup {
1058 Some(online_backup_config) => {
1059 if online_backup_config.enabled {
1060 let handle = IntervalActor::start_online_backup(
1061 server_read_ref,
1062 online_backup_config,
1063 broadcast_tx.subscribe(),
1064 )?;
1065 Some(handle)
1066 } else {
1067 debug!("Backups disabled");
1068 None
1069 }
1070 }
1071 None => {
1072 debug!("Online backup not requested, skipping");
1073 None
1074 }
1075 };
1076
1077 let maybe_ldap_acceptor_handle = match &config.ldapbindaddress {
1079 Some(la) => {
1080 let opt_ldap_ssl_acceptor = maybe_tls_acceptor.clone();
1081
1082 let h = ldaps::create_ldap_server(
1083 la.as_str(),
1084 opt_ldap_ssl_acceptor,
1085 server_read_ref,
1086 broadcast_tx.subscribe(),
1087 ldap_tls_acceptor_reload_rx,
1088 config.ldap_client_address_info.trusted_proxy_v2(),
1089 )
1090 .await?;
1091 Some(h)
1092 }
1093 None => {
1094 debug!("LDAP not requested, skipping");
1095 None
1096 }
1097 };
1098
1099 let (maybe_repl_handle, maybe_repl_ctrl_tx) = match &config.repl_config {
1102 Some(rc) => {
1103 if !config_test {
1104 let (h, repl_ctrl_tx) =
1106 repl::create_repl_server(idms_arc.clone(), rc, broadcast_tx.subscribe())
1107 .await?;
1108 (Some(h), Some(repl_ctrl_tx))
1109 } else {
1110 (None, None)
1111 }
1112 }
1113 None => {
1114 debug!("Replication not requested, skipping");
1115 (None, None)
1116 }
1117 };
1118
1119 let maybe_http_acceptor_handle = if config_test {
1120 admin_info!("This config rocks! 🪨 ");
1121 None
1122 } else {
1123 let h: task::JoinHandle<()> = match https::create_https_server(
1124 config.clone(),
1125 jws_signer,
1126 status_ref,
1127 server_write_ref,
1128 server_read_ref,
1129 broadcast_tx.clone(),
1130 maybe_tls_acceptor,
1131 http_tls_acceptor_reload_rx,
1132 )
1133 .await
1134 {
1135 Ok(h) => h,
1136 Err(e) => {
1137 error!("Failed to start HTTPS server -> {:?}", e);
1138 return Err(());
1139 }
1140 };
1141 if config.role != ServerRole::WriteReplicaNoUI {
1142 admin_info!("ready to rock! 🪨 UI available at: {}", config.origin);
1143 } else {
1144 admin_info!("ready to rock! 🪨 ");
1145 }
1146 Some(h)
1147 };
1148
1149 let maybe_admin_sock_handle = if config.integration_test_config.is_none() {
1151 let broadcast_rx = broadcast_tx.subscribe();
1152
1153 let admin_handle = AdminActor::create_admin_sock(
1154 config.adminbindpath.as_str(),
1155 server_write_ref,
1156 server_read_ref,
1157 broadcast_rx,
1158 maybe_repl_ctrl_tx,
1159 )
1160 .await?;
1161
1162 Some(admin_handle)
1163 } else {
1164 None
1165 };
1166
1167 let mut handles: Vec<(TaskName, task::JoinHandle<()>)> = vec![
1168 (TaskName::IntervalActor, interval_handle),
1169 (TaskName::DelayedActionActor, delayed_handle),
1170 (TaskName::AuditdActor, auditd_handle),
1171 (TaskName::TlsAcceptorReload, tls_acceptor_reload_handle),
1172 ];
1173
1174 if let Some(backup_handle) = maybe_backup_handle {
1175 handles.push((TaskName::BackupActor, backup_handle))
1176 }
1177
1178 if let Some(admin_sock_handle) = maybe_admin_sock_handle {
1179 handles.push((TaskName::AdminSocket, admin_sock_handle))
1180 }
1181
1182 if let Some(ldap_handle) = maybe_ldap_acceptor_handle {
1183 handles.push((TaskName::LdapActor, ldap_handle))
1184 }
1185
1186 if let Some(http_handle) = maybe_http_acceptor_handle {
1187 handles.push((TaskName::HttpsServer, http_handle))
1188 }
1189
1190 if let Some(repl_handle) = maybe_repl_handle {
1191 handles.push((TaskName::Replication, repl_handle))
1192 }
1193
1194 Ok(CoreHandle {
1195 clean_shutdown: false,
1196 tls_acceptor_reload_notify,
1197 tx: broadcast_tx,
1198 handles,
1199 })
1200}