kanidmd_lib/be/
mod.rs

1//! The backend. This contains the "low level" storage and query code, which is
2//! implemented as a json-like kv document database. This has no rules about content
3//! of the server, which are all enforced at higher levels. The role of the backend
4//! is to persist content safely to disk, load that content, and execute queries
5//! utilising indexes in the most effective way possible.
6
7use crate::be::dbentry::{DbBackup, DbEntry};
8use crate::be::dbrepl::DbReplMeta;
9use crate::entry::Entry;
10use crate::filter::{Filter, FilterPlan, FilterResolved, FilterValidResolved};
11use crate::prelude::*;
12use crate::repl::cid::Cid;
13use crate::repl::proto::ReplCidRange;
14use crate::repl::ruv::{
15    ReplicationUpdateVector, ReplicationUpdateVectorReadTransaction,
16    ReplicationUpdateVectorTransaction, ReplicationUpdateVectorWriteTransaction,
17};
18use crate::utils::trigraph_iter;
19use crate::value::{IndexType, Value};
20use concread::cowcell::*;
21use hashbrown::{HashMap as Map, HashSet};
22use idlset::v2::IDLBitRange;
23use idlset::AndNot;
24use kanidm_proto::backup::BackupCompression;
25use kanidm_proto::internal::{ConsistencyError, OperationError};
26use std::collections::BTreeMap;
27use std::io::prelude::*;
28use std::ops::DerefMut;
29use std::path::{Path, PathBuf};
30use std::sync::Arc;
31use std::time::Duration;
32use tracing::{trace, trace_span};
33use uuid::Uuid;
34
35use flate2::write::GzEncoder;
36use flate2::Compression;
37
38pub(crate) mod dbentry;
39pub(crate) mod dbrepl;
40pub(crate) mod dbvalue;
41
42mod idl_arc_sqlite;
43mod idl_sqlite;
44pub(crate) mod idxkey;
45pub(crate) mod keystorage;
46
47pub(crate) use self::idxkey::{IdxKey, IdxKeyRef, IdxKeyToRef, IdxSlope};
48use crate::be::idl_arc_sqlite::{
49    IdlArcSqlite, IdlArcSqliteReadTransaction, IdlArcSqliteTransaction,
50    IdlArcSqliteWriteTransaction,
51};
52use kanidm_proto::internal::FsType;
53
// Currently disabled due to improvements in idlset for intersection handling.
// A threshold of 0 means a candidate set is never considered "small enough"
// to stop consulting indexes and fall back to in-memory filter testing.
const FILTER_SEARCH_TEST_THRESHOLD: usize = 0;
// NOTE(review): not referenced in this portion of the file - presumably used
// by the exists path further down; confirm before altering.
const FILTER_EXISTS_TEST_THRESHOLD: usize = 0;
// During substring (trigraph) index resolution, once the working candidate set
// shrinks to/below this size we stop intersecting further trigraph idls.
const FILTER_SUBSTR_TEST_THRESHOLD: usize = 4;
58
#[derive(Debug, Clone)]
/// Limits on the resources a single event can consume. These are defined per-event
/// as they are derived from the userAuthToken based on that individual session
pub struct Limits {
    /// Whether fully unindexed (AllIds) searches are permitted for this event.
    pub unindexed_allow: bool,
    /// Cap on the number of entries a search may return.
    pub search_max_results: usize,
    /// Cap on the number of partially-indexed candidates that may be
    /// filter-tested in memory before the search is rejected.
    pub search_max_filter_test: usize,
    /// Cap on the number of elements permitted within a filter.
    pub filter_max_elements: usize,
}
68
69impl Default for Limits {
70    fn default() -> Self {
71        Limits {
72            unindexed_allow: false,
73            search_max_results: DEFAULT_LIMIT_SEARCH_MAX_RESULTS as usize,
74            search_max_filter_test: DEFAULT_LIMIT_SEARCH_MAX_FILTER_TEST as usize,
75            filter_max_elements: DEFAULT_LIMIT_FILTER_MAX_ELEMENTS as usize,
76        }
77    }
78}
79
80impl Limits {
81    pub fn unlimited() -> Self {
82        Limits {
83            unindexed_allow: true,
84            search_max_results: usize::MAX >> 1,
85            search_max_filter_test: usize::MAX >> 1,
86            filter_max_elements: usize::MAX,
87        }
88    }
89
90    pub fn api_token() -> Self {
91        Limits {
92            unindexed_allow: false,
93            search_max_results: DEFAULT_LIMIT_API_SEARCH_MAX_RESULTS as usize,
94            search_max_filter_test: DEFAULT_LIMIT_API_SEARCH_MAX_FILTER_TEST as usize,
95            filter_max_elements: DEFAULT_LIMIT_FILTER_MAX_ELEMENTS as usize,
96        }
97    }
98}
99
/// The result of a key value request containing the list of entry IDs that
/// match the filter/query condition.
#[derive(Debug, Clone)]
pub enum IdList {
    /// The value is not indexed, and must be assumed that all entries may match.
    /// Resolving this requires a full-table load and filter test.
    AllIds,
    /// The index is "fuzzy" like a bloom filter (perhaps superset is a better description) -
    /// it contains all elements that do match, but may have extra elements that don't.
    /// This requires the caller to perform a filter test to assert that all
    /// returned entries match all assertions within the filter.
    Partial(IDLBitRange),
    /// The set was indexed and is below the filter test threshold. This is because it's
    /// now faster to test with the filter than to continue to access indexes at this point.
    /// Like a partial set, this is a super set of the entries that match the query.
    PartialThreshold(IDLBitRange),
    /// The value is indexed and accurately represents the set of entries that precisely match.
    Indexed(IDLBitRange),
}
118
#[derive(Debug)]
/// A raw entry as retrieved from the id layer: the entry id paired with its
/// serialised (JSON) bytes, prior to deserialisation into an `Entry`.
pub struct IdRawEntry {
    // The entry's id within the backend.
    id: u64,
    // The serialised JSON bytes of the entry.
    data: Vec<u8>,
}
124
#[derive(Debug, Clone)]
/// Metadata describing which indexes exist in the backend: a map of index
/// key to its `IdxSlope`.
pub struct IdxMeta {
    pub idxkeys: Map<IdxKey, IdxSlope>,
}
129
130impl IdxMeta {
131    pub fn new(idxkeys: Map<IdxKey, IdxSlope>) -> Self {
132        IdxMeta { idxkeys }
133    }
134}
135
#[derive(Clone)]
/// Configuration of the backend storage layer.
pub struct BackendConfig {
    /// Filesystem location of the database. An empty path selects an sqlite
    /// in-memory/ram only database.
    path: PathBuf,
    /// Size of the sqlite connection pool.
    pool_size: u32,
    /// Name of the database - "main" outside of tests.
    db_name: &'static str,
    /// Filesystem type the database resides on (from kanidm_proto).
    fstype: FsType,
    // Cachesizes?
    /// Optional override of the cache size - presumably the idl-layer ARC;
    /// tests pin this to 2048.
    arcsize: Option<usize>,
}
145
146impl BackendConfig {
147    pub fn new(
148        path: Option<&Path>,
149        pool_size: u32,
150        fstype: FsType,
151        arcsize: Option<usize>,
152    ) -> Self {
153        BackendConfig {
154            pool_size,
155            // This means if path is None, that "" implies an sqlite in memory/ram only database.
156            path: path.unwrap_or_else(|| Path::new("")).to_path_buf(),
157            db_name: "main",
158            fstype,
159            arcsize,
160        }
161    }
162
163    pub(crate) fn new_test(db_name: &'static str) -> Self {
164        BackendConfig {
165            pool_size: 1,
166            path: PathBuf::from(""),
167            db_name,
168            fstype: FsType::Generic,
169            arcsize: Some(2048),
170        }
171    }
172}
173
#[derive(Clone)]
/// The backend handle. Cloning is cheap - the storage, index metadata and
/// replication state are all shared behind `Arc`s.
pub struct Backend {
    /// This is the actual datastorage layer.
    idlayer: Arc<IdlArcSqlite>,
    /// This is a copy-on-write cache of the index metadata that has been
    /// extracted from attributes set, in the correct format for the backend
    /// to consume. We use it to extract indexes from entries during write paths
    /// and to allow the front end to know what indexes exist during a read.
    idxmeta: Arc<CowCell<IdxMeta>>,
    /// The current state of the replication update vector. This is effectively a
    /// time series index of the full list of all changelog entries and what entries
    /// that are part of that change.
    ruv: Arc<ReplicationUpdateVector>,
    /// The configuration this backend was created with.
    cfg: BackendConfig,
}
189
/// A read transaction over the backend, holding read guards over the id
/// layer, the index metadata and the replication update vector.
pub struct BackendReadTransaction<'a> {
    idlayer: IdlArcSqliteReadTransaction<'a>,
    idxmeta: CowCellReadTxn<IdxMeta>,
    ruv: ReplicationUpdateVectorReadTransaction<'a>,
}
195
// SAFETY: NOTE(review) - these impls assert the read transaction may be
// shared and sent across threads even though its members do not derive
// Send/Sync. This relies on the underlying sqlite read transaction and the
// cow-cell read guard being safe to access from other threads - confirm
// against the idl_arc_sqlite implementation before changing those types.
unsafe impl Sync for BackendReadTransaction<'_> {}

unsafe impl Send for BackendReadTransaction<'_> {}
199
/// A write transaction over the backend, holding exclusive write transactions
/// over the id layer, the index metadata and the replication update vector.
pub struct BackendWriteTransaction<'a> {
    idlayer: IdlArcSqliteWriteTransaction<'a>,
    idxmeta_wr: CowCellWriteTxn<'a, IdxMeta>,
    ruv: ReplicationUpdateVectorWriteTransaction<'a>,
}
205
206impl IdRawEntry {
207    fn into_dbentry(self) -> Result<(u64, DbEntry), OperationError> {
208        serde_json::from_slice(self.data.as_slice())
209            .map_err(|e| {
210                admin_error!(?e, "Serde JSON Error");
211                OperationError::SerdeJsonError
212            })
213            .map(|dbe| (self.id, dbe))
214    }
215
216    fn into_entry(self) -> Result<EntrySealedCommitted, OperationError> {
217        let db_e = serde_json::from_slice(self.data.as_slice()).map_err(|e| {
218            admin_error!(?e, id = %self.id, "Serde JSON Error");
219            let raw_str = String::from_utf8_lossy(self.data.as_slice());
220            debug!(raw = %raw_str);
221            OperationError::SerdeJsonError
222        })?;
223        // let id = u64::try_from(self.id).map_err(|_| OperationError::InvalidEntryId)?;
224        Entry::from_dbentry(db_e, self.id).ok_or(OperationError::CorruptedEntry(self.id))
225    }
226}
227
/// Operations common to both read and write transactions over the backend.
/// The core responsibility is resolving filters into candidate entry id sets
/// using the available indexes.
pub trait BackendTransaction {
    /// The concrete id-layer (storage) transaction type.
    type IdlLayerType: IdlArcSqliteTransaction;
    /// Access the id-layer transaction for index and entry lookups.
    fn get_idlayer(&mut self) -> &mut Self::IdlLayerType;

    /// The concrete replication update vector transaction type.
    type RuvType: ReplicationUpdateVectorTransaction;
    /// Access the replication update vector transaction.
    fn get_ruv(&mut self) -> &mut Self::RuvType;

