kanidmd_core/
ldaps.rs

1use crate::actors::QueryServerReadV1;
2use crate::CoreAction;
3use cidr::IpCidr;
4use futures_util::sink::SinkExt;
5use futures_util::stream::StreamExt;
6use haproxy_protocol::{ProxyHdrV2, RemoteAddress};
7use kanidmd_lib::idm::ldap::{LdapBoundToken, LdapResponseState};
8use kanidmd_lib::prelude::*;
9use ldap3_proto::proto::LdapMsg;
10use ldap3_proto::LdapCodec;
11use openssl::ssl::{Ssl, SslAcceptor};
12use std::net::SocketAddr;
13use std::pin::Pin;
14use std::str::FromStr;
15use std::sync::Arc;
16use tokio::io::{AsyncRead, AsyncWrite};
17use tokio::net::{TcpListener, TcpStream};
18use tokio::sync::broadcast;
19use tokio::sync::mpsc;
20use tokio_openssl::SslStream;
21use tokio_util::codec::{FramedRead, FramedWrite};
22
23struct LdapSession {
24    uat: Option<LdapBoundToken>,
25}
26
27impl LdapSession {
28    fn new() -> Self {
29        LdapSession {
30            // We start un-authenticated
31            uat: None,
32        }
33    }
34}
35
36#[instrument(name = "ldap-request", skip(client_address, qe_r_ref))]
37async fn client_process_msg(
38    uat: Option<LdapBoundToken>,
39    client_address: SocketAddr,
40    protomsg: LdapMsg,
41    qe_r_ref: &'static QueryServerReadV1,
42) -> Option<LdapResponseState> {
43    let eventid = sketching::tracing_forest::id();
44    security_info!(
45        client_ip = %client_address.ip(),
46        client_port = %client_address.port(),
47        "LDAP client"
48    );
49    qe_r_ref
50        .handle_ldaprequest(eventid, protomsg, uat, client_address.ip())
51        .await
52}
53
54async fn client_process<STREAM>(
55    stream: STREAM,
56    client_address: SocketAddr,
57    connection_address: SocketAddr,
58    qe_r_ref: &'static QueryServerReadV1,
59) where
60    STREAM: AsyncRead + AsyncWrite,
61{
62    let (r, w) = tokio::io::split(stream);
63    let mut r = FramedRead::new(r, LdapCodec::default());
64    let mut w = FramedWrite::new(w, LdapCodec::default());
65
66    // This is a connected client session. we need to associate some state to the session
67    let mut session = LdapSession::new();
68    // Now that we have the session we begin an event loop to process input OR we return.
69    while let Some(Ok(protomsg)) = r.next().await {
70        // Start the event
71        let uat = session.uat.clone();
72        let caddr = client_address;
73
74        debug!(?client_address, ?connection_address);
75
76        match client_process_msg(uat, caddr, protomsg, qe_r_ref).await {
77            // I'd really have liked to have put this near the [LdapResponseState::Bind] but due
78            // to the handing of `audit` it isn't possible due to borrows, etc.
79            Some(LdapResponseState::Unbind) => return,
80            Some(LdapResponseState::Disconnect(rmsg)) => {
81                if w.send(rmsg).await.is_err() {
82                    break;
83                }
84                break;
85            }
86            Some(LdapResponseState::Bind(uat, rmsg)) => {
87                session.uat = Some(uat);
88                if w.send(rmsg).await.is_err() {
89                    break;
90                }
91            }
92            Some(LdapResponseState::Respond(rmsg)) => {
93                if w.send(rmsg).await.is_err() {
94                    break;
95                }
96            }
97            Some(LdapResponseState::MultiPartResponse(v)) => {
98                for rmsg in v.into_iter() {
99                    if w.send(rmsg).await.is_err() {
100                        break;
101                    }
102                }
103            }
104            Some(LdapResponseState::BindMultiPartResponse(uat, v)) => {
105                session.uat = Some(uat);
106                for rmsg in v.into_iter() {
107                    if w.send(rmsg).await.is_err() {
108                        break;
109                    }
110                }
111            }
112            None => {
113                error!("Internal server error");
114                break;
115            }
116        };
117    }
118}
119
120async fn client_tls_accept(
121    stream: TcpStream,
122    tls_acceptor: SslAcceptor,
123    connection_addr: SocketAddr,
124    qe_r_ref: &'static QueryServerReadV1,
125    trusted_proxy_v2_ips: Option<Arc<Vec<IpCidr>>>,
126) {
127    let enable_proxy_v2_hdr = trusted_proxy_v2_ips
128        .map(|trusted| {
129            trusted
130                .iter()
131                .any(|ip_cidr| ip_cidr.contains(&connection_addr.ip()))
132        })
133        .unwrap_or_default();
134
135    let (stream, client_addr) = if enable_proxy_v2_hdr {
136        match ProxyHdrV2::parse_from_read(stream).await {
137            Ok((stream, hdr)) => {
138                let remote_socket_addr = match hdr.to_remote_addr() {
139                    RemoteAddress::Local => {
140                        debug!("PROXY protocol liveness check - will not contain client data");
141                        return;
142                    }
143                    RemoteAddress::TcpV4 { src, dst: _ } => SocketAddr::from(src),
144                    RemoteAddress::TcpV6 { src, dst: _ } => SocketAddr::from(src),
145                    remote_addr => {
146                        error!(?remote_addr, "remote address in proxy header is invalid");
147                        return;
148                    }
149                };
150
151                (stream, remote_socket_addr)
152            }
153            Err(err) => {
154                error!(?connection_addr, ?err, "Unable to process proxy v2 header");
155                return;
156            }
157        }
158    } else {
159        (stream, connection_addr)
160    };
161
162    // Start the event
163    // From the parameters we need to create an SslContext.
164    let mut tlsstream = match Ssl::new(tls_acceptor.context())
165        .and_then(|tls_obj| SslStream::new(tls_obj, stream))
166    {
167        Ok(ta) => ta,
168        Err(err) => {
169            error!(?err, %client_addr, %connection_addr, "LDAP TLS setup error");
170            return;
171        }
172    };
173    if let Err(err) = SslStream::accept(Pin::new(&mut tlsstream)).await {
174        error!(?err, %client_addr, %connection_addr, "LDAP TLS accept error");
175        return;
176    };
177
178    tokio::spawn(client_process(
179        tlsstream,
180        client_addr,
181        connection_addr,
182        qe_r_ref,
183    ));
184}
185
186/// TLS LDAP Listener, hands off to [client_tls_accept]
187async fn ldap_tls_acceptor(
188    listener: TcpListener,
189    mut tls_acceptor: SslAcceptor,
190    qe_r_ref: &'static QueryServerReadV1,
191    mut rx: broadcast::Receiver<CoreAction>,
192    mut tls_acceptor_reload_rx: mpsc::Receiver<SslAcceptor>,
193    trusted_proxy_v2_ips: Option<Arc<Vec<IpCidr>>>,
194) {
195    loop {
196        tokio::select! {
197            Ok(action) = rx.recv() => {
198                match action {
199                    CoreAction::Shutdown => break,
200                }
201            }
202            accept_result = listener.accept() => {
203                match accept_result {
204                    Ok((tcpstream, client_socket_addr)) => {
205                        let clone_tls_acceptor = tls_acceptor.clone();
206                        tokio::spawn(client_tls_accept(tcpstream, clone_tls_acceptor, client_socket_addr, qe_r_ref, trusted_proxy_v2_ips.clone()));
207                    }
208                    Err(err) => {
209                        warn!(?err, "LDAP acceptor error, continuing");
210                    }
211                }
212            }
213            Some(mut new_tls_acceptor) = tls_acceptor_reload_rx.recv() => {
214                std::mem::swap(&mut tls_acceptor, &mut new_tls_acceptor);
215                info!("Reloaded ldap tls acceptor");
216            }
217        }
218    }
219    info!("Stopped {}", super::TaskName::LdapActor);
220}
221
222/// PLAIN LDAP Listener, hands off to [client_process]
223async fn ldap_plaintext_acceptor(
224    listener: TcpListener,
225    qe_r_ref: &'static QueryServerReadV1,
226    mut rx: broadcast::Receiver<CoreAction>,
227) {
228    loop {
229        tokio::select! {
230            Ok(action) = rx.recv() => {
231                match action {
232                    CoreAction::Shutdown => break,
233                }
234            }
235            accept_result = listener.accept() => {
236                match accept_result {
237                    Ok((tcpstream, client_socket_addr)) => {
238                        tokio::spawn(client_process(tcpstream, client_socket_addr, client_socket_addr, qe_r_ref));
239                    }
240                    Err(e) => {
241                        error!("LDAP acceptor error, continuing -> {:?}", e);
242                    }
243                }
244            }
245        }
246    }
247    info!("Stopped {}", super::TaskName::LdapActor);
248}
249
250pub(crate) async fn create_ldap_server(
251    address: &str,
252    opt_ssl_acceptor: Option<SslAcceptor>,
253    qe_r_ref: &'static QueryServerReadV1,
254    rx: broadcast::Receiver<CoreAction>,
255    tls_acceptor_reload_rx: mpsc::Receiver<SslAcceptor>,
256    trusted_proxy_v2_ips: Option<Vec<IpCidr>>,
257) -> Result<tokio::task::JoinHandle<()>, ()> {
258    if address.starts_with(":::") {
259        // takes :::xxxx to xxxx
260        let port = address.replacen(":::", "", 1);
261        error!("Address '{}' looks like an attempt to wildcard bind with IPv6 on port {} - please try using ldapbindaddress = '[::]:{}'", address, port, port);
262    };
263
264    let addr = SocketAddr::from_str(address).map_err(|e| {
265        error!("Could not parse LDAP server address {} -> {:?}", address, e);
266    })?;
267
268    let listener = TcpListener::bind(&addr).await.map_err(|e| {
269        error!(
270            "Could not bind to LDAP server address {} -> {:?}",
271            address, e
272        );
273    })?;
274
275    let trusted_proxy_v2_ips = trusted_proxy_v2_ips.map(Arc::new);
276
277    let ldap_acceptor_handle = match opt_ssl_acceptor {
278        Some(ssl_acceptor) => {
279            info!("Starting LDAPS interface ldaps://{} ...", address);
280
281            tokio::spawn(ldap_tls_acceptor(
282                listener,
283                ssl_acceptor,
284                qe_r_ref,
285                rx,
286                tls_acceptor_reload_rx,
287                trusted_proxy_v2_ips,
288            ))
289        }
290        None => tokio::spawn(ldap_plaintext_acceptor(listener, qe_r_ref, rx)),
291    };
292
293    info!("Created LDAP interface");
294    Ok(ldap_acceptor_handle)
295}