kanidmd_core/https/
mod.rs

1use self::extractors::ClientConnInfo;
2use self::javascript::*;
3use crate::actors::{QueryServerReadV1, QueryServerWriteV1};
4use crate::config::{AddressSet, Configuration, ServerRole};
5use crate::CoreAction;
6use axum::{
7    body::Body,
8    extract::connect_info::IntoMakeServiceWithConnectInfo,
9    http::{HeaderMap, HeaderValue, Request, StatusCode},
10    middleware::{from_fn, from_fn_with_state},
11    response::{IntoResponse, Redirect, Response},
12    routing::*,
13    Router,
14};
15use axum_extra::extract::cookie::CookieJar;
16use cidr::IpCidr;
17use compact_jwt::{error::JwtError, JwsCompact, JwsHs256Signer, JwsVerifier};
18use futures::pin_mut;
19use haproxy_protocol::{ProxyHdrV2, RemoteAddress};
20use hyper::body::Incoming;
21use hyper_util::rt::{TokioExecutor, TokioIo, TokioTimer};
22use kanidm_lib_crypto::x509_cert::{der::Decode, x509_public_key_s256, Certificate};
23use kanidm_proto::{constants::KSESSIONID, internal::COOKIE_AUTH_SESSION_ID};
24use kanidmd_lib::{idm::authentication::ClientCertInfo, status::StatusActor};
25use serde::de::DeserializeOwned;
26use sketching::*;
27use std::fmt::Write;
28use std::io::ErrorKind;
29use std::path::PathBuf;
30use std::sync::Arc;
31use std::time::Duration;
32use std::{
33    net::{IpAddr, SocketAddr},
34    str::FromStr,
35};
36use tokio::{
37    io::{AsyncRead, AsyncReadExt, AsyncWrite},
38    net::{TcpListener, TcpStream},
39    sync::broadcast,
40    sync::mpsc,
41    task,
42    time::timeout,
43};
44use tokio_rustls::TlsAcceptor;
45use tower::Service;
46use tower_http::{services::ServeDir, timeout::TimeoutLayer, trace::TraceLayer};
47use url::Url;
48use uuid::Uuid;
49
/// Upper bound on each connection set-up step in this module: reading the
/// PROXY v2 header, waiting for the first bytes from the client, and completing
/// the TLS handshake.
const HTTPS_CLIENT_CONN_TIMEOUT: Duration = Duration::from_secs(30);
/// Used as the HTTP/1 header read timeout and as both the HTTP/2 keep-alive
/// interval and keep-alive timeout when serving a connection.
const HTTPS_CLIENT_IO_TIMEOUT: Duration = Duration::from_secs(60);
/// Passed to the `TimeoutLayer` that wraps the router, bounding how long a single
/// request may take.
const HTTPS_CLIENT_REQUEST_TIMEOUT: Duration = Duration::from_secs(300);
53
54mod apidocs;
55pub(crate) mod cache_buster;
56pub(crate) mod errors;
57mod extractors;
58mod generic;
59mod javascript;
60mod manifest;
61pub(crate) mod middleware;
62mod oauth2;
63pub(crate) mod trace;
64mod v1;
65mod v1_domain;
66mod v1_oauth2;
67mod v1_scim;
68mod views;
69
/// Shared state handed to the axum router and to the middleware layers that are
/// built with `from_fn_with_state`. It is cloned wherever a router or layer needs
/// its own copy.
#[derive(Clone)]
pub struct ServerState {
    /// Reference to the server's `StatusActor`.
    pub(crate) status_ref: &'static StatusActor,
    /// Reference to the write side of the query server.
    pub(crate) qe_w_ref: &'static QueryServerWriteV1,
    /// Reference to the read side of the query server.
    pub(crate) qe_r_ref: &'static QueryServerReadV1,
    // Store the token management parts.
    /// HMAC (HS256) signer used by `deserialise_from_str` to verify signed
    /// cookie / header content issued by this instance.
    pub(crate) jws_signer: JwsHs256Signer,
    /// Result of `trusted_x_forward_for()` on the configured client address info.
    /// NOTE(review): presumably `None` means no X-Forwarded-For source is trusted -
    /// confirm against the middleware that consumes this.
    pub(crate) trust_x_forward_for_ips: Option<Arc<AddressSet>>,
    /// Content-Security-Policy value that includes the `form-action 'self'` directive.
    pub(crate) csp_header: HeaderValue,
    /// The same policy as `csp_header` but without the `form-action` directive.
    pub(crate) csp_header_no_form_action: HeaderValue,
    /// Copied from `Configuration::origin`.
    pub(crate) origin: Url,
    /// Copied from `Configuration::domain`.
    pub(crate) domain: String,
    // This is set to true by default, and is only false on integration tests.
    /// `true` unless an integration test configuration is present.
    pub(crate) secure_cookies: bool,
}
85
86impl ServerState {
87    /// Deserialize some input string validating that it was signed by our instance's
88    /// HMAC signer. This is used for short lived server-only sessions and context
89    /// data. This has applications in both accessing cookie content and header content.
90    fn deserialise_from_str<T: DeserializeOwned>(&self, input: &str) -> Option<T> {
91        match JwsCompact::from_str(input) {
92            Ok(val) => match self.jws_signer.verify(&val) {
93                Ok(val) => val.from_json::<T>().ok(),
94                Err(err) => {
95                    error!(?err, "Failed to deserialise JWT from request");
96                    if matches!(err, JwtError::InvalidSignature) {
97                        // The server has an ephemeral in memory HMAC signer. This is important as
98                        // auth (login) sessions on one node shouldn't validate on another. Sessions
99                        // that are shared between nodes use the internal ECDSA signer.
100                        //
101                        // But because of this if the server restarts it rolls the key. Additionally
102                        // it can occur if the load balancer isn't sticking sessions to the correct
103                        // node. That can cause this error. So we want to specifically call it out
104                        // to admins so they can investigate that the fault is occurring *outside*
105                        // of kanidm.
106                        warn!("Invalid Signature errors can occur if your instance restarted recently, if a load balancer is not configured for sticky sessions, or a session was tampered with.");
107                    }
108                    None
109                }
110            },
111            Err(_) => None,
112        }
113    }
114
115    #[instrument(level = "trace", skip_all)]
116    fn get_current_auth_session_id(&self, headers: &HeaderMap, jar: &CookieJar) -> Option<Uuid> {
117        // We see if there is a signed header copy first.
118        headers
119            .get(KSESSIONID)
120            .and_then(|hv| {
121                trace!("trying header");
122                // Get the first header value.
123                hv.to_str().ok()
124            })
125            .or_else(|| {
126                trace!("trying cookie");
127                jar.get(COOKIE_AUTH_SESSION_ID).map(|c| c.value())
128            })
129            .and_then(|s| {
130                trace!(id_jws = %s);
131                self.deserialise_from_str::<Uuid>(s)
132            })
133    }
134}
135
136pub(crate) fn get_js_files(role: ServerRole) -> Result<Vec<JavaScriptFile>, ()> {
137    let mut all_pages: Vec<JavaScriptFile> = Vec::new();
138
139    if !matches!(role, ServerRole::WriteReplicaNoUI) {
140        // let's set up the list of js module hashes
141        let pkg_path = env!("KANIDM_SERVER_UI_PKG_PATH").to_owned();
142
143        let filelist = [
144            "external/bootstrap.bundle.min.js",
145            "external/htmx.min.1.9.12.js",
146            "external/confetti.js",
147            "external/base64.js",
148            "modules/cred_update.mjs",
149            "pkhtml.js",
150            "style.js",
151        ];
152
153        for filepath in filelist {
154            match generate_integrity_hash(format!("{pkg_path}/{filepath}",)) {
155                Ok(hash) => {
156                    debug!("Integrity hash for {}: {}", filepath, hash);
157                    let js = JavaScriptFile { hash };
158                    all_pages.push(js)
159                }
160                Err(err) => {
161                    admin_error!(
162                        ?err,
163                        "Failed to generate integrity hash for {} - cancelling startup!",
164                        filepath
165                    );
166                    return Err(());
167                }
168            }
169        }
170    }
171    Ok(all_pages)
172}
173
174async fn handler_404() -> Response {
175    (StatusCode::NOT_FOUND, "Route not found").into_response()
176}
177
178pub async fn create_https_server(
179    config: Configuration,
180    jws_signer: JwsHs256Signer,
181    status_ref: &'static StatusActor,
182    qe_w_ref: &'static QueryServerWriteV1,
183    qe_r_ref: &'static QueryServerReadV1,
184    server_message_tx: broadcast::Sender<CoreAction>,
185    maybe_tls_acceptor: Option<TlsAcceptor>,
186    tls_acceptor_reload_rx: mpsc::Receiver<TlsAcceptor>,
187) -> Result<task::JoinHandle<()>, ()> {
188    let rx = server_message_tx.subscribe();
189
190    let all_js_files = get_js_files(config.role)?;
191    // set up the CSP headers
192    // script-src 'self'
193    //      'sha384-Zao7ExRXVZOJobzS/uMp0P1jtJz3TTqJU4nYXkdmsjpiVD+/wcwCyX7FGqRIqvIz'
194    //      'sha384-MrcW6ZMFYlzcLA8Nl+NtUVF0sA7MsXsP1UyJoMp4YLEuNSfAP+JcXn/tWtIaxVXM';
195
196    let js_directives = all_js_files
197        .into_iter()
198        .map(|f| f.hash)
199        .collect::<Vec<String>>();
200
201    let js_checksums: String = js_directives
202        .iter()
203        .fold(String::new(), |mut output, value| {
204            let _ = write!(output, " 'sha384-{value}'");
205            output
206        });
207
208    let csp_header = format!(
209        concat!(
210            "default-src 'self'; ",
211            "base-uri 'self' https:; ",
212            "form-action 'self'; ",
213            "frame-ancestors 'none'; ",
214            "img-src 'self' data:; ",
215            "worker-src 'none'; ",
216            "script-src 'self' 'unsafe-eval'{};",
217        ),
218        js_checksums
219    );
220
221    let csp_header = HeaderValue::from_str(&csp_header).map_err(|err| {
222        error!(?err, "Unable to generate content security policy");
223    })?;
224
225    // Omit form action - form action is interpreted by chrome to also control valid
226    // redirect targets on submit. This breaks oauth2 in many cases.
227    //
228    // Normally this would be considered BAD to remove a CSP control to make Oauth2 work
229    // but we need to consider the primary attack form-action protects from - open redirectors
230    // in the form submission. Since the paths that use this header do NOT have open
231    // redirectors, we are safe to remove the form-action directive.
232    let csp_header_no_form_action = format!(
233        concat!(
234            "default-src 'self'; ",
235            "base-uri 'self' https:; ",
236            "frame-ancestors 'none'; ",
237            "img-src 'self' data:; ",
238            "worker-src 'none'; ",
239            "script-src 'self' 'unsafe-eval'{};",
240        ),
241        js_checksums
242    );
243
244    let csp_header_no_form_action =
245        HeaderValue::from_str(&csp_header_no_form_action).map_err(|err| {
246            error!(
247                ?err,
248                "Unable to generate content security policy with no form action"
249            );
250        })?;
251
252    let trust_x_forward_for_ips = config
253        .http_client_address_info
254        .trusted_x_forward_for()
255        .map(Arc::new);
256
257    let trusted_proxy_v2_ips = config
258        .http_client_address_info
259        .trusted_proxy_v2()
260        .map(Arc::new);
261
262    let state = ServerState {
263        status_ref,
264        qe_w_ref,
265        qe_r_ref,
266        jws_signer,
267        trust_x_forward_for_ips,
268        csp_header,
269        csp_header_no_form_action,
270        origin: config.origin,
271        domain: config.domain.clone(),
272        secure_cookies: config.integration_test_config.is_none(),
273    };
274
275    let static_routes = match config.role {
276        ServerRole::WriteReplica | ServerRole::ReadOnlyReplica => {
277            Router::new()
278                .route("/ui/images/oauth2/{rs_name}", get(oauth2::oauth2_image_get))
279                .route("/ui/images/domain", get(v1_domain::image_get))
280                .route("/manifest.webmanifest", get(manifest::manifest)) // skip_route_check
281                // Layers only apply to routes that are *already* added, not the ones
282                // added after.
283                .layer(middleware::compression::new())
284                .layer(from_fn(middleware::caching::cache_me_short))
285                .route("/", get(|| async { Redirect::to("/ui") }))
286                .nest("/ui", views::view_router(state.clone()))
287            // Can't compress on anything that changes
288        }
289        ServerRole::WriteReplicaNoUI => Router::new(),
290    };
291    let app = Router::new()
292        .merge(oauth2::route_setup(state.clone()))
293        .merge(v1_scim::route_setup())
294        .merge(v1::route_setup(state.clone()))
295        .route("/robots.txt", get(generic::robots_txt))
296        .route(
297            views::constants::Urls::WellKnownChangePassword.as_ref(),
298            get(generic::redirect_to_update_credentials),
299        );
300
301    let app = match config.role {
302        ServerRole::WriteReplicaNoUI => app,
303        ServerRole::WriteReplica | ServerRole::ReadOnlyReplica => {
304            let pkg_path = PathBuf::from(env!("KANIDM_SERVER_UI_PKG_PATH"));
305            if !pkg_path.exists() {
306                eprintln!(
307                    "Couldn't find htmx UI package path: ({}), quitting.",
308                    env!("KANIDM_SERVER_UI_PKG_PATH")
309                );
310                std::process::exit(1);
311            }
312            let pkg_router = Router::new()
313                .nest_service("/pkg", ServeDir::new(pkg_path))
314                // TODO: Add in the br precompress
315                .layer(from_fn(middleware::caching::cache_me_short));
316
317            app.merge(pkg_router)
318        }
319    };
320
321    // this sets up the default span which logs the URL etc.
322    let trace_layer = TraceLayer::new_for_http()
323        .make_span_with(trace::DefaultMakeSpanKanidmd::new())
324        // setting these to trace because all they do is print "started processing request", and we are already doing that enough!
325        .on_response(trace::DefaultOnResponseKanidmd::new());
326
327    let app = app
328        .merge(static_routes)
329        .layer(from_fn_with_state(
330            state.clone(),
331            middleware::security_headers::security_headers_layer,
332        ))
333        .layer(from_fn(middleware::version_middleware))
334        .layer(from_fn(
335            middleware::hsts_header::strict_transport_security_layer,
336        ));
337
338    // layer which checks the responses have a content-type of JSON when we're in debug mode
339    #[cfg(any(test, debug_assertions))]
340    let app = app.layer(from_fn(middleware::are_we_json_yet));
341
342    let app = app
343        .route("/status", get(generic::status))
344        // 404 handler
345        .fallback(handler_404)
346        // This must be the LAST middleware.
347        // This is because the last middleware here is the first to be entered and the last
348        // to be exited, and this middleware sets up ids' and other bits for for logging
349        // coherence to be maintained.
350        .layer(from_fn_with_state(
351            state.clone(),
352            middleware::ip_address_middleware,
353        ))
354        .layer(from_fn(middleware::kopid_middleware))
355        .merge(apidocs::router())
356        // Apply Request Timeouts
357        .layer(TimeoutLayer::new(HTTPS_CLIENT_REQUEST_TIMEOUT))
358        // this MUST be the last layer before with_state else the span never starts and everything breaks.
359        .layer(trace_layer)
360        .with_state(state)
361        // the connect_info bit here lets us pick up the remote address of the client
362        .into_make_service_with_connect_info::<ClientConnInfo>();
363
364    let addr = SocketAddr::from_str(&config.address).map_err(|err| {
365        error!(
366            "Failed to parse address ({:?}) from config: {:?}",
367            config.address, err
368        );
369    })?;
370
371    info!("Starting the web server...");
372
373    let listener = match TcpListener::bind(addr).await {
374        Ok(l) => l,
375        Err(err) => {
376            error!(?err, "Failed to bind tcp listener");
377            return Err(());
378        }
379    };
380
381    match maybe_tls_acceptor {
382        Some(tls_acceptor) => Ok(task::spawn(server_tls_loop(
383            tls_acceptor,
384            listener,
385            app,
386            rx,
387            server_message_tx,
388            tls_acceptor_reload_rx,
389            trusted_proxy_v2_ips,
390        ))),
391        None => Ok(task::spawn(server_plaintext_loop(
392            listener,
393            app,
394            rx,
395            trusted_proxy_v2_ips,
396        ))),
397    }
398}
399
400async fn server_tls_loop(
401    mut tls_acceptor: TlsAcceptor,
402    listener: TcpListener,
403    app: IntoMakeServiceWithConnectInfo<Router, ClientConnInfo>,
404    mut rx: broadcast::Receiver<CoreAction>,
405    server_message_tx: broadcast::Sender<CoreAction>,
406    mut tls_acceptor_reload_rx: mpsc::Receiver<TlsAcceptor>,
407    trusted_proxy_v2_ips: Option<Arc<Vec<IpCidr>>>,
408) {
409    pin_mut!(listener);
410
411    loop {
412        tokio::select! {
413            Ok(action) = rx.recv() => {
414                match action {
415                    CoreAction::Shutdown => break,
416                }
417            }
418            accept = listener.accept() => {
419                match accept {
420                    Ok((stream, addr)) => {
421                        let tls_acceptor = tls_acceptor.clone();
422                        let app = app.clone();
423                        task::spawn(handle_tls_conn(tls_acceptor, stream, app, addr, trusted_proxy_v2_ips.clone()));
424                    }
425                    Err(err) => {
426                        error!("Web server exited with {:?}", err);
427                        if let Err(err) = server_message_tx.send(CoreAction::Shutdown) {
428                            error!("Web server failed to send shutdown message! {:?}", err)
429                        };
430                        break;
431                    }
432                }
433            }
434            Some(mut new_tls_acceptor) = tls_acceptor_reload_rx.recv() => {
435                std::mem::swap(&mut tls_acceptor, &mut new_tls_acceptor);
436                info!("Reloaded http tls acceptor");
437            }
438        }
439    }
440
441    info!("Stopped {}", super::TaskName::HttpsServer);
442}
443
444async fn server_plaintext_loop(
445    listener: TcpListener,
446    app: IntoMakeServiceWithConnectInfo<Router, ClientConnInfo>,
447    mut rx: broadcast::Receiver<CoreAction>,
448    trusted_proxy_v2_ips: Option<Arc<Vec<IpCidr>>>,
449) {
450    pin_mut!(listener);
451
452    loop {
453        tokio::select! {
454            Ok(action) = rx.recv() => {
455                match action {
456                    CoreAction::Shutdown => break,
457                }
458            }
459            accept = listener.accept() => {
460                match accept {
461                    Ok((stream, addr)) => {
462                        let app = app.clone();
463                        task::spawn(handle_conn(stream, app, addr, trusted_proxy_v2_ips.clone()));
464                    }
465                    Err(err) => {
466                        error!("Web server exited with {:?}", err);
467                        break;
468                    }
469                }
470            }
471        }
472    }
473
474    info!("Stopped {}", super::TaskName::HttpsServer);
475}
476
477/// This handles an individual connection.
478pub(crate) async fn handle_conn(
479    stream: TcpStream,
480    app: IntoMakeServiceWithConnectInfo<Router, ClientConnInfo>,
481    connection_addr: SocketAddr,
482    trusted_proxy_v2_ips: Option<Arc<Vec<IpCidr>>>,
483) -> Result<(), std::io::Error> {
484    let (stream, client_ip_addr) =
485        process_client_addr(stream, connection_addr, trusted_proxy_v2_ips).await?;
486
487    let client_conn_info = ClientConnInfo {
488        connection_addr,
489        client_ip_addr,
490        client_cert: None,
491    };
492
493    // Hyper has its own `AsyncRead` and `AsyncWrite` traits and doesn't use tokio.
494    // `TokioIo` converts between them.
495    let stream = TokioIo::new(stream);
496
497    process_client_hyper(stream, app, client_conn_info).await
498}
499
500/// This handles an individual connection.
501pub(crate) async fn handle_tls_conn(
502    acceptor: TlsAcceptor,
503    stream: TcpStream,
504    app: IntoMakeServiceWithConnectInfo<Router, ClientConnInfo>,
505    connection_addr: SocketAddr,
506    trusted_proxy_v2_ips: Option<Arc<Vec<IpCidr>>>,
507) -> Result<(), std::io::Error> {
508    let (mut stream, client_ip_addr) =
509        process_client_addr(stream, connection_addr, trusted_proxy_v2_ips).await?;
510
511    // Don't both starting to build anything until there is actually something to do.
512    // This is pretty common with "health checks" that open a connection and then just
513    // quit.
514    let mut zero_buf: [u8; 0] = [];
515    match timeout(HTTPS_CLIENT_CONN_TIMEOUT, stream.read(&mut zero_buf)).await {
516        Ok(Ok(_)) => {}
517        Ok(Err(err)) => {
518            debug!(?err, "Connection closed before we recieved initial data");
519            return Err(std::io::Error::from(ErrorKind::ConnectionAborted));
520        }
521        Err(_) => {
522            error!("Timeout waiting for initial data");
523            return Err(std::io::Error::from(ErrorKind::TimedOut));
524        }
525    };
526
527    let tls_stream = match timeout(HTTPS_CLIENT_CONN_TIMEOUT, acceptor.accept(stream)).await {
528        Ok(Ok(tls_stream)) => tls_stream,
529        Ok(Err(err)) => {
530            error!(?err, "Failed to create TLS stream");
531            return Err(std::io::Error::from(ErrorKind::ConnectionAborted));
532        }
533        Err(_) => {
534            error!("Timeout creating TLS stream");
535            return Err(std::io::Error::from(ErrorKind::TimedOut));
536        }
537    };
538
539    let maybe_peer_cert = tls_stream
540        .get_ref()
541        .1
542        .peer_certificates()
543        // The first certificate relates to the peer.
544        .and_then(|peer_certs| peer_certs.first());
545
546    // Process the client cert (if any)
547    let client_cert = if let Some(peer_cert) = maybe_peer_cert {
548        // We don't need to check the CRL here - it's already completed as part of the
549        // TLS connection establishment process.
550
551        // Extract the cert from rustls DER to x509-cert which is a better
552        // parser to handle the various extensions.
553        let certificate = Certificate::from_der(peer_cert).map_err(|ossl_err| {
554            error!(?ossl_err, "unable to process DER certificate to x509");
555            std::io::Error::from(ErrorKind::ConnectionAborted)
556        })?;
557
558        let public_key_s256 = x509_public_key_s256(&certificate).ok_or_else(|| {
559            error!("subject public key bitstring is not octet aligned");
560            std::io::Error::from(ErrorKind::ConnectionAborted)
561        })?;
562
563        Some(ClientCertInfo {
564            public_key_s256,
565            certificate,
566        })
567    } else {
568        None
569    };
570
571    let client_conn_info = ClientConnInfo {
572        connection_addr,
573        client_ip_addr,
574        client_cert,
575    };
576
577    // Hyper has its own `AsyncRead` and `AsyncWrite` traits and doesn't use tokio.
578    // `TokioIo` converts between them.
579    let stream = TokioIo::new(tls_stream);
580
581    process_client_hyper(stream, app, client_conn_info).await
582}
583
584async fn process_client_addr(
585    stream: TcpStream,
586    connection_addr: SocketAddr,
587    trusted_proxy_v2_ips: Option<Arc<Vec<IpCidr>>>,
588) -> Result<(TcpStream, IpAddr), std::io::Error> {
589    let enable_proxy_v2_hdr = trusted_proxy_v2_ips
590        .map(|trusted| {
591            trusted
592                .iter()
593                .any(|ip_cidr| ip_cidr.contains(&connection_addr.ip().to_canonical()))
594        })
595        .unwrap_or_default();
596
597    let (stream, client_addr) = if enable_proxy_v2_hdr {
598        match timeout(
599            HTTPS_CLIENT_CONN_TIMEOUT,
600            ProxyHdrV2::parse_from_read(stream),
601        )
602        .await
603        {
604            Ok(Ok((stream, hdr))) => {
605                let remote_socket_addr = match hdr.to_remote_addr() {
606                    RemoteAddress::Local => {
607                        debug!("PROXY protocol liveness check - will not contain client data");
608                        // This is a check from the proxy, so just use the connection address.
609                        connection_addr
610                    }
611                    RemoteAddress::TcpV4 { src, dst: _ } => SocketAddr::from(src),
612                    RemoteAddress::TcpV6 { src, dst: _ } => SocketAddr::from(src),
613                    remote_addr => {
614                        error!(?remote_addr, "remote address in proxy header is invalid");
615                        return Err(std::io::Error::from(ErrorKind::ConnectionAborted));
616                    }
617                };
618
619                (stream, remote_socket_addr)
620            }
621            Ok(Err(err)) => {
622                error!(?connection_addr, ?err, "Unable to process proxy v2 header");
623                return Err(std::io::Error::from(ErrorKind::ConnectionAborted));
624            }
625            Err(_) => {
626                error!(?connection_addr, "Timeout receiving proxy v2 header");
627                return Err(std::io::Error::from(ErrorKind::TimedOut));
628            }
629        }
630    } else {
631        (stream, connection_addr)
632    };
633
634    Ok((stream, client_addr.ip()))
635}
636
637async fn process_client_hyper<T>(
638    mut stream: TokioIo<T>,
639    mut app: IntoMakeServiceWithConnectInfo<Router, ClientConnInfo>,
640    client_conn_info: ClientConnInfo,
641) -> Result<(), std::io::Error>
642where
643    T: AsyncRead + AsyncWrite + std::marker::Unpin + std::marker::Send + 'static,
644{
645    debug!(?client_conn_info);
646    // Don't both starting to build anything until there is actually something to do.
647    let mut zero_buf: [u8; 0] = [];
648    match timeout(
649        HTTPS_CLIENT_CONN_TIMEOUT,
650        stream.inner_mut().read(&mut zero_buf),
651    )
652    .await
653    {
654        Ok(Ok(_)) => {}
655        Ok(Err(err)) => {
656            debug!(
657                ?err,
658                "connection was closed before initial data could be sent"
659            );
660            return Err(std::io::Error::from(ErrorKind::ConnectionAborted));
661        }
662        Err(_) => {
663            error!("connection timed out waiting for initial request data");
664            return Err(std::io::Error::from(ErrorKind::TimedOut));
665        }
666    };
667
668    let svc = tower::MakeService::<ClientConnInfo, hyper::Request<Body>>::make_service(
669        &mut app,
670        client_conn_info,
671    );
672
673    let svc = svc.await.map_err(|e| {
674        error!("Failed to build HTTP response: {:?}", e);
675        std::io::Error::from(ErrorKind::Other)
676    })?;
677
678    // Hyper also has its own `Service` trait and doesn't use tower. We can use
679    // `hyper::service::service_fn` to create a hyper `Service` that calls our app through
680    // `tower::Service::call`.
681    let hyper_service = hyper::service::service_fn(move |request: Request<Incoming>| {
682        // We have to clone `tower_service` because hyper's `Service` uses `&self` whereas
683        // tower's `Service` requires `&mut self`.
684        //
685        // We don't need to call `poll_ready` since `Router` is always ready.
686        svc.clone().call(request)
687    });
688
689    let mut builder = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new());
690
691    builder
692        .http1()
693        .timer(TokioTimer::new())
694        .header_read_timeout(HTTPS_CLIENT_IO_TIMEOUT);
695
696    builder
697        .http2()
698        .timer(TokioTimer::new())
699        .keep_alive_timeout(HTTPS_CLIENT_IO_TIMEOUT)
700        .keep_alive_interval(HTTPS_CLIENT_IO_TIMEOUT);
701
702    builder
703        .serve_connection_with_upgrades(stream, hyper_service)
704        .await
705        .map_err(|e| {
706            debug!("Failed to complete connection: {:?}", e);
707            std::io::Error::from(ErrorKind::ConnectionAborted)
708        })
709}