diff options
Diffstat (limited to 'src/rpc')
-rw-r--r-- | src/rpc/Cargo.toml | 5 | ||||
-rw-r--r-- | src/rpc/graph_algo.rs | 4 | ||||
-rw-r--r-- | src/rpc/layout.rs | 356 | ||||
-rw-r--r-- | src/rpc/lib.rs | 1 | ||||
-rw-r--r-- | src/rpc/replication_mode.rs | 57 | ||||
-rw-r--r-- | src/rpc/ring.rs | 8 | ||||
-rw-r--r-- | src/rpc/rpc_helper.rs | 18 | ||||
-rw-r--r-- | src/rpc/system.rs | 222 |
8 files changed, 472 insertions, 199 deletions
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index 1b411c6a..3d4d3ff5 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "garage_rpc" -version = "0.8.0" +version = "0.8.1" authors = ["Alex Auvolat <alex@adnab.me>"] edition = "2018" license = "AGPL-3.0" @@ -14,7 +14,7 @@ path = "lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -garage_util = { version = "0.8.0", path = "../util" } +garage_util = { version = "0.8.1", path = "../util" } arc-swap = "1.0" bytes = "1.0" @@ -27,7 +27,6 @@ itertools="0.10" sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" } async-trait = "0.1.7" -rmp-serde = "0.15" serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } serde_bytes = "0.11" serde_json = "1.0" diff --git a/src/rpc/graph_algo.rs b/src/rpc/graph_algo.rs index f181e2ba..65450d64 100644 --- a/src/rpc/graph_algo.rs +++ b/src/rpc/graph_algo.rs @@ -1,5 +1,5 @@ //! This module deals with graph algorithms. -//! It is used in layout.rs to build the partition to node assignation. +//! It is used in layout.rs to build the partition to node assignment. use rand::prelude::SliceRandom; use std::cmp::{max, min}; @@ -177,7 +177,7 @@ impl Graph<FlowEdge> { let flow_upper_bound = self.flow_upper_bound()?; // To ensure the dispersion of the associations generated by the - // assignation, we shuffle the neighbours of the nodes. Hence, + // assignment, we shuffle the neighbours of the nodes. Hence, // the vertices do not consider their neighbours in the same order. self.shuffle_edges(); diff --git a/src/rpc/layout.rs b/src/rpc/layout.rs index 1cef44d1..b6c2fd27 100644 --- a/src/rpc/layout.rs +++ b/src/rpc/layout.rs @@ -5,10 +5,9 @@ use std::collections::HashSet; use bytesize::ByteSize; use itertools::Itertools; -use serde::{Deserialize, Serialize}; - use garage_util::crdt::{AutoCrdt, Crdt, Lww, LwwMap}; use garage_util::data::*; +use garage_util::encode::nonversioned_encode; use garage_util::error::*; use crate::graph_algo::*; @@ -22,75 +21,196 @@ const NB_PARTITIONS: usize = 1usize << PARTITION_BITS; // The Message type will be used to collect information on the algorithm. type Message = Vec<String>; -/// The layout of the cluster, i.e. the list of roles -/// which are assigned to each cluster node -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct ClusterLayout { - pub version: u64, - - pub replication_factor: usize, - - /// This attribute is only used to retain the previously computed partition size, - /// to know to what extent does it change with the layout update. - pub partition_size: u64, - /// Parameters used to compute the assignation currently given by - /// ring_assignation_data - pub parameters: LayoutParameters, - - pub roles: LwwMap<Uuid, NodeRoleV>, - - /// node_id_vec: a vector of node IDs with a role assigned - /// in the system (this includes gateway nodes). - /// The order here is different than the vec stored by `roles`, because: - /// 1. non-gateway nodes are first so that they have lower numbers holding - /// in u8 (the number of non-gateway nodes is at most 256). - /// 2. nodes that don't have a role are excluded (but they need to - /// stay in the CRDT as tombstones) - pub node_id_vec: Vec<Uuid>, - /// the assignation of data partitions to node, the values - /// are indices in node_id_vec - #[serde(with = "serde_bytes")] - pub ring_assignation_data: Vec<CompactNodeType>, - - /// Parameters to be used in the next partition assignation computation. - pub staging_parameters: Lww<LayoutParameters>, - /// Role changes which are staged for the next version of the layout - pub staging_roles: LwwMap<Uuid, NodeRoleV>, - pub staging_hash: Hash, +mod v08 { + use crate::ring::CompactNodeType; + use garage_util::crdt::LwwMap; + use garage_util::data::{Hash, Uuid}; + use serde::{Deserialize, Serialize}; + + /// The layout of the cluster, i.e. the list of roles + /// which are assigned to each cluster node + #[derive(Clone, Debug, Serialize, Deserialize)] + pub struct ClusterLayout { + pub version: u64, + + pub replication_factor: usize, + pub roles: LwwMap<Uuid, NodeRoleV>, + + /// node_id_vec: a vector of node IDs with a role assigned + /// in the system (this includes gateway nodes). + /// The order here is different than the vec stored by `roles`, because: + /// 1. non-gateway nodes are first so that they have lower numbers + /// 2. nodes that don't have a role are excluded (but they need to + /// stay in the CRDT as tombstones) + pub node_id_vec: Vec<Uuid>, + /// the assignation of data partitions to node, the values + /// are indices in node_id_vec + #[serde(with = "serde_bytes")] + pub ring_assignation_data: Vec<CompactNodeType>, + + /// Role changes which are staged for the next version of the layout + pub staging: LwwMap<Uuid, NodeRoleV>, + pub staging_hash: Hash, + } + + #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] + pub struct NodeRoleV(pub Option<NodeRole>); + + /// The user-assigned roles of cluster nodes + #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] + pub struct NodeRole { + /// Datacenter at which this entry belong. This information is used to + /// perform a better geodistribution + pub zone: String, + /// The capacity of the node + /// If this is set to None, the node does not participate in storing data for the system + /// and is only active as an API gateway to other nodes + pub capacity: Option<u64>, + /// A set of tags to recognize the node + pub tags: Vec<String>, + } + + impl garage_util::migrate::InitialFormat for ClusterLayout {} } -/// This struct is used to set the parameters to be used in the assignation computation -/// algorithm. It is stored as a Crdt. -#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] -pub struct LayoutParameters { - pub zone_redundancy: usize, +mod v09 { + use super::v08; + use crate::ring::CompactNodeType; + use garage_util::crdt::{Lww, LwwMap}; + use garage_util::data::{Hash, Uuid}; + use serde::{Deserialize, Serialize}; + pub use v08::{NodeRole, NodeRoleV}; + + /// The layout of the cluster, i.e. the list of roles + /// which are assigned to each cluster node + #[derive(Clone, Debug, Serialize, Deserialize)] + pub struct ClusterLayout { + pub version: u64, + + pub replication_factor: usize, + + /// This attribute is only used to retain the previously computed partition size, + /// to know to what extent does it change with the layout update. + pub partition_size: u64, + /// Parameters used to compute the assignment currently given by + /// ring_assignment_data + pub parameters: LayoutParameters, + + pub roles: LwwMap<Uuid, NodeRoleV>, + + /// see comment in v08::ClusterLayout + pub node_id_vec: Vec<Uuid>, + /// see comment in v08::ClusterLayout + #[serde(with = "serde_bytes")] + pub ring_assignment_data: Vec<CompactNodeType>, + + /// Parameters to be used in the next partition assignment computation. + pub staging_parameters: Lww<LayoutParameters>, + /// Role changes which are staged for the next version of the layout + pub staging_roles: LwwMap<Uuid, NodeRoleV>, + pub staging_hash: Hash, + } + + /// This struct is used to set the parameters to be used in the assignment computation + /// algorithm. It is stored as a Crdt. + #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Serialize, Deserialize)] + pub struct LayoutParameters { + pub zone_redundancy: usize, + } + + impl garage_util::migrate::Migrate for ClusterLayout { + const VERSION_MARKER: &'static [u8] = b"Glayout09"; + + type Previous = v08::ClusterLayout; + + fn migrate(previous: Self::Previous) -> Self { + use itertools::Itertools; + use std::collections::HashSet; + + // In the old layout, capacities are in an arbitrary unit, + // but in the new layout they are in bytes. + // Here we arbitrarily multiply everything by 1G, + // such that 1 old capacity unit = 1GB in the new units. + // This is totally arbitrary and won't work for most users. + let cap_mul = 1024 * 1024 * 1024; + let roles = multiply_all_capacities(previous.roles, cap_mul); + let staging_roles = multiply_all_capacities(previous.staging, cap_mul); + let node_id_vec = previous.node_id_vec; + + // Determine partition size + let mut tmp = previous.ring_assignation_data.clone(); + tmp.sort(); + let partition_size = tmp + .into_iter() + .dedup_with_count() + .map(|(npart, node)| { + roles + .get(&node_id_vec[node as usize]) + .and_then(|p| p.0.as_ref().and_then(|r| r.capacity)) + .unwrap_or(0) / npart as u64 + }) + .min() + .unwrap_or(0); + + // Determine zone redundancy parameter + let zone_redundancy = std::cmp::min( + previous.replication_factor, + roles + .items() + .iter() + .filter_map(|(_, _, r)| r.0.as_ref().map(|p| p.zone.as_str())) + .collect::<HashSet<&str>>() + .len(), + ); + let parameters = LayoutParameters { zone_redundancy }; + + let mut res = Self { + version: previous.version, + replication_factor: previous.replication_factor, + partition_size, + parameters, + roles, + node_id_vec, + ring_assignment_data: previous.ring_assignation_data, + staging_parameters: Lww::new(parameters), + staging_roles, + staging_hash: [0u8; 32].into(), + }; + res.staging_hash = res.calculate_staging_hash(); + res + } + } + + fn multiply_all_capacities( + old_roles: LwwMap<Uuid, NodeRoleV>, + mul: u64, + ) -> LwwMap<Uuid, NodeRoleV> { + let mut new_roles = LwwMap::new(); + for (node, ts, role) in old_roles.items() { + let mut role = role.clone(); + if let NodeRoleV(Some(NodeRole { + capacity: Some(ref mut cap), + .. + })) = role + { + *cap = *cap * mul; + } + new_roles.merge_raw(node, *ts, &role); + } + new_roles + } } +pub use v09::*; + impl AutoCrdt for LayoutParameters { const WARN_IF_DIFFERENT: bool = true; } -#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] -pub struct NodeRoleV(pub Option<NodeRole>); - impl AutoCrdt for NodeRoleV { const WARN_IF_DIFFERENT: bool = true; } -/// The user-assigned roles of cluster nodes -#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] -pub struct NodeRole { - /// Datacenter at which this entry belong. This information is used to - /// perform a better geodistribution - pub zone: String, - /// The capacity of the node - /// If this is set to None, the node does not participate in storing data for the system - /// and is only active as an API gateway to other nodes - pub capacity: Option<u64>, - /// A set of tags to recognize the node - pub tags: Vec<String>, -} - impl NodeRole { pub fn capacity_string(&self) -> String { match self.capacity { @@ -104,7 +224,7 @@ impl NodeRole { } } -// Implementation of the ClusterLayout methods unrelated to the assignation algorithm. +// Implementation of the ClusterLayout methods unrelated to the assignment algorithm. impl ClusterLayout { pub fn new(replication_factor: usize) -> Self { // We set the default zone redundancy to be equal to the replication factor, @@ -122,7 +242,7 @@ impl ClusterLayout { partition_size: 0, roles: LwwMap::new(), node_id_vec: Vec::new(), - ring_assignation_data: Vec::new(), + ring_assignment_data: Vec::new(), parameters, staging_parameters, staging_roles: empty_lwwmap, @@ -134,7 +254,7 @@ impl ClusterLayout { fn calculate_staging_hash(&self) -> Hash { let hashed_tuple = (&self.staging_roles, &self.staging_parameters); - blake2sum(&rmp_to_vec_all_named(&hashed_tuple).unwrap()[..]) + blake2sum(&nonversioned_encode(&hashed_tuple).unwrap()[..]) } pub fn merge(&mut self, other: &ClusterLayout) -> bool { @@ -181,7 +301,7 @@ To know the correct value of the new layout version, invoke `garage layout show` self.staging_roles.clear(); self.staging_hash = self.calculate_staging_hash(); - let msg = self.calculate_partition_assignation()?; + let msg = self.calculate_partition_assignment()?; self.version += 1; @@ -274,7 +394,7 @@ To know the correct value of the new layout version, invoke `garage layout show` for (i, id) in self.node_id_vec.iter().enumerate() { if id == uuid { let mut count = 0; - for nod in self.ring_assignation_data.iter() { + for nod in self.ring_assignment_data.iter() { if i as u8 == *nod { count += 1 } @@ -299,7 +419,7 @@ To know the correct value of the new layout version, invoke `garage layout show` } /// Check a cluster layout for internal consistency - /// (assignation, roles, parameters, partition size) + /// (assignment, roles, parameters, partition size) /// returns true if consistent, false if error pub fn check(&self) -> Result<(), String> { // Check that the hash of the staging data is correct @@ -323,37 +443,37 @@ To know the correct value of the new layout version, invoke `garage layout show` return Err(format!("node_id_vec does not contain the correct set of nodes\nnode_id_vec: {:?}\nexpected: {:?}", node_id_vec, expected_nodes)); } - // Check that the assignation data has the correct length - let expected_assignation_data_len = (1 << PARTITION_BITS) * self.replication_factor; - if self.ring_assignation_data.len() != expected_assignation_data_len { + // Check that the assignment data has the correct length + let expected_assignment_data_len = (1 << PARTITION_BITS) * self.replication_factor; + if self.ring_assignment_data.len() != expected_assignment_data_len { return Err(format!( - "ring_assignation_data has incorrect length {} instead of {}", - self.ring_assignation_data.len(), - expected_assignation_data_len + "ring_assignment_data has incorrect length {} instead of {}", + self.ring_assignment_data.len(), + expected_assignment_data_len )); } // Check that the assigned nodes are correct identifiers // of nodes that are assigned a role // and that role is not the role of a gateway nodes - for x in self.ring_assignation_data.iter() { + for x in self.ring_assignment_data.iter() { if *x as usize >= self.node_id_vec.len() { return Err(format!( - "ring_assignation_data contains invalid node id {}", + "ring_assignment_data contains invalid node id {}", *x )); } let node = self.node_id_vec[*x as usize]; match self.roles.get(&node) { Some(NodeRoleV(Some(x))) if x.capacity.is_some() => (), - _ => return Err("ring_assignation_data contains id of a gateway node".into()), + _ => return Err("ring_assignment_data contains id of a gateway node".into()), } } // Check that every partition is associated to distinct nodes let rf = self.replication_factor; for p in 0..(1 << PARTITION_BITS) { - let nodes_of_p = self.ring_assignation_data[rf * p..rf * (p + 1)].to_vec(); + let nodes_of_p = self.ring_assignment_data[rf * p..rf * (p + 1)].to_vec(); if nodes_of_p.iter().unique().count() != rf { return Err(format!("partition does not contain {} unique node ids", rf)); } @@ -376,7 +496,7 @@ To know the correct value of the new layout version, invoke `garage layout show` // Check that the nodes capacities is consistent with the stored partitions let mut node_usage = vec![0; MAX_NODE_NUMBER]; - for n in self.ring_assignation_data.iter() { + for n in self.ring_assignment_data.iter() { node_usage[*n as usize] += 1; } for (n, usage) in node_usage.iter().enumerate() { @@ -413,21 +533,21 @@ To know the correct value of the new layout version, invoke `garage layout show` } } -// Implementation of the ClusterLayout methods related to the assignation algorithm. +// Implementation of the ClusterLayout methods related to the assignment algorithm. impl ClusterLayout { - /// This function calculates a new partition-to-node assignation. - /// The computed assignation respects the node replication factor + /// This function calculates a new partition-to-node assignment. + /// The computed assignment respects the node replication factor /// and the zone redundancy parameter It maximizes the capacity of a /// partition (assuming all partitions have the same size). - /// Among such optimal assignation, it minimizes the distance to - /// the former assignation (if any) to minimize the amount of + /// Among such optimal assignment, it minimizes the distance to + /// the former assignment (if any) to minimize the amount of /// data to be moved. /// Staged role changes must be merged with nodes roles before calling this function, /// hence it must only be called from apply_staged_changes() and hence is not public. - fn calculate_partition_assignation(&mut self) -> Result<Message, Error> { + fn calculate_partition_assignment(&mut self) -> Result<Message, Error> { // We update the node ids, since the node role list might have changed with the - // changes in the layout. We retrieve the old_assignation reframed with new ids - let old_assignation_opt = self.update_node_id_vec()?; + // changes in the layout. We retrieve the old_assignment reframed with new ids + let old_assignment_opt = self.update_node_id_vec()?; let mut msg = Message::new(); msg.push("==== COMPUTATION OF A NEW PARTITION ASSIGNATION ====".into()); @@ -465,7 +585,7 @@ impl ClusterLayout { // optimality. let partition_size = self.compute_optimal_partition_size(&zone_to_id)?; - if old_assignation_opt != None { + if old_assignment_opt != None { msg.push(format!( "Optimal size of a partition: {} (was {} in the previous layout).", ByteSize::b(partition_size).to_string_as(false), @@ -488,16 +608,16 @@ impl ClusterLayout { ); } - // We compute a first flow/assignation that is heuristically close to the previous - // assignation - let mut gflow = self.compute_candidate_assignation(&zone_to_id, &old_assignation_opt)?; - if let Some(assoc) = &old_assignation_opt { - // We minimize the distance to the previous assignation. + // We compute a first flow/assignment that is heuristically close to the previous + // assignment + let mut gflow = self.compute_candidate_assignment(&zone_to_id, &old_assignment_opt)?; + if let Some(assoc) = &old_assignment_opt { + // We minimize the distance to the previous assignment. self.minimize_rebalance_load(&mut gflow, &zone_to_id, assoc)?; } // We display statistics of the computation - msg.extend(self.output_stat(&gflow, &old_assignation_opt, &zone_to_id, &id_to_zone)?); + msg.extend(self.output_stat(&gflow, &old_assignment_opt, &zone_to_id, &id_to_zone)?); msg.push("".to_string()); // We update the layout structure @@ -513,10 +633,10 @@ impl ClusterLayout { } /// The LwwMap of node roles might have changed. This function updates the node_id_vec - /// and returns the assignation given by ring, with the new indices of the nodes, and + /// and returns the assignment given by ring, with the new indices of the nodes, and /// None if the node is not present anymore. - /// We work with the assumption that only this function and calculate_new_assignation - /// do modify assignation_ring and node_id_vec. + /// We work with the assumption that only this function and calculate_new_assignment + /// do modify assignment_ring and node_id_vec. fn update_node_id_vec(&mut self) -> Result<Option<Vec<Vec<usize>>>, Error> { // (1) We compute the new node list // Non gateway nodes should be coded on 8bits, hence they must be first in the list @@ -554,15 +674,15 @@ impl ClusterLayout { // (2) We retrieve the old association // We rewrite the old association with the new indices. We only consider partition - // to node assignations where the node is still in use. - if self.ring_assignation_data.is_empty() { + // to node assignments where the node is still in use. + if self.ring_assignment_data.is_empty() { // This is a new association return Ok(None); } - if self.ring_assignation_data.len() != NB_PARTITIONS * self.replication_factor { + if self.ring_assignment_data.len() != NB_PARTITIONS * self.replication_factor { return Err(Error::Message( - "The old assignation does not have a size corresponding to \ + "The old assignment does not have a size corresponding to \ the old replication factor or the number of partitions." .into(), )); @@ -577,11 +697,11 @@ impl ClusterLayout { uuid_to_new_id.insert(*uuid, i); } - let mut old_assignation = vec![Vec::<usize>::new(); NB_PARTITIONS]; + let mut old_assignment = vec![Vec::<usize>::new(); NB_PARTITIONS]; let rf = self.replication_factor; - for (p, old_assign_p) in old_assignation.iter_mut().enumerate() { - for old_id in &self.ring_assignation_data[p * rf..(p + 1) * rf] { + for (p, old_assign_p) in old_assignment.iter_mut().enumerate() { + for old_id in &self.ring_assignment_data[p * rf..(p + 1) * rf] { let uuid = old_node_id_vec[*old_id as usize]; if uuid_to_new_id.contains_key(&uuid) { old_assign_p.push(uuid_to_new_id[&uuid]); @@ -590,9 +710,9 @@ impl ClusterLayout { } // We write the ring - self.ring_assignation_data = Vec::<CompactNodeType>::new(); + self.ring_assignment_data = Vec::<CompactNodeType>::new(); - Ok(Some(old_assignation)) + Ok(Some(old_assignment)) } /// This function generates ids for the zone of the nodes appearing in @@ -659,11 +779,11 @@ impl ClusterLayout { } /// Generates the graph to compute the maximal flow corresponding to the optimal - /// partition assignation. + /// partition assignment. /// exclude_assoc is the set of (partition, node) association that we are forbidden /// to use (hence we do not add the corresponding edge to the graph). This parameter /// is used to compute a first flow that uses only edges appearing in the previous - /// assignation. This produces a solution that heuristically should be close to the + /// assignment. This produces a solution that heuristically should be close to the /// previous one. fn generate_flow_graph( &self, @@ -705,14 +825,14 @@ impl ClusterLayout { Ok(g) } - /// This function computes a first optimal assignation (in the form of a flow graph). - fn compute_candidate_assignation( + /// This function computes a first optimal assignment (in the form of a flow graph). + fn compute_candidate_assignment( &self, zone_to_id: &HashMap<String, usize>, prev_assign_opt: &Option<Vec<Vec<usize>>>, ) -> Result<Graph<FlowEdge>, Error> { // We list the (partition,node) associations that are not used in the - // previous assignation + // previous assignment let mut exclude_edge = HashSet::<(usize, usize)>::new(); if let Some(prev_assign) = prev_assign_opt { let nb_nodes = self.nongateway_nodes().len(); @@ -726,7 +846,7 @@ impl ClusterLayout { } } - // We compute the best flow using only the edges used in the previous assignation + // We compute the best flow using only the edges used in the previous assignment let mut g = self.generate_flow_graph(self.partition_size, zone_to_id, &exclude_edge)?; g.compute_maximal_flow()?; @@ -742,7 +862,7 @@ impl ClusterLayout { } /// This function updates the flow graph gflow to minimize the distance between - /// its corresponding assignation and the previous one + /// its corresponding assignment and the previous one fn minimize_rebalance_load( &self, gflow: &mut Graph<FlowEdge>, @@ -750,7 +870,7 @@ impl ClusterLayout { prev_assign: &[Vec<usize>], ) -> Result<(), Error> { // We define a cost function on the edges (pairs of vertices) corresponding - // to the distance between the two assignations. + // to the distance between the two assignments. let mut cost = CostFunction::new(); for (p, assoc_p) in prev_assign.iter().enumerate() { for n in assoc_p.iter() { @@ -769,25 +889,25 @@ impl ClusterLayout { Ok(()) } - /// This function updates the assignation ring from the flow graph. + /// This function updates the assignment ring from the flow graph. fn update_ring_from_flow( &mut self, nb_zones: usize, gflow: &Graph<FlowEdge>, ) -> Result<(), Error> { - self.ring_assignation_data = Vec::<CompactNodeType>::new(); + self.ring_assignment_data = Vec::<CompactNodeType>::new(); for p in 0..NB_PARTITIONS { for z in 0..nb_zones { let assoc_vertex = gflow.get_positive_flow_from(Vertex::PZ(p, z))?; for vertex in assoc_vertex.iter() { if let Vertex::N(n) = vertex { - self.ring_assignation_data.push((*n).try_into().unwrap()); + self.ring_assignment_data.push((*n).try_into().unwrap()); } } } } - if self.ring_assignation_data.len() != NB_PARTITIONS * self.replication_factor { + if self.ring_assignment_data.len() != NB_PARTITIONS * self.replication_factor { return Err(Error::Message( "Critical Error : the association ring we produced does not \ have the right size." @@ -798,7 +918,7 @@ impl ClusterLayout { } /// This function returns a message summing up the partition repartition of the new - /// layout, and other statistics of the partition assignation computation. + /// layout, and other statistics of the partition assignment computation. fn output_stat( &self, gflow: &Graph<FlowEdge>, @@ -958,7 +1078,7 @@ mod tests { // This function checks that the partition size S computed is at least better than the // one given by a very naive algorithm. To do so, we try to run the naive algorithm - // assuming a partion size of S+1. If we succed, it means that the optimal assignation + // assuming a partion size of S+1. If we succed, it means that the optimal assignment // was not optimal. The naive algorithm is the following : // - we compute the max number of partitions associated to every node, capped at the // partition number. It gives the number of tokens of every node. @@ -1063,7 +1183,7 @@ mod tests { } #[test] - fn test_assignation() { + fn test_assignment() { let mut node_id_vec = vec![1, 2, 3]; let mut node_capacity_vec = vec![4000, 1000, 2000]; let mut node_zone_vec = vec!["A", "B", "C"] diff --git a/src/rpc/lib.rs b/src/rpc/lib.rs index 248b9b52..f734942d 100644 --- a/src/rpc/lib.rs +++ b/src/rpc/lib.rs @@ -10,6 +10,7 @@ mod kubernetes; pub mod graph_algo; pub mod layout; +pub mod replication_mode; pub mod ring; pub mod system; diff --git a/src/rpc/replication_mode.rs b/src/rpc/replication_mode.rs new file mode 100644 index 00000000..e244e063 --- /dev/null +++ b/src/rpc/replication_mode.rs @@ -0,0 +1,57 @@ +#[derive(Clone, Copy)] +pub enum ReplicationMode { + None, + TwoWay, + TwoWayDangerous, + ThreeWay, + ThreeWayDegraded, + ThreeWayDangerous, +} + +impl ReplicationMode { + pub fn parse(v: &str) -> Option<Self> { + match v { + "none" | "1" => Some(Self::None), + "2" => Some(Self::TwoWay), + "2-dangerous" => Some(Self::TwoWayDangerous), + "3" => Some(Self::ThreeWay), + "3-degraded" => Some(Self::ThreeWayDegraded), + "3-dangerous" => Some(Self::ThreeWayDangerous), + _ => None, + } + } + + pub fn control_write_max_faults(&self) -> usize { + match self { + Self::None => 0, + _ => 1, + } + } + + pub fn replication_factor(&self) -> usize { + match self { + Self::None => 1, + Self::TwoWay | Self::TwoWayDangerous => 2, + Self::ThreeWay | Self::ThreeWayDegraded | Self::ThreeWayDangerous => 3, + } + } + + pub fn read_quorum(&self) -> usize { + match self { + Self::None => 1, + Self::TwoWay | Self::TwoWayDangerous => 1, + Self::ThreeWay => 2, + Self::ThreeWayDegraded | Self::ThreeWayDangerous => 1, + } + } + + pub fn write_quorum(&self) -> usize { + match self { + Self::None => 1, + Self::TwoWay => 2, + Self::TwoWayDangerous => 1, + Self::ThreeWay | Self::ThreeWayDegraded => 2, + Self::ThreeWayDangerous => 1, + } + } +} diff --git a/src/rpc/ring.rs b/src/rpc/ring.rs index 743a5cba..6a2e5c72 100644 --- a/src/rpc/ring.rs +++ b/src/rpc/ring.rs @@ -63,12 +63,12 @@ struct RingEntry { impl Ring { pub(crate) fn new(layout: ClusterLayout, replication_factor: usize) -> Self { if replication_factor != layout.replication_factor { - warn!("Could not build ring: replication factor does not match between local configuration and network role assignation."); + warn!("Could not build ring: replication factor does not match between local configuration and network role assignment."); return Self::empty(layout, replication_factor); } - if layout.ring_assignation_data.len() != replication_factor * (1 << PARTITION_BITS) { - warn!("Could not build ring: network role assignation data has invalid length"); + if layout.ring_assignment_data.len() != replication_factor * (1 << PARTITION_BITS) { + warn!("Could not build ring: network role assignment data has invalid length"); return Self::empty(layout, replication_factor); } @@ -78,7 +78,7 @@ impl Ring { let top = (i as u16) << (16 - PARTITION_BITS); let mut nodes_buf = [0u8; MAX_REPLICATION]; nodes_buf[..replication_factor].copy_from_slice( - &layout.ring_assignation_data + &layout.ring_assignment_data [replication_factor * i..replication_factor * (i + 1)], ); RingEntry { diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index 949aced6..1ec250c3 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -5,7 +5,6 @@ use std::time::Duration; use futures::future::join_all; use futures::stream::futures_unordered::FuturesUnordered; use futures::stream::StreamExt; -use futures_util::future::FutureExt; use tokio::select; use tokio::sync::watch; @@ -24,7 +23,6 @@ pub use netapp::message::{ use netapp::peering::fullmesh::FullMeshPeeringStrategy; pub use netapp::{self, NetApp, NodeID}; -use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_util::error::Error; use garage_util::metrics::RecordDuration; @@ -94,7 +92,6 @@ pub struct RpcHelper(Arc<RpcHelperInner>); struct RpcHelperInner { our_node_id: Uuid, fullmesh: Arc<FullMeshPeeringStrategy>, - background: Arc<BackgroundRunner>, ring: watch::Receiver<Arc<Ring>>, metrics: RpcMetrics, rpc_timeout: Duration, @@ -104,7 +101,6 @@ impl RpcHelper { pub(crate) fn new( our_node_id: Uuid, fullmesh: Arc<FullMeshPeeringStrategy>, - background: Arc<BackgroundRunner>, ring: watch::Receiver<Arc<Ring>>, rpc_timeout: Option<Duration>, ) -> Self { @@ -113,7 +109,6 @@ impl RpcHelper { Self(Arc::new(RpcHelperInner { our_node_id, fullmesh, - background, ring, metrics, rpc_timeout: rpc_timeout.unwrap_or(DEFAULT_TIMEOUT), @@ -377,16 +372,13 @@ impl RpcHelper { if !resp_stream.is_empty() { // Continue remaining requests in background. - // Continue the remaining requests immediately using tokio::spawn - // but enqueue a task in the background runner - // to ensure that the process won't exit until the requests are done - // (if we had just enqueued the resp_stream.collect directly in the background runner, - // the requests might have been put on hold in the background runner's queue, - // in which case they might timeout or otherwise fail) - let wait_finished_fut = tokio::spawn(async move { + // Note: these requests can get interrupted on process shutdown, + // we must not count on them being executed for certain. + // For all background things that have to happen with certainty, + // they have to be put in a proper queue that is persisted to disk. + tokio::spawn(async move { resp_stream.collect::<Vec<Result<_, _>>>().await; }); - self.0.background.spawn(wait_finished_fut.map(|_| Ok(()))); } } diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 224fbabb..1f4d86e7 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -21,7 +21,6 @@ use netapp::peering::fullmesh::FullMeshPeeringStrategy; use netapp::util::parse_and_resolve_peer_addr_async; use netapp::{NetApp, NetworkKey, NodeID, NodeKey}; -use garage_util::background::BackgroundRunner; use garage_util::config::Config; #[cfg(feature = "kubernetes-discovery")] use garage_util::config::KubernetesDiscoveryConfig; @@ -35,6 +34,7 @@ use crate::consul::ConsulDiscovery; #[cfg(feature = "kubernetes-discovery")] use crate::kubernetes::*; use crate::layout::*; +use crate::replication_mode::*; use crate::ring::*; use crate::rpc_helper::*; @@ -49,8 +49,6 @@ pub const GARAGE_VERSION_TAG: u64 = 0x6761726167650008; // garage 0x0008 /// RPC endpoint used for calls related to membership pub const SYSTEM_RPC_PATH: &str = "garage_rpc/membership.rs/SystemRpc"; -pub const CONNECT_ERROR_MESSAGE: &str = "Error establishing RPC connection to remote node. This can happen if the remote node is not reachable on the network, but also if the two nodes are not configured with the same rpc_secret"; - /// RPC messages related to membership #[derive(Debug, Serialize, Deserialize, Clone)] pub enum SystemRpc { @@ -75,13 +73,17 @@ impl Rpc for SystemRpc { type Response = Result<SystemRpc, Error>; } +#[derive(Serialize, Deserialize)] +pub struct PeerList(Vec<(Uuid, SocketAddr)>); +impl garage_util::migrate::InitialFormat for PeerList {} + /// This node's membership manager pub struct System { /// The id of this node pub id: Uuid, persist_cluster_layout: Persister<ClusterLayout>, - persist_peer_list: Persister<Vec<(Uuid, SocketAddr)>>, + persist_peer_list: Persister<PeerList>, local_status: ArcSwap<NodeStatus>, node_status: RwLock<HashMap<Uuid, (u64, NodeStatus)>>, @@ -102,15 +104,13 @@ pub struct System { #[cfg(feature = "kubernetes-discovery")] kubernetes_discovery: Option<KubernetesDiscoveryConfig>, + replication_mode: ReplicationMode, replication_factor: usize, /// The ring pub ring: watch::Receiver<Arc<Ring>>, update_ring: Mutex<watch::Sender<Arc<Ring>>>, - /// The job runner of this node - pub background: Arc<BackgroundRunner>, - /// Path to metadata directory pub metadata_dir: PathBuf, } @@ -136,6 +136,37 @@ pub struct KnownNodeInfo { pub status: NodeStatus, } +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +pub struct ClusterHealth { + /// The current health status of the cluster (see below) + pub status: ClusterHealthStatus, + /// Number of nodes already seen once in the cluster + pub known_nodes: usize, + /// Number of nodes currently connected + pub connected_nodes: usize, + /// Number of storage nodes declared in the current layout + pub storage_nodes: usize, + /// Number of storage nodes currently connected + pub storage_nodes_ok: usize, + /// Number of partitions in the layout + pub partitions: usize, + /// Number of partitions for which we have a quorum of connected nodes + pub partitions_quorum: usize, + /// Number of partitions for which all storage nodes are connected + pub partitions_all_ok: usize, +} + +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +pub enum ClusterHealthStatus { + /// All nodes are available + Healthy, + /// Some storage nodes are unavailable, but quorum is stil + /// achieved for all partitions + Degraded, + /// Quorum is not available for some partitions + Unavailable, +} + pub fn read_node_id(metadata_dir: &Path) -> Result<NodeID, Error> { let mut pubkey_file = metadata_dir.to_path_buf(); pubkey_file.push("node_key.pub"); @@ -199,10 +230,11 @@ impl System { /// Create this node's membership manager pub fn new( network_key: NetworkKey, - background: Arc<BackgroundRunner>, - replication_factor: usize, + replication_mode: ReplicationMode, config: &Config, ) -> Result<Arc<Self>, Error> { + let replication_factor = replication_mode.replication_factor(); + let node_key = gen_node_key(&config.metadata_dir).expect("Unable to read or generate node ID"); info!( @@ -319,11 +351,11 @@ impl System { rpc: RpcHelper::new( netapp.id.into(), fullmesh, - background.clone(), ring.clone(), config.rpc_timeout_msec.map(Duration::from_millis), ), system_endpoint, + replication_mode, replication_factor, rpc_listen_addr: config.rpc_bind_addr, #[cfg(any(feature = "consul-discovery", feature = "kubernetes-discovery"))] @@ -336,7 +368,6 @@ impl System { ring, update_ring: Mutex::new(update_ring), - background, metadata_dir: config.metadata_dir.clone(), }); sys.system_endpoint.set_handler(sys.clone()); @@ -408,17 +439,14 @@ impl System { )) })?; let mut errors = vec![]; - for ip in addrs.iter() { - match self - .netapp - .clone() - .try_connect(*ip, pubkey) - .await - .err_context(CONNECT_ERROR_MESSAGE) - { + for addr in addrs.iter() { + match self.netapp.clone().try_connect(*addr, pubkey).await { Ok(()) => return Ok(()), Err(e) => { - errors.push((*ip, e)); + errors.push(( + *addr, + Error::Message(connect_error_message(*addr, pubkey, e)), + )); } } } @@ -429,59 +457,125 @@ impl System { } } + pub fn health(&self) -> ClusterHealth { + let ring: Arc<_> = self.ring.borrow().clone(); + let quorum = self.replication_mode.write_quorum(); + let replication_factor = self.replication_factor; + + let nodes = self + .get_known_nodes() + .into_iter() + .map(|n| (n.id, n)) + .collect::<HashMap<Uuid, _>>(); + let connected_nodes = nodes.iter().filter(|(_, n)| n.is_up).count(); + + let storage_nodes = ring + .layout + .roles + .items() + .iter() + .filter(|(_, _, v)| matches!(v, NodeRoleV(Some(r)) if r.capacity.is_some())) + .collect::<Vec<_>>(); + let storage_nodes_ok = storage_nodes + .iter() + .filter(|(x, _, _)| nodes.get(x).map(|n| n.is_up).unwrap_or(false)) + .count(); + + let partitions = ring.partitions(); + let partitions_n_up = partitions + .iter() + .map(|(_, h)| { + let pn = ring.get_nodes(h, ring.replication_factor); + pn.iter() + .filter(|x| nodes.get(x).map(|n| n.is_up).unwrap_or(false)) + .count() + }) + .collect::<Vec<usize>>(); + let partitions_all_ok = partitions_n_up + .iter() + .filter(|c| **c == replication_factor) + .count(); + let partitions_quorum = partitions_n_up.iter().filter(|c| **c >= quorum).count(); + + let status = + if partitions_quorum == partitions.len() && storage_nodes_ok == storage_nodes.len() { + ClusterHealthStatus::Healthy + } else if partitions_quorum == partitions.len() { + ClusterHealthStatus::Degraded + } else { + ClusterHealthStatus::Unavailable + }; + + ClusterHealth { + status, + known_nodes: nodes.len(), + connected_nodes, + storage_nodes: storage_nodes.len(), + storage_nodes_ok, + partitions: partitions.len(), + partitions_quorum, + partitions_all_ok, + } + } + // ---- INTERNALS ---- #[cfg(feature = "consul-discovery")] - async fn advertise_to_consul(self: Arc<Self>) -> Result<(), Error> { + async fn advertise_to_consul(self: Arc<Self>) { let c = match &self.consul_discovery { Some(c) => c, - _ => return Ok(()), + _ => return, }; let rpc_public_addr = match self.rpc_public_addr { Some(addr) => addr, None => { warn!("Not advertising to Consul because rpc_public_addr is not defined in config file and could not be autodetected."); - return Ok(()); + return; } }; - c.publish_consul_service( - self.netapp.id, - &self.local_status.load_full().hostname, - rpc_public_addr, - ) - .await - .err_context("Error while publishing Consul service") + if let Err(e) = c + .publish_consul_service( + self.netapp.id, + &self.local_status.load_full().hostname, + rpc_public_addr, + ) + .await + { + error!("Error while publishing Consul service: {}", e); + } } #[cfg(feature = "kubernetes-discovery")] - async fn advertise_to_kubernetes(self: Arc<Self>) -> Result<(), Error> { + async fn advertise_to_kubernetes(self: Arc<Self>) { let k = match &self.kubernetes_discovery { Some(k) => k, - _ => return Ok(()), + _ => return, }; let rpc_public_addr = match self.rpc_public_addr { Some(addr) => addr, None => { warn!("Not advertising to Kubernetes because rpc_public_addr is not defined in config file and could not be autodetected."); - return Ok(()); + return; } }; - publish_kubernetes_node( + if let Err(e) = publish_kubernetes_node( k, self.netapp.id, &self.local_status.load_full().hostname, rpc_public_addr, ) .await - .err_context("Error while publishing node to kubernetes") + { + error!("Error while publishing node to Kubernetes: {}", e); + } } /// Save network configuration to disc - async fn save_cluster_layout(self: Arc<Self>) -> Result<(), Error> { + async fn save_cluster_layout(&self) -> Result<(), Error> { let ring: Arc<Ring> = self.ring.borrow().clone(); self.persist_cluster_layout .save_async(&ring.layout) @@ -533,11 +627,7 @@ impl System { if info.cluster_layout_version > local_info.cluster_layout_version || info.cluster_layout_staging_hash != local_info.cluster_layout_staging_hash { - let self2 = self.clone(); - self.background.spawn_cancellable(async move { - self2.pull_cluster_layout(from).await; - Ok(()) - }); + tokio::spawn(self.clone().pull_cluster_layout(from)); } self.node_status @@ -579,18 +669,21 @@ impl System { drop(update_ring); let self2 = self.clone(); - self.background.spawn_cancellable(async move { - self2 + tokio::spawn(async move { + if let Err(e) = self2 .rpc .broadcast( &self2.system_endpoint, SystemRpc::AdvertiseClusterLayout(layout), RequestStrategy::with_priority(PRIO_HIGH), ) - .await?; - Ok(()) + .await + { + warn!("Error while broadcasting new cluster layout: {}", e); + } }); - self.background.spawn(self.clone().save_cluster_layout()); + + self.save_cluster_layout().await?; } Ok(SystemRpc::Ok) @@ -637,7 +730,7 @@ impl System { // Add peer list from list stored on disk if let Ok(peers) = self.persist_peer_list.load_async().await { - ping_list.extend(peers.iter().map(|(id, addr)| ((*id).into(), *addr))) + ping_list.extend(peers.0.iter().map(|(id, addr)| ((*id).into(), *addr))) } // Fetch peer list from Consul @@ -676,12 +769,12 @@ impl System { } for (node_id, node_addr) in ping_list { - tokio::spawn( - self.netapp - .clone() - .try_connect(node_addr, node_id) - .map(|r| r.err_context(CONNECT_ERROR_MESSAGE)), - ); + let self2 = self.clone(); + tokio::spawn(async move { + if let Err(e) = self2.netapp.clone().try_connect(node_addr, node_id).await { + error!("{}", connect_error_message(node_addr, node_id, e)); + } + }); } } @@ -690,11 +783,10 @@ impl System { } #[cfg(feature = "consul-discovery")] - self.background.spawn(self.clone().advertise_to_consul()); + tokio::spawn(self.clone().advertise_to_consul()); #[cfg(feature = "kubernetes-discovery")] - self.background - .spawn(self.clone().advertise_to_kubernetes()); + tokio::spawn(self.clone().advertise_to_kubernetes()); let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL); select! { @@ -718,12 +810,16 @@ impl System { // and append it to the list we are about to save, // so that no peer ID gets lost in the process. if let Ok(mut prev_peer_list) = self.persist_peer_list.load_async().await { - prev_peer_list.retain(|(id, _ip)| peer_list.iter().all(|(id2, _ip2)| id2 != id)); - peer_list.extend(prev_peer_list); + prev_peer_list + .0 + .retain(|(id, _ip)| peer_list.iter().all(|(id2, _ip2)| id2 != id)); + peer_list.extend(prev_peer_list.0); } // Save new peer list to file - self.persist_peer_list.save_async(&peer_list).await + self.persist_peer_list + .save_async(&PeerList(peer_list)) + .await } async fn pull_cluster_layout(self: Arc<Self>, peer: Uuid) { @@ -784,3 +880,11 @@ async fn resolve_peers(peers: &[String]) -> Vec<(NodeID, SocketAddr)> { ret } + +fn connect_error_message( + addr: SocketAddr, + pubkey: ed25519::PublicKey, + e: netapp::error::Error, +) -> String { + format!("Error establishing RPC connection to remote node: {}@{}.\nThis can happen if the remote node is not reachable on the network, but also if the two nodes are not configured with the same rpc_secret.\n{}", hex::encode(pubkey), addr, e) +} |