// kanidmd_lib/be/mod.rs

//! The backend. This contains the "low level" storage and query code, which is
//! implemented as a JSON-like key-value document database. The backend has no rules
//! about the content of the server; those are all enforced at higher levels. The role
//! of the backend is to persist content safely to disk, load that content, and execute
//! queries utilising indexes in the most effective way possible.

use crate::be::dbentry::{DbBackup, DbEntry};
use crate::be::dbrepl::DbReplMeta;
use crate::entry::Entry;
use crate::filter::{Filter, FilterPlan, FilterResolved, FilterValidResolved};
use crate::prelude::*;
use crate::repl::cid::Cid;
use crate::repl::proto::ReplCidRange;
use crate::repl::ruv::{
    ReplicationUpdateVector, ReplicationUpdateVectorReadTransaction,
    ReplicationUpdateVectorTransaction, ReplicationUpdateVectorWriteTransaction,
};
use crate::utils::trigraph_iter;
use crate::value::{IndexType, Value};
use concread::cowcell::*;
use hashbrown::{HashMap as Map, HashSet};
use idlset::v2::IDLBitRange;
use idlset::AndNot;
use kanidm_proto::internal::{ConsistencyError, OperationError};
use std::collections::BTreeMap;
use std::fs;
use std::ops::DerefMut;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use tracing::{trace, trace_span};
use uuid::Uuid;

pub(crate) mod dbentry;
pub(crate) mod dbrepl;
pub(crate) mod dbvalue;

mod idl_arc_sqlite;
mod idl_sqlite;
pub(crate) mod idxkey;
pub(crate) mod keystorage;

pub(crate) use self::idxkey::{IdxKey, IdxKeyRef, IdxKeyToRef, IdxSlope};
use crate::be::idl_arc_sqlite::{
    IdlArcSqlite, IdlArcSqliteReadTransaction, IdlArcSqliteTransaction,
    IdlArcSqliteWriteTransaction,
};
use kanidm_proto::internal::FsType;

// Currently disabled due to improvements in idlset for intersection handling.
const FILTER_SEARCH_TEST_THRESHOLD: usize = 0;
const FILTER_EXISTS_TEST_THRESHOLD: usize = 0;
const FILTER_SUBSTR_TEST_THRESHOLD: usize = 4;
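// Note on the values above: a zero threshold is how the early "fall back to the
// in-memory filter test" shortcut is currently disabled for search and exists,
// while the substring threshold of 4 stops trigraph IDL intersection in
// filter2idl_sub once the working candidate set is already below that size.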

#[derive(Debug, Clone)]
/// Limits on the resources a single event can consume. These are defined per-event,
/// as they are derived from the userAuthToken of that individual session.
pub(crate) struct Limits {
    pub unindexed_allow: bool,
    pub search_max_results: usize,
    pub search_max_filter_test: usize,
    pub filter_max_elements: usize,
}

impl Default for Limits {
    fn default() -> Self {
        Limits {
            unindexed_allow: false,
            search_max_results: DEFAULT_LIMIT_SEARCH_MAX_RESULTS as usize,
            search_max_filter_test: DEFAULT_LIMIT_SEARCH_MAX_FILTER_TEST as usize,
            filter_max_elements: DEFAULT_LIMIT_FILTER_MAX_ELEMENTS as usize,
        }
    }
}

impl Limits {
    pub fn unlimited() -> Self {
        Limits {
            unindexed_allow: true,
            search_max_results: usize::MAX >> 1,
            search_max_filter_test: usize::MAX >> 1,
            filter_max_elements: usize::MAX,
        }
    }

    pub fn api_token() -> Self {
        Limits {
            unindexed_allow: false,
            search_max_results: DEFAULT_LIMIT_API_SEARCH_MAX_RESULTS as usize,
            search_max_filter_test: DEFAULT_LIMIT_API_SEARCH_MAX_FILTER_TEST as usize,
            filter_max_elements: DEFAULT_LIMIT_FILTER_MAX_ELEMENTS as usize,
        }
    }
}
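
// A usage sketch (hypothetical caller, not part of this module): ordinary
// authenticated requests would carry something like `Limits::default()`, while
// trusted internal work such as verification or reindexing can opt into
// `Limits::unlimited()` so that unindexed scans are permitted:
//
//     let limits = Limits::unlimited();
//     assert!(limits.unindexed_allow);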

/// The result of a key value request containing the list of entry IDs that
/// match the filter/query condition.
#[derive(Debug, Clone)]
pub enum IdList {
    /// The value is not indexed, so it must be assumed that all entries may match.
    AllIds,
    /// The index is "fuzzy", like a Bloom filter (perhaps "superset" is a better description) -
    /// it contains all elements that do match, but may have extra elements that don't.
    /// This requires the caller to perform a filter test to assert that all
    /// returned entries match all assertions within the filter.
    Partial(IDLBitRange),
    /// The set was indexed and is below the filter test threshold, meaning it is now
    /// faster to test with the filter than to continue to access indexes at this point.
    /// Like a partial set, this is a superset of the entries that match the query.
    PartialThreshold(IDLBitRange),
    /// The value is indexed and accurately represents the set of entries that precisely match.
    Indexed(IDLBitRange),
}
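
// How these variants are consumed (an illustrative sketch of the pattern used by
// `search` and `exists` below, not additional API): only `Indexed` allows the
// in-memory filter test to be skipped.
//
//     match idl {
//         IdList::Indexed(_) => { /* candidate set is exact, no filter test needed */ }
//         IdList::Partial(_) | IdList::PartialThreshold(_) | IdList::AllIds => {
//             /* superset of candidates - entry_match_no_index must still be applied */
//         }
//     }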

#[derive(Debug)]
pub struct IdRawEntry {
    id: u64,
    data: Vec<u8>,
}

#[derive(Debug, Clone)]
pub struct IdxMeta {
    pub idxkeys: Map<IdxKey, IdxSlope>,
}

impl IdxMeta {
    pub fn new(idxkeys: Map<IdxKey, IdxSlope>) -> Self {
        IdxMeta { idxkeys }
    }
}

#[derive(Clone)]
pub struct BackendConfig {
    path: PathBuf,
    pool_size: u32,
    db_name: &'static str,
    fstype: FsType,
    // Cache sizes?
    arcsize: Option<usize>,
}

impl BackendConfig {
    pub fn new(
        path: Option<&Path>,
        pool_size: u32,
        fstype: FsType,
        arcsize: Option<usize>,
    ) -> Self {
        BackendConfig {
            pool_size,
            // If path is None, the resulting "" implies an SQLite in-memory (RAM only) database.
            path: path.unwrap_or_else(|| Path::new("")).to_path_buf(),
            db_name: "main",
            fstype,
            arcsize,
        }
    }

    pub(crate) fn new_test(db_name: &'static str) -> Self {
        BackendConfig {
            pool_size: 1,
            path: PathBuf::from(""),
            db_name,
            fstype: FsType::Generic,
            arcsize: Some(2048),
        }
    }
}
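
// A construction sketch (the path and pool size below are hypothetical examples):
//
//     let cfg = BackendConfig::new(
//         Some(Path::new("/var/lib/kanidm/kanidm.db")),
//         8,
//         FsType::Generic,
//         None,
//     );
//
// Tests instead use `BackendConfig::new_test("db_name")`, which always selects an
// in-memory database with a small ARC cache size.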

#[derive(Clone)]
pub struct Backend {
    /// This is the actual data storage layer.
    idlayer: Arc<IdlArcSqlite>,
    /// This is a copy-on-write cache of the index metadata that has been
    /// extracted from the attribute set, in the correct format for the backend
    /// to consume. We use it to extract indexes from entries during write paths
    /// and to allow the front end to know what indexes exist during a read.
    idxmeta: Arc<CowCell<IdxMeta>>,
    /// The current state of the replication update vector. This is effectively a
    /// time series index of the full list of all changelog entries and which entries
    /// are part of each change.
    ruv: Arc<ReplicationUpdateVector>,
    cfg: BackendConfig,
}

pub struct BackendReadTransaction<'a> {
    idlayer: IdlArcSqliteReadTransaction<'a>,
    idxmeta: CowCellReadTxn<IdxMeta>,
    ruv: ReplicationUpdateVectorReadTransaction<'a>,
}

unsafe impl Sync for BackendReadTransaction<'_> {}

unsafe impl Send for BackendReadTransaction<'_> {}

pub struct BackendWriteTransaction<'a> {
    idlayer: IdlArcSqliteWriteTransaction<'a>,
    idxmeta_wr: CowCellWriteTxn<'a, IdxMeta>,
    ruv: ReplicationUpdateVectorWriteTransaction<'a>,
}

impl IdRawEntry {
    fn into_dbentry(self) -> Result<(u64, DbEntry), OperationError> {
        serde_json::from_slice(self.data.as_slice())
            .map_err(|e| {
                admin_error!(?e, "Serde JSON Error");
                OperationError::SerdeJsonError
            })
            .map(|dbe| (self.id, dbe))
    }

    fn into_entry(self) -> Result<EntrySealedCommitted, OperationError> {
        let db_e = serde_json::from_slice(self.data.as_slice()).map_err(|e| {
            admin_error!(?e, id = %self.id, "Serde JSON Error");
            let raw_str = String::from_utf8_lossy(self.data.as_slice());
            debug!(raw = %raw_str);
            OperationError::SerdeJsonError
        })?;
        // let id = u64::try_from(self.id).map_err(|_| OperationError::InvalidEntryId)?;
        Entry::from_dbentry(db_e, self.id).ok_or(OperationError::CorruptedEntry(self.id))
    }
}

pub trait BackendTransaction {
    type IdlLayerType: IdlArcSqliteTransaction;
    fn get_idlayer(&mut self) -> &mut Self::IdlLayerType;

    type RuvType: ReplicationUpdateVectorTransaction;
    fn get_ruv(&mut self) -> &mut Self::RuvType;

    fn get_idxmeta_ref(&self) -> &IdxMeta;

    /// Recursively apply a filter, transforming it into IdLists on the way. This also builds a
    /// query execution plan, so that how an operation proceeded can be examined afterwards.
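    ///
    /// As an illustrative example (not an exhaustive description of the cases below), an `And`
    /// of an indexed `Eq` term and an unindexed `Pres` term resolves the `Eq` side from the
    /// equality index, then degrades the combined result to a partial set, so the caller must
    /// still run the in-memory filter test over the returned candidates.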
    #[allow(clippy::cognitive_complexity)]
    fn filter2idl(
        &mut self,
        filt: &FilterResolved,
        thres: usize,
    ) -> Result<(IdList, FilterPlan), OperationError> {
        Ok(match filt {
            FilterResolved::Eq(attr, value, idx) => {
                if idx.is_some() {
                    // Get the idx_key
                    let idx_key = value.get_idx_eq_key();
                    // Get the idl for this
                    match self
                        .get_idlayer()
                        .get_idl(attr, IndexType::Equality, &idx_key)?
                    {
                        Some(idl) => (
                            IdList::Indexed(idl),
                            FilterPlan::EqIndexed(attr.clone(), idx_key),
                        ),
                        None => (IdList::AllIds, FilterPlan::EqCorrupt(attr.clone())),
                    }
                } else {
                    // Schema believes this is not indexed
                    (IdList::AllIds, FilterPlan::EqUnindexed(attr.clone()))
                }
            }
            FilterResolved::Stw(attr, subvalue, idx)
            | FilterResolved::Enw(attr, subvalue, idx)
            | FilterResolved::Cnt(attr, subvalue, idx) => {
                // Get the idx_key. Not all types support this, so may return "none".
                trace!(?idx, ?subvalue, ?attr);
                if let (true, Some(idx_key)) = (idx.is_some(), subvalue.get_idx_sub_key()) {
                    self.filter2idl_sub(attr, idx_key)?
                } else {
                    // Schema believes this is not indexed
                    (IdList::AllIds, FilterPlan::SubUnindexed(attr.clone()))
                }
            }
            FilterResolved::Pres(attr, idx) => {
                if idx.is_some() {
                    // Get the idl for this
                    match self.get_idlayer().get_idl(attr, IndexType::Presence, "_")? {
                        Some(idl) => (IdList::Indexed(idl), FilterPlan::PresIndexed(attr.clone())),
                        None => (IdList::AllIds, FilterPlan::PresCorrupt(attr.clone())),
                    }
                } else {
                    // Schema believes this is not indexed
                    (IdList::AllIds, FilterPlan::PresUnindexed(attr.clone()))
                }
            }
            FilterResolved::LessThan(attr, _subvalue, _idx) => {
                // We have no process for indexing this right now.
                (IdList::AllIds, FilterPlan::LessThanUnindexed(attr.clone()))
            }
            FilterResolved::Or(l, _) => {
                // Importantly, if this has no inner elements, it returns
                // an empty list.
                let mut plan = Vec::with_capacity(0);
                let mut result = IDLBitRange::new();
                let mut partial = false;
                let mut threshold = false;
                // For each filter in l
                for f in l.iter() {
                    // get their idls
                    match self.filter2idl(f, thres)? {
                        (IdList::Indexed(idl), fp) => {
                            plan.push(fp);
                            // now union them (if possible)
                            result = result | idl;
                        }
                        (IdList::Partial(idl), fp) => {
                            plan.push(fp);
                            // now union them (if possible)
                            result = result | idl;
                            partial = true;
                        }
                        (IdList::PartialThreshold(idl), fp) => {
                            plan.push(fp);
                            // now union them (if possible)
                            result = result | idl;
                            partial = true;
                            threshold = true;
                        }
                        (IdList::AllIds, fp) => {
                            plan.push(fp);
                            // If we find anything unindexed, the whole term is unindexed.
                            filter_trace!("Term {:?} is AllIds, shortcut return", f);
                            let setplan = FilterPlan::OrUnindexed(plan);
                            return Ok((IdList::AllIds, setplan));
                        }
                    }
                } // end or.iter()
                  // If we got here, every term must have been indexed or partially indexed.
                if partial {
                    if threshold {
                        let setplan = FilterPlan::OrPartialThreshold(plan);
                        (IdList::PartialThreshold(result), setplan)
                    } else {
                        let setplan = FilterPlan::OrPartial(plan);
                        (IdList::Partial(result), setplan)
                    }
                } else {
                    let setplan = FilterPlan::OrIndexed(plan);
                    (IdList::Indexed(result), setplan)
                }
            }
            FilterResolved::And(l, _) => {
                // This algorithm is a little annoying. I couldn't get it to work with iter and
                // folds due to the logic needed ...

                // First, set up the two filter lists. We always apply AndNot after the
                // positive AND terms.
                let (f_andnot, f_rem): (Vec<_>, Vec<_>) = l.iter().partition(|f| f.is_andnot());

                // We make this an iter, so everything comes off in order. If we used pop, it would
                // pull from the tail, which is the WORST item to start with!
                let mut f_rem_iter = f_rem.iter();

                // Set up the initial result.
                let (mut cand_idl, fp) = match f_rem_iter.next() {
                    Some(f) => self.filter2idl(f, thres)?,
                    None => {
                        filter_warn!(
                            "And filter was empty, or contains only AndNot, can not evaluate."
                        );
                        return Ok((IdList::Indexed(IDLBitRange::new()), FilterPlan::Invalid));
                    }
                };

                // Set up the counter of terms we have left to evaluate.
                // This is used so that we shortcut return ONLY when we really do have
                // more terms remaining.
                let mut f_rem_count = f_rem.len() + f_andnot.len() - 1;

                // Set up the query plan tracker
                let mut plan = vec![fp];

                match &cand_idl {
                    IdList::Indexed(idl) | IdList::Partial(idl) | IdList::PartialThreshold(idl) => {
                        // When below thres, we have to return partials to trigger the entry_no_match_filter check.
                        // But we only do this when there are actually multiple elements in the AND,
                        // because an AND with one element is now FULLY resolved.
                        if idl.below_threshold(thres) && f_rem_count > 0 {
                            let setplan = FilterPlan::AndPartialThreshold(plan);
                            return Ok((IdList::PartialThreshold(idl.clone()), setplan));
                        } else if idl.is_empty() {
                            // Regardless of the input state, if it's empty, this can never
                            // be satisfied, so return we are indexed and complete.
                            let setplan = FilterPlan::AndEmptyCand(plan);
                            return Ok((IdList::Indexed(IDLBitRange::new()), setplan));
                        }
                    }
                    IdList::AllIds => {}
                }

                // Now, for all remaining,
                for f in f_rem_iter {
                    f_rem_count -= 1;
                    let (inter, fp) = self.filter2idl(f, thres)?;
                    plan.push(fp);
                    cand_idl = match (cand_idl, inter) {
                        (IdList::Indexed(ia), IdList::Indexed(ib)) => {
                            let r = ia & ib;
                            if r.below_threshold(thres) && f_rem_count > 0 {
                                // When below thres, we have to return partials to trigger the entry_no_match_filter check.
                                let setplan = FilterPlan::AndPartialThreshold(plan);
                                return Ok((IdList::PartialThreshold(r), setplan));
                            } else if r.is_empty() {
                                // Regardless of the input state, if it's empty, this can never
                                // be satisfied, so return we are indexed and complete.
                                let setplan = FilterPlan::AndEmptyCand(plan);
                                return Ok((IdList::Indexed(IDLBitRange::new()), setplan));
                            } else {
                                IdList::Indexed(r)
                            }
                        }
                        (IdList::Indexed(ia), IdList::Partial(ib))
                        | (IdList::Partial(ia), IdList::Indexed(ib))
                        | (IdList::Partial(ia), IdList::Partial(ib)) => {
                            let r = ia & ib;
                            if r.below_threshold(thres) && f_rem_count > 0 {
                                // When below thres, we have to return partials to trigger the entry_no_match_filter check.
                                let setplan = FilterPlan::AndPartialThreshold(plan);
                                return Ok((IdList::PartialThreshold(r), setplan));
                            } else {
                                IdList::Partial(r)
                            }
                        }
                        (IdList::Indexed(ia), IdList::PartialThreshold(ib))
                        | (IdList::PartialThreshold(ia), IdList::Indexed(ib))
                        | (IdList::PartialThreshold(ia), IdList::PartialThreshold(ib))
                        | (IdList::PartialThreshold(ia), IdList::Partial(ib))
                        | (IdList::Partial(ia), IdList::PartialThreshold(ib)) => {
                            let r = ia & ib;
                            if r.below_threshold(thres) && f_rem_count > 0 {
                                // When below thres, we have to return partials to trigger the entry_no_match_filter check.
                                let setplan = FilterPlan::AndPartialThreshold(plan);
                                return Ok((IdList::PartialThreshold(r), setplan));
                            } else {
                                IdList::PartialThreshold(r)
                            }
                        }
                        (IdList::Indexed(i), IdList::AllIds)
                        | (IdList::AllIds, IdList::Indexed(i))
                        | (IdList::Partial(i), IdList::AllIds)
                        | (IdList::AllIds, IdList::Partial(i)) => IdList::Partial(i),
                        (IdList::PartialThreshold(i), IdList::AllIds)
                        | (IdList::AllIds, IdList::PartialThreshold(i)) => {
                            IdList::PartialThreshold(i)
                        }
                        (IdList::AllIds, IdList::AllIds) => IdList::AllIds,
                    };
                }

                // debug!("partial cand set ==> {:?}", cand_idl);

                for f in f_andnot.iter() {
                    f_rem_count -= 1;
                    let FilterResolved::AndNot(f_in, _) = f else {
                        filter_error!("Invalid server state, a cand filter leaked to andnot set!");
                        return Err(OperationError::InvalidState);
                    };
                    let (inter, fp) = self.filter2idl(f_in, thres)?;
                    // It's an AndNot, so we need to wrap the plan accordingly.
                    plan.push(FilterPlan::AndNot(Box::new(fp)));
                    cand_idl = match (cand_idl, inter) {
                        (IdList::Indexed(ia), IdList::Indexed(ib)) => {
                            let r = ia.andnot(ib);
                            /*
                            // Don't trigger threshold on and nots if fully indexed.
                            if r.below_threshold(thres) {
                                // When below thres, we have to return partials to trigger the entry_no_match_filter check.
                                return Ok(IdList::PartialThreshold(r));
                            } else {
                                IdList::Indexed(r)
                            }
                            */
                            IdList::Indexed(r)
                        }
                        (IdList::Indexed(ia), IdList::Partial(ib))
                        | (IdList::Partial(ia), IdList::Indexed(ib))
                        | (IdList::Partial(ia), IdList::Partial(ib)) => {
                            let r = ia.andnot(ib);
                            // DO trigger threshold on partials, because we have to apply the filter
                            // test anyway, so we may as well shortcut at this point.
                            if r.below_threshold(thres) && f_rem_count > 0 {
                                let setplan = FilterPlan::AndPartialThreshold(plan);
                                return Ok((IdList::PartialThreshold(r), setplan));
                            } else {
                                IdList::Partial(r)
                            }
                        }
                        (IdList::Indexed(ia), IdList::PartialThreshold(ib))
                        | (IdList::PartialThreshold(ia), IdList::Indexed(ib))
                        | (IdList::PartialThreshold(ia), IdList::PartialThreshold(ib))
                        | (IdList::PartialThreshold(ia), IdList::Partial(ib))
                        | (IdList::Partial(ia), IdList::PartialThreshold(ib)) => {
                            let r = ia.andnot(ib);
                            // DO trigger threshold on partials, because we have to apply the filter
                            // test anyway, so we may as well shortcut at this point.
                            if r.below_threshold(thres) && f_rem_count > 0 {
                                let setplan = FilterPlan::AndPartialThreshold(plan);
                                return Ok((IdList::PartialThreshold(r), setplan));
                            } else {
                                IdList::PartialThreshold(r)
                            }
                        }

                        (IdList::Indexed(_), IdList::AllIds)
                        | (IdList::AllIds, IdList::Indexed(_))
                        | (IdList::Partial(_), IdList::AllIds)
                        | (IdList::AllIds, IdList::Partial(_))
                        | (IdList::PartialThreshold(_), IdList::AllIds)
                        | (IdList::AllIds, IdList::PartialThreshold(_)) => {
                            // We could actually generate allids here
                            // and then try to reduce the and-not set, but
                            // for now we just return all ids.
                            IdList::AllIds
                        }
                        (IdList::AllIds, IdList::AllIds) => IdList::AllIds,
                    };
                }

                // What state is the final cand idl in?
                let setplan = match cand_idl {
                    IdList::Indexed(_) => FilterPlan::AndIndexed(plan),
                    IdList::Partial(_) | IdList::PartialThreshold(_) => {
                        FilterPlan::AndPartial(plan)
                    }
                    IdList::AllIds => FilterPlan::AndUnindexed(plan),
                };

                // Finally, return the result.
                // debug!("final cand set ==> {:?}", cand_idl);
                (cand_idl, setplan)
            } // end and
            FilterResolved::Inclusion(l, _) => {
                // For inclusion to be valid, every term must have *at least* one element present.
                // This really relies on indexing, and so it's internal only - generally only
                // for fully indexed existence queries, such as from refint.

                // This has a lot in common with an And and Or but not really quite either.
                let mut plan = Vec::with_capacity(0);
                let mut result = IDLBitRange::new();
                // For each filter in l
                for f in l.iter() {
                    // get their idls
                    match self.filter2idl(f, thres)? {
                        (IdList::Indexed(idl), fp) => {
                            plan.push(fp);
                            if idl.is_empty() {
                                // It's empty, so something is missing. Bail fast.
                                filter_trace!("Inclusion is unable to proceed - an empty (missing) item was found!");
                                let setplan = FilterPlan::InclusionIndexed(plan);
                                return Ok((IdList::Indexed(IDLBitRange::new()), setplan));
                            } else {
                                result = result | idl;
                            }
                        }
                        (_, fp) => {
                            plan.push(fp);
                            let setplan = FilterPlan::InclusionInvalid(plan);
                            error!(
                                ?setplan,
                                "Inclusion is unable to proceed - all terms must be fully indexed!"
                            );
                            return Ok((IdList::Partial(IDLBitRange::new()), setplan));
                        }
                    }
                } // end or.iter()
                  // If we got here, every term must have been indexed
                let setplan = FilterPlan::InclusionIndexed(plan);
                (IdList::Indexed(result), setplan)
            }
            // So why does this return empty? Normally we actually process an AndNot in the context
            // of an "AND" query, but if it's used anywhere else, i.e. the root filter, then there is
            // no other set to exclude - therefore it's the empty set. Additionally, even in an OR query
            // the AndNot will be skipped as an empty set for the same reason.
            FilterResolved::AndNot(_f, _) => {
                // get the idl for f
                // now do andnot?
                filter_error!("Requested a top level or isolated AndNot, returning empty");
                (IdList::Indexed(IDLBitRange::new()), FilterPlan::Invalid)
            }
            FilterResolved::Invalid(_) => {
                // Indexed since it is always false and we don't want to influence filter testing
                (IdList::Indexed(IDLBitRange::new()), FilterPlan::Invalid)
            }
        })
    }

    fn filter2idl_sub(
        &mut self,
        attr: &Attribute,
        sub_idx_key: String,
    ) -> Result<(IdList, FilterPlan), OperationError> {
        // Now given that idx_key, we will iterate over the possible graphemes.
        let mut grapheme_iter = trigraph_iter(&sub_idx_key);

        // Substrings are always partial because we have to split the keys up
        // and we don't pay attention to starts/ends with conditions. We need
        // the caller to check those conditions manually at run time. This lets
        // the index focus on trigraph indexes only rather than needing to
        // worry about those other bits. In a way substring indexes are "fuzzy".
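        //
        // As a hypothetical illustration: a substring assertion over "alice" is broken into
        // overlapping trigraphs (something like "ali", "lic", "ice"), the IDL of the first
        // trigraph seeds the candidate set, and the remaining trigraph IDLs are intersected
        // into it below until the set shrinks under FILTER_SUBSTR_TEST_THRESHOLD.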

        let mut idl = match grapheme_iter.next() {
            Some(idx_key) => {
                match self
                    .get_idlayer()
                    .get_idl(attr, IndexType::SubString, idx_key)?
                {
                    Some(idl) => idl,
                    None => return Ok((IdList::AllIds, FilterPlan::SubCorrupt(attr.clone()))),
                }
            }
            None => {
                // If there are no graphemes, this means the attempt is for an empty string, so
                // we return an empty result set.
                return Ok((IdList::Indexed(IDLBitRange::new()), FilterPlan::Invalid));
            }
        };

        if idl.len() > FILTER_SUBSTR_TEST_THRESHOLD {
            for idx_key in grapheme_iter {
                // Get the idl for this
                match self
                    .get_idlayer()
                    .get_idl(attr, IndexType::SubString, idx_key)?
                {
                    Some(r_idl) => {
                        // Do an *and* operation between what we found and our working idl.
                        idl = r_idl & idl;
                    }
                    None => {
                        // If something didn't match, then we simply bail out after zeroing the current IDL.
                        idl = IDLBitRange::new();
                    }
                };

                if idl.len() < FILTER_SUBSTR_TEST_THRESHOLD {
                    break;
                }
            }
        } else {
            drop(grapheme_iter);
        }

        // We exhausted the grapheme iter, exit with what we found.
        Ok((
            IdList::Partial(idl),
            FilterPlan::SubIndexed(attr.clone(), sub_idx_key),
        ))
    }

    #[instrument(level = "debug", name = "be::search", skip_all)]
    fn search(
        &mut self,
        erl: &Limits,
        filt: &Filter<FilterValidResolved>,
    ) -> Result<Vec<Arc<EntrySealedCommitted>>, OperationError> {
        // Unlike DS, even if we don't get the index back, we can just pass
        // to the in-memory filter test and be done.

        trace!(filter_optimised = ?filt);

        let (idl, fplan) = trace_span!("be::search -> filter2idl")
            .in_scope(|| self.filter2idl(filt.to_inner(), FILTER_SEARCH_TEST_THRESHOLD))?;

        debug!(search_filter_executed_plan = %fplan);

        match &idl {
            IdList::AllIds => {
                if !erl.unindexed_allow {
                    admin_error!(
                        "filter (search) is fully unindexed, and not allowed by resource limits"
                    );
                    return Err(OperationError::ResourceLimit);
                }
            }
            IdList::Partial(idl_br) => {
                // if idl_br.len() > erl.search_max_filter_test {
                if !idl_br.below_threshold(erl.search_max_filter_test) {
                    admin_error!("filter (search) is partial indexed and greater than search_max_filter_test allowed by resource limits");
                    return Err(OperationError::ResourceLimit);
                }
            }
            IdList::PartialThreshold(_) => {
                // Since we opted for this, this is not the fault
                // of the user and we should not penalise them by limiting on partial.
            }
            IdList::Indexed(idl_br) => {
                // We know this is resolved here, so we can attempt the limit
                // check. This has to fold the whole index, but you know, class=pres is
                // indexed ...
                // if idl_br.len() > erl.search_max_results {
                if !idl_br.below_threshold(erl.search_max_results) {
                    admin_error!("filter (search) is indexed and greater than search_max_results allowed by resource limits");
                    return Err(OperationError::ResourceLimit);
                }
            }
        };

        let entries = self.get_idlayer().get_identry(&idl).map_err(|e| {
            admin_error!(?e, "get_identry failed");
            e
        })?;

        let mut entries_filtered = match idl {
            IdList::AllIds => trace_span!("be::search<entry::ftest::allids>").in_scope(|| {
                entries
                    .into_iter()
                    .filter(|e| e.entry_match_no_index(filt))
                    .collect()
            }),
            IdList::Partial(_) => trace_span!("be::search<entry::ftest::partial>").in_scope(|| {
                entries
                    .into_iter()
                    .filter(|e| e.entry_match_no_index(filt))
                    .collect()
            }),
            IdList::PartialThreshold(_) => trace_span!("be::search<entry::ftest::thresh>")
                .in_scope(|| {
                    entries
                        .into_iter()
                        .filter(|e| e.entry_match_no_index(filt))
                        .collect()
                }),
            // Since the index fully resolved, we can shortcut the filter test step here!
            IdList::Indexed(_) => {
                filter_trace!("filter (search) was fully indexed 👏");
                entries
            }
        };

        // If the idl was not indexed, apply the resource limit now. Avoid the needless match since the
        // if statement is quick.
        if entries_filtered.len() > erl.search_max_results {
            admin_error!("filter (search) is resolved and greater than search_max_results allowed by resource limits");
            return Err(OperationError::ResourceLimit);
        }

        // Trim any excess capacity if needed
        entries_filtered.shrink_to_fit();

        Ok(entries_filtered)
    }

    /// Given a filter, check whether any matching entry exists.
    /// Basically, this is a specialised case of search, where we don't need to
    /// load any candidates if they match. This is heavily used in uuid
    /// refint and attr uniqueness.
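    ///
    /// For example, a referential integrity check can ask whether anything matches
    /// `Eq(member, <uuid>)` without paying the cost of loading the matching entries.
    /// (Illustrative only - the exact filters callers build live outside this module.)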
    #[instrument(level = "debug", name = "be::exists", skip_all)]
    fn exists(
        &mut self,
        erl: &Limits,
        filt: &Filter<FilterValidResolved>,
    ) -> Result<bool, OperationError> {
        trace!(filter_optimised = ?filt);

        // Using the indexes, resolve the IdList here, or AllIds.
        // Also get if the filter was 100% resolved or not.
        let (idl, fplan) = trace_span!("be::exists -> filter2idl")
            .in_scope(|| self.filter2idl(filt.to_inner(), FILTER_EXISTS_TEST_THRESHOLD))?;

        debug!(exist_filter_executed_plan = %fplan);

        // Apply limits to the IdList.
        match &idl {
            IdList::AllIds => {
                if !erl.unindexed_allow {
                    admin_error!(
                        "filter (exists) is fully unindexed, and not allowed by resource limits"
                    );
                    return Err(OperationError::ResourceLimit);
                }
            }
            IdList::Partial(idl_br) => {
                if !idl_br.below_threshold(erl.search_max_filter_test) {
                    admin_error!("filter (exists) is partial indexed and greater than search_max_filter_test allowed by resource limits");
                    return Err(OperationError::ResourceLimit);
                }
            }
            IdList::PartialThreshold(_) => {
                // Since we opted for this, this is not the fault
                // of the user and we should not penalise them.
            }
            IdList::Indexed(_) => {}
        }

        // Now, check the idl -- if it's fully resolved, we can skip this because the query
        // was fully indexed.
        match &idl {
            IdList::Indexed(idl) => Ok(!idl.is_empty()),
            _ => {
                let entries = self.get_idlayer().get_identry(&idl).map_err(|e| {
                    admin_error!(?e, "get_identry failed");
                    e
                })?;

                // if not 100% resolved query, apply the filter test.
                let entries_filtered: Vec<_> =
                    trace_span!("be::exists<entry::ftest>").in_scope(|| {
                        entries
                            .into_iter()
                            .filter(|e| e.entry_match_no_index(filt))
                            .collect()
                    });

                Ok(!entries_filtered.is_empty())
            }
        } // end match idl
    }

    fn retrieve_range(
        &mut self,
        ranges: &BTreeMap<Uuid, ReplCidRange>,
    ) -> Result<Vec<Arc<EntrySealedCommitted>>, OperationError> {
        // First, pass the ranges to the ruv to resolve to an absolute set of
        // entry id's.

        let idl = self.get_ruv().range_to_idl(ranges);
        // Because of how this works, I think that it's not possible for the idl
        // to have any missing ids.
        //
        // If it was possible, we could just & with allids to remove the extraneous
        // values.

        if idl.is_empty() {
            // return no entries.
            return Ok(Vec::with_capacity(0));
        }

        // Make it an id list for the backend.
        let id_list = IdList::Indexed(idl);

        self.get_idlayer().get_identry(&id_list).map_err(|e| {
            admin_error!(?e, "get_identry failed");
            e
        })
    }

    fn verify(&mut self) -> Vec<Result<(), ConsistencyError>> {
        self.get_idlayer().verify()
    }

    fn verify_entry_index(&mut self, e: &EntrySealedCommitted) -> Result<(), ConsistencyError> {
        // First, check our references in name2uuid, uuid2spn and uuid2rdn
        if e.mask_recycled_ts().is_some() {
            let e_uuid = e.get_uuid();
            // We only check these on live entries.
            let (n2u_add, n2u_rem) = Entry::idx_name2uuid_diff(None, Some(e));

            let (Some(n2u_set), None) = (n2u_add, n2u_rem) else {
                admin_error!("Invalid idx_name2uuid_diff state");
                return Err(ConsistencyError::BackendIndexSync);
            };

            // If the set.len > 1, check each item.
            n2u_set
                .iter()
                .try_for_each(|name| match self.get_idlayer().name2uuid(name) {
                    Ok(Some(idx_uuid)) => {
                        if idx_uuid == e_uuid {
                            Ok(())
                        } else {
                            admin_error!("Invalid name2uuid state -> incorrect uuid association");
                            Err(ConsistencyError::BackendIndexSync)
                        }
                    }
                    r => {
                        admin_error!(state = ?r, "Invalid name2uuid state");
                        Err(ConsistencyError::BackendIndexSync)
                    }
                })?;

            let spn = e.get_uuid2spn();
            match self.get_idlayer().uuid2spn(e_uuid) {
                Ok(Some(idx_spn)) => {
                    if spn != idx_spn {
                        admin_error!("Invalid uuid2spn state -> incorrect idx spn value");
                        return Err(ConsistencyError::BackendIndexSync);
                    }
                }
                r => {
                    admin_error!(state = ?r, ?e_uuid, "Invalid uuid2spn state");
                    trace!(entry = ?e);
                    return Err(ConsistencyError::BackendIndexSync);
                }
            };

            let rdn = e.get_uuid2rdn();
            match self.get_idlayer().uuid2rdn(e_uuid) {
                Ok(Some(idx_rdn)) => {
                    if rdn != idx_rdn {
                        admin_error!("Invalid uuid2rdn state -> incorrect idx rdn value");
                        return Err(ConsistencyError::BackendIndexSync);
                    }
                }
                r => {
                    admin_error!(state = ?r, "Invalid uuid2rdn state");
                    return Err(ConsistencyError::BackendIndexSync);
                }
            };
        }

        // Check the other entry:attr indexes are valid
        //
        // This is actually pretty hard to check, because we can check a value *should*
        // exist, but not that a value should NOT be present in the index. Thought needed ...

        // Got here? Ok!
        Ok(())
    }

    fn verify_indexes(&mut self) -> Vec<Result<(), ConsistencyError>> {
        let idl = IdList::AllIds;
        let entries = match self.get_idlayer().get_identry(&idl) {
            Ok(s) => s,
            Err(e) => {
                admin_error!(?e, "get_identry failure");
                return vec![Err(ConsistencyError::Unknown)];
            }
        };

        let r = entries.iter().try_for_each(|e| self.verify_entry_index(e));

        if r.is_err() {
            vec![r]
        } else {
            Vec::with_capacity(0)
        }
    }

    fn verify_ruv(&mut self, results: &mut Vec<Result<(), ConsistencyError>>) {
        // The way we verify this is by building a whole second RUV and then comparing it.
        let idl = IdList::AllIds;
        let entries = match self.get_idlayer().get_identry(&idl) {
            Ok(ent) => ent,
            Err(e) => {
                results.push(Err(ConsistencyError::Unknown));
                admin_error!(?e, "get_identry failed");
                return;
            }
        };

        self.get_ruv().verify(&entries, results);
    }

    fn backup(&mut self, dst_path: &Path) -> Result<(), OperationError> {
        let repl_meta = self.get_ruv().to_db_backup_ruv();

        // Load all entries into RAM. This may need to change later
        // if the size of the database compared to RAM is an issue.
        let idl = IdList::AllIds;
        let idlayer = self.get_idlayer();
        let raw_entries: Vec<IdRawEntry> = idlayer.get_identry_raw(&idl)?;

        let entries: Result<Vec<DbEntry>, _> = raw_entries
            .iter()
            .map(|id_ent| {
                serde_json::from_slice(id_ent.data.as_slice())
                    .map_err(|_| OperationError::SerdeJsonError) // log?
            })
            .collect();

        let entries = entries?;

        let db_s_uuid = idlayer
            .get_db_s_uuid()
            .and_then(|u| u.ok_or(OperationError::InvalidDbState))?;
        let db_d_uuid = idlayer
            .get_db_d_uuid()
            .and_then(|u| u.ok_or(OperationError::InvalidDbState))?;
        let db_ts_max = idlayer
            .get_db_ts_max()
            .and_then(|u| u.ok_or(OperationError::InvalidDbState))?;

        let keyhandles = idlayer.get_key_handles()?;

        let bak = DbBackup::V5 {
            // Remember, env! is evaluated at compile time.
            version: env!("KANIDM_PKG_SERIES").to_string(),
            db_s_uuid,
            db_d_uuid,
            db_ts_max,
            keyhandles,
            repl_meta,
            entries,
        };

        let serialized_entries_str = serde_json::to_string(&bak).map_err(|e| {
            admin_error!(?e, "serde error");
            OperationError::SerdeJsonError
        })?;

        fs::write(dst_path, serialized_entries_str)
            .map(|_| ())
            .map_err(|e| {
                admin_error!(?e, "fs::write error");
                OperationError::FsError
            })
    }

    fn name2uuid(&mut self, name: &str) -> Result<Option<Uuid>, OperationError> {
        self.get_idlayer().name2uuid(name)
    }

    fn externalid2uuid(&mut self, name: &str) -> Result<Option<Uuid>, OperationError> {
        self.get_idlayer().externalid2uuid(name)
    }

    fn uuid2spn(&mut self, uuid: Uuid) -> Result<Option<Value>, OperationError> {
        self.get_idlayer().uuid2spn(uuid)
    }

    fn uuid2rdn(&mut self, uuid: Uuid) -> Result<Option<String>, OperationError> {
        self.get_idlayer().uuid2rdn(uuid)
    }
}

impl<'a> BackendTransaction for BackendReadTransaction<'a> {
    type IdlLayerType = IdlArcSqliteReadTransaction<'a>;
    type RuvType = ReplicationUpdateVectorReadTransaction<'a>;

    fn get_idlayer(&mut self) -> &mut IdlArcSqliteReadTransaction<'a> {
        &mut self.idlayer
    }

    fn get_ruv(&mut self) -> &mut ReplicationUpdateVectorReadTransaction<'a> {
        &mut self.ruv
    }

    fn get_idxmeta_ref(&self) -> &IdxMeta {
        &self.idxmeta
    }
}

impl BackendReadTransaction<'_> {
    pub fn list_indexes(&mut self) -> Result<Vec<String>, OperationError> {
        self.get_idlayer().list_idxs()
    }

    pub fn list_id2entry(&mut self) -> Result<Vec<(u64, String)>, OperationError> {
        self.get_idlayer().list_id2entry()
    }

    pub fn list_index_content(
        &mut self,
        index_name: &str,
    ) -> Result<Vec<(String, IDLBitRange)>, OperationError> {
        self.get_idlayer().list_index_content(index_name)
    }

    pub fn get_id2entry(&mut self, id: u64) -> Result<(u64, String), OperationError> {
        self.get_idlayer().get_id2entry(id)
    }

    pub fn list_quarantined(&mut self) -> Result<Vec<(u64, String)>, OperationError> {
        self.get_idlayer().list_quarantined()
    }
}

impl<'a> BackendTransaction for BackendWriteTransaction<'a> {
    type IdlLayerType = IdlArcSqliteWriteTransaction<'a>;
    type RuvType = ReplicationUpdateVectorWriteTransaction<'a>;

    fn get_idlayer(&mut self) -> &mut IdlArcSqliteWriteTransaction<'a> {
        &mut self.idlayer
    }

    fn get_ruv(&mut self) -> &mut ReplicationUpdateVectorWriteTransaction<'a> {
        &mut self.ruv
    }

    fn get_idxmeta_ref(&self) -> &IdxMeta {
        &self.idxmeta_wr
    }
}

impl<'a> BackendWriteTransaction<'a> {
    pub(crate) fn get_ruv_write(&mut self) -> &mut ReplicationUpdateVectorWriteTransaction<'a> {
        &mut self.ruv
    }

    #[instrument(level = "debug", name = "be::create", skip_all)]
    pub fn create(
        &mut self,
        cid: &Cid,
        entries: Vec<EntrySealedNew>,
    ) -> Result<Vec<EntrySealedCommitted>, OperationError> {
        if entries.is_empty() {
            admin_error!("No entries provided to BE to create, invalid server call!");
            return Err(OperationError::EmptyRequest);
        }

        // Check that every entry has a change associated
        // that matches the cid?
        entries.iter().try_for_each(|e| {
            if e.get_changestate().contains_tail_cid(cid) {
                Ok(())
            } else {
                admin_error!(
                    "Entry changelog does not contain a change related to this transaction"
                );
                Err(OperationError::ReplEntryNotChanged)
            }
        })?;

        // Now, assign id's to all the new entries.

        let mut id_max = self.idlayer.get_id2entry_max_id()?;
        let c_entries: Vec<_> = entries
            .into_iter()
            .map(|e| {
                id_max += 1;
                e.into_sealed_committed_id(id_max)
            })
            .collect();

        // All good, let's update the RUV.
        // This auto compresses.
        let ruv_idl = IDLBitRange::from_iter(c_entries.iter().map(|e| e.get_id()));

        // We don't need to skip this like in modify, since creates always go to the RUV.
        self.get_ruv().insert_change(cid, ruv_idl)?;

        self.idlayer.write_identries(c_entries.iter())?;

        self.idlayer.set_id2entry_max_id(id_max);

        // Now update the indexes as required.
        for e in c_entries.iter() {
            self.entry_index(None, Some(e))?
        }

        Ok(c_entries)
    }

    #[instrument(level = "debug", name = "be::create", skip_all)]
    /// This is similar to create, but used in the replication path as it records all
    /// the CIDs in the entry to the RUV, but without applying the current CID as
    /// a new value in the RUV. We *do not* want to apply the current CID in the RUV
    /// related to this entry as that could cause an infinite replication loop!
    pub fn refresh(
        &mut self,
        entries: Vec<EntrySealedNew>,
    ) -> Result<Vec<EntrySealedCommitted>, OperationError> {
        if entries.is_empty() {
            admin_error!("No entries provided to BE to create, invalid server call!");
            return Err(OperationError::EmptyRequest);
        }

        // Assign id's to all the new entries.
        let mut id_max = self.idlayer.get_id2entry_max_id()?;
        let c_entries: Vec<_> = entries
            .into_iter()
            .map(|e| {
                id_max += 1;
                e.into_sealed_committed_id(id_max)
            })
            .collect();

        self.idlayer.write_identries(c_entries.iter())?;

        self.idlayer.set_id2entry_max_id(id_max);

        // Update the RUV with all the changestates of the affected entries.
        for e in c_entries.iter() {
            self.get_ruv().update_entry_changestate(e)?;
        }

        // Now update the indexes as required.
        for e in c_entries.iter() {
            self.entry_index(None, Some(e))?
        }

        Ok(c_entries)
    }

    #[instrument(level = "debug", name = "be::modify", skip_all)]
    pub fn modify(
        &mut self,
        cid: &Cid,
        pre_entries: &[Arc<EntrySealedCommitted>],
        post_entries: &[EntrySealedCommitted],
    ) -> Result<(), OperationError> {
        if post_entries.is_empty() || pre_entries.is_empty() {
            admin_error!("No entries provided to BE to modify, invalid server call!");
            return Err(OperationError::EmptyRequest);
        }

        assert_eq!(post_entries.len(), pre_entries.len());

        let post_entries_iter = post_entries.iter().filter(|e| {
            trace!(?cid);
            trace!(changestate = ?e.get_changestate());
            // If true - this means that at least one attribute that *is* replicated was changed
            // on this entry, so we need to update and add this to the RUV!
            //
            // If false - this means that the entry in question was updated but the changes are all
            // non-replicated, so we DO NOT update the RUV here!
            e.get_changestate().contains_tail_cid(cid)
        });

        // All good, let's update the RUV.
        // This auto compresses.
        let ruv_idl = IDLBitRange::from_iter(post_entries_iter.map(|e| e.get_id()));

        if !ruv_idl.is_empty() {
            self.get_ruv().insert_change(cid, ruv_idl)?;
        }

        // Now, given the list of id's, update them
        self.get_idlayer().write_identries(post_entries.iter())?;

        // Finally, we now reindex all the changed entries. We do this by iterating and zipping
        // over the set, because we know the list is in the same order.
        pre_entries
            .iter()
            .zip(post_entries.iter())
            .try_for_each(|(pre, post)| self.entry_index(Some(pre.as_ref()), Some(post)))
    }

    #[instrument(level = "debug", name = "be::incremental_prepare", skip_all)]
    pub fn incremental_prepare<'x>(
        &mut self,
        entry_meta: &[EntryIncrementalNew],
    ) -> Result<Vec<Arc<EntrySealedCommitted>>, OperationError> {
        let mut ret_entries = Vec::with_capacity(entry_meta.len());
        let id_max_pre = self.idlayer.get_id2entry_max_id()?;
        let mut id_max = id_max_pre;

        for ctx_ent in entry_meta.iter() {
            let ctx_ent_uuid = ctx_ent.get_uuid();
            let idx_key = ctx_ent_uuid.as_hyphenated().to_string();

            let idl =
                self.get_idlayer()
                    .get_idl(&Attribute::Uuid, IndexType::Equality, &idx_key)?;

            let entry = match idl {
                Some(idl) if idl.is_empty() => {
                    // Create the stub entry, we just need it to have an id number
                    // allocated.
                    id_max += 1;

                    let stub_entry = Arc::new(EntrySealedCommitted::stub_sealed_committed_id(
                        id_max, ctx_ent,
                    ));
                    // Now, the stub entry needs to be indexed. If not, uuid2spn
                    // isn't created, so subsequent index diffs don't work correctly.
                    self.entry_index(None, Some(stub_entry.as_ref()))?;

                    // Okay, entry ready to go.
                    stub_entry
                }
                Some(idl) if idl.len() == 1 => {
                    // Get the entry from this idl.
                    let mut entries = self
                        .get_idlayer()
                        .get_identry(&IdList::Indexed(idl))
                        .map_err(|e| {
                            admin_error!(?e, "get_identry failed");
                            e
                        })?;

                    if let Some(entry) = entries.pop() {
                        // Return it.
                        entry
                    } else {
                        error!("Invalid entry state, index was unable to locate entry");
                        return Err(OperationError::InvalidDbState);
                    }
                    // Done, entry is ready to go
                }
                Some(idl) => {
                    // BUG - duplicate uuid!
                    error!(uuid = ?ctx_ent_uuid, "Invalid IDL state, uuid index must have only a single or no values. Contains {:?}", idl);
                    return Err(OperationError::InvalidDbState);
                }
                None => {
                    // BUG - corrupt index.
                    error!(uuid = ?ctx_ent_uuid, "Invalid IDL state, uuid index must be present");
                    return Err(OperationError::InvalidDbState);
                }
            };

            ret_entries.push(entry);
        }

        if id_max != id_max_pre {
            self.idlayer.set_id2entry_max_id(id_max);
        }

        Ok(ret_entries)
    }
1292
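    /// Apply an incremental replication update. Newly created entries are assigned
    /// ids and written out, updated entries are rewritten in place, and in both
    /// cases the RUV and the indexes are updated from the entry change states.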
1293    #[instrument(level = "debug", name = "be::incremental_apply", skip_all)]
1294    pub fn incremental_apply(
1295        &mut self,
1296        update_entries: &[(EntrySealedCommitted, Arc<EntrySealedCommitted>)],
1297        create_entries: Vec<EntrySealedNew>,
1298    ) -> Result<(), OperationError> {
1299        // For the values in create_cands, create these with similar code to the refresh
1300        // path.
1301        if !create_entries.is_empty() {
1302            // Assign id's to all the new entries.
1303            let mut id_max = self.idlayer.get_id2entry_max_id()?;
1304            let c_entries: Vec<_> = create_entries
1305                .into_iter()
1306                .map(|e| {
1307                    id_max += 1;
1308                    e.into_sealed_committed_id(id_max)
1309                })
1310                .collect();
1311
1312            self.idlayer.write_identries(c_entries.iter())?;
1313
1314            self.idlayer.set_id2entry_max_id(id_max);
1315
1316            // Update the RUV with all the changestates of the affected entries.
1317            for e in c_entries.iter() {
1318                self.get_ruv().update_entry_changestate(e)?;
1319            }
1320
1321            // Now update the indexes as required.
1322            for e in c_entries.iter() {
1323                self.entry_index(None, Some(e))?
1324            }
1325        }
1326
1327        // Otherwise this is a cid-less copy of modify.
1328        if !update_entries.is_empty() {
1329            self.get_idlayer()
1330                .write_identries(update_entries.iter().map(|(up, _)| up))?;
1331
1332            for (e, _) in update_entries.iter() {
1333                self.get_ruv().update_entry_changestate(e)?;
1334            }
1335
1336            for (post, pre) in update_entries.iter() {
1337                self.entry_index(Some(pre.as_ref()), Some(post))?
1338            }
1339        }
1340
1341        Ok(())
1342    }
1343
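    /// Reap (permanently remove) tombstoned entries that are now safe to delete
    /// relative to `trim_cid`, trimming the RUV to match. Returns the number of
    /// entries that were removed.
    ///
    /// A minimal usage sketch, assuming a write transaction `be_txn` and CIDs
    /// constructed elsewhere:
    ///
    /// ```ignore
    /// let reaped = be_txn.reap_tombstones(&current_cid, &trim_cid)?;
    /// trace!("reaped {} tombstones", reaped);
    /// ```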
1344    #[instrument(level = "debug", name = "be::reap_tombstones", skip_all)]
1345    pub fn reap_tombstones(&mut self, cid: &Cid, trim_cid: &Cid) -> Result<usize, OperationError> {
1346        debug_assert!(cid > trim_cid);
1347        // Mark a new maximum for the RUV by inserting an empty change. This
1348        // is important to keep the changestate always advancing.
1349        self.get_ruv().insert_change(cid, IDLBitRange::default())?;
1350
1351        // We plan to clear the RUV up to this cid. So we need to build an IDL
1352        // of all the entries we need to examine.
1353        let idl = self.get_ruv().trim_up_to(trim_cid).map_err(|e| {
1354            admin_error!(
1355                ?e,
1356                "During tombstone cleanup, failed to trim RUV to {:?}",
1357                trim_cid
1358            );
1359            e
1360        })?;
1361
1362        let entries = self
1363            .get_idlayer()
1364            .get_identry(&IdList::Indexed(idl))
1365            .map_err(|e| {
1366                admin_error!(?e, "get_identry failed");
1367                e
1368            })?;
1369
1370        if entries.is_empty() {
1371            admin_debug!("No entries affected - reap_tombstones operation success");
1372            return Ok(0);
1373        }
1374
1375        // Now that we have a list of entries we need to partition them into
1376        // two sets: the entries that are tombstoned and ready to be reaped, and
1377        // the entries that need to have their change logs trimmed.
1378        //
1379        // Remember, these tombstones can be reaped because they were tombstoned at time
1380        // point 'cid', and since we are now "past" that minimum cid, then other servers
1381        // will also be trimming these out.
1382        //
1383        // Note unlike a changelog impl, we don't need to trim changestates here. We
1384        // only need the RUV trimmed so that we know if other servers are lagging behind!
1385
1386        // What entries are tombstones and ready to be deleted?
1387
1388        let (tombstones, leftover): (Vec<_>, Vec<_>) = entries
1389            .into_iter()
1390            .partition(|e| e.get_changestate().can_delete(trim_cid));
1391
1392        let ruv_idls = self.get_ruv().ruv_idls();
1393
1394        // Assert that anything leftover is still either *alive* OR a tombstone
1395        // that has entries in the RUV!
1396
1397        if !leftover
1398            .iter()
1399            .all(|e| e.get_changestate().is_live() || ruv_idls.contains(e.get_id()))
1400        {
1401            admin_error!("Left over entries may be orphaned due to missing RUV entries");
1402            return Err(OperationError::ReplInvalidRUVState);
1403        }
1404
1405        // Now set up to reap the tombstones. Remember, after the cleanup above the RUV has
1406        // now been trimmed to a point where we can purge them!
1407
1408        // Collect the ids of the tombstone entries to be reaped.
1409        let id_list: IDLBitRange = tombstones.iter().map(|e| e.get_id()).collect();
1410
1411        // Ensure nothing here exists in the RUV index, else it means
1412        // we didn't trim properly, or some other state violation has occurred.
1413        if !((&ruv_idls & &id_list).is_empty()) {
1414            admin_error!("RUV still contains entries that are going to be removed.");
1415            return Err(OperationError::ReplInvalidRUVState);
1416        }
1417
1418        // Now, given the list of ids, reap them.
1419        let sz = id_list.len();
1420        self.get_idlayer().delete_identry(id_list.into_iter())?;
1421
1422        // Finally, purge the indexes from the entries we removed. These still have
1423        // indexes due to class=tombstone.
1424        tombstones
1425            .iter()
1426            .try_for_each(|e| self.entry_index(Some(e), None))?;
1427
1428        Ok(sz)
1429    }
1430
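    /// Replace this transaction's index metadata with `idxkeys`, attaching a slope
    /// (selectivity estimate) to each key from the analysed data when available,
    /// or from the hardcoded defaults otherwise.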
1431    #[instrument(level = "debug", name = "be::update_idxmeta", skip_all)]
1432    pub fn update_idxmeta(&mut self, idxkeys: Vec<IdxKey>) -> Result<(), OperationError> {
1433        if self.is_idx_slopeyness_generated()? {
1434            trace!("Indexing slopes available");
1435        } else {
1436            warn!("No indexing slopes available. You should consider reindexing to generate these");
1437        };
1438
1439        // Setup idxkeys here. By default we set these all to "max slope" aka
1440        // all indexes are "equal" but also worst case unless analysed. If they
1441        // have been analysed, we can set the slope factor in here.
1442        let mut idxkeys = idxkeys
1443            .into_iter()
1444            .map(|k| self.get_idx_slope(&k).map(|slope| (k, slope)))
1445            .collect::<Result<Map<_, _>, _>>()?;
1446
1447        std::mem::swap(&mut self.idxmeta_wr.deref_mut().idxkeys, &mut idxkeys);
1448        Ok(())
1449    }
1450
1451    // Should take a mut index set, and then we write the whole thing back
1452    // in a single stripe.
1453    //
1454    // So we need a cache, which we load indexes into as we do ops, then we
1455    // modify them.
1456    //
1457    // At the end, we flush those changes out in a single run.
1458    // For create this is probably a
1459    // TODO: Can this be improved?
1460    #[allow(clippy::cognitive_complexity)]
1461    fn entry_index(
1462        &mut self,
1463        pre: Option<&EntrySealedCommitted>,
1464        post: Option<&EntrySealedCommitted>,
1465    ) -> Result<(), OperationError> {
1466        let (e_uuid, e_id, uuid_same) = match (pre, post) {
1467            (None, None) => {
1468                admin_error!("Invalid call to entry_index - no entries provided");
1469                return Err(OperationError::InvalidState);
1470            }
1471            (Some(pre), None) => {
1472                trace!("Attempting to remove entry indexes");
1473                (pre.get_uuid(), pre.get_id(), true)
1474            }
1475            (None, Some(post)) => {
1476                trace!("Attempting to create entry indexes");
1477                (post.get_uuid(), post.get_id(), true)
1478            }
1479            (Some(pre), Some(post)) => {
1480                trace!("Attempting to modify entry indexes");
1481                assert_eq!(pre.get_id(), post.get_id());
1482                (
1483                    post.get_uuid(),
1484                    post.get_id(),
1485                    pre.get_uuid() == post.get_uuid(),
1486                )
1487            }
1488        };
1489
1490        // Update the names/uuid maps. These have to mask out entries
1491        // that are recycled or tombstones, so those entries are treated as "deleted"
1492        // and trigger the correct actions.
1493
1494        let mask_pre = pre.and_then(|e| e.mask_recycled_ts());
1495        let mask_pre = if !uuid_same {
1496            // Okay, so if the uuids are different this is probably from
1497            // a replication conflict.  We can't just use the normal none/some
1498            // check from the Entry::idx functions as they only yield partial
1499            // changes. Because the uuid is changing, we have to treat pre
1500        // as an entry that is being deleted, regardless of what state post is in.
1501            let uuid = mask_pre.map(|e| e.get_uuid()).ok_or_else(|| {
1502                admin_error!("Invalid entry state - possible memory corruption");
1503                OperationError::InvalidState
1504            })?;
1505
1506            let (n2u_add, n2u_rem) = Entry::idx_name2uuid_diff(mask_pre, None);
1507            // There will never be content to add.
1508            assert!(n2u_add.is_none());
1509
1510            let (eid2u_add, eid2u_rem) = Entry::idx_externalid2uuid_diff(mask_pre, None);
1511            // There will never be content to add.
1512            assert!(eid2u_add.is_none());
1513
1514            let u2s_act = Entry::idx_uuid2spn_diff(mask_pre, None);
1515            let u2r_act = Entry::idx_uuid2rdn_diff(mask_pre, None);
1516
1517            trace!(?n2u_rem, ?eid2u_rem, ?u2s_act, ?u2r_act,);
1518
1519            // Write the changes out to the backend
1520            if let Some(rem) = n2u_rem {
1521                self.idlayer.write_name2uuid_rem(rem)?
1522            }
1523
1524            if let Some(rem) = eid2u_rem {
1525                self.idlayer.write_externalid2uuid_rem(rem)?
1526            }
1527
1528            match u2s_act {
1529                None => {}
1530                Some(Ok(k)) => self.idlayer.write_uuid2spn(uuid, Some(k))?,
1531                Some(Err(_)) => self.idlayer.write_uuid2spn(uuid, None)?,
1532            }
1533
1534            match u2r_act {
1535                None => {}
1536                Some(Ok(k)) => self.idlayer.write_uuid2rdn(uuid, Some(k))?,
1537                Some(Err(_)) => self.idlayer.write_uuid2rdn(uuid, None)?,
1538            }
1539            // Return none, mask_pre is now completed.
1540            None
1541        } else {
1542            // Return the state.
1543            mask_pre
1544        };
1545
1546        let mask_post = post.and_then(|e| e.mask_recycled_ts());
1547        let (n2u_add, n2u_rem) = Entry::idx_name2uuid_diff(mask_pre, mask_post);
1548        let (eid2u_add, eid2u_rem) = Entry::idx_externalid2uuid_diff(mask_pre, mask_post);
1549
1550        let u2s_act = Entry::idx_uuid2spn_diff(mask_pre, mask_post);
1551        let u2r_act = Entry::idx_uuid2rdn_diff(mask_pre, mask_post);
1552
1553        trace!(
1554            ?n2u_add,
1555            ?n2u_rem,
1556            ?eid2u_add,
1557            ?eid2u_rem,
1558            ?u2s_act,
1559            ?u2r_act
1560        );
1561
1562        // Write the changes out to the backend
1563        if let Some(add) = n2u_add {
1564            self.idlayer.write_name2uuid_add(e_uuid, add)?
1565        }
1566        if let Some(rem) = n2u_rem {
1567            self.idlayer.write_name2uuid_rem(rem)?
1568        }
1569
1570        if let Some(add) = eid2u_add {
1571            self.idlayer.write_externalid2uuid_add(e_uuid, add)?
1572        }
1573        if let Some(rem) = eid2u_rem {
1574            self.idlayer.write_externalid2uuid_rem(rem)?
1575        }
1576
1577        match u2s_act {
1578            None => {}
1579            Some(Ok(k)) => self.idlayer.write_uuid2spn(e_uuid, Some(k))?,
1580            Some(Err(_)) => self.idlayer.write_uuid2spn(e_uuid, None)?,
1581        }
1582
1583        match u2r_act {
1584            None => {}
1585            Some(Ok(k)) => self.idlayer.write_uuid2rdn(e_uuid, Some(k))?,
1586            Some(Err(_)) => self.idlayer.write_uuid2rdn(e_uuid, None)?,
1587        }
1588
1589        // Extremely Cursed - Okay, we know that self.idxmeta will NOT be changed
1590        // in this function, but we need to borrow self as mut for the caches in
1591        // get_idl to work. As a result, this causes a double borrow. To work around
1592        // this we discard the lifetime on idxmeta, because we know that it will
1593        // remain constant for the life of the operation.
1594
1595        let idxmeta = unsafe { &(*(&self.idxmeta_wr.idxkeys as *const _)) };
1596
1597        let idx_diff = Entry::idx_diff(idxmeta, pre, post);
1598
1599        idx_diff.into_iter()
1600            .try_for_each(|act| {
1601                match act {
1602                    Ok((attr, itype, idx_key)) => {
1603                        trace!("Adding {:?} idx -> {:?}: {:?}", itype, attr, idx_key);
1604                        match self.idlayer.get_idl(attr, itype, &idx_key)? {
1605                            Some(mut idl) => {
1606                                idl.insert_id(e_id);
1607                                if cfg!(debug_assertions)
1608                                    && *attr == Attribute::Uuid && itype == IndexType::Equality {
1609                                        // This means a duplicate UUID has appeared in the index.
1610                                        if idl.len() > 1 {
1611                                            trace!(duplicate_idl = ?idl, ?idx_key);
1612                                        }
1613                                        debug_assert!(idl.len() <= 1);
1614                                }
1615                                self.idlayer.write_idl(attr, itype, &idx_key, &idl)
1616                            }
1617                            None => {
1618                                warn!(
1619                                    "WARNING: index {:?} {:?} was not found. YOU MUST REINDEX YOUR DATABASE",
1620                                    attr, itype
1621                                );
1622                                Ok(())
1623                            }
1624                        }
1625                    }
1626                    Err((attr, itype, idx_key)) => {
1627                        trace!("Removing {:?} idx -> {:?}: {:?}", itype, attr, idx_key);
1628                        match self.idlayer.get_idl(attr, itype, &idx_key)? {
1629                            Some(mut idl) => {
1630                                idl.remove_id(e_id);
1631                                if cfg!(debug_assertions) && *attr == Attribute::Uuid && itype == IndexType::Equality {
1632                                        // This means a duplicate UUID has appeared in the index.
1633                                        if idl.len() > 1 {
1634                                            trace!(duplicate_idl = ?idl, ?idx_key);
1635                                        }
1636                                        debug_assert!(idl.len() <= 1);
1637                                }
1638                                self.idlayer.write_idl(attr, itype, &idx_key, &idl)
1639                            }
1640                            None => {
1641                                warn!(
1642                                    "WARNING: index {:?} {:?} was not found. YOU MUST REINDEX YOUR DATABASE",
1643                                    attr, itype
1644                                );
1645                                Ok(())
1646                            }
1647                        }
1648                    }
1649                }
1650            })
1651        // End try_for_each
1652    }
1653
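    /// Compare the index metadata against the index tables that actually exist in
    /// the idlayer, returning the (attribute, index type) pairs that are missing.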
1654    #[allow(dead_code)]
1655    fn missing_idxs(&mut self) -> Result<Vec<(Attribute, IndexType)>, OperationError> {
1656        let idx_table_list = self.get_idlayer().list_idxs()?;
1657
1658        // Turn the vec to a real set
1659        let idx_table_set: HashSet<_> = idx_table_list.into_iter().collect();
1660
1661        let missing: Vec<_> = self
1662            .idxmeta_wr
1663            .idxkeys
1664            .keys()
1665            .filter_map(|ikey| {
1666                // what would the table name be?
1667                let tname = format!("idx_{}_{}", ikey.itype.as_idx_str(), ikey.attr.as_str());
1668                trace!("Checking for {}", tname);
1669
1670                if idx_table_set.contains(&tname) {
1671                    None
1672                } else {
1673                    Some((ikey.attr.clone(), ikey.itype))
1674                }
1675            })
1676            .collect();
1677        Ok(missing)
1678    }
1679
1680    fn create_idxs(&mut self) -> Result<(), OperationError> {
1681        // Create name2uuid and uuid2name
1682        trace!("Creating index -> name2uuid");
1683        self.idlayer.create_name2uuid()?;
1684
1685        trace!("Creating index -> externalid2uuid");
1686        self.idlayer.create_externalid2uuid()?;
1687
1688        trace!("Creating index -> uuid2spn");
1689        self.idlayer.create_uuid2spn()?;
1690
1691        trace!("Creating index -> uuid2rdn");
1692        self.idlayer.create_uuid2rdn()?;
1693
1694        self.idxmeta_wr
1695            .idxkeys
1696            .keys()
1697            .try_for_each(|ikey| self.idlayer.create_idx(&ikey.attr, ikey.itype))
1698    }
1699
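    /// Reindex if the stored index version is older than `v`, then record `v` as
    /// the current index version so the work is not repeated on later startups.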
1700    pub fn upgrade_reindex(&mut self, v: i64) -> Result<(), OperationError> {
1701        let dbv = self.get_db_index_version()?;
1702        admin_debug!(?dbv, ?v, "upgrade_reindex");
1703        if dbv < v {
1704            self.reindex(false)?;
1705            self.set_db_index_version(v)
1706        } else {
1707            Ok(())
1708        }
1709    }
1710
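    /// Destroy and rebuild every index from the entries in id2entry. When run
    /// with `immediate`, a simple progress indicator is written to stderr, since
    /// iterating over all entries is the slowest phase of the reindex.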
1711    #[instrument(level = "info", skip_all)]
1712    pub fn reindex(&mut self, immediate: bool) -> Result<(), OperationError> {
1713        let notice_immediate = immediate || (cfg!(not(test)) && cfg!(not(debug_assertions)));
1714
1715        info!(
1716            immediate = notice_immediate,
1717            "System reindex: started - this may take a long time!"
1718        );
1719
1720        // Purge the idxs
1721        self.idlayer.danger_purge_idxs()?;
1722
1723        // Using the index metadata on the txn, create all our idx tables
1724        self.create_idxs()?;
1725
1726        // Now, we need to iterate over everything in id2entry and index them
1727        // Future idea: Do this in batches of X amount to limit memory
1728        // consumption.
1729        let idl = IdList::AllIds;
1730        let entries = self.idlayer.get_identry(&idl).inspect_err(|err| {
1731            error!(?err, "get_identry failure");
1732        })?;
1733
1734        let mut count = 0;
1735
1736        // This is the longest phase of reindexing, so we have a "progress" display here.
1737        entries
1738            .iter()
1739            .try_for_each(|e| {
1740                if immediate {
1741                    count += 1;
1742                    if count % 2500 == 0 {
1743                        eprint!("{}", count);
1744                    } else if count % 250 == 0 {
1745                        eprint!(".");
1746                    }
1747                }
1748
1749                self.entry_index(None, Some(e))
1750            })
1751            .inspect_err(|err| {
1752                error!(?err, "reindex failed");
1753            })?;
1754
1755        if immediate {
1756            eprintln!(" done ✅");
1757        }
1758
1759        info!(immediate, "Reindexed {count} entries");
1760
1761        info!("Optimising Indexes: started");
1762        self.idlayer.optimise_dirty_idls();
1763        info!("Optimising Indexes: complete ✅");
1764        info!("Calculating Index Optimisation Slopes: started");
1765        self.idlayer.analyse_idx_slopes().inspect_err(|err| {
1766            error!(?err, "index optimisation failed");
1767        })?;
1768        info!("Calculating Index Optimisation Slopes: complete ✅");
1769        info!("System reindex: complete 🎉");
1770        Ok(())
1771    }
1772
1773    /// ⚠️  - This function will destroy all indexes in the database.
1774    ///
1775    /// It should only be called internally by the backend in limited and
1776    /// specific situations.
1777    fn danger_purge_idxs(&mut self) -> Result<(), OperationError> {
1778        self.get_idlayer().danger_purge_idxs()
1779    }
1780
1781    /// ⚠️  - This function will destroy all entries and indexes in the database.
1782    ///
1783    /// It should only be called internally by the backend in limited and
1784    /// specific situations.
1785    pub(crate) fn danger_delete_all_db_content(&mut self) -> Result<(), OperationError> {
1786        self.get_ruv().clear();
1787        self.get_idlayer()
1788            .danger_purge_id2entry()
1789            .and_then(|_| self.danger_purge_idxs())
1790    }
1791
1792    #[cfg(test)]
1793    pub fn load_test_idl(
1794        &mut self,
1795        attr: &Attribute,
1796        itype: IndexType,
1797        idx_key: &str,
1798    ) -> Result<Option<IDLBitRange>, OperationError> {
1799        self.get_idlayer().get_idl(attr, itype, idx_key)
1800    }
1801
1802    fn is_idx_slopeyness_generated(&mut self) -> Result<bool, OperationError> {
1803        self.get_idlayer().is_idx_slopeyness_generated()
1804    }
1805
1806    fn get_idx_slope(&mut self, ikey: &IdxKey) -> Result<IdxSlope, OperationError> {
1807        // Do we have the slopeyness?
1808        let slope = self
1809            .get_idlayer()
1810            .get_idx_slope(ikey)?
1811            .unwrap_or_else(|| get_idx_slope_default(ikey));
1812        trace!("index slope - {:?} -> {:?}", ikey, slope);
1813        Ok(slope)
1814    }
1815
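    /// Restore the database from a JSON backup file previously produced by
    /// `backup`. All existing entries and indexes are destroyed first, then the
    /// entries, server metadata and replication metadata from the backup are
    /// written back and the result is verified.
    ///
    /// A minimal usage sketch, assuming a write transaction `be_txn` and a
    /// hypothetical backup path:
    ///
    /// ```ignore
    /// let backup_path = Path::new("/tmp/kanidm-backup.json");
    /// be_txn.restore(backup_path)?;
    /// be_txn.commit()?;
    /// ```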
1816    pub fn restore(&mut self, src_path: &Path) -> Result<(), OperationError> {
1817        let serialized_string = fs::read_to_string(src_path).map_err(|e| {
1818            admin_error!("fs::read_to_string {:?}", e);
1819            OperationError::FsError
1820        })?;
1821
1822        self.danger_delete_all_db_content().map_err(|e| {
1823            admin_error!("delete_all_db_content failed {:?}", e);
1824            e
1825        })?;
1826
1827        let idlayer = self.get_idlayer();
1828        // load all entries into RAM, may need to change this later
1829        // if the size of the database compared to RAM is an issue
1830
1831        let dbbak_option: Result<DbBackup, serde_json::Error> =
1832            serde_json::from_str(&serialized_string);
1833
1834        let dbbak = dbbak_option.map_err(|e| {
1835            admin_error!("serde_json error {:?}", e);
1836            OperationError::SerdeJsonError
1837        })?;
1838
1839        let (dbentries, repl_meta, maybe_version) = match dbbak {
1840            DbBackup::V1(dbentries) => (dbentries, None, None),
1841            DbBackup::V2 {
1842                db_s_uuid,
1843                db_d_uuid,
1844                db_ts_max,
1845                entries,
1846            } => {
1847                // Restore the database metadata.
1848                idlayer.write_db_s_uuid(db_s_uuid)?;
1849                idlayer.write_db_d_uuid(db_d_uuid)?;
1850                idlayer.set_db_ts_max(db_ts_max)?;
1851                (entries, None, None)
1852            }
1853            DbBackup::V3 {
1854                db_s_uuid,
1855                db_d_uuid,
1856                db_ts_max,
1857                keyhandles,
1858                entries,
1859            } => {
1860                // Restore the database metadata.
1861                idlayer.write_db_s_uuid(db_s_uuid)?;
1862                idlayer.write_db_d_uuid(db_d_uuid)?;
1863                idlayer.set_db_ts_max(db_ts_max)?;
1864                idlayer.set_key_handles(keyhandles)?;
1865                (entries, None, None)
1866            }
1867            DbBackup::V4 {
1868                db_s_uuid,
1869                db_d_uuid,
1870                db_ts_max,
1871                keyhandles,
1872                repl_meta,
1873                entries,
1874            } => {
1875                // Restore the database metadata.
1876                idlayer.write_db_s_uuid(db_s_uuid)?;
1877                idlayer.write_db_d_uuid(db_d_uuid)?;
1878                idlayer.set_db_ts_max(db_ts_max)?;
1879                idlayer.set_key_handles(keyhandles)?;
1880                (entries, Some(repl_meta), None)
1881            }
1882            DbBackup::V5 {
1883                version,
1884                db_s_uuid,
1885                db_d_uuid,
1886                db_ts_max,
1887                keyhandles,
1888                repl_meta,
1889                entries,
1890            } => {
1891                // Restore the database metadata.
1892                idlayer.write_db_s_uuid(db_s_uuid)?;
1893                idlayer.write_db_d_uuid(db_d_uuid)?;
1894                idlayer.set_db_ts_max(db_ts_max)?;
1895                idlayer.set_key_handles(keyhandles)?;
1896                (entries, Some(repl_meta), Some(version))
1897            }
1898        };
1899
1900        if let Some(version) = maybe_version {
1901            if version != env!("KANIDM_PKG_SERIES") {
1902                error!("The provided backup data is from server version {} and is unable to be restored on this instance ({})", version, env!("KANIDM_PKG_SERIES"));
1903                return Err(OperationError::DB0001MismatchedRestoreVersion);
1904            }
1905        } else {
1906            error!("The provided backup data is from an older server version and is unable to be restored.");
1907            return Err(OperationError::DB0002MismatchedRestoreVersion);
1908        };
1909
1910        // Rebuild the RUV from the backup.
1911        match repl_meta {
1912            Some(DbReplMeta::V1 { ruv: db_ruv }) => {
1913                self.get_ruv()
1914                    .restore(db_ruv.into_iter().map(|db_cid| db_cid.into()))?;
1915            }
1916            None => {
1917                warn!("Unable to restore replication metadata, this server may need a refresh.");
1918            }
1919        }
1920
1921        info!("Restoring {} entries ...", dbentries.len());
1922
1923        // Now, we set up all the entries with new ids.
1924        let mut id_max = 0;
1925        let identries: Result<Vec<IdRawEntry>, _> = dbentries
1926            .iter()
1927            .map(|e| {
1928                id_max += 1;
1929                let data = serde_json::to_vec(&e).map_err(|_| OperationError::SerdeJsonError)?;
1930                Ok(IdRawEntry { id: id_max, data })
1931            })
1932            .collect();
1933
1934        let idlayer = self.get_idlayer();
1935
1936        idlayer.write_identries_raw(identries?.into_iter())?;
1937
1938        info!("Restored {} entries", dbentries.len());
1939
1940        let vr = self.verify();
1941        if vr.is_empty() {
1942            Ok(())
1943        } else {
1944            Err(OperationError::ConsistencyError(
1945                vr.into_iter().filter_map(|v| v.err()).collect(),
1946            ))
1947        }
1948    }
1949
1950    /// If any RUV elements are present in the DB, load them now. This provides us with
1951    /// the RUV boundaries and change points from previous operations of the server, so
1952    /// that ruv_rebuild can "fill in" the gaps.
1953    ///
1954    /// # SAFETY
1955    ///
1956    /// Note that you should only call this function during the server startup
1957    /// to reload the RUV data from the entries of the database.
1958    ///
1959    /// Before calling this, the in memory ruv MUST be clear.
1960    #[instrument(level = "debug", name = "be::ruv_reload", skip_all)]
1961    fn ruv_reload(&mut self) -> Result<(), OperationError> {
1962        let idlayer = self.get_idlayer();
1963
1964        let db_ruv = idlayer.get_db_ruv()?;
1965
1966        // Set up the CIDs that existed previously. We don't need to know what entries
1967        // they affect, we just need them to ensure that we have ranges for replication
1968        // comparison to take effect properly.
1969        self.get_ruv().restore(db_ruv)?;
1970
1971        // Then populate the RUV with the data from the entries.
1972        self.ruv_rebuild()
1973    }
1974
1975    #[instrument(level = "debug", name = "be::ruv_rebuild", skip_all)]
1976    fn ruv_rebuild(&mut self) -> Result<(), OperationError> {
1977        // Rebuild the ruv!
1978        // For now this has to read from all the entries in the DB, but in the future
1979        // we'll actually store this properly (?). If it turns out this is really fast
1980        // we may just rebuild this always on startup.
1981
1982        // NOTE: An important detail is that we don't rely on indexes here!
1983
1984        let idl = IdList::AllIds;
1985        let entries = self.get_idlayer().get_identry(&idl).map_err(|e| {
1986            admin_error!(?e, "get_identry failed");
1987            e
1988        })?;
1989
1990        self.get_ruv().rebuild(&entries)?;
1991
1992        Ok(())
1993    }
1994
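    /// Quarantine the entry with the given id so it is excluded from normal
    /// operation, and reset the index version to 0 so a reindex is forced on the
    /// next startup.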
1995    pub fn quarantine_entry(&mut self, id: u64) -> Result<(), OperationError> {
1996        self.get_idlayer().quarantine_entry(id)?;
1997        // We have to set the index version to 0 so that on next start we force
1998        // a reindex to automatically occur.
1999        self.set_db_index_version(0)
2000    }
2001
2002    pub fn restore_quarantined(&mut self, id: u64) -> Result<(), OperationError> {
2003        self.get_idlayer().restore_quarantined(id)?;
2004        // We have to set the index version to 0 so that on next start we force
2005        // a reindex to automatically occur.
2006        self.set_db_index_version(0)
2007    }
2008
2009    #[cfg(any(test, debug_assertions))]
2010    pub fn clear_cache(&mut self) -> Result<(), OperationError> {
2011        self.get_idlayer().clear_cache()
2012    }
2013
2014    pub fn commit(self) -> Result<(), OperationError> {
2015        let BackendWriteTransaction {
2016            mut idlayer,
2017            idxmeta_wr,
2018            ruv,
2019        } = self;
2020
2021        // write the ruv content back to the db.
2022        idlayer.write_db_ruv(ruv.added(), ruv.removed())?;
2023
2024        idlayer.commit().map(|()| {
2025            ruv.commit();
2026            idxmeta_wr.commit();
2027        })
2028    }
2029
2030    pub(crate) fn reset_db_s_uuid(&mut self) -> Result<Uuid, OperationError> {
2031        // The value is missing. Generate a new one and store it.
2032        let nsid = Uuid::new_v4();
2033        self.get_idlayer().write_db_s_uuid(nsid).map_err(|err| {
2034            error!(?err, "Unable to persist server uuid");
2035            err
2036        })?;
2037        Ok(nsid)
2038    }
2039    pub fn get_db_s_uuid(&mut self) -> Result<Uuid, OperationError> {
2040        let res = self.get_idlayer().get_db_s_uuid().map_err(|err| {
2041            error!(?err, "Failed to read server uuid");
2042            err
2043        })?;
2044        match res {
2045            Some(s_uuid) => Ok(s_uuid),
2046            None => self.reset_db_s_uuid(),
2047        }
2048    }
2049
2050    /// This generates a new domain UUID and stores it into the database,
2051    /// returning the new UUID
2052    fn reset_db_d_uuid(&mut self) -> Result<Uuid, OperationError> {
2053        let nsid = Uuid::new_v4();
2054        self.get_idlayer().write_db_d_uuid(nsid).map_err(|err| {
2055            error!(?err, "Unable to persist domain uuid");
2056            err
2057        })?;
2058        Ok(nsid)
2059    }
2060
2061    /// Manually set a new domain UUID and store it into the DB. This is used
2062    /// as part of a replication refresh.
2063    pub fn set_db_d_uuid(&mut self, nsid: Uuid) -> Result<(), OperationError> {
2064        self.get_idlayer().write_db_d_uuid(nsid)
2065    }
2066
2067    /// This pulls the domain UUID from the database
2068    pub fn get_db_d_uuid(&mut self) -> Result<Uuid, OperationError> {
2069        let res = self.get_idlayer().get_db_d_uuid().map_err(|err| {
2070            error!(?err, "Failed to read domain uuid");
2071            err
2072        })?;
2073        match res {
2074            Some(d_uuid) => Ok(d_uuid),
2075            None => self.reset_db_d_uuid(),
2076        }
2077    }
2078
2079    pub fn set_db_ts_max(&mut self, ts: Duration) -> Result<(), OperationError> {
2080        self.get_idlayer().set_db_ts_max(ts)
2081    }
2082
2083    pub fn get_db_ts_max(&mut self, ts: Duration) -> Result<Duration, OperationError> {
2084        // if none, return ts. If found, return it.
2085        match self.get_idlayer().get_db_ts_max()? {
2086            Some(dts) => Ok(dts),
2087            None => Ok(ts),
2088        }
2089    }
2090
2091    fn get_db_index_version(&mut self) -> Result<i64, OperationError> {
2092        self.get_idlayer().get_db_index_version()
2093    }
2094
2095    fn set_db_index_version(&mut self, v: i64) -> Result<(), OperationError> {
2096        self.get_idlayer().set_db_index_version(v)
2097    }
2098}
2099
2100// We have a number of hardcoded, "obvious" slopes that should
2101// exist. We return these when the analysis has not been run, as
2102// these are values that are generally "good enough" for most applications.
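// As a rough guide to how these are interpreted: a lower slope means a more
// selective index, so when ordering AND terms an equality test on ATTR_NAME
// (slope 1) would generally be evaluated before one on ATTR_CLASS (slope 180).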
2103fn get_idx_slope_default(ikey: &IdxKey) -> IdxSlope {
2104    match (ikey.attr.as_str(), &ikey.itype) {
2105        (ATTR_NAME, IndexType::Equality)
2106        | (ATTR_SPN, IndexType::Equality)
2107        | (ATTR_UUID, IndexType::Equality) => 1,
2108        (ATTR_CLASS, IndexType::Equality) => 180,
2109        (_, IndexType::Equality) => 45,
2110        (_, IndexType::SubString) => 90,
2111        (_, IndexType::Presence) => 90,
2112    }
2113}
2114
2115// In the future this will do the routing between the chosen backends etc.
2116impl Backend {
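    /// Create a new Backend from the supplied configuration and initial index
    /// keys, setting up the idlayer and reloading the RUV from the database.
    ///
    /// A construction sketch, mirroring the test helper below which uses the
    /// in-memory test configuration:
    ///
    /// ```ignore
    /// let idxkeys = vec![IdxKey {
    ///     attr: Attribute::Name.into(),
    ///     itype: IndexType::Equality,
    /// }];
    /// let be = Backend::new(BackendConfig::new_test("main"), idxkeys, false)?;
    /// let mut be_txn = be.write()?;
    /// ```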
2117    #[instrument(level = "debug", name = "be::new", skip_all)]
2118    pub fn new(
2119        mut cfg: BackendConfig,
2120        // path: &str,
2121        // mut pool_size: u32,
2122        // fstype: FsType,
2123        idxkeys: Vec<IdxKey>,
2124        vacuum: bool,
2125    ) -> Result<Self, OperationError> {
2126        debug!(db_tickets = ?cfg.pool_size, profile = %env!("KANIDM_PROFILE_NAME"), cpu_flags = %env!("KANIDM_CPU_FLAGS"));
2127
2128        // If in memory, reduce pool to 1
2129        if cfg.path.as_os_str().is_empty() {
2130            cfg.pool_size = 1;
2131        }
2132
2133        // Setup idxkeys here. By default we set these all to "max slope" aka
2134        // all indexes are "equal" but also worst case unless analysed.
2135        //
2136        // During startup this will be "fixed" as the schema core will call reload_idxmeta
2137        // which will trigger a reload of the analysis data (if present).
2138        let idxkeys: Map<_, _> = idxkeys
2139            .into_iter()
2140            .map(|ikey| {
2141                let slope = get_idx_slope_default(&ikey);
2142                (ikey, slope)
2143            })
2144            .collect();
2145
2146        // Load the replication update vector here. Initially we build an in memory
2147        // RUV, and then we load it from the DB.
2148        let ruv = Arc::new(ReplicationUpdateVector::default());
2149
2150        // this has a ::memory() type, but will path == "" work?
2151        let idlayer = Arc::new(IdlArcSqlite::new(&cfg, vacuum)?);
2152        let be = Backend {
2153            cfg,
2154            idlayer,
2155            ruv,
2156            idxmeta: Arc::new(CowCell::new(IdxMeta::new(idxkeys))),
2157        };
2158
2159        // Now complete our setup with a txn
2160        // In this case we can use an empty idx meta because we don't
2161        // access any parts of
2162        // the indexing subsystem here.
2163        let mut idl_write = be.idlayer.write()?;
2164        idl_write
2165            .setup()
2166            .and_then(|_| idl_write.commit())
2167            .map_err(|e| {
2168                admin_error!(?e, "Failed to setup idlayer");
2169                e
2170            })?;
2171
2172        // Now rebuild the ruv.
2173        let mut be_write = be.write()?;
2174        be_write
2175            .ruv_reload()
2176            .and_then(|_| be_write.commit())
2177            .map_err(|e| {
2178                admin_error!(?e, "Failed to reload ruv");
2179                e
2180            })?;
2181
2182        Ok(be)
2183    }
2184
2185    pub fn get_pool_size(&self) -> u32 {
2186        debug_assert!(self.cfg.pool_size > 0);
2187        self.cfg.pool_size
2188    }
2189
2190    pub fn try_quiesce(&self) {
2191        self.idlayer.try_quiesce();
2192    }
2193
2194    pub fn read(&self) -> Result<BackendReadTransaction, OperationError> {
2195        Ok(BackendReadTransaction {
2196            idlayer: self.idlayer.read()?,
2197            idxmeta: self.idxmeta.read(),
2198            ruv: self.ruv.read(),
2199        })
2200    }
2201
2202    pub fn write(&self) -> Result<BackendWriteTransaction, OperationError> {
2203        Ok(BackendWriteTransaction {
2204            idlayer: self.idlayer.write()?,
2205            idxmeta_wr: self.idxmeta.write(),
2206            ruv: self.ruv.write(),
2207        })
2208    }
2209}
2210
2211// What are the possible actions we'll receive here?
2212
2213#[cfg(test)]
2214mod tests {
2215    use super::super::entry::{Entry, EntryInit, EntryNew};
2216    use super::Limits;
2217    use super::{
2218        Backend, BackendConfig, BackendTransaction, BackendWriteTransaction, DbBackup, IdList,
2219        IdxKey, OperationError,
2220    };
2221    use crate::prelude::*;
2222    use crate::repl::cid::Cid;
2223    use crate::value::{IndexType, PartialValue, Value};
2224    use idlset::v2::IDLBitRange;
2225    use std::fs;
2226    use std::iter::FromIterator;
2227    use std::path::Path;
2228    use std::sync::Arc;
2229    use std::time::Duration;
2230
2231    lazy_static! {
2232        static ref CID_ZERO: Cid = Cid::new_zero();
2233        static ref CID_ONE: Cid = Cid::new_count(1);
2234        static ref CID_TWO: Cid = Cid::new_count(2);
2235        static ref CID_THREE: Cid = Cid::new_count(3);
2236        static ref CID_ADV: Cid = Cid::new_count(10);
2237    }
2238
2239    macro_rules! run_test {
2240        ($test_fn:expr) => {{
2241            sketching::test_init();
2242
2243            // This is a demo idxmeta, purely for testing.
2244            let idxmeta = vec![
2245                IdxKey {
2246                    attr: Attribute::Name.into(),
2247                    itype: IndexType::Equality,
2248                },
2249                IdxKey {
2250                    attr: Attribute::Name.into(),
2251                    itype: IndexType::Presence,
2252                },
2253                IdxKey {
2254                    attr: Attribute::Name.into(),
2255                    itype: IndexType::SubString,
2256                },
2257                IdxKey {
2258                    attr: Attribute::Uuid.into(),
2259                    itype: IndexType::Equality,
2260                },
2261                IdxKey {
2262                    attr: Attribute::Uuid.into(),
2263                    itype: IndexType::Presence,
2264                },
2265                IdxKey {
2266                    attr: Attribute::TestAttr.into(),
2267                    itype: IndexType::Equality,
2268                },
2269                IdxKey {
2270                    attr: Attribute::TestNumber.into(),
2271                    itype: IndexType::Equality,
2272                },
2273            ];
2274
2275            let be = Backend::new(BackendConfig::new_test("main"), idxmeta, false)
2276                .expect("Failed to setup backend");
2277
2278            let mut be_txn = be.write().unwrap();
2279
2280            let r = $test_fn(&mut be_txn);
2281            // Commit, to guarantee it worked.
2282            assert!(be_txn.commit().is_ok());
2283            r
2284        }};
2285    }
2286
2287    macro_rules! entry_exists {
2288        ($be:expr, $ent:expr) => {{
2289            let ei = $ent.clone().into_sealed_committed();
2290            let filt = ei
2291                .filter_from_attrs(&[Attribute::Uuid.into()])
2292                .expect("failed to generate filter")
2293                .into_valid_resolved();
2294            let lims = Limits::unlimited();
2295            let entries = $be.search(&lims, &filt).expect("failed to search");
2296            entries.first().is_some()
2297        }};
2298    }
2299
2300    macro_rules! entry_attr_pres {
2301        ($be:expr, $ent:expr, $attr:expr) => {{
2302            let ei = $ent.clone().into_sealed_committed();
2303            let filt = ei
2304                .filter_from_attrs(&[Attribute::UserId.into()])
2305                .expect("failed to generate filter")
2306                .into_valid_resolved();
2307            let lims = Limits::unlimited();
2308            let entries = $be.search(&lims, &filt).expect("failed to search");
2309            match entries.first() {
2310                Some(ent) => ent.attribute_pres($attr),
2311                None => false,
2312            }
2313        }};
2314    }
2315
2316    macro_rules! idl_state {
2317        ($be:expr, $attr:expr, $itype:expr, $idx_key:expr, $expect:expr) => {{
2318            let t_idl = $be
2319                .load_test_idl(&$attr, $itype, &$idx_key.to_string())
2320                .expect("IdList Load failed");
2321            let t = $expect.map(|v: Vec<u64>| IDLBitRange::from_iter(v));
2322            assert_eq!(t_idl, t);
2323        }};
2324    }
2325
2326    #[test]
2327    fn test_be_simple_create() {
2328        run_test!(|be: &mut BackendWriteTransaction| {
2329            trace!("Simple Create");
2330
2331            let empty_result = be.create(&CID_ZERO, Vec::with_capacity(0));
2332            trace!("{:?}", empty_result);
2333            assert_eq!(empty_result, Err(OperationError::EmptyRequest));
2334
2335            let mut e: Entry<EntryInit, EntryNew> = Entry::new();
2336            e.add_ava(Attribute::UserId, Value::from("william"));
2337            e.add_ava(
2338                Attribute::Uuid,
2339                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
2340            );
2341            let e = e.into_sealed_new();
2342
2343            let single_result = be.create(&CID_ZERO, vec![e.clone()]);
2344
2345            assert!(single_result.is_ok());
2346
2347            // Construct a filter
2348            assert!(entry_exists!(be, e));
2349        });
2350    }
2351
2352    #[test]
2353    fn test_be_simple_search() {
2354        run_test!(|be: &mut BackendWriteTransaction| {
2355            trace!("Simple Search");
2356
2357            let mut e: Entry<EntryInit, EntryNew> = Entry::new();
2358            e.add_ava(Attribute::UserId, Value::from("claire"));
2359            e.add_ava(
2360                Attribute::Uuid,
2361                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
2362            );
2363            let e = e.into_sealed_new();
2364
2365            let single_result = be.create(&CID_ZERO, vec![e]);
2366            assert!(single_result.is_ok());
2367            // Test a simple EQ search
2368
2369            let filt = filter_resolved!(f_eq(Attribute::UserId, PartialValue::new_utf8s("claire")));
2370
2371            let lims = Limits::unlimited();
2372
2373            let r = be.search(&lims, &filt);
2374            assert!(r.expect("Search failed!").len() == 1);
2375
2376            // Test empty search
2377
2378            // Test class pres
2379
2380            // Search with no results
2381        });
2382    }
2383
2384    #[test]
2385    fn test_be_search_with_invalid() {
2386        run_test!(|be: &mut BackendWriteTransaction| {
2387            trace!("Simple Search");
2388
2389            let mut e: Entry<EntryInit, EntryNew> = Entry::new();
2390            e.add_ava(Attribute::UserId, Value::from("bagel"));
2391            e.add_ava(
2392                Attribute::Uuid,
2393                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
2394            );
2395            let e = e.into_sealed_new();
2396
2397            let single_result = be.create(&CID_ZERO, vec![e]);
2398            assert!(single_result.is_ok());
2399
2400            // Test Search with or condition including invalid attribute
2401            let filt = filter_resolved!(f_or(vec![
2402                f_eq(Attribute::UserId, PartialValue::new_utf8s("bagel")),
2403                f_invalid(Attribute::UserId)
2404            ]));
2405
2406            let lims = Limits::unlimited();
2407
2408            let r = be.search(&lims, &filt);
2409            assert!(r.expect("Search failed!").len() == 1);
2410
2411            // Test Search with or condition including invalid attribute
2412            let filt = filter_resolved!(f_and(vec![
2413                f_eq(Attribute::UserId, PartialValue::new_utf8s("bagel")),
2414                f_invalid(Attribute::UserId)
2415            ]));
2416
2417            let lims = Limits::unlimited();
2418
2419            let r = be.search(&lims, &filt);
2420            assert!(r.expect("Search failed!").is_empty());
2421        });
2422    }
2423
2424    #[test]
2425    fn test_be_simple_modify() {
2426        run_test!(|be: &mut BackendWriteTransaction| {
2427            trace!("Simple Modify");
2428            let lims = Limits::unlimited();
2429            // First create some entries (2)
2430            let mut e1: Entry<EntryInit, EntryNew> = Entry::new();
2431            e1.add_ava(Attribute::UserId, Value::from("william"));
2432            e1.add_ava(
2433                Attribute::Uuid,
2434                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
2435            );
2436
2437            let mut e2: Entry<EntryInit, EntryNew> = Entry::new();
2438            e2.add_ava(Attribute::UserId, Value::from("alice"));
2439            e2.add_ava(
2440                Attribute::Uuid,
2441                Value::from("4b6228ab-1dbe-42a4-a9f5-f6368222438e"),
2442            );
2443
2444            let ve1 = e1.clone().into_sealed_new();
2445            let ve2 = e2.clone().into_sealed_new();
2446
2447            assert!(be.create(&CID_ZERO, vec![ve1, ve2]).is_ok());
2448            assert!(entry_exists!(be, e1));
2449            assert!(entry_exists!(be, e2));
2450
2451            // You now need to retrieve the entries back out to get the entry ids.
2452            let mut results = be
2453                .search(&lims, &filter_resolved!(f_pres(Attribute::UserId)))
2454                .expect("Failed to search");
2455
2456            // Get these out to usable entries.
2457            let r1 = results.remove(0);
2458            let r2 = results.remove(0);
2459
2460            let mut r1 = r1.as_ref().clone().into_invalid();
2461            let mut r2 = r2.as_ref().clone().into_invalid();
2462
2463            // Modify no id (err)
2464            // This is now impossible due to the state machine design.
2465            // However, with some unsafe ....
2466            let ue1 = e1.clone().into_sealed_committed();
2467            assert!(be
2468                .modify(&CID_ZERO, &[Arc::new(ue1.clone())], &[ue1])
2469                .is_err());
2470            // Modify none
2471            assert!(be.modify(&CID_ZERO, &[], &[]).is_err());
2472
2473            // Make some changes to r1, r2.
2474            let pre1 = Arc::new(r1.clone().into_sealed_committed());
2475            let pre2 = Arc::new(r2.clone().into_sealed_committed());
2476            r1.add_ava(Attribute::TestAttr, Value::from("modified"));
2477            r2.add_ava(Attribute::TestAttr, Value::from("modified"));
2478
2479            // Now ... cheat.
2480
2481            let vr1 = r1.into_sealed_committed();
2482            let vr2 = r2.into_sealed_committed();
2483
2484            // Modify single
2485            assert!(be.modify(&CID_ZERO, &[pre1], &[vr1.clone()]).is_ok());
2486            // Assert no other changes
2487            assert!(entry_attr_pres!(be, vr1, Attribute::TestAttr));
2488            assert!(!entry_attr_pres!(be, vr2, Attribute::TestAttr));
2489
2490            // Modify both
2491            assert!(be
2492                .modify(
2493                    &CID_ZERO,
2494                    &[Arc::new(vr1.clone()), pre2],
2495                    &[vr1.clone(), vr2.clone()]
2496                )
2497                .is_ok());
2498
2499            assert!(entry_attr_pres!(be, vr1, Attribute::TestAttr));
2500            assert!(entry_attr_pres!(be, vr2, Attribute::TestAttr));
2501        });
2502    }
2503
2504    #[test]
2505    fn test_be_simple_delete() {
2506        run_test!(|be: &mut BackendWriteTransaction| {
2507            trace!("Simple Delete");
2508            let lims = Limits::unlimited();
2509
2510            // First create some entries (3?)
2511            let mut e1: Entry<EntryInit, EntryNew> = Entry::new();
2512            e1.add_ava(Attribute::UserId, Value::from("william"));
2513            e1.add_ava(
2514                Attribute::Uuid,
2515                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
2516            );
2517
2518            let mut e2: Entry<EntryInit, EntryNew> = Entry::new();
2519            e2.add_ava(Attribute::UserId, Value::from("alice"));
2520            e2.add_ava(
2521                Attribute::Uuid,
2522                Value::from("4b6228ab-1dbe-42a4-a9f5-f6368222438e"),
2523            );
2524
2525            let mut e3: Entry<EntryInit, EntryNew> = Entry::new();
2526            e3.add_ava(Attribute::UserId, Value::from("lucy"));
2527            e3.add_ava(
2528                Attribute::Uuid,
2529                Value::from("7b23c99d-c06b-4a9a-a958-3afa56383e1d"),
2530            );
2531
2532            let ve1 = e1.clone().into_sealed_new();
2533            let ve2 = e2.clone().into_sealed_new();
2534            let ve3 = e3.clone().into_sealed_new();
2535
2536            assert!(be.create(&CID_ZERO, vec![ve1, ve2, ve3]).is_ok());
2537            assert!(entry_exists!(be, e1));
2538            assert!(entry_exists!(be, e2));
2539            assert!(entry_exists!(be, e3));
2540
2541            // You now need to retrieve the entries back out to get the entry ids.
2542            let mut results = be
2543                .search(&lims, &filter_resolved!(f_pres(Attribute::UserId)))
2544                .expect("Failed to search");
2545
2546            // Get these out to usable entries.
2547            let r1 = results.remove(0);
2548            let r2 = results.remove(0);
2549            let r3 = results.remove(0);
2550
2551            // Deletes nothing, all entries are live.
2552            assert!(matches!(be.reap_tombstones(&CID_ADV, &CID_ZERO), Ok(0)));
2553
2554            // Put them into the tombstone state, and write that down.
2555            // This sets up the RUV with the changes.
2556            let r1_ts = r1.to_tombstone(CID_ONE.clone()).into_sealed_committed();
2557
2558            assert!(be.modify(&CID_ONE, &[r1], &[r1_ts.clone()]).is_ok());
2559
2560            let r2_ts = r2.to_tombstone(CID_TWO.clone()).into_sealed_committed();
2561            let r3_ts = r3.to_tombstone(CID_TWO.clone()).into_sealed_committed();
2562
2563            assert!(be
2564                .modify(&CID_TWO, &[r2, r3], &[r2_ts.clone(), r3_ts.clone()])
2565                .is_ok());
2566
2567            // The entries are now tombstones, but they are still in the RUV. This is because we
2568            // targeted CID_ZERO, not CID_ONE.
2569            assert!(matches!(be.reap_tombstones(&CID_ADV, &CID_ZERO), Ok(0)));
2570
2571            assert!(entry_exists!(be, r1_ts));
2572            assert!(entry_exists!(be, r2_ts));
2573            assert!(entry_exists!(be, r3_ts));
2574
2575            assert!(matches!(be.reap_tombstones(&CID_ADV, &CID_ONE), Ok(0)));
2576
2577            assert!(entry_exists!(be, r1_ts));
2578            assert!(entry_exists!(be, r2_ts));
2579            assert!(entry_exists!(be, r3_ts));
2580
2581            assert!(matches!(be.reap_tombstones(&CID_ADV, &CID_TWO), Ok(1)));
2582
2583            assert!(!entry_exists!(be, r1_ts));
2584            assert!(entry_exists!(be, r2_ts));
2585            assert!(entry_exists!(be, r3_ts));
2586
2587            assert!(matches!(be.reap_tombstones(&CID_ADV, &CID_THREE), Ok(2)));
2588
2589            assert!(!entry_exists!(be, r1_ts));
2590            assert!(!entry_exists!(be, r2_ts));
2591            assert!(!entry_exists!(be, r3_ts));
2592
2593            // Nothing left
2594            assert!(matches!(be.reap_tombstones(&CID_ADV, &CID_THREE), Ok(0)));
2595
2596            assert!(!entry_exists!(be, r1_ts));
2597            assert!(!entry_exists!(be, r2_ts));
2598            assert!(!entry_exists!(be, r3_ts));
2599        });
2600    }
2601
2602    #[test]
2603    fn test_be_backup_restore() {
2604        let db_backup_file_name =
2605            Path::new(option_env!("OUT_DIR").unwrap_or("/tmp")).join(".backup_test.json");
2606        eprintln!(" ⚠️   {}", db_backup_file_name.display());
2607        run_test!(|be: &mut BackendWriteTransaction| {
2608            // Important! Need db metadata setup!
2609            be.reset_db_s_uuid().unwrap();
2610            be.reset_db_d_uuid().unwrap();
2611            be.set_db_ts_max(Duration::from_secs(1)).unwrap();
2612
2613            // First create some entries (3?)
2614            let mut e1: Entry<EntryInit, EntryNew> = Entry::new();
2615            e1.add_ava(Attribute::UserId, Value::from("william"));
2616            e1.add_ava(
2617                Attribute::Uuid,
2618                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
2619            );
2620
2621            let mut e2: Entry<EntryInit, EntryNew> = Entry::new();
2622            e2.add_ava(Attribute::UserId, Value::from("alice"));
2623            e2.add_ava(
2624                Attribute::Uuid,
2625                Value::from("4b6228ab-1dbe-42a4-a9f5-f6368222438e"),
2626            );
2627
2628            let mut e3: Entry<EntryInit, EntryNew> = Entry::new();
2629            e3.add_ava(Attribute::UserId, Value::from("lucy"));
2630            e3.add_ava(
2631                Attribute::Uuid,
2632                Value::from("7b23c99d-c06b-4a9a-a958-3afa56383e1d"),
2633            );
2634
2635            let ve1 = e1.clone().into_sealed_new();
2636            let ve2 = e2.clone().into_sealed_new();
2637            let ve3 = e3.clone().into_sealed_new();
2638
2639            assert!(be.create(&CID_ZERO, vec![ve1, ve2, ve3]).is_ok());
2640            assert!(entry_exists!(be, e1));
2641            assert!(entry_exists!(be, e2));
2642            assert!(entry_exists!(be, e3));
2643
2644            let result = fs::remove_file(&db_backup_file_name);
2645
2646            // If the file doesn't exist yet that's fine - we are about to create it.
2647            // Any other error is unexpected, so fail the test.
2648            if let Err(e) = result {
2649                assert_eq!(e.kind(), std::io::ErrorKind::NotFound, "{e:?}");
2650            }
2651
2652            be.backup(&db_backup_file_name).expect("Backup failed!");
2653            be.restore(&db_backup_file_name).expect("Restore failed!");
2654
2655            assert!(be.verify().is_empty());
2656        });
2657    }
2658
2659    #[test]
2660    fn test_be_backup_restore_tampered() {
2661        let db_backup_file_name =
2662            Path::new(option_env!("OUT_DIR").unwrap_or("/tmp")).join(".backup2_test.json");
2663        eprintln!(" ⚠️   {}", db_backup_file_name.display());
2664        run_test!(|be: &mut BackendWriteTransaction| {
2665            // Important! Need db metadata setup!
2666            be.reset_db_s_uuid().unwrap();
2667            be.reset_db_d_uuid().unwrap();
2668            be.set_db_ts_max(Duration::from_secs(1)).unwrap();
2669            // First create three entries.
2670            let mut e1: Entry<EntryInit, EntryNew> = Entry::new();
2671            e1.add_ava(Attribute::UserId, Value::from("william"));
2672            e1.add_ava(
2673                Attribute::Uuid,
2674                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
2675            );
2676
2677            let mut e2: Entry<EntryInit, EntryNew> = Entry::new();
2678            e2.add_ava(Attribute::UserId, Value::from("alice"));
2679            e2.add_ava(
2680                Attribute::Uuid,
2681                Value::from("4b6228ab-1dbe-42a4-a9f5-f6368222438e"),
2682            );
2683
2684            let mut e3: Entry<EntryInit, EntryNew> = Entry::new();
2685            e3.add_ava(Attribute::UserId, Value::from("lucy"));
2686            e3.add_ava(
2687                Attribute::Uuid,
2688                Value::from("7b23c99d-c06b-4a9a-a958-3afa56383e1d"),
2689            );
2690
2691            let ve1 = e1.clone().into_sealed_new();
2692            let ve2 = e2.clone().into_sealed_new();
2693            let ve3 = e3.clone().into_sealed_new();
2694
2695            assert!(be.create(&CID_ZERO, vec![ve1, ve2, ve3]).is_ok());
2696            assert!(entry_exists!(be, e1));
2697            assert!(entry_exists!(be, e2));
2698            assert!(entry_exists!(be, e3));
2699
2700            // Remove any stale backup file from a previous run. A missing file is
2701            // fine; any other error would leave the backup target in an unknown
2702            // state, so fail the test loudly rather than silently continuing.
2703            if let Err(e) = fs::remove_file(&db_backup_file_name) {
2704                // Only NotFound is acceptable here.
2705                assert_eq!(e.kind(), std::io::ErrorKind::NotFound);
2706            }
2707
2708            be.backup(&db_backup_file_name).expect("Backup failed!");
2709
2710            // Now here, we need to tamper with the file.
2711            let serialized_string = fs::read_to_string(&db_backup_file_name).unwrap();
2712            trace!(?serialized_string);
2713            let mut dbbak: DbBackup = serde_json::from_str(&serialized_string).unwrap();
2714
2715            match &mut dbbak {
2716                DbBackup::V5 {
2717                    version: _,
2718                    db_s_uuid: _,
2719                    db_d_uuid: _,
2720                    db_ts_max: _,
2721                    keyhandles: _,
2722                    repl_meta: _,
2723                    entries,
2724                } => {
2725                    let _ = entries.pop();
2726                }
2727                _ => {
2728                    // We no longer use these format versions!
2729                    unreachable!()
2730                }
2731            };
2732
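            // Write the tampered backup back to disk, then attempt to restore from it.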
2733            let serialized_entries_str = serde_json::to_string_pretty(&dbbak).unwrap();
2734            fs::write(&db_backup_file_name, serialized_entries_str).unwrap();
2735
2736            be.restore(&db_backup_file_name).expect("Restore failed!");
2737
2738            assert!(be.verify().is_empty());
2739        });
2740    }
2741
2742    #[test]
2743    fn test_be_sid_generation_and_reset() {
2744        run_test!(|be: &mut BackendWriteTransaction| {
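            // The server UUID (s_uuid) must be stable across reads and only change
            // when explicitly reset.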
2745            let sid1 = be.get_db_s_uuid().unwrap();
2746            let sid2 = be.get_db_s_uuid().unwrap();
2747            assert_eq!(sid1, sid2);
2748            let sid3 = be.reset_db_s_uuid().unwrap();
2749            assert!(sid1 != sid3);
2750            let sid4 = be.get_db_s_uuid().unwrap();
2751            assert_eq!(sid3, sid4);
2752        });
2753    }
2754
2755    #[test]
2756    fn test_be_reindex_empty() {
2757        run_test!(|be: &mut BackendWriteTransaction| {
2758            // No test data needed: check the missing index list on an empty backend.
2759            let missing = be.missing_idxs().unwrap();
2760            assert_eq!(missing.len(), 7);
2761            assert!(be.reindex(false).is_ok());
2762            let missing = be.missing_idxs().unwrap();
2763            debug!("{:?}", missing);
2764            assert!(missing.is_empty());
2765        });
2766    }
2767
2768    #[test]
2769    fn test_be_reindex_data() {
2770        run_test!(|be: &mut BackendWriteTransaction| {
2771            // Add some test data to be indexed.
2772            let mut e1: Entry<EntryInit, EntryNew> = Entry::new();
2773            e1.add_ava(Attribute::Name, Value::new_iname("william"));
2774            e1.add_ava(
2775                Attribute::Uuid,
2776                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
2777            );
2778            let e1 = e1.into_sealed_new();
2779
2780            let mut e2: Entry<EntryInit, EntryNew> = Entry::new();
2781            e2.add_ava(Attribute::Name, Value::new_iname("claire"));
2782            e2.add_ava(
2783                Attribute::Uuid,
2784                Value::from("bd651620-00dd-426b-aaa0-4494f7b7906f"),
2785            );
2786            let e2 = e2.into_sealed_new();
2787
2788            be.create(&CID_ZERO, vec![e1, e2]).unwrap();
2789
2790            // purge indexes
2791            be.danger_purge_idxs().unwrap();
2792            // Check they are gone
2793            let missing = be.missing_idxs().unwrap();
2794            assert_eq!(missing.len(), 7);
2795            assert!(be.reindex(false).is_ok());
2796            let missing = be.missing_idxs().unwrap();
2797            debug!("{:?}", missing);
2798            assert!(missing.is_empty());
2799            // Check the name and uuid IDLs on the eq, sub, and pres indexes.
2800
2801            idl_state!(
2802                be,
2803                Attribute::Name,
2804                IndexType::Equality,
2805                "william",
2806                Some(vec![1])
2807            );
2808
2809            idl_state!(
2810                be,
2811                Attribute::Name,
2812                IndexType::Equality,
2813                "claire",
2814                Some(vec![2])
2815            );
2816
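            // The substring index stores short (1 to 3 character) grams of each value;
            // the grams checked here occur only in "william".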
2817            for sub in [
2818                "w", "m", "wi", "il", "ll", "li", "ia", "am", "wil", "ill", "lli", "lia", "iam",
2819            ] {
2820                idl_state!(
2821                    be,
2822                    Attribute::Name,
2823                    IndexType::SubString,
2824                    sub,
2825                    Some(vec![1])
2826                );
2827            }
2828
2829            for sub in [
2830                "c", "r", "e", "cl", "la", "ai", "ir", "re", "cla", "lai", "air", "ire",
2831            ] {
2832                idl_state!(
2833                    be,
2834                    Attribute::Name,
2835                    IndexType::SubString,
2836                    sub,
2837                    Some(vec![2])
2838                );
2839            }
2840
2841            for sub in ["i", "a", "l"] {
2842                idl_state!(
2843                    be,
2844                    Attribute::Name,
2845                    IndexType::SubString,
2846                    sub,
2847                    Some(vec![1, 2])
2848                );
2849            }
2850
2851            idl_state!(
2852                be,
2853                Attribute::Name,
2854                IndexType::Presence,
2855                "_",
2856                Some(vec![1, 2])
2857            );
2858
2859            idl_state!(
2860                be,
2861                Attribute::Uuid,
2862                IndexType::Equality,
2863                "db237e8a-0079-4b8c-8a56-593b22aa44d1",
2864                Some(vec![1])
2865            );
2866
2867            idl_state!(
2868                be,
2869                Attribute::Uuid,
2870                IndexType::Equality,
2871                "bd651620-00dd-426b-aaa0-4494f7b7906f",
2872                Some(vec![2])
2873            );
2874
2875            idl_state!(
2876                be,
2877                Attribute::Uuid,
2878                IndexType::Presence,
2879                "_",
2880                Some(vec![1, 2])
2881            );
2882
2883            // Show what happens with empty
2884
2885            idl_state!(
2886                be,
2887                Attribute::Name,
2888                IndexType::Equality,
2889                "not-exist",
2890                Some(Vec::with_capacity(0))
2891            );
2892
2893            idl_state!(
2894                be,
2895                Attribute::Uuid,
2896                IndexType::Equality,
2897                "fake-0079-4b8c-8a56-593b22aa44d1",
2898                Some(Vec::with_capacity(0))
2899            );
2900
2901            let uuid_p_idl = be
2902                .load_test_idl(&Attribute::from("not_indexed"), IndexType::Presence, "_")
2903                .unwrap(); // unwrap the result
2904            assert_eq!(uuid_p_idl, None);
2905
2906            // Check name2uuid
2907            let claire_uuid = uuid!("bd651620-00dd-426b-aaa0-4494f7b7906f");
2908            let william_uuid = uuid!("db237e8a-0079-4b8c-8a56-593b22aa44d1");
2909
2910            assert_eq!(be.name2uuid("claire"), Ok(Some(claire_uuid)));
2911            assert_eq!(be.name2uuid("william"), Ok(Some(william_uuid)));
2912            assert_eq!(
2913                be.name2uuid("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
2914                Ok(None)
2915            );
2916            // check uuid2spn
2917            assert_eq!(
2918                be.uuid2spn(claire_uuid),
2919                Ok(Some(Value::new_iname("claire")))
2920            );
2921            assert_eq!(
2922                be.uuid2spn(william_uuid),
2923                Ok(Some(Value::new_iname("william")))
2924            );
2925            // check uuid2rdn
2926            assert_eq!(
2927                be.uuid2rdn(claire_uuid),
2928                Ok(Some("name=claire".to_string()))
2929            );
2930            assert_eq!(
2931                be.uuid2rdn(william_uuid),
2932                Ok(Some("name=william".to_string()))
2933            );
2934        });
2935    }
2936
2937    #[test]
2938    fn test_be_index_create_delete_simple() {
2939        run_test!(|be: &mut BackendWriteTransaction| {
2940            // First, set up our index tables!
2941            assert!(be.reindex(false).is_ok());
2942            // Test that on entry create, the indexes are made correctly.
2943            // This is a similar case to reindex.
2944            let mut e1: Entry<EntryInit, EntryNew> = Entry::new();
2945            e1.add_ava(Attribute::Name, Value::from("william"));
2946            e1.add_ava(
2947                Attribute::Uuid,
2948                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
2949            );
2950            let e1 = e1.into_sealed_new();
2951
2952            let rset = be.create(&CID_ZERO, vec![e1]).unwrap();
2953            let mut rset: Vec<_> = rset.into_iter().map(Arc::new).collect();
2954            let e1 = rset.pop().unwrap();
2955
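            // On create, entry id 1 must appear in the eq and pres indexes for name
            // and uuid, and be resolvable via name2uuid / uuid2spn / uuid2rdn.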
2956            idl_state!(
2957                be,
2958                Attribute::Name.as_ref(),
2959                IndexType::Equality,
2960                "william",
2961                Some(vec![1])
2962            );
2963
2964            idl_state!(
2965                be,
2966                Attribute::Name.as_ref(),
2967                IndexType::Presence,
2968                "_",
2969                Some(vec![1])
2970            );
2971
2972            idl_state!(
2973                be,
2974                Attribute::Uuid.as_ref(),
2975                IndexType::Equality,
2976                "db237e8a-0079-4b8c-8a56-593b22aa44d1",
2977                Some(vec![1])
2978            );
2979
2980            idl_state!(
2981                be,
2982                Attribute::Uuid.as_ref(),
2983                IndexType::Presence,
2984                "_",
2985                Some(vec![1])
2986            );
2987
2988            let william_uuid = uuid!("db237e8a-0079-4b8c-8a56-593b22aa44d1");
2989            assert_eq!(be.name2uuid("william"), Ok(Some(william_uuid)));
2990            assert_eq!(be.uuid2spn(william_uuid), Ok(Some(Value::from("william"))));
2991            assert_eq!(
2992                be.uuid2rdn(william_uuid),
2993                Ok(Some("name=william".to_string()))
2994            );
2995
2996            // == Now we reap_tombstones, and assert we removed the items.
2997            let e1_ts = e1.to_tombstone(CID_ONE.clone()).into_sealed_committed();
2998            assert!(be.modify(&CID_ONE, &[e1], &[e1_ts]).is_ok());
2999            be.reap_tombstones(&CID_ADV, &CID_TWO).unwrap();
3000
3001            idl_state!(
3002                be,
3003                Attribute::Name.as_ref(),
3004                IndexType::Equality,
3005                "william",
3006                Some(Vec::with_capacity(0))
3007            );
3008
3009            idl_state!(
3010                be,
3011                Attribute::Name.as_ref(),
3012                IndexType::Presence,
3013                "_",
3014                Some(Vec::with_capacity(0))
3015            );
3016
3017            idl_state!(
3018                be,
3019                Attribute::Uuid.as_ref(),
3020                IndexType::Equality,
3021                "db237e8a-0079-4b8c-8a56-593b22aa44d1",
3022                Some(Vec::with_capacity(0))
3023            );
3024
3025            idl_state!(
3026                be,
3027                Attribute::Uuid.as_ref(),
3028                IndexType::Presence,
3029                "_",
3030                Some(Vec::with_capacity(0))
3031            );
3032
3033            assert_eq!(be.name2uuid("william"), Ok(None));
3034            assert_eq!(be.uuid2spn(william_uuid), Ok(None));
3035            assert_eq!(be.uuid2rdn(william_uuid), Ok(None));
3036        })
3037    }
3038
3039    #[test]
3040    fn test_be_index_create_delete_multi() {
3041        run_test!(|be: &mut BackendWriteTransaction| {
3042            // delete multiple entries at a time, without deleting others
3043            // First, set up our index tables!
3044            assert!(be.reindex(false).is_ok());
3045            // Test that on entry create, the indexes are made correctly.
3046            // This is a similar case to reindex.
3047            let mut e1: Entry<EntryInit, EntryNew> = Entry::new();
3048            e1.add_ava(Attribute::Name, Value::new_iname("william"));
3049            e1.add_ava(
3050                Attribute::Uuid,
3051                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
3052            );
3053            let e1 = e1.into_sealed_new();
3054
3055            let mut e2: Entry<EntryInit, EntryNew> = Entry::new();
3056            e2.add_ava(Attribute::Name, Value::new_iname("claire"));
3057            e2.add_ava(
3058                Attribute::Uuid,
3059                Value::from("bd651620-00dd-426b-aaa0-4494f7b7906f"),
3060            );
3061            let e2 = e2.into_sealed_new();
3062
3063            let mut e3: Entry<EntryInit, EntryNew> = Entry::new();
3064            e3.add_ava(Attribute::UserId, Value::new_iname("lucy"));
3065            e3.add_ava(
3066                Attribute::Uuid,
3067                Value::from("7b23c99d-c06b-4a9a-a958-3afa56383e1d"),
3068            );
3069            let e3 = e3.into_sealed_new();
3070
3071            let mut rset = be.create(&CID_ZERO, vec![e1, e2, e3]).unwrap();
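            // Drop claire (index 1) from the result set so that only william and lucy
            // are tombstoned below.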
3072            rset.remove(1);
3073            let mut rset: Vec<_> = rset.into_iter().map(Arc::new).collect();
3074            let e3 = rset.pop().unwrap();
3075            let e1 = rset.pop().unwrap();
3076
3077            // Now remove e1, e3.
3078            let e1_ts = e1.to_tombstone(CID_ONE.clone()).into_sealed_committed();
3079            let e3_ts = e3.to_tombstone(CID_ONE.clone()).into_sealed_committed();
3080            assert!(be.modify(&CID_ONE, &[e1, e3], &[e1_ts, e3_ts]).is_ok());
3081            be.reap_tombstones(&CID_ADV, &CID_TWO).unwrap();
3082
3083            idl_state!(
3084                be,
3085                Attribute::Name.as_ref(),
3086                IndexType::Equality,
3087                "claire",
3088                Some(vec![2])
3089            );
3090
3091            idl_state!(
3092                be,
3093                Attribute::Name.as_ref(),
3094                IndexType::Presence,
3095                "_",
3096                Some(vec![2])
3097            );
3098
3099            idl_state!(
3100                be,
3101                Attribute::Uuid.as_ref(),
3102                IndexType::Equality,
3103                "bd651620-00dd-426b-aaa0-4494f7b7906f",
3104                Some(vec![2])
3105            );
3106
3107            idl_state!(
3108                be,
3109                Attribute::Uuid.as_ref(),
3110                IndexType::Presence,
3111                "_",
3112                Some(vec![2])
3113            );
3114
3115            let claire_uuid = uuid!("bd651620-00dd-426b-aaa0-4494f7b7906f");
3116            let william_uuid = uuid!("db237e8a-0079-4b8c-8a56-593b22aa44d1");
3117            let lucy_uuid = uuid!("7b23c99d-c06b-4a9a-a958-3afa56383e1d");
3118
3119            assert_eq!(be.name2uuid("claire"), Ok(Some(claire_uuid)));
3120            let x = be.uuid2spn(claire_uuid);
3121            trace!(?x);
3122            assert_eq!(
3123                be.uuid2spn(claire_uuid),
3124                Ok(Some(Value::new_iname("claire")))
3125            );
3126            assert_eq!(
3127                be.uuid2rdn(claire_uuid),
3128                Ok(Some("name=claire".to_string()))
3129            );
3130
3131            assert_eq!(be.name2uuid("william"), Ok(None));
3132            assert_eq!(be.uuid2spn(william_uuid), Ok(None));
3133            assert_eq!(be.uuid2rdn(william_uuid), Ok(None));
3134
3135            assert_eq!(be.name2uuid("lucy"), Ok(None));
3136            assert_eq!(be.uuid2spn(lucy_uuid), Ok(None));
3137            assert_eq!(be.uuid2rdn(lucy_uuid), Ok(None));
3138        })
3139    }
3140
3141    #[test]
3142    fn test_be_index_modify_simple() {
3143        run_test!(|be: &mut BackendWriteTransaction| {
3144            assert!(be.reindex(false).is_ok());
3145            // Modify an entry, ensuring we clean up the indexes behind us.
3146            // For the test to be thorough we must add one attr, remove one attr,
3147            // and change one attr.
3148            let mut e1: Entry<EntryInit, EntryNew> = Entry::new();
3149            e1.add_ava(Attribute::Name, Value::new_iname("william"));
3150            e1.add_ava(
3151                Attribute::Uuid,
3152                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
3153            );
3154            e1.add_ava(Attribute::TestAttr, Value::from("test"));
3155            let e1 = e1.into_sealed_new();
3156
3157            let rset = be.create(&CID_ZERO, vec![e1]).unwrap();
3158            let rset: Vec<_> = rset.into_iter().map(Arc::new).collect();
3159            // Now, alter the new entry.
3160            let mut ce1 = rset[0].as_ref().clone().into_invalid();
3161            // add something.
3162            ce1.add_ava(Attribute::TestNumber, Value::from("test"));
3163            // remove something.
3164            ce1.purge_ava(Attribute::TestAttr);
3165            // mod something.
3166            ce1.purge_ava(Attribute::Name);
3167            ce1.add_ava(Attribute::Name, Value::new_iname("claire"));
3168
3169            let ce1 = ce1.into_sealed_committed();
3170
3171            be.modify(&CID_ZERO, &rset, &[ce1]).unwrap();
3172
3173            // Now check the idls
3174            idl_state!(
3175                be,
3176                Attribute::Name.as_ref(),
3177                IndexType::Equality,
3178                "claire",
3179                Some(vec![1])
3180            );
3181
3182            idl_state!(
3183                be,
3184                Attribute::Name.as_ref(),
3185                IndexType::Presence,
3186                "_",
3187                Some(vec![1])
3188            );
3189
3190            idl_state!(
3191                be,
3192                Attribute::TestNumber.as_ref(),
3193                IndexType::Equality,
3194                "test",
3195                Some(vec![1])
3196            );
3197
3198            idl_state!(
3199                be,
3200                Attribute::TestAttr,
3201                IndexType::Equality,
3202                "test",
3203                Some(vec![])
3204            );
3205
3206            let william_uuid = uuid!("db237e8a-0079-4b8c-8a56-593b22aa44d1");
3207            assert_eq!(be.name2uuid("william"), Ok(None));
3208            assert_eq!(be.name2uuid("claire"), Ok(Some(william_uuid)));
3209            assert_eq!(
3210                be.uuid2spn(william_uuid),
3211                Ok(Some(Value::new_iname("claire")))
3212            );
3213            assert_eq!(
3214                be.uuid2rdn(william_uuid),
3215                Ok(Some("name=claire".to_string()))
3216            );
3217        })
3218    }
3219
3220    #[test]
3221    fn test_be_index_modify_rename() {
3222        run_test!(|be: &mut BackendWriteTransaction| {
3223            assert!(be.reindex(false).is_ok());
3224            // Test what happens when we change name AND uuid.
3225            // This needs to be correct to handle conflicts when we add
3226            // replication support!
3227            let mut e1: Entry<EntryInit, EntryNew> = Entry::new();
3228            e1.add_ava(Attribute::Name, Value::new_iname("william"));
3229            e1.add_ava(
3230                Attribute::Uuid,
3231                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
3232            );
3233            let e1 = e1.into_sealed_new();
3234
3235            let rset = be.create(&CID_ZERO, vec![e1]).unwrap();
3236            let rset: Vec<_> = rset.into_iter().map(Arc::new).collect();
3237            // Now, alter the new entry.
3238            let mut ce1 = rset[0].as_ref().clone().into_invalid();
3239            ce1.purge_ava(Attribute::Name);
3240            ce1.purge_ava(Attribute::Uuid);
3241            ce1.add_ava(Attribute::Name, Value::new_iname("claire"));
3242            ce1.add_ava(
3243                Attribute::Uuid,
3244                Value::from("04091a7a-6ce4-42d2-abf5-c2ce244ac9e8"),
3245            );
3246            let ce1 = ce1.into_sealed_committed();
3247
3248            be.modify(&CID_ZERO, &rset, &[ce1]).unwrap();
3249
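            // The new name and uuid must now be indexed, and every index reference to
            // the old values must be removed.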
3250            idl_state!(
3251                be,
3252                Attribute::Name.as_ref(),
3253                IndexType::Equality,
3254                "claire",
3255                Some(vec![1])
3256            );
3257
3258            idl_state!(
3259                be,
3260                Attribute::Uuid.as_ref(),
3261                IndexType::Equality,
3262                "04091a7a-6ce4-42d2-abf5-c2ce244ac9e8",
3263                Some(vec![1])
3264            );
3265
3266            idl_state!(
3267                be,
3268                Attribute::Name.as_ref(),
3269                IndexType::Presence,
3270                "_",
3271                Some(vec![1])
3272            );
3273            idl_state!(
3274                be,
3275                Attribute::Uuid.as_ref(),
3276                IndexType::Presence,
3277                "_",
3278                Some(vec![1])
3279            );
3280
3281            idl_state!(
3282                be,
3283                Attribute::Uuid.as_ref(),
3284                IndexType::Equality,
3285                "db237e8a-0079-4b8c-8a56-593b22aa44d1",
3286                Some(Vec::with_capacity(0))
3287            );
3288            idl_state!(
3289                be,
3290                Attribute::Name.as_ref(),
3291                IndexType::Equality,
3292                "william",
3293                Some(Vec::with_capacity(0))
3294            );
3295
3296            let claire_uuid = uuid!("04091a7a-6ce4-42d2-abf5-c2ce244ac9e8");
3297            let william_uuid = uuid!("db237e8a-0079-4b8c-8a56-593b22aa44d1");
3298            assert_eq!(be.name2uuid("william"), Ok(None));
3299            assert_eq!(be.name2uuid("claire"), Ok(Some(claire_uuid)));
3300            assert_eq!(be.uuid2spn(william_uuid), Ok(None));
3301            assert_eq!(be.uuid2rdn(william_uuid), Ok(None));
3302            assert_eq!(
3303                be.uuid2spn(claire_uuid),
3304                Ok(Some(Value::new_iname("claire")))
3305            );
3306            assert_eq!(
3307                be.uuid2rdn(claire_uuid),
3308                Ok(Some("name=claire".to_string()))
3309            );
3310        })
3311    }
3312
3313    #[test]
3314    fn test_be_index_search_simple() {
3315        run_test!(|be: &mut BackendWriteTransaction| {
3316            assert!(be.reindex(false).is_ok());
3317
3318            // Create test entries with some indexed / unindexed values.
3319            let mut e1: Entry<EntryInit, EntryNew> = Entry::new();
3320            e1.add_ava(Attribute::Name, Value::new_iname("william"));
3321            e1.add_ava(
3322                Attribute::Uuid,
3323                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
3324            );
3325            e1.add_ava(Attribute::NoIndex, Value::from("william"));
3326            e1.add_ava(Attribute::OtherNoIndex, Value::from("william"));
3327            let e1 = e1.into_sealed_new();
3328
3329            let mut e2: Entry<EntryInit, EntryNew> = Entry::new();
3330            e2.add_ava(Attribute::Name, Value::new_iname("claire"));
3331            e2.add_ava(
3332                Attribute::Uuid,
3333                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d2"),
3334            );
3335            let e2 = e2.into_sealed_new();
3336
3337            let _rset = be.create(&CID_ZERO, vec![e1, e2]).unwrap();
3338            // Test fully unindexed
3339            let f_un =
3340                filter_resolved!(f_eq(Attribute::NoIndex, PartialValue::new_utf8s("william")));
3341
3342            let (r, _plan) = be.filter2idl(f_un.to_inner(), 0).unwrap();
3343            match r {
3344                IdList::AllIds => {}
3345                _ => {
3346                    panic!("");
3347                }
3348            }
3349
3350            // Test that a fully indexed search works
3351            let feq = filter_resolved!(f_eq(Attribute::Name, PartialValue::new_utf8s("william")));
3352
3353            let (r, _plan) = be.filter2idl(feq.to_inner(), 0).unwrap();
3354            match r {
3355                IdList::Indexed(idl) => {
3356                    assert_eq!(idl, IDLBitRange::from_iter(vec![1]));
3357                }
3358                _ => {
3359                    panic!("");
3360                }
3361            }
3362
3363            // Test and/or
3364            //   full index and
3365            let f_in_and = filter_resolved!(f_and!([
3366                f_eq(Attribute::Name, PartialValue::new_utf8s("william")),
3367                f_eq(
3368                    Attribute::Uuid,
3369                    PartialValue::new_utf8s("db237e8a-0079-4b8c-8a56-593b22aa44d1")
3370                )
3371            ]));
3372
3373            let (r, _plan) = be.filter2idl(f_in_and.to_inner(), 0).unwrap();
3374            match r {
3375                IdList::Indexed(idl) => {
3376                    assert_eq!(idl, IDLBitRange::from_iter(vec![1]));
3377                }
3378                _ => {
3379                    panic!("");
3380                }
3381            }
3382
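            // A Partial result means the indexed candidate set is a superset: the
            // loaded entries must still be filter-tested in memory.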
3383            //   partial index and
3384            let f_p1 = filter_resolved!(f_and!([
3385                f_eq(Attribute::Name, PartialValue::new_utf8s("william")),
3386                f_eq(Attribute::NoIndex, PartialValue::new_utf8s("william"))
3387            ]));
3388
3389            let f_p2 = filter_resolved!(f_and!([
3390                f_eq(Attribute::Name, PartialValue::new_utf8s("william")),
3391                f_eq(Attribute::NoIndex, PartialValue::new_utf8s("william"))
3392            ]));
3393
3394            let (r, _plan) = be.filter2idl(f_p1.to_inner(), 0).unwrap();
3395            match r {
3396                IdList::Partial(idl) => {
3397                    assert_eq!(idl, IDLBitRange::from_iter(vec![1]));
3398                }
3399                _ => unreachable!(),
3400            }
3401
3402            let (r, _plan) = be.filter2idl(f_p2.to_inner(), 0).unwrap();
3403            match r {
3404                IdList::Partial(idl) => {
3405                    assert_eq!(idl, IDLBitRange::from_iter(vec![1]));
3406                }
3407                _ => unreachable!(),
3408            }
3409
3410            // Substrings are always partial
3411            let f_p3 = filter_resolved!(f_sub(Attribute::Name, PartialValue::new_utf8s("wil")));
3412
3413            let (r, plan) = be.filter2idl(f_p3.to_inner(), 0).unwrap();
3414            trace!(?r, ?plan);
3415            match r {
3416                IdList::Partial(idl) => {
3417                    assert_eq!(idl, IDLBitRange::from_iter(vec![1]));
3418                }
3419                _ => unreachable!(),
3420            }
3421
3422            //   no index and
3423            let f_no_and = filter_resolved!(f_and!([
3424                f_eq(Attribute::NoIndex, PartialValue::new_utf8s("william")),
3425                f_eq(Attribute::OtherNoIndex, PartialValue::new_utf8s("william"))
3426            ]));
3427
3428            let (r, _plan) = be.filter2idl(f_no_and.to_inner(), 0).unwrap();
3429            match r {
3430                IdList::AllIds => {}
3431                _ => {
3432                    panic!("");
3433                }
3434            }
3435
3436            //   full index or
3437            let f_in_or = filter_resolved!(f_or!([f_eq(
3438                Attribute::Name,
3439                PartialValue::new_utf8s("william")
3440            )]));
3441
3442            let (r, _plan) = be.filter2idl(f_in_or.to_inner(), 0).unwrap();
3443            match r {
3444                IdList::Indexed(idl) => {
3445                    assert_eq!(idl, IDLBitRange::from_iter(vec![1]));
3446                }
3447                _ => {
3448                    panic!("");
3449                }
3450            }
3451            //   partial (aka allids) or
3452            let f_un_or = filter_resolved!(f_or!([f_eq(
3453                Attribute::NoIndex,
3454                PartialValue::new_utf8s("william")
3455            )]));
3456
3457            let (r, _plan) = be.filter2idl(f_un_or.to_inner(), 0).unwrap();
3458            match r {
3459                IdList::AllIds => {}
3460                _ => {
3461                    panic!("");
3462                }
3463            }
3464
3465            // Test root andnot
3466            let f_r_andnot = filter_resolved!(f_andnot(f_eq(
3467                Attribute::Name,
3468                PartialValue::new_utf8s("william")
3469            )));
3470
3471            let (r, _plan) = be.filter2idl(f_r_andnot.to_inner(), 0).unwrap();
3472            match r {
3473                IdList::Indexed(idl) => {
3474                    assert_eq!(idl, IDLBitRange::from_iter(Vec::with_capacity(0)));
3475                }
3476                _ => {
3477                    panic!("");
3478                }
3479            }
3480
3481            // test andnot as the only term in an and
3482            let f_and_andnot = filter_resolved!(f_and!([f_andnot(f_eq(
3483                Attribute::Name,
3484                PartialValue::new_utf8s("william")
3485            ))]));
3486
3487            let (r, _plan) = be.filter2idl(f_and_andnot.to_inner(), 0).unwrap();
3488            match r {
3489                IdList::Indexed(idl) => {
3490                    assert_eq!(idl, IDLBitRange::from_iter(Vec::with_capacity(0)));
3491                }
3492                _ => {
3493                    panic!("");
3494                }
3495            }
3496            // test andnot as the only term in an or
3497            let f_or_andnot = filter_resolved!(f_or!([f_andnot(f_eq(
3498                Attribute::Name,
3499                PartialValue::new_utf8s("william")
3500            ))]));
3501
3502            let (r, _plan) = be.filter2idl(f_or_andnot.to_inner(), 0).unwrap();
3503            match r {
3504                IdList::Indexed(idl) => {
3505                    assert_eq!(idl, IDLBitRange::from_iter(Vec::with_capacity(0)));
3506                }
3507                _ => {
3508                    panic!("");
3509                }
3510            }
3511
3512            // test andnot in and (first) with name
3513            let f_and_andnot = filter_resolved!(f_and!([
3514                f_andnot(f_eq(Attribute::Name, PartialValue::new_utf8s("claire"))),
3515                f_pres(Attribute::Name)
3516            ]));
3517
3518            let (r, _plan) = be.filter2idl(f_and_andnot.to_inner(), 0).unwrap();
3519            match r {
3520                IdList::Indexed(idl) => {
3521                    debug!("{:?}", idl);
3522                    assert_eq!(idl, IDLBitRange::from_iter(vec![1]));
3523                }
3524                _ => {
3525                    panic!("");
3526                }
3527            }
3528            // test andnot in and (last) with name
3529            let f_and_andnot = filter_resolved!(f_and!([
3530                f_pres(Attribute::Name),
3531                f_andnot(f_eq(Attribute::Name, PartialValue::new_utf8s("claire")))
3532            ]));
3533
3534            let (r, _plan) = be.filter2idl(f_and_andnot.to_inner(), 0).unwrap();
3535            match r {
3536                IdList::Indexed(idl) => {
3537                    assert_eq!(idl, IDLBitRange::from_iter(vec![1]));
3538                }
3539                _ => {
3540                    panic!("");
3541                }
3542            }
3543            // test andnot in and (first) with no-index
3544            let f_and_andnot = filter_resolved!(f_and!([
3545                f_andnot(f_eq(Attribute::Name, PartialValue::new_utf8s("claire"))),
3546                f_pres(Attribute::NoIndex)
3547            ]));
3548
3549            let (r, _plan) = be.filter2idl(f_and_andnot.to_inner(), 0).unwrap();
3550            match r {
3551                IdList::AllIds => {}
3552                _ => {
3553                    panic!("");
3554                }
3555            }
3556            // test andnot in and (last) with no-index
3557            let f_and_andnot = filter_resolved!(f_and!([
3558                f_pres(Attribute::NoIndex),
3559                f_andnot(f_eq(Attribute::Name, PartialValue::new_utf8s("claire")))
3560            ]));
3561
3562            let (r, _plan) = be.filter2idl(f_and_andnot.to_inner(), 0).unwrap();
3563            match r {
3564                IdList::AllIds => {}
3565                _ => {
3566                    panic!("");
3567                }
3568            }
3569
3570            //   empty or
3571            let f_e_or = filter_resolved!(f_or!([]));
3572
3573            let (r, _plan) = be.filter2idl(f_e_or.to_inner(), 0).unwrap();
3574            match r {
3575                IdList::Indexed(idl) => {
3576                    assert_eq!(idl, IDLBitRange::from_iter(vec![]));
3577                }
3578                _ => {
3579                    panic!("");
3580                }
3581            }
3582
3583            let f_e_and = filter_resolved!(f_and!([]));
3584
3585            let (r, _plan) = be.filter2idl(f_e_and.to_inner(), 0).unwrap();
3586            match r {
3587                IdList::Indexed(idl) => {
3588                    assert_eq!(idl, IDLBitRange::from_iter(vec![]));
3589                }
3590                _ => {
3591                    panic!("");
3592                }
3593            }
3594        })
3595    }
3596
3597    #[test]
3598    fn test_be_index_search_missing() {
3599        run_test!(|be: &mut BackendWriteTransaction| {
3600            // Test where the index is in schema but not created (purge idxs).
3601            // This should fall back to ALLIDS because the index can't satisfy the term.
3602            be.danger_purge_idxs().unwrap();
3603            debug!("{:?}", be.missing_idxs().unwrap());
3604            let f_eq = filter_resolved!(f_eq(Attribute::Name, PartialValue::new_utf8s("william")));
3605
3606            let (r, _plan) = be.filter2idl(f_eq.to_inner(), 0).unwrap();
3607            match r {
3608                IdList::AllIds => {}
3609                _ => {
3610                    panic!("");
3611                }
3612            }
3613        })
3614    }
3615
3616    #[test]
3617    fn test_be_index_slope_generation() {
3618        run_test!(|be: &mut BackendWriteTransaction| {
3619            // Create some test entries with indexed / unindexed values.
3620            let mut e1: Entry<EntryInit, EntryNew> = Entry::new();
3621            e1.add_ava(Attribute::Name, Value::new_iname("william"));
3622            e1.add_ava(
3623                Attribute::Uuid,
3624                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
3625            );
3626            e1.add_ava(Attribute::TestAttr, Value::from("dupe"));
3627            e1.add_ava(Attribute::TestNumber, Value::from("1"));
3628            let e1 = e1.into_sealed_new();
3629
3630            let mut e2: Entry<EntryInit, EntryNew> = Entry::new();
3631            e2.add_ava(Attribute::Name, Value::new_iname("claire"));
3632            e2.add_ava(
3633                Attribute::Uuid,
3634                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d2"),
3635            );
3636            e2.add_ava(Attribute::TestAttr, Value::from("dupe"));
3637            e2.add_ava(Attribute::TestNumber, Value::from("1"));
3638            let e2 = e2.into_sealed_new();
3639
3640            let mut e3: Entry<EntryInit, EntryNew> = Entry::new();
3641            e3.add_ava(Attribute::Name, Value::new_iname("benny"));
3642            e3.add_ava(
3643                Attribute::Uuid,
3644                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d3"),
3645            );
3646            e3.add_ava(Attribute::TestAttr, Value::from("dupe"));
3647            e3.add_ava(Attribute::TestNumber, Value::from("2"));
3648            let e3 = e3.into_sealed_new();
3649
3650            let _rset = be.create(&CID_ZERO, vec![e1, e2, e3]).unwrap();
3651
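            // An index "slope" estimates how selective an index is: lower values mean
            // fewer entries per key, so those terms are preferred when resolving filters.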
3652            // If the slopes haven't been generated yet, there are some hardcoded values
3653            // that we can use instead. They aren't generated until a first re-index.
3654            assert!(!be.is_idx_slopeyness_generated().unwrap());
3655
3656            let ta_eq_slope = be
3657                .get_idx_slope(&IdxKey::new(Attribute::TestAttr, IndexType::Equality))
3658                .unwrap();
3659            assert_eq!(ta_eq_slope, 45);
3660
3661            let tb_eq_slope = be
3662                .get_idx_slope(&IdxKey::new(Attribute::TestNumber, IndexType::Equality))
3663                .unwrap();
3664            assert_eq!(tb_eq_slope, 45);
3665
3666            let name_eq_slope = be
3667                .get_idx_slope(&IdxKey::new(Attribute::Name, IndexType::Equality))
3668                .unwrap();
3669            assert_eq!(name_eq_slope, 1);
3670            let uuid_eq_slope = be
3671                .get_idx_slope(&IdxKey::new(Attribute::Uuid, IndexType::Equality))
3672                .unwrap();
3673            assert_eq!(uuid_eq_slope, 1);
3674
3675            let name_pres_slope = be
3676                .get_idx_slope(&IdxKey::new(Attribute::Name, IndexType::Presence))
3677                .unwrap();
3678            assert_eq!(name_pres_slope, 90);
3679            let uuid_pres_slope = be
3680                .get_idx_slope(&IdxKey::new(Attribute::Uuid, IndexType::Presence))
3681                .unwrap();
3682            assert_eq!(uuid_pres_slope, 90);
3683            // Check the slopes are what we expect for hardcoded values.
3684
3685            // Now check slope generation for the values. Today these are calculated
3686            // at reindex time, so we now perform the re-index.
3687            assert!(be.reindex(false).is_ok());
3688            assert!(be.is_idx_slopeyness_generated().unwrap());
3689
3690            let ta_eq_slope = be
3691                .get_idx_slope(&IdxKey::new(Attribute::TestAttr, IndexType::Equality))
3692                .unwrap();
3693            assert_eq!(ta_eq_slope, 200);
3694
3695            let tb_eq_slope = be
3696                .get_idx_slope(&IdxKey::new(Attribute::TestNumber, IndexType::Equality))
3697                .unwrap();
3698            assert_eq!(tb_eq_slope, 133);
3699
3700            let name_eq_slope = be
3701                .get_idx_slope(&IdxKey::new(Attribute::Name, IndexType::Equality))
3702                .unwrap();
3703            assert_eq!(name_eq_slope, 51);
3704            let uuid_eq_slope = be
3705                .get_idx_slope(&IdxKey::new(Attribute::Uuid, IndexType::Equality))
3706                .unwrap();
3707            assert_eq!(uuid_eq_slope, 51);
3708
3709            let name_pres_slope = be
3710                .get_idx_slope(&IdxKey::new(Attribute::Name, IndexType::Presence))
3711                .unwrap();
3712            assert_eq!(name_pres_slope, 200);
3713            let uuid_pres_slope = be
3714                .get_idx_slope(&IdxKey::new(Attribute::Uuid, IndexType::Presence))
3715                .unwrap();
3716            assert_eq!(uuid_pres_slope, 200);
3717        })
3718    }
3719
3720    #[test]
3721    fn test_be_limits_allids() {
3722        run_test!(|be: &mut BackendWriteTransaction| {
3723            let mut lim_allow_allids = Limits::unlimited();
3724            lim_allow_allids.unindexed_allow = true;
3725
3726            let mut lim_deny_allids = Limits::unlimited();
3727            lim_deny_allids.unindexed_allow = false;
3728
3729            let mut e: Entry<EntryInit, EntryNew> = Entry::new();
3730            e.add_ava(Attribute::UserId, Value::from("william"));
3731            e.add_ava(
3732                Attribute::Uuid,
3733                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
3734            );
3735            e.add_ava(Attribute::NonExist, Value::from("x"));
3736            let e = e.into_sealed_new();
3737            let single_result = be.create(&CID_ZERO, vec![e.clone()]);
3738
3739            assert!(single_result.is_ok());
3740            let filt = e
3741                .filter_from_attrs(&[Attribute::NonExist])
3742                .expect("failed to generate filter")
3743                .into_valid_resolved();
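            // The filter targets the unindexed NonExist attribute, so these searches
            // take the ALLIDS path, which is only permitted when unindexed_allow is set.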
3744            // check allow on allids
3745            let res = be.search(&lim_allow_allids, &filt);
3746            assert!(res.is_ok());
3747            let res = be.exists(&lim_allow_allids, &filt);
3748            assert!(res.is_ok());
3749
3750            // check deny on allids
3751            let res = be.search(&lim_deny_allids, &filt);
3752            assert_eq!(res, Err(OperationError::ResourceLimit));
3753            let res = be.exists(&lim_deny_allids, &filt);
3754            assert_eq!(res, Err(OperationError::ResourceLimit));
3755        })
3756    }
3757
3758    #[test]
3759    fn test_be_limits_results_max() {
3760        run_test!(|be: &mut BackendWriteTransaction| {
3761            let mut lim_allow = Limits::unlimited();
3762            lim_allow.search_max_results = usize::MAX;
3763
3764            let mut lim_deny = Limits::unlimited();
3765            lim_deny.search_max_results = 0;
3766
3767            let mut e: Entry<EntryInit, EntryNew> = Entry::new();
3768            e.add_ava(Attribute::UserId, Value::from("william"));
3769            e.add_ava(
3770                Attribute::Uuid,
3771                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
3772            );
3773            e.add_ava(Attribute::NonExist, Value::from("x"));
3774            let e = e.into_sealed_new();
3775            let single_result = be.create(&CID_ZERO, vec![e.clone()]);
3776            assert!(single_result.is_ok());
3777
3778            let filt = e
3779                .filter_from_attrs(&[Attribute::NonExist])
3780                .expect("failed to generate filter")
3781                .into_valid_resolved();
3782
3783            // --> This is the all ids path (unindexed)
3784            // check allow on entry max
3785            let res = be.search(&lim_allow, &filt);
3786            assert!(res.is_ok());
3787            let res = be.exists(&lim_allow, &filt);
3788            assert!(res.is_ok());
3789
3790            // check deny on entry max
3791            let res = be.search(&lim_deny, &filt);
3792            assert_eq!(res, Err(OperationError::ResourceLimit));
3793            // we don't limit on exists because we never load the entries.
3794            let res = be.exists(&lim_deny, &filt);
3795            assert!(res.is_ok());
3796
3797            // --> This will shortcut due to indexing.
3798            assert!(be.reindex(false).is_ok());
3799            let res = be.search(&lim_deny, &filt);
3800            assert_eq!(res, Err(OperationError::ResourceLimit));
3801            // we don't limit on exists because we never load the entries.
3802            let res = be.exists(&lim_deny, &filt);
3803            assert!(res.is_ok());
3804        })
3805    }
3806
3807    #[test]
3808    fn test_be_limits_partial_filter() {
3809        run_test!(|be: &mut BackendWriteTransaction| {
3810            // This relies on how we do partials, so it could be a bit sensitive.
3811            // A partial is generated when an allids term and an indexed term appear
3812            // in a single and, since both conditions are required. Allids comes from
3813            // unindexed terms. We also need to ensure we don't hit the partial threshold.
3814            //
3815            // This means we need an and query where the first term is allids
3816            // and the second is indexed, but without the filter shortcutting.
3817            //
3818            // To achieve this we need a monstrously evil query.
3819            //
3820            let mut lim_allow = Limits::unlimited();
3821            lim_allow.search_max_filter_test = usize::MAX;
3822
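            // With the filter-test limit forced to 0, any query that needs in-memory
            // filter testing of a partial candidate set must be denied.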
3823            let mut lim_deny = Limits::unlimited();
3824            lim_deny.search_max_filter_test = 0;
3825
3826            let mut e: Entry<EntryInit, EntryNew> = Entry::new();
3827            e.add_ava(Attribute::Name, Value::new_iname("william"));
3828            e.add_ava(
3829                Attribute::Uuid,
3830                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
3831            );
3832            e.add_ava(Attribute::NonExist, Value::from("x"));
3833            e.add_ava(Attribute::NonExist, Value::from("y"));
3834            let e = e.into_sealed_new();
3835            let single_result = be.create(&CID_ZERO, vec![e]);
3836            assert!(single_result.is_ok());
3837
3838            // Reindex so we have things in place for our query
3839            assert!(be.reindex(false).is_ok());
3840
3841            // 🚨 This is evil!
3842            // The and allows us to hit "allids + indexed -> partial".
3843            // The or terms prevent re-arrangement, and they can't be folded or
3844            // dead-term eliminated either.
3845            //
3846            // This means the f_or over nonexist becomes allids, while the second or is
3847            // indexed because both branches are f_eq on name, resolving to william.
3848            //
3849            // This creates a partial, and because it's the first iteration in the loop, it
3850            // doesn't encounter partial threshold testing.
3851            let filt = filter_resolved!(f_and!([
3852                f_or!([
3853                    f_eq(Attribute::NonExist, PartialValue::new_utf8s("x")),
3854                    f_eq(Attribute::NonExist, PartialValue::new_utf8s("y"))
3855                ]),
3856                f_or!([
3857                    f_eq(Attribute::Name, PartialValue::new_utf8s("claire")),
3858                    f_eq(Attribute::Name, PartialValue::new_utf8s("william"))
3859                ]),
3860            ]));
3861
3862            let res = be.search(&lim_allow, &filt);
3863            assert!(res.is_ok());
3864            let res = be.exists(&lim_allow, &filt);
3865            assert!(res.is_ok());
3866
3867            // check deny on the filter test limit
3868            let res = be.search(&lim_deny, &filt);
3869            assert_eq!(res, Err(OperationError::ResourceLimit));
3870            // we don't limit on exists because we never load the entries.
3871            let res = be.exists(&lim_deny, &filt);
3872            assert_eq!(res, Err(OperationError::ResourceLimit));
3873        })
3874    }
3875
3876    #[test]
3877    fn test_be_multiple_create() {
3878        sketching::test_init();
3879
3880        // This is a demo idxmeta, purely for testing.
3881        let idxmeta = vec![IdxKey {
3882            attr: Attribute::Uuid,
3883            itype: IndexType::Equality,
3884        }];
3885
3886        let be_a = Backend::new(BackendConfig::new_test("main"), idxmeta.clone(), false)
3887            .expect("Failed to setup backend");
3888
3889        let be_b = Backend::new(BackendConfig::new_test("db_2"), idxmeta, false)
3890            .expect("Failed to setup backend");
3891
3892        let mut be_a_txn = be_a.write().unwrap();
3893        let mut be_b_txn = be_b.write().unwrap();
3894
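        // Each backend instance generates its own server UUID, so the two stores
        // must be distinct.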
3895        assert!(be_a_txn.get_db_s_uuid() != be_b_txn.get_db_s_uuid());
3896
3897        // Create into A
3898        let mut e: Entry<EntryInit, EntryNew> = Entry::new();
3899        e.add_ava(Attribute::UserId, Value::from("william"));
3900        e.add_ava(
3901            Attribute::Uuid,
3902            Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
3903        );
3904        let e = e.into_sealed_new();
3905
3906        let single_result = be_a_txn.create(&CID_ZERO, vec![e]);
3907
3908        assert!(single_result.is_ok());
3909
3910        // Assert it's in A but not B.
3911        let filt = filter_resolved!(f_eq(Attribute::UserId, PartialValue::new_utf8s("william")));
3912
3913        let lims = Limits::unlimited();
3914
3915        let r = be_a_txn.search(&lims, &filt);
3916        assert!(r.expect("Search failed!").len() == 1);
3917
3918        let r = be_b_txn.search(&lims, &filt);
3919        assert!(r.expect("Search failed!").is_empty());
3920
3921        // Create into B
3922        let mut e: Entry<EntryInit, EntryNew> = Entry::new();
3923        e.add_ava(Attribute::UserId, Value::from("claire"));
3924        e.add_ava(
3925            Attribute::Uuid,
3926            Value::from("0c680959-0944-47d6-9dea-53304d124266"),
3927        );
3928        let e = e.into_sealed_new();
3929
3930        let single_result = be_b_txn.create(&CID_ZERO, vec![e]);
3931
3932        assert!(single_result.is_ok());
3933
3934        // Assert it's in B but not A
3935        let filt = filter_resolved!(f_eq(Attribute::UserId, PartialValue::new_utf8s("claire")));
3936
3937        let lims = Limits::unlimited();
3938
3939        let r = be_a_txn.search(&lims, &filt);
3940        assert!(r.expect("Search failed!").is_empty());
3941
3942        let r = be_b_txn.search(&lims, &filt);
3943        assert!(r.expect("Search failed!").len() == 1);
3944    }
3945}