diff options
Diffstat (limited to 'src/rpc/layout.rs')
-rw-r--r-- | src/rpc/layout.rs | 356 |
1 files changed, 238 insertions, 118 deletions
diff --git a/src/rpc/layout.rs b/src/rpc/layout.rs index 1cef44d1..b6c2fd27 100644 --- a/src/rpc/layout.rs +++ b/src/rpc/layout.rs @@ -5,10 +5,9 @@ use std::collections::HashSet; use bytesize::ByteSize; use itertools::Itertools; -use serde::{Deserialize, Serialize}; - use garage_util::crdt::{AutoCrdt, Crdt, Lww, LwwMap}; use garage_util::data::*; +use garage_util::encode::nonversioned_encode; use garage_util::error::*; use crate::graph_algo::*; @@ -22,75 +21,196 @@ const NB_PARTITIONS: usize = 1usize << PARTITION_BITS; // The Message type will be used to collect information on the algorithm. type Message = Vec<String>; -/// The layout of the cluster, i.e. the list of roles -/// which are assigned to each cluster node -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct ClusterLayout { - pub version: u64, - - pub replication_factor: usize, - - /// This attribute is only used to retain the previously computed partition size, - /// to know to what extent does it change with the layout update. - pub partition_size: u64, - /// Parameters used to compute the assignation currently given by - /// ring_assignation_data - pub parameters: LayoutParameters, - - pub roles: LwwMap<Uuid, NodeRoleV>, - - /// node_id_vec: a vector of node IDs with a role assigned - /// in the system (this includes gateway nodes). - /// The order here is different than the vec stored by `roles`, because: - /// 1. non-gateway nodes are first so that they have lower numbers holding - /// in u8 (the number of non-gateway nodes is at most 256). - /// 2. nodes that don't have a role are excluded (but they need to - /// stay in the CRDT as tombstones) - pub node_id_vec: Vec<Uuid>, - /// the assignation of data partitions to node, the values - /// are indices in node_id_vec - #[serde(with = "serde_bytes")] - pub ring_assignation_data: Vec<CompactNodeType>, - - /// Parameters to be used in the next partition assignation computation. - pub staging_parameters: Lww<LayoutParameters>, - /// Role changes which are staged for the next version of the layout - pub staging_roles: LwwMap<Uuid, NodeRoleV>, - pub staging_hash: Hash, +mod v08 { + use crate::ring::CompactNodeType; + use garage_util::crdt::LwwMap; + use garage_util::data::{Hash, Uuid}; + use serde::{Deserialize, Serialize}; + + /// The layout of the cluster, i.e. the list of roles + /// which are assigned to each cluster node + #[derive(Clone, Debug, Serialize, Deserialize)] + pub struct ClusterLayout { + pub version: u64, + + pub replication_factor: usize, + pub roles: LwwMap<Uuid, NodeRoleV>, + + /// node_id_vec: a vector of node IDs with a role assigned + /// in the system (this includes gateway nodes). + /// The order here is different than the vec stored by `roles`, because: + /// 1. non-gateway nodes are first so that they have lower numbers + /// 2. nodes that don't have a role are excluded (but they need to + /// stay in the CRDT as tombstones) + pub node_id_vec: Vec<Uuid>, + /// the assignation of data partitions to node, the values + /// are indices in node_id_vec + #[serde(with = "serde_bytes")] + pub ring_assignation_data: Vec<CompactNodeType>, + + /// Role changes which are staged for the next version of the layout + pub staging: LwwMap<Uuid, NodeRoleV>, + pub staging_hash: Hash, + } + + #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] + pub struct NodeRoleV(pub Option<NodeRole>); + + /// The user-assigned roles of cluster nodes + #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] + pub struct NodeRole { + /// Datacenter at which this entry belong. This information is used to + /// perform a better geodistribution + pub zone: String, + /// The capacity of the node + /// If this is set to None, the node does not participate in storing data for the system + /// and is only active as an API gateway to other nodes + pub capacity: Option<u64>, + /// A set of tags to recognize the node + pub tags: Vec<String>, + } + + impl garage_util::migrate::InitialFormat for ClusterLayout {} } -/// This struct is used to set the parameters to be used in the assignation computation -/// algorithm. It is stored as a Crdt. -#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] -pub struct LayoutParameters { - pub zone_redundancy: usize, +mod v09 { + use super::v08; + use crate::ring::CompactNodeType; + use garage_util::crdt::{Lww, LwwMap}; + use garage_util::data::{Hash, Uuid}; + use serde::{Deserialize, Serialize}; + pub use v08::{NodeRole, NodeRoleV}; + + /// The layout of the cluster, i.e. the list of roles + /// which are assigned to each cluster node + #[derive(Clone, Debug, Serialize, Deserialize)] + pub struct ClusterLayout { + pub version: u64, + + pub replication_factor: usize, + + /// This attribute is only used to retain the previously computed partition size, + /// to know to what extent does it change with the layout update. + pub partition_size: u64, + /// Parameters used to compute the assignment currently given by + /// ring_assignment_data + pub parameters: LayoutParameters, + + pub roles: LwwMap<Uuid, NodeRoleV>, + + /// see comment in v08::ClusterLayout + pub node_id_vec: Vec<Uuid>, + /// see comment in v08::ClusterLayout + #[serde(with = "serde_bytes")] + pub ring_assignment_data: Vec<CompactNodeType>, + + /// Parameters to be used in the next partition assignment computation. + pub staging_parameters: Lww<LayoutParameters>, + /// Role changes which are staged for the next version of the layout + pub staging_roles: LwwMap<Uuid, NodeRoleV>, + pub staging_hash: Hash, + } + + /// This struct is used to set the parameters to be used in the assignment computation + /// algorithm. It is stored as a Crdt. + #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Serialize, Deserialize)] + pub struct LayoutParameters { + pub zone_redundancy: usize, + } + + impl garage_util::migrate::Migrate for ClusterLayout { + const VERSION_MARKER: &'static [u8] = b"Glayout09"; + + type Previous = v08::ClusterLayout; + + fn migrate(previous: Self::Previous) -> Self { + use itertools::Itertools; + use std::collections::HashSet; + + // In the old layout, capacities are in an arbitrary unit, + // but in the new layout they are in bytes. + // Here we arbitrarily multiply everything by 1G, + // such that 1 old capacity unit = 1GB in the new units. + // This is totally arbitrary and won't work for most users. + let cap_mul = 1024 * 1024 * 1024; + let roles = multiply_all_capacities(previous.roles, cap_mul); + let staging_roles = multiply_all_capacities(previous.staging, cap_mul); + let node_id_vec = previous.node_id_vec; + + // Determine partition size + let mut tmp = previous.ring_assignation_data.clone(); + tmp.sort(); + let partition_size = tmp + .into_iter() + .dedup_with_count() + .map(|(npart, node)| { + roles + .get(&node_id_vec[node as usize]) + .and_then(|p| p.0.as_ref().and_then(|r| r.capacity)) + .unwrap_or(0) / npart as u64 + }) + .min() + .unwrap_or(0); + + // Determine zone redundancy parameter + let zone_redundancy = std::cmp::min( + previous.replication_factor, + roles + .items() + .iter() + .filter_map(|(_, _, r)| r.0.as_ref().map(|p| p.zone.as_str())) + .collect::<HashSet<&str>>() + .len(), + ); + let parameters = LayoutParameters { zone_redundancy }; + + let mut res = Self { + version: previous.version, + replication_factor: previous.replication_factor, + partition_size, + parameters, + roles, + node_id_vec, + ring_assignment_data: previous.ring_assignation_data, + staging_parameters: Lww::new(parameters), + staging_roles, + staging_hash: [0u8; 32].into(), + }; + res.staging_hash = res.calculate_staging_hash(); + res + } + } + + fn multiply_all_capacities( + old_roles: LwwMap<Uuid, NodeRoleV>, + mul: u64, + ) -> LwwMap<Uuid, NodeRoleV> { + let mut new_roles = LwwMap::new(); + for (node, ts, role) in old_roles.items() { + let mut role = role.clone(); + if let NodeRoleV(Some(NodeRole { + capacity: Some(ref mut cap), + .. + })) = role + { + *cap = *cap * mul; + } + new_roles.merge_raw(node, *ts, &role); + } + new_roles + } } +pub use v09::*; + impl AutoCrdt for LayoutParameters { const WARN_IF_DIFFERENT: bool = true; } -#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] -pub struct NodeRoleV(pub Option<NodeRole>); - impl AutoCrdt for NodeRoleV { const WARN_IF_DIFFERENT: bool = true; } -/// The user-assigned roles of cluster nodes -#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] -pub struct NodeRole { - /// Datacenter at which this entry belong. This information is used to - /// perform a better geodistribution - pub zone: String, - /// The capacity of the node - /// If this is set to None, the node does not participate in storing data for the system - /// and is only active as an API gateway to other nodes - pub capacity: Option<u64>, - /// A set of tags to recognize the node - pub tags: Vec<String>, -} - impl NodeRole { pub fn capacity_string(&self) -> String { match self.capacity { @@ -104,7 +224,7 @@ impl NodeRole { } } -// Implementation of the ClusterLayout methods unrelated to the assignation algorithm. +// Implementation of the ClusterLayout methods unrelated to the assignment algorithm. impl ClusterLayout { pub fn new(replication_factor: usize) -> Self { // We set the default zone redundancy to be equal to the replication factor, @@ -122,7 +242,7 @@ impl ClusterLayout { partition_size: 0, roles: LwwMap::new(), node_id_vec: Vec::new(), - ring_assignation_data: Vec::new(), + ring_assignment_data: Vec::new(), parameters, staging_parameters, staging_roles: empty_lwwmap, @@ -134,7 +254,7 @@ impl ClusterLayout { fn calculate_staging_hash(&self) -> Hash { let hashed_tuple = (&self.staging_roles, &self.staging_parameters); - blake2sum(&rmp_to_vec_all_named(&hashed_tuple).unwrap()[..]) + blake2sum(&nonversioned_encode(&hashed_tuple).unwrap()[..]) } pub fn merge(&mut self, other: &ClusterLayout) -> bool { @@ -181,7 +301,7 @@ To know the correct value of the new layout version, invoke `garage layout show` self.staging_roles.clear(); self.staging_hash = self.calculate_staging_hash(); - let msg = self.calculate_partition_assignation()?; + let msg = self.calculate_partition_assignment()?; self.version += 1; @@ -274,7 +394,7 @@ To know the correct value of the new layout version, invoke `garage layout show` for (i, id) in self.node_id_vec.iter().enumerate() { if id == uuid { let mut count = 0; - for nod in self.ring_assignation_data.iter() { + for nod in self.ring_assignment_data.iter() { if i as u8 == *nod { count += 1 } @@ -299,7 +419,7 @@ To know the correct value of the new layout version, invoke `garage layout show` } /// Check a cluster layout for internal consistency - /// (assignation, roles, parameters, partition size) + /// (assignment, roles, parameters, partition size) /// returns true if consistent, false if error pub fn check(&self) -> Result<(), String> { // Check that the hash of the staging data is correct @@ -323,37 +443,37 @@ To know the correct value of the new layout version, invoke `garage layout show` return Err(format!("node_id_vec does not contain the correct set of nodes\nnode_id_vec: {:?}\nexpected: {:?}", node_id_vec, expected_nodes)); } - // Check that the assignation data has the correct length - let expected_assignation_data_len = (1 << PARTITION_BITS) * self.replication_factor; - if self.ring_assignation_data.len() != expected_assignation_data_len { + // Check that the assignment data has the correct length + let expected_assignment_data_len = (1 << PARTITION_BITS) * self.replication_factor; + if self.ring_assignment_data.len() != expected_assignment_data_len { return Err(format!( - "ring_assignation_data has incorrect length {} instead of {}", - self.ring_assignation_data.len(), - expected_assignation_data_len + "ring_assignment_data has incorrect length {} instead of {}", + self.ring_assignment_data.len(), + expected_assignment_data_len )); } // Check that the assigned nodes are correct identifiers // of nodes that are assigned a role // and that role is not the role of a gateway nodes - for x in self.ring_assignation_data.iter() { + for x in self.ring_assignment_data.iter() { if *x as usize >= self.node_id_vec.len() { return Err(format!( - "ring_assignation_data contains invalid node id {}", + "ring_assignment_data contains invalid node id {}", *x )); } let node = self.node_id_vec[*x as usize]; match self.roles.get(&node) { Some(NodeRoleV(Some(x))) if x.capacity.is_some() => (), - _ => return Err("ring_assignation_data contains id of a gateway node".into()), + _ => return Err("ring_assignment_data contains id of a gateway node".into()), } } // Check that every partition is associated to distinct nodes let rf = self.replication_factor; for p in 0..(1 << PARTITION_BITS) { - let nodes_of_p = self.ring_assignation_data[rf * p..rf * (p + 1)].to_vec(); + let nodes_of_p = self.ring_assignment_data[rf * p..rf * (p + 1)].to_vec(); if nodes_of_p.iter().unique().count() != rf { return Err(format!("partition does not contain {} unique node ids", rf)); } @@ -376,7 +496,7 @@ To know the correct value of the new layout version, invoke `garage layout show` // Check that the nodes capacities is consistent with the stored partitions let mut node_usage = vec![0; MAX_NODE_NUMBER]; - for n in self.ring_assignation_data.iter() { + for n in self.ring_assignment_data.iter() { node_usage[*n as usize] += 1; } for (n, usage) in node_usage.iter().enumerate() { @@ -413,21 +533,21 @@ To know the correct value of the new layout version, invoke `garage layout show` } } -// Implementation of the ClusterLayout methods related to the assignation algorithm. +// Implementation of the ClusterLayout methods related to the assignment algorithm. impl ClusterLayout { - /// This function calculates a new partition-to-node assignation. - /// The computed assignation respects the node replication factor + /// This function calculates a new partition-to-node assignment. + /// The computed assignment respects the node replication factor /// and the zone redundancy parameter It maximizes the capacity of a /// partition (assuming all partitions have the same size). - /// Among such optimal assignation, it minimizes the distance to - /// the former assignation (if any) to minimize the amount of + /// Among such optimal assignment, it minimizes the distance to + /// the former assignment (if any) to minimize the amount of /// data to be moved. /// Staged role changes must be merged with nodes roles before calling this function, /// hence it must only be called from apply_staged_changes() and hence is not public. - fn calculate_partition_assignation(&mut self) -> Result<Message, Error> { + fn calculate_partition_assignment(&mut self) -> Result<Message, Error> { // We update the node ids, since the node role list might have changed with the - // changes in the layout. We retrieve the old_assignation reframed with new ids - let old_assignation_opt = self.update_node_id_vec()?; + // changes in the layout. We retrieve the old_assignment reframed with new ids + let old_assignment_opt = self.update_node_id_vec()?; let mut msg = Message::new(); msg.push("==== COMPUTATION OF A NEW PARTITION ASSIGNATION ====".into()); @@ -465,7 +585,7 @@ impl ClusterLayout { // optimality. let partition_size = self.compute_optimal_partition_size(&zone_to_id)?; - if old_assignation_opt != None { + if old_assignment_opt != None { msg.push(format!( "Optimal size of a partition: {} (was {} in the previous layout).", ByteSize::b(partition_size).to_string_as(false), @@ -488,16 +608,16 @@ impl ClusterLayout { ); } - // We compute a first flow/assignation that is heuristically close to the previous - // assignation - let mut gflow = self.compute_candidate_assignation(&zone_to_id, &old_assignation_opt)?; - if let Some(assoc) = &old_assignation_opt { - // We minimize the distance to the previous assignation. + // We compute a first flow/assignment that is heuristically close to the previous + // assignment + let mut gflow = self.compute_candidate_assignment(&zone_to_id, &old_assignment_opt)?; + if let Some(assoc) = &old_assignment_opt { + // We minimize the distance to the previous assignment. self.minimize_rebalance_load(&mut gflow, &zone_to_id, assoc)?; } // We display statistics of the computation - msg.extend(self.output_stat(&gflow, &old_assignation_opt, &zone_to_id, &id_to_zone)?); + msg.extend(self.output_stat(&gflow, &old_assignment_opt, &zone_to_id, &id_to_zone)?); msg.push("".to_string()); // We update the layout structure @@ -513,10 +633,10 @@ impl ClusterLayout { } /// The LwwMap of node roles might have changed. This function updates the node_id_vec - /// and returns the assignation given by ring, with the new indices of the nodes, and + /// and returns the assignment given by ring, with the new indices of the nodes, and /// None if the node is not present anymore. - /// We work with the assumption that only this function and calculate_new_assignation - /// do modify assignation_ring and node_id_vec. + /// We work with the assumption that only this function and calculate_new_assignment + /// do modify assignment_ring and node_id_vec. fn update_node_id_vec(&mut self) -> Result<Option<Vec<Vec<usize>>>, Error> { // (1) We compute the new node list // Non gateway nodes should be coded on 8bits, hence they must be first in the list @@ -554,15 +674,15 @@ impl ClusterLayout { // (2) We retrieve the old association // We rewrite the old association with the new indices. We only consider partition - // to node assignations where the node is still in use. - if self.ring_assignation_data.is_empty() { + // to node assignments where the node is still in use. + if self.ring_assignment_data.is_empty() { // This is a new association return Ok(None); } - if self.ring_assignation_data.len() != NB_PARTITIONS * self.replication_factor { + if self.ring_assignment_data.len() != NB_PARTITIONS * self.replication_factor { return Err(Error::Message( - "The old assignation does not have a size corresponding to \ + "The old assignment does not have a size corresponding to \ the old replication factor or the number of partitions." .into(), )); @@ -577,11 +697,11 @@ impl ClusterLayout { uuid_to_new_id.insert(*uuid, i); } - let mut old_assignation = vec![Vec::<usize>::new(); NB_PARTITIONS]; + let mut old_assignment = vec![Vec::<usize>::new(); NB_PARTITIONS]; let rf = self.replication_factor; - for (p, old_assign_p) in old_assignation.iter_mut().enumerate() { - for old_id in &self.ring_assignation_data[p * rf..(p + 1) * rf] { + for (p, old_assign_p) in old_assignment.iter_mut().enumerate() { + for old_id in &self.ring_assignment_data[p * rf..(p + 1) * rf] { let uuid = old_node_id_vec[*old_id as usize]; if uuid_to_new_id.contains_key(&uuid) { old_assign_p.push(uuid_to_new_id[&uuid]); @@ -590,9 +710,9 @@ impl ClusterLayout { } // We write the ring - self.ring_assignation_data = Vec::<CompactNodeType>::new(); + self.ring_assignment_data = Vec::<CompactNodeType>::new(); - Ok(Some(old_assignation)) + Ok(Some(old_assignment)) } /// This function generates ids for the zone of the nodes appearing in @@ -659,11 +779,11 @@ impl ClusterLayout { } /// Generates the graph to compute the maximal flow corresponding to the optimal - /// partition assignation. + /// partition assignment. /// exclude_assoc is the set of (partition, node) association that we are forbidden /// to use (hence we do not add the corresponding edge to the graph). This parameter /// is used to compute a first flow that uses only edges appearing in the previous - /// assignation. This produces a solution that heuristically should be close to the + /// assignment. This produces a solution that heuristically should be close to the /// previous one. fn generate_flow_graph( &self, @@ -705,14 +825,14 @@ impl ClusterLayout { Ok(g) } - /// This function computes a first optimal assignation (in the form of a flow graph). - fn compute_candidate_assignation( + /// This function computes a first optimal assignment (in the form of a flow graph). + fn compute_candidate_assignment( &self, zone_to_id: &HashMap<String, usize>, prev_assign_opt: &Option<Vec<Vec<usize>>>, ) -> Result<Graph<FlowEdge>, Error> { // We list the (partition,node) associations that are not used in the - // previous assignation + // previous assignment let mut exclude_edge = HashSet::<(usize, usize)>::new(); if let Some(prev_assign) = prev_assign_opt { let nb_nodes = self.nongateway_nodes().len(); @@ -726,7 +846,7 @@ impl ClusterLayout { } } - // We compute the best flow using only the edges used in the previous assignation + // We compute the best flow using only the edges used in the previous assignment let mut g = self.generate_flow_graph(self.partition_size, zone_to_id, &exclude_edge)?; g.compute_maximal_flow()?; @@ -742,7 +862,7 @@ impl ClusterLayout { } /// This function updates the flow graph gflow to minimize the distance between - /// its corresponding assignation and the previous one + /// its corresponding assignment and the previous one fn minimize_rebalance_load( &self, gflow: &mut Graph<FlowEdge>, @@ -750,7 +870,7 @@ impl ClusterLayout { prev_assign: &[Vec<usize>], ) -> Result<(), Error> { // We define a cost function on the edges (pairs of vertices) corresponding - // to the distance between the two assignations. + // to the distance between the two assignments. let mut cost = CostFunction::new(); for (p, assoc_p) in prev_assign.iter().enumerate() { for n in assoc_p.iter() { @@ -769,25 +889,25 @@ impl ClusterLayout { Ok(()) } - /// This function updates the assignation ring from the flow graph. + /// This function updates the assignment ring from the flow graph. fn update_ring_from_flow( &mut self, nb_zones: usize, gflow: &Graph<FlowEdge>, ) -> Result<(), Error> { - self.ring_assignation_data = Vec::<CompactNodeType>::new(); + self.ring_assignment_data = Vec::<CompactNodeType>::new(); for p in 0..NB_PARTITIONS { for z in 0..nb_zones { let assoc_vertex = gflow.get_positive_flow_from(Vertex::PZ(p, z))?; for vertex in assoc_vertex.iter() { if let Vertex::N(n) = vertex { - self.ring_assignation_data.push((*n).try_into().unwrap()); + self.ring_assignment_data.push((*n).try_into().unwrap()); } } } } - if self.ring_assignation_data.len() != NB_PARTITIONS * self.replication_factor { + if self.ring_assignment_data.len() != NB_PARTITIONS * self.replication_factor { return Err(Error::Message( "Critical Error : the association ring we produced does not \ have the right size." @@ -798,7 +918,7 @@ impl ClusterLayout { } /// This function returns a message summing up the partition repartition of the new - /// layout, and other statistics of the partition assignation computation. + /// layout, and other statistics of the partition assignment computation. fn output_stat( &self, gflow: &Graph<FlowEdge>, @@ -958,7 +1078,7 @@ mod tests { // This function checks that the partition size S computed is at least better than the // one given by a very naive algorithm. To do so, we try to run the naive algorithm - // assuming a partion size of S+1. If we succed, it means that the optimal assignation + // assuming a partion size of S+1. If we succed, it means that the optimal assignment // was not optimal. The naive algorithm is the following : // - we compute the max number of partitions associated to every node, capped at the // partition number. It gives the number of tokens of every node. @@ -1063,7 +1183,7 @@ mod tests { } #[test] - fn test_assignation() { + fn test_assignment() { let mut node_id_vec = vec![1, 2, 3]; let mut node_capacity_vec = vec![4000, 1000, 2000]; let mut node_zone_vec = vec!["A", "B", "C"] |