diff options
-rw-r--r-- | src/rpc/Cargo.toml | 1 | ||||
-rw-r--r-- | src/rpc/layout.rs | 881 | ||||
-rw-r--r-- | src/util/bipartite.rs | 378 | ||||
-rw-r--r-- | src/util/lib.rs | 1 |
4 files changed, 827 insertions, 434 deletions
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index efaacf2e..654c1dc6 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -23,6 +23,7 @@ gethostname = "0.2" hex = "0.4" tracing = "0.1.30" rand = "0.8" +itertools="0.10" sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" } async-trait = "0.1.7" diff --git a/src/rpc/layout.rs b/src/rpc/layout.rs index b9c02c21..afd7df17 100644 --- a/src/rpc/layout.rs +++ b/src/rpc/layout.rs @@ -1,10 +1,14 @@ use std::cmp::Ordering; -use std::collections::{HashMap, HashSet}; +use std::cmp::{min}; +use std::collections::{HashMap}; use serde::{Deserialize, Serialize}; use garage_util::crdt::{AutoCrdt, Crdt, LwwMap}; use garage_util::data::*; +use garage_util::bipartite::*; + +use rand::prelude::SliceRandom; use crate::ring::*; @@ -164,445 +168,454 @@ impl ClusterLayout { true } - /// Calculate an assignation of partitions to nodes - pub fn calculate_partition_assignation(&mut self) -> bool { - let (configured_nodes, zones) = self.configured_nodes_and_zones(); - let n_zones = zones.len(); - - println!("Calculating updated partition assignation, this may take some time..."); - println!(); - - // Get old partition assignation - let old_partitions = self.parse_assignation_data(); - - // Start new partition assignation with nodes from old assignation where it is relevant - let mut partitions = old_partitions - .iter() - .map(|old_part| { - let mut new_part = PartitionAss::new(); - for node in old_part.nodes.iter() { - if let Some(role) = node.1 { - if role.capacity.is_some() { - new_part.add(None, n_zones, node.0, role); - } - } - } - new_part - }) - .collect::<Vec<_>>(); - - // In various cases, not enough nodes will have been added for all partitions - // in the step above (e.g. due to node removals, or new zones being added). - // Here we add more nodes to make a complete (but sub-optimal) assignation, - // using an initial partition assignation that is calculated using the multi-dc maglev trick - match self.initial_partition_assignation() { - Some(initial_partitions) => { - for (part, ipart) in partitions.iter_mut().zip(initial_partitions.iter()) { - for (id, info) in ipart.nodes.iter() { - if part.nodes.len() < self.replication_factor { - part.add(None, n_zones, id, info.unwrap()); - } - } - assert!(part.nodes.len() == self.replication_factor); - } - } - None => { - // Not enough nodes in cluster to build a correct assignation. - // Signal it by returning an error. - return false; - } - } - - // Calculate how many partitions each node should ideally store, - // and how many partitions they are storing with the current assignation - // This defines our target for which we will optimize in the following loop. - let total_capacity = configured_nodes - .iter() - .map(|(_, info)| info.capacity.unwrap_or(0)) - .sum::<u32>() as usize; - let total_partitions = self.replication_factor * (1 << PARTITION_BITS); - let target_partitions_per_node = configured_nodes - .iter() - .map(|(id, info)| { - ( - *id, - info.capacity.unwrap_or(0) as usize * total_partitions / total_capacity, - ) - }) - .collect::<HashMap<&Uuid, usize>>(); - - let mut partitions_per_node = self.partitions_per_node(&partitions[..]); - - println!("Target number of partitions per node:"); - for (node, npart) in target_partitions_per_node.iter() { - println!("{:?}\t{}", node, npart); - } - println!(); - - // Shuffle partitions between nodes so that nodes will reach (or better approach) - // their target number of stored partitions - loop { - let mut option = None; - for (i, part) in partitions.iter_mut().enumerate() { - for (irm, (idrm, _)) in part.nodes.iter().enumerate() { - let errratio = |node, parts| { - let tgt = *target_partitions_per_node.get(node).unwrap() as f32; - (parts - tgt) / tgt - }; - let square = |x| x * x; - - let partsrm = partitions_per_node.get(*idrm).cloned().unwrap_or(0) as f32; - - for (idadd, infoadd) in configured_nodes.iter() { - // skip replacing a node by itself - // and skip replacing by gateway nodes - if idadd == idrm || infoadd.capacity.is_none() { - continue; - } - - // We want to try replacing node idrm by node idadd - // if that brings us close to our goal. - let partsadd = partitions_per_node.get(*idadd).cloned().unwrap_or(0) as f32; - let oldcost = square(errratio(*idrm, partsrm) - errratio(*idadd, partsadd)); - let newcost = - square(errratio(*idrm, partsrm - 1.) - errratio(*idadd, partsadd + 1.)); - if newcost >= oldcost { - // not closer to our goal - continue; - } - let gain = oldcost - newcost; - - let mut newpart = part.clone(); - - newpart.nodes.remove(irm); - if !newpart.add(None, n_zones, idadd, infoadd) { - continue; - } - assert!(newpart.nodes.len() == self.replication_factor); - - if !old_partitions[i] - .is_valid_transition_to(&newpart, self.replication_factor) - { - continue; - } - - if option - .as_ref() - .map(|(old_gain, _, _, _, _)| gain > *old_gain) - .unwrap_or(true) - { - option = Some((gain, i, idadd, idrm, newpart)); - } - } - } - } - if let Some((_gain, i, idadd, idrm, newpart)) = option { - *partitions_per_node.entry(idadd).or_insert(0) += 1; - *partitions_per_node.get_mut(idrm).unwrap() -= 1; - partitions[i] = newpart; - } else { - break; - } - } - // Check we completed the assignation correctly - // (this is a set of checks for the algorithm's consistency) - assert!(partitions.len() == (1 << PARTITION_BITS)); - assert!(partitions - .iter() - .all(|p| p.nodes.len() == self.replication_factor)); - - let new_partitions_per_node = self.partitions_per_node(&partitions[..]); - assert!(new_partitions_per_node == partitions_per_node); - - // Show statistics - println!("New number of partitions per node:"); - for (node, npart) in partitions_per_node.iter() { - let tgt = *target_partitions_per_node.get(node).unwrap(); - let pct = 100f32 * (*npart as f32) / (tgt as f32); - println!("{:?}\t{}\t({}% of {})", node, npart, pct as i32, tgt); - } - println!(); - - let mut diffcount = HashMap::new(); - for (oldpart, newpart) in old_partitions.iter().zip(partitions.iter()) { - let nminus = oldpart.txtplus(newpart); - let nplus = newpart.txtplus(oldpart); - if nminus != "[...]" || nplus != "[...]" { - let tup = (nminus, nplus); - *diffcount.entry(tup).or_insert(0) += 1; - } - } - if diffcount.is_empty() { - println!("No data will be moved between nodes."); - } else { - let mut diffcount = diffcount.into_iter().collect::<Vec<_>>(); - diffcount.sort(); - println!("Number of partitions that move:"); - for ((nminus, nplus), npart) in diffcount { - println!("\t{}\t{} -> {}", npart, nminus, nplus); - } - } - println!(); - - // Calculate and save new assignation data - let (nodes, assignation_data) = - self.compute_assignation_data(&configured_nodes[..], &partitions[..]); - - self.node_id_vec = nodes; - self.ring_assignation_data = assignation_data; - - true - } - - fn initial_partition_assignation(&self) -> Option<Vec<PartitionAss<'_>>> { - let (configured_nodes, zones) = self.configured_nodes_and_zones(); - let n_zones = zones.len(); - - // Create a vector of partition indices (0 to 2**PARTITION_BITS-1) - let partitions_idx = (0usize..(1usize << PARTITION_BITS)).collect::<Vec<_>>(); - - // Prepare ring - let mut partitions: Vec<PartitionAss> = partitions_idx - .iter() - .map(|_i| PartitionAss::new()) - .collect::<Vec<_>>(); - - // Create MagLev priority queues for each node - let mut queues = configured_nodes - .iter() - .filter(|(_id, info)| info.capacity.is_some()) - .map(|(node_id, node_info)| { - let mut parts = partitions_idx - .iter() - .map(|i| { - let part_data = - [&u16::to_be_bytes(*i as u16)[..], node_id.as_slice()].concat(); - (*i, fasthash(&part_data[..])) - }) - .collect::<Vec<_>>(); - parts.sort_by_key(|(_i, h)| *h); - let parts_i = parts.iter().map(|(i, _h)| *i).collect::<Vec<_>>(); - (node_id, node_info, parts_i, 0) - }) - .collect::<Vec<_>>(); - - let max_capacity = configured_nodes - .iter() - .filter_map(|(_, node_info)| node_info.capacity) - .fold(0, std::cmp::max); - - // Fill up ring - for rep in 0..self.replication_factor { - queues.sort_by_key(|(ni, _np, _q, _p)| { - let queue_data = [&u16::to_be_bytes(rep as u16)[..], ni.as_slice()].concat(); - fasthash(&queue_data[..]) - }); - - for (_, _, _, pos) in queues.iter_mut() { - *pos = 0; - } - - let mut remaining = partitions_idx.len(); - while remaining > 0 { - let remaining0 = remaining; - for i_round in 0..max_capacity { - for (node_id, node_info, q, pos) in queues.iter_mut() { - if i_round >= node_info.capacity.unwrap() { - continue; - } - for (pos2, &qv) in q.iter().enumerate().skip(*pos) { - if partitions[qv].add(Some(rep + 1), n_zones, node_id, node_info) { - remaining -= 1; - *pos = pos2 + 1; - break; - } - } - } - } - if remaining == remaining0 { - // No progress made, exit - return None; - } - } - } - - Some(partitions) - } + /// This function calculates a new partition-to-node assignation. + /// The computed assignation maximizes the capacity of a + /// partition (assuming all partitions have the same size). + /// Among such optimal assignation, it minimizes the distance to + /// the former assignation (if any) to minimize the amount of + /// data to be moved. A heuristic ensures node triplets + /// dispersion (in garage_util::bipartite::optimize_matching()). + pub fn calculate_partition_assignation(&mut self) -> bool { + + //The nodes might have been updated, some might have been deleted. + //So we need to first update the list of nodes and retrieve the + //assignation. + let old_node_assignation = self.update_nodes_and_ring(); + + let (node_zone, _) = self.get_node_zone_capacity(); + + //We compute the optimal number of partition to assign to + //every node and zone. + if let Some((part_per_nod, part_per_zone)) = self.optimal_proportions(){ + //We collect part_per_zone in a vec to not rely on the + //arbitrary order in which elements are iterated in + //Hashmap::iter() + let part_per_zone_vec = part_per_zone.iter() + .map(|(x,y)| (x.clone(),*y)) + .collect::<Vec<(String,usize)>>(); + //We create an indexing of the zones + let mut zone_id = HashMap::<String,usize>::new(); + for i in 0..part_per_zone_vec.len(){ + zone_id.insert(part_per_zone_vec[i].0.clone(), i); + } + + //We compute a candidate for the new partition to zone + //assignation. + let nb_zones = part_per_zone.len(); + let nb_nodes = part_per_nod.len(); + let nb_partitions = 1<<PARTITION_BITS; + let left_cap_vec = vec![self.replication_factor as u32 ; nb_partitions]; + let right_cap_vec = part_per_zone_vec.iter().map(|(_,y)| *y as u32) + .collect(); + let mut zone_assignation = + dinic_compute_matching(left_cap_vec, right_cap_vec); + + + //We create the structure for the partition-to-node assignation. + let mut node_assignation = + vec![vec![None; self.replication_factor ];nb_partitions]; + //We will decrement part_per_nod to keep track of the number + //of partitions that we still have to associate. + let mut part_per_nod = part_per_nod.clone(); + + //We minimize the distance to the former assignation(if any) + + //We get the id of the zones of the former assignation + //(and the id no_zone if there is no node assignated) + let no_zone = part_per_zone_vec.len(); + let old_zone_assignation : Vec<Vec<usize>> = + old_node_assignation.iter().map(|x| x.iter().map( + |id| match *id { Some(i) => zone_id[&node_zone[i]] , + None => no_zone } + ).collect()).collect(); + + //We minimize the distance to the former zone assignation + zone_assignation = optimize_matching( + &old_zone_assignation, &zone_assignation, nb_zones+1); //+1 for no_zone + + //We need to assign partitions to nodes in their zone + //We first put the nodes assignation that can stay the same + for i in 0..nb_partitions{ + for j in 0..self.replication_factor { + if let Some(Some(former_node)) = old_node_assignation[i].iter().find( + |x| if let Some(id) = x { + zone_id[&node_zone[*id]] == zone_assignation[i][j] + } + else {false} + ) + { + if part_per_nod[*former_node] > 0 { + node_assignation[i][j] = Some(*former_node); + part_per_nod[*former_node] -= 1; + } + } + } + } + + + //We complete the assignation of partitions to nodes + let mut rng = rand::thread_rng(); + for i in 0..nb_partitions { + for j in 0..self.replication_factor { + if node_assignation[i][j] == None { + let possible_nodes : Vec<usize> = (0..nb_nodes) + .filter( + |id| zone_id[&node_zone[*id]] == zone_assignation[i][j] + && part_per_nod[*id] > 0).collect(); + assert!(possible_nodes.len()>0); + //We randomly pick a node + if let Some(nod) = possible_nodes.choose(&mut rng){ + node_assignation[i][j] = Some(*nod); + part_per_nod[*nod] -= 1; + } + } + } + } + + //We write the assignation in the 1D table + self.ring_assignation_data = Vec::<CompactNodeType>::new(); + for i in 0..nb_partitions{ + for j in 0..self.replication_factor { + if let Some(id) = node_assignation[i][j] { + self.ring_assignation_data.push(id as CompactNodeType); + } + else {assert!(false)} + } + } + + true + } + else { false } + } + + /// The LwwMap of node roles might have changed. This function updates the node_id_vec + /// and returns the assignation given by ring, with the new indices of the nodes, and + /// None of the node is not present anymore. + /// We work with the assumption that only this function and calculate_new_assignation + /// do modify assignation_ring and node_id_vec. + fn update_nodes_and_ring(&mut self) -> Vec<Vec<Option<usize>>> { + let nb_partitions = 1usize<<PARTITION_BITS; + let mut node_assignation = + vec![vec![None; self.replication_factor ];nb_partitions]; + let rf = self.replication_factor; + let ring = &self.ring_assignation_data; + + let new_node_id_vec : Vec::<Uuid> = self.roles.items().iter() + .map(|(k, _, _)| *k) + .collect(); + + if ring.len() == rf*nb_partitions { + for i in 0..nb_partitions { + for j in 0..self.replication_factor { + node_assignation[i][j] = new_node_id_vec.iter() + .position(|id| *id == self.node_id_vec[ring[i*rf + j] as usize]); + } + } + } + + self.node_id_vec = new_node_id_vec; + self.ring_assignation_data = vec![]; + return node_assignation; + } + + ///This function compute the number of partition to assign to + ///every node and zone, so that every partition is replicated + ///self.replication_factor times and the capacity of a partition + ///is maximized. + fn optimal_proportions(&mut self) -> Option<(Vec<usize>, HashMap<String, usize>)> { + + let mut zone_capacity :HashMap<String, u32>= HashMap::new(); + + let (node_zone, node_capacity) = self.get_node_zone_capacity(); + let nb_nodes = self.node_id_vec.len(); + + for i in 0..nb_nodes + { + if zone_capacity.contains_key(&node_zone[i]) { + zone_capacity.insert(node_zone[i].clone(), zone_capacity[&node_zone[i]] + node_capacity[i]); + } + else{ + zone_capacity.insert(node_zone[i].clone(), node_capacity[i]); + } + } + + //Compute the optimal number of partitions per zone + let sum_capacities: u32 =zone_capacity.values().sum(); + + if sum_capacities <= 0 { + println!("No storage capacity in the network."); + return None; + } + + let nb_partitions = 1<<PARTITION_BITS; + + //Initially we would like to use zones porportionally to + //their capacity. + //However, a large zone can be associated to at most + //nb_partitions to ensure replication of the date. + //So we take the min with nb_partitions: + let mut part_per_zone : HashMap<String, usize> = + zone_capacity.iter() + .map(|(k, v)| (k.clone(), min(nb_partitions, + (self.replication_factor*nb_partitions + **v as usize)/sum_capacities as usize) ) ).collect(); + + //The replication_factor-1 upper bounds the number of + //part_per_zones that are greater than nb_partitions + for _ in 1..self.replication_factor { + //The number of partitions that are not assignated to + //a zone that takes nb_partitions. + let sum_capleft : u32 = zone_capacity.keys() + .filter(| k | {part_per_zone[*k] < nb_partitions} ) + .map(|k| zone_capacity[k]).sum(); + + //The number of replication of the data that we need + //to ensure. + let repl_left = self.replication_factor + - part_per_zone.values() + .filter(|x| {**x == nb_partitions}) + .count(); + if repl_left == 0 { + break; + } + + for k in zone_capacity.keys() { + if part_per_zone[k] != nb_partitions + { + part_per_zone.insert(k.to_string() , min(nb_partitions, + (nb_partitions*zone_capacity[k] as usize + *repl_left)/sum_capleft as usize)); + } + } + } + + //Now we divide the zone's partition share proportionally + //between their nodes. + + let mut part_per_nod : Vec<usize> = (0..nb_nodes).map( + |i| (part_per_zone[&node_zone[i]]*node_capacity[i] as usize)/zone_capacity[&node_zone[i]] as usize + ) + .collect(); + + //We must update the part_per_zone to make it correspond to + //part_per_nod (because of integer rounding) + part_per_zone = part_per_zone.iter().map(|(k,_)| + (k.clone(), 0)) + .collect(); + for i in 0..nb_nodes { + part_per_zone.insert( + node_zone[i].clone() , + part_per_zone[&node_zone[i]] + part_per_nod[i]); + } + + //Because of integer rounding, the total sum of part_per_nod + //might not be replication_factor*nb_partitions. + // We need at most to add 1 to every non maximal value of + // part_per_nod. The capacity of a partition will be bounded + // by the minimal value of + // node_capacity_vec[i]/part_per_nod[i] + // so we try to maximize this minimal value, keeping the + // part_per_zone capped + + let discrepancy : usize = + nb_partitions*self.replication_factor + - part_per_nod.iter().sum::<usize>(); + + //We use a stupid O(N^2) algorithm. If the number of nodes + //is actually expected to be high, one should optimize this. + + for _ in 0..discrepancy { + if let Some(idmax) = (0..nb_nodes) + .filter(|i| part_per_zone[&node_zone[*i]] < nb_partitions) + .max_by( |i,j| + (node_capacity[*i]*(part_per_nod[*j]+1) as u32) + .cmp(&(node_capacity[*j]*(part_per_nod[*i]+1) as u32)) + ) + { + part_per_nod[idmax] += 1; + part_per_zone.insert(node_zone[idmax].clone(),part_per_zone[&node_zone[idmax]]+1); + } + } + + //We check the algorithm consistency + + let discrepancy : usize = + nb_partitions*self.replication_factor + - part_per_nod.iter().sum::<usize>(); + assert!(discrepancy == 0); + assert!(if let Some(v) = part_per_zone.values().max() + {*v <= nb_partitions} else {false} ); + + Some((part_per_nod, part_per_zone)) + } + + + //Returns vectors of zone and capacity; indexed by the same (temporary) + //indices as node_id_vec. + fn get_node_zone_capacity(& self) -> (Vec<String> , Vec<u32>) { + + let node_zone = self.node_id_vec.iter().map( + |id_nod| match self.node_role(id_nod) { + Some(NodeRole{zone,capacity:_,tags:_}) => zone.clone() , + _ => "".to_string() + } + ).collect(); + + let node_capacity = self.node_id_vec.iter().map( + |id_nod| match self.node_role(id_nod) { + Some(NodeRole{zone:_,capacity,tags:_}) => + if let Some(c)=capacity + {*c} + else {0}, + _ => 0 + } + ).collect(); + + (node_zone,node_capacity) + } - fn configured_nodes_and_zones(&self) -> (Vec<(&Uuid, &NodeRole)>, HashSet<&str>) { - let configured_nodes = self - .roles - .items() - .iter() - .filter(|(_id, _, info)| info.0.is_some()) - .map(|(id, _, info)| (id, info.0.as_ref().unwrap())) - .collect::<Vec<(&Uuid, &NodeRole)>>(); - - let zones = configured_nodes - .iter() - .filter(|(_id, info)| info.capacity.is_some()) - .map(|(_id, info)| info.zone.as_str()) - .collect::<HashSet<&str>>(); - - (configured_nodes, zones) - } - - fn compute_assignation_data<'a>( - &self, - configured_nodes: &[(&'a Uuid, &'a NodeRole)], - partitions: &[PartitionAss<'a>], - ) -> (Vec<Uuid>, Vec<CompactNodeType>) { - assert!(partitions.len() == (1 << PARTITION_BITS)); - - // Make a canonical order for nodes - let mut nodes = configured_nodes - .iter() - .filter(|(_id, info)| info.capacity.is_some()) - .map(|(id, _)| **id) - .collect::<Vec<_>>(); - let nodes_rev = nodes - .iter() - .enumerate() - .map(|(i, id)| (*id, i as CompactNodeType)) - .collect::<HashMap<Uuid, CompactNodeType>>(); - - let mut assignation_data = vec![]; - for partition in partitions.iter() { - assert!(partition.nodes.len() == self.replication_factor); - for (id, _) in partition.nodes.iter() { - assignation_data.push(*nodes_rev.get(id).unwrap()); - } - } - - nodes.extend( - configured_nodes - .iter() - .filter(|(_id, info)| info.capacity.is_none()) - .map(|(id, _)| **id), - ); - - (nodes, assignation_data) - } - - fn parse_assignation_data(&self) -> Vec<PartitionAss<'_>> { - if self.ring_assignation_data.len() == self.replication_factor * (1 << PARTITION_BITS) { - // If the previous assignation data is correct, use that - let mut partitions = vec![]; - for i in 0..(1 << PARTITION_BITS) { - let mut part = PartitionAss::new(); - for node_i in self.ring_assignation_data - [i * self.replication_factor..(i + 1) * self.replication_factor] - .iter() - { - let node_id = &self.node_id_vec[*node_i as usize]; - - if let Some(NodeRoleV(Some(info))) = self.roles.get(node_id) { - part.nodes.push((node_id, Some(info))); - } else { - part.nodes.push((node_id, None)); - } - } - partitions.push(part); - } - partitions - } else { - // Otherwise start fresh - (0..(1 << PARTITION_BITS)) - .map(|_| PartitionAss::new()) - .collect() - } - } - - fn partitions_per_node<'a>(&self, partitions: &[PartitionAss<'a>]) -> HashMap<&'a Uuid, usize> { - let mut partitions_per_node = HashMap::<&Uuid, usize>::new(); - for p in partitions.iter() { - for (id, _) in p.nodes.iter() { - *partitions_per_node.entry(*id).or_insert(0) += 1; - } - } - partitions_per_node - } -} - -// ---- Internal structs for partition assignation in layout ---- - -#[derive(Clone)] -struct PartitionAss<'a> { - nodes: Vec<(&'a Uuid, Option<&'a NodeRole>)>, } -impl<'a> PartitionAss<'a> { - fn new() -> Self { - Self { nodes: Vec::new() } - } - fn nplus(&self, other: &PartitionAss<'a>) -> usize { - self.nodes - .iter() - .filter(|x| !other.nodes.contains(x)) - .count() - } - fn txtplus(&self, other: &PartitionAss<'a>) -> String { - let mut nodes = self - .nodes - .iter() - .filter(|x| !other.nodes.contains(x)) - .map(|x| format!("{:?}", x.0)) - .collect::<Vec<_>>(); - nodes.sort(); - if self.nodes.iter().any(|x| other.nodes.contains(x)) { - nodes.push("...".into()); - } - format!("[{}]", nodes.join(" ")) - } +#[cfg(test)] +mod tests { + use super::*; + use itertools::Itertools; + + fn check_assignation(cl : &ClusterLayout) { + + //Check that input data has the right format + let nb_partitions = 1usize<<PARTITION_BITS; + assert!([1,2,3].contains(&cl.replication_factor)); + assert!(cl.ring_assignation_data.len() == nb_partitions*cl.replication_factor); + + let (node_zone, node_capacity) = cl.get_node_zone_capacity(); + + + //Check that is is a correct assignation with zone redundancy + let rf = cl.replication_factor; + for i in 0..nb_partitions{ + assert!( rf == + cl.ring_assignation_data[rf*i..rf*(i+1)].iter() + .map(|nod| node_zone[*nod as usize].clone()) + .unique() + .count() ); + } + + let nb_nodes = cl.node_id_vec.len(); + //Check optimality + let node_nb_part =(0..nb_nodes).map(|i| cl.ring_assignation_data + .iter() + .filter(|x| **x==i as u8) + .count()) + .collect::<Vec::<_>>(); + + let zone_vec = node_zone.iter().unique().collect::<Vec::<_>>(); + let zone_nb_part = zone_vec.iter().map( |z| cl.ring_assignation_data.iter() + .filter(|x| node_zone[**x as usize] == **z) + .count() + ).collect::<Vec::<_>>(); + + //Check optimality of the zone assignation : would it be better for the + //node_capacity/node_partitions ratio to change the assignation of a partition + + if let Some(idmin) = (0..nb_nodes).min_by( + |i,j| (node_capacity[*i]*node_nb_part[*j] as u32) + .cmp(&(node_capacity[*j]*node_nb_part[*i] as u32)) + ){ + if let Some(idnew) = (0..nb_nodes) + .filter( |i| if let Some(p) = zone_vec.iter().position(|z| **z==node_zone[*i]) + {zone_nb_part[p] < nb_partitions } + else { false }) + .max_by( + |i,j| (node_capacity[*i]*(node_nb_part[*j]as u32+1)) + .cmp(&(node_capacity[*j]*(node_nb_part[*i] as u32+1))) + ){ + assert!(node_capacity[idmin]*(node_nb_part[idnew] as u32+1) >= + node_capacity[idnew]*node_nb_part[idmin] as u32); + } + + } + + //In every zone, check optimality of the nod assignation + for z in zone_vec { + let node_of_z_iter = (0..nb_nodes).filter(|id| node_zone[*id] == *z ); + if let Some(idmin) = node_of_z_iter.clone().min_by( + |i,j| (node_capacity[*i]*node_nb_part[*j] as u32) + .cmp(&(node_capacity[*j]*node_nb_part[*i] as u32)) + ){ + if let Some(idnew) = node_of_z_iter.min_by( + |i,j| (node_capacity[*i]*(node_nb_part[*j] as u32+1)) + .cmp(&(node_capacity[*j]*(node_nb_part[*i] as u32+1))) + ){ + assert!(node_capacity[idmin]*(node_nb_part[idnew] as u32+1) >= + node_capacity[idnew]*node_nb_part[idmin] as u32); + } + } + } + + } + + fn update_layout(cl : &mut ClusterLayout, node_id_vec : &Vec<u8>, + node_capacity_vec : &Vec<u32> , node_zone_vec : &Vec<String>) { + for i in 0..node_id_vec.len(){ + if let Some(x) = FixedBytes32::try_from(&[i as u8;32]) { + cl.node_id_vec.push(x); + } + + let update = cl.roles.update_mutator(cl.node_id_vec[i] , + NodeRoleV(Some(NodeRole{ + zone : (node_zone_vec[i].to_string()), + capacity : (Some(node_capacity_vec[i])), + tags : (vec![])}))); + cl.roles.merge(&update); + } + } + + #[test] + fn test_assignation() { + + let mut node_id_vec = vec![1,2,3]; + let mut node_capacity_vec = vec![4000,1000,2000]; + let mut node_zone_vec= vec!["A", "B", "C"].into_iter().map(|x| x.to_string()).collect(); + + let mut cl = ClusterLayout { + node_id_vec: vec![], + + roles : LwwMap::new(), + + replication_factor: 3, + ring_assignation_data : vec![], + version:0, + staging: LwwMap::new(), + staging_hash: sha256sum(&[1;32]), + }; + update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec); + cl.calculate_partition_assignation(); + check_assignation(&cl); + + node_id_vec = vec![1,2,3, 4, 5, 6, 7, 8, 9]; + node_capacity_vec = vec![4000,1000,1000, 3000, 1000, 1000, 2000, 10000, 2000]; + node_zone_vec= vec!["A", "B", "C", "C", "C", "B", "G", "H", "I"].into_iter().map(|x| x.to_string()).collect(); + update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec); + cl.calculate_partition_assignation(); + check_assignation(&cl); + + node_capacity_vec = vec![4000,1000,2000, 7000, 1000, 1000, 2000, 10000, 2000]; + update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec); + cl.calculate_partition_assignation(); + check_assignation(&cl); + + + node_capacity_vec = vec![4000,4000,2000, 7000, 1000, 9000, 2000, 10, 2000]; + update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec); + cl.calculate_partition_assignation(); + check_assignation(&cl); + + } +} - fn is_valid_transition_to(&self, other: &PartitionAss<'a>, replication_factor: usize) -> bool { - let min_keep_nodes_per_part = (replication_factor + 1) / 2; - let n_removed = self.nplus(other); - if self.nodes.len() <= min_keep_nodes_per_part { - n_removed == 0 - } else { - n_removed <= self.nodes.len() - min_keep_nodes_per_part - } - } - // add is a key function in creating a PartitionAss, i.e. the list of nodes - // to which a partition is assigned. It tries to add a certain node id to the - // assignation, but checks that doing so is compatible with the NECESSARY - // condition that the partition assignation must be dispersed over different - // zones (datacenters) if enough zones exist. This is why it takes a n_zones - // parameter, which is the total number of zones that have existing nodes: - // if nodes in the assignation already cover all n_zones zones, then any node - // that is not yet in the assignation can be added. Otherwise, only nodes - // that are in a new zone can be added. - fn add( - &mut self, - target_len: Option<usize>, - n_zones: usize, - node: &'a Uuid, - role: &'a NodeRole, - ) -> bool { - if let Some(tl) = target_len { - if self.nodes.len() != tl - 1 { - return false; - } - } - - let p_zns = self - .nodes - .iter() - .map(|(_id, info)| info.unwrap().zone.as_str()) - .collect::<HashSet<&str>>(); - if (p_zns.len() < n_zones && !p_zns.contains(&role.zone.as_str())) - || (p_zns.len() == n_zones && !self.nodes.iter().any(|(id, _)| *id == node)) - { - self.nodes.push((node, Some(role))); - true - } else { - false - } - } -} diff --git a/src/util/bipartite.rs b/src/util/bipartite.rs new file mode 100644 index 00000000..aec7b042 --- /dev/null +++ b/src/util/bipartite.rs @@ -0,0 +1,378 @@ +/* + * This module deals with graph algorithm in complete bipartite + * graphs. It is used in layout.rs to build the partition to node + * assignation. + * */ + +use std::cmp::{min,max}; +use std::collections::VecDeque; +use rand::prelude::SliceRandom; + +//Graph data structure for the flow algorithm. +#[derive(Clone,Copy,Debug)] +struct EdgeFlow{ + c : i32, + flow : i32, + v : usize, + rev : usize, +} + +//Graph data structure for the detection of positive cycles. +#[derive(Clone,Copy,Debug)] +struct WeightedEdge{ + w : i32, + u : usize, + v : usize, +} + + +/* This function takes two matchings (old_match and new_match) in a + * complete bipartite graph. It returns a matching that has the + * same degree as new_match at every vertex, and that is as close + * as possible to old_match. + * */ +pub fn optimize_matching( old_match : &Vec<Vec<usize>> , + new_match : &Vec<Vec<usize>> , + nb_right : usize ) + -> Vec<Vec<usize>> { + let nb_left = old_match.len(); + let ed = WeightedEdge{w:-1,u:0,v:0}; + let mut edge_vec = vec![ed ; nb_left*nb_right]; + + //We build the complete bipartite graph structure, represented + //by the list of all edges. + for i in 0..nb_left { + for j in 0..nb_right{ + edge_vec[i*nb_right + j].u = i; + edge_vec[i*nb_right + j].v = nb_left+j; + } + } + + for i in 0..edge_vec.len() { + //We add the old matchings + if old_match[edge_vec[i].u].contains(&(edge_vec[i].v-nb_left)) { + edge_vec[i].w *= -1; + } + //We add the new matchings + if new_match[edge_vec[i].u].contains(&(edge_vec[i].v-nb_left)) { + (edge_vec[i].u,edge_vec[i].v) = + (edge_vec[i].v,edge_vec[i].u); + edge_vec[i].w *= -1; + } + } + //Now edge_vec is a graph where edges are oriented LR if we + //can add them to new_match, and RL otherwise. If + //adding/removing them makes the matching closer to old_match + //they have weight 1; and -1 otherwise. + + //We shuffle the edge list so that there is no bias depending in + //partitions/zone label in the triplet dispersion + let mut rng = rand::thread_rng(); + edge_vec.shuffle(&mut rng); + + //Discovering and flipping a cycle with positive weight in this + //graph will make the matching closer to old_match. + //We use Bellman Ford algorithm to discover positive cycles + loop{ + if let Some(cycle) = positive_cycle(&edge_vec, nb_left, nb_right) { + for i in cycle { + //We flip the edges of the cycle. + (edge_vec[i].u,edge_vec[i].v) = + (edge_vec[i].v,edge_vec[i].u); + edge_vec[i].w *= -1; + } + } + else { + //If there is no cycle, we return the optimal matching. + break; + } + } + + //The optimal matching is build from the graph structure. + let mut matching = vec![Vec::<usize>::new() ; nb_left]; + for e in edge_vec { + if e.u > e.v { + matching[e.v].push(e.u-nb_left); + } + } + matching +} + +//This function finds a positive cycle in a bipartite wieghted graph. +fn positive_cycle( edge_vec : &Vec<WeightedEdge>, nb_left : usize, + nb_right : usize) -> Option<Vec<usize>> { + let nb_side_min = min(nb_left, nb_right); + let nb_vertices = nb_left+nb_right; + let weight_lowerbound = -((nb_left +nb_right) as i32) -1; + let mut accessed = vec![false ; nb_left]; + + //We try to find a positive cycle accessible from the left + //vertex i. + for i in 0..nb_left{ + if accessed[i] { + continue; + } + let mut weight =vec![weight_lowerbound ; nb_vertices]; + let mut prev =vec![ edge_vec.len() ; nb_vertices]; + weight[i] = 0; + //We compute largest weighted paths from i. + //Since the graph is bipartite, any simple cycle has length + //at most 2*nb_side_min. In the general Bellman-Ford + //algorithm, the bound here is the number of vertices. Since + //the number of partitions can be much larger than the + //number of nodes, we optimize that. + for _ in 0..(2*nb_side_min) { + for j in 0..edge_vec.len() { + let e = edge_vec[j]; + if weight[e.v] < weight[e.u]+e.w { + weight[e.v] = weight[e.u]+e.w; + prev[e.v] = j; + } + } + } + //We update the accessed table + for i in 0..nb_left { + if weight[i] > weight_lowerbound { + accessed[i] = true; + } + } + //We detect positive cycle + for e in edge_vec { + if weight[e.v] < weight[e.u]+e.w { + //it means e is on a path branching from a positive cycle + let mut was_seen = vec![false ; nb_vertices]; + let mut curr = e.u; + //We track back with prev until we reach the cycle. + while !was_seen[curr]{ + was_seen[curr] = true; + curr = edge_vec[prev[curr]].u; + } + //Now curr is on the cycle. We collect the edges ids. + let mut cycle = Vec::<usize>::new(); + cycle.push(prev[curr]); + let mut cycle_vert = edge_vec[prev[curr]].u; + while cycle_vert != curr { + cycle.push(prev[cycle_vert]); + cycle_vert = edge_vec[prev[cycle_vert]].u; + } + + return Some(cycle); + } + } + } + + None +} + + +// This function takes two arrays of capacity and computes the +// maximal matching in the complete bipartite graph such that the +// left vertex i is matched to left_cap_vec[i] right vertices, and +// the right vertex j is matched to right_cap_vec[j] left vertices. +// To do so, we use Dinic's maximum flow algorithm. +pub fn dinic_compute_matching( left_cap_vec : Vec<u32>, + right_cap_vec : Vec<u32>) -> Vec< Vec<usize> > +{ + let mut graph = Vec::<Vec::<EdgeFlow> >::new(); + let ed = EdgeFlow{c:0,flow:0,v:0, rev:0}; + + // 0 will be the source + graph.push(vec![ed ; left_cap_vec.len()]); + for i in 0..left_cap_vec.len() + { + graph[0][i].c = left_cap_vec[i] as i32; + graph[0][i].v = i+2; + graph[0][i].rev = 0; + } + + //1 will be the sink + graph.push(vec![ed ; right_cap_vec.len()]); + for i in 0..right_cap_vec.len() + { + graph[1][i].c = right_cap_vec[i] as i32; + graph[1][i].v = i+2+left_cap_vec.len(); + graph[1][i].rev = 0; + } + + //we add left vertices + for i in 0..left_cap_vec.len() { + graph.push(vec![ed ; 1+right_cap_vec.len()]); + graph[i+2][0].c = 0; //directed + graph[i+2][0].v = 0; + graph[i+2][0].rev = i; + + for j in 0..right_cap_vec.len() { + graph[i+2][j+1].c = 1; + graph[i+2][j+1].v = 2+left_cap_vec.len()+j; + graph[i+2][j+1].rev = i+1; + } + } + + //we add right vertices + for i in 0..right_cap_vec.len() { + let lft_ln = left_cap_vec.len(); + graph.push(vec![ed ; 1+lft_ln]); + graph[i+lft_ln+2][0].c = graph[1][i].c; + graph[i+lft_ln+2][0].v = 1; + graph[i+lft_ln+2][0].rev = i; + + for j in 0..left_cap_vec.len() { + graph[i+2+lft_ln][j+1].c = 0; //directed + graph[i+2+lft_ln][j+1].v = j+2; + graph[i+2+lft_ln][j+1].rev = i+1; + } + } + + //To ensure the dispersion of the triplets generated by the + //assignation, we shuffle the neighbours of the nodes. Hence, + //left vertices do not consider the right ones in the same order. + let mut rng = rand::thread_rng(); + for i in 0..graph.len() { + graph[i].shuffle(&mut rng); + //We need to update the ids of the reverse edges. + for j in 0..graph[i].len() { + let target_v = graph[i][j].v; + let target_rev = graph[i][j].rev; + graph[target_v][target_rev].rev = j; + } + } + + let nb_vertices = graph.len(); + + //We run Dinic's max flow algorithm + loop{ + //We build the level array from Dinic's algorithm. + let mut level = vec![-1; nb_vertices]; + + let mut fifo = VecDeque::new(); + fifo.push_back((0,0)); + while !fifo.is_empty() { + if let Some((id,lvl)) = fifo.pop_front(){ + if level[id] == -1 { + level[id] = lvl; + for e in graph[id].iter(){ + if e.c-e.flow > 0{ + fifo.push_back((e.v,lvl+1)); + } + } + } + } + } + if level[1] == -1 { + //There is no residual flow + break; + } + + //Now we run DFS respecting the level array + let mut next_nbd = vec![0; nb_vertices]; + let mut lifo = VecDeque::new(); + + let flow_upper_bound; + if let Some(x) = left_cap_vec.iter().max() { + flow_upper_bound=*x as i32; + } + else { + flow_upper_bound = 0; + assert!(false); + } + + lifo.push_back((0,flow_upper_bound)); + + loop + { + if let Some((id_tmp, f_tmp)) = lifo.back() { + let id = *id_tmp; + let f = *f_tmp; + if id == 1 { + //The DFS reached the sink, we can add a + //residual flow. + lifo.pop_back(); + while !lifo.is_empty() { + if let Some((id,_)) = lifo.pop_back(){ + let nbd=next_nbd[id]; + graph[id][nbd].flow += f; + let id_v = graph[id][nbd].v; + let nbd_v = graph[id][nbd].rev; + graph[id_v][nbd_v].flow -= f; + } + } + lifo.push_back((0,flow_upper_bound)); + continue; + } + //else we did not reach the sink + let nbd = next_nbd[id]; + if nbd >= graph[id].len() { + //There is nothing to explore from id anymore + lifo.pop_back(); + if let Some((parent, _)) = lifo.back(){ + next_nbd[*parent] +=1; + } + continue; + } + //else we can try to send flow from id to its nbd + let new_flow = min(f,graph[id][nbd].c + - graph[id][nbd].flow); + if level[graph[id][nbd].v] <= level[id] || + new_flow == 0 { + //We cannot send flow to nbd. + next_nbd[id] += 1; + continue; + } + //otherwise, we send flow to nbd. + lifo.push_back((graph[id][nbd].v, new_flow)); + } + else { + break; + } + } + } + + //We return the association + let assoc_table = (0..left_cap_vec.len()).map( + |id| graph[id+2].iter() + .filter(|e| e.flow > 0) + .map( |e| e.v-2-left_cap_vec.len()) + .collect()).collect(); + + //consistency check + + //it is a flow + for i in 3..graph.len(){ + assert!( graph[i].iter().map(|e| e.flow).sum::<i32>() == 0); + for e in graph[i].iter(){ + assert!(e.flow + graph[e.v][e.rev].flow == 0); + } + } + + //it solves the matching problem + for i in 0..left_cap_vec.len(){ + assert!(left_cap_vec[i] as i32 == + graph[i+2].iter().map(|e| max(0,e.flow)).sum::<i32>()); + } + for i in 0..right_cap_vec.len(){ + assert!(right_cap_vec[i] as i32 == + graph[i+2+left_cap_vec.len()].iter() + .map(|e| max(0,e.flow)).sum::<i32>()); + } + + + assoc_table +} + + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_flow() { + let left_vec = vec![3;8]; + let right_vec = vec![0,4,8,4,8]; + //There are asserts in the function that computes the flow + let _ = dinic_compute_matching(left_vec, right_vec); + } + + //maybe add tests relative to the matching optilization ? +} + + diff --git a/src/util/lib.rs b/src/util/lib.rs index e83fc2e6..891549c3 100644 --- a/src/util/lib.rs +++ b/src/util/lib.rs @@ -4,6 +4,7 @@ extern crate tracing; pub mod background; +pub mod bipartite; pub mod config; pub mod crdt; pub mod data; |