kanidmd_core/
ldaps.rs

1use crate::actors::QueryServerReadV1;
2use crate::CoreAction;
3use futures_util::sink::SinkExt;
4use futures_util::stream::StreamExt;
5use haproxy_protocol::{ProxyHdrV2, RemoteAddress};
6use hashbrown::HashSet;
7use kanidmd_lib::idm::ldap::{LdapBoundToken, LdapResponseState};
8use kanidmd_lib::prelude::*;
9use ldap3_proto::proto::LdapMsg;
10use ldap3_proto::LdapCodec;
11use openssl::ssl::{Ssl, SslAcceptor};
12use std::net::{IpAddr, SocketAddr};
13use std::pin::Pin;
14use std::str::FromStr;
15use std::sync::Arc;
16use tokio::io::{AsyncRead, AsyncWrite};
17use tokio::net::{TcpListener, TcpStream};
18use tokio::sync::broadcast;
19use tokio::sync::mpsc;
20use tokio_openssl::SslStream;
21use tokio_util::codec::{FramedRead, FramedWrite};
22
23struct LdapSession {
24    uat: Option<LdapBoundToken>,
25}
26
27impl LdapSession {
28    fn new() -> Self {
29        LdapSession {
30            // We start un-authenticated
31            uat: None,
32        }
33    }
34}
35
36#[instrument(name = "ldap-request", skip(client_address, qe_r_ref))]
37async fn client_process_msg(
38    uat: Option<LdapBoundToken>,
39    client_address: SocketAddr,
40    protomsg: LdapMsg,
41    qe_r_ref: &'static QueryServerReadV1,
42) -> Option<LdapResponseState> {
43    let eventid = sketching::tracing_forest::id();
44    security_info!(
45        client_ip = %client_address.ip(),
46        client_port = %client_address.port(),
47        "LDAP client"
48    );
49    qe_r_ref
50        .handle_ldaprequest(eventid, protomsg, uat, client_address.ip())
51        .await
52}
53
54async fn client_process<STREAM>(
55    stream: STREAM,
56    client_address: SocketAddr,
57    connection_address: SocketAddr,
58    qe_r_ref: &'static QueryServerReadV1,
59) where
60    STREAM: AsyncRead + AsyncWrite,
61{
62    let (r, w) = tokio::io::split(stream);
63    let mut r = FramedRead::new(r, LdapCodec::default());
64    let mut w = FramedWrite::new(w, LdapCodec::default());
65
66    // This is a connected client session. we need to associate some state to the session
67    let mut session = LdapSession::new();
68    // Now that we have the session we begin an event loop to process input OR we return.
69    while let Some(Ok(protomsg)) = r.next().await {
70        // Start the event
71        let uat = session.uat.clone();
72        let caddr = client_address;
73
74        debug!(?client_address, ?connection_address);
75
76        match client_process_msg(uat, caddr, protomsg, qe_r_ref).await {
77            // I'd really have liked to have put this near the [LdapResponseState::Bind] but due
78            // to the handing of `audit` it isn't possible due to borrows, etc.
79            Some(LdapResponseState::Unbind) => return,
80            Some(LdapResponseState::Disconnect(rmsg)) => {
81                if w.send(rmsg).await.is_err() {
82                    break;
83                }
84                break;
85            }
86            Some(LdapResponseState::Bind(uat, rmsg)) => {
87                session.uat = Some(uat);
88                if w.send(rmsg).await.is_err() {
89                    break;
90                }
91            }
92            Some(LdapResponseState::Respond(rmsg)) => {
93                if w.send(rmsg).await.is_err() {
94                    break;
95                }
96            }
97            Some(LdapResponseState::MultiPartResponse(v)) => {
98                for rmsg in v.into_iter() {
99                    if w.send(rmsg).await.is_err() {
100                        break;
101                    }
102                }
103            }
104            Some(LdapResponseState::BindMultiPartResponse(uat, v)) => {
105                session.uat = Some(uat);
106                for rmsg in v.into_iter() {
107                    if w.send(rmsg).await.is_err() {
108                        break;
109                    }
110                }
111            }
112            None => {
113                error!("Internal server error");
114                break;
115            }
116        };
117    }
118}
119
120async fn client_tls_accept(
121    stream: TcpStream,
122    tls_acceptor: SslAcceptor,
123    connection_addr: SocketAddr,
124    qe_r_ref: &'static QueryServerReadV1,
125    trusted_proxy_v2_ips: Option<Arc<HashSet<IpAddr>>>,
126) {
127    let enable_proxy_v2_hdr = trusted_proxy_v2_ips
128        .map(|trusted| trusted.contains(&connection_addr.ip()))
129        .unwrap_or_default();
130
131    let (stream, client_addr) = if enable_proxy_v2_hdr {
132        match ProxyHdrV2::parse_from_read(stream).await {
133            Ok((stream, hdr)) => {
134                let remote_socket_addr = match hdr.to_remote_addr() {
135                    RemoteAddress::Local => {
136                        debug!("PROXY protocol liveness check - will not contain client data");
137                        return;
138                    }
139                    RemoteAddress::TcpV4 { src, dst: _ } => SocketAddr::from(src),
140                    RemoteAddress::TcpV6 { src, dst: _ } => SocketAddr::from(src),
141                    remote_addr => {
142                        error!(?remote_addr, "remote address in proxy header is invalid");
143                        return;
144                    }
145                };
146
147                (stream, remote_socket_addr)
148            }
149            Err(err) => {
150                error!(?connection_addr, ?err, "Unable to process proxy v2 header");
151                return;
152            }
153        }
154    } else {
155        (stream, connection_addr)
156    };
157
158    // Start the event
159    // From the parameters we need to create an SslContext.
160    let mut tlsstream = match Ssl::new(tls_acceptor.context())
161        .and_then(|tls_obj| SslStream::new(tls_obj, stream))
162    {
163        Ok(ta) => ta,
164        Err(err) => {
165            error!(?err, %client_addr, %connection_addr, "LDAP TLS setup error");
166            return;
167        }
168    };
169    if let Err(err) = SslStream::accept(Pin::new(&mut tlsstream)).await {
170        error!(?err, %client_addr, %connection_addr, "LDAP TLS accept error");
171        return;
172    };
173
174    tokio::spawn(client_process(
175        tlsstream,
176        client_addr,
177        connection_addr,
178        qe_r_ref,
179    ));
180}
181
182/// TLS LDAP Listener, hands off to [client_tls_accept]
183async fn ldap_tls_acceptor(
184    listener: TcpListener,
185    mut tls_acceptor: SslAcceptor,
186    qe_r_ref: &'static QueryServerReadV1,
187    mut rx: broadcast::Receiver<CoreAction>,
188    mut tls_acceptor_reload_rx: mpsc::Receiver<SslAcceptor>,
189    trusted_proxy_v2_ips: Option<Arc<HashSet<IpAddr>>>,
190) {
191    loop {
192        tokio::select! {
193            Ok(action) = rx.recv() => {
194                match action {
195                    CoreAction::Shutdown => break,
196                }
197            }
198            accept_result = listener.accept() => {
199                match accept_result {
200                    Ok((tcpstream, client_socket_addr)) => {
201                        let clone_tls_acceptor = tls_acceptor.clone();
202                        tokio::spawn(client_tls_accept(tcpstream, clone_tls_acceptor, client_socket_addr, qe_r_ref, trusted_proxy_v2_ips.clone()));
203                    }
204                    Err(err) => {
205                        warn!(?err, "LDAP acceptor error, continuing");
206                    }
207                }
208            }
209            Some(mut new_tls_acceptor) = tls_acceptor_reload_rx.recv() => {
210                std::mem::swap(&mut tls_acceptor, &mut new_tls_acceptor);
211                info!("Reloaded ldap tls acceptor");
212            }
213        }
214    }
215    info!("Stopped {}", super::TaskName::LdapActor);
216}
217
218/// PLAIN LDAP Listener, hands off to [client_process]
219async fn ldap_plaintext_acceptor(
220    listener: TcpListener,
221    qe_r_ref: &'static QueryServerReadV1,
222    mut rx: broadcast::Receiver<CoreAction>,
223) {
224    loop {
225        tokio::select! {
226            Ok(action) = rx.recv() => {
227                match action {
228                    CoreAction::Shutdown => break,
229                }
230            }
231            accept_result = listener.accept() => {
232                match accept_result {
233                    Ok((tcpstream, client_socket_addr)) => {
234                        tokio::spawn(client_process(tcpstream, client_socket_addr, client_socket_addr, qe_r_ref));
235                    }
236                    Err(e) => {
237                        error!("LDAP acceptor error, continuing -> {:?}", e);
238                    }
239                }
240            }
241        }
242    }
243    info!("Stopped {}", super::TaskName::LdapActor);
244}
245
246pub(crate) async fn create_ldap_server(
247    address: &str,
248    opt_ssl_acceptor: Option<SslAcceptor>,
249    qe_r_ref: &'static QueryServerReadV1,
250    rx: broadcast::Receiver<CoreAction>,
251    tls_acceptor_reload_rx: mpsc::Receiver<SslAcceptor>,
252    trusted_proxy_v2_ips: Option<HashSet<IpAddr>>,
253) -> Result<tokio::task::JoinHandle<()>, ()> {
254    if address.starts_with(":::") {
255        // takes :::xxxx to xxxx
256        let port = address.replacen(":::", "", 1);
257        error!("Address '{}' looks like an attempt to wildcard bind with IPv6 on port {} - please try using ldapbindaddress = '[::]:{}'", address, port, port);
258    };
259
260    let addr = SocketAddr::from_str(address).map_err(|e| {
261        error!("Could not parse LDAP server address {} -> {:?}", address, e);
262    })?;
263
264    let listener = TcpListener::bind(&addr).await.map_err(|e| {
265        error!(
266            "Could not bind to LDAP server address {} -> {:?}",
267            address, e
268        );
269    })?;
270
271    let trusted_proxy_v2_ips = trusted_proxy_v2_ips.map(Arc::new);
272
273    let ldap_acceptor_handle = match opt_ssl_acceptor {
274        Some(ssl_acceptor) => {
275            info!("Starting LDAPS interface ldaps://{} ...", address);
276
277            tokio::spawn(ldap_tls_acceptor(
278                listener,
279                ssl_acceptor,
280                qe_r_ref,
281                rx,
282                tls_acceptor_reload_rx,
283                trusted_proxy_v2_ips,
284            ))
285        }
286        None => tokio::spawn(ldap_plaintext_acceptor(listener, qe_r_ref, rx)),
287    };
288
289    info!("Created LDAP interface");
290    Ok(ldap_acceptor_handle)
291}