use super::proto::*;
use crate::plugins::Plugins;
use crate::prelude::*;
use crate::server::ChangeFlag;
use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;

impl<'a> QueryServerWriteTransaction<'a> {
    // Apply the state changes if they are valid.

    fn consumer_incremental_apply_entries(
        &mut self,
        ctx_entries: Vec<ReplIncrementalEntryV1>,
    ) -> Result<bool, OperationError> {
        // trace!(?ctx_entries);

        // No action needed for this if the entries are empty.
        if ctx_entries.is_empty() {
            debug!("No entries to act upon");
            return Ok(false);
        }

        /*
         *  Incremental is very similar to modify in how we have to treat the entries
         *  with a pre and post state. However, we need an incremental prepare so that
         *  when new entries are provided to us we can merge them to a stub and then
         *  commit it correctly. This takes an extra backend interface that prepares
         *  the entry stubs for us.
         */

        // We need to rehydrate all the replication content into partial entries so
        // that all the types are consistent and ready.
        let ctx_entries: Vec<_> = ctx_entries.into_iter().map(
            EntryIncrementalNew::rehydrate
        )
        .collect::<Result<Vec<_>, _>>()
        .map_err(|e| {
            error!(err = ?e, "Unable to process replication incremental entries to valid entry states for replication");
            e
        })?;

        trace!(?ctx_entries);

        let db_entries = self
            .be_txn
            .incremental_prepare(&ctx_entries)
            .inspect_err(|err| {
                error!(?err, "Failed to access entries from db");
            })?;

        trace!(?db_entries);

        // Conflicts are handled in this phase: conflicting entries are pushed to a
        // separate list where they are then created as conflict entries.

        // First, find which entries are in a conflict state.

        let (conflicts, proceed): (Vec<_>, Vec<_>) = ctx_entries
            .iter()
            .zip(db_entries)
            .partition(|(ctx_ent, db_ent)| ctx_ent.is_add_conflict(db_ent.as_ref()));

        debug!(conflicts = %conflicts.len(), proceed = %proceed.len());

        // Now we have a set of conflicts and a set of entries to proceed.
        //
        //    /- entries that need to be created as conflicts.
        //    |                /- entries that survive and need update to the db in place.
        //    v                v
        let (conflict_create, conflict_update): (
            Vec<Option<EntrySealedNew>>,
            Vec<(EntryIncrementalCommitted, Arc<EntrySealedCommitted>)>,
        ) = conflicts
            .into_iter()
            .map(
                |(ctx_ent, db_ent): (&EntryIncrementalNew, Arc<EntrySealedCommitted>)| {
                    let (opt_create, ent) =
                        ctx_ent.resolve_add_conflict(self.get_cid(), db_ent.as_ref());
                    (opt_create, (ent, db_ent))
                },
            )
            .unzip();

        // ⚠️  If we end up with plugins triggering other entries to conflicts, we DON'T need to
        // add them to this list. This is just for uuid conflicts, not higher level ones!
        //
        // ⚠️  We need to collect this from conflict_update since we may NOT be the originator
        // server for some conflicts, but we still need to know the UUID is IN the conflict
        // state for plugins. We also need to do this here before the conflict_update
        // set is consumed by later steps.
        //
        // ⚠️  When we upgrade between two nodes, migrations will often create *new* system
        // entries on both nodes. Until both nodes upgrade they can't replicate. This creates
        // a situation where both nodes have identical entry content for system entries, but
        // the newly created entries are conflicts. Normally this is okay, because the
        // first node to upgrade will have its entries persisted, and the other node's duplicate
        // entries will be removed. However, just through the nature of being in the conflict
        // state, these entries are then added to the conflict_uuid set. This conflict_uuid set
        // is used by referential integrity to remove uuids from references so that group
        // memberships don't accidentally leak to unintended recipients.
        //
        // To avoid this, we exclude system entries from this conflict set, so that they are
        // exempt from this conflict handling, which allows upgrades to work.
        let mut conflict_uuids: BTreeSet<_> = conflict_update
            .iter()
            .filter_map(|(_, e)| {
                let u = e.get_uuid();
                if u >= DYNAMIC_RANGE_MINIMUM_UUID {
                    // It is a user created node, process the conflict within plugins
                    Some(u)
                } else {
                    // It is in a system range, do not process this entry
                    None
                }
            })
            .collect();

        // Filter out None from conflict_create
        let conflict_create: Vec<EntrySealedNew> = conflict_create.into_iter().flatten().collect();

        let proceed_update: Vec<(EntryIncrementalCommitted, Arc<EntrySealedCommitted>)> = proceed
            .into_iter()
            .map(|(ctx_ent, db_ent)| {
                // This now is the set of entries that are able to be updated. Merge
                // their attribute sets/states per the change state rules.

                // This must create an EntryInvalidCommitted
                let merge_ent = ctx_ent.merge_state(db_ent.as_ref(), &self.schema, self.trim_cid());
                (merge_ent, db_ent)
            })
            .collect();

        // We now merge the conflict updates and the updates that can proceed. This is correct
        // since if an entry was conflicting by uuid then there is nothing for it to merge with,
        // so we can simply bypass that step. We now have all_updates, which is the set of live
        // entries to write back.
        let mut all_updates = conflict_update
            .into_iter()
            .chain(proceed_update)
            .collect::<Vec<_>>();

        // ⚠️  This hook is probably not what you want to use for checking that entries are consistent.
        //
        // The main issue is that at this point we have a set of entries that need to be
        // created / marked into conflicts, and until that occurs it's hard to proceed with
        // validations like attribute uniqueness, because we would need to walk the various sets
        // to find cases where an attribute may not be unique "currently" but *would* be unique
        // once the various entries have been conflicted and updated.
        //
        // Instead we treat this like refint - we allow the database to "temporarily" become
        // inconsistent, then we fix it immediately. This hook remains for cases in future
        // where we may wish to have session cleanup performed, for example.
        Plugins::run_pre_repl_incremental(self, all_updates.as_mut_slice()).map_err(|e| {
            admin_error!("Operation failed (pre_repl_incremental plugin), {:?}", e);
            e
        })?;

        // Now we have to schema check our entries. Note that entries may become schema
        // invalid during the merge process; in those cases they are moved to a conflict
        // state here.
        let all_updates_valid = all_updates
            .into_iter()
            .map(|(ctx_ent, db_ent)| {
                // Check the schema
                //
                // In these cases when an entry fails schema, we mark it to
                // a conflict state and then retain it in the update process.
                //
                // The marking is done INSIDE this function!
                let sealed_ent = ctx_ent.validate_repl(&self.schema).seal(&self.schema);
                (sealed_ent, db_ent)
            })
            .collect::<Vec<_>>();

        // We now have two sets!
        //
        // * conflict_create - entries to be created that are conflicted via add statements (duplicate uuid)
        //                     these are only created on the entry origin node!
        // * all_updates_valid - this contains three kinds of entries
        //   * entries that have survived a uuid conflict and need an in-place write. Unlikely to become invalid.
        //   * entries that were merged and are schema valid.
        //   * entries that were merged and whose attribute state has now become invalid, so they are conflicts.
        //
        // incremental_apply here handles both the creations and the update processes to ensure that
        // everything is updated in a single consistent operation.
        self.be_txn
            .incremental_apply(&all_updates_valid, conflict_create)
            .map_err(|e| {
                admin_error!("betxn create failure {:?}", e);
                e
            })?;

        Plugins::run_post_repl_incremental_conflict(
            self,
            all_updates_valid.as_slice(),
            &mut conflict_uuids,
        )
        .map_err(|e| {
            error!(
                "Operation failed (post_repl_incremental_conflict plugin), {:?}",
                e
            );
            e
        })?;

        // Plugins need these unzipped
        //
        let (cand, pre_cand): (Vec<_>, Vec<_>) = all_updates_valid
            .into_iter()
            // We previously excluded this to avoid doing unnecessary work on entries that
            // were moving to a conflict state, and the survivor was staying "as is" on this
            // node. However, this gets messy with dyngroups and memberof, where on a conflict
            // the memberships are deleted across the replication boundary. In these cases
            // we need dyngroups to see the valid entries, even if they are "identical to before"
            // to re-assert all their memberships are valid.
            /*
            .filter(|(cand, _)| {
                // Exclude anything that is conflicted as a result of the conflict plugins.
                !conflict_uuids.contains(&cand.get_uuid())
            })
            */
            .unzip();

        // We don't need to process conflict_create here, since those are all conflicting
        // uuids, which means the conflict_uuids are all *here* and will trigger anything
        // that requires processing anyway. Additionally, conflict_create may not be the full
        // set of conflict entries as we may not be the origin node! conflict_create is always
        // a subset of the conflicts.
        Plugins::run_post_repl_incremental(
            self,
            pre_cand.as_slice(),
            cand.as_slice(),
            &conflict_uuids,
        )
        .map_err(|e| {
            error!("Operation failed (post_repl_incremental plugin), {:?}", e);
            e
        })?;

        self.changed_uuid.extend(cand.iter().map(|e| e.get_uuid()));

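        // Raise the relevant change flags when any candidate entry (pre or post state)
        // has a class that other subsystems need to react to after this transaction.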
        if !self.changed_flags.contains(ChangeFlag::ACP)
            && cand
                .iter()
                .chain(pre_cand.iter().map(|e| e.as_ref()))
                .any(|e| {
                    e.attribute_equality(Attribute::Class, &EntryClass::AccessControlProfile.into())
                })
        {
            self.changed_flags.insert(ChangeFlag::ACP)
        }

        if !self.changed_flags.contains(ChangeFlag::OAUTH2)
            && cand
                .iter()
                .chain(pre_cand.iter().map(|e| e.as_ref()))
                .any(|e| {
                    e.attribute_equality(Attribute::Class, &EntryClass::OAuth2ResourceServer.into())
                })
        {
            self.changed_flags.insert(ChangeFlag::OAUTH2)
        }

        if !self.changed_flags.contains(ChangeFlag::APPLICATION)
            && cand
                .iter()
                .chain(pre_cand.iter().map(|e| e.as_ref()))
                .any(|e| e.attribute_equality(Attribute::Class, &EntryClass::Application.into()))
        {
            self.changed_flags.insert(ChangeFlag::APPLICATION)
        }

        if !self.changed_flags.contains(ChangeFlag::SYNC_AGREEMENT)
            && cand
                .iter()
                .chain(pre_cand.iter().map(|e| e.as_ref()))
                .any(|e| e.attribute_equality(Attribute::Class, &EntryClass::SyncAccount.into()))
        {
            self.changed_flags.insert(ChangeFlag::SYNC_AGREEMENT)
        }

        if !self.changed_flags.contains(ChangeFlag::KEY_MATERIAL)
            && cand
                .iter()
                .chain(pre_cand.iter().map(|e| e.as_ref()))
                .any(|e| {
                    e.attribute_equality(Attribute::Class, &EntryClass::KeyProvider.into())
                        || e.attribute_equality(Attribute::Class, &EntryClass::KeyObject.into())
                })
        {
            self.changed_flags.insert(ChangeFlag::KEY_MATERIAL)
        }

        trace!(
            changed = ?self.changed_flags.iter_names().collect::<Vec<_>>(),
        );

        Ok(true)
    }

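    /// Apply an incremental replication context received from a supplier. Depending on the
    /// supplier's response this either applies the provided changes, indicates that a refresh
    /// of this server is required, or makes no changes.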
    pub fn consumer_apply_changes(
        &mut self,
        ctx: ReplIncrementalContext,
    ) -> Result<ConsumerState, OperationError> {
        match ctx {
            ReplIncrementalContext::DomainMismatch => {
                error!("Unable to proceed with consumer incremental - the supplier has indicated that our domain_uuids are not equivalent. This can occur when adding a new consumer to an existing topology.");
                error!("This server's content must be refreshed to proceed. If you have configured automatic refresh, this will occur shortly.");
                Ok(ConsumerState::RefreshRequired)
            }
            ReplIncrementalContext::NoChangesAvailable => {
                info!("no changes are available");
                Ok(ConsumerState::Ok)
            }
            ReplIncrementalContext::RefreshRequired => {
                error!("Unable to proceed with consumer incremental - the supplier has indicated that our RUV is outdated, and replication would introduce data corruption.");
                error!("This server's content must be refreshed to proceed. If you have configured automatic refresh, this will occur shortly.");
                Ok(ConsumerState::RefreshRequired)
            }
            ReplIncrementalContext::UnwillingToSupply => {
                warn!("Unable to proceed with consumer incremental - the supplier has indicated that our RUV is ahead, and replication would introduce data corruption.");
                error!("This supplier's content must be refreshed to proceed. If you have configured automatic refresh, this will occur shortly.");
                Ok(ConsumerState::Ok)
            }
            ReplIncrementalContext::V1 {
                domain_version,
                domain_patch_level,
                domain_uuid,
                ranges,
                schema_entries,
                meta_entries,
                entries,
            } => self.consumer_apply_changes_v1(
                domain_version,
                domain_patch_level,
                domain_uuid,
                ranges,
                schema_entries,
                meta_entries,
                entries,
            ),
        }
    }

    #[instrument(level = "debug", skip_all)]
    fn consumer_apply_changes_v1(
        &mut self,
        ctx_domain_version: DomainVersion,
        ctx_domain_patch_level: u32,
        ctx_domain_uuid: Uuid,
        ctx_ranges: BTreeMap<Uuid, ReplAnchoredCidRange>,
        ctx_schema_entries: Vec<ReplIncrementalEntryV1>,
        ctx_meta_entries: Vec<ReplIncrementalEntryV1>,
        ctx_entries: Vec<ReplIncrementalEntryV1>,
    ) -> Result<ConsumerState, OperationError> {
        if ctx_domain_version < DOMAIN_MIN_LEVEL {
            error!("Unable to proceed with consumer incremental - incoming domain level is lower than our minimum supported level. {} < {}", ctx_domain_version, DOMAIN_MIN_LEVEL);
            return Err(OperationError::ReplDomainLevelUnsatisfiable);
        } else if ctx_domain_version > DOMAIN_MAX_LEVEL {
            error!("Unable to proceed with consumer incremental - incoming domain level is greater than our maximum supported level. {} > {}", ctx_domain_version, DOMAIN_MAX_LEVEL);
            return Err(OperationError::ReplDomainLevelUnsatisfiable);
        };

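        // Development (tainted) builds report the maximum patch level, so the patch
        // level equality check below only passes against another development build.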
        let domain_patch_level = if self.get_domain_development_taint() {
            u32::MAX
        } else {
            self.get_domain_patch_level()
        };

        if ctx_domain_patch_level != domain_patch_level {
            error!("Unable to proceed with consumer incremental - incoming domain patch level is not equal to our patch level. {} != {}", ctx_domain_patch_level, domain_patch_level);
            return Err(OperationError::ReplDomainLevelUnsatisfiable);
        };

        // Assert that the d_uuid matches the repl domain uuid.
        let db_uuid = self.be_txn.get_db_d_uuid()?;

        if db_uuid != ctx_domain_uuid {
            error!("Unable to proceed with consumer incremental - incoming domain uuid does not match our database uuid. You must investigate this situation. {:?} != {:?}", db_uuid, ctx_domain_uuid);
            return Err(OperationError::ReplDomainUuidMismatch);
        }

        // Preflight checks of the incoming RUV to ensure it's in a good state.
        let txn_cid = self.get_cid().clone();
        let ruv = self.be_txn.get_ruv_write();

        ruv.incremental_preflight_validate_ruv(&ctx_ranges, &txn_cid)
            .inspect_err(|err| {
                error!(
                    ?err,
                    "Incoming RUV failed preflight checks, unable to proceed."
                );
            })?;

        // == ⚠️  Below this point we begin to make changes! ==
        info!(
            "Proceeding to apply incremental from domain {:?} at level {}",
            ctx_domain_uuid, ctx_domain_version
        );

        debug!(?ctx_ranges);

        debug!("Applying schema entries");
        // Apply the schema entries first.
        let schema_changed = self
            .consumer_incremental_apply_entries(ctx_schema_entries)
            .inspect_err(|err| {
                error!(?err, "Failed to apply incremental schema entries");
            })?;

        if schema_changed {
            // We need to reload schema now!
            self.reload_schema().inspect_err(|err| {
                error!(?err, "Failed to reload schema");
            })?;
        }

        debug!("Applying meta entries");
        // Apply meta entries now.
        let meta_changed = self
            .consumer_incremental_apply_entries(ctx_meta_entries)
            .inspect_err(|err| {
                error!(?err, "Failed to apply incremental meta entries");
            })?;

        // Domain info is reloaded in case the domain name changed on the remote.
        if meta_changed {
            self.reload_domain_info().inspect_err(|err| {
                error!(?err, "Failed to reload domain info");
            })?;
            self.reload_system_config().inspect_err(|err| {
                error!(?err, "Failed to reload system configuration");
            })?;
        }

        debug!("Applying all context entries");
        // Update all other entries now.
        self.consumer_incremental_apply_entries(ctx_entries)
            .inspect_err(|err| {
                error!(?err, "Failed to apply incremental context entries");
            })?;

        // Reload the domain version, doing any needed migrations.
        //
        // While it seems odd that we do the migrations after we receive the entries,
        // this is because the supplier will already be sending us everything that
        // was just migrated. As a result, we only need to apply the migrations to entries
        // that were not on the supplier, and therefore need updates here.
        if meta_changed {
            self.reload_domain_info_version().inspect_err(|err| {
                error!(?err, "Failed to reload domain info version");
            })?;
        }

        // Finally, confirm that the ranges that we have added match the ranges from our
        // context. Note that we get this in a writeable form!
        let ruv = self.be_txn.get_ruv_write();

        ruv.refresh_validate_ruv(&ctx_ranges).inspect_err(|err| {
            error!(?err, "RUV ranges were not rebuilt correctly.");
        })?;

        ruv.refresh_update_ruv(&ctx_ranges).inspect_err(|err| {
            error!(?err, "Unable to update RUV with supplier ranges.");
        })?;

        Ok(ConsumerState::Ok)
    }

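    /// Apply a full refresh context from a supplier, replacing this server's database
    /// content with the supplier's state and joining the supplier's replication topology.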
    pub fn consumer_apply_refresh(
        &mut self,
        ctx: ReplRefreshContext,
    ) -> Result<(), OperationError> {
        match ctx {
            ReplRefreshContext::V1 {
                domain_version,
                domain_devel,
                domain_uuid,
                ranges,
                schema_entries,
                meta_entries,
                entries,
            } => self.consumer_apply_refresh_v1(
                domain_version,
                domain_devel,
                domain_uuid,
                ranges,
                schema_entries,
                meta_entries,
                entries,
            ),
        }
    }

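    /// Create a batch of entries provided by the supplier during a refresh, running the
    /// repl refresh plugins and validating the entries against the schema before writing
    /// them to the backend.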
    fn consumer_refresh_create_entries(
        &mut self,
        ctx_entries: Vec<ReplEntryV1>,
    ) -> Result<(), OperationError> {
        let candidates = ctx_entries
            .into_iter()
            .map(EntryRefreshNew::from_repl_entry_v1)
            .collect::<Result<Vec<EntryRefreshNew>, _>>()
            .inspect_err(|err| {
                error!(?err, "Failed to convert entries from supplier");
            })?;

        Plugins::run_pre_repl_refresh(self, candidates.as_slice()).map_err(|e| {
            admin_error!(
                "Refresh operation failed (pre_repl_refresh plugin), {:?}",
                e
            );
            e
        })?;

        // No need to assign CIDs since this is a repl import.
        let norm_cand = candidates
            .into_iter()
            .map(|e| {
                e.validate(&self.schema)
                    .map_err(|e| {
                        admin_error!("Schema Violation in refresh validate {:?}", e);
                        OperationError::SchemaViolation(e)
                    })
                    .map(|e| {
                        // Then seal the changes?
                        e.seal(&self.schema)
                    })
            })
            .collect::<Result<Vec<EntrySealedNew>, _>>()?;

        let commit_cand = self.be_txn.refresh(norm_cand).map_err(|e| {
            admin_error!("betxn create failure {:?}", e);
            e
        })?;

        Plugins::run_post_repl_refresh(self, &commit_cand).map_err(|e| {
            admin_error!(
                "Refresh operation failed (post_repl_refresh plugin), {:?}",
                e
            );
            e
        })?;

        self.changed_uuid
            .extend(commit_cand.iter().map(|e| e.get_uuid()));

        Ok(())
    }

    #[instrument(level = "debug", skip_all)]
    fn consumer_apply_refresh_v1(
        &mut self,
        ctx_domain_version: DomainVersion,
        ctx_domain_devel: bool,
        ctx_domain_uuid: Uuid,
        ctx_ranges: BTreeMap<Uuid, ReplAnchoredCidRange>,
        ctx_schema_entries: Vec<ReplEntryV1>,
        ctx_meta_entries: Vec<ReplEntryV1>,
        ctx_entries: Vec<ReplEntryV1>,
    ) -> Result<(), OperationError> {
        // Can we apply the domain version validly?
        // if domain_version >= min_support ...
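        // KANIDM_PRE_RELEASE is set at build time for development builds. Refreshes are
        // only permitted between servers with the same development status, as checked below.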
        let current_devel_flag = option_env!("KANIDM_PRE_RELEASE").is_some();

        if ctx_domain_version < DOMAIN_MIN_LEVEL {
            error!("Unable to proceed with consumer refresh - incoming domain level is lower than our minimum supported level. {} < {}", ctx_domain_version, DOMAIN_MIN_LEVEL);
            return Err(OperationError::ReplDomainLevelUnsatisfiable);
        } else if ctx_domain_version > DOMAIN_MAX_LEVEL {
            error!("Unable to proceed with consumer refresh - incoming domain level is greater than our maximum supported level. {} > {}", ctx_domain_version, DOMAIN_MAX_LEVEL);
            return Err(OperationError::ReplDomainLevelUnsatisfiable);
        } else if ctx_domain_devel && !current_devel_flag {
            error!("Unable to proceed with consumer refresh - incoming domain is from a development version while this server is a stable release.");
            return Err(OperationError::ReplDomainLevelUnsatisfiable);
        } else if !ctx_domain_devel && current_devel_flag {
            error!("Unable to proceed with consumer refresh - incoming domain is from a stable version while this server is a development release.");
            return Err(OperationError::ReplDomainLevelUnsatisfiable);
        } else {
            debug!(
                "Proceeding to refresh from domain at level {}",
                ctx_domain_version
            );
        };

        // == ⚠️  Below this point we begin to make changes! ==

        // Update the d_uuid. This is what defines us as being part of this repl topology!
        self.be_txn
            .set_db_d_uuid(ctx_domain_uuid)
            .inspect_err(|err| {
                error!(?err, "Failed to reset domain uuid");
            })?;

        // We need to reset our server uuid now. This is so that any other servers
        // which had our former server_uuid in their RUV are able to start to age it
        // out and trim it.
        self.reset_server_uuid()?;

        // Delete all entries - *proper delete, not just tombstone!*

        self.be_txn
            .danger_delete_all_db_content()
            .inspect_err(|err| {
                error!(?err, "Failed to clear existing server database content");
            })?;

        // Reset this transactions schema to a completely clean slate.
        self.schema.generate_in_memory().inspect_err(|err| {
            error!(?err, "Failed to reset in memory schema to clean state");
        })?;

        // Apply the schema entries first. This is the foundation that everything
        // else will build upon!
        self.consumer_refresh_create_entries(ctx_schema_entries)
            .inspect_err(|err| {
                error!(?err, "Failed to refresh schema entries");
            })?;

        // We need to reload schema now!
        self.reload_schema().inspect_err(|err| {
            error!(?err, "Failed to reload schema");
        })?;

        // We have to reindex to force all the existing indexes to be dumped
        // and recreated before we start to import.
        self.reindex().inspect_err(|err| {
            error!(?err, "Failed to reindex the database");
        })?;

        // Apply the domain info entry / system info / system config entry?
        self.consumer_refresh_create_entries(ctx_meta_entries)
            .inspect_err(|err| {
                error!(?err, "Failed to refresh meta entries");
            })?;

        // NOTE: The domain info we receive here will have the domain version populated!
        // That's okay though, because all the incoming data is already at the right
        // version!
        self.reload_domain_info().inspect_err(|err| {
            error!(?err, "Failed to reload domain info");
        })?;

        // Mark that everything changed so that post commit hooks function as expected.
        self.changed_flags.insert(
            ChangeFlag::SCHEMA
                | ChangeFlag::ACP
                | ChangeFlag::OAUTH2
                | ChangeFlag::DOMAIN
                | ChangeFlag::APPLICATION
                | ChangeFlag::SYSTEM_CONFIG
                | ChangeFlag::SYNC_AGREEMENT
                | ChangeFlag::KEY_MATERIAL,
        );

        // That's it! We are GOOD to go!

        // Create all the remaining entries. Note that only the repl refresh plugins run here.
        self.consumer_refresh_create_entries(ctx_entries)
            .inspect_err(|err| {
                error!(?err, "Failed to refresh database entries");
            })?;

        // Finally, confirm that the ranges that we have recreated match the ranges from our
        // context. Note that we get this in a writeable form!
        let ruv = self.be_txn.get_ruv_write();

        ruv.refresh_validate_ruv(&ctx_ranges).inspect_err(|err| {
            error!(?err, "RUV ranges were not rebuilt correctly.");
        })?;

        ruv.refresh_update_ruv(&ctx_ranges).inspect_err(|err| {
            error!(?err, "Unable to update RUV with supplier ranges.");
        })?;

        Ok(())
    }
}