1use super::cid::Cid;
2use crate::be::dbrepl::DbEntryChangeState;
3use crate::be::dbvalue::DbCidV1;
4use crate::entry::Eattrs;
5use crate::prelude::*;
6use crate::schema::SchemaTransaction;
7
8use std::collections::BTreeMap;
9
10#[derive(Debug, Clone)]
11pub enum State {
12 Live {
13 at: Cid,
14 changes: BTreeMap<Attribute, Cid>,
15 },
16 Tombstone {
17 at: Cid,
18 },
19}
20
21#[derive(Debug, Clone)]
22pub struct EntryChangeState {
23 pub(super) st: State,
24}
25
26impl EntryChangeState {
27 pub fn new(cid: &Cid, attrs: &Eattrs, _schema: &dyn SchemaTransaction) -> Self {
28 let changes = attrs
29 .keys()
30 .cloned()
31 .map(|attr| (attr, cid.clone()))
32 .collect();
33
34 let st = State::Live {
35 at: cid.clone(),
36 changes,
37 };
38
39 EntryChangeState { st }
40 }
41
42 pub fn new_without_schema(cid: &Cid, attrs: &Eattrs) -> Self {
43 let class = attrs.get(&Attribute::Class);
44 let st = if class
45 .as_ref()
46 .map(|c| c.contains(&EntryClass::Tombstone.to_partialvalue()))
47 .unwrap_or(false)
48 {
49 State::Tombstone { at: cid.clone() }
50 } else {
51 let changes = attrs
52 .keys()
53 .cloned()
54 .map(|attr| (attr, cid.clone()))
55 .collect();
56
57 State::Live {
58 at: cid.clone(),
59 changes,
60 }
61 };
62
63 EntryChangeState { st }
64 }
65
66 pub(crate) fn to_db_changestate(&self) -> DbEntryChangeState {
67 match &self.st {
68 State::Live { at, changes } => {
69 let at = DbCidV1 {
70 server_id: at.s_uuid,
71 timestamp: at.ts,
72 };
73
74 let changes = changes
75 .iter()
76 .map(|(attr, cid)| {
77 (
78 attr.clone(),
79 DbCidV1 {
80 server_id: cid.s_uuid,
81 timestamp: cid.ts,
82 },
83 )
84 })
85 .collect();
86
87 DbEntryChangeState::V1Live { at, changes }
88 }
89 State::Tombstone { at } => {
90 let at = DbCidV1 {
91 server_id: at.s_uuid,
92 timestamp: at.ts,
93 };
94
95 DbEntryChangeState::V1Tombstone { at }
96 }
97 }
98 }
99
100 pub(crate) fn from_db_changestate(db_ecstate: DbEntryChangeState) -> Self {
101 match db_ecstate {
102 DbEntryChangeState::V1Live { at, changes } => {
103 let at = Cid {
104 s_uuid: at.server_id,
105 ts: at.timestamp,
106 };
107
108 let changes = changes
109 .iter()
110 .map(|(attr, cid)| {
111 (
112 attr.clone(),
113 Cid {
114 s_uuid: cid.server_id,
115 ts: cid.timestamp,
116 },
117 )
118 })
119 .collect();
120
121 EntryChangeState {
122 st: State::Live { at, changes },
123 }
124 }
125 DbEntryChangeState::V1Tombstone { at } => EntryChangeState {
126 st: State::Tombstone {
127 at: Cid {
128 s_uuid: at.server_id,
129 ts: at.timestamp,
130 },
131 },
132 },
133 }
134 }
135
136 pub(crate) fn build(st: State) -> Self {
137 EntryChangeState { st }
138 }
139
140 pub fn current(&self) -> &State {
141 &self.st
142 }
143
144 pub fn at(&self) -> &Cid {
145 match &self.st {
146 State::Live { at, .. } => at,
147 State::Tombstone { at } => at,
148 }
149 }
150
151 pub(crate) fn stub(&self) -> Self {
152 let st = match &self.st {
153 State::Live { at, changes: _ } => State::Live {
154 at: at.clone(),
155 changes: Default::default(),
156 },
157 State::Tombstone { at } => State::Tombstone { at: at.clone() },
158 };
159 EntryChangeState { st }
160 }
161
162 pub fn change_ava(&mut self, cid: &Cid, attr: &Attribute) {
163 match &mut self.st {
164 State::Live {
165 at: _,
166 ref mut changes,
167 } => {
168 if let Some(change) = changes.get_mut(attr) {
169 if change != cid {
171 *change = cid.clone()
172 }
173 } else {
174 changes.insert(attr.clone(), cid.clone());
175 }
176 }
177 State::Tombstone { .. } => {
178 unreachable!();
179 }
180 }
181 }
182
183 pub fn tombstone(&mut self, cid: &Cid) {
184 match &mut self.st {
185 State::Live { at: _, changes: _ } => self.st = State::Tombstone { at: cid.clone() },
186 State::Tombstone { .. } => {} };
188 }
189
190 pub fn can_delete(&self, cid: &Cid) -> bool {
191 match &self.st {
192 State::Live { .. } => false,
193 State::Tombstone { at } => at < cid,
194 }
195 }
196
197 pub fn is_live(&self) -> bool {
198 match &self.st {
199 State::Live { .. } => true,
200 State::Tombstone { .. } => false,
201 }
202 }
203
204 pub fn contains_tail_cid(&self, cid: &Cid) -> bool {
205 match &self.st {
207 State::Live { at: _, changes } => changes.values().any(|change| change == cid),
208 State::Tombstone { at } => at == cid,
209 }
210 }
211
212 pub(crate) fn get_max_cid(&self) -> &Cid {
213 match &self.st {
214 State::Live { at, changes } => changes.values().max().unwrap_or(at),
215 State::Tombstone { at } => at,
216 }
217 }
218
219 #[cfg(test)]
220 pub(crate) fn get_attr_cid(&self, attr: &Attribute) -> Option<&Cid> {
221 match &self.st {
222 State::Live { at: _, changes } => changes.get(attr),
223 State::Tombstone { at: _ } => None,
224 }
225 }
226
227 pub(crate) fn cid_iter(&self) -> Vec<&Cid> {
228 match &self.st {
229 State::Live { at: _, changes } => {
230 let mut v: Vec<_> = changes.values().collect();
231 v.sort_unstable();
232 v.dedup();
233 v
234 }
235 State::Tombstone { at } => vec![at],
236 }
237 }
238
239 pub fn retain<F>(&mut self, f: F)
240 where
241 F: FnMut(&Attribute, &mut Cid) -> bool,
242 {
243 match &mut self.st {
244 State::Live { at: _, changes } => changes.retain(f),
245 State::Tombstone { .. } => {}
246 }
247 }
248
249 #[instrument(level = "trace", name = "verify", skip_all)]
250 pub fn verify(
251 &self,
252 schema: &dyn SchemaTransaction,
253 expected_attrs: &Eattrs,
254 entry_id: u64,
255 results: &mut Vec<Result<(), ConsistencyError>>,
256 ) {
257 let class = expected_attrs.get(&Attribute::Class);
258 let is_ts = class
259 .as_ref()
260 .map(|c| c.contains(&EntryClass::Tombstone.to_partialvalue()))
261 .unwrap_or(false);
262
263 match (&self.st, is_ts) {
264 (State::Live { at, changes }, false) => {
265 let inconsistent: Vec<_> = expected_attrs
269 .keys()
270 .filter(|attr| {
271 let change_cid_present = if let Some(change_cid) = changes.get(*attr) {
288 if change_cid < at {
289 warn!("changestate has a change that occurs before entry was created! {attr:?} {change_cid:?} {at:?}");
290 results.push(Err(ConsistencyError::ChangeStateDesynchronised(entry_id)));
291 }
292 true
293 } else {
294 false
295 };
296
297 let desync = schema.is_replicated(attr) && !change_cid_present;
299 if desync {
300 debug!(%entry_id, %attr, %desync);
301 }
302 desync
303 })
304 .collect();
305
306 if inconsistent.is_empty() {
307 trace!("changestate is synchronised");
308 } else {
309 warn!("changestate has desynchronised! Missing state attrs {inconsistent:?}");
310 results.push(Err(ConsistencyError::ChangeStateDesynchronised(entry_id)));
311 }
312 }
313 (State::Tombstone { .. }, true) => {
314 trace!("changestate is synchronised");
315 }
316 (State::Live { .. }, true) => {
317 warn!("changestate has desynchronised! State Live when tombstone is true");
318 results.push(Err(ConsistencyError::ChangeStateDesynchronised(entry_id)));
319 }
320 (State::Tombstone { .. }, false) => {
321 warn!("changestate has desynchronised! State Tombstone when tombstone is false");
322 results.push(Err(ConsistencyError::ChangeStateDesynchronised(entry_id)));
323 }
324 }
325 }
326}
327
328impl PartialEq for EntryChangeState {
329 fn eq(&self, rhs: &Self) -> bool {
330 match (&self.st, &rhs.st) {
331 (
332 State::Live {
333 at: at_left,
334 changes: changes_left,
335 },
336 State::Live {
337 at: at_right,
338 changes: changes_right,
339 },
340 ) => at_left.eq(at_right) && changes_left.eq(changes_right),
341 (State::Tombstone { at: at_left }, State::Tombstone { at: at_right }) => {
342 at_left.eq(at_right)
343 }
344 (_, _) => false,
345 }
346 }
347}