1use super::keystorage::{KeyHandle, KeyHandleId};
2use crate::be::dbentry::DbIdentSpn;
3use crate::be::dbvalue::DbCidV1;
4use crate::be::{BackendConfig, IdList, IdRawEntry, IdxKey, IdxSlope};
5use crate::entry::{Entry, EntryCommitted, EntrySealed};
6use crate::prelude::*;
7use crate::value::{IndexType, Value};
8use hashbrown::HashMap;
9use idlset::v2::IDLBitRange;
10use kanidm_proto::internal::{ConsistencyError, OperationError};
11use rusqlite::vtab::array::Array;
12use rusqlite::{Connection, OpenFlags, OptionalExtension};
13use std::collections::{BTreeMap, BTreeSet, VecDeque};
14use std::convert::{TryFrom, TryInto};
15use std::sync::Arc;
16use std::sync::Mutex;
17use std::time::Duration;
18use uuid::Uuid;
19
// String identifiers for the two versioned parts of this backend.
// NOTE(review): presumably used as keys when reading/writing stored schema
// versions further down this file — confirm against the callers.
const DBV_ID2ENTRY: &str = "id2entry";
const DBV_INDEXV: &str = "indexv";
22
/// Log a rusqlite failure and collapse it into [`OperationError::SqliteError`].
///
/// The error is taken by value so the function can be passed directly to
/// `map_err`; the original error detail survives only in the log line.
#[allow(clippy::needless_pass_by_value)]
pub(super) fn sqlite_error(e: rusqlite::Error) -> OperationError {
    admin_error!(?e, "SQLite Error");
    OperationError::SqliteError
}
28
/// Log a serde_json failure and collapse it into
/// [`OperationError::SerdeJsonError`].
///
/// The error is taken by value so the function can be passed directly to
/// `map_err`; the original error detail survives only in the log line.
#[allow(clippy::needless_pass_by_value)]
pub(super) fn serde_json_error(e: serde_json::Error) -> OperationError {
    admin_error!(?e, "Serde JSON Error");
    OperationError::SerdeJsonError
}
34
/// Shared, mutex-guarded queue of SQLite connections. Transactions in this
/// file push their connection back onto it when they are dropped or committed.
type ConnPool = Arc<Mutex<VecDeque<Connection>>>;
36
/// One `(id, data)` row exactly as it is stored in SQLite.
#[derive(Debug)]
pub struct IdSqliteEntry {
    // Row id as SQLite holds it (signed); converted to/from the backend id
    // type by the `TryFrom` impls below.
    id: i64,
    // Opaque serialised entry bytes.
    data: Vec<u8>,
}
42
/// One `(key, idl)` row read from an index table; `data` is the still-encoded
/// id list.
#[derive(Debug)]
struct KeyIdl {
    key: String,
    data: Vec<u8>,
}
48
49impl TryFrom<IdSqliteEntry> for IdRawEntry {
50 type Error = OperationError;
51
52 fn try_from(value: IdSqliteEntry) -> Result<Self, Self::Error> {
53 if value.id <= 0 {
54 return Err(OperationError::InvalidEntryId);
55 }
56 Ok(IdRawEntry {
57 id: value
58 .id
59 .try_into()
60 .map_err(|_| OperationError::InvalidEntryId)?,
61 data: value.data,
62 })
63 }
64}
65
66impl TryFrom<IdRawEntry> for IdSqliteEntry {
67 type Error = OperationError;
68
69 fn try_from(value: IdRawEntry) -> Result<Self, Self::Error> {
70 if value.id == 0 {
71 return Err(OperationError::InvalidEntryId);
72 }
73 Ok(IdSqliteEntry {
74 id: value
75 .id
76 .try_into()
77 .map_err(|_| OperationError::InvalidEntryId)?,
78 data: value.data,
79 })
80 }
81}
82
/// Handle onto the SQLite-backed store: a shared connection pool plus the
/// schema name used to qualify table names.
#[derive(Clone)]
pub struct IdlSqlite {
    pool: ConnPool,
    db_name: &'static str,
}
88
/// A read transaction over one pooled connection.
pub struct IdlSqliteReadTransaction {
    // Pool the connection is handed back to when the transaction is dropped.
    pool: ConnPool,
    // `Some` while the transaction owns its connection; taken out on drop.
    conn: Option<Connection>,
    // Schema name interpolated in front of every table name.
    db_name: &'static str,
}
94
/// A write transaction over one pooled connection.
pub struct IdlSqliteWriteTransaction {
    // Pool the connection is handed back to on commit or drop.
    pool: ConnPool,
    // `Some` while the transaction owns its connection; taken out by commit
    // or drop.
    conn: Option<Connection>,
    // Schema name interpolated in front of every table name.
    db_name: &'static str,
}
100
101pub(crate) trait IdlSqliteTransaction {
    /// Schema name that the provided methods put in front of every table name
    /// (`{db_name}.table`).
    fn get_db_name(&self) -> &str;
103
    /// Borrow the connection this transaction runs on, or return an error when
    /// the transaction no longer holds one.
    fn get_conn(&self) -> Result<&Connection, OperationError>;
105
106 fn get_identry(&self, idl: &IdList) -> Result<Vec<Arc<EntrySealedCommitted>>, OperationError> {
107 self.get_identry_raw(idl)?
108 .into_iter()
109 .map(|ide| ide.into_entry().map(Arc::new))
110 .collect()
111 }
112
113 fn get_identry_raw(&self, idl: &IdList) -> Result<Vec<IdRawEntry>, OperationError> {
114 match idl {
116 IdList::AllIds => {
117 let mut stmt = self
118 .get_conn()?
119 .prepare(&format!(
120 "SELECT id, data FROM {}.id2entry",
121 self.get_db_name()
122 ))
123 .map_err(sqlite_error)?;
124 let id2entry_iter = stmt
125 .query_map([], |row| {
126 Ok(IdSqliteEntry {
127 id: row.get(0)?,
128 data: row.get(1)?,
129 })
130 })
131 .map_err(sqlite_error)?;
132 id2entry_iter
133 .map(|v| {
134 v.map_err(sqlite_error).and_then(|ise| {
135 ise.try_into()
137 })
138 })
139 .collect()
140 }
141 IdList::Partial(idli) | IdList::PartialThreshold(idli) | IdList::Indexed(idli) => {
142 let mut stmt = self
143 .get_conn()?
144 .prepare(&format!(
145 "SELECT id, data FROM {}.id2entry
146 WHERE id IN rarray(:idli)",
147 self.get_db_name()
148 ))
149 .map_err(sqlite_error)?;
150
151 let mut id_list: Vec<i64> = vec![];
153 for id in idli {
154 id_list.push(i64::try_from(id).map_err(|_| OperationError::InvalidEntryId)?);
155 }
156 let id_list: Array = std::rc::Rc::new(
158 id_list
159 .into_iter()
160 .map(rusqlite::types::Value::from)
161 .collect::<Vec<rusqlite::types::Value>>(),
162 );
163
164 let mut results: Vec<IdRawEntry> = vec![];
165
166 let rows = stmt.query_map(named_params! {":idli": &id_list}, |row| {
167 Ok(IdSqliteEntry {
168 id: row.get(0)?,
169 data: row.get(1)?,
170 })
171 });
172 let rows = match rows {
173 Ok(rows) => rows,
174 Err(e) => {
175 error!("query failed in get_identry_raw: {:?}", e);
176 return Err(OperationError::SqliteError);
177 }
178 };
179
180 for row in rows {
181 match row {
182 Ok(ise) => {
183 results.push(ise.try_into()?);
185 }
186 Err(e) => {
188 admin_error!(?e, "SQLite Error in get_identry_raw");
189 return Err(OperationError::SqliteError);
190 }
191 }
192 }
193 Ok(results)
194 }
195 }
196 }
197
198 fn exists_table(&self, tname: &str) -> Result<bool, OperationError> {
199 let mut stmt = self
200 .get_conn()?
201 .prepare(&format!(
202 "SELECT rowid from {}.sqlite_master where type=\"table\" AND name = :tname LIMIT 1",
203 self.get_db_name()
204 ))
205 .map_err(sqlite_error)?;
206
207 let i: Option<i64> = stmt
208 .query_row(&[(":tname", tname)], |row| row.get(0))
209 .optional()
211 .map_err(sqlite_error)?;
212
213 match i {
214 None | Some(0) => Ok(false),
215 _ => Ok(true),
216 }
217 }
218
219 fn exists_idx(&self, attr: &Attribute, itype: IndexType) -> Result<bool, OperationError> {
220 let tname = format!("idx_{}_{}", itype.as_idx_str(), attr.as_str());
221 self.exists_table(&tname)
222 }
223
224 #[instrument(level = "trace", skip_all)]
225 fn get_idl(
226 &self,
227 attr: &Attribute,
228 itype: IndexType,
229 idx_key: &str,
230 ) -> Result<Option<IDLBitRange>, OperationError> {
231 if !(self.exists_idx(attr, itype)?) {
232 debug!(
233 "IdlSqliteTransaction: Index {:?} {:?} not found",
234 itype, attr
235 );
236 return Ok(None);
237 }
238 let query = format!(
241 "SELECT idl FROM {}.idx_{}_{} WHERE key = :idx_key",
242 self.get_db_name(),
243 itype.as_idx_str(),
244 attr.as_str()
245 );
246 let mut stmt = self.get_conn()?.prepare(&query).map_err(sqlite_error)?;
247 let idl_raw: Option<Vec<u8>> = stmt
248 .query_row(&[(":idx_key", &idx_key)], |row| row.get(0))
249 .optional()
251 .map_err(sqlite_error)?;
252
253 let idl = match idl_raw {
254 Some(d) => serde_json::from_slice(d.as_slice()).map_err(serde_json_error)?,
255 None => IDLBitRange::new(),
258 };
259 trace!(
260 miss_index = ?itype,
261 attr = ?attr,
262 idl = %idl,
263 );
264
265 Ok(Some(idl))
266 }
267
268 fn name2uuid(&mut self, name: &str) -> Result<Option<Uuid>, OperationError> {
269 let mut stmt = self
271 .get_conn()?
272 .prepare(&format!(
273 "SELECT uuid FROM {}.idx_name2uuid WHERE name = :name",
274 self.get_db_name()
275 ))
276 .map_err(sqlite_error)?;
277 let uuid_raw: Option<String> = stmt
278 .query_row(&[(":name", &name)], |row| row.get(0))
279 .optional()
281 .map_err(sqlite_error)?;
282
283 let uuid = uuid_raw.as_ref().and_then(|u| Uuid::parse_str(u).ok());
284
285 Ok(uuid)
286 }
287
288 fn externalid2uuid(&mut self, name: &str) -> Result<Option<Uuid>, OperationError> {
289 let mut stmt = self
291 .get_conn()?
292 .prepare(&format!(
293 "SELECT uuid FROM {}.idx_externalid2uuid WHERE eid = :eid",
294 self.get_db_name()
295 ))
296 .map_err(sqlite_error)?;
297 let uuid_raw: Option<String> = stmt
298 .query_row(&[(":eid", &name)], |row| row.get(0))
299 .optional()
301 .map_err(sqlite_error)?;
302
303 let uuid = uuid_raw.as_ref().and_then(|u| Uuid::parse_str(u).ok());
304
305 Ok(uuid)
306 }
307
308 fn uuid2spn(&mut self, uuid: Uuid) -> Result<Option<Value>, OperationError> {
309 let uuids = uuid.as_hyphenated().to_string();
310 let mut stmt = self
312 .get_conn()?
313 .prepare(&format!(
314 "SELECT spn FROM {}.idx_uuid2spn WHERE uuid = :uuid",
315 self.get_db_name()
316 ))
317 .map_err(sqlite_error)?;
318 let spn_raw: Option<Vec<u8>> = stmt
319 .query_row(&[(":uuid", &uuids)], |row| row.get(0))
320 .optional()
322 .map_err(sqlite_error)?;
323
324 let spn: Option<Value> = match spn_raw {
325 Some(d) => {
326 let dbv: DbIdentSpn =
327 serde_json::from_slice(d.as_slice()).map_err(serde_json_error)?;
328
329 Some(Value::from(dbv))
330 }
331 None => None,
332 };
333
334 Ok(spn)
335 }
336
337 fn uuid2rdn(&mut self, uuid: Uuid) -> Result<Option<String>, OperationError> {
338 let uuids = uuid.as_hyphenated().to_string();
339 let mut stmt = self
341 .get_conn()?
342 .prepare(&format!(
343 "SELECT rdn FROM {}.idx_uuid2rdn WHERE uuid = :uuid",
344 self.get_db_name()
345 ))
346 .map_err(sqlite_error)?;
347 let rdn: Option<String> = stmt
348 .query_row(&[(":uuid", &uuids)], |row| row.get(0))
349 .optional()
351 .map_err(sqlite_error)?;
352
353 Ok(rdn)
354 }
355
356 fn get_db_s_uuid(&self) -> Result<Option<Uuid>, OperationError> {
357 let data: Option<Vec<u8>> = self
359 .get_conn()?
360 .query_row(
361 &format!(
362 "SELECT data FROM {}.db_sid WHERE id = 2",
363 self.get_db_name()
364 ),
365 [],
366 |row| row.get(0),
367 )
368 .optional()
369 .map(|e_opt| {
371 e_opt.map(|e| {
373 let y: Vec<u8> = e;
374 y
375 })
376 })
378 .map_err(|_| OperationError::SqliteError)?;
379
380 Ok(match data {
381 Some(d) => Some(
382 serde_json::from_slice(d.as_slice())
383 .or_else(|e| serde_cbor::from_slice(d.as_slice()).map_err(|_| e))
384 .map_err(|e| {
385 admin_error!(immediate = true, ?e, "CRITICAL: Serde CBOR Error");
386 eprintln!("CRITICAL: Serde CBOR Error -> {e:?}");
387 OperationError::SerdeCborError
388 })?,
389 ),
390 None => None,
391 })
392 }
393
394 fn get_db_d_uuid(&self) -> Result<Option<Uuid>, OperationError> {
395 let data: Option<Vec<u8>> = self
397 .get_conn()?
398 .query_row(
399 &format!(
400 "SELECT data FROM {}.db_did WHERE id = 2",
401 self.get_db_name()
402 ),
403 [],
404 |row| row.get(0),
405 )
406 .optional()
407 .map(|e_opt| {
409 e_opt.map(|e| {
411 let y: Vec<u8> = e;
412 y
413 })
414 })
416 .map_err(|_| OperationError::SqliteError)?;
417
418 Ok(match data {
419 Some(d) => Some(
420 serde_json::from_slice(d.as_slice())
421 .or_else(|e| serde_cbor::from_slice(d.as_slice()).map_err(|_| e))
422 .map_err(|e| {
423 admin_error!(immediate = true, ?e, "CRITICAL: Serde CBOR Error");
424 eprintln!("CRITICAL: Serde CBOR Error -> {e:?}");
425 OperationError::SerdeCborError
426 })?,
427 ),
428 None => None,
429 })
430 }
431
432 fn get_db_ts_max(&self) -> Result<Option<Duration>, OperationError> {
433 let data: Option<Vec<u8>> = self
435 .get_conn()?
436 .query_row(
437 &format!(
438 "SELECT data FROM {}.db_op_ts WHERE id = 1",
439 self.get_db_name()
440 ),
441 [],
442 |row| row.get(0),
443 )
444 .optional()
445 .map(|e_opt| {
446 e_opt.map(|e| {
448 let y: Vec<u8> = e;
449 y
450 })
451 })
453 .map_err(sqlite_error)?;
454
455 Ok(match data {
456 Some(d) => Some(
457 serde_json::from_slice(d.as_slice())
458 .or_else(|_| serde_cbor::from_slice(d.as_slice()))
459 .map_err(|e| {
460 admin_error!(immediate = true, ?e, "CRITICAL: Serde JSON Error");
461 eprintln!("CRITICAL: Serde JSON Error -> {e:?}");
462 OperationError::SerdeJsonError
463 })?,
464 ),
465 None => None,
466 })
467 }
468
469 fn get_key_handles(&mut self) -> Result<BTreeMap<KeyHandleId, KeyHandle>, OperationError> {
470 let mut stmt = self
471 .get_conn()?
472 .prepare(&format!(
473 "SELECT id, data FROM {}.keyhandles",
474 self.get_db_name()
475 ))
476 .map_err(sqlite_error)?;
477
478 let kh_iter = stmt
479 .query_map([], |row| Ok((row.get(0)?, row.get(1)?)))
480 .map_err(sqlite_error)?;
481
482 kh_iter
483 .map(|v| {
484 let (id, data): (Vec<u8>, Vec<u8>) = v.map_err(sqlite_error)?;
485 let id = serde_json::from_slice(id.as_slice()).map_err(serde_json_error)?;
486 let data = serde_json::from_slice(data.as_slice()).map_err(serde_json_error)?;
487 Ok((id, data))
488 })
489 .collect()
490 }
491
492 #[instrument(level = "debug", name = "idl_sqlite::get_allids", skip_all)]
493 fn get_allids(&self) -> Result<IDLBitRange, OperationError> {
494 let mut stmt = self
495 .get_conn()?
496 .prepare(&format!("SELECT id FROM {}.id2entry", self.get_db_name()))
497 .map_err(sqlite_error)?;
498 let res = stmt.query_map([], |row| row.get(0)).map_err(sqlite_error)?;
499 let mut ids: Result<IDLBitRange, _> = res
500 .map(|v| {
501 v.map_err(sqlite_error).and_then(|id: i64| {
502 id.try_into().map_err(|e| {
504 admin_error!(?e, "I64 Parse Error");
505 OperationError::SqliteError
506 })
507 })
508 })
509 .collect();
510 if let Ok(i) = &mut ids {
511 i.compress()
512 }
513 ids
514 }
515
516 fn list_idxs(&self) -> Result<Vec<String>, OperationError> {
517 let mut stmt = self
518 .get_conn()?
519 .prepare(&format!(
520 "SELECT name from {}.sqlite_master where type='table' and name GLOB 'idx_*'",
521 self.get_db_name()
522 ))
523 .map_err(sqlite_error)?;
524 let idx_table_iter = stmt.query_map([], |row| row.get(0)).map_err(sqlite_error)?;
525
526 idx_table_iter.map(|v| v.map_err(sqlite_error)).collect()
527 }
528
529 fn list_id2entry(&self) -> Result<Vec<(u64, String)>, OperationError> {
530 let allids = self.get_identry_raw(&IdList::AllIds)?;
531 allids
532 .into_iter()
533 .map(|data| data.into_dbentry().map(|(id, db_e)| (id, db_e.to_string())))
534 .collect()
535 }
536
537 fn list_quarantined(&self) -> Result<Vec<(u64, String)>, OperationError> {
538 let mut stmt = self
541 .get_conn()?
542 .prepare(&format!(
543 "SELECT id, data FROM {}.id2entry_quarantine",
544 self.get_db_name()
545 ))
546 .map_err(sqlite_error)?;
547 let id2entry_iter = stmt
548 .query_map([], |row| {
549 Ok(IdSqliteEntry {
550 id: row.get(0)?,
551 data: row.get(1)?,
552 })
553 })
554 .map_err(sqlite_error)?;
555 let allids = id2entry_iter
556 .map(|v| {
557 v.map_err(sqlite_error).and_then(|ise| {
558 ise.try_into()
560 })
561 })
562 .collect::<Result<Vec<IdRawEntry>, _>>()?;
563
564 allids
565 .into_iter()
566 .map(|data| data.into_dbentry().map(|(id, db_e)| (id, db_e.to_string())))
567 .collect()
568 }
569
570 fn get_id2entry(&self, id: u64) -> Result<(u64, String), OperationError> {
571 let idl = IdList::Indexed(IDLBitRange::from_u64(id));
572 let mut allids = self.get_identry_raw(&idl)?;
573 allids
574 .pop()
575 .ok_or(OperationError::InvalidEntryId)
576 .and_then(|data| {
577 data.into_dbentry()
578 .map(|(id, db_e)| (id, format!("{db_e:?}")))
579 })
580 }
581
582 fn list_index_content(
583 &self,
584 index_name: &str,
585 ) -> Result<Vec<(String, IDLBitRange)>, OperationError> {
586 let query = format!("SELECT key, idl FROM {}.{}", self.get_db_name(), index_name);
590 let mut stmt = self
591 .get_conn()?
592 .prepare(query.as_str())
593 .map_err(sqlite_error)?;
594
595 let idx_iter = stmt
596 .query_map([], |row| {
597 Ok(KeyIdl {
598 key: row.get(0)?,
599 data: row.get(1)?,
600 })
601 })
602 .map_err(sqlite_error)?;
603 idx_iter
604 .map(|v| {
605 v.map_err(sqlite_error).and_then(|KeyIdl { key, data }| {
606 serde_json::from_slice(data.as_slice())
607 .map_err(serde_json_error)
608 .map(|idl| (key, idl))
609 })
610 })
611 .collect()
612 }
613
614 #[allow(clippy::let_and_return)]
616 fn verify(&self) -> Vec<Result<(), ConsistencyError>> {
617 let Ok(conn) = self.get_conn() else {
618 return vec![Err(ConsistencyError::SqliteIntegrityFailure)];
619 };
620
621 let Ok(mut stmt) = conn.prepare("PRAGMA integrity_check;") else {
622 return vec![Err(ConsistencyError::SqliteIntegrityFailure)];
623 };
624
625 let r = match stmt.query([]) {
627 Ok(mut rows) => match rows.next() {
628 Ok(Some(v)) => {
629 let r: Result<String, _> = v.get(0);
630 match r {
631 Ok(t) if t == "ok" => Vec::with_capacity(0),
632 _ => vec![Err(ConsistencyError::SqliteIntegrityFailure)],
633 }
634 }
635 _ => vec![Err(ConsistencyError::SqliteIntegrityFailure)],
636 },
637 Err(_) => vec![Err(ConsistencyError::SqliteIntegrityFailure)],
638 };
639 r
640 }
641}
642
643impl IdlSqliteTransaction for IdlSqliteReadTransaction {
644 fn get_db_name(&self) -> &str {
645 self.db_name
646 }
647
648 fn get_conn(&self) -> Result<&Connection, OperationError> {
649 self.conn
650 .as_ref()
651 .ok_or(OperationError::TransactionAlreadyCommitted)
652 }
653}
654
655impl Drop for IdlSqliteReadTransaction {
656 fn drop(&mut self) {
658 let mut dropping = None;
659 std::mem::swap(&mut dropping, &mut self.conn);
660
661 if let Some(conn) = dropping {
662 #[allow(clippy::expect_used)]
663 conn.execute("ROLLBACK TRANSACTION", [])
664 .expect("Unable to rollback transaction! Can not proceed!!!");
665
666 #[allow(clippy::expect_used)]
667 self.pool
668 .lock()
669 .expect("Unable to access db pool")
670 .push_back(conn);
671 }
672 }
673}
674
675impl IdlSqliteReadTransaction {
676 pub fn new(
677 pool: ConnPool,
678 conn: Connection,
679 db_name: &'static str,
680 ) -> Result<Self, OperationError> {
681 conn.execute("BEGIN DEFERRED TRANSACTION", [])
689 .map_err(sqlite_error)?;
690
691 Ok(IdlSqliteReadTransaction {
692 pool,
693 conn: Some(conn),
694 db_name,
695 })
696 }
697}
698
699impl IdlSqliteTransaction for IdlSqliteWriteTransaction {
700 fn get_db_name(&self) -> &str {
701 self.db_name
702 }
703
704 fn get_conn(&self) -> Result<&Connection, OperationError> {
705 self.conn
706 .as_ref()
707 .ok_or(OperationError::TransactionAlreadyCommitted)
708 }
709}
710
711impl Drop for IdlSqliteWriteTransaction {
712 fn drop(&mut self) {
714 let mut dropping = None;
715 std::mem::swap(&mut dropping, &mut self.conn);
716
717 if let Some(conn) = dropping {
718 #[allow(clippy::expect_used)]
719 conn.execute("ROLLBACK TRANSACTION", [])
720 .expect("Unable to rollback transaction! Can not proceed!!!");
721
722 #[allow(clippy::expect_used)]
723 self.pool
724 .lock()
725 .expect("Unable to access db pool")
726 .push_back(conn);
727 }
728 }
729}
730
731impl IdlSqliteWriteTransaction {
732 pub fn new(
733 pool: ConnPool,
734 conn: Connection,
735 db_name: &'static str,
736 ) -> Result<Self, OperationError> {
737 conn.execute("BEGIN EXCLUSIVE TRANSACTION", [])
739 .map_err(sqlite_error)?;
740 Ok(IdlSqliteWriteTransaction {
741 pool,
742 conn: Some(conn),
743 db_name,
744 })
745 }
746
747 #[instrument(level = "debug", name = "idl_sqlite::commit", skip_all)]
748 pub fn commit(mut self) -> Result<(), OperationError> {
749 debug_assert!(self.conn.is_some());
750
751 let mut dropping = None;
752 std::mem::swap(&mut dropping, &mut self.conn);
753
754 if let Some(conn) = dropping {
755 conn.execute("COMMIT TRANSACTION", [])
756 .map(|_| ())
757 .map_err(|e| {
758 admin_error!(?e, "CRITICAL: failed to commit sqlite txn");
759 OperationError::BackendEngine
760 })?;
761
762 self.pool
763 .lock()
764 .map_err(|err| {
765 error!(?err, "Unable to return connection to pool");
766 OperationError::BackendEngine
767 })?
768 .push_back(conn);
769
770 Ok(())
771 } else {
772 Err(OperationError::TransactionAlreadyCommitted)
773 }
774 }
775
776 pub fn get_id2entry_max_id(&self) -> Result<u64, OperationError> {
777 let mut stmt = self
778 .get_conn()?
779 .prepare(&format!(
780 "SELECT MAX(id) as id_max FROM {}.id2entry",
781 self.get_db_name(),
782 ))
783 .map_err(sqlite_error)?;
784 let v = stmt.exists([]).map_err(sqlite_error)?;
787
788 if v {
789 let i: Option<i64> = stmt.query_row([], |row| row.get(0)).map_err(sqlite_error)?;
791 i.unwrap_or(0)
792 .try_into()
793 .map_err(|_| OperationError::InvalidEntryId)
794 } else {
795 Ok(0)
797 }
798 }
799
800 pub fn write_identry(
801 &self,
802 entry: &Entry<EntrySealed, EntryCommitted>,
803 ) -> Result<(), OperationError> {
804 let dbe = entry.to_dbentry();
805 let data = serde_json::to_vec(&dbe).map_err(serde_json_error)?;
806
807 let raw_entries = std::iter::once(IdRawEntry {
808 id: entry.get_id(),
809 data,
810 });
811
812 self.write_identries_raw(raw_entries)
813 }
814
815 pub fn write_identries_raw<I>(&self, mut entries: I) -> Result<(), OperationError>
816 where
817 I: Iterator<Item = IdRawEntry>,
818 {
819 let mut stmt = self
820 .get_conn()?
821 .prepare(&format!(
822 "INSERT OR REPLACE INTO {}.id2entry (id, data) VALUES(:id, :data)",
823 self.get_db_name()
824 ))
825 .map_err(sqlite_error)?;
826
827 entries.try_for_each(|e| {
828 IdSqliteEntry::try_from(e).and_then(|ser_ent| {
829 stmt.execute(named_params! {
830 ":id": &ser_ent.id,
831 ":data": &ser_ent.data.as_slice()
832 })
833 .map(|_| ())
835 .map_err(sqlite_error)
836 })
837 })
838 }
839
840 pub fn delete_identry(&self, id: u64) -> Result<(), OperationError> {
841 let mut stmt = self
842 .get_conn()?
843 .prepare(&format!(
844 "DELETE FROM {}.id2entry WHERE id = :id",
845 self.get_db_name()
846 ))
847 .map_err(sqlite_error)?;
848
849 let iid: i64 = id
850 .try_into()
851 .map_err(|_| OperationError::InvalidEntryId)
852 .and_then(|i| {
853 if i > 0 {
854 Ok(i)
855 } else {
856 Err(OperationError::InvalidEntryId)
857 }
858 })?;
859
860 debug_assert!(iid > 0);
861
862 stmt.execute([&iid]).map(|_| ()).map_err(sqlite_error)
863 }
864
865 pub fn write_idl(
866 &self,
867 attr: &Attribute,
868 itype: IndexType,
869 idx_key: &str,
870 idl: &IDLBitRange,
871 ) -> Result<(), OperationError> {
872 if idl.is_empty() {
873 let query = format!(
876 "DELETE FROM {}.idx_{}_{} WHERE key = :key",
877 self.get_db_name(),
878 itype.as_idx_str(),
879 attr.as_str()
880 );
881
882 self.get_conn()?
883 .prepare(query.as_str())
884 .and_then(|mut stmt| stmt.execute(&[(":key", &idx_key)]))
885 .map_err(sqlite_error)
886 } else {
887 let idl_raw = serde_json::to_vec(idl).map_err(serde_json_error)?;
889
890 let query = format!(
892 "INSERT OR REPLACE INTO {}.idx_{}_{} (key, idl) VALUES(:key, :idl)",
893 self.get_db_name(),
894 itype.as_idx_str(),
895 attr.as_str()
896 );
897
898 self.get_conn()?
899 .prepare(query.as_str())
900 .and_then(|mut stmt| {
901 stmt.execute(named_params! {
902 ":key": &idx_key,
903 ":idl": &idl_raw
904 })
905 })
906 .map_err(sqlite_error)
907 }
908 .map(|_| ())
910 }
911
912 pub fn create_name2uuid(&self) -> Result<(), OperationError> {
913 self.get_conn()?
914 .execute(
915 &format!("CREATE TABLE IF NOT EXISTS {}.idx_name2uuid (name TEXT PRIMARY KEY, uuid TEXT)", self.get_db_name()),
916 [],
917 )
918 .map(|_| ())
919 .map_err(sqlite_error)
920 }
921
922 pub fn write_name2uuid_add(&self, name: &str, uuid: Uuid) -> Result<(), OperationError> {
923 let uuids = uuid.as_hyphenated().to_string();
924
925 self.get_conn()?
926 .prepare(&format!(
927 "INSERT OR REPLACE INTO {}.idx_name2uuid (name, uuid) VALUES(:name, :uuid)",
928 self.get_db_name()
929 ))
930 .and_then(|mut stmt| {
931 stmt.execute(named_params! {
932 ":name": &name,
933 ":uuid": uuids.as_str()
934 })
935 })
936 .map(|_| ())
937 .map_err(sqlite_error)
938 }
939
940 pub fn write_name2uuid_rem(&self, name: &str) -> Result<(), OperationError> {
941 self.get_conn()?
942 .prepare(&format!(
943 "DELETE FROM {}.idx_name2uuid WHERE name = :name",
944 self.get_db_name()
945 ))
946 .and_then(|mut stmt| stmt.execute(&[(":name", &name)]))
947 .map(|_| ())
948 .map_err(sqlite_error)
949 }
950
951 pub fn create_externalid2uuid(&self) -> Result<(), OperationError> {
952 self.get_conn()?
953 .execute(
954 &format!("CREATE TABLE IF NOT EXISTS {}.idx_externalid2uuid (eid TEXT PRIMARY KEY, uuid TEXT)", self.get_db_name()),
955 [],
956 )
957 .map(|_| ())
958 .map_err(sqlite_error)
959 }
960
961 pub fn write_externalid2uuid_add(&self, name: &str, uuid: Uuid) -> Result<(), OperationError> {
962 let uuids = uuid.as_hyphenated().to_string();
963
964 self.get_conn()?
965 .prepare(&format!(
966 "INSERT OR REPLACE INTO {}.idx_externalid2uuid (eid, uuid) VALUES(:eid, :uuid)",
967 self.get_db_name()
968 ))
969 .and_then(|mut stmt| {
970 stmt.execute(named_params! {
971 ":eid": &name,
972 ":uuid": uuids.as_str()
973 })
974 })
975 .map(|_| ())
976 .map_err(sqlite_error)
977 }
978
979 pub fn write_externalid2uuid_rem(&self, name: &str) -> Result<(), OperationError> {
980 self.get_conn()?
981 .prepare(&format!(
982 "DELETE FROM {}.idx_externalid2uuid WHERE eid = :eid",
983 self.get_db_name()
984 ))
985 .and_then(|mut stmt| stmt.execute(&[(":eid", &name)]))
986 .map(|_| ())
987 .map_err(sqlite_error)
988 }
989
990 pub fn create_uuid2spn(&self) -> Result<(), OperationError> {
991 self.get_conn()?
992 .execute(
993 &format!(
994 "CREATE TABLE IF NOT EXISTS {}.idx_uuid2spn (uuid TEXT PRIMARY KEY, spn BLOB)",
995 self.get_db_name()
996 ),
997 [],
998 )
999 .map(|_| ())
1000 .map_err(sqlite_error)
1001 }
1002
1003 pub fn write_uuid2spn(&self, uuid: Uuid, k: Option<&Value>) -> Result<(), OperationError> {
1004 let uuids = uuid.as_hyphenated().to_string();
1005 match k {
1006 Some(k) => {
1007 let dbv1: DbIdentSpn = k.to_db_ident_spn();
1008 let data = serde_json::to_vec(&dbv1).map_err(serde_json_error)?;
1009 self.get_conn()?
1010 .prepare(&format!(
1011 "INSERT OR REPLACE INTO {}.idx_uuid2spn (uuid, spn) VALUES(:uuid, :spn)",
1012 self.get_db_name()
1013 ))
1014 .and_then(|mut stmt| {
1015 stmt.execute(named_params! {
1016 ":uuid": &uuids,
1017 ":spn": &data,
1018 })
1019 })
1020 .map(|_| ())
1021 .map_err(sqlite_error)
1022 }
1023 None => self
1024 .get_conn()?
1025 .prepare(&format!(
1026 "DELETE FROM {}.idx_uuid2spn WHERE uuid = :uuid",
1027 self.get_db_name()
1028 ))
1029 .and_then(|mut stmt| stmt.execute(&[(":uuid", &uuids)]))
1030 .map(|_| ())
1031 .map_err(sqlite_error),
1032 }
1033 }
1034
1035 pub fn create_uuid2rdn(&self) -> Result<(), OperationError> {
1036 self.get_conn()?
1037 .execute(
1038 &format!(
1039 "CREATE TABLE IF NOT EXISTS {}.idx_uuid2rdn (uuid TEXT PRIMARY KEY, rdn TEXT)",
1040 self.get_db_name()
1041 ),
1042 [],
1043 )
1044 .map(|_| ())
1045 .map_err(sqlite_error)
1046 }
1047
1048 pub fn write_uuid2rdn(&self, uuid: Uuid, k: Option<&String>) -> Result<(), OperationError> {
1049 let uuids = uuid.as_hyphenated().to_string();
1050 match k {
1051 Some(k) => self
1052 .get_conn()?
1053 .prepare(&format!(
1054 "INSERT OR REPLACE INTO {}.idx_uuid2rdn (uuid, rdn) VALUES(:uuid, :rdn)",
1055 self.get_db_name()
1056 ))
1057 .and_then(|mut stmt| stmt.execute(&[(":uuid", &uuids), (":rdn", k)]))
1058 .map(|_| ())
1059 .map_err(sqlite_error),
1060 None => self
1061 .get_conn()?
1062 .prepare(&format!(
1063 "DELETE FROM {}.idx_uuid2rdn WHERE uuid = :uuid",
1064 self.get_db_name()
1065 ))
1066 .and_then(|mut stmt| stmt.execute(&[(":uuid", &uuids)]))
1067 .map(|_| ())
1068 .map_err(sqlite_error),
1069 }
1070 }
1071
1072 pub(crate) fn create_keyhandles(&self) -> Result<(), OperationError> {
1073 self.get_conn()?
1074 .execute(
1075 &format!(
1076 "CREATE TABLE IF NOT EXISTS {}.keyhandles (id TEXT PRIMARY KEY, data TEXT)",
1077 self.get_db_name()
1078 ),
1079 [],
1080 )
1081 .map(|_| ())
1082 .map_err(sqlite_error)
1083 }
1084
1085 pub(crate) fn create_db_ruv(&self) -> Result<(), OperationError> {
1086 self.get_conn()?
1087 .execute(
1088 &format!(
1089 "CREATE TABLE IF NOT EXISTS {}.ruv (cid TEXT PRIMARY KEY)",
1090 self.get_db_name()
1091 ),
1092 [],
1093 )
1094 .map(|_| ())
1095 .map_err(sqlite_error)
1096 }
1097
1098 pub fn get_db_ruv(&self) -> Result<BTreeSet<Cid>, OperationError> {
1099 let mut stmt = self
1100 .get_conn()?
1101 .prepare(&format!("SELECT cid FROM {}.ruv", self.get_db_name()))
1102 .map_err(sqlite_error)?;
1103
1104 let kh_iter = stmt.query_map([], |row| row.get(0)).map_err(sqlite_error)?;
1105
1106 kh_iter
1107 .map(|v| {
1108 let ser_cid: String = v.map_err(sqlite_error)?;
1109 let db_cid: DbCidV1 = serde_json::from_str(&ser_cid).map_err(serde_json_error)?;
1110 Ok(db_cid.into())
1111 })
1112 .collect()
1113 }
1114
1115 pub fn write_db_ruv<I, J>(&mut self, mut added: I, mut removed: J) -> Result<(), OperationError>
1116 where
1117 I: Iterator<Item = Cid>,
1118 J: Iterator<Item = Cid>,
1119 {
1120 let mut stmt = self
1121 .get_conn()?
1122 .prepare(&format!(
1123 "DELETE FROM {}.ruv WHERE cid = :cid",
1124 self.get_db_name()
1125 ))
1126 .map_err(sqlite_error)?;
1127
1128 removed.try_for_each(|cid| {
1129 let db_cid: DbCidV1 = cid.into();
1130
1131 serde_json::to_string(&db_cid)
1132 .map_err(serde_json_error)
1133 .and_then(|ser_cid| {
1134 stmt.execute(named_params! {
1135 ":cid": &ser_cid
1136 })
1137 .map(|_| ())
1139 .map_err(sqlite_error)
1140 })
1141 })?;
1142
1143 let mut stmt = self
1144 .get_conn()?
1145 .prepare(&format!(
1146 "INSERT OR REPLACE INTO {}.ruv (cid) VALUES(:cid)",
1147 self.get_db_name()
1148 ))
1149 .map_err(sqlite_error)?;
1150
1151 added.try_for_each(|cid| {
1152 let db_cid: DbCidV1 = cid.into();
1153
1154 serde_json::to_string(&db_cid)
1155 .map_err(serde_json_error)
1156 .and_then(|ser_cid| {
1157 stmt.execute(named_params! {
1158 ":cid": &ser_cid
1159 })
1160 .map(|_| ())
1162 .map_err(sqlite_error)
1163 })
1164 })
1165 }
1166
1167 #[instrument(level = "debug", skip(self))]
1168 pub fn create_idx(&self, attr: &Attribute, itype: IndexType) -> Result<(), OperationError> {
1169 let idx_stmt = format!(
1174 "CREATE TABLE IF NOT EXISTS {}.idx_{}_{} (key TEXT PRIMARY KEY, idl BLOB)",
1175 self.get_db_name(),
1176 itype.as_idx_str(),
1177 attr.as_str(),
1178 );
1179
1180 self.get_conn()?
1181 .execute(idx_stmt.as_str(), [])
1182 .map(|_| ())
1183 .map_err(sqlite_error)
1184 }
1185
1186 #[instrument(level = "trace", skip_all)]
1191 pub fn danger_purge_idxs(&self) -> Result<(), OperationError> {
1192 let idx_table_list = self.list_idxs()?;
1193 trace!(tables = ?idx_table_list);
1194
1195 idx_table_list.iter().try_for_each(|idx_table| {
1196 debug!(table = ?idx_table, "removing idx_table");
1197 self.get_conn()?
1198 .prepare(format!("DROP TABLE {}.{}", self.get_db_name(), idx_table).as_str())
1199 .and_then(|mut stmt| stmt.execute([]).map(|_| ()))
1200 .map_err(sqlite_error)
1201 })
1202 }
1203
1204 pub fn store_idx_slope_analysis(
1205 &self,
1206 slopes: &HashMap<IdxKey, IdxSlope>,
1207 ) -> Result<(), OperationError> {
1208 self.get_conn()?
1209 .execute(
1210 &format!(
1211 "CREATE TABLE IF NOT EXISTS {}.idxslope_analysis (
1212 id TEXT PRIMARY KEY,
1213 slope INTEGER
1214 )",
1215 self.get_db_name()
1216 ),
1217 [],
1218 )
1219 .map(|_| ())
1220 .map_err(sqlite_error)?;
1221
1222 self.get_conn()?
1224 .execute(
1225 &format!("DELETE FROM {}.idxslope_analysis", self.get_db_name()),
1226 [],
1227 )
1228 .map(|_| ())
1229 .map_err(sqlite_error)?;
1230
1231 slopes.iter().try_for_each(|(k, v)| {
1232 let key = format!("idx_{}_{}", k.itype.as_idx_str(), k.attr);
1233 self.get_conn()?
1234 .execute(
1235 &format!("INSERT OR REPLACE INTO {}.idxslope_analysis (id, slope) VALUES(:id, :slope)", self.get_db_name()),
1236 named_params! {
1237 ":id": &key,
1238 ":slope": &v,
1239 },
1240 )
1241 .map(|_| ())
1242 .map_err(|e| {
1243 admin_error!(immediate = true, ?e, "CRITICAL: rusqlite error in store_idx_slope_analysis");
1244 eprintln!("CRITICAL: rusqlite error in store_idx_slope_analysis: {e:?}");
1245 OperationError::SqliteError
1246 })
1247 })
1248 }
1249
    /// Report whether the `idxslope_analysis` table exists in this schema.
    pub fn is_idx_slopeyness_generated(&self) -> Result<bool, OperationError> {
        self.exists_table("idxslope_analysis")
    }
