// kanidmd_lib/repl/ruv.rs
1use crate::be::dbrepl::DbReplMeta;
2use std::cmp::Ordering;
3use std::collections::{BTreeMap, BTreeSet};
4use std::ops::Bound::*;
5use std::sync::Arc;
6use std::time::Duration;
7
8use concread::bptree::{BptreeMap, BptreeMapReadSnapshot, BptreeMapReadTxn, BptreeMapWriteTxn};
9
10use idlset::v2::IDLBitRange;
11
12use crate::prelude::*;
13use crate::repl::cid::Cid;
14use crate::repl::proto::{ReplAnchoredCidRange, ReplCidRange};
15use std::fmt;
16
/// The Replication Update Vector (RUV) tracks, for this server, every change
/// (identified by its [`Cid`]) and the set of database entry ids affected by
/// that change. A secondary index keyed by server uuid stores the timestamps
/// of those changes so that per-server ranges can be computed for replication.
#[derive(Default)]
pub struct ReplicationUpdateVector {
    // This sorts by time. We store the set of entry id's that are affected in an operation.
    // Due to how replication state works, it is possibly that id's in these sets *may* not
    // exist anymore, so these bit ranges likely need intersection with allids before use.
    data: BptreeMap<Cid, IDLBitRange>,
    // This sorts by Server ID. It's used for the RUV to build ranges for you ... guessed it
    // range queries. These are used to build the set of differences that need to be sent in
    // a replication operation.
    //
    // we need a way to invert the cid, but without duplication? Maybe an invert cid type?
    // This way it still orders things in the right order by time stamp just searches by cid
    // first.
    ranged: BptreeMap<Uuid, BTreeSet<Duration>>,
}
32
/// The status of replication after investigating the RUV states.
#[derive(Debug, PartialEq, Eq)]
pub(crate) enum RangeDiffStatus {
    /// Ok - can proceed with replication, supplying the following
    /// ranges of changes to the consumer.
    Ok(BTreeMap<Uuid, ReplCidRange>),
    /// Refresh - The consumer is lagging and is missing a set of changes
    /// that are required to proceed. The consumer *MUST* be refreshed
    /// immediately.
    Refresh {
        /// Per-server ranges describing how far behind the consumer is.
        lag_range: BTreeMap<Uuid, ReplCidRange>,
    },
    /// Unwilling - The consumer is advanced beyond our state, and supplying
    /// changes to them may introduce inconsistency in replication. This
    /// server should be investigated immediately.
    Unwilling {
        /// Per-server ranges describing how far ahead the consumer is.
        adv_range: BTreeMap<Uuid, ReplCidRange>,
    },
    /// Critical - The consumer is lagging and missing changes, but also is
    /// in possession of changes advancing it beyond our current state. This
    /// is a critical fault in replication and the topology must be
    /// investigated immediately.
    Critical {
        /// Per-server ranges where the consumer is behind the supplier.
        lag_range: BTreeMap<Uuid, ReplCidRange>,
        /// Per-server ranges where the consumer is ahead of the supplier.
        adv_range: BTreeMap<Uuid, ReplCidRange>,
    },
    /// No RUV Overlap - The consumer has likely desynchronised and no longer has
    /// common overlap with it's RUV to ours. This can indicate it has trimmed
    /// content we may have, or may have become part of a split brain situation.
    /// For replication to proceed, there must be *at least* one common overlapping
    /// point in the RUV.
    NoRUVOverlap,
}
66
impl ReplicationUpdateVector {
    /// Open a write transaction over the RUV. A snapshot of the pre-write
    /// data map is retained (`data_pre`) so that `removed()` can later be
    /// computed relative to the state at transaction start.
    pub fn write(&self) -> ReplicationUpdateVectorWriteTransaction<'_> {
        ReplicationUpdateVectorWriteTransaction {
            // Need to take the write first, then the read to guarantee ordering.
            added: Some(BTreeSet::default()),
            data: self.data.write(),
            data_pre: self.data.read(),
            ranged: self.ranged.write(),
        }
    }

    /// Open a read-only transaction over the RUV.
    pub fn read(&self) -> ReplicationUpdateVectorReadTransaction<'_> {
        ReplicationUpdateVectorReadTransaction {
            data: self.data.read(),
            ranged: self.ranged.read(),
        }
    }

    /// Compare a consumer's per-server CID ranges to a supplier's, and decide
    /// whether replication can proceed. Returns the set of ranges to supply
    /// on success, or a lag/advance/overlap fault status otherwise.
    pub(crate) fn range_diff(
        consumer_range: &BTreeMap<Uuid, ReplCidRange>,
        supplier_range: &BTreeMap<Uuid, ReplCidRange>,
    ) -> RangeDiffStatus {
        // We need to build a new set of ranges that express the difference between
        // these two states.
        let mut diff_range = BTreeMap::default();
        let mut lag_range = BTreeMap::default();
        let mut adv_range = BTreeMap::default();

        let mut consumer_lagging = false;
        let mut supplier_lagging = false;
        let mut valid_content_overlap = false;

        // We need to look at each uuid in the *supplier* and assert if they are present
        // on the *consumer*.
        //
        // If there are s_uuids with the same max, we don't add it to the
        // diff

        for (supplier_s_uuid, supplier_cid_range) in supplier_range.iter() {
            match consumer_range.get(supplier_s_uuid) {
                Some(consumer_cid_range) => {
                    // We have the same server uuid in our RUV's so some content overlap
                    // must exist (or has existed);
                    valid_content_overlap = true;

                    // The two windows just have to overlap. If they over lap
                    // meaning that consumer max > supplier min, then if supplier
                    // max > consumer max, then the range between consumer max
                    // and supplier max must be supplied.
                    //
                    //   [ consumer min ... consumer max ]
                    //      <-- [ supplier min .. supplier max ] -->
                    //
                    // In other words if we have:
                    //
                    //   [ consumer min ... consumer max ]
                    //                                      [ supplier min ... supplier max ]
                    //                                     ^
                    //                                     \-- no overlap of the range windows!
                    //
                    // then because there has been too much lag between consumer and
                    // the supplier then there is a risk of changes being dropped or
                    // missing. In the future we could alter this to force the resend
                    // of zero -> supplier max, but I think thought is needed to
                    // ensure no corruption in this case.
                    if consumer_cid_range.ts_max < supplier_cid_range.ts_min {
                        //
                        //   [ consumer min ... consumer max ]
                        //                                      [ supplier min ... supplier max ]
                        //                                     ^
                        //                                     \-- no overlap of the range windows!
                        //
                        consumer_lagging = true;
                        lag_range.insert(
                            *supplier_s_uuid,
                            ReplCidRange {
                                ts_min: supplier_cid_range.ts_min,
                                ts_max: consumer_cid_range.ts_max,
                            },
                        );
                    } else if supplier_cid_range.ts_max < consumer_cid_range.ts_min {
                        //
                        //                                      [ consumer min ... consumer max ]
                        //   [ supplier min ... supplier max ]
                        //                                     ^
                        //                                     \-- no overlap of the range windows!
                        //
                        // This means we can't supply because we are missing changes that the consumer
                        // has. *we* are lagging.
                        supplier_lagging = true;
                        adv_range.insert(
                            *supplier_s_uuid,
                            ReplCidRange {
                                ts_min: supplier_cid_range.ts_max,
                                ts_max: consumer_cid_range.ts_min,
                            },
                        );
                    } else if consumer_cid_range.ts_max < supplier_cid_range.ts_max {
                        //
                        //                                         /-- consumer needs these changes
                        //                                         v
                        //   [ consumer min ... consumer max ] -->                   ]
                        //   [ supplier min ...                   supplier max ]
                        //                                     ^
                        //                                     \-- overlap of the range windows
                        //
                        // We require the changes from consumer max -> supplier max.
                        diff_range.insert(
                            *supplier_s_uuid,
                            ReplCidRange {
                                ts_min: consumer_cid_range.ts_max,
                                ts_max: supplier_cid_range.ts_max,
                            },
                        );
                    }
                    //
                    //   /-- The consumer has changes we don't have.
                    //   |   So we don't need to supply
                    //   v
                    //         [ consumer min ... consumer max ]
                    //   [ supplier min ... supplier max ]
                    //                   ^
                    //                   \-- overlap of the range windows
                    //
                    // OR
                    //
                    //   [ consumer min ... consumer max ]
                    //   [ supplier min ... supplier max ]
                    //                                   ^
                    //                                   \-- the windows max is identical
                    //                                       no actions needed
                    //
                    // In this case there is no action required since consumer_cid_range.ts_max
                    // must be greater than or equal to supplier max.
                }
                None => {
                    // The consumer does not have any content from this
                    // server. Select from Zero -> max of the supplier.
                    diff_range.insert(
                        *supplier_s_uuid,
                        ReplCidRange {
                            ts_min: Duration::ZERO,
                            ts_max: supplier_cid_range.ts_max,
                        },
                    );
                }
            }
        }

        if !valid_content_overlap {
            return RangeDiffStatus::NoRUVOverlap;
        }

        match (consumer_lagging, supplier_lagging) {
            (false, false) => RangeDiffStatus::Ok(diff_range),
            (true, false) => RangeDiffStatus::Refresh { lag_range },
            (false, true) => RangeDiffStatus::Unwilling { adv_range },
            (true, true) => RangeDiffStatus::Critical {
                lag_range,
                adv_range,
            },
        }
    }
}
231
/// A write transaction over the [`ReplicationUpdateVector`].
pub struct ReplicationUpdateVectorWriteTransaction<'a> {
    // CIDs inserted during this transaction; `None` indicates the RUV was
    // cleared in this transaction, in which case `added()`/`removed()`
    // treat all of `data`/`data_pre` respectively as the delta.
    added: Option<BTreeSet<Cid>>,
    data: BptreeMapWriteTxn<'a, Cid, IDLBitRange>,
    // Read snapshot of `data` taken at transaction start, used by `removed()`.
    data_pre: BptreeMapReadTxn<'a, Cid, IDLBitRange>,
    ranged: BptreeMapWriteTxn<'a, Uuid, BTreeSet<Duration>>,
}
238
239impl fmt::Debug for ReplicationUpdateVectorWriteTransaction<'_> {
240 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
241 writeln!(f, "RUV DATA DUMP")?;
242 self.data
243 .iter()
244 .try_for_each(|(cid, idl)| writeln!(f, "* [{cid} {idl:?}]"))?;
245 writeln!(f, "RUV RANGE DUMP")?;
246 self.ranged
247 .iter()
248 .flat_map(|(s_uuid, ts_set)| ts_set.iter().map(|ts| Cid::new(*s_uuid, *ts)))
249 .try_for_each(|cid| writeln!(f, "[{cid}]"))
250 }
251}
252
/// A read-only transaction over the [`ReplicationUpdateVector`].
pub struct ReplicationUpdateVectorReadTransaction<'a> {
    data: BptreeMapReadTxn<'a, Cid, IDLBitRange>,
    ranged: BptreeMapReadTxn<'a, Uuid, BTreeSet<Duration>>,
}
257
258impl fmt::Debug for ReplicationUpdateVectorReadTransaction<'_> {
259 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
260 writeln!(f, "RUV DATA DUMP")?;
261 self.data
262 .iter()
263 .try_for_each(|(cid, idl)| writeln!(f, "* [{cid} {idl:?}]"))?;
264 writeln!(f, "RUV RANGE DUMP")?;
265 self.ranged
266 .iter()
267 .try_for_each(|(s_uuid, ts)| writeln!(f, "* [{s_uuid} {ts:?}]"))
268 }
269}
270
/// Shared behaviour for RUV read and write transactions. All default methods
/// operate over read snapshots of the two underlying maps.
pub trait ReplicationUpdateVectorTransaction {
    /// Snapshot of the CID -> affected-entry-id map.
    fn ruv_snapshot(&self) -> BptreeMapReadSnapshot<'_, Cid, IDLBitRange>;

