// kanidmd_lib/src/be/mod.rs

1//! The backend. This contains the "low level" storage and query code, which is
2//! implemented as a json-like kv document database. This has no rules about content
3//! of the server, which are all enforced at higher levels. The role of the backend
4//! is to persist content safely to disk, load that content, and execute queries
5//! utilising indexes in the most effective way possible.
6
7use crate::be::dbentry::{DbBackup, DbEntry};
8use crate::be::dbrepl::DbReplMeta;
9use crate::entry::Entry;
10use crate::filter::{Filter, FilterPlan, FilterResolved, FilterValidResolved};
11use crate::prelude::*;
12use crate::repl::cid::Cid;
13use crate::repl::proto::ReplCidRange;
14use crate::repl::ruv::{
15    ReplicationUpdateVector, ReplicationUpdateVectorReadTransaction,
16    ReplicationUpdateVectorTransaction, ReplicationUpdateVectorWriteTransaction,
17};
18use crate::utils::trigraph_iter;
19use crate::value::{IndexType, Value};
20use concread::cowcell::*;
21use hashbrown::{HashMap as Map, HashSet};
22use idlset::v2::IDLBitRange;
23use idlset::AndNot;
24use kanidm_proto::backup::BackupCompression;
25use kanidm_proto::internal::{ConsistencyError, OperationError};
26use std::collections::BTreeMap;
27use std::fs;
28use std::io::prelude::*;
29use std::io::Read;
30use std::ops::DerefMut;
31use std::path::{Path, PathBuf};
32use std::sync::Arc;
33use std::time::Duration;
34use tracing::{trace, trace_span};
35use uuid::Uuid;
36
37use flate2::write::GzEncoder;
38use flate2::Compression;
39
40pub(crate) mod dbentry;
41pub(crate) mod dbrepl;
42pub(crate) mod dbvalue;
43
44mod idl_arc_sqlite;
45mod idl_sqlite;
46pub(crate) mod idxkey;
47pub(crate) mod keystorage;
48
49pub(crate) use self::idxkey::{IdxKey, IdxKeyRef, IdxKeyToRef, IdxSlope};
50use crate::be::idl_arc_sqlite::{
51    IdlArcSqlite, IdlArcSqliteReadTransaction, IdlArcSqliteTransaction,
52    IdlArcSqliteWriteTransaction,
53};
54use kanidm_proto::internal::FsType;
55
// Currently disabled due to improvements in idlset for intersection handling.
const FILTER_SEARCH_TEST_THRESHOLD: usize = 0;
const FILTER_EXISTS_TEST_THRESHOLD: usize = 0;
// During substring (trigraph) index resolution, once the candidate set shrinks
// below this size we stop intersecting further index keys - at that point the
// in-memory filter test is cheaper than more index lookups (see filter2idl_sub).
const FILTER_SUBSTR_TEST_THRESHOLD: usize = 4;
60
#[derive(Debug, Clone)]
/// Limits on the resources a single event can consume. These are defined per-event
/// as they are derived from the userAuthToken based on that individual session
pub struct Limits {
    /// If true, this session may execute searches that are fully unindexed
    /// (i.e. the resolved filter is `IdList::AllIds`).
    pub unindexed_allow: bool,
    /// Maximum number of entries a single search may return.
    pub search_max_results: usize,
    /// Maximum size of a partially-indexed candidate set that may be loaded
    /// for in-memory filter testing before the search is rejected.
    pub search_max_filter_test: usize,
    /// Maximum number of elements permitted in a single filter.
    pub filter_max_elements: usize,
}
70
71impl Default for Limits {
72    fn default() -> Self {
73        Limits {
74            unindexed_allow: false,
75            search_max_results: DEFAULT_LIMIT_SEARCH_MAX_RESULTS as usize,
76            search_max_filter_test: DEFAULT_LIMIT_SEARCH_MAX_FILTER_TEST as usize,
77            filter_max_elements: DEFAULT_LIMIT_FILTER_MAX_ELEMENTS as usize,
78        }
79    }
80}
81
82impl Limits {
83    pub fn unlimited() -> Self {
84        Limits {
85            unindexed_allow: true,
86            search_max_results: usize::MAX >> 1,
87            search_max_filter_test: usize::MAX >> 1,
88            filter_max_elements: usize::MAX,
89        }
90    }
91
92    pub fn api_token() -> Self {
93        Limits {
94            unindexed_allow: false,
95            search_max_results: DEFAULT_LIMIT_API_SEARCH_MAX_RESULTS as usize,
96            search_max_filter_test: DEFAULT_LIMIT_API_SEARCH_MAX_FILTER_TEST as usize,
97            filter_max_elements: DEFAULT_LIMIT_FILTER_MAX_ELEMENTS as usize,
98        }
99    }
100}
101
/// The result of a key value request containing the list of entry IDs that
/// match the filter/query condition.
#[derive(Debug, Clone)]
pub enum IdList {
    /// The value is not indexed, and must be assumed that all entries may match.
    /// Resolving this requires loading and testing every entry.
    AllIds,
    /// The index is "fuzzy" like a bloom filter (perhaps superset is a better description) -
    /// it contains all elements that do match, but may have extra elements that don't.
    /// This requires the caller to perform a filter test to assert that all
    /// returned entries match all assertions within the filter.
    Partial(IDLBitRange),
    /// The set was indexed and is below the filter test threshold. This is because it's
    /// now faster to test with the filter than to continue to access indexes at this point.
    /// Like a partial set, this is a super set of the entries that match the query.
    PartialThreshold(IDLBitRange),
    /// The value is indexed and accurately represents the set of entries that precisely match.
    /// No in-memory filter test is required for these candidates.
    Indexed(IDLBitRange),
}
120
/// An entry as stored by the id layer: the entry's database id paired with the
/// serialised (JSON) bytes of its content. See `into_dbentry`/`into_entry` for
/// deserialisation.
#[derive(Debug)]
pub struct IdRawEntry {
    // The entry's database id.
    id: u64,
    // The serialised JSON content of the entry.
    data: Vec<u8>,
}
126
#[derive(Debug, Clone)]
/// Metadata describing which indexes exist. Maps each known index key
/// (attribute + index type) to its `IdxSlope` (see the `idxkey` module).
pub struct IdxMeta {
    pub idxkeys: Map<IdxKey, IdxSlope>,
}
131
132impl IdxMeta {
133    pub fn new(idxkeys: Map<IdxKey, IdxSlope>) -> Self {
134        IdxMeta { idxkeys }
135    }
136}
137
#[derive(Clone)]
/// Configuration used to construct a `Backend`.
pub struct BackendConfig {
    /// Filesystem path of the database. An empty path means an sqlite
    /// in-memory/ram-only database (see `BackendConfig::new`).
    path: PathBuf,
    /// Connection pool size for the underlying database.
    pool_size: u32,
    /// Name of the database (always "main" outside of tests).
    db_name: &'static str,
    /// The filesystem type hosting the database.
    fstype: FsType,
    // Cachesizes?
    /// Optional cache size override — presumably for the idl arc cache; confirm
    /// against IdlArcSqlite before relying on this.
    arcsize: Option<usize>,
}
147
148impl BackendConfig {
149    pub fn new(
150        path: Option<&Path>,
151        pool_size: u32,
152        fstype: FsType,
153        arcsize: Option<usize>,
154    ) -> Self {
155        BackendConfig {
156            pool_size,
157            // This means if path is None, that "" implies an sqlite in memory/ram only database.
158            path: path.unwrap_or_else(|| Path::new("")).to_path_buf(),
159            db_name: "main",
160            fstype,
161            arcsize,
162        }
163    }
164
165    pub(crate) fn new_test(db_name: &'static str) -> Self {
166        BackendConfig {
167            pool_size: 1,
168            path: PathBuf::from(""),
169            db_name,
170            fstype: FsType::Generic,
171            arcsize: Some(2048),
172        }
173    }
174}
175
#[derive(Clone)]
/// The backend itself. All shared state is held behind `Arc` handles, so
/// cloning a `Backend` clones handles (plus the small config), not the data.
pub struct Backend {
    /// This is the actual datastorage layer.
    idlayer: Arc<IdlArcSqlite>,
    /// This is a copy-on-write cache of the index metadata that has been
    /// extracted from attributes set, in the correct format for the backend
    /// to consume. We use it to extract indexes from entries during write paths
    /// and to allow the front end to know what indexes exist during a read.
    idxmeta: Arc<CowCell<IdxMeta>>,
    /// The current state of the replication update vector. This is effectively a
    /// time series index of the full list of all changelog entries and what entries
    /// that are part of that change.
    ruv: Arc<ReplicationUpdateVector>,
    /// The configuration this backend was created with.
    cfg: BackendConfig,
}
191
/// A read-only transaction over the backend: a read transaction on the id
/// layer, a read snapshot of the index metadata, and a read transaction on
/// the replication update vector.
pub struct BackendReadTransaction<'a> {
    idlayer: IdlArcSqliteReadTransaction<'a>,
    idxmeta: CowCellReadTxn<IdxMeta>,
    ruv: ReplicationUpdateVectorReadTransaction<'a>,
}
197
// SAFETY: NOTE(review): these impls assert that a read transaction (the id
// layer read txn, the CowCell read snapshot and the ruv read txn) may be
// shared and moved across threads. That invariant is not demonstrated in this
// file - confirm the interior types' guarantees before relying on it.
unsafe impl Sync for BackendReadTransaction<'_> {}

unsafe impl Send for BackendReadTransaction<'_> {}
201
/// A writable transaction over the backend: a write transaction on the id
/// layer, a pending (write) view of the index metadata, and a write
/// transaction on the replication update vector.
pub struct BackendWriteTransaction<'a> {
    idlayer: IdlArcSqliteWriteTransaction<'a>,
    idxmeta_wr: CowCellWriteTxn<'a, IdxMeta>,
    ruv: ReplicationUpdateVectorWriteTransaction<'a>,
}
207
208impl IdRawEntry {
209    fn into_dbentry(self) -> Result<(u64, DbEntry), OperationError> {
210        serde_json::from_slice(self.data.as_slice())
211            .map_err(|e| {
212                admin_error!(?e, "Serde JSON Error");
213                OperationError::SerdeJsonError
214            })
215            .map(|dbe| (self.id, dbe))
216    }
217
218    fn into_entry(self) -> Result<EntrySealedCommitted, OperationError> {
219        let db_e = serde_json::from_slice(self.data.as_slice()).map_err(|e| {
220            admin_error!(?e, id = %self.id, "Serde JSON Error");
221            let raw_str = String::from_utf8_lossy(self.data.as_slice());
222            debug!(raw = %raw_str);
223            OperationError::SerdeJsonError
224        })?;
225        // let id = u64::try_from(self.id).map_err(|_| OperationError::InvalidEntryId)?;
226        Entry::from_dbentry(db_e, self.id).ok_or(OperationError::CorruptedEntry(self.id))
227    }
228}
229
/// Operations shared by backend read and write transactions. Implementors
/// provide access to the id layer, the replication update vector and the
/// index metadata; the provided methods build on these to resolve filters
/// into candidate id sets and execute queries.
pub trait BackendTransaction {
    /// The concrete id-layer transaction type for this transaction.
    type IdlLayerType: IdlArcSqliteTransaction;
    /// Access the underlying id storage layer transaction.
    fn get_idlayer(&mut self) -> &mut Self::IdlLayerType;

    /// The concrete replication update vector transaction type.
    type RuvType: ReplicationUpdateVectorTransaction;
    /// Access the replication update vector transaction.
    fn get_ruv(&mut self) -> &mut Self::RuvType;

