kanidmd_core/repl/
mod.rs

1use openssl::{
2    pkey::{PKey, Private},
3    ssl::{Ssl, SslAcceptor, SslConnector, SslMethod, SslVerifyMode},
4    x509::{store::X509StoreBuilder, X509},
5};
6use std::collections::VecDeque;
7use std::net::SocketAddr;
8use std::pin::Pin;
9use std::sync::Arc;
10use std::time::Duration;
11use tokio::sync::broadcast;
12use tokio::sync::mpsc;
13use tokio::sync::oneshot;
14use tokio::sync::Mutex;
15use tokio::time::{interval, sleep, timeout};
16use tokio::{
17    net::{TcpListener, TcpStream},
18    task::JoinHandle,
19};
20use tokio_openssl::SslStream;
21use tokio_util::codec::{Framed, FramedRead, FramedWrite};
22use tracing::{error, Instrument};
23use url::Url;
24use uuid::Uuid;
25
26use futures_util::sink::SinkExt;
27use futures_util::stream::StreamExt;
28
29use kanidmd_lib::prelude::duration_from_epoch_now;
30use kanidmd_lib::prelude::IdmServer;
31use kanidmd_lib::repl::proto::ConsumerState;
32use kanidmd_lib::server::QueryServerTransaction;
33
34use crate::CoreAction;
35use config::{RepNodeConfig, ReplicationConfiguration};
36
37use self::codec::{ConsumerRequest, SupplierResponse};
38
39mod codec;
40pub(crate) mod config;
41
/// Control messages sent to the replication acceptor over its `mpsc` control
/// channel (the sender half is returned by `create_repl_server`).
///
/// Every variant carries a `oneshot::Sender` on which the acceptor replies.
pub(crate) enum ReplCtrl {
    /// Ask for a copy of the replication certificate currently in use by this
    /// server.
    GetCertificate {
        respond: oneshot::Sender<X509>,
    },
    /// Ask for the replication key/certificate to be renewed. The reply is
    /// `true` when the renewal succeeded; the acceptor then starts a reload.
    RenewCertificate {
        respond: oneshot::Sender<bool>,
    },
    /// Ask for a consumer refresh. The reply is the receiving half of an
    /// `mpsc` channel.
    // NOTE(review): presumably a `()` arrives on that receiver once a consumer
    // task has completed the refresh - confirm against the handler.
    RefreshConsumer {
        respond: oneshot::Sender<mpsc::Receiver<()>>,
    },
}
53
/// Control messages broadcast from the acceptor to every running consumer
/// task (`repl_task`).
#[derive(Debug, Clone)]
enum ReplConsumerCtrl {
    /// The receiving task leaves its replication loop and terminates.
    Stop,
    /// Request a full refresh. The shared, locked tuple coordinates the tasks:
    /// the `bool` is set to `true` once one task has completed the refresh, and
    /// the `mpsc::Sender` is used to signal that completion.
    Refresh(Arc<Mutex<(bool, mpsc::Sender<()>)>>),
}
59
60pub(crate) async fn create_repl_server(
61    idms: Arc<IdmServer>,
62    repl_config: &ReplicationConfiguration,
63    rx: broadcast::Receiver<CoreAction>,
64) -> Result<(tokio::task::JoinHandle<()>, mpsc::Sender<ReplCtrl>), ()> {
65    // We need to start the tcp listener. This will persist over ssl reloads!
66    let listener = TcpListener::bind(&repl_config.bindaddress)
67        .await
68        .map_err(|e| {
69            error!(
70                "Could not bind to replication address {} -> {:?}",
71                repl_config.bindaddress, e
72            );
73        })?;
74
75    // Create the control channel. Use a low msg count, there won't be that much going on.
76    let (ctrl_tx, ctrl_rx) = mpsc::channel(4);
77
78    // We need to start the tcp listener. This will persist over ssl reloads!
79    info!(
80        "Starting replication interface https://{} ...",
81        repl_config.bindaddress
82    );
83    let repl_handle: JoinHandle<()> = tokio::spawn(repl_acceptor(
84        listener,
85        idms,
86        repl_config.clone(),
87        rx,
88        ctrl_rx,
89    ));
90
91    info!("Created replication interface");
92    Ok((repl_handle, ctrl_tx))
93}
94
95#[instrument(level = "info", skip_all)]
96/// This returns the remote address that worked, so you can try that first next time
97async fn repl_consumer_connect_supplier(
98    domain: &str,
99    sock_addrs: &[SocketAddr],
100    tls_connector: &SslConnector,
101    consumer_conn_settings: &ConsumerConnSettings,
102) -> Option<(
103    SocketAddr,
104    Framed<SslStream<TcpStream>, codec::ConsumerCodec>,
105)> {
106    // This is pretty gnarly, but we need to loop to try out each socket addr.
107    for sock_addr in sock_addrs {
108        debug!(
109            "Attempting to connect to {} replica via {}",
110            domain, sock_addr
111        );
112
113        let tcpstream = match timeout(
114            consumer_conn_settings.replica_connect_timeout,
115            TcpStream::connect(sock_addr),
116        )
117        .await
118        {
119            Ok(Ok(tc)) => tc,
120            Ok(Err(err)) => {
121                debug!(?err, "Failed to connect to {}", sock_addr);
122                continue;
123            }
124            Err(_) => {
125                debug!("Timeout connecting to {}", sock_addr);
126                continue;
127            }
128        };
129
130        trace!("Connection established to peer on {:?}", sock_addr);
131
132        let mut tlsstream = match Ssl::new(tls_connector.context())
133            .and_then(|tls_obj| SslStream::new(tls_obj, tcpstream))
134        {
135            Ok(ta) => ta,
136            Err(e) => {
137                error!("Replication client TLS setup error, continuing -> {:?}", e);
138                continue;
139            }
140        };
141
142        if let Err(e) = SslStream::connect(Pin::new(&mut tlsstream)).await {
143            error!("Replication client TLS accept error, continuing -> {:?}", e);
144            continue;
145        };
146
147        let supplier_conn = Framed::new(
148            tlsstream,
149            codec::ConsumerCodec::new(consumer_conn_settings.max_frame_bytes),
150        );
151        // "hey this one worked, try it first next time!"
152        return Some((sock_addr.to_owned(), supplier_conn));
153    }
154
155    error!(
156        "Unable to connect to supplier, tried to connect to {:?}",
157        sock_addrs
158    );
159    None
160}
161
162/// This returns the socket address that worked, so you can try that first next time
163#[instrument(level="info", skip(refresh_coord, tls_connector, idms), fields(uuid=Uuid::new_v4().to_string()))]
164async fn repl_run_consumer_refresh(
165    refresh_coord: Arc<Mutex<(bool, mpsc::Sender<()>)>>,
166    domain: &str,
167    sock_addrs: &[SocketAddr],
168    tls_connector: &SslConnector,
169    idms: &IdmServer,
170    consumer_conn_settings: &ConsumerConnSettings,
171) -> Result<Option<SocketAddr>, ()> {
172    // Take the refresh lock. Note that every replication consumer *should* end up here
173    // behind this lock, but only one can proceed. This is what we want!
174
175    let mut refresh_coord_guard = refresh_coord.lock().await;
176
177    // Simple case - task is already done.
178    if refresh_coord_guard.0 {
179        trace!("Refresh already completed by another task, return.");
180        return Ok(None);
181    }
182
183    // okay, we need to proceed.
184    let (addr, mut supplier_conn) =
185        repl_consumer_connect_supplier(domain, sock_addrs, tls_connector, consumer_conn_settings)
186            .await
187            .ok_or(())?;
188
189    // If we fail at any point, just RETURN because this leaves the next task to attempt, or
190    // the channel drops and that tells the caller this failed.
191    supplier_conn
192        .send(ConsumerRequest::Refresh)
193        .await
194        .map_err(|err| error!(?err, "consumer encode error, unable to continue."))?;
195
196    let refresh = if let Some(codec_msg) = supplier_conn.next().await {
197        match codec_msg.map_err(|err| error!(?err, "Consumer decode error, unable to continue."))? {
198            SupplierResponse::Refresh(changes) => {
199                // Success - return to bypass the error message.
200                changes
201            }
202            SupplierResponse::Pong | SupplierResponse::Incremental(_) => {
203                error!("Supplier Response contains invalid State");
204                return Err(());
205            }
206        }
207    } else {
208        error!("Connection closed");
209        return Err(());
210    };
211
212    // Now apply the refresh if possible
213    {
214        // Scope the transaction.
215        let ct = duration_from_epoch_now();
216        idms.proxy_write(ct)
217            .await
218            .and_then(|mut write_txn| {
219                write_txn
220                    .qs_write
221                    .consumer_apply_refresh(refresh)
222                    .and_then(|cs| write_txn.commit().map(|()| cs))
223            })
224            .map_err(|err| error!(?err, "Consumer was not able to apply refresh."))?;
225    }
226
227    // Now mark the refresh as complete AND indicate it to the channel.
228    refresh_coord_guard.0 = true;
229    if refresh_coord_guard.1.send(()).await.is_err() {
230        warn!("Unable to signal to caller that refresh has completed.");
231    }
232
233    // Here the coord guard will drop and every other task proceeds.
234
235    info!("Replication refresh was successful.");
236    Ok(Some(addr))
237}
238
239#[instrument(level="info", skip(tls_connector, idms), fields(eventid=Uuid::new_v4().to_string()))]
240async fn repl_run_consumer(
241    domain: &str,
242    sock_addrs: &[SocketAddr],
243    tls_connector: &SslConnector,
244    automatic_refresh: bool,
245    idms: &IdmServer,
246    consumer_conn_settings: &ConsumerConnSettings,
247) -> Option<SocketAddr> {
248    let (socket_addr, mut supplier_conn) =
249        repl_consumer_connect_supplier(domain, sock_addrs, tls_connector, consumer_conn_settings)
250            .await?;
251
252    // Perform incremental.
253    let consumer_ruv_range = {
254        let consumer_state = idms
255            .proxy_read()
256            .await
257            .and_then(|mut read_txn| read_txn.qs_read.consumer_get_state());
258        match consumer_state {
259            Ok(ruv_range) => ruv_range,
260            Err(err) => {
261                error!(
262                    ?err,
263                    "consumer ruv range could not be accessed, unable to continue."
264                );
265                return None;
266            }
267        }
268    };
269
270    if let Err(err) = supplier_conn
271        .send(ConsumerRequest::Incremental(consumer_ruv_range))
272        .await
273    {
274        error!(?err, "consumer encode error, unable to continue.");
275        return None;
276    }
277
278    let changes = if let Some(codec_msg) = supplier_conn.next().await {
279        match codec_msg {
280            Ok(SupplierResponse::Incremental(changes)) => {
281                // Success - return to bypass the error message.
282                changes
283            }
284            Ok(SupplierResponse::Pong) | Ok(SupplierResponse::Refresh(_)) => {
285                error!("Supplier Response contains invalid State");
286                return None;
287            }
288            Err(err) => {
289                error!(?err, "consumer decode error, unable to continue.");
290                return None;
291            }
292        }
293    } else {
294        error!("Connection closed");
295        return None;
296    };
297
298    // Now apply the changes if possible
299    let consumer_state = {
300        let ct = duration_from_epoch_now();
301        match idms.proxy_write(ct).await.and_then(|mut write_txn| {
302            write_txn
303                .qs_write
304                .consumer_apply_changes(changes)
305                .and_then(|cs| write_txn.commit().map(|()| cs))
306        }) {
307            Ok(state) => state,
308            Err(err) => {
309                error!(?err, "consumer was not able to apply changes.");
310                return None;
311            }
312        }
313    };
314
315    match consumer_state {
316        ConsumerState::Ok => {
317            info!("Incremental Replication Success");
318            // return to bypass the failure message.
319            return Some(socket_addr);
320        }
321        ConsumerState::RefreshRequired => {
322            if automatic_refresh {
323                warn!("Consumer is out of date and must be refreshed. This will happen *now*.");
324            } else {
325                error!("Consumer is out of date and must be refreshed. You must manually resolve this situation.");
326                return None;
327            };
328        }
329    }
330
331    if let Err(err) = supplier_conn.send(ConsumerRequest::Refresh).await {
332        error!(?err, "consumer encode error, unable to continue.");
333        return None;
334    }
335
336    let refresh = if let Some(codec_msg) = supplier_conn.next().await {
337        match codec_msg {
338            Ok(SupplierResponse::Refresh(changes)) => {
339                // Success - return to bypass the error message.
340                changes
341            }
342            Ok(SupplierResponse::Pong) | Ok(SupplierResponse::Incremental(_)) => {
343                error!("Supplier Response contains invalid State");
344                return None;
345            }
346            Err(err) => {
347                error!(?err, "consumer decode error, unable to continue.");
348                return None;
349            }
350        }
351    } else {
352        error!("Connection closed");
353        return None;
354    };
355
356    // Now apply the refresh if possible
357    let ct = duration_from_epoch_now();
358    if let Err(err) = idms.proxy_write(ct).await.and_then(|mut write_txn| {
359        write_txn
360            .qs_write
361            .consumer_apply_refresh(refresh)
362            .and_then(|cs| write_txn.commit().map(|()| cs))
363    }) {
364        error!(?err, "consumer was not able to apply refresh.");
365        return None;
366    }
367
368    warn!("Replication refresh was successful.");
369    Some(socket_addr)
370}
371
/// Connection parameters shared by every consumer replication task.
#[derive(Debug, Clone)]
struct ConsumerConnSettings {
    /// Frame size limit handed to the consumer codec when a supplier
    /// connection is framed.
    max_frame_bytes: usize,
    /// Period of the interval on which a consumer task attempts a replication
    /// run.
    task_poll_interval: Duration,
    /// Upper bound on how long a single TCP connect attempt to a supplier
    /// address may take.
    replica_connect_timeout: Duration,
}
378
379#[allow(clippy::too_many_arguments)]
380async fn repl_task(
381    origin: Url,
382    client_key: PKey<Private>,
383    client_cert: X509,
384    supplier_cert: X509,
385    consumer_conn_settings: ConsumerConnSettings,
386    mut task_rx: broadcast::Receiver<ReplConsumerCtrl>,
387    automatic_refresh: bool,
388    idms: Arc<IdmServer>,
389) {
390    if origin.scheme() != "repl" {
391        error!("Replica origin is not repl:// - refusing to proceed.");
392        return;
393    }
394
395    let domain = match origin.domain() {
396        Some(d) => d,
397        None => {
398            error!("Replica origin does not have a valid domain name, unable to proceed. Perhaps you tried to use an ip address?");
399            return;
400        }
401    };
402
403    // Setup our tls connector.
404    let mut ssl_builder = match SslConnector::builder(SslMethod::tls_client()) {
405        Ok(sb) => sb,
406        Err(err) => {
407            error!(?err, "Unable to configure tls connector");
408            return;
409        }
410    };
411
412    let setup_client_cert = ssl_builder
413        .set_certificate(&client_cert)
414        .and_then(|_| ssl_builder.set_private_key(&client_key))
415        .and_then(|_| ssl_builder.check_private_key());
416    if let Err(err) = setup_client_cert {
417        error!(?err, "Unable to configure client certificate/key");
418        return;
419    }
420
421    // Add the supplier cert.
422    // ⚠️  note that here we need to build a new cert store. This is because
423    // openssl SslConnector adds the default system cert locations with
424    // the call to ::builder and we *don't* want this. We want our certstore
425    // to pin a single certificate!
426    let mut cert_store = match X509StoreBuilder::new() {
427        Ok(csb) => csb,
428        Err(err) => {
429            error!(?err, "Unable to configure certificate store builder.");
430            return;
431        }
432    };
433
434    if let Err(err) = cert_store.add_cert(supplier_cert) {
435        error!(?err, "Unable to add supplier certificate to cert store");
436        return;
437    }
438
439    let cert_store = cert_store.build();
440    ssl_builder.set_cert_store(cert_store);
441
442    // Configure the expected hostname of the remote.
443    let verify_param = ssl_builder.verify_param_mut();
444    if let Err(err) = verify_param.set_host(domain) {
445        error!(?err, "Unable to set domain name for tls peer verification");
446        return;
447    }
448
449    // Assert the expected supplier certificate is correct and has a valid domain san
450    ssl_builder.set_verify(SslVerifyMode::PEER);
451    let tls_connector = ssl_builder.build();
452
453    let mut repl_interval = interval(consumer_conn_settings.task_poll_interval);
454
455    info!("Replica task for {} has started.", origin);
456
457    // we keep track of the "last known good" socketaddr so we can try that first next time.
458    let mut last_working_address: Option<SocketAddr> = None;
459
460    // Okay, all the parameters are set up. Now we replicate on our interval.
461    loop {
462        // we resolve the DNS entry to the ip:port each time we attempt a connection to avoid stale
463        // DNS issues, ref #3188. If we are unable to resolve the address, we backoff and try again
464        // as in something like docker the address may change frequently.
465        //
466        // Note, if DNS isn't available, we can proceed with the last used working address too. This
467        // prevents DNS (or lack thereof) from causing a replication outage.
468        let mut sorted_socket_addrs = vec![];
469
470        // If the target address worked last time, then let's use it this time!
471        if let Some(addr) = last_working_address {
472            debug!(?last_working_address);
473            sorted_socket_addrs.push(addr);
474        };
475
476        // Default to port 443 if not set in the origin
477        match origin.socket_addrs(|| Some(443)) {
478            Ok(mut socket_addrs) => {
479                // Make every address unique.
480                socket_addrs.sort_unstable();
481                socket_addrs.dedup();
482
483                // The only possible conflict is with the last working address,
484                // so lets just check that.
485                socket_addrs.into_iter().for_each(|addr| {
486                    if Some(&addr) != last_working_address.as_ref() {
487                        // Not already present, append
488                        sorted_socket_addrs.push(addr);
489                    }
490                });
491            }
492            Err(err) => {
493                if let Some(addr) = last_working_address {
494                    warn!(
495                        ?err,
496                        "Unable to resolve '{origin}' to ip:port, using last known working address '{addr}'"
497                    );
498                } else {
499                    warn!(?err, "Unable to resolve '{origin}' to ip:port.");
500                }
501            }
502        };
503
504        if sorted_socket_addrs.is_empty() {
505            warn!(
506                "No replication addresses available, delaying replication operation for '{origin}'"
507            );
508            repl_interval.tick().await;
509            continue;
510        }
511
512        tokio::select! {
513            Ok(task) = task_rx.recv() => {
514                match task {
515                    ReplConsumerCtrl::Stop => break,
516                    ReplConsumerCtrl::Refresh ( refresh_coord ) => {
517                        last_working_address = (repl_run_consumer_refresh(
518                            refresh_coord,
519                            domain,
520                            &sorted_socket_addrs,
521                            &tls_connector,
522                            &idms,
523                            &consumer_conn_settings
524                        )
525                        .await).unwrap_or_default();
526                    }
527                }
528            }
529            _ = repl_interval.tick() => {
530                // Interval passed, attempt a replication run.
531                repl_run_consumer(
532                    domain,
533                    &sorted_socket_addrs,
534                    &tls_connector,
535                    automatic_refresh,
536                    &idms,
537                    &consumer_conn_settings
538                )
539                .await;
540            }
541        }
542    }
543
544    info!("Replica task for {} has stopped.", origin);
545}
546
547#[instrument(level = "info", skip_all)]
548async fn handle_repl_conn(
549    max_frame_bytes: usize,
550    tcpstream: TcpStream,
551    client_address: SocketAddr,
552    tls_parms: SslAcceptor,
553    idms: Arc<IdmServer>,
554) {
555    debug!(?client_address, "replication client connected 🛫");
556
557    let mut tlsstream = match Ssl::new(tls_parms.context())
558        .and_then(|tls_obj| SslStream::new(tls_obj, tcpstream))
559    {
560        Ok(ta) => ta,
561        Err(err) => {
562            error!(?err, "Replication TLS setup error, disconnecting client");
563            return;
564        }
565    };
566    if let Err(err) = SslStream::accept(Pin::new(&mut tlsstream)).await {
567        error!(?err, "Replication TLS accept error, disconnecting client");
568        return;
569    };
570    let (r, w) = tokio::io::split(tlsstream);
571    let mut r = FramedRead::new(r, codec::SupplierCodec::new(max_frame_bytes));
572    let mut w = FramedWrite::new(w, codec::SupplierCodec::new(max_frame_bytes));
573
574    while let Some(codec_msg) = r.next().await {
575        match codec_msg {
576            Ok(ConsumerRequest::Ping) => {
577                debug!("consumer requested ping");
578                if let Err(err) = w.send(SupplierResponse::Pong).await {
579                    error!(?err, "supplier encode error, unable to continue.");
580                    break;
581                }
582            }
583            Ok(ConsumerRequest::Incremental(consumer_ruv_range)) => {
584                let changes = match idms.proxy_read().await.and_then(|mut read_txn| {
585                    read_txn
586                        .qs_read
587                        .supplier_provide_changes(consumer_ruv_range)
588                }) {
589                    Ok(changes) => changes,
590                    Err(err) => {
591                        error!(?err, "supplier provide changes failed.");
592                        break;
593                    }
594                };
595
596                if let Err(err) = w.send(SupplierResponse::Incremental(changes)).await {
597                    error!(?err, "supplier encode error, unable to continue.");
598                    break;
599                }
600            }
601            Ok(ConsumerRequest::Refresh) => {
602                let changes = match idms
603                    .proxy_read()
604                    .await
605                    .and_then(|mut read_txn| read_txn.qs_read.supplier_provide_refresh())
606                {
607                    Ok(changes) => changes,
608                    Err(err) => {
609                        error!(?err, "supplier provide refresh failed.");
610                        break;
611                    }
612                };
613
614                if let Err(err) = w.send(SupplierResponse::Refresh(changes)).await {
615                    error!(?err, "supplier encode error, unable to continue.");
616                    break;
617                }
618            }
619            Err(err) => {
620                error!(?err, "supplier decode error, unable to continue.");
621                break;
622            }
623        }
624    }
625
626    debug!(?client_address, "replication client disconnected 🛬");
627}
628
629async fn repl_acceptor(
630    listener: TcpListener,
631    idms: Arc<IdmServer>,
632    repl_config: ReplicationConfiguration,
633    mut rx: broadcast::Receiver<CoreAction>,
634    mut ctrl_rx: mpsc::Receiver<ReplCtrl>,
635) {
636    info!("Starting Replication Acceptor ...");
637    // Persistent parts
638    // These all probably need changes later ...
639    let replica_connect_timeout = Duration::from_secs(2);
640    let retry_timeout = Duration::from_secs(60);
641    let max_frame_bytes = 268435456;
642
643    let consumer_conn_settings = ConsumerConnSettings {
644        max_frame_bytes,
645        task_poll_interval: repl_config.get_task_poll_interval(),
646        replica_connect_timeout,
647    };
648
649    // Setup a broadcast to control our tasks.
650    let (task_tx, task_rx1) = broadcast::channel(1);
651    // Note, we drop this task here since each task will re-subscribe. That way the
652    // broadcast doesn't jam up because we aren't draining this task.
653    drop(task_rx1);
654    let mut task_handles = VecDeque::new();
655
656    // Create another broadcast to control the replication tasks and their need to reload.
657
658    // Spawn a KRC communication task?
659
660    // In future we need to update this from the KRC if configured, and we default this
661    // to "empty". But if this map exists in the config, we have to always use that.
662    let replication_node_map = repl_config.manual.clone();
663    let domain_name = match repl_config.origin.domain() {
664        Some(n) => n.to_string(),
665        None => {
666            error!("Unable to start replication, replication origin does not contain a valid domain name.");
667            return;
668        }
669    };
670
671    // This needs to have an event loop that can respond to changes.
672    // For now we just design it to reload ssl if the map changes internally.
673    'event: loop {
674        info!("Starting replication reload ...");
675        // Tell existing tasks to shutdown.
676        // Note: We ignore the result here since an err can occur *if* there are
677        // no tasks currently listening on the channel.
678        info!("Stopping {} Replication Tasks ...", task_handles.len());
679        debug_assert!(task_handles.len() >= task_tx.receiver_count());
680        let _ = task_tx.send(ReplConsumerCtrl::Stop);
681        for task_handle in task_handles.drain(..) {
682            // Let each task join.
683            let res: Result<(), _> = task_handle.await;
684            if res.is_err() {
685                warn!("Failed to join replication task, continuing ...");
686            }
687        }
688
689        // Now we can start to re-load configurations and setup our client tasks
690        // as well.
691
692        // Get the private key / cert.
693        let res = {
694            let ct = duration_from_epoch_now();
695            idms.proxy_write(ct).await.and_then(|mut idms_prox_write| {
696                idms_prox_write
697                    .qs_write
698                    .supplier_get_key_cert(&domain_name)
699                    .and_then(|res| idms_prox_write.commit().map(|()| res))
700            })
701        };
702
703        let (server_key, server_cert) = match res {
704            Ok(r) => r,
705            Err(err) => {
706                error!(?err, "CRITICAL: Unable to access supplier certificate/key.");
707                sleep(retry_timeout).await;
708                continue;
709            }
710        };
711
712        info!(
713            replication_cert_not_before = ?server_cert.not_before(),
714            replication_cert_not_after = ?server_cert.not_after(),
715        );
716
717        let mut client_certs = Vec::new();
718
719        // For each node in the map, either spawn a task to pull from that node,
720        // or setup the node as allowed to pull from us.
721        for (origin, node) in replication_node_map.iter() {
722            // Setup client certs
723            match node {
724                RepNodeConfig::MutualPull {
725                    partner_cert: consumer_cert,
726                    automatic_refresh: _,
727                }
728                | RepNodeConfig::AllowPull { consumer_cert } => {
729                    client_certs.push(consumer_cert.clone())
730                }
731                RepNodeConfig::Pull {
732                    supplier_cert: _,
733                    automatic_refresh: _,
734                } => {}
735            };
736
737            match node {
738                RepNodeConfig::MutualPull {
739                    partner_cert: supplier_cert,
740                    automatic_refresh,
741                }
742                | RepNodeConfig::Pull {
743                    supplier_cert,
744                    automatic_refresh,
745                } => {
746                    let task_rx = task_tx.subscribe();
747
748                    let handle: JoinHandle<()> = tokio::spawn(repl_task(
749                        origin.clone(),
750                        server_key.clone(),
751                        server_cert.clone(),
752                        supplier_cert.clone(),
753                        consumer_conn_settings.clone(),
754                        task_rx,
755                        *automatic_refresh,
756                        idms.clone(),
757                    ));
758
759                    task_handles.push_back(handle);
760                    debug_assert_eq!(task_handles.len(), task_tx.receiver_count());
761                }
762                RepNodeConfig::AllowPull { consumer_cert: _ } => {}
763            };
764        }
765
766        // ⚠️  This section is critical to the security of replication
767        //    Since replication relies on mTLS we MUST ensure these options
768        //    are absolutely correct!
769        //
770        // Setup the TLS builder.
771        let mut tls_builder = match SslAcceptor::mozilla_modern_v5(SslMethod::tls()) {
772            Ok(tls_builder) => tls_builder,
773            Err(err) => {
774                error!(?err, "CRITICAL, unable to create SslAcceptorBuilder.");
775                sleep(retry_timeout).await;
776                continue;
777            }
778        };
779
780        // tls_builder.set_keylog_callback(keylog_cb);
781        if let Err(err) = tls_builder
782            .set_certificate(&server_cert)
783            .and_then(|_| tls_builder.set_private_key(&server_key))
784            .and_then(|_| tls_builder.check_private_key())
785        {
786            error!(?err, "CRITICAL, unable to set server_cert and server key.");
787            sleep(retry_timeout).await;
788            continue;
789        };
790
791        // ⚠️  CRITICAL - ensure that the cert store only has client certs from
792        // the repl map added.
793        let cert_store = tls_builder.cert_store_mut();
794        for client_cert in client_certs.into_iter() {
795            if let Err(err) = cert_store.add_cert(client_cert.clone()) {
796                error!(?err, "CRITICAL, unable to add client certificates.");
797                sleep(retry_timeout).await;
798                continue;
799            }
800        }
801
802        // ⚠️  CRITICAL - Both verifications here are needed. PEER requests
803        // the client cert to be sent. FAIL_IF_NO_PEER_CERT triggers an
804        // error if the cert is NOT present. FAIL_IF_NO_PEER_CERT on its own
805        // DOES NOTHING.
806        let mut verify = SslVerifyMode::PEER;
807        verify.insert(SslVerifyMode::FAIL_IF_NO_PEER_CERT);
808        tls_builder.set_verify(verify);
809
810        let tls_acceptor = tls_builder.build();
811
812        loop {
813            // This is great to diagnose when spans are entered or present and they capture
814            // things incorrectly.
815            // eprintln!("🔥 C ---> {:?}", tracing::Span::current());
816            let eventid = Uuid::new_v4();
817
818            tokio::select! {
819                Ok(action) = rx.recv() => {
820                    match action {
821                        CoreAction::Shutdown => break 'event,
822                    }
823                }
824                Some(ctrl_msg) = ctrl_rx.recv() => {
825                    match ctrl_msg {
826                        ReplCtrl::GetCertificate {
827                            respond
828                        } => {
829                            let _span = debug_span!("supplier_accept_loop", uuid = ?eventid).entered();
830                            if respond.send(server_cert.clone()).is_err() {
831                                warn!("Server certificate was requested, but requsetor disconnected");
832                            } else {
833                                trace!("Sent server certificate via control channel");
834                            }
835                        }
836                        ReplCtrl::RenewCertificate {
837                            respond
838                        } => {
839                            let span = debug_span!("supplier_accept_loop", uuid = ?eventid);
840                            async {
841                                debug!("renewing replication certificate ...");
842                                // Renew the cert.
843                                let res = {
844                                    let ct = duration_from_epoch_now();
845                                    idms.proxy_write(ct).await
846                                        .and_then(|mut idms_prox_write|
847                                    idms_prox_write
848                                        .qs_write
849                                        .supplier_renew_key_cert(&domain_name)
850                                        .and_then(|res| idms_prox_write.commit().map(|()| res))
851                                        )
852                                };
853
854                                let success = res.is_ok();
855
856                                if let Err(err) = res {
857                                    error!(?err, "failed to renew server certificate");
858                                }
859
860                                if respond.send(success).is_err() {
861                                    warn!("Server certificate renewal was requested, but requester disconnected!");
862                                } else {
863                                    trace!("Sent server certificate renewal status via control channel");
864                                }
865                            }
866                            .instrument(span)
867                            .await;
868
869                            // Start a reload.
870                            continue 'event;
871                        }
872                        ReplCtrl::RefreshConsumer {
873                            respond
874                        } => {
875                            // Indicate to consumer tasks that they should do a refresh.
876                            let (tx, rx) = mpsc::channel(1);
877
878                            let refresh_coord = Arc::new(
879                                Mutex::new(
880                                (
881                                    false, tx
882                                )
883                                )
884                            );
885
886                            if task_tx.send(ReplConsumerCtrl::Refresh(refresh_coord)).is_err() {
887                                error!("Unable to begin replication consumer refresh, tasks are unable to be notified.");
888                            }
889
890                            if respond.send(rx).is_err() {
891                                warn!("Replication consumer refresh was requested, but requester disconnected");
892                            } else {
893                                trace!("Sent refresh comms channel to requester");
894                            }
895                        }
896                    }
897                }
898                // Handle accepts.
899                // Handle *reloads*
900                /*
901                _ = reload.recv() => {
902                    info!("Initiating TLS reload");
903                    continue
904                }
905                */
906                accept_result = listener.accept() => {
907                    match accept_result {
908                        Ok((tcpstream, client_socket_addr)) => {
909                            let clone_idms = idms.clone();
910                            let clone_tls_acceptor = tls_acceptor.clone();
911                            // We don't care about the join handle here - once a client connects
912                            // it sticks to whatever ssl settings it had at launch.
913                            tokio::spawn(
914                                handle_repl_conn(max_frame_bytes, tcpstream, client_socket_addr, clone_tls_acceptor, clone_idms)
915                            );
916                        }
917                        Err(e) => {
918                            error!("replication acceptor error, continuing -> {:?}", e);
919                        }
920                    }
921                }
922            } // end select
923              // Continue to poll/loop
924        }
925    }
926    // Shutdown child tasks.
927    info!("Stopping {} Replication Tasks ...", task_handles.len());
928    debug_assert!(task_handles.len() >= task_tx.receiver_count());
929    let _ = task_tx.send(ReplConsumerCtrl::Stop);
930    for task_handle in task_handles.drain(..) {
931        // Let each task join.
932        let res: Result<(), _> = task_handle.await.map(|_| ());
933        if res.is_err() {
934            warn!("Failed to join replication task, continuing ...");
935        }
936    }
937
938    info!("Stopped {}", super::TaskName::Replication);
939}