1use crate::error::Error;
2use crate::run::{EventDetail, EventRecord};
3use chrono::Local;
4use crossbeam::queue::{ArrayQueue, SegQueue};
5use csv::Writer;
6use serde::Serialize;
7use std::sync::Arc;
8use std::thread;
9use std::time::{Duration, Instant};
10
11use mathru::statistics::distrib::{Continuous, Normal};
12
/// Control messages used to coordinate the statistics collector with the
/// test driver: when the measured window starts, when it ends, and an
/// immediate-stop request that aborts collection without reporting.
#[derive(Debug)]
pub enum TestPhase {
    /// The measured window begins at this instant.
    Start(Instant),
    /// The measured window ends at this instant.
    End(Instant),
    /// Abort collection immediately; no output is produced.
    StopNow,
}
19
/// A consumer of benchmark events. Implementations drain `EventRecord`s from
/// `stats_queue` once the `Start`/`End` phases have been signalled on `ctrl`,
/// then aggregate and persist results. `dump_raw_data` requests that every
/// individual event also be written out, not just the aggregates.
pub trait DataCollector {
    fn run(
        &mut self,
        stats_queue: Arc<SegQueue<EventRecord>>,
        ctrl: Arc<ArrayQueue<TestPhase>>,
        dump_raw_data: bool,
    ) -> Result<(), Error>;
}
28
/// Coarse classification of an `EventDetail`, used to bucket event timings
/// for statistical aggregation.
enum OpKind {
    /// A mutating operation against the server.
    WriteOp,
    /// A read-only operation against the server.
    ReadOp,
    /// Time taken for a change to become visible on another node.
    ReplicationDelay,
    /// Login/logout/reauth events; timed but not aggregated.
    Auth,
    /// A failed operation; not aggregated.
    Error,
}
36
37impl From<EventDetail> for OpKind {
38 fn from(value: EventDetail) -> Self {
39 match value {
40 EventDetail::PersonGetSelfMemberOf | EventDetail::PersonGetSelfAccount => {
41 OpKind::ReadOp
42 }
43 EventDetail::PersonSetSelfMail
44 | EventDetail::PersonSetSelfPassword
45 | EventDetail::PersonCreateGroup
46 | EventDetail::PersonAddGroupMembers => OpKind::WriteOp,
47 EventDetail::Login | EventDetail::Logout | EventDetail::PersonReauth => OpKind::Auth,
48 EventDetail::GroupReplicationDelay => OpKind::ReplicationDelay,
49 EventDetail::Error => OpKind::Error,
50 }
51 }
52}
/// A simple `DataCollector` that computes mean/variance/SD/95% figures for
/// read, write and replication-delay timings and writes them to a CSV file.
pub struct BasicStatistics {
    // Number of test persons configured on the server; recorded in output.
    person_count: usize,
    // Number of test groups configured on the server; recorded in output.
    group_count: usize,
    // Number of server nodes in the topology; recorded in output.
    node_count: usize,
}
58
59impl BasicStatistics {
60 #[allow(clippy::new_ret_no_self)]
61 pub fn new(
62 person_count: usize,
63 group_count: usize,
64 node_count: usize,
65 ) -> Box<dyn DataCollector + Send> {
66 Box::new(BasicStatistics {
67 person_count,
68 group_count,
69 node_count,
70 })
71 }
72}
73
impl DataCollector for BasicStatistics {
    /// Collect timing events for one benchmark run and persist summary
    /// statistics (and optionally the raw events) as CSV.
    ///
    /// Protocol: waits on `ctrl` for `Start`, then for `End`, polling every
    /// 100ms. `StopNow` at either point aborts cleanly with `Ok(())`;
    /// out-of-order signals yield `Error::InvalidState`. After `End`, the
    /// event queue is drained, events that started outside the
    /// [start, end] window are discarded, and the rest are bucketed by
    /// `OpKind` for aggregation.
    fn run(
        &mut self,
        stats_queue: Arc<SegQueue<EventRecord>>,
        ctrl: Arc<ArrayQueue<TestPhase>>,
        dump_raw_data: bool,
    ) -> Result<(), Error> {
        debug!("Started statistics collector");

        // Wait for the driver to mark the start of the measured window.
        let start = loop {
            match ctrl.pop() {
                Some(TestPhase::Start(start)) => {
                    break start;
                }
                Some(TestPhase::End(_)) => {
                    // End must not arrive before Start.
                    error!("invalid state");
                    return Err(Error::InvalidState);
                }
                Some(TestPhase::StopNow) => {
                    // Aborted before any measurement; nothing to report.
                    return Ok(());
                }
                // Control queue empty - poll again shortly.
                None => thread::sleep(Duration::from_millis(100)),
            }
        };

        // Wait for the matching end-of-window marker.
        let end = loop {
            match ctrl.pop() {
                Some(TestPhase::Start(_)) => {
                    // A second Start is a protocol violation.
                    return Err(Error::InvalidState);
                }
                Some(TestPhase::End(end)) => {
                    break end;
                }
                Some(TestPhase::StopNow) => {
                    warn!("requested to stop now!");
                    return Ok(());
                }
                // Control queue empty - poll again shortly.
                None => thread::sleep(Duration::from_millis(100)),
            }
        };

        info!("start statistics processing ...");

        // Per-bucket timings in seconds, plus (optionally) every raw event.
        let mut readop_times = Vec::new();
        let mut writeop_times = Vec::new();
        let mut replication_delays = Vec::new();
        let mut raw_stats = Vec::new();

        // Drain everything the workers pushed during the run.
        while let Some(event_record) = stats_queue.pop() {
            // Skip events outside the measured window (e.g. warmup traffic
            // or stragglers that started after End).
            if event_record.start < start || event_record.start > end {
                continue;
            }

            if dump_raw_data {
                raw_stats.push(SerializableEventRecord::from_event_record(
                    &event_record,
                    start,
                ));
            }

            match OpKind::from(event_record.details) {
                OpKind::ReadOp => {
                    readop_times.push(event_record.duration.as_secs_f64());
                }
                OpKind::WriteOp => {
                    writeop_times.push(event_record.duration.as_secs_f64());
                }
                OpKind::ReplicationDelay => {
                    replication_delays.push(event_record.duration.as_secs_f64())
                }
                // Auth and Error events are not aggregated; they appear only
                // in the raw dump (when requested).
                OpKind::Auth => {}
                OpKind::Error => {}
            }
        }

        // No in-window events in any bucket means the run produced nothing
        // usable - treat as an error rather than writing an empty report.
        if readop_times.is_empty() && writeop_times.is_empty() && replication_delays.is_empty() {
            error!("For some weird reason no valid data was recorded in this benchmark, bailing out...");
            return Err(Error::InvalidState);
        }

        let stats = StatsContainer::new(
            &readop_times,
            &writeop_times,
            &replication_delays,
            self.node_count,
            self.person_count,
            self.group_count,
        );

        info!(
            "Server configuration was: {} nodes, {} users and {} groups",
            self.node_count, self.person_count, self.group_count
        );

        info!("Received {} read events", stats.read_events);

        info!("mean: {} seconds", stats.read_mean);
        info!("variance: {} seconds", stats.read_variance);
        info!("SD: {} seconds", stats.read_sd);
        info!("95%: {}", stats.read_95);

        info!("Received {} write events", stats.write_events);

        info!("mean: {} seconds", stats.write_mean);
        info!("variance: {} seconds", stats.write_variance);
        info!("SD: {} seconds", stats.write_sd);
        info!("95%: {}", stats.write_95);

        info!(
            "Received {} replication delays",
            stats.replication_delay_events
        );

        info!("mean: {} seconds", stats.replication_delay_mean);
        info!("variance: {} seconds", stats.replication_delay_variance);
        info!("SD: {} seconds", stats.replication_delay_sd);
        info!("95%: {}", stats.replication_delay_95);

        // Timestamped output name so successive runs never clobber each other.
        let now = Local::now();
        let filepath = format!("orca-run-{}.csv", now.to_rfc3339());

        info!("Now saving stats as '{filepath}'");

        let mut wrt = Writer::from_path(filepath).map_err(|_| Error::Io)?;
        wrt.serialize(stats).map_err(|_| Error::Io)?;

        if dump_raw_data {
            let raw_data_filepath = format!("orca-run-{}-raw.csv", now.to_rfc3339());
            info!("Now saving raw data as '{raw_data_filepath}'");

            let mut wrt = Writer::from_path(raw_data_filepath).map_err(|_| Error::Io)?;

            // One CSV row per in-window event.
            for record in raw_stats.iter() {
                wrt.serialize(record).map_err(|_| Error::Io)?;
            }
        }

        debug!("Ended statistics collector");

        Ok(())
    }
}
225
/// A single event flattened for CSV serialisation: both times are expressed
/// as integer milliseconds so the output needs no custom serialisers.
#[derive(Serialize)]
struct SerializableEventRecord {
    // Milliseconds between test start and this event's start.
    time_from_start_ms: u128,
    // Event duration in milliseconds.
    duration_ms: u128,
    details: EventDetail,
}
232
233impl SerializableEventRecord {
234 fn from_event_record(event_record: &EventRecord, test_start: Instant) -> Self {
235 SerializableEventRecord {
236 time_from_start_ms: event_record.start.duration_since(test_start).as_millis(),
237 duration_ms: event_record.duration.as_millis(),
238 details: event_record.details.clone(),
239 }
240 }
241}
242
/// The one-row summary written to the statistics CSV: the server topology
/// used for the run, plus per-bucket event counts and distribution figures
/// (mean/variance/SD in seconds, and the "95%" estimate).
#[derive(Serialize)]
struct StatsContainer {
    // Server topology for this run.
    node_count: usize,
    person_count: usize,
    group_count: usize,
    // Read-operation statistics.
    read_events: usize,
    read_sd: f64,
    read_mean: f64,
    read_variance: f64,
    read_95: f64,
    // Write-operation statistics.
    write_events: usize,
    write_sd: f64,
    write_mean: f64,
    write_variance: f64,
    write_95: f64,
    // Replication-delay statistics.
    replication_delay_events: usize,
    replication_delay_sd: f64,
    replication_delay_mean: f64,
    replication_delay_variance: f64,
    replication_delay_95: f64,
}
264
// Named components of the tuple returned by `compute_stats_from_timings_vec`,
// purely to make its signature self-describing.
type EventCount = usize;
type Mean = f64;
type Sd = f64;
type Variance = f64;
type Percentile95 = f64;
271
272impl StatsContainer {
273 fn new(
274 readop_times: &Vec<f64>,
275 writeop_times: &Vec<f64>,
276 replication_delays: &Vec<f64>,
277 node_count: usize,
278 person_count: usize,
279 group_count: usize,
280 ) -> Self {
281 let (read_events, read_mean, read_variance, read_sd, read_95) =
282 Self::compute_stats_from_timings_vec(readop_times);
283
284 let (write_events, write_mean, write_variance, write_sd, write_95) =
285 Self::compute_stats_from_timings_vec(writeop_times);
286
287 let (
288 replication_delay_events,
289 replication_delay_mean,
290 replication_delay_variance,
291 replication_delay_sd,
292 replication_delay_95,
293 ) = Self::compute_stats_from_timings_vec(replication_delays);
294
295 StatsContainer {
296 person_count,
297 group_count,
298 node_count,
299 read_events,
300 read_sd,
301 read_mean,
302 read_variance,
303 read_95,
304 write_events,
305 write_sd,
306 write_mean,
307 write_variance,
308 write_95,
309 replication_delay_events,
310 replication_delay_sd,
311 replication_delay_mean,
312 replication_delay_variance,
313 replication_delay_95,
314 }
315 }
316
317 fn compute_stats_from_timings_vec(
318 op_times: &Vec<f64>,
319 ) -> (EventCount, Mean, Variance, Sd, Percentile95) {
320 let op_times_len = op_times.len();
321 if op_times_len >= 2 {
322 let distr = Normal::from_data(op_times);
323 let mean = distr.mean();
324 let variance = distr.variance();
325 let sd = variance.sqrt();
326 let percentile_95 = mean + 2. * sd;
327 (op_times_len, mean, variance, sd, percentile_95)
328 } else {
329 (0, 0., 0., 0., 0.)
330 }
331 }
332}