diff options
Diffstat (limited to 'src/rpc')
-rw-r--r-- | src/rpc/Cargo.toml | 3 | ||||
-rw-r--r-- | src/rpc/graph_algo.rs | 411 | ||||
-rw-r--r-- | src/rpc/layout.rs | 1271 | ||||
-rw-r--r-- | src/rpc/lib.rs | 1 | ||||
-rw-r--r-- | src/rpc/ring.rs | 1 | ||||
-rw-r--r-- | src/rpc/rpc_helper.rs | 18 | ||||
-rw-r--r-- | src/rpc/system.rs | 129 |
7 files changed, 1348 insertions, 486 deletions
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index b87374ad..3d4d3ff5 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -18,14 +18,15 @@ garage_util = { version = "0.8.1", path = "../util" } arc-swap = "1.0" bytes = "1.0" +bytesize = "1.1" gethostname = "0.2" hex = "0.4" tracing = "0.1.30" rand = "0.8" +itertools="0.10" sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" } async-trait = "0.1.7" -rmp-serde = "0.15" serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } serde_bytes = "0.11" serde_json = "1.0" diff --git a/src/rpc/graph_algo.rs b/src/rpc/graph_algo.rs new file mode 100644 index 00000000..f181e2ba --- /dev/null +++ b/src/rpc/graph_algo.rs @@ -0,0 +1,411 @@ +//! This module deals with graph algorithms. +//! It is used in layout.rs to build the partition to node assignation. + +use rand::prelude::SliceRandom; +use std::cmp::{max, min}; +use std::collections::HashMap; +use std::collections::VecDeque; + +/// Vertex data structures used in all the graphs used in layout.rs. +/// usize parameters correspond to node/zone/partitions ids. +/// To understand the vertex roles below, please refer to the formal description +/// of the layout computation algorithm. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +pub enum Vertex { + Source, + Pup(usize), // The vertex p+ of partition p + Pdown(usize), // The vertex p- of partition p + PZ(usize, usize), // The vertex corresponding to x_(partition p, zone z) + N(usize), // The vertex corresponding to node n + Sink, +} + +/// Edge data structure for the flow algorithm. +#[derive(Clone, Copy, Debug)] +pub struct FlowEdge { + cap: u64, // flow maximal capacity of the edge + flow: i64, // flow value on the edge + dest: usize, // destination vertex id + rev: usize, // index of the reversed edge (v, self) in the edge list of vertex v +} + +/// Edge data structure for the detection of negative cycles. +#[derive(Clone, Copy, Debug)] +pub struct WeightedEdge { + w: i64, // weight of the edge + dest: usize, +} + +pub trait Edge: Clone + Copy {} +impl Edge for FlowEdge {} +impl Edge for WeightedEdge {} + +/// Struct for the graph structure. We do encapsulation here to be able to both +/// provide user friendly Vertex enum to address vertices, and to use internally usize +/// indices and Vec instead of HashMap in the graph algorithm to optimize execution speed. +pub struct Graph<E: Edge> { + vertex_to_id: HashMap<Vertex, usize>, + id_to_vertex: Vec<Vertex>, + + // The graph is stored as an adjacency list + graph: Vec<Vec<E>>, +} + +pub type CostFunction = HashMap<(Vertex, Vertex), i64>; + +impl<E: Edge> Graph<E> { + pub fn new(vertices: &[Vertex]) -> Self { + let mut map = HashMap::<Vertex, usize>::new(); + for (i, vert) in vertices.iter().enumerate() { + map.insert(*vert, i); + } + Graph::<E> { + vertex_to_id: map, + id_to_vertex: vertices.to_vec(), + graph: vec![Vec::<E>::new(); vertices.len()], + } + } + + fn get_vertex_id(&self, v: &Vertex) -> Result<usize, String> { + self.vertex_to_id + .get(v) + .cloned() + .ok_or_else(|| format!("The graph does not contain vertex {:?}", v)) + } +} + +impl Graph<FlowEdge> { + /// This function adds a directed edge to the graph with capacity c, and the + /// corresponding reversed edge with capacity 0. + pub fn add_edge(&mut self, u: Vertex, v: Vertex, c: u64) -> Result<(), String> { + let idu = self.get_vertex_id(&u)?; + let idv = self.get_vertex_id(&v)?; + if idu == idv { + return Err("Cannot add edge from vertex to itself in flow graph".into()); + } + + let rev_u = self.graph[idu].len(); + let rev_v = self.graph[idv].len(); + self.graph[idu].push(FlowEdge { + cap: c, + dest: idv, + flow: 0, + rev: rev_v, + }); + self.graph[idv].push(FlowEdge { + cap: 0, + dest: idu, + flow: 0, + rev: rev_u, + }); + Ok(()) + } + + /// This function returns the list of vertices that receive a positive flow from + /// vertex v. + pub fn get_positive_flow_from(&self, v: Vertex) -> Result<Vec<Vertex>, String> { + let idv = self.get_vertex_id(&v)?; + let mut result = Vec::<Vertex>::new(); + for edge in self.graph[idv].iter() { + if edge.flow > 0 { + result.push(self.id_to_vertex[edge.dest]); + } + } + Ok(result) + } + + /// This function returns the value of the flow incoming to v. + pub fn get_inflow(&self, v: Vertex) -> Result<i64, String> { + let idv = self.get_vertex_id(&v)?; + let mut result = 0; + for edge in self.graph[idv].iter() { + result += max(0, self.graph[edge.dest][edge.rev].flow); + } + Ok(result) + } + + /// This function returns the value of the flow outgoing from v. + pub fn get_outflow(&self, v: Vertex) -> Result<i64, String> { + let idv = self.get_vertex_id(&v)?; + let mut result = 0; + for edge in self.graph[idv].iter() { + result += max(0, edge.flow); + } + Ok(result) + } + + /// This function computes the flow total value by computing the outgoing flow + /// from the source. + pub fn get_flow_value(&mut self) -> Result<i64, String> { + self.get_outflow(Vertex::Source) + } + + /// This function shuffles the order of the edge lists. It keeps the ids of the + /// reversed edges consistent. + fn shuffle_edges(&mut self) { + let mut rng = rand::thread_rng(); + for i in 0..self.graph.len() { + self.graph[i].shuffle(&mut rng); + // We need to update the ids of the reverse edges. + for j in 0..self.graph[i].len() { + let target_v = self.graph[i][j].dest; + let target_rev = self.graph[i][j].rev; + self.graph[target_v][target_rev].rev = j; + } + } + } + + /// Computes an upper bound of the flow on the graph + pub fn flow_upper_bound(&self) -> Result<u64, String> { + let idsource = self.get_vertex_id(&Vertex::Source)?; + let mut flow_upper_bound = 0; + for edge in self.graph[idsource].iter() { + flow_upper_bound += edge.cap; + } + Ok(flow_upper_bound) + } + + /// This function computes the maximal flow using Dinic's algorithm. It starts with + /// the flow values already present in the graph. So it is possible to add some edge to + /// the graph, compute a flow, add other edges, update the flow. + pub fn compute_maximal_flow(&mut self) -> Result<(), String> { + let idsource = self.get_vertex_id(&Vertex::Source)?; + let idsink = self.get_vertex_id(&Vertex::Sink)?; + + let nb_vertices = self.graph.len(); + + let flow_upper_bound = self.flow_upper_bound()?; + + // To ensure the dispersion of the associations generated by the + // assignation, we shuffle the neighbours of the nodes. Hence, + // the vertices do not consider their neighbours in the same order. + self.shuffle_edges(); + + // We run Dinic's max flow algorithm + loop { + // We build the level array from Dinic's algorithm. + let mut level = vec![None; nb_vertices]; + + let mut fifo = VecDeque::new(); + fifo.push_back((idsource, 0)); + while let Some((id, lvl)) = fifo.pop_front() { + if level[id] == None { + // it means id has not yet been reached + level[id] = Some(lvl); + for edge in self.graph[id].iter() { + if edge.cap as i64 - edge.flow > 0 { + fifo.push_back((edge.dest, lvl + 1)); + } + } + } + } + if level[idsink] == None { + // There is no residual flow + break; + } + // Now we run DFS respecting the level array + let mut next_nbd = vec![0; nb_vertices]; + let mut lifo = Vec::new(); + + lifo.push((idsource, flow_upper_bound)); + + while let Some((id, f)) = lifo.last().cloned() { + if id == idsink { + // The DFS reached the sink, we can add a + // residual flow. + lifo.pop(); + while let Some((id, _)) = lifo.pop() { + let nbd = next_nbd[id]; + self.graph[id][nbd].flow += f as i64; + let id_rev = self.graph[id][nbd].dest; + let nbd_rev = self.graph[id][nbd].rev; + self.graph[id_rev][nbd_rev].flow -= f as i64; + } + lifo.push((idsource, flow_upper_bound)); + continue; + } + // else we did not reach the sink + let nbd = next_nbd[id]; + if nbd >= self.graph[id].len() { + // There is nothing to explore from id anymore + lifo.pop(); + if let Some((parent, _)) = lifo.last() { + next_nbd[*parent] += 1; + } + continue; + } + // else we can try to send flow from id to its nbd + let new_flow = min( + f as i64, + self.graph[id][nbd].cap as i64 - self.graph[id][nbd].flow, + ) as u64; + if new_flow == 0 { + next_nbd[id] += 1; + continue; + } + if let (Some(lvldest), Some(lvlid)) = (level[self.graph[id][nbd].dest], level[id]) { + if lvldest <= lvlid { + // We cannot send flow to nbd. + next_nbd[id] += 1; + continue; + } + } + // otherwise, we send flow to nbd. + lifo.push((self.graph[id][nbd].dest, new_flow)); + } + } + Ok(()) + } + + /// This function takes a flow, and a cost function on the edges, and tries to find an + /// equivalent flow with a better cost, by finding improving overflow cycles. It uses + /// as subroutine the Bellman Ford algorithm run up to path_length. + /// We assume that the cost of edge (u,v) is the opposite of the cost of (v,u), and + /// only one needs to be present in the cost function. + pub fn optimize_flow_with_cost( + &mut self, + cost: &CostFunction, + path_length: usize, + ) -> Result<(), String> { + // We build the weighted graph g where we will look for negative cycle + let mut gf = self.build_cost_graph(cost)?; + let mut cycles = gf.list_negative_cycles(path_length); + while !cycles.is_empty() { + // we enumerate negative cycles + for c in cycles.iter() { + for i in 0..c.len() { + // We add one flow unit to the edge (u,v) of cycle c + let idu = self.vertex_to_id[&c[i]]; + let idv = self.vertex_to_id[&c[(i + 1) % c.len()]]; + for j in 0..self.graph[idu].len() { + // since idu appears at most once in the cycles, we enumerate every + // edge at most once. + let edge = self.graph[idu][j]; + if edge.dest == idv { + self.graph[idu][j].flow += 1; + self.graph[idv][edge.rev].flow -= 1; + break; + } + } + } + } + + gf = self.build_cost_graph(cost)?; + cycles = gf.list_negative_cycles(path_length); + } + Ok(()) + } + + /// Construct the weighted graph G_f from the flow and the cost function + fn build_cost_graph(&self, cost: &CostFunction) -> Result<Graph<WeightedEdge>, String> { + let mut g = Graph::<WeightedEdge>::new(&self.id_to_vertex); + let nb_vertices = self.id_to_vertex.len(); + for i in 0..nb_vertices { + for edge in self.graph[i].iter() { + if edge.cap as i64 - edge.flow > 0 { + // It is possible to send overflow through this edge + let u = self.id_to_vertex[i]; + let v = self.id_to_vertex[edge.dest]; + if cost.contains_key(&(u, v)) { + g.add_edge(u, v, cost[&(u, v)])?; + } else if cost.contains_key(&(v, u)) { + g.add_edge(u, v, -cost[&(v, u)])?; + } else { + g.add_edge(u, v, 0)?; + } + } + } + } + Ok(g) + } +} + +impl Graph<WeightedEdge> { + /// This function adds a single directed weighted edge to the graph. + pub fn add_edge(&mut self, u: Vertex, v: Vertex, w: i64) -> Result<(), String> { + let idu = self.get_vertex_id(&u)?; + let idv = self.get_vertex_id(&v)?; + self.graph[idu].push(WeightedEdge { w, dest: idv }); + Ok(()) + } + + /// This function lists the negative cycles it manages to find after path_length + /// iterations of the main loop of the Bellman-Ford algorithm. For the classical + /// algorithm, path_length needs to be equal to the number of vertices. However, + /// for particular graph structures like in our case, the algorithm is still correct + /// when path_length is the length of the longest possible simple path. + /// See the formal description of the algorithm for more details. + fn list_negative_cycles(&self, path_length: usize) -> Vec<Vec<Vertex>> { + let nb_vertices = self.graph.len(); + + // We start with every vertex at distance 0 of some imaginary extra -1 vertex. + let mut distance = vec![0; nb_vertices]; + // The prev vector collects for every vertex from where does the shortest path come + let mut prev = vec![None; nb_vertices]; + + for _ in 0..path_length + 1 { + for id in 0..nb_vertices { + for e in self.graph[id].iter() { + if distance[id] + e.w < distance[e.dest] { + distance[e.dest] = distance[id] + e.w; + prev[e.dest] = Some(id); + } + } + } + } + + // If self.graph contains a negative cycle, then at this point the graph described + // by prev (which is a directed 1-forest/functional graph) + // must contain a cycle. We list the cycles of prev. + let cycles_prev = cycles_of_1_forest(&prev); + + // Remark that the cycle in prev is in the reverse order compared to the cycle + // in the graph. Thus the .rev(). + return cycles_prev + .iter() + .map(|cycle| { + cycle + .iter() + .rev() + .map(|id| self.id_to_vertex[*id]) + .collect() + }) + .collect(); + } +} + +/// This function returns the list of cycles of a directed 1 forest. It does not +/// check for the consistency of the input. +fn cycles_of_1_forest(forest: &[Option<usize>]) -> Vec<Vec<usize>> { + let mut cycles = Vec::<Vec<usize>>::new(); + let mut time_of_discovery = vec![None; forest.len()]; + + for t in 0..forest.len() { + let mut id = t; + // while we are on a valid undiscovered node + while time_of_discovery[id] == None { + time_of_discovery[id] = Some(t); + if let Some(i) = forest[id] { + id = i; + } else { + break; + } + } + if forest[id] != None && time_of_discovery[id] == Some(t) { + // We discovered an id that we explored at this iteration t. + // It means we are on a cycle + let mut cy = vec![id; 1]; + let mut id2 = id; + while let Some(id_next) = forest[id2] { + id2 = id_next; + if id2 != id { + cy.push(id2); + } else { + break; + } + } + cycles.push(cy); + } + } + cycles +} diff --git a/src/rpc/layout.rs b/src/rpc/layout.rs index 2fd5acfc..d756f0aa 100644 --- a/src/rpc/layout.rs +++ b/src/rpc/layout.rs @@ -1,14 +1,28 @@ use std::cmp::Ordering; -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; +use std::collections::HashSet; + +use bytesize::ByteSize; +use itertools::Itertools; use serde::{Deserialize, Serialize}; -use garage_util::crdt::{AutoCrdt, Crdt, LwwMap}; +use garage_util::crdt::{AutoCrdt, Crdt, Lww, LwwMap}; use garage_util::data::*; +use garage_util::encode::nonversioned_encode; use garage_util::error::*; +use crate::graph_algo::*; + use crate::ring::*; +use std::convert::TryInto; + +const NB_PARTITIONS: usize = 1usize << PARTITION_BITS; + +// The Message type will be used to collect information on the algorithm. +type Message = Vec<String>; + /// The layout of the cluster, i.e. the list of roles /// which are assigned to each cluster node #[derive(Clone, Debug, Serialize, Deserialize)] @@ -16,12 +30,21 @@ pub struct ClusterLayout { pub version: u64, pub replication_factor: usize, + + /// This attribute is only used to retain the previously computed partition size, + /// to know to what extent does it change with the layout update. + pub partition_size: u64, + /// Parameters used to compute the assignation currently given by + /// ring_assignation_data + pub parameters: LayoutParameters, + pub roles: LwwMap<Uuid, NodeRoleV>, /// node_id_vec: a vector of node IDs with a role assigned /// in the system (this includes gateway nodes). /// The order here is different than the vec stored by `roles`, because: - /// 1. non-gateway nodes are first so that they have lower numbers + /// 1. non-gateway nodes are first so that they have lower numbers holding + /// in u8 (the number of non-gateway nodes is at most 256). /// 2. nodes that don't have a role are excluded (but they need to /// stay in the CRDT as tombstones) pub node_id_vec: Vec<Uuid>, @@ -30,10 +53,24 @@ pub struct ClusterLayout { #[serde(with = "serde_bytes")] pub ring_assignation_data: Vec<CompactNodeType>, + /// Parameters to be used in the next partition assignation computation. + pub staging_parameters: Lww<LayoutParameters>, /// Role changes which are staged for the next version of the layout - pub staging: LwwMap<Uuid, NodeRoleV>, + pub staging_roles: LwwMap<Uuid, NodeRoleV>, pub staging_hash: Hash, } +impl garage_util::migrate::InitialFormat for ClusterLayout {} + +/// This struct is used to set the parameters to be used in the assignation computation +/// algorithm. It is stored as a Crdt. +#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] +pub struct LayoutParameters { + pub zone_redundancy: usize, +} + +impl AutoCrdt for LayoutParameters { + const WARN_IF_DIFFERENT: bool = true; +} #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] pub struct NodeRoleV(pub Option<NodeRole>); @@ -45,13 +82,13 @@ impl AutoCrdt for NodeRoleV { /// The user-assigned roles of cluster nodes #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] pub struct NodeRole { - /// Datacenter at which this entry belong. This information might be used to perform a better - /// geodistribution + /// Datacenter at which this entry belong. This information is used to + /// perform a better geodistribution pub zone: String, - /// The (relative) capacity of the node + /// The capacity of the node /// If this is set to None, the node does not participate in storing data for the system /// and is only active as an API gateway to other nodes - pub capacity: Option<u32>, + pub capacity: Option<u64>, /// A set of tags to recognize the node pub tags: Vec<String>, } @@ -59,26 +96,47 @@ pub struct NodeRole { impl NodeRole { pub fn capacity_string(&self) -> String { match self.capacity { - Some(c) => format!("{}", c), + Some(c) => ByteSize::b(c).to_string_as(false), None => "gateway".to_string(), } } + + pub fn tags_string(&self) -> String { + self.tags.join(",") + } } +// Implementation of the ClusterLayout methods unrelated to the assignation algorithm. impl ClusterLayout { pub fn new(replication_factor: usize) -> Self { + // We set the default zone redundancy to be equal to the replication factor, + // i.e. as strict as possible. + let parameters = LayoutParameters { + zone_redundancy: replication_factor, + }; + let staging_parameters = Lww::<LayoutParameters>::new(parameters.clone()); + let empty_lwwmap = LwwMap::new(); - let empty_lwwmap_hash = blake2sum(&rmp_to_vec_all_named(&empty_lwwmap).unwrap()[..]); - ClusterLayout { + let mut ret = ClusterLayout { version: 0, replication_factor, + partition_size: 0, roles: LwwMap::new(), node_id_vec: Vec::new(), ring_assignation_data: Vec::new(), - staging: empty_lwwmap, - staging_hash: empty_lwwmap_hash, - } + parameters, + staging_parameters, + staging_roles: empty_lwwmap, + staging_hash: [0u8; 32].into(), + }; + ret.staging_hash = ret.calculate_staging_hash(); + ret + } + + fn calculate_staging_hash(&self) -> Hash { + let hashed_tuple = (&self.staging_roles, &self.staging_parameters); + blake2sum(&nonversioned_encode(&hashed_tuple).unwrap()[..]) } pub fn merge(&mut self, other: &ClusterLayout) -> bool { @@ -88,9 +146,10 @@ impl ClusterLayout { true } Ordering::Equal => { - self.staging.merge(&other.staging); + self.staging_parameters.merge(&other.staging_parameters); + self.staging_roles.merge(&other.staging_roles); - let new_staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]); + let new_staging_hash = self.calculate_staging_hash(); let changed = new_staging_hash != self.staging_hash; self.staging_hash = new_staging_hash; @@ -101,7 +160,7 @@ impl ClusterLayout { } } - pub fn apply_staged_changes(mut self, version: Option<u64>) -> Result<Self, Error> { + pub fn apply_staged_changes(mut self, version: Option<u64>) -> Result<(Self, Message), Error> { match version { None => { let error = r#" @@ -117,19 +176,18 @@ To know the correct value of the new layout version, invoke `garage layout show` } } - self.roles.merge(&self.staging); + self.roles.merge(&self.staging_roles); self.roles.retain(|(_, _, v)| v.0.is_some()); + self.parameters = self.staging_parameters.get().clone(); - if !self.calculate_partition_assignation() { - return Err(Error::Message("Could not calculate new assignation of partitions to nodes. This can happen if there are less nodes than the desired number of copies of your data (see the replication_mode configuration parameter).".into())); - } + self.staging_roles.clear(); + self.staging_hash = self.calculate_staging_hash(); - self.staging.clear(); - self.staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]); + let msg = self.calculate_partition_assignation()?; self.version += 1; - Ok(self) + Ok((self, msg)) } pub fn revert_staged_changes(mut self, version: Option<u64>) -> Result<Self, Error> { @@ -148,8 +206,9 @@ To know the correct value of the new layout version, invoke `garage layout show` } } - self.staging.clear(); - self.staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]); + self.staging_roles.clear(); + self.staging_parameters.update(self.parameters.clone()); + self.staging_hash = self.calculate_staging_hash(); self.version += 1; @@ -174,13 +233,81 @@ To know the correct value of the new layout version, invoke `garage layout show` } } + /// Returns the uuids of the non_gateway nodes in self.node_id_vec. + fn nongateway_nodes(&self) -> Vec<Uuid> { + let mut result = Vec::<Uuid>::new(); + for uuid in self.node_id_vec.iter() { + match self.node_role(uuid) { + Some(role) if role.capacity != None => result.push(*uuid), + _ => (), + } + } + result + } + + /// Given a node uuids, this function returns the label of its zone + fn get_node_zone(&self, uuid: &Uuid) -> Result<String, Error> { + match self.node_role(uuid) { + Some(role) => Ok(role.zone.clone()), + _ => Err(Error::Message( + "The Uuid does not correspond to a node present in the cluster.".into(), + )), + } + } + + /// Given a node uuids, this function returns its capacity or fails if it does not have any + pub fn get_node_capacity(&self, uuid: &Uuid) -> Result<u64, Error> { + match self.node_role(uuid) { + Some(NodeRole { + capacity: Some(cap), + zone: _, + tags: _, + }) => Ok(*cap), + _ => Err(Error::Message( + "The Uuid does not correspond to a node present in the \ + cluster or this node does not have a positive capacity." + .into(), + )), + } + } + + /// Returns the number of partitions associated to this node in the ring + pub fn get_node_usage(&self, uuid: &Uuid) -> Result<usize, Error> { + for (i, id) in self.node_id_vec.iter().enumerate() { + if id == uuid { + let mut count = 0; + for nod in self.ring_assignation_data.iter() { + if i as u8 == *nod { + count += 1 + } + } + return Ok(count); + } + } + Err(Error::Message( + "The Uuid does not correspond to a node present in the \ + cluster or this node does not have a positive capacity." + .into(), + )) + } + + /// Returns the sum of capacities of non gateway nodes in the cluster + fn get_total_capacity(&self) -> Result<u64, Error> { + let mut total_capacity = 0; + for uuid in self.nongateway_nodes().iter() { + total_capacity += self.get_node_capacity(uuid)?; + } + Ok(total_capacity) + } + /// Check a cluster layout for internal consistency + /// (assignation, roles, parameters, partition size) /// returns true if consistent, false if error - pub fn check(&self) -> bool { + pub fn check(&self) -> Result<(), String> { // Check that the hash of the staging data is correct - let staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]); + let staging_hash = self.calculate_staging_hash(); if staging_hash != self.staging_hash { - return false; + return Err("staging_hash is incorrect".into()); } // Check that node_id_vec contains the correct list of nodes @@ -195,12 +322,17 @@ To know the correct value of the new layout version, invoke `garage layout show` let mut node_id_vec = self.node_id_vec.clone(); node_id_vec.sort(); if expected_nodes != node_id_vec { - return false; + return Err(format!("node_id_vec does not contain the correct set of nodes\nnode_id_vec: {:?}\nexpected: {:?}", node_id_vec, expected_nodes)); } // Check that the assignation data has the correct length - if self.ring_assignation_data.len() != (1 << PARTITION_BITS) * self.replication_factor { - return false; + let expected_assignation_data_len = (1 << PARTITION_BITS) * self.replication_factor; + if self.ring_assignation_data.len() != expected_assignation_data_len { + return Err(format!( + "ring_assignation_data has incorrect length {} instead of {}", + self.ring_assignation_data.len(), + expected_assignation_data_len + )); } // Check that the assigned nodes are correct identifiers @@ -208,459 +340,776 @@ To know the correct value of the new layout version, invoke `garage layout show` // and that role is not the role of a gateway nodes for x in self.ring_assignation_data.iter() { if *x as usize >= self.node_id_vec.len() { - return false; + return Err(format!( + "ring_assignation_data contains invalid node id {}", + *x + )); } let node = self.node_id_vec[*x as usize]; match self.roles.get(&node) { Some(NodeRoleV(Some(x))) if x.capacity.is_some() => (), - _ => return false, + _ => return Err("ring_assignation_data contains id of a gateway node".into()), } } - true - } + // Check that every partition is associated to distinct nodes + let rf = self.replication_factor; + for p in 0..(1 << PARTITION_BITS) { + let nodes_of_p = self.ring_assignation_data[rf * p..rf * (p + 1)].to_vec(); + if nodes_of_p.iter().unique().count() != rf { + return Err(format!("partition does not contain {} unique node ids", rf)); + } + // Check that every partition is spread over at least zone_redundancy zones. + let zones_of_p = nodes_of_p + .iter() + .map(|n| { + self.get_node_zone(&self.node_id_vec[*n as usize]) + .expect("Zone not found.") + }) + .collect::<Vec<_>>(); + let redundancy = self.parameters.zone_redundancy; + if zones_of_p.iter().unique().count() < redundancy { + return Err(format!( + "nodes of partition are in less than {} distinct zones", + redundancy + )); + } + } - /// Calculate an assignation of partitions to nodes - pub fn calculate_partition_assignation(&mut self) -> bool { - let (configured_nodes, zones) = self.configured_nodes_and_zones(); - let n_zones = zones.len(); + // Check that the nodes capacities is consistent with the stored partitions + let mut node_usage = vec![0; MAX_NODE_NUMBER]; + for n in self.ring_assignation_data.iter() { + node_usage[*n as usize] += 1; + } + for (n, usage) in node_usage.iter().enumerate() { + if *usage > 0 { + let uuid = self.node_id_vec[n]; + let partusage = usage * self.partition_size; + let nodecap = self.get_node_capacity(&uuid).unwrap(); + if partusage > nodecap { + return Err(format!( + "node usage ({}) is bigger than node capacity ({})", + usage * self.partition_size, + nodecap + )); + } + } + } - println!("Calculating updated partition assignation, this may take some time..."); - println!(); + // Check that the partition size stored is the one computed by the asignation + // algorithm. + let cl2 = self.clone(); + let (_, zone_to_id) = cl2.generate_nongateway_zone_ids().unwrap(); + match cl2.compute_optimal_partition_size(&zone_to_id) { + Ok(s) if s != self.partition_size => { + return Err(format!( + "partition_size ({}) is different than optimal value ({})", + self.partition_size, s + )) + } + Err(e) => return Err(format!("could not calculate optimal partition size: {}", e)), + _ => (), + } - // Get old partition assignation - let old_partitions = self.parse_assignation_data(); + Ok(()) + } +} - // Start new partition assignation with nodes from old assignation where it is relevant - let mut partitions = old_partitions - .iter() - .map(|old_part| { - let mut new_part = PartitionAss::new(); - for node in old_part.nodes.iter() { - if let Some(role) = node.1 { - if role.capacity.is_some() { - new_part.add(None, n_zones, node.0, role); - } - } - } - new_part - }) - .collect::<Vec<_>>(); +// Implementation of the ClusterLayout methods related to the assignation algorithm. +impl ClusterLayout { + /// This function calculates a new partition-to-node assignation. + /// The computed assignation respects the node replication factor + /// and the zone redundancy parameter It maximizes the capacity of a + /// partition (assuming all partitions have the same size). + /// Among such optimal assignation, it minimizes the distance to + /// the former assignation (if any) to minimize the amount of + /// data to be moved. + /// Staged role changes must be merged with nodes roles before calling this function, + /// hence it must only be called from apply_staged_changes() and hence is not public. + fn calculate_partition_assignation(&mut self) -> Result<Message, Error> { + // We update the node ids, since the node role list might have changed with the + // changes in the layout. We retrieve the old_assignation reframed with new ids + let old_assignation_opt = self.update_node_id_vec()?; + + let mut msg = Message::new(); + msg.push("==== COMPUTATION OF A NEW PARTITION ASSIGNATION ====".into()); + msg.push("".into()); + msg.push(format!( + "Partitions are \ + replicated {} times on at least {} distinct zones.", + self.replication_factor, self.parameters.zone_redundancy + )); + + // We generate for once numerical ids for the zones of non gateway nodes, + // to use them as indices in the flow graphs. + let (id_to_zone, zone_to_id) = self.generate_nongateway_zone_ids()?; + + let nb_nongateway_nodes = self.nongateway_nodes().len(); + if nb_nongateway_nodes < self.replication_factor { + return Err(Error::Message(format!( + "The number of nodes with positive \ + capacity ({}) is smaller than the replication factor ({}).", + nb_nongateway_nodes, self.replication_factor + ))); + } + if id_to_zone.len() < self.parameters.zone_redundancy { + return Err(Error::Message(format!( + "The number of zones with non-gateway \ + nodes ({}) is smaller than the redundancy parameter ({})", + id_to_zone.len(), + self.parameters.zone_redundancy + ))); + } - // In various cases, not enough nodes will have been added for all partitions - // in the step above (e.g. due to node removals, or new zones being added). - // Here we add more nodes to make a complete (but sub-optimal) assignation, - // using an initial partition assignation that is calculated using the multi-dc maglev trick - match self.initial_partition_assignation() { - Some(initial_partitions) => { - for (part, ipart) in partitions.iter_mut().zip(initial_partitions.iter()) { - for _ in 0..2 { - for (id, info) in ipart.nodes.iter() { - if part.nodes.len() < self.replication_factor { - part.add(None, n_zones, id, info.unwrap()); - } - } - } - assert!(part.nodes.len() == self.replication_factor); - } - } - None => { - // Not enough nodes in cluster to build a correct assignation. - // Signal it by returning an error. - return false; - } + // We compute the optimal partition size + // Capacities should be given in a unit so that partition size is at least 100. + // In this case, integer rounding plays a marginal role in the percentages of + // optimality. + let partition_size = self.compute_optimal_partition_size(&zone_to_id)?; + + if old_assignation_opt != None { + msg.push(format!( + "Optimal size of a partition: {} (was {} in the previous layout).", + ByteSize::b(partition_size).to_string_as(false), + ByteSize::b(self.partition_size).to_string_as(false) + )); + } else { + msg.push(format!( + "Given the replication and redundancy constraints, the \ + optimal size of a partition is {}.", + ByteSize::b(partition_size).to_string_as(false) + )); + } + // We write the partition size. + self.partition_size = partition_size; + + if partition_size < 100 { + msg.push( + "WARNING: The partition size is low (< 100), make sure the capacities of your nodes are correct and are of at least a few MB" + .into(), + ); } - // Calculate how many partitions each node should ideally store, - // and how many partitions they are storing with the current assignation - // This defines our target for which we will optimize in the following loop. - let total_capacity = configured_nodes - .iter() - .map(|(_, info)| info.capacity.unwrap_or(0)) - .sum::<u32>() as usize; - let total_partitions = self.replication_factor * (1 << PARTITION_BITS); - let target_partitions_per_node = configured_nodes - .iter() - .map(|(id, info)| { - ( - *id, - info.capacity.unwrap_or(0) as usize * total_partitions / total_capacity, - ) - }) - .collect::<HashMap<&Uuid, usize>>(); - - let mut partitions_per_node = self.partitions_per_node(&partitions[..]); - - println!("Target number of partitions per node:"); - for (node, npart) in target_partitions_per_node.iter() { - println!("{:?}\t{}", node, npart); - } - println!(); - - // Shuffle partitions between nodes so that nodes will reach (or better approach) - // their target number of stored partitions - loop { - let mut option = None; - for (i, part) in partitions.iter_mut().enumerate() { - for (irm, (idrm, _)) in part.nodes.iter().enumerate() { - let errratio = |node, parts| { - let tgt = *target_partitions_per_node.get(node).unwrap() as f32; - (parts - tgt) / tgt - }; - let square = |x| x * x; - - let partsrm = partitions_per_node.get(*idrm).cloned().unwrap_or(0) as f32; - - for (idadd, infoadd) in configured_nodes.iter() { - // skip replacing a node by itself - // and skip replacing by gateway nodes - if idadd == idrm || infoadd.capacity.is_none() { - continue; - } + // We compute a first flow/assignation that is heuristically close to the previous + // assignation + let mut gflow = self.compute_candidate_assignation(&zone_to_id, &old_assignation_opt)?; + if let Some(assoc) = &old_assignation_opt { + // We minimize the distance to the previous assignation. + self.minimize_rebalance_load(&mut gflow, &zone_to_id, assoc)?; + } - // We want to try replacing node idrm by node idadd - // if that brings us close to our goal. - let partsadd = partitions_per_node.get(*idadd).cloned().unwrap_or(0) as f32; - let oldcost = square(errratio(*idrm, partsrm) - errratio(*idadd, partsadd)); - let newcost = - square(errratio(*idrm, partsrm - 1.) - errratio(*idadd, partsadd + 1.)); - if newcost >= oldcost { - // not closer to our goal - continue; - } - let gain = oldcost - newcost; + // We display statistics of the computation + msg.extend(self.output_stat(&gflow, &old_assignation_opt, &zone_to_id, &id_to_zone)?); + msg.push("".to_string()); - let mut newpart = part.clone(); + // We update the layout structure + self.update_ring_from_flow(id_to_zone.len(), &gflow)?; - newpart.nodes.remove(irm); - if !newpart.add(None, n_zones, idadd, infoadd) { - continue; - } - assert!(newpart.nodes.len() == self.replication_factor); + if let Err(e) = self.check() { + return Err(Error::Message( + format!("Layout check returned an error: {}\nOriginal result of computation: <<<<\n{}\n>>>>", e, msg.join("\n")) + )); + } - if !old_partitions[i] - .is_valid_transition_to(&newpart, self.replication_factor) - { - continue; - } + Ok(msg) + } - if option - .as_ref() - .map(|(old_gain, _, _, _, _)| gain > *old_gain) - .unwrap_or(true) - { - option = Some((gain, i, idadd, idrm, newpart)); - } - } - } - } - if let Some((_gain, i, idadd, idrm, newpart)) = option { - *partitions_per_node.entry(idadd).or_insert(0) += 1; - *partitions_per_node.get_mut(idrm).unwrap() -= 1; - partitions[i] = newpart; - } else { - break; - } + /// The LwwMap of node roles might have changed. This function updates the node_id_vec + /// and returns the assignation given by ring, with the new indices of the nodes, and + /// None if the node is not present anymore. + /// We work with the assumption that only this function and calculate_new_assignation + /// do modify assignation_ring and node_id_vec. + fn update_node_id_vec(&mut self) -> Result<Option<Vec<Vec<usize>>>, Error> { + // (1) We compute the new node list + // Non gateway nodes should be coded on 8bits, hence they must be first in the list + // We build the new node ids + let new_non_gateway_nodes: Vec<Uuid> = self + .roles + .items() + .iter() + .filter(|(_, _, v)| matches!(&v.0, Some(r) if r.capacity != None)) + .map(|(k, _, _)| *k) + .collect(); + + if new_non_gateway_nodes.len() > MAX_NODE_NUMBER { + return Err(Error::Message(format!( + "There are more than {} non-gateway nodes in the new \ + layout. This is not allowed.", + MAX_NODE_NUMBER + ))); } - // Check we completed the assignation correctly - // (this is a set of checks for the algorithm's consistency) - assert!(partitions.len() == (1 << PARTITION_BITS)); - assert!(partitions + let new_gateway_nodes: Vec<Uuid> = self + .roles + .items() .iter() - .all(|p| p.nodes.len() == self.replication_factor)); - - let new_partitions_per_node = self.partitions_per_node(&partitions[..]); - assert!(new_partitions_per_node == partitions_per_node); - - // Show statistics - println!("New number of partitions per node:"); - for (node, npart) in partitions_per_node.iter() { - let tgt = *target_partitions_per_node.get(node).unwrap(); - let pct = 100f32 * (*npart as f32) / (tgt as f32); - println!("{:?}\t{}\t({}% of {})", node, npart, pct as i32, tgt); - } - println!(); - - let mut diffcount = HashMap::new(); - for (oldpart, newpart) in old_partitions.iter().zip(partitions.iter()) { - let nminus = oldpart.txtplus(newpart); - let nplus = newpart.txtplus(oldpart); - if nminus != "[...]" || nplus != "[...]" { - let tup = (nminus, nplus); - *diffcount.entry(tup).or_insert(0) += 1; - } + .filter(|(_, _, v)| matches!(v, NodeRoleV(Some(r)) if r.capacity == None)) + .map(|(k, _, _)| *k) + .collect(); + + let mut new_node_id_vec = Vec::<Uuid>::new(); + new_node_id_vec.extend(new_non_gateway_nodes); + new_node_id_vec.extend(new_gateway_nodes); + + let old_node_id_vec = self.node_id_vec.clone(); + self.node_id_vec = new_node_id_vec.clone(); + + // (2) We retrieve the old association + // We rewrite the old association with the new indices. We only consider partition + // to node assignations where the node is still in use. + if self.ring_assignation_data.is_empty() { + // This is a new association + return Ok(None); } - if diffcount.is_empty() { - println!("No data will be moved between nodes."); - } else { - let mut diffcount = diffcount.into_iter().collect::<Vec<_>>(); - diffcount.sort(); - println!("Number of partitions that move:"); - for ((nminus, nplus), npart) in diffcount { - println!("\t{}\t{} -> {}", npart, nminus, nplus); - } + + if self.ring_assignation_data.len() != NB_PARTITIONS * self.replication_factor { + return Err(Error::Message( + "The old assignation does not have a size corresponding to \ + the old replication factor or the number of partitions." + .into(), + )); } - println!(); - // Calculate and save new assignation data - let (nodes, assignation_data) = - self.compute_assignation_data(&configured_nodes[..], &partitions[..]); + // We build a translation table between the uuid and new ids + let mut uuid_to_new_id = HashMap::<Uuid, usize>::new(); + + // We add the indices of only the new non-gateway nodes that can be used in the + // association ring + for (i, uuid) in new_node_id_vec.iter().enumerate() { + uuid_to_new_id.insert(*uuid, i); + } - self.node_id_vec = nodes; - self.ring_assignation_data = assignation_data; + let mut old_assignation = vec![Vec::<usize>::new(); NB_PARTITIONS]; + let rf = self.replication_factor; - true + for (p, old_assign_p) in old_assignation.iter_mut().enumerate() { + for old_id in &self.ring_assignation_data[p * rf..(p + 1) * rf] { + let uuid = old_node_id_vec[*old_id as usize]; + if uuid_to_new_id.contains_key(&uuid) { + old_assign_p.push(uuid_to_new_id[&uuid]); + } + } + } + + // We write the ring + self.ring_assignation_data = Vec::<CompactNodeType>::new(); + + Ok(Some(old_assignation)) } - fn initial_partition_assignation(&self) -> Option<Vec<PartitionAss<'_>>> { - let (configured_nodes, zones) = self.configured_nodes_and_zones(); - let n_zones = zones.len(); + /// This function generates ids for the zone of the nodes appearing in + /// self.node_id_vec. + fn generate_nongateway_zone_ids(&self) -> Result<(Vec<String>, HashMap<String, usize>), Error> { + let mut id_to_zone = Vec::<String>::new(); + let mut zone_to_id = HashMap::<String, usize>::new(); + + for uuid in self.nongateway_nodes().iter() { + let r = self.node_role(uuid).unwrap(); + if !zone_to_id.contains_key(&r.zone) && r.capacity != None { + zone_to_id.insert(r.zone.clone(), id_to_zone.len()); + id_to_zone.push(r.zone.clone()); + } + } + Ok((id_to_zone, zone_to_id)) + } - // Create a vector of partition indices (0 to 2**PARTITION_BITS-1) - let partitions_idx = (0usize..(1usize << PARTITION_BITS)).collect::<Vec<_>>(); + /// This function computes by dichotomy the largest realizable partition size, given + /// the layout roles and parameters. + fn compute_optimal_partition_size( + &self, + zone_to_id: &HashMap<String, usize>, + ) -> Result<u64, Error> { + let empty_set = HashSet::<(usize, usize)>::new(); + let mut g = self.generate_flow_graph(1, zone_to_id, &empty_set)?; + g.compute_maximal_flow()?; + if g.get_flow_value()? < (NB_PARTITIONS * self.replication_factor) as i64 { + return Err(Error::Message( + "The storage capacity of he cluster is to small. It is \ + impossible to store partitions of size 1." + .into(), + )); + } - // Prepare ring - let mut partitions: Vec<PartitionAss> = partitions_idx - .iter() - .map(|_i| PartitionAss::new()) - .collect::<Vec<_>>(); + let mut s_down = 1; + let mut s_up = self.get_total_capacity()?; + while s_down + 1 < s_up { + g = self.generate_flow_graph((s_down + s_up) / 2, zone_to_id, &empty_set)?; + g.compute_maximal_flow()?; + if g.get_flow_value()? < (NB_PARTITIONS * self.replication_factor) as i64 { + s_up = (s_down + s_up) / 2; + } else { + s_down = (s_down + s_up) / 2; + } + } - // Create MagLev priority queues for each node - let mut queues = configured_nodes - .iter() - .filter(|(_id, info)| info.capacity.is_some()) - .map(|(node_id, node_info)| { - let mut parts = partitions_idx - .iter() - .map(|i| { - let part_data = - [&u16::to_be_bytes(*i as u16)[..], node_id.as_slice()].concat(); - (*i, fasthash(&part_data[..])) - }) - .collect::<Vec<_>>(); - parts.sort_by_key(|(_i, h)| *h); - let parts_i = parts.iter().map(|(i, _h)| *i).collect::<Vec<_>>(); - (node_id, node_info, parts_i, 0) - }) - .collect::<Vec<_>>(); + Ok(s_down) + } - let max_capacity = configured_nodes - .iter() - .filter_map(|(_, node_info)| node_info.capacity) - .fold(0, std::cmp::max); - - // Fill up ring - for rep in 0..self.replication_factor { - queues.sort_by_key(|(ni, _np, _q, _p)| { - let queue_data = [&u16::to_be_bytes(rep as u16)[..], ni.as_slice()].concat(); - fasthash(&queue_data[..]) - }); - - for (_, _, _, pos) in queues.iter_mut() { - *pos = 0; + fn generate_graph_vertices(nb_zones: usize, nb_nodes: usize) -> Vec<Vertex> { + let mut vertices = vec![Vertex::Source, Vertex::Sink]; + for p in 0..NB_PARTITIONS { + vertices.push(Vertex::Pup(p)); + vertices.push(Vertex::Pdown(p)); + for z in 0..nb_zones { + vertices.push(Vertex::PZ(p, z)); } + } + for n in 0..nb_nodes { + vertices.push(Vertex::N(n)); + } + vertices + } - let mut remaining = partitions_idx.len(); - while remaining > 0 { - let remaining0 = remaining; - for i_round in 0..max_capacity { - for (node_id, node_info, q, pos) in queues.iter_mut() { - if i_round >= node_info.capacity.unwrap() { - continue; - } - for (pos2, &qv) in q.iter().enumerate().skip(*pos) { - if partitions[qv].add(Some(rep + 1), n_zones, node_id, node_info) { - remaining -= 1; - *pos = pos2 + 1; - break; - } - } - } - } - if remaining == remaining0 { - // No progress made, exit - return None; + /// Generates the graph to compute the maximal flow corresponding to the optimal + /// partition assignation. + /// exclude_assoc is the set of (partition, node) association that we are forbidden + /// to use (hence we do not add the corresponding edge to the graph). This parameter + /// is used to compute a first flow that uses only edges appearing in the previous + /// assignation. This produces a solution that heuristically should be close to the + /// previous one. + fn generate_flow_graph( + &self, + partition_size: u64, + zone_to_id: &HashMap<String, usize>, + exclude_assoc: &HashSet<(usize, usize)>, + ) -> Result<Graph<FlowEdge>, Error> { + let vertices = + ClusterLayout::generate_graph_vertices(zone_to_id.len(), self.nongateway_nodes().len()); + let mut g = Graph::<FlowEdge>::new(&vertices); + let nb_zones = zone_to_id.len(); + let redundancy = self.parameters.zone_redundancy; + for p in 0..NB_PARTITIONS { + g.add_edge(Vertex::Source, Vertex::Pup(p), redundancy as u64)?; + g.add_edge( + Vertex::Source, + Vertex::Pdown(p), + (self.replication_factor - redundancy) as u64, + )?; + for z in 0..nb_zones { + g.add_edge(Vertex::Pup(p), Vertex::PZ(p, z), 1)?; + g.add_edge( + Vertex::Pdown(p), + Vertex::PZ(p, z), + self.replication_factor as u64, + )?; + } + } + for n in 0..self.nongateway_nodes().len() { + let node_capacity = self.get_node_capacity(&self.node_id_vec[n])?; + let node_zone = zone_to_id[&self.get_node_zone(&self.node_id_vec[n])?]; + g.add_edge(Vertex::N(n), Vertex::Sink, node_capacity / partition_size)?; + for p in 0..NB_PARTITIONS { + if !exclude_assoc.contains(&(p, n)) { + g.add_edge(Vertex::PZ(p, node_zone), Vertex::N(n), 1)?; } } } - - Some(partitions) + Ok(g) } - fn configured_nodes_and_zones(&self) -> (Vec<(&Uuid, &NodeRole)>, HashSet<&str>) { - let configured_nodes = self - .roles - .items() - .iter() - .filter(|(_id, _, info)| info.0.is_some()) - .map(|(id, _, info)| (id, info.0.as_ref().unwrap())) - .collect::<Vec<(&Uuid, &NodeRole)>>(); + /// This function computes a first optimal assignation (in the form of a flow graph). + fn compute_candidate_assignation( + &self, + zone_to_id: &HashMap<String, usize>, + prev_assign_opt: &Option<Vec<Vec<usize>>>, + ) -> Result<Graph<FlowEdge>, Error> { + // We list the (partition,node) associations that are not used in the + // previous assignation + let mut exclude_edge = HashSet::<(usize, usize)>::new(); + if let Some(prev_assign) = prev_assign_opt { + let nb_nodes = self.nongateway_nodes().len(); + for (p, prev_assign_p) in prev_assign.iter().enumerate() { + for n in 0..nb_nodes { + exclude_edge.insert((p, n)); + } + for n in prev_assign_p.iter() { + exclude_edge.remove(&(p, *n)); + } + } + } - let zones = configured_nodes - .iter() - .filter(|(_id, info)| info.capacity.is_some()) - .map(|(_id, info)| info.zone.as_str()) - .collect::<HashSet<&str>>(); + // We compute the best flow using only the edges used in the previous assignation + let mut g = self.generate_flow_graph(self.partition_size, zone_to_id, &exclude_edge)?; + g.compute_maximal_flow()?; - (configured_nodes, zones) + // We add the excluded edges and compute the maximal flow with the full graph. + // The algorithm is such that it will start with the flow that we just computed + // and find ameliorating paths from that. + for (p, n) in exclude_edge.iter() { + let node_zone = zone_to_id[&self.get_node_zone(&self.node_id_vec[*n])?]; + g.add_edge(Vertex::PZ(*p, node_zone), Vertex::N(*n), 1)?; + } + g.compute_maximal_flow()?; + Ok(g) } - fn compute_assignation_data<'a>( + /// This function updates the flow graph gflow to minimize the distance between + /// its corresponding assignation and the previous one + fn minimize_rebalance_load( &self, - configured_nodes: &[(&'a Uuid, &'a NodeRole)], - partitions: &[PartitionAss<'a>], - ) -> (Vec<Uuid>, Vec<CompactNodeType>) { - assert!(partitions.len() == (1 << PARTITION_BITS)); - - // Make a canonical order for nodes - let mut nodes = configured_nodes - .iter() - .filter(|(_id, info)| info.capacity.is_some()) - .map(|(id, _)| **id) - .collect::<Vec<_>>(); - let nodes_rev = nodes - .iter() - .enumerate() - .map(|(i, id)| (*id, i as CompactNodeType)) - .collect::<HashMap<Uuid, CompactNodeType>>(); - - let mut assignation_data = vec![]; - for partition in partitions.iter() { - assert!(partition.nodes.len() == self.replication_factor); - for (id, _) in partition.nodes.iter() { - assignation_data.push(*nodes_rev.get(id).unwrap()); + gflow: &mut Graph<FlowEdge>, + zone_to_id: &HashMap<String, usize>, + prev_assign: &[Vec<usize>], + ) -> Result<(), Error> { + // We define a cost function on the edges (pairs of vertices) corresponding + // to the distance between the two assignations. + let mut cost = CostFunction::new(); + for (p, assoc_p) in prev_assign.iter().enumerate() { + for n in assoc_p.iter() { + let node_zone = zone_to_id[&self.get_node_zone(&self.node_id_vec[*n])?]; + cost.insert((Vertex::PZ(p, node_zone), Vertex::N(*n)), -1); } } - nodes.extend( - configured_nodes - .iter() - .filter(|(_id, info)| info.capacity.is_none()) - .map(|(id, _)| **id), - ); + // We compute the maximal length of a simple path in gflow. It is used in the + // Bellman-Ford algorithm in optimize_flow_with_cost to set the number + // of iterations. + let nb_nodes = self.nongateway_nodes().len(); + let path_length = 4 * nb_nodes; + gflow.optimize_flow_with_cost(&cost, path_length)?; - (nodes, assignation_data) + Ok(()) } - fn parse_assignation_data(&self) -> Vec<PartitionAss<'_>> { - if self.ring_assignation_data.len() == self.replication_factor * (1 << PARTITION_BITS) { - // If the previous assignation data is correct, use that - let mut partitions = vec![]; - for i in 0..(1 << PARTITION_BITS) { - let mut part = PartitionAss::new(); - for node_i in self.ring_assignation_data - [i * self.replication_factor..(i + 1) * self.replication_factor] - .iter() - { - let node_id = &self.node_id_vec[*node_i as usize]; - - if let Some(NodeRoleV(Some(info))) = self.roles.get(node_id) { - part.nodes.push((node_id, Some(info))); - } else { - part.nodes.push((node_id, None)); + /// This function updates the assignation ring from the flow graph. + fn update_ring_from_flow( + &mut self, + nb_zones: usize, + gflow: &Graph<FlowEdge>, + ) -> Result<(), Error> { + self.ring_assignation_data = Vec::<CompactNodeType>::new(); + for p in 0..NB_PARTITIONS { + for z in 0..nb_zones { + let assoc_vertex = gflow.get_positive_flow_from(Vertex::PZ(p, z))?; + for vertex in assoc_vertex.iter() { + if let Vertex::N(n) = vertex { + self.ring_assignation_data.push((*n).try_into().unwrap()); } } - partitions.push(part); } - partitions - } else { - // Otherwise start fresh - (0..(1 << PARTITION_BITS)) - .map(|_| PartitionAss::new()) - .collect() } + + if self.ring_assignation_data.len() != NB_PARTITIONS * self.replication_factor { + return Err(Error::Message( + "Critical Error : the association ring we produced does not \ + have the right size." + .into(), + )); + } + Ok(()) } - fn partitions_per_node<'a>(&self, partitions: &[PartitionAss<'a>]) -> HashMap<&'a Uuid, usize> { - let mut partitions_per_node = HashMap::<&Uuid, usize>::new(); - for p in partitions.iter() { - for (id, _) in p.nodes.iter() { - *partitions_per_node.entry(*id).or_insert(0) += 1; + /// This function returns a message summing up the partition repartition of the new + /// layout, and other statistics of the partition assignation computation. + fn output_stat( + &self, + gflow: &Graph<FlowEdge>, + prev_assign_opt: &Option<Vec<Vec<usize>>>, + zone_to_id: &HashMap<String, usize>, + id_to_zone: &[String], + ) -> Result<Message, Error> { + let mut msg = Message::new(); + + let used_cap = self.partition_size * NB_PARTITIONS as u64 * self.replication_factor as u64; + let total_cap = self.get_total_capacity()?; + let percent_cap = 100.0 * (used_cap as f32) / (total_cap as f32); + msg.push("".into()); + msg.push(format!( + "Usable capacity / Total cluster capacity: {} / {} ({:.1} %)", + ByteSize::b(used_cap).to_string_as(false), + ByteSize::b(total_cap).to_string_as(false), + percent_cap + )); + msg.push("".into()); + msg.push( + "If the percentage is too low, it might be that the \ + replication/redundancy constraints force the use of nodes/zones with small \ + storage capacities. \ + You might want to rebalance the storage capacities or relax the constraints. \ + See the detailed statistics below and look for saturated nodes/zones." + .into(), + ); + msg.push(format!( + "Recall that because of the replication factor, the actual available \ + storage capacity is {} / {} = {}.", + ByteSize::b(used_cap).to_string_as(false), + self.replication_factor, + ByteSize::b(used_cap / self.replication_factor as u64).to_string_as(false) + )); + + // We define and fill in the following tables + let storing_nodes = self.nongateway_nodes(); + let mut new_partitions = vec![0; storing_nodes.len()]; + let mut stored_partitions = vec![0; storing_nodes.len()]; + + let mut new_partitions_zone = vec![0; id_to_zone.len()]; + let mut stored_partitions_zone = vec![0; id_to_zone.len()]; + + for p in 0..NB_PARTITIONS { + for z in 0..id_to_zone.len() { + let pz_nodes = gflow.get_positive_flow_from(Vertex::PZ(p, z))?; + if !pz_nodes.is_empty() { + stored_partitions_zone[z] += 1; + if let Some(prev_assign) = prev_assign_opt { + let mut old_zones_of_p = Vec::<usize>::new(); + for n in prev_assign[p].iter() { + old_zones_of_p + .push(zone_to_id[&self.get_node_zone(&self.node_id_vec[*n])?]); + } + if !old_zones_of_p.contains(&z) { + new_partitions_zone[z] += 1; + } + } + } + for vert in pz_nodes.iter() { + if let Vertex::N(n) = *vert { + stored_partitions[n] += 1; + if let Some(prev_assign) = prev_assign_opt { + if !prev_assign[p].contains(&n) { + new_partitions[n] += 1; + } + } + } + } } } - partitions_per_node + + if *prev_assign_opt == None { + new_partitions = stored_partitions.clone(); + new_partitions_zone = stored_partitions_zone.clone(); + } + + // We display the statistics + + msg.push("".into()); + if *prev_assign_opt != None { + let total_new_partitions: usize = new_partitions.iter().sum(); + msg.push(format!( + "A total of {} new copies of partitions need to be \ + transferred.", + total_new_partitions + )); + } + msg.push("".into()); + msg.push("==== DETAILED STATISTICS BY ZONES AND NODES ====".into()); + + for z in 0..id_to_zone.len() { + let mut nodes_of_z = Vec::<usize>::new(); + for n in 0..storing_nodes.len() { + if self.get_node_zone(&self.node_id_vec[n])? == id_to_zone[z] { + nodes_of_z.push(n); + } + } + let replicated_partitions: usize = + nodes_of_z.iter().map(|n| stored_partitions[*n]).sum(); + msg.push("".into()); + + msg.push(format!( + "Zone {}: {} distinct partitions stored ({} new, \ + {} partition copies) ", + id_to_zone[z], + stored_partitions_zone[z], + new_partitions_zone[z], + replicated_partitions + )); + + let available_cap_z: u64 = self.partition_size * replicated_partitions as u64; + let mut total_cap_z = 0; + for n in nodes_of_z.iter() { + total_cap_z += self.get_node_capacity(&self.node_id_vec[*n])?; + } + let percent_cap_z = 100.0 * (available_cap_z as f32) / (total_cap_z as f32); + msg.push(format!( + " Usable capacity / Total capacity: {} / {} ({:.1}%).", + ByteSize::b(available_cap_z).to_string_as(false), + ByteSize::b(total_cap_z).to_string_as(false), + percent_cap_z + )); + + for n in nodes_of_z.iter() { + let available_cap_n = stored_partitions[*n] as u64 * self.partition_size; + let total_cap_n = self.get_node_capacity(&self.node_id_vec[*n])?; + let tags_n = (self + .node_role(&self.node_id_vec[*n]) + .ok_or("Node not found."))? + .tags_string(); + msg.push(format!( + " Node {:?}: {} partitions ({} new) ; \ + usable/total capacity: {} / {} ({:.1}%) ; tags:{}", + self.node_id_vec[*n], + stored_partitions[*n], + new_partitions[*n], + ByteSize::b(available_cap_n).to_string_as(false), + ByteSize::b(total_cap_n).to_string_as(false), + (available_cap_n as f32) / (total_cap_n as f32) * 100.0, + tags_n + )); + } + } + + Ok(msg) } } -// ---- Internal structs for partition assignation in layout ---- +// ==================================================================================== + +#[cfg(test)] +mod tests { + use super::{Error, *}; + use std::cmp::min; + + // This function checks that the partition size S computed is at least better than the + // one given by a very naive algorithm. To do so, we try to run the naive algorithm + // assuming a partion size of S+1. If we succed, it means that the optimal assignation + // was not optimal. The naive algorithm is the following : + // - we compute the max number of partitions associated to every node, capped at the + // partition number. It gives the number of tokens of every node. + // - every zone has a number of tokens equal to the sum of the tokens of its nodes. + // - we cycle over the partitions and associate zone tokens while respecting the + // zone redundancy constraint. + // NOTE: the naive algorithm is not optimal. Counter example: + // take nb_partition = 3 ; replication_factor = 5; redundancy = 4; + // number of tokens by zone : (A, 4), (B,1), (C,4), (D, 4), (E, 2) + // With these parameters, the naive algo fails, whereas there is a solution: + // (A,A,C,D,E) , (A,B,C,D,D) (A,C,C,D,E) + fn check_against_naive(cl: &ClusterLayout) -> Result<bool, Error> { + let over_size = cl.partition_size + 1; + let mut zone_token = HashMap::<String, usize>::new(); + + let (zones, zone_to_id) = cl.generate_nongateway_zone_ids()?; + + if zones.is_empty() { + return Ok(false); + } -#[derive(Clone)] -struct PartitionAss<'a> { - nodes: Vec<(&'a Uuid, Option<&'a NodeRole>)>, -} + for z in zones.iter() { + zone_token.insert(z.clone(), 0); + } + for uuid in cl.nongateway_nodes().iter() { + let z = cl.get_node_zone(uuid)?; + let c = cl.get_node_capacity(uuid)?; + zone_token.insert( + z.clone(), + zone_token[&z] + min(NB_PARTITIONS, (c / over_size) as usize), + ); + } -impl<'a> PartitionAss<'a> { - fn new() -> Self { - Self { nodes: Vec::new() } - } + // For every partition, we count the number of zone already associated and + // the name of the last zone associated - fn nplus(&self, other: &PartitionAss<'a>) -> usize { - self.nodes - .iter() - .filter(|x| !other.nodes.contains(x)) - .count() - } + let mut id_zone_token = vec![0; zones.len()]; + for (z, t) in zone_token.iter() { + id_zone_token[zone_to_id[z]] = *t; + } - fn txtplus(&self, other: &PartitionAss<'a>) -> String { - let mut nodes = self - .nodes - .iter() - .filter(|x| !other.nodes.contains(x)) - .map(|x| format!("{:?}", x.0)) - .collect::<Vec<_>>(); - nodes.sort(); - if self.nodes.iter().any(|x| other.nodes.contains(x)) { - nodes.push("...".into()); + let mut nb_token = vec![0; NB_PARTITIONS]; + let mut last_zone = vec![zones.len(); NB_PARTITIONS]; + + let mut curr_zone = 0; + + let redundancy = cl.parameters.zone_redundancy; + + for replic in 0..cl.replication_factor { + for p in 0..NB_PARTITIONS { + while id_zone_token[curr_zone] == 0 + || (last_zone[p] == curr_zone + && redundancy - nb_token[p] <= cl.replication_factor - replic) + { + curr_zone += 1; + if curr_zone >= zones.len() { + return Ok(true); + } + } + id_zone_token[curr_zone] -= 1; + if last_zone[p] != curr_zone { + nb_token[p] += 1; + last_zone[p] = curr_zone; + } + } } - format!("[{}]", nodes.join(" ")) - } - fn is_valid_transition_to(&self, other: &PartitionAss<'a>, replication_factor: usize) -> bool { - let min_keep_nodes_per_part = (replication_factor + 1) / 2; - let n_removed = self.nplus(other); + return Ok(false); + } - if self.nodes.len() <= min_keep_nodes_per_part { - n_removed == 0 - } else { - n_removed <= self.nodes.len() - min_keep_nodes_per_part + fn show_msg(msg: &Message) { + for s in msg.iter() { + println!("{}", s); } } - // add is a key function in creating a PartitionAss, i.e. the list of nodes - // to which a partition is assigned. It tries to add a certain node id to the - // assignation, but checks that doing so is compatible with the NECESSARY - // condition that the partition assignation must be dispersed over different - // zones (datacenters) if enough zones exist. This is why it takes a n_zones - // parameter, which is the total number of zones that have existing nodes: - // if nodes in the assignation already cover all n_zones zones, then any node - // that is not yet in the assignation can be added. Otherwise, only nodes - // that are in a new zone can be added. - fn add( - &mut self, - target_len: Option<usize>, - n_zones: usize, - node: &'a Uuid, - role: &'a NodeRole, - ) -> bool { - if let Some(tl) = target_len { - if self.nodes.len() != tl - 1 { - return false; + fn update_layout( + cl: &mut ClusterLayout, + node_id_vec: &Vec<u8>, + node_capacity_vec: &Vec<u64>, + node_zone_vec: &Vec<String>, + zone_redundancy: usize, + ) { + for i in 0..node_id_vec.len() { + if let Some(x) = FixedBytes32::try_from(&[i as u8; 32]) { + cl.node_id_vec.push(x); } - } - let p_zns = self - .nodes - .iter() - .map(|(_id, info)| info.unwrap().zone.as_str()) - .collect::<HashSet<&str>>(); - if (p_zns.len() < n_zones && !p_zns.contains(&role.zone.as_str())) - || (p_zns.len() == n_zones && !self.nodes.iter().any(|(id, _)| *id == node)) - { - self.nodes.push((node, Some(role))); - true - } else { - false + let update = cl.staging_roles.update_mutator( + cl.node_id_vec[i], + NodeRoleV(Some(NodeRole { + zone: (node_zone_vec[i].to_string()), + capacity: (Some(node_capacity_vec[i])), + tags: (vec![]), + })), + ); + cl.staging_roles.merge(&update); } + cl.staging_parameters + .update(LayoutParameters { zone_redundancy }); + cl.staging_hash = cl.calculate_staging_hash(); + } + + #[test] + fn test_assignation() { + let mut node_id_vec = vec![1, 2, 3]; + let mut node_capacity_vec = vec![4000, 1000, 2000]; + let mut node_zone_vec = vec!["A", "B", "C"] + .into_iter() + .map(|x| x.to_string()) + .collect(); + + let mut cl = ClusterLayout::new(3); + update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec, 3); + let v = cl.version; + let (mut cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap(); + show_msg(&msg); + assert_eq!(cl.check(), Ok(())); + assert!(matches!(check_against_naive(&cl), Ok(true))); + + node_id_vec = vec![1, 2, 3, 4, 5, 6, 7, 8, 9]; + node_capacity_vec = vec![4000, 1000, 1000, 3000, 1000, 1000, 2000, 10000, 2000]; + node_zone_vec = vec!["A", "B", "C", "C", "C", "B", "G", "H", "I"] + .into_iter() + .map(|x| x.to_string()) + .collect(); + update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec, 2); + let v = cl.version; + let (mut cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap(); + show_msg(&msg); + assert_eq!(cl.check(), Ok(())); + assert!(matches!(check_against_naive(&cl), Ok(true))); + + node_capacity_vec = vec![4000, 1000, 2000, 7000, 1000, 1000, 2000, 10000, 2000]; + update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec, 3); + let v = cl.version; + let (mut cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap(); + show_msg(&msg); + assert_eq!(cl.check(), Ok(())); + assert!(matches!(check_against_naive(&cl), Ok(true))); + + node_capacity_vec = vec![ + 4000000, 4000000, 2000000, 7000000, 1000000, 9000000, 2000000, 10000, 2000000, + ]; + update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec, 1); + let v = cl.version; + let (cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap(); + show_msg(&msg); + assert_eq!(cl.check(), Ok(())); + assert!(matches!(check_against_naive(&cl), Ok(true))); } } diff --git a/src/rpc/lib.rs b/src/rpc/lib.rs index 86f63568..f734942d 100644 --- a/src/rpc/lib.rs +++ b/src/rpc/lib.rs @@ -8,6 +8,7 @@ mod consul; #[cfg(feature = "kubernetes-discovery")] mod kubernetes; +pub mod graph_algo; pub mod layout; pub mod replication_mode; pub mod ring; diff --git a/src/rpc/ring.rs b/src/rpc/ring.rs index 73a126a2..743a5cba 100644 --- a/src/rpc/ring.rs +++ b/src/rpc/ring.rs @@ -40,6 +40,7 @@ pub struct Ring { // Type to store compactly the id of a node in the system // Change this to u16 the day we want to have more than 256 nodes in a cluster pub type CompactNodeType = u8; +pub const MAX_NODE_NUMBER: usize = 256; // The maximum number of times an object might get replicated // This must be at least 3 because Garage supports 3-way replication diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index 949aced6..1ec250c3 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -5,7 +5,6 @@ use std::time::Duration; use futures::future::join_all; use futures::stream::futures_unordered::FuturesUnordered; use futures::stream::StreamExt; -use futures_util::future::FutureExt; use tokio::select; use tokio::sync::watch; @@ -24,7 +23,6 @@ pub use netapp::message::{ use netapp::peering::fullmesh::FullMeshPeeringStrategy; pub use netapp::{self, NetApp, NodeID}; -use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_util::error::Error; use garage_util::metrics::RecordDuration; @@ -94,7 +92,6 @@ pub struct RpcHelper(Arc<RpcHelperInner>); struct RpcHelperInner { our_node_id: Uuid, fullmesh: Arc<FullMeshPeeringStrategy>, - background: Arc<BackgroundRunner>, ring: watch::Receiver<Arc<Ring>>, metrics: RpcMetrics, rpc_timeout: Duration, @@ -104,7 +101,6 @@ impl RpcHelper { pub(crate) fn new( our_node_id: Uuid, fullmesh: Arc<FullMeshPeeringStrategy>, - background: Arc<BackgroundRunner>, ring: watch::Receiver<Arc<Ring>>, rpc_timeout: Option<Duration>, ) -> Self { @@ -113,7 +109,6 @@ impl RpcHelper { Self(Arc::new(RpcHelperInner { our_node_id, fullmesh, - background, ring, metrics, rpc_timeout: rpc_timeout.unwrap_or(DEFAULT_TIMEOUT), @@ -377,16 +372,13 @@ impl RpcHelper { if !resp_stream.is_empty() { // Continue remaining requests in background. - // Continue the remaining requests immediately using tokio::spawn - // but enqueue a task in the background runner - // to ensure that the process won't exit until the requests are done - // (if we had just enqueued the resp_stream.collect directly in the background runner, - // the requests might have been put on hold in the background runner's queue, - // in which case they might timeout or otherwise fail) - let wait_finished_fut = tokio::spawn(async move { + // Note: these requests can get interrupted on process shutdown, + // we must not count on them being executed for certain. + // For all background things that have to happen with certainty, + // they have to be put in a proper queue that is persisted to disk. + tokio::spawn(async move { resp_stream.collect::<Vec<Result<_, _>>>().await; }); - self.0.background.spawn(wait_finished_fut.map(|_| Ok(()))); } } diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 2c6f14fd..1f4d86e7 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -21,7 +21,6 @@ use netapp::peering::fullmesh::FullMeshPeeringStrategy; use netapp::util::parse_and_resolve_peer_addr_async; use netapp::{NetApp, NetworkKey, NodeID, NodeKey}; -use garage_util::background::BackgroundRunner; use garage_util::config::Config; #[cfg(feature = "kubernetes-discovery")] use garage_util::config::KubernetesDiscoveryConfig; @@ -50,8 +49,6 @@ pub const GARAGE_VERSION_TAG: u64 = 0x6761726167650008; // garage 0x0008 /// RPC endpoint used for calls related to membership pub const SYSTEM_RPC_PATH: &str = "garage_rpc/membership.rs/SystemRpc"; -pub const CONNECT_ERROR_MESSAGE: &str = "Error establishing RPC connection to remote node. This can happen if the remote node is not reachable on the network, but also if the two nodes are not configured with the same rpc_secret"; - /// RPC messages related to membership #[derive(Debug, Serialize, Deserialize, Clone)] pub enum SystemRpc { @@ -76,13 +73,17 @@ impl Rpc for SystemRpc { type Response = Result<SystemRpc, Error>; } +#[derive(Serialize, Deserialize)] +pub struct PeerList(Vec<(Uuid, SocketAddr)>); +impl garage_util::migrate::InitialFormat for PeerList {} + /// This node's membership manager pub struct System { /// The id of this node pub id: Uuid, persist_cluster_layout: Persister<ClusterLayout>, - persist_peer_list: Persister<Vec<(Uuid, SocketAddr)>>, + persist_peer_list: Persister<PeerList>, local_status: ArcSwap<NodeStatus>, node_status: RwLock<HashMap<Uuid, (u64, NodeStatus)>>, @@ -110,9 +111,6 @@ pub struct System { pub ring: watch::Receiver<Arc<Ring>>, update_ring: Mutex<watch::Sender<Arc<Ring>>>, - /// The job runner of this node - pub background: Arc<BackgroundRunner>, - /// Path to metadata directory pub metadata_dir: PathBuf, } @@ -232,7 +230,6 @@ impl System { /// Create this node's membership manager pub fn new( network_key: NetworkKey, - background: Arc<BackgroundRunner>, replication_mode: ReplicationMode, config: &Config, ) -> Result<Arc<Self>, Error> { @@ -354,7 +351,6 @@ impl System { rpc: RpcHelper::new( netapp.id.into(), fullmesh, - background.clone(), ring.clone(), config.rpc_timeout_msec.map(Duration::from_millis), ), @@ -372,7 +368,6 @@ impl System { ring, update_ring: Mutex::new(update_ring), - background, metadata_dir: config.metadata_dir.clone(), }); sys.system_endpoint.set_handler(sys.clone()); @@ -444,17 +439,14 @@ impl System { )) })?; let mut errors = vec![]; - for ip in addrs.iter() { - match self - .netapp - .clone() - .try_connect(*ip, pubkey) - .await - .err_context(CONNECT_ERROR_MESSAGE) - { + for addr in addrs.iter() { + match self.netapp.clone().try_connect(*addr, pubkey).await { Ok(()) => return Ok(()), Err(e) => { - errors.push((*ip, e)); + errors.push(( + *addr, + Error::Message(connect_error_message(*addr, pubkey, e)), + )); } } } @@ -529,56 +521,61 @@ impl System { // ---- INTERNALS ---- #[cfg(feature = "consul-discovery")] - async fn advertise_to_consul(self: Arc<Self>) -> Result<(), Error> { + async fn advertise_to_consul(self: Arc<Self>) { let c = match &self.consul_discovery { Some(c) => c, - _ => return Ok(()), + _ => return, }; let rpc_public_addr = match self.rpc_public_addr { Some(addr) => addr, None => { warn!("Not advertising to Consul because rpc_public_addr is not defined in config file and could not be autodetected."); - return Ok(()); + return; } }; - c.publish_consul_service( - self.netapp.id, - &self.local_status.load_full().hostname, - rpc_public_addr, - ) - .await - .err_context("Error while publishing Consul service") + if let Err(e) = c + .publish_consul_service( + self.netapp.id, + &self.local_status.load_full().hostname, + rpc_public_addr, + ) + .await + { + error!("Error while publishing Consul service: {}", e); + } } #[cfg(feature = "kubernetes-discovery")] - async fn advertise_to_kubernetes(self: Arc<Self>) -> Result<(), Error> { + async fn advertise_to_kubernetes(self: Arc<Self>) { let k = match &self.kubernetes_discovery { Some(k) => k, - _ => return Ok(()), + _ => return, }; let rpc_public_addr = match self.rpc_public_addr { Some(addr) => addr, None => { warn!("Not advertising to Kubernetes because rpc_public_addr is not defined in config file and could not be autodetected."); - return Ok(()); + return; } }; - publish_kubernetes_node( + if let Err(e) = publish_kubernetes_node( k, self.netapp.id, &self.local_status.load_full().hostname, rpc_public_addr, ) .await - .err_context("Error while publishing node to kubernetes") + { + error!("Error while publishing node to Kubernetes: {}", e); + } } /// Save network configuration to disc - async fn save_cluster_layout(self: Arc<Self>) -> Result<(), Error> { + async fn save_cluster_layout(&self) -> Result<(), Error> { let ring: Arc<Ring> = self.ring.borrow().clone(); self.persist_cluster_layout .save_async(&ring.layout) @@ -630,11 +627,7 @@ impl System { if info.cluster_layout_version > local_info.cluster_layout_version || info.cluster_layout_staging_hash != local_info.cluster_layout_staging_hash { - let self2 = self.clone(); - self.background.spawn_cancellable(async move { - self2.pull_cluster_layout(from).await; - Ok(()) - }); + tokio::spawn(self.clone().pull_cluster_layout(from)); } self.node_status @@ -662,9 +655,9 @@ impl System { let update_ring = self.update_ring.lock().await; let mut layout: ClusterLayout = self.ring.borrow().layout.clone(); - let prev_layout_check = layout.check(); + let prev_layout_check = layout.check().is_ok(); if layout.merge(adv) { - if prev_layout_check && !layout.check() { + if prev_layout_check && !layout.check().is_ok() { error!("New cluster layout is invalid, discarding."); return Err(Error::Message( "New cluster layout is invalid, discarding.".into(), @@ -676,18 +669,21 @@ impl System { drop(update_ring); let self2 = self.clone(); - self.background.spawn_cancellable(async move { - self2 + tokio::spawn(async move { + if let Err(e) = self2 .rpc .broadcast( &self2.system_endpoint, SystemRpc::AdvertiseClusterLayout(layout), RequestStrategy::with_priority(PRIO_HIGH), ) - .await?; - Ok(()) + .await + { + warn!("Error while broadcasting new cluster layout: {}", e); + } }); - self.background.spawn(self.clone().save_cluster_layout()); + + self.save_cluster_layout().await?; } Ok(SystemRpc::Ok) @@ -717,7 +713,7 @@ impl System { async fn discovery_loop(self: &Arc<Self>, mut stop_signal: watch::Receiver<bool>) { while !*stop_signal.borrow() { - let not_configured = !self.ring.borrow().layout.check(); + let not_configured = !self.ring.borrow().layout.check().is_ok(); let no_peers = self.fullmesh.get_peer_list().len() < self.replication_factor; let expected_n_nodes = self.ring.borrow().layout.num_nodes(); let bad_peers = self @@ -734,7 +730,7 @@ impl System { // Add peer list from list stored on disk if let Ok(peers) = self.persist_peer_list.load_async().await { - ping_list.extend(peers.iter().map(|(id, addr)| ((*id).into(), *addr))) + ping_list.extend(peers.0.iter().map(|(id, addr)| ((*id).into(), *addr))) } // Fetch peer list from Consul @@ -773,12 +769,12 @@ impl System { } for (node_id, node_addr) in ping_list { - tokio::spawn( - self.netapp - .clone() - .try_connect(node_addr, node_id) - .map(|r| r.err_context(CONNECT_ERROR_MESSAGE)), - ); + let self2 = self.clone(); + tokio::spawn(async move { + if let Err(e) = self2.netapp.clone().try_connect(node_addr, node_id).await { + error!("{}", connect_error_message(node_addr, node_id, e)); + } + }); } } @@ -787,11 +783,10 @@ impl System { } #[cfg(feature = "consul-discovery")] - self.background.spawn(self.clone().advertise_to_consul()); + tokio::spawn(self.clone().advertise_to_consul()); #[cfg(feature = "kubernetes-discovery")] - self.background - .spawn(self.clone().advertise_to_kubernetes()); + tokio::spawn(self.clone().advertise_to_kubernetes()); let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL); select! { @@ -815,12 +810,16 @@ impl System { // and append it to the list we are about to save, // so that no peer ID gets lost in the process. if let Ok(mut prev_peer_list) = self.persist_peer_list.load_async().await { - prev_peer_list.retain(|(id, _ip)| peer_list.iter().all(|(id2, _ip2)| id2 != id)); - peer_list.extend(prev_peer_list); + prev_peer_list + .0 + .retain(|(id, _ip)| peer_list.iter().all(|(id2, _ip2)| id2 != id)); + peer_list.extend(prev_peer_list.0); } // Save new peer list to file - self.persist_peer_list.save_async(&peer_list).await + self.persist_peer_list + .save_async(&PeerList(peer_list)) + .await } async fn pull_cluster_layout(self: Arc<Self>, peer: Uuid) { @@ -881,3 +880,11 @@ async fn resolve_peers(peers: &[String]) -> Vec<(NodeID, SocketAddr)> { ret } + +fn connect_error_message( + addr: SocketAddr, + pubkey: ed25519::PublicKey, + e: netapp::error::Error, +) -> String { + format!("Error establishing RPC connection to remote node: {}@{}.\nThis can happen if the remote node is not reachable on the network, but also if the two nodes are not configured with the same rpc_secret.\n{}", hex::encode(pubkey), addr, e) +} |