aboutsummaryrefslogtreecommitdiff
path: root/src/table/crdt.rs
blob: 4cba10ce786cc71d49cdaea0fbfcd87e441093da (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
//! This module provides a simple implementation of conflict-free replicated data types (CRDTs)
//!
//! CRDTs are a kind of data structure that does not require coordination.  In other words, we
//! can edit several copies in parallel and we will always find a way to merge them.
//!
//! A general example is a counter. Its initial value is 0.  Alice and Bob get a copy of the
//! counter.  Alice does +1 on her copy, she reads 1.  Bob does +3 on his copy, he reads 3.  Now,
//! it is easy to merge their counters, order does not count: we always get 4.
//!
//! Learn more about CRDT [on Wikipedia](https://en.wikipedia.org/wiki/Conflict-free_replicated_data_type)

use serde::{Deserialize, Serialize};

use garage_util::data::*;

/// Definition of a CRDT - all CRDT Rust types implement this.
///
/// A CRDT is defined as a merge operator that respects a certain set of axioms.
///
/// In particular, the merge operator must be commutative, associative,
/// idempotent, and monotonic.
/// In other words, if `a`, `b` and `c` are CRDTs, and `⊔` denotes the merge operator,
/// the following axioms must apply:
///
/// ```text
/// a ⊔ b = b ⊔ a                   (commutativity)
/// (a ⊔ b) ⊔ c = a ⊔ (b ⊔ c)       (associativity)
/// (a ⊔ b) ⊔ b = a ⊔ b             (idempotence)
/// ```
///
/// Moreover, the relationship `≥` defined by `a ≥ b ⇔ ∃c. a = b ⊔ c` must be a partial order.
/// This implies a few properties such as: if `a ⊔ b ≠ a`, then there is no `c` such that
/// `(a ⊔ b) ⊔ c = a`, as this would imply a cycle in the partial order.
///
/// These axioms are a contract on implementors: nothing in the trait itself enforces them.
pub trait CRDT {
	/// Merge the two data structures according to the CRDT rules.
	/// `self` is modified in place to contain the merged CRDT value (`self ⊔ other`).
	/// `other` is only borrowed and is not modified.
	///
	/// # Arguments
	///
	/// * `other` - the other CRDT we wish to merge with
	fn merge(&mut self, other: &Self);
}

/// Blanket CRDT implementation for every totally ordered, clonable type.
///
/// The merge rule is `a ⊔ b = max(a, b)`: the larger of the two values is kept.
impl<T> CRDT for T
where
	T: Ord + Clone,
{
	/// Replace `self` by a clone of `other` when `other` is strictly greater;
	/// otherwise leave `self` untouched.
	fn merge(&mut self, other: &Self) {
		let other_is_greater = *other > *self;
		if !other_is_greater {
			// We already hold the maximum: nothing to do.
			return;
		}
		*self = other.clone();
	}
}

// ---- LWW Register ----

/// Last Write Win (LWW)
///
/// An LWW CRDT associates a timestamp with a value, in order to implement a
/// time-based reconciliation rule: the most recent write wins.
/// For completeness, the LWW reconciliation rule must also be defined for two LWW CRDTs
/// with the same timestamp but different values.
///
/// In our case, we add the constraint that the value that is wrapped inside the LWW CRDT must
/// itself be a CRDT: in the case when the timestamp does not allow us to decide on which value to
/// keep, the merge rule of the inner CRDT is applied on the wrapped values.  (Note that all types
/// that implement the `Ord` trait get a default CRDT implementation that keeps the maximum value.
/// This enables us to use LWW directly with primitive data types such as numbers or strings. It is
/// generally desirable in this case to never explicitly produce LWW values with the same timestamp
/// but different inner values, as the rule to keep the maximum value isn't generally the desired
/// semantics.)
///
/// As the clocks of multiple computers are always desynchronized,
/// when operations are close enough in time, it is equivalent to
/// take one copy and drop the other one.
///
/// Given that clocks are not too desynchronized, this assumption
/// is enough for most cases, as there is little chance that two humans
/// coordinate themselves faster than the time difference between two NTP servers.
///
/// As a more concrete example, let's suppose you want to upload a file
/// with the same key (path) in the same bucket at the very same time.
/// For each request, the file will be timestamped by the receiving server
/// and may differ from what you observed with your atomic clock!
///
/// This scheme is used by AWS S3 or Soundcloud, and often unknowingly
/// in enterprises when reconciling databases with ad-hoc scripts.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct LWW<T> {
	/// Timestamp of the last write; `new` fills it from `now_msec()`
	/// (presumably milliseconds - confirm against `garage_util`).
	ts: u64,
	/// The wrapped value.
	v: T,
}

