1use crate::error::Error;
2use crate::state::*;
3use crate::stats::{BasicStatistics, TestPhase};
4
5use std::sync::Arc;
6
7use rand::{Rng, SeedableRng};
8use rand_chacha::ChaCha8Rng;
9
10use crossbeam::queue::{ArrayQueue, SegQueue};
11
12use kanidm_client::{KanidmClient, KanidmClientBuilder};
13
14use serde::Serialize;
15use tokio::sync::broadcast;
16
17use std::time::{Duration, Instant};
18
19async fn actor_person(
20 main_client: KanidmClient,
21 person: Person,
22 stats_queue: Arc<SegQueue<EventRecord>>,
23 mut actor_rx: broadcast::Receiver<Signal>,
24 rng_seed: u64,
25 additional_clients: Vec<KanidmClient>,
26 warmup_time: Duration,
27) -> Result<(), Error> {
28 let mut model =
29 person
30 .model
31 .as_dyn_object(rng_seed, additional_clients, &person.username, warmup_time)?;
32
33 while let Err(broadcast::error::TryRecvError::Empty) = actor_rx.try_recv() {
34 let events = model.transition(&main_client, &person).await?;
35 debug!("Pushed event to queue!");
36 for event in events.into_iter() {
37 stats_queue.push(event);
38 }
39 }
40
41 debug!("Stopped person {}", person.username);
42 Ok(())
43}
44
/// One measured event, as pushed onto the statistics queue by an actor.
///
/// Records are produced by model transitions (see `actor_person`) and are
/// consumed by the statistics collector.
#[derive(Debug)]
pub struct EventRecord {
    /// Instant attached to the event.
    /// NOTE(review): presumably the moment the measured operation began —
    /// confirm against the models that construct records.
    pub start: Instant,
    /// Length of time attached to the event.
    /// NOTE(review): presumably how long the operation took — confirm.
    pub duration: Duration,
    /// The kind of operation this record describes.
    pub details: EventDetail,
}
51
/// The kind of operation an [`EventRecord`] describes.
///
/// Derives `Serialize`, so values can be written out with serde.
///
/// NOTE(review): the variants are constructed outside this file; their exact
/// meaning is defined by the code that emits them and is not documented here
/// to avoid guessing from the names alone.
#[derive(Debug, Serialize, Clone)]
pub enum EventDetail {
    Login,
    Logout,
    PersonSetSelfMail,
    PersonGetSelfAccount,
    PersonGetSelfMemberOf,
    PersonSetSelfPassword,
    PersonReauth,
    PersonCreateGroup,
    PersonAddGroupMembers,
    GroupReplicationDelay,
    Error,
}
66
/// Control message carried by the broadcast channels used in this module.
///
/// `Clone` is derived because a broadcast channel hands every receiver its
/// own copy of each message.
#[derive(Clone, Debug)]
pub enum Signal {
    /// Request to stop. `execute` broadcasts this to every actor once the
    /// test run is over.
    Stop,
}
71
72async fn execute_inner(
73 warmup: Duration,
74 test_time: Option<Duration>,
75 mut control_rx: broadcast::Receiver<Signal>,
76 stat_ctrl: Arc<ArrayQueue<TestPhase>>,
77) -> Result<(), Error> {
78 tokio::select! {
80 _ = tokio::time::sleep(warmup) => {
81 }
83 _ = control_rx.recv() => {
84 return Err(Error::Interrupt);
88 }
89 }
90 info!("warmup time passed, statistics will now be collected ...");
91
92 let start = Instant::now();
93 if let Err(crossbeam_err) = stat_ctrl.push(TestPhase::Start(start)) {
94 error!(
95 ?crossbeam_err,
96 "Unable to signal statistics collector to start"
97 );
98 return Err(Error::Crossbeam);
99 }
100
101 if let Some(test_time) = test_time {
102 let sleep = tokio::time::sleep(test_time);
103 tokio::pin!(sleep);
104 let recv = (control_rx).recv();
105 tokio::pin!(recv);
106
107 tokio::select! {
109 _ = sleep => {
110 }
112 _ = recv => {
113 debug!("Interrupt");
117 return Err(Error::Interrupt);
118 }
119 }
120 } else {
121 let _ = control_rx.recv().await;
122 }
123
124 let end = Instant::now();
125 if let Err(crossbeam_err) = stat_ctrl.push(TestPhase::End(end)) {
126 error!(
127 ?crossbeam_err,
128 "Unable to signal statistics collector to end"
129 );
130 return Err(Error::Crossbeam);
131 }
132
133 Ok(())
134}
135
136pub async fn execute(state: State, control_rx: broadcast::Receiver<Signal>) -> Result<(), Error> {
137 let stats_queue = Arc::new(SegQueue::new());
139 let stats_ctrl = Arc::new(ArrayQueue::new(4));
140
141 let c_stats_queue = stats_queue.clone();
143 let c_stats_ctrl = stats_ctrl.clone();
144
145 let node_count = 1 + state.profile.extra_uris().len();
146 let mut dyn_data_collector =
147 BasicStatistics::new(state.persons.len(), state.groups.len(), node_count);
148
149 let dump_raw_data = state.profile.dump_raw_data();
150
151 let stats_task = tokio::task::spawn_blocking(move || {
152 dyn_data_collector.run(c_stats_queue, c_stats_ctrl, dump_raw_data)
153 });
154
155 let mut seeded_rng = ChaCha8Rng::seed_from_u64(state.profile.seed());
159
160 let clients = std::iter::once(state.profile.control_uri().to_string())
161 .chain(state.profile.extra_uris().iter().cloned())
162 .map(|uri| {
163 KanidmClientBuilder::new()
164 .address(uri)
165 .danger_accept_invalid_hostnames(true)
166 .danger_accept_invalid_certs(true)
167 .build()
168 .map_err(|err| {
169 error!(?err, "Unable to create kanidm client");
170 Error::KanidmClient
171 })
172 })
173 .collect::<Result<Vec<_>, _>>()?;
174
175 let (actor_tx, _actor_rx) = broadcast::channel(1);
176
177 let mut tasks = Vec::with_capacity(state.persons.len());
179 for person in state.persons.into_iter() {
180 let mut cloned_clients: Vec<KanidmClient> = clients
182 .iter()
183 .map(|client| {
184 client.new_session().map_err(|err| {
185 error!(?err, "Unable to create a new kanidm client session");
186 Error::KanidmClient
187 })
188 })
189 .collect::<Result<Vec<_>, _>>()?;
190 let main_client_index = seeded_rng.random_range(0..cloned_clients.len());
191 let main_client = cloned_clients.remove(main_client_index);
192 let c_stats_queue = stats_queue.clone();
195
196 let c_actor_rx = actor_tx.subscribe();
197
198 tasks.push(tokio::spawn(actor_person(
199 main_client,
200 person,
201 c_stats_queue,
202 c_actor_rx,
203 state.profile.seed(),
204 cloned_clients,
205 state.profile.warmup_time(),
206 )))
207 }
208
209 let warmup = state.profile.warmup_time();
210 let test_time = state.profile.test_time();
211
212 let c_stats_ctrl = stats_ctrl.clone();
215 let test_result = execute_inner(warmup, test_time, control_rx, c_stats_ctrl).await;
218
219 info!("stopping stats");
220
221 if let Err(crossbeam_err) = stats_ctrl.push(TestPhase::StopNow) {
224 error!(
225 ?crossbeam_err,
226 "Unable to signal statistics collector to stop"
227 );
228 return Err(Error::Crossbeam);
229 }
230
231 info!("stopping workers");
232
233 actor_tx.send(Signal::Stop).map_err(|broadcast_err| {
235 error!(?broadcast_err, "Unable to signal workers to stop");
236 Error::Tokio
237 })?;
238
239 info!("joining workers");
240
241 for task in tasks {
243 task.await.map_err(|tokio_err| {
244 error!(?tokio_err, "Failed to join task");
245 Error::Tokio
246 })??;
247 }
250
251 stats_task.await.map_err(|tokio_err| {
253 error!(?tokio_err, "Failed to join statistics task");
254 Error::Tokio
255 })??;
256 test_result
261}