1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
|
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use im::{OrdMap, OrdSet, ordset};
use aero_bayou::*;
use crate::unique_ident::UniqueIdent;
/// Identifiers of the graph nodes an operation is attached under.
///
/// Parents are only persisted in the event log,
/// not in the checkpoints.
pub type Parents = Vec<UniqueIdent>;
/// ETag associated with an item, kept as a plain string.
pub type Etag = String;
/// Name of an item, kept as a plain string.
pub type FileName = String;
/// What is stored for each item: its file name followed by its ETag.
pub type IndexEntry = (FileName, Etag);
/// State of a DAV collection: the stored items plus a partial graph of the
/// operations that produced them.
#[derive(Clone, Default)]
pub struct DavDag {
    /// Source of truth: every known item, keyed by its identifier.
    pub table: OrdMap<UniqueIdent, IndexEntry>,

    /// Secondary index, optimized for queries: file name -> item identifier.
    pub idx_by_filename: OrdMap<FileName, UniqueIdent>,

    /// Partial synchronization graph, stored as parent -> direct children.
    pub successors: OrdMap<UniqueIdent, OrdSet<UniqueIdent>>,

    /// Current head nodes of the graph; new operations take them as parents.
    pub heads: OrdSet<UniqueIdent>,
}
/// Operations that can be applied to a [`DavDag`].
///
/// Every variant starts with the parents of the operation, followed by the
/// identifier the operation is recorded under in the synchronization graph.
#[derive(Clone, Serialize, Deserialize, Debug)]
pub enum DavDagOp {
    /// Virtual operation run when multiple heads are discovered; it only
    /// touches the synchronization graph.
    Merge(Parents, UniqueIdent),

    /// Adds an item, described by its index entry, to the collection.
    Put(Parents, UniqueIdent, IndexEntry),

    /// Deletes the item with the given identifier from the collection.
    Delete(Parents, UniqueIdent),
}
impl DavDag {
    /// Builds a `Merge` operation whose parents are the current heads.
    pub fn op_merge(&self, ident: UniqueIdent) -> DavDagOp {
        DavDagOp::Merge(self.heads_vec(), ident)
    }

    /// Builds a `Put` operation for `entry`, whose parents are the current heads.
    pub fn op_put(&self, ident: UniqueIdent, entry: IndexEntry) -> DavDagOp {
        DavDagOp::Put(self.heads_vec(), ident, entry)
    }

    /// Builds a `Delete` operation for `ident`, whose parents are the current heads.
    pub fn op_delete(&self, ident: UniqueIdent) -> DavDagOp {
        DavDagOp::Delete(self.heads_vec(), ident)
    }

    // HELPER functions

    /// Returns the current heads as a vector, in the iteration order of the set.
    pub fn heads_vec(&self) -> Vec<UniqueIdent> {
        // Iterate by reference: no need to clone the whole set first
        self.heads.iter().copied().collect()
    }

    // INTERNAL functions

    /// Stores `entry` under `ident` and points its file name at `ident`.
    ///
    /// If `ident` was already registered under another file name, the old
    /// file name stops pointing at it so the index never references a name
    /// the item no longer has.
    fn register(&mut self, ident: UniqueIdent, entry: IndexEntry) {
        let filename = entry.0.clone();

        // Insert item in the source of truth; `insert` hands back the
        // entry that was previously stored under this identifier, if any.
        if let Some((old_filename, _etag)) = self.table.insert(ident, entry) {
            let stale = old_filename != filename
                && self.idx_by_filename.get(&old_filename) == Some(&ident);
            if stale {
                self.idx_by_filename.remove(&old_filename);
            }
        }

        // Update the cache
        self.idx_by_filename.insert(filename, ident);
    }

    /// Removes the item `ident` from the table and cleans the file name index.
    ///
    /// Does nothing when `ident` is unknown. The file name mapping is only
    /// dropped when it still designates `ident`: another item may have been
    /// registered under the same file name since, and its mapping must survive.
    fn unregister(&mut self, ident: &UniqueIdent) {
        // Removing from the source of truth gives back the information we
        // need to clean the indexes
        let (filename, _etag) = match self.table.remove(ident) {
            Some(entry) => entry,
            None => return,
        };

        if self.idx_by_filename.get(&filename) == Some(ident) {
            self.idx_by_filename.remove(&filename);
        }
    }

