1use std::{
2 iter,
3 str::FromStr,
4 time::{Duration, Instant},
5};
6
7use async_trait::async_trait;
8use idlset::v2::IDLBitRange;
9
10use hashbrown::HashMap;
11use kanidm_client::KanidmClient;
12use rand::Rng;
13use rand_chacha::ChaCha8Rng;
14
15use crate::{
16 error::Error,
17 model::{self, ActorModel, TransitionResult},
18 run::{EventDetail, EventRecord},
19 state::Person,
20};
21
22pub enum TransitionAction {
23 Login,
24 PrivilegeReauth,
25 CreatePersonalGroup,
26 CreateGroup,
27 AddCreatedGroupToPersonalGroup,
28 CheckPersonalGroupReplicationStatus,
29}
30
31pub struct Transition {
34 pub delay: Option<Duration>,
35 pub action: TransitionAction,
36}
37
38impl Transition {
39 #[allow(dead_code)]
40 pub fn delay(&self) -> Option<Duration> {
41 self.delay
42 }
43}
44
45enum State {
46 Unauthenticated,
47 Authenticated,
48 AuthenticatedWithReauth,
49 CreatedPersonalGroup,
50 CreatedGroup,
51 AddedCreatedGroupToPersonalGroup,
52 CheckedPersonalGroupReplicationStatus,
53}
54
55pub struct ActorLatencyMeasurer {
56 state: State,
57 randomised_backoff_time: Duration,
58 additional_clients: Vec<KanidmClient>,
59 group_index: u64,
60 personal_group_name: String,
61 groups_creation_time: HashMap<u64, Instant>,
62 unreplicated_groups_by_client: Vec<IDLBitRange>,
63}
64
65impl ActorLatencyMeasurer {
66 pub fn new(
67 mut cha_rng: ChaCha8Rng,
68 additional_clients: Vec<KanidmClient>,
69 person_name: &str,
70 warmup_time_ms: u64,
71 ) -> Result<Self, Error> {
72 if additional_clients.is_empty() {
73 return Err(Error::InvalidState);
74 };
75 let additional_clients_len = additional_clients.len();
76
77 let max_backoff_time_in_ms = 2 * warmup_time_ms / 3;
78 let randomised_backoff_time =
79 Duration::from_millis(cha_rng.random_range(0..max_backoff_time_in_ms));
80 Ok(ActorLatencyMeasurer {
81 state: State::Unauthenticated,
82 randomised_backoff_time,
83 additional_clients,
84 group_index: 0,
85 personal_group_name: format!("{person_name}-personal-group"),
86 groups_creation_time: HashMap::new(),
87 unreplicated_groups_by_client: vec![IDLBitRange::new(); additional_clients_len],
88 })
89 }
90}
91
92#[async_trait]
93impl ActorModel for ActorLatencyMeasurer {
94 async fn transition(
95 &mut self,
96 client: &KanidmClient,
97 person: &Person,
98 ) -> Result<Vec<EventRecord>, Error> {
99 let transition = self.next_transition();
100
101 if let Some(delay) = transition.delay {
102 tokio::time::sleep(delay).await;
103 }
104
105 let (result, event) = match transition.action {
106 TransitionAction::Login => {
107 let mut event_records = Vec::new();
108 let mut final_res = TransitionResult::Ok;
109
110 for client in iter::once(client).chain(self.additional_clients.iter()) {
112 let (res, more_records) = model::login(client, person).await?;
113 final_res = res;
114 event_records.extend(more_records);
115 if let TransitionResult::Error = final_res {
116 break;
117 }
118 }
119 Ok((final_res, event_records))
120 }
121 TransitionAction::PrivilegeReauth => model::privilege_reauth(client, person).await,
123 TransitionAction::CreatePersonalGroup => {
124 model::person_create_group(client, &self.personal_group_name).await
125 }
126 TransitionAction::CreateGroup => {
127 self.generate_new_group_name();
128 let outcome = model::person_create_group(client, &self.get_group_name()).await;
129 if let Ok((transition_result, _)) = &outcome {
131 if let TransitionResult::Error = transition_result {
132 self.rollback_new_group_name()
133 } else {
134 self.commit_new_group_name()
135 }
136 }
137 outcome
138 }
139 TransitionAction::AddCreatedGroupToPersonalGroup => {
140 model::person_add_group_members(
141 client,
142 &self.personal_group_name,
143 &[&self.get_group_name()],
144 )
145 .await
146 }
147 TransitionAction::CheckPersonalGroupReplicationStatus => {
148 let mut event_records = Vec::new();
149 let clients_number = self.additional_clients.len();
150 for client_index in 0..clients_number {
151 match self.get_replicated_groups_by_client(client_index).await {
152 Ok(replicated_groups) => {
153 let groups_read_time = Instant::now();
154 let repl_event_records = self
155 .parse_replicated_groups_into_replication_event_records(
156 &replicated_groups,
157 client_index,
158 groups_read_time,
159 );
160 event_records.extend(repl_event_records);
161 }
162 Err(event_record) => event_records.push(event_record),
163 };
164 }
165 Ok((TransitionResult::Ok, event_records))
169 }
170 }?;
171
172 self.next_state(transition.action, result);
173
174 Ok(event)
175 }
176}
177
178impl ActorLatencyMeasurer {
179 fn generate_new_group_name(&mut self) {
180 self.group_index += 1;
181 }
182
183 fn commit_new_group_name(&mut self) {
184 self.groups_creation_time
185 .insert(self.group_index, Instant::now());
186 self.unreplicated_groups_by_client
187 .iter_mut()
188 .for_each(|c| c.insert_id(self.group_index))
189 }
190
191 fn rollback_new_group_name(&mut self) {
192 self.group_index -= 1;
193 }
194
195 fn get_group_name(&self) -> String {
196 format!("{}-{}", &self.personal_group_name, self.group_index)
197 }
198
199 async fn get_replicated_groups_by_client(
200 &self,
201 client_index: usize,
202 ) -> Result<Vec<String>, EventRecord> {
203 let start = Instant::now();
204 let replicated_groups = self.additional_clients[client_index]
205 .idm_group_get_members(&self.personal_group_name)
206 .await;
207 let duration = Instant::now().duration_since(start);
208
209 match replicated_groups {
210 Err(client_err) => {
211 debug!(?client_err);
212 Err(EventRecord {
213 start,
214 duration,
215 details: EventDetail::Error,
216 })
217 }
218 Ok(maybe_replicated_groups) => Ok(maybe_replicated_groups.unwrap_or_default()),
219 }
220 }
221
222 fn parse_replicated_groups_into_replication_event_records(
223 &mut self,
224 replicated_group_names: &[String],
225 client_index: usize,
226 groups_read_time: Instant,
227 ) -> Vec<EventRecord> {
228 let group_id_from_group_name =
229 |group_name: &String| u64::from_str(group_name.split(&['-', '@']).nth(3)?).ok();
230
231 let replicated_group_ids: Vec<u64> = replicated_group_names
232 .iter()
233 .filter_map(group_id_from_group_name)
234 .collect();
235 let replicated_group_ids_set = IDLBitRange::from_iter(replicated_group_ids);
237
238 let newly_replicated_groups =
242 &replicated_group_ids_set & &self.unreplicated_groups_by_client[client_index];
243
244 for group_id in newly_replicated_groups.into_iter() {
247 self.unreplicated_groups_by_client[client_index].remove_id(group_id)
248 }
249
250 newly_replicated_groups
251 .into_iter()
252 .filter_map(|group| {
253 Some(self.create_replication_delay_event_record(
254 *self.groups_creation_time.get(&group)?,
255 groups_read_time,
256 ))
257 })
258 .collect()
259 }
260
261 fn create_replication_delay_event_record(
262 &self,
263 creation_time: Instant,
264 read_time: Instant,
265 ) -> EventRecord {
266 EventRecord {
267 start: creation_time,
268 duration: read_time.duration_since(creation_time),
269 details: EventDetail::GroupReplicationDelay,
270 }
271 }
272
273 fn next_transition(&mut self) -> Transition {
274 match self.state {
275 State::Unauthenticated => Transition {
276 delay: Some(self.randomised_backoff_time),
277 action: TransitionAction::Login,
278 },
279 State::Authenticated => Transition {
280 delay: Some(Duration::from_secs(2)),
281 action: TransitionAction::PrivilegeReauth,
282 },
283 State::AuthenticatedWithReauth => Transition {
284 delay: Some(Duration::from_secs(1)),
285 action: TransitionAction::CreatePersonalGroup,
286 },
287 State::CreatedPersonalGroup => Transition {
288 delay: Some(Duration::from_secs(1)),
289 action: TransitionAction::CreateGroup,
290 },
291 State::CreatedGroup => Transition {
292 delay: None,
293 action: TransitionAction::AddCreatedGroupToPersonalGroup,
294 },
295 State::AddedCreatedGroupToPersonalGroup => Transition {
296 delay: None,
297 action: TransitionAction::CheckPersonalGroupReplicationStatus,
298 },
299 State::CheckedPersonalGroupReplicationStatus => Transition {
300 delay: Some(Duration::from_secs(1)),
301 action: TransitionAction::CreateGroup,
302 },
303 }
304 }
305
306 fn next_state(&mut self, action: TransitionAction, result: TransitionResult) {
307 match (&self.state, action, result) {
310 (State::Unauthenticated, TransitionAction::Login, TransitionResult::Ok) => {
311 self.state = State::Authenticated;
312 }
313 (State::Authenticated, TransitionAction::PrivilegeReauth, TransitionResult::Ok) => {
314 self.state = State::AuthenticatedWithReauth;
315 }
316 (
317 State::AuthenticatedWithReauth,
318 TransitionAction::CreatePersonalGroup,
319 TransitionResult::Ok,
320 ) => self.state = State::CreatedPersonalGroup,
321 (State::CreatedPersonalGroup, TransitionAction::CreateGroup, TransitionResult::Ok) => {
322 self.state = State::CreatedGroup
323 }
324 (
325 State::CreatedGroup,
326 TransitionAction::AddCreatedGroupToPersonalGroup,
327 TransitionResult::Ok,
328 ) => self.state = State::AddedCreatedGroupToPersonalGroup,
329 (
330 State::AddedCreatedGroupToPersonalGroup,
331 TransitionAction::CheckPersonalGroupReplicationStatus,
332 TransitionResult::Ok,
333 ) => self.state = State::CheckedPersonalGroupReplicationStatus,
334 (
335 State::CheckedPersonalGroupReplicationStatus,
336 TransitionAction::CreateGroup,
337 TransitionResult::Ok,
338 ) => self.state = State::CreatedGroup,
339
340 #[allow(clippy::unreachable)]
341 (_, _, TransitionResult::Ok) => {
342 unreachable!();
343 }
344 (_, _, TransitionResult::Error) => {
345 }
347 }
348 }
349}