kanidmd_core/
interval.rs

1//! This contains scheduled tasks/interval tasks that are run inside of the server on a schedule
2//! as background operations.
3
4use std::fs;
5use std::path::Path;
6use std::str::FromStr;
7
8use chrono::Utc;
9use cron::Schedule;
10
11use tokio::sync::broadcast;
12use tokio::time::{interval, sleep, Duration, MissedTickBehavior};
13
14use crate::config::OnlineBackup;
15use crate::CoreAction;
16
17use crate::actors::{QueryServerReadV1, QueryServerWriteV1};
18use kanidmd_lib::constants::PURGE_FREQUENCY;
19use kanidmd_lib::event::{OnlineBackupEvent, PurgeRecycledEvent, PurgeTombstoneEvent};
20
/// Namespace type for the server's scheduled background tasks.
///
/// It carries no state; its associated functions spawn tokio tasks for the periodic
/// tombstone / recycled-entry purge and for the cron-scheduled online backup.
pub(crate) struct IntervalActor;
22
23impl IntervalActor {
24    pub fn start(
25        server: &'static QueryServerWriteV1,
26        mut rx: broadcast::Receiver<CoreAction>,
27    ) -> tokio::task::JoinHandle<()> {
28        tokio::spawn(async move {
29            let mut inter = interval(Duration::from_secs(PURGE_FREQUENCY));
30            inter.set_missed_tick_behavior(MissedTickBehavior::Skip);
31
32            loop {
33                server
34                    .handle_purgetombstoneevent(PurgeTombstoneEvent::new())
35                    .await;
36                server
37                    .handle_purgerecycledevent(PurgeRecycledEvent::new())
38                    .await;
39
40                tokio::select! {
41                    Ok(action) = rx.recv() => {
42                        match action {
43                            CoreAction::Shutdown => break,
44                        }
45                    }
46                    _ = inter.tick() => {
47                        // Next iter.
48                        continue
49                    }
50                }
51            }
52
53            info!("Stopped {}", super::TaskName::IntervalActor);
54        })
55    }
56
57    // Allow this because result is the only way to map and ? to bubble up, but we aren't
58    // returning an op-error here because this is in early start up.
59    #[allow(clippy::result_unit_err)]
60    pub fn start_online_backup(
61        server: &'static QueryServerReadV1,
62        online_backup_config: &OnlineBackup,
63        mut rx: broadcast::Receiver<CoreAction>,
64    ) -> Result<tokio::task::JoinHandle<()>, ()> {
65        let outpath = match online_backup_config.path.to_owned() {
66            Some(val) => val,
67            None => {
68                error!("Online backup output path is not set.");
69                return Err(());
70            }
71        };
72        let versions = online_backup_config.versions;
73        let crono_expr = online_backup_config.schedule.as_str().to_string();
74        let mut crono_expr_values = crono_expr.split_ascii_whitespace().collect::<Vec<&str>>();
75        let chrono_expr_uses_standard_syntax = crono_expr_values.len() == 5;
76        if chrono_expr_uses_standard_syntax {
77            // we add a 0 element at the beginning to simulate the standard crono syntax which always runs
78            // commands at seconds 00
79            crono_expr_values.insert(0, "0");
80            crono_expr_values.push("*");
81        }
82        let crono_expr_schedule = crono_expr_values.join(" ");
83        if chrono_expr_uses_standard_syntax {
84            info!(
85                "Provided online backup schedule is: {}, now being transformed to: {}",
86                crono_expr, crono_expr_schedule
87            );
88        }
89        // Cron expression handling
90        let cron_expr = Schedule::from_str(crono_expr_schedule.as_str()).map_err(|e| {
91            error!("Online backup schedule parse error: {}", e);
92            error!("valid formats are:");
93            error!("sec  min   hour   day of month   month   day of week   year");
94            error!("min   hour   day of month   month   day of week");
95            error!("@hourly | @daily | @weekly");
96        })?;
97
98        info!("Online backup schedule parsed as: {}", cron_expr);
99
100        if cron_expr.upcoming(Utc).next().is_none() {
101            error!(
102                "Online backup schedule error: '{}' will not match any date.",
103                cron_expr
104            );
105            return Err(());
106        }
107
108        // Output path handling
109        let op = Path::new(&outpath);
110
111        // does the path exist and is a directory?
112        if !op.exists() {
113            info!(
114                "Online backup output folder '{}' does not exist, trying to create it.",
115                outpath.display()
116            );
117            fs::create_dir_all(&outpath).map_err(|e| {
118                error!(
119                    "Online backup failed to create output directory '{}': {}",
120                    outpath.display(),
121                    e
122                )
123            })?;
124        }
125
126        if !op.is_dir() {
127            error!("Online backup output '{}' is not a directory or we are missing permissions to access it.", outpath.display());
128            return Err(());
129        }
130
131        let handle = tokio::spawn(async move {
132            for next_time in cron_expr.upcoming(Utc) {
133                // We add 1 second to the `wait_time` in order to get "even" timestampes
134                // for example: 1 + 17:05:59Z --> 17:06:00Z
135                let wait_seconds = 1 + (next_time - Utc::now()).num_seconds() as u64;
136                info!(
137                    "Online backup next run on {}, wait_time = {}s",
138                    next_time, wait_seconds
139                );
140
141                tokio::select! {
142                    Ok(action) = rx.recv() => {
143                        match action {
144                            CoreAction::Shutdown => break,
145                        }
146                    }
147                    _ = sleep(Duration::from_secs(wait_seconds)) => {
148                        if let Err(e) = server
149                            .handle_online_backup(
150                                OnlineBackupEvent::new(),
151                                &outpath,
152                                versions,
153                            )
154                            .await
155                        {
156                            error!(?e, "An online backup error occurred.");
157                        }
158                    }
159                }
160            }
161            info!("Stopped {}", super::TaskName::BackupActor);
162        });
163
164        Ok(handle)
165    }
166}