1253
1254 pub fn get_idx_slope(&self, ikey: &IdxKey) -> Result<Option<IdxSlope>, OperationError> {
1255 let analysis_exists = self.exists_table("idxslope_analysis")?;
1256 if !analysis_exists {
1257 return Ok(None);
1258 }
1259
1260 let key = format!("idx_{}_{}", ikey.itype.as_idx_str(), ikey.attr);
1263
1264 let mut stmt = self
1265 .get_conn()?
1266 .prepare(&format!(
1267 "SELECT slope FROM {}.idxslope_analysis WHERE id = :id",
1268 self.get_db_name()
1269 ))
1270 .map_err(sqlite_error)?;
1271
1272 let slope: Option<IdxSlope> = stmt
1273 .query_row(&[(":id", &key)], |row| row.get(0))
1274 .optional()
1276 .map_err(sqlite_error)?;
1277 trace!(name = %key, ?slope, "Got slope for index");
1278
1279 Ok(slope)
1280 }
1281
1282 pub fn quarantine_entry(&self, id: u64) -> Result<(), OperationError> {
1283 let iid = i64::try_from(id).map_err(|_| OperationError::InvalidEntryId)?;
1284
1285 let id_sqlite_entry = self
1286 .get_conn()?
1287 .query_row(
1288 &format!(
1289 "DELETE FROM {}.id2entry WHERE id = :idl RETURNING id, data",
1290 self.get_db_name()
1291 ),
1292 [&iid],
1293 |row| {
1294 Ok(IdSqliteEntry {
1295 id: row.get(0)?,
1296 data: row.get(1)?,
1297 })
1298 },
1299 )
1300 .map_err(sqlite_error)?;
1301
1302 trace!(?id_sqlite_entry);
1303
1304 self.get_conn()?
1305 .execute(
1306 &format!(
1307 "INSERT OR REPLACE INTO {}.id2entry_quarantine VALUES(:id, :data)",
1308 self.get_db_name()
1309 ),
1310 named_params! {
1311 ":id": &id_sqlite_entry.id,
1312 ":data": &id_sqlite_entry.data.as_slice()
1313 },
1314 )
1315 .map_err(sqlite_error)
1316 .map(|_| ())
1317 }
1318
1319 pub fn restore_quarantined(&self, id: u64) -> Result<(), OperationError> {
1320 let iid = i64::try_from(id).map_err(|_| OperationError::InvalidEntryId)?;
1321
1322 let id_sqlite_entry = self
1323 .get_conn()?
1324 .query_row(
1325 &format!(
1326 "DELETE FROM {}.id2entry_quarantine WHERE id = :idl RETURNING id, data",
1327 self.get_db_name()
1328 ),
1329 [&iid],
1330 |row| {
1331 Ok(IdSqliteEntry {
1332 id: row.get(0)?,
1333 data: row.get(1)?,
1334 })
1335 },
1336 )
1337 .map_err(sqlite_error)?;
1338
1339 trace!(?id_sqlite_entry);
1340
1341 self.get_conn()?
1342 .execute(
1343 &format!(
1344 "INSERT INTO {}.id2entry VALUES(:id, :data)",
1345 self.get_db_name()
1346 ),
1347 named_params! {
1348 ":id": &id_sqlite_entry.id,
1349 ":data": &id_sqlite_entry.data.as_slice()
1350 },
1351 )
1352 .map_err(sqlite_error)
1353 .map(|_| ())
1354 }
1355
1356 #[instrument(level = "trace", skip_all)]
1361 pub fn danger_purge_id2entry(&self) -> Result<(), OperationError> {
1362 self.get_conn()?
1363 .execute(&format!("DELETE FROM {}.id2entry", self.get_db_name()), [])
1364 .map(|_| ())
1365 .map_err(sqlite_error)
1366 }
1367
1368 pub fn write_db_s_uuid(&self, nsid: Uuid) -> Result<(), OperationError> {
1369 let data = serde_json::to_vec(&nsid).map_err(|e| {
1370 admin_error!(immediate = true, ?e, "CRITICAL: Serde JSON Error");
1371 eprintln!("CRITICAL: Serde JSON Error -> {e:?}");
1372 OperationError::SerdeJsonError
1373 })?;
1374
1375 self.get_conn()?
1376 .execute(
1377 &format!(
1378 "INSERT OR REPLACE INTO {}.db_sid (id, data) VALUES(:id, :sid)",
1379 self.get_db_name()
1380 ),
1381 named_params! {
1382 ":id": &2,
1383 ":sid": &data,
1384 },
1385 )
1386 .map(|_| ())
1387 .map_err(|e| {
1388 admin_error!(
1389 immediate = true,
1390 ?e,
1391 "CRITICAL: rusqlite error in write_db_s_uuid"
1392 );
1393 eprintln!("CRITICAL: rusqlite error in write_db_s_uuid {e:?}");
1394 OperationError::SqliteError
1395 })
1396 }
1397
1398 pub fn write_db_d_uuid(&self, nsid: Uuid) -> Result<(), OperationError> {
1399 let data = serde_json::to_vec(&nsid).map_err(|e| {
1400 admin_error!(
1401 immediate = true,
1402 ?e,
1403 "CRITICAL: Serde JSON Error in write_db_d_uuid"
1404 );
1405 eprintln!("CRITICAL: Serde JSON Error in write_db_d_uuid-> {e:?}");
1406 OperationError::SerdeJsonError
1407 })?;
1408
1409 self.get_conn()?
1410 .execute(
1411 &format!(
1412 "INSERT OR REPLACE INTO {}.db_did (id, data) VALUES(:id, :did)",
1413 self.get_db_name()
1414 ),
1415 named_params! {
1416 ":id": &2,
1417 ":did": &data,
1418 },
1419 )
1420 .map(|_| ())
1421 .map_err(|e| {
1422 admin_error!(
1423 immediate = true,
1424 ?e,
1425 "CRITICAL: rusqlite error in write_db_d_uuid"
1426 );
1427 eprintln!("CRITICAL: rusqlite error in write_db_d_uuid {e:?}");
1428 OperationError::SqliteError
1429 })
1430 }
1431
1432 pub fn set_db_ts_max(&self, ts: Duration) -> Result<(), OperationError> {
1433 let data = serde_json::to_vec(&ts).map_err(|e| {
1434 admin_error!(
1435 immediate = true,
1436 ?e,
1437 "CRITICAL: Serde JSON Error in set_db_ts_max"
1438 );
1439 eprintln!("CRITICAL: Serde JSON Error in set_db_ts_max -> {e:?}");
1440 OperationError::SerdeJsonError
1441 })?;
1442
1443 self.get_conn()?
1444 .execute(
1445 &format!(
1446 "INSERT OR REPLACE INTO {}.db_op_ts (id, data) VALUES(:id, :did)",
1447 self.get_db_name()
1448 ),
1449 named_params! {
1450 ":id": &1,
1451 ":did": &data,
1452 },
1453 )
1454 .map(|_| ())
1455 .map_err(|e| {
1456 admin_error!(
1457 immediate = true,
1458 ?e,
1459 "CRITICAL: rusqlite error in set_db_ts_max"
1460 );
1461 eprintln!("CRITICAL: rusqlite error in set_db_ts_max {e:?}");
1462 OperationError::SqliteError
1463 })
1464 }
1465
1466 fn get_db_version_key(&self, key: &str) -> Result<i64, OperationError> {
1469 self.get_conn().map(|conn| {
1470 conn.query_row(
1471 &format!(
1472 "SELECT version FROM {}.db_version WHERE id = :id",
1473 self.get_db_name()
1474 ),
1475 &[(":id", &key)],
1476 |row| row.get(0),
1477 )
1478 .unwrap_or({
1479 0
1481 })
1482 })
1483 }
1484
1485 fn set_db_version_key(&self, key: &str, v: i64) -> Result<(), OperationError> {
1486 self.get_conn()?
1487 .execute(
1488 &format!(
1489 "INSERT OR REPLACE INTO {}.db_version (id, version) VALUES(:id, :dbv_id2entry)",
1490 self.get_db_name()
1491 ),
1492 named_params! {
1493 ":id": &key,
1494 ":dbv_id2entry": v,
1495 },
1496 )
1497 .map(|_| ())
1498 .map_err(|e| {
1499 admin_error!(
1500 immediate = true,
1501 ?e,
1502 "CRITICAL: rusqlite error in set_db_version_key"
1503 );
1504 eprintln!("CRITICAL: rusqlite error in set_db_version_key {e:?}");
1505 OperationError::SqliteError
1506 })
1507 }
1508
1509 pub(crate) fn get_db_index_version(&self) -> Result<i64, OperationError> {
1510 self.get_db_version_key(DBV_INDEXV)
1511 }
1512
1513 pub(crate) fn set_db_index_version(&self, v: i64) -> Result<(), OperationError> {
1514 self.set_db_version_key(DBV_INDEXV, v)
1515 }
1516
1517 pub fn setup(&self) -> Result<(), OperationError> {
1518 trace!(db_name = %self.get_db_name(), "setup");
1521 if self.get_db_name() != "main" {
1522 warn!("Using non-default db-name - this database content WILL be lost!");
1523 self.get_conn()?
1525 .execute(&format!("ATTACH DATABASE '' AS {}", self.get_db_name()), [])
1526 .map_err(sqlite_error)?;
1527 };
1528
1529 self.get_conn()?
1542 .execute(
1543 &format!(
1544 "CREATE TABLE IF NOT EXISTS {}.db_version (
1545 id TEXT PRIMARY KEY,
1546 version INTEGER
1547 )
1548 ",
1549 self.get_db_name()
1550 ),
1551 [],
1552 )
1553 .map_err(sqlite_error)?;
1554
1555 let mut dbv_id2entry = self.get_db_version_key(DBV_ID2ENTRY)?;
1557
1558 trace!(%dbv_id2entry);
1559
1560 if dbv_id2entry != 0 && dbv_id2entry < 9 {
1561 error!(
1562 ?dbv_id2entry,
1563 "Unable to perform database migrations. This instance is too old."
1564 );
1565 return Err(OperationError::DB0004DatabaseTooOld);
1566 }
1567
1568 if dbv_id2entry == 0 {
1571 self.get_conn()?
1572 .execute(
1573 &format!(
1574 "CREATE TABLE IF NOT EXISTS {}.id2entry (
1575 id INTEGER PRIMARY KEY ASC,
1576 data BLOB NOT NULL
1577 )
1578 ",
1579 self.get_db_name()
1580 ),
1581 [],
1582 )
1583 .map_err(sqlite_error)?;
1584
1585 self.get_conn()?
1586 .execute(
1587 &format!(
1588 "CREATE TABLE IF NOT EXISTS {}.db_sid (
1589 id INTEGER PRIMARY KEY ASC,
1590 data BLOB NOT NULL
1591 )
1592 ",
1593 self.get_db_name()
1594 ),
1595 [],
1596 )
1597 .map_err(sqlite_error)?;
1598
1599 dbv_id2entry = 1;
1600
1601 info!(entry = %dbv_id2entry, "dbv_id2entry migrated (id2entry, db_sid)");
1602 }
1603 if dbv_id2entry == 1 {
1605 self.get_conn()?
1606 .execute(
1607 &format!(
1608 "CREATE TABLE IF NOT EXISTS {}.db_did (
1609 id INTEGER PRIMARY KEY ASC,
1610 data BLOB NOT NULL
1611 )
1612 ",
1613 self.get_db_name()
1614 ),
1615 [],
1616 )
1617 .map_err(sqlite_error)?;
1618
1619 dbv_id2entry = 2;
1620 info!(entry = %dbv_id2entry, "dbv_id2entry migrated (db_did)");
1621 }
1622 if dbv_id2entry == 2 {
1624 self.get_conn()?
1625 .execute(
1626 &format!(
1627 "CREATE TABLE IF NOT EXISTS {}.db_op_ts (
1628 id INTEGER PRIMARY KEY ASC,
1629 data BLOB NOT NULL
1630 )
1631 ",
1632 self.get_db_name()
1633 ),
1634 [],
1635 )
1636 .map_err(sqlite_error)?;
1637 dbv_id2entry = 3;
1638 info!(entry = %dbv_id2entry, "dbv_id2entry migrated (db_op_ts)");
1639 }
1640 if dbv_id2entry == 3 {
1642 self.create_name2uuid()
1643 .and_then(|_| self.create_uuid2spn())
1644 .and_then(|_| self.create_uuid2rdn())?;
1645 dbv_id2entry = 4;
1646 info!(entry = %dbv_id2entry, "dbv_id2entry migrated (name2uuid, uuid2spn, uuid2rdn)");
1647 }
1648 if dbv_id2entry == 4 {
1650 dbv_id2entry = 5;
1651 info!(entry = %dbv_id2entry, "dbv_id2entry migrated (dbentryv1 -> dbentryv2)");
1652 }
1653 if dbv_id2entry == 5 {
1655 self.create_externalid2uuid()?;
1656 dbv_id2entry = 6;
1657 info!(entry = %dbv_id2entry, "dbv_id2entry migrated (externalid2uuid)");
1658 }
1659 if dbv_id2entry == 6 {
1661 self.get_conn()?
1662 .execute(
1663 &format!(
1664 "CREATE TABLE IF NOT EXISTS {}.id2entry_quarantine (
1665 id INTEGER PRIMARY KEY ASC,
1666 data BLOB NOT NULL
1667 )
1668 ",
1669 self.get_db_name()
1670 ),
1671 [],
1672 )
1673 .map_err(sqlite_error)?;
1674
1675 dbv_id2entry = 7;
1676 info!(entry = %dbv_id2entry, "dbv_id2entry migrated (quarantine)");
1677 }
1678 if dbv_id2entry == 7 {
1680 self.create_keyhandles()?;
1681 dbv_id2entry = 8;
1682 info!(entry = %dbv_id2entry, "dbv_id2entry migrated (keyhandles)");
1683 }
1684 if dbv_id2entry == 8 {
1686 dbv_id2entry = 9;
1687 info!(entry = %dbv_id2entry, "dbv_id2entry migrated (dbentryv2 -> dbentryv3)");
1688 }
1689 if dbv_id2entry == 9 {
1691 self.create_db_ruv()?;
1692 dbv_id2entry = 10;
1693 info!(entry = %dbv_id2entry, "dbv_id2entry migrated (db_ruv)");
1694 }
1695 self.set_db_version_key(DBV_ID2ENTRY, dbv_id2entry)?;
1698
1699 Ok(())
1704 }
1705}
1706
1707impl IdlSqlite {
1708 pub fn new(cfg: &BackendConfig, vacuum: bool) -> Result<Self, OperationError> {
1709 if cfg.path.as_os_str().is_empty() {
1710 debug_assert_eq!(cfg.pool_size, 1);
1711 }
1712 let mut flags = OpenFlags::default();
1717 if cfg!(test) {
1720 flags.insert(OpenFlags::SQLITE_OPEN_NO_MUTEX);
1721 };
1722
1723 let fs_page_size = cfg.fstype as u32;
1724 let cache_pages = 33554432 / fs_page_size;
1727 let checkpoint_pages = cfg.fstype.checkpoint_pages();
1728
1729 {
1731 let vconn = Connection::open_with_flags(&cfg.path, flags).map_err(sqlite_error)?;
1732
1733 vconn
1734 .execute_batch(
1735 format!(
1736 "PRAGMA page_size={fs_page_size};
1737 PRAGMA cache_size={cache_pages};
1738 PRAGMA journal_mode=WAL;
1739 PRAGMA wal_autocheckpoint={checkpoint_pages};
1740 PRAGMA wal_checkpoint(RESTART);"
1741 )
1742 .as_str(),
1743 )
1744 .map_err(sqlite_error)?;
1745 }
1746
1747 if vacuum {
1749 admin_warn!(
1750 immediate = true,
1751 "NOTICE: A db vacuum has been requested. This may take a long time ..."
1752 );
1753 let vconn = Connection::open_with_flags(&cfg.path, flags).map_err(sqlite_error)?;
1761
1762 vconn
1763 .execute_batch("PRAGMA wal_checkpoint(TRUNCATE);")
1764 .map_err(|e| {
1765 admin_error!(?e, "rusqlite wal_checkpoint error");
1766 OperationError::SqliteError
1767 })?;
1768
1769 vconn
1770 .pragma_update(None, "journal_mode", "DELETE")
1771 .map_err(|e| {
1772 admin_error!(?e, "rusqlite journal_mode update error");
1773 OperationError::SqliteError
1774 })?;
1775
1776 vconn.close().map_err(|e| {
1777 admin_error!(?e, "rusqlite db close error");
1778 OperationError::SqliteError
1779 })?;
1780
1781 let vconn = Connection::open_with_flags(&cfg.path, flags).map_err(sqlite_error)?;
1782
1783 vconn
1784 .pragma_update(None, "page_size", cfg.fstype as u32)
1785 .map_err(|e| {
1786 admin_error!(?e, "rusqlite page_size update error");
1787 OperationError::SqliteError
1788 })?;
1789
1790 vconn.execute_batch("VACUUM").map_err(|e| {
1791 admin_error!(?e, "rusqlite vacuum error");
1792 OperationError::SqliteError
1793 })?;
1794
1795 vconn
1796 .pragma_update(None, "journal_mode", "WAL")
1797 .map_err(|e| {
1798 admin_error!(?e, "rusqlite journal_mode update error");
1799 OperationError::SqliteError
1800 })?;
1801
1802 vconn.close().map_err(|e| {
1803 admin_error!(?e, "rusqlite db close error");
1804 OperationError::SqliteError
1805 })?;
1806
1807 admin_warn!(immediate = true, "NOTICE: db vacuum complete");
1808 };
1810
1811 let pool = (0..cfg.pool_size)
1812 .map(|i| {
1813 trace!("Opening Connection {}", i);
1814 let conn =
1815 Connection::open_with_flags(&cfg.path, flags).map_err(sqlite_error);
1816 match conn {
1817 Ok(conn) => {
1818 conn
1820 .execute_batch(
1821 format!(
1822 "PRAGMA cache_size={cache_pages};"
1823 )
1824 .as_str(),
1825 )
1826 .map_err(sqlite_error)?;
1827
1828
1829 rusqlite::vtab::array::load_module(&conn).map_err(|e| {
1831 admin_error!(
1832 "Failed to load rarray virtual module for sqlite, cannot start! {:?}", e
1833 );
1834 sqlite_error(e)
1835 })?;
1836 Ok(conn)
1837 }
1838 Err(err) => {
1839 admin_error!(
1840 "Failed to start database connection, cannot start! {:?}",
1841 err
1842 );
1843 Err(err)
1844 }
1845 }
1846 })
1847 .collect::<Result<VecDeque<Connection>, OperationError>>()
1848 .map_err(|e| {
1849 error!(err = ?e, "Failed to build connection pool");
1850 e
1851 })?;
1852
1853 let pool = Arc::new(Mutex::new(pool));
1854
1855 Ok(IdlSqlite {
1856 pool,
1857 db_name: cfg.db_name,
1858 })
1859 }
1860
1861 pub(crate) fn get_allids_count(&self) -> Result<u64, OperationError> {
1862 let guard = self.pool.lock().map_err(|err| {
1863 error!(?err, "Unable to access connection to pool");
1864 OperationError::BackendEngine
1865 })?;
1866 let conn = guard.front().ok_or_else(|| {
1868 error!("Unable to retrieve connection from pool");
1869 OperationError::BackendEngine
1870 })?;
1871
1872 conn.query_row("select count(id) from id2entry", [], |row| row.get(0))
1873 .map_err(sqlite_error)
1874 }
1875
1876 pub fn read(&self) -> Result<IdlSqliteReadTransaction, OperationError> {
1877 let mut guard = self.pool.lock().map_err(|e| {
1879 error!(err = ?e, "Unable to lock connection pool.");
1880 OperationError::BackendEngine
1881 })?;
1882
1883 let conn = guard.pop_front().ok_or_else(|| {
1884 error!("Unable to retrieve connection from pool.");
1885 OperationError::BackendEngine
1886 })?;
1887
1888 IdlSqliteReadTransaction::new(self.pool.clone(), conn, self.db_name)
1889 }
1890
1891 pub fn write(&self) -> Result<IdlSqliteWriteTransaction, OperationError> {
1892 let mut guard = self.pool.lock().map_err(|e| {
1894 error!(err = ?e, "Unable to lock connection pool.");
1895 OperationError::BackendEngine
1896 })?;
1897
1898 let conn = guard.pop_front().ok_or_else(|| {
1899 error!("Unable to retrieve connection from pool.");
1900 OperationError::BackendEngine
1901 })?;
1902
1903 IdlSqliteWriteTransaction::new(self.pool.clone(), conn, self.db_name)
1904 }
1905}
1906
#[cfg(test)]
mod tests {
    use crate::be::idl_sqlite::{IdlSqlite, IdlSqliteTransaction};
    use crate::be::BackendConfig;

    /// Running `verify` in a write transaction on the test backend must
    /// yield no results.
    #[test]
    fn test_idl_sqlite_verify() {
        sketching::test_init();
        let test_cfg = BackendConfig::new_test("main");
        let idl = IdlSqlite::new(&test_cfg, false).unwrap();
        let write_txn = idl.write().unwrap();
        let problems = write_txn.verify();
        assert!(problems.is_empty());
    }
}
1921}