    /// Access the index metadata for this transaction.
    fn get_idxmeta_ref(&self) -> &IdxMeta;
238
    /// Recursively apply a filter, transforming into IdList's on the way. This builds a query
    /// execution log, so that it can be examined how an operation proceeded.
    ///
    /// Returns the candidate id set for `filt` together with the `FilterPlan`
    /// describing how each term was resolved. `thres` is the candidate-set size
    /// below which we stop consulting indexes and return a `PartialThreshold`
    /// set, handing the remaining work to the in-memory filter test.
    #[allow(clippy::cognitive_complexity)]
    fn filter2idl(
        &mut self,
        filt: &FilterResolved,
        thres: usize,
    ) -> Result<(IdList, FilterPlan), OperationError> {
        Ok(match filt {
            FilterResolved::Eq(attr, value, idx) => {
                if idx.is_some() {
                    // Get the idx_key
                    let idx_key = value.get_idx_eq_key();
                    // Get the idl for this
                    match self
                        .get_idlayer()
                        .get_idl(attr, IndexType::Equality, &idx_key)?
                    {
                        Some(idl) => (
                            IdList::Indexed(idl),
                            FilterPlan::EqIndexed(attr.clone(), idx_key),
                        ),
                        // None here means the index that schema promised is absent -
                        // treat it as corrupt and fall back to testing all entries.
                        None => (IdList::AllIds, FilterPlan::EqCorrupt(attr.clone())),
                    }
                } else {
                    // Schema believes this is not indexed
                    (IdList::AllIds, FilterPlan::EqUnindexed(attr.clone()))
                }
            }
            FilterResolved::Stw(attr, subvalue, idx)
            | FilterResolved::Enw(attr, subvalue, idx)
            | FilterResolved::Cnt(attr, subvalue, idx) => {
                // Get the idx_key. Not all types support this, so may return "none".
                trace!(?idx, ?subvalue, ?attr);
                if let (true, Some(idx_key)) = (idx.is_some(), subvalue.get_idx_sub_key()) {
                    // Substring terms resolve via the trigraph index.
                    self.filter2idl_sub(attr, idx_key)?
                } else {
                    // Schema believes this is not indexed
                    (IdList::AllIds, FilterPlan::SubUnindexed(attr.clone()))
                }
            }
            FilterResolved::Pres(attr, idx) => {
                if idx.is_some() {
                    // Get the idl for this
                    match self.get_idlayer().get_idl(attr, IndexType::Presence, "_")? {
                        Some(idl) => (IdList::Indexed(idl), FilterPlan::PresIndexed(attr.clone())),
                        None => (IdList::AllIds, FilterPlan::PresCorrupt(attr.clone())),
                    }
                } else {
                    // Schema believes this is not indexed
                    (IdList::AllIds, FilterPlan::PresUnindexed(attr.clone()))
                }
            }
            FilterResolved::LessThan(attr, _subvalue, idx) => {
                if idx.is_some() {
                    // TODO: Temporary but we get the PRESENCE index for Ordering operations to
                    // reduce the amount of entries we need to filter in memory. In future we need
                    // a true ordering index, but that's a large block of work on it's own. For now
                    // this already helps a lot for in memory processing.
                    match self.get_idlayer().get_idl(attr, IndexType::Presence, "_")? {
                        Some(idl) => (
                            // Partial, not Indexed: presence only narrows the candidates;
                            // the ordering comparison still happens in the filter test.
                            IdList::Partial(idl),
                            FilterPlan::LessThanIndexed(attr.clone()),
                        ),
                        None => (IdList::AllIds, FilterPlan::LessThanCorrupt(attr.clone())),
                    }
                } else {
                    (IdList::AllIds, FilterPlan::LessThanUnindexed(attr.clone()))
                }
            }
            FilterResolved::Or(l, _) => {
                // Importantly if this has no inner elements, this returns
                // an empty list.
                let mut plan = Vec::with_capacity(0);
                let mut result = IDLBitRange::new();
                // Track whether any term degraded the union below fully-indexed.
                let mut partial = false;
                let mut threshold = false;
                // For each filter in l
                for f in l.iter() {
                    // get their idls
                    match self.filter2idl(f, thres)? {
                        (IdList::Indexed(idl), fp) => {
                            plan.push(fp);
                            // now union them (if possible)
                            result = result | idl;
                        }
                        (IdList::Partial(idl), fp) => {
                            plan.push(fp);
                            // now union them (if possible)
                            result = result | idl;
                            partial = true;
                        }
                        (IdList::PartialThreshold(idl), fp) => {
                            plan.push(fp);
                            // now union them (if possible)
                            result = result | idl;
                            partial = true;
                            threshold = true;
                        }
                        (IdList::AllIds, fp) => {
                            plan.push(fp);
                            // If we find anything unindexed, the whole term is unindexed.
                            filter_trace!("Term {:?} is AllIds, shortcut return", f);
                            let setplan = FilterPlan::OrUnindexed(plan);
                            return Ok((IdList::AllIds, setplan));
                        }
                    }
                } // end or.iter()
                  // If we got here, every term must have been indexed or partial indexed.
                if partial {
                    if threshold {
                        let setplan = FilterPlan::OrPartialThreshold(plan);
                        (IdList::PartialThreshold(result), setplan)
                    } else {
                        let setplan = FilterPlan::OrPartial(plan);
                        (IdList::Partial(result), setplan)
                    }
                } else {
                    let setplan = FilterPlan::OrIndexed(plan);
                    (IdList::Indexed(result), setplan)
                }
            }
            FilterResolved::And(l, _) => {
                // This algorithm is a little annoying. I couldn't get it to work with iter and
                // folds due to the logic needed ...

                // First, setup the two filter lists. We always apply AndNot after positive
                // and terms.
                let (f_andnot, f_rem): (Vec<_>, Vec<_>) = l.iter().partition(|f| f.is_andnot());

                // We make this an iter, so everything comes off in order. if we used pop it means we
                // pull from the tail, which is the WORST item to start with!
                let mut f_rem_iter = f_rem.iter();

                // Setup the initial result.
                let (mut cand_idl, fp) = match f_rem_iter.next() {
                    Some(f) => self.filter2idl(f, thres)?,
                    None => {
                        filter_warn!(
                            "And filter was empty, or contains only AndNot, can not evaluate."
                        );
                        return Ok((IdList::Indexed(IDLBitRange::new()), FilterPlan::Invalid));
                    }
                };

                // Setup the counter of terms we have left to evaluate.
                // This is used so that we shortcut return ONLY when we really do have
                // more terms remaining.
                // This subtraction can not underflow: f_rem has at least one element,
                // otherwise we returned in the match above.
                let mut f_rem_count = f_rem.len() + f_andnot.len() - 1;

                // Setup the query plan tracker
                let mut plan = vec![fp];

                match &cand_idl {
                    IdList::Indexed(idl) | IdList::Partial(idl) | IdList::PartialThreshold(idl) => {
                        // When below thres, we have to return partials to trigger the entry_no_match_filter check.
                        // But we only do this when there are actually multiple elements in the and,
                        // because an and with 1 element now is FULLY resolved.
                        if idl.below_threshold(thres) && f_rem_count > 0 {
                            let setplan = FilterPlan::AndPartialThreshold(plan);
                            return Ok((IdList::PartialThreshold(idl.clone()), setplan));
                        } else if idl.is_empty() {
                            // Regardless of the input state, if it's empty, this can never
                            // be satisfied, so return we are indexed and complete.
                            let setplan = FilterPlan::AndEmptyCand(plan);
                            return Ok((IdList::Indexed(IDLBitRange::new()), setplan));
                        }
                    }
                    IdList::AllIds => {}
                }

                // Now, for all remaining,
                for f in f_rem_iter {
                    f_rem_count -= 1;
                    let (inter, fp) = self.filter2idl(f, thres)?;
                    plan.push(fp);
                    // Intersect the new term with the running candidate set; the
                    // combined state is the weaker of the two inputs.
                    cand_idl = match (cand_idl, inter) {
                        (IdList::Indexed(ia), IdList::Indexed(ib)) => {
                            let r = ia & ib;
                            if r.below_threshold(thres) && f_rem_count > 0 {
                                // When below thres, we have to return partials to trigger the entry_no_match_filter check.
                                let setplan = FilterPlan::AndPartialThreshold(plan);
                                return Ok((IdList::PartialThreshold(r), setplan));
                            } else if r.is_empty() {
                                // Regardless of the input state, if it's empty, this can never
                                // be satisfied, so return we are indexed and complete.
                                let setplan = FilterPlan::AndEmptyCand(plan);
                                return Ok((IdList::Indexed(IDLBitRange::new()), setplan));
                            } else {
                                IdList::Indexed(r)
                            }
                        }
                        (IdList::Indexed(ia), IdList::Partial(ib))
                        | (IdList::Partial(ia), IdList::Indexed(ib))
                        | (IdList::Partial(ia), IdList::Partial(ib)) => {
                            let r = ia & ib;
                            if r.below_threshold(thres) && f_rem_count > 0 {
                                // When below thres, we have to return partials to trigger the entry_no_match_filter check.
                                let setplan = FilterPlan::AndPartialThreshold(plan);
                                return Ok((IdList::PartialThreshold(r), setplan));
                            } else {
                                IdList::Partial(r)
                            }
                        }
                        (IdList::Indexed(ia), IdList::PartialThreshold(ib))
                        | (IdList::PartialThreshold(ia), IdList::Indexed(ib))
                        | (IdList::PartialThreshold(ia), IdList::PartialThreshold(ib))
                        | (IdList::PartialThreshold(ia), IdList::Partial(ib))
                        | (IdList::Partial(ia), IdList::PartialThreshold(ib)) => {
                            let r = ia & ib;
                            if r.below_threshold(thres) && f_rem_count > 0 {
                                // When below thres, we have to return partials to trigger the entry_no_match_filter check.
                                let setplan = FilterPlan::AndPartialThreshold(plan);
                                return Ok((IdList::PartialThreshold(r), setplan));
                            } else {
                                IdList::PartialThreshold(r)
                            }
                        }
                        // An AllIds side contributes nothing to the intersection, so
                        // the other side's set survives as a superset (partial).
                        (IdList::Indexed(i), IdList::AllIds)
                        | (IdList::AllIds, IdList::Indexed(i))
                        | (IdList::Partial(i), IdList::AllIds)
                        | (IdList::AllIds, IdList::Partial(i)) => IdList::Partial(i),
                        (IdList::PartialThreshold(i), IdList::AllIds)
                        | (IdList::AllIds, IdList::PartialThreshold(i)) => {
                            IdList::PartialThreshold(i)
                        }
                        (IdList::AllIds, IdList::AllIds) => IdList::AllIds,
                    };
                }

                // debug!("partial cand set ==> {:?}", cand_idl);

                // Now subtract each AndNot term from the accumulated candidates.
                for f in f_andnot.iter() {
                    f_rem_count -= 1;
                    let FilterResolved::AndNot(f_in, _) = f else {
                        filter_error!("Invalid server state, a cand filter leaked to andnot set!");
                        return Err(OperationError::InvalidState);
                    };
                    let (inter, fp) = self.filter2idl(f_in, thres)?;
                    // It's an and not, so we need to wrap the plan accordingly.
                    plan.push(FilterPlan::AndNot(Box::new(fp)));
                    cand_idl = match (cand_idl, inter) {
                        (IdList::Indexed(ia), IdList::Indexed(ib)) => {
                            let r = ia.andnot(ib);
                            /*
                            // Don't trigger threshold on and nots if fully indexed.
                            if r.below_threshold(thres) {
                                // When below thres, we have to return partials to trigger the entry_no_match_filter check.
                                return Ok(IdList::PartialThreshold(r));
                            } else {
                                IdList::Indexed(r)
                            }
                            */
                            IdList::Indexed(r)
                        }
                        (IdList::Indexed(ia), IdList::Partial(ib))
                        | (IdList::Partial(ia), IdList::Indexed(ib))
                        | (IdList::Partial(ia), IdList::Partial(ib)) => {
                            let r = ia.andnot(ib);
                            // DO trigger threshold on partials, because we have to apply the filter
                            // test anyway, so we may as well shortcut at this point.
                            if r.below_threshold(thres) && f_rem_count > 0 {
                                let setplan = FilterPlan::AndPartialThreshold(plan);
                                return Ok((IdList::PartialThreshold(r), setplan));
                            } else {
                                IdList::Partial(r)
                            }
                        }
                        (IdList::Indexed(ia), IdList::PartialThreshold(ib))
                        | (IdList::PartialThreshold(ia), IdList::Indexed(ib))
                        | (IdList::PartialThreshold(ia), IdList::PartialThreshold(ib))
                        | (IdList::PartialThreshold(ia), IdList::Partial(ib))
                        | (IdList::Partial(ia), IdList::PartialThreshold(ib)) => {
                            let r = ia.andnot(ib);
                            // DO trigger threshold on partials, because we have to apply the filter
                            // test anyway, so we may as well shortcut at this point.
                            if r.below_threshold(thres) && f_rem_count > 0 {
                                let setplan = FilterPlan::AndPartialThreshold(plan);
                                return Ok((IdList::PartialThreshold(r), setplan));
                            } else {
                                IdList::PartialThreshold(r)
                            }
                        }

                        (IdList::Indexed(_), IdList::AllIds)
                        | (IdList::AllIds, IdList::Indexed(_))
                        | (IdList::Partial(_), IdList::AllIds)
                        | (IdList::AllIds, IdList::Partial(_))
                        | (IdList::PartialThreshold(_), IdList::AllIds)
                        | (IdList::AllIds, IdList::PartialThreshold(_)) => {
                            // We could actually generate allids here
                            // and then try to reduce the and-not set, but
                            // for now we just return all ids.
                            IdList::AllIds
                        }
                        (IdList::AllIds, IdList::AllIds) => IdList::AllIds,
                    };
                }

                // What state is the final cand idl in?
                let setplan = match cand_idl {
                    IdList::Indexed(_) => FilterPlan::AndIndexed(plan),
                    IdList::Partial(_) | IdList::PartialThreshold(_) => {
                        FilterPlan::AndPartial(plan)
                    }
                    IdList::AllIds => FilterPlan::AndUnindexed(plan),
                };

                // Finally, return the result.
                // debug!("final cand set ==> {:?}", cand_idl);
                (cand_idl, setplan)
            } // end and
            FilterResolved::Inclusion(l, _) => {
                // For inclusion to be valid, every term must have *at least* one element present.
                // This really relies on indexing, and so it's internal only - generally only
                // for fully indexed existence queries, such as from refint.

                // This has a lot in common with an And and Or but not really quite either.
                let mut plan = Vec::with_capacity(0);
                let mut result = IDLBitRange::new();
                // For each filter in l
                for f in l.iter() {
                    // get their idls
                    match self.filter2idl(f, thres)? {
                        (IdList::Indexed(idl), fp) => {
                            plan.push(fp);
                            if idl.is_empty() {
                                // It's empty, so something is missing. Bail fast.
                                filter_trace!("Inclusion is unable to proceed - an empty (missing) item was found!");
                                let setplan = FilterPlan::InclusionIndexed(plan);
                                return Ok((IdList::Indexed(IDLBitRange::new()), setplan));
                            } else {
                                result = result | idl;
                            }
                        }
                        (_, fp) => {
                            // Any non-Indexed term invalidates the whole inclusion.
                            plan.push(fp);
                            let setplan = FilterPlan::InclusionInvalid(plan);
                            error!(
                                ?setplan,
                                "Inclusion is unable to proceed - all terms must be fully indexed!"
                            );
                            return Ok((IdList::Partial(IDLBitRange::new()), setplan));
                        }
                    }
                } // end or.iter()
                  // If we got here, every term must have been indexed
                let setplan = FilterPlan::InclusionIndexed(plan);
                (IdList::Indexed(result), setplan)
            }
            // So why does this return empty? Normally we actually process an AndNot in the context
            // of an "AND" query, but if it's used anywhere else IE the root filter, then there is
            // no other set to exclude - therefore it's empty set. Additionally, even in an OR query
            // the AndNot will be skipped as an empty set for the same reason.
            FilterResolved::AndNot(_f, _) => {
                // get the idl for f
                // now do andnot?
                filter_error!("Requested a top level or isolated AndNot, returning empty");
                (IdList::Indexed(IDLBitRange::new()), FilterPlan::Invalid)
            }
            FilterResolved::Invalid(_) => {
                // Indexed since it is always false and we don't want to influence filter testing
                (IdList::Indexed(IDLBitRange::new()), FilterPlan::Invalid)
            }
        })
    }
605
    /// Resolve a substring term (Stw/Enw/Cnt) against the trigraph substring
    /// index for `attr`, intersecting the idl of each trigraph of
    /// `sub_idx_key`. On success the result is always `IdList::Partial`, so the
    /// caller must still run the in-memory filter test (corrupt indexes yield
    /// `AllIds`; an empty key yields an empty `Indexed` set).
    fn filter2idl_sub(
        &mut self,
        attr: &Attribute,
        sub_idx_key: String,
    ) -> Result<(IdList, FilterPlan), OperationError> {
        // Now given that idx_key, we will iterate over the possible graphemes.
        let mut grapheme_iter = trigraph_iter(&sub_idx_key);

        // Substrings are always partial because we have to split the keys up
        // and we don't pay attention to starts/ends with conditions. We need
        // the caller to check those conditions manually at run time. This lets
        // the index focus on trigraph indexes only rather than needing to
        // worry about those other bits. In a way substring indexes are "fuzzy".

        // Seed the candidate set from the first trigraph.
        let mut idl = match grapheme_iter.next() {
            Some(idx_key) => {
                match self
                    .get_idlayer()
                    .get_idl(attr, IndexType::SubString, idx_key)?
                {
                    Some(idl) => idl,
                    // Missing index content - fall back to testing all entries.
                    None => return Ok((IdList::AllIds, FilterPlan::SubCorrupt(attr.clone()))),
                }
            }
            None => {
                // If there are no graphemes this means the attempt is for an empty string, so
                // we return an empty result set.
                return Ok((IdList::Indexed(IDLBitRange::new()), FilterPlan::Invalid));
            }
        };

        // Only keep intersecting while the candidate set is still large enough
        // to be worth narrowing; once it drops below the threshold the filter
        // test is cheaper than further index lookups.
        if idl.len() > FILTER_SUBSTR_TEST_THRESHOLD {
            for idx_key in grapheme_iter {
                // Get the idl for this
                match self
                    .get_idlayer()
                    .get_idl(attr, IndexType::SubString, idx_key)?
                {
                    Some(r_idl) => {
                        // Do an *and* operation between what we found and our working idl.
                        idl = r_idl & idl;
                    }
                    None => {
                        // if something didn't match, then we simply bail out after zeroing the current IDL.
                        idl = IDLBitRange::new();
                    }
                };

                if idl.len() < FILTER_SUBSTR_TEST_THRESHOLD {
                    break;
                }
            }
        } else {
            // Explicitly end the iterator's borrow of sub_idx_key so it can be
            // moved into the plan below.
            drop(grapheme_iter);
        }

        // We exhausted the grapheme iter, exit with what we found.
        Ok((
            IdList::Partial(idl),
            FilterPlan::SubIndexed(attr.clone(), sub_idx_key),
        ))
    }
668
    /// Search the backend for all entries matching the filter, subject to the
    /// resource limits in `erl`.
    ///
    /// The filter is first resolved to an id set through the indexes. Limits
    /// are enforced on that candidate set *before* any entries are loaded,
    /// and the in-memory filter test is skipped entirely when the index
    /// resolution was exact.
    #[instrument(level = "debug", name = "be::search", skip_all)]
    fn search(
        &mut self,
        erl: &Limits,
        filt: &Filter<FilterValidResolved>,
    ) -> Result<Vec<Arc<EntrySealedCommitted>>, OperationError> {
        // Unlike DS, even if we don't get the index back, we can just pass
        // to the in-memory filter test and be done.

        trace!(filter_optimised = ?filt);

        let (idl, fplan) = trace_span!("be::search -> filter2idl")
            .in_scope(|| self.filter2idl(filt.to_inner(), FILTER_SEARCH_TEST_THRESHOLD))?;

        debug!(search_filter_executed_plan = %fplan);

        // Apply the resource limits to the candidate id set before we pay
        // the cost of loading entries from the id layer.
        match &idl {
            IdList::AllIds => {
                if !erl.unindexed_allow {
                    admin_error!(
                        "filter (search) is fully unindexed, and not allowed by resource limits"
                    );
                    return Err(OperationError::ResourceLimit);
                }
            }
            IdList::Partial(idl_br) => {
                // A partial set still requires the in-memory filter test below,
                // so it is bounded by search_max_filter_test.
                if !idl_br.below_threshold(erl.search_max_filter_test) {
                    admin_error!("filter (search) is partial indexed and greater than search_max_filter_test allowed by resource limits");
                    return Err(OperationError::ResourceLimit);
                }
            }
            IdList::PartialThreshold(_) => {
                // Since we opted for this, this is not the fault
                // of the user and we should not penalise them by limiting on partial.
            }
            IdList::Indexed(idl_br) => {
                // We know this is resolved here, so we can attempt the limit
                // check. This has to fold the whole index, but you know, class=pres is
                // indexed ...
                if !idl_br.below_threshold(erl.search_max_results) {
                    admin_error!("filter (search) is indexed and greater than search_max_results allowed by resource limits");
                    return Err(OperationError::ResourceLimit);
                }
            }
        };

        let entries = self.get_idlayer().get_identry(&idl).map_err(|e| {
            admin_error!(?e, "get_identry failed");
            e
        })?;

        // Non-exact id sets may contain false positives, so they must be
        // filter-tested in memory.
        let mut entries_filtered = match idl {
            IdList::AllIds => trace_span!("be::search<entry::ftest::allids>").in_scope(|| {
                entries
                    .into_iter()
                    .filter(|e| e.entry_match_no_index(filt))
                    .collect()
            }),
            IdList::Partial(_) => trace_span!("be::search<entry::ftest::partial>").in_scope(|| {
                entries
                    .into_iter()
                    .filter(|e| e.entry_match_no_index(filt))
                    .collect()
            }),
            IdList::PartialThreshold(_) => trace_span!("be::search<entry::ftest::thresh>")
                .in_scope(|| {
                    entries
                        .into_iter()
                        .filter(|e| e.entry_match_no_index(filt))
                        .collect()
                }),
            // Since the index fully resolved, we can shortcut the filter test step here!
            IdList::Indexed(_) => {
                filter_trace!("filter (search) was fully indexed 👏");
                entries
            }
        };

        // If the idl was not indexed, apply the resource limit now. Avoid the needless match since the
        // if statement is quick.
        if entries_filtered.len() > erl.search_max_results {
            admin_error!("filter (search) is resolved and greater than search_max_results allowed by resource limits");
            return Err(OperationError::ResourceLimit);
        }

        // Trim any excess capacity if needed
        entries_filtered.shrink_to_fit();

        Ok(entries_filtered)
    }
761
    /// Given a filter, assert some condition exists.
    /// Basically, this is a specialised case of search, where we don't need to
    /// load any candidates if they match. This is heavily used in uuid
    /// refint and attr uniqueness.
    ///
    /// Resource limits from `erl` are applied the same way as in search, but
    /// a fully indexed result short-circuits without loading any entries.
    #[instrument(level = "debug", name = "be::exists", skip_all)]
    fn exists(
        &mut self,
        erl: &Limits,
        filt: &Filter<FilterValidResolved>,
    ) -> Result<bool, OperationError> {
        trace!(filter_optimised = ?filt);

        // Using the indexes, resolve the IdList here, or AllIds.
        // Also get if the filter was 100% resolved or not.
        let (idl, fplan) = trace_span!("be::exists -> filter2idl")
            .in_scope(|| self.filter2idl(filt.to_inner(), FILTER_EXISTS_TEST_THRESHOLD))?;

        debug!(exist_filter_executed_plan = %fplan);

        // Apply limits to the IdList.
        match &idl {
            IdList::AllIds => {
                if !erl.unindexed_allow {
                    admin_error!(
                        "filter (exists) is fully unindexed, and not allowed by resource limits"
                    );
                    return Err(OperationError::ResourceLimit);
                }
            }
            IdList::Partial(idl_br) => {
                // Partial sets still need the in-memory filter test below.
                if !idl_br.below_threshold(erl.search_max_filter_test) {
                    admin_error!("filter (exists) is partial indexed and greater than search_max_filter_test allowed by resource limits");
                    return Err(OperationError::ResourceLimit);
                }
            }
            IdList::PartialThreshold(_) => {
                // Since we opted for this, this is not the fault
                // of the user and we should not penalise them.
            }
            IdList::Indexed(_) => {}
        }

        // Now, check the idl -- if it's fully resolved, we can skip this because the query
        // was fully indexed.
        match &idl {
            IdList::Indexed(idl) => Ok(!idl.is_empty()),
            _ => {
                let entries = self.get_idlayer().get_identry(&idl).map_err(|e| {
                    admin_error!(?e, "get_identry failed");
                    e
                })?;

                // if not 100% resolved query, apply the filter test.
                let entries_filtered: Vec<_> =
                    trace_span!("be::exists<entry::ftest>").in_scope(|| {
                        entries
                            .into_iter()
                            .filter(|e| e.entry_match_no_index(filt))
                            .collect()
                    });

                Ok(!entries_filtered.is_empty())
            }
        } // end match idl
    }
827
828    fn retrieve_range(
829        &mut self,
830        ranges: &BTreeMap<Uuid, ReplCidRange>,
831    ) -> Result<Vec<Arc<EntrySealedCommitted>>, OperationError> {
832        // First pass the ranges to the ruv to resolve to an absolute set of
833        // entry id's.
834
835        let idl = self.get_ruv().range_to_idl(ranges);
836        // Because of how this works, I think that it's not possible for the idl
837        // to have any missing ids.
838        //
839        // If it was possible, we could just & with allids to remove the extraneous
840        // values.
841
842        if idl.is_empty() {
843            // return no entries.
844            return Ok(Vec::with_capacity(0));
845        }
846
847        // Make it an id list fr the backend.
848        let id_list = IdList::Indexed(idl);
849
850        self.get_idlayer().get_identry(&id_list).map_err(|e| {
851            admin_error!(?e, "get_identry failed");
852            e
853        })
854    }
855
    /// Run the id-layer consistency checks, returning any errors found.
    fn verify(&mut self) -> Vec<Result<(), ConsistencyError>> {
        self.get_idlayer().verify()
    }
859
    /// Verify that the name2uuid, uuid2spn and uuid2rdn indexes agree with
    /// the content of a single entry. Only live entries (not recycled or
    /// tombstoned) are checked. Any mismatch yields
    /// `ConsistencyError::BackendIndexSync`.
    fn verify_entry_index(&mut self, e: &EntrySealedCommitted) -> Result<(), ConsistencyError> {
        // First, check our references in name2uuid, uuid2spn and uuid2rdn
        if e.mask_recycled_ts().is_some() {
            let e_uuid = e.get_uuid();
            // We only check these on live entries.
            let (n2u_add, n2u_rem) = Entry::idx_name2uuid_diff(None, Some(e));

            // Diffing "nothing" against the entry must only ever produce
            // additions - anything else means the diff logic is broken.
            let (Some(n2u_set), None) = (n2u_add, n2u_rem) else {
                admin_error!("Invalid idx_name2uuid_diff state");
                return Err(ConsistencyError::BackendIndexSync);
            };

            // If the set.len > 1, check each item.
            n2u_set
                .iter()
                .try_for_each(|name| match self.get_idlayer().name2uuid(name) {
                    Ok(Some(idx_uuid)) => {
                        if idx_uuid == e_uuid {
                            Ok(())
                        } else {
                            admin_error!("Invalid name2uuid state -> incorrect uuid association");
                            Err(ConsistencyError::BackendIndexSync)
                        }
                    }
                    r => {
                        admin_error!(state = ?r, "Invalid name2uuid state");
                        Err(ConsistencyError::BackendIndexSync)
                    }
                })?;

            // uuid2spn must map this uuid to exactly the entry's current spn.
            let spn = e.get_uuid2spn();
            match self.get_idlayer().uuid2spn(e_uuid) {
                Ok(Some(idx_spn)) => {
                    if spn != idx_spn {
                        admin_error!("Invalid uuid2spn state -> incorrect idx spn value");
                        return Err(ConsistencyError::BackendIndexSync);
                    }
                }
                r => {
                    admin_error!(state = ?r, ?e_uuid, "Invalid uuid2spn state");
                    trace!(entry = ?e);
                    return Err(ConsistencyError::BackendIndexSync);
                }
            };

            // uuid2rdn must map this uuid to exactly the entry's current rdn.
            let rdn = e.get_uuid2rdn();
            match self.get_idlayer().uuid2rdn(e_uuid) {
                Ok(Some(idx_rdn)) => {
                    if rdn != idx_rdn {
                        admin_error!("Invalid uuid2rdn state -> incorrect idx rdn value");
                        return Err(ConsistencyError::BackendIndexSync);
                    }
                }
                r => {
                    admin_error!(state = ?r, "Invalid uuid2rdn state");
                    return Err(ConsistencyError::BackendIndexSync);
                }
            };
        }

        // Check the other entry:attr indexes are valid
        //
        // This is actually pretty hard to check, because we can check a value *should*
        // exist, but not that a value should NOT be present in the index. Thought needed ...

        // Got here? Ok!
        Ok(())
    }
928
929    fn verify_indexes(&mut self) -> Vec<Result<(), ConsistencyError>> {
930        let idl = IdList::AllIds;
931        let entries = match self.get_idlayer().get_identry(&idl) {
932            Ok(s) => s,
933            Err(e) => {
934                admin_error!(?e, "get_identry failure");
935                return vec![Err(ConsistencyError::Unknown)];
936            }
937        };
938
939        let r = entries.iter().try_for_each(|e| self.verify_entry_index(e));
940
941        if r.is_err() {
942            vec![r]
943        } else {
944            Vec::with_capacity(0)
945        }
946    }
947
948    fn verify_ruv(&mut self, results: &mut Vec<Result<(), ConsistencyError>>) {
949        // The way we verify this is building a whole second RUV and then comparing it.
950        let idl = IdList::AllIds;
951        let entries = match self.get_idlayer().get_identry(&idl) {
952            Ok(ent) => ent,
953            Err(e) => {
954                results.push(Err(ConsistencyError::Unknown));
955                admin_error!(?e, "get_identry failed");
956                return;
957            }
958        };
959
960        self.get_ruv().verify(&entries, results);
961    }
962
963    fn backup(
964        &mut self,
965        dst_path: &Path,
966        compression: BackupCompression,
967    ) -> Result<(), OperationError> {
968        let repl_meta = self.get_ruv().to_db_backup_ruv();
969
970        // load all entries into RAM, may need to change this later
971        // if the size of the database compared to RAM is an issue
972        let idl = IdList::AllIds;
973        let idlayer = self.get_idlayer();
974        let raw_entries: Vec<IdRawEntry> = idlayer.get_identry_raw(&idl)?;
975
976        let entries: Result<Vec<DbEntry>, _> = raw_entries
977            .iter()
978            .map(|id_ent| {
979                serde_json::from_slice(id_ent.data.as_slice())
980                    .map_err(|_| OperationError::SerdeJsonError) // log?
981            })
982            .collect();
983
984        let entries = entries?;
985
986        let db_s_uuid = idlayer
987            .get_db_s_uuid()
988            .and_then(|u| u.ok_or(OperationError::InvalidDbState))?;
989        let db_d_uuid = idlayer
990            .get_db_d_uuid()
991            .and_then(|u| u.ok_or(OperationError::InvalidDbState))?;
992        let db_ts_max = idlayer
993            .get_db_ts_max()
994            .and_then(|u| u.ok_or(OperationError::InvalidDbState))?;
995
996        let keyhandles = idlayer.get_key_handles()?;
997
998        let bak = DbBackup::V5 {
999            // remember env is evaled at compile time.
1000            version: env!("KANIDM_PKG_SERIES").to_string(),
1001            db_s_uuid,
1002            db_d_uuid,
1003            db_ts_max,
1004            keyhandles,
1005            repl_meta,
1006            entries,
1007        };
1008
1009        let serialized_entries_str = serde_json::to_string(&bak).map_err(|e| {
1010            admin_error!(?e, "serde error");
1011            OperationError::SerdeJsonError
1012        })?;
1013
1014        match compression {
1015            BackupCompression::NoCompression => fs::write(dst_path, serialized_entries_str)
1016                .map(|_| ())
1017                .map_err(|e| {
1018                    admin_error!(?e, "fs::write error");
1019                    OperationError::FsError
1020                }),
1021            BackupCompression::Gzip => {
1022                let writer = std::fs::File::create(dst_path).map_err(|e| {
1023                    admin_error!(?e, "File::create error creating {}", dst_path.display());
1024                    OperationError::FsError
1025                })?;
1026
1027                let mut encoder = GzEncoder::new(writer, Compression::best());
1028                encoder
1029                    .write_all(serialized_entries_str.as_bytes())
1030                    .map_err(|e| {
1031                        admin_error!(?e, "Gzip compression error writing backup");
1032                        OperationError::FsError
1033                    })?;
1034                Ok(())
1035            }
1036        }
1037    }
1038
    /// Resolve a name to an entry uuid via the id layer's name2uuid index.
    fn name2uuid(&mut self, name: &str) -> Result<Option<Uuid>, OperationError> {
        self.get_idlayer().name2uuid(name)
    }
1042
    /// Resolve an external id to an entry uuid via the id layer's externalid2uuid index.
    fn externalid2uuid(&mut self, name: &str) -> Result<Option<Uuid>, OperationError> {
        self.get_idlayer().externalid2uuid(name)
    }
1046
    /// Resolve an entry uuid to its spn value via the id layer's uuid2spn index.
    fn uuid2spn(&mut self, uuid: Uuid) -> Result<Option<Value>, OperationError> {
        self.get_idlayer().uuid2spn(uuid)
    }
1050
    /// Resolve an entry uuid to its rdn string via the id layer's uuid2rdn index.
    fn uuid2rdn(&mut self, uuid: Uuid) -> Result<Option<String>, OperationError> {
        self.get_idlayer().uuid2rdn(uuid)
    }
1054}
1055
impl<'a> BackendTransaction for BackendReadTransaction<'a> {
    type IdlLayerType = IdlArcSqliteReadTransaction<'a>;
    type RuvType = ReplicationUpdateVectorReadTransaction<'a>;

