#![deny(warnings)]
#![warn(unused_extern_crates)]
#![deny(clippy::todo)]
#![deny(clippy::unimplemented)]
#![deny(clippy::unwrap_used)]
#![deny(clippy::panic)]
#![deny(clippy::unreachable)]
#![deny(clippy::await_holding_lock)]
#![deny(clippy::needless_pass_by_value)]
#![deny(clippy::trivially_copy_pass_by_ref)]
#![allow(clippy::expect_used)]
mod config;
mod error;
use crate::config::{Config, EntryConfig};
use crate::error::SyncError;
use chrono::Utc;
use clap::Parser;
use cron::Schedule;
use kanidm_proto::constants::ATTR_OBJECTCLASS;
use kanidmd_lib::prelude::Attribute;
use std::fs::metadata;
use std::fs::File;
use std::io::Read;
#[cfg(target_family = "unix")]
use std::os::unix::fs::MetadataExt;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpListener;
use tokio::runtime;
use tokio::sync::broadcast;
use tokio::time::sleep;
use tracing::{debug, error, info, warn};
use tracing_subscriber::prelude::*;
use tracing_subscriber::{fmt, EnvFilter};
use kanidm_client::KanidmClientBuilder;
use kanidm_lib_file_permissions::readonly as file_permissions_readonly;
use kanidm_proto::scim_v1::{
MultiValueAttr, ScimEntry, ScimSshPubKey, ScimSyncGroup, ScimSyncPerson, ScimSyncRequest,
ScimSyncRetentionMode, ScimSyncState,
};
#[cfg(target_family = "unix")]
use kanidm_utils_users::{get_current_gid, get_current_uid, get_effective_gid, get_effective_uid};
use ldap3_client::{proto, LdapClientBuilder, LdapSyncRepl, LdapSyncReplEntry, LdapSyncStateValue};
include!("./opt.rs");
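/// Load the ldap sync and kanidm client configurations, then either run a single
/// sync pass or, when `opt.schedule` is set, keep syncing on the configured cron
/// schedule until a shutdown signal is received.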
async fn driver_main(opt: Opt) -> Result<(), ()> {
debug!("Starting kanidm ldap sync driver.");
let mut f = match File::open(&opt.ldap_sync_config) {
Ok(f) => f,
Err(e) => {
error!(
"Unable to open ldap sync config from '{}' [{:?}] 🥺",
&opt.ldap_sync_config.display(),
e
);
return Err(());
}
};
let mut contents = String::new();
if let Err(e) = f.read_to_string(&mut contents) {
error!(
"unable to read file '{}': {:?}",
&opt.ldap_sync_config.display(),
e
);
return Err(());
};
let sync_config: Config = match toml::from_str(contents.as_str()) {
Ok(c) => c,
Err(e) => {
eprintln!(
"Unable to parse config from '{}' error: {:?}",
&opt.ldap_sync_config.display(),
e
);
return Err(());
}
};
debug!(?sync_config);
let cb = match KanidmClientBuilder::new().read_options_from_optional_config(&opt.client_config)
{
Ok(v) => v,
Err(_) => {
error!("Failed to parse {}", opt.client_config.to_string_lossy());
return Err(());
}
};
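// Default to syncing at second 0 of every fifth minute if no schedule is configured.
// The cron crate expects a seven field expression (seconds through years).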
let expression = sync_config.schedule.as_deref().unwrap_or("0 */5 * * * * *");
let schedule = match Schedule::from_str(expression) {
Ok(s) => s,
Err(_) => {
error!("Failed to parse cron schedule expression");
return Err(());
}
};
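// Scheduled mode: run the sync repeatedly on the cron schedule, with an optional
// TCP status listener, until a termination signal arrives.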
if opt.schedule {
let last_op_status = Arc::new(AtomicBool::new(true));
let (broadcast_tx, mut broadcast_rx) = broadcast::channel(4);
let last_op_status_c = last_op_status.clone();
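// If a status bind address is configured, expose a small TCP endpoint that
// reports whether the most recent sync succeeded.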
let status_handle = if let Some(sb) = sync_config.status_bind.as_deref() {
let listener = match TcpListener::bind(sb).await {
Ok(l) => l,
Err(e) => {
error!(?e, "Failed to bind status socket");
return Err(());
}
};
info!("Status listener is started on {:?}", sb);
let status_rx = broadcast_tx.subscribe();
Some(tokio::spawn(async move {
status_task(listener, status_rx, last_op_status_c).await
}))
} else {
warn!("No status listener configured, this will prevent you monitoring the sync tool");
None
};
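// The sync loop: sleep until the next scheduled run, perform the sync, and record
// its outcome for the status listener. A broadcast message stops the loop.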
let driver_handle = tokio::spawn(async move {
loop {
let now = Utc::now();
let next_time = match schedule.after(&now).next() {
Some(v) => v,
None => {
error!("Failed to access any future scheduled events, terminating.");
break;
}
};
let wait_seconds = 1 + (next_time - now).num_seconds() as u64;
info!("next sync on {}, wait_time = {}s", next_time, wait_seconds);
tokio::select! {
_ = broadcast_rx.recv() => {
break;
}
_ = sleep(Duration::from_secs(wait_seconds)) => {
info!("starting sync ...");
match run_sync(cb.clone(), &sync_config, &opt).await {
Ok(_) => last_op_status.store(true, Ordering::Relaxed),
Err(e) => {
error!(?e, "sync completed with error");
last_op_status.store(false, Ordering::Relaxed)
}
};
}
}
}
info!("Stopped sync driver");
});
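// Block until a shutdown signal arrives. On unix, SIGINT and SIGTERM terminate the
// driver; SIGALRM, SIGHUP, SIGUSR1 and SIGUSR2 are consumed and ignored.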
loop {
#[cfg(target_family = "unix")]
{
tokio::select! {
Ok(()) = tokio::signal::ctrl_c() => {
break
}
Some(()) = async move {
let sigterm = tokio::signal::unix::SignalKind::terminate();
#[allow(clippy::unwrap_used)]
tokio::signal::unix::signal(sigterm).unwrap().recv().await
} => {
break
}
Some(()) = async move {
let sigalrm = tokio::signal::unix::SignalKind::alarm();
#[allow(clippy::unwrap_used)]
tokio::signal::unix::signal(sigalrm).unwrap().recv().await
} => {
// SIGALRM received and ignored.
}
Some(()) = async move {
let sighup = tokio::signal::unix::SignalKind::hangup();
#[allow(clippy::unwrap_used)]
tokio::signal::unix::signal(sighup).unwrap().recv().await
} => {
// SIGHUP received and ignored.
}
Some(()) = async move {
let sigusr1 = tokio::signal::unix::SignalKind::user_defined1();
#[allow(clippy::unwrap_used)]
tokio::signal::unix::signal(sigusr1).unwrap().recv().await
} => {
// SIGUSR1 received and ignored.
}
Some(()) = async move {
let sigusr2 = tokio::signal::unix::SignalKind::user_defined2();
#[allow(clippy::unwrap_used)]
tokio::signal::unix::signal(sigusr2).unwrap().recv().await
} => {
// SIGUSR2 received and ignored.
}
}
}
#[cfg(target_family = "windows")]
{
tokio::select! {
Ok(()) = tokio::signal::ctrl_c() => {
break
}
}
}
}
broadcast_tx
.send(true)
.expect("Failed to trigger a clean shutdown!");
let _ = driver_handle.await;
if let Some(sh) = status_handle {
let _ = sh.await;
}
} else if let Err(e) = run_sync(cb, &sync_config, &opt).await {
error!(?e, "Sync completed with error");
}
Ok(())
}
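/// Perform one sync pass: bind to the ldap server, fetch changes via syncrepl,
/// translate them into a SCIM sync request, and submit it to kanidmd (unless
/// `proto_dump` or `dry_run` is set).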
async fn run_sync(
cb: KanidmClientBuilder,
sync_config: &Config,
opt: &Opt,
) -> Result<(), SyncError> {
let rsclient = match cb.build() {
Ok(rsc) => rsc,
Err(_e) => {
error!("Failed to build async client");
return Err(SyncError::ClientConfig);
}
};
rsclient.set_token(sync_config.sync_token.clone()).await;
let mut ldap_client = match LdapClientBuilder::new(&sync_config.ldap_uri)
.max_ber_size(sync_config.max_ber_size)
.add_tls_ca(&sync_config.ldap_ca)
.build()
.await
{
Ok(lc) => lc,
Err(e) => {
error!(?e, "Failed to connect to ldap");
return Err(SyncError::LdapConn);
}
};
match ldap_client
.bind(
sync_config.ldap_sync_dn.clone(),
sync_config.ldap_sync_pw.clone(),
)
.await
{
Ok(()) => {
debug!(ldap_sync_dn = ?sync_config.ldap_sync_dn, ldap_uri = %sync_config.ldap_uri);
}
Err(e) => {
error!(?e, "Failed to bind (authenticate) to freeldap");
return Err(SyncError::LdapAuth);
}
};
let scim_sync_status = match rsclient.scim_v1_sync_status().await {
Ok(s) => s,
Err(e) => {
error!(?e, "Failed to access scim sync status");
return Err(SyncError::SyncStatus);
}
};
debug!(state=?scim_sync_status);
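// Refresh-only syncrepl: resume from the previous cookie when the agreement is
// already active, otherwise request a full refresh.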
let mode = proto::SyncRequestMode::RefreshOnly;
let cookie = match &scim_sync_status {
ScimSyncState::Refresh => None,
ScimSyncState::Active { cookie } => Some(cookie.to_vec()),
};
let filter = sync_config.ldap_filter.clone();
debug!(ldap_sync_base_dn = ?sync_config.ldap_sync_base_dn, ?cookie, ?mode, ?filter);
let sync_result = match ldap_client
.syncrepl(sync_config.ldap_sync_base_dn.clone(), filter, cookie, mode)
.await
{
Ok(results) => results,
Err(e) => {
error!(?e, "Failed to perform syncrepl from ldap");
return Err(SyncError::LdapSyncrepl);
}
};
if opt.proto_dump {
let stdout = std::io::stdout();
if let Err(e) = serde_json::to_writer_pretty(stdout, &sync_result) {
error!(?e, "Failed to serialise ldap sync response");
}
}
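// Translate the ldap syncrepl result into a SCIM sync request for kanidmd.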
let scim_sync_request = match sync_result {
LdapSyncRepl::Success {
cookie,
refresh_deletes: _,
entries,
delete_uuids,
present_uuids,
} => {
let to_state = if let Some(cookie) = cookie {
ScimSyncState::Active { cookie }
} else {
info!("no changes required");
return Ok(());
};
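// Map the syncrepl delete/present phases onto a SCIM retention mode. The server
// may send one of these phases, or neither, but never both.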
let retain = match (delete_uuids, present_uuids) {
(None, None) => {
ScimSyncRetentionMode::Ignore
}
(Some(d_uuids), None) => {
ScimSyncRetentionMode::Delete(d_uuids)
}
(None, Some(p_uuids)) => {
ScimSyncRetentionMode::Retain(p_uuids)
}
(Some(_), Some(_)) => {
error!("Ldap server provided an invalid sync repl response - unable to have both delete and present phases.");
return Err(SyncError::LdapStateInvalid);
}
};
let entries = match process_ldap_sync_result(entries, sync_config).await {
Ok(ssr) => ssr,
Err(()) => {
error!("Failed to process IPA entries to SCIM");
return Err(SyncError::Preprocess);
}
};
ScimSyncRequest {
from_state: scim_sync_status,
to_state,
entries,
retain,
}
}
LdapSyncRepl::RefreshRequired => {
let to_state = ScimSyncState::Refresh;
ScimSyncRequest {
from_state: scim_sync_status,
to_state,
entries: Vec::new(),
retain: ScimSyncRetentionMode::Ignore,
}
}
};
if opt.proto_dump {
let stdout = std::io::stdout();
if let Err(e) = serde_json::to_writer_pretty(stdout, &scim_sync_request) {
error!(?e, "Failed to serialise scim sync request");
};
Ok(())
} else if opt.dry_run {
info!("dry-run complete");
info!("Success!");
Ok(())
} else if let Err(e) = rsclient.scim_v1_sync_update(&scim_sync_request).await {
error!(
?e,
"Failed to submit scim sync update - see the kanidmd server log for more details."
);
Err(SyncError::SyncUpdate)
} else {
info!("Success!");
Ok(())
}
}
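/// Convert each ldap sync entry into a SCIM entry, applying any per-entry
/// configuration overrides (exclusions, uuid/name/gidnumber remapping).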
async fn process_ldap_sync_result(
ldap_entries: Vec<LdapSyncReplEntry>,
sync_config: &Config,
) -> Result<Vec<ScimEntry>, ()> {
ldap_entries
.into_iter()
.filter_map(|lentry| {
let e_config = sync_config
.entry_map
.get(&lentry.entry_uuid)
.cloned()
.unwrap_or_default();
match ldap_to_scim_entry(lentry, &e_config, sync_config) {
Ok(Some(e)) => Some(Ok(e)),
Ok(None) => None,
Err(()) => Some(Err(())),
}
})
.collect::<Result<Vec<_>, _>>()
}
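/// Map a single ldap entry to a SCIM person or group based on its object classes,
/// or return `Ok(None)` when the entry is excluded or has no matching object class.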
fn ldap_to_scim_entry(
sync_entry: LdapSyncReplEntry,
entry_config: &EntryConfig,
sync_config: &Config,
) -> Result<Option<ScimEntry>, ()> {
debug!("{:#?}", sync_entry);
#[allow(clippy::unimplemented)]
if sync_entry.state != LdapSyncStateValue::Add {
unimplemented!();
}
let dn = sync_entry.entry.dn.clone();
if entry_config.exclude {
info!("entry_config excludes {}", dn);
return Ok(None);
}
let oc = sync_entry
.entry
.attrs
.get(ATTR_OBJECTCLASS)
.ok_or_else(|| {
error!("Invalid entry - no object class {}", dn);
})?;
if oc.contains(&sync_config.person_objectclass) {
let LdapSyncReplEntry {
entry_uuid,
state: _,
mut entry,
} = sync_entry;
let id = if let Some(map_uuid) = &entry_config.map_uuid {
*map_uuid
} else {
entry_uuid
};
let user_name = if let Some(name) = entry_config.map_name.clone() {
name
} else {
entry
.get_ava_single(&sync_config.person_attr_user_name)
.ok_or_else(|| {
error!(
"Missing required attribute {} (person_attr_user_name)",
sync_config.person_attr_user_name
);
})?
.to_owned()
};
let display_name = entry
.get_ava_single(&sync_config.person_attr_display_name)
.ok_or_else(|| {
error!(
"Missing required attribute {} (person_attr_display_name)",
sync_config.person_attr_display_name
);
})?
.to_owned();
let gidnumber = if let Some(number) = entry_config.map_gidnumber {
Some(number)
} else {
entry
.get_ava_single(&sync_config.person_attr_gidnumber)
.map(|gid| {
u32::from_str(gid).map_err(|_| {
error!(
"Invalid gidnumber - {} is not a u32 (person_attr_gidnumber)",
sync_config.person_attr_gidnumber
);
})
})
.transpose()?
};
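// Import the password hash, applying the configured prefix (if any) to the imported value.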
let password_import = entry
.get_ava_single(&sync_config.person_attr_password)
.map(str::to_string);
let password_import = if let Some(pw_prefix) = sync_config.person_password_prefix.as_ref() {
password_import.map(|s| format!("{}{}", pw_prefix, s))
} else {
password_import
};
let unix_password_import = if sync_config
.sync_password_as_unix_password
.unwrap_or_default()
{
password_import.clone()
} else {
None
};
let mail: Vec<_> = entry
.remove_ava(&sync_config.person_attr_mail)
.map(|set| {
set.into_iter()
.map(|addr| MultiValueAttr {
type_: None,
primary: None,
display: None,
ref_: None,
value: addr,
})
.collect()
})
.unwrap_or_default();
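// No TOTP tokens are imported from a generic ldap source.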
let totp_import = Vec::default();
let ssh_publickey = entry
.remove_ava(&sync_config.person_attr_ssh_public_key)
.map(|set| {
set.into_iter()
.enumerate()
.map(|(i, value)| ScimSshPubKey {
label: format!("sshpublickey-{}", i),
value,
})
.collect()
})
.unwrap_or_default();
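// Treat any nsAccountLock value other than FALSE/false as a locked account and
// expire it immediately (at the unix epoch).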
let account_disabled: bool = entry
.remove_ava(Attribute::NsAccountLock.as_ref())
.map(|set| {
set.into_iter()
.any(|value| value != "FALSE" && value != "false")
})
.unwrap_or_default();
let account_expire = if account_disabled {
Some(chrono::DateTime::UNIX_EPOCH.to_rfc3339())
} else {
None
};
let account_valid_from = None;
let login_shell = entry
.get_ava_single(&sync_config.person_attr_login_shell)
.map(str::to_string);
let scim_sync_person = ScimSyncPerson::builder(id, entry.dn, user_name, display_name)
.set_gidnumber(gidnumber)
.set_password_import(password_import)
.set_unix_password_import(unix_password_import)
.set_totp_import(totp_import)
.set_login_shell(login_shell)
.set_mail(mail)
.set_ssh_publickey(ssh_publickey)
.set_account_expire(account_expire)
.set_account_valid_from(account_valid_from)
.build();
let scim_entry_generic: ScimEntry = scim_sync_person.try_into().map_err(|json_err| {
error!(?json_err, "Unable to convert person to scim_sync_person");
})?;
Ok(Some(scim_entry_generic))
} else if oc.contains(&sync_config.group_objectclass) {
let LdapSyncReplEntry {
entry_uuid,
state: _,
mut entry,
} = sync_entry;
let id = entry_uuid;
let name = entry
.get_ava_single(&sync_config.group_attr_name)
.ok_or_else(|| {
error!(
"Missing required attribute {} (group_attr_name)",
sync_config.group_attr_name
);
})?
.to_owned();
let description = entry
.get_ava_single(&sync_config.group_attr_description)
.map(str::to_string);
let gidnumber = entry
.get_ava_single(&sync_config.group_attr_gidnumber)
.map(|gid| {
u32::from_str(gid).map_err(|_| {
error!(
"Invalid gidnumber - {} is not a u32 (group_attr_gidnumber)",
sync_config.group_attr_gidnumber
);
})
})
.transpose()?;
let members: Vec<_> = entry
.remove_ava(&sync_config.group_attr_member)
.map(|set| set.into_iter().collect())
.unwrap_or_default();
let scim_sync_group = ScimSyncGroup::builder(id, entry.dn, name)
.set_description(description)
.set_gidnumber(gidnumber)
.set_members(members.into_iter())
.build();
let scim_entry_generic: ScimEntry = scim_sync_group.try_into().map_err(|json_err| {
error!(?json_err, "Unable to convert group to scim_sync_group");
})?;
Ok(Some(scim_entry_generic))
} else {
debug!("Skipping entry {} with oc {:?}", dn, oc);
Ok(None)
}
}
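/// Answer TCP connections on the status listener with `Ok` or `Err` depending on
/// the outcome of the most recent sync, until shutdown is broadcast.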
async fn status_task(
listener: TcpListener,
mut status_rx: broadcast::Receiver<bool>,
last_op_status: Arc<AtomicBool>,
) {
loop {
tokio::select! {
_ = status_rx.recv() => {
break;
}
maybe_sock = listener.accept() => {
let mut stream = match maybe_sock {
Ok((sock, addr)) => {
debug!("accept from {:?}", addr);
sock
}
Err(e) => {
error!(?e, "Failed to accept status connection");
continue;
}
};
let sr = if last_op_status.load(Ordering::Relaxed) {
stream.write_all(b"Ok\n").await
} else {
stream.write_all(b"Err\n").await
};
if let Err(e) = sr {
error!(?e, "Failed to send status");
}
}
}
}
info!("Stopped status task");
}
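/// Best-effort checks that a configuration file exists and has sensible ownership
/// and permissions before it is used.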
fn config_security_checks(cfg_path: &Path) -> bool {
let cfg_path_str = cfg_path.to_string_lossy();
if !cfg_path.exists() {
error!(
"Config missing from {} - cannot start up. Quitting.",
cfg_path_str
);
false
} else {
let cfg_meta = match metadata(cfg_path) {
Ok(v) => v,
Err(e) => {
error!(
"Unable to read metadata for '{}' during security checks - {:?}",
cfg_path_str, e
);
return false;
}
};
if !file_permissions_readonly(&cfg_meta) {
warn!("permissions on {} may not be secure. Should be readonly to running uid. This could be a security risk ...",
cfg_path_str
);
}
#[cfg(target_family = "unix")]
if cfg_meta.uid() == get_current_uid() || cfg_meta.uid() == get_effective_uid() {
warn!("WARNING: {} owned by the current uid, which may allow file permission changes. This could be a security risk ...",
cfg_path_str
);
}
true
}
}
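/// Parse arguments, set up logging, refuse to run as root (on unix), verify config
/// file permissions, then run the sync driver on a current-thread tokio runtime.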
fn main() {
let opt = Opt::parse();
let fmt_layer = fmt::layer().with_writer(std::io::stderr);
let filter_layer = if opt.debug {
match EnvFilter::try_new("kanidm_client=debug,kanidm_ldap_sync=debug,ldap3_client=debug") {
Ok(f) => f,
Err(e) => {
eprintln!("ERROR! Unable to start tracing {:?}", e);
return;
}
}
} else {
match EnvFilter::try_from_default_env() {
Ok(f) => f,
Err(_) => EnvFilter::new("kanidm_client=warn,kanidm_ldap_sync=info,ldap3_client=warn"),
}
};
tracing_subscriber::registry()
.with(filter_layer)
.with(fmt_layer)
.init();
#[cfg(target_family = "unix")]
if opt.skip_root_check {
warn!("Skipping root user check, if you're running this for testing, ensure you clean up temporary files.")
} else if get_current_uid() == 0
|| get_effective_uid() == 0
|| get_current_gid() == 0
|| get_effective_gid() == 0
{
error!("Refusing to run - this process must not operate as root.");
return;
};
if !config_security_checks(&opt.client_config) || !config_security_checks(&opt.ldap_sync_config)
{
return;
}
let par_count = thread::available_parallelism()
.expect("Failed to determine available parallelism")
.get();
let rt = runtime::Builder::new_current_thread()
.max_blocking_threads(par_count)
.enable_all()
.build()
.expect("Failed to initialise tokio runtime!");
#[cfg(debug_assertions)]
tracing::debug!("Using {} worker threads", par_count);
if rt.block_on(async move { driver_main(opt).await }).is_err() {
std::process::exit(1);
};
}
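// Exercise driver_main's configuration handling: a missing config, a present but
// unparsable config, and a parsable example config should all fail cleanly.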
#[tokio::test]
async fn test_driver_main() {
let testopt = Opt {
client_config: PathBuf::from("test"),
ldap_sync_config: PathBuf::from("test"),
debug: false,
schedule: false,
proto_dump: false,
dry_run: false,
skip_root_check: true,
};
sketching::test_init();
println!("testing config");
assert!(driver_main(testopt.clone()).await.is_err());
println!("done testing missing config");
let testopt = Opt {
client_config: PathBuf::from(format!("{}/Cargo.toml", env!("CARGO_MANIFEST_DIR"))),
ldap_sync_config: PathBuf::from(format!("{}/Cargo.toml", env!("CARGO_MANIFEST_DIR"))),
..testopt
};
println!("valid file path, invalid contents");
assert!(driver_main(testopt.clone()).await.is_err());
println!("done with valid file path, invalid contents");
let testopt = Opt {
client_config: PathBuf::from(format!(
"{}/../../../examples/iam_migration_ldap.toml",
env!("CARGO_MANIFEST_DIR")
)),
ldap_sync_config: PathBuf::from(format!(
"{}/../../../examples/iam_migration_ldap.toml",
env!("CARGO_MANIFEST_DIR")
)),
..testopt
};
println!("valid file path, invalid contents");
assert!(driver_main(testopt).await.is_err());
println!("done with valid file path, valid contents");
}