kanidm_ipa_sync/
main.rs

1#![deny(warnings)]
2#![warn(unused_extern_crates)]
3#![deny(clippy::todo)]
4#![deny(clippy::unimplemented)]
5#![deny(clippy::unwrap_used)]
6#![deny(clippy::panic)]
7#![deny(clippy::unreachable)]
8#![deny(clippy::await_holding_lock)]
9#![deny(clippy::needless_pass_by_value)]
10#![deny(clippy::trivially_copy_pass_by_ref)]
11// We allow expect since it forces good error messages at the least.
12#![allow(clippy::expect_used)]
13
14mod config;
15mod error;
16
17// #[cfg(test)]
18// mod tests;
19
20use crate::config::{Config, EntryConfig};
21use crate::error::SyncError;
22use chrono::Utc;
23use clap::Parser;
24use cron::Schedule;
25use kanidm_proto::constants::{
26    ATTR_UID, LDAP_ATTR_CN, LDAP_ATTR_OBJECTCLASS, LDAP_CLASS_GROUPOFNAMES,
27};
28use kanidmd_lib::prelude::{Attribute, EntryClass};
29use std::collections::BTreeMap;
30use std::fs::metadata;
31use std::fs::File;
32use std::io::Read;
33#[cfg(target_family = "unix")]
34use std::os::unix::fs::MetadataExt;
35use std::path::{Path, PathBuf};
36use std::str::FromStr;
37use std::sync::atomic::AtomicBool;
38use std::sync::atomic::Ordering;
39use std::sync::Arc;
40use std::thread;
41use std::time::Duration;
42use tokio::io::AsyncWriteExt;
43use tokio::net::TcpListener;
44use tokio::runtime;
45use tokio::sync::broadcast;
46use tokio::time::sleep;
47
48use tracing::{debug, error, info, warn};
49use tracing_subscriber::prelude::*;
50use tracing_subscriber::{fmt, EnvFilter};
51use uuid::Uuid;
52
53use kanidm_client::KanidmClientBuilder;
54use kanidm_proto::scim_v1::{
55    MultiValueAttr, ScimEntry, ScimSshPubKey, ScimSyncGroup, ScimSyncPerson, ScimSyncRequest,
56    ScimSyncRetentionMode, ScimSyncState, ScimTotp,
57};
58
59use kanidm_lib_file_permissions::readonly as file_permissions_readonly;
60
61#[cfg(target_family = "unix")]
62use kanidm_utils_users::{get_current_gid, get_current_uid, get_effective_gid, get_effective_uid};
63
64use ldap3_client::{
65    proto, proto::LdapFilter, LdapClient, LdapClientBuilder, LdapSyncRepl, LdapSyncReplEntry,
66    LdapSyncStateValue,
67};
68
69include!("./opt.rs");
70
71async fn driver_main(opt: Opt) {
72    debug!("Starting kanidm freeipa sync driver.");
73    // Parse the configs.
74
75    let mut f = match File::open(&opt.ipa_sync_config) {
76        Ok(f) => f,
77        Err(e) => {
78            error!("Unable to open profile file [{:?}] 🥺", e);
79            let diag = kanidm_lib_file_permissions::diagnose_path(&opt.ipa_sync_config);
80            info!(%diag);
81            return;
82        }
83    };
84
85    let mut contents = String::new();
86    if let Err(e) = f.read_to_string(&mut contents) {
87        error!("unable to read profile contents {:?}", e);
88        return;
89    };
90
91    let sync_config: Config = match toml::from_str(contents.as_str()) {
92        Ok(c) => c,
93        Err(e) => {
94            eprintln!("unable to parse config {:?}", e);
95            return;
96        }
97    };
98
99    debug!(?sync_config);
100
101    let cb = match KanidmClientBuilder::new().read_options_from_optional_config(&opt.client_config)
102    {
103        Ok(v) => v,
104        Err(_) => {
105            error!("Failed to parse {}", opt.client_config.to_string_lossy());
106            return;
107        }
108    };
109
110    let expression = sync_config.schedule.as_deref().unwrap_or("0 */5 * * * * *");
111
112    let schedule = match Schedule::from_str(expression) {
113        Ok(s) => s,
114        Err(_) => {
115            error!("Failed to parse cron schedule expression");
116            return;
117        }
118    };
119
120    if opt.schedule {
121        let last_op_status = Arc::new(AtomicBool::new(true));
122        let (broadcast_tx, mut broadcast_rx) = broadcast::channel(4);
123
124        let last_op_status_c = last_op_status.clone();
125
126        // Can we setup the socket for status?
127
128        let status_handle = if let Some(sb) = sync_config.status_bind.as_deref() {
129            // Can we bind?
130            let listener = match TcpListener::bind(sb).await {
131                Ok(l) => l,
132                Err(e) => {
133                    error!(?e, "Failed to bind status socket");
134                    return;
135                }
136            };
137
138            info!("Status listener is started on {:?}", sb);
139            // Detach a status listener.
140            let status_rx = broadcast_tx.subscribe();
141            Some(tokio::spawn(async move {
142                status_task(listener, status_rx, last_op_status_c).await
143            }))
144        } else {
145            warn!("No status listener configured, this will prevent you monitoring the sync tool");
146            None
147        };
148
149        // main driver loop
150        let driver_handle = tokio::spawn(async move {
151            loop {
152                let now = Utc::now();
153                let next_time = match schedule.after(&now).next() {
154                    Some(v) => v,
155                    None => {
156                        error!("Failed to access any future scheduled events, terminating.");
157                        break;
158                    }
159                };
160
161                // If we don't do 1 + here we can trigger the event multiple times
162                // rapidly since we are in the same second.
163                let wait_seconds = 1 + (next_time - now).num_seconds() as u64;
164                info!("next sync on {}, wait_time = {}s", next_time, wait_seconds);
165
166                tokio::select! {
167                    _ = broadcast_rx.recv() => {
168                        // stop the event loop!
169                        break;
170                    }
171                    _ = sleep(Duration::from_secs(wait_seconds)) => {
172                        info!("starting sync ...");
173                        match run_sync(cb.clone(), &sync_config, &opt).await {
174                            Ok(_) => last_op_status.store(true, Ordering::Relaxed),
175                            Err(e) => {
176                                error!(?e, "sync completed with error");
177                                last_op_status.store(false, Ordering::Relaxed)
178                            }
179                        };
180                    }
181                }
182            }
183            info!("Stopped sync driver");
184        });
185
186        // TODO: this loop/handler should be generic across the various crates
187        // Block on signals now.
188        loop {
189            #[cfg(target_family = "unix")]
190            {
191                tokio::select! {
192                    Ok(()) = tokio::signal::ctrl_c() => {
193                        break
194                    }
195                    Some(()) = async move {
196                        let sigterm = tokio::signal::unix::SignalKind::terminate();
197                        #[allow(clippy::unwrap_used)]
198                        tokio::signal::unix::signal(sigterm).unwrap().recv().await
199                    } => {
200                        break
201                    }
202                    Some(()) = async move {
203                        let sigterm = tokio::signal::unix::SignalKind::alarm();
204                        #[allow(clippy::unwrap_used)]
205                        tokio::signal::unix::signal(sigterm).unwrap().recv().await
206                    } => {
207                        // Ignore
208                    }
209                    Some(()) = async move {
210                        let sigterm = tokio::signal::unix::SignalKind::hangup();
211                        #[allow(clippy::unwrap_used)]
212                        tokio::signal::unix::signal(sigterm).unwrap().recv().await
213                    } => {
214                        // Ignore
215                    }
216                    Some(()) = async move {
217                        let sigterm = tokio::signal::unix::SignalKind::user_defined1();
218                        #[allow(clippy::unwrap_used)]
219                        tokio::signal::unix::signal(sigterm).unwrap().recv().await
220                    } => {
221                        // Ignore
222                    }
223                    Some(()) = async move {
224                        let sigterm = tokio::signal::unix::SignalKind::user_defined2();
225                        #[allow(clippy::unwrap_used)]
226                        tokio::signal::unix::signal(sigterm).unwrap().recv().await
227                    } => {
228                        // Ignore
229                    }
230                }
231            }
232            #[cfg(target_family = "windows")]
233            {
234                tokio::select! {
235                    Ok(()) = tokio::signal::ctrl_c() => {
236                        break
237                    }
238                }
239            }
240        }
241
242        broadcast_tx
243            .send(true)
244            .expect("Failed to trigger a clean shutdown!");
245
246        let _ = driver_handle.await;
247        if let Some(sh) = status_handle {
248            let _ = sh.await;
249        }
250    } else if let Err(e) = run_sync(cb, &sync_config, &opt).await {
251        error!(?e, "Sync completed with error");
252    }
253}
254
255async fn status_task(
256    listener: TcpListener,
257    mut status_rx: broadcast::Receiver<bool>,
258    last_op_status: Arc<AtomicBool>,
259) {
260    loop {
261        tokio::select! {
262            _ = status_rx.recv() => {
263                break;
264            }
265            maybe_sock = listener.accept() => {
266                let mut stream = match maybe_sock {
267                    Ok((sock, addr)) => {
268                        debug!("accept from {:?}", addr);
269                        sock
270                    }
271                    Err(e) => {
272                        error!(?e, "Failed to accept status connection");
273                        continue;
274                    }
275                };
276
277                let sr = if last_op_status.load(Ordering::Relaxed) {
278                     stream.write_all(b"Ok\n").await
279                } else {
280                     stream.write_all(b"Err\n").await
281                };
282                if let Err(e) = sr {
283                    error!(?e, "Failed to send status");
284                }
285            }
286        }
287    }
288    info!("Stopped status task");
289}
290
291async fn run_sync(
292    cb: KanidmClientBuilder,
293    sync_config: &Config,
294    opt: &Opt,
295) -> Result<(), SyncError> {
296    let rsclient = match cb.build() {
297        Ok(rsc) => rsc,
298        Err(_e) => {
299            error!("Failed to build async client");
300            return Err(SyncError::ClientConfig);
301        }
302    };
303
304    rsclient.set_token(sync_config.sync_token.clone()).await;
305
306    // Preflight check.
307    //  * can we connect to ipa?
308    let mut ipa_client = match LdapClientBuilder::new(&sync_config.ipa_uri)
309        .max_ber_size(sync_config.max_ber_size)
310        .add_tls_ca(&sync_config.ipa_ca)
311        .build()
312        .await
313    {
314        Ok(lc) => lc,
315        Err(e) => {
316            error!(?e, "Failed to connect to freeipa");
317            return Err(SyncError::LdapConn);
318        }
319    };
320
321    match ipa_client
322        .bind(
323            sync_config.ipa_sync_dn.clone(),
324            sync_config.ipa_sync_pw.clone(),
325        )
326        .await
327    {
328        Ok(()) => {
329            debug!(ipa_sync_dn = ?sync_config.ipa_sync_dn, ipa_uri = %sync_config.ipa_uri);
330        }
331        Err(e) => {
332            error!(?e, "Failed to bind (authenticate) to freeipa");
333            return Err(SyncError::LdapAuth);
334        }
335    };
336
337    //  * can we connect to kanidm?
338    // - get the current sync cookie from kanidm.
339    let scim_sync_status = match rsclient.scim_v1_sync_status().await {
340        Ok(s) => s,
341        Err(e) => {
342            error!(?e, "Failed to access scim sync status");
343            return Err(SyncError::SyncStatus);
344        }
345    };
346
347    debug!(state=?scim_sync_status);
348
349    // === Everything is connected! ===
350
351    // Based on the scim_sync_status, perform our sync repl
352
353    let mode = proto::SyncRequestMode::RefreshOnly;
354
355    let cookie = match &scim_sync_status {
356        ScimSyncState::Refresh => None,
357        ScimSyncState::Active { cookie } => Some(cookie.to_vec()),
358    };
359
360    let is_initialise = cookie.is_none();
361
362    let filter = LdapFilter::Or(vec![
363        // LdapFilter::Equality(LDAP_ATTR_OBJECTCLASS.into(), "domain".to_string()),
364        LdapFilter::And(vec![
365            LdapFilter::Equality(LDAP_ATTR_OBJECTCLASS.into(), "person".to_string()),
366            LdapFilter::Equality(LDAP_ATTR_OBJECTCLASS.into(), "posixaccount".to_string()),
367        ]),
368        LdapFilter::And(vec![
369            LdapFilter::Equality(LDAP_ATTR_OBJECTCLASS.into(), "groupofnames".to_string()),
370            LdapFilter::Equality(LDAP_ATTR_OBJECTCLASS.into(), "ipausergroup".to_string()),
371            // Ignore user private groups, kani generates these internally.
372            LdapFilter::Not(Box::new(LdapFilter::Equality(
373                LDAP_ATTR_OBJECTCLASS.into(),
374                "mepmanagedentry".to_string(),
375            ))),
376            // Need to exclude the admins group as it gid conflicts to admin.
377            LdapFilter::Not(Box::new(LdapFilter::Equality(
378                LDAP_ATTR_CN.into(),
379                "admins".to_string(),
380            ))),
381            // Kani internally has an all persons group.
382            LdapFilter::Not(Box::new(LdapFilter::Equality(
383                LDAP_ATTR_CN.into(),
384                "ipausers".to_string(),
385            ))),
386            // Ignore editors/trust admins
387            LdapFilter::Not(Box::new(LdapFilter::Equality(
388                LDAP_ATTR_CN.into(),
389                "editors".to_string(),
390            ))),
391            LdapFilter::Not(Box::new(LdapFilter::Equality(
392                LDAP_ATTR_CN.into(),
393                "trust admins".to_string(),
394            ))),
395        ]),
396        // Fetch TOTP's so we know when/if they change.
397        LdapFilter::And(vec![
398            LdapFilter::Equality(LDAP_ATTR_OBJECTCLASS.into(), "ipatoken".to_string()),
399            LdapFilter::Equality(LDAP_ATTR_OBJECTCLASS.into(), "ipatokentotp".to_string()),
400        ]),
401    ]);
402
403    debug!(ipa_sync_base_dn = ?sync_config.ipa_sync_base_dn, ?cookie, ?mode, ?filter);
404    let sync_result = match ipa_client
405        .syncrepl(sync_config.ipa_sync_base_dn.clone(), filter, cookie, mode)
406        .await
407    {
408        Ok(results) => results,
409        Err(e) => {
410            error!(?e, "Failed to perform syncrepl from ipa");
411            return Err(SyncError::LdapSyncrepl);
412        }
413    };
414
415    if opt.proto_dump {
416        let stdout = std::io::stdout();
417        if let Err(e) = serde_json::to_writer_pretty(stdout, &sync_result) {
418            error!(?e, "Failed to serialise ldap sync response");
419        }
420    }
421
422    // Convert the ldap sync repl result to a scim equivalent
423    let scim_sync_request = match sync_result {
424        LdapSyncRepl::Success {
425            cookie,
426            refresh_deletes,
427            entries,
428            delete_uuids,
429            present_uuids,
430        } => {
431            if refresh_deletes {
432                error!("Unsure how to handle refreshDeletes=True");
433                return Err(SyncError::Preprocess);
434            }
435
436            if present_uuids.is_some() {
437                error!("Unsure how to handle presentUuids > 0");
438                return Err(SyncError::Preprocess);
439            }
440
441            let to_state = cookie
442                .map(|cookie| {
443                    ScimSyncState::Active { cookie }
444                })
445                .ok_or_else(|| {
446                    error!("Invalid state, ldap sync repl did not provide a valid state cookie in response.");
447
448                    SyncError::Preprocess
449
450                })?;
451
452            // process the entries to scim.
453            let entries = match process_ipa_sync_result(
454                &mut ipa_client,
455                sync_config.ipa_sync_base_dn.clone(),
456                entries,
457                &sync_config.entry_map,
458                is_initialise,
459                sync_config
460                    .sync_password_as_unix_password
461                    .unwrap_or_default(),
462            )
463            .await
464            {
465                Ok(ssr) => ssr,
466                Err(()) => {
467                    error!("Failed to process IPA entries to SCIM");
468                    return Err(SyncError::Preprocess);
469                }
470            };
471
472            let retain = if let Some(delete_uuids) = delete_uuids {
473                ScimSyncRetentionMode::Delete(delete_uuids)
474            } else {
475                ScimSyncRetentionMode::Ignore
476            };
477
478            ScimSyncRequest {
479                from_state: scim_sync_status,
480                to_state,
481                entries,
482                retain,
483            }
484        }
485        LdapSyncRepl::RefreshRequired => {
486            let to_state = ScimSyncState::Refresh;
487
488            ScimSyncRequest {
489                from_state: scim_sync_status,
490                to_state,
491                entries: Vec::new(),
492                retain: ScimSyncRetentionMode::Ignore,
493            }
494        }
495    };
496
497    if opt.proto_dump {
498        let stdout = std::io::stdout();
499        // write it out.
500        if let Err(e) = serde_json::to_writer_pretty(stdout, &scim_sync_request) {
501            error!(?e, "Failed to serialise scim sync request");
502        };
503        Ok(())
504    } else if opt.dry_run {
505        info!("dry-run complete");
506        info!("Success!");
507        Ok(())
508    } else if let Err(e) = rsclient.scim_v1_sync_update(&scim_sync_request).await {
509        error!(
510            ?e,
511            "Failed to submit scim sync update - see the kanidmd server log for more details."
512        );
513        Err(SyncError::SyncUpdate)
514    } else {
515        info!("Success!");
516        Ok(())
517    }
518    // done!
519}
520
521async fn process_ipa_sync_result(
522    ipa_client: &mut LdapClient,
523    basedn: String,
524    ldap_entries: Vec<LdapSyncReplEntry>,
525    entry_config_map: &BTreeMap<Uuid, EntryConfig>,
526    is_initialise: bool,
527    sync_password_as_unix_password: bool,
528) -> Result<Vec<ScimEntry>, ()> {
529    // Because of how TOTP works with freeipa it's a soft referral from
530    // the totp toward the user. This means if a TOTP is added or removed
531    // we see those as unique entries in the syncrepl but we are missing
532    // the user entry that actually needs the update since Kanidm makes TOTP
533    // part of the entry itself.
534    //
535    // This *also* means that when a user is updated that we also need to
536    // fetch their TOTP's that are related so we can assert them on the
537    // submission.
538    //
539    // Because of this, we have to do some client-side processing here to
540    // work out what "entries we are missing" and do a second search to
541    // fetch them. Sadly, this means that we need to do a second search
542    // and since ldap is NOT transactional there is a possibility of a
543    // desync between the sync-repl and the results of the second search.
544    //
545    // There are 5 possibilities - note one of TOTP or USER must be in syncrepl
546    // state else we wouldn't proceed.
547    //      TOTP          USER             OUTCOME
548    //    SyncRepl      SyncRepl         No ext detail needed, proceed
549    //    SyncRepl      Add/Mod          Update user, won't change on next syncrepl
550    //    SyncRepl        Del            Ignore this TOTP -> will be deleted on next syncrepl
551    //    Add/Mod       SyncRepl         Add the new TOTP, won't change on next syncrepl
552    //      Del         SyncRepl         Remove TOTP, won't change on next syncrepl
553    //
554    // The big challenge is to transform our data in a way that we can actually work
555    // with it here meaning we have to disassemble and "index" the content of our
556    // sync result.
557
558    // Hash entries by DN -> Split TOTP's to their own set.
559    //    make a list of updated TOTP's and what DN's they require.
560    //    make a list of updated Users and what TOTP's they require.
561
562    let mut entries = BTreeMap::default();
563    let mut user_dns = Vec::default();
564    let mut totp_entries: BTreeMap<String, Vec<_>> = BTreeMap::default();
565
566    for lentry in ldap_entries.into_iter() {
567        if lentry
568            .entry
569            .attrs
570            .get(LDAP_ATTR_OBJECTCLASS)
571            .map(|oc| oc.contains("ipatokentotp"))
572            .unwrap_or_default()
573        {
574            // It's an otp. Lets see ...
575            let token_owner_dn = if let Some(todn) = lentry
576                .entry
577                .attrs
578                .get("ipatokenowner")
579                .and_then(|attr| if attr.len() != 1 { None } else { attr.first() })
580            {
581                debug!("totp with owner {}", todn);
582                todn.clone()
583            } else {
584                warn!("totp with invalid ownership will be ignored");
585                continue;
586            };
587
588            if !totp_entries.contains_key(&token_owner_dn) {
589                totp_entries.insert(token_owner_dn.clone(), Vec::default());
590            }
591
592            if let Some(v) = totp_entries.get_mut(&token_owner_dn) {
593                v.push(lentry)
594            }
595        } else {
596            let dn = lentry.entry.dn.clone();
597
598            if lentry
599                .entry
600                .attrs
601                .get(LDAP_ATTR_OBJECTCLASS)
602                .map(|oc| oc.contains(EntryClass::Person.as_ref()))
603                .unwrap_or_default()
604            {
605                user_dns.push(dn.clone());
606            }
607
608            entries.insert(dn, lentry);
609        }
610    }
611
612    // Now, we have to invert the totp set so that it's defined by entry dn instead.
613    debug!("te, {}, e {}", totp_entries.len(), entries.len());
614
615    // If this is an INIT we have the full state already - no extra search is needed.
616
617    // On a refresh, we need to search and fix up to make sure TOTP/USER sets are
618    // consistent.
619    let search_filter = if !is_initialise {
620        // If the totp's related user is NOT in our sync repl, we need to fetch them.
621        let fetch_user: Vec<&str> = totp_entries
622            .keys()
623            .map(|k| k.as_str())
624            .filter(|k| !entries.contains_key(*k))
625            .collect();
626
627        // For every user in our fetch_user *and* entries set, we need to fetch their
628        // related TOTP's.
629        let fetch_totps_for: Vec<&str> = fetch_user
630            .iter()
631            .copied()
632            .chain(user_dns.iter().map(|s| s.as_str()))
633            .collect();
634
635        // Create filter (could hit a limit, may need to split this search).
636        let totp_conditions: Vec<_> = fetch_totps_for
637            .iter()
638            .map(|dn| LdapFilter::Equality("ipatokenowner".to_string(), dn.to_string()))
639            .collect();
640
641        let mut or_filter = Vec::with_capacity(2);
642
643        if !totp_conditions.is_empty() {
644            or_filter.push(LdapFilter::And(vec![
645                LdapFilter::Equality(LDAP_ATTR_OBJECTCLASS.into(), "ipatoken".to_string()),
646                LdapFilter::Equality(LDAP_ATTR_OBJECTCLASS.into(), "ipatokentotp".to_string()),
647                LdapFilter::Or(totp_conditions),
648            ]));
649        }
650
651        let user_conditions: Vec<_> = fetch_user
652            .iter()
653            .filter_map(|dn| {
654                // We have to split the DN to it's RDN because lol.
655                dn.split_once(',')
656                    .and_then(|(rdn, _)| rdn.split_once('='))
657                    .map(|(_, uid)| LdapFilter::Equality(ATTR_UID.to_string(), uid.to_string()))
658            })
659            .collect();
660
661        if !user_conditions.is_empty() {
662            or_filter.push(LdapFilter::And(vec![
663                LdapFilter::Equality(LDAP_ATTR_OBJECTCLASS.into(), "person".to_string()),
664                LdapFilter::Equality(LDAP_ATTR_OBJECTCLASS.into(), "ipantuserattrs".to_string()),
665                LdapFilter::Equality(LDAP_ATTR_OBJECTCLASS.into(), "posixaccount".to_string()),
666                LdapFilter::Or(user_conditions),
667            ]));
668        }
669
670        if or_filter.is_empty() {
671            None
672        } else {
673            Some(LdapFilter::Or(or_filter))
674        }
675    } else {
676        None
677    };
678
679    // If we have something that needs lookup, apply now.
680    if let Some(filter) = search_filter {
681        debug!(?filter);
682        // Search - we use syncrepl here and discard the cookie because we need the
683        // entry uuid to be given from the nsuniqueid else we have issues.
684        let mode = proto::SyncRequestMode::RefreshOnly;
685        match ipa_client.syncrepl(basedn, filter, None, mode).await {
686            Ok(LdapSyncRepl::Success {
687                cookie: _,
688                refresh_deletes: _,
689                entries: sync_entries,
690                delete_uuids: _,
691                present_uuids: _,
692            }) => {
693                // Inject all new entries to our maps. At this point we discard the original content
694                // of the totp entries since we just fetched them all again anyway.
695
696                totp_entries.clear();
697
698                for lentry in sync_entries.into_iter() {
699                    if lentry
700                        .entry
701                        .attrs
702                        .get(LDAP_ATTR_OBJECTCLASS)
703                        .map(|oc| oc.contains("ipatokentotp"))
704                        .unwrap_or_default()
705                    {
706                        let token_owner_dn = if let Some(todn) = lentry
707                            .entry
708                            .attrs
709                            .get("ipatokenowner")
710                            .and_then(|attr| if attr.len() != 1 { None } else { attr.first() })
711                        {
712                            debug!("totp with owner {}", todn);
713                            todn.clone()
714                        } else {
715                            warn!("totp with invalid ownership will be ignored");
716                            continue;
717                        };
718
719                        if !totp_entries.contains_key(&token_owner_dn) {
720                            totp_entries.insert(token_owner_dn.clone(), Vec::default());
721                        }
722
723                        if let Some(v) = totp_entries.get_mut(&token_owner_dn) {
724                            v.push(lentry)
725                        }
726                    } else {
727                        let dn = lentry.entry.dn.clone();
728                        entries.insert(dn, lentry);
729                    }
730                }
731            }
732            Ok(LdapSyncRepl::RefreshRequired) => {
733                error!("Failed due to invalid search state from ipa");
734                return Err(());
735            }
736            Err(e) => {
737                error!(?e, "Failed to perform search from ipa");
738                return Err(());
739            }
740        }
741    }
742
743    // For each updated TOTP -> If it's related DN is not in Hash -> remove from map
744    totp_entries.retain(|k, _| {
745        let x = entries.contains_key(k);
746        if !x {
747            warn!("Removing totp with no valid owner {}", k);
748        }
749        x
750    });
751
752    let empty_slice = Vec::default();
753
754    // Future - make this par-map
755    entries
756        .into_iter()
757        .filter_map(|(dn, e)| {
758            let e_config = entry_config_map
759                .get(&e.entry_uuid)
760                .cloned()
761                .unwrap_or_default();
762
763            let totp = totp_entries.get(&dn).unwrap_or(&empty_slice);
764
765            match ipa_to_scim_entry(e, &e_config, totp, sync_password_as_unix_password) {
766                Ok(Some(e)) => Some(Ok(e)),
767                Ok(None) => None,
768                Err(()) => Some(Err(())),
769            }
770        })
771        .collect::<Result<Vec<_>, _>>()
772}
773
774fn ipa_to_scim_entry(
775    sync_entry: LdapSyncReplEntry,
776    entry_config: &EntryConfig,
777    totp: &[LdapSyncReplEntry],
778    sync_password_as_unix_password: bool,
779) -> Result<Option<ScimEntry>, ()> {
780    debug!("{:#?}", sync_entry);
781
782    // check the sync_entry state?
783    #[allow(clippy::unimplemented)]
784    if sync_entry.state != LdapSyncStateValue::Add {
785        unimplemented!();
786    }
787
788    let dn = sync_entry.entry.dn.clone();
789
790    // Is this an entry we need to observe/look at?
791    if entry_config.exclude {
792        info!("entry_config excludes {}", dn);
793        return Ok(None);
794    }
795
796    let oc = sync_entry
797        .entry
798        .attrs
799        .get(LDAP_ATTR_OBJECTCLASS)
800        .ok_or_else(|| {
801            debug!(?sync_entry);
802            error!("Invalid entry - no object class {}", dn);
803        })?;
804
805    if oc.contains("person") {
806        let LdapSyncReplEntry {
807            entry_uuid,
808            state: _,
809            mut entry,
810        } = sync_entry;
811
812        let id = if let Some(map_uuid) = &entry_config.map_uuid {
813            *map_uuid
814        } else {
815            entry_uuid
816        };
817
818        let user_name = if let Some(name) = entry_config.map_name.clone() {
819            name
820        } else {
821            entry
822                .remove_ava_single(Attribute::Uid.as_ref())
823                .ok_or_else(|| {
824                    error!("Missing required attribute {}", Attribute::Uid);
825                })?
826        };
827
828        // ⚠️  hardcoded skip on admin here!!!
829        if user_name == "admin" {
830            info!("kanidm excludes {}", dn);
831            return Ok(None);
832        }
833
834        let display_name = entry
835            .remove_ava_single(Attribute::Cn.as_ref())
836            .ok_or_else(|| {
837                error!("Missing required attribute {}", Attribute::Cn);
838            })?;
839
840        // There are some installs that incorrectly assign this to a shared
841        // group.
842        let gidnumber = if let Some(number) = entry_config.map_gidnumber {
843            Some(number)
844        } else {
845            entry
846                .remove_ava_single(Attribute::UidNumber.as_ref())
847                .map(|uid| {
848                    u32::from_str(&uid).map_err(|_| {
849                        error!("Invalid {}", Attribute::UidNumber);
850                    })
851                })
852                .transpose()?
853        };
854
855        let password_import = entry
856            .remove_ava_single(Attribute::IpaNtHash.as_ref())
857            .map(|s| format!("ipaNTHash: {}", s))
858            // If we don't have this, try one of the other hashes that *might* work
859            // The reason we don't do this by default is there are multiple
860            // pw hash formats in 389-ds we don't support!
861            .or_else(|| entry.remove_ava_single(Attribute::UserPassword.as_ref()));
862
863        let unix_password_import = if sync_password_as_unix_password {
864            password_import.clone()
865        } else {
866            None
867        };
868
869        let mail: Vec<_> = entry
870            .remove_ava(Attribute::Mail.as_ref())
871            .map(|set| {
872                set.into_iter()
873                    .map(|addr| MultiValueAttr {
874                        type_: None,
875                        primary: None,
876                        display: None,
877                        ref_: None,
878                        value: addr,
879                    })
880                    .collect()
881            })
882            .unwrap_or_default();
883
884        let totp_import = if !totp.is_empty() {
885            if password_import.is_some() {
886                // If there are TOTP's, convert them to something sensible.
887                totp.iter().filter_map(ipa_to_totp).collect()
888            } else {
889                warn!(
890                    "Skipping totp for {} as password is not available to import.",
891                    dn
892                );
893                Vec::default()
894            }
895        } else {
896            Vec::default()
897        };
898
899        let ssh_publickey = entry
900            .remove_ava(Attribute::IpaSshPubKey.as_ref())
901            .map(|set| {
902                set.into_iter()
903                    .enumerate()
904                    .map(|(i, value)| ScimSshPubKey {
905                        label: format!("{}-{}", Attribute::IpaSshPubKey, i),
906                        value,
907                    })
908                    .collect()
909            })
910            .unwrap_or_default();
911
912        let account_disabled: bool = entry
913            .remove_ava(Attribute::NsAccountLock.as_ref())
914            .map(|set| {
915                set.into_iter()
916                    .any(|value| value != "FALSE" && value != "false")
917            })
918            .unwrap_or_default();
919
920        // Account is not valid
921        let account_expire = if account_disabled {
922            Some(chrono::DateTime::UNIX_EPOCH.to_rfc3339())
923        } else {
924            None
925        };
926
927        let account_valid_from = None;
928
929        let login_shell = entry.remove_ava_single(Attribute::LoginShell.as_ref());
930
931        let scim_sync_person = ScimSyncPerson::builder(id, entry.dn, user_name, display_name)
932            .set_gidnumber(gidnumber)
933            .set_password_import(password_import)
934            .set_unix_password_import(unix_password_import)
935            .set_totp_import(totp_import)
936            .set_login_shell(login_shell)
937            .set_mail(mail)
938            .set_ssh_publickey(ssh_publickey)
939            .set_account_expire(account_expire)
940            .set_account_valid_from(account_valid_from)
941            .build();
942
943        let scim_entry_generic: ScimEntry = scim_sync_person.try_into().map_err(|json_err| {
944            error!(?json_err, "Unable to convert person to scim_sync_person");
945        })?;
946
947        Ok(Some(scim_entry_generic))
948    } else if oc.contains(LDAP_CLASS_GROUPOFNAMES) {
949        let LdapSyncReplEntry {
950            entry_uuid,
951            state: _,
952            mut entry,
953        } = sync_entry;
954
955        let id = entry_uuid;
956
957        let name = entry
958            .remove_ava_single(Attribute::Cn.as_ref())
959            .ok_or_else(|| {
960                error!("Missing required attribute cn");
961            })?;
962
963        // ⚠️  hardcoded skip on trust admins / editors / ipausers here!!!
964        if name == "trust admins" || name == "editors" || name == "ipausers" || name == "admins" {
965            info!("kanidm excludes {}", dn);
966            return Ok(None);
967        }
968
969        let description = entry.remove_ava_single(Attribute::Description.as_ref());
970
971        let gidnumber = entry
972            .remove_ava_single(Attribute::GidNumber.as_ref())
973            .map(|gid| {
974                u32::from_str(&gid).map_err(|_| {
975                    error!("Invalid gidnumber");
976                })
977            })
978            .transpose()?;
979
980        let members: Vec<_> = entry
981            .remove_ava(Attribute::Member.as_ref())
982            .map(|set| set.into_iter().collect())
983            .unwrap_or_default();
984
985        let scim_sync_group = ScimSyncGroup::builder(id, entry.dn, name)
986            .set_description(description)
987            .set_gidnumber(gidnumber)
988            .set_members(members.into_iter())
989            .build();
990
991        let scim_entry_generic: ScimEntry = scim_sync_group.try_into().map_err(|json_err| {
992            error!(?json_err, "Unable to convert group to scim_sync_group");
993        })?;
994
995        Ok(Some(scim_entry_generic))
996    } else if oc.contains("ipatokentotp") {
997        // Skip for now, we don't support multiple totp yet.
998        Ok(None)
999    } else {
1000        debug!("Skipping entry {} with oc {:?}", dn, oc);
1001        Ok(None)
1002    }
1003}
1004
1005fn ipa_to_totp(sync_entry: &LdapSyncReplEntry) -> Option<ScimTotp> {
1006    let external_id = sync_entry
1007        .entry
1008        .attrs
1009        .get("ipatokenuniqueid")
1010        .and_then(|v| v.first().cloned())
1011        .or_else(|| {
1012            warn!("Invalid ipatokenuniqueid");
1013            None
1014        })?;
1015
1016    let secret = sync_entry
1017        .entry
1018        .attrs
1019        .get("ipatokenotpkey")
1020        .and_then(|v| v.first().cloned())
1021        // This is a base64 string at this point
1022        .or_else(|| {
1023            warn!("Invalid ipatokenotpkey");
1024            None
1025        })?;
1026
1027    let algo = sync_entry
1028        .entry
1029        .attrs
1030        .get("ipatokenotpalgorithm")
1031        .and_then(|v| v.first().cloned())
1032        .or_else(|| {
1033            warn!("Invalid ipatokenotpalgorithm");
1034            None
1035        })?;
1036
1037    let step = sync_entry
1038        .entry
1039        .attrs
1040        .get("ipatokentotptimestep")
1041        .and_then(|v| v.first())
1042        .and_then(|d| u32::from_str(d).ok())
1043        .or_else(|| {
1044            warn!("Invalid ipatokentotptimestep");
1045            None
1046        })?;
1047
1048    let digits = sync_entry
1049        .entry
1050        .attrs
1051        .get("ipatokenotpdigits")
1052        .and_then(|v| v.first())
1053        .and_then(|d| u32::from_str(d).ok())
1054        .or_else(|| {
1055            warn!("Invalid ipatokenotpdigits");
1056            None
1057        })?;
1058
1059    Some(ScimTotp {
1060        external_id,
1061        secret,
1062        algo,
1063        step,
1064        digits,
1065    })
1066}
1067
1068fn config_security_checks(cfg_path: &Path) -> bool {
1069    let cfg_path_str = cfg_path.to_string_lossy();
1070
1071    if !cfg_path.exists() {
1072        // there's no point trying to start up if we can't read a usable config!
1073        error!(
1074            "Config missing from {} - cannot start up. Quitting.",
1075            cfg_path_str
1076        );
1077        false
1078    } else {
1079        let cfg_meta = match metadata(cfg_path) {
1080            Ok(v) => v,
1081            Err(e) => {
1082                error!(
1083                    "Unable to read metadata for config file '{}' during security checks - {:?}",
1084                    cfg_path_str, e
1085                );
1086                return false;
1087            }
1088        };
1089        if !file_permissions_readonly(&cfg_meta) {
1090            warn!("permissions on {} may not be secure. Should be readonly to running uid. This could be a security risk ...",
1091                cfg_path_str
1092                );
1093        }
1094
1095        #[cfg(target_family = "unix")]
1096        if cfg_meta.uid() == get_current_uid() || cfg_meta.uid() == get_effective_uid() {
1097            warn!("WARNING: {} owned by the current uid, which may allow file permission changes. This could be a security risk ...",
1098                cfg_path_str
1099            );
1100        }
1101
1102        true
1103    }
1104}
1105
1106fn main() {
1107    let opt = Opt::parse();
1108
1109    let fmt_layer = fmt::layer().with_writer(std::io::stderr);
1110
1111    let filter_layer = if opt.debug {
1112        match EnvFilter::try_new("kanidm_client=debug,kanidm_ipa_sync=debug,ldap3_client=debug") {
1113            Ok(f) => f,
1114            Err(e) => {
1115                eprintln!("ERROR! Unable to start tracing {:?}", e);
1116                return;
1117            }
1118        }
1119    } else {
1120        match EnvFilter::try_from_default_env() {
1121            Ok(f) => f,
1122            Err(_) => EnvFilter::new("kanidm_client=warn,kanidm_ipa_sync=info,ldap3_client=warn"),
1123        }
1124    };
1125
1126    tracing_subscriber::registry()
1127        .with(filter_layer)
1128        .with(fmt_layer)
1129        .init();
1130
1131    // Startup sanity checks.
1132    // TODO: put this in the junk drawer
1133    #[cfg(target_family = "unix")]
1134    if opt.skip_root_check {
1135        warn!("Skipping root user check, if you're running this for testing, ensure you clean up temporary files.")
1136    } else if get_current_uid() == 0
1137        || get_effective_uid() == 0
1138        || get_current_gid() == 0
1139        || get_effective_gid() == 0
1140    {
1141        error!("Refusing to run - this process must not operate as root.");
1142        return;
1143    };
1144
1145    if !config_security_checks(&opt.client_config) || !config_security_checks(&opt.ipa_sync_config)
1146    {
1147        return;
1148    }
1149
1150    let par_count = thread::available_parallelism()
1151        .expect("Failed to determine available parallelism")
1152        .get();
1153
1154    let rt = runtime::Builder::new_current_thread()
1155        // We configure this as we use parallel workers at some points.
1156        .max_blocking_threads(par_count)
1157        .enable_all()
1158        .build()
1159        .expect("Failed to initialise tokio runtime!");
1160
1161    #[cfg(debug_assertions)]
1162    tracing::debug!("Using {} worker threads", par_count);
1163
1164    rt.block_on(async move { driver_main(opt).await });
1165}