orca/
stats.rs

1use crate::error::Error;
2use crate::run::{EventDetail, EventRecord};
3use chrono::Local;
4use crossbeam::queue::{ArrayQueue, SegQueue};
5use csv::Writer;
6use serde::Serialize;
7use std::sync::Arc;
8use std::thread;
9use std::time::{Duration, Instant};
10
11use mathru::statistics::distrib::{Continuous, Normal};
12
/// Control message sent to a statistics collector over its control queue.
///
/// The type is a small plain value (its only payload is an [`Instant`]), so it
/// derives `Clone`, `Copy`, `PartialEq` and `Eq` in addition to `Debug`. This
/// lets senders keep a copy of what they pushed and lets tests compare phases.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TestPhase {
    /// The measured window opens at the given instant.
    Start(Instant),
    /// The measured window closes at the given instant.
    End(Instant),
    /// The collector must stop immediately.
    StopNow,
}
19
/// A sink that turns the events recorded during a test run into a report.
pub trait DataCollector {
    /// Runs the collector.
    ///
    /// * `stats_queue` - shared queue holding the recorded [`EventRecord`]s.
    /// * `ctrl` - shared queue carrying [`TestPhase`] control messages.
    /// * `dump_raw_data` - flag forwarded by the caller; `BasicStatistics` uses it
    ///   to decide whether per-event records are written out as well.
    ///
    /// Returns an [`Error`] when the collector cannot complete its work.
    fn run(
        &mut self,
        stats_queue: Arc<SegQueue<EventRecord>>,
        ctrl: Arc<ArrayQueue<TestPhase>>,
        dump_raw_data: bool,
    ) -> Result<(), Error>;
}
28
/// Coarse category that an [`EventDetail`] is bucketed into before its
/// duration is added to a timing series.
enum OpKind {
    /// Operation that modifies data on the server.
    WriteOp,
    /// Operation that only reads data from the server.
    ReadOp,
    /// Measured replication delay.
    ReplicationDelay,
    /// Login, logout and re-authentication events.
    Auth, //TODO! does this make sense?
    /// Event that reported an error.
    Error,
}
36
37impl From<EventDetail> for OpKind {
38    fn from(value: EventDetail) -> Self {
39        match value {
40            EventDetail::PersonGetSelfMemberOf | EventDetail::PersonGetSelfAccount => {
41                OpKind::ReadOp
42            }
43            EventDetail::PersonSetSelfMail
44            | EventDetail::PersonSetSelfPassword
45            | EventDetail::PersonCreateGroup
46            | EventDetail::PersonAddGroupMembers => OpKind::WriteOp,
47            EventDetail::Login | EventDetail::Logout | EventDetail::PersonReauth => OpKind::Auth,
48            EventDetail::GroupReplicationDelay => OpKind::ReplicationDelay,
49            EventDetail::Error => OpKind::Error,
50        }
51    }
52}
/// Statistics collector that summarises read, write and replication-delay
/// timings once the test has ended.
///
/// The three counts describe the tested deployment; they are echoed into the
/// log output and the generated report and do not affect the computation.
pub struct BasicStatistics {
    /// Number of person accounts in the test data set.
    person_count: usize,
    /// Number of groups in the test data set.
    group_count: usize,
    /// Number of server nodes under test.
    node_count: usize,
}
58
59impl BasicStatistics {
60    #[allow(clippy::new_ret_no_self)]
61    pub fn new(
62        person_count: usize,
63        group_count: usize,
64        node_count: usize,
65    ) -> Box<dyn DataCollector + Send> {
66        Box::new(BasicStatistics {
67            person_count,
68            group_count,
69            node_count,
70        })
71    }
72}
73
74impl DataCollector for BasicStatistics {
75    fn run(
76        &mut self,
77        stats_queue: Arc<SegQueue<EventRecord>>,
78        ctrl: Arc<ArrayQueue<TestPhase>>,
79        dump_raw_data: bool,
80    ) -> Result<(), Error> {
81        debug!("Started statistics collector");
82
83        // Wait for an event on ctrl. We use small amounts of backoff if none are
84        // present yet.
85        let start = loop {
86            match ctrl.pop() {
87                Some(TestPhase::Start(start)) => {
88                    break start;
89                }
90                Some(TestPhase::End(_)) => {
91                    error!("invalid state");
92                    // Invalid state.
93                    return Err(Error::InvalidState);
94                }
95                Some(TestPhase::StopNow) => {
96                    // We have been told to stop immediately.
97                    return Ok(());
98                }
99                None => thread::sleep(Duration::from_millis(100)),
100            }
101        };
102
103        // Due to the design of this collector, we don't do anything until the end of the test.
104        let end = loop {
105            match ctrl.pop() {
106                Some(TestPhase::Start(_)) => {
107                    // Invalid state.
108                    return Err(Error::InvalidState);
109                }
110                Some(TestPhase::End(end)) => {
111                    break end;
112                }
113                Some(TestPhase::StopNow) => {
114                    warn!("requested to stop now!");
115                    // We have been told to stop immediately.
116                    return Ok(());
117                }
118                None => thread::sleep(Duration::from_millis(100)),
119            }
120        };
121
122        info!("start statistics processing ...");
123
124        let mut readop_times = Vec::new();
125        let mut writeop_times = Vec::new();
126        let mut replication_delays = Vec::new();
127        let mut raw_stats = Vec::new();
128
129        // We will drain this now.
130        while let Some(event_record) = stats_queue.pop() {
131            if event_record.start < start || event_record.start > end {
132                // Skip event, outside of the test time window
133                continue;
134            }
135
136            if dump_raw_data {
137                raw_stats.push(SerializableEventRecord::from_event_record(
138                    &event_record,
139                    start,
140                ));
141            }
142
143            match OpKind::from(event_record.details) {
144                OpKind::ReadOp => {
145                    readop_times.push(event_record.duration.as_secs_f64());
146                }
147                OpKind::WriteOp => {
148                    writeop_times.push(event_record.duration.as_secs_f64());
149                }
150                OpKind::ReplicationDelay => {
151                    replication_delays.push(event_record.duration.as_secs_f64())
152                }
153                OpKind::Auth => {}
154                OpKind::Error => {}
155            }
156        }
157
158        if readop_times.is_empty() && writeop_times.is_empty() && replication_delays.is_empty() {
159            error!("For some weird reason no valid data was recorded in this benchmark, bailing out...");
160            return Err(Error::InvalidState);
161        }
162
163        let stats = StatsContainer::new(
164            &readop_times,
165            &writeop_times,
166            &replication_delays,
167            self.node_count,
168            self.person_count,
169            self.group_count,
170        );
171
172        info!(
173            "Server configuration was: {} nodes, {} users and {} groups",
174            self.node_count, self.person_count, self.group_count
175        );
176
177        info!("Received {} read events", stats.read_events);
178
179        info!("mean: {} seconds", stats.read_mean);
180        info!("variance: {} seconds", stats.read_variance);
181        info!("SD: {} seconds", stats.read_sd);
182        info!("95%: {}", stats.read_95);
183
184        info!("Received {} write events", stats.write_events);
185
186        info!("mean: {} seconds", stats.write_mean);
187        info!("variance: {} seconds", stats.write_variance);
188        info!("SD: {} seconds", stats.write_sd);
189        info!("95%: {}", stats.write_95);
190
191        info!(
192            "Received {} replication delays",
193            stats.replication_delay_events
194        );
195
196        info!("mean: {} seconds", stats.replication_delay_mean);
197        info!("variance: {} seconds", stats.replication_delay_variance);
198        info!("SD: {} seconds", stats.replication_delay_sd);
199        info!("95%: {}", stats.replication_delay_95);
200
201        let now = Local::now();
202        let filepath = format!("orca-run-{}.csv", now.to_rfc3339());
203
204        info!("Now saving stats as '{filepath}'");
205
206        let mut wrt = Writer::from_path(filepath).map_err(|_| Error::Io)?;
207        wrt.serialize(stats).map_err(|_| Error::Io)?;
208
209        if dump_raw_data {
210            let raw_data_filepath = format!("orca-run-{}-raw.csv", now.to_rfc3339());
211            info!("Now saving raw data as '{raw_data_filepath}'");
212
213            let mut wrt = Writer::from_path(raw_data_filepath).map_err(|_| Error::Io)?;
214
215            for record in raw_stats.iter() {
216                wrt.serialize(record).map_err(|_| Error::Io)?;
217            }
218        }
219
220        debug!("Ended statistics collector");
221
222        Ok(())
223    }
224}
225
/// Flattened, serialisable view of one recorded event, used for the raw data
/// dump.
#[derive(Serialize)]
struct SerializableEventRecord {
    /// Milliseconds between the start of the test and the start of the event.
    time_from_start_ms: u128,
    /// Duration of the event in milliseconds.
    duration_ms: u128,
    /// Which operation the event describes.
    details: EventDetail,
}
232
233impl SerializableEventRecord {
234    fn from_event_record(event_record: &EventRecord, test_start: Instant) -> Self {
235        SerializableEventRecord {
236            time_from_start_ms: event_record.start.duration_since(test_start).as_millis(),
237            duration_ms: event_record.duration.as_millis(),
238            details: event_record.details.clone(),
239        }
240    }
241}
242
/// Summary of one benchmark run, serialised as a single record of the stats
/// CSV file.
///
/// All timing values are in seconds. Each `*_95` value is computed as
/// `mean + 2 * sd`.
// NOTE: the field order is kept as-is on purpose; it presumably determines the
// column order of the serialised record.
#[derive(Serialize)]
struct StatsContainer {
    /// Number of server nodes under test.
    node_count: usize,
    /// Number of person accounts in the test data set.
    person_count: usize,
    /// Number of groups in the test data set.
    group_count: usize,
    /// Number of read operations summarised.
    read_events: usize,
    /// Standard deviation of the read durations.
    read_sd: f64,
    /// Mean of the read durations.
    read_mean: f64,
    /// Variance of the read durations.
    read_variance: f64,
    /// `read_mean + 2 * read_sd`.
    read_95: f64,
    /// Number of write operations summarised.
    write_events: usize,
    /// Standard deviation of the write durations.
    write_sd: f64,
    /// Mean of the write durations.
    write_mean: f64,
    /// Variance of the write durations.
    write_variance: f64,
    /// `write_mean + 2 * write_sd`.
    write_95: f64,
    /// Number of replication delays summarised.
    replication_delay_events: usize,
    /// Standard deviation of the replication delays.
    replication_delay_sd: f64,
    /// Mean of the replication delays.
    replication_delay_mean: f64,
    /// Variance of the replication delays.
    replication_delay_variance: f64,
    /// `replication_delay_mean + 2 * replication_delay_sd`.
    replication_delay_95: f64,
}
264
// Aliases naming each position of the tuple returned by
// 'compute_stats_from_timings_vec', so that the positions are not confused
// with one another. They are plain aliases and add no type safety.
/// Number of samples in a timing series.
type EventCount = usize;
/// Mean of a timing series.
type Mean = f64;
/// Standard deviation of a timing series.
type Sd = f64;
/// Variance of a timing series.
type Variance = f64;
/// `mean + 2 * sd` of a timing series.
type Percentile95 = f64;
271
272impl StatsContainer {
273    fn new(
274        readop_times: &Vec<f64>,
275        writeop_times: &Vec<f64>,
276        replication_delays: &Vec<f64>,
277        node_count: usize,
278        person_count: usize,
279        group_count: usize,
280    ) -> Self {
281        let (read_events, read_mean, read_variance, read_sd, read_95) =
282            Self::compute_stats_from_timings_vec(readop_times);
283
284        let (write_events, write_mean, write_variance, write_sd, write_95) =
285            Self::compute_stats_from_timings_vec(writeop_times);
286
287        let (
288            replication_delay_events,
289            replication_delay_mean,
290            replication_delay_variance,
291            replication_delay_sd,
292            replication_delay_95,
293        ) = Self::compute_stats_from_timings_vec(replication_delays);
294
295        StatsContainer {
296            person_count,
297            group_count,
298            node_count,
299            read_events,
300            read_sd,
301            read_mean,
302            read_variance,
303            read_95,
304            write_events,
305            write_sd,
306            write_mean,
307            write_variance,
308            write_95,
309            replication_delay_events,
310            replication_delay_sd,
311            replication_delay_mean,
312            replication_delay_variance,
313            replication_delay_95,
314        }
315    }
316
317    fn compute_stats_from_timings_vec(
318        op_times: &Vec<f64>,
319    ) -> (EventCount, Mean, Variance, Sd, Percentile95) {
320        let op_times_len = op_times.len();
321        if op_times_len >= 2 {
322            let distr = Normal::from_data(op_times);
323            let mean = distr.mean();
324            let variance = distr.variance();
325            let sd = variance.sqrt();
326            let percentile_95 = mean + 2. * sd;
327            (op_times_len, mean, variance, sd, percentile_95)
328        } else {
329            (0, 0., 0., 0., 0.)
330        }
331    }
332}