1#![deny(warnings)]
12#![warn(unused_extern_crates)]
13#![warn(unused_imports)]
14#![deny(clippy::todo)]
15#![deny(clippy::unimplemented)]
16#![deny(clippy::unwrap_used)]
17#![deny(clippy::expect_used)]
18#![deny(clippy::panic)]
19#![deny(clippy::unreachable)]
20#![deny(clippy::await_holding_lock)]
21#![deny(clippy::needless_pass_by_value)]
22#![deny(clippy::trivially_copy_pass_by_ref)]
23#![deny(clippy::indexing_slicing)]
24
25#[macro_use]
26extern crate tracing;
27#[macro_use]
28extern crate kanidmd_lib;
29
30mod actors;
31pub mod admin;
32pub mod config;
33mod crypto;
34mod https;
35mod interval;
36mod ldaps;
37mod repl;
38mod tcp;
39mod utils;
40
41use crate::actors::{QueryServerReadV1, QueryServerWriteV1};
42use crate::admin::AdminActor;
43use crate::config::Configuration;
44use crate::interval::IntervalActor;
45use crate::utils::touch_file_or_quit;
46use compact_jwt::{JwsHs256Signer, JwsSigner};
47use kanidm_proto::backup::BackupCompression;
48use kanidm_proto::config::ServerRole;
49use kanidm_proto::internal::OperationError;
50use kanidmd_lib::be::{Backend, BackendConfig, BackendTransaction};
51use kanidmd_lib::idm::ldap::LdapServer;
52use kanidmd_lib::prelude::*;
53use kanidmd_lib::schema::Schema;
54use kanidmd_lib::status::StatusActor;
55use kanidmd_lib::value::CredentialType;
56#[cfg(not(target_family = "windows"))]
57use libc::umask;
58use std::fmt::{Display, Formatter};
59use std::path::Path;
60use std::sync::Arc;
61use tokio::sync::broadcast;
62use tokio::sync::Notify;
63use tokio::task;
64
65fn setup_backend(config: &Configuration, schema: &Schema) -> Result<Backend, OperationError> {
68 setup_backend_vacuum(config, schema, false)
69}
70
71fn setup_backend_vacuum(
72 config: &Configuration,
73 schema: &Schema,
74 vacuum: bool,
75) -> Result<Backend, OperationError> {
76 let schema_txn = schema.write();
79 let idxmeta = schema_txn.reload_idxmeta();
80
81 let pool_size: u32 = config.threads as u32;
82
83 let cfg = BackendConfig::new(
84 config.db_path.as_deref(),
85 pool_size,
86 config.db_fs_type.unwrap_or_default(),
87 config.db_arc_size,
88 );
89
90 Backend::new(cfg, idxmeta, vacuum)
91}
92
93async fn setup_qs_idms(
98 be: Backend,
99 schema: Schema,
100 config: &Configuration,
101) -> Result<(QueryServer, IdmServer, IdmServerDelayed, IdmServerAudit), OperationError> {
102 let curtime = duration_from_epoch_now();
103 let query_server = QueryServer::new(be, schema, config.domain.clone(), curtime)?;
105
106 query_server
115 .initialise_helper(curtime, DOMAIN_TGT_LEVEL)
116 .await?;
117
118 let is_integration_test = config.integration_test_config.is_some();
120 let (idms, idms_delayed, idms_audit) = IdmServer::new(
121 query_server.clone(),
122 &config.origin,
123 is_integration_test,
124 curtime,
125 )
126 .await?;
127
128 Ok((query_server, idms, idms_delayed, idms_audit))
129}
130
131async fn setup_qs(
132 be: Backend,
133 schema: Schema,
134 config: &Configuration,
135) -> Result<QueryServer, OperationError> {
136 let curtime = duration_from_epoch_now();
137 let query_server = QueryServer::new(be, schema, config.domain.clone(), curtime)?;
139
140 query_server
149 .initialise_helper(curtime, DOMAIN_TGT_LEVEL)
150 .await?;
151
152 Ok(query_server)
153}
154
155macro_rules! dbscan_setup_be {
156 (
157 $config:expr
158 ) => {{
159 let schema = match Schema::new() {
160 Ok(s) => s,
161 Err(e) => {
162 error!("Failed to setup in memory schema: {:?}", e);
163 std::process::exit(1);
164 }
165 };
166
167 match setup_backend($config, &schema) {
168 Ok(be) => be,
169 Err(e) => {
170 error!("Failed to setup BE: {:?}", e);
171 return;
172 }
173 }
174 }};
175}
176
177pub fn dbscan_list_indexes_core(config: &Configuration) {
178 let be = dbscan_setup_be!(config);
179 let mut be_rotxn = match be.read() {
180 Ok(txn) => txn,
181 Err(err) => {
182 error!(?err, "Unable to proceed, backend read transaction failure.");
183 return;
184 }
185 };
186
187 match be_rotxn.list_indexes() {
188 Ok(mut idx_list) => {
189 idx_list.sort_unstable();
190 idx_list.iter().for_each(|idx_name| {
191 println!("{idx_name}");
192 })
193 }
194 Err(e) => {
195 error!("Failed to retrieve index list: {:?}", e);
196 }
197 };
198}
199
200pub fn dbscan_list_id2entry_core(config: &Configuration) {
201 let be = dbscan_setup_be!(config);
202 let mut be_rotxn = match be.read() {
203 Ok(txn) => txn,
204 Err(err) => {
205 error!(?err, "Unable to proceed, backend read transaction failure.");
206 return;
207 }
208 };
209
210 match be_rotxn.list_id2entry() {
211 Ok(mut id_list) => {
212 id_list.sort_unstable_by_key(|k| k.0);
213 id_list.iter().for_each(|(id, value)| {
214 println!("{id:>8}: {value}");
215 })
216 }
217 Err(e) => {
218 error!("Failed to retrieve id2entry list: {:?}", e);
219 }
220 };
221}
222
223pub fn dbscan_list_index_analysis_core(config: &Configuration) {
224 let _be = dbscan_setup_be!(config);
225 }
227
228pub fn dbscan_list_index_core(config: &Configuration, index_name: &str) {
229 let be = dbscan_setup_be!(config);
230 let mut be_rotxn = match be.read() {
231 Ok(txn) => txn,
232 Err(err) => {
233 error!(?err, "Unable to proceed, backend read transaction failure.");
234 return;
235 }
236 };
237
238 match be_rotxn.list_index_content(index_name) {
239 Ok(mut idx_list) => {
240 idx_list.sort_unstable_by(|a, b| a.0.cmp(&b.0));
241 idx_list.iter().for_each(|(key, value)| {
242 println!("{key:>50}: {value:?}");
243 })
244 }
245 Err(e) => {
246 error!("Failed to retrieve index list: {:?}", e);
247 }
248 };
249}
250
251pub fn dbscan_get_id2entry_core(config: &Configuration, id: u64) {
252 let be = dbscan_setup_be!(config);
253 let mut be_rotxn = match be.read() {
254 Ok(txn) => txn,
255 Err(err) => {
256 error!(?err, "Unable to proceed, backend read transaction failure.");
257 return;
258 }
259 };
260
261 match be_rotxn.get_id2entry(id) {
262 Ok((id, value)) => println!("{id:>8}: {value}"),
263 Err(e) => {
264 error!("Failed to retrieve id2entry value: {:?}", e);
265 }
266 };
267}
268
269pub fn dbscan_quarantine_id2entry_core(config: &Configuration, id: u64) {
270 let be = dbscan_setup_be!(config);
271 let mut be_wrtxn = match be.write() {
272 Ok(txn) => txn,
273 Err(err) => {
274 error!(
275 ?err,
276 "Unable to proceed, backend write transaction failure."
277 );
278 return;
279 }
280 };
281
282 match be_wrtxn
283 .quarantine_entry(id)
284 .and_then(|_| be_wrtxn.commit())
285 {
286 Ok(()) => {
287 println!("quarantined - {id:>8}")
288 }
289 Err(e) => {
290 error!("Failed to quarantine id2entry value: {:?}", e);
291 }
292 };
293}
294
295pub fn dbscan_list_quarantined_core(config: &Configuration) {
296 let be = dbscan_setup_be!(config);
297 let mut be_rotxn = match be.read() {
298 Ok(txn) => txn,
299 Err(err) => {
300 error!(?err, "Unable to proceed, backend read transaction failure.");
301 return;
302 }
303 };
304
305 match be_rotxn.list_quarantined() {
306 Ok(mut id_list) => {
307 id_list.sort_unstable_by_key(|k| k.0);
308 id_list.iter().for_each(|(id, value)| {
309 println!("{id:>8}: {value}");
310 })
311 }
312 Err(e) => {
313 error!("Failed to retrieve id2entry list: {:?}", e);
314 }
315 };
316}
317
318pub fn dbscan_restore_quarantined_core(config: &Configuration, id: u64) {
319 let be = dbscan_setup_be!(config);
320 let mut be_wrtxn = match be.write() {
321 Ok(txn) => txn,
322 Err(err) => {
323 error!(
324 ?err,
325 "Unable to proceed, backend write transaction failure."
326 );
327 return;
328 }
329 };
330
331 match be_wrtxn
332 .restore_quarantined(id)
333 .and_then(|_| be_wrtxn.commit())
334 {
335 Ok(()) => {
336 println!("restored - {id:>8}")
337 }
338 Err(e) => {
339 error!("Failed to restore quarantined id2entry value: {:?}", e);
340 }
341 };
342}
343
344pub fn backup_server_core(config: &Configuration, dst_path: &Path) {
345 let schema = match Schema::new() {
346 Ok(s) => s,
347 Err(e) => {
348 error!("Failed to setup in memory schema: {:?}", e);
349 std::process::exit(1);
350 }
351 };
352
353 let be = match setup_backend(config, &schema) {
354 Ok(be) => be,
355 Err(e) => {
356 error!("Failed to setup BE: {:?}", e);
357 return;
358 }
359 };
360
361 let mut be_ro_txn = match be.read() {
362 Ok(txn) => txn,
363 Err(err) => {
364 error!(?err, "Unable to proceed, backend read transaction failure.");
365 return;
366 }
367 };
368
369 let compression = match config.online_backup.as_ref() {
370 Some(backup_config) => backup_config.compression,
371 None => BackupCompression::default(),
372 };
373
374 match be_ro_txn.backup(dst_path, compression) {
375 Ok(_) => info!("Backup success!"),
376 Err(e) => {
377 error!("Backup failed: {:?}", e);
378 std::process::exit(1);
379 }
380 };
381 }
383
384pub async fn restore_server_core(config: &Configuration, dst_path: &Path) {
385 if let Some(db_path) = config.db_path.as_ref() {
387 touch_file_or_quit(db_path);
388 }
389
390 let schema = match Schema::new() {
392 Ok(s) => s,
393 Err(e) => {
394 error!("Failed to setup in memory schema: {:?}", e);
395 std::process::exit(1);
396 }
397 };
398
399 let be = match setup_backend(config, &schema) {
400 Ok(be) => be,
401 Err(e) => {
402 error!("Failed to setup backend: {:?}", e);
403 return;
404 }
405 };
406
407 let mut be_wr_txn = match be.write() {
408 Ok(txn) => txn,
409 Err(err) => {
410 error!(
411 ?err,
412 "Unable to proceed, backend write transaction failure."
413 );
414 return;
415 }
416 };
417 let r = be_wr_txn.restore(dst_path).and_then(|_| be_wr_txn.commit());
418
419 if r.is_err() {
420 error!("Failed to restore database: {:?}", r);
421 std::process::exit(1);
422 }
423 info!("Database loaded successfully");
424
425 reindex_inner(be, schema, config).await;
426
427 info!("✅ Restore Success!");
428}
429
430pub async fn reindex_server_core(config: &Configuration) {
431 info!("Start Index Phase 1 ...");
432 let schema = match Schema::new() {
434 Ok(s) => s,
435 Err(e) => {
436 error!("Failed to setup in memory schema: {:?}", e);
437 std::process::exit(1);
438 }
439 };
440
441 let be = match setup_backend(config, &schema) {
442 Ok(be) => be,
443 Err(e) => {
444 error!("Failed to setup BE: {:?}", e);
445 return;
446 }
447 };
448
449 reindex_inner(be, schema, config).await;
450
451 info!("✅ Reindex Success!");
452}
453
454async fn reindex_inner(be: Backend, schema: Schema, config: &Configuration) {
455 let mut be_wr_txn = match be.write() {
457 Ok(txn) => txn,
458 Err(err) => {
459 error!(
460 ?err,
461 "Unable to proceed, backend write transaction failure."
462 );
463 return;
464 }
465 };
466
467 let r = be_wr_txn.reindex(true).and_then(|_| be_wr_txn.commit());
468
469 if r.is_err() {
471 error!("Failed to reindex database: {:?}", r);
472 std::process::exit(1);
473 }
474 info!("Index Phase 1 Success!");
475
476 info!("Attempting to init query server ...");
477
478 let (qs, _idms, _idms_delayed, _idms_audit) = match setup_qs_idms(be, schema, config).await {
479 Ok(t) => t,
480 Err(e) => {
481 error!("Unable to setup query server or idm server -> {:?}", e);
482 return;
483 }
484 };
485 info!("Init Query Server Success!");
486
487 info!("Start Index Phase 2 ...");
488
489 let Ok(mut qs_write) = qs.write(duration_from_epoch_now()).await else {
490 error!("Unable to acquire write transaction");
491 return;
492 };
493 let r = qs_write.reindex(true).and_then(|_| qs_write.commit());
494
495 match r {
496 Ok(_) => info!("Index Phase 2 Success!"),
497 Err(e) => {
498 error!("Reindex failed: {:?}", e);
499 std::process::exit(1);
500 }
501 };
502}
503
504pub fn vacuum_server_core(config: &Configuration) {
505 let schema = match Schema::new() {
506 Ok(s) => s,
507 Err(e) => {
508 eprintln!("Failed to setup in memory schema: {e:?}");
509 std::process::exit(1);
510 }
511 };
512
513 let r = setup_backend_vacuum(config, &schema, true);
516
517 match r {
518 Ok(_) => eprintln!("Vacuum Success!"),
519 Err(e) => {
520 eprintln!("Vacuum failed: {e:?}");
521 std::process::exit(1);
522 }
523 };
524}
525
526pub async fn domain_rename_core(config: &Configuration) {
527 let schema = match Schema::new() {
528 Ok(s) => s,
529 Err(e) => {
530 eprintln!("Failed to setup in memory schema: {e:?}");
531 std::process::exit(1);
532 }
533 };
534
535 let be = match setup_backend(config, &schema) {
537 Ok(be) => be,
538 Err(e) => {
539 error!("Failed to setup BE: {:?}", e);
540 return;
541 }
542 };
543
544 let qs = match setup_qs(be, schema, config).await {
546 Ok(t) => t,
547 Err(e) => {
548 error!("Unable to setup query server -> {:?}", e);
549 return;
550 }
551 };
552
553 let new_domain_name = config.domain.as_str();
554
555 match qs.read().await.map(|qs| qs.get_domain_name().to_string()) {
557 Ok(old_domain_name) => {
558 admin_info!(?old_domain_name, ?new_domain_name);
559 if old_domain_name == new_domain_name {
560 admin_info!("Domain name not changing, stopping.");
561 return;
562 }
563 admin_debug!(
564 "Domain name is changing from {:?} to {:?}",
565 old_domain_name,
566 new_domain_name
567 );
568 }
569 Err(e) => {
570 admin_error!("Failed to query domain name, quitting! -> {:?}", e);
571 return;
572 }
573 }
574
575 let Ok(mut qs_write) = qs.write(duration_from_epoch_now()).await else {
576 error!("Unable to acquire write transaction");
577 return;
578 };
579 let r = qs_write
580 .danger_domain_rename(new_domain_name)
581 .and_then(|_| qs_write.commit());
582
583 match r {
584 Ok(_) => info!("Domain Rename Success!"),
585 Err(e) => {
586 error!("Domain Rename Failed - Rollback has occurred: {:?}", e);
587 std::process::exit(1);
588 }
589 };
590}
591
592pub async fn verify_server_core(config: &Configuration) {
593 let curtime = duration_from_epoch_now();
594 let schema_mem = match Schema::new() {
596 Ok(sc) => sc,
597 Err(e) => {
598 error!("Failed to setup in memory schema: {:?}", e);
599 return;
600 }
601 };
602 let be = match setup_backend(config, &schema_mem) {
604 Ok(be) => be,
605 Err(e) => {
606 error!("Failed to setup BE: {:?}", e);
607 return;
608 }
609 };
610
611 let server = match QueryServer::new(be, schema_mem, config.domain.clone(), curtime) {
612 Ok(qs) => qs,
613 Err(err) => {
614 error!(?err, "Failed to setup query server");
615 return;
616 }
617 };
618
619 let r = server.verify().await;
621
622 if r.is_empty() {
623 eprintln!("Verification passed!");
624 std::process::exit(0);
625 } else {
626 for er in r {
627 error!("{:?}", er);
628 }
629 std::process::exit(1);
630 }
631
632 }
634
635pub fn cert_generate_core(config: &Configuration) {
636 let (tls_key_path, tls_chain_path) = match &config.tls_config {
639 Some(tls_config) => (tls_config.key.as_path(), tls_config.chain.as_path()),
640 None => {
641 error!("Unable to find TLS configuration");
642 std::process::exit(1);
643 }
644 };
645
646 if tls_key_path.exists() && tls_chain_path.exists() {
647 info!(
648 "TLS key and chain already exist - remove them first if you intend to regenerate these"
649 );
650 return;
651 }
652
653 let origin_domain = match config.origin.domain() {
654 Some(val) => val,
655 None => {
656 error!("origin does not contain a valid domain");
657 std::process::exit(1);
658 }
659 };
660
661 let cert_root = match tls_key_path.parent() {
662 Some(parent) => parent,
663 None => {
664 error!("Unable to find parent directory of {:?}", tls_key_path);
665 std::process::exit(1);
666 }
667 };
668
669 let ca_cert = cert_root.join("ca.pem");
670 let ca_key = cert_root.join("cakey.pem");
671 let tls_cert_path = cert_root.join("cert.pem");
672
673 let ca_handle = if !ca_cert.exists() || !ca_key.exists() {
674 let ca_handle = match crypto::build_ca(None) {
676 Ok(ca_handle) => ca_handle,
677 Err(e) => {
678 error!(err = ?e, "Failed to build CA");
679 std::process::exit(1);
680 }
681 };
682
683 if crypto::write_ca(ca_key, ca_cert, &ca_handle).is_err() {
684 error!("Failed to write CA");
685 std::process::exit(1);
686 }
687
688 ca_handle
689 } else {
690 match crypto::load_ca(ca_key, ca_cert) {
691 Ok(ca_handle) => ca_handle,
692 Err(_) => {
693 error!("Failed to load CA");
694 std::process::exit(1);
695 }
696 }
697 };
698
699 if !tls_key_path.exists() || !tls_chain_path.exists() || !tls_cert_path.exists() {
700 let cert_handle = match crypto::build_cert(origin_domain, &ca_handle, None, None) {
702 Ok(cert_handle) => cert_handle,
703 Err(e) => {
704 error!(err = ?e, "Failed to build certificate");
705 std::process::exit(1);
706 }
707 };
708
709 if crypto::write_cert(tls_key_path, tls_chain_path, tls_cert_path, &cert_handle).is_err() {
710 error!("Failed to write certificates");
711 std::process::exit(1);
712 }
713 }
714 info!("certificate generation complete");
715}
716
717#[derive(Clone, Debug)]
718pub enum CoreAction {
719 Shutdown,
720}
721
722pub(crate) enum TaskName {
723 AdminSocket,
724 AuditdActor,
725 BackupActor,
726 DelayedActionActor,
727 HttpsServer,
728 IntervalActor,
729 LdapActor,
730 Replication,
731 TlsAcceptorReload,
732}
733
734impl Display for TaskName {
735 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
736 write!(
737 f,
738 "{}",
739 match self {
740 TaskName::AdminSocket => "Admin Socket",
741 TaskName::AuditdActor => "Auditd Actor",
742 TaskName::BackupActor => "Backup Actor",
743 TaskName::DelayedActionActor => "Delayed Action Actor",
744 TaskName::HttpsServer => "HTTPS Server",
745 TaskName::IntervalActor => "Interval Actor",
746 TaskName::LdapActor => "LDAP Acceptor Actor",
747 TaskName::Replication => "Replication",
748 TaskName::TlsAcceptorReload => "TlsAcceptor Reload Monitor",
749 }
750 )
751 }
752}
753
754pub struct CoreHandle {
755 clean_shutdown: bool,
756 tx: broadcast::Sender<CoreAction>,
757 tls_acceptor_reload_notify: Arc<Notify>,
758 handles: Vec<(TaskName, task::JoinHandle<()>)>,
760}
761
762impl CoreHandle {
763 pub fn subscribe(&mut self) -> broadcast::Receiver<CoreAction> {
764 self.tx.subscribe()
765 }
766
767 pub async fn shutdown(&mut self) {
768 if self.tx.send(CoreAction::Shutdown).is_err() {
769 eprintln!("No receivers acked shutdown request. Treating as unclean.");
770 return;
771 }
772
773 while let Some((handle_name, handle)) = self.handles.pop() {
775 if let Err(error) = handle.await {
776 eprintln!("Task {handle_name} failed to finish: {error:?}");
777 }
778 }
779
780 self.clean_shutdown = true;
781 }
782
783 pub async fn tls_acceptor_reload(&mut self) {
784 self.tls_acceptor_reload_notify.notify_one()
785 }
786}
787
788impl Drop for CoreHandle {
789 fn drop(&mut self) {
790 if !self.clean_shutdown {
791 eprintln!("⚠️ UNCLEAN SHUTDOWN OCCURRED ⚠️ ");
792 }
793 }
796}
797
798pub async fn create_server_core(
799 config: Configuration,
800 config_test: bool,
801) -> Result<CoreHandle, ()> {
802 let (broadcast_tx, mut broadcast_rx) = broadcast::channel(4);
804
805 if config.integration_test_config.is_some() {
806 warn!("RUNNING IN INTEGRATION TEST MODE.");
807 warn!("IF YOU SEE THIS IN PRODUCTION YOU MUST CONTACT SUPPORT IMMEDIATELY.");
808 } else if config.tls_config.is_none() {
809 error!("Running without TLS is not supported! Quitting!");
811 return Err(());
812 }
813
814 info!(
815 "Starting kanidm with {}configuration: {}",
816 if config_test { "TEST " } else { "" },
817 config
818 );
819 #[cfg(not(target_family = "windows"))]
821 unsafe {
822 umask(0o0027)
823 };
824
825 let status_ref = StatusActor::start();
828
829 let maybe_tls_acceptor = match crypto::setup_tls(&config.tls_config) {
831 Ok(tls_acc) => tls_acc,
832 Err(err) => {
833 error!(?err, "Failed to configure TLS acceptor");
834 return Err(());
835 }
836 };
837
838 let schema = match Schema::new() {
839 Ok(s) => s,
840 Err(e) => {
841 error!("Failed to setup in memory schema: {:?}", e);
842 return Err(());
843 }
844 };
845
846 let be = match setup_backend(&config, &schema) {
848 Ok(be) => be,
849 Err(e) => {
850 error!("Failed to setup BE -> {:?}", e);
851 return Err(());
852 }
853 };
854 let (_qs, idms, mut idms_delayed, mut idms_audit) =
856 match setup_qs_idms(be, schema, &config).await {
857 Ok(t) => t,
858 Err(e) => {
859 error!("Unable to setup query server or idm server -> {:?}", e);
860 return Err(());
861 }
862 };
863
864 let jws_signer = match JwsHs256Signer::generate_hs256() {
867 Ok(k) => k.set_sign_option_embed_kid(false),
868 Err(e) => {
869 error!("Unable to setup jws signer -> {:?}", e);
870 return Err(());
871 }
872 };
873
874 if let Some(itc) = &config.integration_test_config {
876 let Ok(mut idms_prox_write) = idms.proxy_write(duration_from_epoch_now()).await else {
877 error!("Unable to acquire write transaction");
878 return Err(());
879 };
880 match idms_prox_write.recover_account(&itc.admin_user, Some(&itc.admin_password)) {
882 Ok(_) => {}
883 Err(e) => {
884 error!(
885 "Unable to configure INTEGRATION TEST {} account -> {:?}",
886 &itc.admin_user, e
887 );
888 return Err(());
889 }
890 };
891 match idms_prox_write.recover_account(&itc.idm_admin_user, Some(&itc.idm_admin_password)) {
893 Ok(_) => {}
894 Err(e) => {
895 error!(
896 "Unable to configure INTEGRATION TEST {} account -> {:?}",
897 &itc.idm_admin_user, e
898 );
899 return Err(());
900 }
901 };
902
903 match idms_prox_write.qs_write.internal_modify_uuid(
907 UUID_IDM_ADMINS,
908 &ModifyList::new_append(Attribute::Member, Value::Refer(UUID_ADMIN)),
909 ) {
910 Ok(_) => {}
911 Err(e) => {
912 error!(
913 "Unable to configure INTEGRATION TEST admin as member of idm_admins -> {:?}",
914 e
915 );
916 return Err(());
917 }
918 };
919
920 match idms_prox_write.qs_write.internal_modify_uuid(
921 UUID_IDM_ALL_PERSONS,
922 &ModifyList::new_purge_and_set(
923 Attribute::CredentialTypeMinimum,
924 CredentialType::Any.into(),
925 ),
926 ) {
927 Ok(_) => {}
928 Err(e) => {
929 error!(
930 "Unable to configure INTEGRATION TEST default credential policy -> {:?}",
931 e
932 );
933 return Err(());
934 }
935 };
936
937 match idms_prox_write.commit() {
938 Ok(_) => {}
939 Err(e) => {
940 error!("Unable to commit INTEGRATION TEST setup -> {:?}", e);
941 return Err(());
942 }
943 }
944 }
945
946 let ldap = match LdapServer::new(&idms).await {
947 Ok(l) => l,
948 Err(e) => {
949 error!("Unable to start LdapServer -> {:?}", e);
950 return Err(());
951 }
952 };
953
954 let idms_arc = Arc::new(idms);
956 let ldap_arc = Arc::new(ldap);
957
958 let server_read_ref = QueryServerReadV1::start_static(idms_arc.clone(), ldap_arc.clone());
961
962 let server_write_ref = QueryServerWriteV1::start_static(idms_arc.clone());
964
965 let delayed_handle = task::spawn(async move {
966 let mut buffer = Vec::with_capacity(DELAYED_ACTION_BATCH_SIZE);
967 loop {
968 tokio::select! {
969 added = idms_delayed.recv_many(&mut buffer) => {
970 if added == 0 {
971 break
973 }
974 server_write_ref.handle_delayedaction(&mut buffer).await;
975 }
976 Ok(action) = broadcast_rx.recv() => {
977 match action {
978 CoreAction::Shutdown => break,
979 }
980 }
981 }
982 }
983 info!("Stopped {}", TaskName::DelayedActionActor);
984 });
985
986 let mut broadcast_rx = broadcast_tx.subscribe();
987
988 let auditd_handle = task::spawn(async move {
989 loop {
990 tokio::select! {
991 Ok(action) = broadcast_rx.recv() => {
992 match action {
993 CoreAction::Shutdown => break,
994 }
995 }
996 audit_event = idms_audit.audit_rx().recv() => {
997 match serde_json::to_string(&audit_event) {
998 Ok(audit_event) => {
999 warn!(%audit_event);
1000 }
1001 Err(e) => {
1002 error!(err=?e, "Unable to process audit event to json.");
1003 warn!(?audit_event, json=false);
1004 }
1005 }
1006
1007 }
1008 }
1009 }
1010 info!("Stopped {}", TaskName::AuditdActor);
1011 });
1012
1013 let mut broadcast_rx = broadcast_tx.subscribe();
1016 let tls_acceptor_reload_notify = Arc::new(Notify::new());
1017 let tls_accepter_reload_task_notify = tls_acceptor_reload_notify.clone();
1018 let tls_config = config.tls_config.clone();
1019
1020 let (tls_acceptor_reload_tx, _tls_acceptor_reload_rx) = broadcast::channel(1);
1021 let tls_acceptor_reload_tx_c = tls_acceptor_reload_tx.clone();
1022
1023 let tls_acceptor_reload_handle = task::spawn(async move {
1024 loop {
1025 tokio::select! {
1026 Ok(action) = broadcast_rx.recv() => {
1027 match action {
1028 CoreAction::Shutdown => break,
1029 }
1030 }
1031 _ = tls_accepter_reload_task_notify.notified() => {
1032 let tls_acceptor = match crypto::setup_tls(&tls_config) {
1033 Ok(Some(tls_acc)) => tls_acc,
1034 Ok(None) => {
1035 warn!("TLS not configured, ignoring reload request.");
1036 continue;
1037 }
1038 Err(err) => {
1039 error!(?err, "Failed to configure and reload TLS acceptor");
1040 continue;
1041 }
1042 };
1043
1044 if tls_acceptor_reload_tx_c.send(tls_acceptor).is_err() {
1047 error!("tls acceptor did not accept the reload, the server may have failed!");
1048 };
1049 info!("tls acceptor reload notification sent");
1050 }
1051 }
1052 }
1053 info!("Stopped {}", TaskName::TlsAcceptorReload);
1054 });
1055
1056 let interval_handle = IntervalActor::start(server_write_ref, broadcast_tx.subscribe());
1058 let maybe_backup_handle = match &config.online_backup {
1060 Some(online_backup_config) => {
1061 if online_backup_config.enabled {
1062 let handle = IntervalActor::start_online_backup(
1063 server_read_ref,
1064 online_backup_config,
1065 broadcast_tx.subscribe(),
1066 )?;
1067 Some(handle)
1068 } else {
1069 debug!("Backups disabled");
1070 None
1071 }
1072 }
1073 None => {
1074 debug!("Online backup not requested, skipping");
1075 None
1076 }
1077 };
1078
1079 let maybe_ldap_acceptor_handles = match &config.ldapbindaddress {
1081 Some(la) => {
1082 let opt_ldap_ssl_acceptor = maybe_tls_acceptor.clone();
1083
1084 let h = ldaps::create_ldap_server(
1085 la,
1086 opt_ldap_ssl_acceptor,
1087 server_read_ref,
1088 &broadcast_tx,
1089 &tls_acceptor_reload_tx,
1090 config.ldap_client_address_info.trusted_tcp_info(),
1091 )
1092 .await?;
1093 Some(h)
1094 }
1095 None => {
1096 debug!("LDAP not requested, skipping");
1097 None
1098 }
1099 };
1100
1101 let (maybe_repl_handle, maybe_repl_ctrl_tx) = match &config.repl_config {
1104 Some(rc) => {
1105 if !config_test {
1106 let (h, repl_ctrl_tx) =
1108 repl::create_repl_server(idms_arc.clone(), rc, broadcast_tx.subscribe())
1109 .await?;
1110 (Some(h), Some(repl_ctrl_tx))
1111 } else {
1112 (None, None)
1113 }
1114 }
1115 None => {
1116 debug!("Replication not requested, skipping");
1117 (None, None)
1118 }
1119 };
1120
1121 let maybe_http_acceptor_handles = if config_test {
1122 admin_info!("This config rocks! 🪨 ");
1123 None
1124 } else {
1125 let handles: Vec<task::JoinHandle<()>> = https::create_https_server(
1126 config.clone(),
1127 jws_signer,
1128 status_ref,
1129 server_write_ref,
1130 server_read_ref,
1131 broadcast_tx.clone(),
1132 maybe_tls_acceptor,
1133 &tls_acceptor_reload_tx,
1134 )
1135 .await
1136 .inspect_err(|err| {
1137 error!(?err, "Failed to start HTTPS server");
1138 })?;
1139
1140 if config.role != ServerRole::WriteReplicaNoUI {
1141 admin_info!("ready to rock! 🪨 UI available at: {}", config.origin);
1142 } else {
1143 admin_info!("ready to rock! 🪨 ");
1144 }
1145 Some(handles)
1146 };
1147
1148 let maybe_admin_sock_handle = if config.integration_test_config.is_none() {
1150 let broadcast_rx = broadcast_tx.subscribe();
1151
1152 let admin_handle = AdminActor::create_admin_sock(
1153 config.adminbindpath.as_str(),
1154 server_write_ref,
1155 server_read_ref,
1156 broadcast_rx,
1157 maybe_repl_ctrl_tx,
1158 )
1159 .await?;
1160
1161 Some(admin_handle)
1162 } else {
1163 None
1164 };
1165
1166 let mut handles: Vec<(TaskName, task::JoinHandle<()>)> = vec![
1167 (TaskName::IntervalActor, interval_handle),
1168 (TaskName::DelayedActionActor, delayed_handle),
1169 (TaskName::AuditdActor, auditd_handle),
1170 (TaskName::TlsAcceptorReload, tls_acceptor_reload_handle),
1171 ];
1172
1173 if let Some(backup_handle) = maybe_backup_handle {
1174 handles.push((TaskName::BackupActor, backup_handle))
1175 }
1176
1177 if let Some(admin_sock_handle) = maybe_admin_sock_handle {
1178 handles.push((TaskName::AdminSocket, admin_sock_handle))
1179 }
1180
1181 if let Some(ldap_handles) = maybe_ldap_acceptor_handles {
1182 for ldap_handle in ldap_handles {
1183 handles.push((TaskName::LdapActor, ldap_handle))
1184 }
1185 }
1186
1187 if let Some(http_handles) = maybe_http_acceptor_handles {
1188 for http_handle in http_handles {
1189 handles.push((TaskName::HttpsServer, http_handle))
1190 }
1191 }
1192
1193 if let Some(repl_handle) = maybe_repl_handle {
1194 handles.push((TaskName::Replication, repl_handle))
1195 }
1196
1197 Ok(CoreHandle {
1198 clean_shutdown: false,
1199 tls_acceptor_reload_notify,
1200 tx: broadcast_tx,
1201 handles,
1202 })
1203}