aboutsummaryrefslogtreecommitdiff
path: root/src/rpc/layout.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc/layout.rs')
-rw-r--r--src/rpc/layout.rs162
1 files changed, 93 insertions, 69 deletions
diff --git a/src/rpc/layout.rs b/src/rpc/layout.rs
index d2ed8af8..38e56b88 100644
--- a/src/rpc/layout.rs
+++ b/src/rpc/layout.rs
@@ -17,6 +17,8 @@ use crate::ring::*;
use std::convert::TryInto;
+const NB_PARTITIONS: usize = 1usize << PARTITION_BITS;
+
//The Message type will be used to collect information on the algorithm.
type Message = Vec<String>;
@@ -28,9 +30,11 @@ pub struct ClusterLayout {
pub replication_factor: usize,
- //This attribute is only used to retain the previously computed partition size,
- //to know to what extent does it change with the layout update.
+ ///This attribute is only used to retain the previously computed partition size,
+ ///to know to what extent does it change with the layout update.
pub partition_size: u32,
+ ///Parameters used to compute the assignation currently given by
+ ///ring_assignation_data
pub parameters: LayoutParameters,
pub roles: LwwMap<Uuid, NodeRoleV>,
@@ -48,8 +52,9 @@ pub struct ClusterLayout {
#[serde(with = "serde_bytes")]
pub ring_assignation_data: Vec<CompactNodeType>,
- /// Role changes which are staged for the next version of the layout
+ /// Parameters to be used in the next partition assignation computation.
pub staged_parameters: Lww<LayoutParameters>,
+ /// Role changes which are staged for the next version of the layout
pub staging: LwwMap<Uuid, NodeRoleV>,
pub staging_hash: Hash,
}
@@ -65,8 +70,6 @@ impl AutoCrdt for LayoutParameters {
const WARN_IF_DIFFERENT: bool = true;
}
-const NB_PARTITIONS: usize = 1usize << PARTITION_BITS;
-
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct NodeRoleV(pub Option<NodeRole>);
@@ -77,12 +80,13 @@ impl AutoCrdt for NodeRoleV {
/// The user-assigned roles of cluster nodes
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct NodeRole {
- /// Datacenter at which this entry belong. This information might be used to perform a better
- /// geodistribution
+ /// Datacenter at which this entry belong. This information is used to
+ /// perform a better geodistribution
pub zone: String,
- /// The (relative) capacity of the node
+ /// The capacity of the node
/// If this is set to None, the node does not participate in storing data for the system
/// and is only active as an API gateway to other nodes
+ // TODO : change the capacity to u64 and use byte unit input/output
pub capacity: Option<u32>,
/// A set of tags to recognize the node
pub tags: Vec<String>,
@@ -110,6 +114,7 @@ impl NodeRole {
}
}
+//Implementation of the ClusterLayout methods unrelated to the assignation algorithm.
impl ClusterLayout {
pub fn new(replication_factor: usize) -> Self {
//We set the default zone redundancy to be equal to the replication factor,
@@ -231,7 +236,7 @@ To know the correct value of the new layout version, invoke `garage layout show`
}
///Returns the uuids of the non_gateway nodes in self.node_id_vec.
- pub fn useful_nodes(&self) -> Vec<Uuid> {
+ pub fn nongateway_nodes(&self) -> Vec<Uuid> {
let mut result = Vec::<Uuid>::new();
for uuid in self.node_id_vec.iter() {
match self.node_role(uuid) {
@@ -291,13 +296,14 @@ To know the correct value of the new layout version, invoke `garage layout show`
///Returns the sum of capacities of non gateway nodes in the cluster
pub fn get_total_capacity(&self) -> Result<u32, Error> {
let mut total_capacity = 0;
- for uuid in self.useful_nodes().iter() {
+ for uuid in self.nongateway_nodes().iter() {
total_capacity += self.get_node_capacity(uuid)?;
}
Ok(total_capacity)
}
/// Check a cluster layout for internal consistency
+ /// (assignation, roles, parameters, partition size)
/// returns true if consistent, false if error
pub fn check(&self) -> bool {
// Check that the hash of the staging data is correct
@@ -377,7 +383,7 @@ To know the correct value of the new layout version, invoke `garage layout show`
//Check that the partition size stored is the one computed by the asignation
//algorithm.
let cl2 = self.clone();
- let (_, zone_to_id) = cl2.generate_useful_zone_ids().expect("Critical Error");
+ let (_, zone_to_id) = cl2.generate_nongateway_zone_ids().expect("Critical Error");
match cl2.compute_optimal_partition_size(&zone_to_id) {
Ok(s) if s != self.partition_size => return false,
Err(_) => return false,
@@ -388,6 +394,7 @@ To know the correct value of the new layout version, invoke `garage layout show`
}
}
+//Implementation of the ClusterLayout methods related to the assignation algorithm.
impl ClusterLayout {
/// This function calculates a new partition-to-node assignation.
/// The computed assignation respects the node replication factor
@@ -397,16 +404,13 @@ impl ClusterLayout {
/// the former assignation (if any) to minimize the amount of
/// data to be moved.
// Staged role changes must be merged with nodes roles before calling this function,
- // hence it must only be called from apply_staged_changes() and it is not public.
+ // hence it must only be called from apply_staged_changes() and hence is not public.
fn calculate_partition_assignation(&mut self) -> Result<Message, Error> {
- //The nodes might have been updated, some might have been deleted.
- //So we need to first update the list of nodes and retrieve the
- //assignation.
-
//We update the node ids, since the node role list might have changed with the
- //changes in the layout. We retrieve the old_assignation reframed with the new ids
+ //changes in the layout. We retrieve the old_assignation reframed with new ids
let old_assignation_opt = self.update_node_id_vec()?;
+ //We update the parameters
self.parameters = self.staged_parameters.get().clone();
let mut msg = Message::new();
@@ -420,14 +424,14 @@ impl ClusterLayout {
//We generate for once numerical ids for the zones of non gateway nodes,
//to use them as indices in the flow graphs.
- let (id_to_zone, zone_to_id) = self.generate_useful_zone_ids()?;
+ let (id_to_zone, zone_to_id) = self.generate_nongateway_zone_ids()?;
- let nb_useful_nodes = self.useful_nodes().len();
- if nb_useful_nodes < self.replication_factor {
+ let nb_nongateway_nodes = self.nongateway_nodes().len();
+ if nb_nongateway_nodes < self.replication_factor {
return Err(Error::Message(format!(
"The number of nodes with positive \
capacity ({}) is smaller than the replication factor ({}).",
- nb_useful_nodes, self.replication_factor
+ nb_nongateway_nodes, self.replication_factor
)));
}
if id_to_zone.len() < self.parameters.zone_redundancy {
@@ -457,6 +461,7 @@ impl ClusterLayout {
partition_size
));
}
+ //We write the partition size.
self.partition_size = partition_size;
if partition_size < 100 {
@@ -467,14 +472,15 @@ impl ClusterLayout {
);
}
- //We compute a first flow/assignment that is heuristically close to the previous
- //assignment
- let mut gflow = self.compute_candidate_assignment(&zone_to_id, &old_assignation_opt)?;
+ //We compute a first flow/assignation that is heuristically close to the previous
+ //assignation
+ let mut gflow = self.compute_candidate_assignation(&zone_to_id, &old_assignation_opt)?;
if let Some(assoc) = &old_assignation_opt {
- //We minimize the distance to the previous assignment.
+ //We minimize the distance to the previous assignation.
self.minimize_rebalance_load(&mut gflow, &zone_to_id, assoc)?;
}
+ //We display statistics of the computation
msg.append(&mut self.output_stat(
&gflow,
&old_assignation_opt,
@@ -538,14 +544,13 @@ impl ClusterLayout {
// (2) We retrieve the old association
//We rewrite the old association with the new indices. We only consider partition
//to node assignations where the node is still in use.
- let nb_partitions = 1usize << PARTITION_BITS;
- let mut old_assignation = vec![Vec::<usize>::new(); nb_partitions];
+ let mut old_assignation = vec![Vec::<usize>::new(); NB_PARTITIONS];
if self.ring_assignation_data.is_empty() {
//This is a new association
return Ok(None);
}
- if self.ring_assignation_data.len() != nb_partitions * self.replication_factor {
+ if self.ring_assignation_data.len() != NB_PARTITIONS * self.replication_factor {
return Err(Error::Message(
"The old assignation does not have a size corresponding to \
the old replication factor or the number of partitions."
@@ -580,11 +585,11 @@ impl ClusterLayout {
///This function generates ids for the zone of the nodes appearing in
///self.node_id_vec.
- fn generate_useful_zone_ids(&self) -> Result<(Vec<String>, HashMap<String, usize>), Error> {
+ fn generate_nongateway_zone_ids(&self) -> Result<(Vec<String>, HashMap<String, usize>), Error> {
let mut id_to_zone = Vec::<String>::new();
let mut zone_to_id = HashMap::<String, usize>::new();
- for uuid in self.useful_nodes().iter() {
+ for uuid in self.nongateway_nodes().iter() {
if self.roles.get(uuid) == None {
return Err(Error::Message(
"The uuid was not found in the node roles (this should \
@@ -603,17 +608,16 @@ impl ClusterLayout {
}
///This function computes by dichotomy the largest realizable partition size, given
- ///the layout.
+ ///the layout roles and parameters.
fn compute_optimal_partition_size(
&self,
zone_to_id: &HashMap<String, usize>,
) -> Result<u32, Error> {
- let nb_partitions = 1usize << PARTITION_BITS;
let empty_set = HashSet::<(usize, usize)>::new();
let mut g = self.generate_flow_graph(1, zone_to_id, &empty_set)?;
g.compute_maximal_flow()?;
if g.get_flow_value()?
- < (nb_partitions * self.replication_factor)
+ < (NB_PARTITIONS * self.replication_factor)
.try_into()
.unwrap()
{
@@ -630,7 +634,7 @@ impl ClusterLayout {
g = self.generate_flow_graph((s_down + s_up) / 2, zone_to_id, &empty_set)?;
g.compute_maximal_flow()?;
if g.get_flow_value()?
- < (nb_partitions * self.replication_factor)
+ < (NB_PARTITIONS * self.replication_factor)
.try_into()
.unwrap()
{
@@ -658,14 +662,21 @@ impl ClusterLayout {
vertices
}
+ ///Generates the graph to compute the maximal flow corresponding to the optimal
+ ///partition assignation.
+ ///exclude_assoc is the set of (partition, node) association that we are forbidden
+ ///to use (hence we do not add the corresponding edge to the graph). This parameter
+ ///is used to compute a first flow that uses only edges appearing in the previous
+ ///assignation. This produces a solution that heuristically should be close to the
+ ///previous one.
fn generate_flow_graph(
&self,
- size: u32,
+ partition_size: u32,
zone_to_id: &HashMap<String, usize>,
exclude_assoc: &HashSet<(usize, usize)>,
) -> Result<Graph<FlowEdge>, Error> {
let vertices =
- ClusterLayout::generate_graph_vertices(zone_to_id.len(), self.useful_nodes().len());
+ ClusterLayout::generate_graph_vertices(zone_to_id.len(), self.nongateway_nodes().len());
let mut g = Graph::<FlowEdge>::new(&vertices);
let nb_zones = zone_to_id.len();
let redundancy = self.parameters.zone_redundancy;
@@ -685,10 +696,10 @@ impl ClusterLayout {
)?;
}
}
- for n in 0..self.useful_nodes().len() {
+ for n in 0..self.nongateway_nodes().len() {
let node_capacity = self.get_node_capacity(&self.node_id_vec[n])?;
let node_zone = zone_to_id[&self.get_node_zone(&self.node_id_vec[n])?];
- g.add_edge(Vertex::N(n), Vertex::Sink, node_capacity / size)?;
+ g.add_edge(Vertex::N(n), Vertex::Sink, node_capacity / partition_size)?;
for p in 0..NB_PARTITIONS {
if !exclude_assoc.contains(&(p, n)) {
g.add_edge(Vertex::PZ(p, node_zone), Vertex::N(n), 1)?;
@@ -698,28 +709,34 @@ impl ClusterLayout {
Ok(g)
}
- fn compute_candidate_assignment(
+ ///This function computes a first optimal assignation (in the form of a flow graph).
+ fn compute_candidate_assignation(
&self,
zone_to_id: &HashMap<String, usize>,
- old_assoc_opt: &Option<Vec<Vec<usize>>>,
+ prev_assign_opt: &Option<Vec<Vec<usize>>>,
) -> Result<Graph<FlowEdge>, Error> {
- //We list the edges that are not used in the old association
+ //We list the (partition,node) associations that are not used in the
+ //previous assignation
let mut exclude_edge = HashSet::<(usize, usize)>::new();
- if let Some(old_assoc) = old_assoc_opt {
- let nb_nodes = self.useful_nodes().len();
- for (p, old_assoc_p) in old_assoc.iter().enumerate() {
+ if let Some(prev_assign) = prev_assign_opt {
+ let nb_nodes = self.nongateway_nodes().len();
+ for (p, prev_assign_p) in prev_assign.iter().enumerate() {
for n in 0..nb_nodes {
exclude_edge.insert((p, n));
}
- for n in old_assoc_p.iter() {
+ for n in prev_assign_p.iter() {
exclude_edge.remove(&(p, *n));
}
}
}
- //We compute the best flow using only the edges used in the old assoc
+ //We compute the best flow using only the edges used in the previous assignation
let mut g = self.generate_flow_graph(self.partition_size, zone_to_id, &exclude_edge)?;
g.compute_maximal_flow()?;
+
+ //We add the excluded edges and compute the maximal flow with the full graph.
+ //The algorithm is such that it will start with the flow that we just computed
+ //and find ameliorating paths from that.
for (p, n) in exclude_edge.iter() {
let node_zone = zone_to_id[&self.get_node_zone(&self.node_id_vec[*n])?];
g.add_edge(Vertex::PZ(*p, node_zone), Vertex::N(*n), 1)?;
@@ -728,26 +745,35 @@ impl ClusterLayout {
Ok(g)
}
+ ///This function updates the flow graph gflow to minimize the distance between
+ ///its corresponding assignation and the previous one
fn minimize_rebalance_load(
&self,
gflow: &mut Graph<FlowEdge>,
zone_to_id: &HashMap<String, usize>,
- old_assoc: &[Vec<usize>],
+ prev_assign: &[Vec<usize>],
) -> Result<(), Error> {
+ //We define a cost function on the edges (pairs of vertices) corresponding
+ //to the distance between the two assignations.
let mut cost = CostFunction::new();
- for (p, assoc_p) in old_assoc.iter().enumerate() {
+ for (p, assoc_p) in prev_assign.iter().enumerate() {
for n in assoc_p.iter() {
let node_zone = zone_to_id[&self.get_node_zone(&self.node_id_vec[*n])?];
cost.insert((Vertex::PZ(p, node_zone), Vertex::N(*n)), -1);
}
}
- let nb_nodes = self.useful_nodes().len();
+
+ //We compute the maximal length of a simple path in gflow. It is used in the
+ //Bellman-Ford algorithm in optimize_flow_with_cost to set the number
+ //of iterations.
+ let nb_nodes = self.nongateway_nodes().len();
let path_length = 4 * nb_nodes;
gflow.optimize_flow_with_cost(&cost, path_length)?;
Ok(())
}
+ ///This function updates the assignation ring from the flow graph.
fn update_ring_from_flow(
&mut self,
nb_zones: usize,
@@ -775,19 +801,18 @@ impl ClusterLayout {
Ok(())
}
- //This function returns a message summing up the partition repartition of the new
- //layout.
+ ///This function returns a message summing up the partition repartition of the new
+ ///layout, and other statistics of the partition assignation computation.
fn output_stat(
&self,
gflow: &Graph<FlowEdge>,
- old_assoc_opt: &Option<Vec<Vec<usize>>>,
+ prev_assign_opt: &Option<Vec<Vec<usize>>>,
zone_to_id: &HashMap<String, usize>,
id_to_zone: &[String],
) -> Result<Message, Error> {
let mut msg = Message::new();
- let nb_partitions = 1usize << PARTITION_BITS;
- let used_cap = self.partition_size * nb_partitions as u32 * self.replication_factor as u32;
+ let used_cap = self.partition_size * NB_PARTITIONS as u32 * self.replication_factor as u32;
let total_cap = self.get_total_capacity()?;
let percent_cap = 100.0 * (used_cap as f32) / (total_cap as f32);
msg.push("".into());
@@ -813,21 +838,21 @@ impl ClusterLayout {
));
//We define and fill in the following tables
- let storing_nodes = self.useful_nodes();
+ let storing_nodes = self.nongateway_nodes();
let mut new_partitions = vec![0; storing_nodes.len()];
let mut stored_partitions = vec![0; storing_nodes.len()];
let mut new_partitions_zone = vec![0; id_to_zone.len()];
let mut stored_partitions_zone = vec![0; id_to_zone.len()];
- for p in 0..nb_partitions {
+ for p in 0..NB_PARTITIONS {
for z in 0..id_to_zone.len() {
let pz_nodes = gflow.get_positive_flow_from(Vertex::PZ(p, z))?;
if !pz_nodes.is_empty() {
stored_partitions_zone[z] += 1;
- if let Some(old_assoc) = old_assoc_opt {
+ if let Some(prev_assign) = prev_assign_opt {
let mut old_zones_of_p = Vec::<usize>::new();
- for n in old_assoc[p].iter() {
+ for n in prev_assign[p].iter() {
old_zones_of_p
.push(zone_to_id[&self.get_node_zone(&self.node_id_vec[*n])?]);
}
@@ -839,8 +864,8 @@ impl ClusterLayout {
for vert in pz_nodes.iter() {
if let Vertex::N(n) = *vert {
stored_partitions[n] += 1;
- if let Some(old_assoc) = old_assoc_opt {
- if !old_assoc[p].contains(&n) {
+ if let Some(prev_assign) = prev_assign_opt {
+ if !prev_assign[p].contains(&n) {
new_partitions[n] += 1;
}
}
@@ -849,7 +874,7 @@ impl ClusterLayout {
}
}
- if *old_assoc_opt == None {
+ if *prev_assign_opt == None {
new_partitions = stored_partitions.clone();
new_partitions_zone = stored_partitions_zone.clone();
}
@@ -857,7 +882,7 @@ impl ClusterLayout {
//We display the statistics
msg.push("".into());
- if *old_assoc_opt != None {
+ if *prev_assign_opt != None {
let total_new_partitions: usize = new_partitions.iter().sum();
msg.push(format!(
"A total of {} new copies of partitions need to be \
@@ -950,9 +975,8 @@ mod tests {
fn check_against_naive(cl: &ClusterLayout) -> Result<bool, Error> {
let over_size = cl.partition_size + 1;
let mut zone_token = HashMap::<String, usize>::new();
- let nb_partitions = 1usize << PARTITION_BITS;
- let (zones, zone_to_id) = cl.generate_useful_zone_ids()?;
+ let (zones, zone_to_id) = cl.generate_nongateway_zone_ids()?;
if zones.is_empty() {
return Ok(false);
@@ -961,12 +985,12 @@ mod tests {
for z in zones.iter() {
zone_token.insert(z.clone(), 0);
}
- for uuid in cl.useful_nodes().iter() {
+ for uuid in cl.nongateway_nodes().iter() {
let z = cl.get_node_zone(uuid)?;
let c = cl.get_node_capacity(uuid)?;
zone_token.insert(
z.clone(),
- zone_token[&z] + min(nb_partitions, (c / over_size) as usize),
+ zone_token[&z] + min(NB_PARTITIONS, (c / over_size) as usize),
);
}
@@ -978,15 +1002,15 @@ mod tests {
id_zone_token[zone_to_id[z]] = *t;
}
- let mut nb_token = vec![0; nb_partitions];
- let mut last_zone = vec![zones.len(); nb_partitions];
+ let mut nb_token = vec![0; NB_PARTITIONS];
+ let mut last_zone = vec![zones.len(); NB_PARTITIONS];
let mut curr_zone = 0;
let redundancy = cl.parameters.zone_redundancy;
for replic in 0..cl.replication_factor {
- for p in 0..nb_partitions {
+ for p in 0..NB_PARTITIONS {
while id_zone_token[curr_zone] == 0
|| (last_zone[p] == curr_zone
&& redundancy - nb_token[p] <= cl.replication_factor - replic)