kanidmd_lib/plugins/attrunique.rs

// Attribute uniqueness plugin. We read the schema to determine whether a
// value must be unique, and how to handle it when it is not. This will
// matter a lot when it comes to replication, whether conflicts are resolved
// with a first-wins or a both-change approach.
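//
// An illustrative example (the attribute and values are invented, not taken
// from this file): if the schema marks `name` as unique, then a change set
// containing two entries that both assert `name=alice`, or one entry asserting
// a `name` that an existing database entry already holds, must be rejected
// before the write proceeds. The helpers below implement that check.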
use std::collections::VecDeque;
use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;

use tracing::trace;

use crate::event::{CreateEvent, ModifyEvent};
use crate::plugins::Plugin;
use crate::prelude::*;
use crate::schema::SchemaTransaction;

pub struct AttrUnique;

fn get_cand_attr_set<'a, VALID: 'a, STATE: 'a, T>(
    // cand: &[Entry<VALID, STATE>],
    cand: T,
    uniqueattrs: &[Attribute],
) -> Result<BTreeMap<(Attribute, PartialValue), Vec<Uuid>>, OperationError>
where
    T: IntoIterator<Item = &'a Entry<VALID, STATE>>,
{
    let mut cand_attr: BTreeMap<(Attribute, PartialValue), Vec<Uuid>> = BTreeMap::new();

    cand.into_iter()
        // We don't need to consider recycled or tombstoned entries.
        .filter_map(|e| e.mask_recycled_ts())
        .try_for_each(|e| {
            let uuid = e
                .get_ava_single_uuid(Attribute::Uuid)
                .ok_or_else(|| {
                    error!("An entry is missing its uuid. This should be impossible!");
                    OperationError::InvalidEntryState
                })?;

            // Faster to iterate over the attr vec inside this loop.
            for attr in uniqueattrs.iter() {
                if let Some(vs) = e.get_ava_set(attr) {
                    for pv in vs.to_partialvalue_iter() {
                        let key = (attr.clone(), pv);
                        cand_attr
                            .entry(key)
                            // Must have conflicted, let's append.
                            .and_modify(|v| {
                                warn!(
                                    "ava already exists -> {:?} on entry {:?} has conflicts within change set",
                                    attr,
                                    e.get_display_id()
                                );
                                v.push(uuid)
                            })
                            // Not found, let's set it up.
                            .or_insert_with(|| vec![uuid]);
                    }
                }
            }

            Ok(())
        })
        .map(|()| cand_attr)
}
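
// Illustrative sketch of the map `get_cand_attr_set` returns (the names and uuid
// labels are invented, assuming `name` is schema-unique): two candidates that both
// claim the same value collapse onto a single key, and the length of the uuid vec
// is what reveals a conflict inside the change set.
//
//   {
//       (Attribute::Name, PartialValue::new_iname("alice")) => vec![uuid_a, uuid_b],
//       (Attribute::Name, PartialValue::new_iname("bob"))   => vec![uuid_c],
//   }
//
// `enforce_unique` below rejects any key whose vec holds more than one uuid as a
// duplicate within the change set itself.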

fn enforce_unique<VALID, STATE>(
    qs: &mut QueryServerWriteTransaction,
    cand: &[Entry<VALID, STATE>],
) -> Result<(), OperationError> {
    let uniqueattrs = {
        let schema = qs.get_schema();
        schema.get_attributes_unique()
    };

    // Build a set of all the value -> uuid pairs for the cands.
    // If any already exist, reject due to duplication.
    let cand_attr_set = get_cand_attr_set(cand, uniqueattrs).map_err(|e| {
        error!(err = ?e, "failed to get cand attr set");
        e
    })?;

    // No candidates to check!
    if cand_attr_set.is_empty() {
        return Ok(());
    }

    // Now we have to identify and error on anything that has multiple items.
    let mut cand_attr = Vec::with_capacity(cand_attr_set.len());
    let mut err = false;
    for (key, mut uuid_set) in cand_attr_set.into_iter() {
        if let Some(uuid) = uuid_set.pop() {
            if uuid_set.is_empty() {
                // Good, only a single uuid, this can proceed.
                cand_attr.push((key, uuid));
            } else {
                // Multiple uuids remain, this is a conflict. We already warned on it
                // earlier in the processing. Do we need to warn again?
                err = true;
            }
        } else {
            // Corrupt? How did we even get here?
            warn!("datastructure corruption occurred while processing candidate attribute set");
            debug_assert!(false);
            return Err(OperationError::Plugin(PluginError::AttrUnique(
                "corruption detected".to_string(),
            )));
        }
    }

    if err {
        return Err(OperationError::Plugin(PluginError::AttrUnique(
            "duplicate value detected".to_string(),
        )));
    }

    // Now do an internal search on attr and !uuid for each candidate.
    let mut cand_filters = Vec::with_capacity(0);
    for ((attr, v), uuid) in cand_attr.iter() {
        // and [ attr eq v, andnot [ uuid eq candidate uuid ] ]
        // Basically this says: the value is present, but not on the candidate itself.
        cand_filters.push(f_and(vec![
            FC::Eq(attr.clone(), v.clone()),
            f_andnot(FC::Eq(Attribute::Uuid, PartialValue::Uuid(*uuid))),
        ]));
    }
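
    // As an illustrative sketch (attribute, value and uuid invented): for a candidate
    // with uuid `u1` asserting `name=alice`, the clause pushed above is logically
    //
    //   And([ Eq(name, "alice"), AndNot(Eq(uuid, u1)) ])
    //
    // i.e. "some entry other than the candidate already holds this value".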

    // Or
    let filt_in = filter!(f_or(cand_filters.clone()));

    trace!(?filt_in);

    // If any results, reject.
    let conflict_cand = qs.internal_exists(filt_in).map_err(|e| {
        admin_error!("internal exists error {:?}", e);
        e
    })?;

    // TODO! Need to make this show what conflicted!
    // We can probably bisect over the filter to work this out?

    if conflict_cand {
        // Some kind of conflict exists. We need to isolate which parts of the filter were suspect.
        // To do this, we bisect over the filter and its suspect elements.
        //
        // In most cases there is likely only 1 suspect element. But in some there are more. To make
        // this process faster we "bisect" over chunks of the filter remaining until we have only single elements left.
        //
        // We do a bisect rather than a linear one-at-a-time search because we want to try to somewhat minimise calls
        // through internal exists since that has a filter resolve and validate step.
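        //
        // A worked example with invented sizes: with 8 suspect clauses and a single
        // real conflict, the queue holds two chunks of 4, then two of 2, then two of 1.
        // Only the chunks that still report a conflict are split further, so the
        // offending clause is isolated in about 2 * log2(8) = 6 chunk searches rather
        // than 8 single-clause searches, and the saving grows with the clause count.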

        // Fast-ish path. There is 0 or 1 element, so we just fast return.
        if cand_filters.len() < 2 {
            error!(
                ?cand_filters,
                "The following filter conditions failed to assert uniqueness"
            );
        } else {
            // First iteration, we already failed and we know that, so we just prime and set up two
            // chunks here.

            let mid = cand_filters.len() / 2;
            let (left, right) = cand_filters.split_at(mid);

            let mut queue = VecDeque::new();
            queue.push_back(left);
            queue.push_back(right);

            // Ok! We are set up to go.

            while let Some(cand_query) = queue.pop_front() {
                let filt_in = filter!(f_or(cand_query.to_vec()));
                let conflict_cand = qs.internal_search(filt_in).map_err(|e| {
                    admin_error!("internal search error {:?}", e);
                    e
                })?;

                // A conflict was found!
                if let Some(conflict_cand_zero) = conflict_cand.first() {
                    if cand_query.len() >= 2 {
                        // Continue to split to isolate.
                        let mid = cand_query.len() / 2;
                        let (left, right) = cand_query.split_at(mid);
                        queue.push_back(left);
                        queue.push_back(right);
                        // Continue!
                    } else {
                        // Report this as a failing query.
                        error!(
                            cand_filters = ?cand_query,
                            conflicting_with = %conflict_cand_zero.get_display_id(),
                            "The following filter conditions failed to assert uniqueness"
                        );
                    }
                }
            }
            // End logging / warning iterator
        }

        Err(OperationError::Plugin(PluginError::AttrUnique(
            "duplicate value detected".to_string(),
        )))
    } else {
        // If all okay, okay!
        Ok(())
    }
}

impl Plugin for AttrUnique {
    fn id() -> &'static str {
        "plugin_attrunique"
    }

    #[instrument(level = "debug", name = "attrunique_pre_create_transform", skip_all)]
    fn pre_create_transform(
        qs: &mut QueryServerWriteTransaction,
        cand: &mut Vec<Entry<EntryInvalid, EntryNew>>,
        _ce: &CreateEvent,
    ) -> Result<(), OperationError> {
        enforce_unique(qs, cand)
    }

    #[instrument(level = "debug", name = "attrunique_pre_modify", skip_all)]
    fn pre_modify(
        qs: &mut QueryServerWriteTransaction,
        _pre_cand: &[Arc<EntrySealedCommitted>],
        cand: &mut Vec<Entry<EntryInvalid, EntryCommitted>>,
        _me: &ModifyEvent,
    ) -> Result<(), OperationError> {
        enforce_unique(qs, cand)
    }

    #[instrument(level = "debug", name = "attrunique_pre_batch_modify", skip_all)]
    fn pre_batch_modify(
        qs: &mut QueryServerWriteTransaction,
        _pre_cand: &[Arc<EntrySealedCommitted>],
        cand: &mut Vec<Entry<EntryInvalid, EntryCommitted>>,
        _me: &BatchModifyEvent,
    ) -> Result<(), OperationError> {
        enforce_unique(qs, cand)
    }

    #[instrument(level = "debug", name = "attrunique_pre_repl_refresh", skip_all)]
    fn pre_repl_refresh(
        qs: &mut QueryServerWriteTransaction,
        cand: &[EntryRefreshNew],
    ) -> Result<(), OperationError> {
        enforce_unique(qs, cand)
    }

    #[instrument(level = "debug", name = "attrunique_post_repl_incremental", skip_all)]
    fn post_repl_incremental_conflict(
        qs: &mut QueryServerWriteTransaction,
        cand: &[(EntrySealedCommitted, Arc<EntrySealedCommitted>)],
        conflict_uuids: &mut BTreeSet<Uuid>,
    ) -> Result<(), OperationError> {
        // We need to detect attribute unique violations here. This can *easily* happen in
        // replication since we have two nodes where different entries can modify an attribute
        // and on the next incremental replication the uniqueness violation occurs.
        //
        // Because of this we have some key properties that we can observe.
        //
        // Every node, when it makes a change with regard to its own content, is already compliant
        // with attribute uniqueness. This means the consumer's db content before we begin is
        // fully consistent.
        //
        // As attributes can be updated multiple times before they are replicated, the cid of the
        // attribute may not be a true reflection of the order of events when considering which
        // attribute-value should survive/conflict.
        //
        // Attribute uniqueness constraints can *only* be violated on entries that have been
        // replicated or are involved in replication (e.g. a conflict survivor entry).
        //
        // The content of the cand set may contain both replicated entries and conflict survivors
        // that are in the process of being updated. Entries within the cand set *may* be in
        // a conflict state with each other.
        //
        // Since this is a post operation, the content of these cand entries is *also* current
        // in the database.
        //
        // This means that:
        // * We can build a set of attr unique queries from the cand set.
        // * We can ignore conflicts while building that set.
        // * Any conflicts detected in the DB on executing that filter would be a superset of the
        //   conflicts that exist in reality.
        // * All entries that are involved in the attr unique collision must become conflicts.
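        //
        // As an invented example: node A creates entry X with `name=alice` while node
        // B independently creates entry Y with the same `name=alice`. Each node is
        // internally consistent, but once B's content arrives here via incremental
        // replication the existence query below matches, and both X and Y have to be
        // turned into conflict entries and recorded in `conflict_uuids`.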

        let uniqueattrs = {
            let schema = qs.get_schema();
            schema.get_attributes_unique()
        };

        // Build a set of all the value -> uuid pairs for the cands.
        // If any already exist, reject due to duplication.
        let cand_attr_set =
            get_cand_attr_set(cand.iter().map(|(e, _)| e), uniqueattrs).map_err(|e| {
                error!(err = ?e, "failed to get cand attr set");
                e
            })?;

        // No candidates to check!
        if cand_attr_set.is_empty() {
            return Ok(());
        }

        // HAPPY FAST PATH - we do the fast existence query and if it passes
        // we can *proceed*, nothing has conflicted.
        let cand_filters: Vec<_> = cand_attr_set
            .iter()
            .flat_map(|((attr, v), uuids)| {
                uuids.iter().map(|uuid| {
                    // and [ attr eq v, andnot [ uuid eq candidate uuid ] ]
                    // Basically this says: the value is present, but not on the candidate itself.
                    f_and(vec![
                        FC::Eq(attr.clone(), v.clone()),
                        f_andnot(FC::Eq(Attribute::Uuid, PartialValue::Uuid(*uuid))),
                    ])
                })
            })
            .collect();

        let filt_in = filter!(f_or(cand_filters));

        trace!(?filt_in);

        // If any results, reject.
        let conflict_cand = qs.internal_exists(filt_in).map_err(|e| {
            admin_error!("internal exists error {:?}", e);
            e
        })?;

        if conflict_cand {
            // Unlike enforce_unique, we need to be more thorough here. enforce_unique
            // just has to block the whole operation. We *can't* fail the operation
            // in the same way; we need to individually isolate each collision to
            // turn all the involved entries into conflicts. Because of this, rather
            // than bisecting like we do in enforce_unique to find violating entries
            // for admins to read, we need to actually isolate each and every conflicting
            // uuid. To achieve this we need to change the structure of the query we perform
            // to actually get everything that has a conflict now.

            // For each uuid, record the set of uuids it conflicts with.
            let mut conflict_uuid_map: BTreeMap<Uuid, BTreeSet<Uuid>> = BTreeMap::new();

            // We need to invert the candidate set now to have a map of uuid -> set of (attr, pv),
            // rather than the other direction which was optimised for the detection of
            // candidate conflicts during updates.
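            //
            // Illustrative sketch of that inversion (values invented): an input entry of
            //
            //   (Attribute::Name, "alice") => vec![uuid_x, uuid_y]
            //
            // becomes
            //
            //   uuid_x => { (Attribute::Name, "alice") }
            //   uuid_y => { (Attribute::Name, "alice") }
            //
            // so that each uuid can be searched for its own conflicting values below.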

            let mut cand_attr_map: BTreeMap<Uuid, BTreeSet<_>> = BTreeMap::new();

            cand_attr_set.into_iter().for_each(|(key, uuids)| {
                uuids.into_iter().for_each(|uuid| {
                    cand_attr_map
                        .entry(uuid)
                        .and_modify(|set| {
                            set.insert(key.clone());
                        })
                        .or_insert_with(|| {
                            let mut set = BTreeSet::new();
                            set.insert(key.clone());
                            set
                        });
                })
            });

            for (uuid, ava_set) in cand_attr_map.into_iter() {
                let cand_filters: Vec<_> = ava_set
                    .iter()
                    .map(|(attr, pv)| {
                        f_and(vec![
                            FC::Eq(attr.clone(), pv.clone()),
                            f_andnot(FC::Eq(Attribute::Uuid, PartialValue::Uuid(uuid))),
                        ])
                    })
                    .collect();

                let filt_in = filter!(f_or(cand_filters.clone()));

                let filt_conflicts = qs.internal_search(filt_in).map_err(|e| {
                    admin_error!("internal search error {:?}", e);
                    e
                })?;

                // Important! This needs to conflict in *both directions*. We have to
                // indicate that uuid has been conflicted by the entries in filt_conflicts,
                // but also that the entries in filt_conflicts now conflict on us! Also remember
                // that entries in either direction *may already* be in the conflict map, so we
                // need to be very careful here not to stomp anything - append only!
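                //
                // For example (uuids invented): if `uuid` is A and the search returned
                // entries B and C, the map must end up holding A => {B, C}, B => {A}
                // and C => {A}, merged with anything already recorded under those keys.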
                if !filt_conflicts.is_empty() {
                    let mut conflict_uuid_set = BTreeSet::new();

                    for e in filt_conflicts {
                        // Mark that this entry conflicted against us.
                        conflict_uuid_set.insert(e.get_uuid());
                        // Mark that the entry needs to conflict against us.
                        conflict_uuid_map
                            .entry(e.get_uuid())
                            .and_modify(|set| {
                                set.insert(uuid);
                            })
                            .or_insert_with(|| {
                                let mut set = BTreeSet::new();
                                set.insert(uuid);
                                set
                            });
                    }

                    conflict_uuid_map
                        .entry(uuid)
                        .and_modify(|set| set.append(&mut conflict_uuid_set))
                        .or_insert_with(|| conflict_uuid_set);
                }
            }

            trace!(?conflict_uuid_map);

            if conflict_uuid_map.is_empty() {
                error!("Impossible state. Attribute unique conflicts were detected in the fast path, but were not found in the slow path.");
                return Err(OperationError::InvalidState);
            }

            // Now get all these entries out with a writable modify context.

            let filt = filter!(FC::Or(
                conflict_uuid_map
                    .keys()
                    .map(|u| f_eq(Attribute::Uuid, PartialValue::Uuid(*u)))
                    .collect()
            ));

            let mut work_set = qs.internal_search_writeable(&filt)?;

            for (_, entry) in work_set.iter_mut() {
                let Some(uuid) = entry.get_uuid() else {
                    error!("Impossible state. Entry that was declared in conflict map does not have a uuid.");
                    return Err(OperationError::InvalidState);
                };

                // Add the uuid to the conflict uuids now.
                conflict_uuids.insert(uuid);

                if let Some(conflict_uuid_set) = conflict_uuid_map.get(&uuid) {
                    entry.to_conflict(conflict_uuid_set.iter().copied())
                } else {
                    error!("Impossible state. Entry that was declared in conflict map was not present in work set.");
                    return Err(OperationError::InvalidState);
                }
            }

            qs.internal_apply_writable(work_set).map_err(|e| {
                admin_error!("Failed to commit attribute unique conflict set {:?}", e);
                e
            })?;

            // Okay, we *finally* got here. We are done!
            Ok(())
        } else {
            // 🎉
            Ok(())
        }
    }

    #[instrument(level = "debug", name = "attrunique::verify", skip_all)]
    fn verify(qs: &mut QueryServerReadTransaction) -> Vec<Result<(), ConsistencyError>> {
        // Only check live entries, not recycled.
        let filt_in = filter!(f_pres(Attribute::Class));

        let all_cand = match qs
            .internal_search(filt_in)
            .map_err(|_| Err(ConsistencyError::QueryServerSearchFailure))
        {
            Ok(all_cand) => all_cand,
            Err(e) => return vec![e],
        };

        let all_cand: Vec<_> = all_cand.into_iter().map(|e| e.as_ref().clone()).collect();

        let uniqueattrs = {
            let schema = qs.get_schema();
            schema.get_attributes_unique()
        };

        let mut res: Vec<Result<(), ConsistencyError>> = Vec::with_capacity(0);

        if get_cand_attr_set(&all_cand, uniqueattrs).is_err() {
            res.push(Err(ConsistencyError::DuplicateUniqueAttribute))
        }

        trace!(?res);

        res
    }
}

#[cfg(test)]
mod tests {
    use crate::prelude::*;

    // Test entry in db, and same name, reject.
    #[test]
    fn test_pre_create_name_unique() {
        let e: Entry<EntryInit, EntryNew> = entry_init!(
            (Attribute::Class, EntryClass::Person.to_value()),
            (Attribute::Class, EntryClass::Account.to_value()),
            (Attribute::Name, Value::new_iname("testperson")),
            (Attribute::Description, Value::new_utf8s("testperson")),
            (Attribute::DisplayName, Value::new_utf8s("testperson"))
        );

        let create = vec![e.clone()];
        let preload = vec![e];

        run_create_test!(
            Err(OperationError::Plugin(PluginError::AttrUnique(
                "duplicate value detected".to_string()
            ))),
            preload,
            create,
            None,
            |_| {}
        );
    }

    // Test two entries in create that would have the same name: reject.
    #[test]
    fn test_pre_create_name_unique_2() {
        let e: Entry<EntryInit, EntryNew> = entry_init!(
            (Attribute::Class, EntryClass::Person.to_value()),
            (Attribute::Class, EntryClass::Account.to_value()),
            (Attribute::Name, Value::new_iname("testperson")),
            (Attribute::Description, Value::new_utf8s("testperson")),
            (Attribute::DisplayName, Value::new_utf8s("testperson"))
        );

        let create = vec![e.clone(), e];
        let preload = Vec::with_capacity(0);

        run_create_test!(
            Err(OperationError::Plugin(PluginError::AttrUnique(
                "duplicate value detected".to_string()
            ))),
            preload,
            create,
            None,
            |_| {}
        );
    }

    // Remember, an entry can't have a duplicate value within itself so we don't need to
    // test this case.

    // A mod to something that exists, reject.
    #[test]
    fn test_pre_modify_name_unique() {
        let ea: Entry<EntryInit, EntryNew> = entry_init!(
            (Attribute::Class, EntryClass::Group.to_value()),
            (Attribute::Name, Value::new_iname("testgroup_a")),
            (Attribute::Description, Value::new_utf8s("testgroup"))
        );
        let eb: Entry<EntryInit, EntryNew> = entry_init!(
            (Attribute::Class, EntryClass::Group.to_value()),
            (Attribute::Name, Value::new_iname("testgroup_b")),
            (Attribute::Description, Value::new_utf8s("testgroup"))
        );

        let preload = vec![ea, eb];

        run_modify_test!(
            Err(OperationError::Plugin(PluginError::AttrUnique(
                "duplicate value detected".to_string()
            ))),
            preload,
            filter!(f_or!([f_eq(
                Attribute::Name,
                PartialValue::new_iname("testgroup_b")
            ),])),
            ModifyList::new_list(vec![
                Modify::Purged(Attribute::Name),
                Modify::Present(Attribute::Name, Value::new_iname("testgroup_a"))
            ]),
            None,
            |_| {},
            |_| {}
        );
    }

    // Two items modded to have the same value, reject.
    #[test]
    fn test_pre_modify_name_unique_2() {
        let ea: Entry<EntryInit, EntryNew> = entry_init!(
            (Attribute::Class, EntryClass::Group.to_value()),
            (Attribute::Name, Value::new_iname("testgroup_a")),
            (Attribute::Description, Value::new_utf8s("testgroup"))
        );
        let eb: Entry<EntryInit, EntryNew> = entry_init!(
            (Attribute::Class, EntryClass::Group.to_value()),
            (Attribute::Name, Value::new_iname("testgroup_b")),
            (Attribute::Description, Value::new_utf8s("testgroup"))
        );

        let preload = vec![ea, eb];

        run_modify_test!(
            Err(OperationError::Plugin(PluginError::AttrUnique(
                "duplicate value detected".to_string()
            ))),
            preload,
            filter!(f_or!([
                f_eq(Attribute::Name, PartialValue::new_iname("testgroup_a")),
                f_eq(Attribute::Name, PartialValue::new_iname("testgroup_b")),
            ])),
            ModifyList::new_list(vec![
                Modify::Purged(Attribute::Name),
                Modify::Present(Attribute::Name, Value::new_iname("testgroup"))
            ]),
            None,
            |_| {},
            |_| {}
        );
    }

    #[test]
    fn test_verify_name_unique() {
        // Can we preload two dups and verify to show we detect?
    }
}