From 523d2ecb9511f74e144cd116b942d6c1bf0f546d Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 9 Nov 2023 11:19:43 +0100 Subject: layout: use separate CRDT for staged layout changes --- src/rpc/graph_algo.rs | 415 ------------------------------------------- src/rpc/layout/graph_algo.rs | 405 +++++++++++++++++++++++++++++++++++++++++ src/rpc/layout/history.rs | 82 +++------ src/rpc/layout/mod.rs | 4 +- src/rpc/layout/schema.rs | 106 ++++++++++- src/rpc/layout/tracker.rs | 21 --- src/rpc/layout/version.rs | 54 +----- src/rpc/lib.rs | 1 - 8 files changed, 535 insertions(+), 553 deletions(-) delete mode 100644 src/rpc/graph_algo.rs create mode 100644 src/rpc/layout/graph_algo.rs delete mode 100644 src/rpc/layout/tracker.rs (limited to 'src/rpc') diff --git a/src/rpc/graph_algo.rs b/src/rpc/graph_algo.rs deleted file mode 100644 index d8c6c9b9..00000000 --- a/src/rpc/graph_algo.rs +++ /dev/null @@ -1,415 +0,0 @@ -//! This module deals with graph algorithms. -//! It is used in layout.rs to build the partition to node assignment. - -use rand::prelude::{SeedableRng, SliceRandom}; -use std::cmp::{max, min}; -use std::collections::HashMap; -use std::collections::VecDeque; - -/// Vertex data structures used in all the graphs used in layout.rs. -/// usize parameters correspond to node/zone/partitions ids. -/// To understand the vertex roles below, please refer to the formal description -/// of the layout computation algorithm. -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] -pub enum Vertex { - Source, - Pup(usize), // The vertex p+ of partition p - Pdown(usize), // The vertex p- of partition p - PZ(usize, usize), // The vertex corresponding to x_(partition p, zone z) - N(usize), // The vertex corresponding to node n - Sink, -} - -/// Edge data structure for the flow algorithm. -#[derive(Clone, Copy, Debug)] -pub struct FlowEdge { - cap: u64, // flow maximal capacity of the edge - flow: i64, // flow value on the edge - dest: usize, // destination vertex id - rev: usize, // index of the reversed edge (v, self) in the edge list of vertex v -} - -/// Edge data structure for the detection of negative cycles. -#[derive(Clone, Copy, Debug)] -pub struct WeightedEdge { - w: i64, // weight of the edge - dest: usize, -} - -pub trait Edge: Clone + Copy {} -impl Edge for FlowEdge {} -impl Edge for WeightedEdge {} - -/// Struct for the graph structure. We do encapsulation here to be able to both -/// provide user friendly Vertex enum to address vertices, and to use internally usize -/// indices and Vec instead of HashMap in the graph algorithm to optimize execution speed. -pub struct Graph { - vertex_to_id: HashMap, - id_to_vertex: Vec, - - // The graph is stored as an adjacency list - graph: Vec>, -} - -pub type CostFunction = HashMap<(Vertex, Vertex), i64>; - -impl Graph { - pub fn new(vertices: &[Vertex]) -> Self { - let mut map = HashMap::::new(); - for (i, vert) in vertices.iter().enumerate() { - map.insert(*vert, i); - } - Graph:: { - vertex_to_id: map, - id_to_vertex: vertices.to_vec(), - graph: vec![Vec::::new(); vertices.len()], - } - } - - fn get_vertex_id(&self, v: &Vertex) -> Result { - self.vertex_to_id - .get(v) - .cloned() - .ok_or_else(|| format!("The graph does not contain vertex {:?}", v)) - } -} - -impl Graph { - /// This function adds a directed edge to the graph with capacity c, and the - /// corresponding reversed edge with capacity 0. - pub fn add_edge(&mut self, u: Vertex, v: Vertex, c: u64) -> Result<(), String> { - let idu = self.get_vertex_id(&u)?; - let idv = self.get_vertex_id(&v)?; - if idu == idv { - return Err("Cannot add edge from vertex to itself in flow graph".into()); - } - - let rev_u = self.graph[idu].len(); - let rev_v = self.graph[idv].len(); - self.graph[idu].push(FlowEdge { - cap: c, - dest: idv, - flow: 0, - rev: rev_v, - }); - self.graph[idv].push(FlowEdge { - cap: 0, - dest: idu, - flow: 0, - rev: rev_u, - }); - Ok(()) - } - - /// This function returns the list of vertices that receive a positive flow from - /// vertex v. - pub fn get_positive_flow_from(&self, v: Vertex) -> Result, String> { - let idv = self.get_vertex_id(&v)?; - let mut result = Vec::::new(); - for edge in self.graph[idv].iter() { - if edge.flow > 0 { - result.push(self.id_to_vertex[edge.dest]); - } - } - Ok(result) - } - - /// This function returns the value of the flow incoming to v. - pub fn get_inflow(&self, v: Vertex) -> Result { - let idv = self.get_vertex_id(&v)?; - let mut result = 0; - for edge in self.graph[idv].iter() { - result += max(0, self.graph[edge.dest][edge.rev].flow); - } - Ok(result) - } - - /// This function returns the value of the flow outgoing from v. - pub fn get_outflow(&self, v: Vertex) -> Result { - let idv = self.get_vertex_id(&v)?; - let mut result = 0; - for edge in self.graph[idv].iter() { - result += max(0, edge.flow); - } - Ok(result) - } - - /// This function computes the flow total value by computing the outgoing flow - /// from the source. - pub fn get_flow_value(&mut self) -> Result { - self.get_outflow(Vertex::Source) - } - - /// This function shuffles the order of the edge lists. It keeps the ids of the - /// reversed edges consistent. - fn shuffle_edges(&mut self) { - // We use deterministic randomness so that the layout calculation algorihtm - // will output the same thing every time it is run. This way, the results - // pre-calculated in `garage layout show` will match exactly those used - // in practice with `garage layout apply` - let mut rng = rand::rngs::StdRng::from_seed([0x12u8; 32]); - for i in 0..self.graph.len() { - self.graph[i].shuffle(&mut rng); - // We need to update the ids of the reverse edges. - for j in 0..self.graph[i].len() { - let target_v = self.graph[i][j].dest; - let target_rev = self.graph[i][j].rev; - self.graph[target_v][target_rev].rev = j; - } - } - } - - /// Computes an upper bound of the flow on the graph - pub fn flow_upper_bound(&self) -> Result { - let idsource = self.get_vertex_id(&Vertex::Source)?; - let mut flow_upper_bound = 0; - for edge in self.graph[idsource].iter() { - flow_upper_bound += edge.cap; - } - Ok(flow_upper_bound) - } - - /// This function computes the maximal flow using Dinic's algorithm. It starts with - /// the flow values already present in the graph. So it is possible to add some edge to - /// the graph, compute a flow, add other edges, update the flow. - pub fn compute_maximal_flow(&mut self) -> Result<(), String> { - let idsource = self.get_vertex_id(&Vertex::Source)?; - let idsink = self.get_vertex_id(&Vertex::Sink)?; - - let nb_vertices = self.graph.len(); - - let flow_upper_bound = self.flow_upper_bound()?; - - // To ensure the dispersion of the associations generated by the - // assignment, we shuffle the neighbours of the nodes. Hence, - // the vertices do not consider their neighbours in the same order. - self.shuffle_edges(); - - // We run Dinic's max flow algorithm - loop { - // We build the level array from Dinic's algorithm. - let mut level = vec![None; nb_vertices]; - - let mut fifo = VecDeque::new(); - fifo.push_back((idsource, 0)); - while let Some((id, lvl)) = fifo.pop_front() { - if level[id].is_none() { - // it means id has not yet been reached - level[id] = Some(lvl); - for edge in self.graph[id].iter() { - if edge.cap as i64 - edge.flow > 0 { - fifo.push_back((edge.dest, lvl + 1)); - } - } - } - } - if level[idsink].is_none() { - // There is no residual flow - break; - } - // Now we run DFS respecting the level array - let mut next_nbd = vec![0; nb_vertices]; - let mut lifo = Vec::new(); - - lifo.push((idsource, flow_upper_bound)); - - while let Some((id, f)) = lifo.last().cloned() { - if id == idsink { - // The DFS reached the sink, we can add a - // residual flow. - lifo.pop(); - while let Some((id, _)) = lifo.pop() { - let nbd = next_nbd[id]; - self.graph[id][nbd].flow += f as i64; - let id_rev = self.graph[id][nbd].dest; - let nbd_rev = self.graph[id][nbd].rev; - self.graph[id_rev][nbd_rev].flow -= f as i64; - } - lifo.push((idsource, flow_upper_bound)); - continue; - } - // else we did not reach the sink - let nbd = next_nbd[id]; - if nbd >= self.graph[id].len() { - // There is nothing to explore from id anymore - lifo.pop(); - if let Some((parent, _)) = lifo.last() { - next_nbd[*parent] += 1; - } - continue; - } - // else we can try to send flow from id to its nbd - let new_flow = min( - f as i64, - self.graph[id][nbd].cap as i64 - self.graph[id][nbd].flow, - ) as u64; - if new_flow == 0 { - next_nbd[id] += 1; - continue; - } - if let (Some(lvldest), Some(lvlid)) = (level[self.graph[id][nbd].dest], level[id]) { - if lvldest <= lvlid { - // We cannot send flow to nbd. - next_nbd[id] += 1; - continue; - } - } - // otherwise, we send flow to nbd. - lifo.push((self.graph[id][nbd].dest, new_flow)); - } - } - Ok(()) - } - - /// This function takes a flow, and a cost function on the edges, and tries to find an - /// equivalent flow with a better cost, by finding improving overflow cycles. It uses - /// as subroutine the Bellman Ford algorithm run up to path_length. - /// We assume that the cost of edge (u,v) is the opposite of the cost of (v,u), and - /// only one needs to be present in the cost function. - pub fn optimize_flow_with_cost( - &mut self, - cost: &CostFunction, - path_length: usize, - ) -> Result<(), String> { - // We build the weighted graph g where we will look for negative cycle - let mut gf = self.build_cost_graph(cost)?; - let mut cycles = gf.list_negative_cycles(path_length); - while !cycles.is_empty() { - // we enumerate negative cycles - for c in cycles.iter() { - for i in 0..c.len() { - // We add one flow unit to the edge (u,v) of cycle c - let idu = self.vertex_to_id[&c[i]]; - let idv = self.vertex_to_id[&c[(i + 1) % c.len()]]; - for j in 0..self.graph[idu].len() { - // since idu appears at most once in the cycles, we enumerate every - // edge at most once. - let edge = self.graph[idu][j]; - if edge.dest == idv { - self.graph[idu][j].flow += 1; - self.graph[idv][edge.rev].flow -= 1; - break; - } - } - } - } - - gf = self.build_cost_graph(cost)?; - cycles = gf.list_negative_cycles(path_length); - } - Ok(()) - } - - /// Construct the weighted graph G_f from the flow and the cost function - fn build_cost_graph(&self, cost: &CostFunction) -> Result, String> { - let mut g = Graph::::new(&self.id_to_vertex); - let nb_vertices = self.id_to_vertex.len(); - for i in 0..nb_vertices { - for edge in self.graph[i].iter() { - if edge.cap as i64 - edge.flow > 0 { - // It is possible to send overflow through this edge - let u = self.id_to_vertex[i]; - let v = self.id_to_vertex[edge.dest]; - if cost.contains_key(&(u, v)) { - g.add_edge(u, v, cost[&(u, v)])?; - } else if cost.contains_key(&(v, u)) { - g.add_edge(u, v, -cost[&(v, u)])?; - } else { - g.add_edge(u, v, 0)?; - } - } - } - } - Ok(g) - } -} - -impl Graph { - /// This function adds a single directed weighted edge to the graph. - pub fn add_edge(&mut self, u: Vertex, v: Vertex, w: i64) -> Result<(), String> { - let idu = self.get_vertex_id(&u)?; - let idv = self.get_vertex_id(&v)?; - self.graph[idu].push(WeightedEdge { w, dest: idv }); - Ok(()) - } - - /// This function lists the negative cycles it manages to find after path_length - /// iterations of the main loop of the Bellman-Ford algorithm. For the classical - /// algorithm, path_length needs to be equal to the number of vertices. However, - /// for particular graph structures like in our case, the algorithm is still correct - /// when path_length is the length of the longest possible simple path. - /// See the formal description of the algorithm for more details. - fn list_negative_cycles(&self, path_length: usize) -> Vec> { - let nb_vertices = self.graph.len(); - - // We start with every vertex at distance 0 of some imaginary extra -1 vertex. - let mut distance = vec![0; nb_vertices]; - // The prev vector collects for every vertex from where does the shortest path come - let mut prev = vec![None; nb_vertices]; - - for _ in 0..path_length + 1 { - for id in 0..nb_vertices { - for e in self.graph[id].iter() { - if distance[id] + e.w < distance[e.dest] { - distance[e.dest] = distance[id] + e.w; - prev[e.dest] = Some(id); - } - } - } - } - - // If self.graph contains a negative cycle, then at this point the graph described - // by prev (which is a directed 1-forest/functional graph) - // must contain a cycle. We list the cycles of prev. - let cycles_prev = cycles_of_1_forest(&prev); - - // Remark that the cycle in prev is in the reverse order compared to the cycle - // in the graph. Thus the .rev(). - return cycles_prev - .iter() - .map(|cycle| { - cycle - .iter() - .rev() - .map(|id| self.id_to_vertex[*id]) - .collect() - }) - .collect(); - } -} - -/// This function returns the list of cycles of a directed 1 forest. It does not -/// check for the consistency of the input. -fn cycles_of_1_forest(forest: &[Option]) -> Vec> { - let mut cycles = Vec::>::new(); - let mut time_of_discovery = vec![None; forest.len()]; - - for t in 0..forest.len() { - let mut id = t; - // while we are on a valid undiscovered node - while time_of_discovery[id].is_none() { - time_of_discovery[id] = Some(t); - if let Some(i) = forest[id] { - id = i; - } else { - break; - } - } - if forest[id].is_some() && time_of_discovery[id] == Some(t) { - // We discovered an id that we explored at this iteration t. - // It means we are on a cycle - let mut cy = vec![id; 1]; - let mut id2 = id; - while let Some(id_next) = forest[id2] { - id2 = id_next; - if id2 != id { - cy.push(id2); - } else { - break; - } - } - cycles.push(cy); - } - } - cycles -} diff --git a/src/rpc/layout/graph_algo.rs b/src/rpc/layout/graph_algo.rs new file mode 100644 index 00000000..bd33e97f --- /dev/null +++ b/src/rpc/layout/graph_algo.rs @@ -0,0 +1,405 @@ +//! This module deals with graph algorithms. +//! It is used in layout.rs to build the partition to node assignment. + +use rand::prelude::{SeedableRng, SliceRandom}; +use std::cmp::{max, min}; +use std::collections::HashMap; +use std::collections::VecDeque; + +/// Vertex data structures used in all the graphs used in layout.rs. +/// usize parameters correspond to node/zone/partitions ids. +/// To understand the vertex roles below, please refer to the formal description +/// of the layout computation algorithm. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +pub enum Vertex { + Source, + Pup(usize), // The vertex p+ of partition p + Pdown(usize), // The vertex p- of partition p + PZ(usize, usize), // The vertex corresponding to x_(partition p, zone z) + N(usize), // The vertex corresponding to node n + Sink, +} + +/// Edge data structure for the flow algorithm. +#[derive(Clone, Copy, Debug)] +pub struct FlowEdge { + cap: u64, // flow maximal capacity of the edge + flow: i64, // flow value on the edge + dest: usize, // destination vertex id + rev: usize, // index of the reversed edge (v, self) in the edge list of vertex v +} + +/// Edge data structure for the detection of negative cycles. +#[derive(Clone, Copy, Debug)] +pub struct WeightedEdge { + w: i64, // weight of the edge + dest: usize, +} + +pub trait Edge: Clone + Copy {} +impl Edge for FlowEdge {} +impl Edge for WeightedEdge {} + +/// Struct for the graph structure. We do encapsulation here to be able to both +/// provide user friendly Vertex enum to address vertices, and to use internally usize +/// indices and Vec instead of HashMap in the graph algorithm to optimize execution speed. +pub struct Graph { + vertex_to_id: HashMap, + id_to_vertex: Vec, + + // The graph is stored as an adjacency list + graph: Vec>, +} + +pub type CostFunction = HashMap<(Vertex, Vertex), i64>; + +impl Graph { + pub fn new(vertices: &[Vertex]) -> Self { + let mut map = HashMap::::new(); + for (i, vert) in vertices.iter().enumerate() { + map.insert(*vert, i); + } + Graph:: { + vertex_to_id: map, + id_to_vertex: vertices.to_vec(), + graph: vec![Vec::::new(); vertices.len()], + } + } + + fn get_vertex_id(&self, v: &Vertex) -> Result { + self.vertex_to_id + .get(v) + .cloned() + .ok_or_else(|| format!("The graph does not contain vertex {:?}", v)) + } +} + +impl Graph { + /// This function adds a directed edge to the graph with capacity c, and the + /// corresponding reversed edge with capacity 0. + pub fn add_edge(&mut self, u: Vertex, v: Vertex, c: u64) -> Result<(), String> { + let idu = self.get_vertex_id(&u)?; + let idv = self.get_vertex_id(&v)?; + if idu == idv { + return Err("Cannot add edge from vertex to itself in flow graph".into()); + } + + let rev_u = self.graph[idu].len(); + let rev_v = self.graph[idv].len(); + self.graph[idu].push(FlowEdge { + cap: c, + dest: idv, + flow: 0, + rev: rev_v, + }); + self.graph[idv].push(FlowEdge { + cap: 0, + dest: idu, + flow: 0, + rev: rev_u, + }); + Ok(()) + } + + /// This function returns the list of vertices that receive a positive flow from + /// vertex v. + pub fn get_positive_flow_from(&self, v: Vertex) -> Result, String> { + let idv = self.get_vertex_id(&v)?; + let mut result = Vec::::new(); + for edge in self.graph[idv].iter() { + if edge.flow > 0 { + result.push(self.id_to_vertex[edge.dest]); + } + } + Ok(result) + } + + /// This function returns the value of the flow outgoing from v. + pub fn get_outflow(&self, v: Vertex) -> Result { + let idv = self.get_vertex_id(&v)?; + let mut result = 0; + for edge in self.graph[idv].iter() { + result += max(0, edge.flow); + } + Ok(result) + } + + /// This function computes the flow total value by computing the outgoing flow + /// from the source. + pub fn get_flow_value(&mut self) -> Result { + self.get_outflow(Vertex::Source) + } + + /// This function shuffles the order of the edge lists. It keeps the ids of the + /// reversed edges consistent. + fn shuffle_edges(&mut self) { + // We use deterministic randomness so that the layout calculation algorihtm + // will output the same thing every time it is run. This way, the results + // pre-calculated in `garage layout show` will match exactly those used + // in practice with `garage layout apply` + let mut rng = rand::rngs::StdRng::from_seed([0x12u8; 32]); + for i in 0..self.graph.len() { + self.graph[i].shuffle(&mut rng); + // We need to update the ids of the reverse edges. + for j in 0..self.graph[i].len() { + let target_v = self.graph[i][j].dest; + let target_rev = self.graph[i][j].rev; + self.graph[target_v][target_rev].rev = j; + } + } + } + + /// Computes an upper bound of the flow on the graph + pub fn flow_upper_bound(&self) -> Result { + let idsource = self.get_vertex_id(&Vertex::Source)?; + let mut flow_upper_bound = 0; + for edge in self.graph[idsource].iter() { + flow_upper_bound += edge.cap; + } + Ok(flow_upper_bound) + } + + /// This function computes the maximal flow using Dinic's algorithm. It starts with + /// the flow values already present in the graph. So it is possible to add some edge to + /// the graph, compute a flow, add other edges, update the flow. + pub fn compute_maximal_flow(&mut self) -> Result<(), String> { + let idsource = self.get_vertex_id(&Vertex::Source)?; + let idsink = self.get_vertex_id(&Vertex::Sink)?; + + let nb_vertices = self.graph.len(); + + let flow_upper_bound = self.flow_upper_bound()?; + + // To ensure the dispersion of the associations generated by the + // assignment, we shuffle the neighbours of the nodes. Hence, + // the vertices do not consider their neighbours in the same order. + self.shuffle_edges(); + + // We run Dinic's max flow algorithm + loop { + // We build the level array from Dinic's algorithm. + let mut level = vec![None; nb_vertices]; + + let mut fifo = VecDeque::new(); + fifo.push_back((idsource, 0)); + while let Some((id, lvl)) = fifo.pop_front() { + if level[id].is_none() { + // it means id has not yet been reached + level[id] = Some(lvl); + for edge in self.graph[id].iter() { + if edge.cap as i64 - edge.flow > 0 { + fifo.push_back((edge.dest, lvl + 1)); + } + } + } + } + if level[idsink].is_none() { + // There is no residual flow + break; + } + // Now we run DFS respecting the level array + let mut next_nbd = vec![0; nb_vertices]; + let mut lifo = Vec::new(); + + lifo.push((idsource, flow_upper_bound)); + + while let Some((id, f)) = lifo.last().cloned() { + if id == idsink { + // The DFS reached the sink, we can add a + // residual flow. + lifo.pop(); + while let Some((id, _)) = lifo.pop() { + let nbd = next_nbd[id]; + self.graph[id][nbd].flow += f as i64; + let id_rev = self.graph[id][nbd].dest; + let nbd_rev = self.graph[id][nbd].rev; + self.graph[id_rev][nbd_rev].flow -= f as i64; + } + lifo.push((idsource, flow_upper_bound)); + continue; + } + // else we did not reach the sink + let nbd = next_nbd[id]; + if nbd >= self.graph[id].len() { + // There is nothing to explore from id anymore + lifo.pop(); + if let Some((parent, _)) = lifo.last() { + next_nbd[*parent] += 1; + } + continue; + } + // else we can try to send flow from id to its nbd + let new_flow = min( + f as i64, + self.graph[id][nbd].cap as i64 - self.graph[id][nbd].flow, + ) as u64; + if new_flow == 0 { + next_nbd[id] += 1; + continue; + } + if let (Some(lvldest), Some(lvlid)) = (level[self.graph[id][nbd].dest], level[id]) { + if lvldest <= lvlid { + // We cannot send flow to nbd. + next_nbd[id] += 1; + continue; + } + } + // otherwise, we send flow to nbd. + lifo.push((self.graph[id][nbd].dest, new_flow)); + } + } + Ok(()) + } + + /// This function takes a flow, and a cost function on the edges, and tries to find an + /// equivalent flow with a better cost, by finding improving overflow cycles. It uses + /// as subroutine the Bellman Ford algorithm run up to path_length. + /// We assume that the cost of edge (u,v) is the opposite of the cost of (v,u), and + /// only one needs to be present in the cost function. + pub fn optimize_flow_with_cost( + &mut self, + cost: &CostFunction, + path_length: usize, + ) -> Result<(), String> { + // We build the weighted graph g where we will look for negative cycle + let mut gf = self.build_cost_graph(cost)?; + let mut cycles = gf.list_negative_cycles(path_length); + while !cycles.is_empty() { + // we enumerate negative cycles + for c in cycles.iter() { + for i in 0..c.len() { + // We add one flow unit to the edge (u,v) of cycle c + let idu = self.vertex_to_id[&c[i]]; + let idv = self.vertex_to_id[&c[(i + 1) % c.len()]]; + for j in 0..self.graph[idu].len() { + // since idu appears at most once in the cycles, we enumerate every + // edge at most once. + let edge = self.graph[idu][j]; + if edge.dest == idv { + self.graph[idu][j].flow += 1; + self.graph[idv][edge.rev].flow -= 1; + break; + } + } + } + } + + gf = self.build_cost_graph(cost)?; + cycles = gf.list_negative_cycles(path_length); + } + Ok(()) + } + + /// Construct the weighted graph G_f from the flow and the cost function + fn build_cost_graph(&self, cost: &CostFunction) -> Result, String> { + let mut g = Graph::::new(&self.id_to_vertex); + let nb_vertices = self.id_to_vertex.len(); + for i in 0..nb_vertices { + for edge in self.graph[i].iter() { + if edge.cap as i64 - edge.flow > 0 { + // It is possible to send overflow through this edge + let u = self.id_to_vertex[i]; + let v = self.id_to_vertex[edge.dest]; + if cost.contains_key(&(u, v)) { + g.add_edge(u, v, cost[&(u, v)])?; + } else if cost.contains_key(&(v, u)) { + g.add_edge(u, v, -cost[&(v, u)])?; + } else { + g.add_edge(u, v, 0)?; + } + } + } + } + Ok(g) + } +} + +impl Graph { + /// This function adds a single directed weighted edge to the graph. + pub fn add_edge(&mut self, u: Vertex, v: Vertex, w: i64) -> Result<(), String> { + let idu = self.get_vertex_id(&u)?; + let idv = self.get_vertex_id(&v)?; + self.graph[idu].push(WeightedEdge { w, dest: idv }); + Ok(()) + } + + /// This function lists the negative cycles it manages to find after path_length + /// iterations of the main loop of the Bellman-Ford algorithm. For the classical + /// algorithm, path_length needs to be equal to the number of vertices. However, + /// for particular graph structures like in our case, the algorithm is still correct + /// when path_length is the length of the longest possible simple path. + /// See the formal description of the algorithm for more details. + fn list_negative_cycles(&self, path_length: usize) -> Vec> { + let nb_vertices = self.graph.len(); + + // We start with every vertex at distance 0 of some imaginary extra -1 vertex. + let mut distance = vec![0; nb_vertices]; + // The prev vector collects for every vertex from where does the shortest path come + let mut prev = vec![None; nb_vertices]; + + for _ in 0..path_length + 1 { + for id in 0..nb_vertices { + for e in self.graph[id].iter() { + if distance[id] + e.w < distance[e.dest] { + distance[e.dest] = distance[id] + e.w; + prev[e.dest] = Some(id); + } + } + } + } + + // If self.graph contains a negative cycle, then at this point the graph described + // by prev (which is a directed 1-forest/functional graph) + // must contain a cycle. We list the cycles of prev. + let cycles_prev = cycles_of_1_forest(&prev); + + // Remark that the cycle in prev is in the reverse order compared to the cycle + // in the graph. Thus the .rev(). + return cycles_prev + .iter() + .map(|cycle| { + cycle + .iter() + .rev() + .map(|id| self.id_to_vertex[*id]) + .collect() + }) + .collect(); + } +} + +/// This function returns the list of cycles of a directed 1 forest. It does not +/// check for the consistency of the input. +fn cycles_of_1_forest(forest: &[Option]) -> Vec> { + let mut cycles = Vec::>::new(); + let mut time_of_discovery = vec![None; forest.len()]; + + for t in 0..forest.len() { + let mut id = t; + // while we are on a valid undiscovered node + while time_of_discovery[id].is_none() { + time_of_discovery[id] = Some(t); + if let Some(i) = forest[id] { + id = i; + } else { + break; + } + } + if forest[id].is_some() && time_of_discovery[id] == Some(t) { + // We discovered an id that we explored at this iteration t. + // It means we are on a cycle + let mut cy = vec![id; 1]; + let mut id2 = id; + while let Some(id_next) = forest[id2] { + id2 = id_next; + if id2 != id { + cy.push(id2); + } else { + break; + } + } + cycles.push(cy); + } + } + cycles +} diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index e59c9e9c..9ae28887 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -1,5 +1,3 @@ -use std::cmp::Ordering; - use garage_util::crdt::{Crdt, Lww, LwwMap}; use garage_util::data::*; use garage_util::encode::nonversioned_encode; @@ -12,14 +10,15 @@ impl LayoutHistory { pub fn new(replication_factor: usize) -> Self { let version = LayoutVersion::new(replication_factor); - let staging_parameters = Lww::::new(version.parameters); - let empty_lwwmap = LwwMap::new(); + let staging = LayoutStaging { + parameters: Lww::::new(version.parameters), + roles: LwwMap::new(), + }; let mut ret = LayoutHistory { versions: vec![version].into_boxed_slice().into(), update_trackers: Default::default(), - staging_parameters, - staging_roles: empty_lwwmap, + staging: Lww::raw(0, staging), staging_hash: [0u8; 32].into(), }; ret.staging_hash = ret.calculate_staging_hash(); @@ -31,8 +30,7 @@ impl LayoutHistory { } pub(crate) fn calculate_staging_hash(&self) -> Hash { - let hashed_tuple = (&self.staging_roles, &self.staging_parameters); - blake2sum(&nonversioned_encode(&hashed_tuple).unwrap()[..]) + blake2sum(&nonversioned_encode(&self.staging).unwrap()[..]) } // ================== updates to layout, public interface =================== @@ -41,26 +39,10 @@ impl LayoutHistory { let mut changed = false; // Merge staged layout changes - match other.current().version.cmp(&self.current().version) { - Ordering::Greater => { - self.staging_parameters = other.staging_parameters.clone(); - self.staging_roles = other.staging_roles.clone(); - self.staging_hash = other.staging_hash; - changed = true; - } - Ordering::Equal => { - self.staging_parameters.merge(&other.staging_parameters); - self.staging_roles.merge(&other.staging_roles); - - let new_staging_hash = self.calculate_staging_hash(); - if new_staging_hash != self.staging_hash { - changed = true; - } - - self.staging_hash = new_staging_hash; - } - Ordering::Less => (), + if self.staging != other.staging { + changed = true; } + self.staging.merge(&other.staging); // Add any new versions to history for v2 in other.versions.iter() { @@ -102,50 +84,34 @@ To know the correct value of the new layout version, invoke `garage layout show` } } + // Compute new version and add it to history let mut new_version = self.current().clone(); new_version.version += 1; - new_version.roles.merge(&self.staging_roles); + new_version.roles.merge(&self.staging.get().roles); new_version.roles.retain(|(_, _, v)| v.0.is_some()); - new_version.parameters = *self.staging_parameters.get(); - - self.staging_roles.clear(); - self.staging_hash = self.calculate_staging_hash(); + new_version.parameters = *self.staging.get().parameters.get(); let msg = new_version.calculate_partition_assignment()?; - self.versions.push(new_version); + // Reset the staged layout changes + self.staging.update(LayoutStaging { + parameters: self.staging.get().parameters.clone(), + roles: LwwMap::new(), + }); + self.staging_hash = self.calculate_staging_hash(); + Ok((self, msg)) } - pub fn revert_staged_changes(mut self, version: Option) -> Result { - match version { - None => { - let error = r#" -Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout. -To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes. - "#; - return Err(Error::Message(error.into())); - } - Some(v) => { - if v != self.current().version + 1 { - return Err(Error::Message("Invalid new layout version".into())); - } - } - } - - self.staging_roles.clear(); - self.staging_parameters.update(self.current().parameters); + pub fn revert_staged_changes(mut self) -> Result { + self.staging.update(LayoutStaging { + parameters: Lww::new(self.current().parameters.clone()), + roles: LwwMap::new(), + }); self.staging_hash = self.calculate_staging_hash(); - // TODO this is stupid, we should have a separate version counter/LWW - // for the staging params - let mut new_version = self.current().clone(); - new_version.version += 1; - - self.versions.push(new_version); - Ok(self) } diff --git a/src/rpc/layout/mod.rs b/src/rpc/layout/mod.rs index 122d4b65..7c15988a 100644 --- a/src/rpc/layout/mod.rs +++ b/src/rpc/layout/mod.rs @@ -1,8 +1,10 @@ +mod graph_algo; mod history; mod schema; -mod tracker; mod version; +// ---- re-exports ---- + pub use history::*; pub use schema::*; pub use version::*; diff --git a/src/rpc/layout/schema.rs b/src/rpc/layout/schema.rs index 14e797be..c5b9b1d3 100644 --- a/src/rpc/layout/schema.rs +++ b/src/rpc/layout/schema.rs @@ -1,3 +1,9 @@ +use std::fmt; + +use bytesize::ByteSize; + +use garage_util::crdt::{AutoCrdt, Crdt}; + mod v08 { use crate::layout::CompactNodeType; use garage_util::crdt::LwwMap; @@ -210,6 +216,15 @@ mod v010 { pub ring_assignment_data: Vec, } + /// The staged changes for the next layout version + #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] + pub struct LayoutStaging { + /// Parameters to be used in the next partition assignment computation. + pub parameters: Lww, + /// Role changes which are staged for the next version of the layout + pub roles: LwwMap, + } + /// The history of cluster layouts #[derive(Clone, Debug, Serialize, Deserialize)] pub struct LayoutHistory { @@ -219,10 +234,8 @@ mod v010 { /// Update trackers pub update_trackers: UpdateTrackers, - /// Parameters to be used in the next partition assignment computation. - pub staging_parameters: Lww, - /// Role changes which are staged for the next version of the layout - pub staging_roles: LwwMap, + /// Staged changes for the next version + pub staging: Lww, /// Hash of the serialized staging_parameters + staging_roles pub staging_hash: Hash, } @@ -265,6 +278,10 @@ mod v010 { .map(|x| (*x, version.version)) .collect::>(), ); + let staging = LayoutStaging { + parameters: previous.staging_parameters, + roles: previous.staging_roles, + }; let mut ret = Self { versions: vec![version], update_trackers: UpdateTrackers { @@ -272,8 +289,7 @@ mod v010 { sync_map: update_tracker.clone(), sync_ack_map: update_tracker.clone(), }, - staging_parameters: previous.staging_parameters, - staging_roles: previous.staging_roles, + staging: Lww::raw(previous.version, staging), staging_hash: [0u8; 32].into(), }; ret.staging_hash = ret.calculate_staging_hash(); @@ -283,3 +299,81 @@ mod v010 { } pub use v010::*; + +// ---- utility functions ---- + +impl AutoCrdt for LayoutParameters { + const WARN_IF_DIFFERENT: bool = true; +} + +impl AutoCrdt for NodeRoleV { + const WARN_IF_DIFFERENT: bool = true; +} + +impl Crdt for LayoutStaging { + fn merge(&mut self, other: &LayoutStaging) { + self.parameters.merge(&other.parameters); + self.roles.merge(&other.roles); + } +} + +impl NodeRole { + pub fn capacity_string(&self) -> String { + match self.capacity { + Some(c) => ByteSize::b(c).to_string_as(false), + None => "gateway".to_string(), + } + } + + pub fn tags_string(&self) -> String { + self.tags.join(",") + } +} + +impl fmt::Display for ZoneRedundancy { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + ZoneRedundancy::Maximum => write!(f, "maximum"), + ZoneRedundancy::AtLeast(x) => write!(f, "{}", x), + } + } +} + +impl core::str::FromStr for ZoneRedundancy { + type Err = &'static str; + fn from_str(s: &str) -> Result { + match s { + "none" | "max" | "maximum" => Ok(ZoneRedundancy::Maximum), + x => { + let v = x + .parse::() + .map_err(|_| "zone redundancy must be 'none'/'max' or an integer")?; + Ok(ZoneRedundancy::AtLeast(v)) + } + } + } +} + +impl UpdateTracker { + fn merge(&mut self, other: &UpdateTracker) { + for (k, v) in other.0.iter() { + if let Some(v_mut) = self.0.get_mut(k) { + *v_mut = std::cmp::max(*v_mut, *v); + } else { + self.0.insert(*k, *v); + } + } + } + + pub(crate) fn min(&self) -> u64 { + self.0.iter().map(|(_, v)| *v).min().unwrap_or(0) + } +} + +impl UpdateTrackers { + pub(crate) fn merge(&mut self, other: &UpdateTrackers) { + self.ack_map.merge(&other.ack_map); + self.sync_map.merge(&other.sync_map); + self.sync_ack_map.merge(&other.sync_ack_map); + } +} diff --git a/src/rpc/layout/tracker.rs b/src/rpc/layout/tracker.rs deleted file mode 100644 index 778121e4..00000000 --- a/src/rpc/layout/tracker.rs +++ /dev/null @@ -1,21 +0,0 @@ -use super::*; - -impl UpdateTracker { - fn merge(&mut self, other: &UpdateTracker) { - for (k, v) in other.0.iter() { - if let Some(v_mut) = self.0.get_mut(k) { - *v_mut = std::cmp::max(*v_mut, *v); - } else { - self.0.insert(*k, *v); - } - } - } -} - -impl UpdateTrackers { - pub(crate) fn merge(&mut self, other: &UpdateTrackers) { - self.ack_map.merge(&other.ack_map); - self.sync_map.merge(&other.sync_map); - self.sync_ack_map.merge(&other.sync_ack_map); - } -} diff --git a/src/rpc/layout/version.rs b/src/rpc/layout/version.rs index 363bc204..6918fdf9 100644 --- a/src/rpc/layout/version.rs +++ b/src/rpc/layout/version.rs @@ -1,69 +1,21 @@ use std::collections::HashMap; use std::collections::HashSet; -use std::fmt; +use std::convert::TryInto; use bytesize::ByteSize; use itertools::Itertools; -use garage_util::crdt::{AutoCrdt, LwwMap}; +use garage_util::crdt::LwwMap; use garage_util::data::*; use garage_util::error::*; -use crate::graph_algo::*; - -use std::convert::TryInto; - +use super::graph_algo::*; use super::schema::*; use super::*; // The Message type will be used to collect information on the algorithm. pub type Message = Vec; -impl AutoCrdt for LayoutParameters { - const WARN_IF_DIFFERENT: bool = true; -} - -impl AutoCrdt for NodeRoleV { - const WARN_IF_DIFFERENT: bool = true; -} - -impl NodeRole { - pub fn capacity_string(&self) -> String { - match self.capacity { - Some(c) => ByteSize::b(c).to_string_as(false), - None => "gateway".to_string(), - } - } - - pub fn tags_string(&self) -> String { - self.tags.join(",") - } -} - -impl fmt::Display for ZoneRedundancy { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - ZoneRedundancy::Maximum => write!(f, "maximum"), - ZoneRedundancy::AtLeast(x) => write!(f, "{}", x), - } - } -} - -impl core::str::FromStr for ZoneRedundancy { - type Err = &'static str; - fn from_str(s: &str) -> Result { - match s { - "none" | "max" | "maximum" => Ok(ZoneRedundancy::Maximum), - x => { - let v = x - .parse::() - .map_err(|_| "zone redundancy must be 'none'/'max' or an integer")?; - Ok(ZoneRedundancy::AtLeast(v)) - } - } - } -} - impl LayoutVersion { pub fn new(replication_factor: usize) -> Self { // We set the default zone redundancy to be Maximum, meaning that the maximum diff --git a/src/rpc/lib.rs b/src/rpc/lib.rs index 1af8b78e..b5b31c05 100644 --- a/src/rpc/lib.rs +++ b/src/rpc/lib.rs @@ -11,7 +11,6 @@ mod consul; #[cfg(feature = "kubernetes-discovery")] mod kubernetes; -pub mod graph_algo; pub mod layout; pub mod replication_mode; pub mod system; -- cgit v1.2.3