    // Hand out the read transaction's id layer.
    fn get_idlayer(&mut self) -> &mut IdlArcSqliteReadTransaction<'a> {
        &mut self.idlayer
    }

    // Hand out the read transaction's replication update vector.
    fn get_ruv(&mut self) -> &mut ReplicationUpdateVectorReadTransaction<'a> {
        &mut self.ruv
    }

    // Index metadata snapshot associated with this transaction.
    fn get_idxmeta_ref(&self) -> &IdxMeta {
        &self.idxmeta
    }
}
1072
impl BackendReadTransaction<'_> {
    /// List the names of all indexes - delegates to the id layer's `list_idxs`.
    pub fn list_indexes(&mut self) -> Result<Vec<String>, OperationError> {
        self.get_idlayer().list_idxs()
    }

    /// Delegate to the id layer's `list_id2entry`.
    pub fn list_id2entry(&mut self) -> Result<Vec<(u64, String)>, OperationError> {
        self.get_idlayer().list_id2entry()
    }

    /// List the (key, idl) content of a named index - delegates to the id layer.
    pub fn list_index_content(
        &mut self,
        index_name: &str,
    ) -> Result<Vec<(String, IDLBitRange)>, OperationError> {
        self.get_idlayer().list_index_content(index_name)
    }

    /// Fetch a single id2entry row by id - delegates to the id layer.
    pub fn get_id2entry(&mut self, id: u64) -> Result<(u64, String), OperationError> {
        self.get_idlayer().get_id2entry(id)
    }

    /// Delegate to the id layer's `list_quarantined`.
    pub fn list_quarantined(&mut self) -> Result<Vec<(u64, String)>, OperationError> {
        self.get_idlayer().list_quarantined()
    }
}
1097
impl<'a> BackendTransaction for BackendWriteTransaction<'a> {
    type IdlLayerType = IdlArcSqliteWriteTransaction<'a>;
    type RuvType = ReplicationUpdateVectorWriteTransaction<'a>;

    // Hand out the write transaction's id layer.
    fn get_idlayer(&mut self) -> &mut IdlArcSqliteWriteTransaction<'a> {
        &mut self.idlayer
    }

    // Hand out the write transaction's replication update vector.
    fn get_ruv(&mut self) -> &mut ReplicationUpdateVectorWriteTransaction<'a> {
        &mut self.ruv
    }

    // Index metadata snapshot associated with this transaction.
    fn get_idxmeta_ref(&self) -> &IdxMeta {
        &self.idxmeta_wr
    }
}
1114
1115impl<'a> BackendWriteTransaction<'a> {
    /// Access the writable RUV directly, for replication code paths that
    /// need more than the trait-level `get_ruv` accessor.
    pub(crate) fn get_ruv_write(&mut self) -> &mut ReplicationUpdateVectorWriteTransaction<'a> {
        &mut self.ruv
    }
1119
    /// Persist a batch of newly created entries.
    ///
    /// Every entry must already record a change for `cid` in its changestate.
    /// Ids are allocated sequentially from the current maximum, the RUV gains
    /// a change for the whole batch, and indexes are generated for each new
    /// entry. Returns the committed entries with their assigned ids.
    #[instrument(level = "debug", name = "be::create", skip_all)]
    pub fn create(
        &mut self,
        cid: &Cid,
        entries: Vec<EntrySealedNew>,
    ) -> Result<Vec<EntrySealedCommitted>, OperationError> {
        if entries.is_empty() {
            admin_error!("No entries provided to BE to create, invalid server call!");
            return Err(OperationError::EmptyRequest);
        }

        // Check that every entry has a change associated
        // that matches the cid?
        entries.iter().try_for_each(|e| {
            if e.get_changestate().contains_tail_cid(cid) {
                Ok(())
            } else {
                admin_error!(
                    "Entry changelog does not contain a change related to this transaction"
                );
                Err(OperationError::ReplEntryNotChanged)
            }
        })?;

        // Now, assign id's to all the new entries.

        let mut id_max = self.idlayer.get_id2entry_max_id()?;
        let c_entries: Vec<_> = entries
            .into_iter()
            .map(|e| {
                id_max += 1;
                e.into_sealed_committed_id(id_max)
            })
            .collect();

        // All good, lets update the RUV.
        // This auto compresses.
        let ruv_idl = IDLBitRange::from_iter(c_entries.iter().map(|e| e.get_id()));

        // We don't need to skip this like in mod since creates always go to the ruv
        self.get_ruv().insert_change(cid, ruv_idl)?;

        self.idlayer.write_identries(c_entries.iter())?;

        self.idlayer.set_id2entry_max_id(id_max);

        // Now update the indexes as required.
        for e in c_entries.iter() {
            self.entry_index(None, Some(e))?
        }

        Ok(c_entries)
    }
1173
1174    #[instrument(level = "debug", name = "be::create", skip_all)]
1175    /// This is similar to create, but used in the replication path as it records all
1176    /// the CID's in the entry to the RUV, but without applying the current CID as
1177    /// a new value in the RUV. We *do not* want to apply the current CID in the RUV
1178    /// related to this entry as that could cause an infinite replication loop!
1179    pub fn refresh(
1180        &mut self,
1181        entries: Vec<EntrySealedNew>,
1182    ) -> Result<Vec<EntrySealedCommitted>, OperationError> {
1183        if entries.is_empty() {
1184            admin_error!("No entries provided to BE to create, invalid server call!");
1185            return Err(OperationError::EmptyRequest);
1186        }
1187
1188        // Assign id's to all the new entries.
1189        let mut id_max = self.idlayer.get_id2entry_max_id()?;
1190        let c_entries: Vec<_> = entries
1191            .into_iter()
1192            .map(|e| {
1193                id_max += 1;
1194                e.into_sealed_committed_id(id_max)
1195            })
1196            .collect();
1197
1198        self.idlayer.write_identries(c_entries.iter())?;
1199
1200        self.idlayer.set_id2entry_max_id(id_max);
1201
1202        // Update the RUV with all the changestates of the affected entries.
1203        for e in c_entries.iter() {
1204            self.get_ruv().update_entry_changestate(e)?;
1205        }
1206
1207        // Now update the indexes as required.
1208        for e in c_entries.iter() {
1209            self.entry_index(None, Some(e))?
1210        }
1211
1212        Ok(c_entries)
1213    }
1214
    /// Persist a batch of modified entries.
    ///
    /// `pre_entries` and `post_entries` are parallel slices (same length,
    /// same order). The RUV only gains a change for entries whose changestate
    /// contains `cid` - entries whose changes were all non-replicated are
    /// still written and reindexed, but do not advance the RUV.
    #[instrument(level = "debug", name = "be::modify", skip_all)]
    pub fn modify(
        &mut self,
        cid: &Cid,
        pre_entries: &[Arc<EntrySealedCommitted>],
        post_entries: &[EntrySealedCommitted],
    ) -> Result<(), OperationError> {
        if post_entries.is_empty() || pre_entries.is_empty() {
            admin_error!("No entries provided to BE to modify, invalid server call!");
            return Err(OperationError::EmptyRequest);
        }

        assert_eq!(post_entries.len(), pre_entries.len());

        let post_entries_iter = post_entries.iter().filter(|e| {
            trace!(?cid);
            trace!(changestate = ?e.get_changestate());
            // If True - This means that at least one attribute that *is* replicated was changed
            // on this entry, so we need to update and add this to the RUV!
            //
            // If False - This means that the entry in question was updated but the changes are all
            // non-replicated so we DO NOT update the RUV here!
            e.get_changestate().contains_tail_cid(cid)
        });

        // All good, lets update the RUV.
        // This auto compresses.
        let ruv_idl = IDLBitRange::from_iter(post_entries_iter.map(|e| e.get_id()));

        if !ruv_idl.is_empty() {
            self.get_ruv().insert_change(cid, ruv_idl)?;
        }

        // Now, given the list of id's, update them
        self.get_idlayer().write_identries(post_entries.iter())?;

        // Finally, we now reindex all the changed entries. We do this by iterating and zipping
        // over the set, because we know the list is in the same order.
        pre_entries
            .iter()
            .zip(post_entries.iter())
            .try_for_each(|(pre, post)| self.entry_index(Some(pre.as_ref()), Some(post)))
    }
1258
    /// For each incoming replication entry, resolve it to the corresponding
    /// local entry via the uuid equality index. Entries not yet known locally
    /// are materialised as indexed stub entries with freshly allocated ids so
    /// that subsequent index diffs behave correctly. Returns the local
    /// (possibly stub) entries in the same order as `entry_meta`.
    #[instrument(level = "debug", name = "be::incremental_prepare", skip_all)]
    pub fn incremental_prepare(
        &mut self,
        entry_meta: &[EntryIncrementalNew],
    ) -> Result<Vec<Arc<EntrySealedCommitted>>, OperationError> {
        let mut ret_entries = Vec::with_capacity(entry_meta.len());
        let id_max_pre = self.idlayer.get_id2entry_max_id()?;
        let mut id_max = id_max_pre;

        for ctx_ent in entry_meta.iter() {
            let ctx_ent_uuid = ctx_ent.get_uuid();
            let idx_key = ctx_ent_uuid.as_hyphenated().to_string();

            let idl =
                self.get_idlayer()
                    .get_idl(&Attribute::Uuid, IndexType::Equality, &idx_key)?;

            let entry = match idl {
                Some(idl) if idl.is_empty() => {
                    // Create the stub entry, we just need it to have an id number
                    // allocated.
                    id_max += 1;

                    let stub_entry = Arc::new(EntrySealedCommitted::stub_sealed_committed_id(
                        id_max, ctx_ent,
                    ));
                    // Now, the stub entry needs to be indexed. If not, uuid2spn
                    // isn't created, so subsequent index diffs don't work correctly.
                    self.entry_index(None, Some(stub_entry.as_ref()))?;

                    // Okay, entry ready to go.
                    stub_entry
                }
                Some(idl) if idl.len() == 1 => {
                    // Get the entry from this idl.
                    let mut entries = self
                        .get_idlayer()
                        .get_identry(&IdList::Indexed(idl))
                        .map_err(|e| {
                            admin_error!(?e, "get_identry failed");
                            e
                        })?;

                    if let Some(entry) = entries.pop() {
                        // Return it.
                        entry
                    } else {
                        error!("Invalid entry state, index was unable to locate entry");
                        return Err(OperationError::InvalidDbState);
                    }
                    // Done, entry is ready to go
                }
                Some(idl) => {
                    // BUG - duplicate uuid!
                    error!(uuid = ?ctx_ent_uuid, "Invalid IDL state, uuid index must have only a single or no values. Contains {:?}", idl);
                    return Err(OperationError::InvalidDbState);
                }
                None => {
                    // BUG - corrupt index.
                    error!(uuid = ?ctx_ent_uuid, "Invalid IDL state, uuid index must be present");
                    return Err(OperationError::InvalidDbState);
                }
            };

            ret_entries.push(entry);
        }

        // Only write back the max id if we actually allocated stub ids above.
        if id_max != id_max_pre {
            self.idlayer.set_id2entry_max_id(id_max);
        }

        Ok(ret_entries)
    }
1332
    /// Apply the outcome of an incremental replication.
    ///
    /// `create_entries` are written similarly to `refresh` (ids allocated,
    /// RUV updated from each entry's own changestate, indexes generated),
    /// while `update_entries` are `(post, pre)` pairs written as a cid-less
    /// variant of `modify`. No transaction cid is inserted into the RUV here,
    /// as that could cause replication loops.
    #[instrument(level = "debug", name = "be::incremental_apply", skip_all)]
    pub fn incremental_apply(
        &mut self,
        update_entries: &[(EntrySealedCommitted, Arc<EntrySealedCommitted>)],
        create_entries: Vec<EntrySealedNew>,
    ) -> Result<(), OperationError> {
        // For the values in create_cands, create these with similar code to the refresh
        // path.
        if !create_entries.is_empty() {
            // Assign id's to all the new entries.
            let mut id_max = self.idlayer.get_id2entry_max_id()?;
            let c_entries: Vec<_> = create_entries
                .into_iter()
                .map(|e| {
                    id_max += 1;
                    e.into_sealed_committed_id(id_max)
                })
                .collect();

            self.idlayer.write_identries(c_entries.iter())?;

            self.idlayer.set_id2entry_max_id(id_max);

            // Update the RUV with all the changestates of the affected entries.
            for e in c_entries.iter() {
                self.get_ruv().update_entry_changestate(e)?;
            }

            // Now update the indexes as required.
            for e in c_entries.iter() {
                self.entry_index(None, Some(e))?
            }
        }

        // Otherwise this is a cid-less copy of modify.
        if !update_entries.is_empty() {
            self.get_idlayer()
                .write_identries(update_entries.iter().map(|(up, _)| up))?;

            for (e, _) in update_entries.iter() {
                self.get_ruv().update_entry_changestate(e)?;
            }

            for (post, pre) in update_entries.iter() {
                self.entry_index(Some(pre.as_ref()), Some(post))?
            }
        }

        Ok(())
    }
