// kanidmd_lib/repl/supplier.rs

1use super::proto::{
2    ReplEntryV1, ReplIncrementalContext, ReplIncrementalEntryV1, ReplRefreshContext, ReplRuvRange,
3};
4use super::ruv::{RangeDiffStatus, ReplicationUpdateVector, ReplicationUpdateVectorTransaction};
5use crate::be::BackendTransaction;
6use crate::prelude::*;
7
8use crate::be::keystorage::{KeyHandle, KeyHandleId};
9use kanidm_lib_crypto::mtls::build_self_signed_server_and_client_identity;
10use kanidm_lib_crypto::prelude::{PKey, Private, X509};
11
12impl QueryServerWriteTransaction<'_> {
13    fn supplier_generate_key_cert(
14        &mut self,
15        domain_name: &str,
16    ) -> Result<(PKey<Private>, X509), OperationError> {
17        // Invalid, must need to re-generate.
18        let s_uuid = self.get_server_uuid();
19
20        let (private, x509) = build_self_signed_server_and_client_identity(
21            s_uuid,
22            domain_name,
23            REPL_MTLS_CERTIFICATE_DAYS,
24        )
25        .map_err(|err| {
26            error!(?err, "Unable to generate self signed key/cert");
27            // What error?
28            OperationError::CryptographyError
29        })?;
30
31        let kh = KeyHandle::X509Key {
32            private: private.clone(),
33            x509: x509.clone(),
34        };
35
36        self.get_be_txn()
37            .set_key_handle(KeyHandleId::ReplicationKey, kh)
38            .map_err(|err| {
39                error!(?err, "Unable to persist replication key");
40                err
41            })
42            .map(|()| (private, x509))
43    }
44
45    #[instrument(level = "info", skip_all)]
46    pub fn supplier_renew_key_cert(&mut self, domain_name: &str) -> Result<(), OperationError> {
47        self.supplier_generate_key_cert(domain_name).map(|_| ())
48    }
49
50    #[instrument(level = "info", skip_all)]
51    pub fn supplier_get_key_cert(
52        &mut self,
53        domain_name: &str,
54    ) -> Result<(PKey<Private>, X509), OperationError> {
55        // Later we need to put this through a HSM or similar, but we will always need a way
56        // to persist a handle, so we still need the db write and load components.
57
58        // Does the handle exist?
59        let maybe_key_handle = self
60            .get_be_txn()
61            .get_key_handle(KeyHandleId::ReplicationKey)
62            .map_err(|err| {
63                error!(?err, "Unable to access replication key");
64                err
65            })?;
66
67        // Can you process the keyhandle?
68        let key_cert = match maybe_key_handle {
69            Some(KeyHandle::X509Key { private, x509 }) => (private, x509),
70            /*
71            Some(Keyhandle::...) => {
72                // invalid key
73                // error? regenerate?
74            }
75            */
76            None => self.supplier_generate_key_cert(domain_name)?,
77        };
78
79        Ok(key_cert)
80    }
81}
82
impl QueryServerReadTransaction<'_> {
    // Given a consumers state, calculate the differential of changes they
    // need to be sent to bring them to the equivalent state.

    // We use the RUV or Cookie to determine if:
    // * The consumer requires a full-reinit.
    // * Which entry attr-states need to be sent, if any

    /// Determine what (if anything) this supplier must send to a consumer,
    /// given the consumer's current RUV range.
    ///
    /// Returns `Ok` with a `ReplIncrementalContext` variant describing the
    /// outcome: `DomainMismatch` (different domain uuid), `RefreshRequired`
    /// (consumer lagging beyond our trimmed history), `UnwillingToSupply`
    /// (consumer advanced of us or RUVs diverged), `NoChangesAvailable`, or
    /// `V1` carrying the incremental change data. `Err` is reserved for
    /// internal failures such as backend access errors.
    #[instrument(level = "debug", skip_all)]
    pub fn supplier_provide_changes(
        &mut self,
        ctx_ruv: ReplRuvRange,
    ) -> Result<ReplIncrementalContext, OperationError> {
        // Convert types if needed. This way we can compare ruv's correctly.
        let (ctx_domain_uuid, ctx_ranges) = match ctx_ruv {
            ReplRuvRange::V1 {
                domain_uuid,
                ranges,
            } => (domain_uuid, ranges),
        };

        // Never supply changes across domain boundaries.
        if ctx_domain_uuid != self.d_info.d_uuid {
            error!("Replication - Consumer Domain UUID does not match our local domain uuid.");
            debug!(consumer_domain_uuid = ?ctx_domain_uuid, supplier_domain_uuid = ?self.d_info.d_uuid);
            return Ok(ReplIncrementalContext::DomainMismatch);
        }

        // This is a reasonably tricky part of the code, because we are attempting to do a
        // distributed and async liveness check. What content has the consumer seen? What
        // could they have trimmed from their own RUV?
        //
        // Since tombstone purging always creates an anchor, then there are always "pings"
        // effectively going out of "empty" changes that drive the RUV forward. This assists us
        // to detect this situation.
        //
        // If a server has been replicating correctly, then it should have at least *some* overlap
        // with us since content has always advanced.
        //
        // If a server has "stalled" then it will have *no* overlap. This can manifest as a need
        // to supply all ranges as though they were new because the lagging consumer has trimmed out
        // all the old content.
        //
        // When a server is newly added it will have overlap because it will have refreshed from
        // another server.
        //
        // When a server is "trimmed" from the RUV, it no longer influences the overlap decision
        // because the other servers will have continued to advance.

        let trim_cid = self.trim_cid().clone();

        let supplier_ruv = self.get_be_txn().get_ruv();

        // Our own RUV, with anything older than the trim point excluded.
        let our_ranges = supplier_ruv.filter_ruv_range(&trim_cid).map_err(|e| {
            error!(err = ?e, "Unable to access supplier RUV range");
            e
        })?;

        // Compare this to our internal ranges - work out the list of entry
        // id's that are now different.

        let supply_ranges = ReplicationUpdateVector::range_diff(&ctx_ranges, &our_ranges);

        // If empty, return an empty set of changes!

        // Classify the diff result. Every non-Ok outcome short-circuits with a
        // context variant the consumer can act on.
        let ranges = match supply_ranges {
            RangeDiffStatus::Ok(ranges) => ranges,
            RangeDiffStatus::Refresh { lag_range } => {
                error!("Replication - Consumer is lagging and must be refreshed.");
                info!(?lag_range);
                debug!(consumer_ranges = ?ctx_ranges);
                debug!(supplier_ranges = ?our_ranges);
                return Ok(ReplIncrementalContext::RefreshRequired);
            }
            RangeDiffStatus::Unwilling { adv_range } => {
                error!("Replication - Supplier is lagging and must be investigated.");
                info!(?adv_range);
                debug!(consumer_ranges = ?ctx_ranges);
                debug!(supplier_ranges = ?our_ranges);
                return Ok(ReplIncrementalContext::UnwillingToSupply);
            }
            RangeDiffStatus::Critical {
                lag_range,
                adv_range,
            } => {
                error!("Replication Critical - Consumers are advanced of us, and also lagging! This must be immediately investigated!");
                info!(?lag_range);
                info!(?adv_range);
                debug!(consumer_ranges = ?ctx_ranges);
                debug!(supplier_ranges = ?our_ranges);
                return Ok(ReplIncrementalContext::UnwillingToSupply);
            }
            RangeDiffStatus::NoRUVOverlap => {
                error!("Replication Critical - Consumers RUV has desynchronised and diverged! This must be immediately investigated!");
                debug!(consumer_ranges = ?ctx_ranges);
                debug!(supplier_ranges = ?our_ranges);
                return Ok(ReplIncrementalContext::UnwillingToSupply);
            }
        };

        debug!("these ranges will be supplied");
        debug!(supply_ranges = ?ranges);
        debug!(consumer_ranges = ?ctx_ranges);
        debug!(supplier_ranges = ?our_ranges);

        if ranges.is_empty() {
            debug!("No Changes Available");
            return Ok(ReplIncrementalContext::NoChangesAvailable);
        }

        // From the set of change id's, fetch those entries.
        // This is done by supplying the ranges to the be which extracts
        // the entries affected by the idls in question.
        let entries = self.get_be_txn().retrieve_range(&ranges).map_err(|e| {
            admin_error!(?e, "backend failure");
            OperationError::Backend
        })?;

        // Separate the entries into schema, meta and remaining.
        // (Presumably so the consumer can process them in phases - schema
        // first, then meta, then the rest - TODO confirm against consumer.)
        let (schema_entries, rem_entries): (Vec<_>, Vec<_>) = entries.into_iter().partition(|e| {
            e.get_ava_set(Attribute::Class)
                .map(|cls| {
                    cls.contains(&EntryClass::AttributeType.into() as &PartialValue)
                        || cls.contains(&EntryClass::ClassType.into() as &PartialValue)
                })
                .unwrap_or(false)
        });

        let (meta_entries, entries): (Vec<_>, Vec<_>) = rem_entries.into_iter().partition(|e| {
            e.get_ava_set(Attribute::Class)
                .map(|cls| {
                    cls.contains(&EntryClass::DomainInfo.into() as &PartialValue)
                        || cls.contains(&EntryClass::SystemInfo.into() as &PartialValue)
                        || cls.contains(&EntryClass::SystemConfig.into() as &PartialValue)
                        || cls.contains(&EntryClass::KeyProvider.into() as &PartialValue)
                })
                .unwrap_or(false)
        });

        trace!(?schema_entries);
        trace!(?meta_entries);
        trace!(?entries);

        // For each entry, determine the changes that exist on the entry that fall
        // into the ruv range - reduce to a incremental set of changes.

        let schema = self.get_schema();
        let domain_version = self.d_info.d_vers;
        // A devel-tainted domain reports the maximum patch level.
        let domain_patch_level = if self.d_info.d_devel_taint {
            u32::MAX
        } else {
            self.d_info.d_patch_level
        };
        let domain_uuid = self.d_info.d_uuid;

        let schema_entries: Vec<_> = schema_entries
            .into_iter()
            .map(|e| ReplIncrementalEntryV1::new(e.as_ref(), schema, &ranges))
            .collect();

        let meta_entries: Vec<_> = meta_entries
            .into_iter()
            .map(|e| ReplIncrementalEntryV1::new(e.as_ref(), schema, &ranges))
            .collect();

        let entries: Vec<_> = entries
            .into_iter()
            .map(|e| ReplIncrementalEntryV1::new(e.as_ref(), schema, &ranges))
            .collect();

        // Finally, populate the ranges with anchors from the RUV
        let supplier_ruv = self.get_be_txn().get_ruv();
        let ranges = supplier_ruv.get_anchored_ranges(ranges)?;

        // Build the incremental context.
        Ok(ReplIncrementalContext::V1 {
            domain_version,
            domain_patch_level,
            domain_uuid,
            ranges,
            schema_entries,
            meta_entries,
            entries,
        })
    }

    /// Build a full refresh payload for a consumer that must be
    /// re-initialised: domain metadata, the anchored RUV ranges, and all
    /// entries split into schema / meta / remaining sets.
    #[instrument(level = "debug", skip_all)]
    pub fn supplier_provide_refresh(&mut self) -> Result<ReplRefreshContext, OperationError> {
        // Get the current schema. We use this for attribute and entry filtering.
        let schema = self.get_schema();

        // A refresh must provide
        //
        // * the current domain version
        let domain_version = self.d_info.d_vers;
        let domain_devel = self.d_info.d_devel_taint;
        let domain_uuid = self.d_info.d_uuid;

        let trim_cid = self.trim_cid().clone();

        // What is the set of data we are providing?
        let ranges = self
            .get_be_txn()
            .get_ruv()
            .filter_ruv_range(&trim_cid)
            .map_err(|e| {
                error!(err = ?e, "Unable to access supplier RUV range");
                e
            })?;

        // * the domain uuid
        // * the set of schema entries
        // * the set of non-schema entries
        // - We must exclude certain entries and attributes!
        //   * schema defines what we exclude!

        let schema_filter_inner = f_or!([
            f_eq(Attribute::Class, EntryClass::AttributeType.into()),
            f_eq(Attribute::Class, EntryClass::ClassType.into()),
        ]);

        let schema_filter = filter!(schema_filter_inner.clone());

        let meta_filter_inner = f_or!([
            f_eq(Attribute::Class, EntryClass::DomainInfo.into()),
            f_eq(Attribute::Class, EntryClass::SystemInfo.into()),
            f_eq(Attribute::Class, EntryClass::SystemConfig.into()),
            f_eq(Attribute::Class, EntryClass::KeyProvider.into()),
        ]);

        let meta_filter = filter!(meta_filter_inner.clone());

        // Everything that is neither schema nor meta, plus tombstones and
        // recycled entries (filter_all so hidden entry states are included).
        let entry_filter = filter_all!(f_or!([
            f_and!([
                f_pres(Attribute::Class),
                f_andnot(f_or(vec![schema_filter_inner, meta_filter_inner])),
            ]),
            f_eq(Attribute::Class, EntryClass::Tombstone.into()),
            f_eq(Attribute::Class, EntryClass::Recycled.into()),
        ]));

        let schema_entries = self
            .internal_search(schema_filter)
            .map(|ent| {
                ent.into_iter()
                    .map(|e| ReplEntryV1::new(e.as_ref(), schema))
                    .collect()
            })
            .inspect_err(|err| {
                error!(?err, "Failed to access schema entries");
            })?;

        let meta_entries = self
            .internal_search(meta_filter)
            .map(|ent| {
                ent.into_iter()
                    .map(|e| ReplEntryV1::new(e.as_ref(), schema))
                    .collect()
            })
            .inspect_err(|err| {
                error!(?err, "Failed to access meta entries");
            })?;

        let entries = self
            .internal_search(entry_filter)
            .map(|ent| {
                ent.into_iter()
                    .map(|e| ReplEntryV1::new(e.as_ref(), schema))
                    .collect()
            })
            .inspect_err(|err| {
                error!(?err, "Failed to access entries");
            })?;

        // Finally, populate the ranges with anchors from the RUV
        let supplier_ruv = self.get_be_txn().get_ruv();
        let ranges = supplier_ruv.get_anchored_ranges(ranges)?;

        Ok(ReplRefreshContext::V1 {
            domain_version,
            domain_devel,
            domain_uuid,
            ranges,
            schema_entries,
            meta_entries,
            entries,
        })
    }
}