kanidmd_lib/repl/consumer.rs

use super::proto::*;
use crate::plugins::Plugins;
use crate::prelude::*;
use crate::server::{ChangeFlag, ServerPhase};
use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;

impl QueryServerWriteTransaction<'_> {
    // Apply the state changes if they are valid.

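    // Returns Ok(true) when one or more entries were applied, so the caller knows a
    // schema or domain info reload may be needed, and Ok(false) when the incoming set
    // was empty.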
    fn consumer_incremental_apply_entries(
        &mut self,
        ctx_entries: Vec<ReplIncrementalEntryV1>,
    ) -> Result<bool, OperationError> {
        // trace!(?ctx_entries);

        // No action needed for this if the entries are empty.
        if ctx_entries.is_empty() {
            debug!("No entries to act upon");
            return Ok(false);
        }

        /*
         *  Incremental is very similar to modify in how we have to treat the entries
         *  with a pre and post state. However we need an incremental prepare so that
         *  when new entries are provided to us we can merge to a stub and then commit
         *  it correctly. This takes an extra backend interface that prepares the
         *  entry stubs for us.
         */

        // I think we need to rehydrate all the repl content to a partial
        // entry. This way all the types are consistent and ready.
        let ctx_entries: Vec<_> = ctx_entries
            .into_iter()
            .map(EntryIncrementalNew::rehydrate)
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| {
                error!(err = ?e, "Unable to process replication incremental entries to valid entry states for replication");
                e
            })?;

        trace!(?ctx_entries);

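        // incremental_prepare gives us one db entry per incoming ctx entry, position
        // matched for the zip below: the existing entry where we already hold that uuid,
        // or a backend-prepared stub for uuids we have not seen before.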
        let db_entries = self
            .be_txn
            .incremental_prepare(&ctx_entries)
            .inspect_err(|err| {
                error!(?err, "Failed to access entries from db");
            })?;

        trace!(?db_entries);

        // Need to probably handle conflicts here in this phase. I think they
        // need to be pushed to a separate list where they are then "created"
        // as a conflict.

        // First find if entries are in a conflict state.

        let (conflicts, proceed): (Vec<_>, Vec<_>) = ctx_entries
            .iter()
            .zip(db_entries)
            .partition(|(ctx_ent, db_ent)| ctx_ent.is_add_conflict(db_ent.as_ref()));

        debug!(conflicts = %conflicts.len(), proceed = %proceed.len());

        // Now we have a set of conflicts and a set of entries to proceed.
        //
        //    /- entries that need to be created as conflicts.
        //    |                /- entries that survive and need update to the db in place.
        //    v                v
        let (conflict_create, conflict_update): (
            Vec<Option<EntrySealedNew>>,
            Vec<(EntryIncrementalCommitted, Arc<EntrySealedCommitted>)>,
        ) = conflicts
            .into_iter()
            .map(
                |(ctx_ent, db_ent): (&EntryIncrementalNew, Arc<EntrySealedCommitted>)| {
                    let (opt_create, ent) =
                        ctx_ent.resolve_add_conflict(self.get_cid(), db_ent.as_ref());
                    (opt_create, (ent, db_ent))
                },
            )
            .unzip();

        // ⚠️  If we end up with plugins triggering other entries to conflicts, we DON'T need to
        // add them to this list. This is just for uuid conflicts, not higher level ones!
        //
        // ⚠️  We need to collect this from conflict_update since we may NOT be the originator
        // server for some conflicts, but we still need to know the UUID is IN the conflict
        // state for plugins. We also need to do this here before the conflict_update
        // set is consumed by later steps.
        //
        // ⚠️  When we upgrade between two nodes, migrations will often create *new* system
        // entries on both nodes. Until both nodes upgrade they can't replicate. This creates
        // a situation where both nodes have identical entry content for system entries, but
        // the entries that were created now are conflicts. Normally this is okay, because the
        // first node to upgrade will have its entries persisted, and the other node's duplicate
        // entries will be removed. However, just through the nature of being in the conflict
        // state, these entries are then added to the conflict_uuid set. This conflict_uuid set
        // is used by referential integrity to remove uuids from references so that group
        // memberships don't accidentally leak to recipients that were not intended.
        //
        // To avoid this, we remove any system entries from this conflict set, so that they are
        // exempt from this conflict handling which allows upgrades to work.
        let mut conflict_uuids: BTreeSet<_> = conflict_update
            .iter()
            .filter_map(|(_, e)| {
                let u = e.get_uuid();
                if u >= DYNAMIC_RANGE_MINIMUM_UUID {
                    // It is a user created node, process the conflict within plugins
                    Some(u)
                } else {
                    // It is in a system range, do not process this entry
                    None
                }
            })
            .collect();

        // Filter out None from conflict_create
        let conflict_create: Vec<EntrySealedNew> = conflict_create.into_iter().flatten().collect();

        let proceed_update: Vec<(EntryIncrementalCommitted, Arc<EntrySealedCommitted>)> = proceed
            .into_iter()
            .map(|(ctx_ent, db_ent)| {
                // This now is the set of entries that are able to be updated. Merge
                // their attribute sets/states per the change state rules.

                // This must create an EntryInvalidCommitted
                let merge_ent = ctx_ent.merge_state(db_ent.as_ref(), &self.schema, self.trim_cid());
                (merge_ent, db_ent)
            })
            .collect();

        // We now merge the conflict updates and the updates that can proceed. This is correct
        // since if an entry was conflicting by uuid then there is nothing for it to merge with
        // so as a result we can just bypass that step. We now have all_updates which is
        // the set of live entries to write back.
        let mut all_updates = conflict_update
            .into_iter()
            .chain(proceed_update)
            .collect::<Vec<_>>();

        // ⚠️  This hook is probably not what you want to use for checking entries are consistent.
        //
        // The main issue is that at this point we have a set of entries that need to be
        // created / marked into conflicts, and until that occurs it's hard to proceed with validations
        // like attr unique because then we would need to walk the various sets to find cases where
        // an attribute may not be unique "currently" but *would* be unique once the various entries
        // have then been conflicted and updated.
        //
        // Instead we treat this like refint - we allow the database to "temporarily" become
        // inconsistent, then we fix it immediately. This hook remains for cases in future
        // where we may wish to have session cleanup performed for example.
        Plugins::run_pre_repl_incremental(self, all_updates.as_mut_slice()).map_err(|e| {
            admin_error!("Operation failed (pre_repl_incremental plugin), {:?}", e);
            e
        })?;

        // Now we have to schema check our entries. Note that entries may become
        // conflicts at this point if they turned schema invalid during the merge
        // process.
        let all_updates_valid = all_updates
            .into_iter()
            .map(|(ctx_ent, db_ent)| {
                // Check the schema
                //
                // In these cases when an entry fails schema, we mark it to
                // a conflict state and then retain it in the update process.
                //
                // The marking is done INSIDE this function!
                let sealed_ent = ctx_ent.validate_repl(&self.schema).seal(&self.schema);
                (sealed_ent, db_ent)
            })
            .collect::<Vec<_>>();

        // We now have two sets!
        //
        // * conflict_create - entries to be created that are conflicted via add statements (duplicate uuid)
        //                     these are only created on the entry origin node!
        // * all_updates_valid - this has three types of entries
        //   * entries that have survived a uuid conflict and need an in-place write. Unlikely to become invalid.
        //   * entries that were merged and are schema valid.
        //   * entries that were merged and their attribute state has now become invalid and are conflicts.
        //
        // incremental_apply here handles both the creations and the update processes to ensure that
        // everything is updated in a single consistent operation.
        self.be_txn
            .incremental_apply(&all_updates_valid, conflict_create)
            .map_err(|e| {
                admin_error!("betxn create failure {:?}", e);
                e
            })?;

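        // The conflict plugin below may extend conflict_uuids with entries that entered
        // the conflict state during this apply, hence the mutable borrow.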
        Plugins::run_post_repl_incremental_conflict(
            self,
            all_updates_valid.as_slice(),
            &mut conflict_uuids,
        )
        .map_err(|e| {
            error!(
                "Operation failed (post_repl_incremental_conflict plugin), {:?}",
                e
            );
            e
        })?;

        // Plugins need these unzipped
        //
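        // cand is each entry in its newly written state, pre_cand is its previous db state.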
        let (cand, pre_cand): (Vec<_>, Vec<_>) = all_updates_valid
            .into_iter()
            // We previously excluded this to avoid doing unnecessary work on entries that
            // were moving to a conflict state, and the survivor was staying "as is" on this
            // node. However, this gets messy with dyngroups and memberof, where on a conflict
            // the memberships are deleted across the replication boundary. In these cases
            // we need dyngroups to see the valid entries, even if they are "identical to before"
            // to re-assert all their memberships are valid.
            /*
            .filter(|(cand, _)| {
                // Exclude anything that is conflicted as a result of the conflict plugins.
                !conflict_uuids.contains(&cand.get_uuid())
            })
            */
            .unzip();

        // We don't need to process conflict_creates here, since they are all conflicting
        // uuids which means that the conflict_uuids are all *here*, so they will trigger anything
        // that requires processing anyway. Additionally, conflict_creates may not be the full
        // set of conflict entries, as we may not be the origin node! conflict_creates is always
        // a subset of the conflicts.
        Plugins::run_post_repl_incremental(
            self,
            pre_cand.as_slice(),
            cand.as_slice(),
            &conflict_uuids,
        )
        .map_err(|e| {
            error!("Operation failed (post_repl_incremental plugin), {:?}", e);
            e
        })?;

        self.changed_uuid.extend(cand.iter().map(|e| e.get_uuid()));

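        // Inspect the pre and post entry states and raise the matching change flag for
        // any affected subsystem (access controls, oauth2, applications, sync agreements,
        // key material).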
        if !self.changed_flags.contains(ChangeFlag::ACP)
            && cand
                .iter()
                .chain(pre_cand.iter().map(|e| e.as_ref()))
                .any(|e| {
                    e.attribute_equality(Attribute::Class, &EntryClass::AccessControlProfile.into())
                })
        {
            self.changed_flags.insert(ChangeFlag::ACP)
        }

        if !self.changed_flags.contains(ChangeFlag::OAUTH2)
            && cand
                .iter()
                .chain(pre_cand.iter().map(|e| e.as_ref()))
                .any(|e| {
                    e.attribute_equality(Attribute::Class, &EntryClass::OAuth2ResourceServer.into())
                })
        {
            self.changed_flags.insert(ChangeFlag::OAUTH2)
        }

        if !self.changed_flags.contains(ChangeFlag::APPLICATION)
            && cand
                .iter()
                .chain(pre_cand.iter().map(|e| e.as_ref()))
                .any(|e| e.attribute_equality(Attribute::Class, &EntryClass::Application.into()))
        {
            self.changed_flags.insert(ChangeFlag::APPLICATION)
        }

        if !self.changed_flags.contains(ChangeFlag::SYNC_AGREEMENT)
            && cand
                .iter()
                .chain(pre_cand.iter().map(|e| e.as_ref()))
                .any(|e| e.attribute_equality(Attribute::Class, &EntryClass::SyncAccount.into()))
        {
            self.changed_flags.insert(ChangeFlag::SYNC_AGREEMENT)
        }

        if !self.changed_flags.contains(ChangeFlag::KEY_MATERIAL)
            && cand
                .iter()
                .chain(pre_cand.iter().map(|e| e.as_ref()))
                .any(|e| {
                    e.attribute_equality(Attribute::Class, &EntryClass::KeyProvider.into())
                        || e.attribute_equality(Attribute::Class, &EntryClass::KeyObject.into())
                })
        {
            self.changed_flags.insert(ChangeFlag::KEY_MATERIAL)
        }

        trace!(
            changed = ?self.changed_flags.iter_names().collect::<Vec<_>>(),
        );

        Ok(true)
    }

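    // Apply an incremental replication context received from a supplier. The returned
    // state indicates whether we can continue as-is or whether a full refresh is required.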
    pub fn consumer_apply_changes(
        &mut self,
        ctx: ReplIncrementalContext,
    ) -> Result<ConsumerState, OperationError> {
        match ctx {
            ReplIncrementalContext::DomainMismatch => {
308                error!("Unable to proceed with consumer incremental - the supplier has indicated that our domain_uuid's are not equivalent. This can occur when adding a new consumer to an existing topology.");
309                error!("This server's content must be refreshed to proceed. If you have configured automatic refresh, this will occur shortly.");
310                Ok(ConsumerState::RefreshRequired)
311            }
312            ReplIncrementalContext::NoChangesAvailable => {
313                info!("no changes are available");
314                Ok(ConsumerState::Ok)
315            }
316            ReplIncrementalContext::RefreshRequired => {
317                error!("Unable to proceed with consumer incremental - the supplier has indicated that our RUV is outdated, and replication would introduce data corruption.");
318                error!("This server's content must be refreshed to proceed. If you have configured automatic refresh, this will occur shortly.");
319                Ok(ConsumerState::RefreshRequired)
320            }
321            ReplIncrementalContext::UnwillingToSupply => {
322                warn!("Unable to proceed with consumer incremental - the supplier has indicated that our RUV is ahead, and replication would introduce data corruption.");
323                error!("This supplier's content must be refreshed to proceed. If you have configured automatic refresh, this will occur shortly.");
324                Ok(ConsumerState::Ok)
325            }
326            ReplIncrementalContext::V1 {
327                domain_version,
328                domain_patch_level,
329                domain_uuid,
330                ranges,
331                schema_entries,
332                meta_entries,
333                entries,
334            } => self.consumer_apply_changes_v1(
335                domain_version,
336                domain_patch_level,
337                domain_uuid,
338                ranges,
339                schema_entries,
340                meta_entries,
341                entries,
342            ),
343        }
344    }
345
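    // Validates the supplier's domain version, patch level and uuid, preflights the
    // incoming RUV, then applies schema, meta and remaining entries in order, reloading
    // schema / domain info as needed, before updating our RUV from the supplier ranges.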
    #[instrument(level = "debug", skip_all)]
    fn consumer_apply_changes_v1(
        &mut self,
        ctx_domain_version: DomainVersion,
        ctx_domain_patch_level: u32,
        ctx_domain_uuid: Uuid,
        ctx_ranges: BTreeMap<Uuid, ReplAnchoredCidRange>,
        ctx_schema_entries: Vec<ReplIncrementalEntryV1>,
        ctx_meta_entries: Vec<ReplIncrementalEntryV1>,
        ctx_entries: Vec<ReplIncrementalEntryV1>,
    ) -> Result<ConsumerState, OperationError> {
        if ctx_domain_version < DOMAIN_MIN_LEVEL {
            error!("Unable to proceed with consumer incremental - incoming domain level is lower than our minimum supported level. {} < {}", ctx_domain_version, DOMAIN_MIN_LEVEL);
            return Err(OperationError::ReplDomainLevelUnsatisfiable);
        } else if ctx_domain_version > DOMAIN_MAX_LEVEL {
            error!("Unable to proceed with consumer incremental - incoming domain level is greater than our maximum supported level. {} > {}", ctx_domain_version, DOMAIN_MAX_LEVEL);
            return Err(OperationError::ReplDomainLevelUnsatisfiable);
        };

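        // A development-tainted build reports the maximum patch level, so the supplier
        // must present the same value for the equality check below to pass.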
        let domain_patch_level = if self.get_domain_development_taint() {
            u32::MAX
        } else {
            self.get_domain_patch_level()
        };

        if ctx_domain_patch_level != domain_patch_level {
            error!("Unable to proceed with consumer incremental - incoming domain patch level is not equal to our patch level. {} != {}", ctx_domain_patch_level, domain_patch_level);
            return Err(OperationError::ReplDomainLevelUnsatisfiable);
        };

        // Assert that the d_uuid matches the repl domain uuid.
        let db_uuid = self.be_txn.get_db_d_uuid()?;

        if db_uuid != ctx_domain_uuid {
            error!("Unable to proceed with consumer incremental - incoming domain uuid does not match our database uuid. You must investigate this situation. {:?} != {:?}", db_uuid, ctx_domain_uuid);
            return Err(OperationError::ReplDomainUuidMismatch);
        }

        // Preflight checks of the incoming RUV to ensure it's in a good state.
        let txn_cid = self.get_cid().clone();
        let ruv = self.be_txn.get_ruv_write();

        ruv.incremental_preflight_validate_ruv(&ctx_ranges, &txn_cid)
            .inspect_err(|err| {
                error!(
                    ?err,
                    "Incoming RUV failed preflight checks, unable to proceed."
                );
            })?;

        // == ⚠️  Below this point we begin to make changes! ==
        debug!(
            "Proceeding to apply incremental from domain {:?} at level {}",
            ctx_domain_uuid, ctx_domain_version
        );

        debug!(?ctx_ranges);

        debug!("Applying schema entries");
        // Apply the schema entries first.
        let schema_changed = self
            .consumer_incremental_apply_entries(ctx_schema_entries)
            .inspect_err(|err| {
                error!(?err, "Failed to apply incremental schema entries");
            })?;

        if schema_changed {
            // We need to reload schema now!
            self.reload_schema().inspect_err(|err| {
                error!(?err, "Failed to reload schema");
            })?;
        }

        debug!("Applying meta entries");
        // Apply meta entries now.
        let meta_changed = self
            .consumer_incremental_apply_entries(ctx_meta_entries)
            .inspect_err(|err| {
                error!(?err, "Failed to apply incremental meta entries");
            })?;

        // This is re-loaded in case the domain name changed on the remote
        if meta_changed {
            self.reload_domain_info().inspect_err(|err| {
                error!(?err, "Failed to reload domain info");
            })?;
            self.reload_system_config().inspect_err(|err| {
                error!(?err, "Failed to reload system configuration");
            })?;
        }

        debug!("Applying all context entries");
        // Update all other entries now.
        self.consumer_incremental_apply_entries(ctx_entries)
            .inspect_err(|err| {
441                error!(?err, "Failed to apply incremental meta entries");
            })?;

        // Reload the domain version, doing any needed migrations.
        //
        // While it seems odd that we do the migrations after we receive the entries,
        // this is because the supplier will already be sending us everything that
        // was just migrated. As a result, we only need to apply the migrations to entries
        // that were not on the supplier, and therefore need updates here.
        if meta_changed {
            self.reload_domain_info_version().inspect_err(|err| {
                error!(?err, "Failed to reload domain info version");
            })?;
        }

        // Finally, confirm that the ranges that we have added match the ranges from our
        // context. Note that we get this in a writeable form!
        let ruv = self.be_txn.get_ruv_write();

        ruv.refresh_validate_ruv(&ctx_ranges).inspect_err(|err| {
            error!(?err, "RUV ranges were not rebuilt correctly.");
        })?;

        ruv.refresh_update_ruv(&ctx_ranges).inspect_err(|err| {
            error!(?err, "Unable to update RUV with supplier ranges.");
        })?;

        Ok(ConsumerState::Ok)
    }

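    // Apply a full refresh from a supplier, replacing the entire content of this
    // server's database with the supplier's entries.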
    pub fn consumer_apply_refresh(
        &mut self,
        ctx: ReplRefreshContext,
    ) -> Result<(), OperationError> {
        match ctx {
            ReplRefreshContext::V1 {
                domain_version,
                domain_devel,
                domain_uuid,
                ranges,
                schema_entries,
                meta_entries,
                entries,
            } => self.consumer_apply_refresh_v1(
                domain_version,
                domain_devel,
                domain_uuid,
                ranges,
                schema_entries,
                meta_entries,
                entries,
            ),
        }
    }

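    // Convert a batch of refresh entries from the supplier, validate and seal them
    // against our schema, then write them through the backend refresh path with the
    // pre/post repl refresh plugins applied.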
    fn consumer_refresh_create_entries(
        &mut self,
        ctx_entries: Vec<ReplEntryV1>,
    ) -> Result<(), OperationError> {
        let candidates = ctx_entries
            .into_iter()
            .map(EntryRefreshNew::from_repl_entry_v1)
            .collect::<Result<Vec<EntryRefreshNew>, _>>()
            .inspect_err(|err| {
                error!(?err, "Failed to convert entries from supplier");
            })?;

        Plugins::run_pre_repl_refresh(self, candidates.as_slice()).map_err(|e| {
            admin_error!(
                "Refresh operation failed (pre_repl_refresh plugin), {:?}",
                e
            );
            e
        })?;

        // No need to assign CIDs since this is a repl import.
        let norm_cand = candidates
            .into_iter()
            .map(|e| {
                e.validate(&self.schema)
                    .map_err(|e| {
                        admin_error!("Schema Violation in refresh validate {:?}", e);
                        OperationError::SchemaViolation(e)
                    })
                    .map(|e| {
                        // Then seal the changes?
                        e.seal(&self.schema)
                    })
            })
            .collect::<Result<Vec<EntrySealedNew>, _>>()?;

        let commit_cand = self.be_txn.refresh(norm_cand).map_err(|e| {
            admin_error!("betxn create failure {:?}", e);
            e
        })?;

        Plugins::run_post_repl_refresh(self, &commit_cand).map_err(|e| {
            admin_error!(
                "Refresh operation failed (post_repl_refresh plugin), {:?}",
                e
            );
            e
        })?;

        self.changed_uuid
            .extend(commit_cand.iter().map(|e| e.get_uuid()));

        Ok(())
    }

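    // Performs the refresh: check the domain version and development flags, adopt the
    // supplier's domain uuid, reset our server uuid, wipe existing content, rebuild the
    // schema and indexes from the supplier's entries, then import the meta and data
    // entries before updating the RUV.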
    #[instrument(level = "info", skip_all)]
    fn consumer_apply_refresh_v1(
        &mut self,
        ctx_domain_version: DomainVersion,
        ctx_domain_devel: bool,
        ctx_domain_uuid: Uuid,
        ctx_ranges: BTreeMap<Uuid, ReplAnchoredCidRange>,
        ctx_schema_entries: Vec<ReplEntryV1>,
        ctx_meta_entries: Vec<ReplEntryV1>,
        ctx_entries: Vec<ReplEntryV1>,
    ) -> Result<(), OperationError> {
        // Can we apply the domain version validly?
        // if domain_version >= min_support ...
        let current_devel_flag = option_env!("KANIDM_PRE_RELEASE").is_some();

        if ctx_domain_version < DOMAIN_MIN_LEVEL {
            error!("Unable to proceed with consumer refresh - incoming domain level is lower than our minimum supported level. {} < {}", ctx_domain_version, DOMAIN_MIN_LEVEL);
            return Err(OperationError::ReplDomainLevelUnsatisfiable);
        } else if ctx_domain_version > DOMAIN_MAX_LEVEL {
            error!("Unable to proceed with consumer refresh - incoming domain level is greater than our maximum supported level. {} > {}", ctx_domain_version, DOMAIN_MAX_LEVEL);
            return Err(OperationError::ReplDomainLevelUnsatisfiable);
        } else if ctx_domain_devel && !current_devel_flag {
            error!("Unable to proceed with consumer refresh - incoming domain is from a development version while this server is a stable release.");
            return Err(OperationError::ReplDomainLevelUnsatisfiable);
        } else if !ctx_domain_devel && current_devel_flag {
            error!("Unable to proceed with consumer refresh - incoming domain is from a stable version while this server is a development release.");
            return Err(OperationError::ReplDomainLevelUnsatisfiable);
        } else {
            debug!(
                "Proceeding to refresh from domain at level {}",
                ctx_domain_version
            );
        };

        // == ⚠️  Below this point we begin to make changes! ==
        self.set_phase_bootstrap();

        // Update the d_uuid. This is what defines us as being part of this repl topology!
        self.be_txn
            .set_db_d_uuid(ctx_domain_uuid)
            .inspect_err(|err| {
                error!(?err, "Failed to reset domain uuid");
            })?;

        // We need to reset our server uuid now. This is so that any other servers
        // which had our former server_uuid in their RUV are able to start to age it
        // out and trim it.
        self.reset_server_uuid()?;

        // Delete all entries - *proper delete, not just tombstone!*
        self.be_txn
            .danger_delete_all_db_content()
            .inspect_err(|err| {
                error!(?err, "Failed to clear existing server database content");
            })?;

        // Reset this transaction's schema to a completely clean slate.
        self.schema.generate_in_memory().inspect_err(|err| {
            error!(?err, "Failed to reset in memory schema to clean state");
        })?;

        // Reindex now to force some basic indexes to exist as we consume the schema
        // from our replica.
        self.reindex(false).inspect_err(|err| {
615            error!(?err, "Failed to reload schema");
        })?;

        // Apply the schema entries first. This is the foundation that everything
        // else will build upon!
        self.consumer_refresh_create_entries(ctx_schema_entries)
            .inspect_err(|err| {
                error!(?err, "Failed to refresh schema entries");
            })?;

        // We need to reload schema now!
        self.reload_schema().inspect_err(|err| {
            error!(?err, "Failed to reload schema");
        })?;

        // Schema is now ready
        self.set_phase(ServerPhase::SchemaReady);

        // We have to reindex to force all the existing indexes to be dumped
        // and recreated before we start to import.
        self.reindex(false).inspect_err(|err| {
636            error!(?err, "Failed to reload schema");
        })?;

        // Apply the domain info entry / system info / system config entry?
        self.consumer_refresh_create_entries(ctx_meta_entries)
            .inspect_err(|err| {
                error!(?err, "Failed to refresh meta entries");
            })?;

        // NOTE: The domain info we receive here will have the domain version populated!
        // That's okay though, because all the incoming data is already at the right
        // version!
        self.reload_domain_info().inspect_err(|err| {
            error!(?err, "Failed to reload domain info");
        })?;

        // Mark that everything changed so that post commit hooks function as expected.
        self.changed_flags.insert(
            ChangeFlag::SCHEMA
                | ChangeFlag::ACP
                | ChangeFlag::OAUTH2
                | ChangeFlag::DOMAIN
                | ChangeFlag::APPLICATION
                | ChangeFlag::SYSTEM_CONFIG
                | ChangeFlag::SYNC_AGREEMENT
                | ChangeFlag::KEY_MATERIAL,
        );

        // Domain info is now ready.
        self.set_phase(ServerPhase::DomainInfoReady);

        // ==== That's it! We are GOOD to go! ====

        // Create all the entries. Note we don't hit plugins here besides the pre/post repl refresh plugins.
        self.consumer_refresh_create_entries(ctx_entries)
            .inspect_err(|err| {
672                error!(?err, "Failed to refresh schema entries");
            })?;

        // Finally, confirm that the ranges that we have recreated match the ranges from our
        // context. Note that we get this in a writeable form!
        let ruv = self.be_txn.get_ruv_write();

        ruv.refresh_validate_ruv(&ctx_ranges).inspect_err(|err| {
            error!(?err, "RUV ranges were not rebuilt correctly.");
        })?;

        ruv.refresh_update_ruv(&ctx_ranges).inspect_err(|err| {
            error!(?err, "Unable to update RUV with supplier ranges.");
        })?;

        // Refresh complete
        self.set_phase(ServerPhase::Running);

        Ok(())
    }
}