1use super::proto::*;
2use crate::plugins::Plugins;
3use crate::prelude::*;
4use crate::server::{ChangeFlag, ServerPhase};
5use std::collections::{BTreeMap, BTreeSet};
6use std::sync::Arc;
7
8impl QueryServerWriteTransaction<'_> {
9 fn consumer_incremental_apply_entries(
12 &mut self,
13 ctx_entries: Vec<ReplIncrementalEntryV1>,
14 ) -> Result<bool, OperationError> {
15 if ctx_entries.is_empty() {
19 debug!("No entries to act upon");
20 return Ok(false);
21 }
22
23 let ctx_entries: Vec<_> = ctx_entries.into_iter().map(
34 EntryIncrementalNew::rehydrate
35 )
36 .collect::<Result<Vec<_>, _>>()
37 .inspect_err(|err| {
38 error!(?err, "Unable to process replication incremental entries to valid entry states for replication");
39 })?;
40
41 trace!(?ctx_entries);
42
43 let db_entries = self
44 .be_txn
45 .incremental_prepare(&ctx_entries)
46 .inspect_err(|err| {
47 error!(?err, "Failed to access entries from db");
48 })?;
49
50 trace!(?db_entries);
51
52 let (conflicts, proceed): (Vec<_>, Vec<_>) = ctx_entries
60 .iter()
61 .zip(db_entries)
62 .partition(|(ctx_ent, db_ent)| ctx_ent.is_add_conflict(db_ent.as_ref()));
63
64 debug!(conflicts = %conflicts.len(), proceed = %proceed.len());
65
66 let (conflict_create, conflict_update): (
72 Vec<Option<EntrySealedNew>>,
73 Vec<(EntryIncrementalCommitted, Arc<EntrySealedCommitted>)>,
74 ) = conflicts
75 .into_iter()
76 .map(
77 |(ctx_ent, db_ent): (&EntryIncrementalNew, Arc<EntrySealedCommitted>)| {
78 let (opt_create, ent) =
79 ctx_ent.resolve_add_conflict(self.get_cid(), db_ent.as_ref());
80 (opt_create, (ent, db_ent))
81 },
82 )
83 .unzip();
84
85 let mut conflict_uuids: BTreeSet<_> = conflict_update
106 .iter()
107 .filter_map(|(_, e)| {
108 let u = e.get_uuid();
109 if u >= DYNAMIC_RANGE_MINIMUM_UUID {
110 Some(u)
112 } else {
113 None
115 }
116 })
117 .collect();
118
119 let conflict_create: Vec<EntrySealedNew> = conflict_create.into_iter().flatten().collect();
121
122 let proceed_update: Vec<(EntryIncrementalCommitted, Arc<EntrySealedCommitted>)> = proceed
123 .into_iter()
124 .map(|(ctx_ent, db_ent)| {
125 let merge_ent = ctx_ent.merge_state(db_ent.as_ref(), &self.schema, self.trim_cid());
130 (merge_ent, db_ent)
131 })
132 .collect();
133
134 let mut all_updates = conflict_update
139 .into_iter()
140 .chain(proceed_update)
141 .collect::<Vec<_>>();
142
143 Plugins::run_pre_repl_incremental(self, all_updates.as_mut_slice()).map_err(|e| {
155 admin_error!("Operation failed (pre_repl_incremental plugin), {:?}", e);
156 e
157 })?;
158
159 let all_updates_valid = all_updates
163 .into_iter()
164 .map(|(ctx_ent, db_ent)| {
165 let sealed_ent = ctx_ent.validate_repl(&self.schema).seal(&self.schema);
172 (sealed_ent, db_ent)
173 })
174 .collect::<Vec<_>>();
175
176 self.be_txn
188 .incremental_apply(&all_updates_valid, conflict_create)
189 .map_err(|e| {
190 admin_error!("betxn create failure {:?}", e);
191 e
192 })?;
193
194 Plugins::run_post_repl_incremental_conflict(
195 self,
196 all_updates_valid.as_slice(),
197 &mut conflict_uuids,
198 )
199 .map_err(|e| {
200 error!(
201 "Operation failed (post_repl_incremental_conflict plugin), {:?}",
202 e
203 );
204 e
205 })?;
206
207 let (cand, pre_cand): (Vec<_>, Vec<_>) = all_updates_valid
210 .into_iter()
211 .unzip();
224
225 Plugins::run_post_repl_incremental(
231 self,
232 pre_cand.as_slice(),
233 cand.as_slice(),
234 &conflict_uuids,
235 )
236 .map_err(|e| {
237 error!("Operation failed (post_repl_incremental plugin), {:?}", e);
238 e
239 })?;
240
241 self.changed_uuid.extend(cand.iter().map(|e| e.get_uuid()));
242
243 if !self.changed_flags.contains(ChangeFlag::ACP)
244 && cand
245 .iter()
246 .chain(pre_cand.iter().map(|e| e.as_ref()))
247 .any(|e| {
248 e.attribute_equality(Attribute::Class, &EntryClass::AccessControlProfile.into())
249 })
250 {
251 self.changed_flags.insert(ChangeFlag::ACP)
252 }
253
254 if !self.changed_flags.contains(ChangeFlag::OAUTH2)
255 && cand
256 .iter()
257 .chain(pre_cand.iter().map(|e| e.as_ref()))
258 .any(|e| {
259 e.attribute_equality(Attribute::Class, &EntryClass::OAuth2ResourceServer.into())
260 })
261 {
262 self.changed_flags.insert(ChangeFlag::OAUTH2)
263 }
264
265 if !self.changed_flags.contains(ChangeFlag::OAUTH2_CLIENT)
266 && cand
267 .iter()
268 .chain(pre_cand.iter().map(|e| e.as_ref()))
269 .any(|e| e.attribute_equality(Attribute::Class, &EntryClass::OAuth2Client.into()))
270 {
271 self.changed_flags.insert(ChangeFlag::OAUTH2_CLIENT)
272 }
273
274 if !self.changed_flags.contains(ChangeFlag::FEATURE)
275 && cand
276 .iter()
277 .chain(pre_cand.iter().map(|e| e.as_ref()))
278 .any(|e| e.attribute_equality(Attribute::Class, &EntryClass::Feature.into()))
279 {
280 self.changed_flags.insert(ChangeFlag::FEATURE)
281 }
282
283 if !self.changed_flags.contains(ChangeFlag::APPLICATION)
284 && cand
285 .iter()
286 .chain(pre_cand.iter().map(|e| e.as_ref()))
287 .any(|e| e.attribute_equality(Attribute::Class, &EntryClass::Application.into()))
288 {
289 self.changed_flags.insert(ChangeFlag::APPLICATION)
290 }
291
292 if !self.changed_flags.contains(ChangeFlag::SYNC_AGREEMENT)
293 && cand
294 .iter()
295 .chain(pre_cand.iter().map(|e| e.as_ref()))
296 .any(|e| e.attribute_equality(Attribute::Class, &EntryClass::SyncAccount.into()))
297 {
298 self.changed_flags.insert(ChangeFlag::SYNC_AGREEMENT)
299 }
300
301 if !self.changed_flags.contains(ChangeFlag::KEY_MATERIAL)
302 && cand
303 .iter()
304 .chain(pre_cand.iter().map(|e| e.as_ref()))
305 .any(|e| {
306 e.attribute_equality(Attribute::Class, &EntryClass::KeyProvider.into())
307 || e.attribute_equality(Attribute::Class, &EntryClass::KeyObject.into())
308 })
309 {
310 self.changed_flags.insert(ChangeFlag::KEY_MATERIAL)
311 }
312
313 trace!(
314 changed = ?self.changed_flags.iter_names().collect::<Vec<_>>(),
315 );
316
317 Ok(true)
318 }
319
320 pub fn consumer_apply_changes(
321 &mut self,
322 ctx: ReplIncrementalContext,
323 ) -> Result<ConsumerState, OperationError> {
324 match ctx {
325 ReplIncrementalContext::DomainMismatch => {
326 error!("Unable to proceed with consumer incremental - the supplier has indicated that our domain_uuid's are not equivalent. This can occur when adding a new consumer to an existing topology.");
327 error!("This server's content must be refreshed to proceed. If you have configured automatic refresh, this will occur shortly.");
328 Ok(ConsumerState::RefreshRequired)
329 }
330 ReplIncrementalContext::NoChangesAvailable => {
331 debug!("no changes are available");
332 Ok(ConsumerState::Ok)
333 }
334 ReplIncrementalContext::RefreshRequired => {
335 error!("Unable to proceed with consumer incremental - the supplier has indicated that our RUV is outdated, and replication would introduce data corruption.");
336 error!("This server's content must be refreshed to proceed. If you have configured automatic refresh, this will occur shortly.");
337 Ok(ConsumerState::RefreshRequired)
338 }
339 ReplIncrementalContext::UnwillingToSupply => {
340 warn!("Unable to proceed with consumer incremental - the supplier has indicated that our RUV is ahead, and replication would introduce data corruption.");
341 error!("This supplier's content must be refreshed to proceed. If you have configured automatic refresh, this will occur shortly.");
342 Ok(ConsumerState::Ok)
343 }
344 ReplIncrementalContext::V1 {
345 domain_version,
346 domain_patch_level,
347 domain_uuid,
348 ranges,
349 schema_entries,
350 meta_entries,
351 entries,
352 } => self.consumer_apply_changes_v1(
353 domain_version,
354 domain_patch_level,
355 domain_uuid,
356 &ranges,
357 schema_entries,
358 meta_entries,
359 entries,
360 ),
361 }
362 }
363
364 #[instrument(level = "debug", skip_all)]
365 fn consumer_apply_changes_v1(
366 &mut self,
367 ctx_domain_version: DomainVersion,
368 ctx_domain_patch_level: u32,
369 ctx_domain_uuid: Uuid,
370 ctx_ranges: &BTreeMap<Uuid, ReplAnchoredCidRange>,
371 ctx_schema_entries: Vec<ReplIncrementalEntryV1>,
372 ctx_meta_entries: Vec<ReplIncrementalEntryV1>,
373 ctx_entries: Vec<ReplIncrementalEntryV1>,
374 ) -> Result<ConsumerState, OperationError> {
375 if ctx_domain_version < DOMAIN_MINIMUM_REPLICATION_LEVEL {
376 error!("Unable to proceed with consumer incremental - incoming domain level is lower than our minimum supported level. {} < {}", ctx_domain_version, DOMAIN_MINIMUM_REPLICATION_LEVEL);
377 return Err(OperationError::ReplDomainLevelUnsatisfiable);
378 } else if ctx_domain_version > DOMAIN_MAXIMUM_REPLICATION_LEVEL {
379 error!("Unable to proceed with consumer incremental - incoming domain level is greater than our maximum supported level. {} > {}", ctx_domain_version, DOMAIN_MAXIMUM_REPLICATION_LEVEL);
380 return Err(OperationError::ReplDomainLevelUnsatisfiable);
381 };
382
383 let domain_patch_level = if self.get_domain_development_taint() {
384 u32::MAX
385 } else {
386 self.get_domain_patch_level()
387 };
388
389 if ctx_domain_patch_level != domain_patch_level {
390 error!("Unable to proceed with consumer incremental - incoming domain patch level is not equal to our patch level. {} != {}", ctx_domain_patch_level, domain_patch_level);
391 return Err(OperationError::ReplDomainLevelUnsatisfiable);
392 };
393
394 let db_uuid = self.be_txn.get_db_d_uuid()?;
396
397 if db_uuid != ctx_domain_uuid {
398 error!("Unable to proceed with consumer incremental - incoming domain uuid does not match our database uuid. You must investigate this situation. {:?} != {:?}", db_uuid, ctx_domain_uuid);
399 return Err(OperationError::ReplDomainUuidMismatch);
400 }
401
402 let txn_cid = self.get_cid().clone();
404 let ruv = self.be_txn.get_ruv_write();
405
406 let change_count = ctx_schema_entries.len() + ctx_meta_entries.len() + ctx_entries.len();
407
408 ruv.incremental_preflight_validate_ruv(ctx_ranges, &txn_cid)
409 .inspect_err(|err| {
410 error!(
411 ?err,
412 "Incoming RUV failed preflight checks, unable to proceed."
413 );
414 })?;
415
416 debug!(
418 "Proceeding to apply incremental with {change_count} changes from domain {ctx_domain_uuid:?} at level {ctx_domain_version}"
419 );
420
421 debug!(?ctx_ranges);
422
423 if ctx_domain_version < DOMAIN_LEVEL_1_11 {
424 debug!("Applying {} schema entries", ctx_schema_entries.len());
427 let schema_changed = self
429 .consumer_incremental_apply_entries(ctx_schema_entries)
430 .inspect_err(|err| {
431 error!(?err, "Failed to apply incremental schema entries");
432 })?;
433
434 if schema_changed {
435 self.reload_schema().inspect_err(|err| {
437 error!(?err, "Failed to reload schema");
438 })?;
439 }
440 } else {
441 self.reload_schema().inspect_err(|err| {
442 error!(?err, "Failed to reload schema");
443 })?;
444 }
445
446 debug!("Applying {} meta entries", ctx_meta_entries.len());
447 let meta_changed = self
449 .consumer_incremental_apply_entries(ctx_meta_entries)
450 .inspect_err(|err| {
451 error!(?err, "Failed to apply incremental meta entries");
452 })?;
453
454 if meta_changed {
456 self.reload_domain_info().inspect_err(|err| {
457 error!(?err, "Failed to reload domain info");
458 })?;
459 self.reload_system_config().inspect_err(|err| {
460 error!(?err, "Failed to reload system configuration");
461 })?;
462 }
463
464 debug!("Applying {} context entries", ctx_entries.len());
465 self.consumer_incremental_apply_entries(ctx_entries)
467 .inspect_err(|err| {
468 error!(?err, "Failed to apply incremental meta entries");
469 })?;
470
471 if meta_changed {
478 self.reload_domain_info_version().inspect_err(|err| {
479 error!(?err, "Failed to reload domain info version");
480 })?;
481 }
482
483 let ruv = self.be_txn.get_ruv_write();
486
487 ruv.refresh_validate_ruv(ctx_ranges).inspect_err(|err| {
488 error!(?err, "RUV ranges were not rebuilt correctly.");
489 })?;
490
491 ruv.refresh_update_ruv(ctx_ranges).inspect_err(|err| {
492 error!(?err, "Unable to update RUV with supplier ranges.");
493 })?;
494
495 Ok(ConsumerState::Ok)
496 }
497
498 pub fn consumer_apply_refresh(
499 &mut self,
500 ctx: ReplRefreshContext,
501 ) -> Result<(), OperationError> {
502 match ctx {
503 ReplRefreshContext::V1 {
504 domain_version,
505 domain_devel,
506 domain_uuid,
507 ranges,
508 schema_entries,
509 meta_entries,
510 entries,
511 } => self.consumer_apply_refresh_v1(
512 domain_version,
513 domain_devel,
514 domain_uuid,
515 &ranges,
516 schema_entries,
517 meta_entries,
518 entries,
519 ),
520 }
521 }
522
523 fn consumer_refresh_create_entries(
524 &mut self,
525 ctx_entries: Vec<ReplEntryV1>,
526 ) -> Result<(), OperationError> {
527 let candidates = ctx_entries
528 .into_iter()
529 .map(EntryRefreshNew::from_repl_entry_v1)
530 .collect::<Result<Vec<EntryRefreshNew>, _>>()
531 .inspect_err(|err| {
532 error!(?err, "Failed to convert entries from supplier");
533 })?;
534
535 Plugins::run_pre_repl_refresh(self, candidates.as_slice()).map_err(|e| {
536 admin_error!(
537 "Refresh operation failed (pre_repl_refresh plugin), {:?}",
538 e
539 );
540 e
541 })?;
542
543 let norm_cand = candidates
545 .into_iter()
546 .map(|e| {
547 e.validate(&self.schema)
548 .map_err(|e| {
549 admin_error!("Schema Violation in refresh validate {:?}", e);
550 OperationError::SchemaViolation(e)
551 })
552 .map(|e| {
553 e.seal(&self.schema)
555 })
556 })
557 .collect::<Result<Vec<EntrySealedNew>, _>>()?;
558
559 let commit_cand = self.be_txn.refresh(norm_cand).map_err(|e| {
560 admin_error!("betxn create failure {:?}", e);
561 e
562 })?;
563
564 Plugins::run_post_repl_refresh(self, &commit_cand).map_err(|e| {
565 admin_error!(
566 "Refresh operation failed (post_repl_refresh plugin), {:?}",
567 e
568 );
569 e
570 })?;
571
572 self.changed_uuid
573 .extend(commit_cand.iter().map(|e| e.get_uuid()));
574
575 Ok(())
576 }
577
578 #[instrument(level = "info", skip_all)]
579 fn consumer_apply_refresh_v1(
580 &mut self,
581 ctx_domain_version: DomainVersion,
582 ctx_domain_devel: bool,
583 ctx_domain_uuid: Uuid,
584 ctx_ranges: &BTreeMap<Uuid, ReplAnchoredCidRange>,
585 ctx_schema_entries: Vec<ReplEntryV1>,
586 ctx_meta_entries: Vec<ReplEntryV1>,
587 ctx_entries: Vec<ReplEntryV1>,
588 ) -> Result<(), OperationError> {
589 let current_devel_flag = option_env!("KANIDM_PRE_RELEASE").is_some();
592
593 if ctx_domain_version < DOMAIN_MINIMUM_REPLICATION_LEVEL {
594 error!("Unable to proceed with consumer refresh - incoming domain level is lower than our minimum supported level. {} < {}", ctx_domain_version, DOMAIN_MINIMUM_REPLICATION_LEVEL);
595 return Err(OperationError::ReplDomainLevelUnsatisfiable);
596 } else if ctx_domain_version > DOMAIN_MAXIMUM_REPLICATION_LEVEL {
597 error!("Unable to proceed with consumer refresh - incoming domain level is greater than our maximum supported level. {} > {}", ctx_domain_version, DOMAIN_MAXIMUM_REPLICATION_LEVEL);
598 return Err(OperationError::ReplDomainLevelUnsatisfiable);
599 } else if ctx_domain_devel && !current_devel_flag {
600 error!("Unable to proceed with consumer refresh - incoming domain is from a development version while this server is a stable release.");
601 return Err(OperationError::ReplDomainLevelUnsatisfiable);
602 } else if !ctx_domain_devel && current_devel_flag {
603 error!("Unable to proceed with consumer refresh - incoming domain is from a stable version while this server is a development release.");
604 return Err(OperationError::ReplDomainLevelUnsatisfiable);
605 } else {
606 debug!(
607 "Proceeding to refresh from domain at level {}",
608 ctx_domain_version
609 );
610 };
611
612 self.set_phase_bootstrap();
614
615 self.be_txn
617 .set_db_d_uuid(ctx_domain_uuid)
618 .inspect_err(|err| {
619 error!(?err, "Failed to reset domain uuid");
620 })?;
621
622 self.reset_server_uuid()?;
626
627 self.be_txn
629 .danger_delete_all_db_content()
630 .inspect_err(|err| {
631 error!(?err, "Failed to clear existing server database content");
632 })?;
633
634 if ctx_domain_version < DOMAIN_LEVEL_1_11 {
639 self.schema.generate_in_memory().inspect_err(|err| {
640 error!(?err, "Failed to reset in memory schema to clean state");
641 })?;
642
643 self.reindex(false).inspect_err(|err| {
645 error!(?err, "Failed to reload schema");
646 })?;
647
648 self.consumer_refresh_create_entries(ctx_schema_entries)
651 .inspect_err(|err| {
652 error!(?err, "Failed to refresh schema entries");
653 })?;
654 }
655
656 self.reload_schema().inspect_err(|err| {
659 error!(?err, "Failed to reload schema");
660 })?;
661
662 self.set_phase(ServerPhase::SchemaReady);
664
665 self.reindex(false).inspect_err(|err| {
668 error!(?err, "Failed to reload schema");
669 })?;
670
671 self.consumer_refresh_create_entries(ctx_meta_entries)
673 .inspect_err(|err| {
674 error!(?err, "Failed to refresh meta entries");
675 })?;
676
677 self.reload_domain_info().inspect_err(|err| {
681 error!(?err, "Failed to reload domain info");
682 })?;
683
684 self.changed_flags.insert(
686 ChangeFlag::SCHEMA
687 | ChangeFlag::ACP
688 | ChangeFlag::OAUTH2
689 | ChangeFlag::OAUTH2_CLIENT
690 | ChangeFlag::FEATURE
691 | ChangeFlag::DOMAIN
692 | ChangeFlag::APPLICATION
693 | ChangeFlag::SYSTEM_CONFIG
694 | ChangeFlag::SYNC_AGREEMENT
695 | ChangeFlag::KEY_MATERIAL,
696 );
697
698 self.set_phase(ServerPhase::DomainInfoReady);
700
701 self.consumer_refresh_create_entries(ctx_entries)
705 .inspect_err(|err| {
706 error!(?err, "Failed to refresh main db entries");
707 })?;
708
709 let ruv = self.be_txn.get_ruv_write();
712
713 ruv.refresh_validate_ruv(ctx_ranges).inspect_err(|err| {
714 error!(?err, "RUV ranges were not rebuilt correctly.");
715 })?;
716
717 ruv.refresh_update_ruv(ctx_ranges).inspect_err(|err| {
718 error!(?err, "Unable to update RUV with supplier ranges.");
719 })?;
720
721 self.set_phase(ServerPhase::Running);
723
724 Ok(())
725 }
726}