From 4abab246f1113a9a1988fdfca81c1dd8ffa323c8 Mon Sep 17 00:00:00 2001 From: Mendes Date: Mon, 10 Oct 2022 17:21:13 +0200 Subject: cargo fmt --- src/api/admin/cluster.rs | 8 +- src/db/lib.rs | 1 - src/garage/cli/layout.rs | 94 ++-- src/garage/cli/structs.rs | 7 +- src/rpc/graph_algo.rs | 754 ++++++++++++------------- src/rpc/layout.rs | 1332 ++++++++++++++++++++++++--------------------- src/rpc/lib.rs | 3 +- src/rpc/system.rs | 1 - 8 files changed, 1162 insertions(+), 1038 deletions(-) diff --git a/src/api/admin/cluster.rs b/src/api/admin/cluster.rs index 630179b5..da3d8c44 100644 --- a/src/api/admin/cluster.rs +++ b/src/api/admin/cluster.rs @@ -163,10 +163,10 @@ pub async fn handle_apply_cluster_layout( let layout = garage.system.get_cluster_layout(); let (layout, msg) = layout.apply_staged_changes(Some(param.version))?; - //TODO : how to display msg ? Should it be in the Body Response ? - for s in msg.iter() { - println!("{}", s); - } + //TODO : how to display msg ? Should it be in the Body Response ? + for s in msg.iter() { + println!("{}", s); + } garage.system.update_cluster_layout(&layout).await?; diff --git a/src/db/lib.rs b/src/db/lib.rs index af539494..0a776a91 100644 --- a/src/db/lib.rs +++ b/src/db/lib.rs @@ -4,7 +4,6 @@ extern crate tracing; #[cfg(not(any(feature = "lmdb", feature = "sled", feature = "sqlite")))] //compile_error!("Must activate the Cargo feature for at least one DB engine: lmdb, sled or sqlite."); - #[cfg(feature = "lmdb")] pub mod lmdb_adapter; #[cfg(feature = "sled")] diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs index 32f637eb..f747fbe4 100644 --- a/src/garage/cli/layout.rs +++ b/src/garage/cli/layout.rs @@ -27,9 +27,9 @@ pub async fn cli_layout_command_dispatch( LayoutOperation::Revert(revert_opt) => { cmd_revert_layout(system_rpc_endpoint, rpc_host, revert_opt).await } - LayoutOperation::Config(config_opt) => { - cmd_config_layout(system_rpc_endpoint, rpc_host, config_opt).await - } + LayoutOperation::Config(config_opt) => { + cmd_config_layout(system_rpc_endpoint, rpc_host, config_opt).await + } } } @@ -188,30 +188,37 @@ pub async fn cmd_show_layout( println!("No nodes have a role in the new layout."); } println!(); - + println!("==== PARAMETERS OF THE LAYOUT COMPUTATION ===="); - println!("Zone redundancy: {}", layout.staged_parameters.get().zone_redundancy); + println!( + "Zone redundancy: {}", + layout.staged_parameters.get().zone_redundancy + ); println!(); // this will print the stats of what partitions // will move around when we apply - match layout.calculate_partition_assignation() { - Ok(msg) => { - for line in msg.iter() { - println!("{}", line); - } - println!("To enact the staged role changes, type:"); - println!(); - println!(" garage layout apply --version {}", layout.version + 1); - println!(); - println!( + match layout.calculate_partition_assignation() { + Ok(msg) => { + for line in msg.iter() { + println!("{}", line); + } + println!("To enact the staged role changes, type:"); + println!(); + println!(" garage layout apply --version {}", layout.version + 1); + println!(); + println!( "You can also revert all proposed changes with: garage layout revert --version {}", - layout.version + 1)}, - Err(Error::Message(s)) => { - println!("Error while trying to compute the assignation: {}", s); - println!("This new layout cannot yet be applied.");}, - _ => { println!("Unknown Error"); }, - } + layout.version + 1) + } + Err(Error::Message(s)) => { + println!("Error while trying to compute the assignation: {}", s); + 
println!("This new layout cannot yet be applied."); + } + _ => { + println!("Unknown Error"); + } + } } Ok(()) @@ -225,9 +232,9 @@ pub async fn cmd_apply_layout( let layout = fetch_layout(rpc_cli, rpc_host).await?; let (layout, msg) = layout.apply_staged_changes(apply_opt.version)?; - for line in msg.iter() { - println!("{}", line); - } + for line in msg.iter() { + println!("{}", line); + } send_layout(rpc_cli, rpc_host, layout).await?; @@ -258,26 +265,29 @@ pub async fn cmd_config_layout( config_opt: ConfigLayoutOpt, ) -> Result<(), Error> { let mut layout = fetch_layout(rpc_cli, rpc_host).await?; - - match config_opt.redundancy { - None => (), - Some(r) => { - if r > layout.replication_factor { - println!("The zone redundancy must be smaller or equal to the \ - replication factor ({}).", layout.replication_factor); - } - else if r < 1 { - println!("The zone redundancy must be at least 1."); - } - else { - layout.staged_parameters.update(LayoutParameters{ zone_redundancy: r }); - println!("The new zone redundancy has been saved ({}).", r); - } - } - } + + match config_opt.redundancy { + None => (), + Some(r) => { + if r > layout.replication_factor { + println!( + "The zone redundancy must be smaller or equal to the \ + replication factor ({}).", + layout.replication_factor + ); + } else if r < 1 { + println!("The zone redundancy must be at least 1."); + } else { + layout + .staged_parameters + .update(LayoutParameters { zone_redundancy: r }); + println!("The new zone redundancy has been saved ({}).", r); + } + } + } send_layout(rpc_cli, rpc_host, layout).await?; - Ok(()) + Ok(()) } // --- utility --- diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index 896379bb..02ed8992 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -86,10 +86,10 @@ pub enum LayoutOperation { /// Remove role from Garage cluster node #[structopt(name = "remove", version = garage_version())] Remove(RemoveRoleOpt), - - /// Configure parameters value for the layout computation + + /// Configure parameters value for the layout computation #[structopt(name = "config", version = garage_version())] - Config(ConfigLayoutOpt), + Config(ConfigLayoutOpt), /// Show roles currently assigned to nodes and changes staged for commit #[structopt(name = "show", version = garage_version())] @@ -104,7 +104,6 @@ pub enum LayoutOperation { Revert(RevertLayoutOpt), } - #[derive(StructOpt, Debug)] pub struct AssignRoleOpt { /// Node(s) to which to assign role (prefix of hexadecimal node id) diff --git a/src/rpc/graph_algo.rs b/src/rpc/graph_algo.rs index 70ccf35a..13c60692 100644 --- a/src/rpc/graph_algo.rs +++ b/src/rpc/graph_algo.rs @@ -1,42 +1,40 @@ - //! This module deals with graph algorithms. //! It is used in layout.rs to build the partition to node assignation. use rand::prelude::SliceRandom; use std::cmp::{max, min}; -use std::collections::VecDeque; use std::collections::HashMap; +use std::collections::VecDeque; //Vertex data structures used in all the graphs used in layout.rs. //usize parameters correspond to node/zone/partitions ids. //To understand the vertex roles below, please refer to the formal description //of the layout computation algorithm. 
-#[derive(Clone,Copy,Debug, PartialEq, Eq, Hash)] -pub enum Vertex{ - Source, - Pup(usize), //The vertex p+ of partition p - Pdown(usize), //The vertex p- of partition p - PZ(usize,usize), //The vertex corresponding to x_(partition p, zone z) - N(usize), //The vertex corresponding to node n - Sink +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +pub enum Vertex { + Source, + Pup(usize), //The vertex p+ of partition p + Pdown(usize), //The vertex p- of partition p + PZ(usize, usize), //The vertex corresponding to x_(partition p, zone z) + N(usize), //The vertex corresponding to node n + Sink, } - //Edge data structure for the flow algorithm. //The graph is stored as an adjacency list #[derive(Clone, Copy, Debug)] pub struct FlowEdge { - cap: u32, //flow maximal capacity of the edge - flow: i32, //flow value on the edge - dest: usize, //destination vertex id - rev: usize, //index of the reversed edge (v, self) in the edge list of vertex v + cap: u32, //flow maximal capacity of the edge + flow: i32, //flow value on the edge + dest: usize, //destination vertex id + rev: usize, //index of the reversed edge (v, self) in the edge list of vertex v } //Edge data structure for the detection of negative cycles. //The graph is stored as a list of edges (u,v). #[derive(Clone, Copy, Debug)] pub struct WeightedEdge { - w: i32, //weight of the edge + w: i32, //weight of the edge dest: usize, } @@ -47,375 +45,377 @@ impl Edge for WeightedEdge {} //Struct for the graph structure. We do encapsulation here to be able to both //provide user friendly Vertex enum to address vertices, and to use usize indices //and Vec instead of HashMap in the graph algorithm to optimize execution speed. -pub struct Graph{ - vertextoid : HashMap, - idtovertex : Vec, - - graph : Vec< Vec > -} +pub struct Graph { + vertextoid: HashMap, + idtovertex: Vec, -pub type CostFunction = HashMap<(Vertex,Vertex), i32>; - -impl Graph{ - pub fn new(vertices : &[Vertex]) -> Self { - let mut map = HashMap::::new(); - for (i, vert) in vertices.iter().enumerate(){ - map.insert(*vert , i); - } - Graph:: { - vertextoid : map, - idtovertex: vertices.to_vec(), - graph : vec![Vec::< E >::new(); vertices.len() ] - } - } + graph: Vec>, } -impl Graph{ - //This function adds a directed edge to the graph with capacity c, and the - //corresponding reversed edge with capacity 0. - pub fn add_edge(&mut self, u: Vertex, v:Vertex, c: u32) -> Result<(), String>{ - if !self.vertextoid.contains_key(&u) || !self.vertextoid.contains_key(&v) { - return Err("The graph does not contain the provided vertex.".to_string()); - } - let idu = self.vertextoid[&u]; - let idv = self.vertextoid[&v]; - let rev_u = self.graph[idu].len(); - let rev_v = self.graph[idv].len(); - self.graph[idu].push( FlowEdge{cap: c , dest: idv , flow: 0, rev : rev_v} ); - self.graph[idv].push( FlowEdge{cap: 0 , dest: idu , flow: 0, rev : rev_u} ); - Ok(()) - } - - //This function returns the list of vertices that receive a positive flow from - //vertex v. - pub fn get_positive_flow_from(&self , v:Vertex) -> Result< Vec , String>{ - if !self.vertextoid.contains_key(&v) { - return Err("The graph does not contain the provided vertex.".to_string()); - } - let idv = self.vertextoid[&v]; - let mut result = Vec::::new(); - for edge in self.graph[idv].iter() { - if edge.flow > 0 { - result.push(self.idtovertex[edge.dest]); - } - } - Ok(result) - } - - - //This function returns the value of the flow incoming to v. 
- pub fn get_inflow(&self , v:Vertex) -> Result< i32 , String>{ - if !self.vertextoid.contains_key(&v) { - return Err("The graph does not contain the provided vertex.".to_string()); - } - let idv = self.vertextoid[&v]; - let mut result = 0; - for edge in self.graph[idv].iter() { - result += max(0,self.graph[edge.dest][edge.rev].flow); - } - Ok(result) - } - - //This function returns the value of the flow outgoing from v. - pub fn get_outflow(&self , v:Vertex) -> Result< i32 , String>{ - if !self.vertextoid.contains_key(&v) { - return Err("The graph does not contain the provided vertex.".to_string()); - } - let idv = self.vertextoid[&v]; - let mut result = 0; - for edge in self.graph[idv].iter() { - result += max(0,edge.flow); - } - Ok(result) - } - - //This function computes the flow total value by computing the outgoing flow - //from the source. - pub fn get_flow_value(&mut self) -> Result { - self.get_outflow(Vertex::Source) - } - - //This function shuffles the order of the edge lists. It keeps the ids of the - //reversed edges consistent. - fn shuffle_edges(&mut self) { - let mut rng = rand::thread_rng(); - for i in 0..self.graph.len() { - self.graph[i].shuffle(&mut rng); - //We need to update the ids of the reverse edges. - for j in 0..self.graph[i].len() { - let target_v = self.graph[i][j].dest; - let target_rev = self.graph[i][j].rev; - self.graph[target_v][target_rev].rev = j; - } - } - } - - //Computes an upper bound of the flow n the graph - pub fn flow_upper_bound(&self) -> u32{ - let idsource = self.vertextoid[&Vertex::Source]; - let mut flow_upper_bound = 0; - for edge in self.graph[idsource].iter(){ - flow_upper_bound += edge.cap; - } - flow_upper_bound - } - - //This function computes the maximal flow using Dinic's algorithm. It starts with - //the flow values already present in the graph. So it is possible to add some edge to - //the graph, compute a flow, add other edges, update the flow. - pub fn compute_maximal_flow(&mut self) -> Result<(), String> { - if !self.vertextoid.contains_key(&Vertex::Source) { - return Err("The graph does not contain a source.".to_string()); - } - if !self.vertextoid.contains_key(&Vertex::Sink) { - return Err("The graph does not contain a sink.".to_string()); - } - - let idsource = self.vertextoid[&Vertex::Source]; - let idsink = self.vertextoid[&Vertex::Sink]; - - let nb_vertices = self.graph.len(); - - let flow_upper_bound = self.flow_upper_bound(); - - //To ensure the dispersion of the associations generated by the - //assignation, we shuffle the neighbours of the nodes. Hence, - //the vertices do not consider their neighbours in the same order. - self.shuffle_edges(); - - //We run Dinic's max flow algorithm - loop { - //We build the level array from Dinic's algorithm. 
- let mut level = vec![None; nb_vertices]; - - let mut fifo = VecDeque::new(); - fifo.push_back((idsource, 0)); - while !fifo.is_empty() { - if let Some((id, lvl)) = fifo.pop_front() { - if level[id] == None { //it means id has not yet been reached - level[id] = Some(lvl); - for edge in self.graph[id].iter() { - if edge.cap as i32 - edge.flow > 0 { - fifo.push_back((edge.dest, lvl + 1)); - } - } - } - } - } - if level[idsink] == None { - //There is no residual flow - break; - } - //Now we run DFS respecting the level array - let mut next_nbd = vec![0; nb_vertices]; - let mut lifo = VecDeque::new(); - - lifo.push_back((idsource, flow_upper_bound)); - - while let Some((id_tmp, f_tmp)) = lifo.back() { - let id = *id_tmp; - let f = *f_tmp; - if id == idsink { - //The DFS reached the sink, we can add a - //residual flow. - lifo.pop_back(); - while let Some((id, _)) = lifo.pop_back() { - let nbd = next_nbd[id]; - self.graph[id][nbd].flow += f as i32; - let id_rev = self.graph[id][nbd].dest; - let nbd_rev = self.graph[id][nbd].rev; - self.graph[id_rev][nbd_rev].flow -= f as i32; - } - lifo.push_back((idsource, flow_upper_bound)); - continue; - } - //else we did not reach the sink - let nbd = next_nbd[id]; - if nbd >= self.graph[id].len() { - //There is nothing to explore from id anymore - lifo.pop_back(); - if let Some((parent, _)) = lifo.back() { - next_nbd[*parent] += 1; - } - continue; - } - //else we can try to send flow from id to its nbd - let new_flow = min(f as i32, self.graph[id][nbd].cap as i32 - self.graph[id][nbd].flow) as u32; - if new_flow == 0 { - next_nbd[id] += 1; - continue; - } - if let (Some(lvldest), Some(lvlid)) = - (level[self.graph[id][nbd].dest], level[id]){ - if lvldest <= lvlid { - //We cannot send flow to nbd. - next_nbd[id] += 1; - continue; - } - } - //otherwise, we send flow to nbd. - lifo.push_back((self.graph[id][nbd].dest, new_flow)); - } - } - Ok(()) - } - - //This function takes a flow, and a cost function on the edges, and tries to find an - // equivalent flow with a better cost, by finding improving overflow cycles. It uses - // as subroutine the Bellman Ford algorithm run up to path_length. - // We assume that the cost of edge (u,v) is the opposite of the cost of (v,u), and only - // one needs to be present in the cost function. - pub fn optimize_flow_with_cost(&mut self , cost: &CostFunction, path_length: usize ) - -> Result<(),String>{ - //We build the weighted graph g where we will look for negative cycle - let mut gf = self.build_cost_graph(cost)?; - let mut cycles = gf.list_negative_cycles(path_length); - while !cycles.is_empty() { - //we enumerate negative cycles - for c in cycles.iter(){ - for i in 0..c.len(){ - //We add one flow unit to the edge (u,v) of cycle c - let idu = self.vertextoid[&c[i]]; - let idv = self.vertextoid[&c[(i+1)%c.len()]]; - for j in 0..self.graph[idu].len(){ - //since idu appears at most once in the cycles, we enumerate every - //edge at most once. 
- let edge = self.graph[idu][j]; - if edge.dest == idv { - self.graph[idu][j].flow += 1; - self.graph[idv][edge.rev].flow -=1; - break; - } - } - } - } - - gf = self.build_cost_graph(cost)?; - cycles = gf.list_negative_cycles(path_length); - } - Ok(()) - } - - //Construct the weighted graph G_f from the flow and the cost function - fn build_cost_graph(&self , cost: &CostFunction) -> Result,String>{ - - let mut g = Graph::::new(&self.idtovertex); - let nb_vertices = self.idtovertex.len(); - for i in 0..nb_vertices { - for edge in self.graph[i].iter() { - if edge.cap as i32 -edge.flow > 0 { - //It is possible to send overflow through this edge - let u = self.idtovertex[i]; - let v = self.idtovertex[edge.dest]; - if cost.contains_key(&(u,v)) { - g.add_edge(u,v, cost[&(u,v)])?; - } - else if cost.contains_key(&(v,u)) { - g.add_edge(u,v, -cost[&(v,u)])?; - } - else{ - g.add_edge(u,v, 0)?; - } - } - } - } - Ok(g) - - } - - +pub type CostFunction = HashMap<(Vertex, Vertex), i32>; + +impl Graph { + pub fn new(vertices: &[Vertex]) -> Self { + let mut map = HashMap::::new(); + for (i, vert) in vertices.iter().enumerate() { + map.insert(*vert, i); + } + Graph:: { + vertextoid: map, + idtovertex: vertices.to_vec(), + graph: vec![Vec::::new(); vertices.len()], + } + } } -impl Graph{ - //This function adds a single directed weighted edge to the graph. - pub fn add_edge(&mut self, u: Vertex, v:Vertex, w: i32) -> Result<(), String>{ - if !self.vertextoid.contains_key(&u) || !self.vertextoid.contains_key(&v) { - return Err("The graph does not contain the provided vertex.".to_string()); - } - let idu = self.vertextoid[&u]; - let idv = self.vertextoid[&v]; - self.graph[idu].push( WeightedEdge{ w , dest: idv} ); - Ok(()) - } - - //This function lists the negative cycles it manages to find after path_length - //iterations of the main loop of the Bellman-Ford algorithm. For the classical - //algorithm, path_length needs to be equal to the number of vertices. However, - //for particular graph structures like our case, the algorithm is still correct - //when path_length is the length of the longest possible simple path. - //See the formal description of the algorithm for more details. - fn list_negative_cycles(&self, path_length: usize) -> Vec< Vec > { - - let nb_vertices = self.graph.len(); - - //We start with every vertex at distance 0 of some imaginary extra -1 vertex. - let mut distance = vec![0 ; nb_vertices]; - //The prev vector collects for every vertex from where does the shortest path come - let mut prev = vec![None; nb_vertices]; - - for _ in 0..path_length +1 { - for id in 0..nb_vertices{ - for e in self.graph[id].iter(){ - if distance[id] + e.w < distance[e.dest] { - distance[e.dest] = distance[id] + e.w; - prev[e.dest] = Some(id); - } - } - } - } - - - //If self.graph contains a negative cycle, then at this point the graph described - //by prev (which is a directed 1-forest/functional graph) - //must contain a cycle. We list the cycles of prev. - let cycles_prev = cycles_of_1_forest(&prev); - - //Remark that the cycle in prev is in the reverse order compared to the cycle - //in the graph. Thus the .rev(). - return cycles_prev.iter().map(|cycle| cycle.iter().rev().map( - |id| self.idtovertex[*id] - ).collect() ).collect(); - } - +impl Graph { + //This function adds a directed edge to the graph with capacity c, and the + //corresponding reversed edge with capacity 0. 
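+ //The zero-capacity reverse edge implements the residual graph: pushing flow
+ //on (u,v) frees capacity on (v,u), so a later augmenting path can undo part
+ //of an earlier routing decision. The rev indices let both directions be
+ //updated in constant time.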
+ pub fn add_edge(&mut self, u: Vertex, v: Vertex, c: u32) -> Result<(), String> {
+ if !self.vertextoid.contains_key(&u) || !self.vertextoid.contains_key(&v) {
+ return Err("The graph does not contain the provided vertex.".to_string());
+ }
+ let idu = self.vertextoid[&u];
+ let idv = self.vertextoid[&v];
+ let rev_u = self.graph[idu].len();
+ let rev_v = self.graph[idv].len();
+ self.graph[idu].push(FlowEdge {
+ cap: c,
+ dest: idv,
+ flow: 0,
+ rev: rev_v,
+ });
+ self.graph[idv].push(FlowEdge {
+ cap: 0,
+ dest: idu,
+ flow: 0,
+ rev: rev_u,
+ });
+ Ok(())
+ }
+
+ //This function returns the list of vertices that receive a positive flow from
+ //vertex v.
+ pub fn get_positive_flow_from(&self, v: Vertex) -> Result<Vec<Vertex>, String> {
+ if !self.vertextoid.contains_key(&v) {
+ return Err("The graph does not contain the provided vertex.".to_string());
+ }
+ let idv = self.vertextoid[&v];
+ let mut result = Vec::<Vertex>::new();
+ for edge in self.graph[idv].iter() {
+ if edge.flow > 0 {
+ result.push(self.idtovertex[edge.dest]);
+ }
+ }
+ Ok(result)
+ }
+
+ //This function returns the value of the flow incoming to v.
+ pub fn get_inflow(&self, v: Vertex) -> Result<i32, String> {
+ if !self.vertextoid.contains_key(&v) {
+ return Err("The graph does not contain the provided vertex.".to_string());
+ }
+ let idv = self.vertextoid[&v];
+ let mut result = 0;
+ for edge in self.graph[idv].iter() {
+ result += max(0, self.graph[edge.dest][edge.rev].flow);
+ }
+ Ok(result)
+ }
+
+ //This function returns the value of the flow outgoing from v.
+ pub fn get_outflow(&self, v: Vertex) -> Result<i32, String> {
+ if !self.vertextoid.contains_key(&v) {
+ return Err("The graph does not contain the provided vertex.".to_string());
+ }
+ let idv = self.vertextoid[&v];
+ let mut result = 0;
+ for edge in self.graph[idv].iter() {
+ result += max(0, edge.flow);
+ }
+ Ok(result)
+ }
+
+ //This function computes the total value of the flow by computing the outgoing
+ //flow from the source.
+ pub fn get_flow_value(&mut self) -> Result<i32, String> {
+ self.get_outflow(Vertex::Source)
+ }
+
+ //This function shuffles the order of the edge lists. It keeps the ids of the
+ //reversed edges consistent.
+ fn shuffle_edges(&mut self) {
+ let mut rng = rand::thread_rng();
+ for i in 0..self.graph.len() {
+ self.graph[i].shuffle(&mut rng);
+ //We need to update the ids of the reverse edges.
+ for j in 0..self.graph[i].len() {
+ let target_v = self.graph[i][j].dest;
+ let target_rev = self.graph[i][j].rev;
+ self.graph[target_v][target_rev].rev = j;
+ }
+ }
+ }
+
+ //Computes an upper bound of the flow in the graph
+ pub fn flow_upper_bound(&self) -> u32 {
+ let idsource = self.vertextoid[&Vertex::Source];
+ let mut flow_upper_bound = 0;
+ for edge in self.graph[idsource].iter() {
+ flow_upper_bound += edge.cap;
+ }
+ flow_upper_bound
+ }
+
+ //This function computes the maximal flow using Dinic's algorithm. It starts with
+ //the flow values already present in the graph, so it is possible to add some edges
+ //to the graph, compute a flow, add other edges, and update the flow.
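+ //A minimal usage sketch (illustrative only, the vertex set is made up):
+ //
+ //   let mut g = Graph::<FlowEdge>::new(&[Vertex::Source, Vertex::N(0), Vertex::Sink]);
+ //   g.add_edge(Vertex::Source, Vertex::N(0), 5)?;
+ //   g.add_edge(Vertex::N(0), Vertex::Sink, 3)?;
+ //   g.compute_maximal_flow()?;
+ //   assert_eq!(g.get_flow_value()?, 3); //the 3-capacity edge is the bottleneck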
+ pub fn compute_maximal_flow(&mut self) -> Result<(), String> { + if !self.vertextoid.contains_key(&Vertex::Source) { + return Err("The graph does not contain a source.".to_string()); + } + if !self.vertextoid.contains_key(&Vertex::Sink) { + return Err("The graph does not contain a sink.".to_string()); + } + + let idsource = self.vertextoid[&Vertex::Source]; + let idsink = self.vertextoid[&Vertex::Sink]; + + let nb_vertices = self.graph.len(); + + let flow_upper_bound = self.flow_upper_bound(); + + //To ensure the dispersion of the associations generated by the + //assignation, we shuffle the neighbours of the nodes. Hence, + //the vertices do not consider their neighbours in the same order. + self.shuffle_edges(); + + //We run Dinic's max flow algorithm + loop { + //We build the level array from Dinic's algorithm. + let mut level = vec![None; nb_vertices]; + + let mut fifo = VecDeque::new(); + fifo.push_back((idsource, 0)); + while !fifo.is_empty() { + if let Some((id, lvl)) = fifo.pop_front() { + if level[id] == None { + //it means id has not yet been reached + level[id] = Some(lvl); + for edge in self.graph[id].iter() { + if edge.cap as i32 - edge.flow > 0 { + fifo.push_back((edge.dest, lvl + 1)); + } + } + } + } + } + if level[idsink] == None { + //There is no residual flow + break; + } + //Now we run DFS respecting the level array + let mut next_nbd = vec![0; nb_vertices]; + let mut lifo = VecDeque::new(); + + lifo.push_back((idsource, flow_upper_bound)); + + while let Some((id_tmp, f_tmp)) = lifo.back() { + let id = *id_tmp; + let f = *f_tmp; + if id == idsink { + //The DFS reached the sink, we can add a + //residual flow. + lifo.pop_back(); + while let Some((id, _)) = lifo.pop_back() { + let nbd = next_nbd[id]; + self.graph[id][nbd].flow += f as i32; + let id_rev = self.graph[id][nbd].dest; + let nbd_rev = self.graph[id][nbd].rev; + self.graph[id_rev][nbd_rev].flow -= f as i32; + } + lifo.push_back((idsource, flow_upper_bound)); + continue; + } + //else we did not reach the sink + let nbd = next_nbd[id]; + if nbd >= self.graph[id].len() { + //There is nothing to explore from id anymore + lifo.pop_back(); + if let Some((parent, _)) = lifo.back() { + next_nbd[*parent] += 1; + } + continue; + } + //else we can try to send flow from id to its nbd + let new_flow = min( + f as i32, + self.graph[id][nbd].cap as i32 - self.graph[id][nbd].flow, + ) as u32; + if new_flow == 0 { + next_nbd[id] += 1; + continue; + } + if let (Some(lvldest), Some(lvlid)) = (level[self.graph[id][nbd].dest], level[id]) { + if lvldest <= lvlid { + //We cannot send flow to nbd. + next_nbd[id] += 1; + continue; + } + } + //otherwise, we send flow to nbd. + lifo.push_back((self.graph[id][nbd].dest, new_flow)); + } + } + Ok(()) + } + + //This function takes a flow, and a cost function on the edges, and tries to find an + // equivalent flow with a better cost, by finding improving overflow cycles. It uses + // as subroutine the Bellman Ford algorithm run up to path_length. + // We assume that the cost of edge (u,v) is the opposite of the cost of (v,u), and only + // one needs to be present in the cost function. 
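+ //Sketch of a cost function (illustrative): to favour keeping partition p on
+ //node n, put a negative cost on the edge PZ(p,z) -> N(n), as
+ //minimize_rebalance_load does in layout.rs:
+ //
+ //   let mut cost = CostFunction::new();
+ //   cost.insert((Vertex::PZ(p, z), Vertex::N(n)), -1);
+ //   g.optimize_flow_with_cost(&cost, path_length)?;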
+ pub fn optimize_flow_with_cost(
+ &mut self,
+ cost: &CostFunction,
+ path_length: usize,
+ ) -> Result<(), String> {
+ //We build the weighted graph g where we will look for negative cycles
+ let mut gf = self.build_cost_graph(cost)?;
+ let mut cycles = gf.list_negative_cycles(path_length);
+ while !cycles.is_empty() {
+ //we enumerate negative cycles
+ for c in cycles.iter() {
+ for i in 0..c.len() {
+ //We add one flow unit to the edge (u,v) of cycle c
+ let idu = self.vertextoid[&c[i]];
+ let idv = self.vertextoid[&c[(i + 1) % c.len()]];
+ for j in 0..self.graph[idu].len() {
+ //since idu appears at most once in the cycles, we enumerate every
+ //edge at most once.
+ let edge = self.graph[idu][j];
+ if edge.dest == idv {
+ self.graph[idu][j].flow += 1;
+ self.graph[idv][edge.rev].flow -= 1;
+ break;
+ }
+ }
+ }
+ }
+
+ gf = self.build_cost_graph(cost)?;
+ cycles = gf.list_negative_cycles(path_length);
+ }
+ Ok(())
+ }
+
+ //Construct the weighted graph G_f from the flow and the cost function
+ fn build_cost_graph(&self, cost: &CostFunction) -> Result<Graph<WeightedEdge>, String> {
+ let mut g = Graph::<WeightedEdge>::new(&self.idtovertex);
+ let nb_vertices = self.idtovertex.len();
+ for i in 0..nb_vertices {
+ for edge in self.graph[i].iter() {
+ if edge.cap as i32 - edge.flow > 0 {
+ //It is possible to send overflow through this edge
+ let u = self.idtovertex[i];
+ let v = self.idtovertex[edge.dest];
+ if cost.contains_key(&(u, v)) {
+ g.add_edge(u, v, cost[&(u, v)])?;
+ } else if cost.contains_key(&(v, u)) {
+ g.add_edge(u, v, -cost[&(v, u)])?;
+ } else {
+ g.add_edge(u, v, 0)?;
+ }
+ }
+ }
+ }
+ Ok(g)
+ }
+}
+
+impl Graph<WeightedEdge> {
+ //This function adds a single directed weighted edge to the graph.
+ pub fn add_edge(&mut self, u: Vertex, v: Vertex, w: i32) -> Result<(), String> {
+ if !self.vertextoid.contains_key(&u) || !self.vertextoid.contains_key(&v) {
+ return Err("The graph does not contain the provided vertex.".to_string());
+ }
+ let idu = self.vertextoid[&u];
+ let idv = self.vertextoid[&v];
+ self.graph[idu].push(WeightedEdge { w, dest: idv });
+ Ok(())
+ }
+
+ //This function lists the negative cycles it manages to find after path_length
+ //iterations of the main loop of the Bellman-Ford algorithm. For the classical
+ //algorithm, path_length needs to be equal to the number of vertices. However,
+ //for particular graph structures like our case, the algorithm is still correct
+ //when path_length is the length of the longest possible simple path.
+ //See the formal description of the algorithm for more details.
+ fn list_negative_cycles(&self, path_length: usize) -> Vec<Vec<Vertex>> {
+ let nb_vertices = self.graph.len();
+
+ //We start with every vertex at distance 0 of some imaginary extra -1 vertex.
+ let mut distance = vec![0; nb_vertices];
+ //The prev vector records, for every vertex, the vertex from which its current
+ //shortest path comes.
+ let mut prev = vec![None; nb_vertices];
+
+ for _ in 0..path_length + 1 {
+ for id in 0..nb_vertices {
+ for e in self.graph[id].iter() {
+ if distance[id] + e.w < distance[e.dest] {
+ distance[e.dest] = distance[id] + e.w;
+ prev[e.dest] = Some(id);
+ }
+ }
+ }
+ }
+
+ //If self.graph contains a negative cycle, then at this point the graph described
+ //by prev (which is a directed 1-forest/functional graph)
+ //must contain a cycle. We list the cycles of prev.
+ let cycles_prev = cycles_of_1_forest(&prev);
+
+ //Remark that the cycle in prev is in the reverse order compared to the cycle
+ //in the graph. Thus the .rev().
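+ //(For instance, a graph cycle a -> b -> c -> a is stored as prev[b] = Some(a),
+ //prev[c] = Some(b), prev[a] = Some(c); following prev from a yields a, c, b.)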
+ return cycles_prev + .iter() + .map(|cycle| cycle.iter().rev().map(|id| self.idtovertex[*id]).collect()) + .collect(); + } +} //This function returns the list of cycles of a directed 1 forest. It does not //check for the consistency of the input. -fn cycles_of_1_forest(forest: &[Option]) -> Vec> { - let mut cycles = Vec::>::new(); - let mut time_of_discovery = vec![None; forest.len()]; - - for t in 0..forest.len(){ - let mut id = t; - //while we are on a valid undiscovered node - while time_of_discovery[id] == None { - time_of_discovery[id] = Some(t); - if let Some(i) = forest[id] { - id = i; - } - else{ - break; - } - } - if forest[id] != None && time_of_discovery[id] == Some(t) { - //We discovered an id that we explored at this iteration t. - //It means we are on a cycle - let mut cy = vec![id; 1]; - let mut id2 = id; - while let Some(id_next) = forest[id2] { - id2 = id_next; - if id2 != id { - cy.push(id2); - } - else { - break; - } - } - cycles.push(cy); - } - } - cycles +fn cycles_of_1_forest(forest: &[Option]) -> Vec> { + let mut cycles = Vec::>::new(); + let mut time_of_discovery = vec![None; forest.len()]; + + for t in 0..forest.len() { + let mut id = t; + //while we are on a valid undiscovered node + while time_of_discovery[id] == None { + time_of_discovery[id] = Some(t); + if let Some(i) = forest[id] { + id = i; + } else { + break; + } + } + if forest[id] != None && time_of_discovery[id] == Some(t) { + //We discovered an id that we explored at this iteration t. + //It means we are on a cycle + let mut cy = vec![id; 1]; + let mut id2 = id; + while let Some(id_next) = forest[id2] { + id2 = id_next; + if id2 != id { + cy.push(id2); + } else { + break; + } + } + cycles.push(cy); + } + } + cycles } - - diff --git a/src/rpc/layout.rs b/src/rpc/layout.rs index 976f94af..3a6f42ee 100644 --- a/src/rpc/layout.rs +++ b/src/rpc/layout.rs @@ -7,7 +7,7 @@ use itertools::Itertools; use serde::{Deserialize, Serialize}; -use garage_util::crdt::{AutoCrdt, Crdt, LwwMap, Lww}; +use garage_util::crdt::{AutoCrdt, Crdt, Lww, LwwMap}; use garage_util::data::*; use garage_util::error::*; @@ -27,11 +27,11 @@ pub struct ClusterLayout { pub version: u64, pub replication_factor: usize, - - //This attribute is only used to retain the previously computed partition size, - //to know to what extent does it change with the layout update. - pub partition_size: u32, - pub parameters: LayoutParameters, + + //This attribute is only used to retain the previously computed partition size, + //to know to what extent does it change with the layout update. + pub partition_size: u32, + pub parameters: LayoutParameters, pub roles: LwwMap, @@ -39,7 +39,7 @@ pub struct ClusterLayout { /// in the system (this includes gateway nodes). /// The order here is different than the vec stored by `roles`, because: /// 1. non-gateway nodes are first so that they have lower numbers holding - /// in u8 (the number of non-gateway nodes is at most 256). + /// in u8 (the number of non-gateway nodes is at most 256). /// 2. nodes that don't have a role are excluded (but they need to /// stay in the CRDT as tombstones) pub node_id_vec: Vec, @@ -49,7 +49,7 @@ pub struct ClusterLayout { pub ring_assignation_data: Vec, /// Role changes which are staged for the next version of the layout - pub staged_parameters: Lww, + pub staged_parameters: Lww, pub staging: LwwMap, pub staging_hash: Hash, } @@ -58,14 +58,14 @@ pub struct ClusterLayout { ///algorithm. It is stored as a Crdt. 
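///For example, with replication_factor 3 and zone_redundancy 2, each partition
///gets three copies spread over at least two distinct zones.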
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] pub struct LayoutParameters { - pub zone_redundancy:usize, + pub zone_redundancy: usize, } impl AutoCrdt for LayoutParameters { const WARN_IF_DIFFERENT: bool = true; } -const NB_PARTITIONS : usize = 1usize << PARTITION_BITS; +const NB_PARTITIONS: usize = 1usize << PARTITION_BITS; #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] pub struct NodeRoleV(pub Option); @@ -96,27 +96,28 @@ impl NodeRole { } } - pub fn tags_string(&self) -> String { - let mut tags = String::new(); - if self.tags.is_empty() { - return tags - } - tags.push_str(&self.tags[0].clone()); - for t in 1..self.tags.len(){ - tags.push(','); - tags.push_str(&self.tags[t].clone()); - } - tags - } + pub fn tags_string(&self) -> String { + let mut tags = String::new(); + if self.tags.is_empty() { + return tags; + } + tags.push_str(&self.tags[0].clone()); + for t in 1..self.tags.len() { + tags.push(','); + tags.push_str(&self.tags[t].clone()); + } + tags + } } impl ClusterLayout { pub fn new(replication_factor: usize) -> Self { - - //We set the default zone redundancy to be equal to the replication factor, - //i.e. as strict as possible. - let parameters = LayoutParameters{ zone_redundancy: replication_factor}; - let staged_parameters = Lww::::new(parameters.clone()); + //We set the default zone redundancy to be equal to the replication factor, + //i.e. as strict as possible. + let parameters = LayoutParameters { + zone_redundancy: replication_factor, + }; + let staged_parameters = Lww::::new(parameters.clone()); let empty_lwwmap = LwwMap::new(); let empty_lwwmap_hash = blake2sum(&rmp_to_vec_all_named(&empty_lwwmap).unwrap()[..]); @@ -124,12 +125,12 @@ impl ClusterLayout { ClusterLayout { version: 0, replication_factor, - partition_size: 0, + partition_size: 0, roles: LwwMap::new(), node_id_vec: Vec::new(), ring_assignation_data: Vec::new(), - parameters, - staged_parameters, + parameters, + staged_parameters, staging: empty_lwwmap, staging_hash: empty_lwwmap_hash, } @@ -142,11 +143,10 @@ impl ClusterLayout { true } Ordering::Equal => { - let param_changed = self.staged_parameters.get() != other.staged_parameters.get(); - self.staged_parameters.merge(&other.staged_parameters); + let param_changed = self.staged_parameters.get() != other.staged_parameters.get(); + self.staged_parameters.merge(&other.staged_parameters); self.staging.merge(&other.staging); - let new_staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]); let stage_changed = new_staging_hash != self.staging_hash; @@ -158,7 +158,7 @@ impl ClusterLayout { } } - pub fn apply_staged_changes(mut self, version: Option) -> Result<(Self,Message), Error> { + pub fn apply_staged_changes(mut self, version: Option) -> Result<(Self, Message), Error> { match version { None => { let error = r#" @@ -177,14 +177,14 @@ To know the correct value of the new layout version, invoke `garage layout show` self.roles.merge(&self.staging); self.roles.retain(|(_, _, v)| v.0.is_some()); - let msg = self.calculate_partition_assignation()?; + let msg = self.calculate_partition_assignation()?; self.staging.clear(); self.staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]); self.version += 1; - Ok((self,msg)) + Ok((self, msg)) } pub fn revert_staged_changes(mut self, version: Option) -> Result { @@ -229,44 +229,52 @@ To know the correct value of the new layout version, invoke `garage layout show` } } - ///Returns the uuids of the non_gateway nodes in 
self.node_id_vec. - pub fn useful_nodes(&self) -> Vec { - let mut result = Vec::::new(); - for uuid in self.node_id_vec.iter() { - match self.node_role(uuid) { - Some(role) if role.capacity != None => result.push(*uuid), - _ => () - } - } - result - } - - ///Given a node uuids, this function returns the label of its zone - pub fn get_node_zone(&self, uuid : &Uuid) -> Result { - match self.node_role(uuid) { - Some(role) => Ok(role.zone.clone()), - _ => Err(Error::Message("The Uuid does not correspond to a node present in the cluster.".into())) - } - } - - ///Given a node uuids, this function returns its capacity or fails if it does not have any - pub fn get_node_capacity(&self, uuid : &Uuid) -> Result { - match self.node_role(uuid) { - Some(NodeRole{capacity : Some(cap), zone: _, tags: _}) => Ok(*cap), - _ => Err(Error::Message("The Uuid does not correspond to a node present in the \ - cluster or this node does not have a positive capacity.".into())) - } - } - - ///Returns the sum of capacities of non gateway nodes in the cluster - pub fn get_total_capacity(&self) -> Result { - let mut total_capacity = 0; - for uuid in self.useful_nodes().iter() { - total_capacity += self.get_node_capacity(uuid)?; - } - Ok(total_capacity) - } + ///Returns the uuids of the non_gateway nodes in self.node_id_vec. + pub fn useful_nodes(&self) -> Vec { + let mut result = Vec::::new(); + for uuid in self.node_id_vec.iter() { + match self.node_role(uuid) { + Some(role) if role.capacity != None => result.push(*uuid), + _ => (), + } + } + result + } + + ///Given a node uuids, this function returns the label of its zone + pub fn get_node_zone(&self, uuid: &Uuid) -> Result { + match self.node_role(uuid) { + Some(role) => Ok(role.zone.clone()), + _ => Err(Error::Message( + "The Uuid does not correspond to a node present in the cluster.".into(), + )), + } + } + + ///Given a node uuids, this function returns its capacity or fails if it does not have any + pub fn get_node_capacity(&self, uuid: &Uuid) -> Result { + match self.node_role(uuid) { + Some(NodeRole { + capacity: Some(cap), + zone: _, + tags: _, + }) => Ok(*cap), + _ => Err(Error::Message( + "The Uuid does not correspond to a node present in the \ + cluster or this node does not have a positive capacity." + .into(), + )), + } + } + ///Returns the sum of capacities of non gateway nodes in the cluster + pub fn get_total_capacity(&self) -> Result { + let mut total_capacity = 0; + for uuid in self.useful_nodes().iter() { + total_capacity += self.get_node_capacity(uuid)?; + } + Ok(total_capacity) + } /// Check a cluster layout for internal consistency /// returns true if consistent, false if error @@ -311,580 +319,689 @@ To know the correct value of the new layout version, invoke `garage layout show` } } - //Check that every partition is associated to distinct nodes - let rf = self.replication_factor; - for p in 0..(1 << PARTITION_BITS) { - let nodes_of_p = self.ring_assignation_data[rf*p..rf*(p+1)].to_vec(); - if nodes_of_p.iter().unique().count() != rf { - return false; - } - //Check that every partition is spread over at least zone_redundancy zones. 
- let zones_of_p = nodes_of_p.iter() - .map(|n| self.get_node_zone(&self.node_id_vec[*n as usize]) - .expect("Zone not found.")); - let redundancy = self.parameters.zone_redundancy; - if zones_of_p.unique().count() < redundancy { - return false; - } - } - - //Check that the nodes capacities is consistent with the stored partitions - let mut node_usage = vec![0; MAX_NODE_NUMBER]; - for n in self.ring_assignation_data.iter() { - node_usage[*n as usize] += 1; - } - for (n, usage) in node_usage.iter().enumerate(){ - if *usage > 0 { - let uuid = self.node_id_vec[n]; - if usage*self.partition_size > self.get_node_capacity(&uuid) - .expect("Critical Error"){ - return false; - } - } - } - - //Check that the partition size stored is the one computed by the asignation - //algorithm. - let cl2 = self.clone(); - let (_ , zone_to_id) = cl2.generate_useful_zone_ids().expect("Critical Error"); - let partition_size = cl2.compute_optimal_partition_size(&zone_to_id).expect("Critical Error"); - if partition_size != self.partition_size { - return false; - } + //Check that every partition is associated to distinct nodes + let rf = self.replication_factor; + for p in 0..(1 << PARTITION_BITS) { + let nodes_of_p = self.ring_assignation_data[rf * p..rf * (p + 1)].to_vec(); + if nodes_of_p.iter().unique().count() != rf { + return false; + } + //Check that every partition is spread over at least zone_redundancy zones. + let zones_of_p = nodes_of_p.iter().map(|n| { + self.get_node_zone(&self.node_id_vec[*n as usize]) + .expect("Zone not found.") + }); + let redundancy = self.parameters.zone_redundancy; + if zones_of_p.unique().count() < redundancy { + return false; + } + } + + //Check that the nodes capacities is consistent with the stored partitions + let mut node_usage = vec![0; MAX_NODE_NUMBER]; + for n in self.ring_assignation_data.iter() { + node_usage[*n as usize] += 1; + } + for (n, usage) in node_usage.iter().enumerate() { + if *usage > 0 { + let uuid = self.node_id_vec[n]; + if usage * self.partition_size + > self.get_node_capacity(&uuid).expect("Critical Error") + { + return false; + } + } + } + //Check that the partition size stored is the one computed by the asignation + //algorithm. + let cl2 = self.clone(); + let (_, zone_to_id) = cl2.generate_useful_zone_ids().expect("Critical Error"); + let partition_size = cl2 + .compute_optimal_partition_size(&zone_to_id) + .expect("Critical Error"); + if partition_size != self.partition_size { + return false; + } true } - } impl ClusterLayout { /// This function calculates a new partition-to-node assignation. /// The computed assignation respects the node replication factor - /// and the zone redundancy parameter It maximizes the capacity of a + /// and the zone redundancy parameter It maximizes the capacity of a /// partition (assuming all partitions have the same size). /// Among such optimal assignation, it minimizes the distance to /// the former assignation (if any) to minimize the amount of /// data to be moved. - /// Staged changes must be merged with nodes roles before calling this function. - pub fn calculate_partition_assignation(&mut self) -> Result { + /// Staged changes must be merged with nodes roles before calling this function. + pub fn calculate_partition_assignation(&mut self) -> Result { //The nodes might have been updated, some might have been deleted. //So we need to first update the list of nodes and retrieve the //assignation. - - //We update the node ids, since the node role list might have changed with the - //changes in the layout. 
We retrieve the old_assignation reframed with the new ids - let old_assignation_opt = self.update_node_id_vec()?; - - let redundancy = self.staged_parameters.get().zone_redundancy; - - - let mut msg = Message::new(); - msg.push(format!("Computation of a new cluster layout where partitions are \ - replicated {} times on at least {} distinct zones.", self.replication_factor, redundancy)); - - //We generate for once numerical ids for the zones of non gateway nodes, - //to use them as indices in the flow graphs. - let (id_to_zone , zone_to_id) = self.generate_useful_zone_ids()?; - - let nb_useful_nodes = self.useful_nodes().len(); - msg.push(format!("The cluster contains {} nodes spread over {} zones.", - nb_useful_nodes, id_to_zone.len())); - if nb_useful_nodes < self.replication_factor{ - return Err(Error::Message(format!("The number of nodes with positive \ + + //We update the node ids, since the node role list might have changed with the + //changes in the layout. We retrieve the old_assignation reframed with the new ids + let old_assignation_opt = self.update_node_id_vec()?; + + let redundancy = self.staged_parameters.get().zone_redundancy; + + let mut msg = Message::new(); + msg.push(format!( + "Computation of a new cluster layout where partitions are \ + replicated {} times on at least {} distinct zones.", + self.replication_factor, redundancy + )); + + //We generate for once numerical ids for the zones of non gateway nodes, + //to use them as indices in the flow graphs. + let (id_to_zone, zone_to_id) = self.generate_useful_zone_ids()?; + + let nb_useful_nodes = self.useful_nodes().len(); + msg.push(format!( + "The cluster contains {} nodes spread over {} zones.", + nb_useful_nodes, + id_to_zone.len() + )); + if nb_useful_nodes < self.replication_factor { + return Err(Error::Message(format!( + "The number of nodes with positive \ capacity ({}) is smaller than the replication factor ({}).", - nb_useful_nodes, self.replication_factor))); - } - if id_to_zone.len() < redundancy { - return Err(Error::Message(format!("The number of zones with non-gateway \ + nb_useful_nodes, self.replication_factor + ))); + } + if id_to_zone.len() < redundancy { + return Err(Error::Message(format!( + "The number of zones with non-gateway \ nodes ({}) is smaller than the redundancy parameter ({})", - id_to_zone.len() , redundancy))); - } - - //We compute the optimal partition size - //Capacities should be given in a unit so that partition size is at least 100. - //In this case, integer rounding plays a marginal role in the percentages of - //optimality. - let partition_size = self.compute_optimal_partition_size(&zone_to_id)?; - - if old_assignation_opt != None { - msg.push(format!("Given the replication and redundancy constraint, the \ + id_to_zone.len(), + redundancy + ))); + } + + //We compute the optimal partition size + //Capacities should be given in a unit so that partition size is at least 100. + //In this case, integer rounding plays a marginal role in the percentages of + //optimality. + let partition_size = self.compute_optimal_partition_size(&zone_to_id)?; + + if old_assignation_opt != None { + msg.push(format!( + "Given the replication and redundancy constraint, the \ optimal size of a partition is {}. 
In the previous layout, it used to \ - be {} (the zone redundancy was {}).", partition_size, self.partition_size, - self.parameters.zone_redundancy)); - } - else { - msg.push(format!("Given the replication and redundancy constraints, the \ - optimal size of a partition is {}.", partition_size)); - } - self.partition_size = partition_size; - self.parameters = self.staged_parameters.get().clone(); - - if partition_size < 100 { - msg.push("WARNING: The partition size is low (< 100), you might consider to \ - provide the nodes capacities in a smaller unit (e.g. Mb instead of Gb).".into()); - } - - //We compute a first flow/assignment that is heuristically close to the previous - //assignment - let mut gflow = self.compute_candidate_assignment( &zone_to_id, &old_assignation_opt)?; - if let Some(assoc) = &old_assignation_opt { - //We minimize the distance to the previous assignment. - self.minimize_rebalance_load(&mut gflow, &zone_to_id, assoc)?; - } - - msg.append(&mut self.output_stat(&gflow, &old_assignation_opt, &zone_to_id,&id_to_zone)?); - msg.push("".to_string()); - - //We update the layout structure - self.update_ring_from_flow(id_to_zone.len() , &gflow)?; - Ok(msg) - } + be {} (the zone redundancy was {}).", + partition_size, self.partition_size, self.parameters.zone_redundancy + )); + } else { + msg.push(format!( + "Given the replication and redundancy constraints, the \ + optimal size of a partition is {}.", + partition_size + )); + } + self.partition_size = partition_size; + self.parameters = self.staged_parameters.get().clone(); + + if partition_size < 100 { + msg.push( + "WARNING: The partition size is low (< 100), you might consider to \ + provide the nodes capacities in a smaller unit (e.g. Mb instead of Gb)." + .into(), + ); + } + + //We compute a first flow/assignment that is heuristically close to the previous + //assignment + let mut gflow = self.compute_candidate_assignment(&zone_to_id, &old_assignation_opt)?; + if let Some(assoc) = &old_assignation_opt { + //We minimize the distance to the previous assignment. + self.minimize_rebalance_load(&mut gflow, &zone_to_id, assoc)?; + } + + msg.append(&mut self.output_stat( + &gflow, + &old_assignation_opt, + &zone_to_id, + &id_to_zone, + )?); + msg.push("".to_string()); + + //We update the layout structure + self.update_ring_from_flow(id_to_zone.len(), &gflow)?; + Ok(msg) + } /// The LwwMap of node roles might have changed. This function updates the node_id_vec /// and returns the assignation given by ring, with the new indices of the nodes, and /// None if the node is not present anymore. /// We work with the assumption that only this function and calculate_new_assignation /// do modify assignation_ring and node_id_vec. - fn update_node_id_vec(&mut self) -> Result< Option< Vec > > ,Error> { - // (1) We compute the new node list - //Non gateway nodes should be coded on 8bits, hence they must be first in the list - //We build the new node ids - let mut new_non_gateway_nodes: Vec = self.roles.items().iter() - .filter(|(_, _, v)| matches!(&v.0, Some(r) if r.capacity != None)) - .map(|(k, _, _)| *k).collect(); - - if new_non_gateway_nodes.len() > MAX_NODE_NUMBER { - return Err(Error::Message(format!("There are more than {} non-gateway nodes in the new \ - layout. 
This is not allowed.", MAX_NODE_NUMBER) )); - } - - let mut new_gateway_nodes: Vec = self.roles.items().iter() - .filter(|(_, _, v)| matches!(v, NodeRoleV(Some(r)) if r.capacity == None)) - .map(|(k, _, _)| *k).collect(); - - let mut new_node_id_vec = Vec::::new(); - new_node_id_vec.append(&mut new_non_gateway_nodes); - new_node_id_vec.append(&mut new_gateway_nodes); - - let old_node_id_vec = self.node_id_vec.clone(); - self.node_id_vec = new_node_id_vec.clone(); - - // (2) We retrieve the old association - //We rewrite the old association with the new indices. We only consider partition - //to node assignations where the node is still in use. - let nb_partitions = 1usize << PARTITION_BITS; - let mut old_assignation = vec![ Vec::::new() ; nb_partitions]; - - if self.ring_assignation_data.is_empty() { - //This is a new association - return Ok(None); - } - if self.ring_assignation_data.len() != nb_partitions * self.replication_factor { - return Err(Error::Message("The old assignation does not have a size corresponding to \ - the old replication factor or the number of partitions.".into())); - } - - //We build a translation table between the uuid and new ids - let mut uuid_to_new_id = HashMap::::new(); - - //We add the indices of only the new non-gateway nodes that can be used in the - //association ring - for (i, uuid) in new_node_id_vec.iter().enumerate() { - uuid_to_new_id.insert(*uuid, i ); - } - - let rf= self.replication_factor; - for (p, old_assign_p) in old_assignation.iter_mut().enumerate() { - for old_id in &self.ring_assignation_data[p*rf..(p+1)*rf] { - let uuid = old_node_id_vec[*old_id as usize]; - if uuid_to_new_id.contains_key(&uuid) { - old_assign_p.push(uuid_to_new_id[&uuid]); - } - } - } - - //We write the ring - self.ring_assignation_data = Vec::::new(); - - if !self.check() { - return Err(Error::Message("Critical error: The computed layout happens to be incorrect".into())); - } - - Ok(Some(old_assignation)) + fn update_node_id_vec(&mut self) -> Result>>, Error> { + // (1) We compute the new node list + //Non gateway nodes should be coded on 8bits, hence they must be first in the list + //We build the new node ids + let mut new_non_gateway_nodes: Vec = self + .roles + .items() + .iter() + .filter(|(_, _, v)| matches!(&v.0, Some(r) if r.capacity != None)) + .map(|(k, _, _)| *k) + .collect(); + + if new_non_gateway_nodes.len() > MAX_NODE_NUMBER { + return Err(Error::Message(format!( + "There are more than {} non-gateway nodes in the new \ + layout. This is not allowed.", + MAX_NODE_NUMBER + ))); + } + + let mut new_gateway_nodes: Vec = self + .roles + .items() + .iter() + .filter(|(_, _, v)| matches!(v, NodeRoleV(Some(r)) if r.capacity == None)) + .map(|(k, _, _)| *k) + .collect(); + + let mut new_node_id_vec = Vec::::new(); + new_node_id_vec.append(&mut new_non_gateway_nodes); + new_node_id_vec.append(&mut new_gateway_nodes); + + let old_node_id_vec = self.node_id_vec.clone(); + self.node_id_vec = new_node_id_vec.clone(); + + // (2) We retrieve the old association + //We rewrite the old association with the new indices. We only consider partition + //to node assignations where the node is still in use. 
+ let nb_partitions = 1usize << PARTITION_BITS; + let mut old_assignation = vec![Vec::::new(); nb_partitions]; + + if self.ring_assignation_data.is_empty() { + //This is a new association + return Ok(None); + } + if self.ring_assignation_data.len() != nb_partitions * self.replication_factor { + return Err(Error::Message( + "The old assignation does not have a size corresponding to \ + the old replication factor or the number of partitions." + .into(), + )); + } + + //We build a translation table between the uuid and new ids + let mut uuid_to_new_id = HashMap::::new(); + + //We add the indices of only the new non-gateway nodes that can be used in the + //association ring + for (i, uuid) in new_node_id_vec.iter().enumerate() { + uuid_to_new_id.insert(*uuid, i); + } + + let rf = self.replication_factor; + for (p, old_assign_p) in old_assignation.iter_mut().enumerate() { + for old_id in &self.ring_assignation_data[p * rf..(p + 1) * rf] { + let uuid = old_node_id_vec[*old_id as usize]; + if uuid_to_new_id.contains_key(&uuid) { + old_assign_p.push(uuid_to_new_id[&uuid]); + } + } + } + + //We write the ring + self.ring_assignation_data = Vec::::new(); + + if !self.check() { + return Err(Error::Message( + "Critical error: The computed layout happens to be incorrect".into(), + )); + } + + Ok(Some(old_assignation)) } + ///This function generates ids for the zone of the nodes appearing in + ///self.node_id_vec. + fn generate_useful_zone_ids(&self) -> Result<(Vec, HashMap), Error> { + let mut id_to_zone = Vec::::new(); + let mut zone_to_id = HashMap::::new(); + + for uuid in self.useful_nodes().iter() { + if self.roles.get(uuid) == None { + return Err(Error::Message( + "The uuid was not found in the node roles (this should \ + not happen, it might be a critical error)." + .into(), + )); + } + if let Some(r) = self.node_role(uuid) { + if !zone_to_id.contains_key(&r.zone) && r.capacity != None { + zone_to_id.insert(r.zone.clone(), id_to_zone.len()); + id_to_zone.push(r.zone.clone()); + } + } + } + Ok((id_to_zone, zone_to_id)) + } - ///This function generates ids for the zone of the nodes appearing in - ///self.node_id_vec. - fn generate_useful_zone_ids(&self) -> Result<(Vec, HashMap),Error>{ - let mut id_to_zone = Vec::::new(); - let mut zone_to_id = HashMap::::new(); - - for uuid in self.useful_nodes().iter() { - if self.roles.get(uuid) == None { - return Err(Error::Message("The uuid was not found in the node roles (this should \ - not happen, it might be a critical error).".into())); - } - if let Some(r) = self.node_role(uuid) { - if !zone_to_id.contains_key(&r.zone) && r.capacity != None { - zone_to_id.insert(r.zone.clone() , id_to_zone.len()); - id_to_zone.push(r.zone.clone()); - } - } - } - Ok((id_to_zone, zone_to_id)) - } - - ///This function computes by dichotomy the largest realizable partition size, given - ///the layout. - fn compute_optimal_partition_size(&self, zone_to_id: &HashMap) -> Result{ - let nb_partitions = 1usize << PARTITION_BITS; - let empty_set = HashSet::<(usize,usize)>::new(); - let mut g = self.generate_flow_graph(1, zone_to_id, &empty_set)?; - g.compute_maximal_flow()?; - if g.get_flow_value()? < (nb_partitions*self.replication_factor).try_into().unwrap() { - return Err(Error::Message("The storage capacity of he cluster is to small. 
It is \ - impossible to store partitions of size 1.".into())); - } - - let mut s_down = 1; - let mut s_up = self.get_total_capacity()?; - while s_down +1 < s_up { - g = self.generate_flow_graph((s_down+s_up)/2, zone_to_id, &empty_set)?; - g.compute_maximal_flow()?; - if g.get_flow_value()? < (nb_partitions*self.replication_factor).try_into().unwrap() { - s_up = (s_down+s_up)/2; - } - else { - s_down = (s_down+s_up)/2; - } - } - - Ok(s_down) - } - - fn generate_graph_vertices(nb_zones : usize, nb_nodes : usize) -> Vec { - let mut vertices = vec![Vertex::Source, Vertex::Sink]; - for p in 0..NB_PARTITIONS { - vertices.push(Vertex::Pup(p)); - vertices.push(Vertex::Pdown(p)); - for z in 0..nb_zones { - vertices.push(Vertex::PZ(p, z)); - } - } - for n in 0..nb_nodes { - vertices.push(Vertex::N(n)); - } - vertices - } - - fn generate_flow_graph(&self, size: u32, zone_to_id: &HashMap, exclude_assoc : &HashSet<(usize,usize)>) -> Result, Error> { - let vertices = ClusterLayout::generate_graph_vertices(zone_to_id.len(), - self.useful_nodes().len()); - let mut g= Graph::::new(&vertices); - let nb_zones = zone_to_id.len(); - let redundancy = self.staged_parameters.get().zone_redundancy; - for p in 0..NB_PARTITIONS { - g.add_edge(Vertex::Source, Vertex::Pup(p), redundancy as u32)?; - g.add_edge(Vertex::Source, Vertex::Pdown(p), (self.replication_factor - redundancy) as u32)?; - for z in 0..nb_zones { - g.add_edge(Vertex::Pup(p) , Vertex::PZ(p,z) , 1)?; - g.add_edge(Vertex::Pdown(p) , Vertex::PZ(p,z) , - self.replication_factor as u32)?; - } - } - for n in 0..self.useful_nodes().len() { - let node_capacity = self.get_node_capacity(&self.node_id_vec[n])?; - let node_zone = zone_to_id[&self.get_node_zone(&self.node_id_vec[n])?]; - g.add_edge(Vertex::N(n), Vertex::Sink, node_capacity/size)?; - for p in 0..NB_PARTITIONS { - if !exclude_assoc.contains(&(p,n)) { - g.add_edge(Vertex::PZ(p, node_zone), Vertex::N(n), 1)?; - } - } - } - Ok(g) - } - - - fn compute_candidate_assignment(&self, zone_to_id: &HashMap, - old_assoc_opt : &Option >>) -> Result, Error > { - - //We list the edges that are not used in the old association - let mut exclude_edge = HashSet::<(usize,usize)>::new(); - if let Some(old_assoc) = old_assoc_opt { - let nb_nodes = self.useful_nodes().len(); - for (p, old_assoc_p) in old_assoc.iter().enumerate() { - for n in 0..nb_nodes { - exclude_edge.insert((p,n)); - } - for n in old_assoc_p.iter() { - exclude_edge.remove(&(p,*n)); - } - } - } - - //We compute the best flow using only the edges used in the old assoc - let mut g = self.generate_flow_graph(self.partition_size, zone_to_id, &exclude_edge )?; - g.compute_maximal_flow()?; - for (p,n) in exclude_edge.iter() { - let node_zone = zone_to_id[&self.get_node_zone(&self.node_id_vec[*n])?]; - g.add_edge(Vertex::PZ(*p,node_zone), Vertex::N(*n), 1)?; - } - g.compute_maximal_flow()?; - Ok(g) - } - - fn minimize_rebalance_load(&self, gflow: &mut Graph, zone_to_id: &HashMap, old_assoc : &[Vec ]) -> Result<(), Error > { - let mut cost = CostFunction::new(); - for (p, assoc_p) in old_assoc.iter().enumerate(){ - for n in assoc_p.iter() { - let node_zone = zone_to_id[&self.get_node_zone(&self.node_id_vec[*n])?]; - cost.insert((Vertex::PZ(p,node_zone), Vertex::N(*n)), -1); - } - } - let nb_nodes = self.useful_nodes().len(); - let path_length = 4*nb_nodes; - gflow.optimize_flow_with_cost(&cost, path_length)?; - - Ok(()) - } - - fn update_ring_from_flow(&mut self, nb_zones : usize, gflow: &Graph ) -> Result<(), Error>{ - self.ring_assignation_data = 
-		for p in 0..NB_PARTITIONS {
-			for z in 0..nb_zones {
-				let assoc_vertex = gflow.get_positive_flow_from(Vertex::PZ(p,z))?;
-				for vertex in assoc_vertex.iter() {
-					if let Vertex::N(n) = vertex {
-						self.ring_assignation_data.push((*n).try_into().unwrap());
-					}
-				}
-			}
-		}
-
-		if self.ring_assignation_data.len() != NB_PARTITIONS*self.replication_factor {
-			return Err(Error::Message("Critical Error : the association ring we produced does not \
-				have the right size.".into()));
-		}
-		Ok(())
-	}
-
-
-	//This function returns a message summing up the partition repartition of the new
-	//layout.
-	fn output_stat(&self , gflow : &Graph<FlowEdge>,
-		old_assoc_opt : &Option<Vec<Vec<usize>>>,
-		zone_to_id: &HashMap<String, usize>,
-		id_to_zone : &[String]) -> Result<Message, Error>{
-		let mut msg = Message::new();
-
+	///This function computes by dichotomy the largest realizable partition size, given
+	///the layout.
+	fn compute_optimal_partition_size(
+		&self,
+		zone_to_id: &HashMap<String, usize>,
+	) -> Result<u32, Error> {
 		let nb_partitions = 1usize << PARTITION_BITS;
-		let used_cap = self.partition_size * nb_partitions as u32 *
-			self.replication_factor as u32;
-		let total_cap = self.get_total_capacity()?;
-		let percent_cap = 100.0*(used_cap as f32)/(total_cap as f32);
-		msg.push(format!("Available capacity / Total cluster capacity: {} / {} ({:.1} %)",
-			used_cap , total_cap , percent_cap ));
-		msg.push("".into());
-		msg.push("If the percentage is to low, it might be that the \
+		let empty_set = HashSet::<(usize, usize)>::new();
+		let mut g = self.generate_flow_graph(1, zone_to_id, &empty_set)?;
+		g.compute_maximal_flow()?;
+		if g.get_flow_value()?
+			< (nb_partitions * self.replication_factor)
+				.try_into()
+				.unwrap()
+		{
+			return Err(Error::Message(
+				"The storage capacity of the cluster is too small. It is \
+				impossible to store partitions of size 1."
+					.into(),
+			));
+		}
+
+		let mut s_down = 1;
+		let mut s_up = self.get_total_capacity()?;
+		while s_down + 1 < s_up {
+			g = self.generate_flow_graph((s_down + s_up) / 2, zone_to_id, &empty_set)?;
+			g.compute_maximal_flow()?;
+			if g.get_flow_value()?
+				< (nb_partitions * self.replication_factor)
+					.try_into()
+					.unwrap()
+			{
+				s_up = (s_down + s_up) / 2;
+			} else {
+				s_down = (s_down + s_up) / 2;
+			}
+		}
+
+		Ok(s_down)
+	}
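
The dichotomy above is a plain binary search over the partition size. It is sound
because feasibility is monotone: if a flow saturating every partition exists for some
size, it also exists for any smaller size, since the per-node capacities
node_capacity / size only grow as size shrinks. A minimal standalone sketch of the
same loop, with a stand-in feasible closure in place of the max-flow computation
(largest_feasible and feasible are illustrative names, not part of this patch):

    /// Largest s in [1, upper) such that feasible(s) holds, assuming
    /// feasible(1) is true and feasible is monotone (true, then false).
    fn largest_feasible(upper: u32, feasible: impl Fn(u32) -> bool) -> u32 {
        let (mut s_down, mut s_up) = (1, upper);
        while s_down + 1 < s_up {
            // Invariant: s_down is always feasible, s_up never shown feasible.
            let mid = (s_down + s_up) / 2;
            if feasible(mid) {
                s_down = mid;
            } else {
                s_up = mid;
            }
        }
        s_down
    }

    fn main() {
        // With a feasibility threshold of 337, the search returns exactly 337.
        assert_eq!(largest_feasible(1_000_000, |s| s <= 337), 337);
    }

As in compute_optimal_partition_size, the lower bound stays feasible throughout, so
the returned value can be used directly as the partition size.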
+
+	fn generate_graph_vertices(nb_zones: usize, nb_nodes: usize) -> Vec<Vertex> {
+		let mut vertices = vec![Vertex::Source, Vertex::Sink];
+		for p in 0..NB_PARTITIONS {
+			vertices.push(Vertex::Pup(p));
+			vertices.push(Vertex::Pdown(p));
+			for z in 0..nb_zones {
+				vertices.push(Vertex::PZ(p, z));
+			}
+		}
+		for n in 0..nb_nodes {
+			vertices.push(Vertex::N(n));
+		}
+		vertices
+	}
+
+	fn generate_flow_graph(
+		&self,
+		size: u32,
+		zone_to_id: &HashMap<String, usize>,
+		exclude_assoc: &HashSet<(usize, usize)>,
+	) -> Result<Graph<FlowEdge>, Error> {
+		let vertices =
+			ClusterLayout::generate_graph_vertices(zone_to_id.len(), self.useful_nodes().len());
+		let mut g = Graph::<FlowEdge>::new(&vertices);
+		let nb_zones = zone_to_id.len();
+		let redundancy = self.staged_parameters.get().zone_redundancy;
+		for p in 0..NB_PARTITIONS {
+			g.add_edge(Vertex::Source, Vertex::Pup(p), redundancy as u32)?;
+			g.add_edge(
+				Vertex::Source,
+				Vertex::Pdown(p),
+				(self.replication_factor - redundancy) as u32,
+			)?;
+			for z in 0..nb_zones {
+				g.add_edge(Vertex::Pup(p), Vertex::PZ(p, z), 1)?;
+				g.add_edge(
+					Vertex::Pdown(p),
+					Vertex::PZ(p, z),
+					self.replication_factor as u32,
+				)?;
+			}
+		}
+		for n in 0..self.useful_nodes().len() {
+			let node_capacity = self.get_node_capacity(&self.node_id_vec[n])?;
+			let node_zone = zone_to_id[&self.get_node_zone(&self.node_id_vec[n])?];
+			g.add_edge(Vertex::N(n), Vertex::Sink, node_capacity / size)?;
+			for p in 0..NB_PARTITIONS {
+				if !exclude_assoc.contains(&(p, n)) {
+					g.add_edge(Vertex::PZ(p, node_zone), Vertex::N(n), 1)?;
+				}
+			}
+		}
+		Ok(g)
+	}
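
To unpack the construction above: for every partition p, the source pushes
zone_redundancy flow units through Pup(p), and each Pup(p) -> PZ(p, z) edge has
capacity 1, which forces those units into distinct zones. The remaining
replication_factor - zone_redundancy units go through Pdown(p), whose edges to the
zones are wide enough not to constrain zone choice. Each PZ(p, z) -> N(n) edge of
capacity 1 picks a node of that zone, and N(n) -> Sink is capped at
node_capacity / size, bounding how many partitions a node can hold at the candidate
size. A complete assignation therefore corresponds exactly to a flow of value
nb_partitions * replication_factor. A sketch of that budget with illustrative
parameters (PARTITION_BITS is 8 in Garage, hence 256 partitions):

    fn main() {
        // Illustrative parameters: 3-way replication, zone redundancy 2.
        let replication_factor: u32 = 3;
        let zone_redundancy: u32 = 2;
        let nb_partitions: u32 = 1 << 8;

        // Per partition: zone_redundancy units cross Pup (distinct zones),
        // the rest cross Pdown, for replication_factor units in total.
        let per_partition = zone_redundancy + (replication_factor - zone_redundancy);
        assert_eq!(per_partition, replication_factor);

        // A full assignation is a flow saturating every partition:
        assert_eq!(nb_partitions * per_partition, 768);
    }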
+
+	fn compute_candidate_assignment(
+		&self,
+		zone_to_id: &HashMap<String, usize>,
+		old_assoc_opt: &Option<Vec<Vec<usize>>>,
+	) -> Result<Graph<FlowEdge>, Error> {
+		//We list the edges that are not used in the old association
+		let mut exclude_edge = HashSet::<(usize, usize)>::new();
+		if let Some(old_assoc) = old_assoc_opt {
+			let nb_nodes = self.useful_nodes().len();
+			for (p, old_assoc_p) in old_assoc.iter().enumerate() {
+				for n in 0..nb_nodes {
+					exclude_edge.insert((p, n));
+				}
+				for n in old_assoc_p.iter() {
+					exclude_edge.remove(&(p, *n));
+				}
+			}
+		}
+
+		//We compute the best flow using only the edges used in the old assoc
+		let mut g = self.generate_flow_graph(self.partition_size, zone_to_id, &exclude_edge)?;
+		g.compute_maximal_flow()?;
+		for (p, n) in exclude_edge.iter() {
+			let node_zone = zone_to_id[&self.get_node_zone(&self.node_id_vec[*n])?];
+			g.add_edge(Vertex::PZ(*p, node_zone), Vertex::N(*n), 1)?;
+		}
+		g.compute_maximal_flow()?;
+		Ok(g)
+	}
+
+	fn minimize_rebalance_load(
+		&self,
+		gflow: &mut Graph<FlowEdge>,
+		zone_to_id: &HashMap<String, usize>,
+		old_assoc: &[Vec<usize>],
+	) -> Result<(), Error> {
+		let mut cost = CostFunction::new();
+		for (p, assoc_p) in old_assoc.iter().enumerate() {
+			for n in assoc_p.iter() {
+				let node_zone = zone_to_id[&self.get_node_zone(&self.node_id_vec[*n])?];
+				cost.insert((Vertex::PZ(p, node_zone), Vertex::N(*n)), -1);
+			}
+		}
+		let nb_nodes = self.useful_nodes().len();
+		let path_length = 4 * nb_nodes;
+		gflow.optimize_flow_with_cost(&cost, path_length)?;
+
+		Ok(())
+	}
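
The cost function above assigns -1 to every PZ -> N edge that carries a
(partition, node) association of the old layout, so the min-cost pass keeps as many
old placements as flow conservation allows; equivalently, it minimizes the number of
partition copies that have to be transferred between nodes. A minimal sketch of the
quantity being minimized (count_transfers is an illustrative helper, not part of
this patch):

    // Number of (partition, node) pairs present in the new assignation but
    // absent from the old one, i.e. partition copies that must be moved.
    fn count_transfers(old: &[Vec<usize>], new: &[Vec<usize>]) -> usize {
        old.iter()
            .zip(new.iter())
            .map(|(o, n)| n.iter().filter(|&node| !o.contains(node)).count())
            .sum()
    }

    fn main() {
        let old = vec![vec![0, 1, 2], vec![0, 2, 3]];
        let new = vec![vec![0, 1, 3], vec![0, 2, 3]];
        // Partition 0 replaced node 2 by node 3: one copy is transferred.
        assert_eq!(count_transfers(&old, &new), 1);
    }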
+
+	fn update_ring_from_flow(
+		&mut self,
+		nb_zones: usize,
+		gflow: &Graph<FlowEdge>,
+	) -> Result<(), Error> {
+		self.ring_assignation_data = Vec::<CompactNodeType>::new();
+		for p in 0..NB_PARTITIONS {
+			for z in 0..nb_zones {
+				let assoc_vertex = gflow.get_positive_flow_from(Vertex::PZ(p, z))?;
+				for vertex in assoc_vertex.iter() {
+					if let Vertex::N(n) = vertex {
+						self.ring_assignation_data.push((*n).try_into().unwrap());
+					}
+				}
+			}
+		}
+
+		if self.ring_assignation_data.len() != NB_PARTITIONS * self.replication_factor {
+			return Err(Error::Message(
+				"Critical Error : the association ring we produced does not \
+				have the right size."
+					.into(),
+			));
+		}
+		Ok(())
+	}
+
+	//This function returns a message summarizing the partition distribution of the
+	//new layout.
+	fn output_stat(
+		&self,
+		gflow: &Graph<FlowEdge>,
+		old_assoc_opt: &Option<Vec<Vec<usize>>>,
+		zone_to_id: &HashMap<String, usize>,
+		id_to_zone: &[String],
+	) -> Result<Message, Error> {
+		let mut msg = Message::new();
+
+		let nb_partitions = 1usize << PARTITION_BITS;
+		let used_cap = self.partition_size * nb_partitions as u32 * self.replication_factor as u32;
+		let total_cap = self.get_total_capacity()?;
+		let percent_cap = 100.0 * (used_cap as f32) / (total_cap as f32);
+		msg.push(format!(
+			"Available capacity / Total cluster capacity: {} / {} ({:.1} %)",
+			used_cap, total_cap, percent_cap
+		));
+		msg.push("".into());
+		msg.push(
+			"If the percentage is too low, it might be that the \
 replication/redundancy constraints force the use of nodes/zones with small \
 storage capacities. \
 You might want to rebalance the storage capacities or relax the constraints. \
-See the detailed statistics below and look for saturated nodes/zones.".into());
-		msg.push(format!("Recall that because of the replication factor, the actual available \
-			storage capacity is {} / {} = {}.",
-			used_cap , self.replication_factor ,
-			used_cap/self.replication_factor as u32));
-
-		//We define and fill in the following tables
-		let storing_nodes = self.useful_nodes();
-		let mut new_partitions = vec![0; storing_nodes.len()];
-		let mut stored_partitions = vec![0; storing_nodes.len()];
-
-		let mut new_partitions_zone = vec![0; id_to_zone.len()];
-		let mut stored_partitions_zone = vec![0; id_to_zone.len()];
-
-		for p in 0..nb_partitions {
-			for z in 0..id_to_zone.len() {
-				let pz_nodes = gflow.get_positive_flow_from(Vertex::PZ(p,z))?;
-				if !pz_nodes.is_empty() {
-					stored_partitions_zone[z] += 1;
-					if let Some(old_assoc) = old_assoc_opt {
-						let mut old_zones_of_p = Vec::<usize>::new();
-						for n in old_assoc[p].iter() {
-							old_zones_of_p.push(
-								zone_to_id[&self.get_node_zone(&self.node_id_vec[*n])?]);
-						}
-						if !old_zones_of_p.contains(&z) {
-							new_partitions_zone[z] += 1;
-						}
-					}
-				}
-				for vert in pz_nodes.iter() {
-					if let Vertex::N(n) = *vert {
-						stored_partitions[n] += 1;
-						if let Some(old_assoc) = old_assoc_opt {
-							if !old_assoc[p].contains(&n) {
-								new_partitions[n] += 1;
-							}
-						}
-					}
-				}
-			}
-		}
-
-		if *old_assoc_opt == None {
-			new_partitions = stored_partitions.clone();
-			new_partitions_zone = stored_partitions_zone.clone();
-		}
-
-		//We display the statistics
-
-		msg.push("".into());
-		if *old_assoc_opt != None {
-			let total_new_partitions : usize = new_partitions.iter().sum();
-			msg.push(format!("A total of {} new copies of partitions need to be \
-				transferred.", total_new_partitions));
-		}
-		msg.push("".into());
-		msg.push("==== DETAILED STATISTICS BY ZONES AND NODES ====".into());
-
-		for z in 0..id_to_zone.len(){
-			let mut nodes_of_z = Vec::<usize>::new();
-			for n in 0..storing_nodes.len(){
-				if self.get_node_zone(&self.node_id_vec[n])? == id_to_zone[z] {
-					nodes_of_z.push(n);
-				}
-			}
-			let replicated_partitions : usize = nodes_of_z.iter()
-				.map(|n| stored_partitions[*n]).sum();
-			msg.push("".into());
-
-			msg.push(format!("Zone {}: {} distinct partitions stored ({} new, \
-				{} partition copies) ", id_to_zone[z], stored_partitions_zone[z],
-				new_partitions_zone[z], replicated_partitions));
-
-			let available_cap_z : u32 = self.partition_size*replicated_partitions as u32;
-			let mut total_cap_z = 0;
-			for n in nodes_of_z.iter() {
-				total_cap_z += self.get_node_capacity(&self.node_id_vec[*n])?;
-			}
-			let percent_cap_z = 100.0*(available_cap_z as f32)/(total_cap_z as f32);
-			msg.push(format!("  Available capacity / Total capacity: {}/{} ({:.1}%).",
-				available_cap_z, total_cap_z, percent_cap_z));
-
-			for n in nodes_of_z.iter() {
-				let available_cap_n = stored_partitions[*n] as u32 *self.partition_size;
-				let total_cap_n =self.get_node_capacity(&self.node_id_vec[*n])?;
-				let tags_n = (self.node_role(&self.node_id_vec[*n])
-					.ok_or("Node not found."))?.tags_string();
-				msg.push(format!("    Node {}: {} partitions ({} new) ; \
-					available/total capacity: {} / {} ({:.1}%) ; tags:{}",
-					&self.node_id_vec[*n].to_vec()[0..2].to_vec().encode_hex::<String>(),
-					stored_partitions[*n],
-					new_partitions[*n], available_cap_n, total_cap_n,
-					(available_cap_n as f32)/(total_cap_n as f32)*100.0 ,
-					tags_n));
-			}
-		}
-
-		Ok(msg)
-	}
-
+			See the detailed statistics below and look for saturated nodes/zones."
+				.into(),
+		);
+		msg.push(format!(
+			"Recall that because of the replication factor, the actual available \
+			storage capacity is {} / {} = {}.",
+			used_cap,
+			self.replication_factor,
+			used_cap / self.replication_factor as u32
+		));
+
+		//We define and fill in the following tables
+		let storing_nodes = self.useful_nodes();
+		let mut new_partitions = vec![0; storing_nodes.len()];
+		let mut stored_partitions = vec![0; storing_nodes.len()];
+
+		let mut new_partitions_zone = vec![0; id_to_zone.len()];
+		let mut stored_partitions_zone = vec![0; id_to_zone.len()];
+
+		for p in 0..nb_partitions {
+			for z in 0..id_to_zone.len() {
+				let pz_nodes = gflow.get_positive_flow_from(Vertex::PZ(p, z))?;
+				if !pz_nodes.is_empty() {
+					stored_partitions_zone[z] += 1;
+					if let Some(old_assoc) = old_assoc_opt {
+						let mut old_zones_of_p = Vec::<usize>::new();
+						for n in old_assoc[p].iter() {
+							old_zones_of_p
+								.push(zone_to_id[&self.get_node_zone(&self.node_id_vec[*n])?]);
+						}
+						if !old_zones_of_p.contains(&z) {
+							new_partitions_zone[z] += 1;
+						}
+					}
+				}
+				for vert in pz_nodes.iter() {
+					if let Vertex::N(n) = *vert {
+						stored_partitions[n] += 1;
+						if let Some(old_assoc) = old_assoc_opt {
+							if !old_assoc[p].contains(&n) {
+								new_partitions[n] += 1;
+							}
+						}
+					}
+				}
+			}
+		}
+
+		if *old_assoc_opt == None {
+			new_partitions = stored_partitions.clone();
+			new_partitions_zone = stored_partitions_zone.clone();
+		}
+
+		//We display the statistics
+
+		msg.push("".into());
+		if *old_assoc_opt != None {
+			let total_new_partitions: usize = new_partitions.iter().sum();
+			msg.push(format!(
+				"A total of {} new copies of partitions need to be \
+				transferred.",
+				total_new_partitions
+			));
+		}
+		msg.push("".into());
+		msg.push("==== DETAILED STATISTICS BY ZONES AND NODES ====".into());
+
+		for z in 0..id_to_zone.len() {
+			let mut nodes_of_z = Vec::<usize>::new();
+			for n in 0..storing_nodes.len() {
+				if self.get_node_zone(&self.node_id_vec[n])? == id_to_zone[z] {
+					nodes_of_z.push(n);
+				}
+			}
+			let replicated_partitions: usize =
+				nodes_of_z.iter().map(|n| stored_partitions[*n]).sum();
+			msg.push("".into());
+
+			msg.push(format!(
+				"Zone {}: {} distinct partitions stored ({} new, \
+				{} partition copies) ",
+				id_to_zone[z],
+				stored_partitions_zone[z],
+				new_partitions_zone[z],
+				replicated_partitions
+			));
+
+			let available_cap_z: u32 = self.partition_size * replicated_partitions as u32;
+			let mut total_cap_z = 0;
+			for n in nodes_of_z.iter() {
+				total_cap_z += self.get_node_capacity(&self.node_id_vec[*n])?;
+			}
+			let percent_cap_z = 100.0 * (available_cap_z as f32) / (total_cap_z as f32);
+			msg.push(format!(
+				"  Available capacity / Total capacity: {}/{} ({:.1}%).",
+				available_cap_z, total_cap_z, percent_cap_z
+			));
+
+			for n in nodes_of_z.iter() {
+				let available_cap_n = stored_partitions[*n] as u32 * self.partition_size;
+				let total_cap_n = self.get_node_capacity(&self.node_id_vec[*n])?;
+				let tags_n = (self
+					.node_role(&self.node_id_vec[*n])
+					.ok_or("Node not found."))?
+				.tags_string();
+				msg.push(format!(
+					"    Node {}: {} partitions ({} new) ; \
+					available/total capacity: {} / {} ({:.1}%) ; tags:{}",
+					&self.node_id_vec[*n].to_vec()[0..2]
+						.to_vec()
+						.encode_hex::<String>(),
+					stored_partitions[*n],
+					new_partitions[*n],
+					available_cap_n,
+					total_cap_n,
+					(available_cap_n as f32) / (total_cap_n as f32) * 100.0,
+					tags_n
+				));
+			}
+		}
+
+		Ok(msg)
+	}
 }
 
 //====================================================================================
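
The headline figures printed by output_stat follow directly from the optimal
partition size: each of the nb_partitions partitions stores replication_factor
copies of partition_size capacity units, and the message then divides by the
replication factor to get the capacity usable for distinct data. A worked example
with illustrative numbers:

    fn main() {
        // Illustrative parameters: 256 partitions, 3-way replication,
        // optimal partition size of 1000 capacity units.
        let nb_partitions: u32 = 256;
        let replication_factor: u32 = 3;
        let partition_size: u32 = 1000;

        // used_cap as computed at the top of output_stat:
        let used_cap = partition_size * nb_partitions * replication_factor;
        assert_eq!(used_cap, 768_000);

        // Capacity actually available for distinct data:
        assert_eq!(used_cap / replication_factor, 256_000);
    }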
 
 #[cfg(test)]
 mod tests {
-	use super::{*,Error};
-	use std::cmp::min;
-
-
-	//This function checks that the partition size S computed is at least better than the
-	//one given by a very naive algorithm. To do so, we try to run the naive algorithm
-	//assuming a partion size of S+1. If we succed, it means that the optimal assignation
-	//was not optimal. The naive algorithm is the following :
-	//- we compute the max number of partitions associated to every node, capped at the
-	//partition number. It gives the number of tokens of every node.
-	//- every zone has a number of tokens equal to the sum of the tokens of its nodes.
-	//- we cycle over the partitions and associate zone tokens while respecting the
-	//zone redundancy constraint.
-	//NOTE: the naive algorithm is not optimal. Counter example:
-	//take nb_partition = 3 ; replication_factor = 5; redundancy = 4;
-	//number of tokens by zone : (A, 4), (B,1), (C,4), (D, 4), (E, 2)
-	//With these parameters, the naive algo fails, whereas there is a solution:
-	//(A,A,C,D,E) , (A,B,C,D,D) (A,C,C,D,E)
-	fn check_against_naive(cl: &ClusterLayout) -> Result<bool, Error> {
-		let over_size = cl.partition_size +1;
-		let mut zone_token = HashMap::<String, usize>::new();
-		let nb_partitions = 1usize << PARTITION_BITS;
-
-		let (zones, zone_to_id) = cl.generate_useful_zone_ids()?;
-
-		if zones.is_empty() {
-			return Ok(false);
-		}
-
-		for z in zones.iter() {
-			zone_token.insert(z.clone(), 0);
-		}
-		for uuid in cl.useful_nodes().iter() {
-			let z = cl.get_node_zone(uuid)?;
-			let c = cl.get_node_capacity(uuid)?;
-			zone_token.insert(z.clone(), zone_token[&z] + min(nb_partitions , (c/over_size) as usize));
-		}
-
-		//For every partition, we count the number of zone already associated and
-		//the name of the last zone associated
-
-		let mut id_zone_token = vec![0; zones.len()];
-		for (z,t) in zone_token.iter() {
-			id_zone_token[zone_to_id[z]] = *t;
-		}
-
-		let mut nb_token = vec![0; nb_partitions];
-		let mut last_zone = vec![zones.len(); nb_partitions];
-
-		let mut curr_zone = 0;
-
-		let redundancy = cl.parameters.zone_redundancy;
-
-		for replic in 0..cl.replication_factor {
-			for p in 0..nb_partitions {
-				while id_zone_token[curr_zone] == 0 ||
-					(last_zone[p] == curr_zone
-					&& redundancy - nb_token[p] <= cl.replication_factor - replic) {
-					curr_zone += 1;
-					if curr_zone >= zones.len() {
-						return Ok(true);
-					}
-				}
-				id_zone_token[curr_zone] -= 1;
-				if last_zone[p] != curr_zone {
-					nb_token[p] += 1;
-					last_zone[p] = curr_zone;
-				}
-			}
-		}
-
-		return Ok(false);
-	}
-
-	fn show_msg(msg : &Message) {
-		for s in msg.iter(){
-			println!("{}",s);
-		}
-	}
+	use super::{Error, *};
+	use std::cmp::min;
+
+	//This function checks that the partition size S computed is at least better than the
+	//one given by a very naive algorithm. To do so, we try to run the naive algorithm
+	//assuming a partition size of S+1. If we succeed, it means that the computed
+	//assignation was not optimal. The naive algorithm is the following:
+	//- we compute the max number of partitions associated to every node, capped at the
+	//partition number. It gives the number of tokens of every node.
+	//- every zone has a number of tokens equal to the sum of the tokens of its nodes.
+	//- we cycle over the partitions and associate zone tokens while respecting the
+	//zone redundancy constraint.
+	//NOTE: the naive algorithm is not optimal. Counter example:
+	//take nb_partition = 3 ; replication_factor = 5; redundancy = 4;
+	//number of tokens by zone : (A, 4), (B,1), (C,4), (D, 4), (E, 2)
+	//With these parameters, the naive algo fails, whereas there is a solution:
+	//(A,A,C,D,E) , (A,B,C,D,D) (A,C,C,D,E)
+	fn check_against_naive(cl: &ClusterLayout) -> Result<bool, Error> {
+		let over_size = cl.partition_size + 1;
+		let mut zone_token = HashMap::<String, usize>::new();
+		let nb_partitions = 1usize << PARTITION_BITS;
+
+		let (zones, zone_to_id) = cl.generate_useful_zone_ids()?;
+
+		if zones.is_empty() {
+			return Ok(false);
+		}
+
+		for z in zones.iter() {
+			zone_token.insert(z.clone(), 0);
+		}
+		for uuid in cl.useful_nodes().iter() {
+			let z = cl.get_node_zone(uuid)?;
+			let c = cl.get_node_capacity(uuid)?;
+			zone_token.insert(
+				z.clone(),
+				zone_token[&z] + min(nb_partitions, (c / over_size) as usize),
+			);
+		}
+
+		//For every partition, we count the number of zones already associated and
+		//the name of the last zone associated
+
+		let mut id_zone_token = vec![0; zones.len()];
+		for (z, t) in zone_token.iter() {
+			id_zone_token[zone_to_id[z]] = *t;
+		}
+
+		let mut nb_token = vec![0; nb_partitions];
+		let mut last_zone = vec![zones.len(); nb_partitions];
+
+		let mut curr_zone = 0;
+
+		let redundancy = cl.parameters.zone_redundancy;
+
+		for replic in 0..cl.replication_factor {
+			for p in 0..nb_partitions {
+				while id_zone_token[curr_zone] == 0
+					|| (last_zone[p] == curr_zone
+						&& redundancy - nb_token[p] <= cl.replication_factor - replic)
+				{
+					curr_zone += 1;
+					if curr_zone >= zones.len() {
+						return Ok(true);
+					}
+				}
+				id_zone_token[curr_zone] -= 1;
+				if last_zone[p] != curr_zone {
+					nb_token[p] += 1;
+					last_zone[p] = curr_zone;
+				}
+			}
+		}
+
+		return Ok(false);
+	}
+
+	fn show_msg(msg: &Message) {
+		for s in msg.iter() {
+			println!("{}", s);
+		}
+	}
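
To make the counterexample in the comments above concrete: with zone tokens A:4,
B:1, C:4, D:4, E:2, the forward-only cursor consumes A,A,A in round 0, then B,C,C,
then C,D,D, then D,E,E; when the last round starts the cursor sits on the exhausted
zone E and cannot come back for the tokens left in A, C and D, so the greedy gives
up even though three tokens remain. A standalone replay of the loop on exactly these
parameters (illustrative code mirroring check_against_naive, not part of this
patch):

    fn main() {
        let (nb_partitions, replication_factor, redundancy) = (3usize, 5usize, 4usize);
        let mut tokens = vec![4usize, 1, 4, 4, 2]; // zones A, B, C, D, E
        let mut nb_distinct = vec![0usize; nb_partitions];
        let mut last_zone = vec![tokens.len(); nb_partitions];
        let mut curr_zone = 0;

        for replic in 0..replication_factor {
            for p in 0..nb_partitions {
                // Skip zones that are exhausted, or that the partition just used
                // while it still needs new zones to reach the redundancy target.
                while tokens[curr_zone] == 0
                    || (last_zone[p] == curr_zone
                        && redundancy - nb_distinct[p] <= replication_factor - replic)
                {
                    curr_zone += 1;
                    if curr_zone >= tokens.len() {
                        println!("greedy fails in round {} at partition {}", replic, p);
                        return;
                    }
                }
                tokens[curr_zone] -= 1;
                if last_zone[p] != curr_zone {
                    nb_distinct[p] += 1;
                    last_zone[p] = curr_zone;
                }
            }
        }
        println!("greedy succeeded");
    }

Note that check_against_naive returning Ok(true) therefore does not prove that the
computed size is optimal; it only shows it is not strictly worse than the naive
baseline, which can itself fail on feasible instances such as this one.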
 
 	fn update_layout(
 		cl: &mut ClusterLayout,
 		node_id_vec: &Vec<u8>,
 		node_capacity_vec: &Vec<u32>,
 		node_zone_vec: &Vec<String>,
-		zone_redundancy: usize
+		zone_redundancy: usize,
 	) {
 		for i in 0..node_id_vec.len() {
 			if let Some(x) = FixedBytes32::try_from(&[i as u8; 32]) {
@@ -901,12 +1018,12 @@ mod tests {
 			);
 			cl.roles.merge(&update);
 		}
-		cl.staged_parameters = Lww::<LayoutParameters>::new(LayoutParameters{zone_redundancy});
+		cl.staged_parameters = Lww::<LayoutParameters>::new(LayoutParameters { zone_redundancy });
 	}
 
 	#[test]
 	fn test_assignation() {
-		let mut node_id_vec = vec![1, 2, 3];
+		let mut node_id_vec = vec![1, 2, 3];
 		let mut node_capacity_vec = vec![4000, 1000, 2000];
 		let mut node_zone_vec = vec!["A", "B", "C"]
 			.into_iter()
@@ -936,11 +1053,12 @@ mod tests {
 		assert!(cl.check());
 		assert!(matches!(check_against_naive(&cl), Ok(true)));
 
-		node_capacity_vec = vec![4000000, 4000000, 2000000, 7000000, 1000000, 9000000, 2000000, 10000, 2000000];
+		node_capacity_vec = vec![
+			4000000, 4000000, 2000000, 7000000, 1000000, 9000000, 2000000, 10000, 2000000,
+		];
 		update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec, 1);
 		show_msg(&cl.calculate_partition_assignation().unwrap());
 		assert!(cl.check());
 		assert!(matches!(check_against_naive(&cl), Ok(true)));
-
 	}
 }
diff --git a/src/rpc/lib.rs b/src/rpc/lib.rs
index 1036a8e1..17e92dd7 100644
--- a/src/rpc/lib.rs
+++ b/src/rpc/lib.rs
@@ -7,12 +7,11 @@ mod consul;
 #[cfg(feature = "kubernetes-discovery")]
 mod kubernetes;
 
-pub mod layout;
 pub mod graph_algo;
+pub mod layout;
 pub mod ring;
 pub mod system;
 
-
 mod metrics;
 pub mod rpc_helper;
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 655d21de..9e0bfa11 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -565,7 +565,6 @@ impl System {
 			return Err(Error::Message(msg));
 		}
 
-
 		let update_ring = self.update_ring.lock().await;
 		let mut layout: ClusterLayout = self.ring.borrow().layout.clone();
--
cgit v1.2.3