    /// Snapshot of the server-uuid -> change-timestamp-set index.
    fn range_snapshot(&self) -> BptreeMapReadSnapshot<'_, Uuid, BTreeSet<Duration>>;

    /// Serialise the current set of RUV CIDs for inclusion in a DB backup.
    fn to_db_backup_ruv(&self) -> DbReplMeta {
        DbReplMeta::V1 {
            ruv: self.ruv_snapshot().keys().map(|cid| cid.into()).collect(),
        }
    }

    /// Return a filtered view of our RUV ranges. This acts similar to "trim" where any s_uuid
    /// where the max cid is less than trim_cid will be excluded from the view.
    ///
    /// Returns `OperationError::InvalidState` if any server has an empty
    /// timestamp set, which should never occur.
    fn filter_ruv_range(
        &self,
        trim_cid: &Cid,
    ) -> Result<BTreeMap<Uuid, ReplCidRange>, OperationError> {
        self.range_snapshot()
            .iter()
            .filter_map(|(s_uuid, range)| match (range.first(), range.last()) {
                (Some(first), Some(last)) => {
                    if last < &trim_cid.ts {
                        // Entire range pre-dates the trim point - exclude it.
                        None
                    } else {
                        Some(Ok((
                            *s_uuid,
                            ReplCidRange {
                                ts_min: *first,
                                ts_max: *last,
                            },
                        )))
                    }
                }
                _ => {
                    error!(
                        "invalid state for server uuid {:?}, no ranges present",
                        s_uuid
                    );
                    Some(Err(OperationError::InvalidState))
                }
            })
            .collect::<Result<BTreeMap<_, _>, _>>()
    }

