aboutsummaryrefslogtreecommitdiff
path: root/src/rpc/layout.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc/layout.rs')
-rw-r--r--src/rpc/layout.rs247
1 files changed, 125 insertions, 122 deletions
diff --git a/src/rpc/layout.rs b/src/rpc/layout.rs
index 38e56b88..95f69dc8 100644
--- a/src/rpc/layout.rs
+++ b/src/rpc/layout.rs
@@ -19,7 +19,7 @@ use std::convert::TryInto;
const NB_PARTITIONS: usize = 1usize << PARTITION_BITS;
-//The Message type will be used to collect information on the algorithm.
+// The Message type will be used to collect information on the algorithm.
type Message = Vec<String>;
/// The layout of the cluster, i.e. the list of roles
@@ -30,11 +30,11 @@ pub struct ClusterLayout {
pub replication_factor: usize,
- ///This attribute is only used to retain the previously computed partition size,
- ///to know to what extent does it change with the layout update.
+ /// This attribute is only used to retain the previously computed partition size,
+ /// to know to what extent it changes with the layout update.
pub partition_size: u32,
- ///Parameters used to compute the assignation currently given by
- ///ring_assignation_data
+ /// Parameters used to compute the assignation currently given by
+ /// ring_assignation_data
pub parameters: LayoutParameters,
pub roles: LwwMap<Uuid, NodeRoleV>,
@@ -53,14 +53,14 @@ pub struct ClusterLayout {
pub ring_assignation_data: Vec<CompactNodeType>,
/// Parameters to be used in the next partition assignation computation.
- pub staged_parameters: Lww<LayoutParameters>,
+ pub staging_parameters: Lww<LayoutParameters>,
/// Role changes which are staged for the next version of the layout
- pub staging: LwwMap<Uuid, NodeRoleV>,
+ pub staging_roles: LwwMap<Uuid, NodeRoleV>,
pub staging_hash: Hash,
}
-///This struct is used to set the parameters to be used in the assignation computation
-///algorithm. It is stored as a Crdt.
+/// This struct is used to set the parameters to be used in the assignation computation
+/// algorithm. It is stored as a Crdt.
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct LayoutParameters {
pub zone_redundancy: usize,
@@ -114,20 +114,19 @@ impl NodeRole {
}
}
-//Implementation of the ClusterLayout methods unrelated to the assignation algorithm.
+// Implementation of the ClusterLayout methods unrelated to the assignation algorithm.
impl ClusterLayout {
pub fn new(replication_factor: usize) -> Self {
- //We set the default zone redundancy to be equal to the replication factor,
- //i.e. as strict as possible.
+ // We set the default zone redundancy to be equal to the replication factor,
+ // i.e. as strict as possible.
let parameters = LayoutParameters {
zone_redundancy: replication_factor,
};
- let staged_parameters = Lww::<LayoutParameters>::new(parameters.clone());
+ let staging_parameters = Lww::<LayoutParameters>::new(parameters.clone());
let empty_lwwmap = LwwMap::new();
- let empty_lwwmap_hash = blake2sum(&rmp_to_vec_all_named(&empty_lwwmap).unwrap()[..]);
- ClusterLayout {
+ let mut ret = ClusterLayout {
version: 0,
replication_factor,
partition_size: 0,
@@ -135,10 +134,17 @@ impl ClusterLayout {
node_id_vec: Vec::new(),
ring_assignation_data: Vec::new(),
parameters,
- staged_parameters,
- staging: empty_lwwmap,
- staging_hash: empty_lwwmap_hash,
- }
+ staging_parameters,
+ staging_roles: empty_lwwmap,
+ staging_hash: [0u8; 32].into(),
+ };
+ ret.staging_hash = ret.calculate_staging_hash();
+ ret
+ }
+
+ fn calculate_staging_hash(&self) -> Hash {
+ let hashed_tuple = (&self.staging_roles, &self.staging_parameters);
+ blake2sum(&rmp_to_vec_all_named(&hashed_tuple).unwrap()[..])
}
pub fn merge(&mut self, other: &ClusterLayout) -> bool {
@@ -148,16 +154,15 @@ impl ClusterLayout {
true
}
Ordering::Equal => {
- let param_changed = self.staged_parameters.get() != other.staged_parameters.get();
- self.staged_parameters.merge(&other.staged_parameters);
- self.staging.merge(&other.staging);
+ self.staging_parameters.merge(&other.staging_parameters);
+ self.staging_roles.merge(&other.staging_roles);
- let new_staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]);
- let stage_changed = new_staging_hash != self.staging_hash;
+ let new_staging_hash = self.calculate_staging_hash();
+ let changed = new_staging_hash != self.staging_hash;
self.staging_hash = new_staging_hash;
- stage_changed || param_changed
+ changed
}
Ordering::Less => false,
}
@@ -179,13 +184,14 @@ To know the correct value of the new layout version, invoke `garage layout show`
}
}
- self.roles.merge(&self.staging);
+ self.roles.merge(&self.staging_roles);
self.roles.retain(|(_, _, v)| v.0.is_some());
+ self.parameters = self.staging_parameters.get().clone();
let msg = self.calculate_partition_assignation()?;
- self.staging.clear();
- self.staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]);
+ self.staging_roles.clear();
+ self.staging_hash = self.calculate_staging_hash();
self.version += 1;
@@ -208,9 +214,9 @@ To know the correct value of the new layout version, invoke `garage layout show`
}
}
- self.staging.clear();
- self.staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]);
- self.staged_parameters.update(self.parameters.clone());
+ self.staging_roles.clear();
+ self.staging_hash = self.calculate_staging_hash();
+ self.staging_parameters.update(self.parameters.clone());
self.version += 1;
@@ -235,7 +241,7 @@ To know the correct value of the new layout version, invoke `garage layout show`
}
}
- ///Returns the uuids of the non_gateway nodes in self.node_id_vec.
+ /// Returns the uuids of the non_gateway nodes in self.node_id_vec.
pub fn nongateway_nodes(&self) -> Vec<Uuid> {
let mut result = Vec::<Uuid>::new();
for uuid in self.node_id_vec.iter() {
@@ -247,7 +253,7 @@ To know the correct value of the new layout version, invoke `garage layout show`
result
}
- ///Given a node uuids, this function returns the label of its zone
+ /// Given a node uuid, this function returns the label of its zone
pub fn get_node_zone(&self, uuid: &Uuid) -> Result<String, Error> {
match self.node_role(uuid) {
Some(role) => Ok(role.zone.clone()),
@@ -257,7 +263,7 @@ To know the correct value of the new layout version, invoke `garage layout show`
}
}
- ///Given a node uuids, this function returns its capacity or fails if it does not have any
+ /// Given a node uuid, this function returns its capacity or fails if it does not have any
pub fn get_node_capacity(&self, uuid: &Uuid) -> Result<u32, Error> {
match self.node_role(uuid) {
Some(NodeRole {
@@ -273,7 +279,7 @@ To know the correct value of the new layout version, invoke `garage layout show`
}
}
- ///Returns the number of partitions associated to this node in the ring
+ /// Returns the number of partitions associated to this node in the ring
pub fn get_node_usage(&self, uuid: &Uuid) -> Result<usize, Error> {
for (i, id) in self.node_id_vec.iter().enumerate() {
if id == uuid {
@@ -293,7 +299,7 @@ To know the correct value of the new layout version, invoke `garage layout show`
))
}
- ///Returns the sum of capacities of non gateway nodes in the cluster
+ /// Returns the sum of capacities of non gateway nodes in the cluster
pub fn get_total_capacity(&self) -> Result<u32, Error> {
let mut total_capacity = 0;
for uuid in self.nongateway_nodes().iter() {
@@ -307,7 +313,7 @@ To know the correct value of the new layout version, invoke `garage layout show`
/// returns true if consistent, false if error
pub fn check(&self) -> bool {
// Check that the hash of the staging data is correct
- let staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]);
+ let staging_hash = self.calculate_staging_hash();
if staging_hash != self.staging_hash {
return false;
}
@@ -346,14 +352,14 @@ To know the correct value of the new layout version, invoke `garage layout show`
}
}
- //Check that every partition is associated to distinct nodes
+ // Check that every partition is associated to distinct nodes
let rf = self.replication_factor;
for p in 0..(1 << PARTITION_BITS) {
let nodes_of_p = self.ring_assignation_data[rf * p..rf * (p + 1)].to_vec();
if nodes_of_p.iter().unique().count() != rf {
return false;
}
- //Check that every partition is spread over at least zone_redundancy zones.
+ // Check that every partition is spread over at least zone_redundancy zones.
let zones_of_p = nodes_of_p.iter().map(|n| {
self.get_node_zone(&self.node_id_vec[*n as usize])
.expect("Zone not found.")
@@ -364,7 +370,7 @@ To know the correct value of the new layout version, invoke `garage layout show`
}
}
- //Check that the nodes capacities is consistent with the stored partitions
+ // Check that the node capacities are consistent with the stored partitions
let mut node_usage = vec![0; MAX_NODE_NUMBER];
for n in self.ring_assignation_data.iter() {
node_usage[*n as usize] += 1;
@@ -380,8 +386,8 @@ To know the correct value of the new layout version, invoke `garage layout show`
}
}
- //Check that the partition size stored is the one computed by the asignation
- //algorithm.
+ // Check that the partition size stored is the one computed by the assignation
+ // algorithm.
let cl2 = self.clone();
let (_, zone_to_id) = cl2.generate_nongateway_zone_ids().expect("Critical Error");
match cl2.compute_optimal_partition_size(&zone_to_id) {
@@ -394,7 +400,7 @@ To know the correct value of the new layout version, invoke `garage layout show`
}
}
-//Implementation of the ClusterLayout methods related to the assignation algorithm.
+// Implementation of the ClusterLayout methods related to the assignation algorithm.
impl ClusterLayout {
/// This function calculates a new partition-to-node assignation.
/// The computed assignation respects the node replication factor
@@ -403,16 +409,13 @@ impl ClusterLayout {
/// Among such optimal assignation, it minimizes the distance to
/// the former assignation (if any) to minimize the amount of
/// data to be moved.
- // Staged role changes must be merged with nodes roles before calling this function,
- // hence it must only be called from apply_staged_changes() and hence is not public.
+ /// Staged role changes must be merged with nodes roles before calling this function,
+ /// hence it must only be called from apply_staged_changes() and hence is not public.
fn calculate_partition_assignation(&mut self) -> Result<Message, Error> {
- //We update the node ids, since the node role list might have changed with the
- //changes in the layout. We retrieve the old_assignation reframed with new ids
+ // We update the node ids, since the node role list might have changed with the
+ // changes in the layout. We retrieve the old_assignation reframed with new ids
let old_assignation_opt = self.update_node_id_vec()?;
- //We update the parameters
- self.parameters = self.staged_parameters.get().clone();
-
let mut msg = Message::new();
msg.push("==== COMPUTATION OF A NEW PARTITION ASSIGNATION ====".into());
msg.push("".into());
@@ -422,8 +425,8 @@ impl ClusterLayout {
self.replication_factor, self.parameters.zone_redundancy
));
- //We generate for once numerical ids for the zones of non gateway nodes,
- //to use them as indices in the flow graphs.
+ // We generate, once and for all, numerical ids for the zones of non gateway nodes,
+ // to use them as indices in the flow graphs.
let (id_to_zone, zone_to_id) = self.generate_nongateway_zone_ids()?;
let nb_nongateway_nodes = self.nongateway_nodes().len();
@@ -443,10 +446,10 @@ impl ClusterLayout {
)));
}
- //We compute the optimal partition size
- //Capacities should be given in a unit so that partition size is at least 100.
- //In this case, integer rounding plays a marginal role in the percentages of
- //optimality.
+ // We compute the optimal partition size
+ // Capacities should be given in a unit so that partition size is at least 100.
+ // In this case, integer rounding plays a marginal role in the percentages of
+ // optimality.
let partition_size = self.compute_optimal_partition_size(&zone_to_id)?;
if old_assignation_opt != None {
@@ -461,7 +464,7 @@ impl ClusterLayout {
partition_size
));
}
- //We write the partition size.
+ // We write the partition size.
self.partition_size = partition_size;
if partition_size < 100 {
@@ -472,15 +475,15 @@ impl ClusterLayout {
);
}
- //We compute a first flow/assignation that is heuristically close to the previous
- //assignation
+ // We compute a first flow/assignation that is heuristically close to the previous
+ // assignation
let mut gflow = self.compute_candidate_assignation(&zone_to_id, &old_assignation_opt)?;
if let Some(assoc) = &old_assignation_opt {
- //We minimize the distance to the previous assignation.
+ // We minimize the distance to the previous assignation.
self.minimize_rebalance_load(&mut gflow, &zone_to_id, assoc)?;
}
- //We display statistics of the computation
+ // We display statistics of the computation
msg.append(&mut self.output_stat(
&gflow,
&old_assignation_opt,
@@ -489,7 +492,7 @@ impl ClusterLayout {
)?);
msg.push("".to_string());
- //We update the layout structure
+ // We update the layout structure
self.update_ring_from_flow(id_to_zone.len(), &gflow)?;
if !self.check() {
@@ -508,8 +511,8 @@ impl ClusterLayout {
/// do modify assignation_ring and node_id_vec.
fn update_node_id_vec(&mut self) -> Result<Option<Vec<Vec<usize>>>, Error> {
// (1) We compute the new node list
- //Non gateway nodes should be coded on 8bits, hence they must be first in the list
- //We build the new node ids
+ // Non gateway nodes should be coded on 8bits, hence they must be first in the list
+ // We build the new node ids
let mut new_non_gateway_nodes: Vec<Uuid> = self
.roles
.items()
@@ -542,12 +545,12 @@ impl ClusterLayout {
self.node_id_vec = new_node_id_vec.clone();
// (2) We retrieve the old association
- //We rewrite the old association with the new indices. We only consider partition
- //to node assignations where the node is still in use.
+ // We rewrite the old association with the new indices. We only consider partition
+ // to node assignations where the node is still in use.
let mut old_assignation = vec![Vec::<usize>::new(); NB_PARTITIONS];
if self.ring_assignation_data.is_empty() {
- //This is a new association
+ // This is a new association
return Ok(None);
}
if self.ring_assignation_data.len() != NB_PARTITIONS * self.replication_factor {
@@ -558,11 +561,11 @@ impl ClusterLayout {
));
}
- //We build a translation table between the uuid and new ids
+ // We build a translation table between the uuid and new ids
let mut uuid_to_new_id = HashMap::<Uuid, usize>::new();
- //We add the indices of only the new non-gateway nodes that can be used in the
- //association ring
+ // We add the indices of only the new non-gateway nodes that can be used in the
+ // association ring
for (i, uuid) in new_node_id_vec.iter().enumerate() {
uuid_to_new_id.insert(*uuid, i);
}
@@ -577,14 +580,14 @@ impl ClusterLayout {
}
}
- //We write the ring
+ // We write the ring
self.ring_assignation_data = Vec::<CompactNodeType>::new();
Ok(Some(old_assignation))
}
- ///This function generates ids for the zone of the nodes appearing in
- ///self.node_id_vec.
+ /// This function generates ids for the zone of the nodes appearing in
+ /// self.node_id_vec.
fn generate_nongateway_zone_ids(&self) -> Result<(Vec<String>, HashMap<String, usize>), Error> {
let mut id_to_zone = Vec::<String>::new();
let mut zone_to_id = HashMap::<String, usize>::new();
@@ -607,8 +610,8 @@ impl ClusterLayout {
Ok((id_to_zone, zone_to_id))
}
- ///This function computes by dichotomy the largest realizable partition size, given
- ///the layout roles and parameters.
+ /// This function computes by dichotomy the largest realizable partition size, given
+ /// the layout roles and parameters.
fn compute_optimal_partition_size(
&self,
zone_to_id: &HashMap<String, usize>,
@@ -662,13 +665,13 @@ impl ClusterLayout {
vertices
}
- ///Generates the graph to compute the maximal flow corresponding to the optimal
- ///partition assignation.
- ///exclude_assoc is the set of (partition, node) association that we are forbidden
- ///to use (hence we do not add the corresponding edge to the graph). This parameter
- ///is used to compute a first flow that uses only edges appearing in the previous
- ///assignation. This produces a solution that heuristically should be close to the
- ///previous one.
+ /// Generates the graph to compute the maximal flow corresponding to the optimal
+ /// partition assignation.
+ /// exclude_assoc is the set of (partition, node) association that we are forbidden
+ /// to use (hence we do not add the corresponding edge to the graph). This parameter
+ /// is used to compute a first flow that uses only edges appearing in the previous
+ /// assignation. This produces a solution that heuristically should be close to the
+ /// previous one.
fn generate_flow_graph(
&self,
partition_size: u32,
@@ -709,14 +712,14 @@ impl ClusterLayout {
Ok(g)
}
- ///This function computes a first optimal assignation (in the form of a flow graph).
+ /// This function computes a first optimal assignation (in the form of a flow graph).
fn compute_candidate_assignation(
&self,
zone_to_id: &HashMap<String, usize>,
prev_assign_opt: &Option<Vec<Vec<usize>>>,
) -> Result<Graph<FlowEdge>, Error> {
- //We list the (partition,node) associations that are not used in the
- //previous assignation
+ // We list the (partition,node) associations that are not used in the
+ // previous assignation
let mut exclude_edge = HashSet::<(usize, usize)>::new();
if let Some(prev_assign) = prev_assign_opt {
let nb_nodes = self.nongateway_nodes().len();
@@ -730,13 +733,13 @@ impl ClusterLayout {
}
}
- //We compute the best flow using only the edges used in the previous assignation
+ // We compute the best flow using only the edges used in the previous assignation
let mut g = self.generate_flow_graph(self.partition_size, zone_to_id, &exclude_edge)?;
g.compute_maximal_flow()?;
- //We add the excluded edges and compute the maximal flow with the full graph.
- //The algorithm is such that it will start with the flow that we just computed
- //and find ameliorating paths from that.
+ // We add the excluded edges and compute the maximal flow with the full graph.
+ // The algorithm is such that it will start with the flow that we just computed
+ // and find improving paths from that.
for (p, n) in exclude_edge.iter() {
let node_zone = zone_to_id[&self.get_node_zone(&self.node_id_vec[*n])?];
g.add_edge(Vertex::PZ(*p, node_zone), Vertex::N(*n), 1)?;
@@ -745,16 +748,16 @@ impl ClusterLayout {
Ok(g)
}
- ///This function updates the flow graph gflow to minimize the distance between
- ///its corresponding assignation and the previous one
+ /// This function updates the flow graph gflow to minimize the distance between
+ /// its corresponding assignation and the previous one
fn minimize_rebalance_load(
&self,
gflow: &mut Graph<FlowEdge>,
zone_to_id: &HashMap<String, usize>,
prev_assign: &[Vec<usize>],
) -> Result<(), Error> {
- //We define a cost function on the edges (pairs of vertices) corresponding
- //to the distance between the two assignations.
+ // We define a cost function on the edges (pairs of vertices) corresponding
+ // to the distance between the two assignations.
let mut cost = CostFunction::new();
for (p, assoc_p) in prev_assign.iter().enumerate() {
for n in assoc_p.iter() {
@@ -763,9 +766,9 @@ impl ClusterLayout {
}
}
- //We compute the maximal length of a simple path in gflow. It is used in the
- //Bellman-Ford algorithm in optimize_flow_with_cost to set the number
- //of iterations.
+ // We compute the maximal length of a simple path in gflow. It is used in the
+ // Bellman-Ford algorithm in optimize_flow_with_cost to set the number
+ // of iterations.
let nb_nodes = self.nongateway_nodes().len();
let path_length = 4 * nb_nodes;
gflow.optimize_flow_with_cost(&cost, path_length)?;
@@ -773,7 +776,7 @@ impl ClusterLayout {
Ok(())
}
- ///This function updates the assignation ring from the flow graph.
+ /// This function updates the assignation ring from the flow graph.
fn update_ring_from_flow(
&mut self,
nb_zones: usize,
@@ -801,8 +804,8 @@ impl ClusterLayout {
Ok(())
}
- ///This function returns a message summing up the partition repartition of the new
- ///layout, and other statistics of the partition assignation computation.
+ /// This function returns a message summing up the partition repartition of the new
+ /// layout, and other statistics of the partition assignation computation.
fn output_stat(
&self,
gflow: &Graph<FlowEdge>,
@@ -837,7 +840,7 @@ impl ClusterLayout {
used_cap / self.replication_factor as u32
));
- //We define and fill in the following tables
+ // We define and fill in the following tables
let storing_nodes = self.nongateway_nodes();
let mut new_partitions = vec![0; storing_nodes.len()];
let mut stored_partitions = vec![0; storing_nodes.len()];
@@ -879,7 +882,7 @@ impl ClusterLayout {
new_partitions_zone = stored_partitions_zone.clone();
}
- //We display the statistics
+ // We display the statistics
msg.push("".into());
if *prev_assign_opt != None {
@@ -951,27 +954,27 @@ impl ClusterLayout {
}
}
-//====================================================================================
+// ====================================================================================
#[cfg(test)]
mod tests {
use super::{Error, *};
use std::cmp::min;
- //This function checks that the partition size S computed is at least better than the
- //one given by a very naive algorithm. To do so, we try to run the naive algorithm
- //assuming a partion size of S+1. If we succed, it means that the optimal assignation
- //was not optimal. The naive algorithm is the following :
- //- we compute the max number of partitions associated to every node, capped at the
- //partition number. It gives the number of tokens of every node.
- //- every zone has a number of tokens equal to the sum of the tokens of its nodes.
- //- we cycle over the partitions and associate zone tokens while respecting the
- //zone redundancy constraint.
- //NOTE: the naive algorithm is not optimal. Counter example:
- //take nb_partition = 3 ; replication_factor = 5; redundancy = 4;
- //number of tokens by zone : (A, 4), (B,1), (C,4), (D, 4), (E, 2)
- //With these parameters, the naive algo fails, whereas there is a solution:
- //(A,A,C,D,E) , (A,B,C,D,D) (A,C,C,D,E)
+ // This function checks that the partition size S computed is at least better than the
+ // one given by a very naive algorithm. To do so, we try to run the naive algorithm
+ // assuming a partition size of S+1. If we succeed, it means that the optimal assignation
+ // was not optimal. The naive algorithm is the following :
+ // - we compute the max number of partitions associated to every node, capped at the
+ // partition number. It gives the number of tokens of every node.
+ // - every zone has a number of tokens equal to the sum of the tokens of its nodes.
+ // - we cycle over the partitions and associate zone tokens while respecting the
+ // zone redundancy constraint.
+ // NOTE: the naive algorithm is not optimal. Counter example:
+ // take nb_partition = 3 ; replication_factor = 5; redundancy = 4;
+ // number of tokens by zone : (A, 4), (B,1), (C,4), (D, 4), (E, 2)
+ // With these parameters, the naive algo fails, whereas there is a solution:
+ // (A,A,C,D,E) , (A,B,C,D,D) (A,C,C,D,E)
fn check_against_naive(cl: &ClusterLayout) -> Result<bool, Error> {
let over_size = cl.partition_size + 1;
let mut zone_token = HashMap::<String, usize>::new();
@@ -994,8 +997,8 @@ mod tests {
);
}
- //For every partition, we count the number of zone already associated and
- //the name of the last zone associated
+ // For every partition, we count the number of zone already associated and
+ // the name of the last zone associated
let mut id_zone_token = vec![0; zones.len()];
for (z, t) in zone_token.iter() {
@@ -1049,7 +1052,7 @@ mod tests {
cl.node_id_vec.push(x);
}
- let update = cl.staging.update_mutator(
+ let update = cl.staging_roles.update_mutator(
cl.node_id_vec[i],
NodeRoleV(Some(NodeRole {
zone: (node_zone_vec[i].to_string()),
@@ -1057,10 +1060,10 @@ mod tests {
tags: (vec![]),
})),
);
- cl.staging.merge(&update);
+ cl.staging_roles.merge(&update);
}
- cl.staging_hash = blake2sum(&rmp_to_vec_all_named(&cl.staging).unwrap()[..]);
- cl.staged_parameters
+ cl.staging_hash = cl.calculate_staging_hash();
+ cl.staging_parameters
.update(LayoutParameters { zone_redundancy });
}