kanidmd_core/
admin.rs

1use crate::actors::{QueryServerReadV1, QueryServerWriteV1};
2use crate::repl::ReplCtrl;
3use crate::CoreAction;
4use bytes::{BufMut, BytesMut};
5use futures::{SinkExt, StreamExt};
6use kanidm_lib_crypto::serialise::x509b64;
7use kanidm_utils_users::get_current_uid;
8use serde::{Deserialize, Serialize};
9use std::error::Error;
10use std::io;
11use std::path::Path;
12use tokio::net::{UnixListener, UnixStream};
13use tokio::sync::broadcast;
14use tokio::sync::mpsc;
15use tokio::sync::oneshot;
16use tokio_util::codec::{Decoder, Encoder, Framed};
17use tracing::{span, Instrument, Level};
18use uuid::Uuid;
19
20pub use kanidm_proto::internal::{
21    DomainInfo as ProtoDomainInfo, DomainUpgradeCheckReport as ProtoDomainUpgradeCheckReport,
22    DomainUpgradeCheckStatus as ProtoDomainUpgradeCheckStatus,
23};
24
25#[derive(Serialize, Deserialize, Debug)]
26pub enum AdminTaskRequest {
27    RecoverAccount { name: String },
28    DisableAccount { name: String },
29    ShowReplicationCertificate,
30    RenewReplicationCertificate,
31    RefreshReplicationConsumer,
32    DomainShow,
33    DomainUpgradeCheck,
34    DomainRaise,
35    DomainRemigrate { level: Option<u32> },
36}
37
38#[derive(Serialize, Deserialize, Debug)]
39pub enum AdminTaskResponse {
40    RecoverAccount {
41        password: String,
42    },
43    ShowReplicationCertificate {
44        cert: String,
45    },
46    DomainUpgradeCheck {
47        report: ProtoDomainUpgradeCheckReport,
48    },
49    DomainRaise {
50        level: u32,
51    },
52    DomainShow {
53        domain_info: ProtoDomainInfo,
54    },
55    Success,
56    Error,
57}
58
59#[derive(Default)]
60pub struct ClientCodec;
61
62impl Decoder for ClientCodec {
63    type Error = io::Error;
64    type Item = AdminTaskResponse;
65
66    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
67        trace!("Attempting to decode request ...");
68        match serde_json::from_slice::<AdminTaskResponse>(src) {
69            Ok(msg) => {
70                // Clear the buffer for the next message.
71                src.clear();
72                Ok(Some(msg))
73            }
74            _ => Ok(None),
75        }
76    }
77}
78
79impl Encoder<AdminTaskRequest> for ClientCodec {
80    type Error = io::Error;
81
82    fn encode(&mut self, msg: AdminTaskRequest, dst: &mut BytesMut) -> Result<(), Self::Error> {
83        trace!("Attempting to send response -> {:?} ...", msg);
84        let data = serde_json::to_vec(&msg).map_err(|e| {
85            error!("socket encoding error -> {:?}", e);
86            io::Error::other("JSON encode error")
87        })?;
88        dst.put(data.as_slice());
89        Ok(())
90    }
91}
92
93#[derive(Default)]
94struct ServerCodec;
95
96impl Decoder for ServerCodec {
97    type Error = io::Error;
98    type Item = AdminTaskRequest;
99
100    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
101        trace!("Attempting to decode request ...");
102        match serde_json::from_slice::<AdminTaskRequest>(src) {
103            Ok(msg) => {
104                // Clear the buffer for the next message.
105                src.clear();
106                Ok(Some(msg))
107            }
108            _ => Ok(None),
109        }
110    }
111}
112
113impl Encoder<AdminTaskResponse> for ServerCodec {
114    type Error = io::Error;
115
116    fn encode(&mut self, msg: AdminTaskResponse, dst: &mut BytesMut) -> Result<(), Self::Error> {
117        trace!("Attempting to send response -> {:?} ...", msg);
118        let data = serde_json::to_vec(&msg).map_err(|e| {
119            error!("socket encoding error -> {:?}", e);
120            io::Error::other("JSON encode error")
121        })?;
122        dst.put(data.as_slice());
123        Ok(())
124    }
125}
126
127pub(crate) struct AdminActor;
128
129impl AdminActor {
130    pub async fn create_admin_sock(
131        sock_path: &str,
132        server_rw: &'static QueryServerWriteV1,
133        server_ro: &'static QueryServerReadV1,
134        mut broadcast_rx: broadcast::Receiver<CoreAction>,
135        repl_ctrl_tx: Option<mpsc::Sender<ReplCtrl>>,
136    ) -> Result<tokio::task::JoinHandle<()>, ()> {
137        debug!("🧹 Cleaning up sockets from previous invocations");
138        rm_if_exist(sock_path);
139
140        // Setup the unix socket.
141        let listener = match UnixListener::bind(sock_path) {
142            Ok(l) => l,
143            Err(e) => {
144                error!(err = ?e, "Failed to bind UNIX socket {}", sock_path);
145                return Err(());
146            }
147        };
148
149        // what is the uid we are running as?
150        let cuid = get_current_uid();
151
152        let handle = tokio::spawn(async move {
153            loop {
154                tokio::select! {
155                    Ok(action) = broadcast_rx.recv() => {
156                        match action {
157                            CoreAction::Shutdown => break,
158                        }
159                    }
160                    accept_res = listener.accept() => {
161                        match accept_res {
162                            Ok((socket, _addr)) => {
163                                // Assert that the incoming connection is from root or
164                                // our own uid.
165                                // ⚠️  This underpins the security of this socket ⚠️
166                                if let Ok(ucred) = socket.peer_cred() {
167                                    let incoming_uid = ucred.uid();
168                                    if incoming_uid == 0 || incoming_uid == cuid {
169                                        // all good!
170                                        info!(pid = ?ucred.pid(), "Allowing admin socket access");
171                                    } else {
172                                        warn!(%incoming_uid, "unauthorised user");
173                                        continue;
174                                    }
175                                } else {
176                                    error!("unable to determine peer credentials");
177                                    continue;
178                                };
179
180                                // spawn the worker.
181                                let task_repl_ctrl_tx = repl_ctrl_tx.clone();
182                                tokio::spawn(async move {
183                                    if let Err(e) = handle_client(socket, server_rw, server_ro, task_repl_ctrl_tx).await {
184                                        error!(err = ?e, "admin client error");
185                                    }
186                                });
187                            }
188                            Err(e) => {
189                                warn!(err = ?e, "admin socket accept error");
190                            }
191                        }
192                    }
193                }
194            }
195            info!("Stopped {}", super::TaskName::AdminSocket);
196        });
197        Ok(handle)
198    }
199}
200
201fn rm_if_exist(p: &str) {
202    if Path::new(p).exists() {
203        debug!("Removing requested file {:?}", p);
204        let _ = std::fs::remove_file(p).map_err(|e| {
205            error!(
206                "Failure while attempting to attempting to remove {:?} -> {:?}",
207                p, e
208            );
209        });
210    } else {
211        debug!("Path {:?} doesn't exist, not attempting to remove.", p);
212    }
213}
214
215async fn show_replication_certificate(ctrl_tx: &mut mpsc::Sender<ReplCtrl>) -> AdminTaskResponse {
216    let (tx, rx) = oneshot::channel();
217
218    if ctrl_tx
219        .send(ReplCtrl::GetCertificate { respond: tx })
220        .await
221        .is_err()
222    {
223        error!("replication control channel has shutdown");
224        return AdminTaskResponse::Error;
225    }
226
227    match rx.await {
228        Ok(cert) => x509b64::cert_to_string(&cert)
229            .map(|cert| AdminTaskResponse::ShowReplicationCertificate { cert })
230            .unwrap_or(AdminTaskResponse::Error),
231        Err(_) => {
232            error!("replication control channel did not respond with certificate.");
233            AdminTaskResponse::Error
234        }
235    }
236}
237
238async fn renew_replication_certificate(ctrl_tx: &mut mpsc::Sender<ReplCtrl>) -> AdminTaskResponse {
239    let (tx, rx) = oneshot::channel();
240
241    if ctrl_tx
242        .send(ReplCtrl::RenewCertificate { respond: tx })
243        .await
244        .is_err()
245    {
246        error!("replication control channel has shutdown");
247        return AdminTaskResponse::Error;
248    }
249
250    match rx.await {
251        Ok(success) => {
252            if success {
253                show_replication_certificate(ctrl_tx).await
254            } else {
255                error!("replication control channel indicated that certificate renewal failed.");
256                AdminTaskResponse::Error
257            }
258        }
259        Err(_) => {
260            error!("replication control channel did not respond with renewal status.");
261            AdminTaskResponse::Error
262        }
263    }
264}
265
266async fn replication_consumer_refresh(ctrl_tx: &mut mpsc::Sender<ReplCtrl>) -> AdminTaskResponse {
267    let (tx, rx) = oneshot::channel();
268
269    if ctrl_tx
270        .send(ReplCtrl::RefreshConsumer { respond: tx })
271        .await
272        .is_err()
273    {
274        error!("replication control channel has shutdown");
275        return AdminTaskResponse::Error;
276    }
277
278    match rx.await {
279        Ok(mut refresh_rx) => {
280            if let Some(()) = refresh_rx.recv().await {
281                info!("Replication refresh success");
282                AdminTaskResponse::Success
283            } else {
284                error!("Replication refresh failed. Please inspect the logs.");
285                AdminTaskResponse::Error
286            }
287        }
288        Err(_) => {
289            error!("replication control channel did not respond with refresh status.");
290            AdminTaskResponse::Error
291        }
292    }
293}
294
295async fn handle_client(
296    sock: UnixStream,
297    server_rw: &'static QueryServerWriteV1,
298    server_ro: &'static QueryServerReadV1,
299    mut repl_ctrl_tx: Option<mpsc::Sender<ReplCtrl>>,
300) -> Result<(), Box<dyn Error>> {
301    debug!("Accepted admin socket connection");
302
303    let mut reqs = Framed::new(sock, ServerCodec);
304
305    trace!("Waiting for requests ...");
306    while let Some(Ok(req)) = reqs.next().await {
307        // Setup the logging span
308        let eventid = Uuid::new_v4();
309        let nspan = span!(Level::INFO, "handle_admin_client_request", uuid = ?eventid);
310
311        let resp = async {
312            match req {
313                AdminTaskRequest::RecoverAccount { name } => {
314                    match server_rw.handle_admin_recover_account(name, eventid).await {
315                        Ok(password) => AdminTaskResponse::RecoverAccount { password },
316                        Err(e) => {
317                            error!(err = ?e, "error during recover-account");
318                            AdminTaskResponse::Error
319                        }
320                    }
321                }
322                AdminTaskRequest::DisableAccount { name } => {
323                    match server_rw.handle_admin_disable_account(name, eventid).await {
324                        Ok(()) => AdminTaskResponse::Success,
325                        Err(e) => {
326                            error!(err = ?e, "error during disable-account");
327                            AdminTaskResponse::Error
328                        }
329                    }
330                }
331                AdminTaskRequest::ShowReplicationCertificate => match repl_ctrl_tx.as_mut() {
332                    Some(ctrl_tx) => show_replication_certificate(ctrl_tx).await,
333                    None => {
334                        error!("replication not configured, unable to display certificate.");
335                        AdminTaskResponse::Error
336                    }
337                },
338                AdminTaskRequest::RenewReplicationCertificate => match repl_ctrl_tx.as_mut() {
339                    Some(ctrl_tx) => renew_replication_certificate(ctrl_tx).await,
340                    None => {
341                        error!("replication not configured, unable to renew certificate.");
342                        AdminTaskResponse::Error
343                    }
344                },
345                AdminTaskRequest::RefreshReplicationConsumer => match repl_ctrl_tx.as_mut() {
346                    Some(ctrl_tx) => replication_consumer_refresh(ctrl_tx).await,
347                    None => {
348                        error!("replication not configured, unable to refresh consumer.");
349                        AdminTaskResponse::Error
350                    }
351                },
352
353                AdminTaskRequest::DomainShow => match server_ro.handle_domain_show(eventid).await {
354                    Ok(domain_info) => AdminTaskResponse::DomainShow { domain_info },
355                    Err(e) => {
356                        error!(err = ?e, "error during domain show");
357                        AdminTaskResponse::Error
358                    }
359                },
360                AdminTaskRequest::DomainUpgradeCheck => {
361                    match server_ro.handle_domain_upgrade_check(eventid).await {
362                        Ok(report) => AdminTaskResponse::DomainUpgradeCheck { report },
363                        Err(e) => {
364                            error!(err = ?e, "error during domain upgrade checkr");
365                            AdminTaskResponse::Error
366                        }
367                    }
368                }
369                AdminTaskRequest::DomainRaise => match server_rw.handle_domain_raise(eventid).await
370                {
371                    Ok(level) => AdminTaskResponse::DomainRaise { level },
372                    Err(e) => {
373                        error!(err = ?e, "error during domain raise");
374                        AdminTaskResponse::Error
375                    }
376                },
377                AdminTaskRequest::DomainRemigrate { level } => {
378                    match server_rw.handle_domain_remigrate(level, eventid).await {
379                        Ok(()) => AdminTaskResponse::Success,
380                        Err(e) => {
381                            error!(err = ?e, "error during domain remigrate");
382                            AdminTaskResponse::Error
383                        }
384                    }
385                }
386            }
387        }
388        .instrument(nspan)
389        .await;
390
391        reqs.send(resp).await?;
392        reqs.flush().await?;
393    }
394
395    debug!("Disconnecting client ...");
396    Ok(())
397}