kanidm_unixd_tasks/
kanidm_unixd_tasks.rs

1#![deny(warnings)]
2#![warn(unused_extern_crates)]
3#![deny(clippy::todo)]
4#![deny(clippy::unimplemented)]
5#![deny(clippy::unwrap_used)]
6#![deny(clippy::expect_used)]
7#![deny(clippy::panic)]
8#![deny(clippy::unreachable)]
9#![deny(clippy::await_holding_lock)]
10#![deny(clippy::needless_pass_by_value)]
11#![deny(clippy::trivially_copy_pass_by_ref)]
12
13use bytes::{BufMut, BytesMut};
14use futures::{SinkExt, StreamExt};
15use kanidm_unix_common::constants::DEFAULT_CONFIG_PATH;
16use kanidm_unix_common::unix_config::UnixdConfig;
17use kanidm_unix_common::unix_passwd::{parse_etc_group, parse_etc_passwd, parse_etc_shadow, EtcDb};
18use kanidm_unix_common::unix_proto::{
19    HomeDirectoryInfo, TaskRequest, TaskRequestFrame, TaskResponse,
20};
21use kanidm_utils_users::{get_effective_gid, get_effective_uid};
22use libc::{lchown, umask};
23use notify_debouncer_full::notify::RecommendedWatcher;
24use notify_debouncer_full::Debouncer;
25use notify_debouncer_full::RecommendedCache;
26use notify_debouncer_full::{new_debouncer, notify::RecursiveMode, DebouncedEvent};
27use sketching::tracing_forest::traits::*;
28use sketching::tracing_forest::util::*;
29use sketching::tracing_forest::{self};
30use std::ffi::CString;
31use std::os::unix::ffi::OsStrExt;
32use std::os::unix::fs::symlink;
33use std::path::{Path, PathBuf};
34use std::process::ExitCode;
35use std::time::Duration;
36use std::{fs, io};
37use tokio::fs::File;
38use tokio::io::AsyncReadExt;
39use tokio::net::UnixStream;
40use tokio::sync::broadcast;
41use tokio::sync::watch;
42use tokio::time;
43use tokio_util::codec::{Decoder, Encoder, Framed};
44use tracing::instrument;
45use walkdir::WalkDir;
46
47#[cfg(all(target_family = "unix", feature = "selinux"))]
48use kanidm_unix_common::selinux_util;
49
50struct TaskCodec;
51
52impl Decoder for TaskCodec {
53    type Error = io::Error;
54    type Item = TaskRequestFrame;
55
56    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
57        match serde_json::from_slice::<TaskRequestFrame>(src) {
58            Ok(msg) => {
59                // Clear the buffer for the next message.
60                src.clear();
61                Ok(Some(msg))
62            }
63            _ => Ok(None),
64        }
65    }
66}
67
68impl Encoder<TaskResponse> for TaskCodec {
69    type Error = io::Error;
70
71    fn encode(&mut self, msg: TaskResponse, dst: &mut BytesMut) -> Result<(), Self::Error> {
72        debug!("Attempting to send request -> {:?} ...", msg);
73        let data = serde_json::to_vec(&msg).map_err(|e| {
74            error!("socket encoding error -> {:?}", e);
75            io::Error::new(io::ErrorKind::Other, "JSON encode error")
76        })?;
77        dst.put(data.as_slice());
78        Ok(())
79    }
80}
81
82impl TaskCodec {
83    fn new() -> Self {
84        TaskCodec
85    }
86}
87
88fn chown(path: &Path, gid: u32) -> Result<(), String> {
89    let path_os = CString::new(path.as_os_str().as_bytes())
90        .map_err(|_| "Unable to create c-string".to_string())?;
91
92    // Change the owner to the gid - remember, kanidm ONLY has gid's, the uid is implied.
93    if unsafe { lchown(path_os.as_ptr(), gid, gid) } != 0 {
94        return Err("Unable to set ownership".to_string());
95    }
96    Ok(())
97}
98
99fn create_home_directory(
100    info: &HomeDirectoryInfo,
101    home_prefix_path: &Path,
102    home_mount_prefix_path: Option<&PathBuf>,
103    use_etc_skel: bool,
104    use_selinux: bool,
105) -> Result<(), String> {
106    // Final sanity check to prevent certain classes of attacks. This should *never*
107    // be possible, but we assert this to be sure.
108    let name = info.name.trim_start_matches('.').replace(['/', '\\'], "");
109
110    debug!(?home_prefix_path, ?home_mount_prefix_path, ?info);
111
112    // This is where the users home dir "is" and aliases from here go to the true storage
113    // mounts
114    let home_prefix_path = home_prefix_path
115        .canonicalize()
116        .map_err(|e| format!("{:?}", e))?;
117
118    // This is where the storage is *mounted*. If not set, falls back to the home_prefix.
119    let home_mount_prefix_path = home_mount_prefix_path
120        .unwrap_or(&home_prefix_path)
121        .canonicalize()
122        .map_err(|e| format!("{:?}", e))?;
123
124    // Does our home_prefix actually exist?
125    if !home_prefix_path.exists() || !home_prefix_path.is_dir() || !home_prefix_path.is_absolute() {
126        return Err("Invalid home_prefix from configuration - home_prefix path must exist, must be a directory, and must be absolute (not relative)".to_string());
127    }
128
129    if !home_mount_prefix_path.exists()
130        || !home_mount_prefix_path.is_dir()
131        || !home_mount_prefix_path.is_absolute()
132    {
133        return Err("Invalid home_mount_prefix from configuration - home_prefix path must exist, must be a directory, and must be absolute (not relative)".to_string());
134    }
135
136    // This is now creating the actual home directory in the home_mount path.
137    // First we want to validate that the path is legitimate and hasn't tried
138    // to escape the home_mount prefix.
139    let hd_mount_path = Path::join(&home_mount_prefix_path, &name);
140
141    debug!(?hd_mount_path);
142
143    if let Some(pp) = hd_mount_path.parent() {
144        if pp != home_mount_prefix_path {
145            return Err("Invalid home directory name - not within home_mount_prefix".to_string());
146        }
147    } else {
148        return Err("Invalid/Corrupt home directory path - no prefix found".to_string());
149    }
150
151    // Get a handle to the SELinux labeling interface
152    debug!(?use_selinux, "selinux for home dir labeling");
153    #[cfg(all(target_family = "unix", feature = "selinux"))]
154    let labeler = if use_selinux {
155        selinux_util::SelinuxLabeler::new(info.gid, &home_mount_prefix_path)?
156    } else {
157        selinux_util::SelinuxLabeler::new_noop()
158    };
159
160    // Does the home directory exist? This is checking the *true* home mount storage.
161    if !hd_mount_path.exists() {
162        // Set the SELinux security context for file creation
163        #[cfg(all(target_family = "unix", feature = "selinux"))]
164        labeler.do_setfscreatecon_for_path()?;
165
166        // Set a umask
167        let before = unsafe { umask(0o0027) };
168
169        // Create the dir
170        if let Err(e) = fs::create_dir_all(&hd_mount_path) {
171            let _ = unsafe { umask(before) };
172            error!(err = ?e, ?hd_mount_path, "Unable to create directory");
173            return Err(format!("{:?}", e));
174        }
175        let _ = unsafe { umask(before) };
176
177        chown(&hd_mount_path, info.gid)?;
178
179        // Copy in structure from /etc/skel/ if present
180        let skel_dir = Path::new("/etc/skel/");
181        if use_etc_skel && skel_dir.exists() {
182            info!("preparing homedir using /etc/skel");
183            for entry in WalkDir::new(skel_dir).into_iter().filter_map(|e| e.ok()) {
184                let dest = &hd_mount_path.join(
185                    entry
186                        .path()
187                        .strip_prefix(skel_dir)
188                        .map_err(|e| e.to_string())?,
189                );
190
191                #[cfg(all(target_family = "unix", feature = "selinux"))]
192                {
193                    let p = entry
194                        .path()
195                        .strip_prefix(skel_dir)
196                        .map_err(|e| e.to_string())?;
197                    labeler.label_path(p)?;
198                }
199
200                if entry.path().is_dir() {
201                    fs::create_dir_all(dest).map_err(|e| {
202                        error!(err = ?e, ?dest, "Unable to create directory from /etc/skel");
203                        e.to_string()
204                    })?;
205                } else {
206                    fs::copy(entry.path(), dest).map_err(|e| {
207                        error!(err = ?e, ?dest, "Unable to copy from /etc/skel");
208                        e.to_string()
209                    })?;
210                }
211                chown(dest, info.gid)?;
212
213                // Create equivalence rule in the SELinux policy
214                #[cfg(all(target_family = "unix", feature = "selinux"))]
215                labeler.setup_equivalence_rule(&hd_mount_path)?;
216            }
217        }
218    }
219
220    // Reset object creation SELinux context to default
221    #[cfg(all(target_family = "unix", feature = "selinux"))]
222    labeler.set_default_context_for_fs_objects()?;
223
224    // Do the aliases exist?
225    for alias in info.aliases.iter() {
226        // Sanity check the alias.
227        // let alias = alias.replace(".", "").replace("/", "").replace("\\", "");
228        let alias = alias.trim_start_matches('.').replace(['/', '\\'], "");
229
230        let alias_path = Path::join(&home_prefix_path, &alias);
231
232        // Assert the resulting alias path is consistent and correct within the home_prefix.
233        if let Some(pp) = alias_path.parent() {
234            if pp != home_prefix_path {
235                return Err("Invalid home directory alias - not within home_prefix".to_string());
236            }
237        } else {
238            return Err("Invalid/Corrupt alias directory path - no prefix found".to_string());
239        }
240
241        if alias_path.exists() {
242            debug!("checking symlink {:?} -> {:?}", alias_path, hd_mount_path);
243            let attr = match fs::symlink_metadata(&alias_path) {
244                Ok(a) => a,
245                Err(e) => {
246                    error!(err = ?e, ?alias_path, "Unable to read alias path metadata");
247                    return Err(format!("{:?}", e));
248                }
249            };
250
251            if attr.file_type().is_symlink() {
252                // Probably need to update it.
253                if let Err(e) = fs::remove_file(&alias_path) {
254                    error!(err = ?e, ?alias_path, "Unable to remove existing alias path");
255                    return Err(format!("{:?}", e));
256                }
257
258                debug!("updating symlink {:?} -> {:?}", alias_path, hd_mount_path);
259                if let Err(e) = symlink(&hd_mount_path, &alias_path) {
260                    error!(err = ?e, ?alias_path, "Unable to update alias path");
261                    return Err(format!("{:?}", e));
262                }
263            } else {
264                warn!(
265                    ?alias_path,
266                    ?hd_mount_path,
267                    "home directory alias path is not a symlink, unable to update"
268                );
269            }
270        } else {
271            // Does not exist. Create.
272            debug!("creating symlink {:?} -> {:?}", alias_path, hd_mount_path);
273            if let Err(e) = symlink(&hd_mount_path, &alias_path) {
274                error!(err = ?e, ?alias_path, "Unable to create alias path");
275                return Err(format!("{:?}", e));
276            }
277        }
278    }
279    Ok(())
280}
281
282async fn shadow_reload_task(
283    shadow_data_watch_tx: watch::Sender<EtcDb>,
284    mut shadow_broadcast_rx: broadcast::Receiver<bool>,
285) {
286    debug!("shadow reload task has started ...");
287
288    while shadow_broadcast_rx.recv().await.is_ok() {
289        match process_etc_passwd_group().await {
290            Ok(etc_db) => {
291                shadow_data_watch_tx.send_replace(etc_db);
292                debug!("shadow reload task sent");
293            }
294            Err(()) => {
295                error!("Unable to process etc db");
296                continue;
297            }
298        }
299    }
300
301    debug!("shadow reload task has stopped");
302}
303
304async fn handle_tasks(
305    stream: UnixStream,
306    ctl_broadcast_rx: &mut broadcast::Receiver<bool>,
307    shadow_data_watch_rx: &mut watch::Receiver<EtcDb>,
308    cfg: &UnixdConfig,
309) {
310    let mut reqs = Framed::new(stream, TaskCodec::new());
311
312    debug!("task handler has started ...");
313
314    // Immediately trigger that we should reload the shadow files for the new connected handler
315    shadow_data_watch_rx.mark_changed();
316
317    loop {
318        tokio::select! {
319            _ = ctl_broadcast_rx.recv() => {
320                break;
321            }
322            request = reqs.next() => {
323                match request {
324                    Some(Ok(TaskRequestFrame {
325                        id,
326                        req: TaskRequest::HomeDirectory(info),
327                    })) => {
328                        debug!("Received task -> HomeDirectory({:?})", info);
329
330                        let resp = match create_home_directory(
331                            &info,
332                            cfg.home_prefix.as_ref(),
333                            cfg.home_mount_prefix.as_ref(),
334                            cfg.use_etc_skel,
335                            cfg.selinux,
336                        ) {
337                            Ok(()) => TaskResponse::Success(id),
338                            Err(msg) => TaskResponse::Error(msg),
339                        };
340
341                        // Now send a result.
342                        if let Err(err) = reqs.send(resp).await {
343                            error!(?err, "Unable to communicate to kanidm unixd");
344                            break;
345                        }
346                        // All good, loop.
347                    }
348                    other => {
349                        error!("Error -> {:?}", other);
350                        break;
351                    }
352                }
353            }
354            Ok(_) = shadow_data_watch_rx.changed() => {
355                debug!("Received shadow reload event.");
356                let etc_db: EtcDb = {
357                    let etc_db_ref = shadow_data_watch_rx.borrow_and_update();
358                    (*etc_db_ref).clone()
359                };
360                // process etc shadow and send it here.
361                let resp = TaskResponse::NotifyShadowChange(etc_db);
362                if let Err(err) = reqs.send(resp).await {
363                    error!(?err, "Unable to communicate to kanidm unixd");
364                    break;
365                }
366                debug!("Shadow reload OK!");
367            }
368        }
369    }
370
371    info!("Disconnected from kanidm_unixd ...");
372}
373
374#[instrument(level = "debug", skip_all)]
375async fn process_etc_passwd_group() -> Result<EtcDb, ()> {
376    let mut file = File::open("/etc/passwd").await.map_err(|err| {
377        error!(?err);
378    })?;
379    let mut contents = vec![];
380    file.read_to_end(&mut contents).await.map_err(|err| {
381        error!(?err);
382    })?;
383
384    let users = parse_etc_passwd(contents.as_slice())
385        .map_err(|_| "Invalid passwd content")
386        .map_err(|err| {
387            error!(?err);
388        })?;
389
390    let mut file = File::open("/etc/shadow").await.map_err(|err| {
391        error!(?err);
392    })?;
393    let mut contents = vec![];
394    file.read_to_end(&mut contents).await.map_err(|err| {
395        error!(?err);
396    })?;
397
398    let shadow = parse_etc_shadow(contents.as_slice())
399        .map_err(|_| "Invalid passwd content")
400        .map_err(|err| {
401            error!(?err);
402        })?;
403
404    let mut file = File::open("/etc/group").await.map_err(|err| {
405        error!(?err);
406    })?;
407    let mut contents = vec![];
408    file.read_to_end(&mut contents).await.map_err(|err| {
409        error!(?err);
410    })?;
411
412    let groups = parse_etc_group(contents.as_slice())
413        .map_err(|_| "Invalid group content")
414        .map_err(|err| {
415            error!(?err);
416        })?;
417
418    Ok(EtcDb {
419        users,
420        shadow,
421        groups,
422    })
423}
424
425fn setup_shadow_inotify_watcher(
426    shadow_broadcast_tx: broadcast::Sender<bool>,
427) -> Result<Debouncer<RecommendedWatcher, RecommendedCache>, ExitCode> {
428    let watcher = new_debouncer(
429        Duration::from_secs(5),
430        None,
431        move |event: Result<Vec<DebouncedEvent>, _>| {
432            let array_of_events = match event {
433                Ok(events) => events,
434                Err(array_errors) => {
435                    for err in array_errors {
436                        error!(?err, "inotify debounce error");
437                    }
438                    return;
439                }
440            };
441
442            let mut path_of_interest_was_changed = false;
443
444            for inode_event in array_of_events.iter() {
445                if !inode_event.kind.is_access()
446                    && inode_event.paths.iter().any(|path| {
447                        path == Path::new("/etc/group")
448                            || path == Path::new("/etc/passwd")
449                            || path == Path::new("/etc/shadow")
450                    })
451                {
452                    debug!(?inode_event, "Handling inotify modification event");
453
454                    path_of_interest_was_changed = true
455                }
456            }
457
458            if path_of_interest_was_changed {
459                let _ = shadow_broadcast_tx.send(true);
460            } else {
461                debug!(?array_of_events, "IGNORED");
462            }
463        },
464    )
465    .and_then(|mut debouncer| {
466        debouncer
467            .watch(Path::new("/etc"), RecursiveMode::Recursive)
468            .map(|()| debouncer)
469    });
470
471    watcher.map_err(|err| {
472        error!(?err, "Failed to setup inotify");
473        ExitCode::FAILURE
474    })
475}
476
477#[tokio::main(flavor = "current_thread")]
478async fn main() -> ExitCode {
479    // On linux when debug assertions are disabled, prevent ptrace
480    // from attaching to us.
481    #[cfg(all(target_os = "linux", not(debug_assertions)))]
482    if let Err(code) = prctl::set_dumpable(false) {
483        error!(?code, "CRITICAL: Unable to set prctl flags");
484        return ExitCode::FAILURE;
485    }
486    // let cuid = get_current_uid();
487    // let cgid = get_current_gid();
488    // We only need to check effective id
489    let ceuid = get_effective_uid();
490    let cegid = get_effective_gid();
491
492    for arg in std::env::args() {
493        if arg.contains("--version") {
494            println!("kanidm_unixd_tasks {}", env!("CARGO_PKG_VERSION"));
495            return ExitCode::SUCCESS;
496        } else if arg.contains("--help") {
497            println!("kanidm_unixd_tasks {}", env!("CARGO_PKG_VERSION"));
498            println!("Usage: kanidm_unixd_tasks");
499            println!("  --version");
500            println!("  --help");
501            return ExitCode::SUCCESS;
502        }
503    }
504
505    #[allow(clippy::expect_used)]
506    tracing_forest::worker_task()
507        .set_global(true)
508        // Fall back to stderr
509        .map_sender(|sender| sender.or_stderr())
510        .build_on(|subscriber| {
511            subscriber.with(
512                EnvFilter::try_from_default_env()
513                    .or_else(|_| EnvFilter::try_new("info"))
514                    .expect("Failed to init envfilter"),
515            )
516        })
517        .on(async {
518            if ceuid != 0 || cegid != 0 {
519                error!("Refusing to run - this process *MUST* operate as root.");
520                return ExitCode::FAILURE;
521            }
522
523            let unixd_path = Path::new(DEFAULT_CONFIG_PATH);
524            let unixd_path_str = match unixd_path.to_str() {
525                Some(cps) => cps,
526                None => {
527                    error!("Unable to turn unixd_path to str");
528                    return ExitCode::FAILURE;
529                }
530            };
531
532            let cfg = match UnixdConfig::new().read_options_from_optional_config(unixd_path) {
533                Ok(v) => v,
534                Err(_) => {
535                    error!("Failed to parse {}", unixd_path_str);
536                    return ExitCode::FAILURE;
537                }
538            };
539
540            let task_sock_path = cfg.task_sock_path.clone();
541            debug!("Attempting to use {} ...", task_sock_path);
542
543            // This is the startup/shutdown control channel
544            let (broadcast_tx, mut broadcast_rx) = broadcast::channel(4);
545            let mut d_broadcast_rx = broadcast_tx.subscribe();
546
547            // This is to broadcast when we need to reload the shadow
548            // files.
549            let (shadow_broadcast_tx, shadow_broadcast_rx) = broadcast::channel(4);
550
551            let watcher = match setup_shadow_inotify_watcher(shadow_broadcast_tx.clone()) {
552                Ok(w) => w,
553                Err(exit) => return exit,
554            };
555
556            // Setup the etcdb watch
557            let etc_db = match process_etc_passwd_group().await {
558                Ok(etc_db) => etc_db,
559                Err(err) => {
560                    warn!(?err, "unable to process /etc/passwd and related files.");
561                    // Return an empty set instead.
562                    EtcDb::default()
563                }
564            };
565
566            let (shadow_data_watch_tx, mut shadow_data_watch_rx) = watch::channel(etc_db);
567
568            let _shadow_task = tokio::spawn(async move {
569                shadow_reload_task(
570                    shadow_data_watch_tx, shadow_broadcast_rx
571                ).await
572            });
573
574            let server = tokio::spawn(async move {
575                loop {
576                    info!("Attempting to connect to kanidm_unixd ...");
577
578                    tokio::select! {
579                        _ = broadcast_rx.recv() => {
580                            break;
581                        }
582                        connect_res = UnixStream::connect(&task_sock_path) => {
583                            match connect_res {
584                                Ok(stream) => {
585                                    info!("Found kanidm_unixd, waiting for tasks ...");
586
587                                    // Yep! Now let the main handler do it's job.
588                                    // If it returns (dc, etc, then we loop and try again).
589                                    handle_tasks(stream, &mut d_broadcast_rx, &mut shadow_data_watch_rx, &cfg).await;
590                                    continue;
591                                }
592                                Err(e) => {
593                                    debug!("\\---> {:?}", e);
594                                    error!("Unable to find kanidm_unixd, sleeping ...");
595                                    // Back off.
596                                    time::sleep(Duration::from_millis(5000)).await;
597                                }
598                            }
599                        }
600                    } // select
601                } // loop
602            });
603
604            info!("Server started ...");
605
606            // On linux, notify systemd.
607            #[cfg(target_os = "linux")]
608            let _ = sd_notify::notify(true, &[sd_notify::NotifyState::Ready]);
609
610            loop {
611                tokio::select! {
612                    Ok(()) = tokio::signal::ctrl_c() => {
613                        break
614                    }
615                    Some(()) = async move {
616                        let sigterm = tokio::signal::unix::SignalKind::terminate();
617                        #[allow(clippy::unwrap_used)]
618                        tokio::signal::unix::signal(sigterm).unwrap().recv().await
619                    } => {
620                        break
621                    }
622                    Some(()) = async move {
623                        let sigterm = tokio::signal::unix::SignalKind::alarm();
624                        #[allow(clippy::unwrap_used)]
625                        tokio::signal::unix::signal(sigterm).unwrap().recv().await
626                    } => {
627                        // Ignore
628                    }
629                    Some(()) = async move {
630                        let sigterm = tokio::signal::unix::SignalKind::hangup();
631                        #[allow(clippy::unwrap_used)]
632                        tokio::signal::unix::signal(sigterm).unwrap().recv().await
633                    } => {
634                        // Ignore
635                    }
636                    Some(()) = async move {
637                        let sigterm = tokio::signal::unix::SignalKind::user_defined1();
638                        #[allow(clippy::unwrap_used)]
639                        tokio::signal::unix::signal(sigterm).unwrap().recv().await
640                    } => {
641                        // Ignore
642                    }
643
644                    Some(()) = async move {
645                        let sigterm = tokio::signal::unix::SignalKind::user_defined2();
646                        #[allow(clippy::unwrap_used)]
647                        tokio::signal::unix::signal(sigterm).unwrap().recv().await
648                    } => {
649                        // Ignore
650                    }
651                }
652            }
653            info!("Signal received, shutting down");
654            // Send a broadcast that we are done.
655            if let Err(e) = broadcast_tx.send(true) {
656                error!("Unable to shutdown workers {:?}", e);
657            }
658
659            debug!("Dropping inotify watcher ...");
660            drop(watcher);
661
662            let _ = server.await;
663            ExitCode::SUCCESS
664        })
665        .await
666}