kanidmd_lib/repl/
ruv.rs

use std::cmp::Ordering;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::ops::Bound::*;
use std::sync::Arc;
use std::time::Duration;

use concread::bptree::{BptreeMap, BptreeMapReadSnapshot, BptreeMapReadTxn, BptreeMapWriteTxn};
use idlset::v2::IDLBitRange;

use crate::be::dbrepl::DbReplMeta;
use crate::prelude::*;
use crate::repl::cid::Cid;
use crate::repl::proto::{ReplAnchoredCidRange, ReplCidRange};

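/// A sketch of the dual indexing used here (all values assumed for
/// illustration): a single change `Cid { ts: 10s, s_uuid: U }` that touches
/// entries {3, 7} is recorded in both trees:
///
/// ```text
/// data:   Cid(10s, U) -> IDLBitRange{3, 7}
/// ranged: U -> BTreeSet{10s}
/// ```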
#[derive(Default)]
pub struct ReplicationUpdateVector {
    // This sorts by time. We store the set of entry ids that are affected in an operation.
    // Due to how replication state works, it is possible that ids in these sets may no
    // longer exist, so these bit ranges likely need intersection with allids before use.
    data: BptreeMap<Cid, IDLBitRange>,
    // This sorts by server ID. It's used by the RUV to build ranges for, you guessed it,
    // range queries. These are used to build the set of differences that need to be sent in
    // a replication operation.
    //
    // We need a way to invert the cid, but without duplication. Maybe an inverted cid type?
    // That way it still orders correctly by timestamp, it just searches by server id first.
    ranged: BptreeMap<Uuid, BTreeSet<Duration>>,
}

/// The status of replication after investigating the RUV states.
#[derive(Debug, PartialEq, Eq)]
pub(crate) enum RangeDiffStatus {
    /// Ok - can proceed with replication, supplying the following
    /// ranges of changes to the consumer.
    Ok(BTreeMap<Uuid, ReplCidRange>),
    /// Refresh - The consumer is lagging and is missing a set of changes
    /// that are required to proceed. The consumer *MUST* be refreshed
    /// immediately.
    Refresh {
        lag_range: BTreeMap<Uuid, ReplCidRange>,
    },
    /// Unwilling - The consumer is advanced beyond our state, and supplying
    /// changes to them may introduce inconsistency in replication. This
    /// server should be investigated immediately.
    Unwilling {
        adv_range: BTreeMap<Uuid, ReplCidRange>,
    },
    /// Critical - The consumer is lagging and missing changes, but is also
    /// in possession of changes advancing it beyond our current state. This
    /// is a critical fault in replication and the topology must be
    /// investigated immediately.
    Critical {
        lag_range: BTreeMap<Uuid, ReplCidRange>,
        adv_range: BTreeMap<Uuid, ReplCidRange>,
    },
    /// No RUV Overlap - The consumer has likely desynchronised, and its RUV
    /// no longer has common overlap with ours. This can indicate it has trimmed
    /// content we may have, or may have become part of a split brain situation.
    /// For replication to proceed, there must be *at least* one common overlapping
    /// point in the RUV.
    NoRUVOverlap,
}

impl ReplicationUpdateVector {
    pub fn write(&self) -> ReplicationUpdateVectorWriteTransaction<'_> {
        ReplicationUpdateVectorWriteTransaction {
            // Need to take the write first, then the read to guarantee ordering.
            added: Some(BTreeSet::default()),
            data: self.data.write(),
            data_pre: self.data.read(),
            ranged: self.ranged.write(),
        }
    }

    pub fn read(&self) -> ReplicationUpdateVectorReadTransaction<'_> {
        ReplicationUpdateVectorReadTransaction {
            data: self.data.read(),
            ranged: self.ranged.read(),
        }
    }

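    /// A minimal usage sketch of the diff logic (illustrative; the values
    /// mirror the unit tests below rather than a real topology). If the
    /// consumer has seen a server's changes up to t=3 but the supplier holds
    /// changes up to t=4, the supplier must send the (3s, 4s] window:
    ///
    /// ```ignore
    /// let consumer = btreemap!((s_uuid, ReplCidRange {
    ///     ts_min: Duration::from_secs(1),
    ///     ts_max: Duration::from_secs(3),
    /// }));
    /// let supplier = btreemap!((s_uuid, ReplCidRange {
    ///     ts_min: Duration::from_secs(1),
    ///     ts_max: Duration::from_secs(4),
    /// }));
    /// let status = ReplicationUpdateVector::range_diff(&consumer, &supplier);
    /// // status == RangeDiffStatus::Ok({ s_uuid: ReplCidRange { 3s, 4s } })
    /// ```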
    pub(crate) fn range_diff(
        consumer_range: &BTreeMap<Uuid, ReplCidRange>,
        supplier_range: &BTreeMap<Uuid, ReplCidRange>,
    ) -> RangeDiffStatus {
        // We need to build a new set of ranges that express the difference between
        // these two states.
        let mut diff_range = BTreeMap::default();
        let mut lag_range = BTreeMap::default();
        let mut adv_range = BTreeMap::default();

        let mut consumer_lagging = false;
        let mut supplier_lagging = false;
        let mut valid_content_overlap = false;

        // We need to look at each uuid in the *supplier* and assert if they are present
        // on the *consumer*.
        //
        // If an s_uuid has the same max on both sides, we don't add it to the
        // diff.

        for (supplier_s_uuid, supplier_cid_range) in supplier_range.iter() {
            match consumer_range.get(supplier_s_uuid) {
                Some(consumer_cid_range) => {
                    // We have the same server uuid in our RUVs, so some content overlap
                    // must exist (or has existed).
                    valid_content_overlap = true;

                    // The two windows just have to overlap. If they do, meaning
                    // that consumer max > supplier min, and supplier
                    // max > consumer max, then the range between consumer max
                    // and supplier max must be supplied.
                    //
                    //   [ consumer min ... consumer max ]
                    //      <-- [ supplier min .. supplier max ] -->
                    //
                    // In other words if we have:
                    //
                    //   [ consumer min ... consumer max ]
                    //                                      [ supplier min ... supplier max ]
                    //                                     ^
                    //                                     \-- no overlap of the range windows!
                    //
                    // then there has been too much lag between the consumer and
                    // the supplier, and there is a risk of changes being dropped or
                    // missing. In the future we could alter this to force the resend
                    // of zero -> supplier max, but more thought is needed to
                    // ensure no corruption in this case.
                    if consumer_cid_range.ts_max < supplier_cid_range.ts_min {
                        //
                        //   [ consumer min ... consumer max ]
                        //                                      [ supplier min ... supplier max ]
                        //                                     ^
                        //                                     \-- no overlap of the range windows!
                        //
                        consumer_lagging = true;
                        lag_range.insert(
                            *supplier_s_uuid,
                            ReplCidRange {
                                ts_min: supplier_cid_range.ts_min,
                                ts_max: consumer_cid_range.ts_max,
                            },
                        );
                    } else if supplier_cid_range.ts_max < consumer_cid_range.ts_min {
                        //
                        //                                      [ consumer min ... consumer max ]
                        //   [ supplier min ... supplier max ]
                        //                                     ^
                        //                                     \-- no overlap of the range windows!
                        //
                        // This means we can't supply, because we are missing changes that the
                        // consumer has. *We* are lagging.
                        supplier_lagging = true;
                        adv_range.insert(
                            *supplier_s_uuid,
                            ReplCidRange {
                                ts_min: supplier_cid_range.ts_max,
                                ts_max: consumer_cid_range.ts_min,
                            },
                        );
                    } else if consumer_cid_range.ts_max < supplier_cid_range.ts_max {
                        //
                        //                                         /-- consumer needs these changes
                        //                                         v
                        //   [ consumer min ... consumer max ] -->                   ]
                        //                           [ supplier min ... supplier max ]
                        //                              ^
                        //                              \-- overlap of the range windows
                        //
                        // We require the changes from consumer max -> supplier max.
                        diff_range.insert(
                            *supplier_s_uuid,
                            ReplCidRange {
                                ts_min: consumer_cid_range.ts_max,
                                ts_max: supplier_cid_range.ts_max,
                            },
                        );
                    }
                    //
                    //                                       /-- The consumer has changes we don't have.
                    //                                       |   So we don't need to supply
                    //                                       v
                    //                             [ consumer min ... consumer max ]
                    //   [ supplier min ... supplier max ]
                    //                              ^
                    //                              \-- overlap of the range windows
                    //
                    //  OR
                    //
                    //   [ consumer min ... consumer max ]
                    //   [ supplier min ... supplier max ]
                    //                              ^
                    //                              \-- the windows max is identical
                    //                                  no actions needed
                    //
                    // In this case there is no action required since consumer_cid_range.ts_max
                    // must be greater than or equal to supplier max.
                }
                None => {
                    // The consumer does not have any content from this
                    // server. Select from Zero -> max of the supplier.
                    diff_range.insert(
                        *supplier_s_uuid,
                        ReplCidRange {
                            ts_min: Duration::ZERO,
                            ts_max: supplier_cid_range.ts_max,
                        },
                    );
                }
            }
        }

        if !valid_content_overlap {
            return RangeDiffStatus::NoRUVOverlap;
        }

        match (consumer_lagging, supplier_lagging) {
            (false, false) => RangeDiffStatus::Ok(diff_range),
            (true, false) => RangeDiffStatus::Refresh { lag_range },
            (false, true) => RangeDiffStatus::Unwilling { adv_range },
            (true, true) => RangeDiffStatus::Critical {
                lag_range,
                adv_range,
            },
        }
    }
}

pub struct ReplicationUpdateVectorWriteTransaction<'a> {
    added: Option<BTreeSet<Cid>>,
    data: BptreeMapWriteTxn<'a, Cid, IDLBitRange>,
    data_pre: BptreeMapReadTxn<'a, Cid, IDLBitRange>,
    ranged: BptreeMapWriteTxn<'a, Uuid, BTreeSet<Duration>>,
}

impl fmt::Debug for ReplicationUpdateVectorWriteTransaction<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        writeln!(f, "RUV DATA DUMP")?;
        self.data
            .iter()
            .try_for_each(|(cid, idl)| writeln!(f, "* [{cid} {idl:?}]"))?;
        writeln!(f, "RUV RANGE DUMP")?;
        self.ranged
            .iter()
            .flat_map(|(s_uuid, ts_set)| ts_set.iter().map(|ts| Cid::new(*s_uuid, *ts)))
            .try_for_each(|cid| writeln!(f, "[{cid}]"))
    }
}

pub struct ReplicationUpdateVectorReadTransaction<'a> {
    data: BptreeMapReadTxn<'a, Cid, IDLBitRange>,
    ranged: BptreeMapReadTxn<'a, Uuid, BTreeSet<Duration>>,
}

impl fmt::Debug for ReplicationUpdateVectorReadTransaction<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        writeln!(f, "RUV DATA DUMP")?;
        self.data
            .iter()
            .try_for_each(|(cid, idl)| writeln!(f, "* [{cid} {idl:?}]"))?;
        writeln!(f, "RUV RANGE DUMP")?;
        self.ranged
            .iter()
            .try_for_each(|(s_uuid, ts)| writeln!(f, "* [{s_uuid} {ts:?}]"))
    }
}

pub trait ReplicationUpdateVectorTransaction {
    fn ruv_snapshot(&self) -> BptreeMapReadSnapshot<'_, Cid, IDLBitRange>;

    fn range_snapshot(&self) -> BptreeMapReadSnapshot<'_, Uuid, BTreeSet<Duration>>;

    fn to_db_backup_ruv(&self) -> DbReplMeta {
        DbReplMeta::V1 {
            ruv: self.ruv_snapshot().keys().map(|cid| cid.into()).collect(),
        }
    }

    /// Return a filtered view of our RUV ranges. This acts similarly to "trim": any s_uuid
    /// whose max cid is less than trim_cid is excluded from the view.
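    ///
    /// A minimal usage sketch (illustrative; the construction of `trim_cid`
    /// is assumed, not shown):
    ///
    /// ```ignore
    /// let ruv_read = ruv.read();
    /// let ranges = ruv_read.filter_ruv_range(&trim_cid)?;
    /// // `ranges` maps each retained s_uuid to its (ts_min, ts_max) window.
    /// ```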
    fn filter_ruv_range(
        &self,
        trim_cid: &Cid,
    ) -> Result<BTreeMap<Uuid, ReplCidRange>, OperationError> {
        self.range_snapshot()
            .iter()
            .filter_map(|(s_uuid, range)| match (range.first(), range.last()) {
                (Some(first), Some(last)) => {
                    if last < &trim_cid.ts {
                        None
                    } else {
                        Some(Ok((
                            *s_uuid,
                            ReplCidRange {
                                ts_min: *first,
                                ts_max: *last,
                            },
                        )))
                    }
                }
                _ => {
                    error!(
                        "invalid state for server uuid {:?}, no ranges present",
                        s_uuid
                    );
                    Some(Err(OperationError::InvalidState))
                }
            })
            .collect::<Result<BTreeMap<_, _>, _>>()
    }

    /// Return the complete set of RUV ranges present on this replica.
    fn current_ruv_range(&self) -> Result<BTreeMap<Uuid, ReplCidRange>, OperationError> {
        self.range_snapshot()
            .iter()
            .map(|(s_uuid, range)| match (range.first(), range.last()) {
                (Some(first), Some(last)) => Ok((
                    *s_uuid,
                    ReplCidRange {
                        ts_min: *first,
                        ts_max: *last,
                    },
                )),
                _ => {
                    error!(
                        "invalid state for server uuid {:?}, no ranges present",
                        s_uuid
                    );
                    Err(OperationError::InvalidState)
                }
            })
            .collect::<Result<BTreeMap<_, _>, _>>()
    }

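    /// A sketch of the window semantics below (times assumed for
    /// illustration): for a ctx range with ts_min = 3s, only CIDs strictly
    /// after 3s are collected, because the consumer already holds the change
    /// at ts_min itself.
    ///
    /// ```ignore
    /// // Assumed single-server context range, e.g. as produced by range_diff.
    /// let ctx = btreemap!((s_uuid, ReplCidRange {
    ///     ts_min: Duration::from_secs(3),
    ///     ts_max: Duration::from_secs(5),
    /// }));
    /// let idl = ruv_read.range_to_idl(&ctx);
    /// // `idl` now holds the ids of entries changed by s_uuid after 3s.
    /// ```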
    fn range_to_idl(&self, ctx_ranges: &BTreeMap<Uuid, ReplCidRange>) -> IDLBitRange {
        let mut idl = IDLBitRange::new();
        // Force the set to be compressed, which saves on seeks during inserts.
        idl.compress();
        let range = self.range_snapshot();
        let ruv = self.ruv_snapshot();

        // The range we have is a collection of s_uuids, each with a low -> high range.
        // We need to convert this to the absolute set of idlbitranges that
        // relate to the entries we have.

        for (s_uuid, ctx_range) in ctx_ranges {
            // For each server and range low to high, iterate over
            // the list of CIDs in the main RUV.

            let Some(ruv_range) = range.get(s_uuid) else {
                // This is valid: we may have cleaned up a server range on
                // this node while the other server isn't aware of it yet, so
                // we just no-op this. The changes we have will still be
                // correctly found and sent.
                debug!(?s_uuid, "range not found in ruv.");
                continue;
            };

            // Get from the min to the max. Unbounded and
            // Included(ctx_range.ts_max) are the same in
            // this context.
            for ts in ruv_range.range((Excluded(ctx_range.ts_min), Unbounded)) {
                let cid = Cid {
                    ts: *ts,
                    s_uuid: *s_uuid,
                };

                if let Some(ruv_idl) = ruv.get(&cid) {
                    ruv_idl.into_iter().for_each(|id| idl.insert_id(id))
                }
                // If the cid isn't found, it may have been trimmed, but that's okay. A cid in
                // a range can be trimmed if all entries of that cid have since been tombstoned,
                // and so no longer need to be applied in change ranges.
            }
        }

        idl
    }

    fn verify(
        &self,
        entries: &[Arc<EntrySealedCommitted>],
        results: &mut Vec<Result<(), ConsistencyError>>,
    ) {
        // Okay, rebuild the RUV in parallel for comparison.
        let mut check_ruv: BTreeMap<Cid, IDLBitRange> = BTreeMap::default();
        for entry in entries {
            // The DB id we need.
            let eid = entry.get_id();
            let ecstate = entry.get_changestate();
            // We don't need the details of the change - only the cid of the
            // change that this entry was involved in.
            for cid in ecstate.cid_iter() {
                // Add to the main ruv data.
                if let Some(idl) = check_ruv.get_mut(cid) {
                    // We can't guarantee id order, so we have to do this properly.
                    idl.insert_id(eid);
                } else {
                    let mut idl = IDLBitRange::new();
                    idl.insert_id(eid);
                    check_ruv.insert(cid.clone(), idl);
                }
            }
        }

        trace!(?check_ruv);
        // Get the current state.
        let snapshot_ruv = self.ruv_snapshot();

        // Now compare. We do this by checking each CID in both sets, and then
        // asserting the content is the same.

        let mut check_iter = check_ruv.iter();
        let mut snap_iter = snapshot_ruv.iter();

        let mut check_next = check_iter.next();
        let mut snap_next = snap_iter.next();

        while let (Some((ck, cv)), Some((sk, sv))) = (&check_next, &snap_next) {
            match ck.cmp(sk) {
                Ordering::Equal => {
                    // Counter-intuitive, but here we check that the check set is a *subset*
                    // of the ruv snapshot. This is because when an entry is
                    // tombstoned, all its CID interactions are "lost" and its cid becomes
                    // that of when it was tombstoned. So the "rebuilt" ruv will miss that
                    // entry.
                    //
                    // In the future the RUV concept may be ditched entirely anyway; thoughts needed.
                    let intersect = *cv & *sv;
                    if *cv == &intersect {
                        trace!("{:?} is consistent!", ck);
                    } else {
                        error!("{:?} is NOT consistent! IDL's differ", ck);
                        debug_assert!(false);
                        results.push(Err(ConsistencyError::RuvInconsistent(ck.to_string())));
                    }
                    check_next = check_iter.next();
                    snap_next = snap_iter.next();
                }
                // Because we are zipping between these two sets, we only need to compare when
                // the CID's are equal. Otherwise we need the other iter to "catch up"
                Ordering::Less => {
                    check_next = check_iter.next();
                }
                Ordering::Greater => {
                    snap_next = snap_iter.next();
                }
            }
        }

        while let Some((ck, _cv)) = &check_next {
            debug!("{:?} may not be consistent! CID missing from RUV", ck);
            // debug_assert!(false);
            // results.push(Err(ConsistencyError::RuvInconsistent(ck.to_string())));
            check_next = check_iter.next();
        }

        while let Some((sk, _sv)) = &snap_next {
            debug!(
                "{:?} may not be consistent! CID should not exist in RUV",
                sk
            );
            // debug_assert!(false);
            // results.push(Err(ConsistencyError::RuvInconsistent(sk.to_string())));
            snap_next = snap_iter.next();
        }

        // Assert that the content of the ranged set matches the data set and has the
        // correct set of values.
        let snapshot_range = self.range_snapshot();

        for cid in snapshot_ruv.keys() {
            if let Some(server_range) = snapshot_range.get(&cid.s_uuid) {
                if !server_range.contains(&cid.ts) {
                    warn!(
                        "{:?} is NOT consistent! server range is missing cid in index",
                        cid
                    );
                    debug_assert!(false);
                    results.push(Err(ConsistencyError::RuvInconsistent(
                        cid.s_uuid.to_string(),
                    )));
                }
            } else {
                warn!(
                    "{:?} is NOT consistent! server range is not present",
                    cid.s_uuid
                );
                debug_assert!(false);
                results.push(Err(ConsistencyError::RuvInconsistent(
                    cid.s_uuid.to_string(),
                )));
            }
        }

        // Done!
    }

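    /// A sketch of the anchor expansion below (all values assumed for
    /// illustration): for a requested range of ts_min = 1s, ts_max = 5s,
    /// where this server's RUV holds timestamps {1s, 2s, 4s, 5s}, the
    /// interior points become the anchors:
    ///
    /// ```ignore
    /// ReplAnchoredCidRange {
    ///     ts_min: Duration::from_secs(1),
    ///     anchors: vec![Duration::from_secs(2), Duration::from_secs(4)],
    ///     ts_max: Duration::from_secs(5),
    /// }
    /// ```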
    fn get_anchored_ranges(
        &self,
        ranges: BTreeMap<Uuid, ReplCidRange>,
    ) -> Result<BTreeMap<Uuid, ReplAnchoredCidRange>, OperationError> {
        let self_range_snapshot = self.range_snapshot();

        ranges
            .into_iter()
            .map(|(s_uuid, ReplCidRange { ts_min, ts_max })| {
                let ts_range = self_range_snapshot.get(&s_uuid).ok_or_else(|| {
                    error!(
                        ?s_uuid,
                        "expected cid range for server in ruv, was not present"
                    );
                    OperationError::InvalidState
                })?;

                // If the bounds are equal and both excluded, BTreeSet::range panics.
                let anchors = if ts_max > ts_min {
                    // We exclude the ends because these are already in ts_min/ts_max.
                    ts_range
                        .range((Excluded(ts_min), Excluded(ts_max)))
                        .copied()
                        .collect::<Vec<_>>()
                } else {
                    Vec::with_capacity(0)
                };

                Ok((
                    s_uuid,
                    ReplAnchoredCidRange {
                        ts_min,
                        anchors,
                        ts_max,
                    },
                ))
            })
            .collect()
    }
}

impl ReplicationUpdateVectorTransaction for ReplicationUpdateVectorWriteTransaction<'_> {
    fn ruv_snapshot(&self) -> BptreeMapReadSnapshot<'_, Cid, IDLBitRange> {
        self.data.to_snapshot()
    }

    fn range_snapshot(&self) -> BptreeMapReadSnapshot<'_, Uuid, BTreeSet<Duration>> {
        self.ranged.to_snapshot()
    }
}

impl ReplicationUpdateVectorTransaction for ReplicationUpdateVectorReadTransaction<'_> {
    fn ruv_snapshot(&self) -> BptreeMapReadSnapshot<'_, Cid, IDLBitRange> {
        self.data.to_snapshot()
    }

    fn range_snapshot(&self) -> BptreeMapReadSnapshot<'_, Uuid, BTreeSet<Duration>> {
        self.ranged.to_snapshot()
    }
}

impl ReplicationUpdateVectorWriteTransaction<'_> {
    pub fn clear(&mut self) {
        self.added = None;
        self.data.clear();
        self.ranged.clear();
    }

    pub(crate) fn incremental_preflight_validate_ruv(
        &self,
        ctx_ranges: &BTreeMap<Uuid, ReplAnchoredCidRange>,
        txn_cid: &Cid,
    ) -> Result<(), OperationError> {
        // Check that the incoming ranges, for our server's id, do not exceed
        // our server's max state. This can occur if we restore from a backup
        // where the replication state is older than what our partners have,
        // meaning that the context may have diverged in a way we can't then
        // resolve.

        if let Some(our_cid_range_max) = self
            .ranged
            .get(&txn_cid.s_uuid)
            .and_then(|range| range.last().copied())
        {
            if let Some(incoming_cid_range) = ctx_ranges.get(&txn_cid.s_uuid) {
                if incoming_cid_range.ts_max > our_cid_range_max {
                    error!("The incoming data contains changes matching this server's UUID, and those changes are newer than the local version. This can occur if the server was restored from a backup which was taken before sending out changes. Replication is unable to proceed as data corruption may occur. You must refresh this consumer immediately to continue.");
                    return Err(OperationError::ReplServerUuidSplitDataState);
                }
            }
        }

        let warn_time = txn_cid.ts + REPL_SUPPLIER_ADVANCE_WINDOW;
        for (s_uuid, incoming_cid_range) in ctx_ranges.iter() {
            if incoming_cid_range.ts_max > warn_time {
                // TODO: This would be a great place for fault management to pick up this warning
                warn!(
                    "Incoming changes from {:?} are further than {} seconds in the future.",
                    s_uuid,
                    REPL_SUPPLIER_ADVANCE_WINDOW.as_secs()
                );
            }
        }

        Ok(())
    }

    pub(crate) fn refresh_validate_ruv(
        &self,
        ctx_ranges: &BTreeMap<Uuid, ReplAnchoredCidRange>,
    ) -> Result<(), OperationError> {
        // Assert that the ruv that currently exists is a valid data set of
        // the supplied consumer range - especially check that when a uuid exists in
        // our ruv, its maximum matches the ctx ruv.
        //
        // Since the ctx range comes from the supplier, when we rebuild due to the
        // state machine then some values may not exist, since they were replaced
        // or updated. It's also possible that the imported range maximums *may not*
        // exist, especially in three-way replication scenarios where S1:A was the S1
        // maximum but is replaced by S2:B. This would leave S1:A as the valid
        // maximum even though no entry reflects that in its change state.
        let mut valid = true;

        for (ctx_server_uuid, ctx_server_range) in ctx_ranges.iter() {
            match self.ranged.get(ctx_server_uuid) {
                Some(server_range) => {
                    let ctx_ts = &ctx_server_range.ts_max;
                    match server_range.last() {
                        Some(s_ts) if s_ts <= ctx_ts => {
                            // Ok - our entries reflect the maximum or earlier.
                            trace!(?ctx_server_uuid, ?ctx_ts, ?s_ts, "valid");
                        }
                        Some(s_ts) => {
                            valid = false;
                            warn!(?ctx_server_uuid, ?ctx_ts, ?s_ts, "inconsistent s_uuid in ruv, consumer ruv is advanced past supplier");
                        }
                        None => {
                            valid = false;
                            warn!(
                                ?ctx_server_uuid,
                                ?ctx_ts,
                                "inconsistent server range in ruv, no maximum ts found for s_uuid"
                            );
                        }
                    }
                }
                None => {
                    // valid = false;
                    trace!(
                        ?ctx_server_uuid,
                        "s_uuid absent from ranged ruv, possible that changes have been expired"
                    );
                }
            }
        }

        if valid {
            Ok(())
        } else {
            Err(OperationError::ReplInvalidRUVState)
        }
    }

    #[instrument(level = "trace", name = "ruv::refresh_update_ruv", skip_all)]
    pub(crate) fn refresh_update_ruv(
        &mut self,
        ctx_ranges: &BTreeMap<Uuid, ReplAnchoredCidRange>,
    ) -> Result<(), OperationError> {
        // Previously this would just add in the ranges, and the actual entries
        // from the changestate would then populate the data/ranges. Now we add
        // empty idls for each of these so that they are persisted to the db,
        // allowing ruv reload.
        for (ctx_s_uuid, ctx_range) in ctx_ranges.iter() {
            let cid_iter = std::iter::once(&ctx_range.ts_min)
                .chain(ctx_range.anchors.iter())
                .chain(std::iter::once(&ctx_range.ts_max))
                .map(|ts| Cid::new(*ctx_s_uuid, *ts));

            for cid in cid_iter {
                self.insert_change(&cid, IDLBitRange::default())?;
            }
        }

        Ok(())
    }

    /// Restore the ruv from a DB backup. It's important to note here that
    /// we don't actually need to restore any of the IDLs in the process; we only
    /// need the CIDs of the changes/points in time. This is because when the
    /// db entries are restored, their changesets will re-populate the data that we
    /// need in the RUV at these points. The reason we need these ranges without IDLs
    /// is so that trim and replication work properly.
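    ///
    /// A minimal usage sketch (illustrative; `db_cids` is an assumed iterator
    /// of CIDs recovered from the backup metadata):
    ///
    /// ```ignore
    /// ruv_write.restore(db_cids)?;
    /// ```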
    #[instrument(level = "debug", name = "ruv::restore", skip_all)]
    pub(crate) fn restore<I>(&mut self, iter: I) -> Result<(), OperationError>
    where
        I: IntoIterator<Item = Cid>,
    {
        let mut rebuild_ruv: BTreeMap<Cid, IDLBitRange> = BTreeMap::new();
        let mut rebuild_range: BTreeMap<Uuid, BTreeSet<Duration>> = BTreeMap::default();

        for cid in iter {
            if !rebuild_ruv.contains_key(&cid) {
                let idl = IDLBitRange::new();
                rebuild_ruv.insert(cid.clone(), idl);
            }

            if let Some(server_range) = rebuild_range.get_mut(&cid.s_uuid) {
                server_range.insert(cid.ts);
            } else {
                let mut ts_range = BTreeSet::default();
                ts_range.insert(cid.ts);
                rebuild_range.insert(cid.s_uuid, ts_range);
            }
        }

        self.data.extend(rebuild_ruv);
        self.ranged.extend(rebuild_range);

        Ok(())
    }

    #[instrument(level = "debug", name = "ruv::rebuild", skip_all)]
    pub fn rebuild(&mut self, entries: &[Arc<EntrySealedCommitted>]) -> Result<(), OperationError> {
        // Entries and their internal changestates are the "source of truth" for all changes
        // that have ever occurred and are stored on this server. So we use them to rebuild our RUV
        // here!
        //
        // We only update RUV items where an anchor exists.

        // let mut rebuild_ruv: BTreeMap<Cid, IDLBitRange> = BTreeMap::new();
        // let mut rebuild_range: BTreeMap<Uuid, BTreeSet<Duration>> = BTreeMap::default();

        for entry in entries {
            // The DB id we need.
            let eid = entry.get_id();
            let ecstate = entry.get_changestate();
            // We don't need the details of the change - only the cid of the
            // change that this entry was involved in.
            for cid in ecstate.cid_iter() {
                // if let Some(idl) = rebuild_ruv.get_mut(cid) {
                if let Some(idl) = self.data.get_mut(cid) {
                    // We can't guarantee id order, so we have to do this properly.
                    idl.insert_id(eid);
                    /*
                    } else {
                        let mut idl = IDLBitRange::new();
                        idl.insert_id(eid);
                        rebuild_ruv.insert(cid.clone(), idl);
                    */
                }

                /*
                if let Some(server_range) = rebuild_range.get_mut(&cid.s_uuid) {
                    server_range.insert(cid.ts);
                } else {
                    let mut ts_range = BTreeSet::default();
                    ts_range.insert(cid.ts);
                    rebuild_range.insert(cid.s_uuid, ts_range);
                }
                */
            }
        }

        // Finally, we need to do a cleanup/compact of the IDL's if possible.
        self.data.range_mut(..).for_each(|(_k, idl)| {
            idl.maybe_compress();
        });

        // self.data.extend(rebuild_ruv);

        // Warning - you can't extend here because this is keyed by UUID. You need
        // to go through each key and then merge the sets.

        /*
        rebuild_range.into_iter().for_each(|(s_uuid, ts_set)| {
            if let Some(ex_ts_set) = self.ranged.get_mut(&s_uuid) {
                ex_ts_set.extend(ts_set)
            } else {
                self.ranged.insert(s_uuid, ts_set);
            }
        });
        */

        Ok(())
    }

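    /// A sketch of the merge behaviour below (illustrative): inserting the
    /// same cid twice unions the IDL sets rather than replacing them.
    ///
    /// ```ignore
    /// // `idl_with_id_1` and `idl_with_id_2` are assumed example sets.
    /// ruv_write.insert_change(&cid, idl_with_id_1)?;
    /// ruv_write.insert_change(&cid, idl_with_id_2)?;
    /// // The stored IDL for `cid` now contains both entry ids.
    /// ```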
    pub fn insert_change(&mut self, cid: &Cid, idl: IDLBitRange) -> Result<(), OperationError> {
        // Remember, in a transaction the changes can be updated multiple times.
        if let Some(ex_idl) = self.data.get_mut(cid) {
            // This ensures both sets have all the available ids.
            let idl = ex_idl as &_ | &idl;
            *ex_idl = idl;
        } else {
            self.data.insert(cid.clone(), idl);
        }

        if let Some(server_range) = self.ranged.get_mut(&cid.s_uuid) {
            server_range.insert(cid.ts);
        } else {
            let mut range = BTreeSet::default();
            range.insert(cid.ts);
            self.ranged.insert(cid.s_uuid, range);
        }

        if let Some(added) = &mut self.added {
            added.insert(cid.clone());
        }

        Ok(())
    }

    pub fn update_entry_changestate(
        &mut self,
        entry: &EntrySealedCommitted,
    ) -> Result<(), OperationError> {
        let eid = entry.get_id();
        let ecstate = entry.get_changestate();

        trace!("Updating ruv state from entry {}", eid);
        trace!(?ecstate);

        for cid in ecstate.cid_iter() {
            if let Some(idl) = self.data.get_mut(cid) {
                // We can't guarantee id order, so we have to do this properly.
                idl.insert_id(eid);
            } else {
                let mut idl = IDLBitRange::new();
                idl.insert_id(eid);
                self.data.insert(cid.clone(), idl);
            }

            if let Some(server_range) = self.ranged.get_mut(&cid.s_uuid) {
                server_range.insert(cid.ts);
            } else {
                let mut ts_range = BTreeSet::default();
                ts_range.insert(cid.ts);
                self.ranged.insert(cid.s_uuid, ts_range);
            }
        }

        Ok(())
    }

    pub fn ruv_idls(&self) -> IDLBitRange {
        let mut idl = IDLBitRange::new();
        self.data.iter().for_each(|(_cid, ex_idl)| {
            idl = ex_idl as &_ | &idl;
        });
        idl
    }

    /*
        How to handle changelog trimming? If we trim a server out from the RUV as a whole, we
        need to be sure we don't oversupply changes the consumer already has. How can we do
        this cleanly? Or do we just deal with it because our own local trim will occur soon after?

        The situation would be

        A:   1    ->    3
        B:   1    ->    3

        Assuming A trims first:

        A:
        B:   1    ->    3

        Then on A <- B, B would try to supply 1->3 to A assuming it is not present. However,
        the trim would occur soon after on B causing:

        A:
        B:

        And then the supply would stop. So either A needs to retain the max/min in its range
        to allow the comparison here to continue even if its ruv is cleaned. Or, we need to
        have a delayed trim on the range that is 2x the normal trim range to give a buffer?

        Mostly, longer ruv/cid ranges aren't an issue for us, so could we just make these ranges
        really large?

        NOTE: For now we do NOT trim out max CID's of any s_uuid so that we don't have to confront
        this edge case yet.

        // == RESOLVED: Previously this was a problem as the CID ranges of any node may not be a
        // complete view of all CID's that existed on any other node. Now with anchors in replication
        // this changes: we have not only a complete view of all CID's that were created, but
        // tombstone purges always create an empty anchor so the RUV always advances. This way we
        // have a stronger assurance about which servers are live and which are not.
    */

    // Problem Cases

    /*
       What about generations? There is a "live" generation which can be replicated and a
       former generation of ranges that previously existed. To replicate:
           // The consumer must have content within the current live range.
           consumer.live_max < supplier.live_max
           consumer.live_max >= supplier.live_min
           // The consumer must have all content that was formerly known.
           consumer.live_min >= supplier.former_max
           // I don't think we care what

        // == RESOLVED: Anchors give us the generations that existed previously without us
        // needing to worry about this.
    */

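    /// A minimal usage sketch of trim (illustrative; `anchor_cid` is an
    /// assumed CID at the trim horizon): all CIDs strictly older than it are
    /// removed from both indexes, and the IDL of every entry id they touched
    /// is returned.
    ///
    /// ```ignore
    /// let affected_idl = ruv_write.trim_up_to(&anchor_cid)?;
    /// ```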
    pub fn trim_up_to(&mut self, cid: &Cid) -> Result<IDLBitRange, OperationError> {
        trace!(trim_up_to_cid = ?cid);
        let mut idl = IDLBitRange::new();
        let mut remove_suuid = Vec::with_capacity(0);

        // Here we trim the ranged set as we go. Since it is not ordered
        // by time, we need to do fragmented searches over it no matter
        // what we try to do.

        for (cid, ex_idl) in self.data.range((Unbounded, Excluded(cid))) {
            trace!(?cid, "examining for RUV removal");
            idl = ex_idl as &_ | &idl;

            // Remove the reverse version of the cid from the ranged index.
            match self.ranged.get_mut(&cid.s_uuid) {
                Some(server_range) => {
                    // `remove` returns true if the element WAS present.
                    if !server_range.remove(&cid.ts) {
                        error!("Impossible State - The RUV is corrupted due to missing sid:ts pair in ranged index");
                        error!(ruv = ?self);
                        error!(?cid);
                        return Err(OperationError::InvalidState);
                    }

                    if server_range.is_empty() {
                        remove_suuid.push(cid.s_uuid);
                        warn!(s_uuid = ?cid.s_uuid, "disconnected server detected - this will be removed!");
                    } else {
                        trace!(?server_range, "retaining server");
                    }
                }
                None => {
                    error!("Impossible State - The RUV is corrupted due to missing sid in ranged index");
                    error!(ruv = ?self);
                    error!(?cid);
                    return Err(OperationError::InvalidState);
                }
            }
        }

        // We can now remove old server ids because we have a reliable liveness check in the
        // form of anchors being transmitted during replication. If a server is offline for
        // an extended period, it will not have created any anchors, and it will eventually become
        // empty in the data range. This allows it to be trimmed out.
        for s_uuid in remove_suuid {
            let x = self.ranged.remove(&s_uuid);
            assert!(x.map(|y| y.is_empty()).unwrap_or(false))
        }

        // Trim all cids up to this value, and return the range of IDs
        // that are affected.
        self.data.split_off_lt(cid);

        Ok(idl)
    }

    pub fn added(&self) -> Box<dyn Iterator<Item = Cid> + '_> {
        if let Some(added) = self.added.as_ref() {
            // Return what was added this txn. We previously iterated
            // from data_pre.max() over data, but if an anchor was added that
            // pre-dated data_pre.max() it wouldn't be committed to the db ruv
            // (even though it was in the in-memory ruv).
            Box::new(added.iter().map(|cid| {
                debug!(added_cid = ?cid);
                cid.clone()
            }))
        } else {
            // We have been cleared during this txn, so everything in data is
            // added.
            Box::new(self.data.iter().map(|(cid, _)| {
                debug!(added_cid = ?cid);
                cid.clone()
            }))
        }
    }

    pub fn removed(&self) -> impl Iterator<Item = Cid> + '_ {
        let prev_bound = if self.added.is_none() {
            // We have been cleared during this txn, so everything in pre is
            // removed.
            Unbounded
        } else if let Some((min, _)) = self.data.first_key_value() {
            Excluded(min.clone())
        } else {
            // If empty, assume everything is removed.
            Unbounded
        };

        // Iterate through our previous data to find what has been removed given
        // the ranges determined above.
        self.data_pre
            .range((Unbounded, prev_bound))
            .map(|(cid, _)| {
                debug!(removed_cid = ?cid);
                cid.clone()
            })
    }

    pub fn commit(self) {
        self.data.commit();
        self.ranged.commit();
    }
}

#[cfg(test)]
mod tests {
    use super::RangeDiffStatus;
    use super::ReplCidRange;
    use super::ReplicationUpdateVector;
    use std::collections::BTreeMap;
    use std::time::Duration;

    const UUID_A: uuid::Uuid = uuid::uuid!("13b530b0-efdd-4934-8fb7-9c35c8aab79e");
    const UUID_B: uuid::Uuid = uuid::uuid!("16327cf8-6a34-4a17-982c-b2eaa6d02d00");
    const UUID_C: uuid::Uuid = uuid::uuid!("2ed717e3-15be-41e6-b966-10a1f6d7ea1c");

    #[test]
    fn test_ruv_range_diff_1() {
        let ctx_a = BTreeMap::default();
        let ctx_b = BTreeMap::default();

        let result = ReplicationUpdateVector::range_diff(&ctx_a, &ctx_b);
        let expect = RangeDiffStatus::NoRUVOverlap;
        assert_eq!(result, expect);

        // Test the inverse.
        let result = ReplicationUpdateVector::range_diff(&ctx_b, &ctx_a);
        let expect = RangeDiffStatus::NoRUVOverlap;
        assert_eq!(result, expect);
    }

    #[test]
    fn test_ruv_range_diff_2() {
        let ctx_a = btreemap!((
            UUID_A,
            ReplCidRange {
                ts_min: Duration::from_secs(1),
                ts_max: Duration::from_secs(3),
            }
        ));
        let ctx_b = BTreeMap::default();

        let result = ReplicationUpdateVector::range_diff(&ctx_a, &ctx_b);
        let expect = RangeDiffStatus::NoRUVOverlap;
        assert_eq!(result, expect);

        let result = ReplicationUpdateVector::range_diff(&ctx_b, &ctx_a);
        let expect = RangeDiffStatus::NoRUVOverlap;
        assert_eq!(result, expect);
    }

    #[test]
    fn test_ruv_range_diff_3() {
        let ctx_a = btreemap!((
            UUID_A,
            ReplCidRange {
                ts_min: Duration::from_secs(1),
                ts_max: Duration::from_secs(3),
            }
        ));
        let ctx_b = btreemap!((
            UUID_A,
            ReplCidRange {
                ts_min: Duration::from_secs(1),
                ts_max: Duration::from_secs(3),
            }
        ));

        let result = ReplicationUpdateVector::range_diff(&ctx_a, &ctx_b);
        let expect = RangeDiffStatus::Ok(BTreeMap::default());
        assert_eq!(result, expect);

        let result = ReplicationUpdateVector::range_diff(&ctx_b, &ctx_a);
        let expect = RangeDiffStatus::Ok(BTreeMap::default());
        assert_eq!(result, expect);
    }

    #[test]
    fn test_ruv_range_diff_4() {
        let ctx_a = btreemap!((
            UUID_A,
            ReplCidRange {
                ts_min: Duration::from_secs(1),
                ts_max: Duration::from_secs(3),
            }
        ));
        let ctx_b = btreemap!((
            UUID_A,
            ReplCidRange {
                ts_min: Duration::from_secs(1),
                ts_max: Duration::from_secs(4),
            }
        ));

        let result = ReplicationUpdateVector::range_diff(&ctx_a, &ctx_b);
        let expect = RangeDiffStatus::Ok(btreemap!((
            UUID_A,
            ReplCidRange {
                ts_min: Duration::from_secs(3),
                ts_max: Duration::from_secs(4),
            }
        )));
        assert_eq!(result, expect);

        let result = ReplicationUpdateVector::range_diff(&ctx_b, &ctx_a);
        let expect = RangeDiffStatus::Ok(BTreeMap::default());
        assert_eq!(result, expect);
    }

    #[test]
    fn test_ruv_range_diff_5() {
        let ctx_a = btreemap!((
            UUID_A,
            ReplCidRange {
                ts_min: Duration::from_secs(5),
                ts_max: Duration::from_secs(7),
            }
        ));
        let ctx_b = btreemap!((
            UUID_A,
            ReplCidRange {
                ts_min: Duration::from_secs(1),
                ts_max: Duration::from_secs(4),
            }
        ));

        let result = ReplicationUpdateVector::range_diff(&ctx_a, &ctx_b);
        let expect = RangeDiffStatus::Unwilling {
            adv_range: btreemap!((
                UUID_A,
                ReplCidRange {
                    ts_min: Duration::from_secs(4),
                    ts_max: Duration::from_secs(5),
                }
            )),
        };
        assert_eq!(result, expect);

        let result = ReplicationUpdateVector::range_diff(&ctx_b, &ctx_a);
        let expect = RangeDiffStatus::Refresh {
            lag_range: btreemap!((
                UUID_A,
                ReplCidRange {
                    ts_min: Duration::from_secs(5),
                    ts_max: Duration::from_secs(4),
                }
            )),
        };
        assert_eq!(result, expect);
    }

    #[test]
    fn test_ruv_range_diff_6() {
        let ctx_a = btreemap!((
            UUID_A,
            ReplCidRange {
                ts_min: Duration::from_secs(1),
                ts_max: Duration::from_secs(4),
            }
        ));
        let ctx_b = btreemap!(
            (
                UUID_A,
                ReplCidRange {
                    ts_min: Duration::from_secs(1),
                    ts_max: Duration::from_secs(3),
                }
            ),
            (
                UUID_B,
                ReplCidRange {
                    ts_min: Duration::from_secs(2),
                    ts_max: Duration::from_secs(4),
                }
            )
        );

        let result = ReplicationUpdateVector::range_diff(&ctx_a, &ctx_b);
        let expect = RangeDiffStatus::Ok(btreemap!((
            UUID_B,
            ReplCidRange {
                ts_min: Duration::ZERO,
                ts_max: Duration::from_secs(4),
            }
        )));
        assert_eq!(result, expect);

        let result = ReplicationUpdateVector::range_diff(&ctx_b, &ctx_a);
        let expect = RangeDiffStatus::Ok(btreemap!((
            UUID_A,
            ReplCidRange {
                ts_min: Duration::from_secs(3),
                ts_max: Duration::from_secs(4),
            }
        )));
        assert_eq!(result, expect);
    }

    #[test]
    fn test_ruv_range_diff_7() {
        let ctx_a = btreemap!(
            (
                UUID_A,
                ReplCidRange {
                    ts_min: Duration::from_secs(1),
                    ts_max: Duration::from_secs(4),
                }
            ),
            (
                UUID_C,
                ReplCidRange {
                    ts_min: Duration::from_secs(2),
                    ts_max: Duration::from_secs(5),
                }
            )
        );
        let ctx_b = btreemap!(
            (
                UUID_A,
                ReplCidRange {
                    ts_min: Duration::from_secs(1),
                    ts_max: Duration::from_secs(3),
                }
            ),
            (
                UUID_B,
                ReplCidRange {
                    ts_min: Duration::from_secs(2),
                    ts_max: Duration::from_secs(4),
                }
            ),
            (
                UUID_C,
                ReplCidRange {
                    ts_min: Duration::from_secs(3),
                    ts_max: Duration::from_secs(4),
                }
            )
        );

        let result = ReplicationUpdateVector::range_diff(&ctx_a, &ctx_b);
        let expect = RangeDiffStatus::Ok(btreemap!((
            UUID_B,
            ReplCidRange {
                ts_min: Duration::ZERO,
                ts_max: Duration::from_secs(4),
            }
        )));
        assert_eq!(result, expect);

        let result = ReplicationUpdateVector::range_diff(&ctx_b, &ctx_a);
        let expect = RangeDiffStatus::Ok(btreemap!(
            (
                UUID_A,
                ReplCidRange {
                    ts_min: Duration::from_secs(3),
                    ts_max: Duration::from_secs(4),
                }
            ),
            (
                UUID_C,
                ReplCidRange {
                    ts_min: Duration::from_secs(4),
                    ts_max: Duration::from_secs(5),
                }
            )
        ));
        assert_eq!(result, expect);
    }

    #[test]
    fn test_ruv_range_diff_8() {
        let ctx_a = btreemap!(
            (
                UUID_A,
                ReplCidRange {
                    ts_min: Duration::from_secs(4),
                    ts_max: Duration::from_secs(6),
                }
            ),
            (
                UUID_B,
                ReplCidRange {
                    ts_min: Duration::from_secs(1),
                    ts_max: Duration::from_secs(2),
                }
            )
        );
        let ctx_b = btreemap!(
            (
                UUID_A,
                ReplCidRange {
                    ts_min: Duration::from_secs(1),
                    ts_max: Duration::from_secs(2),
                }
            ),
            (
                UUID_B,
                ReplCidRange {
                    ts_min: Duration::from_secs(4),
                    ts_max: Duration::from_secs(6),
                }
            )
        );

        let result = ReplicationUpdateVector::range_diff(&ctx_a, &ctx_b);
        let expect = RangeDiffStatus::Critical {
            adv_range: btreemap!((
                UUID_A,
                ReplCidRange {
                    ts_min: Duration::from_secs(2),
                    ts_max: Duration::from_secs(4),
                }
            )),
            lag_range: btreemap!((
                UUID_B,
                ReplCidRange {
                    ts_min: Duration::from_secs(4),
                    ts_max: Duration::from_secs(2),
                }
            )),
        };
        assert_eq!(result, expect);

        let result = ReplicationUpdateVector::range_diff(&ctx_b, &ctx_a);
        let expect = RangeDiffStatus::Critical {
            adv_range: btreemap!((
                UUID_B,
                ReplCidRange {
                    ts_min: Duration::from_secs(2),
                    ts_max: Duration::from_secs(4),
                }
            )),
            lag_range: btreemap!((
                UUID_A,
                ReplCidRange {
                    ts_min: Duration::from_secs(4),
                    ts_max: Duration::from_secs(2),
                }
            )),
        };
        assert_eq!(result, expect);
    }
}
1351}