kanidmd_lib/be/
mod.rs

1//! The backend. This contains the "low level" storage and query code, which is
2//! implemented as a json-like kv document database. This has no rules about content
3//! of the server, which are all enforced at higher levels. The role of the backend
4//! is to persist content safely to disk, load that content, and execute queries
5//! utilising indexes in the most effective way possible.
6
7use crate::be::dbentry::{DbBackup, DbEntry};
8use crate::be::dbrepl::DbReplMeta;
9use crate::entry::Entry;
10use crate::filter::{Filter, FilterPlan, FilterResolved, FilterValidResolved};
11use crate::prelude::*;
12use crate::repl::cid::Cid;
13use crate::repl::proto::ReplCidRange;
14use crate::repl::ruv::{
15    ReplicationUpdateVector, ReplicationUpdateVectorReadTransaction,
16    ReplicationUpdateVectorTransaction, ReplicationUpdateVectorWriteTransaction,
17};
18use crate::utils::trigraph_iter;
19use crate::value::{IndexType, Value};
20use concread::cowcell::*;
21use hashbrown::{HashMap as Map, HashSet};
22use idlset::v2::IDLBitRange;
23use idlset::AndNot;
24use kanidm_proto::internal::{ConsistencyError, OperationError};
25use std::collections::BTreeMap;
26use std::fs;
27use std::ops::DerefMut;
28use std::path::{Path, PathBuf};
29use std::sync::Arc;
30use std::time::Duration;
31use tracing::{trace, trace_span};
32use uuid::Uuid;
33
34pub(crate) mod dbentry;
35pub(crate) mod dbrepl;
36pub(crate) mod dbvalue;
37
38mod idl_arc_sqlite;
39mod idl_sqlite;
40pub(crate) mod idxkey;
41pub(crate) mod keystorage;
42
43pub(crate) use self::idxkey::{IdxKey, IdxKeyRef, IdxKeyToRef, IdxSlope};
44use crate::be::idl_arc_sqlite::{
45    IdlArcSqlite, IdlArcSqliteReadTransaction, IdlArcSqliteTransaction,
46    IdlArcSqliteWriteTransaction,
47};
48use kanidm_proto::internal::FsType;
49
// Currently disabled due to improvements in idlset for intersection handling.
// A value of 0 means "never shortcut to a filter test" for these paths.
const FILTER_SEARCH_TEST_THRESHOLD: usize = 0;
const FILTER_EXISTS_TEST_THRESHOLD: usize = 0;
// Candidate-set size below which a substring (trigraph) lookup stops consulting
// further substring indexes and lets the caller filter-test the partial set.
const FILTER_SUBSTR_TEST_THRESHOLD: usize = 4;
54
#[derive(Debug, Clone)]
/// Limits on the resources a single event can consume. These are defined per-event
/// as they are derived from the userAuthToken based on that individual session
pub struct Limits {
    /// Whether this session may execute fully unindexed searches at all.
    pub unindexed_allow: bool,
    /// Maximum number of entries an indexed search may resolve.
    pub search_max_results: usize,
    /// Maximum number of partially-indexed candidates that may be loaded for
    /// in-memory filter testing.
    pub search_max_filter_test: usize,
    /// Maximum number of elements permitted in a single filter.
    pub filter_max_elements: usize,
}
64
65impl Default for Limits {
66    fn default() -> Self {
67        Limits {
68            unindexed_allow: false,
69            search_max_results: DEFAULT_LIMIT_SEARCH_MAX_RESULTS as usize,
70            search_max_filter_test: DEFAULT_LIMIT_SEARCH_MAX_FILTER_TEST as usize,
71            filter_max_elements: DEFAULT_LIMIT_FILTER_MAX_ELEMENTS as usize,
72        }
73    }
74}
75
76impl Limits {
77    pub fn unlimited() -> Self {
78        Limits {
79            unindexed_allow: true,
80            search_max_results: usize::MAX >> 1,
81            search_max_filter_test: usize::MAX >> 1,
82            filter_max_elements: usize::MAX,
83        }
84    }
85
86    pub fn api_token() -> Self {
87        Limits {
88            unindexed_allow: false,
89            search_max_results: DEFAULT_LIMIT_API_SEARCH_MAX_RESULTS as usize,
90            search_max_filter_test: DEFAULT_LIMIT_API_SEARCH_MAX_FILTER_TEST as usize,
91            filter_max_elements: DEFAULT_LIMIT_FILTER_MAX_ELEMENTS as usize,
92        }
93    }
94}
95
/// The result of a key value request containing the list of entry IDs that
/// match the filter/query condition.
#[derive(Debug, Clone)]
pub enum IdList {
    /// The value is not indexed, and must be assumed that all entries may match.
    AllIds,
    /// The index is "fuzzy" like a bloom filter (perhaps superset is a better description) -
    /// it contains all elements that do match, but may have extra elements that don't.
    /// This requires the caller to perform a filter test to assert that all
    /// returned entries match all assertions within the filter.
    Partial(IDLBitRange),
    /// The set was indexed and is below the filter test threshold. This is because it's
    /// now faster to test with the filter than to continue to access indexes at this point.
    /// Like a partial set, this is a super set of the entries that match the query.
    PartialThreshold(IDLBitRange),
    /// The value is indexed and accurately represents the set of entries that precisely match.
    Indexed(IDLBitRange),
}
114
#[derive(Debug)]
/// An entry as stored by the id layer: the entry's database id paired with
/// its raw serialised bytes (deserialised by `into_dbentry`/`into_entry`).
pub struct IdRawEntry {
    /// The entry's database id.
    id: u64,
    /// The serialised entry content.
    data: Vec<u8>,
}
120
#[derive(Debug, Clone)]
/// Metadata describing which (attribute, index type) pairs are indexed, and
/// the "slope" (a selectivity estimate) associated with each index key.
pub struct IdxMeta {
    pub idxkeys: Map<IdxKey, IdxSlope>,
}
125
126impl IdxMeta {
127    pub fn new(idxkeys: Map<IdxKey, IdxSlope>) -> Self {
128        IdxMeta { idxkeys }
129    }
130}
131
#[derive(Clone)]
/// Configuration used to construct a `Backend`.
pub struct BackendConfig {
    /// Filesystem path of the database. An empty path means an sqlite
    /// in-memory/ram-only database.
    path: PathBuf,
    /// Connection pool size for the underlying sqlite layer.
    pool_size: u32,
    /// The name of the database within the storage layer.
    db_name: &'static str,
    /// Filesystem type hint, used to tune storage behaviour.
    fstype: FsType,
    // Cachesizes?
    /// Optional override of the ARC cache size; None lets the layer choose.
    arcsize: Option<usize>,
}
141
142impl BackendConfig {
143    pub fn new(
144        path: Option<&Path>,
145        pool_size: u32,
146        fstype: FsType,
147        arcsize: Option<usize>,
148    ) -> Self {
149        BackendConfig {
150            pool_size,
151            // This means if path is None, that "" implies an sqlite in memory/ram only database.
152            path: path.unwrap_or_else(|| Path::new("")).to_path_buf(),
153            db_name: "main",
154            fstype,
155            arcsize,
156        }
157    }
158
159    pub(crate) fn new_test(db_name: &'static str) -> Self {
160        BackendConfig {
161            pool_size: 1,
162            path: PathBuf::from(""),
163            db_name,
164            fstype: FsType::Generic,
165            arcsize: Some(2048),
166        }
167    }
168}
169
#[derive(Clone)]
pub struct Backend {
    /// This is the actual datastorage layer.
    idlayer: Arc<IdlArcSqlite>,
    /// This is a copy-on-write cache of the index metadata that has been
    /// extracted from attributes set, in the correct format for the backend
    /// to consume. We use it to extract indexes from entries during write paths
    /// and to allow the front end to know what indexes exist during a read.
    idxmeta: Arc<CowCell<IdxMeta>>,
    /// The current state of the replication update vector. This is effectively a
    /// time series index of the full list of all changelog entries and what entries
    /// that are part of that change.
    ruv: Arc<ReplicationUpdateVector>,
    /// The configuration this backend was constructed with.
    cfg: BackendConfig,
}
185
/// A read-only transaction over the backend: a read handle to the id layer,
/// a read snapshot of the index metadata, and a read view of the RUV.
pub struct BackendReadTransaction<'a> {
    idlayer: IdlArcSqliteReadTransaction<'a>,
    idxmeta: CowCellReadTxn<IdxMeta>,
    ruv: ReplicationUpdateVectorReadTransaction<'a>,
}
191
// SAFETY: NOTE(review): these impls assert the read transaction may be shared
// and moved across threads. Presumably the inner read transactions are only
// accessed immutably and their non-Send/Sync internals (if any) are never
// exposed across threads - confirm against the idl_arc_sqlite layer before
// relying on this.
unsafe impl Sync for BackendReadTransaction<'_> {}

unsafe impl Send for BackendReadTransaction<'_> {}
195
/// A writeable transaction over the backend: write handles to the id layer,
/// the index metadata copy-on-write cell, and the RUV.
pub struct BackendWriteTransaction<'a> {
    idlayer: IdlArcSqliteWriteTransaction<'a>,
    idxmeta_wr: CowCellWriteTxn<'a, IdxMeta>,
    ruv: ReplicationUpdateVectorWriteTransaction<'a>,
}
201
202impl IdRawEntry {
203    fn into_dbentry(self) -> Result<(u64, DbEntry), OperationError> {
204        serde_json::from_slice(self.data.as_slice())
205            .map_err(|e| {
206                admin_error!(?e, "Serde JSON Error");
207                OperationError::SerdeJsonError
208            })
209            .map(|dbe| (self.id, dbe))
210    }
211
212    fn into_entry(self) -> Result<EntrySealedCommitted, OperationError> {
213        let db_e = serde_json::from_slice(self.data.as_slice()).map_err(|e| {
214            admin_error!(?e, id = %self.id, "Serde JSON Error");
215            let raw_str = String::from_utf8_lossy(self.data.as_slice());
216            debug!(raw = %raw_str);
217            OperationError::SerdeJsonError
218        })?;
219        // let id = u64::try_from(self.id).map_err(|_| OperationError::InvalidEntryId)?;
220        Entry::from_dbentry(db_e, self.id).ok_or(OperationError::CorruptedEntry(self.id))
221    }
222}
223
/// Operations shared by both read and write backend transactions: index
/// resolution (filter2idl), search and existence queries.
pub trait BackendTransaction {
    /// The concrete id-layer (entry + index storage) transaction type.
    type IdlLayerType: IdlArcSqliteTransaction;
    fn get_idlayer(&mut self) -> &mut Self::IdlLayerType;

    /// The concrete replication update vector transaction type.
    type RuvType: ReplicationUpdateVectorTransaction;
    fn get_ruv(&mut self) -> &mut Self::RuvType;

