1use super::proto::*;
2use crate::plugins::Plugins;
3use crate::prelude::*;
4use crate::server::{ChangeFlag, ServerPhase};
5use std::collections::{BTreeMap, BTreeSet};
6use std::sync::Arc;
7
8impl QueryServerWriteTransaction<'_> {
9    fn consumer_incremental_apply_entries(
12        &mut self,
13        ctx_entries: Vec<ReplIncrementalEntryV1>,
14    ) -> Result<bool, OperationError> {
15        if ctx_entries.is_empty() {
19            debug!("No entries to act upon");
20            return Ok(false);
21        }
22
23        let ctx_entries: Vec<_> = ctx_entries.into_iter().map(
34            EntryIncrementalNew::rehydrate
35        )
36        .collect::<Result<Vec<_>, _>>()
37        .map_err(|e| {
38            error!(err = ?e, "Unable to process replication incremental entries to valid entry states for replication");
39            e
40        })?;
41
42        trace!(?ctx_entries);
43
44        let db_entries = self
45            .be_txn
46            .incremental_prepare(&ctx_entries)
47            .inspect_err(|err| {
48                error!(?err, "Failed to access entries from db");
49            })?;
50
51        trace!(?db_entries);
52
53        let (conflicts, proceed): (Vec<_>, Vec<_>) = ctx_entries
60            .iter()
61            .zip(db_entries)
62            .partition(|(ctx_ent, db_ent)| ctx_ent.is_add_conflict(db_ent.as_ref()));
63
64        debug!(conflicts = %conflicts.len(), proceed = %proceed.len());
65
66        let (conflict_create, conflict_update): (
72            Vec<Option<EntrySealedNew>>,
73            Vec<(EntryIncrementalCommitted, Arc<EntrySealedCommitted>)>,
74        ) = conflicts
75            .into_iter()
76            .map(
77                |(ctx_ent, db_ent): (&EntryIncrementalNew, Arc<EntrySealedCommitted>)| {
78                    let (opt_create, ent) =
79                        ctx_ent.resolve_add_conflict(self.get_cid(), db_ent.as_ref());
80                    (opt_create, (ent, db_ent))
81                },
82            )
83            .unzip();
84
85        let mut conflict_uuids: BTreeSet<_> = conflict_update
106            .iter()
107            .filter_map(|(_, e)| {
108                let u = e.get_uuid();
109                if u >= DYNAMIC_RANGE_MINIMUM_UUID {
110                    Some(u)
112                } else {
113                    None
115                }
116            })
117            .collect();
118
119        let conflict_create: Vec<EntrySealedNew> = conflict_create.into_iter().flatten().collect();
121
122        let proceed_update: Vec<(EntryIncrementalCommitted, Arc<EntrySealedCommitted>)> = proceed
123            .into_iter()
124            .map(|(ctx_ent, db_ent)| {
125                let merge_ent = ctx_ent.merge_state(db_ent.as_ref(), &self.schema, self.trim_cid());
130                (merge_ent, db_ent)
131            })
132            .collect();
133
134        let mut all_updates = conflict_update
139            .into_iter()
140            .chain(proceed_update)
141            .collect::<Vec<_>>();
142
143        Plugins::run_pre_repl_incremental(self, all_updates.as_mut_slice()).map_err(|e| {
155            admin_error!("Operation failed (pre_repl_incremental plugin), {:?}", e);
156            e
157        })?;
158
159        let all_updates_valid = all_updates
163            .into_iter()
164            .map(|(ctx_ent, db_ent)| {
165                let sealed_ent = ctx_ent.validate_repl(&self.schema).seal(&self.schema);
172                (sealed_ent, db_ent)
173            })
174            .collect::<Vec<_>>();
175
176        self.be_txn
188            .incremental_apply(&all_updates_valid, conflict_create)
189            .map_err(|e| {
190                admin_error!("betxn create failure {:?}", e);
191                e
192            })?;
193
194        Plugins::run_post_repl_incremental_conflict(
195            self,
196            all_updates_valid.as_slice(),
197            &mut conflict_uuids,
198        )
199        .map_err(|e| {
200            error!(
201                "Operation failed (post_repl_incremental_conflict plugin), {:?}",
202                e
203            );
204            e
205        })?;
206
207        let (cand, pre_cand): (Vec<_>, Vec<_>) = all_updates_valid
210            .into_iter()
211            .unzip();
224
225        Plugins::run_post_repl_incremental(
231            self,
232            pre_cand.as_slice(),
233            cand.as_slice(),
234            &conflict_uuids,
235        )
236        .map_err(|e| {
237            error!("Operation failed (post_repl_incremental plugin), {:?}", e);
238            e
239        })?;
240
241        self.changed_uuid.extend(cand.iter().map(|e| e.get_uuid()));
242
243        if !self.changed_flags.contains(ChangeFlag::ACP)
244            && cand
245                .iter()
246                .chain(pre_cand.iter().map(|e| e.as_ref()))
247                .any(|e| {
248                    e.attribute_equality(Attribute::Class, &EntryClass::AccessControlProfile.into())
249                })
250        {
251            self.changed_flags.insert(ChangeFlag::ACP)
252        }
253
254        if !self.changed_flags.contains(ChangeFlag::OAUTH2)
255            && cand
256                .iter()
257                .chain(pre_cand.iter().map(|e| e.as_ref()))
258                .any(|e| {
259                    e.attribute_equality(Attribute::Class, &EntryClass::OAuth2ResourceServer.into())
260                })
261        {
262            self.changed_flags.insert(ChangeFlag::OAUTH2)
263        }
264
265        if !self.changed_flags.contains(ChangeFlag::APPLICATION)
266            && cand
267                .iter()
268                .chain(pre_cand.iter().map(|e| e.as_ref()))
269                .any(|e| e.attribute_equality(Attribute::Class, &EntryClass::Application.into()))
270        {
271            self.changed_flags.insert(ChangeFlag::APPLICATION)
272        }
273
274        if !self.changed_flags.contains(ChangeFlag::SYNC_AGREEMENT)
275            && cand
276                .iter()
277                .chain(pre_cand.iter().map(|e| e.as_ref()))
278                .any(|e| e.attribute_equality(Attribute::Class, &EntryClass::SyncAccount.into()))
279        {
280            self.changed_flags.insert(ChangeFlag::SYNC_AGREEMENT)
281        }
282
283        if !self.changed_flags.contains(ChangeFlag::KEY_MATERIAL)
284            && cand
285                .iter()
286                .chain(pre_cand.iter().map(|e| e.as_ref()))
287                .any(|e| {
288                    e.attribute_equality(Attribute::Class, &EntryClass::KeyProvider.into())
289                        || e.attribute_equality(Attribute::Class, &EntryClass::KeyObject.into())
290                })
291        {
292            self.changed_flags.insert(ChangeFlag::KEY_MATERIAL)
293        }
294
295        trace!(
296            changed = ?self.changed_flags.iter_names().collect::<Vec<_>>(),
297        );
298
299        Ok(true)
300    }
301
302    pub fn consumer_apply_changes(
303        &mut self,
304        ctx: ReplIncrementalContext,
305    ) -> Result<ConsumerState, OperationError> {
306        match ctx {
307            ReplIncrementalContext::DomainMismatch => {
308                error!("Unable to proceed with consumer incremental - the supplier has indicated that our domain_uuid's are not equivalent. This can occur when adding a new consumer to an existing topology.");
309                error!("This server's content must be refreshed to proceed. If you have configured automatic refresh, this will occur shortly.");
310                Ok(ConsumerState::RefreshRequired)
311            }
312            ReplIncrementalContext::NoChangesAvailable => {
313                debug!("no changes are available");
314                Ok(ConsumerState::Ok)
315            }
316            ReplIncrementalContext::RefreshRequired => {
317                error!("Unable to proceed with consumer incremental - the supplier has indicated that our RUV is outdated, and replication would introduce data corruption.");
318                error!("This server's content must be refreshed to proceed. If you have configured automatic refresh, this will occur shortly.");
319                Ok(ConsumerState::RefreshRequired)
320            }
321            ReplIncrementalContext::UnwillingToSupply => {
322                warn!("Unable to proceed with consumer incremental - the supplier has indicated that our RUV is ahead, and replication would introduce data corruption.");
323                error!("This supplier's content must be refreshed to proceed. If you have configured automatic refresh, this will occur shortly.");
324                Ok(ConsumerState::Ok)
325            }
326            ReplIncrementalContext::V1 {
327                domain_version,
328                domain_patch_level,
329                domain_uuid,
330                ranges,
331                schema_entries,
332                meta_entries,
333                entries,
334            } => self.consumer_apply_changes_v1(
335                domain_version,
336                domain_patch_level,
337                domain_uuid,
338                &ranges,
339                schema_entries,
340                meta_entries,
341                entries,
342            ),
343        }
344    }
345
346    #[instrument(level = "debug", skip_all)]
347    fn consumer_apply_changes_v1(
348        &mut self,
349        ctx_domain_version: DomainVersion,
350        ctx_domain_patch_level: u32,
351        ctx_domain_uuid: Uuid,
352        ctx_ranges: &BTreeMap<Uuid, ReplAnchoredCidRange>,
353        ctx_schema_entries: Vec<ReplIncrementalEntryV1>,
354        ctx_meta_entries: Vec<ReplIncrementalEntryV1>,
355        ctx_entries: Vec<ReplIncrementalEntryV1>,
356    ) -> Result<ConsumerState, OperationError> {
357        if ctx_domain_version < DOMAIN_MIN_LEVEL {
358            error!("Unable to proceed with consumer incremental - incoming domain level is lower than our minimum supported level. {} < {}", ctx_domain_version, DOMAIN_MIN_LEVEL);
359            return Err(OperationError::ReplDomainLevelUnsatisfiable);
360        } else if ctx_domain_version > DOMAIN_MAX_LEVEL {
361            error!("Unable to proceed with consumer incremental - incoming domain level is greater than our maximum supported level. {} > {}", ctx_domain_version, DOMAIN_MAX_LEVEL);
362            return Err(OperationError::ReplDomainLevelUnsatisfiable);
363        };
364
365        let domain_patch_level = if self.get_domain_development_taint() {
366            u32::MAX
367        } else {
368            self.get_domain_patch_level()
369        };
370
371        if ctx_domain_patch_level != domain_patch_level {
372            error!("Unable to proceed with consumer incremental - incoming domain patch level is not equal to our patch level. {} != {}", ctx_domain_patch_level, domain_patch_level);
373            return Err(OperationError::ReplDomainLevelUnsatisfiable);
374        };
375
376        let db_uuid = self.be_txn.get_db_d_uuid()?;
378
379        if db_uuid != ctx_domain_uuid {
380            error!("Unable to proceed with consumer incremental - incoming domain uuid does not match our database uuid. You must investigate this situation. {:?} != {:?}", db_uuid, ctx_domain_uuid);
381            return Err(OperationError::ReplDomainUuidMismatch);
382        }
383
384        let txn_cid = self.get_cid().clone();
386        let ruv = self.be_txn.get_ruv_write();
387
388        let change_count = ctx_schema_entries.len() + ctx_meta_entries.len() + ctx_entries.len();
389
390        ruv.incremental_preflight_validate_ruv(ctx_ranges, &txn_cid)
391            .inspect_err(|err| {
392                error!(
393                    ?err,
394                    "Incoming RUV failed preflight checks, unable to proceed."
395                );
396            })?;
397
398        debug!(
400            "Proceeding to apply incremental with {change_count} changes from domain {ctx_domain_uuid:?} at level {ctx_domain_version}"
401        );
402
403        debug!(?ctx_ranges);
404
405        debug!("Applying {} schema entries", ctx_schema_entries.len());
406        let schema_changed = self
408            .consumer_incremental_apply_entries(ctx_schema_entries)
409            .inspect_err(|err| {
410                error!(?err, "Failed to apply incremental schema entries");
411            })?;
412
413        if schema_changed {
414            self.reload_schema().inspect_err(|err| {
416                error!(?err, "Failed to reload schema");
417            })?;
418        }
419
420        debug!("Applying {} meta entries", ctx_meta_entries.len());
421        let meta_changed = self
423            .consumer_incremental_apply_entries(ctx_meta_entries)
424            .inspect_err(|err| {
425                error!(?err, "Failed to apply incremental meta entries");
426            })?;
427
428        if meta_changed {
430            self.reload_domain_info().inspect_err(|err| {
431                error!(?err, "Failed to reload domain info");
432            })?;
433            self.reload_system_config().inspect_err(|err| {
434                error!(?err, "Failed to reload system configuration");
435            })?;
436        }
437
438        debug!("Applying {} context entries", ctx_entries.len());
439        self.consumer_incremental_apply_entries(ctx_entries)
441            .inspect_err(|err| {
442                error!(?err, "Failed to apply incremental meta entries");
443            })?;
444
445        if meta_changed {
452            self.reload_domain_info_version().inspect_err(|err| {
453                error!(?err, "Failed to reload domain info version");
454            })?;
455        }
456
457        let ruv = self.be_txn.get_ruv_write();
460
461        ruv.refresh_validate_ruv(ctx_ranges).inspect_err(|err| {
462            error!(?err, "RUV ranges were not rebuilt correctly.");
463        })?;
464
465        ruv.refresh_update_ruv(ctx_ranges).inspect_err(|err| {
466            error!(?err, "Unable to update RUV with supplier ranges.");
467        })?;
468
469        Ok(ConsumerState::Ok)
470    }
471
472    pub fn consumer_apply_refresh(
473        &mut self,
474        ctx: ReplRefreshContext,
475    ) -> Result<(), OperationError> {
476        match ctx {
477            ReplRefreshContext::V1 {
478                domain_version,
479                domain_devel,
480                domain_uuid,
481                ranges,
482                schema_entries,
483                meta_entries,
484                entries,
485            } => self.consumer_apply_refresh_v1(
486                domain_version,
487                domain_devel,
488                domain_uuid,
489                &ranges,
490                schema_entries,
491                meta_entries,
492                entries,
493            ),
494        }
495    }
496
497    fn consumer_refresh_create_entries(
498        &mut self,
499        ctx_entries: Vec<ReplEntryV1>,
500    ) -> Result<(), OperationError> {
501        let candidates = ctx_entries
502            .into_iter()
503            .map(EntryRefreshNew::from_repl_entry_v1)
504            .collect::<Result<Vec<EntryRefreshNew>, _>>()
505            .inspect_err(|err| {
506                error!(?err, "Failed to convert entries from supplier");
507            })?;
508
509        Plugins::run_pre_repl_refresh(self, candidates.as_slice()).map_err(|e| {
510            admin_error!(
511                "Refresh operation failed (pre_repl_refresh plugin), {:?}",
512                e
513            );
514            e
515        })?;
516
517        let norm_cand = candidates
519            .into_iter()
520            .map(|e| {
521                e.validate(&self.schema)
522                    .map_err(|e| {
523                        admin_error!("Schema Violation in refresh validate {:?}", e);
524                        OperationError::SchemaViolation(e)
525                    })
526                    .map(|e| {
527                        e.seal(&self.schema)
529                    })
530            })
531            .collect::<Result<Vec<EntrySealedNew>, _>>()?;
532
533        let commit_cand = self.be_txn.refresh(norm_cand).map_err(|e| {
534            admin_error!("betxn create failure {:?}", e);
535            e
536        })?;
537
538        Plugins::run_post_repl_refresh(self, &commit_cand).map_err(|e| {
539            admin_error!(
540                "Refresh operation failed (post_repl_refresh plugin), {:?}",
541                e
542            );
543            e
544        })?;
545
546        self.changed_uuid
547            .extend(commit_cand.iter().map(|e| e.get_uuid()));
548
549        Ok(())
550    }
551
552    #[instrument(level = "info", skip_all)]
553    fn consumer_apply_refresh_v1(
554        &mut self,
555        ctx_domain_version: DomainVersion,
556        ctx_domain_devel: bool,
557        ctx_domain_uuid: Uuid,
558        ctx_ranges: &BTreeMap<Uuid, ReplAnchoredCidRange>,
559        ctx_schema_entries: Vec<ReplEntryV1>,
560        ctx_meta_entries: Vec<ReplEntryV1>,
561        ctx_entries: Vec<ReplEntryV1>,
562    ) -> Result<(), OperationError> {
563        let current_devel_flag = option_env!("KANIDM_PRE_RELEASE").is_some();
566
567        if ctx_domain_version < DOMAIN_MIN_LEVEL {
568            error!("Unable to proceed with consumer refresh - incoming domain level is lower than our minimum supported level. {} < {}", ctx_domain_version, DOMAIN_MIN_LEVEL);
569            return Err(OperationError::ReplDomainLevelUnsatisfiable);
570        } else if ctx_domain_version > DOMAIN_MAX_LEVEL {
571            error!("Unable to proceed with consumer refresh - incoming domain level is greater than our maximum supported level. {} > {}", ctx_domain_version, DOMAIN_MAX_LEVEL);
572            return Err(OperationError::ReplDomainLevelUnsatisfiable);
573        } else if ctx_domain_devel && !current_devel_flag {
574            error!("Unable to proceed with consumer refresh - incoming domain is from a development version while this server is a stable release.");
575            return Err(OperationError::ReplDomainLevelUnsatisfiable);
576        } else if !ctx_domain_devel && current_devel_flag {
577            error!("Unable to proceed with consumer refresh - incoming domain is from a stable version while this server is a development release.");
578            return Err(OperationError::ReplDomainLevelUnsatisfiable);
579        } else {
580            debug!(
581                "Proceeding to refresh from domain at level {}",
582                ctx_domain_version
583            );
584        };
585
586        self.set_phase_bootstrap();
588
589        self.be_txn
591            .set_db_d_uuid(ctx_domain_uuid)
592            .inspect_err(|err| {
593                error!(?err, "Failed to reset domain uuid");
594            })?;
595
596        self.reset_server_uuid()?;
600
601        self.be_txn
603            .danger_delete_all_db_content()
604            .inspect_err(|err| {
605                error!(?err, "Failed to clear existing server database content");
606            })?;
607
608        self.schema.generate_in_memory().inspect_err(|err| {
610            error!(?err, "Failed to reset in memory schema to clean state");
611        })?;
612
613        self.reindex(false).inspect_err(|err| {
616            error!(?err, "Failed to reload schema");
617        })?;
618
619        self.consumer_refresh_create_entries(ctx_schema_entries)
622            .inspect_err(|err| {
623                error!(?err, "Failed to refresh schema entries");
624            })?;
625
626        self.reload_schema().inspect_err(|err| {
628            error!(?err, "Failed to reload schema");
629        })?;
630
631        self.set_phase(ServerPhase::SchemaReady);
633
634        self.reindex(false).inspect_err(|err| {
637            error!(?err, "Failed to reload schema");
638        })?;
639
640        self.consumer_refresh_create_entries(ctx_meta_entries)
642            .inspect_err(|err| {
643                error!(?err, "Failed to refresh meta entries");
644            })?;
645
646        self.reload_domain_info().inspect_err(|err| {
650            error!(?err, "Failed to reload domain info");
651        })?;
652
653        self.changed_flags.insert(
655            ChangeFlag::SCHEMA
656                | ChangeFlag::ACP
657                | ChangeFlag::OAUTH2
658                | ChangeFlag::DOMAIN
659                | ChangeFlag::APPLICATION
660                | ChangeFlag::SYSTEM_CONFIG
661                | ChangeFlag::SYNC_AGREEMENT
662                | ChangeFlag::KEY_MATERIAL,
663        );
664
665        self.set_phase(ServerPhase::DomainInfoReady);
667
668        self.consumer_refresh_create_entries(ctx_entries)
672            .inspect_err(|err| {
673                error!(?err, "Failed to refresh schema entries");
674            })?;
675
676        let ruv = self.be_txn.get_ruv_write();
679
680        ruv.refresh_validate_ruv(ctx_ranges).inspect_err(|err| {
681            error!(?err, "RUV ranges were not rebuilt correctly.");
682        })?;
683
684        ruv.refresh_update_ruv(ctx_ranges).inspect_err(|err| {
685            error!(?err, "Unable to update RUV with supplier ranges.");
686        })?;
687
688        self.set_phase(ServerPhase::Running);
690
691        Ok(())
692    }
693}