kanidmd_lib/be/
idl_sqlite.rs

1use super::keystorage::{KeyHandle, KeyHandleId};
2use crate::be::dbentry::DbIdentSpn;
3use crate::be::dbvalue::DbCidV1;
4use crate::be::{BackendConfig, IdList, IdRawEntry, IdxKey, IdxSlope};
5use crate::entry::{Entry, EntryCommitted, EntrySealed};
6use crate::prelude::*;
7use crate::value::{IndexType, Value};
8use hashbrown::HashMap;
9use idlset::v2::IDLBitRange;
10use kanidm_proto::internal::{ConsistencyError, OperationError};
11use rusqlite::vtab::array::Array;
12use rusqlite::{Connection, OpenFlags, OptionalExtension};
13use std::collections::{BTreeMap, BTreeSet, VecDeque};
14use std::convert::{TryFrom, TryInto};
15use std::sync::Arc;
16use std::sync::Mutex;
17use std::time::Duration;
18use uuid::Uuid;
19
// String constants with the values "id2entry" and "indexv".
// NOTE(review): presumably the keys under which the id2entry and index
// format versions are recorded — confirm against the code that reads them.
const DBV_ID2ENTRY: &str = "id2entry";
const DBV_INDEXV: &str = "indexv";
22
/// Log a rusqlite error via `admin_error!` and collapse it into
/// [`OperationError::SqliteError`].
///
/// The error is taken by value so the function can be handed directly to
/// `Result::map_err`.
#[allow(clippy::needless_pass_by_value)] // needs to accept value from `map_err`
pub(super) fn sqlite_error(e: rusqlite::Error) -> OperationError {
    admin_error!(?e, "SQLite Error");
    OperationError::SqliteError
}
28
/// Log a serde_json error via `admin_error!` and collapse it into
/// [`OperationError::SerdeJsonError`].
///
/// The error is taken by value so the function can be handed directly to
/// `Result::map_err`.
#[allow(clippy::needless_pass_by_value)] // needs to accept value from `map_err`
pub(super) fn serde_json_error(e: serde_json::Error) -> OperationError {
    admin_error!(?e, "Serde JSON Error");
    OperationError::SerdeJsonError
}
34
/// Shared, mutex-guarded queue of SQLite connections. Transactions in this
/// file push their connection back onto it when they are dropped or committed.
type ConnPool = Arc<Mutex<VecDeque<Connection>>>;
36
/// One `(id, data)` row as it is exchanged with SQLite: the id is a signed
/// 64-bit integer and the data is an opaque byte blob.
#[derive(Debug)]
pub struct IdSqliteEntry {
    // Row id in SQLite's signed representation.
    id: i64,
    // Raw bytes of the stored entry.
    data: Vec<u8>,
}
42
/// One `(key, idl)` row read from an index table; `data` holds the still
/// serialised id list.
#[derive(Debug)]
struct KeyIdl {
    // Index key column.
    key: String,
    // Serialised id list bytes (decoded with serde_json by the reader).
    data: Vec<u8>,
}
48
49impl TryFrom<IdSqliteEntry> for IdRawEntry {
50    type Error = OperationError;
51
52    fn try_from(value: IdSqliteEntry) -> Result<Self, Self::Error> {
53        if value.id <= 0 {
54            return Err(OperationError::InvalidEntryId);
55        }
56        Ok(IdRawEntry {
57            id: value
58                .id
59                .try_into()
60                .map_err(|_| OperationError::InvalidEntryId)?,
61            data: value.data,
62        })
63    }
64}
65
66impl TryFrom<IdRawEntry> for IdSqliteEntry {
67    type Error = OperationError;
68
69    fn try_from(value: IdRawEntry) -> Result<Self, Self::Error> {
70        if value.id == 0 {
71            return Err(OperationError::InvalidEntryId);
72        }
73        Ok(IdSqliteEntry {
74            id: value
75                .id
76                .try_into()
77                .map_err(|_| OperationError::InvalidEntryId)?,
78            data: value.data,
79        })
80    }
81}
82
/// Handle holding the shared connection pool and the database name used to
/// qualify table names. Cloning shares the same pool (it is an `Arc`).
#[derive(Clone)]
pub struct IdlSqlite {
    // Shared pool of SQLite connections.
    pool: ConnPool,
    // Schema name prefixed to table names in generated SQL.
    db_name: &'static str,
}
88
/// A read transaction. It owns one connection for its lifetime; on drop the
/// transaction is rolled back and the connection is returned to `pool`.
pub struct IdlSqliteReadTransaction {
    // Pool the connection is handed back to.
    pool: ConnPool,
    // The connection in use; `None` once it has been taken back out.
    conn: Option<Connection>,
    // Schema name prefixed to table names in generated SQL.
    db_name: &'static str,
}
94
/// A write transaction. It owns one connection until it is committed or
/// dropped; dropping without committing rolls the transaction back.
pub struct IdlSqliteWriteTransaction {
    // Pool the connection is handed back to.
    pool: ConnPool,
    // The connection in use; `None` after commit (or once drop has run).
    conn: Option<Connection>,
    // Schema name prefixed to table names in generated SQL.
    db_name: &'static str,
}
100
101pub(crate) trait IdlSqliteTransaction {
    /// Name of the database schema; it is interpolated in front of every
    /// table name in the SQL built by this trait's provided methods.
    fn get_db_name(&self) -> &str;
103
    /// Borrow the connection backing this transaction. The implementations
    /// in this file return `OperationError::TransactionAlreadyCommitted` when
    /// the connection is no longer held.
    fn get_conn(&self) -> Result<&Connection, OperationError>;
105
106    fn get_identry(&self, idl: &IdList) -> Result<Vec<Arc<EntrySealedCommitted>>, OperationError> {
107        self.get_identry_raw(idl)?
108            .into_iter()
109            .map(|ide| ide.into_entry().map(Arc::new))
110            .collect()
111    }
112
113    fn get_identry_raw(&self, idl: &IdList) -> Result<Vec<IdRawEntry>, OperationError> {
114        // is the idl allids?
115        match idl {
116            IdList::AllIds => {
117                let mut stmt = self
118                    .get_conn()?
119                    .prepare(&format!(
120                        "SELECT id, data FROM {}.id2entry",
121                        self.get_db_name()
122                    ))
123                    .map_err(sqlite_error)?;
124                let id2entry_iter = stmt
125                    .query_map([], |row| {
126                        Ok(IdSqliteEntry {
127                            id: row.get(0)?,
128                            data: row.get(1)?,
129                        })
130                    })
131                    .map_err(sqlite_error)?;
132                id2entry_iter
133                    .map(|v| {
134                        v.map_err(sqlite_error).and_then(|ise| {
135                            // Convert the idsqlite to id raw
136                            ise.try_into()
137                        })
138                    })
139                    .collect()
140            }
141            IdList::Partial(idli) | IdList::PartialThreshold(idli) | IdList::Indexed(idli) => {
142                let mut stmt = self
143                    .get_conn()?
144                    .prepare(&format!(
145                        "SELECT id, data FROM {}.id2entry
146                         WHERE id IN rarray(:idli)",
147                        self.get_db_name()
148                    ))
149                    .map_err(sqlite_error)?;
150
151                // turn them into i64's
152                let mut id_list: Vec<i64> = vec![];
153                for id in idli {
154                    id_list.push(i64::try_from(id).map_err(|_| OperationError::InvalidEntryId)?);
155                }
156                // turn them into rusqlite values
157                let id_list: Array = std::rc::Rc::new(
158                    id_list
159                        .into_iter()
160                        .map(rusqlite::types::Value::from)
161                        .collect::<Vec<rusqlite::types::Value>>(),
162                );
163
164                let mut results: Vec<IdRawEntry> = vec![];
165
166                let rows = stmt.query_map(named_params! {":idli": &id_list}, |row| {
167                    Ok(IdSqliteEntry {
168                        id: row.get(0)?,
169                        data: row.get(1)?,
170                    })
171                });
172                let rows = match rows {
173                    Ok(rows) => rows,
174                    Err(e) => {
175                        error!("query failed in get_identry_raw: {:?}", e);
176                        return Err(OperationError::SqliteError);
177                    }
178                };
179
180                for row in rows {
181                    match row {
182                        Ok(ise) => {
183                            // Convert the idsqlite to id raw
184                            results.push(ise.try_into()?);
185                        }
186                        // TODO: make this a better error
187                        Err(e) => {
188                            admin_error!(?e, "SQLite Error in get_identry_raw");
189                            return Err(OperationError::SqliteError);
190                        }
191                    }
192                }
193                Ok(results)
194            }
195        }
196    }
197
198    fn exists_table(&self, tname: &str) -> Result<bool, OperationError> {
199        let mut stmt = self
200            .get_conn()?
201            .prepare(&format!(
202                "SELECT rowid from {}.sqlite_master where type=\"table\" AND name = :tname LIMIT 1",
203                self.get_db_name()
204            ))
205            .map_err(sqlite_error)?;
206
207        let i: Option<i64> = stmt
208            .query_row(&[(":tname", tname)], |row| row.get(0))
209            // If the row doesn't exist, we don't mind.
210            .optional()
211            .map_err(sqlite_error)?;
212
213        match i {
214            None | Some(0) => Ok(false),
215            _ => Ok(true),
216        }
217    }
218
219    fn exists_idx(&self, attr: &Attribute, itype: IndexType) -> Result<bool, OperationError> {
220        let tname = format!("idx_{}_{}", itype.as_idx_str(), attr.as_str());
221        self.exists_table(&tname)
222    }
223
224    #[instrument(level = "trace", skip_all)]
225    fn get_idl(
226        &self,
227        attr: &Attribute,
228        itype: IndexType,
229        idx_key: &str,
230    ) -> Result<Option<IDLBitRange>, OperationError> {
231        if !(self.exists_idx(attr, itype)?) {
232            debug!(
233                "IdlSqliteTransaction: Index {:?} {:?} not found",
234                itype, attr
235            );
236            return Ok(None);
237        }
238        // The table exists - lets now get the actual index itself.
239
240        let query = format!(
241            "SELECT idl FROM {}.idx_{}_{} WHERE key = :idx_key",
242            self.get_db_name(),
243            itype.as_idx_str(),
244            attr.as_str()
245        );
246        let mut stmt = self.get_conn()?.prepare(&query).map_err(sqlite_error)?;
247        let idl_raw: Option<Vec<u8>> = stmt
248            .query_row(&[(":idx_key", &idx_key)], |row| row.get(0))
249            // We don't mind if it doesn't exist
250            .optional()
251            .map_err(sqlite_error)?;
252
253        let idl = match idl_raw {
254            Some(d) => serde_json::from_slice(d.as_slice()).map_err(serde_json_error)?,
255            // We don't have this value, it must be empty (or we
256            // have a corrupted index .....
257            None => IDLBitRange::new(),
258        };
259        trace!(
260            miss_index = ?itype,
261            attr = ?attr,
262            idl = %idl,
263        );
264
265        Ok(Some(idl))
266    }
267
268    fn name2uuid(&mut self, name: &str) -> Result<Option<Uuid>, OperationError> {
269        // The table exists - lets now get the actual index itself.
270        let mut stmt = self
271            .get_conn()?
272            .prepare(&format!(
273                "SELECT uuid FROM {}.idx_name2uuid WHERE name = :name",
274                self.get_db_name()
275            ))
276            .map_err(sqlite_error)?;
277        let uuid_raw: Option<String> = stmt
278            .query_row(&[(":name", &name)], |row| row.get(0))
279            // We don't mind if it doesn't exist
280            .optional()
281            .map_err(sqlite_error)?;
282
283        let uuid = uuid_raw.as_ref().and_then(|u| Uuid::parse_str(u).ok());
284
285        Ok(uuid)
286    }
287
288    fn externalid2uuid(&mut self, name: &str) -> Result<Option<Uuid>, OperationError> {
289        // The table exists - lets now get the actual index itself.
290        let mut stmt = self
291            .get_conn()?
292            .prepare(&format!(
293                "SELECT uuid FROM {}.idx_externalid2uuid WHERE eid = :eid",
294                self.get_db_name()
295            ))
296            .map_err(sqlite_error)?;
297        let uuid_raw: Option<String> = stmt
298            .query_row(&[(":eid", &name)], |row| row.get(0))
299            // We don't mind if it doesn't exist
300            .optional()
301            .map_err(sqlite_error)?;
302
303        let uuid = uuid_raw.as_ref().and_then(|u| Uuid::parse_str(u).ok());
304
305        Ok(uuid)
306    }
307
308    fn uuid2spn(&mut self, uuid: Uuid) -> Result<Option<Value>, OperationError> {
309        let uuids = uuid.as_hyphenated().to_string();
310        // The table exists - lets now get the actual index itself.
311        let mut stmt = self
312            .get_conn()?
313            .prepare(&format!(
314                "SELECT spn FROM {}.idx_uuid2spn WHERE uuid = :uuid",
315                self.get_db_name()
316            ))
317            .map_err(sqlite_error)?;
318        let spn_raw: Option<Vec<u8>> = stmt
319            .query_row(&[(":uuid", &uuids)], |row| row.get(0))
320            // We don't mind if it doesn't exist
321            .optional()
322            .map_err(sqlite_error)?;
323
324        let spn: Option<Value> = match spn_raw {
325            Some(d) => {
326                let dbv: DbIdentSpn =
327                    serde_json::from_slice(d.as_slice()).map_err(serde_json_error)?;
328
329                Some(Value::from(dbv))
330            }
331            None => None,
332        };
333
334        Ok(spn)
335    }
336
337    fn uuid2rdn(&mut self, uuid: Uuid) -> Result<Option<String>, OperationError> {
338        let uuids = uuid.as_hyphenated().to_string();
339        // The table exists - lets now get the actual index itself.
340        let mut stmt = self
341            .get_conn()?
342            .prepare(&format!(
343                "SELECT rdn FROM {}.idx_uuid2rdn WHERE uuid = :uuid",
344                self.get_db_name()
345            ))
346            .map_err(sqlite_error)?;
347        let rdn: Option<String> = stmt
348            .query_row(&[(":uuid", &uuids)], |row| row.get(0))
349            // We don't mind if it doesn't exist
350            .optional()
351            .map_err(sqlite_error)?;
352
353        Ok(rdn)
354    }
355
356    fn get_db_s_uuid(&self) -> Result<Option<Uuid>, OperationError> {
357        // Try to get a value.
358        let data: Option<Vec<u8>> = self
359            .get_conn()?
360            .query_row(
361                &format!(
362                    "SELECT data FROM {}.db_sid WHERE id = 2",
363                    self.get_db_name()
364                ),
365                [],
366                |row| row.get(0),
367            )
368            .optional()
369            // this whole map call is useless
370            .map(|e_opt| {
371                // If we have a row, we try to make it a sid
372                e_opt.map(|e| {
373                    let y: Vec<u8> = e;
374                    y
375                })
376                // If no sid, we return none.
377            })
378            .map_err(|_| OperationError::SqliteError)?;
379
380        Ok(match data {
381            Some(d) => Some(
382                serde_json::from_slice(d.as_slice())
383                    .or_else(|e| serde_cbor::from_slice(d.as_slice()).map_err(|_| e))
384                    .map_err(|e| {
385                        admin_error!(immediate = true, ?e, "CRITICAL: Serde CBOR Error");
386                        eprintln!("CRITICAL: Serde CBOR Error -> {e:?}");
387                        OperationError::SerdeCborError
388                    })?,
389            ),
390            None => None,
391        })
392    }
393
394    fn get_db_d_uuid(&self) -> Result<Option<Uuid>, OperationError> {
395        // Try to get a value.
396        let data: Option<Vec<u8>> = self
397            .get_conn()?
398            .query_row(
399                &format!(
400                    "SELECT data FROM {}.db_did WHERE id = 2",
401                    self.get_db_name()
402                ),
403                [],
404                |row| row.get(0),
405            )
406            .optional()
407            // this whole map call is useless
408            .map(|e_opt| {
409                // If we have a row, we try to make it a sid
410                e_opt.map(|e| {
411                    let y: Vec<u8> = e;
412                    y
413                })
414                // If no sid, we return none.
415            })
416            .map_err(|_| OperationError::SqliteError)?;
417
418        Ok(match data {
419            Some(d) => Some(
420                serde_json::from_slice(d.as_slice())
421                    .or_else(|e| serde_cbor::from_slice(d.as_slice()).map_err(|_| e))
422                    .map_err(|e| {
423                        admin_error!(immediate = true, ?e, "CRITICAL: Serde CBOR Error");
424                        eprintln!("CRITICAL: Serde CBOR Error -> {e:?}");
425                        OperationError::SerdeCborError
426                    })?,
427            ),
428            None => None,
429        })
430    }
431
432    fn get_db_ts_max(&self) -> Result<Option<Duration>, OperationError> {
433        // Try to get a value.
434        let data: Option<Vec<u8>> = self
435            .get_conn()?
436            .query_row(
437                &format!(
438                    "SELECT data FROM {}.db_op_ts WHERE id = 1",
439                    self.get_db_name()
440                ),
441                [],
442                |row| row.get(0),
443            )
444            .optional()
445            .map(|e_opt| {
446                // If we have a row, we try to make it a sid
447                e_opt.map(|e| {
448                    let y: Vec<u8> = e;
449                    y
450                })
451                // If no sid, we return none.
452            })
453            .map_err(sqlite_error)?;
454
455        Ok(match data {
456            Some(d) => Some(
457                serde_json::from_slice(d.as_slice())
458                    .or_else(|_| serde_cbor::from_slice(d.as_slice()))
459                    .map_err(|e| {
460                        admin_error!(immediate = true, ?e, "CRITICAL: Serde JSON Error");
461                        eprintln!("CRITICAL: Serde JSON Error -> {e:?}");
462                        OperationError::SerdeJsonError
463                    })?,
464            ),
465            None => None,
466        })
467    }
468
469    fn get_key_handles(&mut self) -> Result<BTreeMap<KeyHandleId, KeyHandle>, OperationError> {
470        let mut stmt = self
471            .get_conn()?
472            .prepare(&format!(
473                "SELECT id, data FROM {}.keyhandles",
474                self.get_db_name()
475            ))
476            .map_err(sqlite_error)?;
477
478        let kh_iter = stmt
479            .query_map([], |row| Ok((row.get(0)?, row.get(1)?)))
480            .map_err(sqlite_error)?;
481
482        kh_iter
483            .map(|v| {
484                let (id, data): (Vec<u8>, Vec<u8>) = v.map_err(sqlite_error)?;
485                let id = serde_json::from_slice(id.as_slice()).map_err(serde_json_error)?;
486                let data = serde_json::from_slice(data.as_slice()).map_err(serde_json_error)?;
487                Ok((id, data))
488            })
489            .collect()
490    }
491
492    #[instrument(level = "debug", name = "idl_sqlite::get_allids", skip_all)]
493    fn get_allids(&self) -> Result<IDLBitRange, OperationError> {
494        let mut stmt = self
495            .get_conn()?
496            .prepare(&format!("SELECT id FROM {}.id2entry", self.get_db_name()))
497            .map_err(sqlite_error)?;
498        let res = stmt.query_map([], |row| row.get(0)).map_err(sqlite_error)?;
499        let mut ids: Result<IDLBitRange, _> = res
500            .map(|v| {
501                v.map_err(sqlite_error).and_then(|id: i64| {
502                    // Convert the idsqlite to id raw
503                    id.try_into().map_err(|e| {
504                        admin_error!(?e, "I64 Parse Error");
505                        OperationError::SqliteError
506                    })
507                })
508            })
509            .collect();
510        if let Ok(i) = &mut ids {
511            i.compress()
512        }
513        ids
514    }
515
516    fn list_idxs(&self) -> Result<Vec<String>, OperationError> {
517        let mut stmt = self
518            .get_conn()?
519            .prepare(&format!(
520                "SELECT name from {}.sqlite_master where type='table' and name GLOB 'idx_*'",
521                self.get_db_name()
522            ))
523            .map_err(sqlite_error)?;
524        let idx_table_iter = stmt.query_map([], |row| row.get(0)).map_err(sqlite_error)?;
525
526        idx_table_iter.map(|v| v.map_err(sqlite_error)).collect()
527    }
528
529    fn list_id2entry(&self) -> Result<Vec<(u64, String)>, OperationError> {
530        let allids = self.get_identry_raw(&IdList::AllIds)?;
531        allids
532            .into_iter()
533            .map(|data| data.into_dbentry().map(|(id, db_e)| (id, db_e.to_string())))
534            .collect()
535    }
536
537    fn list_quarantined(&self) -> Result<Vec<(u64, String)>, OperationError> {
538        // This is a more direct version of get_identry_raw adapted for the simpler
539        // quarantine setup.
540        let mut stmt = self
541            .get_conn()?
542            .prepare(&format!(
543                "SELECT id, data FROM {}.id2entry_quarantine",
544                self.get_db_name()
545            ))
546            .map_err(sqlite_error)?;
547        let id2entry_iter = stmt
548            .query_map([], |row| {
549                Ok(IdSqliteEntry {
550                    id: row.get(0)?,
551                    data: row.get(1)?,
552                })
553            })
554            .map_err(sqlite_error)?;
555        let allids = id2entry_iter
556            .map(|v| {
557                v.map_err(sqlite_error).and_then(|ise| {
558                    // Convert the idsqlite to id raw
559                    ise.try_into()
560                })
561            })
562            .collect::<Result<Vec<IdRawEntry>, _>>()?;
563
564        allids
565            .into_iter()
566            .map(|data| data.into_dbentry().map(|(id, db_e)| (id, db_e.to_string())))
567            .collect()
568    }
569
570    fn get_id2entry(&self, id: u64) -> Result<(u64, String), OperationError> {
571        let idl = IdList::Indexed(IDLBitRange::from_u64(id));
572        let mut allids = self.get_identry_raw(&idl)?;
573        allids
574            .pop()
575            .ok_or(OperationError::InvalidEntryId)
576            .and_then(|data| {
577                data.into_dbentry()
578                    .map(|(id, db_e)| (id, format!("{db_e:?}")))
579            })
580    }
581
582    fn list_index_content(
583        &self,
584        index_name: &str,
585    ) -> Result<Vec<(String, IDLBitRange)>, OperationError> {
586        // TODO: Once we have slopes we can add .exists_table, and assert
587        // it's an idx table.
588
589        let query = format!("SELECT key, idl FROM {}.{}", self.get_db_name(), index_name);
590        let mut stmt = self
591            .get_conn()?
592            .prepare(query.as_str())
593            .map_err(sqlite_error)?;
594
595        let idx_iter = stmt
596            .query_map([], |row| {
597                Ok(KeyIdl {
598                    key: row.get(0)?,
599                    data: row.get(1)?,
600                })
601            })
602            .map_err(sqlite_error)?;
603        idx_iter
604            .map(|v| {
605                v.map_err(sqlite_error).and_then(|KeyIdl { key, data }| {
606                    serde_json::from_slice(data.as_slice())
607                        .map_err(serde_json_error)
608                        .map(|idl| (key, idl))
609                })
610            })
611            .collect()
612    }
613
614    // This allow is critical as it resolves a life time issue in stmt.
615    #[allow(clippy::let_and_return)]
616    fn verify(&self) -> Vec<Result<(), ConsistencyError>> {
617        let Ok(conn) = self.get_conn() else {
618            return vec![Err(ConsistencyError::SqliteIntegrityFailure)];
619        };
620
621        let Ok(mut stmt) = conn.prepare("PRAGMA integrity_check;") else {
622            return vec![Err(ConsistencyError::SqliteIntegrityFailure)];
623        };
624
625        // Allow this as it actually extends the life of stmt
626        let r = match stmt.query([]) {
627            Ok(mut rows) => match rows.next() {
628                Ok(Some(v)) => {
629                    let r: Result<String, _> = v.get(0);
630                    match r {
631                        Ok(t) if t == "ok" => Vec::with_capacity(0),
632                        _ => vec![Err(ConsistencyError::SqliteIntegrityFailure)],
633                    }
634                }
635                _ => vec![Err(ConsistencyError::SqliteIntegrityFailure)],
636            },
637            Err(_) => vec![Err(ConsistencyError::SqliteIntegrityFailure)],
638        };
639        r
640    }
641}
642
643impl IdlSqliteTransaction for IdlSqliteReadTransaction {
644    fn get_db_name(&self) -> &str {
645        self.db_name
646    }
647
648    fn get_conn(&self) -> Result<&Connection, OperationError> {
649        self.conn
650            .as_ref()
651            .ok_or(OperationError::TransactionAlreadyCommitted)
652    }
653}
654
655impl Drop for IdlSqliteReadTransaction {
656    // Abort - so far this has proven reliable to use drop here.
657    fn drop(&mut self) {
658        let mut dropping = None;
659        std::mem::swap(&mut dropping, &mut self.conn);
660
661        if let Some(conn) = dropping {
662            #[allow(clippy::expect_used)]
663            conn.execute("ROLLBACK TRANSACTION", [])
664                .expect("Unable to rollback transaction! Can not proceed!!!");
665
666            #[allow(clippy::expect_used)]
667            self.pool
668                .lock()
669                .expect("Unable to access db pool")
670                .push_back(conn);
671        }
672    }
673}
674
675impl IdlSqliteReadTransaction {
676    pub fn new(
677        pool: ConnPool,
678        conn: Connection,
679        db_name: &'static str,
680    ) -> Result<Self, OperationError> {
681        // Start the transaction
682        //
683        // I'm happy for this to be an expect, because this is a huge failure
684        // of the server ... but if it happens a lot we should consider making
685        // this a Result<>
686        //
687        // There is no way to flag this is an RO operation.
688        conn.execute("BEGIN DEFERRED TRANSACTION", [])
689            .map_err(sqlite_error)?;
690
691        Ok(IdlSqliteReadTransaction {
692            pool,
693            conn: Some(conn),
694            db_name,
695        })
696    }
697}
698
699impl IdlSqliteTransaction for IdlSqliteWriteTransaction {
700    fn get_db_name(&self) -> &str {
701        self.db_name
702    }
703
704    fn get_conn(&self) -> Result<&Connection, OperationError> {
705        self.conn
706            .as_ref()
707            .ok_or(OperationError::TransactionAlreadyCommitted)
708    }
709}
710
711impl Drop for IdlSqliteWriteTransaction {
712    // Abort
713    fn drop(&mut self) {
714        let mut dropping = None;
715        std::mem::swap(&mut dropping, &mut self.conn);
716
717        if let Some(conn) = dropping {
718            #[allow(clippy::expect_used)]
719            conn.execute("ROLLBACK TRANSACTION", [])
720                .expect("Unable to rollback transaction! Can not proceed!!!");
721
722            #[allow(clippy::expect_used)]
723            self.pool
724                .lock()
725                .expect("Unable to access db pool")
726                .push_back(conn);
727        }
728    }
729}
730
731impl IdlSqliteWriteTransaction {
732    pub fn new(
733        pool: ConnPool,
734        conn: Connection,
735        db_name: &'static str,
736    ) -> Result<Self, OperationError> {
737        // Start the transaction
738        conn.execute("BEGIN EXCLUSIVE TRANSACTION", [])
739            .map_err(sqlite_error)?;
740        Ok(IdlSqliteWriteTransaction {
741            pool,
742            conn: Some(conn),
743            db_name,
744        })
745    }
746
747    #[instrument(level = "debug", name = "idl_sqlite::commit", skip_all)]
748    pub fn commit(mut self) -> Result<(), OperationError> {
749        debug_assert!(self.conn.is_some());
750
751        let mut dropping = None;
752        std::mem::swap(&mut dropping, &mut self.conn);
753
754        if let Some(conn) = dropping {
755            conn.execute("COMMIT TRANSACTION", [])
756                .map(|_| ())
757                .map_err(|e| {
758                    admin_error!(?e, "CRITICAL: failed to commit sqlite txn");
759                    OperationError::BackendEngine
760                })?;
761
762            self.pool
763                .lock()
764                .map_err(|err| {
765                    error!(?err, "Unable to return connection to pool");
766                    OperationError::BackendEngine
767                })?
768                .push_back(conn);
769
770            Ok(())
771        } else {
772            Err(OperationError::TransactionAlreadyCommitted)
773        }
774    }
775
776    pub fn get_id2entry_max_id(&self) -> Result<u64, OperationError> {
777        let mut stmt = self
778            .get_conn()?
779            .prepare(&format!(
780                "SELECT MAX(id) as id_max FROM {}.id2entry",
781                self.get_db_name(),
782            ))
783            .map_err(sqlite_error)?;
784        // This exists checks for if any rows WERE returned
785        // that way we know to shortcut or not.
786        let v = stmt.exists([]).map_err(sqlite_error)?;
787
788        if v {
789            // We have some rows, let get max!
790            let i: Option<i64> = stmt.query_row([], |row| row.get(0)).map_err(sqlite_error)?;
791            i.unwrap_or(0)
792                .try_into()
793                .map_err(|_| OperationError::InvalidEntryId)
794        } else {
795            // No rows are present, return a 0.
796            Ok(0)
797        }
798    }
799
800    pub fn write_identry(
801        &self,
802        entry: &Entry<EntrySealed, EntryCommitted>,
803    ) -> Result<(), OperationError> {
804        let dbe = entry.to_dbentry();
805        let data = serde_json::to_vec(&dbe).map_err(serde_json_error)?;
806
807        let raw_entries = std::iter::once(IdRawEntry {
808            id: entry.get_id(),
809            data,
810        });
811
812        self.write_identries_raw(raw_entries)
813    }
814
815    pub fn write_identries_raw<I>(&self, mut entries: I) -> Result<(), OperationError>
816    where
817        I: Iterator<Item = IdRawEntry>,
818    {
819        let mut stmt = self
820            .get_conn()?
821            .prepare(&format!(
822                "INSERT OR REPLACE INTO {}.id2entry (id, data) VALUES(:id, :data)",
823                self.get_db_name()
824            ))
825            .map_err(sqlite_error)?;
826
827        entries.try_for_each(|e| {
828            IdSqliteEntry::try_from(e).and_then(|ser_ent| {
829                stmt.execute(named_params! {
830                    ":id": &ser_ent.id,
831                    ":data": &ser_ent.data.as_slice()
832                })
833                // remove the updated usize
834                .map(|_| ())
835                .map_err(sqlite_error)
836            })
837        })
838    }
839
840    pub fn delete_identry(&self, id: u64) -> Result<(), OperationError> {
841        let mut stmt = self
842            .get_conn()?
843            .prepare(&format!(
844                "DELETE FROM {}.id2entry WHERE id = :id",
845                self.get_db_name()
846            ))
847            .map_err(sqlite_error)?;
848
849        let iid: i64 = id
850            .try_into()
851            .map_err(|_| OperationError::InvalidEntryId)
852            .and_then(|i| {
853                if i > 0 {
854                    Ok(i)
855                } else {
856                    Err(OperationError::InvalidEntryId)
857                }
858            })?;
859
860        debug_assert!(iid > 0);
861
862        stmt.execute([&iid]).map(|_| ()).map_err(sqlite_error)
863    }
864
865    pub fn write_idl(
866        &self,
867        attr: &Attribute,
868        itype: IndexType,
869        idx_key: &str,
870        idl: &IDLBitRange,
871    ) -> Result<(), OperationError> {
872        if idl.is_empty() {
873            // delete it
874            // Delete this idx_key from the table.
875            let query = format!(
876                "DELETE FROM {}.idx_{}_{} WHERE key = :key",
877                self.get_db_name(),
878                itype.as_idx_str(),
879                attr.as_str()
880            );
881
882            self.get_conn()?
883                .prepare(query.as_str())
884                .and_then(|mut stmt| stmt.execute(&[(":key", &idx_key)]))
885                .map_err(sqlite_error)
886        } else {
887            // Serialise the IdList to Vec<u8>
888            let idl_raw = serde_json::to_vec(idl).map_err(serde_json_error)?;
889
890            // update or create it.
891            let query = format!(
892                "INSERT OR REPLACE INTO {}.idx_{}_{} (key, idl) VALUES(:key, :idl)",
893                self.get_db_name(),
894                itype.as_idx_str(),
895                attr.as_str()
896            );
897
898            self.get_conn()?
899                .prepare(query.as_str())
900                .and_then(|mut stmt| {
901                    stmt.execute(named_params! {
902                        ":key": &idx_key,
903                        ":idl": &idl_raw
904                    })
905                })
906                .map_err(sqlite_error)
907        }
908        // Get rid of the sqlite rows usize
909        .map(|_| ())
910    }
911
912    pub fn create_name2uuid(&self) -> Result<(), OperationError> {
913        self.get_conn()?
914            .execute(
915                &format!("CREATE TABLE IF NOT EXISTS {}.idx_name2uuid (name TEXT PRIMARY KEY, uuid TEXT)", self.get_db_name()),
916                [],
917            )
918            .map(|_| ())
919            .map_err(sqlite_error)
920    }
921
922    pub fn write_name2uuid_add(&self, name: &str, uuid: Uuid) -> Result<(), OperationError> {
923        let uuids = uuid.as_hyphenated().to_string();
924
925        self.get_conn()?
926            .prepare(&format!(
927                "INSERT OR REPLACE INTO {}.idx_name2uuid (name, uuid) VALUES(:name, :uuid)",
928                self.get_db_name()
929            ))
930            .and_then(|mut stmt| {
931                stmt.execute(named_params! {
932                    ":name": &name,
933                    ":uuid": uuids.as_str()
934                })
935            })
936            .map(|_| ())
937            .map_err(sqlite_error)
938    }
939
940    pub fn write_name2uuid_rem(&self, name: &str) -> Result<(), OperationError> {
941        self.get_conn()?
942            .prepare(&format!(
943                "DELETE FROM {}.idx_name2uuid WHERE name = :name",
944                self.get_db_name()
945            ))
946            .and_then(|mut stmt| stmt.execute(&[(":name", &name)]))
947            .map(|_| ())
948            .map_err(sqlite_error)
949    }
950
951    pub fn create_externalid2uuid(&self) -> Result<(), OperationError> {
952        self.get_conn()?
953            .execute(
954                &format!("CREATE TABLE IF NOT EXISTS {}.idx_externalid2uuid (eid TEXT PRIMARY KEY, uuid TEXT)", self.get_db_name()),
955                [],
956            )
957            .map(|_| ())
958            .map_err(sqlite_error)
959    }
960
961    pub fn write_externalid2uuid_add(&self, name: &str, uuid: Uuid) -> Result<(), OperationError> {
962        let uuids = uuid.as_hyphenated().to_string();
963
964        self.get_conn()?
965            .prepare(&format!(
966                "INSERT OR REPLACE INTO {}.idx_externalid2uuid (eid, uuid) VALUES(:eid, :uuid)",
967                self.get_db_name()
968            ))
969            .and_then(|mut stmt| {
970                stmt.execute(named_params! {
971                    ":eid": &name,
972                    ":uuid": uuids.as_str()
973                })
974            })
975            .map(|_| ())
976            .map_err(sqlite_error)
977    }
978
979    pub fn write_externalid2uuid_rem(&self, name: &str) -> Result<(), OperationError> {
980        self.get_conn()?
981            .prepare(&format!(
982                "DELETE FROM {}.idx_externalid2uuid WHERE eid = :eid",
983                self.get_db_name()
984            ))
985            .and_then(|mut stmt| stmt.execute(&[(":eid", &name)]))
986            .map(|_| ())
987            .map_err(sqlite_error)
988    }
989
990    pub fn create_uuid2spn(&self) -> Result<(), OperationError> {
991        self.get_conn()?
992            .execute(
993                &format!(
994                    "CREATE TABLE IF NOT EXISTS {}.idx_uuid2spn (uuid TEXT PRIMARY KEY, spn BLOB)",
995                    self.get_db_name()
996                ),
997                [],
998            )
999            .map(|_| ())
1000            .map_err(sqlite_error)
1001    }
1002
1003    pub fn write_uuid2spn(&self, uuid: Uuid, k: Option<&Value>) -> Result<(), OperationError> {
1004        let uuids = uuid.as_hyphenated().to_string();
1005        match k {
1006            Some(k) => {
1007                let dbv1: DbIdentSpn = k.to_db_ident_spn();
1008                let data = serde_json::to_vec(&dbv1).map_err(serde_json_error)?;
1009                self.get_conn()?
1010                    .prepare(&format!(
1011                        "INSERT OR REPLACE INTO {}.idx_uuid2spn (uuid, spn) VALUES(:uuid, :spn)",
1012                        self.get_db_name()
1013                    ))
1014                    .and_then(|mut stmt| {
1015                        stmt.execute(named_params! {
1016                            ":uuid": &uuids,
1017                            ":spn": &data,
1018                        })
1019                    })
1020                    .map(|_| ())
1021                    .map_err(sqlite_error)
1022            }
1023            None => self
1024                .get_conn()?
1025                .prepare(&format!(
1026                    "DELETE FROM {}.idx_uuid2spn WHERE uuid = :uuid",
1027                    self.get_db_name()
1028                ))
1029                .and_then(|mut stmt| stmt.execute(&[(":uuid", &uuids)]))
1030                .map(|_| ())
1031                .map_err(sqlite_error),
1032        }
1033    }
1034
1035    pub fn create_uuid2rdn(&self) -> Result<(), OperationError> {
1036        self.get_conn()?
1037            .execute(
1038                &format!(
1039                    "CREATE TABLE IF NOT EXISTS {}.idx_uuid2rdn (uuid TEXT PRIMARY KEY, rdn TEXT)",
1040                    self.get_db_name()
1041                ),
1042                [],
1043            )
1044            .map(|_| ())
1045            .map_err(sqlite_error)
1046    }
1047
1048    pub fn write_uuid2rdn(&self, uuid: Uuid, k: Option<&String>) -> Result<(), OperationError> {
1049        let uuids = uuid.as_hyphenated().to_string();
1050        match k {
1051            Some(k) => self
1052                .get_conn()?
1053                .prepare(&format!(
1054                    "INSERT OR REPLACE INTO {}.idx_uuid2rdn (uuid, rdn) VALUES(:uuid, :rdn)",
1055                    self.get_db_name()
1056                ))
1057                .and_then(|mut stmt| stmt.execute(&[(":uuid", &uuids), (":rdn", k)]))
1058                .map(|_| ())
1059                .map_err(sqlite_error),
1060            None => self
1061                .get_conn()?
1062                .prepare(&format!(
1063                    "DELETE FROM {}.idx_uuid2rdn WHERE uuid = :uuid",
1064                    self.get_db_name()
1065                ))
1066                .and_then(|mut stmt| stmt.execute(&[(":uuid", &uuids)]))
1067                .map(|_| ())
1068                .map_err(sqlite_error),
1069        }
1070    }
1071
1072    pub(crate) fn create_keyhandles(&self) -> Result<(), OperationError> {
1073        self.get_conn()?
1074            .execute(
1075                &format!(
1076                    "CREATE TABLE IF NOT EXISTS {}.keyhandles (id TEXT PRIMARY KEY, data TEXT)",
1077                    self.get_db_name()
1078                ),
1079                [],
1080            )
1081            .map(|_| ())
1082            .map_err(sqlite_error)
1083    }
1084
1085    pub(crate) fn create_db_ruv(&self) -> Result<(), OperationError> {
1086        self.get_conn()?
1087            .execute(
1088                &format!(
1089                    "CREATE TABLE IF NOT EXISTS {}.ruv (cid TEXT PRIMARY KEY)",
1090                    self.get_db_name()
1091                ),
1092                [],
1093            )
1094            .map(|_| ())
1095            .map_err(sqlite_error)
1096    }
1097
1098    pub fn get_db_ruv(&self) -> Result<BTreeSet<Cid>, OperationError> {
1099        let mut stmt = self
1100            .get_conn()?
1101            .prepare(&format!("SELECT cid FROM {}.ruv", self.get_db_name()))
1102            .map_err(sqlite_error)?;
1103
1104        let kh_iter = stmt.query_map([], |row| row.get(0)).map_err(sqlite_error)?;
1105
1106        kh_iter
1107            .map(|v| {
1108                let ser_cid: String = v.map_err(sqlite_error)?;
1109                let db_cid: DbCidV1 = serde_json::from_str(&ser_cid).map_err(serde_json_error)?;
1110                Ok(db_cid.into())
1111            })
1112            .collect()
1113    }
1114
1115    pub fn write_db_ruv<I, J>(&mut self, mut added: I, mut removed: J) -> Result<(), OperationError>
1116    where
1117        I: Iterator<Item = Cid>,
1118        J: Iterator<Item = Cid>,
1119    {
1120        let mut stmt = self
1121            .get_conn()?
1122            .prepare(&format!(
1123                "DELETE FROM {}.ruv WHERE cid = :cid",
1124                self.get_db_name()
1125            ))
1126            .map_err(sqlite_error)?;
1127
1128        removed.try_for_each(|cid| {
1129            let db_cid: DbCidV1 = cid.into();
1130
1131            serde_json::to_string(&db_cid)
1132                .map_err(serde_json_error)
1133                .and_then(|ser_cid| {
1134                    stmt.execute(named_params! {
1135                        ":cid": &ser_cid
1136                    })
1137                    // remove the updated usize
1138                    .map(|_| ())
1139                    .map_err(sqlite_error)
1140                })
1141        })?;
1142
1143        let mut stmt = self
1144            .get_conn()?
1145            .prepare(&format!(
1146                "INSERT OR REPLACE INTO {}.ruv (cid) VALUES(:cid)",
1147                self.get_db_name()
1148            ))
1149            .map_err(sqlite_error)?;
1150
1151        added.try_for_each(|cid| {
1152            let db_cid: DbCidV1 = cid.into();
1153
1154            serde_json::to_string(&db_cid)
1155                .map_err(serde_json_error)
1156                .and_then(|ser_cid| {
1157                    stmt.execute(named_params! {
1158                        ":cid": &ser_cid
1159                    })
1160                    // remove the updated usize
1161                    .map(|_| ())
1162                    .map_err(sqlite_error)
1163                })
1164        })
1165    }
1166
1167    #[instrument(level = "debug", skip(self))]
1168    pub fn create_idx(&self, attr: &Attribute, itype: IndexType) -> Result<(), OperationError> {
1169        // Is there a better way than formatting this? I can't seem
1170        // to template into the str.
1171        //
1172        // We could also re-design our idl storage.
1173        let idx_stmt = format!(
1174            "CREATE TABLE IF NOT EXISTS {}.idx_{}_{} (key TEXT PRIMARY KEY, idl BLOB)",
1175            self.get_db_name(),
1176            itype.as_idx_str(),
1177            attr.as_str(),
1178        );
1179
1180        self.get_conn()?
1181            .execute(idx_stmt.as_str(), [])
1182            .map(|_| ())
1183            .map_err(sqlite_error)
1184    }
1185
1186    /// ⚠️  - This function will destroy all indexes in the database.
1187    ///
1188    /// It should only be called internally by the backend in limited and
1189    /// specific situations.
1190    #[instrument(level = "trace", skip_all)]
1191    pub fn danger_purge_idxs(&self) -> Result<(), OperationError> {
1192        let idx_table_list = self.list_idxs()?;
1193        trace!(tables = ?idx_table_list);
1194
1195        idx_table_list.iter().try_for_each(|idx_table| {
1196            debug!(table = ?idx_table, "removing idx_table");
1197            self.get_conn()?
1198                .prepare(format!("DROP TABLE {}.{}", self.get_db_name(), idx_table).as_str())
1199                .and_then(|mut stmt| stmt.execute([]).map(|_| ()))
1200                .map_err(sqlite_error)
1201        })
1202    }
1203
1204    pub fn store_idx_slope_analysis(
1205        &self,
1206        slopes: &HashMap<IdxKey, IdxSlope>,
1207    ) -> Result<(), OperationError> {
1208        self.get_conn()?
1209            .execute(
1210                &format!(
1211                    "CREATE TABLE IF NOT EXISTS {}.idxslope_analysis (
1212                    id TEXT PRIMARY KEY,
1213                    slope INTEGER
1214                )",
1215                    self.get_db_name()
1216                ),
1217                [],
1218            )
1219            .map(|_| ())
1220            .map_err(sqlite_error)?;
1221
1222        // Remove any data if it exists.
1223        self.get_conn()?
1224            .execute(
1225                &format!("DELETE FROM {}.idxslope_analysis", self.get_db_name()),
1226                [],
1227            )
1228            .map(|_| ())
1229            .map_err(sqlite_error)?;
1230
1231        slopes.iter().try_for_each(|(k, v)| {
1232            let key = format!("idx_{}_{}", k.itype.as_idx_str(), k.attr);
1233            self.get_conn()?
1234                .execute(
1235                    &format!("INSERT OR REPLACE INTO {}.idxslope_analysis (id, slope) VALUES(:id, :slope)", self.get_db_name()),
1236                    named_params! {
1237                        ":id": &key,
1238                        ":slope": &v,
1239                    },
1240                )
1241                .map(|_| ())
1242                .map_err(|e| {
1243                    admin_error!(immediate = true, ?e, "CRITICAL: rusqlite error in store_idx_slope_analysis");
1244                    eprintln!("CRITICAL: rusqlite error in store_idx_slope_analysis: {e:?}");
1245                    OperationError::SqliteError
1246                })
1247        })
1248    }
1249
1250    pub fn is_idx_slopeyness_generated(&self) -> Result<bool, OperationError> {
1251        self.exists_table("idxslope_analysis")
1252    }
1253
1254    pub fn get_idx_slope(&self, ikey: &IdxKey) -> Result<Option<IdxSlope>, OperationError> {
1255        let analysis_exists = self.exists_table("idxslope_analysis")?;
1256        if !analysis_exists {
1257            return Ok(None);
1258        }
1259
1260        // Then we have the table and it should have things in it, lets put
1261        // it all together.
1262        let key = format!("idx_{}_{}", ikey.itype.as_idx_str(), ikey.attr);
1263
1264        let mut stmt = self
1265            .get_conn()?
1266            .prepare(&format!(
1267                "SELECT slope FROM {}.idxslope_analysis WHERE id = :id",
1268                self.get_db_name()
1269            ))
1270            .map_err(sqlite_error)?;
1271
1272        let slope: Option<IdxSlope> = stmt
1273            .query_row(&[(":id", &key)], |row| row.get(0))
1274            // We don't mind if it doesn't exist
1275            .optional()
1276            .map_err(sqlite_error)?;
1277        trace!(name = %key, ?slope, "Got slope for index");
1278
1279        Ok(slope)
1280    }
1281
1282    pub fn quarantine_entry(&self, id: u64) -> Result<(), OperationError> {
1283        let iid = i64::try_from(id).map_err(|_| OperationError::InvalidEntryId)?;
1284
1285        let id_sqlite_entry = self
1286            .get_conn()?
1287            .query_row(
1288                &format!(
1289                    "DELETE FROM {}.id2entry WHERE id = :idl RETURNING id, data",
1290                    self.get_db_name()
1291                ),
1292                [&iid],
1293                |row| {
1294                    Ok(IdSqliteEntry {
1295                        id: row.get(0)?,
1296                        data: row.get(1)?,
1297                    })
1298                },
1299            )
1300            .map_err(sqlite_error)?;
1301
1302        trace!(?id_sqlite_entry);
1303
1304        self.get_conn()?
1305            .execute(
1306                &format!(
1307                    "INSERT OR REPLACE INTO {}.id2entry_quarantine VALUES(:id, :data)",
1308                    self.get_db_name()
1309                ),
1310                named_params! {
1311                    ":id": &id_sqlite_entry.id,
1312                    ":data": &id_sqlite_entry.data.as_slice()
1313                },
1314            )
1315            .map_err(sqlite_error)
1316            .map(|_| ())
1317    }
1318
1319    pub fn restore_quarantined(&self, id: u64) -> Result<(), OperationError> {
1320        let iid = i64::try_from(id).map_err(|_| OperationError::InvalidEntryId)?;
1321
1322        let id_sqlite_entry = self
1323            .get_conn()?
1324            .query_row(
1325                &format!(
1326                    "DELETE FROM {}.id2entry_quarantine WHERE id = :idl RETURNING id, data",
1327                    self.get_db_name()
1328                ),
1329                [&iid],
1330                |row| {
1331                    Ok(IdSqliteEntry {
1332                        id: row.get(0)?,
1333                        data: row.get(1)?,
1334                    })
1335                },
1336            )
1337            .map_err(sqlite_error)?;
1338
1339        trace!(?id_sqlite_entry);
1340
1341        self.get_conn()?
1342            .execute(
1343                &format!(
1344                    "INSERT INTO {}.id2entry VALUES(:id, :data)",
1345                    self.get_db_name()
1346                ),
1347                named_params! {
1348                    ":id": &id_sqlite_entry.id,
1349                    ":data": &id_sqlite_entry.data.as_slice()
1350                },
1351            )
1352            .map_err(sqlite_error)
1353            .map(|_| ())
1354    }
1355
1356    /// ⚠️  - This function will destroy all entries in the database.
1357    ///
1358    /// It should only be called internally by the backend in limited and
1359    /// specific situations.
1360    #[instrument(level = "trace", skip_all)]
1361    pub fn danger_purge_id2entry(&self) -> Result<(), OperationError> {
1362        self.get_conn()?
1363            .execute(&format!("DELETE FROM {}.id2entry", self.get_db_name()), [])
1364            .map(|_| ())
1365            .map_err(sqlite_error)
1366    }
1367
1368    pub fn write_db_s_uuid(&self, nsid: Uuid) -> Result<(), OperationError> {
1369        let data = serde_json::to_vec(&nsid).map_err(|e| {
1370            admin_error!(immediate = true, ?e, "CRITICAL: Serde JSON Error");
1371            eprintln!("CRITICAL: Serde JSON Error -> {e:?}");
1372            OperationError::SerdeJsonError
1373        })?;
1374
1375        self.get_conn()?
1376            .execute(
1377                &format!(
1378                    "INSERT OR REPLACE INTO {}.db_sid (id, data) VALUES(:id, :sid)",
1379                    self.get_db_name()
1380                ),
1381                named_params! {
1382                    ":id": &2,
1383                    ":sid": &data,
1384                },
1385            )
1386            .map(|_| ())
1387            .map_err(|e| {
1388                admin_error!(
1389                    immediate = true,
1390                    ?e,
1391                    "CRITICAL: rusqlite error in write_db_s_uuid"
1392                );
1393                eprintln!("CRITICAL: rusqlite error in write_db_s_uuid {e:?}");
1394                OperationError::SqliteError
1395            })
1396    }
1397
1398    pub fn write_db_d_uuid(&self, nsid: Uuid) -> Result<(), OperationError> {
1399        let data = serde_json::to_vec(&nsid).map_err(|e| {
1400            admin_error!(
1401                immediate = true,
1402                ?e,
1403                "CRITICAL: Serde JSON Error in write_db_d_uuid"
1404            );
1405            eprintln!("CRITICAL: Serde JSON Error  in write_db_d_uuid-> {e:?}");
1406            OperationError::SerdeJsonError
1407        })?;
1408
1409        self.get_conn()?
1410            .execute(
1411                &format!(
1412                    "INSERT OR REPLACE INTO {}.db_did (id, data) VALUES(:id, :did)",
1413                    self.get_db_name()
1414                ),
1415                named_params! {
1416                    ":id": &2,
1417                    ":did": &data,
1418                },
1419            )
1420            .map(|_| ())
1421            .map_err(|e| {
1422                admin_error!(
1423                    immediate = true,
1424                    ?e,
1425                    "CRITICAL: rusqlite error in write_db_d_uuid"
1426                );
1427                eprintln!("CRITICAL: rusqlite error in write_db_d_uuid {e:?}");
1428                OperationError::SqliteError
1429            })
1430    }
1431
1432    pub fn set_db_ts_max(&self, ts: Duration) -> Result<(), OperationError> {
1433        let data = serde_json::to_vec(&ts).map_err(|e| {
1434            admin_error!(
1435                immediate = true,
1436                ?e,
1437                "CRITICAL: Serde JSON Error in set_db_ts_max"
1438            );
1439            eprintln!("CRITICAL: Serde JSON Error in set_db_ts_max -> {e:?}");
1440            OperationError::SerdeJsonError
1441        })?;
1442
1443        self.get_conn()?
1444            .execute(
1445                &format!(
1446                    "INSERT OR REPLACE INTO {}.db_op_ts (id, data) VALUES(:id, :did)",
1447                    self.get_db_name()
1448                ),
1449                named_params! {
1450                    ":id": &1,
1451                    ":did": &data,
1452                },
1453            )
1454            .map(|_| ())
1455            .map_err(|e| {
1456                admin_error!(
1457                    immediate = true,
1458                    ?e,
1459                    "CRITICAL: rusqlite error in set_db_ts_max"
1460                );
1461                eprintln!("CRITICAL: rusqlite error in set_db_ts_max {e:?}");
1462                OperationError::SqliteError
1463            })
1464    }
1465
1466    // ===== inner helpers =====
1467    // Some of these are not self due to use in new()
1468    fn get_db_version_key(&self, key: &str) -> Result<i64, OperationError> {
1469        self.get_conn().map(|conn| {
1470            conn.query_row(
1471                &format!(
1472                    "SELECT version FROM {}.db_version WHERE id = :id",
1473                    self.get_db_name()
1474                ),
1475                &[(":id", &key)],
1476                |row| row.get(0),
1477            )
1478            .unwrap_or({
1479                // The value is missing, default to 0.
1480                0
1481            })
1482        })
1483    }
1484
1485    fn set_db_version_key(&self, key: &str, v: i64) -> Result<(), OperationError> {
1486        self.get_conn()?
1487            .execute(
1488                &format!(
1489                    "INSERT OR REPLACE INTO {}.db_version (id, version) VALUES(:id, :dbv_id2entry)",
1490                    self.get_db_name()
1491                ),
1492                named_params! {
1493                    ":id": &key,
1494                    ":dbv_id2entry": v,
1495                },
1496            )
1497            .map(|_| ())
1498            .map_err(|e| {
1499                admin_error!(
1500                    immediate = true,
1501                    ?e,
1502                    "CRITICAL: rusqlite error in set_db_version_key"
1503                );
1504                eprintln!("CRITICAL: rusqlite error in set_db_version_key {e:?}");
1505                OperationError::SqliteError
1506            })
1507    }
1508
1509    pub(crate) fn get_db_index_version(&self) -> Result<i64, OperationError> {
1510        self.get_db_version_key(DBV_INDEXV)
1511    }
1512
1513    pub(crate) fn set_db_index_version(&self, v: i64) -> Result<(), OperationError> {
1514        self.set_db_version_key(DBV_INDEXV, v)
1515    }
1516
1517    pub fn setup(&self) -> Result<(), OperationError> {
1518        // If the db_name is NOT main, we MAY need to create it as we are in
1519        // a test!
1520        trace!(db_name = %self.get_db_name(), "setup");
1521        if self.get_db_name() != "main" {
1522            warn!("Using non-default db-name - this database content WILL be lost!");
1523            // we need to attach the DB!
1524            self.get_conn()?
1525                .execute(&format!("ATTACH DATABASE '' AS {}", self.get_db_name()), [])
1526                .map_err(sqlite_error)?;
1527        };
1528
1529        // This stores versions of components. For example:
1530        // ----------------------
1531        // | id       | version |
1532        // | id2entry | 1       |
1533        // | index    | 1       |
1534        // | schema   | 1       |
1535        // ----------------------
1536        //
1537        // This allows each component to initialise on its own, be
1538        // rolled back individually, by upgraded in isolation, and more
1539        //
1540        // NEVER CHANGE THIS DEFINITION.
1541        self.get_conn()?
1542            .execute(
1543                &format!(
1544                    "CREATE TABLE IF NOT EXISTS {}.db_version (
1545                    id TEXT PRIMARY KEY,
1546                    version INTEGER
1547                )
1548                ",
1549                    self.get_db_name()
1550                ),
1551                [],
1552            )
1553            .map_err(sqlite_error)?;
1554
1555        // If the table is empty, populate the versions as 0.
1556        let mut dbv_id2entry = self.get_db_version_key(DBV_ID2ENTRY)?;
1557
1558        trace!(%dbv_id2entry);
1559
1560        if dbv_id2entry != 0 && dbv_id2entry < 9 {
1561            error!(
1562                ?dbv_id2entry,
1563                "Unable to perform database migrations. This instance is too old."
1564            );
1565            return Err(OperationError::DB0004DatabaseTooOld);
1566        }
1567
1568        // Check db_version here.
1569        //   * if 0 -> create v1.
1570        if dbv_id2entry == 0 {
1571            self.get_conn()?
1572                .execute(
1573                    &format!(
1574                        "CREATE TABLE IF NOT EXISTS {}.id2entry (
1575                        id INTEGER PRIMARY KEY ASC,
1576                        data BLOB NOT NULL
1577                    )
1578                    ",
1579                        self.get_db_name()
1580                    ),
1581                    [],
1582                )
1583                .map_err(sqlite_error)?;
1584
1585            self.get_conn()?
1586                .execute(
1587                    &format!(
1588                        "CREATE TABLE IF NOT EXISTS {}.db_sid (
1589                    id INTEGER PRIMARY KEY ASC,
1590                    data BLOB NOT NULL
1591                    )
1592                    ",
1593                        self.get_db_name()
1594                    ),
1595                    [],
1596                )
1597                .map_err(sqlite_error)?;
1598
1599            dbv_id2entry = 1;
1600
1601            info!(entry = %dbv_id2entry, "dbv_id2entry migrated (id2entry, db_sid)");
1602        }
1603        //   * if v1 -> add the domain uuid table
1604        if dbv_id2entry == 1 {
1605            self.get_conn()?
1606                .execute(
1607                    &format!(
1608                        "CREATE TABLE IF NOT EXISTS {}.db_did (
1609                        id INTEGER PRIMARY KEY ASC,
1610                        data BLOB NOT NULL
1611                    )
1612                    ",
1613                        self.get_db_name()
1614                    ),
1615                    [],
1616                )
1617                .map_err(sqlite_error)?;
1618
1619            dbv_id2entry = 2;
1620            info!(entry = %dbv_id2entry, "dbv_id2entry migrated (db_did)");
1621        }
1622        //   * if v2 -> add the op max ts table.
1623        if dbv_id2entry == 2 {
1624            self.get_conn()?
1625                .execute(
1626                    &format!(
1627                        "CREATE TABLE IF NOT EXISTS {}.db_op_ts (
1628                        id INTEGER PRIMARY KEY ASC,
1629                        data BLOB NOT NULL
1630                    )
1631                    ",
1632                        self.get_db_name()
1633                    ),
1634                    [],
1635                )
1636                .map_err(sqlite_error)?;
1637            dbv_id2entry = 3;
1638            info!(entry = %dbv_id2entry, "dbv_id2entry migrated (db_op_ts)");
1639        }
1640        //   * if v3 -> create name2uuid, uuid2spn, uuid2rdn.
1641        if dbv_id2entry == 3 {
1642            self.create_name2uuid()
1643                .and_then(|_| self.create_uuid2spn())
1644                .and_then(|_| self.create_uuid2rdn())?;
1645            dbv_id2entry = 4;
1646            info!(entry = %dbv_id2entry, "dbv_id2entry migrated (name2uuid, uuid2spn, uuid2rdn)");
1647        }
1648        //   * if v4 -> migrate v1 to v2 entries.
1649        if dbv_id2entry == 4 {
1650            dbv_id2entry = 5;
1651            info!(entry = %dbv_id2entry, "dbv_id2entry migrated (dbentryv1 -> dbentryv2)");
1652        }
1653        //   * if v5 -> create externalid2uuid
1654        if dbv_id2entry == 5 {
1655            self.create_externalid2uuid()?;
1656            dbv_id2entry = 6;
1657            info!(entry = %dbv_id2entry, "dbv_id2entry migrated (externalid2uuid)");
1658        }
1659        //   * if v6 -> create id2entry_quarantine.
1660        if dbv_id2entry == 6 {
1661            self.get_conn()?
1662                .execute(
1663                    &format!(
1664                        "CREATE TABLE IF NOT EXISTS {}.id2entry_quarantine (
1665                        id INTEGER PRIMARY KEY ASC,
1666                        data BLOB NOT NULL
1667                    )
1668                    ",
1669                        self.get_db_name()
1670                    ),
1671                    [],
1672                )
1673                .map_err(sqlite_error)?;
1674
1675            dbv_id2entry = 7;
1676            info!(entry = %dbv_id2entry, "dbv_id2entry migrated (quarantine)");
1677        }
1678        //   * if v7 -> create keyhandles storage.
1679        if dbv_id2entry == 7 {
1680            self.create_keyhandles()?;
1681            dbv_id2entry = 8;
1682            info!(entry = %dbv_id2entry, "dbv_id2entry migrated (keyhandles)");
1683        }
1684        //   * if v8 -> migrate all entries to have a change state
1685        if dbv_id2entry == 8 {
1686            dbv_id2entry = 9;
1687            info!(entry = %dbv_id2entry, "dbv_id2entry migrated (dbentryv2 -> dbentryv3)");
1688        }
1689        //   * if v9 -> complete
1690        if dbv_id2entry == 9 {
1691            self.create_db_ruv()?;
1692            dbv_id2entry = 10;
1693            info!(entry = %dbv_id2entry, "dbv_id2entry migrated (db_ruv)");
1694        }
1695        //   * if v10 -> complete
1696
1697        self.set_db_version_key(DBV_ID2ENTRY, dbv_id2entry)?;
1698
1699        // NOTE: Indexing is configured in a different step!
1700        // Indexing uses a db version flag to represent the version
1701        // of the indexes representation on disk in case we change
1702        // it.
1703        Ok(())
1704    }
1705}
1706
1707impl IdlSqlite {
1708    pub fn new(cfg: &BackendConfig, vacuum: bool) -> Result<Self, OperationError> {
1709        if cfg.path.as_os_str().is_empty() {
1710            debug_assert_eq!(cfg.pool_size, 1);
1711        }
1712        // If provided, set the page size to match the tuning we want. By default we use 4096. The VACUUM
1713        // immediately after is so that on db create the page size takes effect.
1714        //
1715        // Enable WAL mode, which is just faster and better for our needs.
1716        let mut flags = OpenFlags::default();
1717        // Open with multi thread flags and locking options.
1718
1719        if cfg!(test) {
1720            flags.insert(OpenFlags::SQLITE_OPEN_NO_MUTEX);
1721        };
1722
1723        let fs_page_size = cfg.fstype as u32;
1724        // sqlite caches based on pages, so we calc based on page size to achieve our target which
1725        // is 32MB (constrst the SQLite default of 2MB)
1726        let cache_pages = 33554432 / fs_page_size;
1727        let checkpoint_pages = cfg.fstype.checkpoint_pages();
1728
1729        // Initial setup routines.
1730        {
1731            let vconn = Connection::open_with_flags(&cfg.path, flags).map_err(sqlite_error)?;
1732
1733            vconn
1734                .execute_batch(
1735                    format!(
1736                        "PRAGMA page_size={fs_page_size};
1737                         PRAGMA cache_size={cache_pages};
1738                         PRAGMA journal_mode=WAL;
1739                         PRAGMA wal_autocheckpoint={checkpoint_pages};
1740                         PRAGMA wal_checkpoint(RESTART);"
1741                    )
1742                    .as_str(),
1743                )
1744                .map_err(sqlite_error)?;
1745        }
1746
1747        // We need to run vacuum in the setup else we hit sqlite lock conditions.
1748        if vacuum {
1749            admin_warn!(
1750                immediate = true,
1751                "NOTICE: A db vacuum has been requested. This may take a long time ..."
1752            );
1753            /*
1754            limmediate_warning!(
1755                audit,
1756                "NOTICE: A db vacuum has been requested. This may take a long time ...\n"
1757            );
1758            */
1759
1760            let vconn = Connection::open_with_flags(&cfg.path, flags).map_err(sqlite_error)?;
1761
1762            vconn
1763                .execute_batch("PRAGMA wal_checkpoint(TRUNCATE);")
1764                .map_err(|e| {
1765                    admin_error!(?e, "rusqlite wal_checkpoint error");
1766                    OperationError::SqliteError
1767                })?;
1768
1769            vconn
1770                .pragma_update(None, "journal_mode", "DELETE")
1771                .map_err(|e| {
1772                    admin_error!(?e, "rusqlite journal_mode update error");
1773                    OperationError::SqliteError
1774                })?;
1775
1776            vconn.close().map_err(|e| {
1777                admin_error!(?e, "rusqlite db close error");
1778                OperationError::SqliteError
1779            })?;
1780
1781            let vconn = Connection::open_with_flags(&cfg.path, flags).map_err(sqlite_error)?;
1782
1783            vconn
1784                .pragma_update(None, "page_size", cfg.fstype as u32)
1785                .map_err(|e| {
1786                    admin_error!(?e, "rusqlite page_size update error");
1787                    OperationError::SqliteError
1788                })?;
1789
1790            vconn.execute_batch("VACUUM").map_err(|e| {
1791                admin_error!(?e, "rusqlite vacuum error");
1792                OperationError::SqliteError
1793            })?;
1794
1795            vconn
1796                .pragma_update(None, "journal_mode", "WAL")
1797                .map_err(|e| {
1798                    admin_error!(?e, "rusqlite journal_mode update error");
1799                    OperationError::SqliteError
1800                })?;
1801
1802            vconn.close().map_err(|e| {
1803                admin_error!(?e, "rusqlite db close error");
1804                OperationError::SqliteError
1805            })?;
1806
1807            admin_warn!(immediate = true, "NOTICE: db vacuum complete");
1808            // limmediate_warning!(audit, "NOTICE: db vacuum complete\n");
1809        };
1810
1811        let pool = (0..cfg.pool_size)
1812            .map(|i| {
1813                trace!("Opening Connection {}", i);
1814                let conn =
1815                    Connection::open_with_flags(&cfg.path, flags).map_err(sqlite_error);
1816                match conn {
1817                    Ok(conn) => {
1818                        // We need to set the cachesize at this point as well.
1819                        conn
1820                            .execute_batch(
1821                                format!(
1822                                    "PRAGMA cache_size={cache_pages};"
1823                                )
1824                                .as_str(),
1825                            )
1826                            .map_err(sqlite_error)?;
1827
1828
1829                        // load the rusqlite vtab module to allow for virtual tables
1830                        rusqlite::vtab::array::load_module(&conn).map_err(|e| {
1831                            admin_error!(
1832                                "Failed to load rarray virtual module for sqlite, cannot start! {:?}", e
1833                            );
1834                            sqlite_error(e)
1835                        })?;
1836                        Ok(conn)
1837                    }
1838                    Err(err) => {
1839                        admin_error!(
1840                            "Failed to start database connection, cannot start! {:?}",
1841                            err
1842                        );
1843                        Err(err)
1844                    }
1845                }
1846            })
1847            .collect::<Result<VecDeque<Connection>, OperationError>>()
1848            .map_err(|e| {
1849                error!(err = ?e, "Failed to build connection pool");
1850                e
1851            })?;
1852
1853        let pool = Arc::new(Mutex::new(pool));
1854
1855        Ok(IdlSqlite {
1856            pool,
1857            db_name: cfg.db_name,
1858        })
1859    }
1860
1861    pub(crate) fn get_allids_count(&self) -> Result<u64, OperationError> {
1862        let guard = self.pool.lock().map_err(|err| {
1863            error!(?err, "Unable to access connection to pool");
1864            OperationError::BackendEngine
1865        })?;
1866        // Get not pop here
1867        let conn = guard.front().ok_or_else(|| {
1868            error!("Unable to retrieve connection from pool");
1869            OperationError::BackendEngine
1870        })?;
1871
1872        conn.query_row("select count(id) from id2entry", [], |row| row.get(0))
1873            .map_err(sqlite_error)
1874    }
1875
1876    pub fn read(&self) -> Result<IdlSqliteReadTransaction, OperationError> {
1877        // This can't fail because we should only get here if a pool conn is available.
1878        let mut guard = self.pool.lock().map_err(|e| {
1879            error!(err = ?e, "Unable to lock connection pool.");
1880            OperationError::BackendEngine
1881        })?;
1882
1883        let conn = guard.pop_front().ok_or_else(|| {
1884            error!("Unable to retrieve connection from pool.");
1885            OperationError::BackendEngine
1886        })?;
1887
1888        IdlSqliteReadTransaction::new(self.pool.clone(), conn, self.db_name)
1889    }
1890
1891    pub fn write(&self) -> Result<IdlSqliteWriteTransaction, OperationError> {
1892        // This can't fail because we should only get here if a pool conn is available.
1893        let mut guard = self.pool.lock().map_err(|e| {
1894            error!(err = ?e, "Unable to lock connection pool.");
1895            OperationError::BackendEngine
1896        })?;
1897
1898        let conn = guard.pop_front().ok_or_else(|| {
1899            error!("Unable to retrieve connection from pool.");
1900            OperationError::BackendEngine
1901        })?;
1902
1903        IdlSqliteWriteTransaction::new(self.pool.clone(), conn, self.db_name)
1904    }
1905}
1906
#[cfg(test)]
mod tests {
    use crate::be::idl_sqlite::{IdlSqlite, IdlSqliteTransaction};
    use crate::be::BackendConfig;

    /// A backend built from the test configuration must report nothing from `verify()`
    /// when checked through a write transaction.
    #[test]
    fn test_idl_sqlite_verify() {
        sketching::test_init();

        let config = BackendConfig::new_test("main");
        let idl = IdlSqlite::new(&config, false).unwrap();
        let write_txn = idl.write().unwrap();

        // The collection returned by verify() must be empty.
        assert!(write_txn.verify().is_empty());
    }
}