    /// Access the index metadata snapshot associated with this transaction.
    fn get_idxmeta_ref(&self) -> &IdxMeta;
232
    /// Recursively apply a filter, transforming into IdList's on the way. This builds a query
    /// execution log, so that it can be examined how an operation proceeded.
    ///
    /// `thres` is the candidate-set size below which it becomes cheaper to stop
    /// resolving further indexes and instead filter-test the (partial) candidate
    /// entries directly - several branches shortcut-return `PartialThreshold`
    /// for this reason.
    #[allow(clippy::cognitive_complexity)]
    fn filter2idl(
        &mut self,
        filt: &FilterResolved,
        thres: usize,
    ) -> Result<(IdList, FilterPlan), OperationError> {
        Ok(match filt {
            FilterResolved::Eq(attr, value, idx) => {
                if idx.is_some() {
                    // Get the idx_key
                    let idx_key = value.get_idx_eq_key();
                    // Get the idl for this
                    match self
                        .get_idlayer()
                        .get_idl(attr, IndexType::Equality, &idx_key)?
                    {
                        Some(idl) => (
                            IdList::Indexed(idl),
                            FilterPlan::EqIndexed(attr.clone(), idx_key),
                        ),
                        // A missing idl for an attr the schema says is indexed
                        // implies a corrupt/missing index - fall back to AllIds.
                        None => (IdList::AllIds, FilterPlan::EqCorrupt(attr.clone())),
                    }
                } else {
                    // Schema believes this is not indexed
                    (IdList::AllIds, FilterPlan::EqUnindexed(attr.clone()))
                }
            }
            FilterResolved::Stw(attr, subvalue, idx)
            | FilterResolved::Enw(attr, subvalue, idx)
            | FilterResolved::Cnt(attr, subvalue, idx) => {
                // Get the idx_key. Not all types support this, so may return "none".
                trace!(?idx, ?subvalue, ?attr);
                if let (true, Some(idx_key)) = (idx.is_some(), subvalue.get_idx_sub_key()) {
                    // Delegate to the trigraph substring resolver.
                    self.filter2idl_sub(attr, idx_key)?
                } else {
                    // Schema believes this is not indexed
                    (IdList::AllIds, FilterPlan::SubUnindexed(attr.clone()))
                }
            }
            FilterResolved::Pres(attr, idx) => {
                if idx.is_some() {
                    // Get the idl for this
                    match self.get_idlayer().get_idl(attr, IndexType::Presence, "_")? {
                        Some(idl) => (IdList::Indexed(idl), FilterPlan::PresIndexed(attr.clone())),
                        None => (IdList::AllIds, FilterPlan::PresCorrupt(attr.clone())),
                    }
                } else {
                    // Schema believes this is not indexed
                    (IdList::AllIds, FilterPlan::PresUnindexed(attr.clone()))
                }
            }
            FilterResolved::LessThan(attr, _subvalue, _idx) => {
                // We have no process for indexing this right now.
                (IdList::AllIds, FilterPlan::LessThanUnindexed(attr.clone()))
            }
            FilterResolved::Or(l, _) => {
                // Importantly if this has no inner elements, this returns
                // an empty list.
                let mut plan = Vec::with_capacity(0);
                let mut result = IDLBitRange::new();
                let mut partial = false;
                let mut threshold = false;
                // For each filter in l
                for f in l.iter() {
                    // get their idls
                    match self.filter2idl(f, thres)? {
                        (IdList::Indexed(idl), fp) => {
                            plan.push(fp);
                            // now union them (if possible)
                            result = result | idl;
                        }
                        (IdList::Partial(idl), fp) => {
                            plan.push(fp);
                            // now union them (if possible)
                            result = result | idl;
                            partial = true;
                        }
                        (IdList::PartialThreshold(idl), fp) => {
                            plan.push(fp);
                            // now union them (if possible)
                            result = result | idl;
                            partial = true;
                            threshold = true;
                        }
                        (IdList::AllIds, fp) => {
                            plan.push(fp);
                            // If we find anything unindexed, the whole term is unindexed.
                            filter_trace!("Term {:?} is AllIds, shortcut return", f);
                            let setplan = FilterPlan::OrUnindexed(plan);
                            return Ok((IdList::AllIds, setplan));
                        }
                    }
                } // end or.iter()
                // If we got here, every term must have been indexed or partial indexed.
                // The weakest member state (partial / threshold) dominates the result.
                if partial {
                    if threshold {
                        let setplan = FilterPlan::OrPartialThreshold(plan);
                        (IdList::PartialThreshold(result), setplan)
                    } else {
                        let setplan = FilterPlan::OrPartial(plan);
                        (IdList::Partial(result), setplan)
                    }
                } else {
                    let setplan = FilterPlan::OrIndexed(plan);
                    (IdList::Indexed(result), setplan)
                }
            }
            FilterResolved::And(l, _) => {
                // This algorithm is a little annoying. I couldn't get it to work with iter and
                // folds due to the logic needed ...

                // First, setup the two filter lists. We always apply AndNot after positive
                // and terms.
                let (f_andnot, f_rem): (Vec<_>, Vec<_>) = l.iter().partition(|f| f.is_andnot());

                // We make this an iter, so everything comes off in order. if we used pop it means we
                // pull from the tail, which is the WORST item to start with!
                let mut f_rem_iter = f_rem.iter();

                // Setup the initial result.
                let (mut cand_idl, fp) = match f_rem_iter.next() {
                    Some(f) => self.filter2idl(f, thres)?,
                    None => {
                        filter_warn!(
                            "And filter was empty, or contains only AndNot, can not evaluate."
                        );
                        return Ok((IdList::Indexed(IDLBitRange::new()), FilterPlan::Invalid));
                    }
                };

                // Setup the counter of terms we have left to evaluate.
                // This is used so that we shortcut return ONLY when we really do have
                // more terms remaining.
                let mut f_rem_count = f_rem.len() + f_andnot.len() - 1;

                // Setup the query plan tracker
                let mut plan = vec![fp];

                match &cand_idl {
                    IdList::Indexed(idl) | IdList::Partial(idl) | IdList::PartialThreshold(idl) => {
                        // When below thres, we have to return partials to trigger the entry_no_match_filter check.
                        // But we only do this when there are actually multiple elements in the and,
                        // because an and with 1 element now is FULLY resolved.
                        if idl.below_threshold(thres) && f_rem_count > 0 {
                            let setplan = FilterPlan::AndPartialThreshold(plan);
                            return Ok((IdList::PartialThreshold(idl.clone()), setplan));
                        } else if idl.is_empty() {
                            // Regardless of the input state, if it's empty, this can never
                            // be satisfied, so return we are indexed and complete.
                            let setplan = FilterPlan::AndEmptyCand(plan);
                            return Ok((IdList::Indexed(IDLBitRange::new()), setplan));
                        }
                    }
                    IdList::AllIds => {}
                }

                // Now, for all remaining,
                for f in f_rem_iter {
                    f_rem_count -= 1;
                    let (inter, fp) = self.filter2idl(f, thres)?;
                    plan.push(fp);
                    // Intersect the new term with the working candidate set. The
                    // match arms encode how the "certainty" states combine: any
                    // Partial/Threshold input downgrades the output accordingly.
                    cand_idl = match (cand_idl, inter) {
                        (IdList::Indexed(ia), IdList::Indexed(ib)) => {
                            let r = ia & ib;
                            if r.below_threshold(thres) && f_rem_count > 0 {
                                // When below thres, we have to return partials to trigger the entry_no_match_filter check.
                                let setplan = FilterPlan::AndPartialThreshold(plan);
                                return Ok((IdList::PartialThreshold(r), setplan));
                            } else if r.is_empty() {
                                // Regardless of the input state, if it's empty, this can never
                                // be satisfied, so return we are indexed and complete.
                                let setplan = FilterPlan::AndEmptyCand(plan);
                                return Ok((IdList::Indexed(IDLBitRange::new()), setplan));
                            } else {
                                IdList::Indexed(r)
                            }
                        }
                        (IdList::Indexed(ia), IdList::Partial(ib))
                        | (IdList::Partial(ia), IdList::Indexed(ib))
                        | (IdList::Partial(ia), IdList::Partial(ib)) => {
                            let r = ia & ib;
                            if r.below_threshold(thres) && f_rem_count > 0 {
                                // When below thres, we have to return partials to trigger the entry_no_match_filter check.
                                let setplan = FilterPlan::AndPartialThreshold(plan);
                                return Ok((IdList::PartialThreshold(r), setplan));
                            } else {
                                IdList::Partial(r)
                            }
                        }
                        (IdList::Indexed(ia), IdList::PartialThreshold(ib))
                        | (IdList::PartialThreshold(ia), IdList::Indexed(ib))
                        | (IdList::PartialThreshold(ia), IdList::PartialThreshold(ib))
                        | (IdList::PartialThreshold(ia), IdList::Partial(ib))
                        | (IdList::Partial(ia), IdList::PartialThreshold(ib)) => {
                            let r = ia & ib;
                            if r.below_threshold(thres) && f_rem_count > 0 {
                                // When below thres, we have to return partials to trigger the entry_no_match_filter check.
                                let setplan = FilterPlan::AndPartialThreshold(plan);
                                return Ok((IdList::PartialThreshold(r), setplan));
                            } else {
                                IdList::PartialThreshold(r)
                            }
                        }
                        (IdList::Indexed(i), IdList::AllIds)
                        | (IdList::AllIds, IdList::Indexed(i))
                        | (IdList::Partial(i), IdList::AllIds)
                        | (IdList::AllIds, IdList::Partial(i)) => IdList::Partial(i),
                        (IdList::PartialThreshold(i), IdList::AllIds)
                        | (IdList::AllIds, IdList::PartialThreshold(i)) => {
                            IdList::PartialThreshold(i)
                        }
                        (IdList::AllIds, IdList::AllIds) => IdList::AllIds,
                    };
                }

                // debug!("partial cand set ==> {:?}", cand_idl);

                // Apply the negative (AndNot) terms last, subtracting them from
                // the positive candidate set built above.
                for f in f_andnot.iter() {
                    f_rem_count -= 1;
                    let FilterResolved::AndNot(f_in, _) = f else {
                        filter_error!("Invalid server state, a cand filter leaked to andnot set!");
                        return Err(OperationError::InvalidState);
                    };
                    let (inter, fp) = self.filter2idl(f_in, thres)?;
                    // It's an and not, so we need to wrap the plan accordingly.
                    plan.push(FilterPlan::AndNot(Box::new(fp)));
                    cand_idl = match (cand_idl, inter) {
                        (IdList::Indexed(ia), IdList::Indexed(ib)) => {
                            let r = ia.andnot(ib);
                            /*
                            // Don't trigger threshold on and nots if fully indexed.
                            if r.below_threshold(thres) {
                                // When below thres, we have to return partials to trigger the entry_no_match_filter check.
                                return Ok(IdList::PartialThreshold(r));
                            } else {
                                IdList::Indexed(r)
                            }
                            */
                            IdList::Indexed(r)
                        }
                        (IdList::Indexed(ia), IdList::Partial(ib))
                        | (IdList::Partial(ia), IdList::Indexed(ib))
                        | (IdList::Partial(ia), IdList::Partial(ib)) => {
                            let r = ia.andnot(ib);
                            // DO trigger threshold on partials, because we have to apply the filter
                            // test anyway, so we may as well shortcut at this point.
                            if r.below_threshold(thres) && f_rem_count > 0 {
                                let setplan = FilterPlan::AndPartialThreshold(plan);
                                return Ok((IdList::PartialThreshold(r), setplan));
                            } else {
                                IdList::Partial(r)
                            }
                        }
                        (IdList::Indexed(ia), IdList::PartialThreshold(ib))
                        | (IdList::PartialThreshold(ia), IdList::Indexed(ib))
                        | (IdList::PartialThreshold(ia), IdList::PartialThreshold(ib))
                        | (IdList::PartialThreshold(ia), IdList::Partial(ib))
                        | (IdList::Partial(ia), IdList::PartialThreshold(ib)) => {
                            let r = ia.andnot(ib);
                            // DO trigger threshold on partials, because we have to apply the filter
                            // test anyway, so we may as well shortcut at this point.
                            if r.below_threshold(thres) && f_rem_count > 0 {
                                let setplan = FilterPlan::AndPartialThreshold(plan);
                                return Ok((IdList::PartialThreshold(r), setplan));
                            } else {
                                IdList::PartialThreshold(r)
                            }
                        }

                        (IdList::Indexed(_), IdList::AllIds)
                        | (IdList::AllIds, IdList::Indexed(_))
                        | (IdList::Partial(_), IdList::AllIds)
                        | (IdList::AllIds, IdList::Partial(_))
                        | (IdList::PartialThreshold(_), IdList::AllIds)
                        | (IdList::AllIds, IdList::PartialThreshold(_)) => {
                            // We could actually generate allids here
                            // and then try to reduce the and-not set, but
                            // for now we just return all ids.
                            IdList::AllIds
                        }
                        (IdList::AllIds, IdList::AllIds) => IdList::AllIds,
                    };
                }

                // What state is the final cand idl in?
                let setplan = match cand_idl {
                    IdList::Indexed(_) => FilterPlan::AndIndexed(plan),
                    IdList::Partial(_) | IdList::PartialThreshold(_) => {
                        FilterPlan::AndPartial(plan)
                    }
                    IdList::AllIds => FilterPlan::AndUnindexed(plan),
                };

                // Finally, return the result.
                // debug!("final cand set ==> {:?}", cand_idl);
                (cand_idl, setplan)
            } // end and
            FilterResolved::Inclusion(l, _) => {
                // For inclusion to be valid, every term must have *at least* one element present.
                // This really relies on indexing, and so it's internal only - generally only
                // for fully indexed existence queries, such as from refint.

                // This has a lot in common with an And and Or but not really quite either.
                let mut plan = Vec::with_capacity(0);
                let mut result = IDLBitRange::new();
                // For each filter in l
                for f in l.iter() {
                    // get their idls
                    match self.filter2idl(f, thres)? {
                        (IdList::Indexed(idl), fp) => {
                            plan.push(fp);
                            if idl.is_empty() {
                                // It's empty, so something is missing. Bail fast.
                                filter_trace!("Inclusion is unable to proceed - an empty (missing) item was found!");
                                let setplan = FilterPlan::InclusionIndexed(plan);
                                return Ok((IdList::Indexed(IDLBitRange::new()), setplan));
                            } else {
                                result = result | idl;
                            }
                        }
                        (_, fp) => {
                            plan.push(fp);
                            let setplan = FilterPlan::InclusionInvalid(plan);
                            error!(
                                ?setplan,
                                "Inclusion is unable to proceed - all terms must be fully indexed!"
                            );
                            return Ok((IdList::Partial(IDLBitRange::new()), setplan));
                        }
                    }
                } // end or.iter()
                // If we got here, every term must have been indexed
                let setplan = FilterPlan::InclusionIndexed(plan);
                (IdList::Indexed(result), setplan)
            }
            // So why does this return empty? Normally we actually process an AndNot in the context
            // of an "AND" query, but if it's used anywhere else IE the root filter, then there is
            // no other set to exclude - therefore it's empty set. Additionally, even in an OR query
            // the AndNot will be skipped as an empty set for the same reason.
            FilterResolved::AndNot(_f, _) => {
                // get the idl for f
                // now do andnot?
                filter_error!("Requested a top level or isolated AndNot, returning empty");
                (IdList::Indexed(IDLBitRange::new()), FilterPlan::Invalid)
            }
            FilterResolved::Invalid(_) => {
                // Indexed since it is always false and we don't want to influence filter testing
                (IdList::Indexed(IDLBitRange::new()), FilterPlan::Invalid)
            }
        })
    }
586
    /// Resolve a substring/startswith/endswith assertion via the trigraph index.
    ///
    /// The result is always `IdList::Partial` (a superset) because trigraph keys
    /// discard the start/end anchoring of the assertion - the caller must still
    /// filter-test the candidates.
    fn filter2idl_sub(
        &mut self,
        attr: &Attribute,
        sub_idx_key: String,
    ) -> Result<(IdList, FilterPlan), OperationError> {
        // Now given that idx_key, we will iterate over the possible graphemes.
        let mut grapheme_iter = trigraph_iter(&sub_idx_key);

        // Substrings are always partial because we have to split the keys up
        // and we don't pay attention to starts/ends with conditions. We need
        // the caller to check those conditions manually at run time. This lets
        // the index focus on trigraph indexes only rather than needing to
        // worry about those other bits. In a way substring indexes are "fuzzy".

        // Seed the working idl from the first trigraph.
        let mut idl = match grapheme_iter.next() {
            Some(idx_key) => {
                match self
                    .get_idlayer()
                    .get_idl(attr, IndexType::SubString, idx_key)?
                {
                    Some(idl) => idl,
                    // Missing idl for an indexed attr implies index corruption.
                    None => return Ok((IdList::AllIds, FilterPlan::SubCorrupt(attr.clone()))),
                }
            }
            None => {
                // If there are no graphemes this means the attempt is for an empty string, so
                // we return an empty result set.
                return Ok((IdList::Indexed(IDLBitRange::new()), FilterPlan::Invalid));
            }
        };

        // NOTE(review): the entry check uses `>` while the in-loop break uses `<`,
        // so a set exactly equal to the threshold skips/stops further
        // intersection - presumably intentional, confirm.
        if idl.len() > FILTER_SUBSTR_TEST_THRESHOLD {
            for idx_key in grapheme_iter {
                // Get the idl for this
                match self
                    .get_idlayer()
                    .get_idl(attr, IndexType::SubString, idx_key)?
                {
                    Some(r_idl) => {
                        // Do an *and* operation between what we found and our working idl.
                        idl = r_idl & idl;
                    }
                    None => {
                        // if something didn't match, then we simply bail out after zeroing the current IDL.
                        idl = IDLBitRange::new();
                    }
                };

                // Once the candidate set is small enough, further index
                // intersections cost more than the final filter test.
                if idl.len() < FILTER_SUBSTR_TEST_THRESHOLD {
                    break;
                }
            }
        } else {
            drop(grapheme_iter);
        }

        // We exhausted the grapheme iter, exit with what we found.
        Ok((
            IdList::Partial(idl),
            FilterPlan::SubIndexed(attr.clone(), sub_idx_key),
        ))
    }
649
    /// Execute a search over the backend, returning every entry that matches
    /// `filt`, while enforcing the resource limits in `erl`.
    #[instrument(level = "debug", name = "be::search", skip_all)]
    fn search(
        &mut self,
        erl: &Limits,
        filt: &Filter<FilterValidResolved>,
    ) -> Result<Vec<Arc<EntrySealedCommitted>>, OperationError> {
        // Unlike DS, even if we don't get the index back, we can just pass
        // to the in-memory filter test and be done.

        trace!(filter_optimised = ?filt);

        // Resolve the filter to an id list via the indexes, recording the
        // plan that was chosen for later diagnostics.
        let (idl, fplan) = trace_span!("be::search -> filter2idl")
            .in_scope(|| self.filter2idl(filt.to_inner(), FILTER_SEARCH_TEST_THRESHOLD))?;

        debug!(search_filter_executed_plan = %fplan);

        // Enforce the resource limits on the resolved id list *before* any
        // entries are loaded from the id layer.
        match &idl {
            IdList::AllIds => {
                if !erl.unindexed_allow {
                    admin_error!(
                        "filter (search) is fully unindexed, and not allowed by resource limits"
                    );
                    return Err(OperationError::ResourceLimit);
                }
            }
            IdList::Partial(idl_br) => {
                // if idl_br.len() > erl.search_max_filter_test {
                if !idl_br.below_threshold(erl.search_max_filter_test) {
                    admin_error!("filter (search) is partial indexed and greater than search_max_filter_test allowed by resource limits");
                    return Err(OperationError::ResourceLimit);
                }
            }
            IdList::PartialThreshold(_) => {
                // Since we opted for this, this is not the fault
                // of the user and we should not penalise them by limiting on partial.
            }
            IdList::Indexed(idl_br) => {
                // We know this is resolved here, so we can attempt the limit
                // check. This has to fold the whole index, but you know, class=pres is
                // indexed ...
                // if idl_br.len() > erl.search_max_results {
                if !idl_br.below_threshold(erl.search_max_results) {
                    admin_error!("filter (search) is indexed and greater than search_max_results allowed by resource limits");
                    return Err(OperationError::ResourceLimit);
                }
            }
        };

        // Load the candidate entries named by the id list.
        let entries = self.get_idlayer().get_identry(&idl).map_err(|e| {
            admin_error!(?e, "get_identry failed");
            e
        })?;

        // Unless the filter was fully indexed, each candidate must still be
        // tested against the filter in memory to remove false positives.
        let mut entries_filtered = match idl {
            IdList::AllIds => trace_span!("be::search<entry::ftest::allids>").in_scope(|| {
                entries
                    .into_iter()
                    .filter(|e| e.entry_match_no_index(filt))
                    .collect()
            }),
            IdList::Partial(_) => trace_span!("be::search<entry::ftest::partial>").in_scope(|| {
                entries
                    .into_iter()
                    .filter(|e| e.entry_match_no_index(filt))
                    .collect()
            }),
            IdList::PartialThreshold(_) => trace_span!("be::search<entry::ftest::thresh>")
                .in_scope(|| {
                    entries
                        .into_iter()
                        .filter(|e| e.entry_match_no_index(filt))
                        .collect()
                }),
            // Since the index fully resolved, we can shortcut the filter test step here!
            IdList::Indexed(_) => {
                filter_trace!("filter (search) was fully indexed 👏");
                entries
            }
        };

        // If the idl was not indexed, apply the resource limit now. Avoid the needless match since the
        // if statement is quick.
        if entries_filtered.len() > erl.search_max_results {
            admin_error!("filter (search) is resolved and greater than search_max_results allowed by resource limits");
            return Err(OperationError::ResourceLimit);
        }

        // Trim any excess capacity if needed
        entries_filtered.shrink_to_fit();

        Ok(entries_filtered)
    }
742
743    /// Given a filter, assert some condition exists.
744    /// Basically, this is a specialised case of search, where we don't need to
745    /// load any candidates if they match. This is heavily used in uuid
746    /// refint and attr uniqueness.
747    #[instrument(level = "debug", name = "be::exists", skip_all)]
748    fn exists(
749        &mut self,
750        erl: &Limits,
751        filt: &Filter<FilterValidResolved>,
752    ) -> Result<bool, OperationError> {
753        trace!(filter_optimised = ?filt);
754
755        // Using the indexes, resolve the IdList here, or AllIds.
756        // Also get if the filter was 100% resolved or not.
757        let (idl, fplan) = trace_span!("be::exists -> filter2idl")
758            .in_scope(|| self.filter2idl(filt.to_inner(), FILTER_EXISTS_TEST_THRESHOLD))?;
759
760        debug!(exist_filter_executed_plan = %fplan);
761
762        // Apply limits to the IdList.
763        match &idl {
764            IdList::AllIds => {
765                if !erl.unindexed_allow {
766                    admin_error!(
767                        "filter (exists) is fully unindexed, and not allowed by resource limits"
768                    );
769                    return Err(OperationError::ResourceLimit);
770                }
771            }
772            IdList::Partial(idl_br) => {
773                if !idl_br.below_threshold(erl.search_max_filter_test) {
774                    admin_error!("filter (exists) is partial indexed and greater than search_max_filter_test allowed by resource limits");
775                    return Err(OperationError::ResourceLimit);
776                }
777            }
778            IdList::PartialThreshold(_) => {
779                // Since we opted for this, this is not the fault
780                // of the user and we should not penalise them.
781            }
782            IdList::Indexed(_) => {}
783        }
784
785        // Now, check the idl -- if it's fully resolved, we can skip this because the query
786        // was fully indexed.
787        match &idl {
788            IdList::Indexed(idl) => Ok(!idl.is_empty()),
789            _ => {
790                let entries = self.get_idlayer().get_identry(&idl).map_err(|e| {
791                    admin_error!(?e, "get_identry failed");
792                    e
793                })?;
794
795                // if not 100% resolved query, apply the filter test.
796                let entries_filtered: Vec<_> =
797                    trace_span!("be::exists<entry::ftest>").in_scope(|| {
798                        entries
799                            .into_iter()
800                            .filter(|e| e.entry_match_no_index(filt))
801                            .collect()
802                    });
803
804                Ok(!entries_filtered.is_empty())
805            }
806        } // end match idl
807    }
808
809    fn retrieve_range(
810        &mut self,
811        ranges: &BTreeMap<Uuid, ReplCidRange>,
812    ) -> Result<Vec<Arc<EntrySealedCommitted>>, OperationError> {
813        // First pass the ranges to the ruv to resolve to an absolute set of
814        // entry id's.
815
816        let idl = self.get_ruv().range_to_idl(ranges);
817        // Because of how this works, I think that it's not possible for the idl
818        // to have any missing ids.
819        //
820        // If it was possible, we could just & with allids to remove the extraneous
821        // values.
822
823        if idl.is_empty() {
824            // return no entries.
825            return Ok(Vec::with_capacity(0));
826        }
827
828        // Make it an id list fr the backend.
829        let id_list = IdList::Indexed(idl);
830
831        self.get_idlayer().get_identry(&id_list).map_err(|e| {
832            admin_error!(?e, "get_identry failed");
833            e
834        })
835    }
836
    /// Run the id layer's internal consistency checks, returning any errors found.
    fn verify(&mut self) -> Vec<Result<(), ConsistencyError>> {
        self.get_idlayer().verify()
    }
840
    /// Verify that a single entry's presence in the name2uuid, uuid2spn and
    /// uuid2rdn indexes is consistent with the entry's own content. Only
    /// live entries (not recycled/tombstoned) are checked.
    fn verify_entry_index(&mut self, e: &EntrySealedCommitted) -> Result<(), ConsistencyError> {
        // First, check our references in name2uuid, uuid2spn and uuid2rdn
        if e.mask_recycled_ts().is_some() {
            let e_uuid = e.get_uuid();
            // We only check these on live entries.
            let (n2u_add, n2u_rem) = Entry::idx_name2uuid_diff(None, Some(e));

            // Diffing against None must yield only additions; anything else
            // indicates an internal inconsistency.
            let (Some(n2u_set), None) = (n2u_add, n2u_rem) else {
                admin_error!("Invalid idx_name2uuid_diff state");
                return Err(ConsistencyError::BackendIndexSync);
            };

            // If the set.len > 1, check each item.
            n2u_set
                .iter()
                .try_for_each(|name| match self.get_idlayer().name2uuid(name) {
                    Ok(Some(idx_uuid)) => {
                        if idx_uuid == e_uuid {
                            Ok(())
                        } else {
                            admin_error!("Invalid name2uuid state -> incorrect uuid association");
                            Err(ConsistencyError::BackendIndexSync)
                        }
                    }
                    r => {
                        admin_error!(state = ?r, "Invalid name2uuid state");
                        Err(ConsistencyError::BackendIndexSync)
                    }
                })?;

            // The uuid2spn index must map this uuid back to the entry's
            // current spn value.
            let spn = e.get_uuid2spn();
            match self.get_idlayer().uuid2spn(e_uuid) {
                Ok(Some(idx_spn)) => {
                    if spn != idx_spn {
                        admin_error!("Invalid uuid2spn state -> incorrect idx spn value");
                        return Err(ConsistencyError::BackendIndexSync);
                    }
                }
                r => {
                    admin_error!(state = ?r, ?e_uuid, "Invalid uuid2spn state");
                    trace!(entry = ?e);
                    return Err(ConsistencyError::BackendIndexSync);
                }
            };

            // Likewise, uuid2rdn must agree with the entry's current rdn.
            let rdn = e.get_uuid2rdn();
            match self.get_idlayer().uuid2rdn(e_uuid) {
                Ok(Some(idx_rdn)) => {
                    if rdn != idx_rdn {
                        admin_error!("Invalid uuid2rdn state -> incorrect idx rdn value");
                        return Err(ConsistencyError::BackendIndexSync);
                    }
                }
                r => {
                    admin_error!(state = ?r, "Invalid uuid2rdn state");
                    return Err(ConsistencyError::BackendIndexSync);
                }
            };
        }

        // Check the other entry:attr indexes are valid
        //
        // This is actually pretty hard to check, because we can check a value *should*
        // exist, but not that a value should NOT be present in the index. Thought needed ...

        // Got here? Ok!
        Ok(())
    }
909
910    fn verify_indexes(&mut self) -> Vec<Result<(), ConsistencyError>> {
911        let idl = IdList::AllIds;
912        let entries = match self.get_idlayer().get_identry(&idl) {
913            Ok(s) => s,
914            Err(e) => {
915                admin_error!(?e, "get_identry failure");
916                return vec![Err(ConsistencyError::Unknown)];
917            }
918        };
919
920        let r = entries.iter().try_for_each(|e| self.verify_entry_index(e));
921
922        if r.is_err() {
923            vec![r]
924        } else {
925            Vec::with_capacity(0)
926        }
927    }
928
929    fn verify_ruv(&mut self, results: &mut Vec<Result<(), ConsistencyError>>) {
930        // The way we verify this is building a whole second RUV and then comparing it.
931        let idl = IdList::AllIds;
932        let entries = match self.get_idlayer().get_identry(&idl) {
933            Ok(ent) => ent,
934            Err(e) => {
935                results.push(Err(ConsistencyError::Unknown));
936                admin_error!(?e, "get_identry failed");
937                return;
938            }
939        };
940
941        self.get_ruv().verify(&entries, results);
942    }
943
    /// Serialise the entire database (entries, database uuids, max timestamp,
    /// key handles and replication metadata) as a JSON `DbBackup` document
    /// written to `dst_path`.
    fn backup(&mut self, dst_path: &Path) -> Result<(), OperationError> {
        // Snapshot the replication update vector for inclusion in the backup.
        let repl_meta = self.get_ruv().to_db_backup_ruv();

        // load all entries into RAM, may need to change this later
        // if the size of the database compared to RAM is an issue
        let idl = IdList::AllIds;
        let idlayer = self.get_idlayer();
        let raw_entries: Vec<IdRawEntry> = idlayer.get_identry_raw(&idl)?;

        // Deserialise each raw entry - any failure aborts the whole backup.
        let entries: Result<Vec<DbEntry>, _> = raw_entries
            .iter()
            .map(|id_ent| {
                serde_json::from_slice(id_ent.data.as_slice())
                    .map_err(|_| OperationError::SerdeJsonError) // log?
            })
            .collect();

        let entries = entries?;

        // Database identity/state markers - all must be present in a healthy
        // database, else the state is invalid and we refuse to back it up.
        let db_s_uuid = idlayer
            .get_db_s_uuid()
            .and_then(|u| u.ok_or(OperationError::InvalidDbState))?;
        let db_d_uuid = idlayer
            .get_db_d_uuid()
            .and_then(|u| u.ok_or(OperationError::InvalidDbState))?;
        let db_ts_max = idlayer
            .get_db_ts_max()
            .and_then(|u| u.ok_or(OperationError::InvalidDbState))?;

        let keyhandles = idlayer.get_key_handles()?;

        let bak = DbBackup::V5 {
            // remember env is evaled at compile time.
            version: env!("KANIDM_PKG_SERIES").to_string(),
            db_s_uuid,
            db_d_uuid,
            db_ts_max,
            keyhandles,
            repl_meta,
            entries,
        };

        let serialized_entries_str = serde_json::to_string(&bak).map_err(|e| {
            admin_error!(?e, "serde error");
            OperationError::SerdeJsonError
        })?;

        fs::write(dst_path, serialized_entries_str)
            .map(|_| ())
            .map_err(|e| {
                admin_error!(?e, "fs::write error");
                OperationError::FsError
            })
    }
998
    /// Resolve a name to its entry uuid via the name2uuid index.
    fn name2uuid(&mut self, name: &str) -> Result<Option<Uuid>, OperationError> {
        self.get_idlayer().name2uuid(name)
    }
1002
    /// Resolve an external (sync source) id to its entry uuid via the index.
    fn externalid2uuid(&mut self, name: &str) -> Result<Option<Uuid>, OperationError> {
        self.get_idlayer().externalid2uuid(name)
    }
1006
    /// Look up the spn value for a uuid via the uuid2spn index.
    fn uuid2spn(&mut self, uuid: Uuid) -> Result<Option<Value>, OperationError> {
        self.get_idlayer().uuid2spn(uuid)
    }
1010
    /// Look up the rdn string for a uuid via the uuid2rdn index.
    fn uuid2rdn(&mut self, uuid: Uuid) -> Result<Option<String>, OperationError> {
        self.get_idlayer().uuid2rdn(uuid)
    }
1014}
1015
// Wire the read transaction into the shared BackendTransaction interface by
// exposing its read-only idl layer, RUV, and index metadata handles.
impl<'a> BackendTransaction for BackendReadTransaction<'a> {
    type IdlLayerType = IdlArcSqliteReadTransaction<'a>;
    type RuvType = ReplicationUpdateVectorReadTransaction<'a>;

