Diffstat (limited to 'src/rpc/layout.rs')
-rw-r--r-- | src/rpc/layout.rs | 162 |
1 file changed, 93 insertions, 69 deletions
diff --git a/src/rpc/layout.rs b/src/rpc/layout.rs
index d2ed8af8..38e56b88 100644
--- a/src/rpc/layout.rs
+++ b/src/rpc/layout.rs
@@ -17,6 +17,8 @@ use crate::ring::*;
 use std::convert::TryInto;
 
+const NB_PARTITIONS: usize = 1usize << PARTITION_BITS;
+
 //The Message type will be used to collect information on the algorithm.
 type Message = Vec<String>;
@@ -28,9 +30,11 @@ pub struct ClusterLayout {
 	pub replication_factor: usize,
-	//This attribute is only used to retain the previously computed partition size,
-	//to know to what extent does it change with the layout update.
+	///This attribute is only used to retain the previously computed partition size,
+	///to know to what extent does it change with the layout update.
 	pub partition_size: u32,
+	///Parameters used to compute the assignation currently given by
+	///ring_assignation_data
 	pub parameters: LayoutParameters,
 	pub roles: LwwMap<Uuid, NodeRoleV>,
@@ -48,8 +52,9 @@ pub struct ClusterLayout {
 	#[serde(with = "serde_bytes")]
 	pub ring_assignation_data: Vec<CompactNodeType>,
-	/// Role changes which are staged for the next version of the layout
+	/// Parameters to be used in the next partition assignation computation.
 	pub staged_parameters: Lww<LayoutParameters>,
+	/// Role changes which are staged for the next version of the layout
 	pub staging: LwwMap<Uuid, NodeRoleV>,
 	pub staging_hash: Hash,
 }
@@ -65,8 +70,6 @@ impl AutoCrdt for LayoutParameters {
 	const WARN_IF_DIFFERENT: bool = true;
 }
-const NB_PARTITIONS: usize = 1usize << PARTITION_BITS;
-
 #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
 pub struct NodeRoleV(pub Option<NodeRole>);
@@ -77,12 +80,13 @@ impl AutoCrdt for NodeRoleV {
 /// The user-assigned roles of cluster nodes
 #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
 pub struct NodeRole {
-	/// Datacenter at which this entry belong. This information might be used to perform a better
-	/// geodistribution
+	/// Datacenter at which this entry belong. This information is used to
+	/// perform a better geodistribution
 	pub zone: String,
-	/// The (relative) capacity of the node
+	/// The capacity of the node
 	/// If this is set to None, the node does not participate in storing data for the system
 	/// and is only active as an API gateway to other nodes
+	// TODO : change the capacity to u64 and use byte unit input/output
 	pub capacity: Option<u32>,
 	/// A set of tags to recognize the node
 	pub tags: Vec<String>,
@@ -110,6 +114,7 @@ impl NodeRole {
 	}
 }
+//Implementation of the ClusterLayout methods unrelated to the assignation algorithm.
 impl ClusterLayout {
 	pub fn new(replication_factor: usize) -> Self {
 		//We set the default zone redundancy to be equal to the replication factor,
@@ -231,7 +236,7 @@ To know the correct value of the new layout version, invoke `garage layout show`
 	}
 	///Returns the uuids of the non_gateway nodes in self.node_id_vec.
-	pub fn useful_nodes(&self) -> Vec<Uuid> {
+	pub fn nongateway_nodes(&self) -> Vec<Uuid> {
 		let mut result = Vec::<Uuid>::new();
 		for uuid in self.node_id_vec.iter() {
 			match self.node_role(uuid) {
@@ -291,13 +296,14 @@ To know the correct value of the new layout version, invoke `garage layout show`
 	///Returns the sum of capacities of non gateway nodes in the cluster
 	pub fn get_total_capacity(&self) -> Result<u32, Error> {
 		let mut total_capacity = 0;
-		for uuid in self.useful_nodes().iter() {
+		for uuid in self.nongateway_nodes().iter() {
 			total_capacity += self.get_node_capacity(uuid)?;
 		}
 		Ok(total_capacity)
 	}
 	/// Check a cluster layout for internal consistency
+	/// (assignation, roles, parameters, partition size)
 	/// returns true if consistent, false if error
 	pub fn check(&self) -> bool {
 		// Check that the hash of the staging data is correct
@@ -377,7 +383,7 @@ To know the correct value of the new layout version, invoke `garage layout show`
 		//Check that the partition size stored is the one computed by the asignation
 		//algorithm.
 		let cl2 = self.clone();
-		let (_, zone_to_id) = cl2.generate_useful_zone_ids().expect("Critical Error");
+		let (_, zone_to_id) = cl2.generate_nongateway_zone_ids().expect("Critical Error");
 		match cl2.compute_optimal_partition_size(&zone_to_id) {
 			Ok(s) if s != self.partition_size => return false,
 			Err(_) => return false,
@@ -388,6 +394,7 @@ To know the correct value of the new layout version, invoke `garage layout show`
 	}
 }
+//Implementation of the ClusterLayout methods related to the assignation algorithm.
 impl ClusterLayout {
 	/// This function calculates a new partition-to-node assignation.
 	/// The computed assignation respects the node replication factor
@@ -397,16 +404,13 @@ impl ClusterLayout {
 	/// the former assignation (if any) to minimize the amount of
 	/// data to be moved.
 	// Staged role changes must be merged with nodes roles before calling this function,
-	// hence it must only be called from apply_staged_changes() and it is not public.
+	// hence it must only be called from apply_staged_changes() and hence is not public.
 	fn calculate_partition_assignation(&mut self) -> Result<Message, Error> {
-		//The nodes might have been updated, some might have been deleted.
-		//So we need to first update the list of nodes and retrieve the
-		//assignation.
-
 		//We update the node ids, since the node role list might have changed with the
-		//changes in the layout. We retrieve the old_assignation reframed with the new ids
+		//changes in the layout. We retrieve the old_assignation reframed with new ids
 		let old_assignation_opt = self.update_node_id_vec()?;
+		//We update the parameters
 		self.parameters = self.staged_parameters.get().clone();
 		let mut msg = Message::new();
@@ -420,14 +424,14 @@ impl ClusterLayout {
 		//We generate for once numerical ids for the zones of non gateway nodes,
 		//to use them as indices in the flow graphs.
-		let (id_to_zone, zone_to_id) = self.generate_useful_zone_ids()?;
+		let (id_to_zone, zone_to_id) = self.generate_nongateway_zone_ids()?;
 
-		let nb_useful_nodes = self.useful_nodes().len();
-		if nb_useful_nodes < self.replication_factor {
+		let nb_nongateway_nodes = self.nongateway_nodes().len();
+		if nb_nongateway_nodes < self.replication_factor {
 			return Err(Error::Message(format!(
 				"The number of nodes with positive \
			capacity ({}) is smaller than the replication factor ({}).",
-				nb_useful_nodes, self.replication_factor
+				nb_nongateway_nodes, self.replication_factor
 			)));
 		}
 		if id_to_zone.len() < self.parameters.zone_redundancy {
@@ -457,6 +461,7 @@ impl ClusterLayout {
 				partition_size
 			));
 		}
+		//We write the partition size.
 		self.partition_size = partition_size;
 		if partition_size < 100 {
@@ -467,14 +472,15 @@ impl ClusterLayout {
 			);
 		}
-		//We compute a first flow/assignment that is heuristically close to the previous
-		//assignment
-		let mut gflow = self.compute_candidate_assignment(&zone_to_id, &old_assignation_opt)?;
+		//We compute a first flow/assignation that is heuristically close to the previous
+		//assignation
+		let mut gflow = self.compute_candidate_assignation(&zone_to_id, &old_assignation_opt)?;
 		if let Some(assoc) = &old_assignation_opt {
-			//We minimize the distance to the previous assignment.
+			//We minimize the distance to the previous assignation.
			self.minimize_rebalance_load(&mut gflow, &zone_to_id, assoc)?;
 		}
+		//We display statistics of the computation
 		msg.append(&mut self.output_stat(
 			&gflow,
 			&old_assignation_opt,
@@ -538,14 +544,13 @@ impl ClusterLayout {
 		// (2) We retrieve the old association
 		//We rewrite the old association with the new indices. We only consider partition
 		//to node assignations where the node is still in use.
-		let nb_partitions = 1usize << PARTITION_BITS;
-		let mut old_assignation = vec![Vec::<usize>::new(); nb_partitions];
+		let mut old_assignation = vec![Vec::<usize>::new(); NB_PARTITIONS];
 		if self.ring_assignation_data.is_empty() {
 			//This is a new association
 			return Ok(None);
 		}
-		if self.ring_assignation_data.len() != nb_partitions * self.replication_factor {
+		if self.ring_assignation_data.len() != NB_PARTITIONS * self.replication_factor {
 			return Err(Error::Message(
 				"The old assignation does not have a size corresponding to \
				the old replication factor or the number of partitions."
					.into(),
			));
		}
@@ -580,11 +585,11 @@ impl ClusterLayout {
 	///This function generates ids for the zone of the nodes appearing in
 	///self.node_id_vec.
-	fn generate_useful_zone_ids(&self) -> Result<(Vec<String>, HashMap<String, usize>), Error> {
+	fn generate_nongateway_zone_ids(&self) -> Result<(Vec<String>, HashMap<String, usize>), Error> {
 		let mut id_to_zone = Vec::<String>::new();
 		let mut zone_to_id = HashMap::<String, usize>::new();
-		for uuid in self.useful_nodes().iter() {
+		for uuid in self.nongateway_nodes().iter() {
 			if self.roles.get(uuid) == None {
 				return Err(Error::Message(
 					"The uuid was not found in the node roles (this should \
@@ -603,17 +608,16 @@ impl ClusterLayout {
 	}
 	///This function computes by dichotomy the largest realizable partition size, given
-	///the layout.
+	///the layout roles and parameters.
 	fn compute_optimal_partition_size(
 		&self,
 		zone_to_id: &HashMap<String, usize>,
 	) -> Result<u32, Error> {
-		let nb_partitions = 1usize << PARTITION_BITS;
 		let empty_set = HashSet::<(usize, usize)>::new();
 		let mut g = self.generate_flow_graph(1, zone_to_id, &empty_set)?;
 		g.compute_maximal_flow()?;
 		if g.get_flow_value()?
-			< (nb_partitions * self.replication_factor)
+			< (NB_PARTITIONS * self.replication_factor)
 				.try_into()
 				.unwrap()
 		{
@@ -630,7 +634,7 @@ impl ClusterLayout {
 			g = self.generate_flow_graph((s_down + s_up) / 2, zone_to_id, &empty_set)?;
 			g.compute_maximal_flow()?;
 			if g.get_flow_value()?
-				< (nb_partitions * self.replication_factor)
+				< (NB_PARTITIONS * self.replication_factor)
 					.try_into()
 					.unwrap()
 			{
@@ -658,14 +662,21 @@ impl ClusterLayout {
 		vertices
 	}
+	///Generates the graph to compute the maximal flow corresponding to the optimal
+	///partition assignation.
+	///exclude_assoc is the set of (partition, node) association that we are forbidden
+	///to use (hence we do not add the corresponding edge to the graph). This parameter
+	///is used to compute a first flow that uses only edges appearing in the previous
+	///assignation. This produces a solution that heuristically should be close to the
+	///previous one.
 	fn generate_flow_graph(
 		&self,
-		size: u32,
+		partition_size: u32,
 		zone_to_id: &HashMap<String, usize>,
 		exclude_assoc: &HashSet<(usize, usize)>,
 	) -> Result<Graph<FlowEdge>, Error> {
 		let vertices =
-			ClusterLayout::generate_graph_vertices(zone_to_id.len(), self.useful_nodes().len());
+			ClusterLayout::generate_graph_vertices(zone_to_id.len(), self.nongateway_nodes().len());
 		let mut g = Graph::<FlowEdge>::new(&vertices);
 		let nb_zones = zone_to_id.len();
 		let redundancy = self.parameters.zone_redundancy;
@@ -685,10 +696,10 @@ impl ClusterLayout {
 				)?;
 			}
 		}
-		for n in 0..self.useful_nodes().len() {
+		for n in 0..self.nongateway_nodes().len() {
 			let node_capacity = self.get_node_capacity(&self.node_id_vec[n])?;
 			let node_zone = zone_to_id[&self.get_node_zone(&self.node_id_vec[n])?];
-			g.add_edge(Vertex::N(n), Vertex::Sink, node_capacity / size)?;
+			g.add_edge(Vertex::N(n), Vertex::Sink, node_capacity / partition_size)?;
 			for p in 0..NB_PARTITIONS {
 				if !exclude_assoc.contains(&(p, n)) {
 					g.add_edge(Vertex::PZ(p, node_zone), Vertex::N(n), 1)?;
@@ -698,28 +709,34 @@ impl ClusterLayout {
 		Ok(g)
 	}
-	fn compute_candidate_assignment(
+	///This function computes a first optimal assignation (in the form of a flow graph).
+	fn compute_candidate_assignation(
 		&self,
 		zone_to_id: &HashMap<String, usize>,
-		old_assoc_opt: &Option<Vec<Vec<usize>>>,
+		prev_assign_opt: &Option<Vec<Vec<usize>>>,
 	) -> Result<Graph<FlowEdge>, Error> {
-		//We list the edges that are not used in the old association
+		//We list the (partition,node) associations that are not used in the
+		//previous assignation
 		let mut exclude_edge = HashSet::<(usize, usize)>::new();
-		if let Some(old_assoc) = old_assoc_opt {
-			let nb_nodes = self.useful_nodes().len();
-			for (p, old_assoc_p) in old_assoc.iter().enumerate() {
+		if let Some(prev_assign) = prev_assign_opt {
+			let nb_nodes = self.nongateway_nodes().len();
+			for (p, prev_assign_p) in prev_assign.iter().enumerate() {
 				for n in 0..nb_nodes {
 					exclude_edge.insert((p, n));
 				}
-				for n in old_assoc_p.iter() {
+				for n in prev_assign_p.iter() {
 					exclude_edge.remove(&(p, *n));
 				}
 			}
 		}
-		//We compute the best flow using only the edges used in the old assoc
+		//We compute the best flow using only the edges used in the previous assignation
 		let mut g = self.generate_flow_graph(self.partition_size, zone_to_id, &exclude_edge)?;
 		g.compute_maximal_flow()?;
+
+		//We add the excluded edges and compute the maximal flow with the full graph.
+		//The algorithm is such that it will start with the flow that we just computed
+		//and find ameliorating paths from that.
 		for (p, n) in exclude_edge.iter() {
 			let node_zone = zone_to_id[&self.get_node_zone(&self.node_id_vec[*n])?];
 			g.add_edge(Vertex::PZ(*p, node_zone), Vertex::N(*n), 1)?;
 		}
@@ -728,26 +745,35 @@ impl ClusterLayout {
 		Ok(g)
 	}
+	///This function updates the flow graph gflow to minimize the distance between
+	///its corresponding assignation and the previous one
 	fn minimize_rebalance_load(
 		&self,
 		gflow: &mut Graph<FlowEdge>,
 		zone_to_id: &HashMap<String, usize>,
-		old_assoc: &[Vec<usize>],
+		prev_assign: &[Vec<usize>],
 	) -> Result<(), Error> {
+		//We define a cost function on the edges (pairs of vertices) corresponding
+		//to the distance between the two assignations.
 		let mut cost = CostFunction::new();
-		for (p, assoc_p) in old_assoc.iter().enumerate() {
+		for (p, assoc_p) in prev_assign.iter().enumerate() {
 			for n in assoc_p.iter() {
 				let node_zone = zone_to_id[&self.get_node_zone(&self.node_id_vec[*n])?];
 				cost.insert((Vertex::PZ(p, node_zone), Vertex::N(*n)), -1);
 			}
 		}
-		let nb_nodes = self.useful_nodes().len();
+
+		//We compute the maximal length of a simple path in gflow. It is used in the
+		//Bellman-Ford algorithm in optimize_flow_with_cost to set the number
+		//of iterations.
+		let nb_nodes = self.nongateway_nodes().len();
 		let path_length = 4 * nb_nodes;
 		gflow.optimize_flow_with_cost(&cost, path_length)?;
 		Ok(())
 	}
+	///This function updates the assignation ring from the flow graph.
 	fn update_ring_from_flow(
 		&mut self,
 		nb_zones: usize,
@@ -775,19 +801,18 @@ impl ClusterLayout {
 		Ok(())
 	}
-	//This function returns a message summing up the partition repartition of the new
-	//layout.
+	///This function returns a message summing up the partition repartition of the new
+	///layout, and other statistics of the partition assignation computation.
 	fn output_stat(
 		&self,
 		gflow: &Graph<FlowEdge>,
-		old_assoc_opt: &Option<Vec<Vec<usize>>>,
+		prev_assign_opt: &Option<Vec<Vec<usize>>>,
 		zone_to_id: &HashMap<String, usize>,
 		id_to_zone: &[String],
 	) -> Result<Message, Error> {
 		let mut msg = Message::new();
-		let nb_partitions = 1usize << PARTITION_BITS;
-		let used_cap = self.partition_size * nb_partitions as u32 * self.replication_factor as u32;
+		let used_cap = self.partition_size * NB_PARTITIONS as u32 * self.replication_factor as u32;
 		let total_cap = self.get_total_capacity()?;
 		let percent_cap = 100.0 * (used_cap as f32) / (total_cap as f32);
 		msg.push("".into());
@@ -813,21 +838,21 @@ impl ClusterLayout {
 		));
 		//We define and fill in the following tables
-		let storing_nodes = self.useful_nodes();
+		let storing_nodes = self.nongateway_nodes();
 		let mut new_partitions = vec![0; storing_nodes.len()];
 		let mut stored_partitions = vec![0; storing_nodes.len()];
 		let mut new_partitions_zone = vec![0; id_to_zone.len()];
 		let mut stored_partitions_zone = vec![0; id_to_zone.len()];
-		for p in 0..nb_partitions {
+		for p in 0..NB_PARTITIONS {
 			for z in 0..id_to_zone.len() {
 				let pz_nodes = gflow.get_positive_flow_from(Vertex::PZ(p, z))?;
 				if !pz_nodes.is_empty() {
 					stored_partitions_zone[z] += 1;
-					if let Some(old_assoc) = old_assoc_opt {
+					if let Some(prev_assign) = prev_assign_opt {
 						let mut old_zones_of_p = Vec::<usize>::new();
-						for n in old_assoc[p].iter() {
+						for n in prev_assign[p].iter() {
 							old_zones_of_p
 								.push(zone_to_id[&self.get_node_zone(&self.node_id_vec[*n])?]);
 						}
@@ -839,8 +864,8 @@ impl ClusterLayout {
 				for vert in pz_nodes.iter() {
 					if let Vertex::N(n) = *vert {
 						stored_partitions[n] += 1;
-						if let Some(old_assoc) = old_assoc_opt {
-							if !old_assoc[p].contains(&n) {
+						if let Some(prev_assign) = prev_assign_opt {
+							if !prev_assign[p].contains(&n) {
 								new_partitions[n] += 1;
 							}
 						}
@@ -849,7 +874,7 @@ impl ClusterLayout {
 			}
 		}
-		if *old_assoc_opt == None {
+		if *prev_assign_opt == None {
 			new_partitions = stored_partitions.clone();
 			new_partitions_zone = stored_partitions_zone.clone();
 		}
@@ -857,7 +882,7 @@ impl ClusterLayout {
 		//We display the statistics
 		msg.push("".into());
-		if *old_assoc_opt != None {
+		if *prev_assign_opt != None {
 			let total_new_partitions: usize = new_partitions.iter().sum();
 			msg.push(format!(
 				"A total of {} new copies of partitions need to be \
@@ -950,9 +975,8 @@ mod tests {
 	fn check_against_naive(cl: &ClusterLayout) -> Result<bool, Error> {
 		let over_size = cl.partition_size + 1;
 		let mut zone_token = HashMap::<String, usize>::new();
-		let nb_partitions = 1usize << PARTITION_BITS;
-		let (zones, zone_to_id) = cl.generate_useful_zone_ids()?;
+		let (zones, zone_to_id) = cl.generate_nongateway_zone_ids()?;
 		if zones.is_empty() {
 			return Ok(false);
 		}
@@ -961,12 +985,12 @@ mod tests {
 		for z in zones.iter() {
 			zone_token.insert(z.clone(), 0);
 		}
-		for uuid in cl.useful_nodes().iter() {
+		for uuid in cl.nongateway_nodes().iter() {
 			let z = cl.get_node_zone(uuid)?;
 			let c = cl.get_node_capacity(uuid)?;
 			zone_token.insert(
 				z.clone(),
-				zone_token[&z] + min(nb_partitions, (c / over_size) as usize),
+				zone_token[&z] + min(NB_PARTITIONS, (c / over_size) as usize),
 			);
 		}
@@ -978,15 +1002,15 @@ mod tests {
 			id_zone_token[zone_to_id[z]] = *t;
 		}
-		let mut nb_token = vec![0; nb_partitions];
-		let mut last_zone = vec![zones.len(); nb_partitions];
+		let mut nb_token = vec![0; NB_PARTITIONS];
+		let mut last_zone = vec![zones.len(); NB_PARTITIONS];
 		let mut curr_zone = 0;
 		let redundancy = cl.parameters.zone_redundancy;
 		for replic in 0..cl.replication_factor {
-			for p in 0..nb_partitions {
+			for p in 0..NB_PARTITIONS {
 				while id_zone_token[curr_zone] == 0
 					|| (last_zone[p] == curr_zone
 						&& redundancy - nb_token[p] <= cl.replication_factor - replic)
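Aside: the hunks around compute_optimal_partition_size above describe a dichotomy (binary search) over the partition size, where a candidate size is kept only if the maximal flow of the corresponding graph reaches NB_PARTITIONS * replication_factor. The sketch below only illustrates that search loop; it is not code from the patch, and largest_feasible_partition_size / is_feasible are illustrative names standing in for the flow-graph feasibility check.

	// Minimal sketch, not part of the patch: binary search for the largest
	// partition size accepted by a feasibility predicate. In layout.rs the
	// predicate is "the maximal flow of the graph built with this size reaches
	// NB_PARTITIONS * replication_factor"; here it is an arbitrary closure.
	fn largest_feasible_partition_size(
		infeasible_upper_bound: u32,
		is_feasible: impl Fn(u32) -> bool,
	) -> Option<u32> {
		// The patch starts by checking that size 1 is realizable at all.
		if !is_feasible(1) {
			return None;
		}
		// Invariant: s_down is feasible, s_up is not.
		let (mut s_down, mut s_up) = (1u32, infeasible_upper_bound);
		while s_down + 1 < s_up {
			let mid = (s_down + s_up) / 2;
			if is_feasible(mid) {
				s_down = mid;
			} else {
				s_up = mid;
			}
		}
		Some(s_down)
	}

	fn main() {
		// Toy predicate: every size up to 1000 is "realizable".
		let best = largest_feasible_partition_size(1 << 20, |s| s <= 1000);
		assert_eq!(best, Some(1000));
		println!("largest feasible partition size: {:?}", best);
	}

The search relies on feasibility being monotone in the partition size: if a given size fits within the nodes' capacities, any smaller size does too, so halving the interval between a known-feasible and a known-infeasible size converges to the optimum.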