1use crate::actors::QueryServerReadV1;
2use crate::CoreAction;
3use futures_util::sink::SinkExt;
4use futures_util::stream::StreamExt;
5use haproxy_protocol::{ProxyHdrV2, RemoteAddress};
6use hashbrown::HashSet;
7use kanidmd_lib::idm::ldap::{LdapBoundToken, LdapResponseState};
8use kanidmd_lib::prelude::*;
9use ldap3_proto::proto::LdapMsg;
10use ldap3_proto::LdapCodec;
11use openssl::ssl::{Ssl, SslAcceptor};
12use std::net::{IpAddr, SocketAddr};
13use std::pin::Pin;
14use std::str::FromStr;
15use std::sync::Arc;
16use tokio::io::{AsyncRead, AsyncWrite};
17use tokio::net::{TcpListener, TcpStream};
18use tokio::sync::broadcast;
19use tokio::sync::mpsc;
20use tokio_openssl::SslStream;
21use tokio_util::codec::{FramedRead, FramedWrite};
22
23struct LdapSession {
24 uat: Option<LdapBoundToken>,
25}
26
27impl LdapSession {
28 fn new() -> Self {
29 LdapSession {
30 uat: None,
32 }
33 }
34}
35
36#[instrument(name = "ldap-request", skip(client_address, qe_r_ref))]
37async fn client_process_msg(
38 uat: Option<LdapBoundToken>,
39 client_address: SocketAddr,
40 protomsg: LdapMsg,
41 qe_r_ref: &'static QueryServerReadV1,
42) -> Option<LdapResponseState> {
43 let eventid = sketching::tracing_forest::id();
44 security_info!(
45 client_ip = %client_address.ip(),
46 client_port = %client_address.port(),
47 "LDAP client"
48 );
49 qe_r_ref
50 .handle_ldaprequest(eventid, protomsg, uat, client_address.ip())
51 .await
52}
53
54async fn client_process<STREAM>(
55 stream: STREAM,
56 client_address: SocketAddr,
57 connection_address: SocketAddr,
58 qe_r_ref: &'static QueryServerReadV1,
59) where
60 STREAM: AsyncRead + AsyncWrite,
61{
62 let (r, w) = tokio::io::split(stream);
63 let mut r = FramedRead::new(r, LdapCodec::default());
64 let mut w = FramedWrite::new(w, LdapCodec::default());
65
66 let mut session = LdapSession::new();
68 while let Some(Ok(protomsg)) = r.next().await {
70 let uat = session.uat.clone();
72 let caddr = client_address;
73
74 debug!(?client_address, ?connection_address);
75
76 match client_process_msg(uat, caddr, protomsg, qe_r_ref).await {
77 Some(LdapResponseState::Unbind) => return,
80 Some(LdapResponseState::Disconnect(rmsg)) => {
81 if w.send(rmsg).await.is_err() {
82 break;
83 }
84 break;
85 }
86 Some(LdapResponseState::Bind(uat, rmsg)) => {
87 session.uat = Some(uat);
88 if w.send(rmsg).await.is_err() {
89 break;
90 }
91 }
92 Some(LdapResponseState::Respond(rmsg)) => {
93 if w.send(rmsg).await.is_err() {
94 break;
95 }
96 }
97 Some(LdapResponseState::MultiPartResponse(v)) => {
98 for rmsg in v.into_iter() {
99 if w.send(rmsg).await.is_err() {
100 break;
101 }
102 }
103 }
104 Some(LdapResponseState::BindMultiPartResponse(uat, v)) => {
105 session.uat = Some(uat);
106 for rmsg in v.into_iter() {
107 if w.send(rmsg).await.is_err() {
108 break;
109 }
110 }
111 }
112 None => {
113 error!("Internal server error");
114 break;
115 }
116 };
117 }
118}
119
120async fn client_tls_accept(
121 stream: TcpStream,
122 tls_acceptor: SslAcceptor,
123 connection_addr: SocketAddr,
124 qe_r_ref: &'static QueryServerReadV1,
125 trusted_proxy_v2_ips: Option<Arc<HashSet<IpAddr>>>,
126) {
127 let enable_proxy_v2_hdr = trusted_proxy_v2_ips
128 .map(|trusted| trusted.contains(&connection_addr.ip()))
129 .unwrap_or_default();
130
131 let (stream, client_addr) = if enable_proxy_v2_hdr {
132 match ProxyHdrV2::parse_from_read(stream).await {
133 Ok((stream, hdr)) => {
134 let remote_socket_addr = match hdr.to_remote_addr() {
135 RemoteAddress::Local => {
136 debug!("PROXY protocol liveness check - will not contain client data");
137 return;
138 }
139 RemoteAddress::TcpV4 { src, dst: _ } => SocketAddr::from(src),
140 RemoteAddress::TcpV6 { src, dst: _ } => SocketAddr::from(src),
141 remote_addr => {
142 error!(?remote_addr, "remote address in proxy header is invalid");
143 return;
144 }
145 };
146
147 (stream, remote_socket_addr)
148 }
149 Err(err) => {
150 error!(?connection_addr, ?err, "Unable to process proxy v2 header");
151 return;
152 }
153 }
154 } else {
155 (stream, connection_addr)
156 };
157
158 let mut tlsstream = match Ssl::new(tls_acceptor.context())
161 .and_then(|tls_obj| SslStream::new(tls_obj, stream))
162 {
163 Ok(ta) => ta,
164 Err(err) => {
165 error!(?err, %client_addr, %connection_addr, "LDAP TLS setup error");
166 return;
167 }
168 };
169 if let Err(err) = SslStream::accept(Pin::new(&mut tlsstream)).await {
170 error!(?err, %client_addr, %connection_addr, "LDAP TLS accept error");
171 return;
172 };
173
174 tokio::spawn(client_process(
175 tlsstream,
176 client_addr,
177 connection_addr,
178 qe_r_ref,
179 ));
180}
181
182async fn ldap_tls_acceptor(
184 listener: TcpListener,
185 mut tls_acceptor: SslAcceptor,
186 qe_r_ref: &'static QueryServerReadV1,
187 mut rx: broadcast::Receiver<CoreAction>,
188 mut tls_acceptor_reload_rx: mpsc::Receiver<SslAcceptor>,
189 trusted_proxy_v2_ips: Option<Arc<HashSet<IpAddr>>>,
190) {
191 loop {
192 tokio::select! {
193 Ok(action) = rx.recv() => {
194 match action {
195 CoreAction::Shutdown => break,
196 }
197 }
198 accept_result = listener.accept() => {
199 match accept_result {
200 Ok((tcpstream, client_socket_addr)) => {
201 let clone_tls_acceptor = tls_acceptor.clone();
202 tokio::spawn(client_tls_accept(tcpstream, clone_tls_acceptor, client_socket_addr, qe_r_ref, trusted_proxy_v2_ips.clone()));
203 }
204 Err(err) => {
205 warn!(?err, "LDAP acceptor error, continuing");
206 }
207 }
208 }
209 Some(mut new_tls_acceptor) = tls_acceptor_reload_rx.recv() => {
210 std::mem::swap(&mut tls_acceptor, &mut new_tls_acceptor);
211 info!("Reloaded ldap tls acceptor");
212 }
213 }
214 }
215 info!("Stopped {}", super::TaskName::LdapActor);
216}
217
218async fn ldap_plaintext_acceptor(
220 listener: TcpListener,
221 qe_r_ref: &'static QueryServerReadV1,
222 mut rx: broadcast::Receiver<CoreAction>,
223) {
224 loop {
225 tokio::select! {
226 Ok(action) = rx.recv() => {
227 match action {
228 CoreAction::Shutdown => break,
229 }
230 }
231 accept_result = listener.accept() => {
232 match accept_result {
233 Ok((tcpstream, client_socket_addr)) => {
234 tokio::spawn(client_process(tcpstream, client_socket_addr, client_socket_addr, qe_r_ref));
235 }
236 Err(e) => {
237 error!("LDAP acceptor error, continuing -> {:?}", e);
238 }
239 }
240 }
241 }
242 }
243 info!("Stopped {}", super::TaskName::LdapActor);
244}
245
246pub(crate) async fn create_ldap_server(
247 address: &str,
248 opt_ssl_acceptor: Option<SslAcceptor>,
249 qe_r_ref: &'static QueryServerReadV1,
250 rx: broadcast::Receiver<CoreAction>,
251 tls_acceptor_reload_rx: mpsc::Receiver<SslAcceptor>,
252 trusted_proxy_v2_ips: Option<HashSet<IpAddr>>,
253) -> Result<tokio::task::JoinHandle<()>, ()> {
254 if address.starts_with(":::") {
255 let port = address.replacen(":::", "", 1);
257 error!("Address '{}' looks like an attempt to wildcard bind with IPv6 on port {} - please try using ldapbindaddress = '[::]:{}'", address, port, port);
258 };
259
260 let addr = SocketAddr::from_str(address).map_err(|e| {
261 error!("Could not parse LDAP server address {} -> {:?}", address, e);
262 })?;
263
264 let listener = TcpListener::bind(&addr).await.map_err(|e| {
265 error!(
266 "Could not bind to LDAP server address {} -> {:?}",
267 address, e
268 );
269 })?;
270
271 let trusted_proxy_v2_ips = trusted_proxy_v2_ips.map(Arc::new);
272
273 let ldap_acceptor_handle = match opt_ssl_acceptor {
274 Some(ssl_acceptor) => {
275 info!("Starting LDAPS interface ldaps://{} ...", address);
276
277 tokio::spawn(ldap_tls_acceptor(
278 listener,
279 ssl_acceptor,
280 qe_r_ref,
281 rx,
282 tls_acceptor_reload_rx,
283 trusted_proxy_v2_ips,
284 ))
285 }
286 None => tokio::spawn(ldap_plaintext_acceptor(listener, qe_r_ref, rx)),
287 };
288
289 info!("Created LDAP interface");
290 Ok(ldap_acceptor_handle)
291}