    /// Return the complete set of RUV ranges present on this replica
    fn current_ruv_range(&self) -> Result<BTreeMap<Uuid, ReplCidRange>, OperationError> {
        self.range_snapshot()
            .iter()
            .map(|(s_uuid, range)| match (range.first(), range.last()) {
                (Some(first), Some(last)) => Ok((
                    *s_uuid,
                    ReplCidRange {
                        ts_min: *first,
                        ts_max: *last,
                    },
                )),
                _ => {
                    error!(
                        "invalid state for server uuid {:?}, no ranges present",
                        s_uuid
                    );
                    Err(OperationError::InvalidState)
                }
            })
            .collect::<Result<BTreeMap<_, _>, _>>()
    }

    /// Resolve a set of per-server CID ranges into the union of entry ids
    /// affected by any change strictly newer than each range's `ts_min`.
    fn range_to_idl(&self, ctx_ranges: &BTreeMap<Uuid, ReplCidRange>) -> IDLBitRange {
        let mut idl = IDLBitRange::new();
        // Force the set to be compressed, saves on seeks during inserts.
        idl.compress();
        let range = self.range_snapshot();
        let ruv = self.ruv_snapshot();

        // The range we have has a collection of s_uuid containing low -> high ranges.
        // We need to convert this to absolute ranges of all the idlbitranges that
        // relate to the entries we have.

        for (s_uuid, ctx_range) in ctx_ranges {
            // For each server and range low to high, iterate over
            // the list of CID's in the main RUV.

            let Some(ruv_range) = range.get(s_uuid) else {
                // This is valid because if we clean up a server range on
                // this node, but the other server isn't aware yet, so we
                // just no-op this. The changes we have will still be
                // correctly found and sent.
                debug!(?s_uuid, "range not found in ruv.");
                continue;
            };

            // Get from the min to the max. Unbounded and
            // Included(ctx_range.ts_max) are the same in
            // this context.
            for ts in ruv_range.range((Excluded(ctx_range.ts_min), Unbounded)) {
                let cid = Cid {
                    ts: *ts,
                    s_uuid: *s_uuid,
                };

                if let Some(ruv_idl) = ruv.get(&cid) {
                    ruv_idl.into_iter().for_each(|id| idl.insert_id(id))
                }
                // If the cid isn't found, it may have been trimmed, but that's okay. A cid in
                // a range can be trimmed if all entries of that cid have since tombstoned so
                // no longer need to be applied in change ranges.
            }
        }

        idl
    }

    /// Consistency check: rebuild a RUV from the supplied entries' change
    /// states and compare it to the live RUV, pushing any inconsistencies
    /// into `results`. Also asserts the ranged index agrees with the data map.
    fn verify(
        &self,
        entries: &[Arc<EntrySealedCommitted>],
        results: &mut Vec<Result<(), ConsistencyError>>,
    ) {
        // Okay rebuild the RUV in parallel.
        let mut check_ruv: BTreeMap<Cid, IDLBitRange> = BTreeMap::default();
        for entry in entries {
            // The DB id we need.
            let eid = entry.get_id();
            let ecstate = entry.get_changestate();
            // We don't need the details of the change - only the cid of the
            // change that this entry was involved in.
            for cid in ecstate.cid_iter() {
                // Add to the main ruv data.
                if let Some(idl) = check_ruv.get_mut(cid) {
                    // We can't guarantee id order, so we have to do this properly.
                    idl.insert_id(eid);
                } else {
                    let mut idl = IDLBitRange::new();
                    idl.insert_id(eid);
                    check_ruv.insert(cid.clone(), idl);
                }
            }
        }

        trace!(?check_ruv);
        // Get the current state
        let snapshot_ruv = self.ruv_snapshot();

        // Now compare. We want to do this checking for each CID in each, and then asserting
        // the content is the same.
        // This is a sorted-merge walk over the two key-ordered iterators.

        let mut check_iter = check_ruv.iter();
        let mut snap_iter = snapshot_ruv.iter();

        let mut check_next = check_iter.next();
        let mut snap_next = snap_iter.next();

        while let (Some((ck, cv)), Some((sk, sv))) = (&check_next, &snap_next) {
            match ck.cmp(sk) {
                Ordering::Equal => {
                    // Counter intuitive, but here we check that the check set is a *subset*
                    // of the ruv snapshot. This is because when we have an entry that is
                    // tombstoned, all it's CID interactions are "lost" and it's cid becomes
                    // that of when it was tombstoned. So the "rebuilt" ruv will miss that
                    // entry.
                    //
                    // In the future the RUV concept may be ditched entirely anyway, thoughts needed.
                    let intersect = *cv & *sv;
                    if *cv == &intersect {
                        trace!("{:?} is consistent!", ck);
                    } else {
                        error!("{:?} is NOT consistent! IDL's differ", ck);
                        debug_assert!(false);
                        results.push(Err(ConsistencyError::RuvInconsistent(ck.to_string())));
                    }
                    check_next = check_iter.next();
                    snap_next = snap_iter.next();
                }
                // Because we are zipping between these two sets, we only need to compare when
                // the CID's are equal. Otherwise we need the other iter to "catch up"
                Ordering::Less => {
                    check_next = check_iter.next();
                }
                Ordering::Greater => {
                    snap_next = snap_iter.next();
                }
            }
        }

        // Drain any remaining rebuilt CIDs the snapshot did not contain.
        while let Some((ck, _cv)) = &check_next {
            debug!("{:?} may not be consistent! CID missing from RUV", ck);
            // debug_assert!(false);
            // results.push(Err(ConsistencyError::RuvInconsistent(ck.to_string())));
            check_next = check_iter.next();
        }

        // Drain any remaining snapshot CIDs the rebuild did not produce.
        while let Some((sk, _sv)) = &snap_next {
            debug!(
                "{:?} may not be consistent! CID should not exist in RUV",
                sk
            );
            // debug_assert!(false);
            // results.push(Err(ConsistencyError::RuvInconsistent(sk.to_string())));
            snap_next = snap_iter.next();
        }

        // Assert that the content of the ranged set matches the data set and has the
        // correct set of values.
        let snapshot_range = self.range_snapshot();

        for cid in snapshot_ruv.keys() {
            if let Some(server_range) = snapshot_range.get(&cid.s_uuid) {
                if !server_range.contains(&cid.ts) {
                    warn!(
                        "{:?} is NOT consistent! server range is missing cid in index",
                        cid
                    );
                    debug_assert!(false);
                    results.push(Err(ConsistencyError::RuvInconsistent(
                        cid.s_uuid.to_string(),
                    )));
                }
            } else {
                warn!(
                    "{:?} is NOT consistent! server range is not present",
                    cid.s_uuid
                );
                debug_assert!(false);
                results.push(Err(ConsistencyError::RuvInconsistent(
                    cid.s_uuid.to_string(),
                )));
            }
        }

        // Done!
    }