impl<T> LWW<T>
where
	T: CRDT,
{
	/// Creates a new LWW register holding `value`.
	///
	/// The register's internal timestamp is set from the current node's clock (`now_msec()`).
	pub fn new(value: T) -> Self {
		Self {
			ts: now_msec(),
			v: value,
		}
	}

	/// Build a new LWW register from a previous, non-compatible one.
	///
	/// Compared to `new`, the timestamp is not set to now: it must be set by the caller
	/// to the timestamp of the previous, non-compatible, CRDT.
	pub fn migrate_from_raw(ts: u64, value: T) -> Self {
		Self { ts, v: value }
	}

	/// Update the LWW CRDT while keeping some causal ordering.
	///
	/// The timestamp of the LWW CRDT is updated to be the current node's clock
	/// at time of update, or the previous timestamp + 1 if that's bigger,
	/// so that the new timestamp is strictly larger than the previous one.
	/// This ensures that merging the update with the old value will result in keeping
	/// the updated value.
	///
	/// The only exception is a register whose timestamp is already `u64::MAX`: the
	/// timestamp then stays at `u64::MAX` instead of overflowing, and a later merge with
	/// the old value falls back to the merge rule of the inner CRDT.
	pub fn update(&mut self, new_value: T) {
		// `saturating_add` rather than `+ 1`: a plain addition would panic in debug builds
		// and wrap around to 0 in release builds, making the timestamp go *backwards*.
		self.ts = std::cmp::max(self.ts.saturating_add(1), now_msec());
		self.v = new_value;
	}

	/// Get a shared reference to the wrapped value.
	pub fn get(&self) -> &T {
		&self.v
	}

	/// Get a mutable reference to the wrapped value.
	///
	/// This is useful to mutate the inside value without changing the LWW timestamp.
	/// When such a mutation is done, the merge between two LWW values is done using the inner
	/// CRDT's merge operation. This is useful in the case where the inner CRDT is a large
	/// data type, such as a map, and we only want to change a single item in the map.
	/// To do this, we can produce a "CRDT delta", i.e. a LWW that contains only the modification.
	/// This delta consists of a LWW with the same timestamp, and the map
	/// inside only contains the updated value.
	/// The advantage of such a delta is that it is much smaller than the whole map.
	///
	/// Avoid using this if the inner data type is a primitive type such as a number or a string,
	/// as you will then rely on the merge function defined on `Ord` types by keeping the maximum
	/// of both values.
	pub fn get_mut(&mut self) -> &mut T {
		&mut self.v
	}
}

impl<T> CRDT for LWW<T>
where
	T: Clone + CRDT,
{
	/// Last-write-wins merge.
	///
	/// * `other` is strictly newer: adopt its timestamp and a clone of its value.
	/// * Both timestamps are equal: delegate to the merge rule of the inner CRDT.
	/// * `other` is older: keep `self` as it is.
	fn merge(&mut self, other: &Self) {
		use std::cmp::Ordering;

		match other.ts.cmp(&self.ts) {
			Ordering::Greater => {
				self.ts = other.ts;
				self.v = other.v.clone();
			}
			Ordering::Equal => self.v.merge(&other.v),
			Ordering::Less => {}
		}
	}
}

/// Boolean CRDT, where `true` is an absorbing state.
///
/// Merging two values computes their logical OR, so once a copy has been set to
/// `true`, merging can never bring it back to `false`.
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
pub struct Bool(bool);

impl Bool {
	/// Create a new boolean with the specified value
	pub fn new(b: bool) -> Self {
		Self(b)
	}
	/// Set the boolean to true
	pub fn set(&mut self) {
		self.0 = true;
	}
	/// Get the boolean value
	pub fn get(&self) -> bool {
		self.0
	}
}

impl CRDT for Bool {
	/// Logical OR of both values: `self` becomes `true` as soon as either side is `true`.
	fn merge(&mut self, other: &Self) {
		if other.0 {
			self.0 = true;
		}
	}
}

/// Last Write Win Map
///
/// This type defines a CRDT for a map from keys to values.
/// The values have an associated timestamp, such that the last written value
/// takes precedence over previous ones. As for the simpler `LWW` type, the value
/// type `V` is also required to implement the CRDT trait.
/// We do not encourage mutating the values associated with a given key
/// without updating the timestamp; in fact, at the moment we do not provide a `.get_mut()`
/// method that would allow that.
///
/// Internally, the map is stored as a vector of keys and values, sorted by ascending key order.
/// This is why the key type `K` must implement `Ord` (and also to ensure a unique serialization,
/// such that two values can be compared for equality based on their hashes). As a consequence,
/// insertions take `O(n)` time. This means that LWWMap should be used for reasonably small maps.
/// However, note that even if we were using a more efficient data structure such as a `BTreeMap`,
/// the serialization cost `O(n)` would still have to be paid at each modification, so we are
/// actually not losing anything here.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct LWWMap<K, V> {
	/// `(key, timestamp, value)` triples, kept sorted by ascending key
	/// (lookups binary-search this vector).
	vals: Vec<(K, u64, V)>,
}

