kanidm_ldap_sync/
main.rs

#![deny(warnings)]
#![warn(unused_extern_crates)]
#![deny(clippy::todo)]
#![deny(clippy::unimplemented)]
#![deny(clippy::unwrap_used)]
#![deny(clippy::panic)]
#![deny(clippy::unreachable)]
#![deny(clippy::await_holding_lock)]
#![deny(clippy::needless_pass_by_value)]
#![deny(clippy::trivially_copy_pass_by_ref)]
// We allow expect since it forces good error messages at the least.
#![allow(clippy::expect_used)]

use crate::config::{Config, EntryConfig, GroupAttrSchema};
use crate::error::SyncError;
use chrono::Utc;
use clap::Parser;
use cron::Schedule;
use kanidm_client::KanidmClientBuilder;
use kanidm_lib_file_permissions::readonly as file_permissions_readonly;
use kanidm_proto::constants::ATTR_OBJECTCLASS;
use kanidm_proto::scim_v1::{
    MultiValueAttr, ScimEntry, ScimSshPubKey, ScimSyncGroup, ScimSyncPerson, ScimSyncRequest,
    ScimSyncRetentionMode, ScimSyncState,
};
#[cfg(target_family = "unix")]
use kanidm_utils_users::{get_current_gid, get_current_uid, get_effective_gid, get_effective_uid};
use kanidmd_lib::prelude::Attribute;
use ldap3_client::{
    proto::{self, LdapFilter},
    LdapClient, LdapClientBuilder, LdapSyncRepl, LdapSyncReplEntry, LdapSyncStateValue,
};
use std::collections::{BTreeMap, BTreeSet};
use std::fs::metadata;
use std::fs::File;
use std::io::Read;
#[cfg(target_family = "unix")]
use std::os::unix::fs::MetadataExt;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpListener;
use tokio::runtime;
use tokio::sync::broadcast;
use tokio::time::sleep;
use tracing::{debug, error, info, warn};
use tracing_subscriber::prelude::*;
use tracing_subscriber::{fmt, EnvFilter};

mod config;
mod error;

include!("./opt.rs");

async fn driver_main(opt: Opt) -> Result<(), ()> {
    debug!("Starting kanidm ldap sync driver.");

    let mut f = match File::open(&opt.ldap_sync_config) {
        Ok(f) => f,
        Err(e) => {
            error!(
                "Unable to open ldap sync config from '{}' [{:?}] 🥺",
                &opt.ldap_sync_config.display(),
                e
            );
            return Err(());
        }
    };

    let mut contents = String::new();
    if let Err(e) = f.read_to_string(&mut contents) {
        error!(
            "unable to read file '{}': {:?}",
            &opt.ldap_sync_config.display(),
            e
        );
        return Err(());
    };

    let sync_config: Config = match toml::from_str(contents.as_str()) {
        Ok(c) => c,
        Err(e) => {
            eprintln!(
                "Unable to parse config from '{}' error: {:?}",
                &opt.ldap_sync_config.display(),
                e
            );
            return Err(());
        }
    };

    debug!(?sync_config);

    let cb = match KanidmClientBuilder::new().read_options_from_optional_config(&opt.client_config)
    {
        Ok(v) => v,
        Err(_) => {
            error!("Failed to parse {}", opt.client_config.to_string_lossy());
            return Err(());
        }
    };

    let expression = sync_config.schedule.as_deref().unwrap_or("0 */5 * * * * *");

    let schedule = match Schedule::from_str(expression) {
        Ok(s) => s,
        Err(_) => {
            error!("Failed to parse cron schedule expression");
            return Err(());
        }
    };
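
    // The `cron` crate parses a seven-field expression in the order
    // `sec min hour day-of-month month day-of-week year`, so the default
    // "0 */5 * * * * *" above fires at second 0 of every fifth minute.
    // As an illustration only (this value is not shipped with the tool), a
    // schedule that runs once nightly at 02:30 would look like:
    //
    //     let nightly = Schedule::from_str("0 30 2 * * * *").expect("valid cron expression");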

    if opt.schedule {
        let last_op_status = Arc::new(AtomicBool::new(true));
        let (broadcast_tx, mut broadcast_rx) = broadcast::channel(4);

        let last_op_status_c = last_op_status.clone();

        // Can we setup the socket for status?

        let status_handle = if let Some(sb) = sync_config.status_bind.as_deref() {
            // Can we bind?
            let listener = match TcpListener::bind(sb).await {
                Ok(l) => l,
                Err(e) => {
                    error!(?e, "Failed to bind status socket");
                    return Err(());
                }
            };

            info!("Status listener is started on {:?}", sb);
            // Detach a status listener.
            let status_rx = broadcast_tx.subscribe();
            Some(tokio::spawn(async move {
                status_task(listener, status_rx, last_op_status_c).await
            }))
        } else {
            warn!("No status listener configured, this will prevent you from monitoring the sync tool");
            None
        };

        // main driver loop
        let driver_handle = tokio::spawn(async move {
            loop {
                let now = Utc::now();
                let next_time = match schedule.after(&now).next() {
                    Some(v) => v,
                    None => {
                        error!("Failed to access any future scheduled events, terminating.");
                        break;
                    }
                };

                // If we don't do the `1 +` here we can trigger the event multiple
                // times in rapid succession, since we are still within the same second.
                let wait_seconds = 1 + (next_time - now).num_seconds() as u64;
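                // For example, if `next_time - now` is 0.4s, `num_seconds()`
                // truncates to 0, so without the `1 +` the sleep below would
                // return almost immediately and the same scheduled second
                // could trigger more than one sync.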
                info!("next sync on {}, wait_time = {}s", next_time, wait_seconds);

                tokio::select! {
                    _ = broadcast_rx.recv() => {
                        // stop the event loop!
                        break;
                    }
                    _ = sleep(Duration::from_secs(wait_seconds)) => {
                        info!("starting sync ...");
                        match run_sync(cb.clone(), &sync_config, &opt).await {
                            Ok(_) => last_op_status.store(true, Ordering::Relaxed),
                            Err(e) => {
                                error!(?e, "sync completed with error");
                                last_op_status.store(false, Ordering::Relaxed)
                            }
                        };
                    }
                }
            }
            info!("Stopped sync driver");
        });

        // TODO: this loop/handler should be generic across the various crates
        // Block on signals now.
        loop {
            #[cfg(target_family = "unix")]
            {
                tokio::select! {
                    Ok(()) = tokio::signal::ctrl_c() => {
                        break
                    }
                    Some(()) = async move {
                        let sigterm = tokio::signal::unix::SignalKind::terminate();
                        #[allow(clippy::unwrap_used)]
                        tokio::signal::unix::signal(sigterm).unwrap().recv().await
                    } => {
                        break
                    }
                    Some(()) = async move {
                        let sigterm = tokio::signal::unix::SignalKind::alarm();
                        #[allow(clippy::unwrap_used)]
                        tokio::signal::unix::signal(sigterm).unwrap().recv().await
                    } => {
                        // Ignore
                    }
                    Some(()) = async move {
                        let sigterm = tokio::signal::unix::SignalKind::hangup();
                        #[allow(clippy::unwrap_used)]
                        tokio::signal::unix::signal(sigterm).unwrap().recv().await
                    } => {
                        // Ignore
                    }
                    Some(()) = async move {
                        let sigterm = tokio::signal::unix::SignalKind::user_defined1();
                        #[allow(clippy::unwrap_used)]
                        tokio::signal::unix::signal(sigterm).unwrap().recv().await
                    } => {
                        // Ignore
                    }
                    Some(()) = async move {
                        let sigterm = tokio::signal::unix::SignalKind::user_defined2();
                        #[allow(clippy::unwrap_used)]
                        tokio::signal::unix::signal(sigterm).unwrap().recv().await
                    } => {
                        // Ignore
                    }
                }
            }
            #[cfg(target_family = "windows")]
            {
                tokio::select! {
                    Ok(()) = tokio::signal::ctrl_c() => {
                        break
                    }
                }
            }
        }

        broadcast_tx
            .send(true)
            .expect("Failed to trigger a clean shutdown!");

        let _ = driver_handle.await;
        if let Some(sh) = status_handle {
            let _ = sh.await;
        }
    } else if let Err(e) = run_sync(cb, &sync_config, &opt).await {
        error!(?e, "Sync completed with error");
    }
    Ok(())
}

async fn run_sync(
    cb: KanidmClientBuilder,
    sync_config: &Config,
    opt: &Opt,
) -> Result<(), SyncError> {
    let rsclient = match cb.build() {
        Ok(rsc) => rsc,
        Err(_e) => {
            error!("Failed to build async client");
            return Err(SyncError::ClientConfig);
        }
    };

    rsclient.set_token(sync_config.sync_token.clone()).await;

    // Preflight check.
    //  * can we connect to ldap?
    let mut ldap_client = match LdapClientBuilder::new(&sync_config.ldap_uri)
        .max_ber_size(sync_config.max_ber_size)
        .add_tls_ca(&sync_config.ldap_ca)
        .build()
        .await
    {
        Ok(lc) => lc,
        Err(e) => {
            error!(?e, "Failed to connect to ldap");
            return Err(SyncError::LdapConn);
        }
    };

    match ldap_client
        .bind(
            sync_config.ldap_sync_dn.clone(),
            sync_config.ldap_sync_pw.clone(),
        )
        .await
    {
        Ok(()) => {
            debug!(ldap_sync_dn = ?sync_config.ldap_sync_dn, ldap_uri = %sync_config.ldap_uri);
        }
        Err(e) => {
            error!(?e, "Failed to bind (authenticate) to the ldap server");
            return Err(SyncError::LdapAuth);
        }
    };

    //  * can we connect to kanidm?
    // - get the current sync cookie from kanidm.
    let scim_sync_status = match rsclient.scim_v1_sync_status().await {
        Ok(s) => s,
        Err(e) => {
            error!(?e, "Failed to access scim sync status");
            return Err(SyncError::SyncStatus);
        }
    };

    debug!(state=?scim_sync_status);

    // === Everything is connected! ===

    let mode = proto::SyncRequestMode::RefreshOnly;

    let cookie = match &scim_sync_status {
        ScimSyncState::Refresh => None,
        ScimSyncState::Active { cookie } => Some(cookie.to_vec()),
    };
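
    // For clarity: a None cookie asks the ldap server for a complete refresh,
    // while an existing cookie (kanidm's last acknowledged sync state) asks
    // only for the changes since that point.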

    let filter = sync_config.ldap_filter.clone();

    debug!(ldap_sync_base_dn = ?sync_config.ldap_sync_base_dn, ?cookie, ?mode, ?filter);
    let sync_result = match ldap_client
        .syncrepl(sync_config.ldap_sync_base_dn.clone(), filter, cookie, mode)
        .await
    {
        Ok(results) => results,
        Err(e) => {
            error!(?e, "Failed to perform syncrepl from ldap");
            return Err(SyncError::LdapSyncrepl);
        }
    };

    if opt.proto_dump {
        let stdout = std::io::stdout();
        if let Err(e) = serde_json::to_writer_pretty(stdout, &sync_result) {
            error!(?e, "Failed to serialise ldap sync response");
        }
    }

    let scim_sync_request = match sync_result {
        LdapSyncRepl::Success {
            cookie,
            refresh_deletes: _,
            mut entries,
            delete_uuids,
            present_uuids,
        } => {
            // refresh_deletes is true only on the first refresh from openldap, implying
            // that anything not marked as present should be deleted. In other words,
            // refresh_deletes asserts the content exactly as it exists on the ldap server
            // in the openldap case. For our purposes we can treat this as the "present
            // phase", since that implies all non-present entries are purged.

            let to_state = if let Some(cookie) = cookie {
                // Only update the cookie if it's present - openldap omits!
                ScimSyncState::Active { cookie }
            } else {
                info!("no changes required");
                return Ok(());
            };

            let retain = match (delete_uuids, present_uuids) {
                (None, None) => {
                    // if delete_phase == false && present_phase == false
                    //     Only update entries if they are present in the *add* state.
                    //     Generally also means do nothing with other entries, no updates for example.
                    //
                    //     This is the state of 389-ds with no deletes *and* entries are updated *only*.
                    //     This is the state for openldap and 389-ds when there are no changes to apply.
                    ScimSyncRetentionMode::Ignore
                }
                (Some(d_uuids), None) => {
                    //    update entries that are in Add state, delete from delete uuids.
                    //
                    //    This only occurs on 389-ds, which sends a list of deleted uuids as required.
                    ScimSyncRetentionMode::Delete(d_uuids)
                }
                (None, Some(p_uuids)) => {
                    //    update entries in Add state, assert entry is live from present_uuids
                    //    NOTE! Even if an entry is updated, it will also be in the present phase set. This
                    //    means we can use present_set > 0 as an indicator too.
                    //
                    //    This occurs only on openldap, where present phase lists all uuids in the filter set
                    //    *and* includes all entries that are updated at the same time.
                    ScimSyncRetentionMode::Retain(p_uuids)
                }
                (Some(_), Some(_)) => {
                    //    error! No Ldap server emits this!
                    error!("Ldap server provided an invalid sync repl response - unable to have both delete and present phases.");
                    return Err(SyncError::LdapStateInvalid);
                }
            };
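
            // In short: (None, None) -> Ignore, (Some(d), None) -> Delete(d),
            // (None, Some(p)) -> Retain(p); a response carrying both phases is
            // rejected as invalid.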

            if matches!(sync_config.group_attr_schema, GroupAttrSchema::Rfc2307) {
                // Since the schema is rfc 2307, this means that the names of members
                // in any group are uids, not dn's, so we need to resolve these now.
                resolve_member_uid_to_dn(&mut ldap_client, &mut entries, sync_config)
                    .await
                    .map_err(|_| SyncError::Preprocess)?;
            };
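
            // Illustration (values are hypothetical): under rfc2307 a group
            // lists `memberUid: alice`; the resolver above replaces that with
            // the member's full DN, e.g. `uid=alice,ou=people,dc=example,dc=com`,
            // which is the form the SCIM group conversion expects.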

            let entries = match process_ldap_sync_result(entries, sync_config).await {
                Ok(ssr) => ssr,
                Err(()) => {
                    error!("Failed to process LDAP entries to SCIM");
                    return Err(SyncError::Preprocess);
                }
            };

            ScimSyncRequest {
                from_state: scim_sync_status,
                to_state,
                entries,
                retain,
            }
        }
        LdapSyncRepl::RefreshRequired => {
            let to_state = ScimSyncState::Refresh;

            ScimSyncRequest {
                from_state: scim_sync_status,
                to_state,
                entries: Vec::new(),
                retain: ScimSyncRetentionMode::Ignore,
            }
        }
    };

    if opt.proto_dump {
        let stdout = std::io::stdout();
        // write it out.
        if let Err(e) = serde_json::to_writer_pretty(stdout, &scim_sync_request) {
            error!(?e, "Failed to serialise scim sync request");
        };
        Ok(())
    } else if opt.dry_run {
        info!("dry-run complete");
        info!("Success!");
        Ok(())
    } else if let Err(e) = rsclient.scim_v1_sync_update(&scim_sync_request).await {
        error!(
            ?e,
            "Failed to submit scim sync update - see the kanidmd server log for more details."
        );
        Err(SyncError::SyncUpdate)
    } else {
        info!("Success!");
        Ok(())
    }
    // done!
}

async fn resolve_member_uid_to_dn(
    ldap_client: &mut LdapClient,
    ldap_entries: &mut [LdapSyncReplEntry],
    sync_config: &Config,
) -> Result<(), ()> {
    let mut lookup_cache: BTreeMap<String, String> = Default::default();

    for sync_entry in ldap_entries.iter_mut() {
        let oc = sync_entry
            .entry
            .attrs
            .get(ATTR_OBJECTCLASS)
            .ok_or_else(|| {
                error!("Invalid entry - no object class {}", sync_entry.entry.dn);
            })?;

        if !oc.contains(&sync_config.group_objectclass) {
            // Not a group, skip.
            continue;
        }

        // It's a group, does it have memberUid? We pop this out here
        // because we plan to replace it.
        let members = sync_entry
            .entry
            .remove_ava(&sync_config.group_attr_member)
            .unwrap_or_default();

        // Now, resolve all the members to dns.
        let mut resolved_members: BTreeSet<String> = Default::default();

        for member_uid in members {
            if let Some(member_dn) = lookup_cache.get(&member_uid) {
                resolved_members.insert(member_dn.to_string());
            } else {
                // Not in cache, search it. We use a syncrepl request here as this
                // can bypass some query limits. Note we set the sync cookie to None.
                let filter = LdapFilter::And(vec![
                    // Always put uid first as openldap can't query optimise.
                    LdapFilter::Equality(
                        sync_config.person_attr_user_name.clone(),
                        member_uid.clone(),
                    ),
                    LdapFilter::Equality(
                        ATTR_OBJECTCLASS.into(),
                        sync_config.person_objectclass.clone(),
                    ),
                ]);
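
                // In LDAP filter notation this is, for example,
                // (&(uid=alice)(objectclass=posixAccount)) when
                // person_attr_user_name is "uid" and person_objectclass is
                // "posixAccount" - both attribute values here are illustrative.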

                let mode = proto::SyncRequestMode::RefreshOnly;
                let sync_result = ldap_client
                    .syncrepl(sync_config.ldap_sync_base_dn.clone(), filter, None, mode)
                    .await
                    .map_err(|err| {
                        debug!(?member_uid, ?sync_entry.entry_uuid);
                        error!(
                            ?err,
                            "Failed to perform syncrepl to resolve members from ldap"
                        );
                    })?;

                // Get the memberDN out now.
                let member_dn = match sync_result {
                    LdapSyncRepl::Success { mut entries, .. } => {
                        let Some(resolved_entry) = entries.pop() else {
                            warn!(?member_uid, "Unable to resolve member, no matching entries");
                            continue;
                        };

                        resolved_entry.entry.dn.clone()
                    }
                    _ => {
                        error!("Invalid sync repl result state");
                        return Err(());
                    }
                };

                // cache it.
                lookup_cache.insert(member_uid, member_dn.clone());
                resolved_members.insert(member_dn);
            }
        }

        // Put the members back in resolved as DN's now.
        sync_entry
            .entry
            .attrs
            .insert(sync_config.group_attr_member.clone(), resolved_members);
    }

    Ok(())
}

async fn process_ldap_sync_result(
    ldap_entries: Vec<LdapSyncReplEntry>,
    sync_config: &Config,
) -> Result<Vec<ScimEntry>, ()> {
    // Future - make this par-map
    ldap_entries
        .into_iter()
        .filter_map(|lentry| {
            let e_config = sync_config
                .entry_map
                .get(&lentry.entry_uuid)
                .cloned()
                .unwrap_or_default();

            match ldap_to_scim_entry(lentry, &e_config, sync_config) {
                Ok(Some(e)) => Some(Ok(e)),
                Ok(None) => None,
                Err(()) => Some(Err(())),
            }
        })
        .collect::<Result<Vec<_>, _>>()
}

fn ldap_to_scim_entry(
    sync_entry: LdapSyncReplEntry,
    entry_config: &EntryConfig,
    sync_config: &Config,
) -> Result<Option<ScimEntry>, ()> {
    debug!("{:#?}", sync_entry);

    // check the sync_entry state?
    #[allow(clippy::unimplemented)]
    if sync_entry.state != LdapSyncStateValue::Add {
        unimplemented!();
    }

    let dn = sync_entry.entry.dn.clone();

    // Is this an entry we need to observe/look at?
    if entry_config.exclude {
        info!("entry_config excludes {}", dn);
        return Ok(None);
    }

    let oc = sync_entry
        .entry
        .attrs
        .get(ATTR_OBJECTCLASS)
        .ok_or_else(|| {
            error!("Invalid entry - no object class {}", dn);
        })?;

    if oc.contains(&sync_config.person_objectclass) {
        let LdapSyncReplEntry {
            entry_uuid,
            state: _,
            mut entry,
        } = sync_entry;

        let id = if let Some(map_uuid) = &entry_config.map_uuid {
            *map_uuid
        } else {
            entry_uuid
        };

        let user_name = if let Some(name) = entry_config.map_name.clone() {
            name
        } else {
            entry
                .get_ava_single(&sync_config.person_attr_user_name)
                .ok_or_else(|| {
                    error!(
                        "Missing required attribute {} (person_attr_user_name)",
                        sync_config.person_attr_user_name
                    );
                })?
                .to_owned()
        };

        let display_name = entry
            .get_ava_single(&sync_config.person_attr_display_name)
            .ok_or_else(|| {
                error!(
                    "Missing required attribute {} (person_attr_display_name)",
                    sync_config.person_attr_display_name
                );
            })?
            .to_owned();

        let gidnumber = if let Some(number) = entry_config.map_gidnumber {
            Some(number)
        } else {
            entry
                .get_ava_single(&sync_config.person_attr_gidnumber)
                .map(|gid| {
                    u32::from_str(gid).map_err(|_| {
                        error!(
                            "Invalid gidnumber - {} is not a u32 (person_attr_gidnumber)",
                            sync_config.person_attr_gidnumber
                        );
                    })
                })
                .transpose()?
        };

        let password_import = entry
            .get_ava_single(&sync_config.person_attr_password)
            .map(str::to_string);

        let password_import = if let Some(pw_prefix) = sync_config.person_password_prefix.as_ref() {
            password_import.map(|s| format!("{}{}", pw_prefix, s))
        } else {
            password_import
        };

        let unix_password_import = if sync_config
            .sync_password_as_unix_password
            .unwrap_or_default()
        {
            password_import.clone()
        } else {
            None
        };

        let mail: Vec<_> = entry
            .remove_ava(&sync_config.person_attr_mail)
            .map(|set| {
                set.into_iter()
                    .map(|addr| MultiValueAttr {
                        type_: None,
                        primary: None,
                        display: None,
                        ref_: None,
                        value: addr,
                    })
                    .collect()
            })
            .unwrap_or_default();

        let totp_import = Vec::default();

        let ssh_publickey = entry
            .remove_ava(&sync_config.person_attr_ssh_public_key)
            .map(|set| {
                set.into_iter()
                    .enumerate()
                    .map(|(i, value)| ScimSshPubKey {
                        label: format!("sshpublickey-{}", i),
                        value,
                    })
                    .collect()
            })
            .unwrap_or_default();

        let account_disabled: bool = entry
            .remove_ava(Attribute::NsAccountLock.as_ref())
            .map(|set| {
                set.into_iter()
                    .any(|value| value != "FALSE" && value != "false")
            })
            .unwrap_or_default();
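
        // For example, an entry carrying `nsAccountLock: TRUE` (or any value other
        // than "FALSE"/"false") is treated as disabled here; if the attribute is
        // absent the account stays enabled.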

        // Account is not valid
        let account_expire = if account_disabled {
            Some(chrono::DateTime::UNIX_EPOCH.to_rfc3339())
        } else {
            None
        };
        let account_valid_from = None;

        let login_shell = entry
            .get_ava_single(&sync_config.person_attr_login_shell)
            .map(str::to_string);

        let scim_sync_person = ScimSyncPerson::builder(id, entry.dn, user_name, display_name)
            .set_gidnumber(gidnumber)
            .set_password_import(password_import)
            .set_unix_password_import(unix_password_import)
            .set_totp_import(totp_import)
            .set_login_shell(login_shell)
            .set_mail(mail)
            .set_ssh_publickey(ssh_publickey)
            .set_account_expire(account_expire)
            .set_account_valid_from(account_valid_from)
            .build();

        let scim_entry_generic: ScimEntry = scim_sync_person.try_into().map_err(|json_err| {
            error!(?json_err, "Unable to convert person to scim_sync_person");
        })?;

        Ok(Some(scim_entry_generic))
    } else if oc.contains(&sync_config.group_objectclass) {
        let LdapSyncReplEntry {
            entry_uuid,
            state: _,
            mut entry,
        } = sync_entry;

        let id = entry_uuid;

        let name = entry
            .get_ava_single(&sync_config.group_attr_name)
            .ok_or_else(|| {
                error!(
                    "Missing required attribute {} (group_attr_name)",
                    sync_config.group_attr_name
                );
            })?
            .to_owned();

        let description = entry
            .get_ava_single(&sync_config.group_attr_description)
            .map(str::to_string);

        let gidnumber = entry
            .get_ava_single(&sync_config.group_attr_gidnumber)
            .map(|gid| {
                u32::from_str(gid).map_err(|_| {
                    error!(
                        "Invalid gidnumber - {} is not a u32 (group_attr_gidnumber)",
                        sync_config.group_attr_gidnumber
                    );
                })
            })
            .transpose()?;

        let members: Vec<_> = entry
            .remove_ava(&sync_config.group_attr_member)
            // BTreeSet to Vec
            .map(|set| set.into_iter().collect())
            .unwrap_or_default();

        let scim_sync_group = ScimSyncGroup::builder(id, entry.dn, name)
            .set_description(description)
            .set_gidnumber(gidnumber)
            .set_members(members.into_iter())
            .build();

        let scim_entry_generic: ScimEntry = scim_sync_group.try_into().map_err(|json_err| {
            error!(?json_err, "Unable to convert group to scim_sync_group");
        })?;

        Ok(Some(scim_entry_generic))
    } else {
        debug!("Skipping entry {} with oc {:?}", dn, oc);
        Ok(None)
    }
}
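
// For illustration only (attribute names depend on the sync config): an LDAP
// person entry with `uid: alice`, `displayname: Alice Example` and
// `gidnumber: 10001` maps to a ScimSyncPerson with user_name "alice",
// display_name "Alice Example" and gidnumber Some(10001); a group entry maps
// to a ScimSyncGroup carrying its name, description, gidnumber and member DNs;
// anything matching neither objectclass falls through to Ok(None) and is skipped.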

async fn status_task(
    listener: TcpListener,
    mut status_rx: broadcast::Receiver<bool>,
    last_op_status: Arc<AtomicBool>,
) {
    loop {
        tokio::select! {
            _ = status_rx.recv() => {
                break;
            }
            maybe_sock = listener.accept() => {
                let mut stream = match maybe_sock {
                    Ok((sock, addr)) => {
                        debug!("accept from {:?}", addr);
                        sock
                    }
                    Err(e) => {
                        error!(?e, "Failed to accept status connection");
                        continue;
                    }
                };

                let sr = if last_op_status.load(Ordering::Relaxed) {
                     stream.write_all(b"Ok\n").await
                } else {
                     stream.write_all(b"Err\n").await
                };
                if let Err(e) = sr {
                    error!(?e, "Failed to send status");
                }
            }
        }
    }
    info!("Stopped status task");
}
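
// A minimal sketch of how an external health check could read the status
// socket, assuming status_bind was set to "127.0.0.1:12345" in the sync config
// (the address here is illustrative):
//
//     use std::io::Read;
//     let mut stream = std::net::TcpStream::connect("127.0.0.1:12345")?;
//     let mut response = String::new();
//     stream.read_to_string(&mut response)?;
//     assert!(response == "Ok\n" || response == "Err\n");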

fn config_security_checks(cfg_path: &Path) -> bool {
    let cfg_path_str = cfg_path.to_string_lossy();

    if !cfg_path.exists() {
        // there's no point trying to start up if we can't read a usable config!
        error!(
            "Config missing from {} - cannot start up. Quitting.",
            cfg_path_str
        );
        false
    } else {
        let cfg_meta = match metadata(cfg_path) {
            Ok(v) => v,
            Err(e) => {
                error!(
                    "Unable to read metadata for '{}' during security checks - {:?}",
                    cfg_path_str, e
                );
                return false;
            }
        };
        if !file_permissions_readonly(&cfg_meta) {
            warn!("permissions on {} may not be secure. Should be readonly to running uid. This could be a security risk ...",
                cfg_path_str
                );
        }

        #[cfg(target_family = "unix")]
        if cfg_meta.uid() == get_current_uid() || cfg_meta.uid() == get_effective_uid() {
            warn!("WARNING: {} owned by the current uid, which may allow file permission changes. This could be a security risk ...",
                cfg_path_str
            );
        }

        true
    }
}

fn main() {
    let opt = Opt::parse();

    let fmt_layer = fmt::layer().with_writer(std::io::stderr);

    let filter_layer = if opt.debug {
        match EnvFilter::try_new("kanidm_client=debug,kanidm_ldap_sync=debug,ldap3_client=debug") {
            Ok(f) => f,
            Err(e) => {
                eprintln!("ERROR! Unable to start tracing {:?}", e);
                return;
            }
        }
    } else {
        match EnvFilter::try_from_default_env() {
            Ok(f) => f,
            Err(_) => EnvFilter::new("kanidm_client=warn,kanidm_ldap_sync=info,ldap3_client=warn"),
        }
    };

    tracing_subscriber::registry()
        .with(filter_layer)
        .with(fmt_layer)
        .init();

    // Startup sanity checks.
    // TODO: put this in the junk drawer
    #[cfg(target_family = "unix")]
    if opt.skip_root_check {
        warn!("Skipping root user check; if you're running this for testing, ensure you clean up temporary files.")
    } else if get_current_uid() == 0
        || get_effective_uid() == 0
        || get_current_gid() == 0
        || get_effective_gid() == 0
    {
        error!("Refusing to run - this process must not operate as root.");
        return;
    };

    if !config_security_checks(&opt.client_config) || !config_security_checks(&opt.ldap_sync_config)
    {
        return;
    }

    let par_count = thread::available_parallelism()
        .expect("Failed to determine available parallelism")
        .get();

    let rt = runtime::Builder::new_current_thread()
        // We configure this as we use parallel workers at some points.
        .max_blocking_threads(par_count)
        .enable_all()
        .build()
        .expect("Failed to initialise tokio runtime!");

    #[cfg(debug_assertions)]
    tracing::debug!("Using {} worker threads", par_count);

    if rt.block_on(async move { driver_main(opt).await }).is_err() {
        std::process::exit(1);
    };
}

#[tokio::test]
async fn test_driver_main() {
    let testopt = Opt {
        client_config: PathBuf::from("test"),
        ldap_sync_config: PathBuf::from("test"),
        debug: false,
        schedule: false,
        proto_dump: false,
        dry_run: false,
        skip_root_check: true,
    };
    sketching::test_init();

    println!("testing config");
    // because it can't find the profile file it'll just stop
    assert!(driver_main(testopt.clone()).await.is_err());
    println!("done testing missing config");

    let testopt = Opt {
        client_config: PathBuf::from(format!("{}/Cargo.toml", env!("CARGO_MANIFEST_DIR"))),
        ldap_sync_config: PathBuf::from(format!("{}/Cargo.toml", env!("CARGO_MANIFEST_DIR"))),
        ..testopt
    };
    println!("valid file path, invalid contents");
    assert!(driver_main(testopt.clone()).await.is_err());
    println!("done with valid file path, invalid contents");

    let testopt = Opt {
        client_config: PathBuf::from(format!(
            "{}/../../../examples/iam_migration_ldap.toml",
            env!("CARGO_MANIFEST_DIR")
        )),
        ldap_sync_config: PathBuf::from(format!(
            "{}/../../../examples/iam_migration_ldap.toml",
            env!("CARGO_MANIFEST_DIR")
        )),
        ..testopt
    };

    println!("valid file path, valid contents");
    assert!(driver_main(testopt).await.is_err());
    println!("done with valid file path, valid contents");
}