1use crate::actors::QueryServerReadV1;
2use crate::CoreAction;
3use cidr::IpCidr;
4use futures_util::sink::SinkExt;
5use futures_util::stream::StreamExt;
6use haproxy_protocol::{ProxyHdrV2, RemoteAddress};
7use kanidmd_lib::idm::ldap::{LdapBoundToken, LdapResponseState};
8use kanidmd_lib::prelude::*;
9use ldap3_proto::proto::LdapMsg;
10use ldap3_proto::LdapCodec;
11use std::net::SocketAddr;
12use std::str::FromStr;
13use std::sync::Arc;
14use tokio::io::{AsyncRead, AsyncWrite};
15use tokio::net::{TcpListener, TcpStream};
16use tokio::sync::broadcast;
17use tokio::sync::mpsc;
18use tokio_rustls::TlsAcceptor;
19use tokio_util::codec::{FramedRead, FramedWrite};
20
21struct LdapSession {
22 uat: Option<LdapBoundToken>,
23}
24
25impl LdapSession {
26 fn new() -> Self {
27 LdapSession {
28 uat: None,
30 }
31 }
32}
33
34#[instrument(name = "ldap-request", skip(client_address, qe_r_ref))]
35async fn client_process_msg(
36 uat: Option<LdapBoundToken>,
37 client_address: SocketAddr,
38 protomsg: LdapMsg,
39 qe_r_ref: &'static QueryServerReadV1,
40) -> Option<LdapResponseState> {
41 let eventid = sketching::tracing_forest::id();
42 security_info!(
43 client_ip = %client_address.ip(),
44 client_port = %client_address.port(),
45 "LDAP client"
46 );
47 qe_r_ref
48 .handle_ldaprequest(eventid, protomsg, uat, client_address.ip())
49 .await
50}
51
52async fn client_process<STREAM>(
53 stream: STREAM,
54 client_address: SocketAddr,
55 connection_address: SocketAddr,
56 qe_r_ref: &'static QueryServerReadV1,
57) where
58 STREAM: AsyncRead + AsyncWrite,
59{
60 let (r, w) = tokio::io::split(stream);
61 let mut r = FramedRead::new(r, LdapCodec::default());
62 let mut w = FramedWrite::new(w, LdapCodec::default());
63
64 let mut session = LdapSession::new();
66 while let Some(Ok(protomsg)) = r.next().await {
68 let uat = session.uat.clone();
70 let caddr = client_address;
71
72 debug!(?client_address, ?connection_address);
73
74 match client_process_msg(uat, caddr, protomsg, qe_r_ref).await {
75 Some(LdapResponseState::Unbind) => return,
78 Some(LdapResponseState::Disconnect(rmsg)) => {
79 if w.send(rmsg).await.is_err() {
80 break;
81 }
82 break;
83 }
84 Some(LdapResponseState::Bind(uat, rmsg)) => {
85 session.uat = Some(uat);
86 if w.send(rmsg).await.is_err() {
87 break;
88 }
89 }
90 Some(LdapResponseState::Respond(rmsg)) => {
91 if w.send(rmsg).await.is_err() {
92 break;
93 }
94 }
95 Some(LdapResponseState::MultiPartResponse(v)) => {
96 for rmsg in v.into_iter() {
97 if w.send(rmsg).await.is_err() {
98 break;
99 }
100 }
101 }
102 Some(LdapResponseState::BindMultiPartResponse(uat, v)) => {
103 session.uat = Some(uat);
104 for rmsg in v.into_iter() {
105 if w.send(rmsg).await.is_err() {
106 break;
107 }
108 }
109 }
110 None => {
111 error!("Internal server error");
112 break;
113 }
114 };
115 }
116}
117
118async fn client_tls_accept(
119 stream: TcpStream,
120 tls_acceptor: TlsAcceptor,
121 connection_addr: SocketAddr,
122 qe_r_ref: &'static QueryServerReadV1,
123 trusted_proxy_v2_ips: Option<Arc<Vec<IpCidr>>>,
124) {
125 let enable_proxy_v2_hdr = trusted_proxy_v2_ips
126 .map(|trusted| {
127 trusted
128 .iter()
129 .any(|ip_cidr| ip_cidr.contains(&connection_addr.ip()))
130 })
131 .unwrap_or_default();
132
133 let (stream, client_addr) = if enable_proxy_v2_hdr {
134 match ProxyHdrV2::parse_from_read(stream).await {
135 Ok((stream, hdr)) => {
136 let remote_socket_addr = match hdr.to_remote_addr() {
137 RemoteAddress::Local => {
138 debug!("PROXY protocol liveness check - will not contain client data");
139 return;
140 }
141 RemoteAddress::TcpV4 { src, dst: _ } => SocketAddr::from(src),
142 RemoteAddress::TcpV6 { src, dst: _ } => SocketAddr::from(src),
143 remote_addr => {
144 error!(?remote_addr, "remote address in proxy header is invalid");
145 return;
146 }
147 };
148
149 (stream, remote_socket_addr)
150 }
151 Err(err) => {
152 error!(?connection_addr, ?err, "Unable to process proxy v2 header");
153 return;
154 }
155 }
156 } else {
157 (stream, connection_addr)
158 };
159
160 let tlsstream = match tls_acceptor.accept(stream).await {
161 Ok(ta) => ta,
162 Err(err) => {
163 error!(?err, %client_addr, %connection_addr, "LDAP TLS setup error");
164 return;
165 }
166 };
167
168 tokio::spawn(client_process(
170 tlsstream,
171 client_addr,
172 connection_addr,
173 qe_r_ref,
174 ));
175}
176
177async fn ldap_tls_acceptor(
179 listener: TcpListener,
180 mut tls_acceptor: TlsAcceptor,
181 qe_r_ref: &'static QueryServerReadV1,
182 mut rx: broadcast::Receiver<CoreAction>,
183 mut tls_acceptor_reload_rx: mpsc::Receiver<TlsAcceptor>,
184 trusted_proxy_v2_ips: Option<Arc<Vec<IpCidr>>>,
185) {
186 loop {
187 tokio::select! {
188 Ok(action) = rx.recv() => {
189 match action {
190 CoreAction::Shutdown => break,
191 }
192 }
193 accept_result = listener.accept() => {
194 match accept_result {
195 Ok((tcpstream, client_socket_addr)) => {
196 let clone_tls_acceptor = tls_acceptor.clone();
197 tokio::spawn(client_tls_accept(tcpstream, clone_tls_acceptor, client_socket_addr, qe_r_ref, trusted_proxy_v2_ips.clone()));
198 }
199 Err(err) => {
200 warn!(?err, "LDAP acceptor error, continuing");
201 }
202 }
203 }
204 Some(mut new_tls_acceptor) = tls_acceptor_reload_rx.recv() => {
205 std::mem::swap(&mut tls_acceptor, &mut new_tls_acceptor);
206 info!("Reloaded ldap tls acceptor");
207 }
208 }
209 }
210 info!("Stopped {}", super::TaskName::LdapActor);
211}
212
213async fn ldap_plaintext_acceptor(
215 listener: TcpListener,
216 qe_r_ref: &'static QueryServerReadV1,
217 mut rx: broadcast::Receiver<CoreAction>,
218) {
219 loop {
220 tokio::select! {
221 Ok(action) = rx.recv() => {
222 match action {
223 CoreAction::Shutdown => break,
224 }
225 }
226 accept_result = listener.accept() => {
227 match accept_result {
228 Ok((tcpstream, client_socket_addr)) => {
229 tokio::spawn(client_process(tcpstream, client_socket_addr, client_socket_addr, qe_r_ref));
230 }
231 Err(e) => {
232 error!("LDAP acceptor error, continuing -> {:?}", e);
233 }
234 }
235 }
236 }
237 }
238 info!("Stopped {}", super::TaskName::LdapActor);
239}
240
241pub(crate) async fn create_ldap_server(
242 address: &str,
243 opt_ssl_acceptor: Option<TlsAcceptor>,
244 qe_r_ref: &'static QueryServerReadV1,
245 rx: broadcast::Receiver<CoreAction>,
246 tls_acceptor_reload_rx: mpsc::Receiver<TlsAcceptor>,
247 trusted_proxy_v2_ips: Option<Vec<IpCidr>>,
248) -> Result<tokio::task::JoinHandle<()>, ()> {
249 if address.starts_with(":::") {
250 let port = address.replacen(":::", "", 1);
252 error!("Address '{}' looks like an attempt to wildcard bind with IPv6 on port {} - please try using ldapbindaddress = '[::]:{}'", address, port, port);
253 };
254
255 let addr = SocketAddr::from_str(address).map_err(|e| {
256 error!("Could not parse LDAP server address {} -> {:?}", address, e);
257 })?;
258
259 let listener = TcpListener::bind(&addr).await.map_err(|e| {
260 error!(
261 "Could not bind to LDAP server address {} -> {:?}",
262 address, e
263 );
264 })?;
265
266 let trusted_proxy_v2_ips = trusted_proxy_v2_ips.map(Arc::new);
267
268 let ldap_acceptor_handle = match opt_ssl_acceptor {
269 Some(ssl_acceptor) => {
270 info!("Starting LDAPS interface ldaps://{} ...", address);
271
272 tokio::spawn(ldap_tls_acceptor(
273 listener,
274 ssl_acceptor,
275 qe_r_ref,
276 rx,
277 tls_acceptor_reload_rx,
278 trusted_proxy_v2_ips,
279 ))
280 }
281 None => tokio::spawn(ldap_plaintext_acceptor(listener, qe_r_ref, rx)),
282 };
283
284 info!("Created LDAP interface");
285 Ok(ldap_acceptor_handle)
286}