diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/model/bucket_table.rs | 3 | ||||
-rw-r--r-- | src/rpc/lib.rs | 2 | ||||
-rw-r--r-- | src/rpc/membership.rs | 47 | ||||
-rw-r--r-- | src/rpc/rpc_client.rs | 6 | ||||
-rw-r--r-- | src/table/crdt.rs | 187 |
5 files changed, 207 insertions, 38 deletions
diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs index 78b0416f..a101555f 100644 --- a/src/model/bucket_table.rs +++ b/src/model/bucket_table.rs @@ -8,6 +8,9 @@ use garage_util::error::Error; use crate::key_table::PermissionSet; +// We import the same file but in its version 0.1.0. +// We can then access v0.1.0 data structures. +// We use them to perform migrations. use model010::bucket_table as prev; /// A bucket is a collection of objects diff --git a/src/rpc/lib.rs b/src/rpc/lib.rs index 4c5f6e31..639ece15 100644 --- a/src/rpc/lib.rs +++ b/src/rpc/lib.rs @@ -5,4 +5,4 @@ pub mod consul; pub mod membership; pub mod rpc_client; pub mod rpc_server; -pub mod tls_util; +pub(crate) mod tls_util; diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs index 697cddd0..6e573a61 100644 --- a/src/rpc/membership.rs +++ b/src/rpc/membership.rs @@ -46,13 +46,13 @@ impl RpcMessage for Message {} #[derive(Debug, Serialize, Deserialize)] pub struct PingMessage { - pub id: UUID, - pub rpc_port: u16, + id: UUID, + rpc_port: u16, - pub status_hash: Hash, - pub config_version: u64, + status_hash: Hash, + config_version: u64, - pub state_info: StateInfo, + state_info: StateInfo, } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -81,12 +81,13 @@ pub struct NetworkConfigEntry { pub struct System { pub id: UUID, - pub data_dir: PathBuf, - pub rpc_local_port: u16, - pub state_info: StateInfo, + metadata_dir: PathBuf, + rpc_local_port: u16, + + state_info: StateInfo, - pub rpc_http_client: Arc<RpcHttpClient>, + rpc_http_client: Arc<RpcHttpClient>, rpc_client: Arc<RpcClient<Message>>, pub status: watch::Receiver<Arc<Status>>, @@ -296,15 +297,15 @@ fn read_network_config(metadata_dir: &PathBuf) -> Result<NetworkConfig, Error> { impl System { pub fn new( - data_dir: PathBuf, + metadata_dir: PathBuf, rpc_http_client: Arc<RpcHttpClient>, background: Arc<BackgroundRunner>, rpc_server: &mut RpcServer, ) -> Arc<Self> { - let id = gen_node_id(&data_dir).expect("Unable to 
read or generate node ID"); + let id = gen_node_id(&metadata_dir).expect("Unable to read or generate node ID"); info!("Node ID: {}", hex::encode(&id)); - let net_config = match read_network_config(&data_dir) { + let net_config = match read_network_config(&metadata_dir) { Ok(x) => x, Err(e) => { info!( @@ -347,7 +348,7 @@ impl System { let sys = Arc::new(System { id, - data_dir, + metadata_dir, rpc_local_port: rpc_server.bind_addr.port(), state_info, rpc_http_client, @@ -388,7 +389,7 @@ impl System { } async fn save_network_config(self: Arc<Self>) -> Result<(), Error> { - let mut path = self.data_dir.clone(); + let mut path = self.metadata_dir.clone(); path.push("network_config"); let ring = self.ring.borrow().clone(); @@ -399,7 +400,7 @@ impl System { Ok(()) } - pub fn make_ping(&self) -> Message { + fn make_ping(&self) -> Message { let status = self.status.borrow().clone(); let ring = self.ring.borrow().clone(); Message::Ping(PingMessage { @@ -411,7 +412,7 @@ impl System { }) } - pub async fn broadcast(self: Arc<Self>, msg: Message, timeout: Duration) { + async fn broadcast(self: Arc<Self>, msg: Message, timeout: Duration) { let status = self.status.borrow().clone(); let to = status .nodes @@ -527,7 +528,7 @@ impl System { } } - pub async fn handle_ping( + async fn handle_ping( self: Arc<Self>, from: &SocketAddr, ping: &PingMessage, @@ -557,7 +558,7 @@ impl System { Ok(self.make_ping()) } - pub fn handle_pull_status(&self) -> Result<Message, Error> { + fn handle_pull_status(&self) -> Result<Message, Error> { let status = self.status.borrow().clone(); let mut mem = vec![]; for (node, status) in status.nodes.iter() { @@ -577,12 +578,12 @@ impl System { Ok(Message::AdvertiseNodesUp(mem)) } - pub fn handle_pull_config(&self) -> Result<Message, Error> { + fn handle_pull_config(&self) -> Result<Message, Error> { let ring = self.ring.borrow().clone(); Ok(Message::AdvertiseConfig(ring.config.clone())) } - pub async fn handle_advertise_nodes_up( + async fn 
handle_advertise_nodes_up( self: Arc<Self>, adv: &[AdvertisedNode], ) -> Result<Message, Error> { @@ -635,7 +636,7 @@ impl System { Ok(Message::Ok) } - pub async fn handle_advertise_config( + async fn handle_advertise_config( self: Arc<Self>, adv: &NetworkConfig, ) -> Result<Message, Error> { @@ -716,7 +717,7 @@ impl System { } } - pub fn pull_status( + fn pull_status( self: Arc<Self>, peer: UUID, ) -> impl futures::future::Future<Output = ()> + Send + 'static { @@ -731,7 +732,7 @@ impl System { } } - pub async fn pull_config(self: Arc<Self>, peer: UUID) { + async fn pull_config(self: Arc<Self>, peer: UUID) { let resp = self .rpc_client .call(peer, Message::PullConfig, PING_TIMEOUT) diff --git a/src/rpc/rpc_client.rs b/src/rpc/rpc_client.rs index 6119696d..5183bb4b 100644 --- a/src/rpc/rpc_client.rs +++ b/src/rpc/rpc_client.rs @@ -61,7 +61,7 @@ pub struct RpcClient<M: RpcMessage> { local_handler: ArcSwapOption<(UUID, LocalHandlerFn<M>)>, - pub rpc_addr_client: RpcAddrClient<M>, + rpc_addr_client: RpcAddrClient<M>, } impl<M: RpcMessage + 'static> RpcClient<M> { @@ -215,8 +215,8 @@ impl<M: RpcMessage + 'static> RpcClient<M> { pub struct RpcAddrClient<M: RpcMessage> { phantom: PhantomData<M>, - pub http_client: Arc<RpcHttpClient>, - pub path: String, + http_client: Arc<RpcHttpClient>, + path: String, } impl<M: RpcMessage> RpcAddrClient<M> { diff --git a/src/table/crdt.rs b/src/table/crdt.rs index 2b903cf0..386e478b 100644 --- a/src/table/crdt.rs +++ b/src/table/crdt.rs @@ -1,11 +1,48 @@ +//! This package provides a simple implementation of conflict-free replicated data types (CRDTs) +//! +//! CRDTs are a type of data structures that do not require coordination. In other words, we can +//! edit them in parallel, we will always find a way to merge it. +//! +//! A general example is a counter. Its initial value is 0. Alice and Bob get a copy of the +//! counter. Alice does +1 on her copy, she reads 1. Bob does +3 on his copy, he reads 3. Now, +//! 
it is easy to merge their counters, order does not count: we always get 4. +//! +//! Learn more about CRDT [on Wikipedia](https://en.wikipedia.org/wiki/Conflict-free_replicated_data_type) + use serde::{Deserialize, Serialize}; use garage_util::data::*; +/// Definition of a CRDT - all CRDT Rust types implement this. +/// +/// A CRDT is defined as a merge operator that respects a certain set of axioms. +/// +/// In particular, the merge operator must be commutative, associative, +/// idempotent, and monotonic. +/// In other words, if `a`, `b` and `c` are CRDTs, and `⊔` denotes the merge operator, +/// the following axioms must apply: +/// +/// ```text +/// a ⊔ b = b ⊔ a (commutativity) +/// (a ⊔ b) ⊔ c = a ⊔ (b ⊔ c) (associativity) +/// (a ⊔ b) ⊔ b = a ⊔ b (idempotence) +/// ``` +/// +/// Moreover, the relationship `≥` defined by `a ≥ b ⇔ ∃c. a = b ⊔ c` must be a partial order. +/// This implies a few properties such as: if `a ⊔ b ≠ a`, then there is no `c` such that `(a ⊔ b) ⊔ c = a`, +/// as this would imply a cycle in the partial order. pub trait CRDT { + /// Merge the two datastructures according to the CRDT rules. + /// `self` is modified to contain the merged CRDT value. `other` is not modified. + /// + /// # Arguments + /// + /// * `other` - the other CRDT we wish to merge with fn merge(&mut self, other: &Self); } +/// All types that implement `Ord` (a total order) also implement a trivial CRDT +/// defined by the merge rule: `a ⊔ b = max(a, b)`. impl<T> CRDT for T where T: Ord + Clone, @@ -19,6 +56,37 @@ where // ---- LWW Register ---- +/// Last Write Win (LWW) +/// +/// An LWW CRDT associates a timestamp with a value, in order to implement a +/// time-based reconciliation rule: the most recent write wins. +/// For completeness, the LWW reconciliation rule must also be defined for two LWW CRDTs +/// with the same timestamp but different values. 
+/// +/// In our case, we add the constraint that the value that is wrapped inside the LWW CRDT must +/// itself be a CRDT: in the case when the timestamp does not allow us to decide on which value to +/// keep, the merge rule of the inner CRDT is applied on the wrapped values. (Note that all types +/// that implement the `Ord` trait get a default CRDT implementation that keeps the maximum value. +/// This enables us to use LWW directly with primitive data types such as numbers or strings. It is +/// generally desirable in this case to never explicitly produce LWW values with the same timestamp +/// but different inner values, as the rule to keep the maximum value isn't generally the desired +/// semantics.) +/// +/// As multiple computers' clocks are always desynchronized, +/// when operations are close enough, it is equivalent to +/// take one copy and drop the other one. +/// +/// Given that clocks are not too desynchronized, this assumption +/// is enough for most cases, as there is little chance that two humans +/// coordinate themselves faster than the time difference between two NTP servers. +/// +/// As a more concrete example, let's suppose you want to upload a file +/// with the same key (path) in the same bucket at the very same time. +/// For each request, the file will be timestamped by the receiving server +/// and may differ from what you observed with your atomic clock! +/// +/// This scheme is used by AWS S3 or Soundcloud and often without knowing +/// in enterprises when reconciling databases with ad-hoc scripts. #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] pub struct LWW<T> { ts: u64, @@ -29,22 +97,55 @@ impl<T> LWW<T> where T: CRDT, { + /// Creates a new CRDT + /// + /// CRDT's internal timestamp is set with current node's clock.
pub fn new(value: T) -> Self { Self { ts: now_msec(), v: value, } } + + /// Build a new CRDT from a previous non-compatible one + /// + /// Compared to new, the CRDT's timestamp is not set to now + /// but must be set to the previous, non-compatible, CRDT's timestamp. pub fn migrate_from_raw(ts: u64, value: T) -> Self { Self { ts, v: value } } + + /// Update the LWW CRDT while keeping some causal ordering. + /// + /// The timestamp of the LWW CRDT is updated to be the current node's clock + /// at time of update, or the previous timestamp + 1 if that's bigger, + /// so that the new timestamp is always strictly larger than the previous one. + /// This ensures that merging the update with the old value will result in keeping + /// the updated value. pub fn update(&mut self, new_value: T) { self.ts = std::cmp::max(self.ts + 1, now_msec()); self.v = new_value; } + + /// Get the CRDT value pub fn get(&self) -> &T { &self.v } + + /// Get a mutable reference to the CRDT's value + /// + /// This is useful to mutate the inside value without changing the LWW timestamp. + /// When such mutation is done, the merge between two LWW values is done using the inner + /// CRDT's merge operation. This is useful in the case where the inner CRDT is a large + /// data type, such as a map, and we only want to change a single item in the map. + /// To do this, we can produce a "CRDT delta", i.e. an LWW that contains only the modification. + /// This delta consists of an LWW with the same timestamp, and the map + /// inside only contains the updated value. + /// The advantage of such a delta is that it is much smaller than the whole map. + /// + /// Avoid using this if the inner data type is a primitive type such as a number or a string, + /// as you will then rely on the merge function defined on `Ord` types by keeping the maximum + /// of both values.
pub fn get_mut(&mut self) -> &mut T { &mut self.v } @@ -64,18 +165,20 @@ where } } -// ---- Boolean (true as absorbing state) ---- - +/// Boolean, where `true` is an absorbing state #[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)] pub struct Bool(bool); impl Bool { + /// Create a new boolean with the specified value pub fn new(b: bool) -> Self { Self(b) } + /// Set the boolean to true pub fn set(&mut self) { self.0 = true; } + /// Get the boolean value pub fn get(&self) -> bool { self.0 } @@ -87,8 +190,23 @@ impl CRDT for Bool { } } -// ---- LWW Map ---- - +/// Last Write Win Map +/// +/// This type defines a CRDT for a map from keys to values. +/// The values have an associated timestamp, such that the last written value +/// takes precedence over previous ones. As for the simpler `LWW` type, the value +/// type `V` is also required to implement the CRDT trait. +/// We do not encourage mutating the values associated with a given key +/// without updating the timestamp; in fact at the moment we do not provide a `.get_mut()` +/// method that would allow that. +/// +/// Internally, the map is stored as a vector of keys and values, sorted by ascending key order. +/// This is why the key type `K` must implement `Ord` (and also to ensure a unique serialization, +/// such that two values can be compared for equality based on their hashes). As a consequence, +/// insertions take `O(n)` time. This means that LWWMap should be used for reasonably small maps. +/// However, note that even if we were using a more efficient data structure such as a `BTreeMap`, +/// the serialization cost `O(n)` would still have to be paid at each modification, so we are +/// actually not losing anything here.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] pub struct LWWMap<K, V> { vals: Vec<(K, u64, V)>, @@ -99,21 +217,35 @@ where K: Ord, V: CRDT, { + /// Create a new empty map CRDT pub fn new() -> Self { Self { vals: vec![] } } + /// Used to migrate from a map defined in an incompatible format. This produces + /// a map that contains a single item with the specified timestamp (copied from + /// the incompatible format). Do this as many times as you have items to migrate, + /// and put them all together using the CRDT merge operator. pub fn migrate_from_raw_item(k: K, ts: u64, v: V) -> Self { Self { vals: vec![(k, ts, v)], } } - pub fn take_and_clear(&mut self) -> Self { - let vals = std::mem::replace(&mut self.vals, vec![]); - Self { vals } - } - pub fn clear(&mut self) { - self.vals.clear(); - } + /// Returns a map that contains a single mapping from the specified key to the specified value. + /// This map is a mutator, or a delta-CRDT, such that when it is merged with the original map, + /// the previous value will be replaced with the one specified here. + /// The timestamp in the provided mutator is set to the maximum of the current system's clock + /// and 1 + the previous value's timestamp (if there is one), so that the new value will always + /// take precedence (LWW rule). + /// + /// Typically, to update the value associated to a key in the map, you would do the following: + /// + /// ``` + /// let my_update = my_crdt.update_mutator(key_to_modify, new_value); + /// my_crdt.merge(&my_update); + /// ``` + /// + /// However extracting the mutator on its own and only sending that on the network is very + /// interesting as it is much smaller than the whole map. pub fn update_mutator(&self, k: K, new_v: V) -> Self { let new_vals = match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) { Ok(i) => { @@ -125,12 +257,45 @@ where }; Self { vals: new_vals } } + /// Takes all of the values of the map and returns them. 
The current map is reset to the + /// empty map. This is very useful to produce in-place a new map that contains only a delta + /// that modifies a certain value: + /// + /// ``` + /// let mut a = get_my_crdt_value(); + /// let old_a = a.take_and_clear(); + /// a.merge(&old_a.update_mutator(key_to_modify, new_value)); + /// put_my_crdt_value(a); + /// ``` + /// + /// Of course in this simple example we could have written simply + /// `put_my_crdt_value(a.update_mutator(key_to_modify, new_value))`, + /// but in the case where the map is a field in a struct for instance (as is always the case), + /// this becomes very handy: + /// + /// ``` + /// let mut a = get_my_crdt_value(); + /// let old_a_map = a.map_field.take_and_clear(); + /// a.map_field.merge(&old_a_map.update_mutator(key_to_modify, new_value)); + /// put_my_crdt_value(a); + /// ``` + pub fn take_and_clear(&mut self) -> Self { + let vals = std::mem::replace(&mut self.vals, vec![]); + Self { vals } + } + /// Removes all values from the map + pub fn clear(&mut self) { + self.vals.clear(); + } + /// Get a reference to the value assigned to a key pub fn get(&self, k: &K) -> Option<&V> { match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) { Ok(i) => Some(&self.vals[i].2), Err(_) => None, } } + /// Gets a reference to all of the items, as a slice. Useful to iterate on all map values. + /// In most cases you will want to ignore the timestamp (second item of the tuple). pub fn items(&self) -> &[(K, u64, V)] { &self.vals[..] } |