1use openssl::{
2 pkey::{PKey, Private},
3 ssl::{Ssl, SslAcceptor, SslConnector, SslMethod, SslVerifyMode},
4 x509::{store::X509StoreBuilder, X509},
5};
6use std::collections::VecDeque;
7use std::net::SocketAddr;
8use std::pin::Pin;
9use std::sync::Arc;
10use std::time::Duration;
11use tokio::sync::broadcast;
12use tokio::sync::mpsc;
13use tokio::sync::oneshot;
14use tokio::sync::Mutex;
15use tokio::time::{interval, sleep, timeout};
16use tokio::{
17 net::{TcpListener, TcpStream},
18 task::JoinHandle,
19};
20use tokio_openssl::SslStream;
21use tokio_util::codec::{Framed, FramedRead, FramedWrite};
22use tracing::{error, Instrument};
23use url::Url;
24use uuid::Uuid;
25
26use futures_util::sink::SinkExt;
27use futures_util::stream::StreamExt;
28
29use kanidmd_lib::prelude::duration_from_epoch_now;
30use kanidmd_lib::prelude::IdmServer;
31use kanidmd_lib::repl::proto::ConsumerState;
32use kanidmd_lib::server::QueryServerTransaction;
33
34use crate::CoreAction;
35use config::{RepNodeConfig, ReplicationConfiguration};
36
37use self::codec::{ConsumerRequest, SupplierResponse};
38
39mod codec;
40pub(crate) mod config;
41
42pub(crate) enum ReplCtrl {
43 GetCertificate {
44 respond: oneshot::Sender<X509>,
45 },
46 RenewCertificate {
47 respond: oneshot::Sender<bool>,
48 },
49 RefreshConsumer {
50 respond: oneshot::Sender<mpsc::Receiver<()>>,
51 },
52}
53
54#[derive(Debug, Clone)]
55enum ReplConsumerCtrl {
56 Stop,
57 Refresh(Arc<Mutex<(bool, mpsc::Sender<()>)>>),
58}
59
60pub(crate) async fn create_repl_server(
61 idms: Arc<IdmServer>,
62 repl_config: &ReplicationConfiguration,
63 rx: broadcast::Receiver<CoreAction>,
64) -> Result<(tokio::task::JoinHandle<()>, mpsc::Sender<ReplCtrl>), ()> {
65 let listener = TcpListener::bind(&repl_config.bindaddress)
67 .await
68 .map_err(|e| {
69 error!(
70 "Could not bind to replication address {} -> {:?}",
71 repl_config.bindaddress, e
72 );
73 })?;
74
75 let (ctrl_tx, ctrl_rx) = mpsc::channel(4);
77
78 info!(
80 "Starting replication interface https://{} ...",
81 repl_config.bindaddress
82 );
83 let repl_handle: JoinHandle<()> = tokio::spawn(repl_acceptor(
84 listener,
85 idms,
86 repl_config.clone(),
87 rx,
88 ctrl_rx,
89 ));
90
91 info!("Created replication interface");
92 Ok((repl_handle, ctrl_tx))
93}
94
95#[instrument(level = "debug", skip_all)]
96async fn repl_consumer_connect_supplier(
98 domain: &str,
99 sock_addrs: &[SocketAddr],
100 tls_connector: &SslConnector,
101 consumer_conn_settings: &ConsumerConnSettings,
102) -> Option<(
103 SocketAddr,
104 Framed<SslStream<TcpStream>, codec::ConsumerCodec>,
105)> {
106 for sock_addr in sock_addrs {
108 debug!(
109 "Attempting to connect to {} replica via {}",
110 domain, sock_addr
111 );
112
113 let tcpstream = match timeout(
114 consumer_conn_settings.replica_connect_timeout,
115 TcpStream::connect(sock_addr),
116 )
117 .await
118 {
119 Ok(Ok(tc)) => {
120 trace!("Connection established to peer on {:?}", sock_addr);
121 tc
122 }
123 Ok(Err(err)) => {
124 debug!(?err, "Failed to connect to {}", sock_addr);
125 continue;
126 }
127 Err(_) => {
128 debug!("Timeout connecting to {}", sock_addr);
129 continue;
130 }
131 };
132
133 let mut tlsstream = match Ssl::new(tls_connector.context())
134 .and_then(|tls_obj| SslStream::new(tls_obj, tcpstream))
135 {
136 Ok(ta) => ta,
137 Err(e) => {
138 error!("Replication client TLS setup error, continuing -> {:?}", e);
139 continue;
140 }
141 };
142
143 if let Err(e) = SslStream::connect(Pin::new(&mut tlsstream)).await {
144 error!("Replication client TLS accept error, continuing -> {:?}", e);
145 continue;
146 };
147
148 let supplier_conn = Framed::new(
149 tlsstream,
150 codec::ConsumerCodec::new(consumer_conn_settings.max_frame_bytes),
151 );
152 return Some((sock_addr.to_owned(), supplier_conn));
154 }
155
156 error!(
157 "Unable to connect to supplier, tried to connect to {:?}",
158 sock_addrs
159 );
160 None
161}
162
163#[instrument(level="info", skip(refresh_coord, tls_connector, idms), fields(uuid=Uuid::new_v4().to_string()))]
165async fn repl_run_consumer_refresh(
166 refresh_coord: Arc<Mutex<(bool, mpsc::Sender<()>)>>,
167 domain: &str,
168 sock_addrs: &[SocketAddr],
169 tls_connector: &SslConnector,
170 idms: &IdmServer,
171 consumer_conn_settings: &ConsumerConnSettings,
172) -> Result<Option<SocketAddr>, ()> {
173 let mut refresh_coord_guard = refresh_coord.lock().await;
177
178 if refresh_coord_guard.0 {
180 trace!("Refresh already completed by another task, return.");
181 return Ok(None);
182 }
183
184 let (addr, mut supplier_conn) =
186 repl_consumer_connect_supplier(domain, sock_addrs, tls_connector, consumer_conn_settings)
187 .await
188 .ok_or(())?;
189
190 supplier_conn
193 .send(ConsumerRequest::Refresh)
194 .await
195 .map_err(|err| error!(?err, "consumer encode error, unable to continue."))?;
196
197 let refresh = if let Some(codec_msg) = supplier_conn.next().await {
198 match codec_msg.map_err(|err| error!(?err, "Consumer decode error, unable to continue."))? {
199 SupplierResponse::Refresh(changes) => {
200 changes
202 }
203 SupplierResponse::Pong | SupplierResponse::Incremental(_) => {
204 error!("Supplier Response contains invalid State");
205 return Err(());
206 }
207 }
208 } else {
209 error!("Connection closed");
210 return Err(());
211 };
212
213 {
215 let ct = duration_from_epoch_now();
217 idms.proxy_write(ct)
218 .await
219 .and_then(|mut write_txn| {
220 write_txn
221 .qs_write
222 .consumer_apply_refresh(refresh)
223 .and_then(|cs| write_txn.commit().map(|()| cs))
224 })
225 .map_err(|err| error!(?err, "Consumer was not able to apply refresh."))?;
226 }
227
228 refresh_coord_guard.0 = true;
230 if refresh_coord_guard.1.send(()).await.is_err() {
231 warn!("Unable to signal to caller that refresh has completed.");
232 }
233
234 info!("Replication refresh was successful.");
237 Ok(Some(addr))
238}
239
240#[instrument(level="debug", skip(tls_connector, idms), fields(eventid=Uuid::new_v4().to_string()))]
241async fn repl_run_consumer(
242 domain: &str,
243 sock_addrs: &[SocketAddr],
244 tls_connector: &SslConnector,
245 automatic_refresh: bool,
246 idms: &IdmServer,
247 consumer_conn_settings: &ConsumerConnSettings,
248) -> Option<SocketAddr> {
249 let (socket_addr, mut supplier_conn) =
250 repl_consumer_connect_supplier(domain, sock_addrs, tls_connector, consumer_conn_settings)
251 .await?;
252
253 let consumer_ruv_range = {
255 let consumer_state = idms
256 .proxy_read()
257 .await
258 .and_then(|mut read_txn| read_txn.qs_read.consumer_get_state());
259 match consumer_state {
260 Ok(ruv_range) => ruv_range,
261 Err(err) => {
262 error!(
263 ?err,
264 "consumer ruv range could not be accessed, unable to continue."
265 );
266 return None;
267 }
268 }
269 };
270
271 if let Err(err) = supplier_conn
272 .send(ConsumerRequest::Incremental(consumer_ruv_range))
273 .await
274 {
275 error!(?err, "consumer encode error, unable to continue.");
276 return None;
277 }
278
279 let changes = if let Some(codec_msg) = supplier_conn.next().await {
280 match codec_msg {
281 Ok(SupplierResponse::Incremental(changes)) => {
282 changes
284 }
285 Ok(SupplierResponse::Pong) | Ok(SupplierResponse::Refresh(_)) => {
286 error!("Supplier Response contains invalid state");
287 return None;
288 }
289 Err(err) => {
290 error!(?err, "Consumer decode error, unable to continue.");
291 return None;
292 }
293 }
294 } else {
295 error!("Connection closed");
296 return None;
297 };
298
299 let consumer_state = {
301 let ct = duration_from_epoch_now();
302 match idms.proxy_write(ct).await.and_then(|mut write_txn| {
303 write_txn
304 .qs_write
305 .consumer_apply_changes(changes)
306 .and_then(|cs| write_txn.commit().map(|()| cs))
307 }) {
308 Ok(state) => state,
309 Err(err) => {
310 error!(?err, "Consumer was not able to apply changes.");
311 return None;
312 }
313 }
314 };
315
316 match consumer_state {
317 ConsumerState::Ok => {
318 info!("Incremental Replication Success");
319 return Some(socket_addr);
321 }
322 ConsumerState::RefreshRequired => {
323 if automatic_refresh {
324 warn!("Consumer is out of date and must be refreshed. This will happen *now*.");
325 } else {
326 error!("Consumer is out of date and must be refreshed. You must manually resolve this situation.");
327 return None;
328 };
329 }
330 }
331
332 if let Err(err) = supplier_conn.send(ConsumerRequest::Refresh).await {
333 error!(?err, "consumer encode error, unable to continue.");
334 return None;
335 }
336
337 let refresh = if let Some(codec_msg) = supplier_conn.next().await {
338 match codec_msg {
339 Ok(SupplierResponse::Refresh(changes)) => {
340 changes
342 }
343 Ok(SupplierResponse::Pong) | Ok(SupplierResponse::Incremental(_)) => {
344 error!("Supplier Response contains invalid State");
345 return None;
346 }
347 Err(err) => {
348 error!(?err, "consumer decode error, unable to continue.");
349 return None;
350 }
351 }
352 } else {
353 error!("Connection closed");
354 return None;
355 };
356
357 let ct = duration_from_epoch_now();
359 if let Err(err) = idms.proxy_write(ct).await.and_then(|mut write_txn| {
360 write_txn
361 .qs_write
362 .consumer_apply_refresh(refresh)
363 .and_then(|cs| write_txn.commit().map(|()| cs))
364 }) {
365 error!(?err, "consumer was not able to apply refresh.");
366 return None;
367 }
368
369 info!("Replication refresh was successful.");
370 Some(socket_addr)
371}
372
373#[derive(Debug, Clone)]
374struct ConsumerConnSettings {
375 max_frame_bytes: usize,
376 task_poll_interval: Duration,
377 replica_connect_timeout: Duration,
378}
379
380#[allow(clippy::too_many_arguments)]
381async fn repl_task(
382 origin: Url,
383 client_key: PKey<Private>,
384 client_cert: X509,
385 supplier_cert: X509,
386 consumer_conn_settings: ConsumerConnSettings,
387 mut task_rx: broadcast::Receiver<ReplConsumerCtrl>,
388 automatic_refresh: bool,
389 idms: Arc<IdmServer>,
390) {
391 if origin.scheme() != "repl" {
392 error!("Replica origin is not repl:// - refusing to proceed.");
393 return;
394 }
395
396 let domain = match origin.domain() {
397 Some(d) => d,
398 None => {
399 error!("Replica origin does not have a valid domain name, unable to proceed. Perhaps you tried to use an ip address?");
400 return;
401 }
402 };
403
404 let mut ssl_builder = match SslConnector::builder(SslMethod::tls_client()) {
406 Ok(sb) => sb,
407 Err(err) => {
408 error!(?err, "Unable to configure tls connector");
409 return;
410 }
411 };
412
413 let setup_client_cert = ssl_builder
414 .set_certificate(&client_cert)
415 .and_then(|_| ssl_builder.set_private_key(&client_key))
416 .and_then(|_| ssl_builder.check_private_key());
417 if let Err(err) = setup_client_cert {
418 error!(?err, "Unable to configure client certificate/key");
419 return;
420 }
421
422 let mut cert_store = match X509StoreBuilder::new() {
428 Ok(csb) => csb,
429 Err(err) => {
430 error!(?err, "Unable to configure certificate store builder.");
431 return;
432 }
433 };
434
435 if let Err(err) = cert_store.add_cert(supplier_cert) {
436 error!(?err, "Unable to add supplier certificate to cert store");
437 return;
438 }
439
440 let cert_store = cert_store.build();
441 ssl_builder.set_cert_store(cert_store);
442
443 let verify_param = ssl_builder.verify_param_mut();
445 if let Err(err) = verify_param.set_host(domain) {
446 error!(?err, "Unable to set domain name for tls peer verification");
447 return;
448 }
449
450 ssl_builder.set_verify(SslVerifyMode::PEER);
452 let tls_connector = ssl_builder.build();
453
454 let mut repl_interval = interval(consumer_conn_settings.task_poll_interval);
455
456 info!("Replica task for {} has started.", origin);
457
458 let mut last_working_address: Option<SocketAddr> = None;
460
461 loop {
463 let mut sorted_socket_addrs = vec![];
470
471 if let Some(addr) = last_working_address {
473 debug!(?last_working_address);
474 sorted_socket_addrs.push(addr);
475 };
476
477 match origin.socket_addrs(|| Some(443)) {
479 Ok(mut socket_addrs) => {
480 socket_addrs.sort_unstable();
482 socket_addrs.dedup();
483
484 socket_addrs.into_iter().for_each(|addr| {
487 if Some(&addr) != last_working_address.as_ref() {
488 sorted_socket_addrs.push(addr);
490 }
491 });
492 }
493 Err(err) => {
494 if let Some(addr) = last_working_address {
495 warn!(
496 ?err,
497 "Unable to resolve '{origin}' to ip:port, using last known working address '{addr}'"
498 );
499 } else {
500 warn!(?err, "Unable to resolve '{origin}' to ip:port.");
501 }
502 }
503 };
504
505 if sorted_socket_addrs.is_empty() {
506 warn!(
507 "No replication addresses available, delaying replication operation for '{origin}'"
508 );
509 repl_interval.tick().await;
510 continue;
511 }
512
513 tokio::select! {
514 Ok(task) = task_rx.recv() => {
515 match task {
516 ReplConsumerCtrl::Stop => break,
517 ReplConsumerCtrl::Refresh ( refresh_coord ) => {
518 last_working_address = (repl_run_consumer_refresh(
519 refresh_coord,
520 domain,
521 &sorted_socket_addrs,
522 &tls_connector,
523 &idms,
524 &consumer_conn_settings
525 )
526 .await).unwrap_or_default();
527 }
528 }
529 }
530 _ = repl_interval.tick() => {
531 repl_run_consumer(
533 domain,
534 &sorted_socket_addrs,
535 &tls_connector,
536 automatic_refresh,
537 &idms,
538 &consumer_conn_settings
539 )
540 .await;
541 }
542 }
543 }
544
545 info!("Replica task for {} has stopped.", origin);
546}
547
548#[instrument(level = "debug", skip_all)]
549async fn handle_repl_conn(
550 max_frame_bytes: usize,
551 tcpstream: TcpStream,
552 client_address: SocketAddr,
553 tls_parms: SslAcceptor,
554 idms: Arc<IdmServer>,
555) {
556 debug!(?client_address, "replication client connected 🛫");
557
558 let mut tlsstream = match Ssl::new(tls_parms.context())
559 .and_then(|tls_obj| SslStream::new(tls_obj, tcpstream))
560 {
561 Ok(ta) => ta,
562 Err(err) => {
563 error!(?err, "Replication TLS setup error, disconnecting client");
564 return;
565 }
566 };
567 if let Err(err) = SslStream::accept(Pin::new(&mut tlsstream)).await {
568 error!(?err, "Replication TLS accept error, disconnecting client");
569 return;
570 };
571 let (r, w) = tokio::io::split(tlsstream);
572 let mut r = FramedRead::new(r, codec::SupplierCodec::new(max_frame_bytes));
573 let mut w = FramedWrite::new(w, codec::SupplierCodec::new(max_frame_bytes));
574
575 while let Some(codec_msg) = r.next().await {
576 match codec_msg {
577 Ok(ConsumerRequest::Ping) => {
578 debug!("consumer requested ping");
579 if let Err(err) = w.send(SupplierResponse::Pong).await {
580 error!(?err, "supplier encode error, unable to continue.");
581 break;
582 }
583 }
584 Ok(ConsumerRequest::Incremental(consumer_ruv_range)) => {
585 let changes = match idms.proxy_read().await.and_then(|mut read_txn| {
586 read_txn
587 .qs_read
588 .supplier_provide_changes(consumer_ruv_range)
589 }) {
590 Ok(changes) => changes,
591 Err(err) => {
592 error!(?err, "supplier provide changes failed.");
593 break;
594 }
595 };
596
597 if let Err(err) = w.send(SupplierResponse::Incremental(changes)).await {
598 error!(?err, "supplier encode error, unable to continue.");
599 break;
600 }
601 }
602 Ok(ConsumerRequest::Refresh) => {
603 let changes = match idms
604 .proxy_read()
605 .await
606 .and_then(|mut read_txn| read_txn.qs_read.supplier_provide_refresh())
607 {
608 Ok(changes) => changes,
609 Err(err) => {
610 error!(?err, "supplier provide refresh failed.");
611 break;
612 }
613 };
614
615 if let Err(err) = w.send(SupplierResponse::Refresh(changes)).await {
616 error!(?err, "supplier encode error, unable to continue.");
617 break;
618 }
619 }
620 Err(err) => {
621 error!(?err, "supplier decode error, unable to continue.");
622 break;
623 }
624 }
625 }
626
627 debug!(?client_address, "replication client disconnected 🛬");
628}
629
630async fn repl_acceptor(
632 listener: TcpListener,
633 idms: Arc<IdmServer>,
634 repl_config: ReplicationConfiguration,
635 mut rx: broadcast::Receiver<CoreAction>,
636 mut ctrl_rx: mpsc::Receiver<ReplCtrl>,
637) {
638 info!("Starting Replication Acceptor ...");
639 let replica_connect_timeout = Duration::from_secs(2);
642 let retry_timeout = Duration::from_secs(60);
643 let max_frame_bytes = 268435456;
644
645 let consumer_conn_settings = ConsumerConnSettings {
646 max_frame_bytes,
647 task_poll_interval: repl_config.get_task_poll_interval(),
648 replica_connect_timeout,
649 };
650
651 let (task_tx, task_rx1) = broadcast::channel(1);
653 drop(task_rx1);
656 let mut task_handles = VecDeque::new();
657
658 let replication_node_map = repl_config.manual.clone();
665 let domain_name = match repl_config.origin.domain() {
666 Some(n) => n.to_string(),
667 None => {
668 error!("Unable to start replication, replication origin does not contain a valid domain name.");
669 return;
670 }
671 };
672
673 'event: loop {
676 info!("Starting replication reload ...");
677 info!("Stopping {} Replication Tasks ...", task_handles.len());
681 debug_assert!(task_handles.len() >= task_tx.receiver_count());
682 let _ = task_tx.send(ReplConsumerCtrl::Stop);
683 for task_handle in task_handles.drain(..) {
684 let res: Result<(), _> = task_handle.await;
686 if res.is_err() {
687 warn!("Failed to join replication task, continuing ...");
688 }
689 }
690
691 let res = {
696 let ct = duration_from_epoch_now();
697 idms.proxy_write(ct).await.and_then(|mut idms_prox_write| {
698 idms_prox_write
699 .qs_write
700 .supplier_get_key_cert(&domain_name)
701 .and_then(|res| idms_prox_write.commit().map(|()| res))
702 })
703 };
704
705 let (server_key, server_cert) = match res {
706 Ok(r) => r,
707 Err(err) => {
708 error!(?err, "CRITICAL: Unable to access supplier certificate/key.");
709 sleep(retry_timeout).await;
710 continue;
711 }
712 };
713
714 info!(
715 replication_cert_not_before = ?server_cert.not_before(),
716 replication_cert_not_after = ?server_cert.not_after(),
717 );
718
719 let mut client_certs = Vec::new();
720
721 for (origin, node) in replication_node_map.iter() {
724 match node {
726 RepNodeConfig::MutualPull {
727 partner_cert: consumer_cert,
728 automatic_refresh: _,
729 }
730 | RepNodeConfig::AllowPull { consumer_cert } => {
731 client_certs.push(consumer_cert.clone())
732 }
733 RepNodeConfig::Pull {
734 supplier_cert: _,
735 automatic_refresh: _,
736 } => {}
737 };
738
739 match node {
740 RepNodeConfig::MutualPull {
741 partner_cert: supplier_cert,
742 automatic_refresh,
743 }
744 | RepNodeConfig::Pull {
745 supplier_cert,
746 automatic_refresh,
747 } => {
748 let task_rx = task_tx.subscribe();
749
750 let handle: JoinHandle<()> = tokio::spawn(repl_task(
751 origin.clone(),
752 server_key.clone(),
753 server_cert.clone(),
754 supplier_cert.clone(),
755 consumer_conn_settings.clone(),
756 task_rx,
757 *automatic_refresh,
758 idms.clone(),
759 ));
760
761 task_handles.push_back(handle);
762 debug_assert_eq!(task_handles.len(), task_tx.receiver_count());
763 }
764 RepNodeConfig::AllowPull { consumer_cert: _ } => {}
765 };
766 }
767
768 let mut tls_builder = match SslAcceptor::mozilla_modern_v5(SslMethod::tls()) {
774 Ok(tls_builder) => tls_builder,
775 Err(err) => {
776 error!(?err, "CRITICAL, unable to create SslAcceptorBuilder.");
777 sleep(retry_timeout).await;
778 continue;
779 }
780 };
781
782 if let Err(err) = tls_builder
784 .set_certificate(&server_cert)
785 .and_then(|_| tls_builder.set_private_key(&server_key))
786 .and_then(|_| tls_builder.check_private_key())
787 {
788 error!(?err, "CRITICAL, unable to set server_cert and server key.");
789 sleep(retry_timeout).await;
790 continue;
791 };
792
793 let cert_store = tls_builder.cert_store_mut();
796 for client_cert in client_certs.into_iter() {
797 if let Err(err) = cert_store.add_cert(client_cert.clone()) {
798 error!(?err, "CRITICAL, unable to add client certificates.");
799 sleep(retry_timeout).await;
800 continue;
801 }
802 }
803
804 let mut verify = SslVerifyMode::PEER;
809 verify.insert(SslVerifyMode::FAIL_IF_NO_PEER_CERT);
810 tls_builder.set_verify(verify);
811
812 let tls_acceptor = tls_builder.build();
813
814 loop {
815 let eventid = Uuid::new_v4();
819
820 tokio::select! {
821 Ok(action) = rx.recv() => {
822 match action {
823 CoreAction::Shutdown => break 'event,
824 }
825 }
826 Some(ctrl_msg) = ctrl_rx.recv() => {
827 match ctrl_msg {
828 ReplCtrl::GetCertificate {
829 respond
830 } => {
831 let _span = debug_span!("supplier_accept_loop", uuid = ?eventid).entered();
832 if respond.send(server_cert.clone()).is_err() {
833 warn!("Server certificate was requested, but requsetor disconnected");
834 } else {
835 trace!("Sent server certificate via control channel");
836 }
837 }
838 ReplCtrl::RenewCertificate {
839 respond
840 } => {
841 let span = debug_span!("supplier_accept_loop", uuid = ?eventid);
842 async {
843 debug!("renewing replication certificate ...");
844 let res = {
846 let ct = duration_from_epoch_now();
847 idms.proxy_write(ct).await
848 .and_then(|mut idms_prox_write|
849 idms_prox_write
850 .qs_write
851 .supplier_renew_key_cert(&domain_name)
852 .and_then(|res| idms_prox_write.commit().map(|()| res))
853 )
854 };
855
856 let success = res.is_ok();
857
858 if let Err(err) = res {
859 error!(?err, "failed to renew server certificate");
860 }
861
862 if respond.send(success).is_err() {
863 warn!("Server certificate renewal was requested, but requester disconnected!");
864 } else {
865 trace!("Sent server certificate renewal status via control channel");
866 }
867 }
868 .instrument(span)
869 .await;
870
871 continue 'event;
873 }
874 ReplCtrl::RefreshConsumer {
875 respond
876 } => {
877 let (tx, rx) = mpsc::channel(1);
879
880 let refresh_coord = Arc::new(
881 Mutex::new(
882 (
883 false, tx
884 )
885 )
886 );
887
888 if task_tx.send(ReplConsumerCtrl::Refresh(refresh_coord)).is_err() {
889 error!("Unable to begin replication consumer refresh, tasks are unable to be notified.");
890 }
891
892 if respond.send(rx).is_err() {
893 warn!("Replication consumer refresh was requested, but requester disconnected");
894 } else {
895 trace!("Sent refresh comms channel to requester");
896 }
897 }
898 }
899 }
900 accept_result = listener.accept() => {
909 match accept_result {
910 Ok((tcpstream, client_socket_addr)) => {
911 let clone_idms = idms.clone();
912 let clone_tls_acceptor = tls_acceptor.clone();
913 tokio::spawn(
916 handle_repl_conn(max_frame_bytes, tcpstream, client_socket_addr, clone_tls_acceptor, clone_idms)
917 );
918 }
919 Err(e) => {
920 error!("replication acceptor error, continuing -> {:?}", e);
921 }
922 }
923 }
924 } }
927 }
928 info!("Stopping {} Replication Tasks ...", task_handles.len());
930 debug_assert!(task_handles.len() >= task_tx.receiver_count());
931 let _ = task_tx.send(ReplConsumerCtrl::Stop);
932 for task_handle in task_handles.drain(..) {
933 let res: Result<(), _> = task_handle.await.map(|_| ());
935 if res.is_err() {
936 warn!("Failed to join replication task, continuing ...");
937 }
938 }
939
940 info!("Stopped {}", super::TaskName::Replication);
941}