1#![deny(warnings)]
12#![warn(unused_extern_crates)]
13#![warn(unused_imports)]
14#![deny(clippy::todo)]
15#![deny(clippy::unimplemented)]
16#![deny(clippy::unwrap_used)]
17#![deny(clippy::expect_used)]
18#![deny(clippy::panic)]
19#![deny(clippy::unreachable)]
20#![deny(clippy::await_holding_lock)]
21#![deny(clippy::needless_pass_by_value)]
22#![deny(clippy::trivially_copy_pass_by_ref)]
23
24#[macro_use]
25extern crate tracing;
26#[macro_use]
27extern crate kanidmd_lib;
28
29mod actors;
30pub mod admin;
31pub mod config;
32mod crypto;
33mod https;
34mod interval;
35mod ldaps;
36mod repl;
37mod utils;
38
39use crate::actors::{QueryServerReadV1, QueryServerWriteV1};
40use crate::admin::AdminActor;
41use crate::config::{Configuration, ServerRole};
42use crate::interval::IntervalActor;
43use crate::utils::touch_file_or_quit;
44use compact_jwt::{JwsHs256Signer, JwsSigner};
45use kanidm_proto::internal::OperationError;
46use kanidmd_lib::be::{Backend, BackendConfig, BackendTransaction};
47use kanidmd_lib::idm::ldap::LdapServer;
48use kanidmd_lib::prelude::*;
49use kanidmd_lib::schema::Schema;
50use kanidmd_lib::status::StatusActor;
51use kanidmd_lib::value::CredentialType;
52#[cfg(not(target_family = "windows"))]
53use libc::umask;
54use std::fmt::{Display, Formatter};
55use std::path::Path;
56use std::sync::Arc;
57use tokio::sync::broadcast;
58use tokio::sync::mpsc;
59use tokio::sync::Notify;
60use tokio::task;
61
62fn setup_backend(config: &Configuration, schema: &Schema) -> Result<Backend, OperationError> {
65 setup_backend_vacuum(config, schema, false)
66}
67
68fn setup_backend_vacuum(
69 config: &Configuration,
70 schema: &Schema,
71 vacuum: bool,
72) -> Result<Backend, OperationError> {
73 let schema_txn = schema.write();
76 let idxmeta = schema_txn.reload_idxmeta();
77
78 let pool_size: u32 = config.threads as u32;
79
80 let cfg = BackendConfig::new(
81 config.db_path.as_deref(),
82 pool_size,
83 config.db_fs_type.unwrap_or_default(),
84 config.db_arc_size,
85 );
86
87 Backend::new(cfg, idxmeta, vacuum)
88}
89
90async fn setup_qs_idms(
95 be: Backend,
96 schema: Schema,
97 config: &Configuration,
98) -> Result<(QueryServer, IdmServer, IdmServerDelayed, IdmServerAudit), OperationError> {
99 let curtime = duration_from_epoch_now();
100 let query_server = QueryServer::new(be, schema, config.domain.clone(), curtime)?;
102
103 query_server
112 .initialise_helper(curtime, DOMAIN_TGT_LEVEL)
113 .await?;
114
115 let is_integration_test = config.integration_test_config.is_some();
117 let (idms, idms_delayed, idms_audit) = IdmServer::new(
118 query_server.clone(),
119 &config.origin,
120 is_integration_test,
121 curtime,
122 )
123 .await?;
124
125 Ok((query_server, idms, idms_delayed, idms_audit))
126}
127
128async fn setup_qs(
129 be: Backend,
130 schema: Schema,
131 config: &Configuration,
132) -> Result<QueryServer, OperationError> {
133 let curtime = duration_from_epoch_now();
134 let query_server = QueryServer::new(be, schema, config.domain.clone(), curtime)?;
136
137 query_server
146 .initialise_helper(curtime, DOMAIN_TGT_LEVEL)
147 .await?;
148
149 Ok(query_server)
150}
151
152macro_rules! dbscan_setup_be {
153 (
154 $config:expr
155 ) => {{
156 let schema = match Schema::new() {
157 Ok(s) => s,
158 Err(e) => {
159 error!("Failed to setup in memory schema: {:?}", e);
160 std::process::exit(1);
161 }
162 };
163
164 match setup_backend($config, &schema) {
165 Ok(be) => be,
166 Err(e) => {
167 error!("Failed to setup BE: {:?}", e);
168 return;
169 }
170 }
171 }};
172}
173
174pub fn dbscan_list_indexes_core(config: &Configuration) {
175 let be = dbscan_setup_be!(config);
176 let mut be_rotxn = match be.read() {
177 Ok(txn) => txn,
178 Err(err) => {
179 error!(?err, "Unable to proceed, backend read transaction failure.");
180 return;
181 }
182 };
183
184 match be_rotxn.list_indexes() {
185 Ok(mut idx_list) => {
186 idx_list.sort_unstable();
187 idx_list.iter().for_each(|idx_name| {
188 println!("{}", idx_name);
189 })
190 }
191 Err(e) => {
192 error!("Failed to retrieve index list: {:?}", e);
193 }
194 };
195}
196
197pub fn dbscan_list_id2entry_core(config: &Configuration) {
198 let be = dbscan_setup_be!(config);
199 let mut be_rotxn = match be.read() {
200 Ok(txn) => txn,
201 Err(err) => {
202 error!(?err, "Unable to proceed, backend read transaction failure.");
203 return;
204 }
205 };
206
207 match be_rotxn.list_id2entry() {
208 Ok(mut id_list) => {
209 id_list.sort_unstable_by_key(|k| k.0);
210 id_list.iter().for_each(|(id, value)| {
211 println!("{:>8}: {}", id, value);
212 })
213 }
214 Err(e) => {
215 error!("Failed to retrieve id2entry list: {:?}", e);
216 }
217 };
218}
219
220pub fn dbscan_list_index_analysis_core(config: &Configuration) {
221 let _be = dbscan_setup_be!(config);
222 }
224
225pub fn dbscan_list_index_core(config: &Configuration, index_name: &str) {
226 let be = dbscan_setup_be!(config);
227 let mut be_rotxn = match be.read() {
228 Ok(txn) => txn,
229 Err(err) => {
230 error!(?err, "Unable to proceed, backend read transaction failure.");
231 return;
232 }
233 };
234
235 match be_rotxn.list_index_content(index_name) {
236 Ok(mut idx_list) => {
237 idx_list.sort_unstable_by(|a, b| a.0.cmp(&b.0));
238 idx_list.iter().for_each(|(key, value)| {
239 println!("{:>50}: {:?}", key, value);
240 })
241 }
242 Err(e) => {
243 error!("Failed to retrieve index list: {:?}", e);
244 }
245 };
246}
247
248pub fn dbscan_get_id2entry_core(config: &Configuration, id: u64) {
249 let be = dbscan_setup_be!(config);
250 let mut be_rotxn = match be.read() {
251 Ok(txn) => txn,
252 Err(err) => {
253 error!(?err, "Unable to proceed, backend read transaction failure.");
254 return;
255 }
256 };
257
258 match be_rotxn.get_id2entry(id) {
259 Ok((id, value)) => println!("{:>8}: {}", id, value),
260 Err(e) => {
261 error!("Failed to retrieve id2entry value: {:?}", e);
262 }
263 };
264}
265
266pub fn dbscan_quarantine_id2entry_core(config: &Configuration, id: u64) {
267 let be = dbscan_setup_be!(config);
268 let mut be_wrtxn = match be.write() {
269 Ok(txn) => txn,
270 Err(err) => {
271 error!(
272 ?err,
273 "Unable to proceed, backend write transaction failure."
274 );
275 return;
276 }
277 };
278
279 match be_wrtxn
280 .quarantine_entry(id)
281 .and_then(|_| be_wrtxn.commit())
282 {
283 Ok(()) => {
284 println!("quarantined - {:>8}", id)
285 }
286 Err(e) => {
287 error!("Failed to quarantine id2entry value: {:?}", e);
288 }
289 };
290}
291
292pub fn dbscan_list_quarantined_core(config: &Configuration) {
293 let be = dbscan_setup_be!(config);
294 let mut be_rotxn = match be.read() {
295 Ok(txn) => txn,
296 Err(err) => {
297 error!(?err, "Unable to proceed, backend read transaction failure.");
298 return;
299 }
300 };
301
302 match be_rotxn.list_quarantined() {
303 Ok(mut id_list) => {
304 id_list.sort_unstable_by_key(|k| k.0);
305 id_list.iter().for_each(|(id, value)| {
306 println!("{:>8}: {}", id, value);
307 })
308 }
309 Err(e) => {
310 error!("Failed to retrieve id2entry list: {:?}", e);
311 }
312 };
313}
314
315pub fn dbscan_restore_quarantined_core(config: &Configuration, id: u64) {
316 let be = dbscan_setup_be!(config);
317 let mut be_wrtxn = match be.write() {
318 Ok(txn) => txn,
319 Err(err) => {
320 error!(
321 ?err,
322 "Unable to proceed, backend write transaction failure."
323 );
324 return;
325 }
326 };
327
328 match be_wrtxn
329 .restore_quarantined(id)
330 .and_then(|_| be_wrtxn.commit())
331 {
332 Ok(()) => {
333 println!("restored - {:>8}", id)
334 }
335 Err(e) => {
336 error!("Failed to restore quarantined id2entry value: {:?}", e);
337 }
338 };
339}
340
341pub fn backup_server_core(config: &Configuration, dst_path: &Path) {
342 let schema = match Schema::new() {
343 Ok(s) => s,
344 Err(e) => {
345 error!("Failed to setup in memory schema: {:?}", e);
346 std::process::exit(1);
347 }
348 };
349
350 let be = match setup_backend(config, &schema) {
351 Ok(be) => be,
352 Err(e) => {
353 error!("Failed to setup BE: {:?}", e);
354 return;
355 }
356 };
357
358 let mut be_ro_txn = match be.read() {
359 Ok(txn) => txn,
360 Err(err) => {
361 error!(?err, "Unable to proceed, backend read transaction failure.");
362 return;
363 }
364 };
365
366 let r = be_ro_txn.backup(dst_path);
367 match r {
368 Ok(_) => info!("Backup success!"),
369 Err(e) => {
370 error!("Backup failed: {:?}", e);
371 std::process::exit(1);
372 }
373 };
374 }
376
377pub async fn restore_server_core(config: &Configuration, dst_path: &Path) {
378 if let Some(db_path) = config.db_path.as_ref() {
380 touch_file_or_quit(db_path);
381 }
382
383 let schema = match Schema::new() {
385 Ok(s) => s,
386 Err(e) => {
387 error!("Failed to setup in memory schema: {:?}", e);
388 std::process::exit(1);
389 }
390 };
391
392 let be = match setup_backend(config, &schema) {
393 Ok(be) => be,
394 Err(e) => {
395 error!("Failed to setup backend: {:?}", e);
396 return;
397 }
398 };
399
400 let mut be_wr_txn = match be.write() {
401 Ok(txn) => txn,
402 Err(err) => {
403 error!(
404 ?err,
405 "Unable to proceed, backend write transaction failure."
406 );
407 return;
408 }
409 };
410 let r = be_wr_txn.restore(dst_path).and_then(|_| be_wr_txn.commit());
411
412 if r.is_err() {
413 error!("Failed to restore database: {:?}", r);
414 std::process::exit(1);
415 }
416 info!("Database loaded successfully");
417
418 reindex_inner(be, schema, config).await;
419
420 info!("✅ Restore Success!");
421}
422
423pub async fn reindex_server_core(config: &Configuration) {
424 info!("Start Index Phase 1 ...");
425 let schema = match Schema::new() {
427 Ok(s) => s,
428 Err(e) => {
429 error!("Failed to setup in memory schema: {:?}", e);
430 std::process::exit(1);
431 }
432 };
433
434 let be = match setup_backend(config, &schema) {
435 Ok(be) => be,
436 Err(e) => {
437 error!("Failed to setup BE: {:?}", e);
438 return;
439 }
440 };
441
442 reindex_inner(be, schema, config).await;
443
444 info!("✅ Reindex Success!");
445}
446
447async fn reindex_inner(be: Backend, schema: Schema, config: &Configuration) {
448 let mut be_wr_txn = match be.write() {
450 Ok(txn) => txn,
451 Err(err) => {
452 error!(
453 ?err,
454 "Unable to proceed, backend write transaction failure."
455 );
456 return;
457 }
458 };
459
460 let r = be_wr_txn.reindex(true).and_then(|_| be_wr_txn.commit());
461
462 if r.is_err() {
464 error!("Failed to reindex database: {:?}", r);
465 std::process::exit(1);
466 }
467 info!("Index Phase 1 Success!");
468
469 info!("Attempting to init query server ...");
470
471 let (qs, _idms, _idms_delayed, _idms_audit) = match setup_qs_idms(be, schema, config).await {
472 Ok(t) => t,
473 Err(e) => {
474 error!("Unable to setup query server or idm server -> {:?}", e);
475 return;
476 }
477 };
478 info!("Init Query Server Success!");
479
480 info!("Start Index Phase 2 ...");
481
482 let Ok(mut qs_write) = qs.write(duration_from_epoch_now()).await else {
483 error!("Unable to acquire write transaction");
484 return;
485 };
486 let r = qs_write.reindex(true).and_then(|_| qs_write.commit());
487
488 match r {
489 Ok(_) => info!("Index Phase 2 Success!"),
490 Err(e) => {
491 error!("Reindex failed: {:?}", e);
492 std::process::exit(1);
493 }
494 };
495}
496
497pub fn vacuum_server_core(config: &Configuration) {
498 let schema = match Schema::new() {
499 Ok(s) => s,
500 Err(e) => {
501 eprintln!("Failed to setup in memory schema: {:?}", e);
502 std::process::exit(1);
503 }
504 };
505
506 let r = setup_backend_vacuum(config, &schema, true);
509
510 match r {
511 Ok(_) => eprintln!("Vacuum Success!"),
512 Err(e) => {
513 eprintln!("Vacuum failed: {:?}", e);
514 std::process::exit(1);
515 }
516 };
517}
518
519pub async fn domain_rename_core(config: &Configuration) {
520 let schema = match Schema::new() {
521 Ok(s) => s,
522 Err(e) => {
523 eprintln!("Failed to setup in memory schema: {:?}", e);
524 std::process::exit(1);
525 }
526 };
527
528 let be = match setup_backend(config, &schema) {
530 Ok(be) => be,
531 Err(e) => {
532 error!("Failed to setup BE: {:?}", e);
533 return;
534 }
535 };
536
537 let qs = match setup_qs(be, schema, config).await {
539 Ok(t) => t,
540 Err(e) => {
541 error!("Unable to setup query server -> {:?}", e);
542 return;
543 }
544 };
545
546 let new_domain_name = config.domain.as_str();
547
548 match qs.read().await.map(|qs| qs.get_domain_name().to_string()) {
550 Ok(old_domain_name) => {
551 admin_info!(?old_domain_name, ?new_domain_name);
552 if old_domain_name == new_domain_name {
553 admin_info!("Domain name not changing, stopping.");
554 return;
555 }
556 admin_debug!(
557 "Domain name is changing from {:?} to {:?}",
558 old_domain_name,
559 new_domain_name
560 );
561 }
562 Err(e) => {
563 admin_error!("Failed to query domain name, quitting! -> {:?}", e);
564 return;
565 }
566 }
567
568 let Ok(mut qs_write) = qs.write(duration_from_epoch_now()).await else {
569 error!("Unable to acquire write transaction");
570 return;
571 };
572 let r = qs_write
573 .danger_domain_rename(new_domain_name)
574 .and_then(|_| qs_write.commit());
575
576 match r {
577 Ok(_) => info!("Domain Rename Success!"),
578 Err(e) => {
579 error!("Domain Rename Failed - Rollback has occurred: {:?}", e);
580 std::process::exit(1);
581 }
582 };
583}
584
585pub async fn verify_server_core(config: &Configuration) {
586 let curtime = duration_from_epoch_now();
587 let schema_mem = match Schema::new() {
589 Ok(sc) => sc,
590 Err(e) => {
591 error!("Failed to setup in memory schema: {:?}", e);
592 return;
593 }
594 };
595 let be = match setup_backend(config, &schema_mem) {
597 Ok(be) => be,
598 Err(e) => {
599 error!("Failed to setup BE: {:?}", e);
600 return;
601 }
602 };
603
604 let server = match QueryServer::new(be, schema_mem, config.domain.clone(), curtime) {
605 Ok(qs) => qs,
606 Err(err) => {
607 error!(?err, "Failed to setup query server");
608 return;
609 }
610 };
611
612 let r = server.verify().await;
614
615 if r.is_empty() {
616 eprintln!("Verification passed!");
617 std::process::exit(0);
618 } else {
619 for er in r {
620 error!("{:?}", er);
621 }
622 std::process::exit(1);
623 }
624
625 }
627
628pub fn cert_generate_core(config: &Configuration) {
629 let (tls_key_path, tls_chain_path) = match &config.tls_config {
632 Some(tls_config) => (tls_config.key.as_path(), tls_config.chain.as_path()),
633 None => {
634 error!("Unable to find TLS configuration");
635 std::process::exit(1);
636 }
637 };
638
639 if tls_key_path.exists() && tls_chain_path.exists() {
640 info!(
641 "TLS key and chain already exist - remove them first if you intend to regenerate these"
642 );
643 return;
644 }
645
646 let origin = match Url::parse(&config.origin) {
647 Ok(url) => url,
648 Err(e) => {
649 error!(err = ?e, "Unable to parse origin URL - refusing to start. You must correct the value for origin. {:?}", config.origin);
650 std::process::exit(1);
651 }
652 };
653
654 let origin_domain = if let Some(d) = origin.domain() {
655 d
656 } else {
657 error!("origin does not contain a valid domain");
658 std::process::exit(1);
659 };
660
661 let cert_root = match tls_key_path.parent() {
662 Some(parent) => parent,
663 None => {
664 error!("Unable to find parent directory of {:?}", tls_key_path);
665 std::process::exit(1);
666 }
667 };
668
669 let ca_cert = cert_root.join("ca.pem");
670 let ca_key = cert_root.join("cakey.pem");
671 let tls_cert_path = cert_root.join("cert.pem");
672
673 let ca_handle = if !ca_cert.exists() || !ca_key.exists() {
674 let ca_handle = match crypto::build_ca(None) {
676 Ok(ca_handle) => ca_handle,
677 Err(e) => {
678 error!(err = ?e, "Failed to build CA");
679 std::process::exit(1);
680 }
681 };
682
683 if crypto::write_ca(ca_key, ca_cert, &ca_handle).is_err() {
684 error!("Failed to write CA");
685 std::process::exit(1);
686 }
687
688 ca_handle
689 } else {
690 match crypto::load_ca(ca_key, ca_cert) {
691 Ok(ca_handle) => ca_handle,
692 Err(_) => {
693 error!("Failed to load CA");
694 std::process::exit(1);
695 }
696 }
697 };
698
699 if !tls_key_path.exists() || !tls_chain_path.exists() || !tls_cert_path.exists() {
700 let cert_handle = match crypto::build_cert(origin_domain, &ca_handle, None, None) {
702 Ok(cert_handle) => cert_handle,
703 Err(e) => {
704 error!(err = ?e, "Failed to build certificate");
705 std::process::exit(1);
706 }
707 };
708
709 if crypto::write_cert(tls_key_path, tls_chain_path, tls_cert_path, &cert_handle).is_err() {
710 error!("Failed to write certificates");
711 std::process::exit(1);
712 }
713 }
714 info!("certificate generation complete");
715}
716
717#[derive(Clone, Debug)]
718pub enum CoreAction {
719 Shutdown,
720}
721
722pub(crate) enum TaskName {
723 AdminSocket,
724 AuditdActor,
725 BackupActor,
726 DelayedActionActor,
727 HttpsServer,
728 IntervalActor,
729 LdapActor,
730 Replication,
731 TlsAcceptorReload,
732}
733
734impl Display for TaskName {
735 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
736 write!(
737 f,
738 "{}",
739 match self {
740 TaskName::AdminSocket => "Admin Socket",
741 TaskName::AuditdActor => "Auditd Actor",
742 TaskName::BackupActor => "Backup Actor",
743 TaskName::DelayedActionActor => "Delayed Action Actor",
744 TaskName::HttpsServer => "HTTPS Server",
745 TaskName::IntervalActor => "Interval Actor",
746 TaskName::LdapActor => "LDAP Acceptor Actor",
747 TaskName::Replication => "Replication",
748 TaskName::TlsAcceptorReload => "TlsAcceptor Reload Monitor",
749 }
750 )
751 }
752}
753
754pub struct CoreHandle {
755 clean_shutdown: bool,
756 tx: broadcast::Sender<CoreAction>,
757 tls_acceptor_reload_notify: Arc<Notify>,
758 handles: Vec<(TaskName, task::JoinHandle<()>)>,
760}
761
762impl CoreHandle {
763 pub fn subscribe(&mut self) -> broadcast::Receiver<CoreAction> {
764 self.tx.subscribe()
765 }
766
767 pub async fn shutdown(&mut self) {
768 if self.tx.send(CoreAction::Shutdown).is_err() {
769 eprintln!("No receivers acked shutdown request. Treating as unclean.");
770 return;
771 }
772
773 while let Some((handle_name, handle)) = self.handles.pop() {
775 if let Err(error) = handle.await {
776 eprintln!("Task {} failed to finish: {:?}", handle_name, error);
777 }
778 }
779
780 self.clean_shutdown = true;
781 }
782
783 pub async fn tls_acceptor_reload(&mut self) {
784 self.tls_acceptor_reload_notify.notify_one()
785 }
786}
787
788impl Drop for CoreHandle {
789 fn drop(&mut self) {
790 if !self.clean_shutdown {
791 eprintln!("⚠️ UNCLEAN SHUTDOWN OCCURRED ⚠️ ");
792 }
793 }
796}
797
798pub async fn create_server_core(
799 config: Configuration,
800 config_test: bool,
801) -> Result<CoreHandle, ()> {
802 let (broadcast_tx, mut broadcast_rx) = broadcast::channel(4);
804
805 if config.integration_test_config.is_some() {
806 warn!("RUNNING IN INTEGRATION TEST MODE.");
807 warn!("IF YOU SEE THIS IN PRODUCTION YOU MUST CONTACT SUPPORT IMMEDIATELY.");
808 } else if config.tls_config.is_none() {
809 error!("Running without TLS is not supported! Quitting!");
811 return Err(());
812 }
813
814 info!(
815 "Starting kanidm with {}configuration: {}",
816 if config_test { "TEST " } else { "" },
817 config
818 );
819 #[cfg(not(target_family = "windows"))]
821 unsafe {
822 umask(0o0027)
823 };
824
825 let status_ref = StatusActor::start();
828
829 let maybe_tls_acceptor = match crypto::setup_tls(&config.tls_config) {
831 Ok(tls_acc) => tls_acc,
832 Err(err) => {
833 error!(?err, "Failed to configure TLS acceptor");
834 return Err(());
835 }
836 };
837
838 let schema = match Schema::new() {
839 Ok(s) => s,
840 Err(e) => {
841 error!("Failed to setup in memory schema: {:?}", e);
842 return Err(());
843 }
844 };
845
846 let be = match setup_backend(&config, &schema) {
848 Ok(be) => be,
849 Err(e) => {
850 error!("Failed to setup BE -> {:?}", e);
851 return Err(());
852 }
853 };
854 let (_qs, idms, mut idms_delayed, mut idms_audit) =
856 match setup_qs_idms(be, schema, &config).await {
857 Ok(t) => t,
858 Err(e) => {
859 error!("Unable to setup query server or idm server -> {:?}", e);
860 return Err(());
861 }
862 };
863
864 let jws_signer = match JwsHs256Signer::generate_hs256() {
867 Ok(k) => k.set_sign_option_embed_kid(false),
868 Err(e) => {
869 error!("Unable to setup jws signer -> {:?}", e);
870 return Err(());
871 }
872 };
873
874 if let Some(itc) = &config.integration_test_config {
876 let Ok(mut idms_prox_write) = idms.proxy_write(duration_from_epoch_now()).await else {
877 error!("Unable to acquire write transaction");
878 return Err(());
879 };
880 match idms_prox_write.recover_account(&itc.admin_user, Some(&itc.admin_password)) {
882 Ok(_) => {}
883 Err(e) => {
884 error!(
885 "Unable to configure INTEGRATION TEST {} account -> {:?}",
886 &itc.admin_user, e
887 );
888 return Err(());
889 }
890 };
891 match idms_prox_write.recover_account(&itc.idm_admin_user, Some(&itc.idm_admin_password)) {
893 Ok(_) => {}
894 Err(e) => {
895 error!(
896 "Unable to configure INTEGRATION TEST {} account -> {:?}",
897 &itc.idm_admin_user, e
898 );
899 return Err(());
900 }
901 };
902
903 match idms_prox_write.qs_write.internal_modify_uuid(
907 UUID_IDM_ADMINS,
908 &ModifyList::new_append(Attribute::Member, Value::Refer(UUID_ADMIN)),
909 ) {
910 Ok(_) => {}
911 Err(e) => {
912 error!(
913 "Unable to configure INTEGRATION TEST admin as member of idm_admins -> {:?}",
914 e
915 );
916 return Err(());
917 }
918 };
919
920 match idms_prox_write.qs_write.internal_modify_uuid(
921 UUID_IDM_ALL_PERSONS,
922 &ModifyList::new_purge_and_set(
923 Attribute::CredentialTypeMinimum,
924 CredentialType::Any.into(),
925 ),
926 ) {
927 Ok(_) => {}
928 Err(e) => {
929 error!(
930 "Unable to configure INTEGRATION TEST default credential policy -> {:?}",
931 e
932 );
933 return Err(());
934 }
935 };
936
937 match idms_prox_write.commit() {
938 Ok(_) => {}
939 Err(e) => {
940 error!("Unable to commit INTEGRATION TEST setup -> {:?}", e);
941 return Err(());
942 }
943 }
944 }
945
946 let ldap = match LdapServer::new(&idms).await {
947 Ok(l) => l,
948 Err(e) => {
949 error!("Unable to start LdapServer -> {:?}", e);
950 return Err(());
951 }
952 };
953
954 let idms_arc = Arc::new(idms);
956 let ldap_arc = Arc::new(ldap);
957
958 let server_read_ref = QueryServerReadV1::start_static(idms_arc.clone(), ldap_arc.clone());
961
962 let server_write_ref = QueryServerWriteV1::start_static(idms_arc.clone());
964
965 let delayed_handle = task::spawn(async move {
966 let mut buffer = Vec::with_capacity(DELAYED_ACTION_BATCH_SIZE);
967 loop {
968 tokio::select! {
969 added = idms_delayed.recv_many(&mut buffer) => {
970 if added == 0 {
971 break
973 }
974 server_write_ref.handle_delayedaction(&mut buffer).await;
975 }
976 Ok(action) = broadcast_rx.recv() => {
977 match action {
978 CoreAction::Shutdown => break,
979 }
980 }
981 }
982 }
983 info!("Stopped {}", TaskName::DelayedActionActor);
984 });
985
986 let mut broadcast_rx = broadcast_tx.subscribe();
987
988 let auditd_handle = task::spawn(async move {
989 loop {
990 tokio::select! {
991 Ok(action) = broadcast_rx.recv() => {
992 match action {
993 CoreAction::Shutdown => break,
994 }
995 }
996 audit_event = idms_audit.audit_rx().recv() => {
997 match serde_json::to_string(&audit_event) {
998 Ok(audit_event) => {
999 warn!(%audit_event);
1000 }
1001 Err(e) => {
1002 error!(err=?e, "Unable to process audit event to json.");
1003 warn!(?audit_event, json=false);
1004 }
1005 }
1006
1007 }
1008 }
1009 }
1010 info!("Stopped {}", TaskName::AuditdActor);
1011 });
1012
1013 let mut broadcast_rx = broadcast_tx.subscribe();
1016 let tls_acceptor_reload_notify = Arc::new(Notify::new());
1017 let tls_accepter_reload_task_notify = tls_acceptor_reload_notify.clone();
1018 let tls_config = config.tls_config.clone();
1019
1020 let ldap_configured = config.ldapbindaddress.is_some();
1021 let (ldap_tls_acceptor_reload_tx, ldap_tls_acceptor_reload_rx) = mpsc::channel(1);
1022 let (http_tls_acceptor_reload_tx, http_tls_acceptor_reload_rx) = mpsc::channel(1);
1023
1024 let tls_acceptor_reload_handle = task::spawn(async move {
1025 loop {
1026 tokio::select! {
1027 Ok(action) = broadcast_rx.recv() => {
1028 match action {
1029 CoreAction::Shutdown => break,
1030 }
1031 }
1032 _ = tls_accepter_reload_task_notify.notified() => {
1033 let tls_acceptor = match crypto::setup_tls(&tls_config) {
1034 Ok(Some(tls_acc)) => tls_acc,
1035 Ok(None) => {
1036 warn!("TLS not configured, ignoring reload request.");
1037 continue;
1038 }
1039 Err(err) => {
1040 error!(?err, "Failed to configure and reload TLS acceptor");
1041 continue;
1042 }
1043 };
1044
1045 if ldap_configured &&
1048 ldap_tls_acceptor_reload_tx.send(tls_acceptor.clone()).await.is_err() {
1049 error!("ldap tls acceptor did not accept the reload, the server may have failed!");
1050 };
1051 if http_tls_acceptor_reload_tx.send(tls_acceptor.clone()).await.is_err() {
1052 error!("http tls acceptor did not accept the reload, the server may have failed!");
1053 break;
1054 };
1055 }
1056 }
1057 }
1058 info!("Stopped {}", TaskName::TlsAcceptorReload);
1059 });
1060
1061 let interval_handle = IntervalActor::start(server_write_ref, broadcast_tx.subscribe());
1063 let maybe_backup_handle = match &config.online_backup {
1065 Some(online_backup_config) => {
1066 if online_backup_config.enabled {
1067 let handle = IntervalActor::start_online_backup(
1068 server_read_ref,
1069 online_backup_config,
1070 broadcast_tx.subscribe(),
1071 )?;
1072 Some(handle)
1073 } else {
1074 debug!("Backups disabled");
1075 None
1076 }
1077 }
1078 None => {
1079 debug!("Online backup not requested, skipping");
1080 None
1081 }
1082 };
1083
1084 let maybe_ldap_acceptor_handle = match &config.ldapbindaddress {
1086 Some(la) => {
1087 let opt_ldap_ssl_acceptor = maybe_tls_acceptor.clone();
1088
1089 let h = ldaps::create_ldap_server(
1090 la.as_str(),
1091 opt_ldap_ssl_acceptor,
1092 server_read_ref,
1093 broadcast_tx.subscribe(),
1094 ldap_tls_acceptor_reload_rx,
1095 config.ldap_client_address_info.trusted_proxy_v2(),
1096 )
1097 .await?;
1098 Some(h)
1099 }
1100 None => {
1101 debug!("LDAP not requested, skipping");
1102 None
1103 }
1104 };
1105
1106 let (maybe_repl_handle, maybe_repl_ctrl_tx) = match &config.repl_config {
1109 Some(rc) => {
1110 if !config_test {
1111 let (h, repl_ctrl_tx) =
1113 repl::create_repl_server(idms_arc.clone(), rc, broadcast_tx.subscribe())
1114 .await?;
1115 (Some(h), Some(repl_ctrl_tx))
1116 } else {
1117 (None, None)
1118 }
1119 }
1120 None => {
1121 debug!("Replication not requested, skipping");
1122 (None, None)
1123 }
1124 };
1125
1126 let maybe_http_acceptor_handle = if config_test {
1127 admin_info!("This config rocks! 🪨 ");
1128 None
1129 } else {
1130 let h: task::JoinHandle<()> = match https::create_https_server(
1131 config.clone(),
1132 jws_signer,
1133 status_ref,
1134 server_write_ref,
1135 server_read_ref,
1136 broadcast_tx.clone(),
1137 maybe_tls_acceptor,
1138 http_tls_acceptor_reload_rx,
1139 )
1140 .await
1141 {
1142 Ok(h) => h,
1143 Err(e) => {
1144 error!("Failed to start HTTPS server -> {:?}", e);
1145 return Err(());
1146 }
1147 };
1148 if config.role != ServerRole::WriteReplicaNoUI {
1149 admin_info!("ready to rock! 🪨 UI available at: {}", config.origin);
1150 } else {
1151 admin_info!("ready to rock! 🪨 ");
1152 }
1153 Some(h)
1154 };
1155
1156 let maybe_admin_sock_handle = if config.integration_test_config.is_none() {
1158 let broadcast_rx = broadcast_tx.subscribe();
1159
1160 let admin_handle = AdminActor::create_admin_sock(
1161 config.adminbindpath.as_str(),
1162 server_write_ref,
1163 server_read_ref,
1164 broadcast_rx,
1165 maybe_repl_ctrl_tx,
1166 )
1167 .await?;
1168
1169 Some(admin_handle)
1170 } else {
1171 None
1172 };
1173
1174 let mut handles: Vec<(TaskName, task::JoinHandle<()>)> = vec![
1175 (TaskName::IntervalActor, interval_handle),
1176 (TaskName::DelayedActionActor, delayed_handle),
1177 (TaskName::AuditdActor, auditd_handle),
1178 (TaskName::TlsAcceptorReload, tls_acceptor_reload_handle),
1179 ];
1180
1181 if let Some(backup_handle) = maybe_backup_handle {
1182 handles.push((TaskName::BackupActor, backup_handle))
1183 }
1184
1185 if let Some(admin_sock_handle) = maybe_admin_sock_handle {
1186 handles.push((TaskName::AdminSocket, admin_sock_handle))
1187 }
1188
1189 if let Some(ldap_handle) = maybe_ldap_acceptor_handle {
1190 handles.push((TaskName::LdapActor, ldap_handle))
1191 }
1192
1193 if let Some(http_handle) = maybe_http_acceptor_handle {
1194 handles.push((TaskName::HttpsServer, http_handle))
1195 }
1196
1197 if let Some(repl_handle) = maybe_repl_handle {
1198 handles.push((TaskName::Replication, repl_handle))
1199 }
1200
1201 Ok(CoreHandle {
1202 clean_shutdown: false,
1203 tls_acceptor_reload_notify,
1204 tx: broadcast_tx,
1205 handles,
1206 })
1207}