1#![deny(warnings)]
12#![warn(unused_extern_crates)]
13#![warn(unused_imports)]
14#![deny(clippy::todo)]
15#![deny(clippy::unimplemented)]
16#![deny(clippy::unwrap_used)]
17#![deny(clippy::expect_used)]
18#![deny(clippy::panic)]
19#![deny(clippy::unreachable)]
20#![deny(clippy::await_holding_lock)]
21#![deny(clippy::needless_pass_by_value)]
22#![deny(clippy::trivially_copy_pass_by_ref)]
23
24#[macro_use]
25extern crate tracing;
26#[macro_use]
27extern crate kanidmd_lib;
28
29mod actors;
30pub mod admin;
31pub mod config;
32mod crypto;
33mod https;
34mod interval;
35mod ldaps;
36mod repl;
37mod utils;
38
39use crate::actors::{QueryServerReadV1, QueryServerWriteV1};
40use crate::admin::AdminActor;
41use crate::config::{Configuration, ServerRole};
42use crate::interval::IntervalActor;
43use crate::utils::touch_file_or_quit;
44use compact_jwt::{JwsHs256Signer, JwsSigner};
45use kanidm_proto::backup::BackupCompression;
46use kanidm_proto::internal::OperationError;
47use kanidmd_lib::be::{Backend, BackendConfig, BackendTransaction};
48use kanidmd_lib::idm::ldap::LdapServer;
49use kanidmd_lib::prelude::*;
50use kanidmd_lib::schema::Schema;
51use kanidmd_lib::status::StatusActor;
52use kanidmd_lib::value::CredentialType;
53#[cfg(not(target_family = "windows"))]
54use libc::umask;
55use std::fmt::{Display, Formatter};
56use std::path::Path;
57use std::sync::Arc;
58use tokio::sync::broadcast;
59use tokio::sync::mpsc;
60use tokio::sync::Notify;
61use tokio::task;
62
63fn setup_backend(config: &Configuration, schema: &Schema) -> Result<Backend, OperationError> {
66 setup_backend_vacuum(config, schema, false)
67}
68
69fn setup_backend_vacuum(
70 config: &Configuration,
71 schema: &Schema,
72 vacuum: bool,
73) -> Result<Backend, OperationError> {
74 let schema_txn = schema.write();
77 let idxmeta = schema_txn.reload_idxmeta();
78
79 let pool_size: u32 = config.threads as u32;
80
81 let cfg = BackendConfig::new(
82 config.db_path.as_deref(),
83 pool_size,
84 config.db_fs_type.unwrap_or_default(),
85 config.db_arc_size,
86 );
87
88 Backend::new(cfg, idxmeta, vacuum)
89}
90
91async fn setup_qs_idms(
96 be: Backend,
97 schema: Schema,
98 config: &Configuration,
99) -> Result<(QueryServer, IdmServer, IdmServerDelayed, IdmServerAudit), OperationError> {
100 let curtime = duration_from_epoch_now();
101 let query_server = QueryServer::new(be, schema, config.domain.clone(), curtime)?;
103
104 query_server
113 .initialise_helper(curtime, DOMAIN_TGT_LEVEL)
114 .await?;
115
116 let is_integration_test = config.integration_test_config.is_some();
118 let (idms, idms_delayed, idms_audit) = IdmServer::new(
119 query_server.clone(),
120 &config.origin,
121 is_integration_test,
122 curtime,
123 )
124 .await?;
125
126 Ok((query_server, idms, idms_delayed, idms_audit))
127}
128
129async fn setup_qs(
130 be: Backend,
131 schema: Schema,
132 config: &Configuration,
133) -> Result<QueryServer, OperationError> {
134 let curtime = duration_from_epoch_now();
135 let query_server = QueryServer::new(be, schema, config.domain.clone(), curtime)?;
137
138 query_server
147 .initialise_helper(curtime, DOMAIN_TGT_LEVEL)
148 .await?;
149
150 Ok(query_server)
151}
152
/// Evaluates to a ready-to-use backend for the `dbscan_*` helpers.
///
/// If the in-memory schema cannot be created the process exits with status 1.
/// If the backend cannot be set up the error is logged and the macro `return`s
/// from the *calling* function, so it can only be used inside functions that
/// return `()`.
macro_rules! dbscan_setup_be {
    (
        $config:expr
    ) => {{
        let schema = Schema::new().unwrap_or_else(|e| {
            error!("Failed to setup in memory schema: {:?}", e);
            std::process::exit(1)
        });

        match setup_backend($config, &schema) {
            Err(e) => {
                error!("Failed to setup BE: {:?}", e);
                // Returns from the function that invoked the macro.
                return;
            }
            Ok(be) => be,
        }
    }};
}
174
175pub fn dbscan_list_indexes_core(config: &Configuration) {
176 let be = dbscan_setup_be!(config);
177 let mut be_rotxn = match be.read() {
178 Ok(txn) => txn,
179 Err(err) => {
180 error!(?err, "Unable to proceed, backend read transaction failure.");
181 return;
182 }
183 };
184
185 match be_rotxn.list_indexes() {
186 Ok(mut idx_list) => {
187 idx_list.sort_unstable();
188 idx_list.iter().for_each(|idx_name| {
189 println!("{idx_name}");
190 })
191 }
192 Err(e) => {
193 error!("Failed to retrieve index list: {:?}", e);
194 }
195 };
196}
197
198pub fn dbscan_list_id2entry_core(config: &Configuration) {
199 let be = dbscan_setup_be!(config);
200 let mut be_rotxn = match be.read() {
201 Ok(txn) => txn,
202 Err(err) => {
203 error!(?err, "Unable to proceed, backend read transaction failure.");
204 return;
205 }
206 };
207
208 match be_rotxn.list_id2entry() {
209 Ok(mut id_list) => {
210 id_list.sort_unstable_by_key(|k| k.0);
211 id_list.iter().for_each(|(id, value)| {
212 println!("{id:>8}: {value}");
213 })
214 }
215 Err(e) => {
216 error!("Failed to retrieve id2entry list: {:?}", e);
217 }
218 };
219}
220
/// Sets up the backend via `dbscan_setup_be!` and then returns.
///
/// No analysis is performed and nothing is printed by this function itself;
/// the only observable effects are those of backend setup (including its
/// logging and early exit/return on failure).
pub fn dbscan_list_index_analysis_core(config: &Configuration) {
    // Bound to `_be` because the backend is not used after construction.
    let _be = dbscan_setup_be!(config);
}
225
226pub fn dbscan_list_index_core(config: &Configuration, index_name: &str) {
227 let be = dbscan_setup_be!(config);
228 let mut be_rotxn = match be.read() {
229 Ok(txn) => txn,
230 Err(err) => {
231 error!(?err, "Unable to proceed, backend read transaction failure.");
232 return;
233 }
234 };
235
236 match be_rotxn.list_index_content(index_name) {
237 Ok(mut idx_list) => {
238 idx_list.sort_unstable_by(|a, b| a.0.cmp(&b.0));
239 idx_list.iter().for_each(|(key, value)| {
240 println!("{key:>50}: {value:?}");
241 })
242 }
243 Err(e) => {
244 error!("Failed to retrieve index list: {:?}", e);
245 }
246 };
247}
248
249pub fn dbscan_get_id2entry_core(config: &Configuration, id: u64) {
250 let be = dbscan_setup_be!(config);
251 let mut be_rotxn = match be.read() {
252 Ok(txn) => txn,
253 Err(err) => {
254 error!(?err, "Unable to proceed, backend read transaction failure.");
255 return;
256 }
257 };
258
259 match be_rotxn.get_id2entry(id) {
260 Ok((id, value)) => println!("{id:>8}: {value}"),
261 Err(e) => {
262 error!("Failed to retrieve id2entry value: {:?}", e);
263 }
264 };
265}
266
267pub fn dbscan_quarantine_id2entry_core(config: &Configuration, id: u64) {
268 let be = dbscan_setup_be!(config);
269 let mut be_wrtxn = match be.write() {
270 Ok(txn) => txn,
271 Err(err) => {
272 error!(
273 ?err,
274 "Unable to proceed, backend write transaction failure."
275 );
276 return;
277 }
278 };
279
280 match be_wrtxn
281 .quarantine_entry(id)
282 .and_then(|_| be_wrtxn.commit())
283 {
284 Ok(()) => {
285 println!("quarantined - {id:>8}")
286 }
287 Err(e) => {
288 error!("Failed to quarantine id2entry value: {:?}", e);
289 }
290 };
291}
292
293pub fn dbscan_list_quarantined_core(config: &Configuration) {
294 let be = dbscan_setup_be!(config);
295 let mut be_rotxn = match be.read() {
296 Ok(txn) => txn,
297 Err(err) => {
298 error!(?err, "Unable to proceed, backend read transaction failure.");
299 return;
300 }
301 };
302
303 match be_rotxn.list_quarantined() {
304 Ok(mut id_list) => {
305 id_list.sort_unstable_by_key(|k| k.0);
306 id_list.iter().for_each(|(id, value)| {
307 println!("{id:>8}: {value}");
308 })
309 }
310 Err(e) => {
311 error!("Failed to retrieve id2entry list: {:?}", e);
312 }
313 };
314}
315
316pub fn dbscan_restore_quarantined_core(config: &Configuration, id: u64) {
317 let be = dbscan_setup_be!(config);
318 let mut be_wrtxn = match be.write() {
319 Ok(txn) => txn,
320 Err(err) => {
321 error!(
322 ?err,
323 "Unable to proceed, backend write transaction failure."
324 );
325 return;
326 }
327 };
328
329 match be_wrtxn
330 .restore_quarantined(id)
331 .and_then(|_| be_wrtxn.commit())
332 {
333 Ok(()) => {
334 println!("restored - {id:>8}")
335 }
336 Err(e) => {
337 error!("Failed to restore quarantined id2entry value: {:?}", e);
338 }
339 };
340}
341
342pub fn backup_server_core(config: &Configuration, dst_path: &Path) {
343 let schema = match Schema::new() {
344 Ok(s) => s,
345 Err(e) => {
346 error!("Failed to setup in memory schema: {:?}", e);
347 std::process::exit(1);
348 }
349 };
350
351 let be = match setup_backend(config, &schema) {
352 Ok(be) => be,
353 Err(e) => {
354 error!("Failed to setup BE: {:?}", e);
355 return;
356 }
357 };
358
359 let mut be_ro_txn = match be.read() {
360 Ok(txn) => txn,
361 Err(err) => {
362 error!(?err, "Unable to proceed, backend read transaction failure.");
363 return;
364 }
365 };
366
367 let compression = match config.online_backup.as_ref() {
368 Some(backup_config) => backup_config.compression,
369 None => BackupCompression::default(),
370 };
371
372 match be_ro_txn.backup(dst_path, compression) {
373 Ok(_) => info!("Backup success!"),
374 Err(e) => {
375 error!("Backup failed: {:?}", e);
376 std::process::exit(1);
377 }
378 };
379 }
381
382pub async fn restore_server_core(config: &Configuration, dst_path: &Path) {
383 if let Some(db_path) = config.db_path.as_ref() {
385 touch_file_or_quit(db_path);
386 }
387
388 let schema = match Schema::new() {
390 Ok(s) => s,
391 Err(e) => {
392 error!("Failed to setup in memory schema: {:?}", e);
393 std::process::exit(1);
394 }
395 };
396
397 let be = match setup_backend(config, &schema) {
398 Ok(be) => be,
399 Err(e) => {
400 error!("Failed to setup backend: {:?}", e);
401 return;
402 }
403 };
404
405 let mut be_wr_txn = match be.write() {
406 Ok(txn) => txn,
407 Err(err) => {
408 error!(
409 ?err,
410 "Unable to proceed, backend write transaction failure."
411 );
412 return;
413 }
414 };
415 let r = be_wr_txn.restore(dst_path).and_then(|_| be_wr_txn.commit());
416
417 if r.is_err() {
418 error!("Failed to restore database: {:?}", r);
419 std::process::exit(1);
420 }
421 info!("Database loaded successfully");
422
423 reindex_inner(be, schema, config).await;
424
425 info!("✅ Restore Success!");
426}
427
428pub async fn reindex_server_core(config: &Configuration) {
429 info!("Start Index Phase 1 ...");
430 let schema = match Schema::new() {
432 Ok(s) => s,
433 Err(e) => {
434 error!("Failed to setup in memory schema: {:?}", e);
435 std::process::exit(1);
436 }
437 };
438
439 let be = match setup_backend(config, &schema) {
440 Ok(be) => be,
441 Err(e) => {
442 error!("Failed to setup BE: {:?}", e);
443 return;
444 }
445 };
446
447 reindex_inner(be, schema, config).await;
448
449 info!("✅ Reindex Success!");
450}
451
452async fn reindex_inner(be: Backend, schema: Schema, config: &Configuration) {
453 let mut be_wr_txn = match be.write() {
455 Ok(txn) => txn,
456 Err(err) => {
457 error!(
458 ?err,
459 "Unable to proceed, backend write transaction failure."
460 );
461 return;
462 }
463 };
464
465 let r = be_wr_txn.reindex(true).and_then(|_| be_wr_txn.commit());
466
467 if r.is_err() {
469 error!("Failed to reindex database: {:?}", r);
470 std::process::exit(1);
471 }
472 info!("Index Phase 1 Success!");
473
474 info!("Attempting to init query server ...");
475
476 let (qs, _idms, _idms_delayed, _idms_audit) = match setup_qs_idms(be, schema, config).await {
477 Ok(t) => t,
478 Err(e) => {
479 error!("Unable to setup query server or idm server -> {:?}", e);
480 return;
481 }
482 };
483 info!("Init Query Server Success!");
484
485 info!("Start Index Phase 2 ...");
486
487 let Ok(mut qs_write) = qs.write(duration_from_epoch_now()).await else {
488 error!("Unable to acquire write transaction");
489 return;
490 };
491 let r = qs_write.reindex(true).and_then(|_| qs_write.commit());
492
493 match r {
494 Ok(_) => info!("Index Phase 2 Success!"),
495 Err(e) => {
496 error!("Reindex failed: {:?}", e);
497 std::process::exit(1);
498 }
499 };
500}
501
502pub fn vacuum_server_core(config: &Configuration) {
503 let schema = match Schema::new() {
504 Ok(s) => s,
505 Err(e) => {
506 eprintln!("Failed to setup in memory schema: {e:?}");
507 std::process::exit(1);
508 }
509 };
510
511 let r = setup_backend_vacuum(config, &schema, true);
514
515 match r {
516 Ok(_) => eprintln!("Vacuum Success!"),
517 Err(e) => {
518 eprintln!("Vacuum failed: {e:?}");
519 std::process::exit(1);
520 }
521 };
522}
523
524pub async fn domain_rename_core(config: &Configuration) {
525 let schema = match Schema::new() {
526 Ok(s) => s,
527 Err(e) => {
528 eprintln!("Failed to setup in memory schema: {e:?}");
529 std::process::exit(1);
530 }
531 };
532
533 let be = match setup_backend(config, &schema) {
535 Ok(be) => be,
536 Err(e) => {
537 error!("Failed to setup BE: {:?}", e);
538 return;
539 }
540 };
541
542 let qs = match setup_qs(be, schema, config).await {
544 Ok(t) => t,
545 Err(e) => {
546 error!("Unable to setup query server -> {:?}", e);
547 return;
548 }
549 };
550
551 let new_domain_name = config.domain.as_str();
552
553 match qs.read().await.map(|qs| qs.get_domain_name().to_string()) {
555 Ok(old_domain_name) => {
556 admin_info!(?old_domain_name, ?new_domain_name);
557 if old_domain_name == new_domain_name {
558 admin_info!("Domain name not changing, stopping.");
559 return;
560 }
561 admin_debug!(
562 "Domain name is changing from {:?} to {:?}",
563 old_domain_name,
564 new_domain_name
565 );
566 }
567 Err(e) => {
568 admin_error!("Failed to query domain name, quitting! -> {:?}", e);
569 return;
570 }
571 }
572
573 let Ok(mut qs_write) = qs.write(duration_from_epoch_now()).await else {
574 error!("Unable to acquire write transaction");
575 return;
576 };
577 let r = qs_write
578 .danger_domain_rename(new_domain_name)
579 .and_then(|_| qs_write.commit());
580
581 match r {
582 Ok(_) => info!("Domain Rename Success!"),
583 Err(e) => {
584 error!("Domain Rename Failed - Rollback has occurred: {:?}", e);
585 std::process::exit(1);
586 }
587 };
588}
589
590pub async fn verify_server_core(config: &Configuration) {
591 let curtime = duration_from_epoch_now();
592 let schema_mem = match Schema::new() {
594 Ok(sc) => sc,
595 Err(e) => {
596 error!("Failed to setup in memory schema: {:?}", e);
597 return;
598 }
599 };
600 let be = match setup_backend(config, &schema_mem) {
602 Ok(be) => be,
603 Err(e) => {
604 error!("Failed to setup BE: {:?}", e);
605 return;
606 }
607 };
608
609 let server = match QueryServer::new(be, schema_mem, config.domain.clone(), curtime) {
610 Ok(qs) => qs,
611 Err(err) => {
612 error!(?err, "Failed to setup query server");
613 return;
614 }
615 };
616
617 let r = server.verify().await;
619
620 if r.is_empty() {
621 eprintln!("Verification passed!");
622 std::process::exit(0);
623 } else {
624 for er in r {
625 error!("{:?}", er);
626 }
627 std::process::exit(1);
628 }
629
630 }
632
633pub fn cert_generate_core(config: &Configuration) {
634 let (tls_key_path, tls_chain_path) = match &config.tls_config {
637 Some(tls_config) => (tls_config.key.as_path(), tls_config.chain.as_path()),
638 None => {
639 error!("Unable to find TLS configuration");
640 std::process::exit(1);
641 }
642 };
643
644 if tls_key_path.exists() && tls_chain_path.exists() {
645 info!(
646 "TLS key and chain already exist - remove them first if you intend to regenerate these"
647 );
648 return;
649 }
650
651 let origin_domain = match config.origin.domain() {
652 Some(val) => val,
653 None => {
654 error!("origin does not contain a valid domain");
655 std::process::exit(1);
656 }
657 };
658
659 let cert_root = match tls_key_path.parent() {
660 Some(parent) => parent,
661 None => {
662 error!("Unable to find parent directory of {:?}", tls_key_path);
663 std::process::exit(1);
664 }
665 };
666
667 let ca_cert = cert_root.join("ca.pem");
668 let ca_key = cert_root.join("cakey.pem");
669 let tls_cert_path = cert_root.join("cert.pem");
670
671 let ca_handle = if !ca_cert.exists() || !ca_key.exists() {
672 let ca_handle = match crypto::build_ca(None) {
674 Ok(ca_handle) => ca_handle,
675 Err(e) => {
676 error!(err = ?e, "Failed to build CA");
677 std::process::exit(1);
678 }
679 };
680
681 if crypto::write_ca(ca_key, ca_cert, &ca_handle).is_err() {
682 error!("Failed to write CA");
683 std::process::exit(1);
684 }
685
686 ca_handle
687 } else {
688 match crypto::load_ca(ca_key, ca_cert) {
689 Ok(ca_handle) => ca_handle,
690 Err(_) => {
691 error!("Failed to load CA");
692 std::process::exit(1);
693 }
694 }
695 };
696
697 if !tls_key_path.exists() || !tls_chain_path.exists() || !tls_cert_path.exists() {
698 let cert_handle = match crypto::build_cert(origin_domain, &ca_handle, None, None) {
700 Ok(cert_handle) => cert_handle,
701 Err(e) => {
702 error!(err = ?e, "Failed to build certificate");
703 std::process::exit(1);
704 }
705 };
706
707 if crypto::write_cert(tls_key_path, tls_chain_path, tls_cert_path, &cert_handle).is_err() {
708 error!("Failed to write certificates");
709 std::process::exit(1);
710 }
711 }
712 info!("certificate generation complete");
713}
714
/// Control messages sent over the broadcast channel held by [`CoreHandle`] to
/// the tasks spawned by [`create_server_core`].
#[derive(Clone, Debug)]
pub enum CoreAction {
    /// Ask the receiving task to stop.
    Shutdown,
}
719
/// Identifies one of the background tasks tracked by the server core; used to
/// label tasks in log and shutdown messages.
pub(crate) enum TaskName {
    AdminSocket,
    AuditdActor,
    BackupActor,
    DelayedActionActor,
    HttpsServer,
    IntervalActor,
    LdapActor,
    Replication,
    TlsAcceptorReload,
}

