kanidmd_lib/plugins/attrunique.rs

// Attribute uniqueness plugin. We read the schema and determine if the
// value should be unique, and how to handle it if it is not. This will
// matter a lot when it comes to replication based on first-wins or
// both-change approaches.
//
use std::collections::VecDeque;
use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;

use tracing::trace;

use crate::event::{CreateEvent, ModifyEvent};
use crate::plugins::Plugin;
use crate::prelude::*;
use crate::schema::SchemaTransaction;

pub struct AttrUnique;

fn get_cand_attr_set<'a, VALID: 'a, STATE: 'a, T>(
    // cand: &[Entry<VALID, STATE>],
    cand: T,
    uniqueattrs: &[Attribute],
) -> Result<BTreeMap<(Attribute, PartialValue), Vec<Uuid>>, OperationError>
where
    T: IntoIterator<Item = &'a Entry<VALID, STATE>>,
{
    let mut cand_attr: BTreeMap<(Attribute, PartialValue), Vec<Uuid>> = BTreeMap::new();

    cand.into_iter()
        // We don't need to consider recycled or tombstoned entries.
        .filter_map(|e| e.mask_recycled_ts())
        .try_for_each(|e| {
            let uuid = e
                .get_ava_single_uuid(Attribute::Uuid)
                .ok_or_else(|| {
                    error!("An entry is missing its uuid. This should be impossible!");
                    OperationError::InvalidEntryState
                })?;

            // Faster to iterate over the attr vec inside this loop.
            for attr in uniqueattrs.iter() {
                if let Some(vs) = e.get_ava_set(attr) {
                    for pv in vs.to_partialvalue_iter() {
                        let key = (attr.clone(), pv);
                        cand_attr
                            .entry(key)
                            // Must have conflicted, let's append.
                            .and_modify(|v| {
                                warn!(
                                    "ava already exists -> {:?} on entry {:?} has conflicts within the change set",
                                    attr,
                                    e.get_display_id()
                                );
                                v.push(uuid)
                            })
                            // Not found, let's set it up.
                            .or_insert_with(|| vec![uuid]);
                    }
                }
            }

            Ok(())
        })
        .map(|()| cand_attr)
}
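
// A minimal, std-only sketch (test-only, hypothetical types) of the grouping
// pattern `get_cand_attr_set` relies on above: every (attribute, value) pair
// maps to the list of entry uuids asserting it, and any key whose list grows
// beyond one element is a uniqueness conflict. Plain strings and integers
// stand in for `Attribute`, `PartialValue` and `Uuid` here.
#[cfg(test)]
mod cand_attr_set_sketch {
    use std::collections::BTreeMap;

    #[test]
    fn duplicate_keys_are_detected() {
        // (attr, value) -> uuids, mirroring BTreeMap<(Attribute, PartialValue), Vec<Uuid>>.
        let mut cand_attr: BTreeMap<(&str, &str), Vec<u64>> = BTreeMap::new();

        for (uuid, attr, value) in [
            (1, "name", "testperson"),
            (2, "name", "testperson"),
            (3, "name", "other"),
        ] {
            cand_attr
                .entry((attr, value))
                // Key already present - another entry claims the same value.
                .and_modify(|v| v.push(uuid))
                // First time this (attr, value) pair has been seen.
                .or_insert_with(|| vec![uuid]);
        }

        // Keys with more than one uuid are the conflicts within the change set.
        let conflicts: Vec<_> = cand_attr
            .iter()
            .filter(|(_, uuids)| uuids.len() > 1)
            .map(|(key, _)| *key)
            .collect();

        assert_eq!(conflicts, vec![("name", "testperson")]);
    }
}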

fn enforce_unique<VALID, STATE>(
    qs: &mut QueryServerWriteTransaction,
    cand: &[Entry<VALID, STATE>],
) -> Result<(), OperationError> {
    let uniqueattrs = {
        let schema = qs.get_schema();
        schema.get_attributes_unique()
    };

    // Build a set of all the value -> uuid for the cands.
    // If already exist, reject due to dup.
    let cand_attr_set = get_cand_attr_set(cand, uniqueattrs).map_err(|e| {
        error!(err = ?e, "failed to get cand attr set");
        e
    })?;

    // No candidates to check!
    if cand_attr_set.is_empty() {
        return Ok(());
    }

    // Now we have to identify and error on anything that has multiple items.
    let mut cand_attr = Vec::with_capacity(cand_attr_set.len());
    let mut err = false;
    for (key, mut uuid_set) in cand_attr_set.into_iter() {
        if let Some(uuid) = uuid_set.pop() {
            if uuid_set.is_empty() {
                // Good, only single uuid, this can proceed.
                cand_attr.push((key, uuid));
            } else {
                // Multiple uuid(s) may remain, this is a conflict. We already warned on it
                // before in the processing. Do we need to warn again?
                err = true;
            }
        } else {
            // Corrupt? How did we even get here?
            warn!("datastructure corruption occurred while processing candidate attribute set");
            debug_assert!(false);
            return Err(OperationError::Plugin(PluginError::AttrUnique(
                "corruption detected".to_string(),
            )));
        }
    }

    if err {
        return Err(OperationError::Plugin(PluginError::AttrUnique(
            "duplicate value detected".to_string(),
        )));
    }

    // Now do an internal search on name and !uuid for each
    let mut cand_filters = Vec::with_capacity(0);
    for ((attr, v), uuid) in cand_attr.iter() {
        // and[ attr eq k, andnot [ uuid eq v ]]
        // Basically this says where name but also not self.
        cand_filters.push(f_and(vec![
            FC::Eq(attr.clone(), v.clone()),
            f_andnot(FC::Eq(Attribute::Uuid, PartialValue::Uuid(*uuid))),
        ]));
    }

    // Or
    let filt_in = filter!(f_or(cand_filters.clone()));

    trace!(?filt_in);

    // If any results, reject.
    let conflict_cand = qs.internal_exists(&filt_in).inspect_err(|err| {
        error!(?err, "internal exists error");
    })?;

    // TODO! Need to make this show what conflicted!
    // We can probably bisect over the filter to work this out?

    if conflict_cand {
        // Some kind of conflict exists. We need to isolate which parts of the filter were suspect.
        // To do this, we bisect over the filter and its suspect elements.
        //
        // In most cases there is likely only 1 suspect element. But in some there are more. To make
        // this process faster we "bisect" over chunks of the remaining filter until we have only
        // single elements left.
        //
        // We bisect rather than doing a linear one-at-a-time search because we want to minimise
        // calls through internal exists, since each one has a filter resolve and validate step.
        // (A standalone, simplified illustration of this halving strategy is in the
        // `filter_bisect_sketch` test module below.)

        // Fast-ish path. There is only 0 or 1 element, so we can report it directly rather
        // than bisecting.
        if cand_filters.len() < 2 {
            error!(
                ?cand_filters,
                "The following filter conditions failed to assert uniqueness"
            );
        } else {
            // First iteration, we already failed and we know that, so we just prime and set up
            // two chunks here.

            let mid = cand_filters.len() / 2;
            let (left, right) = cand_filters.split_at(mid);

            let mut queue = VecDeque::new();
            queue.push_back(left);
            queue.push_back(right);

            // Ok! We are set up to go.

            while let Some(cand_query) = queue.pop_front() {
                let filt_in = filter!(f_or(cand_query.to_vec()));
                let conflict_cand = qs.internal_search(filt_in).map_err(|e| {
                    admin_error!("internal search error {:?}", e);
                    e
                })?;

                // A conflict was found!
                if let Some(conflict_cand_zero) = conflict_cand.first() {
                    if cand_query.len() >= 2 {
                        // Continue to split to isolate.
                        let mid = cand_query.len() / 2;
                        let (left, right) = cand_query.split_at(mid);
                        queue.push_back(left);
                        queue.push_back(right);
                        // Continue!
                    } else {
                        // Report this as a failing query.
                        error!(
                            cand_filters = ?cand_query,
                            conflicting_with = %conflict_cand_zero.get_display_id(),
                            "The following filter conditions failed to assert uniqueness"
                        );
                    }
                }
            }
            // End logging / warning iterator
        }

        Err(OperationError::Plugin(PluginError::AttrUnique(
            "duplicate value detected".to_string(),
        )))
    } else {
        // If all okay, okay!
        Ok(())
    }
}
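
// A minimal, self-contained sketch of the bisection strategy used in
// `enforce_unique` above, with plain integers standing in for filter clauses
// and a test-only predicate standing in for the internal exists call (both
// hypothetical). Any chunk that still reports a conflict is halved and
// requeued, so conflicting elements are isolated with far fewer checks than
// a linear one-at-a-time scan would need.
#[cfg(test)]
mod filter_bisect_sketch {
    use std::collections::VecDeque;

    #[test]
    fn bisection_isolates_conflicting_elements() {
        // Negative numbers stand in for filter clauses that match a conflict.
        let cand: Vec<i64> = vec![1, 2, -3, 4, 5, -6, 7, 8];
        let has_conflict = |chunk: &[i64]| chunk.iter().any(|v| *v < 0);

        let mut isolated = Vec::new();
        let mut queue: VecDeque<&[i64]> = VecDeque::new();

        // Prime the queue with two halves, as enforce_unique does after the
        // initial combined check has already failed.
        let mid = cand.len() / 2;
        let (left, right) = cand.split_at(mid);
        queue.push_back(left);
        queue.push_back(right);

        while let Some(chunk) = queue.pop_front() {
            if has_conflict(chunk) {
                if chunk.len() >= 2 {
                    // Still ambiguous - split and requeue both halves.
                    let mid = chunk.len() / 2;
                    let (left, right) = chunk.split_at(mid);
                    queue.push_back(left);
                    queue.push_back(right);
                } else {
                    // A single conflicting element has been isolated.
                    isolated.push(chunk[0]);
                }
            }
        }

        isolated.sort_unstable();
        assert_eq!(isolated, vec![-6, -3]);
    }
}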

impl Plugin for AttrUnique {
    fn id() -> &'static str {
        "plugin_attrunique"
    }

    #[instrument(level = "debug", name = "attrunique_pre_create_transform", skip_all)]
    fn pre_create_transform(
        qs: &mut QueryServerWriteTransaction,
        cand: &mut Vec<Entry<EntryInvalid, EntryNew>>,
        _ce: &CreateEvent,
    ) -> Result<(), OperationError> {
        enforce_unique(qs, cand)
    }

    #[instrument(level = "debug", name = "attrunique_pre_modify", skip_all)]
    fn pre_modify(
        qs: &mut QueryServerWriteTransaction,
        _pre_cand: &[Arc<EntrySealedCommitted>],
        cand: &mut Vec<Entry<EntryInvalid, EntryCommitted>>,
        _me: &ModifyEvent,
    ) -> Result<(), OperationError> {
        enforce_unique(qs, cand)
    }

    #[instrument(level = "debug", name = "attrunique_pre_batch_modify", skip_all)]
    fn pre_batch_modify(
        qs: &mut QueryServerWriteTransaction,
        _pre_cand: &[Arc<EntrySealedCommitted>],
        cand: &mut Vec<Entry<EntryInvalid, EntryCommitted>>,
        _me: &BatchModifyEvent,
    ) -> Result<(), OperationError> {
        enforce_unique(qs, cand)
    }

    #[instrument(level = "debug", name = "attrunique_pre_repl_refresh", skip_all)]
    fn pre_repl_refresh(
        qs: &mut QueryServerWriteTransaction,
        cand: &[EntryRefreshNew],
    ) -> Result<(), OperationError> {
        enforce_unique(qs, cand)
    }

    #[instrument(level = "debug", name = "attrunique_post_repl_incremental", skip_all)]
    fn post_repl_incremental_conflict(
        qs: &mut QueryServerWriteTransaction,
        cand: &[(EntrySealedCommitted, Arc<EntrySealedCommitted>)],
        conflict_uuids: &mut BTreeSet<Uuid>,
    ) -> Result<(), OperationError> {
        // We need to detect attribute uniqueness violations here. This can *easily* happen in
        // replication since we have two nodes where different entries can modify an attribute,
        // and on the next incremental replication the uniqueness violation occurs.
        //
        // Because of this we have some key properties that we can observe.
        //
        // Every node, when it makes a change with regard to its own content, is already compliant
        // with attribute uniqueness. This means the consumer's db content before we begin is
        // fully consistent.
        //
        // As attributes can be updated multiple times before they are replicated, the cid of the
        // attribute may not be a true reflection of the order of events when considering which
        // attribute-value should survive or conflict.
        //
        // Attribute uniqueness constraints can *only* be violated on entries that have been
        // replicated or are involved in replication (e.g. a conflict survivor entry).
        //
        // The content of the cand set may contain both replicated entries and conflict survivors
        // that are in the process of being updated. Entries within the cand set *may* be in
        // a conflict state with each other.
        //
        // Since this is a post operation, the content of these cand entries is *also* current
        // in the database.
        //
        // This means that:
        // * We can build a set of attr unique queries from the cand set.
        // * We can ignore conflicts while building that set.
        // * Any conflicts detected in the DB on executing that filter would be a superset of the
        //   conflicts that exist in reality.
        // * All entries that are involved in the attr unique collision must become conflicts.

        let uniqueattrs = {
            let schema = qs.get_schema();
            schema.get_attributes_unique()
        };

        // Build a set of all the value -> uuid for the cands.
        // If already exist, reject due to dup.
        let cand_attr_set =
            get_cand_attr_set(cand.iter().map(|(e, _)| e), uniqueattrs).map_err(|e| {
                error!(err = ?e, "failed to get cand attr set");
                e
            })?;

        // No candidates to check!
        if cand_attr_set.is_empty() {
            return Ok(());
        }

        // HAPPY FAST PATH - we do the fast existence query and if it passes
        // we can *proceed*, nothing has conflicted.
        let cand_filters: Vec<_> = cand_attr_set
            .iter()
            .flat_map(|((attr, v), uuids)| {
                uuids.iter().map(|uuid| {
                    // and[ attr eq k, andnot [ uuid eq v ]]
                    // Basically this says where name but also not self.
                    f_and(vec![
                        FC::Eq(attr.clone(), v.clone()),
                        f_andnot(FC::Eq(Attribute::Uuid, PartialValue::Uuid(*uuid))),
                    ])
                })
            })
            .collect();

        let filt_in = filter!(f_or(cand_filters));

        trace!(?filt_in);

        // If any results, reject.
        let conflict_cand = qs.internal_exists(&filt_in).inspect_err(|err| {
            error!(?err, "internal exists error");
        })?;

        if conflict_cand {
            // Unlike enforce_unique, we need to be more thorough here. enforce_unique
            // just has to block the whole operation. We *can't* fail the operation
            // in the same way; we need to individually isolate each collision to
            // turn all the involved entries into conflicts. Because of this, rather
            // than bisecting as we do in enforce_unique to find violating entries
            // for admins to read, we need to isolate each and every conflicting
            // uuid. To achieve this we change the structure of the query we perform
            // to get everything that currently has a conflict.

            // For each uuid, show the set of uuids this conflicts with.
            let mut conflict_uuid_map: BTreeMap<Uuid, BTreeSet<Uuid>> = BTreeMap::new();

            // We need to invert this now to have a map of uuid -> set of (attr, pv),
            // rather than the other direction which was optimised for the detection of
            // candidate conflicts during updates.

            let mut cand_attr_map: BTreeMap<Uuid, BTreeSet<_>> = BTreeMap::new();

            cand_attr_set.into_iter().for_each(|(key, uuids)| {
                uuids.into_iter().for_each(|uuid| {
                    cand_attr_map
                        .entry(uuid)
                        .and_modify(|set| {
                            set.insert(key.clone());
                        })
                        .or_insert_with(|| {
                            let mut set = BTreeSet::new();
                            set.insert(key.clone());
                            set
                        });
                })
            });

            for (uuid, ava_set) in cand_attr_map.into_iter() {
                let cand_filters: Vec<_> = ava_set
                    .iter()
                    .map(|(attr, pv)| {
                        f_and(vec![
                            FC::Eq(attr.clone(), pv.clone()),
                            f_andnot(FC::Eq(Attribute::Uuid, PartialValue::Uuid(uuid))),
                        ])
                    })
                    .collect();

                let filt_in = filter!(f_or(cand_filters.clone()));

                let filt_conflicts = qs.internal_search(filt_in).map_err(|e| {
                    admin_error!("internal search error {:?}", e);
                    e
                })?;

                // Important! This needs to conflict in *both directions*. We have to
                // indicate that uuid has been conflicted by the entries in filt_conflicts,
                // but also that the entries in filt_conflicts now conflict with us! Also remember
                // that entries in either direction *may already* be in the conflict map, so we
                // need to be very careful here not to stomp anything - append only!
                // (A standalone sketch of this bidirectional recording is in the
                // `conflict_map_sketch` test module at the end of this file.)
                if !filt_conflicts.is_empty() {
                    let mut conflict_uuid_set = BTreeSet::new();

                    for e in filt_conflicts {
                        // Mark that this entry conflicts with us.
                        conflict_uuid_set.insert(e.get_uuid());
                        // Mark that the entry needs to conflict against us.
                        conflict_uuid_map
                            .entry(e.get_uuid())
                            .and_modify(|set| {
                                set.insert(uuid);
                            })
                            .or_insert_with(|| {
                                let mut set = BTreeSet::new();
                                set.insert(uuid);
                                set
                            });
                    }

                    conflict_uuid_map
                        .entry(uuid)
                        .and_modify(|set| set.append(&mut conflict_uuid_set))
                        .or_insert_with(|| conflict_uuid_set);
                }
            }

            trace!(?conflict_uuid_map);

            if conflict_uuid_map.is_empty() {
                error!("Impossible state. Attribute unique conflicts were detected in fast path, but were not found in slow path.");
                return Err(OperationError::InvalidState);
            }

            // Now get all these values out with modify writable.

            let filt = filter!(FC::Or(
                conflict_uuid_map
                    .keys()
                    .map(|u| f_eq(Attribute::Uuid, PartialValue::Uuid(*u)))
                    .collect()
            ));

            let mut work_set = qs.internal_search_writeable(&filt)?;

            for (_, entry) in work_set.iter_mut() {
                let Some(uuid) = entry.get_uuid() else {
                    error!("Impossible state. Entry that was declared in conflict map does not have a uuid.");
                    return Err(OperationError::InvalidState);
                };

                // Add the uuid to the conflict uuids now.
                conflict_uuids.insert(uuid);

                if let Some(conflict_uuid_set) = conflict_uuid_map.get(&uuid) {
                    entry.to_conflict(conflict_uuid_set.iter().copied())
                } else {
                    error!("Impossible state. Entry that was declared in conflict map was not present in work set.");
                    return Err(OperationError::InvalidState);
                }
            }

            qs.internal_apply_writable(work_set).map_err(|e| {
                admin_error!("Failed to commit attrunique conflict set {:?}", e);
                e
            })?;

            // Okay, we *finally* got here. We are done!
            Ok(())
        } else {
            // 🎉
            Ok(())
        }
    }

    #[instrument(level = "debug", name = "attrunique::verify", skip_all)]
    fn verify(qs: &mut QueryServerReadTransaction) -> Vec<Result<(), ConsistencyError>> {
        // Only check live entries, not recycled.
        let filt_in = filter!(f_pres(Attribute::Class));

        let all_cand = match qs
            .internal_search(filt_in)
            .map_err(|_| Err(ConsistencyError::QueryServerSearchFailure))
        {
            Ok(all_cand) => all_cand,
            Err(e) => return vec![e],
        };

        let all_cand: Vec<_> = all_cand.into_iter().map(|e| e.as_ref().clone()).collect();

        let uniqueattrs = {
            let schema = qs.get_schema();
            schema.get_attributes_unique()
        };

        let mut res: Vec<Result<(), ConsistencyError>> = Vec::with_capacity(0);

        if get_cand_attr_set(&all_cand, uniqueattrs).is_err() {
            res.push(Err(ConsistencyError::DuplicateUniqueAttribute))
        }

        trace!(?res);

        res
    }
}
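
// A minimal, std-only sketch (hypothetical u32 ids) of the bidirectional
// conflict recording performed in `post_repl_incremental_conflict` above:
// when entry A is found to collide with entries B and C, the map must record
// B and C against A *and* A against each of B and C, appending to any sets
// that already exist rather than replacing them.
#[cfg(test)]
mod conflict_map_sketch {
    use std::collections::{BTreeMap, BTreeSet};

    #[test]
    fn conflicts_are_recorded_in_both_directions() {
        let mut conflict_uuid_map: BTreeMap<u32, BTreeSet<u32>> = BTreeMap::new();

        // Entry 1 was found to conflict with entries 2 and 3.
        let uuid = 1_u32;
        let found_conflicts = [2_u32, 3_u32];

        let mut conflict_uuid_set = BTreeSet::new();
        for other in found_conflicts {
            // The other entry conflicts with us ...
            conflict_uuid_set.insert(other);
            // ... and we must also conflict against the other entry. Append only!
            conflict_uuid_map.entry(other).or_default().insert(uuid);
        }

        // Record our own side of every discovered collision.
        conflict_uuid_map
            .entry(uuid)
            .and_modify(|set| set.append(&mut conflict_uuid_set))
            .or_insert_with(|| conflict_uuid_set);

        // Every edge is present in both directions.
        assert!(conflict_uuid_map[&1].contains(&2));
        assert!(conflict_uuid_map[&1].contains(&3));
        assert!(conflict_uuid_map[&2].contains(&1));
        assert!(conflict_uuid_map[&3].contains(&1));
    }
}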

#[cfg(test)]
mod tests {
    use crate::prelude::*;

    // Test entry in db, and same name, reject.
    #[test]
    fn test_pre_create_name_unique() {
        let e: Entry<EntryInit, EntryNew> = entry_init!(
            (Attribute::Class, EntryClass::Person.to_value()),
            (Attribute::Class, EntryClass::Account.to_value()),
            (Attribute::Name, Value::new_iname("testperson")),
            (Attribute::Description, Value::new_utf8s("testperson")),
            (Attribute::DisplayName, Value::new_utf8s("testperson"))
        );

        let create = vec![e.clone()];
        let preload = vec![e];

        run_create_test!(
            Err(OperationError::Plugin(PluginError::AttrUnique(
                "duplicate value detected".to_string()
            ))),
            preload,
            create,
            None,
            |_| {}
        );
    }

    // Test two entries in create that would have same name, reject.
    #[test]
    fn test_pre_create_name_unique_2() {
        let e: Entry<EntryInit, EntryNew> = entry_init!(
            (Attribute::Class, EntryClass::Person.to_value()),
            (Attribute::Class, EntryClass::Account.to_value()),
            (Attribute::Name, Value::new_iname("testperson")),
            (Attribute::Description, Value::new_utf8s("testperson")),
            (Attribute::DisplayName, Value::new_utf8s("testperson"))
        );

        let create = vec![e.clone(), e];
        let preload = Vec::with_capacity(0);

        run_create_test!(
            Err(OperationError::Plugin(PluginError::AttrUnique(
                "ava already exists".to_string()
            ))),
            preload,
            create,
            None,
            |_| {}
        );
    }

    // Remember, an entry can't have a duplicate value within itself so we don't need to
    // test this case.

    // A mod to something that exists, reject.
    #[test]
    fn test_pre_modify_name_unique() {
        let ea: Entry<EntryInit, EntryNew> = entry_init!(
            (Attribute::Class, EntryClass::Group.to_value()),
            (Attribute::Name, Value::new_iname("testgroup_a")),
            (Attribute::Description, Value::new_utf8s("testgroup"))
        );
        let eb: Entry<EntryInit, EntryNew> = entry_init!(
            (Attribute::Class, EntryClass::Group.to_value()),
            (Attribute::Name, Value::new_iname("testgroup_b")),
            (Attribute::Description, Value::new_utf8s("testgroup"))
        );

        let preload = vec![ea, eb];

        run_modify_test!(
            Err(OperationError::Plugin(PluginError::AttrUnique(
                "duplicate value detected".to_string()
            ))),
            preload,
            filter!(f_or!([f_eq(
                Attribute::Name,
                PartialValue::new_iname("testgroup_b")
            ),])),
            ModifyList::new_list(vec![
                Modify::Purged(Attribute::Name),
                Modify::Present(Attribute::Name, Value::new_iname("testgroup_a"))
            ]),
            None,
            |_| {},
            |_| {}
        );
    }

    // Two items modded to have the same value, reject.
    #[test]
    fn test_pre_modify_name_unique_2() {
        let ea: Entry<EntryInit, EntryNew> = entry_init!(
            (Attribute::Class, EntryClass::Group.to_value()),
            (Attribute::Name, Value::new_iname("testgroup_a")),
            (Attribute::Description, Value::new_utf8s("testgroup"))
        );
        let eb: Entry<EntryInit, EntryNew> = entry_init!(
            (Attribute::Class, EntryClass::Group.to_value()),
            (Attribute::Name, Value::new_iname("testgroup_b")),
            (Attribute::Description, Value::new_utf8s("testgroup"))
        );

        let preload = vec![ea, eb];

        run_modify_test!(
            Err(OperationError::Plugin(PluginError::AttrUnique(
                "ava already exists".to_string()
            ))),
            preload,
            filter!(f_or!([
                f_eq(Attribute::Name, PartialValue::new_iname("testgroup_a")),
                f_eq(Attribute::Name, PartialValue::new_iname("testgroup_b")),
            ])),
            ModifyList::new_list(vec![
                Modify::Purged(Attribute::Name),
                Modify::Present(Attribute::Name, Value::new_iname("testgroup"))
            ]),
            None,
            |_| {},
            |_| {}
        );
    }

    #[test]
    fn test_verify_name_unique() {
        // Can we preload two dups and verify to show we detect?
    }
}