1use super::proto::{
2 ReplEntryV1, ReplIncrementalContext, ReplIncrementalEntryV1, ReplRefreshContext, ReplRuvRange,
3};
4use super::ruv::{RangeDiffStatus, ReplicationUpdateVector, ReplicationUpdateVectorTransaction};
5use crate::be::BackendTransaction;
6use crate::prelude::*;
7
8use crate::be::keystorage::{KeyHandle, KeyHandleId};
9use kanidm_lib_crypto::mtls::build_self_signed_server_and_client_identity;
10use kanidm_lib_crypto::prelude::{PKey, Private, X509};
11
12impl QueryServerWriteTransaction<'_> {
13 fn supplier_generate_key_cert(
14 &mut self,
15 domain_name: &str,
16 ) -> Result<(PKey<Private>, X509), OperationError> {
17 let s_uuid = self.get_server_uuid();
19
20 let (private, x509) = build_self_signed_server_and_client_identity(
21 s_uuid,
22 domain_name,
23 REPL_MTLS_CERTIFICATE_DAYS,
24 )
25 .map_err(|err| {
26 error!(?err, "Unable to generate self signed key/cert");
27 OperationError::CryptographyError
29 })?;
30
31 let kh = KeyHandle::X509Key {
32 private: private.clone(),
33 x509: x509.clone(),
34 };
35
36 self.get_be_txn()
37 .set_key_handle(KeyHandleId::ReplicationKey, kh)
38 .map_err(|err| {
39 error!(?err, "Unable to persist replication key");
40 err
41 })
42 .map(|()| (private, x509))
43 }
44
45 #[instrument(level = "info", skip_all)]
46 pub fn supplier_renew_key_cert(&mut self, domain_name: &str) -> Result<(), OperationError> {
47 self.supplier_generate_key_cert(domain_name).map(|_| ())
48 }
49
50 #[instrument(level = "info", skip_all)]
51 pub fn supplier_get_key_cert(
52 &mut self,
53 domain_name: &str,
54 ) -> Result<(PKey<Private>, X509), OperationError> {
55 let maybe_key_handle = self
60 .get_be_txn()
61 .get_key_handle(KeyHandleId::ReplicationKey)
62 .map_err(|err| {
63 error!(?err, "Unable to access replication key");
64 err
65 })?;
66
67 let key_cert = match maybe_key_handle {
69 Some(KeyHandle::X509Key { private, x509 }) => (private, x509),
70 None => self.supplier_generate_key_cert(domain_name)?,
77 };
78
79 Ok(key_cert)
80 }
81}
82
83impl QueryServerReadTransaction<'_> {
84 #[instrument(level = "debug", skip_all)]
92 pub fn supplier_provide_changes(
93 &mut self,
94 ctx_ruv: ReplRuvRange,
95 ) -> Result<ReplIncrementalContext, OperationError> {
96 let (ctx_domain_uuid, ctx_ranges) = match ctx_ruv {
98 ReplRuvRange::V1 {
99 domain_uuid,
100 ranges,
101 } => (domain_uuid, ranges),
102 };
103
104 if ctx_domain_uuid != self.d_info.d_uuid {
105 error!("Replication - Consumer Domain UUID does not match our local domain uuid.");
106 debug!(consumer_domain_uuid = ?ctx_domain_uuid, supplier_domain_uuid = ?self.d_info.d_uuid);
107 return Ok(ReplIncrementalContext::DomainMismatch);
108 }
109
110 let trim_cid = self.trim_cid().clone();
132
133 let supplier_ruv = self.get_be_txn().get_ruv();
134
135 let our_ranges = supplier_ruv.filter_ruv_range(&trim_cid).map_err(|e| {
136 error!(err = ?e, "Unable to access supplier RUV range");
137 e
138 })?;
139
140 let supply_ranges = ReplicationUpdateVector::range_diff(&ctx_ranges, &our_ranges);
144
145 let ranges = match supply_ranges {
148 RangeDiffStatus::Ok(ranges) => ranges,
149 RangeDiffStatus::Refresh { lag_range } => {
150 error!("Replication - Consumer is lagging and must be refreshed.");
151 info!(?lag_range);
152 debug!(consumer_ranges = ?ctx_ranges);
153 debug!(supplier_ranges = ?our_ranges);
154 return Ok(ReplIncrementalContext::RefreshRequired);
155 }
156 RangeDiffStatus::Unwilling { adv_range } => {
157 error!("Replication - Supplier is lagging and must be investigated.");
158 info!(?adv_range);
159 debug!(consumer_ranges = ?ctx_ranges);
160 debug!(supplier_ranges = ?our_ranges);
161 return Ok(ReplIncrementalContext::UnwillingToSupply);
162 }
163 RangeDiffStatus::Critical {
164 lag_range,
165 adv_range,
166 } => {
167 error!("Replication Critical - Consumers are advanced of us, and also lagging! This must be immediately investigated!");
168 info!(?lag_range);
169 info!(?adv_range);
170 debug!(consumer_ranges = ?ctx_ranges);
171 debug!(supplier_ranges = ?our_ranges);
172 return Ok(ReplIncrementalContext::UnwillingToSupply);
173 }
174 RangeDiffStatus::NoRUVOverlap => {
175 error!("Replication Critical - Consumers RUV has desynchronised and diverged! This must be immediately investigated!");
176 debug!(consumer_ranges = ?ctx_ranges);
177 debug!(supplier_ranges = ?our_ranges);
178 return Ok(ReplIncrementalContext::UnwillingToSupply);
179 }
180 };
181
182 debug!("these ranges will be supplied");
183 debug!(supply_ranges = ?ranges);
184 debug!(consumer_ranges = ?ctx_ranges);
185 debug!(supplier_ranges = ?our_ranges);
186
187 if ranges.is_empty() {
188 debug!("No Changes Available");
189 return Ok(ReplIncrementalContext::NoChangesAvailable);
190 }
191
192 let entries = self.get_be_txn().retrieve_range(&ranges).map_err(|e| {
196 admin_error!(?e, "backend failure");
197 OperationError::Backend
198 })?;
199
200 let (schema_entries, rem_entries): (Vec<_>, Vec<_>) = entries.into_iter().partition(|e| {
202 e.get_ava_set(Attribute::Class)
203 .map(|cls| {
204 cls.contains(&EntryClass::AttributeType.into() as &PartialValue)
205 || cls.contains(&EntryClass::ClassType.into() as &PartialValue)
206 })
207 .unwrap_or(false)
208 });
209
210 let (meta_entries, entries): (Vec<_>, Vec<_>) = rem_entries.into_iter().partition(|e| {
211 e.get_ava_set(Attribute::Class)
212 .map(|cls| {
213 cls.contains(&EntryClass::DomainInfo.into() as &PartialValue)
214 || cls.contains(&EntryClass::SystemInfo.into() as &PartialValue)
215 || cls.contains(&EntryClass::SystemConfig.into() as &PartialValue)
216 || cls.contains(&EntryClass::KeyProvider.into() as &PartialValue)
217 })
218 .unwrap_or(false)
219 });
220
221 trace!(?schema_entries);
222 trace!(?meta_entries);
223 trace!(?entries);
224
225 let schema = self.get_schema();
229 let domain_version = self.d_info.d_vers;
230 let domain_patch_level = if self.d_info.d_devel_taint {
231 u32::MAX
232 } else {
233 self.d_info.d_patch_level
234 };
235 let domain_uuid = self.d_info.d_uuid;
236
237 let schema_entries: Vec<_> = schema_entries
238 .into_iter()
239 .map(|e| ReplIncrementalEntryV1::new(e.as_ref(), schema, &ranges))
240 .collect();
241
242 let meta_entries: Vec<_> = meta_entries
243 .into_iter()
244 .map(|e| ReplIncrementalEntryV1::new(e.as_ref(), schema, &ranges))
245 .collect();
246
247 let entries: Vec<_> = entries
248 .into_iter()
249 .map(|e| ReplIncrementalEntryV1::new(e.as_ref(), schema, &ranges))
250 .collect();
251
252 let supplier_ruv = self.get_be_txn().get_ruv();
254 let ranges = supplier_ruv.get_anchored_ranges(ranges)?;
255
256 Ok(ReplIncrementalContext::V1 {
258 domain_version,
259 domain_patch_level,
260 domain_uuid,
261 ranges,
262 schema_entries,
263 meta_entries,
264 entries,
265 })
266 }
267
268 #[instrument(level = "debug", skip_all)]
269 pub fn supplier_provide_refresh(&mut self) -> Result<ReplRefreshContext, OperationError> {
270 let schema = self.get_schema();
272
273 let domain_version = self.d_info.d_vers;
277 let domain_devel = self.d_info.d_devel_taint;
278 let domain_uuid = self.d_info.d_uuid;
279
280 let trim_cid = self.trim_cid().clone();
281
282 let ranges = self
284 .get_be_txn()
285 .get_ruv()
286 .filter_ruv_range(&trim_cid)
287 .map_err(|e| {
288 error!(err = ?e, "Unable to access supplier RUV range");
289 e
290 })?;
291
292 let schema_filter_inner = f_or!([
299 f_eq(Attribute::Class, EntryClass::AttributeType.into()),
300 f_eq(Attribute::Class, EntryClass::ClassType.into()),
301 ]);
302
303 let schema_filter = filter!(schema_filter_inner.clone());
304
305 let meta_filter_inner = f_or!([
306 f_eq(Attribute::Class, EntryClass::DomainInfo.into()),
307 f_eq(Attribute::Class, EntryClass::SystemInfo.into()),
308 f_eq(Attribute::Class, EntryClass::SystemConfig.into()),
309 f_eq(Attribute::Class, EntryClass::KeyProvider.into()),
310 ]);
311
312 let meta_filter = filter!(meta_filter_inner.clone());
313
314 let entry_filter = filter_all!(f_or!([
315 f_and!([
316 f_pres(Attribute::Class),
317 f_andnot(f_or(vec![schema_filter_inner, meta_filter_inner])),
318 ]),
319 f_eq(Attribute::Class, EntryClass::Tombstone.into()),
320 f_eq(Attribute::Class, EntryClass::Recycled.into()),
321 ]));
322
323 let schema_entries = self
324 .internal_search(schema_filter)
325 .map(|ent| {
326 ent.into_iter()
327 .map(|e| ReplEntryV1::new(e.as_ref(), schema))
328 .collect()
329 })
330 .inspect_err(|err| {
331 error!(?err, "Failed to access schema entries");
332 })?;
333
334 let meta_entries = self
335 .internal_search(meta_filter)
336 .map(|ent| {
337 ent.into_iter()
338 .map(|e| ReplEntryV1::new(e.as_ref(), schema))
339 .collect()
340 })
341 .inspect_err(|err| {
342 error!(?err, "Failed to access meta entries");
343 })?;
344
345 let entries = self
346 .internal_search(entry_filter)
347 .map(|ent| {
348 ent.into_iter()
349 .map(|e| ReplEntryV1::new(e.as_ref(), schema))
350 .collect()
351 })
352 .inspect_err(|err| {
353 error!(?err, "Failed to access entries");
354 })?;
355
356 let supplier_ruv = self.get_be_txn().get_ruv();
358 let ranges = supplier_ruv.get_anchored_ranges(ranges)?;
359
360 Ok(ReplRefreshContext::V1 {
361 domain_version,
362 domain_devel,
363 domain_uuid,
364 ranges,
365 schema_entries,
366 meta_entries,
367 entries,
368 })
369 }
370}