aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rpc/layout.rs70
1 files changed, 33 insertions, 37 deletions
diff --git a/src/rpc/layout.rs b/src/rpc/layout.rs
index a24bd9f3..73b356ad 100644
--- a/src/rpc/layout.rs
+++ b/src/rpc/layout.rs
@@ -175,47 +175,32 @@ impl ClusterLayout {
// Get old partition assignation
let old_partitions = self.parse_assignation_data();
- // Create new partition assignation starting from old one
- let mut partitions = old_partitions.clone();
-
- // Cleanup steps in new partition assignation:
- let min_keep_nodes_per_part = (self.replication_factor + 1) / 2;
- for part in partitions.iter_mut() {
- // - remove from assignation nodes that don't have a role in the layout anymore
- part.nodes
- .retain(|(_, info)| info.map(|x| x.capacity.is_some()).unwrap_or(false));
-
- // - remove from assignation some nodes that are in the same datacenter
- // if we can, so that the later steps can ensure datacenter variety
- // as much as possible (but still under the constraint that each partition
- // should not move from at least a certain number of nodes that is
- // min_keep_nodes_per_part)
- 'rmloop: while part.nodes.len() > min_keep_nodes_per_part {
- let mut zns_c = HashMap::<&str, usize>::new();
- for (_id, info) in part.nodes.iter() {
- *zns_c.entry(info.unwrap().zone.as_str()).or_insert(0) += 1;
- }
- for i in 0..part.nodes.len() {
- if zns_c[part.nodes[i].1.unwrap().zone.as_str()] > 1 {
- part.nodes.remove(i);
- continue 'rmloop;
+ // Start new partition assignation with nodes from old assignation where it is relevant
+ let mut partitions = old_partitions
+ .iter()
+ .map(|old_part| {
+ let mut new_part = PartitionAss::new();
+ for node in old_part.nodes.iter() {
+ if let Some(role) = node.1 {
+ if role.capacity.is_some() {
+ new_part.add(None, n_zones, node.0, role);
+ }
}
}
+ new_part
+ })
+ .collect::<Vec<_>>();
- break;
- }
- }
-
- // When nodes are removed, or when bootstraping an assignation from
- // scratch for a new cluster, the old partitions will have holes (or be empty).
- // Here we add more nodes to make a complete (sub-optimal) assignation,
+ // In various cases, not enough nodes will have been added for all partitions
+ // in the step above (e.g. due to node removals, or new zones being added).
+ // Here we add more nodes to make a complete (but sub-optimal) assignation,
// using an initial partition assignation that is calculated using the multi-dc maglev trick
match self.initial_partition_assignation() {
Some(initial_partitions) => {
for (part, ipart) in partitions.iter_mut().zip(initial_partitions.iter()) {
for (id, info) in ipart.nodes.iter() {
if part.nodes.len() < self.replication_factor {
- part.add(part.nodes.len() + 1, n_zones, id, info.unwrap());
+ part.add(None, n_zones, id, info.unwrap());
}
}
assert!(part.nodes.len() == self.replication_factor);
@@ -287,7 +272,7 @@ impl ClusterLayout {
let mut newpart = part.clone();
newpart.nodes.remove(irm);
- if !newpart.add(newpart.nodes.len() + 1, n_zones, idadd, infoadd) {
+ if !newpart.add(None, n_zones, idadd, infoadd) {
continue;
}
assert!(newpart.nodes.len() == self.replication_factor);
@@ -422,7 +407,7 @@ impl ClusterLayout {
continue;
}
for (pos2, &qv) in q.iter().enumerate().skip(*pos) {
- if partitions[qv].add(rep + 1, n_zones, node_id, node_info) {
+ if partitions[qv].add(Some(rep + 1), n_zones, node_id, node_info) {
remaining -= 1;
*pos = pos2 + 1;
break;
@@ -579,15 +564,26 @@ impl<'a> PartitionAss<'a> {
}
}
+ // add is a key function in creating a PartitionAss, i.e. the list of nodes
+ // to which a partition is assigned. It tries to add a certain node id to the
+ // assignation, but checks that doing so is compatible with the NECESSARY
+ // condition that the partition assignation must be dispersed over different
+ // zones (datacenters) if enough zones exist. This is why it takes a n_zones
+ // parameter, which is the total number of zones that have existing nodes:
+ // if nodes in the assignation already cover all n_zones zones, then any node
+ // that is not yet in the assignation can be added. Otherwise, only nodes
+ // that are in a new zone can be added.
fn add(
&mut self,
- target_len: usize,
+ target_len: Option<usize>,
n_zones: usize,
node: &'a Uuid,
role: &'a NodeRole,
) -> bool {
- if self.nodes.len() != target_len - 1 {
- return false;
+ if let Some(tl) = target_len {
+ if self.nodes.len() != tl - 1 {
+ return false;
+ }
}
let p_zns = self