1use crate::actors::QueryServerReadV1;
2use crate::CoreAction;
3use cidr::IpCidr;
4use futures_util::sink::SinkExt;
5use futures_util::stream::StreamExt;
6use haproxy_protocol::{ProxyHdrV2, RemoteAddress};
7use kanidmd_lib::idm::ldap::{LdapBoundToken, LdapResponseState};
8use kanidmd_lib::prelude::*;
9use ldap3_proto::proto::LdapMsg;
10use ldap3_proto::LdapCodec;
11use openssl::ssl::{Ssl, SslAcceptor};
12use std::net::SocketAddr;
13use std::pin::Pin;
14use std::str::FromStr;
15use std::sync::Arc;
16use tokio::io::{AsyncRead, AsyncWrite};
17use tokio::net::{TcpListener, TcpStream};
18use tokio::sync::broadcast;
19use tokio::sync::mpsc;
20use tokio_openssl::SslStream;
21use tokio_util::codec::{FramedRead, FramedWrite};
22
23struct LdapSession {
24 uat: Option<LdapBoundToken>,
25}
26
27impl LdapSession {
28 fn new() -> Self {
29 LdapSession {
30 uat: None,
32 }
33 }
34}
35
36#[instrument(name = "ldap-request", skip(client_address, qe_r_ref))]
37async fn client_process_msg(
38 uat: Option<LdapBoundToken>,
39 client_address: SocketAddr,
40 protomsg: LdapMsg,
41 qe_r_ref: &'static QueryServerReadV1,
42) -> Option<LdapResponseState> {
43 let eventid = sketching::tracing_forest::id();
44 security_info!(
45 client_ip = %client_address.ip(),
46 client_port = %client_address.port(),
47 "LDAP client"
48 );
49 qe_r_ref
50 .handle_ldaprequest(eventid, protomsg, uat, client_address.ip())
51 .await
52}
53
54async fn client_process<STREAM>(
55 stream: STREAM,
56 client_address: SocketAddr,
57 connection_address: SocketAddr,
58 qe_r_ref: &'static QueryServerReadV1,
59) where
60 STREAM: AsyncRead + AsyncWrite,
61{
62 let (r, w) = tokio::io::split(stream);
63 let mut r = FramedRead::new(r, LdapCodec::default());
64 let mut w = FramedWrite::new(w, LdapCodec::default());
65
66 let mut session = LdapSession::new();
68 while let Some(Ok(protomsg)) = r.next().await {
70 let uat = session.uat.clone();
72 let caddr = client_address;
73
74 debug!(?client_address, ?connection_address);
75
76 match client_process_msg(uat, caddr, protomsg, qe_r_ref).await {
77 Some(LdapResponseState::Unbind) => return,
80 Some(LdapResponseState::Disconnect(rmsg)) => {
81 if w.send(rmsg).await.is_err() {
82 break;
83 }
84 break;
85 }
86 Some(LdapResponseState::Bind(uat, rmsg)) => {
87 session.uat = Some(uat);
88 if w.send(rmsg).await.is_err() {
89 break;
90 }
91 }
92 Some(LdapResponseState::Respond(rmsg)) => {
93 if w.send(rmsg).await.is_err() {
94 break;
95 }
96 }
97 Some(LdapResponseState::MultiPartResponse(v)) => {
98 for rmsg in v.into_iter() {
99 if w.send(rmsg).await.is_err() {
100 break;
101 }
102 }
103 }
104 Some(LdapResponseState::BindMultiPartResponse(uat, v)) => {
105 session.uat = Some(uat);
106 for rmsg in v.into_iter() {
107 if w.send(rmsg).await.is_err() {
108 break;
109 }
110 }
111 }
112 None => {
113 error!("Internal server error");
114 break;
115 }
116 };
117 }
118}
119
120async fn client_tls_accept(
121 stream: TcpStream,
122 tls_acceptor: SslAcceptor,
123 connection_addr: SocketAddr,
124 qe_r_ref: &'static QueryServerReadV1,
125 trusted_proxy_v2_ips: Option<Arc<Vec<IpCidr>>>,
126) {
127 let enable_proxy_v2_hdr = trusted_proxy_v2_ips
128 .map(|trusted| {
129 trusted
130 .iter()
131 .any(|ip_cidr| ip_cidr.contains(&connection_addr.ip()))
132 })
133 .unwrap_or_default();
134
135 let (stream, client_addr) = if enable_proxy_v2_hdr {
136 match ProxyHdrV2::parse_from_read(stream).await {
137 Ok((stream, hdr)) => {
138 let remote_socket_addr = match hdr.to_remote_addr() {
139 RemoteAddress::Local => {
140 debug!("PROXY protocol liveness check - will not contain client data");
141 return;
142 }
143 RemoteAddress::TcpV4 { src, dst: _ } => SocketAddr::from(src),
144 RemoteAddress::TcpV6 { src, dst: _ } => SocketAddr::from(src),
145 remote_addr => {
146 error!(?remote_addr, "remote address in proxy header is invalid");
147 return;
148 }
149 };
150
151 (stream, remote_socket_addr)
152 }
153 Err(err) => {
154 error!(?connection_addr, ?err, "Unable to process proxy v2 header");
155 return;
156 }
157 }
158 } else {
159 (stream, connection_addr)
160 };
161
162 let mut tlsstream = match Ssl::new(tls_acceptor.context())
165 .and_then(|tls_obj| SslStream::new(tls_obj, stream))
166 {
167 Ok(ta) => ta,
168 Err(err) => {
169 error!(?err, %client_addr, %connection_addr, "LDAP TLS setup error");
170 return;
171 }
172 };
173 if let Err(err) = SslStream::accept(Pin::new(&mut tlsstream)).await {
174 error!(?err, %client_addr, %connection_addr, "LDAP TLS accept error");
175 return;
176 };
177
178 tokio::spawn(client_process(
179 tlsstream,
180 client_addr,
181 connection_addr,
182 qe_r_ref,
183 ));
184}
185
186async fn ldap_tls_acceptor(
188 listener: TcpListener,
189 mut tls_acceptor: SslAcceptor,
190 qe_r_ref: &'static QueryServerReadV1,
191 mut rx: broadcast::Receiver<CoreAction>,
192 mut tls_acceptor_reload_rx: mpsc::Receiver<SslAcceptor>,
193 trusted_proxy_v2_ips: Option<Arc<Vec<IpCidr>>>,
194) {
195 loop {
196 tokio::select! {
197 Ok(action) = rx.recv() => {
198 match action {
199 CoreAction::Shutdown => break,
200 }
201 }
202 accept_result = listener.accept() => {
203 match accept_result {
204 Ok((tcpstream, client_socket_addr)) => {
205 let clone_tls_acceptor = tls_acceptor.clone();
206 tokio::spawn(client_tls_accept(tcpstream, clone_tls_acceptor, client_socket_addr, qe_r_ref, trusted_proxy_v2_ips.clone()));
207 }
208 Err(err) => {
209 warn!(?err, "LDAP acceptor error, continuing");
210 }
211 }
212 }
213 Some(mut new_tls_acceptor) = tls_acceptor_reload_rx.recv() => {
214 std::mem::swap(&mut tls_acceptor, &mut new_tls_acceptor);
215 info!("Reloaded ldap tls acceptor");
216 }
217 }
218 }
219 info!("Stopped {}", super::TaskName::LdapActor);
220}
221
222async fn ldap_plaintext_acceptor(
224 listener: TcpListener,
225 qe_r_ref: &'static QueryServerReadV1,
226 mut rx: broadcast::Receiver<CoreAction>,
227) {
228 loop {
229 tokio::select! {
230 Ok(action) = rx.recv() => {
231 match action {
232 CoreAction::Shutdown => break,
233 }
234 }
235 accept_result = listener.accept() => {
236 match accept_result {
237 Ok((tcpstream, client_socket_addr)) => {
238 tokio::spawn(client_process(tcpstream, client_socket_addr, client_socket_addr, qe_r_ref));
239 }
240 Err(e) => {
241 error!("LDAP acceptor error, continuing -> {:?}", e);
242 }
243 }
244 }
245 }
246 }
247 info!("Stopped {}", super::TaskName::LdapActor);
248}
249
250pub(crate) async fn create_ldap_server(
251 address: &str,
252 opt_ssl_acceptor: Option<SslAcceptor>,
253 qe_r_ref: &'static QueryServerReadV1,
254 rx: broadcast::Receiver<CoreAction>,
255 tls_acceptor_reload_rx: mpsc::Receiver<SslAcceptor>,
256 trusted_proxy_v2_ips: Option<Vec<IpCidr>>,
257) -> Result<tokio::task::JoinHandle<()>, ()> {
258 if address.starts_with(":::") {
259 let port = address.replacen(":::", "", 1);
261 error!("Address '{}' looks like an attempt to wildcard bind with IPv6 on port {} - please try using ldapbindaddress = '[::]:{}'", address, port, port);
262 };
263
264 let addr = SocketAddr::from_str(address).map_err(|e| {
265 error!("Could not parse LDAP server address {} -> {:?}", address, e);
266 })?;
267
268 let listener = TcpListener::bind(&addr).await.map_err(|e| {
269 error!(
270 "Could not bind to LDAP server address {} -> {:?}",
271 address, e
272 );
273 })?;
274
275 let trusted_proxy_v2_ips = trusted_proxy_v2_ips.map(Arc::new);
276
277 let ldap_acceptor_handle = match opt_ssl_acceptor {
278 Some(ssl_acceptor) => {
279 info!("Starting LDAPS interface ldaps://{} ...", address);
280
281 tokio::spawn(ldap_tls_acceptor(
282 listener,
283 ssl_acceptor,
284 qe_r_ref,
285 rx,
286 tls_acceptor_reload_rx,
287 trusted_proxy_v2_ips,
288 ))
289 }
290 None => tokio::spawn(ldap_plaintext_acceptor(listener, qe_r_ref, rx)),
291 };
292
293 info!("Created LDAP interface");
294 Ok(ldap_acceptor_handle)
295}