1use super::proto::*;
2use crate::plugins::Plugins;
3use crate::prelude::*;
4use crate::server::{ChangeFlag, ServerPhase};
5use std::collections::{BTreeMap, BTreeSet};
6use std::sync::Arc;
7
8impl QueryServerWriteTransaction<'_> {
9 fn consumer_incremental_apply_entries(
12 &mut self,
13 ctx_entries: Vec<ReplIncrementalEntryV1>,
14 ) -> Result<bool, OperationError> {
15 if ctx_entries.is_empty() {
19 debug!("No entries to act upon");
20 return Ok(false);
21 }
22
23 let ctx_entries: Vec<_> = ctx_entries.into_iter().map(
34 EntryIncrementalNew::rehydrate
35 )
36 .collect::<Result<Vec<_>, _>>()
37 .map_err(|e| {
38 error!(err = ?e, "Unable to process replication incremental entries to valid entry states for replication");
39 e
40 })?;
41
42 trace!(?ctx_entries);
43
44 let db_entries = self
45 .be_txn
46 .incremental_prepare(&ctx_entries)
47 .inspect_err(|err| {
48 error!(?err, "Failed to access entries from db");
49 })?;
50
51 trace!(?db_entries);
52
53 let (conflicts, proceed): (Vec<_>, Vec<_>) = ctx_entries
60 .iter()
61 .zip(db_entries)
62 .partition(|(ctx_ent, db_ent)| ctx_ent.is_add_conflict(db_ent.as_ref()));
63
64 debug!(conflicts = %conflicts.len(), proceed = %proceed.len());
65
66 let (conflict_create, conflict_update): (
72 Vec<Option<EntrySealedNew>>,
73 Vec<(EntryIncrementalCommitted, Arc<EntrySealedCommitted>)>,
74 ) = conflicts
75 .into_iter()
76 .map(
77 |(ctx_ent, db_ent): (&EntryIncrementalNew, Arc<EntrySealedCommitted>)| {
78 let (opt_create, ent) =
79 ctx_ent.resolve_add_conflict(self.get_cid(), db_ent.as_ref());
80 (opt_create, (ent, db_ent))
81 },
82 )
83 .unzip();
84
85 let mut conflict_uuids: BTreeSet<_> = conflict_update
106 .iter()
107 .filter_map(|(_, e)| {
108 let u = e.get_uuid();
109 if u >= DYNAMIC_RANGE_MINIMUM_UUID {
110 Some(u)
112 } else {
113 None
115 }
116 })
117 .collect();
118
119 let conflict_create: Vec<EntrySealedNew> = conflict_create.into_iter().flatten().collect();
121
122 let proceed_update: Vec<(EntryIncrementalCommitted, Arc<EntrySealedCommitted>)> = proceed
123 .into_iter()
124 .map(|(ctx_ent, db_ent)| {
125 let merge_ent = ctx_ent.merge_state(db_ent.as_ref(), &self.schema, self.trim_cid());
130 (merge_ent, db_ent)
131 })
132 .collect();
133
134 let mut all_updates = conflict_update
139 .into_iter()
140 .chain(proceed_update)
141 .collect::<Vec<_>>();
142
143 Plugins::run_pre_repl_incremental(self, all_updates.as_mut_slice()).map_err(|e| {
155 admin_error!("Operation failed (pre_repl_incremental plugin), {:?}", e);
156 e
157 })?;
158
159 let all_updates_valid = all_updates
163 .into_iter()
164 .map(|(ctx_ent, db_ent)| {
165 let sealed_ent = ctx_ent.validate_repl(&self.schema).seal(&self.schema);
172 (sealed_ent, db_ent)
173 })
174 .collect::<Vec<_>>();
175
176 self.be_txn
188 .incremental_apply(&all_updates_valid, conflict_create)
189 .map_err(|e| {
190 admin_error!("betxn create failure {:?}", e);
191 e
192 })?;
193
194 Plugins::run_post_repl_incremental_conflict(
195 self,
196 all_updates_valid.as_slice(),
197 &mut conflict_uuids,
198 )
199 .map_err(|e| {
200 error!(
201 "Operation failed (post_repl_incremental_conflict plugin), {:?}",
202 e
203 );
204 e
205 })?;
206
207 let (cand, pre_cand): (Vec<_>, Vec<_>) = all_updates_valid
210 .into_iter()
211 .unzip();
224
225 Plugins::run_post_repl_incremental(
231 self,
232 pre_cand.as_slice(),
233 cand.as_slice(),
234 &conflict_uuids,
235 )
236 .map_err(|e| {
237 error!("Operation failed (post_repl_incremental plugin), {:?}", e);
238 e
239 })?;
240
241 self.changed_uuid.extend(cand.iter().map(|e| e.get_uuid()));
242
243 if !self.changed_flags.contains(ChangeFlag::ACP)
244 && cand
245 .iter()
246 .chain(pre_cand.iter().map(|e| e.as_ref()))
247 .any(|e| {
248 e.attribute_equality(Attribute::Class, &EntryClass::AccessControlProfile.into())
249 })
250 {
251 self.changed_flags.insert(ChangeFlag::ACP)
252 }
253
254 if !self.changed_flags.contains(ChangeFlag::OAUTH2)
255 && cand
256 .iter()
257 .chain(pre_cand.iter().map(|e| e.as_ref()))
258 .any(|e| {
259 e.attribute_equality(Attribute::Class, &EntryClass::OAuth2ResourceServer.into())
260 })
261 {
262 self.changed_flags.insert(ChangeFlag::OAUTH2)
263 }
264
265 if !self.changed_flags.contains(ChangeFlag::APPLICATION)
266 && cand
267 .iter()
268 .chain(pre_cand.iter().map(|e| e.as_ref()))
269 .any(|e| e.attribute_equality(Attribute::Class, &EntryClass::Application.into()))
270 {
271 self.changed_flags.insert(ChangeFlag::APPLICATION)
272 }
273
274 if !self.changed_flags.contains(ChangeFlag::SYNC_AGREEMENT)
275 && cand
276 .iter()
277 .chain(pre_cand.iter().map(|e| e.as_ref()))
278 .any(|e| e.attribute_equality(Attribute::Class, &EntryClass::SyncAccount.into()))
279 {
280 self.changed_flags.insert(ChangeFlag::SYNC_AGREEMENT)
281 }
282
283 if !self.changed_flags.contains(ChangeFlag::KEY_MATERIAL)
284 && cand
285 .iter()
286 .chain(pre_cand.iter().map(|e| e.as_ref()))
287 .any(|e| {
288 e.attribute_equality(Attribute::Class, &EntryClass::KeyProvider.into())
289 || e.attribute_equality(Attribute::Class, &EntryClass::KeyObject.into())
290 })
291 {
292 self.changed_flags.insert(ChangeFlag::KEY_MATERIAL)
293 }
294
295 trace!(
296 changed = ?self.changed_flags.iter_names().collect::<Vec<_>>(),
297 );
298
299 Ok(true)
300 }
301
302 pub fn consumer_apply_changes(
303 &mut self,
304 ctx: ReplIncrementalContext,
305 ) -> Result<ConsumerState, OperationError> {
306 match ctx {
307 ReplIncrementalContext::DomainMismatch => {
308 error!("Unable to proceed with consumer incremental - the supplier has indicated that our domain_uuid's are not equivalent. This can occur when adding a new consumer to an existing topology.");
309 error!("This server's content must be refreshed to proceed. If you have configured automatic refresh, this will occur shortly.");
310 Ok(ConsumerState::RefreshRequired)
311 }
312 ReplIncrementalContext::NoChangesAvailable => {
313 debug!("no changes are available");
314 Ok(ConsumerState::Ok)
315 }
316 ReplIncrementalContext::RefreshRequired => {
317 error!("Unable to proceed with consumer incremental - the supplier has indicated that our RUV is outdated, and replication would introduce data corruption.");
318 error!("This server's content must be refreshed to proceed. If you have configured automatic refresh, this will occur shortly.");
319 Ok(ConsumerState::RefreshRequired)
320 }
321 ReplIncrementalContext::UnwillingToSupply => {
322 warn!("Unable to proceed with consumer incremental - the supplier has indicated that our RUV is ahead, and replication would introduce data corruption.");
323 error!("This supplier's content must be refreshed to proceed. If you have configured automatic refresh, this will occur shortly.");
324 Ok(ConsumerState::Ok)
325 }
326 ReplIncrementalContext::V1 {
327 domain_version,
328 domain_patch_level,
329 domain_uuid,
330 ranges,
331 schema_entries,
332 meta_entries,
333 entries,
334 } => self.consumer_apply_changes_v1(
335 domain_version,
336 domain_patch_level,
337 domain_uuid,
338 &ranges,
339 schema_entries,
340 meta_entries,
341 entries,
342 ),
343 }
344 }
345
346 #[instrument(level = "debug", skip_all)]
347 fn consumer_apply_changes_v1(
348 &mut self,
349 ctx_domain_version: DomainVersion,
350 ctx_domain_patch_level: u32,
351 ctx_domain_uuid: Uuid,
352 ctx_ranges: &BTreeMap<Uuid, ReplAnchoredCidRange>,
353 ctx_schema_entries: Vec<ReplIncrementalEntryV1>,
354 ctx_meta_entries: Vec<ReplIncrementalEntryV1>,
355 ctx_entries: Vec<ReplIncrementalEntryV1>,
356 ) -> Result<ConsumerState, OperationError> {
357 if ctx_domain_version < DOMAIN_MIN_LEVEL {
358 error!("Unable to proceed with consumer incremental - incoming domain level is lower than our minimum supported level. {} < {}", ctx_domain_version, DOMAIN_MIN_LEVEL);
359 return Err(OperationError::ReplDomainLevelUnsatisfiable);
360 } else if ctx_domain_version > DOMAIN_MAX_LEVEL {
361 error!("Unable to proceed with consumer incremental - incoming domain level is greater than our maximum supported level. {} > {}", ctx_domain_version, DOMAIN_MAX_LEVEL);
362 return Err(OperationError::ReplDomainLevelUnsatisfiable);
363 };
364
365 let domain_patch_level = if self.get_domain_development_taint() {
366 u32::MAX
367 } else {
368 self.get_domain_patch_level()
369 };
370
371 if ctx_domain_patch_level != domain_patch_level {
372 error!("Unable to proceed with consumer incremental - incoming domain patch level is not equal to our patch level. {} != {}", ctx_domain_patch_level, domain_patch_level);
373 return Err(OperationError::ReplDomainLevelUnsatisfiable);
374 };
375
376 let db_uuid = self.be_txn.get_db_d_uuid()?;
378
379 if db_uuid != ctx_domain_uuid {
380 error!("Unable to proceed with consumer incremental - incoming domain uuid does not match our database uuid. You must investigate this situation. {:?} != {:?}", db_uuid, ctx_domain_uuid);
381 return Err(OperationError::ReplDomainUuidMismatch);
382 }
383
384 let txn_cid = self.get_cid().clone();
386 let ruv = self.be_txn.get_ruv_write();
387
388 let change_count = ctx_schema_entries.len() + ctx_meta_entries.len() + ctx_entries.len();
389
390 ruv.incremental_preflight_validate_ruv(ctx_ranges, &txn_cid)
391 .inspect_err(|err| {
392 error!(
393 ?err,
394 "Incoming RUV failed preflight checks, unable to proceed."
395 );
396 })?;
397
398 debug!(
400 "Proceeding to apply incremental with {change_count} changes from domain {ctx_domain_uuid:?} at level {ctx_domain_version}"
401 );
402
403 debug!(?ctx_ranges);
404
405 debug!("Applying {} schema entries", ctx_schema_entries.len());
406 let schema_changed = self
408 .consumer_incremental_apply_entries(ctx_schema_entries)
409 .inspect_err(|err| {
410 error!(?err, "Failed to apply incremental schema entries");
411 })?;
412
413 if schema_changed {
414 self.reload_schema().inspect_err(|err| {
416 error!(?err, "Failed to reload schema");
417 })?;
418 }
419
420 debug!("Applying {} meta entries", ctx_meta_entries.len());
421 let meta_changed = self
423 .consumer_incremental_apply_entries(ctx_meta_entries)
424 .inspect_err(|err| {
425 error!(?err, "Failed to apply incremental meta entries");
426 })?;
427
428 if meta_changed {
430 self.reload_domain_info().inspect_err(|err| {
431 error!(?err, "Failed to reload domain info");
432 })?;
433 self.reload_system_config().inspect_err(|err| {
434 error!(?err, "Failed to reload system configuration");
435 })?;
436 }
437
438 debug!("Applying {} context entries", ctx_entries.len());
439 self.consumer_incremental_apply_entries(ctx_entries)
441 .inspect_err(|err| {
442 error!(?err, "Failed to apply incremental meta entries");
443 })?;
444
445 if meta_changed {
452 self.reload_domain_info_version().inspect_err(|err| {
453 error!(?err, "Failed to reload domain info version");
454 })?;
455 }
456
457 let ruv = self.be_txn.get_ruv_write();
460
461 ruv.refresh_validate_ruv(ctx_ranges).inspect_err(|err| {
462 error!(?err, "RUV ranges were not rebuilt correctly.");
463 })?;
464
465 ruv.refresh_update_ruv(ctx_ranges).inspect_err(|err| {
466 error!(?err, "Unable to update RUV with supplier ranges.");
467 })?;
468
469 Ok(ConsumerState::Ok)
470 }
471
472 pub fn consumer_apply_refresh(
473 &mut self,
474 ctx: ReplRefreshContext,
475 ) -> Result<(), OperationError> {
476 match ctx {
477 ReplRefreshContext::V1 {
478 domain_version,
479 domain_devel,
480 domain_uuid,
481 ranges,
482 schema_entries,
483 meta_entries,
484 entries,
485 } => self.consumer_apply_refresh_v1(
486 domain_version,
487 domain_devel,
488 domain_uuid,
489 &ranges,
490 schema_entries,
491 meta_entries,
492 entries,
493 ),
494 }
495 }
496
497 fn consumer_refresh_create_entries(
498 &mut self,
499 ctx_entries: Vec<ReplEntryV1>,
500 ) -> Result<(), OperationError> {
501 let candidates = ctx_entries
502 .into_iter()
503 .map(EntryRefreshNew::from_repl_entry_v1)
504 .collect::<Result<Vec<EntryRefreshNew>, _>>()
505 .inspect_err(|err| {
506 error!(?err, "Failed to convert entries from supplier");
507 })?;
508
509 Plugins::run_pre_repl_refresh(self, candidates.as_slice()).map_err(|e| {
510 admin_error!(
511 "Refresh operation failed (pre_repl_refresh plugin), {:?}",
512 e
513 );
514 e
515 })?;
516
517 let norm_cand = candidates
519 .into_iter()
520 .map(|e| {
521 e.validate(&self.schema)
522 .map_err(|e| {
523 admin_error!("Schema Violation in refresh validate {:?}", e);
524 OperationError::SchemaViolation(e)
525 })
526 .map(|e| {
527 e.seal(&self.schema)
529 })
530 })
531 .collect::<Result<Vec<EntrySealedNew>, _>>()?;
532
533 let commit_cand = self.be_txn.refresh(norm_cand).map_err(|e| {
534 admin_error!("betxn create failure {:?}", e);
535 e
536 })?;
537
538 Plugins::run_post_repl_refresh(self, &commit_cand).map_err(|e| {
539 admin_error!(
540 "Refresh operation failed (post_repl_refresh plugin), {:?}",
541 e
542 );
543 e
544 })?;
545
546 self.changed_uuid
547 .extend(commit_cand.iter().map(|e| e.get_uuid()));
548
549 Ok(())
550 }
551
552 #[instrument(level = "info", skip_all)]
553 fn consumer_apply_refresh_v1(
554 &mut self,
555 ctx_domain_version: DomainVersion,
556 ctx_domain_devel: bool,
557 ctx_domain_uuid: Uuid,
558 ctx_ranges: &BTreeMap<Uuid, ReplAnchoredCidRange>,
559 ctx_schema_entries: Vec<ReplEntryV1>,
560 ctx_meta_entries: Vec<ReplEntryV1>,
561 ctx_entries: Vec<ReplEntryV1>,
562 ) -> Result<(), OperationError> {
563 let current_devel_flag = option_env!("KANIDM_PRE_RELEASE").is_some();
566
567 if ctx_domain_version < DOMAIN_MIN_LEVEL {
568 error!("Unable to proceed with consumer refresh - incoming domain level is lower than our minimum supported level. {} < {}", ctx_domain_version, DOMAIN_MIN_LEVEL);
569 return Err(OperationError::ReplDomainLevelUnsatisfiable);
570 } else if ctx_domain_version > DOMAIN_MAX_LEVEL {
571 error!("Unable to proceed with consumer refresh - incoming domain level is greater than our maximum supported level. {} > {}", ctx_domain_version, DOMAIN_MAX_LEVEL);
572 return Err(OperationError::ReplDomainLevelUnsatisfiable);
573 } else if ctx_domain_devel && !current_devel_flag {
574 error!("Unable to proceed with consumer refresh - incoming domain is from a development version while this server is a stable release.");
575 return Err(OperationError::ReplDomainLevelUnsatisfiable);
576 } else if !ctx_domain_devel && current_devel_flag {
577 error!("Unable to proceed with consumer refresh - incoming domain is from a stable version while this server is a development release.");
578 return Err(OperationError::ReplDomainLevelUnsatisfiable);
579 } else {
580 debug!(
581 "Proceeding to refresh from domain at level {}",
582 ctx_domain_version
583 );
584 };
585
586 self.set_phase_bootstrap();
588
589 self.be_txn
591 .set_db_d_uuid(ctx_domain_uuid)
592 .inspect_err(|err| {
593 error!(?err, "Failed to reset domain uuid");
594 })?;
595
596 self.reset_server_uuid()?;
600
601 self.be_txn
603 .danger_delete_all_db_content()
604 .inspect_err(|err| {
605 error!(?err, "Failed to clear existing server database content");
606 })?;
607
608 self.schema.generate_in_memory().inspect_err(|err| {
610 error!(?err, "Failed to reset in memory schema to clean state");
611 })?;
612
613 self.reindex(false).inspect_err(|err| {
616 error!(?err, "Failed to reload schema");
617 })?;
618
619 self.consumer_refresh_create_entries(ctx_schema_entries)
622 .inspect_err(|err| {
623 error!(?err, "Failed to refresh schema entries");
624 })?;
625
626 self.reload_schema().inspect_err(|err| {
628 error!(?err, "Failed to reload schema");
629 })?;
630
631 self.set_phase(ServerPhase::SchemaReady);
633
634 self.reindex(false).inspect_err(|err| {
637 error!(?err, "Failed to reload schema");
638 })?;
639
640 self.consumer_refresh_create_entries(ctx_meta_entries)
642 .inspect_err(|err| {
643 error!(?err, "Failed to refresh meta entries");
644 })?;
645
646 self.reload_domain_info().inspect_err(|err| {
650 error!(?err, "Failed to reload domain info");
651 })?;
652
653 self.changed_flags.insert(
655 ChangeFlag::SCHEMA
656 | ChangeFlag::ACP
657 | ChangeFlag::OAUTH2
658 | ChangeFlag::DOMAIN
659 | ChangeFlag::APPLICATION
660 | ChangeFlag::SYSTEM_CONFIG
661 | ChangeFlag::SYNC_AGREEMENT
662 | ChangeFlag::KEY_MATERIAL,
663 );
664
665 self.set_phase(ServerPhase::DomainInfoReady);
667
668 self.consumer_refresh_create_entries(ctx_entries)
672 .inspect_err(|err| {
673 error!(?err, "Failed to refresh schema entries");
674 })?;
675
676 let ruv = self.be_txn.get_ruv_write();
679
680 ruv.refresh_validate_ruv(ctx_ranges).inspect_err(|err| {
681 error!(?err, "RUV ranges were not rebuilt correctly.");
682 })?;
683
684 ruv.refresh_update_ruv(ctx_ranges).inspect_err(|err| {
685 error!(?err, "Unable to update RUV with supplier ranges.");
686 })?;
687
688 self.set_phase(ServerPhase::Running);
690
691 Ok(())
692 }
693}