kanidm_unixd_tasks/
kanidm_unixd_tasks.rs

1#![deny(warnings)]
2#![warn(unused_extern_crates)]
3#![deny(clippy::todo)]
4#![deny(clippy::unimplemented)]
5#![deny(clippy::unwrap_used)]
6#![deny(clippy::expect_used)]
7#![deny(clippy::panic)]
8#![deny(clippy::unreachable)]
9#![deny(clippy::await_holding_lock)]
10#![deny(clippy::needless_pass_by_value)]
11#![deny(clippy::trivially_copy_pass_by_ref)]
12
13use futures::{SinkExt, StreamExt};
14use kanidm_unix_common::constants::{
15    DEFAULT_CONFIG_PATH, SYSTEM_GROUP_PATH, SYSTEM_PASSWD_PATH, SYSTEM_SHADOW_PATH,
16};
17use kanidm_unix_common::json_codec::JsonCodec;
18use kanidm_unix_common::unix_config::{HomeStrategy, UnixdConfig};
19use kanidm_unix_common::unix_passwd::{parse_etc_group, parse_etc_passwd, parse_etc_shadow, EtcDb};
20use kanidm_unix_common::unix_proto::{
21    HomeDirectoryInfo, TaskRequest, TaskRequestFrame, TaskResponse,
22};
23use kanidm_utils_users::{get_effective_gid, get_effective_uid};
24use libc::{lchown, umask};
25use notify_debouncer_full::notify::RecommendedWatcher;
26use notify_debouncer_full::Debouncer;
27use notify_debouncer_full::RecommendedCache;
28use notify_debouncer_full::{new_debouncer, notify::RecursiveMode, DebouncedEvent};
29use sketching::tracing_forest::traits::*;
30use sketching::tracing_forest::util::*;
31use sketching::tracing_forest::{self};
32use std::ffi::CString;
33use std::os::unix::ffi::OsStrExt;
34use std::os::unix::fs::symlink;
35use std::path::{Path, PathBuf};
36use std::process::ExitCode;
37use std::time::Duration;
38use std::{fs, io};
39use tokio::fs::File;
40use tokio::io::AsyncReadExt;
41use tokio::net::UnixStream;
42use tokio::sync::broadcast;
43use tokio::sync::watch;
44use tokio::time;
45use tokio_util::codec::Framed;
46use tracing::instrument;
47use walkdir::WalkDir;
48
49#[cfg(target_os = "linux")]
50use nix::mount::MsFlags;
51#[cfg(target_os = "linux")]
52use procfs::process::Process;
53#[cfg(target_os = "linux")]
54use std::fs::{create_dir, remove_file};
55
56#[cfg(all(target_family = "unix", feature = "selinux"))]
57use kanidm_unix_common::selinux_util;
58
59static KANIDM_UNIX_RETRY_SECS: u64 = 5;
60
61fn chown(path: &Path, gid: u32) -> Result<(), String> {
62    let path_os = CString::new(path.as_os_str().as_bytes())
63        .map_err(|_| "Unable to create c-string".to_string())?;
64
65    // Change the owner to the gid - remember, kanidm ONLY has gid's, the uid is implied.
66    if unsafe { lchown(path_os.as_ptr(), gid, gid) } != 0 {
67        return Err("Unable to set ownership".to_string());
68    }
69    Ok(())
70}
71
72fn create_home_directory(
73    info: &HomeDirectoryInfo,
74    home_prefix_path: &Path,
75    home_mount_prefix_path: Option<&PathBuf>,
76    home_strategy: &HomeStrategy,
77    use_etc_skel: bool,
78    use_selinux: bool,
79) -> Result<(), String> {
80    // Final sanity check to prevent certain classes of attacks. This should *never*
81    // be possible, but we assert this to be sure.
82    let name = info.name.trim_start_matches('.').replace(['/', '\\'], "");
83
84    debug!(?home_prefix_path, ?home_mount_prefix_path, ?info);
85
86    // This is where the users home dir "is" and aliases from here go to the true storage
87    // mounts
88    let home_prefix_path = home_prefix_path
89        .canonicalize()
90        .map_err(|e| format!("{e:?}"))?;
91
92    // This is where the storage is *mounted*. If not set, falls back to the home_prefix.
93    let home_mount_prefix_path = home_mount_prefix_path
94        .unwrap_or(&home_prefix_path)
95        .canonicalize()
96        .map_err(|e| format!("{e:?}"))?;
97
98    // Does our home_prefix actually exist?
99    if !home_prefix_path.exists() || !home_prefix_path.is_dir() || !home_prefix_path.is_absolute() {
100        return Err("Invalid home_prefix from configuration - home_prefix path must exist, must be a directory, and must be absolute (not relative)".to_string());
101    }
102
103    if !home_mount_prefix_path.exists()
104        || !home_mount_prefix_path.is_dir()
105        || !home_mount_prefix_path.is_absolute()
106    {
107        return Err("Invalid home_mount_prefix from configuration - home_prefix path must exist, must be a directory, and must be absolute (not relative)".to_string());
108    }
109
110    // This is now creating the actual home directory in the home_mount path.
111    // First we want to validate that the path is legitimate and hasn't tried
112    // to escape the home_mount prefix.
113    let hd_mount_path = Path::join(&home_mount_prefix_path, &name);
114
115    debug!(?hd_mount_path);
116
117    if let Some(pp) = hd_mount_path.parent() {
118        if pp != home_mount_prefix_path {
119            return Err("Invalid home directory name - not within home_mount_prefix".to_string());
120        }
121    } else {
122        return Err("Invalid/Corrupt home directory path - no prefix found".to_string());
123    }
124
125    // Get a handle to the SELinux labeling interface
126    debug!(?use_selinux, "selinux for home dir labeling");
127    #[cfg(all(target_family = "unix", feature = "selinux"))]
128    let labeler = if use_selinux {
129        selinux_util::SelinuxLabeler::new(info.gid, &home_mount_prefix_path)?
130    } else {
131        selinux_util::SelinuxLabeler::new_noop()
132    };
133
134    // In the ZFS strategy, we will need to check if the filesystem
135    // exists, rather than the location it's mounted to.
136    let hd_mount_path_exists = match home_strategy {
137        HomeStrategy::Symlink => hd_mount_path.exists(),
138        #[cfg(target_os = "linux")]
139        HomeStrategy::BindMount => hd_mount_path.exists(),
140    };
141
142    // Does the home directory exist? This is checking the *true* home mount storage.
143    if !hd_mount_path_exists {
144        // Set the SELinux security context for file creation
145        #[cfg(all(target_family = "unix", feature = "selinux"))]
146        labeler.do_setfscreatecon_for_path()?;
147
148        // This is affected by the mount strategy
149        // because in a future ZFS home dir setup, we'll need to be able to make
150        // the zfs volume for the user in this step.
151        match home_strategy {
152            HomeStrategy::Symlink => create_dir_path(&hd_mount_path, info)?,
153            #[cfg(target_os = "linux")]
154            HomeStrategy::BindMount => create_dir_path(&hd_mount_path, info)?,
155        }
156
157        // Copy in structure from /etc/skel/ if present
158        let skel_dir = Path::new("/etc/skel/");
159        if use_etc_skel && skel_dir.exists() {
160            info!("preparing homedir using /etc/skel");
161            for entry in WalkDir::new(skel_dir).into_iter().filter_map(|e| e.ok()) {
162                let dest = &hd_mount_path.join(
163                    entry
164                        .path()
165                        .strip_prefix(skel_dir)
166                        .map_err(|e| e.to_string())?,
167                );
168
169                #[cfg(all(target_family = "unix", feature = "selinux"))]
170                {
171                    let p = entry
172                        .path()
173                        .strip_prefix(skel_dir)
174                        .map_err(|e| e.to_string())?;
175                    labeler.label_path(p)?;
176                }
177
178                if entry.path().is_dir() {
179                    fs::create_dir_all(dest).map_err(|e| {
180                        error!(err = ?e, ?dest, "Unable to create directory from /etc/skel");
181                        e.to_string()
182                    })?;
183                } else {
184                    fs::copy(entry.path(), dest).map_err(|e| {
185                        error!(err = ?e, ?dest, "Unable to copy from /etc/skel");
186                        e.to_string()
187                    })?;
188                }
189                chown(dest, info.gid)?;
190
191                // Create equivalence rule in the SELinux policy
192                #[cfg(all(target_family = "unix", feature = "selinux"))]
193                labeler.setup_equivalence_rule(&hd_mount_path)?;
194            }
195        }
196    }
197
198    // Reset object creation SELinux context to default
199    #[cfg(all(target_family = "unix", feature = "selinux"))]
200    labeler.set_default_context_for_fs_objects()?;
201
202    let Some(alias) = info.alias.as_ref() else {
203        // No alias for the home dir, lets go.
204        debug!("No home directory alias present, success.");
205        return Ok(());
206    };
207
208    // Sanity check the alias.
209    // let alias = alias.replace(".", "").replace("/", "").replace("\\", "");
210    let alias = alias.trim_start_matches('.').replace(['/', '\\'], "");
211
212    let alias_path = Path::join(&home_prefix_path, &alias);
213
214    // Assert the resulting alias path is consistent and correct within the home_prefix.
215    if let Some(pp) = alias_path.parent() {
216        if pp != home_prefix_path {
217            return Err("Invalid home directory alias - not within home_prefix".to_string());
218        }
219    } else {
220        return Err("Invalid/Corrupt alias directory path - no prefix found".to_string());
221    }
222
223    match home_strategy {
224        HomeStrategy::Symlink => home_alias_update_symlink(&alias_path, &hd_mount_path),
225        #[cfg(target_os = "linux")]
226        HomeStrategy::BindMount => home_alias_update_bind_mount(&alias_path, &hd_mount_path),
227    }
228}
229
230fn create_dir_path(hd_mount_path: &Path, info: &HomeDirectoryInfo) -> Result<(), String> {
231    // Set a umask
232    let before = unsafe { umask(0o0027) };
233
234    // Create the home directory.
235    if let Err(e) = fs::create_dir_all(hd_mount_path) {
236        let _ = unsafe { umask(before) };
237        error!(err = ?e, ?hd_mount_path, "Unable to create directory");
238        return Err(format!("{e:?}"));
239    }
240    let _ = unsafe { umask(before) };
241
242    chown(hd_mount_path, info.gid)
243}
244
245#[cfg(target_os = "linux")]
246fn home_alias_update_bind_mount(alias_path: &Path, hd_mount_path: &Path) -> Result<(), String> {
247    if alias_path.exists() {
248        if alias_path.is_symlink() {
249            // If the alias_path is a symlink, remove it
250            if let Err(e) = remove_file(alias_path) {
251                error!("Unable to remove existing symlink at {alias_path:?}");
252                return Err(format!("{e:?}"));
253            }
254        } else if !alias_path.is_dir() {
255            // If it's anything other than a directory, we don't proceed.
256            error!("A non-directory item already exists at {alias_path:?}");
257            return Err(format!(
258                "A non-directory item already exists at {alias_path:?}"
259            ));
260        }
261    }
262
263    // Create mount point if it doesn't exist
264    if !alias_path.exists() {
265        if let Err(e) = create_dir(alias_path) {
266            error!("Unable to create bind mount target at {alias_path:?}");
267            return Err(format!("{e:?}"));
268        }
269    }
270
271    let current_mounts = Process::myself()
272        .map_err(|e| format!("While updating home directory bind mount, could not get reference to current process: {e}"))?
273        .mountinfo()
274        .map_err(|e| format!("While updating home directory bind mount, could not get mount info: {e}"))?;
275
276    // Remove conflicting mount if it exists:
277    let mismatching_mount = current_mounts.iter().find(|m| {
278        m.mount_point == alias_path && m.mount_source.as_ref().map(Path::new) != Some(hd_mount_path)
279    });
280
281    if let Some(m) = mismatching_mount {
282        nix::mount::umount(&m.mount_point).map_err(|e| {
283            format!(
284                "Unable to remove conflicting mount at {:?}: {e}",
285                &m.mount_point
286            )
287        })?;
288    }
289
290    // If mount point exists and is already correctly mounted, we are done
291    if current_mounts.iter().any(|m| {
292        m.mount_point == alias_path && m.mount_source.as_ref().map(Path::new) != Some(hd_mount_path)
293    }) {
294        return Ok(());
295    }
296
297    // Finally, try to create the bind mount
298    nix::mount::mount::<Path, Path, str, str>(
299        Some(hd_mount_path),
300        alias_path,
301        None,
302        MsFlags::MS_BIND,
303        None,
304    )
305    .map_err(|e| {
306        format!("Unable to bind mount home directory {hd_mount_path:?} to {alias_path:?}: {e}")
307    })?;
308
309    Ok(())
310}
311
312fn home_alias_update_symlink(alias_path: &Path, hd_mount_path: &Path) -> Result<(), String> {
313    if !alias_path.exists() {
314        // Does not exist. Create.
315        debug!("creating symlink {:?} -> {:?}", alias_path, hd_mount_path);
316        if let Err(e) = symlink(hd_mount_path, alias_path) {
317            error!(err = ?e, ?alias_path, "Unable to create alias path");
318            return Err(format!("{e:?}"));
319        }
320        return Ok(());
321    }
322
323    debug!("checking symlink {:?} -> {:?}", alias_path, hd_mount_path);
324    let attr = match fs::symlink_metadata(alias_path) {
325        Ok(a) => a,
326        Err(e) => {
327            error!(err = ?e, ?alias_path, "Unable to read alias path metadata");
328            return Err(format!("{e:?}"));
329        }
330    };
331
332    if !attr.file_type().is_symlink() {
333        warn!(
334            ?alias_path,
335            ?hd_mount_path,
336            "home directory alias path is not a symlink, unable to update"
337        );
338        return Ok(());
339    }
340
341    // If already correct, skip churn.
342    match fs::read_link(alias_path) {
343        Ok(current_target) if current_target == hd_mount_path => {
344            debug!(
345                ?alias_path,
346                ?current_target,
347                "alias symlink already correct, skipping update"
348            );
349            return Ok(());
350        }
351        Ok(current_target) => {
352            debug!(
353                ?alias_path,
354                ?current_target,
355                ?hd_mount_path,
356                "alias symlink target differs, updating atomically"
357            );
358        }
359        Err(e) => {
360            warn!(
361                err=?e, ?alias_path,
362                "unable to read existing symlink target, will replace atomically"
363            );
364        }
365    }
366
367    // Atomic replace: create temp symlink in same dir, then rename over existing.
368    let alias_path_tmp = alias_path.with_extension("tmp");
369
370    if alias_path_tmp.exists() {
371        debug!("checking symlink temp {:?}", alias_path_tmp);
372        let attr = match fs::symlink_metadata(&alias_path_tmp) {
373            Ok(a) => a,
374            Err(e) => {
375                error!(err = ?e, ?alias_path_tmp, "Unable to read alias path temp metadata");
376                return Err(format!("{e:?}"));
377            }
378        };
379
380        if !attr.file_type().is_symlink() {
381            warn!(
382                ?alias_path,
383                ?alias_path_tmp,
384                ?hd_mount_path,
385                "home directory alias path temporary update location already exists, and is not a symlink, unable to update"
386            );
387            return Ok(());
388        }
389    }
390
391    // Best-effort cleanup of any stale tmp.
392    let _ = fs::remove_file(&alias_path_tmp);
393
394    if let Err(e) = symlink(hd_mount_path, &alias_path_tmp) {
395        error!(err=?e, ?alias_path_tmp, "Unable to create temporary alias symlink");
396        return Err(format!("{e:?}"));
397    }
398
399    // Rename is atomic within the same directory; no disappearance window.
400    if let Err(e) = fs::rename(&alias_path_tmp, alias_path) {
401        error!(err=?e, from=?alias_path_tmp, to=?alias_path, "Unable to atomically replace alias symlink");
402        // Cleanup temp on failure.
403        let _ = fs::remove_file(&alias_path_tmp);
404        return Err(format!("{e:?}"));
405    }
406
407    debug!(
408        "alias symlink updated atomically {:?} -> {:?}",
409        alias_path, hd_mount_path
410    );
411
412    Ok(())
413}
414
415async fn shadow_reload_task(
416    shadow_data_watch_tx: watch::Sender<EtcDb>,
417    mut shadow_broadcast_rx: broadcast::Receiver<bool>,
418) {
419    debug!("shadow reload task has started ...");
420
421    while shadow_broadcast_rx.recv().await.is_ok() {
422        match process_etc_passwd_group().await {
423            Ok(etc_db) => {
424                shadow_data_watch_tx.send_replace(etc_db);
425                debug!("shadow reload task sent");
426            }
427            Err(()) => {
428                error!("Unable to process etc db");
429                continue;
430            }
431        }
432    }
433
434    debug!("shadow reload task has stopped");
435}
436
437async fn handle_shadow_reload(shadow_data_watch_rx: &mut watch::Receiver<EtcDb>) -> TaskResponse {
438    debug!("Received shadow reload event.");
439    let etc_db: EtcDb = {
440        let etc_db_ref = shadow_data_watch_rx.borrow_and_update();
441        (*etc_db_ref).clone()
442    };
443    // process etc shadow and send it here.
444    TaskResponse::NotifyShadowChange(etc_db)
445}
446
447async fn handle_unixd_request(
448    request: Option<Result<TaskRequestFrame, io::Error>>,
449    cfg: &UnixdConfig,
450) -> Result<TaskResponse, ()> {
451    debug!("Received unixd event.");
452    match request {
453        Some(Ok(TaskRequestFrame {
454            id,
455            req: TaskRequest::HomeDirectory(info),
456        })) => {
457            debug!("Received task -> HomeDirectory({:?})", info);
458
459            match create_home_directory(
460                &info,
461                cfg.home_prefix.as_ref(),
462                cfg.home_mount_prefix.as_ref(),
463                &cfg.home_strategy,
464                cfg.use_etc_skel,
465                cfg.selinux,
466            ) {
467                Ok(()) => Ok(TaskResponse::Success(id)),
468                Err(msg) => Ok(TaskResponse::Error(msg)),
469            }
470        }
471        other => {
472            error!("Error -> got un-handled Request Frame {other:?}");
473            Err(())
474        }
475    }
476}
477
478async fn handle_tasks(
479    stream: UnixStream,
480    ctl_broadcast_rx: &mut broadcast::Receiver<bool>,
481    shadow_data_watch_rx: &mut watch::Receiver<EtcDb>,
482    cfg: &UnixdConfig,
483) {
484    let codec: JsonCodec<TaskRequestFrame, TaskResponse> = JsonCodec::default();
485
486    let mut reqs = Framed::new(stream, codec);
487
488    // Immediately trigger that we should reload the shadow files for the new connected handler
489    shadow_data_watch_rx.mark_changed();
490
491    debug!("Task handler loop has started ...");
492
493    loop {
494        let msg = tokio::select! {
495            biased; // tell tokio to poll these in order
496            _ = ctl_broadcast_rx.recv() => {
497                // We received a shutdown signal.
498                info!("Received shutdown signal, breaking task handler loop ...");
499                return
500            }
501            // We bias to *sending* messages in tasks.
502            Ok(_) = shadow_data_watch_rx.changed() => {
503                handle_shadow_reload(shadow_data_watch_rx).await
504            }
505            request = reqs.next() => {
506                match handle_unixd_request(request,  cfg).await {
507                    Ok(response) => {
508                       response
509                    }
510                    Err(_) => {
511                        error!("Error handling request, exiting task handler loop ...");
512                        return;
513                    }
514                }
515            }
516        };
517
518        if let Err(e) = reqs.send(msg).await {
519            error!(?e, "Error sending response to kanidm_unixd");
520            return;
521        }
522    }
523}
524
525#[instrument(level = "debug", skip_all)]
526async fn process_etc_passwd_group() -> Result<EtcDb, ()> {
527    let mut file = File::open(SYSTEM_PASSWD_PATH).await.map_err(|err| {
528        error!(?err);
529    })?;
530    let mut contents = vec![];
531    file.read_to_end(&mut contents).await.map_err(|err| {
532        error!(?err);
533    })?;
534
535    let users = parse_etc_passwd(contents.as_slice())
536        .map_err(|_| "Invalid passwd content")
537        .map_err(|err| {
538            error!(?err);
539        })?;
540
541    let mut file = File::open(SYSTEM_SHADOW_PATH).await.map_err(|err| {
542        error!(?err);
543    })?;
544    let mut contents = vec![];
545    file.read_to_end(&mut contents).await.map_err(|err| {
546        error!(?err);
547    })?;
548
549    let shadow = parse_etc_shadow(contents.as_slice())
550        .map_err(|_| "Invalid passwd content")
551        .map_err(|err| {
552            error!(?err);
553        })?;
554
555    let mut file = File::open(SYSTEM_GROUP_PATH).await.map_err(|err| {
556        error!(?err);
557    })?;
558    let mut contents = vec![];
559    file.read_to_end(&mut contents).await.map_err(|err| {
560        error!(?err);
561    })?;
562
563    let groups = parse_etc_group(contents.as_slice())
564        .map_err(|_| "Invalid group content")
565        .map_err(|err| {
566            error!(?err);
567        })?;
568
569    Ok(EtcDb {
570        users,
571        shadow,
572        groups,
573    })
574}
575
576fn setup_shadow_inotify_watcher(
577    shadow_broadcast_tx: broadcast::Sender<bool>,
578) -> Result<Debouncer<RecommendedWatcher, RecommendedCache>, ExitCode> {
579    let watcher = new_debouncer(
580        Duration::from_secs(5),
581        None,
582        move |event: Result<Vec<DebouncedEvent>, _>| {
583            let array_of_events = match event {
584                Ok(events) => events,
585                Err(array_errors) => {
586                    for err in array_errors {
587                        error!(?err, "inotify debounce error");
588                    }
589                    return;
590                }
591            };
592
593            let mut path_of_interest_was_changed = false;
594
595            for inode_event in array_of_events.iter() {
596                if !inode_event.kind.is_access()
597                    && inode_event.paths.iter().any(|path| {
598                        path == Path::new(SYSTEM_GROUP_PATH)
599                            || path == Path::new(SYSTEM_PASSWD_PATH)
600                            || path == Path::new(SYSTEM_SHADOW_PATH)
601                    })
602                {
603                    debug!(?inode_event, "Handling inotify modification event");
604
605                    path_of_interest_was_changed = true
606                }
607            }
608
609            if path_of_interest_was_changed {
610                let _ = shadow_broadcast_tx.send(true);
611            } else {
612                trace!(?array_of_events, "IGNORED");
613            }
614        },
615    )
616    .and_then(|mut debouncer| {
617        debouncer
618            .watch(Path::new("/etc"), RecursiveMode::Recursive)
619            .map(|()| debouncer)
620    });
621
622    watcher.map_err(|err| {
623        error!(?err, "Failed to setup inotify");
624        ExitCode::FAILURE
625    })
626}
627
628#[tokio::main(flavor = "current_thread")]
629async fn main() -> ExitCode {
630    // On linux when debug assertions are disabled, prevent ptrace
631    // from attaching to us.
632    #[cfg(all(target_os = "linux", not(debug_assertions)))]
633    if let Err(code) = prctl::set_dumpable(false) {
634        error!(?code, "CRITICAL: Unable to set prctl flags");
635        return ExitCode::FAILURE;
636    }
637    // let cuid = get_current_uid();
638    // let cgid = get_current_gid();
639    // We only need to check effective id
640    let ceuid = get_effective_uid();
641    let cegid = get_effective_gid();
642
643    for arg in std::env::args() {
644        if arg.contains("--version") {
645            println!("kanidm_unixd_tasks {}", env!("CARGO_PKG_VERSION"));
646            return ExitCode::SUCCESS;
647        } else if arg.contains("--help") {
648            println!("kanidm_unixd_tasks {}", env!("CARGO_PKG_VERSION"));
649            println!("Usage: kanidm_unixd_tasks");
650            println!("  --version");
651            println!("  --help");
652            return ExitCode::SUCCESS;
653        }
654    }
655
656    #[allow(clippy::expect_used)]
657    tracing_forest::worker_task()
658        .set_global(true)
659        // Fall back to stderr
660        .map_sender(|sender| sender.or_stderr())
661        .build_on(|subscriber| {
662            subscriber.with(
663                EnvFilter::try_from_default_env()
664                    .or_else(|_| EnvFilter::try_new("info"))
665                    .expect("Failed to init envfilter"),
666            )
667        })
668        .on(async {
669            if ceuid != 0 || cegid != 0 {
670                error!("Refusing to run - this process *MUST* operate as root.");
671                return ExitCode::FAILURE;
672            }
673
674            let unixd_path = Path::new(DEFAULT_CONFIG_PATH);
675            let unixd_path_str = match unixd_path.to_str() {
676                Some(cps) => cps,
677                None => {
678                    error!("Unable to turn unixd_path to str");
679                    return ExitCode::FAILURE;
680                }
681            };
682
683            let cfg = match UnixdConfig::new().read_options_from_optional_config(unixd_path) {
684                Ok(v) => v,
685                Err(_) => {
686                    error!("Failed to parse {}", unixd_path_str);
687                    return ExitCode::FAILURE;
688                }
689            };
690
691            let task_sock_path = cfg.task_sock_path.clone();
692            debug!("Attempting to use {} ...", task_sock_path);
693
694            // This is the startup/shutdown control channel
695            let (broadcast_tx, mut broadcast_rx) = broadcast::channel(4);
696            let mut d_broadcast_rx = broadcast_tx.subscribe();
697
698            // This is to broadcast when we need to reload the shadow
699            // files.
700            let (shadow_broadcast_tx, shadow_broadcast_rx) = broadcast::channel(4);
701
702            let watcher = match setup_shadow_inotify_watcher(shadow_broadcast_tx.clone()) {
703                Ok(w) => w,
704                Err(exit) => return exit,
705            };
706
707            // Setup the etcdb watch
708            let etc_db = match process_etc_passwd_group().await {
709                Ok(etc_db) => etc_db,
710                Err(err) => {
711                    warn!(?err, "unable to process {SYSTEM_PASSWD_PATH} and related files.");
712                    // Return an empty set instead.
713                    EtcDb::default()
714                }
715            };
716
717            let (shadow_data_watch_tx, mut shadow_data_watch_rx) = watch::channel(etc_db);
718
719            let _shadow_task = tokio::spawn(async move {
720                shadow_reload_task(
721                    shadow_data_watch_tx, shadow_broadcast_rx
722                ).await
723            });
724
725            let server = tokio::spawn(async move {
726                loop {
727                    info!("Attempting to connect to kanidm_unixd ...");
728
729                    tokio::select! {
730                        _ = broadcast_rx.recv() => {
731                            break;
732                        }
733                        connect_res = UnixStream::connect(&task_sock_path) => {
734                            match connect_res {
735                                Ok(stream) => {
736                                    info!("Found kanidm_unixd, waiting for tasks ...");
737
738                                    // Yep! Now let the main handler do its job.
739                                    // If it returns (disconnected, etc, then we loop and try again).
740                                    handle_tasks(stream, &mut d_broadcast_rx, &mut shadow_data_watch_rx, &cfg).await;
741                                    continue;
742                                }
743                                Err(e) => {
744                                    debug!("\\---> {:?}", e);
745                                    error!("Unable to find kanidm_unixd, sleeping for {} seconds ...", KANIDM_UNIX_RETRY_SECS);
746                                    // Back off.
747                                    time::sleep(Duration::from_secs(KANIDM_UNIX_RETRY_SECS)).await;
748                                }
749                            }
750                        }
751                    } // select
752                } // loop
753            });
754
755            info!("Server started ...");
756
757            // On linux, notify systemd.
758            #[cfg(target_os = "linux")]
759            unsafe {
760                let _ = sd_notify::notify_and_unset_env( &[sd_notify::NotifyState::Ready]);
761            }
762
763            loop {
764                tokio::select! {
765                    Ok(()) = tokio::signal::ctrl_c() => {
766                        break
767                    }
768                    Some(()) = async move {
769                        let sigterm = tokio::signal::unix::SignalKind::terminate();
770                        #[allow(clippy::unwrap_used)]
771                        tokio::signal::unix::signal(sigterm).unwrap().recv().await
772                    } => {
773                        break
774                    }
775                    Some(()) = async move {
776                        let sigterm = tokio::signal::unix::SignalKind::alarm();
777                        #[allow(clippy::unwrap_used)]
778                        tokio::signal::unix::signal(sigterm).unwrap().recv().await
779                    } => {
780                        // Ignore
781                    }
782                    Some(()) = async move {
783                        let sigterm = tokio::signal::unix::SignalKind::hangup();
784                        #[allow(clippy::unwrap_used)]
785                        tokio::signal::unix::signal(sigterm).unwrap().recv().await
786                    } => {
787                        // Ignore
788                    }
789                    Some(()) = async move {
790                        let sigterm = tokio::signal::unix::SignalKind::user_defined1();
791                        #[allow(clippy::unwrap_used)]
792                        tokio::signal::unix::signal(sigterm).unwrap().recv().await
793                    } => {
794                        // Ignore
795                    }
796
797                    Some(()) = async move {
798                        let sigterm = tokio::signal::unix::SignalKind::user_defined2();
799                        #[allow(clippy::unwrap_used)]
800                        tokio::signal::unix::signal(sigterm).unwrap().recv().await
801                    } => {
802                        // Ignore
803                    }
804                }
805            }
806            info!("Signal received, shutting down");
807            // Send a broadcast that we are done.
808            if let Err(e) = broadcast_tx.send(true) {
809                error!("Unable to shutdown workers {:?}", e);
810            }
811
812            debug!("Dropping inotify watcher ...");
813            drop(watcher);
814
815            let _ = server.await;
816            ExitCode::SUCCESS
817        })
818        .await
819}