orca/stats.rs

use crate::error::Error;
use crate::run::{EventDetail, EventRecord};
use chrono::Local;
use crossbeam::queue::{ArrayQueue, SegQueue};
use csv::Writer;
use serde::Serialize;
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};

use mathru::statistics::distrib::{Continuous, Normal};

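/// Control messages describing the test lifecycle, delivered to a collector via the `ctrl` queue.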
#[derive(Debug)]
pub enum TestPhase {
    Start(Instant),
    End(Instant),
    StopNow,
}

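/// Drains `EventRecord`s from the shared stats queue, driven by `TestPhase` messages on the control queue.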
pub trait DataCollector {
    fn run(
        &mut self,
        stats_queue: Arc<SegQueue<EventRecord>>,
        ctrl: Arc<ArrayQueue<TestPhase>>,
        dump_raw_data: bool,
    ) -> Result<(), Error>;
}

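/// Coarse operation categories used to bucket each `EventDetail` for statistics.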
enum OpKind {
    WriteOp,
    ReadOp,
    ReplicationDelay,
    Auth, //TODO! does this make sense?
    Error,
}

impl From<EventDetail> for OpKind {
    fn from(value: EventDetail) -> Self {
        match value {
            EventDetail::PersonGetSelfMemberOf | EventDetail::PersonGetSelfAccount => {
                OpKind::ReadOp
            }
            EventDetail::PersonSetSelfMail
            | EventDetail::PersonSetSelfPassword
            | EventDetail::PersonCreateGroup
            | EventDetail::PersonAddGroupMembers => OpKind::WriteOp,
            EventDetail::Login | EventDetail::Logout | EventDetail::PersonReauth => OpKind::Auth,
            EventDetail::GroupReplicationDelay => OpKind::ReplicationDelay,
            EventDetail::Error => OpKind::Error,
        }
    }
}
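
/// A collector that buffers every event until the test ends, then reports
/// per-operation statistics and writes them to CSV.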
pub struct BasicStatistics {
    person_count: usize,
    group_count: usize,
    node_count: usize,
}

impl BasicStatistics {
    #[allow(clippy::new_ret_no_self)]
    pub fn new(
        person_count: usize,
        group_count: usize,
        node_count: usize,
    ) -> Box<dyn DataCollector + Send> {
        Box::new(BasicStatistics {
            person_count,
            group_count,
            node_count,
        })
    }
}

impl DataCollector for BasicStatistics {
    fn run(
        &mut self,
        stats_queue: Arc<SegQueue<EventRecord>>,
        ctrl: Arc<ArrayQueue<TestPhase>>,
        dump_raw_data: bool,
    ) -> Result<(), Error> {
        debug!("Started statistics collector");

        // Wait for an event on ctrl. We use small amounts of backoff if none are
        // present yet.
        let start = loop {
            match ctrl.pop() {
                Some(TestPhase::Start(start)) => {
                    break start;
                }
                Some(TestPhase::End(_)) => {
                    error!("invalid state");
                    // Invalid state.
                    return Err(Error::InvalidState);
                }
                Some(TestPhase::StopNow) => {
                    // We have been told to stop immediately.
                    return Ok(());
                }
                None => {
                    #[allow(clippy::disallowed_methods)]
                    // Allowed as this is a backoff in a synchronous code loop
                    thread::sleep(Duration::from_millis(100))
                }
            }
        };

        // Due to the design of this collector, we don't do anything until the end of the test.
        let end = loop {
            match ctrl.pop() {
                Some(TestPhase::Start(_)) => {
                    // Invalid state.
                    return Err(Error::InvalidState);
                }
                Some(TestPhase::End(end)) => {
                    break end;
                }
                Some(TestPhase::StopNow) => {
                    warn!("requested to stop now!");
                    // We have been told to stop immediately.
                    return Ok(());
                }
                None => {
                    #[allow(clippy::disallowed_methods)]
                    // Allowed as this is a backoff in a synchronous code loop
                    thread::sleep(Duration::from_millis(100))
                }
            }
        };

        info!("start statistics processing ...");

        let mut readop_times = Vec::new();
        let mut writeop_times = Vec::new();
        let mut replication_delays = Vec::new();
        let mut raw_stats = Vec::new();

        // We will drain this now.
        while let Some(event_record) = stats_queue.pop() {
            if event_record.start < start || event_record.start > end {
                // Skip event, outside of the test time window
                continue;
            }

            if dump_raw_data {
                raw_stats.push(SerializableEventRecord::from_event_record(
                    &event_record,
                    start,
                ));
            }

            match OpKind::from(event_record.details) {
                OpKind::ReadOp => {
                    readop_times.push(event_record.duration.as_secs_f64());
                }
                OpKind::WriteOp => {
                    writeop_times.push(event_record.duration.as_secs_f64());
                }
                OpKind::ReplicationDelay => {
                    replication_delays.push(event_record.duration.as_secs_f64())
                }
                OpKind::Auth => {}
                OpKind::Error => {}
            }
        }

        if readop_times.is_empty() && writeop_times.is_empty() && replication_delays.is_empty() {
167            error!("For some weird reason no valid data was recorded in this benchmark, bailing out...");
            return Err(Error::InvalidState);
        }

        let stats = StatsContainer::new(
            &readop_times,
            &writeop_times,
            &replication_delays,
            self.node_count,
            self.person_count,
            self.group_count,
        );

        info!(
            "Server configuration was: {} nodes, {} users and {} groups",
            self.node_count, self.person_count, self.group_count
        );

        info!("Received {} read events", stats.read_events);

        info!("mean: {} seconds", stats.read_mean);
        info!("variance: {} seconds", stats.read_variance);
        info!("SD: {} seconds", stats.read_sd);
        info!("95%: {}", stats.read_95);

        info!("Received {} write events", stats.write_events);

        info!("mean: {} seconds", stats.write_mean);
        info!("variance: {} seconds", stats.write_variance);
        info!("SD: {} seconds", stats.write_sd);
        info!("95%: {}", stats.write_95);

        info!(
            "Received {} replication delays",
            stats.replication_delay_events
        );

        info!("mean: {} seconds", stats.replication_delay_mean);
        info!("variance: {} seconds", stats.replication_delay_variance);
        info!("SD: {} seconds", stats.replication_delay_sd);
        info!("95%: {}", stats.replication_delay_95);

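        // Write the summary (and optionally every raw event) to timestamped CSV files.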
        let now = Local::now();
        let filepath = format!("orca-run-{}.csv", now.to_rfc3339());

        info!("Now saving stats as '{filepath}'");

        let mut wrt = Writer::from_path(filepath).map_err(|_| Error::Io)?;
        wrt.serialize(stats).map_err(|_| Error::Io)?;

        if dump_raw_data {
            let raw_data_filepath = format!("orca-run-{}-raw.csv", now.to_rfc3339());
            info!("Now saving raw data as '{raw_data_filepath}'");

            let mut wrt = Writer::from_path(raw_data_filepath).map_err(|_| Error::Io)?;

            for record in raw_stats.iter() {
                wrt.serialize(record).map_err(|_| Error::Io)?;
            }
        }

        debug!("Ended statistics collector");

        Ok(())
    }
}

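/// A single event serialized relative to the test start, used for raw CSV dumps.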
#[derive(Serialize)]
struct SerializableEventRecord {
    time_from_start_ms: u128,
    duration_ms: u128,
    details: EventDetail,
}

impl SerializableEventRecord {
    fn from_event_record(event_record: &EventRecord, test_start: Instant) -> Self {
        SerializableEventRecord {
            time_from_start_ms: event_record.start.duration_since(test_start).as_millis(),
            duration_ms: event_record.duration.as_millis(),
            details: event_record.details.clone(),
        }
    }
}

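/// Summary statistics for a whole run, serialized as a single CSV row.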
#[derive(Serialize)]
struct StatsContainer {
    node_count: usize,
    person_count: usize,
    group_count: usize,
    read_events: usize,
    read_sd: f64,
    read_mean: f64,
    read_variance: f64,
    read_95: f64,
    write_events: usize,
    write_sd: f64,
    write_mean: f64,
    write_variance: f64,
    write_95: f64,
    replication_delay_events: usize,
    replication_delay_sd: f64,
    replication_delay_mean: f64,
    replication_delay_variance: f64,
    replication_delay_95: f64,
}

// These should help prevent confusion when using 'compute_stats_from_timings_vec'
type EventCount = usize;
type Mean = f64;
type Sd = f64;
type Variance = f64;
type Percentile95 = f64;

impl StatsContainer {
    fn new(
        readop_times: &[f64],
        writeop_times: &[f64],
        replication_delays: &[f64],
        node_count: usize,
        person_count: usize,
        group_count: usize,
    ) -> Self {
        let (read_events, read_mean, read_variance, read_sd, read_95) =
            Self::compute_stats_from_timings_vec(readop_times);

        let (write_events, write_mean, write_variance, write_sd, write_95) =
            Self::compute_stats_from_timings_vec(writeop_times);

        let (
            replication_delay_events,
            replication_delay_mean,
            replication_delay_variance,
            replication_delay_sd,
            replication_delay_95,
        ) = Self::compute_stats_from_timings_vec(replication_delays);

        StatsContainer {
            person_count,
            group_count,
            node_count,
            read_events,
            read_sd,
            read_mean,
            read_variance,
            read_95,
            write_events,
            write_sd,
            write_mean,
            write_variance,
            write_95,
            replication_delay_events,
            replication_delay_sd,
            replication_delay_mean,
            replication_delay_variance,
            replication_delay_95,
        }
    }

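    /// Fit a normal distribution to the timings and report the sample count,
    /// mean, variance, standard deviation and a 95% figure. Fewer than two
    /// samples yield all zeroes.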
    fn compute_stats_from_timings_vec(
        op_times: &[f64],
    ) -> (EventCount, Mean, Variance, Sd, Percentile95) {
        let op_times_len = op_times.len();
        if op_times_len >= 2 {
            let distr = Normal::from_data(op_times);
            let mean = distr.mean();
            let variance = distr.variance();
            let sd = variance.sqrt();
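            // Report mean + 2 standard deviations as the "95%" figure.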
            let percentile_95 = mean + 2. * sd;
            (op_times_len, mean, variance, sd, percentile_95)
        } else {
            (0, 0., 0., 0., 0.)
        }
    }
}
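
#[cfg(test)]
mod tests {
    use super::StatsContainer;

    // A minimal sketch of how `compute_stats_from_timings_vec` behaves: fewer than
    // two samples yield a zeroed result, otherwise the first tuple field is the
    // sample count. The mean check assumes mathru's `Normal::from_data` estimates
    // the arithmetic mean of the input.
    #[test]
    fn compute_stats_handles_small_and_normal_inputs() {
        // Fewer than two samples: the helper reports a zeroed result.
        let (count, mean, _variance, _sd, _p95) =
            StatsContainer::compute_stats_from_timings_vec(&[]);
        assert_eq!(count, 0);
        assert!(mean.abs() < f64::EPSILON);

        // Two samples: the count is reported and the mean is the arithmetic mean.
        let (count, mean, _variance, _sd, _p95) =
            StatsContainer::compute_stats_from_timings_vec(&[1.0, 3.0]);
        assert_eq!(count, 2);
        assert!((mean - 2.0).abs() < 1e-9);
    }
}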