impl Display for TaskName {
    /// Writes the fixed human-readable label of the task.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::AdminSocket => "Admin Socket",
            Self::AuditdActor => "Auditd Actor",
            Self::BackupActor => "Backup Actor",
            Self::DelayedActionActor => "Delayed Action Actor",
            Self::HttpsServer => "HTTPS Server",
            Self::IntervalActor => "Interval Actor",
            Self::LdapActor => "LDAP Acceptor Actor",
            Self::Replication => "Replication",
            Self::TlsAcceptorReload => "TlsAcceptor Reload Monitor",
        };
        // Written verbatim: width/alignment flags on the caller's format spec
        // are not applied to the label.
        f.write_str(label)
    }
}
751
/// Handle to a running server core, returned by [`create_server_core`].
///
/// Dropping the handle without a completed `shutdown` prints an unclean
/// shutdown warning to stderr.
pub struct CoreHandle {
    /// `false` on creation; set to `true` by `shutdown` after every task handle
    /// has been awaited. Checked in `Drop`.
    clean_shutdown: bool,
    /// Sender side of the [`CoreAction`] broadcast channel.
    tx: broadcast::Sender<CoreAction>,
    /// Notified by `tls_acceptor_reload` to wake the TLS acceptor reload task.
    tls_acceptor_reload_notify: Arc<Notify>,
    /// Join handles of the spawned tasks, each paired with its display name.
    handles: Vec<(TaskName, task::JoinHandle<()>)>,
}
759
760impl CoreHandle {
761 pub fn subscribe(&mut self) -> broadcast::Receiver<CoreAction> {
762 self.tx.subscribe()
763 }
764
765 pub async fn shutdown(&mut self) {
766 if self.tx.send(CoreAction::Shutdown).is_err() {
767 eprintln!("No receivers acked shutdown request. Treating as unclean.");
768 return;
769 }
770
771 while let Some((handle_name, handle)) = self.handles.pop() {
773 if let Err(error) = handle.await {
774 eprintln!("Task {handle_name} failed to finish: {error:?}");
775 }
776 }
777
778 self.clean_shutdown = true;
779 }
780
781 pub async fn tls_acceptor_reload(&mut self) {
782 self.tls_acceptor_reload_notify.notify_one()
783 }
784}
785
786impl Drop for CoreHandle {
787 fn drop(&mut self) {
788 if !self.clean_shutdown {
789 eprintln!("⚠️ UNCLEAN SHUTDOWN OCCURRED ⚠️ ");
790 }
791 }
794}
795
/// Start the server core and return a [`CoreHandle`] for controlling it.
///
/// In order, this: validates that TLS is configured (unless an integration
/// test configuration is present), sets the process umask on non-Windows
/// targets, starts the status actor, builds the TLS acceptor, schema, backend,
/// query server and IDM server, applies integration-test account setup when
/// requested, and then spawns the background tasks (delayed actions, audit
/// events, TLS acceptor reload, interval actor, and optionally online backup,
/// LDAP, replication, HTTPS and the admin socket).
///
/// When `config_test` is `true` the replication and HTTPS servers are not
/// started.
///
/// # Errors
/// Returns `Err(())` when any setup step fails. Failures handled directly in
/// this function are logged with `error!` before returning; failures
/// propagated with `?` from the backup, LDAP, replication and admin socket
/// helpers are not logged here.
pub async fn create_server_core(
    config: Configuration,
    config_test: bool,
) -> Result<CoreHandle, ()> {
    // Shutdown broadcast channel; each task below gets its own receiver.
    let (broadcast_tx, mut broadcast_rx) = broadcast::channel(4);

    // TLS is mandatory unless this is an integration test instance.
    if config.integration_test_config.is_some() {
        warn!("RUNNING IN INTEGRATION TEST MODE.");
        warn!("IF YOU SEE THIS IN PRODUCTION YOU MUST CONTACT SUPPORT IMMEDIATELY.");
    } else if config.tls_config.is_none() {
        error!("Running without TLS is not supported! Quitting!");
        return Err(());
    }

    info!(
        "Starting kanidm with {}configuration: {}",
        if config_test { "TEST " } else { "" },
        config
    );
    // Mask group-write and all "other" permission bits for files created from
    // here on. `umask` is a libc FFI call, hence the `unsafe` block; it takes
    // only an integer mode.
    #[cfg(not(target_family = "windows"))]
    unsafe {
        umask(0o0027)
    };

    let status_ref = StatusActor::start();

    // May be `None`; see the `Ok(None)` handling in the reload task below.
    let maybe_tls_acceptor = match crypto::setup_tls(&config.tls_config) {
        Ok(tls_acc) => tls_acc,
        Err(err) => {
            error!(?err, "Failed to configure TLS acceptor");
            return Err(());
        }
    };

    let schema = match Schema::new() {
        Ok(s) => s,
        Err(e) => {
            error!("Failed to setup in memory schema: {:?}", e);
            return Err(());
        }
    };

    let be = match setup_backend(&config, &schema) {
        Ok(be) => be,
        Err(e) => {
            error!("Failed to setup BE -> {:?}", e);
            return Err(());
        }
    };
    // The query server itself is not used directly after this point.
    let (_qs, idms, mut idms_delayed, mut idms_audit) =
        match setup_qs_idms(be, schema, &config).await {
            Ok(t) => t,
            Err(e) => {
                error!("Unable to setup query server or idm server -> {:?}", e);
                return Err(());
            }
        };

    // A new HS256 signer is generated on every start; it is handed to the
    // HTTPS server below.
    let jws_signer = match JwsHs256Signer::generate_hs256() {
        Ok(k) => k.set_sign_option_embed_kid(false),
        Err(e) => {
            error!("Unable to setup jws signer -> {:?}", e);
            return Err(());
        }
    };

    // Integration test only: set known credentials on the two admin accounts,
    // add admin to idm_admins and relax the minimum credential type, all in
    // one write transaction.
    if let Some(itc) = &config.integration_test_config {
        let Ok(mut idms_prox_write) = idms.proxy_write(duration_from_epoch_now()).await else {
            error!("Unable to acquire write transaction");
            return Err(());
        };
        match idms_prox_write.recover_account(&itc.admin_user, Some(&itc.admin_password)) {
            Ok(_) => {}
            Err(e) => {
                error!(
                    "Unable to configure INTEGRATION TEST {} account -> {:?}",
                    &itc.admin_user, e
                );
                return Err(());
            }
        };
        match idms_prox_write.recover_account(&itc.idm_admin_user, Some(&itc.idm_admin_password)) {
            Ok(_) => {}
            Err(e) => {
                error!(
                    "Unable to configure INTEGRATION TEST {} account -> {:?}",
                    &itc.idm_admin_user, e
                );
                return Err(());
            }
        };

        match idms_prox_write.qs_write.internal_modify_uuid(
            UUID_IDM_ADMINS,
            &ModifyList::new_append(Attribute::Member, Value::Refer(UUID_ADMIN)),
        ) {
            Ok(_) => {}
            Err(e) => {
                error!(
                    "Unable to configure INTEGRATION TEST admin as member of idm_admins -> {:?}",
                    e
                );
                return Err(());
            }
        };

        match idms_prox_write.qs_write.internal_modify_uuid(
            UUID_IDM_ALL_PERSONS,
            &ModifyList::new_purge_and_set(
                Attribute::CredentialTypeMinimum,
                CredentialType::Any.into(),
            ),
        ) {
            Ok(_) => {}
            Err(e) => {
                error!(
                    "Unable to configure INTEGRATION TEST default credential policy -> {:?}",
                    e
                );
                return Err(());
            }
        };

        match idms_prox_write.commit() {
            Ok(_) => {}
            Err(e) => {
                error!("Unable to commit INTEGRATION TEST setup -> {:?}", e);
                return Err(());
            }
        }
    }

    let ldap = match LdapServer::new(&idms).await {
        Ok(l) => l,
        Err(e) => {
            error!("Unable to start LdapServer -> {:?}", e);
            return Err(());
        }
    };

    // Shared between the read/write actors and (optionally) replication.
    let idms_arc = Arc::new(idms);
    let ldap_arc = Arc::new(ldap);

    let server_read_ref = QueryServerReadV1::start_static(idms_arc.clone(), ldap_arc.clone());

    let server_write_ref = QueryServerWriteV1::start_static(idms_arc.clone());

    // Delayed action task: forwards batches of delayed actions to the write
    // actor until the delayed channel yields nothing or shutdown is broadcast.
    let delayed_handle = task::spawn(async move {
        let mut buffer = Vec::with_capacity(DELAYED_ACTION_BATCH_SIZE);
        loop {
            tokio::select! {
                added = idms_delayed.recv_many(&mut buffer) => {
                    // Nothing was added to the buffer: stop the task.
                    if added == 0 {
                        break
                    }
                    server_write_ref.handle_delayedaction(&mut buffer).await;
                }
                Ok(action) = broadcast_rx.recv() => {
                    match action {
                        CoreAction::Shutdown => break,
                    }
                }
            }
        }
        info!("Stopped {}", TaskName::DelayedActionActor);
    });

    // Fresh receiver for the next task; the previous one moved into the task above.
    let mut broadcast_rx = broadcast_tx.subscribe();

    // Audit task: logs each received audit event as JSON at warn level, falling
    // back to its Debug form when serialisation fails.
    let auditd_handle = task::spawn(async move {
        loop {
            tokio::select! {
                Ok(action) = broadcast_rx.recv() => {
                    match action {
                        CoreAction::Shutdown => break,
                    }
                }
                audit_event = idms_audit.audit_rx().recv() => {
                    match serde_json::to_string(&audit_event) {
                        Ok(audit_event) => {
                            warn!(%audit_event);
                        }
                        Err(e) => {
                            error!(err=?e, "Unable to process audit event to json.");
                            warn!(?audit_event, json=false);
                        }
                    }

                }
            }
        }
        info!("Stopped {}", TaskName::AuditdActor);
    });

    let mut broadcast_rx = broadcast_tx.subscribe();
    // One clone of the notifier is kept for the returned CoreHandle, the other
    // moves into the reload task.
    let tls_acceptor_reload_notify = Arc::new(Notify::new());
    let tls_accepter_reload_task_notify = tls_acceptor_reload_notify.clone();
    let tls_config = config.tls_config.clone();

    let ldap_configured = config.ldapbindaddress.is_some();
    // Channels that carry a rebuilt TLS acceptor to the LDAP and HTTPS servers.
    let (ldap_tls_acceptor_reload_tx, ldap_tls_acceptor_reload_rx) = mpsc::channel(1);
    let (http_tls_acceptor_reload_tx, http_tls_acceptor_reload_rx) = mpsc::channel(1);

    // TLS reload task: on each notification, rebuild the acceptor from the
    // saved TLS config and send it to the LDAP (if configured) and HTTPS servers.
    let tls_acceptor_reload_handle = task::spawn(async move {
        loop {
            tokio::select! {
                Ok(action) = broadcast_rx.recv() => {
                    match action {
                        CoreAction::Shutdown => break,
                    }
                }
                _ = tls_accepter_reload_task_notify.notified() => {
                    let tls_acceptor = match crypto::setup_tls(&tls_config) {
                        Ok(Some(tls_acc)) => tls_acc,
                        Ok(None) => {
                            warn!("TLS not configured, ignoring reload request.");
                            continue;
                        }
                        Err(err) => {
                            error!(?err, "Failed to configure and reload TLS acceptor");
                            continue;
                        }
                    };

                    // A failed LDAP send is only logged; the task keeps running.
                    if ldap_configured &&
                        ldap_tls_acceptor_reload_tx.send(tls_acceptor.clone()).await.is_err() {
                            error!("ldap tls acceptor did not accept the reload, the server may have failed!");
                    };
                    // A failed HTTPS send stops this task.
                    if http_tls_acceptor_reload_tx.send(tls_acceptor.clone()).await.is_err() {
                        error!("http tls acceptor did not accept the reload, the server may have failed!");
                        break;
                    };
                }
            }
        }
        info!("Stopped {}", TaskName::TlsAcceptorReload);
    });

    let interval_handle = IntervalActor::start(server_write_ref, broadcast_tx.subscribe());
    // Online backup runs only when a backup section exists and is enabled.
    let maybe_backup_handle = match &config.online_backup {
        Some(online_backup_config) => {
            if online_backup_config.enabled {
                let handle = IntervalActor::start_online_backup(
                    server_read_ref,
                    online_backup_config,
                    broadcast_tx.subscribe(),
                )?;
                Some(handle)
            } else {
                debug!("Backups disabled");
                None
            }
        }
        None => {
            debug!("Online backup not requested, skipping");
            None
        }
    };

    // LDAP server is started only when a bind address is configured.
    let maybe_ldap_acceptor_handle = match &config.ldapbindaddress {
        Some(la) => {
            let opt_ldap_ssl_acceptor = maybe_tls_acceptor.clone();

            let h = ldaps::create_ldap_server(
                la.as_str(),
                opt_ldap_ssl_acceptor,
                server_read_ref,
                broadcast_tx.subscribe(),
                ldap_tls_acceptor_reload_rx,
                config.ldap_client_address_info.trusted_proxy_v2(),
            )
            .await?;
            Some(h)
        }
        None => {
            debug!("LDAP not requested, skipping");
            None
        }
    };

    // Replication is started only when configured and not in config-test mode.
    let (maybe_repl_handle, maybe_repl_ctrl_tx) = match &config.repl_config {
        Some(rc) => {
            if !config_test {
                let (h, repl_ctrl_tx) =
                    repl::create_repl_server(idms_arc.clone(), rc, broadcast_tx.subscribe())
                        .await?;
                (Some(h), Some(repl_ctrl_tx))
            } else {
                (None, None)
            }
        }
        None => {
            debug!("Replication not requested, skipping");
            (None, None)
        }
    };

    // In config-test mode the HTTPS server is not started.
    let maybe_http_acceptor_handle = if config_test {
        admin_info!("This config rocks! 🪨 ");
        None
    } else {
        let h: task::JoinHandle<()> = match https::create_https_server(
            config.clone(),
            jws_signer,
            status_ref,
            server_write_ref,
            server_read_ref,
            broadcast_tx.clone(),
            maybe_tls_acceptor,
            http_tls_acceptor_reload_rx,
        )
        .await
        {
            Ok(h) => h,
            Err(e) => {
                error!("Failed to start HTTPS server -> {:?}", e);
                return Err(());
            }
        };
        if config.role != ServerRole::WriteReplicaNoUI {
            admin_info!("ready to rock! 🪨 UI available at: {}", config.origin);
        } else {
            admin_info!("ready to rock! 🪨 ");
        }
        Some(h)
    };

    // The admin socket is not created for integration test instances.
    let maybe_admin_sock_handle = if config.integration_test_config.is_none() {
        let broadcast_rx = broadcast_tx.subscribe();

        let admin_handle = AdminActor::create_admin_sock(
            config.adminbindpath.as_str(),
            server_write_ref,
            server_read_ref,
            broadcast_rx,
            maybe_repl_ctrl_tx,
        )
        .await?;

        Some(admin_handle)
    } else {
        None
    };

    // Collect every task handle so CoreHandle::shutdown can await them;
    // shutdown pops from the end of this list.
    let mut handles: Vec<(TaskName, task::JoinHandle<()>)> = vec![
        (TaskName::IntervalActor, interval_handle),
        (TaskName::DelayedActionActor, delayed_handle),
        (TaskName::AuditdActor, auditd_handle),
        (TaskName::TlsAcceptorReload, tls_acceptor_reload_handle),
    ];

    if let Some(backup_handle) = maybe_backup_handle {
        handles.push((TaskName::BackupActor, backup_handle))
    }

    if let Some(admin_sock_handle) = maybe_admin_sock_handle {
        handles.push((TaskName::AdminSocket, admin_sock_handle))
    }

    if let Some(ldap_handle) = maybe_ldap_acceptor_handle {
        handles.push((TaskName::LdapActor, ldap_handle))
    }

    if let Some(http_handle) = maybe_http_acceptor_handle {
        handles.push((TaskName::HttpsServer, http_handle))
    }

    if let Some(repl_handle) = maybe_repl_handle {
        handles.push((TaskName::Replication, repl_handle))
    }

    Ok(CoreHandle {
        clean_shutdown: false,
        tls_acceptor_reload_notify,
        tx: broadcast_tx,
        handles,
    })
}