use crate::error::Error;
use crate::state::*;
use crate::stats::{BasicStatistics, TestPhase};
use std::sync::Arc;
use rand::{Rng, SeedableRng};
use rand_chacha::ChaCha8Rng;
use crossbeam::queue::{ArrayQueue, SegQueue};
use kanidm_client::{KanidmClient, KanidmClientBuilder};
use serde::Serialize;
use tokio::sync::broadcast;
use std::time::{Duration, Instant};
async fn actor_person(
main_client: KanidmClient,
person: Person,
stats_queue: Arc<SegQueue<EventRecord>>,
mut actor_rx: broadcast::Receiver<Signal>,
rng_seed: u64,
additional_clients: Vec<KanidmClient>,
warmup_time: Duration,
) -> Result<(), Error> {
let mut model =
person
.model
.as_dyn_object(rng_seed, additional_clients, &person.username, warmup_time)?;
while let Err(broadcast::error::TryRecvError::Empty) = actor_rx.try_recv() {
let events = model.transition(&main_client, &person).await?;
debug!("Pushed event to queue!");
for event in events.into_iter() {
stats_queue.push(event);
}
}
debug!("Stopped person {}", person.username);
Ok(())
}
/// One measured event, as pushed onto the statistics queue by `actor_person`.
#[derive(Debug)]
pub struct EventRecord {
/// Instant attached to the event.
/// NOTE(review): presumably the moment the operation began; confirm against the
/// code that builds these records.
pub start: Instant,
/// Duration attached to the event.
/// NOTE(review): presumably how long the operation took; confirm against the
/// code that builds these records.
pub duration: Duration,
/// Which kind of event this record describes.
pub details: EventDetail,
}
/// Classifies an `EventRecord` (stored in its `details` field).
///
/// Derives `Serialize`, so values can be written out through serde.
///
/// NOTE(review): the variant names suggest one variant per kind of operation an
/// actor performs, plus `Error` for failed operations. The code that constructs
/// these values is not part of this section, so confirm there before relying on
/// that reading.
#[derive(Debug, Serialize, Clone)]
pub enum EventDetail {
Login,
Logout,
PersonSetSelfMail,
PersonGetSelfAccount,
PersonGetSelfMemberOf,
PersonSetSelfPassword,
PersonReauth,
PersonCreateGroup,
PersonAddGroupMembers,
GroupReplicationDelay,
Error,
}
/// Control message carried on the tokio broadcast channels used in this module.
///
/// `Clone` is derived because tokio's broadcast channel hands each receiver its
/// own copy of a sent value.
#[derive(Clone, Debug)]
pub enum Signal {
/// Request to stop. `execute` broadcasts this to every actor once the test is
/// over; receivers in this module treat any message (or channel error) as a
/// reason to stop.
Stop,
}
async fn execute_inner(
warmup: Duration,
test_time: Option<Duration>,
mut control_rx: broadcast::Receiver<Signal>,
stat_ctrl: Arc<ArrayQueue<TestPhase>>,
) -> Result<(), Error> {
tokio::select! {
_ = tokio::time::sleep(warmup) => {
}
_ = control_rx.recv() => {
return Err(Error::Interrupt);
}
}
info!("warmup time passed, statistics will now be collected ...");
let start = Instant::now();
if let Err(crossbeam_err) = stat_ctrl.push(TestPhase::Start(start)) {
error!(
?crossbeam_err,
"Unable to signal statistics collector to start"
);
return Err(Error::Crossbeam);
}
if let Some(test_time) = test_time {
let sleep = tokio::time::sleep(test_time);
tokio::pin!(sleep);
let recv = (control_rx).recv();
tokio::pin!(recv);
tokio::select! {
_ = sleep => {
}
_ = recv => {
debug!("Interrupt");
return Err(Error::Interrupt);
}
}
} else {
let _ = control_rx.recv().await;
}
let end = Instant::now();
if let Err(crossbeam_err) = stat_ctrl.push(TestPhase::End(end)) {
error!(
?crossbeam_err,
"Unable to signal statistics collector to end"
);
return Err(Error::Crossbeam);
}
Ok(())
}
pub async fn execute(state: State, control_rx: broadcast::Receiver<Signal>) -> Result<(), Error> {
let stats_queue = Arc::new(SegQueue::new());
let stats_ctrl = Arc::new(ArrayQueue::new(4));
let c_stats_queue = stats_queue.clone();
let c_stats_ctrl = stats_ctrl.clone();
let node_count = 1 + state.profile.extra_uris().len();
let mut dyn_data_collector =
BasicStatistics::new(state.persons.len(), state.groups.len(), node_count);
let dump_raw_data = state.profile.dump_raw_data();
let stats_task = tokio::task::spawn_blocking(move || {
dyn_data_collector.run(c_stats_queue, c_stats_ctrl, dump_raw_data)
});
let mut seeded_rng = ChaCha8Rng::seed_from_u64(state.profile.seed());
let clients = std::iter::once(state.profile.control_uri().to_string())
.chain(state.profile.extra_uris().iter().cloned())
.map(|uri| {
KanidmClientBuilder::new()
.address(uri)
.danger_accept_invalid_hostnames(true)
.danger_accept_invalid_certs(true)
.build()
.map_err(|err| {
error!(?err, "Unable to create kanidm client");
Error::KanidmClient
})
})
.collect::<Result<Vec<_>, _>>()?;
let (actor_tx, _actor_rx) = broadcast::channel(1);
let mut tasks = Vec::with_capacity(state.persons.len());
for person in state.persons.into_iter() {
let mut cloned_clients: Vec<KanidmClient> = clients
.iter()
.map(|client| {
client.new_session().map_err(|err| {
error!(?err, "Unable to create a new kanidm client session");
Error::KanidmClient
})
})
.collect::<Result<Vec<_>, _>>()?;
let main_client_index = seeded_rng.gen_range(0..cloned_clients.len());
let main_client = cloned_clients.remove(main_client_index);
let c_stats_queue = stats_queue.clone();
let c_actor_rx = actor_tx.subscribe();
tasks.push(tokio::spawn(actor_person(
main_client,
person,
c_stats_queue,
c_actor_rx,
state.profile.seed(),
cloned_clients,
state.profile.warmup_time(),
)))
}
let warmup = state.profile.warmup_time();
let test_time = state.profile.test_time();
let c_stats_ctrl = stats_ctrl.clone();
let test_result = execute_inner(warmup, test_time, control_rx, c_stats_ctrl).await;
info!("stopping stats");
if let Err(crossbeam_err) = stats_ctrl.push(TestPhase::StopNow) {
error!(
?crossbeam_err,
"Unable to signal statistics collector to stop"
);
return Err(Error::Crossbeam);
}
info!("stopping workers");
actor_tx.send(Signal::Stop).map_err(|broadcast_err| {
error!(?broadcast_err, "Unable to signal workers to stop");
Error::Tokio
})?;
info!("joining workers");
for task in tasks {
task.await.map_err(|tokio_err| {
error!(?tokio_err, "Failed to join task");
Error::Tokio
})??;
}
stats_task.await.map_err(|tokio_err| {
error!(?tokio_err, "Failed to join statistics task");
Error::Tokio
})??;
test_result
}