aboutsummaryrefslogtreecommitdiff
path: root/src/table
diff options
context:
space:
mode:
Diffstat (limited to 'src/table')
-rw-r--r--src/table/Cargo.toml11
-rw-r--r--src/table/crdt.rs327
-rw-r--r--src/table/crdt/bool.rs34
-rw-r--r--src/table/crdt/crdt.rs73
-rw-r--r--src/table/crdt/lww.rs114
-rw-r--r--src/table/crdt/lww_map.rs145
-rw-r--r--src/table/crdt/map.rs83
-rw-r--r--src/table/crdt/mod.rs22
-rw-r--r--src/table/data.rs254
-rw-r--r--src/table/gc.rs248
-rw-r--r--src/table/lib.rs8
-rw-r--r--src/table/merkle.rs454
-rw-r--r--src/table/replication/fullcopy.rs51
-rw-r--r--src/table/replication/mod.rs6
-rw-r--r--src/table/replication/parameters.rs21
-rw-r--r--src/table/replication/sharded.rs (renamed from src/table/table_sharded.rs)31
-rw-r--r--src/table/schema.rs10
-rw-r--r--src/table/sync.rs614
-rw-r--r--src/table/table.rs229
-rw-r--r--src/table/table_fullcopy.rs59
-rw-r--r--src/table/table_sync.rs891
21 files changed, 2191 insertions, 1494 deletions
diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml
index 6485a542..f9d98dec 100644
--- a/src/table/Cargo.toml
+++ b/src/table/Cargo.toml
@@ -16,21 +16,18 @@ path = "lib.rs"
garage_util = { version = "0.1.1", path = "../util" }
garage_rpc = { version = "0.1.1", path = "../rpc" }
-bytes = "0.4"
-rand = "0.7"
-hex = "0.3"
-arc-swap = "0.4"
+bytes = "1.0"
+rand = "0.8"
log = "0.4"
hexdump = "0.1"
sled = "0.34"
-rmp-serde = "0.14.3"
+rmp-serde = "0.15"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_bytes = "0.11"
-async-trait = "0.1.30"
futures = "0.3"
futures-util = "0.3"
-tokio = { version = "0.2", default-features = false, features = ["rt-core", "rt-threaded", "io-driver", "net", "tcp", "time", "macros", "sync", "signal", "fs"] }
+tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
diff --git a/src/table/crdt.rs b/src/table/crdt.rs
deleted file mode 100644
index 4cba10ce..00000000
--- a/src/table/crdt.rs
+++ /dev/null
@@ -1,327 +0,0 @@
-//! This package provides a simple implementation of conflict-free replicated data types (CRDTs)
-//!
-//! CRDTs are a type of data structures that do not require coordination. In other words, we can
-//! edit them in parallel, we will always find a way to merge it.
-//!
-//! A general example is a counter. Its initial value is 0. Alice and Bob get a copy of the
-//! counter. Alice does +1 on her copy, she reads 1. Bob does +3 on his copy, he reads 3. Now,
-//! it is easy to merge their counters, order does not count: we always get 4.
-//!
-//! Learn more about CRDT [on Wikipedia](https://en.wikipedia.org/wiki/Conflict-free_replicated_data_type)
-
-use serde::{Deserialize, Serialize};
-
-use garage_util::data::*;
-
-/// Definition of a CRDT - all CRDT Rust types implement this.
-///
-/// A CRDT is defined as a merge operator that respects a certain set of axioms.
-///
-/// In particular, the merge operator must be commutative, associative,
-/// idempotent, and monotonic.
-/// In other words, if `a`, `b` and `c` are CRDTs, and `⊔` denotes the merge operator,
-/// the following axioms must apply:
-///
-/// ```text
-/// a ⊔ b = b ⊔ a (commutativity)
-/// (a ⊔ b) ⊔ c = a ⊔ (b ⊔ c) (associativity)
-/// (a ⊔ b) ⊔ b = a ⊔ b (idempotence)
-/// ```
-///
-/// Moreover, the relationship `≥` defined by `a ≥ b ⇔ ∃c. a = b ⊔ c` must be a partial order.
-/// This implies a few properties such as: if `a ⊔ b ≠ a`, then there is no `c` such that `(a ⊔ b) ⊔ c = a`,
-/// as this would imply a cycle in the partial order.
-pub trait CRDT {
- /// Merge the two datastructures according to the CRDT rules.
- /// `self` is modified to contain the merged CRDT value. `other` is not modified.
- ///
- /// # Arguments
- ///
- /// * `other` - the other CRDT we wish to merge with
- fn merge(&mut self, other: &Self);
-}
-
-/// All types that implement `Ord` (a total order) also implement a trivial CRDT
-/// defined by the merge rule: `a ⊔ b = max(a, b)`.
-impl<T> CRDT for T
-where
- T: Ord + Clone,
-{
- fn merge(&mut self, other: &Self) {
- if other > self {
- *self = other.clone();
- }
- }
-}
-
-// ---- LWW Register ----
-
-/// Last Write Win (LWW)
-///
-/// An LWW CRDT associates a timestamp with a value, in order to implement a
-/// time-based reconciliation rule: the most recent write wins.
-/// For completeness, the LWW reconciliation rule must also be defined for two LWW CRDTs
-/// with the same timestamp but different values.
-///
-/// In our case, we add the constraint that the value that is wrapped inside the LWW CRDT must
-/// itself be a CRDT: in the case when the timestamp does not allow us to decide on which value to
-/// keep, the merge rule of the inner CRDT is applied on the wrapped values. (Note that all types
-/// that implement the `Ord` trait get a default CRDT implemetnation that keeps the maximum value.
-/// This enables us to use LWW directly with primitive data types such as numbers or strings. It is
-/// generally desirable in this case to never explicitly produce LWW values with the same timestamp
-/// but different inner values, as the rule to keep the maximum value isn't generally the desired
-/// semantics.)
-///
-/// As multiple computers clocks are always desynchronized,
-/// when operations are close enough, it is equivalent to
-/// take one copy and drop the other one.
-///
-/// Given that clocks are not too desynchronized, this assumption
-/// is enough for most cases, as there is few chance that two humans
-/// coordonate themself faster than the time difference between two NTP servers.
-///
-/// As a more concret example, let's suppose you want to upload a file
-/// with the same key (path) in the same bucket at the very same time.
-/// For each request, the file will be timestamped by the receiving server
-/// and may differ from what you observed with your atomic clock!
-///
-/// This scheme is used by AWS S3 or Soundcloud and often without knowing
-/// in entreprise when reconciliating databases with ad-hoc scripts.
-#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
-pub struct LWW<T> {
- ts: u64,
- v: T,
-}
-
-impl<T> LWW<T>
-where
- T: CRDT,
-{
- /// Creates a new CRDT
- ///
- /// CRDT's internal timestamp is set with current node's clock.
- pub fn new(value: T) -> Self {
- Self {
- ts: now_msec(),
- v: value,
- }
- }
-
- /// Build a new CRDT from a previous non-compatible one
- ///
- /// Compared to new, the CRDT's timestamp is not set to now
- /// but must be set to the previous, non-compatible, CRDT's timestamp.
- pub fn migrate_from_raw(ts: u64, value: T) -> Self {
- Self { ts, v: value }
- }
-
- /// Update the LWW CRDT while keeping some causal ordering.
- ///
- /// The timestamp of the LWW CRDT is updated to be the current node's clock
- /// at time of update, or the previous timestamp + 1 if that's bigger,
- /// so that the new timestamp is always strictly larger than the previous one.
- /// This ensures that merging the update with the old value will result in keeping
- /// the updated value.
- pub fn update(&mut self, new_value: T) {
- self.ts = std::cmp::max(self.ts + 1, now_msec());
- self.v = new_value;
- }
-
- /// Get the CRDT value
- pub fn get(&self) -> &T {
- &self.v
- }
-
- /// Get a mutable reference to the CRDT's value
- ///
- /// This is usefull to mutate the inside value without changing the LWW timestamp.
- /// When such mutation is done, the merge between two LWW values is done using the inner
- /// CRDT's merge operation. This is usefull in the case where the inner CRDT is a large
- /// data type, such as a map, and we only want to change a single item in the map.
- /// To do this, we can produce a "CRDT delta", i.e. a LWW that contains only the modification.
- /// This delta consists in a LWW with the same timestamp, and the map
- /// inside only contains the updated value.
- /// The advantage of such a delta is that it is much smaller than the whole map.
- ///
- /// Avoid using this if the inner data type is a primitive type such as a number or a string,
- /// as you will then rely on the merge function defined on `Ord` types by keeping the maximum
- /// of both values.
- pub fn get_mut(&mut self) -> &mut T {
- &mut self.v
- }
-}
-
-impl<T> CRDT for LWW<T>
-where
- T: Clone + CRDT,
-{
- fn merge(&mut self, other: &Self) {
- if other.ts > self.ts {
- self.ts = other.ts;
- self.v = other.v.clone();
- } else if other.ts == self.ts {
- self.v.merge(&other.v);
- }
- }
-}
-
-/// Boolean, where `true` is an absorbing state
-#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
-pub struct Bool(bool);
-
-impl Bool {
- /// Create a new boolean with the specified value
- pub fn new(b: bool) -> Self {
- Self(b)
- }
- /// Set the boolean to true
- pub fn set(&mut self) {
- self.0 = true;
- }
- /// Get the boolean value
- pub fn get(&self) -> bool {
- self.0
- }
-}
-
-impl CRDT for Bool {
- fn merge(&mut self, other: &Self) {
- self.0 = self.0 || other.0;
- }
-}
-
-/// Last Write Win Map
-///
-/// This types defines a CRDT for a map from keys to values.
-/// The values have an associated timestamp, such that the last written value
-/// takes precedence over previous ones. As for the simpler `LWW` type, the value
-/// type `V` is also required to implement the CRDT trait.
-/// We do not encourage mutating the values associated with a given key
-/// without updating the timestamp, in fact at the moment we do not provide a `.get_mut()`
-/// method that would allow that.
-///
-/// Internally, the map is stored as a vector of keys and values, sorted by ascending key order.
-/// This is why the key type `K` must implement `Ord` (and also to ensure a unique serialization,
-/// such that two values can be compared for equality based on their hashes). As a consequence,
-/// insertions take `O(n)` time. This means that LWWMap should be used for reasonably small maps.
-/// However, note that even if we were using a more efficient data structure such as a `BTreeMap`,
-/// the serialization cost `O(n)` would still have to be paid at each modification, so we are
-/// actually not losing anything here.
-#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
-pub struct LWWMap<K, V> {
- vals: Vec<(K, u64, V)>,
-}
-
-impl<K, V> LWWMap<K, V>
-where
- K: Ord,
- V: CRDT,
-{
- /// Create a new empty map CRDT
- pub fn new() -> Self {
- Self { vals: vec![] }
- }
- /// Used to migrate from a map defined in an incompatible format. This produces
- /// a map that contains a single item with the specified timestamp (copied from
- /// the incompatible format). Do this as many times as you have items to migrate,
- /// and put them all together using the CRDT merge operator.
- pub fn migrate_from_raw_item(k: K, ts: u64, v: V) -> Self {
- Self {
- vals: vec![(k, ts, v)],
- }
- }
- /// Returns a map that contains a single mapping from the specified key to the specified value.
- /// This map is a mutator, or a delta-CRDT, such that when it is merged with the original map,
- /// the previous value will be replaced with the one specified here.
- /// The timestamp in the provided mutator is set to the maximum of the current system's clock
- /// and 1 + the previous value's timestamp (if there is one), so that the new value will always
- /// take precedence (LWW rule).
- ///
- /// Typically, to update the value associated to a key in the map, you would do the following:
- ///
- /// ```ignore
- /// let my_update = my_crdt.update_mutator(key_to_modify, new_value);
- /// my_crdt.merge(&my_update);
- /// ```
- ///
- /// However extracting the mutator on its own and only sending that on the network is very
- /// interesting as it is much smaller than the whole map.
- pub fn update_mutator(&self, k: K, new_v: V) -> Self {
- let new_vals = match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) {
- Ok(i) => {
- let (_, old_ts, _) = self.vals[i];
- let new_ts = std::cmp::max(old_ts + 1, now_msec());
- vec![(k, new_ts, new_v)]
- }
- Err(_) => vec![(k, now_msec(), new_v)],
- };
- Self { vals: new_vals }
- }
- /// Takes all of the values of the map and returns them. The current map is reset to the
- /// empty map. This is very usefull to produce in-place a new map that contains only a delta
- /// that modifies a certain value:
- ///
- /// ```ignore
- /// let mut a = get_my_crdt_value();
- /// let old_a = a.take_and_clear();
- /// a.merge(&old_a.update_mutator(key_to_modify, new_value));
- /// put_my_crdt_value(a);
- /// ```
- ///
- /// Of course in this simple example we could have written simply
- /// `pyt_my_crdt_value(a.update_mutator(key_to_modify, new_value))`,
- /// but in the case where the map is a field in a struct for instance (as is always the case),
- /// this becomes very handy:
- ///
- /// ```ignore
- /// let mut a = get_my_crdt_value();
- /// let old_a_map = a.map_field.take_and_clear();
- /// a.map_field.merge(&old_a_map.update_mutator(key_to_modify, new_value));
- /// put_my_crdt_value(a);
- /// ```
- pub fn take_and_clear(&mut self) -> Self {
- let vals = std::mem::replace(&mut self.vals, vec![]);
- Self { vals }
- }
- /// Removes all values from the map
- pub fn clear(&mut self) {
- self.vals.clear();
- }
- /// Get a reference to the value assigned to a key
- pub fn get(&self, k: &K) -> Option<&V> {
- match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) {
- Ok(i) => Some(&self.vals[i].2),
- Err(_) => None,
- }
- }
- /// Gets a reference to all of the items, as a slice. Usefull to iterate on all map values.
- /// In most case you will want to ignore the timestamp (second item of the tuple).
- pub fn items(&self) -> &[(K, u64, V)] {
- &self.vals[..]
- }
-}
-
-impl<K, V> CRDT for LWWMap<K, V>
-where
- K: Clone + Ord,
- V: Clone + CRDT,
-{
- fn merge(&mut self, other: &Self) {
- for (k, ts2, v2) in other.vals.iter() {
- match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) {
- Ok(i) => {
- let (_, ts1, _v1) = &self.vals[i];
- if ts2 > ts1 {
- self.vals[i].1 = *ts2;
- self.vals[i].2 = v2.clone();
- } else if ts1 == ts2 {
- self.vals[i].2.merge(&v2);
- }
- }
- Err(i) => {
- self.vals.insert(i, (k.clone(), *ts2, v2.clone()));
- }
- }
- }
- }
-}
diff --git a/src/table/crdt/bool.rs b/src/table/crdt/bool.rs
new file mode 100644
index 00000000..1989c92e
--- /dev/null
+++ b/src/table/crdt/bool.rs
@@ -0,0 +1,34 @@
+use serde::{Deserialize, Serialize};
+
+use crate::crdt::crdt::*;
+
+/// Boolean, where `true` is an absorbing state
+#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
+pub struct Bool(bool);
+
+impl Bool {
+ /// Create a new boolean with the specified value
+ pub fn new(b: bool) -> Self {
+ Self(b)
+ }
+ /// Set the boolean to true
+ pub fn set(&mut self) {
+ self.0 = true;
+ }
+ /// Get the boolean value
+ pub fn get(&self) -> bool {
+ self.0
+ }
+}
+
+impl From<bool> for Bool {
+ fn from(b: bool) -> Bool {
+ Bool::new(b)
+ }
+}
+
+impl CRDT for Bool {
+ fn merge(&mut self, other: &Self) {
+ self.0 = self.0 || other.0;
+ }
+}
diff --git a/src/table/crdt/crdt.rs b/src/table/crdt/crdt.rs
new file mode 100644
index 00000000..636b6df6
--- /dev/null
+++ b/src/table/crdt/crdt.rs
@@ -0,0 +1,73 @@
+use garage_util::data::*;
+
+/// Definition of a CRDT - all CRDT Rust types implement this.
+///
+/// A CRDT is defined as a merge operator that respects a certain set of axioms.
+///
+/// In particular, the merge operator must be commutative, associative,
+/// idempotent, and monotonic.
+/// In other words, if `a`, `b` and `c` are CRDTs, and `⊔` denotes the merge operator,
+/// the following axioms must apply:
+///
+/// ```text
+/// a ⊔ b = b ⊔ a (commutativity)
+/// (a ⊔ b) ⊔ c = a ⊔ (b ⊔ c) (associativity)
+/// (a ⊔ b) ⊔ b = a ⊔ b (idempotence)
+/// ```
+///
+/// Moreover, the relationship `≥` defined by `a ≥ b ⇔ ∃c. a = b ⊔ c` must be a partial order.
+/// This implies a few properties such as: if `a ⊔ b ≠ a`, then there is no `c` such that `(a ⊔ b) ⊔ c = a`,
+/// as this would imply a cycle in the partial order.
+pub trait CRDT {
+ /// Merge the two datastructures according to the CRDT rules.
+ /// `self` is modified to contain the merged CRDT value. `other` is not modified.
+ ///
+ /// # Arguments
+ ///
+ /// * `other` - the other CRDT we wish to merge with
+ fn merge(&mut self, other: &Self);
+}
+
+/// All types that implement `Ord` (a total order) can also implement a trivial CRDT
+/// defined by the merge rule: `a ⊔ b = max(a, b)`. Implement this trait for your type
+/// to enable this behavior.
+pub trait AutoCRDT: Ord + Clone + std::fmt::Debug {
+ /// WARN_IF_DIFFERENT: emit a warning when values differ. Set this to true if
+ /// different values in your application should never happen. Set this to false
+ /// if you are actually relying on the semantics of `a ⊔ b = max(a, b)`.
+ const WARN_IF_DIFFERENT: bool;
+}
+
+impl<T> CRDT for T
+where
+ T: AutoCRDT,
+{
+ fn merge(&mut self, other: &Self) {
+ if Self::WARN_IF_DIFFERENT && self != other {
+ warn!(
+ "Different CRDT values should be the same (logic error!): {:?} vs {:?}",
+ self, other
+ );
+ if other > self {
+ *self = other.clone();
+ }
+ warn!("Making an arbitrary choice: {:?}", self);
+ } else {
+ if other > self {
+ *self = other.clone();
+ }
+ }
+ }
+}
+
+impl AutoCRDT for String {
+ const WARN_IF_DIFFERENT: bool = true;
+}
+
+impl AutoCRDT for bool {
+ const WARN_IF_DIFFERENT: bool = true;
+}
+
+impl AutoCRDT for FixedBytes32 {
+ const WARN_IF_DIFFERENT: bool = true;
+}
diff --git a/src/table/crdt/lww.rs b/src/table/crdt/lww.rs
new file mode 100644
index 00000000..25ecdb07
--- /dev/null
+++ b/src/table/crdt/lww.rs
@@ -0,0 +1,114 @@
+use serde::{Deserialize, Serialize};
+
+use garage_util::time::now_msec;
+
+use crate::crdt::crdt::*;
+
+/// Last Write Win (LWW)
+///
+/// An LWW CRDT associates a timestamp with a value, in order to implement a
+/// time-based reconciliation rule: the most recent write wins.
+/// For completeness, the LWW reconciliation rule must also be defined for two LWW CRDTs
+/// with the same timestamp but different values.
+///
+/// In our case, we add the constraint that the value that is wrapped inside the LWW CRDT must
+/// itself be a CRDT: in the case when the timestamp does not allow us to decide on which value to
+/// keep, the merge rule of the inner CRDT is applied on the wrapped values. (Note that all types
+/// that implement the `Ord` trait get a default CRDT implemetnation that keeps the maximum value.
+/// This enables us to use LWW directly with primitive data types such as numbers or strings. It is
+/// generally desirable in this case to never explicitly produce LWW values with the same timestamp
+/// but different inner values, as the rule to keep the maximum value isn't generally the desired
+/// semantics.)
+///
+/// As multiple computers clocks are always desynchronized,
+/// when operations are close enough, it is equivalent to
+/// take one copy and drop the other one.
+///
+/// Given that clocks are not too desynchronized, this assumption
+/// is enough for most cases, as there is few chance that two humans
+/// coordonate themself faster than the time difference between two NTP servers.
+///
+/// As a more concret example, let's suppose you want to upload a file
+/// with the same key (path) in the same bucket at the very same time.
+/// For each request, the file will be timestamped by the receiving server
+/// and may differ from what you observed with your atomic clock!
+///
+/// This scheme is used by AWS S3 or Soundcloud and often without knowing
+/// in entreprise when reconciliating databases with ad-hoc scripts.
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
+pub struct LWW<T> {
+ ts: u64,
+ v: T,
+}
+
+impl<T> LWW<T>
+where
+ T: CRDT,
+{
+ /// Creates a new CRDT
+ ///
+ /// CRDT's internal timestamp is set with current node's clock.
+ pub fn new(value: T) -> Self {
+ Self {
+ ts: now_msec(),
+ v: value,
+ }
+ }
+
+ /// Build a new CRDT from a previous non-compatible one
+ ///
+ /// Compared to new, the CRDT's timestamp is not set to now
+ /// but must be set to the previous, non-compatible, CRDT's timestamp.
+ pub fn migrate_from_raw(ts: u64, value: T) -> Self {
+ Self { ts, v: value }
+ }
+
+ /// Update the LWW CRDT while keeping some causal ordering.
+ ///
+ /// The timestamp of the LWW CRDT is updated to be the current node's clock
+ /// at time of update, or the previous timestamp + 1 if that's bigger,
+ /// so that the new timestamp is always strictly larger than the previous one.
+ /// This ensures that merging the update with the old value will result in keeping
+ /// the updated value.
+ pub fn update(&mut self, new_value: T) {
+ self.ts = std::cmp::max(self.ts + 1, now_msec());
+ self.v = new_value;
+ }
+
+ /// Get the CRDT value
+ pub fn get(&self) -> &T {
+ &self.v
+ }
+
+ /// Get a mutable reference to the CRDT's value
+ ///
+ /// This is usefull to mutate the inside value without changing the LWW timestamp.
+ /// When such mutation is done, the merge between two LWW values is done using the inner
+ /// CRDT's merge operation. This is usefull in the case where the inner CRDT is a large
+ /// data type, such as a map, and we only want to change a single item in the map.
+ /// To do this, we can produce a "CRDT delta", i.e. a LWW that contains only the modification.
+ /// This delta consists in a LWW with the same timestamp, and the map
+ /// inside only contains the updated value.
+ /// The advantage of such a delta is that it is much smaller than the whole map.
+ ///
+ /// Avoid using this if the inner data type is a primitive type such as a number or a string,
+ /// as you will then rely on the merge function defined on `Ord` types by keeping the maximum
+ /// of both values.
+ pub fn get_mut(&mut self) -> &mut T {
+ &mut self.v
+ }
+}
+
+impl<T> CRDT for LWW<T>
+where
+ T: Clone + CRDT,
+{
+ fn merge(&mut self, other: &Self) {
+ if other.ts > self.ts {
+ self.ts = other.ts;
+ self.v = other.v.clone();
+ } else if other.ts == self.ts {
+ self.v.merge(&other.v);
+ }
+ }
+}
diff --git a/src/table/crdt/lww_map.rs b/src/table/crdt/lww_map.rs
new file mode 100644
index 00000000..7b372191
--- /dev/null
+++ b/src/table/crdt/lww_map.rs
@@ -0,0 +1,145 @@
+use serde::{Deserialize, Serialize};
+
+use garage_util::time::now_msec;
+
+use crate::crdt::crdt::*;
+
+/// Last Write Win Map
+///
+/// This types defines a CRDT for a map from keys to values.
+/// The values have an associated timestamp, such that the last written value
+/// takes precedence over previous ones. As for the simpler `LWW` type, the value
+/// type `V` is also required to implement the CRDT trait.
+/// We do not encourage mutating the values associated with a given key
+/// without updating the timestamp, in fact at the moment we do not provide a `.get_mut()`
+/// method that would allow that.
+///
+/// Internally, the map is stored as a vector of keys and values, sorted by ascending key order.
+/// This is why the key type `K` must implement `Ord` (and also to ensure a unique serialization,
+/// such that two values can be compared for equality based on their hashes). As a consequence,
+/// insertions take `O(n)` time. This means that LWWMap should be used for reasonably small maps.
+/// However, note that even if we were using a more efficient data structure such as a `BTreeMap`,
+/// the serialization cost `O(n)` would still have to be paid at each modification, so we are
+/// actually not losing anything here.
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
+pub struct LWWMap<K, V> {
+ vals: Vec<(K, u64, V)>,
+}
+
+impl<K, V> LWWMap<K, V>
+where
+ K: Ord,
+ V: CRDT,
+{
+ /// Create a new empty map CRDT
+ pub fn new() -> Self {
+ Self { vals: vec![] }
+ }
+ /// Used to migrate from a map defined in an incompatible format. This produces
+ /// a map that contains a single item with the specified timestamp (copied from
+ /// the incompatible format). Do this as many times as you have items to migrate,
+ /// and put them all together using the CRDT merge operator.
+ pub fn migrate_from_raw_item(k: K, ts: u64, v: V) -> Self {
+ Self {
+ vals: vec![(k, ts, v)],
+ }
+ }
+ /// Returns a map that contains a single mapping from the specified key to the specified value.
+ /// This map is a mutator, or a delta-CRDT, such that when it is merged with the original map,
+ /// the previous value will be replaced with the one specified here.
+ /// The timestamp in the provided mutator is set to the maximum of the current system's clock
+ /// and 1 + the previous value's timestamp (if there is one), so that the new value will always
+ /// take precedence (LWW rule).
+ ///
+ /// Typically, to update the value associated to a key in the map, you would do the following:
+ ///
+ /// ```ignore
+ /// let my_update = my_crdt.update_mutator(key_to_modify, new_value);
+ /// my_crdt.merge(&my_update);
+ /// ```
+ ///
+ /// However extracting the mutator on its own and only sending that on the network is very
+ /// interesting as it is much smaller than the whole map.
+ pub fn update_mutator(&self, k: K, new_v: V) -> Self {
+ let new_vals = match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) {
+ Ok(i) => {
+ let (_, old_ts, _) = self.vals[i];
+ let new_ts = std::cmp::max(old_ts + 1, now_msec());
+ vec![(k, new_ts, new_v)]
+ }
+ Err(_) => vec![(k, now_msec(), new_v)],
+ };
+ Self { vals: new_vals }
+ }
+ /// Takes all of the values of the map and returns them. The current map is reset to the
+ /// empty map. This is very usefull to produce in-place a new map that contains only a delta
+ /// that modifies a certain value:
+ ///
+ /// ```ignore
+ /// let mut a = get_my_crdt_value();
+ /// let old_a = a.take_and_clear();
+ /// a.merge(&old_a.update_mutator(key_to_modify, new_value));
+ /// put_my_crdt_value(a);
+ /// ```
+ ///
+ /// Of course in this simple example we could have written simply
+ /// `pyt_my_crdt_value(a.update_mutator(key_to_modify, new_value))`,
+ /// but in the case where the map is a field in a struct for instance (as is always the case),
+ /// this becomes very handy:
+ ///
+ /// ```ignore
+ /// let mut a = get_my_crdt_value();
+ /// let old_a_map = a.map_field.take_and_clear();
+ /// a.map_field.merge(&old_a_map.update_mutator(key_to_modify, new_value));
+ /// put_my_crdt_value(a);
+ /// ```
+ pub fn take_and_clear(&mut self) -> Self {
+ let vals = std::mem::replace(&mut self.vals, vec![]);
+ Self { vals }
+ }
+ /// Removes all values from the map
+ pub fn clear(&mut self) {
+ self.vals.clear();
+ }
+ /// Get a reference to the value assigned to a key
+ pub fn get(&self, k: &K) -> Option<&V> {
+ match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) {
+ Ok(i) => Some(&self.vals[i].2),
+ Err(_) => None,
+ }
+ }
+ /// Gets a reference to all of the items, as a slice. Usefull to iterate on all map values.
+ /// In most case you will want to ignore the timestamp (second item of the tuple).
+ pub fn items(&self) -> &[(K, u64, V)] {
+ &self.vals[..]
+ }
+ /// Returns the number of items in the map
+ pub fn len(&self) -> usize {
+ self.vals.len()
+ }
+}
+
+impl<K, V> CRDT for LWWMap<K, V>
+where
+ K: Clone + Ord,
+ V: Clone + CRDT,
+{
+ fn merge(&mut self, other: &Self) {
+ for (k, ts2, v2) in other.vals.iter() {
+ match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) {
+ Ok(i) => {
+ let (_, ts1, _v1) = &self.vals[i];
+ if ts2 > ts1 {
+ self.vals[i].1 = *ts2;
+ self.vals[i].2 = v2.clone();
+ } else if ts1 == ts2 {
+ self.vals[i].2.merge(&v2);
+ }
+ }
+ Err(i) => {
+ self.vals.insert(i, (k.clone(), *ts2, v2.clone()));
+ }
+ }
+ }
+ }
+}
diff --git a/src/table/crdt/map.rs b/src/table/crdt/map.rs
new file mode 100644
index 00000000..1193e6db
--- /dev/null
+++ b/src/table/crdt/map.rs
@@ -0,0 +1,83 @@
+use serde::{Deserialize, Serialize};
+
+use crate::crdt::crdt::*;
+
+/// Simple CRDT Map
+///
+/// This types defines a CRDT for a map from keys to values. Values are CRDT types which
+/// can have their own updating logic.
+///
+/// Internally, the map is stored as a vector of keys and values, sorted by ascending key order.
+/// This is why the key type `K` must implement `Ord` (and also to ensure a unique serialization,
+/// such that two values can be compared for equality based on their hashes). As a consequence,
+/// insertions take `O(n)` time. This means that Map should be used for reasonably small maps.
+/// However, note that even if we were using a more efficient data structure such as a `BTreeMap`,
+/// the serialization cost `O(n)` would still have to be paid at each modification, so we are
+/// actually not losing anything here.
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
+pub struct Map<K, V> {
+ vals: Vec<(K, V)>,
+}
+
+impl<K, V> Map<K, V>
+where
+ K: Clone + Ord,
+ V: Clone + CRDT,
+{
+ /// Create a new empty map CRDT
+ pub fn new() -> Self {
+ Self { vals: vec![] }
+ }
+
+ /// Returns a map that contains a single mapping from the specified key to the specified value.
+ /// This can be used to build a delta-mutator:
+ /// when merged with another map, the value will be added or CRDT-merged if a previous
+ /// value already exists.
+ pub fn put_mutator(k: K, v: V) -> Self {
+ Self { vals: vec![(k, v)] }
+ }
+
+ pub fn put(&mut self, k: K, v: V) {
+ self.merge(&Self::put_mutator(k, v));
+ }
+
+ /// Removes all values from the map
+ pub fn clear(&mut self) {
+ self.vals.clear();
+ }
+
+ /// Get a reference to the value assigned to a key
+ pub fn get(&self, k: &K) -> Option<&V> {
+ match self.vals.binary_search_by(|(k2, _)| k2.cmp(&k)) {
+ Ok(i) => Some(&self.vals[i].1),
+ Err(_) => None,
+ }
+ }
+ /// Gets a reference to all of the items, as a slice. Usefull to iterate on all map values.
+ pub fn items(&self) -> &[(K, V)] {
+ &self.vals[..]
+ }
+ /// Returns the number of items in the map
+ pub fn len(&self) -> usize {
+ self.vals.len()
+ }
+}
+
+impl<K, V> CRDT for Map<K, V>
+where
+ K: Clone + Ord,
+ V: Clone + CRDT,
+{
+ fn merge(&mut self, other: &Self) {
+ for (k, v2) in other.vals.iter() {
+ match self.vals.binary_search_by(|(k2, _)| k2.cmp(&k)) {
+ Ok(i) => {
+ self.vals[i].1.merge(&v2);
+ }
+ Err(i) => {
+ self.vals.insert(i, (k.clone(), v2.clone()));
+ }
+ }
+ }
+ }
+}
diff --git a/src/table/crdt/mod.rs b/src/table/crdt/mod.rs
new file mode 100644
index 00000000..eb75d061
--- /dev/null
+++ b/src/table/crdt/mod.rs
@@ -0,0 +1,22 @@
+//! This package provides a simple implementation of conflict-free replicated data types (CRDTs)
+//!
+//! CRDTs are a type of data structures that do not require coordination. In other words, we can
+//! edit them in parallel, we will always find a way to merge it.
+//!
+//! A general example is a counter. Its initial value is 0. Alice and Bob get a copy of the
+//! counter. Alice does +1 on her copy, she reads 1. Bob does +3 on his copy, he reads 3. Now,
+//! it is easy to merge their counters, order does not count: we always get 4.
+//!
+//! Learn more about CRDT [on Wikipedia](https://en.wikipedia.org/wiki/Conflict-free_replicated_data_type)
+
+mod bool;
+mod crdt;
+mod lww;
+mod lww_map;
+mod map;
+
+pub use self::bool::*;
+pub use crdt::*;
+pub use lww::*;
+pub use lww_map::*;
+pub use map::*;
diff --git a/src/table/data.rs b/src/table/data.rs
new file mode 100644
index 00000000..e07a21d2
--- /dev/null
+++ b/src/table/data.rs
@@ -0,0 +1,254 @@
+use core::borrow::Borrow;
+use std::sync::Arc;
+
+use log::warn;
+use serde_bytes::ByteBuf;
+use sled::Transactional;
+use tokio::sync::Notify;
+
+use garage_util::data::*;
+use garage_util::error::*;
+
+use garage_rpc::membership::System;
+
+use crate::crdt::CRDT;
+use crate::replication::*;
+use crate::schema::*;
+
+pub struct TableData<F: TableSchema, R: TableReplication> {
+ system: Arc<System>,
+
+ pub name: String,
+ pub(crate) instance: F,
+ pub(crate) replication: R,
+
+ pub store: sled::Tree,
+
+ pub(crate) merkle_tree: sled::Tree,
+ pub(crate) merkle_todo: sled::Tree,
+ pub(crate) merkle_todo_notify: Notify,
+ pub(crate) gc_todo: sled::Tree,
+}
+
+impl<F, R> TableData<F, R>
+where
+ F: TableSchema,
+ R: TableReplication,
+{
+ pub fn new(system: Arc<System>, name: String, instance: F, replication: R, db: &sled::Db) -> Arc<Self> {
+ let store = db
+ .open_tree(&format!("{}:table", name))
+ .expect("Unable to open DB tree");
+
+ let merkle_tree = db
+ .open_tree(&format!("{}:merkle_tree", name))
+ .expect("Unable to open DB Merkle tree tree");
+ let merkle_todo = db
+ .open_tree(&format!("{}:merkle_todo", name))
+ .expect("Unable to open DB Merkle TODO tree");
+
+ let gc_todo = db
+ .open_tree(&format!("{}:gc_todo", name))
+ .expect("Unable to open DB tree");
+
+ Arc::new(Self {
+ system,
+ name,
+ instance,
+ replication,
+ store,
+ merkle_tree,
+ merkle_todo,
+ merkle_todo_notify: Notify::new(),
+ gc_todo,
+ })
+ }
+
+ // Read functions
+
+ pub fn read_entry(&self, p: &F::P, s: &F::S) -> Result<Option<ByteBuf>, Error> {
+ let tree_key = self.tree_key(p, s);
+ if let Some(bytes) = self.store.get(&tree_key)? {
+ Ok(Some(ByteBuf::from(bytes.to_vec())))
+ } else {
+ Ok(None)
+ }
+ }
+
+ pub fn read_range(
+ &self,
+ p: &F::P,
+ s: &Option<F::S>,
+ filter: &Option<F::Filter>,
+ limit: usize,
+ ) -> Result<Vec<Arc<ByteBuf>>, Error> {
+ let partition_hash = p.hash();
+ let first_key = match s {
+ None => partition_hash.to_vec(),
+ Some(sk) => self.tree_key(p, sk),
+ };
+ let mut ret = vec![];
+ for item in self.store.range(first_key..) {
+ let (key, value) = item?;
+ if &key[..32] != partition_hash.as_slice() {
+ break;
+ }
+ let keep = match filter {
+ None => true,
+ Some(f) => {
+ let entry = self.decode_entry(value.as_ref())?;
+ F::matches_filter(&entry, f)
+ }
+ };
+ if keep {
+ ret.push(Arc::new(ByteBuf::from(value.as_ref())));
+ }
+ if ret.len() >= limit {
+ break;
+ }
+ }
+ Ok(ret)
+ }
+
+ // Mutation functions
+ // When changing this code, take care of propagating modifications correctly:
+ // - When an entry is modified or deleted, call the updated() function
+ // on the table instance
+ // - When an entry is modified or deleted, add it to the merkle updater's todo list.
+ // This has to be done atomically with the modification for the merkle updater
+ // to maintain consistency. The merkle updater must then be notified with todo_notify.
+ // - When an entry is updated to be a tombstone, add it to the gc_todo tree
+
+ pub(crate) fn update_many<T: Borrow<ByteBuf>>(&self, entries: &[T]) -> Result<(), Error> {
+ for update_bytes in entries.iter() {
+ self.update_entry(update_bytes.borrow().as_slice())?;
+ }
+ Ok(())
+ }
+
+ pub(crate) fn update_entry(&self, update_bytes: &[u8]) -> Result<(), Error> {
+ let update = self.decode_entry(update_bytes)?;
+ let tree_key = self.tree_key(update.partition_key(), update.sort_key());
+
+ let changed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| {
+ let (old_entry, new_entry) = match store.get(&tree_key)? {
+ Some(prev_bytes) => {
+ let old_entry = self
+ .decode_entry(&prev_bytes)
+ .map_err(sled::transaction::ConflictableTransactionError::Abort)?;
+ let mut new_entry = old_entry.clone();
+ new_entry.merge(&update);
+ (Some(old_entry), new_entry)
+ }
+ None => (None, update.clone()),
+ };
+
+ if Some(&new_entry) != old_entry.as_ref() {
+ let new_bytes = rmp_to_vec_all_named(&new_entry)
+ .map_err(Error::RMPEncode)
+ .map_err(sled::transaction::ConflictableTransactionError::Abort)?;
+ let new_bytes_hash = blake2sum(&new_bytes[..]);
+ mkl_todo.insert(tree_key.clone(), new_bytes_hash.as_slice())?;
+ store.insert(tree_key.clone(), new_bytes)?;
+ Ok(Some((old_entry, new_entry, new_bytes_hash)))
+ } else {
+ Ok(None)
+ }
+ })?;
+
+ if let Some((old_entry, new_entry, new_bytes_hash)) = changed {
+ let is_tombstone = new_entry.is_tombstone();
+ self.instance.updated(old_entry, Some(new_entry));
+ self.merkle_todo_notify.notify_one();
+ if is_tombstone {
+ // We are only responsible for GC'ing this item if we are the
+ // "leader" of the partition, i.e. the first node in the
+ // set of nodes that replicates this partition.
+ // This avoids GC loops and does not change the termination properties
+ // of the GC algorithm, as in all cases GC is suspended if
+ // any node of the partition is unavailable.
+ let pk_hash = Hash::try_from(&tree_key[..32]).unwrap();
+ let nodes = self.replication.write_nodes(&pk_hash);
+ if nodes.first() == Some(&self.system.id) {
+ self.gc_todo.insert(&tree_key, new_bytes_hash.as_slice())?;
+ }
+ }
+ }
+
+ Ok(())
+ }
+
+ pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> {
+ let removed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| {
+ if let Some(cur_v) = store.get(k)? {
+ if cur_v == v {
+ store.remove(k)?;
+ mkl_todo.insert(k, vec![])?;
+ return Ok(true);
+ }
+ }
+ Ok(false)
+ })?;
+
+ if removed {
+ let old_entry = self.decode_entry(v)?;
+ self.instance.updated(Some(old_entry), None);
+ self.merkle_todo_notify.notify_one();
+ }
+ Ok(removed)
+ }
+
+ pub(crate) fn delete_if_equal_hash(
+ self: &Arc<Self>,
+ k: &[u8],
+ vhash: Hash,
+ ) -> Result<bool, Error> {
+ let removed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| {
+ if let Some(cur_v) = store.get(k)? {
+ if blake2sum(&cur_v[..]) == vhash {
+ store.remove(k)?;
+ mkl_todo.insert(k, vec![])?;
+ return Ok(Some(cur_v));
+ }
+ }
+ Ok(None)
+ })?;
+
+ if let Some(old_v) = removed {
+ let old_entry = self.decode_entry(&old_v[..])?;
+ self.instance.updated(Some(old_entry), None);
+ self.merkle_todo_notify.notify_one();
+ Ok(true)
+ } else {
+ Ok(false)
+ }
+ }
+
+ // ---- Utility functions ----
+
+ pub(crate) fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> {
+ let mut ret = p.hash().to_vec();
+ ret.extend(s.sort_key());
+ ret
+ }
+
+ pub(crate) fn decode_entry(&self, bytes: &[u8]) -> Result<F::E, Error> {
+ match rmp_serde::decode::from_read_ref::<_, F::E>(bytes) {
+ Ok(x) => Ok(x),
+ Err(e) => match F::try_migrate(bytes) {
+ Some(x) => Ok(x),
+ None => {
+ warn!("Unable to decode entry of {}: {}", self.name, e);
+ for line in hexdump::hexdump_iter(bytes) {
+ debug!("{}", line);
+ }
+ Err(e.into())
+ }
+ },
+ }
+ }
+
+ pub fn gc_todo_len(&self) -> usize {
+ self.gc_todo.len()
+ }
+}
diff --git a/src/table/gc.rs b/src/table/gc.rs
new file mode 100644
index 00000000..a37c052f
--- /dev/null
+++ b/src/table/gc.rs
@@ -0,0 +1,248 @@
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::Duration;
+
+use serde::{Deserialize, Serialize};
+use serde_bytes::ByteBuf;
+
+use futures::future::join_all;
+use futures::select;
+use futures_util::future::*;
+use tokio::sync::watch;
+
+use garage_util::data::*;
+use garage_util::error::Error;
+
+use garage_rpc::membership::System;
+use garage_rpc::rpc_client::*;
+use garage_rpc::rpc_server::*;
+
+use crate::data::*;
+use crate::replication::*;
+use crate::schema::*;
+
+const TABLE_GC_BATCH_SIZE: usize = 1024;
+const TABLE_GC_RPC_TIMEOUT: Duration = Duration::from_secs(30);
+
+pub struct TableGC<F: TableSchema, R: TableReplication> {
+ system: Arc<System>,
+ data: Arc<TableData<F, R>>,
+
+ rpc_client: Arc<RpcClient<GcRPC>>,
+}
+
+#[derive(Serialize, Deserialize)]
+enum GcRPC {
+ Update(Vec<ByteBuf>),
+ DeleteIfEqualHash(Vec<(ByteBuf, Hash)>),
+ Ok,
+}
+
+impl RpcMessage for GcRPC {}
+
+impl<F, R> TableGC<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
+ pub(crate) fn launch(
+ system: Arc<System>,
+ data: Arc<TableData<F, R>>,
+ rpc_server: &mut RpcServer,
+ ) -> Arc<Self> {
+ let rpc_path = format!("table_{}/gc", data.name);
+ let rpc_client = system.rpc_client::<GcRPC>(&rpc_path);
+
+ let gc = Arc::new(Self {
+ system: system.clone(),
+ data: data.clone(),
+ rpc_client,
+ });
+
+ gc.register_handler(rpc_server, rpc_path);
+
+ let gc1 = gc.clone();
+ system.background.spawn_worker(
+ format!("GC loop for {}", data.name),
+ move |must_exit: watch::Receiver<bool>| gc1.gc_loop(must_exit),
+ );
+
+ gc
+ }
+
+ async fn gc_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) {
+ while !*must_exit.borrow() {
+ match self.gc_loop_iter().await {
+ Ok(true) => {
+ // Stuff was done, loop imediately
+ continue;
+ }
+ Ok(false) => {
+ // Nothing was done, sleep for some time (below)
+ }
+ Err(e) => {
+ warn!("({}) Error doing GC: {}", self.data.name, e);
+ }
+ }
+ select! {
+ _ = tokio::time::sleep(Duration::from_secs(10)).fuse() => (),
+ _ = must_exit.changed().fuse() => (),
+ }
+ }
+ }
+
+ async fn gc_loop_iter(&self) -> Result<bool, Error> {
+ let mut entries = vec![];
+ let mut excluded = vec![];
+
+ for item in self.data.gc_todo.iter() {
+ let (k, vhash) = item?;
+
+ let vhash = Hash::try_from(&vhash[..]).unwrap();
+
+ let v_opt = self
+ .data
+ .store
+ .get(&k[..])?
+ .filter(|v| blake2sum(&v[..]) == vhash);
+
+ if let Some(v) = v_opt {
+ entries.push((ByteBuf::from(k.to_vec()), vhash, ByteBuf::from(v.to_vec())));
+ if entries.len() >= TABLE_GC_BATCH_SIZE {
+ break;
+ }
+ } else {
+ excluded.push((k, vhash));
+ }
+ }
+
+ for (k, vhash) in excluded {
+ self.todo_remove_if_equal(&k[..], vhash)?;
+ }
+
+ if entries.len() == 0 {
+ // Nothing to do in this iteration
+ return Ok(false);
+ }
+
+ debug!("({}) GC: doing {} items", self.data.name, entries.len());
+
+ let mut partitions = HashMap::new();
+ for (k, vhash, v) in entries {
+ let pkh = Hash::try_from(&k[..32]).unwrap();
+ let mut nodes = self.data.replication.write_nodes(&pkh);
+ nodes.retain(|x| *x != self.system.id);
+ nodes.sort();
+
+ if !partitions.contains_key(&nodes) {
+ partitions.insert(nodes.clone(), vec![]);
+ }
+ partitions.get_mut(&nodes).unwrap().push((k, vhash, v));
+ }
+
+ let resps = join_all(
+ partitions
+ .into_iter()
+ .map(|(nodes, items)| self.try_send_and_delete(nodes, items)),
+ )
+ .await;
+
+ let mut errs = vec![];
+ for resp in resps {
+ if let Err(e) = resp {
+ errs.push(e);
+ }
+ }
+
+ if errs.is_empty() {
+ Ok(true)
+ } else {
+ Err(Error::Message(errs.into_iter().map(|x| format!("{}", x)).collect::<Vec<_>>().join(", ")))
+ }
+ }
+
+ async fn try_send_and_delete(
+ &self,
+ nodes: Vec<UUID>,
+ items: Vec<(ByteBuf, Hash, ByteBuf)>,
+ ) -> Result<(), Error> {
+ let n_items = items.len();
+
+ let mut updates = vec![];
+ let mut deletes = vec![];
+ for (k, vhash, v) in items {
+ updates.push(v);
+ deletes.push((k, vhash));
+ }
+
+ self.rpc_client
+ .try_call_many(
+ &nodes[..],
+ GcRPC::Update(updates),
+ RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT),
+ )
+ .await?;
+
+ info!(
+ "({}) GC: {} items successfully pushed, will try to delete.",
+ self.data.name, n_items
+ );
+
+ self.rpc_client
+ .try_call_many(
+ &nodes[..],
+ GcRPC::DeleteIfEqualHash(deletes.clone()),
+ RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT),
+ )
+ .await?;
+
+ for (k, vhash) in deletes {
+ self.data.delete_if_equal_hash(&k[..], vhash)?;
+ self.todo_remove_if_equal(&k[..], vhash)?;
+ }
+
+ Ok(())
+ }
+
+ fn todo_remove_if_equal(&self, key: &[u8], vhash: Hash) -> Result<(), Error> {
+ let _ = self
+ .data
+ .gc_todo
+ .compare_and_swap::<_, _, Vec<u8>>(key, Some(vhash), None)?;
+ Ok(())
+ }
+
+ // ---- RPC HANDLER ----
+
+ fn register_handler(self: &Arc<Self>, rpc_server: &mut RpcServer, path: String) {
+ let self2 = self.clone();
+ rpc_server.add_handler::<GcRPC, _, _>(path, move |msg, _addr| {
+ let self2 = self2.clone();
+ async move { self2.handle_rpc(&msg).await }
+ });
+
+ let self2 = self.clone();
+ self.rpc_client
+ .set_local_handler(self.system.id, move |msg| {
+ let self2 = self2.clone();
+ async move { self2.handle_rpc(&msg).await }
+ });
+ }
+
+ async fn handle_rpc(self: &Arc<Self>, message: &GcRPC) -> Result<GcRPC, Error> {
+ match message {
+ GcRPC::Update(items) => {
+ self.data.update_many(items)?;
+ Ok(GcRPC::Ok)
+ }
+ GcRPC::DeleteIfEqualHash(items) => {
+ for (key, vhash) in items.iter() {
+ self.data.delete_if_equal_hash(&key[..], *vhash)?;
+ self.todo_remove_if_equal(&key[..], *vhash)?;
+ }
+ Ok(GcRPC::Ok)
+ }
+ _ => Err(Error::Message(format!("Unexpected GC RPC"))),
+ }
+ }
+}
diff --git a/src/table/lib.rs b/src/table/lib.rs
index 704f8f1e..3b73163b 100644
--- a/src/table/lib.rs
+++ b/src/table/lib.rs
@@ -7,10 +7,12 @@ pub mod crdt;
pub mod schema;
pub mod util;
+pub mod data;
+pub mod gc;
+pub mod merkle;
+pub mod replication;
+pub mod sync;
pub mod table;
-pub mod table_fullcopy;
-pub mod table_sharded;
-pub mod table_sync;
pub use schema::*;
pub use table::*;
diff --git a/src/table/merkle.rs b/src/table/merkle.rs
new file mode 100644
index 00000000..3001786f
--- /dev/null
+++ b/src/table/merkle.rs
@@ -0,0 +1,454 @@
+use std::sync::Arc;
+use std::time::Duration;
+
+use futures::select;
+use futures_util::future::*;
+use log::{debug, warn};
+use serde::{Deserialize, Serialize};
+use sled::transaction::{
+ ConflictableTransactionError, ConflictableTransactionResult, TransactionalTree,
+};
+use tokio::sync::watch;
+
+use garage_util::background::BackgroundRunner;
+use garage_util::data::*;
+use garage_util::error::Error;
+
+use garage_rpc::ring::*;
+
+use crate::data::*;
+use crate::replication::*;
+use crate::schema::*;
+
+// This modules partitions the data in 2**16 partitions, based on the top
+// 16 bits (two bytes) of item's partition keys' hashes.
+// It builds one Merkle tree for each of these 2**16 partitions.
+
+pub struct MerkleUpdater<F: TableSchema, R: TableReplication> {
+ data: Arc<TableData<F, R>>,
+
+ // Content of the todo tree: items where
+ // - key = the key of an item in the main table, ie hash(partition_key)+sort_key
+ // - value = the hash of the full serialized item, if present,
+ // or an empty vec if item is absent (deleted)
+ // Fields in data:
+ // pub(crate) merkle_todo: sled::Tree,
+ // pub(crate) merkle_todo_notify: Notify,
+
+ // Content of the merkle tree: items where
+ // - key = .bytes() for MerkleNodeKey
+ // - value = serialization of a MerkleNode, assumed to be MerkleNode::empty if not found
+ // Field in data:
+ // pub(crate) merkle_tree: sled::Tree,
+ empty_node_hash: Hash,
+}
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct MerkleNodeKey {
+ // partition number
+ pub partition: Partition,
+
+ // prefix: a prefix for the hash of full keys, i.e. hash(hash(partition_key)+sort_key)
+ #[serde(with = "serde_bytes")]
+ pub prefix: Vec<u8>,
+}
+
+#[derive(PartialEq, Eq, Debug, Serialize, Deserialize)]
+pub enum MerkleNode {
+ // The empty Merkle node
+ Empty,
+
+ // An intermediate Merkle tree node for a prefix
+ // Contains the hashes of the 256 possible next prefixes
+ Intermediate(Vec<(u8, Hash)>),
+
+ // A final node for an item
+ // Contains the full key of the item and the hash of the value
+ Leaf(Vec<u8>, Hash),
+}
+
+impl<F, R> MerkleUpdater<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
+ pub(crate) fn launch(background: &BackgroundRunner, data: Arc<TableData<F, R>>) -> Arc<Self> {
+ let empty_node_hash = blake2sum(&rmp_to_vec_all_named(&MerkleNode::Empty).unwrap()[..]);
+
+ let ret = Arc::new(Self {
+ data,
+ empty_node_hash,
+ });
+
+ let ret2 = ret.clone();
+ background.spawn_worker(
+ format!("Merkle tree updater for {}", ret.data.name),
+ |must_exit: watch::Receiver<bool>| ret2.updater_loop(must_exit),
+ );
+
+ ret
+ }
+
+ async fn updater_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) {
+ while !*must_exit.borrow() {
+ if let Some(x) = self.data.merkle_todo.iter().next() {
+ match x {
+ Ok((key, valhash)) => {
+ if let Err(e) = self.update_item(&key[..], &valhash[..]) {
+ warn!(
+ "({}) Error while updating Merkle tree item: {}",
+ self.data.name, e
+ );
+ }
+ }
+ Err(e) => {
+ warn!(
+ "({}) Error while iterating on Merkle todo tree: {}",
+ self.data.name, e
+ );
+ tokio::time::sleep(Duration::from_secs(10)).await;
+ }
+ }
+ } else {
+ select! {
+ _ = self.data.merkle_todo_notify.notified().fuse() => (),
+ _ = must_exit.changed().fuse() => (),
+ }
+ }
+ }
+ }
+
+ fn update_item(&self, k: &[u8], vhash_by: &[u8]) -> Result<(), Error> {
+ let khash = blake2sum(k);
+
+ let new_vhash = if vhash_by.len() == 0 {
+ None
+ } else {
+ Some(Hash::try_from(&vhash_by[..]).unwrap())
+ };
+
+ let key = MerkleNodeKey {
+ partition: self
+ .data
+ .replication
+ .partition_of(&Hash::try_from(&k[0..32]).unwrap()),
+ prefix: vec![],
+ };
+ self.data
+ .merkle_tree
+ .transaction(|tx| self.update_item_rec(tx, k, &khash, &key, new_vhash))?;
+
+ let deleted = self
+ .data
+ .merkle_todo
+ .compare_and_swap::<_, _, Vec<u8>>(k, Some(vhash_by), None)?
+ .is_ok();
+
+ if !deleted {
+ debug!(
+ "({}) Item not deleted from Merkle todo because it changed: {:?}",
+ self.data.name, k
+ );
+ }
+ Ok(())
+ }
+
+ fn update_item_rec(
+ &self,
+ tx: &TransactionalTree,
+ k: &[u8],
+ khash: &Hash,
+ key: &MerkleNodeKey,
+ new_vhash: Option<Hash>,
+ ) -> ConflictableTransactionResult<Option<Hash>, Error> {
+ let i = key.prefix.len();
+
+ // Read node at current position (defined by the prefix stored in key)
+ // Calculate an update to apply to this node
+ // This update is an Option<_>, so that it is None if the update is a no-op
+ // and we can thus skip recalculating and re-storing everything
+ let mutate = match self.read_node_txn(tx, &key)? {
+ MerkleNode::Empty => {
+ if let Some(vhv) = new_vhash {
+ Some(MerkleNode::Leaf(k.to_vec(), vhv))
+ } else {
+ // Nothing to do, keep empty node
+ None
+ }
+ }
+ MerkleNode::Intermediate(mut children) => {
+ let key2 = key.next_key(khash);
+ if let Some(subhash) = self.update_item_rec(tx, k, khash, &key2, new_vhash)? {
+ // Subtree changed, update this node as well
+ if subhash == self.empty_node_hash {
+ intermediate_rm_child(&mut children, key2.prefix[i]);
+ } else {
+ intermediate_set_child(&mut children, key2.prefix[i], subhash);
+ }
+
+ if children.len() == 0 {
+ // should not happen
+ warn!(
+ "({}) Replacing intermediate node with empty node, should not happen.",
+ self.data.name
+ );
+ Some(MerkleNode::Empty)
+ } else if children.len() == 1 {
+ // We now have a single node (case when the update deleted one of only two
+ // children). If that node is a leaf, move it to this level.
+ let key_sub = key.add_byte(children[0].0);
+ let subnode = self.read_node_txn(tx, &key_sub)?;
+ match subnode {
+ MerkleNode::Empty => {
+ warn!("({}) Single subnode in tree is empty Merkle node", self.data.name);
+ Some(MerkleNode::Empty)
+ }
+ MerkleNode::Intermediate(_) => {
+ Some(MerkleNode::Intermediate(children))
+ }
+ x @ MerkleNode::Leaf(_, _) => {
+ tx.remove(key_sub.encode())?;
+ Some(x)
+ }
+ }
+ } else {
+ Some(MerkleNode::Intermediate(children))
+ }
+ } else {
+ // Subtree not changed, nothing to do
+ None
+ }
+ }
+ MerkleNode::Leaf(exlf_k, exlf_vhash) => {
+ if exlf_k == k {
+ // This leaf is for the same key that the one we are updating
+ match new_vhash {
+ Some(vhv) if vhv == exlf_vhash => None,
+ Some(vhv) => Some(MerkleNode::Leaf(k.to_vec(), vhv)),
+ None => Some(MerkleNode::Empty),
+ }
+ } else {
+ // This is an only leaf for another key
+ if new_vhash.is_some() {
+ // Move that other key to a subnode, create another subnode for our
+ // insertion and replace current node by an intermediary node
+ let mut int = vec![];
+
+ let exlf_khash = blake2sum(&exlf_k[..]);
+ assert_eq!(khash.as_slice()[..i], exlf_khash.as_slice()[..i]);
+
+ {
+ let exlf_subkey = key.next_key(&exlf_khash);
+ let exlf_sub_hash = self.update_item_rec(tx, &exlf_k[..], &exlf_khash, &exlf_subkey, Some(exlf_vhash))?.unwrap();
+ intermediate_set_child(&mut int, exlf_subkey.prefix[i], exlf_sub_hash);
+ assert_eq!(int.len(), 1);
+ }
+
+ {
+ let key2 = key.next_key(khash);
+ let subhash = self.update_item_rec(tx, k, khash, &key2, new_vhash)?.unwrap();
+ intermediate_set_child(&mut int, key2.prefix[i], subhash);
+ if exlf_khash.as_slice()[i] == khash.as_slice()[i] {
+ assert_eq!(int.len(), 1);
+ } else {
+ assert_eq!(int.len(), 2);
+ }
+ }
+ Some(MerkleNode::Intermediate(int))
+ } else {
+ // Nothing to do, we don't want to insert this value because it is None,
+ // and we don't want to change the other value because it's for something
+ // else
+ None
+ }
+ }
+ }
+ };
+
+ if let Some(new_node) = mutate {
+ let hash = self.put_node_txn(tx, &key, &new_node)?;
+ Ok(Some(hash))
+ } else {
+ Ok(None)
+ }
+ }
+
+ // Merkle tree node manipulation
+
+ fn read_node_txn(
+ &self,
+ tx: &TransactionalTree,
+ k: &MerkleNodeKey,
+ ) -> ConflictableTransactionResult<MerkleNode, Error> {
+ let ent = tx.get(k.encode())?;
+ MerkleNode::decode_opt(ent).map_err(ConflictableTransactionError::Abort)
+ }
+
+ fn put_node_txn(
+ &self,
+ tx: &TransactionalTree,
+ k: &MerkleNodeKey,
+ v: &MerkleNode,
+ ) -> ConflictableTransactionResult<Hash, Error> {
+ trace!("Put Merkle node: {:?} => {:?}", k, v);
+ if *v == MerkleNode::Empty {
+ tx.remove(k.encode())?;
+ Ok(self.empty_node_hash)
+ } else {
+ let vby = rmp_to_vec_all_named(v)
+ .map_err(|e| ConflictableTransactionError::Abort(e.into()))?;
+ let rethash = blake2sum(&vby[..]);
+ tx.insert(k.encode(), vby)?;
+ Ok(rethash)
+ }
+ }
+
+ // Access a node in the Merkle tree, used by the sync protocol
+ pub(crate) fn read_node(&self, k: &MerkleNodeKey) -> Result<MerkleNode, Error> {
+ let ent = self.data.merkle_tree.get(k.encode())?;
+ MerkleNode::decode_opt(ent)
+ }
+
+ pub fn merkle_tree_len(&self) -> usize {
+ self.data.merkle_tree.len()
+ }
+
+ pub fn todo_len(&self) -> usize {
+ self.data.merkle_todo.len()
+ }
+}
+
+impl MerkleNodeKey {
+ fn encode(&self) -> Vec<u8> {
+ let mut ret = Vec::with_capacity(2 + self.prefix.len());
+ ret.extend(&u16::to_be_bytes(self.partition)[..]);
+ ret.extend(&self.prefix[..]);
+ ret
+ }
+
+ pub fn next_key(&self, h: &Hash) -> Self {
+ assert_eq!(h.as_slice()[0..self.prefix.len()], self.prefix[..]);
+ let mut s2 = self.clone();
+ s2.prefix.push(h.as_slice()[self.prefix.len()]);
+ s2
+ }
+
+ pub fn add_byte(&self, b: u8) -> Self {
+ let mut s2 = self.clone();
+ s2.prefix.push(b);
+ s2
+ }
+}
+
+impl MerkleNode {
+ fn decode_opt(ent: Option<sled::IVec>) -> Result<Self, Error> {
+ match ent {
+ None => Ok(MerkleNode::Empty),
+ Some(v) => Ok(rmp_serde::decode::from_read_ref::<_, MerkleNode>(&v[..])?),
+ }
+ }
+
+ pub fn is_empty(&self) -> bool {
+ *self == MerkleNode::Empty
+ }
+}
+
+fn intermediate_set_child(ch: &mut Vec<(u8, Hash)>, pos: u8, v: Hash) {
+ for i in 0..ch.len() {
+ if ch[i].0 == pos {
+ ch[i].1 = v;
+ return;
+ } else if ch[i].0 > pos {
+ ch.insert(i, (pos, v));
+ return;
+ }
+ }
+ ch.push((pos, v));
+}
+
+fn intermediate_rm_child(ch: &mut Vec<(u8, Hash)>, pos: u8) {
+ for i in 0..ch.len() {
+ if ch[i].0 == pos {
+ ch.remove(i);
+ return;
+ }
+ }
+}
+
+#[test]
+fn test_intermediate_aux() {
+ let mut v = vec![];
+
+ intermediate_set_child(&mut v, 12u8, [12u8; 32].into());
+ assert_eq!(v, vec![(12u8, [12u8; 32].into())]);
+
+ intermediate_set_child(&mut v, 42u8, [42u8; 32].into());
+ assert_eq!(
+ v,
+ vec![(12u8, [12u8; 32].into()), (42u8, [42u8; 32].into())]
+ );
+
+ intermediate_set_child(&mut v, 4u8, [4u8; 32].into());
+ assert_eq!(
+ v,
+ vec![
+ (4u8, [4u8; 32].into()),
+ (12u8, [12u8; 32].into()),
+ (42u8, [42u8; 32].into())
+ ]
+ );
+
+ intermediate_set_child(&mut v, 12u8, [8u8; 32].into());
+ assert_eq!(
+ v,
+ vec![
+ (4u8, [4u8; 32].into()),
+ (12u8, [8u8; 32].into()),
+ (42u8, [42u8; 32].into())
+ ]
+ );
+
+ intermediate_set_child(&mut v, 6u8, [6u8; 32].into());
+ assert_eq!(
+ v,
+ vec![
+ (4u8, [4u8; 32].into()),
+ (6u8, [6u8; 32].into()),
+ (12u8, [8u8; 32].into()),
+ (42u8, [42u8; 32].into())
+ ]
+ );
+
+ intermediate_rm_child(&mut v, 42u8);
+ assert_eq!(
+ v,
+ vec![
+ (4u8, [4u8; 32].into()),
+ (6u8, [6u8; 32].into()),
+ (12u8, [8u8; 32].into())
+ ]
+ );
+
+ intermediate_rm_child(&mut v, 11u8);
+ assert_eq!(
+ v,
+ vec![
+ (4u8, [4u8; 32].into()),
+ (6u8, [6u8; 32].into()),
+ (12u8, [8u8; 32].into())
+ ]
+ );
+
+ intermediate_rm_child(&mut v, 6u8);
+ assert_eq!(v, vec![(4u8, [4u8; 32].into()), (12u8, [8u8; 32].into())]);
+
+ intermediate_set_child(&mut v, 6u8, [7u8; 32].into());
+ assert_eq!(
+ v,
+ vec![
+ (4u8, [4u8; 32].into()),
+ (6u8, [7u8; 32].into()),
+ (12u8, [8u8; 32].into())
+ ]
+ );
+}
diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs
new file mode 100644
index 00000000..bd658f63
--- /dev/null
+++ b/src/table/replication/fullcopy.rs
@@ -0,0 +1,51 @@
+use std::sync::Arc;
+
+use garage_rpc::membership::System;
+use garage_rpc::ring::*;
+use garage_util::data::*;
+
+use crate::replication::*;
+
+#[derive(Clone)]
+pub struct TableFullReplication {
+ pub system: Arc<System>,
+ pub max_faults: usize,
+}
+
+impl TableReplication for TableFullReplication {
+ // Full replication schema: all nodes store everything
+ // Writes are disseminated in an epidemic manner in the network
+
+ // Advantage: do all reads locally, extremely fast
+ // Inconvenient: only suitable to reasonably small tables
+
+ fn read_nodes(&self, _hash: &Hash) -> Vec<UUID> {
+ vec![self.system.id]
+ }
+ fn read_quorum(&self) -> usize {
+ 1
+ }
+
+ fn write_nodes(&self, _hash: &Hash) -> Vec<UUID> {
+ let ring = self.system.ring.borrow();
+ ring.config.members.keys().cloned().collect::<Vec<_>>()
+ }
+ fn write_quorum(&self) -> usize {
+ let nmembers = self.system.ring.borrow().config.members.len();
+ if nmembers > self.max_faults {
+ nmembers - self.max_faults
+ } else {
+ 1
+ }
+ }
+ fn max_write_errors(&self) -> usize {
+ self.max_faults
+ }
+
+ fn partition_of(&self, _hash: &Hash) -> Partition {
+ 0u16
+ }
+ fn partitions(&self) -> Vec<(Partition, Hash)> {
+ vec![(0u16, [0u8; 32].into())]
+ }
+}
diff --git a/src/table/replication/mod.rs b/src/table/replication/mod.rs
new file mode 100644
index 00000000..d43d7f19
--- /dev/null
+++ b/src/table/replication/mod.rs
@@ -0,0 +1,6 @@
+mod parameters;
+
+pub mod fullcopy;
+pub mod sharded;
+
+pub use parameters::*;
diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs
new file mode 100644
index 00000000..e46bd172
--- /dev/null
+++ b/src/table/replication/parameters.rs
@@ -0,0 +1,21 @@
+use garage_rpc::ring::*;
+
+use garage_util::data::*;
+
+pub trait TableReplication: Send + Sync {
+ // See examples in table_sharded.rs and table_fullcopy.rs
+ // To understand various replication methods
+
+ // Which nodes to send reads from
+ fn read_nodes(&self, hash: &Hash) -> Vec<UUID>;
+ fn read_quorum(&self) -> usize;
+
+ // Which nodes to send writes to
+ fn write_nodes(&self, hash: &Hash) -> Vec<UUID>;
+ fn write_quorum(&self) -> usize;
+ fn max_write_errors(&self) -> usize;
+
+ // Accessing partitions, for Merkle tree & sync
+ fn partition_of(&self, hash: &Hash) -> Partition;
+ fn partitions(&self) -> Vec<(Partition, Hash)>;
+}
diff --git a/src/table/table_sharded.rs b/src/table/replication/sharded.rs
index 098637dd..dce74b03 100644
--- a/src/table/table_sharded.rs
+++ b/src/table/replication/sharded.rs
@@ -1,11 +1,14 @@
+use std::sync::Arc;
+
use garage_rpc::membership::System;
-use garage_rpc::ring::Ring;
+use garage_rpc::ring::*;
use garage_util::data::*;
-use crate::*;
+use crate::replication::*;
#[derive(Clone)]
pub struct TableShardedReplication {
+ pub system: Arc<System>,
pub replication_factor: usize,
pub read_quorum: usize,
pub write_quorum: usize,
@@ -19,35 +22,29 @@ impl TableReplication for TableShardedReplication {
// - reads are done on all of the nodes that replicate the data
// - writes as well
- fn read_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID> {
- let ring = system.ring.borrow().clone();
+ fn read_nodes(&self, hash: &Hash) -> Vec<UUID> {
+ let ring = self.system.ring.borrow().clone();
ring.walk_ring(&hash, self.replication_factor)
}
fn read_quorum(&self) -> usize {
self.read_quorum
}
- fn write_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID> {
- let ring = system.ring.borrow().clone();
+ fn write_nodes(&self, hash: &Hash) -> Vec<UUID> {
+ let ring = self.system.ring.borrow();
ring.walk_ring(&hash, self.replication_factor)
}
- fn write_quorum(&self, _system: &System) -> usize {
+ fn write_quorum(&self) -> usize {
self.write_quorum
}
fn max_write_errors(&self) -> usize {
self.replication_factor - self.write_quorum
}
- fn replication_nodes(&self, hash: &Hash, ring: &Ring) -> Vec<UUID> {
- ring.walk_ring(&hash, self.replication_factor)
+ fn partition_of(&self, hash: &Hash) -> Partition {
+ self.system.ring.borrow().partition_of(hash)
}
- fn split_points(&self, ring: &Ring) -> Vec<Hash> {
- let mut ret = vec![];
-
- for entry in ring.ring.iter() {
- ret.push(entry.location);
- }
- ret.push([0xFFu8; 32].into());
- ret
+ fn partitions(&self) -> Vec<(Partition, Hash)> {
+ self.system.ring.borrow().partitions()
}
}
diff --git a/src/table/schema.rs b/src/table/schema.rs
index edd04000..4d754664 100644
--- a/src/table/schema.rs
+++ b/src/table/schema.rs
@@ -2,13 +2,15 @@ use serde::{Deserialize, Serialize};
use garage_util::data::*;
+use crate::crdt::CRDT;
+
pub trait PartitionKey {
fn hash(&self) -> Hash;
}
impl PartitionKey for String {
fn hash(&self) -> Hash {
- sha256sum(self.as_bytes())
+ blake2sum(self.as_bytes())
}
}
@@ -35,12 +37,14 @@ impl SortKey for Hash {
}
pub trait Entry<P: PartitionKey, S: SortKey>:
- PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync
+ CRDT + PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync
{
fn partition_key(&self) -> &P;
fn sort_key(&self) -> &S;
- fn merge(&mut self, other: &Self);
+ fn is_tombstone(&self) -> bool {
+ false
+ }
}
pub trait TableSchema: Send + Sync {
diff --git a/src/table/sync.rs b/src/table/sync.rs
new file mode 100644
index 00000000..3130abe8
--- /dev/null
+++ b/src/table/sync.rs
@@ -0,0 +1,614 @@
+use std::collections::VecDeque;
+use std::sync::{Arc, Mutex};
+use std::time::{Duration, Instant};
+
+use futures::select;
+use futures_util::future::*;
+use futures_util::stream::*;
+use rand::Rng;
+use serde::{Deserialize, Serialize};
+use serde_bytes::ByteBuf;
+use tokio::sync::{mpsc, watch};
+
+use garage_util::data::*;
+use garage_util::error::Error;
+
+use garage_rpc::membership::System;
+use garage_rpc::ring::*;
+use garage_rpc::rpc_client::*;
+use garage_rpc::rpc_server::*;
+
+use crate::data::*;
+use crate::merkle::*;
+use crate::replication::*;
+use crate::*;
+
+const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(30);
+
+// Do anti-entropy every 10 minutes
+const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60);
+
+pub struct TableSyncer<F: TableSchema, R: TableReplication> {
+ system: Arc<System>,
+ data: Arc<TableData<F, R>>,
+ merkle: Arc<MerkleUpdater<F, R>>,
+
+ todo: Mutex<SyncTodo>,
+ rpc_client: Arc<RpcClient<SyncRPC>>,
+}
+
+#[derive(Serialize, Deserialize)]
+pub(crate) enum SyncRPC {
+ RootCkHash(Partition, Hash),
+ RootCkDifferent(bool),
+ GetNode(MerkleNodeKey),
+ Node(MerkleNodeKey, MerkleNode),
+ Items(Vec<Arc<ByteBuf>>),
+ Ok,
+}
+
+impl RpcMessage for SyncRPC {}
+
+struct SyncTodo {
+ todo: Vec<TodoPartition>,
+}
+
+#[derive(Debug, Clone)]
+struct TodoPartition {
+ partition: Partition,
+ begin: Hash,
+ end: Hash,
+
+ // Are we a node that stores this partition or not?
+ retain: bool,
+}
+
+impl<F, R> TableSyncer<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
+ pub(crate) fn launch(
+ system: Arc<System>,
+ data: Arc<TableData<F, R>>,
+ merkle: Arc<MerkleUpdater<F, R>>,
+ rpc_server: &mut RpcServer,
+ ) -> Arc<Self> {
+ let rpc_path = format!("table_{}/sync", data.name);
+ let rpc_client = system.rpc_client::<SyncRPC>(&rpc_path);
+
+ let todo = SyncTodo { todo: vec![] };
+
+ let syncer = Arc::new(Self {
+ system: system.clone(),
+ data: data.clone(),
+ merkle,
+ todo: Mutex::new(todo),
+ rpc_client,
+ });
+
+ syncer.register_handler(rpc_server, rpc_path);
+
+ let (busy_tx, busy_rx) = mpsc::unbounded_channel();
+
+ let s1 = syncer.clone();
+ system.background.spawn_worker(
+ format!("table sync watcher for {}", data.name),
+ move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit, busy_rx),
+ );
+
+ let s2 = syncer.clone();
+ system.background.spawn_worker(
+ format!("table syncer for {}", data.name),
+ move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit, busy_tx),
+ );
+
+ let s3 = syncer.clone();
+ tokio::spawn(async move {
+ tokio::time::sleep(Duration::from_secs(20)).await;
+ s3.add_full_sync();
+ });
+
+ syncer
+ }
+
+ fn register_handler(self: &Arc<Self>, rpc_server: &mut RpcServer, path: String) {
+ let self2 = self.clone();
+ rpc_server.add_handler::<SyncRPC, _, _>(path, move |msg, _addr| {
+ let self2 = self2.clone();
+ async move { self2.handle_rpc(&msg).await }
+ });
+
+ let self2 = self.clone();
+ self.rpc_client
+ .set_local_handler(self.system.id, move |msg| {
+ let self2 = self2.clone();
+ async move { self2.handle_rpc(&msg).await }
+ });
+ }
+
+ async fn watcher_task(
+ self: Arc<Self>,
+ mut must_exit: watch::Receiver<bool>,
+ mut busy_rx: mpsc::UnboundedReceiver<bool>,
+ ) {
+ let mut prev_ring: Arc<Ring> = self.system.ring.borrow().clone();
+ let mut ring_recv: watch::Receiver<Arc<Ring>> = self.system.ring.clone();
+ let mut nothing_to_do_since = Some(Instant::now());
+
+ while !*must_exit.borrow() {
+ select! {
+ _ = ring_recv.changed().fuse() => {
+ let new_ring = ring_recv.borrow();
+ if !Arc::ptr_eq(&new_ring, &prev_ring) {
+ debug!("({}) Ring changed, adding full sync to syncer todo list", self.data.name);
+ self.add_full_sync();
+ prev_ring = new_ring.clone();
+ }
+ }
+ busy_opt = busy_rx.recv().fuse() => {
+ if let Some(busy) = busy_opt {
+ if busy {
+ nothing_to_do_since = None;
+ } else {
+ if nothing_to_do_since.is_none() {
+ nothing_to_do_since = Some(Instant::now());
+ }
+ }
+ }
+ }
+ _ = must_exit.changed().fuse() => (),
+ _ = tokio::time::sleep(Duration::from_secs(1)).fuse() => {
+ if nothing_to_do_since.map(|t| Instant::now() - t >= ANTI_ENTROPY_INTERVAL).unwrap_or(false) {
+ nothing_to_do_since = None;
+ debug!("({}) Interval passed, adding full sync to syncer todo list", self.data.name);
+ self.add_full_sync();
+ }
+ }
+ }
+ }
+ }
+
+ pub fn add_full_sync(&self) {
+ self.todo
+ .lock()
+ .unwrap()
+ .add_full_sync(&self.data, &self.system);
+ }
+
+ async fn syncer_task(
+ self: Arc<Self>,
+ mut must_exit: watch::Receiver<bool>,
+ busy_tx: mpsc::UnboundedSender<bool>,
+ ) {
+ while !*must_exit.borrow() {
+ let task = self.todo.lock().unwrap().pop_task();
+ if let Some(partition) = task {
+ busy_tx.send(true).unwrap();
+ let res = self
+ .clone()
+ .sync_partition(&partition, &mut must_exit)
+ .await;
+ if let Err(e) = res {
+ warn!(
+ "({}) Error while syncing {:?}: {}",
+ self.data.name, partition, e
+ );
+ }
+ } else {
+ busy_tx.send(false).unwrap();
+ tokio::time::sleep(Duration::from_secs(1)).await;
+ }
+ }
+ }
+
+ async fn sync_partition(
+ self: Arc<Self>,
+ partition: &TodoPartition,
+ must_exit: &mut watch::Receiver<bool>,
+ ) -> Result<(), Error> {
+ if partition.retain {
+ let my_id = self.system.id;
+
+ let nodes = self
+ .data
+ .replication
+ .write_nodes(&partition.begin)
+ .into_iter()
+ .filter(|node| *node != my_id)
+ .collect::<Vec<_>>();
+
+ debug!(
+ "({}) Syncing {:?} with {:?}...",
+ self.data.name, partition, nodes
+ );
+ let mut sync_futures = nodes
+ .iter()
+ .map(|node| {
+ self.clone()
+ .do_sync_with(partition.clone(), *node, must_exit.clone())
+ })
+ .collect::<FuturesUnordered<_>>();
+
+ let mut n_errors = 0;
+ while let Some(r) = sync_futures.next().await {
+ if let Err(e) = r {
+ n_errors += 1;
+ warn!("({}) Sync error: {}", self.data.name, e);
+ }
+ }
+ if n_errors > self.data.replication.max_write_errors() {
+ return Err(Error::Message(format!(
+ "Sync failed with too many nodes (should have been: {:?}).",
+ nodes
+ )));
+ }
+ } else {
+ self.offload_partition(&partition.begin, &partition.end, must_exit)
+ .await?;
+ }
+
+ Ok(())
+ }
+
+ // Offload partition: this partition is not something we are storing,
+ // so send it out to all other nodes that store it and delete items locally.
+ // We don't bother checking if the remote nodes already have the items,
+ // we just batch-send everything. Offloading isn't supposed to happen very often.
+ // If any of the nodes that are supposed to store the items is unable to
+ // save them, we interrupt the process.
+ async fn offload_partition(
+ self: &Arc<Self>,
+ begin: &Hash,
+ end: &Hash,
+ must_exit: &mut watch::Receiver<bool>,
+ ) -> Result<(), Error> {
+ let mut counter: usize = 0;
+
+ while !*must_exit.borrow() {
+ let mut items = Vec::new();
+
+ for item in self.data.store.range(begin.to_vec()..end.to_vec()) {
+ let (key, value) = item?;
+ items.push((key.to_vec(), Arc::new(ByteBuf::from(value.as_ref()))));
+
+ if items.len() >= 1024 {
+ break;
+ }
+ }
+
+ if items.len() > 0 {
+ let nodes = self
+ .data
+ .replication
+ .write_nodes(&begin)
+ .into_iter()
+ .collect::<Vec<_>>();
+ if nodes.contains(&self.system.id) {
+ warn!(
+ "({}) Interrupting offload as partitions seem to have changed",
+ self.data.name
+ );
+ break;
+ }
+ if nodes.len() < self.data.replication.write_quorum() {
+ return Err(Error::Message(format!(
+ "Not offloading as we don't have a quorum of nodes to write to."
+ )));
+ }
+
+ counter += 1;
+ info!(
+ "({}) Offloading {} items from {:?}..{:?} ({})",
+ self.data.name,
+ items.len(),
+ begin,
+ end,
+ counter
+ );
+ self.offload_items(&items, &nodes[..]).await?;
+ } else {
+ break;
+ }
+ }
+
+ Ok(())
+ }
+
+ async fn offload_items(
+ self: &Arc<Self>,
+ items: &Vec<(Vec<u8>, Arc<ByteBuf>)>,
+ nodes: &[UUID],
+ ) -> Result<(), Error> {
+ let values = items.iter().map(|(_k, v)| v.clone()).collect::<Vec<_>>();
+
+ self.rpc_client
+ .try_call_many(
+ &nodes[..],
+ SyncRPC::Items(values),
+ RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_SYNC_RPC_TIMEOUT),
+ )
+ .await?;
+
+ // All remote nodes have written those items, now we can delete them locally
+ let mut not_removed = 0;
+ for (k, v) in items.iter() {
+ if !self.data.delete_if_equal(&k[..], &v[..])? {
+ not_removed += 1;
+ }
+ }
+
+ if not_removed > 0 {
+ debug!("({}) {} items not removed during offload because they changed in between (trying again...)", self.data.name, not_removed);
+ }
+
+ Ok(())
+ }
+
+ // ======= SYNCHRONIZATION PROCEDURE -- DRIVER SIDE ======
+ // The driver side is only concerned with sending out the item it has
+ // and the other side might not have. Receiving items that differ from one
+ // side to the other will happen when the other side syncs with us,
+ // which they also do regularly.
+
+ fn get_root_ck(&self, partition: Partition) -> Result<(MerkleNodeKey, MerkleNode), Error> {
+ let key = MerkleNodeKey {
+ partition,
+ prefix: vec![],
+ };
+ let node = self.merkle.read_node(&key)?;
+ Ok((key, node))
+ }
+
+ async fn do_sync_with(
+ self: Arc<Self>,
+ partition: TodoPartition,
+ who: UUID,
+ must_exit: watch::Receiver<bool>,
+ ) -> Result<(), Error> {
+ let (root_ck_key, root_ck) = self.get_root_ck(partition.partition)?;
+ if root_ck.is_empty() {
+ debug!(
+ "({}) Sync {:?} with {:?}: partition is empty.",
+ self.data.name, partition, who
+ );
+ return Ok(());
+ }
+ let root_ck_hash = hash_of::<MerkleNode>(&root_ck)?;
+
+ // Check if they have the same root checksum
+ // If so, do nothing.
+ let root_resp = self
+ .rpc_client
+ .call(
+ who,
+ SyncRPC::RootCkHash(partition.partition, root_ck_hash),
+ TABLE_SYNC_RPC_TIMEOUT,
+ )
+ .await?;
+
+ let mut todo = match root_resp {
+ SyncRPC::RootCkDifferent(false) => {
+ debug!(
+ "({}) Sync {:?} with {:?}: no difference",
+ self.data.name, partition, who
+ );
+ return Ok(());
+ }
+ SyncRPC::RootCkDifferent(true) => VecDeque::from(vec![root_ck_key]),
+ x => {
+ return Err(Error::Message(format!(
+ "Invalid respone to RootCkHash RPC: {}",
+ debug_serialize(x)
+ )));
+ }
+ };
+
+ let mut todo_items = vec![];
+
+ while !todo.is_empty() && !*must_exit.borrow() {
+ let key = todo.pop_front().unwrap();
+ let node = self.merkle.read_node(&key)?;
+
+ match node {
+ MerkleNode::Empty => {
+ // They have items we don't have.
+ // We don't request those items from them, they will send them.
+ // We only bother with pushing items that differ
+ }
+ MerkleNode::Leaf(ik, ivhash) => {
+ // Just send that item directly
+ if let Some(val) = self.data.store.get(&ik[..])? {
+ if blake2sum(&val[..]) != ivhash {
+ warn!("({}) Hashes differ between stored value and Merkle tree, key: {:?} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", self.data.name, ik);
+ }
+ todo_items.push(val.to_vec());
+ } else {
+ warn!("({}) Item from Merkle tree not found in store: {:?} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", self.data.name, ik);
+ }
+ }
+ MerkleNode::Intermediate(l) => {
+ // Get Merkle node for this tree position at remote node
+ // and compare it with local node
+ let remote_node = match self
+ .rpc_client
+ .call(who, SyncRPC::GetNode(key.clone()), TABLE_SYNC_RPC_TIMEOUT)
+ .await?
+ {
+ SyncRPC::Node(_, node) => node,
+ x => {
+ return Err(Error::Message(format!(
+ "Invalid respone to GetNode RPC: {}",
+ debug_serialize(x)
+ )));
+ }
+ };
+ let int_l2 = match remote_node {
+ // If they have an intermediate node at this tree position,
+ // we can compare them to find differences
+ MerkleNode::Intermediate(l2) => l2,
+ // Otherwise, treat it as if they have nothing for this subtree,
+ // which will have the consequence of sending them everything
+ _ => vec![],
+ };
+
+ let join = join_ordered(&l[..], &int_l2[..]);
+ for (p, v1, v2) in join.into_iter() {
+ let diff = match (v1, v2) {
+ (Some(_), None) | (None, Some(_)) => true,
+ (Some(a), Some(b)) => a != b,
+ _ => false,
+ };
+ if diff {
+ todo.push_back(key.add_byte(*p));
+ }
+ }
+ }
+ }
+
+ if todo_items.len() >= 256 {
+ self.send_items(who, std::mem::replace(&mut todo_items, vec![]))
+ .await?;
+ }
+ }
+
+ if !todo_items.is_empty() {
+ self.send_items(who, todo_items).await?;
+ }
+
+ Ok(())
+ }
+
+ async fn send_items(&self, who: UUID, item_value_list: Vec<Vec<u8>>) -> Result<(), Error> {
+ info!(
+ "({}) Sending {} items to {:?}",
+ self.data.name,
+ item_value_list.len(),
+ who
+ );
+
+ let values = item_value_list
+ .into_iter()
+ .map(|x| Arc::new(ByteBuf::from(x)))
+ .collect::<Vec<_>>();
+
+ let rpc_resp = self
+ .rpc_client
+ .call(who, SyncRPC::Items(values), TABLE_SYNC_RPC_TIMEOUT)
+ .await?;
+ if let SyncRPC::Ok = rpc_resp {
+ Ok(())
+ } else {
+ Err(Error::Message(format!(
+ "Unexpected response to RPC Update: {}",
+ debug_serialize(&rpc_resp)
+ )))
+ }
+ }
+
+ // ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ======
+
+ async fn handle_rpc(self: &Arc<Self>, message: &SyncRPC) -> Result<SyncRPC, Error> {
+ match message {
+ SyncRPC::RootCkHash(range, h) => {
+ let (_root_ck_key, root_ck) = self.get_root_ck(*range)?;
+ let hash = hash_of::<MerkleNode>(&root_ck)?;
+ Ok(SyncRPC::RootCkDifferent(hash != *h))
+ }
+ SyncRPC::GetNode(k) => {
+ let node = self.merkle.read_node(&k)?;
+ Ok(SyncRPC::Node(k.clone(), node))
+ }
+ SyncRPC::Items(items) => {
+ self.data.update_many(items)?;
+ Ok(SyncRPC::Ok)
+ }
+ _ => Err(Error::Message(format!("Unexpected sync RPC"))),
+ }
+ }
+}
+
+impl SyncTodo {
+ fn add_full_sync<F: TableSchema, R: TableReplication>(
+ &mut self,
+ data: &TableData<F, R>,
+ system: &System,
+ ) {
+ let my_id = system.id;
+
+ self.todo.clear();
+
+ let partitions = data.replication.partitions();
+
+ for i in 0..partitions.len() {
+ let begin = partitions[i].1;
+
+ let end = if i + 1 < partitions.len() {
+ partitions[i + 1].1
+ } else {
+ [0xFFu8; 32].into()
+ };
+
+ let nodes = data.replication.write_nodes(&begin);
+
+ let retain = nodes.contains(&my_id);
+ if !retain {
+ // Check if we have some data to send, otherwise skip
+ if data.store.range(begin..end).next().is_none() {
+ continue;
+ }
+ }
+
+ self.todo.push(TodoPartition {
+ partition: partitions[i].0,
+ begin,
+ end,
+ retain,
+ });
+ }
+ }
+
+ fn pop_task(&mut self) -> Option<TodoPartition> {
+ if self.todo.is_empty() {
+ return None;
+ }
+
+ let i = rand::thread_rng().gen_range(0..self.todo.len());
+ if i == self.todo.len() - 1 {
+ self.todo.pop()
+ } else {
+ let replacement = self.todo.pop().unwrap();
+ let ret = std::mem::replace(&mut self.todo[i], replacement);
+ Some(ret)
+ }
+ }
+}
+
+fn hash_of<T: Serialize>(x: &T) -> Result<Hash, Error> {
+ Ok(blake2sum(&rmp_to_vec_all_named(x)?[..]))
+}
+
+fn join_ordered<'a, K: Ord + Eq, V1, V2>(
+ x: &'a [(K, V1)],
+ y: &'a [(K, V2)],
+) -> Vec<(&'a K, Option<&'a V1>, Option<&'a V2>)> {
+ let mut ret = vec![];
+ let mut i = 0;
+ let mut j = 0;
+ while i < x.len() || j < y.len() {
+ if i < x.len() && j < y.len() && x[i].0 == y[j].0 {
+ ret.push((&x[i].0, Some(&x[i].1), Some(&y[j].1)));
+ i += 1;
+ j += 1;
+ } else if i < x.len() && (j == y.len() || x[i].0 < y[j].0) {
+ ret.push((&x[i].0, Some(&x[i].1), None));
+ i += 1;
+ } else if j < y.len() && (i == x.len() || x[i].0 > y[j].0) {
+ ret.push((&y[j].0, None, Some(&y[j].1)));
+ j += 1;
+ } else {
+ unreachable!();
+ }
+ }
+ ret
+}
diff --git a/src/table/table.rs b/src/table/table.rs
index 1f6b7d25..e203b178 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -2,9 +2,6 @@ use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use std::time::Duration;
-use log::warn;
-
-use arc_swap::ArcSwapOption;
use futures::stream::*;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
@@ -13,25 +10,25 @@ use garage_util::data::*;
use garage_util::error::Error;
use garage_rpc::membership::System;
-use garage_rpc::ring::Ring;
use garage_rpc::rpc_client::*;
use garage_rpc::rpc_server::*;
+use crate::crdt::CRDT;
+use crate::data::*;
+use crate::gc::*;
+use crate::merkle::*;
+use crate::replication::*;
use crate::schema::*;
-use crate::table_sync::*;
+use crate::sync::*;
const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10);
pub struct Table<F: TableSchema, R: TableReplication> {
- pub instance: F,
- pub replication: R,
-
- pub name: String,
- pub(crate) rpc_client: Arc<RpcClient<TableRPC<F>>>,
-
pub system: Arc<System>,
- pub store: sled::Tree,
- pub syncer: ArcSwapOption<TableSyncer<F, R>>,
+ pub data: Arc<TableData<F, R>>,
+ pub merkle_updater: Arc<MerkleUpdater<F, R>>,
+ pub syncer: Arc<TableSyncer<F, R>>,
+ rpc_client: Arc<RpcClient<TableRPC<F>>>,
}
#[derive(Serialize, Deserialize)]
@@ -45,30 +42,10 @@ pub(crate) enum TableRPC<F: TableSchema> {
ReadRange(F::P, Option<F::S>, Option<F::Filter>, usize),
Update(Vec<Arc<ByteBuf>>),
-
- SyncRPC(SyncRPC),
}
impl<F: TableSchema> RpcMessage for TableRPC<F> {}
-pub trait TableReplication: Send + Sync {
- // See examples in table_sharded.rs and table_fullcopy.rs
- // To understand various replication methods
-
- // Which nodes to send reads from
- fn read_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID>;
- fn read_quorum(&self) -> usize;
-
- // Which nodes to send writes to
- fn write_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID>;
- fn write_quorum(&self, system: &System) -> usize;
- fn max_write_errors(&self) -> usize;
-
- // Which are the nodes that do actually replicate the data
- fn replication_nodes(&self, hash: &Hash, ring: &Ring) -> Vec<UUID>;
- fn split_points(&self, ring: &Ring) -> Vec<Hash>;
-}
-
impl<F, R> Table<F, R>
where
F: TableSchema + 'static,
@@ -76,7 +53,7 @@ where
{
// =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) ===============
- pub async fn new(
+ pub fn new(
instance: F,
replication: R,
system: Arc<System>,
@@ -84,31 +61,37 @@ where
name: String,
rpc_server: &mut RpcServer,
) -> Arc<Self> {
- let store = db.open_tree(&name).expect("Unable to open DB tree");
-
let rpc_path = format!("table_{}", name);
let rpc_client = system.rpc_client::<TableRPC<F>>(&rpc_path);
+ let data = TableData::new(system.clone(), name, instance, replication, db);
+
+ let merkle_updater = MerkleUpdater::launch(&system.background, data.clone());
+
+ let syncer = TableSyncer::launch(
+ system.clone(),
+ data.clone(),
+ merkle_updater.clone(),
+ rpc_server,
+ );
+ TableGC::launch(system.clone(), data.clone(), rpc_server);
+
let table = Arc::new(Self {
- instance,
- replication,
- name,
- rpc_client,
system,
- store,
- syncer: ArcSwapOption::from(None),
+ data,
+ merkle_updater,
+ syncer,
+ rpc_client,
});
- table.clone().register_handler(rpc_server, rpc_path);
- let syncer = TableSyncer::launch(table.clone()).await;
- table.syncer.swap(Some(syncer));
+ table.clone().register_handler(rpc_server, rpc_path);
table
}
pub async fn insert(&self, e: &F::E) -> Result<(), Error> {
let hash = e.partition_key().hash();
- let who = self.replication.write_nodes(&hash, &self.system);
+ let who = self.data.replication.write_nodes(&hash);
//eprintln!("insert who: {:?}", who);
let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?));
@@ -118,7 +101,7 @@ where
.try_call_many(
&who[..],
rpc,
- RequestStrategy::with_quorum(self.replication.write_quorum(&self.system))
+ RequestStrategy::with_quorum(self.data.replication.write_quorum())
.with_timeout(TABLE_RPC_TIMEOUT),
)
.await?;
@@ -130,7 +113,7 @@ where
for entry in entries.iter() {
let hash = entry.partition_key().hash();
- let who = self.replication.write_nodes(&hash, &self.system);
+ let who = self.data.replication.write_nodes(&hash);
let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?));
for node in who {
if !call_list.contains_key(&node) {
@@ -154,7 +137,7 @@ where
errors.push(e);
}
}
- if errors.len() > self.replication.max_write_errors() {
+ if errors.len() > self.data.replication.max_write_errors() {
Err(Error::Message("Too many errors".into()))
} else {
Ok(())
@@ -167,7 +150,7 @@ where
sort_key: &F::S,
) -> Result<Option<F::E>, Error> {
let hash = partition_key.hash();
- let who = self.replication.read_nodes(&hash, &self.system);
+ let who = self.data.replication.read_nodes(&hash);
//eprintln!("get who: {:?}", who);
let rpc = TableRPC::<F>::ReadEntry(partition_key.clone(), sort_key.clone());
@@ -176,7 +159,7 @@ where
.try_call_many(
&who[..],
rpc,
- RequestStrategy::with_quorum(self.replication.read_quorum())
+ RequestStrategy::with_quorum(self.data.replication.read_quorum())
.with_timeout(TABLE_RPC_TIMEOUT)
.interrupt_after_quorum(true),
)
@@ -187,7 +170,7 @@ where
for resp in resps {
if let TableRPC::ReadEntryResponse(value) = resp {
if let Some(v_bytes) = value {
- let v = self.decode_entry(v_bytes.as_slice())?;
+ let v = self.data.decode_entry(v_bytes.as_slice())?;
ret = match ret {
None => Some(v),
Some(mut x) => {
@@ -223,7 +206,7 @@ where
limit: usize,
) -> Result<Vec<F::E>, Error> {
let hash = partition_key.hash();
- let who = self.replication.read_nodes(&hash, &self.system);
+ let who = self.data.replication.read_nodes(&hash);
let rpc = TableRPC::<F>::ReadRange(partition_key.clone(), begin_sort_key, filter, limit);
@@ -232,7 +215,7 @@ where
.try_call_many(
&who[..],
rpc,
- RequestStrategy::with_quorum(self.replication.read_quorum())
+ RequestStrategy::with_quorum(self.data.replication.read_quorum())
.with_timeout(TABLE_RPC_TIMEOUT)
.interrupt_after_quorum(true),
)
@@ -243,8 +226,8 @@ where
for resp in resps {
if let TableRPC::Update(entries) = resp {
for entry_bytes in entries.iter() {
- let entry = self.decode_entry(entry_bytes.as_slice())?;
- let entry_key = self.tree_key(entry.partition_key(), entry.sort_key());
+ let entry = self.data.decode_entry(entry_bytes.as_slice())?;
+ let entry_key = self.data.tree_key(entry.partition_key(), entry.sort_key());
match ret.remove(&entry_key) {
None => {
ret.insert(entry_key, Some(entry));
@@ -313,146 +296,18 @@ where
async fn handle(self: &Arc<Self>, msg: &TableRPC<F>) -> Result<TableRPC<F>, Error> {
match msg {
TableRPC::ReadEntry(key, sort_key) => {
- let value = self.handle_read_entry(key, sort_key)?;
+ let value = self.data.read_entry(key, sort_key)?;
Ok(TableRPC::ReadEntryResponse(value))
}
TableRPC::ReadRange(key, begin_sort_key, filter, limit) => {
- let values = self.handle_read_range(key, begin_sort_key, filter, *limit)?;
+ let values = self.data.read_range(key, begin_sort_key, filter, *limit)?;
Ok(TableRPC::Update(values))
}
TableRPC::Update(pairs) => {
- self.handle_update(pairs).await?;
+ self.data.update_many(pairs)?;
Ok(TableRPC::Ok)
}
- TableRPC::SyncRPC(rpc) => {
- let syncer = self.syncer.load_full().unwrap();
- let response = syncer
- .handle_rpc(rpc, self.system.background.stop_signal.clone())
- .await?;
- Ok(TableRPC::SyncRPC(response))
- }
_ => Err(Error::BadRPC(format!("Unexpected table RPC"))),
}
}
-
- fn handle_read_entry(&self, p: &F::P, s: &F::S) -> Result<Option<ByteBuf>, Error> {
- let tree_key = self.tree_key(p, s);
- if let Some(bytes) = self.store.get(&tree_key)? {
- Ok(Some(ByteBuf::from(bytes.to_vec())))
- } else {
- Ok(None)
- }
- }
-
- fn handle_read_range(
- &self,
- p: &F::P,
- s: &Option<F::S>,
- filter: &Option<F::Filter>,
- limit: usize,
- ) -> Result<Vec<Arc<ByteBuf>>, Error> {
- let partition_hash = p.hash();
- let first_key = match s {
- None => partition_hash.to_vec(),
- Some(sk) => self.tree_key(p, sk),
- };
- let mut ret = vec![];
- for item in self.store.range(first_key..) {
- let (key, value) = item?;
- if &key[..32] != partition_hash.as_slice() {
- break;
- }
- let keep = match filter {
- None => true,
- Some(f) => {
- let entry = self.decode_entry(value.as_ref())?;
- F::matches_filter(&entry, f)
- }
- };
- if keep {
- ret.push(Arc::new(ByteBuf::from(value.as_ref())));
- }
- if ret.len() >= limit {
- break;
- }
- }
- Ok(ret)
- }
-
- pub async fn handle_update(self: &Arc<Self>, entries: &[Arc<ByteBuf>]) -> Result<(), Error> {
- let syncer = self.syncer.load_full().unwrap();
-
- for update_bytes in entries.iter() {
- let update = self.decode_entry(update_bytes.as_slice())?;
-
- let tree_key = self.tree_key(update.partition_key(), update.sort_key());
-
- let (old_entry, new_entry) = self.store.transaction(|db| {
- let (old_entry, new_entry) = match db.get(&tree_key)? {
- Some(prev_bytes) => {
- let old_entry = self
- .decode_entry(&prev_bytes)
- .map_err(sled::transaction::ConflictableTransactionError::Abort)?;
- let mut new_entry = old_entry.clone();
- new_entry.merge(&update);
- (Some(old_entry), new_entry)
- }
- None => (None, update.clone()),
- };
-
- let new_bytes = rmp_to_vec_all_named(&new_entry)
- .map_err(Error::RMPEncode)
- .map_err(sled::transaction::ConflictableTransactionError::Abort)?;
- db.insert(tree_key.clone(), new_bytes)?;
- Ok((old_entry, new_entry))
- })?;
-
- if old_entry.as_ref() != Some(&new_entry) {
- self.instance.updated(old_entry, Some(new_entry));
- syncer.invalidate(&tree_key[..]);
- }
- }
-
- Ok(())
- }
-
- pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> {
- let removed = self.store.transaction(|txn| {
- if let Some(cur_v) = txn.get(k)? {
- if cur_v == v {
- txn.remove(k)?;
- return Ok(true);
- }
- }
- Ok(false)
- })?;
- if removed {
- let old_entry = self.decode_entry(v)?;
- self.instance.updated(Some(old_entry), None);
- self.syncer.load_full().unwrap().invalidate(k);
- }
- Ok(removed)
- }
-
- fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> {
- let mut ret = p.hash().to_vec();
- ret.extend(s.sort_key());
- ret
- }
-
- fn decode_entry(&self, bytes: &[u8]) -> Result<F::E, Error> {
- match rmp_serde::decode::from_read_ref::<_, F::E>(bytes) {
- Ok(x) => Ok(x),
- Err(e) => match F::try_migrate(bytes) {
- Some(x) => Ok(x),
- None => {
- warn!("Unable to decode entry of {}: {}", self.name, e);
- for line in hexdump::hexdump_iter(bytes) {
- debug!("{}", line);
- }
- Err(e.into())
- }
- },
- }
- }
}
diff --git a/src/table/table_fullcopy.rs b/src/table/table_fullcopy.rs
deleted file mode 100644
index c55879d8..00000000
--- a/src/table/table_fullcopy.rs
+++ /dev/null
@@ -1,59 +0,0 @@
-use std::sync::Arc;
-
-use garage_rpc::membership::System;
-use garage_rpc::ring::Ring;
-use garage_util::data::*;
-
-use crate::*;
-
-#[derive(Clone)]
-pub struct TableFullReplication {
- pub max_faults: usize,
-}
-
-#[derive(Clone)]
-struct Neighbors {
- ring: Arc<Ring>,
- neighbors: Vec<UUID>,
-}
-
-impl TableFullReplication {
- pub fn new(max_faults: usize) -> Self {
- TableFullReplication { max_faults }
- }
-}
-
-impl TableReplication for TableFullReplication {
- // Full replication schema: all nodes store everything
- // Writes are disseminated in an epidemic manner in the network
-
- // Advantage: do all reads locally, extremely fast
- // Inconvenient: only suitable to reasonably small tables
-
- fn read_nodes(&self, _hash: &Hash, system: &System) -> Vec<UUID> {
- vec![system.id]
- }
- fn read_quorum(&self) -> usize {
- 1
- }
-
- fn write_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID> {
- self.replication_nodes(hash, system.ring.borrow().as_ref())
- }
- fn write_quorum(&self, system: &System) -> usize {
- system.ring.borrow().config.members.len() - self.max_faults
- }
- fn max_write_errors(&self) -> usize {
- self.max_faults
- }
-
- fn replication_nodes(&self, _hash: &Hash, ring: &Ring) -> Vec<UUID> {
- ring.config.members.keys().cloned().collect::<Vec<_>>()
- }
- fn split_points(&self, _ring: &Ring) -> Vec<Hash> {
- let mut ret = vec![];
- ret.push([0u8; 32].into());
- ret.push([0xFFu8; 32].into());
- ret
- }
-}
diff --git a/src/table/table_sync.rs b/src/table/table_sync.rs
deleted file mode 100644
index 5fa6793b..00000000
--- a/src/table/table_sync.rs
+++ /dev/null
@@ -1,891 +0,0 @@
-use rand::Rng;
-use std::collections::{BTreeMap, VecDeque};
-use std::sync::{Arc, Mutex};
-use std::time::{Duration, Instant};
-
-use futures::future::join_all;
-use futures::{pin_mut, select};
-use futures_util::future::*;
-use futures_util::stream::*;
-use serde::{Deserialize, Serialize};
-use serde_bytes::ByteBuf;
-use tokio::sync::{mpsc, watch};
-
-use garage_rpc::ring::Ring;
-use garage_util::data::*;
-use garage_util::error::Error;
-
-use crate::*;
-
-const MAX_DEPTH: usize = 16;
-const SCAN_INTERVAL: Duration = Duration::from_secs(3600);
-const CHECKSUM_CACHE_TIMEOUT: Duration = Duration::from_secs(1800);
-const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(30);
-
-pub struct TableSyncer<F: TableSchema, R: TableReplication> {
- table: Arc<Table<F, R>>,
- todo: Mutex<SyncTodo>,
- cache: Vec<Mutex<BTreeMap<SyncRange, RangeChecksumCache>>>,
-}
-
-#[derive(Serialize, Deserialize)]
-pub(crate) enum SyncRPC {
- GetRootChecksumRange(Hash, Hash),
- RootChecksumRange(SyncRange),
- Checksums(Vec<RangeChecksum>),
- Difference(Vec<SyncRange>, Vec<Arc<ByteBuf>>),
-}
-
-struct SyncTodo {
- todo: Vec<TodoPartition>,
-}
-
-#[derive(Debug, Clone)]
-struct TodoPartition {
- // Partition consists in hashes between begin included and end excluded
- begin: Hash,
- end: Hash,
-
- // Are we a node that stores this partition or not?
- retain: bool,
-}
-
-// A SyncRange defines a query on the dataset stored by a node, in the following way:
-// - all items whose key are >= `begin`
-// - stopping at the first item whose key hash has at least `level` leading zero bytes (excluded)
-// - except if the first item of the range has such many leading zero bytes
-// - and stopping at `end` (excluded) if such an item is not found
-// The checksum itself does not store all of the items in the database, only the hashes of the "sub-ranges"
-// i.e. of ranges of level `level-1` that cover the same range
-// (ranges of level 0 do not exist and their hash is simply the hash of the first item >= begin)
-// See RangeChecksum for the struct that stores this information.
-#[derive(Hash, PartialEq, Eq, Debug, Clone, Serialize, Deserialize)]
-pub(crate) struct SyncRange {
- begin: Vec<u8>,
- end: Vec<u8>,
- level: usize,
-}
-
-impl std::cmp::PartialOrd for SyncRange {
- fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
- Some(self.cmp(other))
- }
-}
-impl std::cmp::Ord for SyncRange {
- fn cmp(&self, other: &Self) -> std::cmp::Ordering {
- self.begin
- .cmp(&other.begin)
- .then(self.level.cmp(&other.level))
- .then(self.end.cmp(&other.end))
- }
-}
-
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub(crate) struct RangeChecksum {
- bounds: SyncRange,
- children: Vec<(SyncRange, Hash)>,
- found_limit: Option<Vec<u8>>,
-
- #[serde(skip, default = "std::time::Instant::now")]
- time: Instant,
-}
-
-#[derive(Debug, Clone)]
-struct RangeChecksumCache {
- hash: Option<Hash>, // None if no children
- found_limit: Option<Vec<u8>>,
- time: Instant,
-}
-
-impl<F, R> TableSyncer<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
- pub(crate) async fn launch(table: Arc<Table<F, R>>) -> Arc<Self> {
- let todo = SyncTodo { todo: Vec::new() };
- let syncer = Arc::new(TableSyncer {
- table: table.clone(),
- todo: Mutex::new(todo),
- cache: (0..MAX_DEPTH)
- .map(|_| Mutex::new(BTreeMap::new()))
- .collect::<Vec<_>>(),
- });
-
- let (busy_tx, busy_rx) = mpsc::unbounded_channel();
-
- let s1 = syncer.clone();
- table
- .system
- .background
- .spawn_worker(
- format!("table sync watcher for {}", table.name),
- move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit, busy_rx),
- )
- .await;
-
- let s2 = syncer.clone();
- table
- .system
- .background
- .spawn_worker(
- format!("table syncer for {}", table.name),
- move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit, busy_tx),
- )
- .await;
-
- let s3 = syncer.clone();
- tokio::spawn(async move {
- tokio::time::delay_for(Duration::from_secs(20)).await;
- s3.add_full_scan().await;
- });
-
- syncer
- }
-
- async fn watcher_task(
- self: Arc<Self>,
- mut must_exit: watch::Receiver<bool>,
- mut busy_rx: mpsc::UnboundedReceiver<bool>,
- ) -> Result<(), Error> {
- let mut prev_ring: Arc<Ring> = self.table.system.ring.borrow().clone();
- let mut ring_recv: watch::Receiver<Arc<Ring>> = self.table.system.ring.clone();
- let mut nothing_to_do_since = Some(Instant::now());
-
- while !*must_exit.borrow() {
- let s_ring_recv = ring_recv.recv().fuse();
- let s_busy = busy_rx.recv().fuse();
- let s_must_exit = must_exit.recv().fuse();
- let s_timeout = tokio::time::delay_for(Duration::from_secs(1)).fuse();
- pin_mut!(s_ring_recv, s_busy, s_must_exit, s_timeout);
-
- select! {
- new_ring_r = s_ring_recv => {
- if let Some(new_ring) = new_ring_r {
- debug!("({}) Adding ring difference to syncer todo list", self.table.name);
- self.todo.lock().unwrap().add_ring_difference(&self.table, &prev_ring, &new_ring);
- prev_ring = new_ring;
- }
- }
- busy_opt = s_busy => {
- if let Some(busy) = busy_opt {
- if busy {
- nothing_to_do_since = None;
- } else {
- if nothing_to_do_since.is_none() {
- nothing_to_do_since = Some(Instant::now());
- }
- }
- }
- }
- must_exit_v = s_must_exit => {
- if must_exit_v.unwrap_or(false) {
- break;
- }
- }
- _ = s_timeout => {
- if nothing_to_do_since.map(|t| Instant::now() - t >= SCAN_INTERVAL).unwrap_or(false) {
- nothing_to_do_since = None;
- debug!("({}) Adding full scan to syncer todo list", self.table.name);
- self.add_full_scan().await;
- }
- }
- }
- }
- Ok(())
- }
-
- pub async fn add_full_scan(&self) {
- self.todo.lock().unwrap().add_full_scan(&self.table);
- }
-
- async fn syncer_task(
- self: Arc<Self>,
- mut must_exit: watch::Receiver<bool>,
- busy_tx: mpsc::UnboundedSender<bool>,
- ) -> Result<(), Error> {
- while !*must_exit.borrow() {
- let task = self.todo.lock().unwrap().pop_task();
- if let Some(partition) = task {
- busy_tx.send(true)?;
- let res = self
- .clone()
- .sync_partition(&partition, &mut must_exit)
- .await;
- if let Err(e) = res {
- warn!(
- "({}) Error while syncing {:?}: {}",
- self.table.name, partition, e
- );
- }
- } else {
- busy_tx.send(false)?;
- tokio::time::delay_for(Duration::from_secs(1)).await;
- }
- }
- Ok(())
- }
-
- async fn sync_partition(
- self: Arc<Self>,
- partition: &TodoPartition,
- must_exit: &mut watch::Receiver<bool>,
- ) -> Result<(), Error> {
- if partition.retain {
- let my_id = self.table.system.id;
- let nodes = self
- .table
- .replication
- .write_nodes(&partition.begin, &self.table.system)
- .into_iter()
- .filter(|node| *node != my_id)
- .collect::<Vec<_>>();
-
- debug!(
- "({}) Preparing to sync {:?} with {:?}...",
- self.table.name, partition, nodes
- );
- let root_cks = self.root_checksum(&partition.begin, &partition.end, must_exit)?;
-
- let mut sync_futures = nodes
- .iter()
- .map(|node| {
- self.clone().do_sync_with(
- partition.clone(),
- root_cks.clone(),
- *node,
- must_exit.clone(),
- )
- })
- .collect::<FuturesUnordered<_>>();
-
- let mut n_errors = 0;
- while let Some(r) = sync_futures.next().await {
- if let Err(e) = r {
- n_errors += 1;
- warn!("({}) Sync error: {}", self.table.name, e);
- }
- }
- if n_errors > self.table.replication.max_write_errors() {
- return Err(Error::Message(format!(
- "Sync failed with too many nodes (should have been: {:?}).",
- nodes
- )));
- }
- } else {
- self.offload_partition(&partition.begin, &partition.end, must_exit)
- .await?;
- }
-
- Ok(())
- }
-
- // Offload partition: this partition is not something we are storing,
- // so send it out to all other nodes that store it and delete items locally.
- // We don't bother checking if the remote nodes already have the items,
- // we just batch-send everything. Offloading isn't supposed to happen very often.
- // If any of the nodes that are supposed to store the items is unable to
- // save them, we interrupt the process.
- async fn offload_partition(
- self: &Arc<Self>,
- begin: &Hash,
- end: &Hash,
- must_exit: &mut watch::Receiver<bool>,
- ) -> Result<(), Error> {
- let mut counter: usize = 0;
-
- while !*must_exit.borrow() {
- let mut items = Vec::new();
-
- for item in self.table.store.range(begin.to_vec()..end.to_vec()) {
- let (key, value) = item?;
- items.push((key.to_vec(), Arc::new(ByteBuf::from(value.as_ref()))));
-
- if items.len() >= 1024 {
- break;
- }
- }
-
- if items.len() > 0 {
- let nodes = self
- .table
- .replication
- .write_nodes(&begin, &self.table.system)
- .into_iter()
- .collect::<Vec<_>>();
- if nodes.contains(&self.table.system.id) {
- warn!("Interrupting offload as partitions seem to have changed");
- break;
- }
-
- counter += 1;
- debug!(
- "Offloading {} items from {:?}..{:?} ({})",
- items.len(),
- begin,
- end,
- counter
- );
- self.offload_items(&items, &nodes[..]).await?;
- } else {
- break;
- }
- }
-
- Ok(())
- }
-
- async fn offload_items(
- self: &Arc<Self>,
- items: &Vec<(Vec<u8>, Arc<ByteBuf>)>,
- nodes: &[UUID],
- ) -> Result<(), Error> {
- let values = items.iter().map(|(_k, v)| v.clone()).collect::<Vec<_>>();
- let update_msg = Arc::new(TableRPC::<F>::Update(values));
-
- for res in join_all(nodes.iter().map(|to| {
- self.table
- .rpc_client
- .call_arc(*to, update_msg.clone(), TABLE_SYNC_RPC_TIMEOUT)
- }))
- .await
- {
- res?;
- }
-
- // All remote nodes have written those items, now we can delete them locally
- let mut not_removed = 0;
- for (k, v) in items.iter() {
- if !self.table.delete_if_equal(&k[..], &v[..])? {
- not_removed += 1;
- }
- }
-
- if not_removed > 0 {
- debug!("{} items not removed during offload because they changed in between (trying again...)", not_removed);
- }
-
- Ok(())
- }
-
- fn root_checksum(
- self: &Arc<Self>,
- begin: &Hash,
- end: &Hash,
- must_exit: &mut watch::Receiver<bool>,
- ) -> Result<RangeChecksum, Error> {
- for i in 1..MAX_DEPTH {
- let rc = self.range_checksum(
- &SyncRange {
- begin: begin.to_vec(),
- end: end.to_vec(),
- level: i,
- },
- must_exit,
- )?;
- if rc.found_limit.is_none() {
- return Ok(rc);
- }
- }
- Err(Error::Message(format!(
- "Unable to compute root checksum (this should never happen)"
- )))
- }
-
- fn range_checksum(
- self: &Arc<Self>,
- range: &SyncRange,
- must_exit: &mut watch::Receiver<bool>,
- ) -> Result<RangeChecksum, Error> {
- assert!(range.level != 0);
- trace!("Call range_checksum {:?}", range);
-
- if range.level == 1 {
- let mut children = vec![];
- for item in self
- .table
- .store
- .range(range.begin.clone()..range.end.clone())
- {
- let (key, value) = item?;
- let key_hash = blake2sum(&key[..]);
- if children.len() > 0
- && key_hash.as_slice()[0..range.level]
- .iter()
- .all(|x| *x == 0u8)
- {
- trace!(
- "range_checksum {:?} returning {} items",
- range,
- children.len()
- );
- return Ok(RangeChecksum {
- bounds: range.clone(),
- children,
- found_limit: Some(key.to_vec()),
- time: Instant::now(),
- });
- }
- let item_range = SyncRange {
- begin: key.to_vec(),
- end: vec![],
- level: 0,
- };
- children.push((item_range, blake2sum(&value[..])));
- }
- trace!(
- "range_checksum {:?} returning {} items",
- range,
- children.len()
- );
- Ok(RangeChecksum {
- bounds: range.clone(),
- children,
- found_limit: None,
- time: Instant::now(),
- })
- } else {
- let mut children = vec![];
- let mut sub_range = SyncRange {
- begin: range.begin.clone(),
- end: range.end.clone(),
- level: range.level - 1,
- };
- let mut time = Instant::now();
- while !*must_exit.borrow() {
- let sub_ck = self.range_checksum_cached_hash(&sub_range, must_exit)?;
-
- if let Some(hash) = sub_ck.hash {
- children.push((sub_range.clone(), hash));
- if sub_ck.time < time {
- time = sub_ck.time;
- }
- }
-
- if sub_ck.found_limit.is_none() || sub_ck.hash.is_none() {
- trace!(
- "range_checksum {:?} returning {} items",
- range,
- children.len()
- );
- return Ok(RangeChecksum {
- bounds: range.clone(),
- children,
- found_limit: None,
- time,
- });
- }
- let found_limit = sub_ck.found_limit.unwrap();
-
- let actual_limit_hash = blake2sum(&found_limit[..]);
- if actual_limit_hash.as_slice()[0..range.level]
- .iter()
- .all(|x| *x == 0u8)
- {
- trace!(
- "range_checksum {:?} returning {} items",
- range,
- children.len()
- );
- return Ok(RangeChecksum {
- bounds: range.clone(),
- children,
- found_limit: Some(found_limit.clone()),
- time,
- });
- }
-
- sub_range.begin = found_limit;
- }
- trace!("range_checksum {:?} exiting due to must_exit", range);
- Err(Error::Message(format!("Exiting.")))
- }
- }
-
- fn range_checksum_cached_hash(
- self: &Arc<Self>,
- range: &SyncRange,
- must_exit: &mut watch::Receiver<bool>,
- ) -> Result<RangeChecksumCache, Error> {
- {
- let mut cache = self.cache[range.level].lock().unwrap();
- if let Some(v) = cache.get(&range) {
- if Instant::now() - v.time < CHECKSUM_CACHE_TIMEOUT {
- return Ok(v.clone());
- }
- }
- cache.remove(&range);
- }
-
- let v = self.range_checksum(&range, must_exit)?;
- trace!(
- "({}) New checksum calculated for {}-{}/{}, {} children",
- self.table.name,
- hex::encode(&range.begin)
- .chars()
- .take(16)
- .collect::<String>(),
- hex::encode(&range.end).chars().take(16).collect::<String>(),
- range.level,
- v.children.len()
- );
-
- let hash = if v.children.len() > 0 {
- Some(blake2sum(&rmp_to_vec_all_named(&v)?[..]))
- } else {
- None
- };
- let cache_entry = RangeChecksumCache {
- hash,
- found_limit: v.found_limit,
- time: v.time,
- };
-
- let mut cache = self.cache[range.level].lock().unwrap();
- cache.insert(range.clone(), cache_entry.clone());
- Ok(cache_entry)
- }
-
- async fn do_sync_with(
- self: Arc<Self>,
- partition: TodoPartition,
- root_ck: RangeChecksum,
- who: UUID,
- mut must_exit: watch::Receiver<bool>,
- ) -> Result<(), Error> {
- let mut todo = VecDeque::new();
-
- // If their root checksum has level > than us, use that as a reference
- let root_cks_resp = self
- .table
- .rpc_client
- .call(
- who,
- TableRPC::<F>::SyncRPC(SyncRPC::GetRootChecksumRange(
- partition.begin.clone(),
- partition.end.clone(),
- )),
- TABLE_SYNC_RPC_TIMEOUT,
- )
- .await?;
- if let TableRPC::<F>::SyncRPC(SyncRPC::RootChecksumRange(range)) = root_cks_resp {
- if range.level > root_ck.bounds.level {
- let their_root_range_ck = self.range_checksum(&range, &mut must_exit)?;
- todo.push_back(their_root_range_ck);
- } else {
- todo.push_back(root_ck);
- }
- } else {
- return Err(Error::Message(format!(
- "Invalid respone to GetRootChecksumRange RPC: {}",
- debug_serialize(root_cks_resp)
- )));
- }
-
- while !todo.is_empty() && !*must_exit.borrow() {
- let total_children = todo.iter().map(|x| x.children.len()).fold(0, |x, y| x + y);
- trace!(
- "({}) Sync with {:?}: {} ({}) remaining",
- self.table.name,
- who,
- todo.len(),
- total_children
- );
-
- let step_size = std::cmp::min(16, todo.len());
- let step = todo.drain(..step_size).collect::<Vec<_>>();
-
- let rpc_resp = self
- .table
- .rpc_client
- .call(
- who,
- TableRPC::<F>::SyncRPC(SyncRPC::Checksums(step)),
- TABLE_SYNC_RPC_TIMEOUT,
- )
- .await?;
- if let TableRPC::<F>::SyncRPC(SyncRPC::Difference(mut diff_ranges, diff_items)) =
- rpc_resp
- {
- if diff_ranges.len() > 0 || diff_items.len() > 0 {
- info!(
- "({}) Sync with {:?}: difference {} ranges, {} items",
- self.table.name,
- who,
- diff_ranges.len(),
- diff_items.len()
- );
- }
- let mut items_to_send = vec![];
- for differing in diff_ranges.drain(..) {
- if differing.level == 0 {
- items_to_send.push(differing.begin);
- } else {
- let checksum = self.range_checksum(&differing, &mut must_exit)?;
- todo.push_back(checksum);
- }
- }
- if diff_items.len() > 0 {
- self.table.handle_update(&diff_items[..]).await?;
- }
- if items_to_send.len() > 0 {
- self.send_items(who, items_to_send).await?;
- }
- } else {
- return Err(Error::Message(format!(
- "Unexpected response to sync RPC checksums: {}",
- debug_serialize(&rpc_resp)
- )));
- }
- }
- Ok(())
- }
-
- async fn send_items(&self, who: UUID, item_list: Vec<Vec<u8>>) -> Result<(), Error> {
- info!(
- "({}) Sending {} items to {:?}",
- self.table.name,
- item_list.len(),
- who
- );
-
- let mut values = vec![];
- for item in item_list.iter() {
- if let Some(v) = self.table.store.get(&item[..])? {
- values.push(Arc::new(ByteBuf::from(v.as_ref())));
- }
- }
- let rpc_resp = self
- .table
- .rpc_client
- .call(who, TableRPC::<F>::Update(values), TABLE_SYNC_RPC_TIMEOUT)
- .await?;
- if let TableRPC::<F>::Ok = rpc_resp {
- Ok(())
- } else {
- Err(Error::Message(format!(
- "Unexpected response to RPC Update: {}",
- debug_serialize(&rpc_resp)
- )))
- }
- }
-
- pub(crate) async fn handle_rpc(
- self: &Arc<Self>,
- message: &SyncRPC,
- mut must_exit: watch::Receiver<bool>,
- ) -> Result<SyncRPC, Error> {
- match message {
- SyncRPC::GetRootChecksumRange(begin, end) => {
- let root_cks = self.root_checksum(&begin, &end, &mut must_exit)?;
- Ok(SyncRPC::RootChecksumRange(root_cks.bounds))
- }
- SyncRPC::Checksums(checksums) => {
- self.handle_checksums_rpc(&checksums[..], &mut must_exit)
- .await
- }
- _ => Err(Error::Message(format!("Unexpected sync RPC"))),
- }
- }
-
- async fn handle_checksums_rpc(
- self: &Arc<Self>,
- checksums: &[RangeChecksum],
- must_exit: &mut watch::Receiver<bool>,
- ) -> Result<SyncRPC, Error> {
- let mut ret_ranges = vec![];
- let mut ret_items = vec![];
-
- for their_ckr in checksums.iter() {
- let our_ckr = self.range_checksum(&their_ckr.bounds, must_exit)?;
- for (their_range, their_hash) in their_ckr.children.iter() {
- let differs = match our_ckr
- .children
- .binary_search_by(|(our_range, _)| our_range.cmp(&their_range))
- {
- Err(_) => {
- if their_range.level >= 1 {
- let cached_hash =
- self.range_checksum_cached_hash(&their_range, must_exit)?;
- cached_hash.hash.map(|h| h != *their_hash).unwrap_or(true)
- } else {
- true
- }
- }
- Ok(i) => our_ckr.children[i].1 != *their_hash,
- };
- if differs {
- ret_ranges.push(their_range.clone());
- if their_range.level == 0 {
- if let Some(item_bytes) =
- self.table.store.get(their_range.begin.as_slice())?
- {
- ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec())));
- }
- }
- }
- }
- for (our_range, _hash) in our_ckr.children.iter() {
- if let Some(their_found_limit) = &their_ckr.found_limit {
- if our_range.begin.as_slice() > their_found_limit.as_slice() {
- break;
- }
- }
-
- let not_present = our_ckr
- .children
- .binary_search_by(|(their_range, _)| their_range.cmp(&our_range))
- .is_err();
- if not_present {
- if our_range.level > 0 {
- ret_ranges.push(our_range.clone());
- }
- if our_range.level == 0 {
- if let Some(item_bytes) =
- self.table.store.get(our_range.begin.as_slice())?
- {
- ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec())));
- }
- }
- }
- }
- }
- let n_checksums = checksums
- .iter()
- .map(|x| x.children.len())
- .fold(0, |x, y| x + y);
- if ret_ranges.len() > 0 || ret_items.len() > 0 {
- trace!(
- "({}) Checksum comparison RPC: {} different + {} items for {} received",
- self.table.name,
- ret_ranges.len(),
- ret_items.len(),
- n_checksums
- );
- }
- Ok(SyncRPC::Difference(ret_ranges, ret_items))
- }
-
- pub(crate) fn invalidate(self: &Arc<Self>, item_key: &[u8]) {
- for i in 1..MAX_DEPTH {
- let needle = SyncRange {
- begin: item_key.to_vec(),
- end: vec![],
- level: i,
- };
- let mut cache = self.cache[i].lock().unwrap();
- if let Some(cache_entry) = cache.range(..=needle).rev().next() {
- if cache_entry.0.begin[..] <= *item_key && cache_entry.0.end[..] > *item_key {
- let index = cache_entry.0.clone();
- drop(cache_entry);
- cache.remove(&index);
- }
- }
- }
- }
-}
-
-impl SyncTodo {
- fn add_full_scan<F: TableSchema, R: TableReplication>(&mut self, table: &Table<F, R>) {
- let my_id = table.system.id;
-
- self.todo.clear();
-
- let ring = table.system.ring.borrow().clone();
- let split_points = table.replication.split_points(&ring);
-
- for i in 0..split_points.len() - 1 {
- let begin = split_points[i];
- let end = split_points[i + 1];
- let nodes = table.replication.replication_nodes(&begin, &ring);
-
- let retain = nodes.contains(&my_id);
- if !retain {
- // Check if we have some data to send, otherwise skip
- if table.store.range(begin..end).next().is_none() {
- continue;
- }
- }
-
- self.todo.push(TodoPartition { begin, end, retain });
- }
- }
-
- fn add_ring_difference<F: TableSchema, R: TableReplication>(
- &mut self,
- table: &Table<F, R>,
- old_ring: &Ring,
- new_ring: &Ring,
- ) {
- let my_id = table.system.id;
-
- // If it is us who are entering or leaving the system,
- // initiate a full sync instead of incremental sync
- if old_ring.config.members.contains_key(&my_id)
- != new_ring.config.members.contains_key(&my_id)
- {
- self.add_full_scan(table);
- return;
- }
-
- let mut all_points = None
- .into_iter()
- .chain(table.replication.split_points(old_ring).drain(..))
- .chain(table.replication.split_points(new_ring).drain(..))
- .chain(self.todo.iter().map(|x| x.begin))
- .chain(self.todo.iter().map(|x| x.end))
- .collect::<Vec<_>>();
- all_points.sort();
- all_points.dedup();
-
- let mut old_todo = std::mem::replace(&mut self.todo, vec![]);
- old_todo.sort_by(|x, y| x.begin.cmp(&y.begin));
- let mut new_todo = vec![];
-
- for i in 0..all_points.len() - 1 {
- let begin = all_points[i];
- let end = all_points[i + 1];
- let was_ours = table
- .replication
- .replication_nodes(&begin, &old_ring)
- .contains(&my_id);
- let is_ours = table
- .replication
- .replication_nodes(&begin, &new_ring)
- .contains(&my_id);
-
- let was_todo = match old_todo.binary_search_by(|x| x.begin.cmp(&begin)) {
- Ok(_) => true,
- Err(j) => {
- (j > 0 && old_todo[j - 1].begin < end && begin < old_todo[j - 1].end)
- || (j < old_todo.len()
- && old_todo[j].begin < end && begin < old_todo[j].end)
- }
- };
- if was_todo || (is_ours && !was_ours) || (was_ours && !is_ours) {
- new_todo.push(TodoPartition {
- begin,
- end,
- retain: is_ours,
- });
- }
- }
-
- self.todo = new_todo;
- }
-
- fn pop_task(&mut self) -> Option<TodoPartition> {
- if self.todo.is_empty() {
- return None;
- }
-
- let i = rand::thread_rng().gen_range::<usize, _, _>(0, self.todo.len());
- if i == self.todo.len() - 1 {
- self.todo.pop()
- } else {
- let replacement = self.todo.pop().unwrap();
- let ret = std::mem::replace(&mut self.todo[i], replacement);
- Some(ret)
- }
- }
-}