    /// Access the index metadata snapshot associated with this transaction.
    fn get_idxmeta_ref(&self) -> &IdxMeta;
236
    /// Recursively apply a filter, transforming into IdList's on the way. This builds a query
    /// execution log, so that it can be examined how an operation proceeded.
    ///
    /// `thres` is the candidate-set size at which it becomes cheaper to stop
    /// consulting indexes and instead filter-test the partial candidate set in
    /// memory - when a set falls below it (and terms remain), a
    /// `PartialThreshold` set is returned early.
    ///
    /// Returns the candidate id set together with the `FilterPlan` recording
    /// how each term was (or was not) resolved by an index.
    #[allow(clippy::cognitive_complexity)]
    fn filter2idl(
        &mut self,
        filt: &FilterResolved,
        thres: usize,
    ) -> Result<(IdList, FilterPlan), OperationError> {
        Ok(match filt {
            FilterResolved::Eq(attr, value, idx) => {
                if idx.is_some() {
                    // Get the idx_key
                    let idx_key = value.get_idx_eq_key();
                    // Get the idl for this
                    match self
                        .get_idlayer()
                        .get_idl(attr, IndexType::Equality, &idx_key)?
                    {
                        Some(idl) => (
                            IdList::Indexed(idl),
                            FilterPlan::EqIndexed(attr.clone(), idx_key),
                        ),
                        // A None here means the index that schema says exists is
                        // missing from storage - treat it as corrupt/unindexed.
                        None => (IdList::AllIds, FilterPlan::EqCorrupt(attr.clone())),
                    }
                } else {
                    // Schema believes this is not indexed
                    (IdList::AllIds, FilterPlan::EqUnindexed(attr.clone()))
                }
            }
            FilterResolved::Stw(attr, subvalue, idx)
            | FilterResolved::Enw(attr, subvalue, idx)
            | FilterResolved::Cnt(attr, subvalue, idx) => {
                // Get the idx_key. Not all types support this, so may return "none".
                trace!(?idx, ?subvalue, ?attr);
                if let (true, Some(idx_key)) = (idx.is_some(), subvalue.get_idx_sub_key()) {
                    // Substring resolution uses the trigraph index - delegate.
                    self.filter2idl_sub(attr, idx_key)?
                } else {
                    // Schema believes this is not indexed
                    (IdList::AllIds, FilterPlan::SubUnindexed(attr.clone()))
                }
            }
            FilterResolved::Pres(attr, idx) => {
                if idx.is_some() {
                    // Get the idl for this
                    match self.get_idlayer().get_idl(attr, IndexType::Presence, "_")? {
                        Some(idl) => (IdList::Indexed(idl), FilterPlan::PresIndexed(attr.clone())),
                        None => (IdList::AllIds, FilterPlan::PresCorrupt(attr.clone())),
                    }
                } else {
                    // Schema believes this is not indexed
                    (IdList::AllIds, FilterPlan::PresUnindexed(attr.clone()))
                }
            }
            FilterResolved::LessThan(attr, _subvalue, idx) => {
                if idx.is_some() {
                    // TODO: Temporary but we get the PRESENCE index for Ordering operations to
                    // reduce the amount of entries we need to filter in memory. In future we need
                    // a true ordering index, but that's a large block of work on it's own. For now
                    // this already helps a lot for in memory processing.
                    match self.get_idlayer().get_idl(attr, IndexType::Presence, "_")? {
                        // Partial, not Indexed: the presence set is a superset
                        // of the entries that satisfy the ordering condition.
                        Some(idl) => (
                            IdList::Partial(idl),
                            FilterPlan::LessThanIndexed(attr.clone()),
                        ),
                        None => (IdList::AllIds, FilterPlan::LessThanCorrupt(attr.clone())),
                    }
                } else {
                    (IdList::AllIds, FilterPlan::LessThanUnindexed(attr.clone()))
                }
            }
            FilterResolved::Or(l, _) => {
                // Importantly if this has no inner elements, this returns
                // an empty list.
                let mut plan = Vec::with_capacity(0);
                let mut result = IDLBitRange::new();
                // Track the weakest accuracy seen across the union: any partial
                // term makes the union partial; any threshold term marks it so.
                let mut partial = false;
                let mut threshold = false;
                // For each filter in l
                for f in l.iter() {
                    // get their idls
                    match self.filter2idl(f, thres)? {
                        (IdList::Indexed(idl), fp) => {
                            plan.push(fp);
                            // now union them (if possible)
                            result = result | idl;
                        }
                        (IdList::Partial(idl), fp) => {
                            plan.push(fp);
                            // now union them (if possible)
                            result = result | idl;
                            partial = true;
                        }
                        (IdList::PartialThreshold(idl), fp) => {
                            plan.push(fp);
                            // now union them (if possible)
                            result = result | idl;
                            partial = true;
                            threshold = true;
                        }
                        (IdList::AllIds, fp) => {
                            plan.push(fp);
                            // If we find anything unindexed, the whole term is unindexed.
                            filter_trace!("Term {:?} is AllIds, shortcut return", f);
                            let setplan = FilterPlan::OrUnindexed(plan);
                            return Ok((IdList::AllIds, setplan));
                        }
                    }
                } // end or.iter()
                  // If we got here, every term must have been indexed or partial indexed.
                if partial {
                    if threshold {
                        let setplan = FilterPlan::OrPartialThreshold(plan);
                        (IdList::PartialThreshold(result), setplan)
                    } else {
                        let setplan = FilterPlan::OrPartial(plan);
                        (IdList::Partial(result), setplan)
                    }
                } else {
                    let setplan = FilterPlan::OrIndexed(plan);
                    (IdList::Indexed(result), setplan)
                }
            }
            FilterResolved::And(l, _) => {
                // This algorithm is a little annoying. I couldn't get it to work with iter and
                // folds due to the logic needed ...

                // First, setup the two filter lists. We always apply AndNot after positive
                // and terms.
                let (f_andnot, f_rem): (Vec<_>, Vec<_>) = l.iter().partition(|f| f.is_andnot());

                // We make this an iter, so everything comes off in order. if we used pop it means we
                // pull from the tail, which is the WORST item to start with!
                let mut f_rem_iter = f_rem.iter();

                // Setup the initial result.
                let (mut cand_idl, fp) = match f_rem_iter.next() {
                    Some(f) => self.filter2idl(f, thres)?,
                    None => {
                        filter_warn!(
                            "And filter was empty, or contains only AndNot, can not evaluate."
                        );
                        return Ok((IdList::Indexed(IDLBitRange::new()), FilterPlan::Invalid));
                    }
                };

                // Setup the counter of terms we have left to evaluate.
                // This is used so that we shortcut return ONLY when we really do have
                // more terms remaining. (-1 because the first positive term was
                // consumed above.)
                let mut f_rem_count = f_rem.len() + f_andnot.len() - 1;

                // Setup the query plan tracker
                let mut plan = vec![fp];

                // Check whether the very first term already lets us shortcut.
                match &cand_idl {
                    IdList::Indexed(idl) | IdList::Partial(idl) | IdList::PartialThreshold(idl) => {
                        // When below thres, we have to return partials to trigger the entry_no_match_filter check.
                        // But we only do this when there are actually multiple elements in the and,
                        // because an and with 1 element now is FULLY resolved.
                        if idl.below_threshold(thres) && f_rem_count > 0 {
                            let setplan = FilterPlan::AndPartialThreshold(plan);
                            return Ok((IdList::PartialThreshold(idl.clone()), setplan));
                        } else if idl.is_empty() {
                            // Regardless of the input state, if it's empty, this can never
                            // be satisfied, so return we are indexed and complete.
                            let setplan = FilterPlan::AndEmptyCand(plan);
                            return Ok((IdList::Indexed(IDLBitRange::new()), setplan));
                        }
                    }
                    IdList::AllIds => {}
                }

                // Now, for all remaining,
                for f in f_rem_iter {
                    f_rem_count -= 1;
                    let (inter, fp) = self.filter2idl(f, thres)?;
                    plan.push(fp);
                    // Intersect the new term with the working candidate set,
                    // propagating the weaker of the two accuracy levels.
                    cand_idl = match (cand_idl, inter) {
                        (IdList::Indexed(ia), IdList::Indexed(ib)) => {
                            let r = ia & ib;
                            if r.below_threshold(thres) && f_rem_count > 0 {
                                // When below thres, we have to return partials to trigger the entry_no_match_filter check.
                                let setplan = FilterPlan::AndPartialThreshold(plan);
                                return Ok((IdList::PartialThreshold(r), setplan));
                            } else if r.is_empty() {
                                // Regardless of the input state, if it's empty, this can never
                                // be satisfied, so return we are indexed and complete.
                                let setplan = FilterPlan::AndEmptyCand(plan);
                                return Ok((IdList::Indexed(IDLBitRange::new()), setplan));
                            } else {
                                IdList::Indexed(r)
                            }
                        }
                        (IdList::Indexed(ia), IdList::Partial(ib))
                        | (IdList::Partial(ia), IdList::Indexed(ib))
                        | (IdList::Partial(ia), IdList::Partial(ib)) => {
                            let r = ia & ib;
                            if r.below_threshold(thres) && f_rem_count > 0 {
                                // When below thres, we have to return partials to trigger the entry_no_match_filter check.
                                let setplan = FilterPlan::AndPartialThreshold(plan);
                                return Ok((IdList::PartialThreshold(r), setplan));
                            } else {
                                IdList::Partial(r)
                            }
                        }
                        (IdList::Indexed(ia), IdList::PartialThreshold(ib))
                        | (IdList::PartialThreshold(ia), IdList::Indexed(ib))
                        | (IdList::PartialThreshold(ia), IdList::PartialThreshold(ib))
                        | (IdList::PartialThreshold(ia), IdList::Partial(ib))
                        | (IdList::Partial(ia), IdList::PartialThreshold(ib)) => {
                            let r = ia & ib;
                            if r.below_threshold(thres) && f_rem_count > 0 {
                                // When below thres, we have to return partials to trigger the entry_no_match_filter check.
                                let setplan = FilterPlan::AndPartialThreshold(plan);
                                return Ok((IdList::PartialThreshold(r), setplan));
                            } else {
                                IdList::PartialThreshold(r)
                            }
                        }
                        // AllIds adds no information to an intersection, but it
                        // downgrades an Indexed set to Partial.
                        (IdList::Indexed(i), IdList::AllIds)
                        | (IdList::AllIds, IdList::Indexed(i))
                        | (IdList::Partial(i), IdList::AllIds)
                        | (IdList::AllIds, IdList::Partial(i)) => IdList::Partial(i),
                        (IdList::PartialThreshold(i), IdList::AllIds)
                        | (IdList::AllIds, IdList::PartialThreshold(i)) => {
                            IdList::PartialThreshold(i)
                        }
                        (IdList::AllIds, IdList::AllIds) => IdList::AllIds,
                    };
                }

                // debug!("partial cand set ==> {:?}", cand_idl);

                // Now subtract the and-not terms from the positive candidate set.
                for f in f_andnot.iter() {
                    f_rem_count -= 1;
                    let FilterResolved::AndNot(f_in, _) = f else {
                        filter_error!("Invalid server state, a cand filter leaked to andnot set!");
                        return Err(OperationError::InvalidState);
                    };
                    let (inter, fp) = self.filter2idl(f_in, thres)?;
                    // It's an and not, so we need to wrap the plan accordingly.
                    plan.push(FilterPlan::AndNot(Box::new(fp)));
                    cand_idl = match (cand_idl, inter) {
                        (IdList::Indexed(ia), IdList::Indexed(ib)) => {
                            let r = ia.andnot(ib);
                            /*
                            // Don't trigger threshold on and nots if fully indexed.
                            if r.below_threshold(thres) {
                                // When below thres, we have to return partials to trigger the entry_no_match_filter check.
                                return Ok(IdList::PartialThreshold(r));
                            } else {
                                IdList::Indexed(r)
                            }
                            */
                            IdList::Indexed(r)
                        }
                        (IdList::Indexed(ia), IdList::Partial(ib))
                        | (IdList::Partial(ia), IdList::Indexed(ib))
                        | (IdList::Partial(ia), IdList::Partial(ib)) => {
                            let r = ia.andnot(ib);
                            // DO trigger threshold on partials, because we have to apply the filter
                            // test anyway, so we may as well shortcut at this point.
                            if r.below_threshold(thres) && f_rem_count > 0 {
                                let setplan = FilterPlan::AndPartialThreshold(plan);
                                return Ok((IdList::PartialThreshold(r), setplan));
                            } else {
                                IdList::Partial(r)
                            }
                        }
                        (IdList::Indexed(ia), IdList::PartialThreshold(ib))
                        | (IdList::PartialThreshold(ia), IdList::Indexed(ib))
                        | (IdList::PartialThreshold(ia), IdList::PartialThreshold(ib))
                        | (IdList::PartialThreshold(ia), IdList::Partial(ib))
                        | (IdList::Partial(ia), IdList::PartialThreshold(ib)) => {
                            let r = ia.andnot(ib);
                            // DO trigger threshold on partials, because we have to apply the filter
                            // test anyway, so we may as well shortcut at this point.
                            if r.below_threshold(thres) && f_rem_count > 0 {
                                let setplan = FilterPlan::AndPartialThreshold(plan);
                                return Ok((IdList::PartialThreshold(r), setplan));
                            } else {
                                IdList::PartialThreshold(r)
                            }
                        }

                        (IdList::Indexed(_), IdList::AllIds)
                        | (IdList::AllIds, IdList::Indexed(_))
                        | (IdList::Partial(_), IdList::AllIds)
                        | (IdList::AllIds, IdList::Partial(_))
                        | (IdList::PartialThreshold(_), IdList::AllIds)
                        | (IdList::AllIds, IdList::PartialThreshold(_)) => {
                            // We could actually generate allids here
                            // and then try to reduce the and-not set, but
                            // for now we just return all ids.
                            IdList::AllIds
                        }
                        (IdList::AllIds, IdList::AllIds) => IdList::AllIds,
                    };
                }

                // What state is the final cand idl in?
                let setplan = match cand_idl {
                    IdList::Indexed(_) => FilterPlan::AndIndexed(plan),
                    IdList::Partial(_) | IdList::PartialThreshold(_) => {
                        FilterPlan::AndPartial(plan)
                    }
                    IdList::AllIds => FilterPlan::AndUnindexed(plan),
                };

                // Finally, return the result.
                // debug!("final cand set ==> {:?}", cand_idl);
                (cand_idl, setplan)
            } // end and
            FilterResolved::Inclusion(l, _) => {
                // For inclusion to be valid, every term must have *at least* one element present.
                // This really relies on indexing, and so it's internal only - generally only
                // for fully indexed existence queries, such as from refint.

                // This has a lot in common with an And and Or but not really quite either.
                let mut plan = Vec::with_capacity(0);
                let mut result = IDLBitRange::new();
                // For each filter in l
                for f in l.iter() {
                    // get their idls
                    match self.filter2idl(f, thres)? {
                        (IdList::Indexed(idl), fp) => {
                            plan.push(fp);
                            if idl.is_empty() {
                                // It's empty, so something is missing. Bail fast.
                                filter_trace!("Inclusion is unable to proceed - an empty (missing) item was found!");
                                let setplan = FilterPlan::InclusionIndexed(plan);
                                return Ok((IdList::Indexed(IDLBitRange::new()), setplan));
                            } else {
                                result = result | idl;
                            }
                        }
                        (_, fp) => {
                            plan.push(fp);
                            let setplan = FilterPlan::InclusionInvalid(plan);
                            error!(
                                ?setplan,
                                "Inclusion is unable to proceed - all terms must be fully indexed!"
                            );
                            return Ok((IdList::Partial(IDLBitRange::new()), setplan));
                        }
                    }
                } // end or.iter()
                  // If we got here, every term must have been indexed
                let setplan = FilterPlan::InclusionIndexed(plan);
                (IdList::Indexed(result), setplan)
            }
            // So why does this return empty? Normally we actually process an AndNot in the context
            // of an "AND" query, but if it's used anywhere else IE the root filter, then there is
            // no other set to exclude - therefore it's empty set. Additionally, even in an OR query
            // the AndNot will be skipped as an empty set for the same reason.
            FilterResolved::AndNot(_f, _) => {
                // get the idl for f
                // now do andnot?
                filter_error!("Requested a top level or isolated AndNot, returning empty");
                (IdList::Indexed(IDLBitRange::new()), FilterPlan::Invalid)
            }
            FilterResolved::Invalid(_) => {
                // Indexed since it is always false and we don't want to influence filter testing
                (IdList::Indexed(IDLBitRange::new()), FilterPlan::Invalid)
            }
        })
    }
603
    /// Resolve a substring/startswith/endswith term using the trigraph
    /// substring index.
    ///
    /// The key is split into trigraphs and each trigraph's idl is intersected
    /// into a working set, stopping early once the set falls below
    /// `FILTER_SUBSTR_TEST_THRESHOLD`. The result is always `Partial` (or
    /// `AllIds` on a corrupt index): trigraphs ignore ordering and
    /// starts/ends-with conditions, so the caller must filter-test the
    /// candidates at run time.
    fn filter2idl_sub(
        &mut self,
        attr: &Attribute,
        sub_idx_key: String,
    ) -> Result<(IdList, FilterPlan), OperationError> {
        // Now given that idx_key, we will iterate over the possible graphemes.
        let mut grapheme_iter = trigraph_iter(&sub_idx_key);

        // Substrings are always partial because we have to split the keys up
        // and we don't pay attention to starts/ends with conditions. We need
        // the caller to check those conditions manually at run time. This lets
        // the index focus on trigraph indexes only rather than needing to
        // worry about those other bits. In a way substring indexes are "fuzzy".

        // Seed the working set from the first trigraph.
        let mut idl = match grapheme_iter.next() {
            Some(idx_key) => {
                match self
                    .get_idlayer()
                    .get_idl(attr, IndexType::SubString, idx_key)?
                {
                    Some(idl) => idl,
                    // Schema said indexed but the index is absent - corrupt.
                    None => return Ok((IdList::AllIds, FilterPlan::SubCorrupt(attr.clone()))),
                }
            }
            None => {
                // If there are no graphemes this means the attempt is for an empty string, so
                // we return an empty result set.
                return Ok((IdList::Indexed(IDLBitRange::new()), FilterPlan::Invalid));
            }
        };

        // Only keep narrowing while the set is still above the threshold -
        // below it, the in-memory filter test is cheaper than more lookups.
        if idl.len() > FILTER_SUBSTR_TEST_THRESHOLD {
            for idx_key in grapheme_iter {
                // Get the idl for this
                match self
                    .get_idlayer()
                    .get_idl(attr, IndexType::SubString, idx_key)?
                {
                    Some(r_idl) => {
                        // Do an *and* operation between what we found and our working idl.
                        idl = r_idl & idl;
                    }
                    None => {
                        // if something didn't match, then we simply bail out after zeroing the current IDL.
                        idl = IDLBitRange::new();
                    }
                };

                if idl.len() < FILTER_SUBSTR_TEST_THRESHOLD {
                    break;
                }
            }
        } else {
            drop(grapheme_iter);
        }

        // Exit with the narrowed (or threshold-shortcut) candidate set.
        Ok((
            IdList::Partial(idl),
            FilterPlan::SubIndexed(attr.clone(), sub_idx_key),
        ))
    }
666
    /// Execute a search for all entries matching `filt`, subject to the
    /// resource limits in `erl`.
    ///
    /// The filter is first resolved to an id list via the indexes
    /// (`filter2idl`). Resource limits are enforced twice: on the size of the
    /// candidate id list *before* entries are loaded, and on the final result
    /// count *after* the in-memory filter test.
    ///
    /// Returns the matching entries, or `OperationError::ResourceLimit` when
    /// a limit is exceeded.
    #[instrument(level = "debug", name = "be::search", skip_all)]
    fn search(
        &mut self,
        erl: &Limits,
        filt: &Filter<FilterValidResolved>,
    ) -> Result<Vec<Arc<EntrySealedCommitted>>, OperationError> {
        // Unlike DS, even if we don't get the index back, we can just pass
        // to the in-memory filter test and be done.

        trace!(filter_optimised = ?filt);

        // Resolve the filter to an id list, recording how complete the
        // index resolution was in fplan.
        let (idl, fplan) = trace_span!("be::search -> filter2idl")
            .in_scope(|| self.filter2idl(filt.to_inner(), FILTER_SEARCH_TEST_THRESHOLD))?;

        debug!(search_filter_executed_plan = %fplan);

        // Pre-load limit checks on the candidate id list.
        match &idl {
            IdList::AllIds => {
                if !erl.unindexed_allow {
                    admin_error!(
                        "filter (search) is fully unindexed, and not allowed by resource limits"
                    );
                    return Err(OperationError::ResourceLimit);
                }
            }
            IdList::Partial(idl_br) => {
                // if idl_br.len() > erl.search_max_filter_test {
                if !idl_br.below_threshold(erl.search_max_filter_test) {
                    admin_error!("filter (search) is partial indexed and greater than search_max_filter_test allowed by resource limits");
                    return Err(OperationError::ResourceLimit);
                }
            }
            IdList::PartialThreshold(_) => {
                // Since we opted for this, this is not the fault
                // of the user and we should not penalise them by limiting on partial.
            }
            IdList::Indexed(idl_br) => {
                // We know this is resolved here, so we can attempt the limit
                // check. This has to fold the whole index, but you know, class=pres is
                // indexed ...
                // if idl_br.len() > erl.search_max_results {
                if !idl_br.below_threshold(erl.search_max_results) {
                    admin_error!("filter (search) is indexed and greater than search_max_results allowed by resource limits");
                    return Err(OperationError::ResourceLimit);
                }
            }
        };

        // Load the candidate entries from the id layer.
        let entries = self.get_idlayer().get_identry(&idl).map_err(|e| {
            admin_error!(?e, "get_identry failed");
            e
        })?;

        // Apply the in-memory filter test, unless the id list was fully
        // indexed, in which case every loaded entry is already a match.
        let mut entries_filtered = match idl {
            IdList::AllIds => trace_span!("be::search<entry::ftest::allids>").in_scope(|| {
                entries
                    .into_iter()
                    .filter(|e| e.entry_match_no_index(filt))
                    .collect()
            }),
            IdList::Partial(_) => trace_span!("be::search<entry::ftest::partial>").in_scope(|| {
                entries
                    .into_iter()
                    .filter(|e| e.entry_match_no_index(filt))
                    .collect()
            }),
            IdList::PartialThreshold(_) => trace_span!("be::search<entry::ftest::thresh>")
                .in_scope(|| {
                    entries
                        .into_iter()
                        .filter(|e| e.entry_match_no_index(filt))
                        .collect()
                }),
            // Since the index fully resolved, we can shortcut the filter test step here!
            IdList::Indexed(_) => {
                filter_trace!("filter (search) was fully indexed 👏");
                entries
            }
        };

        // If the idl was not indexed, apply the resource limit now. Avoid the needless match since the
        // if statement is quick.
        if entries_filtered.len() > erl.search_max_results {
            admin_error!("filter (search) is resolved and greater than search_max_results allowed by resource limits");
            return Err(OperationError::ResourceLimit);
        }

        // Trim any excess capacity if needed
        entries_filtered.shrink_to_fit();

        Ok(entries_filtered)
    }
759
760    /// Given a filter, assert some condition exists.
761    /// Basically, this is a specialised case of search, where we don't need to
762    /// load any candidates if they match. This is heavily used in uuid
763    /// refint and attr uniqueness.
764    #[instrument(level = "debug", name = "be::exists", skip_all)]
765    fn exists(
766        &mut self,
767        erl: &Limits,
768        filt: &Filter<FilterValidResolved>,
769    ) -> Result<bool, OperationError> {
770        trace!(filter_optimised = ?filt);
771
772        // Using the indexes, resolve the IdList here, or AllIds.
773        // Also get if the filter was 100% resolved or not.
774        let (idl, fplan) = trace_span!("be::exists -> filter2idl")
775            .in_scope(|| self.filter2idl(filt.to_inner(), FILTER_EXISTS_TEST_THRESHOLD))?;
776
777        debug!(exist_filter_executed_plan = %fplan);
778
779        // Apply limits to the IdList.
780        match &idl {
781            IdList::AllIds => {
782                if !erl.unindexed_allow {
783                    admin_error!(
784                        "filter (exists) is fully unindexed, and not allowed by resource limits"
785                    );
786                    return Err(OperationError::ResourceLimit);
787                }
788            }
789            IdList::Partial(idl_br) => {
790                if !idl_br.below_threshold(erl.search_max_filter_test) {
791                    admin_error!("filter (exists) is partial indexed and greater than search_max_filter_test allowed by resource limits");
792                    return Err(OperationError::ResourceLimit);
793                }
794            }
795            IdList::PartialThreshold(_) => {
796                // Since we opted for this, this is not the fault
797                // of the user and we should not penalise them.
798            }
799            IdList::Indexed(_) => {}
800        }
801
802        // Now, check the idl -- if it's fully resolved, we can skip this because the query
803        // was fully indexed.
804        match &idl {
805            IdList::Indexed(idl) => Ok(!idl.is_empty()),
806            _ => {
807                let entries = self.get_idlayer().get_identry(&idl).map_err(|e| {
808                    admin_error!(?e, "get_identry failed");
809                    e
810                })?;
811
812                // if not 100% resolved query, apply the filter test.
813                let entries_filtered: Vec<_> =
814                    trace_span!("be::exists<entry::ftest>").in_scope(|| {
815                        entries
816                            .into_iter()
817                            .filter(|e| e.entry_match_no_index(filt))
818                            .collect()
819                    });
820
821                Ok(!entries_filtered.is_empty())
822            }
823        } // end match idl
824    }
825
826    fn retrieve_range(
827        &mut self,
828        ranges: &BTreeMap<Uuid, ReplCidRange>,
829    ) -> Result<Vec<Arc<EntrySealedCommitted>>, OperationError> {
830        // First pass the ranges to the ruv to resolve to an absolute set of
831        // entry id's.
832
833        let idl = self.get_ruv().range_to_idl(ranges);
834        // Because of how this works, I think that it's not possible for the idl
835        // to have any missing ids.
836        //
837        // If it was possible, we could just & with allids to remove the extraneous
838        // values.
839
840        if idl.is_empty() {
841            // return no entries.
842            return Ok(Vec::with_capacity(0));
843        }
844
845        // Make it an id list fr the backend.
846        let id_list = IdList::Indexed(idl);
847
848        self.get_idlayer().get_identry(&id_list).map_err(|e| {
849            admin_error!(?e, "get_identry failed");
850            e
851        })
852    }
853
    /// Run the id layer's own consistency checks, returning any errors found.
    fn verify(&mut self) -> Vec<Result<(), ConsistencyError>> {
        self.get_idlayer().verify()
    }
857
    /// Verify that the name2uuid, uuid2spn and uuid2rdn indexes agree with a
    /// single entry.
    ///
    /// Only live entries (not recycled/tombstoned) are checked against these
    /// reference indexes. Returns `ConsistencyError::BackendIndexSync` on any
    /// mismatch.
    fn verify_entry_index(&mut self, e: &EntrySealedCommitted) -> Result<(), ConsistencyError> {
        // First, check our references in name2uuid, uuid2spn and uuid2rdn
        if e.mask_recycled_ts().is_some() {
            let e_uuid = e.get_uuid();
            // We only check these on live entries.
            // Diffing None -> Some(e) yields the full set of name keys this
            // entry should have in name2uuid, and no removals.
            let (n2u_add, n2u_rem) = Entry::idx_name2uuid_diff(None, Some(e));

            let (Some(n2u_set), None) = (n2u_add, n2u_rem) else {
                admin_error!("Invalid idx_name2uuid_diff state");
                return Err(ConsistencyError::BackendIndexSync);
            };

            // If the set.len > 1, check each item.
            // Every name key must resolve back to exactly this entry's uuid.
            n2u_set
                .iter()
                .try_for_each(|name| match self.get_idlayer().name2uuid(name) {
                    Ok(Some(idx_uuid)) => {
                        if idx_uuid == e_uuid {
                            Ok(())
                        } else {
                            admin_error!("Invalid name2uuid state -> incorrect uuid association");
                            Err(ConsistencyError::BackendIndexSync)
                        }
                    }
                    r => {
                        admin_error!(state = ?r, "Invalid name2uuid state");
                        Err(ConsistencyError::BackendIndexSync)
                    }
                })?;

            // The uuid2spn index must store the entry's current spn value.
            let spn = e.get_uuid2spn();
            match self.get_idlayer().uuid2spn(e_uuid) {
                Ok(Some(idx_spn)) => {
                    if spn != idx_spn {
                        admin_error!("Invalid uuid2spn state -> incorrect idx spn value");
                        return Err(ConsistencyError::BackendIndexSync);
                    }
                }
                r => {
                    admin_error!(state = ?r, ?e_uuid, "Invalid uuid2spn state");
                    trace!(entry = ?e);
                    return Err(ConsistencyError::BackendIndexSync);
                }
            };

            // The uuid2rdn index must store the entry's current rdn value.
            let rdn = e.get_uuid2rdn();
            match self.get_idlayer().uuid2rdn(e_uuid) {
                Ok(Some(idx_rdn)) => {
                    if rdn != idx_rdn {
                        admin_error!("Invalid uuid2rdn state -> incorrect idx rdn value");
                        return Err(ConsistencyError::BackendIndexSync);
                    }
                }
                r => {
                    admin_error!(state = ?r, "Invalid uuid2rdn state");
                    return Err(ConsistencyError::BackendIndexSync);
                }
            };
        }

        // Check the other entry:attr indexes are valid
        //
        // This is actually pretty hard to check, because we can check a value *should*
        // exist, but not that a value should NOT be present in the index. Thought needed ...

        // Got here? Ok!
        Ok(())
    }
926
927    fn verify_indexes(&mut self) -> Vec<Result<(), ConsistencyError>> {
928        let idl = IdList::AllIds;
929        let entries = match self.get_idlayer().get_identry(&idl) {
930            Ok(s) => s,
931            Err(e) => {
932                admin_error!(?e, "get_identry failure");
933                return vec![Err(ConsistencyError::Unknown)];
934            }
935        };
936
937        let r = entries.iter().try_for_each(|e| self.verify_entry_index(e));
938
939        if r.is_err() {
940            vec![r]
941        } else {
942            Vec::with_capacity(0)
943        }
944    }
945
946    fn verify_ruv(&mut self, results: &mut Vec<Result<(), ConsistencyError>>) {
947        // The way we verify this is building a whole second RUV and then comparing it.
948        let idl = IdList::AllIds;
949        let entries = match self.get_idlayer().get_identry(&idl) {
950            Ok(ent) => ent,
951            Err(e) => {
952                results.push(Err(ConsistencyError::Unknown));
953                admin_error!(?e, "get_identry failed");
954                return;
955            }
956        };
957
958        self.get_ruv().verify(&entries, results);
959    }
960
961    fn backup<OUT>(
962        &mut self,
963        mut output: OUT,
964        compression: BackupCompression,
965    ) -> Result<(), OperationError>
966    where
967        OUT: std::io::Write,
968    {
969        let repl_meta = self.get_ruv().to_db_backup_ruv();
970
971        // load all entries into RAM, may need to change this later
972        // if the size of the database compared to RAM is an issue
973        let idl = IdList::AllIds;
974        let idlayer = self.get_idlayer();
975        let raw_entries: Vec<IdRawEntry> = idlayer.get_identry_raw(&idl)?;
976
977        let entries: Result<Vec<DbEntry>, _> = raw_entries
978            .iter()
979            .map(|id_ent| {
980                serde_json::from_slice(id_ent.data.as_slice())
981                    .map_err(|_| OperationError::SerdeJsonError) // log?
982            })
983            .collect();
984
985        let entries = entries?;
986
987        let db_s_uuid = idlayer
988            .get_db_s_uuid()
989            .and_then(|u| u.ok_or(OperationError::InvalidDbState))?;
990        let db_d_uuid = idlayer
991            .get_db_d_uuid()
992            .and_then(|u| u.ok_or(OperationError::InvalidDbState))?;
993        let db_ts_max = idlayer
994            .get_db_ts_max()
995            .and_then(|u| u.ok_or(OperationError::InvalidDbState))?;
996
997        let keyhandles = idlayer.get_key_handles()?;
998
999        let bak = DbBackup::V5 {
1000            // remember env is evaled at compile time.
1001            version: env!("KANIDM_PKG_SERIES").to_string(),
1002            db_s_uuid,
1003            db_d_uuid,
1004            db_ts_max,
1005            keyhandles,
1006            repl_meta,
1007            entries,
1008        };
1009
1010        let serialized_entries_str = serde_json::to_string(&bak).map_err(|e| {
1011            admin_error!(?e, "serde error");
1012            OperationError::SerdeJsonError
1013        })?;
1014
1015        match compression {
1016            BackupCompression::NoCompression => {
1017                output
1018                    .write(serialized_entries_str.as_bytes())
1019                    .map_err(|e| {
1020                        error!(?e, "fs::write error");
1021                        OperationError::FsError
1022                    })?;
1023            }
1024            BackupCompression::Gzip => {
1025                let mut encoder = GzEncoder::new(&mut output, Compression::best());
1026                encoder
1027                    .write_all(serialized_entries_str.as_bytes())
1028                    .map_err(|e| {
1029                        error!(?e, "Gzip compression error writing backup");
1030                        OperationError::FsError
1031                    })?;
1032            }
1033        }
1034
1035        output.flush().map_err(|err| {
1036            error!(?err, "Unable to flush backup output stream");
1037            OperationError::FsError
1038        })?;
1039
1040        Ok(())
1041    }
1042
    /// Resolve a name to its uuid via the name2uuid index, if present.
    fn name2uuid(&mut self, name: &str) -> Result<Option<Uuid>, OperationError> {
        self.get_idlayer().name2uuid(name)
    }
1046
    /// Resolve an external id to its uuid via the externalid2uuid index, if present.
    fn externalid2uuid(&mut self, name: &str) -> Result<Option<Uuid>, OperationError> {
        self.get_idlayer().externalid2uuid(name)
    }
1050
    /// Resolve a uuid to its spn value via the uuid2spn index, if present.
    fn uuid2spn(&mut self, uuid: Uuid) -> Result<Option<Value>, OperationError> {
        self.get_idlayer().uuid2spn(uuid)
    }
1054
    /// Resolve a uuid to its rdn string via the uuid2rdn index, if present.
    fn uuid2rdn(&mut self, uuid: Uuid) -> Result<Option<String>, OperationError> {
        self.get_idlayer().uuid2rdn(uuid)
    }
1058}
1059
// Read transactions satisfy the transaction trait by delegating to their
// read-only idl layer and RUV members.
impl<'a> BackendTransaction for BackendReadTransaction<'a> {
    type IdlLayerType = IdlArcSqliteReadTransaction<'a>;
    type RuvType = ReplicationUpdateVectorReadTransaction<'a>;

