// orca/models/latency_measurer.rs

1use std::{
2    iter,
3    str::FromStr,
4    time::{Duration, Instant},
5};
6
7use async_trait::async_trait;
8use idlset::v2::IDLBitRange;
9
10use hashbrown::HashMap;
11use kanidm_client::KanidmClient;
12use rand::Rng;
13use rand_chacha::ChaCha8Rng;
14
15use crate::{
16    error::Error,
17    model::{self, ActorModel, TransitionResult},
18    run::{EventDetail, EventRecord},
19    state::Person,
20};
21
/// The action an [`ActorLatencyMeasurer`] performs in one step of its state machine.
///
/// Derives `Debug`/`PartialEq`/`Eq` so actions can be logged and compared, and `Copy`
/// because it is a plain fieldless tag.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransitionAction {
    /// Log in on the main client and on every additional client.
    Login,
    /// Privilege re-authentication on the main client (needed to create groups).
    PrivilegeReauth,
    /// Create the person's personal group on the main client.
    CreatePersonalGroup,
    /// Create the next numbered group on the main client.
    CreateGroup,
    /// Add the most recently created group as a member of the personal group.
    AddCreatedGroupToPersonalGroup,
    /// Poll each additional client for which created groups have replicated.
    CheckPersonalGroupReplicationStatus,
}
30
31// Is this the right way? Should transitions/delay be part of the actor model? Should
32// they be responsible.
33pub struct Transition {
34    pub delay: Option<Duration>,
35    pub action: TransitionAction,
36}
37
38impl Transition {
39    #[allow(dead_code)]
40    pub fn delay(&self) -> Option<Duration> {
41        self.delay
42    }
43}
44
/// Where the latency-measuring actor currently is in its state machine.
enum State {
    /// Initial state: not yet logged in on every client.
    Unauthenticated,
    /// Logged in on the main client and all additional clients.
    Authenticated,
    /// Privilege re-authentication completed on the main client.
    AuthenticatedWithReauth,
    /// The personal group exists.
    CreatedPersonalGroup,
    /// A new numbered group was just created.
    CreatedGroup,
    /// The newest group was added as a member of the personal group.
    AddedCreatedGroupToPersonalGroup,
    /// Replication status was polled on the additional clients; the next step creates another group.
    CheckedPersonalGroupReplicationStatus,
}
54
55pub struct ActorLatencyMeasurer {
56    state: State,
57    randomised_backoff_time: Duration,
58    additional_clients: Vec<KanidmClient>,
59    group_index: u64,
60    personal_group_name: String,
61    groups_creation_time: HashMap<u64, Instant>,
62    unreplicated_groups_by_client: Vec<IDLBitRange>,
63}
64
65impl ActorLatencyMeasurer {
66    pub fn new(
67        mut cha_rng: ChaCha8Rng,
68        additional_clients: Vec<KanidmClient>,
69        person_name: &str,
70        warmup_time_ms: u64,
71    ) -> Result<Self, Error> {
72        if additional_clients.is_empty() {
73            return Err(Error::InvalidState);
74        };
75        let additional_clients_len = additional_clients.len();
76
77        let max_backoff_time_in_ms = 2 * warmup_time_ms / 3;
78        let randomised_backoff_time =
79            Duration::from_millis(cha_rng.random_range(0..max_backoff_time_in_ms));
80        Ok(ActorLatencyMeasurer {
81            state: State::Unauthenticated,
82            randomised_backoff_time,
83            additional_clients,
84            group_index: 0,
85            personal_group_name: format!("{person_name}-personal-group"),
86            groups_creation_time: HashMap::new(),
87            unreplicated_groups_by_client: vec![IDLBitRange::new(); additional_clients_len],
88        })
89    }
90}
91
92#[async_trait]
93impl ActorModel for ActorLatencyMeasurer {
94    async fn transition(
95        &mut self,
96        client: &KanidmClient,
97        person: &Person,
98    ) -> Result<Vec<EventRecord>, Error> {
99        let transition = self.next_transition();
100
101        if let Some(delay) = transition.delay {
102            tokio::time::sleep(delay).await;
103        }
104
105        let (result, event) = match transition.action {
106            TransitionAction::Login => {
107                let mut event_records = Vec::new();
108                let mut final_res = TransitionResult::Ok;
109
110                // We need to login on all the instances. Every time one of the login fails, we abort
111                for client in iter::once(client).chain(self.additional_clients.iter()) {
112                    let (res, more_records) = model::login(client, person).await?;
113                    final_res = res;
114                    event_records.extend(more_records);
115                    if let TransitionResult::Error = final_res {
116                        break;
117                    }
118                }
119                Ok((final_res, event_records))
120            }
121            // PrivilegeReauth is only useful to create new groups, so we just need it on our main client
122            TransitionAction::PrivilegeReauth => model::privilege_reauth(client, person).await,
123            TransitionAction::CreatePersonalGroup => {
124                model::person_create_group(client, &self.personal_group_name).await
125            }
126            TransitionAction::CreateGroup => {
127                self.generate_new_group_name();
128                let outcome = model::person_create_group(client, &self.get_group_name()).await;
129                // We need to check if the group was successfully created or not, and act accordingly!
130                if let Ok((transition_result, _)) = &outcome {
131                    if let TransitionResult::Error = transition_result {
132                        self.rollback_new_group_name()
133                    } else {
134                        self.commit_new_group_name()
135                    }
136                }
137                outcome
138            }
139            TransitionAction::AddCreatedGroupToPersonalGroup => {
140                model::person_add_group_members(
141                    client,
142                    &self.personal_group_name,
143                    &[&self.get_group_name()],
144                )
145                .await
146            }
147            TransitionAction::CheckPersonalGroupReplicationStatus => {
148                let mut event_records = Vec::new();
149                let clients_number = self.additional_clients.len();
150                for client_index in 0..clients_number {
151                    match self.get_replicated_groups_by_client(client_index).await {
152                        Ok(replicated_groups) => {
153                            let groups_read_time = Instant::now();
154                            let repl_event_records = self
155                                .parse_replicated_groups_into_replication_event_records(
156                                    &replicated_groups,
157                                    client_index,
158                                    groups_read_time,
159                                );
160                            event_records.extend(repl_event_records);
161                        }
162                        Err(event_record) => event_records.push(event_record),
163                    };
164                }
165                // Note for the future folks ending up here: we MUST always return TransitionResult::Ok otherwise we will loop here forever (believe me
166                // I know from personal experience). If we loop here we never do TransitionAction::CreateGroup, which is basically the only transition we care
167                // about in this model. If you really need to change this then you also need to change the `next_state` function below
168                Ok((TransitionResult::Ok, event_records))
169            }
170        }?;
171
172        self.next_state(transition.action, result);
173
174        Ok(event)
175    }
176}
177
178impl ActorLatencyMeasurer {
179    fn generate_new_group_name(&mut self) {
180        self.group_index += 1;
181    }
182
183    fn commit_new_group_name(&mut self) {
184        self.groups_creation_time
185            .insert(self.group_index, Instant::now());
186        self.unreplicated_groups_by_client
187            .iter_mut()
188            .for_each(|c| c.insert_id(self.group_index))
189    }
190
191    fn rollback_new_group_name(&mut self) {
192        self.group_index -= 1;
193    }
194
195    fn get_group_name(&self) -> String {
196        format!("{}-{}", &self.personal_group_name, self.group_index)
197    }
198
199    async fn get_replicated_groups_by_client(
200        &self,
201        client_index: usize,
202    ) -> Result<Vec<String>, EventRecord> {
203        let start = Instant::now();
204        let replicated_groups = self.additional_clients[client_index]
205            .idm_group_get_members(&self.personal_group_name)
206            .await;
207        let duration = Instant::now().duration_since(start);
208
209        match replicated_groups {
210            Err(client_err) => {
211                debug!(?client_err);
212                Err(EventRecord {
213                    start,
214                    duration,
215                    details: EventDetail::Error,
216                })
217            }
218            Ok(maybe_replicated_groups) => Ok(maybe_replicated_groups.unwrap_or_default()),
219        }
220    }
221
222    fn parse_replicated_groups_into_replication_event_records(
223        &mut self,
224        replicated_group_names: &[String],
225        client_index: usize,
226        groups_read_time: Instant,
227    ) -> Vec<EventRecord> {
228        let group_id_from_group_name =
229            |group_name: &String| u64::from_str(group_name.split(&['-', '@']).nth(3)?).ok();
230
231        let replicated_group_ids: Vec<u64> = replicated_group_names
232            .iter()
233            .filter_map(group_id_from_group_name)
234            .collect();
235        // We just create a more efficient set to store the replicated group ids. This will be useful later
236        let replicated_group_ids_set = IDLBitRange::from_iter(replicated_group_ids);
237
238        // The newly_replicated_groups contains all replicated groups that have been spotted for the first time in the given client (determined by client_index);
239        // It is the union of the set of groups we created and up to this point assumed were unreplicated (which is stored in unreplicated_groups_by_client) and
240        // the set of groups we have just observed to be replicated, stored in replicated_group_names.
241        let newly_replicated_groups =
242            &replicated_group_ids_set & &self.unreplicated_groups_by_client[client_index];
243
244        // Once we have these newly replicated groups, we remove them from the unreplicated_groups_by_client, as we now know they have indeed been replicated,
245        // and therefore have no place in unreplicated_groups_by_client.
246        for group_id in newly_replicated_groups.into_iter() {
247            self.unreplicated_groups_by_client[client_index].remove_id(group_id)
248        }
249
250        newly_replicated_groups
251            .into_iter()
252            .filter_map(|group| {
253                Some(self.create_replication_delay_event_record(
254                    *self.groups_creation_time.get(&group)?,
255                    groups_read_time,
256                ))
257            })
258            .collect()
259    }
260
261    fn create_replication_delay_event_record(
262        &self,
263        creation_time: Instant,
264        read_time: Instant,
265    ) -> EventRecord {
266        EventRecord {
267            start: creation_time,
268            duration: read_time.duration_since(creation_time),
269            details: EventDetail::GroupReplicationDelay,
270        }
271    }
272
273    fn next_transition(&mut self) -> Transition {
274        match self.state {
275            State::Unauthenticated => Transition {
276                delay: Some(self.randomised_backoff_time),
277                action: TransitionAction::Login,
278            },
279            State::Authenticated => Transition {
280                delay: Some(Duration::from_secs(2)),
281                action: TransitionAction::PrivilegeReauth,
282            },
283            State::AuthenticatedWithReauth => Transition {
284                delay: Some(Duration::from_secs(1)),
285                action: TransitionAction::CreatePersonalGroup,
286            },
287            State::CreatedPersonalGroup => Transition {
288                delay: Some(Duration::from_secs(1)),
289                action: TransitionAction::CreateGroup,
290            },
291            State::CreatedGroup => Transition {
292                delay: None,
293                action: TransitionAction::AddCreatedGroupToPersonalGroup,
294            },
295            State::AddedCreatedGroupToPersonalGroup => Transition {
296                delay: None,
297                action: TransitionAction::CheckPersonalGroupReplicationStatus,
298            },
299            State::CheckedPersonalGroupReplicationStatus => Transition {
300                delay: Some(Duration::from_secs(1)),
301                action: TransitionAction::CreateGroup,
302            },
303        }
304    }
305
306    fn next_state(&mut self, action: TransitionAction, result: TransitionResult) {
307        // Is this a design flaw? We probably need to know what the state was that we
308        // requested to move to?
309        match (&self.state, action, result) {
310            (State::Unauthenticated, TransitionAction::Login, TransitionResult::Ok) => {
311                self.state = State::Authenticated;
312            }
313            (State::Authenticated, TransitionAction::PrivilegeReauth, TransitionResult::Ok) => {
314                self.state = State::AuthenticatedWithReauth;
315            }
316            (
317                State::AuthenticatedWithReauth,
318                TransitionAction::CreatePersonalGroup,
319                TransitionResult::Ok,
320            ) => self.state = State::CreatedPersonalGroup,
321            (State::CreatedPersonalGroup, TransitionAction::CreateGroup, TransitionResult::Ok) => {
322                self.state = State::CreatedGroup
323            }
324            (
325                State::CreatedGroup,
326                TransitionAction::AddCreatedGroupToPersonalGroup,
327                TransitionResult::Ok,
328            ) => self.state = State::AddedCreatedGroupToPersonalGroup,
329            (
330                State::AddedCreatedGroupToPersonalGroup,
331                TransitionAction::CheckPersonalGroupReplicationStatus,
332                TransitionResult::Ok,
333            ) => self.state = State::CheckedPersonalGroupReplicationStatus,
334            (
335                State::CheckedPersonalGroupReplicationStatus,
336                TransitionAction::CreateGroup,
337                TransitionResult::Ok,
338            ) => self.state = State::CreatedGroup,
339
340            #[allow(clippy::unreachable)]
341            (_, _, TransitionResult::Ok) => {
342                unreachable!();
343            }
344            (_, _, TransitionResult::Error) => {
345                // If an error occurred we don't do anything, aka we remain on the same state we were before and we try again
346            }
347        }
348    }
349}