diff options
author | Alex Auvolat <alex@adnab.me> | 2020-11-20 23:01:12 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2020-11-20 23:53:54 +0100 |
commit | f8a04852a2cda84d4fd12ec19e90bbd8224bf4e9 (patch) | |
tree | bab388b5958fa0ae2bd8135ed137a54a0d97fc80 /src/table/crdt.rs | |
parent | e02e9e035e269cd6b660c92706db424a2edb5306 (diff) | |
download | garage-f8a04852a2cda84d4fd12ec19e90bbd8224bf4e9.tar.gz garage-f8a04852a2cda84d4fd12ec19e90bbd8224bf4e9.zip |
Convert bucket table to better CRDT representation
Diffstat (limited to 'src/table/crdt.rs')
-rw-r--r-- | src/table/crdt.rs | 73 |
1 files changed, 30 insertions, 43 deletions
diff --git a/src/table/crdt.rs b/src/table/crdt.rs index 8f5e4d71..7c888e3a 100644 --- a/src/table/crdt.rs +++ b/src/table/crdt.rs @@ -7,7 +7,9 @@ pub trait CRDT { } impl<T> CRDT for T -where T: Ord + Clone { +where + T: Ord + Clone, +{ fn merge(&mut self, other: &Self) { if other > self { *self = other.clone(); @@ -18,14 +20,14 @@ where T: Ord + Clone { // ---- LWW Register ---- #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] -pub struct LWW<T> -{ +pub struct LWW<T> { ts: u64, v: T, } impl<T> LWW<T> -where T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + Ord +where + T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + CRDT, { pub fn new(value: T) -> Self { Self { @@ -34,10 +36,7 @@ where T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + Part } } pub fn migrate_from_raw(ts: u64, value: T) -> Self { - Self { - ts, - v: value, - } + Self { ts, v: value } } pub fn update(&mut self, new_value: T) { self.ts = std::cmp::max(self.ts + 1, now_msec()); @@ -46,10 +45,14 @@ where T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + Part pub fn get(&self) -> &T { &self.v } + pub fn get_mut(&mut self) -> &mut T { + &mut self.v + } } impl<T> CRDT for LWW<T> -where T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + CRDT +where + T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + CRDT, { fn merge(&mut self, other: &Self) { if other.ts > self.ts { @@ -61,7 +64,6 @@ where T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + Part } } - // ---- Boolean (true as absorbing state) ---- #[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)] @@ -85,61 +87,48 @@ impl CRDT for Bool { } } - // ---- LWW Map ---- #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] -pub struct LWWMap<K, V> -{ +pub struct LWWMap<K, V> { vals: Vec<(K, u64, V)>, } impl<K, V> LWWMap<K, V> -where K: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + Ord, - V: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + Ord, +where + K: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + Ord, + V: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + Ord, { pub fn new() -> Self { - Self{ - vals: vec![], - } + Self { vals: vec![] } } pub fn migrate_from_raw_item(k: K, ts: u64, v: V) -> Self { - Self{ + Self { vals: vec![(k, ts, v)], } } pub fn take_and_clear(&mut self) -> Self { let vals = std::mem::replace(&mut self.vals, vec![]); - Self{vals} + Self { vals } } pub fn clear(&mut self) { self.vals.clear(); } pub fn update_mutator(&self, k: K, new_v: V) -> Self { - let new_vals = match self - .vals - .binary_search_by(|(k2, _, _)| k2.cmp(&k)) - { + let new_vals = match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) { Ok(i) => { let (_, old_ts, _) = self.vals[i]; - let new_ts = std::cmp::max(old_ts+1, now_msec()); + let new_ts = std::cmp::max(old_ts + 1, now_msec()); vec![(k, new_ts, new_v)] } - Err(_) => { - vec![(k, now_msec(), new_v)] - } + Err(_) => vec![(k, now_msec(), new_v)], }; - Self{ - vals: new_vals, - } + Self { vals: new_vals } } pub fn get(&self, k: &K) -> Option<&V> { - match self - .vals - .binary_search_by(|(k2, _, _)| k2.cmp(&k)) - { + match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) { Ok(i) => Some(&self.vals[i].2), - Err(_) => None + Err(_) => None, } } pub fn items(&self) -> &[(K, u64, V)] { @@ -148,17 +137,15 @@ where K: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + Part } impl<K, V> CRDT for LWWMap<K, V> -where K: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + Ord, - V: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + CRDT, +where + K: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + Ord, + V: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + CRDT, { fn merge(&mut self, other: &Self) { for (k, ts2, v2) in other.vals.iter() { - match self - .vals - .binary_search_by(|(k2, _, _)| k2.cmp(&k)) - { + match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) { Ok(i) => { - let (_, ts1, v1) = &self.vals[i]; + let (_, ts1, _v1) = &self.vals[i]; if ts2 > ts1 { self.vals[i].1 = *ts2; self.vals[i].2 = v2.clone(); |