1use super::proto::*;
2use crate::plugins::Plugins;
3use crate::prelude::*;
4use crate::server::{ChangeFlag, ServerPhase};
5use std::collections::{BTreeMap, BTreeSet};
6use std::sync::Arc;
7
8impl QueryServerWriteTransaction<'_> {
9 fn consumer_incremental_apply_entries(
12 &mut self,
13 ctx_entries: Vec<ReplIncrementalEntryV1>,
14 ) -> Result<bool, OperationError> {
15 if ctx_entries.is_empty() {
19 debug!("No entries to act upon");
20 return Ok(false);
21 }
22
23 let ctx_entries: Vec<_> = ctx_entries.into_iter().map(
34 EntryIncrementalNew::rehydrate
35 )
36 .collect::<Result<Vec<_>, _>>()
37 .inspect_err(|err| {
38 error!(?err, "Unable to process replication incremental entries to valid entry states for replication");
39 })?;
40
41 trace!(?ctx_entries);
42
43 let db_entries = self
44 .be_txn
45 .incremental_prepare(&ctx_entries)
46 .inspect_err(|err| {
47 error!(?err, "Failed to access entries from db");
48 })?;
49
50 trace!(?db_entries);
51
52 let (conflicts, proceed): (Vec<_>, Vec<_>) = ctx_entries
60 .iter()
61 .zip(db_entries)
62 .partition(|(ctx_ent, db_ent)| ctx_ent.is_add_conflict(db_ent.as_ref()));
63
64 debug!(conflicts = %conflicts.len(), proceed = %proceed.len());
65
66 let (conflict_create, conflict_update): (
72 Vec<Option<EntrySealedNew>>,
73 Vec<(EntryIncrementalCommitted, Arc<EntrySealedCommitted>)>,
74 ) = conflicts
75 .into_iter()
76 .map(
77 |(ctx_ent, db_ent): (&EntryIncrementalNew, Arc<EntrySealedCommitted>)| {
78 let (opt_create, ent) =
79 ctx_ent.resolve_add_conflict(self.get_cid(), db_ent.as_ref());
80 (opt_create, (ent, db_ent))
81 },
82 )
83 .unzip();
84
85 let mut conflict_uuids: BTreeSet<_> = conflict_update
106 .iter()
107 .filter_map(|(_, e)| {
108 let u = e.get_uuid();
109 if u >= DYNAMIC_RANGE_MINIMUM_UUID {
110 Some(u)
112 } else {
113 None
115 }
116 })
117 .collect();
118
119 let conflict_create: Vec<EntrySealedNew> = conflict_create.into_iter().flatten().collect();
121
122 let proceed_update: Vec<(EntryIncrementalCommitted, Arc<EntrySealedCommitted>)> = proceed
123 .into_iter()
124 .map(|(ctx_ent, db_ent)| {
125 let merge_ent = ctx_ent.merge_state(db_ent.as_ref(), &self.schema, self.trim_cid());
130 (merge_ent, db_ent)
131 })
132 .collect();
133
134 let mut all_updates = conflict_update
139 .into_iter()
140 .chain(proceed_update)
141 .collect::<Vec<_>>();
142
143 Plugins::run_pre_repl_incremental(self, all_updates.as_mut_slice()).map_err(|e| {
155 admin_error!("Operation failed (pre_repl_incremental plugin), {:?}", e);
156 e
157 })?;
158
159 let all_updates_valid = all_updates
163 .into_iter()
164 .map(|(ctx_ent, db_ent)| {
165 let sealed_ent = ctx_ent.validate_repl(&self.schema).seal(&self.schema);
172 (sealed_ent, db_ent)
173 })
174 .collect::<Vec<_>>();
175
176 self.be_txn
188 .incremental_apply(&all_updates_valid, conflict_create)
189 .map_err(|e| {
190 admin_error!("betxn create failure {:?}", e);
191 e
192 })?;
193
194 Plugins::run_post_repl_incremental_conflict(
195 self,
196 all_updates_valid.as_slice(),
197 &mut conflict_uuids,
198 )
199 .map_err(|e| {
200 error!(
201 "Operation failed (post_repl_incremental_conflict plugin), {:?}",
202 e
203 );
204 e
205 })?;
206
207 let (cand, pre_cand): (Vec<_>, Vec<_>) = all_updates_valid
210 .into_iter()
211 .unzip();
224
225 Plugins::run_post_repl_incremental(
231 self,
232 pre_cand.as_slice(),
233 cand.as_slice(),
234 &conflict_uuids,
235 )
236 .map_err(|e| {
237 error!("Operation failed (post_repl_incremental plugin), {:?}", e);
238 e
239 })?;
240
241 self.changed_uuid.extend(cand.iter().map(|e| e.get_uuid()));
242
243 if !self.changed_flags.contains(ChangeFlag::ACP)
244 && cand
245 .iter()
246 .chain(pre_cand.iter().map(|e| e.as_ref()))
247 .any(|e| {
248 e.attribute_equality(Attribute::Class, &EntryClass::AccessControlProfile.into())
249 })
250 {
251 self.changed_flags.insert(ChangeFlag::ACP)
252 }
253
254 if !self.changed_flags.contains(ChangeFlag::OAUTH2)
255 && cand
256 .iter()
257 .chain(pre_cand.iter().map(|e| e.as_ref()))
258 .any(|e| {
259 e.attribute_equality(Attribute::Class, &EntryClass::OAuth2ResourceServer.into())
260 })
261 {
262 self.changed_flags.insert(ChangeFlag::OAUTH2)
263 }
264
265 if !self.changed_flags.contains(ChangeFlag::OAUTH2_CLIENT)
266 && cand
267 .iter()
268 .chain(pre_cand.iter().map(|e| e.as_ref()))
269 .any(|e| e.attribute_equality(Attribute::Class, &EntryClass::OAuth2Client.into()))
270 {
271 self.changed_flags.insert(ChangeFlag::OAUTH2_CLIENT)
272 }
273
274 if !self.changed_flags.contains(ChangeFlag::FEATURE)
275 && cand
276 .iter()
277 .chain(pre_cand.iter().map(|e| e.as_ref()))
278 .any(|e| e.attribute_equality(Attribute::Class, &EntryClass::Feature.into()))
279 {
280 self.changed_flags.insert(ChangeFlag::FEATURE)
281 }
282
283 if !self.changed_flags.contains(ChangeFlag::APPLICATION)
284 && cand
285 .iter()
286 .chain(pre_cand.iter().map(|e| e.as_ref()))
287 .any(|e| e.attribute_equality(Attribute::Class, &EntryClass::Application.into()))
288 {
289 self.changed_flags.insert(ChangeFlag::APPLICATION)
290 }
291
292 if !self.changed_flags.contains(ChangeFlag::SYNC_AGREEMENT)
293 && cand
294 .iter()
295 .chain(pre_cand.iter().map(|e| e.as_ref()))
296 .any(|e| e.attribute_equality(Attribute::Class, &EntryClass::SyncAccount.into()))
297 {
298 self.changed_flags.insert(ChangeFlag::SYNC_AGREEMENT)
299 }
300
301 if !self.changed_flags.contains(ChangeFlag::KEY_MATERIAL)
302 && cand
303 .iter()
304 .chain(pre_cand.iter().map(|e| e.as_ref()))
305 .any(|e| {
306 e.attribute_equality(Attribute::Class, &EntryClass::KeyProvider.into())
307 || e.attribute_equality(Attribute::Class, &EntryClass::KeyObject.into())
308 })
309 {
310 self.changed_flags.insert(ChangeFlag::KEY_MATERIAL)
311 }
312
313 trace!(
314 changed = ?self.changed_flags.iter_names().collect::<Vec<_>>(),
315 );
316
317 Ok(true)
318 }
319
320 pub fn consumer_apply_changes(
321 &mut self,
322 ctx: ReplIncrementalContext,
323 ) -> Result<ConsumerState, OperationError> {
324 match ctx {
325 ReplIncrementalContext::DomainMismatch => {
326 error!("Unable to proceed with consumer incremental - the supplier has indicated that our domain_uuid's are not equivalent. This can occur when adding a new consumer to an existing topology.");
327 error!("This server's content must be refreshed to proceed. If you have configured automatic refresh, this will occur shortly.");
328 Ok(ConsumerState::RefreshRequired)
329 }
330 ReplIncrementalContext::NoChangesAvailable => {
331 debug!("no changes are available");
332 Ok(ConsumerState::Ok)
333 }
334 ReplIncrementalContext::RefreshRequired => {
335 error!("Unable to proceed with consumer incremental - the supplier has indicated that our RUV is outdated, and replication would introduce data corruption.");
336 error!("This server's content must be refreshed to proceed. If you have configured automatic refresh, this will occur shortly.");
337 Ok(ConsumerState::RefreshRequired)
338 }
339 ReplIncrementalContext::UnwillingToSupply => {
340 warn!("Unable to proceed with consumer incremental - the supplier has indicated that our RUV is ahead, and replication would introduce data corruption.");
341 error!("This supplier's content must be refreshed to proceed. If you have configured automatic refresh, this will occur shortly.");
342 Ok(ConsumerState::Ok)
343 }
344 ReplIncrementalContext::V1 {
345 domain_version,
346 domain_patch_level,
347 domain_uuid,
348 ranges,
349 schema_entries,
350 meta_entries,
351 entries,
352 } => self.consumer_apply_changes_v1(
353 domain_version,
354 domain_patch_level,
355 domain_uuid,
356 &ranges,
357 schema_entries,
358 meta_entries,
359 entries,
360 ),
361 }
362 }
363
364 #[instrument(level = "debug", skip_all)]
365 fn consumer_apply_changes_v1(
366 &mut self,
367 ctx_domain_version: DomainVersion,
368 ctx_domain_patch_level: u32,
369 ctx_domain_uuid: Uuid,
370 ctx_ranges: &BTreeMap<Uuid, ReplAnchoredCidRange>,
371 ctx_schema_entries: Vec<ReplIncrementalEntryV1>,
372 ctx_meta_entries: Vec<ReplIncrementalEntryV1>,
373 ctx_entries: Vec<ReplIncrementalEntryV1>,
374 ) -> Result<ConsumerState, OperationError> {
375 if ctx_domain_version < DOMAIN_MIN_LEVEL {
376 error!("Unable to proceed with consumer incremental - incoming domain level is lower than our minimum supported level. {} < {}", ctx_domain_version, DOMAIN_MIN_LEVEL);
377 return Err(OperationError::ReplDomainLevelUnsatisfiable);
378 } else if ctx_domain_version > DOMAIN_MAX_LEVEL {
379 error!("Unable to proceed with consumer incremental - incoming domain level is greater than our maximum supported level. {} > {}", ctx_domain_version, DOMAIN_MAX_LEVEL);
380 return Err(OperationError::ReplDomainLevelUnsatisfiable);
381 };
382
383 let domain_patch_level = if self.get_domain_development_taint() {
384 u32::MAX
385 } else {
386 self.get_domain_patch_level()
387 };
388
389 if ctx_domain_patch_level != domain_patch_level {
390 error!("Unable to proceed with consumer incremental - incoming domain patch level is not equal to our patch level. {} != {}", ctx_domain_patch_level, domain_patch_level);
391 return Err(OperationError::ReplDomainLevelUnsatisfiable);
392 };
393
394 let db_uuid = self.be_txn.get_db_d_uuid()?;
396
397 if db_uuid != ctx_domain_uuid {
398 error!("Unable to proceed with consumer incremental - incoming domain uuid does not match our database uuid. You must investigate this situation. {:?} != {:?}", db_uuid, ctx_domain_uuid);
399 return Err(OperationError::ReplDomainUuidMismatch);
400 }
401
402 let txn_cid = self.get_cid().clone();
404 let ruv = self.be_txn.get_ruv_write();
405
406 let change_count = ctx_schema_entries.len() + ctx_meta_entries.len() + ctx_entries.len();
407
408 ruv.incremental_preflight_validate_ruv(ctx_ranges, &txn_cid)
409 .inspect_err(|err| {
410 error!(
411 ?err,
412 "Incoming RUV failed preflight checks, unable to proceed."
413 );
414 })?;
415
416 debug!(
418 "Proceeding to apply incremental with {change_count} changes from domain {ctx_domain_uuid:?} at level {ctx_domain_version}"
419 );
420
421 debug!(?ctx_ranges);
422
423 debug!("Applying {} schema entries", ctx_schema_entries.len());
424 let schema_changed = self
426 .consumer_incremental_apply_entries(ctx_schema_entries)
427 .inspect_err(|err| {
428 error!(?err, "Failed to apply incremental schema entries");
429 })?;
430
431 if schema_changed {
432 self.reload_schema().inspect_err(|err| {
434 error!(?err, "Failed to reload schema");
435 })?;
436 }
437
438 debug!("Applying {} meta entries", ctx_meta_entries.len());
439 let meta_changed = self
441 .consumer_incremental_apply_entries(ctx_meta_entries)
442 .inspect_err(|err| {
443 error!(?err, "Failed to apply incremental meta entries");
444 })?;
445
446 if meta_changed {
448 self.reload_domain_info().inspect_err(|err| {
449 error!(?err, "Failed to reload domain info");
450 })?;
451 self.reload_system_config().inspect_err(|err| {
452 error!(?err, "Failed to reload system configuration");
453 })?;
454 }
455
456 debug!("Applying {} context entries", ctx_entries.len());
457 self.consumer_incremental_apply_entries(ctx_entries)
459 .inspect_err(|err| {
460 error!(?err, "Failed to apply incremental meta entries");
461 })?;
462
463 if meta_changed {
470 self.reload_domain_info_version().inspect_err(|err| {
471 error!(?err, "Failed to reload domain info version");
472 })?;
473 }
474
475 let ruv = self.be_txn.get_ruv_write();
478
479 ruv.refresh_validate_ruv(ctx_ranges).inspect_err(|err| {
480 error!(?err, "RUV ranges were not rebuilt correctly.");
481 })?;
482
483 ruv.refresh_update_ruv(ctx_ranges).inspect_err(|err| {
484 error!(?err, "Unable to update RUV with supplier ranges.");
485 })?;
486
487 Ok(ConsumerState::Ok)
488 }
489
490 pub fn consumer_apply_refresh(
491 &mut self,
492 ctx: ReplRefreshContext,
493 ) -> Result<(), OperationError> {
494 match ctx {
495 ReplRefreshContext::V1 {
496 domain_version,
497 domain_devel,
498 domain_uuid,
499 ranges,
500 schema_entries,
501 meta_entries,
502 entries,
503 } => self.consumer_apply_refresh_v1(
504 domain_version,
505 domain_devel,
506 domain_uuid,
507 &ranges,
508 schema_entries,
509 meta_entries,
510 entries,
511 ),
512 }
513 }
514
515 fn consumer_refresh_create_entries(
516 &mut self,
517 ctx_entries: Vec<ReplEntryV1>,
518 ) -> Result<(), OperationError> {
519 let candidates = ctx_entries
520 .into_iter()
521 .map(EntryRefreshNew::from_repl_entry_v1)
522 .collect::<Result<Vec<EntryRefreshNew>, _>>()
523 .inspect_err(|err| {
524 error!(?err, "Failed to convert entries from supplier");
525 })?;
526
527 Plugins::run_pre_repl_refresh(self, candidates.as_slice()).map_err(|e| {
528 admin_error!(
529 "Refresh operation failed (pre_repl_refresh plugin), {:?}",
530 e
531 );
532 e
533 })?;
534
535 let norm_cand = candidates
537 .into_iter()
538 .map(|e| {
539 e.validate(&self.schema)
540 .map_err(|e| {
541 admin_error!("Schema Violation in refresh validate {:?}", e);
542 OperationError::SchemaViolation(e)
543 })
544 .map(|e| {
545 e.seal(&self.schema)
547 })
548 })
549 .collect::<Result<Vec<EntrySealedNew>, _>>()?;
550
551 let commit_cand = self.be_txn.refresh(norm_cand).map_err(|e| {
552 admin_error!("betxn create failure {:?}", e);
553 e
554 })?;
555
556 Plugins::run_post_repl_refresh(self, &commit_cand).map_err(|e| {
557 admin_error!(
558 "Refresh operation failed (post_repl_refresh plugin), {:?}",
559 e
560 );
561 e
562 })?;
563
564 self.changed_uuid
565 .extend(commit_cand.iter().map(|e| e.get_uuid()));
566
567 Ok(())
568 }
569
570 #[instrument(level = "info", skip_all)]
571 fn consumer_apply_refresh_v1(
572 &mut self,
573 ctx_domain_version: DomainVersion,
574 ctx_domain_devel: bool,
575 ctx_domain_uuid: Uuid,
576 ctx_ranges: &BTreeMap<Uuid, ReplAnchoredCidRange>,
577 ctx_schema_entries: Vec<ReplEntryV1>,
578 ctx_meta_entries: Vec<ReplEntryV1>,
579 ctx_entries: Vec<ReplEntryV1>,
580 ) -> Result<(), OperationError> {
581 let current_devel_flag = option_env!("KANIDM_PRE_RELEASE").is_some();
584
585 if ctx_domain_version < DOMAIN_MIN_LEVEL {
586 error!("Unable to proceed with consumer refresh - incoming domain level is lower than our minimum supported level. {} < {}", ctx_domain_version, DOMAIN_MIN_LEVEL);
587 return Err(OperationError::ReplDomainLevelUnsatisfiable);
588 } else if ctx_domain_version > DOMAIN_MAX_LEVEL {
589 error!("Unable to proceed with consumer refresh - incoming domain level is greater than our maximum supported level. {} > {}", ctx_domain_version, DOMAIN_MAX_LEVEL);
590 return Err(OperationError::ReplDomainLevelUnsatisfiable);
591 } else if ctx_domain_devel && !current_devel_flag {
592 error!("Unable to proceed with consumer refresh - incoming domain is from a development version while this server is a stable release.");
593 return Err(OperationError::ReplDomainLevelUnsatisfiable);
594 } else if !ctx_domain_devel && current_devel_flag {
595 error!("Unable to proceed with consumer refresh - incoming domain is from a stable version while this server is a development release.");
596 return Err(OperationError::ReplDomainLevelUnsatisfiable);
597 } else {
598 debug!(
599 "Proceeding to refresh from domain at level {}",
600 ctx_domain_version
601 );
602 };
603
604 self.set_phase_bootstrap();
606
607 self.be_txn
609 .set_db_d_uuid(ctx_domain_uuid)
610 .inspect_err(|err| {
611 error!(?err, "Failed to reset domain uuid");
612 })?;
613
614 self.reset_server_uuid()?;
618
619 self.be_txn
621 .danger_delete_all_db_content()
622 .inspect_err(|err| {
623 error!(?err, "Failed to clear existing server database content");
624 })?;
625
626 self.schema.generate_in_memory().inspect_err(|err| {
628 error!(?err, "Failed to reset in memory schema to clean state");
629 })?;
630
631 self.reindex(false).inspect_err(|err| {
634 error!(?err, "Failed to reload schema");
635 })?;
636
637 self.consumer_refresh_create_entries(ctx_schema_entries)
640 .inspect_err(|err| {
641 error!(?err, "Failed to refresh schema entries");
642 })?;
643
644 self.reload_schema().inspect_err(|err| {
646 error!(?err, "Failed to reload schema");
647 })?;
648
649 self.set_phase(ServerPhase::SchemaReady);
651
652 self.reindex(false).inspect_err(|err| {
655 error!(?err, "Failed to reload schema");
656 })?;
657
658 self.consumer_refresh_create_entries(ctx_meta_entries)
660 .inspect_err(|err| {
661 error!(?err, "Failed to refresh meta entries");
662 })?;
663
664 self.reload_domain_info().inspect_err(|err| {
668 error!(?err, "Failed to reload domain info");
669 })?;
670
671 self.changed_flags.insert(
673 ChangeFlag::SCHEMA
674 | ChangeFlag::ACP
675 | ChangeFlag::OAUTH2
676 | ChangeFlag::OAUTH2_CLIENT
677 | ChangeFlag::FEATURE
678 | ChangeFlag::DOMAIN
679 | ChangeFlag::APPLICATION
680 | ChangeFlag::SYSTEM_CONFIG
681 | ChangeFlag::SYNC_AGREEMENT
682 | ChangeFlag::KEY_MATERIAL,
683 );
684
685 self.set_phase(ServerPhase::DomainInfoReady);
687
688 self.consumer_refresh_create_entries(ctx_entries)
692 .inspect_err(|err| {
693 error!(?err, "Failed to refresh schema entries");
694 })?;
695
696 let ruv = self.be_txn.get_ruv_write();
699
700 ruv.refresh_validate_ruv(ctx_ranges).inspect_err(|err| {
701 error!(?err, "RUV ranges were not rebuilt correctly.");
702 })?;
703
704 ruv.refresh_update_ruv(ctx_ranges).inspect_err(|err| {
705 error!(?err, "Unable to update RUV with supplier ranges.");
706 })?;
707
708 self.set_phase(ServerPhase::Running);
710
711 Ok(())
712 }
713}