kanidmd_lib/repl/
proto.rs

1use super::cid::Cid;
2use super::entry::EntryChangeState;
3use super::entry::State;
4use crate::be::dbvalue::DbValueSetV2;
5use crate::entry::Eattrs;
6use crate::prelude::*;
7use crate::schema::{SchemaReadTransaction, SchemaTransaction};
8use crate::valueset;
9use serde::{Deserialize, Serialize};
10use std::collections::BTreeMap;
11use std::fmt;
12
/// State reported for a replication consumer.
///
/// NOTE(review): the exact conditions that produce each variant are decided by code
/// outside this file; presumably `RefreshRequired` means an incremental update is
/// not sufficient and a full refresh must be performed — confirm against callers.
///
/// The standard traits are derived so the state can be logged, copied and compared
/// directly instead of only pattern-matched.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConsumerState {
    /// The consumer is in an acceptable state.
    Ok,
    /// The consumer must be refreshed.
    RefreshRequired,
}
17
18#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
19pub struct ReplCidV1 {
20    #[serde(rename = "t")]
21    pub ts: Duration,
22    #[serde(rename = "s")]
23    pub s_uuid: Uuid,
24}
25
26// From / Into CID
27impl From<&Cid> for ReplCidV1 {
28    fn from(cid: &Cid) -> Self {
29        ReplCidV1 {
30            ts: cid.ts,
31            s_uuid: cid.s_uuid,
32        }
33    }
34}
35
36impl From<ReplCidV1> for Cid {
37    fn from(cid: ReplCidV1) -> Self {
38        Cid {
39            ts: cid.ts,
40            s_uuid: cid.s_uuid,
41        }
42    }
43}
44
45impl From<&ReplCidV1> for Cid {
46    fn from(cid: &ReplCidV1) -> Self {
47        Cid {
48            ts: cid.ts,
49            s_uuid: cid.s_uuid,
50        }
51    }
52}
53
54/// An anchored CID range. This contains a minimum and maximum range of CID times for a server,
55/// and also includes the list of all CIDs that occur between those two points. This allows these
56/// extra change "anchors" to be injected into the consumer RUV during an incremental. Once
57/// inserted, these anchors prevent RUV trimming from creating "jumps" due to idle servers.
58#[derive(Serialize, Deserialize, PartialEq, Eq)]
59pub struct ReplAnchoredCidRange {
60    #[serde(rename = "m")]
61    pub ts_min: Duration,
62    #[serde(rename = "a", default)]
63    pub anchors: Vec<Duration>,
64    #[serde(rename = "x")]
65    pub ts_max: Duration,
66}
67
68impl fmt::Debug for ReplAnchoredCidRange {
69    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
70        write!(
71            f,
72            "{:032} --{}-> {:032}",
73            self.ts_min.as_nanos(),
74            self.anchors.len(),
75            self.ts_max.as_nanos()
76        )
77    }
78}
79
80/// A CID range. This contains the minimum and maximum values of a range. This is used for
81/// querying the RUV to select all elements in this range.
82#[derive(Serialize, Deserialize, PartialEq, Eq)]
83pub struct ReplCidRange {
84    #[serde(rename = "m")]
85    pub ts_min: Duration,
86    #[serde(rename = "x")]
87    pub ts_max: Duration,
88}
89
90impl fmt::Debug for ReplCidRange {
91    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
92        write!(
93            f,
94            "{:032} -> {:032}",
95            self.ts_min.as_nanos(),
96            self.ts_max.as_nanos()
97        )
98    }
99}
100
101#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
102pub enum ReplRuvRange {
103    V1 {
104        domain_uuid: Uuid,
105        ranges: BTreeMap<Uuid, ReplCidRange>,
106    },
107}
108
109impl ReplRuvRange {
110    pub fn is_empty(&self) -> bool {
111        match self {
112            ReplRuvRange::V1 { ranges, .. } => ranges.is_empty(),
113        }
114    }
115}
116
117#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
118pub struct ReplAttrStateV1 {
119    cid: ReplCidV1,
120    attr: Option<DbValueSetV2>,
121}
122
123#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
124pub enum ReplStateV1 {
125    Live {
126        at: ReplCidV1,
127        // Also add AT here for breaking entry origin on conflict.
128        attrs: BTreeMap<Attribute, ReplAttrStateV1>,
129    },
130    Tombstone {
131        at: ReplCidV1,
132    },
133}
134
135#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
136#[serde(rename_all = "lowercase")]
137// I think partial entries should be separate? This clearly implies a refresh.
138pub struct ReplEntryV1 {
139    uuid: Uuid,
140    // Change State
141    st: ReplStateV1,
142}
143
144impl ReplEntryV1 {
145    pub fn new(entry: &EntrySealedCommitted, schema: &SchemaReadTransaction) -> ReplEntryV1 {
146        let cs = entry.get_changestate();
147        let uuid = entry.get_uuid();
148
149        let st = match cs.current() {
150            State::Live { at, changes } => {
151                let live_attrs = entry.get_ava();
152
153                let attrs = changes
154                    .iter()
155                    .filter_map(|(attr_name, cid)| {
156                        if schema.is_replicated(attr_name) {
157                            let live_attr = live_attrs.get(attr_name);
158
159                            let cid = cid.into();
160                            let attr = live_attr.and_then(|maybe|
161                                // There is a quirk in the way we currently handle certain
162                                // types of adds/deletes that it may be possible to have an
163                                // empty value set still in memory on a supplier. In the future
164                                // we may make it so in memory valuesets can be empty and sent
165                                // but for now, if it's an empty set in any capacity, we map
166                                // to None and just send the Cid since they have the same result
167                                // on how the entry/attr state looks at each end.
168                                if maybe.is_empty() {
169                                    None
170                                } else {
171                                    Some(maybe.to_db_valueset_v2())
172                                }
173                            );
174
175                            Some((attr_name.clone(), ReplAttrStateV1 { cid, attr }))
176                        } else {
177                            None
178                        }
179                    })
180                    .collect();
181
182                ReplStateV1::Live {
183                    at: at.into(),
184                    attrs,
185                }
186            }
187            State::Tombstone { at } => ReplStateV1::Tombstone { at: at.into() },
188        };
189
190        ReplEntryV1 { uuid, st }
191    }
192
193    pub fn rehydrate(self) -> Result<(EntryChangeState, Eattrs), OperationError> {
194        match self.st {
195            ReplStateV1::Live { at, attrs } => {
196                trace!("{:?} {:#?}", at, attrs);
197                // We need to build two sets, one for the Entry Change States, and one for the
198                // Eattrs.
199                let mut changes = BTreeMap::default();
200                let mut eattrs = Eattrs::default();
201
202                for (attr_name, ReplAttrStateV1 { cid, attr }) in attrs.into_iter() {
203                    let cid: Cid = cid.into();
204
205                    if let Some(attr_value) = attr {
206                        let v = valueset::from_db_valueset_v2(attr_value).inspect_err(|err| {
207                            error!(?err, "Unable to restore valueset for {}", attr_name);
208                        })?;
209                        if eattrs.insert(attr_name.clone(), v).is_some() {
210                            error!(
211                                "Impossible eattrs state, attribute {} appears to be duplicated!",
212                                attr_name
213                            );
214                            return Err(OperationError::InvalidEntryState);
215                        }
216                    }
217
218                    if changes.insert(attr_name.clone(), cid).is_some() {
219                        error!(
220                            "Impossible changes state, attribute {} appears to be duplicated!",
221                            attr_name
222                        );
223                        return Err(OperationError::InvalidEntryState);
224                    }
225                }
226
227                let at: Cid = at.into();
228
229                let ecstate = EntryChangeState {
230                    st: State::Live { at, changes },
231                };
232                Ok((ecstate, eattrs))
233            }
234            ReplStateV1::Tombstone { at } => {
235                let at: Cid = at.into();
236
237                let mut eattrs = Eattrs::default();
238
239                let class_ava = vs_iutf8![EntryClass::Object.into(), EntryClass::Tombstone.into()];
240                let last_mod_ava = vs_cid![at.clone()];
241
242                eattrs.insert(Attribute::Uuid, vs_uuid![self.uuid]);
243                eattrs.insert(Attribute::Class, class_ava);
244                eattrs.insert(Attribute::LastModifiedCid, last_mod_ava);
245
246                let ecstate = EntryChangeState {
247                    st: State::Tombstone { at },
248                };
249
250                Ok((ecstate, eattrs))
251            }
252        }
253    }
254}
255
256#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
257#[serde(rename_all = "lowercase")]
258// I think partial entries should be separate? This clearly implies a refresh.
259pub struct ReplIncrementalEntryV1 {
260    pub(crate) uuid: Uuid,
261    // Change State
262    pub(crate) st: ReplStateV1,
263}
264
265impl ReplIncrementalEntryV1 {
266    pub fn new(
267        entry: &EntrySealedCommitted,
268        schema: &SchemaReadTransaction,
269        ctx_range: &BTreeMap<Uuid, ReplCidRange>,
270    ) -> ReplIncrementalEntryV1 {
271        let cs = entry.get_changestate();
272        let uuid = entry.get_uuid();
273
274        let st = match cs.current() {
275            State::Live { at, changes } => {
276                // Only put attributes into the change state that were changed within the range that was
277                // requested.
278                let live_attrs = entry.get_ava();
279
280                let attrs = changes
281                    .iter()
282                    .filter_map(|(attr_name, cid)| {
283                        // If the cid is within the ctx range
284                        let within = schema.is_replicated(attr_name)
285                            && ctx_range
286                                .get(&cid.s_uuid)
287                                .map(|repl_range| {
288                                    // Supply anything up to and including.
289                                    cid.ts <= repl_range.ts_max &&
290                                    // ts_min is always what the consumer already has.
291                                    cid.ts > repl_range.ts_min
292                                })
293                                // If not present in the range, assume it's not needed.
294                                .unwrap_or(false);
295
296                        // Then setup to supply it.
297                        if within {
298                            let live_attr = live_attrs.get(attr_name);
299                            let cid = cid.into();
300                            let attr = live_attr.and_then(|maybe| {
301                                if maybe.is_empty() {
302                                    None
303                                } else {
304                                    Some(maybe.to_db_valueset_v2())
305                                }
306                            });
307
308                            Some((attr_name.clone(), ReplAttrStateV1 { cid, attr }))
309                        } else {
310                            None
311                        }
312                    })
313                    .collect();
314
315                ReplStateV1::Live {
316                    at: at.into(),
317                    attrs,
318                }
319            }
320            // Don't care what the at is - send the tombstone.
321            State::Tombstone { at } => ReplStateV1::Tombstone { at: at.into() },
322        };
323
324        ReplIncrementalEntryV1 { uuid, st }
325    }
326
327    pub fn rehydrate(self) -> Result<(Uuid, EntryChangeState, Eattrs), OperationError> {
328        match self.st {
329            ReplStateV1::Live { at, attrs } => {
330                trace!("{:?} {:#?}", at, attrs);
331                let mut changes = BTreeMap::default();
332                let mut eattrs = Eattrs::default();
333
334                for (attr_name, ReplAttrStateV1 { cid, attr }) in attrs.into_iter() {
335                    let cid: Cid = cid.into();
336
337                    if let Some(attr_value) = attr {
338                        let v = valueset::from_db_valueset_v2(attr_value).inspect_err(|err| {
339                            error!(?err, "Unable to restore valueset for {}", attr_name);
340                        })?;
341                        if eattrs.insert(attr_name.clone(), v).is_some() {
342                            error!(
343                                "Impossible eattrs state, attribute {} appears to be duplicated!",
344                                attr_name
345                            );
346                            return Err(OperationError::InvalidEntryState);
347                        }
348                    }
349
350                    if changes.insert(attr_name.clone(), cid).is_some() {
351                        error!(
352                            "Impossible changes state, attribute {} appears to be duplicated!",
353                            attr_name
354                        );
355                        return Err(OperationError::InvalidEntryState);
356                    }
357                }
358
359                let at: Cid = at.into();
360
361                let ecstate = EntryChangeState {
362                    st: State::Live { at, changes },
363                };
364                Ok((self.uuid, ecstate, eattrs))
365            }
366            ReplStateV1::Tombstone { at } => {
367                let at: Cid = at.into();
368                let eattrs = Eattrs::default();
369                let ecstate = EntryChangeState {
370                    st: State::Tombstone { at },
371                };
372                Ok((self.uuid, ecstate, eattrs))
373            }
374        }
375    }
376}
377
378// From / Into Entry
379
380#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
381#[serde(rename_all = "lowercase")]
382pub enum ReplRefreshContext {
383    V1 {
384        domain_version: DomainVersion,
385        domain_devel: bool,
386        domain_uuid: Uuid,
387        // We need to send the current state of the ranges to populate into
388        // the ranges so that lookups and ranges work properly.
389        ranges: BTreeMap<Uuid, ReplAnchoredCidRange>,
390        schema_entries: Vec<ReplEntryV1>,
391        meta_entries: Vec<ReplEntryV1>,
392        entries: Vec<ReplEntryV1>,
393    },
394}
395
396#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
397#[serde(rename_all = "lowercase")]
398pub enum ReplIncrementalContext {
399    DomainMismatch,
400    NoChangesAvailable,
401    RefreshRequired,
402    UnwillingToSupply,
403    V1 {
404        domain_version: DomainVersion,
405        #[serde(default)]
406        domain_patch_level: u32,
407        domain_uuid: Uuid,
408        // We need to send the current state of the ranges to populate into
409        // the ranges so that lookups and ranges work properly, and the
410        // consumer ends with the same state as we have (or at least merges)
411        // it with this.
412        ranges: BTreeMap<Uuid, ReplAnchoredCidRange>,
413        schema_entries: Vec<ReplIncrementalEntryV1>,
414        meta_entries: Vec<ReplIncrementalEntryV1>,
415        entries: Vec<ReplIncrementalEntryV1>,
416    },
417}