From f319a7d3740ba8b83c9c0eae27edfda1c1d14c03 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 10 Mar 2021 16:21:56 +0100 Subject: Refactor model stuff, including cleaner CRDTs --- src/table/crdt.rs | 327 ---------------------------------------------- src/table/crdt/bool.rs | 34 +++++ src/table/crdt/crdt.rs | 73 +++++++++++ src/table/crdt/lww.rs | 114 ++++++++++++++++ src/table/crdt/lww_map.rs | 145 ++++++++++++++++++++ src/table/crdt/map.rs | 83 ++++++++++++ src/table/crdt/mod.rs | 22 ++++ src/table/schema.rs | 6 +- src/table/table.rs | 1 + 9 files changed, 475 insertions(+), 330 deletions(-) delete mode 100644 src/table/crdt.rs create mode 100644 src/table/crdt/bool.rs create mode 100644 src/table/crdt/crdt.rs create mode 100644 src/table/crdt/lww.rs create mode 100644 src/table/crdt/lww_map.rs create mode 100644 src/table/crdt/map.rs create mode 100644 src/table/crdt/mod.rs (limited to 'src/table') diff --git a/src/table/crdt.rs b/src/table/crdt.rs deleted file mode 100644 index 4cba10ce..00000000 --- a/src/table/crdt.rs +++ /dev/null @@ -1,327 +0,0 @@ -//! This package provides a simple implementation of conflict-free replicated data types (CRDTs) -//! -//! CRDTs are a type of data structures that do not require coordination. In other words, we can -//! edit them in parallel, we will always find a way to merge it. -//! -//! A general example is a counter. Its initial value is 0. Alice and Bob get a copy of the -//! counter. Alice does +1 on her copy, she reads 1. Bob does +3 on his copy, he reads 3. Now, -//! it is easy to merge their counters, order does not count: we always get 4. -//! -//! Learn more about CRDT [on Wikipedia](https://en.wikipedia.org/wiki/Conflict-free_replicated_data_type) - -use serde::{Deserialize, Serialize}; - -use garage_util::data::*; - -/// Definition of a CRDT - all CRDT Rust types implement this. -/// -/// A CRDT is defined as a merge operator that respects a certain set of axioms. 
-/// -/// In particular, the merge operator must be commutative, associative, -/// idempotent, and monotonic. -/// In other words, if `a`, `b` and `c` are CRDTs, and `⊔` denotes the merge operator, -/// the following axioms must apply: -/// -/// ```text -/// a ⊔ b = b ⊔ a (commutativity) -/// (a ⊔ b) ⊔ c = a ⊔ (b ⊔ c) (associativity) -/// (a ⊔ b) ⊔ b = a ⊔ b (idempotence) -/// ``` -/// -/// Moreover, the relationship `≥` defined by `a ≥ b ⇔ ∃c. a = b ⊔ c` must be a partial order. -/// This implies a few properties such as: if `a ⊔ b ≠ a`, then there is no `c` such that `(a ⊔ b) ⊔ c = a`, -/// as this would imply a cycle in the partial order. -pub trait CRDT { - /// Merge the two datastructures according to the CRDT rules. - /// `self` is modified to contain the merged CRDT value. `other` is not modified. - /// - /// # Arguments - /// - /// * `other` - the other CRDT we wish to merge with - fn merge(&mut self, other: &Self); -} - -/// All types that implement `Ord` (a total order) also implement a trivial CRDT -/// defined by the merge rule: `a ⊔ b = max(a, b)`. -impl CRDT for T -where - T: Ord + Clone, -{ - fn merge(&mut self, other: &Self) { - if other > self { - *self = other.clone(); - } - } -} - -// ---- LWW Register ---- - -/// Last Write Win (LWW) -/// -/// An LWW CRDT associates a timestamp with a value, in order to implement a -/// time-based reconciliation rule: the most recent write wins. -/// For completeness, the LWW reconciliation rule must also be defined for two LWW CRDTs -/// with the same timestamp but different values. -/// -/// In our case, we add the constraint that the value that is wrapped inside the LWW CRDT must -/// itself be a CRDT: in the case when the timestamp does not allow us to decide on which value to -/// keep, the merge rule of the inner CRDT is applied on the wrapped values. (Note that all types -/// that implement the `Ord` trait get a default CRDT implemetnation that keeps the maximum value. 
-/// This enables us to use LWW directly with primitive data types such as numbers or strings. It is -/// generally desirable in this case to never explicitly produce LWW values with the same timestamp -/// but different inner values, as the rule to keep the maximum value isn't generally the desired -/// semantics.) -/// -/// As multiple computers clocks are always desynchronized, -/// when operations are close enough, it is equivalent to -/// take one copy and drop the other one. -/// -/// Given that clocks are not too desynchronized, this assumption -/// is enough for most cases, as there is few chance that two humans -/// coordonate themself faster than the time difference between two NTP servers. -/// -/// As a more concret example, let's suppose you want to upload a file -/// with the same key (path) in the same bucket at the very same time. -/// For each request, the file will be timestamped by the receiving server -/// and may differ from what you observed with your atomic clock! -/// -/// This scheme is used by AWS S3 or Soundcloud and often without knowing -/// in entreprise when reconciliating databases with ad-hoc scripts. -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] -pub struct LWW { - ts: u64, - v: T, -} - -impl LWW -where - T: CRDT, -{ - /// Creates a new CRDT - /// - /// CRDT's internal timestamp is set with current node's clock. - pub fn new(value: T) -> Self { - Self { - ts: now_msec(), - v: value, - } - } - - /// Build a new CRDT from a previous non-compatible one - /// - /// Compared to new, the CRDT's timestamp is not set to now - /// but must be set to the previous, non-compatible, CRDT's timestamp. - pub fn migrate_from_raw(ts: u64, value: T) -> Self { - Self { ts, v: value } - } - - /// Update the LWW CRDT while keeping some causal ordering. 
- /// - /// The timestamp of the LWW CRDT is updated to be the current node's clock - /// at time of update, or the previous timestamp + 1 if that's bigger, - /// so that the new timestamp is always strictly larger than the previous one. - /// This ensures that merging the update with the old value will result in keeping - /// the updated value. - pub fn update(&mut self, new_value: T) { - self.ts = std::cmp::max(self.ts + 1, now_msec()); - self.v = new_value; - } - - /// Get the CRDT value - pub fn get(&self) -> &T { - &self.v - } - - /// Get a mutable reference to the CRDT's value - /// - /// This is usefull to mutate the inside value without changing the LWW timestamp. - /// When such mutation is done, the merge between two LWW values is done using the inner - /// CRDT's merge operation. This is usefull in the case where the inner CRDT is a large - /// data type, such as a map, and we only want to change a single item in the map. - /// To do this, we can produce a "CRDT delta", i.e. a LWW that contains only the modification. - /// This delta consists in a LWW with the same timestamp, and the map - /// inside only contains the updated value. - /// The advantage of such a delta is that it is much smaller than the whole map. - /// - /// Avoid using this if the inner data type is a primitive type such as a number or a string, - /// as you will then rely on the merge function defined on `Ord` types by keeping the maximum - /// of both values. 
- pub fn get_mut(&mut self) -> &mut T { - &mut self.v - } -} - -impl CRDT for LWW -where - T: Clone + CRDT, -{ - fn merge(&mut self, other: &Self) { - if other.ts > self.ts { - self.ts = other.ts; - self.v = other.v.clone(); - } else if other.ts == self.ts { - self.v.merge(&other.v); - } - } -} - -/// Boolean, where `true` is an absorbing state -#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)] -pub struct Bool(bool); - -impl Bool { - /// Create a new boolean with the specified value - pub fn new(b: bool) -> Self { - Self(b) - } - /// Set the boolean to true - pub fn set(&mut self) { - self.0 = true; - } - /// Get the boolean value - pub fn get(&self) -> bool { - self.0 - } -} - -impl CRDT for Bool { - fn merge(&mut self, other: &Self) { - self.0 = self.0 || other.0; - } -} - -/// Last Write Win Map -/// -/// This types defines a CRDT for a map from keys to values. -/// The values have an associated timestamp, such that the last written value -/// takes precedence over previous ones. As for the simpler `LWW` type, the value -/// type `V` is also required to implement the CRDT trait. -/// We do not encourage mutating the values associated with a given key -/// without updating the timestamp, in fact at the moment we do not provide a `.get_mut()` -/// method that would allow that. -/// -/// Internally, the map is stored as a vector of keys and values, sorted by ascending key order. -/// This is why the key type `K` must implement `Ord` (and also to ensure a unique serialization, -/// such that two values can be compared for equality based on their hashes). As a consequence, -/// insertions take `O(n)` time. This means that LWWMap should be used for reasonably small maps. -/// However, note that even if we were using a more efficient data structure such as a `BTreeMap`, -/// the serialization cost `O(n)` would still have to be paid at each modification, so we are -/// actually not losing anything here. 
-#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] -pub struct LWWMap { - vals: Vec<(K, u64, V)>, -} - -impl LWWMap -where - K: Ord, - V: CRDT, -{ - /// Create a new empty map CRDT - pub fn new() -> Self { - Self { vals: vec![] } - } - /// Used to migrate from a map defined in an incompatible format. This produces - /// a map that contains a single item with the specified timestamp (copied from - /// the incompatible format). Do this as many times as you have items to migrate, - /// and put them all together using the CRDT merge operator. - pub fn migrate_from_raw_item(k: K, ts: u64, v: V) -> Self { - Self { - vals: vec![(k, ts, v)], - } - } - /// Returns a map that contains a single mapping from the specified key to the specified value. - /// This map is a mutator, or a delta-CRDT, such that when it is merged with the original map, - /// the previous value will be replaced with the one specified here. - /// The timestamp in the provided mutator is set to the maximum of the current system's clock - /// and 1 + the previous value's timestamp (if there is one), so that the new value will always - /// take precedence (LWW rule). - /// - /// Typically, to update the value associated to a key in the map, you would do the following: - /// - /// ```ignore - /// let my_update = my_crdt.update_mutator(key_to_modify, new_value); - /// my_crdt.merge(&my_update); - /// ``` - /// - /// However extracting the mutator on its own and only sending that on the network is very - /// interesting as it is much smaller than the whole map. - pub fn update_mutator(&self, k: K, new_v: V) -> Self { - let new_vals = match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) { - Ok(i) => { - let (_, old_ts, _) = self.vals[i]; - let new_ts = std::cmp::max(old_ts + 1, now_msec()); - vec![(k, new_ts, new_v)] - } - Err(_) => vec![(k, now_msec(), new_v)], - }; - Self { vals: new_vals } - } - /// Takes all of the values of the map and returns them. 
The current map is reset to the - /// empty map. This is very usefull to produce in-place a new map that contains only a delta - /// that modifies a certain value: - /// - /// ```ignore - /// let mut a = get_my_crdt_value(); - /// let old_a = a.take_and_clear(); - /// a.merge(&old_a.update_mutator(key_to_modify, new_value)); - /// put_my_crdt_value(a); - /// ``` - /// - /// Of course in this simple example we could have written simply - /// `pyt_my_crdt_value(a.update_mutator(key_to_modify, new_value))`, - /// but in the case where the map is a field in a struct for instance (as is always the case), - /// this becomes very handy: - /// - /// ```ignore - /// let mut a = get_my_crdt_value(); - /// let old_a_map = a.map_field.take_and_clear(); - /// a.map_field.merge(&old_a_map.update_mutator(key_to_modify, new_value)); - /// put_my_crdt_value(a); - /// ``` - pub fn take_and_clear(&mut self) -> Self { - let vals = std::mem::replace(&mut self.vals, vec![]); - Self { vals } - } - /// Removes all values from the map - pub fn clear(&mut self) { - self.vals.clear(); - } - /// Get a reference to the value assigned to a key - pub fn get(&self, k: &K) -> Option<&V> { - match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) { - Ok(i) => Some(&self.vals[i].2), - Err(_) => None, - } - } - /// Gets a reference to all of the items, as a slice. Usefull to iterate on all map values. - /// In most case you will want to ignore the timestamp (second item of the tuple). - pub fn items(&self) -> &[(K, u64, V)] { - &self.vals[..] 
- } -} - -impl CRDT for LWWMap -where - K: Clone + Ord, - V: Clone + CRDT, -{ - fn merge(&mut self, other: &Self) { - for (k, ts2, v2) in other.vals.iter() { - match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) { - Ok(i) => { - let (_, ts1, _v1) = &self.vals[i]; - if ts2 > ts1 { - self.vals[i].1 = *ts2; - self.vals[i].2 = v2.clone(); - } else if ts1 == ts2 { - self.vals[i].2.merge(&v2); - } - } - Err(i) => { - self.vals.insert(i, (k.clone(), *ts2, v2.clone())); - } - } - } - } -} diff --git a/src/table/crdt/bool.rs b/src/table/crdt/bool.rs new file mode 100644 index 00000000..1989c92e --- /dev/null +++ b/src/table/crdt/bool.rs @@ -0,0 +1,34 @@ +use serde::{Deserialize, Serialize}; + +use crate::crdt::crdt::*; + +/// Boolean, where `true` is an absorbing state +#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)] +pub struct Bool(bool); + +impl Bool { + /// Create a new boolean with the specified value + pub fn new(b: bool) -> Self { + Self(b) + } + /// Set the boolean to true + pub fn set(&mut self) { + self.0 = true; + } + /// Get the boolean value + pub fn get(&self) -> bool { + self.0 + } +} + +impl From for Bool { + fn from(b: bool) -> Bool { + Bool::new(b) + } +} + +impl CRDT for Bool { + fn merge(&mut self, other: &Self) { + self.0 = self.0 || other.0; + } +} diff --git a/src/table/crdt/crdt.rs b/src/table/crdt/crdt.rs new file mode 100644 index 00000000..636b6df6 --- /dev/null +++ b/src/table/crdt/crdt.rs @@ -0,0 +1,73 @@ +use garage_util::data::*; + +/// Definition of a CRDT - all CRDT Rust types implement this. +/// +/// A CRDT is defined as a merge operator that respects a certain set of axioms. +/// +/// In particular, the merge operator must be commutative, associative, +/// idempotent, and monotonic. 
+/// In other words, if `a`, `b` and `c` are CRDTs, and `⊔` denotes the merge operator, +/// the following axioms must apply: +/// +/// ```text +/// a ⊔ b = b ⊔ a (commutativity) +/// (a ⊔ b) ⊔ c = a ⊔ (b ⊔ c) (associativity) +/// (a ⊔ b) ⊔ b = a ⊔ b (idempotence) +/// ``` +/// +/// Moreover, the relationship `≥` defined by `a ≥ b ⇔ ∃c. a = b ⊔ c` must be a partial order. +/// This implies a few properties such as: if `a ⊔ b ≠ a`, then there is no `c` such that `(a ⊔ b) ⊔ c = a`, +/// as this would imply a cycle in the partial order. +pub trait CRDT { + /// Merge the two datastructures according to the CRDT rules. + /// `self` is modified to contain the merged CRDT value. `other` is not modified. + /// + /// # Arguments + /// + /// * `other` - the other CRDT we wish to merge with + fn merge(&mut self, other: &Self); +} + +/// All types that implement `Ord` (a total order) can also implement a trivial CRDT +/// defined by the merge rule: `a ⊔ b = max(a, b)`. Implement this trait for your type +/// to enable this behavior. +pub trait AutoCRDT: Ord + Clone + std::fmt::Debug { + /// WARN_IF_DIFFERENT: emit a warning when values differ. Set this to true if + /// different values in your application should never happen. Set this to false + /// if you are actually relying on the semantics of `a ⊔ b = max(a, b)`. 
+ const WARN_IF_DIFFERENT: bool; +} + +impl CRDT for T +where + T: AutoCRDT, +{ + fn merge(&mut self, other: &Self) { + if Self::WARN_IF_DIFFERENT && self != other { + warn!( + "Different CRDT values should be the same (logic error!): {:?} vs {:?}", + self, other + ); + if other > self { + *self = other.clone(); + } + warn!("Making an arbitrary choice: {:?}", self); + } else { + if other > self { + *self = other.clone(); + } + } + } +} + +impl AutoCRDT for String { + const WARN_IF_DIFFERENT: bool = true; +} + +impl AutoCRDT for bool { + const WARN_IF_DIFFERENT: bool = true; +} + +impl AutoCRDT for FixedBytes32 { + const WARN_IF_DIFFERENT: bool = true; +} diff --git a/src/table/crdt/lww.rs b/src/table/crdt/lww.rs new file mode 100644 index 00000000..9a3ab671 --- /dev/null +++ b/src/table/crdt/lww.rs @@ -0,0 +1,114 @@ +use serde::{Deserialize, Serialize}; + +use garage_util::data::now_msec; + +use crate::crdt::crdt::*; + +/// Last Write Win (LWW) +/// +/// An LWW CRDT associates a timestamp with a value, in order to implement a +/// time-based reconciliation rule: the most recent write wins. +/// For completeness, the LWW reconciliation rule must also be defined for two LWW CRDTs +/// with the same timestamp but different values. +/// +/// In our case, we add the constraint that the value that is wrapped inside the LWW CRDT must +/// itself be a CRDT: in the case when the timestamp does not allow us to decide on which value to +/// keep, the merge rule of the inner CRDT is applied on the wrapped values. (Note that types +/// that implement the `AutoCRDT` trait get a default CRDT implementation that keeps the maximum value. +/// This enables us to use LWW directly with primitive data types such as numbers or strings. It is +/// generally desirable in this case to never explicitly produce LWW values with the same timestamp +/// but different inner values, as the rule to keep the maximum value isn't generally the desired +/// semantics.) 
+/// +/// As multiple computers' clocks are always desynchronized, +/// when operations are close enough, it is equivalent to +/// take one copy and drop the other one. +/// +/// Given that clocks are not too desynchronized, this assumption +/// is enough for most cases, as there is little chance that two humans +/// coordinate themselves faster than the time difference between two NTP servers. +/// +/// As a more concrete example, let's suppose you want to upload a file +/// with the same key (path) in the same bucket at the very same time. +/// For each request, the file will be timestamped by the receiving server +/// and may differ from what you observed with your atomic clock! +/// +/// This scheme is used by AWS S3 or Soundcloud, and often unknowingly +/// in enterprises when reconciling databases with ad-hoc scripts. +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +pub struct LWW { + ts: u64, + v: T, +} + +impl LWW +where + T: CRDT, +{ + /// Creates a new CRDT + /// + /// CRDT's internal timestamp is set with current node's clock. + pub fn new(value: T) -> Self { + Self { + ts: now_msec(), + v: value, + } + } + + /// Build a new CRDT from a previous non-compatible one + /// + /// Compared to new, the CRDT's timestamp is not set to now + /// but must be set to the previous, non-compatible, CRDT's timestamp. + pub fn migrate_from_raw(ts: u64, value: T) -> Self { + Self { ts, v: value } + } + + /// Update the LWW CRDT while keeping some causal ordering. + /// + /// The timestamp of the LWW CRDT is updated to be the current node's clock + /// at time of update, or the previous timestamp + 1 if that's bigger, + /// so that the new timestamp is always strictly larger than the previous one. + /// This ensures that merging the update with the old value will result in keeping + /// the updated value. 
+ pub fn update(&mut self, new_value: T) { + self.ts = std::cmp::max(self.ts + 1, now_msec()); + self.v = new_value; + } + + /// Get the CRDT value + pub fn get(&self) -> &T { + &self.v + } + + /// Get a mutable reference to the CRDT's value + /// + /// This is useful to mutate the inside value without changing the LWW timestamp. + /// When such mutation is done, the merge between two LWW values is done using the inner + /// CRDT's merge operation. This is useful in the case where the inner CRDT is a large + /// data type, such as a map, and we only want to change a single item in the map. + /// To do this, we can produce a "CRDT delta", i.e. a LWW that contains only the modification. + /// This delta consists in a LWW with the same timestamp, and the map + /// inside only contains the updated value. + /// The advantage of such a delta is that it is much smaller than the whole map. + /// + /// Avoid using this if the inner data type is a primitive type such as a number or a string, + /// as you will then rely on the merge function defined on `Ord` types by keeping the maximum + /// of both values. + pub fn get_mut(&mut self) -> &mut T { + &mut self.v + } +} + +impl CRDT for LWW +where + T: Clone + CRDT, +{ + fn merge(&mut self, other: &Self) { + if other.ts > self.ts { + self.ts = other.ts; + self.v = other.v.clone(); + } else if other.ts == self.ts { + self.v.merge(&other.v); + } + } +} diff --git a/src/table/crdt/lww_map.rs b/src/table/crdt/lww_map.rs new file mode 100644 index 00000000..bd40f368 --- /dev/null +++ b/src/table/crdt/lww_map.rs @@ -0,0 +1,145 @@ +use serde::{Deserialize, Serialize}; + +use garage_util::data::now_msec; + +use crate::crdt::crdt::*; + +/// Last Write Win Map +/// +/// This type defines a CRDT for a map from keys to values. +/// The values have an associated timestamp, such that the last written value +/// takes precedence over previous ones. 
As for the simpler `LWW` type, the value +/// type `V` is also required to implement the CRDT trait. +/// We do not encourage mutating the values associated with a given key +/// without updating the timestamp, in fact at the moment we do not provide a `.get_mut()` +/// method that would allow that. +/// +/// Internally, the map is stored as a vector of keys and values, sorted by ascending key order. +/// This is why the key type `K` must implement `Ord` (and also to ensure a unique serialization, +/// such that two values can be compared for equality based on their hashes). As a consequence, +/// insertions take `O(n)` time. This means that LWWMap should be used for reasonably small maps. +/// However, note that even if we were using a more efficient data structure such as a `BTreeMap`, +/// the serialization cost `O(n)` would still have to be paid at each modification, so we are +/// actually not losing anything here. +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +pub struct LWWMap { + vals: Vec<(K, u64, V)>, +} + +impl LWWMap +where + K: Ord, + V: CRDT, +{ + /// Create a new empty map CRDT + pub fn new() -> Self { + Self { vals: vec![] } + } + /// Used to migrate from a map defined in an incompatible format. This produces + /// a map that contains a single item with the specified timestamp (copied from + /// the incompatible format). Do this as many times as you have items to migrate, + /// and put them all together using the CRDT merge operator. + pub fn migrate_from_raw_item(k: K, ts: u64, v: V) -> Self { + Self { + vals: vec![(k, ts, v)], + } + } + /// Returns a map that contains a single mapping from the specified key to the specified value. + /// This map is a mutator, or a delta-CRDT, such that when it is merged with the original map, + /// the previous value will be replaced with the one specified here. 
+ /// The timestamp in the provided mutator is set to the maximum of the current system's clock + /// and 1 + the previous value's timestamp (if there is one), so that the new value will always + /// take precedence (LWW rule). + /// + /// Typically, to update the value associated to a key in the map, you would do the following: + /// + /// ```ignore + /// let my_update = my_crdt.update_mutator(key_to_modify, new_value); + /// my_crdt.merge(&my_update); + /// ``` + /// + /// However extracting the mutator on its own and only sending that on the network is very + /// interesting as it is much smaller than the whole map. + pub fn update_mutator(&self, k: K, new_v: V) -> Self { + let new_vals = match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) { + Ok(i) => { + let (_, old_ts, _) = self.vals[i]; + let new_ts = std::cmp::max(old_ts + 1, now_msec()); + vec![(k, new_ts, new_v)] + } + Err(_) => vec![(k, now_msec(), new_v)], + }; + Self { vals: new_vals } + } + /// Takes all of the values of the map and returns them. The current map is reset to the + /// empty map. 
This is very useful to produce in-place a new map that contains only a delta + /// that modifies a certain value: + /// + /// ```ignore + /// let mut a = get_my_crdt_value(); + /// let old_a = a.take_and_clear(); + /// a.merge(&old_a.update_mutator(key_to_modify, new_value)); + /// put_my_crdt_value(a); + /// ``` + /// + /// Of course in this simple example we could have written simply + /// `put_my_crdt_value(a.update_mutator(key_to_modify, new_value))`, + /// but in the case where the map is a field in a struct for instance (as is always the case), + /// this becomes very handy: + /// + /// ```ignore + /// let mut a = get_my_crdt_value(); + /// let old_a_map = a.map_field.take_and_clear(); + /// a.map_field.merge(&old_a_map.update_mutator(key_to_modify, new_value)); + /// put_my_crdt_value(a); + /// ``` + pub fn take_and_clear(&mut self) -> Self { + let vals = std::mem::replace(&mut self.vals, vec![]); + Self { vals } + } + /// Removes all values from the map + pub fn clear(&mut self) { + self.vals.clear(); + } + /// Get a reference to the value assigned to a key + pub fn get(&self, k: &K) -> Option<&V> { + match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) { + Ok(i) => Some(&self.vals[i].2), + Err(_) => None, + } + } + /// Gets a reference to all of the items, as a slice. Useful to iterate on all map values. + /// In most cases you will want to ignore the timestamp (second item of the tuple). + pub fn items(&self) -> &[(K, u64, V)] { + &self.vals[..] 
+ } + /// Returns the number of items in the map + pub fn len(&self) -> usize { + self.vals.len() + } +} + +impl CRDT for LWWMap +where + K: Clone + Ord, + V: Clone + CRDT, +{ + fn merge(&mut self, other: &Self) { + for (k, ts2, v2) in other.vals.iter() { + match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) { + Ok(i) => { + let (_, ts1, _v1) = &self.vals[i]; + if ts2 > ts1 { + self.vals[i].1 = *ts2; + self.vals[i].2 = v2.clone(); + } else if ts1 == ts2 { + self.vals[i].2.merge(&v2); + } + } + Err(i) => { + self.vals.insert(i, (k.clone(), *ts2, v2.clone())); + } + } + } + } +} diff --git a/src/table/crdt/map.rs b/src/table/crdt/map.rs new file mode 100644 index 00000000..1193e6db --- /dev/null +++ b/src/table/crdt/map.rs @@ -0,0 +1,83 @@ +use serde::{Deserialize, Serialize}; + +use crate::crdt::crdt::*; + +/// Simple CRDT Map +/// +/// This type defines a CRDT for a map from keys to values. Values are CRDT types which +/// can have their own updating logic. +/// +/// Internally, the map is stored as a vector of keys and values, sorted by ascending key order. +/// This is why the key type `K` must implement `Ord` (and also to ensure a unique serialization, +/// such that two values can be compared for equality based on their hashes). As a consequence, +/// insertions take `O(n)` time. This means that Map should be used for reasonably small maps. +/// However, note that even if we were using a more efficient data structure such as a `BTreeMap`, +/// the serialization cost `O(n)` would still have to be paid at each modification, so we are +/// actually not losing anything here. +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +pub struct Map { + vals: Vec<(K, V)>, +} + +impl Map +where + K: Clone + Ord, + V: Clone + CRDT, +{ + /// Create a new empty map CRDT + pub fn new() -> Self { + Self { vals: vec![] } + } + + /// Returns a map that contains a single mapping from the specified key to the specified value. 
+ /// This can be used to build a delta-mutator: + /// when merged with another map, the value will be added or CRDT-merged if a previous + /// value already exists. + pub fn put_mutator(k: K, v: V) -> Self { + Self { vals: vec![(k, v)] } + } + + pub fn put(&mut self, k: K, v: V) { + self.merge(&Self::put_mutator(k, v)); + } + + /// Removes all values from the map + pub fn clear(&mut self) { + self.vals.clear(); + } + + /// Get a reference to the value assigned to a key + pub fn get(&self, k: &K) -> Option<&V> { + match self.vals.binary_search_by(|(k2, _)| k2.cmp(&k)) { + Ok(i) => Some(&self.vals[i].1), + Err(_) => None, + } + } + /// Gets a reference to all of the items, as a slice. Useful to iterate on all map values. + pub fn items(&self) -> &[(K, V)] { + &self.vals[..] + } + /// Returns the number of items in the map + pub fn len(&self) -> usize { + self.vals.len() + } +} + +impl CRDT for Map +where + K: Clone + Ord, + V: Clone + CRDT, +{ + fn merge(&mut self, other: &Self) { + for (k, v2) in other.vals.iter() { + match self.vals.binary_search_by(|(k2, _)| k2.cmp(&k)) { + Ok(i) => { + self.vals[i].1.merge(&v2); + } + Err(i) => { + self.vals.insert(i, (k.clone(), v2.clone())); + } + } + } + } +} diff --git a/src/table/crdt/mod.rs b/src/table/crdt/mod.rs new file mode 100644 index 00000000..eb75d061 --- /dev/null +++ b/src/table/crdt/mod.rs @@ -0,0 +1,22 @@ +//! This package provides a simple implementation of conflict-free replicated data types (CRDTs) +//! +//! CRDTs are a type of data structure that does not require coordination. In other words, we can +//! edit them in parallel, we will always find a way to merge them. +//! +//! A general example is a counter. Its initial value is 0. Alice and Bob get a copy of the +//! counter. Alice does +1 on her copy, she reads 1. Bob does +3 on his copy, he reads 3. Now, +//! it is easy to merge their counters, order does not count: we always get 4. +//! +//! 
Learn more about CRDT [on Wikipedia](https://en.wikipedia.org/wiki/Conflict-free_replicated_data_type) + +mod bool; +mod crdt; +mod lww; +mod lww_map; +mod map; + +pub use self::bool::*; +pub use crdt::*; +pub use lww::*; +pub use lww_map::*; +pub use map::*; diff --git a/src/table/schema.rs b/src/table/schema.rs index edd04000..5b789a02 100644 --- a/src/table/schema.rs +++ b/src/table/schema.rs @@ -2,6 +2,8 @@ use serde::{Deserialize, Serialize}; use garage_util::data::*; +use crate::crdt::CRDT; + pub trait PartitionKey { fn hash(&self) -> Hash; } @@ -35,12 +37,10 @@ impl SortKey for Hash { } pub trait Entry: - PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + CRDT + PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync { fn partition_key(&self) -> &P; fn sort_key(&self) -> &S; - - fn merge(&mut self, other: &Self); } pub trait TableSchema: Send + Sync { diff --git a/src/table/table.rs b/src/table/table.rs index 1f6b7d25..366ce925 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -17,6 +17,7 @@ use garage_rpc::ring::Ring; use garage_rpc::rpc_client::*; use garage_rpc::rpc_server::*; +use crate::crdt::CRDT; use crate::schema::*; use crate::table_sync::*; -- cgit v1.2.3 From 2afd2c81baacf66f00333de3f0fb18df0b497b41 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 10 Mar 2021 16:23:57 +0100 Subject: Change hash function to blake2 for partition keys based on strings --- src/table/schema.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/table') diff --git a/src/table/schema.rs b/src/table/schema.rs index 5b789a02..7fbb7b25 100644 --- a/src/table/schema.rs +++ b/src/table/schema.rs @@ -10,7 +10,7 @@ pub trait PartitionKey { impl PartitionKey for String { fn hash(&self) -> Hash { - sha256sum(self.as_bytes()) + blake2sum(self.as_bytes()) } } -- cgit v1.2.3 From 3214dd52dd144c99353830d7340ea158e262b06f Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 10 Mar 2021 21:50:09 +0100 Subject: 
Very minor changes --- src/table/table_sharded.rs | 1 + src/table/table_sync.rs | 12 ++++++++++-- 2 files changed, 11 insertions(+), 2 deletions(-) (limited to 'src/table') diff --git a/src/table/table_sharded.rs b/src/table/table_sharded.rs index 098637dd..47bdfeaf 100644 --- a/src/table/table_sharded.rs +++ b/src/table/table_sharded.rs @@ -44,6 +44,7 @@ impl TableReplication for TableShardedReplication { fn split_points(&self, ring: &Ring) -> Vec { let mut ret = vec![]; + ret.push([0u8; 32].into()); for entry in ring.ring.iter() { ret.push(entry.location); } diff --git a/src/table/table_sync.rs b/src/table/table_sync.rs index 5fa6793b..c38b6bd5 100644 --- a/src/table/table_sync.rs +++ b/src/table/table_sync.rs @@ -18,10 +18,14 @@ use garage_util::error::Error; use crate::*; const MAX_DEPTH: usize = 16; -const SCAN_INTERVAL: Duration = Duration::from_secs(3600); -const CHECKSUM_CACHE_TIMEOUT: Duration = Duration::from_secs(1800); const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(30); +// Scan & sync every 12 hours +const SCAN_INTERVAL: Duration = Duration::from_secs(12 * 60 * 60); + +// Expire cache after 30 minutes +const CHECKSUM_CACHE_TIMEOUT: Duration = Duration::from_secs(30 * 60); + pub struct TableSyncer { table: Arc>, todo: Mutex, @@ -797,6 +801,10 @@ impl SyncTodo { for i in 0..split_points.len() - 1 { let begin = split_points[i]; let end = split_points[i + 1]; + if begin == end { + continue; + } + let nodes = table.replication.replication_nodes(&begin, &ring); let retain = nodes.contains(&my_id); -- cgit v1.2.3 From 8d63738cb062e816fc01c6aa2b32936ad31ff65b Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 11 Mar 2021 13:47:21 +0100 Subject: Checkpoint: add merkle tree in data table --- src/table/lib.rs | 1 + src/table/merkle.rs | 352 ++++++++++++++++++++++++++++++++++++++++++++++++ src/table/table.rs | 91 +++++++++---- src/table/table_sync.rs | 28 ++-- 4 files changed, 425 insertions(+), 47 deletions(-) create mode 100644 
src/table/merkle.rs (limited to 'src/table') diff --git a/src/table/lib.rs b/src/table/lib.rs index 704f8f1e..62fd30c5 100644 --- a/src/table/lib.rs +++ b/src/table/lib.rs @@ -7,6 +7,7 @@ pub mod crdt; pub mod schema; pub mod util; +pub mod merkle; pub mod table; pub mod table_fullcopy; pub mod table_sharded; diff --git a/src/table/merkle.rs b/src/table/merkle.rs new file mode 100644 index 00000000..50cb90d5 --- /dev/null +++ b/src/table/merkle.rs @@ -0,0 +1,352 @@ +use std::convert::TryInto; +use std::sync::Arc; +use std::time::Duration; + +use futures::select; +use futures_util::future::*; +use log::{info, warn}; +use serde::{Deserialize, Serialize}; +use sled::transaction::{ + ConflictableTransactionError, ConflictableTransactionResult, TransactionalTree, +}; +use tokio::sync::{watch, Notify}; + +use garage_util::background::BackgroundRunner; +use garage_util::data::*; +use garage_util::error::Error; + +// This modules partitions the data in 2**16 partitions, based on the top +// 16 bits (two bytes) of item's partition keys' hashes. +// It builds one Merkle tree for each of these 2**16 partitions. + +pub(crate) struct MerkleUpdater { + table_name: String, + background: Arc, + + // Content of the todo tree: items where + // - key = the key of an item in the main table, ie hash(partition_key)+sort_key + // - value = the hash of the full serialized item, if present, + // or an empty vec if item is absent (deleted) + pub(crate) todo: sled::Tree, + pub(crate) todo_notify: Notify, + + // Content of the merkle tree: items where + // - key = .bytes() for MerkleNodeKey + // - value = serialization of a MerkleNode, assumed to be MerkleNode::empty if not found + pub(crate) merkle_tree: sled::Tree, + empty_node_hash: Hash, +} + +#[derive(Clone)] +pub struct MerkleNodeKey { + // partition: first 16 bits (two bytes) of the partition_key's hash + pub partition: [u8; 2], + + // prefix: a prefix for the hash of full keys, i.e. 
hash(hash(partition_key)+sort_key) + pub prefix: Vec, +} + +#[derive(PartialEq, Eq, Debug, Serialize, Deserialize)] +pub enum MerkleNode { + // The empty Merkle node + Empty, + + // An intermediate Merkle tree node for a prefix + // Contains the hashes of the 256 possible next prefixes + Intermediate(Vec<(u8, Hash)>), + + // A final node for an item + // Contains the full key of the item and the hash of the value + Leaf(Vec, Hash), +} + +impl MerkleUpdater { + pub(crate) fn new( + table_name: String, + background: Arc, + todo: sled::Tree, + merkle_tree: sled::Tree, + ) -> Arc { + let empty_node_hash = blake2sum(&rmp_to_vec_all_named(&MerkleNode::Empty).unwrap()[..]); + + Arc::new(Self { + table_name, + background, + todo, + todo_notify: Notify::new(), + merkle_tree, + empty_node_hash, + }) + } + + pub(crate) fn launch(self: &Arc) { + let self2 = self.clone(); + self.background.spawn_worker( + format!("Merkle tree updater for {}", self.table_name), + |must_exit: watch::Receiver| self2.updater_loop(must_exit), + ); + } + + async fn updater_loop( + self: Arc, + mut must_exit: watch::Receiver, + ) -> Result<(), Error> { + while !*must_exit.borrow() { + if let Some(x) = self.todo.iter().next() { + match x { + Ok((key, valhash)) => { + if let Err(e) = self.update_item(&key[..], &valhash[..]) { + warn!("Error while updating Merkle tree item: {}", e); + } + } + Err(e) => { + warn!("Error while iterating on Merkle todo tree: {}", e); + tokio::time::delay_for(Duration::from_secs(10)).await; + } + } + } else { + select! 
{ + _ = self.todo_notify.notified().fuse() => (), + _ = must_exit.recv().fuse() => (), + } + } + } + Ok(()) + } + + fn update_item(&self, k: &[u8], vhash_by: &[u8]) -> Result<(), Error> { + let khash = blake2sum(k); + + let new_vhash = if vhash_by.len() == 0 { + None + } else { + let vhash_by: [u8; 32] = vhash_by + .try_into() + .map_err(|_| Error::Message(format!("Invalid value in Merkle todo table")))?; + Some(Hash::from(vhash_by)) + }; + + let key = MerkleNodeKey { + partition: k[0..2].try_into().unwrap(), + prefix: vec![], + }; + self.merkle_tree + .transaction(|tx| self.update_item_rec(tx, k, khash, &key, new_vhash))?; + + let deleted = self + .todo + .compare_and_swap::<_, _, Vec>(k, Some(vhash_by), None)? + .is_ok(); + + if !deleted { + info!( + "Item not deleted from Merkle todo because it changed: {:?}", + k + ); + } + Ok(()) + } + + fn update_item_rec( + &self, + tx: &TransactionalTree, + k: &[u8], + khash: Hash, + key: &MerkleNodeKey, + new_vhash: Option, + ) -> ConflictableTransactionResult, Error> { + let i = key.prefix.len(); + let mutate = match self.read_node_txn(tx, &key)? { + MerkleNode::Empty => { + if let Some(vhv) = new_vhash { + Some(MerkleNode::Leaf(k.to_vec(), vhv)) + } else { + None + } + } + MerkleNode::Intermediate(mut children) => { + let key2 = key.next_key(khash); + if let Some(subhash) = self.update_item_rec(tx, k, khash, &key2, new_vhash)? 
{ + if subhash == self.empty_node_hash { + intermediate_rm_child(&mut children, key2.prefix[i]); + } else { + intermediate_set_child(&mut children, key2.prefix[i], subhash); + } + if children.len() == 0 { + // should not happen + warn!("Replacing intermediate node with empty node, should not happen."); + Some(MerkleNode::Empty) + } else if children.len() == 1 { + // move node down to this level + let key_sub = key.add_byte(children[0].0); + let subnode = self.read_node_txn(tx, &key_sub)?; + tx.remove(key_sub.encode())?; + Some(subnode) + } else { + Some(MerkleNode::Intermediate(children)) + } + } else { + None + } + } + MerkleNode::Leaf(exlf_key, exlf_hash) => { + if exlf_key == k { + match new_vhash { + Some(vhv) if vhv == exlf_hash => None, + Some(vhv) => Some(MerkleNode::Leaf(k.to_vec(), vhv)), + None => Some(MerkleNode::Empty), + } + } else { + if let Some(vhv) = new_vhash { + // Create two sub-nodes and replace by intermediary node + let (pos1, h1) = { + let key2 = key.next_key(blake2sum(&exlf_key[..])); + let subhash = + self.put_node_txn(tx, &key2, &MerkleNode::Leaf(exlf_key, exlf_hash))?; + (key2.prefix[i], subhash) + }; + let (pos2, h2) = { + let key2 = key.next_key(khash); + let subhash = + self.put_node_txn(tx, &key2, &MerkleNode::Leaf(k.to_vec(), vhv))?; + (key2.prefix[i], subhash) + }; + let mut int = vec![]; + intermediate_set_child(&mut int, pos1, h1); + intermediate_set_child(&mut int, pos2, h2); + Some(MerkleNode::Intermediate(int)) + } else { + None + } + } + } + }; + + if let Some(new_node) = mutate { + let hash = self.put_node_txn(tx, &key, &new_node)?; + Ok(Some(hash)) + } else { + Ok(None) + } + } + + // Merkle tree node manipulation + + fn read_node_txn( + &self, + tx: &TransactionalTree, + k: &MerkleNodeKey, + ) -> ConflictableTransactionResult { + let ent = tx.get(k.encode())?; + match ent { + None => Ok(MerkleNode::Empty), + Some(v) => Ok(rmp_serde::decode::from_read_ref::<_, MerkleNode>(&v[..]) + .map_err(|e| 
ConflictableTransactionError::Abort(e.into()))?), + } + } + + fn put_node_txn( + &self, + tx: &TransactionalTree, + k: &MerkleNodeKey, + v: &MerkleNode, + ) -> ConflictableTransactionResult { + if *v == MerkleNode::Empty { + tx.remove(k.encode())?; + Ok(self.empty_node_hash) + } else { + let vby = rmp_to_vec_all_named(v) + .map_err(|e| ConflictableTransactionError::Abort(e.into()))?; + let rethash = blake2sum(&vby[..]); + tx.insert(k.encode(), vby)?; + Ok(rethash) + } + } + + pub(crate) fn read_node( + &self, + k: &MerkleNodeKey, + ) -> Result { + let ent = self.merkle_tree.get(k.encode())?; + match ent { + None => Ok(MerkleNode::Empty), + Some(v) => Ok(rmp_serde::decode::from_read_ref::<_, MerkleNode>(&v[..])?) + } + } +} + +impl MerkleNodeKey { + fn encode(&self) -> Vec { + let mut ret = Vec::with_capacity(2 + self.prefix.len()); + ret.extend(&self.partition[..]); + ret.extend(&self.prefix[..]); + ret + } + + pub fn next_key(&self, h: Hash) -> Self { + assert!(&h.as_slice()[0..self.prefix.len()] == &self.prefix[..]); + let mut s2 = self.clone(); + s2.prefix.push(h.as_slice()[self.prefix.len()]); + s2 + } + + pub fn add_byte(&self, b: u8) -> Self { + let mut s2 = self.clone(); + s2.prefix.push(b); + s2 + } +} + +fn intermediate_set_child(ch: &mut Vec<(u8, Hash)>, pos: u8, v: Hash) { + for i in 0..ch.len() { + if ch[i].0 == pos { + ch[i].1 = v; + return; + } else if ch[i].0 > pos { + ch.insert(i, (pos, v)); + return; + } + } + ch.insert(ch.len(), (pos, v)); +} + +fn intermediate_rm_child(ch: &mut Vec<(u8, Hash)>, pos: u8) { + for i in 0..ch.len() { + if ch[i].0 == pos { + ch.remove(i); + return; + } + } +} + +#[test] +fn test_intermediate_aux() { + let mut v = vec![]; + + intermediate_set_child(&mut v, 12u8, [12u8; 32].into()); + assert!(v == vec![(12u8, [12u8; 32].into())]); + + intermediate_set_child(&mut v, 42u8, [42u8; 32].into()); + assert!(v == vec![(12u8, [12u8; 32].into()), (42u8, [42u8; 32].into())]); + + intermediate_set_child(&mut v, 4u8, [4u8; 
32].into()); + assert!(v == vec![(4u8, [4u8; 32].into()), (12u8, [12u8; 32].into()), (42u8, [42u8; 32].into())]); + + intermediate_set_child(&mut v, 12u8, [8u8; 32].into()); + assert!(v == vec![(4u8, [4u8; 32].into()), (12u8, [8u8; 32].into()), (42u8, [42u8; 32].into())]); + + intermediate_set_child(&mut v, 6u8, [6u8; 32].into()); + assert!(v == vec![(4u8, [4u8; 32].into()), (6u8, [6u8; 32].into()), (12u8, [8u8; 32].into()), (42u8, [42u8; 32].into())]); + + intermediate_rm_child(&mut v, 42u8); + assert!(v == vec![(4u8, [4u8; 32].into()), (6u8, [6u8; 32].into()), (12u8, [8u8; 32].into())]); + + intermediate_rm_child(&mut v, 11u8); + assert!(v == vec![(4u8, [4u8; 32].into()), (6u8, [6u8; 32].into()), (12u8, [8u8; 32].into())]); + + intermediate_rm_child(&mut v, 6u8); + assert!(v == vec![(4u8, [4u8; 32].into()), (12u8, [8u8; 32].into())]); + + intermediate_set_child(&mut v, 6u8, [7u8; 32].into()); + assert!(v == vec![(4u8, [4u8; 32].into()), (6u8, [7u8; 32].into()), (12u8, [8u8; 32].into())]); +} diff --git a/src/table/table.rs b/src/table/table.rs index 366ce925..0e75754c 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -8,6 +8,7 @@ use arc_swap::ArcSwapOption; use futures::stream::*; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; +use sled::Transactional; use garage_util::data::*; use garage_util::error::Error; @@ -18,6 +19,7 @@ use garage_rpc::rpc_client::*; use garage_rpc::rpc_server::*; use crate::crdt::CRDT; +use crate::merkle::*; use crate::schema::*; use crate::table_sync::*; @@ -33,6 +35,7 @@ pub struct Table { pub system: Arc, pub store: sled::Tree, pub syncer: ArcSwapOption>, + merkle_updater: Arc, } #[derive(Serialize, Deserialize)] @@ -77,7 +80,7 @@ where { // =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) =============== - pub async fn new( + pub fn new( instance: F, replication: R, system: Arc, @@ -85,11 +88,27 @@ where name: String, rpc_server: &mut RpcServer, ) -> Arc { - let store = 
db.open_tree(&name).expect("Unable to open DB tree"); + let store = db + .open_tree(&format!("{}:table", name)) + .expect("Unable to open DB tree"); + + let merkle_todo_store = db + .open_tree(&format!("{}:merkle_todo", name)) + .expect("Unable to open DB Merkle TODO tree"); + let merkle_tree_store = db + .open_tree(&format!("{}:merkle_tree", name)) + .expect("Unable to open DB Merkle tree tree"); let rpc_path = format!("table_{}", name); let rpc_client = system.rpc_client::>(&rpc_path); + let merkle_updater = MerkleUpdater::new( + name.clone(), + system.background.clone(), + merkle_todo_store, + merkle_tree_store, + ); + let table = Arc::new(Self { instance, replication, @@ -98,12 +117,15 @@ where system, store, syncer: ArcSwapOption::from(None), + merkle_updater, }); table.clone().register_handler(rpc_server, rpc_path); - let syncer = TableSyncer::launch(table.clone()).await; + let syncer = TableSyncer::launch(table.clone()); table.syncer.swap(Some(syncer)); + table.merkle_updater.launch(); + table } @@ -322,7 +344,7 @@ where Ok(TableRPC::Update(values)) } TableRPC::Update(pairs) => { - self.handle_update(pairs).await?; + self.handle_update(pairs)?; Ok(TableRPC::Ok) } TableRPC::SyncRPC(rpc) => { @@ -380,53 +402,64 @@ where Ok(ret) } - pub async fn handle_update(self: &Arc, entries: &[Arc]) -> Result<(), Error> { - let syncer = self.syncer.load_full().unwrap(); + // ========== CODE THAT ACTUALLY MODIFIES THE TREE ================ + pub fn handle_update(self: &Arc, entries: &[Arc]) -> Result<(), Error> { for update_bytes in entries.iter() { - let update = self.decode_entry(update_bytes.as_slice())?; - - let tree_key = self.tree_key(update.partition_key(), update.sort_key()); - - let (old_entry, new_entry) = self.store.transaction(|db| { - let (old_entry, new_entry) = match db.get(&tree_key)? 
{ - Some(prev_bytes) => { - let old_entry = self - .decode_entry(&prev_bytes) - .map_err(sled::transaction::ConflictableTransactionError::Abort)?; - let mut new_entry = old_entry.clone(); - new_entry.merge(&update); - (Some(old_entry), new_entry) - } - None => (None, update.clone()), - }; + self.update_entry(update_bytes.as_slice())?; + } + Ok(()) + } + pub(crate) fn update_entry(self: &Arc, update_bytes: &[u8]) -> Result<(), Error> { + let update = self.decode_entry(update_bytes)?; + let tree_key = self.tree_key(update.partition_key(), update.sort_key()); + + let changed = (&self.store, &self.merkle_updater.todo).transaction(|(db, mkl_todo)| { + let (old_entry, new_entry) = match db.get(&tree_key)? { + Some(prev_bytes) => { + let old_entry = self + .decode_entry(&prev_bytes) + .map_err(sled::transaction::ConflictableTransactionError::Abort)?; + let mut new_entry = old_entry.clone(); + new_entry.merge(&update); + (Some(old_entry), new_entry) + } + None => (None, update.clone()), + }; + + if Some(&new_entry) != old_entry.as_ref() { let new_bytes = rmp_to_vec_all_named(&new_entry) .map_err(Error::RMPEncode) .map_err(sled::transaction::ConflictableTransactionError::Abort)?; + mkl_todo.insert(tree_key.clone(), blake2sum(&new_bytes[..]).to_vec())?; db.insert(tree_key.clone(), new_bytes)?; - Ok((old_entry, new_entry)) - })?; - - if old_entry.as_ref() != Some(&new_entry) { - self.instance.updated(old_entry, Some(new_entry)); - syncer.invalidate(&tree_key[..]); + Ok(Some((old_entry, new_entry))) + } else { + Ok(None) } + })?; + + if let Some((old_entry, new_entry)) = changed { + self.instance.updated(old_entry, Some(new_entry)); + self.syncer.load_full().unwrap().invalidate(&tree_key[..]); } Ok(()) } pub(crate) fn delete_if_equal(self: &Arc, k: &[u8], v: &[u8]) -> Result { - let removed = self.store.transaction(|txn| { + let removed = (&self.store, &self.merkle_updater.todo).transaction(|(txn, mkl_todo)| { if let Some(cur_v) = txn.get(k)? 
{ if cur_v == v { txn.remove(k)?; + mkl_todo.insert(k, vec![])?; return Ok(true); } } Ok(false) })?; + if removed { let old_entry = self.decode_entry(v)?; self.instance.updated(Some(old_entry), None); diff --git a/src/table/table_sync.rs b/src/table/table_sync.rs index c38b6bd5..51f8cd6f 100644 --- a/src/table/table_sync.rs +++ b/src/table/table_sync.rs @@ -106,7 +106,7 @@ where F: TableSchema + 'static, R: TableReplication + 'static, { - pub(crate) async fn launch(table: Arc>) -> Arc { + pub(crate) fn launch(table: Arc>) -> Arc { let todo = SyncTodo { todo: Vec::new() }; let syncer = Arc::new(TableSyncer { table: table.clone(), @@ -119,24 +119,16 @@ where let (busy_tx, busy_rx) = mpsc::unbounded_channel(); let s1 = syncer.clone(); - table - .system - .background - .spawn_worker( - format!("table sync watcher for {}", table.name), - move |must_exit: watch::Receiver| s1.watcher_task(must_exit, busy_rx), - ) - .await; + table.system.background.spawn_worker( + format!("table sync watcher for {}", table.name), + move |must_exit: watch::Receiver| s1.watcher_task(must_exit, busy_rx), + ); let s2 = syncer.clone(); - table - .system - .background - .spawn_worker( - format!("table syncer for {}", table.name), - move |must_exit: watch::Receiver| s2.syncer_task(must_exit, busy_tx), - ) - .await; + table.system.background.spawn_worker( + format!("table syncer for {}", table.name), + move |must_exit: watch::Receiver| s2.syncer_task(must_exit, busy_tx), + ); let s3 = syncer.clone(); tokio::spawn(async move { @@ -630,7 +622,7 @@ where } } if diff_items.len() > 0 { - self.table.handle_update(&diff_items[..]).await?; + self.table.handle_update(&diff_items[..])?; } if items_to_send.len() > 0 { self.send_items(who, items_to_send).await?; -- cgit v1.2.3 From 94f3d287742ff90f179f528421c690b00b71a912 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 11 Mar 2021 16:54:15 +0100 Subject: WIP big refactoring --- src/table/Cargo.toml | 1 - src/table/data.rs | 189 
+++++++++++++++++++++++++ src/table/lib.rs | 4 +- src/table/merkle.rs | 39 ++++-- src/table/replication/fullcopy.rs | 59 ++++++++ src/table/replication/mod.rs | 6 + src/table/replication/parameters.rs | 22 +++ src/table/replication/sharded.rs | 54 ++++++++ src/table/table.rs | 265 +++++++----------------------------- src/table/table_fullcopy.rs | 59 -------- src/table/table_sharded.rs | 54 -------- src/table/table_sync.rs | 129 +++++++++--------- 12 files changed, 479 insertions(+), 402 deletions(-) create mode 100644 src/table/data.rs create mode 100644 src/table/replication/fullcopy.rs create mode 100644 src/table/replication/mod.rs create mode 100644 src/table/replication/parameters.rs create mode 100644 src/table/replication/sharded.rs delete mode 100644 src/table/table_fullcopy.rs delete mode 100644 src/table/table_sharded.rs (limited to 'src/table') diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml index 6485a542..6b3aaceb 100644 --- a/src/table/Cargo.toml +++ b/src/table/Cargo.toml @@ -19,7 +19,6 @@ garage_rpc = { version = "0.1.1", path = "../rpc" } bytes = "0.4" rand = "0.7" hex = "0.3" -arc-swap = "0.4" log = "0.4" hexdump = "0.1" diff --git a/src/table/data.rs b/src/table/data.rs new file mode 100644 index 00000000..fa89fc27 --- /dev/null +++ b/src/table/data.rs @@ -0,0 +1,189 @@ +use std::sync::Arc; + +use log::warn; +use sled::Transactional; +use serde_bytes::ByteBuf; + +use garage_util::data::*; +use garage_util::error::*; +use garage_util::background::BackgroundRunner; + +use crate::schema::*; +use crate::merkle::*; +use crate::crdt::CRDT; + +pub struct TableData { + pub name: String, + pub instance: F, + + pub store: sled::Tree, + pub(crate) merkle_updater: Arc, +} + +impl TableData where F: TableSchema { + pub fn new( + name: String, + instance: F, + db: &sled::Db, + background: Arc, + ) -> Arc { + let store = db + .open_tree(&format!("{}:table", name)) + .expect("Unable to open DB tree"); + + let merkle_todo_store = db + 
.open_tree(&format!("{}:merkle_todo", name)) + .expect("Unable to open DB Merkle TODO tree"); + let merkle_tree_store = db + .open_tree(&format!("{}:merkle_tree", name)) + .expect("Unable to open DB Merkle tree tree"); + + let merkle_updater = MerkleUpdater::launch( + name.clone(), + background, + merkle_todo_store, + merkle_tree_store, + ); + + Arc::new(Self{ + name, + instance, + store, + merkle_updater, + }) + } + + // Read functions + + pub fn read_entry(&self, p: &F::P, s: &F::S) -> Result, Error> { + let tree_key = self.tree_key(p, s); + if let Some(bytes) = self.store.get(&tree_key)? { + Ok(Some(ByteBuf::from(bytes.to_vec()))) + } else { + Ok(None) + } + } + + pub fn read_range( + &self, + p: &F::P, + s: &Option, + filter: &Option, + limit: usize, + ) -> Result>, Error> { + let partition_hash = p.hash(); + let first_key = match s { + None => partition_hash.to_vec(), + Some(sk) => self.tree_key(p, sk), + }; + let mut ret = vec![]; + for item in self.store.range(first_key..) { + let (key, value) = item?; + if &key[..32] != partition_hash.as_slice() { + break; + } + let keep = match filter { + None => true, + Some(f) => { + let entry = self.decode_entry(value.as_ref())?; + F::matches_filter(&entry, f) + } + }; + if keep { + ret.push(Arc::new(ByteBuf::from(value.as_ref()))); + } + if ret.len() >= limit { + break; + } + } + Ok(ret) + } + + // Mutation functions + + pub(crate) fn update_many(&self, entries: &[Arc]) -> Result<(), Error> { + for update_bytes in entries.iter() { + self.update_entry(update_bytes.as_slice())?; + } + Ok(()) + } + + pub(crate) fn update_entry(&self, update_bytes: &[u8]) -> Result<(), Error> { + let update = self.decode_entry(update_bytes)?; + let tree_key = self.tree_key(update.partition_key(), update.sort_key()); + + let changed = (&self.store, &self.merkle_updater.todo).transaction(|(db, mkl_todo)| { + let (old_entry, new_entry) = match db.get(&tree_key)? 
{ + Some(prev_bytes) => { + let old_entry = self + .decode_entry(&prev_bytes) + .map_err(sled::transaction::ConflictableTransactionError::Abort)?; + let mut new_entry = old_entry.clone(); + new_entry.merge(&update); + (Some(old_entry), new_entry) + } + None => (None, update.clone()), + }; + + if Some(&new_entry) != old_entry.as_ref() { + let new_bytes = rmp_to_vec_all_named(&new_entry) + .map_err(Error::RMPEncode) + .map_err(sled::transaction::ConflictableTransactionError::Abort)?; + mkl_todo.insert(tree_key.clone(), blake2sum(&new_bytes[..]).to_vec())?; + db.insert(tree_key.clone(), new_bytes)?; + Ok(Some((old_entry, new_entry))) + } else { + Ok(None) + } + })?; + + if let Some((old_entry, new_entry)) = changed { + self.instance.updated(old_entry, Some(new_entry)); + //self.syncer.load_full().unwrap().invalidate(&tree_key[..]); + } + + Ok(()) + } + + pub(crate) fn delete_if_equal(self: &Arc, k: &[u8], v: &[u8]) -> Result { + let removed = (&self.store, &self.merkle_updater.todo).transaction(|(txn, mkl_todo)| { + if let Some(cur_v) = txn.get(k)? 
{ + if cur_v == v { + txn.remove(k)?; + mkl_todo.insert(k, vec![])?; + return Ok(true); + } + } + Ok(false) + })?; + + if removed { + let old_entry = self.decode_entry(v)?; + self.instance.updated(Some(old_entry), None); + //self.syncer.load_full().unwrap().invalidate(k); + } + Ok(removed) + } + + pub(crate) fn tree_key(&self, p: &F::P, s: &F::S) -> Vec { + let mut ret = p.hash().to_vec(); + ret.extend(s.sort_key()); + ret + } + + pub(crate) fn decode_entry(&self, bytes: &[u8]) -> Result { + match rmp_serde::decode::from_read_ref::<_, F::E>(bytes) { + Ok(x) => Ok(x), + Err(e) => match F::try_migrate(bytes) { + Some(x) => Ok(x), + None => { + warn!("Unable to decode entry of {}: {}", self.name, e); + for line in hexdump::hexdump_iter(bytes) { + debug!("{}", line); + } + Err(e.into()) + } + }, + } + } +} diff --git a/src/table/lib.rs b/src/table/lib.rs index 62fd30c5..bb249a56 100644 --- a/src/table/lib.rs +++ b/src/table/lib.rs @@ -8,9 +8,9 @@ pub mod schema; pub mod util; pub mod merkle; +pub mod replication; +pub mod data; pub mod table; -pub mod table_fullcopy; -pub mod table_sharded; pub mod table_sync; pub use schema::*; diff --git a/src/table/merkle.rs b/src/table/merkle.rs index 50cb90d5..ef197dc8 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -61,7 +61,7 @@ pub enum MerkleNode { } impl MerkleUpdater { - pub(crate) fn new( + pub(crate) fn launch( table_name: String, background: Arc, todo: sled::Tree, @@ -69,22 +69,22 @@ impl MerkleUpdater { ) -> Arc { let empty_node_hash = blake2sum(&rmp_to_vec_all_named(&MerkleNode::Empty).unwrap()[..]); - Arc::new(Self { + let ret = Arc::new(Self { table_name, background, todo, todo_notify: Notify::new(), merkle_tree, empty_node_hash, - }) - } + }); - pub(crate) fn launch(self: &Arc) { - let self2 = self.clone(); - self.background.spawn_worker( - format!("Merkle tree updater for {}", self.table_name), - |must_exit: watch::Receiver| self2.updater_loop(must_exit), + let ret2 = ret.clone(); + 
ret.background.spawn_worker( + format!("Merkle tree updater for {}", ret.table_name), + |must_exit: watch::Receiver| ret2.updater_loop(must_exit), ); + + ret } async fn updater_loop( @@ -156,28 +156,37 @@ impl MerkleUpdater { new_vhash: Option, ) -> ConflictableTransactionResult, Error> { let i = key.prefix.len(); + + // Read node at current position (defined by the prefix stored in key) + // Calculate an update to apply to this node + // This update is an Option<_>, so that it is None if the update is a no-op + // and we can thus skip recalculating and re-storing everything let mutate = match self.read_node_txn(tx, &key)? { MerkleNode::Empty => { if let Some(vhv) = new_vhash { Some(MerkleNode::Leaf(k.to_vec(), vhv)) } else { + // Nothing to do, keep empty node None } } MerkleNode::Intermediate(mut children) => { let key2 = key.next_key(khash); if let Some(subhash) = self.update_item_rec(tx, k, khash, &key2, new_vhash)? { + // Subtree changed, update this node as well if subhash == self.empty_node_hash { intermediate_rm_child(&mut children, key2.prefix[i]); } else { intermediate_set_child(&mut children, key2.prefix[i], subhash); } + if children.len() == 0 { // should not happen warn!("Replacing intermediate node with empty node, should not happen."); Some(MerkleNode::Empty) } else if children.len() == 1 { - // move node down to this level + // We now have a single node (case when the update deleted one of only two + // children). Move that single child to this level of the tree. 
let key_sub = key.add_byte(children[0].0); let subnode = self.read_node_txn(tx, &key_sub)?; tx.remove(key_sub.encode())?; @@ -186,19 +195,23 @@ impl MerkleUpdater { Some(MerkleNode::Intermediate(children)) } } else { + // Subtree not changed, nothing to do None } } MerkleNode::Leaf(exlf_key, exlf_hash) => { if exlf_key == k { + // This leaf is for the same key that the one we are updating match new_vhash { Some(vhv) if vhv == exlf_hash => None, Some(vhv) => Some(MerkleNode::Leaf(k.to_vec(), vhv)), None => Some(MerkleNode::Empty), } } else { + // This is an only leaf for another key if let Some(vhv) = new_vhash { - // Create two sub-nodes and replace by intermediary node + // Move that other key to a subnode, create another subnode for our + // insertion and replace current node by an intermediary node let (pos1, h1) = { let key2 = key.next_key(blake2sum(&exlf_key[..])); let subhash = @@ -216,6 +229,9 @@ impl MerkleUpdater { intermediate_set_child(&mut int, pos2, h2); Some(MerkleNode::Intermediate(int)) } else { + // Nothing to do, we don't want to insert this value because it is None, + // and we don't want to change the other value because it's for something + // else None } } @@ -263,6 +279,7 @@ impl MerkleUpdater { } } + // Access a node in the Merkle tree, used by the sync protocol pub(crate) fn read_node( &self, k: &MerkleNodeKey, diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs new file mode 100644 index 00000000..a62a6c3c --- /dev/null +++ b/src/table/replication/fullcopy.rs @@ -0,0 +1,59 @@ +use std::sync::Arc; + +use garage_rpc::membership::System; +use garage_rpc::ring::Ring; +use garage_util::data::*; + +use crate::replication::*; + +#[derive(Clone)] +pub struct TableFullReplication { + pub max_faults: usize, +} + +#[derive(Clone)] +struct Neighbors { + ring: Arc, + neighbors: Vec, +} + +impl TableFullReplication { + pub fn new(max_faults: usize) -> Self { + TableFullReplication { max_faults } + } +} + +impl 
TableReplication for TableFullReplication { + // Full replication schema: all nodes store everything + // Writes are disseminated in an epidemic manner in the network + + // Advantage: do all reads locally, extremely fast + // Inconvenient: only suitable to reasonably small tables + + fn read_nodes(&self, _hash: &Hash, system: &System) -> Vec { + vec![system.id] + } + fn read_quorum(&self) -> usize { + 1 + } + + fn write_nodes(&self, hash: &Hash, system: &System) -> Vec { + self.replication_nodes(hash, system.ring.borrow().as_ref()) + } + fn write_quorum(&self, system: &System) -> usize { + system.ring.borrow().config.members.len() - self.max_faults + } + fn max_write_errors(&self) -> usize { + self.max_faults + } + + fn replication_nodes(&self, _hash: &Hash, ring: &Ring) -> Vec { + ring.config.members.keys().cloned().collect::>() + } + fn split_points(&self, _ring: &Ring) -> Vec { + let mut ret = vec![]; + ret.push([0u8; 32].into()); + ret.push([0xFFu8; 32].into()); + ret + } +} diff --git a/src/table/replication/mod.rs b/src/table/replication/mod.rs new file mode 100644 index 00000000..d43d7f19 --- /dev/null +++ b/src/table/replication/mod.rs @@ -0,0 +1,6 @@ +mod parameters; + +pub mod fullcopy; +pub mod sharded; + +pub use parameters::*; diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs new file mode 100644 index 00000000..4607b050 --- /dev/null +++ b/src/table/replication/parameters.rs @@ -0,0 +1,22 @@ +use garage_rpc::membership::System; +use garage_rpc::ring::Ring; + +use garage_util::data::*; + +pub trait TableReplication: Send + Sync { + // See examples in table_sharded.rs and table_fullcopy.rs + // To understand various replication methods + + // Which nodes to send reads from + fn read_nodes(&self, hash: &Hash, system: &System) -> Vec; + fn read_quorum(&self) -> usize; + + // Which nodes to send writes to + fn write_nodes(&self, hash: &Hash, system: &System) -> Vec; + fn write_quorum(&self, system: &System) -> usize; + 
fn max_write_errors(&self) -> usize; + + // Which are the nodes that do actually replicate the data + fn replication_nodes(&self, hash: &Hash, ring: &Ring) -> Vec; + fn split_points(&self, ring: &Ring) -> Vec; +} diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs new file mode 100644 index 00000000..42a742cd --- /dev/null +++ b/src/table/replication/sharded.rs @@ -0,0 +1,54 @@ +use garage_rpc::membership::System; +use garage_rpc::ring::Ring; +use garage_util::data::*; + +use crate::replication::*; + +#[derive(Clone)] +pub struct TableShardedReplication { + pub replication_factor: usize, + pub read_quorum: usize, + pub write_quorum: usize, +} + +impl TableReplication for TableShardedReplication { + // Sharded replication schema: + // - based on the ring of nodes, a certain set of neighbors + // store entries, given as a function of the position of the + // entry's hash in the ring + // - reads are done on all of the nodes that replicate the data + // - writes as well + + fn read_nodes(&self, hash: &Hash, system: &System) -> Vec { + let ring = system.ring.borrow().clone(); + ring.walk_ring(&hash, self.replication_factor) + } + fn read_quorum(&self) -> usize { + self.read_quorum + } + + fn write_nodes(&self, hash: &Hash, system: &System) -> Vec { + let ring = system.ring.borrow().clone(); + ring.walk_ring(&hash, self.replication_factor) + } + fn write_quorum(&self, _system: &System) -> usize { + self.write_quorum + } + fn max_write_errors(&self) -> usize { + self.replication_factor - self.write_quorum + } + + fn replication_nodes(&self, hash: &Hash, ring: &Ring) -> Vec { + ring.walk_ring(&hash, self.replication_factor) + } + fn split_points(&self, ring: &Ring) -> Vec { + let mut ret = vec![]; + + ret.push([0u8; 32].into()); + for entry in ring.ring.iter() { + ret.push(entry.location); + } + ret.push([0xFFu8; 32].into()); + ret + } +} diff --git a/src/table/table.rs b/src/table/table.rs index 0e75754c..a4cb4b24 100644 --- 
a/src/table/table.rs +++ b/src/table/table.rs @@ -2,40 +2,35 @@ use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; use std::time::Duration; -use log::warn; - -use arc_swap::ArcSwapOption; use futures::stream::*; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; -use sled::Transactional; use garage_util::data::*; use garage_util::error::Error; use garage_rpc::membership::System; -use garage_rpc::ring::Ring; use garage_rpc::rpc_client::*; use garage_rpc::rpc_server::*; use crate::crdt::CRDT; -use crate::merkle::*; +use crate::data::*; use crate::schema::*; use crate::table_sync::*; +use crate::replication::*; const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10); -pub struct Table { - pub instance: F, +pub struct TableAux { + pub system: Arc, pub replication: R, - - pub name: String, pub(crate) rpc_client: Arc>>, +} - pub system: Arc, - pub store: sled::Tree, - pub syncer: ArcSwapOption>, - merkle_updater: Arc, +pub struct Table { + pub data: Arc>, + pub aux: Arc>, + pub syncer: Arc>, } #[derive(Serialize, Deserialize)] @@ -55,23 +50,6 @@ pub(crate) enum TableRPC { impl RpcMessage for TableRPC {} -pub trait TableReplication: Send + Sync { - // See examples in table_sharded.rs and table_fullcopy.rs - // To understand various replication methods - - // Which nodes to send reads from - fn read_nodes(&self, hash: &Hash, system: &System) -> Vec; - fn read_quorum(&self) -> usize; - - // Which nodes to send writes to - fn write_nodes(&self, hash: &Hash, system: &System) -> Vec; - fn write_quorum(&self, system: &System) -> usize; - fn max_write_errors(&self) -> usize; - - // Which are the nodes that do actually replicate the data - fn replication_nodes(&self, hash: &Hash, ring: &Ring) -> Vec; - fn split_points(&self, ring: &Ring) -> Vec; -} impl Table where @@ -88,60 +66,51 @@ where name: String, rpc_server: &mut RpcServer, ) -> Arc { - let store = db - .open_tree(&format!("{}:table", name)) - .expect("Unable to open DB tree"); - - let 
merkle_todo_store = db - .open_tree(&format!("{}:merkle_todo", name)) - .expect("Unable to open DB Merkle TODO tree"); - let merkle_tree_store = db - .open_tree(&format!("{}:merkle_tree", name)) - .expect("Unable to open DB Merkle tree tree"); - let rpc_path = format!("table_{}", name); let rpc_client = system.rpc_client::>(&rpc_path); - let merkle_updater = MerkleUpdater::new( - name.clone(), + let data = TableData::new( + name, + instance, + db, system.background.clone(), - merkle_todo_store, - merkle_tree_store, ); - let table = Arc::new(Self { - instance, + let aux = Arc::new(TableAux{ + system, replication, - name, rpc_client, - system, - store, - syncer: ArcSwapOption::from(None), - merkle_updater, }); - table.clone().register_handler(rpc_server, rpc_path); - let syncer = TableSyncer::launch(table.clone()); - table.syncer.swap(Some(syncer)); + let syncer = TableSyncer::launch( + data.clone(), + aux.clone(), + ); - table.merkle_updater.launch(); + let table = Arc::new(Self { + data, + aux, + syncer, + }); + + table.clone().register_handler(rpc_server, rpc_path); table } pub async fn insert(&self, e: &F::E) -> Result<(), Error> { let hash = e.partition_key().hash(); - let who = self.replication.write_nodes(&hash, &self.system); + let who = self.aux.replication.write_nodes(&hash, &self.aux.system); //eprintln!("insert who: {:?}", who); let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?)); let rpc = TableRPC::::Update(vec![e_enc]); - self.rpc_client + self.aux.rpc_client .try_call_many( &who[..], rpc, - RequestStrategy::with_quorum(self.replication.write_quorum(&self.system)) + RequestStrategy::with_quorum(self.aux.replication.write_quorum(&self.aux.system)) .with_timeout(TABLE_RPC_TIMEOUT), ) .await?; @@ -153,7 +122,7 @@ where for entry in entries.iter() { let hash = entry.partition_key().hash(); - let who = self.replication.write_nodes(&hash, &self.system); + let who = self.aux.replication.write_nodes(&hash, &self.aux.system); let e_enc = 
Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?)); for node in who { if !call_list.contains_key(&node) { @@ -166,7 +135,7 @@ where let call_futures = call_list.drain().map(|(node, entries)| async move { let rpc = TableRPC::::Update(entries); - let resp = self.rpc_client.call(node, rpc, TABLE_RPC_TIMEOUT).await?; + let resp = self.aux.rpc_client.call(node, rpc, TABLE_RPC_TIMEOUT).await?; Ok::<_, Error>((node, resp)) }); let mut resps = call_futures.collect::>(); @@ -177,7 +146,7 @@ where errors.push(e); } } - if errors.len() > self.replication.max_write_errors() { + if errors.len() > self.aux.replication.max_write_errors() { Err(Error::Message("Too many errors".into())) } else { Ok(()) @@ -190,16 +159,17 @@ where sort_key: &F::S, ) -> Result, Error> { let hash = partition_key.hash(); - let who = self.replication.read_nodes(&hash, &self.system); + let who = self.aux.replication.read_nodes(&hash, &self.aux.system); //eprintln!("get who: {:?}", who); let rpc = TableRPC::::ReadEntry(partition_key.clone(), sort_key.clone()); let resps = self + .aux .rpc_client .try_call_many( &who[..], rpc, - RequestStrategy::with_quorum(self.replication.read_quorum()) + RequestStrategy::with_quorum(self.aux.replication.read_quorum()) .with_timeout(TABLE_RPC_TIMEOUT) .interrupt_after_quorum(true), ) @@ -210,7 +180,7 @@ where for resp in resps { if let TableRPC::ReadEntryResponse(value) = resp { if let Some(v_bytes) = value { - let v = self.decode_entry(v_bytes.as_slice())?; + let v = self.data.decode_entry(v_bytes.as_slice())?; ret = match ret { None => Some(v), Some(mut x) => { @@ -230,7 +200,7 @@ where if not_all_same { let self2 = self.clone(); let ent2 = ret_entry.clone(); - self.system + self.aux.system .background .spawn_cancellable(async move { self2.repair_on_read(&who[..], ent2).await }); } @@ -246,16 +216,16 @@ where limit: usize, ) -> Result, Error> { let hash = partition_key.hash(); - let who = self.replication.read_nodes(&hash, &self.system); + let who = 
self.aux.replication.read_nodes(&hash, &self.aux.system); let rpc = TableRPC::::ReadRange(partition_key.clone(), begin_sort_key, filter, limit); let resps = self - .rpc_client + .aux.rpc_client .try_call_many( &who[..], rpc, - RequestStrategy::with_quorum(self.replication.read_quorum()) + RequestStrategy::with_quorum(self.aux.replication.read_quorum()) .with_timeout(TABLE_RPC_TIMEOUT) .interrupt_after_quorum(true), ) @@ -266,8 +236,8 @@ where for resp in resps { if let TableRPC::Update(entries) = resp { for entry_bytes in entries.iter() { - let entry = self.decode_entry(entry_bytes.as_slice())?; - let entry_key = self.tree_key(entry.partition_key(), entry.sort_key()); + let entry = self.data.decode_entry(entry_bytes.as_slice())?; + let entry_key = self.data.tree_key(entry.partition_key(), entry.sort_key()); match ret.remove(&entry_key) { None => { ret.insert(entry_key, Some(entry)); @@ -287,7 +257,7 @@ where } if !to_repair.is_empty() { let self2 = self.clone(); - self.system.background.spawn_cancellable(async move { + self.aux.system.background.spawn_cancellable(async move { for (_, v) in to_repair.iter_mut() { self2.repair_on_read(&who[..], v.take().unwrap()).await?; } @@ -306,7 +276,7 @@ where async fn repair_on_read(&self, who: &[UUID], what: F::E) -> Result<(), Error> { let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?)); - self.rpc_client + self.aux.rpc_client .try_call_many( &who[..], TableRPC::::Update(vec![what_enc]), @@ -326,8 +296,8 @@ where }); let self2 = self.clone(); - self.rpc_client - .set_local_handler(self.system.id, move |msg| { + self.aux.rpc_client + .set_local_handler(self.aux.system.id, move |msg| { let self2 = self2.clone(); async move { self2.handle(&msg).await } }); @@ -336,157 +306,24 @@ where async fn handle(self: &Arc, msg: &TableRPC) -> Result, Error> { match msg { TableRPC::ReadEntry(key, sort_key) => { - let value = self.handle_read_entry(key, sort_key)?; + let value = self.data.read_entry(key, sort_key)?; 
Ok(TableRPC::ReadEntryResponse(value)) } TableRPC::ReadRange(key, begin_sort_key, filter, limit) => { - let values = self.handle_read_range(key, begin_sort_key, filter, *limit)?; + let values = self.data.read_range(key, begin_sort_key, filter, *limit)?; Ok(TableRPC::Update(values)) } TableRPC::Update(pairs) => { - self.handle_update(pairs)?; + self.data.update_many(pairs)?; Ok(TableRPC::Ok) } TableRPC::SyncRPC(rpc) => { - let syncer = self.syncer.load_full().unwrap(); - let response = syncer - .handle_rpc(rpc, self.system.background.stop_signal.clone()) + let response = self.syncer + .handle_rpc(rpc, self.aux.system.background.stop_signal.clone()) .await?; Ok(TableRPC::SyncRPC(response)) } _ => Err(Error::BadRPC(format!("Unexpected table RPC"))), } } - - fn handle_read_entry(&self, p: &F::P, s: &F::S) -> Result, Error> { - let tree_key = self.tree_key(p, s); - if let Some(bytes) = self.store.get(&tree_key)? { - Ok(Some(ByteBuf::from(bytes.to_vec()))) - } else { - Ok(None) - } - } - - fn handle_read_range( - &self, - p: &F::P, - s: &Option, - filter: &Option, - limit: usize, - ) -> Result>, Error> { - let partition_hash = p.hash(); - let first_key = match s { - None => partition_hash.to_vec(), - Some(sk) => self.tree_key(p, sk), - }; - let mut ret = vec![]; - for item in self.store.range(first_key..) 
{ - let (key, value) = item?; - if &key[..32] != partition_hash.as_slice() { - break; - } - let keep = match filter { - None => true, - Some(f) => { - let entry = self.decode_entry(value.as_ref())?; - F::matches_filter(&entry, f) - } - }; - if keep { - ret.push(Arc::new(ByteBuf::from(value.as_ref()))); - } - if ret.len() >= limit { - break; - } - } - Ok(ret) - } - - // ========== CODE THAT ACTUALLY MODIFIES THE TREE ================ - - pub fn handle_update(self: &Arc, entries: &[Arc]) -> Result<(), Error> { - for update_bytes in entries.iter() { - self.update_entry(update_bytes.as_slice())?; - } - Ok(()) - } - - pub(crate) fn update_entry(self: &Arc, update_bytes: &[u8]) -> Result<(), Error> { - let update = self.decode_entry(update_bytes)?; - let tree_key = self.tree_key(update.partition_key(), update.sort_key()); - - let changed = (&self.store, &self.merkle_updater.todo).transaction(|(db, mkl_todo)| { - let (old_entry, new_entry) = match db.get(&tree_key)? { - Some(prev_bytes) => { - let old_entry = self - .decode_entry(&prev_bytes) - .map_err(sled::transaction::ConflictableTransactionError::Abort)?; - let mut new_entry = old_entry.clone(); - new_entry.merge(&update); - (Some(old_entry), new_entry) - } - None => (None, update.clone()), - }; - - if Some(&new_entry) != old_entry.as_ref() { - let new_bytes = rmp_to_vec_all_named(&new_entry) - .map_err(Error::RMPEncode) - .map_err(sled::transaction::ConflictableTransactionError::Abort)?; - mkl_todo.insert(tree_key.clone(), blake2sum(&new_bytes[..]).to_vec())?; - db.insert(tree_key.clone(), new_bytes)?; - Ok(Some((old_entry, new_entry))) - } else { - Ok(None) - } - })?; - - if let Some((old_entry, new_entry)) = changed { - self.instance.updated(old_entry, Some(new_entry)); - self.syncer.load_full().unwrap().invalidate(&tree_key[..]); - } - - Ok(()) - } - - pub(crate) fn delete_if_equal(self: &Arc, k: &[u8], v: &[u8]) -> Result { - let removed = (&self.store, &self.merkle_updater.todo).transaction(|(txn, mkl_todo)| { 
- if let Some(cur_v) = txn.get(k)? { - if cur_v == v { - txn.remove(k)?; - mkl_todo.insert(k, vec![])?; - return Ok(true); - } - } - Ok(false) - })?; - - if removed { - let old_entry = self.decode_entry(v)?; - self.instance.updated(Some(old_entry), None); - self.syncer.load_full().unwrap().invalidate(k); - } - Ok(removed) - } - - fn tree_key(&self, p: &F::P, s: &F::S) -> Vec { - let mut ret = p.hash().to_vec(); - ret.extend(s.sort_key()); - ret - } - - fn decode_entry(&self, bytes: &[u8]) -> Result { - match rmp_serde::decode::from_read_ref::<_, F::E>(bytes) { - Ok(x) => Ok(x), - Err(e) => match F::try_migrate(bytes) { - Some(x) => Ok(x), - None => { - warn!("Unable to decode entry of {}: {}", self.name, e); - for line in hexdump::hexdump_iter(bytes) { - debug!("{}", line); - } - Err(e.into()) - } - }, - } - } } diff --git a/src/table/table_fullcopy.rs b/src/table/table_fullcopy.rs deleted file mode 100644 index c55879d8..00000000 --- a/src/table/table_fullcopy.rs +++ /dev/null @@ -1,59 +0,0 @@ -use std::sync::Arc; - -use garage_rpc::membership::System; -use garage_rpc::ring::Ring; -use garage_util::data::*; - -use crate::*; - -#[derive(Clone)] -pub struct TableFullReplication { - pub max_faults: usize, -} - -#[derive(Clone)] -struct Neighbors { - ring: Arc, - neighbors: Vec, -} - -impl TableFullReplication { - pub fn new(max_faults: usize) -> Self { - TableFullReplication { max_faults } - } -} - -impl TableReplication for TableFullReplication { - // Full replication schema: all nodes store everything - // Writes are disseminated in an epidemic manner in the network - - // Advantage: do all reads locally, extremely fast - // Inconvenient: only suitable to reasonably small tables - - fn read_nodes(&self, _hash: &Hash, system: &System) -> Vec { - vec![system.id] - } - fn read_quorum(&self) -> usize { - 1 - } - - fn write_nodes(&self, hash: &Hash, system: &System) -> Vec { - self.replication_nodes(hash, system.ring.borrow().as_ref()) - } - fn write_quorum(&self, 
system: &System) -> usize { - system.ring.borrow().config.members.len() - self.max_faults - } - fn max_write_errors(&self) -> usize { - self.max_faults - } - - fn replication_nodes(&self, _hash: &Hash, ring: &Ring) -> Vec { - ring.config.members.keys().cloned().collect::>() - } - fn split_points(&self, _ring: &Ring) -> Vec { - let mut ret = vec![]; - ret.push([0u8; 32].into()); - ret.push([0xFFu8; 32].into()); - ret - } -} diff --git a/src/table/table_sharded.rs b/src/table/table_sharded.rs deleted file mode 100644 index 47bdfeaf..00000000 --- a/src/table/table_sharded.rs +++ /dev/null @@ -1,54 +0,0 @@ -use garage_rpc::membership::System; -use garage_rpc::ring::Ring; -use garage_util::data::*; - -use crate::*; - -#[derive(Clone)] -pub struct TableShardedReplication { - pub replication_factor: usize, - pub read_quorum: usize, - pub write_quorum: usize, -} - -impl TableReplication for TableShardedReplication { - // Sharded replication schema: - // - based on the ring of nodes, a certain set of neighbors - // store entries, given as a function of the position of the - // entry's hash in the ring - // - reads are done on all of the nodes that replicate the data - // - writes as well - - fn read_nodes(&self, hash: &Hash, system: &System) -> Vec { - let ring = system.ring.borrow().clone(); - ring.walk_ring(&hash, self.replication_factor) - } - fn read_quorum(&self) -> usize { - self.read_quorum - } - - fn write_nodes(&self, hash: &Hash, system: &System) -> Vec { - let ring = system.ring.borrow().clone(); - ring.walk_ring(&hash, self.replication_factor) - } - fn write_quorum(&self, _system: &System) -> usize { - self.write_quorum - } - fn max_write_errors(&self) -> usize { - self.replication_factor - self.write_quorum - } - - fn replication_nodes(&self, hash: &Hash, ring: &Ring) -> Vec { - ring.walk_ring(&hash, self.replication_factor) - } - fn split_points(&self, ring: &Ring) -> Vec { - let mut ret = vec![]; - - ret.push([0u8; 32].into()); - for entry in ring.ring.iter() 
{ - ret.push(entry.location); - } - ret.push([0xFFu8; 32].into()); - ret - } -} diff --git a/src/table/table_sync.rs b/src/table/table_sync.rs index 51f8cd6f..7394be1b 100644 --- a/src/table/table_sync.rs +++ b/src/table/table_sync.rs @@ -16,18 +16,22 @@ use garage_util::data::*; use garage_util::error::Error; use crate::*; +use crate::data::*; +use crate::replication::*; const MAX_DEPTH: usize = 16; + const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(30); -// Scan & sync every 12 hours -const SCAN_INTERVAL: Duration = Duration::from_secs(12 * 60 * 60); +// Do anti-entropy every 10 minutes +const SCAN_INTERVAL: Duration = Duration::from_secs(10 * 60); -// Expire cache after 30 minutes -const CHECKSUM_CACHE_TIMEOUT: Duration = Duration::from_secs(30 * 60); +const CHECKSUM_CACHE_TIMEOUT: Duration = Duration::from_secs(10 * 60); pub struct TableSyncer { - table: Arc>, + data: Arc>, + aux: Arc>, + todo: Mutex, cache: Vec>>, } @@ -106,10 +110,13 @@ where F: TableSchema + 'static, R: TableReplication + 'static, { - pub(crate) fn launch(table: Arc>) -> Arc { - let todo = SyncTodo { todo: Vec::new() }; - let syncer = Arc::new(TableSyncer { - table: table.clone(), + pub(crate) fn launch(data: Arc>, + aux: Arc>) -> Arc { + let todo = SyncTodo{ todo: vec![] }; + + let syncer = Arc::new(Self { + data: data.clone(), + aux: aux.clone(), todo: Mutex::new(todo), cache: (0..MAX_DEPTH) .map(|_| Mutex::new(BTreeMap::new())) @@ -119,21 +126,21 @@ where let (busy_tx, busy_rx) = mpsc::unbounded_channel(); let s1 = syncer.clone(); - table.system.background.spawn_worker( - format!("table sync watcher for {}", table.name), + aux.system.background.spawn_worker( + format!("table sync watcher for {}", data.name), move |must_exit: watch::Receiver| s1.watcher_task(must_exit, busy_rx), ); let s2 = syncer.clone(); - table.system.background.spawn_worker( - format!("table syncer for {}", table.name), + aux.system.background.spawn_worker( + format!("table syncer for {}", data.name), move 
|must_exit: watch::Receiver| s2.syncer_task(must_exit, busy_tx), ); let s3 = syncer.clone(); tokio::spawn(async move { tokio::time::delay_for(Duration::from_secs(20)).await; - s3.add_full_scan().await; + s3.add_full_scan(); }); syncer @@ -144,8 +151,8 @@ where mut must_exit: watch::Receiver, mut busy_rx: mpsc::UnboundedReceiver, ) -> Result<(), Error> { - let mut prev_ring: Arc = self.table.system.ring.borrow().clone(); - let mut ring_recv: watch::Receiver> = self.table.system.ring.clone(); + let mut prev_ring: Arc = self.aux.system.ring.borrow().clone(); + let mut ring_recv: watch::Receiver> = self.aux.system.ring.clone(); let mut nothing_to_do_since = Some(Instant::now()); while !*must_exit.borrow() { @@ -158,8 +165,8 @@ where select! { new_ring_r = s_ring_recv => { if let Some(new_ring) = new_ring_r { - debug!("({}) Adding ring difference to syncer todo list", self.table.name); - self.todo.lock().unwrap().add_ring_difference(&self.table, &prev_ring, &new_ring); + debug!("({}) Adding ring difference to syncer todo list", self.data.name); + self.todo.lock().unwrap().add_ring_difference(&prev_ring, &new_ring, &self.data, &self.aux); prev_ring = new_ring; } } @@ -182,8 +189,8 @@ where _ = s_timeout => { if nothing_to_do_since.map(|t| Instant::now() - t >= SCAN_INTERVAL).unwrap_or(false) { nothing_to_do_since = None; - debug!("({}) Adding full scan to syncer todo list", self.table.name); - self.add_full_scan().await; + debug!("({}) Adding full scan to syncer todo list", self.data.name); + self.add_full_scan(); } } } @@ -191,8 +198,8 @@ where Ok(()) } - pub async fn add_full_scan(&self) { - self.todo.lock().unwrap().add_full_scan(&self.table); + pub fn add_full_scan(&self) { + self.todo.lock().unwrap().add_full_scan(&self.data, &self.aux); } async fn syncer_task( @@ -211,7 +218,7 @@ where if let Err(e) = res { warn!( "({}) Error while syncing {:?}: {}", - self.table.name, partition, e + self.data.name, partition, e ); } } else { @@ -228,18 +235,18 @@ where must_exit: 
&mut watch::Receiver, ) -> Result<(), Error> { if partition.retain { - let my_id = self.table.system.id; + let my_id = self.aux.system.id; let nodes = self - .table + .aux .replication - .write_nodes(&partition.begin, &self.table.system) + .write_nodes(&partition.begin, &self.aux.system) .into_iter() .filter(|node| *node != my_id) .collect::>(); debug!( "({}) Preparing to sync {:?} with {:?}...", - self.table.name, partition, nodes + self.data.name, partition, nodes ); let root_cks = self.root_checksum(&partition.begin, &partition.end, must_exit)?; @@ -259,10 +266,10 @@ where while let Some(r) = sync_futures.next().await { if let Err(e) = r { n_errors += 1; - warn!("({}) Sync error: {}", self.table.name, e); + warn!("({}) Sync error: {}", self.data.name, e); } } - if n_errors > self.table.replication.max_write_errors() { + if n_errors > self.aux.replication.max_write_errors() { return Err(Error::Message(format!( "Sync failed with too many nodes (should have been: {:?}).", nodes @@ -293,7 +300,7 @@ where while !*must_exit.borrow() { let mut items = Vec::new(); - for item in self.table.store.range(begin.to_vec()..end.to_vec()) { + for item in self.data.store.range(begin.to_vec()..end.to_vec()) { let (key, value) = item?; items.push((key.to_vec(), Arc::new(ByteBuf::from(value.as_ref())))); @@ -304,12 +311,12 @@ where if items.len() > 0 { let nodes = self - .table + .aux .replication - .write_nodes(&begin, &self.table.system) + .write_nodes(&begin, &self.aux.system) .into_iter() .collect::>(); - if nodes.contains(&self.table.system.id) { + if nodes.contains(&self.aux.system.id) { warn!("Interrupting offload as partitions seem to have changed"); break; } @@ -340,7 +347,7 @@ where let update_msg = Arc::new(TableRPC::::Update(values)); for res in join_all(nodes.iter().map(|to| { - self.table + self.aux .rpc_client .call_arc(*to, update_msg.clone(), TABLE_SYNC_RPC_TIMEOUT) })) @@ -352,7 +359,7 @@ where // All remote nodes have written those items, now we can delete them 
locally let mut not_removed = 0; for (k, v) in items.iter() { - if !self.table.delete_if_equal(&k[..], &v[..])? { + if !self.data.delete_if_equal(&k[..], &v[..])? { not_removed += 1; } } @@ -399,7 +406,7 @@ where if range.level == 1 { let mut children = vec![]; for item in self - .table + .data .store .range(range.begin.clone()..range.end.clone()) { @@ -516,7 +523,7 @@ where let v = self.range_checksum(&range, must_exit)?; trace!( "({}) New checksum calculated for {}-{}/{}, {} children", - self.table.name, + self.data.name, hex::encode(&range.begin) .chars() .take(16) @@ -553,7 +560,7 @@ where // If their root checksum has level > than us, use that as a reference let root_cks_resp = self - .table + .aux .rpc_client .call( who, @@ -582,7 +589,7 @@ where let total_children = todo.iter().map(|x| x.children.len()).fold(0, |x, y| x + y); trace!( "({}) Sync with {:?}: {} ({}) remaining", - self.table.name, + self.data.name, who, todo.len(), total_children @@ -592,7 +599,7 @@ where let step = todo.drain(..step_size).collect::>(); let rpc_resp = self - .table + .aux .rpc_client .call( who, @@ -606,7 +613,7 @@ where if diff_ranges.len() > 0 || diff_items.len() > 0 { info!( "({}) Sync with {:?}: difference {} ranges, {} items", - self.table.name, + self.data.name, who, diff_ranges.len(), diff_items.len() @@ -622,7 +629,7 @@ where } } if diff_items.len() > 0 { - self.table.handle_update(&diff_items[..])?; + self.data.update_many(&diff_items[..])?; } if items_to_send.len() > 0 { self.send_items(who, items_to_send).await?; @@ -640,19 +647,19 @@ where async fn send_items(&self, who: UUID, item_list: Vec>) -> Result<(), Error> { info!( "({}) Sending {} items to {:?}", - self.table.name, + self.data.name, item_list.len(), who ); let mut values = vec![]; for item in item_list.iter() { - if let Some(v) = self.table.store.get(&item[..])? { + if let Some(v) = self.data.store.get(&item[..])? 
{ values.push(Arc::new(ByteBuf::from(v.as_ref()))); } } let rpc_resp = self - .table + .aux .rpc_client .call(who, TableRPC::::Update(values), TABLE_SYNC_RPC_TIMEOUT) .await?; @@ -714,7 +721,7 @@ where ret_ranges.push(their_range.clone()); if their_range.level == 0 { if let Some(item_bytes) = - self.table.store.get(their_range.begin.as_slice())? + self.data.store.get(their_range.begin.as_slice())? { ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec()))); } @@ -738,7 +745,7 @@ where } if our_range.level == 0 { if let Some(item_bytes) = - self.table.store.get(our_range.begin.as_slice())? + self.data.store.get(our_range.begin.as_slice())? { ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec()))); } @@ -753,7 +760,7 @@ where if ret_ranges.len() > 0 || ret_items.len() > 0 { trace!( "({}) Checksum comparison RPC: {} different + {} items for {} received", - self.table.name, + self.data.name, ret_ranges.len(), ret_items.len(), n_checksums @@ -782,13 +789,13 @@ where } impl SyncTodo { - fn add_full_scan(&mut self, table: &Table) { - let my_id = table.system.id; + fn add_full_scan(&mut self, data: &TableData, aux: &TableAux) { + let my_id = aux.system.id; self.todo.clear(); - let ring = table.system.ring.borrow().clone(); - let split_points = table.replication.split_points(&ring); + let ring = aux.system.ring.borrow().clone(); + let split_points = aux.replication.split_points(&ring); for i in 0..split_points.len() - 1 { let begin = split_points[i]; @@ -797,12 +804,12 @@ impl SyncTodo { continue; } - let nodes = table.replication.replication_nodes(&begin, &ring); + let nodes = aux.replication.replication_nodes(&begin, &ring); let retain = nodes.contains(&my_id); if !retain { // Check if we have some data to send, otherwise skip - if table.store.range(begin..end).next().is_none() { + if data.store.range(begin..end).next().is_none() { continue; } } @@ -813,25 +820,25 @@ impl SyncTodo { fn add_ring_difference( &mut self, - table: &Table, old_ring: &Ring, new_ring: 
&Ring, + data: &TableData, aux: &TableAux, ) { - let my_id = table.system.id; + let my_id = aux.system.id; // If it is us who are entering or leaving the system, // initiate a full sync instead of incremental sync if old_ring.config.members.contains_key(&my_id) != new_ring.config.members.contains_key(&my_id) { - self.add_full_scan(table); + self.add_full_scan(data, aux); return; } let mut all_points = None .into_iter() - .chain(table.replication.split_points(old_ring).drain(..)) - .chain(table.replication.split_points(new_ring).drain(..)) + .chain(aux.replication.split_points(old_ring).drain(..)) + .chain(aux.replication.split_points(new_ring).drain(..)) .chain(self.todo.iter().map(|x| x.begin)) .chain(self.todo.iter().map(|x| x.end)) .collect::>(); @@ -845,11 +852,11 @@ impl SyncTodo { for i in 0..all_points.len() - 1 { let begin = all_points[i]; let end = all_points[i + 1]; - let was_ours = table + let was_ours = aux .replication .replication_nodes(&begin, &old_ring) .contains(&my_id); - let is_ours = table + let is_ours = aux .replication .replication_nodes(&begin, &new_ring) .contains(&my_id); -- cgit v1.2.3 From 046b649bcc3b147140fc2b0af0e071d3dd1b2c8d Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 11 Mar 2021 18:28:03 +0100 Subject: (not well tested) use merkle tree for sync --- src/table/data.rs | 15 +- src/table/lib.rs | 4 +- src/table/merkle.rs | 107 ++++- src/table/replication/fullcopy.rs | 1 - src/table/replication/sharded.rs | 6 +- src/table/sync.rs | 632 +++++++++++++++++++++++++++ src/table/table.rs | 50 +-- src/table/table_sync.rs | 898 -------------------------------------- 8 files changed, 752 insertions(+), 961 deletions(-) create mode 100644 src/table/sync.rs delete mode 100644 src/table/table_sync.rs (limited to 'src/table') diff --git a/src/table/data.rs b/src/table/data.rs index fa89fc27..6217bf6d 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -1,16 +1,16 @@ use std::sync::Arc; use log::warn; -use sled::Transactional; use 
serde_bytes::ByteBuf; +use sled::Transactional; +use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_util::error::*; -use garage_util::background::BackgroundRunner; -use crate::schema::*; -use crate::merkle::*; use crate::crdt::CRDT; +use crate::merkle::*; +use crate::schema::*; pub struct TableData { pub name: String, @@ -20,7 +20,10 @@ pub struct TableData { pub(crate) merkle_updater: Arc, } -impl TableData where F: TableSchema { +impl TableData +where + F: TableSchema, +{ pub fn new( name: String, instance: F, @@ -45,7 +48,7 @@ impl TableData where F: TableSchema { merkle_tree_store, ); - Arc::new(Self{ + Arc::new(Self { name, instance, store, diff --git a/src/table/lib.rs b/src/table/lib.rs index bb249a56..18c29c35 100644 --- a/src/table/lib.rs +++ b/src/table/lib.rs @@ -7,11 +7,11 @@ pub mod crdt; pub mod schema; pub mod util; +pub mod data; pub mod merkle; pub mod replication; -pub mod data; +pub mod sync; pub mod table; -pub mod table_sync; pub use schema::*; pub use table::*; diff --git a/src/table/merkle.rs b/src/table/merkle.rs index ef197dc8..92c18e09 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -15,6 +15,19 @@ use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_util::error::Error; +pub type MerklePartition = [u8; 2]; + +pub fn hash_of_merkle_partition(p: MerklePartition) -> Hash { + let mut partition_pos = [0u8; 32]; + partition_pos[0..2].copy_from_slice(&p[..]); + partition_pos.into() +} + +pub fn hash_of_merkle_partition_opt(p: Option) -> Hash { + p.map(hash_of_merkle_partition) + .unwrap_or([0xFFu8; 32].into()) +} + // This modules partitions the data in 2**16 partitions, based on the top // 16 bits (two bytes) of item's partition keys' hashes. // It builds one Merkle tree for each of these 2**16 partitions. 
@@ -37,10 +50,10 @@ pub(crate) struct MerkleUpdater { empty_node_hash: Hash, } -#[derive(Clone)] +#[derive(Clone, Serialize, Deserialize)] pub struct MerkleNodeKey { // partition: first 16 bits (two bytes) of the partition_key's hash - pub partition: [u8; 2], + pub partition: MerklePartition, // prefix: a prefix for the hash of full keys, i.e. hash(hash(partition_key)+sort_key) pub prefix: Vec, @@ -214,8 +227,11 @@ impl MerkleUpdater { // insertion and replace current node by an intermediary node let (pos1, h1) = { let key2 = key.next_key(blake2sum(&exlf_key[..])); - let subhash = - self.put_node_txn(tx, &key2, &MerkleNode::Leaf(exlf_key, exlf_hash))?; + let subhash = self.put_node_txn( + tx, + &key2, + &MerkleNode::Leaf(exlf_key, exlf_hash), + )?; (key2.prefix[i], subhash) }; let (pos2, h2) = { @@ -280,14 +296,11 @@ impl MerkleUpdater { } // Access a node in the Merkle tree, used by the sync protocol - pub(crate) fn read_node( - &self, - k: &MerkleNodeKey, - ) -> Result { + pub(crate) fn read_node(&self, k: &MerkleNodeKey) -> Result { let ent = self.merkle_tree.get(k.encode())?; match ent { None => Ok(MerkleNode::Empty), - Some(v) => Ok(rmp_serde::decode::from_read_ref::<_, MerkleNode>(&v[..])?) 
+ Some(v) => Ok(rmp_serde::decode::from_read_ref::<_, MerkleNode>(&v[..])?), } } } @@ -339,31 +352,77 @@ fn intermediate_rm_child(ch: &mut Vec<(u8, Hash)>, pos: u8) { #[test] fn test_intermediate_aux() { let mut v = vec![]; - + intermediate_set_child(&mut v, 12u8, [12u8; 32].into()); - assert!(v == vec![(12u8, [12u8; 32].into())]); - + assert_eq!(v, vec![(12u8, [12u8; 32].into())]); + intermediate_set_child(&mut v, 42u8, [42u8; 32].into()); - assert!(v == vec![(12u8, [12u8; 32].into()), (42u8, [42u8; 32].into())]); - + assert_eq!( + v, + vec![(12u8, [12u8; 32].into()), (42u8, [42u8; 32].into())] + ); + intermediate_set_child(&mut v, 4u8, [4u8; 32].into()); - assert!(v == vec![(4u8, [4u8; 32].into()), (12u8, [12u8; 32].into()), (42u8, [42u8; 32].into())]); - + assert_eq!( + v, + vec![ + (4u8, [4u8; 32].into()), + (12u8, [12u8; 32].into()), + (42u8, [42u8; 32].into()) + ] + ); + intermediate_set_child(&mut v, 12u8, [8u8; 32].into()); - assert!(v == vec![(4u8, [4u8; 32].into()), (12u8, [8u8; 32].into()), (42u8, [42u8; 32].into())]); - + assert_eq!( + v, + vec![ + (4u8, [4u8; 32].into()), + (12u8, [8u8; 32].into()), + (42u8, [42u8; 32].into()) + ] + ); + intermediate_set_child(&mut v, 6u8, [6u8; 32].into()); - assert!(v == vec![(4u8, [4u8; 32].into()), (6u8, [6u8; 32].into()), (12u8, [8u8; 32].into()), (42u8, [42u8; 32].into())]); + assert_eq!( + v, + vec![ + (4u8, [4u8; 32].into()), + (6u8, [6u8; 32].into()), + (12u8, [8u8; 32].into()), + (42u8, [42u8; 32].into()) + ] + ); intermediate_rm_child(&mut v, 42u8); - assert!(v == vec![(4u8, [4u8; 32].into()), (6u8, [6u8; 32].into()), (12u8, [8u8; 32].into())]); + assert_eq!( + v, + vec![ + (4u8, [4u8; 32].into()), + (6u8, [6u8; 32].into()), + (12u8, [8u8; 32].into()) + ] + ); intermediate_rm_child(&mut v, 11u8); - assert!(v == vec![(4u8, [4u8; 32].into()), (6u8, [6u8; 32].into()), (12u8, [8u8; 32].into())]); + assert_eq!( + v, + vec![ + (4u8, [4u8; 32].into()), + (6u8, [6u8; 32].into()), + (12u8, [8u8; 32].into()) + ] + ); 
intermediate_rm_child(&mut v, 6u8); - assert!(v == vec![(4u8, [4u8; 32].into()), (12u8, [8u8; 32].into())]); - + assert_eq!(v, vec![(4u8, [4u8; 32].into()), (12u8, [8u8; 32].into())]); + intermediate_set_child(&mut v, 6u8, [7u8; 32].into()); - assert!(v == vec![(4u8, [4u8; 32].into()), (6u8, [7u8; 32].into()), (12u8, [8u8; 32].into())]); + assert_eq!( + v, + vec![ + (4u8, [4u8; 32].into()), + (6u8, [7u8; 32].into()), + (12u8, [8u8; 32].into()) + ] + ); } diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs index a62a6c3c..a20f20b7 100644 --- a/src/table/replication/fullcopy.rs +++ b/src/table/replication/fullcopy.rs @@ -53,7 +53,6 @@ impl TableReplication for TableFullReplication { fn split_points(&self, _ring: &Ring) -> Vec { let mut ret = vec![]; ret.push([0u8; 32].into()); - ret.push([0xFFu8; 32].into()); ret } } diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs index 42a742cd..886c7c08 100644 --- a/src/table/replication/sharded.rs +++ b/src/table/replication/sharded.rs @@ -44,11 +44,13 @@ impl TableReplication for TableShardedReplication { fn split_points(&self, ring: &Ring) -> Vec { let mut ret = vec![]; - ret.push([0u8; 32].into()); for entry in ring.ring.iter() { ret.push(entry.location); } - ret.push([0xFFu8; 32].into()); + if ret.len() > 0 { + assert_eq!(ret[0], [0u8; 32].into()); + } + ret } } diff --git a/src/table/sync.rs b/src/table/sync.rs new file mode 100644 index 00000000..9c37c286 --- /dev/null +++ b/src/table/sync.rs @@ -0,0 +1,632 @@ +use std::collections::VecDeque; +use std::convert::TryInto; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; + +use futures::future::join_all; +use futures::{pin_mut, select}; +use futures_util::future::*; +use futures_util::stream::*; +use rand::Rng; +use serde::{Deserialize, Serialize}; +use serde_bytes::ByteBuf; +use tokio::sync::{mpsc, watch}; + +use garage_rpc::ring::Ring; +use garage_util::data::*; +use garage_util::error::Error; + 
+use crate::data::*; +use crate::merkle::*; +use crate::replication::*; +use crate::*; + +const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(30); + +// Do anti-entropy every 10 minutes +const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60); + +pub struct TableSyncer { + data: Arc>, + aux: Arc>, + + todo: Mutex, +} + +type RootCk = Vec<(MerklePartition, Hash)>; + +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +pub struct PartitionRange { + begin: MerklePartition, + // if end is None, go all the way to partition 0xFFFF included + end: Option, +} + +#[derive(Serialize, Deserialize)] +pub(crate) enum SyncRPC { + RootCkHash(PartitionRange, Hash), + RootCkList(PartitionRange, RootCk), + CkNoDifference, + GetNode(MerkleNodeKey), + Node(MerkleNodeKey, MerkleNode), + Items(Vec>), +} + +struct SyncTodo { + todo: Vec, +} + +#[derive(Debug, Clone)] +struct TodoPartition { + range: PartitionRange, + + // Are we a node that stores this partition or not? + retain: bool, +} + +impl TableSyncer +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + pub(crate) fn launch(data: Arc>, aux: Arc>) -> Arc { + let todo = SyncTodo { todo: vec![] }; + + let syncer = Arc::new(Self { + data: data.clone(), + aux: aux.clone(), + todo: Mutex::new(todo), + }); + + let (busy_tx, busy_rx) = mpsc::unbounded_channel(); + + let s1 = syncer.clone(); + aux.system.background.spawn_worker( + format!("table sync watcher for {}", data.name), + move |must_exit: watch::Receiver| s1.watcher_task(must_exit, busy_rx), + ); + + let s2 = syncer.clone(); + aux.system.background.spawn_worker( + format!("table syncer for {}", data.name), + move |must_exit: watch::Receiver| s2.syncer_task(must_exit, busy_tx), + ); + + let s3 = syncer.clone(); + tokio::spawn(async move { + tokio::time::delay_for(Duration::from_secs(20)).await; + s3.add_full_sync(); + }); + + syncer + } + + async fn watcher_task( + self: Arc, + mut must_exit: watch::Receiver, + mut busy_rx: 
mpsc::UnboundedReceiver, + ) -> Result<(), Error> { + let mut ring_recv: watch::Receiver> = self.aux.system.ring.clone(); + let mut nothing_to_do_since = Some(Instant::now()); + + while !*must_exit.borrow() { + let s_ring_recv = ring_recv.recv().fuse(); + let s_busy = busy_rx.recv().fuse(); + let s_must_exit = must_exit.recv().fuse(); + let s_timeout = tokio::time::delay_for(Duration::from_secs(1)).fuse(); + pin_mut!(s_ring_recv, s_busy, s_must_exit, s_timeout); + + select! { + new_ring_r = s_ring_recv => { + if new_ring_r.is_some() { + debug!("({}) Adding ring difference to syncer todo list", self.data.name); + self.add_full_sync(); + } + } + busy_opt = s_busy => { + if let Some(busy) = busy_opt { + if busy { + nothing_to_do_since = None; + } else { + if nothing_to_do_since.is_none() { + nothing_to_do_since = Some(Instant::now()); + } + } + } + } + must_exit_v = s_must_exit => { + if must_exit_v.unwrap_or(false) { + break; + } + } + _ = s_timeout => { + if nothing_to_do_since.map(|t| Instant::now() - t >= ANTI_ENTROPY_INTERVAL).unwrap_or(false) { + nothing_to_do_since = None; + debug!("({}) Adding full sync to syncer todo list", self.data.name); + self.add_full_sync(); + } + } + } + } + Ok(()) + } + + pub fn add_full_sync(&self) { + self.todo + .lock() + .unwrap() + .add_full_sync(&self.data, &self.aux); + } + + async fn syncer_task( + self: Arc, + mut must_exit: watch::Receiver, + busy_tx: mpsc::UnboundedSender, + ) -> Result<(), Error> { + while !*must_exit.borrow() { + let task = self.todo.lock().unwrap().pop_task(); + if let Some(partition) = task { + busy_tx.send(true)?; + let res = self + .clone() + .sync_partition(&partition, &mut must_exit) + .await; + if let Err(e) = res { + warn!( + "({}) Error while syncing {:?}: {}", + self.data.name, partition, e + ); + } + } else { + busy_tx.send(false)?; + tokio::time::delay_for(Duration::from_secs(1)).await; + } + } + Ok(()) + } + + async fn sync_partition( + self: Arc, + partition: &TodoPartition, + must_exit: 
&mut watch::Receiver, + ) -> Result<(), Error> { + if partition.retain { + let my_id = self.aux.system.id; + + let nodes = self + .aux + .replication + .write_nodes( + &hash_of_merkle_partition(partition.range.begin), + &self.aux.system, + ) + .into_iter() + .filter(|node| *node != my_id) + .collect::>(); + + debug!( + "({}) Syncing {:?} with {:?}...", + self.data.name, partition, nodes + ); + let mut sync_futures = nodes + .iter() + .map(|node| { + self.clone() + .do_sync_with(partition.clone(), *node, must_exit.clone()) + }) + .collect::>(); + + let mut n_errors = 0; + while let Some(r) = sync_futures.next().await { + if let Err(e) = r { + n_errors += 1; + warn!("({}) Sync error: {}", self.data.name, e); + } + } + if n_errors > self.aux.replication.max_write_errors() { + return Err(Error::Message(format!( + "Sync failed with too many nodes (should have been: {:?}).", + nodes + ))); + } + } else { + self.offload_partition( + &hash_of_merkle_partition(partition.range.begin), + &hash_of_merkle_partition_opt(partition.range.end), + must_exit, + ) + .await?; + } + + Ok(()) + } + + // Offload partition: this partition is not something we are storing, + // so send it out to all other nodes that store it and delete items locally. + // We don't bother checking if the remote nodes already have the items, + // we just batch-send everything. Offloading isn't supposed to happen very often. + // If any of the nodes that are supposed to store the items is unable to + // save them, we interrupt the process. 
+ async fn offload_partition( + self: &Arc, + begin: &Hash, + end: &Hash, + must_exit: &mut watch::Receiver, + ) -> Result<(), Error> { + let mut counter: usize = 0; + + while !*must_exit.borrow() { + let mut items = Vec::new(); + + for item in self.data.store.range(begin.to_vec()..end.to_vec()) { + let (key, value) = item?; + items.push((key.to_vec(), Arc::new(ByteBuf::from(value.as_ref())))); + + if items.len() >= 1024 { + break; + } + } + + if items.len() > 0 { + let nodes = self + .aux + .replication + .write_nodes(&begin, &self.aux.system) + .into_iter() + .collect::>(); + if nodes.contains(&self.aux.system.id) { + warn!("Interrupting offload as partitions seem to have changed"); + break; + } + + counter += 1; + debug!( + "Offloading {} items from {:?}..{:?} ({})", + items.len(), + begin, + end, + counter + ); + self.offload_items(&items, &nodes[..]).await?; + } else { + break; + } + } + + Ok(()) + } + + async fn offload_items( + self: &Arc, + items: &Vec<(Vec, Arc)>, + nodes: &[UUID], + ) -> Result<(), Error> { + let values = items.iter().map(|(_k, v)| v.clone()).collect::>(); + let update_msg = Arc::new(TableRPC::::Update(values)); + + for res in join_all(nodes.iter().map(|to| { + self.aux + .rpc_client + .call_arc(*to, update_msg.clone(), TABLE_SYNC_RPC_TIMEOUT) + })) + .await + { + res?; + } + + // All remote nodes have written those items, now we can delete them locally + let mut not_removed = 0; + for (k, v) in items.iter() { + if !self.data.delete_if_equal(&k[..], &v[..])? 
{ + not_removed += 1; + } + } + + if not_removed > 0 { + debug!("{} items not removed during offload because they changed in between (trying again...)", not_removed); + } + + Ok(()) + } + + // ======= SYNCHRONIZATION PROCEDURE -- DRIVER SIDE ====== + + fn get_root_ck(&self, range: PartitionRange) -> Result { + let begin = u16::from_be_bytes(range.begin); + let range_iter = match range.end { + Some(end) => { + let end = u16::from_be_bytes(end); + begin..=(end - 1) + } + None => begin..=0xFFFF, + }; + + let mut ret = vec![]; + for i in range_iter { + let key = MerkleNodeKey { + partition: u16::to_be_bytes(i), + prefix: vec![], + }; + match self.data.merkle_updater.read_node(&key)? { + MerkleNode::Empty => (), + x => { + ret.push((key.partition, hash_of(&x)?)); + } + } + } + Ok(ret) + } + + async fn do_sync_with( + self: Arc, + partition: TodoPartition, + who: UUID, + must_exit: watch::Receiver, + ) -> Result<(), Error> { + let root_ck = self.get_root_ck(partition.range)?; + let root_ck_hash = hash_of(&root_ck)?; + + // If their root checksum has level > than us, use that as a reference + let root_resp = self + .aux + .rpc_client + .call( + who, + TableRPC::::SyncRPC(SyncRPC::RootCkHash(partition.range, root_ck_hash)), + TABLE_SYNC_RPC_TIMEOUT, + ) + .await?; + + let mut todo = match root_resp { + TableRPC::::SyncRPC(SyncRPC::CkNoDifference) => { + debug!( + "({}) Sync {:?} with {:?}: no difference", + self.data.name, partition, who + ); + return Ok(()); + } + TableRPC::::SyncRPC(SyncRPC::RootCkList(_, their_root_ck)) => { + let join = join_ordered(&root_ck[..], &their_root_ck[..]); + let mut todo = VecDeque::new(); + for (p, v1, v2) in join.iter() { + let diff = match (v1, v2) { + (Some(_), None) | (None, Some(_)) => true, + (Some(a), Some(b)) => a != b, + _ => false, + }; + if diff { + todo.push_back(MerkleNodeKey { + partition: **p, + prefix: vec![], + }); + } + } + debug!( + "({}) Sync {:?} with {:?}: todo.len() = {}", + self.data.name, + partition, + who, + 
todo.len() + ); + todo + } + x => { + return Err(Error::Message(format!( + "Invalid respone to RootCkHash RPC: {}", + debug_serialize(x) + ))); + } + }; + + let mut todo_items = vec![]; + + while !todo.is_empty() && !*must_exit.borrow() { + let key = todo.pop_front().unwrap(); + let node = self.data.merkle_updater.read_node(&key)?; + + match node { + MerkleNode::Empty => { + // They have items we don't have. + // We don't request those items from them, they will send them. + // We only bother with pushing items that differ + } + MerkleNode::Leaf(ik, _) => { + // Just send that item directly + if let Some(val) = self.data.store.get(ik)? { + todo_items.push(val.to_vec()); + } + } + MerkleNode::Intermediate(l) => { + let remote_node = match self + .aux + .rpc_client + .call( + who, + TableRPC::::SyncRPC(SyncRPC::GetNode(key.clone())), + TABLE_SYNC_RPC_TIMEOUT, + ) + .await? + { + TableRPC::::SyncRPC(SyncRPC::Node(_, node)) => node, + x => { + return Err(Error::Message(format!( + "Invalid respone to GetNode RPC: {}", + debug_serialize(x) + ))); + } + }; + let int_l2 = match remote_node { + MerkleNode::Intermediate(l2) => l2, + _ => vec![], + }; + + let join = join_ordered(&l[..], &int_l2[..]); + for (p, v1, v2) in join.into_iter() { + let diff = match (v1, v2) { + (Some(_), None) | (None, Some(_)) => true, + (Some(a), Some(b)) => a != b, + _ => false, + }; + if diff { + todo.push_back(key.add_byte(*p)); + } + } + } + } + + if todo_items.len() >= 256 { + self.send_items(who, std::mem::replace(&mut todo_items, vec![])) + .await?; + } + } + + if !todo_items.is_empty() { + self.send_items(who, todo_items).await?; + } + + Ok(()) + } + + async fn send_items(&self, who: UUID, item_list: Vec>) -> Result<(), Error> { + info!( + "({}) Sending {} items to {:?}", + self.data.name, + item_list.len(), + who + ); + + let mut values = vec![]; + for item in item_list.iter() { + if let Some(v) = self.data.store.get(&item[..])? 
{ + values.push(Arc::new(ByteBuf::from(v.as_ref()))); + } + } + let rpc_resp = self + .aux + .rpc_client + .call(who, TableRPC::::Update(values), TABLE_SYNC_RPC_TIMEOUT) + .await?; + if let TableRPC::::Ok = rpc_resp { + Ok(()) + } else { + Err(Error::Message(format!( + "Unexpected response to RPC Update: {}", + debug_serialize(&rpc_resp) + ))) + } + } + + // ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ====== + + pub(crate) async fn handle_rpc(self: &Arc, message: &SyncRPC) -> Result { + match message { + SyncRPC::RootCkHash(range, h) => { + let root_ck = self.get_root_ck(*range)?; + let hash = hash_of(&root_ck)?; + if hash == *h { + Ok(SyncRPC::CkNoDifference) + } else { + Ok(SyncRPC::RootCkList(*range, root_ck)) + } + } + SyncRPC::GetNode(k) => { + let node = self.data.merkle_updater.read_node(&k)?; + Ok(SyncRPC::Node(k.clone(), node)) + } + _ => Err(Error::Message(format!("Unexpected sync RPC"))), + } + } +} + +impl SyncTodo { + fn add_full_sync( + &mut self, + data: &TableData, + aux: &TableAux, + ) { + let my_id = aux.system.id; + + self.todo.clear(); + + let ring = aux.system.ring.borrow().clone(); + let split_points = aux.replication.split_points(&ring); + + for i in 0..split_points.len() { + let begin: MerklePartition = { + let b = split_points[i]; + assert_eq!(b.as_slice()[2..], [0u8; 30][..]); + b.as_slice()[..2].try_into().unwrap() + }; + + let end: Option = if i + 1 < split_points.len() { + let e = split_points[i + 1]; + assert_eq!(e.as_slice()[2..], [0u8; 30][..]); + Some(e.as_slice()[..2].try_into().unwrap()) + } else { + None + }; + + let begin_hash = hash_of_merkle_partition(begin); + let end_hash = hash_of_merkle_partition_opt(end); + + let nodes = aux.replication.replication_nodes(&begin_hash, &ring); + + let retain = nodes.contains(&my_id); + if !retain { + // Check if we have some data to send, otherwise skip + if data.store.range(begin_hash..end_hash).next().is_none() { + continue; + } + } + + self.todo.push(TodoPartition { + range: 
PartitionRange { begin, end }, + retain, + }); + } + } + + fn pop_task(&mut self) -> Option { + if self.todo.is_empty() { + return None; + } + + let i = rand::thread_rng().gen_range::(0, self.todo.len()); + if i == self.todo.len() - 1 { + self.todo.pop() + } else { + let replacement = self.todo.pop().unwrap(); + let ret = std::mem::replace(&mut self.todo[i], replacement); + Some(ret) + } + } +} + +fn hash_of(x: &T) -> Result { + Ok(blake2sum(&rmp_to_vec_all_named(x)?[..])) +} + +fn join_ordered<'a, K: Ord + Eq, V1, V2>( + x: &'a [(K, V1)], + y: &'a [(K, V2)], +) -> Vec<(&'a K, Option<&'a V1>, Option<&'a V2>)> { + let mut ret = vec![]; + let mut i = 0; + let mut j = 0; + while i < x.len() || j < y.len() { + if i < x.len() && j < y.len() && x[i].0 == y[j].0 { + ret.push((&x[i].0, Some(&x[i].1), Some(&y[j].1))); + i += 1; + j += 1; + } else if i < x.len() && (j == y.len() || x[i].0 < y[j].0) { + ret.push((&x[i].0, Some(&x[i].1), None)); + i += 1; + } else if j < y.len() && (i == x.len() || x[i].0 > y[j].0) { + ret.push((&x[i].0, None, Some(&y[j].1))); + j += 1; + } else { + unreachable!(); + } + } + ret +} diff --git a/src/table/table.rs b/src/table/table.rs index a4cb4b24..516c9358 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -15,9 +15,9 @@ use garage_rpc::rpc_server::*; use crate::crdt::CRDT; use crate::data::*; -use crate::schema::*; -use crate::table_sync::*; use crate::replication::*; +use crate::schema::*; +use crate::sync::*; const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10); @@ -50,7 +50,6 @@ pub(crate) enum TableRPC { impl RpcMessage for TableRPC {} - impl Table where F: TableSchema + 'static, @@ -69,29 +68,17 @@ where let rpc_path = format!("table_{}", name); let rpc_client = system.rpc_client::>(&rpc_path); - let data = TableData::new( - name, - instance, - db, - system.background.clone(), - ); + let data = TableData::new(name, instance, db, system.background.clone()); - let aux = Arc::new(TableAux{ + let aux = Arc::new(TableAux { 
system, replication, rpc_client, }); - let syncer = TableSyncer::launch( - data.clone(), - aux.clone(), - ); + let syncer = TableSyncer::launch(data.clone(), aux.clone()); - let table = Arc::new(Self { - data, - aux, - syncer, - }); + let table = Arc::new(Self { data, aux, syncer }); table.clone().register_handler(rpc_server, rpc_path); @@ -106,7 +93,8 @@ where let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?)); let rpc = TableRPC::::Update(vec![e_enc]); - self.aux.rpc_client + self.aux + .rpc_client .try_call_many( &who[..], rpc, @@ -135,7 +123,11 @@ where let call_futures = call_list.drain().map(|(node, entries)| async move { let rpc = TableRPC::::Update(entries); - let resp = self.aux.rpc_client.call(node, rpc, TABLE_RPC_TIMEOUT).await?; + let resp = self + .aux + .rpc_client + .call(node, rpc, TABLE_RPC_TIMEOUT) + .await?; Ok::<_, Error>((node, resp)) }); let mut resps = call_futures.collect::>(); @@ -200,7 +192,8 @@ where if not_all_same { let self2 = self.clone(); let ent2 = ret_entry.clone(); - self.aux.system + self.aux + .system .background .spawn_cancellable(async move { self2.repair_on_read(&who[..], ent2).await }); } @@ -221,7 +214,8 @@ where let rpc = TableRPC::::ReadRange(partition_key.clone(), begin_sort_key, filter, limit); let resps = self - .aux.rpc_client + .aux + .rpc_client .try_call_many( &who[..], rpc, @@ -276,7 +270,8 @@ where async fn repair_on_read(&self, who: &[UUID], what: F::E) -> Result<(), Error> { let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?)); - self.aux.rpc_client + self.aux + .rpc_client .try_call_many( &who[..], TableRPC::::Update(vec![what_enc]), @@ -296,7 +291,8 @@ where }); let self2 = self.clone(); - self.aux.rpc_client + self.aux + .rpc_client .set_local_handler(self.aux.system.id, move |msg| { let self2 = self2.clone(); async move { self2.handle(&msg).await } @@ -318,9 +314,7 @@ where Ok(TableRPC::Ok) } TableRPC::SyncRPC(rpc) => { - let response = self.syncer - .handle_rpc(rpc, 
self.aux.system.background.stop_signal.clone()) - .await?; + let response = self.syncer.handle_rpc(rpc).await?; Ok(TableRPC::SyncRPC(response)) } _ => Err(Error::BadRPC(format!("Unexpected table RPC"))), diff --git a/src/table/table_sync.rs b/src/table/table_sync.rs deleted file mode 100644 index 7394be1b..00000000 --- a/src/table/table_sync.rs +++ /dev/null @@ -1,898 +0,0 @@ -use rand::Rng; -use std::collections::{BTreeMap, VecDeque}; -use std::sync::{Arc, Mutex}; -use std::time::{Duration, Instant}; - -use futures::future::join_all; -use futures::{pin_mut, select}; -use futures_util::future::*; -use futures_util::stream::*; -use serde::{Deserialize, Serialize}; -use serde_bytes::ByteBuf; -use tokio::sync::{mpsc, watch}; - -use garage_rpc::ring::Ring; -use garage_util::data::*; -use garage_util::error::Error; - -use crate::*; -use crate::data::*; -use crate::replication::*; - -const MAX_DEPTH: usize = 16; - -const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(30); - -// Do anti-entropy every 10 minutes -const SCAN_INTERVAL: Duration = Duration::from_secs(10 * 60); - -const CHECKSUM_CACHE_TIMEOUT: Duration = Duration::from_secs(10 * 60); - -pub struct TableSyncer { - data: Arc>, - aux: Arc>, - - todo: Mutex, - cache: Vec>>, -} - -#[derive(Serialize, Deserialize)] -pub(crate) enum SyncRPC { - GetRootChecksumRange(Hash, Hash), - RootChecksumRange(SyncRange), - Checksums(Vec), - Difference(Vec, Vec>), -} - -struct SyncTodo { - todo: Vec, -} - -#[derive(Debug, Clone)] -struct TodoPartition { - // Partition consists in hashes between begin included and end excluded - begin: Hash, - end: Hash, - - // Are we a node that stores this partition or not? 
- retain: bool, -} - -// A SyncRange defines a query on the dataset stored by a node, in the following way: -// - all items whose key are >= `begin` -// - stopping at the first item whose key hash has at least `level` leading zero bytes (excluded) -// - except if the first item of the range has such many leading zero bytes -// - and stopping at `end` (excluded) if such an item is not found -// The checksum itself does not store all of the items in the database, only the hashes of the "sub-ranges" -// i.e. of ranges of level `level-1` that cover the same range -// (ranges of level 0 do not exist and their hash is simply the hash of the first item >= begin) -// See RangeChecksum for the struct that stores this information. -#[derive(Hash, PartialEq, Eq, Debug, Clone, Serialize, Deserialize)] -pub(crate) struct SyncRange { - begin: Vec, - end: Vec, - level: usize, -} - -impl std::cmp::PartialOrd for SyncRange { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} -impl std::cmp::Ord for SyncRange { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.begin - .cmp(&other.begin) - .then(self.level.cmp(&other.level)) - .then(self.end.cmp(&other.end)) - } -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub(crate) struct RangeChecksum { - bounds: SyncRange, - children: Vec<(SyncRange, Hash)>, - found_limit: Option>, - - #[serde(skip, default = "std::time::Instant::now")] - time: Instant, -} - -#[derive(Debug, Clone)] -struct RangeChecksumCache { - hash: Option, // None if no children - found_limit: Option>, - time: Instant, -} - -impl TableSyncer -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ - pub(crate) fn launch(data: Arc>, - aux: Arc>) -> Arc { - let todo = SyncTodo{ todo: vec![] }; - - let syncer = Arc::new(Self { - data: data.clone(), - aux: aux.clone(), - todo: Mutex::new(todo), - cache: (0..MAX_DEPTH) - .map(|_| Mutex::new(BTreeMap::new())) - .collect::>(), - }); - - let (busy_tx, busy_rx) = 
mpsc::unbounded_channel(); - - let s1 = syncer.clone(); - aux.system.background.spawn_worker( - format!("table sync watcher for {}", data.name), - move |must_exit: watch::Receiver| s1.watcher_task(must_exit, busy_rx), - ); - - let s2 = syncer.clone(); - aux.system.background.spawn_worker( - format!("table syncer for {}", data.name), - move |must_exit: watch::Receiver| s2.syncer_task(must_exit, busy_tx), - ); - - let s3 = syncer.clone(); - tokio::spawn(async move { - tokio::time::delay_for(Duration::from_secs(20)).await; - s3.add_full_scan(); - }); - - syncer - } - - async fn watcher_task( - self: Arc, - mut must_exit: watch::Receiver, - mut busy_rx: mpsc::UnboundedReceiver, - ) -> Result<(), Error> { - let mut prev_ring: Arc = self.aux.system.ring.borrow().clone(); - let mut ring_recv: watch::Receiver> = self.aux.system.ring.clone(); - let mut nothing_to_do_since = Some(Instant::now()); - - while !*must_exit.borrow() { - let s_ring_recv = ring_recv.recv().fuse(); - let s_busy = busy_rx.recv().fuse(); - let s_must_exit = must_exit.recv().fuse(); - let s_timeout = tokio::time::delay_for(Duration::from_secs(1)).fuse(); - pin_mut!(s_ring_recv, s_busy, s_must_exit, s_timeout); - - select! 
{ - new_ring_r = s_ring_recv => { - if let Some(new_ring) = new_ring_r { - debug!("({}) Adding ring difference to syncer todo list", self.data.name); - self.todo.lock().unwrap().add_ring_difference(&prev_ring, &new_ring, &self.data, &self.aux); - prev_ring = new_ring; - } - } - busy_opt = s_busy => { - if let Some(busy) = busy_opt { - if busy { - nothing_to_do_since = None; - } else { - if nothing_to_do_since.is_none() { - nothing_to_do_since = Some(Instant::now()); - } - } - } - } - must_exit_v = s_must_exit => { - if must_exit_v.unwrap_or(false) { - break; - } - } - _ = s_timeout => { - if nothing_to_do_since.map(|t| Instant::now() - t >= SCAN_INTERVAL).unwrap_or(false) { - nothing_to_do_since = None; - debug!("({}) Adding full scan to syncer todo list", self.data.name); - self.add_full_scan(); - } - } - } - } - Ok(()) - } - - pub fn add_full_scan(&self) { - self.todo.lock().unwrap().add_full_scan(&self.data, &self.aux); - } - - async fn syncer_task( - self: Arc, - mut must_exit: watch::Receiver, - busy_tx: mpsc::UnboundedSender, - ) -> Result<(), Error> { - while !*must_exit.borrow() { - let task = self.todo.lock().unwrap().pop_task(); - if let Some(partition) = task { - busy_tx.send(true)?; - let res = self - .clone() - .sync_partition(&partition, &mut must_exit) - .await; - if let Err(e) = res { - warn!( - "({}) Error while syncing {:?}: {}", - self.data.name, partition, e - ); - } - } else { - busy_tx.send(false)?; - tokio::time::delay_for(Duration::from_secs(1)).await; - } - } - Ok(()) - } - - async fn sync_partition( - self: Arc, - partition: &TodoPartition, - must_exit: &mut watch::Receiver, - ) -> Result<(), Error> { - if partition.retain { - let my_id = self.aux.system.id; - let nodes = self - .aux - .replication - .write_nodes(&partition.begin, &self.aux.system) - .into_iter() - .filter(|node| *node != my_id) - .collect::>(); - - debug!( - "({}) Preparing to sync {:?} with {:?}...", - self.data.name, partition, nodes - ); - let root_cks = 
self.root_checksum(&partition.begin, &partition.end, must_exit)?; - - let mut sync_futures = nodes - .iter() - .map(|node| { - self.clone().do_sync_with( - partition.clone(), - root_cks.clone(), - *node, - must_exit.clone(), - ) - }) - .collect::>(); - - let mut n_errors = 0; - while let Some(r) = sync_futures.next().await { - if let Err(e) = r { - n_errors += 1; - warn!("({}) Sync error: {}", self.data.name, e); - } - } - if n_errors > self.aux.replication.max_write_errors() { - return Err(Error::Message(format!( - "Sync failed with too many nodes (should have been: {:?}).", - nodes - ))); - } - } else { - self.offload_partition(&partition.begin, &partition.end, must_exit) - .await?; - } - - Ok(()) - } - - // Offload partition: this partition is not something we are storing, - // so send it out to all other nodes that store it and delete items locally. - // We don't bother checking if the remote nodes already have the items, - // we just batch-send everything. Offloading isn't supposed to happen very often. - // If any of the nodes that are supposed to store the items is unable to - // save them, we interrupt the process. 
- async fn offload_partition( - self: &Arc, - begin: &Hash, - end: &Hash, - must_exit: &mut watch::Receiver, - ) -> Result<(), Error> { - let mut counter: usize = 0; - - while !*must_exit.borrow() { - let mut items = Vec::new(); - - for item in self.data.store.range(begin.to_vec()..end.to_vec()) { - let (key, value) = item?; - items.push((key.to_vec(), Arc::new(ByteBuf::from(value.as_ref())))); - - if items.len() >= 1024 { - break; - } - } - - if items.len() > 0 { - let nodes = self - .aux - .replication - .write_nodes(&begin, &self.aux.system) - .into_iter() - .collect::>(); - if nodes.contains(&self.aux.system.id) { - warn!("Interrupting offload as partitions seem to have changed"); - break; - } - - counter += 1; - debug!( - "Offloading {} items from {:?}..{:?} ({})", - items.len(), - begin, - end, - counter - ); - self.offload_items(&items, &nodes[..]).await?; - } else { - break; - } - } - - Ok(()) - } - - async fn offload_items( - self: &Arc, - items: &Vec<(Vec, Arc)>, - nodes: &[UUID], - ) -> Result<(), Error> { - let values = items.iter().map(|(_k, v)| v.clone()).collect::>(); - let update_msg = Arc::new(TableRPC::::Update(values)); - - for res in join_all(nodes.iter().map(|to| { - self.aux - .rpc_client - .call_arc(*to, update_msg.clone(), TABLE_SYNC_RPC_TIMEOUT) - })) - .await - { - res?; - } - - // All remote nodes have written those items, now we can delete them locally - let mut not_removed = 0; - for (k, v) in items.iter() { - if !self.data.delete_if_equal(&k[..], &v[..])? 
{ - not_removed += 1; - } - } - - if not_removed > 0 { - debug!("{} items not removed during offload because they changed in between (trying again...)", not_removed); - } - - Ok(()) - } - - fn root_checksum( - self: &Arc, - begin: &Hash, - end: &Hash, - must_exit: &mut watch::Receiver, - ) -> Result { - for i in 1..MAX_DEPTH { - let rc = self.range_checksum( - &SyncRange { - begin: begin.to_vec(), - end: end.to_vec(), - level: i, - }, - must_exit, - )?; - if rc.found_limit.is_none() { - return Ok(rc); - } - } - Err(Error::Message(format!( - "Unable to compute root checksum (this should never happen)" - ))) - } - - fn range_checksum( - self: &Arc, - range: &SyncRange, - must_exit: &mut watch::Receiver, - ) -> Result { - assert!(range.level != 0); - trace!("Call range_checksum {:?}", range); - - if range.level == 1 { - let mut children = vec![]; - for item in self - .data - .store - .range(range.begin.clone()..range.end.clone()) - { - let (key, value) = item?; - let key_hash = blake2sum(&key[..]); - if children.len() > 0 - && key_hash.as_slice()[0..range.level] - .iter() - .all(|x| *x == 0u8) - { - trace!( - "range_checksum {:?} returning {} items", - range, - children.len() - ); - return Ok(RangeChecksum { - bounds: range.clone(), - children, - found_limit: Some(key.to_vec()), - time: Instant::now(), - }); - } - let item_range = SyncRange { - begin: key.to_vec(), - end: vec![], - level: 0, - }; - children.push((item_range, blake2sum(&value[..]))); - } - trace!( - "range_checksum {:?} returning {} items", - range, - children.len() - ); - Ok(RangeChecksum { - bounds: range.clone(), - children, - found_limit: None, - time: Instant::now(), - }) - } else { - let mut children = vec![]; - let mut sub_range = SyncRange { - begin: range.begin.clone(), - end: range.end.clone(), - level: range.level - 1, - }; - let mut time = Instant::now(); - while !*must_exit.borrow() { - let sub_ck = self.range_checksum_cached_hash(&sub_range, must_exit)?; - - if let Some(hash) = 
sub_ck.hash { - children.push((sub_range.clone(), hash)); - if sub_ck.time < time { - time = sub_ck.time; - } - } - - if sub_ck.found_limit.is_none() || sub_ck.hash.is_none() { - trace!( - "range_checksum {:?} returning {} items", - range, - children.len() - ); - return Ok(RangeChecksum { - bounds: range.clone(), - children, - found_limit: None, - time, - }); - } - let found_limit = sub_ck.found_limit.unwrap(); - - let actual_limit_hash = blake2sum(&found_limit[..]); - if actual_limit_hash.as_slice()[0..range.level] - .iter() - .all(|x| *x == 0u8) - { - trace!( - "range_checksum {:?} returning {} items", - range, - children.len() - ); - return Ok(RangeChecksum { - bounds: range.clone(), - children, - found_limit: Some(found_limit.clone()), - time, - }); - } - - sub_range.begin = found_limit; - } - trace!("range_checksum {:?} exiting due to must_exit", range); - Err(Error::Message(format!("Exiting."))) - } - } - - fn range_checksum_cached_hash( - self: &Arc, - range: &SyncRange, - must_exit: &mut watch::Receiver, - ) -> Result { - { - let mut cache = self.cache[range.level].lock().unwrap(); - if let Some(v) = cache.get(&range) { - if Instant::now() - v.time < CHECKSUM_CACHE_TIMEOUT { - return Ok(v.clone()); - } - } - cache.remove(&range); - } - - let v = self.range_checksum(&range, must_exit)?; - trace!( - "({}) New checksum calculated for {}-{}/{}, {} children", - self.data.name, - hex::encode(&range.begin) - .chars() - .take(16) - .collect::(), - hex::encode(&range.end).chars().take(16).collect::(), - range.level, - v.children.len() - ); - - let hash = if v.children.len() > 0 { - Some(blake2sum(&rmp_to_vec_all_named(&v)?[..])) - } else { - None - }; - let cache_entry = RangeChecksumCache { - hash, - found_limit: v.found_limit, - time: v.time, - }; - - let mut cache = self.cache[range.level].lock().unwrap(); - cache.insert(range.clone(), cache_entry.clone()); - Ok(cache_entry) - } - - async fn do_sync_with( - self: Arc, - partition: TodoPartition, - root_ck: 
RangeChecksum, - who: UUID, - mut must_exit: watch::Receiver, - ) -> Result<(), Error> { - let mut todo = VecDeque::new(); - - // If their root checksum has level > than us, use that as a reference - let root_cks_resp = self - .aux - .rpc_client - .call( - who, - TableRPC::::SyncRPC(SyncRPC::GetRootChecksumRange( - partition.begin.clone(), - partition.end.clone(), - )), - TABLE_SYNC_RPC_TIMEOUT, - ) - .await?; - if let TableRPC::::SyncRPC(SyncRPC::RootChecksumRange(range)) = root_cks_resp { - if range.level > root_ck.bounds.level { - let their_root_range_ck = self.range_checksum(&range, &mut must_exit)?; - todo.push_back(their_root_range_ck); - } else { - todo.push_back(root_ck); - } - } else { - return Err(Error::Message(format!( - "Invalid respone to GetRootChecksumRange RPC: {}", - debug_serialize(root_cks_resp) - ))); - } - - while !todo.is_empty() && !*must_exit.borrow() { - let total_children = todo.iter().map(|x| x.children.len()).fold(0, |x, y| x + y); - trace!( - "({}) Sync with {:?}: {} ({}) remaining", - self.data.name, - who, - todo.len(), - total_children - ); - - let step_size = std::cmp::min(16, todo.len()); - let step = todo.drain(..step_size).collect::>(); - - let rpc_resp = self - .aux - .rpc_client - .call( - who, - TableRPC::::SyncRPC(SyncRPC::Checksums(step)), - TABLE_SYNC_RPC_TIMEOUT, - ) - .await?; - if let TableRPC::::SyncRPC(SyncRPC::Difference(mut diff_ranges, diff_items)) = - rpc_resp - { - if diff_ranges.len() > 0 || diff_items.len() > 0 { - info!( - "({}) Sync with {:?}: difference {} ranges, {} items", - self.data.name, - who, - diff_ranges.len(), - diff_items.len() - ); - } - let mut items_to_send = vec![]; - for differing in diff_ranges.drain(..) 
{ - if differing.level == 0 { - items_to_send.push(differing.begin); - } else { - let checksum = self.range_checksum(&differing, &mut must_exit)?; - todo.push_back(checksum); - } - } - if diff_items.len() > 0 { - self.data.update_many(&diff_items[..])?; - } - if items_to_send.len() > 0 { - self.send_items(who, items_to_send).await?; - } - } else { - return Err(Error::Message(format!( - "Unexpected response to sync RPC checksums: {}", - debug_serialize(&rpc_resp) - ))); - } - } - Ok(()) - } - - async fn send_items(&self, who: UUID, item_list: Vec>) -> Result<(), Error> { - info!( - "({}) Sending {} items to {:?}", - self.data.name, - item_list.len(), - who - ); - - let mut values = vec![]; - for item in item_list.iter() { - if let Some(v) = self.data.store.get(&item[..])? { - values.push(Arc::new(ByteBuf::from(v.as_ref()))); - } - } - let rpc_resp = self - .aux - .rpc_client - .call(who, TableRPC::::Update(values), TABLE_SYNC_RPC_TIMEOUT) - .await?; - if let TableRPC::::Ok = rpc_resp { - Ok(()) - } else { - Err(Error::Message(format!( - "Unexpected response to RPC Update: {}", - debug_serialize(&rpc_resp) - ))) - } - } - - pub(crate) async fn handle_rpc( - self: &Arc, - message: &SyncRPC, - mut must_exit: watch::Receiver, - ) -> Result { - match message { - SyncRPC::GetRootChecksumRange(begin, end) => { - let root_cks = self.root_checksum(&begin, &end, &mut must_exit)?; - Ok(SyncRPC::RootChecksumRange(root_cks.bounds)) - } - SyncRPC::Checksums(checksums) => { - self.handle_checksums_rpc(&checksums[..], &mut must_exit) - .await - } - _ => Err(Error::Message(format!("Unexpected sync RPC"))), - } - } - - async fn handle_checksums_rpc( - self: &Arc, - checksums: &[RangeChecksum], - must_exit: &mut watch::Receiver, - ) -> Result { - let mut ret_ranges = vec![]; - let mut ret_items = vec![]; - - for their_ckr in checksums.iter() { - let our_ckr = self.range_checksum(&their_ckr.bounds, must_exit)?; - for (their_range, their_hash) in their_ckr.children.iter() { - let 
differs = match our_ckr - .children - .binary_search_by(|(our_range, _)| our_range.cmp(&their_range)) - { - Err(_) => { - if their_range.level >= 1 { - let cached_hash = - self.range_checksum_cached_hash(&their_range, must_exit)?; - cached_hash.hash.map(|h| h != *their_hash).unwrap_or(true) - } else { - true - } - } - Ok(i) => our_ckr.children[i].1 != *their_hash, - }; - if differs { - ret_ranges.push(their_range.clone()); - if their_range.level == 0 { - if let Some(item_bytes) = - self.data.store.get(their_range.begin.as_slice())? - { - ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec()))); - } - } - } - } - for (our_range, _hash) in our_ckr.children.iter() { - if let Some(their_found_limit) = &their_ckr.found_limit { - if our_range.begin.as_slice() > their_found_limit.as_slice() { - break; - } - } - - let not_present = our_ckr - .children - .binary_search_by(|(their_range, _)| their_range.cmp(&our_range)) - .is_err(); - if not_present { - if our_range.level > 0 { - ret_ranges.push(our_range.clone()); - } - if our_range.level == 0 { - if let Some(item_bytes) = - self.data.store.get(our_range.begin.as_slice())? - { - ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec()))); - } - } - } - } - } - let n_checksums = checksums - .iter() - .map(|x| x.children.len()) - .fold(0, |x, y| x + y); - if ret_ranges.len() > 0 || ret_items.len() > 0 { - trace!( - "({}) Checksum comparison RPC: {} different + {} items for {} received", - self.data.name, - ret_ranges.len(), - ret_items.len(), - n_checksums - ); - } - Ok(SyncRPC::Difference(ret_ranges, ret_items)) - } - - pub(crate) fn invalidate(self: &Arc, item_key: &[u8]) { - for i in 1..MAX_DEPTH { - let needle = SyncRange { - begin: item_key.to_vec(), - end: vec![], - level: i, - }; - let mut cache = self.cache[i].lock().unwrap(); - if let Some(cache_entry) = cache.range(..=needle).rev().next() { - if cache_entry.0.begin[..] <= *item_key && cache_entry.0.end[..] 
> *item_key { - let index = cache_entry.0.clone(); - drop(cache_entry); - cache.remove(&index); - } - } - } - } -} - -impl SyncTodo { - fn add_full_scan(&mut self, data: &TableData, aux: &TableAux) { - let my_id = aux.system.id; - - self.todo.clear(); - - let ring = aux.system.ring.borrow().clone(); - let split_points = aux.replication.split_points(&ring); - - for i in 0..split_points.len() - 1 { - let begin = split_points[i]; - let end = split_points[i + 1]; - if begin == end { - continue; - } - - let nodes = aux.replication.replication_nodes(&begin, &ring); - - let retain = nodes.contains(&my_id); - if !retain { - // Check if we have some data to send, otherwise skip - if data.store.range(begin..end).next().is_none() { - continue; - } - } - - self.todo.push(TodoPartition { begin, end, retain }); - } - } - - fn add_ring_difference( - &mut self, - old_ring: &Ring, - new_ring: &Ring, - data: &TableData, aux: &TableAux, - ) { - let my_id = aux.system.id; - - // If it is us who are entering or leaving the system, - // initiate a full sync instead of incremental sync - if old_ring.config.members.contains_key(&my_id) - != new_ring.config.members.contains_key(&my_id) - { - self.add_full_scan(data, aux); - return; - } - - let mut all_points = None - .into_iter() - .chain(aux.replication.split_points(old_ring).drain(..)) - .chain(aux.replication.split_points(new_ring).drain(..)) - .chain(self.todo.iter().map(|x| x.begin)) - .chain(self.todo.iter().map(|x| x.end)) - .collect::>(); - all_points.sort(); - all_points.dedup(); - - let mut old_todo = std::mem::replace(&mut self.todo, vec![]); - old_todo.sort_by(|x, y| x.begin.cmp(&y.begin)); - let mut new_todo = vec![]; - - for i in 0..all_points.len() - 1 { - let begin = all_points[i]; - let end = all_points[i + 1]; - let was_ours = aux - .replication - .replication_nodes(&begin, &old_ring) - .contains(&my_id); - let is_ours = aux - .replication - .replication_nodes(&begin, &new_ring) - .contains(&my_id); - - let was_todo = 
match old_todo.binary_search_by(|x| x.begin.cmp(&begin)) { - Ok(_) => true, - Err(j) => { - (j > 0 && old_todo[j - 1].begin < end && begin < old_todo[j - 1].end) - || (j < old_todo.len() - && old_todo[j].begin < end && begin < old_todo[j].end) - } - }; - if was_todo || (is_ours && !was_ours) || (was_ours && !is_ours) { - new_todo.push(TodoPartition { - begin, - end, - retain: is_ours, - }); - } - } - - self.todo = new_todo; - } - - fn pop_task(&mut self) -> Option { - if self.todo.is_empty() { - return None; - } - - let i = rand::thread_rng().gen_range::(0, self.todo.len()); - if i == self.todo.len() - 1 { - self.todo.pop() - } else { - let replacement = self.todo.pop().unwrap(); - let ret = std::mem::replace(&mut self.todo[i], replacement); - Some(ret) - } - } -} -- cgit v1.2.3 From db7a9d4948d41e4b641f9c50f6ff8921a436431d Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 11 Mar 2021 18:45:26 +0100 Subject: Tiny changes --- src/table/sync.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) (limited to 'src/table') diff --git a/src/table/sync.rs b/src/table/sync.rs index 9c37c286..42321ac6 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -49,7 +49,6 @@ pub(crate) enum SyncRPC { CkNoDifference, GetNode(MerkleNodeKey), Node(MerkleNodeKey, MerkleNode), - Items(Vec>), } struct SyncTodo { @@ -119,7 +118,7 @@ where select! 
{ new_ring_r = s_ring_recv => { if new_ring_r.is_some() { - debug!("({}) Adding ring difference to syncer todo list", self.data.name); + debug!("({}) Ring changed, adding full sync to syncer todo list", self.data.name); self.add_full_sync(); } } @@ -142,7 +141,7 @@ where _ = s_timeout => { if nothing_to_do_since.map(|t| Instant::now() - t >= ANTI_ENTROPY_INTERVAL).unwrap_or(false) { nothing_to_do_since = None; - debug!("({}) Adding full sync to syncer todo list", self.data.name); + debug!("({}) Interval passed, adding full sync to syncer todo list", self.data.name); self.add_full_sync(); } } @@ -330,6 +329,10 @@ where } // ======= SYNCHRONIZATION PROCEDURE -- DRIVER SIDE ====== + // The driver side is only concerned with sending out the item it has + // and the other side might not have. Receiving items that differ from one + // side to the other will happen when the other side syncs with us, + // which they also do regularly. fn get_root_ck(&self, range: PartitionRange) -> Result { let begin = u16::from_be_bytes(range.begin); -- cgit v1.2.3 From fae5104a2cf91206f995b183c5f217ea6729a551 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 11 Mar 2021 18:50:32 +0100 Subject: Add a nice warning --- src/table/sync.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) (limited to 'src/table') diff --git a/src/table/sync.rs b/src/table/sync.rs index 42321ac6..68fc9fcb 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -433,9 +433,12 @@ where // We don't request those items from them, they will send them. // We only bother with pushing items that differ } - MerkleNode::Leaf(ik, _) => { + MerkleNode::Leaf(ik, ivhash) => { // Just send that item directly - if let Some(val) = self.data.store.get(ik)? { + if let Some(val) = self.data.store.get(&ik[..])? 
{ + if blake2sum(&val[..]) != ivhash { + warn!("Hashes differ between stored value and Merkle tree, key: {:?} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", ik); + } todo_items.push(val.to_vec()); } } -- cgit v1.2.3 From f7c2cd1cd7ee15b9c97b9fbdef25c0644b3523bb Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 11 Mar 2021 18:55:17 +0100 Subject: Add comment, and also whoops, this wasn't doing what we expected --- src/table/sync.rs | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) (limited to 'src/table') diff --git a/src/table/sync.rs b/src/table/sync.rs index 68fc9fcb..b5044a4e 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -440,9 +440,13 @@ where warn!("Hashes differ between stored value and Merkle tree, key: {:?} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", ik); } todo_items.push(val.to_vec()); + } else { + warn!("Item from Merkle tree not found in store: {:?} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", ik); } } MerkleNode::Intermediate(l) => { + // Get Merkle node for this tree position at remote node + // and compare it with local node let remote_node = match self .aux .rpc_client @@ -462,7 +466,11 @@ where } }; let int_l2 = match remote_node { + // If they have an intermediate node at this tree position, + // we can compare them to find differences MerkleNode::Intermediate(l2) => l2, + // Otherwise, treat it as if they have nothing for this subtree, + // which will have the consequence of sending them everything _ => vec![], }; @@ -493,20 +501,18 @@ where Ok(()) } - async fn send_items(&self, who: UUID, item_list: Vec>) -> Result<(), Error> { + async fn send_items(&self, who: UUID, item_value_list: Vec>) -> Result<(), Error> { info!( "({}) Sending {} items to {:?}", self.data.name, - item_list.len(), + item_value_list.len(), who 
); - let mut values = vec![]; - for item in item_list.iter() { - if let Some(v) = self.data.store.get(&item[..])? { - values.push(Arc::new(ByteBuf::from(v.as_ref()))); - } - } + let values = item_value_list.into_iter() + .map(|x| Arc::new(ByteBuf::from(x))) + .collect::>(); + let rpc_resp = self .aux .rpc_client -- cgit v1.2.3 From 3f7a496355bdbeeeee859912fa6fa7a95cb47f3b Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 11 Mar 2021 19:06:27 +0100 Subject: More security: don't delete stuff too easily --- src/table/replication/fullcopy.rs | 7 ++++++- src/table/sync.rs | 3 +++ 2 files changed, 9 insertions(+), 1 deletion(-) (limited to 'src/table') diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs index a20f20b7..a5faece9 100644 --- a/src/table/replication/fullcopy.rs +++ b/src/table/replication/fullcopy.rs @@ -41,7 +41,12 @@ impl TableReplication for TableFullReplication { self.replication_nodes(hash, system.ring.borrow().as_ref()) } fn write_quorum(&self, system: &System) -> usize { - system.ring.borrow().config.members.len() - self.max_faults + let nmembers = system.ring.borrow().config.members.len(); + if nmembers > self.max_faults { + nmembers - self.max_faults + } else { + 1 + } } fn max_write_errors(&self) -> usize { self.max_faults diff --git a/src/table/sync.rs b/src/table/sync.rs index b5044a4e..f8ebb2f0 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -277,6 +277,9 @@ where warn!("Interrupting offload as partitions seem to have changed"); break; } + if nodes.len() < self.aux.replication.write_quorum(&self.aux.system) { + return Err(Error::Message(format!("Not offloading as we don't have a quorum of nodes to write to."))); + } counter += 1; debug!( -- cgit v1.2.3 From 1ec49980ec876ef9395a9ae088f82d86a1a0d9f6 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 11 Mar 2021 19:30:24 +0100 Subject: whoops --- src/table/merkle.rs | 3 ++- src/table/sync.rs | 10 +++++++++- 2 files changed, 11 insertions(+), 2 
deletions(-) (limited to 'src/table') diff --git a/src/table/merkle.rs b/src/table/merkle.rs index 92c18e09..467ce615 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -50,7 +50,7 @@ pub(crate) struct MerkleUpdater { empty_node_hash: Hash, } -#[derive(Clone, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct MerkleNodeKey { // partition: first 16 bits (two bytes) of the partition_key's hash pub partition: MerklePartition, @@ -283,6 +283,7 @@ impl MerkleUpdater { k: &MerkleNodeKey, v: &MerkleNode, ) -> ConflictableTransactionResult { + trace!("Put Merkle node: {:?} => {:?}", k, v); if *v == MerkleNode::Empty { tx.remove(k.encode())?; Ok(self.empty_node_hash) diff --git a/src/table/sync.rs b/src/table/sync.rs index f8ebb2f0..07d48155 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -370,6 +370,14 @@ where must_exit: watch::Receiver, ) -> Result<(), Error> { let root_ck = self.get_root_ck(partition.range)?; + if root_ck.is_empty() { + debug!( + "({}) Sync {:?} with {:?}: partition is empty.", + self.data.name, partition, who + ); + return Ok(()) + } + let root_ck_hash = hash_of(&root_ck)?; // If their root checksum has level > than us, use that as a reference @@ -637,7 +645,7 @@ fn join_ordered<'a, K: Ord + Eq, V1, V2>( ret.push((&x[i].0, Some(&x[i].1), None)); i += 1; } else if j < y.len() && (i == x.len() || x[i].0 > y[j].0) { - ret.push((&x[i].0, None, Some(&y[j].1))); + ret.push((&y[j].0, None, Some(&y[j].1))); j += 1; } else { unreachable!(); -- cgit v1.2.3 From 7fdaf7aef0c2aa8b38dbc7dac630f6f9baf8f0a4 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 12 Mar 2021 14:37:46 +0100 Subject: Fix merkle updater not being notified; improved logging --- src/table/data.rs | 4 ++-- src/table/merkle.rs | 13 +++++++------ src/table/sync.rs | 13 +++++++------ 3 files changed, 16 insertions(+), 14 deletions(-) (limited to 'src/table') diff --git a/src/table/data.rs b/src/table/data.rs index 6217bf6d..2817a849 100644 
--- a/src/table/data.rs +++ b/src/table/data.rs @@ -142,7 +142,7 @@ where if let Some((old_entry, new_entry)) = changed { self.instance.updated(old_entry, Some(new_entry)); - //self.syncer.load_full().unwrap().invalidate(&tree_key[..]); + self.merkle_updater.todo_notify.notify(); } Ok(()) @@ -163,7 +163,7 @@ where if removed { let old_entry = self.decode_entry(v)?; self.instance.updated(Some(old_entry), None); - //self.syncer.load_full().unwrap().invalidate(k); + self.merkle_updater.todo_notify.notify(); } Ok(removed) } diff --git a/src/table/merkle.rs b/src/table/merkle.rs index 467ce615..a694c9e9 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -4,7 +4,7 @@ use std::time::Duration; use futures::select; use futures_util::future::*; -use log::{info, warn}; +use log::{debug, warn}; use serde::{Deserialize, Serialize}; use sled::transaction::{ ConflictableTransactionError, ConflictableTransactionResult, TransactionalTree, @@ -109,11 +109,11 @@ impl MerkleUpdater { match x { Ok((key, valhash)) => { if let Err(e) = self.update_item(&key[..], &valhash[..]) { - warn!("Error while updating Merkle tree item: {}", e); + warn!("({}) Error while updating Merkle tree item: {}", self.table_name, e); } } Err(e) => { - warn!("Error while iterating on Merkle todo tree: {}", e); + warn!("({}) Error while iterating on Merkle todo tree: {}", self.table_name, e); tokio::time::delay_for(Duration::from_secs(10)).await; } } @@ -152,8 +152,9 @@ impl MerkleUpdater { .is_ok(); if !deleted { - info!( - "Item not deleted from Merkle todo because it changed: {:?}", + debug!( + "({}) Item not deleted from Merkle todo because it changed: {:?}", + self.table_name, k ); } @@ -195,7 +196,7 @@ impl MerkleUpdater { if children.len() == 0 { // should not happen - warn!("Replacing intermediate node with empty node, should not happen."); + warn!("({}) Replacing intermediate node with empty node, should not happen.", self.table_name); Some(MerkleNode::Empty) } else if children.len() == 1 { 
// We now have a single node (case when the update deleted one of only two diff --git a/src/table/sync.rs b/src/table/sync.rs index 07d48155..dbfa0a9f 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -274,7 +274,7 @@ where .into_iter() .collect::>(); if nodes.contains(&self.aux.system.id) { - warn!("Interrupting offload as partitions seem to have changed"); + warn!("({}) Interrupting offload as partitions seem to have changed", self.data.name); break; } if nodes.len() < self.aux.replication.write_quorum(&self.aux.system) { @@ -282,8 +282,9 @@ where } counter += 1; - debug!( - "Offloading {} items from {:?}..{:?} ({})", + info!( + "({}) Offloading {} items from {:?}..{:?} ({})", + self.data.name, items.len(), begin, end, @@ -325,7 +326,7 @@ where } if not_removed > 0 { - debug!("{} items not removed during offload because they changed in between (trying again...)", not_removed); + debug!("({}) {} items not removed during offload because they changed in between (trying again...)", self.data.name, not_removed); } Ok(()) @@ -448,11 +449,11 @@ where // Just send that item directly if let Some(val) = self.data.store.get(&ik[..])? 
{ if blake2sum(&val[..]) != ivhash { - warn!("Hashes differ between stored value and Merkle tree, key: {:?} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", ik); + warn!("({}) Hashes differ between stored value and Merkle tree, key: {:?} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", self.data.name, ik); } todo_items.push(val.to_vec()); } else { - warn!("Item from Merkle tree not found in store: {:?} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", ik); + warn!("({}) Item from Merkle tree not found in store: {:?} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", self.data.name, ik); } } MerkleNode::Intermediate(l) => { -- cgit v1.2.3 From 1fea257291bdbf447f9918274ebf73848afb3a0c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 12 Mar 2021 14:51:17 +0100 Subject: Don't sync at beginning --- src/table/sync.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) (limited to 'src/table') diff --git a/src/table/sync.rs b/src/table/sync.rs index dbfa0a9f..049a16ae 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -105,6 +105,7 @@ where mut must_exit: watch::Receiver, mut busy_rx: mpsc::UnboundedReceiver, ) -> Result<(), Error> { + let mut prev_ring: Arc = self.aux.system.ring.borrow().clone(); let mut ring_recv: watch::Receiver> = self.aux.system.ring.clone(); let mut nothing_to_do_since = Some(Instant::now()); @@ -117,9 +118,12 @@ where select! 
{ new_ring_r = s_ring_recv => { - if new_ring_r.is_some() { - debug!("({}) Ring changed, adding full sync to syncer todo list", self.data.name); - self.add_full_sync(); + if let Some(new_ring) = new_ring_r { + if !Arc::ptr_eq(&new_ring, &prev_ring) { + debug!("({}) Ring changed, adding full sync to syncer todo list", self.data.name); + self.add_full_sync(); + prev_ring = new_ring; + } } } busy_opt = s_busy => { -- cgit v1.2.3 From 8860aa19b867183b83ee48efd9990cd34e567f53 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 12 Mar 2021 15:05:26 +0100 Subject: Make syncer have its own rpc client/server --- src/table/merkle.rs | 18 ++++++++---- src/table/sync.rs | 81 ++++++++++++++++++++++++++++++++++++++--------------- src/table/table.rs | 4 +-- 3 files changed, 73 insertions(+), 30 deletions(-) (limited to 'src/table') diff --git a/src/table/merkle.rs b/src/table/merkle.rs index a694c9e9..a164df04 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -109,11 +109,17 @@ impl MerkleUpdater { match x { Ok((key, valhash)) => { if let Err(e) = self.update_item(&key[..], &valhash[..]) { - warn!("({}) Error while updating Merkle tree item: {}", self.table_name, e); + warn!( + "({}) Error while updating Merkle tree item: {}", + self.table_name, e + ); } } Err(e) => { - warn!("({}) Error while iterating on Merkle todo tree: {}", self.table_name, e); + warn!( + "({}) Error while iterating on Merkle todo tree: {}", + self.table_name, e + ); tokio::time::delay_for(Duration::from_secs(10)).await; } } @@ -154,8 +160,7 @@ impl MerkleUpdater { if !deleted { debug!( "({}) Item not deleted from Merkle todo because it changed: {:?}", - self.table_name, - k + self.table_name, k ); } Ok(()) @@ -196,7 +201,10 @@ impl MerkleUpdater { if children.len() == 0 { // should not happen - warn!("({}) Replacing intermediate node with empty node, should not happen.", self.table_name); + warn!( + "({}) Replacing intermediate node with empty node, should not happen.", + self.table_name + ); 
Some(MerkleNode::Empty) } else if children.len() == 1 { // We now have a single node (case when the update deleted one of only two diff --git a/src/table/sync.rs b/src/table/sync.rs index 049a16ae..23161d15 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -12,10 +12,13 @@ use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; use tokio::sync::{mpsc, watch}; -use garage_rpc::ring::Ring; use garage_util::data::*; use garage_util::error::Error; +use garage_rpc::ring::Ring; +use garage_rpc::rpc_client::*; +use garage_rpc::rpc_server::*; + use crate::data::*; use crate::merkle::*; use crate::replication::*; @@ -31,6 +34,7 @@ pub struct TableSyncer { aux: Arc>, todo: Mutex, + rpc_client: Arc>, } type RootCk = Vec<(MerklePartition, Hash)>; @@ -49,8 +53,12 @@ pub(crate) enum SyncRPC { CkNoDifference, GetNode(MerkleNodeKey), Node(MerkleNodeKey, MerkleNode), + Items(Vec>), + Ok, } +impl RpcMessage for SyncRPC {} + struct SyncTodo { todo: Vec, } @@ -68,15 +76,25 @@ where F: TableSchema + 'static, R: TableReplication + 'static, { - pub(crate) fn launch(data: Arc>, aux: Arc>) -> Arc { + pub(crate) fn launch( + data: Arc>, + aux: Arc>, + rpc_server: &mut RpcServer, + ) -> Arc { + let rpc_path = format!("table_{}/sync", data.name); + let rpc_client = aux.system.rpc_client::(&rpc_path); + let todo = SyncTodo { todo: vec![] }; let syncer = Arc::new(Self { data: data.clone(), aux: aux.clone(), todo: Mutex::new(todo), + rpc_client, }); + syncer.register_handler(rpc_server, rpc_path); + let (busy_tx, busy_rx) = mpsc::unbounded_channel(); let s1 = syncer.clone(); @@ -100,6 +118,21 @@ where syncer } + fn register_handler(self: &Arc, rpc_server: &mut RpcServer, path: String) { + let self2 = self.clone(); + rpc_server.add_handler::(path, move |msg, _addr| { + let self2 = self2.clone(); + async move { self2.handle_rpc(&msg).await } + }); + + let self2 = self.clone(); + self.rpc_client + .set_local_handler(self.aux.system.id, move |msg| { + let self2 = self2.clone(); + 
async move { self2.handle_rpc(&msg).await } + }); + } + async fn watcher_task( self: Arc, mut must_exit: watch::Receiver, @@ -278,11 +311,16 @@ where .into_iter() .collect::>(); if nodes.contains(&self.aux.system.id) { - warn!("({}) Interrupting offload as partitions seem to have changed", self.data.name); + warn!( + "({}) Interrupting offload as partitions seem to have changed", + self.data.name + ); break; } if nodes.len() < self.aux.replication.write_quorum(&self.aux.system) { - return Err(Error::Message(format!("Not offloading as we don't have a quorum of nodes to write to."))); + return Err(Error::Message(format!( + "Not offloading as we don't have a quorum of nodes to write to." + ))); } counter += 1; @@ -309,11 +347,10 @@ where nodes: &[UUID], ) -> Result<(), Error> { let values = items.iter().map(|(_k, v)| v.clone()).collect::>(); - let update_msg = Arc::new(TableRPC::::Update(values)); + let update_msg = Arc::new(SyncRPC::Items(values)); for res in join_all(nodes.iter().map(|to| { - self.aux - .rpc_client + self.rpc_client .call_arc(*to, update_msg.clone(), TABLE_SYNC_RPC_TIMEOUT) })) .await @@ -380,31 +417,30 @@ where "({}) Sync {:?} with {:?}: partition is empty.", self.data.name, partition, who ); - return Ok(()) + return Ok(()); } let root_ck_hash = hash_of(&root_ck)?; // If their root checksum has level > than us, use that as a reference let root_resp = self - .aux .rpc_client .call( who, - TableRPC::::SyncRPC(SyncRPC::RootCkHash(partition.range, root_ck_hash)), + SyncRPC::RootCkHash(partition.range, root_ck_hash), TABLE_SYNC_RPC_TIMEOUT, ) .await?; let mut todo = match root_resp { - TableRPC::::SyncRPC(SyncRPC::CkNoDifference) => { + SyncRPC::CkNoDifference => { debug!( "({}) Sync {:?} with {:?}: no difference", self.data.name, partition, who ); return Ok(()); } - TableRPC::::SyncRPC(SyncRPC::RootCkList(_, their_root_ck)) => { + SyncRPC::RootCkList(_, their_root_ck) => { let join = join_ordered(&root_ck[..], &their_root_ck[..]); let mut todo = 
VecDeque::new(); for (p, v1, v2) in join.iter() { @@ -464,16 +500,11 @@ where // Get Merkle node for this tree position at remote node // and compare it with local node let remote_node = match self - .aux .rpc_client - .call( - who, - TableRPC::::SyncRPC(SyncRPC::GetNode(key.clone())), - TABLE_SYNC_RPC_TIMEOUT, - ) + .call(who, SyncRPC::GetNode(key.clone()), TABLE_SYNC_RPC_TIMEOUT) .await? { - TableRPC::::SyncRPC(SyncRPC::Node(_, node)) => node, + SyncRPC::Node(_, node) => node, x => { return Err(Error::Message(format!( "Invalid respone to GetNode RPC: {}", @@ -525,16 +556,16 @@ where who ); - let values = item_value_list.into_iter() + let values = item_value_list + .into_iter() .map(|x| Arc::new(ByteBuf::from(x))) .collect::>(); let rpc_resp = self - .aux .rpc_client - .call(who, TableRPC::::Update(values), TABLE_SYNC_RPC_TIMEOUT) + .call(who, SyncRPC::Items(values), TABLE_SYNC_RPC_TIMEOUT) .await?; - if let TableRPC::::Ok = rpc_resp { + if let SyncRPC::Ok = rpc_resp { Ok(()) } else { Err(Error::Message(format!( @@ -561,6 +592,10 @@ where let node = self.data.merkle_updater.read_node(&k)?; Ok(SyncRPC::Node(k.clone(), node)) } + SyncRPC::Items(items) => { + self.data.update_many(items)?; + Ok(SyncRPC::Ok) + } _ => Err(Error::Message(format!("Unexpected sync RPC"))), } } diff --git a/src/table/table.rs b/src/table/table.rs index 516c9358..edb1be3f 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -24,7 +24,7 @@ const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10); pub struct TableAux { pub system: Arc, pub replication: R, - pub(crate) rpc_client: Arc>>, + rpc_client: Arc>>, } pub struct Table { @@ -76,7 +76,7 @@ where rpc_client, }); - let syncer = TableSyncer::launch(data.clone(), aux.clone()); + let syncer = TableSyncer::launch(data.clone(), aux.clone(), rpc_server); let table = Arc::new(Self { data, aux, syncer }); -- cgit v1.2.3 From cbe7e1a66a9dceaaeae0467b4eefe51afd5b297c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 12 Mar 2021 
15:07:23 +0100 Subject: Move table rpc client out of tableaux --- src/table/sync.rs | 6 +++--- src/table/table.rs | 31 +++++++++++++------------------ 2 files changed, 16 insertions(+), 21 deletions(-) (limited to 'src/table') diff --git a/src/table/sync.rs b/src/table/sync.rs index 23161d15..4be8cd10 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -31,7 +31,7 @@ const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60); pub struct TableSyncer { data: Arc>, - aux: Arc>, + aux: Arc>, todo: Mutex, rpc_client: Arc>, @@ -78,7 +78,7 @@ where { pub(crate) fn launch( data: Arc>, - aux: Arc>, + aux: Arc>, rpc_server: &mut RpcServer, ) -> Arc { let rpc_path = format!("table_{}/sync", data.name); @@ -605,7 +605,7 @@ impl SyncTodo { fn add_full_sync( &mut self, data: &TableData, - aux: &TableAux, + aux: &TableAux, ) { let my_id = aux.system.id; diff --git a/src/table/table.rs b/src/table/table.rs index edb1be3f..dd3394bd 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -21,16 +21,16 @@ use crate::sync::*; const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10); -pub struct TableAux { +pub struct TableAux { pub system: Arc, pub replication: R, - rpc_client: Arc>>, } pub struct Table { pub data: Arc>, - pub aux: Arc>, + pub aux: Arc>, pub syncer: Arc>, + rpc_client: Arc>>, } #[derive(Serialize, Deserialize)] @@ -73,12 +73,16 @@ where let aux = Arc::new(TableAux { system, replication, - rpc_client, }); let syncer = TableSyncer::launch(data.clone(), aux.clone(), rpc_server); - let table = Arc::new(Self { data, aux, syncer }); + let table = Arc::new(Self { + data, + aux, + syncer, + rpc_client, + }); table.clone().register_handler(rpc_server, rpc_path); @@ -93,8 +97,7 @@ where let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?)); let rpc = TableRPC::::Update(vec![e_enc]); - self.aux - .rpc_client + self.rpc_client .try_call_many( &who[..], rpc, @@ -123,11 +126,7 @@ where let call_futures = call_list.drain().map(|(node, entries)| async 
move { let rpc = TableRPC::::Update(entries); - let resp = self - .aux - .rpc_client - .call(node, rpc, TABLE_RPC_TIMEOUT) - .await?; + let resp = self.rpc_client.call(node, rpc, TABLE_RPC_TIMEOUT).await?; Ok::<_, Error>((node, resp)) }); let mut resps = call_futures.collect::>(); @@ -156,7 +155,6 @@ where let rpc = TableRPC::::ReadEntry(partition_key.clone(), sort_key.clone()); let resps = self - .aux .rpc_client .try_call_many( &who[..], @@ -214,7 +212,6 @@ where let rpc = TableRPC::::ReadRange(partition_key.clone(), begin_sort_key, filter, limit); let resps = self - .aux .rpc_client .try_call_many( &who[..], @@ -270,8 +267,7 @@ where async fn repair_on_read(&self, who: &[UUID], what: F::E) -> Result<(), Error> { let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?)); - self.aux - .rpc_client + self.rpc_client .try_call_many( &who[..], TableRPC::::Update(vec![what_enc]), @@ -291,8 +287,7 @@ where }); let self2 = self.clone(); - self.aux - .rpc_client + self.rpc_client .set_local_handler(self.aux.system.id, move |msg| { let self2 = self2.clone(); async move { self2.handle(&msg).await } -- cgit v1.2.3 From a1442f072ad9427851f49103083582637ddcdbd4 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 12 Mar 2021 15:40:54 +0100 Subject: Implement garage stats to get info on node contents --- src/table/data.rs | 2 +- src/table/merkle.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) (limited to 'src/table') diff --git a/src/table/data.rs b/src/table/data.rs index 2817a849..5e7314d2 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -17,7 +17,7 @@ pub struct TableData { pub instance: F, pub store: sled::Tree, - pub(crate) merkle_updater: Arc, + pub merkle_updater: Arc, } impl TableData diff --git a/src/table/merkle.rs b/src/table/merkle.rs index a164df04..b04a2a88 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -32,7 +32,7 @@ pub fn hash_of_merkle_partition_opt(p: Option) -> Hash { // 16 bits (two bytes) of item's 
partition keys' hashes. // It builds one Merkle tree for each of these 2**16 partitions. -pub(crate) struct MerkleUpdater { +pub struct MerkleUpdater { table_name: String, background: Arc, @@ -40,7 +40,7 @@ pub(crate) struct MerkleUpdater { // - key = the key of an item in the main table, ie hash(partition_key)+sort_key // - value = the hash of the full serialized item, if present, // or an empty vec if item is absent (deleted) - pub(crate) todo: sled::Tree, + pub todo: sled::Tree, pub(crate) todo_notify: Notify, // Content of the merkle tree: items where -- cgit v1.2.3 From c475471e7a8e7544f2be490898f4249cf27a17e9 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 12 Mar 2021 19:57:37 +0100 Subject: Implement table gc, currently for block_ref and version only --- src/table/data.rs | 67 ++++++++++++++--- src/table/gc.rs | 212 ++++++++++++++++++++++++++++++++++++++++++++++++++++ src/table/lib.rs | 1 + src/table/merkle.rs | 5 +- src/table/schema.rs | 2 + src/table/sync.rs | 18 ++--- src/table/table.rs | 8 +- 7 files changed, 279 insertions(+), 34 deletions(-) create mode 100644 src/table/gc.rs (limited to 'src/table') diff --git a/src/table/data.rs b/src/table/data.rs index 5e7314d2..91607f7a 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -1,3 +1,4 @@ +use core::borrow::Borrow; use std::sync::Arc; use log::warn; @@ -17,6 +18,7 @@ pub struct TableData { pub instance: F, pub store: sled::Tree, + pub gc_todo: sled::Tree, pub merkle_updater: Arc, } @@ -41,6 +43,10 @@ where .open_tree(&format!("{}:merkle_tree", name)) .expect("Unable to open DB Merkle tree tree"); + let gc_todo = db + .open_tree(&format!("{}:gc_todo", name)) + .expect("Unable to open DB tree"); + let merkle_updater = MerkleUpdater::launch( name.clone(), background, @@ -52,6 +58,7 @@ where name, instance, store, + gc_todo, merkle_updater, }) } @@ -103,10 +110,17 @@ where } // Mutation functions - - pub(crate) fn update_many(&self, entries: &[Arc]) -> Result<(), Error> { + // When changing 
this code, take care of propagating modifications correctly: + // - When an entry is modified or deleted, call the updated() function + // on the table instance + // - When an entry is modified or deleted, add it to the merkle updater's todo list. + // This has to be done atomically with the modification for the merkle updater + // to maintain consistency. The merkle updater must then be notified with todo_notify. + // - When an entry is updated to be a tombstone, add it to the gc_todo tree + + pub(crate) fn update_many>(&self, entries: &[T]) -> Result<(), Error> { for update_bytes in entries.iter() { - self.update_entry(update_bytes.as_slice())?; + self.update_entry(update_bytes.borrow().as_slice())?; } Ok(()) } @@ -115,8 +129,8 @@ where let update = self.decode_entry(update_bytes)?; let tree_key = self.tree_key(update.partition_key(), update.sort_key()); - let changed = (&self.store, &self.merkle_updater.todo).transaction(|(db, mkl_todo)| { - let (old_entry, new_entry) = match db.get(&tree_key)? { + let changed = (&self.store, &self.merkle_updater.todo).transaction(|(store, mkl_todo)| { + let (old_entry, new_entry) = match store.get(&tree_key)? 
{ Some(prev_bytes) => { let old_entry = self .decode_entry(&prev_bytes) @@ -132,27 +146,32 @@ where let new_bytes = rmp_to_vec_all_named(&new_entry) .map_err(Error::RMPEncode) .map_err(sled::transaction::ConflictableTransactionError::Abort)?; - mkl_todo.insert(tree_key.clone(), blake2sum(&new_bytes[..]).to_vec())?; - db.insert(tree_key.clone(), new_bytes)?; - Ok(Some((old_entry, new_entry))) + let new_bytes_hash = blake2sum(&new_bytes[..]); + mkl_todo.insert(tree_key.clone(), new_bytes_hash.as_slice())?; + store.insert(tree_key.clone(), new_bytes)?; + Ok(Some((old_entry, new_entry, new_bytes_hash))) } else { Ok(None) } })?; - if let Some((old_entry, new_entry)) = changed { + if let Some((old_entry, new_entry, new_bytes_hash)) = changed { + let is_tombstone = new_entry.is_tombstone(); self.instance.updated(old_entry, Some(new_entry)); self.merkle_updater.todo_notify.notify(); + if is_tombstone { + self.gc_todo.insert(&tree_key, new_bytes_hash.as_slice())?; + } } Ok(()) } pub(crate) fn delete_if_equal(self: &Arc, k: &[u8], v: &[u8]) -> Result { - let removed = (&self.store, &self.merkle_updater.todo).transaction(|(txn, mkl_todo)| { - if let Some(cur_v) = txn.get(k)? { + let removed = (&self.store, &self.merkle_updater.todo).transaction(|(store, mkl_todo)| { + if let Some(cur_v) = store.get(k)? { if cur_v == v { - txn.remove(k)?; + store.remove(k)?; mkl_todo.insert(k, vec![])?; return Ok(true); } @@ -168,6 +187,30 @@ where Ok(removed) } + pub(crate) fn delete_if_equal_hash(self: &Arc, k: &[u8], vhash: Hash) -> Result { + let removed = (&self.store, &self.merkle_updater.todo).transaction(|(store, mkl_todo)| { + if let Some(cur_v) = store.get(k)? 
{ + if blake2sum(&cur_v[..]) == vhash { + store.remove(k)?; + mkl_todo.insert(k, vec![])?; + return Ok(Some(cur_v)); + } + } + Ok(None) + })?; + + if let Some(old_v) = removed { + let old_entry = self.decode_entry(&old_v[..])?; + self.instance.updated(Some(old_entry), None); + self.merkle_updater.todo_notify.notify(); + Ok(true) + } else { + Ok(false) + } + } + + // ---- Utility functions ---- + pub(crate) fn tree_key(&self, p: &F::P, s: &F::S) -> Vec { let mut ret = p.hash().to_vec(); ret.extend(s.sort_key()); diff --git a/src/table/gc.rs b/src/table/gc.rs new file mode 100644 index 00000000..afc8a473 --- /dev/null +++ b/src/table/gc.rs @@ -0,0 +1,212 @@ +use std::sync::Arc; +use std::time::Duration; +use std::collections::HashMap; + +use serde::{Deserialize, Serialize}; +use serde_bytes::ByteBuf; + +use futures::future::join_all; +use futures::select; +use futures_util::future::*; +use tokio::sync::watch; + +use garage_util::data::*; +use garage_util::error::Error; + +use garage_rpc::rpc_client::*; +use garage_rpc::rpc_server::*; + +use crate::data::*; +use crate::table::*; +use crate::schema::*; +use crate::replication::*; + +const TABLE_GC_BATCH_SIZE: usize = 1024; +const TABLE_GC_RPC_TIMEOUT: Duration = Duration::from_secs(30); + +pub struct TableGC { + data: Arc>, + aux: Arc>, + + rpc_client: Arc>, +} + +#[derive(Serialize, Deserialize)] +enum GcRPC { + Update(Vec), + DeleteIfEqualHash(Vec<(ByteBuf, Hash)>), + Ok, +} + +impl RpcMessage for GcRPC {} + +impl TableGC +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + pub(crate) fn launch( + data: Arc>, + aux: Arc>, + rpc_server: &mut RpcServer, + ) -> Arc { + let rpc_path = format!("table_{}/gc", data.name); + let rpc_client = aux.system.rpc_client::(&rpc_path); + + let gc = Arc::new(Self { + data: data.clone(), + aux: aux.clone(), + rpc_client, + }); + + gc.register_handler(rpc_server, rpc_path); + + let gc1 = gc.clone(); + aux.system.background.spawn_worker( + format!("GC loop for {}", 
data.name), + move |must_exit: watch::Receiver| gc1.gc_loop(must_exit), + ); + + gc + } + + async fn gc_loop(self: Arc, mut must_exit: watch::Receiver) -> Result<(), Error> { + while !*must_exit.borrow() { + match self.gc_loop_iter().await { + Ok(true) => { + // Stuff was done, loop imediately + } + Ok(false) => { + select! { + _ = tokio::time::delay_for(Duration::from_secs(10)).fuse() => (), + _ = must_exit.recv().fuse() => (), + } + } + Err(e) => { + warn!("({}) Error doing GC: {}", self.data.name, e); + } + } + } + Ok(()) + } + + async fn gc_loop_iter(&self) -> Result { + let mut entries = vec![]; + let mut excluded = vec![]; + + for item in self.data.gc_todo.iter() { + let (k, vhash) = item?; + + let vhash = Hash::try_from(&vhash[..]).unwrap(); + + let v_opt = self.data.store.get(&k[..])? + .filter(|v| blake2sum(&v[..]) == vhash); + + if let Some(v) = v_opt { + entries.push((ByteBuf::from(k.to_vec()), vhash, ByteBuf::from(v.to_vec()))); + if entries.len() >= TABLE_GC_BATCH_SIZE { + break; + } + } else { + excluded.push((k, vhash)); + } + } + + for (k, vhash) in excluded { + let _ = self.data.gc_todo.compare_and_swap::<_, _, Vec>(k, Some(vhash), None)?; + } + + if entries.len() == 0 { + // Nothing to do in this iteration + return Ok(false); + } + + debug!("({}) GC: doing {} items", self.data.name, entries.len()); + + let mut partitions = HashMap::new(); + for (k, vhash, v) in entries { + let pkh = Hash::try_from(&k[..32]).unwrap(); + let mut nodes = self.aux.replication.write_nodes(&pkh, &self.aux.system); + nodes.retain(|x| *x != self.aux.system.id); + nodes.sort(); + + if !partitions.contains_key(&nodes) { + partitions.insert(nodes.clone(), vec![]); + } + partitions.get_mut(&nodes).unwrap().push((k, vhash, v)); + } + + let resps = join_all(partitions.into_iter() + .map(|(nodes, items)| self.try_send_and_delete(nodes, items))).await; + for resp in resps { + if let Err(e) = resp { + warn!("({}) Unable to send and delete for GC: {}", self.data.name, e); + } + } + 
+ Ok(true) + } + + async fn try_send_and_delete(&self, nodes: Vec, items: Vec<(ByteBuf, Hash, ByteBuf)>) -> Result<(), Error> { + let n_items = items.len(); + + let mut updates = vec![]; + let mut deletes = vec![]; + for (k, vhash, v) in items { + updates.push(v); + deletes.push((k, vhash)); + } + + self.rpc_client.try_call_many( + &nodes[..], + GcRPC::Update(updates), + RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT)).await?; + + info!("({}) GC: {} items successfully pushed, will try to delete.", self.data.name, n_items); + + self.rpc_client.try_call_many( + &nodes[..], + GcRPC::DeleteIfEqualHash(deletes.clone()), + RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT)).await?; + + for (k, vhash) in deletes { + self.data.delete_if_equal_hash(&k[..], vhash)?; + let _ = self.data.gc_todo.compare_and_swap::<_, _, Vec>(k, Some(vhash), None)?; + } + + Ok(()) + } + + // ---- RPC HANDLER ---- + + fn register_handler(self: &Arc, rpc_server: &mut RpcServer, path: String) { + let self2 = self.clone(); + rpc_server.add_handler::(path, move |msg, _addr| { + let self2 = self2.clone(); + async move { self2.handle_rpc(&msg).await } + }); + + let self2 = self.clone(); + self.rpc_client + .set_local_handler(self.aux.system.id, move |msg| { + let self2 = self2.clone(); + async move { self2.handle_rpc(&msg).await } + }); + } + + async fn handle_rpc(self: &Arc, message: &GcRPC) -> Result { + match message { + GcRPC::Update(items) => { + self.data.update_many(items)?; + Ok(GcRPC::Ok) + } + GcRPC::DeleteIfEqualHash(items) => { + for (key, vhash) in items.iter() { + self.data.delete_if_equal_hash(&key[..], *vhash)?; + } + Ok(GcRPC::Ok) + } + _ => Err(Error::Message(format!("Unexpected GC RPC"))), + } + } +} diff --git a/src/table/lib.rs b/src/table/lib.rs index 18c29c35..8a64ff0b 100644 --- a/src/table/lib.rs +++ b/src/table/lib.rs @@ -11,6 +11,7 @@ pub mod data; pub mod merkle; pub mod replication; pub mod sync; +pub mod gc; pub mod 
table; pub use schema::*; diff --git a/src/table/merkle.rs b/src/table/merkle.rs index b04a2a88..7a0adba1 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -139,10 +139,7 @@ impl MerkleUpdater { let new_vhash = if vhash_by.len() == 0 { None } else { - let vhash_by: [u8; 32] = vhash_by - .try_into() - .map_err(|_| Error::Message(format!("Invalid value in Merkle todo table")))?; - Some(Hash::from(vhash_by)) + Some(Hash::try_from(&vhash_by[..]).unwrap()) }; let key = MerkleNodeKey { diff --git a/src/table/schema.rs b/src/table/schema.rs index 7fbb7b25..5112ea15 100644 --- a/src/table/schema.rs +++ b/src/table/schema.rs @@ -41,6 +41,8 @@ pub trait Entry: { fn partition_key(&self) -> &P; fn sort_key(&self) -> &S; + + fn is_tombstone(&self) -> bool { false } } pub trait TableSchema: Send + Sync { diff --git a/src/table/sync.rs b/src/table/sync.rs index 4be8cd10..aae65852 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -3,7 +3,6 @@ use std::convert::TryInto; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; -use futures::future::join_all; use futures::{pin_mut, select}; use futures_util::future::*; use futures_util::stream::*; @@ -347,16 +346,11 @@ where nodes: &[UUID], ) -> Result<(), Error> { let values = items.iter().map(|(_k, v)| v.clone()).collect::>(); - let update_msg = Arc::new(SyncRPC::Items(values)); - - for res in join_all(nodes.iter().map(|to| { - self.rpc_client - .call_arc(*to, update_msg.clone(), TABLE_SYNC_RPC_TIMEOUT) - })) - .await - { - res?; - } + + self.rpc_client.try_call_many( + &nodes[..], + SyncRPC::Items(values), + RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_SYNC_RPC_TIMEOUT)).await?; // All remote nodes have written those items, now we can delete them locally let mut not_removed = 0; @@ -577,7 +571,7 @@ where // ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ====== - pub(crate) async fn handle_rpc(self: &Arc, message: &SyncRPC) -> Result { + async fn handle_rpc(self: &Arc, message: 
&SyncRPC) -> Result { match message { SyncRPC::RootCkHash(range, h) => { let root_ck = self.get_root_ck(*range)?; diff --git a/src/table/table.rs b/src/table/table.rs index dd3394bd..7b0d9a24 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -18,6 +18,7 @@ use crate::data::*; use crate::replication::*; use crate::schema::*; use crate::sync::*; +use crate::gc::*; const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10); @@ -44,8 +45,6 @@ pub(crate) enum TableRPC { ReadRange(F::P, Option, Option, usize), Update(Vec>), - - SyncRPC(SyncRPC), } impl RpcMessage for TableRPC {} @@ -76,6 +75,7 @@ where }); let syncer = TableSyncer::launch(data.clone(), aux.clone(), rpc_server); + TableGC::launch(data.clone(), aux.clone(), rpc_server); let table = Arc::new(Self { data, @@ -308,10 +308,6 @@ where self.data.update_many(pairs)?; Ok(TableRPC::Ok) } - TableRPC::SyncRPC(rpc) => { - let response = self.syncer.handle_rpc(rpc).await?; - Ok(TableRPC::SyncRPC(response)) - } _ => Err(Error::BadRPC(format!("Unexpected table RPC"))), } } -- cgit v1.2.3 From 831eb35763fdaeecb7b6d6aa13ebd78da14db04e Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 12 Mar 2021 21:52:19 +0100 Subject: cargo fmt --- src/table/data.rs | 91 ++++++++++++++++++++++++++++------------------------- src/table/gc.rs | 69 ++++++++++++++++++++++++++++------------ src/table/lib.rs | 2 +- src/table/schema.rs | 4 ++- src/table/sync.rs | 11 ++++--- src/table/table.rs | 2 +- 6 files changed, 110 insertions(+), 69 deletions(-) (limited to 'src/table') diff --git a/src/table/data.rs b/src/table/data.rs index 91607f7a..a491f877 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -129,31 +129,32 @@ where let update = self.decode_entry(update_bytes)?; let tree_key = self.tree_key(update.partition_key(), update.sort_key()); - let changed = (&self.store, &self.merkle_updater.todo).transaction(|(store, mkl_todo)| { - let (old_entry, new_entry) = match store.get(&tree_key)? 
{ - Some(prev_bytes) => { - let old_entry = self - .decode_entry(&prev_bytes) + let changed = + (&self.store, &self.merkle_updater.todo).transaction(|(store, mkl_todo)| { + let (old_entry, new_entry) = match store.get(&tree_key)? { + Some(prev_bytes) => { + let old_entry = self + .decode_entry(&prev_bytes) + .map_err(sled::transaction::ConflictableTransactionError::Abort)?; + let mut new_entry = old_entry.clone(); + new_entry.merge(&update); + (Some(old_entry), new_entry) + } + None => (None, update.clone()), + }; + + if Some(&new_entry) != old_entry.as_ref() { + let new_bytes = rmp_to_vec_all_named(&new_entry) + .map_err(Error::RMPEncode) .map_err(sled::transaction::ConflictableTransactionError::Abort)?; - let mut new_entry = old_entry.clone(); - new_entry.merge(&update); - (Some(old_entry), new_entry) + let new_bytes_hash = blake2sum(&new_bytes[..]); + mkl_todo.insert(tree_key.clone(), new_bytes_hash.as_slice())?; + store.insert(tree_key.clone(), new_bytes)?; + Ok(Some((old_entry, new_entry, new_bytes_hash))) + } else { + Ok(None) } - None => (None, update.clone()), - }; - - if Some(&new_entry) != old_entry.as_ref() { - let new_bytes = rmp_to_vec_all_named(&new_entry) - .map_err(Error::RMPEncode) - .map_err(sled::transaction::ConflictableTransactionError::Abort)?; - let new_bytes_hash = blake2sum(&new_bytes[..]); - mkl_todo.insert(tree_key.clone(), new_bytes_hash.as_slice())?; - store.insert(tree_key.clone(), new_bytes)?; - Ok(Some((old_entry, new_entry, new_bytes_hash))) - } else { - Ok(None) - } - })?; + })?; if let Some((old_entry, new_entry, new_bytes_hash)) = changed { let is_tombstone = new_entry.is_tombstone(); @@ -168,16 +169,17 @@ where } pub(crate) fn delete_if_equal(self: &Arc, k: &[u8], v: &[u8]) -> Result { - let removed = (&self.store, &self.merkle_updater.todo).transaction(|(store, mkl_todo)| { - if let Some(cur_v) = store.get(k)? 
{ - if cur_v == v { - store.remove(k)?; - mkl_todo.insert(k, vec![])?; - return Ok(true); + let removed = + (&self.store, &self.merkle_updater.todo).transaction(|(store, mkl_todo)| { + if let Some(cur_v) = store.get(k)? { + if cur_v == v { + store.remove(k)?; + mkl_todo.insert(k, vec![])?; + return Ok(true); + } } - } - Ok(false) - })?; + Ok(false) + })?; if removed { let old_entry = self.decode_entry(v)?; @@ -187,17 +189,22 @@ where Ok(removed) } - pub(crate) fn delete_if_equal_hash(self: &Arc, k: &[u8], vhash: Hash) -> Result { - let removed = (&self.store, &self.merkle_updater.todo).transaction(|(store, mkl_todo)| { - if let Some(cur_v) = store.get(k)? { - if blake2sum(&cur_v[..]) == vhash { - store.remove(k)?; - mkl_todo.insert(k, vec![])?; - return Ok(Some(cur_v)); + pub(crate) fn delete_if_equal_hash( + self: &Arc, + k: &[u8], + vhash: Hash, + ) -> Result { + let removed = + (&self.store, &self.merkle_updater.todo).transaction(|(store, mkl_todo)| { + if let Some(cur_v) = store.get(k)? { + if blake2sum(&cur_v[..]) == vhash { + store.remove(k)?; + mkl_todo.insert(k, vec![])?; + return Ok(Some(cur_v)); + } } - } - Ok(None) - })?; + Ok(None) + })?; if let Some(old_v) = removed { let old_entry = self.decode_entry(&old_v[..])?; diff --git a/src/table/gc.rs b/src/table/gc.rs index afc8a473..594044b8 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -1,6 +1,6 @@ +use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; -use std::collections::HashMap; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; @@ -17,9 +17,9 @@ use garage_rpc::rpc_client::*; use garage_rpc::rpc_server::*; use crate::data::*; -use crate::table::*; -use crate::schema::*; use crate::replication::*; +use crate::schema::*; +use crate::table::*; const TABLE_GC_BATCH_SIZE: usize = 1024; const TABLE_GC_RPC_TIMEOUT: Duration = Duration::from_secs(30); @@ -99,7 +99,10 @@ where let vhash = Hash::try_from(&vhash[..]).unwrap(); - let v_opt = self.data.store.get(&k[..])? 
+ let v_opt = self + .data + .store + .get(&k[..])? .filter(|v| blake2sum(&v[..]) == vhash); if let Some(v) = v_opt { @@ -113,7 +116,10 @@ where } for (k, vhash) in excluded { - let _ = self.data.gc_todo.compare_and_swap::<_, _, Vec>(k, Some(vhash), None)?; + let _ = self + .data + .gc_todo + .compare_and_swap::<_, _, Vec>(k, Some(vhash), None)?; } if entries.len() == 0 { @@ -136,18 +142,29 @@ where partitions.get_mut(&nodes).unwrap().push((k, vhash, v)); } - let resps = join_all(partitions.into_iter() - .map(|(nodes, items)| self.try_send_and_delete(nodes, items))).await; + let resps = join_all( + partitions + .into_iter() + .map(|(nodes, items)| self.try_send_and_delete(nodes, items)), + ) + .await; for resp in resps { if let Err(e) = resp { - warn!("({}) Unable to send and delete for GC: {}", self.data.name, e); + warn!( + "({}) Unable to send and delete for GC: {}", + self.data.name, e + ); } } Ok(true) } - async fn try_send_and_delete(&self, nodes: Vec, items: Vec<(ByteBuf, Hash, ByteBuf)>) -> Result<(), Error> { + async fn try_send_and_delete( + &self, + nodes: Vec, + items: Vec<(ByteBuf, Hash, ByteBuf)>, + ) -> Result<(), Error> { let n_items = items.len(); let mut updates = vec![]; @@ -157,21 +174,33 @@ where deletes.push((k, vhash)); } - self.rpc_client.try_call_many( - &nodes[..], - GcRPC::Update(updates), - RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT)).await?; + self.rpc_client + .try_call_many( + &nodes[..], + GcRPC::Update(updates), + RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT), + ) + .await?; + + info!( + "({}) GC: {} items successfully pushed, will try to delete.", + self.data.name, n_items + ); - info!("({}) GC: {} items successfully pushed, will try to delete.", self.data.name, n_items); + self.rpc_client + .try_call_many( + &nodes[..], + GcRPC::DeleteIfEqualHash(deletes.clone()), + RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT), + ) + .await?; - 
self.rpc_client.try_call_many( - &nodes[..], - GcRPC::DeleteIfEqualHash(deletes.clone()), - RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT)).await?; - for (k, vhash) in deletes { self.data.delete_if_equal_hash(&k[..], vhash)?; - let _ = self.data.gc_todo.compare_and_swap::<_, _, Vec>(k, Some(vhash), None)?; + let _ = self + .data + .gc_todo + .compare_and_swap::<_, _, Vec>(k, Some(vhash), None)?; } Ok(()) diff --git a/src/table/lib.rs b/src/table/lib.rs index 8a64ff0b..3b73163b 100644 --- a/src/table/lib.rs +++ b/src/table/lib.rs @@ -8,10 +8,10 @@ pub mod schema; pub mod util; pub mod data; +pub mod gc; pub mod merkle; pub mod replication; pub mod sync; -pub mod gc; pub mod table; pub use schema::*; diff --git a/src/table/schema.rs b/src/table/schema.rs index 5112ea15..4d754664 100644 --- a/src/table/schema.rs +++ b/src/table/schema.rs @@ -42,7 +42,9 @@ pub trait Entry: fn partition_key(&self) -> &P; fn sort_key(&self) -> &S; - fn is_tombstone(&self) -> bool { false } + fn is_tombstone(&self) -> bool { + false + } } pub trait TableSchema: Send + Sync { diff --git a/src/table/sync.rs b/src/table/sync.rs index aae65852..6c8792d2 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -347,10 +347,13 @@ where ) -> Result<(), Error> { let values = items.iter().map(|(_k, v)| v.clone()).collect::>(); - self.rpc_client.try_call_many( - &nodes[..], - SyncRPC::Items(values), - RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_SYNC_RPC_TIMEOUT)).await?; + self.rpc_client + .try_call_many( + &nodes[..], + SyncRPC::Items(values), + RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_SYNC_RPC_TIMEOUT), + ) + .await?; // All remote nodes have written those items, now we can delete them locally let mut not_removed = 0; diff --git a/src/table/table.rs b/src/table/table.rs index 7b0d9a24..2d3c5fe9 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -15,10 +15,10 @@ use garage_rpc::rpc_server::*; use crate::crdt::CRDT; use 
crate::data::*; +use crate::gc::*; use crate::replication::*; use crate::schema::*; use crate::sync::*; -use crate::gc::*; const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10); -- cgit v1.2.3 From 9b118160a8b668b376d25ac16b097bce050f8b67 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 12 Mar 2021 22:06:56 +0100 Subject: Optim & refactor --- src/table/gc.rs | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) (limited to 'src/table') diff --git a/src/table/gc.rs b/src/table/gc.rs index 594044b8..5b7f1ee7 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -116,10 +116,7 @@ where } for (k, vhash) in excluded { - let _ = self - .data - .gc_todo - .compare_and_swap::<_, _, Vec>(k, Some(vhash), None)?; + self.todo_remove_if_equal(&k[..], vhash)?; } if entries.len() == 0 { @@ -197,15 +194,20 @@ where for (k, vhash) in deletes { self.data.delete_if_equal_hash(&k[..], vhash)?; - let _ = self - .data - .gc_todo - .compare_and_swap::<_, _, Vec>(k, Some(vhash), None)?; + self.todo_remove_if_equal(&k[..], vhash)?; } Ok(()) } + fn todo_remove_if_equal(&self, key: &[u8], vhash: Hash) -> Result<(), Error> { + let _ = self + .data + .gc_todo + .compare_and_swap::<_, _, Vec>(key, Some(vhash), None)?; + Ok(()) + } + // ---- RPC HANDLER ---- fn register_handler(self: &Arc, rpc_server: &mut RpcServer, path: String) { @@ -232,6 +234,7 @@ where GcRPC::DeleteIfEqualHash(items) => { for (key, vhash) in items.iter() { self.data.delete_if_equal_hash(&key[..], *vhash)?; + self.todo_remove_if_equal(&key[..], *vhash)?; } Ok(GcRPC::Ok) } -- cgit v1.2.3 From 3bf2df622a070fe8f233bec6d60bd5cca995fbfc Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 15 Mar 2021 16:21:41 +0100 Subject: Time and metadata improvements --- src/table/crdt/lww.rs | 2 +- src/table/crdt/lww_map.rs | 2 +- src/table/gc.rs | 10 ++++++---- 3 files changed, 8 insertions(+), 6 deletions(-) (limited to 'src/table') diff --git a/src/table/crdt/lww.rs b/src/table/crdt/lww.rs index 
9a3ab671..25ecdb07 100644 --- a/src/table/crdt/lww.rs +++ b/src/table/crdt/lww.rs @@ -1,6 +1,6 @@ use serde::{Deserialize, Serialize}; -use garage_util::data::now_msec; +use garage_util::time::now_msec; use crate::crdt::crdt::*; diff --git a/src/table/crdt/lww_map.rs b/src/table/crdt/lww_map.rs index bd40f368..7b372191 100644 --- a/src/table/crdt/lww_map.rs +++ b/src/table/crdt/lww_map.rs @@ -1,6 +1,6 @@ use serde::{Deserialize, Serialize}; -use garage_util::data::now_msec; +use garage_util::time::now_msec; use crate::crdt::crdt::*; diff --git a/src/table/gc.rs b/src/table/gc.rs index 5b7f1ee7..c13c8234 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -75,17 +75,19 @@ where match self.gc_loop_iter().await { Ok(true) => { // Stuff was done, loop imediately + continue; } Ok(false) => { - select! { - _ = tokio::time::delay_for(Duration::from_secs(10)).fuse() => (), - _ = must_exit.recv().fuse() => (), - } + // Nothing was done, sleep for some time (below) } Err(e) => { warn!("({}) Error doing GC: {}", self.data.name, e); } } + select! 
{ + _ = tokio::time::delay_for(Duration::from_secs(10)).fuse() => (), + _ = must_exit.recv().fuse() => (), + } } Ok(()) } -- cgit v1.2.3 From 0290afe1f8eafabf71695d677807e07658d078ab Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 15 Mar 2021 18:27:26 +0100 Subject: Make block rc code more understandable --- src/table/merkle.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/table') diff --git a/src/table/merkle.rs b/src/table/merkle.rs index 7a0adba1..a917a028 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -46,7 +46,7 @@ pub struct MerkleUpdater { // Content of the merkle tree: items where // - key = .bytes() for MerkleNodeKey // - value = serialization of a MerkleNode, assumed to be MerkleNode::empty if not found - pub(crate) merkle_tree: sled::Tree, + pub merkle_tree: sled::Tree, empty_node_hash: Hash, } -- cgit v1.2.3 From 537f652fec479c7c5676bba14c23ea6634613122 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 15 Mar 2021 18:40:27 +0100 Subject: Tiny things --- src/table/merkle.rs | 1 + 1 file changed, 1 insertion(+) (limited to 'src/table') diff --git a/src/table/merkle.rs b/src/table/merkle.rs index a917a028..f60be8a8 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -56,6 +56,7 @@ pub struct MerkleNodeKey { pub partition: MerklePartition, // prefix: a prefix for the hash of full keys, i.e. 
hash(hash(partition_key)+sort_key) + #[serde(with = "serde_bytes")] pub prefix: Vec, } -- cgit v1.2.3 From 667e4e72a8e64a094d57ceeb6442cef08f1ef0e1 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 15 Mar 2021 19:51:16 +0100 Subject: Small fixes --- src/table/data.rs | 6 +++++- src/table/merkle.rs | 12 ++++++++++-- 2 files changed, 15 insertions(+), 3 deletions(-) (limited to 'src/table') diff --git a/src/table/data.rs b/src/table/data.rs index a491f877..0a7b2cec 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -18,7 +18,7 @@ pub struct TableData { pub instance: F, pub store: sled::Tree, - pub gc_todo: sled::Tree, + pub(crate) gc_todo: sled::Tree, pub merkle_updater: Arc, } @@ -239,4 +239,8 @@ where }, } } + + pub fn gc_todo_len(&self) -> usize { + self.gc_todo.len() + } } diff --git a/src/table/merkle.rs b/src/table/merkle.rs index f60be8a8..aefb5169 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -40,13 +40,13 @@ pub struct MerkleUpdater { // - key = the key of an item in the main table, ie hash(partition_key)+sort_key // - value = the hash of the full serialized item, if present, // or an empty vec if item is absent (deleted) - pub todo: sled::Tree, + pub(crate) todo: sled::Tree, pub(crate) todo_notify: Notify, // Content of the merkle tree: items where // - key = .bytes() for MerkleNodeKey // - value = serialization of a MerkleNode, assumed to be MerkleNode::empty if not found - pub merkle_tree: sled::Tree, + pub(crate) merkle_tree: sled::Tree, empty_node_hash: Hash, } @@ -311,6 +311,14 @@ impl MerkleUpdater { Some(v) => Ok(rmp_serde::decode::from_read_ref::<_, MerkleNode>(&v[..])?), } } + + pub fn merkle_tree_len(&self) -> usize { + self.merkle_tree.len() + } + + pub fn todo_len(&self) -> usize { + self.todo.len() + } } impl MerkleNodeKey { -- cgit v1.2.3 From 4d4117f2b4eb69b63e2329f6e0b8929e6a8b5b31 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 15 Mar 2021 20:09:44 +0100 Subject: Refactor block resync loop; make workers 
infaillible --- src/table/gc.rs | 3 +-- src/table/merkle.rs | 3 +-- src/table/sync.rs | 10 ++++------ 3 files changed, 6 insertions(+), 10 deletions(-) (limited to 'src/table') diff --git a/src/table/gc.rs b/src/table/gc.rs index c13c8234..fd9a26d1 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -70,7 +70,7 @@ where gc } - async fn gc_loop(self: Arc, mut must_exit: watch::Receiver) -> Result<(), Error> { + async fn gc_loop(self: Arc, mut must_exit: watch::Receiver) { while !*must_exit.borrow() { match self.gc_loop_iter().await { Ok(true) => { @@ -89,7 +89,6 @@ where _ = must_exit.recv().fuse() => (), } } - Ok(()) } async fn gc_loop_iter(&self) -> Result { diff --git a/src/table/merkle.rs b/src/table/merkle.rs index aefb5169..5ce9cee3 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -104,7 +104,7 @@ impl MerkleUpdater { async fn updater_loop( self: Arc, mut must_exit: watch::Receiver, - ) -> Result<(), Error> { + ) { while !*must_exit.borrow() { if let Some(x) = self.todo.iter().next() { match x { @@ -131,7 +131,6 @@ impl MerkleUpdater { } } } - Ok(()) } fn update_item(&self, k: &[u8], vhash_by: &[u8]) -> Result<(), Error> { diff --git a/src/table/sync.rs b/src/table/sync.rs index 6c8792d2..b344eb88 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -136,7 +136,7 @@ where self: Arc, mut must_exit: watch::Receiver, mut busy_rx: mpsc::UnboundedReceiver, - ) -> Result<(), Error> { + ) { let mut prev_ring: Arc = self.aux.system.ring.borrow().clone(); let mut ring_recv: watch::Receiver> = self.aux.system.ring.clone(); let mut nothing_to_do_since = Some(Instant::now()); @@ -183,7 +183,6 @@ where } } } - Ok(()) } pub fn add_full_sync(&self) { @@ -197,11 +196,11 @@ where self: Arc, mut must_exit: watch::Receiver, busy_tx: mpsc::UnboundedSender, - ) -> Result<(), Error> { + ) { while !*must_exit.borrow() { let task = self.todo.lock().unwrap().pop_task(); if let Some(partition) = task { - busy_tx.send(true)?; + busy_tx.send(true).unwrap(); let res = 
self .clone() .sync_partition(&partition, &mut must_exit) @@ -213,11 +212,10 @@ where ); } } else { - busy_tx.send(false)?; + busy_tx.send(false).unwrap(); tokio::time::delay_for(Duration::from_secs(1)).await; } } - Ok(()) } async fn sync_partition( -- cgit v1.2.3 From 0cd5b2ae19965b8c1f3176afeb8f678c4d8366dd Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 15 Mar 2021 22:36:41 +0100 Subject: WIP migrate to tokio 1 --- src/table/Cargo.toml | 2 +- src/table/data.rs | 6 +++--- src/table/gc.rs | 4 ++-- src/table/merkle.rs | 4 ++-- src/table/sync.rs | 35 ++++++++++++----------------------- 5 files changed, 20 insertions(+), 31 deletions(-) (limited to 'src/table') diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml index 6b3aaceb..8f73470e 100644 --- a/src/table/Cargo.toml +++ b/src/table/Cargo.toml @@ -31,5 +31,5 @@ serde_bytes = "0.11" async-trait = "0.1.30" futures = "0.3" futures-util = "0.3" -tokio = { version = "0.2", default-features = false, features = ["rt-core", "rt-threaded", "io-driver", "net", "tcp", "time", "macros", "sync", "signal", "fs"] } +tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } diff --git a/src/table/data.rs b/src/table/data.rs index 0a7b2cec..0029b936 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -159,7 +159,7 @@ where if let Some((old_entry, new_entry, new_bytes_hash)) = changed { let is_tombstone = new_entry.is_tombstone(); self.instance.updated(old_entry, Some(new_entry)); - self.merkle_updater.todo_notify.notify(); + self.merkle_updater.todo_notify.notify_one(); if is_tombstone { self.gc_todo.insert(&tree_key, new_bytes_hash.as_slice())?; } @@ -184,7 +184,7 @@ where if removed { let old_entry = self.decode_entry(v)?; self.instance.updated(Some(old_entry), None); - self.merkle_updater.todo_notify.notify(); + self.merkle_updater.todo_notify.notify_one(); } Ok(removed) } @@ -209,7 +209,7 @@ where if let Some(old_v) 
= removed { let old_entry = self.decode_entry(&old_v[..])?; self.instance.updated(Some(old_entry), None); - self.merkle_updater.todo_notify.notify(); + self.merkle_updater.todo_notify.notify_one(); Ok(true) } else { Ok(false) diff --git a/src/table/gc.rs b/src/table/gc.rs index fd9a26d1..d37fdf35 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -85,8 +85,8 @@ where } } select! { - _ = tokio::time::delay_for(Duration::from_secs(10)).fuse() => (), - _ = must_exit.recv().fuse() => (), + _ = tokio::time::sleep(Duration::from_secs(10)).fuse() => (), + _ = must_exit.changed().fuse() => (), } } } diff --git a/src/table/merkle.rs b/src/table/merkle.rs index 5ce9cee3..86289bf1 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -121,13 +121,13 @@ impl MerkleUpdater { "({}) Error while iterating on Merkle todo tree: {}", self.table_name, e ); - tokio::time::delay_for(Duration::from_secs(10)).await; + tokio::time::sleep(Duration::from_secs(10)).await; } } } else { select! { _ = self.todo_notify.notified().fuse() => (), - _ = must_exit.recv().fuse() => (), + _ = must_exit.changed().fuse() => (), } } } diff --git a/src/table/sync.rs b/src/table/sync.rs index b344eb88..65231cd5 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -3,7 +3,7 @@ use std::convert::TryInto; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; -use futures::{pin_mut, select}; +use futures::{select}; use futures_util::future::*; use futures_util::stream::*; use rand::Rng; @@ -110,7 +110,7 @@ where let s3 = syncer.clone(); tokio::spawn(async move { - tokio::time::delay_for(Duration::from_secs(20)).await; + tokio::time::sleep(Duration::from_secs(20)).await; s3.add_full_sync(); }); @@ -142,23 +142,16 @@ where let mut nothing_to_do_since = Some(Instant::now()); while !*must_exit.borrow() { - let s_ring_recv = ring_recv.recv().fuse(); - let s_busy = busy_rx.recv().fuse(); - let s_must_exit = must_exit.recv().fuse(); - let s_timeout = 
tokio::time::delay_for(Duration::from_secs(1)).fuse(); - pin_mut!(s_ring_recv, s_busy, s_must_exit, s_timeout); - select! { - new_ring_r = s_ring_recv => { - if let Some(new_ring) = new_ring_r { - if !Arc::ptr_eq(&new_ring, &prev_ring) { - debug!("({}) Ring changed, adding full sync to syncer todo list", self.data.name); - self.add_full_sync(); - prev_ring = new_ring; - } + _ = ring_recv.changed().fuse() => { + let new_ring = ring_recv.borrow(); + if !Arc::ptr_eq(&new_ring, &prev_ring) { + debug!("({}) Ring changed, adding full sync to syncer todo list", self.data.name); + self.add_full_sync(); + prev_ring = new_ring.clone(); } } - busy_opt = s_busy => { + busy_opt = busy_rx.recv().fuse() => { if let Some(busy) = busy_opt { if busy { nothing_to_do_since = None; @@ -169,12 +162,8 @@ where } } } - must_exit_v = s_must_exit => { - if must_exit_v.unwrap_or(false) { - break; - } - } - _ = s_timeout => { + _ = must_exit.changed().fuse() => (), + _ = tokio::time::sleep(Duration::from_secs(1)).fuse() => { if nothing_to_do_since.map(|t| Instant::now() - t >= ANTI_ENTROPY_INTERVAL).unwrap_or(false) { nothing_to_do_since = None; debug!("({}) Interval passed, adding full sync to syncer todo list", self.data.name); @@ -213,7 +202,7 @@ where } } else { busy_tx.send(false).unwrap(); - tokio::time::delay_for(Duration::from_secs(1)).await; + tokio::time::sleep(Duration::from_secs(1)).await; } } } -- cgit v1.2.3 From 6a8439fd1345ecae7414386f76dda7a03eb14df2 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 15 Mar 2021 23:14:12 +0100 Subject: Some improvements in background worker but we terminate late --- src/table/merkle.rs | 5 +---- src/table/sync.rs | 2 +- 2 files changed, 2 insertions(+), 5 deletions(-) (limited to 'src/table') diff --git a/src/table/merkle.rs b/src/table/merkle.rs index 86289bf1..60b7833f 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -101,10 +101,7 @@ impl MerkleUpdater { ret } - async fn updater_loop( - self: Arc, - mut must_exit: 
watch::Receiver, - ) { + async fn updater_loop(self: Arc, mut must_exit: watch::Receiver) { while !*must_exit.borrow() { if let Some(x) = self.todo.iter().next() { match x { diff --git a/src/table/sync.rs b/src/table/sync.rs index 65231cd5..f8fef53c 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -3,7 +3,7 @@ use std::convert::TryInto; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; -use futures::{select}; +use futures::select; use futures_util::future::*; use futures_util::stream::*; use rand::Rng; -- cgit v1.2.3 From 1d9961e4118af0e26068e1d6c5c6c009a1292a88 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 16 Mar 2021 11:14:27 +0100 Subject: Simplify replication logic --- src/table/gc.rs | 2 +- src/table/replication/fullcopy.rs | 33 ++++++++++++--------------------- src/table/replication/parameters.rs | 13 +++++++------ src/table/replication/sharded.rs | 20 ++++++++++++-------- src/table/sync.rs | 11 ++++------- src/table/table.rs | 10 +++++----- 6 files changed, 41 insertions(+), 48 deletions(-) (limited to 'src/table') diff --git a/src/table/gc.rs b/src/table/gc.rs index d37fdf35..061c5045 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -130,7 +130,7 @@ where let mut partitions = HashMap::new(); for (k, vhash, v) in entries { let pkh = Hash::try_from(&k[..32]).unwrap(); - let mut nodes = self.aux.replication.write_nodes(&pkh, &self.aux.system); + let mut nodes = self.aux.replication.write_nodes(&pkh); nodes.retain(|x| *x != self.aux.system.id); nodes.sort(); diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs index a5faece9..aea8c1f3 100644 --- a/src/table/replication/fullcopy.rs +++ b/src/table/replication/fullcopy.rs @@ -8,21 +8,10 @@ use crate::replication::*; #[derive(Clone)] pub struct TableFullReplication { + pub system: Arc, pub max_faults: usize, } -#[derive(Clone)] -struct Neighbors { - ring: Arc, - neighbors: Vec, -} - -impl TableFullReplication { - pub fn new(max_faults: usize) -> 
Self { - TableFullReplication { max_faults } - } -} - impl TableReplication for TableFullReplication { // Full replication schema: all nodes store everything // Writes are disseminated in an epidemic manner in the network @@ -30,18 +19,23 @@ impl TableReplication for TableFullReplication { // Advantage: do all reads locally, extremely fast // Inconvenient: only suitable to reasonably small tables - fn read_nodes(&self, _hash: &Hash, system: &System) -> Vec { - vec![system.id] + fn partition_of(&self, _hash: &Hash) -> u16 { + 0u16 + } + + fn read_nodes(&self, _hash: &Hash) -> Vec { + vec![self.system.id] } fn read_quorum(&self) -> usize { 1 } - fn write_nodes(&self, hash: &Hash, system: &System) -> Vec { - self.replication_nodes(hash, system.ring.borrow().as_ref()) + fn write_nodes(&self, _hash: &Hash) -> Vec { + let ring = self.system.ring.borrow(); + ring.config.members.keys().cloned().collect::>() } - fn write_quorum(&self, system: &System) -> usize { - let nmembers = system.ring.borrow().config.members.len(); + fn write_quorum(&self) -> usize { + let nmembers = self.system.ring.borrow().config.members.len(); if nmembers > self.max_faults { nmembers - self.max_faults } else { @@ -52,9 +46,6 @@ impl TableReplication for TableFullReplication { self.max_faults } - fn replication_nodes(&self, _hash: &Hash, ring: &Ring) -> Vec { - ring.config.members.keys().cloned().collect::>() - } fn split_points(&self, _ring: &Ring) -> Vec { let mut ret = vec![]; ret.push([0u8; 32].into()); diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs index 4607b050..ace82bd9 100644 --- a/src/table/replication/parameters.rs +++ b/src/table/replication/parameters.rs @@ -1,4 +1,3 @@ -use garage_rpc::membership::System; use garage_rpc::ring::Ring; use garage_util::data::*; @@ -7,16 +6,18 @@ pub trait TableReplication: Send + Sync { // See examples in table_sharded.rs and table_fullcopy.rs // To understand various replication methods + // Partition number of 
data item (for Merkle tree) + fn partition_of(&self, hash: &Hash) -> u16; + // Which nodes to send reads from - fn read_nodes(&self, hash: &Hash, system: &System) -> Vec; + fn read_nodes(&self, hash: &Hash) -> Vec; fn read_quorum(&self) -> usize; // Which nodes to send writes to - fn write_nodes(&self, hash: &Hash, system: &System) -> Vec; - fn write_quorum(&self, system: &System) -> usize; + fn write_nodes(&self, hash: &Hash) -> Vec; + fn write_quorum(&self) -> usize; fn max_write_errors(&self) -> usize; - // Which are the nodes that do actually replicate the data - fn replication_nodes(&self, hash: &Hash, ring: &Ring) -> Vec; + // Get partition boundaries fn split_points(&self, ring: &Ring) -> Vec; } diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs index 886c7c08..966be31a 100644 --- a/src/table/replication/sharded.rs +++ b/src/table/replication/sharded.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use garage_rpc::membership::System; use garage_rpc::ring::Ring; use garage_util::data::*; @@ -6,6 +8,7 @@ use crate::replication::*; #[derive(Clone)] pub struct TableShardedReplication { + pub system: Arc, pub replication_factor: usize, pub read_quorum: usize, pub write_quorum: usize, @@ -19,28 +22,29 @@ impl TableReplication for TableShardedReplication { // - reads are done on all of the nodes that replicate the data // - writes as well - fn read_nodes(&self, hash: &Hash, system: &System) -> Vec { - let ring = system.ring.borrow().clone(); + fn partition_of(&self, hash: &Hash) -> u16 { + self.system.ring.borrow().partition_of(hash) + } + + fn read_nodes(&self, hash: &Hash) -> Vec { + let ring = self.system.ring.borrow().clone(); ring.walk_ring(&hash, self.replication_factor) } fn read_quorum(&self) -> usize { self.read_quorum } - fn write_nodes(&self, hash: &Hash, system: &System) -> Vec { - let ring = system.ring.borrow().clone(); + fn write_nodes(&self, hash: &Hash) -> Vec { + let ring = self.system.ring.borrow(); ring.walk_ring(&hash, 
self.replication_factor) } - fn write_quorum(&self, _system: &System) -> usize { + fn write_quorum(&self) -> usize { self.write_quorum } fn max_write_errors(&self) -> usize { self.replication_factor - self.write_quorum } - fn replication_nodes(&self, hash: &Hash, ring: &Ring) -> Vec { - ring.walk_ring(&hash, self.replication_factor) - } fn split_points(&self, ring: &Ring) -> Vec { let mut ret = vec![]; diff --git a/src/table/sync.rs b/src/table/sync.rs index f8fef53c..ac0305e2 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -218,10 +218,7 @@ where let nodes = self .aux .replication - .write_nodes( - &hash_of_merkle_partition(partition.range.begin), - &self.aux.system, - ) + .write_nodes(&hash_of_merkle_partition(partition.range.begin)) .into_iter() .filter(|node| *node != my_id) .collect::>(); @@ -293,7 +290,7 @@ where let nodes = self .aux .replication - .write_nodes(&begin, &self.aux.system) + .write_nodes(&begin) .into_iter() .collect::>(); if nodes.contains(&self.aux.system.id) { @@ -303,7 +300,7 @@ where ); break; } - if nodes.len() < self.aux.replication.write_quorum(&self.aux.system) { + if nodes.len() < self.aux.replication.write_quorum() { return Err(Error::Message(format!( "Not offloading as we don't have a quorum of nodes to write to." 
))); @@ -616,7 +613,7 @@ impl SyncTodo { let begin_hash = hash_of_merkle_partition(begin); let end_hash = hash_of_merkle_partition_opt(end); - let nodes = aux.replication.replication_nodes(&begin_hash, &ring); + let nodes = aux.replication.write_nodes(&begin_hash); let retain = nodes.contains(&my_id); if !retain { diff --git a/src/table/table.rs b/src/table/table.rs index 2d3c5fe9..2ce5868f 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -91,7 +91,7 @@ where pub async fn insert(&self, e: &F::E) -> Result<(), Error> { let hash = e.partition_key().hash(); - let who = self.aux.replication.write_nodes(&hash, &self.aux.system); + let who = self.aux.replication.write_nodes(&hash); //eprintln!("insert who: {:?}", who); let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?)); @@ -101,7 +101,7 @@ where .try_call_many( &who[..], rpc, - RequestStrategy::with_quorum(self.aux.replication.write_quorum(&self.aux.system)) + RequestStrategy::with_quorum(self.aux.replication.write_quorum()) .with_timeout(TABLE_RPC_TIMEOUT), ) .await?; @@ -113,7 +113,7 @@ where for entry in entries.iter() { let hash = entry.partition_key().hash(); - let who = self.aux.replication.write_nodes(&hash, &self.aux.system); + let who = self.aux.replication.write_nodes(&hash); let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?)); for node in who { if !call_list.contains_key(&node) { @@ -150,7 +150,7 @@ where sort_key: &F::S, ) -> Result, Error> { let hash = partition_key.hash(); - let who = self.aux.replication.read_nodes(&hash, &self.aux.system); + let who = self.aux.replication.read_nodes(&hash); //eprintln!("get who: {:?}", who); let rpc = TableRPC::::ReadEntry(partition_key.clone(), sort_key.clone()); @@ -207,7 +207,7 @@ where limit: usize, ) -> Result, Error> { let hash = partition_key.hash(); - let who = self.aux.replication.read_nodes(&hash, &self.aux.system); + let who = self.aux.replication.read_nodes(&hash); let rpc = TableRPC::::ReadRange(partition_key.clone(), 
begin_sort_key, filter, limit); -- cgit v1.2.3 From 515029d026937d29395379c76188f509984b8ace Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 16 Mar 2021 11:43:58 +0100 Subject: Refactor code --- src/table/data.rs | 136 +++++++++++++++++++++++++--------------------------- src/table/gc.rs | 22 ++++----- src/table/merkle.rs | 61 ++++++++++++----------- src/table/sync.rs | 58 +++++++++++----------- src/table/table.rs | 53 ++++++++++---------- 5 files changed, 167 insertions(+), 163 deletions(-) (limited to 'src/table') diff --git a/src/table/data.rs b/src/table/data.rs index 0029b936..9aa2a3bc 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -4,62 +4,59 @@ use std::sync::Arc; use log::warn; use serde_bytes::ByteBuf; use sled::Transactional; +use tokio::sync::Notify; -use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_util::error::*; use crate::crdt::CRDT; -use crate::merkle::*; +use crate::replication::*; use crate::schema::*; -pub struct TableData { +pub struct TableData { pub name: String, - pub instance: F, + + pub(crate) instance: F, + pub(crate) replication: R, pub store: sled::Tree, + + pub(crate) merkle_tree: sled::Tree, + pub(crate) merkle_todo: sled::Tree, + pub(crate) merkle_todo_notify: Notify, pub(crate) gc_todo: sled::Tree, - pub merkle_updater: Arc, } -impl TableData +impl TableData where F: TableSchema, + R: TableReplication, { - pub fn new( - name: String, - instance: F, - db: &sled::Db, - background: Arc, - ) -> Arc { + pub fn new(name: String, instance: F, replication: R, db: &sled::Db) -> Arc { let store = db .open_tree(&format!("{}:table", name)) .expect("Unable to open DB tree"); - let merkle_todo_store = db - .open_tree(&format!("{}:merkle_todo", name)) - .expect("Unable to open DB Merkle TODO tree"); - let merkle_tree_store = db + let merkle_tree = db .open_tree(&format!("{}:merkle_tree", name)) .expect("Unable to open DB Merkle tree tree"); + let merkle_todo = db + 
.open_tree(&format!("{}:merkle_todo", name)) + .expect("Unable to open DB Merkle TODO tree"); let gc_todo = db .open_tree(&format!("{}:gc_todo", name)) .expect("Unable to open DB tree"); - let merkle_updater = MerkleUpdater::launch( - name.clone(), - background, - merkle_todo_store, - merkle_tree_store, - ); - Arc::new(Self { name, instance, + replication, store, + merkle_tree, + merkle_todo, + merkle_todo_notify: Notify::new(), gc_todo, - merkle_updater, }) } @@ -129,37 +126,36 @@ where let update = self.decode_entry(update_bytes)?; let tree_key = self.tree_key(update.partition_key(), update.sort_key()); - let changed = - (&self.store, &self.merkle_updater.todo).transaction(|(store, mkl_todo)| { - let (old_entry, new_entry) = match store.get(&tree_key)? { - Some(prev_bytes) => { - let old_entry = self - .decode_entry(&prev_bytes) - .map_err(sled::transaction::ConflictableTransactionError::Abort)?; - let mut new_entry = old_entry.clone(); - new_entry.merge(&update); - (Some(old_entry), new_entry) - } - None => (None, update.clone()), - }; - - if Some(&new_entry) != old_entry.as_ref() { - let new_bytes = rmp_to_vec_all_named(&new_entry) - .map_err(Error::RMPEncode) + let changed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| { + let (old_entry, new_entry) = match store.get(&tree_key)? 
{ + Some(prev_bytes) => { + let old_entry = self + .decode_entry(&prev_bytes) .map_err(sled::transaction::ConflictableTransactionError::Abort)?; - let new_bytes_hash = blake2sum(&new_bytes[..]); - mkl_todo.insert(tree_key.clone(), new_bytes_hash.as_slice())?; - store.insert(tree_key.clone(), new_bytes)?; - Ok(Some((old_entry, new_entry, new_bytes_hash))) - } else { - Ok(None) + let mut new_entry = old_entry.clone(); + new_entry.merge(&update); + (Some(old_entry), new_entry) } - })?; + None => (None, update.clone()), + }; + + if Some(&new_entry) != old_entry.as_ref() { + let new_bytes = rmp_to_vec_all_named(&new_entry) + .map_err(Error::RMPEncode) + .map_err(sled::transaction::ConflictableTransactionError::Abort)?; + let new_bytes_hash = blake2sum(&new_bytes[..]); + mkl_todo.insert(tree_key.clone(), new_bytes_hash.as_slice())?; + store.insert(tree_key.clone(), new_bytes)?; + Ok(Some((old_entry, new_entry, new_bytes_hash))) + } else { + Ok(None) + } + })?; if let Some((old_entry, new_entry, new_bytes_hash)) = changed { let is_tombstone = new_entry.is_tombstone(); self.instance.updated(old_entry, Some(new_entry)); - self.merkle_updater.todo_notify.notify_one(); + self.merkle_todo_notify.notify_one(); if is_tombstone { self.gc_todo.insert(&tree_key, new_bytes_hash.as_slice())?; } @@ -169,22 +165,21 @@ where } pub(crate) fn delete_if_equal(self: &Arc, k: &[u8], v: &[u8]) -> Result { - let removed = - (&self.store, &self.merkle_updater.todo).transaction(|(store, mkl_todo)| { - if let Some(cur_v) = store.get(k)? { - if cur_v == v { - store.remove(k)?; - mkl_todo.insert(k, vec![])?; - return Ok(true); - } + let removed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| { + if let Some(cur_v) = store.get(k)? 
{ + if cur_v == v { + store.remove(k)?; + mkl_todo.insert(k, vec![])?; + return Ok(true); } - Ok(false) - })?; + } + Ok(false) + })?; if removed { let old_entry = self.decode_entry(v)?; self.instance.updated(Some(old_entry), None); - self.merkle_updater.todo_notify.notify_one(); + self.merkle_todo_notify.notify_one(); } Ok(removed) } @@ -194,22 +189,21 @@ where k: &[u8], vhash: Hash, ) -> Result { - let removed = - (&self.store, &self.merkle_updater.todo).transaction(|(store, mkl_todo)| { - if let Some(cur_v) = store.get(k)? { - if blake2sum(&cur_v[..]) == vhash { - store.remove(k)?; - mkl_todo.insert(k, vec![])?; - return Ok(Some(cur_v)); - } + let removed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| { + if let Some(cur_v) = store.get(k)? { + if blake2sum(&cur_v[..]) == vhash { + store.remove(k)?; + mkl_todo.insert(k, vec![])?; + return Ok(Some(cur_v)); } - Ok(None) - })?; + } + Ok(None) + })?; if let Some(old_v) = removed { let old_entry = self.decode_entry(&old_v[..])?; self.instance.updated(Some(old_entry), None); - self.merkle_updater.todo_notify.notify_one(); + self.merkle_todo_notify.notify_one(); Ok(true) } else { Ok(false) diff --git a/src/table/gc.rs b/src/table/gc.rs index 061c5045..d99e3e40 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -13,20 +13,20 @@ use tokio::sync::watch; use garage_util::data::*; use garage_util::error::Error; +use garage_rpc::membership::System; use garage_rpc::rpc_client::*; use garage_rpc::rpc_server::*; use crate::data::*; use crate::replication::*; use crate::schema::*; -use crate::table::*; const TABLE_GC_BATCH_SIZE: usize = 1024; const TABLE_GC_RPC_TIMEOUT: Duration = Duration::from_secs(30); pub struct TableGC { - data: Arc>, - aux: Arc>, + data: Arc>, + system: Arc, rpc_client: Arc>, } @@ -46,23 +46,23 @@ where R: TableReplication + 'static, { pub(crate) fn launch( - data: Arc>, - aux: Arc>, + data: Arc>, + system: Arc, rpc_server: &mut RpcServer, ) -> Arc { let rpc_path = 
format!("table_{}/gc", data.name); - let rpc_client = aux.system.rpc_client::(&rpc_path); + let rpc_client = system.rpc_client::(&rpc_path); let gc = Arc::new(Self { data: data.clone(), - aux: aux.clone(), + system: system.clone(), rpc_client, }); gc.register_handler(rpc_server, rpc_path); let gc1 = gc.clone(); - aux.system.background.spawn_worker( + system.background.spawn_worker( format!("GC loop for {}", data.name), move |must_exit: watch::Receiver| gc1.gc_loop(must_exit), ); @@ -130,8 +130,8 @@ where let mut partitions = HashMap::new(); for (k, vhash, v) in entries { let pkh = Hash::try_from(&k[..32]).unwrap(); - let mut nodes = self.aux.replication.write_nodes(&pkh); - nodes.retain(|x| *x != self.aux.system.id); + let mut nodes = self.data.replication.write_nodes(&pkh); + nodes.retain(|x| *x != self.system.id); nodes.sort(); if !partitions.contains_key(&nodes) { @@ -220,7 +220,7 @@ where let self2 = self.clone(); self.rpc_client - .set_local_handler(self.aux.system.id, move |msg| { + .set_local_handler(self.system.id, move |msg| { let self2 = self2.clone(); async move { self2.handle_rpc(&msg).await } }); diff --git a/src/table/merkle.rs b/src/table/merkle.rs index 60b7833f..8c3dcad9 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -9,12 +9,16 @@ use serde::{Deserialize, Serialize}; use sled::transaction::{ ConflictableTransactionError, ConflictableTransactionResult, TransactionalTree, }; -use tokio::sync::{watch, Notify}; +use tokio::sync::watch; use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_util::error::Error; +use crate::data::*; +use crate::replication::*; +use crate::schema::*; + pub type MerklePartition = [u8; 2]; pub fn hash_of_merkle_partition(p: MerklePartition) -> Hash { @@ -32,28 +36,30 @@ pub fn hash_of_merkle_partition_opt(p: Option) -> Hash { // 16 bits (two bytes) of item's partition keys' hashes. // It builds one Merkle tree for each of these 2**16 partitions. 
-pub struct MerkleUpdater { - table_name: String, +pub struct MerkleUpdater { + data: Arc>, background: Arc, // Content of the todo tree: items where // - key = the key of an item in the main table, ie hash(partition_key)+sort_key // - value = the hash of the full serialized item, if present, // or an empty vec if item is absent (deleted) - pub(crate) todo: sled::Tree, - pub(crate) todo_notify: Notify, + // Fields in data: + // pub(crate) merkle_todo: sled::Tree, + // pub(crate) merkle_todo_notify: Notify, // Content of the merkle tree: items where // - key = .bytes() for MerkleNodeKey // - value = serialization of a MerkleNode, assumed to be MerkleNode::empty if not found - pub(crate) merkle_tree: sled::Tree, + // Field in data: + // pub(crate) merkle_tree: sled::Tree, empty_node_hash: Hash, } #[derive(Clone, Debug, Serialize, Deserialize)] pub struct MerkleNodeKey { // partition: first 16 bits (two bytes) of the partition_key's hash - pub partition: MerklePartition, + pub partition: [u8; 2], // prefix: a prefix for the hash of full keys, i.e. 
hash(hash(partition_key)+sort_key) #[serde(with = "serde_bytes")] @@ -74,27 +80,26 @@ pub enum MerkleNode { Leaf(Vec, Hash), } -impl MerkleUpdater { +impl MerkleUpdater +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ pub(crate) fn launch( - table_name: String, + data: Arc>, background: Arc, - todo: sled::Tree, - merkle_tree: sled::Tree, ) -> Arc { let empty_node_hash = blake2sum(&rmp_to_vec_all_named(&MerkleNode::Empty).unwrap()[..]); let ret = Arc::new(Self { - table_name, + data, background, - todo, - todo_notify: Notify::new(), - merkle_tree, empty_node_hash, }); let ret2 = ret.clone(); ret.background.spawn_worker( - format!("Merkle tree updater for {}", ret.table_name), + format!("Merkle tree updater for {}", ret.data.name), |must_exit: watch::Receiver| ret2.updater_loop(must_exit), ); @@ -103,27 +108,27 @@ impl MerkleUpdater { async fn updater_loop(self: Arc, mut must_exit: watch::Receiver) { while !*must_exit.borrow() { - if let Some(x) = self.todo.iter().next() { + if let Some(x) = self.data.merkle_todo.iter().next() { match x { Ok((key, valhash)) => { if let Err(e) = self.update_item(&key[..], &valhash[..]) { warn!( "({}) Error while updating Merkle tree item: {}", - self.table_name, e + self.data.name, e ); } } Err(e) => { warn!( "({}) Error while iterating on Merkle todo tree: {}", - self.table_name, e + self.data.name, e ); tokio::time::sleep(Duration::from_secs(10)).await; } } } else { select! { - _ = self.todo_notify.notified().fuse() => (), + _ = self.data.merkle_todo_notify.notified().fuse() => (), _ = must_exit.changed().fuse() => (), } } @@ -143,18 +148,20 @@ impl MerkleUpdater { partition: k[0..2].try_into().unwrap(), prefix: vec![], }; - self.merkle_tree + self.data + .merkle_tree .transaction(|tx| self.update_item_rec(tx, k, khash, &key, new_vhash))?; let deleted = self - .todo + .data + .merkle_todo .compare_and_swap::<_, _, Vec>(k, Some(vhash_by), None)? 
.is_ok(); if !deleted { debug!( "({}) Item not deleted from Merkle todo because it changed: {:?}", - self.table_name, k + self.data.name, k ); } Ok(()) @@ -197,7 +204,7 @@ impl MerkleUpdater { // should not happen warn!( "({}) Replacing intermediate node with empty node, should not happen.", - self.table_name + self.data.name ); Some(MerkleNode::Empty) } else if children.len() == 1 { @@ -301,7 +308,7 @@ impl MerkleUpdater { // Access a node in the Merkle tree, used by the sync protocol pub(crate) fn read_node(&self, k: &MerkleNodeKey) -> Result { - let ent = self.merkle_tree.get(k.encode())?; + let ent = self.data.merkle_tree.get(k.encode())?; match ent { None => Ok(MerkleNode::Empty), Some(v) => Ok(rmp_serde::decode::from_read_ref::<_, MerkleNode>(&v[..])?), @@ -309,11 +316,11 @@ impl MerkleUpdater { } pub fn merkle_tree_len(&self) -> usize { - self.merkle_tree.len() + self.data.merkle_tree.len() } pub fn todo_len(&self) -> usize { - self.todo.len() + self.data.merkle_todo.len() } } diff --git a/src/table/sync.rs b/src/table/sync.rs index ac0305e2..9c148393 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -14,6 +14,7 @@ use tokio::sync::{mpsc, watch}; use garage_util::data::*; use garage_util::error::Error; +use garage_rpc::membership::System; use garage_rpc::ring::Ring; use garage_rpc::rpc_client::*; use garage_rpc::rpc_server::*; @@ -29,8 +30,9 @@ const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(30); const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60); pub struct TableSyncer { - data: Arc>, - aux: Arc>, + system: Arc, + data: Arc>, + merkle: Arc>, todo: Mutex, rpc_client: Arc>, @@ -76,18 +78,20 @@ where R: TableReplication + 'static, { pub(crate) fn launch( - data: Arc>, - aux: Arc>, + system: Arc, + data: Arc>, + merkle: Arc>, rpc_server: &mut RpcServer, ) -> Arc { let rpc_path = format!("table_{}/sync", data.name); - let rpc_client = aux.system.rpc_client::(&rpc_path); + let rpc_client = system.rpc_client::(&rpc_path); let 
todo = SyncTodo { todo: vec![] }; let syncer = Arc::new(Self { + system: system.clone(), data: data.clone(), - aux: aux.clone(), + merkle, todo: Mutex::new(todo), rpc_client, }); @@ -97,13 +101,13 @@ where let (busy_tx, busy_rx) = mpsc::unbounded_channel(); let s1 = syncer.clone(); - aux.system.background.spawn_worker( + system.background.spawn_worker( format!("table sync watcher for {}", data.name), move |must_exit: watch::Receiver| s1.watcher_task(must_exit, busy_rx), ); let s2 = syncer.clone(); - aux.system.background.spawn_worker( + system.background.spawn_worker( format!("table syncer for {}", data.name), move |must_exit: watch::Receiver| s2.syncer_task(must_exit, busy_tx), ); @@ -126,7 +130,7 @@ where let self2 = self.clone(); self.rpc_client - .set_local_handler(self.aux.system.id, move |msg| { + .set_local_handler(self.system.id, move |msg| { let self2 = self2.clone(); async move { self2.handle_rpc(&msg).await } }); @@ -137,8 +141,8 @@ where mut must_exit: watch::Receiver, mut busy_rx: mpsc::UnboundedReceiver, ) { - let mut prev_ring: Arc = self.aux.system.ring.borrow().clone(); - let mut ring_recv: watch::Receiver> = self.aux.system.ring.clone(); + let mut prev_ring: Arc = self.system.ring.borrow().clone(); + let mut ring_recv: watch::Receiver> = self.system.ring.clone(); let mut nothing_to_do_since = Some(Instant::now()); while !*must_exit.borrow() { @@ -178,7 +182,7 @@ where self.todo .lock() .unwrap() - .add_full_sync(&self.data, &self.aux); + .add_full_sync(&self.data, &self.system); } async fn syncer_task( @@ -213,10 +217,10 @@ where must_exit: &mut watch::Receiver, ) -> Result<(), Error> { if partition.retain { - let my_id = self.aux.system.id; + let my_id = self.system.id; let nodes = self - .aux + .data .replication .write_nodes(&hash_of_merkle_partition(partition.range.begin)) .into_iter() @@ -242,7 +246,7 @@ where warn!("({}) Sync error: {}", self.data.name, e); } } - if n_errors > self.aux.replication.max_write_errors() { + if n_errors > 
self.data.replication.max_write_errors() { return Err(Error::Message(format!( "Sync failed with too many nodes (should have been: {:?}).", nodes @@ -288,19 +292,19 @@ where if items.len() > 0 { let nodes = self - .aux + .data .replication .write_nodes(&begin) .into_iter() .collect::>(); - if nodes.contains(&self.aux.system.id) { + if nodes.contains(&self.system.id) { warn!( "({}) Interrupting offload as partitions seem to have changed", self.data.name ); break; } - if nodes.len() < self.aux.replication.write_quorum() { + if nodes.len() < self.data.replication.write_quorum() { return Err(Error::Message(format!( "Not offloading as we don't have a quorum of nodes to write to." ))); @@ -376,7 +380,7 @@ where partition: u16::to_be_bytes(i), prefix: vec![], }; - match self.data.merkle_updater.read_node(&key)? { + match self.merkle.read_node(&key)? { MerkleNode::Empty => (), x => { ret.push((key.partition, hash_of(&x)?)); @@ -458,7 +462,7 @@ where while !todo.is_empty() && !*must_exit.borrow() { let key = todo.pop_front().unwrap(); - let node = self.data.merkle_updater.read_node(&key)?; + let node = self.merkle.read_node(&key)?; match node { MerkleNode::Empty => { @@ -570,7 +574,7 @@ where } } SyncRPC::GetNode(k) => { - let node = self.data.merkle_updater.read_node(&k)?; + let node = self.merkle.read_node(&k)?; Ok(SyncRPC::Node(k.clone(), node)) } SyncRPC::Items(items) => { @@ -585,15 +589,15 @@ where impl SyncTodo { fn add_full_sync( &mut self, - data: &TableData, - aux: &TableAux, + data: &TableData, + system: &System, ) { - let my_id = aux.system.id; + let my_id = system.id; self.todo.clear(); - let ring = aux.system.ring.borrow().clone(); - let split_points = aux.replication.split_points(&ring); + let ring = system.ring.borrow().clone(); + let split_points = data.replication.split_points(&ring); for i in 0..split_points.len() { let begin: MerklePartition = { @@ -613,7 +617,7 @@ impl SyncTodo { let begin_hash = hash_of_merkle_partition(begin); let end_hash = 
hash_of_merkle_partition_opt(end); - let nodes = aux.replication.write_nodes(&begin_hash); + let nodes = data.replication.write_nodes(&begin_hash); let retain = nodes.contains(&my_id); if !retain { diff --git a/src/table/table.rs b/src/table/table.rs index 2ce5868f..f00b4239 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -16,20 +16,17 @@ use garage_rpc::rpc_server::*; use crate::crdt::CRDT; use crate::data::*; use crate::gc::*; +use crate::merkle::*; use crate::replication::*; use crate::schema::*; use crate::sync::*; const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10); -pub struct TableAux { - pub system: Arc, - pub replication: R, -} - pub struct Table { - pub data: Arc>, - pub aux: Arc>, + pub system: Arc, + pub data: Arc>, + pub merkle_updater: Arc>, pub syncer: Arc>, rpc_client: Arc>>, } @@ -67,19 +64,22 @@ where let rpc_path = format!("table_{}", name); let rpc_client = system.rpc_client::>(&rpc_path); - let data = TableData::new(name, instance, db, system.background.clone()); + let data = TableData::new(name, instance, replication, db); - let aux = Arc::new(TableAux { - system, - replication, - }); + let merkle_updater = MerkleUpdater::launch(data.clone(), system.background.clone()); - let syncer = TableSyncer::launch(data.clone(), aux.clone(), rpc_server); - TableGC::launch(data.clone(), aux.clone(), rpc_server); + let syncer = TableSyncer::launch( + system.clone(), + data.clone(), + merkle_updater.clone(), + rpc_server, + ); + TableGC::launch(data.clone(), system.clone(), rpc_server); let table = Arc::new(Self { + system, data, - aux, + merkle_updater, syncer, rpc_client, }); @@ -91,7 +91,7 @@ where pub async fn insert(&self, e: &F::E) -> Result<(), Error> { let hash = e.partition_key().hash(); - let who = self.aux.replication.write_nodes(&hash); + let who = self.data.replication.write_nodes(&hash); //eprintln!("insert who: {:?}", who); let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?)); @@ -101,7 +101,7 @@ where 
.try_call_many( &who[..], rpc, - RequestStrategy::with_quorum(self.aux.replication.write_quorum()) + RequestStrategy::with_quorum(self.data.replication.write_quorum()) .with_timeout(TABLE_RPC_TIMEOUT), ) .await?; @@ -113,7 +113,7 @@ where for entry in entries.iter() { let hash = entry.partition_key().hash(); - let who = self.aux.replication.write_nodes(&hash); + let who = self.data.replication.write_nodes(&hash); let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?)); for node in who { if !call_list.contains_key(&node) { @@ -137,7 +137,7 @@ where errors.push(e); } } - if errors.len() > self.aux.replication.max_write_errors() { + if errors.len() > self.data.replication.max_write_errors() { Err(Error::Message("Too many errors".into())) } else { Ok(()) @@ -150,7 +150,7 @@ where sort_key: &F::S, ) -> Result, Error> { let hash = partition_key.hash(); - let who = self.aux.replication.read_nodes(&hash); + let who = self.data.replication.read_nodes(&hash); //eprintln!("get who: {:?}", who); let rpc = TableRPC::::ReadEntry(partition_key.clone(), sort_key.clone()); @@ -159,7 +159,7 @@ where .try_call_many( &who[..], rpc, - RequestStrategy::with_quorum(self.aux.replication.read_quorum()) + RequestStrategy::with_quorum(self.data.replication.read_quorum()) .with_timeout(TABLE_RPC_TIMEOUT) .interrupt_after_quorum(true), ) @@ -190,8 +190,7 @@ where if not_all_same { let self2 = self.clone(); let ent2 = ret_entry.clone(); - self.aux - .system + self.system .background .spawn_cancellable(async move { self2.repair_on_read(&who[..], ent2).await }); } @@ -207,7 +206,7 @@ where limit: usize, ) -> Result, Error> { let hash = partition_key.hash(); - let who = self.aux.replication.read_nodes(&hash); + let who = self.data.replication.read_nodes(&hash); let rpc = TableRPC::::ReadRange(partition_key.clone(), begin_sort_key, filter, limit); @@ -216,7 +215,7 @@ where .try_call_many( &who[..], rpc, - RequestStrategy::with_quorum(self.aux.replication.read_quorum()) + 
RequestStrategy::with_quorum(self.data.replication.read_quorum()) .with_timeout(TABLE_RPC_TIMEOUT) .interrupt_after_quorum(true), ) @@ -248,7 +247,7 @@ where } if !to_repair.is_empty() { let self2 = self.clone(); - self.aux.system.background.spawn_cancellable(async move { + self.system.background.spawn_cancellable(async move { for (_, v) in to_repair.iter_mut() { self2.repair_on_read(&who[..], v.take().unwrap()).await?; } @@ -288,7 +287,7 @@ where let self2 = self.clone(); self.rpc_client - .set_local_handler(self.aux.system.id, move |msg| { + .set_local_handler(self.system.id, move |msg| { let self2 = self2.clone(); async move { self2.handle(&msg).await } }); -- cgit v1.2.3 From 0aad2f2e066b5914ac94bb319e7679e2e7761b2b Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 16 Mar 2021 11:47:39 +0100 Subject: some reordering --- src/table/gc.rs | 6 +++--- src/table/merkle.rs | 6 ++---- src/table/table.rs | 4 ++-- 3 files changed, 7 insertions(+), 9 deletions(-) (limited to 'src/table') diff --git a/src/table/gc.rs b/src/table/gc.rs index d99e3e40..20ebe3c7 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -25,8 +25,8 @@ const TABLE_GC_BATCH_SIZE: usize = 1024; const TABLE_GC_RPC_TIMEOUT: Duration = Duration::from_secs(30); pub struct TableGC { - data: Arc>, system: Arc, + data: Arc>, rpc_client: Arc>, } @@ -46,16 +46,16 @@ where R: TableReplication + 'static, { pub(crate) fn launch( - data: Arc>, system: Arc, + data: Arc>, rpc_server: &mut RpcServer, ) -> Arc { let rpc_path = format!("table_{}/gc", data.name); let rpc_client = system.rpc_client::(&rpc_path); let gc = Arc::new(Self { - data: data.clone(), system: system.clone(), + data: data.clone(), rpc_client, }); diff --git a/src/table/merkle.rs b/src/table/merkle.rs index 8c3dcad9..86fef4c5 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -38,7 +38,6 @@ pub fn hash_of_merkle_partition_opt(p: Option) -> Hash { pub struct MerkleUpdater { data: Arc>, - background: Arc, // Content of the todo tree: 
items where // - key = the key of an item in the main table, ie hash(partition_key)+sort_key @@ -86,19 +85,18 @@ where R: TableReplication + 'static, { pub(crate) fn launch( + background: &BackgroundRunner, data: Arc>, - background: Arc, ) -> Arc { let empty_node_hash = blake2sum(&rmp_to_vec_all_named(&MerkleNode::Empty).unwrap()[..]); let ret = Arc::new(Self { data, - background, empty_node_hash, }); let ret2 = ret.clone(); - ret.background.spawn_worker( + background.spawn_worker( format!("Merkle tree updater for {}", ret.data.name), |must_exit: watch::Receiver| ret2.updater_loop(must_exit), ); diff --git a/src/table/table.rs b/src/table/table.rs index f00b4239..421c8bf5 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -66,7 +66,7 @@ where let data = TableData::new(name, instance, replication, db); - let merkle_updater = MerkleUpdater::launch(data.clone(), system.background.clone()); + let merkle_updater = MerkleUpdater::launch(&system.background, data.clone()); let syncer = TableSyncer::launch( system.clone(), @@ -74,7 +74,7 @@ where merkle_updater.clone(), rpc_server, ); - TableGC::launch(data.clone(), system.clone(), rpc_server); + TableGC::launch(system.clone(), data.clone(), rpc_server); let table = Arc::new(Self { system, -- cgit v1.2.3 From 2a41b8238496dfeac5ee0f273445299cbd112ff6 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 16 Mar 2021 12:18:03 +0100 Subject: Simpler Merkle & sync --- src/table/merkle.rs | 29 ++++---- src/table/replication/fullcopy.rs | 15 ++-- src/table/replication/parameters.rs | 10 ++- src/table/replication/sharded.rs | 22 ++---- src/table/sync.rs | 134 ++++++++++-------------------------- 5 files changed, 65 insertions(+), 145 deletions(-) (limited to 'src/table') diff --git a/src/table/merkle.rs b/src/table/merkle.rs index 86fef4c5..db05cca4 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -1,4 +1,3 @@ -use std::convert::TryInto; use std::sync::Arc; use std::time::Duration; @@ -15,22 +14,12 @@ use 
garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_util::error::Error; +use garage_rpc::ring::*; + use crate::data::*; use crate::replication::*; use crate::schema::*; -pub type MerklePartition = [u8; 2]; - -pub fn hash_of_merkle_partition(p: MerklePartition) -> Hash { - let mut partition_pos = [0u8; 32]; - partition_pos[0..2].copy_from_slice(&p[..]); - partition_pos.into() -} - -pub fn hash_of_merkle_partition_opt(p: Option) -> Hash { - p.map(hash_of_merkle_partition) - .unwrap_or([0xFFu8; 32].into()) -} // This modules partitions the data in 2**16 partitions, based on the top // 16 bits (two bytes) of item's partition keys' hashes. @@ -57,8 +46,8 @@ pub struct MerkleUpdater { #[derive(Clone, Debug, Serialize, Deserialize)] pub struct MerkleNodeKey { - // partition: first 16 bits (two bytes) of the partition_key's hash - pub partition: [u8; 2], + // partition number + pub partition: Partition, // prefix: a prefix for the hash of full keys, i.e. hash(hash(partition_key)+sort_key) #[serde(with = "serde_bytes")] @@ -143,7 +132,7 @@ where }; let key = MerkleNodeKey { - partition: k[0..2].try_into().unwrap(), + partition: self.data.replication.partition_of(&Hash::try_from(&k[0..32]).unwrap()), prefix: vec![], }; self.data @@ -325,7 +314,7 @@ where impl MerkleNodeKey { fn encode(&self) -> Vec { let mut ret = Vec::with_capacity(2 + self.prefix.len()); - ret.extend(&self.partition[..]); + ret.extend(&u16::to_be_bytes(self.partition)[..]); ret.extend(&self.prefix[..]); ret } @@ -443,3 +432,9 @@ fn test_intermediate_aux() { ] ); } + +impl MerkleNode { + pub fn is_empty(&self) -> bool { + *self == MerkleNode::Empty + } +} diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs index aea8c1f3..bd658f63 100644 --- a/src/table/replication/fullcopy.rs +++ b/src/table/replication/fullcopy.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use garage_rpc::membership::System; -use garage_rpc::ring::Ring; +use garage_rpc::ring::*; use 
garage_util::data::*; use crate::replication::*; @@ -19,10 +19,6 @@ impl TableReplication for TableFullReplication { // Advantage: do all reads locally, extremely fast // Inconvenient: only suitable to reasonably small tables - fn partition_of(&self, _hash: &Hash) -> u16 { - 0u16 - } - fn read_nodes(&self, _hash: &Hash) -> Vec { vec![self.system.id] } @@ -46,9 +42,10 @@ impl TableReplication for TableFullReplication { self.max_faults } - fn split_points(&self, _ring: &Ring) -> Vec { - let mut ret = vec![]; - ret.push([0u8; 32].into()); - ret + fn partition_of(&self, _hash: &Hash) -> Partition { + 0u16 + } + fn partitions(&self) -> Vec<(Partition, Hash)> { + vec![(0u16, [0u8; 32].into())] } } diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs index ace82bd9..e46bd172 100644 --- a/src/table/replication/parameters.rs +++ b/src/table/replication/parameters.rs @@ -1,4 +1,4 @@ -use garage_rpc::ring::Ring; +use garage_rpc::ring::*; use garage_util::data::*; @@ -6,9 +6,6 @@ pub trait TableReplication: Send + Sync { // See examples in table_sharded.rs and table_fullcopy.rs // To understand various replication methods - // Partition number of data item (for Merkle tree) - fn partition_of(&self, hash: &Hash) -> u16; - // Which nodes to send reads from fn read_nodes(&self, hash: &Hash) -> Vec; fn read_quorum(&self) -> usize; @@ -18,6 +15,7 @@ pub trait TableReplication: Send + Sync { fn write_quorum(&self) -> usize; fn max_write_errors(&self) -> usize; - // Get partition boundaries - fn split_points(&self, ring: &Ring) -> Vec; + // Accessing partitions, for Merkle tree & sync + fn partition_of(&self, hash: &Hash) -> Partition; + fn partitions(&self) -> Vec<(Partition, Hash)>; } diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs index 966be31a..dce74b03 100644 --- a/src/table/replication/sharded.rs +++ b/src/table/replication/sharded.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use garage_rpc::membership::System; -use 
garage_rpc::ring::Ring; +use garage_rpc::ring::*; use garage_util::data::*; use crate::replication::*; @@ -22,10 +22,6 @@ impl TableReplication for TableShardedReplication { // - reads are done on all of the nodes that replicate the data // - writes as well - fn partition_of(&self, hash: &Hash) -> u16 { - self.system.ring.borrow().partition_of(hash) - } - fn read_nodes(&self, hash: &Hash) -> Vec { let ring = self.system.ring.borrow().clone(); ring.walk_ring(&hash, self.replication_factor) @@ -45,16 +41,10 @@ impl TableReplication for TableShardedReplication { self.replication_factor - self.write_quorum } - fn split_points(&self, ring: &Ring) -> Vec { - let mut ret = vec![]; - - for entry in ring.ring.iter() { - ret.push(entry.location); - } - if ret.len() > 0 { - assert_eq!(ret[0], [0u8; 32].into()); - } - - ret + fn partition_of(&self, hash: &Hash) -> Partition { + self.system.ring.borrow().partition_of(hash) + } + fn partitions(&self) -> Vec<(Partition, Hash)> { + self.system.ring.borrow().partitions() } } diff --git a/src/table/sync.rs b/src/table/sync.rs index 9c148393..f5c2ef33 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -1,5 +1,4 @@ use std::collections::VecDeque; -use std::convert::TryInto; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; @@ -15,7 +14,7 @@ use garage_util::data::*; use garage_util::error::Error; use garage_rpc::membership::System; -use garage_rpc::ring::Ring; +use garage_rpc::ring::*; use garage_rpc::rpc_client::*; use garage_rpc::rpc_server::*; @@ -38,20 +37,10 @@ pub struct TableSyncer { rpc_client: Arc>, } -type RootCk = Vec<(MerklePartition, Hash)>; - -#[derive(Debug, Clone, Copy, Serialize, Deserialize)] -pub struct PartitionRange { - begin: MerklePartition, - // if end is None, go all the way to partition 0xFFFF included - end: Option, -} - #[derive(Serialize, Deserialize)] pub(crate) enum SyncRPC { - RootCkHash(PartitionRange, Hash), - RootCkList(PartitionRange, RootCk), - CkNoDifference, + 
RootCkHash(Partition, Hash), + RootCkDifferent(bool), GetNode(MerkleNodeKey), Node(MerkleNodeKey, MerkleNode), Items(Vec>), @@ -66,7 +55,9 @@ struct SyncTodo { #[derive(Debug, Clone)] struct TodoPartition { - range: PartitionRange, + partition: Partition, + begin: Hash, + end: Hash, // Are we a node that stores this partition or not? retain: bool, @@ -222,7 +213,7 @@ where let nodes = self .data .replication - .write_nodes(&hash_of_merkle_partition(partition.range.begin)) + .write_nodes(&partition.begin) .into_iter() .filter(|node| *node != my_id) .collect::>(); @@ -254,8 +245,8 @@ where } } else { self.offload_partition( - &hash_of_merkle_partition(partition.range.begin), - &hash_of_merkle_partition_opt(partition.range.end), + &partition.begin, + &partition.end, must_exit, ) .await?; @@ -364,30 +355,13 @@ where // side to the other will happen when the other side syncs with us, // which they also do regularly. - fn get_root_ck(&self, range: PartitionRange) -> Result { - let begin = u16::from_be_bytes(range.begin); - let range_iter = match range.end { - Some(end) => { - let end = u16::from_be_bytes(end); - begin..=(end - 1) - } - None => begin..=0xFFFF, + fn get_root_ck(&self, partition: Partition) -> Result<(MerkleNodeKey, MerkleNode), Error> { + let key = MerkleNodeKey { + partition, + prefix: vec![], }; - - let mut ret = vec![]; - for i in range_iter { - let key = MerkleNodeKey { - partition: u16::to_be_bytes(i), - prefix: vec![], - }; - match self.merkle.read_node(&key)? 
{ - MerkleNode::Empty => (), - x => { - ret.push((key.partition, hash_of(&x)?)); - } - } - } - Ok(ret) + let node = self.merkle.read_node(&key)?; + Ok((key, node)) } async fn do_sync_with( @@ -396,7 +370,7 @@ where who: UUID, must_exit: watch::Receiver, ) -> Result<(), Error> { - let root_ck = self.get_root_ck(partition.range)?; + let (root_ck_key, root_ck) = self.get_root_ck(partition.partition)?; if root_ck.is_empty() { debug!( "({}) Sync {:?} with {:?}: partition is empty.", @@ -404,51 +378,29 @@ where ); return Ok(()); } + let root_ck_hash = hash_of::(&root_ck)?; - let root_ck_hash = hash_of(&root_ck)?; - - // If their root checksum has level > than us, use that as a reference + // Check if they have the same root checksum + // If so, do nothing. let root_resp = self .rpc_client .call( who, - SyncRPC::RootCkHash(partition.range, root_ck_hash), + SyncRPC::RootCkHash(partition.partition, root_ck_hash), TABLE_SYNC_RPC_TIMEOUT, ) .await?; let mut todo = match root_resp { - SyncRPC::CkNoDifference => { + SyncRPC::RootCkDifferent(false) => { debug!( "({}) Sync {:?} with {:?}: no difference", self.data.name, partition, who ); return Ok(()); } - SyncRPC::RootCkList(_, their_root_ck) => { - let join = join_ordered(&root_ck[..], &their_root_ck[..]); - let mut todo = VecDeque::new(); - for (p, v1, v2) in join.iter() { - let diff = match (v1, v2) { - (Some(_), None) | (None, Some(_)) => true, - (Some(a), Some(b)) => a != b, - _ => false, - }; - if diff { - todo.push_back(MerkleNodeKey { - partition: **p, - prefix: vec![], - }); - } - } - debug!( - "({}) Sync {:?} with {:?}: todo.len() = {}", - self.data.name, - partition, - who, - todo.len() - ); - todo + SyncRPC::RootCkDifferent(true) => { + VecDeque::from(vec![root_ck_key]) } x => { return Err(Error::Message(format!( @@ -565,13 +517,9 @@ where async fn handle_rpc(self: &Arc, message: &SyncRPC) -> Result { match message { SyncRPC::RootCkHash(range, h) => { - let root_ck = self.get_root_ck(*range)?; - let hash = 
hash_of(&root_ck)?; - if hash == *h { - Ok(SyncRPC::CkNoDifference) - } else { - Ok(SyncRPC::RootCkList(*range, root_ck)) - } + let (_root_ck_key, root_ck) = self.get_root_ck(*range)?; + let hash = hash_of::(&root_ck)?; + Ok(SyncRPC::RootCkDifferent(hash != *h)) } SyncRPC::GetNode(k) => { let node = self.merkle.read_node(&k)?; @@ -596,39 +544,31 @@ impl SyncTodo { self.todo.clear(); - let ring = system.ring.borrow().clone(); - let split_points = data.replication.split_points(&ring); + let partitions = data.replication.partitions(); - for i in 0..split_points.len() { - let begin: MerklePartition = { - let b = split_points[i]; - assert_eq!(b.as_slice()[2..], [0u8; 30][..]); - b.as_slice()[..2].try_into().unwrap() - }; + for i in 0..partitions.len() { + let begin = partitions[i].1; - let end: Option = if i + 1 < split_points.len() { - let e = split_points[i + 1]; - assert_eq!(e.as_slice()[2..], [0u8; 30][..]); - Some(e.as_slice()[..2].try_into().unwrap()) + let end = if i + 1 < partitions.len() { + partitions[i+1].1 } else { - None + [0xFFu8; 32].into() }; - let begin_hash = hash_of_merkle_partition(begin); - let end_hash = hash_of_merkle_partition_opt(end); - - let nodes = data.replication.write_nodes(&begin_hash); + let nodes = data.replication.write_nodes(&begin); let retain = nodes.contains(&my_id); if !retain { // Check if we have some data to send, otherwise skip - if data.store.range(begin_hash..end_hash).next().is_none() { + if data.store.range(begin..end).next().is_none() { continue; } } self.todo.push(TodoPartition { - range: PartitionRange { begin, end }, + partition: partitions[i].0, + begin, + end, retain, }); } -- cgit v1.2.3 From f4346cc5f45839ace93d2d11ce6beea632fd8f2c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 16 Mar 2021 15:58:40 +0100 Subject: Update dependencies --- src/table/Cargo.toml | 8 +++----- src/table/merkle.rs | 11 +++++------ src/table/sync.rs | 16 +++++----------- 3 files changed, 13 insertions(+), 22 deletions(-) (limited to 
'src/table') diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml index 8f73470e..f9d98dec 100644 --- a/src/table/Cargo.toml +++ b/src/table/Cargo.toml @@ -16,19 +16,17 @@ path = "lib.rs" garage_util = { version = "0.1.1", path = "../util" } garage_rpc = { version = "0.1.1", path = "../rpc" } -bytes = "0.4" -rand = "0.7" -hex = "0.3" +bytes = "1.0" +rand = "0.8" log = "0.4" hexdump = "0.1" sled = "0.34" -rmp-serde = "0.14.3" +rmp-serde = "0.15" serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } serde_bytes = "0.11" -async-trait = "0.1.30" futures = "0.3" futures-util = "0.3" tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } diff --git a/src/table/merkle.rs b/src/table/merkle.rs index db05cca4..8a8eb342 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -20,7 +20,6 @@ use crate::data::*; use crate::replication::*; use crate::schema::*; - // This modules partitions the data in 2**16 partitions, based on the top // 16 bits (two bytes) of item's partition keys' hashes. // It builds one Merkle tree for each of these 2**16 partitions. 
@@ -73,10 +72,7 @@ where F: TableSchema + 'static, R: TableReplication + 'static, { - pub(crate) fn launch( - background: &BackgroundRunner, - data: Arc>, - ) -> Arc { + pub(crate) fn launch(background: &BackgroundRunner, data: Arc>) -> Arc { let empty_node_hash = blake2sum(&rmp_to_vec_all_named(&MerkleNode::Empty).unwrap()[..]); let ret = Arc::new(Self { @@ -132,7 +128,10 @@ where }; let key = MerkleNodeKey { - partition: self.data.replication.partition_of(&Hash::try_from(&k[0..32]).unwrap()), + partition: self + .data + .replication + .partition_of(&Hash::try_from(&k[0..32]).unwrap()), prefix: vec![], }; self.data diff --git a/src/table/sync.rs b/src/table/sync.rs index f5c2ef33..3130abe8 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -244,12 +244,8 @@ where ))); } } else { - self.offload_partition( - &partition.begin, - &partition.end, - must_exit, - ) - .await?; + self.offload_partition(&partition.begin, &partition.end, must_exit) + .await?; } Ok(()) @@ -399,9 +395,7 @@ where ); return Ok(()); } - SyncRPC::RootCkDifferent(true) => { - VecDeque::from(vec![root_ck_key]) - } + SyncRPC::RootCkDifferent(true) => VecDeque::from(vec![root_ck_key]), x => { return Err(Error::Message(format!( "Invalid respone to RootCkHash RPC: {}", @@ -550,7 +544,7 @@ impl SyncTodo { let begin = partitions[i].1; let end = if i + 1 < partitions.len() { - partitions[i+1].1 + partitions[i + 1].1 } else { [0xFFu8; 32].into() }; @@ -579,7 +573,7 @@ impl SyncTodo { return None; } - let i = rand::thread_rng().gen_range::(0, self.todo.len()); + let i = rand::thread_rng().gen_range(0..self.todo.len()); if i == self.todo.len() - 1 { self.todo.pop() } else { -- cgit v1.2.3 From 3fadc5cbbd5067e160dbe9cf23e301f74a19186e Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 16 Mar 2021 16:35:10 +0100 Subject: Small changes --- src/table/gc.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) (limited to 'src/table') diff --git a/src/table/gc.rs b/src/table/gc.rs index 
20ebe3c7..a37c052f 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -146,16 +146,19 @@ where .map(|(nodes, items)| self.try_send_and_delete(nodes, items)), ) .await; + + let mut errs = vec![]; for resp in resps { if let Err(e) = resp { - warn!( - "({}) Unable to send and delete for GC: {}", - self.data.name, e - ); + errs.push(e); } } - Ok(true) + if errs.is_empty() { + Ok(true) + } else { + Err(Error::Message(errs.into_iter().map(|x| format!("{}", x)).collect::>().join(", "))) + } } async fn try_send_and_delete( -- cgit v1.2.3 From 7b10245dfb741b7f801d1f3eaa56c6cb4f385d65 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 16 Mar 2021 18:42:33 +0100 Subject: Leader-based GC --- src/table/data.rs | 20 +++++++++++++++++--- src/table/table.rs | 2 +- 2 files changed, 18 insertions(+), 4 deletions(-) (limited to 'src/table') diff --git a/src/table/data.rs b/src/table/data.rs index 9aa2a3bc..e07a21d2 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -9,13 +9,16 @@ use tokio::sync::Notify; use garage_util::data::*; use garage_util::error::*; +use garage_rpc::membership::System; + use crate::crdt::CRDT; use crate::replication::*; use crate::schema::*; pub struct TableData { - pub name: String, + system: Arc, + pub name: String, pub(crate) instance: F, pub(crate) replication: R, @@ -32,7 +35,7 @@ where F: TableSchema, R: TableReplication, { - pub fn new(name: String, instance: F, replication: R, db: &sled::Db) -> Arc { + pub fn new(system: Arc, name: String, instance: F, replication: R, db: &sled::Db) -> Arc { let store = db .open_tree(&format!("{}:table", name)) .expect("Unable to open DB tree"); @@ -49,6 +52,7 @@ where .expect("Unable to open DB tree"); Arc::new(Self { + system, name, instance, replication, @@ -157,7 +161,17 @@ where self.instance.updated(old_entry, Some(new_entry)); self.merkle_todo_notify.notify_one(); if is_tombstone { - self.gc_todo.insert(&tree_key, new_bytes_hash.as_slice())?; + // We are only responsible for GC'ing this item if we 
are the + // "leader" of the partition, i.e. the first node in the + // set of nodes that replicates this partition. + // This avoids GC loops and does not change the termination properties + // of the GC algorithm, as in all cases GC is suspended if + // any node of the partition is unavailable. + let pk_hash = Hash::try_from(&tree_key[..32]).unwrap(); + let nodes = self.replication.write_nodes(&pk_hash); + if nodes.first() == Some(&self.system.id) { + self.gc_todo.insert(&tree_key, new_bytes_hash.as_slice())?; + } } } diff --git a/src/table/table.rs b/src/table/table.rs index 421c8bf5..e203b178 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -64,7 +64,7 @@ where let rpc_path = format!("table_{}", name); let rpc_client = system.rpc_client::>(&rpc_path); - let data = TableData::new(name, instance, replication, db); + let data = TableData::new(system.clone(), name, instance, replication, db); let merkle_updater = MerkleUpdater::launch(&system.background, data.clone()); -- cgit v1.2.3 From 390ab02f41c32e75e1df2b6893dfa0fd484e8b4b Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 16 Mar 2021 20:10:41 +0100 Subject: Todo make a test for the Merkle updater --- src/table/merkle.rs | 103 ++++++++++++++++++++++++++++++---------------------- 1 file changed, 59 insertions(+), 44 deletions(-) (limited to 'src/table') diff --git a/src/table/merkle.rs b/src/table/merkle.rs index 8a8eb342..3001786f 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -136,7 +136,7 @@ where }; self.data .merkle_tree - .transaction(|tx| self.update_item_rec(tx, k, khash, &key, new_vhash))?; + .transaction(|tx| self.update_item_rec(tx, k, &khash, &key, new_vhash))?; let deleted = self .data @@ -157,7 +157,7 @@ where &self, tx: &TransactionalTree, k: &[u8], - khash: Hash, + khash: &Hash, key: &MerkleNodeKey, new_vhash: Option, ) -> ConflictableTransactionResult, Error> { @@ -195,11 +195,22 @@ where Some(MerkleNode::Empty) } else if children.len() == 1 { // We now have a 
single node (case when the update deleted one of only two - // children). Move that single child to this level of the tree. + // children). If that node is a leaf, move it to this level. let key_sub = key.add_byte(children[0].0); let subnode = self.read_node_txn(tx, &key_sub)?; - tx.remove(key_sub.encode())?; - Some(subnode) + match subnode { + MerkleNode::Empty => { + warn!("({}) Single subnode in tree is empty Merkle node", self.data.name); + Some(MerkleNode::Empty) + } + MerkleNode::Intermediate(_) => { + Some(MerkleNode::Intermediate(children)) + } + x @ MerkleNode::Leaf(_, _) => { + tx.remove(key_sub.encode())?; + Some(x) + } + } } else { Some(MerkleNode::Intermediate(children)) } @@ -208,37 +219,41 @@ where None } } - MerkleNode::Leaf(exlf_key, exlf_hash) => { - if exlf_key == k { + MerkleNode::Leaf(exlf_k, exlf_vhash) => { + if exlf_k == k { // This leaf is for the same key that the one we are updating match new_vhash { - Some(vhv) if vhv == exlf_hash => None, + Some(vhv) if vhv == exlf_vhash => None, Some(vhv) => Some(MerkleNode::Leaf(k.to_vec(), vhv)), None => Some(MerkleNode::Empty), } } else { // This is an only leaf for another key - if let Some(vhv) = new_vhash { + if new_vhash.is_some() { // Move that other key to a subnode, create another subnode for our // insertion and replace current node by an intermediary node - let (pos1, h1) = { - let key2 = key.next_key(blake2sum(&exlf_key[..])); - let subhash = self.put_node_txn( - tx, - &key2, - &MerkleNode::Leaf(exlf_key, exlf_hash), - )?; - (key2.prefix[i], subhash) - }; - let (pos2, h2) = { - let key2 = key.next_key(khash); - let subhash = - self.put_node_txn(tx, &key2, &MerkleNode::Leaf(k.to_vec(), vhv))?; - (key2.prefix[i], subhash) - }; let mut int = vec![]; - intermediate_set_child(&mut int, pos1, h1); - intermediate_set_child(&mut int, pos2, h2); + + let exlf_khash = blake2sum(&exlf_k[..]); + assert_eq!(khash.as_slice()[..i], exlf_khash.as_slice()[..i]); + + { + let exlf_subkey = 
key.next_key(&exlf_khash); + let exlf_sub_hash = self.update_item_rec(tx, &exlf_k[..], &exlf_khash, &exlf_subkey, Some(exlf_vhash))?.unwrap(); + intermediate_set_child(&mut int, exlf_subkey.prefix[i], exlf_sub_hash); + assert_eq!(int.len(), 1); + } + + { + let key2 = key.next_key(khash); + let subhash = self.update_item_rec(tx, k, khash, &key2, new_vhash)?.unwrap(); + intermediate_set_child(&mut int, key2.prefix[i], subhash); + if exlf_khash.as_slice()[i] == khash.as_slice()[i] { + assert_eq!(int.len(), 1); + } else { + assert_eq!(int.len(), 2); + } + } Some(MerkleNode::Intermediate(int)) } else { // Nothing to do, we don't want to insert this value because it is None, @@ -266,11 +281,7 @@ where k: &MerkleNodeKey, ) -> ConflictableTransactionResult { let ent = tx.get(k.encode())?; - match ent { - None => Ok(MerkleNode::Empty), - Some(v) => Ok(rmp_serde::decode::from_read_ref::<_, MerkleNode>(&v[..]) - .map_err(|e| ConflictableTransactionError::Abort(e.into()))?), - } + MerkleNode::decode_opt(ent).map_err(ConflictableTransactionError::Abort) } fn put_node_txn( @@ -295,10 +306,7 @@ where // Access a node in the Merkle tree, used by the sync protocol pub(crate) fn read_node(&self, k: &MerkleNodeKey) -> Result { let ent = self.data.merkle_tree.get(k.encode())?; - match ent { - None => Ok(MerkleNode::Empty), - Some(v) => Ok(rmp_serde::decode::from_read_ref::<_, MerkleNode>(&v[..])?), - } + MerkleNode::decode_opt(ent) } pub fn merkle_tree_len(&self) -> usize { @@ -318,8 +326,8 @@ impl MerkleNodeKey { ret } - pub fn next_key(&self, h: Hash) -> Self { - assert!(&h.as_slice()[0..self.prefix.len()] == &self.prefix[..]); + pub fn next_key(&self, h: &Hash) -> Self { + assert_eq!(h.as_slice()[0..self.prefix.len()], self.prefix[..]); let mut s2 = self.clone(); s2.prefix.push(h.as_slice()[self.prefix.len()]); s2 @@ -332,6 +340,19 @@ impl MerkleNodeKey { } } +impl MerkleNode { + fn decode_opt(ent: Option) -> Result { + match ent { + None => Ok(MerkleNode::Empty), + Some(v) => 
Ok(rmp_serde::decode::from_read_ref::<_, MerkleNode>(&v[..])?), + } + } + + pub fn is_empty(&self) -> bool { + *self == MerkleNode::Empty + } +} + fn intermediate_set_child(ch: &mut Vec<(u8, Hash)>, pos: u8, v: Hash) { for i in 0..ch.len() { if ch[i].0 == pos { @@ -342,7 +363,7 @@ fn intermediate_set_child(ch: &mut Vec<(u8, Hash)>, pos: u8, v: Hash) { return; } } - ch.insert(ch.len(), (pos, v)); + ch.push((pos, v)); } fn intermediate_rm_child(ch: &mut Vec<(u8, Hash)>, pos: u8) { @@ -431,9 +452,3 @@ fn test_intermediate_aux() { ] ); } - -impl MerkleNode { - pub fn is_empty(&self) -> bool { - *self == MerkleNode::Empty - } -} -- cgit v1.2.3