aboutsummaryrefslogtreecommitdiff
path: root/src/rpc/layout.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-05-01 09:54:19 +0200
committerAlex Auvolat <alex@adnab.me>2022-05-01 09:54:19 +0200
commitc1d1646c4d62300ec48503aa65623ee7e3df8685 (patch)
treec766c48972e660af6f5f952d4c12c5f7b7c217b7 /src/rpc/layout.rs
parentc9ef3e461b54f36b7fb60e03959416339cd61a9f (diff)
downloadgarage-c1d1646c4d62300ec48503aa65623ee7e3df8685.tar.gz
garage-c1d1646c4d62300ec48503aa65623ee7e3df8685.zip
Change the way new layout assignations are computed.
The function now computes an optimal assignation (with respect to partition size) that minimizes the distance to the former assignation, using flow algorithms. This commit was written by Mendes Oulamara <mendes.oulamara@pm.me>
Diffstat (limited to 'src/rpc/layout.rs')
-rw-r--r--src/rpc/layout.rs881
1 files changed, 447 insertions, 434 deletions
diff --git a/src/rpc/layout.rs b/src/rpc/layout.rs
index b9c02c21..afd7df17 100644
--- a/src/rpc/layout.rs
+++ b/src/rpc/layout.rs
@@ -1,10 +1,14 @@
use std::cmp::Ordering;
-use std::collections::{HashMap, HashSet};
+use std::cmp::{min};
+use std::collections::{HashMap};
use serde::{Deserialize, Serialize};
use garage_util::crdt::{AutoCrdt, Crdt, LwwMap};
use garage_util::data::*;
+use garage_util::bipartite::*;
+
+use rand::prelude::SliceRandom;
use crate::ring::*;
@@ -164,445 +168,454 @@ impl ClusterLayout {
true
}
- /// Calculate an assignation of partitions to nodes
- pub fn calculate_partition_assignation(&mut self) -> bool {
- let (configured_nodes, zones) = self.configured_nodes_and_zones();
- let n_zones = zones.len();
-
- println!("Calculating updated partition assignation, this may take some time...");
- println!();
-
- // Get old partition assignation
- let old_partitions = self.parse_assignation_data();
-
- // Start new partition assignation with nodes from old assignation where it is relevant
- let mut partitions = old_partitions
- .iter()
- .map(|old_part| {
- let mut new_part = PartitionAss::new();
- for node in old_part.nodes.iter() {
- if let Some(role) = node.1 {
- if role.capacity.is_some() {
- new_part.add(None, n_zones, node.0, role);
- }
- }
- }
- new_part
- })
- .collect::<Vec<_>>();
-
- // In various cases, not enough nodes will have been added for all partitions
- // in the step above (e.g. due to node removals, or new zones being added).
- // Here we add more nodes to make a complete (but sub-optimal) assignation,
- // using an initial partition assignation that is calculated using the multi-dc maglev trick
- match self.initial_partition_assignation() {
- Some(initial_partitions) => {
- for (part, ipart) in partitions.iter_mut().zip(initial_partitions.iter()) {
- for (id, info) in ipart.nodes.iter() {
- if part.nodes.len() < self.replication_factor {
- part.add(None, n_zones, id, info.unwrap());
- }
- }
- assert!(part.nodes.len() == self.replication_factor);
- }
- }
- None => {
- // Not enough nodes in cluster to build a correct assignation.
- // Signal it by returning an error.
- return false;
- }
- }
-
- // Calculate how many partitions each node should ideally store,
- // and how many partitions they are storing with the current assignation
- // This defines our target for which we will optimize in the following loop.
- let total_capacity = configured_nodes
- .iter()
- .map(|(_, info)| info.capacity.unwrap_or(0))
- .sum::<u32>() as usize;
- let total_partitions = self.replication_factor * (1 << PARTITION_BITS);
- let target_partitions_per_node = configured_nodes
- .iter()
- .map(|(id, info)| {
- (
- *id,
- info.capacity.unwrap_or(0) as usize * total_partitions / total_capacity,
- )
- })
- .collect::<HashMap<&Uuid, usize>>();
-
- let mut partitions_per_node = self.partitions_per_node(&partitions[..]);
-
- println!("Target number of partitions per node:");
- for (node, npart) in target_partitions_per_node.iter() {
- println!("{:?}\t{}", node, npart);
- }
- println!();
-
- // Shuffle partitions between nodes so that nodes will reach (or better approach)
- // their target number of stored partitions
- loop {
- let mut option = None;
- for (i, part) in partitions.iter_mut().enumerate() {
- for (irm, (idrm, _)) in part.nodes.iter().enumerate() {
- let errratio = |node, parts| {
- let tgt = *target_partitions_per_node.get(node).unwrap() as f32;
- (parts - tgt) / tgt
- };
- let square = |x| x * x;
-
- let partsrm = partitions_per_node.get(*idrm).cloned().unwrap_or(0) as f32;
-
- for (idadd, infoadd) in configured_nodes.iter() {
- // skip replacing a node by itself
- // and skip replacing by gateway nodes
- if idadd == idrm || infoadd.capacity.is_none() {
- continue;
- }
-
- // We want to try replacing node idrm by node idadd
- // if that brings us close to our goal.
- let partsadd = partitions_per_node.get(*idadd).cloned().unwrap_or(0) as f32;
- let oldcost = square(errratio(*idrm, partsrm) - errratio(*idadd, partsadd));
- let newcost =
- square(errratio(*idrm, partsrm - 1.) - errratio(*idadd, partsadd + 1.));
- if newcost >= oldcost {
- // not closer to our goal
- continue;
- }
- let gain = oldcost - newcost;
-
- let mut newpart = part.clone();
-
- newpart.nodes.remove(irm);
- if !newpart.add(None, n_zones, idadd, infoadd) {
- continue;
- }
- assert!(newpart.nodes.len() == self.replication_factor);
-
- if !old_partitions[i]
- .is_valid_transition_to(&newpart, self.replication_factor)
- {
- continue;
- }
-
- if option
- .as_ref()
- .map(|(old_gain, _, _, _, _)| gain > *old_gain)
- .unwrap_or(true)
- {
- option = Some((gain, i, idadd, idrm, newpart));
- }
- }
- }
- }
- if let Some((_gain, i, idadd, idrm, newpart)) = option {
- *partitions_per_node.entry(idadd).or_insert(0) += 1;
- *partitions_per_node.get_mut(idrm).unwrap() -= 1;
- partitions[i] = newpart;
- } else {
- break;
- }
- }
- // Check we completed the assignation correctly
- // (this is a set of checks for the algorithm's consistency)
- assert!(partitions.len() == (1 << PARTITION_BITS));
- assert!(partitions
- .iter()
- .all(|p| p.nodes.len() == self.replication_factor));
-
- let new_partitions_per_node = self.partitions_per_node(&partitions[..]);
- assert!(new_partitions_per_node == partitions_per_node);
-
- // Show statistics
- println!("New number of partitions per node:");
- for (node, npart) in partitions_per_node.iter() {
- let tgt = *target_partitions_per_node.get(node).unwrap();
- let pct = 100f32 * (*npart as f32) / (tgt as f32);
- println!("{:?}\t{}\t({}% of {})", node, npart, pct as i32, tgt);
- }
- println!();
-
- let mut diffcount = HashMap::new();
- for (oldpart, newpart) in old_partitions.iter().zip(partitions.iter()) {
- let nminus = oldpart.txtplus(newpart);
- let nplus = newpart.txtplus(oldpart);
- if nminus != "[...]" || nplus != "[...]" {
- let tup = (nminus, nplus);
- *diffcount.entry(tup).or_insert(0) += 1;
- }
- }
- if diffcount.is_empty() {
- println!("No data will be moved between nodes.");
- } else {
- let mut diffcount = diffcount.into_iter().collect::<Vec<_>>();
- diffcount.sort();
- println!("Number of partitions that move:");
- for ((nminus, nplus), npart) in diffcount {
- println!("\t{}\t{} -> {}", npart, nminus, nplus);
- }
- }
- println!();
-
- // Calculate and save new assignation data
- let (nodes, assignation_data) =
- self.compute_assignation_data(&configured_nodes[..], &partitions[..]);
-
- self.node_id_vec = nodes;
- self.ring_assignation_data = assignation_data;
-
- true
- }
-
- fn initial_partition_assignation(&self) -> Option<Vec<PartitionAss<'_>>> {
- let (configured_nodes, zones) = self.configured_nodes_and_zones();
- let n_zones = zones.len();
-
- // Create a vector of partition indices (0 to 2**PARTITION_BITS-1)
- let partitions_idx = (0usize..(1usize << PARTITION_BITS)).collect::<Vec<_>>();
-
- // Prepare ring
- let mut partitions: Vec<PartitionAss> = partitions_idx
- .iter()
- .map(|_i| PartitionAss::new())
- .collect::<Vec<_>>();
-
- // Create MagLev priority queues for each node
- let mut queues = configured_nodes
- .iter()
- .filter(|(_id, info)| info.capacity.is_some())
- .map(|(node_id, node_info)| {
- let mut parts = partitions_idx
- .iter()
- .map(|i| {
- let part_data =
- [&u16::to_be_bytes(*i as u16)[..], node_id.as_slice()].concat();
- (*i, fasthash(&part_data[..]))
- })
- .collect::<Vec<_>>();
- parts.sort_by_key(|(_i, h)| *h);
- let parts_i = parts.iter().map(|(i, _h)| *i).collect::<Vec<_>>();
- (node_id, node_info, parts_i, 0)
- })
- .collect::<Vec<_>>();
-
- let max_capacity = configured_nodes
- .iter()
- .filter_map(|(_, node_info)| node_info.capacity)
- .fold(0, std::cmp::max);
-
- // Fill up ring
- for rep in 0..self.replication_factor {
- queues.sort_by_key(|(ni, _np, _q, _p)| {
- let queue_data = [&u16::to_be_bytes(rep as u16)[..], ni.as_slice()].concat();
- fasthash(&queue_data[..])
- });
-
- for (_, _, _, pos) in queues.iter_mut() {
- *pos = 0;
- }
-
- let mut remaining = partitions_idx.len();
- while remaining > 0 {
- let remaining0 = remaining;
- for i_round in 0..max_capacity {
- for (node_id, node_info, q, pos) in queues.iter_mut() {
- if i_round >= node_info.capacity.unwrap() {
- continue;
- }
- for (pos2, &qv) in q.iter().enumerate().skip(*pos) {
- if partitions[qv].add(Some(rep + 1), n_zones, node_id, node_info) {
- remaining -= 1;
- *pos = pos2 + 1;
- break;
- }
- }
- }
- }
- if remaining == remaining0 {
- // No progress made, exit
- return None;
- }
- }
- }
-
- Some(partitions)
- }
+ /// This function calculates a new partition-to-node assignation.
+ /// The computed assignation maximizes the capacity of a
+ /// partition (assuming all partitions have the same size).
+ /// Among such optimal assignation, it minimizes the distance to
+ /// the former assignation (if any) to minimize the amount of
+ /// data to be moved. A heuristic ensures node triplets
+ /// dispersion (in garage_util::bipartite::optimize_matching()).
+ pub fn calculate_partition_assignation(&mut self) -> bool {
+
+ //The nodes might have been updated, some might have been deleted.
+ //So we need to first update the list of nodes and retrieve the
+ //assignation.
+ let old_node_assignation = self.update_nodes_and_ring();
+
+ let (node_zone, _) = self.get_node_zone_capacity();
+
+ //We compute the optimal number of partition to assign to
+ //every node and zone.
+ if let Some((part_per_nod, part_per_zone)) = self.optimal_proportions(){
+ //We collect part_per_zone in a vec to not rely on the
+ //arbitrary order in which elements are iterated in
+ //Hashmap::iter()
+ let part_per_zone_vec = part_per_zone.iter()
+ .map(|(x,y)| (x.clone(),*y))
+ .collect::<Vec<(String,usize)>>();
+ //We create an indexing of the zones
+ let mut zone_id = HashMap::<String,usize>::new();
+ for i in 0..part_per_zone_vec.len(){
+ zone_id.insert(part_per_zone_vec[i].0.clone(), i);
+ }
+
+ //We compute a candidate for the new partition to zone
+ //assignation.
+ let nb_zones = part_per_zone.len();
+ let nb_nodes = part_per_nod.len();
+ let nb_partitions = 1<<PARTITION_BITS;
+ let left_cap_vec = vec![self.replication_factor as u32 ; nb_partitions];
+ let right_cap_vec = part_per_zone_vec.iter().map(|(_,y)| *y as u32)
+ .collect();
+ let mut zone_assignation =
+ dinic_compute_matching(left_cap_vec, right_cap_vec);
+
+
+ //We create the structure for the partition-to-node assignation.
+ let mut node_assignation =
+ vec![vec![None; self.replication_factor ];nb_partitions];
+ //We will decrement part_per_nod to keep track of the number
+ //of partitions that we still have to associate.
+ let mut part_per_nod = part_per_nod.clone();
+
+ //We minimize the distance to the former assignation(if any)
+
+ //We get the id of the zones of the former assignation
+ //(and the id no_zone if there is no node assignated)
+ let no_zone = part_per_zone_vec.len();
+ let old_zone_assignation : Vec<Vec<usize>> =
+ old_node_assignation.iter().map(|x| x.iter().map(
+ |id| match *id { Some(i) => zone_id[&node_zone[i]] ,
+ None => no_zone }
+ ).collect()).collect();
+
+ //We minimize the distance to the former zone assignation
+ zone_assignation = optimize_matching(
+ &old_zone_assignation, &zone_assignation, nb_zones+1); //+1 for no_zone
+
+ //We need to assign partitions to nodes in their zone
+ //We first put the nodes assignation that can stay the same
+ for i in 0..nb_partitions{
+ for j in 0..self.replication_factor {
+ if let Some(Some(former_node)) = old_node_assignation[i].iter().find(
+ |x| if let Some(id) = x {
+ zone_id[&node_zone[*id]] == zone_assignation[i][j]
+ }
+ else {false}
+ )
+ {
+ if part_per_nod[*former_node] > 0 {
+ node_assignation[i][j] = Some(*former_node);
+ part_per_nod[*former_node] -= 1;
+ }
+ }
+ }
+ }
+
+
+ //We complete the assignation of partitions to nodes
+ let mut rng = rand::thread_rng();
+ for i in 0..nb_partitions {
+ for j in 0..self.replication_factor {
+ if node_assignation[i][j] == None {
+ let possible_nodes : Vec<usize> = (0..nb_nodes)
+ .filter(
+ |id| zone_id[&node_zone[*id]] == zone_assignation[i][j]
+ && part_per_nod[*id] > 0).collect();
+ assert!(possible_nodes.len()>0);
+ //We randomly pick a node
+ if let Some(nod) = possible_nodes.choose(&mut rng){
+ node_assignation[i][j] = Some(*nod);
+ part_per_nod[*nod] -= 1;
+ }
+ }
+ }
+ }
+
+ //We write the assignation in the 1D table
+ self.ring_assignation_data = Vec::<CompactNodeType>::new();
+ for i in 0..nb_partitions{
+ for j in 0..self.replication_factor {
+ if let Some(id) = node_assignation[i][j] {
+ self.ring_assignation_data.push(id as CompactNodeType);
+ }
+ else {assert!(false)}
+ }
+ }
+
+ true
+ }
+ else { false }
+ }
+
+ /// The LwwMap of node roles might have changed. This function updates the node_id_vec
+ /// and returns the assignation given by ring, with the new indices of the nodes, and
+ /// None of the node is not present anymore.
+ /// We work with the assumption that only this function and calculate_new_assignation
+ /// do modify assignation_ring and node_id_vec.
+ fn update_nodes_and_ring(&mut self) -> Vec<Vec<Option<usize>>> {
+ let nb_partitions = 1usize<<PARTITION_BITS;
+ let mut node_assignation =
+ vec![vec![None; self.replication_factor ];nb_partitions];
+ let rf = self.replication_factor;
+ let ring = &self.ring_assignation_data;
+
+ let new_node_id_vec : Vec::<Uuid> = self.roles.items().iter()
+ .map(|(k, _, _)| *k)
+ .collect();
+
+ if ring.len() == rf*nb_partitions {
+ for i in 0..nb_partitions {
+ for j in 0..self.replication_factor {
+ node_assignation[i][j] = new_node_id_vec.iter()
+ .position(|id| *id == self.node_id_vec[ring[i*rf + j] as usize]);
+ }
+ }
+ }
+
+ self.node_id_vec = new_node_id_vec;
+ self.ring_assignation_data = vec![];
+ return node_assignation;
+ }
+
+ ///This function compute the number of partition to assign to
+ ///every node and zone, so that every partition is replicated
+ ///self.replication_factor times and the capacity of a partition
+ ///is maximized.
+ fn optimal_proportions(&mut self) -> Option<(Vec<usize>, HashMap<String, usize>)> {
+
+ let mut zone_capacity :HashMap<String, u32>= HashMap::new();
+
+ let (node_zone, node_capacity) = self.get_node_zone_capacity();
+ let nb_nodes = self.node_id_vec.len();
+
+ for i in 0..nb_nodes
+ {
+ if zone_capacity.contains_key(&node_zone[i]) {
+ zone_capacity.insert(node_zone[i].clone(), zone_capacity[&node_zone[i]] + node_capacity[i]);
+ }
+ else{
+ zone_capacity.insert(node_zone[i].clone(), node_capacity[i]);
+ }
+ }
+
+ //Compute the optimal number of partitions per zone
+ let sum_capacities: u32 =zone_capacity.values().sum();
+
+ if sum_capacities <= 0 {
+ println!("No storage capacity in the network.");
+ return None;
+ }
+
+ let nb_partitions = 1<<PARTITION_BITS;
+
+ //Initially we would like to use zones porportionally to
+ //their capacity.
+ //However, a large zone can be associated to at most
+ //nb_partitions to ensure replication of the date.
+ //So we take the min with nb_partitions:
+ let mut part_per_zone : HashMap<String, usize> =
+ zone_capacity.iter()
+ .map(|(k, v)| (k.clone(), min(nb_partitions,
+ (self.replication_factor*nb_partitions
+ **v as usize)/sum_capacities as usize) ) ).collect();
+
+ //The replication_factor-1 upper bounds the number of
+ //part_per_zones that are greater than nb_partitions
+ for _ in 1..self.replication_factor {
+ //The number of partitions that are not assignated to
+ //a zone that takes nb_partitions.
+ let sum_capleft : u32 = zone_capacity.keys()
+ .filter(| k | {part_per_zone[*k] < nb_partitions} )
+ .map(|k| zone_capacity[k]).sum();
+
+ //The number of replication of the data that we need
+ //to ensure.
+ let repl_left = self.replication_factor
+ - part_per_zone.values()
+ .filter(|x| {**x == nb_partitions})
+ .count();
+ if repl_left == 0 {
+ break;
+ }
+
+ for k in zone_capacity.keys() {
+ if part_per_zone[k] != nb_partitions
+ {
+ part_per_zone.insert(k.to_string() , min(nb_partitions,
+ (nb_partitions*zone_capacity[k] as usize
+ *repl_left)/sum_capleft as usize));
+ }
+ }
+ }
+
+ //Now we divide the zone's partition share proportionally
+ //between their nodes.
+
+ let mut part_per_nod : Vec<usize> = (0..nb_nodes).map(
+ |i| (part_per_zone[&node_zone[i]]*node_capacity[i] as usize)/zone_capacity[&node_zone[i]] as usize
+ )
+ .collect();
+
+ //We must update the part_per_zone to make it correspond to
+ //part_per_nod (because of integer rounding)
+ part_per_zone = part_per_zone.iter().map(|(k,_)|
+ (k.clone(), 0))
+ .collect();
+ for i in 0..nb_nodes {
+ part_per_zone.insert(
+ node_zone[i].clone() ,
+ part_per_zone[&node_zone[i]] + part_per_nod[i]);
+ }
+
+ //Because of integer rounding, the total sum of part_per_nod
+ //might not be replication_factor*nb_partitions.
+ // We need at most to add 1 to every non maximal value of
+ // part_per_nod. The capacity of a partition will be bounded
+ // by the minimal value of
+ // node_capacity_vec[i]/part_per_nod[i]
+ // so we try to maximize this minimal value, keeping the
+ // part_per_zone capped
+
+ let discrepancy : usize =
+ nb_partitions*self.replication_factor
+ - part_per_nod.iter().sum::<usize>();
+
+ //We use a stupid O(N^2) algorithm. If the number of nodes
+ //is actually expected to be high, one should optimize this.
+
+ for _ in 0..discrepancy {
+ if let Some(idmax) = (0..nb_nodes)
+ .filter(|i| part_per_zone[&node_zone[*i]] < nb_partitions)
+ .max_by( |i,j|
+ (node_capacity[*i]*(part_per_nod[*j]+1) as u32)
+ .cmp(&(node_capacity[*j]*(part_per_nod[*i]+1) as u32))
+ )
+ {
+ part_per_nod[idmax] += 1;
+ part_per_zone.insert(node_zone[idmax].clone(),part_per_zone[&node_zone[idmax]]+1);
+ }
+ }
+
+ //We check the algorithm consistency
+
+ let discrepancy : usize =
+ nb_partitions*self.replication_factor
+ - part_per_nod.iter().sum::<usize>();
+ assert!(discrepancy == 0);
+ assert!(if let Some(v) = part_per_zone.values().max()
+ {*v <= nb_partitions} else {false} );
+
+ Some((part_per_nod, part_per_zone))
+ }
+
+
+ //Returns vectors of zone and capacity; indexed by the same (temporary)
+ //indices as node_id_vec.
+ fn get_node_zone_capacity(& self) -> (Vec<String> , Vec<u32>) {
+
+ let node_zone = self.node_id_vec.iter().map(
+ |id_nod| match self.node_role(id_nod) {
+ Some(NodeRole{zone,capacity:_,tags:_}) => zone.clone() ,
+ _ => "".to_string()
+ }
+ ).collect();
+
+ let node_capacity = self.node_id_vec.iter().map(
+ |id_nod| match self.node_role(id_nod) {
+ Some(NodeRole{zone:_,capacity,tags:_}) =>
+ if let Some(c)=capacity
+ {*c}
+ else {0},
+ _ => 0
+ }
+ ).collect();
+
+ (node_zone,node_capacity)
+ }
- fn configured_nodes_and_zones(&self) -> (Vec<(&Uuid, &NodeRole)>, HashSet<&str>) {
- let configured_nodes = self
- .roles
- .items()
- .iter()
- .filter(|(_id, _, info)| info.0.is_some())
- .map(|(id, _, info)| (id, info.0.as_ref().unwrap()))
- .collect::<Vec<(&Uuid, &NodeRole)>>();
-
- let zones = configured_nodes
- .iter()
- .filter(|(_id, info)| info.capacity.is_some())
- .map(|(_id, info)| info.zone.as_str())
- .collect::<HashSet<&str>>();
-
- (configured_nodes, zones)
- }
-
- fn compute_assignation_data<'a>(
- &self,
- configured_nodes: &[(&'a Uuid, &'a NodeRole)],
- partitions: &[PartitionAss<'a>],
- ) -> (Vec<Uuid>, Vec<CompactNodeType>) {
- assert!(partitions.len() == (1 << PARTITION_BITS));
-
- // Make a canonical order for nodes
- let mut nodes = configured_nodes
- .iter()
- .filter(|(_id, info)| info.capacity.is_some())
- .map(|(id, _)| **id)
- .collect::<Vec<_>>();
- let nodes_rev = nodes
- .iter()
- .enumerate()
- .map(|(i, id)| (*id, i as CompactNodeType))
- .collect::<HashMap<Uuid, CompactNodeType>>();
-
- let mut assignation_data = vec![];
- for partition in partitions.iter() {
- assert!(partition.nodes.len() == self.replication_factor);
- for (id, _) in partition.nodes.iter() {
- assignation_data.push(*nodes_rev.get(id).unwrap());
- }
- }
-
- nodes.extend(
- configured_nodes
- .iter()
- .filter(|(_id, info)| info.capacity.is_none())
- .map(|(id, _)| **id),
- );
-
- (nodes, assignation_data)
- }
-
- fn parse_assignation_data(&self) -> Vec<PartitionAss<'_>> {
- if self.ring_assignation_data.len() == self.replication_factor * (1 << PARTITION_BITS) {
- // If the previous assignation data is correct, use that
- let mut partitions = vec![];
- for i in 0..(1 << PARTITION_BITS) {
- let mut part = PartitionAss::new();
- for node_i in self.ring_assignation_data
- [i * self.replication_factor..(i + 1) * self.replication_factor]
- .iter()
- {
- let node_id = &self.node_id_vec[*node_i as usize];
-
- if let Some(NodeRoleV(Some(info))) = self.roles.get(node_id) {
- part.nodes.push((node_id, Some(info)));
- } else {
- part.nodes.push((node_id, None));
- }
- }
- partitions.push(part);
- }
- partitions
- } else {
- // Otherwise start fresh
- (0..(1 << PARTITION_BITS))
- .map(|_| PartitionAss::new())
- .collect()
- }
- }
-
- fn partitions_per_node<'a>(&self, partitions: &[PartitionAss<'a>]) -> HashMap<&'a Uuid, usize> {
- let mut partitions_per_node = HashMap::<&Uuid, usize>::new();
- for p in partitions.iter() {
- for (id, _) in p.nodes.iter() {
- *partitions_per_node.entry(*id).or_insert(0) += 1;
- }
- }
- partitions_per_node
- }
-}
-
-// ---- Internal structs for partition assignation in layout ----
-
-#[derive(Clone)]
-struct PartitionAss<'a> {
- nodes: Vec<(&'a Uuid, Option<&'a NodeRole>)>,
}
-impl<'a> PartitionAss<'a> {
- fn new() -> Self {
- Self { nodes: Vec::new() }
- }
- fn nplus(&self, other: &PartitionAss<'a>) -> usize {
- self.nodes
- .iter()
- .filter(|x| !other.nodes.contains(x))
- .count()
- }
- fn txtplus(&self, other: &PartitionAss<'a>) -> String {
- let mut nodes = self
- .nodes
- .iter()
- .filter(|x| !other.nodes.contains(x))
- .map(|x| format!("{:?}", x.0))
- .collect::<Vec<_>>();
- nodes.sort();
- if self.nodes.iter().any(|x| other.nodes.contains(x)) {
- nodes.push("...".into());
- }
- format!("[{}]", nodes.join(" "))
- }
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use itertools::Itertools;
+
+ fn check_assignation(cl : &ClusterLayout) {
+
+ //Check that input data has the right format
+ let nb_partitions = 1usize<<PARTITION_BITS;
+ assert!([1,2,3].contains(&cl.replication_factor));
+ assert!(cl.ring_assignation_data.len() == nb_partitions*cl.replication_factor);
+
+ let (node_zone, node_capacity) = cl.get_node_zone_capacity();
+
+
+ //Check that is is a correct assignation with zone redundancy
+ let rf = cl.replication_factor;
+ for i in 0..nb_partitions{
+ assert!( rf ==
+ cl.ring_assignation_data[rf*i..rf*(i+1)].iter()
+ .map(|nod| node_zone[*nod as usize].clone())
+ .unique()
+ .count() );
+ }
+
+ let nb_nodes = cl.node_id_vec.len();
+ //Check optimality
+ let node_nb_part =(0..nb_nodes).map(|i| cl.ring_assignation_data
+ .iter()
+ .filter(|x| **x==i as u8)
+ .count())
+ .collect::<Vec::<_>>();
+
+ let zone_vec = node_zone.iter().unique().collect::<Vec::<_>>();
+ let zone_nb_part = zone_vec.iter().map( |z| cl.ring_assignation_data.iter()
+ .filter(|x| node_zone[**x as usize] == **z)
+ .count()
+ ).collect::<Vec::<_>>();
+
+ //Check optimality of the zone assignation : would it be better for the
+ //node_capacity/node_partitions ratio to change the assignation of a partition
+
+ if let Some(idmin) = (0..nb_nodes).min_by(
+ |i,j| (node_capacity[*i]*node_nb_part[*j] as u32)
+ .cmp(&(node_capacity[*j]*node_nb_part[*i] as u32))
+ ){
+ if let Some(idnew) = (0..nb_nodes)
+ .filter( |i| if let Some(p) = zone_vec.iter().position(|z| **z==node_zone[*i])
+ {zone_nb_part[p] < nb_partitions }
+ else { false })
+ .max_by(
+ |i,j| (node_capacity[*i]*(node_nb_part[*j]as u32+1))
+ .cmp(&(node_capacity[*j]*(node_nb_part[*i] as u32+1)))
+ ){
+ assert!(node_capacity[idmin]*(node_nb_part[idnew] as u32+1) >=
+ node_capacity[idnew]*node_nb_part[idmin] as u32);
+ }
+
+ }
+
+ //In every zone, check optimality of the nod assignation
+ for z in zone_vec {
+ let node_of_z_iter = (0..nb_nodes).filter(|id| node_zone[*id] == *z );
+ if let Some(idmin) = node_of_z_iter.clone().min_by(
+ |i,j| (node_capacity[*i]*node_nb_part[*j] as u32)
+ .cmp(&(node_capacity[*j]*node_nb_part[*i] as u32))
+ ){
+ if let Some(idnew) = node_of_z_iter.min_by(
+ |i,j| (node_capacity[*i]*(node_nb_part[*j] as u32+1))
+ .cmp(&(node_capacity[*j]*(node_nb_part[*i] as u32+1)))
+ ){
+ assert!(node_capacity[idmin]*(node_nb_part[idnew] as u32+1) >=
+ node_capacity[idnew]*node_nb_part[idmin] as u32);
+ }
+ }
+ }
+
+ }
+
+ fn update_layout(cl : &mut ClusterLayout, node_id_vec : &Vec<u8>,
+ node_capacity_vec : &Vec<u32> , node_zone_vec : &Vec<String>) {
+ for i in 0..node_id_vec.len(){
+ if let Some(x) = FixedBytes32::try_from(&[i as u8;32]) {
+ cl.node_id_vec.push(x);
+ }
+
+ let update = cl.roles.update_mutator(cl.node_id_vec[i] ,
+ NodeRoleV(Some(NodeRole{
+ zone : (node_zone_vec[i].to_string()),
+ capacity : (Some(node_capacity_vec[i])),
+ tags : (vec![])})));
+ cl.roles.merge(&update);
+ }
+ }
+
+ #[test]
+ fn test_assignation() {
+
+ let mut node_id_vec = vec![1,2,3];
+ let mut node_capacity_vec = vec![4000,1000,2000];
+ let mut node_zone_vec= vec!["A", "B", "C"].into_iter().map(|x| x.to_string()).collect();
+
+ let mut cl = ClusterLayout {
+ node_id_vec: vec![],
+
+ roles : LwwMap::new(),
+
+ replication_factor: 3,
+ ring_assignation_data : vec![],
+ version:0,
+ staging: LwwMap::new(),
+ staging_hash: sha256sum(&[1;32]),
+ };
+ update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec);
+ cl.calculate_partition_assignation();
+ check_assignation(&cl);
+
+ node_id_vec = vec![1,2,3, 4, 5, 6, 7, 8, 9];
+ node_capacity_vec = vec![4000,1000,1000, 3000, 1000, 1000, 2000, 10000, 2000];
+ node_zone_vec= vec!["A", "B", "C", "C", "C", "B", "G", "H", "I"].into_iter().map(|x| x.to_string()).collect();
+ update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec);
+ cl.calculate_partition_assignation();
+ check_assignation(&cl);
+
+ node_capacity_vec = vec![4000,1000,2000, 7000, 1000, 1000, 2000, 10000, 2000];
+ update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec);
+ cl.calculate_partition_assignation();
+ check_assignation(&cl);
+
+
+ node_capacity_vec = vec![4000,4000,2000, 7000, 1000, 9000, 2000, 10, 2000];
+ update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec);
+ cl.calculate_partition_assignation();
+ check_assignation(&cl);
+
+ }
+}
- fn is_valid_transition_to(&self, other: &PartitionAss<'a>, replication_factor: usize) -> bool {
- let min_keep_nodes_per_part = (replication_factor + 1) / 2;
- let n_removed = self.nplus(other);
- if self.nodes.len() <= min_keep_nodes_per_part {
- n_removed == 0
- } else {
- n_removed <= self.nodes.len() - min_keep_nodes_per_part
- }
- }
- // add is a key function in creating a PartitionAss, i.e. the list of nodes
- // to which a partition is assigned. It tries to add a certain node id to the
- // assignation, but checks that doing so is compatible with the NECESSARY
- // condition that the partition assignation must be dispersed over different
- // zones (datacenters) if enough zones exist. This is why it takes a n_zones
- // parameter, which is the total number of zones that have existing nodes:
- // if nodes in the assignation already cover all n_zones zones, then any node
- // that is not yet in the assignation can be added. Otherwise, only nodes
- // that are in a new zone can be added.
- fn add(
- &mut self,
- target_len: Option<usize>,
- n_zones: usize,
- node: &'a Uuid,
- role: &'a NodeRole,
- ) -> bool {
- if let Some(tl) = target_len {
- if self.nodes.len() != tl - 1 {
- return false;
- }
- }
-
- let p_zns = self
- .nodes
- .iter()
- .map(|(_id, info)| info.unwrap().zone.as_str())
- .collect::<HashSet<&str>>();
- if (p_zns.len() < n_zones && !p_zns.contains(&role.zone.as_str()))
- || (p_zns.len() == n_zones && !self.nodes.iter().any(|(id, _)| *id == node))
- {
- self.nodes.push((node, Some(role)));
- true
- } else {
- false
- }
- }
-}