kanidmd_lib/repl/
supplier.rs

1use super::proto::{
2    ReplEntryV1, ReplIncrementalContext, ReplIncrementalEntryV1, ReplRefreshContext, ReplRuvRange,
3};
4use super::ruv::{RangeDiffStatus, ReplicationUpdateVector, ReplicationUpdateVectorTransaction};
5use crate::be::keystorage::{KeyHandle, KeyHandleId};
6use crate::be::BackendTransaction;
7use crate::prelude::*;
8use crypto_glue::{
9    der::SecretDocument,
10    ecdsa_p256::{self, EcdsaP256DerSignature, EcdsaP256SigningKey, EcdsaP256VerifyingKey},
11    rand,
12    traits::Pkcs8EncodePrivateKey,
13    x509::{
14        self, oiddb, Builder, Certificate, CertificateBuilder, ExtendedKeyUsage, GeneralName,
15        GeneralizedTime, Ia5String, OctetString, SubjectAltName, SubjectPublicKeyInfoOwned,
16    },
17};
18use rustls::pki_types::{IpAddr, ServerName};
19use std::str::FromStr;
20
21impl QueryServerWriteTransaction<'_> {
22    fn supplier_generate_key_cert(
23        &mut self,
24        domain_name: &str,
25    ) -> Result<(SecretDocument, Certificate), OperationError> {
26        // Invalid, must need to re-generate.
27        let s_uuid = self.get_server_uuid();
28
29        let private_key = ecdsa_p256::new_key();
30        let signing_key = EcdsaP256SigningKey::from(&private_key);
31        let verifying_key = EcdsaP256VerifyingKey::from(&signing_key);
32
33        let pub_key = SubjectPublicKeyInfoOwned::from_key(verifying_key).map_err(|err| {
34            error!(?err, "Unable to create public key from private key");
35            OperationError::CryptographyError
36        })?;
37
38        // Indicate that this is self-signed.
39        let profile = x509::Profile::Manual { issuer: None };
40
41        let not_before = GeneralizedTime::from_unix_duration(self.get_curtime())
42            .map(x509::Time::from)
43            .map_err(|err| {
44                error!(?err, "Unable to convert current time to GeneralizedTime");
45                OperationError::CryptographyError
46            })?;
47        let not_after = GeneralizedTime::from_unix_duration(
48            self.get_curtime() + Duration::from_secs(REPL_MTLS_CERTIFICATE_EXPIRY),
49        )
50        .map(x509::Time::from)
51        .map_err(|err| {
52            error!(?err, "Unable to convert current time to GeneralizedTime");
53            OperationError::CryptographyError
54        })?;
55
56        let validity = x509::Validity {
57            not_before,
58            not_after,
59        };
60
61        let serial_number = x509::uuid_to_serial(s_uuid);
62        let subject =
63            x509::Name::from_str(&format!("O=Kanidm Replication,CN={s_uuid}")).map_err(|err| {
64                error!(?err, "Unable to parse subject dn");
65                OperationError::CryptographyError
66            })?;
67
68        let mut x509_builder = CertificateBuilder::new(
69            profile,
70            serial_number,
71            validity,
72            subject,
73            pub_key,
74            &signing_key,
75        )
76        .map_err(|err| {
77            error!(?err, "Unable to construct certificate builder");
78            OperationError::CryptographyError
79        })?;
80
81        // Key Usage (server + client )
82        let eku_extension = ExtendedKeyUsage(vec![
83            oiddb::rfc5280::ID_KP_CLIENT_AUTH,
84            oiddb::rfc5280::ID_KP_SERVER_AUTH,
85        ]);
86
87        x509_builder.add_extension(&eku_extension).map_err(|err| {
88            error!(?err, "Unable to add extended key usage extension");
89            OperationError::CryptographyError
90        })?;
91
92        // Subject Alt Name
93        // We need to understand how rustls treats this to issue the correct SAN value.
94        // How does rustls interpret this name?
95        let Ok(server_name) = ServerName::try_from(domain_name.to_owned()) else {
96            error!("Invalid server name for replication");
97            return Err(OperationError::CryptographyError);
98        };
99
100        let subject_alt_name = match server_name {
101            ServerName::DnsName(_) => {
102                let alt_name = Ia5String::new(domain_name).map_err(|err| {
103                    error!(?err, "Invalid subject alt name");
104                    OperationError::CryptographyError
105                })?;
106                SubjectAltName(vec![GeneralName::DnsName(alt_name)])
107            }
108            ServerName::IpAddress(IpAddr::V4(ipv4_addr)) => {
109                let ip_address = OctetString::new(ipv4_addr.as_ref()).map_err(|err| {
110                    error!(?err, "Unable to convert ipv4 address to octet string");
111                    OperationError::CryptographyError
112                })?;
113                SubjectAltName(vec![GeneralName::IpAddress(ip_address)])
114            }
115            ServerName::IpAddress(IpAddr::V6(ipv6_addr)) => {
116                let ip_address = OctetString::new(ipv6_addr.as_ref()).map_err(|err| {
117                    error!(?err, "Unable to convert ipv6 address to octet string");
118                    OperationError::CryptographyError
119                })?;
120                SubjectAltName(vec![GeneralName::IpAddress(ip_address)])
121            }
122
123            _ => return Err(OperationError::CryptographyError),
124        };
125
126        x509_builder
127            .add_extension(&subject_alt_name)
128            .map_err(|err| {
129                error!(?err, "Unable to add subject alt name");
130                OperationError::CryptographyError
131            })?;
132
133        let mut rng = rand::thread_rng();
134        let x509 = x509_builder
135            .build_with_rng::<EcdsaP256DerSignature>(&mut rng)
136            .map_err(|err| {
137                error!(?err, "Unable to generate self signed key/cert");
138                OperationError::CryptographyError
139            })?;
140
141        let private = private_key.to_pkcs8_der().map_err(|err| {
142            error!(?err, "Unable to encode private key");
143            OperationError::CryptographyError
144        })?;
145
146        let kh = KeyHandle::X509Key {
147            private: private.clone(),
148            x509: x509.clone(),
149        };
150
151        self.get_be_txn()
152            .set_key_handle(KeyHandleId::ReplicationKey, kh)
153            .map_err(|err| {
154                error!(?err, "Unable to persist replication key");
155                err
156            })
157            .map(|()| (private, x509))
158    }
159
160    #[instrument(level = "info", skip_all)]
161    pub fn supplier_renew_key_cert(&mut self, domain_name: &str) -> Result<(), OperationError> {
162        self.supplier_generate_key_cert(domain_name).map(|_| ())
163    }
164
165    #[instrument(level = "info", skip_all)]
166    pub fn supplier_get_key_cert(
167        &mut self,
168        domain_name: &str,
169    ) -> Result<(SecretDocument, Certificate), OperationError> {
170        // Later we need to put this through a HSM or similar, but we will always need a way
171        // to persist a handle, so we still need the db write and load components.
172
173        // Does the handle exist?
174        let maybe_key_handle = self
175            .get_be_txn()
176            .get_key_handle(KeyHandleId::ReplicationKey)
177            .map_err(|err| {
178                error!(?err, "Unable to access replication key");
179                err
180            })?;
181
182        // Can you process the keyhandle?
183        let key_cert = match maybe_key_handle {
184            Some(KeyHandle::X509Key { private, x509 }) => (private, x509),
185            /*
186            Some(Keyhandle::...) => {
187                // invalid key
188                // error? regenerate?
189            }
190            */
191            None => self.supplier_generate_key_cert(domain_name)?,
192        };
193
194        Ok(key_cert)
195    }
196}
197
impl QueryServerReadTransaction<'_> {
    // Given a consumers state, calculate the differential of changes they
    // need to be sent to bring them to the equivalent state.

    // We use the RUV or Cookie to determine if:
    // * The consumer requires a full-reinit.
    // * Which entry attr-states need to be sent, if any

    /// Given the consumer's RUV range, work out what (if anything) this
    /// supplier must send for the consumer to reach an equivalent state.
    ///
    /// Returns one of:
    /// * `DomainMismatch` - the consumer belongs to a different domain uuid.
    /// * `RefreshRequired` - the consumer has lagged past our trim window and
    ///   must perform a full refresh.
    /// * `UnwillingToSupply` - the RUV comparison shows supplier lag, a
    ///   critical lag+advance combination, or no RUV overlap; these require
    ///   admin investigation before changes will be supplied.
    /// * `NoChangesAvailable` - the consumer is already up to date.
    /// * `V1 { .. }` - the incremental change set, partitioned into schema,
    ///   meta and remaining entries with anchored ranges.
    #[instrument(level = "debug", skip_all)]
    pub fn supplier_provide_changes(
        &mut self,
        ctx_ruv: ReplRuvRange,
    ) -> Result<ReplIncrementalContext, OperationError> {
        // Convert types if needed. This way we can compare ruv's correctly.
        let (ctx_domain_uuid, ctx_ranges) = match ctx_ruv {
            ReplRuvRange::V1 {
                domain_uuid,
                ranges,
            } => (domain_uuid, ranges),
        };

        if ctx_domain_uuid != self.d_info.d_uuid {
            error!("Replication - Consumer Domain UUID does not match our local domain uuid.");
            debug!(consumer_domain_uuid = ?ctx_domain_uuid, supplier_domain_uuid = ?self.d_info.d_uuid);
            return Ok(ReplIncrementalContext::DomainMismatch);
        }

        // This is a reasonably tricky part of the code, because we are attempting to do a
        // distributed and async liveness check. What content has the consumer seen? What
        // could they have trimmed from their own RUV?
        //
        // Since tombstone purging always creates an anchor, then there are always "pings"
        // effectively going out of "empty" changes that drive the RUV forward. This assists us
        // to detect this situation.
        //
        // If a server has been replicating correctly, then it should have at least *some* overlap
        // with us since content has always advanced.
        //
        // If a server has "stalled" then it will have *no* overlap. This can manifest as a need
        // to supply all ranges as though they were new because the lagging consumer has trimmed out
        // all the old content.
        //
        // When a server is newly added it will have overlap because it will have refreshed from
        // another server.
        //
        // When a server is "trimmed" from the RUV, it no longer influences the overlap decision
        // because the other servers will have continued to advance.

        let trim_cid = self.trim_cid().clone();

        let supplier_ruv = self.get_be_txn().get_ruv();

        let our_ranges = supplier_ruv.filter_ruv_range(&trim_cid).map_err(|e| {
            error!(err = ?e, "Unable to access supplier RUV range");
            e
        })?;

        // Compare this to our internal ranges - work out the list of entry
        // id's that are now different.

        let supply_ranges = ReplicationUpdateVector::range_diff(&ctx_ranges, &our_ranges);

        // If empty, return an empty set of changes!

        // Each non-Ok diff status maps to a distinct protocol response so the
        // consumer (and the admin) can tell which side is out of step.
        let ranges = match supply_ranges {
            RangeDiffStatus::Ok(ranges) => ranges,
            RangeDiffStatus::Refresh { lag_range } => {
                error!("Replication - Consumer is lagging and must be refreshed.");
                debug!(?lag_range);
                debug!(consumer_ranges = ?ctx_ranges);
                debug!(supplier_ranges = ?our_ranges);
                return Ok(ReplIncrementalContext::RefreshRequired);
            }
            RangeDiffStatus::Unwilling { adv_range } => {
                error!("Replication - Supplier is lagging and must be investigated.");
                debug!(?adv_range);
                debug!(consumer_ranges = ?ctx_ranges);
                debug!(supplier_ranges = ?our_ranges);
                return Ok(ReplIncrementalContext::UnwillingToSupply);
            }
            RangeDiffStatus::Critical {
                lag_range,
                adv_range,
            } => {
                error!(?adv_range, ?lag_range, "Replication Critical - Consumers are advanced of us, and also lagging! This must be immediately investigated!");
                debug!(consumer_ranges = ?ctx_ranges);
                debug!(supplier_ranges = ?our_ranges);
                return Ok(ReplIncrementalContext::UnwillingToSupply);
            }
            RangeDiffStatus::NoRUVOverlap => {
                error!("Replication Critical - Consumers RUV has desynchronised and diverged! This must be immediately investigated!");
                debug!(consumer_ranges = ?ctx_ranges);
                debug!(supplier_ranges = ?our_ranges);
                return Ok(ReplIncrementalContext::UnwillingToSupply);
            }
        };

        debug!("these ranges will be supplied");
        debug!(supply_ranges = ?ranges);
        debug!(consumer_ranges = ?ctx_ranges);
        debug!(supplier_ranges = ?our_ranges);

        if ranges.is_empty() {
            debug!("No Changes Available");
            return Ok(ReplIncrementalContext::NoChangesAvailable);
        }

        // From the set of change id's, fetch those entries.
        // This is done by supplying the ranges to the be which extracts
        // the entries affected by the idls in question.
        let entries = self.get_be_txn().retrieve_range(&ranges).map_err(|e| {
            admin_error!(?e, "backend failure");
            OperationError::Backend
        })?;

        // Separate the entries into schema, meta and remaining.
        // NOTE(review): the context carries these as separate fields,
        // presumably so the consumer can apply schema first - confirm against
        // the consumer side.
        let (schema_entries, rem_entries): (Vec<_>, Vec<_>) = entries.into_iter().partition(|e| {
            e.get_ava_set(Attribute::Class)
                .map(|cls| {
                    cls.contains(&EntryClass::AttributeType.into() as &PartialValue)
                        || cls.contains(&EntryClass::ClassType.into() as &PartialValue)
                })
                .unwrap_or(false)
        });

        let (meta_entries, entries): (Vec<_>, Vec<_>) = rem_entries.into_iter().partition(|e| {
            e.get_ava_set(Attribute::Class)
                .map(|cls| {
                    cls.contains(&EntryClass::DomainInfo.into() as &PartialValue)
                        || cls.contains(&EntryClass::SystemInfo.into() as &PartialValue)
                        || cls.contains(&EntryClass::SystemConfig.into() as &PartialValue)
                        || cls.contains(&EntryClass::KeyProvider.into() as &PartialValue)
                })
                .unwrap_or(false)
        });

        trace!(?schema_entries);
        trace!(?meta_entries);
        trace!(?entries);

        // For each entry, determine the changes that exist on the entry that fall
        // into the ruv range - reduce to a incremental set of changes.

        let schema = self.get_schema();
        let domain_version = self.d_info.d_vers;
        // A devel-tainted domain advertises the maximum patch level so that
        // it is never treated as behind a release build.
        let domain_patch_level = if self.d_info.d_devel_taint {
            u32::MAX
        } else {
            self.d_info.d_patch_level
        };
        let domain_uuid = self.d_info.d_uuid;

        let schema_entries: Vec<_> = schema_entries
            .into_iter()
            .map(|e| ReplIncrementalEntryV1::new(e.as_ref(), schema, &ranges))
            .collect();

        let meta_entries: Vec<_> = meta_entries
            .into_iter()
            .map(|e| ReplIncrementalEntryV1::new(e.as_ref(), schema, &ranges))
            .collect();

        let entries: Vec<_> = entries
            .into_iter()
            .map(|e| ReplIncrementalEntryV1::new(e.as_ref(), schema, &ranges))
            .collect();

        // Finally, populate the ranges with anchors from the RUV
        let supplier_ruv = self.get_be_txn().get_ruv();
        let ranges = supplier_ruv.get_anchored_ranges(ranges)?;

        // Build the incremental context.
        Ok(ReplIncrementalContext::V1 {
            domain_version,
            domain_patch_level,
            domain_uuid,
            ranges,
            schema_entries,
            meta_entries,
            entries,
        })
    }

    /// Produce a full refresh context containing every replicable entry,
    /// for a consumer that is (re)initialising from this supplier.
    ///
    /// Entries are partitioned into schema, meta and remaining sets (the
    /// same partitioning as the incremental path), and the supplied RUV
    /// ranges are anchored so the consumer can resume incrementally after
    /// the refresh.
    #[instrument(level = "debug", skip_all)]
    pub fn supplier_provide_refresh(&mut self) -> Result<ReplRefreshContext, OperationError> {
        // Get the current schema. We use this for attribute and entry filtering.
        let schema = self.get_schema();

        // A refresh must provide
        //
        // * the current domain version
        let domain_version = self.d_info.d_vers;
        let domain_devel = self.d_info.d_devel_taint;
        let domain_uuid = self.d_info.d_uuid;

        let trim_cid = self.trim_cid().clone();

        // What is the set of data we are providing?
        let ranges = self
            .get_be_txn()
            .get_ruv()
            .filter_ruv_range(&trim_cid)
            .map_err(|e| {
                error!(err = ?e, "Unable to access supplier RUV range");
                e
            })?;

        // * the domain uuid
        // * the set of schema entries
        // * the set of non-schema entries
        // - We must exclude certain entries and attributes!
        //   * schema defines what we exclude!

        let schema_filter_inner = f_or!([
            f_eq(Attribute::Class, EntryClass::AttributeType.into()),
            f_eq(Attribute::Class, EntryClass::ClassType.into()),
        ]);

        let schema_filter = filter!(schema_filter_inner.clone());

        let meta_filter_inner = f_or!([
            f_eq(Attribute::Class, EntryClass::DomainInfo.into()),
            f_eq(Attribute::Class, EntryClass::SystemInfo.into()),
            f_eq(Attribute::Class, EntryClass::SystemConfig.into()),
            f_eq(Attribute::Class, EntryClass::KeyProvider.into()),
        ]);

        let meta_filter = filter!(meta_filter_inner.clone());

        // Everything that is neither schema nor meta, plus tombstones and
        // recycled entries which must also be replicated.
        let entry_filter = filter_all!(f_or!([
            f_and!([
                f_pres(Attribute::Class),
                f_andnot(f_or(vec![schema_filter_inner, meta_filter_inner])),
            ]),
            f_eq(Attribute::Class, EntryClass::Tombstone.into()),
            f_eq(Attribute::Class, EntryClass::Recycled.into()),
        ]));

        let schema_entries = self
            .internal_search(schema_filter)
            .map(|ent| {
                ent.into_iter()
                    .map(|e| ReplEntryV1::new(e.as_ref(), schema))
                    .collect()
            })
            .inspect_err(|err| {
                error!(?err, "Failed to access schema entries");
            })?;

        let meta_entries = self
            .internal_search(meta_filter)
            .map(|ent| {
                ent.into_iter()
                    .map(|e| ReplEntryV1::new(e.as_ref(), schema))
                    .collect()
            })
            .inspect_err(|err| {
                error!(?err, "Failed to access meta entries");
            })?;

        let entries = self
            .internal_search(entry_filter)
            .map(|ent| {
                ent.into_iter()
                    .map(|e| ReplEntryV1::new(e.as_ref(), schema))
                    .collect()
            })
            .inspect_err(|err| {
                error!(?err, "Failed to access entries");
            })?;

        // Finally, populate the ranges with anchors from the RUV
        let supplier_ruv = self.get_be_txn().get_ruv();
        let ranges = supplier_ruv.get_anchored_ranges(ranges)?;

        Ok(ReplRefreshContext::V1 {
            domain_version,
            domain_devel,
            domain_uuid,
            ranges,
            schema_entries,
            meta_entries,
            entries,
        })
    }
}