diff options
Diffstat (limited to 'src/table')
-rw-r--r-- | src/table/Cargo.toml | 11 | ||||
-rw-r--r-- | src/table/crdt.rs | 327 | ||||
-rw-r--r-- | src/table/crdt/bool.rs | 34 | ||||
-rw-r--r-- | src/table/crdt/crdt.rs | 73 | ||||
-rw-r--r-- | src/table/crdt/lww.rs | 114 | ||||
-rw-r--r-- | src/table/crdt/lww_map.rs | 145 | ||||
-rw-r--r-- | src/table/crdt/map.rs | 83 | ||||
-rw-r--r-- | src/table/crdt/mod.rs | 22 | ||||
-rw-r--r-- | src/table/data.rs | 254 | ||||
-rw-r--r-- | src/table/gc.rs | 248 | ||||
-rw-r--r-- | src/table/lib.rs | 8 | ||||
-rw-r--r-- | src/table/merkle.rs | 454 | ||||
-rw-r--r-- | src/table/replication/fullcopy.rs | 51 | ||||
-rw-r--r-- | src/table/replication/mod.rs | 6 | ||||
-rw-r--r-- | src/table/replication/parameters.rs | 21 | ||||
-rw-r--r-- | src/table/replication/sharded.rs (renamed from src/table/table_sharded.rs) | 31 | ||||
-rw-r--r-- | src/table/schema.rs | 10 | ||||
-rw-r--r-- | src/table/sync.rs | 614 | ||||
-rw-r--r-- | src/table/table.rs | 229 | ||||
-rw-r--r-- | src/table/table_fullcopy.rs | 59 | ||||
-rw-r--r-- | src/table/table_sync.rs | 891 |
21 files changed, 2191 insertions, 1494 deletions
diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml index 6485a542..f9d98dec 100644 --- a/src/table/Cargo.toml +++ b/src/table/Cargo.toml @@ -16,21 +16,18 @@ path = "lib.rs" garage_util = { version = "0.1.1", path = "../util" } garage_rpc = { version = "0.1.1", path = "../rpc" } -bytes = "0.4" -rand = "0.7" -hex = "0.3" -arc-swap = "0.4" +bytes = "1.0" +rand = "0.8" log = "0.4" hexdump = "0.1" sled = "0.34" -rmp-serde = "0.14.3" +rmp-serde = "0.15" serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } serde_bytes = "0.11" -async-trait = "0.1.30" futures = "0.3" futures-util = "0.3" -tokio = { version = "0.2", default-features = false, features = ["rt-core", "rt-threaded", "io-driver", "net", "tcp", "time", "macros", "sync", "signal", "fs"] } +tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } diff --git a/src/table/crdt.rs b/src/table/crdt.rs deleted file mode 100644 index 4cba10ce..00000000 --- a/src/table/crdt.rs +++ /dev/null @@ -1,327 +0,0 @@ -//! This package provides a simple implementation of conflict-free replicated data types (CRDTs) -//! -//! CRDTs are a type of data structures that do not require coordination. In other words, we can -//! edit them in parallel, we will always find a way to merge it. -//! -//! A general example is a counter. Its initial value is 0. Alice and Bob get a copy of the -//! counter. Alice does +1 on her copy, she reads 1. Bob does +3 on his copy, he reads 3. Now, -//! it is easy to merge their counters, order does not count: we always get 4. -//! -//! Learn more about CRDT [on Wikipedia](https://en.wikipedia.org/wiki/Conflict-free_replicated_data_type) - -use serde::{Deserialize, Serialize}; - -use garage_util::data::*; - -/// Definition of a CRDT - all CRDT Rust types implement this. -/// -/// A CRDT is defined as a merge operator that respects a certain set of axioms. 
-/// -/// In particular, the merge operator must be commutative, associative, -/// idempotent, and monotonic. -/// In other words, if `a`, `b` and `c` are CRDTs, and `⊔` denotes the merge operator, -/// the following axioms must apply: -/// -/// ```text -/// a ⊔ b = b ⊔ a (commutativity) -/// (a ⊔ b) ⊔ c = a ⊔ (b ⊔ c) (associativity) -/// (a ⊔ b) ⊔ b = a ⊔ b (idempotence) -/// ``` -/// -/// Moreover, the relationship `≥` defined by `a ≥ b ⇔ ∃c. a = b ⊔ c` must be a partial order. -/// This implies a few properties such as: if `a ⊔ b ≠ a`, then there is no `c` such that `(a ⊔ b) ⊔ c = a`, -/// as this would imply a cycle in the partial order. -pub trait CRDT { - /// Merge the two datastructures according to the CRDT rules. - /// `self` is modified to contain the merged CRDT value. `other` is not modified. - /// - /// # Arguments - /// - /// * `other` - the other CRDT we wish to merge with - fn merge(&mut self, other: &Self); -} - -/// All types that implement `Ord` (a total order) also implement a trivial CRDT -/// defined by the merge rule: `a ⊔ b = max(a, b)`. -impl<T> CRDT for T -where - T: Ord + Clone, -{ - fn merge(&mut self, other: &Self) { - if other > self { - *self = other.clone(); - } - } -} - -// ---- LWW Register ---- - -/// Last Write Win (LWW) -/// -/// An LWW CRDT associates a timestamp with a value, in order to implement a -/// time-based reconciliation rule: the most recent write wins. -/// For completeness, the LWW reconciliation rule must also be defined for two LWW CRDTs -/// with the same timestamp but different values. -/// -/// In our case, we add the constraint that the value that is wrapped inside the LWW CRDT must -/// itself be a CRDT: in the case when the timestamp does not allow us to decide on which value to -/// keep, the merge rule of the inner CRDT is applied on the wrapped values. (Note that all types -/// that implement the `Ord` trait get a default CRDT implemetnation that keeps the maximum value. 
-/// This enables us to use LWW directly with primitive data types such as numbers or strings. It is -/// generally desirable in this case to never explicitly produce LWW values with the same timestamp -/// but different inner values, as the rule to keep the maximum value isn't generally the desired -/// semantics.) -/// -/// As multiple computers clocks are always desynchronized, -/// when operations are close enough, it is equivalent to -/// take one copy and drop the other one. -/// -/// Given that clocks are not too desynchronized, this assumption -/// is enough for most cases, as there is few chance that two humans -/// coordonate themself faster than the time difference between two NTP servers. -/// -/// As a more concret example, let's suppose you want to upload a file -/// with the same key (path) in the same bucket at the very same time. -/// For each request, the file will be timestamped by the receiving server -/// and may differ from what you observed with your atomic clock! -/// -/// This scheme is used by AWS S3 or Soundcloud and often without knowing -/// in entreprise when reconciliating databases with ad-hoc scripts. -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] -pub struct LWW<T> { - ts: u64, - v: T, -} - -impl<T> LWW<T> -where - T: CRDT, -{ - /// Creates a new CRDT - /// - /// CRDT's internal timestamp is set with current node's clock. - pub fn new(value: T) -> Self { - Self { - ts: now_msec(), - v: value, - } - } - - /// Build a new CRDT from a previous non-compatible one - /// - /// Compared to new, the CRDT's timestamp is not set to now - /// but must be set to the previous, non-compatible, CRDT's timestamp. - pub fn migrate_from_raw(ts: u64, value: T) -> Self { - Self { ts, v: value } - } - - /// Update the LWW CRDT while keeping some causal ordering. 
- /// - /// The timestamp of the LWW CRDT is updated to be the current node's clock - /// at time of update, or the previous timestamp + 1 if that's bigger, - /// so that the new timestamp is always strictly larger than the previous one. - /// This ensures that merging the update with the old value will result in keeping - /// the updated value. - pub fn update(&mut self, new_value: T) { - self.ts = std::cmp::max(self.ts + 1, now_msec()); - self.v = new_value; - } - - /// Get the CRDT value - pub fn get(&self) -> &T { - &self.v - } - - /// Get a mutable reference to the CRDT's value - /// - /// This is usefull to mutate the inside value without changing the LWW timestamp. - /// When such mutation is done, the merge between two LWW values is done using the inner - /// CRDT's merge operation. This is usefull in the case where the inner CRDT is a large - /// data type, such as a map, and we only want to change a single item in the map. - /// To do this, we can produce a "CRDT delta", i.e. a LWW that contains only the modification. - /// This delta consists in a LWW with the same timestamp, and the map - /// inside only contains the updated value. - /// The advantage of such a delta is that it is much smaller than the whole map. - /// - /// Avoid using this if the inner data type is a primitive type such as a number or a string, - /// as you will then rely on the merge function defined on `Ord` types by keeping the maximum - /// of both values. 
- pub fn get_mut(&mut self) -> &mut T { - &mut self.v - } -} - -impl<T> CRDT for LWW<T> -where - T: Clone + CRDT, -{ - fn merge(&mut self, other: &Self) { - if other.ts > self.ts { - self.ts = other.ts; - self.v = other.v.clone(); - } else if other.ts == self.ts { - self.v.merge(&other.v); - } - } -} - -/// Boolean, where `true` is an absorbing state -#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)] -pub struct Bool(bool); - -impl Bool { - /// Create a new boolean with the specified value - pub fn new(b: bool) -> Self { - Self(b) - } - /// Set the boolean to true - pub fn set(&mut self) { - self.0 = true; - } - /// Get the boolean value - pub fn get(&self) -> bool { - self.0 - } -} - -impl CRDT for Bool { - fn merge(&mut self, other: &Self) { - self.0 = self.0 || other.0; - } -} - -/// Last Write Win Map -/// -/// This types defines a CRDT for a map from keys to values. -/// The values have an associated timestamp, such that the last written value -/// takes precedence over previous ones. As for the simpler `LWW` type, the value -/// type `V` is also required to implement the CRDT trait. -/// We do not encourage mutating the values associated with a given key -/// without updating the timestamp, in fact at the moment we do not provide a `.get_mut()` -/// method that would allow that. -/// -/// Internally, the map is stored as a vector of keys and values, sorted by ascending key order. -/// This is why the key type `K` must implement `Ord` (and also to ensure a unique serialization, -/// such that two values can be compared for equality based on their hashes). As a consequence, -/// insertions take `O(n)` time. This means that LWWMap should be used for reasonably small maps. -/// However, note that even if we were using a more efficient data structure such as a `BTreeMap`, -/// the serialization cost `O(n)` would still have to be paid at each modification, so we are -/// actually not losing anything here. 
-#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] -pub struct LWWMap<K, V> { - vals: Vec<(K, u64, V)>, -} - -impl<K, V> LWWMap<K, V> -where - K: Ord, - V: CRDT, -{ - /// Create a new empty map CRDT - pub fn new() -> Self { - Self { vals: vec![] } - } - /// Used to migrate from a map defined in an incompatible format. This produces - /// a map that contains a single item with the specified timestamp (copied from - /// the incompatible format). Do this as many times as you have items to migrate, - /// and put them all together using the CRDT merge operator. - pub fn migrate_from_raw_item(k: K, ts: u64, v: V) -> Self { - Self { - vals: vec![(k, ts, v)], - } - } - /// Returns a map that contains a single mapping from the specified key to the specified value. - /// This map is a mutator, or a delta-CRDT, such that when it is merged with the original map, - /// the previous value will be replaced with the one specified here. - /// The timestamp in the provided mutator is set to the maximum of the current system's clock - /// and 1 + the previous value's timestamp (if there is one), so that the new value will always - /// take precedence (LWW rule). - /// - /// Typically, to update the value associated to a key in the map, you would do the following: - /// - /// ```ignore - /// let my_update = my_crdt.update_mutator(key_to_modify, new_value); - /// my_crdt.merge(&my_update); - /// ``` - /// - /// However extracting the mutator on its own and only sending that on the network is very - /// interesting as it is much smaller than the whole map. - pub fn update_mutator(&self, k: K, new_v: V) -> Self { - let new_vals = match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) { - Ok(i) => { - let (_, old_ts, _) = self.vals[i]; - let new_ts = std::cmp::max(old_ts + 1, now_msec()); - vec![(k, new_ts, new_v)] - } - Err(_) => vec![(k, now_msec(), new_v)], - }; - Self { vals: new_vals } - } - /// Takes all of the values of the map and returns them. 
The current map is reset to the - /// empty map. This is very usefull to produce in-place a new map that contains only a delta - /// that modifies a certain value: - /// - /// ```ignore - /// let mut a = get_my_crdt_value(); - /// let old_a = a.take_and_clear(); - /// a.merge(&old_a.update_mutator(key_to_modify, new_value)); - /// put_my_crdt_value(a); - /// ``` - /// - /// Of course in this simple example we could have written simply - /// `pyt_my_crdt_value(a.update_mutator(key_to_modify, new_value))`, - /// but in the case where the map is a field in a struct for instance (as is always the case), - /// this becomes very handy: - /// - /// ```ignore - /// let mut a = get_my_crdt_value(); - /// let old_a_map = a.map_field.take_and_clear(); - /// a.map_field.merge(&old_a_map.update_mutator(key_to_modify, new_value)); - /// put_my_crdt_value(a); - /// ``` - pub fn take_and_clear(&mut self) -> Self { - let vals = std::mem::replace(&mut self.vals, vec![]); - Self { vals } - } - /// Removes all values from the map - pub fn clear(&mut self) { - self.vals.clear(); - } - /// Get a reference to the value assigned to a key - pub fn get(&self, k: &K) -> Option<&V> { - match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) { - Ok(i) => Some(&self.vals[i].2), - Err(_) => None, - } - } - /// Gets a reference to all of the items, as a slice. Usefull to iterate on all map values. - /// In most case you will want to ignore the timestamp (second item of the tuple). - pub fn items(&self) -> &[(K, u64, V)] { - &self.vals[..] 
- } -} - -impl<K, V> CRDT for LWWMap<K, V> -where - K: Clone + Ord, - V: Clone + CRDT, -{ - fn merge(&mut self, other: &Self) { - for (k, ts2, v2) in other.vals.iter() { - match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) { - Ok(i) => { - let (_, ts1, _v1) = &self.vals[i]; - if ts2 > ts1 { - self.vals[i].1 = *ts2; - self.vals[i].2 = v2.clone(); - } else if ts1 == ts2 { - self.vals[i].2.merge(&v2); - } - } - Err(i) => { - self.vals.insert(i, (k.clone(), *ts2, v2.clone())); - } - } - } - } -} diff --git a/src/table/crdt/bool.rs b/src/table/crdt/bool.rs new file mode 100644 index 00000000..1989c92e --- /dev/null +++ b/src/table/crdt/bool.rs @@ -0,0 +1,34 @@ +use serde::{Deserialize, Serialize}; + +use crate::crdt::crdt::*; + +/// Boolean, where `true` is an absorbing state +#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)] +pub struct Bool(bool); + +impl Bool { + /// Create a new boolean with the specified value + pub fn new(b: bool) -> Self { + Self(b) + } + /// Set the boolean to true + pub fn set(&mut self) { + self.0 = true; + } + /// Get the boolean value + pub fn get(&self) -> bool { + self.0 + } +} + +impl From<bool> for Bool { + fn from(b: bool) -> Bool { + Bool::new(b) + } +} + +impl CRDT for Bool { + fn merge(&mut self, other: &Self) { + self.0 = self.0 || other.0; + } +} diff --git a/src/table/crdt/crdt.rs b/src/table/crdt/crdt.rs new file mode 100644 index 00000000..636b6df6 --- /dev/null +++ b/src/table/crdt/crdt.rs @@ -0,0 +1,73 @@ +use garage_util::data::*; + +/// Definition of a CRDT - all CRDT Rust types implement this. +/// +/// A CRDT is defined as a merge operator that respects a certain set of axioms. +/// +/// In particular, the merge operator must be commutative, associative, +/// idempotent, and monotonic. 
+/// In other words, if `a`, `b` and `c` are CRDTs, and `⊔` denotes the merge operator, +/// the following axioms must apply: +/// +/// ```text +/// a ⊔ b = b ⊔ a (commutativity) +/// (a ⊔ b) ⊔ c = a ⊔ (b ⊔ c) (associativity) +/// (a ⊔ b) ⊔ b = a ⊔ b (idempotence) +/// ``` +/// +/// Moreover, the relationship `≥` defined by `a ≥ b ⇔ ∃c. a = b ⊔ c` must be a partial order. +/// This implies a few properties such as: if `a ⊔ b ≠ a`, then there is no `c` such that `(a ⊔ b) ⊔ c = a`, +/// as this would imply a cycle in the partial order. +pub trait CRDT { + /// Merge the two datastructures according to the CRDT rules. + /// `self` is modified to contain the merged CRDT value. `other` is not modified. + /// + /// # Arguments + /// + /// * `other` - the other CRDT we wish to merge with + fn merge(&mut self, other: &Self); +} + +/// All types that implement `Ord` (a total order) can also implement a trivial CRDT +/// defined by the merge rule: `a ⊔ b = max(a, b)`. Implement this trait for your type +/// to enable this behavior. +pub trait AutoCRDT: Ord + Clone + std::fmt::Debug { + /// WARN_IF_DIFFERENT: emit a warning when values differ. Set this to true if + /// different values in your application should never happen. Set this to false + /// if you are actually relying on the semantics of `a ⊔ b = max(a, b)`. 
+ const WARN_IF_DIFFERENT: bool; +} + +impl<T> CRDT for T +where + T: AutoCRDT, +{ + fn merge(&mut self, other: &Self) { + if Self::WARN_IF_DIFFERENT && self != other { + warn!( + "Different CRDT values should be the same (logic error!): {:?} vs {:?}", + self, other + ); + if other > self { + *self = other.clone(); + } + warn!("Making an arbitrary choice: {:?}", self); + } else { + if other > self { + *self = other.clone(); + } + } + } +} + +impl AutoCRDT for String { + const WARN_IF_DIFFERENT: bool = true; +} + +impl AutoCRDT for bool { + const WARN_IF_DIFFERENT: bool = true; +} + +impl AutoCRDT for FixedBytes32 { + const WARN_IF_DIFFERENT: bool = true; +} diff --git a/src/table/crdt/lww.rs b/src/table/crdt/lww.rs new file mode 100644 index 00000000..25ecdb07 --- /dev/null +++ b/src/table/crdt/lww.rs @@ -0,0 +1,114 @@ +use serde::{Deserialize, Serialize}; + +use garage_util::time::now_msec; + +use crate::crdt::crdt::*; + +/// Last Write Win (LWW) +/// +/// An LWW CRDT associates a timestamp with a value, in order to implement a +/// time-based reconciliation rule: the most recent write wins. +/// For completeness, the LWW reconciliation rule must also be defined for two LWW CRDTs +/// with the same timestamp but different values. +/// +/// In our case, we add the constraint that the value that is wrapped inside the LWW CRDT must +/// itself be a CRDT: in the case when the timestamp does not allow us to decide on which value to +/// keep, the merge rule of the inner CRDT is applied on the wrapped values. (Note that all types +/// that implement the `Ord` trait get a default CRDT implemetnation that keeps the maximum value. +/// This enables us to use LWW directly with primitive data types such as numbers or strings. It is +/// generally desirable in this case to never explicitly produce LWW values with the same timestamp +/// but different inner values, as the rule to keep the maximum value isn't generally the desired +/// semantics.) 
+/// +/// As multiple computers clocks are always desynchronized, +/// when operations are close enough, it is equivalent to +/// take one copy and drop the other one. +/// +/// Given that clocks are not too desynchronized, this assumption +/// is enough for most cases, as there is few chance that two humans +/// coordonate themself faster than the time difference between two NTP servers. +/// +/// As a more concret example, let's suppose you want to upload a file +/// with the same key (path) in the same bucket at the very same time. +/// For each request, the file will be timestamped by the receiving server +/// and may differ from what you observed with your atomic clock! +/// +/// This scheme is used by AWS S3 or Soundcloud and often without knowing +/// in entreprise when reconciliating databases with ad-hoc scripts. +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +pub struct LWW<T> { + ts: u64, + v: T, +} + +impl<T> LWW<T> +where + T: CRDT, +{ + /// Creates a new CRDT + /// + /// CRDT's internal timestamp is set with current node's clock. + pub fn new(value: T) -> Self { + Self { + ts: now_msec(), + v: value, + } + } + + /// Build a new CRDT from a previous non-compatible one + /// + /// Compared to new, the CRDT's timestamp is not set to now + /// but must be set to the previous, non-compatible, CRDT's timestamp. + pub fn migrate_from_raw(ts: u64, value: T) -> Self { + Self { ts, v: value } + } + + /// Update the LWW CRDT while keeping some causal ordering. + /// + /// The timestamp of the LWW CRDT is updated to be the current node's clock + /// at time of update, or the previous timestamp + 1 if that's bigger, + /// so that the new timestamp is always strictly larger than the previous one. + /// This ensures that merging the update with the old value will result in keeping + /// the updated value. 
+ pub fn update(&mut self, new_value: T) { + self.ts = std::cmp::max(self.ts + 1, now_msec()); + self.v = new_value; + } + + /// Get the CRDT value + pub fn get(&self) -> &T { + &self.v + } + + /// Get a mutable reference to the CRDT's value + /// + /// This is usefull to mutate the inside value without changing the LWW timestamp. + /// When such mutation is done, the merge between two LWW values is done using the inner + /// CRDT's merge operation. This is usefull in the case where the inner CRDT is a large + /// data type, such as a map, and we only want to change a single item in the map. + /// To do this, we can produce a "CRDT delta", i.e. a LWW that contains only the modification. + /// This delta consists in a LWW with the same timestamp, and the map + /// inside only contains the updated value. + /// The advantage of such a delta is that it is much smaller than the whole map. + /// + /// Avoid using this if the inner data type is a primitive type such as a number or a string, + /// as you will then rely on the merge function defined on `Ord` types by keeping the maximum + /// of both values. + pub fn get_mut(&mut self) -> &mut T { + &mut self.v + } +} + +impl<T> CRDT for LWW<T> +where + T: Clone + CRDT, +{ + fn merge(&mut self, other: &Self) { + if other.ts > self.ts { + self.ts = other.ts; + self.v = other.v.clone(); + } else if other.ts == self.ts { + self.v.merge(&other.v); + } + } +} diff --git a/src/table/crdt/lww_map.rs b/src/table/crdt/lww_map.rs new file mode 100644 index 00000000..7b372191 --- /dev/null +++ b/src/table/crdt/lww_map.rs @@ -0,0 +1,145 @@ +use serde::{Deserialize, Serialize}; + +use garage_util::time::now_msec; + +use crate::crdt::crdt::*; + +/// Last Write Win Map +/// +/// This types defines a CRDT for a map from keys to values. +/// The values have an associated timestamp, such that the last written value +/// takes precedence over previous ones. 
As for the simpler `LWW` type, the value +/// type `V` is also required to implement the CRDT trait. +/// We do not encourage mutating the values associated with a given key +/// without updating the timestamp, in fact at the moment we do not provide a `.get_mut()` +/// method that would allow that. +/// +/// Internally, the map is stored as a vector of keys and values, sorted by ascending key order. +/// This is why the key type `K` must implement `Ord` (and also to ensure a unique serialization, +/// such that two values can be compared for equality based on their hashes). As a consequence, +/// insertions take `O(n)` time. This means that LWWMap should be used for reasonably small maps. +/// However, note that even if we were using a more efficient data structure such as a `BTreeMap`, +/// the serialization cost `O(n)` would still have to be paid at each modification, so we are +/// actually not losing anything here. +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +pub struct LWWMap<K, V> { + vals: Vec<(K, u64, V)>, +} + +impl<K, V> LWWMap<K, V> +where + K: Ord, + V: CRDT, +{ + /// Create a new empty map CRDT + pub fn new() -> Self { + Self { vals: vec![] } + } + /// Used to migrate from a map defined in an incompatible format. This produces + /// a map that contains a single item with the specified timestamp (copied from + /// the incompatible format). Do this as many times as you have items to migrate, + /// and put them all together using the CRDT merge operator. + pub fn migrate_from_raw_item(k: K, ts: u64, v: V) -> Self { + Self { + vals: vec![(k, ts, v)], + } + } + /// Returns a map that contains a single mapping from the specified key to the specified value. + /// This map is a mutator, or a delta-CRDT, such that when it is merged with the original map, + /// the previous value will be replaced with the one specified here. 
+ /// The timestamp in the provided mutator is set to the maximum of the current system's clock + /// and 1 + the previous value's timestamp (if there is one), so that the new value will always + /// take precedence (LWW rule). + /// + /// Typically, to update the value associated to a key in the map, you would do the following: + /// + /// ```ignore + /// let my_update = my_crdt.update_mutator(key_to_modify, new_value); + /// my_crdt.merge(&my_update); + /// ``` + /// + /// However extracting the mutator on its own and only sending that on the network is very + /// interesting as it is much smaller than the whole map. + pub fn update_mutator(&self, k: K, new_v: V) -> Self { + let new_vals = match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) { + Ok(i) => { + let (_, old_ts, _) = self.vals[i]; + let new_ts = std::cmp::max(old_ts + 1, now_msec()); + vec![(k, new_ts, new_v)] + } + Err(_) => vec![(k, now_msec(), new_v)], + }; + Self { vals: new_vals } + } + /// Takes all of the values of the map and returns them. The current map is reset to the + /// empty map. 
This is very usefull to produce in-place a new map that contains only a delta + /// that modifies a certain value: + /// + /// ```ignore + /// let mut a = get_my_crdt_value(); + /// let old_a = a.take_and_clear(); + /// a.merge(&old_a.update_mutator(key_to_modify, new_value)); + /// put_my_crdt_value(a); + /// ``` + /// + /// Of course in this simple example we could have written simply + /// `pyt_my_crdt_value(a.update_mutator(key_to_modify, new_value))`, + /// but in the case where the map is a field in a struct for instance (as is always the case), + /// this becomes very handy: + /// + /// ```ignore + /// let mut a = get_my_crdt_value(); + /// let old_a_map = a.map_field.take_and_clear(); + /// a.map_field.merge(&old_a_map.update_mutator(key_to_modify, new_value)); + /// put_my_crdt_value(a); + /// ``` + pub fn take_and_clear(&mut self) -> Self { + let vals = std::mem::replace(&mut self.vals, vec![]); + Self { vals } + } + /// Removes all values from the map + pub fn clear(&mut self) { + self.vals.clear(); + } + /// Get a reference to the value assigned to a key + pub fn get(&self, k: &K) -> Option<&V> { + match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) { + Ok(i) => Some(&self.vals[i].2), + Err(_) => None, + } + } + /// Gets a reference to all of the items, as a slice. Usefull to iterate on all map values. + /// In most case you will want to ignore the timestamp (second item of the tuple). + pub fn items(&self) -> &[(K, u64, V)] { + &self.vals[..] 
+ } + /// Returns the number of items in the map + pub fn len(&self) -> usize { + self.vals.len() + } +} + +impl<K, V> CRDT for LWWMap<K, V> +where + K: Clone + Ord, + V: Clone + CRDT, +{ + fn merge(&mut self, other: &Self) { + for (k, ts2, v2) in other.vals.iter() { + match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) { + Ok(i) => { + let (_, ts1, _v1) = &self.vals[i]; + if ts2 > ts1 { + self.vals[i].1 = *ts2; + self.vals[i].2 = v2.clone(); + } else if ts1 == ts2 { + self.vals[i].2.merge(&v2); + } + } + Err(i) => { + self.vals.insert(i, (k.clone(), *ts2, v2.clone())); + } + } + } + } +} diff --git a/src/table/crdt/map.rs b/src/table/crdt/map.rs new file mode 100644 index 00000000..1193e6db --- /dev/null +++ b/src/table/crdt/map.rs @@ -0,0 +1,83 @@ +use serde::{Deserialize, Serialize}; + +use crate::crdt::crdt::*; + +/// Simple CRDT Map +/// +/// This types defines a CRDT for a map from keys to values. Values are CRDT types which +/// can have their own updating logic. +/// +/// Internally, the map is stored as a vector of keys and values, sorted by ascending key order. +/// This is why the key type `K` must implement `Ord` (and also to ensure a unique serialization, +/// such that two values can be compared for equality based on their hashes). As a consequence, +/// insertions take `O(n)` time. This means that Map should be used for reasonably small maps. +/// However, note that even if we were using a more efficient data structure such as a `BTreeMap`, +/// the serialization cost `O(n)` would still have to be paid at each modification, so we are +/// actually not losing anything here. +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +pub struct Map<K, V> { + vals: Vec<(K, V)>, +} + +impl<K, V> Map<K, V> +where + K: Clone + Ord, + V: Clone + CRDT, +{ + /// Create a new empty map CRDT + pub fn new() -> Self { + Self { vals: vec![] } + } + + /// Returns a map that contains a single mapping from the specified key to the specified value. 
+ /// This can be used to build a delta-mutator: + /// when merged with another map, the value will be added or CRDT-merged if a previous + /// value already exists. + pub fn put_mutator(k: K, v: V) -> Self { + Self { vals: vec![(k, v)] } + } + + pub fn put(&mut self, k: K, v: V) { + self.merge(&Self::put_mutator(k, v)); + } + + /// Removes all values from the map + pub fn clear(&mut self) { + self.vals.clear(); + } + + /// Get a reference to the value assigned to a key + pub fn get(&self, k: &K) -> Option<&V> { + match self.vals.binary_search_by(|(k2, _)| k2.cmp(&k)) { + Ok(i) => Some(&self.vals[i].1), + Err(_) => None, + } + } + /// Gets a reference to all of the items, as a slice. Usefull to iterate on all map values. + pub fn items(&self) -> &[(K, V)] { + &self.vals[..] + } + /// Returns the number of items in the map + pub fn len(&self) -> usize { + self.vals.len() + } +} + +impl<K, V> CRDT for Map<K, V> +where + K: Clone + Ord, + V: Clone + CRDT, +{ + fn merge(&mut self, other: &Self) { + for (k, v2) in other.vals.iter() { + match self.vals.binary_search_by(|(k2, _)| k2.cmp(&k)) { + Ok(i) => { + self.vals[i].1.merge(&v2); + } + Err(i) => { + self.vals.insert(i, (k.clone(), v2.clone())); + } + } + } + } +} diff --git a/src/table/crdt/mod.rs b/src/table/crdt/mod.rs new file mode 100644 index 00000000..eb75d061 --- /dev/null +++ b/src/table/crdt/mod.rs @@ -0,0 +1,22 @@ +//! This package provides a simple implementation of conflict-free replicated data types (CRDTs) +//! +//! CRDTs are a type of data structures that do not require coordination. In other words, we can +//! edit them in parallel, we will always find a way to merge it. +//! +//! A general example is a counter. Its initial value is 0. Alice and Bob get a copy of the +//! counter. Alice does +1 on her copy, she reads 1. Bob does +3 on his copy, he reads 3. Now, +//! it is easy to merge their counters, order does not count: we always get 4. +//! +//! 
Learn more about CRDT [on Wikipedia](https://en.wikipedia.org/wiki/Conflict-free_replicated_data_type) + +mod bool; +mod crdt; +mod lww; +mod lww_map; +mod map; + +pub use self::bool::*; +pub use crdt::*; +pub use lww::*; +pub use lww_map::*; +pub use map::*; diff --git a/src/table/data.rs b/src/table/data.rs new file mode 100644 index 00000000..e07a21d2 --- /dev/null +++ b/src/table/data.rs @@ -0,0 +1,254 @@ +use core::borrow::Borrow; +use std::sync::Arc; + +use log::warn; +use serde_bytes::ByteBuf; +use sled::Transactional; +use tokio::sync::Notify; + +use garage_util::data::*; +use garage_util::error::*; + +use garage_rpc::membership::System; + +use crate::crdt::CRDT; +use crate::replication::*; +use crate::schema::*; + +pub struct TableData<F: TableSchema, R: TableReplication> { + system: Arc<System>, + + pub name: String, + pub(crate) instance: F, + pub(crate) replication: R, + + pub store: sled::Tree, + + pub(crate) merkle_tree: sled::Tree, + pub(crate) merkle_todo: sled::Tree, + pub(crate) merkle_todo_notify: Notify, + pub(crate) gc_todo: sled::Tree, +} + +impl<F, R> TableData<F, R> +where + F: TableSchema, + R: TableReplication, +{ + pub fn new(system: Arc<System>, name: String, instance: F, replication: R, db: &sled::Db) -> Arc<Self> { + let store = db + .open_tree(&format!("{}:table", name)) + .expect("Unable to open DB tree"); + + let merkle_tree = db + .open_tree(&format!("{}:merkle_tree", name)) + .expect("Unable to open DB Merkle tree tree"); + let merkle_todo = db + .open_tree(&format!("{}:merkle_todo", name)) + .expect("Unable to open DB Merkle TODO tree"); + + let gc_todo = db + .open_tree(&format!("{}:gc_todo", name)) + .expect("Unable to open DB tree"); + + Arc::new(Self { + system, + name, + instance, + replication, + store, + merkle_tree, + merkle_todo, + merkle_todo_notify: Notify::new(), + gc_todo, + }) + } + + // Read functions + + pub fn read_entry(&self, p: &F::P, s: &F::S) -> Result<Option<ByteBuf>, Error> { + let tree_key = 
self.tree_key(p, s); + if let Some(bytes) = self.store.get(&tree_key)? { + Ok(Some(ByteBuf::from(bytes.to_vec()))) + } else { + Ok(None) + } + } + + pub fn read_range( + &self, + p: &F::P, + s: &Option<F::S>, + filter: &Option<F::Filter>, + limit: usize, + ) -> Result<Vec<Arc<ByteBuf>>, Error> { + let partition_hash = p.hash(); + let first_key = match s { + None => partition_hash.to_vec(), + Some(sk) => self.tree_key(p, sk), + }; + let mut ret = vec![]; + for item in self.store.range(first_key..) { + let (key, value) = item?; + if &key[..32] != partition_hash.as_slice() { + break; + } + let keep = match filter { + None => true, + Some(f) => { + let entry = self.decode_entry(value.as_ref())?; + F::matches_filter(&entry, f) + } + }; + if keep { + ret.push(Arc::new(ByteBuf::from(value.as_ref()))); + } + if ret.len() >= limit { + break; + } + } + Ok(ret) + } + + // Mutation functions + // When changing this code, take care of propagating modifications correctly: + // - When an entry is modified or deleted, call the updated() function + // on the table instance + // - When an entry is modified or deleted, add it to the merkle updater's todo list. + // This has to be done atomically with the modification for the merkle updater + // to maintain consistency. The merkle updater must then be notified with todo_notify. + // - When an entry is updated to be a tombstone, add it to the gc_todo tree + + pub(crate) fn update_many<T: Borrow<ByteBuf>>(&self, entries: &[T]) -> Result<(), Error> { + for update_bytes in entries.iter() { + self.update_entry(update_bytes.borrow().as_slice())?; + } + Ok(()) + } + + pub(crate) fn update_entry(&self, update_bytes: &[u8]) -> Result<(), Error> { + let update = self.decode_entry(update_bytes)?; + let tree_key = self.tree_key(update.partition_key(), update.sort_key()); + + let changed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| { + let (old_entry, new_entry) = match store.get(&tree_key)? 
{ + Some(prev_bytes) => { + let old_entry = self + .decode_entry(&prev_bytes) + .map_err(sled::transaction::ConflictableTransactionError::Abort)?; + let mut new_entry = old_entry.clone(); + new_entry.merge(&update); + (Some(old_entry), new_entry) + } + None => (None, update.clone()), + }; + + if Some(&new_entry) != old_entry.as_ref() { + let new_bytes = rmp_to_vec_all_named(&new_entry) + .map_err(Error::RMPEncode) + .map_err(sled::transaction::ConflictableTransactionError::Abort)?; + let new_bytes_hash = blake2sum(&new_bytes[..]); + mkl_todo.insert(tree_key.clone(), new_bytes_hash.as_slice())?; + store.insert(tree_key.clone(), new_bytes)?; + Ok(Some((old_entry, new_entry, new_bytes_hash))) + } else { + Ok(None) + } + })?; + + if let Some((old_entry, new_entry, new_bytes_hash)) = changed { + let is_tombstone = new_entry.is_tombstone(); + self.instance.updated(old_entry, Some(new_entry)); + self.merkle_todo_notify.notify_one(); + if is_tombstone { + // We are only responsible for GC'ing this item if we are the + // "leader" of the partition, i.e. the first node in the + // set of nodes that replicates this partition. + // This avoids GC loops and does not change the termination properties + // of the GC algorithm, as in all cases GC is suspended if + // any node of the partition is unavailable. + let pk_hash = Hash::try_from(&tree_key[..32]).unwrap(); + let nodes = self.replication.write_nodes(&pk_hash); + if nodes.first() == Some(&self.system.id) { + self.gc_todo.insert(&tree_key, new_bytes_hash.as_slice())?; + } + } + } + + Ok(()) + } + + pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> { + let removed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| { + if let Some(cur_v) = store.get(k)? 
{ + if cur_v == v { + store.remove(k)?; + mkl_todo.insert(k, vec![])?; + return Ok(true); + } + } + Ok(false) + })?; + + if removed { + let old_entry = self.decode_entry(v)?; + self.instance.updated(Some(old_entry), None); + self.merkle_todo_notify.notify_one(); + } + Ok(removed) + } + + pub(crate) fn delete_if_equal_hash( + self: &Arc<Self>, + k: &[u8], + vhash: Hash, + ) -> Result<bool, Error> { + let removed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| { + if let Some(cur_v) = store.get(k)? { + if blake2sum(&cur_v[..]) == vhash { + store.remove(k)?; + mkl_todo.insert(k, vec![])?; + return Ok(Some(cur_v)); + } + } + Ok(None) + })?; + + if let Some(old_v) = removed { + let old_entry = self.decode_entry(&old_v[..])?; + self.instance.updated(Some(old_entry), None); + self.merkle_todo_notify.notify_one(); + Ok(true) + } else { + Ok(false) + } + } + + // ---- Utility functions ---- + + pub(crate) fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> { + let mut ret = p.hash().to_vec(); + ret.extend(s.sort_key()); + ret + } + + pub(crate) fn decode_entry(&self, bytes: &[u8]) -> Result<F::E, Error> { + match rmp_serde::decode::from_read_ref::<_, F::E>(bytes) { + Ok(x) => Ok(x), + Err(e) => match F::try_migrate(bytes) { + Some(x) => Ok(x), + None => { + warn!("Unable to decode entry of {}: {}", self.name, e); + for line in hexdump::hexdump_iter(bytes) { + debug!("{}", line); + } + Err(e.into()) + } + }, + } + } + + pub fn gc_todo_len(&self) -> usize { + self.gc_todo.len() + } +} diff --git a/src/table/gc.rs b/src/table/gc.rs new file mode 100644 index 00000000..a37c052f --- /dev/null +++ b/src/table/gc.rs @@ -0,0 +1,248 @@ +use std::collections::HashMap; +use std::sync::Arc; +use std::time::Duration; + +use serde::{Deserialize, Serialize}; +use serde_bytes::ByteBuf; + +use futures::future::join_all; +use futures::select; +use futures_util::future::*; +use tokio::sync::watch; + +use garage_util::data::*; +use garage_util::error::Error; + +use 
garage_rpc::membership::System; +use garage_rpc::rpc_client::*; +use garage_rpc::rpc_server::*; + +use crate::data::*; +use crate::replication::*; +use crate::schema::*; + +const TABLE_GC_BATCH_SIZE: usize = 1024; +const TABLE_GC_RPC_TIMEOUT: Duration = Duration::from_secs(30); + +pub struct TableGC<F: TableSchema, R: TableReplication> { + system: Arc<System>, + data: Arc<TableData<F, R>>, + + rpc_client: Arc<RpcClient<GcRPC>>, +} + +#[derive(Serialize, Deserialize)] +enum GcRPC { + Update(Vec<ByteBuf>), + DeleteIfEqualHash(Vec<(ByteBuf, Hash)>), + Ok, +} + +impl RpcMessage for GcRPC {} + +impl<F, R> TableGC<F, R> +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + pub(crate) fn launch( + system: Arc<System>, + data: Arc<TableData<F, R>>, + rpc_server: &mut RpcServer, + ) -> Arc<Self> { + let rpc_path = format!("table_{}/gc", data.name); + let rpc_client = system.rpc_client::<GcRPC>(&rpc_path); + + let gc = Arc::new(Self { + system: system.clone(), + data: data.clone(), + rpc_client, + }); + + gc.register_handler(rpc_server, rpc_path); + + let gc1 = gc.clone(); + system.background.spawn_worker( + format!("GC loop for {}", data.name), + move |must_exit: watch::Receiver<bool>| gc1.gc_loop(must_exit), + ); + + gc + } + + async fn gc_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) { + while !*must_exit.borrow() { + match self.gc_loop_iter().await { + Ok(true) => { + // Stuff was done, loop immediately + continue; + } + Ok(false) => { + // Nothing was done, sleep for some time (below) + } + Err(e) => { + warn!("({}) Error doing GC: {}", self.data.name, e); + } + } + select!
{ + _ = tokio::time::sleep(Duration::from_secs(10)).fuse() => (), + _ = must_exit.changed().fuse() => (), + } + } + } + + async fn gc_loop_iter(&self) -> Result<bool, Error> { + let mut entries = vec![]; + let mut excluded = vec![]; + + for item in self.data.gc_todo.iter() { + let (k, vhash) = item?; + + let vhash = Hash::try_from(&vhash[..]).unwrap(); + + let v_opt = self + .data + .store + .get(&k[..])? + .filter(|v| blake2sum(&v[..]) == vhash); + + if let Some(v) = v_opt { + entries.push((ByteBuf::from(k.to_vec()), vhash, ByteBuf::from(v.to_vec()))); + if entries.len() >= TABLE_GC_BATCH_SIZE { + break; + } + } else { + excluded.push((k, vhash)); + } + } + + for (k, vhash) in excluded { + self.todo_remove_if_equal(&k[..], vhash)?; + } + + if entries.len() == 0 { + // Nothing to do in this iteration + return Ok(false); + } + + debug!("({}) GC: doing {} items", self.data.name, entries.len()); + + let mut partitions = HashMap::new(); + for (k, vhash, v) in entries { + let pkh = Hash::try_from(&k[..32]).unwrap(); + let mut nodes = self.data.replication.write_nodes(&pkh); + nodes.retain(|x| *x != self.system.id); + nodes.sort(); + + if !partitions.contains_key(&nodes) { + partitions.insert(nodes.clone(), vec![]); + } + partitions.get_mut(&nodes).unwrap().push((k, vhash, v)); + } + + let resps = join_all( + partitions + .into_iter() + .map(|(nodes, items)| self.try_send_and_delete(nodes, items)), + ) + .await; + + let mut errs = vec![]; + for resp in resps { + if let Err(e) = resp { + errs.push(e); + } + } + + if errs.is_empty() { + Ok(true) + } else { + Err(Error::Message(errs.into_iter().map(|x| format!("{}", x)).collect::<Vec<_>>().join(", "))) + } + } + + async fn try_send_and_delete( + &self, + nodes: Vec<UUID>, + items: Vec<(ByteBuf, Hash, ByteBuf)>, + ) -> Result<(), Error> { + let n_items = items.len(); + + let mut updates = vec![]; + let mut deletes = vec![]; + for (k, vhash, v) in items { + updates.push(v); + deletes.push((k, vhash)); + } + + self.rpc_client 
+ .try_call_many( + &nodes[..], + GcRPC::Update(updates), + RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT), + ) + .await?; + + info!( + "({}) GC: {} items successfully pushed, will try to delete.", + self.data.name, n_items + ); + + self.rpc_client + .try_call_many( + &nodes[..], + GcRPC::DeleteIfEqualHash(deletes.clone()), + RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT), + ) + .await?; + + for (k, vhash) in deletes { + self.data.delete_if_equal_hash(&k[..], vhash)?; + self.todo_remove_if_equal(&k[..], vhash)?; + } + + Ok(()) + } + + fn todo_remove_if_equal(&self, key: &[u8], vhash: Hash) -> Result<(), Error> { + let _ = self + .data + .gc_todo + .compare_and_swap::<_, _, Vec<u8>>(key, Some(vhash), None)?; + Ok(()) + } + + // ---- RPC HANDLER ---- + + fn register_handler(self: &Arc<Self>, rpc_server: &mut RpcServer, path: String) { + let self2 = self.clone(); + rpc_server.add_handler::<GcRPC, _, _>(path, move |msg, _addr| { + let self2 = self2.clone(); + async move { self2.handle_rpc(&msg).await } + }); + + let self2 = self.clone(); + self.rpc_client + .set_local_handler(self.system.id, move |msg| { + let self2 = self2.clone(); + async move { self2.handle_rpc(&msg).await } + }); + } + + async fn handle_rpc(self: &Arc<Self>, message: &GcRPC) -> Result<GcRPC, Error> { + match message { + GcRPC::Update(items) => { + self.data.update_many(items)?; + Ok(GcRPC::Ok) + } + GcRPC::DeleteIfEqualHash(items) => { + for (key, vhash) in items.iter() { + self.data.delete_if_equal_hash(&key[..], *vhash)?; + self.todo_remove_if_equal(&key[..], *vhash)?; + } + Ok(GcRPC::Ok) + } + _ => Err(Error::Message(format!("Unexpected GC RPC"))), + } + } +} diff --git a/src/table/lib.rs b/src/table/lib.rs index 704f8f1e..3b73163b 100644 --- a/src/table/lib.rs +++ b/src/table/lib.rs @@ -7,10 +7,12 @@ pub mod crdt; pub mod schema; pub mod util; +pub mod data; +pub mod gc; +pub mod merkle; +pub mod replication; +pub mod sync; pub mod 
table; -pub mod table_fullcopy; -pub mod table_sharded; -pub mod table_sync; pub use schema::*; pub use table::*; diff --git a/src/table/merkle.rs b/src/table/merkle.rs new file mode 100644 index 00000000..3001786f --- /dev/null +++ b/src/table/merkle.rs @@ -0,0 +1,454 @@ +use std::sync::Arc; +use std::time::Duration; + +use futures::select; +use futures_util::future::*; +use log::{debug, warn}; +use serde::{Deserialize, Serialize}; +use sled::transaction::{ + ConflictableTransactionError, ConflictableTransactionResult, TransactionalTree, +}; +use tokio::sync::watch; + +use garage_util::background::BackgroundRunner; +use garage_util::data::*; +use garage_util::error::Error; + +use garage_rpc::ring::*; + +use crate::data::*; +use crate::replication::*; +use crate::schema::*; + +// This module partitions the data into 2**16 partitions, based on the top +// 16 bits (two bytes) of the hash of each item's partition key. +// It builds one Merkle tree for each of these 2**16 partitions. + +pub struct MerkleUpdater<F: TableSchema, R: TableReplication> { + data: Arc<TableData<F, R>>, + + // Content of the todo tree: items where + // - key = the key of an item in the main table, i.e. hash(partition_key)+sort_key + // - value = the hash of the full serialized item, if present, + // or an empty vec if item is absent (deleted) + // Fields in data: + // pub(crate) merkle_todo: sled::Tree, + // pub(crate) merkle_todo_notify: Notify, + + // Content of the merkle tree: items where + // - key = .encode() of a MerkleNodeKey + // - value = serialization of a MerkleNode, assumed to be MerkleNode::Empty if not found + // Field in data: + // pub(crate) merkle_tree: sled::Tree, + empty_node_hash: Hash, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct MerkleNodeKey { + // partition number + pub partition: Partition, + + // prefix: a prefix for the hash of full keys, i.e.
hash(hash(partition_key)+sort_key) + #[serde(with = "serde_bytes")] + pub prefix: Vec<u8>, +} + +#[derive(PartialEq, Eq, Debug, Serialize, Deserialize)] +pub enum MerkleNode { + // The empty Merkle node + Empty, + + // An intermediate Merkle tree node for a prefix + // Contains the hashes of the 256 possible next prefixes + Intermediate(Vec<(u8, Hash)>), + + // A final node for an item + // Contains the full key of the item and the hash of the value + Leaf(Vec<u8>, Hash), +} + +impl<F, R> MerkleUpdater<F, R> +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + pub(crate) fn launch(background: &BackgroundRunner, data: Arc<TableData<F, R>>) -> Arc<Self> { + let empty_node_hash = blake2sum(&rmp_to_vec_all_named(&MerkleNode::Empty).unwrap()[..]); + + let ret = Arc::new(Self { + data, + empty_node_hash, + }); + + let ret2 = ret.clone(); + background.spawn_worker( + format!("Merkle tree updater for {}", ret.data.name), + |must_exit: watch::Receiver<bool>| ret2.updater_loop(must_exit), + ); + + ret + } + + async fn updater_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) { + while !*must_exit.borrow() { + if let Some(x) = self.data.merkle_todo.iter().next() { + match x { + Ok((key, valhash)) => { + if let Err(e) = self.update_item(&key[..], &valhash[..]) { + warn!( + "({}) Error while updating Merkle tree item: {}", + self.data.name, e + ); + } + } + Err(e) => { + warn!( + "({}) Error while iterating on Merkle todo tree: {}", + self.data.name, e + ); + tokio::time::sleep(Duration::from_secs(10)).await; + } + } + } else { + select! 
{ + _ = self.data.merkle_todo_notify.notified().fuse() => (), + _ = must_exit.changed().fuse() => (), + } + } + } + } + + fn update_item(&self, k: &[u8], vhash_by: &[u8]) -> Result<(), Error> { + let khash = blake2sum(k); + + let new_vhash = if vhash_by.len() == 0 { + None + } else { + Some(Hash::try_from(&vhash_by[..]).unwrap()) + }; + + let key = MerkleNodeKey { + partition: self + .data + .replication + .partition_of(&Hash::try_from(&k[0..32]).unwrap()), + prefix: vec![], + }; + self.data + .merkle_tree + .transaction(|tx| self.update_item_rec(tx, k, &khash, &key, new_vhash))?; + + let deleted = self + .data + .merkle_todo + .compare_and_swap::<_, _, Vec<u8>>(k, Some(vhash_by), None)? + .is_ok(); + + if !deleted { + debug!( + "({}) Item not deleted from Merkle todo because it changed: {:?}", + self.data.name, k + ); + } + Ok(()) + } + + fn update_item_rec( + &self, + tx: &TransactionalTree, + k: &[u8], + khash: &Hash, + key: &MerkleNodeKey, + new_vhash: Option<Hash>, + ) -> ConflictableTransactionResult<Option<Hash>, Error> { + let i = key.prefix.len(); + + // Read node at current position (defined by the prefix stored in key) + // Calculate an update to apply to this node + // This update is an Option<_>, so that it is None if the update is a no-op + // and we can thus skip recalculating and re-storing everything + let mutate = match self.read_node_txn(tx, &key)? { + MerkleNode::Empty => { + if let Some(vhv) = new_vhash { + Some(MerkleNode::Leaf(k.to_vec(), vhv)) + } else { + // Nothing to do, keep empty node + None + } + } + MerkleNode::Intermediate(mut children) => { + let key2 = key.next_key(khash); + if let Some(subhash) = self.update_item_rec(tx, k, khash, &key2, new_vhash)? 
{ + // Subtree changed, update this node as well + if subhash == self.empty_node_hash { + intermediate_rm_child(&mut children, key2.prefix[i]); + } else { + intermediate_set_child(&mut children, key2.prefix[i], subhash); + } + + if children.len() == 0 { + // should not happen + warn!( + "({}) Replacing intermediate node with empty node, should not happen.", + self.data.name + ); + Some(MerkleNode::Empty) + } else if children.len() == 1 { + // We now have a single node (case when the update deleted one of only two + // children). If that node is a leaf, move it to this level. + let key_sub = key.add_byte(children[0].0); + let subnode = self.read_node_txn(tx, &key_sub)?; + match subnode { + MerkleNode::Empty => { + warn!("({}) Single subnode in tree is empty Merkle node", self.data.name); + Some(MerkleNode::Empty) + } + MerkleNode::Intermediate(_) => { + Some(MerkleNode::Intermediate(children)) + } + x @ MerkleNode::Leaf(_, _) => { + tx.remove(key_sub.encode())?; + Some(x) + } + } + } else { + Some(MerkleNode::Intermediate(children)) + } + } else { + // Subtree not changed, nothing to do + None + } + } + MerkleNode::Leaf(exlf_k, exlf_vhash) => { + if exlf_k == k { + // This leaf is for the same key that the one we are updating + match new_vhash { + Some(vhv) if vhv == exlf_vhash => None, + Some(vhv) => Some(MerkleNode::Leaf(k.to_vec(), vhv)), + None => Some(MerkleNode::Empty), + } + } else { + // This is an only leaf for another key + if new_vhash.is_some() { + // Move that other key to a subnode, create another subnode for our + // insertion and replace current node by an intermediary node + let mut int = vec![]; + + let exlf_khash = blake2sum(&exlf_k[..]); + assert_eq!(khash.as_slice()[..i], exlf_khash.as_slice()[..i]); + + { + let exlf_subkey = key.next_key(&exlf_khash); + let exlf_sub_hash = self.update_item_rec(tx, &exlf_k[..], &exlf_khash, &exlf_subkey, Some(exlf_vhash))?.unwrap(); + intermediate_set_child(&mut int, exlf_subkey.prefix[i], exlf_sub_hash); + 
assert_eq!(int.len(), 1); + } + + { + let key2 = key.next_key(khash); + let subhash = self.update_item_rec(tx, k, khash, &key2, new_vhash)?.unwrap(); + intermediate_set_child(&mut int, key2.prefix[i], subhash); + if exlf_khash.as_slice()[i] == khash.as_slice()[i] { + assert_eq!(int.len(), 1); + } else { + assert_eq!(int.len(), 2); + } + } + Some(MerkleNode::Intermediate(int)) + } else { + // Nothing to do, we don't want to insert this value because it is None, + // and we don't want to change the other value because it's for something + // else + None + } + } + } + }; + + if let Some(new_node) = mutate { + let hash = self.put_node_txn(tx, &key, &new_node)?; + Ok(Some(hash)) + } else { + Ok(None) + } + } + + // Merkle tree node manipulation + + fn read_node_txn( + &self, + tx: &TransactionalTree, + k: &MerkleNodeKey, + ) -> ConflictableTransactionResult<MerkleNode, Error> { + let ent = tx.get(k.encode())?; + MerkleNode::decode_opt(ent).map_err(ConflictableTransactionError::Abort) + } + + fn put_node_txn( + &self, + tx: &TransactionalTree, + k: &MerkleNodeKey, + v: &MerkleNode, + ) -> ConflictableTransactionResult<Hash, Error> { + trace!("Put Merkle node: {:?} => {:?}", k, v); + if *v == MerkleNode::Empty { + tx.remove(k.encode())?; + Ok(self.empty_node_hash) + } else { + let vby = rmp_to_vec_all_named(v) + .map_err(|e| ConflictableTransactionError::Abort(e.into()))?; + let rethash = blake2sum(&vby[..]); + tx.insert(k.encode(), vby)?; + Ok(rethash) + } + } + + // Access a node in the Merkle tree, used by the sync protocol + pub(crate) fn read_node(&self, k: &MerkleNodeKey) -> Result<MerkleNode, Error> { + let ent = self.data.merkle_tree.get(k.encode())?; + MerkleNode::decode_opt(ent) + } + + pub fn merkle_tree_len(&self) -> usize { + self.data.merkle_tree.len() + } + + pub fn todo_len(&self) -> usize { + self.data.merkle_todo.len() + } +} + +impl MerkleNodeKey { + fn encode(&self) -> Vec<u8> { + let mut ret = Vec::with_capacity(2 + self.prefix.len()); + 
ret.extend(&u16::to_be_bytes(self.partition)[..]); + ret.extend(&self.prefix[..]); + ret + } + + pub fn next_key(&self, h: &Hash) -> Self { + assert_eq!(h.as_slice()[0..self.prefix.len()], self.prefix[..]); + let mut s2 = self.clone(); + s2.prefix.push(h.as_slice()[self.prefix.len()]); + s2 + } + + pub fn add_byte(&self, b: u8) -> Self { + let mut s2 = self.clone(); + s2.prefix.push(b); + s2 + } +} + +impl MerkleNode { + fn decode_opt(ent: Option<sled::IVec>) -> Result<Self, Error> { + match ent { + None => Ok(MerkleNode::Empty), + Some(v) => Ok(rmp_serde::decode::from_read_ref::<_, MerkleNode>(&v[..])?), + } + } + + pub fn is_empty(&self) -> bool { + *self == MerkleNode::Empty + } +} + +fn intermediate_set_child(ch: &mut Vec<(u8, Hash)>, pos: u8, v: Hash) { + for i in 0..ch.len() { + if ch[i].0 == pos { + ch[i].1 = v; + return; + } else if ch[i].0 > pos { + ch.insert(i, (pos, v)); + return; + } + } + ch.push((pos, v)); +} + +fn intermediate_rm_child(ch: &mut Vec<(u8, Hash)>, pos: u8) { + for i in 0..ch.len() { + if ch[i].0 == pos { + ch.remove(i); + return; + } + } +} + +#[test] +fn test_intermediate_aux() { + let mut v = vec![]; + + intermediate_set_child(&mut v, 12u8, [12u8; 32].into()); + assert_eq!(v, vec![(12u8, [12u8; 32].into())]); + + intermediate_set_child(&mut v, 42u8, [42u8; 32].into()); + assert_eq!( + v, + vec![(12u8, [12u8; 32].into()), (42u8, [42u8; 32].into())] + ); + + intermediate_set_child(&mut v, 4u8, [4u8; 32].into()); + assert_eq!( + v, + vec![ + (4u8, [4u8; 32].into()), + (12u8, [12u8; 32].into()), + (42u8, [42u8; 32].into()) + ] + ); + + intermediate_set_child(&mut v, 12u8, [8u8; 32].into()); + assert_eq!( + v, + vec![ + (4u8, [4u8; 32].into()), + (12u8, [8u8; 32].into()), + (42u8, [42u8; 32].into()) + ] + ); + + intermediate_set_child(&mut v, 6u8, [6u8; 32].into()); + assert_eq!( + v, + vec![ + (4u8, [4u8; 32].into()), + (6u8, [6u8; 32].into()), + (12u8, [8u8; 32].into()), + (42u8, [42u8; 32].into()) + ] + ); + + intermediate_rm_child(&mut 
v, 42u8); + assert_eq!( + v, + vec![ + (4u8, [4u8; 32].into()), + (6u8, [6u8; 32].into()), + (12u8, [8u8; 32].into()) + ] + ); + + intermediate_rm_child(&mut v, 11u8); + assert_eq!( + v, + vec![ + (4u8, [4u8; 32].into()), + (6u8, [6u8; 32].into()), + (12u8, [8u8; 32].into()) + ] + ); + + intermediate_rm_child(&mut v, 6u8); + assert_eq!(v, vec![(4u8, [4u8; 32].into()), (12u8, [8u8; 32].into())]); + + intermediate_set_child(&mut v, 6u8, [7u8; 32].into()); + assert_eq!( + v, + vec![ + (4u8, [4u8; 32].into()), + (6u8, [7u8; 32].into()), + (12u8, [8u8; 32].into()) + ] + ); +} diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs new file mode 100644 index 00000000..bd658f63 --- /dev/null +++ b/src/table/replication/fullcopy.rs @@ -0,0 +1,51 @@ +use std::sync::Arc; + +use garage_rpc::membership::System; +use garage_rpc::ring::*; +use garage_util::data::*; + +use crate::replication::*; + +#[derive(Clone)] +pub struct TableFullReplication { + pub system: Arc<System>, + pub max_faults: usize, +} + +impl TableReplication for TableFullReplication { + // Full replication schema: all nodes store everything + // Writes are disseminated in an epidemic manner in the network + + // Advantage: do all reads locally, extremely fast + // Drawback: only suitable for reasonably small tables + + fn read_nodes(&self, _hash: &Hash) -> Vec<UUID> { + vec![self.system.id] + } + fn read_quorum(&self) -> usize { + 1 + } + + fn write_nodes(&self, _hash: &Hash) -> Vec<UUID> { + let ring = self.system.ring.borrow(); + ring.config.members.keys().cloned().collect::<Vec<_>>() + } + fn write_quorum(&self) -> usize { + let nmembers = self.system.ring.borrow().config.members.len(); + if nmembers > self.max_faults { + nmembers - self.max_faults + } else { + 1 + } + } + fn max_write_errors(&self) -> usize { + self.max_faults + } + + fn partition_of(&self, _hash: &Hash) -> Partition { + 0u16 + } + fn partitions(&self) -> Vec<(Partition, Hash)> { + vec![(0u16, [0u8;
32].into())] + } +} diff --git a/src/table/replication/mod.rs b/src/table/replication/mod.rs new file mode 100644 index 00000000..d43d7f19 --- /dev/null +++ b/src/table/replication/mod.rs @@ -0,0 +1,6 @@ +mod parameters; + +pub mod fullcopy; +pub mod sharded; + +pub use parameters::*; diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs new file mode 100644 index 00000000..e46bd172 --- /dev/null +++ b/src/table/replication/parameters.rs @@ -0,0 +1,21 @@ +use garage_rpc::ring::*; + +use garage_util::data::*; + +pub trait TableReplication: Send + Sync { + // See examples in sharded.rs and fullcopy.rs + // to understand the various replication methods + + // Which nodes to send reads to + fn read_nodes(&self, hash: &Hash) -> Vec<UUID>; + fn read_quorum(&self) -> usize; + + // Which nodes to send writes to + fn write_nodes(&self, hash: &Hash) -> Vec<UUID>; + fn write_quorum(&self) -> usize; + fn max_write_errors(&self) -> usize; + + // Accessing partitions, for Merkle tree & sync + fn partition_of(&self, hash: &Hash) -> Partition; + fn partitions(&self) -> Vec<(Partition, Hash)>; +} diff --git a/src/table/table_sharded.rs b/src/table/replication/sharded.rs index 098637dd..dce74b03 100644 --- a/src/table/table_sharded.rs +++ b/src/table/replication/sharded.rs @@ -1,11 +1,14 @@ +use std::sync::Arc; + use garage_rpc::membership::System; -use garage_rpc::ring::Ring; +use garage_rpc::ring::*; use garage_util::data::*; -use crate::*; +use crate::replication::*; #[derive(Clone)] pub struct TableShardedReplication { + pub system: Arc<System>, pub replication_factor: usize, pub read_quorum: usize, pub write_quorum: usize, @@ -19,35 +22,29 @@ impl TableReplication for TableShardedReplication { // - reads are done on all of the nodes that replicate the data // - writes as well - fn read_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID> { - let ring = system.ring.borrow().clone(); + fn read_nodes(&self, hash: &Hash) -> Vec<UUID> {
+ let ring = self.system.ring.borrow().clone(); ring.walk_ring(&hash, self.replication_factor) } fn read_quorum(&self) -> usize { self.read_quorum } - fn write_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID> { - let ring = system.ring.borrow().clone(); + fn write_nodes(&self, hash: &Hash) -> Vec<UUID> { + let ring = self.system.ring.borrow(); ring.walk_ring(&hash, self.replication_factor) } - fn write_quorum(&self, _system: &System) -> usize { + fn write_quorum(&self) -> usize { self.write_quorum } fn max_write_errors(&self) -> usize { self.replication_factor - self.write_quorum } - fn replication_nodes(&self, hash: &Hash, ring: &Ring) -> Vec<UUID> { - ring.walk_ring(&hash, self.replication_factor) + fn partition_of(&self, hash: &Hash) -> Partition { + self.system.ring.borrow().partition_of(hash) } - fn split_points(&self, ring: &Ring) -> Vec<Hash> { - let mut ret = vec![]; - - for entry in ring.ring.iter() { - ret.push(entry.location); - } - ret.push([0xFFu8; 32].into()); - ret + fn partitions(&self) -> Vec<(Partition, Hash)> { + self.system.ring.borrow().partitions() } } diff --git a/src/table/schema.rs b/src/table/schema.rs index edd04000..4d754664 100644 --- a/src/table/schema.rs +++ b/src/table/schema.rs @@ -2,13 +2,15 @@ use serde::{Deserialize, Serialize}; use garage_util::data::*; +use crate::crdt::CRDT; + pub trait PartitionKey { fn hash(&self) -> Hash; } impl PartitionKey for String { fn hash(&self) -> Hash { - sha256sum(self.as_bytes()) + blake2sum(self.as_bytes()) } } @@ -35,12 +37,14 @@ impl SortKey for Hash { } pub trait Entry<P: PartitionKey, S: SortKey>: - PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + CRDT + PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync { fn partition_key(&self) -> &P; fn sort_key(&self) -> &S; - fn merge(&mut self, other: &Self); + fn is_tombstone(&self) -> bool { + false + } } pub trait TableSchema: Send + Sync { diff --git a/src/table/sync.rs b/src/table/sync.rs new 
file mode 100644 index 00000000..3130abe8 --- /dev/null +++ b/src/table/sync.rs @@ -0,0 +1,614 @@ +use std::collections::VecDeque; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; + +use futures::select; +use futures_util::future::*; +use futures_util::stream::*; +use rand::Rng; +use serde::{Deserialize, Serialize}; +use serde_bytes::ByteBuf; +use tokio::sync::{mpsc, watch}; + +use garage_util::data::*; +use garage_util::error::Error; + +use garage_rpc::membership::System; +use garage_rpc::ring::*; +use garage_rpc::rpc_client::*; +use garage_rpc::rpc_server::*; + +use crate::data::*; +use crate::merkle::*; +use crate::replication::*; +use crate::*; + +const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(30); + +// Do anti-entropy every 10 minutes +const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60); + +pub struct TableSyncer<F: TableSchema, R: TableReplication> { + system: Arc<System>, + data: Arc<TableData<F, R>>, + merkle: Arc<MerkleUpdater<F, R>>, + + todo: Mutex<SyncTodo>, + rpc_client: Arc<RpcClient<SyncRPC>>, +} + +#[derive(Serialize, Deserialize)] +pub(crate) enum SyncRPC { + RootCkHash(Partition, Hash), + RootCkDifferent(bool), + GetNode(MerkleNodeKey), + Node(MerkleNodeKey, MerkleNode), + Items(Vec<Arc<ByteBuf>>), + Ok, +} + +impl RpcMessage for SyncRPC {} + +struct SyncTodo { + todo: Vec<TodoPartition>, +} + +#[derive(Debug, Clone)] +struct TodoPartition { + partition: Partition, + begin: Hash, + end: Hash, + + // Are we a node that stores this partition or not? 
+ retain: bool, +} + +impl<F, R> TableSyncer<F, R> +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + pub(crate) fn launch( + system: Arc<System>, + data: Arc<TableData<F, R>>, + merkle: Arc<MerkleUpdater<F, R>>, + rpc_server: &mut RpcServer, + ) -> Arc<Self> { + let rpc_path = format!("table_{}/sync", data.name); + let rpc_client = system.rpc_client::<SyncRPC>(&rpc_path); + + let todo = SyncTodo { todo: vec![] }; + + let syncer = Arc::new(Self { + system: system.clone(), + data: data.clone(), + merkle, + todo: Mutex::new(todo), + rpc_client, + }); + + syncer.register_handler(rpc_server, rpc_path); + + let (busy_tx, busy_rx) = mpsc::unbounded_channel(); + + let s1 = syncer.clone(); + system.background.spawn_worker( + format!("table sync watcher for {}", data.name), + move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit, busy_rx), + ); + + let s2 = syncer.clone(); + system.background.spawn_worker( + format!("table syncer for {}", data.name), + move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit, busy_tx), + ); + + let s3 = syncer.clone(); + tokio::spawn(async move { + tokio::time::sleep(Duration::from_secs(20)).await; + s3.add_full_sync(); + }); + + syncer + } + + fn register_handler(self: &Arc<Self>, rpc_server: &mut RpcServer, path: String) { + let self2 = self.clone(); + rpc_server.add_handler::<SyncRPC, _, _>(path, move |msg, _addr| { + let self2 = self2.clone(); + async move { self2.handle_rpc(&msg).await } + }); + + let self2 = self.clone(); + self.rpc_client + .set_local_handler(self.system.id, move |msg| { + let self2 = self2.clone(); + async move { self2.handle_rpc(&msg).await } + }); + } + + async fn watcher_task( + self: Arc<Self>, + mut must_exit: watch::Receiver<bool>, + mut busy_rx: mpsc::UnboundedReceiver<bool>, + ) { + let mut prev_ring: Arc<Ring> = self.system.ring.borrow().clone(); + let mut ring_recv: watch::Receiver<Arc<Ring>> = self.system.ring.clone(); + let mut nothing_to_do_since = 
Some(Instant::now()); + + while !*must_exit.borrow() { + select! { + _ = ring_recv.changed().fuse() => { + let new_ring = ring_recv.borrow(); + if !Arc::ptr_eq(&new_ring, &prev_ring) { + debug!("({}) Ring changed, adding full sync to syncer todo list", self.data.name); + self.add_full_sync(); + prev_ring = new_ring.clone(); + } + } + busy_opt = busy_rx.recv().fuse() => { + if let Some(busy) = busy_opt { + if busy { + nothing_to_do_since = None; + } else { + if nothing_to_do_since.is_none() { + nothing_to_do_since = Some(Instant::now()); + } + } + } + } + _ = must_exit.changed().fuse() => (), + _ = tokio::time::sleep(Duration::from_secs(1)).fuse() => { + if nothing_to_do_since.map(|t| Instant::now() - t >= ANTI_ENTROPY_INTERVAL).unwrap_or(false) { + nothing_to_do_since = None; + debug!("({}) Interval passed, adding full sync to syncer todo list", self.data.name); + self.add_full_sync(); + } + } + } + } + } + + pub fn add_full_sync(&self) { + self.todo + .lock() + .unwrap() + .add_full_sync(&self.data, &self.system); + } + + async fn syncer_task( + self: Arc<Self>, + mut must_exit: watch::Receiver<bool>, + busy_tx: mpsc::UnboundedSender<bool>, + ) { + while !*must_exit.borrow() { + let task = self.todo.lock().unwrap().pop_task(); + if let Some(partition) = task { + busy_tx.send(true).unwrap(); + let res = self + .clone() + .sync_partition(&partition, &mut must_exit) + .await; + if let Err(e) = res { + warn!( + "({}) Error while syncing {:?}: {}", + self.data.name, partition, e + ); + } + } else { + busy_tx.send(false).unwrap(); + tokio::time::sleep(Duration::from_secs(1)).await; + } + } + } + + async fn sync_partition( + self: Arc<Self>, + partition: &TodoPartition, + must_exit: &mut watch::Receiver<bool>, + ) -> Result<(), Error> { + if partition.retain { + let my_id = self.system.id; + + let nodes = self + .data + .replication + .write_nodes(&partition.begin) + .into_iter() + .filter(|node| *node != my_id) + .collect::<Vec<_>>(); + + debug!( + "({}) Syncing {:?} 
with {:?}...", + self.data.name, partition, nodes + ); + let mut sync_futures = nodes + .iter() + .map(|node| { + self.clone() + .do_sync_with(partition.clone(), *node, must_exit.clone()) + }) + .collect::<FuturesUnordered<_>>(); + + let mut n_errors = 0; + while let Some(r) = sync_futures.next().await { + if let Err(e) = r { + n_errors += 1; + warn!("({}) Sync error: {}", self.data.name, e); + } + } + if n_errors > self.data.replication.max_write_errors() { + return Err(Error::Message(format!( + "Sync failed with too many nodes (should have been: {:?}).", + nodes + ))); + } + } else { + self.offload_partition(&partition.begin, &partition.end, must_exit) + .await?; + } + + Ok(()) + } + + // Offload partition: this partition is not something we are storing, + // so send it out to all other nodes that store it and delete items locally. + // We don't bother checking if the remote nodes already have the items, + // we just batch-send everything. Offloading isn't supposed to happen very often. + // If any of the nodes that are supposed to store the items is unable to + // save them, we interrupt the process. 
+ async fn offload_partition( + self: &Arc<Self>, + begin: &Hash, + end: &Hash, + must_exit: &mut watch::Receiver<bool>, + ) -> Result<(), Error> { + let mut counter: usize = 0; + + while !*must_exit.borrow() { + let mut items = Vec::new(); + + for item in self.data.store.range(begin.to_vec()..end.to_vec()) { + let (key, value) = item?; + items.push((key.to_vec(), Arc::new(ByteBuf::from(value.as_ref())))); + + if items.len() >= 1024 { + break; + } + } + + if items.len() > 0 { + let nodes = self + .data + .replication + .write_nodes(&begin) + .into_iter() + .collect::<Vec<_>>(); + if nodes.contains(&self.system.id) { + warn!( + "({}) Interrupting offload as partitions seem to have changed", + self.data.name + ); + break; + } + if nodes.len() < self.data.replication.write_quorum() { + return Err(Error::Message(format!( + "Not offloading as we don't have a quorum of nodes to write to." + ))); + } + + counter += 1; + info!( + "({}) Offloading {} items from {:?}..{:?} ({})", + self.data.name, + items.len(), + begin, + end, + counter + ); + self.offload_items(&items, &nodes[..]).await?; + } else { + break; + } + } + + Ok(()) + } + + async fn offload_items( + self: &Arc<Self>, + items: &Vec<(Vec<u8>, Arc<ByteBuf>)>, + nodes: &[UUID], + ) -> Result<(), Error> { + let values = items.iter().map(|(_k, v)| v.clone()).collect::<Vec<_>>(); + + self.rpc_client + .try_call_many( + &nodes[..], + SyncRPC::Items(values), + RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_SYNC_RPC_TIMEOUT), + ) + .await?; + + // All remote nodes have written those items, now we can delete them locally + let mut not_removed = 0; + for (k, v) in items.iter() { + if !self.data.delete_if_equal(&k[..], &v[..])? 
{ + not_removed += 1; + } + } + + if not_removed > 0 { + debug!("({}) {} items not removed during offload because they changed in between (trying again...)", self.data.name, not_removed); + } + + Ok(()) + } + + // ======= SYNCHRONIZATION PROCEDURE -- DRIVER SIDE ====== + // The driver side is only concerned with sending out the item it has + // and the other side might not have. Receiving items that differ from one + // side to the other will happen when the other side syncs with us, + // which they also do regularly. + + fn get_root_ck(&self, partition: Partition) -> Result<(MerkleNodeKey, MerkleNode), Error> { + let key = MerkleNodeKey { + partition, + prefix: vec![], + }; + let node = self.merkle.read_node(&key)?; + Ok((key, node)) + } + + async fn do_sync_with( + self: Arc<Self>, + partition: TodoPartition, + who: UUID, + must_exit: watch::Receiver<bool>, + ) -> Result<(), Error> { + let (root_ck_key, root_ck) = self.get_root_ck(partition.partition)?; + if root_ck.is_empty() { + debug!( + "({}) Sync {:?} with {:?}: partition is empty.", + self.data.name, partition, who + ); + return Ok(()); + } + let root_ck_hash = hash_of::<MerkleNode>(&root_ck)?; + + // Check if they have the same root checksum + // If so, do nothing. 
+ let root_resp = self + .rpc_client + .call( + who, + SyncRPC::RootCkHash(partition.partition, root_ck_hash), + TABLE_SYNC_RPC_TIMEOUT, + ) + .await?; + + let mut todo = match root_resp { + SyncRPC::RootCkDifferent(false) => { + debug!( + "({}) Sync {:?} with {:?}: no difference", + self.data.name, partition, who + ); + return Ok(()); + } + SyncRPC::RootCkDifferent(true) => VecDeque::from(vec![root_ck_key]), + x => { + return Err(Error::Message(format!( + "Invalid respone to RootCkHash RPC: {}", + debug_serialize(x) + ))); + } + }; + + let mut todo_items = vec![]; + + while !todo.is_empty() && !*must_exit.borrow() { + let key = todo.pop_front().unwrap(); + let node = self.merkle.read_node(&key)?; + + match node { + MerkleNode::Empty => { + // They have items we don't have. + // We don't request those items from them, they will send them. + // We only bother with pushing items that differ + } + MerkleNode::Leaf(ik, ivhash) => { + // Just send that item directly + if let Some(val) = self.data.store.get(&ik[..])? { + if blake2sum(&val[..]) != ivhash { + warn!("({}) Hashes differ between stored value and Merkle tree, key: {:?} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", self.data.name, ik); + } + todo_items.push(val.to_vec()); + } else { + warn!("({}) Item from Merkle tree not found in store: {:?} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", self.data.name, ik); + } + } + MerkleNode::Intermediate(l) => { + // Get Merkle node for this tree position at remote node + // and compare it with local node + let remote_node = match self + .rpc_client + .call(who, SyncRPC::GetNode(key.clone()), TABLE_SYNC_RPC_TIMEOUT) + .await? 
+ { + SyncRPC::Node(_, node) => node, + x => { + return Err(Error::Message(format!( + "Invalid respone to GetNode RPC: {}", + debug_serialize(x) + ))); + } + }; + let int_l2 = match remote_node { + // If they have an intermediate node at this tree position, + // we can compare them to find differences + MerkleNode::Intermediate(l2) => l2, + // Otherwise, treat it as if they have nothing for this subtree, + // which will have the consequence of sending them everything + _ => vec![], + }; + + let join = join_ordered(&l[..], &int_l2[..]); + for (p, v1, v2) in join.into_iter() { + let diff = match (v1, v2) { + (Some(_), None) | (None, Some(_)) => true, + (Some(a), Some(b)) => a != b, + _ => false, + }; + if diff { + todo.push_back(key.add_byte(*p)); + } + } + } + } + + if todo_items.len() >= 256 { + self.send_items(who, std::mem::replace(&mut todo_items, vec![])) + .await?; + } + } + + if !todo_items.is_empty() { + self.send_items(who, todo_items).await?; + } + + Ok(()) + } + + async fn send_items(&self, who: UUID, item_value_list: Vec<Vec<u8>>) -> Result<(), Error> { + info!( + "({}) Sending {} items to {:?}", + self.data.name, + item_value_list.len(), + who + ); + + let values = item_value_list + .into_iter() + .map(|x| Arc::new(ByteBuf::from(x))) + .collect::<Vec<_>>(); + + let rpc_resp = self + .rpc_client + .call(who, SyncRPC::Items(values), TABLE_SYNC_RPC_TIMEOUT) + .await?; + if let SyncRPC::Ok = rpc_resp { + Ok(()) + } else { + Err(Error::Message(format!( + "Unexpected response to RPC Update: {}", + debug_serialize(&rpc_resp) + ))) + } + } + + // ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ====== + + async fn handle_rpc(self: &Arc<Self>, message: &SyncRPC) -> Result<SyncRPC, Error> { + match message { + SyncRPC::RootCkHash(range, h) => { + let (_root_ck_key, root_ck) = self.get_root_ck(*range)?; + let hash = hash_of::<MerkleNode>(&root_ck)?; + Ok(SyncRPC::RootCkDifferent(hash != *h)) + } + SyncRPC::GetNode(k) => { + let node = self.merkle.read_node(&k)?; 
+ Ok(SyncRPC::Node(k.clone(), node)) + } + SyncRPC::Items(items) => { + self.data.update_many(items)?; + Ok(SyncRPC::Ok) + } + _ => Err(Error::Message(format!("Unexpected sync RPC"))), + } + } +} + +impl SyncTodo { + fn add_full_sync<F: TableSchema, R: TableReplication>( + &mut self, + data: &TableData<F, R>, + system: &System, + ) { + let my_id = system.id; + + self.todo.clear(); + + let partitions = data.replication.partitions(); + + for i in 0..partitions.len() { + let begin = partitions[i].1; + + let end = if i + 1 < partitions.len() { + partitions[i + 1].1 + } else { + [0xFFu8; 32].into() + }; + + let nodes = data.replication.write_nodes(&begin); + + let retain = nodes.contains(&my_id); + if !retain { + // Check if we have some data to send, otherwise skip + if data.store.range(begin..end).next().is_none() { + continue; + } + } + + self.todo.push(TodoPartition { + partition: partitions[i].0, + begin, + end, + retain, + }); + } + } + + fn pop_task(&mut self) -> Option<TodoPartition> { + if self.todo.is_empty() { + return None; + } + + let i = rand::thread_rng().gen_range(0..self.todo.len()); + if i == self.todo.len() - 1 { + self.todo.pop() + } else { + let replacement = self.todo.pop().unwrap(); + let ret = std::mem::replace(&mut self.todo[i], replacement); + Some(ret) + } + } +} + +fn hash_of<T: Serialize>(x: &T) -> Result<Hash, Error> { + Ok(blake2sum(&rmp_to_vec_all_named(x)?[..])) +} + +fn join_ordered<'a, K: Ord + Eq, V1, V2>( + x: &'a [(K, V1)], + y: &'a [(K, V2)], +) -> Vec<(&'a K, Option<&'a V1>, Option<&'a V2>)> { + let mut ret = vec![]; + let mut i = 0; + let mut j = 0; + while i < x.len() || j < y.len() { + if i < x.len() && j < y.len() && x[i].0 == y[j].0 { + ret.push((&x[i].0, Some(&x[i].1), Some(&y[j].1))); + i += 1; + j += 1; + } else if i < x.len() && (j == y.len() || x[i].0 < y[j].0) { + ret.push((&x[i].0, Some(&x[i].1), None)); + i += 1; + } else if j < y.len() && (i == x.len() || x[i].0 > y[j].0) { + ret.push((&y[j].0, None, Some(&y[j].1))); 
+ j += 1; + } else { + unreachable!(); + } + } + ret +} diff --git a/src/table/table.rs b/src/table/table.rs index 1f6b7d25..e203b178 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -2,9 +2,6 @@ use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; use std::time::Duration; -use log::warn; - -use arc_swap::ArcSwapOption; use futures::stream::*; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; @@ -13,25 +10,25 @@ use garage_util::data::*; use garage_util::error::Error; use garage_rpc::membership::System; -use garage_rpc::ring::Ring; use garage_rpc::rpc_client::*; use garage_rpc::rpc_server::*; +use crate::crdt::CRDT; +use crate::data::*; +use crate::gc::*; +use crate::merkle::*; +use crate::replication::*; use crate::schema::*; -use crate::table_sync::*; +use crate::sync::*; const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10); pub struct Table<F: TableSchema, R: TableReplication> { - pub instance: F, - pub replication: R, - - pub name: String, - pub(crate) rpc_client: Arc<RpcClient<TableRPC<F>>>, - pub system: Arc<System>, - pub store: sled::Tree, - pub syncer: ArcSwapOption<TableSyncer<F, R>>, + pub data: Arc<TableData<F, R>>, + pub merkle_updater: Arc<MerkleUpdater<F, R>>, + pub syncer: Arc<TableSyncer<F, R>>, + rpc_client: Arc<RpcClient<TableRPC<F>>>, } #[derive(Serialize, Deserialize)] @@ -45,30 +42,10 @@ pub(crate) enum TableRPC<F: TableSchema> { ReadRange(F::P, Option<F::S>, Option<F::Filter>, usize), Update(Vec<Arc<ByteBuf>>), - - SyncRPC(SyncRPC), } impl<F: TableSchema> RpcMessage for TableRPC<F> {} -pub trait TableReplication: Send + Sync { - // See examples in table_sharded.rs and table_fullcopy.rs - // To understand various replication methods - - // Which nodes to send reads from - fn read_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID>; - fn read_quorum(&self) -> usize; - - // Which nodes to send writes to - fn write_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID>; - fn write_quorum(&self, 
system: &System) -> usize; - fn max_write_errors(&self) -> usize; - - // Which are the nodes that do actually replicate the data - fn replication_nodes(&self, hash: &Hash, ring: &Ring) -> Vec<UUID>; - fn split_points(&self, ring: &Ring) -> Vec<Hash>; -} - impl<F, R> Table<F, R> where F: TableSchema + 'static, @@ -76,7 +53,7 @@ where { // =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) =============== - pub async fn new( + pub fn new( instance: F, replication: R, system: Arc<System>, @@ -84,31 +61,37 @@ where name: String, rpc_server: &mut RpcServer, ) -> Arc<Self> { - let store = db.open_tree(&name).expect("Unable to open DB tree"); - let rpc_path = format!("table_{}", name); let rpc_client = system.rpc_client::<TableRPC<F>>(&rpc_path); + let data = TableData::new(system.clone(), name, instance, replication, db); + + let merkle_updater = MerkleUpdater::launch(&system.background, data.clone()); + + let syncer = TableSyncer::launch( + system.clone(), + data.clone(), + merkle_updater.clone(), + rpc_server, + ); + TableGC::launch(system.clone(), data.clone(), rpc_server); + let table = Arc::new(Self { - instance, - replication, - name, - rpc_client, system, - store, - syncer: ArcSwapOption::from(None), + data, + merkle_updater, + syncer, + rpc_client, }); - table.clone().register_handler(rpc_server, rpc_path); - let syncer = TableSyncer::launch(table.clone()).await; - table.syncer.swap(Some(syncer)); + table.clone().register_handler(rpc_server, rpc_path); table } pub async fn insert(&self, e: &F::E) -> Result<(), Error> { let hash = e.partition_key().hash(); - let who = self.replication.write_nodes(&hash, &self.system); + let who = self.data.replication.write_nodes(&hash); //eprintln!("insert who: {:?}", who); let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?)); @@ -118,7 +101,7 @@ where .try_call_many( &who[..], rpc, - RequestStrategy::with_quorum(self.replication.write_quorum(&self.system)) + 
RequestStrategy::with_quorum(self.data.replication.write_quorum()) .with_timeout(TABLE_RPC_TIMEOUT), ) .await?; @@ -130,7 +113,7 @@ where for entry in entries.iter() { let hash = entry.partition_key().hash(); - let who = self.replication.write_nodes(&hash, &self.system); + let who = self.data.replication.write_nodes(&hash); let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?)); for node in who { if !call_list.contains_key(&node) { @@ -154,7 +137,7 @@ where errors.push(e); } } - if errors.len() > self.replication.max_write_errors() { + if errors.len() > self.data.replication.max_write_errors() { Err(Error::Message("Too many errors".into())) } else { Ok(()) @@ -167,7 +150,7 @@ where sort_key: &F::S, ) -> Result<Option<F::E>, Error> { let hash = partition_key.hash(); - let who = self.replication.read_nodes(&hash, &self.system); + let who = self.data.replication.read_nodes(&hash); //eprintln!("get who: {:?}", who); let rpc = TableRPC::<F>::ReadEntry(partition_key.clone(), sort_key.clone()); @@ -176,7 +159,7 @@ where .try_call_many( &who[..], rpc, - RequestStrategy::with_quorum(self.replication.read_quorum()) + RequestStrategy::with_quorum(self.data.replication.read_quorum()) .with_timeout(TABLE_RPC_TIMEOUT) .interrupt_after_quorum(true), ) @@ -187,7 +170,7 @@ where for resp in resps { if let TableRPC::ReadEntryResponse(value) = resp { if let Some(v_bytes) = value { - let v = self.decode_entry(v_bytes.as_slice())?; + let v = self.data.decode_entry(v_bytes.as_slice())?; ret = match ret { None => Some(v), Some(mut x) => { @@ -223,7 +206,7 @@ where limit: usize, ) -> Result<Vec<F::E>, Error> { let hash = partition_key.hash(); - let who = self.replication.read_nodes(&hash, &self.system); + let who = self.data.replication.read_nodes(&hash); let rpc = TableRPC::<F>::ReadRange(partition_key.clone(), begin_sort_key, filter, limit); @@ -232,7 +215,7 @@ where .try_call_many( &who[..], rpc, - RequestStrategy::with_quorum(self.replication.read_quorum()) + 
RequestStrategy::with_quorum(self.data.replication.read_quorum()) .with_timeout(TABLE_RPC_TIMEOUT) .interrupt_after_quorum(true), ) @@ -243,8 +226,8 @@ where for resp in resps { if let TableRPC::Update(entries) = resp { for entry_bytes in entries.iter() { - let entry = self.decode_entry(entry_bytes.as_slice())?; - let entry_key = self.tree_key(entry.partition_key(), entry.sort_key()); + let entry = self.data.decode_entry(entry_bytes.as_slice())?; + let entry_key = self.data.tree_key(entry.partition_key(), entry.sort_key()); match ret.remove(&entry_key) { None => { ret.insert(entry_key, Some(entry)); @@ -313,146 +296,18 @@ where async fn handle(self: &Arc<Self>, msg: &TableRPC<F>) -> Result<TableRPC<F>, Error> { match msg { TableRPC::ReadEntry(key, sort_key) => { - let value = self.handle_read_entry(key, sort_key)?; + let value = self.data.read_entry(key, sort_key)?; Ok(TableRPC::ReadEntryResponse(value)) } TableRPC::ReadRange(key, begin_sort_key, filter, limit) => { - let values = self.handle_read_range(key, begin_sort_key, filter, *limit)?; + let values = self.data.read_range(key, begin_sort_key, filter, *limit)?; Ok(TableRPC::Update(values)) } TableRPC::Update(pairs) => { - self.handle_update(pairs).await?; + self.data.update_many(pairs)?; Ok(TableRPC::Ok) } - TableRPC::SyncRPC(rpc) => { - let syncer = self.syncer.load_full().unwrap(); - let response = syncer - .handle_rpc(rpc, self.system.background.stop_signal.clone()) - .await?; - Ok(TableRPC::SyncRPC(response)) - } _ => Err(Error::BadRPC(format!("Unexpected table RPC"))), } } - - fn handle_read_entry(&self, p: &F::P, s: &F::S) -> Result<Option<ByteBuf>, Error> { - let tree_key = self.tree_key(p, s); - if let Some(bytes) = self.store.get(&tree_key)? 
{ - Ok(Some(ByteBuf::from(bytes.to_vec()))) - } else { - Ok(None) - } - } - - fn handle_read_range( - &self, - p: &F::P, - s: &Option<F::S>, - filter: &Option<F::Filter>, - limit: usize, - ) -> Result<Vec<Arc<ByteBuf>>, Error> { - let partition_hash = p.hash(); - let first_key = match s { - None => partition_hash.to_vec(), - Some(sk) => self.tree_key(p, sk), - }; - let mut ret = vec![]; - for item in self.store.range(first_key..) { - let (key, value) = item?; - if &key[..32] != partition_hash.as_slice() { - break; - } - let keep = match filter { - None => true, - Some(f) => { - let entry = self.decode_entry(value.as_ref())?; - F::matches_filter(&entry, f) - } - }; - if keep { - ret.push(Arc::new(ByteBuf::from(value.as_ref()))); - } - if ret.len() >= limit { - break; - } - } - Ok(ret) - } - - pub async fn handle_update(self: &Arc<Self>, entries: &[Arc<ByteBuf>]) -> Result<(), Error> { - let syncer = self.syncer.load_full().unwrap(); - - for update_bytes in entries.iter() { - let update = self.decode_entry(update_bytes.as_slice())?; - - let tree_key = self.tree_key(update.partition_key(), update.sort_key()); - - let (old_entry, new_entry) = self.store.transaction(|db| { - let (old_entry, new_entry) = match db.get(&tree_key)? 
{ - Some(prev_bytes) => { - let old_entry = self - .decode_entry(&prev_bytes) - .map_err(sled::transaction::ConflictableTransactionError::Abort)?; - let mut new_entry = old_entry.clone(); - new_entry.merge(&update); - (Some(old_entry), new_entry) - } - None => (None, update.clone()), - }; - - let new_bytes = rmp_to_vec_all_named(&new_entry) - .map_err(Error::RMPEncode) - .map_err(sled::transaction::ConflictableTransactionError::Abort)?; - db.insert(tree_key.clone(), new_bytes)?; - Ok((old_entry, new_entry)) - })?; - - if old_entry.as_ref() != Some(&new_entry) { - self.instance.updated(old_entry, Some(new_entry)); - syncer.invalidate(&tree_key[..]); - } - } - - Ok(()) - } - - pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> { - let removed = self.store.transaction(|txn| { - if let Some(cur_v) = txn.get(k)? { - if cur_v == v { - txn.remove(k)?; - return Ok(true); - } - } - Ok(false) - })?; - if removed { - let old_entry = self.decode_entry(v)?; - self.instance.updated(Some(old_entry), None); - self.syncer.load_full().unwrap().invalidate(k); - } - Ok(removed) - } - - fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> { - let mut ret = p.hash().to_vec(); - ret.extend(s.sort_key()); - ret - } - - fn decode_entry(&self, bytes: &[u8]) -> Result<F::E, Error> { - match rmp_serde::decode::from_read_ref::<_, F::E>(bytes) { - Ok(x) => Ok(x), - Err(e) => match F::try_migrate(bytes) { - Some(x) => Ok(x), - None => { - warn!("Unable to decode entry of {}: {}", self.name, e); - for line in hexdump::hexdump_iter(bytes) { - debug!("{}", line); - } - Err(e.into()) - } - }, - } - } } diff --git a/src/table/table_fullcopy.rs b/src/table/table_fullcopy.rs deleted file mode 100644 index c55879d8..00000000 --- a/src/table/table_fullcopy.rs +++ /dev/null @@ -1,59 +0,0 @@ -use std::sync::Arc; - -use garage_rpc::membership::System; -use garage_rpc::ring::Ring; -use garage_util::data::*; - -use crate::*; - -#[derive(Clone)] -pub struct 
TableFullReplication { - pub max_faults: usize, -} - -#[derive(Clone)] -struct Neighbors { - ring: Arc<Ring>, - neighbors: Vec<UUID>, -} - -impl TableFullReplication { - pub fn new(max_faults: usize) -> Self { - TableFullReplication { max_faults } - } -} - -impl TableReplication for TableFullReplication { - // Full replication schema: all nodes store everything - // Writes are disseminated in an epidemic manner in the network - - // Advantage: do all reads locally, extremely fast - // Inconvenient: only suitable to reasonably small tables - - fn read_nodes(&self, _hash: &Hash, system: &System) -> Vec<UUID> { - vec![system.id] - } - fn read_quorum(&self) -> usize { - 1 - } - - fn write_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID> { - self.replication_nodes(hash, system.ring.borrow().as_ref()) - } - fn write_quorum(&self, system: &System) -> usize { - system.ring.borrow().config.members.len() - self.max_faults - } - fn max_write_errors(&self) -> usize { - self.max_faults - } - - fn replication_nodes(&self, _hash: &Hash, ring: &Ring) -> Vec<UUID> { - ring.config.members.keys().cloned().collect::<Vec<_>>() - } - fn split_points(&self, _ring: &Ring) -> Vec<Hash> { - let mut ret = vec![]; - ret.push([0u8; 32].into()); - ret.push([0xFFu8; 32].into()); - ret - } -} diff --git a/src/table/table_sync.rs b/src/table/table_sync.rs deleted file mode 100644 index 5fa6793b..00000000 --- a/src/table/table_sync.rs +++ /dev/null @@ -1,891 +0,0 @@ -use rand::Rng; -use std::collections::{BTreeMap, VecDeque}; -use std::sync::{Arc, Mutex}; -use std::time::{Duration, Instant}; - -use futures::future::join_all; -use futures::{pin_mut, select}; -use futures_util::future::*; -use futures_util::stream::*; -use serde::{Deserialize, Serialize}; -use serde_bytes::ByteBuf; -use tokio::sync::{mpsc, watch}; - -use garage_rpc::ring::Ring; -use garage_util::data::*; -use garage_util::error::Error; - -use crate::*; - -const MAX_DEPTH: usize = 16; -const SCAN_INTERVAL: Duration = 
Duration::from_secs(3600); -const CHECKSUM_CACHE_TIMEOUT: Duration = Duration::from_secs(1800); -const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(30); - -pub struct TableSyncer<F: TableSchema, R: TableReplication> { - table: Arc<Table<F, R>>, - todo: Mutex<SyncTodo>, - cache: Vec<Mutex<BTreeMap<SyncRange, RangeChecksumCache>>>, -} - -#[derive(Serialize, Deserialize)] -pub(crate) enum SyncRPC { - GetRootChecksumRange(Hash, Hash), - RootChecksumRange(SyncRange), - Checksums(Vec<RangeChecksum>), - Difference(Vec<SyncRange>, Vec<Arc<ByteBuf>>), -} - -struct SyncTodo { - todo: Vec<TodoPartition>, -} - -#[derive(Debug, Clone)] -struct TodoPartition { - // Partition consists in hashes between begin included and end excluded - begin: Hash, - end: Hash, - - // Are we a node that stores this partition or not? - retain: bool, -} - -// A SyncRange defines a query on the dataset stored by a node, in the following way: -// - all items whose key are >= `begin` -// - stopping at the first item whose key hash has at least `level` leading zero bytes (excluded) -// - except if the first item of the range has such many leading zero bytes -// - and stopping at `end` (excluded) if such an item is not found -// The checksum itself does not store all of the items in the database, only the hashes of the "sub-ranges" -// i.e. of ranges of level `level-1` that cover the same range -// (ranges of level 0 do not exist and their hash is simply the hash of the first item >= begin) -// See RangeChecksum for the struct that stores this information. 
-#[derive(Hash, PartialEq, Eq, Debug, Clone, Serialize, Deserialize)] -pub(crate) struct SyncRange { - begin: Vec<u8>, - end: Vec<u8>, - level: usize, -} - -impl std::cmp::PartialOrd for SyncRange { - fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { - Some(self.cmp(other)) - } -} -impl std::cmp::Ord for SyncRange { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.begin - .cmp(&other.begin) - .then(self.level.cmp(&other.level)) - .then(self.end.cmp(&other.end)) - } -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub(crate) struct RangeChecksum { - bounds: SyncRange, - children: Vec<(SyncRange, Hash)>, - found_limit: Option<Vec<u8>>, - - #[serde(skip, default = "std::time::Instant::now")] - time: Instant, -} - -#[derive(Debug, Clone)] -struct RangeChecksumCache { - hash: Option<Hash>, // None if no children - found_limit: Option<Vec<u8>>, - time: Instant, -} - -impl<F, R> TableSyncer<F, R> -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ - pub(crate) async fn launch(table: Arc<Table<F, R>>) -> Arc<Self> { - let todo = SyncTodo { todo: Vec::new() }; - let syncer = Arc::new(TableSyncer { - table: table.clone(), - todo: Mutex::new(todo), - cache: (0..MAX_DEPTH) - .map(|_| Mutex::new(BTreeMap::new())) - .collect::<Vec<_>>(), - }); - - let (busy_tx, busy_rx) = mpsc::unbounded_channel(); - - let s1 = syncer.clone(); - table - .system - .background - .spawn_worker( - format!("table sync watcher for {}", table.name), - move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit, busy_rx), - ) - .await; - - let s2 = syncer.clone(); - table - .system - .background - .spawn_worker( - format!("table syncer for {}", table.name), - move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit, busy_tx), - ) - .await; - - let s3 = syncer.clone(); - tokio::spawn(async move { - tokio::time::delay_for(Duration::from_secs(20)).await; - s3.add_full_scan().await; - }); - - syncer - } - - async fn watcher_task( - self: 
Arc<Self>, - mut must_exit: watch::Receiver<bool>, - mut busy_rx: mpsc::UnboundedReceiver<bool>, - ) -> Result<(), Error> { - let mut prev_ring: Arc<Ring> = self.table.system.ring.borrow().clone(); - let mut ring_recv: watch::Receiver<Arc<Ring>> = self.table.system.ring.clone(); - let mut nothing_to_do_since = Some(Instant::now()); - - while !*must_exit.borrow() { - let s_ring_recv = ring_recv.recv().fuse(); - let s_busy = busy_rx.recv().fuse(); - let s_must_exit = must_exit.recv().fuse(); - let s_timeout = tokio::time::delay_for(Duration::from_secs(1)).fuse(); - pin_mut!(s_ring_recv, s_busy, s_must_exit, s_timeout); - - select! { - new_ring_r = s_ring_recv => { - if let Some(new_ring) = new_ring_r { - debug!("({}) Adding ring difference to syncer todo list", self.table.name); - self.todo.lock().unwrap().add_ring_difference(&self.table, &prev_ring, &new_ring); - prev_ring = new_ring; - } - } - busy_opt = s_busy => { - if let Some(busy) = busy_opt { - if busy { - nothing_to_do_since = None; - } else { - if nothing_to_do_since.is_none() { - nothing_to_do_since = Some(Instant::now()); - } - } - } - } - must_exit_v = s_must_exit => { - if must_exit_v.unwrap_or(false) { - break; - } - } - _ = s_timeout => { - if nothing_to_do_since.map(|t| Instant::now() - t >= SCAN_INTERVAL).unwrap_or(false) { - nothing_to_do_since = None; - debug!("({}) Adding full scan to syncer todo list", self.table.name); - self.add_full_scan().await; - } - } - } - } - Ok(()) - } - - pub async fn add_full_scan(&self) { - self.todo.lock().unwrap().add_full_scan(&self.table); - } - - async fn syncer_task( - self: Arc<Self>, - mut must_exit: watch::Receiver<bool>, - busy_tx: mpsc::UnboundedSender<bool>, - ) -> Result<(), Error> { - while !*must_exit.borrow() { - let task = self.todo.lock().unwrap().pop_task(); - if let Some(partition) = task { - busy_tx.send(true)?; - let res = self - .clone() - .sync_partition(&partition, &mut must_exit) - .await; - if let Err(e) = res { - warn!( - "({}) Error while 
syncing {:?}: {}", - self.table.name, partition, e - ); - } - } else { - busy_tx.send(false)?; - tokio::time::delay_for(Duration::from_secs(1)).await; - } - } - Ok(()) - } - - async fn sync_partition( - self: Arc<Self>, - partition: &TodoPartition, - must_exit: &mut watch::Receiver<bool>, - ) -> Result<(), Error> { - if partition.retain { - let my_id = self.table.system.id; - let nodes = self - .table - .replication - .write_nodes(&partition.begin, &self.table.system) - .into_iter() - .filter(|node| *node != my_id) - .collect::<Vec<_>>(); - - debug!( - "({}) Preparing to sync {:?} with {:?}...", - self.table.name, partition, nodes - ); - let root_cks = self.root_checksum(&partition.begin, &partition.end, must_exit)?; - - let mut sync_futures = nodes - .iter() - .map(|node| { - self.clone().do_sync_with( - partition.clone(), - root_cks.clone(), - *node, - must_exit.clone(), - ) - }) - .collect::<FuturesUnordered<_>>(); - - let mut n_errors = 0; - while let Some(r) = sync_futures.next().await { - if let Err(e) = r { - n_errors += 1; - warn!("({}) Sync error: {}", self.table.name, e); - } - } - if n_errors > self.table.replication.max_write_errors() { - return Err(Error::Message(format!( - "Sync failed with too many nodes (should have been: {:?}).", - nodes - ))); - } - } else { - self.offload_partition(&partition.begin, &partition.end, must_exit) - .await?; - } - - Ok(()) - } - - // Offload partition: this partition is not something we are storing, - // so send it out to all other nodes that store it and delete items locally. - // We don't bother checking if the remote nodes already have the items, - // we just batch-send everything. Offloading isn't supposed to happen very often. - // If any of the nodes that are supposed to store the items is unable to - // save them, we interrupt the process. 
- async fn offload_partition( - self: &Arc<Self>, - begin: &Hash, - end: &Hash, - must_exit: &mut watch::Receiver<bool>, - ) -> Result<(), Error> { - let mut counter: usize = 0; - - while !*must_exit.borrow() { - let mut items = Vec::new(); - - for item in self.table.store.range(begin.to_vec()..end.to_vec()) { - let (key, value) = item?; - items.push((key.to_vec(), Arc::new(ByteBuf::from(value.as_ref())))); - - if items.len() >= 1024 { - break; - } - } - - if items.len() > 0 { - let nodes = self - .table - .replication - .write_nodes(&begin, &self.table.system) - .into_iter() - .collect::<Vec<_>>(); - if nodes.contains(&self.table.system.id) { - warn!("Interrupting offload as partitions seem to have changed"); - break; - } - - counter += 1; - debug!( - "Offloading {} items from {:?}..{:?} ({})", - items.len(), - begin, - end, - counter - ); - self.offload_items(&items, &nodes[..]).await?; - } else { - break; - } - } - - Ok(()) - } - - async fn offload_items( - self: &Arc<Self>, - items: &Vec<(Vec<u8>, Arc<ByteBuf>)>, - nodes: &[UUID], - ) -> Result<(), Error> { - let values = items.iter().map(|(_k, v)| v.clone()).collect::<Vec<_>>(); - let update_msg = Arc::new(TableRPC::<F>::Update(values)); - - for res in join_all(nodes.iter().map(|to| { - self.table - .rpc_client - .call_arc(*to, update_msg.clone(), TABLE_SYNC_RPC_TIMEOUT) - })) - .await - { - res?; - } - - // All remote nodes have written those items, now we can delete them locally - let mut not_removed = 0; - for (k, v) in items.iter() { - if !self.table.delete_if_equal(&k[..], &v[..])? 
{ - not_removed += 1; - } - } - - if not_removed > 0 { - debug!("{} items not removed during offload because they changed in between (trying again...)", not_removed); - } - - Ok(()) - } - - fn root_checksum( - self: &Arc<Self>, - begin: &Hash, - end: &Hash, - must_exit: &mut watch::Receiver<bool>, - ) -> Result<RangeChecksum, Error> { - for i in 1..MAX_DEPTH { - let rc = self.range_checksum( - &SyncRange { - begin: begin.to_vec(), - end: end.to_vec(), - level: i, - }, - must_exit, - )?; - if rc.found_limit.is_none() { - return Ok(rc); - } - } - Err(Error::Message(format!( - "Unable to compute root checksum (this should never happen)" - ))) - } - - fn range_checksum( - self: &Arc<Self>, - range: &SyncRange, - must_exit: &mut watch::Receiver<bool>, - ) -> Result<RangeChecksum, Error> { - assert!(range.level != 0); - trace!("Call range_checksum {:?}", range); - - if range.level == 1 { - let mut children = vec![]; - for item in self - .table - .store - .range(range.begin.clone()..range.end.clone()) - { - let (key, value) = item?; - let key_hash = blake2sum(&key[..]); - if children.len() > 0 - && key_hash.as_slice()[0..range.level] - .iter() - .all(|x| *x == 0u8) - { - trace!( - "range_checksum {:?} returning {} items", - range, - children.len() - ); - return Ok(RangeChecksum { - bounds: range.clone(), - children, - found_limit: Some(key.to_vec()), - time: Instant::now(), - }); - } - let item_range = SyncRange { - begin: key.to_vec(), - end: vec![], - level: 0, - }; - children.push((item_range, blake2sum(&value[..]))); - } - trace!( - "range_checksum {:?} returning {} items", - range, - children.len() - ); - Ok(RangeChecksum { - bounds: range.clone(), - children, - found_limit: None, - time: Instant::now(), - }) - } else { - let mut children = vec![]; - let mut sub_range = SyncRange { - begin: range.begin.clone(), - end: range.end.clone(), - level: range.level - 1, - }; - let mut time = Instant::now(); - while !*must_exit.borrow() { - let sub_ck = 
self.range_checksum_cached_hash(&sub_range, must_exit)?; - - if let Some(hash) = sub_ck.hash { - children.push((sub_range.clone(), hash)); - if sub_ck.time < time { - time = sub_ck.time; - } - } - - if sub_ck.found_limit.is_none() || sub_ck.hash.is_none() { - trace!( - "range_checksum {:?} returning {} items", - range, - children.len() - ); - return Ok(RangeChecksum { - bounds: range.clone(), - children, - found_limit: None, - time, - }); - } - let found_limit = sub_ck.found_limit.unwrap(); - - let actual_limit_hash = blake2sum(&found_limit[..]); - if actual_limit_hash.as_slice()[0..range.level] - .iter() - .all(|x| *x == 0u8) - { - trace!( - "range_checksum {:?} returning {} items", - range, - children.len() - ); - return Ok(RangeChecksum { - bounds: range.clone(), - children, - found_limit: Some(found_limit.clone()), - time, - }); - } - - sub_range.begin = found_limit; - } - trace!("range_checksum {:?} exiting due to must_exit", range); - Err(Error::Message(format!("Exiting."))) - } - } - - fn range_checksum_cached_hash( - self: &Arc<Self>, - range: &SyncRange, - must_exit: &mut watch::Receiver<bool>, - ) -> Result<RangeChecksumCache, Error> { - { - let mut cache = self.cache[range.level].lock().unwrap(); - if let Some(v) = cache.get(&range) { - if Instant::now() - v.time < CHECKSUM_CACHE_TIMEOUT { - return Ok(v.clone()); - } - } - cache.remove(&range); - } - - let v = self.range_checksum(&range, must_exit)?; - trace!( - "({}) New checksum calculated for {}-{}/{}, {} children", - self.table.name, - hex::encode(&range.begin) - .chars() - .take(16) - .collect::<String>(), - hex::encode(&range.end).chars().take(16).collect::<String>(), - range.level, - v.children.len() - ); - - let hash = if v.children.len() > 0 { - Some(blake2sum(&rmp_to_vec_all_named(&v)?[..])) - } else { - None - }; - let cache_entry = RangeChecksumCache { - hash, - found_limit: v.found_limit, - time: v.time, - }; - - let mut cache = self.cache[range.level].lock().unwrap(); - 
cache.insert(range.clone(), cache_entry.clone()); - Ok(cache_entry) - } - - async fn do_sync_with( - self: Arc<Self>, - partition: TodoPartition, - root_ck: RangeChecksum, - who: UUID, - mut must_exit: watch::Receiver<bool>, - ) -> Result<(), Error> { - let mut todo = VecDeque::new(); - - // If their root checksum has level > than us, use that as a reference - let root_cks_resp = self - .table - .rpc_client - .call( - who, - TableRPC::<F>::SyncRPC(SyncRPC::GetRootChecksumRange( - partition.begin.clone(), - partition.end.clone(), - )), - TABLE_SYNC_RPC_TIMEOUT, - ) - .await?; - if let TableRPC::<F>::SyncRPC(SyncRPC::RootChecksumRange(range)) = root_cks_resp { - if range.level > root_ck.bounds.level { - let their_root_range_ck = self.range_checksum(&range, &mut must_exit)?; - todo.push_back(their_root_range_ck); - } else { - todo.push_back(root_ck); - } - } else { - return Err(Error::Message(format!( - "Invalid respone to GetRootChecksumRange RPC: {}", - debug_serialize(root_cks_resp) - ))); - } - - while !todo.is_empty() && !*must_exit.borrow() { - let total_children = todo.iter().map(|x| x.children.len()).fold(0, |x, y| x + y); - trace!( - "({}) Sync with {:?}: {} ({}) remaining", - self.table.name, - who, - todo.len(), - total_children - ); - - let step_size = std::cmp::min(16, todo.len()); - let step = todo.drain(..step_size).collect::<Vec<_>>(); - - let rpc_resp = self - .table - .rpc_client - .call( - who, - TableRPC::<F>::SyncRPC(SyncRPC::Checksums(step)), - TABLE_SYNC_RPC_TIMEOUT, - ) - .await?; - if let TableRPC::<F>::SyncRPC(SyncRPC::Difference(mut diff_ranges, diff_items)) = - rpc_resp - { - if diff_ranges.len() > 0 || diff_items.len() > 0 { - info!( - "({}) Sync with {:?}: difference {} ranges, {} items", - self.table.name, - who, - diff_ranges.len(), - diff_items.len() - ); - } - let mut items_to_send = vec![]; - for differing in diff_ranges.drain(..) 
{ - if differing.level == 0 { - items_to_send.push(differing.begin); - } else { - let checksum = self.range_checksum(&differing, &mut must_exit)?; - todo.push_back(checksum); - } - } - if diff_items.len() > 0 { - self.table.handle_update(&diff_items[..]).await?; - } - if items_to_send.len() > 0 { - self.send_items(who, items_to_send).await?; - } - } else { - return Err(Error::Message(format!( - "Unexpected response to sync RPC checksums: {}", - debug_serialize(&rpc_resp) - ))); - } - } - Ok(()) - } - - async fn send_items(&self, who: UUID, item_list: Vec<Vec<u8>>) -> Result<(), Error> { - info!( - "({}) Sending {} items to {:?}", - self.table.name, - item_list.len(), - who - ); - - let mut values = vec![]; - for item in item_list.iter() { - if let Some(v) = self.table.store.get(&item[..])? { - values.push(Arc::new(ByteBuf::from(v.as_ref()))); - } - } - let rpc_resp = self - .table - .rpc_client - .call(who, TableRPC::<F>::Update(values), TABLE_SYNC_RPC_TIMEOUT) - .await?; - if let TableRPC::<F>::Ok = rpc_resp { - Ok(()) - } else { - Err(Error::Message(format!( - "Unexpected response to RPC Update: {}", - debug_serialize(&rpc_resp) - ))) - } - } - - pub(crate) async fn handle_rpc( - self: &Arc<Self>, - message: &SyncRPC, - mut must_exit: watch::Receiver<bool>, - ) -> Result<SyncRPC, Error> { - match message { - SyncRPC::GetRootChecksumRange(begin, end) => { - let root_cks = self.root_checksum(&begin, &end, &mut must_exit)?; - Ok(SyncRPC::RootChecksumRange(root_cks.bounds)) - } - SyncRPC::Checksums(checksums) => { - self.handle_checksums_rpc(&checksums[..], &mut must_exit) - .await - } - _ => Err(Error::Message(format!("Unexpected sync RPC"))), - } - } - - async fn handle_checksums_rpc( - self: &Arc<Self>, - checksums: &[RangeChecksum], - must_exit: &mut watch::Receiver<bool>, - ) -> Result<SyncRPC, Error> { - let mut ret_ranges = vec![]; - let mut ret_items = vec![]; - - for their_ckr in checksums.iter() { - let our_ckr = self.range_checksum(&their_ckr.bounds, 
must_exit)?; - for (their_range, their_hash) in their_ckr.children.iter() { - let differs = match our_ckr - .children - .binary_search_by(|(our_range, _)| our_range.cmp(&their_range)) - { - Err(_) => { - if their_range.level >= 1 { - let cached_hash = - self.range_checksum_cached_hash(&their_range, must_exit)?; - cached_hash.hash.map(|h| h != *their_hash).unwrap_or(true) - } else { - true - } - } - Ok(i) => our_ckr.children[i].1 != *their_hash, - }; - if differs { - ret_ranges.push(their_range.clone()); - if their_range.level == 0 { - if let Some(item_bytes) = - self.table.store.get(their_range.begin.as_slice())? - { - ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec()))); - } - } - } - } - for (our_range, _hash) in our_ckr.children.iter() { - if let Some(their_found_limit) = &their_ckr.found_limit { - if our_range.begin.as_slice() > their_found_limit.as_slice() { - break; - } - } - - let not_present = our_ckr - .children - .binary_search_by(|(their_range, _)| their_range.cmp(&our_range)) - .is_err(); - if not_present { - if our_range.level > 0 { - ret_ranges.push(our_range.clone()); - } - if our_range.level == 0 { - if let Some(item_bytes) = - self.table.store.get(our_range.begin.as_slice())? - { - ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec()))); - } - } - } - } - } - let n_checksums = checksums - .iter() - .map(|x| x.children.len()) - .fold(0, |x, y| x + y); - if ret_ranges.len() > 0 || ret_items.len() > 0 { - trace!( - "({}) Checksum comparison RPC: {} different + {} items for {} received", - self.table.name, - ret_ranges.len(), - ret_items.len(), - n_checksums - ); - } - Ok(SyncRPC::Difference(ret_ranges, ret_items)) - } - - pub(crate) fn invalidate(self: &Arc<Self>, item_key: &[u8]) { - for i in 1..MAX_DEPTH { - let needle = SyncRange { - begin: item_key.to_vec(), - end: vec![], - level: i, - }; - let mut cache = self.cache[i].lock().unwrap(); - if let Some(cache_entry) = cache.range(..=needle).rev().next() { - if cache_entry.0.begin[..] 
<= *item_key && cache_entry.0.end[..] > *item_key { - let index = cache_entry.0.clone(); - drop(cache_entry); - cache.remove(&index); - } - } - } - } -} - -impl SyncTodo { - fn add_full_scan<F: TableSchema, R: TableReplication>(&mut self, table: &Table<F, R>) { - let my_id = table.system.id; - - self.todo.clear(); - - let ring = table.system.ring.borrow().clone(); - let split_points = table.replication.split_points(&ring); - - for i in 0..split_points.len() - 1 { - let begin = split_points[i]; - let end = split_points[i + 1]; - let nodes = table.replication.replication_nodes(&begin, &ring); - - let retain = nodes.contains(&my_id); - if !retain { - // Check if we have some data to send, otherwise skip - if table.store.range(begin..end).next().is_none() { - continue; - } - } - - self.todo.push(TodoPartition { begin, end, retain }); - } - } - - fn add_ring_difference<F: TableSchema, R: TableReplication>( - &mut self, - table: &Table<F, R>, - old_ring: &Ring, - new_ring: &Ring, - ) { - let my_id = table.system.id; - - // If it is us who are entering or leaving the system, - // initiate a full sync instead of incremental sync - if old_ring.config.members.contains_key(&my_id) - != new_ring.config.members.contains_key(&my_id) - { - self.add_full_scan(table); - return; - } - - let mut all_points = None - .into_iter() - .chain(table.replication.split_points(old_ring).drain(..)) - .chain(table.replication.split_points(new_ring).drain(..)) - .chain(self.todo.iter().map(|x| x.begin)) - .chain(self.todo.iter().map(|x| x.end)) - .collect::<Vec<_>>(); - all_points.sort(); - all_points.dedup(); - - let mut old_todo = std::mem::replace(&mut self.todo, vec![]); - old_todo.sort_by(|x, y| x.begin.cmp(&y.begin)); - let mut new_todo = vec![]; - - for i in 0..all_points.len() - 1 { - let begin = all_points[i]; - let end = all_points[i + 1]; - let was_ours = table - .replication - .replication_nodes(&begin, &old_ring) - .contains(&my_id); - let is_ours = table - .replication - 
.replication_nodes(&begin, &new_ring) - .contains(&my_id); - - let was_todo = match old_todo.binary_search_by(|x| x.begin.cmp(&begin)) { - Ok(_) => true, - Err(j) => { - (j > 0 && old_todo[j - 1].begin < end && begin < old_todo[j - 1].end) - || (j < old_todo.len() - && old_todo[j].begin < end && begin < old_todo[j].end) - } - }; - if was_todo || (is_ours && !was_ours) || (was_ours && !is_ours) { - new_todo.push(TodoPartition { - begin, - end, - retain: is_ours, - }); - } - } - - self.todo = new_todo; - } - - fn pop_task(&mut self) -> Option<TodoPartition> { - if self.todo.is_empty() { - return None; - } - - let i = rand::thread_rng().gen_range::<usize, _, _>(0, self.todo.len()); - if i == self.todo.len() - 1 { - self.todo.pop() - } else { - let replacement = self.todo.pop().unwrap(); - let ret = std::mem::replace(&mut self.todo[i], replacement); - Some(ret) - } - } -} |