    // Note: the trait requires &mut self even on read transactions.
    fn get_idlayer(&mut self) -> &mut IdlArcSqliteReadTransaction<'a> {
        &mut self.idlayer
    }

    fn get_ruv(&mut self) -> &mut ReplicationUpdateVectorReadTransaction<'a> {
        &mut self.ruv
    }

    fn get_idxmeta_ref(&self) -> &IdxMeta {
        &self.idxmeta
    }
}
1076
// Inspection helpers exposed on read transactions, mainly for admin and
// diagnostic tooling. Each delegates directly to the idl layer.
impl BackendReadTransaction<'_> {
    /// List the names of all indexes present in the database.
    pub fn list_indexes(&mut self) -> Result<Vec<String>, OperationError> {
        self.get_idlayer().list_idxs()
    }

    /// Dump the full id2entry table as (id, serialised entry) pairs.
    pub fn list_id2entry(&mut self) -> Result<Vec<(u64, String)>, OperationError> {
        self.get_idlayer().list_id2entry()
    }

    /// Dump the content of a single named index as (key, id list) pairs.
    pub fn list_index_content(
        &mut self,
        index_name: &str,
    ) -> Result<Vec<(String, IDLBitRange)>, OperationError> {
        self.get_idlayer().list_index_content(index_name)
    }

    /// Fetch a single raw id2entry row by id.
    pub fn get_id2entry(&mut self, id: u64) -> Result<(u64, String), OperationError> {
        self.get_idlayer().get_id2entry(id)
    }

    /// List entries currently held in quarantine.
    pub fn list_quarantined(&mut self) -> Result<Vec<(u64, String)>, OperationError> {
        self.get_idlayer().list_quarantined()
    }
}
1101
// Write transactions satisfy the transaction trait by delegating to their
// writeable idl layer and RUV members.
impl<'a> BackendTransaction for BackendWriteTransaction<'a> {
    type IdlLayerType = IdlArcSqliteWriteTransaction<'a>;
    type RuvType = ReplicationUpdateVectorWriteTransaction<'a>;

    fn get_idlayer(&mut self) -> &mut IdlArcSqliteWriteTransaction<'a> {
        &mut self.idlayer
    }

    fn get_ruv(&mut self) -> &mut ReplicationUpdateVectorWriteTransaction<'a> {
        &mut self.ruv
    }

    fn get_idxmeta_ref(&self) -> &IdxMeta {
        &self.idxmeta_wr
    }
}
1118
1119impl<'a> BackendWriteTransaction<'a> {
    /// Access the RUV write transaction directly, for crate-internal callers
    /// that need the concrete write type rather than the trait accessor.
    pub(crate) fn get_ruv_write(&mut self) -> &mut ReplicationUpdateVectorWriteTransaction<'a> {
        &mut self.ruv
    }
1123
    /// Persist a batch of new entries, assigning each a fresh backend id.
    ///
    /// Every entry must already carry a change for `cid` in its changestate,
    /// otherwise `OperationError::ReplEntryNotChanged` is returned. The RUV
    /// is updated with the new ids, the entries are written, and their
    /// indexes are created.
    #[instrument(level = "debug", name = "be::create", skip_all)]
    pub fn create(
        &mut self,
        cid: &Cid,
        entries: Vec<EntrySealedNew>,
    ) -> Result<Vec<EntrySealedCommitted>, OperationError> {
        if entries.is_empty() {
            admin_error!("No entries provided to BE to create, invalid server call!");
            return Err(OperationError::EmptyRequest);
        }

        // Check that every entry has a change associated
        // that matches the cid?
        entries.iter().try_for_each(|e| {
            if e.get_changestate().contains_tail_cid(cid) {
                Ok(())
            } else {
                admin_error!(
                    "Entry changelog does not contain a change related to this transaction"
                );
                Err(OperationError::ReplEntryNotChanged)
            }
        })?;

        // Now, assign id's to all the new entries.

        let mut id_max = self.idlayer.get_id2entry_max_id()?;
        let c_entries: Vec<_> = entries
            .into_iter()
            .map(|e| {
                // Ids are allocated sequentially above the previous maximum.
                id_max += 1;
                e.into_sealed_committed_id(id_max)
            })
            .collect();

        // All good, lets update the RUV.
        // This auto compresses.
        let ruv_idl = IDLBitRange::from_iter(c_entries.iter().map(|e| e.get_id()));

        // We don't need to skip this like in mod since creates always go to the ruv
        self.get_ruv().insert_change(cid, ruv_idl)?;

        self.idlayer.write_identries(c_entries.iter())?;

        // Record the new id watermark so future creates continue from here.
        self.idlayer.set_id2entry_max_id(id_max);

        // Now update the indexes as required.
        for e in c_entries.iter() {
            self.entry_index(None, Some(e))?
        }

        Ok(c_entries)
    }
1177
1178    #[instrument(level = "debug", name = "be::create", skip_all)]
1179    /// This is similar to create, but used in the replication path as it records all
1180    /// the CID's in the entry to the RUV, but without applying the current CID as
1181    /// a new value in the RUV. We *do not* want to apply the current CID in the RUV
1182    /// related to this entry as that could cause an infinite replication loop!
1183    pub fn refresh(
1184        &mut self,
1185        entries: Vec<EntrySealedNew>,
1186    ) -> Result<Vec<EntrySealedCommitted>, OperationError> {
1187        if entries.is_empty() {
1188            admin_error!("No entries provided to BE to create, invalid server call!");
1189            return Err(OperationError::EmptyRequest);
1190        }
1191
1192        // Assign id's to all the new entries.
1193        let mut id_max = self.idlayer.get_id2entry_max_id()?;
1194        let c_entries: Vec<_> = entries
1195            .into_iter()
1196            .map(|e| {
1197                id_max += 1;
1198                e.into_sealed_committed_id(id_max)
1199            })
1200            .collect();
1201
1202        self.idlayer.write_identries(c_entries.iter())?;
1203
1204        self.idlayer.set_id2entry_max_id(id_max);
1205
1206        // Update the RUV with all the changestates of the affected entries.
1207        for e in c_entries.iter() {
1208            self.get_ruv().update_entry_changestate(e)?;
1209        }
1210
1211        // Now update the indexes as required.
1212        for e in c_entries.iter() {
1213            self.entry_index(None, Some(e))?
1214        }
1215
1216        Ok(c_entries)
1217    }
1218
    /// Apply a batch of entry modifications, updating storage, the RUV and
    /// the indexes.
    ///
    /// `pre_entries` and `post_entries` must be the same length and in the
    /// same order - each pre/post pair describes one modified entry. Only
    /// entries whose changestate contains `cid` (i.e. where a replicated
    /// attribute changed) are recorded in the RUV.
    #[instrument(level = "debug", name = "be::modify", skip_all)]
    pub fn modify(
        &mut self,
        cid: &Cid,
        pre_entries: &[Arc<EntrySealedCommitted>],
        post_entries: &[EntrySealedCommitted],
    ) -> Result<(), OperationError> {
        if post_entries.is_empty() || pre_entries.is_empty() {
            admin_error!("No entries provided to BE to modify, invalid server call!");
            return Err(OperationError::EmptyRequest);
        }

        assert_eq!(post_entries.len(), pre_entries.len());

        let post_entries_iter = post_entries.iter().filter(|e| {
            trace!(?cid);
            trace!(changestate = ?e.get_changestate());
            // If True - This means that at least one attribute that *is* replicated was changed
            // on this entry, so we need to update and add this to the RUV!
            //
            // If False - This means that the entry in question was updated but the changes are all
            // non-replicated so we DO NOT update the RUV here!
            e.get_changestate().contains_tail_cid(cid)
        });

        // All good, lets update the RUV.
        // This auto compresses.
        let ruv_idl = IDLBitRange::from_iter(post_entries_iter.map(|e| e.get_id()));

        // Skip the RUV insert entirely when no replicated attribute changed.
        if !ruv_idl.is_empty() {
            self.get_ruv().insert_change(cid, ruv_idl)?;
        }

        // Now, given the list of id's, update them
        self.get_idlayer().write_identries(post_entries.iter())?;

        // Finally, we now reindex all the changed entries. We do this by iterating and zipping
        // over the set, because we know the list is in the same order.
        pre_entries
            .iter()
            .zip(post_entries.iter())
            .try_for_each(|(pre, post)| self.entry_index(Some(pre.as_ref()), Some(post)))
    }
1262
    /// For each incoming replication entry, locate the corresponding local
    /// entry - or create an indexed stub if none exists yet - returning the
    /// resolved set in the same order as `entry_meta`.
    ///
    /// Entries are looked up via the uuid equality index; a uuid resolving to
    /// more than one id, or a missing uuid index, is an
    /// `OperationError::InvalidDbState`.
    #[instrument(level = "debug", name = "be::incremental_prepare", skip_all)]
    pub fn incremental_prepare(
        &mut self,
        entry_meta: &[EntryIncrementalNew],
    ) -> Result<Vec<Arc<EntrySealedCommitted>>, OperationError> {
        let mut ret_entries = Vec::with_capacity(entry_meta.len());
        // Track the id watermark so we only persist it if stubs were created.
        let id_max_pre = self.idlayer.get_id2entry_max_id()?;
        let mut id_max = id_max_pre;

        for ctx_ent in entry_meta.iter() {
            let ctx_ent_uuid = ctx_ent.get_uuid();
            let idx_key = ctx_ent_uuid.as_hyphenated().to_string();

            // Look the uuid up in the equality index to find the local id.
            let idl =
                self.get_idlayer()
                    .get_idl(&Attribute::Uuid, IndexType::Equality, &idx_key)?;

            let entry = match idl {
                Some(idl) if idl.is_empty() => {
                    // Create the stub entry, we just need it to have an id number
                    // allocated.
                    id_max += 1;

                    let stub_entry = Arc::new(EntrySealedCommitted::stub_sealed_committed_id(
                        id_max, ctx_ent,
                    ));
                    // Now, the stub entry needs to be indexed. If not, uuid2spn
                    // isn't created, so subsequent index diffs don't work correctly.
                    self.entry_index(None, Some(stub_entry.as_ref()))?;

                    // Okay, entry ready to go.
                    stub_entry
                }
                Some(idl) if idl.len() == 1 => {
                    // Get the entry from this idl.
                    let mut entries = self
                        .get_idlayer()
                        .get_identry(&IdList::Indexed(idl))
                        .map_err(|e| {
                            admin_error!(?e, "get_identry failed");
                            e
                        })?;

                    if let Some(entry) = entries.pop() {
                        // Return it.
                        entry
                    } else {
                        error!("Invalid entry state, index was unable to locate entry");
                        return Err(OperationError::InvalidDbState);
                    }
                    // Done, entry is ready to go
                }
                Some(idl) => {
                    // BUG - duplicate uuid!
                    error!(uuid = ?ctx_ent_uuid, "Invalid IDL state, uuid index must have only a single or no values. Contains {:?}", idl);
                    return Err(OperationError::InvalidDbState);
                }
                None => {
                    // BUG - corrupt index.
                    error!(uuid = ?ctx_ent_uuid, "Invalid IDL state, uuid index must be present");
                    return Err(OperationError::InvalidDbState);
                }
            };

            ret_entries.push(entry);
        }

        // Only write the watermark back if stub creation advanced it.
        if id_max != id_max_pre {
            self.idlayer.set_id2entry_max_id(id_max);
        }

        Ok(ret_entries)
    }
1336
    /// Apply the result of an incremental replication: create the new
    /// entries and update the changed ones.
    ///
    /// `update_entries` holds (post, pre) pairs - the updated entry and the
    /// prior (stub or existing) entry it replaces. Both paths record entry
    /// changestates in the RUV rather than inserting a new local cid, to
    /// avoid replication loops (see `refresh`).
    #[instrument(level = "debug", name = "be::incremental_apply", skip_all)]
    pub fn incremental_apply(
        &mut self,
        update_entries: &[(EntrySealedCommitted, Arc<EntrySealedCommitted>)],
        create_entries: Vec<EntrySealedNew>,
    ) -> Result<(), OperationError> {
        // For the values in create_cands, create these with similar code to the refresh
        // path.
        if !create_entries.is_empty() {
            // Assign id's to all the new entries.
            let mut id_max = self.idlayer.get_id2entry_max_id()?;
            let c_entries: Vec<_> = create_entries
                .into_iter()
                .map(|e| {
                    id_max += 1;
                    e.into_sealed_committed_id(id_max)
                })
                .collect();

            self.idlayer.write_identries(c_entries.iter())?;

            self.idlayer.set_id2entry_max_id(id_max);

            // Update the RUV with all the changestates of the affected entries.
            for e in c_entries.iter() {
                self.get_ruv().update_entry_changestate(e)?;
            }

            // Now update the indexes as required.
            for e in c_entries.iter() {
                self.entry_index(None, Some(e))?
            }
        }

        // Otherwise this is a cid-less copy of modify.
        if !update_entries.is_empty() {
            // Persist the post-state of every updated entry.
            self.get_idlayer()
                .write_identries(update_entries.iter().map(|(up, _)| up))?;

            for (e, _) in update_entries.iter() {
                self.get_ruv().update_entry_changestate(e)?;
            }

            // Reindex using the pre entry as the previous state.
            for (post, pre) in update_entries.iter() {
                self.entry_index(Some(pre.as_ref()), Some(post))?
            }
        }

        Ok(())
    }