    fn get_idlayer(&mut self) -> &mut IdlArcSqliteReadTransaction<'a> {
        &mut self.idlayer
    }

    fn get_ruv(&mut self) -> &mut ReplicationUpdateVectorReadTransaction<'a> {
        &mut self.ruv
    }

    fn get_idxmeta_ref(&self) -> &IdxMeta {
        &self.idxmeta
    }
}
1032
// Introspection helpers on read transactions, forwarding to the idl layer.
impl BackendReadTransaction<'_> {
    /// List the names of all indexes present in the id layer.
    pub fn list_indexes(&mut self) -> Result<Vec<String>, OperationError> {
        self.get_idlayer().list_idxs()
    }

    /// Dump the full id2entry table as (id, serialised entry) pairs.
    pub fn list_id2entry(&mut self) -> Result<Vec<(u64, String)>, OperationError> {
        self.get_idlayer().list_id2entry()
    }

    /// Dump the content of a single named index as (key, idl) pairs.
    pub fn list_index_content(
        &mut self,
        index_name: &str,
    ) -> Result<Vec<(String, IDLBitRange)>, OperationError> {
        self.get_idlayer().list_index_content(index_name)
    }

    /// Fetch a single id2entry row by its id.
    pub fn get_id2entry(&mut self, id: u64) -> Result<(u64, String), OperationError> {
        self.get_idlayer().get_id2entry(id)
    }

    /// List all entries currently held in quarantine.
    pub fn list_quarantined(&mut self) -> Result<Vec<(u64, String)>, OperationError> {
        self.get_idlayer().list_quarantined()
    }
}
1057
// Wire the write transaction into the shared BackendTransaction interface by
// exposing its writeable idl layer, RUV, and index metadata handles.
impl<'a> BackendTransaction for BackendWriteTransaction<'a> {
    type IdlLayerType = IdlArcSqliteWriteTransaction<'a>;
    type RuvType = ReplicationUpdateVectorWriteTransaction<'a>;

    fn get_idlayer(&mut self) -> &mut IdlArcSqliteWriteTransaction<'a> {
        &mut self.idlayer
    }

    fn get_ruv(&mut self) -> &mut ReplicationUpdateVectorWriteTransaction<'a> {
        &mut self.ruv
    }

    fn get_idxmeta_ref(&self) -> &IdxMeta {
        &self.idxmeta_wr
    }
}
1074
1075impl<'a> BackendWriteTransaction<'a> {
    /// Direct crate-internal access to the writeable RUV, used by the
    /// replication machinery.
    pub(crate) fn get_ruv_write(&mut self) -> &mut ReplicationUpdateVectorWriteTransaction<'a> {
        &mut self.ruv
    }
1079
1080    #[instrument(level = "debug", name = "be::create", skip_all)]
1081    pub fn create(
1082        &mut self,
1083        cid: &Cid,
1084        entries: Vec<EntrySealedNew>,
1085    ) -> Result<Vec<EntrySealedCommitted>, OperationError> {
1086        if entries.is_empty() {
1087            admin_error!("No entries provided to BE to create, invalid server call!");
1088            return Err(OperationError::EmptyRequest);
1089        }
1090
1091        // Check that every entry has a change associated
1092        // that matches the cid?
1093        entries.iter().try_for_each(|e| {
1094            if e.get_changestate().contains_tail_cid(cid) {
1095                Ok(())
1096            } else {
1097                admin_error!(
1098                    "Entry changelog does not contain a change related to this transaction"
1099                );
1100                Err(OperationError::ReplEntryNotChanged)
1101            }
1102        })?;
1103
1104        // Now, assign id's to all the new entries.
1105
1106        let mut id_max = self.idlayer.get_id2entry_max_id()?;
1107        let c_entries: Vec<_> = entries
1108            .into_iter()
1109            .map(|e| {
1110                id_max += 1;
1111                e.into_sealed_committed_id(id_max)
1112            })
1113            .collect();
1114
1115        // All good, lets update the RUV.
1116        // This auto compresses.
1117        let ruv_idl = IDLBitRange::from_iter(c_entries.iter().map(|e| e.get_id()));
1118
1119        // We don't need to skip this like in mod since creates always go to the ruv
1120        self.get_ruv().insert_change(cid, ruv_idl)?;
1121
1122        self.idlayer.write_identries(c_entries.iter())?;
1123
1124        self.idlayer.set_id2entry_max_id(id_max);
1125
1126        // Now update the indexes as required.
1127        for e in c_entries.iter() {
1128            self.entry_index(None, Some(e))?
1129        }
1130
1131        Ok(c_entries)
1132    }
1133
1134    #[instrument(level = "debug", name = "be::create", skip_all)]
1135    /// This is similar to create, but used in the replication path as it records all
1136    /// the CID's in the entry to the RUV, but without applying the current CID as
1137    /// a new value in the RUV. We *do not* want to apply the current CID in the RUV
1138    /// related to this entry as that could cause an infinite replication loop!
1139    pub fn refresh(
1140        &mut self,
1141        entries: Vec<EntrySealedNew>,
1142    ) -> Result<Vec<EntrySealedCommitted>, OperationError> {
1143        if entries.is_empty() {
1144            admin_error!("No entries provided to BE to create, invalid server call!");
1145            return Err(OperationError::EmptyRequest);
1146        }
1147
1148        // Assign id's to all the new entries.
1149        let mut id_max = self.idlayer.get_id2entry_max_id()?;
1150        let c_entries: Vec<_> = entries
1151            .into_iter()
1152            .map(|e| {
1153                id_max += 1;
1154                e.into_sealed_committed_id(id_max)
1155            })
1156            .collect();
1157
1158        self.idlayer.write_identries(c_entries.iter())?;
1159
1160        self.idlayer.set_id2entry_max_id(id_max);
1161
1162        // Update the RUV with all the changestates of the affected entries.
1163        for e in c_entries.iter() {
1164            self.get_ruv().update_entry_changestate(e)?;
1165        }
1166
1167        // Now update the indexes as required.
1168        for e in c_entries.iter() {
1169            self.entry_index(None, Some(e))?
1170        }
1171
1172        Ok(c_entries)
1173    }
1174
1175    #[instrument(level = "debug", name = "be::modify", skip_all)]
1176    pub fn modify(
1177        &mut self,
1178        cid: &Cid,
1179        pre_entries: &[Arc<EntrySealedCommitted>],
1180        post_entries: &[EntrySealedCommitted],
1181    ) -> Result<(), OperationError> {
1182        if post_entries.is_empty() || pre_entries.is_empty() {
1183            admin_error!("No entries provided to BE to modify, invalid server call!");
1184            return Err(OperationError::EmptyRequest);
1185        }
1186
1187        assert_eq!(post_entries.len(), pre_entries.len());
1188
1189        let post_entries_iter = post_entries.iter().filter(|e| {
1190            trace!(?cid);
1191            trace!(changestate = ?e.get_changestate());
1192            // If True - This means that at least one attribute that *is* replicated was changed
1193            // on this entry, so we need to update and add this to the RUV!
1194            //
1195            // If False - This means that the entry in question was updated but the changes are all
1196            // non-replicated so we DO NOT update the RUV here!
1197            e.get_changestate().contains_tail_cid(cid)
1198        });
1199
1200        // All good, lets update the RUV.
1201        // This auto compresses.
1202        let ruv_idl = IDLBitRange::from_iter(post_entries_iter.map(|e| e.get_id()));
1203
1204        if !ruv_idl.is_empty() {
1205            self.get_ruv().insert_change(cid, ruv_idl)?;
1206        }
1207
1208        // Now, given the list of id's, update them
1209        self.get_idlayer().write_identries(post_entries.iter())?;
1210
1211        // Finally, we now reindex all the changed entries. We do this by iterating and zipping
1212        // over the set, because we know the list is in the same order.
1213        pre_entries
1214            .iter()
1215            .zip(post_entries.iter())
1216            .try_for_each(|(pre, post)| self.entry_index(Some(pre.as_ref()), Some(post)))
1217    }
1218
    /// For each incoming replication entry, locate the local entry with the
    /// same uuid (via the uuid equality index), or create an indexed stub
    /// entry with a freshly allocated id when no local copy exists yet.
    ///
    /// Returns the located/stub entries in the same order as `entry_meta`.
    #[instrument(level = "debug", name = "be::incremental_prepare", skip_all)]
    pub fn incremental_prepare(
        &mut self,
        entry_meta: &[EntryIncrementalNew],
    ) -> Result<Vec<Arc<EntrySealedCommitted>>, OperationError> {
        let mut ret_entries = Vec::with_capacity(entry_meta.len());
        let id_max_pre = self.idlayer.get_id2entry_max_id()?;
        let mut id_max = id_max_pre;

        for ctx_ent in entry_meta.iter() {
            let ctx_ent_uuid = ctx_ent.get_uuid();
            let idx_key = ctx_ent_uuid.as_hyphenated().to_string();

            // Look the uuid up in the equality index. Exactly zero or one
            // entry may legitimately exist for any uuid.
            let idl =
                self.get_idlayer()
                    .get_idl(&Attribute::Uuid, IndexType::Equality, &idx_key)?;

            let entry = match idl {
                Some(idl) if idl.is_empty() => {
                    // Create the stub entry, we just need it to have an id number
                    // allocated.
                    id_max += 1;

                    let stub_entry = Arc::new(EntrySealedCommitted::stub_sealed_committed_id(
                        id_max, ctx_ent,
                    ));
                    // Now, the stub entry needs to be indexed. If not, uuid2spn
                    // isn't created, so subsequent index diffs don't work correctly.
                    self.entry_index(None, Some(stub_entry.as_ref()))?;

                    // Okay, entry ready to go.
                    stub_entry
                }
                Some(idl) if idl.len() == 1 => {
                    // Get the entry from this idl.
                    let mut entries = self
                        .get_idlayer()
                        .get_identry(&IdList::Indexed(idl))
                        .map_err(|e| {
                            admin_error!(?e, "get_identry failed");
                            e
                        })?;

                    if let Some(entry) = entries.pop() {
                        // Return it.
                        entry
                    } else {
                        error!("Invalid entry state, index was unable to locate entry");
                        return Err(OperationError::InvalidDbState);
                    }
                    // Done, entry is ready to go
                }
                Some(idl) => {
                    // BUG - duplicate uuid!
                    error!(uuid = ?ctx_ent_uuid, "Invalid IDL state, uuid index must have only a single or no values. Contains {:?}", idl);
                    return Err(OperationError::InvalidDbState);
                }
                None => {
                    // BUG - corrupt index.
                    error!(uuid = ?ctx_ent_uuid, "Invalid IDL state, uuid index must be present");
                    return Err(OperationError::InvalidDbState);
                }
            };

            ret_entries.push(entry);
        }

        // Only write the max id back if new ids were actually allocated.
        if id_max != id_max_pre {
            self.idlayer.set_id2entry_max_id(id_max);
        }

        Ok(ret_entries)
    }
1292
1293    #[instrument(level = "debug", name = "be::incremental_apply", skip_all)]
1294    pub fn incremental_apply(
1295        &mut self,
1296        update_entries: &[(EntrySealedCommitted, Arc<EntrySealedCommitted>)],
1297        create_entries: Vec<EntrySealedNew>,
1298    ) -> Result<(), OperationError> {
1299        // For the values in create_cands, create these with similar code to the refresh
1300        // path.
1301        if !create_entries.is_empty() {
1302            // Assign id's to all the new entries.
1303            let mut id_max = self.idlayer.get_id2entry_max_id()?;
1304            let c_entries: Vec<_> = create_entries
1305                .into_iter()
1306                .map(|e| {
1307                    id_max += 1;
1308                    e.into_sealed_committed_id(id_max)
1309                })
1310                .collect();
1311
1312            self.idlayer.write_identries(c_entries.iter())?;
1313
1314            self.idlayer.set_id2entry_max_id(id_max);
1315
1316            // Update the RUV with all the changestates of the affected entries.
1317            for e in c_entries.iter() {
1318                self.get_ruv().update_entry_changestate(e)?;
1319            }
1320
1321            // Now update the indexes as required.
1322            for e in c_entries.iter() {
1323                self.entry_index(None, Some(e))?
1324            }
1325        }
1326
1327        // Otherwise this is a cid-less copy of modify.
1328        if !update_entries.is_empty() {
1329            self.get_idlayer()
1330                .write_identries(update_entries.iter().map(|(up, _)| up))?;
1331
1332            for (e, _) in update_entries.iter() {
1333                self.get_ruv().update_entry_changestate(e)?;
1334            }
1335
1336            for (post, pre) in update_entries.iter() {
1337                self.entry_index(Some(pre.as_ref()), Some(post))?
1338            }
1339        }
1340
1341        Ok(())
1342    }
1343
    /// Remove tombstone entries older than `trim_cid` from the database,
    /// trimming the RUV to match, and purging their remaining indexes.
    /// Returns the number of entries reaped.
    #[instrument(level = "debug", name = "be::reap_tombstones", skip_all)]
    pub fn reap_tombstones(&mut self, cid: &Cid, trim_cid: &Cid) -> Result<usize, OperationError> {
        debug_assert!(cid > trim_cid);
        // Mark a new maximum for the RUV by inserting an empty change. This
        // is important to keep the changestate always advancing.
        self.get_ruv().insert_change(cid, IDLBitRange::default())?;

        // We plan to clear the RUV up to this cid. So we need to build an IDL
        // of all the entries we need to examine.
        let idl = self.get_ruv().trim_up_to(trim_cid).map_err(|e| {
            admin_error!(
                ?e,
                "During tombstone cleanup, failed to trim RUV to {:?}",
                trim_cid
            );
            e
        })?;

        let entries = self
            .get_idlayer()
            .get_identry(&IdList::Indexed(idl))
            .map_err(|e| {
                admin_error!(?e, "get_identry failed");
                e
            })?;

        if entries.is_empty() {
            admin_debug!("No entries affected - reap_tombstones operation success");
            return Ok(0);
        }

        // Now that we have a list of entries we need to partition them into
        // two sets. The entries that are tombstoned and ready to reap_tombstones, and
        // the entries that need to have their change logs trimmed.
        //
        // Remember, these tombstones can be reaped because they were tombstoned at time
        // point 'cid', and since we are now "past" that minimum cid, then other servers
        // will also be trimming these out.
        //
        // Note unlike a changelog impl, we don't need to trim changestates here. We
        // only need the RUV trimmed so that we know if other servers are lagging behind!

        // What entries are tombstones and ready to be deleted?

        let (tombstones, leftover): (Vec<_>, Vec<_>) = entries
            .into_iter()
            .partition(|e| e.get_changestate().can_delete(trim_cid));

        let ruv_idls = self.get_ruv().ruv_idls();

        // Assert that anything leftover still either is *alive* OR is a tombstone
        // and has entries in the RUV!

        if !leftover
            .iter()
            .all(|e| e.get_changestate().is_live() || ruv_idls.contains(e.get_id()))
        {
            admin_error!("Left over entries may be orphaned due to missing RUV entries");
            return Err(OperationError::ReplInvalidRUVState);
        }

        // Now setup to reap_tombstones the tombstones. Remember, in the post cleanup,
        // it could now have been trimmed to a point we can purge them!

        // Assert the id's exist on the entry.
        let id_list: IDLBitRange = tombstones.iter().map(|e| e.get_id()).collect();

        // Ensure nothing here exists in the RUV index, else it means
        // we didn't trim properly, or some other state violation has occurred.
        if !((&ruv_idls & &id_list).is_empty()) {
            admin_error!("RUV still contains entries that are going to be removed.");
            return Err(OperationError::ReplInvalidRUVState);
        }

        // Now, given the list of id's, reap_tombstones them.
        let sz = id_list.len();
        self.get_idlayer().delete_identry(id_list.into_iter())?;

        // Finally, purge the indexes from the entries we removed. These still have
        // indexes due to class=tombstone.
        tombstones
            .iter()
            .try_for_each(|e| self.entry_index(Some(e), None))?;

        Ok(sz)
    }
