Skip to main content

kanidmd_lib/repl/
consumer.rs

1use super::proto::*;
2use crate::plugins::Plugins;
3use crate::prelude::*;
4use crate::server::{ChangeFlag, ServerPhase};
5use std::collections::{BTreeMap, BTreeSet};
6use std::sync::Arc;
7
8impl QueryServerWriteTransaction<'_> {
9    // Apply the state changes if they are valid.
10
11    fn consumer_incremental_apply_entries(
12        &mut self,
13        ctx_entries: Vec<ReplIncrementalEntryV1>,
14    ) -> Result<bool, OperationError> {
15        // trace!(?ctx_entries);
16
17        // No action needed for this if the entries are empty.
18        if ctx_entries.is_empty() {
19            debug!("No entries to act upon");
20            return Ok(false);
21        }
22
23        /*
24         *  Incremental is very similar to modify in how we have to treat the entries
25         *  with a pre and post state. However we need an incremental prepare so that
26         *  when new entries are provided to us we can merge to a stub and then commit
27         *  it correctly. This takes an extra backend interface that prepares the
28         *  entry stubs for us.
29         */
30
31        // I think we need to rehydrate all the repl content to a partial
32        // entry. This way all the types are consistent and ready.
33        let ctx_entries: Vec<_> = ctx_entries.into_iter().map(
34            EntryIncrementalNew::rehydrate
35        )
36        .collect::<Result<Vec<_>, _>>()
37        .inspect_err(|err| {
38            error!(?err, "Unable to process replication incremental entries to valid entry states for replication");
39        })?;
40
41        trace!(?ctx_entries);
42
43        let db_entries = self
44            .be_txn
45            .incremental_prepare(&ctx_entries)
46            .inspect_err(|err| {
47                error!(?err, "Failed to access entries from db");
48            })?;
49
50        trace!(?db_entries);
51
52        // Need to probably handle conflicts here in this phase. I think they
53        // need to be pushed to a separate list where they are then "created"
54        // as a conflict.
55
56        // First find if entries are in a conflict state. Remember, these conflicts are purely
57        // UUID creation conflicts at this phase in the process.
58
59        let (conflicts, proceed): (Vec<_>, Vec<_>) = ctx_entries
60            .iter()
61            .zip(db_entries)
62            .partition(|(ctx_ent, db_ent)| ctx_ent.is_add_conflict(db_ent.as_ref()));
63
64        debug!(conflicts = %conflicts.len(), proceed = %proceed.len());
65
66        // Now we have a set of conflicts and a set of entries to proceed.
67        //
68        //    /- entries that need to be created as conflicts.
69        //    |                /- entries that survive and need update to the db in place.
70        //    v                v
71        let (conflict_create, conflict_update): (
72            Vec<Option<EntrySealedNew>>,
73            Vec<(EntryIncrementalCommitted, Arc<EntrySealedCommitted>)>,
74        ) = conflicts
75            .into_iter()
76            .map(
77                |(ctx_ent, db_ent): (&EntryIncrementalNew, Arc<EntrySealedCommitted>)| {
78                    let (opt_create, ent) =
79                        ctx_ent.resolve_add_conflict(self.get_cid(), db_ent.as_ref());
80                    (opt_create, (ent, db_ent))
81                },
82            )
83            .unzip();
84
85        // ⚠️  If we end up with plugins triggering other entries to conflicts, we DON'T need to
86        // add them to this list. This is just for uuid conflicts, not higher level ones!
87        //
88        // ⚠️  We need to collect this from conflict_update since we may NOT be the originator
89        // server for some conflicts, but we still need to know the UUID is IN the conflict
90        // state for plugins. We also need to do this here before the conflict_update
91        // set is consumed by later steps.
92        //
93        // ⚠️  When we upgrade between two nodes, migrations will often create *new* system
94        // entries on both nodes. Until both nodes upgrade they can't replicate. This creates
95        // a situation where both nodes have identical entry content for system entries, but
96        // the entries that were created now are conflicts. Normally this is okay, because the
97        // first node to upgrade will have it's entries persisted, and the other nodes duplicate
98        // entries will be removed. However, just through the nature of being in the conflict
99        // state, these entries are then added to the conflict_uuid set. This conflict_uuid set
100        // is used by referential integrity to remove uuids from references so that group
101        // memberships don't accidentally leak to recipients that were not intended.
102        //
103        // To avoid this, we remove any system entries from this conflict set, so that they are
104        // exempt from this conflict handling which allows upgrades to work.
105        let mut conflict_uuids: BTreeSet<_> = conflict_update
106            .iter()
107            .filter_map(|(_, e)| {
108                let u = e.get_uuid();
109                if u >= DYNAMIC_RANGE_MINIMUM_UUID {
110                    // It is a user created node, process the conflict within plugins
111                    Some(u)
112                } else {
113                    // It is in a system range, do not process this entry
114                    None
115                }
116            })
117            .collect();
118
119        // Filter out None from conflict_create
120        let conflict_create: Vec<EntrySealedNew> = conflict_create.into_iter().flatten().collect();
121
122        let proceed_update: Vec<(EntryIncrementalCommitted, Arc<EntrySealedCommitted>)> = proceed
123            .into_iter()
124            .map(|(ctx_ent, db_ent)| {
125                // This now is the set of entries that are able to be updated. Merge
126                // their attribute sets/states per the change state rules.
127
128                // This must create an EntryInvalidCommitted
129                let merge_ent = ctx_ent.merge_state(db_ent.as_ref(), &self.schema, self.trim_cid());
130                (merge_ent, db_ent)
131            })
132            .collect();
133
134        // We now merge the conflict updates and the updates that can proceed. This is correct
135        // since if an entry was conflicting by uuid then there is nothing for it to merge with
136        // so as a result we can just by pass that step. We now have all_updates which is
137        // the set of live entries to write back.
138        let mut all_updates = conflict_update
139            .into_iter()
140            .chain(proceed_update)
141            .collect::<Vec<_>>();
142
143        // ⚠️  This hook is probably not what you want to use for checking entries are consistent.
144        //
145        // The main issue is that at this point we have a set of entries that need to be
146        // created / marked into conflicts, and until that occurs it's hard to proceed with validations
147        // like attr unique because then we would need to walk the various sets to find cases where
148        // an attribute may not be unique "currently" but *would* be unique once the various entries
149        // have then been conflicted and updated.
150        //
151        // Instead we treat this like refint - we allow the database to "temporarily" become
152        // inconsistent, then we fix it immediately. This hook remains for cases in future
153        // where we may wish to have session cleanup performed for example.
154        Plugins::run_pre_repl_incremental(self, all_updates.as_mut_slice()).map_err(|e| {
155            admin_error!("Operation failed (pre_repl_incremental plugin), {:?}", e);
156            e
157        })?;
158
159        // Now we have to schema check our entries. Remember, here because this is
160        // using into_iter it's possible that entries may be conflicted due to becoming
161        // schema invalid during the merge process.
162        let all_updates_valid = all_updates
163            .into_iter()
164            .map(|(ctx_ent, db_ent)| {
165                // Check the schema
166                //
167                // In these cases when an entry fails schema, we mark it to
168                // a conflict state and then retain it in the update process.
169                //
170                // The marking is done INSIDE this function!
171                let sealed_ent = ctx_ent.validate_repl(&self.schema).seal(&self.schema);
172                (sealed_ent, db_ent)
173            })
174            .collect::<Vec<_>>();
175
176        // We now have two sets!
177        //
178        // * conflict_create - entries to be created that are conflicted via add statements (duplicate uuid)
179        //                     these are only created on the entry origin node!
180        // * all_updates_valid - this has two types of entries
181        //   * entries that have survived a uuid conflict and need inplace write. Unlikely to become invalid.
182        //   * entries that were merged and are schema valid.
183        //   * entries that were merged and their attribute state has now become invalid and are conflicts.
184        //
185        // incremental_apply here handles both the creations and the update processes to ensure that
186        // everything is updated in a single consistent operation.
187        self.be_txn
188            .incremental_apply(&all_updates_valid, conflict_create)
189            .map_err(|e| {
190                admin_error!("betxn create failure {:?}", e);
191                e
192            })?;
193
194        Plugins::run_post_repl_incremental_conflict(
195            self,
196            all_updates_valid.as_slice(),
197            &mut conflict_uuids,
198        )
199        .map_err(|e| {
200            error!(
201                "Operation failed (post_repl_incremental_conflict plugin), {:?}",
202                e
203            );
204            e
205        })?;
206
207        // Plugins need these unzipped
208        //
209        let (cand, pre_cand): (Vec<_>, Vec<_>) = all_updates_valid
210            .into_iter()
211            // We previously excluded this to avoid doing unnecessary work on entries that
212            // were moving to a conflict state, and the survivor was staying "as is" on this
213            // node. However, this gets messy with dyngroups and memberof, where on a conflict
214            // the memberships are deleted across the replication boundary. In these cases
215            // we need dyngroups to see the valid entries, even if they are "identical to before"
216            // to re-assert all their memberships are valid.
217            /*
218            .filter(|(cand, _)| {
219                // Exclude anything that is conflicted as a result of the conflict plugins.
220                !conflict_uuids.contains(&cand.get_uuid())
221            })
222            */
223            .unzip();
224
225        // We don't need to process conflict_creates here, since they are all conflicting
226        // uuids which means that the conflict_uuids are all *here* so they will trigger anything
227        // that requires processing anyway. As well conflict_creates may not be the full
228        // set of conflict entries as we may not be the origin node! Conflict_creates is always
229        // a subset of the conflicts.
230        Plugins::run_post_repl_incremental(
231            self,
232            pre_cand.as_slice(),
233            cand.as_slice(),
234            &conflict_uuids,
235        )
236        .map_err(|e| {
237            error!("Operation failed (post_repl_incremental plugin), {:?}", e);
238            e
239        })?;
240
241        self.changed_uuid.extend(cand.iter().map(|e| e.get_uuid()));
242
243        if !self.changed_flags.contains(ChangeFlag::ACP)
244            && cand
245                .iter()
246                .chain(pre_cand.iter().map(|e| e.as_ref()))
247                .any(|e| {
248                    e.attribute_equality(Attribute::Class, &EntryClass::AccessControlProfile.into())
249                })
250        {
251            self.changed_flags.insert(ChangeFlag::ACP)
252        }
253
254        if !self.changed_flags.contains(ChangeFlag::OAUTH2)
255            && cand
256                .iter()
257                .chain(pre_cand.iter().map(|e| e.as_ref()))
258                .any(|e| {
259                    e.attribute_equality(Attribute::Class, &EntryClass::OAuth2ResourceServer.into())
260                })
261        {
262            self.changed_flags.insert(ChangeFlag::OAUTH2)
263        }
264
265        if !self.changed_flags.contains(ChangeFlag::OAUTH2_CLIENT)
266            && cand
267                .iter()
268                .chain(pre_cand.iter().map(|e| e.as_ref()))
269                .any(|e| e.attribute_equality(Attribute::Class, &EntryClass::OAuth2Client.into()))
270        {
271            self.changed_flags.insert(ChangeFlag::OAUTH2_CLIENT)
272        }
273
274        if !self.changed_flags.contains(ChangeFlag::FEATURE)
275            && cand
276                .iter()
277                .chain(pre_cand.iter().map(|e| e.as_ref()))
278                .any(|e| e.attribute_equality(Attribute::Class, &EntryClass::Feature.into()))
279        {
280            self.changed_flags.insert(ChangeFlag::FEATURE)
281        }
282
283        if !self.changed_flags.contains(ChangeFlag::APPLICATION)
284            && cand
285                .iter()
286                .chain(pre_cand.iter().map(|e| e.as_ref()))
287                .any(|e| e.attribute_equality(Attribute::Class, &EntryClass::Application.into()))
288        {
289            self.changed_flags.insert(ChangeFlag::APPLICATION)
290        }
291
292        if !self.changed_flags.contains(ChangeFlag::SYNC_AGREEMENT)
293            && cand
294                .iter()
295                .chain(pre_cand.iter().map(|e| e.as_ref()))
296                .any(|e| e.attribute_equality(Attribute::Class, &EntryClass::SyncAccount.into()))
297        {
298            self.changed_flags.insert(ChangeFlag::SYNC_AGREEMENT)
299        }
300
301        if !self.changed_flags.contains(ChangeFlag::KEY_MATERIAL)
302            && cand
303                .iter()
304                .chain(pre_cand.iter().map(|e| e.as_ref()))
305                .any(|e| {
306                    e.attribute_equality(Attribute::Class, &EntryClass::KeyProvider.into())
307                        || e.attribute_equality(Attribute::Class, &EntryClass::KeyObject.into())
308                })
309        {
310            self.changed_flags.insert(ChangeFlag::KEY_MATERIAL)
311        }
312
313        trace!(
314            changed = ?self.changed_flags.iter_names().collect::<Vec<_>>(),
315        );
316
317        Ok(true)
318    }
319
320    pub fn consumer_apply_changes(
321        &mut self,
322        ctx: ReplIncrementalContext,
323    ) -> Result<ConsumerState, OperationError> {
324        match ctx {
325            ReplIncrementalContext::DomainMismatch => {
326                error!("Unable to proceed with consumer incremental - the supplier has indicated that our domain_uuid's are not equivalent. This can occur when adding a new consumer to an existing topology.");
327                error!("This server's content must be refreshed to proceed. If you have configured automatic refresh, this will occur shortly.");
328                Ok(ConsumerState::RefreshRequired)
329            }
330            ReplIncrementalContext::NoChangesAvailable => {
331                debug!("no changes are available");
332                Ok(ConsumerState::Ok)
333            }
334            ReplIncrementalContext::RefreshRequired => {
335                error!("Unable to proceed with consumer incremental - the supplier has indicated that our RUV is outdated, and replication would introduce data corruption.");
336                error!("This server's content must be refreshed to proceed. If you have configured automatic refresh, this will occur shortly.");
337                Ok(ConsumerState::RefreshRequired)
338            }
339            ReplIncrementalContext::UnwillingToSupply => {
340                warn!("Unable to proceed with consumer incremental - the supplier has indicated that our RUV is ahead, and replication would introduce data corruption.");
341                error!("This supplier's content must be refreshed to proceed. If you have configured automatic refresh, this will occur shortly.");
342                Ok(ConsumerState::Ok)
343            }
344            ReplIncrementalContext::V1 {
345                domain_version,
346                domain_patch_level,
347                domain_uuid,
348                ranges,
349                schema_entries,
350                meta_entries,
351                entries,
352            } => self.consumer_apply_changes_v1(
353                domain_version,
354                domain_patch_level,
355                domain_uuid,
356                &ranges,
357                schema_entries,
358                meta_entries,
359                entries,
360            ),
361        }
362    }
363
364    #[instrument(level = "debug", skip_all)]
365    fn consumer_apply_changes_v1(
366        &mut self,
367        ctx_domain_version: DomainVersion,
368        ctx_domain_patch_level: u32,
369        ctx_domain_uuid: Uuid,
370        ctx_ranges: &BTreeMap<Uuid, ReplAnchoredCidRange>,
371        ctx_schema_entries: Vec<ReplIncrementalEntryV1>,
372        ctx_meta_entries: Vec<ReplIncrementalEntryV1>,
373        ctx_entries: Vec<ReplIncrementalEntryV1>,
374    ) -> Result<ConsumerState, OperationError> {
375        if ctx_domain_version < DOMAIN_MINIMUM_REPLICATION_LEVEL {
376            error!("Unable to proceed with consumer incremental - incoming domain level is lower than our minimum supported level. {} < {}", ctx_domain_version, DOMAIN_MINIMUM_REPLICATION_LEVEL);
377            return Err(OperationError::ReplDomainLevelUnsatisfiable);
378        } else if ctx_domain_version > DOMAIN_MAXIMUM_REPLICATION_LEVEL {
379            error!("Unable to proceed with consumer incremental - incoming domain level is greater than our maximum supported level. {} > {}", ctx_domain_version, DOMAIN_MAXIMUM_REPLICATION_LEVEL);
380            return Err(OperationError::ReplDomainLevelUnsatisfiable);
381        };
382
383        let domain_patch_level = if self.get_domain_development_taint() {
384            u32::MAX
385        } else {
386            self.get_domain_patch_level()
387        };
388
389        if ctx_domain_patch_level != domain_patch_level {
390            error!("Unable to proceed with consumer incremental - incoming domain patch level is not equal to our patch level. {} != {}", ctx_domain_patch_level, domain_patch_level);
391            return Err(OperationError::ReplDomainLevelUnsatisfiable);
392        };
393
394        // Assert that the d_uuid matches the repl domain uuid.
395        let db_uuid = self.be_txn.get_db_d_uuid()?;
396
397        if db_uuid != ctx_domain_uuid {
398            error!("Unable to proceed with consumer incremental - incoming domain uuid does not match our database uuid. You must investigate this situation. {:?} != {:?}", db_uuid, ctx_domain_uuid);
399            return Err(OperationError::ReplDomainUuidMismatch);
400        }
401
402        // Preflight checks of the incoming RUV to ensure it's in a good state.
403        let txn_cid = self.get_cid().clone();
404        let ruv = self.be_txn.get_ruv_write();
405
406        let change_count = ctx_schema_entries.len() + ctx_meta_entries.len() + ctx_entries.len();
407
408        ruv.incremental_preflight_validate_ruv(ctx_ranges, &txn_cid)
409            .inspect_err(|err| {
410                error!(
411                    ?err,
412                    "Incoming RUV failed preflight checks, unable to proceed."
413                );
414            })?;
415
416        // == ⚠️  Below this point we begin to make changes! ==
417        debug!(
418            "Proceeding to apply incremental with {change_count} changes from domain {ctx_domain_uuid:?} at level {ctx_domain_version}"
419        );
420
421        debug!(?ctx_ranges);
422
423        if ctx_domain_version < DOMAIN_LEVEL_1_11 {
424            // Schema is an in memory property attached to domain level from 1_11, and so the meta entries
425            // will trigger the schema to reload if required.
426            debug!("Applying {} schema entries", ctx_schema_entries.len());
427            // Apply the schema entries first.
428            let schema_changed = self
429                .consumer_incremental_apply_entries(ctx_schema_entries)
430                .inspect_err(|err| {
431                    error!(?err, "Failed to apply incremental schema entries");
432                })?;
433
434            if schema_changed {
435                // We need to reload schema now!
436                self.reload_schema().inspect_err(|err| {
437                    error!(?err, "Failed to reload schema");
438                })?;
439            }
440        } else {
441            self.reload_schema().inspect_err(|err| {
442                error!(?err, "Failed to reload schema");
443            })?;
444        }
445
446        debug!("Applying {} meta entries", ctx_meta_entries.len());
447        // Apply meta entries now.
448        let meta_changed = self
449            .consumer_incremental_apply_entries(ctx_meta_entries)
450            .inspect_err(|err| {
451                error!(?err, "Failed to apply incremental meta entries");
452            })?;
453
454        // This is re-loaded in case the domain name changed on the remote
455        if meta_changed {
456            self.reload_domain_info().inspect_err(|err| {
457                error!(?err, "Failed to reload domain info");
458            })?;
459            self.reload_system_config().inspect_err(|err| {
460                error!(?err, "Failed to reload system configuration");
461            })?;
462        }
463
464        debug!("Applying {} context entries", ctx_entries.len());
465        // Update all other entries now.
466        self.consumer_incremental_apply_entries(ctx_entries)
467            .inspect_err(|err| {
468                error!(?err, "Failed to apply incremental meta entries");
469            })?;
470
471        // Reload the domain version, doing any needed migrations.
472        //
473        // While it seems odd that we do the migrations after we receive the entries,
474        // this is because the supplier will already be sending us everything that
475        // was just migrated. As a result, we only need to apply the migrations to entries
476        // that were not on the supplier, and therefore need updates here.
477        if meta_changed {
478            self.reload_domain_info_version().inspect_err(|err| {
479                error!(?err, "Failed to reload domain info version");
480            })?;
481        }
482
483        // Finally, confirm that the ranges that we have added match the ranges from our
484        // context. Note that we get this in a writeable form!
485        let ruv = self.be_txn.get_ruv_write();
486
487        ruv.refresh_validate_ruv(ctx_ranges).inspect_err(|err| {
488            error!(?err, "RUV ranges were not rebuilt correctly.");
489        })?;
490
491        ruv.refresh_update_ruv(ctx_ranges).inspect_err(|err| {
492            error!(?err, "Unable to update RUV with supplier ranges.");
493        })?;
494
495        Ok(ConsumerState::Ok)
496    }
497
498    pub fn consumer_apply_refresh(
499        &mut self,
500        ctx: ReplRefreshContext,
501    ) -> Result<(), OperationError> {
502        match ctx {
503            ReplRefreshContext::V1 {
504                domain_version,
505                domain_devel,
506                domain_uuid,
507                ranges,
508                schema_entries,
509                meta_entries,
510                entries,
511            } => self.consumer_apply_refresh_v1(
512                domain_version,
513                domain_devel,
514                domain_uuid,
515                &ranges,
516                schema_entries,
517                meta_entries,
518                entries,
519            ),
520        }
521    }
522
523    fn consumer_refresh_create_entries(
524        &mut self,
525        ctx_entries: Vec<ReplEntryV1>,
526    ) -> Result<(), OperationError> {
527        let candidates = ctx_entries
528            .into_iter()
529            .map(EntryRefreshNew::from_repl_entry_v1)
530            .collect::<Result<Vec<EntryRefreshNew>, _>>()
531            .inspect_err(|err| {
532                error!(?err, "Failed to convert entries from supplier");
533            })?;
534
535        Plugins::run_pre_repl_refresh(self, candidates.as_slice()).map_err(|e| {
536            admin_error!(
537                "Refresh operation failed (pre_repl_refresh plugin), {:?}",
538                e
539            );
540            e
541        })?;
542
543        // No need to assign CID's since this is a repl import.
544        let norm_cand = candidates
545            .into_iter()
546            .map(|e| {
547                e.validate(&self.schema)
548                    .map_err(|e| {
549                        admin_error!("Schema Violation in refresh validate {:?}", e);
550                        OperationError::SchemaViolation(e)
551                    })
552                    .map(|e| {
553                        // Then seal the changes?
554                        e.seal(&self.schema)
555                    })
556            })
557            .collect::<Result<Vec<EntrySealedNew>, _>>()?;
558
559        let commit_cand = self.be_txn.refresh(norm_cand).map_err(|e| {
560            admin_error!("betxn create failure {:?}", e);
561            e
562        })?;
563
564        Plugins::run_post_repl_refresh(self, &commit_cand).map_err(|e| {
565            admin_error!(
566                "Refresh operation failed (post_repl_refresh plugin), {:?}",
567                e
568            );
569            e
570        })?;
571
572        self.changed_uuid
573            .extend(commit_cand.iter().map(|e| e.get_uuid()));
574
575        Ok(())
576    }
577
578    #[instrument(level = "info", skip_all)]
579    fn consumer_apply_refresh_v1(
580        &mut self,
581        ctx_domain_version: DomainVersion,
582        ctx_domain_devel: bool,
583        ctx_domain_uuid: Uuid,
584        ctx_ranges: &BTreeMap<Uuid, ReplAnchoredCidRange>,
585        ctx_schema_entries: Vec<ReplEntryV1>,
586        ctx_meta_entries: Vec<ReplEntryV1>,
587        ctx_entries: Vec<ReplEntryV1>,
588    ) -> Result<(), OperationError> {
589        // Can we apply the domain version validly?
590        // if domain_version >= min_support ...
591        let current_devel_flag = option_env!("KANIDM_PRE_RELEASE").is_some();
592
593        if ctx_domain_version < DOMAIN_MINIMUM_REPLICATION_LEVEL {
594            error!("Unable to proceed with consumer refresh - incoming domain level is lower than our minimum supported level. {} < {}", ctx_domain_version, DOMAIN_MINIMUM_REPLICATION_LEVEL);
595            return Err(OperationError::ReplDomainLevelUnsatisfiable);
596        } else if ctx_domain_version > DOMAIN_MAXIMUM_REPLICATION_LEVEL {
597            error!("Unable to proceed with consumer refresh - incoming domain level is greater than our maximum supported level. {} > {}", ctx_domain_version, DOMAIN_MAXIMUM_REPLICATION_LEVEL);
598            return Err(OperationError::ReplDomainLevelUnsatisfiable);
599        } else if ctx_domain_devel && !current_devel_flag {
600            error!("Unable to proceed with consumer refresh - incoming domain is from a development version while this server is a stable release.");
601            return Err(OperationError::ReplDomainLevelUnsatisfiable);
602        } else if !ctx_domain_devel && current_devel_flag {
603            error!("Unable to proceed with consumer refresh - incoming domain is from a stable version while this server is a development release.");
604            return Err(OperationError::ReplDomainLevelUnsatisfiable);
605        } else {
606            debug!(
607                "Proceeding to refresh from domain at level {}",
608                ctx_domain_version
609            );
610        };
611
612        // == ⚠️  Below this point we begin to make changes! ==
613        self.set_phase_bootstrap();
614
615        // Update the d_uuid. This is what defines us as being part of this repl topology!
616        self.be_txn
617            .set_db_d_uuid(ctx_domain_uuid)
618            .inspect_err(|err| {
619                error!(?err, "Failed to reset domain uuid");
620            })?;
621
622        // We need to reset our server uuid now. This is so that any other servers
623        // which had our former server_uuid in their RUV, is able to start to age it
624        // out and trim it.
625        self.reset_server_uuid()?;
626
627        // Delete all entries - *proper delete, not just tombstone!*
628        self.be_txn
629            .danger_delete_all_db_content()
630            .inspect_err(|err| {
631                error!(?err, "Failed to clear existing server database content");
632            })?;
633
634        // Reset this transactions schema to a completely clean slate.
635
636        // Schema is an in memory property attached to domain level from 1_11, and so we don't actually
637        // need to apply these meta entries.
638        if ctx_domain_version < DOMAIN_LEVEL_1_11 {
639            self.schema.generate_in_memory().inspect_err(|err| {
640                error!(?err, "Failed to reset in memory schema to clean state");
641            })?;
642
643            // Reindex now to force some basic indexes to exist
644            self.reindex(false).inspect_err(|err| {
645                error!(?err, "Failed to reload schema");
646            })?;
647
648            // Apply the schema entries first. This is the foundation that everything
649            // else will build upon!
650            self.consumer_refresh_create_entries(ctx_schema_entries)
651                .inspect_err(|err| {
652                    error!(?err, "Failed to refresh schema entries");
653                })?;
654        }
655
656        // We need to reload schema now! From 1.11 and above, this is what actually
657        // generates the in memory schema.
658        self.reload_schema().inspect_err(|err| {
659            error!(?err, "Failed to reload schema");
660        })?;
661
662        // Schema is now ready
663        self.set_phase(ServerPhase::SchemaReady);
664
665        // We have to reindex to force all the existing indexes to be dumped
666        // and recreated before we start to import.
667        self.reindex(false).inspect_err(|err| {
668            error!(?err, "Failed to reload schema");
669        })?;
670
671        // Apply the domain info entry / system info / system config entry?
672        self.consumer_refresh_create_entries(ctx_meta_entries)
673            .inspect_err(|err| {
674                error!(?err, "Failed to refresh meta entries");
675            })?;
676
677        // NOTE: The domain info we receive here will have the domain version populated!
678        // That's okay though, because all the incoming data is already at the right
679        // version!
680        self.reload_domain_info().inspect_err(|err| {
681            error!(?err, "Failed to reload domain info");
682        })?;
683
684        // Mark that everything changed so that post commit hooks function as expected.
685        self.changed_flags.insert(
686            ChangeFlag::SCHEMA
687                | ChangeFlag::ACP
688                | ChangeFlag::OAUTH2
689                | ChangeFlag::OAUTH2_CLIENT
690                | ChangeFlag::FEATURE
691                | ChangeFlag::DOMAIN
692                | ChangeFlag::APPLICATION
693                | ChangeFlag::SYSTEM_CONFIG
694                | ChangeFlag::SYNC_AGREEMENT
695                | ChangeFlag::KEY_MATERIAL,
696        );
697
698        // Domain info is now ready.
699        self.set_phase(ServerPhase::DomainInfoReady);
700
701        // ==== That's it! We are GOOD to go! ====
702
703        // Create all the entries. Note we don't hit plugins here beside post repl plugs.
704        self.consumer_refresh_create_entries(ctx_entries)
705            .inspect_err(|err| {
706                error!(?err, "Failed to refresh main db entries");
707            })?;
708
709        // Finally, confirm that the ranges that we have recreated match the ranges from our
710        // context. Note that we get this in a writeable form!
711        let ruv = self.be_txn.get_ruv_write();
712
713        ruv.refresh_validate_ruv(ctx_ranges).inspect_err(|err| {
714            error!(?err, "RUV ranges were not rebuilt correctly.");
715        })?;
716
717        ruv.refresh_update_ruv(ctx_ranges).inspect_err(|err| {
718            error!(?err, "Unable to update RUV with supplier ranges.");
719        })?;
720
721        // Refresh complete
722        self.set_phase(ServerPhase::Running);
723
724        Ok(())
725    }
726}