1use openssl::{
2 pkey::{PKey, Private},
3 ssl::{Ssl, SslAcceptor, SslConnector, SslMethod, SslVerifyMode},
4 x509::{store::X509StoreBuilder, X509},
5};
6use std::collections::VecDeque;
7use std::net::SocketAddr;
8use std::pin::Pin;
9use std::sync::Arc;
10use std::time::Duration;
11use tokio::sync::broadcast;
12use tokio::sync::mpsc;
13use tokio::sync::oneshot;
14use tokio::sync::Mutex;
15use tokio::time::{interval, sleep, timeout};
16use tokio::{
17 net::{TcpListener, TcpStream},
18 task::JoinHandle,
19};
20use tokio_openssl::SslStream;
21use tokio_util::codec::{Framed, FramedRead, FramedWrite};
22use tracing::{error, Instrument};
23use url::Url;
24use uuid::Uuid;
25
26use futures_util::sink::SinkExt;
27use futures_util::stream::StreamExt;
28
29use kanidmd_lib::prelude::duration_from_epoch_now;
30use kanidmd_lib::prelude::IdmServer;
31use kanidmd_lib::repl::proto::ConsumerState;
32use kanidmd_lib::server::QueryServerTransaction;
33
34use crate::CoreAction;
35use config::{RepNodeConfig, ReplicationConfiguration};
36
37use self::codec::{ConsumerRequest, SupplierResponse};
38
39mod codec;
40pub(crate) mod config;
41
42pub(crate) enum ReplCtrl {
43 GetCertificate {
44 respond: oneshot::Sender<X509>,
45 },
46 RenewCertificate {
47 respond: oneshot::Sender<bool>,
48 },
49 RefreshConsumer {
50 respond: oneshot::Sender<mpsc::Receiver<()>>,
51 },
52}
53
54#[derive(Debug, Clone)]
55enum ReplConsumerCtrl {
56 Stop,
57 Refresh(Arc<Mutex<(bool, mpsc::Sender<()>)>>),
58}
59
60pub(crate) async fn create_repl_server(
61 idms: Arc<IdmServer>,
62 repl_config: &ReplicationConfiguration,
63 rx: broadcast::Receiver<CoreAction>,
64) -> Result<(tokio::task::JoinHandle<()>, mpsc::Sender<ReplCtrl>), ()> {
65 let listener = TcpListener::bind(&repl_config.bindaddress)
67 .await
68 .map_err(|e| {
69 error!(
70 "Could not bind to replication address {} -> {:?}",
71 repl_config.bindaddress, e
72 );
73 })?;
74
75 let (ctrl_tx, ctrl_rx) = mpsc::channel(4);
77
78 info!(
80 "Starting replication interface https://{} ...",
81 repl_config.bindaddress
82 );
83 let repl_handle: JoinHandle<()> = tokio::spawn(repl_acceptor(
84 listener,
85 idms,
86 repl_config.clone(),
87 rx,
88 ctrl_rx,
89 ));
90
91 info!("Created replication interface");
92 Ok((repl_handle, ctrl_tx))
93}
94
95#[instrument(level = "info", skip_all)]
96async fn repl_consumer_connect_supplier(
98 domain: &str,
99 sock_addrs: &[SocketAddr],
100 tls_connector: &SslConnector,
101 consumer_conn_settings: &ConsumerConnSettings,
102) -> Option<(
103 SocketAddr,
104 Framed<SslStream<TcpStream>, codec::ConsumerCodec>,
105)> {
106 for sock_addr in sock_addrs {
108 debug!(
109 "Attempting to connect to {} replica via {}",
110 domain, sock_addr
111 );
112
113 let tcpstream = match timeout(
114 consumer_conn_settings.replica_connect_timeout,
115 TcpStream::connect(sock_addr),
116 )
117 .await
118 {
119 Ok(Ok(tc)) => tc,
120 Ok(Err(err)) => {
121 debug!(?err, "Failed to connect to {}", sock_addr);
122 continue;
123 }
124 Err(_) => {
125 debug!("Timeout connecting to {}", sock_addr);
126 continue;
127 }
128 };
129
130 trace!("Connection established to peer on {:?}", sock_addr);
131
132 let mut tlsstream = match Ssl::new(tls_connector.context())
133 .and_then(|tls_obj| SslStream::new(tls_obj, tcpstream))
134 {
135 Ok(ta) => ta,
136 Err(e) => {
137 error!("Replication client TLS setup error, continuing -> {:?}", e);
138 continue;
139 }
140 };
141
142 if let Err(e) = SslStream::connect(Pin::new(&mut tlsstream)).await {
143 error!("Replication client TLS accept error, continuing -> {:?}", e);
144 continue;
145 };
146
147 let supplier_conn = Framed::new(
148 tlsstream,
149 codec::ConsumerCodec::new(consumer_conn_settings.max_frame_bytes),
150 );
151 return Some((sock_addr.to_owned(), supplier_conn));
153 }
154
155 error!(
156 "Unable to connect to supplier, tried to connect to {:?}",
157 sock_addrs
158 );
159 None
160}
161
162#[instrument(level="info", skip(refresh_coord, tls_connector, idms), fields(uuid=Uuid::new_v4().to_string()))]
164async fn repl_run_consumer_refresh(
165 refresh_coord: Arc<Mutex<(bool, mpsc::Sender<()>)>>,
166 domain: &str,
167 sock_addrs: &[SocketAddr],
168 tls_connector: &SslConnector,
169 idms: &IdmServer,
170 consumer_conn_settings: &ConsumerConnSettings,
171) -> Result<Option<SocketAddr>, ()> {
172 let mut refresh_coord_guard = refresh_coord.lock().await;
176
177 if refresh_coord_guard.0 {
179 trace!("Refresh already completed by another task, return.");
180 return Ok(None);
181 }
182
183 let (addr, mut supplier_conn) =
185 repl_consumer_connect_supplier(domain, sock_addrs, tls_connector, consumer_conn_settings)
186 .await
187 .ok_or(())?;
188
189 supplier_conn
192 .send(ConsumerRequest::Refresh)
193 .await
194 .map_err(|err| error!(?err, "consumer encode error, unable to continue."))?;
195
196 let refresh = if let Some(codec_msg) = supplier_conn.next().await {
197 match codec_msg.map_err(|err| error!(?err, "Consumer decode error, unable to continue."))? {
198 SupplierResponse::Refresh(changes) => {
199 changes
201 }
202 SupplierResponse::Pong | SupplierResponse::Incremental(_) => {
203 error!("Supplier Response contains invalid State");
204 return Err(());
205 }
206 }
207 } else {
208 error!("Connection closed");
209 return Err(());
210 };
211
212 {
214 let ct = duration_from_epoch_now();
216 idms.proxy_write(ct)
217 .await
218 .and_then(|mut write_txn| {
219 write_txn
220 .qs_write
221 .consumer_apply_refresh(refresh)
222 .and_then(|cs| write_txn.commit().map(|()| cs))
223 })
224 .map_err(|err| error!(?err, "Consumer was not able to apply refresh."))?;
225 }
226
227 refresh_coord_guard.0 = true;
229 if refresh_coord_guard.1.send(()).await.is_err() {
230 warn!("Unable to signal to caller that refresh has completed.");
231 }
232
233 info!("Replication refresh was successful.");
236 Ok(Some(addr))
237}
238
239#[instrument(level="info", skip(tls_connector, idms), fields(eventid=Uuid::new_v4().to_string()))]
240async fn repl_run_consumer(
241 domain: &str,
242 sock_addrs: &[SocketAddr],
243 tls_connector: &SslConnector,
244 automatic_refresh: bool,
245 idms: &IdmServer,
246 consumer_conn_settings: &ConsumerConnSettings,
247) -> Option<SocketAddr> {
248 let (socket_addr, mut supplier_conn) =
249 repl_consumer_connect_supplier(domain, sock_addrs, tls_connector, consumer_conn_settings)
250 .await?;
251
252 let consumer_ruv_range = {
254 let consumer_state = idms
255 .proxy_read()
256 .await
257 .and_then(|mut read_txn| read_txn.qs_read.consumer_get_state());
258 match consumer_state {
259 Ok(ruv_range) => ruv_range,
260 Err(err) => {
261 error!(
262 ?err,
263 "consumer ruv range could not be accessed, unable to continue."
264 );
265 return None;
266 }
267 }
268 };
269
270 if let Err(err) = supplier_conn
271 .send(ConsumerRequest::Incremental(consumer_ruv_range))
272 .await
273 {
274 error!(?err, "consumer encode error, unable to continue.");
275 return None;
276 }
277
278 let changes = if let Some(codec_msg) = supplier_conn.next().await {
279 match codec_msg {
280 Ok(SupplierResponse::Incremental(changes)) => {
281 changes
283 }
284 Ok(SupplierResponse::Pong) | Ok(SupplierResponse::Refresh(_)) => {
285 error!("Supplier Response contains invalid State");
286 return None;
287 }
288 Err(err) => {
289 error!(?err, "consumer decode error, unable to continue.");
290 return None;
291 }
292 }
293 } else {
294 error!("Connection closed");
295 return None;
296 };
297
298 let consumer_state = {
300 let ct = duration_from_epoch_now();
301 match idms.proxy_write(ct).await.and_then(|mut write_txn| {
302 write_txn
303 .qs_write
304 .consumer_apply_changes(changes)
305 .and_then(|cs| write_txn.commit().map(|()| cs))
306 }) {
307 Ok(state) => state,
308 Err(err) => {
309 error!(?err, "consumer was not able to apply changes.");
310 return None;
311 }
312 }
313 };
314
315 match consumer_state {
316 ConsumerState::Ok => {
317 info!("Incremental Replication Success");
318 return Some(socket_addr);
320 }
321 ConsumerState::RefreshRequired => {
322 if automatic_refresh {
323 warn!("Consumer is out of date and must be refreshed. This will happen *now*.");
324 } else {
325 error!("Consumer is out of date and must be refreshed. You must manually resolve this situation.");
326 return None;
327 };
328 }
329 }
330
331 if let Err(err) = supplier_conn.send(ConsumerRequest::Refresh).await {
332 error!(?err, "consumer encode error, unable to continue.");
333 return None;
334 }
335
336 let refresh = if let Some(codec_msg) = supplier_conn.next().await {
337 match codec_msg {
338 Ok(SupplierResponse::Refresh(changes)) => {
339 changes
341 }
342 Ok(SupplierResponse::Pong) | Ok(SupplierResponse::Incremental(_)) => {
343 error!("Supplier Response contains invalid State");
344 return None;
345 }
346 Err(err) => {
347 error!(?err, "consumer decode error, unable to continue.");
348 return None;
349 }
350 }
351 } else {
352 error!("Connection closed");
353 return None;
354 };
355
356 let ct = duration_from_epoch_now();
358 if let Err(err) = idms.proxy_write(ct).await.and_then(|mut write_txn| {
359 write_txn
360 .qs_write
361 .consumer_apply_refresh(refresh)
362 .and_then(|cs| write_txn.commit().map(|()| cs))
363 }) {
364 error!(?err, "consumer was not able to apply refresh.");
365 return None;
366 }
367
368 warn!("Replication refresh was successful.");
369 Some(socket_addr)
370}
371
372#[derive(Debug, Clone)]
373struct ConsumerConnSettings {
374 max_frame_bytes: usize,
375 task_poll_interval: Duration,
376 replica_connect_timeout: Duration,
377}
378
379#[allow(clippy::too_many_arguments)]
380async fn repl_task(
381 origin: Url,
382 client_key: PKey<Private>,
383 client_cert: X509,
384 supplier_cert: X509,
385 consumer_conn_settings: ConsumerConnSettings,
386 mut task_rx: broadcast::Receiver<ReplConsumerCtrl>,
387 automatic_refresh: bool,
388 idms: Arc<IdmServer>,
389) {
390 if origin.scheme() != "repl" {
391 error!("Replica origin is not repl:// - refusing to proceed.");
392 return;
393 }
394
395 let domain = match origin.domain() {
396 Some(d) => d,
397 None => {
398 error!("Replica origin does not have a valid domain name, unable to proceed. Perhaps you tried to use an ip address?");
399 return;
400 }
401 };
402
403 let mut ssl_builder = match SslConnector::builder(SslMethod::tls_client()) {
405 Ok(sb) => sb,
406 Err(err) => {
407 error!(?err, "Unable to configure tls connector");
408 return;
409 }
410 };
411
412 let setup_client_cert = ssl_builder
413 .set_certificate(&client_cert)
414 .and_then(|_| ssl_builder.set_private_key(&client_key))
415 .and_then(|_| ssl_builder.check_private_key());
416 if let Err(err) = setup_client_cert {
417 error!(?err, "Unable to configure client certificate/key");
418 return;
419 }
420
421 let mut cert_store = match X509StoreBuilder::new() {
427 Ok(csb) => csb,
428 Err(err) => {
429 error!(?err, "Unable to configure certificate store builder.");
430 return;
431 }
432 };
433
434 if let Err(err) = cert_store.add_cert(supplier_cert) {
435 error!(?err, "Unable to add supplier certificate to cert store");
436 return;
437 }
438
439 let cert_store = cert_store.build();
440 ssl_builder.set_cert_store(cert_store);
441
442 let verify_param = ssl_builder.verify_param_mut();
444 if let Err(err) = verify_param.set_host(domain) {
445 error!(?err, "Unable to set domain name for tls peer verification");
446 return;
447 }
448
449 ssl_builder.set_verify(SslVerifyMode::PEER);
451 let tls_connector = ssl_builder.build();
452
453 let mut repl_interval = interval(consumer_conn_settings.task_poll_interval);
454
455 info!("Replica task for {} has started.", origin);
456
457 let mut last_working_address: Option<SocketAddr> = None;
459
460 loop {
462 let mut sorted_socket_addrs = vec![];
469
470 if let Some(addr) = last_working_address {
472 debug!(?last_working_address);
473 sorted_socket_addrs.push(addr);
474 };
475
476 match origin.socket_addrs(|| Some(443)) {
478 Ok(mut socket_addrs) => {
479 socket_addrs.sort_unstable();
481 socket_addrs.dedup();
482
483 socket_addrs.into_iter().for_each(|addr| {
486 if Some(&addr) != last_working_address.as_ref() {
487 sorted_socket_addrs.push(addr);
489 }
490 });
491 }
492 Err(err) => {
493 if let Some(addr) = last_working_address {
494 warn!(
495 ?err,
496 "Unable to resolve '{origin}' to ip:port, using last known working address '{addr}'"
497 );
498 } else {
499 warn!(?err, "Unable to resolve '{origin}' to ip:port.");
500 }
501 }
502 };
503
504 if sorted_socket_addrs.is_empty() {
505 warn!(
506 "No replication addresses available, delaying replication operation for '{origin}'"
507 );
508 repl_interval.tick().await;
509 continue;
510 }
511
512 tokio::select! {
513 Ok(task) = task_rx.recv() => {
514 match task {
515 ReplConsumerCtrl::Stop => break,
516 ReplConsumerCtrl::Refresh ( refresh_coord ) => {
517 last_working_address = (repl_run_consumer_refresh(
518 refresh_coord,
519 domain,
520 &sorted_socket_addrs,
521 &tls_connector,
522 &idms,
523 &consumer_conn_settings
524 )
525 .await).unwrap_or_default();
526 }
527 }
528 }
529 _ = repl_interval.tick() => {
530 repl_run_consumer(
532 domain,
533 &sorted_socket_addrs,
534 &tls_connector,
535 automatic_refresh,
536 &idms,
537 &consumer_conn_settings
538 )
539 .await;
540 }
541 }
542 }
543
544 info!("Replica task for {} has stopped.", origin);
545}
546
547#[instrument(level = "info", skip_all)]
548async fn handle_repl_conn(
549 max_frame_bytes: usize,
550 tcpstream: TcpStream,
551 client_address: SocketAddr,
552 tls_parms: SslAcceptor,
553 idms: Arc<IdmServer>,
554) {
555 debug!(?client_address, "replication client connected 🛫");
556
557 let mut tlsstream = match Ssl::new(tls_parms.context())
558 .and_then(|tls_obj| SslStream::new(tls_obj, tcpstream))
559 {
560 Ok(ta) => ta,
561 Err(err) => {
562 error!(?err, "Replication TLS setup error, disconnecting client");
563 return;
564 }
565 };
566 if let Err(err) = SslStream::accept(Pin::new(&mut tlsstream)).await {
567 error!(?err, "Replication TLS accept error, disconnecting client");
568 return;
569 };
570 let (r, w) = tokio::io::split(tlsstream);
571 let mut r = FramedRead::new(r, codec::SupplierCodec::new(max_frame_bytes));
572 let mut w = FramedWrite::new(w, codec::SupplierCodec::new(max_frame_bytes));
573
574 while let Some(codec_msg) = r.next().await {
575 match codec_msg {
576 Ok(ConsumerRequest::Ping) => {
577 debug!("consumer requested ping");
578 if let Err(err) = w.send(SupplierResponse::Pong).await {
579 error!(?err, "supplier encode error, unable to continue.");
580 break;
581 }
582 }
583 Ok(ConsumerRequest::Incremental(consumer_ruv_range)) => {
584 let changes = match idms.proxy_read().await.and_then(|mut read_txn| {
585 read_txn
586 .qs_read
587 .supplier_provide_changes(consumer_ruv_range)
588 }) {
589 Ok(changes) => changes,
590 Err(err) => {
591 error!(?err, "supplier provide changes failed.");
592 break;
593 }
594 };
595
596 if let Err(err) = w.send(SupplierResponse::Incremental(changes)).await {
597 error!(?err, "supplier encode error, unable to continue.");
598 break;
599 }
600 }
601 Ok(ConsumerRequest::Refresh) => {
602 let changes = match idms
603 .proxy_read()
604 .await
605 .and_then(|mut read_txn| read_txn.qs_read.supplier_provide_refresh())
606 {
607 Ok(changes) => changes,
608 Err(err) => {
609 error!(?err, "supplier provide refresh failed.");
610 break;
611 }
612 };
613
614 if let Err(err) = w.send(SupplierResponse::Refresh(changes)).await {
615 error!(?err, "supplier encode error, unable to continue.");
616 break;
617 }
618 }
619 Err(err) => {
620 error!(?err, "supplier decode error, unable to continue.");
621 break;
622 }
623 }
624 }
625
626 debug!(?client_address, "replication client disconnected 🛬");
627}
628
629async fn repl_acceptor(
630 listener: TcpListener,
631 idms: Arc<IdmServer>,
632 repl_config: ReplicationConfiguration,
633 mut rx: broadcast::Receiver<CoreAction>,
634 mut ctrl_rx: mpsc::Receiver<ReplCtrl>,
635) {
636 info!("Starting Replication Acceptor ...");
637 let replica_connect_timeout = Duration::from_secs(2);
640 let retry_timeout = Duration::from_secs(60);
641 let max_frame_bytes = 268435456;
642
643 let consumer_conn_settings = ConsumerConnSettings {
644 max_frame_bytes,
645 task_poll_interval: repl_config.get_task_poll_interval(),
646 replica_connect_timeout,
647 };
648
649 let (task_tx, task_rx1) = broadcast::channel(1);
651 drop(task_rx1);
654 let mut task_handles = VecDeque::new();
655
656 let replication_node_map = repl_config.manual.clone();
663 let domain_name = match repl_config.origin.domain() {
664 Some(n) => n.to_string(),
665 None => {
666 error!("Unable to start replication, replication origin does not contain a valid domain name.");
667 return;
668 }
669 };
670
671 'event: loop {
674 info!("Starting replication reload ...");
675 info!("Stopping {} Replication Tasks ...", task_handles.len());
679 debug_assert!(task_handles.len() >= task_tx.receiver_count());
680 let _ = task_tx.send(ReplConsumerCtrl::Stop);
681 for task_handle in task_handles.drain(..) {
682 let res: Result<(), _> = task_handle.await;
684 if res.is_err() {
685 warn!("Failed to join replication task, continuing ...");
686 }
687 }
688
689 let res = {
694 let ct = duration_from_epoch_now();
695 idms.proxy_write(ct).await.and_then(|mut idms_prox_write| {
696 idms_prox_write
697 .qs_write
698 .supplier_get_key_cert(&domain_name)
699 .and_then(|res| idms_prox_write.commit().map(|()| res))
700 })
701 };
702
703 let (server_key, server_cert) = match res {
704 Ok(r) => r,
705 Err(err) => {
706 error!(?err, "CRITICAL: Unable to access supplier certificate/key.");
707 sleep(retry_timeout).await;
708 continue;
709 }
710 };
711
712 info!(
713 replication_cert_not_before = ?server_cert.not_before(),
714 replication_cert_not_after = ?server_cert.not_after(),
715 );
716
717 let mut client_certs = Vec::new();
718
719 for (origin, node) in replication_node_map.iter() {
722 match node {
724 RepNodeConfig::MutualPull {
725 partner_cert: consumer_cert,
726 automatic_refresh: _,
727 }
728 | RepNodeConfig::AllowPull { consumer_cert } => {
729 client_certs.push(consumer_cert.clone())
730 }
731 RepNodeConfig::Pull {
732 supplier_cert: _,
733 automatic_refresh: _,
734 } => {}
735 };
736
737 match node {
738 RepNodeConfig::MutualPull {
739 partner_cert: supplier_cert,
740 automatic_refresh,
741 }
742 | RepNodeConfig::Pull {
743 supplier_cert,
744 automatic_refresh,
745 } => {
746 let task_rx = task_tx.subscribe();
747
748 let handle: JoinHandle<()> = tokio::spawn(repl_task(
749 origin.clone(),
750 server_key.clone(),
751 server_cert.clone(),
752 supplier_cert.clone(),
753 consumer_conn_settings.clone(),
754 task_rx,
755 *automatic_refresh,
756 idms.clone(),
757 ));
758
759 task_handles.push_back(handle);
760 debug_assert_eq!(task_handles.len(), task_tx.receiver_count());
761 }
762 RepNodeConfig::AllowPull { consumer_cert: _ } => {}
763 };
764 }
765
766 let mut tls_builder = match SslAcceptor::mozilla_modern_v5(SslMethod::tls()) {
772 Ok(tls_builder) => tls_builder,
773 Err(err) => {
774 error!(?err, "CRITICAL, unable to create SslAcceptorBuilder.");
775 sleep(retry_timeout).await;
776 continue;
777 }
778 };
779
780 if let Err(err) = tls_builder
782 .set_certificate(&server_cert)
783 .and_then(|_| tls_builder.set_private_key(&server_key))
784 .and_then(|_| tls_builder.check_private_key())
785 {
786 error!(?err, "CRITICAL, unable to set server_cert and server key.");
787 sleep(retry_timeout).await;
788 continue;
789 };
790
791 let cert_store = tls_builder.cert_store_mut();
794 for client_cert in client_certs.into_iter() {
795 if let Err(err) = cert_store.add_cert(client_cert.clone()) {
796 error!(?err, "CRITICAL, unable to add client certificates.");
797 sleep(retry_timeout).await;
798 continue;
799 }
800 }
801
802 let mut verify = SslVerifyMode::PEER;
807 verify.insert(SslVerifyMode::FAIL_IF_NO_PEER_CERT);
808 tls_builder.set_verify(verify);
809
810 let tls_acceptor = tls_builder.build();
811
812 loop {
813 let eventid = Uuid::new_v4();
817
818 tokio::select! {
819 Ok(action) = rx.recv() => {
820 match action {
821 CoreAction::Shutdown => break 'event,
822 }
823 }
824 Some(ctrl_msg) = ctrl_rx.recv() => {
825 match ctrl_msg {
826 ReplCtrl::GetCertificate {
827 respond
828 } => {
829 let _span = debug_span!("supplier_accept_loop", uuid = ?eventid).entered();
830 if respond.send(server_cert.clone()).is_err() {
831 warn!("Server certificate was requested, but requsetor disconnected");
832 } else {
833 trace!("Sent server certificate via control channel");
834 }
835 }
836 ReplCtrl::RenewCertificate {
837 respond
838 } => {
839 let span = debug_span!("supplier_accept_loop", uuid = ?eventid);
840 async {
841 debug!("renewing replication certificate ...");
842 let res = {
844 let ct = duration_from_epoch_now();
845 idms.proxy_write(ct).await
846 .and_then(|mut idms_prox_write|
847 idms_prox_write
848 .qs_write
849 .supplier_renew_key_cert(&domain_name)
850 .and_then(|res| idms_prox_write.commit().map(|()| res))
851 )
852 };
853
854 let success = res.is_ok();
855
856 if let Err(err) = res {
857 error!(?err, "failed to renew server certificate");
858 }
859
860 if respond.send(success).is_err() {
861 warn!("Server certificate renewal was requested, but requester disconnected!");
862 } else {
863 trace!("Sent server certificate renewal status via control channel");
864 }
865 }
866 .instrument(span)
867 .await;
868
869 continue 'event;
871 }
872 ReplCtrl::RefreshConsumer {
873 respond
874 } => {
875 let (tx, rx) = mpsc::channel(1);
877
878 let refresh_coord = Arc::new(
879 Mutex::new(
880 (
881 false, tx
882 )
883 )
884 );
885
886 if task_tx.send(ReplConsumerCtrl::Refresh(refresh_coord)).is_err() {
887 error!("Unable to begin replication consumer refresh, tasks are unable to be notified.");
888 }
889
890 if respond.send(rx).is_err() {
891 warn!("Replication consumer refresh was requested, but requester disconnected");
892 } else {
893 trace!("Sent refresh comms channel to requester");
894 }
895 }
896 }
897 }
898 accept_result = listener.accept() => {
907 match accept_result {
908 Ok((tcpstream, client_socket_addr)) => {
909 let clone_idms = idms.clone();
910 let clone_tls_acceptor = tls_acceptor.clone();
911 tokio::spawn(
914 handle_repl_conn(max_frame_bytes, tcpstream, client_socket_addr, clone_tls_acceptor, clone_idms)
915 );
916 }
917 Err(e) => {
918 error!("replication acceptor error, continuing -> {:?}", e);
919 }
920 }
921 }
922 } }
925 }
926 info!("Stopping {} Replication Tasks ...", task_handles.len());
928 debug_assert!(task_handles.len() >= task_tx.receiver_count());
929 let _ = task_tx.send(ReplConsumerCtrl::Stop);
930 for task_handle in task_handles.drain(..) {
931 let res: Result<(), _> = task_handle.await.map(|_| ());
933 if res.is_err() {
934 warn!("Failed to join replication task, continuing ...");
935 }
936 }
937
938 info!("Stopped {}", super::TaskName::Replication);
939}