    /// Augment each supplied range with the intermediate "anchor" timestamps
    /// we hold for that server between `ts_min` and `ts_max` (both exclusive,
    /// since those bounds are already carried in the range itself).
    ///
    /// Returns `OperationError::InvalidState` if a requested server uuid is
    /// absent from our ranged index.
    fn get_anchored_ranges(
        &self,
        ranges: BTreeMap<Uuid, ReplCidRange>,
    ) -> Result<BTreeMap<Uuid, ReplAnchoredCidRange>, OperationError> {
        let self_range_snapshot = self.range_snapshot();

        ranges
            .into_iter()
            .map(|(s_uuid, ReplCidRange { ts_min, ts_max })| {
                let ts_range = self_range_snapshot.get(&s_uuid).ok_or_else(|| {
                    error!(
                        ?s_uuid,
                        "expected cid range for server in ruv, was not present"
                    );
                    OperationError::InvalidState
                })?;

                // If these are equal and excluded, btreeset panics
                let anchors = if ts_max > ts_min {
                    // We exclude the ends because these are already in the ts_min/max
                    ts_range
                        .range((Excluded(ts_min), Excluded(ts_max)))
                        .copied()
                        .collect::<Vec<_>>()
                } else {
                    Vec::with_capacity(0)
                };

                Ok((
                    s_uuid,
                    ReplAnchoredCidRange {
                        ts_min,
                        anchors,
                        ts_max,
                    },
                ))
            })
            .collect()
    }
}
542
impl ReplicationUpdateVectorTransaction for ReplicationUpdateVectorWriteTransaction<'_> {
    // Snapshots over the write transaction's in-progress state.
    fn ruv_snapshot(&self) -> BptreeMapReadSnapshot<'_, Cid, IDLBitRange> {
        self.data.to_snapshot()
    }

    fn range_snapshot(&self) -> BptreeMapReadSnapshot<'_, Uuid, BTreeSet<Duration>> {
        self.ranged.to_snapshot()
    }
}
552
impl ReplicationUpdateVectorTransaction for ReplicationUpdateVectorReadTransaction<'_> {
    // Snapshots over the read transaction's committed state.
    fn ruv_snapshot(&self) -> BptreeMapReadSnapshot<'_, Cid, IDLBitRange> {
        self.data.to_snapshot()
    }

    fn range_snapshot(&self) -> BptreeMapReadSnapshot<'_, Uuid, BTreeSet<Duration>> {
        self.ranged.to_snapshot()
    }
}
562
563impl ReplicationUpdateVectorWriteTransaction<'_> {
564 pub fn clear(&mut self) {
565 self.added = None;
566 self.data.clear();
567 self.ranged.clear();
568 }
569
570 pub(crate) fn incremental_preflight_validate_ruv(
571 &self,
572 ctx_ranges: &BTreeMap<Uuid, ReplAnchoredCidRange>,
573 txn_cid: &Cid,
574 ) -> Result<(), OperationError> {
575 // Check that the incoming ranges, for our servers id, do not exceed
576 // our servers max state. This can occur if we restore from a backup
577 // where the replication state is older than what our partners have,
578 // meaning that the context may have diverged in a way we can't then
579 // resolve.
580
581 if let Some(our_cid_range_max) = self
582 .ranged
583 .get(&txn_cid.s_uuid)
584 .and_then(|range| range.last().copied())
585 {
586 if let Some(incoming_cid_range) = ctx_ranges.get(&txn_cid.s_uuid) {
587 if incoming_cid_range.ts_max > our_cid_range_max {
588 error!("The incoming data contains changes matching this server's UUID, and those changes are newer than the local version. This can occur if the server was restored from a backup which was taken before sending out changes. Replication is unable to proceed as data corruption may occur. You must refresh this consumer immediately to continue.");
589 return Err(OperationError::ReplServerUuidSplitDataState);
590 }
591 }
592 }
593
594 let warn_time = txn_cid.ts + REPL_SUPPLIER_ADVANCE_WINDOW;
595 for (s_uuid, incoming_cid_range) in ctx_ranges.iter() {
596 if incoming_cid_range.ts_max > warn_time {
597 // TODO: This would be a great place for fault management to pick up this warning
598 warn!(
599 "Incoming changes from {:?} are further than {} seconds in the future.",
600 s_uuid,
601 REPL_SUPPLIER_ADVANCE_WINDOW.as_secs()
602 );
603 }
604 }
605
606 Ok(())
607 }
608
609 pub(crate) fn refresh_validate_ruv(
610 &self,
611 ctx_ranges: &BTreeMap<Uuid, ReplAnchoredCidRange>,
612 ) -> Result<(), OperationError> {
613 // Assert that the ruv that currently exists, is a valid data set of
614 // the supplied consumer range - especially check that when a uuid exists in
615 // our ruv, that it's maximum matches the ctx ruv.
616 //
617 // Since the ctx range comes from the supplier, when we rebuild due to the
618 // state machine then some values may not exist since they were replaced
619 // or updated. It's also possible that the imported range maximums *may not*
620 // exist especially in three way replication scenarios where S1:A was the S1
621 // maximum but is replaced by S2:B. This would make S1:A still it's valid
622 // maximum but no entry reflects that in it's change state.
623 let mut valid = true;
624
625 for (ctx_server_uuid, ctx_server_range) in ctx_ranges.iter() {
626 match self.ranged.get(ctx_server_uuid) {
627 Some(server_range) => {
628 let ctx_ts = &ctx_server_range.ts_max;
629 match server_range.last() {
630 Some(s_ts) if s_ts <= ctx_ts => {
631 // Ok - our entries reflect maximum or earlier.
632 trace!(?ctx_server_uuid, ?ctx_ts, ?s_ts, "valid");
633 }
634 Some(s_ts) => {
635 valid = false;
636 warn!(?ctx_server_uuid, ?ctx_ts, ?s_ts, "inconsistent s_uuid in ruv, consumer ruv is advanced past supplier");
637 }
638 None => {
639 valid = false;
640 warn!(
641 ?ctx_server_uuid,
642 ?ctx_ts,
643 "inconsistent server range in ruv, no maximum ts found for s_uuid"
644 );
645 }
646 }
647 }
648 None => {
649 // valid = false;
650 trace!(
651 ?ctx_server_uuid,
652 "s_uuid absent from ranged ruv, possible that changes have been expired"
653 );
654 }
655 }
656 }
657
658 if valid {
659 Ok(())
660 } else {
661 Err(OperationError::ReplInvalidRUVState)
662 }
663 }
664
665 #[instrument(level = "trace", name = "ruv::refresh_update_ruv", skip_all)]
666 pub(crate) fn refresh_update_ruv(
667 &mut self,
668 ctx_ranges: &BTreeMap<Uuid, ReplAnchoredCidRange>,
669 ) -> Result<(), OperationError> {
670 // Previously this would just add in the ranges, and then the actual entries
671 // from the changestate would populate the data/ranges. Now we add empty idls
672 // to each of these so that they are db persisted allowing ruv reload.
673 for (ctx_s_uuid, ctx_range) in ctx_ranges.iter() {
674 let cid_iter = std::iter::once(&ctx_range.ts_min)
675 .chain(ctx_range.anchors.iter())
676 .chain(std::iter::once(&ctx_range.ts_max))
677 .map(|ts| Cid::new(*ctx_s_uuid, *ts));
678
679 for cid in cid_iter {
680 self.insert_change(&cid, IDLBitRange::default())?;
681 }
682 }
683
684 Ok(())
685 }
686
687 /// Restore the ruv from a DB backup. It's important to note here that
688 /// we don't actually need to restore and of the IDL's in the process. we only
689 /// needs the CID's of the changes/points in time. This is because when the
690 /// db entries are restored, their changesets will re-populate the data that we
691 /// need in the RUV at these points. The reason we need these ranges without IDL
692 /// is so that trim and replication works properly.
693 #[instrument(level = "debug", name = "ruv::restore", skip_all)]
694 pub(crate) fn restore<I>(&mut self, iter: I) -> Result<(), OperationError>
695 where
696 I: IntoIterator<Item = Cid>,
697 {
698 let mut rebuild_ruv: BTreeMap<Cid, IDLBitRange> = BTreeMap::new();
699 let mut rebuild_range: BTreeMap<Uuid, BTreeSet<Duration>> = BTreeMap::default();
700
701 for cid in iter {
702 if !rebuild_ruv.contains_key(&cid) {
703 let idl = IDLBitRange::new();
704 rebuild_ruv.insert(cid.clone(), idl);
705 }
706
707 if let Some(server_range) = rebuild_range.get_mut(&cid.s_uuid) {
708 server_range.insert(cid.ts);
709 } else {
710 let mut ts_range = BTreeSet::default();
711 ts_range.insert(cid.ts);
712 rebuild_range.insert(cid.s_uuid, ts_range);
713 }
714 }
715
716 self.data.extend(rebuild_ruv);
717 self.ranged.extend(rebuild_range);
718
719 Ok(())
720 }
721
722 #[instrument(level = "debug", name = "ruv::rebuild", skip_all)]
723 pub fn rebuild(&mut self, entries: &[Arc<EntrySealedCommitted>]) -> Result<(), OperationError> {
724 // Entries and their internal changestates are the "source of truth" for all changes
725 // that have ever occurred and are stored on this server. So we use them to rebuild our RUV
726 // here!
727 //
728 // We only update RUV items where an anchor exists.
729
730 // let mut rebuild_ruv: BTreeMap<Cid, IDLBitRange> = BTreeMap::new();
731 // let mut rebuild_range: BTreeMap<Uuid, BTreeSet<Duration>> = BTreeMap::default();
732
733 for entry in entries {
734 // The DB id we need.
735 let eid = entry.get_id();
736 let ecstate = entry.get_changestate();
737 // We don't need the details of the change - only the cid of the
738 // change that this entry was involved in.
739 for cid in ecstate.cid_iter() {
740 // if let Some(idl) = rebuild_ruv.get_mut(cid) {
741 if let Some(idl) = self.data.get_mut(cid) {
742 // We can't guarantee id order, so we have to do this properly.
743 idl.insert_id(eid);
744 /*
745 } else {
746 let mut idl = IDLBitRange::new();
747 idl.insert_id(eid);
748 rebuild_ruv.insert(cid.clone(), idl);
749 */
750 }
751
752 /*
753 if let Some(server_range) = rebuild_range.get_mut(&cid.s_uuid) {
754 server_range.insert(cid.ts);
755 } else {
756 let mut ts_range = BTreeSet::default();
757 ts_range.insert(cid.ts);
758 rebuild_range.insert(cid.s_uuid, ts_range);
759 }
760 */
761 }
762 }
763
764 // Finally, we need to do a cleanup/compact of the IDL's if possible.
765 self.data.range_mut(..).for_each(|(_k, idl)| {
766 idl.maybe_compress();
767 });
768
769 // self.data.extend(rebuild_ruv);
770
771 // Warning - you can't extend here because this is keyed by UUID. You need
772 // to go through each key and then merge the sets.
773
774 /*
775 rebuild_range.into_iter().for_each(|(s_uuid, ts_set)| {
776 if let Some(ex_ts_set) = self.ranged.get_mut(&s_uuid) {
777 ex_ts_set.extend(ts_set)
778 } else {
779 self.ranged.insert(s_uuid, ts_set);
780 }
781 });
782 */
783
784 Ok(())
785 }
786
787 pub fn insert_change(&mut self, cid: &Cid, idl: IDLBitRange) -> Result<(), OperationError> {
788 // Remember, in a transaction the changes can be updated multiple times.
789 if let Some(ex_idl) = self.data.get_mut(cid) {
790 // This ensures both sets have all the available ids.
791 let idl = ex_idl as &_ | &idl;
792 *ex_idl = idl;
793 } else {
794 self.data.insert(cid.clone(), idl);
795 }
796
797 if let Some(server_range) = self.ranged.get_mut(&cid.s_uuid) {
798 server_range.insert(cid.ts);
799 } else {
800 let mut range = BTreeSet::default();
801 range.insert(cid.ts);
802 self.ranged.insert(cid.s_uuid, range);
803 }
804
805 if let Some(added) = &mut self.added {
806 added.insert(cid.clone());
807 }
808
809 Ok(())
810 }
811
812 pub fn update_entry_changestate(
813 &mut self,
814 entry: &EntrySealedCommitted,
815 ) -> Result<(), OperationError> {
816 let eid = entry.get_id();
817 let ecstate = entry.get_changestate();
818
819 trace!("Updating ruv state from entry {}", eid);
820 trace!(?ecstate);
821
822 for cid in ecstate.cid_iter() {
823 if let Some(idl) = self.data.get_mut(cid) {
824 // We can't guarantee id order, so we have to do this properly.
825 idl.insert_id(eid);
826 } else {
827 let mut idl = IDLBitRange::new();
828 idl.insert_id(eid);
829 self.data.insert(cid.clone(), idl);
830 }
831
832 if let Some(server_range) = self.ranged.get_mut(&cid.s_uuid) {
833 server_range.insert(cid.ts);
834 } else {
835 let mut ts_range = BTreeSet::default();
836 ts_range.insert(cid.ts);
837 self.ranged.insert(cid.s_uuid, ts_range);
838 }
839 }
840
841 Ok(())
842 }
843
844 pub fn ruv_idls(&self) -> IDLBitRange {
845 let mut idl = IDLBitRange::new();
846 self.data.iter().for_each(|(_cid, ex_idl)| {
847 idl = ex_idl as &_ | &idl;
848 });
849 idl
850 }
851
852 /*
853 How to handle changelog trimming? If we trim a server out from the RUV as a whole, we
854 need to be sure we don't oversupply changes the consumer already has. How can we do
855 this cleanly? Or do we just deal with it because our own local trim will occur soon after?
856
857 The situation would be
858
859 A: 1 -> 3
860 B: 1 -> 3
861
862 Assuming A trims first:
863
864 A:
865 B: 1 -> 3
866
867 Then on A <- B, B would try to supply 1->3 to A assuming it is not present. However,
868 the trim would occur soon after on B causing:
869
870 A:
871 B:
872
873 And then the supply would stop. So either A needs to retain the max/min in it's range
874 to allow the comparison here to continue even if it's ruv is cleaned. Or, we need to
875 have a delayed trim on the range that is 2x the normal trim range to give a buffer?
876
877 Mostly longer ruv/cid ranges aren't an issue for us, so could we just make these ranges
878 really large?
879
880 NOTE: For now we do NOT trim out max CID's of any s_uuid so that we don't have to confront
881 this edge case yet.
882
883 // == RESOLVED: Previously this was a problem as the CID ranges of any node may not be a
884 // complete view of all CID's that existed on any other node. Now with anchors in replication
885 // this changes and we have not only a complete view of all CID's that were created, but
886 // tombstone purge always create an empty anchor so the RUV always advances. This way we
887 // have a stronger assurance about which servers are live and which are not.
888 */
889
890 // Problem Cases
891
892 /*
893 What about generations? There is a "live" generation which can be replicated and a
894 former generation of ranges that previously existed. To replicate:
895 // The consumer must have content within the current live range.
896 consumer.live_max < supplier.live_max
897 consumer.live_max >= supplier.live_min
898 // The consumer must have all content that was formerly known.
899 consumer.live_min >= supplier.former_max
900 // I don't think we care what
901
902
903 // == RESOLVED: Anchors give us the generations that existed previously without us
904 // needing to worry about this.
905 */
906
907 pub fn trim_up_to(&mut self, cid: &Cid) -> Result<IDLBitRange, OperationError> {
908 trace!(trim_up_to_cid = ?cid);
909 let mut idl = IDLBitRange::new();
910 let mut remove_suuid = Vec::with_capacity(0);
911
912 // Here we can use the for_each here to be trimming the
913 // range set since that is not ordered by time, we need
914 // to do fragmented searches over this no matter what we
915 // try to do.
916
917 for (cid, ex_idl) in self.data.range((Unbounded, Excluded(cid))) {
918 trace!(?cid, "examining for RUV removal");
919 idl = ex_idl as &_ | &idl;
920
921 // Remove the reverse version of the cid from the ranged index.
922 match self.ranged.get_mut(&cid.s_uuid) {
923 Some(server_range) => {
924 // Remove returns a bool if the element WAS present.
925 if !server_range.remove(&cid.ts) {
926 error!("Impossible State - The RUV is corrupted due to missing sid:ts pair in ranged index");
927 error!(ruv = ?self);
928 error!(?cid);
929 return Err(OperationError::InvalidState);
930 }
931
932 if server_range.is_empty() {
933 remove_suuid.push(cid.s_uuid);
934 warn!(s_uuid = ?cid.s_uuid, "disconnected server detected - this will be removed!");
935 } else {
936 trace!(?server_range, "retaining server");
937 }
938 }
939 None => {
940 error!("Impossible State - The RUV is corrupted due to missing sid in ranged index");
941 error!(ruv = ?self);
942 error!(?cid);
943 return Err(OperationError::InvalidState);
944 }
945 }
946 }
947
948 // We can now remove old server id's because we have a reliable liveness check in the
949 // method of anchors being transmissed during replication. If a server is offline for
950 // an extended period, it will not have created any anchors, and it will eventually become
951 // empty in the data range. This allow it to be trimmed out.
952 for s_uuid in remove_suuid {
953 let x = self.ranged.remove(&s_uuid);
954 assert!(x.map(|y| y.is_empty()).unwrap_or(false))
955 }
956
957 // Trim all cid's up to this value, and return the range of IDs
958 // that are affected.
959 self.data.split_off_lt(cid);
960
961 Ok(idl)
962 }
963
964 pub fn added(&self) -> Box<dyn Iterator<Item = Cid> + '_> {
965 if let Some(added) = self.added.as_ref() {
966 // return what was added this txn. We previously would iterate
967 // from data_pre.max() with data, but if an anchor was added that
968 // pre-dated data_pre.max() it wouldn't be committed to the db ruv
969 // (even though it was in the in memory ruv).
970 Box::new(added.iter().map(|cid| {
971 debug!(added_cid = ?cid);
972 cid.clone()
973 }))
974 } else {
975 // We have been cleared during this txn, so everything in data is
976 // added.
977 Box::new(self.data.iter().map(|(cid, _)| {
978 debug!(added_cid = ?cid);
979 cid.clone()
980 }))
981 }
982 }
983
984 pub fn removed(&self) -> impl Iterator<Item = Cid> + '_ {
985 let prev_bound = if self.added.is_none() {
986 // We have been cleared during this txn, so everything in pre is
987 // removed.
988 Unbounded
989 } else if let Some((min, _)) = self.data.first_key_value() {
990 Excluded(min.clone())
991 } else {
992 // If empty, assume everything is removed.
993 Unbounded
994 };
995
996 // iterate through our previous data to find what has been removed given
997 // the ranges determined above.
998 self.data_pre
999 .range((Unbounded, prev_bound))
1000 .map(|(cid, _)| {
1001 debug!(removed_cid = ?cid);
1002 cid.clone()
1003 })
1004 }
1005
    /// Commit this write transaction, publishing the updated `data` (cid ->
    /// affected entry ids) and `ranged` (server uuid -> timestamps) trees so
    /// they become visible to subsequent read transactions.
    pub fn commit(self) {
        self.data.commit();
        self.ranged.commit();
    }