1387
1388    #[instrument(level = "debug", name = "be::reap_tombstones", skip_all)]
1389    pub fn reap_tombstones(&mut self, cid: &Cid, trim_cid: &Cid) -> Result<usize, OperationError> {
1390        debug_assert!(cid > trim_cid);
1391        // Mark a new maximum for the RUV by inserting an empty change. This
1392        // is important to keep the changestate always advancing.
1393        self.get_ruv().insert_change(cid, IDLBitRange::default())?;
1394
1395        // We plan to clear the RUV up to this cid. So we need to build an IDL
1396        // of all the entries we need to examine.
1397        let idl = self.get_ruv().trim_up_to(trim_cid).map_err(|e| {
1398            admin_error!(
1399                ?e,
1400                "During tombstone cleanup, failed to trim RUV to {:?}",
1401                trim_cid
1402            );
1403            e
1404        })?;
1405
1406        let entries = self
1407            .get_idlayer()
1408            .get_identry(&IdList::Indexed(idl))
1409            .map_err(|e| {
1410                admin_error!(?e, "get_identry failed");
1411                e
1412            })?;
1413
1414        if entries.is_empty() {
1415            admin_debug!("No entries affected - reap_tombstones operation success");
1416            return Ok(0);
1417        }
1418
1419        // Now that we have a list of entries we need to partition them into
1420        // two sets. The entries that are tombstoned and ready to reap_tombstones, and
1421        // the entries that need to have their change logs trimmed.
1422        //
1423        // Remember, these tombstones can be reaped because they were tombstoned at time
1424        // point 'cid', and since we are now "past" that minimum cid, then other servers
1425        // will also be trimming these out.
1426        //
1427        // Note unlike a changelog impl, we don't need to trim changestates here. We
1428        // only need the RUV trimmed so that we know if other servers are laggin behind!
1429
1430        // What entries are tombstones and ready to be deleted?
1431
1432        let (tombstones, leftover): (Vec<_>, Vec<_>) = entries
1433            .into_iter()
1434            .partition(|e| e.get_changestate().can_delete(trim_cid));
1435
1436        let ruv_idls = self.get_ruv().ruv_idls();
1437
1438        // Assert that anything leftover still either is *alive* OR is a tombstone
1439        // and has entries in the RUV!
1440
1441        if !leftover
1442            .iter()
1443            .all(|e| e.get_changestate().is_live() || ruv_idls.contains(e.get_id()))
1444        {
1445            admin_error!("Left over entries may be orphaned due to missing RUV entries");
1446            return Err(OperationError::ReplInvalidRUVState);
1447        }
1448
        // Now set up to reap the tombstones. Remember, after the cleanup above, the RUV
        // may now have been trimmed to a point where these entries can be purged!
1451
1452        // Assert the id's exist on the entry.
1453        let id_list: IDLBitRange = tombstones.iter().map(|e| e.get_id()).collect();
1454
1455        // Ensure nothing here exists in the RUV index, else it means
1456        // we didn't trim properly, or some other state violation has occurred.
1457        if !((&ruv_idls & &id_list).is_empty()) {
1458            admin_error!("RUV still contains entries that are going to be removed.");
1459            return Err(OperationError::ReplInvalidRUVState);
1460        }
1461
1462        // Now, given the list of id's, reap_tombstones them.
1463        let sz = id_list.len();
1464        self.get_idlayer().delete_identry(id_list.into_iter())?;
1465
1466        // Finally, purge the indexes from the entries we removed. These still have
1467        // indexes due to class=tombstone.
1468        tombstones
1469            .iter()
1470            .try_for_each(|e| self.entry_index(Some(e), None))?;
1471
1472        Ok(sz)
1473    }
1474
1475    #[instrument(level = "debug", name = "be::update_idxmeta", skip_all)]
1476    pub fn update_idxmeta(&mut self, idxkeys: Vec<IdxKey>) -> Result<(), OperationError> {
1477        if self.is_idx_slopeyness_generated()? {
1478            trace!("Indexing slopes available");
1479        } else {
1480            warn!("No indexing slopes available. You should consider reindexing to generate these");
1481        };
1482
1483        // TODO: I think anytime we update idx meta is when we should reindex in memory
1484        // indexes.
1485        // Probably needs to be similar to create_idxs so we iterate over the set of
1486        // purely in memory idxs.
1487
1488        // Setup idxkeys here. By default we set these all to "max slope" aka
1489        // all indexes are "equal" but also worse case unless analysed. If they
1490        // have been analysed, we can set the slope factor into here.
1491        let mut idxkeys = idxkeys
1492            .into_iter()
1493            .map(|k| self.get_idx_slope(&k).map(|slope| (k, slope)))
1494            .collect::<Result<Map<_, _>, _>>()?;
1495
1496        std::mem::swap(&mut self.idxmeta_wr.deref_mut().idxkeys, &mut idxkeys);
1497        Ok(())
1498    }
1499
1500    // Should take a mut index set, and then we write the whole thing back
1501    // in a single stripe.
1502    //
1503    // So we need a cache, which we load indexes into as we do ops, then we
1504    // modify them.
1505    //
    // At the end, we flush those changes out in a single run.
    // For create operations this is probably a single pass per index.
1508    // TODO: Can this be improved?
    /// Apply index updates for a single entry transition.
    ///
    /// The `(pre, post)` pair encodes the operation:
    /// * `(None, Some(post))` - create: add all of the entry's index values.
    /// * `(Some(pre), None)` - delete: remove all of the entry's index values.
    /// * `(Some(pre), Some(post))` - modify: apply only the differences.
    /// * `(None, None)` - invalid; returns `OperationError::InvalidState`.
    ///
    /// Recycled and tombstoned entries are masked out of the name2uuid,
    /// externalid2uuid, uuid2spn and uuid2rdn maps (via `mask_recycled_ts`)
    /// so they behave as "deleted" for lookup purposes while the attribute
    /// indexes still see them.
    #[allow(clippy::cognitive_complexity)]
    fn entry_index(
        &mut self,
        pre: Option<&EntrySealedCommitted>,
        post: Option<&EntrySealedCommitted>,
    ) -> Result<(), OperationError> {
        // Determine the uuid/id to index against, and whether the uuid is
        // stable across the change. A uuid change only arises during a
        // replication conflict (see below).
        let (e_uuid, e_id, uuid_same) = match (pre, post) {
            (None, None) => {
                admin_error!("Invalid call to entry_index - no entries provided");
                return Err(OperationError::InvalidState);
            }
            (Some(pre), None) => {
                trace!("Attempting to remove entry indexes");
                (pre.get_uuid(), pre.get_id(), true)
            }
            (None, Some(post)) => {
                trace!("Attempting to create entry indexes");
                (post.get_uuid(), post.get_id(), true)
            }
            (Some(pre), Some(post)) => {
                trace!("Attempting to modify entry indexes");
                // A modify must never change the backend row id.
                assert_eq!(pre.get_id(), post.get_id());
                (
                    post.get_uuid(),
                    post.get_id(),
                    pre.get_uuid() == post.get_uuid(),
                )
            }
        };

        // Update the names/uuid maps. These have to mask out entries
        // that are recycled or tombstones, so these pretend as "deleted"
        // and can trigger correct actions.

        let mask_pre = pre.and_then(|e| e.mask_recycled_ts());
        let mask_pre = if !uuid_same {
            // Okay, so if the uuids are different this is probably from
            // a replication conflict.  We can't just use the normal none/some
            // check from the Entry::idx functions as they only yield partial
            // changes. Because the uuid is changing, we have to treat pre
            // as a deleting entry, regardless of what state post is in.
            let uuid = mask_pre.map(|e| e.get_uuid()).ok_or_else(|| {
                admin_error!("Invalid entry state - possible memory corruption");
                OperationError::InvalidState
            })?;

            let (n2u_add, n2u_rem) = Entry::idx_name2uuid_diff(mask_pre, None);
            // There will never be content to add.
            assert!(n2u_add.is_none());

            let (eid2u_add, eid2u_rem) = Entry::idx_externalid2uuid_diff(mask_pre, None);
            // There will never be content to add.
            assert!(eid2u_add.is_none());

            let u2s_act = Entry::idx_uuid2spn_diff(mask_pre, None);
            let u2r_act = Entry::idx_uuid2rdn_diff(mask_pre, None);

            trace!(?n2u_rem, ?eid2u_rem, ?u2s_act, ?u2r_act,);

            // Write the changes out to the backend
            if let Some(rem) = n2u_rem {
                self.idlayer.write_name2uuid_rem(rem)?
            }

            if let Some(rem) = eid2u_rem {
                self.idlayer.write_externalid2uuid_rem(rem)?
            }

            // Some(Ok(_)) means set/update the value; Some(Err(_)) means
            // the value must be removed; None means no change is needed.
            match u2s_act {
                None => {}
                Some(Ok(k)) => self.idlayer.write_uuid2spn(uuid, Some(k))?,
                Some(Err(_)) => self.idlayer.write_uuid2spn(uuid, None)?,
            }

            match u2r_act {
                None => {}
                Some(Ok(k)) => self.idlayer.write_uuid2rdn(uuid, Some(k))?,
                Some(Err(_)) => self.idlayer.write_uuid2rdn(uuid, None)?,
            }
            // Return none, mask_pre is now completed.
            None
        } else {
            // Return the state.
            mask_pre
        };

        // Now diff the (possibly already-consumed) pre state against post
        // for the name/uuid maps, and write any deltas out.
        let mask_post = post.and_then(|e| e.mask_recycled_ts());
        let (n2u_add, n2u_rem) = Entry::idx_name2uuid_diff(mask_pre, mask_post);
        let (eid2u_add, eid2u_rem) = Entry::idx_externalid2uuid_diff(mask_pre, mask_post);

        let u2s_act = Entry::idx_uuid2spn_diff(mask_pre, mask_post);
        let u2r_act = Entry::idx_uuid2rdn_diff(mask_pre, mask_post);

        trace!(
            ?n2u_add,
            ?n2u_rem,
            ?eid2u_add,
            ?eid2u_rem,
            ?u2s_act,
            ?u2r_act
        );

        // Write the changes out to the backend
        if let Some(add) = n2u_add {
            self.idlayer.write_name2uuid_add(e_uuid, add)?
        }
        if let Some(rem) = n2u_rem {
            self.idlayer.write_name2uuid_rem(rem)?
        }

        if let Some(add) = eid2u_add {
            self.idlayer.write_externalid2uuid_add(e_uuid, add)?
        }
        if let Some(rem) = eid2u_rem {
            self.idlayer.write_externalid2uuid_rem(rem)?
        }

        match u2s_act {
            None => {}
            Some(Ok(k)) => self.idlayer.write_uuid2spn(e_uuid, Some(k))?,
            Some(Err(_)) => self.idlayer.write_uuid2spn(e_uuid, None)?,
        }

        match u2r_act {
            None => {}
            Some(Ok(k)) => self.idlayer.write_uuid2rdn(e_uuid, Some(k))?,
            Some(Err(_)) => self.idlayer.write_uuid2rdn(e_uuid, None)?,
        }

        // Extremely Cursed - Okay, we know that self.idxmeta will NOT be changed
        // in this function, but we need to borrow self as mut for the caches in
        // get_idl to work. As a result, this causes a double borrow. To work around
        // this we discard the lifetime on idxmeta, because we know that it will
        // remain constant for the life of the operation.
        //
        // SAFETY: nothing in the remainder of this function mutates
        // self.idxmeta_wr.idxkeys, so this shared reborrow cannot alias a
        // mutable access.

        let idxmeta = unsafe { &(*(&self.idxmeta_wr.idxkeys as *const _)) };

        // Compute the attribute-index deltas: Ok means "add e_id to this idl",
        // Err means "remove e_id from this idl".
        let idx_diff = Entry::idx_diff(idxmeta, pre, post);

        idx_diff.into_iter()
            .try_for_each(|act| {
                match act {
                    Ok((attr, itype, idx_key)) => {
                        trace!("Adding {:?} idx -> {:?}: {:?}", itype, attr, idx_key);
                        match self.idlayer.get_idl(attr, itype, &idx_key)? {
                            Some(mut idl) => {
                                idl.insert_id(e_id);
                                if cfg!(debug_assertions)
                                    && *attr == Attribute::Uuid && itype == IndexType::Equality {
                                        // This means a duplicate UUID has appeared in the index.
                                        if idl.len() > 1 {
                                            trace!(duplicate_idl = ?idl, ?idx_key);
                                        }
                                        debug_assert!(idl.len() <= 1);
                                }
                                self.idlayer.write_idl(attr, itype, &idx_key, &idl)
                            }
                            None => {
                                // Missing index table: warn rather than fail, the
                                // admin must reindex to recover.
                                warn!(
                                    "WARNING: index {:?} {:?} was not found. YOU MUST REINDEX YOUR DATABASE",
                                    attr, itype
                                );
                                Ok(())
                            }
                        }
                    }
                    Err((attr, itype, idx_key)) => {
                        trace!("Removing {:?} idx -> {:?}: {:?}", itype, attr, idx_key);
                        match self.idlayer.get_idl(attr, itype, &idx_key)? {
                            Some(mut idl) => {
                                idl.remove_id(e_id);
                                if cfg!(debug_assertions) && *attr == Attribute::Uuid && itype == IndexType::Equality {
                                        // This means a duplicate UUID has appeared in the index.
                                        if idl.len() > 1 {
                                            trace!(duplicate_idl = ?idl, ?idx_key);
                                        }
                                        debug_assert!(idl.len() <= 1);
                                }
                                self.idlayer.write_idl(attr, itype, &idx_key, &idl)
                            }
                            None => {
                                warn!(
                                    "WARNING: index {:?} {:?} was not found. YOU MUST REINDEX YOUR DATABASE",
                                    attr, itype
                                );
                                Ok(())
                            }
                        }
                    }
                }
            })
        // End try_for_each
    }
1702
1703    #[allow(dead_code)]
1704    fn missing_idxs(&mut self) -> Result<Vec<(Attribute, IndexType)>, OperationError> {
1705        let idx_table_list = self.get_idlayer().list_idxs()?;
1706
1707        // Turn the vec to a real set
1708        let idx_table_set: HashSet<_> = idx_table_list.into_iter().collect();
1709
1710        let missing: Vec<_> = self
1711            .idxmeta_wr
1712            .idxkeys
1713            .keys()
1714            .filter_map(|ikey| {
1715                // what would the table name be?
1716                let tname = format!("idx_{}_{}", ikey.itype.as_idx_str(), ikey.attr.as_str());
1717                trace!("Checking for {}", tname);
1718
1719                if idx_table_set.contains(&tname) {
1720                    None
1721                } else {
1722                    Some((ikey.attr.clone(), ikey.itype))
1723                }
1724            })
1725            .collect();
1726        Ok(missing)
1727    }
1728
1729    fn create_idxs(&mut self) -> Result<(), OperationError> {
1730        // Create name2uuid and uuid2name
1731        trace!("Creating index -> name2uuid");
1732        self.idlayer.create_name2uuid()?;
1733
1734        trace!("Creating index -> externalid2uuid");
1735        self.idlayer.create_externalid2uuid()?;
1736
1737        trace!("Creating index -> uuid2spn");
1738        self.idlayer.create_uuid2spn()?;
1739
1740        trace!("Creating index -> uuid2rdn");
1741        self.idlayer.create_uuid2rdn()?;
1742
1743        self.idxmeta_wr
1744            .idxkeys
1745            .keys()
1746            .try_for_each(|ikey| self.idlayer.create_idx(&ikey.attr, ikey.itype))
1747    }
1748
1749    pub fn upgrade_reindex(&mut self, v: i64) -> Result<(), OperationError> {
1750        let dbv = self.get_db_index_version()?;
1751        admin_debug!(?dbv, ?v, "upgrade_reindex");
1752        if dbv < v {
1753            self.reindex(false)?;
1754            self.set_db_index_version(v)
1755        } else {
1756            Ok(())
1757        }
1758    }
1759
1760    #[instrument(level = "info", skip_all)]
1761    pub fn reindex(&mut self, immediate: bool) -> Result<(), OperationError> {
1762        let notice_immediate = immediate || (cfg!(not(test)) && cfg!(not(debug_assertions)));
1763
1764        info!(
1765            immediate = notice_immediate,
1766            "System reindex: started - this may take a long time!"
1767        );
1768
1769        // Purge the idxs
1770        // TODO: Purge in memory idxs.
1771        self.idlayer.danger_purge_idxs()?;
1772
1773        // Using the index metadata on the txn, create all our idx tables
1774        // TODO: Needs to create all the in memory indexes.
1775        self.create_idxs()?;
1776
1777        // Now, we need to iterate over everything in id2entry and index them
1778        // Future idea: Do this in batches of X amount to limit memory
1779        // consumption.
1780        let idl = IdList::AllIds;
1781        let entries = self.idlayer.get_identry(&idl).inspect_err(|err| {
1782            error!(?err, "get_identry failure");
1783        })?;
1784
1785        let mut count = 0;
1786
1787        // This is the longest phase of reindexing, so we have a "progress" display here.
1788        entries
1789            .iter()
1790            .try_for_each(|e| {
1791                if immediate {
1792                    count += 1;
1793                    if count % 2500 == 0 {
1794                        eprint!("{count}");
1795                    } else if count % 250 == 0 {
1796                        eprint!(".");
1797                    }
1798                }
1799
1800                self.entry_index(None, Some(e))
1801            })
1802            .inspect_err(|err| {
1803                error!(?err, "reindex failed");
1804            })?;
1805
1806        if immediate {
1807            eprintln!(" done ✅");
1808        }
1809
1810        info!(immediate, "Reindexed {count} entries");
1811
1812        info!("Optimising Indexes: started");
1813        self.idlayer.optimise_dirty_idls();
1814        info!("Optimising Indexes: complete ✅");
1815        info!("Calculating Index Optimisation Slopes: started");
1816        self.idlayer.analyse_idx_slopes().inspect_err(|err| {
1817            error!(?err, "index optimisation failed");
1818        })?;
1819        info!("Calculating Index Optimisation Slopes: complete ✅");
1820        info!("System reindex: complete 🎉");
1821        Ok(())
1822    }
1823
1824    /// ⚠️  - This function will destroy all indexes in the database.
1825    ///
1826    /// It should only be called internally by the backend in limited and
1827    /// specific situations.
1828    fn danger_purge_idxs(&mut self) -> Result<(), OperationError> {
1829        self.get_idlayer().danger_purge_idxs()
1830    }
1831
1832    /// ⚠️  - This function will destroy all entries and indexes in the database.
1833    ///
1834    /// It should only be called internally by the backend in limited and
1835    /// specific situations.
1836    pub(crate) fn danger_delete_all_db_content(&mut self) -> Result<(), OperationError> {
1837        self.get_ruv().clear();
1838        self.get_idlayer()
1839            .danger_purge_id2entry()
1840            .and_then(|_| self.danger_purge_idxs())
1841    }
1842
1843    #[cfg(test)]
1844    pub fn load_test_idl(
1845        &mut self,
1846        attr: &Attribute,
1847        itype: IndexType,
1848        idx_key: &str,
1849    ) -> Result<Option<IDLBitRange>, OperationError> {
1850        self.get_idlayer().get_idl(attr, itype, idx_key)
1851    }
1852
1853    fn is_idx_slopeyness_generated(&mut self) -> Result<bool, OperationError> {
1854        self.get_idlayer().is_idx_slopeyness_generated()
1855    }
1856
1857    fn get_idx_slope(&mut self, ikey: &IdxKey) -> Result<IdxSlope, OperationError> {
1858        // Do we have the slopeyness?
1859        let slope = self
1860            .get_idlayer()
1861            .get_idx_slope(ikey)?
1862            .unwrap_or_else(|| get_idx_slope_default(ikey));
1863        trace!("index slope - {:?} -> {:?}", ikey, slope);
1864        Ok(slope)
1865    }
1866
    /// Restore the database from a backup stream.
    ///
    /// Deserialises a `DbBackup` document (optionally gzip-compressed),
    /// destroys all current database content, replays the backup's identity
    /// metadata and entries, rebuilds the RUV from the backup's replication
    /// metadata, and finally runs a consistency verification.
    ///
    /// Only `DbBackup::V5` backups carry a version marker; anything older is
    /// rejected with `DB0002MismatchedRestoreVersion`, and a V5 backup from a
    /// different server series is rejected with
    /// `DB0001MismatchedRestoreVersion`.
    pub fn restore<IN>(
        &mut self,
        input: IN,
        compression: BackupCompression,
    ) -> Result<(), OperationError>
    where
        IN: std::io::Read,
    {
        // load all entries into RAM, may need to change this later
        // if the size of the database compared to RAM is an issue

        let dbbak_option: Result<DbBackup, serde_json::Error> = match compression {
            BackupCompression::NoCompression => serde_json::from_reader(input),
            BackupCompression::Gzip => {
                let decoder = flate2::read::GzDecoder::new(input);
                serde_json::from_reader(decoder)
            }
        };

        let dbbak = dbbak_option.map_err(|e| {
            admin_error!("serde_json error {:?}", e);
            OperationError::SerdeJsonError
        })?;

        // Only wipe the existing database once the backup has parsed
        // successfully.
        self.danger_delete_all_db_content().map_err(|e| {
            error!("delete_all_db_content failed {:?}", e);
            e
        })?;

        let idlayer = self.get_idlayer();

        // Unpack the backup per format version. Each newer version persists
        // more identity/replication metadata before yielding the entries.
        let (dbentries, repl_meta, maybe_version) = match dbbak {
            DbBackup::V1(dbentries) => (dbentries, None, None),
            DbBackup::V2 {
                db_s_uuid,
                db_d_uuid,
                db_ts_max,
                entries,
            } => {
                // Restore server/domain identity and timestamp metadata.
                idlayer.write_db_s_uuid(db_s_uuid)?;
                idlayer.write_db_d_uuid(db_d_uuid)?;
                idlayer.set_db_ts_max(db_ts_max)?;
                (entries, None, None)
            }
            DbBackup::V3 {
                db_s_uuid,
                db_d_uuid,
                db_ts_max,
                keyhandles,
                entries,
            } => {
                // V3 adds key handles.
                idlayer.write_db_s_uuid(db_s_uuid)?;
                idlayer.write_db_d_uuid(db_d_uuid)?;
                idlayer.set_db_ts_max(db_ts_max)?;
                idlayer.set_key_handles(keyhandles)?;
                (entries, None, None)
            }
            DbBackup::V4 {
                db_s_uuid,
                db_d_uuid,
                db_ts_max,
                keyhandles,
                repl_meta,
                entries,
            } => {
                // V4 adds replication metadata (the RUV).
                idlayer.write_db_s_uuid(db_s_uuid)?;
                idlayer.write_db_d_uuid(db_d_uuid)?;
                idlayer.set_db_ts_max(db_ts_max)?;
                idlayer.set_key_handles(keyhandles)?;
                (entries, Some(repl_meta), None)
            }
            DbBackup::V5 {
                version,
                db_s_uuid,
                db_d_uuid,
                db_ts_max,
                keyhandles,
                repl_meta,
                entries,
            } => {
                // V5 adds the server-series version marker checked below.
                idlayer.write_db_s_uuid(db_s_uuid)?;
                idlayer.write_db_d_uuid(db_d_uuid)?;
                idlayer.set_db_ts_max(db_ts_max)?;
                idlayer.set_key_handles(keyhandles)?;
                (entries, Some(repl_meta), Some(version))
            }
        };

        if let Some(version) = maybe_version {
            if version != env!("KANIDM_PKG_SERIES") {
                error!("The provided backup data is from server version {} and is unable to be restored on this instance ({})", version, env!("KANIDM_PKG_SERIES"));
                return Err(OperationError::DB0001MismatchedRestoreVersion);
            }
        } else {
            // Pre-V5 backups carry no version marker and are rejected.
            error!("The provided backup data is from an older server version and is unable to be restored.");
            return Err(OperationError::DB0002MismatchedRestoreVersion);
        };

        // Rebuild the RUV from the backup.
        match repl_meta {
            Some(DbReplMeta::V1 { ruv: db_ruv }) => {
                self.get_ruv()
                    .restore(db_ruv.into_iter().map(|db_cid| db_cid.into()))?;
            }
            None => {
                warn!("Unable to restore replication metadata, this server may need a refresh.");
            }
        }

        info!("Restoring {} entries ...", dbentries.len());

        // Now, we setup all the entries with new ids.
        let mut id_max = 0;
        let identries: Result<Vec<IdRawEntry>, _> = dbentries
            .iter()
            .map(|e| {
                id_max += 1;
                // NOTE(review): a serde_json failure here maps to
                // SerdeCborError, not SerdeJsonError - looks like a holdover
                // from an earlier serialisation format; confirm before changing.
                let data = serde_json::to_vec(&e).map_err(|_| OperationError::SerdeCborError)?;
                Ok(IdRawEntry { id: id_max, data })
            })
            .collect();

        let idlayer = self.get_idlayer();

        idlayer.write_identries_raw(identries?.into_iter())?;

        info!("Restored {} entries", dbentries.len());

        // Verify the restored database is internally consistent before
        // declaring success.
        let vr = self.verify();
        if vr.is_empty() {
            Ok(())
        } else {
            Err(OperationError::ConsistencyError(
                vr.into_iter().filter_map(|v| v.err()).collect(),
            ))
        }
    }
