1use crate::actors::{QueryServerReadV1, QueryServerWriteV1};
2use crate::repl::ReplCtrl;
3use crate::CoreAction;
4use bytes::{BufMut, BytesMut};
5use futures::{SinkExt, StreamExt};
6use kanidm_lib_crypto::serialise::x509b64;
7use kanidm_utils_users::get_current_uid;
8use serde::{Deserialize, Serialize};
9use std::error::Error;
10use std::io;
11use std::path::Path;
12use tokio::net::{UnixListener, UnixStream};
13use tokio::sync::broadcast;
14use tokio::sync::mpsc;
15use tokio::sync::oneshot;
16use tokio_util::codec::{Decoder, Encoder, Framed};
17use tracing::{span, Instrument, Level};
18use uuid::Uuid;
19
20pub use kanidm_proto::internal::{
21 DomainInfo as ProtoDomainInfo, DomainUpgradeCheckReport as ProtoDomainUpgradeCheckReport,
22 DomainUpgradeCheckStatus as ProtoDomainUpgradeCheckStatus,
23};
24
/// A request sent by a client over the admin UNIX socket.
///
/// Requests are serialised as JSON by the codecs in this file, so the variant
/// and field names are part of the wire format.
#[derive(Serialize, Deserialize, Debug)]
pub enum AdminTaskRequest {
    /// Recover the account called `name`; a successful reply carries a password.
    RecoverAccount { name: String },
    /// Disable the account called `name`.
    DisableAccount { name: String },
    /// Ask the replication controller for its current certificate.
    ShowReplicationCertificate,
    /// Ask the replication controller to renew its certificate; on success the
    /// resulting certificate is returned.
    RenewReplicationCertificate,
    /// Ask the replication controller to refresh this consumer.
    RefreshReplicationConsumer,
    /// Fetch the domain information.
    DomainShow,
    /// Run the domain upgrade check and return its report.
    DomainUpgradeCheck,
    /// Raise the domain level; a successful reply carries a level.
    DomainRaise,
    /// Remigrate the domain. `level` is passed through to the write server
    /// unchanged.
    // NOTE(review): presumably `None` selects a server-chosen default level - confirm.
    DomainRemigrate { level: Option<u32> },
}
37
/// The reply sent back to a client for a single [`AdminTaskRequest`].
///
/// Responses are serialised as JSON by the codecs in this file, so the variant
/// and field names are part of the wire format.
#[derive(Serialize, Deserialize, Debug)]
pub enum AdminTaskResponse {
    /// Successful reply to `RecoverAccount`.
    RecoverAccount {
        password: String,
    },
    /// Successful reply to `ShowReplicationCertificate` and
    /// `RenewReplicationCertificate`; `cert` is the string produced by
    /// `x509b64::cert_to_string`.
    ShowReplicationCertificate {
        cert: String,
    },
    /// Successful reply to `DomainUpgradeCheck`.
    DomainUpgradeCheck {
        report: ProtoDomainUpgradeCheckReport,
    },
    /// Successful reply to `DomainRaise`.
    DomainRaise {
        level: u32,
    },
    /// Successful reply to `DomainShow`.
    DomainShow {
        domain_info: ProtoDomainInfo,
    },
    /// The request succeeded and has no payload to return.
    Success,
    /// The request failed. No detail is sent to the client; the server logs
    /// the cause.
    Error,
}
58
/// Client-side codec for the admin socket: encodes [`AdminTaskRequest`] values
/// and decodes [`AdminTaskResponse`] values, both as JSON.
#[derive(Default)]
pub struct ClientCodec;
61
62impl Decoder for ClientCodec {
63 type Error = io::Error;
64 type Item = AdminTaskResponse;
65
66 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
67 trace!("Attempting to decode request ...");
68 match serde_json::from_slice::<AdminTaskResponse>(src) {
69 Ok(msg) => {
70 src.clear();
72 Ok(Some(msg))
73 }
74 _ => Ok(None),
75 }
76 }
77}
78
79impl Encoder<AdminTaskRequest> for ClientCodec {
80 type Error = io::Error;
81
82 fn encode(&mut self, msg: AdminTaskRequest, dst: &mut BytesMut) -> Result<(), Self::Error> {
83 trace!("Attempting to send response -> {:?} ...", msg);
84 let data = serde_json::to_vec(&msg).map_err(|e| {
85 error!("socket encoding error -> {:?}", e);
86 io::Error::other("JSON encode error")
87 })?;
88 dst.put(data.as_slice());
89 Ok(())
90 }
91}
92
/// Server-side codec for the admin socket: decodes [`AdminTaskRequest`] values
/// and encodes [`AdminTaskResponse`] values, both as JSON.
#[derive(Default)]
struct ServerCodec;
95
96impl Decoder for ServerCodec {
97 type Error = io::Error;
98 type Item = AdminTaskRequest;
99
100 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
101 trace!("Attempting to decode request ...");
102 match serde_json::from_slice::<AdminTaskRequest>(src) {
103 Ok(msg) => {
104 src.clear();
106 Ok(Some(msg))
107 }
108 _ => Ok(None),
109 }
110 }
111}
112
113impl Encoder<AdminTaskResponse> for ServerCodec {
114 type Error = io::Error;
115
116 fn encode(&mut self, msg: AdminTaskResponse, dst: &mut BytesMut) -> Result<(), Self::Error> {
117 trace!("Attempting to send response -> {:?} ...", msg);
118 let data = serde_json::to_vec(&msg).map_err(|e| {
119 error!("socket encoding error -> {:?}", e);
120 io::Error::other("JSON encode error")
121 })?;
122 dst.put(data.as_slice());
123 Ok(())
124 }
125}
126
/// Unit type that namespaces the admin socket listener; see
/// `AdminActor::create_admin_sock`.
pub(crate) struct AdminActor;
128
129impl AdminActor {
130 pub async fn create_admin_sock(
131 sock_path: &str,
132 server_rw: &'static QueryServerWriteV1,
133 server_ro: &'static QueryServerReadV1,
134 mut broadcast_rx: broadcast::Receiver<CoreAction>,
135 repl_ctrl_tx: Option<mpsc::Sender<ReplCtrl>>,
136 ) -> Result<tokio::task::JoinHandle<()>, ()> {
137 debug!("🧹 Cleaning up sockets from previous invocations");
138 rm_if_exist(sock_path);
139
140 let listener = match UnixListener::bind(sock_path) {
142 Ok(l) => l,
143 Err(e) => {
144 error!(err = ?e, "Failed to bind UNIX socket {}", sock_path);
145 return Err(());
146 }
147 };
148
149 let cuid = get_current_uid();
151
152 let handle = tokio::spawn(async move {
153 loop {
154 tokio::select! {
155 Ok(action) = broadcast_rx.recv() => {
156 match action {
157 CoreAction::Shutdown => break,
158 }
159 }
160 accept_res = listener.accept() => {
161 match accept_res {
162 Ok((socket, _addr)) => {
163 if let Ok(ucred) = socket.peer_cred() {
167 let incoming_uid = ucred.uid();
168 if incoming_uid == 0 || incoming_uid == cuid {
169 info!(pid = ?ucred.pid(), "Allowing admin socket access");
171 } else {
172 warn!(%incoming_uid, "unauthorised user");
173 continue;
174 }
175 } else {
176 error!("unable to determine peer credentials");
177 continue;
178 };
179
180 let task_repl_ctrl_tx = repl_ctrl_tx.clone();
182 tokio::spawn(async move {
183 if let Err(e) = handle_client(socket, server_rw, server_ro, task_repl_ctrl_tx).await {
184 error!(err = ?e, "admin client error");
185 }
186 });
187 }
188 Err(e) => {
189 warn!(err = ?e, "admin socket accept error");
190 }
191 }
192 }
193 }
194 }
195 info!("Stopped {}", super::TaskName::AdminSocket);
196 });
197 Ok(handle)
198 }
199}
200
201fn rm_if_exist(p: &str) {
202 if Path::new(p).exists() {
203 debug!("Removing requested file {:?}", p);
204 let _ = std::fs::remove_file(p).map_err(|e| {
205 error!(
206 "Failure while attempting to attempting to remove {:?} -> {:?}",
207 p, e
208 );
209 });
210 } else {
211 debug!("Path {:?} doesn't exist, not attempting to remove.", p);
212 }
213}
214
215async fn show_replication_certificate(ctrl_tx: &mut mpsc::Sender<ReplCtrl>) -> AdminTaskResponse {
216 let (tx, rx) = oneshot::channel();
217
218 if ctrl_tx
219 .send(ReplCtrl::GetCertificate { respond: tx })
220 .await
221 .is_err()
222 {
223 error!("replication control channel has shutdown");
224 return AdminTaskResponse::Error;
225 }
226
227 match rx.await {
228 Ok(cert) => x509b64::cert_to_string(&cert)
229 .map(|cert| AdminTaskResponse::ShowReplicationCertificate { cert })
230 .unwrap_or(AdminTaskResponse::Error),
231 Err(_) => {
232 error!("replication control channel did not respond with certificate.");
233 AdminTaskResponse::Error
234 }
235 }
236}
237
238async fn renew_replication_certificate(ctrl_tx: &mut mpsc::Sender<ReplCtrl>) -> AdminTaskResponse {
239 let (tx, rx) = oneshot::channel();
240
241 if ctrl_tx
242 .send(ReplCtrl::RenewCertificate { respond: tx })
243 .await
244 .is_err()
245 {
246 error!("replication control channel has shutdown");
247 return AdminTaskResponse::Error;
248 }
249
250 match rx.await {
251 Ok(success) => {
252 if success {
253 show_replication_certificate(ctrl_tx).await
254 } else {
255 error!("replication control channel indicated that certificate renewal failed.");
256 AdminTaskResponse::Error
257 }
258 }
259 Err(_) => {
260 error!("replication control channel did not respond with renewal status.");
261 AdminTaskResponse::Error
262 }
263 }
264}
265
266async fn replication_consumer_refresh(ctrl_tx: &mut mpsc::Sender<ReplCtrl>) -> AdminTaskResponse {
267 let (tx, rx) = oneshot::channel();
268
269 if ctrl_tx
270 .send(ReplCtrl::RefreshConsumer { respond: tx })
271 .await
272 .is_err()
273 {
274 error!("replication control channel has shutdown");
275 return AdminTaskResponse::Error;
276 }
277
278 match rx.await {
279 Ok(mut refresh_rx) => {
280 if let Some(()) = refresh_rx.recv().await {
281 info!("Replication refresh success");
282 AdminTaskResponse::Success
283 } else {
284 error!("Replication refresh failed. Please inspect the logs.");
285 AdminTaskResponse::Error
286 }
287 }
288 Err(_) => {
289 error!("replication control channel did not respond with refresh status.");
290 AdminTaskResponse::Error
291 }
292 }
293}
294
295async fn handle_client(
296 sock: UnixStream,
297 server_rw: &'static QueryServerWriteV1,
298 server_ro: &'static QueryServerReadV1,
299 mut repl_ctrl_tx: Option<mpsc::Sender<ReplCtrl>>,
300) -> Result<(), Box<dyn Error>> {
301 debug!("Accepted admin socket connection");
302
303 let mut reqs = Framed::new(sock, ServerCodec);
304
305 trace!("Waiting for requests ...");
306 while let Some(Ok(req)) = reqs.next().await {
307 let eventid = Uuid::new_v4();
309 let nspan = span!(Level::INFO, "handle_admin_client_request", uuid = ?eventid);
310
311 let resp = async {
312 match req {
313 AdminTaskRequest::RecoverAccount { name } => {
314 match server_rw.handle_admin_recover_account(name, eventid).await {
315 Ok(password) => AdminTaskResponse::RecoverAccount { password },
316 Err(e) => {
317 error!(err = ?e, "error during recover-account");
318 AdminTaskResponse::Error
319 }
320 }
321 }
322 AdminTaskRequest::DisableAccount { name } => {
323 match server_rw.handle_admin_disable_account(name, eventid).await {
324 Ok(()) => AdminTaskResponse::Success,
325 Err(e) => {
326 error!(err = ?e, "error during disable-account");
327 AdminTaskResponse::Error
328 }
329 }
330 }
331 AdminTaskRequest::ShowReplicationCertificate => match repl_ctrl_tx.as_mut() {
332 Some(ctrl_tx) => show_replication_certificate(ctrl_tx).await,
333 None => {
334 error!("replication not configured, unable to display certificate.");
335 AdminTaskResponse::Error
336 }
337 },
338 AdminTaskRequest::RenewReplicationCertificate => match repl_ctrl_tx.as_mut() {
339 Some(ctrl_tx) => renew_replication_certificate(ctrl_tx).await,
340 None => {
341 error!("replication not configured, unable to renew certificate.");
342 AdminTaskResponse::Error
343 }
344 },
345 AdminTaskRequest::RefreshReplicationConsumer => match repl_ctrl_tx.as_mut() {
346 Some(ctrl_tx) => replication_consumer_refresh(ctrl_tx).await,
347 None => {
348 error!("replication not configured, unable to refresh consumer.");
349 AdminTaskResponse::Error
350 }
351 },
352
353 AdminTaskRequest::DomainShow => match server_ro.handle_domain_show(eventid).await {
354 Ok(domain_info) => AdminTaskResponse::DomainShow { domain_info },
355 Err(e) => {
356 error!(err = ?e, "error during domain show");
357 AdminTaskResponse::Error
358 }
359 },
360 AdminTaskRequest::DomainUpgradeCheck => {
361 match server_ro.handle_domain_upgrade_check(eventid).await {
362 Ok(report) => AdminTaskResponse::DomainUpgradeCheck { report },
363 Err(e) => {
364 error!(err = ?e, "error during domain upgrade checkr");
365 AdminTaskResponse::Error
366 }
367 }
368 }
369 AdminTaskRequest::DomainRaise => match server_rw.handle_domain_raise(eventid).await
370 {
371 Ok(level) => AdminTaskResponse::DomainRaise { level },
372 Err(e) => {
373 error!(err = ?e, "error during domain raise");
374 AdminTaskResponse::Error
375 }
376 },
377 AdminTaskRequest::DomainRemigrate { level } => {
378 match server_rw.handle_domain_remigrate(level, eventid).await {
379 Ok(()) => AdminTaskResponse::Success,
380 Err(e) => {
381 error!(err = ?e, "error during domain remigrate");
382 AdminTaskResponse::Error
383 }
384 }
385 }
386 }
387 }
388 .instrument(nspan)
389 .await;
390
391 reqs.send(resp).await?;
392 reqs.flush().await?;
393 }
394
395 debug!("Disconnecting client ...");
396 Ok(())
397}