1#![deny(warnings)]
12#![warn(unused_extern_crates)]
13#![warn(unused_imports)]
14#![deny(clippy::todo)]
15#![deny(clippy::unimplemented)]
16#![deny(clippy::unwrap_used)]
17#![deny(clippy::expect_used)]
18#![deny(clippy::panic)]
19#![deny(clippy::unreachable)]
20#![deny(clippy::await_holding_lock)]
21#![deny(clippy::needless_pass_by_value)]
22#![deny(clippy::trivially_copy_pass_by_ref)]
23
24#[macro_use]
25extern crate tracing;
26#[macro_use]
27extern crate kanidmd_lib;
28
29mod actors;
30pub mod admin;
31pub mod config;
32mod crypto;
33mod https;
34mod interval;
35mod ldaps;
36mod repl;
37mod tcp;
38mod utils;
39
40use crate::actors::{QueryServerReadV1, QueryServerWriteV1};
41use crate::admin::AdminActor;
42use crate::config::Configuration;
43use crate::interval::IntervalActor;
44use crate::utils::touch_file_or_quit;
45use compact_jwt::{JwsHs256Signer, JwsSigner};
46use kanidm_proto::backup::BackupCompression;
47use kanidm_proto::config::ServerRole;
48use kanidm_proto::internal::OperationError;
49use kanidmd_lib::be::{Backend, BackendConfig, BackendTransaction};
50use kanidmd_lib::idm::ldap::LdapServer;
51use kanidmd_lib::prelude::*;
52use kanidmd_lib::schema::Schema;
53use kanidmd_lib::status::StatusActor;
54use kanidmd_lib::value::CredentialType;
55#[cfg(not(target_family = "windows"))]
56use libc::umask;
57use std::fmt::{Display, Formatter};
58use std::path::Path;
59use std::sync::Arc;
60use tokio::sync::broadcast;
61use tokio::sync::Notify;
62use tokio::task;
63
64fn setup_backend(config: &Configuration, schema: &Schema) -> Result<Backend, OperationError> {
67 setup_backend_vacuum(config, schema, false)
68}
69
70fn setup_backend_vacuum(
71 config: &Configuration,
72 schema: &Schema,
73 vacuum: bool,
74) -> Result<Backend, OperationError> {
75 let schema_txn = schema.write();
78 let idxmeta = schema_txn.reload_idxmeta();
79
80 let pool_size: u32 = config.threads as u32;
81
82 let cfg = BackendConfig::new(
83 config.db_path.as_deref(),
84 pool_size,
85 config.db_fs_type.unwrap_or_default(),
86 config.db_arc_size,
87 );
88
89 Backend::new(cfg, idxmeta, vacuum)
90}
91
92async fn setup_qs_idms(
97 be: Backend,
98 schema: Schema,
99 config: &Configuration,
100) -> Result<(QueryServer, IdmServer, IdmServerDelayed, IdmServerAudit), OperationError> {
101 let curtime = duration_from_epoch_now();
102 let query_server = QueryServer::new(be, schema, config.domain.clone(), curtime)?;
104
105 query_server
114 .initialise_helper(curtime, DOMAIN_TGT_LEVEL)
115 .await?;
116
117 let is_integration_test = config.integration_test_config.is_some();
119 let (idms, idms_delayed, idms_audit) = IdmServer::new(
120 query_server.clone(),
121 &config.origin,
122 is_integration_test,
123 curtime,
124 )
125 .await?;
126
127 Ok((query_server, idms, idms_delayed, idms_audit))
128}
129
130async fn setup_qs(
131 be: Backend,
132 schema: Schema,
133 config: &Configuration,
134) -> Result<QueryServer, OperationError> {
135 let curtime = duration_from_epoch_now();
136 let query_server = QueryServer::new(be, schema, config.domain.clone(), curtime)?;
138
139 query_server
148 .initialise_helper(curtime, DOMAIN_TGT_LEVEL)
149 .await?;
150
151 Ok(query_server)
152}
153
154macro_rules! dbscan_setup_be {
155 (
156 $config:expr
157 ) => {{
158 let schema = match Schema::new() {
159 Ok(s) => s,
160 Err(e) => {
161 error!("Failed to setup in memory schema: {:?}", e);
162 std::process::exit(1);
163 }
164 };
165
166 match setup_backend($config, &schema) {
167 Ok(be) => be,
168 Err(e) => {
169 error!("Failed to setup BE: {:?}", e);
170 return;
171 }
172 }
173 }};
174}
175
176pub fn dbscan_list_indexes_core(config: &Configuration) {
177 let be = dbscan_setup_be!(config);
178 let mut be_rotxn = match be.read() {
179 Ok(txn) => txn,
180 Err(err) => {
181 error!(?err, "Unable to proceed, backend read transaction failure.");
182 return;
183 }
184 };
185
186 match be_rotxn.list_indexes() {
187 Ok(mut idx_list) => {
188 idx_list.sort_unstable();
189 idx_list.iter().for_each(|idx_name| {
190 println!("{idx_name}");
191 })
192 }
193 Err(e) => {
194 error!("Failed to retrieve index list: {:?}", e);
195 }
196 };
197}
198
199pub fn dbscan_list_id2entry_core(config: &Configuration) {
200 let be = dbscan_setup_be!(config);
201 let mut be_rotxn = match be.read() {
202 Ok(txn) => txn,
203 Err(err) => {
204 error!(?err, "Unable to proceed, backend read transaction failure.");
205 return;
206 }
207 };
208
209 match be_rotxn.list_id2entry() {
210 Ok(mut id_list) => {
211 id_list.sort_unstable_by_key(|k| k.0);
212 id_list.iter().for_each(|(id, value)| {
213 println!("{id:>8}: {value}");
214 })
215 }
216 Err(e) => {
217 error!("Failed to retrieve id2entry list: {:?}", e);
218 }
219 };
220}
221
222pub fn dbscan_list_index_analysis_core(config: &Configuration) {
223 let _be = dbscan_setup_be!(config);
224 }
226
227pub fn dbscan_list_index_core(config: &Configuration, index_name: &str) {
228 let be = dbscan_setup_be!(config);
229 let mut be_rotxn = match be.read() {
230 Ok(txn) => txn,
231 Err(err) => {
232 error!(?err, "Unable to proceed, backend read transaction failure.");
233 return;
234 }
235 };
236
237 match be_rotxn.list_index_content(index_name) {
238 Ok(mut idx_list) => {
239 idx_list.sort_unstable_by(|a, b| a.0.cmp(&b.0));
240 idx_list.iter().for_each(|(key, value)| {
241 println!("{key:>50}: {value:?}");
242 })
243 }
244 Err(e) => {
245 error!("Failed to retrieve index list: {:?}", e);
246 }
247 };
248}
249
250pub fn dbscan_get_id2entry_core(config: &Configuration, id: u64) {
251 let be = dbscan_setup_be!(config);
252 let mut be_rotxn = match be.read() {
253 Ok(txn) => txn,
254 Err(err) => {
255 error!(?err, "Unable to proceed, backend read transaction failure.");
256 return;
257 }
258 };
259
260 match be_rotxn.get_id2entry(id) {
261 Ok((id, value)) => println!("{id:>8}: {value}"),
262 Err(e) => {
263 error!("Failed to retrieve id2entry value: {:?}", e);
264 }
265 };
266}
267
268pub fn dbscan_quarantine_id2entry_core(config: &Configuration, id: u64) {
269 let be = dbscan_setup_be!(config);
270 let mut be_wrtxn = match be.write() {
271 Ok(txn) => txn,
272 Err(err) => {
273 error!(
274 ?err,
275 "Unable to proceed, backend write transaction failure."
276 );
277 return;
278 }
279 };
280
281 match be_wrtxn
282 .quarantine_entry(id)
283 .and_then(|_| be_wrtxn.commit())
284 {
285 Ok(()) => {
286 println!("quarantined - {id:>8}")
287 }
288 Err(e) => {
289 error!("Failed to quarantine id2entry value: {:?}", e);
290 }
291 };
292}
293
294pub fn dbscan_list_quarantined_core(config: &Configuration) {
295 let be = dbscan_setup_be!(config);
296 let mut be_rotxn = match be.read() {
297 Ok(txn) => txn,
298 Err(err) => {
299 error!(?err, "Unable to proceed, backend read transaction failure.");
300 return;
301 }
302 };
303
304 match be_rotxn.list_quarantined() {
305 Ok(mut id_list) => {
306 id_list.sort_unstable_by_key(|k| k.0);
307 id_list.iter().for_each(|(id, value)| {
308 println!("{id:>8}: {value}");
309 })
310 }
311 Err(e) => {
312 error!("Failed to retrieve id2entry list: {:?}", e);
313 }
314 };
315}
316
317pub fn dbscan_restore_quarantined_core(config: &Configuration, id: u64) {
318 let be = dbscan_setup_be!(config);
319 let mut be_wrtxn = match be.write() {
320 Ok(txn) => txn,
321 Err(err) => {
322 error!(
323 ?err,
324 "Unable to proceed, backend write transaction failure."
325 );
326 return;
327 }
328 };
329
330 match be_wrtxn
331 .restore_quarantined(id)
332 .and_then(|_| be_wrtxn.commit())
333 {
334 Ok(()) => {
335 println!("restored - {id:>8}")
336 }
337 Err(e) => {
338 error!("Failed to restore quarantined id2entry value: {:?}", e);
339 }
340 };
341}
342
343pub fn backup_server_core(config: &Configuration, dst_path: &Path) {
344 let schema = match Schema::new() {
345 Ok(s) => s,
346 Err(e) => {
347 error!("Failed to setup in memory schema: {:?}", e);
348 std::process::exit(1);
349 }
350 };
351
352 let be = match setup_backend(config, &schema) {
353 Ok(be) => be,
354 Err(e) => {
355 error!("Failed to setup BE: {:?}", e);
356 return;
357 }
358 };
359
360 let mut be_ro_txn = match be.read() {
361 Ok(txn) => txn,
362 Err(err) => {
363 error!(?err, "Unable to proceed, backend read transaction failure.");
364 return;
365 }
366 };
367
368 let compression = match config.online_backup.as_ref() {
369 Some(backup_config) => backup_config.compression,
370 None => BackupCompression::default(),
371 };
372
373 match be_ro_txn.backup(dst_path, compression) {
374 Ok(_) => info!("Backup success!"),
375 Err(e) => {
376 error!("Backup failed: {:?}", e);
377 std::process::exit(1);
378 }
379 };
380 }
382
383pub async fn restore_server_core(config: &Configuration, dst_path: &Path) {
384 if let Some(db_path) = config.db_path.as_ref() {
386 touch_file_or_quit(db_path);
387 }
388
389 let schema = match Schema::new() {
391 Ok(s) => s,
392 Err(e) => {
393 error!("Failed to setup in memory schema: {:?}", e);
394 std::process::exit(1);
395 }
396 };
397
398 let be = match setup_backend(config, &schema) {
399 Ok(be) => be,
400 Err(e) => {
401 error!("Failed to setup backend: {:?}", e);
402 return;
403 }
404 };
405
406 let mut be_wr_txn = match be.write() {
407 Ok(txn) => txn,
408 Err(err) => {
409 error!(
410 ?err,
411 "Unable to proceed, backend write transaction failure."
412 );
413 return;
414 }
415 };
416 let r = be_wr_txn.restore(dst_path).and_then(|_| be_wr_txn.commit());
417
418 if r.is_err() {
419 error!("Failed to restore database: {:?}", r);
420 std::process::exit(1);
421 }
422 info!("Database loaded successfully");
423
424 reindex_inner(be, schema, config).await;
425
426 info!("✅ Restore Success!");
427}
428
429pub async fn reindex_server_core(config: &Configuration) {
430 info!("Start Index Phase 1 ...");
431 let schema = match Schema::new() {
433 Ok(s) => s,
434 Err(e) => {
435 error!("Failed to setup in memory schema: {:?}", e);
436 std::process::exit(1);
437 }
438 };
439
440 let be = match setup_backend(config, &schema) {
441 Ok(be) => be,
442 Err(e) => {
443 error!("Failed to setup BE: {:?}", e);
444 return;
445 }
446 };
447
448 reindex_inner(be, schema, config).await;
449
450 info!("✅ Reindex Success!");
451}
452
453async fn reindex_inner(be: Backend, schema: Schema, config: &Configuration) {
454 let mut be_wr_txn = match be.write() {
456 Ok(txn) => txn,
457 Err(err) => {
458 error!(
459 ?err,
460 "Unable to proceed, backend write transaction failure."
461 );
462 return;
463 }
464 };
465
466 let r = be_wr_txn.reindex(true).and_then(|_| be_wr_txn.commit());
467
468 if r.is_err() {
470 error!("Failed to reindex database: {:?}", r);
471 std::process::exit(1);
472 }
473 info!("Index Phase 1 Success!");
474
475 info!("Attempting to init query server ...");
476
477 let (qs, _idms, _idms_delayed, _idms_audit) = match setup_qs_idms(be, schema, config).await {
478 Ok(t) => t,
479 Err(e) => {
480 error!("Unable to setup query server or idm server -> {:?}", e);
481 return;
482 }
483 };
484 info!("Init Query Server Success!");
485
486 info!("Start Index Phase 2 ...");
487
488 let Ok(mut qs_write) = qs.write(duration_from_epoch_now()).await else {
489 error!("Unable to acquire write transaction");
490 return;
491 };
492 let r = qs_write.reindex(true).and_then(|_| qs_write.commit());
493
494 match r {
495 Ok(_) => info!("Index Phase 2 Success!"),
496 Err(e) => {
497 error!("Reindex failed: {:?}", e);
498 std::process::exit(1);
499 }
500 };
501}
502
503pub fn vacuum_server_core(config: &Configuration) {
504 let schema = match Schema::new() {
505 Ok(s) => s,
506 Err(e) => {
507 eprintln!("Failed to setup in memory schema: {e:?}");
508 std::process::exit(1);
509 }
510 };
511
512 let r = setup_backend_vacuum(config, &schema, true);
515
516 match r {
517 Ok(_) => eprintln!("Vacuum Success!"),
518 Err(e) => {
519 eprintln!("Vacuum failed: {e:?}");
520 std::process::exit(1);
521 }
522 };
523}
524
525pub async fn domain_rename_core(config: &Configuration) {
526 let schema = match Schema::new() {
527 Ok(s) => s,
528 Err(e) => {
529 eprintln!("Failed to setup in memory schema: {e:?}");
530 std::process::exit(1);
531 }
532 };
533
534 let be = match setup_backend(config, &schema) {
536 Ok(be) => be,
537 Err(e) => {
538 error!("Failed to setup BE: {:?}", e);
539 return;
540 }
541 };
542
543 let qs = match setup_qs(be, schema, config).await {
545 Ok(t) => t,
546 Err(e) => {
547 error!("Unable to setup query server -> {:?}", e);
548 return;
549 }
550 };
551
552 let new_domain_name = config.domain.as_str();
553
554 match qs.read().await.map(|qs| qs.get_domain_name().to_string()) {
556 Ok(old_domain_name) => {
557 admin_info!(?old_domain_name, ?new_domain_name);
558 if old_domain_name == new_domain_name {
559 admin_info!("Domain name not changing, stopping.");
560 return;
561 }
562 admin_debug!(
563 "Domain name is changing from {:?} to {:?}",
564 old_domain_name,
565 new_domain_name
566 );
567 }
568 Err(e) => {
569 admin_error!("Failed to query domain name, quitting! -> {:?}", e);
570 return;
571 }
572 }
573
574 let Ok(mut qs_write) = qs.write(duration_from_epoch_now()).await else {
575 error!("Unable to acquire write transaction");
576 return;
577 };
578 let r = qs_write
579 .danger_domain_rename(new_domain_name)
580 .and_then(|_| qs_write.commit());
581
582 match r {
583 Ok(_) => info!("Domain Rename Success!"),
584 Err(e) => {
585 error!("Domain Rename Failed - Rollback has occurred: {:?}", e);
586 std::process::exit(1);
587 }
588 };
589}
590
591pub async fn verify_server_core(config: &Configuration) {
592 let curtime = duration_from_epoch_now();
593 let schema_mem = match Schema::new() {
595 Ok(sc) => sc,
596 Err(e) => {
597 error!("Failed to setup in memory schema: {:?}", e);
598 return;
599 }
600 };
601 let be = match setup_backend(config, &schema_mem) {
603 Ok(be) => be,
604 Err(e) => {
605 error!("Failed to setup BE: {:?}", e);
606 return;
607 }
608 };
609
610 let server = match QueryServer::new(be, schema_mem, config.domain.clone(), curtime) {
611 Ok(qs) => qs,
612 Err(err) => {
613 error!(?err, "Failed to setup query server");
614 return;
615 }
616 };
617
618 let r = server.verify().await;
620
621 if r.is_empty() {
622 eprintln!("Verification passed!");
623 std::process::exit(0);
624 } else {
625 for er in r {
626 error!("{:?}", er);
627 }
628 std::process::exit(1);
629 }
630
631 }
633
634pub fn cert_generate_core(config: &Configuration) {
635 let (tls_key_path, tls_chain_path) = match &config.tls_config {
638 Some(tls_config) => (tls_config.key.as_path(), tls_config.chain.as_path()),
639 None => {
640 error!("Unable to find TLS configuration");
641 std::process::exit(1);
642 }
643 };
644
645 if tls_key_path.exists() && tls_chain_path.exists() {
646 info!(
647 "TLS key and chain already exist - remove them first if you intend to regenerate these"
648 );
649 return;
650 }
651
652 let origin_domain = match config.origin.domain() {
653 Some(val) => val,
654 None => {
655 error!("origin does not contain a valid domain");
656 std::process::exit(1);
657 }
658 };
659
660 let cert_root = match tls_key_path.parent() {
661 Some(parent) => parent,
662 None => {
663 error!("Unable to find parent directory of {:?}", tls_key_path);
664 std::process::exit(1);
665 }
666 };
667
668 let ca_cert = cert_root.join("ca.pem");
669 let ca_key = cert_root.join("cakey.pem");
670 let tls_cert_path = cert_root.join("cert.pem");
671
672 let ca_handle = if !ca_cert.exists() || !ca_key.exists() {
673 let ca_handle = match crypto::build_ca(None) {
675 Ok(ca_handle) => ca_handle,
676 Err(e) => {
677 error!(err = ?e, "Failed to build CA");
678 std::process::exit(1);
679 }
680 };
681
682 if crypto::write_ca(ca_key, ca_cert, &ca_handle).is_err() {
683 error!("Failed to write CA");
684 std::process::exit(1);
685 }
686
687 ca_handle
688 } else {
689 match crypto::load_ca(ca_key, ca_cert) {
690 Ok(ca_handle) => ca_handle,
691 Err(_) => {
692 error!("Failed to load CA");
693 std::process::exit(1);
694 }
695 }
696 };
697
698 if !tls_key_path.exists() || !tls_chain_path.exists() || !tls_cert_path.exists() {
699 let cert_handle = match crypto::build_cert(origin_domain, &ca_handle, None, None) {
701 Ok(cert_handle) => cert_handle,
702 Err(e) => {
703 error!(err = ?e, "Failed to build certificate");
704 std::process::exit(1);
705 }
706 };
707
708 if crypto::write_cert(tls_key_path, tls_chain_path, tls_cert_path, &cert_handle).is_err() {
709 error!("Failed to write certificates");
710 std::process::exit(1);
711 }
712 }
713 info!("certificate generation complete");
714}
715
716#[derive(Clone, Debug)]
717pub enum CoreAction {
718 Shutdown,
719}
720
721pub(crate) enum TaskName {
722 AdminSocket,
723 AuditdActor,
724 BackupActor,
725 DelayedActionActor,
726 HttpsServer,
727 IntervalActor,
728 LdapActor,
729 Replication,
730 TlsAcceptorReload,
731}
732
733impl Display for TaskName {
734 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
735 write!(
736 f,
737 "{}",
738 match self {
739 TaskName::AdminSocket => "Admin Socket",
740 TaskName::AuditdActor => "Auditd Actor",
741 TaskName::BackupActor => "Backup Actor",
742 TaskName::DelayedActionActor => "Delayed Action Actor",
743 TaskName::HttpsServer => "HTTPS Server",
744 TaskName::IntervalActor => "Interval Actor",
745 TaskName::LdapActor => "LDAP Acceptor Actor",
746 TaskName::Replication => "Replication",
747 TaskName::TlsAcceptorReload => "TlsAcceptor Reload Monitor",
748 }
749 )
750 }
751}
752
753pub struct CoreHandle {
754 clean_shutdown: bool,
755 tx: broadcast::Sender<CoreAction>,
756 tls_acceptor_reload_notify: Arc<Notify>,
757 handles: Vec<(TaskName, task::JoinHandle<()>)>,
759}
760
761impl CoreHandle {
762 pub fn subscribe(&mut self) -> broadcast::Receiver<CoreAction> {
763 self.tx.subscribe()
764 }
765
766 pub async fn shutdown(&mut self) {
767 if self.tx.send(CoreAction::Shutdown).is_err() {
768 eprintln!("No receivers acked shutdown request. Treating as unclean.");
769 return;
770 }
771
772 while let Some((handle_name, handle)) = self.handles.pop() {
774 if let Err(error) = handle.await {
775 eprintln!("Task {handle_name} failed to finish: {error:?}");
776 }
777 }
778
779 self.clean_shutdown = true;
780 }
781
782 pub async fn tls_acceptor_reload(&mut self) {
783 self.tls_acceptor_reload_notify.notify_one()
784 }
785}
786
787impl Drop for CoreHandle {
788 fn drop(&mut self) {
789 if !self.clean_shutdown {
790 eprintln!("⚠️ UNCLEAN SHUTDOWN OCCURRED ⚠️ ");
791 }
792 }
795}
796
797pub async fn create_server_core(
798 config: Configuration,
799 config_test: bool,
800) -> Result<CoreHandle, ()> {
801 let (broadcast_tx, mut broadcast_rx) = broadcast::channel(4);
803
804 if config.integration_test_config.is_some() {
805 warn!("RUNNING IN INTEGRATION TEST MODE.");
806 warn!("IF YOU SEE THIS IN PRODUCTION YOU MUST CONTACT SUPPORT IMMEDIATELY.");
807 } else if config.tls_config.is_none() {
808 error!("Running without TLS is not supported! Quitting!");
810 return Err(());
811 }
812
813 info!(
814 "Starting kanidm with {}configuration: {}",
815 if config_test { "TEST " } else { "" },
816 config
817 );
818 #[cfg(not(target_family = "windows"))]
820 unsafe {
821 umask(0o0027)
822 };
823
824 let status_ref = StatusActor::start();
827
828 let maybe_tls_acceptor = match crypto::setup_tls(&config.tls_config) {
830 Ok(tls_acc) => tls_acc,
831 Err(err) => {
832 error!(?err, "Failed to configure TLS acceptor");
833 return Err(());
834 }
835 };
836
837 let schema = match Schema::new() {
838 Ok(s) => s,
839 Err(e) => {
840 error!("Failed to setup in memory schema: {:?}", e);
841 return Err(());
842 }
843 };
844
845 let be = match setup_backend(&config, &schema) {
847 Ok(be) => be,
848 Err(e) => {
849 error!("Failed to setup BE -> {:?}", e);
850 return Err(());
851 }
852 };
853 let (_qs, idms, mut idms_delayed, mut idms_audit) =
855 match setup_qs_idms(be, schema, &config).await {
856 Ok(t) => t,
857 Err(e) => {
858 error!("Unable to setup query server or idm server -> {:?}", e);
859 return Err(());
860 }
861 };
862
863 let jws_signer = match JwsHs256Signer::generate_hs256() {
866 Ok(k) => k.set_sign_option_embed_kid(false),
867 Err(e) => {
868 error!("Unable to setup jws signer -> {:?}", e);
869 return Err(());
870 }
871 };
872
873 if let Some(itc) = &config.integration_test_config {
875 let Ok(mut idms_prox_write) = idms.proxy_write(duration_from_epoch_now()).await else {
876 error!("Unable to acquire write transaction");
877 return Err(());
878 };
879 match idms_prox_write.recover_account(&itc.admin_user, Some(&itc.admin_password)) {
881 Ok(_) => {}
882 Err(e) => {
883 error!(
884 "Unable to configure INTEGRATION TEST {} account -> {:?}",
885 &itc.admin_user, e
886 );
887 return Err(());
888 }
889 };
890 match idms_prox_write.recover_account(&itc.idm_admin_user, Some(&itc.idm_admin_password)) {
892 Ok(_) => {}
893 Err(e) => {
894 error!(
895 "Unable to configure INTEGRATION TEST {} account -> {:?}",
896 &itc.idm_admin_user, e
897 );
898 return Err(());
899 }
900 };
901
902 match idms_prox_write.qs_write.internal_modify_uuid(
906 UUID_IDM_ADMINS,
907 &ModifyList::new_append(Attribute::Member, Value::Refer(UUID_ADMIN)),
908 ) {
909 Ok(_) => {}
910 Err(e) => {
911 error!(
912 "Unable to configure INTEGRATION TEST admin as member of idm_admins -> {:?}",
913 e
914 );
915 return Err(());
916 }
917 };
918
919 match idms_prox_write.qs_write.internal_modify_uuid(
920 UUID_IDM_ALL_PERSONS,
921 &ModifyList::new_purge_and_set(
922 Attribute::CredentialTypeMinimum,
923 CredentialType::Any.into(),
924 ),
925 ) {
926 Ok(_) => {}
927 Err(e) => {
928 error!(
929 "Unable to configure INTEGRATION TEST default credential policy -> {:?}",
930 e
931 );
932 return Err(());
933 }
934 };
935
936 match idms_prox_write.commit() {
937 Ok(_) => {}
938 Err(e) => {
939 error!("Unable to commit INTEGRATION TEST setup -> {:?}", e);
940 return Err(());
941 }
942 }
943 }
944
945 let ldap = match LdapServer::new(&idms).await {
946 Ok(l) => l,
947 Err(e) => {
948 error!("Unable to start LdapServer -> {:?}", e);
949 return Err(());
950 }
951 };
952
953 let idms_arc = Arc::new(idms);
955 let ldap_arc = Arc::new(ldap);
956
957 let server_read_ref = QueryServerReadV1::start_static(idms_arc.clone(), ldap_arc.clone());
960
961 let server_write_ref = QueryServerWriteV1::start_static(idms_arc.clone());
963
964 let delayed_handle = task::spawn(async move {
965 let mut buffer = Vec::with_capacity(DELAYED_ACTION_BATCH_SIZE);
966 loop {
967 tokio::select! {
968 added = idms_delayed.recv_many(&mut buffer) => {
969 if added == 0 {
970 break
972 }
973 server_write_ref.handle_delayedaction(&mut buffer).await;
974 }
975 Ok(action) = broadcast_rx.recv() => {
976 match action {
977 CoreAction::Shutdown => break,
978 }
979 }
980 }
981 }
982 info!("Stopped {}", TaskName::DelayedActionActor);
983 });
984
985 let mut broadcast_rx = broadcast_tx.subscribe();
986
987 let auditd_handle = task::spawn(async move {
988 loop {
989 tokio::select! {
990 Ok(action) = broadcast_rx.recv() => {
991 match action {
992 CoreAction::Shutdown => break,
993 }
994 }
995 audit_event = idms_audit.audit_rx().recv() => {
996 match serde_json::to_string(&audit_event) {
997 Ok(audit_event) => {
998 warn!(%audit_event);
999 }
1000 Err(e) => {
1001 error!(err=?e, "Unable to process audit event to json.");
1002 warn!(?audit_event, json=false);
1003 }
1004 }
1005
1006 }
1007 }
1008 }
1009 info!("Stopped {}", TaskName::AuditdActor);
1010 });
1011
1012 let mut broadcast_rx = broadcast_tx.subscribe();
1015 let tls_acceptor_reload_notify = Arc::new(Notify::new());
1016 let tls_accepter_reload_task_notify = tls_acceptor_reload_notify.clone();
1017 let tls_config = config.tls_config.clone();
1018
1019 let (tls_acceptor_reload_tx, _tls_acceptor_reload_rx) = broadcast::channel(1);
1020 let tls_acceptor_reload_tx_c = tls_acceptor_reload_tx.clone();
1021
1022 let tls_acceptor_reload_handle = task::spawn(async move {
1023 loop {
1024 tokio::select! {
1025 Ok(action) = broadcast_rx.recv() => {
1026 match action {
1027 CoreAction::Shutdown => break,
1028 }
1029 }
1030 _ = tls_accepter_reload_task_notify.notified() => {
1031 let tls_acceptor = match crypto::setup_tls(&tls_config) {
1032 Ok(Some(tls_acc)) => tls_acc,
1033 Ok(None) => {
1034 warn!("TLS not configured, ignoring reload request.");
1035 continue;
1036 }
1037 Err(err) => {
1038 error!(?err, "Failed to configure and reload TLS acceptor");
1039 continue;
1040 }
1041 };
1042
1043 if tls_acceptor_reload_tx_c.send(tls_acceptor).is_err() {
1046 error!("tls acceptor did not accept the reload, the server may have failed!");
1047 };
1048 info!("tls acceptor reload notification sent");
1049 }
1050 }
1051 }
1052 info!("Stopped {}", TaskName::TlsAcceptorReload);
1053 });
1054
1055 let interval_handle = IntervalActor::start(server_write_ref, broadcast_tx.subscribe());
1057 let maybe_backup_handle = match &config.online_backup {
1059 Some(online_backup_config) => {
1060 if online_backup_config.enabled {
1061 let handle = IntervalActor::start_online_backup(
1062 server_read_ref,
1063 online_backup_config,
1064 broadcast_tx.subscribe(),
1065 )?;
1066 Some(handle)
1067 } else {
1068 debug!("Backups disabled");
1069 None
1070 }
1071 }
1072 None => {
1073 debug!("Online backup not requested, skipping");
1074 None
1075 }
1076 };
1077
1078 let maybe_ldap_acceptor_handles = match &config.ldapbindaddress {
1080 Some(la) => {
1081 let opt_ldap_ssl_acceptor = maybe_tls_acceptor.clone();
1082
1083 let h = ldaps::create_ldap_server(
1084 la,
1085 opt_ldap_ssl_acceptor,
1086 server_read_ref,
1087 &broadcast_tx,
1088 &tls_acceptor_reload_tx,
1089 config.ldap_client_address_info.trusted_tcp_info(),
1090 )
1091 .await?;
1092 Some(h)
1093 }
1094 None => {
1095 debug!("LDAP not requested, skipping");
1096 None
1097 }
1098 };
1099
1100 let (maybe_repl_handle, maybe_repl_ctrl_tx) = match &config.repl_config {
1103 Some(rc) => {
1104 if !config_test {
1105 let (h, repl_ctrl_tx) =
1107 repl::create_repl_server(idms_arc.clone(), rc, broadcast_tx.subscribe())
1108 .await?;
1109 (Some(h), Some(repl_ctrl_tx))
1110 } else {
1111 (None, None)
1112 }
1113 }
1114 None => {
1115 debug!("Replication not requested, skipping");
1116 (None, None)
1117 }
1118 };
1119
1120 let maybe_http_acceptor_handles = if config_test {
1121 admin_info!("This config rocks! 🪨 ");
1122 None
1123 } else {
1124 let handles: Vec<task::JoinHandle<()>> = https::create_https_server(
1125 config.clone(),
1126 jws_signer,
1127 status_ref,
1128 server_write_ref,
1129 server_read_ref,
1130 broadcast_tx.clone(),
1131 maybe_tls_acceptor,
1132 &tls_acceptor_reload_tx,
1133 )
1134 .await
1135 .inspect_err(|err| {
1136 error!(?err, "Failed to start HTTPS server");
1137 })?;
1138
1139 if config.role != ServerRole::WriteReplicaNoUI {
1140 admin_info!("ready to rock! 🪨 UI available at: {}", config.origin);
1141 } else {
1142 admin_info!("ready to rock! 🪨 ");
1143 }
1144 Some(handles)
1145 };
1146
1147 let maybe_admin_sock_handle = if config.integration_test_config.is_none() {
1149 let broadcast_rx = broadcast_tx.subscribe();
1150
1151 let admin_handle = AdminActor::create_admin_sock(
1152 config.adminbindpath.as_str(),
1153 server_write_ref,
1154 server_read_ref,
1155 broadcast_rx,
1156 maybe_repl_ctrl_tx,
1157 )
1158 .await?;
1159
1160 Some(admin_handle)
1161 } else {
1162 None
1163 };
1164
1165 let mut handles: Vec<(TaskName, task::JoinHandle<()>)> = vec![
1166 (TaskName::IntervalActor, interval_handle),
1167 (TaskName::DelayedActionActor, delayed_handle),
1168 (TaskName::AuditdActor, auditd_handle),
1169 (TaskName::TlsAcceptorReload, tls_acceptor_reload_handle),
1170 ];
1171
1172 if let Some(backup_handle) = maybe_backup_handle {
1173 handles.push((TaskName::BackupActor, backup_handle))
1174 }
1175
1176 if let Some(admin_sock_handle) = maybe_admin_sock_handle {
1177 handles.push((TaskName::AdminSocket, admin_sock_handle))
1178 }
1179
1180 if let Some(ldap_handles) = maybe_ldap_acceptor_handles {
1181 for ldap_handle in ldap_handles {
1182 handles.push((TaskName::LdapActor, ldap_handle))
1183 }
1184 }
1185
1186 if let Some(http_handles) = maybe_http_acceptor_handles {
1187 for http_handle in http_handles {
1188 handles.push((TaskName::HttpsServer, http_handle))
1189 }
1190 }
1191
1192 if let Some(repl_handle) = maybe_repl_handle {
1193 handles.push((TaskName::Replication, repl_handle))
1194 }
1195
1196 Ok(CoreHandle {
1197 clean_shutdown: false,
1198 tls_acceptor_reload_notify,
1199 tx: broadcast_tx,
1200 handles,
1201 })
1202}