1010}
1011
#[cfg(test)]
mod tests {
    use super::RangeDiffStatus;
    use super::ReplCidRange;
    use super::ReplicationUpdateVector;
    use std::collections::BTreeMap;
    use std::time::Duration;

    // Fixed server identifiers used to build synthetic RUV range contexts.
    const UUID_A: uuid::Uuid = uuid::uuid!("13b530b0-efdd-4934-8fb7-9c35c8aab79e");
    const UUID_B: uuid::Uuid = uuid::uuid!("16327cf8-6a34-4a17-982c-b2eaa6d02d00");
    const UUID_C: uuid::Uuid = uuid::uuid!("2ed717e3-15be-41e6-b966-10a1f6d7ea1c");

    // Both RUVs empty - there is no overlap in either direction.
    #[test]
    fn test_ruv_range_diff_1() {
        let ctx_a = BTreeMap::default();
        let ctx_b = BTreeMap::default();

        let result = ReplicationUpdateVector::range_diff(&ctx_a, &ctx_b);
        let expect = RangeDiffStatus::NoRUVOverlap;
        assert_eq!(result, expect);

        // Test the inverse.
        let result = ReplicationUpdateVector::range_diff(&ctx_b, &ctx_a);
        let expect = RangeDiffStatus::NoRUVOverlap;
        assert_eq!(result, expect);
    }

