kanidm_unixd_tasks/
kanidm_unixd_tasks.rs

1#![deny(warnings)]
2#![warn(unused_extern_crates)]
3#![deny(clippy::todo)]
4#![deny(clippy::unimplemented)]
5#![deny(clippy::unwrap_used)]
6#![deny(clippy::expect_used)]
7#![deny(clippy::panic)]
8#![deny(clippy::unreachable)]
9#![deny(clippy::await_holding_lock)]
10#![deny(clippy::needless_pass_by_value)]
11#![deny(clippy::trivially_copy_pass_by_ref)]
12
13use futures::{SinkExt, StreamExt};
14use kanidm_unix_common::constants::{
15    DEFAULT_CONFIG_PATH, SYSTEM_GROUP_PATH, SYSTEM_PASSWD_PATH, SYSTEM_SHADOW_PATH,
16};
17use kanidm_unix_common::json_codec::JsonCodec;
18use kanidm_unix_common::unix_config::{HomeStrategy, UnixdConfig};
19use kanidm_unix_common::unix_passwd::{parse_etc_group, parse_etc_passwd, parse_etc_shadow, EtcDb};
20use kanidm_unix_common::unix_proto::{
21    HomeDirectoryInfo, TaskRequest, TaskRequestFrame, TaskResponse,
22};
23use kanidm_utils_users::{get_effective_gid, get_effective_uid};
24use libc::{lchown, umask};
25use notify_debouncer_full::notify::RecommendedWatcher;
26use notify_debouncer_full::Debouncer;
27use notify_debouncer_full::RecommendedCache;
28use notify_debouncer_full::{new_debouncer, notify::RecursiveMode, DebouncedEvent};
29use sketching::tracing_forest::traits::*;
30use sketching::tracing_forest::util::*;
31use sketching::tracing_forest::{self};
32use std::ffi::CString;
33use std::os::unix::ffi::OsStrExt;
34use std::os::unix::fs::symlink;
35use std::path::{Path, PathBuf};
36use std::process::ExitCode;
37use std::time::Duration;
38use std::{fs, io};
39use tokio::fs::File;
40use tokio::io::AsyncReadExt;
41use tokio::net::UnixStream;
42use tokio::sync::broadcast;
43use tokio::sync::watch;
44use tokio::time;
45use tokio_util::codec::Framed;
46use tracing::instrument;
47use walkdir::WalkDir;
48
49#[cfg(target_os = "linux")]
50use nix::mount::MsFlags;
51#[cfg(target_os = "linux")]
52use procfs::process::Process;
53#[cfg(target_os = "linux")]
54use std::fs::{create_dir, remove_file};
55
56#[cfg(all(target_family = "unix", feature = "selinux"))]
57use kanidm_unix_common::selinux_util;
58
59fn chown(path: &Path, gid: u32) -> Result<(), String> {
60    let path_os = CString::new(path.as_os_str().as_bytes())
61        .map_err(|_| "Unable to create c-string".to_string())?;
62
63    // Change the owner to the gid - remember, kanidm ONLY has gid's, the uid is implied.
64    if unsafe { lchown(path_os.as_ptr(), gid, gid) } != 0 {
65        return Err("Unable to set ownership".to_string());
66    }
67    Ok(())
68}
69
70fn create_home_directory(
71    info: &HomeDirectoryInfo,
72    home_prefix_path: &Path,
73    home_mount_prefix_path: Option<&PathBuf>,
74    home_strategy: &HomeStrategy,
75    use_etc_skel: bool,
76    use_selinux: bool,
77) -> Result<(), String> {
78    // Final sanity check to prevent certain classes of attacks. This should *never*
79    // be possible, but we assert this to be sure.
80    let name = info.name.trim_start_matches('.').replace(['/', '\\'], "");
81
82    debug!(?home_prefix_path, ?home_mount_prefix_path, ?info);
83
84    // This is where the users home dir "is" and aliases from here go to the true storage
85    // mounts
86    let home_prefix_path = home_prefix_path
87        .canonicalize()
88        .map_err(|e| format!("{e:?}"))?;
89
90    // This is where the storage is *mounted*. If not set, falls back to the home_prefix.
91    let home_mount_prefix_path = home_mount_prefix_path
92        .unwrap_or(&home_prefix_path)
93        .canonicalize()
94        .map_err(|e| format!("{e:?}"))?;
95
96    // Does our home_prefix actually exist?
97    if !home_prefix_path.exists() || !home_prefix_path.is_dir() || !home_prefix_path.is_absolute() {
98        return Err("Invalid home_prefix from configuration - home_prefix path must exist, must be a directory, and must be absolute (not relative)".to_string());
99    }
100
101    if !home_mount_prefix_path.exists()
102        || !home_mount_prefix_path.is_dir()
103        || !home_mount_prefix_path.is_absolute()
104    {
105        return Err("Invalid home_mount_prefix from configuration - home_prefix path must exist, must be a directory, and must be absolute (not relative)".to_string());
106    }
107
108    // This is now creating the actual home directory in the home_mount path.
109    // First we want to validate that the path is legitimate and hasn't tried
110    // to escape the home_mount prefix.
111    let hd_mount_path = Path::join(&home_mount_prefix_path, &name);
112
113    debug!(?hd_mount_path);
114
115    if let Some(pp) = hd_mount_path.parent() {
116        if pp != home_mount_prefix_path {
117            return Err("Invalid home directory name - not within home_mount_prefix".to_string());
118        }
119    } else {
120        return Err("Invalid/Corrupt home directory path - no prefix found".to_string());
121    }
122
123    // Get a handle to the SELinux labeling interface
124    debug!(?use_selinux, "selinux for home dir labeling");
125    #[cfg(all(target_family = "unix", feature = "selinux"))]
126    let labeler = if use_selinux {
127        selinux_util::SelinuxLabeler::new(info.gid, &home_mount_prefix_path)?
128    } else {
129        selinux_util::SelinuxLabeler::new_noop()
130    };
131
132    // In the ZFS strategy, we will need to check if the filesystem
133    // exists, rather than the location it's mounted to.
134    let hd_mount_path_exists = match home_strategy {
135        HomeStrategy::Symlink => hd_mount_path.exists(),
136        #[cfg(target_os = "linux")]
137        HomeStrategy::BindMount => hd_mount_path.exists(),
138    };
139
140    // Does the home directory exist? This is checking the *true* home mount storage.
141    if !hd_mount_path_exists {
142        // Set the SELinux security context for file creation
143        #[cfg(all(target_family = "unix", feature = "selinux"))]
144        labeler.do_setfscreatecon_for_path()?;
145
146        // This is affected by the mount strategy
147        // because in a future ZFS home dir setup, we'll need to be able to make
148        // the zfs volume for the user in this step.
149        match home_strategy {
150            HomeStrategy::Symlink => create_dir_path(&hd_mount_path, info)?,
151            #[cfg(target_os = "linux")]
152            HomeStrategy::BindMount => create_dir_path(&hd_mount_path, info)?,
153        }
154
155        // Copy in structure from /etc/skel/ if present
156        let skel_dir = Path::new("/etc/skel/");
157        if use_etc_skel && skel_dir.exists() {
158            info!("preparing homedir using /etc/skel");
159            for entry in WalkDir::new(skel_dir).into_iter().filter_map(|e| e.ok()) {
160                let dest = &hd_mount_path.join(
161                    entry
162                        .path()
163                        .strip_prefix(skel_dir)
164                        .map_err(|e| e.to_string())?,
165                );
166
167                #[cfg(all(target_family = "unix", feature = "selinux"))]
168                {
169                    let p = entry
170                        .path()
171                        .strip_prefix(skel_dir)
172                        .map_err(|e| e.to_string())?;
173                    labeler.label_path(p)?;
174                }
175
176                if entry.path().is_dir() {
177                    fs::create_dir_all(dest).map_err(|e| {
178                        error!(err = ?e, ?dest, "Unable to create directory from /etc/skel");
179                        e.to_string()
180                    })?;
181                } else {
182                    fs::copy(entry.path(), dest).map_err(|e| {
183                        error!(err = ?e, ?dest, "Unable to copy from /etc/skel");
184                        e.to_string()
185                    })?;
186                }
187                chown(dest, info.gid)?;
188
189                // Create equivalence rule in the SELinux policy
190                #[cfg(all(target_family = "unix", feature = "selinux"))]
191                labeler.setup_equivalence_rule(&hd_mount_path)?;
192            }
193        }
194    }
195
196    // Reset object creation SELinux context to default
197    #[cfg(all(target_family = "unix", feature = "selinux"))]
198    labeler.set_default_context_for_fs_objects()?;
199
200    let Some(alias) = info.alias.as_ref() else {
201        // No alias for the home dir, lets go.
202        debug!("No home directory alias present, success.");
203        return Ok(());
204    };
205
206    // Sanity check the alias.
207    // let alias = alias.replace(".", "").replace("/", "").replace("\\", "");
208    let alias = alias.trim_start_matches('.').replace(['/', '\\'], "");
209
210    let alias_path = Path::join(&home_prefix_path, &alias);
211
212    // Assert the resulting alias path is consistent and correct within the home_prefix.
213    if let Some(pp) = alias_path.parent() {
214        if pp != home_prefix_path {
215            return Err("Invalid home directory alias - not within home_prefix".to_string());
216        }
217    } else {
218        return Err("Invalid/Corrupt alias directory path - no prefix found".to_string());
219    }
220
221    match home_strategy {
222        HomeStrategy::Symlink => home_alias_update_symlink(&alias_path, &hd_mount_path),
223        #[cfg(target_os = "linux")]
224        HomeStrategy::BindMount => home_alias_update_bind_mount(&alias_path, &hd_mount_path),
225    }
226}
227
228fn create_dir_path(hd_mount_path: &Path, info: &HomeDirectoryInfo) -> Result<(), String> {
229    // Set a umask
230    let before = unsafe { umask(0o0027) };
231
232    // Create the home directory.
233    if let Err(e) = fs::create_dir_all(hd_mount_path) {
234        let _ = unsafe { umask(before) };
235        error!(err = ?e, ?hd_mount_path, "Unable to create directory");
236        return Err(format!("{e:?}"));
237    }
238    let _ = unsafe { umask(before) };
239
240    chown(hd_mount_path, info.gid)
241}
242
243#[cfg(target_os = "linux")]
244fn home_alias_update_bind_mount(alias_path: &Path, hd_mount_path: &Path) -> Result<(), String> {
245    if alias_path.exists() {
246        if alias_path.is_symlink() {
247            // If the alias_path is a symlink, remove it
248            if let Err(e) = remove_file(alias_path) {
249                error!("Unable to remove existing symlink at {alias_path:?}");
250                return Err(format!("{e:?}"));
251            }
252        } else if !alias_path.is_dir() {
253            // If it's anything other than a directory, we don't proceed.
254            error!("A non-directory item already exists at {alias_path:?}");
255            return Err(format!(
256                "A non-directory item already exists at {alias_path:?}"
257            ));
258        }
259    }
260
261    // Create mount point if it doesn't exist
262    if !alias_path.exists() {
263        if let Err(e) = create_dir(alias_path) {
264            error!("Unable to create bind mount target at {alias_path:?}");
265            return Err(format!("{e:?}"));
266        }
267    }
268
269    let current_mounts = Process::myself()
270        .map_err(|e| format!("While updating home directory bind mount, could not get reference to current process: {e}"))?
271        .mountinfo()
272        .map_err(|e| format!("While updating home directory bind mount, could not get mount info: {e}"))?;
273
274    // Remove conflicting mount if it exists:
275    let mismatching_mount = current_mounts.iter().find(|m| {
276        m.mount_point == alias_path && m.mount_source.as_ref().map(Path::new) != Some(hd_mount_path)
277    });
278
279    if let Some(m) = mismatching_mount {
280        nix::mount::umount(&m.mount_point).map_err(|e| {
281            format!(
282                "Unable to remove conflicting mount at {:?}: {e}",
283                &m.mount_point
284            )
285        })?;
286    }
287
288    // If mount point exists and is already correctly mounted, we are done
289    if current_mounts.iter().any(|m| {
290        m.mount_point == alias_path && m.mount_source.as_ref().map(Path::new) != Some(hd_mount_path)
291    }) {
292        return Ok(());
293    }
294
295    // Finally, try to create the bind mount
296    nix::mount::mount::<Path, Path, str, str>(
297        Some(hd_mount_path),
298        alias_path,
299        None,
300        MsFlags::MS_BIND,
301        None,
302    )
303    .map_err(|e| {
304        format!("Unable to bind mount home directory {hd_mount_path:?} to {alias_path:?}: {e}")
305    })?;
306
307    Ok(())
308}
309
310fn home_alias_update_symlink(alias_path: &Path, hd_mount_path: &Path) -> Result<(), String> {
311    if !alias_path.exists() {
312        // Does not exist. Create.
313        debug!("creating symlink {:?} -> {:?}", alias_path, hd_mount_path);
314        if let Err(e) = symlink(hd_mount_path, alias_path) {
315            error!(err = ?e, ?alias_path, "Unable to create alias path");
316            return Err(format!("{e:?}"));
317        }
318        return Ok(());
319    }
320
321    debug!("checking symlink {:?} -> {:?}", alias_path, hd_mount_path);
322    let attr = match fs::symlink_metadata(alias_path) {
323        Ok(a) => a,
324        Err(e) => {
325            error!(err = ?e, ?alias_path, "Unable to read alias path metadata");
326            return Err(format!("{e:?}"));
327        }
328    };
329
330    if !attr.file_type().is_symlink() {
331        warn!(
332            ?alias_path,
333            ?hd_mount_path,
334            "home directory alias path is not a symlink, unable to update"
335        );
336        return Ok(());
337    }
338
339    // If already correct, skip churn.
340    match fs::read_link(alias_path) {
341        Ok(current_target) if current_target == hd_mount_path => {
342            debug!(
343                ?alias_path,
344                ?current_target,
345                "alias symlink already correct, skipping update"
346            );
347            return Ok(());
348        }
349        Ok(current_target) => {
350            debug!(
351                ?alias_path,
352                ?current_target,
353                ?hd_mount_path,
354                "alias symlink target differs, updating atomically"
355            );
356        }
357        Err(e) => {
358            warn!(
359                err=?e, ?alias_path,
360                "unable to read existing symlink target, will replace atomically"
361            );
362        }
363    }
364
365    // Atomic replace: create temp symlink in same dir, then rename over existing.
366    let alias_path_tmp = alias_path.with_extension("tmp");
367
368    if alias_path_tmp.exists() {
369        debug!("checking symlink temp {:?}", alias_path_tmp);
370        let attr = match fs::symlink_metadata(&alias_path_tmp) {
371            Ok(a) => a,
372            Err(e) => {
373                error!(err = ?e, ?alias_path_tmp, "Unable to read alias path temp metadata");
374                return Err(format!("{e:?}"));
375            }
376        };
377
378        if !attr.file_type().is_symlink() {
379            warn!(
380                ?alias_path,
381                ?alias_path_tmp,
382                ?hd_mount_path,
383                "home directory alias path temporary update location already exists, and is not a symlink, unable to update"
384            );
385            return Ok(());
386        }
387    }
388
389    // Best-effort cleanup of any stale tmp.
390    let _ = fs::remove_file(&alias_path_tmp);
391
392    if let Err(e) = symlink(hd_mount_path, &alias_path_tmp) {
393        error!(err=?e, ?alias_path_tmp, "Unable to create temporary alias symlink");
394        return Err(format!("{e:?}"));
395    }
396
397    // Rename is atomic within the same directory; no disappearance window.
398    if let Err(e) = fs::rename(&alias_path_tmp, alias_path) {
399        error!(err=?e, from=?alias_path_tmp, to=?alias_path, "Unable to atomically replace alias symlink");
400        // Cleanup temp on failure.
401        let _ = fs::remove_file(&alias_path_tmp);
402        return Err(format!("{e:?}"));
403    }
404
405    debug!(
406        "alias symlink updated atomically {:?} -> {:?}",
407        alias_path, hd_mount_path
408    );
409
410    Ok(())
411}
412
413async fn shadow_reload_task(
414    shadow_data_watch_tx: watch::Sender<EtcDb>,
415    mut shadow_broadcast_rx: broadcast::Receiver<bool>,
416) {
417    debug!("shadow reload task has started ...");
418
419    while shadow_broadcast_rx.recv().await.is_ok() {
420        match process_etc_passwd_group().await {
421            Ok(etc_db) => {
422                shadow_data_watch_tx.send_replace(etc_db);
423                debug!("shadow reload task sent");
424            }
425            Err(()) => {
426                error!("Unable to process etc db");
427                continue;
428            }
429        }
430    }
431
432    debug!("shadow reload task has stopped");
433}
434
435async fn handle_shadow_reload(shadow_data_watch_rx: &mut watch::Receiver<EtcDb>) -> TaskResponse {
436    debug!("Received shadow reload event.");
437    let etc_db: EtcDb = {
438        let etc_db_ref = shadow_data_watch_rx.borrow_and_update();
439        (*etc_db_ref).clone()
440    };
441    // process etc shadow and send it here.
442    TaskResponse::NotifyShadowChange(etc_db)
443}
444
445async fn handle_unixd_request(
446    request: Option<Result<TaskRequestFrame, io::Error>>,
447    cfg: &UnixdConfig,
448) -> Result<TaskResponse, ()> {
449    debug!("Received unixd event.");
450    match request {
451        Some(Ok(TaskRequestFrame {
452            id,
453            req: TaskRequest::HomeDirectory(info),
454        })) => {
455            debug!("Received task -> HomeDirectory({:?})", info);
456
457            match create_home_directory(
458                &info,
459                cfg.home_prefix.as_ref(),
460                cfg.home_mount_prefix.as_ref(),
461                &cfg.home_strategy,
462                cfg.use_etc_skel,
463                cfg.selinux,
464            ) {
465                Ok(()) => Ok(TaskResponse::Success(id)),
466                Err(msg) => Ok(TaskResponse::Error(msg)),
467            }
468        }
469        other => {
470            error!("Error -> got un-handled Request Frame {other:?}");
471            Err(())
472        }
473    }
474}
475
476async fn handle_tasks(
477    stream: UnixStream,
478    ctl_broadcast_rx: &mut broadcast::Receiver<bool>,
479    shadow_data_watch_rx: &mut watch::Receiver<EtcDb>,
480    cfg: &UnixdConfig,
481) {
482    let codec: JsonCodec<TaskRequestFrame, TaskResponse> = JsonCodec::default();
483
484    let mut reqs = Framed::new(stream, codec);
485
486    // Immediately trigger that we should reload the shadow files for the new connected handler
487    shadow_data_watch_rx.mark_changed();
488
489    debug!("Task handler loop has started ...");
490
491    loop {
492        let msg = tokio::select! {
493            biased; // tell tokio to poll these in order
494            _ = ctl_broadcast_rx.recv() => {
495                // We received a shutdown signal.
496                debug!("Received shutdown signal, breaking task handler loop ...");
497                return
498            }
499            // We bias to *sending* messages in tasks.
500            Ok(_) = shadow_data_watch_rx.changed() => {
501                handle_shadow_reload(shadow_data_watch_rx).await
502            }
503            request = reqs.next() => {
504                match handle_unixd_request(request,  cfg).await {
505                    Ok(response) => {
506                       response
507                    }
508                    Err(_) => {
509                        error!("Error handling request, exiting task handler loop ...");
510                        return;
511                    }
512                }
513            }
514        };
515
516        if let Err(e) = reqs.send(msg).await {
517            error!(?e, "Error sending response to kanidm_unixd");
518            return;
519        }
520    }
521}
522
523#[instrument(level = "debug", skip_all)]
524async fn process_etc_passwd_group() -> Result<EtcDb, ()> {
525    let mut file = File::open(SYSTEM_PASSWD_PATH).await.map_err(|err| {
526        error!(?err);
527    })?;
528    let mut contents = vec![];
529    file.read_to_end(&mut contents).await.map_err(|err| {
530        error!(?err);
531    })?;
532
533    let users = parse_etc_passwd(contents.as_slice())
534        .map_err(|_| "Invalid passwd content")
535        .map_err(|err| {
536            error!(?err);
537        })?;
538
539    let mut file = File::open(SYSTEM_SHADOW_PATH).await.map_err(|err| {
540        error!(?err);
541    })?;
542    let mut contents = vec![];
543    file.read_to_end(&mut contents).await.map_err(|err| {
544        error!(?err);
545    })?;
546
547    let shadow = parse_etc_shadow(contents.as_slice())
548        .map_err(|_| "Invalid passwd content")
549        .map_err(|err| {
550            error!(?err);
551        })?;
552
553    let mut file = File::open(SYSTEM_GROUP_PATH).await.map_err(|err| {
554        error!(?err);
555    })?;
556    let mut contents = vec![];
557    file.read_to_end(&mut contents).await.map_err(|err| {
558        error!(?err);
559    })?;
560
561    let groups = parse_etc_group(contents.as_slice())
562        .map_err(|_| "Invalid group content")
563        .map_err(|err| {
564            error!(?err);
565        })?;
566
567    Ok(EtcDb {
568        users,
569        shadow,
570        groups,
571    })
572}
573
574fn setup_shadow_inotify_watcher(
575    shadow_broadcast_tx: broadcast::Sender<bool>,
576) -> Result<Debouncer<RecommendedWatcher, RecommendedCache>, ExitCode> {
577    let watcher = new_debouncer(
578        Duration::from_secs(5),
579        None,
580        move |event: Result<Vec<DebouncedEvent>, _>| {
581            let array_of_events = match event {
582                Ok(events) => events,
583                Err(array_errors) => {
584                    for err in array_errors {
585                        error!(?err, "inotify debounce error");
586                    }
587                    return;
588                }
589            };
590
591            let mut path_of_interest_was_changed = false;
592
593            for inode_event in array_of_events.iter() {
594                if !inode_event.kind.is_access()
595                    && inode_event.paths.iter().any(|path| {
596                        path == Path::new(SYSTEM_GROUP_PATH)
597                            || path == Path::new(SYSTEM_PASSWD_PATH)
598                            || path == Path::new(SYSTEM_SHADOW_PATH)
599                    })
600                {
601                    debug!(?inode_event, "Handling inotify modification event");
602
603                    path_of_interest_was_changed = true
604                }
605            }
606
607            if path_of_interest_was_changed {
608                let _ = shadow_broadcast_tx.send(true);
609            } else {
610                trace!(?array_of_events, "IGNORED");
611            }
612        },
613    )
614    .and_then(|mut debouncer| {
615        debouncer
616            .watch(Path::new("/etc"), RecursiveMode::Recursive)
617            .map(|()| debouncer)
618    });
619
620    watcher.map_err(|err| {
621        error!(?err, "Failed to setup inotify");
622        ExitCode::FAILURE
623    })
624}
625
626#[tokio::main(flavor = "current_thread")]
627async fn main() -> ExitCode {
628    // On linux when debug assertions are disabled, prevent ptrace
629    // from attaching to us.
630    #[cfg(all(target_os = "linux", not(debug_assertions)))]
631    if let Err(code) = prctl::set_dumpable(false) {
632        error!(?code, "CRITICAL: Unable to set prctl flags");
633        return ExitCode::FAILURE;
634    }
635    // let cuid = get_current_uid();
636    // let cgid = get_current_gid();
637    // We only need to check effective id
638    let ceuid = get_effective_uid();
639    let cegid = get_effective_gid();
640
641    for arg in std::env::args() {
642        if arg.contains("--version") {
643            println!("kanidm_unixd_tasks {}", env!("CARGO_PKG_VERSION"));
644            return ExitCode::SUCCESS;
645        } else if arg.contains("--help") {
646            println!("kanidm_unixd_tasks {}", env!("CARGO_PKG_VERSION"));
647            println!("Usage: kanidm_unixd_tasks");
648            println!("  --version");
649            println!("  --help");
650            return ExitCode::SUCCESS;
651        }
652    }
653
654    #[allow(clippy::expect_used)]
655    tracing_forest::worker_task()
656        .set_global(true)
657        // Fall back to stderr
658        .map_sender(|sender| sender.or_stderr())
659        .build_on(|subscriber| {
660            subscriber.with(
661                EnvFilter::try_from_default_env()
662                    .or_else(|_| EnvFilter::try_new("info"))
663                    .expect("Failed to init envfilter"),
664            )
665        })
666        .on(async {
667            if ceuid != 0 || cegid != 0 {
668                error!("Refusing to run - this process *MUST* operate as root.");
669                return ExitCode::FAILURE;
670            }
671
672            let unixd_path = Path::new(DEFAULT_CONFIG_PATH);
673            let unixd_path_str = match unixd_path.to_str() {
674                Some(cps) => cps,
675                None => {
676                    error!("Unable to turn unixd_path to str");
677                    return ExitCode::FAILURE;
678                }
679            };
680
681            let cfg = match UnixdConfig::new().read_options_from_optional_config(unixd_path) {
682                Ok(v) => v,
683                Err(_) => {
684                    error!("Failed to parse {}", unixd_path_str);
685                    return ExitCode::FAILURE;
686                }
687            };
688
689            let task_sock_path = cfg.task_sock_path.clone();
690            debug!("Attempting to use {} ...", task_sock_path);
691
692            // This is the startup/shutdown control channel
693            let (broadcast_tx, mut broadcast_rx) = broadcast::channel(4);
694            let mut d_broadcast_rx = broadcast_tx.subscribe();
695
696            // This is to broadcast when we need to reload the shadow
697            // files.
698            let (shadow_broadcast_tx, shadow_broadcast_rx) = broadcast::channel(4);
699
700            let watcher = match setup_shadow_inotify_watcher(shadow_broadcast_tx.clone()) {
701                Ok(w) => w,
702                Err(exit) => return exit,
703            };
704
705            // Setup the etcdb watch
706            let etc_db = match process_etc_passwd_group().await {
707                Ok(etc_db) => etc_db,
708                Err(err) => {
709                    warn!(?err, "unable to process {SYSTEM_PASSWD_PATH} and related files.");
710                    // Return an empty set instead.
711                    EtcDb::default()
712                }
713            };
714
715            let (shadow_data_watch_tx, mut shadow_data_watch_rx) = watch::channel(etc_db);
716
717            let _shadow_task = tokio::spawn(async move {
718                shadow_reload_task(
719                    shadow_data_watch_tx, shadow_broadcast_rx
720                ).await
721            });
722
723            let server = tokio::spawn(async move {
724                loop {
725                    info!("Attempting to connect to kanidm_unixd ...");
726
727                    tokio::select! {
728                        _ = broadcast_rx.recv() => {
729                            break;
730                        }
731                        connect_res = UnixStream::connect(&task_sock_path) => {
732                            match connect_res {
733                                Ok(stream) => {
734                                    info!("Found kanidm_unixd, waiting for tasks ...");
735
736                                    // Yep! Now let the main handler do its job.
737                                    // If it returns (dc, etc, then we loop and try again).
738                                    handle_tasks(stream, &mut d_broadcast_rx, &mut shadow_data_watch_rx, &cfg).await;
739                                    continue;
740                                }
741                                Err(e) => {
742                                    debug!("\\---> {:?}", e);
743                                    error!("Unable to find kanidm_unixd, sleeping ...");
744                                    // Back off.
745                                    time::sleep(Duration::from_millis(5000)).await;
746                                }
747                            }
748                        }
749                    } // select
750                } // loop
751            });
752
753            info!("Server started ...");
754
755            // On linux, notify systemd.
756            #[cfg(target_os = "linux")]
757            let _ = sd_notify::notify(true, &[sd_notify::NotifyState::Ready]);
758
759            loop {
760                tokio::select! {
761                    Ok(()) = tokio::signal::ctrl_c() => {
762                        break
763                    }
764                    Some(()) = async move {
765                        let sigterm = tokio::signal::unix::SignalKind::terminate();
766                        #[allow(clippy::unwrap_used)]
767                        tokio::signal::unix::signal(sigterm).unwrap().recv().await
768                    } => {
769                        break
770                    }
771                    Some(()) = async move {
772                        let sigterm = tokio::signal::unix::SignalKind::alarm();
773                        #[allow(clippy::unwrap_used)]
774                        tokio::signal::unix::signal(sigterm).unwrap().recv().await
775                    } => {
776                        // Ignore
777                    }
778                    Some(()) = async move {
779                        let sigterm = tokio::signal::unix::SignalKind::hangup();
780                        #[allow(clippy::unwrap_used)]
781                        tokio::signal::unix::signal(sigterm).unwrap().recv().await
782                    } => {
783                        // Ignore
784                    }
785                    Some(()) = async move {
786                        let sigterm = tokio::signal::unix::SignalKind::user_defined1();
787                        #[allow(clippy::unwrap_used)]
788                        tokio::signal::unix::signal(sigterm).unwrap().recv().await
789                    } => {
790                        // Ignore
791                    }
792
793                    Some(()) = async move {
794                        let sigterm = tokio::signal::unix::SignalKind::user_defined2();
795                        #[allow(clippy::unwrap_used)]
796                        tokio::signal::unix::signal(sigterm).unwrap().recv().await
797                    } => {
798                        // Ignore
799                    }
800                }
801            }
802            info!("Signal received, shutting down");
803            // Send a broadcast that we are done.
804            if let Err(e) = broadcast_tx.send(true) {
805                error!("Unable to shutdown workers {:?}", e);
806            }
807
808            debug!("Dropping inotify watcher ...");
809            drop(watcher);
810
811            let _ = server.await;
812            ExitCode::SUCCESS
813        })
814        .await
815}