kanidmd_core/actors/
internal.rs

1//! ⚠️  Operations in this set of actor handlers are INTERNAL and MAY bypass
2//! access controls. Access is *IMPLIED* by the use of these via the internal
3//! admin unixd socket.
4
5use crate::{QueryServerReadV1, QueryServerWriteV1};
6use tracing::{Instrument, Level};
7
8use kanidmd_lib::prelude::*;
9
10use kanidmd_lib::{
11    event::{PurgeRecycledEvent, PurgeTombstoneEvent},
12    idm::delayed::DelayedAction,
13};
14
15use kanidm_proto::internal::{
16    DomainInfo as ProtoDomainInfo, DomainUpgradeCheckReport as ProtoDomainUpgradeCheckReport,
17};
18
19impl QueryServerReadV1 {
20    #[instrument(
21        level = "info",
22        skip_all,
23        fields(uuid = ?eventid)
24    )]
25    pub(crate) async fn handle_domain_show(
26        &self,
27        eventid: Uuid,
28    ) -> Result<ProtoDomainInfo, OperationError> {
29        let mut idms_prox_read = self.idms.proxy_read().await?;
30
31        idms_prox_read.qs_read.domain_info()
32    }
33
34    #[instrument(
35        level = "info",
36        skip_all,
37        fields(uuid = ?eventid)
38    )]
39    pub(crate) async fn handle_domain_upgrade_check(
40        &self,
41        eventid: Uuid,
42    ) -> Result<ProtoDomainUpgradeCheckReport, OperationError> {
43        let mut idms_prox_read = self.idms.proxy_read().await?;
44
45        idms_prox_read.qs_read.domain_upgrade_check()
46    }
47}
48
49impl QueryServerWriteV1 {
50    #[instrument(
51        level = "info",
52        skip_all,
53        fields(uuid = ?msg.eventid)
54    )]
55    pub async fn handle_purgetombstoneevent(&self, msg: PurgeTombstoneEvent) {
56        let Ok(mut idms_prox_write) = self.idms.proxy_write(duration_from_epoch_now()).await else {
57            warn!("Unable to start purge tombstone event, will retry later");
58            return;
59        };
60
61        let res = idms_prox_write
62            .qs_write
63            .purge_tombstones()
64            .and_then(|_changed| idms_prox_write.commit());
65
66        match res {
67            Ok(()) => {
68                debug!("Purge tombstone success");
69            }
70            Err(err) => {
71                error!(?err, "Unable to purge tombstones");
72            }
73        }
74    }
75
76    #[instrument(
77        level = "info",
78        skip_all,
79        fields(uuid = ?msg.eventid)
80    )]
81    pub async fn handle_purgerecycledevent(&self, msg: PurgeRecycledEvent) {
82        let ct = duration_from_epoch_now();
83        let Ok(mut idms_prox_write) = self.idms.proxy_write(ct).await else {
84            warn!("Unable to start purge recycled event, will retry later");
85            return;
86        };
87        let res = idms_prox_write
88            .qs_write
89            .purge_recycled()
90            .and_then(|touched| {
91                // don't need to commit a txn with no changes
92                if touched > 0 {
93                    idms_prox_write.commit()
94                } else {
95                    Ok(())
96                }
97            });
98
99        match res {
100            Ok(()) => {
101                debug!("Purge recyclebin success");
102            }
103            Err(err) => {
104                error!(?err, "Unable to purge recyclebin");
105            }
106        }
107    }
108
109    pub(crate) async fn handle_delayedaction(&self, da_batch: &mut Vec<DelayedAction>) {
110        let eventid = Uuid::new_v4();
111        let span = span!(Level::INFO, "process_delayed_action", uuid = ?eventid);
112
113        let mut retry = false;
114
115        async {
116            let ct = duration_from_epoch_now();
117            match self.idms.proxy_write(ct).await {
118                Ok(mut idms_prox_write) => {
119                    for da in da_batch.iter() {
120                        retry = idms_prox_write.process_delayedaction(da, ct).is_err();
121                        if retry {
122                            // exit the loop
123                            warn!("delayed action failed, will be retried individually.");
124                            break;
125                        }
126                    }
127
128                    if let Err(res) = idms_prox_write.commit() {
129                        retry = true;
130                        error!(?res, "delayed action batch commit error");
131                    }
132                }
133                Err(err) => {
134                    error!(?err, "unable to process delayed actions");
135                }
136            }
137        }
138        .instrument(span)
139        .await;
140
141        if retry {
142            // An error occurred, retry each operation one at a time.
143            for da in da_batch.iter() {
144                let eventid = Uuid::new_v4();
145                let span = span!(Level::INFO, "process_delayed_action_retried", uuid = ?eventid);
146
147                async {
148                    let ct = duration_from_epoch_now();
149
150                    match self.idms.proxy_write(ct).await {
151                        Ok(mut idms_prox_write) => {
152                            if let Err(res) = idms_prox_write
153                                .process_delayedaction(da, ct)
154                                .and_then(|_| idms_prox_write.commit())
155                            {
156                                error!(?res, "delayed action commit error");
157                            }
158                        }
159                        Err(err) => {
160                            error!(?err, "unable to process delayed actions");
161                        }
162                    }
163                }
164                .instrument(span)
165                .await
166            }
167        }
168
169        // We're done, clear out the buffer.
170        da_batch.clear();
171    }
172
173    #[instrument(
174        level = "info",
175        skip(self, eventid),
176        fields(uuid = ?eventid)
177    )]
178    pub(crate) async fn handle_admin_recover_account(
179        &self,
180        name: String,
181        eventid: Uuid,
182    ) -> Result<String, OperationError> {
183        let ct = duration_from_epoch_now();
184        let mut idms_prox_write = self.idms.proxy_write(ct).await?;
185        let pw = idms_prox_write.recover_account(name.as_str(), None)?;
186
187        idms_prox_write.commit().map(|()| pw)
188    }
189
190    #[instrument(
191        level = "info",
192        skip_all,
193        fields(uuid = ?eventid)
194    )]
195    pub(crate) async fn handle_domain_raise(&self, eventid: Uuid) -> Result<u32, OperationError> {
196        let ct = duration_from_epoch_now();
197        let mut idms_prox_write = self.idms.proxy_write(ct).await?;
198
199        idms_prox_write.qs_write.domain_raise(DOMAIN_MAX_LEVEL)?;
200
201        idms_prox_write.commit().map(|()| DOMAIN_MAX_LEVEL)
202    }
203
204    #[instrument(
205        level = "info",
206        skip(self, eventid),
207        fields(uuid = ?eventid)
208    )]
209    pub(crate) async fn handle_domain_remigrate(
210        &self,
211        level: Option<u32>,
212        eventid: Uuid,
213    ) -> Result<(), OperationError> {
214        let level = level.unwrap_or(DOMAIN_MIN_REMIGRATION_LEVEL);
215        let ct = duration_from_epoch_now();
216        let mut idms_prox_write = self.idms.proxy_write(ct).await?;
217
218        idms_prox_write.qs_write.domain_remigrate(level)?;
219
220        idms_prox_write.commit()
221    }
222}