    // One side has a range for UUID_A, the other side is empty - still no
    // overlap in either direction.
    #[test]
    fn test_ruv_range_diff_2() {
        let ctx_a = btreemap!((
            UUID_A,
            ReplCidRange {
                ts_min: Duration::from_secs(1),
                ts_max: Duration::from_secs(3),
            }
        ));
        let ctx_b = BTreeMap::default();

        let result = ReplicationUpdateVector::range_diff(&ctx_a, &ctx_b);
        let expect = RangeDiffStatus::NoRUVOverlap;
        assert_eq!(result, expect);

        let result = ReplicationUpdateVector::range_diff(&ctx_b, &ctx_a);
        let expect = RangeDiffStatus::NoRUVOverlap;
        assert_eq!(result, expect);
    }

    // Identical ranges on both sides - replication can proceed but there is
    // nothing to supply (Ok with an empty range map), symmetrically.
    #[test]
    fn test_ruv_range_diff_3() {
        let ctx_a = btreemap!((
            UUID_A,
            ReplCidRange {
                ts_min: Duration::from_secs(1),
                ts_max: Duration::from_secs(3),
            }
        ));
        let ctx_b = btreemap!((
            UUID_A,
            ReplCidRange {
                ts_min: Duration::from_secs(1),
                ts_max: Duration::from_secs(3),
            }
        ));

        let result = ReplicationUpdateVector::range_diff(&ctx_a, &ctx_b);
        let expect = RangeDiffStatus::Ok(BTreeMap::default());
        assert_eq!(result, expect);

        let result = ReplicationUpdateVector::range_diff(&ctx_b, &ctx_a);
        let expect = RangeDiffStatus::Ok(BTreeMap::default());
        assert_eq!(result, expect);
    }