2008
2009    /// If any RUV elements are present in the DB, load them now. This provides us with
2010    /// the RUV boundaries and change points from previous operations of the server, so
2011    /// that ruv_rebuild can "fill in" the gaps.
2012    ///
2013    /// # SAFETY
2014    ///
2015    /// Note that you should only call this function during the server startup
2016    /// to reload the RUV data from the entries of the database.
2017    ///
2018    /// Before calling this, the in memory ruv MUST be clear.
2019    #[instrument(level = "debug", name = "be::ruv_rebuild", skip_all)]
2020    fn ruv_reload(&mut self) -> Result<(), OperationError> {
2021        let idlayer = self.get_idlayer();
2022
2023        let db_ruv = idlayer.get_db_ruv()?;
2024
2025        // Setup the CID's that existed previously. We don't need to know what entries
2026        // they affect, we just need them to ensure that we have ranges for replication
2027        // comparison to take effect properly.
2028        self.get_ruv().restore(db_ruv)?;
2029
2030        // Then populate the RUV with the data from the entries.
2031        self.ruv_rebuild()
2032    }
2033
2034    #[instrument(level = "debug", name = "be::ruv_rebuild", skip_all)]
2035    fn ruv_rebuild(&mut self) -> Result<(), OperationError> {
2036        // Rebuild the ruv!
2037        // For now this has to read from all the entries in the DB, but in the future
2038        // we'll actually store this properly (?). If it turns out this is really fast
2039        // we may just rebuild this always on startup.
2040
2041        // NOTE: An important detail is that we don't rely on indexes here!
2042
2043        let idl = IdList::AllIds;
2044        let entries = self.get_idlayer().get_identry(&idl).map_err(|e| {
2045            admin_error!(?e, "get_identry failed");
2046            e
2047        })?;
2048
2049        self.get_ruv().rebuild(&entries)?;
2050
2051        Ok(())
2052    }
2053
2054    pub fn quarantine_entry(&mut self, id: u64) -> Result<(), OperationError> {
2055        self.get_idlayer().quarantine_entry(id)?;
2056        // We have to set the index version to 0 so that on next start we force
2057        // a reindex to automatically occur.
2058        self.set_db_index_version(0)
2059    }
2060
2061    pub fn restore_quarantined(&mut self, id: u64) -> Result<(), OperationError> {
2062        self.get_idlayer().restore_quarantined(id)?;
2063        // We have to set the index version to 0 so that on next start we force
2064        // a reindex to automatically occur.
2065        self.set_db_index_version(0)
2066    }
2067
2068    #[cfg(any(test, debug_assertions))]
2069    pub fn clear_cache(&mut self) -> Result<(), OperationError> {
2070        self.get_idlayer().clear_cache()
2071    }
2072
2073    pub fn commit(self) -> Result<(), OperationError> {
2074        let BackendWriteTransaction {
2075            mut idlayer,
2076            idxmeta_wr,
2077            ruv,
2078        } = self;
2079
2080        // write the ruv content back to the db.
2081        idlayer.write_db_ruv(ruv.added(), ruv.removed())?;
2082
2083        idlayer.commit().map(|()| {
2084            ruv.commit();
2085            idxmeta_wr.commit();
2086        })
2087    }
2088
2089    pub(crate) fn reset_db_s_uuid(&mut self) -> Result<Uuid, OperationError> {
2090        // The value is missing. Generate a new one and store it.
2091        let nsid = Uuid::new_v4();
2092        self.get_idlayer().write_db_s_uuid(nsid).map_err(|err| {
2093            error!(?err, "Unable to persist server uuid");
2094            err
2095        })?;
2096        Ok(nsid)
2097    }
2098    pub fn get_db_s_uuid(&mut self) -> Result<Uuid, OperationError> {
2099        let res = self.get_idlayer().get_db_s_uuid().map_err(|err| {
2100            error!(?err, "Failed to read server uuid");
2101            err
2102        })?;
2103        match res {
2104            Some(s_uuid) => Ok(s_uuid),
2105            None => self.reset_db_s_uuid(),
2106        }
2107    }
2108
2109    /// This generates a new domain UUID and stores it into the database,
2110    /// returning the new UUID
2111    fn reset_db_d_uuid(&mut self) -> Result<Uuid, OperationError> {
2112        let nsid = Uuid::new_v4();
2113        self.get_idlayer().write_db_d_uuid(nsid).map_err(|err| {
2114            error!(?err, "Unable to persist domain uuid");
2115            err
2116        })?;
2117        Ok(nsid)
2118    }
2119
2120    /// Manually set a new domain UUID and store it into the DB. This is used
2121    /// as part of a replication refresh.
2122    pub fn set_db_d_uuid(&mut self, nsid: Uuid) -> Result<(), OperationError> {
2123        self.get_idlayer().write_db_d_uuid(nsid)
2124    }
2125
2126    /// This pulls the domain UUID from the database
2127    pub fn get_db_d_uuid(&mut self) -> Result<Uuid, OperationError> {
2128        let res = self.get_idlayer().get_db_d_uuid().map_err(|err| {
2129            error!(?err, "Failed to read domain uuid");
2130            err
2131        })?;
2132        match res {
2133            Some(d_uuid) => Ok(d_uuid),
2134            None => self.reset_db_d_uuid(),
2135        }
2136    }
2137
2138    pub fn set_db_ts_max(&mut self, ts: Duration) -> Result<(), OperationError> {
2139        self.get_idlayer().set_db_ts_max(ts)
2140    }
2141
2142    pub fn get_db_ts_max(&mut self, ts: Duration) -> Result<Duration, OperationError> {
2143        // if none, return ts. If found, return it.
2144        match self.get_idlayer().get_db_ts_max()? {
2145            Some(dts) => Ok(dts),
2146            None => Ok(ts),
2147        }
2148    }
2149
2150    fn get_db_index_version(&mut self) -> Result<i64, OperationError> {
2151        self.get_idlayer().get_db_index_version()
2152    }
2153
2154    fn set_db_index_version(&mut self, v: i64) -> Result<(), OperationError> {
2155        self.get_idlayer().set_db_index_version(v)
2156    }
2157}
2158
2159// We have a number of hardcoded, "obvious" slopes that should
2160// exist. We return these when the analysis has not been run, as
2161// these are values that are generally "good enough" for most applications
2162fn get_idx_slope_default(ikey: &IdxKey) -> IdxSlope {
2163    match (ikey.attr.as_str(), &ikey.itype) {
2164        (ATTR_NAME, IndexType::Equality)
2165        | (ATTR_SPN, IndexType::Equality)
2166        | (ATTR_UUID, IndexType::Equality) => 1,
2167        (ATTR_CLASS, IndexType::Equality) => 180,
2168        (_, IndexType::Equality) => 45,
2169        (_, IndexType::SubString) => 90,
2170        (_, IndexType::Presence) => 90,
2171        (_, IndexType::Ordering) => 120,
2172    }
2173}
2174
2175// In the future this will do the routing between the chosen backends etc.
2176impl Backend {
    /// Construct the backend: open (or create) the underlying idlayer
    /// storage, seed the index metadata with default slopes, run the storage
    /// setup transaction, and reload/rebuild the in-memory RUV.
    ///
    /// `vacuum` is forwarded to the idlayer open; an empty `cfg.path` selects
    /// in-memory operation and forces a pool size of 1.
    #[instrument(level = "debug", name = "be::new", skip_all)]
    pub fn new(
        mut cfg: BackendConfig,
        // path: &str,
        // mut pool_size: u32,
        // fstype: FsType,
        idxkeys: Vec<IdxKey>,
        vacuum: bool,
    ) -> Result<Self, OperationError> {
        debug!(db_tickets = ?cfg.pool_size, profile = %env!("KANIDM_PROFILE_NAME"), cpu_flags = %env!("KANIDM_CPU_FLAGS"));

        // If in memory, reduce pool to 1
        if cfg.path.as_os_str().is_empty() {
            cfg.pool_size = 1;
        }

        // Setup idxkeys here. By default we set these all to "max slope" aka
        // all indexes are "equal" but also worse case unless analysed.
        //
        // During startup this will be "fixed" as the schema core will call reload_idxmeta
        // which will trigger a reload of the analysis data (if present).
        let idxkeys: Map<_, _> = idxkeys
            .into_iter()
            .map(|ikey| {
                let slope = get_idx_slope_default(&ikey);
                (ikey, slope)
            })
            .collect();

        // Load the replication update vector here. Initially we build an in memory
        // RUV, and then we load it from the DB.
        let ruv = Arc::new(ReplicationUpdateVector::default());

        // this has a ::memory() type, but will path == "" work?
        let idlayer = Arc::new(IdlArcSqlite::new(&cfg, vacuum)?);
        let be = Backend {
            cfg,
            idlayer,
            ruv,
            idxmeta: Arc::new(CowCell::new(IdxMeta::new(idxkeys))),
        };

        // Now complete our setup with a txn
        // In this case we can use an empty idx meta because we don't
        // access any parts of
        // the indexing subsystem here.
        let mut idl_write = be.idlayer.write()?;
        idl_write
            .setup()
            .and_then(|_| idl_write.commit())
            .map_err(|e| {
                admin_error!(?e, "Failed to setup idlayer");
                e
            })?;

        // Load/generate any in memory indexes.
        // I think here we don't actually care about in memory indexes until
        // later?

        // Now rebuild the ruv.
        // This needs a second, full backend write transaction (not just an
        // idlayer one) since ruv_reload touches both the storage and the RUV.
        let mut be_write = be.write()?;
        be_write
            .ruv_reload()
            .and_then(|_| be_write.commit())
            .map_err(|e| {
                admin_error!(?e, "Failed to reload ruv");
                e
            })?;

        Ok(be)
    }
2248
2249    pub fn get_pool_size(&self) -> u32 {
2250        debug_assert!(self.cfg.pool_size > 0);
2251        self.cfg.pool_size
2252    }
2253
2254    pub fn try_quiesce(&self) {
2255        self.idlayer.try_quiesce();
2256    }
2257
2258    pub fn read(&self) -> Result<BackendReadTransaction<'_>, OperationError> {
2259        Ok(BackendReadTransaction {
2260            idlayer: self.idlayer.read()?,
2261            idxmeta: self.idxmeta.read(),
2262            ruv: self.ruv.read(),
2263        })
2264    }
2265
2266    pub fn write(&self) -> Result<BackendWriteTransaction<'_>, OperationError> {
2267        Ok(BackendWriteTransaction {
2268            idlayer: self.idlayer.write()?,
2269            idxmeta_wr: self.idxmeta.write(),
2270            ruv: self.ruv.write(),
2271        })
2272    }
2273}
2274
2275// What are the possible actions we'll receive here?
2276
2277#[cfg(test)]
2278mod tests {
2279    use super::super::entry::{Entry, EntryInit, EntryNew};
2280    use super::Limits;
2281    use super::{
2282        Backend, BackendConfig, BackendTransaction, BackendWriteTransaction, DbBackup, IdList,
2283        IdxKey, OperationError,
2284    };
2285    use crate::prelude::*;
2286    use crate::repl::cid::Cid;
2287    use crate::value::{IndexType, PartialValue, Value};
2288    use idlset::v2::IDLBitRange;
2289    use kanidm_proto::backup::BackupCompression;
2290    use std::iter::FromIterator;
2291    use std::sync::{Arc, LazyLock};
2292    use std::time::Duration;
2293
2294    static CID_ZERO: LazyLock<Cid> = LazyLock::new(|| Cid::new_zero());
2295    static CID_ONE: LazyLock<Cid> = LazyLock::new(|| Cid::new_count(1));
2296    static CID_TWO: LazyLock<Cid> = LazyLock::new(|| Cid::new_count(2));
2297    static CID_THREE: LazyLock<Cid> = LazyLock::new(|| Cid::new_count(3));
2298    static CID_ADV: LazyLock<Cid> = LazyLock::new(|| Cid::new_count(10));
2299
    // Test harness: builds a fresh test backend with a demo set of index
    // keys (name eq/pres/sub, uuid eq/pres, testattr eq, testnumber eq —
    // seven keys total, which `missing_idxs` tests rely on), opens a write
    // transaction, runs `$test_fn` against it, and asserts the commit
    // succeeds. Returns whatever the closure returns.
    macro_rules! run_test {
        ($test_fn:expr) => {{
            sketching::test_init();

            // This is a demo idxmeta, purely for testing.
            let idxmeta = vec![
                IdxKey {
                    attr: Attribute::Name.into(),
                    itype: IndexType::Equality,
                },
                IdxKey {
                    attr: Attribute::Name.into(),
                    itype: IndexType::Presence,
                },
                IdxKey {
                    attr: Attribute::Name.into(),
                    itype: IndexType::SubString,
                },
                IdxKey {
                    attr: Attribute::Uuid.into(),
                    itype: IndexType::Equality,
                },
                IdxKey {
                    attr: Attribute::Uuid.into(),
                    itype: IndexType::Presence,
                },
                IdxKey {
                    attr: Attribute::TestAttr.into(),
                    itype: IndexType::Equality,
                },
                IdxKey {
                    attr: Attribute::TestNumber.into(),
                    itype: IndexType::Equality,
                },
            ];

            let be = Backend::new(BackendConfig::new_test("main"), idxmeta, false)
                .expect("Failed to setup backend");

            let mut be_txn = be.write().unwrap();

            let r = $test_fn(&mut be_txn);
            // Commit, to guarantee it worked.
            assert!(be_txn.commit().is_ok());
            r
        }};
    }
2347
    // True if an entry matching `$ent`'s uuid can be found via search.
    // The lookup filter is derived from the entry's own Uuid attribute.
    macro_rules! entry_exists {
        ($be:expr, $ent:expr) => {{
            let ei = $ent.clone().into_sealed_committed();
            let filt = ei
                .filter_from_attrs(&[Attribute::Uuid.into()])
                .expect("failed to generate filter")
                .into_valid_resolved();
            let lims = Limits::unlimited();
            let entries = $be.search(&lims, &filt).expect("failed to search");
            entries.first().is_some()
        }};
    }
2360
    // True if the entry matching `$ent` exists AND has `$attr` present.
    // Note: the lookup filter is built from Attribute::UserId (not Uuid),
    // so `$ent` must carry a userid value for the search to locate it.
    macro_rules! entry_attr_pres {
        ($be:expr, $ent:expr, $attr:expr) => {{
            let ei = $ent.clone().into_sealed_committed();
            let filt = ei
                .filter_from_attrs(&[Attribute::UserId.into()])
                .expect("failed to generate filter")
                .into_valid_resolved();
            let lims = Limits::unlimited();
            let entries = $be.search(&lims, &filt).expect("failed to search");
            match entries.first() {
                Some(ent) => ent.attribute_pres($attr),
                None => false,
            }
        }};
    }
2376
    // Assert the stored id list for the ($attr, $itype, $idx_key) index
    // slot equals `$expect`. `$expect` is an Option<Vec<u64>>: Some(ids)
    // for an existing (possibly empty) idl, None when the index table
    // itself is absent.
    macro_rules! idl_state {
        ($be:expr, $attr:expr, $itype:expr, $idx_key:expr, $expect:expr) => {{
            let t_idl = $be
                .load_test_idl(&$attr, $itype, &$idx_key.to_string())
                .expect("IdList Load failed");
            let t = $expect.map(|v: Vec<u64>| IDLBitRange::from_iter(v));
            assert_eq!(t_idl, t);
        }};
    }
2386
2387    #[test]
2388    fn test_be_simple_create() {
2389        run_test!(|be: &mut BackendWriteTransaction| {
2390            trace!("Simple Create");
2391
2392            let empty_result = be.create(&CID_ZERO, Vec::with_capacity(0));
2393            trace!("{:?}", empty_result);
2394            assert_eq!(empty_result, Err(OperationError::EmptyRequest));
2395
2396            let mut e: Entry<EntryInit, EntryNew> = Entry::new();
2397            e.add_ava(Attribute::UserId, Value::from("william"));
2398            e.add_ava(
2399                Attribute::Uuid,
2400                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
2401            );
2402            let e = e.into_sealed_new();
2403
2404            let single_result = be.create(&CID_ZERO, vec![e.clone()]);
2405
2406            assert!(single_result.is_ok());
2407
2408            // Construct a filter
2409            assert!(entry_exists!(be, e));
2410        });
2411    }
2412
2413    #[test]
2414    fn test_be_simple_search() {
2415        run_test!(|be: &mut BackendWriteTransaction| {
2416            trace!("Simple Search");
2417
2418            let mut e: Entry<EntryInit, EntryNew> = Entry::new();
2419            e.add_ava(Attribute::UserId, Value::from("claire"));
2420            e.add_ava(
2421                Attribute::Uuid,
2422                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
2423            );
2424            let e = e.into_sealed_new();
2425
2426            let single_result = be.create(&CID_ZERO, vec![e]);
2427            assert!(single_result.is_ok());
2428            // Test a simple EQ search
2429
2430            let filt = filter_resolved!(f_eq(Attribute::UserId, PartialValue::new_utf8s("claire")));
2431
2432            let lims = Limits::unlimited();
2433
2434            let r = be.search(&lims, &filt);
2435            assert!(r.expect("Search failed!").len() == 1);
2436
2437            // Test empty search
2438
2439            // Test class pres
2440
2441            // Search with no results
2442        });
2443    }
2444
    #[test]
    fn test_be_search_with_invalid() {
        run_test!(|be: &mut BackendWriteTransaction| {
            trace!("Simple Search");

            let mut e: Entry<EntryInit, EntryNew> = Entry::new();
            e.add_ava(Attribute::UserId, Value::from("bagel"));
            e.add_ava(
                Attribute::Uuid,
                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
            );
            let e = e.into_sealed_new();

            let single_result = be.create(&CID_ZERO, vec![e]);
            assert!(single_result.is_ok());

            // Test Search with or condition including invalid attribute.
            // The invalid clause can never match, but the valid eq clause
            // still finds the entry.
            let filt = filter_resolved!(f_or(vec![
                f_eq(Attribute::UserId, PartialValue::new_utf8s("bagel")),
                f_invalid(Attribute::UserId)
            ]));

            let lims = Limits::unlimited();

            let r = be.search(&lims, &filt);
            assert!(r.expect("Search failed!").len() == 1);

            // Test Search with and condition including invalid attribute.
            // The invalid clause makes the conjunction unsatisfiable, so
            // nothing is returned.
            let filt = filter_resolved!(f_and(vec![
                f_eq(Attribute::UserId, PartialValue::new_utf8s("bagel")),
                f_invalid(Attribute::UserId)
            ]));

            let lims = Limits::unlimited();

            let r = be.search(&lims, &filt);
            assert!(r.expect("Search failed!").is_empty());
        });
    }
2484
    #[test]
    fn test_be_simple_modify() {
        run_test!(|be: &mut BackendWriteTransaction| {
            trace!("Simple Modify");
            let lims = Limits::unlimited();
            // First create some entries (2).
            let mut e1: Entry<EntryInit, EntryNew> = Entry::new();
            e1.add_ava(Attribute::UserId, Value::from("william"));
            e1.add_ava(
                Attribute::Uuid,
                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
            );

            let mut e2: Entry<EntryInit, EntryNew> = Entry::new();
            e2.add_ava(Attribute::UserId, Value::from("alice"));
            e2.add_ava(
                Attribute::Uuid,
                Value::from("4b6228ab-1dbe-42a4-a9f5-f6368222438e"),
            );

            let ve1 = e1.clone().into_sealed_new();
            let ve2 = e2.clone().into_sealed_new();

            assert!(be.create(&CID_ZERO, vec![ve1, ve2]).is_ok());
            assert!(entry_exists!(be, e1));
            assert!(entry_exists!(be, e2));

            // You need to now retrieve the entries back out to get the entry id's
            let mut results = be
                .search(&lims, &filter_resolved!(f_pres(Attribute::UserId)))
                .expect("Failed to search");

            // Get these out to usable entries.
            let r1 = results.remove(0);
            let r2 = results.remove(0);

            let mut r1 = r1.as_ref().clone().into_invalid();
            let mut r2 = r2.as_ref().clone().into_invalid();

            // Modify with an entry that was never assigned an id (err).
            // This is now impossible due to the state machine design.
            // However, with some unsafe ....
            let ue1 = e1.clone().into_sealed_committed();
            assert!(be
                .modify(&CID_ZERO, &[Arc::new(ue1.clone())], &[ue1])
                .is_err());
            // Modify with empty pre/post sets is also an error.
            assert!(be.modify(&CID_ZERO, &[], &[]).is_err());

            // Make some changes to r1, r2. Capture the pre-change state
            // first, since modify needs (pre, post) pairs.
            let pre1 = Arc::new(r1.clone().into_sealed_committed());
            let pre2 = Arc::new(r2.clone().into_sealed_committed());
            r1.add_ava(Attribute::TestAttr, Value::from("modified"));
            r2.add_ava(Attribute::TestAttr, Value::from("modified"));

            // Now ... cheat the state machine to get committed entries.

            let vr1 = r1.into_sealed_committed();
            let vr2 = r2.into_sealed_committed();

            // Modify a single entry.
            assert!(be
                .modify(&CID_ZERO, &[pre1], std::slice::from_ref(&vr1))
                .is_ok());
            // Assert the change landed on r1 only — r2 is untouched.
            assert!(entry_attr_pres!(be, vr1, Attribute::TestAttr));
            assert!(!entry_attr_pres!(be, vr2, Attribute::TestAttr));

            // Modify both entries in one call.
            assert!(be
                .modify(
                    &CID_ZERO,
                    &[Arc::new(vr1.clone()), pre2],
                    &[vr1.clone(), vr2.clone()]
                )
                .is_ok());

            assert!(entry_attr_pres!(be, vr1, Attribute::TestAttr));
            assert!(entry_attr_pres!(be, vr2, Attribute::TestAttr));
        });
    }
2566
    #[test]
    fn test_be_simple_delete() {
        run_test!(|be: &mut BackendWriteTransaction| {
            trace!("Simple Delete");
            let lims = Limits::unlimited();

            // First create some entries (3).
            let mut e1: Entry<EntryInit, EntryNew> = Entry::new();
            e1.add_ava(Attribute::UserId, Value::from("william"));
            e1.add_ava(
                Attribute::Uuid,
                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
            );

            let mut e2: Entry<EntryInit, EntryNew> = Entry::new();
            e2.add_ava(Attribute::UserId, Value::from("alice"));
            e2.add_ava(
                Attribute::Uuid,
                Value::from("4b6228ab-1dbe-42a4-a9f5-f6368222438e"),
            );

            let mut e3: Entry<EntryInit, EntryNew> = Entry::new();
            e3.add_ava(Attribute::UserId, Value::from("lucy"));
            e3.add_ava(
                Attribute::Uuid,
                Value::from("7b23c99d-c06b-4a9a-a958-3afa56383e1d"),
            );

            let ve1 = e1.clone().into_sealed_new();
            let ve2 = e2.clone().into_sealed_new();
            let ve3 = e3.clone().into_sealed_new();

            assert!(be.create(&CID_ZERO, vec![ve1, ve2, ve3]).is_ok());
            assert!(entry_exists!(be, e1));
            assert!(entry_exists!(be, e2));
            assert!(entry_exists!(be, e3));

            // You need to now retrieve the entries back out to get the entry id's
            let mut results = be
                .search(&lims, &filter_resolved!(f_pres(Attribute::UserId)))
                .expect("Failed to search");

            // Get these out to usable entries.
            let r1 = results.remove(0);
            let r2 = results.remove(0);
            let r3 = results.remove(0);

            // Deletes nothing, all entries are live.
            assert!(matches!(be.reap_tombstones(&CID_ADV, &CID_ZERO), Ok(0)));

            // Put them into the tombstone state, and write that down.
            // This sets up the RUV with the changes.
            // r1 is tombstoned at CID_ONE; r2/r3 at CID_TWO.
            let r1_ts = r1.to_tombstone(CID_ONE.clone()).into_sealed_committed();

            assert!(be
                .modify(&CID_ONE, &[r1], std::slice::from_ref(&r1_ts))
                .is_ok());

            let r2_ts = r2.to_tombstone(CID_TWO.clone()).into_sealed_committed();
            let r3_ts = r3.to_tombstone(CID_TWO.clone()).into_sealed_committed();

            assert!(be
                .modify(&CID_TWO, &[r2, r3], &[r2_ts.clone(), r3_ts.clone()])
                .is_ok());

            // The entries are now tombstones, but are still in the ruv. This is because we
            // targeted CID_ZERO, not ONE.
            assert!(matches!(be.reap_tombstones(&CID_ADV, &CID_ZERO), Ok(0)));

            assert!(entry_exists!(be, r1_ts));
            assert!(entry_exists!(be, r2_ts));
            assert!(entry_exists!(be, r3_ts));

            // Reaping up to CID_ONE still removes nothing (r1's tombstone
            // is exactly at ONE, not before it).
            assert!(matches!(be.reap_tombstones(&CID_ADV, &CID_ONE), Ok(0)));

            assert!(entry_exists!(be, r1_ts));
            assert!(entry_exists!(be, r2_ts));
            assert!(entry_exists!(be, r3_ts));

            // Reaping up to CID_TWO removes r1 (tombstoned at ONE) only.
            assert!(matches!(be.reap_tombstones(&CID_ADV, &CID_TWO), Ok(1)));

            assert!(!entry_exists!(be, r1_ts));
            assert!(entry_exists!(be, r2_ts));
            assert!(entry_exists!(be, r3_ts));

            // Reaping up to CID_THREE removes the remaining two.
            assert!(matches!(be.reap_tombstones(&CID_ADV, &CID_THREE), Ok(2)));

            assert!(!entry_exists!(be, r1_ts));
            assert!(!entry_exists!(be, r2_ts));
            assert!(!entry_exists!(be, r3_ts));

            // Nothing left
            assert!(matches!(be.reap_tombstones(&CID_ADV, &CID_THREE), Ok(0)));

            assert!(!entry_exists!(be, r1_ts));
            assert!(!entry_exists!(be, r2_ts));
            assert!(!entry_exists!(be, r3_ts));
        });
    }
2666
2667    #[test]
2668    fn test_be_backup_restore() {
2669        run_test!(|be: &mut BackendWriteTransaction| {
2670            // Important! Need db metadata setup!
2671            be.reset_db_s_uuid().unwrap();
2672            be.reset_db_d_uuid().unwrap();
2673            be.set_db_ts_max(Duration::from_secs(1)).unwrap();
2674
2675            // First create some entries (3?)
2676            let mut e1: Entry<EntryInit, EntryNew> = Entry::new();
2677            e1.add_ava(Attribute::UserId, Value::from("william"));
2678            e1.add_ava(
2679                Attribute::Uuid,
2680                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
2681            );
2682
2683            let mut e2: Entry<EntryInit, EntryNew> = Entry::new();
2684            e2.add_ava(Attribute::UserId, Value::from("alice"));
2685            e2.add_ava(
2686                Attribute::Uuid,
2687                Value::from("4b6228ab-1dbe-42a4-a9f5-f6368222438e"),
2688            );
2689
2690            let mut e3: Entry<EntryInit, EntryNew> = Entry::new();
2691            e3.add_ava(Attribute::UserId, Value::from("lucy"));
2692            e3.add_ava(
2693                Attribute::Uuid,
2694                Value::from("7b23c99d-c06b-4a9a-a958-3afa56383e1d"),
2695            );
2696
2697            let ve1 = e1.clone().into_sealed_new();
2698            let ve2 = e2.clone().into_sealed_new();
2699            let ve3 = e3.clone().into_sealed_new();
2700
2701            assert!(be.create(&CID_ZERO, vec![ve1, ve2, ve3]).is_ok());
2702            assert!(entry_exists!(be, e1));
2703            assert!(entry_exists!(be, e2));
2704            assert!(entry_exists!(be, e3));
2705
2706            let mut buf = std::io::Cursor::new(Vec::new());
2707
2708            be.backup(&mut buf, BackupCompression::Gzip)
2709                .expect("Backup failed!");
2710
2711            buf.set_position(0);
2712
2713            be.restore(&mut buf, BackupCompression::Gzip)
2714                .expect("Restore failed!");
2715
2716            assert!(be.verify().is_empty());
2717        });
2718    }
2719
    #[test]
    fn test_be_backup_restore_tampered() {
        run_test!(|be: &mut BackendWriteTransaction| {
            // Important! Need db metadata setup!
            be.reset_db_s_uuid().unwrap();
            be.reset_db_d_uuid().unwrap();
            be.set_db_ts_max(Duration::from_secs(1)).unwrap();
            // First create some entries (3).
            let mut e1: Entry<EntryInit, EntryNew> = Entry::new();
            e1.add_ava(Attribute::UserId, Value::from("william"));
            e1.add_ava(
                Attribute::Uuid,
                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
            );

            let mut e2: Entry<EntryInit, EntryNew> = Entry::new();
            e2.add_ava(Attribute::UserId, Value::from("alice"));
            e2.add_ava(
                Attribute::Uuid,
                Value::from("4b6228ab-1dbe-42a4-a9f5-f6368222438e"),
            );

            let mut e3: Entry<EntryInit, EntryNew> = Entry::new();
            e3.add_ava(Attribute::UserId, Value::from("lucy"));
            e3.add_ava(
                Attribute::Uuid,
                Value::from("7b23c99d-c06b-4a9a-a958-3afa56383e1d"),
            );

            let ve1 = e1.clone().into_sealed_new();
            let ve2 = e2.clone().into_sealed_new();
            let ve3 = e3.clone().into_sealed_new();

            assert!(be.create(&CID_ZERO, vec![ve1, ve2, ve3]).is_ok());
            assert!(entry_exists!(be, e1));
            assert!(entry_exists!(be, e2));
            assert!(entry_exists!(be, e3));

            // Back up uncompressed so the json can be edited directly.
            let mut buf = std::io::Cursor::new(Vec::new());

            be.backup(&mut buf, BackupCompression::NoCompression)
                .expect("Backup failed!");

            // Rewind
            buf.set_position(0);

            // Now here, we need to tamper with the data: deserialise the
            // backup and drop one entry from it.
            let mut dbbak: DbBackup = serde_json::from_reader(&mut buf).unwrap();

            match &mut dbbak {
                DbBackup::V5 {
                    version: _,
                    db_s_uuid: _,
                    db_d_uuid: _,
                    db_ts_max: _,
                    keyhandles: _,
                    repl_meta: _,
                    entries,
                } => {
                    let _ = entries.pop();
                }
                _ => {
                    // We no longer use these format versions!
                    unreachable!()
                }
            };

            // Re-serialise the tampered backup over the old buffer.
            buf.get_mut().clear();
            buf.set_position(0);

            serde_json::to_writer(&mut buf, &dbbak).unwrap();

            // Rewind
            buf.set_position(0);

            // Restore accepts the tampered backup; the database must
            // still verify as consistent afterwards.
            be.restore(&mut buf, BackupCompression::NoCompression)
                .expect("Restore failed!");

            assert!(be.verify().is_empty());
        });
    }
2801
2802    #[test]
2803    fn test_be_sid_generation_and_reset() {
2804        run_test!(|be: &mut BackendWriteTransaction| {
2805            let sid1 = be.get_db_s_uuid().unwrap();
2806            let sid2 = be.get_db_s_uuid().unwrap();
2807            assert_eq!(sid1, sid2);
2808            let sid3 = be.reset_db_s_uuid().unwrap();
2809            assert!(sid1 != sid3);
2810            let sid4 = be.get_db_s_uuid().unwrap();
2811            assert_eq!(sid3, sid4);
2812        });
2813    }
2814
2815    #[test]
2816    fn test_be_reindex_empty() {
2817        run_test!(|be: &mut BackendWriteTransaction| {
2818            // Add some test data?
2819            let missing = be.missing_idxs().unwrap();
2820            assert_eq!(missing.len(), 7);
2821            assert!(be.reindex(false).is_ok());
2822            let missing = be.missing_idxs().unwrap();
2823            debug!("{:?}", missing);
2824            assert!(missing.is_empty());
2825        });
2826    }
2827
    #[test]
    fn test_be_reindex_data() {
        run_test!(|be: &mut BackendWriteTransaction| {
            // Add some test data: two entries with name + uuid.
            let mut e1: Entry<EntryInit, EntryNew> = Entry::new();
            e1.add_ava(Attribute::Name, Value::new_iname("william"));
            e1.add_ava(
                Attribute::Uuid,
                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
            );
            let e1 = e1.into_sealed_new();

            let mut e2: Entry<EntryInit, EntryNew> = Entry::new();
            e2.add_ava(Attribute::Name, Value::new_iname("claire"));
            e2.add_ava(
                Attribute::Uuid,
                Value::from("bd651620-00dd-426b-aaa0-4494f7b7906f"),
            );
            let e2 = e2.into_sealed_new();

            be.create(&CID_ZERO, vec![e1, e2]).unwrap();

            // purge indexes
            be.danger_purge_idxs().unwrap();
            // Check they are gone (all seven demo idx tables missing).
            let missing = be.missing_idxs().unwrap();
            assert_eq!(missing.len(), 7);
            assert!(be.reindex(false).is_ok());
            let missing = be.missing_idxs().unwrap();
            debug!("{:?}", missing);
            assert!(missing.is_empty());
            // check name and uuid ids on eq, sub, pres.
            // Entry ids are 1 (william) and 2 (claire).

            idl_state!(
                be,
                Attribute::Name,
                IndexType::Equality,
                "william",
                Some(vec![1])
            );

            idl_state!(
                be,
                Attribute::Name,
                IndexType::Equality,
                "claire",
                Some(vec![2])
            );

            // Trigraph substrings unique to "william" map to id 1 only.
            for sub in [
                "w", "m", "wi", "il", "ll", "li", "ia", "am", "wil", "ill", "lli", "lia", "iam",
            ] {
                idl_state!(
                    be,
                    Attribute::Name,
                    IndexType::SubString,
                    sub,
                    Some(vec![1])
                );
            }

            // Substrings unique to "claire" map to id 2 only.
            for sub in [
                "c", "r", "e", "cl", "la", "ai", "ir", "re", "cla", "lai", "air", "ire",
            ] {
                idl_state!(
                    be,
                    Attribute::Name,
                    IndexType::SubString,
                    sub,
                    Some(vec![2])
                );
            }

            // Substrings common to both names map to both ids.
            for sub in ["i", "a", "l"] {
                idl_state!(
                    be,
                    Attribute::Name,
                    IndexType::SubString,
                    sub,
                    Some(vec![1, 2])
                );
            }

            idl_state!(
                be,
                Attribute::Name,
                IndexType::Presence,
                "_",
                Some(vec![1, 2])
            );

            idl_state!(
                be,
                Attribute::Uuid,
                IndexType::Equality,
                "db237e8a-0079-4b8c-8a56-593b22aa44d1",
                Some(vec![1])
            );

            idl_state!(
                be,
                Attribute::Uuid,
                IndexType::Equality,
                "bd651620-00dd-426b-aaa0-4494f7b7906f",
                Some(vec![2])
            );

            idl_state!(
                be,
                Attribute::Uuid,
                IndexType::Presence,
                "_",
                Some(vec![1, 2])
            );

            // Show what happens with empty: a missing key in an existing
            // index yields Some(empty idl), not None.

            idl_state!(
                be,
                Attribute::Name,
                IndexType::Equality,
                "not-exist",
                Some(Vec::with_capacity(0))
            );

            idl_state!(
                be,
                Attribute::Uuid,
                IndexType::Equality,
                "fake-0079-4b8c-8a56-593b22aa44d1",
                Some(Vec::with_capacity(0))
            );

            // An attribute with no index table at all yields None.
            let uuid_p_idl = be
                .load_test_idl(&Attribute::from("not_indexed"), IndexType::Presence, "_")
                .unwrap(); // unwrap the result
            assert_eq!(uuid_p_idl, None);

            // Check name2uuid resolves names (but not raw uuid strings).
            let claire_uuid = uuid!("bd651620-00dd-426b-aaa0-4494f7b7906f");
            let william_uuid = uuid!("db237e8a-0079-4b8c-8a56-593b22aa44d1");

            assert_eq!(be.name2uuid("claire"), Ok(Some(claire_uuid)));
            assert_eq!(be.name2uuid("william"), Ok(Some(william_uuid)));
            assert_eq!(
                be.name2uuid("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
                Ok(None)
            );
            // check uuid2spn
            assert_eq!(
                be.uuid2spn(claire_uuid),
                Ok(Some(Value::new_iname("claire")))
            );
            assert_eq!(
                be.uuid2spn(william_uuid),
                Ok(Some(Value::new_iname("william")))
            );
            // check uuid2rdn
            assert_eq!(
                be.uuid2rdn(claire_uuid),
                Ok(Some("name=claire".to_string()))
            );
            assert_eq!(
                be.uuid2rdn(william_uuid),
                Ok(Some("name=william".to_string()))
            );
        });
    }
2996
    #[test]
    fn test_be_index_create_delete_simple() {
        run_test!(|be: &mut BackendWriteTransaction| {
            // First, setup our index tables!
            assert!(be.reindex(false).is_ok());
            // Test that on entry create, the indexes are made correctly.
            // this is a similar case to reindex.
            let mut e1: Entry<EntryInit, EntryNew> = Entry::new();
            e1.add_ava(Attribute::Name, Value::from("william"));
            e1.add_ava(
                Attribute::Uuid,
                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
            );
            let e1 = e1.into_sealed_new();

            let rset = be.create(&CID_ZERO, vec![e1]).unwrap();
            let mut rset: Vec<_> = rset.into_iter().map(Arc::new).collect();
            // Keep the committed entry (id 1) for the tombstone step below.
            let e1 = rset.pop().unwrap();

            idl_state!(
                be,
                Attribute::Name.as_ref(),
                IndexType::Equality,
                "william",
                Some(vec![1])
            );

            idl_state!(
                be,
                Attribute::Name.as_ref(),
                IndexType::Presence,
                "_",
                Some(vec![1])
            );

            idl_state!(
                be,
                Attribute::Uuid.as_ref(),
                IndexType::Equality,
                "db237e8a-0079-4b8c-8a56-593b22aa44d1",
                Some(vec![1])
            );

            idl_state!(
                be,
                Attribute::Uuid.as_ref(),
                IndexType::Presence,
                "_",
                Some(vec![1])
            );

            // The auxiliary maps are populated on create too.
            let william_uuid = uuid!("db237e8a-0079-4b8c-8a56-593b22aa44d1");
            assert_eq!(be.name2uuid("william"), Ok(Some(william_uuid)));
            assert_eq!(be.uuid2spn(william_uuid), Ok(Some(Value::from("william"))));
            assert_eq!(
                be.uuid2rdn(william_uuid),
                Ok(Some("name=william".to_string()))
            );

            // == Now we reap_tombstones, and assert we removed the items.
            let e1_ts = e1.to_tombstone(CID_ONE.clone()).into_sealed_committed();
            assert!(be.modify(&CID_ONE, &[e1], &[e1_ts]).is_ok());
            be.reap_tombstones(&CID_ADV, &CID_TWO).unwrap();

            // All index slots now exist but hold empty id lists.
            idl_state!(
                be,
                Attribute::Name.as_ref(),
                IndexType::Equality,
                "william",
                Some(Vec::with_capacity(0))
            );

            idl_state!(
                be,
                Attribute::Name.as_ref(),
                IndexType::Presence,
                "_",
                Some(Vec::with_capacity(0))
            );

            idl_state!(
                be,
                Attribute::Uuid.as_ref(),
                IndexType::Equality,
                "db237e8a-0079-4b8c-8a56-593b22aa44d1",
                Some(Vec::with_capacity(0))
            );

            idl_state!(
                be,
                Attribute::Uuid.as_ref(),
                IndexType::Presence,
                "_",
                Some(Vec::with_capacity(0))
            );

            // And the auxiliary maps no longer resolve the entry.
            assert_eq!(be.name2uuid("william"), Ok(None));
            assert_eq!(be.uuid2spn(william_uuid), Ok(None));
            assert_eq!(be.uuid2rdn(william_uuid), Ok(None));
        })
    }
3098
3099    #[test]
3100    fn test_be_index_create_delete_multi() {
3101        run_test!(|be: &mut BackendWriteTransaction| {
3102            // delete multiple entries at a time, without deleting others
3103            // First, setup our index tables!
3104            assert!(be.reindex(false).is_ok());
3105            // Test that on entry create, the indexes are made correctly.
3106            // this is a similar case to reindex.
3107            let mut e1: Entry<EntryInit, EntryNew> = Entry::new();
3108            e1.add_ava(Attribute::Name, Value::new_iname("william"));
3109            e1.add_ava(
3110                Attribute::Uuid,
3111                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
3112            );
3113            let e1 = e1.into_sealed_new();
3114
3115            let mut e2: Entry<EntryInit, EntryNew> = Entry::new();
3116            e2.add_ava(Attribute::Name, Value::new_iname("claire"));
3117            e2.add_ava(
3118                Attribute::Uuid,
3119                Value::from("bd651620-00dd-426b-aaa0-4494f7b7906f"),
3120            );
3121            let e2 = e2.into_sealed_new();
3122
3123            let mut e3: Entry<EntryInit, EntryNew> = Entry::new();
3124            e3.add_ava(Attribute::UserId, Value::new_iname("lucy"));
3125            e3.add_ava(
3126                Attribute::Uuid,
3127                Value::from("7b23c99d-c06b-4a9a-a958-3afa56383e1d"),
3128            );
3129            let e3 = e3.into_sealed_new();
3130
3131            let mut rset = be.create(&CID_ZERO, vec![e1, e2, e3]).unwrap();
3132            rset.remove(1);
3133            let mut rset: Vec<_> = rset.into_iter().map(Arc::new).collect();
3134            let e1 = rset.pop().unwrap();
3135            let e3 = rset.pop().unwrap();
3136
3137            // Now remove e1, e3.
3138            let e1_ts = e1.to_tombstone(CID_ONE.clone()).into_sealed_committed();
3139            let e3_ts = e3.to_tombstone(CID_ONE.clone()).into_sealed_committed();
3140            assert!(be.modify(&CID_ONE, &[e1, e3], &[e1_ts, e3_ts]).is_ok());
3141            be.reap_tombstones(&CID_ADV, &CID_TWO).unwrap();
3142
3143            idl_state!(
3144                be,
3145                Attribute::Name.as_ref(),
3146                IndexType::Equality,
3147                "claire",
3148                Some(vec![2])
3149            );
3150
3151            idl_state!(
3152                be,
3153                Attribute::Name.as_ref(),
3154                IndexType::Presence,
3155                "_",
3156                Some(vec![2])
3157            );
3158
3159            idl_state!(
3160                be,
3161                Attribute::Uuid.as_ref(),
3162                IndexType::Equality,
3163                "bd651620-00dd-426b-aaa0-4494f7b7906f",
3164                Some(vec![2])
3165            );
3166
3167            idl_state!(
3168                be,
3169                Attribute::Uuid.as_ref(),
3170                IndexType::Presence,
3171                "_",
3172                Some(vec![2])
3173            );
3174
3175            let claire_uuid = uuid!("bd651620-00dd-426b-aaa0-4494f7b7906f");
3176            let william_uuid = uuid!("db237e8a-0079-4b8c-8a56-593b22aa44d1");
3177            let lucy_uuid = uuid!("7b23c99d-c06b-4a9a-a958-3afa56383e1d");
3178
3179            assert_eq!(be.name2uuid("claire"), Ok(Some(claire_uuid)));
3180            let x = be.uuid2spn(claire_uuid);
3181            trace!(?x);
3182            assert_eq!(
3183                be.uuid2spn(claire_uuid),
3184                Ok(Some(Value::new_iname("claire")))
3185            );
3186            assert_eq!(
3187                be.uuid2rdn(claire_uuid),
3188                Ok(Some("name=claire".to_string()))
3189            );
3190
3191            assert_eq!(be.name2uuid("william"), Ok(None));
3192            assert_eq!(be.uuid2spn(william_uuid), Ok(None));
3193            assert_eq!(be.uuid2rdn(william_uuid), Ok(None));
3194
3195            assert_eq!(be.name2uuid("lucy"), Ok(None));
3196            assert_eq!(be.uuid2spn(lucy_uuid), Ok(None));
3197            assert_eq!(be.uuid2rdn(lucy_uuid), Ok(None));
3198        })
3199    }
3200
3201    #[test]
3202    fn test_be_index_modify_simple() {
3203        run_test!(|be: &mut BackendWriteTransaction| {
3204            assert!(be.reindex(false).is_ok());
3205            // modify with one type, ensuring we clean the indexes behind
3206            // us. For the test to be "accurate" we must add one attr, remove one attr
3207            // and change one attr.
3208            let mut e1: Entry<EntryInit, EntryNew> = Entry::new();
3209            e1.add_ava(Attribute::Name, Value::new_iname("william"));
3210            e1.add_ava(
3211                Attribute::Uuid,
3212                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
3213            );
3214            e1.add_ava(Attribute::TestAttr, Value::from("test"));
3215            let e1 = e1.into_sealed_new();
3216
3217            let rset = be.create(&CID_ZERO, vec![e1]).unwrap();
3218            let rset: Vec<_> = rset.into_iter().map(Arc::new).collect();
3219            // Now, alter the new entry.
3220            let mut ce1 = rset[0].as_ref().clone().into_invalid();
3221            // add something.
3222            ce1.add_ava(Attribute::TestNumber, Value::from("test"));
3223            // remove something.
3224            ce1.purge_ava(Attribute::TestAttr);
3225            // mod something.
3226            ce1.purge_ava(Attribute::Name);
3227            ce1.add_ava(Attribute::Name, Value::new_iname("claire"));
3228
3229            let ce1 = ce1.into_sealed_committed();
3230
3231            be.modify(&CID_ZERO, &rset, &[ce1]).unwrap();
3232
3233            // Now check the idls
3234            idl_state!(
3235                be,
3236                Attribute::Name.as_ref(),
3237                IndexType::Equality,
3238                "claire",
3239                Some(vec![1])
3240            );
3241
3242            idl_state!(
3243                be,
3244                Attribute::Name.as_ref(),
3245                IndexType::Presence,
3246                "_",
3247                Some(vec![1])
3248            );
3249
3250            idl_state!(
3251                be,
3252                Attribute::TestNumber.as_ref(),
3253                IndexType::Equality,
3254                "test",
3255                Some(vec![1])
3256            );
3257
3258            idl_state!(
3259                be,
3260                Attribute::TestAttr,
3261                IndexType::Equality,
3262                "test",
3263                Some(vec![])
3264            );
3265
3266            let william_uuid = uuid!("db237e8a-0079-4b8c-8a56-593b22aa44d1");
3267            assert_eq!(be.name2uuid("william"), Ok(None));
3268            assert_eq!(be.name2uuid("claire"), Ok(Some(william_uuid)));
3269            assert_eq!(
3270                be.uuid2spn(william_uuid),
3271                Ok(Some(Value::new_iname("claire")))
3272            );
3273            assert_eq!(
3274                be.uuid2rdn(william_uuid),
3275                Ok(Some("name=claire".to_string()))
3276            );
3277        })
3278    }
3279
3280    #[test]
3281    fn test_be_index_modify_rename() {
3282        run_test!(|be: &mut BackendWriteTransaction| {
3283            assert!(be.reindex(false).is_ok());
3284            // test when we change name AND uuid
3285            // This will be needing to be correct for conflicts when we add
3286            // replication support!
3287            let mut e1: Entry<EntryInit, EntryNew> = Entry::new();
3288            e1.add_ava(Attribute::Name, Value::new_iname("william"));
3289            e1.add_ava(
3290                Attribute::Uuid,
3291                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
3292            );
3293            let e1 = e1.into_sealed_new();
3294
3295            let rset = be.create(&CID_ZERO, vec![e1]).unwrap();
3296            let rset: Vec<_> = rset.into_iter().map(Arc::new).collect();
3297            // Now, alter the new entry.
3298            let mut ce1 = rset[0].as_ref().clone().into_invalid();
3299            ce1.purge_ava(Attribute::Name);
3300            ce1.purge_ava(Attribute::Uuid);
3301            ce1.add_ava(Attribute::Name, Value::new_iname("claire"));
3302            ce1.add_ava(
3303                Attribute::Uuid,
3304                Value::from("04091a7a-6ce4-42d2-abf5-c2ce244ac9e8"),
3305            );
3306            let ce1 = ce1.into_sealed_committed();
3307
3308            be.modify(&CID_ZERO, &rset, &[ce1]).unwrap();
3309
3310            idl_state!(
3311                be,
3312                Attribute::Name.as_ref(),
3313                IndexType::Equality,
3314                "claire",
3315                Some(vec![1])
3316            );
3317
3318            idl_state!(
3319                be,
3320                Attribute::Uuid.as_ref(),
3321                IndexType::Equality,
3322                "04091a7a-6ce4-42d2-abf5-c2ce244ac9e8",
3323                Some(vec![1])
3324            );
3325
3326            idl_state!(
3327                be,
3328                Attribute::Name.as_ref(),
3329                IndexType::Presence,
3330                "_",
3331                Some(vec![1])
3332            );
3333            idl_state!(
3334                be,
3335                Attribute::Uuid.as_ref(),
3336                IndexType::Presence,
3337                "_",
3338                Some(vec![1])
3339            );
3340
3341            idl_state!(
3342                be,
3343                Attribute::Uuid.as_ref(),
3344                IndexType::Equality,
3345                "db237e8a-0079-4b8c-8a56-593b22aa44d1",
3346                Some(Vec::with_capacity(0))
3347            );
3348            idl_state!(
3349                be,
3350                Attribute::Name.as_ref(),
3351                IndexType::Equality,
3352                "william",
3353                Some(Vec::with_capacity(0))
3354            );
3355
3356            let claire_uuid = uuid!("04091a7a-6ce4-42d2-abf5-c2ce244ac9e8");
3357            let william_uuid = uuid!("db237e8a-0079-4b8c-8a56-593b22aa44d1");
3358            assert_eq!(be.name2uuid("william"), Ok(None));
3359            assert_eq!(be.name2uuid("claire"), Ok(Some(claire_uuid)));
3360            assert_eq!(be.uuid2spn(william_uuid), Ok(None));
3361            assert_eq!(be.uuid2rdn(william_uuid), Ok(None));
3362            assert_eq!(
3363                be.uuid2spn(claire_uuid),
3364                Ok(Some(Value::new_iname("claire")))
3365            );
3366            assert_eq!(
3367                be.uuid2rdn(claire_uuid),
3368                Ok(Some("name=claire".to_string()))
3369            );
3370        })
3371    }
3372
3373    #[test]
3374    fn test_be_index_search_simple() {
3375        run_test!(|be: &mut BackendWriteTransaction| {
3376            assert!(be.reindex(false).is_ok());
3377
3378            // Create a test entry with some indexed / unindexed values.
3379            let mut e1: Entry<EntryInit, EntryNew> = Entry::new();
3380            e1.add_ava(Attribute::Name, Value::new_iname("william"));
3381            e1.add_ava(
3382                Attribute::Uuid,
3383                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
3384            );
3385            e1.add_ava(Attribute::NoIndex, Value::from("william"));
3386            e1.add_ava(Attribute::OtherNoIndex, Value::from("william"));
3387            let e1 = e1.into_sealed_new();
3388
3389            let mut e2: Entry<EntryInit, EntryNew> = Entry::new();
3390            e2.add_ava(Attribute::Name, Value::new_iname("claire"));
3391            e2.add_ava(
3392                Attribute::Uuid,
3393                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d2"),
3394            );
3395            let e2 = e2.into_sealed_new();
3396
3397            let _rset = be.create(&CID_ZERO, vec![e1, e2]).unwrap();
3398            // Test fully unindexed
3399            let f_un =
3400                filter_resolved!(f_eq(Attribute::NoIndex, PartialValue::new_utf8s("william")));
3401
3402            let (r, _plan) = be.filter2idl(f_un.to_inner(), 0).unwrap();
3403            match r {
3404                IdList::AllIds => {}
3405                _ => {
3406                    panic!("");
3407                }
3408            }
3409
3410            // Test that a fully indexed search works
3411            let feq = filter_resolved!(f_eq(Attribute::Name, PartialValue::new_utf8s("william")));
3412
3413            let (r, _plan) = be.filter2idl(feq.to_inner(), 0).unwrap();
3414            match r {
3415                IdList::Indexed(idl) => {
3416                    assert_eq!(idl, IDLBitRange::from_iter(vec![1]));
3417                }
3418                _ => {
3419                    panic!("");
3420                }
3421            }
3422
3423            // Test and/or
3424            //   full index and
3425            let f_in_and = filter_resolved!(f_and!([
3426                f_eq(Attribute::Name, PartialValue::new_utf8s("william")),
3427                f_eq(
3428                    Attribute::Uuid,
3429                    PartialValue::new_utf8s("db237e8a-0079-4b8c-8a56-593b22aa44d1")
3430                )
3431            ]));
3432
3433            let (r, _plan) = be.filter2idl(f_in_and.to_inner(), 0).unwrap();
3434            match r {
3435                IdList::Indexed(idl) => {
3436                    assert_eq!(idl, IDLBitRange::from_iter(vec![1]));
3437                }
3438                _ => {
3439                    panic!("");
3440                }
3441            }
3442
3443            //   partial index and
3444            let f_p1 = filter_resolved!(f_and!([
3445                f_eq(Attribute::Name, PartialValue::new_utf8s("william")),
3446                f_eq(Attribute::NoIndex, PartialValue::new_utf8s("william"))
3447            ]));
3448
3449            let f_p2 = filter_resolved!(f_and!([
3450                f_eq(Attribute::Name, PartialValue::new_utf8s("william")),
3451                f_eq(Attribute::NoIndex, PartialValue::new_utf8s("william"))
3452            ]));
3453
3454            let (r, _plan) = be.filter2idl(f_p1.to_inner(), 0).unwrap();
3455            match r {
3456                IdList::Partial(idl) => {
3457                    assert_eq!(idl, IDLBitRange::from_iter(vec![1]));
3458                }
3459                _ => unreachable!(),
3460            }
3461
3462            let (r, _plan) = be.filter2idl(f_p2.to_inner(), 0).unwrap();
3463            match r {
3464                IdList::Partial(idl) => {
3465                    assert_eq!(idl, IDLBitRange::from_iter(vec![1]));
3466                }
3467                _ => unreachable!(),
3468            }
3469
3470            // Substrings are always partial
3471            let f_p3 = filter_resolved!(f_sub(Attribute::Name, PartialValue::new_utf8s("wil")));
3472
3473            let (r, plan) = be.filter2idl(f_p3.to_inner(), 0).unwrap();
3474            trace!(?r, ?plan);
3475            match r {
3476                IdList::Partial(idl) => {
3477                    assert_eq!(idl, IDLBitRange::from_iter(vec![1]));
3478                }
3479                _ => unreachable!(),
3480            }
3481
3482            //   no index and
3483            let f_no_and = filter_resolved!(f_and!([
3484                f_eq(Attribute::NoIndex, PartialValue::new_utf8s("william")),
3485                f_eq(Attribute::OtherNoIndex, PartialValue::new_utf8s("william"))
3486            ]));
3487
3488            let (r, _plan) = be.filter2idl(f_no_and.to_inner(), 0).unwrap();
3489            match r {
3490                IdList::AllIds => {}
3491                _ => {
3492                    panic!("");
3493                }
3494            }
3495
3496            //   full index or
3497            let f_in_or = filter_resolved!(f_or!([f_eq(
3498                Attribute::Name,
3499                PartialValue::new_utf8s("william")
3500            )]));
3501
3502            let (r, _plan) = be.filter2idl(f_in_or.to_inner(), 0).unwrap();
3503            match r {
3504                IdList::Indexed(idl) => {
3505                    assert_eq!(idl, IDLBitRange::from_iter(vec![1]));
3506                }
3507                _ => {
3508                    panic!("");
3509                }
3510            }
3511            //   partial (aka allids) or
3512            let f_un_or = filter_resolved!(f_or!([f_eq(
3513                Attribute::NoIndex,
3514                PartialValue::new_utf8s("william")
3515            )]));
3516
3517            let (r, _plan) = be.filter2idl(f_un_or.to_inner(), 0).unwrap();
3518            match r {
3519                IdList::AllIds => {}
3520                _ => {
3521                    panic!("");
3522                }
3523            }
3524
3525            // Test root andnot
3526            let f_r_andnot = filter_resolved!(f_andnot(f_eq(
3527                Attribute::Name,
3528                PartialValue::new_utf8s("william")
3529            )));
3530
3531            let (r, _plan) = be.filter2idl(f_r_andnot.to_inner(), 0).unwrap();
3532            match r {
3533                IdList::Indexed(idl) => {
3534                    assert_eq!(idl, IDLBitRange::from_iter(Vec::with_capacity(0)));
3535                }
3536                _ => {
3537                    panic!("");
3538                }
3539            }
3540
3541            // test andnot as only in and
3542            let f_and_andnot = filter_resolved!(f_and!([f_andnot(f_eq(
3543                Attribute::Name,
3544                PartialValue::new_utf8s("william")
3545            ))]));
3546
3547            let (r, _plan) = be.filter2idl(f_and_andnot.to_inner(), 0).unwrap();
3548            match r {
3549                IdList::Indexed(idl) => {
3550                    assert_eq!(idl, IDLBitRange::from_iter(Vec::with_capacity(0)));
3551                }
3552                _ => {
3553                    panic!("");
3554                }
3555            }
3556            // test andnot as only in or
3557            let f_or_andnot = filter_resolved!(f_or!([f_andnot(f_eq(
3558                Attribute::Name,
3559                PartialValue::new_utf8s("william")
3560            ))]));
3561
3562            let (r, _plan) = be.filter2idl(f_or_andnot.to_inner(), 0).unwrap();
3563            match r {
3564                IdList::Indexed(idl) => {
3565                    assert_eq!(idl, IDLBitRange::from_iter(Vec::with_capacity(0)));
3566                }
3567                _ => {
3568                    panic!("");
3569                }
3570            }
3571
3572            // test andnot in and (first) with name
3573            let f_and_andnot = filter_resolved!(f_and!([
3574                f_andnot(f_eq(Attribute::Name, PartialValue::new_utf8s("claire"))),
3575                f_pres(Attribute::Name)
3576            ]));
3577
3578            let (r, _plan) = be.filter2idl(f_and_andnot.to_inner(), 0).unwrap();
3579            match r {
3580                IdList::Indexed(idl) => {
3581                    debug!("{:?}", idl);
3582                    assert_eq!(idl, IDLBitRange::from_iter(vec![1]));
3583                }
3584                _ => {
3585                    panic!("");
3586                }
3587            }
3588            // test andnot in and (last) with name
3589            let f_and_andnot = filter_resolved!(f_and!([
3590                f_pres(Attribute::Name),
3591                f_andnot(f_eq(Attribute::Name, PartialValue::new_utf8s("claire")))
3592            ]));
3593
3594            let (r, _plan) = be.filter2idl(f_and_andnot.to_inner(), 0).unwrap();
3595            match r {
3596                IdList::Indexed(idl) => {
3597                    assert_eq!(idl, IDLBitRange::from_iter(vec![1]));
3598                }
3599                _ => {
3600                    panic!("");
3601                }
3602            }
3603            // test andnot in and (first) with no-index
3604            let f_and_andnot = filter_resolved!(f_and!([
3605                f_andnot(f_eq(Attribute::Name, PartialValue::new_utf8s("claire"))),
3606                f_pres(Attribute::NoIndex)
3607            ]));
3608
3609            let (r, _plan) = be.filter2idl(f_and_andnot.to_inner(), 0).unwrap();
3610            match r {
3611                IdList::AllIds => {}
3612                _ => {
3613                    panic!("");
3614                }
3615            }
3616            // test andnot in and (last) with no-index
3617            let f_and_andnot = filter_resolved!(f_and!([
3618                f_pres(Attribute::NoIndex),
3619                f_andnot(f_eq(Attribute::Name, PartialValue::new_utf8s("claire")))
3620            ]));
3621
3622            let (r, _plan) = be.filter2idl(f_and_andnot.to_inner(), 0).unwrap();
3623            match r {
3624                IdList::AllIds => {}
3625                _ => {
3626                    panic!("");
3627                }
3628            }
3629
3630            //   empty or
3631            let f_e_or = filter_resolved!(f_or!([]));
3632
3633            let (r, _plan) = be.filter2idl(f_e_or.to_inner(), 0).unwrap();
3634            match r {
3635                IdList::Indexed(idl) => {
3636                    assert_eq!(idl, IDLBitRange::from_iter(vec![]));
3637                }
3638                _ => {
3639                    panic!("");
3640                }
3641            }
3642
3643            let f_e_and = filter_resolved!(f_and!([]));
3644
3645            let (r, _plan) = be.filter2idl(f_e_and.to_inner(), 0).unwrap();
3646            match r {
3647                IdList::Indexed(idl) => {
3648                    assert_eq!(idl, IDLBitRange::from_iter(vec![]));
3649                }
3650                _ => {
3651                    panic!("");
3652                }
3653            }
3654        })
3655    }
3656
3657    #[test]
3658    fn test_be_index_search_missing() {
3659        run_test!(|be: &mut BackendWriteTransaction| {
3660            // Test where the index is in schema but not created (purge idxs)
3661            // should fall back to an empty set because we can't satisfy the term
3662            be.danger_purge_idxs().unwrap();
3663            debug!("{:?}", be.missing_idxs().unwrap());
3664            let f_eq = filter_resolved!(f_eq(Attribute::Name, PartialValue::new_utf8s("william")));
3665
3666            let (r, _plan) = be.filter2idl(f_eq.to_inner(), 0).unwrap();
3667            match r {
3668                IdList::AllIds => {}
3669                _ => {
3670                    panic!("");
3671                }
3672            }
3673        })
3674    }
3675
3676    #[test]
3677    fn test_be_index_slope_generation() {
3678        run_test!(|be: &mut BackendWriteTransaction| {
3679            // Create some test entry with some indexed / unindexed values.
3680            let mut e1: Entry<EntryInit, EntryNew> = Entry::new();
3681            e1.add_ava(Attribute::Name, Value::new_iname("william"));
3682            e1.add_ava(
3683                Attribute::Uuid,
3684                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
3685            );
3686            e1.add_ava(Attribute::TestAttr, Value::from("dupe"));
3687            e1.add_ava(Attribute::TestNumber, Value::from("1"));
3688            let e1 = e1.into_sealed_new();
3689
3690            let mut e2: Entry<EntryInit, EntryNew> = Entry::new();
3691            e2.add_ava(Attribute::Name, Value::new_iname("claire"));
3692            e2.add_ava(
3693                Attribute::Uuid,
3694                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d2"),
3695            );
3696            e2.add_ava(Attribute::TestAttr, Value::from("dupe"));
3697            e2.add_ava(Attribute::TestNumber, Value::from("1"));
3698            let e2 = e2.into_sealed_new();
3699
3700            let mut e3: Entry<EntryInit, EntryNew> = Entry::new();
3701            e3.add_ava(Attribute::Name, Value::new_iname("benny"));
3702            e3.add_ava(
3703                Attribute::Uuid,
3704                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d3"),
3705            );
3706            e3.add_ava(Attribute::TestAttr, Value::from("dupe"));
3707            e3.add_ava(Attribute::TestNumber, Value::from("2"));
3708            let e3 = e3.into_sealed_new();
3709
3710            let _rset = be.create(&CID_ZERO, vec![e1, e2, e3]).unwrap();
3711
3712            // If the slopes haven't been generated yet, there are some hardcoded values
3713            // that we can use instead. They aren't generated until a first re-index.
3714            assert!(!be.is_idx_slopeyness_generated().unwrap());
3715
3716            let ta_eq_slope = be
3717                .get_idx_slope(&IdxKey::new(Attribute::TestAttr, IndexType::Equality))
3718                .unwrap();
3719            assert_eq!(ta_eq_slope, 45);
3720
3721            let tb_eq_slope = be
3722                .get_idx_slope(&IdxKey::new(Attribute::TestNumber, IndexType::Equality))
3723                .unwrap();
3724            assert_eq!(tb_eq_slope, 45);
3725
3726            let name_eq_slope = be
3727                .get_idx_slope(&IdxKey::new(Attribute::Name, IndexType::Equality))
3728                .unwrap();
3729            assert_eq!(name_eq_slope, 1);
3730            let uuid_eq_slope = be
3731                .get_idx_slope(&IdxKey::new(Attribute::Uuid, IndexType::Equality))
3732                .unwrap();
3733            assert_eq!(uuid_eq_slope, 1);
3734
3735            let name_pres_slope = be
3736                .get_idx_slope(&IdxKey::new(Attribute::Name, IndexType::Presence))
3737                .unwrap();
3738            assert_eq!(name_pres_slope, 90);
3739            let uuid_pres_slope = be
3740                .get_idx_slope(&IdxKey::new(Attribute::Uuid, IndexType::Presence))
3741                .unwrap();
3742            assert_eq!(uuid_pres_slope, 90);
3743            // Check the slopes are what we expect for hardcoded values.
3744
3745            // Now check slope generation for the values. Today these are calculated
3746            // at reindex time, so we now perform the re-index.
3747            assert!(be.reindex(false).is_ok());
3748            assert!(be.is_idx_slopeyness_generated().unwrap());
3749
3750            let ta_eq_slope = be
3751                .get_idx_slope(&IdxKey::new(Attribute::TestAttr, IndexType::Equality))
3752                .unwrap();
3753            assert_eq!(ta_eq_slope, 200);
3754
3755            let tb_eq_slope = be
3756                .get_idx_slope(&IdxKey::new(Attribute::TestNumber, IndexType::Equality))
3757                .unwrap();
3758            assert_eq!(tb_eq_slope, 133);
3759
3760            let name_eq_slope = be
3761                .get_idx_slope(&IdxKey::new(Attribute::Name, IndexType::Equality))
3762                .unwrap();
3763            assert_eq!(name_eq_slope, 51);
3764            let uuid_eq_slope = be
3765                .get_idx_slope(&IdxKey::new(Attribute::Uuid, IndexType::Equality))
3766                .unwrap();
3767            assert_eq!(uuid_eq_slope, 51);
3768
3769            let name_pres_slope = be
3770                .get_idx_slope(&IdxKey::new(Attribute::Name, IndexType::Presence))
3771                .unwrap();
3772            assert_eq!(name_pres_slope, 200);
3773            let uuid_pres_slope = be
3774                .get_idx_slope(&IdxKey::new(Attribute::Uuid, IndexType::Presence))
3775                .unwrap();
3776            assert_eq!(uuid_pres_slope, 200);
3777        })
3778    }
3779
3780    #[test]
3781    fn test_be_limits_allids() {
3782        run_test!(|be: &mut BackendWriteTransaction| {
3783            let mut lim_allow_allids = Limits::unlimited();
3784            lim_allow_allids.unindexed_allow = true;
3785
3786            let mut lim_deny_allids = Limits::unlimited();
3787            lim_deny_allids.unindexed_allow = false;
3788
3789            let mut e: Entry<EntryInit, EntryNew> = Entry::new();
3790            e.add_ava(Attribute::UserId, Value::from("william"));
3791            e.add_ava(
3792                Attribute::Uuid,
3793                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
3794            );
3795            e.add_ava(Attribute::NonExist, Value::from("x"));
3796            let e = e.into_sealed_new();
3797            let single_result = be.create(&CID_ZERO, vec![e.clone()]);
3798
3799            assert!(single_result.is_ok());
3800            let filt = e
3801                .filter_from_attrs(&[Attribute::NonExist])
3802                .expect("failed to generate filter")
3803                .into_valid_resolved();
3804            // check allow on allids
3805            let res = be.search(&lim_allow_allids, &filt);
3806            assert!(res.is_ok());
3807            let res = be.exists(&lim_allow_allids, &filt);
3808            assert!(res.is_ok());
3809
3810            // check deny on allids
3811            let res = be.search(&lim_deny_allids, &filt);
3812            assert_eq!(res, Err(OperationError::ResourceLimit));
3813            let res = be.exists(&lim_deny_allids, &filt);
3814            assert_eq!(res, Err(OperationError::ResourceLimit));
3815        })
3816    }
3817
3818    #[test]
3819    fn test_be_limits_results_max() {
3820        run_test!(|be: &mut BackendWriteTransaction| {
3821            let mut lim_allow = Limits::unlimited();
3822            lim_allow.search_max_results = usize::MAX;
3823
3824            let mut lim_deny = Limits::unlimited();
3825            lim_deny.search_max_results = 0;
3826
3827            let mut e: Entry<EntryInit, EntryNew> = Entry::new();
3828            e.add_ava(Attribute::UserId, Value::from("william"));
3829            e.add_ava(
3830                Attribute::Uuid,
3831                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
3832            );
3833            e.add_ava(Attribute::NonExist, Value::from("x"));
3834            let e = e.into_sealed_new();
3835            let single_result = be.create(&CID_ZERO, vec![e.clone()]);
3836            assert!(single_result.is_ok());
3837
3838            let filt = e
3839                .filter_from_attrs(&[Attribute::NonExist])
3840                .expect("failed to generate filter")
3841                .into_valid_resolved();
3842
3843            // --> This is the all ids path (unindexed)
3844            // check allow on entry max
3845            let res = be.search(&lim_allow, &filt);
3846            assert!(res.is_ok());
3847            let res = be.exists(&lim_allow, &filt);
3848            assert!(res.is_ok());
3849
3850            // check deny on entry max
3851            let res = be.search(&lim_deny, &filt);
3852            assert_eq!(res, Err(OperationError::ResourceLimit));
3853            // we don't limit on exists because we never load the entries.
3854            let res = be.exists(&lim_deny, &filt);
3855            assert!(res.is_ok());
3856
3857            // --> This will shortcut due to indexing.
3858            assert!(be.reindex(false).is_ok());
3859            let res = be.search(&lim_deny, &filt);
3860            assert_eq!(res, Err(OperationError::ResourceLimit));
3861            // we don't limit on exists because we never load the entries.
3862            let res = be.exists(&lim_deny, &filt);
3863            assert!(res.is_ok());
3864        })
3865    }
3866
    #[test]
    fn test_be_limits_partial_filter() {
        run_test!(|be: &mut BackendWriteTransaction| {
            // Exercises the search_max_filter_test limit on the partial
            // candidate-set resolution path.
            //
            // This relies on how we do partials, so it could be a bit sensitive.
            // A partial is generated after an allids + indexed in a single and
            // as we require both conditions to exist. Allids comes from unindexed
            // terms. we need to ensure we don't hit partial threshold too.
            //
            // This means we need an and query where the first term is allids
            // and the second is indexed, but without the filter shortcutting.
            //
            // To achieve this we need a monstrously evil query.
            //
            let mut lim_allow = Limits::unlimited();
            lim_allow.search_max_filter_test = usize::MAX;

            let mut lim_deny = Limits::unlimited();
            lim_deny.search_max_filter_test = 0;

            // One entry that matches both branches of the filter below.
            let mut e: Entry<EntryInit, EntryNew> = Entry::new();
            e.add_ava(Attribute::Name, Value::new_iname("william"));
            e.add_ava(
                Attribute::Uuid,
                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
            );
            e.add_ava(Attribute::NonExist, Value::from("x"));
            e.add_ava(Attribute::NonExist, Value::from("y"));
            let e = e.into_sealed_new();
            let single_result = be.create(&CID_ZERO, vec![e]);
            assert!(single_result.is_ok());

            // Reindex so we have things in place for our query
            assert!(be.reindex(false).is_ok());

            // 🚨 This is evil!
            // The and allows us to hit "allids + indexed -> partial".
            // the or terms prevent re-arrangement. They can't be folded or dead
            // term elimed either.
            //
            // This means the f_or nonexist will become allids and the second will
            // be indexed due to f_eq name in both, with the result of william.
            //
            // This creates a partial, and because it's the first iteration in the loop, this
            // doesn't encounter partial threshold testing.
            let filt = filter_resolved!(f_and!([
                f_or!([
                    f_eq(Attribute::NonExist, PartialValue::new_utf8s("x")),
                    f_eq(Attribute::NonExist, PartialValue::new_utf8s("y"))
                ]),
                f_or!([
                    f_eq(Attribute::Name, PartialValue::new_utf8s("claire")),
                    f_eq(Attribute::Name, PartialValue::new_utf8s("william"))
                ]),
            ]));

            // Allow: unlimited filter testing, both operations succeed.
            let res = be.search(&lim_allow, &filt);
            assert!(res.is_ok());
            let res = be.exists(&lim_allow, &filt);
            assert!(res.is_ok());

            // Deny: the filter-test limit trips on search ...
            let res = be.search(&lim_deny, &filt);
            assert_eq!(res, Err(OperationError::ResourceLimit));
            // ... and, unlike the result-count limit, on exists as well —
            // presumably because the partial path must still filter-test
            // candidate entries even though none are returned. The assertion
            // below pins that behaviour.
            let res = be.exists(&lim_deny, &filt);
            assert_eq!(res, Err(OperationError::ResourceLimit));
        })
    }
3935
3936    #[test]
3937    fn test_be_multiple_create() {
3938        sketching::test_init();
3939
3940        // This is a demo idxmeta, purely for testing.
3941        let idxmeta = vec![IdxKey {
3942            attr: Attribute::Uuid,
3943            itype: IndexType::Equality,
3944        }];
3945
3946        let be_a = Backend::new(BackendConfig::new_test("main"), idxmeta.clone(), false)
3947            .expect("Failed to setup backend");
3948
3949        let be_b = Backend::new(BackendConfig::new_test("db_2"), idxmeta, false)
3950            .expect("Failed to setup backend");
3951
3952        let mut be_a_txn = be_a.write().unwrap();
3953        let mut be_b_txn = be_b.write().unwrap();
3954
3955        assert!(be_a_txn.get_db_s_uuid() != be_b_txn.get_db_s_uuid());
3956
3957        // Create into A
3958        let mut e: Entry<EntryInit, EntryNew> = Entry::new();
3959        e.add_ava(Attribute::UserId, Value::from("william"));
3960        e.add_ava(
3961            Attribute::Uuid,
3962            Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
3963        );
3964        let e = e.into_sealed_new();
3965
3966        let single_result = be_a_txn.create(&CID_ZERO, vec![e]);
3967
3968        assert!(single_result.is_ok());
3969
3970        // Assert it's in A but not B.
3971        let filt = filter_resolved!(f_eq(Attribute::UserId, PartialValue::new_utf8s("william")));
3972
3973        let lims = Limits::unlimited();
3974
3975        let r = be_a_txn.search(&lims, &filt);
3976        assert!(r.expect("Search failed!").len() == 1);
3977
3978        let r = be_b_txn.search(&lims, &filt);
3979        assert!(r.expect("Search failed!").is_empty());
3980
3981        // Create into B
3982        let mut e: Entry<EntryInit, EntryNew> = Entry::new();
3983        e.add_ava(Attribute::UserId, Value::from("claire"));
3984        e.add_ava(
3985            Attribute::Uuid,
3986            Value::from("0c680959-0944-47d6-9dea-53304d124266"),
3987        );
3988        let e = e.into_sealed_new();
3989
3990        let single_result = be_b_txn.create(&CID_ZERO, vec![e]);
3991
3992        assert!(single_result.is_ok());
3993
3994        // Assert it's in B but not A
3995        let filt = filter_resolved!(f_eq(Attribute::UserId, PartialValue::new_utf8s("claire")));
3996
3997        let lims = Limits::unlimited();
3998
3999        let r = be_a_txn.search(&lims, &filt);
4000        assert!(r.expect("Search failed!").is_empty());
4001
4002        let r = be_b_txn.search(&lims, &filt);
4003        assert!(r.expect("Search failed!").len() == 1);
4004    }
4005}