orca/
run.rs

1use crate::error::Error;
2use crate::state::*;
3use crate::stats::{BasicStatistics, TestPhase};
4
5use std::sync::Arc;
6
7use rand::{Rng, SeedableRng};
8use rand_chacha::ChaCha8Rng;
9
10use crossbeam::queue::{ArrayQueue, SegQueue};
11
12use kanidm_client::{KanidmClient, KanidmClientBuilder};
13
14use serde::Serialize;
15use tokio::sync::broadcast;
16
17use std::time::{Duration, Instant};
18
19async fn actor_person(
20    main_client: KanidmClient,
21    person: Person,
22    stats_queue: Arc<SegQueue<EventRecord>>,
23    mut actor_rx: broadcast::Receiver<Signal>,
24    rng_seed: u64,
25    additional_clients: Vec<KanidmClient>,
26    warmup_time: Duration,
27) -> Result<(), Error> {
28    let mut model =
29        person
30            .model
31            .as_dyn_object(rng_seed, additional_clients, &person.username, warmup_time)?;
32
33    while let Err(broadcast::error::TryRecvError::Empty) = actor_rx.try_recv() {
34        let events = model.transition(&main_client, &person).await?;
35        debug!("Pushed event to queue!");
36        for event in events.into_iter() {
37            stats_queue.push(event);
38        }
39    }
40
41    debug!("Stopped person {}", person.username);
42    Ok(())
43}
44
45#[derive(Debug)]
46pub struct EventRecord {
47    pub start: Instant,
48    pub duration: Duration,
49    pub details: EventDetail,
50}
51
52#[derive(Debug, Serialize, Clone)]
53pub enum EventDetail {
54    Login,
55    Logout,
56    PersonSetSelfMail,
57    PersonGetSelfAccount,
58    PersonGetSelfMemberOf,
59    PersonSetSelfPassword,
60    PersonReauth,
61    PersonCreateGroup,
62    PersonAddGroupMembers,
63    GroupReplicationDelay,
64    Error,
65}
66
/// Control messages broadcast to the test runner and to the actor tasks.
#[derive(Debug, Clone)]
pub enum Signal {
    /// Ask the receiver to stop what it is doing.
    Stop,
}
71
72async fn execute_inner(
73    warmup: Duration,
74    test_time: Option<Duration>,
75    mut control_rx: broadcast::Receiver<Signal>,
76    stat_ctrl: Arc<ArrayQueue<TestPhase>>,
77) -> Result<(), Error> {
78    // Delay for warmup time.
79    tokio::select! {
80        _ = tokio::time::sleep(warmup) => {
81            // continue.
82        }
83        _ = control_rx.recv() => {
84            // Until we add other signal types, any event is
85            // either Ok(Signal::Stop) or Err(_), both of which indicate
86            // we need to stop immediately.
87            return Err(Error::Interrupt);
88        }
89    }
90    info!("warmup time passed, statistics will now be collected ...");
91
92    let start = Instant::now();
93    if let Err(crossbeam_err) = stat_ctrl.push(TestPhase::Start(start)) {
94        error!(
95            ?crossbeam_err,
96            "Unable to signal statistics collector to start"
97        );
98        return Err(Error::Crossbeam);
99    }
100
101    if let Some(test_time) = test_time {
102        let sleep = tokio::time::sleep(test_time);
103        tokio::pin!(sleep);
104        let recv = (control_rx).recv();
105        tokio::pin!(recv);
106
107        // Wait for some condition (signal, or time).
108        tokio::select! {
109            _ = sleep => {
110                // continue.
111            }
112            _ = recv => {
113                // Until we add other signal types, any event is
114                // either Ok(Signal::Stop) or Err(_), both of which indicate
115                // we need to stop immediately.
116                debug!("Interrupt");
117                return Err(Error::Interrupt);
118            }
119        }
120    } else {
121        let _ = control_rx.recv().await;
122    }
123
124    let end = Instant::now();
125    if let Err(crossbeam_err) = stat_ctrl.push(TestPhase::End(end)) {
126        error!(
127            ?crossbeam_err,
128            "Unable to signal statistics collector to end"
129        );
130        return Err(Error::Crossbeam);
131    }
132
133    Ok(())
134}
135
136pub async fn execute(state: State, control_rx: broadcast::Receiver<Signal>) -> Result<(), Error> {
137    // Create a statistics queue.
138    let stats_queue = Arc::new(SegQueue::new());
139    let stats_ctrl = Arc::new(ArrayQueue::new(4));
140
141    // Spawn the stats aggregator
142    let c_stats_queue = stats_queue.clone();
143    let c_stats_ctrl = stats_ctrl.clone();
144
145    let node_count = 1 + state.profile.extra_uris().len();
146    let mut dyn_data_collector =
147        BasicStatistics::new(state.persons.len(), state.groups.len(), node_count);
148
149    let dump_raw_data = state.profile.dump_raw_data();
150
151    let stats_task = tokio::task::spawn_blocking(move || {
152        dyn_data_collector.run(c_stats_queue, c_stats_ctrl, dump_raw_data)
153    });
154
155    // Create clients. Note, we actually seed these deterministically too, so that
156    // or persons are spread over the clients that exist, in a way that is also
157    // deterministic.
158    let mut seeded_rng = ChaCha8Rng::seed_from_u64(state.profile.seed());
159
160    let clients = std::iter::once(state.profile.control_uri().to_string())
161        .chain(state.profile.extra_uris().iter().cloned())
162        .map(|uri| {
163            KanidmClientBuilder::new()
164                .address(uri)
165                .danger_accept_invalid_hostnames(true)
166                .danger_accept_invalid_certs(true)
167                .build()
168                .map_err(|err| {
169                    error!(?err, "Unable to create kanidm client");
170                    Error::KanidmClient
171                })
172        })
173        .collect::<Result<Vec<_>, _>>()?;
174
175    let (actor_tx, _actor_rx) = broadcast::channel(1);
176
177    // Start the actors
178    let mut tasks = Vec::with_capacity(state.persons.len());
179    for person in state.persons.into_iter() {
180        // this is not super efficient but we don't really care as we are not even inside the warmup time window, so we're not in a hurry
181        let mut cloned_clients: Vec<KanidmClient> = clients
182            .iter()
183            .map(|client| {
184                client.new_session().map_err(|err| {
185                    error!(?err, "Unable to create a new kanidm client session");
186                    Error::KanidmClient
187                })
188            })
189            .collect::<Result<Vec<_>, _>>()?;
190        let main_client_index = seeded_rng.random_range(0..cloned_clients.len());
191        let main_client = cloned_clients.remove(main_client_index);
192        //note that cloned_clients now contains all other clients except the first one
193
194        let c_stats_queue = stats_queue.clone();
195
196        let c_actor_rx = actor_tx.subscribe();
197
198        tasks.push(tokio::spawn(actor_person(
199            main_client,
200            person,
201            c_stats_queue,
202            c_actor_rx,
203            state.profile.seed(),
204            cloned_clients,
205            state.profile.warmup_time(),
206        )))
207    }
208
209    let warmup = state.profile.warmup_time();
210    let test_time = state.profile.test_time();
211
212    // We run a separate test inner so we don't have to worry about
213    // task spawn/join within our logic.
214    let c_stats_ctrl = stats_ctrl.clone();
215    // Don't ? this, we want to stash the result so we cleanly stop all the workers
216    // before returning the inner test result.
217    let test_result = execute_inner(warmup, test_time, control_rx, c_stats_ctrl).await;
218
219    info!("stopping stats");
220
221    // The statistics collector has been working in the BG, and was likely told
222    // to end by now, but if not (due to an error) send a signal to stop immediately.
223    if let Err(crossbeam_err) = stats_ctrl.push(TestPhase::StopNow) {
224        error!(
225            ?crossbeam_err,
226            "Unable to signal statistics collector to stop"
227        );
228        return Err(Error::Crossbeam);
229    }
230
231    info!("stopping workers");
232
233    // Test workers to stop
234    actor_tx.send(Signal::Stop).map_err(|broadcast_err| {
235        error!(?broadcast_err, "Unable to signal workers to stop");
236        Error::Tokio
237    })?;
238
239    info!("joining workers");
240
241    // Join all the tasks.
242    for task in tasks {
243        task.await.map_err(|tokio_err| {
244            error!(?tokio_err, "Failed to join task");
245            Error::Tokio
246        })??;
247        // The double ? isn't a mistake, it's because this is Result<Result<T, E>, E>
248        // and flatten is nightly.
249    }
250
251    // By this point the stats task should have been told to halt and rejoin.
252    stats_task.await.map_err(|tokio_err| {
253        error!(?tokio_err, "Failed to join statistics task");
254        Error::Tokio
255    })??;
256    // Not an error, two ? to handle the inner data collector error.
257
258    // Complete!
259
260    test_result
261}