kanidmd_core/
interval.rs

1//! This contains scheduled tasks/interval tasks that are run inside of the server on a schedule
2//! as background operations.
3
4use std::fs;
5use std::path::Path;
6use std::str::FromStr;
7
8use chrono::Utc;
9use cron::Schedule;
10
11use tokio::sync::broadcast;
12use tokio::time::{interval, sleep, Duration, MissedTickBehavior};
13
14use crate::config::OnlineBackup;
15use crate::CoreAction;
16
17use crate::actors::{QueryServerReadV1, QueryServerWriteV1};
18use kanidmd_lib::constants::PURGE_FREQUENCY;
19use kanidmd_lib::event::{
20    OnlineBackupEvent, PurgeDeleteAfterEvent, PurgeRecycledEvent, PurgeTombstoneEvent,
21};
22
23pub(crate) struct IntervalActor;
24
25impl IntervalActor {
26    pub fn start(
27        server: &'static QueryServerWriteV1,
28        mut rx: broadcast::Receiver<CoreAction>,
29    ) -> tokio::task::JoinHandle<()> {
30        tokio::spawn(async move {
31            let mut inter = interval(Duration::from_secs(PURGE_FREQUENCY));
32            inter.set_missed_tick_behavior(MissedTickBehavior::Skip);
33
34            loop {
35                server
36                    .handle_purgetombstoneevent(PurgeTombstoneEvent::new())
37                    .await;
38                server
39                    .handle_purgerecycledevent(PurgeRecycledEvent::new())
40                    .await;
41                server
42                    .handle_purge_delete_after_event(PurgeDeleteAfterEvent::new())
43                    .await;
44
45                tokio::select! {
46                    Ok(action) = rx.recv() => {
47                        match action {
48                            CoreAction::Shutdown => break,
49                        }
50                    }
51                    _ = inter.tick() => {
52                        // Next iter.
53                        continue
54                    }
55                }
56            }
57
58            info!("Stopped {}", super::TaskName::IntervalActor);
59        })
60    }
61
62    // Allow this because result is the only way to map and ? to bubble up, but we aren't
63    // returning an op-error here because this is in early start up.
64    #[allow(clippy::result_unit_err)]
65    pub fn start_online_backup(
66        server: &'static QueryServerReadV1,
67        online_backup_config: &OnlineBackup,
68        mut rx: broadcast::Receiver<CoreAction>,
69    ) -> Result<tokio::task::JoinHandle<()>, ()> {
70        let outpath = match online_backup_config.path.to_owned() {
71            Some(val) => val,
72            None => {
73                error!("Online backup output path is not set.");
74                return Err(());
75            }
76        };
77        let versions = online_backup_config.versions;
78        let crono_expr = online_backup_config.schedule.as_str().to_string();
79        let mut crono_expr_values = crono_expr.split_ascii_whitespace().collect::<Vec<&str>>();
80        let chrono_expr_uses_standard_syntax = crono_expr_values.len() == 5;
81        if chrono_expr_uses_standard_syntax {
82            // we add a 0 element at the beginning to simulate the standard crono syntax which always runs
83            // commands at seconds 00
84            crono_expr_values.insert(0, "0");
85            crono_expr_values.push("*");
86        }
87        let crono_expr_schedule = crono_expr_values.join(" ");
88        if chrono_expr_uses_standard_syntax {
89            info!(
90                "Provided online backup schedule is: {}, now being transformed to: {}",
91                crono_expr, crono_expr_schedule
92            );
93        }
94        // Cron expression handling
95        let cron_expr = Schedule::from_str(crono_expr_schedule.as_str()).map_err(|e| {
96            error!("Online backup schedule parse error: {}", e);
97            error!("valid formats are:");
98            error!("sec  min   hour   day of month   month   day of week   year");
99            error!("min   hour   day of month   month   day of week");
100            error!("@hourly | @daily | @weekly");
101        })?;
102
103        info!("Online backup schedule parsed as: {}", cron_expr);
104
105        if cron_expr.upcoming(Utc).next().is_none() {
106            error!(
107                "Online backup schedule error: '{}' will not match any date.",
108                cron_expr
109            );
110            return Err(());
111        }
112
113        // Output path handling
114        let op = Path::new(&outpath);
115
116        // does the path exist and is a directory?
117        if !op.exists() {
118            info!(
119                "Online backup output folder '{}' does not exist, trying to create it.",
120                outpath.display()
121            );
122            fs::create_dir_all(&outpath).map_err(|e| {
123                error!(
124                    "Online backup failed to create output directory '{}': {}",
125                    outpath.display(),
126                    e
127                )
128            })?;
129        }
130
131        if !op.is_dir() {
132            error!("Online backup output '{}' is not a directory or we are missing permissions to access it.", outpath.display());
133            return Err(());
134        }
135
136        let backup_compression = online_backup_config.compression;
137
138        let handle = tokio::spawn(async move {
139            for next_time in cron_expr.upcoming(Utc) {
140                // We add 1 second to the `wait_time` in order to get "even" timestampes
141                // for example: 1 + 17:05:59Z --> 17:06:00Z
142                let wait_seconds = 1 + (next_time - Utc::now()).num_seconds() as u64;
143                info!(
144                    "Online backup next run on {}, wait_time = {}s",
145                    next_time, wait_seconds
146                );
147
148                tokio::select! {
149                    Ok(action) = rx.recv() => {
150                        match action {
151                            CoreAction::Shutdown => break,
152                        }
153                    }
154                    _ = sleep(Duration::from_secs(wait_seconds)) => {
155                        if let Err(e) = server
156                            .handle_online_backup(
157                                OnlineBackupEvent::new(),
158                                &outpath,
159                                versions,
160                                backup_compression,
161                            )
162                            .await
163                        {
164                            error!(?e, "An online backup error occurred.");
165                        }
166                    }
167                }
168            }
169            info!("Stopped {}", super::TaskName::BackupActor);
170        });
171
172        Ok(handle)
173    }
174}