kanidmd_core/repl/
mod.rs

1use openssl::{
2    pkey::{PKey, Private},
3    ssl::{Ssl, SslAcceptor, SslConnector, SslMethod, SslVerifyMode},
4    x509::{store::X509StoreBuilder, X509},
5};
6use std::collections::VecDeque;
7use std::net::SocketAddr;
8use std::pin::Pin;
9use std::sync::Arc;
10use std::time::Duration;
11use tokio::sync::broadcast;
12use tokio::sync::mpsc;
13use tokio::sync::oneshot;
14use tokio::sync::Mutex;
15use tokio::time::{interval, sleep, timeout};
16use tokio::{
17    net::{TcpListener, TcpStream},
18    task::JoinHandle,
19};
20use tokio_openssl::SslStream;
21use tokio_util::codec::{Framed, FramedRead, FramedWrite};
22use tracing::{error, Instrument};
23use url::Url;
24use uuid::Uuid;
25
26use futures_util::sink::SinkExt;
27use futures_util::stream::StreamExt;
28
29use kanidmd_lib::prelude::duration_from_epoch_now;
30use kanidmd_lib::prelude::IdmServer;
31use kanidmd_lib::repl::proto::ConsumerState;
32use kanidmd_lib::server::QueryServerTransaction;
33
34use crate::CoreAction;
35use config::{RepNodeConfig, ReplicationConfiguration};
36
37use self::codec::{ConsumerRequest, SupplierResponse};
38
39mod codec;
40pub(crate) mod config;
41
42pub(crate) enum ReplCtrl {
43    GetCertificate {
44        respond: oneshot::Sender<X509>,
45    },
46    RenewCertificate {
47        respond: oneshot::Sender<bool>,
48    },
49    RefreshConsumer {
50        respond: oneshot::Sender<mpsc::Receiver<()>>,
51    },
52}
53
54#[derive(Debug, Clone)]
55enum ReplConsumerCtrl {
56    Stop,
57    Refresh(Arc<Mutex<(bool, mpsc::Sender<()>)>>),
58}
59
60pub(crate) async fn create_repl_server(
61    idms: Arc<IdmServer>,
62    repl_config: &ReplicationConfiguration,
63    rx: broadcast::Receiver<CoreAction>,
64) -> Result<(tokio::task::JoinHandle<()>, mpsc::Sender<ReplCtrl>), ()> {
65    // We need to start the tcp listener. This will persist over ssl reloads!
66    let listener = TcpListener::bind(&repl_config.bindaddress)
67        .await
68        .map_err(|e| {
69            error!(
70                "Could not bind to replication address {} -> {:?}",
71                repl_config.bindaddress, e
72            );
73        })?;
74
75    // Create the control channel. Use a low msg count, there won't be that much going on.
76    let (ctrl_tx, ctrl_rx) = mpsc::channel(4);
77
78    // We need to start the tcp listener. This will persist over ssl reloads!
79    info!(
80        "Starting replication interface https://{} ...",
81        repl_config.bindaddress
82    );
83    let repl_handle: JoinHandle<()> = tokio::spawn(repl_acceptor(
84        listener,
85        idms,
86        repl_config.clone(),
87        rx,
88        ctrl_rx,
89    ));
90
91    info!("Created replication interface");
92    Ok((repl_handle, ctrl_tx))
93}
94
95#[instrument(level = "debug", skip_all)]
96/// This returns the remote address that worked, so you can try that first next time
97async fn repl_consumer_connect_supplier(
98    domain: &str,
99    sock_addrs: &[SocketAddr],
100    tls_connector: &SslConnector,
101    consumer_conn_settings: &ConsumerConnSettings,
102) -> Option<(
103    SocketAddr,
104    Framed<SslStream<TcpStream>, codec::ConsumerCodec>,
105)> {
106    // This is pretty gnarly, but we need to loop to try out each socket addr.
107    for sock_addr in sock_addrs {
108        debug!(
109            "Attempting to connect to {} replica via {}",
110            domain, sock_addr
111        );
112
113        let tcpstream = match timeout(
114            consumer_conn_settings.replica_connect_timeout,
115            TcpStream::connect(sock_addr),
116        )
117        .await
118        {
119            Ok(Ok(tc)) => {
120                trace!("Connection established to peer on {:?}", sock_addr);
121                tc
122            }
123            Ok(Err(err)) => {
124                debug!(?err, "Failed to connect to {}", sock_addr);
125                continue;
126            }
127            Err(_) => {
128                debug!("Timeout connecting to {}", sock_addr);
129                continue;
130            }
131        };
132
133        let mut tlsstream = match Ssl::new(tls_connector.context())
134            .and_then(|tls_obj| SslStream::new(tls_obj, tcpstream))
135        {
136            Ok(ta) => ta,
137            Err(e) => {
138                error!("Replication client TLS setup error, continuing -> {:?}", e);
139                continue;
140            }
141        };
142
143        if let Err(e) = SslStream::connect(Pin::new(&mut tlsstream)).await {
144            error!("Replication client TLS accept error, continuing -> {:?}", e);
145            continue;
146        };
147
148        let supplier_conn = Framed::new(
149            tlsstream,
150            codec::ConsumerCodec::new(consumer_conn_settings.max_frame_bytes),
151        );
152        // "hey this one worked, try it first next time!"
153        return Some((sock_addr.to_owned(), supplier_conn));
154    }
155
156    error!(
157        "Unable to connect to supplier, tried to connect to {:?}",
158        sock_addrs
159    );
160    None
161}
162
163/// This returns the socket address that worked, so you can try that first next time
164#[instrument(level="info", skip(refresh_coord, tls_connector, idms), fields(uuid=Uuid::new_v4().to_string()))]
165async fn repl_run_consumer_refresh(
166    refresh_coord: Arc<Mutex<(bool, mpsc::Sender<()>)>>,
167    domain: &str,
168    sock_addrs: &[SocketAddr],
169    tls_connector: &SslConnector,
170    idms: &IdmServer,
171    consumer_conn_settings: &ConsumerConnSettings,
172) -> Result<Option<SocketAddr>, ()> {
173    // Take the refresh lock. Note that every replication consumer *should* end up here
174    // behind this lock, but only one can proceed. This is what we want!
175
176    let mut refresh_coord_guard = refresh_coord.lock().await;
177
178    // Simple case - task is already done.
179    if refresh_coord_guard.0 {
180        trace!("Refresh already completed by another task, return.");
181        return Ok(None);
182    }
183
184    // okay, we need to proceed.
185    let (addr, mut supplier_conn) =
186        repl_consumer_connect_supplier(domain, sock_addrs, tls_connector, consumer_conn_settings)
187            .await
188            .ok_or(())?;
189
190    // If we fail at any point, just RETURN because this leaves the next task to attempt, or
191    // the channel drops and that tells the caller this failed.
192    supplier_conn
193        .send(ConsumerRequest::Refresh)
194        .await
195        .map_err(|err| error!(?err, "consumer encode error, unable to continue."))?;
196
197    let refresh = if let Some(codec_msg) = supplier_conn.next().await {
198        match codec_msg.map_err(|err| error!(?err, "Consumer decode error, unable to continue."))? {
199            SupplierResponse::Refresh(changes) => {
200                // Success - return to bypass the error message.
201                changes
202            }
203            SupplierResponse::Pong | SupplierResponse::Incremental(_) => {
204                error!("Supplier Response contains invalid State");
205                return Err(());
206            }
207        }
208    } else {
209        error!("Connection closed");
210        return Err(());
211    };
212
213    // Now apply the refresh if possible
214    {
215        // Scope the transaction.
216        let ct = duration_from_epoch_now();
217        idms.proxy_write(ct)
218            .await
219            .and_then(|mut write_txn| {
220                write_txn
221                    .qs_write
222                    .consumer_apply_refresh(refresh)
223                    .and_then(|cs| write_txn.commit().map(|()| cs))
224            })
225            .map_err(|err| error!(?err, "Consumer was not able to apply refresh."))?;
226    }
227
228    // Now mark the refresh as complete AND indicate it to the channel.
229    refresh_coord_guard.0 = true;
230    if refresh_coord_guard.1.send(()).await.is_err() {
231        warn!("Unable to signal to caller that refresh has completed.");
232    }
233
234    // Here the coord guard will drop and every other task proceeds.
235
236    info!("Replication refresh was successful.");
237    Ok(Some(addr))
238}
239
240#[instrument(level="debug", skip(tls_connector, idms), fields(eventid=Uuid::new_v4().to_string()))]
241async fn repl_run_consumer(
242    domain: &str,
243    sock_addrs: &[SocketAddr],
244    tls_connector: &SslConnector,
245    automatic_refresh: bool,
246    idms: &IdmServer,
247    consumer_conn_settings: &ConsumerConnSettings,
248) -> Option<SocketAddr> {
249    let (socket_addr, mut supplier_conn) =
250        repl_consumer_connect_supplier(domain, sock_addrs, tls_connector, consumer_conn_settings)
251            .await?;
252
253    // Perform incremental.
254    let consumer_ruv_range = {
255        let consumer_state = idms
256            .proxy_read()
257            .await
258            .and_then(|mut read_txn| read_txn.qs_read.consumer_get_state());
259        match consumer_state {
260            Ok(ruv_range) => ruv_range,
261            Err(err) => {
262                error!(
263                    ?err,
264                    "consumer ruv range could not be accessed, unable to continue."
265                );
266                return None;
267            }
268        }
269    };
270
271    if let Err(err) = supplier_conn
272        .send(ConsumerRequest::Incremental(consumer_ruv_range))
273        .await
274    {
275        error!(?err, "consumer encode error, unable to continue.");
276        return None;
277    }
278
279    let changes = if let Some(codec_msg) = supplier_conn.next().await {
280        match codec_msg {
281            Ok(SupplierResponse::Incremental(changes)) => {
282                // Success - return to bypass the error message.
283                changes
284            }
285            Ok(SupplierResponse::Pong) | Ok(SupplierResponse::Refresh(_)) => {
286                error!("Supplier Response contains invalid state");
287                return None;
288            }
289            Err(err) => {
290                error!(?err, "Consumer decode error, unable to continue.");
291                return None;
292            }
293        }
294    } else {
295        error!("Connection closed");
296        return None;
297    };
298
299    // Now apply the changes if possible
300    let consumer_state = {
301        let ct = duration_from_epoch_now();
302        match idms.proxy_write(ct).await.and_then(|mut write_txn| {
303            write_txn
304                .qs_write
305                .consumer_apply_changes(changes)
306                .and_then(|cs| write_txn.commit().map(|()| cs))
307        }) {
308            Ok(state) => state,
309            Err(err) => {
310                error!(?err, "Consumer was not able to apply changes.");
311                return None;
312            }
313        }
314    };
315
316    match consumer_state {
317        ConsumerState::Ok => {
318            info!("Incremental Replication Success");
319            // return to bypass the failure message.
320            return Some(socket_addr);
321        }
322        ConsumerState::RefreshRequired => {
323            if automatic_refresh {
324                warn!("Consumer is out of date and must be refreshed. This will happen *now*.");
325            } else {
326                error!("Consumer is out of date and must be refreshed. You must manually resolve this situation.");
327                return None;
328            };
329        }
330    }
331
332    if let Err(err) = supplier_conn.send(ConsumerRequest::Refresh).await {
333        error!(?err, "consumer encode error, unable to continue.");
334        return None;
335    }
336
337    let refresh = if let Some(codec_msg) = supplier_conn.next().await {
338        match codec_msg {
339            Ok(SupplierResponse::Refresh(changes)) => {
340                // Success - return to bypass the error message.
341                changes
342            }
343            Ok(SupplierResponse::Pong) | Ok(SupplierResponse::Incremental(_)) => {
344                error!("Supplier Response contains invalid State");
345                return None;
346            }
347            Err(err) => {
348                error!(?err, "consumer decode error, unable to continue.");
349                return None;
350            }
351        }
352    } else {
353        error!("Connection closed");
354        return None;
355    };
356
357    // Now apply the refresh if possible
358    let ct = duration_from_epoch_now();
359    if let Err(err) = idms.proxy_write(ct).await.and_then(|mut write_txn| {
360        write_txn
361            .qs_write
362            .consumer_apply_refresh(refresh)
363            .and_then(|cs| write_txn.commit().map(|()| cs))
364    }) {
365        error!(?err, "consumer was not able to apply refresh.");
366        return None;
367    }
368
369    info!("Replication refresh was successful.");
370    Some(socket_addr)
371}
372
/// Connection tunables shared by every replica (consumer) task.
#[derive(Debug, Clone)]
struct ConsumerConnSettings {
    /// Frame size limit handed to the consumer codec.
    max_frame_bytes: usize,
    /// Period of the interval on which a replica task attempts a replication run.
    task_poll_interval: Duration,
    /// Upper bound on how long a single TCP connect to a supplier address may take.
    replica_connect_timeout: Duration,
}
379
380#[allow(clippy::too_many_arguments)]
381async fn repl_task(
382    origin: Url,
383    client_key: PKey<Private>,
384    client_cert: X509,
385    supplier_cert: X509,
386    consumer_conn_settings: ConsumerConnSettings,
387    mut task_rx: broadcast::Receiver<ReplConsumerCtrl>,
388    automatic_refresh: bool,
389    idms: Arc<IdmServer>,
390) {
391    if origin.scheme() != "repl" {
392        error!("Replica origin is not repl:// - refusing to proceed.");
393        return;
394    }
395
396    let domain = match origin.domain() {
397        Some(d) => d,
398        None => {
399            error!("Replica origin does not have a valid domain name, unable to proceed. Perhaps you tried to use an ip address?");
400            return;
401        }
402    };
403
404    // Setup our tls connector.
405    let mut ssl_builder = match SslConnector::builder(SslMethod::tls_client()) {
406        Ok(sb) => sb,
407        Err(err) => {
408            error!(?err, "Unable to configure tls connector");
409            return;
410        }
411    };
412
413    let setup_client_cert = ssl_builder
414        .set_certificate(&client_cert)
415        .and_then(|_| ssl_builder.set_private_key(&client_key))
416        .and_then(|_| ssl_builder.check_private_key());
417    if let Err(err) = setup_client_cert {
418        error!(?err, "Unable to configure client certificate/key");
419        return;
420    }
421
422    // Add the supplier cert.
423    // ⚠️  note that here we need to build a new cert store. This is because
424    // openssl SslConnector adds the default system cert locations with
425    // the call to ::builder and we *don't* want this. We want our certstore
426    // to pin a single certificate!
427    let mut cert_store = match X509StoreBuilder::new() {
428        Ok(csb) => csb,
429        Err(err) => {
430            error!(?err, "Unable to configure certificate store builder.");
431            return;
432        }
433    };
434
435    if let Err(err) = cert_store.add_cert(supplier_cert) {
436        error!(?err, "Unable to add supplier certificate to cert store");
437        return;
438    }
439
440    let cert_store = cert_store.build();
441    ssl_builder.set_cert_store(cert_store);
442
443    // Configure the expected hostname of the remote.
444    let verify_param = ssl_builder.verify_param_mut();
445    if let Err(err) = verify_param.set_host(domain) {
446        error!(?err, "Unable to set domain name for tls peer verification");
447        return;
448    }
449
450    // Assert the expected supplier certificate is correct and has a valid domain san
451    ssl_builder.set_verify(SslVerifyMode::PEER);
452    let tls_connector = ssl_builder.build();
453
454    let mut repl_interval = interval(consumer_conn_settings.task_poll_interval);
455
456    info!("Replica task for {} has started.", origin);
457
458    // we keep track of the "last known good" socketaddr so we can try that first next time.
459    let mut last_working_address: Option<SocketAddr> = None;
460
461    // Okay, all the parameters are set up. Now we replicate on our interval.
462    loop {
463        // we resolve the DNS entry to the ip:port each time we attempt a connection to avoid stale
464        // DNS issues, ref #3188. If we are unable to resolve the address, we backoff and try again
465        // as in something like docker the address may change frequently.
466        //
467        // Note, if DNS isn't available, we can proceed with the last used working address too. This
468        // prevents DNS (or lack thereof) from causing a replication outage.
469        let mut sorted_socket_addrs = vec![];
470
471        // If the target address worked last time, then let's use it this time!
472        if let Some(addr) = last_working_address {
473            debug!(?last_working_address);
474            sorted_socket_addrs.push(addr);
475        };
476
477        // Default to port 443 if not set in the origin
478        match origin.socket_addrs(|| Some(443)) {
479            Ok(mut socket_addrs) => {
480                // Make every address unique.
481                socket_addrs.sort_unstable();
482                socket_addrs.dedup();
483
484                // The only possible conflict is with the last working address,
485                // so lets just check that.
486                socket_addrs.into_iter().for_each(|addr| {
487                    if Some(&addr) != last_working_address.as_ref() {
488                        // Not already present, append
489                        sorted_socket_addrs.push(addr);
490                    }
491                });
492            }
493            Err(err) => {
494                if let Some(addr) = last_working_address {
495                    warn!(
496                        ?err,
497                        "Unable to resolve '{origin}' to ip:port, using last known working address '{addr}'"
498                    );
499                } else {
500                    warn!(?err, "Unable to resolve '{origin}' to ip:port.");
501                }
502            }
503        };
504
505        if sorted_socket_addrs.is_empty() {
506            warn!(
507                "No replication addresses available, delaying replication operation for '{origin}'"
508            );
509            repl_interval.tick().await;
510            continue;
511        }
512
513        tokio::select! {
514            Ok(task) = task_rx.recv() => {
515                match task {
516                    ReplConsumerCtrl::Stop => break,
517                    ReplConsumerCtrl::Refresh ( refresh_coord ) => {
518                        last_working_address = (repl_run_consumer_refresh(
519                            refresh_coord,
520                            domain,
521                            &sorted_socket_addrs,
522                            &tls_connector,
523                            &idms,
524                            &consumer_conn_settings
525                        )
526                        .await).unwrap_or_default();
527                    }
528                }
529            }
530            _ = repl_interval.tick() => {
531                // Interval passed, attempt a replication run.
532                repl_run_consumer(
533                    domain,
534                    &sorted_socket_addrs,
535                    &tls_connector,
536                    automatic_refresh,
537                    &idms,
538                    &consumer_conn_settings
539                )
540                .await;
541            }
542        }
543    }
544
545    info!("Replica task for {} has stopped.", origin);
546}
547
548#[instrument(level = "debug", skip_all)]
549async fn handle_repl_conn(
550    max_frame_bytes: usize,
551    tcpstream: TcpStream,
552    client_address: SocketAddr,
553    tls_parms: SslAcceptor,
554    idms: Arc<IdmServer>,
555) {
556    debug!(?client_address, "replication client connected 🛫");
557
558    let mut tlsstream = match Ssl::new(tls_parms.context())
559        .and_then(|tls_obj| SslStream::new(tls_obj, tcpstream))
560    {
561        Ok(ta) => ta,
562        Err(err) => {
563            error!(?err, "Replication TLS setup error, disconnecting client");
564            return;
565        }
566    };
567    if let Err(err) = SslStream::accept(Pin::new(&mut tlsstream)).await {
568        error!(?err, "Replication TLS accept error, disconnecting client");
569        return;
570    };
571    let (r, w) = tokio::io::split(tlsstream);
572    let mut r = FramedRead::new(r, codec::SupplierCodec::new(max_frame_bytes));
573    let mut w = FramedWrite::new(w, codec::SupplierCodec::new(max_frame_bytes));
574
575    while let Some(codec_msg) = r.next().await {
576        match codec_msg {
577            Ok(ConsumerRequest::Ping) => {
578                debug!("consumer requested ping");
579                if let Err(err) = w.send(SupplierResponse::Pong).await {
580                    error!(?err, "supplier encode error, unable to continue.");
581                    break;
582                }
583            }
584            Ok(ConsumerRequest::Incremental(consumer_ruv_range)) => {
585                let changes = match idms.proxy_read().await.and_then(|mut read_txn| {
586                    read_txn
587                        .qs_read
588                        .supplier_provide_changes(consumer_ruv_range)
589                }) {
590                    Ok(changes) => changes,
591                    Err(err) => {
592                        error!(?err, "supplier provide changes failed.");
593                        break;
594                    }
595                };
596
597                if let Err(err) = w.send(SupplierResponse::Incremental(changes)).await {
598                    error!(?err, "supplier encode error, unable to continue.");
599                    break;
600                }
601            }
602            Ok(ConsumerRequest::Refresh) => {
603                let changes = match idms
604                    .proxy_read()
605                    .await
606                    .and_then(|mut read_txn| read_txn.qs_read.supplier_provide_refresh())
607                {
608                    Ok(changes) => changes,
609                    Err(err) => {
610                        error!(?err, "supplier provide refresh failed.");
611                        break;
612                    }
613                };
614
615                if let Err(err) = w.send(SupplierResponse::Refresh(changes)).await {
616                    error!(?err, "supplier encode error, unable to continue.");
617                    break;
618                }
619            }
620            Err(err) => {
621                error!(?err, "supplier decode error, unable to continue.");
622                break;
623            }
624        }
625    }
626
627    debug!(?client_address, "replication client disconnected 🛬");
628}
629
630/// This is the main acceptor for the replication server.
631async fn repl_acceptor(
632    listener: TcpListener,
633    idms: Arc<IdmServer>,
634    repl_config: ReplicationConfiguration,
635    mut rx: broadcast::Receiver<CoreAction>,
636    mut ctrl_rx: mpsc::Receiver<ReplCtrl>,
637) {
638    info!("Starting Replication Acceptor ...");
639    // Persistent parts
640    // These all probably need changes later ...
641    let replica_connect_timeout = Duration::from_secs(2);
642    let retry_timeout = Duration::from_secs(60);
643    let max_frame_bytes = 268435456;
644
645    let consumer_conn_settings = ConsumerConnSettings {
646        max_frame_bytes,
647        task_poll_interval: repl_config.get_task_poll_interval(),
648        replica_connect_timeout,
649    };
650
651    // Setup a broadcast to control our tasks.
652    let (task_tx, task_rx1) = broadcast::channel(1);
653    // Note, we drop this task here since each task will re-subscribe. That way the
654    // broadcast doesn't jam up because we aren't draining this task.
655    drop(task_rx1);
656    let mut task_handles = VecDeque::new();
657
658    // Create another broadcast to control the replication tasks and their need to reload.
659
660    // Spawn a KRC communication task?
661
662    // In future we need to update this from the KRC if configured, and we default this
663    // to "empty". But if this map exists in the config, we have to always use that.
664    let replication_node_map = repl_config.manual.clone();
665    let domain_name = match repl_config.origin.domain() {
666        Some(n) => n.to_string(),
667        None => {
668            error!("Unable to start replication, replication origin does not contain a valid domain name.");
669            return;
670        }
671    };
672
673    // This needs to have an event loop that can respond to changes.
674    // For now we just design it to reload ssl if the map changes internally.
675    'event: loop {
676        info!("Starting replication reload ...");
677        // Tell existing tasks to shutdown.
678        // Note: We ignore the result here since an err can occur *if* there are
679        // no tasks currently listening on the channel.
680        info!("Stopping {} Replication Tasks ...", task_handles.len());
681        debug_assert!(task_handles.len() >= task_tx.receiver_count());
682        let _ = task_tx.send(ReplConsumerCtrl::Stop);
683        for task_handle in task_handles.drain(..) {
684            // Let each task join.
685            let res: Result<(), _> = task_handle.await;
686            if res.is_err() {
687                warn!("Failed to join replication task, continuing ...");
688            }
689        }
690
691        // Now we can start to re-load configurations and setup our client tasks
692        // as well.
693
694        // Get the private key / cert.
695        let res = {
696            let ct = duration_from_epoch_now();
697            idms.proxy_write(ct).await.and_then(|mut idms_prox_write| {
698                idms_prox_write
699                    .qs_write
700                    .supplier_get_key_cert(&domain_name)
701                    .and_then(|res| idms_prox_write.commit().map(|()| res))
702            })
703        };
704
705        let (server_key, server_cert) = match res {
706            Ok(r) => r,
707            Err(err) => {
708                error!(?err, "CRITICAL: Unable to access supplier certificate/key.");
709                sleep(retry_timeout).await;
710                continue;
711            }
712        };
713
714        info!(
715            replication_cert_not_before = ?server_cert.not_before(),
716            replication_cert_not_after = ?server_cert.not_after(),
717        );
718
719        let mut client_certs = Vec::new();
720
721        // For each node in the map, either spawn a task to pull from that node,
722        // or setup the node as allowed to pull from us.
723        for (origin, node) in replication_node_map.iter() {
724            // Setup client certs
725            match node {
726                RepNodeConfig::MutualPull {
727                    partner_cert: consumer_cert,
728                    automatic_refresh: _,
729                }
730                | RepNodeConfig::AllowPull { consumer_cert } => {
731                    client_certs.push(consumer_cert.clone())
732                }
733                RepNodeConfig::Pull {
734                    supplier_cert: _,
735                    automatic_refresh: _,
736                } => {}
737            };
738
739            match node {
740                RepNodeConfig::MutualPull {
741                    partner_cert: supplier_cert,
742                    automatic_refresh,
743                }
744                | RepNodeConfig::Pull {
745                    supplier_cert,
746                    automatic_refresh,
747                } => {
748                    let task_rx = task_tx.subscribe();
749
750                    let handle: JoinHandle<()> = tokio::spawn(repl_task(
751                        origin.clone(),
752                        server_key.clone(),
753                        server_cert.clone(),
754                        supplier_cert.clone(),
755                        consumer_conn_settings.clone(),
756                        task_rx,
757                        *automatic_refresh,
758                        idms.clone(),
759                    ));
760
761                    task_handles.push_back(handle);
762                    debug_assert_eq!(task_handles.len(), task_tx.receiver_count());
763                }
764                RepNodeConfig::AllowPull { consumer_cert: _ } => {}
765            };
766        }
767
768        // ⚠️  This section is critical to the security of replication
769        //    Since replication relies on mTLS we MUST ensure these options
770        //    are absolutely correct!
771        //
772        // Setup the TLS builder.
773        let mut tls_builder = match SslAcceptor::mozilla_modern_v5(SslMethod::tls()) {
774            Ok(tls_builder) => tls_builder,
775            Err(err) => {
776                error!(?err, "CRITICAL, unable to create SslAcceptorBuilder.");
777                sleep(retry_timeout).await;
778                continue;
779            }
780        };
781
782        // tls_builder.set_keylog_callback(keylog_cb);
783        if let Err(err) = tls_builder
784            .set_certificate(&server_cert)
785            .and_then(|_| tls_builder.set_private_key(&server_key))
786            .and_then(|_| tls_builder.check_private_key())
787        {
788            error!(?err, "CRITICAL, unable to set server_cert and server key.");
789            sleep(retry_timeout).await;
790            continue;
791        };
792
793        // ⚠️  CRITICAL - ensure that the cert store only has client certs from
794        // the repl map added.
795        let cert_store = tls_builder.cert_store_mut();
796        for client_cert in client_certs.into_iter() {
797            if let Err(err) = cert_store.add_cert(client_cert.clone()) {
798                error!(?err, "CRITICAL, unable to add client certificates.");
799                sleep(retry_timeout).await;
800                continue;
801            }
802        }
803
804        // ⚠️  CRITICAL - Both verifications here are needed. PEER requests
805        // the client cert to be sent. FAIL_IF_NO_PEER_CERT triggers an
806        // error if the cert is NOT present. FAIL_IF_NO_PEER_CERT on its own
807        // DOES NOTHING.
808        let mut verify = SslVerifyMode::PEER;
809        verify.insert(SslVerifyMode::FAIL_IF_NO_PEER_CERT);
810        tls_builder.set_verify(verify);
811
812        let tls_acceptor = tls_builder.build();
813
814        loop {
815            // This is great to diagnose when spans are entered or present and they capture
816            // things incorrectly.
817            // eprintln!("🔥 C ---> {:?}", tracing::Span::current());
818            let eventid = Uuid::new_v4();
819
820            tokio::select! {
821                Ok(action) = rx.recv() => {
822                    match action {
823                        CoreAction::Shutdown => break 'event,
824                    }
825                }
826                Some(ctrl_msg) = ctrl_rx.recv() => {
827                    match ctrl_msg {
828                        ReplCtrl::GetCertificate {
829                            respond
830                        } => {
831                            let _span = debug_span!("supplier_accept_loop", uuid = ?eventid).entered();
832                            if respond.send(server_cert.clone()).is_err() {
833                                warn!("Server certificate was requested, but requsetor disconnected");
834                            } else {
835                                trace!("Sent server certificate via control channel");
836                            }
837                        }
838                        ReplCtrl::RenewCertificate {
839                            respond
840                        } => {
841                            let span = debug_span!("supplier_accept_loop", uuid = ?eventid);
842                            async {
843                                debug!("renewing replication certificate ...");
844                                // Renew the cert.
845                                let res = {
846                                    let ct = duration_from_epoch_now();
847                                    idms.proxy_write(ct).await
848                                        .and_then(|mut idms_prox_write|
849                                    idms_prox_write
850                                        .qs_write
851                                        .supplier_renew_key_cert(&domain_name)
852                                        .and_then(|res| idms_prox_write.commit().map(|()| res))
853                                        )
854                                };
855
856                                let success = res.is_ok();
857
858                                if let Err(err) = res {
859                                    error!(?err, "failed to renew server certificate");
860                                }
861
862                                if respond.send(success).is_err() {
863                                    warn!("Server certificate renewal was requested, but requester disconnected!");
864                                } else {
865                                    trace!("Sent server certificate renewal status via control channel");
866                                }
867                            }
868                            .instrument(span)
869                            .await;
870
871                            // Start a reload.
872                            continue 'event;
873                        }
874                        ReplCtrl::RefreshConsumer {
875                            respond
876                        } => {
877                            // Indicate to consumer tasks that they should do a refresh.
878                            let (tx, rx) = mpsc::channel(1);
879
880                            let refresh_coord = Arc::new(
881                                Mutex::new(
882                                (
883                                    false, tx
884                                )
885                                )
886                            );
887
888                            if task_tx.send(ReplConsumerCtrl::Refresh(refresh_coord)).is_err() {
889                                error!("Unable to begin replication consumer refresh, tasks are unable to be notified.");
890                            }
891
892                            if respond.send(rx).is_err() {
893                                warn!("Replication consumer refresh was requested, but requester disconnected");
894                            } else {
895                                trace!("Sent refresh comms channel to requester");
896                            }
897                        }
898                    }
899                }
900                // Handle accepts.
901                // Handle *reloads*
902                /*
903                _ = reload.recv() => {
904                    info!("Initiating TLS reload");
905                    continue
906                }
907                */
908                accept_result = listener.accept() => {
909                    match accept_result {
910                        Ok((tcpstream, client_socket_addr)) => {
911                            let clone_idms = idms.clone();
912                            let clone_tls_acceptor = tls_acceptor.clone();
913                            // We don't care about the join handle here - once a client connects
914                            // it sticks to whatever ssl settings it had at launch.
915                            tokio::spawn(
916                                handle_repl_conn(max_frame_bytes, tcpstream, client_socket_addr, clone_tls_acceptor, clone_idms)
917                            );
918                        }
919                        Err(e) => {
920                            error!("replication acceptor error, continuing -> {:?}", e);
921                        }
922                    }
923                }
924            } // end select
925              // Continue to poll/loop
926        }
927    }
928    // Shutdown child tasks.
929    info!("Stopping {} Replication Tasks ...", task_handles.len());
930    debug_assert!(task_handles.len() >= task_tx.receiver_count());
931    let _ = task_tx.send(ReplConsumerCtrl::Stop);
932    for task_handle in task_handles.drain(..) {
933        // Let each task join.
934        let res: Result<(), _> = task_handle.await.map(|_| ());
935        if res.is_err() {
936            warn!("Failed to join replication task, continuing ...");
937        }
938    }
939
940    info!("Stopped {}", super::TaskName::Replication);
941}