use crate::error::Error;
use crate::run::{EventDetail, EventRecord};
use chrono::Local;
use crossbeam::queue::{ArrayQueue, SegQueue};
use csv::Writer;
use serde::Serialize;
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};

use mathru::statistics::distrib::{Continuous, Normal};

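/// Phases of a test run, sent to collectors over the control queue.
/// `Start` and `End` carry the instants that bound the measurement window,
/// while `StopNow` asks the collector to shut down immediately.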
#[derive(Debug)]
pub enum TestPhase {
    Start(Instant),
    End(Instant),
    StopNow,
}

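/// A consumer of benchmark events. Implementations drain `EventRecord`s from
/// `stats_queue`, driven by the `TestPhase` messages popped from `ctrl`, and
/// may optionally dump the raw per-event data when `dump_raw_data` is set.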
pub trait DataCollector {
    fn run(
        &mut self,
        stats_queue: Arc<SegQueue<EventRecord>>,
        ctrl: Arc<ArrayQueue<TestPhase>>,
        dump_raw_data: bool,
    ) -> Result<(), Error>;
}

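/// Coarse classification of an `EventDetail`, used to bucket event timings
/// into read, write, replication-delay, authentication and error groups.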
enum OpKind {
    WriteOp,
    ReadOp,
    ReplicationDelay,
    Auth,
    Error,
}

impl From<EventDetail> for OpKind {
    fn from(value: EventDetail) -> Self {
        match value {
            EventDetail::PersonGetSelfMemberOf | EventDetail::PersonGetSelfAccount => {
                OpKind::ReadOp
            }
            EventDetail::PersonSetSelfMail
            | EventDetail::PersonSetSelfPassword
            | EventDetail::PersonCreateGroup
            | EventDetail::PersonAddGroupMembers => OpKind::WriteOp,
            EventDetail::Login | EventDetail::Logout | EventDetail::PersonReauth => OpKind::Auth,
            EventDetail::GroupReplicationDelay => OpKind::ReplicationDelay,
            EventDetail::Error => OpKind::Error,
        }
    }
}

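/// A `DataCollector` that computes simple summary statistics (mean, variance,
/// standard deviation and an approximate 95th percentile) for read, write and
/// replication-delay timings, and writes them to CSV.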
pub struct BasicStatistics {
    person_count: usize,
    group_count: usize,
    node_count: usize,
}

impl BasicStatistics {
    #[allow(clippy::new_ret_no_self)]
    pub fn new(
        person_count: usize,
        group_count: usize,
        node_count: usize,
    ) -> Box<dyn DataCollector + Send> {
        Box::new(BasicStatistics {
            person_count,
            group_count,
            node_count,
        })
    }
}

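// Collector flow: wait for the Start signal, then for the End signal, then
// drain the event queue, bucket the timings by OpKind, compute the summary
// statistics and write them (plus optional raw data) to timestamped CSV files.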
impl DataCollector for BasicStatistics {
    fn run(
        &mut self,
        stats_queue: Arc<SegQueue<EventRecord>>,
        ctrl: Arc<ArrayQueue<TestPhase>>,
        dump_raw_data: bool,
    ) -> Result<(), Error> {
        debug!("Started statistics collector");

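        // Block until the Start phase arrives. Seeing End here is an error,
        // StopNow aborts cleanly, and an empty queue is polled every 100ms.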
        let start = loop {
            match ctrl.pop() {
                Some(TestPhase::Start(start)) => {
                    break start;
                }
                Some(TestPhase::End(_)) => {
                    error!("invalid state");
                    return Err(Error::InvalidState);
                }
                Some(TestPhase::StopNow) => {
                    return Ok(());
                }
                None => {
                    #[allow(clippy::disallowed_methods)]
                    thread::sleep(Duration::from_millis(100))
                }
            }
        };

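        // Now block until the End phase arrives; a second Start is invalid,
        // and StopNow again aborts cleanly.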
        let end = loop {
            match ctrl.pop() {
                Some(TestPhase::Start(_)) => {
                    return Err(Error::InvalidState);
                }
                Some(TestPhase::End(end)) => {
                    break end;
                }
                Some(TestPhase::StopNow) => {
                    warn!("requested to stop now!");
                    return Ok(());
                }
                None => {
                    #[allow(clippy::disallowed_methods)]
                    thread::sleep(Duration::from_millis(100))
                }
            }
        };

        info!("start statistics processing ...");

        let mut readop_times = Vec::new();
        let mut writeop_times = Vec::new();
        let mut replication_delays = Vec::new();
        let mut raw_stats = Vec::new();

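        // Drain every recorded event, keeping only those that started inside
        // the [start, end] measurement window, and bucket the durations by
        // operation kind. Auth and Error events are excluded from the timings.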
        while let Some(event_record) = stats_queue.pop() {
            if event_record.start < start || event_record.start > end {
                continue;
            }

            if dump_raw_data {
                raw_stats.push(SerializableEventRecord::from_event_record(
                    &event_record,
                    start,
                ));
            }

            match OpKind::from(event_record.details) {
                OpKind::ReadOp => {
                    readop_times.push(event_record.duration.as_secs_f64());
                }
                OpKind::WriteOp => {
                    writeop_times.push(event_record.duration.as_secs_f64());
                }
                OpKind::ReplicationDelay => {
                    replication_delays.push(event_record.duration.as_secs_f64())
                }
                OpKind::Auth => {}
                OpKind::Error => {}
            }
        }

        if readop_times.is_empty() && writeop_times.is_empty() && replication_delays.is_empty() {
            error!("No valid events were recorded during the test window, bailing out...");
            return Err(Error::InvalidState);
        }

        let stats = StatsContainer::new(
            &readop_times,
            &writeop_times,
            &replication_delays,
            self.node_count,
            self.person_count,
            self.group_count,
        );

        info!(
            "Server configuration was: {} nodes, {} users and {} groups",
            self.node_count, self.person_count, self.group_count
        );

        info!("Received {} read events", stats.read_events);

        info!("mean: {} seconds", stats.read_mean);
        info!("variance: {} seconds^2", stats.read_variance);
        info!("SD: {} seconds", stats.read_sd);
        info!("95%: {} seconds", stats.read_95);

        info!("Received {} write events", stats.write_events);

        info!("mean: {} seconds", stats.write_mean);
        info!("variance: {} seconds^2", stats.write_variance);
        info!("SD: {} seconds", stats.write_sd);
        info!("95%: {} seconds", stats.write_95);

        info!(
            "Received {} replication delays",
            stats.replication_delay_events
        );

        info!("mean: {} seconds", stats.replication_delay_mean);
        info!("variance: {} seconds^2", stats.replication_delay_variance);
        info!("SD: {} seconds", stats.replication_delay_sd);
        info!("95%: {} seconds", stats.replication_delay_95);

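        // Write the summary statistics to a timestamped CSV file, and the raw
        // per-event records to a second file when requested.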
        let now = Local::now();
        let filepath = format!("orca-run-{}.csv", now.to_rfc3339());

        info!("Now saving stats as '{filepath}'");

        let mut wrt = Writer::from_path(filepath).map_err(|_| Error::Io)?;
        wrt.serialize(stats).map_err(|_| Error::Io)?;

        if dump_raw_data {
            let raw_data_filepath = format!("orca-run-{}-raw.csv", now.to_rfc3339());
            info!("Now saving raw data as '{raw_data_filepath}'");

            let mut wrt = Writer::from_path(raw_data_filepath).map_err(|_| Error::Io)?;

            for record in raw_stats.iter() {
                wrt.serialize(record).map_err(|_| Error::Io)?;
            }
        }

        debug!("Ended statistics collector");

        Ok(())
    }
}

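/// A flattened, serializable view of an `EventRecord`: times are expressed in
/// milliseconds relative to the start of the test so they can be written to CSV.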
#[derive(Serialize)]
struct SerializableEventRecord {
    time_from_start_ms: u128,
    duration_ms: u128,
    details: EventDetail,
}

impl SerializableEventRecord {
    fn from_event_record(event_record: &EventRecord, test_start: Instant) -> Self {
        SerializableEventRecord {
            time_from_start_ms: event_record.start.duration_since(test_start).as_millis(),
            duration_ms: event_record.duration.as_millis(),
            details: event_record.details.clone(),
        }
    }
}

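/// The single summary row written to the results CSV: the server configuration
/// plus event count, mean, variance, standard deviation and approximate 95th
/// percentile for each operation class.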
#[derive(Serialize)]
struct StatsContainer {
    node_count: usize,
    person_count: usize,
    group_count: usize,
    read_events: usize,
    read_sd: f64,
    read_mean: f64,
    read_variance: f64,
    read_95: f64,
    write_events: usize,
    write_sd: f64,
    write_mean: f64,
    write_variance: f64,
    write_95: f64,
    replication_delay_events: usize,
    replication_delay_sd: f64,
    replication_delay_mean: f64,
    replication_delay_variance: f64,
    replication_delay_95: f64,
}

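// Aliases documenting the tuple returned by `compute_stats_from_timings_vec`.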
type EventCount = usize;
type Mean = f64;
type Sd = f64;
type Variance = f64;
type Percentile95 = f64;

impl StatsContainer {
    fn new(
        readop_times: &[f64],
        writeop_times: &[f64],
        replication_delays: &[f64],
        node_count: usize,
        person_count: usize,
        group_count: usize,
    ) -> Self {
        let (read_events, read_mean, read_variance, read_sd, read_95) =
            Self::compute_stats_from_timings_vec(readop_times);

        let (write_events, write_mean, write_variance, write_sd, write_95) =
            Self::compute_stats_from_timings_vec(writeop_times);

        let (
            replication_delay_events,
            replication_delay_mean,
            replication_delay_variance,
            replication_delay_sd,
            replication_delay_95,
        ) = Self::compute_stats_from_timings_vec(replication_delays);

        StatsContainer {
            person_count,
            group_count,
            node_count,
            read_events,
            read_sd,
            read_mean,
            read_variance,
            read_95,
            write_events,
            write_sd,
            write_mean,
            write_variance,
            write_95,
            replication_delay_events,
            replication_delay_sd,
            replication_delay_mean,
            replication_delay_variance,
            replication_delay_95,
        }
    }

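    /// Fits a normal distribution (via `mathru`) to the observed timings and
    /// returns `(count, mean, variance, standard deviation, ~95th percentile)`.
    /// The percentile is approximated as `mean + 2 * sd`, and all values are
    /// zero when fewer than two samples are available.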
    fn compute_stats_from_timings_vec(
        op_times: &[f64],
    ) -> (EventCount, Mean, Variance, Sd, Percentile95) {
        let op_times_len = op_times.len();
        if op_times_len >= 2 {
            let distr = Normal::from_data(op_times);
            let mean = distr.mean();
            let variance = distr.variance();
            let sd = variance.sqrt();
            let percentile_95 = mean + 2. * sd;
            (op_times_len, mean, variance, sd, percentile_95)
        } else {
            (0, 0., 0., 0., 0.)
        }
    }
}