    // b is ahead of a on the same server (max 4 vs 3): a->b yields the
    // changes in (3, 4]; the inverse direction has nothing to send.
    #[test]
    fn test_ruv_range_diff_4() {
        let ctx_a = btreemap!((
            UUID_A,
            ReplCidRange {
                ts_min: Duration::from_secs(1),
                ts_max: Duration::from_secs(3),
            }
        ));
        let ctx_b = btreemap!((
            UUID_A,
            ReplCidRange {
                ts_min: Duration::from_secs(1),
                ts_max: Duration::from_secs(4),
            }
        ));

        let result = ReplicationUpdateVector::range_diff(&ctx_a, &ctx_b);
        let expect = RangeDiffStatus::Ok(btreemap!((
            UUID_A,
            ReplCidRange {
                ts_min: Duration::from_secs(3),
                ts_max: Duration::from_secs(4),
            }
        )));
        assert_eq!(result, expect);

        let result = ReplicationUpdateVector::range_diff(&ctx_b, &ctx_a);
        let expect = RangeDiffStatus::Ok(BTreeMap::default());
        assert_eq!(result, expect);
    }

    // Disjoint ranges for the same server: a (5..7) starts after b (1..4)
    // ends. a->b means the consumer is advanced beyond us -> Unwilling;
    // b->a means the consumer is missing changes -> Refresh. Note the
    // expected lag_range carries ts_min (5) > ts_max (4); presumably this
    // inverted range encodes the gap - confirm against range_diff impl.
    #[test]
    fn test_ruv_range_diff_5() {
        let ctx_a = btreemap!((
            UUID_A,
            ReplCidRange {
                ts_min: Duration::from_secs(5),
                ts_max: Duration::from_secs(7),
            }
        ));
        let ctx_b = btreemap!((
            UUID_A,
            ReplCidRange {
                ts_min: Duration::from_secs(1),
                ts_max: Duration::from_secs(4),
            }
        ));

        let result = ReplicationUpdateVector::range_diff(&ctx_a, &ctx_b);
        let expect = RangeDiffStatus::Unwilling {
            adv_range: btreemap!((
                UUID_A,
                ReplCidRange {
                    ts_min: Duration::from_secs(4),
                    ts_max: Duration::from_secs(5),
                }
            )),
        };
        assert_eq!(result, expect);

        let result = ReplicationUpdateVector::range_diff(&ctx_b, &ctx_a);
        let expect = RangeDiffStatus::Refresh {
            lag_range: btreemap!((
                UUID_A,
                ReplCidRange {
                    ts_min: Duration::from_secs(5),
                    ts_max: Duration::from_secs(4),
                }
            )),
        };
        assert_eq!(result, expect);
    }

