kanidmd_core/actors/
internal.rs
1use crate::{QueryServerReadV1, QueryServerWriteV1};
6use tracing::{Instrument, Level};
7
8use kanidmd_lib::prelude::*;
9
10use kanidmd_lib::{
11 event::{PurgeRecycledEvent, PurgeTombstoneEvent},
12 idm::delayed::DelayedAction,
13};
14
15use kanidm_proto::internal::{
16 DomainInfo as ProtoDomainInfo, DomainUpgradeCheckReport as ProtoDomainUpgradeCheckReport,
17};
18
19impl QueryServerReadV1 {
20 #[instrument(
21 level = "info",
22 skip_all,
23 fields(uuid = ?eventid)
24 )]
25 pub(crate) async fn handle_domain_show(
26 &self,
27 eventid: Uuid,
28 ) -> Result<ProtoDomainInfo, OperationError> {
29 let mut idms_prox_read = self.idms.proxy_read().await?;
30
31 idms_prox_read.qs_read.domain_info()
32 }
33
34 #[instrument(
35 level = "info",
36 skip_all,
37 fields(uuid = ?eventid)
38 )]
39 pub(crate) async fn handle_domain_upgrade_check(
40 &self,
41 eventid: Uuid,
42 ) -> Result<ProtoDomainUpgradeCheckReport, OperationError> {
43 let mut idms_prox_read = self.idms.proxy_read().await?;
44
45 idms_prox_read.qs_read.domain_upgrade_check()
46 }
47}
48
49impl QueryServerWriteV1 {
50 #[instrument(
51 level = "info",
52 skip_all,
53 fields(uuid = ?msg.eventid)
54 )]
55 pub async fn handle_purgetombstoneevent(&self, msg: PurgeTombstoneEvent) {
56 let Ok(mut idms_prox_write) = self.idms.proxy_write(duration_from_epoch_now()).await else {
57 warn!("Unable to start purge tombstone event, will retry later");
58 return;
59 };
60
61 let res = idms_prox_write
62 .qs_write
63 .purge_tombstones()
64 .and_then(|_changed| idms_prox_write.commit());
65
66 match res {
67 Ok(()) => {
68 debug!("Purge tombstone success");
69 }
70 Err(err) => {
71 error!(?err, "Unable to purge tombstones");
72 }
73 }
74 }
75
76 #[instrument(
77 level = "info",
78 skip_all,
79 fields(uuid = ?msg.eventid)
80 )]
81 pub async fn handle_purgerecycledevent(&self, msg: PurgeRecycledEvent) {
82 let ct = duration_from_epoch_now();
83 let Ok(mut idms_prox_write) = self.idms.proxy_write(ct).await else {
84 warn!("Unable to start purge recycled event, will retry later");
85 return;
86 };
87 let res = idms_prox_write
88 .qs_write
89 .purge_recycled()
90 .and_then(|touched| {
91 if touched > 0 {
93 idms_prox_write.commit()
94 } else {
95 Ok(())
96 }
97 });
98
99 match res {
100 Ok(()) => {
101 debug!("Purge recyclebin success");
102 }
103 Err(err) => {
104 error!(?err, "Unable to purge recyclebin");
105 }
106 }
107 }
108
109 pub(crate) async fn handle_delayedaction(&self, da_batch: &mut Vec<DelayedAction>) {
110 let eventid = Uuid::new_v4();
111 let span = span!(Level::INFO, "process_delayed_action", uuid = ?eventid);
112
113 let mut retry = false;
114
115 async {
116 let ct = duration_from_epoch_now();
117 match self.idms.proxy_write(ct).await {
118 Ok(mut idms_prox_write) => {
119 for da in da_batch.iter() {
120 retry = idms_prox_write.process_delayedaction(da, ct).is_err();
121 if retry {
122 warn!("delayed action failed, will be retried individually.");
124 break;
125 }
126 }
127
128 if let Err(res) = idms_prox_write.commit() {
129 retry = true;
130 error!(?res, "delayed action batch commit error");
131 }
132 }
133 Err(err) => {
134 error!(?err, "unable to process delayed actions");
135 }
136 }
137 }
138 .instrument(span)
139 .await;
140
141 if retry {
142 for da in da_batch.iter() {
144 let eventid = Uuid::new_v4();
145 let span = span!(Level::INFO, "process_delayed_action_retried", uuid = ?eventid);
146
147 async {
148 let ct = duration_from_epoch_now();
149
150 match self.idms.proxy_write(ct).await {
151 Ok(mut idms_prox_write) => {
152 if let Err(res) = idms_prox_write
153 .process_delayedaction(da, ct)
154 .and_then(|_| idms_prox_write.commit())
155 {
156 error!(?res, "delayed action commit error");
157 }
158 }
159 Err(err) => {
160 error!(?err, "unable to process delayed actions");
161 }
162 }
163 }
164 .instrument(span)
165 .await
166 }
167 }
168
169 da_batch.clear();
171 }
172
173 #[instrument(
174 level = "info",
175 skip(self, eventid),
176 fields(uuid = ?eventid)
177 )]
178 pub(crate) async fn handle_admin_recover_account(
179 &self,
180 name: String,
181 eventid: Uuid,
182 ) -> Result<String, OperationError> {
183 let ct = duration_from_epoch_now();
184 let mut idms_prox_write = self.idms.proxy_write(ct).await?;
185 let pw = idms_prox_write.recover_account(name.as_str(), None)?;
186
187 idms_prox_write.commit().map(|()| pw)
188 }
189
190 #[instrument(
191 level = "info",
192 skip_all,
193 fields(uuid = ?eventid)
194 )]
195 pub(crate) async fn handle_domain_raise(&self, eventid: Uuid) -> Result<u32, OperationError> {
196 let ct = duration_from_epoch_now();
197 let mut idms_prox_write = self.idms.proxy_write(ct).await?;
198
199 idms_prox_write.qs_write.domain_raise(DOMAIN_MAX_LEVEL)?;
200
201 idms_prox_write.commit().map(|()| DOMAIN_MAX_LEVEL)
202 }
203
204 #[instrument(
205 level = "info",
206 skip(self, eventid),
207 fields(uuid = ?eventid)
208 )]
209 pub(crate) async fn handle_domain_remigrate(
210 &self,
211 level: Option<u32>,
212 eventid: Uuid,
213 ) -> Result<(), OperationError> {
214 let level = level.unwrap_or(DOMAIN_MIN_REMIGRATION_LEVEL);
215 let ct = duration_from_epoch_now();
216 let mut idms_prox_write = self.idms.proxy_write(ct).await?;
217
218 idms_prox_write.qs_write.domain_remigrate(level)?;
219
220 idms_prox_write.commit()
221 }
222}