1use crate::actors::{QueryServerReadV1, QueryServerWriteV1};
2use crate::repl::ReplCtrl;
3use crate::CoreAction;
4use bytes::{BufMut, BytesMut};
5use futures::{SinkExt, StreamExt};
6use kanidm_lib_crypto::serialise::x509b64;
7use kanidm_utils_users::get_current_uid;
8use serde::{Deserialize, Serialize};
9use std::error::Error;
10use std::io;
11use std::path::Path;
12use tokio::net::{UnixListener, UnixStream};
13use tokio::sync::broadcast;
14use tokio::sync::mpsc;
15use tokio::sync::oneshot;
16use tokio_util::codec::{Decoder, Encoder, Framed};
17use tracing::{span, Instrument, Level};
18use uuid::Uuid;
19
20pub use kanidm_proto::internal::{
21 DomainInfo as ProtoDomainInfo, DomainUpgradeCheckReport as ProtoDomainUpgradeCheckReport,
22 DomainUpgradeCheckStatus as ProtoDomainUpgradeCheckStatus,
23};
24
/// Requests an administrative client can submit over the admin UNIX socket.
///
/// Values are exchanged as JSON (see the codecs in this file) and each one is
/// answered with a single [`AdminTaskResponse`].
#[derive(Serialize, Deserialize, Debug)]
pub enum AdminTaskRequest {
    /// Recover the account called `name`; a successful reply carries the
    /// password returned by the write server.
    RecoverAccount { name: String },
    /// Ask the replication controller for its certificate.
    ShowReplicationCertificate,
    /// Ask the replication controller to renew its certificate; on success the
    /// certificate is then fetched and returned.
    RenewReplicationCertificate,
    /// Ask the replication controller to refresh the consumer.
    RefreshReplicationConsumer,
    /// Fetch the domain information from the read server.
    DomainShow,
    /// Run the domain upgrade check on the read server and return its report.
    DomainUpgradeCheck,
    /// Raise the domain level; a successful reply carries the resulting level.
    DomainRaise,
    /// Remigrate the domain, optionally supplying a `level`.
    DomainRemigrate { level: Option<u32> },
}
36
/// Replies sent by the server to an [`AdminTaskRequest`] on the admin socket.
#[derive(Serialize, Deserialize, Debug)]
pub enum AdminTaskResponse {
    /// Result of a successful `RecoverAccount` request.
    RecoverAccount {
        password: String,
    },
    /// The replication certificate, rendered as a string.
    ShowReplicationCertificate {
        cert: String,
    },
    /// Result of a successful `DomainUpgradeCheck` request.
    DomainUpgradeCheck {
        report: ProtoDomainUpgradeCheckReport,
    },
    /// Result of a successful `DomainRaise` request.
    DomainRaise {
        level: u32,
    },
    /// Result of a successful `DomainShow` request.
    DomainShow {
        domain_info: ProtoDomainInfo,
    },
    /// The request completed and has no payload to return.
    Success,
    /// The request failed; details are written to the server log only.
    Error,
}
57
58#[derive(Default)]
59pub struct ClientCodec;
60
61impl Decoder for ClientCodec {
62 type Error = io::Error;
63 type Item = AdminTaskResponse;
64
65 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
66 trace!("Attempting to decode request ...");
67 match serde_json::from_slice::<AdminTaskResponse>(src) {
68 Ok(msg) => {
69 src.clear();
71 Ok(Some(msg))
72 }
73 _ => Ok(None),
74 }
75 }
76}
77
78impl Encoder<AdminTaskRequest> for ClientCodec {
79 type Error = io::Error;
80
81 fn encode(&mut self, msg: AdminTaskRequest, dst: &mut BytesMut) -> Result<(), Self::Error> {
82 trace!("Attempting to send response -> {:?} ...", msg);
83 let data = serde_json::to_vec(&msg).map_err(|e| {
84 error!("socket encoding error -> {:?}", e);
85 io::Error::new(io::ErrorKind::Other, "JSON encode error")
86 })?;
87 dst.put(data.as_slice());
88 Ok(())
89 }
90}
91
92#[derive(Default)]
93struct ServerCodec;
94
95impl Decoder for ServerCodec {
96 type Error = io::Error;
97 type Item = AdminTaskRequest;
98
99 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
100 trace!("Attempting to decode request ...");
101 match serde_json::from_slice::<AdminTaskRequest>(src) {
102 Ok(msg) => {
103 src.clear();
105 Ok(Some(msg))
106 }
107 _ => Ok(None),
108 }
109 }
110}
111
112impl Encoder<AdminTaskResponse> for ServerCodec {
113 type Error = io::Error;
114
115 fn encode(&mut self, msg: AdminTaskResponse, dst: &mut BytesMut) -> Result<(), Self::Error> {
116 trace!("Attempting to send response -> {:?} ...", msg);
117 let data = serde_json::to_vec(&msg).map_err(|e| {
118 error!("socket encoding error -> {:?}", e);
119 io::Error::new(io::ErrorKind::Other, "JSON encode error")
120 })?;
121 dst.put(data.as_slice());
122 Ok(())
123 }
124}
125
126pub(crate) struct AdminActor;
127
128impl AdminActor {
129 pub async fn create_admin_sock(
130 sock_path: &str,
131 server_rw: &'static QueryServerWriteV1,
132 server_ro: &'static QueryServerReadV1,
133 mut broadcast_rx: broadcast::Receiver<CoreAction>,
134 repl_ctrl_tx: Option<mpsc::Sender<ReplCtrl>>,
135 ) -> Result<tokio::task::JoinHandle<()>, ()> {
136 debug!("🧹 Cleaning up sockets from previous invocations");
137 rm_if_exist(sock_path);
138
139 let listener = match UnixListener::bind(sock_path) {
141 Ok(l) => l,
142 Err(e) => {
143 error!(err = ?e, "Failed to bind UNIX socket {}", sock_path);
144 return Err(());
145 }
146 };
147
148 let cuid = get_current_uid();
150
151 let handle = tokio::spawn(async move {
152 loop {
153 tokio::select! {
154 Ok(action) = broadcast_rx.recv() => {
155 match action {
156 CoreAction::Shutdown => break,
157 }
158 }
159 accept_res = listener.accept() => {
160 match accept_res {
161 Ok((socket, _addr)) => {
162 if let Ok(ucred) = socket.peer_cred() {
166 let incoming_uid = ucred.uid();
167 if incoming_uid == 0 || incoming_uid == cuid {
168 info!(pid = ?ucred.pid(), "Allowing admin socket access");
170 } else {
171 warn!(%incoming_uid, "unauthorised user");
172 continue;
173 }
174 } else {
175 error!("unable to determine peer credentials");
176 continue;
177 };
178
179 let task_repl_ctrl_tx = repl_ctrl_tx.clone();
181 tokio::spawn(async move {
182 if let Err(e) = handle_client(socket, server_rw, server_ro, task_repl_ctrl_tx).await {
183 error!(err = ?e, "admin client error");
184 }
185 });
186 }
187 Err(e) => {
188 warn!(err = ?e, "admin socket accept error");
189 }
190 }
191 }
192 }
193 }
194 info!("Stopped {}", super::TaskName::AdminSocket);
195 });
196 Ok(handle)
197 }
198}
199
200fn rm_if_exist(p: &str) {
201 if Path::new(p).exists() {
202 debug!("Removing requested file {:?}", p);
203 let _ = std::fs::remove_file(p).map_err(|e| {
204 error!(
205 "Failure while attempting to attempting to remove {:?} -> {:?}",
206 p, e
207 );
208 });
209 } else {
210 debug!("Path {:?} doesn't exist, not attempting to remove.", p);
211 }
212}
213
214async fn show_replication_certificate(ctrl_tx: &mut mpsc::Sender<ReplCtrl>) -> AdminTaskResponse {
215 let (tx, rx) = oneshot::channel();
216
217 if ctrl_tx
218 .send(ReplCtrl::GetCertificate { respond: tx })
219 .await
220 .is_err()
221 {
222 error!("replication control channel has shutdown");
223 return AdminTaskResponse::Error;
224 }
225
226 match rx.await {
227 Ok(cert) => x509b64::cert_to_string(&cert)
228 .map(|cert| AdminTaskResponse::ShowReplicationCertificate { cert })
229 .unwrap_or(AdminTaskResponse::Error),
230 Err(_) => {
231 error!("replication control channel did not respond with certificate.");
232 AdminTaskResponse::Error
233 }
234 }
235}
236
237async fn renew_replication_certificate(ctrl_tx: &mut mpsc::Sender<ReplCtrl>) -> AdminTaskResponse {
238 let (tx, rx) = oneshot::channel();
239
240 if ctrl_tx
241 .send(ReplCtrl::RenewCertificate { respond: tx })
242 .await
243 .is_err()
244 {
245 error!("replication control channel has shutdown");
246 return AdminTaskResponse::Error;
247 }
248
249 match rx.await {
250 Ok(success) => {
251 if success {
252 show_replication_certificate(ctrl_tx).await
253 } else {
254 error!("replication control channel indicated that certificate renewal failed.");
255 AdminTaskResponse::Error
256 }
257 }
258 Err(_) => {
259 error!("replication control channel did not respond with renewal status.");
260 AdminTaskResponse::Error
261 }
262 }
263}
264
265async fn replication_consumer_refresh(ctrl_tx: &mut mpsc::Sender<ReplCtrl>) -> AdminTaskResponse {
266 let (tx, rx) = oneshot::channel();
267
268 if ctrl_tx
269 .send(ReplCtrl::RefreshConsumer { respond: tx })
270 .await
271 .is_err()
272 {
273 error!("replication control channel has shutdown");
274 return AdminTaskResponse::Error;
275 }
276
277 match rx.await {
278 Ok(mut refresh_rx) => {
279 if let Some(()) = refresh_rx.recv().await {
280 info!("Replication refresh success");
281 AdminTaskResponse::Success
282 } else {
283 error!("Replication refresh failed. Please inspect the logs.");
284 AdminTaskResponse::Error
285 }
286 }
287 Err(_) => {
288 error!("replication control channel did not respond with refresh status.");
289 AdminTaskResponse::Error
290 }
291 }
292}
293
294async fn handle_client(
295 sock: UnixStream,
296 server_rw: &'static QueryServerWriteV1,
297 server_ro: &'static QueryServerReadV1,
298 mut repl_ctrl_tx: Option<mpsc::Sender<ReplCtrl>>,
299) -> Result<(), Box<dyn Error>> {
300 debug!("Accepted admin socket connection");
301
302 let mut reqs = Framed::new(sock, ServerCodec);
303
304 trace!("Waiting for requests ...");
305 while let Some(Ok(req)) = reqs.next().await {
306 let eventid = Uuid::new_v4();
308 let nspan = span!(Level::INFO, "handle_admin_client_request", uuid = ?eventid);
309
310 let resp = async {
311 match req {
312 AdminTaskRequest::RecoverAccount { name } => {
313 match server_rw.handle_admin_recover_account(name, eventid).await {
314 Ok(password) => AdminTaskResponse::RecoverAccount { password },
315 Err(e) => {
316 error!(err = ?e, "error during recover-account");
317 AdminTaskResponse::Error
318 }
319 }
320 }
321 AdminTaskRequest::ShowReplicationCertificate => match repl_ctrl_tx.as_mut() {
322 Some(ctrl_tx) => show_replication_certificate(ctrl_tx).await,
323 None => {
324 error!("replication not configured, unable to display certificate.");
325 AdminTaskResponse::Error
326 }
327 },
328 AdminTaskRequest::RenewReplicationCertificate => match repl_ctrl_tx.as_mut() {
329 Some(ctrl_tx) => renew_replication_certificate(ctrl_tx).await,
330 None => {
331 error!("replication not configured, unable to renew certificate.");
332 AdminTaskResponse::Error
333 }
334 },
335 AdminTaskRequest::RefreshReplicationConsumer => match repl_ctrl_tx.as_mut() {
336 Some(ctrl_tx) => replication_consumer_refresh(ctrl_tx).await,
337 None => {
338 error!("replication not configured, unable to refresh consumer.");
339 AdminTaskResponse::Error
340 }
341 },
342
343 AdminTaskRequest::DomainShow => match server_ro.handle_domain_show(eventid).await {
344 Ok(domain_info) => AdminTaskResponse::DomainShow { domain_info },
345 Err(e) => {
346 error!(err = ?e, "error during domain show");
347 AdminTaskResponse::Error
348 }
349 },
350 AdminTaskRequest::DomainUpgradeCheck => {
351 match server_ro.handle_domain_upgrade_check(eventid).await {
352 Ok(report) => AdminTaskResponse::DomainUpgradeCheck { report },
353 Err(e) => {
354 error!(err = ?e, "error during domain upgrade checkr");
355 AdminTaskResponse::Error
356 }
357 }
358 }
359 AdminTaskRequest::DomainRaise => match server_rw.handle_domain_raise(eventid).await
360 {
361 Ok(level) => AdminTaskResponse::DomainRaise { level },
362 Err(e) => {
363 error!(err = ?e, "error during domain raise");
364 AdminTaskResponse::Error
365 }
366 },
367 AdminTaskRequest::DomainRemigrate { level } => {
368 match server_rw.handle_domain_remigrate(level, eventid).await {
369 Ok(()) => AdminTaskResponse::Success,
370 Err(e) => {
371 error!(err = ?e, "error during domain remigrate");
372 AdminTaskResponse::Error
373 }
374 }
375 }
376 }
377 }
378 .instrument(nspan)
379 .await;
380
381 reqs.send(resp).await?;
382 reqs.flush().await?;
383 }
384
385 debug!("Disconnecting client ...");
386 Ok(())
387}