kanidmd_core/
admin.rs

1use crate::actors::{QueryServerReadV1, QueryServerWriteV1};
2use crate::repl::ReplCtrl;
3use crate::CoreAction;
4use bytes::{BufMut, BytesMut};
5use futures::{SinkExt, StreamExt};
6use kanidm_lib_crypto::serialise::x509b64;
7use kanidm_utils_users::get_current_uid;
8use serde::{Deserialize, Serialize};
9use std::error::Error;
10use std::io;
11use std::path::Path;
12use tokio::net::{UnixListener, UnixStream};
13use tokio::sync::broadcast;
14use tokio::sync::mpsc;
15use tokio::sync::oneshot;
16use tokio_util::codec::{Decoder, Encoder, Framed};
17use tracing::{span, Instrument, Level};
18use uuid::Uuid;
19
20pub use kanidm_proto::internal::{
21    DomainInfo as ProtoDomainInfo, DomainUpgradeCheckReport as ProtoDomainUpgradeCheckReport,
22    DomainUpgradeCheckStatus as ProtoDomainUpgradeCheckStatus,
23};
24
/// Requests that a client can submit over the admin UNIX socket.
///
/// Each request is answered with exactly one [`AdminTaskResponse`]; any failure while
/// servicing a request is reported as [`AdminTaskResponse::Error`].
#[derive(Serialize, Deserialize, Debug)]
pub enum AdminTaskRequest {
    /// Recover the account called `name`. Answered with
    /// [`AdminTaskResponse::RecoverAccount`] on success.
    RecoverAccount { name: String },
    /// Fetch the replication certificate. Answered with
    /// [`AdminTaskResponse::ShowReplicationCertificate`] on success.
    ShowReplicationCertificate,
    /// Renew the replication certificate. On success the certificate is fetched again and
    /// returned as [`AdminTaskResponse::ShowReplicationCertificate`].
    RenewReplicationCertificate,
    /// Trigger a replication consumer refresh. Answered with
    /// [`AdminTaskResponse::Success`] on success.
    RefreshReplicationConsumer,
    /// Show domain information. Answered with [`AdminTaskResponse::DomainShow`] on success.
    DomainShow,
    /// Run the domain upgrade check. Answered with
    /// [`AdminTaskResponse::DomainUpgradeCheck`] on success.
    DomainUpgradeCheck,
    /// Raise the domain level. Answered with [`AdminTaskResponse::DomainRaise`] on success.
    DomainRaise,
    /// Remigrate the domain, optionally naming a `level`. Answered with
    /// [`AdminTaskResponse::Success`] on success.
    DomainRemigrate { level: Option<u32> },
}
36
/// Replies sent back to a client of the admin UNIX socket, one per [`AdminTaskRequest`].
#[derive(Serialize, Deserialize, Debug)]
pub enum AdminTaskResponse {
    /// Result of a successful [`AdminTaskRequest::RecoverAccount`].
    RecoverAccount {
        /// The password returned by the account recovery operation.
        password: String,
    },
    /// Result of a successful certificate show or renew request.
    ShowReplicationCertificate {
        /// The replication certificate rendered as a string.
        cert: String,
    },
    /// Result of a successful [`AdminTaskRequest::DomainUpgradeCheck`].
    DomainUpgradeCheck {
        /// The report produced by the upgrade check.
        report: ProtoDomainUpgradeCheckReport,
    },
    /// Result of a successful [`AdminTaskRequest::DomainRaise`].
    DomainRaise {
        /// The level returned by the raise operation.
        level: u32,
    },
    /// Result of a successful [`AdminTaskRequest::DomainShow`].
    DomainShow {
        /// The domain information that was looked up.
        domain_info: ProtoDomainInfo,
    },
    /// The request completed and there is no further data to return.
    Success,
    /// The request failed; details are written to the server log only.
    Error,
}
57
58#[derive(Default)]
59pub struct ClientCodec;
60
61impl Decoder for ClientCodec {
62    type Error = io::Error;
63    type Item = AdminTaskResponse;
64
65    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
66        trace!("Attempting to decode request ...");
67        match serde_json::from_slice::<AdminTaskResponse>(src) {
68            Ok(msg) => {
69                // Clear the buffer for the next message.
70                src.clear();
71                Ok(Some(msg))
72            }
73            _ => Ok(None),
74        }
75    }
76}
77
78impl Encoder<AdminTaskRequest> for ClientCodec {
79    type Error = io::Error;
80
81    fn encode(&mut self, msg: AdminTaskRequest, dst: &mut BytesMut) -> Result<(), Self::Error> {
82        trace!("Attempting to send response -> {:?} ...", msg);
83        let data = serde_json::to_vec(&msg).map_err(|e| {
84            error!("socket encoding error -> {:?}", e);
85            io::Error::new(io::ErrorKind::Other, "JSON encode error")
86        })?;
87        dst.put(data.as_slice());
88        Ok(())
89    }
90}
91
92#[derive(Default)]
93struct ServerCodec;
94
95impl Decoder for ServerCodec {
96    type Error = io::Error;
97    type Item = AdminTaskRequest;
98
99    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
100        trace!("Attempting to decode request ...");
101        match serde_json::from_slice::<AdminTaskRequest>(src) {
102            Ok(msg) => {
103                // Clear the buffer for the next message.
104                src.clear();
105                Ok(Some(msg))
106            }
107            _ => Ok(None),
108        }
109    }
110}
111
112impl Encoder<AdminTaskResponse> for ServerCodec {
113    type Error = io::Error;
114
115    fn encode(&mut self, msg: AdminTaskResponse, dst: &mut BytesMut) -> Result<(), Self::Error> {
116        trace!("Attempting to send response -> {:?} ...", msg);
117        let data = serde_json::to_vec(&msg).map_err(|e| {
118            error!("socket encoding error -> {:?}", e);
119            io::Error::new(io::ErrorKind::Other, "JSON encode error")
120        })?;
121        dst.put(data.as_slice());
122        Ok(())
123    }
124}
125
126pub(crate) struct AdminActor;
127
128impl AdminActor {
129    pub async fn create_admin_sock(
130        sock_path: &str,
131        server_rw: &'static QueryServerWriteV1,
132        server_ro: &'static QueryServerReadV1,
133        mut broadcast_rx: broadcast::Receiver<CoreAction>,
134        repl_ctrl_tx: Option<mpsc::Sender<ReplCtrl>>,
135    ) -> Result<tokio::task::JoinHandle<()>, ()> {
136        debug!("🧹 Cleaning up sockets from previous invocations");
137        rm_if_exist(sock_path);
138
139        // Setup the unix socket.
140        let listener = match UnixListener::bind(sock_path) {
141            Ok(l) => l,
142            Err(e) => {
143                error!(err = ?e, "Failed to bind UNIX socket {}", sock_path);
144                return Err(());
145            }
146        };
147
148        // what is the uid we are running as?
149        let cuid = get_current_uid();
150
151        let handle = tokio::spawn(async move {
152            loop {
153                tokio::select! {
154                    Ok(action) = broadcast_rx.recv() => {
155                        match action {
156                            CoreAction::Shutdown => break,
157                        }
158                    }
159                    accept_res = listener.accept() => {
160                        match accept_res {
161                            Ok((socket, _addr)) => {
162                                // Assert that the incoming connection is from root or
163                                // our own uid.
164                                // ⚠️  This underpins the security of this socket ⚠️
165                                if let Ok(ucred) = socket.peer_cred() {
166                                    let incoming_uid = ucred.uid();
167                                    if incoming_uid == 0 || incoming_uid == cuid {
168                                        // all good!
169                                        info!(pid = ?ucred.pid(), "Allowing admin socket access");
170                                    } else {
171                                        warn!(%incoming_uid, "unauthorised user");
172                                        continue;
173                                    }
174                                } else {
175                                    error!("unable to determine peer credentials");
176                                    continue;
177                                };
178
179                                // spawn the worker.
180                                let task_repl_ctrl_tx = repl_ctrl_tx.clone();
181                                tokio::spawn(async move {
182                                    if let Err(e) = handle_client(socket, server_rw, server_ro, task_repl_ctrl_tx).await {
183                                        error!(err = ?e, "admin client error");
184                                    }
185                                });
186                            }
187                            Err(e) => {
188                                warn!(err = ?e, "admin socket accept error");
189                            }
190                        }
191                    }
192                }
193            }
194            info!("Stopped {}", super::TaskName::AdminSocket);
195        });
196        Ok(handle)
197    }
198}
199
200fn rm_if_exist(p: &str) {
201    if Path::new(p).exists() {
202        debug!("Removing requested file {:?}", p);
203        let _ = std::fs::remove_file(p).map_err(|e| {
204            error!(
205                "Failure while attempting to attempting to remove {:?} -> {:?}",
206                p, e
207            );
208        });
209    } else {
210        debug!("Path {:?} doesn't exist, not attempting to remove.", p);
211    }
212}
213
214async fn show_replication_certificate(ctrl_tx: &mut mpsc::Sender<ReplCtrl>) -> AdminTaskResponse {
215    let (tx, rx) = oneshot::channel();
216
217    if ctrl_tx
218        .send(ReplCtrl::GetCertificate { respond: tx })
219        .await
220        .is_err()
221    {
222        error!("replication control channel has shutdown");
223        return AdminTaskResponse::Error;
224    }
225
226    match rx.await {
227        Ok(cert) => x509b64::cert_to_string(&cert)
228            .map(|cert| AdminTaskResponse::ShowReplicationCertificate { cert })
229            .unwrap_or(AdminTaskResponse::Error),
230        Err(_) => {
231            error!("replication control channel did not respond with certificate.");
232            AdminTaskResponse::Error
233        }
234    }
235}
236
237async fn renew_replication_certificate(ctrl_tx: &mut mpsc::Sender<ReplCtrl>) -> AdminTaskResponse {
238    let (tx, rx) = oneshot::channel();
239
240    if ctrl_tx
241        .send(ReplCtrl::RenewCertificate { respond: tx })
242        .await
243        .is_err()
244    {
245        error!("replication control channel has shutdown");
246        return AdminTaskResponse::Error;
247    }
248
249    match rx.await {
250        Ok(success) => {
251            if success {
252                show_replication_certificate(ctrl_tx).await
253            } else {
254                error!("replication control channel indicated that certificate renewal failed.");
255                AdminTaskResponse::Error
256            }
257        }
258        Err(_) => {
259            error!("replication control channel did not respond with renewal status.");
260            AdminTaskResponse::Error
261        }
262    }
263}
264
265async fn replication_consumer_refresh(ctrl_tx: &mut mpsc::Sender<ReplCtrl>) -> AdminTaskResponse {
266    let (tx, rx) = oneshot::channel();
267
268    if ctrl_tx
269        .send(ReplCtrl::RefreshConsumer { respond: tx })
270        .await
271        .is_err()
272    {
273        error!("replication control channel has shutdown");
274        return AdminTaskResponse::Error;
275    }
276
277    match rx.await {
278        Ok(mut refresh_rx) => {
279            if let Some(()) = refresh_rx.recv().await {
280                info!("Replication refresh success");
281                AdminTaskResponse::Success
282            } else {
283                error!("Replication refresh failed. Please inspect the logs.");
284                AdminTaskResponse::Error
285            }
286        }
287        Err(_) => {
288            error!("replication control channel did not respond with refresh status.");
289            AdminTaskResponse::Error
290        }
291    }
292}
293
294async fn handle_client(
295    sock: UnixStream,
296    server_rw: &'static QueryServerWriteV1,
297    server_ro: &'static QueryServerReadV1,
298    mut repl_ctrl_tx: Option<mpsc::Sender<ReplCtrl>>,
299) -> Result<(), Box<dyn Error>> {
300    debug!("Accepted admin socket connection");
301
302    let mut reqs = Framed::new(sock, ServerCodec);
303
304    trace!("Waiting for requests ...");
305    while let Some(Ok(req)) = reqs.next().await {
306        // Setup the logging span
307        let eventid = Uuid::new_v4();
308        let nspan = span!(Level::INFO, "handle_admin_client_request", uuid = ?eventid);
309
310        let resp = async {
311            match req {
312                AdminTaskRequest::RecoverAccount { name } => {
313                    match server_rw.handle_admin_recover_account(name, eventid).await {
314                        Ok(password) => AdminTaskResponse::RecoverAccount { password },
315                        Err(e) => {
316                            error!(err = ?e, "error during recover-account");
317                            AdminTaskResponse::Error
318                        }
319                    }
320                }
321                AdminTaskRequest::ShowReplicationCertificate => match repl_ctrl_tx.as_mut() {
322                    Some(ctrl_tx) => show_replication_certificate(ctrl_tx).await,
323                    None => {
324                        error!("replication not configured, unable to display certificate.");
325                        AdminTaskResponse::Error
326                    }
327                },
328                AdminTaskRequest::RenewReplicationCertificate => match repl_ctrl_tx.as_mut() {
329                    Some(ctrl_tx) => renew_replication_certificate(ctrl_tx).await,
330                    None => {
331                        error!("replication not configured, unable to renew certificate.");
332                        AdminTaskResponse::Error
333                    }
334                },
335                AdminTaskRequest::RefreshReplicationConsumer => match repl_ctrl_tx.as_mut() {
336                    Some(ctrl_tx) => replication_consumer_refresh(ctrl_tx).await,
337                    None => {
338                        error!("replication not configured, unable to refresh consumer.");
339                        AdminTaskResponse::Error
340                    }
341                },
342
343                AdminTaskRequest::DomainShow => match server_ro.handle_domain_show(eventid).await {
344                    Ok(domain_info) => AdminTaskResponse::DomainShow { domain_info },
345                    Err(e) => {
346                        error!(err = ?e, "error during domain show");
347                        AdminTaskResponse::Error
348                    }
349                },
350                AdminTaskRequest::DomainUpgradeCheck => {
351                    match server_ro.handle_domain_upgrade_check(eventid).await {
352                        Ok(report) => AdminTaskResponse::DomainUpgradeCheck { report },
353                        Err(e) => {
354                            error!(err = ?e, "error during domain upgrade checkr");
355                            AdminTaskResponse::Error
356                        }
357                    }
358                }
359                AdminTaskRequest::DomainRaise => match server_rw.handle_domain_raise(eventid).await
360                {
361                    Ok(level) => AdminTaskResponse::DomainRaise { level },
362                    Err(e) => {
363                        error!(err = ?e, "error during domain raise");
364                        AdminTaskResponse::Error
365                    }
366                },
367                AdminTaskRequest::DomainRemigrate { level } => {
368                    match server_rw.handle_domain_remigrate(level, eventid).await {
369                        Ok(()) => AdminTaskResponse::Success,
370                        Err(e) => {
371                            error!(err = ?e, "error during domain remigrate");
372                            AdminTaskResponse::Error
373                        }
374                    }
375                }
376            }
377        }
378        .instrument(nspan)
379        .await;
380
381        reqs.send(resp).await?;
382        reqs.flush().await?;
383    }
384
385    debug!("Disconnecting client ...");
386    Ok(())
387}