1use super::cid::Cid;
2use super::entry::EntryChangeState;
3use super::entry::State;
4use crate::be::dbvalue::DbValueSetV2;
5use crate::entry::Eattrs;
6use crate::prelude::*;
7use crate::schema::{SchemaReadTransaction, SchemaTransaction};
8use crate::valueset;
9use serde::{Deserialize, Serialize};
10use std::collections::BTreeMap;
11use std::fmt;
12
13pub enum ConsumerState {
14 Ok,
15 RefreshRequired,
16}
17
18#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
19pub struct ReplCidV1 {
20 #[serde(rename = "t")]
21 pub ts: Duration,
22 #[serde(rename = "s")]
23 pub s_uuid: Uuid,
24}
25
26impl From<&Cid> for ReplCidV1 {
28 fn from(cid: &Cid) -> Self {
29 ReplCidV1 {
30 ts: cid.ts,
31 s_uuid: cid.s_uuid,
32 }
33 }
34}
35
36impl From<ReplCidV1> for Cid {
37 fn from(cid: ReplCidV1) -> Self {
38 Cid {
39 ts: cid.ts,
40 s_uuid: cid.s_uuid,
41 }
42 }
43}
44
45impl From<&ReplCidV1> for Cid {
46 fn from(cid: &ReplCidV1) -> Self {
47 Cid {
48 ts: cid.ts,
49 s_uuid: cid.s_uuid,
50 }
51 }
52}
53
54#[derive(Serialize, Deserialize, PartialEq, Eq)]
59pub struct ReplAnchoredCidRange {
60 #[serde(rename = "m")]
61 pub ts_min: Duration,
62 #[serde(rename = "a", default)]
63 pub anchors: Vec<Duration>,
64 #[serde(rename = "x")]
65 pub ts_max: Duration,
66}
67
68impl fmt::Debug for ReplAnchoredCidRange {
69 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
70 write!(
71 f,
72 "{:032} --{}-> {:032}",
73 self.ts_min.as_nanos(),
74 self.anchors.len(),
75 self.ts_max.as_nanos()
76 )
77 }
78}
79
80#[derive(Serialize, Deserialize, PartialEq, Eq)]
83pub struct ReplCidRange {
84 #[serde(rename = "m")]
85 pub ts_min: Duration,
86 #[serde(rename = "x")]
87 pub ts_max: Duration,
88}
89
90impl fmt::Debug for ReplCidRange {
91 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
92 write!(
93 f,
94 "{:032} -> {:032}",
95 self.ts_min.as_nanos(),
96 self.ts_max.as_nanos()
97 )
98 }
99}
100
101#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
102pub enum ReplRuvRange {
103 V1 {
104 domain_uuid: Uuid,
105 ranges: BTreeMap<Uuid, ReplCidRange>,
106 },
107}
108
109impl ReplRuvRange {
110 pub fn is_empty(&self) -> bool {
111 match self {
112 ReplRuvRange::V1 { ranges, .. } => ranges.is_empty(),
113 }
114 }
115}
116
117#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
118pub struct ReplAttrStateV1 {
119 cid: ReplCidV1,
120 attr: Option<DbValueSetV2>,
121}
122
123#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
124pub enum ReplStateV1 {
125 Live {
126 at: ReplCidV1,
127 attrs: BTreeMap<Attribute, ReplAttrStateV1>,
129 },
130 Tombstone {
131 at: ReplCidV1,
132 },
133}
134
135#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
136#[serde(rename_all = "lowercase")]
137pub struct ReplEntryV1 {
139 uuid: Uuid,
140 st: ReplStateV1,
142}
143
144impl ReplEntryV1 {
145 pub fn new(entry: &EntrySealedCommitted, schema: &SchemaReadTransaction) -> ReplEntryV1 {
146 let cs = entry.get_changestate();
147 let uuid = entry.get_uuid();
148
149 let st = match cs.current() {
150 State::Live { at, changes } => {
151 let live_attrs = entry.get_ava();
152
153 let attrs = changes
154 .iter()
155 .filter_map(|(attr_name, cid)| {
156 if schema.is_replicated(attr_name) {
157 let live_attr = live_attrs.get(attr_name);
158
159 let cid = cid.into();
160 let attr = live_attr.and_then(|maybe|
161 if maybe.is_empty() {
169 None
170 } else {
171 Some(maybe.to_db_valueset_v2())
172 }
173 );
174
175 Some((attr_name.clone(), ReplAttrStateV1 { cid, attr }))
176 } else {
177 None
178 }
179 })
180 .collect();
181
182 ReplStateV1::Live {
183 at: at.into(),
184 attrs,
185 }
186 }
187 State::Tombstone { at } => ReplStateV1::Tombstone { at: at.into() },
188 };
189
190 ReplEntryV1 { uuid, st }
191 }
192
193 pub fn rehydrate(self) -> Result<(EntryChangeState, Eattrs), OperationError> {
194 match self.st {
195 ReplStateV1::Live { at, attrs } => {
196 trace!("{:?} {:#?}", at, attrs);
197 let mut changes = BTreeMap::default();
200 let mut eattrs = Eattrs::default();
201
202 for (attr_name, ReplAttrStateV1 { cid, attr }) in attrs.into_iter() {
203 let cid: Cid = cid.into();
204
205 if let Some(attr_value) = attr {
206 let v = valueset::from_db_valueset_v2(attr_value).inspect_err(|err| {
207 error!(?err, "Unable to restore valueset for {}", attr_name);
208 })?;
209 if eattrs.insert(attr_name.clone(), v).is_some() {
210 error!(
211 "Impossible eattrs state, attribute {} appears to be duplicated!",
212 attr_name
213 );
214 return Err(OperationError::InvalidEntryState);
215 }
216 }
217
218 if changes.insert(attr_name.clone(), cid).is_some() {
219 error!(
220 "Impossible changes state, attribute {} appears to be duplicated!",
221 attr_name
222 );
223 return Err(OperationError::InvalidEntryState);
224 }
225 }
226
227 let at: Cid = at.into();
228
229 let ecstate = EntryChangeState {
230 st: State::Live { at, changes },
231 };
232 Ok((ecstate, eattrs))
233 }
234 ReplStateV1::Tombstone { at } => {
235 let at: Cid = at.into();
236
237 let mut eattrs = Eattrs::default();
238
239 let class_ava = vs_iutf8![EntryClass::Object.into(), EntryClass::Tombstone.into()];
240 let last_mod_ava = vs_cid![at.clone()];
241
242 eattrs.insert(Attribute::Uuid, vs_uuid![self.uuid]);
243 eattrs.insert(Attribute::Class, class_ava);
244 eattrs.insert(Attribute::LastModifiedCid, last_mod_ava);
245
246 let ecstate = EntryChangeState {
247 st: State::Tombstone { at },
248 };
249
250 Ok((ecstate, eattrs))
251 }
252 }
253 }
254}
255
256#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
257#[serde(rename_all = "lowercase")]
258pub struct ReplIncrementalEntryV1 {
260 pub(crate) uuid: Uuid,
261 pub(crate) st: ReplStateV1,
263}
264
265impl ReplIncrementalEntryV1 {
266 pub fn new(
267 entry: &EntrySealedCommitted,
268 schema: &SchemaReadTransaction,
269 ctx_range: &BTreeMap<Uuid, ReplCidRange>,
270 ) -> ReplIncrementalEntryV1 {
271 let cs = entry.get_changestate();
272 let uuid = entry.get_uuid();
273
274 let st = match cs.current() {
275 State::Live { at, changes } => {
276 let live_attrs = entry.get_ava();
279
280 let attrs = changes
281 .iter()
282 .filter_map(|(attr_name, cid)| {
283 let within = schema.is_replicated(attr_name)
285 && ctx_range
286 .get(&cid.s_uuid)
287 .map(|repl_range| {
288 cid.ts <= repl_range.ts_max &&
290 cid.ts > repl_range.ts_min
292 })
293 .unwrap_or(false);
295
296 if within {
298 let live_attr = live_attrs.get(attr_name);
299 let cid = cid.into();
300 let attr = live_attr.and_then(|maybe| {
301 if maybe.is_empty() {
302 None
303 } else {
304 Some(maybe.to_db_valueset_v2())
305 }
306 });
307
308 Some((attr_name.clone(), ReplAttrStateV1 { cid, attr }))
309 } else {
310 None
311 }
312 })
313 .collect();
314
315 ReplStateV1::Live {
316 at: at.into(),
317 attrs,
318 }
319 }
320 State::Tombstone { at } => ReplStateV1::Tombstone { at: at.into() },
322 };
323
324 ReplIncrementalEntryV1 { uuid, st }
325 }
326
327 pub fn rehydrate(self) -> Result<(Uuid, EntryChangeState, Eattrs), OperationError> {
328 match self.st {
329 ReplStateV1::Live { at, attrs } => {
330 trace!("{:?} {:#?}", at, attrs);
331 let mut changes = BTreeMap::default();
332 let mut eattrs = Eattrs::default();
333
334 for (attr_name, ReplAttrStateV1 { cid, attr }) in attrs.into_iter() {
335 let cid: Cid = cid.into();
336
337 if let Some(attr_value) = attr {
338 let v = valueset::from_db_valueset_v2(attr_value).inspect_err(|err| {
339 error!(?err, "Unable to restore valueset for {}", attr_name);
340 })?;
341 if eattrs.insert(attr_name.clone(), v).is_some() {
342 error!(
343 "Impossible eattrs state, attribute {} appears to be duplicated!",
344 attr_name
345 );
346 return Err(OperationError::InvalidEntryState);
347 }
348 }
349
350 if changes.insert(attr_name.clone(), cid).is_some() {
351 error!(
352 "Impossible changes state, attribute {} appears to be duplicated!",
353 attr_name
354 );
355 return Err(OperationError::InvalidEntryState);
356 }
357 }
358
359 let at: Cid = at.into();
360
361 let ecstate = EntryChangeState {
362 st: State::Live { at, changes },
363 };
364 Ok((self.uuid, ecstate, eattrs))
365 }
366 ReplStateV1::Tombstone { at } => {
367 let at: Cid = at.into();
368 let eattrs = Eattrs::default();
369 let ecstate = EntryChangeState {
370 st: State::Tombstone { at },
371 };
372 Ok((self.uuid, ecstate, eattrs))
373 }
374 }
375 }
376}
377
378#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
381#[serde(rename_all = "lowercase")]
382pub enum ReplRefreshContext {
383 V1 {
384 domain_version: DomainVersion,
385 domain_devel: bool,
386 domain_uuid: Uuid,
387 ranges: BTreeMap<Uuid, ReplAnchoredCidRange>,
390 schema_entries: Vec<ReplEntryV1>,
391 meta_entries: Vec<ReplEntryV1>,
392 entries: Vec<ReplEntryV1>,
393 },
394}
395
396#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
397#[serde(rename_all = "lowercase")]
398pub enum ReplIncrementalContext {
399 DomainMismatch,
400 NoChangesAvailable,
401 RefreshRequired,
402 UnwillingToSupply,
403 V1 {
404 domain_version: DomainVersion,
405 #[serde(default)]
406 domain_patch_level: u32,
407 domain_uuid: Uuid,
408 ranges: BTreeMap<Uuid, ReplAnchoredCidRange>,
413 schema_entries: Vec<ReplIncrementalEntryV1>,
414 meta_entries: Vec<ReplIncrementalEntryV1>,
415 entries: Vec<ReplIncrementalEntryV1>,
416 },
417}