diff options
Diffstat (limited to 'src/rpc')
-rw-r--r-- | src/rpc/Cargo.toml | 1 | ||||
-rw-r--r-- | src/rpc/layout.rs | 881 |
2 files changed, 448 insertions, 434 deletions
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index efaacf2e..654c1dc6 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -23,6 +23,7 @@ gethostname = "0.2" hex = "0.4" tracing = "0.1.30" rand = "0.8" +itertools="0.10" sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" } async-trait = "0.1.7" diff --git a/src/rpc/layout.rs b/src/rpc/layout.rs index b9c02c21..afd7df17 100644 --- a/src/rpc/layout.rs +++ b/src/rpc/layout.rs @@ -1,10 +1,14 @@ use std::cmp::Ordering; -use std::collections::{HashMap, HashSet}; +use std::cmp::{min}; +use std::collections::{HashMap}; use serde::{Deserialize, Serialize}; use garage_util::crdt::{AutoCrdt, Crdt, LwwMap}; use garage_util::data::*; +use garage_util::bipartite::*; + +use rand::prelude::SliceRandom; use crate::ring::*; @@ -164,445 +168,454 @@ impl ClusterLayout { true } - /// Calculate an assignation of partitions to nodes - pub fn calculate_partition_assignation(&mut self) -> bool { - let (configured_nodes, zones) = self.configured_nodes_and_zones(); - let n_zones = zones.len(); - - println!("Calculating updated partition assignation, this may take some time..."); - println!(); - - // Get old partition assignation - let old_partitions = self.parse_assignation_data(); - - // Start new partition assignation with nodes from old assignation where it is relevant - let mut partitions = old_partitions - .iter() - .map(|old_part| { - let mut new_part = PartitionAss::new(); - for node in old_part.nodes.iter() { - if let Some(role) = node.1 { - if role.capacity.is_some() { - new_part.add(None, n_zones, node.0, role); - } - } - } - new_part - }) - .collect::<Vec<_>>(); - - // In various cases, not enough nodes will have been added for all partitions - // in the step above (e.g. due to node removals, or new zones being added). - // Here we add more nodes to make a complete (but sub-optimal) assignation, - // using an initial partition assignation that is calculated using the multi-dc maglev trick - match self.initial_partition_assignation() { - Some(initial_partitions) => { - for (part, ipart) in partitions.iter_mut().zip(initial_partitions.iter()) { - for (id, info) in ipart.nodes.iter() { - if part.nodes.len() < self.replication_factor { - part.add(None, n_zones, id, info.unwrap()); - } - } - assert!(part.nodes.len() == self.replication_factor); - } - } - None => { - // Not enough nodes in cluster to build a correct assignation. - // Signal it by returning an error. - return false; - } - } - - // Calculate how many partitions each node should ideally store, - // and how many partitions they are storing with the current assignation - // This defines our target for which we will optimize in the following loop. - let total_capacity = configured_nodes - .iter() - .map(|(_, info)| info.capacity.unwrap_or(0)) - .sum::<u32>() as usize; - let total_partitions = self.replication_factor * (1 << PARTITION_BITS); - let target_partitions_per_node = configured_nodes - .iter() - .map(|(id, info)| { - ( - *id, - info.capacity.unwrap_or(0) as usize * total_partitions / total_capacity, - ) - }) - .collect::<HashMap<&Uuid, usize>>(); - - let mut partitions_per_node = self.partitions_per_node(&partitions[..]); - - println!("Target number of partitions per node:"); - for (node, npart) in target_partitions_per_node.iter() { - println!("{:?}\t{}", node, npart); - } - println!(); - - // Shuffle partitions between nodes so that nodes will reach (or better approach) - // their target number of stored partitions - loop { - let mut option = None; - for (i, part) in partitions.iter_mut().enumerate() { - for (irm, (idrm, _)) in part.nodes.iter().enumerate() { - let errratio = |node, parts| { - let tgt = *target_partitions_per_node.get(node).unwrap() as f32; - (parts - tgt) / tgt - }; - let square = |x| x * x; - - let partsrm = partitions_per_node.get(*idrm).cloned().unwrap_or(0) as f32; - - for (idadd, infoadd) in configured_nodes.iter() { - // skip replacing a node by itself - // and skip replacing by gateway nodes - if idadd == idrm || infoadd.capacity.is_none() { - continue; - } - - // We want to try replacing node idrm by node idadd - // if that brings us close to our goal. - let partsadd = partitions_per_node.get(*idadd).cloned().unwrap_or(0) as f32; - let oldcost = square(errratio(*idrm, partsrm) - errratio(*idadd, partsadd)); - let newcost = - square(errratio(*idrm, partsrm - 1.) - errratio(*idadd, partsadd + 1.)); - if newcost >= oldcost { - // not closer to our goal - continue; - } - let gain = oldcost - newcost; - - let mut newpart = part.clone(); - - newpart.nodes.remove(irm); - if !newpart.add(None, n_zones, idadd, infoadd) { - continue; - } - assert!(newpart.nodes.len() == self.replication_factor); - - if !old_partitions[i] - .is_valid_transition_to(&newpart, self.replication_factor) - { - continue; - } - - if option - .as_ref() - .map(|(old_gain, _, _, _, _)| gain > *old_gain) - .unwrap_or(true) - { - option = Some((gain, i, idadd, idrm, newpart)); - } - } - } - } - if let Some((_gain, i, idadd, idrm, newpart)) = option { - *partitions_per_node.entry(idadd).or_insert(0) += 1; - *partitions_per_node.get_mut(idrm).unwrap() -= 1; - partitions[i] = newpart; - } else { - break; - } - } - // Check we completed the assignation correctly - // (this is a set of checks for the algorithm's consistency) - assert!(partitions.len() == (1 << PARTITION_BITS)); - assert!(partitions - .iter() - .all(|p| p.nodes.len() == self.replication_factor)); - - let new_partitions_per_node = self.partitions_per_node(&partitions[..]); - assert!(new_partitions_per_node == partitions_per_node); - - // Show statistics - println!("New number of partitions per node:"); - for (node, npart) in partitions_per_node.iter() { - let tgt = *target_partitions_per_node.get(node).unwrap(); - let pct = 100f32 * (*npart as f32) / (tgt as f32); - println!("{:?}\t{}\t({}% of {})", node, npart, pct as i32, tgt); - } - println!(); - - let mut diffcount = HashMap::new(); - for (oldpart, newpart) in old_partitions.iter().zip(partitions.iter()) { - let nminus = oldpart.txtplus(newpart); - let nplus = newpart.txtplus(oldpart); - if nminus != "[...]" || nplus != "[...]" { - let tup = (nminus, nplus); - *diffcount.entry(tup).or_insert(0) += 1; - } - } - if diffcount.is_empty() { - println!("No data will be moved between nodes."); - } else { - let mut diffcount = diffcount.into_iter().collect::<Vec<_>>(); - diffcount.sort(); - println!("Number of partitions that move:"); - for ((nminus, nplus), npart) in diffcount { - println!("\t{}\t{} -> {}", npart, nminus, nplus); - } - } - println!(); - - // Calculate and save new assignation data - let (nodes, assignation_data) = - self.compute_assignation_data(&configured_nodes[..], &partitions[..]); - - self.node_id_vec = nodes; - self.ring_assignation_data = assignation_data; - - true - } - - fn initial_partition_assignation(&self) -> Option<Vec<PartitionAss<'_>>> { - let (configured_nodes, zones) = self.configured_nodes_and_zones(); - let n_zones = zones.len(); - - // Create a vector of partition indices (0 to 2**PARTITION_BITS-1) - let partitions_idx = (0usize..(1usize << PARTITION_BITS)).collect::<Vec<_>>(); - - // Prepare ring - let mut partitions: Vec<PartitionAss> = partitions_idx - .iter() - .map(|_i| PartitionAss::new()) - .collect::<Vec<_>>(); - - // Create MagLev priority queues for each node - let mut queues = configured_nodes - .iter() - .filter(|(_id, info)| info.capacity.is_some()) - .map(|(node_id, node_info)| { - let mut parts = partitions_idx - .iter() - .map(|i| { - let part_data = - [&u16::to_be_bytes(*i as u16)[..], node_id.as_slice()].concat(); - (*i, fasthash(&part_data[..])) - }) - .collect::<Vec<_>>(); - parts.sort_by_key(|(_i, h)| *h); - let parts_i = parts.iter().map(|(i, _h)| *i).collect::<Vec<_>>(); - (node_id, node_info, parts_i, 0) - }) - .collect::<Vec<_>>(); - - let max_capacity = configured_nodes - .iter() - .filter_map(|(_, node_info)| node_info.capacity) - .fold(0, std::cmp::max); - - // Fill up ring - for rep in 0..self.replication_factor { - queues.sort_by_key(|(ni, _np, _q, _p)| { - let queue_data = [&u16::to_be_bytes(rep as u16)[..], ni.as_slice()].concat(); - fasthash(&queue_data[..]) - }); - - for (_, _, _, pos) in queues.iter_mut() { - *pos = 0; - } - - let mut remaining = partitions_idx.len(); - while remaining > 0 { - let remaining0 = remaining; - for i_round in 0..max_capacity { - for (node_id, node_info, q, pos) in queues.iter_mut() { - if i_round >= node_info.capacity.unwrap() { - continue; - } - for (pos2, &qv) in q.iter().enumerate().skip(*pos) { - if partitions[qv].add(Some(rep + 1), n_zones, node_id, node_info) { - remaining -= 1; - *pos = pos2 + 1; - break; - } - } - } - } - if remaining == remaining0 { - // No progress made, exit - return None; - } - } - } - - Some(partitions) - } + /// This function calculates a new partition-to-node assignation. + /// The computed assignation maximizes the capacity of a + /// partition (assuming all partitions have the same size). + /// Among such optimal assignation, it minimizes the distance to + /// the former assignation (if any) to minimize the amount of + /// data to be moved. A heuristic ensures node triplets + /// dispersion (in garage_util::bipartite::optimize_matching()). + pub fn calculate_partition_assignation(&mut self) -> bool { + + //The nodes might have been updated, some might have been deleted. + //So we need to first update the list of nodes and retrieve the + //assignation. + let old_node_assignation = self.update_nodes_and_ring(); + + let (node_zone, _) = self.get_node_zone_capacity(); + + //We compute the optimal number of partition to assign to + //every node and zone. + if let Some((part_per_nod, part_per_zone)) = self.optimal_proportions(){ + //We collect part_per_zone in a vec to not rely on the + //arbitrary order in which elements are iterated in + //Hashmap::iter() + let part_per_zone_vec = part_per_zone.iter() + .map(|(x,y)| (x.clone(),*y)) + .collect::<Vec<(String,usize)>>(); + //We create an indexing of the zones + let mut zone_id = HashMap::<String,usize>::new(); + for i in 0..part_per_zone_vec.len(){ + zone_id.insert(part_per_zone_vec[i].0.clone(), i); + } + + //We compute a candidate for the new partition to zone + //assignation. + let nb_zones = part_per_zone.len(); + let nb_nodes = part_per_nod.len(); + let nb_partitions = 1<<PARTITION_BITS; + let left_cap_vec = vec![self.replication_factor as u32 ; nb_partitions]; + let right_cap_vec = part_per_zone_vec.iter().map(|(_,y)| *y as u32) + .collect(); + let mut zone_assignation = + dinic_compute_matching(left_cap_vec, right_cap_vec); + + + //We create the structure for the partition-to-node assignation. + let mut node_assignation = + vec![vec![None; self.replication_factor ];nb_partitions]; + //We will decrement part_per_nod to keep track of the number + //of partitions that we still have to associate. + let mut part_per_nod = part_per_nod.clone(); + + //We minimize the distance to the former assignation(if any) + + //We get the id of the zones of the former assignation + //(and the id no_zone if there is no node assignated) + let no_zone = part_per_zone_vec.len(); + let old_zone_assignation : Vec<Vec<usize>> = + old_node_assignation.iter().map(|x| x.iter().map( + |id| match *id { Some(i) => zone_id[&node_zone[i]] , + None => no_zone } + ).collect()).collect(); + + //We minimize the distance to the former zone assignation + zone_assignation = optimize_matching( + &old_zone_assignation, &zone_assignation, nb_zones+1); //+1 for no_zone + + //We need to assign partitions to nodes in their zone + //We first put the nodes assignation that can stay the same + for i in 0..nb_partitions{ + for j in 0..self.replication_factor { + if let Some(Some(former_node)) = old_node_assignation[i].iter().find( + |x| if let Some(id) = x { + zone_id[&node_zone[*id]] == zone_assignation[i][j] + } + else {false} + ) + { + if part_per_nod[*former_node] > 0 { + node_assignation[i][j] = Some(*former_node); + part_per_nod[*former_node] -= 1; + } + } + } + } + + + //We complete the assignation of partitions to nodes + let mut rng = rand::thread_rng(); + for i in 0..nb_partitions { + for j in 0..self.replication_factor { + if node_assignation[i][j] == None { + let possible_nodes : Vec<usize> = (0..nb_nodes) + .filter( + |id| zone_id[&node_zone[*id]] == zone_assignation[i][j] + && part_per_nod[*id] > 0).collect(); + assert!(possible_nodes.len()>0); + //We randomly pick a node + if let Some(nod) = possible_nodes.choose(&mut rng){ + node_assignation[i][j] = Some(*nod); + part_per_nod[*nod] -= 1; + } + } + } + } + + //We write the assignation in the 1D table + self.ring_assignation_data = Vec::<CompactNodeType>::new(); + for i in 0..nb_partitions{ + for j in 0..self.replication_factor { + if let Some(id) = node_assignation[i][j] { + self.ring_assignation_data.push(id as CompactNodeType); + } + else {assert!(false)} + } + } + + true + } + else { false } + } + + /// The LwwMap of node roles might have changed. This function updates the node_id_vec + /// and returns the assignation given by ring, with the new indices of the nodes, and + /// None of the node is not present anymore. + /// We work with the assumption that only this function and calculate_new_assignation + /// do modify assignation_ring and node_id_vec. + fn update_nodes_and_ring(&mut self) -> Vec<Vec<Option<usize>>> { + let nb_partitions = 1usize<<PARTITION_BITS; + let mut node_assignation = + vec![vec![None; self.replication_factor ];nb_partitions]; + let rf = self.replication_factor; + let ring = &self.ring_assignation_data; + + let new_node_id_vec : Vec::<Uuid> = self.roles.items().iter() + .map(|(k, _, _)| *k) + .collect(); + + if ring.len() == rf*nb_partitions { + for i in 0..nb_partitions { + for j in 0..self.replication_factor { + node_assignation[i][j] = new_node_id_vec.iter() + .position(|id| *id == self.node_id_vec[ring[i*rf + j] as usize]); + } + } + } + + self.node_id_vec = new_node_id_vec; + self.ring_assignation_data = vec![]; + return node_assignation; + } + + ///This function compute the number of partition to assign to + ///every node and zone, so that every partition is replicated + ///self.replication_factor times and the capacity of a partition + ///is maximized. + fn optimal_proportions(&mut self) -> Option<(Vec<usize>, HashMap<String, usize>)> { + + let mut zone_capacity :HashMap<String, u32>= HashMap::new(); + + let (node_zone, node_capacity) = self.get_node_zone_capacity(); + let nb_nodes = self.node_id_vec.len(); + + for i in 0..nb_nodes + { + if zone_capacity.contains_key(&node_zone[i]) { + zone_capacity.insert(node_zone[i].clone(), zone_capacity[&node_zone[i]] + node_capacity[i]); + } + else{ + zone_capacity.insert(node_zone[i].clone(), node_capacity[i]); + } + } + + //Compute the optimal number of partitions per zone + let sum_capacities: u32 =zone_capacity.values().sum(); + + if sum_capacities <= 0 { + println!("No storage capacity in the network."); + return None; + } + + let nb_partitions = 1<<PARTITION_BITS; + + //Initially we would like to use zones porportionally to + //their capacity. + //However, a large zone can be associated to at most + //nb_partitions to ensure replication of the date. + //So we take the min with nb_partitions: + let mut part_per_zone : HashMap<String, usize> = + zone_capacity.iter() + .map(|(k, v)| (k.clone(), min(nb_partitions, + (self.replication_factor*nb_partitions + **v as usize)/sum_capacities as usize) ) ).collect(); + + //The replication_factor-1 upper bounds the number of + //part_per_zones that are greater than nb_partitions + for _ in 1..self.replication_factor { + //The number of partitions that are not assignated to + //a zone that takes nb_partitions. + let sum_capleft : u32 = zone_capacity.keys() + .filter(| k | {part_per_zone[*k] < nb_partitions} ) + .map(|k| zone_capacity[k]).sum(); + + //The number of replication of the data that we need + //to ensure. + let repl_left = self.replication_factor + - part_per_zone.values() + .filter(|x| {**x == nb_partitions}) + .count(); + if repl_left == 0 { + break; + } + + for k in zone_capacity.keys() { + if part_per_zone[k] != nb_partitions + { + part_per_zone.insert(k.to_string() , min(nb_partitions, + (nb_partitions*zone_capacity[k] as usize + *repl_left)/sum_capleft as usize)); + } + } + } + + //Now we divide the zone's partition share proportionally + //between their nodes. + + let mut part_per_nod : Vec<usize> = (0..nb_nodes).map( + |i| (part_per_zone[&node_zone[i]]*node_capacity[i] as usize)/zone_capacity[&node_zone[i]] as usize + ) + .collect(); + + //We must update the part_per_zone to make it correspond to + //part_per_nod (because of integer rounding) + part_per_zone = part_per_zone.iter().map(|(k,_)| + (k.clone(), 0)) + .collect(); + for i in 0..nb_nodes { + part_per_zone.insert( + node_zone[i].clone() , + part_per_zone[&node_zone[i]] + part_per_nod[i]); + } + + //Because of integer rounding, the total sum of part_per_nod + //might not be replication_factor*nb_partitions. + // We need at most to add 1 to every non maximal value of + // part_per_nod. The capacity of a partition will be bounded + // by the minimal value of + // node_capacity_vec[i]/part_per_nod[i] + // so we try to maximize this minimal value, keeping the + // part_per_zone capped + + let discrepancy : usize = + nb_partitions*self.replication_factor + - part_per_nod.iter().sum::<usize>(); + + //We use a stupid O(N^2) algorithm. If the number of nodes + //is actually expected to be high, one should optimize this. + + for _ in 0..discrepancy { + if let Some(idmax) = (0..nb_nodes) + .filter(|i| part_per_zone[&node_zone[*i]] < nb_partitions) + .max_by( |i,j| + (node_capacity[*i]*(part_per_nod[*j]+1) as u32) + .cmp(&(node_capacity[*j]*(part_per_nod[*i]+1) as u32)) + ) + { + part_per_nod[idmax] += 1; + part_per_zone.insert(node_zone[idmax].clone(),part_per_zone[&node_zone[idmax]]+1); + } + } + + //We check the algorithm consistency + + let discrepancy : usize = + nb_partitions*self.replication_factor + - part_per_nod.iter().sum::<usize>(); + assert!(discrepancy == 0); + assert!(if let Some(v) = part_per_zone.values().max() + {*v <= nb_partitions} else {false} ); + + Some((part_per_nod, part_per_zone)) + } + + + //Returns vectors of zone and capacity; indexed by the same (temporary) + //indices as node_id_vec. + fn get_node_zone_capacity(& self) -> (Vec<String> , Vec<u32>) { + + let node_zone = self.node_id_vec.iter().map( + |id_nod| match self.node_role(id_nod) { + Some(NodeRole{zone,capacity:_,tags:_}) => zone.clone() , + _ => "".to_string() + } + ).collect(); + + let node_capacity = self.node_id_vec.iter().map( + |id_nod| match self.node_role(id_nod) { + Some(NodeRole{zone:_,capacity,tags:_}) => + if let Some(c)=capacity + {*c} + else {0}, + _ => 0 + } + ).collect(); + + (node_zone,node_capacity) + } - fn configured_nodes_and_zones(&self) -> (Vec<(&Uuid, &NodeRole)>, HashSet<&str>) { - let configured_nodes = self - .roles - .items() - .iter() - .filter(|(_id, _, info)| info.0.is_some()) - .map(|(id, _, info)| (id, info.0.as_ref().unwrap())) - .collect::<Vec<(&Uuid, &NodeRole)>>(); - - let zones = configured_nodes - .iter() - .filter(|(_id, info)| info.capacity.is_some()) - .map(|(_id, info)| info.zone.as_str()) - .collect::<HashSet<&str>>(); - - (configured_nodes, zones) - } - - fn compute_assignation_data<'a>( - &self, - configured_nodes: &[(&'a Uuid, &'a NodeRole)], - partitions: &[PartitionAss<'a>], - ) -> (Vec<Uuid>, Vec<CompactNodeType>) { - assert!(partitions.len() == (1 << PARTITION_BITS)); - - // Make a canonical order for nodes - let mut nodes = configured_nodes - .iter() - .filter(|(_id, info)| info.capacity.is_some()) - .map(|(id, _)| **id) - .collect::<Vec<_>>(); - let nodes_rev = nodes - .iter() - .enumerate() - .map(|(i, id)| (*id, i as CompactNodeType)) - .collect::<HashMap<Uuid, CompactNodeType>>(); - - let mut assignation_data = vec![]; - for partition in partitions.iter() { - assert!(partition.nodes.len() == self.replication_factor); - for (id, _) in partition.nodes.iter() { - assignation_data.push(*nodes_rev.get(id).unwrap()); - } - } - - nodes.extend( - configured_nodes - .iter() - .filter(|(_id, info)| info.capacity.is_none()) - .map(|(id, _)| **id), - ); - - (nodes, assignation_data) - } - - fn parse_assignation_data(&self) -> Vec<PartitionAss<'_>> { - if self.ring_assignation_data.len() == self.replication_factor * (1 << PARTITION_BITS) { - // If the previous assignation data is correct, use that - let mut partitions = vec![]; - for i in 0..(1 << PARTITION_BITS) { - let mut part = PartitionAss::new(); - for node_i in self.ring_assignation_data - [i * self.replication_factor..(i + 1) * self.replication_factor] - .iter() - { - let node_id = &self.node_id_vec[*node_i as usize]; - - if let Some(NodeRoleV(Some(info))) = self.roles.get(node_id) { - part.nodes.push((node_id, Some(info))); - } else { - part.nodes.push((node_id, None)); - } - } - partitions.push(part); - } - partitions - } else { - // Otherwise start fresh - (0..(1 << PARTITION_BITS)) - .map(|_| PartitionAss::new()) - .collect() - } - } - - fn partitions_per_node<'a>(&self, partitions: &[PartitionAss<'a>]) -> HashMap<&'a Uuid, usize> { - let mut partitions_per_node = HashMap::<&Uuid, usize>::new(); - for p in partitions.iter() { - for (id, _) in p.nodes.iter() { - *partitions_per_node.entry(*id).or_insert(0) += 1; - } - } - partitions_per_node - } -} - -// ---- Internal structs for partition assignation in layout ---- - -#[derive(Clone)] -struct PartitionAss<'a> { - nodes: Vec<(&'a Uuid, Option<&'a NodeRole>)>, } -impl<'a> PartitionAss<'a> { - fn new() -> Self { - Self { nodes: Vec::new() } - } - fn nplus(&self, other: &PartitionAss<'a>) -> usize { - self.nodes - .iter() - .filter(|x| !other.nodes.contains(x)) - .count() - } - fn txtplus(&self, other: &PartitionAss<'a>) -> String { - let mut nodes = self - .nodes - .iter() - .filter(|x| !other.nodes.contains(x)) - .map(|x| format!("{:?}", x.0)) - .collect::<Vec<_>>(); - nodes.sort(); - if self.nodes.iter().any(|x| other.nodes.contains(x)) { - nodes.push("...".into()); - } - format!("[{}]", nodes.join(" ")) - } +#[cfg(test)] +mod tests { + use super::*; + use itertools::Itertools; + + fn check_assignation(cl : &ClusterLayout) { + + //Check that input data has the right format + let nb_partitions = 1usize<<PARTITION_BITS; + assert!([1,2,3].contains(&cl.replication_factor)); + assert!(cl.ring_assignation_data.len() == nb_partitions*cl.replication_factor); + + let (node_zone, node_capacity) = cl.get_node_zone_capacity(); + + + //Check that is is a correct assignation with zone redundancy + let rf = cl.replication_factor; + for i in 0..nb_partitions{ + assert!( rf == + cl.ring_assignation_data[rf*i..rf*(i+1)].iter() + .map(|nod| node_zone[*nod as usize].clone()) + .unique() + .count() ); + } + + let nb_nodes = cl.node_id_vec.len(); + //Check optimality + let node_nb_part =(0..nb_nodes).map(|i| cl.ring_assignation_data + .iter() + .filter(|x| **x==i as u8) + .count()) + .collect::<Vec::<_>>(); + + let zone_vec = node_zone.iter().unique().collect::<Vec::<_>>(); + let zone_nb_part = zone_vec.iter().map( |z| cl.ring_assignation_data.iter() + .filter(|x| node_zone[**x as usize] == **z) + .count() + ).collect::<Vec::<_>>(); + + //Check optimality of the zone assignation : would it be better for the + //node_capacity/node_partitions ratio to change the assignation of a partition + + if let Some(idmin) = (0..nb_nodes).min_by( + |i,j| (node_capacity[*i]*node_nb_part[*j] as u32) + .cmp(&(node_capacity[*j]*node_nb_part[*i] as u32)) + ){ + if let Some(idnew) = (0..nb_nodes) + .filter( |i| if let Some(p) = zone_vec.iter().position(|z| **z==node_zone[*i]) + {zone_nb_part[p] < nb_partitions } + else { false }) + .max_by( + |i,j| (node_capacity[*i]*(node_nb_part[*j]as u32+1)) + .cmp(&(node_capacity[*j]*(node_nb_part[*i] as u32+1))) + ){ + assert!(node_capacity[idmin]*(node_nb_part[idnew] as u32+1) >= + node_capacity[idnew]*node_nb_part[idmin] as u32); + } + + } + + //In every zone, check optimality of the nod assignation + for z in zone_vec { + let node_of_z_iter = (0..nb_nodes).filter(|id| node_zone[*id] == *z ); + if let Some(idmin) = node_of_z_iter.clone().min_by( + |i,j| (node_capacity[*i]*node_nb_part[*j] as u32) + .cmp(&(node_capacity[*j]*node_nb_part[*i] as u32)) + ){ + if let Some(idnew) = node_of_z_iter.min_by( + |i,j| (node_capacity[*i]*(node_nb_part[*j] as u32+1)) + .cmp(&(node_capacity[*j]*(node_nb_part[*i] as u32+1))) + ){ + assert!(node_capacity[idmin]*(node_nb_part[idnew] as u32+1) >= + node_capacity[idnew]*node_nb_part[idmin] as u32); + } + } + } + + } + + fn update_layout(cl : &mut ClusterLayout, node_id_vec : &Vec<u8>, + node_capacity_vec : &Vec<u32> , node_zone_vec : &Vec<String>) { + for i in 0..node_id_vec.len(){ + if let Some(x) = FixedBytes32::try_from(&[i as u8;32]) { + cl.node_id_vec.push(x); + } + + let update = cl.roles.update_mutator(cl.node_id_vec[i] , + NodeRoleV(Some(NodeRole{ + zone : (node_zone_vec[i].to_string()), + capacity : (Some(node_capacity_vec[i])), + tags : (vec![])}))); + cl.roles.merge(&update); + } + } + + #[test] + fn test_assignation() { + + let mut node_id_vec = vec![1,2,3]; + let mut node_capacity_vec = vec![4000,1000,2000]; + let mut node_zone_vec= vec!["A", "B", "C"].into_iter().map(|x| x.to_string()).collect(); + + let mut cl = ClusterLayout { + node_id_vec: vec![], + + roles : LwwMap::new(), + + replication_factor: 3, + ring_assignation_data : vec![], + version:0, + staging: LwwMap::new(), + staging_hash: sha256sum(&[1;32]), + }; + update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec); + cl.calculate_partition_assignation(); + check_assignation(&cl); + + node_id_vec = vec![1,2,3, 4, 5, 6, 7, 8, 9]; + node_capacity_vec = vec![4000,1000,1000, 3000, 1000, 1000, 2000, 10000, 2000]; + node_zone_vec= vec!["A", "B", "C", "C", "C", "B", "G", "H", "I"].into_iter().map(|x| x.to_string()).collect(); + update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec); + cl.calculate_partition_assignation(); + check_assignation(&cl); + + node_capacity_vec = vec![4000,1000,2000, 7000, 1000, 1000, 2000, 10000, 2000]; + update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec); + cl.calculate_partition_assignation(); + check_assignation(&cl); + + + node_capacity_vec = vec![4000,4000,2000, 7000, 1000, 9000, 2000, 10, 2000]; + update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec); + cl.calculate_partition_assignation(); + check_assignation(&cl); + + } +} - fn is_valid_transition_to(&self, other: &PartitionAss<'a>, replication_factor: usize) -> bool { - let min_keep_nodes_per_part = (replication_factor + 1) / 2; - let n_removed = self.nplus(other); - if self.nodes.len() <= min_keep_nodes_per_part { - n_removed == 0 - } else { - n_removed <= self.nodes.len() - min_keep_nodes_per_part - } - } - // add is a key function in creating a PartitionAss, i.e. the list of nodes - // to which a partition is assigned. It tries to add a certain node id to the - // assignation, but checks that doing so is compatible with the NECESSARY - // condition that the partition assignation must be dispersed over different - // zones (datacenters) if enough zones exist. This is why it takes a n_zones - // parameter, which is the total number of zones that have existing nodes: - // if nodes in the assignation already cover all n_zones zones, then any node - // that is not yet in the assignation can be added. Otherwise, only nodes - // that are in a new zone can be added. - fn add( - &mut self, - target_len: Option<usize>, - n_zones: usize, - node: &'a Uuid, - role: &'a NodeRole, - ) -> bool { - if let Some(tl) = target_len { - if self.nodes.len() != tl - 1 { - return false; - } - } - - let p_zns = self - .nodes - .iter() - .map(|(_id, info)| info.unwrap().zone.as_str()) - .collect::<HashSet<&str>>(); - if (p_zns.len() < n_zones && !p_zns.contains(&role.zone.as_str())) - || (p_zns.len() == n_zones && !self.nodes.iter().any(|(id, _)| *id == node)) - { - self.nodes.push((node, Some(role))); - true - } else { - false - } - } -} |