author     Alex Auvolat <alex@adnab.me>    2022-05-01 09:57:05 +0200
committer  Alex Auvolat <alex@adnab.me>    2022-05-01 09:57:05 +0200
commit     2aeaddd5e2e1911b084f6d49ccb2236b7fec31af (patch)
tree       7bf19ac4ba213e46194d8a71872bdbcfbeab19d8
parent     c1d1646c4d62300ec48503aa65623ee7e3df8685 (diff)
download   garage-2aeaddd5e2e1911b084f6d49ccb2236b7fec31af.tar.gz
           garage-2aeaddd5e2e1911b084f6d49ccb2236b7fec31af.zip
Apply cargo fmt
-rw-r--r--  src/rpc/layout.rs      940
-rw-r--r--  src/util/bipartite.rs  694
2 files changed, 842 insertions, 792 deletions
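The commit below is a pure formatting pass: rustfmt reflows src/rpc/layout.rs and src/util/bipartite.rs without changing behavior, which is why the diffstat is nearly symmetric (842 insertions against 792 deletions). A commit like this is typically produced with "cargo fmt --all" and verified in CI with "cargo fmt -- --check", which fails if any file would be reformatted.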
diff --git a/src/rpc/layout.rs b/src/rpc/layout.rs index afd7df17..ac31da72 100644 --- a/src/rpc/layout.rs +++ b/src/rpc/layout.rs @@ -1,12 +1,12 @@ +use std::cmp::min; use std::cmp::Ordering; -use std::cmp::{min}; -use std::collections::{HashMap}; +use std::collections::HashMap; use serde::{Deserialize, Serialize}; +use garage_util::bipartite::*; use garage_util::crdt::{AutoCrdt, Crdt, LwwMap}; use garage_util::data::*; -use garage_util::bipartite::*; use rand::prelude::SliceRandom; @@ -168,454 +168,506 @@ impl ClusterLayout { true } + /// This function calculates a new partition-to-node assignation. + /// The computed assignation maximizes the capacity of a + /// partition (assuming all partitions have the same size). + /// Among such optimal assignation, it minimizes the distance to + /// the former assignation (if any) to minimize the amount of + /// data to be moved. A heuristic ensures node triplets + /// dispersion (in garage_util::bipartite::optimize_matching()). + pub fn calculate_partition_assignation(&mut self) -> bool { + //The nodes might have been updated, some might have been deleted. + //So we need to first update the list of nodes and retrieve the + //assignation. + let old_node_assignation = self.update_nodes_and_ring(); + + let (node_zone, _) = self.get_node_zone_capacity(); + + //We compute the optimal number of partition to assign to + //every node and zone. + if let Some((part_per_nod, part_per_zone)) = self.optimal_proportions() { + //We collect part_per_zone in a vec to not rely on the + //arbitrary order in which elements are iterated in + //Hashmap::iter() + let part_per_zone_vec = part_per_zone + .iter() + .map(|(x, y)| (x.clone(), *y)) + .collect::<Vec<(String, usize)>>(); + //We create an indexing of the zones + let mut zone_id = HashMap::<String, usize>::new(); + for i in 0..part_per_zone_vec.len() { + zone_id.insert(part_per_zone_vec[i].0.clone(), i); + } - /// This function calculates a new partition-to-node assignation. - /// The computed assignation maximizes the capacity of a - /// partition (assuming all partitions have the same size). - /// Among such optimal assignation, it minimizes the distance to - /// the former assignation (if any) to minimize the amount of - /// data to be moved. A heuristic ensures node triplets - /// dispersion (in garage_util::bipartite::optimize_matching()). - pub fn calculate_partition_assignation(&mut self) -> bool { - - //The nodes might have been updated, some might have been deleted. - //So we need to first update the list of nodes and retrieve the - //assignation. - let old_node_assignation = self.update_nodes_and_ring(); - - let (node_zone, _) = self.get_node_zone_capacity(); - - //We compute the optimal number of partition to assign to - //every node and zone. - if let Some((part_per_nod, part_per_zone)) = self.optimal_proportions(){ - //We collect part_per_zone in a vec to not rely on the - //arbitrary order in which elements are iterated in - //Hashmap::iter() - let part_per_zone_vec = part_per_zone.iter() - .map(|(x,y)| (x.clone(),*y)) - .collect::<Vec<(String,usize)>>(); - //We create an indexing of the zones - let mut zone_id = HashMap::<String,usize>::new(); - for i in 0..part_per_zone_vec.len(){ - zone_id.insert(part_per_zone_vec[i].0.clone(), i); - } - - //We compute a candidate for the new partition to zone - //assignation. 
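To make the "candidate" step concrete: every partition (left side) must be assigned replication_factor distinct zones, and every zone (right side) may absorb at most its part_per_zone budget, so the candidate is a degree-constrained bipartite matching computed by maximum flow. A minimal sketch of how the two capacity vectors are shaped, with illustrative numbers (256 partitions, i.e. assuming PARTITION_BITS = 8, three zones, replication factor 3; all values are assumptions for the example):

    use garage_util::bipartite::dinic_compute_matching;

    // Illustrative sketch, not part of the patch: shape of the two
    // capacity vectors fed to the flow-based matching.
    fn candidate_zone_assignation() -> Vec<Vec<usize>> {
        let nb_partitions = 256; // 1 << PARTITION_BITS, assuming PARTITION_BITS = 8
        let replication_factor = 3u32;
        // Left capacities: each partition needs replication_factor replicas.
        let left_cap = vec![replication_factor; nb_partitions];
        // Right capacities: assumed per-zone partition budgets; they must
        // sum to replication_factor * nb_partitions for a perfect matching.
        let right_cap = vec![256u32, 256, 256];
        // Each inner Vec lists the zone indices hosting one partition.
        dinic_compute_matching(left_cap, right_cap)
    }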
- let nb_zones = part_per_zone.len(); - let nb_nodes = part_per_nod.len(); - let nb_partitions = 1<<PARTITION_BITS; - let left_cap_vec = vec![self.replication_factor as u32 ; nb_partitions]; - let right_cap_vec = part_per_zone_vec.iter().map(|(_,y)| *y as u32) - .collect(); - let mut zone_assignation = - dinic_compute_matching(left_cap_vec, right_cap_vec); - - - //We create the structure for the partition-to-node assignation. - let mut node_assignation = - vec![vec![None; self.replication_factor ];nb_partitions]; - //We will decrement part_per_nod to keep track of the number - //of partitions that we still have to associate. - let mut part_per_nod = part_per_nod.clone(); - - //We minimize the distance to the former assignation(if any) - - //We get the id of the zones of the former assignation - //(and the id no_zone if there is no node assignated) - let no_zone = part_per_zone_vec.len(); - let old_zone_assignation : Vec<Vec<usize>> = - old_node_assignation.iter().map(|x| x.iter().map( - |id| match *id { Some(i) => zone_id[&node_zone[i]] , - None => no_zone } - ).collect()).collect(); - - //We minimize the distance to the former zone assignation - zone_assignation = optimize_matching( - &old_zone_assignation, &zone_assignation, nb_zones+1); //+1 for no_zone - - //We need to assign partitions to nodes in their zone - //We first put the nodes assignation that can stay the same - for i in 0..nb_partitions{ - for j in 0..self.replication_factor { - if let Some(Some(former_node)) = old_node_assignation[i].iter().find( - |x| if let Some(id) = x { - zone_id[&node_zone[*id]] == zone_assignation[i][j] - } - else {false} - ) - { - if part_per_nod[*former_node] > 0 { - node_assignation[i][j] = Some(*former_node); - part_per_nod[*former_node] -= 1; - } - } - } - } - - - //We complete the assignation of partitions to nodes - let mut rng = rand::thread_rng(); - for i in 0..nb_partitions { - for j in 0..self.replication_factor { - if node_assignation[i][j] == None { - let possible_nodes : Vec<usize> = (0..nb_nodes) - .filter( - |id| zone_id[&node_zone[*id]] == zone_assignation[i][j] - && part_per_nod[*id] > 0).collect(); - assert!(possible_nodes.len()>0); - //We randomly pick a node - if let Some(nod) = possible_nodes.choose(&mut rng){ - node_assignation[i][j] = Some(*nod); - part_per_nod[*nod] -= 1; - } - } - } - } - - //We write the assignation in the 1D table - self.ring_assignation_data = Vec::<CompactNodeType>::new(); - for i in 0..nb_partitions{ - for j in 0..self.replication_factor { - if let Some(id) = node_assignation[i][j] { - self.ring_assignation_data.push(id as CompactNodeType); - } - else {assert!(false)} - } - } - - true - } - else { false } - } - - /// The LwwMap of node roles might have changed. This function updates the node_id_vec - /// and returns the assignation given by ring, with the new indices of the nodes, and - /// None of the node is not present anymore. - /// We work with the assumption that only this function and calculate_new_assignation - /// do modify assignation_ring and node_id_vec. 
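Before the function itself, the indexing convention it decodes is worth spelling out: ring_assignation_data is a flat table in which replica j of partition i is stored at index i*rf + j. A hypothetical helper (not in the patch) makes the decoding explicit:

    // Hypothetical helper: decode the flat ring table. `ring` holds
    // compact node indices (u8 here, matching CompactNodeType as used
    // in the tests); replica `j` of partition `i` sits at index
    // i * rf + j, where rf is the replication factor.
    fn replica_node(ring: &[u8], rf: usize, partition: usize, replica: usize) -> u8 {
        ring[partition * rf + replica]
    }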
- fn update_nodes_and_ring(&mut self) -> Vec<Vec<Option<usize>>> { - let nb_partitions = 1usize<<PARTITION_BITS; - let mut node_assignation = - vec![vec![None; self.replication_factor ];nb_partitions]; - let rf = self.replication_factor; - let ring = &self.ring_assignation_data; - - let new_node_id_vec : Vec::<Uuid> = self.roles.items().iter() - .map(|(k, _, _)| *k) - .collect(); - - if ring.len() == rf*nb_partitions { - for i in 0..nb_partitions { - for j in 0..self.replication_factor { - node_assignation[i][j] = new_node_id_vec.iter() - .position(|id| *id == self.node_id_vec[ring[i*rf + j] as usize]); - } - } - } - - self.node_id_vec = new_node_id_vec; - self.ring_assignation_data = vec![]; - return node_assignation; - } - - ///This function compute the number of partition to assign to - ///every node and zone, so that every partition is replicated - ///self.replication_factor times and the capacity of a partition - ///is maximized. - fn optimal_proportions(&mut self) -> Option<(Vec<usize>, HashMap<String, usize>)> { - - let mut zone_capacity :HashMap<String, u32>= HashMap::new(); - - let (node_zone, node_capacity) = self.get_node_zone_capacity(); - let nb_nodes = self.node_id_vec.len(); - - for i in 0..nb_nodes - { - if zone_capacity.contains_key(&node_zone[i]) { - zone_capacity.insert(node_zone[i].clone(), zone_capacity[&node_zone[i]] + node_capacity[i]); - } - else{ - zone_capacity.insert(node_zone[i].clone(), node_capacity[i]); - } - } - - //Compute the optimal number of partitions per zone - let sum_capacities: u32 =zone_capacity.values().sum(); - - if sum_capacities <= 0 { - println!("No storage capacity in the network."); - return None; - } - - let nb_partitions = 1<<PARTITION_BITS; - - //Initially we would like to use zones porportionally to - //their capacity. - //However, a large zone can be associated to at most - //nb_partitions to ensure replication of the date. - //So we take the min with nb_partitions: - let mut part_per_zone : HashMap<String, usize> = - zone_capacity.iter() - .map(|(k, v)| (k.clone(), min(nb_partitions, - (self.replication_factor*nb_partitions - **v as usize)/sum_capacities as usize) ) ).collect(); - - //The replication_factor-1 upper bounds the number of - //part_per_zones that are greater than nb_partitions - for _ in 1..self.replication_factor { - //The number of partitions that are not assignated to - //a zone that takes nb_partitions. - let sum_capleft : u32 = zone_capacity.keys() - .filter(| k | {part_per_zone[*k] < nb_partitions} ) - .map(|k| zone_capacity[k]).sum(); - - //The number of replication of the data that we need - //to ensure. - let repl_left = self.replication_factor - - part_per_zone.values() - .filter(|x| {**x == nb_partitions}) - .count(); - if repl_left == 0 { - break; - } - - for k in zone_capacity.keys() { - if part_per_zone[k] != nb_partitions - { - part_per_zone.insert(k.to_string() , min(nb_partitions, - (nb_partitions*zone_capacity[k] as usize - *repl_left)/sum_capleft as usize)); - } - } - } - - //Now we divide the zone's partition share proportionally - //between their nodes. 
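A worked example of the capped proportional allocation above, with illustrative numbers: take replication_factor = 3, 256 partitions, and three zones of capacities 4000, 1000 and 2000 (sum 7000). The first pass allots min(256, 768*4000/7000) = min(256, 438) = 256 to the first zone (capped), 109 to the second and 219 to the third. Since one zone is capped, the loop redistributes: with repl_left = 2 and sum_capleft = 3000, the second zone gets min(256, 256*1000*2/3000) = 170 and the third min(256, 256*2000*2/3000) = 256 (now also capped). A final pass with repl_left = 1 and sum_capleft = 1000 brings the second zone to 256 as well: with exactly three zones and replication factor 3, every zone must hold one replica of every partition, which the algorithm indeed reaches.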
- - let mut part_per_nod : Vec<usize> = (0..nb_nodes).map( - |i| (part_per_zone[&node_zone[i]]*node_capacity[i] as usize)/zone_capacity[&node_zone[i]] as usize - ) - .collect(); - - //We must update the part_per_zone to make it correspond to - //part_per_nod (because of integer rounding) - part_per_zone = part_per_zone.iter().map(|(k,_)| - (k.clone(), 0)) - .collect(); - for i in 0..nb_nodes { - part_per_zone.insert( - node_zone[i].clone() , - part_per_zone[&node_zone[i]] + part_per_nod[i]); - } - - //Because of integer rounding, the total sum of part_per_nod - //might not be replication_factor*nb_partitions. - // We need at most to add 1 to every non maximal value of - // part_per_nod. The capacity of a partition will be bounded - // by the minimal value of - // node_capacity_vec[i]/part_per_nod[i] - // so we try to maximize this minimal value, keeping the - // part_per_zone capped - - let discrepancy : usize = - nb_partitions*self.replication_factor - - part_per_nod.iter().sum::<usize>(); - - //We use a stupid O(N^2) algorithm. If the number of nodes - //is actually expected to be high, one should optimize this. - - for _ in 0..discrepancy { - if let Some(idmax) = (0..nb_nodes) - .filter(|i| part_per_zone[&node_zone[*i]] < nb_partitions) - .max_by( |i,j| - (node_capacity[*i]*(part_per_nod[*j]+1) as u32) - .cmp(&(node_capacity[*j]*(part_per_nod[*i]+1) as u32)) - ) - { - part_per_nod[idmax] += 1; - part_per_zone.insert(node_zone[idmax].clone(),part_per_zone[&node_zone[idmax]]+1); - } - } - - //We check the algorithm consistency - - let discrepancy : usize = - nb_partitions*self.replication_factor - - part_per_nod.iter().sum::<usize>(); - assert!(discrepancy == 0); - assert!(if let Some(v) = part_per_zone.values().max() - {*v <= nb_partitions} else {false} ); - - Some((part_per_nod, part_per_zone)) - } - - - //Returns vectors of zone and capacity; indexed by the same (temporary) - //indices as node_id_vec. - fn get_node_zone_capacity(& self) -> (Vec<String> , Vec<u32>) { - - let node_zone = self.node_id_vec.iter().map( - |id_nod| match self.node_role(id_nod) { - Some(NodeRole{zone,capacity:_,tags:_}) => zone.clone() , - _ => "".to_string() - } - ).collect(); - - let node_capacity = self.node_id_vec.iter().map( - |id_nod| match self.node_role(id_nod) { - Some(NodeRole{zone:_,capacity,tags:_}) => - if let Some(c)=capacity - {*c} - else {0}, - _ => 0 - } - ).collect(); - - (node_zone,node_capacity) - } + //We compute a candidate for the new partition to zone + //assignation. + let nb_zones = part_per_zone.len(); + let nb_nodes = part_per_nod.len(); + let nb_partitions = 1 << PARTITION_BITS; + let left_cap_vec = vec![self.replication_factor as u32; nb_partitions]; + let right_cap_vec = part_per_zone_vec.iter().map(|(_, y)| *y as u32).collect(); + let mut zone_assignation = dinic_compute_matching(left_cap_vec, right_cap_vec); + + //We create the structure for the partition-to-node assignation. + let mut node_assignation = vec![vec![None; self.replication_factor]; nb_partitions]; + //We will decrement part_per_nod to keep track of the number + //of partitions that we still have to associate. 
+ let mut part_per_nod = part_per_nod.clone(); + + //We minimize the distance to the former assignation(if any) + + //We get the id of the zones of the former assignation + //(and the id no_zone if there is no node assignated) + let no_zone = part_per_zone_vec.len(); + let old_zone_assignation: Vec<Vec<usize>> = old_node_assignation + .iter() + .map(|x| { + x.iter() + .map(|id| match *id { + Some(i) => zone_id[&node_zone[i]], + None => no_zone, + }) + .collect() + }) + .collect(); + + //We minimize the distance to the former zone assignation + zone_assignation = + optimize_matching(&old_zone_assignation, &zone_assignation, nb_zones + 1); //+1 for no_zone + + //We need to assign partitions to nodes in their zone + //We first put the nodes assignation that can stay the same + for i in 0..nb_partitions { + for j in 0..self.replication_factor { + if let Some(Some(former_node)) = old_node_assignation[i].iter().find(|x| { + if let Some(id) = x { + zone_id[&node_zone[*id]] == zone_assignation[i][j] + } else { + false + } + }) { + if part_per_nod[*former_node] > 0 { + node_assignation[i][j] = Some(*former_node); + part_per_nod[*former_node] -= 1; + } + } + } + } -} + //We complete the assignation of partitions to nodes + let mut rng = rand::thread_rng(); + for i in 0..nb_partitions { + for j in 0..self.replication_factor { + if node_assignation[i][j] == None { + let possible_nodes: Vec<usize> = (0..nb_nodes) + .filter(|id| { + zone_id[&node_zone[*id]] == zone_assignation[i][j] + && part_per_nod[*id] > 0 + }) + .collect(); + assert!(possible_nodes.len() > 0); + //We randomly pick a node + if let Some(nod) = possible_nodes.choose(&mut rng) { + node_assignation[i][j] = Some(*nod); + part_per_nod[*nod] -= 1; + } + } + } + } + + //We write the assignation in the 1D table + self.ring_assignation_data = Vec::<CompactNodeType>::new(); + for i in 0..nb_partitions { + for j in 0..self.replication_factor { + if let Some(id) = node_assignation[i][j] { + self.ring_assignation_data.push(id as CompactNodeType); + } else { + assert!(false) + } + } + } + true + } else { + false + } + } + + /// The LwwMap of node roles might have changed. This function updates the node_id_vec + /// and returns the assignation given by ring, with the new indices of the nodes, and + /// None of the node is not present anymore. + /// We work with the assumption that only this function and calculate_new_assignation + /// do modify assignation_ring and node_id_vec. + fn update_nodes_and_ring(&mut self) -> Vec<Vec<Option<usize>>> { + let nb_partitions = 1usize << PARTITION_BITS; + let mut node_assignation = vec![vec![None; self.replication_factor]; nb_partitions]; + let rf = self.replication_factor; + let ring = &self.ring_assignation_data; + + let new_node_id_vec: Vec<Uuid> = self.roles.items().iter().map(|(k, _, _)| *k).collect(); + + if ring.len() == rf * nb_partitions { + for i in 0..nb_partitions { + for j in 0..self.replication_factor { + node_assignation[i][j] = new_node_id_vec + .iter() + .position(|id| *id == self.node_id_vec[ring[i * rf + j] as usize]); + } + } + } + + self.node_id_vec = new_node_id_vec; + self.ring_assignation_data = vec![]; + return node_assignation; + } + + ///This function compute the number of partition to assign to + ///every node and zone, so that every partition is replicated + ///self.replication_factor times and the capacity of a partition + ///is maximized. 
+ fn optimal_proportions(&mut self) -> Option<(Vec<usize>, HashMap<String, usize>)> { + let mut zone_capacity: HashMap<String, u32> = HashMap::new(); + + let (node_zone, node_capacity) = self.get_node_zone_capacity(); + let nb_nodes = self.node_id_vec.len(); + + for i in 0..nb_nodes { + if zone_capacity.contains_key(&node_zone[i]) { + zone_capacity.insert( + node_zone[i].clone(), + zone_capacity[&node_zone[i]] + node_capacity[i], + ); + } else { + zone_capacity.insert(node_zone[i].clone(), node_capacity[i]); + } + } + + //Compute the optimal number of partitions per zone + let sum_capacities: u32 = zone_capacity.values().sum(); + + if sum_capacities <= 0 { + println!("No storage capacity in the network."); + return None; + } + + let nb_partitions = 1 << PARTITION_BITS; + + //Initially we would like to use zones porportionally to + //their capacity. + //However, a large zone can be associated to at most + //nb_partitions to ensure replication of the date. + //So we take the min with nb_partitions: + let mut part_per_zone: HashMap<String, usize> = zone_capacity + .iter() + .map(|(k, v)| { + ( + k.clone(), + min( + nb_partitions, + (self.replication_factor * nb_partitions * *v as usize) + / sum_capacities as usize, + ), + ) + }) + .collect(); + + //The replication_factor-1 upper bounds the number of + //part_per_zones that are greater than nb_partitions + for _ in 1..self.replication_factor { + //The number of partitions that are not assignated to + //a zone that takes nb_partitions. + let sum_capleft: u32 = zone_capacity + .keys() + .filter(|k| part_per_zone[*k] < nb_partitions) + .map(|k| zone_capacity[k]) + .sum(); + + //The number of replication of the data that we need + //to ensure. + let repl_left = self.replication_factor + - part_per_zone + .values() + .filter(|x| **x == nb_partitions) + .count(); + if repl_left == 0 { + break; + } + + for k in zone_capacity.keys() { + if part_per_zone[k] != nb_partitions { + part_per_zone.insert( + k.to_string(), + min( + nb_partitions, + (nb_partitions * zone_capacity[k] as usize * repl_left) + / sum_capleft as usize, + ), + ); + } + } + } + + //Now we divide the zone's partition share proportionally + //between their nodes. + + let mut part_per_nod: Vec<usize> = (0..nb_nodes) + .map(|i| { + (part_per_zone[&node_zone[i]] * node_capacity[i] as usize) + / zone_capacity[&node_zone[i]] as usize + }) + .collect(); + + //We must update the part_per_zone to make it correspond to + //part_per_nod (because of integer rounding) + part_per_zone = part_per_zone.iter().map(|(k, _)| (k.clone(), 0)).collect(); + for i in 0..nb_nodes { + part_per_zone.insert( + node_zone[i].clone(), + part_per_zone[&node_zone[i]] + part_per_nod[i], + ); + } + + //Because of integer rounding, the total sum of part_per_nod + //might not be replication_factor*nb_partitions. + // We need at most to add 1 to every non maximal value of + // part_per_nod. The capacity of a partition will be bounded + // by the minimal value of + // node_capacity_vec[i]/part_per_nod[i] + // so we try to maximize this minimal value, keeping the + // part_per_zone capped + + let discrepancy: usize = + nb_partitions * self.replication_factor - part_per_nod.iter().sum::<usize>(); + + //We use a stupid O(N^2) algorithm. If the number of nodes + //is actually expected to be high, one should optimize this. 
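The greedy loop that follows fixes the rounding discrepancy one partition at a time, always giving the extra partition to the node whose capacity-per-partition ratio remains highest after the increment. The comparison stays in integer arithmetic by cross-multiplying the two ratios; a standalone sketch of the idiom (hypothetical helper, same logic as the max_by closure below):

    use std::cmp::Ordering;

    // Compare cap_i / (parts_i + 1) with cap_j / (parts_j + 1) without
    // floating point: a/b > c/d iff a*d > c*b for positive denominators.
    fn ratio_cmp(cap_i: u32, parts_i: usize, cap_j: u32, parts_j: usize) -> Ordering {
        (cap_i as u64 * (parts_j as u64 + 1)).cmp(&(cap_j as u64 * (parts_i as u64 + 1)))
    }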
+ + for _ in 0..discrepancy { + if let Some(idmax) = (0..nb_nodes) + .filter(|i| part_per_zone[&node_zone[*i]] < nb_partitions) + .max_by(|i, j| { + (node_capacity[*i] * (part_per_nod[*j] + 1) as u32) + .cmp(&(node_capacity[*j] * (part_per_nod[*i] + 1) as u32)) + }) { + part_per_nod[idmax] += 1; + part_per_zone.insert( + node_zone[idmax].clone(), + part_per_zone[&node_zone[idmax]] + 1, + ); + } + } + //We check the algorithm consistency + + let discrepancy: usize = + nb_partitions * self.replication_factor - part_per_nod.iter().sum::<usize>(); + assert!(discrepancy == 0); + assert!(if let Some(v) = part_per_zone.values().max() { + *v <= nb_partitions + } else { + false + }); + + Some((part_per_nod, part_per_zone)) + } + + //Returns vectors of zone and capacity; indexed by the same (temporary) + //indices as node_id_vec. + fn get_node_zone_capacity(&self) -> (Vec<String>, Vec<u32>) { + let node_zone = self + .node_id_vec + .iter() + .map(|id_nod| match self.node_role(id_nod) { + Some(NodeRole { + zone, + capacity: _, + tags: _, + }) => zone.clone(), + _ => "".to_string(), + }) + .collect(); + + let node_capacity = self + .node_id_vec + .iter() + .map(|id_nod| match self.node_role(id_nod) { + Some(NodeRole { + zone: _, + capacity, + tags: _, + }) => { + if let Some(c) = capacity { + *c + } else { + 0 + } + } + _ => 0, + }) + .collect(); + + (node_zone, node_capacity) + } +} #[cfg(test)] mod tests { - use super::*; - use itertools::Itertools; - - fn check_assignation(cl : &ClusterLayout) { - - //Check that input data has the right format - let nb_partitions = 1usize<<PARTITION_BITS; - assert!([1,2,3].contains(&cl.replication_factor)); - assert!(cl.ring_assignation_data.len() == nb_partitions*cl.replication_factor); - - let (node_zone, node_capacity) = cl.get_node_zone_capacity(); - - - //Check that is is a correct assignation with zone redundancy - let rf = cl.replication_factor; - for i in 0..nb_partitions{ - assert!( rf == - cl.ring_assignation_data[rf*i..rf*(i+1)].iter() - .map(|nod| node_zone[*nod as usize].clone()) - .unique() - .count() ); - } - - let nb_nodes = cl.node_id_vec.len(); - //Check optimality - let node_nb_part =(0..nb_nodes).map(|i| cl.ring_assignation_data - .iter() - .filter(|x| **x==i as u8) - .count()) - .collect::<Vec::<_>>(); - - let zone_vec = node_zone.iter().unique().collect::<Vec::<_>>(); - let zone_nb_part = zone_vec.iter().map( |z| cl.ring_assignation_data.iter() - .filter(|x| node_zone[**x as usize] == **z) - .count() - ).collect::<Vec::<_>>(); - - //Check optimality of the zone assignation : would it be better for the - //node_capacity/node_partitions ratio to change the assignation of a partition - - if let Some(idmin) = (0..nb_nodes).min_by( - |i,j| (node_capacity[*i]*node_nb_part[*j] as u32) - .cmp(&(node_capacity[*j]*node_nb_part[*i] as u32)) - ){ - if let Some(idnew) = (0..nb_nodes) - .filter( |i| if let Some(p) = zone_vec.iter().position(|z| **z==node_zone[*i]) - {zone_nb_part[p] < nb_partitions } - else { false }) - .max_by( - |i,j| (node_capacity[*i]*(node_nb_part[*j]as u32+1)) - .cmp(&(node_capacity[*j]*(node_nb_part[*i] as u32+1))) - ){ - assert!(node_capacity[idmin]*(node_nb_part[idnew] as u32+1) >= - node_capacity[idnew]*node_nb_part[idmin] as u32); - } - - } - - //In every zone, check optimality of the nod assignation - for z in zone_vec { - let node_of_z_iter = (0..nb_nodes).filter(|id| node_zone[*id] == *z ); - if let Some(idmin) = node_of_z_iter.clone().min_by( - |i,j| (node_capacity[*i]*node_nb_part[*j] as u32) - 
.cmp(&(node_capacity[*j]*node_nb_part[*i] as u32)) - ){ - if let Some(idnew) = node_of_z_iter.min_by( - |i,j| (node_capacity[*i]*(node_nb_part[*j] as u32+1)) - .cmp(&(node_capacity[*j]*(node_nb_part[*i] as u32+1))) - ){ - assert!(node_capacity[idmin]*(node_nb_part[idnew] as u32+1) >= - node_capacity[idnew]*node_nb_part[idmin] as u32); - } - } - } - - } - - fn update_layout(cl : &mut ClusterLayout, node_id_vec : &Vec<u8>, - node_capacity_vec : &Vec<u32> , node_zone_vec : &Vec<String>) { - for i in 0..node_id_vec.len(){ - if let Some(x) = FixedBytes32::try_from(&[i as u8;32]) { - cl.node_id_vec.push(x); - } - - let update = cl.roles.update_mutator(cl.node_id_vec[i] , - NodeRoleV(Some(NodeRole{ - zone : (node_zone_vec[i].to_string()), - capacity : (Some(node_capacity_vec[i])), - tags : (vec![])}))); - cl.roles.merge(&update); - } - } - - #[test] - fn test_assignation() { - - let mut node_id_vec = vec![1,2,3]; - let mut node_capacity_vec = vec![4000,1000,2000]; - let mut node_zone_vec= vec!["A", "B", "C"].into_iter().map(|x| x.to_string()).collect(); - - let mut cl = ClusterLayout { - node_id_vec: vec![], - - roles : LwwMap::new(), - - replication_factor: 3, - ring_assignation_data : vec![], - version:0, - staging: LwwMap::new(), - staging_hash: sha256sum(&[1;32]), - }; - update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec); - cl.calculate_partition_assignation(); - check_assignation(&cl); - - node_id_vec = vec![1,2,3, 4, 5, 6, 7, 8, 9]; - node_capacity_vec = vec![4000,1000,1000, 3000, 1000, 1000, 2000, 10000, 2000]; - node_zone_vec= vec!["A", "B", "C", "C", "C", "B", "G", "H", "I"].into_iter().map(|x| x.to_string()).collect(); - update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec); - cl.calculate_partition_assignation(); - check_assignation(&cl); - - node_capacity_vec = vec![4000,1000,2000, 7000, 1000, 1000, 2000, 10000, 2000]; - update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec); - cl.calculate_partition_assignation(); - check_assignation(&cl); - - - node_capacity_vec = vec![4000,4000,2000, 7000, 1000, 9000, 2000, 10, 2000]; - update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec); - cl.calculate_partition_assignation(); - check_assignation(&cl); - - } -} + use super::*; + use itertools::Itertools; + + fn check_assignation(cl: &ClusterLayout) { + //Check that input data has the right format + let nb_partitions = 1usize << PARTITION_BITS; + assert!([1, 2, 3].contains(&cl.replication_factor)); + assert!(cl.ring_assignation_data.len() == nb_partitions * cl.replication_factor); + + let (node_zone, node_capacity) = cl.get_node_zone_capacity(); + + //Check that is is a correct assignation with zone redundancy + let rf = cl.replication_factor; + for i in 0..nb_partitions { + assert!( + rf == cl.ring_assignation_data[rf * i..rf * (i + 1)] + .iter() + .map(|nod| node_zone[*nod as usize].clone()) + .unique() + .count() + ); + } + + let nb_nodes = cl.node_id_vec.len(); + //Check optimality + let node_nb_part = (0..nb_nodes) + .map(|i| { + cl.ring_assignation_data + .iter() + .filter(|x| **x == i as u8) + .count() + }) + .collect::<Vec<_>>(); + + let zone_vec = node_zone.iter().unique().collect::<Vec<_>>(); + let zone_nb_part = zone_vec + .iter() + .map(|z| { + cl.ring_assignation_data + .iter() + .filter(|x| node_zone[**x as usize] == **z) + .count() + }) + .collect::<Vec<_>>(); + + //Check optimality of the zone assignation : would it be better for the + //node_capacity/node_partitions ratio to change the assignation of a 
partition + + if let Some(idmin) = (0..nb_nodes).min_by(|i, j| { + (node_capacity[*i] * node_nb_part[*j] as u32) + .cmp(&(node_capacity[*j] * node_nb_part[*i] as u32)) + }) { + if let Some(idnew) = (0..nb_nodes) + .filter(|i| { + if let Some(p) = zone_vec.iter().position(|z| **z == node_zone[*i]) { + zone_nb_part[p] < nb_partitions + } else { + false + } + }) + .max_by(|i, j| { + (node_capacity[*i] * (node_nb_part[*j] as u32 + 1)) + .cmp(&(node_capacity[*j] * (node_nb_part[*i] as u32 + 1))) + }) { + assert!( + node_capacity[idmin] * (node_nb_part[idnew] as u32 + 1) + >= node_capacity[idnew] * node_nb_part[idmin] as u32 + ); + } + } + + //In every zone, check optimality of the nod assignation + for z in zone_vec { + let node_of_z_iter = (0..nb_nodes).filter(|id| node_zone[*id] == *z); + if let Some(idmin) = node_of_z_iter.clone().min_by(|i, j| { + (node_capacity[*i] * node_nb_part[*j] as u32) + .cmp(&(node_capacity[*j] * node_nb_part[*i] as u32)) + }) { + if let Some(idnew) = node_of_z_iter.min_by(|i, j| { + (node_capacity[*i] * (node_nb_part[*j] as u32 + 1)) + .cmp(&(node_capacity[*j] * (node_nb_part[*i] as u32 + 1))) + }) { + assert!( + node_capacity[idmin] * (node_nb_part[idnew] as u32 + 1) + >= node_capacity[idnew] * node_nb_part[idmin] as u32 + ); + } + } + } + } + + fn update_layout( + cl: &mut ClusterLayout, + node_id_vec: &Vec<u8>, + node_capacity_vec: &Vec<u32>, + node_zone_vec: &Vec<String>, + ) { + for i in 0..node_id_vec.len() { + if let Some(x) = FixedBytes32::try_from(&[i as u8; 32]) { + cl.node_id_vec.push(x); + } + let update = cl.roles.update_mutator( + cl.node_id_vec[i], + NodeRoleV(Some(NodeRole { + zone: (node_zone_vec[i].to_string()), + capacity: (Some(node_capacity_vec[i])), + tags: (vec![]), + })), + ); + cl.roles.merge(&update); + } + } + + #[test] + fn test_assignation() { + let mut node_id_vec = vec![1, 2, 3]; + let mut node_capacity_vec = vec![4000, 1000, 2000]; + let mut node_zone_vec = vec!["A", "B", "C"] + .into_iter() + .map(|x| x.to_string()) + .collect(); + + let mut cl = ClusterLayout { + node_id_vec: vec![], + roles: LwwMap::new(), + replication_factor: 3, + ring_assignation_data: vec![], + version: 0, + staging: LwwMap::new(), + staging_hash: sha256sum(&[1; 32]), + }; + update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec); + cl.calculate_partition_assignation(); + check_assignation(&cl); + + node_id_vec = vec![1, 2, 3, 4, 5, 6, 7, 8, 9]; + node_capacity_vec = vec![4000, 1000, 1000, 3000, 1000, 1000, 2000, 10000, 2000]; + node_zone_vec = vec!["A", "B", "C", "C", "C", "B", "G", "H", "I"] + .into_iter() + .map(|x| x.to_string()) + .collect(); + update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec); + cl.calculate_partition_assignation(); + check_assignation(&cl); + + node_capacity_vec = vec![4000, 1000, 2000, 7000, 1000, 1000, 2000, 10000, 2000]; + update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec); + cl.calculate_partition_assignation(); + check_assignation(&cl); + + node_capacity_vec = vec![4000, 4000, 2000, 7000, 1000, 9000, 2000, 10, 2000]; + update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec); + cl.calculate_partition_assignation(); + check_assignation(&cl); + } +} diff --git a/src/util/bipartite.rs b/src/util/bipartite.rs index aec7b042..ade831a4 100644 --- a/src/util/bipartite.rs +++ b/src/util/bipartite.rs @@ -1,378 +1,376 @@ /* - * This module deals with graph algorithm in complete bipartite + * This module deals with graph algorithm in complete bipartite * graphs. 
It is used in layout.rs to build the partition to node * assignation. * */ -use std::cmp::{min,max}; -use std::collections::VecDeque; use rand::prelude::SliceRandom; +use std::cmp::{max, min}; +use std::collections::VecDeque; //Graph data structure for the flow algorithm. -#[derive(Clone,Copy,Debug)] -struct EdgeFlow{ - c : i32, - flow : i32, - v : usize, - rev : usize, +#[derive(Clone, Copy, Debug)] +struct EdgeFlow { + c: i32, + flow: i32, + v: usize, + rev: usize, } //Graph data structure for the detection of positive cycles. -#[derive(Clone,Copy,Debug)] -struct WeightedEdge{ - w : i32, - u : usize, - v : usize, +#[derive(Clone, Copy, Debug)] +struct WeightedEdge { + w: i32, + u: usize, + v: usize, } - -/* This function takes two matchings (old_match and new_match) in a - * complete bipartite graph. It returns a matching that has the +/* This function takes two matchings (old_match and new_match) in a + * complete bipartite graph. It returns a matching that has the * same degree as new_match at every vertex, and that is as close * as possible to old_match. * */ -pub fn optimize_matching( old_match : &Vec<Vec<usize>> , - new_match : &Vec<Vec<usize>> , - nb_right : usize ) - -> Vec<Vec<usize>> { - let nb_left = old_match.len(); - let ed = WeightedEdge{w:-1,u:0,v:0}; - let mut edge_vec = vec![ed ; nb_left*nb_right]; - - //We build the complete bipartite graph structure, represented - //by the list of all edges. - for i in 0..nb_left { - for j in 0..nb_right{ - edge_vec[i*nb_right + j].u = i; - edge_vec[i*nb_right + j].v = nb_left+j; - } - } - - for i in 0..edge_vec.len() { - //We add the old matchings - if old_match[edge_vec[i].u].contains(&(edge_vec[i].v-nb_left)) { - edge_vec[i].w *= -1; - } - //We add the new matchings - if new_match[edge_vec[i].u].contains(&(edge_vec[i].v-nb_left)) { - (edge_vec[i].u,edge_vec[i].v) = - (edge_vec[i].v,edge_vec[i].u); - edge_vec[i].w *= -1; - } - } - //Now edge_vec is a graph where edges are oriented LR if we - //can add them to new_match, and RL otherwise. If - //adding/removing them makes the matching closer to old_match - //they have weight 1; and -1 otherwise. - - //We shuffle the edge list so that there is no bias depending in - //partitions/zone label in the triplet dispersion - let mut rng = rand::thread_rng(); - edge_vec.shuffle(&mut rng); - - //Discovering and flipping a cycle with positive weight in this - //graph will make the matching closer to old_match. - //We use Bellman Ford algorithm to discover positive cycles - loop{ - if let Some(cycle) = positive_cycle(&edge_vec, nb_left, nb_right) { - for i in cycle { - //We flip the edges of the cycle. - (edge_vec[i].u,edge_vec[i].v) = - (edge_vec[i].v,edge_vec[i].u); - edge_vec[i].w *= -1; - } - } - else { - //If there is no cycle, we return the optimal matching. - break; - } - } - - //The optimal matching is build from the graph structure. - let mut matching = vec![Vec::<usize>::new() ; nb_left]; - for e in edge_vec { - if e.u > e.v { - matching[e.v].push(e.u-nb_left); - } - } - matching +pub fn optimize_matching( + old_match: &Vec<Vec<usize>>, + new_match: &Vec<Vec<usize>>, + nb_right: usize, +) -> Vec<Vec<usize>> { + let nb_left = old_match.len(); + let ed = WeightedEdge { w: -1, u: 0, v: 0 }; + let mut edge_vec = vec![ed; nb_left * nb_right]; + + //We build the complete bipartite graph structure, represented + //by the list of all edges. 
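A toy instance makes the orientation and weights above tangible: with nb_left = nb_right = 2, old_match pairing 0 with 0 and 1 with 1, and new_match pairing 0 with 1 and 1 with 0, the two new-matching edges end up oriented right-to-left with weight +1 (removing them moves toward old_match) and the two old-matching edges stay left-to-right with weight +1 (adding them moves toward old_match). Together they form an alternating cycle of weight +4; one round of cycle-cancelling flips it, and the returned matching is exactly the old one.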
+ for i in 0..nb_left { + for j in 0..nb_right { + edge_vec[i * nb_right + j].u = i; + edge_vec[i * nb_right + j].v = nb_left + j; + } + } + + for i in 0..edge_vec.len() { + //We add the old matchings + if old_match[edge_vec[i].u].contains(&(edge_vec[i].v - nb_left)) { + edge_vec[i].w *= -1; + } + //We add the new matchings + if new_match[edge_vec[i].u].contains(&(edge_vec[i].v - nb_left)) { + (edge_vec[i].u, edge_vec[i].v) = (edge_vec[i].v, edge_vec[i].u); + edge_vec[i].w *= -1; + } + } + //Now edge_vec is a graph where edges are oriented LR if we + //can add them to new_match, and RL otherwise. If + //adding/removing them makes the matching closer to old_match + //they have weight 1; and -1 otherwise. + + //We shuffle the edge list so that there is no bias depending in + //partitions/zone label in the triplet dispersion + let mut rng = rand::thread_rng(); + edge_vec.shuffle(&mut rng); + + //Discovering and flipping a cycle with positive weight in this + //graph will make the matching closer to old_match. + //We use Bellman Ford algorithm to discover positive cycles + loop { + if let Some(cycle) = positive_cycle(&edge_vec, nb_left, nb_right) { + for i in cycle { + //We flip the edges of the cycle. + (edge_vec[i].u, edge_vec[i].v) = (edge_vec[i].v, edge_vec[i].u); + edge_vec[i].w *= -1; + } + } else { + //If there is no cycle, we return the optimal matching. + break; + } + } + + //The optimal matching is build from the graph structure. + let mut matching = vec![Vec::<usize>::new(); nb_left]; + for e in edge_vec { + if e.u > e.v { + matching[e.v].push(e.u - nb_left); + } + } + matching } //This function finds a positive cycle in a bipartite wieghted graph. -fn positive_cycle( edge_vec : &Vec<WeightedEdge>, nb_left : usize, - nb_right : usize) -> Option<Vec<usize>> { - let nb_side_min = min(nb_left, nb_right); - let nb_vertices = nb_left+nb_right; - let weight_lowerbound = -((nb_left +nb_right) as i32) -1; - let mut accessed = vec![false ; nb_left]; - - //We try to find a positive cycle accessible from the left - //vertex i. - for i in 0..nb_left{ - if accessed[i] { - continue; - } - let mut weight =vec![weight_lowerbound ; nb_vertices]; - let mut prev =vec![ edge_vec.len() ; nb_vertices]; - weight[i] = 0; - //We compute largest weighted paths from i. - //Since the graph is bipartite, any simple cycle has length - //at most 2*nb_side_min. In the general Bellman-Ford - //algorithm, the bound here is the number of vertices. Since - //the number of partitions can be much larger than the - //number of nodes, we optimize that. - for _ in 0..(2*nb_side_min) { - for j in 0..edge_vec.len() { - let e = edge_vec[j]; - if weight[e.v] < weight[e.u]+e.w { - weight[e.v] = weight[e.u]+e.w; - prev[e.v] = j; - } - } - } - //We update the accessed table - for i in 0..nb_left { - if weight[i] > weight_lowerbound { - accessed[i] = true; - } - } - //We detect positive cycle - for e in edge_vec { - if weight[e.v] < weight[e.u]+e.w { - //it means e is on a path branching from a positive cycle - let mut was_seen = vec![false ; nb_vertices]; - let mut curr = e.u; - //We track back with prev until we reach the cycle. - while !was_seen[curr]{ - was_seen[curr] = true; - curr = edge_vec[prev[curr]].u; - } - //Now curr is on the cycle. We collect the edges ids. 
- let mut cycle = Vec::<usize>::new(); - cycle.push(prev[curr]); - let mut cycle_vert = edge_vec[prev[curr]].u; - while cycle_vert != curr { - cycle.push(prev[cycle_vert]); - cycle_vert = edge_vec[prev[cycle_vert]].u; - } - - return Some(cycle); - } - } - } - - None -} +fn positive_cycle( + edge_vec: &Vec<WeightedEdge>, + nb_left: usize, + nb_right: usize, +) -> Option<Vec<usize>> { + let nb_side_min = min(nb_left, nb_right); + let nb_vertices = nb_left + nb_right; + let weight_lowerbound = -((nb_left + nb_right) as i32) - 1; + let mut accessed = vec![false; nb_left]; + //We try to find a positive cycle accessible from the left + //vertex i. + for i in 0..nb_left { + if accessed[i] { + continue; + } + let mut weight = vec![weight_lowerbound; nb_vertices]; + let mut prev = vec![edge_vec.len(); nb_vertices]; + weight[i] = 0; + //We compute largest weighted paths from i. + //Since the graph is bipartite, any simple cycle has length + //at most 2*nb_side_min. In the general Bellman-Ford + //algorithm, the bound here is the number of vertices. Since + //the number of partitions can be much larger than the + //number of nodes, we optimize that. + for _ in 0..(2 * nb_side_min) { + for j in 0..edge_vec.len() { + let e = edge_vec[j]; + if weight[e.v] < weight[e.u] + e.w { + weight[e.v] = weight[e.u] + e.w; + prev[e.v] = j; + } + } + } + //We update the accessed table + for i in 0..nb_left { + if weight[i] > weight_lowerbound { + accessed[i] = true; + } + } + //We detect positive cycle + for e in edge_vec { + if weight[e.v] < weight[e.u] + e.w { + //it means e is on a path branching from a positive cycle + let mut was_seen = vec![false; nb_vertices]; + let mut curr = e.u; + //We track back with prev until we reach the cycle. + while !was_seen[curr] { + was_seen[curr] = true; + curr = edge_vec[prev[curr]].u; + } + //Now curr is on the cycle. We collect the edges ids. + let mut cycle = Vec::<usize>::new(); + cycle.push(prev[curr]); + let mut cycle_vert = edge_vec[prev[curr]].u; + while cycle_vert != curr { + cycle.push(prev[cycle_vert]); + cycle_vert = edge_vec[prev[cycle_vert]].u; + } -// This function takes two arrays of capacity and computes the -// maximal matching in the complete bipartite graph such that the + return Some(cycle); + } + } + } + + None +} + +// This function takes two arrays of capacity and computes the +// maximal matching in the complete bipartite graph such that the // left vertex i is matched to left_cap_vec[i] right vertices, and // the right vertex j is matched to right_cap_vec[j] left vertices. // To do so, we use Dinic's maximum flow algorithm. 
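The network built below uses a fixed vertex numbering that all the index arithmetic relies on: vertex 0 is the source, vertex 1 the sink, vertices 2 to 2+L-1 are the left side and 2+L to 2+L+R-1 the right side, with source-to-left edges of capacity left_cap_vec[i], right-to-sink edges of capacity right_cap_vec[j], and unit-capacity left-to-right edges. Hypothetical index helpers (not in the patch) mirroring that convention:

    // 0 = source, 1 = sink, then the left vertices, then the right ones.
    fn left_vertex(i: usize) -> usize {
        2 + i
    }
    fn right_vertex(j: usize, nb_left: usize) -> usize {
        2 + nb_left + j
    }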
-pub fn dinic_compute_matching( left_cap_vec : Vec<u32>, - right_cap_vec : Vec<u32>) -> Vec< Vec<usize> > -{ - let mut graph = Vec::<Vec::<EdgeFlow> >::new(); - let ed = EdgeFlow{c:0,flow:0,v:0, rev:0}; - - // 0 will be the source - graph.push(vec![ed ; left_cap_vec.len()]); - for i in 0..left_cap_vec.len() - { - graph[0][i].c = left_cap_vec[i] as i32; - graph[0][i].v = i+2; - graph[0][i].rev = 0; - } - - //1 will be the sink - graph.push(vec![ed ; right_cap_vec.len()]); - for i in 0..right_cap_vec.len() - { - graph[1][i].c = right_cap_vec[i] as i32; - graph[1][i].v = i+2+left_cap_vec.len(); - graph[1][i].rev = 0; - } - - //we add left vertices - for i in 0..left_cap_vec.len() { - graph.push(vec![ed ; 1+right_cap_vec.len()]); - graph[i+2][0].c = 0; //directed - graph[i+2][0].v = 0; - graph[i+2][0].rev = i; - - for j in 0..right_cap_vec.len() { - graph[i+2][j+1].c = 1; - graph[i+2][j+1].v = 2+left_cap_vec.len()+j; - graph[i+2][j+1].rev = i+1; - } - } - - //we add right vertices - for i in 0..right_cap_vec.len() { - let lft_ln = left_cap_vec.len(); - graph.push(vec![ed ; 1+lft_ln]); - graph[i+lft_ln+2][0].c = graph[1][i].c; - graph[i+lft_ln+2][0].v = 1; - graph[i+lft_ln+2][0].rev = i; - - for j in 0..left_cap_vec.len() { - graph[i+2+lft_ln][j+1].c = 0; //directed - graph[i+2+lft_ln][j+1].v = j+2; - graph[i+2+lft_ln][j+1].rev = i+1; - } - } - - //To ensure the dispersion of the triplets generated by the - //assignation, we shuffle the neighbours of the nodes. Hence, - //left vertices do not consider the right ones in the same order. - let mut rng = rand::thread_rng(); - for i in 0..graph.len() { - graph[i].shuffle(&mut rng); - //We need to update the ids of the reverse edges. - for j in 0..graph[i].len() { - let target_v = graph[i][j].v; - let target_rev = graph[i][j].rev; - graph[target_v][target_rev].rev = j; - } - } - - let nb_vertices = graph.len(); - - //We run Dinic's max flow algorithm - loop{ - //We build the level array from Dinic's algorithm. - let mut level = vec![-1; nb_vertices]; - - let mut fifo = VecDeque::new(); - fifo.push_back((0,0)); - while !fifo.is_empty() { - if let Some((id,lvl)) = fifo.pop_front(){ - if level[id] == -1 { - level[id] = lvl; - for e in graph[id].iter(){ - if e.c-e.flow > 0{ - fifo.push_back((e.v,lvl+1)); - } - } - } - } - } - if level[1] == -1 { - //There is no residual flow - break; - } - - //Now we run DFS respecting the level array - let mut next_nbd = vec![0; nb_vertices]; - let mut lifo = VecDeque::new(); - - let flow_upper_bound; - if let Some(x) = left_cap_vec.iter().max() { - flow_upper_bound=*x as i32; - } - else { - flow_upper_bound = 0; - assert!(false); - } - - lifo.push_back((0,flow_upper_bound)); - - loop - { - if let Some((id_tmp, f_tmp)) = lifo.back() { - let id = *id_tmp; - let f = *f_tmp; - if id == 1 { - //The DFS reached the sink, we can add a - //residual flow. 
- lifo.pop_back(); - while !lifo.is_empty() { - if let Some((id,_)) = lifo.pop_back(){ - let nbd=next_nbd[id]; - graph[id][nbd].flow += f; - let id_v = graph[id][nbd].v; - let nbd_v = graph[id][nbd].rev; - graph[id_v][nbd_v].flow -= f; - } - } - lifo.push_back((0,flow_upper_bound)); - continue; - } - //else we did not reach the sink - let nbd = next_nbd[id]; - if nbd >= graph[id].len() { - //There is nothing to explore from id anymore - lifo.pop_back(); - if let Some((parent, _)) = lifo.back(){ - next_nbd[*parent] +=1; - } - continue; - } - //else we can try to send flow from id to its nbd - let new_flow = min(f,graph[id][nbd].c - - graph[id][nbd].flow); - if level[graph[id][nbd].v] <= level[id] || - new_flow == 0 { - //We cannot send flow to nbd. - next_nbd[id] += 1; - continue; - } - //otherwise, we send flow to nbd. - lifo.push_back((graph[id][nbd].v, new_flow)); - } - else { - break; - } - } - } - - //We return the association - let assoc_table = (0..left_cap_vec.len()).map( - |id| graph[id+2].iter() - .filter(|e| e.flow > 0) - .map( |e| e.v-2-left_cap_vec.len()) - .collect()).collect(); - - //consistency check - - //it is a flow - for i in 3..graph.len(){ - assert!( graph[i].iter().map(|e| e.flow).sum::<i32>() == 0); - for e in graph[i].iter(){ - assert!(e.flow + graph[e.v][e.rev].flow == 0); - } - } - - //it solves the matching problem - for i in 0..left_cap_vec.len(){ - assert!(left_cap_vec[i] as i32 == - graph[i+2].iter().map(|e| max(0,e.flow)).sum::<i32>()); - } - for i in 0..right_cap_vec.len(){ - assert!(right_cap_vec[i] as i32 == - graph[i+2+left_cap_vec.len()].iter() - .map(|e| max(0,e.flow)).sum::<i32>()); - } - - - assoc_table -} +pub fn dinic_compute_matching(left_cap_vec: Vec<u32>, right_cap_vec: Vec<u32>) -> Vec<Vec<usize>> { + let mut graph = Vec::<Vec<EdgeFlow>>::new(); + let ed = EdgeFlow { + c: 0, + flow: 0, + v: 0, + rev: 0, + }; + + // 0 will be the source + graph.push(vec![ed; left_cap_vec.len()]); + for i in 0..left_cap_vec.len() { + graph[0][i].c = left_cap_vec[i] as i32; + graph[0][i].v = i + 2; + graph[0][i].rev = 0; + } + + //1 will be the sink + graph.push(vec![ed; right_cap_vec.len()]); + for i in 0..right_cap_vec.len() { + graph[1][i].c = right_cap_vec[i] as i32; + graph[1][i].v = i + 2 + left_cap_vec.len(); + graph[1][i].rev = 0; + } + + //we add left vertices + for i in 0..left_cap_vec.len() { + graph.push(vec![ed; 1 + right_cap_vec.len()]); + graph[i + 2][0].c = 0; //directed + graph[i + 2][0].v = 0; + graph[i + 2][0].rev = i; + + for j in 0..right_cap_vec.len() { + graph[i + 2][j + 1].c = 1; + graph[i + 2][j + 1].v = 2 + left_cap_vec.len() + j; + graph[i + 2][j + 1].rev = i + 1; + } + } + //we add right vertices + for i in 0..right_cap_vec.len() { + let lft_ln = left_cap_vec.len(); + graph.push(vec![ed; 1 + lft_ln]); + graph[i + lft_ln + 2][0].c = graph[1][i].c; + graph[i + lft_ln + 2][0].v = 1; + graph[i + lft_ln + 2][0].rev = i; + + for j in 0..left_cap_vec.len() { + graph[i + 2 + lft_ln][j + 1].c = 0; //directed + graph[i + 2 + lft_ln][j + 1].v = j + 2; + graph[i + 2 + lft_ln][j + 1].rev = i + 1; + } + } + + //To ensure the dispersion of the triplets generated by the + //assignation, we shuffle the neighbours of the nodes. Hence, + //left vertices do not consider the right ones in the same order. + let mut rng = rand::thread_rng(); + for i in 0..graph.len() { + graph[i].shuffle(&mut rng); + //We need to update the ids of the reverse edges. 
+ for j in 0..graph[i].len() { + let target_v = graph[i][j].v; + let target_rev = graph[i][j].rev; + graph[target_v][target_rev].rev = j; + } + } + + let nb_vertices = graph.len(); + + //We run Dinic's max flow algorithm + loop { + //We build the level array from Dinic's algorithm. + let mut level = vec![-1; nb_vertices]; + + let mut fifo = VecDeque::new(); + fifo.push_back((0, 0)); + while !fifo.is_empty() { + if let Some((id, lvl)) = fifo.pop_front() { + if level[id] == -1 { + level[id] = lvl; + for e in graph[id].iter() { + if e.c - e.flow > 0 { + fifo.push_back((e.v, lvl + 1)); + } + } + } + } + } + if level[1] == -1 { + //There is no residual flow + break; + } + + //Now we run DFS respecting the level array + let mut next_nbd = vec![0; nb_vertices]; + let mut lifo = VecDeque::new(); + + let flow_upper_bound; + if let Some(x) = left_cap_vec.iter().max() { + flow_upper_bound = *x as i32; + } else { + flow_upper_bound = 0; + assert!(false); + } + + lifo.push_back((0, flow_upper_bound)); + + loop { + if let Some((id_tmp, f_tmp)) = lifo.back() { + let id = *id_tmp; + let f = *f_tmp; + if id == 1 { + //The DFS reached the sink, we can add a + //residual flow. + lifo.pop_back(); + while !lifo.is_empty() { + if let Some((id, _)) = lifo.pop_back() { + let nbd = next_nbd[id]; + graph[id][nbd].flow += f; + let id_v = graph[id][nbd].v; + let nbd_v = graph[id][nbd].rev; + graph[id_v][nbd_v].flow -= f; + } + } + lifo.push_back((0, flow_upper_bound)); + continue; + } + //else we did not reach the sink + let nbd = next_nbd[id]; + if nbd >= graph[id].len() { + //There is nothing to explore from id anymore + lifo.pop_back(); + if let Some((parent, _)) = lifo.back() { + next_nbd[*parent] += 1; + } + continue; + } + //else we can try to send flow from id to its nbd + let new_flow = min(f, graph[id][nbd].c - graph[id][nbd].flow); + if level[graph[id][nbd].v] <= level[id] || new_flow == 0 { + //We cannot send flow to nbd. + next_nbd[id] += 1; + continue; + } + //otherwise, we send flow to nbd. + lifo.push_back((graph[id][nbd].v, new_flow)); + } else { + break; + } + } + } + + //We return the association + let assoc_table = (0..left_cap_vec.len()) + .map(|id| { + graph[id + 2] + .iter() + .filter(|e| e.flow > 0) + .map(|e| e.v - 2 - left_cap_vec.len()) + .collect() + }) + .collect(); + + //consistency check + + //it is a flow + for i in 3..graph.len() { + assert!(graph[i].iter().map(|e| e.flow).sum::<i32>() == 0); + for e in graph[i].iter() { + assert!(e.flow + graph[e.v][e.rev].flow == 0); + } + } + + //it solves the matching problem + for i in 0..left_cap_vec.len() { + assert!(left_cap_vec[i] as i32 == graph[i + 2].iter().map(|e| max(0, e.flow)).sum::<i32>()); + } + for i in 0..right_cap_vec.len() { + assert!( + right_cap_vec[i] as i32 + == graph[i + 2 + left_cap_vec.len()] + .iter() + .map(|e| max(0, e.flow)) + .sum::<i32>() + ); + } + + assoc_table +} #[cfg(test)] mod tests { - use super::*; - - #[test] - fn test_flow() { - let left_vec = vec![3;8]; - let right_vec = vec![0,4,8,4,8]; - //There are asserts in the function that computes the flow - let _ = dinic_compute_matching(left_vec, right_vec); - } - - //maybe add tests relative to the matching optilization ? -} + use super::*; + #[test] + fn test_flow() { + let left_vec = vec![3; 8]; + let right_vec = vec![0, 4, 8, 4, 8]; + //There are asserts in the function that computes the flow + let _ = dinic_compute_matching(left_vec, right_vec); + } + //maybe add tests relative to the matching optilization ? +} |
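The closing comment wonders about tests for the matching optimization; a minimal sketch of one, reusing the toy instance discussed above (a hypothetical test, not part of this patch):

    #[test]
    fn test_optimize_matching_returns_to_old() {
        // Both matchings have degree 1 at every vertex, so the optimizer
        // may return any degree-1 matching; the closest one to old_match
        // is old_match itself, reached by cancelling the single
        // positive-weight alternating cycle.
        let old_match = vec![vec![0], vec![1]];
        let new_match = vec![vec![1], vec![0]];
        let optimized = optimize_matching(&old_match, &new_match, 2);
        assert_eq!(optimized, vec![vec![0], vec![1]]);
    }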