Diffstat (limited to 'src/rpc/layout.rs')
-rw-r--r-- | src/rpc/layout.rs | 1097 |
1 file changed, 684 insertions(+), 413 deletions(-)
diff --git a/src/rpc/layout.rs b/src/rpc/layout.rs
index f517f36f..16d573c7 100644
--- a/src/rpc/layout.rs
+++ b/src/rpc/layout.rs
@@ -1,5 +1,9 @@
 use std::cmp::Ordering;
-use std::collections::{HashMap, HashSet};
+use std::collections::HashMap;
+use std::collections::HashSet;
+
+use hex::ToHex;
+use itertools::Itertools;
 
 use serde::{Deserialize, Serialize};
 
@@ -7,8 +11,15 @@
 use garage_util::crdt::{AutoCrdt, Crdt, LwwMap};
 use garage_util::data::*;
 use garage_util::error::*;
 
+use crate::graph_algo::*;
+
 use crate::ring::*;
 
+use std::convert::TryInto;
+
+// The Message type is used to collect information on the algorithm.
+type Message = Vec<String>;
+
 /// The layout of the cluster, i.e. the list of roles
 /// which are assigned to each cluster node
 #[derive(Clone, Debug, Serialize, Deserialize)]
@@ -16,12 +27,21 @@ pub struct ClusterLayout {
     pub version: u64,
 
     pub replication_factor: usize,
+
+    #[serde(default = "default_one")]
+    pub zone_redundancy: usize,
+
+    // This attribute is only used to retain the previously computed partition size,
+    // so that we can measure how much it changes with a layout update.
+    #[serde(default = "default_zero")]
+    pub partition_size: u32,
+
     pub roles: LwwMap<Uuid, NodeRoleV>,
 
     /// node_id_vec: a vector of node IDs with a role assigned
     /// in the system (this includes gateway nodes).
     /// The order here is different than the vec stored by `roles`, because:
-    /// 1. non-gateway nodes are first so that they have lower numbers
+    /// 1. non-gateway nodes come first, so that their indices fit in a u8
+    ///    (hence the number of non-gateway nodes is at most 256);
     /// 2. nodes that don't have a role are excluded (but they need to
     ///    stay in the CRDT as tombstones)
     pub node_id_vec: Vec<Uuid>,
@@ -35,6 +55,15 @@ pub struct ClusterLayout {
     pub staging_hash: Hash,
 }
 
+fn default_one() -> usize {
+    1
+}
+fn default_zero() -> u32 {
+    0
+}
+
+const NB_PARTITIONS: usize = 1usize << PARTITION_BITS;
+
 #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
 pub struct NodeRoleV(pub Option<NodeRole>);
 
@@ -63,16 +92,31 @@ impl NodeRole {
             None => "gateway".to_string(),
         }
     }
+
+    pub fn tags_string(&self) -> String {
+        self.tags.join(",")
+    }
 }
 
 impl ClusterLayout {
-    pub fn new(replication_factor: usize) -> Self {
+    pub fn new(replication_factor: usize, zone_redundancy: usize) -> Self {
         let empty_lwwmap = LwwMap::new();
         let empty_lwwmap_hash = blake2sum(&rmp_to_vec_all_named(&empty_lwwmap).unwrap()[..]);
 
         ClusterLayout {
             version: 0,
             replication_factor,
+            zone_redundancy,
+            partition_size: 0,
             roles: LwwMap::new(),
             node_id_vec: Vec::new(),
             ring_assignation_data: Vec::new(),
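Reviewer note (not part of the patch): the u8 constraint above is what makes the compact ring encoding work. Since non-gateway nodes come first in node_id_vec, the replication_factor nodes of partition p occupy a fixed-size slice of ring_assignation_data, as the consistency checks later in this diff rely on. A minimal sketch of how a reader would decode it, with nodes_of_partition being a hypothetical helper reusing the types above:

    // Sketch only: recover the node uuids storing partition p from the
    // compact encoding (one u8 index per copy, rf entries per partition).
    fn nodes_of_partition(layout: &ClusterLayout, p: usize) -> Vec<Uuid> {
        let rf = layout.replication_factor;
        layout.ring_assignation_data[p * rf..(p + 1) * rf]
            .iter()
            .map(|compact| layout.node_id_vec[*compact as usize])
            .collect()
    }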
@@ -174,6 +218,45 @@ To know the correct value of the new layout version, invoke `garage layout show`
         }
     }
 
+    /// Returns the uuids of the non-gateway nodes in self.node_id_vec.
+    pub fn useful_nodes(&self) -> Vec<Uuid> {
+        let mut result = Vec::<Uuid>::new();
+        for uuid in self.node_id_vec.iter() {
+            match self.node_role(uuid) {
+                Some(role) if role.capacity.is_some() => result.push(*uuid),
+                _ => (),
+            }
+        }
+        result
+    }
+
+    /// Given a node's uuid, returns the label of its zone.
+    pub fn get_node_zone(&self, uuid: &Uuid) -> Result<String, String> {
+        match self.node_role(uuid) {
+            Some(role) => Ok(role.zone.clone()),
+            _ => Err("The Uuid does not correspond to a node present in the cluster.".to_string()),
+        }
+    }
+
+    /// Given a node's uuid, returns its capacity, or fails if it does not have any.
+    pub fn get_node_capacity(&self, uuid: &Uuid) -> Result<u32, String> {
+        match self.node_role(uuid) {
+            Some(NodeRole { capacity: Some(cap), zone: _, tags: _ }) => Ok(*cap),
+            _ => Err("The Uuid does not correspond to a node present in the \
+                cluster, or this node does not have a positive capacity.".to_string()),
+        }
+    }
+
+    /// Returns the sum of the capacities of the non-gateway nodes in the cluster.
+    pub fn get_total_capacity(&self) -> Result<u32, String> {
+        let mut total_capacity = 0;
+        for uuid in self.useful_nodes().iter() {
+            total_capacity += self.get_node_capacity(uuid)?;
+        }
+        Ok(total_capacity)
+    }
+
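Reviewer note: the per-partition invariant that check() enforces below can be stated compactly: the rf nodes of a partition must be pairwise distinct and must cover at least zone_redundancy distinct zones. A standalone sketch of that test (not the patch's code; zone_of_node is an assumed plain mapping from node index to zone label):

    use itertools::Itertools;

    // Sketch: validity of one partition's node list under zone redundancy.
    fn partition_ok(nodes_of_p: &[u8], zone_of_node: &[String], zone_redundancy: usize) -> bool {
        let distinct_nodes = nodes_of_p.iter().unique().count() == nodes_of_p.len();
        let nb_zones = nodes_of_p.iter()
            .map(|n| &zone_of_node[*n as usize])
            .unique()
            .count();
        distinct_nodes && nb_zones >= zone_redundancy
    }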
     /// Check a cluster layout for internal consistency
     /// returns true if consistent, false if error
     pub fn check(&self) -> bool {
@@ -217,448 +300,636 @@ To know the correct value of the new layout version, invoke `garage layout show`
             }
         }
 
+        // Check that every partition is associated with distinct nodes.
+        let rf = self.replication_factor;
+        for p in 0..(1 << PARTITION_BITS) {
+            let nodes_of_p = self.ring_assignation_data[rf * p..rf * (p + 1)].to_vec();
+            if nodes_of_p.iter().unique().count() != rf {
+                return false;
+            }
+            // Check that every partition is spread over at least zone_redundancy zones.
+            let zones_of_p = nodes_of_p.iter()
+                .map(|n| self.get_node_zone(&self.node_id_vec[*n as usize])
+                    .expect("Zone not found."));
+            if zones_of_p.unique().count() < self.zone_redundancy {
+                return false;
+            }
+        }
+
+        // Check that the node capacities are consistent with the stored partitions.
+        let mut node_usage = vec![0; MAX_NODE_NUMBER];
+        for n in self.ring_assignation_data.iter() {
+            node_usage[*n as usize] += 1;
+        }
+        for n in 0..MAX_NODE_NUMBER {
+            if node_usage[n] > 0 {
+                let uuid = self.node_id_vec[n];
+                if node_usage[n] * self.partition_size
+                    > self.get_node_capacity(&uuid).expect("Critical Error")
+                {
+                    return false;
+                }
+            }
+        }
+
+        // Check that the stored partition size is the one computed by the
+        // assignation algorithm.
+        let cl2 = self.clone();
+        let (_, zone_to_id) = cl2.generate_zone_ids().expect("Critical Error");
+        let partition_size = cl2.compute_optimal_partition_size(&zone_to_id).expect("Critical Error");
+        if partition_size != self.partition_size {
+            return false;
+        }
+
+        true
     }
-
-    /// Calculate an assignation of partitions to nodes
-    pub fn calculate_partition_assignation(&mut self) -> bool {
-        let (configured_nodes, zones) = self.configured_nodes_and_zones();
-        let n_zones = zones.len();
+}
 
-        println!("Calculating updated partition assignation, this may take some time...");
-        println!();
+impl ClusterLayout {
+    /// This function calculates a new partition-to-node assignation.
+    /// The computed assignation respects the node replication factor
+    /// and the zone redundancy parameter. It maximizes the capacity of a
+    /// partition (assuming all partitions have the same size).
+    /// Among such optimal assignations, it minimizes the distance to
+    /// the former assignation (if any), to minimize the amount of
+    /// data to be moved.
+    pub fn calculate_partition_assignation(&mut self, replication: usize, redundancy: usize) -> Result<Message, String> {
+        // The nodes might have been updated, and some might have been deleted.
+        // So we first need to update the list of nodes and retrieve the
+        // assignation.
+
+        // We update the node ids, since the node list might have changed with the
+        // staged changes in the layout. We retrieve the old assignation reframed
+        // with the new ids.
+        let old_assignation_opt = self.update_node_id_vec()?;
+        self.replication_factor = replication;
+        self.zone_redundancy = redundancy;
+
+        let mut msg = Message::new();
+        msg.push(format!("Computation of a new cluster layout where partitions are \
+            replicated {} times on at least {} distinct zones.", replication, redundancy));
+
+        // We generate once and for all numerical ids for the zones, to use them as
+        // indices in the flow graphs.
+        let (id_to_zone, zone_to_id) = self.generate_zone_ids()?;
+
+        msg.push(format!("The cluster contains {} nodes spread over {} zones.",
+            self.useful_nodes().len(), id_to_zone.len()));
+
+        // We compute the optimal partition size.
+        // Capacities should be given in a unit such that the partition size is at
+        // least 100; then integer rounding plays only a marginal role in the
+        // percentages of optimality.
+        let partition_size = self.compute_optimal_partition_size(&zone_to_id)?;
+        if old_assignation_opt.is_some() {
+            msg.push(format!("Given the replication and redundancy constraints, the \
+                optimal size of a partition is {}. In the previous layout, it used to \
+                be {}.", partition_size, self.partition_size));
+        } else {
+            msg.push(format!("Given the replication and redundancy constraints, the \
+                optimal size of a partition is {}.", partition_size));
+        }
+        self.partition_size = partition_size;
+
+        // We compute a first flow/assignation that is heuristically close to the
+        // previous one.
+        let mut gflow = self.compute_candidate_assignment(&zone_to_id, &old_assignation_opt)?;
+        if let Some(assoc) = &old_assignation_opt {
+            // We minimize the distance to the previous assignation.
+            self.minimize_rebalance_load(&mut gflow, &zone_to_id, assoc)?;
+        }
+
+        msg.append(&mut self.output_stat(&gflow, &old_assignation_opt, &zone_to_id, &id_to_zone)?);
+        msg.push("".to_string());
+
+        // We update the layout structure.
+        self.update_ring_from_flow(id_to_zone.len(), &gflow)?;
+        Ok(msg)
+    }
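Reviewer note: from the caller's perspective, the function mutates the layout in place and returns the Message (a Vec<String>) to display. A hedged sketch of a hypothetical caller (the surrounding CLI plumbing is assumed, not shown in this patch):

    // Sketch: applying the algorithm and printing its report.
    fn apply_new_layout(layout: &mut ClusterLayout) -> Result<(), String> {
        let msg = layout.calculate_partition_assignation(3, 2)?; // rf = 3, zone redundancy = 2
        for line in msg.iter() {
            println!("{}", line);
        }
        Ok(())
    }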
+
+    /// The LwwMap of node roles might have changed. This function updates
+    /// node_id_vec and returns the old assignation given by the ring, with the
+    /// new indices of the nodes (nodes that are no longer in use are dropped),
+    /// or None if there was no previous assignation.
+    /// We work with the assumption that only this function and
+    /// calculate_partition_assignation modify ring_assignation_data and
+    /// node_id_vec.
+    fn update_node_id_vec(&mut self) -> Result<Option<Vec<Vec<usize>>>, String> {
+        // (1) We compute the new node list.
+        // Non-gateway nodes should be coded on 8 bits, hence they must come first
+        // in the list. We build the new node ids.
+        let mut new_non_gateway_nodes: Vec<Uuid> = self.roles.items().iter()
+            .filter(|(_, _, v)| matches!(&v.0, Some(r) if r.capacity.is_some()))
+            .map(|(k, _, _)| *k).collect();
+
+        if new_non_gateway_nodes.len() > MAX_NODE_NUMBER {
+            return Err(format!("There are more than {} non-gateway nodes in the new \
+                layout. This is not allowed.", MAX_NODE_NUMBER));
+        }
+
+        let mut new_gateway_nodes: Vec<Uuid> = self.roles.items().iter()
+            .filter(|(_, _, v)| matches!(v, NodeRoleV(Some(r)) if r.capacity.is_none()))
+            .map(|(k, _, _)| *k).collect();
+
+        let nb_useful_nodes = new_non_gateway_nodes.len();
+        let mut new_node_id_vec = Vec::<Uuid>::new();
+        new_node_id_vec.append(&mut new_non_gateway_nodes);
+        new_node_id_vec.append(&mut new_gateway_nodes);
+
+        // (2) We retrieve the old assignation.
+        // We rewrite it with the new indices, keeping only the partition-to-node
+        // assignations whose node is still in use.
+        let nb_partitions = 1usize << PARTITION_BITS;
+        let mut old_assignation = vec![Vec::<usize>::new(); nb_partitions];
+
+        if self.ring_assignation_data.is_empty() {
+            // This is a new assignation.
+            return Ok(None);
+        }
+        if self.ring_assignation_data.len() != nb_partitions * self.replication_factor {
+            return Err("The old assignation does not have a size corresponding to \
+                the old replication factor or the number of partitions.".to_string());
+        }
+
+        // We build a translation table between the uuids and the new ids.
+        let mut uuid_to_new_id = HashMap::<Uuid, usize>::new();
+
+        // We add the indices of only the new non-gateway nodes that can be used in
+        // the assignation ring.
+        for i in 0..nb_useful_nodes {
+            uuid_to_new_id.insert(new_node_id_vec[i], i);
+        }
+
+        let rf = self.replication_factor;
+        for p in 0..nb_partitions {
+            for old_id in &self.ring_assignation_data[p * rf..(p + 1) * rf] {
+                let uuid = self.node_id_vec[*old_id as usize];
+                if uuid_to_new_id.contains_key(&uuid) {
+                    old_assignation[p].push(uuid_to_new_id[&uuid]);
+                }
+            }
+        }
+
+        // We write the results.
+        self.node_id_vec = new_node_id_vec;
+        self.ring_assignation_data = Vec::<CompactNodeType>::new();
+
+        Ok(Some(old_assignation))
+    }
 
-    // Get old partition assignation
-    let old_partitions = self.parse_assignation_data();
-
-    // Start new partition assignation with nodes from old assignation where it is relevant
-    let mut partitions = old_partitions
-        .iter()
-        .map(|old_part| {
-            let mut new_part = PartitionAss::new();
-            for node in old_part.nodes.iter() {
-                if let Some(role) = node.1 {
-                    if role.capacity.is_some() {
-                        new_part.add(None, n_zones, node.0, role);
-                    }
-                }
-            }
-            new_part
-        })
-        .collect::<Vec<_>>();
+    /// This function generates numerical ids for the zones of the nodes appearing
+    /// in self.node_id_vec.
+    fn generate_zone_ids(&self) -> Result<(Vec<String>, HashMap<String, usize>), String> {
+        let mut id_to_zone = Vec::<String>::new();
+        let mut zone_to_id = HashMap::<String, usize>::new();
+
+        for uuid in self.node_id_vec.iter() {
+            if self.roles.get(uuid).is_none() {
+                return Err("The uuid was not found in the node roles (this should \
+                    not happen, it might be a critical error).".to_string());
+            }
+            match self.node_role(uuid) {
+                Some(r) => if !zone_to_id.contains_key(&r.zone) && r.capacity.is_some() {
+                    zone_to_id.insert(r.zone.clone(), id_to_zone.len());
+                    id_to_zone.push(r.zone.clone());
+                },
+                _ => (),
+            }
+        }
+        Ok((id_to_zone, zone_to_id))
+    }
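Reviewer note: compute_optimal_partition_size, which follows, is a binary search (dichotomy) over a monotone predicate: a partition size s is feasible iff the maximal flow of the graph built for s equals nb_partitions * replication_factor, and feasibility can only shrink as s grows. The generic pattern, as a self-contained sketch (feasible stands for the flow computation, which is assumed here):

    // Sketch: dichotomy over a monotone predicate.
    fn largest_feasible(mut lo: u32, mut hi: u32, feasible: impl Fn(u32) -> bool) -> u32 {
        // Invariant: feasible(lo) holds and feasible(hi) does not.
        while lo + 1 < hi {
            let mid = (lo + hi) / 2;
            if feasible(mid) {
                lo = mid;
            } else {
                hi = mid;
            }
        }
        lo
    }

The patch seeds lo = 1 (checked explicitly, with an error otherwise) and hi = the total cluster capacity.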
+
+    /// This function computes, by dichotomy, the largest realizable partition
+    /// size, given the layout.
+    fn compute_optimal_partition_size(&self, zone_to_id: &HashMap<String, usize>) -> Result<u32, String> {
+        let nb_partitions = 1usize << PARTITION_BITS;
+        let empty_set = HashSet::<(usize, usize)>::new();
+        let mut g = self.generate_flow_graph(1, zone_to_id, &empty_set)?;
+        g.compute_maximal_flow()?;
+        if g.get_flow_value()? < (nb_partitions * self.replication_factor).try_into().unwrap() {
+            return Err("The storage capacity of the cluster is too small. It is \
+                impossible to store partitions of size 1.".to_string());
+        }
+
+        let mut s_down = 1;
+        let mut s_up = self.get_total_capacity()?;
+        while s_down + 1 < s_up {
+            g = self.generate_flow_graph((s_down + s_up) / 2, zone_to_id, &empty_set)?;
+            g.compute_maximal_flow()?;
+            if g.get_flow_value()? < (nb_partitions * self.replication_factor).try_into().unwrap() {
+                s_up = (s_down + s_up) / 2;
+            } else {
+                s_down = (s_down + s_up) / 2;
+            }
+        }
+
+        Ok(s_down)
+    }
+
+    fn generate_graph_vertices(nb_zones: usize, nb_nodes: usize) -> Vec<Vertex> {
+        let mut vertices = vec![Vertex::Source, Vertex::Sink];
+        for p in 0..NB_PARTITIONS {
+            vertices.push(Vertex::Pup(p));
+            vertices.push(Vertex::Pdown(p));
+            for z in 0..nb_zones {
+                vertices.push(Vertex::PZ(p, z));
+            }
+        }
+        for n in 0..nb_nodes {
+            vertices.push(Vertex::N(n));
+        }
+        vertices
+    }
+
+    fn generate_flow_graph(&self, size: u32, zone_to_id: &HashMap<String, usize>, exclude_assoc: &HashSet<(usize, usize)>) -> Result<Graph<FlowEdge>, String> {
+        let vertices = ClusterLayout::generate_graph_vertices(zone_to_id.len(),
+            self.useful_nodes().len());
+        let mut g = Graph::<FlowEdge>::new(&vertices);
+        let nb_zones = zone_to_id.len();
+        for p in 0..NB_PARTITIONS {
+            g.add_edge(Vertex::Source, Vertex::Pup(p), self.zone_redundancy as u32)?;
+            g.add_edge(Vertex::Source, Vertex::Pdown(p), (self.replication_factor - self.zone_redundancy) as u32)?;
+            for z in 0..nb_zones {
+                g.add_edge(Vertex::Pup(p), Vertex::PZ(p, z), 1)?;
+                g.add_edge(Vertex::Pdown(p), Vertex::PZ(p, z),
+                    self.replication_factor as u32)?;
+            }
+        }
+        for n in 0..self.useful_nodes().len() {
+            let node_capacity = self.get_node_capacity(&self.node_id_vec[n])?;
+            let node_zone = zone_to_id[&self.get_node_zone(&self.node_id_vec[n])?];
+            g.add_edge(Vertex::N(n), Vertex::Sink, node_capacity / size)?;
+            for p in 0..NB_PARTITIONS {
+                if !exclude_assoc.contains(&(p, n)) {
+                    g.add_edge(Vertex::PZ(p, node_zone), Vertex::N(n), 1)?;
+                }
+            }
+        }
+        Ok(g)
+    }
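Reviewer note: an informal picture of the topology generate_flow_graph builds for one partition p (not from the patch). Pup(p) receives zone_redundancy units and may push at most 1 unit per zone, which is what forces the spread over distinct zones; Pdown(p) carries the remaining replication_factor - zone_redundancy units without that restriction:

    Source --(zone_redundancy)-------> Pup(p)   --(1 per zone)--+
                                                                +--> PZ(p,z) --(1 per node)--> N(n) --(capacity/size)--> Sink
    Source --(rf - zone_redundancy)--> Pdown(p) --(rf per zone)-+

Each unit of flow reaching Sink through N(n) stands for one copy of one partition stored on node n, and the capacity/size edge caps how many partitions a node can hold.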
+
+    fn compute_candidate_assignment(&self, zone_to_id: &HashMap<String, usize>,
+        old_assoc_opt: &Option<Vec<Vec<usize>>>) -> Result<Graph<FlowEdge>, String> {
+        // We list the edges that are not used in the old assignation.
+        let mut exclude_edge = HashSet::<(usize, usize)>::new();
+        if let Some(old_assoc) = old_assoc_opt {
+            let nb_nodes = self.useful_nodes().len();
+            for p in 0..NB_PARTITIONS {
+                for n in 0..nb_nodes {
+                    exclude_edge.insert((p, n));
+                }
+                for n in old_assoc[p].iter() {
+                    exclude_edge.remove(&(p, *n));
+                }
+            }
+        }
+
+        // We compute the best flow using only the edges used in the old
+        // assignation, then add back the excluded edges and recompute the
+        // maximal flow.
+        let mut g = self.generate_flow_graph(self.partition_size, zone_to_id, &exclude_edge)?;
+        g.compute_maximal_flow()?;
+        for (p, n) in exclude_edge.iter() {
+            let node_zone = zone_to_id[&self.get_node_zone(&self.node_id_vec[*n])?];
+            g.add_edge(Vertex::PZ(*p, node_zone), Vertex::N(*n), 1)?;
+        }
+        g.compute_maximal_flow()?;
+        Ok(g)
+    }
+
+    fn minimize_rebalance_load(&self, gflow: &mut Graph<FlowEdge>, zone_to_id: &HashMap<String, usize>, old_assoc: &Vec<Vec<usize>>) -> Result<(), String> {
+        let mut cost = CostFunction::new();
+        for p in 0..NB_PARTITIONS {
+            for n in old_assoc[p].iter() {
+                let node_zone = zone_to_id[&self.get_node_zone(&self.node_id_vec[*n])?];
+                cost.insert((Vertex::PZ(p, node_zone), Vertex::N(*n)), -1);
+            }
+        }
+        let nb_nodes = self.useful_nodes().len();
+        let path_length = 4 * nb_nodes;
+        gflow.optimize_flow_with_cost(&cost, path_length)?;
+
+        Ok(())
+    }
+
+    fn update_ring_from_flow(&mut self, nb_zones: usize, gflow: &Graph<FlowEdge>) -> Result<(), String> {
+        self.ring_assignation_data = Vec::<CompactNodeType>::new();
+        for p in 0..NB_PARTITIONS {
+            for z in 0..nb_zones {
+                let assoc_vertex = gflow.get_positive_flow_from(Vertex::PZ(p, z))?;
+                for vertex in assoc_vertex.iter() {
+                    if let Vertex::N(n) = vertex {
+                        self.ring_assignation_data.push((*n).try_into().unwrap());
+                    }
+                }
+            }
+        }
+
+        if self.ring_assignation_data.len() != NB_PARTITIONS * self.replication_factor {
+            return Err("Critical Error: the assignation ring we produced does not \
+                have the right size.".to_string());
+        }
+        Ok(())
+    }
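Reviewer note: the capacity figures reported by output_stat, next, follow directly from the stored partition size. A worked example with made-up numbers (not from the patch):

    // Sketch: 256 partitions (PARTITION_BITS = 8), replication_factor = 3,
    // partition_size = 1000, total cluster capacity 1_000_000, all in the
    // same arbitrary capacity unit.
    fn main() {
        let (partition_size, nb_partitions, rf) = (1000u32, 256u32, 3u32);
        let total_cap = 1_000_000u32;
        let used_cap = partition_size * nb_partitions * rf; // 768_000
        let percent = 100.0 * used_cap as f32 / total_cap as f32; // 76.8 %
        let usable = used_cap / rf; // 256_000 of actual data, after replication
        println!("{} / {} ({:.1} %), {} usable", used_cap, total_cap, percent, usable);
    }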
nodes.")); + + for z in 0..id_to_zone.len(){ + let mut nodes_of_z = Vec::<usize>::new(); + for n in 0..storing_nodes.len(){ + if self.get_node_zone(&self.node_id_vec[n])? == id_to_zone[z] { + nodes_of_z.push(n); + } + } + let replicated_partitions : usize = nodes_of_z.iter() + .map(|n| stored_partitions[*n]).sum(); + msg.push(format!("")); + + msg.push(format!("Zone {}: {} distinct partitions stored ({} new, \ + {} partition copies) ", id_to_zone[z], stored_partitions_zone[z], + new_partitions_zone[z], replicated_partitions)); + + let available_cap_z : u32 = self.partition_size*replicated_partitions as u32; + let mut total_cap_z = 0; + for n in nodes_of_z.iter() { + total_cap_z += self.get_node_capacity(&self.node_id_vec[*n])?; + } + let percent_cap_z = 100.0*(available_cap_z as f32)/(total_cap_z as f32); + msg.push(format!(" Available capacity / Total capacity: {}/{} ({:.1}%).", + available_cap_z, total_cap_z, percent_cap_z)); + + for n in nodes_of_z.iter() { + let available_cap_n = stored_partitions[*n] as u32 *self.partition_size; + let total_cap_n =self.get_node_capacity(&self.node_id_vec[*n])?; + let tags_n = (self.node_role(&self.node_id_vec[*n]) + .ok_or("Node not found."))?.tags_string(); + msg.push(format!(" Node {}: {} partitions ({} new) ; \ + available/total capacity: {} / {} ({:.1}%) ; tags:{}", + &self.node_id_vec[*n].to_vec()[0..2].to_vec().encode_hex::<String>(), + stored_partitions[*n], + new_partitions[*n], available_cap_n, total_cap_n, + (available_cap_n as f32)/(total_cap_n as f32)*100.0 , + tags_n)); + } + } + + return Ok(msg); + } + +} - // In various cases, not enough nodes will have been added for all partitions - // in the step above (e.g. due to node removals, or new zones being added). - // Here we add more nodes to make a complete (but sub-optimal) assignation, - // using an initial partition assignation that is calculated using the multi-dc maglev trick - match self.initial_partition_assignation() { - Some(initial_partitions) => { - for (part, ipart) in partitions.iter_mut().zip(initial_partitions.iter()) { - for (id, info) in ipart.nodes.iter() { - if part.nodes.len() < self.replication_factor { - part.add(None, n_zones, id, info.unwrap()); - } - } - assert!(part.nodes.len() == self.replication_factor); - } - } - None => { - // Not enough nodes in cluster to build a correct assignation. - // Signal it by returning an error. - return false; - } +//==================================================================================== + +#[cfg(test)] +mod tests { + use super::*; + use std::io::*; +// use itertools::Itertools; +/* + fn check_assignation(cl: &ClusterLayout) { + //Check that input data has the right format + let nb_partitions = 1usize << PARTITION_BITS; + assert!(cl.ring_assignation_data.len() == nb_partitions * cl.replication_factor); + + //Check that is is a correct assignation with zone redundancy + let rf = cl.replication_factor; + for i in 0..nb_partitions { + assert!( + rf == cl.ring_assignation_data[rf * i..rf * (i + 1)] + .iter() + .map(|nod| node_zone[*nod as usize].clone()) + .unique() + .count() + ); } - // Calculate how many partitions each node should ideally store, - // and how many partitions they are storing with the current assignation - // This defines our target for which we will optimize in the following loop. 
-        let total_capacity = configured_nodes
-            .iter()
-            .map(|(_, info)| info.capacity.unwrap_or(0))
-            .sum::<u32>() as usize;
-        let total_partitions = self.replication_factor * (1 << PARTITION_BITS);
-        let target_partitions_per_node = configured_nodes
-            .iter()
-            .map(|(id, info)| {
-                (
-                    *id,
-                    info.capacity.unwrap_or(0) as usize * total_partitions / total_capacity,
-                )
+        let nb_nodes = cl.node_id_vec.len();
+        // Check optimality.
+        let node_nb_part = (0..nb_nodes)
+            .map(|i| {
+                cl.ring_assignation_data
+                    .iter()
+                    .filter(|x| **x == i as u8)
+                    .count()
             })
-            .collect::<HashMap<&Uuid, usize>>();
-
-        let mut partitions_per_node = self.partitions_per_node(&partitions[..]);
-
-        println!("Target number of partitions per node:");
-        for (node, npart) in target_partitions_per_node.iter() {
-            println!("{:?}\t{}", node, npart);
-        }
-        println!();
-
-        // Shuffle partitions between nodes so that nodes will reach (or better approach)
-        // their target number of stored partitions
-        loop {
-            let mut option = None;
-            for (i, part) in partitions.iter_mut().enumerate() {
-                for (irm, (idrm, _)) in part.nodes.iter().enumerate() {
-                    let errratio = |node, parts| {
-                        let tgt = *target_partitions_per_node.get(node).unwrap() as f32;
-                        (parts - tgt) / tgt
-                    };
-                    let square = |x| x * x;
-
-                    let partsrm = partitions_per_node.get(*idrm).cloned().unwrap_or(0) as f32;
-
-                    for (idadd, infoadd) in configured_nodes.iter() {
-                        // skip replacing a node by itself
-                        // and skip replacing by gateway nodes
-                        if idadd == idrm || infoadd.capacity.is_none() {
-                            continue;
-                        }
-
-                        // We want to try replacing node idrm by node idadd
-                        // if that brings us close to our goal.
-                        let partsadd = partitions_per_node.get(*idadd).cloned().unwrap_or(0) as f32;
-                        let oldcost = square(errratio(*idrm, partsrm) - errratio(*idadd, partsadd));
-                        let newcost =
-                            square(errratio(*idrm, partsrm - 1.) - errratio(*idadd, partsadd + 1.));
-                        if newcost >= oldcost {
-                            // not closer to our goal
-                            continue;
-                        }
-                        let gain = oldcost - newcost;
-
-                        let mut newpart = part.clone();
-
-                        newpart.nodes.remove(irm);
-                        if !newpart.add(None, n_zones, idadd, infoadd) {
-                            continue;
-                        }
-                        assert!(newpart.nodes.len() == self.replication_factor);
-
-                        if !old_partitions[i]
-                            .is_valid_transition_to(&newpart, self.replication_factor)
-                        {
-                            continue;
-                        }
-
-                        if option
-                            .as_ref()
-                            .map(|(old_gain, _, _, _, _)| gain > *old_gain)
-                            .unwrap_or(true)
-                        {
-                            option = Some((gain, i, idadd, idrm, newpart));
-                        }
-                    }
-                }
-            }
-            if let Some((_gain, i, idadd, idrm, newpart)) = option {
-                *partitions_per_node.entry(idadd).or_insert(0) += 1;
-                *partitions_per_node.get_mut(idrm).unwrap() -= 1;
-                partitions[i] = newpart;
-            } else {
-                break;
-            }
-        }
-
-        // Check we completed the assignation correctly
-        // (this is a set of checks for the algorithm's consistency)
-        assert!(partitions.len() == (1 << PARTITION_BITS));
-        assert!(partitions
-            .iter()
-            .all(|p| p.nodes.len() == self.replication_factor));
-
-        let new_partitions_per_node = self.partitions_per_node(&partitions[..]);
-        assert!(new_partitions_per_node == partitions_per_node);
-
-        // Show statistics
-        println!("New number of partitions per node:");
-        for (node, npart) in partitions_per_node.iter() {
-            let tgt = *target_partitions_per_node.get(node).unwrap();
-            let pct = 100f32 * (*npart as f32) / (tgt as f32);
-            println!("{:?}\t{}\t({}% of {})", node, npart, pct as i32, tgt);
-        }
-        println!();
-
-        let mut diffcount = HashMap::new();
-        for (oldpart, newpart) in old_partitions.iter().zip(partitions.iter()) {
-            let nminus = oldpart.txtplus(newpart);
-            let nplus = newpart.txtplus(oldpart);
-            if nminus != "[...]" || nplus != "[...]" {
-                let tup = (nminus, nplus);
-                *diffcount.entry(tup).or_insert(0) += 1;
-            }
-        }
-        if diffcount.is_empty() {
-            println!("No data will be moved between nodes.");
-        } else {
-            let mut diffcount = diffcount.into_iter().collect::<Vec<_>>();
-            diffcount.sort();
-            println!("Number of partitions that move:");
-            for ((nminus, nplus), npart) in diffcount {
-                println!("\t{}\t{} -> {}", npart, nminus, nplus);
-            }
-        }
-        println!();
-
-        // Calculate and save new assignation data
-        let (nodes, assignation_data) =
-            self.compute_assignation_data(&configured_nodes[..], &partitions[..]);
-
-        self.node_id_vec = nodes;
-        self.ring_assignation_data = assignation_data;
-
-        true
-    }
-
-    fn initial_partition_assignation(&self) -> Option<Vec<PartitionAss<'_>>> {
-        let (configured_nodes, zones) = self.configured_nodes_and_zones();
-        let n_zones = zones.len();
-
-        // Create a vector of partition indices (0 to 2**PARTITION_BITS-1)
-        let partitions_idx = (0usize..(1usize << PARTITION_BITS)).collect::<Vec<_>>();
-
-        // Prepare ring
-        let mut partitions: Vec<PartitionAss> = partitions_idx
-            .iter()
-            .map(|_i| PartitionAss::new())
             .collect::<Vec<_>>();
-
-        // Create MagLev priority queues for each node
-        let mut queues = configured_nodes
+        let zone_vec = node_zone.iter().unique().collect::<Vec<_>>();
+        let zone_nb_part = zone_vec
             .iter()
-            .filter(|(_id, info)| info.capacity.is_some())
-            .map(|(node_id, node_info)| {
-                let mut parts = partitions_idx
+            .map(|z| {
+                cl.ring_assignation_data
                     .iter()
-                    .map(|i| {
-                        let part_data =
-                            [&u16::to_be_bytes(*i as u16)[..], node_id.as_slice()].concat();
-                        (*i, fasthash(&part_data[..]))
-                    })
-                    .collect::<Vec<_>>();
-                parts.sort_by_key(|(_i, h)| *h);
-                let parts_i = parts.iter().map(|(i, _h)| *i).collect::<Vec<_>>();
-                (node_id, node_info, parts_i, 0)
+                    .filter(|x| node_zone[**x as usize] == **z)
+                    .count()
+            })
+            .collect::<Vec<_>>();
-        let max_capacity = configured_nodes
-            .iter()
-            .filter_map(|(_, node_info)| node_info.capacity)
-            .fold(0, std::cmp::max);
-
-        // Fill up ring
-        for rep in 0..self.replication_factor {
-            queues.sort_by_key(|(ni, _np, _q, _p)| {
-                let queue_data = [&u16::to_be_bytes(rep as u16)[..], ni.as_slice()].concat();
-                fasthash(&queue_data[..])
-            });
-
-            for (_, _, _, pos) in queues.iter_mut() {
-                *pos = 0;
-            }
-
-            let mut remaining = partitions_idx.len();
-            while remaining > 0 {
-                let remaining0 = remaining;
-                for i_round in 0..max_capacity {
-                    for (node_id, node_info, q, pos) in queues.iter_mut() {
-                        if i_round >= node_info.capacity.unwrap() {
-                            continue;
-                        }
-                        for (pos2, &qv) in q.iter().enumerate().skip(*pos) {
-                            if partitions[qv].add(Some(rep + 1), n_zones, node_id, node_info) {
-                                remaining -= 1;
-                                *pos = pos2 + 1;
-                                break;
-                            }
-                        }
-                    }
-                }
-                if remaining == remaining0 {
-                    // No progress made, exit
-                    return None;
-                }
-            }
-        }
-
-        Some(partitions)
-    }
-
-    fn configured_nodes_and_zones(&self) -> (Vec<(&Uuid, &NodeRole)>, HashSet<&str>) {
-        let configured_nodes = self
-            .roles
-            .items()
-            .iter()
-            .filter(|(_id, _, info)| info.0.is_some())
-            .map(|(id, _, info)| (id, info.0.as_ref().unwrap()))
-            .collect::<Vec<(&Uuid, &NodeRole)>>();
-
-        let zones = configured_nodes
-            .iter()
-            .filter(|(_id, info)| info.capacity.is_some())
-            .map(|(_id, info)| info.zone.as_str())
-            .collect::<HashSet<&str>>();
-
-        (configured_nodes, zones)
-    }
-
-    fn compute_assignation_data<'a>(
-        &self,
-        configured_nodes: &[(&'a Uuid, &'a NodeRole)],
-        partitions: &[PartitionAss<'a>],
-    ) -> (Vec<Uuid>, Vec<CompactNodeType>) {
-        assert!(partitions.len() == (1 << PARTITION_BITS));
-
-        // Make a canonical order for nodes
-        let mut nodes = configured_nodes
-            .iter()
-            .filter(|(_id, info)| info.capacity.is_some())
-            .map(|(id, _)| **id)
-            .collect::<Vec<_>>();
-        let nodes_rev = nodes
-            .iter()
-            .enumerate()
-            .map(|(i, id)| (*id, i as CompactNodeType))
-            .collect::<HashMap<Uuid, CompactNodeType>>();
-
-        let mut assignation_data = vec![];
-        for partition in partitions.iter() {
-            assert!(partition.nodes.len() == self.replication_factor);
-            for (id, _) in partition.nodes.iter() {
-                assignation_data.push(*nodes_rev.get(id).unwrap());
-            }
-        }
+        // Check the optimality of the zone assignation: would it be better for the
+        // node_capacity/node_partitions ratio to change the assignation of a partition?
+
+        if let Some(idmin) = (0..nb_nodes).min_by(|i, j| {
+            (node_capacity[*i] * node_nb_part[*j] as u32)
+                .cmp(&(node_capacity[*j] * node_nb_part[*i] as u32))
+        }) {
+            if let Some(idnew) = (0..nb_nodes)
+                .filter(|i| {
+                    if let Some(p) = zone_vec.iter().position(|z| **z == node_zone[*i]) {
+                        zone_nb_part[p] < nb_partitions
+                    } else {
+                        false
+                    }
+                })
+                .max_by(|i, j| {
+                    (node_capacity[*i] * (node_nb_part[*j] as u32 + 1))
+                        .cmp(&(node_capacity[*j] * (node_nb_part[*i] as u32 + 1)))
+                }) {
+                assert!(
+                    node_capacity[idmin] * (node_nb_part[idnew] as u32 + 1)
+                        >= node_capacity[idnew] * node_nb_part[idmin] as u32
+                );
+            }
+        }
-        nodes.extend(
-            configured_nodes
-                .iter()
-                .filter(|(_id, info)| info.capacity.is_none())
-                .map(|(id, _)| **id),
-        );
-
-        (nodes, assignation_data)
-    }
-
-    fn parse_assignation_data(&self) -> Vec<PartitionAss<'_>> {
-        if self.ring_assignation_data.len() == self.replication_factor * (1 << PARTITION_BITS) {
-            // If the previous assignation data is correct, use that
-            let mut partitions = vec![];
-            for i in 0..(1 << PARTITION_BITS) {
-                let mut part = PartitionAss::new();
-                for node_i in self.ring_assignation_data
-                    [i * self.replication_factor..(i + 1) * self.replication_factor]
-                    .iter()
-                {
-                    let node_id = &self.node_id_vec[*node_i as usize];
-
-                    if let Some(NodeRoleV(Some(info))) = self.roles.get(node_id) {
-                        part.nodes.push((node_id, Some(info)));
-                    } else {
-                        part.nodes.push((node_id, None));
-                    }
-                }
-                partitions.push(part);
-            }
-            partitions
-        } else {
-            // Otherwise start fresh
-            (0..(1 << PARTITION_BITS))
-                .map(|_| PartitionAss::new())
-                .collect()
-        }
-    }
+        // In every zone, check the optimality of the node assignation.
+        for z in zone_vec {
+            let node_of_z_iter = (0..nb_nodes).filter(|id| node_zone[*id] == *z);
+            if let Some(idmin) = node_of_z_iter.clone().min_by(|i, j| {
+                (node_capacity[*i] * node_nb_part[*j] as u32)
+                    .cmp(&(node_capacity[*j] * node_nb_part[*i] as u32))
+            }) {
+                if let Some(idnew) = node_of_z_iter.min_by(|i, j| {
+                    (node_capacity[*i] * (node_nb_part[*j] as u32 + 1))
+                        .cmp(&(node_capacity[*j] * (node_nb_part[*i] as u32 + 1)))
+                }) {
+                    assert!(
+                        node_capacity[idmin] * (node_nb_part[idnew] as u32 + 1)
+                            >= node_capacity[idnew] * node_nb_part[idmin] as u32
+                    );
+                }
+            }
+        }
+    }
-
-    fn partitions_per_node<'a>(&self, partitions: &[PartitionAss<'a>]) -> HashMap<&'a Uuid, usize> {
-        let mut partitions_per_node = HashMap::<&Uuid, usize>::new();
-        for p in partitions.iter() {
-            for (id, _) in p.nodes.iter() {
-                *partitions_per_node.entry(*id).or_insert(0) += 1;
-            }
-        }
-        partitions_per_node
-    }
-}
+*/
+
+    fn show_msg(msg: &Message) {
+        for s in msg.iter() {
+            println!("{}", s);
+        }
+    }
+
+    fn update_layout(
+        cl: &mut ClusterLayout,
+        node_id_vec: &Vec<u8>,
+        node_capacity_vec: &Vec<u32>,
+        node_zone_vec: &Vec<String>,
+    ) {
+        for i in 0..node_id_vec.len() {
+            if let Some(x) = FixedBytes32::try_from(&[i as u8; 32]) {
+                cl.node_id_vec.push(x);
+            }
+            let update = cl.roles.update_mutator(
+                cl.node_id_vec[i],
+                NodeRoleV(Some(NodeRole {
+                    zone: (node_zone_vec[i].to_string()),
+                    capacity: (Some(node_capacity_vec[i])),
+                    tags: (vec![]),
+                })),
+            );
+            cl.roles.merge(&update);
+        }
+    }
 
-// ---- Internal structs for partition assignation in layout ----
-
-#[derive(Clone)]
-struct PartitionAss<'a> {
-    nodes: Vec<(&'a Uuid, Option<&'a NodeRole>)>,
-}
-
-impl<'a> PartitionAss<'a> {
-    fn new() -> Self {
-        Self { nodes: Vec::new() }
-    }
-
-    fn nplus(&self, other: &PartitionAss<'a>) -> usize {
-        self.nodes
-            .iter()
-            .filter(|x| !other.nodes.contains(x))
-            .count()
-    }
-
-    fn txtplus(&self, other: &PartitionAss<'a>) -> String {
-        let mut nodes = self
-            .nodes
-            .iter()
-            .filter(|x| !other.nodes.contains(x))
-            .map(|x| format!("{:?}", x.0))
-            .collect::<Vec<_>>();
-        nodes.sort();
-        if self.nodes.iter().any(|x| other.nodes.contains(x)) {
-            nodes.push("...".into());
-        }
-        format!("[{}]", nodes.join(" "))
-    }
-
-    fn is_valid_transition_to(&self, other: &PartitionAss<'a>, replication_factor: usize) -> bool {
-        let min_keep_nodes_per_part = (replication_factor + 1) / 2;
-        let n_removed = self.nplus(other);
-
-        if self.nodes.len() <= min_keep_nodes_per_part {
-            n_removed == 0
-        } else {
-            n_removed <= self.nodes.len() - min_keep_nodes_per_part
-        }
-    }
-
-    // add is a key function in creating a PartitionAss, i.e. the list of nodes
-    // to which a partition is assigned. It tries to add a certain node id to the
-    // assignation, but checks that doing so is compatible with the NECESSARY
-    // condition that the partition assignation must be dispersed over different
-    // zones (datacenters) if enough zones exist. This is why it takes a n_zones
-    // parameter, which is the total number of zones that have existing nodes:
-    // if nodes in the assignation already cover all n_zones zones, then any node
-    // that is not yet in the assignation can be added. Otherwise, only nodes
-    // that are in a new zone can be added.
-    fn add(
-        &mut self,
-        target_len: Option<usize>,
-        n_zones: usize,
-        node: &'a Uuid,
-        role: &'a NodeRole,
-    ) -> bool {
-        if let Some(tl) = target_len {
-            if self.nodes.len() != tl - 1 {
-                return false;
-            }
-        }
-
-        let p_zns = self
-            .nodes
-            .iter()
-            .map(|(_id, info)| info.unwrap().zone.as_str())
-            .collect::<HashSet<&str>>();
-        if (p_zns.len() < n_zones && !p_zns.contains(&role.zone.as_str()))
-            || (p_zns.len() == n_zones && !self.nodes.iter().any(|(id, _)| *id == node))
-        {
-            self.nodes.push((node, Some(role)));
-            true
-        } else {
-            false
-        }
-    }
+    #[test]
+    fn test_assignation() {
+        std::io::stdout().flush().ok().expect("Could not flush stdout");
+        let mut node_id_vec = vec![1, 2, 3];
+        let mut node_capacity_vec = vec![4000, 1000, 2000];
+        let mut node_zone_vec = vec!["A", "B", "C"]
+            .into_iter()
+            .map(|x| x.to_string())
+            .collect();
+
+        let mut cl = ClusterLayout {
+            node_id_vec: vec![],
+
+            roles: LwwMap::new(),
+
+            replication_factor: 3,
+            zone_redundancy: 1,
+            partition_size: 0,
+            ring_assignation_data: vec![],
+            version: 0,
+            staging: LwwMap::new(),
+            staging_hash: blake2sum(&rmp_to_vec_all_named(&LwwMap::<Uuid, NodeRoleV>::new()).unwrap()[..]),
+        };
+        update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec);
+        show_msg(&cl.calculate_partition_assignation(3, 3).unwrap());
+        assert!(cl.check());
+
+        node_id_vec = vec![1, 2, 3, 4, 5, 6, 7, 8, 9];
+        node_capacity_vec = vec![4000, 1000, 1000, 3000, 1000, 1000, 2000, 10000, 2000];
+        node_zone_vec = vec!["A", "B", "C", "C", "C", "B", "G", "H", "I"]
+            .into_iter()
+            .map(|x| x.to_string())
+            .collect();
+        update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec);
+        show_msg(&cl.calculate_partition_assignation(3, 3).unwrap());
+        assert!(cl.check());
+
+        node_capacity_vec = vec![4000, 1000, 2000, 7000, 1000, 1000, 2000, 10000, 2000];
+        update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec);
+        show_msg(&cl.calculate_partition_assignation(3, 3).unwrap());
+        assert!(cl.check());
+
+        node_capacity_vec = vec![4000000, 4000000, 2000000, 7000000, 1000000, 9000000, 2000000, 10000, 2000000];
+        update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec);
+        show_msg(&cl.calculate_partition_assignation(3, 1).unwrap());
+        assert!(cl.check());
+    }
 }
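Reviewer note: to see the Message that show_msg prints while exercising the new algorithm, the test harness's output capture has to be disabled. Assuming the crate is named garage_rpc as in upstream Garage (not stated in this diff), an invocation along these lines should work:

    cargo test -p garage_rpc test_assignation -- --nocapture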