diff options
Diffstat (limited to 'src/util/crdt')
-rw-r--r-- | src/util/crdt/bool.rs | 34 | ||||
-rw-r--r-- | src/util/crdt/crdt.rs | 71 | ||||
-rw-r--r-- | src/util/crdt/lww.rs | 120 | ||||
-rw-r--r-- | src/util/crdt/lww_map.rs | 167 | ||||
-rw-r--r-- | src/util/crdt/map.rs | 99 | ||||
-rw-r--r-- | src/util/crdt/mod.rs | 23 |
6 files changed, 514 insertions, 0 deletions
diff --git a/src/util/crdt/bool.rs b/src/util/crdt/bool.rs new file mode 100644 index 00000000..53af8f82 --- /dev/null +++ b/src/util/crdt/bool.rs @@ -0,0 +1,34 @@ +use serde::{Deserialize, Serialize}; + +use crate::crdt::crdt::*; + +/// Boolean, where `true` is an absorbing state +#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)] +pub struct Bool(bool); + +impl Bool { + /// Create a new boolean with the specified value + pub fn new(b: bool) -> Self { + Self(b) + } + /// Set the boolean to true + pub fn set(&mut self) { + self.0 = true; + } + /// Get the boolean value + pub fn get(&self) -> bool { + self.0 + } +} + +impl From<bool> for Bool { + fn from(b: bool) -> Bool { + Bool::new(b) + } +} + +impl Crdt for Bool { + fn merge(&mut self, other: &Self) { + self.0 = self.0 || other.0; + } +} diff --git a/src/util/crdt/crdt.rs b/src/util/crdt/crdt.rs new file mode 100644 index 00000000..9b5f230d --- /dev/null +++ b/src/util/crdt/crdt.rs @@ -0,0 +1,71 @@ +use crate::data::*; + +/// Definition of a CRDT - all CRDT Rust types implement this. +/// +/// A CRDT is defined as a merge operator that respects a certain set of axioms. +/// +/// In particular, the merge operator must be commutative, associative, +/// idempotent, and monotonic. +/// In other words, if `a`, `b` and `c` are CRDTs, and `⊔` denotes the merge operator, +/// the following axioms must apply: +/// +/// ```text +/// a ⊔ b = b ⊔ a (commutativity) +/// (a ⊔ b) ⊔ c = a ⊔ (b ⊔ c) (associativity) +/// (a ⊔ b) ⊔ b = a ⊔ b (idempotence) +/// ``` +/// +/// Moreover, the relationship `≥` defined by `a ≥ b ⇔ ∃c. a = b ⊔ c` must be a partial order. +/// This implies a few properties such as: if `a ⊔ b ≠ a`, then there is no `c` such that `(a ⊔ b) ⊔ c = a`, +/// as this would imply a cycle in the partial order. +pub trait Crdt { + /// Merge the two datastructures according to the CRDT rules. + /// `self` is modified to contain the merged CRDT value. `other` is not modified. + /// + /// # Arguments + /// + /// * `other` - the other CRDT we wish to merge with + fn merge(&mut self, other: &Self); +} + +/// All types that implement `Ord` (a total order) can also implement a trivial CRDT +/// defined by the merge rule: `a ⊔ b = max(a, b)`. Implement this trait for your type +/// to enable this behavior. +pub trait AutoCrdt: Ord + Clone + std::fmt::Debug { + /// WARN_IF_DIFFERENT: emit a warning when values differ. Set this to true if + /// different values in your application should never happen. Set this to false + /// if you are actually relying on the semantics of `a ⊔ b = max(a, b)`. + const WARN_IF_DIFFERENT: bool; +} + +impl<T> Crdt for T +where + T: AutoCrdt, +{ + fn merge(&mut self, other: &Self) { + if Self::WARN_IF_DIFFERENT && self != other { + warn!( + "Different CRDT values should be the same (logic error!): {:?} vs {:?}", + self, other + ); + if other > self { + *self = other.clone(); + } + warn!("Making an arbitrary choice: {:?}", self); + } else if other > self { + *self = other.clone(); + } + } +} + +impl AutoCrdt for String { + const WARN_IF_DIFFERENT: bool = true; +} + +impl AutoCrdt for bool { + const WARN_IF_DIFFERENT: bool = true; +} + +impl AutoCrdt for FixedBytes32 { + const WARN_IF_DIFFERENT: bool = true; +} diff --git a/src/util/crdt/lww.rs b/src/util/crdt/lww.rs new file mode 100644 index 00000000..43d13f27 --- /dev/null +++ b/src/util/crdt/lww.rs @@ -0,0 +1,120 @@ +use std::cmp::Ordering; + +use serde::{Deserialize, Serialize}; + +use crate::time::now_msec; + +use crate::crdt::crdt::*; + +/// Last Write Win (LWW) +/// +/// An LWW CRDT associates a timestamp with a value, in order to implement a +/// time-based reconciliation rule: the most recent write wins. +/// For completeness, the LWW reconciliation rule must also be defined for two LWW CRDTs +/// with the same timestamp but different values. +/// +/// In our case, we add the constraint that the value that is wrapped inside the LWW CRDT must +/// itself be a CRDT: in the case when the timestamp does not allow us to decide on which value to +/// keep, the merge rule of the inner CRDT is applied on the wrapped values. (Note that all types +/// that implement the `Ord` trait get a default CRDT implemetnation that keeps the maximum value. +/// This enables us to use LWW directly with primitive data types such as numbers or strings. It is +/// generally desirable in this case to never explicitly produce LWW values with the same timestamp +/// but different inner values, as the rule to keep the maximum value isn't generally the desired +/// semantics.) +/// +/// As multiple computers clocks are always desynchronized, +/// when operations are close enough, it is equivalent to +/// take one copy and drop the other one. +/// +/// Given that clocks are not too desynchronized, this assumption +/// is enough for most cases, as there is few chance that two humans +/// coordonate themself faster than the time difference between two NTP servers. +/// +/// As a more concret example, let's suppose you want to upload a file +/// with the same key (path) in the same bucket at the very same time. +/// For each request, the file will be timestamped by the receiving server +/// and may differ from what you observed with your atomic clock! +/// +/// This scheme is used by AWS S3 or Soundcloud and often without knowing +/// in enterprise when reconciliating databases with ad-hoc scripts. +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +pub struct Lww<T> { + ts: u64, + v: T, +} + +impl<T> Lww<T> +where + T: Crdt, +{ + /// Creates a new CRDT + /// + /// CRDT's internal timestamp is set with current node's clock. + pub fn new(value: T) -> Self { + Self { + ts: now_msec(), + v: value, + } + } + + /// Build a new CRDT from a previous non-compatible one + /// + /// Compared to new, the CRDT's timestamp is not set to now + /// but must be set to the previous, non-compatible, CRDT's timestamp. + pub fn migrate_from_raw(ts: u64, value: T) -> Self { + Self { ts, v: value } + } + + /// Update the LWW CRDT while keeping some causal ordering. + /// + /// The timestamp of the LWW CRDT is updated to be the current node's clock + /// at time of update, or the previous timestamp + 1 if that's bigger, + /// so that the new timestamp is always strictly larger than the previous one. + /// This ensures that merging the update with the old value will result in keeping + /// the updated value. + pub fn update(&mut self, new_value: T) { + self.ts = std::cmp::max(self.ts + 1, now_msec()); + self.v = new_value; + } + + /// Get the CRDT value + pub fn get(&self) -> &T { + &self.v + } + + /// Get a mutable reference to the CRDT's value + /// + /// This is usefull to mutate the inside value without changing the LWW timestamp. + /// When such mutation is done, the merge between two LWW values is done using the inner + /// CRDT's merge operation. This is usefull in the case where the inner CRDT is a large + /// data type, such as a map, and we only want to change a single item in the map. + /// To do this, we can produce a "CRDT delta", i.e. a LWW that contains only the modification. + /// This delta consists in a LWW with the same timestamp, and the map + /// inside only contains the updated value. + /// The advantage of such a delta is that it is much smaller than the whole map. + /// + /// Avoid using this if the inner data type is a primitive type such as a number or a string, + /// as you will then rely on the merge function defined on `Ord` types by keeping the maximum + /// of both values. + pub fn get_mut(&mut self) -> &mut T { + &mut self.v + } +} + +impl<T> Crdt for Lww<T> +where + T: Clone + Crdt, +{ + fn merge(&mut self, other: &Self) { + match other.ts.cmp(&self.ts) { + Ordering::Greater => { + self.ts = other.ts; + self.v = other.v.clone(); + } + Ordering::Equal => { + self.v.merge(&other.v); + } + Ordering::Less => (), + } + } +} diff --git a/src/util/crdt/lww_map.rs b/src/util/crdt/lww_map.rs new file mode 100644 index 00000000..3e9aba79 --- /dev/null +++ b/src/util/crdt/lww_map.rs @@ -0,0 +1,167 @@ +use std::cmp::Ordering; + +use serde::{Deserialize, Serialize}; + +use crate::time::now_msec; + +use crate::crdt::crdt::*; + +/// Last Write Win Map +/// +/// This types defines a CRDT for a map from keys to values. +/// The values have an associated timestamp, such that the last written value +/// takes precedence over previous ones. As for the simpler `LWW` type, the value +/// type `V` is also required to implement the CRDT trait. +/// We do not encourage mutating the values associated with a given key +/// without updating the timestamp, in fact at the moment we do not provide a `.get_mut()` +/// method that would allow that. +/// +/// Internally, the map is stored as a vector of keys and values, sorted by ascending key order. +/// This is why the key type `K` must implement `Ord` (and also to ensure a unique serialization, +/// such that two values can be compared for equality based on their hashes). As a consequence, +/// insertions take `O(n)` time. This means that LWWMap should be used for reasonably small maps. +/// However, note that even if we were using a more efficient data structure such as a `BTreeMap`, +/// the serialization cost `O(n)` would still have to be paid at each modification, so we are +/// actually not losing anything here. +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +pub struct LwwMap<K, V> { + vals: Vec<(K, u64, V)>, +} + +impl<K, V> LwwMap<K, V> +where + K: Ord, + V: Crdt, +{ + /// Create a new empty map CRDT + pub fn new() -> Self { + Self { vals: vec![] } + } + /// Used to migrate from a map defined in an incompatible format. This produces + /// a map that contains a single item with the specified timestamp (copied from + /// the incompatible format). Do this as many times as you have items to migrate, + /// and put them all together using the CRDT merge operator. + pub fn migrate_from_raw_item(k: K, ts: u64, v: V) -> Self { + Self { + vals: vec![(k, ts, v)], + } + } + /// Returns a map that contains a single mapping from the specified key to the specified value. + /// This map is a mutator, or a delta-CRDT, such that when it is merged with the original map, + /// the previous value will be replaced with the one specified here. + /// The timestamp in the provided mutator is set to the maximum of the current system's clock + /// and 1 + the previous value's timestamp (if there is one), so that the new value will always + /// take precedence (LWW rule). + /// + /// Typically, to update the value associated to a key in the map, you would do the following: + /// + /// ```ignore + /// let my_update = my_crdt.update_mutator(key_to_modify, new_value); + /// my_crdt.merge(&my_update); + /// ``` + /// + /// However extracting the mutator on its own and only sending that on the network is very + /// interesting as it is much smaller than the whole map. + pub fn update_mutator(&self, k: K, new_v: V) -> Self { + let new_vals = match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) { + Ok(i) => { + let (_, old_ts, _) = self.vals[i]; + let new_ts = std::cmp::max(old_ts + 1, now_msec()); + vec![(k, new_ts, new_v)] + } + Err(_) => vec![(k, now_msec(), new_v)], + }; + Self { vals: new_vals } + } + /// Takes all of the values of the map and returns them. The current map is reset to the + /// empty map. This is very usefull to produce in-place a new map that contains only a delta + /// that modifies a certain value: + /// + /// ```ignore + /// let mut a = get_my_crdt_value(); + /// let old_a = a.take_and_clear(); + /// a.merge(&old_a.update_mutator(key_to_modify, new_value)); + /// put_my_crdt_value(a); + /// ``` + /// + /// Of course in this simple example we could have written simply + /// `pyt_my_crdt_value(a.update_mutator(key_to_modify, new_value))`, + /// but in the case where the map is a field in a struct for instance (as is always the case), + /// this becomes very handy: + /// + /// ```ignore + /// let mut a = get_my_crdt_value(); + /// let old_a_map = a.map_field.take_and_clear(); + /// a.map_field.merge(&old_a_map.update_mutator(key_to_modify, new_value)); + /// put_my_crdt_value(a); + /// ``` + pub fn take_and_clear(&mut self) -> Self { + let vals = std::mem::take(&mut self.vals); + Self { vals } + } + /// Removes all values from the map + pub fn clear(&mut self) { + self.vals.clear(); + } + /// Get a reference to the value assigned to a key + pub fn get(&self, k: &K) -> Option<&V> { + match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(k)) { + Ok(i) => Some(&self.vals[i].2), + Err(_) => None, + } + } + /// Gets a reference to all of the items, as a slice. Usefull to iterate on all map values. + /// In most case you will want to ignore the timestamp (second item of the tuple). + pub fn items(&self) -> &[(K, u64, V)] { + &self.vals[..] + } + + /// Returns the number of items in the map + pub fn len(&self) -> usize { + self.vals.len() + } + + /// Returns true if the map is empty + pub fn is_empty(&self) -> bool { + self.len() == 0 + } +} + +impl<K, V> Crdt for LwwMap<K, V> +where + K: Clone + Ord, + V: Clone + Crdt, +{ + fn merge(&mut self, other: &Self) { + for (k, ts2, v2) in other.vals.iter() { + match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(k)) { + Ok(i) => { + let (_, ts1, _v1) = &self.vals[i]; + match ts2.cmp(ts1) { + Ordering::Greater => { + self.vals[i].1 = *ts2; + self.vals[i].2 = v2.clone(); + } + Ordering::Equal => { + self.vals[i].2.merge(v2); + } + Ordering::Less => (), + } + } + Err(i) => { + self.vals.insert(i, (k.clone(), *ts2, v2.clone())); + } + } + } + } +} + +impl<K, V> Default for LwwMap<K, V> +where + K: Ord, + V: Crdt, +{ + fn default() -> Self { + Self::new() + } +} diff --git a/src/util/crdt/map.rs b/src/util/crdt/map.rs new file mode 100644 index 00000000..7553cd50 --- /dev/null +++ b/src/util/crdt/map.rs @@ -0,0 +1,99 @@ +use serde::{Deserialize, Serialize}; + +use crate::crdt::crdt::*; + +/// Simple CRDT Map +/// +/// This types defines a CRDT for a map from keys to values. Values are CRDT types which +/// can have their own updating logic. +/// +/// Internally, the map is stored as a vector of keys and values, sorted by ascending key order. +/// This is why the key type `K` must implement `Ord` (and also to ensure a unique serialization, +/// such that two values can be compared for equality based on their hashes). As a consequence, +/// insertions take `O(n)` time. This means that Map should be used for reasonably small maps. +/// However, note that even if we were using a more efficient data structure such as a `BTreeMap`, +/// the serialization cost `O(n)` would still have to be paid at each modification, so we are +/// actually not losing anything here. +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +pub struct Map<K, V> { + vals: Vec<(K, V)>, +} + +impl<K, V> Map<K, V> +where + K: Clone + Ord, + V: Clone + Crdt, +{ + /// Create a new empty map CRDT + pub fn new() -> Self { + Self { vals: vec![] } + } + + /// Returns a map that contains a single mapping from the specified key to the specified value. + /// This can be used to build a delta-mutator: + /// when merged with another map, the value will be added or CRDT-merged if a previous + /// value already exists. + pub fn put_mutator(k: K, v: V) -> Self { + Self { vals: vec![(k, v)] } + } + + /// Add a value to the map + pub fn put(&mut self, k: K, v: V) { + self.merge(&Self::put_mutator(k, v)); + } + + /// Removes all values from the map + pub fn clear(&mut self) { + self.vals.clear(); + } + + /// Get a reference to the value assigned to a key + pub fn get(&self, k: &K) -> Option<&V> { + match self.vals.binary_search_by(|(k2, _)| k2.cmp(k)) { + Ok(i) => Some(&self.vals[i].1), + Err(_) => None, + } + } + /// Gets a reference to all of the items, as a slice. Usefull to iterate on all map values. + pub fn items(&self) -> &[(K, V)] { + &self.vals[..] + } + /// Returns the number of items in the map + pub fn len(&self) -> usize { + self.vals.len() + } + + /// Returns true if the map is empty + pub fn is_empty(&self) -> bool { + self.len() == 0 + } +} + +impl<K, V> Crdt for Map<K, V> +where + K: Clone + Ord, + V: Clone + Crdt, +{ + fn merge(&mut self, other: &Self) { + for (k, v2) in other.vals.iter() { + match self.vals.binary_search_by(|(k2, _)| k2.cmp(k)) { + Ok(i) => { + self.vals[i].1.merge(v2); + } + Err(i) => { + self.vals.insert(i, (k.clone(), v2.clone())); + } + } + } + } +} + +impl<K, V> Default for Map<K, V> +where + K: Clone + Ord, + V: Clone + Crdt, +{ + fn default() -> Self { + Self::new() + } +} diff --git a/src/util/crdt/mod.rs b/src/util/crdt/mod.rs new file mode 100644 index 00000000..9663a5a5 --- /dev/null +++ b/src/util/crdt/mod.rs @@ -0,0 +1,23 @@ +//! This package provides a simple implementation of conflict-free replicated data types (CRDTs) +//! +//! CRDTs are a type of data structures that do not require coordination. In other words, we can +//! edit them in parallel, we will always find a way to merge it. +//! +//! A general example is a counter. Its initial value is 0. Alice and Bob get a copy of the +//! counter. Alice does +1 on her copy, she reads 1. Bob does +3 on his copy, he reads 3. Now, +//! it is easy to merge their counters, order does not count: we always get 4. +//! +//! Learn more about CRDT [on Wikipedia](https://en.wikipedia.org/wiki/Conflict-free_replicated_data_type) + +mod bool; +#[allow(clippy::module_inception)] +mod crdt; +mod lww; +mod lww_map; +mod map; + +pub use self::bool::*; +pub use crdt::*; +pub use lww::*; +pub use lww_map::*; +pub use map::*; |