1430
1431    #[instrument(level = "debug", name = "be::update_idxmeta", skip_all)]
1432    pub fn update_idxmeta(&mut self, idxkeys: Vec<IdxKey>) -> Result<(), OperationError> {
1433        if self.is_idx_slopeyness_generated()? {
1434            trace!("Indexing slopes available");
1435        } else {
1436            warn!("No indexing slopes available. You should consider reindexing to generate these");
1437        };
1438
1439        // Setup idxkeys here. By default we set these all to "max slope" aka
1440        // all indexes are "equal" but also worse case unless analysed. If they
1441        // have been analysed, we can set the slope factor into here.
1442        let mut idxkeys = idxkeys
1443            .into_iter()
1444            .map(|k| self.get_idx_slope(&k).map(|slope| (k, slope)))
1445            .collect::<Result<Map<_, _>, _>>()?;
1446
1447        std::mem::swap(&mut self.idxmeta_wr.deref_mut().idxkeys, &mut idxkeys);
1448        Ok(())
1449    }
1450
    // Should take a mut index set, and then we write the whole thing back
    // in a single stripe.
    //
    // So we need a cache, which we load indexes into as we do ops, then we
    // modify them.
    //
    // At the end, we flush those changes out in a single run.
    // For create this is probably a
    // TODO: Can this be improved?
    //
    // Applies all index updates implied by the transition `pre` -> `post` of
    // a single entry:
    //   * (Some, None) -> removal: strip the entry's index values.
    //   * (None, Some) -> creation: add the entry's index values.
    //   * (Some, Some) -> modification: apply only the differences.
    //   * (None, None) -> invalid, returns InvalidState.
    // Recycled or tombstoned entries are masked out of the name/uuid maps
    // (they appear "deleted" there) while still retaining their attribute
    // indexes via idx_diff below.
    #[allow(clippy::cognitive_complexity)]
    fn entry_index(
        &mut self,
        pre: Option<&EntrySealedCommitted>,
        post: Option<&EntrySealedCommitted>,
    ) -> Result<(), OperationError> {
        // Work out the entry identity and whether the uuid is stable across
        // the change. uuid_same is trivially true for pure add/remove.
        let (e_uuid, e_id, uuid_same) = match (pre, post) {
            (None, None) => {
                admin_error!("Invalid call to entry_index - no entries provided");
                return Err(OperationError::InvalidState);
            }
            (Some(pre), None) => {
                trace!("Attempting to remove entry indexes");
                (pre.get_uuid(), pre.get_id(), true)
            }
            (None, Some(post)) => {
                trace!("Attempting to create entry indexes");
                (post.get_uuid(), post.get_id(), true)
            }
            (Some(pre), Some(post)) => {
                trace!("Attempting to modify entry indexes");
                // A modification must refer to the same database row.
                assert_eq!(pre.get_id(), post.get_id());
                (
                    post.get_uuid(),
                    post.get_id(),
                    pre.get_uuid() == post.get_uuid(),
                )
            }
        };

        // Update the names/uuid maps. These have to mask out entries
        // that are recycled or tombstones, so these pretend as "deleted"
        // and can trigger correct actions.

        let mask_pre = pre.and_then(|e| e.mask_recycled_ts());
        let mask_pre = if !uuid_same {
            // Okay, so if the uuids are different this is probably from
            // a replication conflict.  We can't just use the normal none/some
            // check from the Entry::idx functions as they only yield partial
            // changes. Because the uuid is changing, we have to treat pre
            // as a deleting entry, regardless of what state post is in.
            let uuid = mask_pre.map(|e| e.get_uuid()).ok_or_else(|| {
                admin_error!("Invalid entry state - possible memory corruption");
                OperationError::InvalidState
            })?;

            // Diff pre against "nothing" to fully remove its map entries.
            let (n2u_add, n2u_rem) = Entry::idx_name2uuid_diff(mask_pre, None);
            // There will never be content to add.
            assert!(n2u_add.is_none());

            let (eid2u_add, eid2u_rem) = Entry::idx_externalid2uuid_diff(mask_pre, None);
            // There will never be content to add.
            assert!(eid2u_add.is_none());

            let u2s_act = Entry::idx_uuid2spn_diff(mask_pre, None);
            let u2r_act = Entry::idx_uuid2rdn_diff(mask_pre, None);

            trace!(?n2u_rem, ?eid2u_rem, ?u2s_act, ?u2r_act,);

            // Write the changes out to the backend
            if let Some(rem) = n2u_rem {
                self.idlayer.write_name2uuid_rem(rem)?
            }

            if let Some(rem) = eid2u_rem {
                self.idlayer.write_externalid2uuid_rem(rem)?
            }

            // Ok(k) == set the value, Err(_) == remove the mapping.
            match u2s_act {
                None => {}
                Some(Ok(k)) => self.idlayer.write_uuid2spn(uuid, Some(k))?,
                Some(Err(_)) => self.idlayer.write_uuid2spn(uuid, None)?,
            }

            match u2r_act {
                None => {}
                Some(Ok(k)) => self.idlayer.write_uuid2rdn(uuid, Some(k))?,
                Some(Err(_)) => self.idlayer.write_uuid2rdn(uuid, None)?,
            }
            // Return none, mask_pre is now completed.
            None
        } else {
            // Return the state.
            mask_pre
        };

        // Now diff the (possibly consumed) pre state against post for the
        // name/uuid maps, and write the partial changes out.
        let mask_post = post.and_then(|e| e.mask_recycled_ts());
        let (n2u_add, n2u_rem) = Entry::idx_name2uuid_diff(mask_pre, mask_post);
        let (eid2u_add, eid2u_rem) = Entry::idx_externalid2uuid_diff(mask_pre, mask_post);

        let u2s_act = Entry::idx_uuid2spn_diff(mask_pre, mask_post);
        let u2r_act = Entry::idx_uuid2rdn_diff(mask_pre, mask_post);

        trace!(
            ?n2u_add,
            ?n2u_rem,
            ?eid2u_add,
            ?eid2u_rem,
            ?u2s_act,
            ?u2r_act
        );

        // Write the changes out to the backend
        if let Some(add) = n2u_add {
            self.idlayer.write_name2uuid_add(e_uuid, add)?
        }
        if let Some(rem) = n2u_rem {
            self.idlayer.write_name2uuid_rem(rem)?
        }

        if let Some(add) = eid2u_add {
            self.idlayer.write_externalid2uuid_add(e_uuid, add)?
        }
        if let Some(rem) = eid2u_rem {
            self.idlayer.write_externalid2uuid_rem(rem)?
        }

        // Ok(k) == set the value, Err(_) == remove the mapping.
        match u2s_act {
            None => {}
            Some(Ok(k)) => self.idlayer.write_uuid2spn(e_uuid, Some(k))?,
            Some(Err(_)) => self.idlayer.write_uuid2spn(e_uuid, None)?,
        }

        match u2r_act {
            None => {}
            Some(Ok(k)) => self.idlayer.write_uuid2rdn(e_uuid, Some(k))?,
            Some(Err(_)) => self.idlayer.write_uuid2rdn(e_uuid, None)?,
        }

        // Extremely Cursed - Okay, we know that self.idxmeta will NOT be changed
        // in this function, but we need to borrow self as mut for the caches in
        // get_idl to work. As a result, this causes a double borrow. To work around
        // this we discard the lifetime on idxmeta, because we know that it will
        // remain constant for the life of the operation.
        //
        // SAFETY: nothing in the remainder of this function writes to
        // idxmeta_wr.idxkeys, so the aliased reference is never invalidated.

        let idxmeta = unsafe { &(*(&self.idxmeta_wr.idxkeys as *const _)) };

        // Ok((attr, itype, key)) == insert e_id, Err(...) == remove e_id.
        let idx_diff = Entry::idx_diff(idxmeta, pre, post);

        idx_diff.into_iter()
            .try_for_each(|act| {
                match act {
                    Ok((attr, itype, idx_key)) => {
                        trace!("Adding {:?} idx -> {:?}: {:?}", itype, attr, idx_key);
                        match self.idlayer.get_idl(attr, itype, &idx_key)? {
                            Some(mut idl) => {
                                idl.insert_id(e_id);
                                if cfg!(debug_assertions)
                                    && *attr == Attribute::Uuid && itype == IndexType::Equality {
                                        // This means a duplicate UUID has appeared in the index.
                                        if idl.len() > 1 {
                                            trace!(duplicate_idl = ?idl, ?idx_key);
                                        }
                                        debug_assert!(idl.len() <= 1);
                                }
                                self.idlayer.write_idl(attr, itype, &idx_key, &idl)
                            }
                            None => {
                                warn!(
                                    "WARNING: index {:?} {:?} was not found. YOU MUST REINDEX YOUR DATABASE",
                                    attr, itype
                                );
                                Ok(())
                            }
                        }
                    }
                    Err((attr, itype, idx_key)) => {
                        trace!("Removing {:?} idx -> {:?}: {:?}", itype, attr, idx_key);
                        match self.idlayer.get_idl(attr, itype, &idx_key)? {
                            Some(mut idl) => {
                                idl.remove_id(e_id);
                                if cfg!(debug_assertions) && *attr == Attribute::Uuid && itype == IndexType::Equality {
                                        // This means a duplicate UUID has appeared in the index.
                                        if idl.len() > 1 {
                                            trace!(duplicate_idl = ?idl, ?idx_key);
                                        }
                                        debug_assert!(idl.len() <= 1);
                                }
                                self.idlayer.write_idl(attr, itype, &idx_key, &idl)
                            }
                            None => {
                                warn!(
                                    "WARNING: index {:?} {:?} was not found. YOU MUST REINDEX YOUR DATABASE",
                                    attr, itype
                                );
                                Ok(())
                            }
                        }
                    }
                }
            })
        // End try_for_each
    }
1653
1654    #[allow(dead_code)]
1655    fn missing_idxs(&mut self) -> Result<Vec<(Attribute, IndexType)>, OperationError> {
1656        let idx_table_list = self.get_idlayer().list_idxs()?;
1657
1658        // Turn the vec to a real set
1659        let idx_table_set: HashSet<_> = idx_table_list.into_iter().collect();
1660
1661        let missing: Vec<_> = self
1662            .idxmeta_wr
1663            .idxkeys
1664            .keys()
1665            .filter_map(|ikey| {
1666                // what would the table name be?
1667                let tname = format!("idx_{}_{}", ikey.itype.as_idx_str(), ikey.attr.as_str());
1668                trace!("Checking for {}", tname);
1669
1670                if idx_table_set.contains(&tname) {
1671                    None
1672                } else {
1673                    Some((ikey.attr.clone(), ikey.itype))
1674                }
1675            })
1676            .collect();
1677        Ok(missing)
1678    }
1679
1680    fn create_idxs(&mut self) -> Result<(), OperationError> {
1681        // Create name2uuid and uuid2name
1682        trace!("Creating index -> name2uuid");
1683        self.idlayer.create_name2uuid()?;
1684
1685        trace!("Creating index -> externalid2uuid");
1686        self.idlayer.create_externalid2uuid()?;
1687
1688        trace!("Creating index -> uuid2spn");
1689        self.idlayer.create_uuid2spn()?;
1690
1691        trace!("Creating index -> uuid2rdn");
1692        self.idlayer.create_uuid2rdn()?;
1693
1694        self.idxmeta_wr
1695            .idxkeys
1696            .keys()
1697            .try_for_each(|ikey| self.idlayer.create_idx(&ikey.attr, ikey.itype))
1698    }
1699
1700    pub fn upgrade_reindex(&mut self, v: i64) -> Result<(), OperationError> {
1701        let dbv = self.get_db_index_version()?;
1702        admin_debug!(?dbv, ?v, "upgrade_reindex");
1703        if dbv < v {
1704            self.reindex(false)?;
1705            self.set_db_index_version(v)
1706        } else {
1707            Ok(())
1708        }
1709    }
1710
1711    #[instrument(level = "info", skip_all)]
1712    pub fn reindex(&mut self, immediate: bool) -> Result<(), OperationError> {
1713        let notice_immediate = immediate || (cfg!(not(test)) && cfg!(not(debug_assertions)));
1714
1715        info!(
1716            immediate = notice_immediate,
1717            "System reindex: started - this may take a long time!"
1718        );
1719
1720        // Purge the idxs
1721        self.idlayer.danger_purge_idxs()?;
1722
1723        // Using the index metadata on the txn, create all our idx tables
1724        self.create_idxs()?;
1725
1726        // Now, we need to iterate over everything in id2entry and index them
1727        // Future idea: Do this in batches of X amount to limit memory
1728        // consumption.
1729        let idl = IdList::AllIds;
1730        let entries = self.idlayer.get_identry(&idl).inspect_err(|err| {
1731            error!(?err, "get_identry failure");
1732        })?;
1733
1734        let mut count = 0;
1735
1736        // This is the longest phase of reindexing, so we have a "progress" display here.
1737        entries
1738            .iter()
1739            .try_for_each(|e| {
1740                if immediate {
1741                    count += 1;
1742                    if count % 2500 == 0 {
1743                        eprint!("{count}");
1744                    } else if count % 250 == 0 {
1745                        eprint!(".");
1746                    }
1747                }
1748
1749                self.entry_index(None, Some(e))
1750            })
1751            .inspect_err(|err| {
1752                error!(?err, "reindex failed");
1753            })?;
1754
1755        if immediate {
1756            eprintln!(" done ✅");
1757        }
1758
1759        info!(immediate, "Reindexed {count} entries");
1760
1761        info!("Optimising Indexes: started");
1762        self.idlayer.optimise_dirty_idls();
1763        info!("Optimising Indexes: complete ✅");
1764        info!("Calculating Index Optimisation Slopes: started");
1765        self.idlayer.analyse_idx_slopes().inspect_err(|err| {
1766            error!(?err, "index optimisation failed");
1767        })?;
1768        info!("Calculating Index Optimisation Slopes: complete ✅");
1769        info!("System reindex: complete 🎉");
1770        Ok(())
1771    }
1772
1773    /// ⚠️  - This function will destroy all indexes in the database.
1774    ///
1775    /// It should only be called internally by the backend in limited and
1776    /// specific situations.
1777    fn danger_purge_idxs(&mut self) -> Result<(), OperationError> {
1778        self.get_idlayer().danger_purge_idxs()
1779    }
1780
1781    /// ⚠️  - This function will destroy all entries and indexes in the database.
1782    ///
1783    /// It should only be called internally by the backend in limited and
1784    /// specific situations.
1785    pub(crate) fn danger_delete_all_db_content(&mut self) -> Result<(), OperationError> {
1786        self.get_ruv().clear();
1787        self.get_idlayer()
1788            .danger_purge_id2entry()
1789            .and_then(|_| self.danger_purge_idxs())
1790    }
1791
1792    #[cfg(test)]
1793    pub fn load_test_idl(
1794        &mut self,
1795        attr: &Attribute,
1796        itype: IndexType,
1797        idx_key: &str,
1798    ) -> Result<Option<IDLBitRange>, OperationError> {
1799        self.get_idlayer().get_idl(attr, itype, idx_key)
1800    }
1801
1802    fn is_idx_slopeyness_generated(&mut self) -> Result<bool, OperationError> {
1803        self.get_idlayer().is_idx_slopeyness_generated()
1804    }
1805
1806    fn get_idx_slope(&mut self, ikey: &IdxKey) -> Result<IdxSlope, OperationError> {
1807        // Do we have the slopeyness?
1808        let slope = self
1809            .get_idlayer()
1810            .get_idx_slope(ikey)?
1811            .unwrap_or_else(|| get_idx_slope_default(ikey));
1812        trace!("index slope - {:?} -> {:?}", ikey, slope);
1813        Ok(slope)
1814    }
1815
    /// Replace the entire database content from the JSON backup file at
    /// `src_path`.
    ///
    /// All existing entries, indexes and the in-memory RUV are destroyed
    /// before the backup is loaded, so a failed restore leaves the database
    /// empty. Only `DbBackup::V5` payloads (which embed the originating
    /// server version series) are accepted; older formats return an error.
    /// After the entries are written back, a full consistency verification
    /// is run and any failures are surfaced as `ConsistencyError`s.
    pub fn restore(&mut self, src_path: &Path) -> Result<(), OperationError> {
        let serialized_string = fs::read_to_string(src_path).map_err(|e| {
            admin_error!("fs::read_to_string {:?}", e);
            OperationError::FsError
        })?;

        // ⚠️  Destructive from this point on.
        self.danger_delete_all_db_content().map_err(|e| {
            admin_error!("delete_all_db_content failed {:?}", e);
            e
        })?;

        let idlayer = self.get_idlayer();
        // load all entries into RAM, may need to change this later
        // if the size of the database compared to RAM is an issue

        let dbbak_option: Result<DbBackup, serde_json::Error> =
            serde_json::from_str(&serialized_string);

        let dbbak = dbbak_option.map_err(|e| {
            admin_error!("serde_json error {:?}", e);
            OperationError::SerdeJsonError
        })?;

        // Unpack whichever backup version we were given. Only V5 carries a
        // version string; V1-V4 yield maybe_version == None and are rejected
        // below.
        let (dbentries, repl_meta, maybe_version) = match dbbak {
            DbBackup::V1(dbentries) => (dbentries, None, None),
            DbBackup::V2 {
                db_s_uuid,
                db_d_uuid,
                db_ts_max,
                entries,
            } => {
                // Do stuff.
                idlayer.write_db_s_uuid(db_s_uuid)?;
                idlayer.write_db_d_uuid(db_d_uuid)?;
                idlayer.set_db_ts_max(db_ts_max)?;
                (entries, None, None)
            }
            DbBackup::V3 {
                db_s_uuid,
                db_d_uuid,
                db_ts_max,
                keyhandles,
                entries,
            } => {
                // Do stuff.
                idlayer.write_db_s_uuid(db_s_uuid)?;
                idlayer.write_db_d_uuid(db_d_uuid)?;
                idlayer.set_db_ts_max(db_ts_max)?;
                idlayer.set_key_handles(keyhandles)?;
                (entries, None, None)
            }
            DbBackup::V4 {
                db_s_uuid,
                db_d_uuid,
                db_ts_max,
                keyhandles,
                repl_meta,
                entries,
            } => {
                // Do stuff.
                idlayer.write_db_s_uuid(db_s_uuid)?;
                idlayer.write_db_d_uuid(db_d_uuid)?;
                idlayer.set_db_ts_max(db_ts_max)?;
                idlayer.set_key_handles(keyhandles)?;
                (entries, Some(repl_meta), None)
            }
            DbBackup::V5 {
                version,
                db_s_uuid,
                db_d_uuid,
                db_ts_max,
                keyhandles,
                repl_meta,
                entries,
            } => {
                // Do stuff.
                idlayer.write_db_s_uuid(db_s_uuid)?;
                idlayer.write_db_d_uuid(db_d_uuid)?;
                idlayer.set_db_ts_max(db_ts_max)?;
                idlayer.set_key_handles(keyhandles)?;
                (entries, Some(repl_meta), Some(version))
            }
        };

        // Version gate: the backup must come from the same package series.
        if let Some(version) = maybe_version {
            if version != env!("KANIDM_PKG_SERIES") {
                error!("The provided backup data is from server version {} and is unable to be restored on this instance ({})", version, env!("KANIDM_PKG_SERIES"));
                return Err(OperationError::DB0001MismatchedRestoreVersion);
            }
        } else {
            error!("The provided backup data is from an older server version and is unable to be restored.");
            return Err(OperationError::DB0002MismatchedRestoreVersion);
        };

        // Rebuild the RUV from the backup.
        match repl_meta {
            Some(DbReplMeta::V1 { ruv: db_ruv }) => {
                self.get_ruv()
                    .restore(db_ruv.into_iter().map(|db_cid| db_cid.into()))?;
            }
            None => {
                warn!("Unable to restore replication metadata, this server may need a refresh.");
            }
        }

        info!("Restoring {} entries ...", dbentries.len());

        // Now, we setup all the entries with new ids.
        let mut id_max = 0;
        let identries: Result<Vec<IdRawEntry>, _> = dbentries
            .iter()
            .map(|e| {
                id_max += 1;
                // NOTE(review): this is a serde_json failure but maps to
                // SerdeCborError - looks like a stale variant name; confirm
                // before changing, as callers may match on it.
                let data = serde_json::to_vec(&e).map_err(|_| OperationError::SerdeCborError)?;
                Ok(IdRawEntry { id: id_max, data })
            })
            .collect();

        let idlayer = self.get_idlayer();

        idlayer.write_identries_raw(identries?.into_iter())?;

        info!("Restored {} entries", dbentries.len());

        // Surface any consistency failures from the restored data set.
        let vr = self.verify();
        if vr.is_empty() {
            Ok(())
        } else {
            Err(OperationError::ConsistencyError(
                vr.into_iter().filter_map(|v| v.err()).collect(),
            ))
        }
    }