    /// Attaches `child` to the synchronization graph under `parents`.
    ///
    /// Returns `false`, leaving the graph and the heads untouched, as soon as
    /// one parent is not a key of `successors`. Otherwise `child` is recorded
    /// as a direct child of every parent, the parents leave the heads, `child`
    /// becomes a head and gets an empty successor set, and `true` is returned.
    // @FIXME: maybe in case of error we could simply disable the sync graph
    // and ask the client to rely on manual sync. For now, we are skipping the event
    // which is mildly satisfying.
    fn sync_dag(&mut self, child: &UniqueIdent, parents: &[UniqueIdent]) -> bool {
        // All parents must exist in successors otherwise we can't accept item.
        // Work on a copy so that a failure halfway leaves `self` unchanged.
        let mut try_successors = self.successors.clone();
        for par in parents {
            match try_successors.get_mut(par) {
                None => {
                    tracing::warn!("Unable to push a Dav DAG sync op into the graph, an event is missing, it's a bug");
                    return false;
                }
                Some(children) => {
                    children.insert(*child);
                }
            }
        }
        self.successors = try_successors;

        // Remove from HEADS this event's parents
        for par in parents {
            self.heads.remove(par);
        }

        // This event becomes a new HEAD in turn
        self.heads.insert(*child);

        // This event is also a future successor
        self.successors.insert(*child, ordset![]);

        true
    }
}
impl BayouState for DavDag {
    type Op = DavDagOp;

    /// Returns a copy of the state with `op` applied; `self` is not modified.
    ///
    /// The operation is first attached to the synchronization graph. For
    /// `Put` and `Delete`, the item table is only touched when that step
    /// succeeded; `Merge` affects the graph alone.
    fn apply(&self, op: &Self::Op) -> Self {
        let mut next = self.clone();

        match op {
            DavDagOp::Merge(parents, ident) => {
                // Graph-only operation: the outcome is deliberately ignored
                next.sync_dag(ident, parents);
            }
            DavDagOp::Put(parents, ident, entry) => {
                let accepted = next.sync_dag(ident, parents);
                if accepted {
                    next.register(*ident, entry.clone());
                }
            }
            DavDagOp::Delete(parents, ident) => {
                // The same identifier names the graph node and the item to drop
                let accepted = next.sync_dag(ident, parents);
                if accepted {
                    next.unregister(ident);
                }
            }
        }

        next
    }
}
// CUSTOM SERIALIZATION & DESERIALIZATION
/// Shape under which a [`DavDag`] is written to and read from storage.
///
/// Only the items and the heads are kept; the file name index and the
/// successors map are rebuilt when deserializing.
#[derive(Serialize, Deserialize)]
struct DavDagSerializedRepr {
    /// Content of the item table, as (identifier, entry) pairs.
    items: Vec<(UniqueIdent, IndexEntry)>,

    /// Heads of the synchronization graph.
    heads: Vec<UniqueIdent>,
}
impl<'de> Deserialize<'de> for DavDag {
fn deserialize<D>(d: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let val: DavDagSerializedRepr = DavDagSerializedRepr::deserialize(d)?;
let mut davdag = DavDag::default();
// Build the table + index
val.items.into_iter().for_each(|(ident, entry)| davdag.register(ident, entry));
// Initialize the synchronization DAG with its roots
val.heads.into_iter().for_each(|ident| {
davdag.successors.insert(ident, ordset![]);
davdag.heads.insert(ident);
});
Ok(davdag)
}
}
impl Serialize for DavDag {
    /// Serializes the item table and the heads only.
    ///
    /// Indexes are rebuilt on the fly when deserializing, and the heads are
    /// all that is kept from the sync graph to initialize it back.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        // Final serialization object
        let repr = DavDagSerializedRepr {
            items: self
                .table
                .iter()
                .map(|(ident, entry)| (*ident, entry.clone()))
                .collect(),
            heads: self.heads_vec(),
        };

        repr.serialize(serializer)
    }
}
|