1383
1384    #[instrument(level = "debug", name = "be::reap_tombstones", skip_all)]
1385    pub fn reap_tombstones(&mut self, cid: &Cid, trim_cid: &Cid) -> Result<usize, OperationError> {
1386        debug_assert!(cid > trim_cid);
1387        // Mark a new maximum for the RUV by inserting an empty change. This
1388        // is important to keep the changestate always advancing.
1389        self.get_ruv().insert_change(cid, IDLBitRange::default())?;
1390
1391        // We plan to clear the RUV up to this cid. So we need to build an IDL
1392        // of all the entries we need to examine.
1393        let idl = self.get_ruv().trim_up_to(trim_cid).map_err(|e| {
1394            admin_error!(
1395                ?e,
1396                "During tombstone cleanup, failed to trim RUV to {:?}",
1397                trim_cid
1398            );
1399            e
1400        })?;
1401
1402        let entries = self
1403            .get_idlayer()
1404            .get_identry(&IdList::Indexed(idl))
1405            .map_err(|e| {
1406                admin_error!(?e, "get_identry failed");
1407                e
1408            })?;
1409
1410        if entries.is_empty() {
1411            admin_debug!("No entries affected - reap_tombstones operation success");
1412            return Ok(0);
1413        }
1414
1415        // Now that we have a list of entries we need to partition them into
1416        // two sets. The entries that are tombstoned and ready to reap_tombstones, and
1417        // the entries that need to have their change logs trimmed.
1418        //
1419        // Remember, these tombstones can be reaped because they were tombstoned at time
1420        // point 'cid', and since we are now "past" that minimum cid, then other servers
1421        // will also be trimming these out.
1422        //
1423        // Note unlike a changelog impl, we don't need to trim changestates here. We
1424        // only need the RUV trimmed so that we know if other servers are laggin behind!
1425
1426        // What entries are tombstones and ready to be deleted?
1427
1428        let (tombstones, leftover): (Vec<_>, Vec<_>) = entries
1429            .into_iter()
1430            .partition(|e| e.get_changestate().can_delete(trim_cid));
1431
1432        let ruv_idls = self.get_ruv().ruv_idls();
1433
1434        // Assert that anything leftover still either is *alive* OR is a tombstone
1435        // and has entries in the RUV!
1436
1437        if !leftover
1438            .iter()
1439            .all(|e| e.get_changestate().is_live() || ruv_idls.contains(e.get_id()))
1440        {
1441            admin_error!("Left over entries may be orphaned due to missing RUV entries");
1442            return Err(OperationError::ReplInvalidRUVState);
1443        }
1444
        // Now set up to reap the tombstones. Remember, after the RUV cleanup above,
        // the RUV may now have been trimmed to a point where these can be purged!
1447
1448        // Assert the id's exist on the entry.
1449        let id_list: IDLBitRange = tombstones.iter().map(|e| e.get_id()).collect();
1450
1451        // Ensure nothing here exists in the RUV index, else it means
1452        // we didn't trim properly, or some other state violation has occurred.
1453        if !((&ruv_idls & &id_list).is_empty()) {
1454            admin_error!("RUV still contains entries that are going to be removed.");
1455            return Err(OperationError::ReplInvalidRUVState);
1456        }
1457
1458        // Now, given the list of id's, reap_tombstones them.
1459        let sz = id_list.len();
1460        self.get_idlayer().delete_identry(id_list.into_iter())?;
1461
1462        // Finally, purge the indexes from the entries we removed. These still have
1463        // indexes due to class=tombstone.
1464        tombstones
1465            .iter()
1466            .try_for_each(|e| self.entry_index(Some(e), None))?;
1467
1468        Ok(sz)
1469    }
1470
1471    #[instrument(level = "debug", name = "be::update_idxmeta", skip_all)]
1472    pub fn update_idxmeta(&mut self, idxkeys: Vec<IdxKey>) -> Result<(), OperationError> {
1473        if self.is_idx_slopeyness_generated()? {
1474            trace!("Indexing slopes available");
1475        } else {
1476            warn!("No indexing slopes available. You should consider reindexing to generate these");
1477        };
1478
1479        // TODO: I think anytime we update idx meta is when we should reindex in memory
1480        // indexes.
1481        // Probably needs to be similar to create_idxs so we iterate over the set of
1482        // purely in memory idxs.
1483
1484        // Setup idxkeys here. By default we set these all to "max slope" aka
1485        // all indexes are "equal" but also worse case unless analysed. If they
1486        // have been analysed, we can set the slope factor into here.
1487        let mut idxkeys = idxkeys
1488            .into_iter()
1489            .map(|k| self.get_idx_slope(&k).map(|slope| (k, slope)))
1490            .collect::<Result<Map<_, _>, _>>()?;
1491
1492        std::mem::swap(&mut self.idxmeta_wr.deref_mut().idxkeys, &mut idxkeys);
1493        Ok(())
1494    }
1495
    // Update all indexes affected by a single entry transition.
    //
    // Call shapes:
    //   (None, Some(post)) - entry created, add its index values.
    //   (Some(pre), Some(post)) - entry modified, apply the diff.
    //   (Some(pre), None) - entry removed, strip its index values.
    //   (None, None) - invalid, returns InvalidState.
    //
    // Index changes are written through the idlayer cache as we go, then
    // flushed to disk in a single stripe at commit time.
    // TODO: Can this be improved?
    #[allow(clippy::cognitive_complexity)]
    fn entry_index(
        &mut self,
        pre: Option<&EntrySealedCommitted>,
        post: Option<&EntrySealedCommitted>,
    ) -> Result<(), OperationError> {
        // Determine the (uuid, id) to index against, and whether the uuid is
        // stable across the transition. uuid_same is false only in the
        // modify case when pre/post disagree (e.g. a replication conflict).
        let (e_uuid, e_id, uuid_same) = match (pre, post) {
            (None, None) => {
                admin_error!("Invalid call to entry_index - no entries provided");
                return Err(OperationError::InvalidState);
            }
            (Some(pre), None) => {
                trace!("Attempting to remove entry indexes");
                (pre.get_uuid(), pre.get_id(), true)
            }
            (None, Some(post)) => {
                trace!("Attempting to create entry indexes");
                (post.get_uuid(), post.get_id(), true)
            }
            (Some(pre), Some(post)) => {
                trace!("Attempting to modify entry indexes");
                assert_eq!(pre.get_id(), post.get_id());
                (
                    post.get_uuid(),
                    post.get_id(),
                    pre.get_uuid() == post.get_uuid(),
                )
            }
        };

        // Update the names/uuid maps. These have to mask out entries
        // that are recycled or tombstones, so these pretend as "deleted"
        // and can trigger correct actions.

        let mask_pre = pre.and_then(|e| e.mask_recycled_ts());
        let mask_pre = if !uuid_same {
            // Okay, so if the uuids are different this is probably from
            // a replication conflict.  We can't just use the normal none/some
            // check from the Entry::idx functions as they only yield partial
            // changes. Because the uuid is changing, we have to treat pre
            // as a deleting entry, regardless of what state post is in.
            let uuid = mask_pre.map(|e| e.get_uuid()).ok_or_else(|| {
                admin_error!("Invalid entry state - possible memory corruption");
                OperationError::InvalidState
            })?;

            // Diff pre against "nothing" to produce pure removals.
            let (n2u_add, n2u_rem) = Entry::idx_name2uuid_diff(mask_pre, None);
            // There will never be content to add.
            assert!(n2u_add.is_none());

            let (eid2u_add, eid2u_rem) = Entry::idx_externalid2uuid_diff(mask_pre, None);
            // There will never be content to add.
            assert!(eid2u_add.is_none());

            let u2s_act = Entry::idx_uuid2spn_diff(mask_pre, None);
            let u2r_act = Entry::idx_uuid2rdn_diff(mask_pre, None);

            trace!(?n2u_rem, ?eid2u_rem, ?u2s_act, ?u2r_act,);

            // Write the changes out to the backend
            if let Some(rem) = n2u_rem {
                self.idlayer.write_name2uuid_rem(rem)?
            }

            if let Some(rem) = eid2u_rem {
                self.idlayer.write_externalid2uuid_rem(rem)?
            }

            // Some(Ok(k)) means "set to k"; Some(Err(_)) means "remove".
            match u2s_act {
                None => {}
                Some(Ok(k)) => self.idlayer.write_uuid2spn(uuid, Some(k))?,
                Some(Err(_)) => self.idlayer.write_uuid2spn(uuid, None)?,
            }

            match u2r_act {
                None => {}
                Some(Ok(k)) => self.idlayer.write_uuid2rdn(uuid, Some(k))?,
                Some(Err(_)) => self.idlayer.write_uuid2rdn(uuid, None)?,
            }
            // Return none, mask_pre is now completed - the remaining diffs
            // below then treat this as a pure create of post.
            None
        } else {
            // Return the state.
            mask_pre
        };

        // Normal path: diff the (masked) pre and post states.
        let mask_post = post.and_then(|e| e.mask_recycled_ts());
        let (n2u_add, n2u_rem) = Entry::idx_name2uuid_diff(mask_pre, mask_post);
        let (eid2u_add, eid2u_rem) = Entry::idx_externalid2uuid_diff(mask_pre, mask_post);

        let u2s_act = Entry::idx_uuid2spn_diff(mask_pre, mask_post);
        let u2r_act = Entry::idx_uuid2rdn_diff(mask_pre, mask_post);

        trace!(
            ?n2u_add,
            ?n2u_rem,
            ?eid2u_add,
            ?eid2u_rem,
            ?u2s_act,
            ?u2r_act
        );

        // Write the changes out to the backend
        if let Some(add) = n2u_add {
            self.idlayer.write_name2uuid_add(e_uuid, add)?
        }
        if let Some(rem) = n2u_rem {
            self.idlayer.write_name2uuid_rem(rem)?
        }

        if let Some(add) = eid2u_add {
            self.idlayer.write_externalid2uuid_add(e_uuid, add)?
        }
        if let Some(rem) = eid2u_rem {
            self.idlayer.write_externalid2uuid_rem(rem)?
        }

        match u2s_act {
            None => {}
            Some(Ok(k)) => self.idlayer.write_uuid2spn(e_uuid, Some(k))?,
            Some(Err(_)) => self.idlayer.write_uuid2spn(e_uuid, None)?,
        }

        match u2r_act {
            None => {}
            Some(Ok(k)) => self.idlayer.write_uuid2rdn(e_uuid, Some(k))?,
            Some(Err(_)) => self.idlayer.write_uuid2rdn(e_uuid, None)?,
        }

        // Extremely Cursed - Okay, we know that self.idxmeta will NOT be changed
        // in this function, but we need to borrow self as mut for the caches in
        // get_idl to work. As a result, this causes a double borrow. To work around
        // this we discard the lifetime on idxmeta, because we know that it will
        // remain constant for the life of the operation.
        //
        // SAFETY: sound only while no code path below mutates idxmeta_wr.idxkeys;
        // the loop below only touches self.idlayer.

        let idxmeta = unsafe { &(*(&self.idxmeta_wr.idxkeys as *const _)) };

        let idx_diff = Entry::idx_diff(idxmeta, pre, post);

        // Apply each Ok (insert id) / Err (remove id) action to its IDL. A
        // missing IDL means the on-disk indexes are stale and a reindex is
        // required; we warn rather than fail so the write can proceed.
        idx_diff.into_iter()
            .try_for_each(|act| {
                match act {
                    Ok((attr, itype, idx_key)) => {
                        trace!("Adding {:?} idx -> {:?}: {:?}", itype, attr, idx_key);
                        match self.idlayer.get_idl(attr, itype, &idx_key)? {
                            Some(mut idl) => {
                                idl.insert_id(e_id);
                                if cfg!(debug_assertions)
                                    && *attr == Attribute::Uuid && itype == IndexType::Equality {
                                        // This means a duplicate UUID has appeared in the index.
                                        if idl.len() > 1 {
                                            trace!(duplicate_idl = ?idl, ?idx_key);
                                        }
                                        debug_assert!(idl.len() <= 1);
                                }
                                self.idlayer.write_idl(attr, itype, &idx_key, &idl)
                            }
                            None => {
                                warn!(
                                    "WARNING: index {:?} {:?} was not found. YOU MUST REINDEX YOUR DATABASE",
                                    attr, itype
                                );
                                Ok(())
                            }
                        }
                    }
                    Err((attr, itype, idx_key)) => {
                        trace!("Removing {:?} idx -> {:?}: {:?}", itype, attr, idx_key);
                        match self.idlayer.get_idl(attr, itype, &idx_key)? {
                            Some(mut idl) => {
                                idl.remove_id(e_id);
                                if cfg!(debug_assertions) && *attr == Attribute::Uuid && itype == IndexType::Equality {
                                        // This means a duplicate UUID has appeared in the index.
                                        if idl.len() > 1 {
                                            trace!(duplicate_idl = ?idl, ?idx_key);
                                        }
                                        debug_assert!(idl.len() <= 1);
                                }
                                self.idlayer.write_idl(attr, itype, &idx_key, &idl)
                            }
                            None => {
                                warn!(
                                    "WARNING: index {:?} {:?} was not found. YOU MUST REINDEX YOUR DATABASE",
                                    attr, itype
                                );
                                Ok(())
                            }
                        }
                    }
                }
            })
        // End try_for_each
    }
1698
1699    #[allow(dead_code)]
1700    fn missing_idxs(&mut self) -> Result<Vec<(Attribute, IndexType)>, OperationError> {
1701        let idx_table_list = self.get_idlayer().list_idxs()?;
1702
1703        // Turn the vec to a real set
1704        let idx_table_set: HashSet<_> = idx_table_list.into_iter().collect();
1705
1706        let missing: Vec<_> = self
1707            .idxmeta_wr
1708            .idxkeys
1709            .keys()
1710            .filter_map(|ikey| {
1711                // what would the table name be?
1712                let tname = format!("idx_{}_{}", ikey.itype.as_idx_str(), ikey.attr.as_str());
1713                trace!("Checking for {}", tname);
1714
1715                if idx_table_set.contains(&tname) {
1716                    None
1717                } else {
1718                    Some((ikey.attr.clone(), ikey.itype))
1719                }
1720            })
1721            .collect();
1722        Ok(missing)
1723    }
1724
1725    fn create_idxs(&mut self) -> Result<(), OperationError> {
1726        // Create name2uuid and uuid2name
1727        trace!("Creating index -> name2uuid");
1728        self.idlayer.create_name2uuid()?;
1729
1730        trace!("Creating index -> externalid2uuid");
1731        self.idlayer.create_externalid2uuid()?;
1732
1733        trace!("Creating index -> uuid2spn");
1734        self.idlayer.create_uuid2spn()?;
1735
1736        trace!("Creating index -> uuid2rdn");
1737        self.idlayer.create_uuid2rdn()?;
1738
1739        self.idxmeta_wr
1740            .idxkeys
1741            .keys()
1742            .try_for_each(|ikey| self.idlayer.create_idx(&ikey.attr, ikey.itype))
1743    }
1744
1745    pub fn upgrade_reindex(&mut self, v: i64) -> Result<(), OperationError> {
1746        let dbv = self.get_db_index_version()?;
1747        admin_debug!(?dbv, ?v, "upgrade_reindex");
1748        if dbv < v {
1749            self.reindex(false)?;
1750            self.set_db_index_version(v)
1751        } else {
1752            Ok(())
1753        }
1754    }
1755
1756    #[instrument(level = "info", skip_all)]
1757    pub fn reindex(&mut self, immediate: bool) -> Result<(), OperationError> {
1758        let notice_immediate = immediate || (cfg!(not(test)) && cfg!(not(debug_assertions)));
1759
1760        info!(
1761            immediate = notice_immediate,
1762            "System reindex: started - this may take a long time!"
1763        );
1764
1765        // Purge the idxs
1766        // TODO: Purge in memory idxs.
1767        self.idlayer.danger_purge_idxs()?;
1768
1769        // Using the index metadata on the txn, create all our idx tables
1770        // TODO: Needs to create all the in memory indexes.
1771        self.create_idxs()?;
1772
1773        // Now, we need to iterate over everything in id2entry and index them
1774        // Future idea: Do this in batches of X amount to limit memory
1775        // consumption.
1776        let idl = IdList::AllIds;
1777        let entries = self.idlayer.get_identry(&idl).inspect_err(|err| {
1778            error!(?err, "get_identry failure");
1779        })?;
1780
1781        let mut count = 0;
1782
1783        // This is the longest phase of reindexing, so we have a "progress" display here.
1784        entries
1785            .iter()
1786            .try_for_each(|e| {
1787                if immediate {
1788                    count += 1;
1789                    if count % 2500 == 0 {
1790                        eprint!("{count}");
1791                    } else if count % 250 == 0 {
1792                        eprint!(".");
1793                    }
1794                }
1795
1796                self.entry_index(None, Some(e))
1797            })
1798            .inspect_err(|err| {
1799                error!(?err, "reindex failed");
1800            })?;
1801
1802        if immediate {
1803            eprintln!(" done ✅");
1804        }
1805
1806        info!(immediate, "Reindexed {count} entries");
1807
1808        info!("Optimising Indexes: started");
1809        self.idlayer.optimise_dirty_idls();
1810        info!("Optimising Indexes: complete ✅");
1811        info!("Calculating Index Optimisation Slopes: started");
1812        self.idlayer.analyse_idx_slopes().inspect_err(|err| {
1813            error!(?err, "index optimisation failed");
1814        })?;
1815        info!("Calculating Index Optimisation Slopes: complete ✅");
1816        info!("System reindex: complete 🎉");
1817        Ok(())
1818    }
1819
1820    /// ⚠️  - This function will destroy all indexes in the database.
1821    ///
1822    /// It should only be called internally by the backend in limited and
1823    /// specific situations.
1824    fn danger_purge_idxs(&mut self) -> Result<(), OperationError> {
1825        self.get_idlayer().danger_purge_idxs()
1826    }
1827
1828    /// ⚠️  - This function will destroy all entries and indexes in the database.
1829    ///
1830    /// It should only be called internally by the backend in limited and
1831    /// specific situations.
1832    pub(crate) fn danger_delete_all_db_content(&mut self) -> Result<(), OperationError> {
1833        self.get_ruv().clear();
1834        self.get_idlayer()
1835            .danger_purge_id2entry()
1836            .and_then(|_| self.danger_purge_idxs())
1837    }
1838
1839    #[cfg(test)]
1840    pub fn load_test_idl(
1841        &mut self,
1842        attr: &Attribute,
1843        itype: IndexType,
1844        idx_key: &str,
1845    ) -> Result<Option<IDLBitRange>, OperationError> {
1846        self.get_idlayer().get_idl(attr, itype, idx_key)
1847    }
1848
1849    fn is_idx_slopeyness_generated(&mut self) -> Result<bool, OperationError> {
1850        self.get_idlayer().is_idx_slopeyness_generated()
1851    }
1852
1853    fn get_idx_slope(&mut self, ikey: &IdxKey) -> Result<IdxSlope, OperationError> {
1854        // Do we have the slopeyness?
1855        let slope = self
1856            .get_idlayer()
1857            .get_idx_slope(ikey)?
1858            .unwrap_or_else(|| get_idx_slope_default(ikey));
1859        trace!("index slope - {:?} -> {:?}", ikey, slope);
1860        Ok(slope)
1861    }
1862
    /// Restore the full database content from a backup file at `src_path`,
    /// destroying ALL existing entries and indexes first.
    ///
    /// The backup may be plain JSON or gzip compressed - the format is
    /// detected from the file itself. Only `DbBackup::V5` archives whose
    /// embedded version matches this build's `KANIDM_PKG_SERIES` can be
    /// restored; V1-V4 (and V5 from other series) are rejected. On success a
    /// full consistency verification pass is run over the restored data.
    pub fn restore(&mut self, src_path: &Path) -> Result<(), OperationError> {
        let compression = BackupCompression::identify_file(src_path);

        debug!("Backup compression identified: {}", compression);
        // Read the whole backup into a single JSON string, decompressing if needed.
        let serialized_string = match compression {
            BackupCompression::NoCompression => fs::read_to_string(src_path).map_err(|e| {
                error!("Failed to read backup file {} {e:?}", src_path.display());
                OperationError::FsError
            })?,
            BackupCompression::Gzip => {
                let reader = std::fs::File::open(src_path).map_err(|e| {
                    error!("Failed to open backup file {} {e:?}", src_path.display());
                    OperationError::FsError
                })?;
                let mut decoder = flate2::read::GzDecoder::new(reader);
                let mut decompressed_data = String::new();
                decoder
                    .read_to_string(&mut decompressed_data)
                    .map_err(|e| {
                        error!("GZip decompression error {:?}", e);
                        OperationError::FsError
                    })?;
                decompressed_data
            }
        };

        // Point of no return: wipe existing entries, indexes and the RUV.
        self.danger_delete_all_db_content().map_err(|e| {
            error!("delete_all_db_content failed {:?}", e);
            e
        })?;

        let idlayer = self.get_idlayer();
        // load all entries into RAM, may need to change this later
        // if the size of the database compared to RAM is an issue

        let dbbak_option: Result<DbBackup, serde_json::Error> =
            serde_json::from_str(&serialized_string);

        let dbbak = dbbak_option.map_err(|e| {
            admin_error!("serde_json error {:?}", e);
            OperationError::SerdeJsonError
        })?;

        // Unpack the backup by format version. Each arm restores the metadata
        // it carries (server/domain uuid, ts max, key handles) and yields
        // (entries, optional replication metadata, optional version string).
        let (dbentries, repl_meta, maybe_version) = match dbbak {
            DbBackup::V1(dbentries) => (dbentries, None, None),
            DbBackup::V2 {
                db_s_uuid,
                db_d_uuid,
                db_ts_max,
                entries,
            } => {
                // V2 adds server/domain uuids and ts max.
                idlayer.write_db_s_uuid(db_s_uuid)?;
                idlayer.write_db_d_uuid(db_d_uuid)?;
                idlayer.set_db_ts_max(db_ts_max)?;
                (entries, None, None)
            }
            DbBackup::V3 {
                db_s_uuid,
                db_d_uuid,
                db_ts_max,
                keyhandles,
                entries,
            } => {
                // V3 adds key handles.
                idlayer.write_db_s_uuid(db_s_uuid)?;
                idlayer.write_db_d_uuid(db_d_uuid)?;
                idlayer.set_db_ts_max(db_ts_max)?;
                idlayer.set_key_handles(keyhandles)?;
                (entries, None, None)
            }
            DbBackup::V4 {
                db_s_uuid,
                db_d_uuid,
                db_ts_max,
                keyhandles,
                repl_meta,
                entries,
            } => {
                // V4 adds replication metadata (RUV).
                idlayer.write_db_s_uuid(db_s_uuid)?;
                idlayer.write_db_d_uuid(db_d_uuid)?;
                idlayer.set_db_ts_max(db_ts_max)?;
                idlayer.set_key_handles(keyhandles)?;
                (entries, Some(repl_meta), None)
            }
            DbBackup::V5 {
                version,
                db_s_uuid,
                db_d_uuid,
                db_ts_max,
                keyhandles,
                repl_meta,
                entries,
            } => {
                // V5 adds the originating server version string.
                idlayer.write_db_s_uuid(db_s_uuid)?;
                idlayer.write_db_d_uuid(db_d_uuid)?;
                idlayer.set_db_ts_max(db_ts_max)?;
                idlayer.set_key_handles(keyhandles)?;
                (entries, Some(repl_meta), Some(version))
            }
        };

        // Only same-series V5 backups may be restored. Note this rejects
        // V1-V4 unconditionally, as they carry no version information.
        if let Some(version) = maybe_version {
            if version != env!("KANIDM_PKG_SERIES") {
                error!("The provided backup data is from server version {} and is unable to be restored on this instance ({})", version, env!("KANIDM_PKG_SERIES"));
                return Err(OperationError::DB0001MismatchedRestoreVersion);
            }
        } else {
            error!("The provided backup data is from an older server version and is unable to be restored.");
            return Err(OperationError::DB0002MismatchedRestoreVersion);
        };

        // Rebuild the RUV from the backup.
        match repl_meta {
            Some(DbReplMeta::V1 { ruv: db_ruv }) => {
                self.get_ruv()
                    .restore(db_ruv.into_iter().map(|db_cid| db_cid.into()))?;
            }
            None => {
                warn!("Unable to restore replication metadata, this server may need a refresh.");
            }
        }

        info!("Restoring {} entries ...", dbentries.len());

        // Now, we setup all the entries with new ids - sequential from 1.
        let mut id_max = 0;
        let identries: Result<Vec<IdRawEntry>, _> = dbentries
            .iter()
            .map(|e| {
                id_max += 1;
                // NOTE(review): this maps a serde_json failure to
                // SerdeCborError, unlike the SerdeJsonError used above -
                // likely historical; confirm before changing the variant.
                let data = serde_json::to_vec(&e).map_err(|_| OperationError::SerdeCborError)?;
                Ok(IdRawEntry { id: id_max, data })
            })
            .collect();

        let idlayer = self.get_idlayer();

        idlayer.write_identries_raw(identries?.into_iter())?;

        info!("Restored {} entries", dbentries.len());

        // Run a full consistency check over the restored database; any
        // reported error fails the restore.
        let vr = self.verify();
        if vr.is_empty() {
            Ok(())
        } else {
            Err(OperationError::ConsistencyError(
                vr.into_iter().filter_map(|v| v.err()).collect(),
            ))
        }
    }
