1use crate::actors::QueryServerReadV1;
2use crate::CoreAction;
3use cidr::IpCidr;
4use futures_util::sink::SinkExt;
5use futures_util::stream::StreamExt;
6use haproxy_protocol::{ProxyHdrV2, RemoteAddress};
7use kanidmd_lib::idm::ldap::{LdapBoundToken, LdapResponseState};
8use kanidmd_lib::prelude::*;
9use ldap3_proto::proto::LdapMsg;
10use ldap3_proto::LdapCodec;
11use std::net::SocketAddr;
12use std::str::FromStr;
13use std::sync::Arc;
14use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
15use tokio::net::{TcpListener, TcpStream};
16use tokio::sync::broadcast;
17use tokio::sync::mpsc;
18use tokio::time::timeout;
19use tokio_rustls::TlsAcceptor;
20use tokio_util::codec::{FramedRead, FramedWrite};
21
/// Upper bound on how long a connection may wait for the next LDAP request,
/// and on how long the final stream shutdown may take.
const LDAP_CLIENT_IO_TIMEOUT: Duration = Duration::from_secs(300);
/// Upper bound applied to each connection-setup step of the TLS listener:
/// reading the PROXY v2 header, waiting for initial client data, and the TLS
/// accept.
const LDAP_CLIENT_CONN_TIMEOUT: Duration = Duration::from_secs(30);
24
/// Per-connection LDAP state that is carried from one request to the next.
struct LdapSession {
    /// Token taken from the most recent `Bind` / `BindMultiPartResponse`
    /// response state on this connection; `None` until one has been seen.
    uat: Option<LdapBoundToken>,
}
28
29impl LdapSession {
30    fn new() -> Self {
31        LdapSession {
32            uat: None,
34        }
35    }
36}
37
38#[instrument(name = "ldap-request", skip(client_address, qe_r_ref))]
39async fn client_process_msg(
40    uat: Option<LdapBoundToken>,
41    client_address: SocketAddr,
42    protomsg: LdapMsg,
43    qe_r_ref: &'static QueryServerReadV1,
44) -> Option<LdapResponseState> {
45    let eventid = sketching::tracing_forest::id();
46    security_info!(
47        client_ip = %client_address.ip(),
48        client_port = %client_address.port(),
49        "LDAP client"
50    );
51    qe_r_ref
52        .handle_ldaprequest(eventid, protomsg, uat, client_address.ip())
53        .await
54}
55
56async fn client_process<STREAM>(
57    stream: STREAM,
58    client_address: SocketAddr,
59    connection_address: SocketAddr,
60    qe_r_ref: &'static QueryServerReadV1,
61) where
62    STREAM: AsyncRead + AsyncWrite + AsyncWriteExt + Unpin,
63{
64    let (r, w) = tokio::io::split(stream);
65    let mut r = FramedRead::new(r, LdapCodec::default());
66    let mut w = FramedWrite::new(w, LdapCodec::default());
67
68    let mut session = LdapSession::new();
70    loop {
72        let protomsg = match timeout(LDAP_CLIENT_IO_TIMEOUT, r.next()).await {
73            Ok(Some(Ok(protomsg))) => protomsg,
74            Ok(Some(Err(req_err))) => {
75                error!(?req_err, "Invalid LDAP request");
76                break;
77            }
78            Ok(None) => {
79                debug!("connection closed");
80                break;
81            }
82            Err(_) => {
83                debug!("client IO timeout, closing connection");
84                break;
85            }
86        };
87
88        let uat = session.uat.clone();
90        let caddr = client_address;
91
92        debug!(?client_address, ?connection_address);
93
94        match client_process_msg(uat, caddr, protomsg, qe_r_ref).await {
95            Some(LdapResponseState::Unbind) => break,
98            Some(LdapResponseState::Disconnect(rmsg)) => {
99                if w.send(rmsg).await.is_err() {
100                    break;
101                }
102                break;
103            }
104            Some(LdapResponseState::Bind(uat, rmsg)) => {
105                session.uat = Some(uat);
106                if w.send(rmsg).await.is_err() {
107                    break;
108                }
109            }
110            Some(LdapResponseState::Respond(rmsg)) => {
111                if w.send(rmsg).await.is_err() {
112                    break;
113                }
114            }
115            Some(LdapResponseState::MultiPartResponse(v)) => {
116                for rmsg in v.into_iter() {
117                    if w.send(rmsg).await.is_err() {
118                        break;
119                    }
120                }
121            }
122            Some(LdapResponseState::BindMultiPartResponse(uat, v)) => {
123                session.uat = Some(uat);
124                for rmsg in v.into_iter() {
125                    if w.send(rmsg).await.is_err() {
126                        break;
127                    }
128                }
129            }
130            None => {
131                error!("Internal server error");
132                break;
133            }
134        };
135    }
136
137    let r = r.into_inner();
140    let w = w.into_inner();
141
142    let mut stream = r.unsplit(w);
143
144    match timeout(LDAP_CLIENT_IO_TIMEOUT, stream.shutdown()).await {
145        Ok(Ok(_)) => debug!("Connection closed successfully"),
146        Ok(Err(tls_err)) => warn!(?tls_err, "Unable to cleanly shutdown client connection"),
147        Err(_) => error!("Timeout attempting to close connection"),
148    }
149}
150
151async fn client_tls_accept(
152    stream: TcpStream,
153    tls_acceptor: TlsAcceptor,
154    connection_addr: SocketAddr,
155    qe_r_ref: &'static QueryServerReadV1,
156    trusted_proxy_v2_ips: Option<Arc<Vec<IpCidr>>>,
157) {
158    let enable_proxy_v2_hdr = trusted_proxy_v2_ips
159        .map(|trusted| {
160            trusted
161                .iter()
162                .any(|ip_cidr| ip_cidr.contains(&connection_addr.ip().to_canonical()))
163        })
164        .unwrap_or_default();
165
166    let (mut stream, client_addr) = if enable_proxy_v2_hdr {
167        match timeout(
168            LDAP_CLIENT_CONN_TIMEOUT,
169            ProxyHdrV2::parse_from_read(stream),
170        )
171        .await
172        {
173            Ok(Ok((stream, hdr))) => {
174                let remote_socket_addr = match hdr.to_remote_addr() {
175                    RemoteAddress::Local => {
176                        debug!("PROXY protocol liveness check - will not contain client data");
177                        connection_addr
179                    }
180                    RemoteAddress::TcpV4 { src, dst: _ } => SocketAddr::from(src),
181                    RemoteAddress::TcpV6 { src, dst: _ } => SocketAddr::from(src),
182                    remote_addr => {
183                        error!(?remote_addr, "remote address in proxy header is invalid");
184                        return;
185                    }
186                };
187
188                (stream, remote_socket_addr)
189            }
190            Ok(Err(err)) => {
191                error!(?connection_addr, ?err, "Unable to process proxy v2 header");
192                return;
193            }
194            Err(_) => {
195                error!(?connection_addr, "Timeout receiving proxy v2 header");
196                return;
197            }
198        }
199    } else {
200        (stream, connection_addr)
201    };
202
203    let mut zero_buf: [u8; 0] = [];
204    match timeout(LDAP_CLIENT_CONN_TIMEOUT, stream.read(&mut zero_buf)).await {
205        Ok(Ok(_)) => {}
206        Ok(Err(_)) => {
207            debug!(%client_addr, %connection_addr, "Connection closed before we recieved initial data");
208            return;
209        }
210        Err(_) => {
211            error!(%client_addr, %connection_addr, "LDAP timeout waiting for initial data");
212            return;
213        }
214    };
215
216    let tlsstream = match timeout(LDAP_CLIENT_CONN_TIMEOUT, tls_acceptor.accept(stream)).await {
217        Ok(Ok(ta)) => ta,
218        Ok(Err(err)) => {
219            error!(?err, %client_addr, %connection_addr, "LDAP TLS setup error");
220            return;
221        }
222        Err(_) => {
223            error!(%client_addr, %connection_addr, "LDAP TLS timeout error");
224            return;
225        }
226    };
227
228    tokio::spawn(client_process(
230        tlsstream,
231        client_addr,
232        connection_addr,
233        qe_r_ref,
234    ));
235}
236
237async fn ldap_tls_acceptor(
239    listener: TcpListener,
240    mut tls_acceptor: TlsAcceptor,
241    qe_r_ref: &'static QueryServerReadV1,
242    mut rx: broadcast::Receiver<CoreAction>,
243    mut tls_acceptor_reload_rx: mpsc::Receiver<TlsAcceptor>,
244    trusted_proxy_v2_ips: Option<Arc<Vec<IpCidr>>>,
245) {
246    loop {
247        tokio::select! {
248            Ok(action) = rx.recv() => {
249                match action {
250                    CoreAction::Shutdown => break,
251                }
252            }
253            accept_result = listener.accept() => {
254                match accept_result {
255                    Ok((tcpstream, client_socket_addr)) => {
256                        let clone_tls_acceptor = tls_acceptor.clone();
257                        tokio::spawn(client_tls_accept(tcpstream, clone_tls_acceptor, client_socket_addr, qe_r_ref, trusted_proxy_v2_ips.clone()));
258                    }
259                    Err(err) => {
260                        warn!(?err, "LDAP acceptor error, continuing");
261                    }
262                }
263            }
264            Some(mut new_tls_acceptor) = tls_acceptor_reload_rx.recv() => {
265                std::mem::swap(&mut tls_acceptor, &mut new_tls_acceptor);
266                info!("Reloaded ldap tls acceptor");
267            }
268        }
269    }
270    info!("Stopped {}", super::TaskName::LdapActor);
271}
272
273async fn ldap_plaintext_acceptor(
275    listener: TcpListener,
276    qe_r_ref: &'static QueryServerReadV1,
277    mut rx: broadcast::Receiver<CoreAction>,
278) {
279    loop {
280        tokio::select! {
281            Ok(action) = rx.recv() => {
282                match action {
283                    CoreAction::Shutdown => break,
284                }
285            }
286            accept_result = listener.accept() => {
287                match accept_result {
288                    Ok((tcpstream, client_socket_addr)) => {
289                        tokio::spawn(client_process(tcpstream, client_socket_addr, client_socket_addr, qe_r_ref));
290                    }
291                    Err(e) => {
292                        error!("LDAP acceptor error, continuing -> {:?}", e);
293                    }
294                }
295            }
296        }
297    }
298    info!("Stopped {}", super::TaskName::LdapActor);
299}
300
301pub(crate) async fn create_ldap_server(
302    address: &str,
303    opt_ssl_acceptor: Option<TlsAcceptor>,
304    qe_r_ref: &'static QueryServerReadV1,
305    rx: broadcast::Receiver<CoreAction>,
306    tls_acceptor_reload_rx: mpsc::Receiver<TlsAcceptor>,
307    trusted_proxy_v2_ips: Option<Vec<IpCidr>>,
308) -> Result<tokio::task::JoinHandle<()>, ()> {
309    if address.starts_with(":::") {
310        let port = address.replacen(":::", "", 1);
312        error!("Address '{}' looks like an attempt to wildcard bind with IPv6 on port {} - please try using ldapbindaddress = '[::]:{}'", address, port, port);
313    };
314
315    let addr = SocketAddr::from_str(address).map_err(|e| {
316        error!("Could not parse LDAP server address {} -> {:?}", address, e);
317    })?;
318
319    let listener = TcpListener::bind(&addr).await.map_err(|e| {
320        error!(
321            "Could not bind to LDAP server address {} -> {:?}",
322            address, e
323        );
324    })?;
325
326    let trusted_proxy_v2_ips = trusted_proxy_v2_ips.map(Arc::new);
327
328    let ldap_acceptor_handle = match opt_ssl_acceptor {
329        Some(ssl_acceptor) => {
330            info!("Starting LDAPS interface ldaps://{} ...", address);
331
332            tokio::spawn(ldap_tls_acceptor(
333                listener,
334                ssl_acceptor,
335                qe_r_ref,
336                rx,
337                tls_acceptor_reload_rx,
338                trusted_proxy_v2_ips,
339            ))
340        }
341        None => tokio::spawn(ldap_plaintext_acceptor(listener, qe_r_ref, rx)),
342    };
343
344    info!("Created LDAP interface");
345    Ok(ldap_acceptor_handle)
346}