diff options
-rw-r--r-- | src/rpc/layout.rs | 70 |
1 files changed, 33 insertions, 37 deletions
diff --git a/src/rpc/layout.rs b/src/rpc/layout.rs index a24bd9f3..73b356ad 100644 --- a/src/rpc/layout.rs +++ b/src/rpc/layout.rs @@ -175,47 +175,32 @@ impl ClusterLayout { // Get old partition assignation let old_partitions = self.parse_assignation_data(); - // Create new partition assignation starting from old one - let mut partitions = old_partitions.clone(); - - // Cleanup steps in new partition assignation: - let min_keep_nodes_per_part = (self.replication_factor + 1) / 2; - for part in partitions.iter_mut() { - // - remove from assignation nodes that don't have a role in the layout anymore - part.nodes - .retain(|(_, info)| info.map(|x| x.capacity.is_some()).unwrap_or(false)); - - // - remove from assignation some nodes that are in the same datacenter - // if we can, so that the later steps can ensure datacenter variety - // as much as possible (but still under the constraint that each partition - // should not move from at least a certain number of nodes that is - // min_keep_nodes_per_part) - 'rmloop: while part.nodes.len() > min_keep_nodes_per_part { - let mut zns_c = HashMap::<&str, usize>::new(); - for (_id, info) in part.nodes.iter() { - *zns_c.entry(info.unwrap().zone.as_str()).or_insert(0) += 1; - } - for i in 0..part.nodes.len() { - if zns_c[part.nodes[i].1.unwrap().zone.as_str()] > 1 { - part.nodes.remove(i); - continue 'rmloop; + // Start new partition assignation with nodes from old assignation where it is relevant + let mut partitions = old_partitions + .iter() + .map(|old_part| { + let mut new_part = PartitionAss::new(); + for node in old_part.nodes.iter() { + if let Some(role) = node.1 { + if role.capacity.is_some() { + new_part.add(None, n_zones, node.0, role); + } } } + new_part + }) + .collect::<Vec<_>>(); - break; - } - } - - // When nodes are removed, or when bootstraping an assignation from - // scratch for a new cluster, the old partitions will have holes (or be empty). - // Here we add more nodes to make a complete (sub-optimal) assignation, + // In various cases, not enough nodes will have been added for all partitions + // in the step above (e.g. due to node removals, or new zones being added). + // Here we add more nodes to make a complete (but sub-optimal) assignation, // using an initial partition assignation that is calculated using the multi-dc maglev trick match self.initial_partition_assignation() { Some(initial_partitions) => { for (part, ipart) in partitions.iter_mut().zip(initial_partitions.iter()) { for (id, info) in ipart.nodes.iter() { if part.nodes.len() < self.replication_factor { - part.add(part.nodes.len() + 1, n_zones, id, info.unwrap()); + part.add(None, n_zones, id, info.unwrap()); } } assert!(part.nodes.len() == self.replication_factor); @@ -287,7 +272,7 @@ impl ClusterLayout { let mut newpart = part.clone(); newpart.nodes.remove(irm); - if !newpart.add(newpart.nodes.len() + 1, n_zones, idadd, infoadd) { + if !newpart.add(None, n_zones, idadd, infoadd) { continue; } assert!(newpart.nodes.len() == self.replication_factor); @@ -422,7 +407,7 @@ impl ClusterLayout { continue; } for (pos2, &qv) in q.iter().enumerate().skip(*pos) { - if partitions[qv].add(rep + 1, n_zones, node_id, node_info) { + if partitions[qv].add(Some(rep + 1), n_zones, node_id, node_info) { remaining -= 1; *pos = pos2 + 1; break; @@ -579,15 +564,26 @@ impl<'a> PartitionAss<'a> { } } + // add is a key function in creating a PartitionAss, i.e. the list of nodes + // to which a partition is assigned. It tries to add a certain node id to the + // assignation, but checks that doing so is compatible with the NECESSARY + // condition that the partition assignation must be dispersed over different + // zones (datacenters) if enough zones exist. This is why it takes a n_zones + // parameter, which is the total number of zones that have existing nodes: + // if nodes in the assignation already cover all n_zones zones, then any node + // that is not yet in the assignation can be added. Otherwise, only nodes + // that are in a new zone can be added. fn add( &mut self, - target_len: usize, + target_len: Option<usize>, n_zones: usize, node: &'a Uuid, role: &'a NodeRole, ) -> bool { - if self.nodes.len() != target_len - 1 { - return false; + if let Some(tl) = target_len { + if self.nodes.len() != tl - 1 { + return false; + } } let p_zns = self |