1use super::proto::{
2    ReplEntryV1, ReplIncrementalContext, ReplIncrementalEntryV1, ReplRefreshContext, ReplRuvRange,
3};
4use super::ruv::{RangeDiffStatus, ReplicationUpdateVector, ReplicationUpdateVectorTransaction};
5use crate::be::BackendTransaction;
6use crate::prelude::*;
7
8use crate::be::keystorage::{KeyHandle, KeyHandleId};
9use kanidm_lib_crypto::mtls::build_self_signed_server_and_client_identity;
10use kanidm_lib_crypto::prelude::{PKey, Private, X509};
11
12impl QueryServerWriteTransaction<'_> {
13    fn supplier_generate_key_cert(
14        &mut self,
15        domain_name: &str,
16    ) -> Result<(PKey<Private>, X509), OperationError> {
17        let s_uuid = self.get_server_uuid();
19
20        let (private, x509) = build_self_signed_server_and_client_identity(
21            s_uuid,
22            domain_name,
23            REPL_MTLS_CERTIFICATE_DAYS,
24        )
25        .map_err(|err| {
26            error!(?err, "Unable to generate self signed key/cert");
27            OperationError::CryptographyError
29        })?;
30
31        let kh = KeyHandle::X509Key {
32            private: private.clone(),
33            x509: x509.clone(),
34        };
35
36        self.get_be_txn()
37            .set_key_handle(KeyHandleId::ReplicationKey, kh)
38            .map_err(|err| {
39                error!(?err, "Unable to persist replication key");
40                err
41            })
42            .map(|()| (private, x509))
43    }
44
45    #[instrument(level = "info", skip_all)]
46    pub fn supplier_renew_key_cert(&mut self, domain_name: &str) -> Result<(), OperationError> {
47        self.supplier_generate_key_cert(domain_name).map(|_| ())
48    }
49
50    #[instrument(level = "info", skip_all)]
51    pub fn supplier_get_key_cert(
52        &mut self,
53        domain_name: &str,
54    ) -> Result<(PKey<Private>, X509), OperationError> {
55        let maybe_key_handle = self
60            .get_be_txn()
61            .get_key_handle(KeyHandleId::ReplicationKey)
62            .map_err(|err| {
63                error!(?err, "Unable to access replication key");
64                err
65            })?;
66
67        let key_cert = match maybe_key_handle {
69            Some(KeyHandle::X509Key { private, x509 }) => (private, x509),
70            None => self.supplier_generate_key_cert(domain_name)?,
77        };
78
79        Ok(key_cert)
80    }
81}
82
83impl QueryServerReadTransaction<'_> {
84    #[instrument(level = "debug", skip_all)]
92    pub fn supplier_provide_changes(
93        &mut self,
94        ctx_ruv: ReplRuvRange,
95    ) -> Result<ReplIncrementalContext, OperationError> {
96        let (ctx_domain_uuid, ctx_ranges) = match ctx_ruv {
98            ReplRuvRange::V1 {
99                domain_uuid,
100                ranges,
101            } => (domain_uuid, ranges),
102        };
103
104        if ctx_domain_uuid != self.d_info.d_uuid {
105            error!("Replication - Consumer Domain UUID does not match our local domain uuid.");
106            debug!(consumer_domain_uuid = ?ctx_domain_uuid, supplier_domain_uuid = ?self.d_info.d_uuid);
107            return Ok(ReplIncrementalContext::DomainMismatch);
108        }
109
110        let trim_cid = self.trim_cid().clone();
132
133        let supplier_ruv = self.get_be_txn().get_ruv();
134
135        let our_ranges = supplier_ruv.filter_ruv_range(&trim_cid).map_err(|e| {
136            error!(err = ?e, "Unable to access supplier RUV range");
137            e
138        })?;
139
140        let supply_ranges = ReplicationUpdateVector::range_diff(&ctx_ranges, &our_ranges);
144
145        let ranges = match supply_ranges {
148            RangeDiffStatus::Ok(ranges) => ranges,
149            RangeDiffStatus::Refresh { lag_range } => {
150                error!("Replication - Consumer is lagging and must be refreshed.");
151                debug!(?lag_range);
152                debug!(consumer_ranges = ?ctx_ranges);
153                debug!(supplier_ranges = ?our_ranges);
154                return Ok(ReplIncrementalContext::RefreshRequired);
155            }
156            RangeDiffStatus::Unwilling { adv_range } => {
157                error!("Replication - Supplier is lagging and must be investigated.");
158                debug!(?adv_range);
159                debug!(consumer_ranges = ?ctx_ranges);
160                debug!(supplier_ranges = ?our_ranges);
161                return Ok(ReplIncrementalContext::UnwillingToSupply);
162            }
163            RangeDiffStatus::Critical {
164                lag_range,
165                adv_range,
166            } => {
167                error!(?adv_range, ?lag_range, "Replication Critical - Consumers are advanced of us, and also lagging! This must be immediately investigated!");
168                debug!(consumer_ranges = ?ctx_ranges);
169                debug!(supplier_ranges = ?our_ranges);
170                return Ok(ReplIncrementalContext::UnwillingToSupply);
171            }
172            RangeDiffStatus::NoRUVOverlap => {
173                error!("Replication Critical - Consumers RUV has desynchronised and diverged! This must be immediately investigated!");
174                debug!(consumer_ranges = ?ctx_ranges);
175                debug!(supplier_ranges = ?our_ranges);
176                return Ok(ReplIncrementalContext::UnwillingToSupply);
177            }
178        };
179
180        debug!("these ranges will be supplied");
181        debug!(supply_ranges = ?ranges);
182        debug!(consumer_ranges = ?ctx_ranges);
183        debug!(supplier_ranges = ?our_ranges);
184
185        if ranges.is_empty() {
186            debug!("No Changes Available");
187            return Ok(ReplIncrementalContext::NoChangesAvailable);
188        }
189
190        let entries = self.get_be_txn().retrieve_range(&ranges).map_err(|e| {
194            admin_error!(?e, "backend failure");
195            OperationError::Backend
196        })?;
197
198        let (schema_entries, rem_entries): (Vec<_>, Vec<_>) = entries.into_iter().partition(|e| {
200            e.get_ava_set(Attribute::Class)
201                .map(|cls| {
202                    cls.contains(&EntryClass::AttributeType.into() as &PartialValue)
203                        || cls.contains(&EntryClass::ClassType.into() as &PartialValue)
204                })
205                .unwrap_or(false)
206        });
207
208        let (meta_entries, entries): (Vec<_>, Vec<_>) = rem_entries.into_iter().partition(|e| {
209            e.get_ava_set(Attribute::Class)
210                .map(|cls| {
211                    cls.contains(&EntryClass::DomainInfo.into() as &PartialValue)
212                        || cls.contains(&EntryClass::SystemInfo.into() as &PartialValue)
213                        || cls.contains(&EntryClass::SystemConfig.into() as &PartialValue)
214                        || cls.contains(&EntryClass::KeyProvider.into() as &PartialValue)
215                })
216                .unwrap_or(false)
217        });
218
219        trace!(?schema_entries);
220        trace!(?meta_entries);
221        trace!(?entries);
222
223        let schema = self.get_schema();
227        let domain_version = self.d_info.d_vers;
228        let domain_patch_level = if self.d_info.d_devel_taint {
229            u32::MAX
230        } else {
231            self.d_info.d_patch_level
232        };
233        let domain_uuid = self.d_info.d_uuid;
234
235        let schema_entries: Vec<_> = schema_entries
236            .into_iter()
237            .map(|e| ReplIncrementalEntryV1::new(e.as_ref(), schema, &ranges))
238            .collect();
239
240        let meta_entries: Vec<_> = meta_entries
241            .into_iter()
242            .map(|e| ReplIncrementalEntryV1::new(e.as_ref(), schema, &ranges))
243            .collect();
244
245        let entries: Vec<_> = entries
246            .into_iter()
247            .map(|e| ReplIncrementalEntryV1::new(e.as_ref(), schema, &ranges))
248            .collect();
249
250        let supplier_ruv = self.get_be_txn().get_ruv();
252        let ranges = supplier_ruv.get_anchored_ranges(ranges)?;
253
254        Ok(ReplIncrementalContext::V1 {
256            domain_version,
257            domain_patch_level,
258            domain_uuid,
259            ranges,
260            schema_entries,
261            meta_entries,
262            entries,
263        })
264    }
265
266    #[instrument(level = "debug", skip_all)]
267    pub fn supplier_provide_refresh(&mut self) -> Result<ReplRefreshContext, OperationError> {
268        let schema = self.get_schema();
270
271        let domain_version = self.d_info.d_vers;
275        let domain_devel = self.d_info.d_devel_taint;
276        let domain_uuid = self.d_info.d_uuid;
277
278        let trim_cid = self.trim_cid().clone();
279
280        let ranges = self
282            .get_be_txn()
283            .get_ruv()
284            .filter_ruv_range(&trim_cid)
285            .map_err(|e| {
286                error!(err = ?e, "Unable to access supplier RUV range");
287                e
288            })?;
289
290        let schema_filter_inner = f_or!([
297            f_eq(Attribute::Class, EntryClass::AttributeType.into()),
298            f_eq(Attribute::Class, EntryClass::ClassType.into()),
299        ]);
300
301        let schema_filter = filter!(schema_filter_inner.clone());
302
303        let meta_filter_inner = f_or!([
304            f_eq(Attribute::Class, EntryClass::DomainInfo.into()),
305            f_eq(Attribute::Class, EntryClass::SystemInfo.into()),
306            f_eq(Attribute::Class, EntryClass::SystemConfig.into()),
307            f_eq(Attribute::Class, EntryClass::KeyProvider.into()),
308        ]);
309
310        let meta_filter = filter!(meta_filter_inner.clone());
311
312        let entry_filter = filter_all!(f_or!([
313            f_and!([
314                f_pres(Attribute::Class),
315                f_andnot(f_or(vec![schema_filter_inner, meta_filter_inner])),
316            ]),
317            f_eq(Attribute::Class, EntryClass::Tombstone.into()),
318            f_eq(Attribute::Class, EntryClass::Recycled.into()),
319        ]));
320
321        let schema_entries = self
322            .internal_search(schema_filter)
323            .map(|ent| {
324                ent.into_iter()
325                    .map(|e| ReplEntryV1::new(e.as_ref(), schema))
326                    .collect()
327            })
328            .inspect_err(|err| {
329                error!(?err, "Failed to access schema entries");
330            })?;
331
332        let meta_entries = self
333            .internal_search(meta_filter)
334            .map(|ent| {
335                ent.into_iter()
336                    .map(|e| ReplEntryV1::new(e.as_ref(), schema))
337                    .collect()
338            })
339            .inspect_err(|err| {
340                error!(?err, "Failed to access meta entries");
341            })?;
342
343        let entries = self
344            .internal_search(entry_filter)
345            .map(|ent| {
346                ent.into_iter()
347                    .map(|e| ReplEntryV1::new(e.as_ref(), schema))
348                    .collect()
349            })
350            .inspect_err(|err| {
351                error!(?err, "Failed to access entries");
352            })?;
353
354        let supplier_ruv = self.get_be_txn().get_ruv();
356        let ranges = supplier_ruv.get_anchored_ranges(ranges)?;
357
358        Ok(ReplRefreshContext::V1 {
359            domain_version,
360            domain_devel,
361            domain_uuid,
362            ranges,
363            schema_entries,
364            meta_entries,
365            entries,
366        })
367    }
368}