impl<K, V> LWWMap<K, V>
where
	K: Ord,
	V: CRDT,
{
	/// Create a new empty map CRDT
	pub fn new() -> Self {
		Self { vals: Vec::new() }
	}
	/// Used to migrate from a map defined in an incompatible format. This produces
	/// a map that contains a single item with the specified timestamp (copied from
	/// the incompatible format). Do this as many times as you have items to migrate,
	/// and put them all together using the CRDT merge operator.
	pub fn migrate_from_raw_item(k: K, ts: u64, v: V) -> Self {
		Self {
			vals: vec![(k, ts, v)],
		}
	}
	/// Returns a map that contains a single mapping from the specified key to the specified value.
	/// This map is a mutator, or a delta-CRDT, such that when it is merged with the original map,
	/// the previous value will be replaced with the one specified here.
	/// The timestamp in the provided mutator is set to the maximum of the current system's clock
	/// and 1 + the previous value's timestamp (if there is one), so that the new value will always
	/// take precedence (LWW rule).
	///
	/// If the previous timestamp is already `u64::MAX`, the mutator keeps `u64::MAX` instead of
	/// overflowing; merging it then falls back to the merge rule of the value type `V`.
	///
	/// Typically, to update the value associated to a key in the map, you would do the following:
	///
	/// ```ignore
	/// let my_update = my_crdt.update_mutator(key_to_modify, new_value);
	/// my_crdt.merge(&my_update);
	/// ```
	///
	/// However extracting the mutator on its own and only sending that on the network is very
	/// interesting as it is much smaller than the whole map.
	pub fn update_mutator(&self, k: K, new_v: V) -> Self {
		let new_ts = match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) {
			// `saturating_add` rather than `+ 1`: a plain addition would panic in debug builds
			// and wrap around to 0 in release builds when the old timestamp is `u64::MAX`.
			Ok(i) => std::cmp::max(self.vals[i].1.saturating_add(1), now_msec()),
			Err(_) => now_msec(),
		};
		Self {
			vals: vec![(k, new_ts, new_v)],
		}
	}
	/// Takes all of the values of the map and returns them. The current map is reset to the
	/// empty map. This is very useful to produce in-place a new map that contains only a delta
	/// that modifies a certain value:
	///
	/// ```ignore
	/// let mut a = get_my_crdt_value();
	/// let old_a = a.take_and_clear();
	/// a.merge(&old_a.update_mutator(key_to_modify, new_value));
	/// put_my_crdt_value(a);
	/// ```
	///
	/// Of course in this simple example we could have written simply
	/// `put_my_crdt_value(a.update_mutator(key_to_modify, new_value))`,
	/// but in the case where the map is a field in a struct for instance (as is always the case),
	/// this becomes very handy:
	///
	/// ```ignore
	/// let mut a = get_my_crdt_value();
	/// let old_a_map = a.map_field.take_and_clear();
	/// a.map_field.merge(&old_a_map.update_mutator(key_to_modify, new_value));
	/// put_my_crdt_value(a);
	/// ```
	pub fn take_and_clear(&mut self) -> Self {
		Self {
			vals: std::mem::take(&mut self.vals),
		}
	}
	/// Removes all values from the map
	pub fn clear(&mut self) {
		self.vals.clear();
	}
	/// Get a reference to the value assigned to a key, or `None` if the key is absent
	pub fn get(&self, k: &K) -> Option<&V> {
		self.vals
			.binary_search_by(|(k2, _, _)| k2.cmp(k))
			.ok()
			.map(|i| &self.vals[i].2)
	}
	/// Gets a reference to all of the items, as a slice. Useful to iterate on all map values.
	/// In most cases you will want to ignore the timestamp (second item of the tuple).
	pub fn items(&self) -> &[(K, u64, V)] {
		&self.vals[..]
	}
}

impl<K, V> CRDT for LWWMap<K, V>
where
	K: Clone + Ord,
	V: Clone + CRDT,
{
	/// Merge every entry of `other` into `self`, key by key.
	///
	/// * Key missing from `self`: the entry is cloned and inserted at its sorted position.
	/// * Key present and `other`'s entry strictly newer: its timestamp and value replace ours.
	/// * Key present with equal timestamps: the values are merged with `V`'s own merge rule.
	/// * Key present and `other`'s entry older: our entry is kept unchanged.
	fn merge(&mut self, other: &Self) {
		use std::cmp::Ordering;

		for (key, their_ts, their_val) in &other.vals {
			match self.vals.binary_search_by(|(mine, _, _)| mine.cmp(key)) {
				Ok(pos) => {
					let (_, our_ts, our_val) = &mut self.vals[pos];
					match their_ts.cmp(&*our_ts) {
						Ordering::Greater => {
							*our_ts = *their_ts;
							*our_val = their_val.clone();
						}
						Ordering::Equal => our_val.merge(their_val),
						Ordering::Less => {}
					}
				}
				// `pos` is where the key has to go to keep the vector sorted.
				Err(pos) => self
					.vals
					.insert(pos, (key.clone(), *their_ts, their_val.clone())),
			}
		}
	}
}