    // b knows a server (UUID_B) that a has never seen: a->b must supply
    // everything for that server, expressed as ts_min == Duration::ZERO.
    // The inverse only needs the (3, 4] catch-up for UUID_A.
    #[test]
    fn test_ruv_range_diff_6() {
        let ctx_a = btreemap!((
            UUID_A,
            ReplCidRange {
                ts_min: Duration::from_secs(1),
                ts_max: Duration::from_secs(4),
            }
        ));
        let ctx_b = btreemap!(
            (
                UUID_A,
                ReplCidRange {
                    ts_min: Duration::from_secs(1),
                    ts_max: Duration::from_secs(3),
                }
            ),
            (
                UUID_B,
                ReplCidRange {
                    ts_min: Duration::from_secs(2),
                    ts_max: Duration::from_secs(4),
                }
            )
        );

        let result = ReplicationUpdateVector::range_diff(&ctx_a, &ctx_b);
        let expect = RangeDiffStatus::Ok(btreemap!((
            UUID_B,
            ReplCidRange {
                ts_min: Duration::ZERO,
                ts_max: Duration::from_secs(4),
            }
        )));
        assert_eq!(result, expect);

        let result = ReplicationUpdateVector::range_diff(&ctx_b, &ctx_a);
        let expect = RangeDiffStatus::Ok(btreemap!((
            UUID_A,
            ReplCidRange {
                ts_min: Duration::from_secs(3),
                ts_max: Duration::from_secs(4),
            }
        )));
        assert_eq!(result, expect);
    }

    // Three servers with partially overlapping knowledge: each direction
    // only reports the servers where the supplier is genuinely ahead
    // (UUID_B is unknown to a -> full range from ZERO; UUID_A and UUID_C
    // each need a bounded catch-up in the inverse direction).
    #[test]
    fn test_ruv_range_diff_7() {
        let ctx_a = btreemap!(
            (
                UUID_A,
                ReplCidRange {
                    ts_min: Duration::from_secs(1),
                    ts_max: Duration::from_secs(4),
                }
            ),
            (
                UUID_C,
                ReplCidRange {
                    ts_min: Duration::from_secs(2),
                    ts_max: Duration::from_secs(5),
                }
            )
        );
        let ctx_b = btreemap!(
            (
                UUID_A,
                ReplCidRange {
                    ts_min: Duration::from_secs(1),
                    ts_max: Duration::from_secs(3),
                }
            ),
            (
                UUID_B,
                ReplCidRange {
                    ts_min: Duration::from_secs(2),
                    ts_max: Duration::from_secs(4),
                }
            ),
            (
                UUID_C,
                ReplCidRange {
                    ts_min: Duration::from_secs(3),
                    ts_max: Duration::from_secs(4),
                }
            )
        );

        let result = ReplicationUpdateVector::range_diff(&ctx_a, &ctx_b);
        let expect = RangeDiffStatus::Ok(btreemap!((
            UUID_B,
            ReplCidRange {
                ts_min: Duration::ZERO,
                ts_max: Duration::from_secs(4),
            }
        )));
        assert_eq!(result, expect);

        let result = ReplicationUpdateVector::range_diff(&ctx_b, &ctx_a);
        let expect = RangeDiffStatus::Ok(btreemap!(
            (
                UUID_A,
                ReplCidRange {
                    ts_min: Duration::from_secs(3),
                    ts_max: Duration::from_secs(4),
                }
            ),
            (
                UUID_C,
                ReplCidRange {
                    ts_min: Duration::from_secs(4),
                    ts_max: Duration::from_secs(5),
                }
            )
        ));
        assert_eq!(result, expect);
    }

    // Crossed state: each side is advanced on one server and lagging on the
    // other (a leads on UUID_A, b leads on UUID_B, with disjoint ranges).
    // Both directions must report Critical, carrying both the advanced gap
    // and the lag gap. As in diff_5, the lag_range ts_min > ts_max is the
    // inverted-range convention.
    #[test]
    fn test_ruv_range_diff_8() {
        let ctx_a = btreemap!(
            (
                UUID_A,
                ReplCidRange {
                    ts_min: Duration::from_secs(4),
                    ts_max: Duration::from_secs(6),
                }
            ),
            (
                UUID_B,
                ReplCidRange {
                    ts_min: Duration::from_secs(1),
                    ts_max: Duration::from_secs(2),
                }
            )
        );
        let ctx_b = btreemap!(
            (
                UUID_A,
                ReplCidRange {
                    ts_min: Duration::from_secs(1),
                    ts_max: Duration::from_secs(2),
                }
            ),
            (
                UUID_B,
                ReplCidRange {
                    ts_min: Duration::from_secs(4),
                    ts_max: Duration::from_secs(6),
                }
            )
        );

        let result = ReplicationUpdateVector::range_diff(&ctx_a, &ctx_b);
        let expect = RangeDiffStatus::Critical {
            adv_range: btreemap!((
                UUID_A,
                ReplCidRange {
                    ts_min: Duration::from_secs(2),
                    ts_max: Duration::from_secs(4),
                }
            )),
            lag_range: btreemap!((
                UUID_B,
                ReplCidRange {
                    ts_min: Duration::from_secs(4),
                    ts_max: Duration::from_secs(2),
                }
            )),
        };
        assert_eq!(result, expect);

        let result = ReplicationUpdateVector::range_diff(&ctx_b, &ctx_a);
        let expect = RangeDiffStatus::Critical {
            adv_range: btreemap!((
                UUID_B,
                ReplCidRange {
                    ts_min: Duration::from_secs(2),
                    ts_max: Duration::from_secs(4),
                }
            )),
            lag_range: btreemap!((
                UUID_A,
                ReplCidRange {
                    ts_min: Duration::from_secs(4),
                    ts_max: Duration::from_secs(2),
                }
            )),
        };
        assert_eq!(result, expect);
    }
}