1use super::proto::*;
2use crate::plugins::Plugins;
3use crate::prelude::*;
4use crate::server::{ChangeFlag, ServerPhase};
5use std::collections::{BTreeMap, BTreeSet};
6use std::sync::Arc;
7
8impl QueryServerWriteTransaction<'_> {
9 fn consumer_incremental_apply_entries(
12 &mut self,
13 ctx_entries: Vec<ReplIncrementalEntryV1>,
14 ) -> Result<bool, OperationError> {
15 if ctx_entries.is_empty() {
19 debug!("No entries to act upon");
20 return Ok(false);
21 }
22
23 let ctx_entries: Vec<_> = ctx_entries.into_iter().map(
34 EntryIncrementalNew::rehydrate
35 )
36 .collect::<Result<Vec<_>, _>>()
37 .map_err(|e| {
38 error!(err = ?e, "Unable to process replication incremental entries to valid entry states for replication");
39 e
40 })?;
41
42 trace!(?ctx_entries);
43
44 let db_entries = self
45 .be_txn
46 .incremental_prepare(&ctx_entries)
47 .inspect_err(|err| {
48 error!(?err, "Failed to access entries from db");
49 })?;
50
51 trace!(?db_entries);
52
53 let (conflicts, proceed): (Vec<_>, Vec<_>) = ctx_entries
60 .iter()
61 .zip(db_entries)
62 .partition(|(ctx_ent, db_ent)| ctx_ent.is_add_conflict(db_ent.as_ref()));
63
64 debug!(conflicts = %conflicts.len(), proceed = %proceed.len());
65
66 let (conflict_create, conflict_update): (
72 Vec<Option<EntrySealedNew>>,
73 Vec<(EntryIncrementalCommitted, Arc<EntrySealedCommitted>)>,
74 ) = conflicts
75 .into_iter()
76 .map(
77 |(ctx_ent, db_ent): (&EntryIncrementalNew, Arc<EntrySealedCommitted>)| {
78 let (opt_create, ent) =
79 ctx_ent.resolve_add_conflict(self.get_cid(), db_ent.as_ref());
80 (opt_create, (ent, db_ent))
81 },
82 )
83 .unzip();
84
85 let mut conflict_uuids: BTreeSet<_> = conflict_update
106 .iter()
107 .filter_map(|(_, e)| {
108 let u = e.get_uuid();
109 if u >= DYNAMIC_RANGE_MINIMUM_UUID {
110 Some(u)
112 } else {
113 None
115 }
116 })
117 .collect();
118
119 let conflict_create: Vec<EntrySealedNew> = conflict_create.into_iter().flatten().collect();
121
122 let proceed_update: Vec<(EntryIncrementalCommitted, Arc<EntrySealedCommitted>)> = proceed
123 .into_iter()
124 .map(|(ctx_ent, db_ent)| {
125 let merge_ent = ctx_ent.merge_state(db_ent.as_ref(), &self.schema, self.trim_cid());
130 (merge_ent, db_ent)
131 })
132 .collect();
133
134 let mut all_updates = conflict_update
139 .into_iter()
140 .chain(proceed_update)
141 .collect::<Vec<_>>();
142
143 Plugins::run_pre_repl_incremental(self, all_updates.as_mut_slice()).map_err(|e| {
155 admin_error!("Operation failed (pre_repl_incremental plugin), {:?}", e);
156 e
157 })?;
158
159 let all_updates_valid = all_updates
163 .into_iter()
164 .map(|(ctx_ent, db_ent)| {
165 let sealed_ent = ctx_ent.validate_repl(&self.schema).seal(&self.schema);
172 (sealed_ent, db_ent)
173 })
174 .collect::<Vec<_>>();
175
176 self.be_txn
188 .incremental_apply(&all_updates_valid, conflict_create)
189 .map_err(|e| {
190 admin_error!("betxn create failure {:?}", e);
191 e
192 })?;
193
194 Plugins::run_post_repl_incremental_conflict(
195 self,
196 all_updates_valid.as_slice(),
197 &mut conflict_uuids,
198 )
199 .map_err(|e| {
200 error!(
201 "Operation failed (post_repl_incremental_conflict plugin), {:?}",
202 e
203 );
204 e
205 })?;
206
207 let (cand, pre_cand): (Vec<_>, Vec<_>) = all_updates_valid
210 .into_iter()
211 .unzip();
224
225 Plugins::run_post_repl_incremental(
231 self,
232 pre_cand.as_slice(),
233 cand.as_slice(),
234 &conflict_uuids,
235 )
236 .map_err(|e| {
237 error!("Operation failed (post_repl_incremental plugin), {:?}", e);
238 e
239 })?;
240
241 self.changed_uuid.extend(cand.iter().map(|e| e.get_uuid()));
242
243 if !self.changed_flags.contains(ChangeFlag::ACP)
244 && cand
245 .iter()
246 .chain(pre_cand.iter().map(|e| e.as_ref()))
247 .any(|e| {
248 e.attribute_equality(Attribute::Class, &EntryClass::AccessControlProfile.into())
249 })
250 {
251 self.changed_flags.insert(ChangeFlag::ACP)
252 }
253
254 if !self.changed_flags.contains(ChangeFlag::OAUTH2)
255 && cand
256 .iter()
257 .chain(pre_cand.iter().map(|e| e.as_ref()))
258 .any(|e| {
259 e.attribute_equality(Attribute::Class, &EntryClass::OAuth2ResourceServer.into())
260 })
261 {
262 self.changed_flags.insert(ChangeFlag::OAUTH2)
263 }
264
265 if !self.changed_flags.contains(ChangeFlag::APPLICATION)
266 && cand
267 .iter()
268 .chain(pre_cand.iter().map(|e| e.as_ref()))
269 .any(|e| e.attribute_equality(Attribute::Class, &EntryClass::Application.into()))
270 {
271 self.changed_flags.insert(ChangeFlag::APPLICATION)
272 }
273
274 if !self.changed_flags.contains(ChangeFlag::SYNC_AGREEMENT)
275 && cand
276 .iter()
277 .chain(pre_cand.iter().map(|e| e.as_ref()))
278 .any(|e| e.attribute_equality(Attribute::Class, &EntryClass::SyncAccount.into()))
279 {
280 self.changed_flags.insert(ChangeFlag::SYNC_AGREEMENT)
281 }
282
283 if !self.changed_flags.contains(ChangeFlag::KEY_MATERIAL)
284 && cand
285 .iter()
286 .chain(pre_cand.iter().map(|e| e.as_ref()))
287 .any(|e| {
288 e.attribute_equality(Attribute::Class, &EntryClass::KeyProvider.into())
289 || e.attribute_equality(Attribute::Class, &EntryClass::KeyObject.into())
290 })
291 {
292 self.changed_flags.insert(ChangeFlag::KEY_MATERIAL)
293 }
294
295 trace!(
296 changed = ?self.changed_flags.iter_names().collect::<Vec<_>>(),
297 );
298
299 Ok(true)
300 }
301
302 pub fn consumer_apply_changes(
303 &mut self,
304 ctx: ReplIncrementalContext,
305 ) -> Result<ConsumerState, OperationError> {
306 match ctx {
307 ReplIncrementalContext::DomainMismatch => {
308 error!("Unable to proceed with consumer incremental - the supplier has indicated that our domain_uuid's are not equivalent. This can occur when adding a new consumer to an existing topology.");
309 error!("This server's content must be refreshed to proceed. If you have configured automatic refresh, this will occur shortly.");
310 Ok(ConsumerState::RefreshRequired)
311 }
312 ReplIncrementalContext::NoChangesAvailable => {
313 info!("no changes are available");
314 Ok(ConsumerState::Ok)
315 }
316 ReplIncrementalContext::RefreshRequired => {
317 error!("Unable to proceed with consumer incremental - the supplier has indicated that our RUV is outdated, and replication would introduce data corruption.");
318 error!("This server's content must be refreshed to proceed. If you have configured automatic refresh, this will occur shortly.");
319 Ok(ConsumerState::RefreshRequired)
320 }
321 ReplIncrementalContext::UnwillingToSupply => {
322 warn!("Unable to proceed with consumer incremental - the supplier has indicated that our RUV is ahead, and replication would introduce data corruption.");
323 error!("This supplier's content must be refreshed to proceed. If you have configured automatic refresh, this will occur shortly.");
324 Ok(ConsumerState::Ok)
325 }
326 ReplIncrementalContext::V1 {
327 domain_version,
328 domain_patch_level,
329 domain_uuid,
330 ranges,
331 schema_entries,
332 meta_entries,
333 entries,
334 } => self.consumer_apply_changes_v1(
335 domain_version,
336 domain_patch_level,
337 domain_uuid,
338 ranges,
339 schema_entries,
340 meta_entries,
341 entries,
342 ),
343 }
344 }
345
346 #[instrument(level = "debug", skip_all)]
347 fn consumer_apply_changes_v1(
348 &mut self,
349 ctx_domain_version: DomainVersion,
350 ctx_domain_patch_level: u32,
351 ctx_domain_uuid: Uuid,
352 ctx_ranges: BTreeMap<Uuid, ReplAnchoredCidRange>,
353 ctx_schema_entries: Vec<ReplIncrementalEntryV1>,
354 ctx_meta_entries: Vec<ReplIncrementalEntryV1>,
355 ctx_entries: Vec<ReplIncrementalEntryV1>,
356 ) -> Result<ConsumerState, OperationError> {
357 if ctx_domain_version < DOMAIN_MIN_LEVEL {
358 error!("Unable to proceed with consumer incremental - incoming domain level is lower than our minimum supported level. {} < {}", ctx_domain_version, DOMAIN_MIN_LEVEL);
359 return Err(OperationError::ReplDomainLevelUnsatisfiable);
360 } else if ctx_domain_version > DOMAIN_MAX_LEVEL {
361 error!("Unable to proceed with consumer incremental - incoming domain level is greater than our maximum supported level. {} > {}", ctx_domain_version, DOMAIN_MAX_LEVEL);
362 return Err(OperationError::ReplDomainLevelUnsatisfiable);
363 };
364
365 let domain_patch_level = if self.get_domain_development_taint() {
366 u32::MAX
367 } else {
368 self.get_domain_patch_level()
369 };
370
371 if ctx_domain_patch_level != domain_patch_level {
372 error!("Unable to proceed with consumer incremental - incoming domain patch level is not equal to our patch level. {} != {}", ctx_domain_patch_level, domain_patch_level);
373 return Err(OperationError::ReplDomainLevelUnsatisfiable);
374 };
375
376 let db_uuid = self.be_txn.get_db_d_uuid()?;
378
379 if db_uuid != ctx_domain_uuid {
380 error!("Unable to proceed with consumer incremental - incoming domain uuid does not match our database uuid. You must investigate this situation. {:?} != {:?}", db_uuid, ctx_domain_uuid);
381 return Err(OperationError::ReplDomainUuidMismatch);
382 }
383
384 let txn_cid = self.get_cid().clone();
386 let ruv = self.be_txn.get_ruv_write();
387
388 ruv.incremental_preflight_validate_ruv(&ctx_ranges, &txn_cid)
389 .inspect_err(|err| {
390 error!(
391 ?err,
392 "Incoming RUV failed preflight checks, unable to proceed."
393 );
394 })?;
395
396 debug!(
398 "Proceeding to apply incremental from domain {:?} at level {}",
399 ctx_domain_uuid, ctx_domain_version
400 );
401
402 debug!(?ctx_ranges);
403
404 debug!("Applying schema entries");
405 let schema_changed = self
407 .consumer_incremental_apply_entries(ctx_schema_entries)
408 .inspect_err(|err| {
409 error!(?err, "Failed to apply incremental schema entries");
410 })?;
411
412 if schema_changed {
413 self.reload_schema().inspect_err(|err| {
415 error!(?err, "Failed to reload schema");
416 })?;
417 }
418
419 debug!("Applying meta entries");
420 let meta_changed = self
422 .consumer_incremental_apply_entries(ctx_meta_entries)
423 .inspect_err(|err| {
424 error!(?err, "Failed to apply incremental meta entries");
425 })?;
426
427 if meta_changed {
429 self.reload_domain_info().inspect_err(|err| {
430 error!(?err, "Failed to reload domain info");
431 })?;
432 self.reload_system_config().inspect_err(|err| {
433 error!(?err, "Failed to reload system configuration");
434 })?;
435 }
436
437 debug!("Applying all context entries");
438 self.consumer_incremental_apply_entries(ctx_entries)
440 .inspect_err(|err| {
441 error!(?err, "Failed to apply incremental meta entries");
442 })?;
443
444 if meta_changed {
451 self.reload_domain_info_version().inspect_err(|err| {
452 error!(?err, "Failed to reload domain info version");
453 })?;
454 }
455
456 let ruv = self.be_txn.get_ruv_write();
459
460 ruv.refresh_validate_ruv(&ctx_ranges).inspect_err(|err| {
461 error!(?err, "RUV ranges were not rebuilt correctly.");
462 })?;
463
464 ruv.refresh_update_ruv(&ctx_ranges).inspect_err(|err| {
465 error!(?err, "Unable to update RUV with supplier ranges.");
466 })?;
467
468 Ok(ConsumerState::Ok)
469 }
470
471 pub fn consumer_apply_refresh(
472 &mut self,
473 ctx: ReplRefreshContext,
474 ) -> Result<(), OperationError> {
475 match ctx {
476 ReplRefreshContext::V1 {
477 domain_version,
478 domain_devel,
479 domain_uuid,
480 ranges,
481 schema_entries,
482 meta_entries,
483 entries,
484 } => self.consumer_apply_refresh_v1(
485 domain_version,
486 domain_devel,
487 domain_uuid,
488 ranges,
489 schema_entries,
490 meta_entries,
491 entries,
492 ),
493 }
494 }
495
496 fn consumer_refresh_create_entries(
497 &mut self,
498 ctx_entries: Vec<ReplEntryV1>,
499 ) -> Result<(), OperationError> {
500 let candidates = ctx_entries
501 .into_iter()
502 .map(EntryRefreshNew::from_repl_entry_v1)
503 .collect::<Result<Vec<EntryRefreshNew>, _>>()
504 .inspect_err(|err| {
505 error!(?err, "Failed to convert entries from supplier");
506 })?;
507
508 Plugins::run_pre_repl_refresh(self, candidates.as_slice()).map_err(|e| {
509 admin_error!(
510 "Refresh operation failed (pre_repl_refresh plugin), {:?}",
511 e
512 );
513 e
514 })?;
515
516 let norm_cand = candidates
518 .into_iter()
519 .map(|e| {
520 e.validate(&self.schema)
521 .map_err(|e| {
522 admin_error!("Schema Violation in refresh validate {:?}", e);
523 OperationError::SchemaViolation(e)
524 })
525 .map(|e| {
526 e.seal(&self.schema)
528 })
529 })
530 .collect::<Result<Vec<EntrySealedNew>, _>>()?;
531
532 let commit_cand = self.be_txn.refresh(norm_cand).map_err(|e| {
533 admin_error!("betxn create failure {:?}", e);
534 e
535 })?;
536
537 Plugins::run_post_repl_refresh(self, &commit_cand).map_err(|e| {
538 admin_error!(
539 "Refresh operation failed (post_repl_refresh plugin), {:?}",
540 e
541 );
542 e
543 })?;
544
545 self.changed_uuid
546 .extend(commit_cand.iter().map(|e| e.get_uuid()));
547
548 Ok(())
549 }
550
551 #[instrument(level = "info", skip_all)]
552 fn consumer_apply_refresh_v1(
553 &mut self,
554 ctx_domain_version: DomainVersion,
555 ctx_domain_devel: bool,
556 ctx_domain_uuid: Uuid,
557 ctx_ranges: BTreeMap<Uuid, ReplAnchoredCidRange>,
558 ctx_schema_entries: Vec<ReplEntryV1>,
559 ctx_meta_entries: Vec<ReplEntryV1>,
560 ctx_entries: Vec<ReplEntryV1>,
561 ) -> Result<(), OperationError> {
562 let current_devel_flag = option_env!("KANIDM_PRE_RELEASE").is_some();
565
566 if ctx_domain_version < DOMAIN_MIN_LEVEL {
567 error!("Unable to proceed with consumer refresh - incoming domain level is lower than our minimum supported level. {} < {}", ctx_domain_version, DOMAIN_MIN_LEVEL);
568 return Err(OperationError::ReplDomainLevelUnsatisfiable);
569 } else if ctx_domain_version > DOMAIN_MAX_LEVEL {
570 error!("Unable to proceed with consumer refresh - incoming domain level is greater than our maximum supported level. {} > {}", ctx_domain_version, DOMAIN_MAX_LEVEL);
571 return Err(OperationError::ReplDomainLevelUnsatisfiable);
572 } else if ctx_domain_devel && !current_devel_flag {
573 error!("Unable to proceed with consumer refresh - incoming domain is from a development version while this server is a stable release.");
574 return Err(OperationError::ReplDomainLevelUnsatisfiable);
575 } else if !ctx_domain_devel && current_devel_flag {
576 error!("Unable to proceed with consumer refresh - incoming domain is from a stable version while this server is a development release.");
577 return Err(OperationError::ReplDomainLevelUnsatisfiable);
578 } else {
579 debug!(
580 "Proceeding to refresh from domain at level {}",
581 ctx_domain_version
582 );
583 };
584
585 self.set_phase_bootstrap();
587
588 self.be_txn
590 .set_db_d_uuid(ctx_domain_uuid)
591 .inspect_err(|err| {
592 error!(?err, "Failed to reset domain uuid");
593 })?;
594
595 self.reset_server_uuid()?;
599
600 self.be_txn
602 .danger_delete_all_db_content()
603 .inspect_err(|err| {
604 error!(?err, "Failed to clear existing server database content");
605 })?;
606
607 self.schema.generate_in_memory().inspect_err(|err| {
609 error!(?err, "Failed to reset in memory schema to clean state");
610 })?;
611
612 self.reindex(false).inspect_err(|err| {
615 error!(?err, "Failed to reload schema");
616 })?;
617
618 self.consumer_refresh_create_entries(ctx_schema_entries)
621 .inspect_err(|err| {
622 error!(?err, "Failed to refresh schema entries");
623 })?;
624
625 self.reload_schema().inspect_err(|err| {
627 error!(?err, "Failed to reload schema");
628 })?;
629
630 self.set_phase(ServerPhase::SchemaReady);
632
633 self.reindex(false).inspect_err(|err| {
636 error!(?err, "Failed to reload schema");
637 })?;
638
639 self.consumer_refresh_create_entries(ctx_meta_entries)
641 .inspect_err(|err| {
642 error!(?err, "Failed to refresh meta entries");
643 })?;
644
645 self.reload_domain_info().inspect_err(|err| {
649 error!(?err, "Failed to reload domain info");
650 })?;
651
652 self.changed_flags.insert(
654 ChangeFlag::SCHEMA
655 | ChangeFlag::ACP
656 | ChangeFlag::OAUTH2
657 | ChangeFlag::DOMAIN
658 | ChangeFlag::APPLICATION
659 | ChangeFlag::SYSTEM_CONFIG
660 | ChangeFlag::SYNC_AGREEMENT
661 | ChangeFlag::KEY_MATERIAL,
662 );
663
664 self.set_phase(ServerPhase::DomainInfoReady);
666
667 self.consumer_refresh_create_entries(ctx_entries)
671 .inspect_err(|err| {
672 error!(?err, "Failed to refresh schema entries");
673 })?;
674
675 let ruv = self.be_txn.get_ruv_write();
678
679 ruv.refresh_validate_ruv(&ctx_ranges).inspect_err(|err| {
680 error!(?err, "RUV ranges were not rebuilt correctly.");
681 })?;
682
683 ruv.refresh_update_ruv(&ctx_ranges).inspect_err(|err| {
684 error!(?err, "Unable to update RUV with supplier ranges.");
685 })?;
686
687 self.set_phase(ServerPhase::Running);
689
690 Ok(())
691 }
692}