2016
2017    /// If any RUV elements are present in the DB, load them now. This provides us with
2018    /// the RUV boundaries and change points from previous operations of the server, so
2019    /// that ruv_rebuild can "fill in" the gaps.
2020    ///
2021    /// # SAFETY
2022    ///
2023    /// Note that you should only call this function during the server startup
2024    /// to reload the RUV data from the entries of the database.
2025    ///
2026    /// Before calling this, the in memory ruv MUST be clear.
2027    #[instrument(level = "debug", name = "be::ruv_rebuild", skip_all)]
2028    fn ruv_reload(&mut self) -> Result<(), OperationError> {
2029        let idlayer = self.get_idlayer();
2030
2031        let db_ruv = idlayer.get_db_ruv()?;
2032
2033        // Setup the CID's that existed previously. We don't need to know what entries
2034        // they affect, we just need them to ensure that we have ranges for replication
2035        // comparison to take effect properly.
2036        self.get_ruv().restore(db_ruv)?;
2037
2038        // Then populate the RUV with the data from the entries.
2039        self.ruv_rebuild()
2040    }
2041
2042    #[instrument(level = "debug", name = "be::ruv_rebuild", skip_all)]
2043    fn ruv_rebuild(&mut self) -> Result<(), OperationError> {
2044        // Rebuild the ruv!
2045        // For now this has to read from all the entries in the DB, but in the future
2046        // we'll actually store this properly (?). If it turns out this is really fast
2047        // we may just rebuild this always on startup.
2048
2049        // NOTE: An important detail is that we don't rely on indexes here!
2050
2051        let idl = IdList::AllIds;
2052        let entries = self.get_idlayer().get_identry(&idl).map_err(|e| {
2053            admin_error!(?e, "get_identry failed");
2054            e
2055        })?;
2056
2057        self.get_ruv().rebuild(&entries)?;
2058
2059        Ok(())
2060    }
2061
2062    pub fn quarantine_entry(&mut self, id: u64) -> Result<(), OperationError> {
2063        self.get_idlayer().quarantine_entry(id)?;
2064        // We have to set the index version to 0 so that on next start we force
2065        // a reindex to automatically occur.
2066        self.set_db_index_version(0)
2067    }
2068
2069    pub fn restore_quarantined(&mut self, id: u64) -> Result<(), OperationError> {
2070        self.get_idlayer().restore_quarantined(id)?;
2071        // We have to set the index version to 0 so that on next start we force
2072        // a reindex to automatically occur.
2073        self.set_db_index_version(0)
2074    }
2075
2076    #[cfg(any(test, debug_assertions))]
2077    pub fn clear_cache(&mut self) -> Result<(), OperationError> {
2078        self.get_idlayer().clear_cache()
2079    }
2080
2081    pub fn commit(self) -> Result<(), OperationError> {
2082        let BackendWriteTransaction {
2083            mut idlayer,
2084            idxmeta_wr,
2085            ruv,
2086        } = self;
2087
2088        // write the ruv content back to the db.
2089        idlayer.write_db_ruv(ruv.added(), ruv.removed())?;
2090
2091        idlayer.commit().map(|()| {
2092            ruv.commit();
2093            idxmeta_wr.commit();
2094        })
2095    }
2096
2097    pub(crate) fn reset_db_s_uuid(&mut self) -> Result<Uuid, OperationError> {
2098        // The value is missing. Generate a new one and store it.
2099        let nsid = Uuid::new_v4();
2100        self.get_idlayer().write_db_s_uuid(nsid).map_err(|err| {
2101            error!(?err, "Unable to persist server uuid");
2102            err
2103        })?;
2104        Ok(nsid)
2105    }
2106    pub fn get_db_s_uuid(&mut self) -> Result<Uuid, OperationError> {
2107        let res = self.get_idlayer().get_db_s_uuid().map_err(|err| {
2108            error!(?err, "Failed to read server uuid");
2109            err
2110        })?;
2111        match res {
2112            Some(s_uuid) => Ok(s_uuid),
2113            None => self.reset_db_s_uuid(),
2114        }
2115    }
2116
2117    /// This generates a new domain UUID and stores it into the database,
2118    /// returning the new UUID
2119    fn reset_db_d_uuid(&mut self) -> Result<Uuid, OperationError> {
2120        let nsid = Uuid::new_v4();
2121        self.get_idlayer().write_db_d_uuid(nsid).map_err(|err| {
2122            error!(?err, "Unable to persist domain uuid");
2123            err
2124        })?;
2125        Ok(nsid)
2126    }
2127
2128    /// Manually set a new domain UUID and store it into the DB. This is used
2129    /// as part of a replication refresh.
2130    pub fn set_db_d_uuid(&mut self, nsid: Uuid) -> Result<(), OperationError> {
2131        self.get_idlayer().write_db_d_uuid(nsid)
2132    }
2133
2134    /// This pulls the domain UUID from the database
2135    pub fn get_db_d_uuid(&mut self) -> Result<Uuid, OperationError> {
2136        let res = self.get_idlayer().get_db_d_uuid().map_err(|err| {
2137            error!(?err, "Failed to read domain uuid");
2138            err
2139        })?;
2140        match res {
2141            Some(d_uuid) => Ok(d_uuid),
2142            None => self.reset_db_d_uuid(),
2143        }
2144    }
2145
2146    pub fn set_db_ts_max(&mut self, ts: Duration) -> Result<(), OperationError> {
2147        self.get_idlayer().set_db_ts_max(ts)
2148    }
2149
2150    pub fn get_db_ts_max(&mut self, ts: Duration) -> Result<Duration, OperationError> {
2151        // if none, return ts. If found, return it.
2152        match self.get_idlayer().get_db_ts_max()? {
2153            Some(dts) => Ok(dts),
2154            None => Ok(ts),
2155        }
2156    }
2157
2158    fn get_db_index_version(&mut self) -> Result<i64, OperationError> {
2159        self.get_idlayer().get_db_index_version()
2160    }
2161
2162    fn set_db_index_version(&mut self, v: i64) -> Result<(), OperationError> {
2163        self.get_idlayer().set_db_index_version(v)
2164    }
2165}
2166
2167// We have a number of hardcoded, "obvious" slopes that should
2168// exist. We return these when the analysis has not been run, as
2169// these are values that are generally "good enough" for most applications
2170fn get_idx_slope_default(ikey: &IdxKey) -> IdxSlope {
2171    match (ikey.attr.as_str(), &ikey.itype) {
2172        (ATTR_NAME, IndexType::Equality)
2173        | (ATTR_SPN, IndexType::Equality)
2174        | (ATTR_UUID, IndexType::Equality) => 1,
2175        (ATTR_CLASS, IndexType::Equality) => 180,
2176        (_, IndexType::Equality) => 45,
2177        (_, IndexType::SubString) => 90,
2178        (_, IndexType::Presence) => 90,
2179        (_, IndexType::Ordering) => 120,
2180    }
2181}
2182
// In the future this will do the routing between the chosen backends etc.
impl Backend {
    /// Construct and initialise the backend over the configured sqlite
    /// id layer. This seeds default index slopes for the supplied idx keys,
    /// runs the id layer's one-time setup in its own committed transaction,
    /// and then rebuilds the replication update vector (RUV) from disk.
    #[instrument(level = "debug", name = "be::new", skip_all)]
    pub fn new(
        mut cfg: BackendConfig,
        // path: &str,
        // mut pool_size: u32,
        // fstype: FsType,
        idxkeys: Vec<IdxKey>,
        vacuum: bool,
    ) -> Result<Self, OperationError> {
        debug!(db_tickets = ?cfg.pool_size, profile = %env!("KANIDM_PROFILE_NAME"), cpu_flags = %env!("KANIDM_CPU_FLAGS"));

        // If in memory, reduce pool to 1
        if cfg.path.as_os_str().is_empty() {
            cfg.pool_size = 1;
        }

        // Setup idxkeys here. By default we set these all to "max slope" aka
        // all indexes are "equal" but also worse case unless analysed.
        //
        // During startup this will be "fixed" as the schema core will call reload_idxmeta
        // which will trigger a reload of the analysis data (if present).
        let idxkeys: Map<_, _> = idxkeys
            .into_iter()
            .map(|ikey| {
                let slope = get_idx_slope_default(&ikey);
                (ikey, slope)
            })
            .collect();

        // Load the replication update vector here. Initially we build an in memory
        // RUV, and then we load it from the DB.
        let ruv = Arc::new(ReplicationUpdateVector::default());

        // this has a ::memory() type, but will path == "" work?
        let idlayer = Arc::new(IdlArcSqlite::new(&cfg, vacuum)?);
        let be = Backend {
            cfg,
            idlayer,
            ruv,
            idxmeta: Arc::new(CowCell::new(IdxMeta::new(idxkeys))),
        };

        // Now complete our setup with a txn
        // In this case we can use an empty idx meta because we don't
        // access any parts of
        // the indexing subsystem here.
        let mut idl_write = be.idlayer.write()?;
        idl_write
            .setup()
            .and_then(|_| idl_write.commit())
            .map_err(|e| {
                admin_error!(?e, "Failed to setup idlayer");
                e
            })?;

        // Load/generate any in memory indexes.
        // I think here we don't actually care about in memory indexes until
        // later?

        // Now rebuild the ruv.
        // NOTE: this needs the id layer setup above to have committed first,
        // so the two transactions must stay in this order.
        let mut be_write = be.write()?;
        be_write
            .ruv_reload()
            .and_then(|_| be_write.commit())
            .map_err(|e| {
                admin_error!(?e, "Failed to reload ruv");
                e
            })?;

        Ok(be)
    }

    /// The configured connection pool size. Guaranteed > 0 after `new`
    /// (an in-memory backend is forced to a pool size of 1).
    pub fn get_pool_size(&self) -> u32 {
        debug_assert!(self.cfg.pool_size > 0);
        self.cfg.pool_size
    }

    /// Ask the id layer cache to quiesce (best-effort, no result returned).
    pub fn try_quiesce(&self) {
        self.idlayer.try_quiesce();
    }

