1use super::proto::{
2 ReplEntryV1, ReplIncrementalContext, ReplIncrementalEntryV1, ReplRefreshContext, ReplRuvRange,
3};
4use super::ruv::{RangeDiffStatus, ReplicationUpdateVector, ReplicationUpdateVectorTransaction};
5use crate::be::keystorage::{KeyHandle, KeyHandleId};
6use crate::be::BackendTransaction;
7use crate::prelude::*;
8use crypto_glue::{
9 der::SecretDocument,
10 ecdsa_p256::{self, EcdsaP256DerSignature, EcdsaP256SigningKey, EcdsaP256VerifyingKey},
11 rand,
12 traits::Pkcs8EncodePrivateKey,
13 x509::{
14 self, oiddb, Builder, Certificate, CertificateBuilder, ExtendedKeyUsage, GeneralName,
15 GeneralizedTime, Ia5String, OctetString, SubjectAltName, SubjectPublicKeyInfoOwned,
16 },
17};
18use rustls::pki_types::{IpAddr, ServerName};
19use std::str::FromStr;
20
21impl QueryServerWriteTransaction<'_> {
22 fn supplier_generate_key_cert(
23 &mut self,
24 domain_name: &str,
25 ) -> Result<(SecretDocument, Certificate), OperationError> {
26 let s_uuid = self.get_server_uuid();
28
29 let private_key = ecdsa_p256::new_key();
30 let signing_key = EcdsaP256SigningKey::from(&private_key);
31 let verifying_key = EcdsaP256VerifyingKey::from(&signing_key);
32
33 let pub_key = SubjectPublicKeyInfoOwned::from_key(verifying_key).map_err(|err| {
34 error!(?err, "Unable to create public key from private key");
35 OperationError::CryptographyError
36 })?;
37
38 let profile = x509::Profile::Manual { issuer: None };
40
41 let not_before = GeneralizedTime::from_unix_duration(self.get_curtime())
42 .map(x509::Time::from)
43 .map_err(|err| {
44 error!(?err, "Unable to convert current time to GeneralizedTime");
45 OperationError::CryptographyError
46 })?;
47 let not_after = GeneralizedTime::from_unix_duration(
48 self.get_curtime() + Duration::from_secs(REPL_MTLS_CERTIFICATE_EXPIRY),
49 )
50 .map(x509::Time::from)
51 .map_err(|err| {
52 error!(?err, "Unable to convert current time to GeneralizedTime");
53 OperationError::CryptographyError
54 })?;
55
56 let validity = x509::Validity {
57 not_before,
58 not_after,
59 };
60
61 let serial_number = x509::uuid_to_serial(s_uuid);
62 let subject =
63 x509::Name::from_str(&format!("O=Kanidm Replication,CN={s_uuid}")).map_err(|err| {
64 error!(?err, "Unable to parse subject dn");
65 OperationError::CryptographyError
66 })?;
67
68 let mut x509_builder = CertificateBuilder::new(
69 profile,
70 serial_number,
71 validity,
72 subject,
73 pub_key,
74 &signing_key,
75 )
76 .map_err(|err| {
77 error!(?err, "Unable to construct certificate builder");
78 OperationError::CryptographyError
79 })?;
80
81 let eku_extension = ExtendedKeyUsage(vec![
83 oiddb::rfc5280::ID_KP_CLIENT_AUTH,
84 oiddb::rfc5280::ID_KP_SERVER_AUTH,
85 ]);
86
87 x509_builder.add_extension(&eku_extension).map_err(|err| {
88 error!(?err, "Unable to add extended key usage extension");
89 OperationError::CryptographyError
90 })?;
91
92 let Ok(server_name) = ServerName::try_from(domain_name.to_owned()) else {
96 error!("Invalid server name for replication");
97 return Err(OperationError::CryptographyError);
98 };
99
100 let subject_alt_name = match server_name {
101 ServerName::DnsName(_) => {
102 let alt_name = Ia5String::new(domain_name).map_err(|err| {
103 error!(?err, "Invalid subject alt name");
104 OperationError::CryptographyError
105 })?;
106 SubjectAltName(vec![GeneralName::DnsName(alt_name)])
107 }
108 ServerName::IpAddress(IpAddr::V4(ipv4_addr)) => {
109 let ip_address = OctetString::new(ipv4_addr.as_ref()).map_err(|err| {
110 error!(?err, "Unable to convert ipv4 address to octet string");
111 OperationError::CryptographyError
112 })?;
113 SubjectAltName(vec![GeneralName::IpAddress(ip_address)])
114 }
115 ServerName::IpAddress(IpAddr::V6(ipv6_addr)) => {
116 let ip_address = OctetString::new(ipv6_addr.as_ref()).map_err(|err| {
117 error!(?err, "Unable to convert ipv6 address to octet string");
118 OperationError::CryptographyError
119 })?;
120 SubjectAltName(vec![GeneralName::IpAddress(ip_address)])
121 }
122
123 _ => return Err(OperationError::CryptographyError),
124 };
125
126 x509_builder
127 .add_extension(&subject_alt_name)
128 .map_err(|err| {
129 error!(?err, "Unable to add subject alt name");
130 OperationError::CryptographyError
131 })?;
132
133 let mut rng = rand::thread_rng();
134 let x509 = x509_builder
135 .build_with_rng::<EcdsaP256DerSignature>(&mut rng)
136 .map_err(|err| {
137 error!(?err, "Unable to generate self signed key/cert");
138 OperationError::CryptographyError
139 })?;
140
141 let private = private_key.to_pkcs8_der().map_err(|err| {
142 error!(?err, "Unable to encode private key");
143 OperationError::CryptographyError
144 })?;
145
146 let kh = KeyHandle::X509Key {
147 private: private.clone(),
148 x509: x509.clone(),
149 };
150
151 self.get_be_txn()
152 .set_key_handle(KeyHandleId::ReplicationKey, kh)
153 .map_err(|err| {
154 error!(?err, "Unable to persist replication key");
155 err
156 })
157 .map(|()| (private, x509))
158 }
159
160 #[instrument(level = "info", skip_all)]
161 pub fn supplier_renew_key_cert(&mut self, domain_name: &str) -> Result<(), OperationError> {
162 self.supplier_generate_key_cert(domain_name).map(|_| ())
163 }
164
165 #[instrument(level = "info", skip_all)]
166 pub fn supplier_get_key_cert(
167 &mut self,
168 domain_name: &str,
169 ) -> Result<(SecretDocument, Certificate), OperationError> {
170 let maybe_key_handle = self
175 .get_be_txn()
176 .get_key_handle(KeyHandleId::ReplicationKey)
177 .map_err(|err| {
178 error!(?err, "Unable to access replication key");
179 err
180 })?;
181
182 let key_cert = match maybe_key_handle {
184 Some(KeyHandle::X509Key { private, x509 }) => (private, x509),
185 None => self.supplier_generate_key_cert(domain_name)?,
192 };
193
194 Ok(key_cert)
195 }
196}
197
198impl QueryServerReadTransaction<'_> {
199 #[instrument(level = "debug", skip_all)]
207 pub fn supplier_provide_changes(
208 &mut self,
209 ctx_ruv: ReplRuvRange,
210 ) -> Result<ReplIncrementalContext, OperationError> {
211 let (ctx_domain_uuid, ctx_ranges) = match ctx_ruv {
213 ReplRuvRange::V1 {
214 domain_uuid,
215 ranges,
216 } => (domain_uuid, ranges),
217 };
218
219 if ctx_domain_uuid != self.d_info.d_uuid {
220 error!("Replication - Consumer Domain UUID does not match our local domain uuid.");
221 debug!(consumer_domain_uuid = ?ctx_domain_uuid, supplier_domain_uuid = ?self.d_info.d_uuid);
222 return Ok(ReplIncrementalContext::DomainMismatch);
223 }
224
225 let trim_cid = self.trim_cid().clone();
247
248 let supplier_ruv = self.get_be_txn().get_ruv();
249
250 let our_ranges = supplier_ruv.filter_ruv_range(&trim_cid).map_err(|e| {
251 error!(err = ?e, "Unable to access supplier RUV range");
252 e
253 })?;
254
255 let supply_ranges = ReplicationUpdateVector::range_diff(&ctx_ranges, &our_ranges);
259
260 let ranges = match supply_ranges {
263 RangeDiffStatus::Ok(ranges) => ranges,
264 RangeDiffStatus::Refresh { lag_range } => {
265 error!("Replication - Consumer is lagging and must be refreshed.");
266 debug!(?lag_range);
267 debug!(consumer_ranges = ?ctx_ranges);
268 debug!(supplier_ranges = ?our_ranges);
269 return Ok(ReplIncrementalContext::RefreshRequired);
270 }
271 RangeDiffStatus::Unwilling { adv_range } => {
272 error!("Replication - Supplier is lagging and must be investigated.");
273 debug!(?adv_range);
274 debug!(consumer_ranges = ?ctx_ranges);
275 debug!(supplier_ranges = ?our_ranges);
276 return Ok(ReplIncrementalContext::UnwillingToSupply);
277 }
278 RangeDiffStatus::Critical {
279 lag_range,
280 adv_range,
281 } => {
282 error!(?adv_range, ?lag_range, "Replication Critical - Consumers are advanced of us, and also lagging! This must be immediately investigated!");
283 debug!(consumer_ranges = ?ctx_ranges);
284 debug!(supplier_ranges = ?our_ranges);
285 return Ok(ReplIncrementalContext::UnwillingToSupply);
286 }
287 RangeDiffStatus::NoRUVOverlap => {
288 error!("Replication Critical - Consumers RUV has desynchronised and diverged! This must be immediately investigated!");
289 debug!(consumer_ranges = ?ctx_ranges);
290 debug!(supplier_ranges = ?our_ranges);
291 return Ok(ReplIncrementalContext::UnwillingToSupply);
292 }
293 };
294
295 debug!("these ranges will be supplied");
296 debug!(supply_ranges = ?ranges);
297 debug!(consumer_ranges = ?ctx_ranges);
298 debug!(supplier_ranges = ?our_ranges);
299
300 if ranges.is_empty() {
301 debug!("No Changes Available");
302 return Ok(ReplIncrementalContext::NoChangesAvailable);
303 }
304
305 let entries = self.get_be_txn().retrieve_range(&ranges).map_err(|e| {
309 admin_error!(?e, "backend failure");
310 OperationError::Backend
311 })?;
312
313 let (schema_entries, rem_entries): (Vec<_>, Vec<_>) = entries.into_iter().partition(|e| {
315 e.get_ava_set(Attribute::Class)
316 .map(|cls| {
317 cls.contains(&EntryClass::AttributeType.into() as &PartialValue)
318 || cls.contains(&EntryClass::ClassType.into() as &PartialValue)
319 })
320 .unwrap_or(false)
321 });
322
323 let (meta_entries, entries): (Vec<_>, Vec<_>) = rem_entries.into_iter().partition(|e| {
324 e.get_ava_set(Attribute::Class)
325 .map(|cls| {
326 cls.contains(&EntryClass::DomainInfo.into() as &PartialValue)
327 || cls.contains(&EntryClass::SystemInfo.into() as &PartialValue)
328 || cls.contains(&EntryClass::SystemConfig.into() as &PartialValue)
329 || cls.contains(&EntryClass::KeyProvider.into() as &PartialValue)
330 })
331 .unwrap_or(false)
332 });
333
334 trace!(?schema_entries);
335 trace!(?meta_entries);
336 trace!(?entries);
337
338 let schema = self.get_schema();
342 let domain_version = self.d_info.d_vers;
343 let domain_patch_level = if self.d_info.d_devel_taint {
344 u32::MAX
345 } else {
346 self.d_info.d_patch_level
347 };
348 let domain_uuid = self.d_info.d_uuid;
349
350 let schema_entries: Vec<_> = schema_entries
351 .into_iter()
352 .map(|e| ReplIncrementalEntryV1::new(e.as_ref(), schema, &ranges))
353 .collect();
354
355 let meta_entries: Vec<_> = meta_entries
356 .into_iter()
357 .map(|e| ReplIncrementalEntryV1::new(e.as_ref(), schema, &ranges))
358 .collect();
359
360 let entries: Vec<_> = entries
361 .into_iter()
362 .map(|e| ReplIncrementalEntryV1::new(e.as_ref(), schema, &ranges))
363 .collect();
364
365 let supplier_ruv = self.get_be_txn().get_ruv();
367 let ranges = supplier_ruv.get_anchored_ranges(ranges)?;
368
369 Ok(ReplIncrementalContext::V1 {
371 domain_version,
372 domain_patch_level,
373 domain_uuid,
374 ranges,
375 schema_entries,
376 meta_entries,
377 entries,
378 })
379 }
380
381 #[instrument(level = "debug", skip_all)]
382 pub fn supplier_provide_refresh(&mut self) -> Result<ReplRefreshContext, OperationError> {
383 let schema = self.get_schema();
385
386 let domain_version = self.d_info.d_vers;
390 let domain_devel = self.d_info.d_devel_taint;
391 let domain_uuid = self.d_info.d_uuid;
392
393 let trim_cid = self.trim_cid().clone();
394
395 let ranges = self
397 .get_be_txn()
398 .get_ruv()
399 .filter_ruv_range(&trim_cid)
400 .map_err(|e| {
401 error!(err = ?e, "Unable to access supplier RUV range");
402 e
403 })?;
404
405 let schema_filter_inner = f_or!([
412 f_eq(Attribute::Class, EntryClass::AttributeType.into()),
413 f_eq(Attribute::Class, EntryClass::ClassType.into()),
414 ]);
415
416 let schema_filter = filter!(schema_filter_inner.clone());
417
418 let meta_filter_inner = f_or!([
419 f_eq(Attribute::Class, EntryClass::DomainInfo.into()),
420 f_eq(Attribute::Class, EntryClass::SystemInfo.into()),
421 f_eq(Attribute::Class, EntryClass::SystemConfig.into()),
422 f_eq(Attribute::Class, EntryClass::KeyProvider.into()),
423 ]);
424
425 let meta_filter = filter!(meta_filter_inner.clone());
426
427 let entry_filter = filter_all!(f_or!([
428 f_and!([
429 f_pres(Attribute::Class),
430 f_andnot(f_or(vec![schema_filter_inner, meta_filter_inner])),
431 ]),
432 f_eq(Attribute::Class, EntryClass::Tombstone.into()),
433 f_eq(Attribute::Class, EntryClass::Recycled.into()),
434 ]));
435
436 let schema_entries = self
437 .internal_search(schema_filter)
438 .map(|ent| {
439 ent.into_iter()
440 .map(|e| ReplEntryV1::new(e.as_ref(), schema))
441 .collect()
442 })
443 .inspect_err(|err| {
444 error!(?err, "Failed to access schema entries");
445 })?;
446
447 let meta_entries = self
448 .internal_search(meta_filter)
449 .map(|ent| {
450 ent.into_iter()
451 .map(|e| ReplEntryV1::new(e.as_ref(), schema))
452 .collect()
453 })
454 .inspect_err(|err| {
455 error!(?err, "Failed to access meta entries");
456 })?;
457
458 let entries = self
459 .internal_search(entry_filter)
460 .map(|ent| {
461 ent.into_iter()
462 .map(|e| ReplEntryV1::new(e.as_ref(), schema))
463 .collect()
464 })
465 .inspect_err(|err| {
466 error!(?err, "Failed to access entries");
467 })?;
468
469 let supplier_ruv = self.get_be_txn().get_ruv();
471 let ranges = supplier_ruv.get_anchored_ranges(ranges)?;
472
473 Ok(ReplRefreshContext::V1 {
474 domain_version,
475 domain_devel,
476 domain_uuid,
477 ranges,
478 schema_entries,
479 meta_entries,
480 entries,
481 })
482 }
483}