kanidmd_core/actors/
internal.rs1use crate::{QueryServerReadV1, QueryServerWriteV1};
6use kanidm_proto::internal::{
7    DomainInfo as ProtoDomainInfo, DomainUpgradeCheckReport as ProtoDomainUpgradeCheckReport,
8};
9use kanidmd_lib::prelude::*;
10use kanidmd_lib::{
11    event::{PurgeDeleteAfterEvent, PurgeRecycledEvent, PurgeTombstoneEvent},
12    idm::delayed::DelayedAction,
13};
14use tracing::{Instrument, Level};
15
16impl QueryServerReadV1 {
17    #[instrument(
18        level = "info",
19        skip_all,
20        fields(uuid = ?eventid)
21    )]
22    pub(crate) async fn handle_domain_show(
23        &self,
24        eventid: Uuid,
25    ) -> Result<ProtoDomainInfo, OperationError> {
26        let mut idms_prox_read = self.idms.proxy_read().await?;
27
28        idms_prox_read.qs_read.domain_info()
29    }
30
31    #[instrument(
32        level = "info",
33        skip_all,
34        fields(uuid = ?eventid)
35    )]
36    pub(crate) async fn handle_domain_upgrade_check(
37        &self,
38        eventid: Uuid,
39    ) -> Result<ProtoDomainUpgradeCheckReport, OperationError> {
40        let mut idms_prox_read = self.idms.proxy_read().await?;
41
42        idms_prox_read.qs_read.domain_upgrade_check()
43    }
44}
45
46impl QueryServerWriteV1 {
47    #[instrument(
48        level = "info",
49        skip_all,
50        fields(uuid = ?msg.eventid)
51    )]
52    pub async fn handle_purgetombstoneevent(&self, msg: PurgeTombstoneEvent) {
53        let Ok(mut idms_prox_write) = self.idms.proxy_write(duration_from_epoch_now()).await else {
54            warn!("Unable to start purge tombstone event, will retry later");
55            return;
56        };
57
58        let res = idms_prox_write
59            .qs_write
60            .purge_tombstones()
61            .and_then(|_changed| idms_prox_write.commit());
62
63        match res {
64            Ok(()) => {
65                debug!("Purge tombstone success");
66            }
67            Err(err) => {
68                error!(?err, "Unable to purge tombstones");
69            }
70        }
71    }
72
73    #[instrument(
74        level = "info",
75        skip_all,
76        fields(uuid = ?msg.eventid)
77    )]
78    pub async fn handle_purgerecycledevent(&self, msg: PurgeRecycledEvent) {
79        let ct = duration_from_epoch_now();
80        let Ok(mut idms_prox_write) = self.idms.proxy_write(ct).await else {
81            warn!("Unable to start purge recycled event, will retry later");
82            return;
83        };
84        let _ = idms_prox_write
85            .qs_write
86            .purge_recycled()
87            .and_then(|touched| {
88                if touched > 0 {
90                    idms_prox_write.commit()
91                } else {
92                    Ok(())
93                }
94            })
95            .inspect_err(|err| error!(?err, "Unable to purge recycle bin entries"));
96    }
97
98    #[instrument(
99        level = "info",
100        skip_all,
101        fields(uuid = ?msg.eventid)
102    )]
103    pub async fn handle_purge_delete_after_event(&self, msg: PurgeDeleteAfterEvent) {
104        let ct = duration_from_epoch_now();
105        let Ok(mut idms_prox_write) = self.idms.proxy_write(ct).await else {
106            warn!("Unable to start purge delete after event, will retry later");
107            return;
108        };
109        let _ = idms_prox_write
110            .qs_write
111            .purge_delete_after()
112            .and_then(|touched| {
113                if touched > 0 {
115                    idms_prox_write.commit()
116                } else {
117                    Ok(())
118                }
119            })
120            .inspect_err(|err| error!(?err, "Unable to purge delete after entries"));
121    }
122
123    pub(crate) async fn handle_delayedaction(&self, da_batch: &mut Vec<DelayedAction>) {
124        let eventid = Uuid::new_v4();
125        let span = span!(Level::INFO, "process_delayed_action", uuid = ?eventid);
126
127        let mut retry = false;
128
129        async {
130            let ct = duration_from_epoch_now();
131            match self.idms.proxy_write(ct).await {
132                Ok(mut idms_prox_write) => {
133                    for da in da_batch.iter() {
134                        retry = idms_prox_write.process_delayedaction(da, ct).is_err();
135                        if retry {
136                            warn!("delayed action failed, will be retried individually.");
138                            break;
139                        }
140                    }
141
142                    if let Err(res) = idms_prox_write.commit() {
143                        retry = true;
144                        error!(?res, "delayed action batch commit error");
145                    }
146                }
147                Err(err) => {
148                    error!(?err, "unable to process delayed actions");
149                }
150            }
151        }
152        .instrument(span)
153        .await;
154
155        if retry {
156            for da in da_batch.iter() {
158                let eventid = Uuid::new_v4();
159                let span = span!(Level::INFO, "process_delayed_action_retried", uuid = ?eventid);
160
161                async {
162                    let ct = duration_from_epoch_now();
163
164                    match self.idms.proxy_write(ct).await {
165                        Ok(mut idms_prox_write) => {
166                            if let Err(res) = idms_prox_write
167                                .process_delayedaction(da, ct)
168                                .and_then(|_| idms_prox_write.commit())
169                            {
170                                error!(?res, "delayed action commit error");
171                            }
172                        }
173                        Err(err) => {
174                            error!(?err, "unable to process delayed actions");
175                        }
176                    }
177                }
178                .instrument(span)
179                .await
180            }
181        }
182
183        da_batch.clear();
185    }
186
187    #[instrument(
188        level = "info",
189        skip(self, eventid),
190        fields(uuid = ?eventid)
191    )]
192    pub(crate) async fn handle_admin_recover_account(
193        &self,
194        name: String,
195        eventid: Uuid,
196    ) -> Result<String, OperationError> {
197        let ct = duration_from_epoch_now();
198        let mut idms_prox_write = self.idms.proxy_write(ct).await?;
199        let pw = idms_prox_write.recover_account(name.as_str(), None)?;
200
201        idms_prox_write.commit().map(|()| pw)
202    }
203
204    #[instrument(
205        level = "info",
206        skip(self, eventid),
207        fields(uuid = ?eventid)
208    )]
209    pub(crate) async fn handle_admin_disable_account(
210        &self,
211        name: String,
212        eventid: Uuid,
213    ) -> Result<(), OperationError> {
214        let ct = duration_from_epoch_now();
215        let mut idms_prox_write = self.idms.proxy_write(ct).await?;
216        idms_prox_write.disable_account(name.as_str())?;
217
218        idms_prox_write.commit()
219    }
220
221    #[instrument(
222        level = "info",
223        skip_all,
224        fields(uuid = ?eventid)
225    )]
226    pub(crate) async fn handle_domain_raise(&self, eventid: Uuid) -> Result<u32, OperationError> {
227        let ct = duration_from_epoch_now();
228        let mut idms_prox_write = self.idms.proxy_write(ct).await?;
229
230        idms_prox_write.qs_write.domain_raise(DOMAIN_MAX_LEVEL)?;
231
232        idms_prox_write.commit().map(|()| DOMAIN_MAX_LEVEL)
233    }
234
235    #[instrument(
236        level = "info",
237        skip(self, eventid),
238        fields(uuid = ?eventid)
239    )]
240    pub(crate) async fn handle_domain_remigrate(
241        &self,
242        level: Option<u32>,
243        eventid: Uuid,
244    ) -> Result<(), OperationError> {
245        let level = level.unwrap_or(DOMAIN_MIN_REMIGRATION_LEVEL);
246        let ct = duration_from_epoch_now();
247        let mut idms_prox_write = self.idms.proxy_write(ct).await?;
248
249        idms_prox_write.qs_write.domain_remigrate(level)?;
250
251        idms_prox_write.commit()
252    }
253}