    /// Begin a read transaction over the id layer, idx metadata and RUV.
    pub fn read(&self) -> Result<BackendReadTransaction<'_>, OperationError> {
        Ok(BackendReadTransaction {
            idlayer: self.idlayer.read()?,
            idxmeta: self.idxmeta.read(),
            ruv: self.ruv.read(),
        })
    }

    /// Begin a write transaction over the id layer, idx metadata and RUV.
    /// Changes are only persisted when the transaction is committed.
    pub fn write(&self) -> Result<BackendWriteTransaction<'_>, OperationError> {
        Ok(BackendWriteTransaction {
            idlayer: self.idlayer.write()?,
            idxmeta_wr: self.idxmeta.write(),
            ruv: self.ruv.write(),
        })
    }
}
2282
2283// What are the possible actions we'll receive here?
2284
2285#[cfg(test)]
2286mod tests {
2287    use super::super::entry::{Entry, EntryInit, EntryNew};
2288    use super::Limits;
2289    use super::{
2290        Backend, BackendConfig, BackendTransaction, BackendWriteTransaction, DbBackup, IdList,
2291        IdxKey, OperationError,
2292    };
2293    use crate::prelude::*;
2294    use crate::repl::cid::Cid;
2295    use crate::value::{IndexType, PartialValue, Value};
2296    use idlset::v2::IDLBitRange;
2297    use kanidm_proto::backup::BackupCompression;
2298    use std::fs;
2299    use std::iter::FromIterator;
2300    use std::path::Path;
2301    use std::sync::Arc;
2302    use std::time::Duration;
2303
    // Pre-built change identifiers (CIDs) at fixed counts, shared by the
    // tests below to order writes and to drive tombstone reaping boundaries.
    lazy_static! {
        static ref CID_ZERO: Cid = Cid::new_zero();
        static ref CID_ONE: Cid = Cid::new_count(1);
        static ref CID_TWO: Cid = Cid::new_count(2);
        static ref CID_THREE: Cid = Cid::new_count(3);
        static ref CID_ADV: Cid = Cid::new_count(10);
    }
2311
    // Test harness: builds a fresh test backend with a fixed set of index
    // keys, opens a write transaction, runs the supplied closure against it,
    // and then commits to prove the transaction was valid.
    macro_rules! run_test {
        ($test_fn:expr) => {{
            sketching::test_init();

            // This is a demo idxmeta, purely for testing.
            let idxmeta = vec![
                IdxKey {
                    attr: Attribute::Name.into(),
                    itype: IndexType::Equality,
                },
                IdxKey {
                    attr: Attribute::Name.into(),
                    itype: IndexType::Presence,
                },
                IdxKey {
                    attr: Attribute::Name.into(),
                    itype: IndexType::SubString,
                },
                IdxKey {
                    attr: Attribute::Uuid.into(),
                    itype: IndexType::Equality,
                },
                IdxKey {
                    attr: Attribute::Uuid.into(),
                    itype: IndexType::Presence,
                },
                IdxKey {
                    attr: Attribute::TestAttr.into(),
                    itype: IndexType::Equality,
                },
                IdxKey {
                    attr: Attribute::TestNumber.into(),
                    itype: IndexType::Equality,
                },
            ];

            let be = Backend::new(BackendConfig::new_test("main"), idxmeta, false)
                .expect("Failed to setup backend");

            let mut be_txn = be.write().unwrap();

            let r = $test_fn(&mut be_txn);
            // Commit, to guarantee it worked.
            assert!(be_txn.commit().is_ok());
            r
        }};
    }
2359
    // True if an entry matching $ent's uuid can be found via search.
    // Builds a filter from the entry's own uuid attribute and searches
    // with unlimited result limits.
    macro_rules! entry_exists {
        ($be:expr, $ent:expr) => {{
            let ei = $ent.clone().into_sealed_committed();
            let filt = ei
                .filter_from_attrs(&[Attribute::Uuid.into()])
                .expect("failed to generate filter")
                .into_valid_resolved();
            let lims = Limits::unlimited();
            let entries = $be.search(&lims, &filt).expect("failed to search");
            entries.first().is_some()
        }};
    }
2372
    // Looks up $ent by its UserId attribute, then checks whether the found
    // entry has $attr present. Returns false when no entry matches.
    // NOTE(review): the lookup filter is always built from Attribute::UserId,
    // not from $attr - presumably intentional for these tests, but confirm.
    macro_rules! entry_attr_pres {
        ($be:expr, $ent:expr, $attr:expr) => {{
            let ei = $ent.clone().into_sealed_committed();
            let filt = ei
                .filter_from_attrs(&[Attribute::UserId.into()])
                .expect("failed to generate filter")
                .into_valid_resolved();
            let lims = Limits::unlimited();
            let entries = $be.search(&lims, &filt).expect("failed to search");
            match entries.first() {
                Some(ent) => ent.attribute_pres($attr),
                None => false,
            }
        }};
    }
2388
    // Assert that the raw idl stored for (attr, itype, key) equals the
    // expected id list. $expect is an Option<Vec<u64>>: Some(ids) for an
    // existing (possibly empty) idl, None when the index does not exist.
    macro_rules! idl_state {
        ($be:expr, $attr:expr, $itype:expr, $idx_key:expr, $expect:expr) => {{
            let t_idl = $be
                .load_test_idl(&$attr, $itype, &$idx_key.to_string())
                .expect("IdList Load failed");
            let t = $expect.map(|v: Vec<u64>| IDLBitRange::from_iter(v));
            assert_eq!(t_idl, t);
        }};
    }
2398
2399    #[test]
2400    fn test_be_simple_create() {
2401        run_test!(|be: &mut BackendWriteTransaction| {
2402            trace!("Simple Create");
2403
2404            let empty_result = be.create(&CID_ZERO, Vec::with_capacity(0));
2405            trace!("{:?}", empty_result);
2406            assert_eq!(empty_result, Err(OperationError::EmptyRequest));
2407
2408            let mut e: Entry<EntryInit, EntryNew> = Entry::new();
2409            e.add_ava(Attribute::UserId, Value::from("william"));
2410            e.add_ava(
2411                Attribute::Uuid,
2412                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
2413            );
2414            let e = e.into_sealed_new();
2415
2416            let single_result = be.create(&CID_ZERO, vec![e.clone()]);
2417
2418            assert!(single_result.is_ok());
2419
2420            // Construct a filter
2421            assert!(entry_exists!(be, e));
2422        });
2423    }
2424
2425    #[test]
2426    fn test_be_simple_search() {
2427        run_test!(|be: &mut BackendWriteTransaction| {
2428            trace!("Simple Search");
2429
2430            let mut e: Entry<EntryInit, EntryNew> = Entry::new();
2431            e.add_ava(Attribute::UserId, Value::from("claire"));
2432            e.add_ava(
2433                Attribute::Uuid,
2434                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
2435            );
2436            let e = e.into_sealed_new();
2437
2438            let single_result = be.create(&CID_ZERO, vec![e]);
2439            assert!(single_result.is_ok());
2440            // Test a simple EQ search
2441
2442            let filt = filter_resolved!(f_eq(Attribute::UserId, PartialValue::new_utf8s("claire")));
2443
2444            let lims = Limits::unlimited();
2445
2446            let r = be.search(&lims, &filt);
2447            assert!(r.expect("Search failed!").len() == 1);
2448
2449            // Test empty search
2450
2451            // Test class pres
2452
2453            // Search with no results
2454        });
2455    }
2456
2457    #[test]
2458    fn test_be_search_with_invalid() {
2459        run_test!(|be: &mut BackendWriteTransaction| {
2460            trace!("Simple Search");
2461
2462            let mut e: Entry<EntryInit, EntryNew> = Entry::new();
2463            e.add_ava(Attribute::UserId, Value::from("bagel"));
2464            e.add_ava(
2465                Attribute::Uuid,
2466                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
2467            );
2468            let e = e.into_sealed_new();
2469
2470            let single_result = be.create(&CID_ZERO, vec![e]);
2471            assert!(single_result.is_ok());
2472
2473            // Test Search with or condition including invalid attribute
2474            let filt = filter_resolved!(f_or(vec![
2475                f_eq(Attribute::UserId, PartialValue::new_utf8s("bagel")),
2476                f_invalid(Attribute::UserId)
2477            ]));
2478
2479            let lims = Limits::unlimited();
2480
2481            let r = be.search(&lims, &filt);
2482            assert!(r.expect("Search failed!").len() == 1);
2483
2484            // Test Search with or condition including invalid attribute
2485            let filt = filter_resolved!(f_and(vec![
2486                f_eq(Attribute::UserId, PartialValue::new_utf8s("bagel")),
2487                f_invalid(Attribute::UserId)
2488            ]));
2489
2490            let lims = Limits::unlimited();
2491
2492            let r = be.search(&lims, &filt);
2493            assert!(r.expect("Search failed!").is_empty());
2494        });
2495    }
2496
    // Exercises modify(): rejection of unknown/empty modifications, a single
    // entry update, and a batched two-entry update. The pre/post pairs must
    // line up positionally, which is why the Arc'd "pre" snapshots are taken
    // before the avas are mutated.
    #[test]
    fn test_be_simple_modify() {
        run_test!(|be: &mut BackendWriteTransaction| {
            trace!("Simple Modify");
            let lims = Limits::unlimited();
            // First create some entries (3?)
            let mut e1: Entry<EntryInit, EntryNew> = Entry::new();
            e1.add_ava(Attribute::UserId, Value::from("william"));
            e1.add_ava(
                Attribute::Uuid,
                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
            );

            let mut e2: Entry<EntryInit, EntryNew> = Entry::new();
            e2.add_ava(Attribute::UserId, Value::from("alice"));
            e2.add_ava(
                Attribute::Uuid,
                Value::from("4b6228ab-1dbe-42a4-a9f5-f6368222438e"),
            );

            let ve1 = e1.clone().into_sealed_new();
            let ve2 = e2.clone().into_sealed_new();

            assert!(be.create(&CID_ZERO, vec![ve1, ve2]).is_ok());
            assert!(entry_exists!(be, e1));
            assert!(entry_exists!(be, e2));

            // You need to now retrieve the entries back out to get the entry id's
            let mut results = be
                .search(&lims, &filter_resolved!(f_pres(Attribute::UserId)))
                .expect("Failed to search");

            // Get these out to usable entries.
            let r1 = results.remove(0);
            let r2 = results.remove(0);

            let mut r1 = r1.as_ref().clone().into_invalid();
            let mut r2 = r2.as_ref().clone().into_invalid();

            // Modify no id (err)
            // This is now impossible due to the state machine design.
            // However, with some unsafe ....
            let ue1 = e1.clone().into_sealed_committed();
            assert!(be
                .modify(&CID_ZERO, &[Arc::new(ue1.clone())], &[ue1])
                .is_err());
            // Modify none
            assert!(be.modify(&CID_ZERO, &[], &[]).is_err());

            // Make some changes to r1, r2.
            // Snapshot the pre-change state first - modify() needs the
            // matching pre/post pairs.
            let pre1 = Arc::new(r1.clone().into_sealed_committed());
            let pre2 = Arc::new(r2.clone().into_sealed_committed());
            r1.add_ava(Attribute::TestAttr, Value::from("modified"));
            r2.add_ava(Attribute::TestAttr, Value::from("modified"));

            // Now ... cheat.

            let vr1 = r1.into_sealed_committed();
            let vr2 = r2.into_sealed_committed();

            // Modify single
            assert!(be
                .modify(&CID_ZERO, &[pre1], std::slice::from_ref(&vr1))
                .is_ok());
            // Assert no other changes
            assert!(entry_attr_pres!(be, vr1, Attribute::TestAttr));
            assert!(!entry_attr_pres!(be, vr2, Attribute::TestAttr));

            // Modify both
            assert!(be
                .modify(
                    &CID_ZERO,
                    &[Arc::new(vr1.clone()), pre2],
                    &[vr1.clone(), vr2.clone()]
                )
                .is_ok());

            assert!(entry_attr_pres!(be, vr1, Attribute::TestAttr));
            assert!(entry_attr_pres!(be, vr2, Attribute::TestAttr));
        });
    }
2578
    // Exercises tombstoning and reap_tombstones(): entries tombstoned at
    // CID_ONE / CID_TWO are only reaped once the "reap up to" CID strictly
    // exceeds the CID of the tombstone, so the counts advance 0, 0, 1, 2, 0
    // as the boundary moves through CID_ZERO..CID_THREE.
    #[test]
    fn test_be_simple_delete() {
        run_test!(|be: &mut BackendWriteTransaction| {
            trace!("Simple Delete");
            let lims = Limits::unlimited();

            // First create some entries (3?)
            let mut e1: Entry<EntryInit, EntryNew> = Entry::new();
            e1.add_ava(Attribute::UserId, Value::from("william"));
            e1.add_ava(
                Attribute::Uuid,
                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
            );

            let mut e2: Entry<EntryInit, EntryNew> = Entry::new();
            e2.add_ava(Attribute::UserId, Value::from("alice"));
            e2.add_ava(
                Attribute::Uuid,
                Value::from("4b6228ab-1dbe-42a4-a9f5-f6368222438e"),
            );

            let mut e3: Entry<EntryInit, EntryNew> = Entry::new();
            e3.add_ava(Attribute::UserId, Value::from("lucy"));
            e3.add_ava(
                Attribute::Uuid,
                Value::from("7b23c99d-c06b-4a9a-a958-3afa56383e1d"),
            );

            let ve1 = e1.clone().into_sealed_new();
            let ve2 = e2.clone().into_sealed_new();
            let ve3 = e3.clone().into_sealed_new();

            assert!(be.create(&CID_ZERO, vec![ve1, ve2, ve3]).is_ok());
            assert!(entry_exists!(be, e1));
            assert!(entry_exists!(be, e2));
            assert!(entry_exists!(be, e3));

            // You need to now retrieve the entries back out to get the entry id's
            let mut results = be
                .search(&lims, &filter_resolved!(f_pres(Attribute::UserId)))
                .expect("Failed to search");

            // Get these out to usable entries.
            let r1 = results.remove(0);
            let r2 = results.remove(0);
            let r3 = results.remove(0);

            // Deletes nothing, all entries are live.
            assert!(matches!(be.reap_tombstones(&CID_ADV, &CID_ZERO), Ok(0)));

            // Put them into the tombstone state, and write that down.
            // This sets up the RUV with the changes.
            let r1_ts = r1.to_tombstone(CID_ONE.clone()).into_sealed_committed();

            assert!(be
                .modify(&CID_ONE, &[r1], std::slice::from_ref(&r1_ts))
                .is_ok());

            let r2_ts = r2.to_tombstone(CID_TWO.clone()).into_sealed_committed();
            let r3_ts = r3.to_tombstone(CID_TWO.clone()).into_sealed_committed();

            assert!(be
                .modify(&CID_TWO, &[r2, r3], &[r2_ts.clone(), r3_ts.clone()])
                .is_ok());

            // The entry are now tombstones, but is still in the ruv. This is because we
            // targeted CID_ZERO, not ONE.
            assert!(matches!(be.reap_tombstones(&CID_ADV, &CID_ZERO), Ok(0)));

            assert!(entry_exists!(be, r1_ts));
            assert!(entry_exists!(be, r2_ts));
            assert!(entry_exists!(be, r3_ts));

            // Reaping up to CID_ONE still keeps the CID_ONE tombstone.
            assert!(matches!(be.reap_tombstones(&CID_ADV, &CID_ONE), Ok(0)));

            assert!(entry_exists!(be, r1_ts));
            assert!(entry_exists!(be, r2_ts));
            assert!(entry_exists!(be, r3_ts));

            // Up to CID_TWO removes only the CID_ONE tombstone (r1).
            assert!(matches!(be.reap_tombstones(&CID_ADV, &CID_TWO), Ok(1)));

            assert!(!entry_exists!(be, r1_ts));
            assert!(entry_exists!(be, r2_ts));
            assert!(entry_exists!(be, r3_ts));

            // Up to CID_THREE removes the two CID_TWO tombstones (r2, r3).
            assert!(matches!(be.reap_tombstones(&CID_ADV, &CID_THREE), Ok(2)));

            assert!(!entry_exists!(be, r1_ts));
            assert!(!entry_exists!(be, r2_ts));
            assert!(!entry_exists!(be, r3_ts));

            // Nothing left
            assert!(matches!(be.reap_tombstones(&CID_ADV, &CID_THREE), Ok(0)));

            assert!(!entry_exists!(be, r1_ts));
            assert!(!entry_exists!(be, r2_ts));
            assert!(!entry_exists!(be, r3_ts));
        });
    }
2678
2679    #[test]
2680    fn test_be_backup_restore() {
2681        let db_backup_file_name =
2682            Path::new(option_env!("OUT_DIR").unwrap_or("/tmp")).join(".backup_test.json.gz");
2683        eprintln!(" ⚠️   {}", db_backup_file_name.display());
2684        run_test!(|be: &mut BackendWriteTransaction| {
2685            // Important! Need db metadata setup!
2686            be.reset_db_s_uuid().unwrap();
2687            be.reset_db_d_uuid().unwrap();
2688            be.set_db_ts_max(Duration::from_secs(1)).unwrap();
2689
2690            // First create some entries (3?)
2691            let mut e1: Entry<EntryInit, EntryNew> = Entry::new();
2692            e1.add_ava(Attribute::UserId, Value::from("william"));
2693            e1.add_ava(
2694                Attribute::Uuid,
2695                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
2696            );
2697
2698            let mut e2: Entry<EntryInit, EntryNew> = Entry::new();
2699            e2.add_ava(Attribute::UserId, Value::from("alice"));
2700            e2.add_ava(
2701                Attribute::Uuid,
2702                Value::from("4b6228ab-1dbe-42a4-a9f5-f6368222438e"),
2703            );
2704
2705            let mut e3: Entry<EntryInit, EntryNew> = Entry::new();
2706            e3.add_ava(Attribute::UserId, Value::from("lucy"));
2707            e3.add_ava(
2708                Attribute::Uuid,
2709                Value::from("7b23c99d-c06b-4a9a-a958-3afa56383e1d"),
2710            );
2711
2712            let ve1 = e1.clone().into_sealed_new();
2713            let ve2 = e2.clone().into_sealed_new();
2714            let ve3 = e3.clone().into_sealed_new();
2715
2716            assert!(be.create(&CID_ZERO, vec![ve1, ve2, ve3]).is_ok());
2717            assert!(entry_exists!(be, e1));
2718            assert!(entry_exists!(be, e2));
2719            assert!(entry_exists!(be, e3));
2720
2721            let result = fs::remove_file(&db_backup_file_name);
2722
2723            if let Err(e) = result {
2724                // if the error is the file is not found, that's what we want so continue,
2725                // otherwise return the error
2726                if e.kind() == std::io::ErrorKind::NotFound {}
2727            }
2728            be.backup(&db_backup_file_name, BackupCompression::Gzip)
2729                .expect("Backup failed!");
2730            be.restore(&db_backup_file_name).expect("Restore failed!");
2731
2732            assert!(be.verify().is_empty());
2733        });
2734    }
2735
2736    #[test]
2737    fn test_be_backup_restore_tampered() {
2738        let db_backup_file_name =
2739            Path::new(option_env!("OUT_DIR").unwrap_or("/tmp")).join(".backup2_test.json");
2740        eprintln!(" ⚠️   {}", db_backup_file_name.display());
2741        run_test!(|be: &mut BackendWriteTransaction| {
2742            // Important! Need db metadata setup!
2743            be.reset_db_s_uuid().unwrap();
2744            be.reset_db_d_uuid().unwrap();
2745            be.set_db_ts_max(Duration::from_secs(1)).unwrap();
2746            // First create some entries (3?)
2747            let mut e1: Entry<EntryInit, EntryNew> = Entry::new();
2748            e1.add_ava(Attribute::UserId, Value::from("william"));
2749            e1.add_ava(
2750                Attribute::Uuid,
2751                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
2752            );
2753
2754            let mut e2: Entry<EntryInit, EntryNew> = Entry::new();
2755            e2.add_ava(Attribute::UserId, Value::from("alice"));
2756            e2.add_ava(
2757                Attribute::Uuid,
2758                Value::from("4b6228ab-1dbe-42a4-a9f5-f6368222438e"),
2759            );
2760
2761            let mut e3: Entry<EntryInit, EntryNew> = Entry::new();
2762            e3.add_ava(Attribute::UserId, Value::from("lucy"));
2763            e3.add_ava(
2764                Attribute::Uuid,
2765                Value::from("7b23c99d-c06b-4a9a-a958-3afa56383e1d"),
2766            );
2767
2768            let ve1 = e1.clone().into_sealed_new();
2769            let ve2 = e2.clone().into_sealed_new();
2770            let ve3 = e3.clone().into_sealed_new();
2771
2772            assert!(be.create(&CID_ZERO, vec![ve1, ve2, ve3]).is_ok());
2773            assert!(entry_exists!(be, e1));
2774            assert!(entry_exists!(be, e2));
2775            assert!(entry_exists!(be, e3));
2776
2777            let result = fs::remove_file(&db_backup_file_name);
2778
2779            if let Err(e) = result {
2780                // if the error is the file is not found, that's what we want so continue,
2781                // otherwise return the error
2782                if e.kind() == std::io::ErrorKind::NotFound {}
2783            }
2784
2785            be.backup(&db_backup_file_name, BackupCompression::NoCompression)
2786                .expect("Backup failed!");
2787
2788            // Now here, we need to tamper with the file.
2789            let serialized_string = fs::read_to_string(&db_backup_file_name).unwrap();
2790            trace!(?serialized_string);
2791            let mut dbbak: DbBackup = serde_json::from_str(&serialized_string).unwrap();
2792
2793            match &mut dbbak {
2794                DbBackup::V5 {
2795                    version: _,
2796                    db_s_uuid: _,
2797                    db_d_uuid: _,
2798                    db_ts_max: _,
2799                    keyhandles: _,
2800                    repl_meta: _,
2801                    entries,
2802                } => {
2803                    let _ = entries.pop();
2804                }
2805                _ => {
2806                    // We no longer use these format versions!
2807                    unreachable!()
2808                }
2809            };
2810
2811            let serialized_entries_str = serde_json::to_string_pretty(&dbbak).unwrap();
2812            fs::write(&db_backup_file_name, serialized_entries_str).unwrap();
2813
2814            be.restore(&db_backup_file_name).expect("Restore failed!");
2815
2816            assert!(be.verify().is_empty());
2817        });
2818    }
2819
2820    #[test]
2821    fn test_be_sid_generation_and_reset() {
2822        run_test!(|be: &mut BackendWriteTransaction| {
2823            let sid1 = be.get_db_s_uuid().unwrap();
2824            let sid2 = be.get_db_s_uuid().unwrap();
2825            assert_eq!(sid1, sid2);
2826            let sid3 = be.reset_db_s_uuid().unwrap();
2827            assert!(sid1 != sid3);
2828            let sid4 = be.get_db_s_uuid().unwrap();
2829            assert_eq!(sid3, sid4);
2830        });
2831    }
2832
2833    #[test]
2834    fn test_be_reindex_empty() {
2835        run_test!(|be: &mut BackendWriteTransaction| {
2836            // Add some test data?
2837            let missing = be.missing_idxs().unwrap();
2838            assert_eq!(missing.len(), 7);
2839            assert!(be.reindex(false).is_ok());
2840            let missing = be.missing_idxs().unwrap();
2841            debug!("{:?}", missing);
2842            assert!(missing.is_empty());
2843        });
2844    }
2845
    // Full reindex over existing data: purge all indexes, reindex, then
    // assert the exact equality / substring (trigraph) / presence idls for
    // both entries, plus the name2uuid / uuid2spn / uuid2rdn caches.
    #[test]
    fn test_be_reindex_data() {
        run_test!(|be: &mut BackendWriteTransaction| {
            // Add some test data?
            let mut e1: Entry<EntryInit, EntryNew> = Entry::new();
            e1.add_ava(Attribute::Name, Value::new_iname("william"));
            e1.add_ava(
                Attribute::Uuid,
                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
            );
            let e1 = e1.into_sealed_new();

            let mut e2: Entry<EntryInit, EntryNew> = Entry::new();
            e2.add_ava(Attribute::Name, Value::new_iname("claire"));
            e2.add_ava(
                Attribute::Uuid,
                Value::from("bd651620-00dd-426b-aaa0-4494f7b7906f"),
            );
            let e2 = e2.into_sealed_new();

            be.create(&CID_ZERO, vec![e1, e2]).unwrap();

            // purge indexes
            be.danger_purge_idxs().unwrap();
            // Check they are gone
            let missing = be.missing_idxs().unwrap();
            assert_eq!(missing.len(), 7);
            assert!(be.reindex(false).is_ok());
            let missing = be.missing_idxs().unwrap();
            debug!("{:?}", missing);
            assert!(missing.is_empty());
            // check name and uuid ids on eq, sub, pres

            idl_state!(
                be,
                Attribute::Name,
                IndexType::Equality,
                "william",
                Some(vec![1])
            );

            idl_state!(
                be,
                Attribute::Name,
                IndexType::Equality,
                "claire",
                Some(vec![2])
            );

            // Substrings unique to "william" index only entry id 1.
            for sub in [
                "w", "m", "wi", "il", "ll", "li", "ia", "am", "wil", "ill", "lli", "lia", "iam",
            ] {
                idl_state!(
                    be,
                    Attribute::Name,
                    IndexType::SubString,
                    sub,
                    Some(vec![1])
                );
            }

            // Substrings unique to "claire" index only entry id 2.
            for sub in [
                "c", "r", "e", "cl", "la", "ai", "ir", "re", "cla", "lai", "air", "ire",
            ] {
                idl_state!(
                    be,
                    Attribute::Name,
                    IndexType::SubString,
                    sub,
                    Some(vec![2])
                );
            }

            // Substrings shared by both names index both entries.
            for sub in ["i", "a", "l"] {
                idl_state!(
                    be,
                    Attribute::Name,
                    IndexType::SubString,
                    sub,
                    Some(vec![1, 2])
                );
            }

            idl_state!(
                be,
                Attribute::Name,
                IndexType::Presence,
                "_",
                Some(vec![1, 2])
            );

            idl_state!(
                be,
                Attribute::Uuid,
                IndexType::Equality,
                "db237e8a-0079-4b8c-8a56-593b22aa44d1",
                Some(vec![1])
            );

            idl_state!(
                be,
                Attribute::Uuid,
                IndexType::Equality,
                "bd651620-00dd-426b-aaa0-4494f7b7906f",
                Some(vec![2])
            );

            idl_state!(
                be,
                Attribute::Uuid,
                IndexType::Presence,
                "_",
                Some(vec![1, 2])
            );

            // Show what happens with empty

            idl_state!(
                be,
                Attribute::Name,
                IndexType::Equality,
                "not-exist",
                Some(Vec::with_capacity(0))
            );

            idl_state!(
                be,
                Attribute::Uuid,
                IndexType::Equality,
                "fake-0079-4b8c-8a56-593b22aa44d1",
                Some(Vec::with_capacity(0))
            );

            // An attribute with no configured index returns None, not empty.
            let uuid_p_idl = be
                .load_test_idl(&Attribute::from("not_indexed"), IndexType::Presence, "_")
                .unwrap(); // unwrap the result
            assert_eq!(uuid_p_idl, None);

            // Check name2uuid
            let claire_uuid = uuid!("bd651620-00dd-426b-aaa0-4494f7b7906f");
            let william_uuid = uuid!("db237e8a-0079-4b8c-8a56-593b22aa44d1");

            assert_eq!(be.name2uuid("claire"), Ok(Some(claire_uuid)));
            assert_eq!(be.name2uuid("william"), Ok(Some(william_uuid)));
            assert_eq!(
                be.name2uuid("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
                Ok(None)
            );
            // check uuid2spn
            assert_eq!(
                be.uuid2spn(claire_uuid),
                Ok(Some(Value::new_iname("claire")))
            );
            assert_eq!(
                be.uuid2spn(william_uuid),
                Ok(Some(Value::new_iname("william")))
            );
            // check uuid2rdn
            assert_eq!(
                be.uuid2rdn(claire_uuid),
                Ok(Some("name=claire".to_string()))
            );
            assert_eq!(
                be.uuid2rdn(william_uuid),
                Ok(Some("name=william".to_string()))
            );
        });
    }
3014
3015    #[test]
3016    fn test_be_index_create_delete_simple() {
3017        run_test!(|be: &mut BackendWriteTransaction| {
3018            // First, setup our index tables!
3019            assert!(be.reindex(false).is_ok());
3020            // Test that on entry create, the indexes are made correctly.
3021            // this is a similar case to reindex.
3022            let mut e1: Entry<EntryInit, EntryNew> = Entry::new();
3023            e1.add_ava(Attribute::Name, Value::from("william"));
3024            e1.add_ava(
3025                Attribute::Uuid,
3026                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
3027            );
3028            let e1 = e1.into_sealed_new();
3029
3030            let rset = be.create(&CID_ZERO, vec![e1]).unwrap();
3031            let mut rset: Vec<_> = rset.into_iter().map(Arc::new).collect();
3032            let e1 = rset.pop().unwrap();
3033
3034            idl_state!(
3035                be,
3036                Attribute::Name.as_ref(),
3037                IndexType::Equality,
3038                "william",
3039                Some(vec![1])
3040            );
3041
3042            idl_state!(
3043                be,
3044                Attribute::Name.as_ref(),
3045                IndexType::Presence,
3046                "_",
3047                Some(vec![1])
3048            );
3049
3050            idl_state!(
3051                be,
3052                Attribute::Uuid.as_ref(),
3053                IndexType::Equality,
3054                "db237e8a-0079-4b8c-8a56-593b22aa44d1",
3055                Some(vec![1])
3056            );
3057
3058            idl_state!(
3059                be,
3060                Attribute::Uuid.as_ref(),
3061                IndexType::Presence,
3062                "_",
3063                Some(vec![1])
3064            );
3065
3066            let william_uuid = uuid!("db237e8a-0079-4b8c-8a56-593b22aa44d1");
3067            assert_eq!(be.name2uuid("william"), Ok(Some(william_uuid)));
3068            assert_eq!(be.uuid2spn(william_uuid), Ok(Some(Value::from("william"))));
3069            assert_eq!(
3070                be.uuid2rdn(william_uuid),
3071                Ok(Some("name=william".to_string()))
3072            );
3073
3074            // == Now we reap_tombstones, and assert we removed the items.
3075            let e1_ts = e1.to_tombstone(CID_ONE.clone()).into_sealed_committed();
3076            assert!(be.modify(&CID_ONE, &[e1], &[e1_ts]).is_ok());
3077            be.reap_tombstones(&CID_ADV, &CID_TWO).unwrap();
3078
3079            idl_state!(
3080                be,
3081                Attribute::Name.as_ref(),
3082                IndexType::Equality,
3083                "william",
3084                Some(Vec::with_capacity(0))
3085            );
3086
3087            idl_state!(
3088                be,
3089                Attribute::Name.as_ref(),
3090                IndexType::Presence,
3091                "_",
3092                Some(Vec::with_capacity(0))
3093            );
3094
3095            idl_state!(
3096                be,
3097                Attribute::Uuid.as_ref(),
3098                IndexType::Equality,
3099                "db237e8a-0079-4b8c-8a56-593b22aa44d1",
3100                Some(Vec::with_capacity(0))
3101            );
3102
3103            idl_state!(
3104                be,
3105                Attribute::Uuid.as_ref(),
3106                IndexType::Presence,
3107                "_",
3108                Some(Vec::with_capacity(0))
3109            );
3110
3111            assert_eq!(be.name2uuid("william"), Ok(None));
3112            assert_eq!(be.uuid2spn(william_uuid), Ok(None));
3113            assert_eq!(be.uuid2rdn(william_uuid), Ok(None));
3114        })
3115    }
3116
3117    #[test]
3118    fn test_be_index_create_delete_multi() {
3119        run_test!(|be: &mut BackendWriteTransaction| {
3120            // delete multiple entries at a time, without deleting others
3121            // First, setup our index tables!
3122            assert!(be.reindex(false).is_ok());
3123            // Test that on entry create, the indexes are made correctly.
3124            // this is a similar case to reindex.
3125            let mut e1: Entry<EntryInit, EntryNew> = Entry::new();
3126            e1.add_ava(Attribute::Name, Value::new_iname("william"));
3127            e1.add_ava(
3128                Attribute::Uuid,
3129                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
3130            );
3131            let e1 = e1.into_sealed_new();
3132
3133            let mut e2: Entry<EntryInit, EntryNew> = Entry::new();
3134            e2.add_ava(Attribute::Name, Value::new_iname("claire"));
3135            e2.add_ava(
3136                Attribute::Uuid,
3137                Value::from("bd651620-00dd-426b-aaa0-4494f7b7906f"),
3138            );
3139            let e2 = e2.into_sealed_new();
3140
3141            let mut e3: Entry<EntryInit, EntryNew> = Entry::new();
3142            e3.add_ava(Attribute::UserId, Value::new_iname("lucy"));
3143            e3.add_ava(
3144                Attribute::Uuid,
3145                Value::from("7b23c99d-c06b-4a9a-a958-3afa56383e1d"),
3146            );
3147            let e3 = e3.into_sealed_new();
3148
3149            let mut rset = be.create(&CID_ZERO, vec![e1, e2, e3]).unwrap();
3150            rset.remove(1);
3151            let mut rset: Vec<_> = rset.into_iter().map(Arc::new).collect();
3152            let e1 = rset.pop().unwrap();
3153            let e3 = rset.pop().unwrap();
3154
3155            // Now remove e1, e3.
3156            let e1_ts = e1.to_tombstone(CID_ONE.clone()).into_sealed_committed();
3157            let e3_ts = e3.to_tombstone(CID_ONE.clone()).into_sealed_committed();
3158            assert!(be.modify(&CID_ONE, &[e1, e3], &[e1_ts, e3_ts]).is_ok());
3159            be.reap_tombstones(&CID_ADV, &CID_TWO).unwrap();
3160
3161            idl_state!(
3162                be,
3163                Attribute::Name.as_ref(),
3164                IndexType::Equality,
3165                "claire",
3166                Some(vec![2])
3167            );
3168
3169            idl_state!(
3170                be,
3171                Attribute::Name.as_ref(),
3172                IndexType::Presence,
3173                "_",
3174                Some(vec![2])
3175            );
3176
3177            idl_state!(
3178                be,
3179                Attribute::Uuid.as_ref(),
3180                IndexType::Equality,
3181                "bd651620-00dd-426b-aaa0-4494f7b7906f",
3182                Some(vec![2])
3183            );
3184
3185            idl_state!(
3186                be,
3187                Attribute::Uuid.as_ref(),
3188                IndexType::Presence,
3189                "_",
3190                Some(vec![2])
3191            );
3192
3193            let claire_uuid = uuid!("bd651620-00dd-426b-aaa0-4494f7b7906f");
3194            let william_uuid = uuid!("db237e8a-0079-4b8c-8a56-593b22aa44d1");
3195            let lucy_uuid = uuid!("7b23c99d-c06b-4a9a-a958-3afa56383e1d");
3196
3197            assert_eq!(be.name2uuid("claire"), Ok(Some(claire_uuid)));
3198            let x = be.uuid2spn(claire_uuid);
3199            trace!(?x);
3200            assert_eq!(
3201                be.uuid2spn(claire_uuid),
3202                Ok(Some(Value::new_iname("claire")))
3203            );
3204            assert_eq!(
3205                be.uuid2rdn(claire_uuid),
3206                Ok(Some("name=claire".to_string()))
3207            );
3208
3209            assert_eq!(be.name2uuid("william"), Ok(None));
3210            assert_eq!(be.uuid2spn(william_uuid), Ok(None));
3211            assert_eq!(be.uuid2rdn(william_uuid), Ok(None));
3212
3213            assert_eq!(be.name2uuid("lucy"), Ok(None));
3214            assert_eq!(be.uuid2spn(lucy_uuid), Ok(None));
3215            assert_eq!(be.uuid2rdn(lucy_uuid), Ok(None));
3216        })
3217    }
3218
3219    #[test]
3220    fn test_be_index_modify_simple() {
3221        run_test!(|be: &mut BackendWriteTransaction| {
3222            assert!(be.reindex(false).is_ok());
3223            // modify with one type, ensuring we clean the indexes behind
3224            // us. For the test to be "accurate" we must add one attr, remove one attr
3225            // and change one attr.
3226            let mut e1: Entry<EntryInit, EntryNew> = Entry::new();
3227            e1.add_ava(Attribute::Name, Value::new_iname("william"));
3228            e1.add_ava(
3229                Attribute::Uuid,
3230                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
3231            );
3232            e1.add_ava(Attribute::TestAttr, Value::from("test"));
3233            let e1 = e1.into_sealed_new();
3234
3235            let rset = be.create(&CID_ZERO, vec![e1]).unwrap();
3236            let rset: Vec<_> = rset.into_iter().map(Arc::new).collect();
3237            // Now, alter the new entry.
3238            let mut ce1 = rset[0].as_ref().clone().into_invalid();
3239            // add something.
3240            ce1.add_ava(Attribute::TestNumber, Value::from("test"));
3241            // remove something.
3242            ce1.purge_ava(Attribute::TestAttr);
3243            // mod something.
3244            ce1.purge_ava(Attribute::Name);
3245            ce1.add_ava(Attribute::Name, Value::new_iname("claire"));
3246
3247            let ce1 = ce1.into_sealed_committed();
3248
3249            be.modify(&CID_ZERO, &rset, &[ce1]).unwrap();
3250
3251            // Now check the idls
3252            idl_state!(
3253                be,
3254                Attribute::Name.as_ref(),
3255                IndexType::Equality,
3256                "claire",
3257                Some(vec![1])
3258            );
3259
3260            idl_state!(
3261                be,
3262                Attribute::Name.as_ref(),
3263                IndexType::Presence,
3264                "_",
3265                Some(vec![1])
3266            );
3267
3268            idl_state!(
3269                be,
3270                Attribute::TestNumber.as_ref(),
3271                IndexType::Equality,
3272                "test",
3273                Some(vec![1])
3274            );
3275
3276            idl_state!(
3277                be,
3278                Attribute::TestAttr,
3279                IndexType::Equality,
3280                "test",
3281                Some(vec![])
3282            );
3283
3284            let william_uuid = uuid!("db237e8a-0079-4b8c-8a56-593b22aa44d1");
3285            assert_eq!(be.name2uuid("william"), Ok(None));
3286            assert_eq!(be.name2uuid("claire"), Ok(Some(william_uuid)));
3287            assert_eq!(
3288                be.uuid2spn(william_uuid),
3289                Ok(Some(Value::new_iname("claire")))
3290            );
3291            assert_eq!(
3292                be.uuid2rdn(william_uuid),
3293                Ok(Some("name=claire".to_string()))
3294            );
3295        })
3296    }
3297
3298    #[test]
3299    fn test_be_index_modify_rename() {
3300        run_test!(|be: &mut BackendWriteTransaction| {
3301            assert!(be.reindex(false).is_ok());
3302            // test when we change name AND uuid
3303            // This will be needing to be correct for conflicts when we add
3304            // replication support!
3305            let mut e1: Entry<EntryInit, EntryNew> = Entry::new();
3306            e1.add_ava(Attribute::Name, Value::new_iname("william"));
3307            e1.add_ava(
3308                Attribute::Uuid,
3309                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
3310            );
3311            let e1 = e1.into_sealed_new();
3312
3313            let rset = be.create(&CID_ZERO, vec![e1]).unwrap();
3314            let rset: Vec<_> = rset.into_iter().map(Arc::new).collect();
3315            // Now, alter the new entry.
3316            let mut ce1 = rset[0].as_ref().clone().into_invalid();
3317            ce1.purge_ava(Attribute::Name);
3318            ce1.purge_ava(Attribute::Uuid);
3319            ce1.add_ava(Attribute::Name, Value::new_iname("claire"));
3320            ce1.add_ava(
3321                Attribute::Uuid,
3322                Value::from("04091a7a-6ce4-42d2-abf5-c2ce244ac9e8"),
3323            );
3324            let ce1 = ce1.into_sealed_committed();
3325
3326            be.modify(&CID_ZERO, &rset, &[ce1]).unwrap();
3327
3328            idl_state!(
3329                be,
3330                Attribute::Name.as_ref(),
3331                IndexType::Equality,
3332                "claire",
3333                Some(vec![1])
3334            );
3335
3336            idl_state!(
3337                be,
3338                Attribute::Uuid.as_ref(),
3339                IndexType::Equality,
3340                "04091a7a-6ce4-42d2-abf5-c2ce244ac9e8",
3341                Some(vec![1])
3342            );
3343
3344            idl_state!(
3345                be,
3346                Attribute::Name.as_ref(),
3347                IndexType::Presence,
3348                "_",
3349                Some(vec![1])
3350            );
3351            idl_state!(
3352                be,
3353                Attribute::Uuid.as_ref(),
3354                IndexType::Presence,
3355                "_",
3356                Some(vec![1])
3357            );
3358
3359            idl_state!(
3360                be,
3361                Attribute::Uuid.as_ref(),
3362                IndexType::Equality,
3363                "db237e8a-0079-4b8c-8a56-593b22aa44d1",
3364                Some(Vec::with_capacity(0))
3365            );
3366            idl_state!(
3367                be,
3368                Attribute::Name.as_ref(),
3369                IndexType::Equality,
3370                "william",
3371                Some(Vec::with_capacity(0))
3372            );
3373
3374            let claire_uuid = uuid!("04091a7a-6ce4-42d2-abf5-c2ce244ac9e8");
3375            let william_uuid = uuid!("db237e8a-0079-4b8c-8a56-593b22aa44d1");
3376            assert_eq!(be.name2uuid("william"), Ok(None));
3377            assert_eq!(be.name2uuid("claire"), Ok(Some(claire_uuid)));
3378            assert_eq!(be.uuid2spn(william_uuid), Ok(None));
3379            assert_eq!(be.uuid2rdn(william_uuid), Ok(None));
3380            assert_eq!(
3381                be.uuid2spn(claire_uuid),
3382                Ok(Some(Value::new_iname("claire")))
3383            );
3384            assert_eq!(
3385                be.uuid2rdn(claire_uuid),
3386                Ok(Some("name=claire".to_string()))
3387            );
3388        })
3389    }
3390
3391    #[test]
3392    fn test_be_index_search_simple() {
3393        run_test!(|be: &mut BackendWriteTransaction| {
3394            assert!(be.reindex(false).is_ok());
3395
3396            // Create a test entry with some indexed / unindexed values.
3397            let mut e1: Entry<EntryInit, EntryNew> = Entry::new();
3398            e1.add_ava(Attribute::Name, Value::new_iname("william"));
3399            e1.add_ava(
3400                Attribute::Uuid,
3401                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
3402            );
3403            e1.add_ava(Attribute::NoIndex, Value::from("william"));
3404            e1.add_ava(Attribute::OtherNoIndex, Value::from("william"));
3405            let e1 = e1.into_sealed_new();
3406
3407            let mut e2: Entry<EntryInit, EntryNew> = Entry::new();
3408            e2.add_ava(Attribute::Name, Value::new_iname("claire"));
3409            e2.add_ava(
3410                Attribute::Uuid,
3411                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d2"),
3412            );
3413            let e2 = e2.into_sealed_new();
3414
3415            let _rset = be.create(&CID_ZERO, vec![e1, e2]).unwrap();
3416            // Test fully unindexed
3417            let f_un =
3418                filter_resolved!(f_eq(Attribute::NoIndex, PartialValue::new_utf8s("william")));
3419
3420            let (r, _plan) = be.filter2idl(f_un.to_inner(), 0).unwrap();
3421            match r {
3422                IdList::AllIds => {}
3423                _ => {
3424                    panic!("");
3425                }
3426            }
3427
3428            // Test that a fully indexed search works
3429            let feq = filter_resolved!(f_eq(Attribute::Name, PartialValue::new_utf8s("william")));
3430
3431            let (r, _plan) = be.filter2idl(feq.to_inner(), 0).unwrap();
3432            match r {
3433                IdList::Indexed(idl) => {
3434                    assert_eq!(idl, IDLBitRange::from_iter(vec![1]));
3435                }
3436                _ => {
3437                    panic!("");
3438                }
3439            }
3440
3441            // Test and/or
3442            //   full index and
3443            let f_in_and = filter_resolved!(f_and!([
3444                f_eq(Attribute::Name, PartialValue::new_utf8s("william")),
3445                f_eq(
3446                    Attribute::Uuid,
3447                    PartialValue::new_utf8s("db237e8a-0079-4b8c-8a56-593b22aa44d1")
3448                )
3449            ]));
3450
3451            let (r, _plan) = be.filter2idl(f_in_and.to_inner(), 0).unwrap();
3452            match r {
3453                IdList::Indexed(idl) => {
3454                    assert_eq!(idl, IDLBitRange::from_iter(vec![1]));
3455                }
3456                _ => {
3457                    panic!("");
3458                }
3459            }
3460
3461            //   partial index and
3462            let f_p1 = filter_resolved!(f_and!([
3463                f_eq(Attribute::Name, PartialValue::new_utf8s("william")),
3464                f_eq(Attribute::NoIndex, PartialValue::new_utf8s("william"))
3465            ]));
3466
3467            let f_p2 = filter_resolved!(f_and!([
3468                f_eq(Attribute::Name, PartialValue::new_utf8s("william")),
3469                f_eq(Attribute::NoIndex, PartialValue::new_utf8s("william"))
3470            ]));
3471
3472            let (r, _plan) = be.filter2idl(f_p1.to_inner(), 0).unwrap();
3473            match r {
3474                IdList::Partial(idl) => {
3475                    assert_eq!(idl, IDLBitRange::from_iter(vec![1]));
3476                }
3477                _ => unreachable!(),
3478            }
3479
3480            let (r, _plan) = be.filter2idl(f_p2.to_inner(), 0).unwrap();
3481            match r {
3482                IdList::Partial(idl) => {
3483                    assert_eq!(idl, IDLBitRange::from_iter(vec![1]));
3484                }
3485                _ => unreachable!(),
3486            }
3487
3488            // Substrings are always partial
3489            let f_p3 = filter_resolved!(f_sub(Attribute::Name, PartialValue::new_utf8s("wil")));
3490
3491            let (r, plan) = be.filter2idl(f_p3.to_inner(), 0).unwrap();
3492            trace!(?r, ?plan);
3493            match r {
3494                IdList::Partial(idl) => {
3495                    assert_eq!(idl, IDLBitRange::from_iter(vec![1]));
3496                }
3497                _ => unreachable!(),
3498            }
3499
3500            //   no index and
3501            let f_no_and = filter_resolved!(f_and!([
3502                f_eq(Attribute::NoIndex, PartialValue::new_utf8s("william")),
3503                f_eq(Attribute::OtherNoIndex, PartialValue::new_utf8s("william"))
3504            ]));
3505
3506            let (r, _plan) = be.filter2idl(f_no_and.to_inner(), 0).unwrap();
3507            match r {
3508                IdList::AllIds => {}
3509                _ => {
3510                    panic!("");
3511                }
3512            }
3513
3514            //   full index or
3515            let f_in_or = filter_resolved!(f_or!([f_eq(
3516                Attribute::Name,
3517                PartialValue::new_utf8s("william")
3518            )]));
3519
3520            let (r, _plan) = be.filter2idl(f_in_or.to_inner(), 0).unwrap();
3521            match r {
3522                IdList::Indexed(idl) => {
3523                    assert_eq!(idl, IDLBitRange::from_iter(vec![1]));
3524                }
3525                _ => {
3526                    panic!("");
3527                }
3528            }
3529            //   partial (aka allids) or
3530            let f_un_or = filter_resolved!(f_or!([f_eq(
3531                Attribute::NoIndex,
3532                PartialValue::new_utf8s("william")
3533            )]));
3534
3535            let (r, _plan) = be.filter2idl(f_un_or.to_inner(), 0).unwrap();
3536            match r {
3537                IdList::AllIds => {}
3538                _ => {
3539                    panic!("");
3540                }
3541            }
3542
3543            // Test root andnot
3544            let f_r_andnot = filter_resolved!(f_andnot(f_eq(
3545                Attribute::Name,
3546                PartialValue::new_utf8s("william")
3547            )));
3548
3549            let (r, _plan) = be.filter2idl(f_r_andnot.to_inner(), 0).unwrap();
3550            match r {
3551                IdList::Indexed(idl) => {
3552                    assert_eq!(idl, IDLBitRange::from_iter(Vec::with_capacity(0)));
3553                }
3554                _ => {
3555                    panic!("");
3556                }
3557            }
3558
3559            // test andnot as only in and
3560            let f_and_andnot = filter_resolved!(f_and!([f_andnot(f_eq(
3561                Attribute::Name,
3562                PartialValue::new_utf8s("william")
3563            ))]));
3564
3565            let (r, _plan) = be.filter2idl(f_and_andnot.to_inner(), 0).unwrap();
3566            match r {
3567                IdList::Indexed(idl) => {
3568                    assert_eq!(idl, IDLBitRange::from_iter(Vec::with_capacity(0)));
3569                }
3570                _ => {
3571                    panic!("");
3572                }
3573            }
3574            // test andnot as only in or
3575            let f_or_andnot = filter_resolved!(f_or!([f_andnot(f_eq(
3576                Attribute::Name,
3577                PartialValue::new_utf8s("william")
3578            ))]));
3579
3580            let (r, _plan) = be.filter2idl(f_or_andnot.to_inner(), 0).unwrap();
3581            match r {
3582                IdList::Indexed(idl) => {
3583                    assert_eq!(idl, IDLBitRange::from_iter(Vec::with_capacity(0)));
3584                }
3585                _ => {
3586                    panic!("");
3587                }
3588            }
3589
3590            // test andnot in and (first) with name
3591            let f_and_andnot = filter_resolved!(f_and!([
3592                f_andnot(f_eq(Attribute::Name, PartialValue::new_utf8s("claire"))),
3593                f_pres(Attribute::Name)
3594            ]));
3595
3596            let (r, _plan) = be.filter2idl(f_and_andnot.to_inner(), 0).unwrap();
3597            match r {
3598                IdList::Indexed(idl) => {
3599                    debug!("{:?}", idl);
3600                    assert_eq!(idl, IDLBitRange::from_iter(vec![1]));
3601                }
3602                _ => {
3603                    panic!("");
3604                }
3605            }
3606            // test andnot in and (last) with name
3607            let f_and_andnot = filter_resolved!(f_and!([
3608                f_pres(Attribute::Name),
3609                f_andnot(f_eq(Attribute::Name, PartialValue::new_utf8s("claire")))
3610            ]));
3611
3612            let (r, _plan) = be.filter2idl(f_and_andnot.to_inner(), 0).unwrap();
3613            match r {
3614                IdList::Indexed(idl) => {
3615                    assert_eq!(idl, IDLBitRange::from_iter(vec![1]));
3616                }
3617                _ => {
3618                    panic!("");
3619                }
3620            }
3621            // test andnot in and (first) with no-index
3622            let f_and_andnot = filter_resolved!(f_and!([
3623                f_andnot(f_eq(Attribute::Name, PartialValue::new_utf8s("claire"))),
3624                f_pres(Attribute::NoIndex)
3625            ]));
3626
3627            let (r, _plan) = be.filter2idl(f_and_andnot.to_inner(), 0).unwrap();
3628            match r {
3629                IdList::AllIds => {}
3630                _ => {
3631                    panic!("");
3632                }
3633            }
3634            // test andnot in and (last) with no-index
3635            let f_and_andnot = filter_resolved!(f_and!([
3636                f_pres(Attribute::NoIndex),
3637                f_andnot(f_eq(Attribute::Name, PartialValue::new_utf8s("claire")))
3638            ]));
3639
3640            let (r, _plan) = be.filter2idl(f_and_andnot.to_inner(), 0).unwrap();
3641            match r {
3642                IdList::AllIds => {}
3643                _ => {
3644                    panic!("");
3645                }
3646            }
3647
3648            //   empty or
3649            let f_e_or = filter_resolved!(f_or!([]));
3650
3651            let (r, _plan) = be.filter2idl(f_e_or.to_inner(), 0).unwrap();
3652            match r {
3653                IdList::Indexed(idl) => {
3654                    assert_eq!(idl, IDLBitRange::from_iter(vec![]));
3655                }
3656                _ => {
3657                    panic!("");
3658                }
3659            }
3660
3661            let f_e_and = filter_resolved!(f_and!([]));
3662
3663            let (r, _plan) = be.filter2idl(f_e_and.to_inner(), 0).unwrap();
3664            match r {
3665                IdList::Indexed(idl) => {
3666                    assert_eq!(idl, IDLBitRange::from_iter(vec![]));
3667                }
3668                _ => {
3669                    panic!("");
3670                }
3671            }
3672        })
3673    }
3674
3675    #[test]
3676    fn test_be_index_search_missing() {
3677        run_test!(|be: &mut BackendWriteTransaction| {
3678            // Test where the index is in schema but not created (purge idxs)
3679            // should fall back to an empty set because we can't satisfy the term
3680            be.danger_purge_idxs().unwrap();
3681            debug!("{:?}", be.missing_idxs().unwrap());
3682            let f_eq = filter_resolved!(f_eq(Attribute::Name, PartialValue::new_utf8s("william")));
3683
3684            let (r, _plan) = be.filter2idl(f_eq.to_inner(), 0).unwrap();
3685            match r {
3686                IdList::AllIds => {}
3687                _ => {
3688                    panic!("");
3689                }
3690            }
3691        })
3692    }
3693
    #[test]
    fn test_be_index_slope_generation() {
        run_test!(|be: &mut BackendWriteTransaction| {
            // Verifies index slope values: the hardcoded defaults used
            // before any reindex, and the analysed values produced by one.
            // NOTE(review): comparing the asserted values, a smaller slope
            // appears to correspond to a more selective index (unique
            // name/uuid get the smallest values) - confirm against the
            // slope analysis implementation.

            // Create some test entry with some indexed / unindexed values.
            // testattr is identical across all three entries, testnumber has
            // two distinct values, and name / uuid are unique per entry.
            let mut e1: Entry<EntryInit, EntryNew> = Entry::new();
            e1.add_ava(Attribute::Name, Value::new_iname("william"));
            e1.add_ava(
                Attribute::Uuid,
                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
            );
            e1.add_ava(Attribute::TestAttr, Value::from("dupe"));
            e1.add_ava(Attribute::TestNumber, Value::from("1"));
            let e1 = e1.into_sealed_new();

            let mut e2: Entry<EntryInit, EntryNew> = Entry::new();
            e2.add_ava(Attribute::Name, Value::new_iname("claire"));
            e2.add_ava(
                Attribute::Uuid,
                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d2"),
            );
            e2.add_ava(Attribute::TestAttr, Value::from("dupe"));
            e2.add_ava(Attribute::TestNumber, Value::from("1"));
            let e2 = e2.into_sealed_new();

            let mut e3: Entry<EntryInit, EntryNew> = Entry::new();
            e3.add_ava(Attribute::Name, Value::new_iname("benny"));
            e3.add_ava(
                Attribute::Uuid,
                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d3"),
            );
            e3.add_ava(Attribute::TestAttr, Value::from("dupe"));
            e3.add_ava(Attribute::TestNumber, Value::from("2"));
            let e3 = e3.into_sealed_new();

            let _rset = be.create(&CID_ZERO, vec![e1, e2, e3]).unwrap();

            // If the slopes haven't been generated yet, there are some hardcoded values
            // that we can use instead. They aren't generated until a first re-index.
            assert!(!be.is_idx_slopeyness_generated().unwrap());

            // Hardcoded defaults: 45 for equality on the test attrs, 1 for
            // name/uuid equality, 90 for presence.
            let ta_eq_slope = be
                .get_idx_slope(&IdxKey::new(Attribute::TestAttr, IndexType::Equality))
                .unwrap();
            assert_eq!(ta_eq_slope, 45);

            let tb_eq_slope = be
                .get_idx_slope(&IdxKey::new(Attribute::TestNumber, IndexType::Equality))
                .unwrap();
            assert_eq!(tb_eq_slope, 45);

            let name_eq_slope = be
                .get_idx_slope(&IdxKey::new(Attribute::Name, IndexType::Equality))
                .unwrap();
            assert_eq!(name_eq_slope, 1);
            let uuid_eq_slope = be
                .get_idx_slope(&IdxKey::new(Attribute::Uuid, IndexType::Equality))
                .unwrap();
            assert_eq!(uuid_eq_slope, 1);

            let name_pres_slope = be
                .get_idx_slope(&IdxKey::new(Attribute::Name, IndexType::Presence))
                .unwrap();
            assert_eq!(name_pres_slope, 90);
            let uuid_pres_slope = be
                .get_idx_slope(&IdxKey::new(Attribute::Uuid, IndexType::Presence))
                .unwrap();
            assert_eq!(uuid_pres_slope, 90);
            // Check the slopes are what we expect for hardcoded values.

            // Now check slope generation for the values. Today these are calculated
            // at reindex time, so we now perform the re-index.
            assert!(be.reindex(false).is_ok());
            assert!(be.is_idx_slopeyness_generated().unwrap());

            // Post-reindex: analysed values replace the defaults.
            // testattr (one value shared by all entries) scores 200,
            // testnumber (two values) 133, unique name/uuid 51.
            let ta_eq_slope = be
                .get_idx_slope(&IdxKey::new(Attribute::TestAttr, IndexType::Equality))
                .unwrap();
            assert_eq!(ta_eq_slope, 200);

            let tb_eq_slope = be
                .get_idx_slope(&IdxKey::new(Attribute::TestNumber, IndexType::Equality))
                .unwrap();
            assert_eq!(tb_eq_slope, 133);

            let name_eq_slope = be
                .get_idx_slope(&IdxKey::new(Attribute::Name, IndexType::Equality))
                .unwrap();
            assert_eq!(name_eq_slope, 51);
            let uuid_eq_slope = be
                .get_idx_slope(&IdxKey::new(Attribute::Uuid, IndexType::Equality))
                .unwrap();
            assert_eq!(uuid_eq_slope, 51);

            // Presence indexes match every entry, so they score 200.
            let name_pres_slope = be
                .get_idx_slope(&IdxKey::new(Attribute::Name, IndexType::Presence))
                .unwrap();
            assert_eq!(name_pres_slope, 200);
            let uuid_pres_slope = be
                .get_idx_slope(&IdxKey::new(Attribute::Uuid, IndexType::Presence))
                .unwrap();
            assert_eq!(uuid_pres_slope, 200);
        })
    }
3797
3798    #[test]
3799    fn test_be_limits_allids() {
3800        run_test!(|be: &mut BackendWriteTransaction| {
3801            let mut lim_allow_allids = Limits::unlimited();
3802            lim_allow_allids.unindexed_allow = true;
3803
3804            let mut lim_deny_allids = Limits::unlimited();
3805            lim_deny_allids.unindexed_allow = false;
3806
3807            let mut e: Entry<EntryInit, EntryNew> = Entry::new();
3808            e.add_ava(Attribute::UserId, Value::from("william"));
3809            e.add_ava(
3810                Attribute::Uuid,
3811                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
3812            );
3813            e.add_ava(Attribute::NonExist, Value::from("x"));
3814            let e = e.into_sealed_new();
3815            let single_result = be.create(&CID_ZERO, vec![e.clone()]);
3816
3817            assert!(single_result.is_ok());
3818            let filt = e
3819                .filter_from_attrs(&[Attribute::NonExist])
3820                .expect("failed to generate filter")
3821                .into_valid_resolved();
3822            // check allow on allids
3823            let res = be.search(&lim_allow_allids, &filt);
3824            assert!(res.is_ok());
3825            let res = be.exists(&lim_allow_allids, &filt);
3826            assert!(res.is_ok());
3827
3828            // check deny on allids
3829            let res = be.search(&lim_deny_allids, &filt);
3830            assert_eq!(res, Err(OperationError::ResourceLimit));
3831            let res = be.exists(&lim_deny_allids, &filt);
3832            assert_eq!(res, Err(OperationError::ResourceLimit));
3833        })
3834    }
3835
3836    #[test]
3837    fn test_be_limits_results_max() {
3838        run_test!(|be: &mut BackendWriteTransaction| {
3839            let mut lim_allow = Limits::unlimited();
3840            lim_allow.search_max_results = usize::MAX;
3841
3842            let mut lim_deny = Limits::unlimited();
3843            lim_deny.search_max_results = 0;
3844
3845            let mut e: Entry<EntryInit, EntryNew> = Entry::new();
3846            e.add_ava(Attribute::UserId, Value::from("william"));
3847            e.add_ava(
3848                Attribute::Uuid,
3849                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
3850            );
3851            e.add_ava(Attribute::NonExist, Value::from("x"));
3852            let e = e.into_sealed_new();
3853            let single_result = be.create(&CID_ZERO, vec![e.clone()]);
3854            assert!(single_result.is_ok());
3855
3856            let filt = e
3857                .filter_from_attrs(&[Attribute::NonExist])
3858                .expect("failed to generate filter")
3859                .into_valid_resolved();
3860
3861            // --> This is the all ids path (unindexed)
3862            // check allow on entry max
3863            let res = be.search(&lim_allow, &filt);
3864            assert!(res.is_ok());
3865            let res = be.exists(&lim_allow, &filt);
3866            assert!(res.is_ok());
3867
3868            // check deny on entry max
3869            let res = be.search(&lim_deny, &filt);
3870            assert_eq!(res, Err(OperationError::ResourceLimit));
3871            // we don't limit on exists because we never load the entries.
3872            let res = be.exists(&lim_deny, &filt);
3873            assert!(res.is_ok());
3874
3875            // --> This will shortcut due to indexing.
3876            assert!(be.reindex(false).is_ok());
3877            let res = be.search(&lim_deny, &filt);
3878            assert_eq!(res, Err(OperationError::ResourceLimit));
3879            // we don't limit on exists because we never load the entries.
3880            let res = be.exists(&lim_deny, &filt);
3881            assert!(res.is_ok());
3882        })
3883    }
3884
    #[test]
    fn test_be_limits_partial_filter() {
        run_test!(|be: &mut BackendWriteTransaction| {
            // This relies on how we do partials, so it could be a bit sensitive.
            // A partial is generated after an allids + indexed in a single and
            // as we require both conditions to exist. Allids comes from unindexed
            // terms. we need to ensure we don't hit partial threshold too.
            //
            // This means we need an and query where the first term is allids
            // and the second is indexed, but without the filter shortcutting.
            //
            // To achieve this we need a monstrously evil query.
            //
            let mut lim_allow = Limits::unlimited();
            lim_allow.search_max_filter_test = usize::MAX;

            let mut lim_deny = Limits::unlimited();
            lim_deny.search_max_filter_test = 0;

            // A single entry matching both legs of the and below.
            let mut e: Entry<EntryInit, EntryNew> = Entry::new();
            e.add_ava(Attribute::Name, Value::new_iname("william"));
            e.add_ava(
                Attribute::Uuid,
                Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
            );
            e.add_ava(Attribute::NonExist, Value::from("x"));
            e.add_ava(Attribute::NonExist, Value::from("y"));
            let e = e.into_sealed_new();
            let single_result = be.create(&CID_ZERO, vec![e]);
            assert!(single_result.is_ok());

            // Reindex so we have things in place for our query
            assert!(be.reindex(false).is_ok());

            // 🚨 This is evil!
            // The and allows us to hit "allids + indexed -> partial".
            // the or terms prevent re-arrangement. They can't be folded or dead
            // term elimed either.
            //
            // This means the f_or nonexist will become allids (unindexed attr)
            // and the second f_or will be indexed due to f_eq name in both,
            // with the result of william.
            //
            // This creates a partial, and because it's the first iteration in the loop, this
            // doesn't encounter partial threshold testing.
            let filt = filter_resolved!(f_and!([
                f_or!([
                    f_eq(Attribute::NonExist, PartialValue::new_utf8s("x")),
                    f_eq(Attribute::NonExist, PartialValue::new_utf8s("y"))
                ]),
                f_or!([
                    f_eq(Attribute::Name, PartialValue::new_utf8s("claire")),
                    f_eq(Attribute::Name, PartialValue::new_utf8s("william"))
                ]),
            ]));

            // Partial filter testing allowed: both operations succeed.
            let res = be.search(&lim_allow, &filt);
            assert!(res.is_ok());
            let res = be.exists(&lim_allow, &filt);
            assert!(res.is_ok());

            // check deny on the filter-test limit: the partial forces entry
            // tests, so both search and exists trip the limit here.
            let res = be.search(&lim_deny, &filt);
            assert_eq!(res, Err(OperationError::ResourceLimit));
            let res = be.exists(&lim_deny, &filt);
            assert_eq!(res, Err(OperationError::ResourceLimit));
        })
    }
3953
3954    #[test]
3955    fn test_be_multiple_create() {
3956        sketching::test_init();
3957
3958        // This is a demo idxmeta, purely for testing.
3959        let idxmeta = vec![IdxKey {
3960            attr: Attribute::Uuid,
3961            itype: IndexType::Equality,
3962        }];
3963
3964        let be_a = Backend::new(BackendConfig::new_test("main"), idxmeta.clone(), false)
3965            .expect("Failed to setup backend");
3966
3967        let be_b = Backend::new(BackendConfig::new_test("db_2"), idxmeta, false)
3968            .expect("Failed to setup backend");
3969
3970        let mut be_a_txn = be_a.write().unwrap();
3971        let mut be_b_txn = be_b.write().unwrap();
3972
3973        assert!(be_a_txn.get_db_s_uuid() != be_b_txn.get_db_s_uuid());
3974
3975        // Create into A
3976        let mut e: Entry<EntryInit, EntryNew> = Entry::new();
3977        e.add_ava(Attribute::UserId, Value::from("william"));
3978        e.add_ava(
3979            Attribute::Uuid,
3980            Value::from("db237e8a-0079-4b8c-8a56-593b22aa44d1"),
3981        );
3982        let e = e.into_sealed_new();
3983
3984        let single_result = be_a_txn.create(&CID_ZERO, vec![e]);
3985
3986        assert!(single_result.is_ok());
3987
3988        // Assert it's in A but not B.
3989        let filt = filter_resolved!(f_eq(Attribute::UserId, PartialValue::new_utf8s("william")));
3990
3991        let lims = Limits::unlimited();
3992
3993        let r = be_a_txn.search(&lims, &filt);
3994        assert!(r.expect("Search failed!").len() == 1);
3995
3996        let r = be_b_txn.search(&lims, &filt);
3997        assert!(r.expect("Search failed!").is_empty());
3998
3999        // Create into B
4000        let mut e: Entry<EntryInit, EntryNew> = Entry::new();
4001        e.add_ava(Attribute::UserId, Value::from("claire"));
4002        e.add_ava(
4003            Attribute::Uuid,
4004            Value::from("0c680959-0944-47d6-9dea-53304d124266"),
4005        );
4006        let e = e.into_sealed_new();
4007
4008        let single_result = be_b_txn.create(&CID_ZERO, vec![e]);
4009
4010        assert!(single_result.is_ok());
4011
4012        // Assert it's in B but not A
4013        let filt = filter_resolved!(f_eq(Attribute::UserId, PartialValue::new_utf8s("claire")));
4014
4015        let lims = Limits::unlimited();
4016
4017        let r = be_a_txn.search(&lims, &filt);
4018        assert!(r.expect("Search failed!").is_empty());
4019
4020        let r = be_b_txn.search(&lims, &filt);
4021        assert!(r.expect("Search failed!").len() == 1);
4022    }
4023}