diff options
Diffstat (limited to 'src/rpc')
-rw-r--r-- | src/rpc/Cargo.toml | 1 | ||||
-rw-r--r-- | src/rpc/graph_algo.rs | 420 | ||||
-rw-r--r-- | src/rpc/layout.rs | 1225 | ||||
-rw-r--r-- | src/rpc/lib.rs | 1 | ||||
-rw-r--r-- | src/rpc/ring.rs | 1 |
5 files changed, 1261 insertions, 387 deletions
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index 2c2ddc0b..5a427131 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -22,6 +22,7 @@ gethostname = "0.2" hex = "0.4" tracing = "0.1.30" rand = "0.8" +itertools="0.10" sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" } async-trait = "0.1.7" diff --git a/src/rpc/graph_algo.rs b/src/rpc/graph_algo.rs new file mode 100644 index 00000000..5bd6cc51 --- /dev/null +++ b/src/rpc/graph_algo.rs @@ -0,0 +1,420 @@ +//! This module deals with graph algorithms. +//! It is used in layout.rs to build the partition to node assignation. + +use rand::prelude::SliceRandom; +use std::cmp::{max, min}; +use std::collections::HashMap; +use std::collections::VecDeque; + +///Vertex data structures used in all the graphs used in layout.rs. +///usize parameters correspond to node/zone/partitions ids. +///To understand the vertex roles below, please refer to the formal description +///of the layout computation algorithm. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +pub enum Vertex { + Source, + Pup(usize), //The vertex p+ of partition p + Pdown(usize), //The vertex p- of partition p + PZ(usize, usize), //The vertex corresponding to x_(partition p, zone z) + N(usize), //The vertex corresponding to node n + Sink, +} + +///Edge data structure for the flow algorithm. +#[derive(Clone, Copy, Debug)] +pub struct FlowEdge { + cap: u32, //flow maximal capacity of the edge + flow: i32, //flow value on the edge + dest: usize, //destination vertex id + rev: usize, //index of the reversed edge (v, self) in the edge list of vertex v +} + +///Edge data structure for the detection of negative cycles. +#[derive(Clone, Copy, Debug)] +pub struct WeightedEdge { + w: i32, //weight of the edge + dest: usize, +} + +pub trait Edge: Clone + Copy {} +impl Edge for FlowEdge {} +impl Edge for WeightedEdge {} + +///Struct for the graph structure. We do encapsulation here to be able to both +///provide user friendly Vertex enum to address vertices, and to use internally usize +///indices and Vec instead of HashMap in the graph algorithm to optimize execution speed. +pub struct Graph<E: Edge> { + vertextoid: HashMap<Vertex, usize>, + idtovertex: Vec<Vertex>, + + //The graph is stored as an adjacency list + graph: Vec<Vec<E>>, +} + +pub type CostFunction = HashMap<(Vertex, Vertex), i32>; + +impl<E: Edge> Graph<E> { + pub fn new(vertices: &[Vertex]) -> Self { + let mut map = HashMap::<Vertex, usize>::new(); + for (i, vert) in vertices.iter().enumerate() { + map.insert(*vert, i); + } + Graph::<E> { + vertextoid: map, + idtovertex: vertices.to_vec(), + graph: vec![Vec::<E>::new(); vertices.len()], + } + } +} + +impl Graph<FlowEdge> { + ///This function adds a directed edge to the graph with capacity c, and the + ///corresponding reversed edge with capacity 0. + pub fn add_edge(&mut self, u: Vertex, v: Vertex, c: u32) -> Result<(), String> { + if !self.vertextoid.contains_key(&u) || !self.vertextoid.contains_key(&v) { + return Err("The graph does not contain the provided vertex.".to_string()); + } + let idu = self.vertextoid[&u]; + let idv = self.vertextoid[&v]; + let rev_u = self.graph[idu].len(); + let rev_v = self.graph[idv].len(); + self.graph[idu].push(FlowEdge { + cap: c, + dest: idv, + flow: 0, + rev: rev_v, + }); + self.graph[idv].push(FlowEdge { + cap: 0, + dest: idu, + flow: 0, + rev: rev_u, + }); + Ok(()) + } + + ///This function returns the list of vertices that receive a positive flow from + ///vertex v. + pub fn get_positive_flow_from(&self, v: Vertex) -> Result<Vec<Vertex>, String> { + if !self.vertextoid.contains_key(&v) { + return Err("The graph does not contain the provided vertex.".to_string()); + } + let idv = self.vertextoid[&v]; + let mut result = Vec::<Vertex>::new(); + for edge in self.graph[idv].iter() { + if edge.flow > 0 { + result.push(self.idtovertex[edge.dest]); + } + } + Ok(result) + } + + ///This function returns the value of the flow incoming to v. + pub fn get_inflow(&self, v: Vertex) -> Result<i32, String> { + if !self.vertextoid.contains_key(&v) { + return Err("The graph does not contain the provided vertex.".to_string()); + } + let idv = self.vertextoid[&v]; + let mut result = 0; + for edge in self.graph[idv].iter() { + result += max(0, self.graph[edge.dest][edge.rev].flow); + } + Ok(result) + } + + ///This function returns the value of the flow outgoing from v. + pub fn get_outflow(&self, v: Vertex) -> Result<i32, String> { + if !self.vertextoid.contains_key(&v) { + return Err("The graph does not contain the provided vertex.".to_string()); + } + let idv = self.vertextoid[&v]; + let mut result = 0; + for edge in self.graph[idv].iter() { + result += max(0, edge.flow); + } + Ok(result) + } + + ///This function computes the flow total value by computing the outgoing flow + ///from the source. + pub fn get_flow_value(&mut self) -> Result<i32, String> { + self.get_outflow(Vertex::Source) + } + + ///This function shuffles the order of the edge lists. It keeps the ids of the + ///reversed edges consistent. + fn shuffle_edges(&mut self) { + let mut rng = rand::thread_rng(); + for i in 0..self.graph.len() { + self.graph[i].shuffle(&mut rng); + //We need to update the ids of the reverse edges. + for j in 0..self.graph[i].len() { + let target_v = self.graph[i][j].dest; + let target_rev = self.graph[i][j].rev; + self.graph[target_v][target_rev].rev = j; + } + } + } + + ///Computes an upper bound of the flow on the graph + pub fn flow_upper_bound(&self) -> u32 { + let idsource = self.vertextoid[&Vertex::Source]; + let mut flow_upper_bound = 0; + for edge in self.graph[idsource].iter() { + flow_upper_bound += edge.cap; + } + flow_upper_bound + } + + ///This function computes the maximal flow using Dinic's algorithm. It starts with + ///the flow values already present in the graph. So it is possible to add some edge to + ///the graph, compute a flow, add other edges, update the flow. + pub fn compute_maximal_flow(&mut self) -> Result<(), String> { + if !self.vertextoid.contains_key(&Vertex::Source) { + return Err("The graph does not contain a source.".to_string()); + } + if !self.vertextoid.contains_key(&Vertex::Sink) { + return Err("The graph does not contain a sink.".to_string()); + } + + let idsource = self.vertextoid[&Vertex::Source]; + let idsink = self.vertextoid[&Vertex::Sink]; + + let nb_vertices = self.graph.len(); + + let flow_upper_bound = self.flow_upper_bound(); + + //To ensure the dispersion of the associations generated by the + //assignation, we shuffle the neighbours of the nodes. Hence, + //the vertices do not consider their neighbours in the same order. + self.shuffle_edges(); + + //We run Dinic's max flow algorithm + loop { + //We build the level array from Dinic's algorithm. + let mut level = vec![None; nb_vertices]; + + let mut fifo = VecDeque::new(); + fifo.push_back((idsource, 0)); + while !fifo.is_empty() { + if let Some((id, lvl)) = fifo.pop_front() { + if level[id] == None { + //it means id has not yet been reached + level[id] = Some(lvl); + for edge in self.graph[id].iter() { + if edge.cap as i32 - edge.flow > 0 { + fifo.push_back((edge.dest, lvl + 1)); + } + } + } + } + } + if level[idsink] == None { + //There is no residual flow + break; + } + //Now we run DFS respecting the level array + let mut next_nbd = vec![0; nb_vertices]; + let mut lifo = VecDeque::new(); + + lifo.push_back((idsource, flow_upper_bound)); + + while let Some((id_tmp, f_tmp)) = lifo.back() { + let id = *id_tmp; + let f = *f_tmp; + if id == idsink { + //The DFS reached the sink, we can add a + //residual flow. + lifo.pop_back(); + while let Some((id, _)) = lifo.pop_back() { + let nbd = next_nbd[id]; + self.graph[id][nbd].flow += f as i32; + let id_rev = self.graph[id][nbd].dest; + let nbd_rev = self.graph[id][nbd].rev; + self.graph[id_rev][nbd_rev].flow -= f as i32; + } + lifo.push_back((idsource, flow_upper_bound)); + continue; + } + //else we did not reach the sink + let nbd = next_nbd[id]; + if nbd >= self.graph[id].len() { + //There is nothing to explore from id anymore + lifo.pop_back(); + if let Some((parent, _)) = lifo.back() { + next_nbd[*parent] += 1; + } + continue; + } + //else we can try to send flow from id to its nbd + let new_flow = min( + f as i32, + self.graph[id][nbd].cap as i32 - self.graph[id][nbd].flow, + ) as u32; + if new_flow == 0 { + next_nbd[id] += 1; + continue; + } + if let (Some(lvldest), Some(lvlid)) = (level[self.graph[id][nbd].dest], level[id]) { + if lvldest <= lvlid { + //We cannot send flow to nbd. + next_nbd[id] += 1; + continue; + } + } + //otherwise, we send flow to nbd. + lifo.push_back((self.graph[id][nbd].dest, new_flow)); + } + } + Ok(()) + } + + ///This function takes a flow, and a cost function on the edges, and tries to find an + /// equivalent flow with a better cost, by finding improving overflow cycles. It uses + /// as subroutine the Bellman Ford algorithm run up to path_length. + /// We assume that the cost of edge (u,v) is the opposite of the cost of (v,u), and + /// only one needs to be present in the cost function. + pub fn optimize_flow_with_cost( + &mut self, + cost: &CostFunction, + path_length: usize, + ) -> Result<(), String> { + //We build the weighted graph g where we will look for negative cycle + let mut gf = self.build_cost_graph(cost)?; + let mut cycles = gf.list_negative_cycles(path_length); + while !cycles.is_empty() { + //we enumerate negative cycles + for c in cycles.iter() { + for i in 0..c.len() { + //We add one flow unit to the edge (u,v) of cycle c + let idu = self.vertextoid[&c[i]]; + let idv = self.vertextoid[&c[(i + 1) % c.len()]]; + for j in 0..self.graph[idu].len() { + //since idu appears at most once in the cycles, we enumerate every + //edge at most once. + let edge = self.graph[idu][j]; + if edge.dest == idv { + self.graph[idu][j].flow += 1; + self.graph[idv][edge.rev].flow -= 1; + break; + } + } + } + } + + gf = self.build_cost_graph(cost)?; + cycles = gf.list_negative_cycles(path_length); + } + Ok(()) + } + + ///Construct the weighted graph G_f from the flow and the cost function + fn build_cost_graph(&self, cost: &CostFunction) -> Result<Graph<WeightedEdge>, String> { + let mut g = Graph::<WeightedEdge>::new(&self.idtovertex); + let nb_vertices = self.idtovertex.len(); + for i in 0..nb_vertices { + for edge in self.graph[i].iter() { + if edge.cap as i32 - edge.flow > 0 { + //It is possible to send overflow through this edge + let u = self.idtovertex[i]; + let v = self.idtovertex[edge.dest]; + if cost.contains_key(&(u, v)) { + g.add_edge(u, v, cost[&(u, v)])?; + } else if cost.contains_key(&(v, u)) { + g.add_edge(u, v, -cost[&(v, u)])?; + } else { + g.add_edge(u, v, 0)?; + } + } + } + } + Ok(g) + } +} + +impl Graph<WeightedEdge> { + ///This function adds a single directed weighted edge to the graph. + pub fn add_edge(&mut self, u: Vertex, v: Vertex, w: i32) -> Result<(), String> { + if !self.vertextoid.contains_key(&u) || !self.vertextoid.contains_key(&v) { + return Err("The graph does not contain the provided vertex.".to_string()); + } + let idu = self.vertextoid[&u]; + let idv = self.vertextoid[&v]; + self.graph[idu].push(WeightedEdge { w, dest: idv }); + Ok(()) + } + + ///This function lists the negative cycles it manages to find after path_length + ///iterations of the main loop of the Bellman-Ford algorithm. For the classical + ///algorithm, path_length needs to be equal to the number of vertices. However, + ///for particular graph structures like in our case, the algorithm is still correct + ///when path_length is the length of the longest possible simple path. + ///See the formal description of the algorithm for more details. + fn list_negative_cycles(&self, path_length: usize) -> Vec<Vec<Vertex>> { + let nb_vertices = self.graph.len(); + + //We start with every vertex at distance 0 of some imaginary extra -1 vertex. + let mut distance = vec![0; nb_vertices]; + //The prev vector collects for every vertex from where does the shortest path come + let mut prev = vec![None; nb_vertices]; + + for _ in 0..path_length + 1 { + for id in 0..nb_vertices { + for e in self.graph[id].iter() { + if distance[id] + e.w < distance[e.dest] { + distance[e.dest] = distance[id] + e.w; + prev[e.dest] = Some(id); + } + } + } + } + + //If self.graph contains a negative cycle, then at this point the graph described + //by prev (which is a directed 1-forest/functional graph) + //must contain a cycle. We list the cycles of prev. + let cycles_prev = cycles_of_1_forest(&prev); + + //Remark that the cycle in prev is in the reverse order compared to the cycle + //in the graph. Thus the .rev(). + return cycles_prev + .iter() + .map(|cycle| cycle.iter().rev().map(|id| self.idtovertex[*id]).collect()) + .collect(); + } +} + +///This function returns the list of cycles of a directed 1 forest. It does not +///check for the consistency of the input. +fn cycles_of_1_forest(forest: &[Option<usize>]) -> Vec<Vec<usize>> { + let mut cycles = Vec::<Vec<usize>>::new(); + let mut time_of_discovery = vec![None; forest.len()]; + + for t in 0..forest.len() { + let mut id = t; + //while we are on a valid undiscovered node + while time_of_discovery[id] == None { + time_of_discovery[id] = Some(t); + if let Some(i) = forest[id] { + id = i; + } else { + break; + } + } + if forest[id] != None && time_of_discovery[id] == Some(t) { + //We discovered an id that we explored at this iteration t. + //It means we are on a cycle + let mut cy = vec![id; 1]; + let mut id2 = id; + while let Some(id_next) = forest[id2] { + id2 = id_next; + if id2 != id { + cy.push(id2); + } else { + break; + } + } + cycles.push(cy); + } + } + cycles +} diff --git a/src/rpc/layout.rs b/src/rpc/layout.rs index f517f36f..38e56b88 100644 --- a/src/rpc/layout.rs +++ b/src/rpc/layout.rs @@ -1,14 +1,27 @@ use std::cmp::Ordering; -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; +use std::collections::HashSet; + +use hex::ToHex; +use itertools::Itertools; use serde::{Deserialize, Serialize}; -use garage_util::crdt::{AutoCrdt, Crdt, LwwMap}; +use garage_util::crdt::{AutoCrdt, Crdt, Lww, LwwMap}; use garage_util::data::*; use garage_util::error::*; +use crate::graph_algo::*; + use crate::ring::*; +use std::convert::TryInto; + +const NB_PARTITIONS: usize = 1usize << PARTITION_BITS; + +//The Message type will be used to collect information on the algorithm. +type Message = Vec<String>; + /// The layout of the cluster, i.e. the list of roles /// which are assigned to each cluster node #[derive(Clone, Debug, Serialize, Deserialize)] @@ -16,12 +29,21 @@ pub struct ClusterLayout { pub version: u64, pub replication_factor: usize, + + ///This attribute is only used to retain the previously computed partition size, + ///to know to what extent does it change with the layout update. + pub partition_size: u32, + ///Parameters used to compute the assignation currently given by + ///ring_assignation_data + pub parameters: LayoutParameters, + pub roles: LwwMap<Uuid, NodeRoleV>, /// node_id_vec: a vector of node IDs with a role assigned /// in the system (this includes gateway nodes). /// The order here is different than the vec stored by `roles`, because: - /// 1. non-gateway nodes are first so that they have lower numbers + /// 1. non-gateway nodes are first so that they have lower numbers holding + /// in u8 (the number of non-gateway nodes is at most 256). /// 2. nodes that don't have a role are excluded (but they need to /// stay in the CRDT as tombstones) pub node_id_vec: Vec<Uuid>, @@ -30,11 +52,24 @@ pub struct ClusterLayout { #[serde(with = "serde_bytes")] pub ring_assignation_data: Vec<CompactNodeType>, + /// Parameters to be used in the next partition assignation computation. + pub staged_parameters: Lww<LayoutParameters>, /// Role changes which are staged for the next version of the layout pub staging: LwwMap<Uuid, NodeRoleV>, pub staging_hash: Hash, } +///This struct is used to set the parameters to be used in the assignation computation +///algorithm. It is stored as a Crdt. +#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] +pub struct LayoutParameters { + pub zone_redundancy: usize, +} + +impl AutoCrdt for LayoutParameters { + const WARN_IF_DIFFERENT: bool = true; +} + #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] pub struct NodeRoleV(pub Option<NodeRole>); @@ -45,12 +80,13 @@ impl AutoCrdt for NodeRoleV { /// The user-assigned roles of cluster nodes #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] pub struct NodeRole { - /// Datacenter at which this entry belong. This information might be used to perform a better - /// geodistribution + /// Datacenter at which this entry belong. This information is used to + /// perform a better geodistribution pub zone: String, - /// The (relative) capacity of the node + /// The capacity of the node /// If this is set to None, the node does not participate in storing data for the system /// and is only active as an API gateway to other nodes + // TODO : change the capacity to u64 and use byte unit input/output pub capacity: Option<u32>, /// A set of tags to recognize the node pub tags: Vec<String>, @@ -63,19 +99,43 @@ impl NodeRole { None => "gateway".to_string(), } } + + pub fn tags_string(&self) -> String { + let mut tags = String::new(); + if self.tags.is_empty() { + return tags; + } + tags.push_str(&self.tags[0].clone()); + for t in 1..self.tags.len() { + tags.push(','); + tags.push_str(&self.tags[t].clone()); + } + tags + } } +//Implementation of the ClusterLayout methods unrelated to the assignation algorithm. impl ClusterLayout { pub fn new(replication_factor: usize) -> Self { + //We set the default zone redundancy to be equal to the replication factor, + //i.e. as strict as possible. + let parameters = LayoutParameters { + zone_redundancy: replication_factor, + }; + let staged_parameters = Lww::<LayoutParameters>::new(parameters.clone()); + let empty_lwwmap = LwwMap::new(); let empty_lwwmap_hash = blake2sum(&rmp_to_vec_all_named(&empty_lwwmap).unwrap()[..]); ClusterLayout { version: 0, replication_factor, + partition_size: 0, roles: LwwMap::new(), node_id_vec: Vec::new(), ring_assignation_data: Vec::new(), + parameters, + staged_parameters, staging: empty_lwwmap, staging_hash: empty_lwwmap_hash, } @@ -88,20 +148,22 @@ impl ClusterLayout { true } Ordering::Equal => { + let param_changed = self.staged_parameters.get() != other.staged_parameters.get(); + self.staged_parameters.merge(&other.staged_parameters); self.staging.merge(&other.staging); let new_staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]); - let changed = new_staging_hash != self.staging_hash; + let stage_changed = new_staging_hash != self.staging_hash; self.staging_hash = new_staging_hash; - changed + stage_changed || param_changed } Ordering::Less => false, } } - pub fn apply_staged_changes(mut self, version: Option<u64>) -> Result<Self, Error> { + pub fn apply_staged_changes(mut self, version: Option<u64>) -> Result<(Self, Message), Error> { match version { None => { let error = r#" @@ -120,16 +182,14 @@ To know the correct value of the new layout version, invoke `garage layout show` self.roles.merge(&self.staging); self.roles.retain(|(_, _, v)| v.0.is_some()); - if !self.calculate_partition_assignation() { - return Err(Error::Message("Could not calculate new assignation of partitions to nodes. This can happen if there are less nodes than the desired number of copies of your data (see the replication_mode configuration parameter).".into())); - } + let msg = self.calculate_partition_assignation()?; self.staging.clear(); self.staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]); self.version += 1; - Ok(self) + Ok((self, msg)) } pub fn revert_staged_changes(mut self, version: Option<u64>) -> Result<Self, Error> { @@ -150,6 +210,7 @@ To know the correct value of the new layout version, invoke `garage layout show` self.staging.clear(); self.staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]); + self.staged_parameters.update(self.parameters.clone()); self.version += 1; @@ -174,7 +235,75 @@ To know the correct value of the new layout version, invoke `garage layout show` } } + ///Returns the uuids of the non_gateway nodes in self.node_id_vec. + pub fn nongateway_nodes(&self) -> Vec<Uuid> { + let mut result = Vec::<Uuid>::new(); + for uuid in self.node_id_vec.iter() { + match self.node_role(uuid) { + Some(role) if role.capacity != None => result.push(*uuid), + _ => (), + } + } + result + } + + ///Given a node uuids, this function returns the label of its zone + pub fn get_node_zone(&self, uuid: &Uuid) -> Result<String, Error> { + match self.node_role(uuid) { + Some(role) => Ok(role.zone.clone()), + _ => Err(Error::Message( + "The Uuid does not correspond to a node present in the cluster.".into(), + )), + } + } + + ///Given a node uuids, this function returns its capacity or fails if it does not have any + pub fn get_node_capacity(&self, uuid: &Uuid) -> Result<u32, Error> { + match self.node_role(uuid) { + Some(NodeRole { + capacity: Some(cap), + zone: _, + tags: _, + }) => Ok(*cap), + _ => Err(Error::Message( + "The Uuid does not correspond to a node present in the \ + cluster or this node does not have a positive capacity." + .into(), + )), + } + } + + ///Returns the number of partitions associated to this node in the ring + pub fn get_node_usage(&self, uuid: &Uuid) -> Result<usize, Error> { + for (i, id) in self.node_id_vec.iter().enumerate() { + if id == uuid { + let mut count = 0; + for nod in self.ring_assignation_data.iter() { + if i as u8 == *nod { + count += 1 + } + } + return Ok(count); + } + } + Err(Error::Message( + "The Uuid does not correspond to a node present in the \ + cluster or this node does not have a positive capacity." + .into(), + )) + } + + ///Returns the sum of capacities of non gateway nodes in the cluster + pub fn get_total_capacity(&self) -> Result<u32, Error> { + let mut total_capacity = 0; + for uuid in self.nongateway_nodes().iter() { + total_capacity += self.get_node_capacity(uuid)?; + } + Ok(total_capacity) + } + /// Check a cluster layout for internal consistency + /// (assignation, roles, parameters, partition size) /// returns true if consistent, false if error pub fn check(&self) -> bool { // Check that the hash of the staging data is correct @@ -217,448 +346,770 @@ To know the correct value of the new layout version, invoke `garage layout show` } } + //Check that every partition is associated to distinct nodes + let rf = self.replication_factor; + for p in 0..(1 << PARTITION_BITS) { + let nodes_of_p = self.ring_assignation_data[rf * p..rf * (p + 1)].to_vec(); + if nodes_of_p.iter().unique().count() != rf { + return false; + } + //Check that every partition is spread over at least zone_redundancy zones. + let zones_of_p = nodes_of_p.iter().map(|n| { + self.get_node_zone(&self.node_id_vec[*n as usize]) + .expect("Zone not found.") + }); + let redundancy = self.parameters.zone_redundancy; + if zones_of_p.unique().count() < redundancy { + return false; + } + } + + //Check that the nodes capacities is consistent with the stored partitions + let mut node_usage = vec![0; MAX_NODE_NUMBER]; + for n in self.ring_assignation_data.iter() { + node_usage[*n as usize] += 1; + } + for (n, usage) in node_usage.iter().enumerate() { + if *usage > 0 { + let uuid = self.node_id_vec[n]; + if usage * self.partition_size + > self.get_node_capacity(&uuid).expect("Critical Error") + { + return false; + } + } + } + + //Check that the partition size stored is the one computed by the asignation + //algorithm. + let cl2 = self.clone(); + let (_, zone_to_id) = cl2.generate_nongateway_zone_ids().expect("Critical Error"); + match cl2.compute_optimal_partition_size(&zone_to_id) { + Ok(s) if s != self.partition_size => return false, + Err(_) => return false, + _ => (), + } + true } +} - /// Calculate an assignation of partitions to nodes - pub fn calculate_partition_assignation(&mut self) -> bool { - let (configured_nodes, zones) = self.configured_nodes_and_zones(); - let n_zones = zones.len(); - - println!("Calculating updated partition assignation, this may take some time..."); - println!(); +//Implementation of the ClusterLayout methods related to the assignation algorithm. +impl ClusterLayout { + /// This function calculates a new partition-to-node assignation. + /// The computed assignation respects the node replication factor + /// and the zone redundancy parameter It maximizes the capacity of a + /// partition (assuming all partitions have the same size). + /// Among such optimal assignation, it minimizes the distance to + /// the former assignation (if any) to minimize the amount of + /// data to be moved. + // Staged role changes must be merged with nodes roles before calling this function, + // hence it must only be called from apply_staged_changes() and hence is not public. + fn calculate_partition_assignation(&mut self) -> Result<Message, Error> { + //We update the node ids, since the node role list might have changed with the + //changes in the layout. We retrieve the old_assignation reframed with new ids + let old_assignation_opt = self.update_node_id_vec()?; + + //We update the parameters + self.parameters = self.staged_parameters.get().clone(); + + let mut msg = Message::new(); + msg.push("==== COMPUTATION OF A NEW PARTITION ASSIGNATION ====".into()); + msg.push("".into()); + msg.push(format!( + "Partitions are \ + replicated {} times on at least {} distinct zones.", + self.replication_factor, self.parameters.zone_redundancy + )); + + //We generate for once numerical ids for the zones of non gateway nodes, + //to use them as indices in the flow graphs. + let (id_to_zone, zone_to_id) = self.generate_nongateway_zone_ids()?; + + let nb_nongateway_nodes = self.nongateway_nodes().len(); + if nb_nongateway_nodes < self.replication_factor { + return Err(Error::Message(format!( + "The number of nodes with positive \ + capacity ({}) is smaller than the replication factor ({}).", + nb_nongateway_nodes, self.replication_factor + ))); + } + if id_to_zone.len() < self.parameters.zone_redundancy { + return Err(Error::Message(format!( + "The number of zones with non-gateway \ + nodes ({}) is smaller than the redundancy parameter ({})", + id_to_zone.len(), + self.parameters.zone_redundancy + ))); + } - // Get old partition assignation - let old_partitions = self.parse_assignation_data(); + //We compute the optimal partition size + //Capacities should be given in a unit so that partition size is at least 100. + //In this case, integer rounding plays a marginal role in the percentages of + //optimality. + let partition_size = self.compute_optimal_partition_size(&zone_to_id)?; + + if old_assignation_opt != None { + msg.push(format!( + "Optimal size of a partition: {} (was {} in the previous layout).", + partition_size, self.partition_size + )); + } else { + msg.push(format!( + "Given the replication and redundancy constraints, the \ + optimal size of a partition is {}.", + partition_size + )); + } + //We write the partition size. + self.partition_size = partition_size; + + if partition_size < 100 { + msg.push( + "WARNING: The partition size is low (< 100), you might consider to \ + provide the nodes capacities in a smaller unit (e.g. Mb instead of Gb)." + .into(), + ); + } - // Start new partition assignation with nodes from old assignation where it is relevant - let mut partitions = old_partitions - .iter() - .map(|old_part| { - let mut new_part = PartitionAss::new(); - for node in old_part.nodes.iter() { - if let Some(role) = node.1 { - if role.capacity.is_some() { - new_part.add(None, n_zones, node.0, role); - } - } - } - new_part - }) - .collect::<Vec<_>>(); + //We compute a first flow/assignation that is heuristically close to the previous + //assignation + let mut gflow = self.compute_candidate_assignation(&zone_to_id, &old_assignation_opt)?; + if let Some(assoc) = &old_assignation_opt { + //We minimize the distance to the previous assignation. + self.minimize_rebalance_load(&mut gflow, &zone_to_id, assoc)?; + } - // In various cases, not enough nodes will have been added for all partitions - // in the step above (e.g. due to node removals, or new zones being added). - // Here we add more nodes to make a complete (but sub-optimal) assignation, - // using an initial partition assignation that is calculated using the multi-dc maglev trick - match self.initial_partition_assignation() { - Some(initial_partitions) => { - for (part, ipart) in partitions.iter_mut().zip(initial_partitions.iter()) { - for (id, info) in ipart.nodes.iter() { - if part.nodes.len() < self.replication_factor { - part.add(None, n_zones, id, info.unwrap()); - } - } - assert!(part.nodes.len() == self.replication_factor); - } - } - None => { - // Not enough nodes in cluster to build a correct assignation. - // Signal it by returning an error. - return false; - } + //We display statistics of the computation + msg.append(&mut self.output_stat( + &gflow, + &old_assignation_opt, + &zone_to_id, + &id_to_zone, + )?); + msg.push("".to_string()); + + //We update the layout structure + self.update_ring_from_flow(id_to_zone.len(), &gflow)?; + + if !self.check() { + return Err(Error::Message( + "Critical error: The computed layout happens to be incorrect".into(), + )); } - // Calculate how many partitions each node should ideally store, - // and how many partitions they are storing with the current assignation - // This defines our target for which we will optimize in the following loop. - let total_capacity = configured_nodes - .iter() - .map(|(_, info)| info.capacity.unwrap_or(0)) - .sum::<u32>() as usize; - let total_partitions = self.replication_factor * (1 << PARTITION_BITS); - let target_partitions_per_node = configured_nodes + Ok(msg) + } + + /// The LwwMap of node roles might have changed. This function updates the node_id_vec + /// and returns the assignation given by ring, with the new indices of the nodes, and + /// None if the node is not present anymore. + /// We work with the assumption that only this function and calculate_new_assignation + /// do modify assignation_ring and node_id_vec. + fn update_node_id_vec(&mut self) -> Result<Option<Vec<Vec<usize>>>, Error> { + // (1) We compute the new node list + //Non gateway nodes should be coded on 8bits, hence they must be first in the list + //We build the new node ids + let mut new_non_gateway_nodes: Vec<Uuid> = self + .roles + .items() .iter() - .map(|(id, info)| { - ( - *id, - info.capacity.unwrap_or(0) as usize * total_partitions / total_capacity, - ) - }) - .collect::<HashMap<&Uuid, usize>>(); - - let mut partitions_per_node = self.partitions_per_node(&partitions[..]); - - println!("Target number of partitions per node:"); - for (node, npart) in target_partitions_per_node.iter() { - println!("{:?}\t{}", node, npart); - } - println!(); - - // Shuffle partitions between nodes so that nodes will reach (or better approach) - // their target number of stored partitions - loop { - let mut option = None; - for (i, part) in partitions.iter_mut().enumerate() { - for (irm, (idrm, _)) in part.nodes.iter().enumerate() { - let errratio = |node, parts| { - let tgt = *target_partitions_per_node.get(node).unwrap() as f32; - (parts - tgt) / tgt - }; - let square = |x| x * x; - - let partsrm = partitions_per_node.get(*idrm).cloned().unwrap_or(0) as f32; - - for (idadd, infoadd) in configured_nodes.iter() { - // skip replacing a node by itself - // and skip replacing by gateway nodes - if idadd == idrm || infoadd.capacity.is_none() { - continue; - } + .filter(|(_, _, v)| matches!(&v.0, Some(r) if r.capacity != None)) + .map(|(k, _, _)| *k) + .collect(); + + if new_non_gateway_nodes.len() > MAX_NODE_NUMBER { + return Err(Error::Message(format!( + "There are more than {} non-gateway nodes in the new \ + layout. This is not allowed.", + MAX_NODE_NUMBER + ))); + } - // We want to try replacing node idrm by node idadd - // if that brings us close to our goal. - let partsadd = partitions_per_node.get(*idadd).cloned().unwrap_or(0) as f32; - let oldcost = square(errratio(*idrm, partsrm) - errratio(*idadd, partsadd)); - let newcost = - square(errratio(*idrm, partsrm - 1.) - errratio(*idadd, partsadd + 1.)); - if newcost >= oldcost { - // not closer to our goal - continue; - } - let gain = oldcost - newcost; + let mut new_gateway_nodes: Vec<Uuid> = self + .roles + .items() + .iter() + .filter(|(_, _, v)| matches!(v, NodeRoleV(Some(r)) if r.capacity == None)) + .map(|(k, _, _)| *k) + .collect(); - let mut newpart = part.clone(); + let mut new_node_id_vec = Vec::<Uuid>::new(); + new_node_id_vec.append(&mut new_non_gateway_nodes); + new_node_id_vec.append(&mut new_gateway_nodes); - newpart.nodes.remove(irm); - if !newpart.add(None, n_zones, idadd, infoadd) { - continue; - } - assert!(newpart.nodes.len() == self.replication_factor); + let old_node_id_vec = self.node_id_vec.clone(); + self.node_id_vec = new_node_id_vec.clone(); - if !old_partitions[i] - .is_valid_transition_to(&newpart, self.replication_factor) - { - continue; - } + // (2) We retrieve the old association + //We rewrite the old association with the new indices. We only consider partition + //to node assignations where the node is still in use. + let mut old_assignation = vec![Vec::<usize>::new(); NB_PARTITIONS]; - if option - .as_ref() - .map(|(old_gain, _, _, _, _)| gain > *old_gain) - .unwrap_or(true) - { - option = Some((gain, i, idadd, idrm, newpart)); - } - } - } - } - if let Some((_gain, i, idadd, idrm, newpart)) = option { - *partitions_per_node.entry(idadd).or_insert(0) += 1; - *partitions_per_node.get_mut(idrm).unwrap() -= 1; - partitions[i] = newpart; - } else { - break; - } + if self.ring_assignation_data.is_empty() { + //This is a new association + return Ok(None); + } + if self.ring_assignation_data.len() != NB_PARTITIONS * self.replication_factor { + return Err(Error::Message( + "The old assignation does not have a size corresponding to \ + the old replication factor or the number of partitions." + .into(), + )); } - // Check we completed the assignation correctly - // (this is a set of checks for the algorithm's consistency) - assert!(partitions.len() == (1 << PARTITION_BITS)); - assert!(partitions - .iter() - .all(|p| p.nodes.len() == self.replication_factor)); - - let new_partitions_per_node = self.partitions_per_node(&partitions[..]); - assert!(new_partitions_per_node == partitions_per_node); - - // Show statistics - println!("New number of partitions per node:"); - for (node, npart) in partitions_per_node.iter() { - let tgt = *target_partitions_per_node.get(node).unwrap(); - let pct = 100f32 * (*npart as f32) / (tgt as f32); - println!("{:?}\t{}\t({}% of {})", node, npart, pct as i32, tgt); - } - println!(); - - let mut diffcount = HashMap::new(); - for (oldpart, newpart) in old_partitions.iter().zip(partitions.iter()) { - let nminus = oldpart.txtplus(newpart); - let nplus = newpart.txtplus(oldpart); - if nminus != "[...]" || nplus != "[...]" { - let tup = (nminus, nplus); - *diffcount.entry(tup).or_insert(0) += 1; - } + //We build a translation table between the uuid and new ids + let mut uuid_to_new_id = HashMap::<Uuid, usize>::new(); + + //We add the indices of only the new non-gateway nodes that can be used in the + //association ring + for (i, uuid) in new_node_id_vec.iter().enumerate() { + uuid_to_new_id.insert(*uuid, i); } - if diffcount.is_empty() { - println!("No data will be moved between nodes."); - } else { - let mut diffcount = diffcount.into_iter().collect::<Vec<_>>(); - diffcount.sort(); - println!("Number of partitions that move:"); - for ((nminus, nplus), npart) in diffcount { - println!("\t{}\t{} -> {}", npart, nminus, nplus); + + let rf = self.replication_factor; + for (p, old_assign_p) in old_assignation.iter_mut().enumerate() { + for old_id in &self.ring_assignation_data[p * rf..(p + 1) * rf] { + let uuid = old_node_id_vec[*old_id as usize]; + if uuid_to_new_id.contains_key(&uuid) { + old_assign_p.push(uuid_to_new_id[&uuid]); + } } } - println!(); - - // Calculate and save new assignation data - let (nodes, assignation_data) = - self.compute_assignation_data(&configured_nodes[..], &partitions[..]); - self.node_id_vec = nodes; - self.ring_assignation_data = assignation_data; + //We write the ring + self.ring_assignation_data = Vec::<CompactNodeType>::new(); - true + Ok(Some(old_assignation)) } - fn initial_partition_assignation(&self) -> Option<Vec<PartitionAss<'_>>> { - let (configured_nodes, zones) = self.configured_nodes_and_zones(); - let n_zones = zones.len(); - - // Create a vector of partition indices (0 to 2**PARTITION_BITS-1) - let partitions_idx = (0usize..(1usize << PARTITION_BITS)).collect::<Vec<_>>(); + ///This function generates ids for the zone of the nodes appearing in + ///self.node_id_vec. + fn generate_nongateway_zone_ids(&self) -> Result<(Vec<String>, HashMap<String, usize>), Error> { + let mut id_to_zone = Vec::<String>::new(); + let mut zone_to_id = HashMap::<String, usize>::new(); + + for uuid in self.nongateway_nodes().iter() { + if self.roles.get(uuid) == None { + return Err(Error::Message( + "The uuid was not found in the node roles (this should \ + not happen, it might be a critical error)." + .into(), + )); + } + if let Some(r) = self.node_role(uuid) { + if !zone_to_id.contains_key(&r.zone) && r.capacity != None { + zone_to_id.insert(r.zone.clone(), id_to_zone.len()); + id_to_zone.push(r.zone.clone()); + } + } + } + Ok((id_to_zone, zone_to_id)) + } - // Prepare ring - let mut partitions: Vec<PartitionAss> = partitions_idx - .iter() - .map(|_i| PartitionAss::new()) - .collect::<Vec<_>>(); + ///This function computes by dichotomy the largest realizable partition size, given + ///the layout roles and parameters. + fn compute_optimal_partition_size( + &self, + zone_to_id: &HashMap<String, usize>, + ) -> Result<u32, Error> { + let empty_set = HashSet::<(usize, usize)>::new(); + let mut g = self.generate_flow_graph(1, zone_to_id, &empty_set)?; + g.compute_maximal_flow()?; + if g.get_flow_value()? + < (NB_PARTITIONS * self.replication_factor) + .try_into() + .unwrap() + { + return Err(Error::Message( + "The storage capacity of he cluster is to small. It is \ + impossible to store partitions of size 1." + .into(), + )); + } - // Create MagLev priority queues for each node - let mut queues = configured_nodes - .iter() - .filter(|(_id, info)| info.capacity.is_some()) - .map(|(node_id, node_info)| { - let mut parts = partitions_idx - .iter() - .map(|i| { - let part_data = - [&u16::to_be_bytes(*i as u16)[..], node_id.as_slice()].concat(); - (*i, fasthash(&part_data[..])) - }) - .collect::<Vec<_>>(); - parts.sort_by_key(|(_i, h)| *h); - let parts_i = parts.iter().map(|(i, _h)| *i).collect::<Vec<_>>(); - (node_id, node_info, parts_i, 0) - }) - .collect::<Vec<_>>(); + let mut s_down = 1; + let mut s_up = self.get_total_capacity()?; + while s_down + 1 < s_up { + g = self.generate_flow_graph((s_down + s_up) / 2, zone_to_id, &empty_set)?; + g.compute_maximal_flow()?; + if g.get_flow_value()? + < (NB_PARTITIONS * self.replication_factor) + .try_into() + .unwrap() + { + s_up = (s_down + s_up) / 2; + } else { + s_down = (s_down + s_up) / 2; + } + } - let max_capacity = configured_nodes - .iter() - .filter_map(|(_, node_info)| node_info.capacity) - .fold(0, std::cmp::max); - - // Fill up ring - for rep in 0..self.replication_factor { - queues.sort_by_key(|(ni, _np, _q, _p)| { - let queue_data = [&u16::to_be_bytes(rep as u16)[..], ni.as_slice()].concat(); - fasthash(&queue_data[..]) - }); + Ok(s_down) + } - for (_, _, _, pos) in queues.iter_mut() { - *pos = 0; + fn generate_graph_vertices(nb_zones: usize, nb_nodes: usize) -> Vec<Vertex> { + let mut vertices = vec![Vertex::Source, Vertex::Sink]; + for p in 0..NB_PARTITIONS { + vertices.push(Vertex::Pup(p)); + vertices.push(Vertex::Pdown(p)); + for z in 0..nb_zones { + vertices.push(Vertex::PZ(p, z)); } + } + for n in 0..nb_nodes { + vertices.push(Vertex::N(n)); + } + vertices + } - let mut remaining = partitions_idx.len(); - while remaining > 0 { - let remaining0 = remaining; - for i_round in 0..max_capacity { - for (node_id, node_info, q, pos) in queues.iter_mut() { - if i_round >= node_info.capacity.unwrap() { - continue; - } - for (pos2, &qv) in q.iter().enumerate().skip(*pos) { - if partitions[qv].add(Some(rep + 1), n_zones, node_id, node_info) { - remaining -= 1; - *pos = pos2 + 1; - break; - } - } - } - } - if remaining == remaining0 { - // No progress made, exit - return None; + ///Generates the graph to compute the maximal flow corresponding to the optimal + ///partition assignation. + ///exclude_assoc is the set of (partition, node) association that we are forbidden + ///to use (hence we do not add the corresponding edge to the graph). This parameter + ///is used to compute a first flow that uses only edges appearing in the previous + ///assignation. This produces a solution that heuristically should be close to the + ///previous one. + fn generate_flow_graph( + &self, + partition_size: u32, + zone_to_id: &HashMap<String, usize>, + exclude_assoc: &HashSet<(usize, usize)>, + ) -> Result<Graph<FlowEdge>, Error> { + let vertices = + ClusterLayout::generate_graph_vertices(zone_to_id.len(), self.nongateway_nodes().len()); + let mut g = Graph::<FlowEdge>::new(&vertices); + let nb_zones = zone_to_id.len(); + let redundancy = self.parameters.zone_redundancy; + for p in 0..NB_PARTITIONS { + g.add_edge(Vertex::Source, Vertex::Pup(p), redundancy as u32)?; + g.add_edge( + Vertex::Source, + Vertex::Pdown(p), + (self.replication_factor - redundancy) as u32, + )?; + for z in 0..nb_zones { + g.add_edge(Vertex::Pup(p), Vertex::PZ(p, z), 1)?; + g.add_edge( + Vertex::Pdown(p), + Vertex::PZ(p, z), + self.replication_factor as u32, + )?; + } + } + for n in 0..self.nongateway_nodes().len() { + let node_capacity = self.get_node_capacity(&self.node_id_vec[n])?; + let node_zone = zone_to_id[&self.get_node_zone(&self.node_id_vec[n])?]; + g.add_edge(Vertex::N(n), Vertex::Sink, node_capacity / partition_size)?; + for p in 0..NB_PARTITIONS { + if !exclude_assoc.contains(&(p, n)) { + g.add_edge(Vertex::PZ(p, node_zone), Vertex::N(n), 1)?; } } } - - Some(partitions) + Ok(g) } - fn configured_nodes_and_zones(&self) -> (Vec<(&Uuid, &NodeRole)>, HashSet<&str>) { - let configured_nodes = self - .roles - .items() - .iter() - .filter(|(_id, _, info)| info.0.is_some()) - .map(|(id, _, info)| (id, info.0.as_ref().unwrap())) - .collect::<Vec<(&Uuid, &NodeRole)>>(); + ///This function computes a first optimal assignation (in the form of a flow graph). + fn compute_candidate_assignation( + &self, + zone_to_id: &HashMap<String, usize>, + prev_assign_opt: &Option<Vec<Vec<usize>>>, + ) -> Result<Graph<FlowEdge>, Error> { + //We list the (partition,node) associations that are not used in the + //previous assignation + let mut exclude_edge = HashSet::<(usize, usize)>::new(); + if let Some(prev_assign) = prev_assign_opt { + let nb_nodes = self.nongateway_nodes().len(); + for (p, prev_assign_p) in prev_assign.iter().enumerate() { + for n in 0..nb_nodes { + exclude_edge.insert((p, n)); + } + for n in prev_assign_p.iter() { + exclude_edge.remove(&(p, *n)); + } + } + } - let zones = configured_nodes - .iter() - .filter(|(_id, info)| info.capacity.is_some()) - .map(|(_id, info)| info.zone.as_str()) - .collect::<HashSet<&str>>(); + //We compute the best flow using only the edges used in the previous assignation + let mut g = self.generate_flow_graph(self.partition_size, zone_to_id, &exclude_edge)?; + g.compute_maximal_flow()?; - (configured_nodes, zones) + //We add the excluded edges and compute the maximal flow with the full graph. + //The algorithm is such that it will start with the flow that we just computed + //and find ameliorating paths from that. + for (p, n) in exclude_edge.iter() { + let node_zone = zone_to_id[&self.get_node_zone(&self.node_id_vec[*n])?]; + g.add_edge(Vertex::PZ(*p, node_zone), Vertex::N(*n), 1)?; + } + g.compute_maximal_flow()?; + Ok(g) } - fn compute_assignation_data<'a>( + ///This function updates the flow graph gflow to minimize the distance between + ///its corresponding assignation and the previous one + fn minimize_rebalance_load( &self, - configured_nodes: &[(&'a Uuid, &'a NodeRole)], - partitions: &[PartitionAss<'a>], - ) -> (Vec<Uuid>, Vec<CompactNodeType>) { - assert!(partitions.len() == (1 << PARTITION_BITS)); - - // Make a canonical order for nodes - let mut nodes = configured_nodes - .iter() - .filter(|(_id, info)| info.capacity.is_some()) - .map(|(id, _)| **id) - .collect::<Vec<_>>(); - let nodes_rev = nodes - .iter() - .enumerate() - .map(|(i, id)| (*id, i as CompactNodeType)) - .collect::<HashMap<Uuid, CompactNodeType>>(); - - let mut assignation_data = vec![]; - for partition in partitions.iter() { - assert!(partition.nodes.len() == self.replication_factor); - for (id, _) in partition.nodes.iter() { - assignation_data.push(*nodes_rev.get(id).unwrap()); + gflow: &mut Graph<FlowEdge>, + zone_to_id: &HashMap<String, usize>, + prev_assign: &[Vec<usize>], + ) -> Result<(), Error> { + //We define a cost function on the edges (pairs of vertices) corresponding + //to the distance between the two assignations. + let mut cost = CostFunction::new(); + for (p, assoc_p) in prev_assign.iter().enumerate() { + for n in assoc_p.iter() { + let node_zone = zone_to_id[&self.get_node_zone(&self.node_id_vec[*n])?]; + cost.insert((Vertex::PZ(p, node_zone), Vertex::N(*n)), -1); } } - nodes.extend( - configured_nodes - .iter() - .filter(|(_id, info)| info.capacity.is_none()) - .map(|(id, _)| **id), - ); + //We compute the maximal length of a simple path in gflow. It is used in the + //Bellman-Ford algorithm in optimize_flow_with_cost to set the number + //of iterations. + let nb_nodes = self.nongateway_nodes().len(); + let path_length = 4 * nb_nodes; + gflow.optimize_flow_with_cost(&cost, path_length)?; - (nodes, assignation_data) + Ok(()) } - fn parse_assignation_data(&self) -> Vec<PartitionAss<'_>> { - if self.ring_assignation_data.len() == self.replication_factor * (1 << PARTITION_BITS) { - // If the previous assignation data is correct, use that - let mut partitions = vec![]; - for i in 0..(1 << PARTITION_BITS) { - let mut part = PartitionAss::new(); - for node_i in self.ring_assignation_data - [i * self.replication_factor..(i + 1) * self.replication_factor] - .iter() - { - let node_id = &self.node_id_vec[*node_i as usize]; - - if let Some(NodeRoleV(Some(info))) = self.roles.get(node_id) { - part.nodes.push((node_id, Some(info))); - } else { - part.nodes.push((node_id, None)); + ///This function updates the assignation ring from the flow graph. + fn update_ring_from_flow( + &mut self, + nb_zones: usize, + gflow: &Graph<FlowEdge>, + ) -> Result<(), Error> { + self.ring_assignation_data = Vec::<CompactNodeType>::new(); + for p in 0..NB_PARTITIONS { + for z in 0..nb_zones { + let assoc_vertex = gflow.get_positive_flow_from(Vertex::PZ(p, z))?; + for vertex in assoc_vertex.iter() { + if let Vertex::N(n) = vertex { + self.ring_assignation_data.push((*n).try_into().unwrap()); } } - partitions.push(part); } - partitions - } else { - // Otherwise start fresh - (0..(1 << PARTITION_BITS)) - .map(|_| PartitionAss::new()) - .collect() } + + if self.ring_assignation_data.len() != NB_PARTITIONS * self.replication_factor { + return Err(Error::Message( + "Critical Error : the association ring we produced does not \ + have the right size." + .into(), + )); + } + Ok(()) } - fn partitions_per_node<'a>(&self, partitions: &[PartitionAss<'a>]) -> HashMap<&'a Uuid, usize> { - let mut partitions_per_node = HashMap::<&Uuid, usize>::new(); - for p in partitions.iter() { - for (id, _) in p.nodes.iter() { - *partitions_per_node.entry(*id).or_insert(0) += 1; + ///This function returns a message summing up the partition repartition of the new + ///layout, and other statistics of the partition assignation computation. + fn output_stat( + &self, + gflow: &Graph<FlowEdge>, + prev_assign_opt: &Option<Vec<Vec<usize>>>, + zone_to_id: &HashMap<String, usize>, + id_to_zone: &[String], + ) -> Result<Message, Error> { + let mut msg = Message::new(); + + let used_cap = self.partition_size * NB_PARTITIONS as u32 * self.replication_factor as u32; + let total_cap = self.get_total_capacity()?; + let percent_cap = 100.0 * (used_cap as f32) / (total_cap as f32); + msg.push("".into()); + msg.push(format!( + "Usable capacity / Total cluster capacity: {} / {} ({:.1} %)", + used_cap, total_cap, percent_cap + )); + msg.push("".into()); + msg.push( + "If the percentage is to low, it might be that the \ + replication/redundancy constraints force the use of nodes/zones with small \ + storage capacities. \ + You might want to rebalance the storage capacities or relax the constraints. \ + See the detailed statistics below and look for saturated nodes/zones." + .into(), + ); + msg.push(format!( + "Recall that because of the replication factor, the actual available \ + storage capacity is {} / {} = {}.", + used_cap, + self.replication_factor, + used_cap / self.replication_factor as u32 + )); + + //We define and fill in the following tables + let storing_nodes = self.nongateway_nodes(); + let mut new_partitions = vec![0; storing_nodes.len()]; + let mut stored_partitions = vec![0; storing_nodes.len()]; + + let mut new_partitions_zone = vec![0; id_to_zone.len()]; + let mut stored_partitions_zone = vec![0; id_to_zone.len()]; + + for p in 0..NB_PARTITIONS { + for z in 0..id_to_zone.len() { + let pz_nodes = gflow.get_positive_flow_from(Vertex::PZ(p, z))?; + if !pz_nodes.is_empty() { + stored_partitions_zone[z] += 1; + if let Some(prev_assign) = prev_assign_opt { + let mut old_zones_of_p = Vec::<usize>::new(); + for n in prev_assign[p].iter() { + old_zones_of_p + .push(zone_to_id[&self.get_node_zone(&self.node_id_vec[*n])?]); + } + if !old_zones_of_p.contains(&z) { + new_partitions_zone[z] += 1; + } + } + } + for vert in pz_nodes.iter() { + if let Vertex::N(n) = *vert { + stored_partitions[n] += 1; + if let Some(prev_assign) = prev_assign_opt { + if !prev_assign[p].contains(&n) { + new_partitions[n] += 1; + } + } + } + } + } + } + + if *prev_assign_opt == None { + new_partitions = stored_partitions.clone(); + new_partitions_zone = stored_partitions_zone.clone(); + } + + //We display the statistics + + msg.push("".into()); + if *prev_assign_opt != None { + let total_new_partitions: usize = new_partitions.iter().sum(); + msg.push(format!( + "A total of {} new copies of partitions need to be \ + transferred.", + total_new_partitions + )); + } + msg.push("".into()); + msg.push("==== DETAILED STATISTICS BY ZONES AND NODES ====".into()); + + for z in 0..id_to_zone.len() { + let mut nodes_of_z = Vec::<usize>::new(); + for n in 0..storing_nodes.len() { + if self.get_node_zone(&self.node_id_vec[n])? == id_to_zone[z] { + nodes_of_z.push(n); + } + } + let replicated_partitions: usize = + nodes_of_z.iter().map(|n| stored_partitions[*n]).sum(); + msg.push("".into()); + + msg.push(format!( + "Zone {}: {} distinct partitions stored ({} new, \ + {} partition copies) ", + id_to_zone[z], + stored_partitions_zone[z], + new_partitions_zone[z], + replicated_partitions + )); + + let available_cap_z: u32 = self.partition_size * replicated_partitions as u32; + let mut total_cap_z = 0; + for n in nodes_of_z.iter() { + total_cap_z += self.get_node_capacity(&self.node_id_vec[*n])?; + } + let percent_cap_z = 100.0 * (available_cap_z as f32) / (total_cap_z as f32); + msg.push(format!( + " Usable capacity / Total capacity: {}/{} ({:.1}%).", + available_cap_z, total_cap_z, percent_cap_z + )); + + for n in nodes_of_z.iter() { + let available_cap_n = stored_partitions[*n] as u32 * self.partition_size; + let total_cap_n = self.get_node_capacity(&self.node_id_vec[*n])?; + let tags_n = (self + .node_role(&self.node_id_vec[*n]) + .ok_or("Node not found."))? + .tags_string(); + msg.push(format!( + " Node {}: {} partitions ({} new) ; \ + usable/total capacity: {} / {} ({:.1}%) ; tags:{}", + &self.node_id_vec[*n].to_vec()[0..2] + .to_vec() + .encode_hex::<String>(), + stored_partitions[*n], + new_partitions[*n], + available_cap_n, + total_cap_n, + (available_cap_n as f32) / (total_cap_n as f32) * 100.0, + tags_n + )); } } - partitions_per_node + + Ok(msg) } } -// ---- Internal structs for partition assignation in layout ---- +//==================================================================================== + +#[cfg(test)] +mod tests { + use super::{Error, *}; + use std::cmp::min; + + //This function checks that the partition size S computed is at least better than the + //one given by a very naive algorithm. To do so, we try to run the naive algorithm + //assuming a partion size of S+1. If we succed, it means that the optimal assignation + //was not optimal. The naive algorithm is the following : + //- we compute the max number of partitions associated to every node, capped at the + //partition number. It gives the number of tokens of every node. + //- every zone has a number of tokens equal to the sum of the tokens of its nodes. + //- we cycle over the partitions and associate zone tokens while respecting the + //zone redundancy constraint. + //NOTE: the naive algorithm is not optimal. Counter example: + //take nb_partition = 3 ; replication_factor = 5; redundancy = 4; + //number of tokens by zone : (A, 4), (B,1), (C,4), (D, 4), (E, 2) + //With these parameters, the naive algo fails, whereas there is a solution: + //(A,A,C,D,E) , (A,B,C,D,D) (A,C,C,D,E) + fn check_against_naive(cl: &ClusterLayout) -> Result<bool, Error> { + let over_size = cl.partition_size + 1; + let mut zone_token = HashMap::<String, usize>::new(); + + let (zones, zone_to_id) = cl.generate_nongateway_zone_ids()?; + + if zones.is_empty() { + return Ok(false); + } -#[derive(Clone)] -struct PartitionAss<'a> { - nodes: Vec<(&'a Uuid, Option<&'a NodeRole>)>, -} + for z in zones.iter() { + zone_token.insert(z.clone(), 0); + } + for uuid in cl.nongateway_nodes().iter() { + let z = cl.get_node_zone(uuid)?; + let c = cl.get_node_capacity(uuid)?; + zone_token.insert( + z.clone(), + zone_token[&z] + min(NB_PARTITIONS, (c / over_size) as usize), + ); + } -impl<'a> PartitionAss<'a> { - fn new() -> Self { - Self { nodes: Vec::new() } - } + //For every partition, we count the number of zone already associated and + //the name of the last zone associated - fn nplus(&self, other: &PartitionAss<'a>) -> usize { - self.nodes - .iter() - .filter(|x| !other.nodes.contains(x)) - .count() - } + let mut id_zone_token = vec![0; zones.len()]; + for (z, t) in zone_token.iter() { + id_zone_token[zone_to_id[z]] = *t; + } - fn txtplus(&self, other: &PartitionAss<'a>) -> String { - let mut nodes = self - .nodes - .iter() - .filter(|x| !other.nodes.contains(x)) - .map(|x| format!("{:?}", x.0)) - .collect::<Vec<_>>(); - nodes.sort(); - if self.nodes.iter().any(|x| other.nodes.contains(x)) { - nodes.push("...".into()); + let mut nb_token = vec![0; NB_PARTITIONS]; + let mut last_zone = vec![zones.len(); NB_PARTITIONS]; + + let mut curr_zone = 0; + + let redundancy = cl.parameters.zone_redundancy; + + for replic in 0..cl.replication_factor { + for p in 0..NB_PARTITIONS { + while id_zone_token[curr_zone] == 0 + || (last_zone[p] == curr_zone + && redundancy - nb_token[p] <= cl.replication_factor - replic) + { + curr_zone += 1; + if curr_zone >= zones.len() { + return Ok(true); + } + } + id_zone_token[curr_zone] -= 1; + if last_zone[p] != curr_zone { + nb_token[p] += 1; + last_zone[p] = curr_zone; + } + } } - format!("[{}]", nodes.join(" ")) - } - fn is_valid_transition_to(&self, other: &PartitionAss<'a>, replication_factor: usize) -> bool { - let min_keep_nodes_per_part = (replication_factor + 1) / 2; - let n_removed = self.nplus(other); + return Ok(false); + } - if self.nodes.len() <= min_keep_nodes_per_part { - n_removed == 0 - } else { - n_removed <= self.nodes.len() - min_keep_nodes_per_part + fn show_msg(msg: &Message) { + for s in msg.iter() { + println!("{}", s); } } - // add is a key function in creating a PartitionAss, i.e. the list of nodes - // to which a partition is assigned. It tries to add a certain node id to the - // assignation, but checks that doing so is compatible with the NECESSARY - // condition that the partition assignation must be dispersed over different - // zones (datacenters) if enough zones exist. This is why it takes a n_zones - // parameter, which is the total number of zones that have existing nodes: - // if nodes in the assignation already cover all n_zones zones, then any node - // that is not yet in the assignation can be added. Otherwise, only nodes - // that are in a new zone can be added. - fn add( - &mut self, - target_len: Option<usize>, - n_zones: usize, - node: &'a Uuid, - role: &'a NodeRole, - ) -> bool { - if let Some(tl) = target_len { - if self.nodes.len() != tl - 1 { - return false; + fn update_layout( + cl: &mut ClusterLayout, + node_id_vec: &Vec<u8>, + node_capacity_vec: &Vec<u32>, + node_zone_vec: &Vec<String>, + zone_redundancy: usize, + ) { + for i in 0..node_id_vec.len() { + if let Some(x) = FixedBytes32::try_from(&[i as u8; 32]) { + cl.node_id_vec.push(x); } - } - let p_zns = self - .nodes - .iter() - .map(|(_id, info)| info.unwrap().zone.as_str()) - .collect::<HashSet<&str>>(); - if (p_zns.len() < n_zones && !p_zns.contains(&role.zone.as_str())) - || (p_zns.len() == n_zones && !self.nodes.iter().any(|(id, _)| *id == node)) - { - self.nodes.push((node, Some(role))); - true - } else { - false + let update = cl.staging.update_mutator( + cl.node_id_vec[i], + NodeRoleV(Some(NodeRole { + zone: (node_zone_vec[i].to_string()), + capacity: (Some(node_capacity_vec[i])), + tags: (vec![]), + })), + ); + cl.staging.merge(&update); } + cl.staging_hash = blake2sum(&rmp_to_vec_all_named(&cl.staging).unwrap()[..]); + cl.staged_parameters + .update(LayoutParameters { zone_redundancy }); + } + + #[test] + fn test_assignation() { + let mut node_id_vec = vec![1, 2, 3]; + let mut node_capacity_vec = vec![4000, 1000, 2000]; + let mut node_zone_vec = vec!["A", "B", "C"] + .into_iter() + .map(|x| x.to_string()) + .collect(); + + let mut cl = ClusterLayout::new(3); + update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec, 3); + let v = cl.version; + let (mut cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap(); + show_msg(&msg); + assert!(cl.check()); + assert!(matches!(check_against_naive(&cl), Ok(true))); + + node_id_vec = vec![1, 2, 3, 4, 5, 6, 7, 8, 9]; + node_capacity_vec = vec![4000, 1000, 1000, 3000, 1000, 1000, 2000, 10000, 2000]; + node_zone_vec = vec!["A", "B", "C", "C", "C", "B", "G", "H", "I"] + .into_iter() + .map(|x| x.to_string()) + .collect(); + update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec, 2); + let v = cl.version; + let (mut cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap(); + show_msg(&msg); + assert!(cl.check()); + assert!(matches!(check_against_naive(&cl), Ok(true))); + + node_capacity_vec = vec![4000, 1000, 2000, 7000, 1000, 1000, 2000, 10000, 2000]; + update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec, 3); + let v = cl.version; + let (mut cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap(); + show_msg(&msg); + assert!(cl.check()); + assert!(matches!(check_against_naive(&cl), Ok(true))); + + node_capacity_vec = vec![ + 4000000, 4000000, 2000000, 7000000, 1000000, 9000000, 2000000, 10000, 2000000, + ]; + update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec, 1); + let v = cl.version; + let (cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap(); + show_msg(&msg); + assert!(cl.check()); + assert!(matches!(check_against_naive(&cl), Ok(true))); } } diff --git a/src/rpc/lib.rs b/src/rpc/lib.rs index 92caf75d..248b9b52 100644 --- a/src/rpc/lib.rs +++ b/src/rpc/lib.rs @@ -8,6 +8,7 @@ mod consul; #[cfg(feature = "kubernetes-discovery")] mod kubernetes; +pub mod graph_algo; pub mod layout; pub mod ring; pub mod system; diff --git a/src/rpc/ring.rs b/src/rpc/ring.rs index 73a126a2..743a5cba 100644 --- a/src/rpc/ring.rs +++ b/src/rpc/ring.rs @@ -40,6 +40,7 @@ pub struct Ring { // Type to store compactly the id of a node in the system // Change this to u16 the day we want to have more than 256 nodes in a cluster pub type CompactNodeType = u8; +pub const MAX_NODE_NUMBER: usize = 256; // The maximum number of times an object might get replicated // This must be at least 3 because Garage supports 3-way replication |