kanidmd_core/
interval.rs
1use std::fs;
5use std::path::Path;
6use std::str::FromStr;
7
8use chrono::Utc;
9use cron::Schedule;
10
11use tokio::sync::broadcast;
12use tokio::time::{interval, sleep, Duration, MissedTickBehavior};
13
14use crate::config::OnlineBackup;
15use crate::CoreAction;
16
17use crate::actors::{QueryServerReadV1, QueryServerWriteV1};
18use kanidmd_lib::constants::PURGE_FREQUENCY;
19use kanidmd_lib::event::{OnlineBackupEvent, PurgeRecycledEvent, PurgeTombstoneEvent};
20
21pub(crate) struct IntervalActor;
22
23impl IntervalActor {
24 pub fn start(
25 server: &'static QueryServerWriteV1,
26 mut rx: broadcast::Receiver<CoreAction>,
27 ) -> tokio::task::JoinHandle<()> {
28 tokio::spawn(async move {
29 let mut inter = interval(Duration::from_secs(PURGE_FREQUENCY));
30 inter.set_missed_tick_behavior(MissedTickBehavior::Skip);
31
32 loop {
33 server
34 .handle_purgetombstoneevent(PurgeTombstoneEvent::new())
35 .await;
36 server
37 .handle_purgerecycledevent(PurgeRecycledEvent::new())
38 .await;
39
40 tokio::select! {
41 Ok(action) = rx.recv() => {
42 match action {
43 CoreAction::Shutdown => break,
44 }
45 }
46 _ = inter.tick() => {
47 continue
49 }
50 }
51 }
52
53 info!("Stopped {}", super::TaskName::IntervalActor);
54 })
55 }
56
57 #[allow(clippy::result_unit_err)]
60 pub fn start_online_backup(
61 server: &'static QueryServerReadV1,
62 online_backup_config: &OnlineBackup,
63 mut rx: broadcast::Receiver<CoreAction>,
64 ) -> Result<tokio::task::JoinHandle<()>, ()> {
65 let outpath = match online_backup_config.path.to_owned() {
66 Some(val) => val,
67 None => {
68 error!("Online backup output path is not set.");
69 return Err(());
70 }
71 };
72 let versions = online_backup_config.versions;
73 let crono_expr = online_backup_config.schedule.as_str().to_string();
74 let mut crono_expr_values = crono_expr.split_ascii_whitespace().collect::<Vec<&str>>();
75 let chrono_expr_uses_standard_syntax = crono_expr_values.len() == 5;
76 if chrono_expr_uses_standard_syntax {
77 crono_expr_values.insert(0, "0");
80 crono_expr_values.push("*");
81 }
82 let crono_expr_schedule = crono_expr_values.join(" ");
83 if chrono_expr_uses_standard_syntax {
84 info!(
85 "Provided online backup schedule is: {}, now being transformed to: {}",
86 crono_expr, crono_expr_schedule
87 );
88 }
89 let cron_expr = Schedule::from_str(crono_expr_schedule.as_str()).map_err(|e| {
91 error!("Online backup schedule parse error: {}", e);
92 error!("valid formats are:");
93 error!("sec min hour day of month month day of week year");
94 error!("min hour day of month month day of week");
95 error!("@hourly | @daily | @weekly");
96 })?;
97
98 info!("Online backup schedule parsed as: {}", cron_expr);
99
100 if cron_expr.upcoming(Utc).next().is_none() {
101 error!(
102 "Online backup schedule error: '{}' will not match any date.",
103 cron_expr
104 );
105 return Err(());
106 }
107
108 let op = Path::new(&outpath);
110
111 if !op.exists() {
113 info!(
114 "Online backup output folder '{}' does not exist, trying to create it.",
115 outpath.display()
116 );
117 fs::create_dir_all(&outpath).map_err(|e| {
118 error!(
119 "Online backup failed to create output directory '{}': {}",
120 outpath.display(),
121 e
122 )
123 })?;
124 }
125
126 if !op.is_dir() {
127 error!("Online backup output '{}' is not a directory or we are missing permissions to access it.", outpath.display());
128 return Err(());
129 }
130
131 let handle = tokio::spawn(async move {
132 for next_time in cron_expr.upcoming(Utc) {
133 let wait_seconds = 1 + (next_time - Utc::now()).num_seconds() as u64;
136 info!(
137 "Online backup next run on {}, wait_time = {}s",
138 next_time, wait_seconds
139 );
140
141 tokio::select! {
142 Ok(action) = rx.recv() => {
143 match action {
144 CoreAction::Shutdown => break,
145 }
146 }
147 _ = sleep(Duration::from_secs(wait_seconds)) => {
148 if let Err(e) = server
149 .handle_online_backup(
150 OnlineBackupEvent::new(),
151 &outpath,
152 versions,
153 )
154 .await
155 {
156 error!(?e, "An online backup error occurred.");
157 }
158 }
159 }
160 }
161 info!("Stopped {}", super::TaskName::BackupActor);
162 });
163
164 Ok(handle)
165 }
166}