aboutsummaryrefslogtreecommitdiff
path: root/src/util/crdt
diff options
context:
space:
mode:
Diffstat (limited to 'src/util/crdt')
-rw-r--r--src/util/crdt/bool.rs34
-rw-r--r--src/util/crdt/crdt.rs71
-rw-r--r--src/util/crdt/lww.rs120
-rw-r--r--src/util/crdt/lww_map.rs167
-rw-r--r--src/util/crdt/map.rs99
-rw-r--r--src/util/crdt/mod.rs23
6 files changed, 514 insertions, 0 deletions
diff --git a/src/util/crdt/bool.rs b/src/util/crdt/bool.rs
new file mode 100644
index 00000000..53af8f82
--- /dev/null
+++ b/src/util/crdt/bool.rs
@@ -0,0 +1,34 @@
+use serde::{Deserialize, Serialize};
+
+use crate::crdt::crdt::*;
+
+/// Boolean, where `true` is an absorbing state
+#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
+pub struct Bool(bool);
+
+impl Bool {
+ /// Create a new boolean with the specified value
+ pub fn new(b: bool) -> Self {
+ Self(b)
+ }
+ /// Set the boolean to true
+ pub fn set(&mut self) {
+ self.0 = true;
+ }
+ /// Get the boolean value
+ pub fn get(&self) -> bool {
+ self.0
+ }
+}
+
+impl From<bool> for Bool {
+ fn from(b: bool) -> Bool {
+ Bool::new(b)
+ }
+}
+
+impl Crdt for Bool {
+ fn merge(&mut self, other: &Self) {
+ self.0 = self.0 || other.0;
+ }
+}
diff --git a/src/util/crdt/crdt.rs b/src/util/crdt/crdt.rs
new file mode 100644
index 00000000..9b5f230d
--- /dev/null
+++ b/src/util/crdt/crdt.rs
@@ -0,0 +1,71 @@
+use crate::data::*;
+
+/// Definition of a CRDT - all CRDT Rust types implement this.
+///
+/// A CRDT is defined as a merge operator that respects a certain set of axioms.
+///
+/// In particular, the merge operator must be commutative, associative,
+/// idempotent, and monotonic.
+/// In other words, if `a`, `b` and `c` are CRDTs, and `⊔` denotes the merge operator,
+/// the following axioms must apply:
+///
+/// ```text
+/// a ⊔ b = b ⊔ a (commutativity)
+/// (a ⊔ b) ⊔ c = a ⊔ (b ⊔ c) (associativity)
+/// (a ⊔ b) ⊔ b = a ⊔ b (idempotence)
+/// ```
+///
+/// Moreover, the relationship `≥` defined by `a ≥ b ⇔ ∃c. a = b ⊔ c` must be a partial order.
+/// This implies a few properties such as: if `a ⊔ b ≠ a`, then there is no `c` such that `(a ⊔ b) ⊔ c = a`,
+/// as this would imply a cycle in the partial order.
+pub trait Crdt {
+ /// Merge the two datastructures according to the CRDT rules.
+ /// `self` is modified to contain the merged CRDT value. `other` is not modified.
+ ///
+ /// # Arguments
+ ///
+ /// * `other` - the other CRDT we wish to merge with
+ fn merge(&mut self, other: &Self);
+}
+
+/// All types that implement `Ord` (a total order) can also implement a trivial CRDT
+/// defined by the merge rule: `a ⊔ b = max(a, b)`. Implement this trait for your type
+/// to enable this behavior.
+pub trait AutoCrdt: Ord + Clone + std::fmt::Debug {
+ /// WARN_IF_DIFFERENT: emit a warning when values differ. Set this to true if
+ /// different values in your application should never happen. Set this to false
+ /// if you are actually relying on the semantics of `a ⊔ b = max(a, b)`.
+ const WARN_IF_DIFFERENT: bool;
+}
+
+impl<T> Crdt for T
+where
+ T: AutoCrdt,
+{
+ fn merge(&mut self, other: &Self) {
+ if Self::WARN_IF_DIFFERENT && self != other {
+ warn!(
+ "Different CRDT values should be the same (logic error!): {:?} vs {:?}",
+ self, other
+ );
+ if other > self {
+ *self = other.clone();
+ }
+ warn!("Making an arbitrary choice: {:?}", self);
+ } else if other > self {
+ *self = other.clone();
+ }
+ }
+}
+
+impl AutoCrdt for String {
+ const WARN_IF_DIFFERENT: bool = true;
+}
+
+impl AutoCrdt for bool {
+ const WARN_IF_DIFFERENT: bool = true;
+}
+
+impl AutoCrdt for FixedBytes32 {
+ const WARN_IF_DIFFERENT: bool = true;
+}
diff --git a/src/util/crdt/lww.rs b/src/util/crdt/lww.rs
new file mode 100644
index 00000000..43d13f27
--- /dev/null
+++ b/src/util/crdt/lww.rs
@@ -0,0 +1,120 @@
+use std::cmp::Ordering;
+
+use serde::{Deserialize, Serialize};
+
+use crate::time::now_msec;
+
+use crate::crdt::crdt::*;
+
+/// Last Write Win (LWW)
+///
+/// An LWW CRDT associates a timestamp with a value, in order to implement a
+/// time-based reconciliation rule: the most recent write wins.
+/// For completeness, the LWW reconciliation rule must also be defined for two LWW CRDTs
+/// with the same timestamp but different values.
+///
+/// In our case, we add the constraint that the value that is wrapped inside the LWW CRDT must
+/// itself be a CRDT: in the case when the timestamp does not allow us to decide on which value to
+/// keep, the merge rule of the inner CRDT is applied on the wrapped values. (Note that all types
+/// that implement the `Ord` trait get a default CRDT implemetnation that keeps the maximum value.
+/// This enables us to use LWW directly with primitive data types such as numbers or strings. It is
+/// generally desirable in this case to never explicitly produce LWW values with the same timestamp
+/// but different inner values, as the rule to keep the maximum value isn't generally the desired
+/// semantics.)
+///
+/// As multiple computers clocks are always desynchronized,
+/// when operations are close enough, it is equivalent to
+/// take one copy and drop the other one.
+///
+/// Given that clocks are not too desynchronized, this assumption
+/// is enough for most cases, as there is few chance that two humans
+/// coordonate themself faster than the time difference between two NTP servers.
+///
+/// As a more concret example, let's suppose you want to upload a file
+/// with the same key (path) in the same bucket at the very same time.
+/// For each request, the file will be timestamped by the receiving server
+/// and may differ from what you observed with your atomic clock!
+///
+/// This scheme is used by AWS S3 or Soundcloud and often without knowing
+/// in enterprise when reconciliating databases with ad-hoc scripts.
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
+pub struct Lww<T> {
+ ts: u64,
+ v: T,
+}
+
+impl<T> Lww<T>
+where
+ T: Crdt,
+{
+ /// Creates a new CRDT
+ ///
+ /// CRDT's internal timestamp is set with current node's clock.
+ pub fn new(value: T) -> Self {
+ Self {
+ ts: now_msec(),
+ v: value,
+ }
+ }
+
+ /// Build a new CRDT from a previous non-compatible one
+ ///
+ /// Compared to new, the CRDT's timestamp is not set to now
+ /// but must be set to the previous, non-compatible, CRDT's timestamp.
+ pub fn migrate_from_raw(ts: u64, value: T) -> Self {
+ Self { ts, v: value }
+ }
+
+ /// Update the LWW CRDT while keeping some causal ordering.
+ ///
+ /// The timestamp of the LWW CRDT is updated to be the current node's clock
+ /// at time of update, or the previous timestamp + 1 if that's bigger,
+ /// so that the new timestamp is always strictly larger than the previous one.
+ /// This ensures that merging the update with the old value will result in keeping
+ /// the updated value.
+ pub fn update(&mut self, new_value: T) {
+ self.ts = std::cmp::max(self.ts + 1, now_msec());
+ self.v = new_value;
+ }
+
+ /// Get the CRDT value
+ pub fn get(&self) -> &T {
+ &self.v
+ }
+
+ /// Get a mutable reference to the CRDT's value
+ ///
+ /// This is usefull to mutate the inside value without changing the LWW timestamp.
+ /// When such mutation is done, the merge between two LWW values is done using the inner
+ /// CRDT's merge operation. This is usefull in the case where the inner CRDT is a large
+ /// data type, such as a map, and we only want to change a single item in the map.
+ /// To do this, we can produce a "CRDT delta", i.e. a LWW that contains only the modification.
+ /// This delta consists in a LWW with the same timestamp, and the map
+ /// inside only contains the updated value.
+ /// The advantage of such a delta is that it is much smaller than the whole map.
+ ///
+ /// Avoid using this if the inner data type is a primitive type such as a number or a string,
+ /// as you will then rely on the merge function defined on `Ord` types by keeping the maximum
+ /// of both values.
+ pub fn get_mut(&mut self) -> &mut T {
+ &mut self.v
+ }
+}
+
+impl<T> Crdt for Lww<T>
+where
+ T: Clone + Crdt,
+{
+ fn merge(&mut self, other: &Self) {
+ match other.ts.cmp(&self.ts) {
+ Ordering::Greater => {
+ self.ts = other.ts;
+ self.v = other.v.clone();
+ }
+ Ordering::Equal => {
+ self.v.merge(&other.v);
+ }
+ Ordering::Less => (),
+ }
+ }
+}
diff --git a/src/util/crdt/lww_map.rs b/src/util/crdt/lww_map.rs
new file mode 100644
index 00000000..3e9aba79
--- /dev/null
+++ b/src/util/crdt/lww_map.rs
@@ -0,0 +1,167 @@
+use std::cmp::Ordering;
+
+use serde::{Deserialize, Serialize};
+
+use crate::time::now_msec;
+
+use crate::crdt::crdt::*;
+
+/// Last Write Win Map
+///
+/// This types defines a CRDT for a map from keys to values.
+/// The values have an associated timestamp, such that the last written value
+/// takes precedence over previous ones. As for the simpler `LWW` type, the value
+/// type `V` is also required to implement the CRDT trait.
+/// We do not encourage mutating the values associated with a given key
+/// without updating the timestamp, in fact at the moment we do not provide a `.get_mut()`
+/// method that would allow that.
+///
+/// Internally, the map is stored as a vector of keys and values, sorted by ascending key order.
+/// This is why the key type `K` must implement `Ord` (and also to ensure a unique serialization,
+/// such that two values can be compared for equality based on their hashes). As a consequence,
+/// insertions take `O(n)` time. This means that LWWMap should be used for reasonably small maps.
+/// However, note that even if we were using a more efficient data structure such as a `BTreeMap`,
+/// the serialization cost `O(n)` would still have to be paid at each modification, so we are
+/// actually not losing anything here.
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
+pub struct LwwMap<K, V> {
+ vals: Vec<(K, u64, V)>,
+}
+
+impl<K, V> LwwMap<K, V>
+where
+ K: Ord,
+ V: Crdt,
+{
+ /// Create a new empty map CRDT
+ pub fn new() -> Self {
+ Self { vals: vec![] }
+ }
+ /// Used to migrate from a map defined in an incompatible format. This produces
+ /// a map that contains a single item with the specified timestamp (copied from
+ /// the incompatible format). Do this as many times as you have items to migrate,
+ /// and put them all together using the CRDT merge operator.
+ pub fn migrate_from_raw_item(k: K, ts: u64, v: V) -> Self {
+ Self {
+ vals: vec![(k, ts, v)],
+ }
+ }
+ /// Returns a map that contains a single mapping from the specified key to the specified value.
+ /// This map is a mutator, or a delta-CRDT, such that when it is merged with the original map,
+ /// the previous value will be replaced with the one specified here.
+ /// The timestamp in the provided mutator is set to the maximum of the current system's clock
+ /// and 1 + the previous value's timestamp (if there is one), so that the new value will always
+ /// take precedence (LWW rule).
+ ///
+ /// Typically, to update the value associated to a key in the map, you would do the following:
+ ///
+ /// ```ignore
+ /// let my_update = my_crdt.update_mutator(key_to_modify, new_value);
+ /// my_crdt.merge(&my_update);
+ /// ```
+ ///
+ /// However extracting the mutator on its own and only sending that on the network is very
+ /// interesting as it is much smaller than the whole map.
+ pub fn update_mutator(&self, k: K, new_v: V) -> Self {
+ let new_vals = match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) {
+ Ok(i) => {
+ let (_, old_ts, _) = self.vals[i];
+ let new_ts = std::cmp::max(old_ts + 1, now_msec());
+ vec![(k, new_ts, new_v)]
+ }
+ Err(_) => vec![(k, now_msec(), new_v)],
+ };
+ Self { vals: new_vals }
+ }
+ /// Takes all of the values of the map and returns them. The current map is reset to the
+ /// empty map. This is very usefull to produce in-place a new map that contains only a delta
+ /// that modifies a certain value:
+ ///
+ /// ```ignore
+ /// let mut a = get_my_crdt_value();
+ /// let old_a = a.take_and_clear();
+ /// a.merge(&old_a.update_mutator(key_to_modify, new_value));
+ /// put_my_crdt_value(a);
+ /// ```
+ ///
+ /// Of course in this simple example we could have written simply
+ /// `pyt_my_crdt_value(a.update_mutator(key_to_modify, new_value))`,
+ /// but in the case where the map is a field in a struct for instance (as is always the case),
+ /// this becomes very handy:
+ ///
+ /// ```ignore
+ /// let mut a = get_my_crdt_value();
+ /// let old_a_map = a.map_field.take_and_clear();
+ /// a.map_field.merge(&old_a_map.update_mutator(key_to_modify, new_value));
+ /// put_my_crdt_value(a);
+ /// ```
+ pub fn take_and_clear(&mut self) -> Self {
+ let vals = std::mem::take(&mut self.vals);
+ Self { vals }
+ }
+ /// Removes all values from the map
+ pub fn clear(&mut self) {
+ self.vals.clear();
+ }
+ /// Get a reference to the value assigned to a key
+ pub fn get(&self, k: &K) -> Option<&V> {
+ match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(k)) {
+ Ok(i) => Some(&self.vals[i].2),
+ Err(_) => None,
+ }
+ }
+ /// Gets a reference to all of the items, as a slice. Usefull to iterate on all map values.
+ /// In most case you will want to ignore the timestamp (second item of the tuple).
+ pub fn items(&self) -> &[(K, u64, V)] {
+ &self.vals[..]
+ }
+
+ /// Returns the number of items in the map
+ pub fn len(&self) -> usize {
+ self.vals.len()
+ }
+
+ /// Returns true if the map is empty
+ pub fn is_empty(&self) -> bool {
+ self.len() == 0
+ }
+}
+
+impl<K, V> Crdt for LwwMap<K, V>
+where
+ K: Clone + Ord,
+ V: Clone + Crdt,
+{
+ fn merge(&mut self, other: &Self) {
+ for (k, ts2, v2) in other.vals.iter() {
+ match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(k)) {
+ Ok(i) => {
+ let (_, ts1, _v1) = &self.vals[i];
+ match ts2.cmp(ts1) {
+ Ordering::Greater => {
+ self.vals[i].1 = *ts2;
+ self.vals[i].2 = v2.clone();
+ }
+ Ordering::Equal => {
+ self.vals[i].2.merge(v2);
+ }
+ Ordering::Less => (),
+ }
+ }
+ Err(i) => {
+ self.vals.insert(i, (k.clone(), *ts2, v2.clone()));
+ }
+ }
+ }
+ }
+}
+
+impl<K, V> Default for LwwMap<K, V>
+where
+ K: Ord,
+ V: Crdt,
+{
+ fn default() -> Self {
+ Self::new()
+ }
+}
diff --git a/src/util/crdt/map.rs b/src/util/crdt/map.rs
new file mode 100644
index 00000000..7553cd50
--- /dev/null
+++ b/src/util/crdt/map.rs
@@ -0,0 +1,99 @@
+use serde::{Deserialize, Serialize};
+
+use crate::crdt::crdt::*;
+
+/// Simple CRDT Map
+///
+/// This types defines a CRDT for a map from keys to values. Values are CRDT types which
+/// can have their own updating logic.
+///
+/// Internally, the map is stored as a vector of keys and values, sorted by ascending key order.
+/// This is why the key type `K` must implement `Ord` (and also to ensure a unique serialization,
+/// such that two values can be compared for equality based on their hashes). As a consequence,
+/// insertions take `O(n)` time. This means that Map should be used for reasonably small maps.
+/// However, note that even if we were using a more efficient data structure such as a `BTreeMap`,
+/// the serialization cost `O(n)` would still have to be paid at each modification, so we are
+/// actually not losing anything here.
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
+pub struct Map<K, V> {
+ vals: Vec<(K, V)>,
+}
+
+impl<K, V> Map<K, V>
+where
+ K: Clone + Ord,
+ V: Clone + Crdt,
+{
+ /// Create a new empty map CRDT
+ pub fn new() -> Self {
+ Self { vals: vec![] }
+ }
+
+ /// Returns a map that contains a single mapping from the specified key to the specified value.
+ /// This can be used to build a delta-mutator:
+ /// when merged with another map, the value will be added or CRDT-merged if a previous
+ /// value already exists.
+ pub fn put_mutator(k: K, v: V) -> Self {
+ Self { vals: vec![(k, v)] }
+ }
+
+ /// Add a value to the map
+ pub fn put(&mut self, k: K, v: V) {
+ self.merge(&Self::put_mutator(k, v));
+ }
+
+ /// Removes all values from the map
+ pub fn clear(&mut self) {
+ self.vals.clear();
+ }
+
+ /// Get a reference to the value assigned to a key
+ pub fn get(&self, k: &K) -> Option<&V> {
+ match self.vals.binary_search_by(|(k2, _)| k2.cmp(k)) {
+ Ok(i) => Some(&self.vals[i].1),
+ Err(_) => None,
+ }
+ }
+ /// Gets a reference to all of the items, as a slice. Usefull to iterate on all map values.
+ pub fn items(&self) -> &[(K, V)] {
+ &self.vals[..]
+ }
+ /// Returns the number of items in the map
+ pub fn len(&self) -> usize {
+ self.vals.len()
+ }
+
+ /// Returns true if the map is empty
+ pub fn is_empty(&self) -> bool {
+ self.len() == 0
+ }
+}
+
+impl<K, V> Crdt for Map<K, V>
+where
+ K: Clone + Ord,
+ V: Clone + Crdt,
+{
+ fn merge(&mut self, other: &Self) {
+ for (k, v2) in other.vals.iter() {
+ match self.vals.binary_search_by(|(k2, _)| k2.cmp(k)) {
+ Ok(i) => {
+ self.vals[i].1.merge(v2);
+ }
+ Err(i) => {
+ self.vals.insert(i, (k.clone(), v2.clone()));
+ }
+ }
+ }
+ }
+}
+
+impl<K, V> Default for Map<K, V>
+where
+ K: Clone + Ord,
+ V: Clone + Crdt,
+{
+ fn default() -> Self {
+ Self::new()
+ }
+}
diff --git a/src/util/crdt/mod.rs b/src/util/crdt/mod.rs
new file mode 100644
index 00000000..9663a5a5
--- /dev/null
+++ b/src/util/crdt/mod.rs
@@ -0,0 +1,23 @@
+//! This package provides a simple implementation of conflict-free replicated data types (CRDTs)
+//!
+//! CRDTs are a type of data structures that do not require coordination. In other words, we can
+//! edit them in parallel, we will always find a way to merge it.
+//!
+//! A general example is a counter. Its initial value is 0. Alice and Bob get a copy of the
+//! counter. Alice does +1 on her copy, she reads 1. Bob does +3 on his copy, he reads 3. Now,
+//! it is easy to merge their counters, order does not count: we always get 4.
+//!
+//! Learn more about CRDT [on Wikipedia](https://en.wikipedia.org/wiki/Conflict-free_replicated_data_type)
+
+mod bool;
+#[allow(clippy::module_inception)]
+mod crdt;
+mod lww;
+mod lww_map;
+mod map;
+
+pub use self::bool::*;
+pub use crdt::*;
+pub use lww::*;
+pub use lww_map::*;
+pub use map::*;