1use crate::actors::QueryServerReadV1;
2use crate::CoreAction;
3use cidr::IpCidr;
4use futures_util::sink::SinkExt;
5use futures_util::stream::StreamExt;
6use haproxy_protocol::{ProxyHdrV2, RemoteAddress};
7use kanidmd_lib::idm::ldap::{LdapBoundToken, LdapResponseState};
8use kanidmd_lib::prelude::*;
9use ldap3_proto::proto::LdapMsg;
10use ldap3_proto::LdapCodec;
11use std::net::SocketAddr;
12use std::str::FromStr;
13use std::sync::Arc;
14use tokio::io::{AsyncRead, AsyncWrite};
15use tokio::net::{TcpListener, TcpStream};
16use tokio::sync::broadcast;
17use tokio::sync::mpsc;
18use tokio_rustls::TlsAcceptor;
19use tokio_util::codec::{FramedRead, FramedWrite};
20
21struct LdapSession {
22 uat: Option<LdapBoundToken>,
23}
24
25impl LdapSession {
26 fn new() -> Self {
27 LdapSession {
28 uat: None,
30 }
31 }
32}
33
34#[instrument(name = "ldap-request", skip(client_address, qe_r_ref))]
35async fn client_process_msg(
36 uat: Option<LdapBoundToken>,
37 client_address: SocketAddr,
38 protomsg: LdapMsg,
39 qe_r_ref: &'static QueryServerReadV1,
40) -> Option<LdapResponseState> {
41 let eventid = sketching::tracing_forest::id();
42 security_info!(
43 client_ip = %client_address.ip(),
44 client_port = %client_address.port(),
45 "LDAP client"
46 );
47 qe_r_ref
48 .handle_ldaprequest(eventid, protomsg, uat, client_address.ip())
49 .await
50}
51
52async fn client_process<STREAM>(
53 stream: STREAM,
54 client_address: SocketAddr,
55 connection_address: SocketAddr,
56 qe_r_ref: &'static QueryServerReadV1,
57) where
58 STREAM: AsyncRead + AsyncWrite,
59{
60 let (r, w) = tokio::io::split(stream);
61 let mut r = FramedRead::new(r, LdapCodec::default());
62 let mut w = FramedWrite::new(w, LdapCodec::default());
63
64 let mut session = LdapSession::new();
66 while let Some(Ok(protomsg)) = r.next().await {
68 let uat = session.uat.clone();
70 let caddr = client_address;
71
72 debug!(?client_address, ?connection_address);
73
74 match client_process_msg(uat, caddr, protomsg, qe_r_ref).await {
75 Some(LdapResponseState::Unbind) => return,
78 Some(LdapResponseState::Disconnect(rmsg)) => {
79 if w.send(rmsg).await.is_err() {
80 break;
81 }
82 break;
83 }
84 Some(LdapResponseState::Bind(uat, rmsg)) => {
85 session.uat = Some(uat);
86 if w.send(rmsg).await.is_err() {
87 break;
88 }
89 }
90 Some(LdapResponseState::Respond(rmsg)) => {
91 if w.send(rmsg).await.is_err() {
92 break;
93 }
94 }
95 Some(LdapResponseState::MultiPartResponse(v)) => {
96 for rmsg in v.into_iter() {
97 if w.send(rmsg).await.is_err() {
98 break;
99 }
100 }
101 }
102 Some(LdapResponseState::BindMultiPartResponse(uat, v)) => {
103 session.uat = Some(uat);
104 for rmsg in v.into_iter() {
105 if w.send(rmsg).await.is_err() {
106 break;
107 }
108 }
109 }
110 None => {
111 error!("Internal server error");
112 break;
113 }
114 };
115 }
116}
117
118async fn client_tls_accept(
119 stream: TcpStream,
120 tls_acceptor: TlsAcceptor,
121 connection_addr: SocketAddr,
122 qe_r_ref: &'static QueryServerReadV1,
123 trusted_proxy_v2_ips: Option<Arc<Vec<IpCidr>>>,
124) {
125 let enable_proxy_v2_hdr = trusted_proxy_v2_ips
126 .map(|trusted| {
127 trusted
128 .iter()
129 .any(|ip_cidr| ip_cidr.contains(&connection_addr.ip().to_canonical()))
130 })
131 .unwrap_or_default();
132
133 let (stream, client_addr) = if enable_proxy_v2_hdr {
134 match ProxyHdrV2::parse_from_read(stream).await {
135 Ok((stream, hdr)) => {
136 let remote_socket_addr = match hdr.to_remote_addr() {
137 RemoteAddress::Local => {
138 debug!("PROXY protocol liveness check - will not contain client data");
139 connection_addr
141 }
142 RemoteAddress::TcpV4 { src, dst: _ } => SocketAddr::from(src),
143 RemoteAddress::TcpV6 { src, dst: _ } => SocketAddr::from(src),
144 remote_addr => {
145 error!(?remote_addr, "remote address in proxy header is invalid");
146 return;
147 }
148 };
149
150 (stream, remote_socket_addr)
151 }
152 Err(err) => {
153 error!(?connection_addr, ?err, "Unable to process proxy v2 header");
154 return;
155 }
156 }
157 } else {
158 (stream, connection_addr)
159 };
160
161 let tlsstream = match tls_acceptor.accept(stream).await {
162 Ok(ta) => ta,
163 Err(err) => {
164 error!(?err, %client_addr, %connection_addr, "LDAP TLS setup error");
165 return;
166 }
167 };
168
169 tokio::spawn(client_process(
171 tlsstream,
172 client_addr,
173 connection_addr,
174 qe_r_ref,
175 ));
176}
177
178async fn ldap_tls_acceptor(
180 listener: TcpListener,
181 mut tls_acceptor: TlsAcceptor,
182 qe_r_ref: &'static QueryServerReadV1,
183 mut rx: broadcast::Receiver<CoreAction>,
184 mut tls_acceptor_reload_rx: mpsc::Receiver<TlsAcceptor>,
185 trusted_proxy_v2_ips: Option<Arc<Vec<IpCidr>>>,
186) {
187 loop {
188 tokio::select! {
189 Ok(action) = rx.recv() => {
190 match action {
191 CoreAction::Shutdown => break,
192 }
193 }
194 accept_result = listener.accept() => {
195 match accept_result {
196 Ok((tcpstream, client_socket_addr)) => {
197 let clone_tls_acceptor = tls_acceptor.clone();
198 tokio::spawn(client_tls_accept(tcpstream, clone_tls_acceptor, client_socket_addr, qe_r_ref, trusted_proxy_v2_ips.clone()));
199 }
200 Err(err) => {
201 warn!(?err, "LDAP acceptor error, continuing");
202 }
203 }
204 }
205 Some(mut new_tls_acceptor) = tls_acceptor_reload_rx.recv() => {
206 std::mem::swap(&mut tls_acceptor, &mut new_tls_acceptor);
207 info!("Reloaded ldap tls acceptor");
208 }
209 }
210 }
211 info!("Stopped {}", super::TaskName::LdapActor);
212}
213
214async fn ldap_plaintext_acceptor(
216 listener: TcpListener,
217 qe_r_ref: &'static QueryServerReadV1,
218 mut rx: broadcast::Receiver<CoreAction>,
219) {
220 loop {
221 tokio::select! {
222 Ok(action) = rx.recv() => {
223 match action {
224 CoreAction::Shutdown => break,
225 }
226 }
227 accept_result = listener.accept() => {
228 match accept_result {
229 Ok((tcpstream, client_socket_addr)) => {
230 tokio::spawn(client_process(tcpstream, client_socket_addr, client_socket_addr, qe_r_ref));
231 }
232 Err(e) => {
233 error!("LDAP acceptor error, continuing -> {:?}", e);
234 }
235 }
236 }
237 }
238 }
239 info!("Stopped {}", super::TaskName::LdapActor);
240}
241
242pub(crate) async fn create_ldap_server(
243 address: &str,
244 opt_ssl_acceptor: Option<TlsAcceptor>,
245 qe_r_ref: &'static QueryServerReadV1,
246 rx: broadcast::Receiver<CoreAction>,
247 tls_acceptor_reload_rx: mpsc::Receiver<TlsAcceptor>,
248 trusted_proxy_v2_ips: Option<Vec<IpCidr>>,
249) -> Result<tokio::task::JoinHandle<()>, ()> {
250 if address.starts_with(":::") {
251 let port = address.replacen(":::", "", 1);
253 error!("Address '{}' looks like an attempt to wildcard bind with IPv6 on port {} - please try using ldapbindaddress = '[::]:{}'", address, port, port);
254 };
255
256 let addr = SocketAddr::from_str(address).map_err(|e| {
257 error!("Could not parse LDAP server address {} -> {:?}", address, e);
258 })?;
259
260 let listener = TcpListener::bind(&addr).await.map_err(|e| {
261 error!(
262 "Could not bind to LDAP server address {} -> {:?}",
263 address, e
264 );
265 })?;
266
267 let trusted_proxy_v2_ips = trusted_proxy_v2_ips.map(Arc::new);
268
269 let ldap_acceptor_handle = match opt_ssl_acceptor {
270 Some(ssl_acceptor) => {
271 info!("Starting LDAPS interface ldaps://{} ...", address);
272
273 tokio::spawn(ldap_tls_acceptor(
274 listener,
275 ssl_acceptor,
276 qe_r_ref,
277 rx,
278 tls_acceptor_reload_rx,
279 trusted_proxy_v2_ips,
280 ))
281 }
282 None => tokio::spawn(ldap_plaintext_acceptor(listener, qe_r_ref, rx)),
283 };
284
285 info!("Created LDAP interface");
286 Ok(ldap_acceptor_handle)
287}