kanidmd_lib/repl/
consumer.rs
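//! The consumer half of replication. This module applies incremental
//! changesets and full refreshes received from a supplier to this server's
//! database within a single write transaction, resolving uuid conflicts and
//! reloading schema and domain info as required.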

use super::proto::*;
use crate::plugins::Plugins;
use crate::prelude::*;
use crate::server::{ChangeFlag, ServerPhase};
use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;

impl QueryServerWriteTransaction<'_> {
    // Apply the state changes if they are valid.

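    /// Apply a set of incremental entries from a supplier to this server.
    /// Returns Ok(true) if any entries were processed (callers use this to
    /// decide whether schema or domain info must be reloaded), or Ok(false)
    /// if the supplied entry set was empty.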
    fn consumer_incremental_apply_entries(
        &mut self,
        ctx_entries: Vec<ReplIncrementalEntryV1>,
    ) -> Result<bool, OperationError> {
        // trace!(?ctx_entries);

        // No action needed for this if the entries are empty.
        if ctx_entries.is_empty() {
            debug!("No entries to act upon");
            return Ok(false);
        }

        /*
         *  Incremental is very similar to modify in how we have to treat the entries
         *  with a pre and post state. However, we need an incremental prepare so that
         *  when new entries are provided to us we can merge to a stub and then commit
         *  it correctly. This takes an extra backend interface that prepares the
         *  entry stubs for us.
         */

        // Rehydrate all the repl content to partial entries. This way all the
        // types are consistent and ready.
        let ctx_entries: Vec<_> = ctx_entries
            .into_iter()
            .map(EntryIncrementalNew::rehydrate)
            .collect::<Result<Vec<_>, _>>()
            .inspect_err(|err| {
                error!(?err, "Unable to process replication incremental entries to valid entry states for replication");
            })?;

        trace!(?ctx_entries);

        let db_entries = self
            .be_txn
            .incremental_prepare(&ctx_entries)
            .inspect_err(|err| {
                error!(?err, "Failed to access entries from db");
            })?;

        trace!(?db_entries);

        // Conflicts need to be handled in this phase: conflicting entries are
        // pushed to a separate list where they are then "created" as a
        // conflict.

        // First find if entries are in a conflict state. Remember, these conflicts are purely
        // UUID creation conflicts at this phase in the process.

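        // For example, if two nodes have each independently created an entry with
        // the same uuid, the incoming context entry and our database entry both
        // claim to be the creation of that uuid and must be resolved as an add
        // conflict below.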
        let (conflicts, proceed): (Vec<_>, Vec<_>) = ctx_entries
            .iter()
            .zip(db_entries)
            .partition(|(ctx_ent, db_ent)| ctx_ent.is_add_conflict(db_ent.as_ref()));

        debug!(conflicts = %conflicts.len(), proceed = %proceed.len());

        // Now we have a set of conflicts and a set of entries to proceed.
        //
        //    /- entries that need to be created as conflicts.
        //    |                /- entries that survive and need update to the db in place.
        //    v                v
        let (conflict_create, conflict_update): (
            Vec<Option<EntrySealedNew>>,
            Vec<(EntryIncrementalCommitted, Arc<EntrySealedCommitted>)>,
        ) = conflicts
            .into_iter()
            .map(
                |(ctx_ent, db_ent): (&EntryIncrementalNew, Arc<EntrySealedCommitted>)| {
                    let (opt_create, ent) =
                        ctx_ent.resolve_add_conflict(self.get_cid(), db_ent.as_ref());
                    (opt_create, (ent, db_ent))
                },
            )
            .unzip();

        // ⚠️  If we end up with plugins triggering other entries to conflicts, we DON'T need to
        // add them to this list. This is just for uuid conflicts, not higher level ones!
        //
        // ⚠️  We need to collect this from conflict_update since we may NOT be the originator
        // server for some conflicts, but we still need to know the UUID is IN the conflict
        // state for plugins. We also need to do this here before the conflict_update
        // set is consumed by later steps.
        //
        // ⚠️  When we upgrade between two nodes, migrations will often create *new* system
        // entries on both nodes. Until both nodes upgrade they can't replicate. This creates
        // a situation where both nodes have identical entry content for system entries, but
        // the entries that were created are now conflicts. Normally this is okay, because the
        // first node to upgrade will have its entries persisted, and the other nodes' duplicate
        // entries will be removed. However, just through the nature of being in the conflict
        // state, these entries are then added to the conflict_uuid set. This conflict_uuid set
        // is used by referential integrity to remove uuids from references so that group
        // memberships don't accidentally leak to recipients that were not intended.
        //
        // To avoid this, we remove any system entries from this conflict set, so that they are
        // exempt from this conflict handling, which allows upgrades to work.
        let mut conflict_uuids: BTreeSet<_> = conflict_update
            .iter()
            .filter_map(|(_, e)| {
                let u = e.get_uuid();
                if u >= DYNAMIC_RANGE_MINIMUM_UUID {
                    // It is a user created node, process the conflict within plugins
                    Some(u)
                } else {
                    // It is in a system range, do not process this entry
                    None
                }
            })
            .collect();

        // Filter out None from conflict_create
        let conflict_create: Vec<EntrySealedNew> = conflict_create.into_iter().flatten().collect();

        let proceed_update: Vec<(EntryIncrementalCommitted, Arc<EntrySealedCommitted>)> = proceed
            .into_iter()
            .map(|(ctx_ent, db_ent)| {
                // This now is the set of entries that are able to be updated. Merge
                // their attribute sets/states per the change state rules.

                // This must create an EntryInvalidCommitted
                let merge_ent = ctx_ent.merge_state(db_ent.as_ref(), &self.schema, self.trim_cid());
                (merge_ent, db_ent)
            })
            .collect();

        // We now merge the conflict updates and the updates that can proceed. This is correct
        // since, if an entry was conflicting by uuid, there is nothing for it to merge with,
        // so we can just bypass that step. We now have all_updates, which is the set of live
        // entries to write back.
        let mut all_updates = conflict_update
            .into_iter()
            .chain(proceed_update)
            .collect::<Vec<_>>();

        // ⚠️  This hook is probably not what you want to use for checking entries are consistent.
        //
        // The main issue is that at this point we have a set of entries that need to be
        // created / marked into conflicts, and until that occurs it's hard to proceed with
        // validations like attr unique, because then we would need to walk the various sets to
        // find cases where an attribute may not be unique "currently" but *would* be unique once
        // the various entries have then been conflicted and updated.
        //
        // Instead we treat this like refint - we allow the database to "temporarily" become
        // inconsistent, then we fix it immediately. This hook remains for future cases where
        // we may wish to have session cleanup performed, for example.
        Plugins::run_pre_repl_incremental(self, all_updates.as_mut_slice()).map_err(|e| {
            admin_error!("Operation failed (pre_repl_incremental plugin), {:?}", e);
            e
        })?;

        // Now we have to schema check our entries. Remember, because this uses into_iter,
        // it's possible that entries may be conflicted due to becoming schema invalid
        // during the merge process.
        let all_updates_valid = all_updates
            .into_iter()
            .map(|(ctx_ent, db_ent)| {
                // Check the schema
                //
                // In these cases when an entry fails schema, we mark it into
                // a conflict state and then retain it in the update process.
                //
                // The marking is done INSIDE this function!
                let sealed_ent = ctx_ent.validate_repl(&self.schema).seal(&self.schema);
                (sealed_ent, db_ent)
            })
            .collect::<Vec<_>>();

        // We now have two sets!
        //
        // * conflict_create - entries to be created that are conflicted via add statements (duplicate uuid)
        //                     these are only created on the entry origin node!
        // * all_updates_valid - this contains a mix of entries:
        //   * entries that have survived a uuid conflict and need an in-place write. Unlikely to become invalid.
        //   * entries that were merged and are schema valid.
        //   * entries that were merged and whose attribute state has now become invalid, making them conflicts.
        //
        // incremental_apply here handles both the creations and the update processes to ensure that
        // everything is updated in a single consistent operation.
        self.be_txn
            .incremental_apply(&all_updates_valid, conflict_create)
            .map_err(|e| {
                admin_error!("betxn create failure {:?}", e);
                e
            })?;

        Plugins::run_post_repl_incremental_conflict(
            self,
            all_updates_valid.as_slice(),
            &mut conflict_uuids,
        )
        .map_err(|e| {
            error!(
                "Operation failed (post_repl_incremental_conflict plugin), {:?}",
                e
            );
            e
        })?;

        // Plugins need these unzipped
        //
        let (cand, pre_cand): (Vec<_>, Vec<_>) = all_updates_valid
            .into_iter()
            // We previously excluded this to avoid doing unnecessary work on entries that
            // were moving to a conflict state, and the survivor was staying "as is" on this
            // node. However, this gets messy with dyngroups and memberof, where on a conflict
            // the memberships are deleted across the replication boundary. In these cases
            // we need dyngroups to see the valid entries, even if they are "identical to before"
            // to re-assert all their memberships are valid.
            /*
            .filter(|(cand, _)| {
                // Exclude anything that is conflicted as a result of the conflict plugins.
                !conflict_uuids.contains(&cand.get_uuid())
            })
            */
            .unzip();

        // We don't need to process conflict_creates here, since they are all conflicting
        // uuids, which means that the conflict_uuids are all *here*, so they will trigger anything
        // that requires processing anyway. As well, conflict_creates may not be the full
        // set of conflict entries as we may not be the origin node! conflict_creates is always
        // a subset of the conflicts.
        Plugins::run_post_repl_incremental(
            self,
            pre_cand.as_slice(),
            cand.as_slice(),
            &conflict_uuids,
        )
        .map_err(|e| {
            error!("Operation failed (post_repl_incremental plugin), {:?}", e);
            e
        })?;

        self.changed_uuid.extend(cand.iter().map(|e| e.get_uuid()));

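        // Raise the relevant change flags so that post-commit hooks reload the
        // affected subsystems (access controls, oauth2, features, sync agreements,
        // key material) when entries of those classes were touched by this apply.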
        if !self.changed_flags.contains(ChangeFlag::ACP)
            && cand
                .iter()
                .chain(pre_cand.iter().map(|e| e.as_ref()))
                .any(|e| {
                    e.attribute_equality(Attribute::Class, &EntryClass::AccessControlProfile.into())
                })
        {
            self.changed_flags.insert(ChangeFlag::ACP)
        }

        if !self.changed_flags.contains(ChangeFlag::OAUTH2)
            && cand
                .iter()
                .chain(pre_cand.iter().map(|e| e.as_ref()))
                .any(|e| {
                    e.attribute_equality(Attribute::Class, &EntryClass::OAuth2ResourceServer.into())
                })
        {
            self.changed_flags.insert(ChangeFlag::OAUTH2)
        }

        if !self.changed_flags.contains(ChangeFlag::OAUTH2_CLIENT)
            && cand
                .iter()
                .chain(pre_cand.iter().map(|e| e.as_ref()))
                .any(|e| e.attribute_equality(Attribute::Class, &EntryClass::OAuth2Client.into()))
        {
            self.changed_flags.insert(ChangeFlag::OAUTH2_CLIENT)
        }

        if !self.changed_flags.contains(ChangeFlag::FEATURE)
            && cand
                .iter()
                .chain(pre_cand.iter().map(|e| e.as_ref()))
                .any(|e| e.attribute_equality(Attribute::Class, &EntryClass::Feature.into()))
        {
            self.changed_flags.insert(ChangeFlag::FEATURE)
        }

        if !self.changed_flags.contains(ChangeFlag::APPLICATION)
            && cand
                .iter()
                .chain(pre_cand.iter().map(|e| e.as_ref()))
                .any(|e| e.attribute_equality(Attribute::Class, &EntryClass::Application.into()))
        {
            self.changed_flags.insert(ChangeFlag::APPLICATION)
        }

        if !self.changed_flags.contains(ChangeFlag::SYNC_AGREEMENT)
            && cand
                .iter()
                .chain(pre_cand.iter().map(|e| e.as_ref()))
                .any(|e| e.attribute_equality(Attribute::Class, &EntryClass::SyncAccount.into()))
        {
            self.changed_flags.insert(ChangeFlag::SYNC_AGREEMENT)
        }

        if !self.changed_flags.contains(ChangeFlag::KEY_MATERIAL)
            && cand
                .iter()
                .chain(pre_cand.iter().map(|e| e.as_ref()))
                .any(|e| {
                    e.attribute_equality(Attribute::Class, &EntryClass::KeyProvider.into())
                        || e.attribute_equality(Attribute::Class, &EntryClass::KeyObject.into())
                })
        {
            self.changed_flags.insert(ChangeFlag::KEY_MATERIAL)
        }

        trace!(
            changed = ?self.changed_flags.iter_names().collect::<Vec<_>>(),
        );

        Ok(true)
    }

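    /// Apply an incremental replication context from a supplier, dispatching on
    /// the context variant. Mismatch and refresh-required cases are reported and
    /// signalled to the caller via ConsumerState; V1 changesets are applied by
    /// consumer_apply_changes_v1.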
    pub fn consumer_apply_changes(
        &mut self,
        ctx: ReplIncrementalContext,
    ) -> Result<ConsumerState, OperationError> {
        match ctx {
            ReplIncrementalContext::DomainMismatch => {
                error!("Unable to proceed with consumer incremental - the supplier has indicated that our domain_uuids are not equivalent. This can occur when adding a new consumer to an existing topology.");
                error!("This server's content must be refreshed to proceed. If you have configured automatic refresh, this will occur shortly.");
                Ok(ConsumerState::RefreshRequired)
            }
            ReplIncrementalContext::NoChangesAvailable => {
                debug!("no changes are available");
                Ok(ConsumerState::Ok)
            }
            ReplIncrementalContext::RefreshRequired => {
                error!("Unable to proceed with consumer incremental - the supplier has indicated that our RUV is outdated, and replication would introduce data corruption.");
                error!("This server's content must be refreshed to proceed. If you have configured automatic refresh, this will occur shortly.");
                Ok(ConsumerState::RefreshRequired)
            }
            ReplIncrementalContext::UnwillingToSupply => {
                warn!("Unable to proceed with consumer incremental - the supplier has indicated that our RUV is ahead, and replication would introduce data corruption.");
                error!("This supplier's content must be refreshed to proceed. If you have configured automatic refresh, this will occur shortly.");
                Ok(ConsumerState::Ok)
            }
            ReplIncrementalContext::V1 {
                domain_version,
                domain_patch_level,
                domain_uuid,
                ranges,
                schema_entries,
                meta_entries,
                entries,
            } => self.consumer_apply_changes_v1(
                domain_version,
                domain_patch_level,
                domain_uuid,
                &ranges,
                schema_entries,
                meta_entries,
                entries,
            ),
        }
    }

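    /// Apply a V1 incremental changeset. Preflight checks on the domain version
    /// bounds, patch level, domain uuid and incoming RUV are performed first, then
    /// schema entries, meta entries and all remaining entries are applied in order,
    /// reloading schema and domain info where needed, before the local RUV is
    /// validated against and updated from the supplier's ranges.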
    #[instrument(level = "debug", skip_all)]
    fn consumer_apply_changes_v1(
        &mut self,
        ctx_domain_version: DomainVersion,
        ctx_domain_patch_level: u32,
        ctx_domain_uuid: Uuid,
        ctx_ranges: &BTreeMap<Uuid, ReplAnchoredCidRange>,
        ctx_schema_entries: Vec<ReplIncrementalEntryV1>,
        ctx_meta_entries: Vec<ReplIncrementalEntryV1>,
        ctx_entries: Vec<ReplIncrementalEntryV1>,
    ) -> Result<ConsumerState, OperationError> {
        if ctx_domain_version < DOMAIN_MIN_LEVEL {
            error!("Unable to proceed with consumer incremental - incoming domain level is lower than our minimum supported level. {} < {}", ctx_domain_version, DOMAIN_MIN_LEVEL);
            return Err(OperationError::ReplDomainLevelUnsatisfiable);
        } else if ctx_domain_version > DOMAIN_MAX_LEVEL {
            error!("Unable to proceed with consumer incremental - incoming domain level is greater than our maximum supported level. {} > {}", ctx_domain_version, DOMAIN_MAX_LEVEL);
            return Err(OperationError::ReplDomainLevelUnsatisfiable);
        };

        let domain_patch_level = if self.get_domain_development_taint() {
            u32::MAX
        } else {
            self.get_domain_patch_level()
        };

        if ctx_domain_patch_level != domain_patch_level {
            error!("Unable to proceed with consumer incremental - incoming domain patch level is not equal to our patch level. {} != {}", ctx_domain_patch_level, domain_patch_level);
            return Err(OperationError::ReplDomainLevelUnsatisfiable);
        };

        // Assert that the d_uuid matches the repl domain uuid.
        let db_uuid = self.be_txn.get_db_d_uuid()?;

        if db_uuid != ctx_domain_uuid {
            error!("Unable to proceed with consumer incremental - incoming domain uuid does not match our database uuid. You must investigate this situation. {:?} != {:?}", db_uuid, ctx_domain_uuid);
            return Err(OperationError::ReplDomainUuidMismatch);
        }

        // Preflight checks of the incoming RUV to ensure it's in a good state.
        let txn_cid = self.get_cid().clone();
        let ruv = self.be_txn.get_ruv_write();

        let change_count = ctx_schema_entries.len() + ctx_meta_entries.len() + ctx_entries.len();

        ruv.incremental_preflight_validate_ruv(ctx_ranges, &txn_cid)
            .inspect_err(|err| {
                error!(
                    ?err,
                    "Incoming RUV failed preflight checks, unable to proceed."
                );
            })?;

        // == ⚠️  Below this point we begin to make changes! ==
        debug!(
            "Proceeding to apply incremental with {change_count} changes from domain {ctx_domain_uuid:?} at level {ctx_domain_version}"
        );

        debug!(?ctx_ranges);

        debug!("Applying {} schema entries", ctx_schema_entries.len());
        // Apply the schema entries first.
        let schema_changed = self
            .consumer_incremental_apply_entries(ctx_schema_entries)
            .inspect_err(|err| {
                error!(?err, "Failed to apply incremental schema entries");
            })?;

        if schema_changed {
            // We need to reload schema now!
            self.reload_schema().inspect_err(|err| {
                error!(?err, "Failed to reload schema");
            })?;
        }

        debug!("Applying {} meta entries", ctx_meta_entries.len());
        // Apply meta entries now.
        let meta_changed = self
            .consumer_incremental_apply_entries(ctx_meta_entries)
            .inspect_err(|err| {
                error!(?err, "Failed to apply incremental meta entries");
            })?;

        // This is reloaded in case the domain name changed on the remote
        if meta_changed {
            self.reload_domain_info().inspect_err(|err| {
                error!(?err, "Failed to reload domain info");
            })?;
            self.reload_system_config().inspect_err(|err| {
                error!(?err, "Failed to reload system configuration");
            })?;
        }

        debug!("Applying {} context entries", ctx_entries.len());
        // Update all other entries now.
        self.consumer_incremental_apply_entries(ctx_entries)
            .inspect_err(|err| {
                error!(?err, "Failed to apply incremental context entries");
            })?;

        // Reload the domain version, doing any needed migrations.
        //
        // While it seems odd that we do the migrations after we receive the entries,
        // this is because the supplier will already be sending us everything that
        // was just migrated. As a result, we only need to apply the migrations to entries
        // that were not on the supplier, and therefore need updates here.
        if meta_changed {
            self.reload_domain_info_version().inspect_err(|err| {
                error!(?err, "Failed to reload domain info version");
            })?;
        }

        // Finally, confirm that the ranges that we have added match the ranges from our
        // context. Note that we get this in a writeable form!
        let ruv = self.be_txn.get_ruv_write();

        ruv.refresh_validate_ruv(ctx_ranges).inspect_err(|err| {
            error!(?err, "RUV ranges were not rebuilt correctly.");
        })?;

        ruv.refresh_update_ruv(ctx_ranges).inspect_err(|err| {
            error!(?err, "Unable to update RUV with supplier ranges.");
        })?;

        Ok(ConsumerState::Ok)
    }

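    /// Apply a full refresh context from a supplier, dispatching on the context
    /// version to the matching handler.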
    pub fn consumer_apply_refresh(
        &mut self,
        ctx: ReplRefreshContext,
    ) -> Result<(), OperationError> {
        match ctx {
            ReplRefreshContext::V1 {
                domain_version,
                domain_devel,
                domain_uuid,
                ranges,
                schema_entries,
                meta_entries,
                entries,
            } => self.consumer_apply_refresh_v1(
                domain_version,
                domain_devel,
                domain_uuid,
                &ranges,
                schema_entries,
                meta_entries,
                entries,
            ),
        }
    }

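    /// Create a batch of entries received during a refresh: convert them from
    /// their replication form, run the pre and post refresh plugins, validate
    /// and seal them against the current schema, then write them to the backend.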
    fn consumer_refresh_create_entries(
        &mut self,
        ctx_entries: Vec<ReplEntryV1>,
    ) -> Result<(), OperationError> {
        let candidates = ctx_entries
            .into_iter()
            .map(EntryRefreshNew::from_repl_entry_v1)
            .collect::<Result<Vec<EntryRefreshNew>, _>>()
            .inspect_err(|err| {
                error!(?err, "Failed to convert entries from supplier");
            })?;

        Plugins::run_pre_repl_refresh(self, candidates.as_slice()).map_err(|e| {
            admin_error!(
                "Refresh operation failed (pre_repl_refresh plugin), {:?}",
                e
            );
            e
        })?;

        // No need to assign CIDs since this is a repl import.
        let norm_cand = candidates
            .into_iter()
            .map(|e| {
                e.validate(&self.schema)
                    .map_err(|e| {
                        admin_error!("Schema Violation in refresh validate {:?}", e);
                        OperationError::SchemaViolation(e)
                    })
                    .map(|e| {
                        // Then seal the changes?
                        e.seal(&self.schema)
                    })
            })
            .collect::<Result<Vec<EntrySealedNew>, _>>()?;

        let commit_cand = self.be_txn.refresh(norm_cand).map_err(|e| {
            admin_error!("betxn create failure {:?}", e);
            e
        })?;

        Plugins::run_post_repl_refresh(self, &commit_cand).map_err(|e| {
            admin_error!(
                "Refresh operation failed (post_repl_refresh plugin), {:?}",
                e
            );
            e
        })?;

        self.changed_uuid
            .extend(commit_cand.iter().map(|e| e.get_uuid()));

        Ok(())
    }

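    /// Rebuild this server from a supplier's full dataset. After domain version
    /// and development-build compatibility checks, local content is replaced: the
    /// domain uuid is adopted from the supplier, our server uuid is reset, all
    /// database content is deleted and the in-memory schema regenerated. Schema,
    /// meta and data entries are then created in phases with reindexes and reloads
    /// between them, before the RUV is validated and updated from the supplier
    /// ranges.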
    #[instrument(level = "info", skip_all)]
    fn consumer_apply_refresh_v1(
        &mut self,
        ctx_domain_version: DomainVersion,
        ctx_domain_devel: bool,
        ctx_domain_uuid: Uuid,
        ctx_ranges: &BTreeMap<Uuid, ReplAnchoredCidRange>,
        ctx_schema_entries: Vec<ReplEntryV1>,
        ctx_meta_entries: Vec<ReplEntryV1>,
        ctx_entries: Vec<ReplEntryV1>,
    ) -> Result<(), OperationError> {
        // Can we apply the domain version validly?
        // if domain_version >= min_support ...
        let current_devel_flag = option_env!("KANIDM_PRE_RELEASE").is_some();

        if ctx_domain_version < DOMAIN_MIN_LEVEL {
            error!("Unable to proceed with consumer refresh - incoming domain level is lower than our minimum supported level. {} < {}", ctx_domain_version, DOMAIN_MIN_LEVEL);
            return Err(OperationError::ReplDomainLevelUnsatisfiable);
        } else if ctx_domain_version > DOMAIN_MAX_LEVEL {
            error!("Unable to proceed with consumer refresh - incoming domain level is greater than our maximum supported level. {} > {}", ctx_domain_version, DOMAIN_MAX_LEVEL);
            return Err(OperationError::ReplDomainLevelUnsatisfiable);
        } else if ctx_domain_devel && !current_devel_flag {
            error!("Unable to proceed with consumer refresh - incoming domain is from a development version while this server is a stable release.");
            return Err(OperationError::ReplDomainLevelUnsatisfiable);
        } else if !ctx_domain_devel && current_devel_flag {
            error!("Unable to proceed with consumer refresh - incoming domain is from a stable version while this server is a development release.");
            return Err(OperationError::ReplDomainLevelUnsatisfiable);
        } else {
            debug!(
                "Proceeding to refresh from domain at level {}",
                ctx_domain_version
            );
        };

        // == ⚠️  Below this point we begin to make changes! ==
        self.set_phase_bootstrap();

        // Update the d_uuid. This is what defines us as being part of this repl topology!
        self.be_txn
            .set_db_d_uuid(ctx_domain_uuid)
            .inspect_err(|err| {
                error!(?err, "Failed to reset domain uuid");
            })?;

        // We need to reset our server uuid now. This is so that any other servers
        // which had our former server_uuid in their RUV are able to start to age it
        // out and trim it.
        self.reset_server_uuid()?;

        // Delete all entries - *proper delete, not just tombstone!*
        self.be_txn
            .danger_delete_all_db_content()
            .inspect_err(|err| {
                error!(?err, "Failed to clear existing server database content");
            })?;

        // Reset this transaction's schema to a completely clean slate.
        self.schema.generate_in_memory().inspect_err(|err| {
            error!(?err, "Failed to reset in memory schema to clean state");
        })?;

        // Reindex now to force some basic indexes to exist as we consume the schema
        // from our replica.
        self.reindex(false).inspect_err(|err| {
            error!(?err, "Failed to reindex");
        })?;

        // Apply the schema entries first. This is the foundation that everything
        // else will build upon!
        self.consumer_refresh_create_entries(ctx_schema_entries)
            .inspect_err(|err| {
                error!(?err, "Failed to refresh schema entries");
            })?;

        // We need to reload schema now!
        self.reload_schema().inspect_err(|err| {
            error!(?err, "Failed to reload schema");
        })?;

        // Schema is now ready
        self.set_phase(ServerPhase::SchemaReady);

        // We have to reindex to force all the existing indexes to be dumped
        // and recreated before we start to import.
        self.reindex(false).inspect_err(|err| {
            error!(?err, "Failed to reindex");
        })?;

        // Apply the domain info entry / system info / system config entry?
        self.consumer_refresh_create_entries(ctx_meta_entries)
            .inspect_err(|err| {
                error!(?err, "Failed to refresh meta entries");
            })?;

        // NOTE: The domain info we receive here will have the domain version populated!
        // That's okay though, because all the incoming data is already at the right
        // version!
        self.reload_domain_info().inspect_err(|err| {
            error!(?err, "Failed to reload domain info");
        })?;

        // Mark that everything changed so that post commit hooks function as expected.
        self.changed_flags.insert(
            ChangeFlag::SCHEMA
                | ChangeFlag::ACP
                | ChangeFlag::OAUTH2
                | ChangeFlag::OAUTH2_CLIENT
                | ChangeFlag::FEATURE
                | ChangeFlag::DOMAIN
                | ChangeFlag::APPLICATION
                | ChangeFlag::SYSTEM_CONFIG
                | ChangeFlag::SYNC_AGREEMENT
                | ChangeFlag::KEY_MATERIAL,
        );

        // Domain info is now ready.
        self.set_phase(ServerPhase::DomainInfoReady);

        // ==== That's it! We are GOOD to go! ====

        // Create all the entries. Note we don't hit plugins here besides the post repl plugins.
        self.consumer_refresh_create_entries(ctx_entries)
            .inspect_err(|err| {
                error!(?err, "Failed to refresh entries");
            })?;

        // Finally, confirm that the ranges that we have recreated match the ranges from our
        // context. Note that we get this in a writeable form!
        let ruv = self.be_txn.get_ruv_write();

        ruv.refresh_validate_ruv(ctx_ranges).inspect_err(|err| {
            error!(?err, "RUV ranges were not rebuilt correctly.");
        })?;

        ruv.refresh_update_ruv(ctx_ranges).inspect_err(|err| {
            error!(?err, "Unable to update RUV with supplier ranges.");
        })?;

        // Refresh complete
        self.set_phase(ServerPhase::Running);

        Ok(())
    }
}