1949
1950    /// If any RUV elements are present in the DB, load them now. This provides us with
1951    /// the RUV boundaries and change points from previous operations of the server, so
1952    /// that ruv_rebuild can "fill in" the gaps.
1953    ///
1954    /// # SAFETY
1955    ///
1956    /// Note that you should only call this function during the server startup
1957    /// to reload the RUV data from the entries of the database.
1958    ///
1959    /// Before calling this, the in memory ruv MUST be clear.
1960    #[instrument(level = "debug", name = "be::ruv_rebuild", skip_all)]
1961    fn ruv_reload(&mut self) -> Result<(), OperationError> {
1962        let idlayer = self.get_idlayer();
1963
1964        let db_ruv = idlayer.get_db_ruv()?;
1965
1966        // Setup the CID's that existed previously. We don't need to know what entries
1967        // they affect, we just need them to ensure that we have ranges for replication
1968        // comparison to take effect properly.
1969        self.get_ruv().restore(db_ruv)?;
1970
1971        // Then populate the RUV with the data from the entries.
1972        self.ruv_rebuild()
1973    }
1974
1975    #[instrument(level = "debug", name = "be::ruv_rebuild", skip_all)]
1976    fn ruv_rebuild(&mut self) -> Result<(), OperationError> {
1977        // Rebuild the ruv!
1978        // For now this has to read from all the entries in the DB, but in the future
1979        // we'll actually store this properly (?). If it turns out this is really fast
1980        // we may just rebuild this always on startup.
1981
1982        // NOTE: An important detail is that we don't rely on indexes here!
1983
1984        let idl = IdList::AllIds;
1985        let entries = self.get_idlayer().get_identry(&idl).map_err(|e| {
1986            admin_error!(?e, "get_identry failed");
1987            e
1988        })?;
1989
1990        self.get_ruv().rebuild(&entries)?;
1991
1992        Ok(())
1993    }
1994
1995    pub fn quarantine_entry(&mut self, id: u64) -> Result<(), OperationError> {
1996        self.get_idlayer().quarantine_entry(id)?;
1997        // We have to set the index version to 0 so that on next start we force
1998        // a reindex to automatically occur.
1999        self.set_db_index_version(0)
2000    }
2001
2002    pub fn restore_quarantined(&mut self, id: u64) -> Result<(), OperationError> {
2003        self.get_idlayer().restore_quarantined(id)?;
2004        // We have to set the index version to 0 so that on next start we force
2005        // a reindex to automatically occur.
2006        self.set_db_index_version(0)
2007    }
2008
2009    #[cfg(any(test, debug_assertions))]
2010    pub fn clear_cache(&mut self) -> Result<(), OperationError> {
2011        self.get_idlayer().clear_cache()
2012    }
2013
2014    pub fn commit(self) -> Result<(), OperationError> {
2015        let BackendWriteTransaction {
2016            mut idlayer,
2017            idxmeta_wr,
2018            ruv,
2019        } = self;
2020
2021        // write the ruv content back to the db.
2022        idlayer.write_db_ruv(ruv.added(), ruv.removed())?;
2023
2024        idlayer.commit().map(|()| {
2025            ruv.commit();
2026            idxmeta_wr.commit();
2027        })
2028    }
2029
2030    pub(crate) fn reset_db_s_uuid(&mut self) -> Result<Uuid, OperationError> {
2031        // The value is missing. Generate a new one and store it.
2032        let nsid = Uuid::new_v4();
2033        self.get_idlayer().write_db_s_uuid(nsid).map_err(|err| {
2034            error!(?err, "Unable to persist server uuid");
2035            err
2036        })?;
2037        Ok(nsid)
2038    }
2039    pub fn get_db_s_uuid(&mut self) -> Result<Uuid, OperationError> {
2040        let res = self.get_idlayer().get_db_s_uuid().map_err(|err| {
2041            error!(?err, "Failed to read server uuid");
2042            err
2043        })?;
2044        match res {
2045            Some(s_uuid) => Ok(s_uuid),
2046            None => self.reset_db_s_uuid(),
2047        }
2048    }
2049
2050    /// This generates a new domain UUID and stores it into the database,
2051    /// returning the new UUID
2052    fn reset_db_d_uuid(&mut self) -> Result<Uuid, OperationError> {
2053        let nsid = Uuid::new_v4();
2054        self.get_idlayer().write_db_d_uuid(nsid).map_err(|err| {
2055            error!(?err, "Unable to persist domain uuid");
2056            err
2057        })?;
2058        Ok(nsid)
2059    }
2060
2061    /// Manually set a new domain UUID and store it into the DB. This is used
2062    /// as part of a replication refresh.
2063    pub fn set_db_d_uuid(&mut self, nsid: Uuid) -> Result<(), OperationError> {
2064        self.get_idlayer().write_db_d_uuid(nsid)
2065    }
2066
2067    /// This pulls the domain UUID from the database
2068    pub fn get_db_d_uuid(&mut self) -> Result<Uuid, OperationError> {
2069        let res = self.get_idlayer().get_db_d_uuid().map_err(|err| {
2070            error!(?err, "Failed to read domain uuid");
2071            err
2072        })?;
2073        match res {
2074            Some(d_uuid) => Ok(d_uuid),
2075            None => self.reset_db_d_uuid(),
2076        }
2077    }
2078
2079    pub fn set_db_ts_max(&mut self, ts: Duration) -> Result<(), OperationError> {
2080        self.get_idlayer().set_db_ts_max(ts)
2081    }
2082
2083    pub fn get_db_ts_max(&mut self, ts: Duration) -> Result<Duration, OperationError> {
2084        // if none, return ts. If found, return it.
2085        match self.get_idlayer().get_db_ts_max()? {
2086            Some(dts) => Ok(dts),
2087            None => Ok(ts),
2088        }
2089    }
2090
2091    fn get_db_index_version(&mut self) -> Result<i64, OperationError> {
2092        self.get_idlayer().get_db_index_version()
2093    }
2094
2095    fn set_db_index_version(&mut self, v: i64) -> Result<(), OperationError> {
2096        self.get_idlayer().set_db_index_version(v)
2097    }
2098}
2099
2100// We have a number of hardcoded, "obvious" slopes that should
2101// exist. We return these when the analysis has not been run, as
2102// these are values that are generally "good enough" for most applications
2103fn get_idx_slope_default(ikey: &IdxKey) -> IdxSlope {
2104    match (ikey.attr.as_str(), &ikey.itype) {
2105        (ATTR_NAME, IndexType::Equality)
2106        | (ATTR_SPN, IndexType::Equality)
2107        | (ATTR_UUID, IndexType::Equality) => 1,
2108        (ATTR_CLASS, IndexType::Equality) => 180,
2109        (_, IndexType::Equality) => 45,
2110        (_, IndexType::SubString) => 90,
2111        (_, IndexType::Presence) => 90,
2112    }
2113}
2114
2115// In the future this will do the routing between the chosen backends etc.
2116impl Backend {
2117    #[instrument(level = "debug", name = "be::new", skip_all)]
2118    pub fn new(
2119        mut cfg: BackendConfig,
2120        // path: &str,
2121        // mut pool_size: u32,
2122        // fstype: FsType,
2123        idxkeys: Vec<IdxKey>,
2124        vacuum: bool,
2125    ) -> Result<Self, OperationError> {
2126        debug!(db_tickets = ?cfg.pool_size, profile = %env!("KANIDM_PROFILE_NAME"), cpu_flags = %env!("KANIDM_CPU_FLAGS"));
2127
2128        // If in memory, reduce pool to 1
2129        if cfg.path.as_os_str().is_empty() {
2130            cfg.pool_size = 1;
2131        }
2132
2133        // Setup idxkeys here. By default we set these all to "max slope" aka
2134        // all indexes are "equal" but also worse case unless analysed.
2135        //
2136        // During startup this will be "fixed" as the schema core will call reload_idxmeta
2137        // which will trigger a reload of the analysis data (if present).
2138        let idxkeys: Map<_, _> = idxkeys
2139            .into_iter()
2140            .map(|ikey| {
2141                let slope = get_idx_slope_default(&ikey);
2142                (ikey, slope)
2143            })
2144            .collect();
2145
2146        // Load the replication update vector here. Initially we build an in memory
2147        // RUV, and then we load it from the DB.
2148        let ruv = Arc::new(ReplicationUpdateVector::default());
2149
2150        // this has a ::memory() type, but will path == "" work?
2151        let idlayer = Arc::new(IdlArcSqlite::new(&cfg, vacuum)?);
2152        let be = Backend {
2153            cfg,
2154            idlayer,
2155            ruv,
2156            idxmeta: Arc::new(CowCell::new(IdxMeta::new(idxkeys))),
2157        };
2158
2159        // Now complete our setup with a txn
2160        // In this case we can use an empty idx meta because we don't
2161        // access any parts of
2162        // the indexing subsystem here.
2163        let mut idl_write = be.idlayer.write()?;
2164        idl_write
2165            .setup()
2166            .and_then(|_| idl_write.commit())
2167            .map_err(|e| {
2168                admin_error!(?e, "Failed to setup idlayer");
2169                e
2170            })?;
2171
2172        // Now rebuild the ruv.
2173        let mut be_write = be.write()?;
2174        be_write
2175            .ruv_reload()
2176            .and_then(|_| be_write.commit())
2177            .map_err(|e| {
2178                admin_error!(?e, "Failed to reload ruv");
2179                e
2180            })?;
2181
2182        Ok(be)
2183    }
2184
2185    pub fn get_pool_size(&self) -> u32 {
2186        debug_assert!(self.cfg.pool_size > 0);
2187        self.cfg.pool_size
2188    }
2189
2190    pub fn try_quiesce(&self) {
2191        self.idlayer.try_quiesce();
2192    }
2193
2194    pub fn read(&self) -> Result<BackendReadTransaction<'_>, OperationError> {
2195        Ok(BackendReadTransaction {
2196            idlayer: self.idlayer.read()?,
2197            idxmeta: self.idxmeta.read(),
2198            ruv: self.ruv.read(),
2199        })
2200    }
2201
2202    pub fn write(&self) -> Result<BackendWriteTransaction<'_>, OperationError> {
2203        Ok(BackendWriteTransaction {
2204            idlayer: self.idlayer.write()?,
2205            idxmeta_wr: self.idxmeta.write(),
2206            ruv: self.ruv.write(),
2207        })
2208    }
2209}
2210
2211// What are the possible actions we'll receive here?
2212
2213#[cfg(test)]
2214mod tests {
2215    use super::super::entry::{Entry, EntryInit, EntryNew};
2216    use super::Limits;
2217    use super::{
2218        Backend, BackendConfig, BackendTransaction, BackendWriteTransaction, DbBackup, IdList,
2219        IdxKey, OperationError,
2220    };
2221    use crate::prelude::*;
2222    use crate::repl::cid::Cid;
2223    use crate::value::{IndexType, PartialValue, Value};
2224    use idlset::v2::IDLBitRange;
2225    use std::fs;
2226    use std::iter::FromIterator;
2227    use std::path::Path;
2228    use std::sync::Arc;
2229    use std::time::Duration;
2230
    lazy_static! {
        // Fixed CIDs shared by the backend tests so that create/modify events
        // carry predictable, ordered change identifiers.
        static ref CID_ZERO: Cid = Cid::new_zero();
        static ref CID_ONE: Cid = Cid::new_count(1);
        static ref CID_TWO: Cid = Cid::new_count(2);
        static ref CID_THREE: Cid = Cid::new_count(3);
        // A CID well "ahead" of the others, used by the tests as the first
        // argument to reap_tombstones.
        static ref CID_ADV: Cid = Cid::new_count(10);
    }
2238
2239    macro_rules! run_test {
2240        ($test_fn:expr) => {{
2241            sketching::test_init();
2242
2243            // This is a demo idxmeta, purely for testing.
2244            let idxmeta = vec![
2245                IdxKey {
2246                    attr: Attribute::Name.into(),
2247                    itype: IndexType::Equality,
2248                },
2249                IdxKey {
2250                    attr: Attribute::Name.into(),
2251                    itype: IndexType::Presence,
2252                },
2253                IdxKey {
2254                    attr: Attribute::Name.into(),
2255                    itype: IndexType::SubString,
2256                },
2257                IdxKey {
2258                    attr: Attribute::Uuid.into(),
2259                    itype: IndexType::Equality,
2260                },
2261                IdxKey {
2262                    attr: Attribute::Uuid.into(),
2263                    itype: IndexType::Presence,
2264                },
2265                IdxKey {
2266                    attr: Attribute::TestAttr.into(),
2267                    itype: IndexType::Equality,
2268                },
2269                IdxKey {
2270                    attr: Attribute::TestNumber.into(),
2271                    itype: IndexType::Equality,
2272                },
2273            ];
2274
2275            let be = Backend::new(BackendConfig::new_test("main"), idxmeta, false)
2276                .expect("Failed to setup backend");
2277
2278            let mut be_txn = be.write().unwrap();
2279
2280            let r = $test_fn(&mut be_txn);
2281            // Commit, to guarantee it worked.
2282            assert!(be_txn.commit().is_ok());
2283            r
2284        }};
2285    }
2286
2287    macro_rules! entry_exists {
2288        ($be:expr, $ent:expr) => {{
2289            let ei = $ent.clone().into_sealed_committed();
2290            let filt = ei
2291                .filter_from_attrs(&[Attribute::Uuid.into()])
2292                .expect("failed to generate filter")
2293                .into_valid_resolved();
2294            let lims = Limits::unlimited();
2295            let entries = $be.search(&lims, &filt).expect("failed to search");
2296            entries.first().is_some()
2297        }};
2298    }
2299
2300    macro_rules! entry_attr_pres {
2301        ($be:expr, $ent:expr, $attr:expr) => {{
2302            let ei = $ent.clone().into_sealed_committed();
2303            let filt = ei
2304                .filter_from_attrs(&[Attribute::UserId.into()])
2305                .expect("failed to generate filter")
2306                .into_valid_resolved();
2307            let lims = Limits::unlimited();
2308            let entries = $be.search(&lims, &filt).expect("failed to search");
2309            match entries.first() {
2310                Some(ent) => ent.attribute_pres($attr),
2311                None => false,
2312            }
2313        }};
2314    }
2315
2316    macro_rules! idl_state {
2317        ($be:expr, $attr:expr, $itype:expr, $idx_key:expr, $expect:expr) => {{
2318            let t_idl = $be
2319                .load_test_idl(&$attr, $itype, &$idx_key.to_string())
2320                .expect("IdList Load failed");
2321            let t = $expect.map(|v: Vec<u64>| IDLBitRange::from_iter(v));
2322            assert_eq!(t_idl, t);
2323        }};
2324    }
2325
2326    #[test]
2327    fn test_be_simple_create() {
2328        run_test!(|be: &mut BackendWriteTransaction| {
2329            trace!("Simple Create");
2330
2331            let empty_result = be.create(&CID_ZERO, Vec::with_capacity(0));
2332            trace!("{:?}", empty_result);
2333            assert_eq!(empty_result, Err(OperationError::EmptyRequest));
2334
2335            let mut e: Entry<EntryInit, EntryNew> = Entry::new();
2336            e.add_ava(Attribute::UserId, Value::from("william"));
2337            e.add_ava(
2338                Attribute::Uuid,
2339                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
2340            );
2341            let e = e.into_sealed_new();
2342
2343            let single_result = be.create(&CID_ZERO, vec![e.clone()]);
2344
2345            assert!(single_result.is_ok());
2346
2347            // Construct a filter
2348            assert!(entry_exists!(be, e));
2349        });
2350    }
2351
2352    #[test]
2353    fn test_be_simple_search() {
2354        run_test!(|be: &mut BackendWriteTransaction| {
2355            trace!("Simple Search");
2356
2357            let mut e: Entry<EntryInit, EntryNew> = Entry::new();
2358            e.add_ava(Attribute::UserId, Value::from("claire"));
2359            e.add_ava(
2360                Attribute::Uuid,
2361                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
2362            );
2363            let e = e.into_sealed_new();
2364
2365            let single_result = be.create(&CID_ZERO, vec![e]);
2366            assert!(single_result.is_ok());
2367            // Test a simple EQ search
2368
2369            let filt = filter_resolved!(f_eq(Attribute::UserId, PartialValue::new_utf8s("claire")));
2370
2371            let lims = Limits::unlimited();
2372
2373            let r = be.search(&lims, &filt);
2374            assert!(r.expect("Search failed!").len() == 1);
2375
2376            // Test empty search
2377
2378            // Test class pres
2379
2380            // Search with no results
2381        });
2382    }
2383
2384    #[test]
2385    fn test_be_search_with_invalid() {
2386        run_test!(|be: &mut BackendWriteTransaction| {
2387            trace!("Simple Search");
2388
2389            let mut e: Entry<EntryInit, EntryNew> = Entry::new();
2390            e.add_ava(Attribute::UserId, Value::from("bagel"));
2391            e.add_ava(
2392                Attribute::Uuid,
2393                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
2394            );
2395            let e = e.into_sealed_new();
2396
2397            let single_result = be.create(&CID_ZERO, vec![e]);
2398            assert!(single_result.is_ok());
2399
2400            // Test Search with or condition including invalid attribute
2401            let filt = filter_resolved!(f_or(vec![
2402                f_eq(Attribute::UserId, PartialValue::new_utf8s("bagel")),
2403                f_invalid(Attribute::UserId)
2404            ]));
2405
2406            let lims = Limits::unlimited();
2407
2408            let r = be.search(&lims, &filt);
2409            assert!(r.expect("Search failed!").len() == 1);
2410
2411            // Test Search with or condition including invalid attribute
2412            let filt = filter_resolved!(f_and(vec![
2413                f_eq(Attribute::UserId, PartialValue::new_utf8s("bagel")),
2414                f_invalid(Attribute::UserId)
2415            ]));
2416
2417            let lims = Limits::unlimited();
2418
2419            let r = be.search(&lims, &filt);
2420            assert!(r.expect("Search failed!").is_empty());
2421        });
2422    }
2423
    /// Exercise modify: rejecting uncommitted/never-committed pre-entries and
    /// empty modify sets, then applying single and multi entry modifications.
    #[test]
    fn test_be_simple_modify() {
        run_test!(|be: &mut BackendWriteTransaction| {
            trace!("Simple Modify");
            let lims = Limits::unlimited();
            // First create some entries (3?)
            let mut e1: Entry<EntryInit, EntryNew> = Entry::new();
            e1.add_ava(Attribute::UserId, Value::from("william"));
            e1.add_ava(
                Attribute::Uuid,
                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
            );

            let mut e2: Entry<EntryInit, EntryNew> = Entry::new();
            e2.add_ava(Attribute::UserId, Value::from("alice"));
            e2.add_ava(
                Attribute::Uuid,
                Value::from("4b6228ab-1dbe-42a4-a9f5-f6368222438e"),
            );

            let ve1 = e1.clone().into_sealed_new();
            let ve2 = e2.clone().into_sealed_new();

            assert!(be.create(&CID_ZERO, vec![ve1, ve2]).is_ok());
            assert!(entry_exists!(be, e1));
            assert!(entry_exists!(be, e2));

            // You need to now retrieve the entries back out to get the entry id's
            let mut results = be
                .search(&lims, &filter_resolved!(f_pres(Attribute::UserId)))
                .expect("Failed to search");

            // Get these out to usable entries.
            let r1 = results.remove(0);
            let r2 = results.remove(0);

            // Re-open the stored entries as invalid so we can mutate them.
            let mut r1 = r1.as_ref().clone().into_invalid();
            let mut r2 = r2.as_ref().clone().into_invalid();

            // Modify no id (err)
            // This is now impossible due to the state machine design.
            // However, with some unsafe ....
            // ue1 was never stored, so using it as a pre-entry must fail.
            let ue1 = e1.clone().into_sealed_committed();
            assert!(be
                .modify(&CID_ZERO, &[Arc::new(ue1.clone())], &[ue1])
                .is_err());
            // Modify none: an empty modify set is also an error.
            assert!(be.modify(&CID_ZERO, &[], &[]).is_err());

            // Make some changes to r1, r2.
            // Keep the pre-change (committed) forms to pass as the pre-entries.
            let pre1 = Arc::new(r1.clone().into_sealed_committed());
            let pre2 = Arc::new(r2.clone().into_sealed_committed());
            r1.add_ava(Attribute::TestAttr, Value::from("modified"));
            r2.add_ava(Attribute::TestAttr, Value::from("modified"));

            // Now ... cheat.

            let vr1 = r1.into_sealed_committed();
            let vr2 = r2.into_sealed_committed();

            // Modify single
            assert!(be
                .modify(&CID_ZERO, &[pre1], std::slice::from_ref(&vr1))
                .is_ok());
            // Assert no other changes: only vr1 gained TestAttr, vr2 did not.
            assert!(entry_attr_pres!(be, vr1, Attribute::TestAttr));
            assert!(!entry_attr_pres!(be, vr2, Attribute::TestAttr));

            // Modify both
            assert!(be
                .modify(
                    &CID_ZERO,
                    &[Arc::new(vr1.clone()), pre2],
                    &[vr1.clone(), vr2.clone()]
                )
                .is_ok());

            // Now both entries carry the modification.
            assert!(entry_attr_pres!(be, vr1, Attribute::TestAttr));
            assert!(entry_attr_pres!(be, vr2, Attribute::TestAttr));
        });
    }
2505
    /// Exercise tombstoning and reaping: entries tombstoned at a given CID are
    /// only reaped once reap_tombstones targets a strictly later CID, as the
    /// Ok(count) assertions below demonstrate.
    #[test]
    fn test_be_simple_delete() {
        run_test!(|be: &mut BackendWriteTransaction| {
            trace!("Simple Delete");
            let lims = Limits::unlimited();

            // First create some entries (3?)
            let mut e1: Entry<EntryInit, EntryNew> = Entry::new();
            e1.add_ava(Attribute::UserId, Value::from("william"));
            e1.add_ava(
                Attribute::Uuid,
                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
            );

            let mut e2: Entry<EntryInit, EntryNew> = Entry::new();
            e2.add_ava(Attribute::UserId, Value::from("alice"));
            e2.add_ava(
                Attribute::Uuid,
                Value::from("4b6228ab-1dbe-42a4-a9f5-f6368222438e"),
            );

            let mut e3: Entry<EntryInit, EntryNew> = Entry::new();
            e3.add_ava(Attribute::UserId, Value::from("lucy"));
            e3.add_ava(
                Attribute::Uuid,
                Value::from("7b23c99d-c06b-4a9a-a958-3afa56383e1d"),
            );

            let ve1 = e1.clone().into_sealed_new();
            let ve2 = e2.clone().into_sealed_new();
            let ve3 = e3.clone().into_sealed_new();

            assert!(be.create(&CID_ZERO, vec![ve1, ve2, ve3]).is_ok());
            assert!(entry_exists!(be, e1));
            assert!(entry_exists!(be, e2));
            assert!(entry_exists!(be, e3));

            // You need to now retrieve the entries back out to get the entry id's
            let mut results = be
                .search(&lims, &filter_resolved!(f_pres(Attribute::UserId)))
                .expect("Failed to search");

            // Get these out to usable entries.
            let r1 = results.remove(0);
            let r2 = results.remove(0);
            let r3 = results.remove(0);

            // Deletes nothing, all entries are live.
            assert!(matches!(be.reap_tombstones(&CID_ADV, &CID_ZERO), Ok(0)));

            // Put them into the tombstone state, and write that down.
            // This sets up the RUV with the changes.
            // r1 is tombstoned at CID_ONE; r2 and r3 at CID_TWO.
            let r1_ts = r1.to_tombstone(CID_ONE.clone()).into_sealed_committed();

            assert!(be
                .modify(&CID_ONE, &[r1], std::slice::from_ref(&r1_ts))
                .is_ok());

            let r2_ts = r2.to_tombstone(CID_TWO.clone()).into_sealed_committed();
            let r3_ts = r3.to_tombstone(CID_TWO.clone()).into_sealed_committed();

            assert!(be
                .modify(&CID_TWO, &[r2, r3], &[r2_ts.clone(), r3_ts.clone()])
                .is_ok());

            // The entries are now tombstones, but are still in the RUV. This is
            // because we targeted CID_ZERO, not ONE.
            assert!(matches!(be.reap_tombstones(&CID_ADV, &CID_ZERO), Ok(0)));

            assert!(entry_exists!(be, r1_ts));
            assert!(entry_exists!(be, r2_ts));
            assert!(entry_exists!(be, r3_ts));

            // Targeting CID_ONE still reaps nothing — r1 was tombstoned AT
            // CID_ONE, not before it.
            assert!(matches!(be.reap_tombstones(&CID_ADV, &CID_ONE), Ok(0)));

            assert!(entry_exists!(be, r1_ts));
            assert!(entry_exists!(be, r2_ts));
            assert!(entry_exists!(be, r3_ts));

            // Targeting CID_TWO reaps only r1 (tombstoned at CID_ONE).
            assert!(matches!(be.reap_tombstones(&CID_ADV, &CID_TWO), Ok(1)));

            assert!(!entry_exists!(be, r1_ts));
            assert!(entry_exists!(be, r2_ts));
            assert!(entry_exists!(be, r3_ts));

            // Targeting CID_THREE reaps the remaining two (tombstoned at CID_TWO).
            assert!(matches!(be.reap_tombstones(&CID_ADV, &CID_THREE), Ok(2)));

            assert!(!entry_exists!(be, r1_ts));
            assert!(!entry_exists!(be, r2_ts));
            assert!(!entry_exists!(be, r3_ts));

            // Nothing left
            assert!(matches!(be.reap_tombstones(&CID_ADV, &CID_THREE), Ok(0)));

            assert!(!entry_exists!(be, r1_ts));
            assert!(!entry_exists!(be, r2_ts));
            assert!(!entry_exists!(be, r3_ts));
        });
    }
2605
2606    #[test]
2607    fn test_be_backup_restore() {
2608        let db_backup_file_name =
2609            Path::new(option_env!("OUT_DIR").unwrap_or("/tmp")).join(".backup_test.json");
2610        eprintln!(" ⚠️   {}", db_backup_file_name.display());
2611        run_test!(|be: &mut BackendWriteTransaction| {
2612            // Important! Need db metadata setup!
2613            be.reset_db_s_uuid().unwrap();
2614            be.reset_db_d_uuid().unwrap();
2615            be.set_db_ts_max(Duration::from_secs(1)).unwrap();
2616
2617            // First create some entries (3?)
2618            let mut e1: Entry<EntryInit, EntryNew> = Entry::new();
2619            e1.add_ava(Attribute::UserId, Value::from("william"));
2620            e1.add_ava(
2621                Attribute::Uuid,
2622                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
2623            );
2624
2625            let mut e2: Entry<EntryInit, EntryNew> = Entry::new();
2626            e2.add_ava(Attribute::UserId, Value::from("alice"));
2627            e2.add_ava(
2628                Attribute::Uuid,
2629                Value::from("4b6228ab-1dbe-42a4-a9f5-f6368222438e"),
2630            );
2631
2632            let mut e3: Entry<EntryInit, EntryNew> = Entry::new();
2633            e3.add_ava(Attribute::UserId, Value::from("lucy"));
2634            e3.add_ava(
2635                Attribute::Uuid,
2636                Value::from("7b23c99d-c06b-4a9a-a958-3afa56383e1d"),
2637            );
2638
2639            let ve1 = e1.clone().into_sealed_new();
2640            let ve2 = e2.clone().into_sealed_new();
2641            let ve3 = e3.clone().into_sealed_new();
2642
2643            assert!(be.create(&CID_ZERO, vec![ve1, ve2, ve3]).is_ok());
2644            assert!(entry_exists!(be, e1));
2645            assert!(entry_exists!(be, e2));
2646            assert!(entry_exists!(be, e3));
2647
2648            let result = fs::remove_file(&db_backup_file_name);
2649
2650            if let Err(e) = result {
2651                // if the error is the file is not found, that's what we want so continue,
2652                // otherwise return the error
2653                if e.kind() == std::io::ErrorKind::NotFound {}
2654            }
2655
2656            be.backup(&db_backup_file_name).expect("Backup failed!");
2657            be.restore(&db_backup_file_name).expect("Restore failed!");
2658
2659            assert!(be.verify().is_empty());
2660        });
2661    }
2662
2663    #[test]
2664    fn test_be_backup_restore_tampered() {
2665        let db_backup_file_name =
2666            Path::new(option_env!("OUT_DIR").unwrap_or("/tmp")).join(".backup2_test.json");
2667        eprintln!(" ⚠️   {}", db_backup_file_name.display());
2668        run_test!(|be: &mut BackendWriteTransaction| {
2669            // Important! Need db metadata setup!
2670            be.reset_db_s_uuid().unwrap();
2671            be.reset_db_d_uuid().unwrap();
2672            be.set_db_ts_max(Duration::from_secs(1)).unwrap();
2673            // First create some entries (3?)
2674            let mut e1: Entry<EntryInit, EntryNew> = Entry::new();
2675            e1.add_ava(Attribute::UserId, Value::from("william"));
2676            e1.add_ava(
2677                Attribute::Uuid,
2678                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
2679            );
2680
2681            let mut e2: Entry<EntryInit, EntryNew> = Entry::new();
2682            e2.add_ava(Attribute::UserId, Value::from("alice"));
2683            e2.add_ava(
2684                Attribute::Uuid,
2685                Value::from("4b6228ab-1dbe-42a4-a9f5-f6368222438e"),
2686            );
2687
2688            let mut e3: Entry<EntryInit, EntryNew> = Entry::new();
2689            e3.add_ava(Attribute::UserId, Value::from("lucy"));
2690            e3.add_ava(
2691                Attribute::Uuid,
2692                Value::from("7b23c99d-c06b-4a9a-a958-3afa56383e1d"),
2693            );
2694
2695            let ve1 = e1.clone().into_sealed_new();
2696            let ve2 = e2.clone().into_sealed_new();
2697            let ve3 = e3.clone().into_sealed_new();
2698
2699            assert!(be.create(&CID_ZERO, vec![ve1, ve2, ve3]).is_ok());
2700            assert!(entry_exists!(be, e1));
2701            assert!(entry_exists!(be, e2));
2702            assert!(entry_exists!(be, e3));
2703
2704            let result = fs::remove_file(&db_backup_file_name);
2705
2706            if let Err(e) = result {
2707                // if the error is the file is not found, that's what we want so continue,
2708                // otherwise return the error
2709                if e.kind() == std::io::ErrorKind::NotFound {}
2710            }
2711
2712            be.backup(&db_backup_file_name).expect("Backup failed!");
2713
2714            // Now here, we need to tamper with the file.
2715            let serialized_string = fs::read_to_string(&db_backup_file_name).unwrap();
2716            trace!(?serialized_string);
2717            let mut dbbak: DbBackup = serde_json::from_str(&serialized_string).unwrap();
2718
2719            match &mut dbbak {
2720                DbBackup::V5 {
2721                    version: _,
2722                    db_s_uuid: _,
2723                    db_d_uuid: _,
2724                    db_ts_max: _,
2725                    keyhandles: _,
2726                    repl_meta: _,
2727                    entries,
2728                } => {
2729                    let _ = entries.pop();
2730                }
2731                _ => {
2732                    // We no longer use these format versions!
2733                    unreachable!()
2734                }
2735            };
2736
2737            let serialized_entries_str = serde_json::to_string_pretty(&dbbak).unwrap();
2738            fs::write(&db_backup_file_name, serialized_entries_str).unwrap();
2739
2740            be.restore(&db_backup_file_name).expect("Restore failed!");
2741
2742            assert!(be.verify().is_empty());
2743        });
2744    }
2745
2746    #[test]
2747    fn test_be_sid_generation_and_reset() {
2748        run_test!(|be: &mut BackendWriteTransaction| {
2749            let sid1 = be.get_db_s_uuid().unwrap();
2750            let sid2 = be.get_db_s_uuid().unwrap();
2751            assert_eq!(sid1, sid2);
2752            let sid3 = be.reset_db_s_uuid().unwrap();
2753            assert!(sid1 != sid3);
2754            let sid4 = be.get_db_s_uuid().unwrap();
2755            assert_eq!(sid3, sid4);
2756        });
2757    }
2758
2759    #[test]
2760    fn test_be_reindex_empty() {
2761        run_test!(|be: &mut BackendWriteTransaction| {
2762            // Add some test data?
2763            let missing = be.missing_idxs().unwrap();
2764            assert_eq!(missing.len(), 7);
2765            assert!(be.reindex(false).is_ok());
2766            let missing = be.missing_idxs().unwrap();
2767            debug!("{:?}", missing);
2768            assert!(missing.is_empty());
2769        });
2770    }
2771
    /// After purging and rebuilding indexes over two created entries, every
    /// eq/sub/pres index and the name2uuid / uuid2spn / uuid2rdn maps must
    /// reflect exactly those entries (ids 1 = william, 2 = claire).
    #[test]
    fn test_be_reindex_data() {
        run_test!(|be: &mut BackendWriteTransaction| {
            // Add some test data?
            let mut e1: Entry<EntryInit, EntryNew> = Entry::new();
            e1.add_ava(Attribute::Name, Value::new_iname("william"));
            e1.add_ava(
                Attribute::Uuid,
                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
            );
            let e1 = e1.into_sealed_new();

            let mut e2: Entry<EntryInit, EntryNew> = Entry::new();
            e2.add_ava(Attribute::Name, Value::new_iname("claire"));
            e2.add_ava(
                Attribute::Uuid,
                Value::from("bd651620-00dd-426b-aaa0-4494f7b7906f"),
            );
            let e2 = e2.into_sealed_new();

            be.create(&CID_ZERO, vec![e1, e2]).unwrap();

            // purge indexes
            be.danger_purge_idxs().unwrap();
            // Check they are gone
            let missing = be.missing_idxs().unwrap();
            assert_eq!(missing.len(), 7);
            assert!(be.reindex(false).is_ok());
            let missing = be.missing_idxs().unwrap();
            debug!("{:?}", missing);
            assert!(missing.is_empty());
            // check name and uuid ids on eq, sub, pres

            idl_state!(
                be,
                Attribute::Name,
                IndexType::Equality,
                "william",
                Some(vec![1])
            );

            idl_state!(
                be,
                Attribute::Name,
                IndexType::Equality,
                "claire",
                Some(vec![2])
            );

            // Substrings unique to "william" map only to entry 1.
            for sub in [
                "w", "m", "wi", "il", "ll", "li", "ia", "am", "wil", "ill", "lli", "lia", "iam",
            ] {
                idl_state!(
                    be,
                    Attribute::Name,
                    IndexType::SubString,
                    sub,
                    Some(vec![1])
                );
            }

            // Substrings unique to "claire" map only to entry 2.
            for sub in [
                "c", "r", "e", "cl", "la", "ai", "ir", "re", "cla", "lai", "air", "ire",
            ] {
                idl_state!(
                    be,
                    Attribute::Name,
                    IndexType::SubString,
                    sub,
                    Some(vec![2])
                );
            }

            // Substrings shared by both names map to both entries.
            for sub in ["i", "a", "l"] {
                idl_state!(
                    be,
                    Attribute::Name,
                    IndexType::SubString,
                    sub,
                    Some(vec![1, 2])
                );
            }

            idl_state!(
                be,
                Attribute::Name,
                IndexType::Presence,
                "_",
                Some(vec![1, 2])
            );

            idl_state!(
                be,
                Attribute::Uuid,
                IndexType::Equality,
                "db237e8a-0079-4b8c-8a56-593b22aa44d1",
                Some(vec![1])
            );

            idl_state!(
                be,
                Attribute::Uuid,
                IndexType::Equality,
                "bd651620-00dd-426b-aaa0-4494f7b7906f",
                Some(vec![2])
            );

            idl_state!(
                be,
                Attribute::Uuid,
                IndexType::Presence,
                "_",
                Some(vec![1, 2])
            );

            // Show what happens with empty: an indexed attr with an unknown key
            // yields Some(empty idl), not None.

            idl_state!(
                be,
                Attribute::Name,
                IndexType::Equality,
                "not-exist",
                Some(Vec::with_capacity(0))
            );

            idl_state!(
                be,
                Attribute::Uuid,
                IndexType::Equality,
                "fake-0079-4b8c-8a56-593b22aa44d1",
                Some(Vec::with_capacity(0))
            );

            // An attribute with no index at all yields None instead.
            let uuid_p_idl = be
                .load_test_idl(&Attribute::from("not_indexed"), IndexType::Presence, "_")
                .unwrap(); // unwrap the result
            assert_eq!(uuid_p_idl, None);

            // Check name2uuid
            let claire_uuid = uuid!("bd651620-00dd-426b-aaa0-4494f7b7906f");
            let william_uuid = uuid!("db237e8a-0079-4b8c-8a56-593b22aa44d1");

            assert_eq!(be.name2uuid("claire"), Ok(Some(claire_uuid)));
            assert_eq!(be.name2uuid("william"), Ok(Some(william_uuid)));
            // A uuid string is not a name, so it does not resolve here.
            assert_eq!(
                be.name2uuid("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
                Ok(None)
            );
            // check uuid2spn
            assert_eq!(
                be.uuid2spn(claire_uuid),
                Ok(Some(Value::new_iname("claire")))
            );
            assert_eq!(
                be.uuid2spn(william_uuid),
                Ok(Some(Value::new_iname("william")))
            );
            // check uuid2rdn
            assert_eq!(
                be.uuid2rdn(claire_uuid),
                Ok(Some("name=claire".to_string()))
            );
            assert_eq!(
                be.uuid2rdn(william_uuid),
                Ok(Some("name=william".to_string()))
            );
        });
    }
2940
    /// Creating one entry must populate its eq/pres indexes and the
    /// name2uuid / uuid2spn / uuid2rdn maps; tombstoning and reaping it must
    /// empty them all again.
    #[test]
    fn test_be_index_create_delete_simple() {
        run_test!(|be: &mut BackendWriteTransaction| {
            // First, setup our index tables!
            assert!(be.reindex(false).is_ok());
            // Test that on entry create, the indexes are made correctly.
            // this is a similar case to reindex.
            let mut e1: Entry<EntryInit, EntryNew> = Entry::new();
            e1.add_ava(Attribute::Name, Value::from("william"));
            e1.add_ava(
                Attribute::Uuid,
                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
            );
            let e1 = e1.into_sealed_new();

            // Keep the stored (committed) form so we can tombstone it later.
            let rset = be.create(&CID_ZERO, vec![e1]).unwrap();
            let mut rset: Vec<_> = rset.into_iter().map(Arc::new).collect();
            let e1 = rset.pop().unwrap();

            // The new entry (id 1) must appear in every relevant index.
            idl_state!(
                be,
                Attribute::Name.as_ref(),
                IndexType::Equality,
                "william",
                Some(vec![1])
            );

            idl_state!(
                be,
                Attribute::Name.as_ref(),
                IndexType::Presence,
                "_",
                Some(vec![1])
            );

            idl_state!(
                be,
                Attribute::Uuid.as_ref(),
                IndexType::Equality,
                "db237e8a-0079-4b8c-8a56-593b22aa44d1",
                Some(vec![1])
            );

            idl_state!(
                be,
                Attribute::Uuid.as_ref(),
                IndexType::Presence,
                "_",
                Some(vec![1])
            );

            let william_uuid = uuid!("db237e8a-0079-4b8c-8a56-593b22aa44d1");
            assert_eq!(be.name2uuid("william"), Ok(Some(william_uuid)));
            assert_eq!(be.uuid2spn(william_uuid), Ok(Some(Value::from("william"))));
            assert_eq!(
                be.uuid2rdn(william_uuid),
                Ok(Some("name=william".to_string()))
            );

            // == Now we reap_tombstones, and assert we removed the items.
            let e1_ts = e1.to_tombstone(CID_ONE.clone()).into_sealed_committed();
            assert!(be.modify(&CID_ONE, &[e1], &[e1_ts]).is_ok());
            be.reap_tombstones(&CID_ADV, &CID_TWO).unwrap();

            // Every index must now be present but empty.
            idl_state!(
                be,
                Attribute::Name.as_ref(),
                IndexType::Equality,
                "william",
                Some(Vec::with_capacity(0))
            );

            idl_state!(
                be,
                Attribute::Name.as_ref(),
                IndexType::Presence,
                "_",
                Some(Vec::with_capacity(0))
            );

            idl_state!(
                be,
                Attribute::Uuid.as_ref(),
                IndexType::Equality,
                "db237e8a-0079-4b8c-8a56-593b22aa44d1",
                Some(Vec::with_capacity(0))
            );

            idl_state!(
                be,
                Attribute::Uuid.as_ref(),
                IndexType::Presence,
                "_",
                Some(Vec::with_capacity(0))
            );

            // And the name/uuid maps no longer resolve the entry.
            assert_eq!(be.name2uuid("william"), Ok(None));
            assert_eq!(be.uuid2spn(william_uuid), Ok(None));
            assert_eq!(be.uuid2rdn(william_uuid), Ok(None));
        })
    }
3042
3043    #[test]
3044    fn test_be_index_create_delete_multi() {
3045        run_test!(|be: &mut BackendWriteTransaction| {
3046            // delete multiple entries at a time, without deleting others
3047            // First, setup our index tables!
3048            assert!(be.reindex(false).is_ok());
3049            // Test that on entry create, the indexes are made correctly.
3050            // this is a similar case to reindex.
3051            let mut e1: Entry<EntryInit, EntryNew> = Entry::new();
3052            e1.add_ava(Attribute::Name, Value::new_iname("william"));
3053            e1.add_ava(
3054                Attribute::Uuid,
3055                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
3056            );
3057            let e1 = e1.into_sealed_new();
3058
3059            let mut e2: Entry<EntryInit, EntryNew> = Entry::new();
3060            e2.add_ava(Attribute::Name, Value::new_iname("claire"));
3061            e2.add_ava(
3062                Attribute::Uuid,
3063                Value::from("bd651620-00dd-426b-aaa0-4494f7b7906f"),
3064            );
3065            let e2 = e2.into_sealed_new();
3066
3067            let mut e3: Entry<EntryInit, EntryNew> = Entry::new();
3068            e3.add_ava(Attribute::UserId, Value::new_iname("lucy"));
3069            e3.add_ava(
3070                Attribute::Uuid,
3071                Value::from("7b23c99d-c06b-4a9a-a958-3afa56383e1d"),
3072            );
3073            let e3 = e3.into_sealed_new();
3074
3075            let mut rset = be.create(&CID_ZERO, vec![e1, e2, e3]).unwrap();
3076            rset.remove(1);
3077            let mut rset: Vec<_> = rset.into_iter().map(Arc::new).collect();
3078            let e1 = rset.pop().unwrap();
3079            let e3 = rset.pop().unwrap();
3080
3081            // Now remove e1, e3.
3082            let e1_ts = e1.to_tombstone(CID_ONE.clone()).into_sealed_committed();
3083            let e3_ts = e3.to_tombstone(CID_ONE.clone()).into_sealed_committed();
3084            assert!(be.modify(&CID_ONE, &[e1, e3], &[e1_ts, e3_ts]).is_ok());
3085            be.reap_tombstones(&CID_ADV, &CID_TWO).unwrap();
3086
3087            idl_state!(
3088                be,
3089                Attribute::Name.as_ref(),
3090                IndexType::Equality,
3091                "claire",
3092                Some(vec![2])
3093            );
3094
3095            idl_state!(
3096                be,
3097                Attribute::Name.as_ref(),
3098                IndexType::Presence,
3099                "_",
3100                Some(vec![2])
3101            );
3102
3103            idl_state!(
3104                be,
3105                Attribute::Uuid.as_ref(),
3106                IndexType::Equality,
3107                "bd651620-00dd-426b-aaa0-4494f7b7906f",
3108                Some(vec![2])
3109            );
3110
3111            idl_state!(
3112                be,
3113                Attribute::Uuid.as_ref(),
3114                IndexType::Presence,
3115                "_",
3116                Some(vec![2])
3117            );
3118
3119            let claire_uuid = uuid!("bd651620-00dd-426b-aaa0-4494f7b7906f");
3120            let william_uuid = uuid!("db237e8a-0079-4b8c-8a56-593b22aa44d1");
3121            let lucy_uuid = uuid!("7b23c99d-c06b-4a9a-a958-3afa56383e1d");
3122
3123            assert_eq!(be.name2uuid("claire"), Ok(Some(claire_uuid)));
3124            let x = be.uuid2spn(claire_uuid);
3125            trace!(?x);
3126            assert_eq!(
3127                be.uuid2spn(claire_uuid),
3128                Ok(Some(Value::new_iname("claire")))
3129            );
3130            assert_eq!(
3131                be.uuid2rdn(claire_uuid),
3132                Ok(Some("name=claire".to_string()))
3133            );
3134
3135            assert_eq!(be.name2uuid("william"), Ok(None));
3136            assert_eq!(be.uuid2spn(william_uuid), Ok(None));
3137            assert_eq!(be.uuid2rdn(william_uuid), Ok(None));
3138
3139            assert_eq!(be.name2uuid("lucy"), Ok(None));
3140            assert_eq!(be.uuid2spn(lucy_uuid), Ok(None));
3141            assert_eq!(be.uuid2rdn(lucy_uuid), Ok(None));
3142        })
3143    }
3144
3145    #[test]
3146    fn test_be_index_modify_simple() {
3147        run_test!(|be: &mut BackendWriteTransaction| {
3148            assert!(be.reindex(false).is_ok());
3149            // modify with one type, ensuring we clean the indexes behind
3150            // us. For the test to be "accurate" we must add one attr, remove one attr
3151            // and change one attr.
3152            let mut e1: Entry<EntryInit, EntryNew> = Entry::new();
3153            e1.add_ava(Attribute::Name, Value::new_iname("william"));
3154            e1.add_ava(
3155                Attribute::Uuid,
3156                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
3157            );
3158            e1.add_ava(Attribute::TestAttr, Value::from("test"));
3159            let e1 = e1.into_sealed_new();
3160
3161            let rset = be.create(&CID_ZERO, vec![e1]).unwrap();
3162            let rset: Vec<_> = rset.into_iter().map(Arc::new).collect();
3163            // Now, alter the new entry.
3164            let mut ce1 = rset[0].as_ref().clone().into_invalid();
3165            // add something.
3166            ce1.add_ava(Attribute::TestNumber, Value::from("test"));
3167            // remove something.
3168            ce1.purge_ava(Attribute::TestAttr);
3169            // mod something.
3170            ce1.purge_ava(Attribute::Name);
3171            ce1.add_ava(Attribute::Name, Value::new_iname("claire"));
3172
3173            let ce1 = ce1.into_sealed_committed();
3174
3175            be.modify(&CID_ZERO, &rset, &[ce1]).unwrap();
3176
3177            // Now check the idls
3178            idl_state!(
3179                be,
3180                Attribute::Name.as_ref(),
3181                IndexType::Equality,
3182                "claire",
3183                Some(vec![1])
3184            );
3185
3186            idl_state!(
3187                be,
3188                Attribute::Name.as_ref(),
3189                IndexType::Presence,
3190                "_",
3191                Some(vec![1])
3192            );
3193
3194            idl_state!(
3195                be,
3196                Attribute::TestNumber.as_ref(),
3197                IndexType::Equality,
3198                "test",
3199                Some(vec![1])
3200            );
3201
3202            idl_state!(
3203                be,
3204                Attribute::TestAttr,
3205                IndexType::Equality,
3206                "test",
3207                Some(vec![])
3208            );
3209
3210            let william_uuid = uuid!("db237e8a-0079-4b8c-8a56-593b22aa44d1");
3211            assert_eq!(be.name2uuid("william"), Ok(None));
3212            assert_eq!(be.name2uuid("claire"), Ok(Some(william_uuid)));
3213            assert_eq!(
3214                be.uuid2spn(william_uuid),
3215                Ok(Some(Value::new_iname("claire")))
3216            );
3217            assert_eq!(
3218                be.uuid2rdn(william_uuid),
3219                Ok(Some("name=claire".to_string()))
3220            );
3221        })
3222    }
3223
3224    #[test]
3225    fn test_be_index_modify_rename() {
3226        run_test!(|be: &mut BackendWriteTransaction| {
3227            assert!(be.reindex(false).is_ok());
3228            // test when we change name AND uuid
3229            // This will be needing to be correct for conflicts when we add
3230            // replication support!
3231            let mut e1: Entry<EntryInit, EntryNew> = Entry::new();
3232            e1.add_ava(Attribute::Name, Value::new_iname("william"));
3233            e1.add_ava(
3234                Attribute::Uuid,
3235                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
3236            );
3237            let e1 = e1.into_sealed_new();
3238
3239            let rset = be.create(&CID_ZERO, vec![e1]).unwrap();
3240            let rset: Vec<_> = rset.into_iter().map(Arc::new).collect();
3241            // Now, alter the new entry.
3242            let mut ce1 = rset[0].as_ref().clone().into_invalid();
3243            ce1.purge_ava(Attribute::Name);
3244            ce1.purge_ava(Attribute::Uuid);
3245            ce1.add_ava(Attribute::Name, Value::new_iname("claire"));
3246            ce1.add_ava(
3247                Attribute::Uuid,
3248                Value::from("04091a7a-6ce4-42d2-abf5-c2ce244ac9e8"),
3249            );
3250            let ce1 = ce1.into_sealed_committed();
3251
3252            be.modify(&CID_ZERO, &rset, &[ce1]).unwrap();
3253
3254            idl_state!(
3255                be,
3256                Attribute::Name.as_ref(),
3257                IndexType::Equality,
3258                "claire",
3259                Some(vec![1])
3260            );
3261
3262            idl_state!(
3263                be,
3264                Attribute::Uuid.as_ref(),
3265                IndexType::Equality,
3266                "04091a7a-6ce4-42d2-abf5-c2ce244ac9e8",
3267                Some(vec![1])
3268            );
3269
3270            idl_state!(
3271                be,
3272                Attribute::Name.as_ref(),
3273                IndexType::Presence,
3274                "_",
3275                Some(vec![1])
3276            );
3277            idl_state!(
3278                be,
3279                Attribute::Uuid.as_ref(),
3280                IndexType::Presence,
3281                "_",
3282                Some(vec![1])
3283            );
3284
3285            idl_state!(
3286                be,
3287                Attribute::Uuid.as_ref(),
3288                IndexType::Equality,
3289                "db237e8a-0079-4b8c-8a56-593b22aa44d1",
3290                Some(Vec::with_capacity(0))
3291            );
3292            idl_state!(
3293                be,
3294                Attribute::Name.as_ref(),
3295                IndexType::Equality,
3296                "william",
3297                Some(Vec::with_capacity(0))
3298            );
3299
3300            let claire_uuid = uuid!("04091a7a-6ce4-42d2-abf5-c2ce244ac9e8");
3301            let william_uuid = uuid!("db237e8a-0079-4b8c-8a56-593b22aa44d1");
3302            assert_eq!(be.name2uuid("william"), Ok(None));
3303            assert_eq!(be.name2uuid("claire"), Ok(Some(claire_uuid)));
3304            assert_eq!(be.uuid2spn(william_uuid), Ok(None));
3305            assert_eq!(be.uuid2rdn(william_uuid), Ok(None));
3306            assert_eq!(
3307                be.uuid2spn(claire_uuid),
3308                Ok(Some(Value::new_iname("claire")))
3309            );
3310            assert_eq!(
3311                be.uuid2rdn(claire_uuid),
3312                Ok(Some("name=claire".to_string()))
3313            );
3314        })
3315    }
3316
3317    #[test]
3318    fn test_be_index_search_simple() {
3319        run_test!(|be: &mut BackendWriteTransaction| {
3320            assert!(be.reindex(false).is_ok());
3321
3322            // Create a test entry with some indexed / unindexed values.
3323            let mut e1: Entry<EntryInit, EntryNew> = Entry::new();
3324            e1.add_ava(Attribute::Name, Value::new_iname("william"));
3325            e1.add_ava(
3326                Attribute::Uuid,
3327                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
3328            );
3329            e1.add_ava(Attribute::NoIndex, Value::from("william"));
3330            e1.add_ava(Attribute::OtherNoIndex, Value::from("william"));
3331            let e1 = e1.into_sealed_new();
3332
3333            let mut e2: Entry<EntryInit, EntryNew> = Entry::new();
3334            e2.add_ava(Attribute::Name, Value::new_iname("claire"));
3335            e2.add_ava(
3336                Attribute::Uuid,
3337                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d2"),
3338            );
3339            let e2 = e2.into_sealed_new();
3340
3341            let _rset = be.create(&CID_ZERO, vec![e1, e2]).unwrap();
3342            // Test fully unindexed
3343            let f_un =
3344                filter_resolved!(f_eq(Attribute::NoIndex, PartialValue::new_utf8s("william")));
3345
3346            let (r, _plan) = be.filter2idl(f_un.to_inner(), 0).unwrap();
3347            match r {
3348                IdList::AllIds => {}
3349                _ => {
3350                    panic!("");
3351                }
3352            }
3353
3354            // Test that a fully indexed search works
3355            let feq = filter_resolved!(f_eq(Attribute::Name, PartialValue::new_utf8s("william")));
3356
3357            let (r, _plan) = be.filter2idl(feq.to_inner(), 0).unwrap();
3358            match r {
3359                IdList::Indexed(idl) => {
3360                    assert_eq!(idl, IDLBitRange::from_iter(vec![1]));
3361                }
3362                _ => {
3363                    panic!("");
3364                }
3365            }
3366
3367            // Test and/or
3368            //   full index and
3369            let f_in_and = filter_resolved!(f_and!([
3370                f_eq(Attribute::Name, PartialValue::new_utf8s("william")),
3371                f_eq(
3372                    Attribute::Uuid,
3373                    PartialValue::new_utf8s("db237e8a-0079-4b8c-8a56-593b22aa44d1")
3374                )
3375            ]));
3376
3377            let (r, _plan) = be.filter2idl(f_in_and.to_inner(), 0).unwrap();
3378            match r {
3379                IdList::Indexed(idl) => {
3380                    assert_eq!(idl, IDLBitRange::from_iter(vec![1]));
3381                }
3382                _ => {
3383                    panic!("");
3384                }
3385            }
3386
3387            //   partial index and
3388            let f_p1 = filter_resolved!(f_and!([
3389                f_eq(Attribute::Name, PartialValue::new_utf8s("william")),
3390                f_eq(Attribute::NoIndex, PartialValue::new_utf8s("william"))
3391            ]));
3392
3393            let f_p2 = filter_resolved!(f_and!([
3394                f_eq(Attribute::Name, PartialValue::new_utf8s("william")),
3395                f_eq(Attribute::NoIndex, PartialValue::new_utf8s("william"))
3396            ]));
3397
3398            let (r, _plan) = be.filter2idl(f_p1.to_inner(), 0).unwrap();
3399            match r {
3400                IdList::Partial(idl) => {
3401                    assert_eq!(idl, IDLBitRange::from_iter(vec![1]));
3402                }
3403                _ => unreachable!(),
3404            }
3405
3406            let (r, _plan) = be.filter2idl(f_p2.to_inner(), 0).unwrap();
3407            match r {
3408                IdList::Partial(idl) => {
3409                    assert_eq!(idl, IDLBitRange::from_iter(vec![1]));
3410                }
3411                _ => unreachable!(),
3412            }
3413
3414            // Substrings are always partial
3415            let f_p3 = filter_resolved!(f_sub(Attribute::Name, PartialValue::new_utf8s("wil")));
3416
3417            let (r, plan) = be.filter2idl(f_p3.to_inner(), 0).unwrap();
3418            trace!(?r, ?plan);
3419            match r {
3420                IdList::Partial(idl) => {
3421                    assert_eq!(idl, IDLBitRange::from_iter(vec![1]));
3422                }
3423                _ => unreachable!(),
3424            }
3425
3426            //   no index and
3427            let f_no_and = filter_resolved!(f_and!([
3428                f_eq(Attribute::NoIndex, PartialValue::new_utf8s("william")),
3429                f_eq(Attribute::OtherNoIndex, PartialValue::new_utf8s("william"))
3430            ]));
3431
3432            let (r, _plan) = be.filter2idl(f_no_and.to_inner(), 0).unwrap();
3433            match r {
3434                IdList::AllIds => {}
3435                _ => {
3436                    panic!("");
3437                }
3438            }
3439
3440            //   full index or
3441            let f_in_or = filter_resolved!(f_or!([f_eq(
3442                Attribute::Name,
3443                PartialValue::new_utf8s("william")
3444            )]));
3445
3446            let (r, _plan) = be.filter2idl(f_in_or.to_inner(), 0).unwrap();
3447            match r {
3448                IdList::Indexed(idl) => {
3449                    assert_eq!(idl, IDLBitRange::from_iter(vec![1]));
3450                }
3451                _ => {
3452                    panic!("");
3453                }
3454            }
3455            //   partial (aka allids) or
3456            let f_un_or = filter_resolved!(f_or!([f_eq(
3457                Attribute::NoIndex,
3458                PartialValue::new_utf8s("william")
3459            )]));
3460
3461            let (r, _plan) = be.filter2idl(f_un_or.to_inner(), 0).unwrap();
3462            match r {
3463                IdList::AllIds => {}
3464                _ => {
3465                    panic!("");
3466                }
3467            }
3468
3469            // Test root andnot
3470            let f_r_andnot = filter_resolved!(f_andnot(f_eq(
3471                Attribute::Name,
3472                PartialValue::new_utf8s("william")
3473            )));
3474
3475            let (r, _plan) = be.filter2idl(f_r_andnot.to_inner(), 0).unwrap();
3476            match r {
3477                IdList::Indexed(idl) => {
3478                    assert_eq!(idl, IDLBitRange::from_iter(Vec::with_capacity(0)));
3479                }
3480                _ => {
3481                    panic!("");
3482                }
3483            }
3484
3485            // test andnot as only in and
3486            let f_and_andnot = filter_resolved!(f_and!([f_andnot(f_eq(
3487                Attribute::Name,
3488                PartialValue::new_utf8s("william")
3489            ))]));
3490
3491            let (r, _plan) = be.filter2idl(f_and_andnot.to_inner(), 0).unwrap();
3492            match r {
3493                IdList::Indexed(idl) => {
3494                    assert_eq!(idl, IDLBitRange::from_iter(Vec::with_capacity(0)));
3495                }
3496                _ => {
3497                    panic!("");
3498                }
3499            }
3500            // test andnot as only in or
3501            let f_or_andnot = filter_resolved!(f_or!([f_andnot(f_eq(
3502                Attribute::Name,
3503                PartialValue::new_utf8s("william")
3504            ))]));
3505
3506            let (r, _plan) = be.filter2idl(f_or_andnot.to_inner(), 0).unwrap();
3507            match r {
3508                IdList::Indexed(idl) => {
3509                    assert_eq!(idl, IDLBitRange::from_iter(Vec::with_capacity(0)));
3510                }
3511                _ => {
3512                    panic!("");
3513                }
3514            }
3515
3516            // test andnot in and (first) with name
3517            let f_and_andnot = filter_resolved!(f_and!([
3518                f_andnot(f_eq(Attribute::Name, PartialValue::new_utf8s("claire"))),
3519                f_pres(Attribute::Name)
3520            ]));
3521
3522            let (r, _plan) = be.filter2idl(f_and_andnot.to_inner(), 0).unwrap();
3523            match r {
3524                IdList::Indexed(idl) => {
3525                    debug!("{:?}", idl);
3526                    assert_eq!(idl, IDLBitRange::from_iter(vec![1]));
3527                }
3528                _ => {
3529                    panic!("");
3530                }
3531            }
3532            // test andnot in and (last) with name
3533            let f_and_andnot = filter_resolved!(f_and!([
3534                f_pres(Attribute::Name),
3535                f_andnot(f_eq(Attribute::Name, PartialValue::new_utf8s("claire")))
3536            ]));
3537
3538            let (r, _plan) = be.filter2idl(f_and_andnot.to_inner(), 0).unwrap();
3539            match r {
3540                IdList::Indexed(idl) => {
3541                    assert_eq!(idl, IDLBitRange::from_iter(vec![1]));
3542                }
3543                _ => {
3544                    panic!("");
3545                }
3546            }
3547            // test andnot in and (first) with no-index
3548            let f_and_andnot = filter_resolved!(f_and!([
3549                f_andnot(f_eq(Attribute::Name, PartialValue::new_utf8s("claire"))),
3550                f_pres(Attribute::NoIndex)
3551            ]));
3552
3553            let (r, _plan) = be.filter2idl(f_and_andnot.to_inner(), 0).unwrap();
3554            match r {
3555                IdList::AllIds => {}
3556                _ => {
3557                    panic!("");
3558                }
3559            }
3560            // test andnot in and (last) with no-index
3561            let f_and_andnot = filter_resolved!(f_and!([
3562                f_pres(Attribute::NoIndex),
3563                f_andnot(f_eq(Attribute::Name, PartialValue::new_utf8s("claire")))
3564            ]));
3565
3566            let (r, _plan) = be.filter2idl(f_and_andnot.to_inner(), 0).unwrap();
3567            match r {
3568                IdList::AllIds => {}
3569                _ => {
3570                    panic!("");
3571                }
3572            }
3573
3574            //   empty or
3575            let f_e_or = filter_resolved!(f_or!([]));
3576
3577            let (r, _plan) = be.filter2idl(f_e_or.to_inner(), 0).unwrap();
3578            match r {
3579                IdList::Indexed(idl) => {
3580                    assert_eq!(idl, IDLBitRange::from_iter(vec![]));
3581                }
3582                _ => {
3583                    panic!("");
3584                }
3585            }
3586
3587            let f_e_and = filter_resolved!(f_and!([]));
3588
3589            let (r, _plan) = be.filter2idl(f_e_and.to_inner(), 0).unwrap();
3590            match r {
3591                IdList::Indexed(idl) => {
3592                    assert_eq!(idl, IDLBitRange::from_iter(vec![]));
3593                }
3594                _ => {
3595                    panic!("");
3596                }
3597            }
3598        })
3599    }
3600
3601    #[test]
3602    fn test_be_index_search_missing() {
3603        run_test!(|be: &mut BackendWriteTransaction| {
3604            // Test where the index is in schema but not created (purge idxs)
3605            // should fall back to an empty set because we can't satisfy the term
3606            be.danger_purge_idxs().unwrap();
3607            debug!("{:?}", be.missing_idxs().unwrap());
3608            let f_eq = filter_resolved!(f_eq(Attribute::Name, PartialValue::new_utf8s("william")));
3609
3610            let (r, _plan) = be.filter2idl(f_eq.to_inner(), 0).unwrap();
3611            match r {
3612                IdList::AllIds => {}
3613                _ => {
3614                    panic!("");
3615                }
3616            }
3617        })
3618    }
3619
    #[test]
    fn test_be_index_slope_generation() {
        // Verifies index "slope" (selectivity estimate) behaviour: before any
        // reindex the backend serves hardcoded defaults; after a reindex the
        // slopes are computed from the actual index data.
        run_test!(|be: &mut BackendWriteTransaction| {
            // Create some test entry with some indexed / unindexed values.
            // TestAttr is identical on all three entries (worst selectivity),
            // TestNumber has two distinct values, name/uuid are fully unique.
            let mut e1: Entry<EntryInit, EntryNew> = Entry::new();
            e1.add_ava(Attribute::Name, Value::new_iname("william"));
            e1.add_ava(
                Attribute::Uuid,
                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
            );
            e1.add_ava(Attribute::TestAttr, Value::from("dupe"));
            e1.add_ava(Attribute::TestNumber, Value::from("1"));
            let e1 = e1.into_sealed_new();

            let mut e2: Entry<EntryInit, EntryNew> = Entry::new();
            e2.add_ava(Attribute::Name, Value::new_iname("claire"));
            e2.add_ava(
                Attribute::Uuid,
                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d2"),
            );
            e2.add_ava(Attribute::TestAttr, Value::from("dupe"));
            e2.add_ava(Attribute::TestNumber, Value::from("1"));
            let e2 = e2.into_sealed_new();

            let mut e3: Entry<EntryInit, EntryNew> = Entry::new();
            e3.add_ava(Attribute::Name, Value::new_iname("benny"));
            e3.add_ava(
                Attribute::Uuid,
                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d3"),
            );
            e3.add_ava(Attribute::TestAttr, Value::from("dupe"));
            e3.add_ava(Attribute::TestNumber, Value::from("2"));
            let e3 = e3.into_sealed_new();

            let _rset = be.create(&CID_ZERO, vec![e1, e2, e3]).unwrap();

            // If the slopes haven't been generated yet, there are some hardcoded values
            // that we can use instead. They aren't generated until a first re-index.
            assert!(!be.is_idx_slopeyness_generated().unwrap());

            // Hardcoded default for a plain equality index: 45.
            let ta_eq_slope = be
                .get_idx_slope(&IdxKey::new(Attribute::TestAttr, IndexType::Equality))
                .unwrap();
            assert_eq!(ta_eq_slope, 45);

            let tb_eq_slope = be
                .get_idx_slope(&IdxKey::new(Attribute::TestNumber, IndexType::Equality))
                .unwrap();
            assert_eq!(tb_eq_slope, 45);

            // Hardcoded default for name/uuid equality: 1 (assumed unique).
            let name_eq_slope = be
                .get_idx_slope(&IdxKey::new(Attribute::Name, IndexType::Equality))
                .unwrap();
            assert_eq!(name_eq_slope, 1);
            let uuid_eq_slope = be
                .get_idx_slope(&IdxKey::new(Attribute::Uuid, IndexType::Equality))
                .unwrap();
            assert_eq!(uuid_eq_slope, 1);

            // Hardcoded default for presence indexes: 90 (poor selectivity).
            let name_pres_slope = be
                .get_idx_slope(&IdxKey::new(Attribute::Name, IndexType::Presence))
                .unwrap();
            assert_eq!(name_pres_slope, 90);
            let uuid_pres_slope = be
                .get_idx_slope(&IdxKey::new(Attribute::Uuid, IndexType::Presence))
                .unwrap();
            assert_eq!(uuid_pres_slope, 90);
            // Check the slopes are what we expect for hardcoded values.

            // Now check slope generation for the values. Today these are calculated
            // at reindex time, so we now perform the re-index.
            assert!(be.reindex(false).is_ok());
            assert!(be.is_idx_slopeyness_generated().unwrap());

            // TestAttr: one key covering all three entries -> worst slope (200).
            let ta_eq_slope = be
                .get_idx_slope(&IdxKey::new(Attribute::TestAttr, IndexType::Equality))
                .unwrap();
            assert_eq!(ta_eq_slope, 200);

            // TestNumber: two keys over three entries -> intermediate slope.
            let tb_eq_slope = be
                .get_idx_slope(&IdxKey::new(Attribute::TestNumber, IndexType::Equality))
                .unwrap();
            assert_eq!(tb_eq_slope, 133);

            // name/uuid: fully unique keys -> best measured slope (51).
            let name_eq_slope = be
                .get_idx_slope(&IdxKey::new(Attribute::Name, IndexType::Equality))
                .unwrap();
            assert_eq!(name_eq_slope, 51);
            let uuid_eq_slope = be
                .get_idx_slope(&IdxKey::new(Attribute::Uuid, IndexType::Equality))
                .unwrap();
            assert_eq!(uuid_eq_slope, 51);

            // Presence matches everything -> worst slope (200) once measured.
            let name_pres_slope = be
                .get_idx_slope(&IdxKey::new(Attribute::Name, IndexType::Presence))
                .unwrap();
            assert_eq!(name_pres_slope, 200);
            let uuid_pres_slope = be
                .get_idx_slope(&IdxKey::new(Attribute::Uuid, IndexType::Presence))
                .unwrap();
            assert_eq!(uuid_pres_slope, 200);
        })
    }
3723
3724    #[test]
3725    fn test_be_limits_allids() {
3726        run_test!(|be: &mut BackendWriteTransaction| {
3727            let mut lim_allow_allids = Limits::unlimited();
3728            lim_allow_allids.unindexed_allow = true;
3729
3730            let mut lim_deny_allids = Limits::unlimited();
3731            lim_deny_allids.unindexed_allow = false;
3732
3733            let mut e: Entry<EntryInit, EntryNew> = Entry::new();
3734            e.add_ava(Attribute::UserId, Value::from("william"));
3735            e.add_ava(
3736                Attribute::Uuid,
3737                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
3738            );
3739            e.add_ava(Attribute::NonExist, Value::from("x"));
3740            let e = e.into_sealed_new();
3741            let single_result = be.create(&CID_ZERO, vec![e.clone()]);
3742
3743            assert!(single_result.is_ok());
3744            let filt = e
3745                .filter_from_attrs(&[Attribute::NonExist])
3746                .expect("failed to generate filter")
3747                .into_valid_resolved();
3748            // check allow on allids
3749            let res = be.search(&lim_allow_allids, &filt);
3750            assert!(res.is_ok());
3751            let res = be.exists(&lim_allow_allids, &filt);
3752            assert!(res.is_ok());
3753
3754            // check deny on allids
3755            let res = be.search(&lim_deny_allids, &filt);
3756            assert_eq!(res, Err(OperationError::ResourceLimit));
3757            let res = be.exists(&lim_deny_allids, &filt);
3758            assert_eq!(res, Err(OperationError::ResourceLimit));
3759        })
3760    }
3761
3762    #[test]
3763    fn test_be_limits_results_max() {
3764        run_test!(|be: &mut BackendWriteTransaction| {
3765            let mut lim_allow = Limits::unlimited();
3766            lim_allow.search_max_results = usize::MAX;
3767
3768            let mut lim_deny = Limits::unlimited();
3769            lim_deny.search_max_results = 0;
3770
3771            let mut e: Entry<EntryInit, EntryNew> = Entry::new();
3772            e.add_ava(Attribute::UserId, Value::from("william"));
3773            e.add_ava(
3774                Attribute::Uuid,
3775                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
3776            );
3777            e.add_ava(Attribute::NonExist, Value::from("x"));
3778            let e = e.into_sealed_new();
3779            let single_result = be.create(&CID_ZERO, vec![e.clone()]);
3780            assert!(single_result.is_ok());
3781
3782            let filt = e
3783                .filter_from_attrs(&[Attribute::NonExist])
3784                .expect("failed to generate filter")
3785                .into_valid_resolved();
3786
3787            // --> This is the all ids path (unindexed)
3788            // check allow on entry max
3789            let res = be.search(&lim_allow, &filt);
3790            assert!(res.is_ok());
3791            let res = be.exists(&lim_allow, &filt);
3792            assert!(res.is_ok());
3793
3794            // check deny on entry max
3795            let res = be.search(&lim_deny, &filt);
3796            assert_eq!(res, Err(OperationError::ResourceLimit));
3797            // we don't limit on exists because we never load the entries.
3798            let res = be.exists(&lim_deny, &filt);
3799            assert!(res.is_ok());
3800
3801            // --> This will shortcut due to indexing.
3802            assert!(be.reindex(false).is_ok());
3803            let res = be.search(&lim_deny, &filt);
3804            assert_eq!(res, Err(OperationError::ResourceLimit));
3805            // we don't limit on exists because we never load the entries.
3806            let res = be.exists(&lim_deny, &filt);
3807            assert!(res.is_ok());
3808        })
3809    }
3810
3811    #[test]
3812    fn test_be_limits_partial_filter() {
3813        run_test!(|be: &mut BackendWriteTransaction| {
3814            // This relies on how we do partials, so it could be a bit sensitive.
3815            // A partial is generated after an allids + indexed in a single and
3816            // as we require both conditions to exist. Allids comes from unindexed
3817            // terms. we need to ensure we don't hit partial threshold too.
3818            //
3819            // This means we need an and query where the first term is allids
3820            // and the second is indexed, but without the filter shortcutting.
3821            //
3822            // To achieve this we need a monstrously evil query.
3823            //
3824            let mut lim_allow = Limits::unlimited();
3825            lim_allow.search_max_filter_test = usize::MAX;
3826
3827            let mut lim_deny = Limits::unlimited();
3828            lim_deny.search_max_filter_test = 0;
3829
3830            let mut e: Entry<EntryInit, EntryNew> = Entry::new();
3831            e.add_ava(Attribute::Name, Value::new_iname("william"));
3832            e.add_ava(
3833                Attribute::Uuid,
3834                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
3835            );
3836            e.add_ava(Attribute::NonExist, Value::from("x"));
3837            e.add_ava(Attribute::NonExist, Value::from("y"));
3838            let e = e.into_sealed_new();
3839            let single_result = be.create(&CID_ZERO, vec![e]);
3840            assert!(single_result.is_ok());
3841
3842            // Reindex so we have things in place for our query
3843            assert!(be.reindex(false).is_ok());
3844
3845            // 🚨 This is evil!
3846            // The and allows us to hit "allids + indexed -> partial".
3847            // the or terms prevent re-arrangement. They can't be folded or dead
3848            // term elimed either.
3849            //
3850            // This means the f_or nonexist will become allids and the second will be indexed
3851            // due to f_eq userid in both with the result of william.
3852            //
3853            // This creates a partial, and because it's the first iteration in the loop, this
3854            // doesn't encounter partial threshold testing.
3855            let filt = filter_resolved!(f_and!([
3856                f_or!([
3857                    f_eq(Attribute::NonExist, PartialValue::new_utf8s("x")),
3858                    f_eq(Attribute::NonExist, PartialValue::new_utf8s("y"))
3859                ]),
3860                f_or!([
3861                    f_eq(Attribute::Name, PartialValue::new_utf8s("claire")),
3862                    f_eq(Attribute::Name, PartialValue::new_utf8s("william"))
3863                ]),
3864            ]));
3865
3866            let res = be.search(&lim_allow, &filt);
3867            assert!(res.is_ok());
3868            let res = be.exists(&lim_allow, &filt);
3869            assert!(res.is_ok());
3870
3871            // check deny on entry max
3872            let res = be.search(&lim_deny, &filt);
3873            assert_eq!(res, Err(OperationError::ResourceLimit));
3874            // we don't limit on exists because we never load the entries.
3875            let res = be.exists(&lim_deny, &filt);
3876            assert_eq!(res, Err(OperationError::ResourceLimit));
3877        })
3878    }
3879
3880    #[test]
3881    fn test_be_multiple_create() {
3882        sketching::test_init();
3883
3884        // This is a demo idxmeta, purely for testing.
3885        let idxmeta = vec![IdxKey {
3886            attr: Attribute::Uuid,
3887            itype: IndexType::Equality,
3888        }];
3889
3890        let be_a = Backend::new(BackendConfig::new_test("main"), idxmeta.clone(), false)
3891            .expect("Failed to setup backend");
3892
3893        let be_b = Backend::new(BackendConfig::new_test("db_2"), idxmeta, false)
3894            .expect("Failed to setup backend");
3895
3896        let mut be_a_txn = be_a.write().unwrap();
3897        let mut be_b_txn = be_b.write().unwrap();
3898
3899        assert!(be_a_txn.get_db_s_uuid() != be_b_txn.get_db_s_uuid());
3900
3901        // Create into A
3902        let mut e: Entry<EntryInit, EntryNew> = Entry::new();
3903        e.add_ava(Attribute::UserId, Value::from("william"));
3904        e.add_ava(
3905            Attribute::Uuid,
3906            Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
3907        );
3908        let e = e.into_sealed_new();
3909
3910        let single_result = be_a_txn.create(&CID_ZERO, vec![e]);
3911
3912        assert!(single_result.is_ok());
3913
3914        // Assert it's in A but not B.
3915        let filt = filter_resolved!(f_eq(Attribute::UserId, PartialValue::new_utf8s("william")));
3916
3917        let lims = Limits::unlimited();
3918
3919        let r = be_a_txn.search(&lims, &filt);
3920        assert!(r.expect("Search failed!").len() == 1);
3921
3922        let r = be_b_txn.search(&lims, &filt);
3923        assert!(r.expect("Search failed!").is_empty());
3924
3925        // Create into B
3926        let mut e: Entry<EntryInit, EntryNew> = Entry::new();
3927        e.add_ava(Attribute::UserId, Value::from("claire"));
3928        e.add_ava(
3929            Attribute::Uuid,
3930            Value::from("0c680959-0944-47d6-9dea-53304d124266"),
3931        );
3932        let e = e.into_sealed_new();
3933
3934        let single_result = be_b_txn.create(&CID_ZERO, vec![e]);
3935
3936        assert!(single_result.is_ok());
3937
3938        // Assert it's in B but not A
3939        let filt = filter_resolved!(f_eq(Attribute::UserId, PartialValue::new_utf8s("claire")));
3940
3941        let lims = Limits::unlimited();
3942
3943        let r = be_a_txn.search(&lims, &filt);
3944        assert!(r.expect("Search failed!").is_empty());
3945
3946        let r = be_b_txn.search(&lims, &filt);
3947        assert!(r.expect("Search failed!").len() == 1);
3948    }
3949}