kanidmd_lib/be/idl_arc_sqlite.rs

use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::convert::TryInto;
use std::ops::DerefMut;
use std::sync::Arc;
use std::time::Duration;

use concread::arcache::{ARCache, ARCacheBuilder, ARCacheReadTxn, ARCacheWriteTxn};
use concread::cowcell::*;
use hashbrown::HashMap;
use idlset::v2::IDLBitRange;
use idlset::AndNot;
use kanidm_proto::internal::{ConsistencyError, OperationError};
use tracing::trace;
use uuid::Uuid;

use crate::be::idl_sqlite::{
    IdlSqlite, IdlSqliteReadTransaction, IdlSqliteTransaction, IdlSqliteWriteTransaction,
};
use crate::be::idxkey::{
    IdlCacheKey, IdlCacheKeyRef, IdlCacheKeyToRef, IdxKey, IdxKeyRef, IdxKeyToRef, IdxNameKey,
    IdxSlope,
};
use crate::be::keystorage::{KeyHandle, KeyHandleId};
use crate::be::{BackendConfig, IdList, IdRawEntry};
use crate::entry::{Entry, EntryCommitted, EntrySealed};
use crate::prelude::*;
use crate::value::{IndexType, Value};

// use std::borrow::Borrow;

// Appears to take roughly 500MB on some stress tests.
const DEFAULT_CACHE_TARGET: usize = 2048;
const DEFAULT_IDL_CACHE_RATIO: usize = 32;
const DEFAULT_NAME_CACHE_RATIO: usize = 8;
const DEFAULT_CACHE_RMISS: usize = 0;
const DEFAULT_CACHE_WMISS: usize = 0;

const DEFAULT_IDX_CACHE_RMISS: usize = 8;
const DEFAULT_IDX_CACHE_WMISS: usize = 16;
const DEFAULT_IDX_EXISTS_TARGET: usize = 256;

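/// Keys for the shared name_cache, one variant per name-mapping table
/// (name2uuid, externalid2uuid, uuid2rdn and uuid2spn).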
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Hash)]
enum NameCacheKey {
    Name2Uuid(String),
    ExternalId2Uuid(String),
    Uuid2Rdn(Uuid),
    Uuid2Spn(Uuid),
}

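/// Values held in the name_cache: a uuid (U), an rdn string (R), or a
/// boxed spn value (S), matching what each name-mapping table stores.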
#[derive(Debug, Clone)]
enum NameCacheValue {
    U(Uuid),
    R(String),
    S(Box<Value>),
}

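/// The ARC-cached layer over the sqlite backend. Reads and writes go through
/// the caches below, falling back to the underlying `IdlSqlite` on a miss.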
pub struct IdlArcSqlite {
    db: IdlSqlite,
    entry_cache: ARCache<u64, Arc<EntrySealedCommitted>>,
    idl_cache: ARCache<IdlCacheKey, Box<IDLBitRange>>,
    name_cache: ARCache<NameCacheKey, NameCacheValue>,

    idx_exists_cache: ARCache<IdxNameKey, bool>,

    op_ts_max: CowCell<Option<Duration>>,
    allids: CowCell<IDLBitRange>,
    maxid: CowCell<u64>,
    keyhandles: CowCell<HashMap<KeyHandleId, KeyHandle>>,
}

pub struct IdlArcSqliteReadTransaction<'a> {
    db: IdlSqliteReadTransaction,
    entry_cache: ARCacheReadTxn<'a, u64, Arc<EntrySealedCommitted>, ()>,
    idl_cache: ARCacheReadTxn<'a, IdlCacheKey, Box<IDLBitRange>, ()>,
    name_cache: ARCacheReadTxn<'a, NameCacheKey, NameCacheValue, ()>,

    idx_exists_cache: ARCacheReadTxn<'a, IdxNameKey, bool, ()>,
    allids: CowCellReadTxn<IDLBitRange>,
}

pub struct IdlArcSqliteWriteTransaction<'a> {
    pub(super) db: IdlSqliteWriteTransaction,
    entry_cache: ARCacheWriteTxn<'a, u64, Arc<EntrySealedCommitted>, ()>,
    idl_cache: ARCacheWriteTxn<'a, IdlCacheKey, Box<IDLBitRange>, ()>,
    name_cache: ARCacheWriteTxn<'a, NameCacheKey, NameCacheValue, ()>,

    idx_exists_cache: ARCacheWriteTxn<'a, IdxNameKey, bool, ()>,

    op_ts_max: CowCellWriteTxn<'a, Option<Duration>>,
    allids: CowCellWriteTxn<'a, IDLBitRange>,
    maxid: CowCellWriteTxn<'a, u64>,
    pub(super) keyhandles: CowCellWriteTxn<'a, HashMap<KeyHandleId, KeyHandle>>,
}

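// Resolves an IdList against the entry cache, fetching any misses from
// sqlite in a single partial query. Read operations backfill the cache;
// AllIds scans deliberately do not, to prevent scan/invalidation attacks.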
macro_rules! get_identry {
    (
        $self:expr,
        $idl:expr,
        $is_read_op:expr
    ) => {{
        let mut result: Vec<Arc<EntrySealedCommitted>> = Vec::with_capacity(0);
        match $idl {
            IdList::Partial(idli) | IdList::PartialThreshold(idli) | IdList::Indexed(idli) => {
                let mut nidl = IDLBitRange::new();

                idli.into_iter().for_each(|i| {
                    // For all the ids in the idl, is the entry in the cache?
                    match $self.entry_cache.get(&i) {
                        Some(eref) => result.push(eref.clone()),
                        None => unsafe { nidl.push_id(i) },
                    }
                });

                if !nidl.is_empty() {
                    // Now, get anything from nidl that is needed.
                    let mut db_result = $self.db.get_identry(&IdList::Partial(nidl))?;
                    // Clone everything from db_result into the cache.
                    if $is_read_op {
                        db_result.iter().for_each(|e| {
                            $self.entry_cache.insert(e.get_id(), e.clone());
                        });
                    }
                    // Merge the two vecs
                    result.append(&mut db_result);
                }
            }
            IdList::AllIds => {
                // VERY similar to above, but we skip adding the entries to the cache
                // on miss to prevent scan/invalidation attacks.
                let idli = (*$self.allids).clone();
                let mut nidl = IDLBitRange::new();

                (&idli)
                    .into_iter()
                    .for_each(|i| match $self.entry_cache.get(&i) {
                        Some(eref) => result.push(eref.clone()),
                        None => unsafe { nidl.push_id(i) },
                    });

                if !nidl.is_empty() {
                    // Now, get anything from nidl that is needed.
                    let mut db_result = $self.db.get_identry(&IdList::Partial(nidl))?;
                    // Merge the two vecs
                    result.append(&mut db_result);
                }
            }
        };
        // Return
        Ok(result)
    }};
}

macro_rules! get_identry_raw {
    (
        $self:expr,
        $idl:expr
    ) => {{
        // As a cache we have no concept of this, so we just bypass to the db.
        $self.db.get_identry_raw($idl)
    }};
}

// macro_rules! exists_idx {
//     (
//         $self:expr,
//         $attr:expr,
//         $itype:expr
//     ) => {{
//         // As a cache we have no concept of this, so we just bypass to the db.
//         $self.db.exists_idx($attr, $itype)
//     }};
// }

macro_rules! get_idl {
    (
        $self:expr,
        $attr:expr,
        $itype:expr,
        $idx_key:expr
    ) => {{
        // SEE ALSO #259: Find a way to implement borrow for this properly.
        // I don't think this is possible. When we make this dyn, the arc
        // needs the dyn trait to be sized so that it *could* claim a clone
        // for hit tracking reasons. That also means that we need From and
        // some other traits that just seem incompatible. And in the end,
        // we clone a few times in arc, and if we miss we need to insert anyway.
        //
        // So the best path could be to replace IdlCacheKey with a compressed
        // or smaller type. Perhaps even a small cache of the IdlCacheKeys that
        // are allocated to reduce some allocs? Probably over thinking it at
        // this point.

        // Now attempt to get from this cache.
        let cache_key = IdlCacheKeyRef {
            a: $attr,
            i: $itype,
            k: $idx_key,
        };
        let cache_r = $self.idl_cache.get(&cache_key as &dyn IdlCacheKeyToRef);
        // If hit, continue.
        if let Some(ref data) = cache_r {
            trace!(
                cached_index = ?$itype,
                attr = ?$attr,
                idl = %data,
            );
            return Ok(Some(data.as_ref().clone()));
        }

        // If it was a miss, does the index actually exist in the DB?
        let idx_key = IdxNameKey {
            a: $attr.clone(),
            i: $itype,
        };
        let idx_r = $self.idx_exists_cache.get(&idx_key);
        if idx_r == Some(&false) {
            // The idx does not exist - bail early.
            return Ok(None);
        }

        // The table either exists and we don't have data on it yet,
        // or it does not exist and we need to hear back from the lower level.

        // If miss, get from db *and* insert to the cache.
        let db_r = $self.db.get_idl($attr, $itype, $idx_key)?;

        if let Some(ref idl) = db_r {
            if idx_r.is_none() {
                // It exists, so track that data, because we weren't
                // previously tracking it.
                $self.idx_exists_cache.insert(idx_key, true)
            }

            let ncache_key = IdlCacheKey {
                a: $attr.clone(),
                i: $itype.clone(),
                k: $idx_key.into(),
            };
            $self.idl_cache.insert(ncache_key, Box::new(idl.clone()))
        } else {
            // The DB was unable to return this idx because the table backing the
            // idx does not exist. We should cache this to prevent repeat hits
            // on sqlite until the db does exist, at which point the cache is
            // cleared anyway.
            //
            // NOTE: If the db idx misses it returns Some(empty_set), so this
            // only caches missing index tables.
            $self.idx_exists_cache.insert(idx_key, false)
        };
        Ok(db_r)
    }};
}
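
// In short, get_idl! resolves in this order:
//   1. idl_cache hit                -> return the cached IDLBitRange.
//   2. idx_exists_cache says false  -> index table known-missing, return None.
//   3. fall through to sqlite       -> Some(idl): cache it and mark the index
//      as existing; None: negatively cache the missing index table.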

macro_rules! name2uuid {
    (
        $self:expr,
        $name:expr
    ) => {{
        let cache_key = NameCacheKey::Name2Uuid($name.to_string());
        let cache_r = $self.name_cache.get(&cache_key);
        if let Some(NameCacheValue::U(uuid)) = cache_r {
            trace!(?uuid, "Got cached name2uuid");
            return Ok(Some(uuid.clone()));
        } else {
            trace!("Cache miss uuid for name2uuid");
        }

        let db_r = $self.db.name2uuid($name)?;
        if let Some(uuid) = db_r {
            $self
                .name_cache
                .insert(cache_key, NameCacheValue::U(uuid.clone()))
        }
        Ok(db_r)
    }};
}

macro_rules! externalid2uuid {
    (
        $self:expr,
        $name:expr
    ) => {{
        let cache_key = NameCacheKey::ExternalId2Uuid($name.to_string());
        let cache_r = $self.name_cache.get(&cache_key);
        if let Some(NameCacheValue::U(uuid)) = cache_r {
            trace!(?uuid, "Got cached externalid2uuid");
            return Ok(Some(uuid.clone()));
        } else {
            trace!("Cache miss uuid for externalid2uuid");
        }

        let db_r = $self.db.externalid2uuid($name)?;
        if let Some(uuid) = db_r {
            $self
                .name_cache
                .insert(cache_key, NameCacheValue::U(uuid.clone()))
        }
        Ok(db_r)
    }};
}

macro_rules! uuid2spn {
    (
        $self:expr,
        $uuid:expr
    ) => {{
        let cache_key = NameCacheKey::Uuid2Spn($uuid);
        let cache_r = $self.name_cache.get(&cache_key);
        if let Some(NameCacheValue::S(ref spn)) = cache_r {
            trace!(?spn, "Got cached uuid2spn");
            return Ok(Some(spn.as_ref().clone()));
        } else {
            trace!("Cache miss spn for uuid2spn");
        }

        let db_r = $self.db.uuid2spn($uuid)?;
        if let Some(ref data) = db_r {
            $self
                .name_cache
                .insert(cache_key, NameCacheValue::S(Box::new(data.clone())))
        }
        Ok(db_r)
    }};
}

macro_rules! uuid2rdn {
    (
        $self:expr,
        $uuid:expr
    ) => {{
        let cache_key = NameCacheKey::Uuid2Rdn($uuid);
        let cache_r = $self.name_cache.get(&cache_key);
        if let Some(NameCacheValue::R(ref rdn)) = cache_r {
            return Ok(Some(rdn.clone()));
        } else {
            trace!("Cache miss rdn for uuid2rdn");
        }

        let db_r = $self.db.uuid2rdn($uuid)?;
        if let Some(ref data) = db_r {
            $self
                .name_cache
                .insert(cache_key, NameCacheValue::R(data.clone()))
        }
        Ok(db_r)
    }};
}

macro_rules! verify {
    (
        $self:expr
    ) => {{
        let mut r = $self.db.verify();
        if r.is_empty() && !$self.is_dirty() {
            // Check allids.
            match $self.db.get_allids() {
                Ok(db_allids) => {
                    if !db_allids.is_compressed() || !(*($self).allids).is_compressed() {
                        admin_warn!("Inconsistent ALLIDS compression state");
                        r.push(Err(ConsistencyError::BackendAllIdsSync))
                    }
                    if db_allids != (*($self).allids) {
                        // We may want to rework how large key-values are formatted,
                        // given how unwieldy this output can become.
                        admin_warn!(
                            db_allids = ?(&db_allids).andnot(&($self).allids),
                            arc_allids = ?(&(*($self).allids)).andnot(&db_allids),
                            "Inconsistent ALLIDS set"
                        );
                        r.push(Err(ConsistencyError::BackendAllIdsSync))
                    }
                }
                Err(_) => r.push(Err(ConsistencyError::Unknown)),
            };
        };
        r
    }};
}

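/// The read interface shared by both ARC transaction types. Most lookups
/// consult the relevant cache first and fall back to sqlite, while the
/// list_* and get_id2entry helpers bypass the caches entirely.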
pub trait IdlArcSqliteTransaction {
    fn get_identry(
        &mut self,
        idl: &IdList,
    ) -> Result<Vec<Arc<EntrySealedCommitted>>, OperationError>;

    fn get_identry_raw(&self, idl: &IdList) -> Result<Vec<IdRawEntry>, OperationError>;

    // fn exists_idx(&mut self, attr: &str, itype: IndexType) -> Result<bool, OperationError>;

    fn get_idl(
        &mut self,
        attr: &Attribute,
        itype: IndexType,
        idx_key: &str,
    ) -> Result<Option<IDLBitRange>, OperationError>;

    fn get_db_s_uuid(&self) -> Result<Option<Uuid>, OperationError>;

    fn get_db_d_uuid(&self) -> Result<Option<Uuid>, OperationError>;

    fn get_db_ts_max(&self) -> Result<Option<Duration>, OperationError>;

    fn get_key_handles(&mut self) -> Result<BTreeMap<KeyHandleId, KeyHandle>, OperationError>;

    fn verify(&self) -> Vec<Result<(), ConsistencyError>>;

    fn is_dirty(&self) -> bool;

    fn name2uuid(&mut self, name: &str) -> Result<Option<Uuid>, OperationError>;

    fn externalid2uuid(&mut self, name: &str) -> Result<Option<Uuid>, OperationError>;

    fn uuid2spn(&mut self, uuid: Uuid) -> Result<Option<Value>, OperationError>;

    fn uuid2rdn(&mut self, uuid: Uuid) -> Result<Option<String>, OperationError>;

    fn list_idxs(&self) -> Result<Vec<String>, OperationError>;

    fn list_id2entry(&self) -> Result<Vec<(u64, String)>, OperationError>;

    fn list_quarantined(&self) -> Result<Vec<(u64, String)>, OperationError>;

    fn list_index_content(
        &self,
        index_name: &str,
    ) -> Result<Vec<(String, IDLBitRange)>, OperationError>;

    fn get_id2entry(&self, id: u64) -> Result<(u64, String), OperationError>;
}

impl IdlArcSqliteTransaction for IdlArcSqliteReadTransaction<'_> {
    fn get_identry(
        &mut self,
        idl: &IdList,
    ) -> Result<Vec<Arc<EntrySealedCommitted>>, OperationError> {
        get_identry!(self, idl, true)
    }

    fn get_identry_raw(&self, idl: &IdList) -> Result<Vec<IdRawEntry>, OperationError> {
        get_identry_raw!(self, idl)
    }

    // fn exists_idx(&mut self, attr: &str, itype: IndexType) -> Result<bool, OperationError> {
    //     exists_idx!(self, attr, itype)
    // }

    #[instrument(level = "trace", skip_all)]
    fn get_idl(
        &mut self,
        attr: &Attribute,
        itype: IndexType,
        idx_key: &str,
    ) -> Result<Option<IDLBitRange>, OperationError> {
        get_idl!(self, attr, itype, idx_key)
    }

    fn get_db_s_uuid(&self) -> Result<Option<Uuid>, OperationError> {
        self.db.get_db_s_uuid()
    }

    fn get_db_d_uuid(&self) -> Result<Option<Uuid>, OperationError> {
        self.db.get_db_d_uuid()
    }

    fn get_db_ts_max(&self) -> Result<Option<Duration>, OperationError> {
        self.db.get_db_ts_max()
    }

    fn get_key_handles(&mut self) -> Result<BTreeMap<KeyHandleId, KeyHandle>, OperationError> {
        self.db.get_key_handles()
    }

    fn verify(&self) -> Vec<Result<(), ConsistencyError>> {
        verify!(self)
    }

    fn is_dirty(&self) -> bool {
        false
    }

    fn name2uuid(&mut self, name: &str) -> Result<Option<Uuid>, OperationError> {
        name2uuid!(self, name)
    }

    fn externalid2uuid(&mut self, name: &str) -> Result<Option<Uuid>, OperationError> {
        externalid2uuid!(self, name)
    }

    fn uuid2spn(&mut self, uuid: Uuid) -> Result<Option<Value>, OperationError> {
        uuid2spn!(self, uuid)
    }

    fn uuid2rdn(&mut self, uuid: Uuid) -> Result<Option<String>, OperationError> {
        uuid2rdn!(self, uuid)
    }

    fn list_idxs(&self) -> Result<Vec<String>, OperationError> {
        // This is only used in tests or debug tools, so bypass the cache.
        self.db.list_idxs()
    }

    fn list_id2entry(&self) -> Result<Vec<(u64, String)>, OperationError> {
        // This is only used in tests or debug tools, so bypass the cache.
        self.db.list_id2entry()
    }

    fn list_quarantined(&self) -> Result<Vec<(u64, String)>, OperationError> {
        // No cache of quarantined entries.
        self.db.list_quarantined()
    }

    fn list_index_content(
        &self,
        index_name: &str,
    ) -> Result<Vec<(String, IDLBitRange)>, OperationError> {
        // This is only used in tests or debug tools, so bypass the cache.
        self.db.list_index_content(index_name)
    }

    fn get_id2entry(&self, id: u64) -> Result<(u64, String), OperationError> {
        // This is only used in tests or debug tools, so bypass the cache.
        self.db.get_id2entry(id)
    }
}

impl IdlArcSqliteTransaction for IdlArcSqliteWriteTransaction<'_> {
    fn get_identry(
        &mut self,
        idl: &IdList,
    ) -> Result<Vec<Arc<EntrySealedCommitted>>, OperationError> {
        get_identry!(self, idl, false)
    }

    fn get_identry_raw(&self, idl: &IdList) -> Result<Vec<IdRawEntry>, OperationError> {
        get_identry_raw!(self, idl)
    }

    // fn exists_idx(&mut self, attr: &str, itype: IndexType) -> Result<bool, OperationError> {
    //     exists_idx!(self, attr, itype)
    // }

    #[instrument(level = "trace", skip_all)]
    fn get_idl(
        &mut self,
        attr: &Attribute,
        itype: IndexType,
        idx_key: &str,
    ) -> Result<Option<IDLBitRange>, OperationError> {
        get_idl!(self, attr, itype, idx_key)
    }

    fn get_db_s_uuid(&self) -> Result<Option<Uuid>, OperationError> {
        self.db.get_db_s_uuid()
    }

    fn get_db_d_uuid(&self) -> Result<Option<Uuid>, OperationError> {
        self.db.get_db_d_uuid()
    }

    fn get_db_ts_max(&self) -> Result<Option<Duration>, OperationError> {
        match *self.op_ts_max {
            Some(ts) => Ok(Some(ts)),
            None => self.db.get_db_ts_max(),
        }
    }

    fn get_key_handles(&mut self) -> Result<BTreeMap<KeyHandleId, KeyHandle>, OperationError> {
        self.db.get_key_handles()
    }

    fn verify(&self) -> Vec<Result<(), ConsistencyError>> {
        verify!(self)
    }

    fn is_dirty(&self) -> bool {
        self.entry_cache.is_dirty()
    }

    fn name2uuid(&mut self, name: &str) -> Result<Option<Uuid>, OperationError> {
        name2uuid!(self, name)
    }

    fn externalid2uuid(&mut self, name: &str) -> Result<Option<Uuid>, OperationError> {
        externalid2uuid!(self, name)
    }

    fn uuid2spn(&mut self, uuid: Uuid) -> Result<Option<Value>, OperationError> {
        uuid2spn!(self, uuid)
    }

    fn uuid2rdn(&mut self, uuid: Uuid) -> Result<Option<String>, OperationError> {
        uuid2rdn!(self, uuid)
    }

    fn list_idxs(&self) -> Result<Vec<String>, OperationError> {
        // This is only used in tests or debug tools, so bypass the cache.
        self.db.list_idxs()
    }

    fn list_id2entry(&self) -> Result<Vec<(u64, String)>, OperationError> {
        // This is only used in tests or debug tools, so bypass the cache.
        self.db.list_id2entry()
    }

    fn list_quarantined(&self) -> Result<Vec<(u64, String)>, OperationError> {
        // No cache of quarantined entries.
        self.db.list_quarantined()
    }

    fn list_index_content(
        &self,
        index_name: &str,
    ) -> Result<Vec<(String, IDLBitRange)>, OperationError> {
        // This is only used in tests or debug tools, so bypass the cache.
        self.db.list_index_content(index_name)
    }

    fn get_id2entry(&self, id: u64) -> Result<(u64, String), OperationError> {
        // This is only used in tests or debug tools, so bypass the cache.
        self.db.get_id2entry(id)
    }
}

impl IdlArcSqliteWriteTransaction<'_> {
    #[cfg(any(test, debug_assertions))]
    #[instrument(level = "debug", name = "idl_arc_sqlite::clear_cache", skip_all)]
    pub fn clear_cache(&mut self) -> Result<(), OperationError> {
        // It's not yet clear if we should reload these values here. Reloading
        // would prevent verification of the cached values, but clearing them
        // lets us check the db version of each value. Perhaps this needs some
        // dedicated testing.
        /*
         *self.op_ts_max = self.db.get_db_ts_max()?;
         *self.allids = self.db.get_allids()?;
         *self.maxid = self.get_id2entry_max_id()?;
         */
        self.entry_cache.clear();
        self.idl_cache.clear();
        self.idx_exists_cache.clear();
        self.name_cache.clear();
        Ok(())
    }

    #[instrument(level = "debug", name = "idl_arc_sqlite::commit", skip_all)]
    pub fn commit(self) -> Result<(), OperationError> {
        let IdlArcSqliteWriteTransaction {
            db,
            mut entry_cache,
            mut idl_cache,
            mut name_cache,
            idx_exists_cache,
            op_ts_max,
            allids,
            maxid,
            keyhandles,
        } = self;

        // Write any dirty items to the disk.
        entry_cache
            .iter_mut_mark_clean()
            .try_for_each(|(k, v)| match v {
                Some(e) => db.write_identry(e),
                None => db.delete_identry(*k),
            })
            .map_err(|e| {
                admin_error!(?e, "Failed to sync entry cache to sqlite");
                e
            })?;

        idl_cache
            .iter_mut_mark_clean()
            .try_for_each(|(k, v)| {
                match v {
                    Some(idl) => db.write_idl(&k.a, k.i, k.k.as_str(), idl),
                    #[allow(clippy::unreachable)]
                    None => {
                        // Due to how we remove items, we always write an empty idl
                        // to the cache rather than removing the key, so the value
                        // yielded by iter_mut_mark_clean should never be None here.
                        //
                        // If it is None, this means we have memory corruption so we
                        // MUST panic.
                        unreachable!();
                    }
                }
            })
            .map_err(|e| {
                admin_error!(?e, "Failed to sync idl cache to sqlite");
                e
            })?;

        name_cache
            .iter_mut_mark_clean()
            .try_for_each(|(k, v)| match (k, v) {
                (NameCacheKey::Name2Uuid(k), Some(NameCacheValue::U(v))) => {
                    db.write_name2uuid_add(k, *v)
                }
                (NameCacheKey::Name2Uuid(k), None) => db.write_name2uuid_rem(k),
                (NameCacheKey::ExternalId2Uuid(k), Some(NameCacheValue::U(v))) => {
                    db.write_externalid2uuid_add(k, *v)
                }
                (NameCacheKey::ExternalId2Uuid(k), None) => db.write_externalid2uuid_rem(k),
                (NameCacheKey::Uuid2Spn(uuid), Some(NameCacheValue::S(v))) => {
                    db.write_uuid2spn(*uuid, Some(v))
                }
                (NameCacheKey::Uuid2Spn(uuid), None) => db.write_uuid2spn(*uuid, None),
                (NameCacheKey::Uuid2Rdn(uuid), Some(NameCacheValue::R(v))) => {
                    db.write_uuid2rdn(*uuid, Some(v))
                }
                (NameCacheKey::Uuid2Rdn(uuid), None) => db.write_uuid2rdn(*uuid, None),

                _ => Err(OperationError::InvalidCacheState),
            })
            .map_err(|e| {
                admin_error!(?e, "Failed to sync name cache to sqlite");
                e
            })?;

        // Ensure the db commit succeeds first.
        db.commit()?;

        // Can no longer fail from this point.
        op_ts_max.commit();
        name_cache.commit();
        idx_exists_cache.commit();
        idl_cache.commit();
        allids.commit();
        maxid.commit();
        keyhandles.commit();
        // Unlock the entry cache last to remove contention on everything else.
        entry_cache.commit();

        Ok(())
    }

    pub fn get_db_ruv(&self) -> Result<BTreeSet<Cid>, OperationError> {
        self.db.get_db_ruv()
    }

    pub fn write_db_ruv<I, J>(&mut self, added: I, removed: J) -> Result<(), OperationError>
    where
        I: Iterator<Item = Cid>,
        J: Iterator<Item = Cid>,
    {
        self.db.write_db_ruv(added, removed)
    }

    pub fn get_id2entry_max_id(&self) -> Result<u64, OperationError> {
        Ok(*self.maxid)
    }

    pub fn set_id2entry_max_id(&mut self, mid: u64) {
        assert!(mid > *self.maxid);
        *self.maxid = mid;
    }

    #[instrument(level = "trace", skip_all)]
    pub fn write_identries<'b, I>(&'b mut self, mut entries: I) -> Result<(), OperationError>
    where
        I: Iterator<Item = &'b Entry<EntrySealed, EntryCommitted>>,
    {
        entries.try_for_each(|e| {
            trace!("Inserting {:?} to cache", e.get_id());
            if e.get_id() == 0 {
                Err(OperationError::InvalidEntryId)
            } else {
                (*self.allids).insert_id(e.get_id());
                self.entry_cache
                    .insert_dirty(e.get_id(), Arc::new(e.clone()));
                Ok(())
            }
        })
    }

    pub fn write_identries_raw<I>(&mut self, entries: I) -> Result<(), OperationError>
    where
        I: Iterator<Item = IdRawEntry>,
    {
        // Drop the entry cache.
        self.entry_cache.clear();
        // Write the raw ents
        self.db
            .write_identries_raw(entries)
            .and_then(|()| self.db.get_allids())
            .map(|mut ids| {
                // Update allids since we cleared them and need to reset it in the cache.
                std::mem::swap(self.allids.deref_mut(), &mut ids);
            })
    }

    pub fn delete_identry<I>(&mut self, mut idl: I) -> Result<(), OperationError>
    where
        I: Iterator<Item = u64>,
    {
        idl.try_for_each(|i| {
            trace!("Removing {:?} from cache", i);
            if i == 0 {
                Err(OperationError::InvalidEntryId)
            } else {
                (*self.allids).remove_id(i);
                self.entry_cache.remove_dirty(i);
                Ok(())
            }
        })
    }

    #[instrument(level = "trace", skip_all)]
    pub fn write_idl(
        &mut self,
        attr: &Attribute,
        itype: IndexType,
        idx_key: &str,
        idl: &IDLBitRange,
    ) -> Result<(), OperationError> {
        let cache_key = IdlCacheKey {
            a: attr.clone(),
            i: itype,
            k: idx_key.into(),
        };
        // On idl == 0 the db will remove this, and synthesise an empty IdList on a miss
        // but we can cache this as a new empty IdList instead, so that we can avoid the
        // db lookup on this idl.
        if idl.is_empty() {
            self.idl_cache
                .insert_dirty(cache_key, Box::new(IDLBitRange::new()));
        } else {
            self.idl_cache
                .insert_dirty(cache_key, Box::new(idl.clone()));
        }
        // self.db.write_idl(audit, attr, itype, idx_key, idl)
        Ok(())
    }

    pub fn optimise_dirty_idls(&mut self) {
        self.idl_cache.iter_mut_dirty().for_each(|(k, maybe_idl)| {
            if let Some(idl) = maybe_idl {
                if idl.maybe_compress() {
                    trace!(?k, "Compressed idl");
                }
            }
        })
    }

    pub fn is_idx_slopeyness_generated(&self) -> Result<bool, OperationError> {
        self.db.is_idx_slopeyness_generated()
    }

    pub fn get_idx_slope(&self, ikey: &IdxKey) -> Result<Option<IdxSlope>, OperationError> {
        self.db.get_idx_slope(ikey)
    }

    /// Index Slope Analysis. For the purpose of external modules you can consider this as a
    /// module that generates "weights" for each index that we have. Smaller values are faster
    /// indexes - larger values are more costly ones. This is not intended to yield perfect
    /// weights. The intent is to separate the obviously more effective indexes rather than
    /// to min-max the fine tuning of these. Consider name=foo vs class=*. name=foo will always
    /// be better than class=*, but comparing name=foo to spn=foo is "much of a muchness" since
    /// both are really fast.
    pub fn analyse_idx_slopes(&mut self) -> Result<(), OperationError> {
        /*
         * Inside of this analysis there are two major factors we need to understand
         *
         * * What is the variation of idl lengths within an index?
         * * How many keys are stored in this index?
         *
         * Since we have the filter2idl threshold, we want to find "what is the smallest
         * and most unique index asap so we can exit faster". This allows us to avoid
         * loading larger, more costly indexes that either have large idls, high variation
         * or few keys, and are likely to miss and have to go out to disk.
         *
         * A few methods were proposed, but thanks to advice from Perri Boulton (psychology
         * researcher with a background in statistics), we were able to devise a reasonable
         * approach.
         *
         * These are commented in line to help understand the process.
         */

        /*
         * Step 1 - we have an index like "idx_eq_member". It has data that looks somewhat
         * like:
         *
         *  key    | idl
         *  -------+------------
         *  uuid_a | [1, 2, 3, ...]
         *  -------+------------
         *  uuid_b | [4, 5, 6, ...]
         *
         * We need to collect this into a single vec of "how long is each idl". Since we have
         * each idl in the vec, the length of the vec is also the number of keys in the set.
         * This yields for us:
         *
         *   idx_eq_member: [4.0, 5.0, ...]
         * where each f64 value is the float representation of the length of idl.
         *
         * We then assemble these to a map so we have each idxkey and its associated list
         * of idl lens.
         */

        let mut data: HashMap<IdxKey, Vec<f64>> = HashMap::new();
        self.idl_cache.iter_dirty().for_each(|(k, maybe_idl)| {
            if let Some(idl) = maybe_idl {
                let idl_len: u32 = idl.len().try_into().unwrap_or(u32::MAX);
                // Convert to something we can use.
                let idl_len = f64::from(idl_len);

                let kref = IdxKeyRef::new(&k.a, &k.i);
                if idl_len > 0.0 {
                    // It's worth looking at. Anything len 0 will be removed.
                    if let Some(lens) = data.get_mut(&kref as &dyn IdxKeyToRef) {
                        lens.push(idl_len)
                    } else {
                        data.insert(kref.as_key(), vec![idl_len]);
                    }
                }
            }
        });

        /*
         * So now for each of our sets:
         *
         *   idx_eq_member: [4.0, 5.0, ...]
         *   idx_eq_name  : [1.0, 1.0, 1.0, ...]
         *
         * To get the variability, we calculate the normal distribution of the set of values
         * and then using this variance we use the 1st deviation (~85%) value to assert that
         * 85% or more of the values in this set will be "equal or less" than this length.
         *
         * So given say:
         *  [1.0, 1.0, 1.0, 1.0]
         * We know that the sd_1 will be 1.0. Given:
         *  [1.0, 1.0, 2.0, 3.0]
         * We know that it will be ~2.71 (mean 1.75 + sample sd of ~0.96).
         *
         * The other factor is number of keys. This is thankfully easy! We have that from
         * vec.len().
         *
         * We can now calculate the index slope. Why is it a slope you ask? Because we
         * plot the data out on a graph, with "variability" on the y axis, and number of
         * keys on the x.
         *
         * Let's plot the data we just added.
         *
         *    |
         *  4 +
         *    |
         *  3 +
         *    |
         *  2 +           *  eq_member
         *    |
         *  1 +           *  eq_name
         *    |
         *    +--+--+--+--+--
         *       1  2  3  4
         *
         * Now, if we were to connect a line from (0,0) to each point we get a line with an angle.
         *
         *    |
         *  4 +
         *    |
         *  3 +
         *    |
         *  2 +           *  eq_member
         *    |
         *  1 +           *  eq_name
         *    |/---------/
         *    +--+--+--+--+--
         *       1  2  3  4
         *
         *    |
         *  4 +
         *    |
         *  3 +
         *    |
         *  2 +           *  eq_member
         *    |        /--/
         *  1 +    /--/   *  eq_name
         *    |/--/
         *    +--+--+--+--+--
         *       1  2  3  4
         *
         * (Look, it's ascii art, don't judge.)
         *
         * Point is that eq_member is "steeper" and eq_name is "shallower". This is what we call
         * the "slopeyness" aka the jank of the line, or more precisely, the angle.
         *
         * Now we need a way to numerically compare these lines. Since the points could be
         * anywhere on our graph:
         *
         *    |
         *  4 +  *
         *    |
         *  3 +         *
         *    |
         *  2 +     *
         *    |
         *  1 +           *
         *    |
         *    +--+--+--+--+--
         *       1  2  3  4
         *
         * While we can see what's obvious or best here, a computer has to know it. So we now
         * assume that these points construct a triangle, going through (0,0), (x, 0) and (x, y).
         *
         *
         *                Λ│
         *               ╱ │
         *              ╱  │
         *             ╱   │
         *            ╱    │
         *           ╱     │
         *          ╱      │
         *         ╱       │ sd_1
         *        ╱        │
         *       ╱         │
         *      ───────────┼
         *         nkeys
         *
         * Since this is right angled we can use arctan to work out the degrees of the line. This
         * gives us a value from 1.0 to 90.0 (We clamp to a minimum of 1.0, because we use 0 as "None"
         * in the NonZeroU8 type in filter.rs, which allows ZST optimisation).
         *
         * The problem is that we have to go from float to u8 - this means we lose decimal precision
         * in the conversion. To lessen this, we multiply by 2.8 to give some extra weight to each
         * angle to minimise this loss, and then we convert.
         *
         * And there we have it! A slope factor of the index! A way to compare these sets quickly
         * at query optimisation time to minimise index access.
         */
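        // Worked example (a sketch using the sets above): eq_member [4.0, 5.0]
        // has mean 4.5 and sample sd ~0.71, so sd_1 ~5.21 over 2 keys, giving
        // atan(5.21 / 2) ~69.0 degrees and an IdxSlope of 69.0 * 2.8 ~193.
        // eq_name [1.0, 1.0, 1.0] has sd_1 = 1.0 over 3 keys, giving
        // atan(1.0 / 3) ~18.4 degrees and an IdxSlope of ~51 - much shallower,
        // so the query optimiser will prefer it.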
        let slopes: HashMap<_, _> = data
            .into_iter()
            .filter_map(|(k, lens)| {
                let slope_factor = Self::calculate_sd_slope(&lens);
                if slope_factor == 0 || slope_factor == IdxSlope::MAX {
                    None
                } else {
                    Some((k, slope_factor))
                }
            })
            .collect();
        trace!(?slopes, "Generated slopes");
        // Write the data down
        self.db.store_idx_slope_analysis(&slopes)
    }

    fn calculate_sd_slope(data: &[f64]) -> IdxSlope {
        let (n_keys, sd_1) = if data.len() >= 2 {
            // We can only do a full SD on sets of 2 or more values.
            let l: u32 = data.len().try_into().unwrap_or(u32::MAX);
            let c = f64::from(l);
            let mean = data.iter().take(u32::MAX as usize).sum::<f64>() / c;
            let variance: f64 = data
                .iter()
                .take(u32::MAX as usize)
                .map(|len| {
                    let delta = mean - len;
                    delta * delta
                })
                .sum::<f64>()
                / (c - 1.0);

            let sd = variance.sqrt();

            // This is saying ~85% of values will be this len or less.
            let sd_1 = mean + sd;
            (c, sd_1)
        } else if data.len() == 1 {
            (1.0, data[0])
        } else {
            // Can't resolve.
            return IdxSlope::MAX;
        };

        // Now we know sd_1 and number of keys. We can use this as a triangle to work out
        // the angle along the hypotenuse. We use this angle - or slope - to show which
        // elements have the smallest sd_1 and most keys available. Then because this
        // is bound between 0.0 -> 90.0, we "unfurl" this around a half circle by multiplying
        // by 2.8. This gives us a little more precision when we drop the decimal point.
        let sf = (sd_1 / n_keys).atan().to_degrees() * 2.8;

        // Now these are fractions, and we can't use those in u8, so we clamp the min/max values
        // that we expect to be yielded.
        let sf = sf.clamp(1.0, 254.0);
        if !sf.is_finite() {
            IdxSlope::MAX
        } else {
            // SAFETY
            // `sf` is clamped between 1.0 and 254.0 above, ensuring it is
            // always in range for the u8 conversion.
            unsafe { sf.to_int_unchecked::<IdxSlope>() }
        }
    }

    pub fn quarantine_entry(&self, id: u64) -> Result<(), OperationError> {
        self.db.quarantine_entry(id)
    }

    pub fn restore_quarantined(&self, id: u64) -> Result<(), OperationError> {
        self.db.restore_quarantined(id)
    }

    pub fn create_name2uuid(&self) -> Result<(), OperationError> {
        self.db.create_name2uuid()
    }

    pub fn write_name2uuid_add(
        &mut self,
        uuid: Uuid,
        add: BTreeSet<String>,
    ) -> Result<(), OperationError> {
        add.into_iter().for_each(|k| {
            let cache_key = NameCacheKey::Name2Uuid(k);
            let cache_value = NameCacheValue::U(uuid);
            self.name_cache.insert_dirty(cache_key, cache_value)
        });
        Ok(())
    }

    pub fn write_name2uuid_rem(&mut self, rem: BTreeSet<String>) -> Result<(), OperationError> {
        rem.into_iter().for_each(|k| {
            let cache_key = NameCacheKey::Name2Uuid(k);
            self.name_cache.remove_dirty(cache_key)
        });
        Ok(())
    }

    pub fn create_externalid2uuid(&self) -> Result<(), OperationError> {
        self.db.create_externalid2uuid()
    }

    pub fn write_externalid2uuid_add(
        &mut self,
        uuid: Uuid,
        add: String,
    ) -> Result<(), OperationError> {
        let cache_key = NameCacheKey::ExternalId2Uuid(add);
        let cache_value = NameCacheValue::U(uuid);
        self.name_cache.insert_dirty(cache_key, cache_value);
        Ok(())
    }

    pub fn write_externalid2uuid_rem(&mut self, rem: String) -> Result<(), OperationError> {
        let cache_key = NameCacheKey::ExternalId2Uuid(rem);
        self.name_cache.remove_dirty(cache_key);
        Ok(())
    }

    pub fn create_uuid2spn(&self) -> Result<(), OperationError> {
        self.db.create_uuid2spn()
    }

    pub fn write_uuid2spn(&mut self, uuid: Uuid, k: Option<Value>) -> Result<(), OperationError> {
        let cache_key = NameCacheKey::Uuid2Spn(uuid);
        match k {
            Some(v) => self
                .name_cache
                .insert_dirty(cache_key, NameCacheValue::S(Box::new(v))),
            None => self.name_cache.remove_dirty(cache_key),
        }
        Ok(())
    }

    pub fn create_uuid2rdn(&self) -> Result<(), OperationError> {
        self.db.create_uuid2rdn()
    }

    pub fn write_uuid2rdn(&mut self, uuid: Uuid, k: Option<String>) -> Result<(), OperationError> {
        let cache_key = NameCacheKey::Uuid2Rdn(uuid);
        match k {
            Some(s) => self
                .name_cache
                .insert_dirty(cache_key, NameCacheValue::R(s)),
            None => self.name_cache.remove_dirty(cache_key),
        }
        Ok(())
    }

    pub fn create_idx(&mut self, attr: &Attribute, itype: IndexType) -> Result<(), OperationError> {
        self.db.create_idx(attr, itype)?;

        // Cache that this exists since we just made it.
        let idx_key = IdxNameKey {
            a: attr.clone(),
            i: itype,
        };
        self.idx_exists_cache.insert(idx_key, true);

        Ok(())
    }

    /// ⚠️  - This function will destroy all indexes in the database.
    ///
    /// It should only be called internally by the backend in limited and
    /// specific situations.
    #[instrument(level = "trace", skip_all)]
    pub fn danger_purge_idxs(&mut self) -> Result<(), OperationError> {
        debug!("CLEARING CACHE");
        self.db.danger_purge_idxs().map(|()| {
            self.idl_cache.clear();
            self.idx_exists_cache.clear();
            self.name_cache.clear();
        })
    }

    /// ⚠️  - This function will destroy all entries in the database.
    ///
    /// It should only be called internally by the backend in limited and
    /// specific situations.
    #[instrument(level = "trace", skip_all)]
    pub fn danger_purge_id2entry(&mut self) -> Result<(), OperationError> {
        self.db.danger_purge_id2entry().map(|()| {
            let mut ids = IDLBitRange::new();
            ids.compress();
            std::mem::swap(self.allids.deref_mut(), &mut ids);
            self.entry_cache.clear();
        })
    }

    pub fn write_db_s_uuid(&self, nsid: Uuid) -> Result<(), OperationError> {
        self.db.write_db_s_uuid(nsid)
    }

    pub fn write_db_d_uuid(&self, nsid: Uuid) -> Result<(), OperationError> {
        self.db.write_db_d_uuid(nsid)
    }

    pub fn set_db_ts_max(&mut self, ts: Duration) -> Result<(), OperationError> {
        *self.op_ts_max = Some(ts);
        self.db.set_db_ts_max(ts)
    }

    pub(crate) fn get_db_index_version(&self) -> Result<i64, OperationError> {
        self.db.get_db_index_version()
    }

    pub(crate) fn set_db_index_version(&self, v: i64) -> Result<(), OperationError> {
        self.db.set_db_index_version(v)
    }

    pub fn setup(&mut self) -> Result<(), OperationError> {
        self.db
            .setup()
            .and_then(|()| self.db.get_allids())
            .map(|mut ids| {
                std::mem::swap(self.allids.deref_mut(), &mut ids);
            })
            .and_then(|()| self.db.get_id2entry_max_id())
            .map(|mid| {
                *self.maxid = mid;
            })
    }
}

impl IdlArcSqlite {
    pub fn new(cfg: &BackendConfig, vacuum: bool) -> Result<Self, OperationError> {
        let db = IdlSqlite::new(cfg, vacuum)?;

        // Autotune heuristic.
        let mut cache_size = cfg.arcsize.unwrap_or_else(|| {
            // Due to changes in concread, we can now scale this up! We now aim for 120%
            // of entries.
            db.get_allids_count()
                .map(|c| {
                    let tmpsize = ((c / 5) as usize) * 6;
                    // If our calculation is too small anyway, just set it to the minimum target.
                    std::cmp::max(tmpsize, DEFAULT_CACHE_TARGET)
                })
                .unwrap_or(DEFAULT_CACHE_TARGET)
        });

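        // For example, 100_000 entries yields (100_000 / 5) * 6 = 120_000
        // cache slots, i.e. 120% of the entry count.
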
        if cache_size < DEFAULT_CACHE_TARGET {
            admin_warn!(
                old = cache_size,
                new = DEFAULT_CACHE_TARGET,
                "Configured Arc Cache size too low, increasing..."
            );
            // Only raise the value after logging, so the warning reports the
            // originally configured size.
            cache_size = DEFAULT_CACHE_TARGET;
        }

        let entry_cache = ARCacheBuilder::new()
            .set_expected_workload(
                cache_size,
                cfg.pool_size as usize,
                DEFAULT_CACHE_RMISS,
                DEFAULT_CACHE_WMISS,
                false,
            )
            .set_reader_quiesce(true)
            .build()
            .ok_or_else(|| {
                admin_error!("Failed to construct entry_cache");
                OperationError::InvalidState
            })?;
        // The idl cache should have smaller items, and is critical for fast searches
        // so we allow it to have a higher ratio of items relative to the entries.
        let idl_cache = ARCacheBuilder::new()
            .set_expected_workload(
                cache_size * DEFAULT_IDL_CACHE_RATIO,
                cfg.pool_size as usize,
                DEFAULT_CACHE_RMISS,
                DEFAULT_CACHE_WMISS,
                false,
            )
            .set_reader_quiesce(true)
            .build()
            .ok_or_else(|| {
                admin_error!("Failed to construct idl_cache");
                OperationError::InvalidState
            })?;

        let name_cache = ARCacheBuilder::new()
            .set_expected_workload(
                cache_size * DEFAULT_NAME_CACHE_RATIO,
                cfg.pool_size as usize,
                DEFAULT_CACHE_RMISS,
                DEFAULT_CACHE_WMISS,
                true,
            )
            .set_reader_quiesce(true)
            .build()
            .ok_or_else(|| {
                admin_error!("Failed to construct name_cache");
                OperationError::InvalidState
            })?;

        let idx_exists_cache = ARCacheBuilder::new()
            .set_expected_workload(
                DEFAULT_IDX_EXISTS_TARGET,
                cfg.pool_size as usize,
                DEFAULT_IDX_CACHE_RMISS,
                DEFAULT_IDX_CACHE_WMISS,
                true,
            )
            .set_reader_quiesce(true)
            .build()
            .ok_or_else(|| {
                admin_error!("Failed to construct idx_exists_cache");
                OperationError::InvalidState
            })?;

        let allids = CowCell::new(IDLBitRange::new());

        let maxid = CowCell::new(0);

        let keyhandles = CowCell::new(HashMap::default());

        let op_ts_max = CowCell::new(None);

        Ok(IdlArcSqlite {
            db,
            entry_cache,
            idl_cache,
            name_cache,
            idx_exists_cache,
            op_ts_max,
            allids,
            maxid,
            keyhandles,
        })
    }

    pub fn try_quiesce(&self) {
        self.entry_cache.try_quiesce();
        self.idl_cache.try_quiesce();
        self.name_cache.try_quiesce();
        // The idx_exists_cache also enables reader quiesce, so include it here.
        self.idx_exists_cache.try_quiesce();
    }

    pub fn read(&self) -> Result<IdlArcSqliteReadTransaction, OperationError> {
        // IMPORTANT! Always take entrycache FIRST
        let entry_cache_read = self.entry_cache.read();
        let db_read = self.db.read()?;
        let idl_cache_read = self.idl_cache.read();
        let name_cache_read = self.name_cache.read();
        let idx_exists_cache_read = self.idx_exists_cache.read();
        let allids_read = self.allids.read();

        Ok(IdlArcSqliteReadTransaction {
            db: db_read,
            entry_cache: entry_cache_read,
            idl_cache: idl_cache_read,
            name_cache: name_cache_read,
            idx_exists_cache: idx_exists_cache_read,
            allids: allids_read,
        })
    }

    pub fn write(&self) -> Result<IdlArcSqliteWriteTransaction, OperationError> {
        // IMPORTANT! Always take entrycache FIRST
        let entry_cache_write = self.entry_cache.write();
        let db_write = self.db.write()?;
        let idl_cache_write = self.idl_cache.write();
        let name_cache_write = self.name_cache.write();
        let idx_exists_cache_write = self.idx_exists_cache.write();
        let op_ts_max_write = self.op_ts_max.write();
        let allids_write = self.allids.write();
        let maxid_write = self.maxid.write();
        let keyhandles_write = self.keyhandles.write();

        Ok(IdlArcSqliteWriteTransaction {
            db: db_write,
            entry_cache: entry_cache_write,
            idl_cache: idl_cache_write,
            name_cache: name_cache_write,
            idx_exists_cache: idx_exists_cache_write,
            op_ts_max: op_ts_max_write,
            allids: allids_write,
            maxid: maxid_write,
            keyhandles: keyhandles_write,
        })
    }

    /*
    pub fn stats_audit(&self, audit: &mut AuditScope) {
        let entry_stats = self.entry_cache.view_stats();
        let idl_stats = self.idl_cache.view_stats();
        ladmin_info!(audit, "entry_cache stats -> {:?}", *entry_stats);
        ladmin_info!(audit, "idl_cache stats -> {:?}", *idl_stats);
    }
    */
}
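
// A minimal sanity-test sketch for calculate_sd_slope (added for illustration,
// assuming no equivalent test exists elsewhere): uniform idl lengths should
// yield a shallower slope than long, high-variance ones, and an empty set is
// unresolvable.
#[cfg(test)]
mod idx_slope_tests {
    use super::*;

    #[test]
    fn calculate_sd_slope_orders_indexes() {
        // Uniform, short idls (like a name equality index) -> shallow slope.
        let eq_name = [1.0, 1.0, 1.0, 1.0];
        // Few keys with long, highly variable idls -> steep slope.
        let eq_member = [4.0, 9.0, 25.0];

        let s_name = IdlArcSqliteWriteTransaction::calculate_sd_slope(&eq_name);
        let s_member = IdlArcSqliteWriteTransaction::calculate_sd_slope(&eq_member);

        // Smaller slope values indicate cheaper, more selective indexes.
        assert!(s_name < s_member);

        // With no data the slope can't be resolved, so the worst value is returned.
        assert_eq!(
            IdlArcSqliteWriteTransaction::calculate_sd_slope(&[]),
            IdxSlope::MAX
        );
    }
}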