From e02e9e035e269cd6b660c92706db424a2edb5306 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 20 Nov 2020 21:15:24 +0100 Subject: Begin improve model to use better CRDTs --- src/table/crdt.rs | 175 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/table/lib.rs | 1 + 2 files changed, 176 insertions(+) create mode 100644 src/table/crdt.rs (limited to 'src/table') diff --git a/src/table/crdt.rs b/src/table/crdt.rs new file mode 100644 index 00000000..8f5e4d71 --- /dev/null +++ b/src/table/crdt.rs @@ -0,0 +1,175 @@ +use serde::{Deserialize, Serialize}; + +use garage_util::data::*; + +pub trait CRDT { + fn merge(&mut self, other: &Self); +} + +impl CRDT for T +where T: Ord + Clone { + fn merge(&mut self, other: &Self) { + if other > self { + *self = other.clone(); + } + } +} + +// ---- LWW Register ---- + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +pub struct LWW +{ + ts: u64, + v: T, +} + +impl LWW +where T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + Ord +{ + pub fn new(value: T) -> Self { + Self { + ts: now_msec(), + v: value, + } + } + pub fn migrate_from_raw(ts: u64, value: T) -> Self { + Self { + ts, + v: value, + } + } + pub fn update(&mut self, new_value: T) { + self.ts = std::cmp::max(self.ts + 1, now_msec()); + self.v = new_value; + } + pub fn get(&self) -> &T { + &self.v + } +} + +impl CRDT for LWW +where T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + CRDT +{ + fn merge(&mut self, other: &Self) { + if other.ts > self.ts { + self.ts = other.ts; + self.v = other.v.clone(); + } else if other.ts == self.ts { + self.v.merge(&other.v); + } + } +} + + +// ---- Boolean (true as absorbing state) ---- + +#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)] +pub struct Bool(bool); + +impl Bool { + pub fn new(b: bool) -> Self { + Self(b) + } + pub fn set(&mut self) { + self.0 = true; + } + pub fn get(&self) -> bool { + self.0 + } +} + +impl CRDT for Bool { + fn merge(&mut self, other: &Self) { + self.0 = self.0 || other.0; + } +} + + +// ---- LWW Map ---- + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +pub struct LWWMap +{ + vals: Vec<(K, u64, V)>, +} + +impl LWWMap +where K: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + Ord, + V: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + Ord, +{ + pub fn new() -> Self { + Self{ + vals: vec![], + } + } + pub fn migrate_from_raw_item(k: K, ts: u64, v: V) -> Self { + Self{ + vals: vec![(k, ts, v)], + } + } + pub fn take_and_clear(&mut self) -> Self { + let vals = std::mem::replace(&mut self.vals, vec![]); + Self{vals} + } + pub fn clear(&mut self) { + self.vals.clear(); + } + pub fn update_mutator(&self, k: K, new_v: V) -> Self { + let new_vals = match self + .vals + .binary_search_by(|(k2, _, _)| k2.cmp(&k)) + { + Ok(i) => { + let (_, old_ts, _) = self.vals[i]; + let new_ts = std::cmp::max(old_ts+1, now_msec()); + vec![(k, new_ts, new_v)] + } + Err(_) => { + vec![(k, now_msec(), new_v)] + } + }; + Self{ + vals: new_vals, + } + } + pub fn get(&self, k: &K) -> Option<&V> { + match self + .vals + .binary_search_by(|(k2, _, _)| k2.cmp(&k)) + { + Ok(i) => Some(&self.vals[i].2), + Err(_) => None + } + } + pub fn items(&self) -> &[(K, u64, V)] { + &self.vals[..] + } +} + +impl CRDT for LWWMap +where K: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + Ord, + V: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + CRDT, +{ + fn merge(&mut self, other: &Self) { + for (k, ts2, v2) in other.vals.iter() { + match self + .vals + .binary_search_by(|(k2, _, _)| k2.cmp(&k)) + { + Ok(i) => { + let (_, ts1, v1) = &self.vals[i]; + if ts2 > ts1 { + self.vals[i].1 = *ts2; + self.vals[i].2 = v2.clone(); + } else if ts1 == ts2 { + self.vals[i].2.merge(&v2); + } + } + Err(i) => { + self.vals.insert(i, (k.clone(), *ts2, v2.clone())); + } + } + } + } +} diff --git a/src/table/lib.rs b/src/table/lib.rs index 7684fe9d..e2bf1f46 100644 --- a/src/table/lib.rs +++ b/src/table/lib.rs @@ -5,6 +5,7 @@ extern crate log; pub mod schema; pub mod util; +pub mod crdt; pub mod table; pub mod table_fullcopy; -- cgit v1.2.3 From f8a04852a2cda84d4fd12ec19e90bbd8224bf4e9 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 20 Nov 2020 23:01:12 +0100 Subject: Convert bucket table to better CRDT representation --- src/table/crdt.rs | 73 ++++++++++++++++++++++------------------------------- src/table/lib.rs | 4 +-- src/table/schema.rs | 4 --- 3 files changed, 32 insertions(+), 49 deletions(-) (limited to 'src/table') diff --git a/src/table/crdt.rs b/src/table/crdt.rs index 8f5e4d71..7c888e3a 100644 --- a/src/table/crdt.rs +++ b/src/table/crdt.rs @@ -7,7 +7,9 @@ pub trait CRDT { } impl CRDT for T -where T: Ord + Clone { +where + T: Ord + Clone, +{ fn merge(&mut self, other: &Self) { if other > self { *self = other.clone(); @@ -18,14 +20,14 @@ where T: Ord + Clone { // ---- LWW Register ---- #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] -pub struct LWW -{ +pub struct LWW { ts: u64, v: T, } impl LWW -where T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + Ord +where + T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + CRDT, { pub fn new(value: T) -> Self { Self { @@ -34,10 +36,7 @@ where T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + Part } } pub fn migrate_from_raw(ts: u64, value: T) -> Self { - Self { - ts, - v: value, - } + Self { ts, v: value } } pub fn update(&mut self, new_value: T) { self.ts = std::cmp::max(self.ts + 1, now_msec()); @@ -46,10 +45,14 @@ where T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + Part pub fn get(&self) -> &T { &self.v } + pub fn get_mut(&mut self) -> &mut T { + &mut self.v + } } impl CRDT for LWW -where T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + CRDT +where + T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + CRDT, { fn merge(&mut self, other: &Self) { if other.ts > self.ts { @@ -61,7 +64,6 @@ where T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + Part } } - // ---- Boolean (true as absorbing state) ---- #[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)] @@ -85,61 +87,48 @@ impl CRDT for Bool { } } - // ---- LWW Map ---- #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] -pub struct LWWMap -{ +pub struct LWWMap { vals: Vec<(K, u64, V)>, } impl LWWMap -where K: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + Ord, - V: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + Ord, +where + K: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + Ord, + V: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + Ord, { pub fn new() -> Self { - Self{ - vals: vec![], - } + Self { vals: vec![] } } pub fn migrate_from_raw_item(k: K, ts: u64, v: V) -> Self { - Self{ + Self { vals: vec![(k, ts, v)], } } pub fn take_and_clear(&mut self) -> Self { let vals = std::mem::replace(&mut self.vals, vec![]); - Self{vals} + Self { vals } } pub fn clear(&mut self) { self.vals.clear(); } pub fn update_mutator(&self, k: K, new_v: V) -> Self { - let new_vals = match self - .vals - .binary_search_by(|(k2, _, _)| k2.cmp(&k)) - { + let new_vals = match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) { Ok(i) => { let (_, old_ts, _) = self.vals[i]; - let new_ts = std::cmp::max(old_ts+1, now_msec()); + let new_ts = std::cmp::max(old_ts + 1, now_msec()); vec![(k, new_ts, new_v)] } - Err(_) => { - vec![(k, now_msec(), new_v)] - } + Err(_) => vec![(k, now_msec(), new_v)], }; - Self{ - vals: new_vals, - } + Self { vals: new_vals } } pub fn get(&self, k: &K) -> Option<&V> { - match self - .vals - .binary_search_by(|(k2, _, _)| k2.cmp(&k)) - { + match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) { Ok(i) => Some(&self.vals[i].2), - Err(_) => None + Err(_) => None, } } pub fn items(&self) -> &[(K, u64, V)] { @@ -148,17 +137,15 @@ where K: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + Part } impl CRDT for LWWMap -where K: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + Ord, - V: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + CRDT, +where + K: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + Ord, + V: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + CRDT, { fn merge(&mut self, other: &Self) { for (k, ts2, v2) in other.vals.iter() { - match self - .vals - .binary_search_by(|(k2, _, _)| k2.cmp(&k)) - { + match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) { Ok(i) => { - let (_, ts1, v1) = &self.vals[i]; + let (_, ts1, _v1) = &self.vals[i]; if ts2 > ts1 { self.vals[i].1 = *ts2; self.vals[i].2 = v2.clone(); diff --git a/src/table/lib.rs b/src/table/lib.rs index e2bf1f46..704f8f1e 100644 --- a/src/table/lib.rs +++ b/src/table/lib.rs @@ -3,9 +3,9 @@ #[macro_use] extern crate log; +pub mod crdt; pub mod schema; pub mod util; -pub mod crdt; pub mod table; pub mod table_fullcopy; @@ -13,5 +13,5 @@ pub mod table_sharded; pub mod table_sync; pub use schema::*; -pub use util::*; pub use table::*; +pub use util::*; diff --git a/src/table/schema.rs b/src/table/schema.rs index 49cede0a..d2ec9450 100644 --- a/src/table/schema.rs +++ b/src/table/schema.rs @@ -20,7 +20,6 @@ impl PartitionKey for Hash { } } - pub trait SortKey { fn sort_key(&self) -> &[u8]; } @@ -37,7 +36,6 @@ impl SortKey for Hash { } } - pub trait Entry: PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync { @@ -47,7 +45,6 @@ pub trait Entry: fn merge(&mut self, other: &Self); } - #[async_trait] pub trait TableSchema: Send + Sync { type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync; @@ -66,4 +63,3 @@ pub trait TableSchema: Send + Sync { true } } - -- cgit v1.2.3 From 173f0dbac98f7962c75663cf7ee37c700596b40d Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 20 Nov 2020 23:09:32 +0100 Subject: oops --- src/table/crdt.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/table') diff --git a/src/table/crdt.rs b/src/table/crdt.rs index 7c888e3a..708d47f3 100644 --- a/src/table/crdt.rs +++ b/src/table/crdt.rs @@ -97,7 +97,7 @@ pub struct LWWMap { impl LWWMap where K: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + Ord, - V: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + Ord, + V: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + CRDT, { pub fn new() -> Self { Self { vals: vec![] } -- cgit v1.2.3 From 4f7f1d1cb3023af30b0741b04c8b9bcd900f5cc7 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 20 Nov 2020 23:23:55 +0100 Subject: less type bounds --- src/table/crdt.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) (limited to 'src/table') diff --git a/src/table/crdt.rs b/src/table/crdt.rs index 708d47f3..2b903cf0 100644 --- a/src/table/crdt.rs +++ b/src/table/crdt.rs @@ -27,7 +27,7 @@ pub struct LWW { impl LWW where - T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + CRDT, + T: CRDT, { pub fn new(value: T) -> Self { Self { @@ -52,7 +52,7 @@ where impl CRDT for LWW where - T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + CRDT, + T: Clone + CRDT, { fn merge(&mut self, other: &Self) { if other.ts > self.ts { @@ -96,8 +96,8 @@ pub struct LWWMap { impl LWWMap where - K: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + Ord, - V: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + CRDT, + K: Ord, + V: CRDT, { pub fn new() -> Self { Self { vals: vec![] } @@ -138,8 +138,8 @@ where impl CRDT for LWWMap where - K: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + Ord, - V: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + CRDT, + K: Clone + Ord, + V: Clone + CRDT, { fn merge(&mut self, other: &Self) { for (k, ts2, v2) in other.vals.iter() { -- cgit v1.2.3 From f9be964c3f3efd10f1bd9cf752d839f8133edcc1 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 20 Nov 2020 23:33:30 +0100 Subject: Warning when cannot decode entry (data format incompatibilities) --- src/table/table.rs | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) (limited to 'src/table') diff --git a/src/table/table.rs b/src/table/table.rs index 2beac3f4..54a42d34 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -2,6 +2,8 @@ use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; use std::time::Duration; +use log::warn; + use arc_swap::ArcSwapOption; use futures::stream::*; use serde::{Deserialize, Serialize}; @@ -185,7 +187,7 @@ where for resp in resps { if let TableRPC::ReadEntryResponse(value) = resp { if let Some(v_bytes) = value { - let v = Self::decode_entry(v_bytes.as_slice())?; + let v = self.decode_entry(v_bytes.as_slice())?; ret = match ret { None => Some(v), Some(mut x) => { @@ -241,7 +243,7 @@ where for resp in resps { if let TableRPC::Update(entries) = resp { for entry_bytes in entries.iter() { - let entry = Self::decode_entry(entry_bytes.as_slice())?; + let entry = self.decode_entry(entry_bytes.as_slice())?; let entry_key = self.tree_key(entry.partition_key(), entry.sort_key()); match ret.remove(&entry_key) { None => { @@ -363,7 +365,7 @@ where let keep = match filter { None => true, Some(f) => { - let entry = Self::decode_entry(value.as_ref())?; + let entry = self.decode_entry(value.as_ref())?; F::matches_filter(&entry, f) } }; @@ -382,14 +384,14 @@ where let mut epidemic_propagate = vec![]; for update_bytes in entries.iter() { - let update = Self::decode_entry(update_bytes.as_slice())?; + let update = self.decode_entry(update_bytes.as_slice())?; let tree_key = self.tree_key(update.partition_key(), update.sort_key()); let (old_entry, new_entry) = self.store.transaction(|db| { let (old_entry, new_entry) = match db.get(&tree_key)? { Some(prev_bytes) => { - let old_entry = Self::decode_entry(&prev_bytes) + let old_entry = self.decode_entry(&prev_bytes) .map_err(sled::ConflictableTransactionError::Abort)?; let mut new_entry = old_entry.clone(); new_entry.merge(&update); @@ -437,7 +439,7 @@ where break; } if let Some(old_val) = self.store.remove(&key)? { - let old_entry = Self::decode_entry(&old_val)?; + let old_entry = self.decode_entry(&old_val)?; self.instance.updated(Some(old_entry), None).await?; self.system .background @@ -455,12 +457,15 @@ where ret } - fn decode_entry(bytes: &[u8]) -> Result { + fn decode_entry(&self, bytes: &[u8]) -> Result { match rmp_serde::decode::from_read_ref::<_, F::E>(bytes) { Ok(x) => Ok(x), Err(e) => match F::try_migrate(bytes) { Some(x) => Ok(x), - None => Err(e.into()), + None => { + warn!("Unable to decode entry of {}: {}", self.name, e); + Err(e.into()) + } }, } } -- cgit v1.2.3 From a8b3c8fd5898400d64310f61bb1a41dd5fefe5ea Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 20 Nov 2020 23:37:34 +0100 Subject: data hexdump in warning --- src/table/Cargo.toml | 1 + src/table/table.rs | 3 +++ 2 files changed, 4 insertions(+) (limited to 'src/table') diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml index 1963f3da..945763fa 100644 --- a/src/table/Cargo.toml +++ b/src/table/Cargo.toml @@ -21,6 +21,7 @@ rand = "0.7" hex = "0.3" arc-swap = "0.4" log = "0.4" +hexdump = "0.1" sled = "0.31" diff --git a/src/table/table.rs b/src/table/table.rs index 54a42d34..5dfee3c8 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -464,6 +464,9 @@ where Some(x) => Ok(x), None => { warn!("Unable to decode entry of {}: {}", self.name, e); + for line in hexdump::hexdump_iter(bytes) { + debug!("{}", line); + } Err(e.into()) } }, -- cgit v1.2.3