diff options
Diffstat (limited to 'src/rpc/layout')
-rw-r--r-- | src/rpc/layout/graph_algo.rs | 405 | ||||
-rw-r--r-- | src/rpc/layout/helper.rs | 294 | ||||
-rw-r--r-- | src/rpc/layout/history.rs | 306 | ||||
-rw-r--r-- | src/rpc/layout/manager.rs | 378 | ||||
-rw-r--r-- | src/rpc/layout/mod.rs | 478 | ||||
-rw-r--r-- | src/rpc/layout/test.rs | 157 | ||||
-rw-r--r-- | src/rpc/layout/version.rs | 846 |
7 files changed, 2864 insertions, 0 deletions
diff --git a/src/rpc/layout/graph_algo.rs b/src/rpc/layout/graph_algo.rs new file mode 100644 index 00000000..bd33e97f --- /dev/null +++ b/src/rpc/layout/graph_algo.rs @@ -0,0 +1,405 @@ +//! This module deals with graph algorithms. +//! It is used in layout.rs to build the partition to node assignment. + +use rand::prelude::{SeedableRng, SliceRandom}; +use std::cmp::{max, min}; +use std::collections::HashMap; +use std::collections::VecDeque; + +/// Vertex data structures used in all the graphs used in layout.rs. +/// usize parameters correspond to node/zone/partitions ids. +/// To understand the vertex roles below, please refer to the formal description +/// of the layout computation algorithm. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +pub enum Vertex { + Source, + Pup(usize), // The vertex p+ of partition p + Pdown(usize), // The vertex p- of partition p + PZ(usize, usize), // The vertex corresponding to x_(partition p, zone z) + N(usize), // The vertex corresponding to node n + Sink, +} + +/// Edge data structure for the flow algorithm. +#[derive(Clone, Copy, Debug)] +pub struct FlowEdge { + cap: u64, // flow maximal capacity of the edge + flow: i64, // flow value on the edge + dest: usize, // destination vertex id + rev: usize, // index of the reversed edge (v, self) in the edge list of vertex v +} + +/// Edge data structure for the detection of negative cycles. +#[derive(Clone, Copy, Debug)] +pub struct WeightedEdge { + w: i64, // weight of the edge + dest: usize, +} + +pub trait Edge: Clone + Copy {} +impl Edge for FlowEdge {} +impl Edge for WeightedEdge {} + +/// Struct for the graph structure. We do encapsulation here to be able to both +/// provide user friendly Vertex enum to address vertices, and to use internally usize +/// indices and Vec instead of HashMap in the graph algorithm to optimize execution speed. +pub struct Graph<E: Edge> { + vertex_to_id: HashMap<Vertex, usize>, + id_to_vertex: Vec<Vertex>, + + // The graph is stored as an adjacency list + graph: Vec<Vec<E>>, +} + +pub type CostFunction = HashMap<(Vertex, Vertex), i64>; + +impl<E: Edge> Graph<E> { + pub fn new(vertices: &[Vertex]) -> Self { + let mut map = HashMap::<Vertex, usize>::new(); + for (i, vert) in vertices.iter().enumerate() { + map.insert(*vert, i); + } + Graph::<E> { + vertex_to_id: map, + id_to_vertex: vertices.to_vec(), + graph: vec![Vec::<E>::new(); vertices.len()], + } + } + + fn get_vertex_id(&self, v: &Vertex) -> Result<usize, String> { + self.vertex_to_id + .get(v) + .cloned() + .ok_or_else(|| format!("The graph does not contain vertex {:?}", v)) + } +} + +impl Graph<FlowEdge> { + /// This function adds a directed edge to the graph with capacity c, and the + /// corresponding reversed edge with capacity 0. + pub fn add_edge(&mut self, u: Vertex, v: Vertex, c: u64) -> Result<(), String> { + let idu = self.get_vertex_id(&u)?; + let idv = self.get_vertex_id(&v)?; + if idu == idv { + return Err("Cannot add edge from vertex to itself in flow graph".into()); + } + + let rev_u = self.graph[idu].len(); + let rev_v = self.graph[idv].len(); + self.graph[idu].push(FlowEdge { + cap: c, + dest: idv, + flow: 0, + rev: rev_v, + }); + self.graph[idv].push(FlowEdge { + cap: 0, + dest: idu, + flow: 0, + rev: rev_u, + }); + Ok(()) + } + + /// This function returns the list of vertices that receive a positive flow from + /// vertex v. + pub fn get_positive_flow_from(&self, v: Vertex) -> Result<Vec<Vertex>, String> { + let idv = self.get_vertex_id(&v)?; + let mut result = Vec::<Vertex>::new(); + for edge in self.graph[idv].iter() { + if edge.flow > 0 { + result.push(self.id_to_vertex[edge.dest]); + } + } + Ok(result) + } + + /// This function returns the value of the flow outgoing from v. + pub fn get_outflow(&self, v: Vertex) -> Result<i64, String> { + let idv = self.get_vertex_id(&v)?; + let mut result = 0; + for edge in self.graph[idv].iter() { + result += max(0, edge.flow); + } + Ok(result) + } + + /// This function computes the flow total value by computing the outgoing flow + /// from the source. + pub fn get_flow_value(&mut self) -> Result<i64, String> { + self.get_outflow(Vertex::Source) + } + + /// This function shuffles the order of the edge lists. It keeps the ids of the + /// reversed edges consistent. + fn shuffle_edges(&mut self) { + // We use deterministic randomness so that the layout calculation algorihtm + // will output the same thing every time it is run. This way, the results + // pre-calculated in `garage layout show` will match exactly those used + // in practice with `garage layout apply` + let mut rng = rand::rngs::StdRng::from_seed([0x12u8; 32]); + for i in 0..self.graph.len() { + self.graph[i].shuffle(&mut rng); + // We need to update the ids of the reverse edges. + for j in 0..self.graph[i].len() { + let target_v = self.graph[i][j].dest; + let target_rev = self.graph[i][j].rev; + self.graph[target_v][target_rev].rev = j; + } + } + } + + /// Computes an upper bound of the flow on the graph + pub fn flow_upper_bound(&self) -> Result<u64, String> { + let idsource = self.get_vertex_id(&Vertex::Source)?; + let mut flow_upper_bound = 0; + for edge in self.graph[idsource].iter() { + flow_upper_bound += edge.cap; + } + Ok(flow_upper_bound) + } + + /// This function computes the maximal flow using Dinic's algorithm. It starts with + /// the flow values already present in the graph. So it is possible to add some edge to + /// the graph, compute a flow, add other edges, update the flow. + pub fn compute_maximal_flow(&mut self) -> Result<(), String> { + let idsource = self.get_vertex_id(&Vertex::Source)?; + let idsink = self.get_vertex_id(&Vertex::Sink)?; + + let nb_vertices = self.graph.len(); + + let flow_upper_bound = self.flow_upper_bound()?; + + // To ensure the dispersion of the associations generated by the + // assignment, we shuffle the neighbours of the nodes. Hence, + // the vertices do not consider their neighbours in the same order. + self.shuffle_edges(); + + // We run Dinic's max flow algorithm + loop { + // We build the level array from Dinic's algorithm. + let mut level = vec![None; nb_vertices]; + + let mut fifo = VecDeque::new(); + fifo.push_back((idsource, 0)); + while let Some((id, lvl)) = fifo.pop_front() { + if level[id].is_none() { + // it means id has not yet been reached + level[id] = Some(lvl); + for edge in self.graph[id].iter() { + if edge.cap as i64 - edge.flow > 0 { + fifo.push_back((edge.dest, lvl + 1)); + } + } + } + } + if level[idsink].is_none() { + // There is no residual flow + break; + } + // Now we run DFS respecting the level array + let mut next_nbd = vec![0; nb_vertices]; + let mut lifo = Vec::new(); + + lifo.push((idsource, flow_upper_bound)); + + while let Some((id, f)) = lifo.last().cloned() { + if id == idsink { + // The DFS reached the sink, we can add a + // residual flow. + lifo.pop(); + while let Some((id, _)) = lifo.pop() { + let nbd = next_nbd[id]; + self.graph[id][nbd].flow += f as i64; + let id_rev = self.graph[id][nbd].dest; + let nbd_rev = self.graph[id][nbd].rev; + self.graph[id_rev][nbd_rev].flow -= f as i64; + } + lifo.push((idsource, flow_upper_bound)); + continue; + } + // else we did not reach the sink + let nbd = next_nbd[id]; + if nbd >= self.graph[id].len() { + // There is nothing to explore from id anymore + lifo.pop(); + if let Some((parent, _)) = lifo.last() { + next_nbd[*parent] += 1; + } + continue; + } + // else we can try to send flow from id to its nbd + let new_flow = min( + f as i64, + self.graph[id][nbd].cap as i64 - self.graph[id][nbd].flow, + ) as u64; + if new_flow == 0 { + next_nbd[id] += 1; + continue; + } + if let (Some(lvldest), Some(lvlid)) = (level[self.graph[id][nbd].dest], level[id]) { + if lvldest <= lvlid { + // We cannot send flow to nbd. + next_nbd[id] += 1; + continue; + } + } + // otherwise, we send flow to nbd. + lifo.push((self.graph[id][nbd].dest, new_flow)); + } + } + Ok(()) + } + + /// This function takes a flow, and a cost function on the edges, and tries to find an + /// equivalent flow with a better cost, by finding improving overflow cycles. It uses + /// as subroutine the Bellman Ford algorithm run up to path_length. + /// We assume that the cost of edge (u,v) is the opposite of the cost of (v,u), and + /// only one needs to be present in the cost function. + pub fn optimize_flow_with_cost( + &mut self, + cost: &CostFunction, + path_length: usize, + ) -> Result<(), String> { + // We build the weighted graph g where we will look for negative cycle + let mut gf = self.build_cost_graph(cost)?; + let mut cycles = gf.list_negative_cycles(path_length); + while !cycles.is_empty() { + // we enumerate negative cycles + for c in cycles.iter() { + for i in 0..c.len() { + // We add one flow unit to the edge (u,v) of cycle c + let idu = self.vertex_to_id[&c[i]]; + let idv = self.vertex_to_id[&c[(i + 1) % c.len()]]; + for j in 0..self.graph[idu].len() { + // since idu appears at most once in the cycles, we enumerate every + // edge at most once. + let edge = self.graph[idu][j]; + if edge.dest == idv { + self.graph[idu][j].flow += 1; + self.graph[idv][edge.rev].flow -= 1; + break; + } + } + } + } + + gf = self.build_cost_graph(cost)?; + cycles = gf.list_negative_cycles(path_length); + } + Ok(()) + } + + /// Construct the weighted graph G_f from the flow and the cost function + fn build_cost_graph(&self, cost: &CostFunction) -> Result<Graph<WeightedEdge>, String> { + let mut g = Graph::<WeightedEdge>::new(&self.id_to_vertex); + let nb_vertices = self.id_to_vertex.len(); + for i in 0..nb_vertices { + for edge in self.graph[i].iter() { + if edge.cap as i64 - edge.flow > 0 { + // It is possible to send overflow through this edge + let u = self.id_to_vertex[i]; + let v = self.id_to_vertex[edge.dest]; + if cost.contains_key(&(u, v)) { + g.add_edge(u, v, cost[&(u, v)])?; + } else if cost.contains_key(&(v, u)) { + g.add_edge(u, v, -cost[&(v, u)])?; + } else { + g.add_edge(u, v, 0)?; + } + } + } + } + Ok(g) + } +} + +impl Graph<WeightedEdge> { + /// This function adds a single directed weighted edge to the graph. + pub fn add_edge(&mut self, u: Vertex, v: Vertex, w: i64) -> Result<(), String> { + let idu = self.get_vertex_id(&u)?; + let idv = self.get_vertex_id(&v)?; + self.graph[idu].push(WeightedEdge { w, dest: idv }); + Ok(()) + } + + /// This function lists the negative cycles it manages to find after path_length + /// iterations of the main loop of the Bellman-Ford algorithm. For the classical + /// algorithm, path_length needs to be equal to the number of vertices. However, + /// for particular graph structures like in our case, the algorithm is still correct + /// when path_length is the length of the longest possible simple path. + /// See the formal description of the algorithm for more details. + fn list_negative_cycles(&self, path_length: usize) -> Vec<Vec<Vertex>> { + let nb_vertices = self.graph.len(); + + // We start with every vertex at distance 0 of some imaginary extra -1 vertex. + let mut distance = vec![0; nb_vertices]; + // The prev vector collects for every vertex from where does the shortest path come + let mut prev = vec![None; nb_vertices]; + + for _ in 0..path_length + 1 { + for id in 0..nb_vertices { + for e in self.graph[id].iter() { + if distance[id] + e.w < distance[e.dest] { + distance[e.dest] = distance[id] + e.w; + prev[e.dest] = Some(id); + } + } + } + } + + // If self.graph contains a negative cycle, then at this point the graph described + // by prev (which is a directed 1-forest/functional graph) + // must contain a cycle. We list the cycles of prev. + let cycles_prev = cycles_of_1_forest(&prev); + + // Remark that the cycle in prev is in the reverse order compared to the cycle + // in the graph. Thus the .rev(). + return cycles_prev + .iter() + .map(|cycle| { + cycle + .iter() + .rev() + .map(|id| self.id_to_vertex[*id]) + .collect() + }) + .collect(); + } +} + +/// This function returns the list of cycles of a directed 1 forest. It does not +/// check for the consistency of the input. +fn cycles_of_1_forest(forest: &[Option<usize>]) -> Vec<Vec<usize>> { + let mut cycles = Vec::<Vec<usize>>::new(); + let mut time_of_discovery = vec![None; forest.len()]; + + for t in 0..forest.len() { + let mut id = t; + // while we are on a valid undiscovered node + while time_of_discovery[id].is_none() { + time_of_discovery[id] = Some(t); + if let Some(i) = forest[id] { + id = i; + } else { + break; + } + } + if forest[id].is_some() && time_of_discovery[id] == Some(t) { + // We discovered an id that we explored at this iteration t. + // It means we are on a cycle + let mut cy = vec![id; 1]; + let mut id2 = id; + while let Some(id_next) = forest[id2] { + id2 = id_next; + if id2 != id { + cy.push(id2); + } else { + break; + } + } + cycles.push(cy); + } + } + cycles +} diff --git a/src/rpc/layout/helper.rs b/src/rpc/layout/helper.rs new file mode 100644 index 00000000..9fb738ea --- /dev/null +++ b/src/rpc/layout/helper.rs @@ -0,0 +1,294 @@ +use std::collections::HashMap; +use std::ops::Deref; +use std::sync::atomic::{AtomicUsize, Ordering}; + +use serde::{Deserialize, Serialize}; + +use garage_util::data::*; + +use super::*; +use crate::replication_mode::ReplicationMode; + +#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)] +pub struct RpcLayoutDigest { + /// Cluster layout version + pub current_version: u64, + /// Number of active layout versions + pub active_versions: usize, + /// Hash of cluster layout update trackers + pub trackers_hash: Hash, + /// Hash of cluster layout staging data + pub staging_hash: Hash, +} + +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +pub struct SyncLayoutDigest { + current: u64, + ack_map_min: u64, + min_stored: u64, +} + +pub struct LayoutHelper { + replication_mode: ReplicationMode, + layout: Option<LayoutHistory>, + + // cached values + ack_map_min: u64, + sync_map_min: u64, + + all_nodes: Vec<Uuid>, + all_nongateway_nodes: Vec<Uuid>, + + trackers_hash: Hash, + staging_hash: Hash, + + // ack lock: counts in-progress write operations for each + // layout version ; we don't increase the ack update tracker + // while this lock is nonzero + pub(crate) ack_lock: HashMap<u64, AtomicUsize>, +} + +impl Deref for LayoutHelper { + type Target = LayoutHistory; + fn deref(&self) -> &LayoutHistory { + self.layout() + } +} + +impl LayoutHelper { + pub fn new( + replication_mode: ReplicationMode, + mut layout: LayoutHistory, + mut ack_lock: HashMap<u64, AtomicUsize>, + ) -> Self { + // In the new() function of the helper, we do a bunch of cleanup + // and calculations on the layout history to make sure things are + // correct and we have rapid access to important values such as + // the layout versions to use when reading to ensure consistency. + + if !replication_mode.is_read_after_write_consistent() { + // Fast path for when no consistency is required. + // In this case we only need to keep the last version of the layout, + // we don't care about coordinating stuff in the cluster. + layout.keep_current_version_only(); + } + + layout.cleanup_old_versions(); + + let all_nodes = layout.get_all_nodes(); + let all_nongateway_nodes = layout.get_all_nongateway_nodes(); + + layout.clamp_update_trackers(&all_nodes); + + let min_version = layout.min_stored(); + + // ack_map_min is the minimum value of ack_map among all nodes + // in the cluster (gateway, non-gateway, current and previous layouts). + // It is the highest layout version which all of these nodes have + // acknowledged, indicating that they are aware of it and are no + // longer processing write operations that did not take it into account. + let ack_map_min = layout + .update_trackers + .ack_map + .min_among(&all_nodes, min_version); + + // sync_map_min is the minimum value of sync_map among storage nodes + // in the cluster (non-gateway nodes only, current and previous layouts). + // It is the highest layout version for which we know that all relevant + // storage nodes have fullfilled a sync, and therefore it is safe to + // use a read quorum within that layout to ensure consistency. + // Gateway nodes are excluded here because they hold no relevant data + // (they store the bucket and access key tables, but we don't have + // consistency on those). + // This value is calculated using quorums to allow progress even + // if not all nodes have successfully completed a sync. + let sync_map_min = + layout.calculate_sync_map_min_with_quorum(replication_mode, &all_nongateway_nodes); + + let trackers_hash = layout.calculate_trackers_hash(); + let staging_hash = layout.calculate_staging_hash(); + + ack_lock.retain(|_, cnt| *cnt.get_mut() > 0); + ack_lock + .entry(layout.current().version) + .or_insert(AtomicUsize::new(0)); + + LayoutHelper { + replication_mode, + layout: Some(layout), + ack_map_min, + sync_map_min, + all_nodes, + all_nongateway_nodes, + trackers_hash, + staging_hash, + ack_lock, + } + } + + // ------------------ single updating function -------------- + + fn layout(&self) -> &LayoutHistory { + self.layout.as_ref().unwrap() + } + + pub(crate) fn update<F>(&mut self, f: F) -> bool + where + F: FnOnce(&mut LayoutHistory) -> bool, + { + let changed = f(self.layout.as_mut().unwrap()); + if changed { + *self = Self::new( + self.replication_mode, + self.layout.take().unwrap(), + std::mem::take(&mut self.ack_lock), + ); + } + changed + } + + // ------------------ read helpers --------------- + + pub fn all_nodes(&self) -> &[Uuid] { + &self.all_nodes + } + + pub fn all_nongateway_nodes(&self) -> &[Uuid] { + &self.all_nongateway_nodes + } + + pub fn ack_map_min(&self) -> u64 { + self.ack_map_min + } + + pub fn sync_map_min(&self) -> u64 { + self.sync_map_min + } + + pub fn sync_digest(&self) -> SyncLayoutDigest { + SyncLayoutDigest { + current: self.layout().current().version, + ack_map_min: self.ack_map_min(), + min_stored: self.layout().min_stored(), + } + } + + pub fn read_nodes_of(&self, position: &Hash) -> Vec<Uuid> { + let sync_min = self.sync_map_min; + let version = self + .layout() + .versions + .iter() + .find(|x| x.version == sync_min) + .or(self.layout().versions.last()) + .unwrap(); + version + .nodes_of(position, version.replication_factor) + .collect() + } + + pub fn storage_sets_of(&self, position: &Hash) -> Vec<Vec<Uuid>> { + self.layout() + .versions + .iter() + .map(|x| x.nodes_of(position, x.replication_factor).collect()) + .collect() + } + + pub fn storage_nodes_of(&self, position: &Hash) -> Vec<Uuid> { + let mut ret = vec![]; + for version in self.layout().versions.iter() { + ret.extend(version.nodes_of(position, version.replication_factor)); + } + ret.sort(); + ret.dedup(); + ret + } + + pub fn trackers_hash(&self) -> Hash { + self.trackers_hash + } + + pub fn staging_hash(&self) -> Hash { + self.staging_hash + } + + pub fn digest(&self) -> RpcLayoutDigest { + RpcLayoutDigest { + current_version: self.current().version, + active_versions: self.versions.len(), + trackers_hash: self.trackers_hash, + staging_hash: self.staging_hash, + } + } + + // ------------------ helpers for update tracking --------------- + + pub(crate) fn update_trackers(&mut self, local_node_id: Uuid) { + // Ensure trackers for this node's values are up-to-date + + // 1. Acknowledge the last layout version which is not currently + // locked by an in-progress write operation + self.ack_max_free(local_node_id); + + // 2. Assume the data on this node is sync'ed up at least to + // the first layout version in the history + self.sync_first(local_node_id); + + // 3. Acknowledge everyone has synced up to min(self.sync_map) + self.sync_ack(local_node_id); + + debug!("ack_map: {:?}", self.update_trackers.ack_map); + debug!("sync_map: {:?}", self.update_trackers.sync_map); + debug!("sync_ack_map: {:?}", self.update_trackers.sync_ack_map); + } + + fn sync_first(&mut self, local_node_id: Uuid) { + let first_version = self.min_stored(); + self.update(|layout| { + layout + .update_trackers + .sync_map + .set_max(local_node_id, first_version) + }); + } + + fn sync_ack(&mut self, local_node_id: Uuid) { + let sync_map_min = self.sync_map_min; + self.update(|layout| { + layout + .update_trackers + .sync_ack_map + .set_max(local_node_id, sync_map_min) + }); + } + + pub(crate) fn ack_max_free(&mut self, local_node_id: Uuid) -> bool { + let max_ack = self.max_free_ack(); + let changed = self.update(|layout| { + layout + .update_trackers + .ack_map + .set_max(local_node_id, max_ack) + }); + if changed { + info!("ack_until updated to {}", max_ack); + } + changed + } + + pub(crate) fn max_free_ack(&self) -> u64 { + self.layout() + .versions + .iter() + .map(|x| x.version) + .skip_while(|v| { + self.ack_lock + .get(v) + .map(|x| x.load(Ordering::Relaxed) == 0) + .unwrap_or(true) + }) + .next() + .unwrap_or(self.current().version) + } +} diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs new file mode 100644 index 00000000..b8cc27da --- /dev/null +++ b/src/rpc/layout/history.rs @@ -0,0 +1,306 @@ +use std::collections::HashSet; + +use garage_util::crdt::{Crdt, Lww, LwwMap}; +use garage_util::data::*; +use garage_util::encode::nonversioned_encode; +use garage_util::error::*; + +use super::*; +use crate::replication_mode::ReplicationMode; + +impl LayoutHistory { + pub fn new(replication_factor: usize) -> Self { + let version = LayoutVersion::new(replication_factor); + + let staging = LayoutStaging { + parameters: Lww::<LayoutParameters>::new(version.parameters), + roles: LwwMap::new(), + }; + + LayoutHistory { + versions: vec![version], + old_versions: vec![], + update_trackers: Default::default(), + staging: Lww::raw(0, staging), + } + } + + // ------------------ who stores what now? --------------- + + pub fn current(&self) -> &LayoutVersion { + self.versions.last().as_ref().unwrap() + } + + pub fn min_stored(&self) -> u64 { + self.versions.first().as_ref().unwrap().version + } + + pub fn get_all_nodes(&self) -> Vec<Uuid> { + if self.versions.len() == 1 { + self.versions[0].all_nodes().to_vec() + } else { + let set = self + .versions + .iter() + .flat_map(|x| x.all_nodes()) + .collect::<HashSet<_>>(); + set.into_iter().copied().collect::<Vec<_>>() + } + } + + pub(crate) fn get_all_nongateway_nodes(&self) -> Vec<Uuid> { + if self.versions.len() == 1 { + self.versions[0].nongateway_nodes().to_vec() + } else { + let set = self + .versions + .iter() + .flat_map(|x| x.nongateway_nodes()) + .collect::<HashSet<_>>(); + set.into_iter().copied().collect::<Vec<_>>() + } + } + + // ---- housekeeping (all invoked by LayoutHelper) ---- + + pub(crate) fn keep_current_version_only(&mut self) { + while self.versions.len() > 1 { + let removed = self.versions.remove(0); + self.old_versions.push(removed); + } + } + + pub(crate) fn cleanup_old_versions(&mut self) { + // If there are invalid versions before valid versions, remove them + if self.versions.len() > 1 && self.current().check().is_ok() { + while self.versions.len() > 1 && self.versions.first().unwrap().check().is_err() { + let removed = self.versions.remove(0); + info!( + "Layout history: pruning old invalid version {}", + removed.version + ); + } + } + + // If there are old versions that no one is reading from anymore, + // remove them (keep them in self.old_versions). + // ASSUMPTION: we only care about where nodes in the current layout version + // are reading from, as we assume older nodes are being discarded. + let current_nodes = &self.current().node_id_vec; + let min_version = self.min_stored(); + let sync_ack_map_min = self + .update_trackers + .sync_ack_map + .min_among(current_nodes, min_version); + while self.min_stored() < sync_ack_map_min { + assert!(self.versions.len() > 1); + let removed = self.versions.remove(0); + info!( + "Layout history: moving version {} to old_versions", + removed.version + ); + self.old_versions.push(removed); + } + + while self.old_versions.len() > OLD_VERSION_COUNT { + let removed = self.old_versions.remove(0); + info!("Layout history: removing old_version {}", removed.version); + } + } + + pub(crate) fn clamp_update_trackers(&mut self, nodes: &[Uuid]) { + let min_v = self.min_stored(); + for node in nodes { + self.update_trackers.ack_map.set_max(*node, min_v); + self.update_trackers.sync_map.set_max(*node, min_v); + self.update_trackers.sync_ack_map.set_max(*node, min_v); + } + } + + pub(crate) fn calculate_sync_map_min_with_quorum( + &self, + replication_mode: ReplicationMode, + all_nongateway_nodes: &[Uuid], + ) -> u64 { + // This function calculates the minimum layout version from which + // it is safe to read if we want to maintain read-after-write consistency. + // In the general case the computation can be a bit expensive so + // we try to optimize it in several ways. + + // If there is only one layout version, we know that's the one + // we need to read from. + if self.versions.len() == 1 { + return self.current().version; + } + + let quorum = replication_mode.write_quorum(); + + let min_version = self.min_stored(); + let global_min = self + .update_trackers + .sync_map + .min_among(all_nongateway_nodes, min_version); + + // If the write quorums are equal to the total number of nodes, + // i.e. no writes can succeed while they are not written to all nodes, + // then we must in all case wait for all nodes to complete a sync. + // This is represented by reading from the layout with version + // number global_min, the smallest layout version for which all nodes + // have completed a sync. + if quorum == self.current().replication_factor { + return global_min; + } + + // In the general case, we need to look at all write sets for all partitions, + // and find a safe layout version to read for that partition. We then + // take the minimum value among all partition as the safe layout version + // to read in all cases (the layout version to which all reads are directed). + let mut current_min = self.current().version; + let mut sets_done = HashSet::<Vec<Uuid>>::new(); + + for (_, p_hash) in self.current().partitions() { + for v in self.versions.iter() { + if v.version == self.current().version { + // We don't care about whether nodes in the latest layout version + // have completed a sync or not, as the sync is push-only + // and by definition nodes in the latest layout version do not + // hold data that must be pushed to nodes in the latest layout + // version, since that's the same version (any data that's + // already in the latest version is assumed to have been written + // by an operation that ensured a quorum of writes within + // that version). + continue; + } + + // Determine set of nodes for partition p in layout version v. + // Sort the node set to avoid duplicate computations. + let mut set = v + .nodes_of(&p_hash, v.replication_factor) + .collect::<Vec<Uuid>>(); + set.sort(); + + // If this set was already processed, skip it. + if sets_done.contains(&set) { + continue; + } + + // Find the value of the sync update trackers that is the + // highest possible minimum within a quorum of nodes. + let mut sync_values = set + .iter() + .map(|x| self.update_trackers.sync_map.get(x, min_version)) + .collect::<Vec<_>>(); + sync_values.sort(); + let set_min = sync_values[sync_values.len() - quorum]; + if set_min < current_min { + current_min = set_min; + } + // defavorable case, we know we are at the smallest possible version, + // so we can stop early + assert!(current_min >= global_min); + if current_min == global_min { + return current_min; + } + + // Add set to already processed sets + sets_done.insert(set); + } + } + + current_min + } + + pub(crate) fn calculate_trackers_hash(&self) -> Hash { + blake2sum(&nonversioned_encode(&self.update_trackers).unwrap()[..]) + } + + pub(crate) fn calculate_staging_hash(&self) -> Hash { + blake2sum(&nonversioned_encode(&self.staging).unwrap()[..]) + } + + // ================== updates to layout, public interface =================== + + pub fn merge(&mut self, other: &LayoutHistory) -> bool { + let mut changed = false; + + // Add any new versions to history + for v2 in other.versions.iter() { + if let Some(v1) = self.versions.iter().find(|v| v.version == v2.version) { + // Version is already present, check consistency + if v1 != v2 { + error!("Inconsistent layout histories: different layout compositions for version {}. Your cluster will be broken as long as this layout version is not replaced.", v2.version); + } + } else if self.versions.iter().all(|v| v.version != v2.version - 1) { + error!( + "Cannot receive new layout version {}, version {} is missing", + v2.version, + v2.version - 1 + ); + } else { + self.versions.push(v2.clone()); + changed = true; + } + } + + // Merge trackers + let c = self.update_trackers.merge(&other.update_trackers); + changed = changed || c; + + // Merge staged layout changes + if self.staging != other.staging { + let prev_staging = self.staging.clone(); + self.staging.merge(&other.staging); + changed = changed || self.staging != prev_staging; + } + + changed + } + + pub fn apply_staged_changes(mut self, version: Option<u64>) -> Result<(Self, Message), Error> { + match version { + None => { + let error = r#" +Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout. +To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes. + "#; + return Err(Error::Message(error.into())); + } + Some(v) => { + if v != self.current().version + 1 { + return Err(Error::Message("Invalid new layout version".into())); + } + } + } + + // Compute new version and add it to history + let (new_version, msg) = self + .current() + .clone() + .calculate_next_version(self.staging.get())?; + + self.versions.push(new_version); + self.cleanup_old_versions(); + + // Reset the staged layout changes + self.staging.update(LayoutStaging { + parameters: self.staging.get().parameters.clone(), + roles: LwwMap::new(), + }); + + Ok((self, msg)) + } + + pub fn revert_staged_changes(mut self) -> Result<Self, Error> { + self.staging.update(LayoutStaging { + parameters: Lww::new(self.current().parameters), + roles: LwwMap::new(), + }); + + Ok(self) + } + + pub fn check(&self) -> Result<(), String> { + // TODO: anything more ? + self.current().check() + } +} diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs new file mode 100644 index 00000000..6747b79d --- /dev/null +++ b/src/rpc/layout/manager.rs @@ -0,0 +1,378 @@ +use std::collections::HashMap; +use std::sync::{atomic::Ordering, Arc, Mutex, RwLock, RwLockReadGuard}; +use std::time::Duration; + +use tokio::sync::Notify; + +use netapp::endpoint::Endpoint; +use netapp::peering::fullmesh::FullMeshPeeringStrategy; +use netapp::NodeID; + +use garage_util::config::Config; +use garage_util::data::*; +use garage_util::error::*; +use garage_util::persister::Persister; + +use super::*; +use crate::replication_mode::ReplicationMode; +use crate::rpc_helper::*; +use crate::system::*; + +pub struct LayoutManager { + node_id: Uuid, + replication_mode: ReplicationMode, + persist_cluster_layout: Persister<LayoutHistory>, + + layout: Arc<RwLock<LayoutHelper>>, + pub(crate) change_notify: Arc<Notify>, + + table_sync_version: Mutex<HashMap<String, u64>>, + + pub(crate) rpc_helper: RpcHelper, + system_endpoint: Arc<Endpoint<SystemRpc, System>>, +} + +impl LayoutManager { + pub fn new( + config: &Config, + node_id: NodeID, + system_endpoint: Arc<Endpoint<SystemRpc, System>>, + fullmesh: Arc<FullMeshPeeringStrategy>, + replication_mode: ReplicationMode, + ) -> Result<Arc<Self>, Error> { + let replication_factor = replication_mode.replication_factor(); + + let persist_cluster_layout: Persister<LayoutHistory> = + Persister::new(&config.metadata_dir, "cluster_layout"); + + let cluster_layout = match persist_cluster_layout.load() { + Ok(x) => { + if x.current().replication_factor != replication_mode.replication_factor() { + return Err(Error::Message(format!( + "Prevous cluster layout has replication factor {}, which is different than the one specified in the config file ({}). The previous cluster layout can be purged, if you know what you are doing, simply by deleting the `cluster_layout` file in your metadata directory.", + x.current().replication_factor, + replication_factor + ))); + } + x + } + Err(e) => { + info!( + "No valid previous cluster layout stored ({}), starting fresh.", + e + ); + LayoutHistory::new(replication_factor) + } + }; + + let mut cluster_layout = + LayoutHelper::new(replication_mode, cluster_layout, Default::default()); + cluster_layout.update_trackers(node_id.into()); + + let layout = Arc::new(RwLock::new(cluster_layout)); + let change_notify = Arc::new(Notify::new()); + + let rpc_helper = RpcHelper::new( + node_id.into(), + fullmesh, + layout.clone(), + config.rpc_timeout_msec.map(Duration::from_millis), + ); + + Ok(Arc::new(Self { + node_id: node_id.into(), + replication_mode, + persist_cluster_layout, + layout, + change_notify, + table_sync_version: Mutex::new(HashMap::new()), + system_endpoint, + rpc_helper, + })) + } + + // ---- PUBLIC INTERFACE ---- + + pub fn layout(&self) -> RwLockReadGuard<'_, LayoutHelper> { + self.layout.read().unwrap() + } + + pub async fn update_cluster_layout( + self: &Arc<Self>, + layout: &LayoutHistory, + ) -> Result<(), Error> { + self.handle_advertise_cluster_layout(layout).await?; + Ok(()) + } + + pub fn add_table(&self, table_name: &'static str) { + let first_version = self.layout().versions.first().unwrap().version; + + self.table_sync_version + .lock() + .unwrap() + .insert(table_name.to_string(), first_version); + } + + pub fn sync_table_until(self: &Arc<Self>, table_name: &'static str, version: u64) { + let mut table_sync_version = self.table_sync_version.lock().unwrap(); + *table_sync_version.get_mut(table_name).unwrap() = version; + let sync_until = table_sync_version.iter().map(|(_, v)| *v).min().unwrap(); + drop(table_sync_version); + + let mut layout = self.layout.write().unwrap(); + if layout.update(|l| l.update_trackers.sync_map.set_max(self.node_id, sync_until)) { + info!("sync_until updated to {}", sync_until); + self.broadcast_update(SystemRpc::AdvertiseClusterLayoutTrackers( + layout.update_trackers.clone(), + )); + } + } + + fn ack_new_version(self: &Arc<Self>) { + let mut layout = self.layout.write().unwrap(); + if layout.ack_max_free(self.node_id) { + self.broadcast_update(SystemRpc::AdvertiseClusterLayoutTrackers( + layout.update_trackers.clone(), + )); + } + } + + // ---- ACK LOCKING ---- + + pub fn write_sets_of(self: &Arc<Self>, position: &Hash) -> WriteLock<Vec<Vec<Uuid>>> { + let layout = self.layout(); + let version = layout.current().version; + let nodes = layout.storage_sets_of(position); + layout + .ack_lock + .get(&version) + .unwrap() + .fetch_add(1, Ordering::Relaxed); + WriteLock::new(version, self, nodes) + } + + // ---- INTERNALS --- + + fn merge_layout(&self, adv: &LayoutHistory) -> Option<LayoutHistory> { + let mut layout = self.layout.write().unwrap(); + let prev_digest = layout.digest(); + let prev_layout_check = layout.check().is_ok(); + + if !prev_layout_check || adv.check().is_ok() { + if layout.update(|l| l.merge(adv)) { + layout.update_trackers(self.node_id); + if prev_layout_check && layout.check().is_err() { + panic!("Merged two correct layouts and got an incorrect layout."); + } + assert!(layout.digest() != prev_digest); + return Some(layout.clone()); + } + } + + None + } + + fn merge_layout_trackers(&self, adv: &UpdateTrackers) -> Option<UpdateTrackers> { + let mut layout = self.layout.write().unwrap(); + let prev_digest = layout.digest(); + + if layout.update_trackers != *adv { + if layout.update(|l| l.update_trackers.merge(adv)) { + layout.update_trackers(self.node_id); + assert!(layout.digest() != prev_digest); + return Some(layout.update_trackers.clone()); + } + } + + None + } + + async fn pull_cluster_layout(self: &Arc<Self>, peer: Uuid) { + let resp = self + .rpc_helper + .call( + &self.system_endpoint, + peer, + SystemRpc::PullClusterLayout, + RequestStrategy::with_priority(PRIO_HIGH), + ) + .await; + if let Ok(SystemRpc::AdvertiseClusterLayout(layout)) = resp { + if let Err(e) = self.handle_advertise_cluster_layout(&layout).await { + warn!("In pull_cluster_layout: {}", e); + } + } + } + + async fn pull_cluster_layout_trackers(self: &Arc<Self>, peer: Uuid) { + let resp = self + .rpc_helper + .call( + &self.system_endpoint, + peer, + SystemRpc::PullClusterLayoutTrackers, + RequestStrategy::with_priority(PRIO_HIGH), + ) + .await; + if let Ok(SystemRpc::AdvertiseClusterLayoutTrackers(trackers)) = resp { + if let Err(e) = self + .handle_advertise_cluster_layout_trackers(&trackers) + .await + { + warn!("In pull_cluster_layout_trackers: {}", e); + } + } + } + + /// Save cluster layout data to disk + async fn save_cluster_layout(&self) -> Result<(), Error> { + let layout = self.layout.read().unwrap().clone(); + self.persist_cluster_layout + .save_async(&layout) + .await + .expect("Cannot save current cluster layout"); + Ok(()) + } + + fn broadcast_update(self: &Arc<Self>, rpc: SystemRpc) { + tokio::spawn({ + let this = self.clone(); + async move { + if let Err(e) = this + .rpc_helper + .broadcast( + &this.system_endpoint, + rpc, + RequestStrategy::with_priority(PRIO_HIGH), + ) + .await + { + warn!("Error while broadcasting new cluster layout: {}", e); + } + } + }); + } + + // ---- RPC HANDLERS ---- + + pub(crate) fn handle_advertise_status(self: &Arc<Self>, from: Uuid, remote: &RpcLayoutDigest) { + let local = self.layout().digest(); + if remote.current_version > local.current_version + || remote.active_versions != local.active_versions + || remote.staging_hash != local.staging_hash + { + tokio::spawn({ + let this = self.clone(); + async move { this.pull_cluster_layout(from).await } + }); + } else if remote.trackers_hash != local.trackers_hash { + tokio::spawn({ + let this = self.clone(); + async move { this.pull_cluster_layout_trackers(from).await } + }); + } + } + + pub(crate) fn handle_pull_cluster_layout(&self) -> SystemRpc { + let layout = self.layout.read().unwrap().clone(); + SystemRpc::AdvertiseClusterLayout(layout) + } + + pub(crate) fn handle_pull_cluster_layout_trackers(&self) -> SystemRpc { + let layout = self.layout.read().unwrap(); + SystemRpc::AdvertiseClusterLayoutTrackers(layout.update_trackers.clone()) + } + + pub(crate) async fn handle_advertise_cluster_layout( + self: &Arc<Self>, + adv: &LayoutHistory, + ) -> Result<SystemRpc, Error> { + debug!( + "handle_advertise_cluster_layout: {} versions, last={}, trackers={:?}", + adv.versions.len(), + adv.current().version, + adv.update_trackers + ); + + if adv.current().replication_factor != self.replication_mode.replication_factor() { + let msg = format!( + "Received a cluster layout from another node with replication factor {}, which is different from what we have in our configuration ({}). Discarding the cluster layout we received.", + adv.current().replication_factor, + self.replication_mode.replication_factor() + ); + error!("{}", msg); + return Err(Error::Message(msg)); + } + + if let Some(new_layout) = self.merge_layout(adv) { + debug!("handle_advertise_cluster_layout: some changes were added to the current stuff"); + + self.change_notify.notify_waiters(); + self.broadcast_update(SystemRpc::AdvertiseClusterLayout(new_layout)); + self.save_cluster_layout().await?; + } + + Ok(SystemRpc::Ok) + } + + pub(crate) async fn handle_advertise_cluster_layout_trackers( + self: &Arc<Self>, + trackers: &UpdateTrackers, + ) -> Result<SystemRpc, Error> { + debug!("handle_advertise_cluster_layout_trackers: {:?}", trackers); + + if let Some(new_trackers) = self.merge_layout_trackers(trackers) { + self.change_notify.notify_waiters(); + self.broadcast_update(SystemRpc::AdvertiseClusterLayoutTrackers(new_trackers)); + self.save_cluster_layout().await?; + } + + Ok(SystemRpc::Ok) + } +} + +// ---- ack lock ---- + +pub struct WriteLock<T> { + layout_version: u64, + layout_manager: Arc<LayoutManager>, + value: T, +} + +impl<T> WriteLock<T> { + fn new(version: u64, layout_manager: &Arc<LayoutManager>, value: T) -> Self { + Self { + layout_version: version, + layout_manager: layout_manager.clone(), + value, + } + } +} + +impl<T> AsRef<T> for WriteLock<T> { + fn as_ref(&self) -> &T { + &self.value + } +} + +impl<T> AsMut<T> for WriteLock<T> { + fn as_mut(&mut self) -> &mut T { + &mut self.value + } +} + +impl<T> Drop for WriteLock<T> { + fn drop(&mut self) { + let layout = self.layout_manager.layout(); // acquire read lock + if let Some(counter) = layout.ack_lock.get(&self.layout_version) { + let prev_lock = counter.fetch_sub(1, Ordering::Relaxed); + if prev_lock == 1 && layout.current().version > self.layout_version { + drop(layout); // release read lock, write lock will be acquired + self.layout_manager.ack_new_version(); + } + } else { + error!("Could not find ack lock counter for layout version {}. This probably indicates a bug in Garage.", self.layout_version); + } + } +} diff --git a/src/rpc/layout/mod.rs b/src/rpc/layout/mod.rs new file mode 100644 index 00000000..33676c37 --- /dev/null +++ b/src/rpc/layout/mod.rs @@ -0,0 +1,478 @@ +use std::fmt; + +use bytesize::ByteSize; + +use garage_util::crdt::{AutoCrdt, Crdt}; +use garage_util::data::Uuid; + +mod graph_algo; +mod helper; +mod history; +mod version; + +#[cfg(test)] +mod test; + +pub mod manager; + +// ---- re-exports ---- + +pub use helper::{LayoutHelper, RpcLayoutDigest, SyncLayoutDigest}; +pub use manager::WriteLock; +pub use version::*; + +// ---- defines: partitions ---- + +/// A partition id, which is stored on 16 bits +/// i.e. we have up to 2**16 partitions. +/// (in practice we have exactly 2**PARTITION_BITS partitions) +pub type Partition = u16; + +// TODO: make this constant parametrizable in the config file +// For deployments with many nodes it might make sense to bump +// it up to 10. +// Maximum value : 16 +/// How many bits from the hash are used to make partitions. Higher numbers means more fairness in +/// presence of numerous nodes, but exponentially bigger ring. Max 16 +pub const PARTITION_BITS: usize = 8; + +const NB_PARTITIONS: usize = 1usize << PARTITION_BITS; + +// ---- defines: nodes ---- + +// Type to store compactly the id of a node in the system +// Change this to u16 the day we want to have more than 256 nodes in a cluster +pub type CompactNodeType = u8; +pub const MAX_NODE_NUMBER: usize = 256; + +// ======== actual data structures for the layout data ======== +// ======== that is persisted to disk ======== +// some small utility impls are at the end of this file, +// but most of the code that actually computes stuff is in +// version.rs, history.rs and helper.rs + +mod v08 { + use crate::layout::CompactNodeType; + use garage_util::crdt::LwwMap; + use garage_util::data::{Hash, Uuid}; + use serde::{Deserialize, Serialize}; + + /// The layout of the cluster, i.e. the list of roles + /// which are assigned to each cluster node + #[derive(Clone, Debug, Serialize, Deserialize)] + pub struct ClusterLayout { + pub version: u64, + + pub replication_factor: usize, + pub roles: LwwMap<Uuid, NodeRoleV>, + + // see comments in v010::ClusterLayout + pub node_id_vec: Vec<Uuid>, + #[serde(with = "serde_bytes")] + pub ring_assignation_data: Vec<CompactNodeType>, + + /// Role changes which are staged for the next version of the layout + pub staging: LwwMap<Uuid, NodeRoleV>, + pub staging_hash: Hash, + } + + #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] + pub struct NodeRoleV(pub Option<NodeRole>); + + /// The user-assigned roles of cluster nodes + #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] + pub struct NodeRole { + /// Datacenter at which this entry belong. This information is used to + /// perform a better geodistribution + pub zone: String, + /// The capacity of the node + /// If this is set to None, the node does not participate in storing data for the system + /// and is only active as an API gateway to other nodes + pub capacity: Option<u64>, + /// A set of tags to recognize the node + pub tags: Vec<String>, + } + + impl garage_util::migrate::InitialFormat for ClusterLayout {} +} + +mod v09 { + use super::v08; + use crate::layout::CompactNodeType; + use garage_util::crdt::{Lww, LwwMap}; + use garage_util::data::{Hash, Uuid}; + use serde::{Deserialize, Serialize}; + pub use v08::{NodeRole, NodeRoleV}; + + /// The layout of the cluster, i.e. the list of roles + /// which are assigned to each cluster node + #[derive(Clone, Debug, Serialize, Deserialize)] + pub struct ClusterLayout { + pub version: u64, + + pub replication_factor: usize, + + /// This attribute is only used to retain the previously computed partition size, + /// to know to what extent does it change with the layout update. + pub partition_size: u64, + /// Parameters used to compute the assignment currently given by + /// ring_assignment_data + pub parameters: LayoutParameters, + + pub roles: LwwMap<Uuid, NodeRoleV>, + + // see comments in v010::ClusterLayout + pub node_id_vec: Vec<Uuid>, + #[serde(with = "serde_bytes")] + pub ring_assignment_data: Vec<CompactNodeType>, + + /// Parameters to be used in the next partition assignment computation. + pub staging_parameters: Lww<LayoutParameters>, + /// Role changes which are staged for the next version of the layout + pub staging_roles: LwwMap<Uuid, NodeRoleV>, + pub staging_hash: Hash, + } + + /// This struct is used to set the parameters to be used in the assignment computation + /// algorithm. It is stored as a Crdt. + #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Serialize, Deserialize)] + pub struct LayoutParameters { + pub zone_redundancy: ZoneRedundancy, + } + + /// Zone redundancy: if set to AtLeast(x), the layout calculation will aim to store copies + /// of each partition on at least that number of different zones. + /// Otherwise, copies will be stored on the maximum possible number of zones. + #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Serialize, Deserialize)] + pub enum ZoneRedundancy { + AtLeast(usize), + Maximum, + } + + impl garage_util::migrate::Migrate for ClusterLayout { + const VERSION_MARKER: &'static [u8] = b"G09layout"; + + type Previous = v08::ClusterLayout; + + fn migrate(previous: Self::Previous) -> Self { + use itertools::Itertools; + + // In the old layout, capacities are in an arbitrary unit, + // but in the new layout they are in bytes. + // Here we arbitrarily multiply everything by 1G, + // such that 1 old capacity unit = 1GB in the new units. + // This is totally arbitrary and won't work for most users. + let cap_mul = 1024 * 1024 * 1024; + let roles = multiply_all_capacities(previous.roles, cap_mul); + let staging_roles = multiply_all_capacities(previous.staging, cap_mul); + let node_id_vec = previous.node_id_vec; + + // Determine partition size + let mut tmp = previous.ring_assignation_data.clone(); + tmp.sort(); + let partition_size = tmp + .into_iter() + .dedup_with_count() + .map(|(npart, node)| { + roles + .get(&node_id_vec[node as usize]) + .and_then(|p| p.0.as_ref().and_then(|r| r.capacity)) + .unwrap_or(0) / npart as u64 + }) + .min() + .unwrap_or(0); + + // By default, zone_redundancy is maximum possible value + let parameters = LayoutParameters { + zone_redundancy: ZoneRedundancy::Maximum, + }; + + Self { + version: previous.version, + replication_factor: previous.replication_factor, + partition_size, + parameters, + roles, + node_id_vec, + ring_assignment_data: previous.ring_assignation_data, + staging_parameters: Lww::new(parameters), + staging_roles, + staging_hash: [0u8; 32].into(), // will be set in the next migration + } + } + } + + fn multiply_all_capacities( + old_roles: LwwMap<Uuid, NodeRoleV>, + mul: u64, + ) -> LwwMap<Uuid, NodeRoleV> { + let mut new_roles = LwwMap::new(); + for (node, ts, role) in old_roles.items() { + let mut role = role.clone(); + if let NodeRoleV(Some(NodeRole { + capacity: Some(ref mut cap), + .. + })) = role + { + *cap *= mul; + } + new_roles.merge_raw(node, *ts, &role); + } + new_roles + } +} + +mod v010 { + use super::v09; + use crate::layout::CompactNodeType; + use garage_util::crdt::{Lww, LwwMap}; + use garage_util::data::Uuid; + use serde::{Deserialize, Serialize}; + use std::collections::BTreeMap; + pub use v09::{LayoutParameters, NodeRole, NodeRoleV, ZoneRedundancy}; + + /// Number of old (non-live) versions to keep, see LayoutHistory::old_versions + pub const OLD_VERSION_COUNT: usize = 5; + + /// The history of cluster layouts, with trackers to keep a record + /// of which nodes are up-to-date to current cluster data + #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] + pub struct LayoutHistory { + /// The versions currently in use in the cluster + pub versions: Vec<LayoutVersion>, + /// At most 5 of the previous versions, not used by the garage_table + /// module, but usefull for the garage_block module to find data blocks + /// that have not yet been moved + pub old_versions: Vec<LayoutVersion>, + + /// Update trackers + pub update_trackers: UpdateTrackers, + + /// Staged changes for the next version + pub staging: Lww<LayoutStaging>, + } + + /// A version of the layout of the cluster, i.e. the list of roles + /// which are assigned to each cluster node + #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] + pub struct LayoutVersion { + /// The number of this version + pub version: u64, + + /// Roles assigned to nodes in this version + pub roles: LwwMap<Uuid, NodeRoleV>, + /// Parameters used to compute the assignment currently given by + /// ring_assignment_data + pub parameters: LayoutParameters, + + /// The number of replicas for each data partition + pub replication_factor: usize, + /// This attribute is only used to retain the previously computed partition size, + /// to know to what extent does it change with the layout update. + pub partition_size: u64, + + /// node_id_vec: a vector of node IDs with a role assigned + /// in the system (this includes gateway nodes). + /// The order here is different than the vec stored by `roles`, because: + /// 1. non-gateway nodes are first so that they have lower numbers + /// 2. nodes that don't have a role are excluded (but they need to + /// stay in the CRDT as tombstones) + pub node_id_vec: Vec<Uuid>, + /// number of non-gateway nodes, which are the first ids in node_id_vec + pub nongateway_node_count: usize, + /// The assignation of data partitions to nodes, the values + /// are indices in node_id_vec + #[serde(with = "serde_bytes")] + pub ring_assignment_data: Vec<CompactNodeType>, + } + + /// The staged changes for the next layout version + #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] + pub struct LayoutStaging { + /// Parameters to be used in the next partition assignment computation. + pub parameters: Lww<LayoutParameters>, + /// Role changes which are staged for the next version of the layout + pub roles: LwwMap<Uuid, NodeRoleV>, + } + + /// The tracker of acknowlegments and data syncs around the cluster + #[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq)] + pub struct UpdateTrackers { + /// The highest layout version number each node has ack'ed + pub ack_map: UpdateTracker, + /// The highest layout version number each node has synced data for + pub sync_map: UpdateTracker, + /// The highest layout version number each node has + /// ack'ed that all other nodes have synced data for + pub sync_ack_map: UpdateTracker, + } + + /// Generic update tracker struct + #[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq)] + pub struct UpdateTracker(pub BTreeMap<Uuid, u64>); + + impl garage_util::migrate::Migrate for LayoutHistory { + const VERSION_MARKER: &'static [u8] = b"G010lh"; + + type Previous = v09::ClusterLayout; + + fn migrate(previous: Self::Previous) -> Self { + let nongateway_node_count = previous + .node_id_vec + .iter() + .enumerate() + .filter(|(_, uuid)| { + let role = previous.roles.get(uuid); + matches!(role, Some(NodeRoleV(Some(role))) if role.capacity.is_some()) + }) + .map(|(i, _)| i + 1) + .max() + .unwrap_or(0); + + let version = LayoutVersion { + version: previous.version, + replication_factor: previous.replication_factor, + partition_size: previous.partition_size, + parameters: previous.parameters, + roles: previous.roles, + node_id_vec: previous.node_id_vec, + nongateway_node_count, + ring_assignment_data: previous.ring_assignment_data, + }; + let update_tracker = UpdateTracker( + version + .nongateway_nodes() + .iter() + .copied() + .map(|x| (x, version.version)) + .collect::<BTreeMap<Uuid, u64>>(), + ); + let staging = LayoutStaging { + parameters: previous.staging_parameters, + roles: previous.staging_roles, + }; + Self { + versions: vec![version], + old_versions: vec![], + update_trackers: UpdateTrackers { + ack_map: update_tracker.clone(), + sync_map: update_tracker.clone(), + sync_ack_map: update_tracker, + }, + staging: Lww::raw(previous.version, staging), + } + } + } +} + +pub use v010::*; + +// ---- utility functions ---- + +impl AutoCrdt for LayoutParameters { + const WARN_IF_DIFFERENT: bool = true; +} + +impl AutoCrdt for NodeRoleV { + const WARN_IF_DIFFERENT: bool = true; +} + +impl Crdt for LayoutStaging { + fn merge(&mut self, other: &LayoutStaging) { + self.parameters.merge(&other.parameters); + self.roles.merge(&other.roles); + } +} + +impl NodeRole { + pub fn capacity_string(&self) -> String { + match self.capacity { + Some(c) => ByteSize::b(c).to_string_as(false), + None => "gateway".to_string(), + } + } + + pub fn tags_string(&self) -> String { + self.tags.join(",") + } +} + +impl fmt::Display for ZoneRedundancy { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + ZoneRedundancy::Maximum => write!(f, "maximum"), + ZoneRedundancy::AtLeast(x) => write!(f, "{}", x), + } + } +} + +impl core::str::FromStr for ZoneRedundancy { + type Err = &'static str; + fn from_str(s: &str) -> Result<Self, Self::Err> { + match s { + "none" | "max" | "maximum" => Ok(ZoneRedundancy::Maximum), + x => { + let v = x + .parse::<usize>() + .map_err(|_| "zone redundancy must be 'none'/'max' or an integer")?; + Ok(ZoneRedundancy::AtLeast(v)) + } + } + } +} + +impl UpdateTracker { + fn merge(&mut self, other: &UpdateTracker) -> bool { + let mut changed = false; + for (k, v) in other.0.iter() { + if let Some(v_mut) = self.0.get_mut(k) { + if *v > *v_mut { + *v_mut = *v; + changed = true; + } + } else { + self.0.insert(*k, *v); + changed = true; + } + } + changed + } + + /// This bumps the update tracker for a given node up to the specified value. + /// This has potential impacts on the correctness of Garage and should only + /// be used in very specific circumstances. + pub fn set_max(&mut self, peer: Uuid, value: u64) -> bool { + match self.0.get_mut(&peer) { + Some(e) if *e < value => { + *e = value; + true + } + None => { + self.0.insert(peer, value); + true + } + _ => false, + } + } + + pub(crate) fn min_among(&self, storage_nodes: &[Uuid], min_version: u64) -> u64 { + storage_nodes + .iter() + .map(|x| self.get(x, min_version)) + .min() + .unwrap_or(min_version) + } + + pub fn get(&self, node: &Uuid, min_version: u64) -> u64 { + self.0.get(node).copied().unwrap_or(min_version) + } +} + +impl UpdateTrackers { + pub(crate) fn merge(&mut self, other: &UpdateTrackers) -> bool { + let c1 = self.ack_map.merge(&other.ack_map); + let c2 = self.sync_map.merge(&other.sync_map); + let c3 = self.sync_ack_map.merge(&other.sync_ack_map); + c1 || c2 || c3 + } +} diff --git a/src/rpc/layout/test.rs b/src/rpc/layout/test.rs new file mode 100644 index 00000000..88eb518e --- /dev/null +++ b/src/rpc/layout/test.rs @@ -0,0 +1,157 @@ +use std::cmp::min; +use std::collections::HashMap; + +use garage_util::crdt::Crdt; +use garage_util::error::*; + +use crate::layout::*; + +// This function checks that the partition size S computed is at least better than the +// one given by a very naive algorithm. To do so, we try to run the naive algorithm +// assuming a partion size of S+1. If we succed, it means that the optimal assignment +// was not optimal. The naive algorithm is the following : +// - we compute the max number of partitions associated to every node, capped at the +// partition number. It gives the number of tokens of every node. +// - every zone has a number of tokens equal to the sum of the tokens of its nodes. +// - we cycle over the partitions and associate zone tokens while respecting the +// zone redundancy constraint. +// NOTE: the naive algorithm is not optimal. Counter example: +// take nb_partition = 3 ; replication_factor = 5; redundancy = 4; +// number of tokens by zone : (A, 4), (B,1), (C,4), (D, 4), (E, 2) +// With these parameters, the naive algo fails, whereas there is a solution: +// (A,A,C,D,E) , (A,B,C,D,D) (A,C,C,D,E) +fn check_against_naive(cl: &LayoutVersion) -> Result<bool, Error> { + let over_size = cl.partition_size + 1; + let mut zone_token = HashMap::<String, usize>::new(); + + let (zones, zone_to_id) = cl.generate_nongateway_zone_ids()?; + + if zones.is_empty() { + return Ok(false); + } + + for z in zones.iter() { + zone_token.insert(z.clone(), 0); + } + for uuid in cl.nongateway_nodes() { + let z = cl.expect_get_node_zone(&uuid); + let c = cl.expect_get_node_capacity(&uuid); + zone_token.insert( + z.to_string(), + zone_token[z] + min(NB_PARTITIONS, (c / over_size) as usize), + ); + } + + // For every partition, we count the number of zone already associated and + // the name of the last zone associated + + let mut id_zone_token = vec![0; zones.len()]; + for (z, t) in zone_token.iter() { + id_zone_token[zone_to_id[z]] = *t; + } + + let mut nb_token = vec![0; NB_PARTITIONS]; + let mut last_zone = vec![zones.len(); NB_PARTITIONS]; + + let mut curr_zone = 0; + + let redundancy = cl.effective_zone_redundancy(); + + for replic in 0..cl.replication_factor { + for p in 0..NB_PARTITIONS { + while id_zone_token[curr_zone] == 0 + || (last_zone[p] == curr_zone + && redundancy - nb_token[p] <= cl.replication_factor - replic) + { + curr_zone += 1; + if curr_zone >= zones.len() { + return Ok(true); + } + } + id_zone_token[curr_zone] -= 1; + if last_zone[p] != curr_zone { + nb_token[p] += 1; + last_zone[p] = curr_zone; + } + } + } + + return Ok(false); +} + +fn show_msg(msg: &Message) { + for s in msg.iter() { + println!("{}", s); + } +} + +fn update_layout( + cl: &mut LayoutHistory, + node_capacity_vec: &[u64], + node_zone_vec: &[&'static str], + zone_redundancy: usize, +) { + let staging = cl.staging.get_mut(); + + for (i, (capacity, zone)) in node_capacity_vec + .iter() + .zip(node_zone_vec.iter()) + .enumerate() + { + let node_id = [i as u8; 32].into(); + + let update = staging.roles.update_mutator( + node_id, + NodeRoleV(Some(NodeRole { + zone: zone.to_string(), + capacity: Some(*capacity), + tags: (vec![]), + })), + ); + staging.roles.merge(&update); + } + staging.parameters.update(LayoutParameters { + zone_redundancy: ZoneRedundancy::AtLeast(zone_redundancy), + }); +} + +#[test] +fn test_assignment() { + let mut node_capacity_vec = vec![4000, 1000, 2000]; + let mut node_zone_vec = vec!["A", "B", "C"]; + + let mut cl = LayoutHistory::new(3); + update_layout(&mut cl, &node_capacity_vec, &node_zone_vec, 3); + let v = cl.current().version; + let (mut cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap(); + show_msg(&msg); + assert_eq!(cl.check(), Ok(())); + assert!(check_against_naive(cl.current()).unwrap()); + + node_capacity_vec = vec![4000, 1000, 1000, 3000, 1000, 1000, 2000, 10000, 2000]; + node_zone_vec = vec!["A", "B", "C", "C", "C", "B", "G", "H", "I"]; + update_layout(&mut cl, &node_capacity_vec, &node_zone_vec, 2); + let v = cl.current().version; + let (mut cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap(); + show_msg(&msg); + assert_eq!(cl.check(), Ok(())); + assert!(check_against_naive(cl.current()).unwrap()); + + node_capacity_vec = vec![4000, 1000, 2000, 7000, 1000, 1000, 2000, 10000, 2000]; + update_layout(&mut cl, &node_capacity_vec, &node_zone_vec, 3); + let v = cl.current().version; + let (mut cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap(); + show_msg(&msg); + assert_eq!(cl.check(), Ok(())); + assert!(check_against_naive(cl.current()).unwrap()); + + node_capacity_vec = vec![ + 4000000, 4000000, 2000000, 7000000, 1000000, 9000000, 2000000, 10000, 2000000, + ]; + update_layout(&mut cl, &node_capacity_vec, &node_zone_vec, 1); + let v = cl.current().version; + let (cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap(); + show_msg(&msg); + assert_eq!(cl.check(), Ok(())); + assert!(check_against_naive(cl.current()).unwrap()); +} diff --git a/src/rpc/layout/version.rs b/src/rpc/layout/version.rs new file mode 100644 index 00000000..ee4b2821 --- /dev/null +++ b/src/rpc/layout/version.rs @@ -0,0 +1,846 @@ +use std::collections::HashMap; +use std::collections::HashSet; +use std::convert::TryInto; + +use bytesize::ByteSize; +use itertools::Itertools; + +use garage_util::crdt::{Crdt, LwwMap}; +use garage_util::data::*; +use garage_util::error::*; + +use super::graph_algo::*; +use super::*; + +// The Message type will be used to collect information on the algorithm. +pub type Message = Vec<String>; + +impl LayoutVersion { + pub fn new(replication_factor: usize) -> Self { + // We set the default zone redundancy to be Maximum, meaning that the maximum + // possible value will be used depending on the cluster topology + let parameters = LayoutParameters { + zone_redundancy: ZoneRedundancy::Maximum, + }; + + LayoutVersion { + version: 0, + replication_factor, + partition_size: 0, + roles: LwwMap::new(), + node_id_vec: Vec::new(), + nongateway_node_count: 0, + ring_assignment_data: Vec::new(), + parameters, + } + } + + // ===================== accessors ====================== + + /// Returns a list of IDs of nodes that have a role in this + /// version of the cluster layout, including gateway nodes + pub fn all_nodes(&self) -> &[Uuid] { + &self.node_id_vec[..] + } + + /// Returns a list of IDs of nodes that have a storage capacity + /// assigned in this version of the cluster layout + pub fn nongateway_nodes(&self) -> &[Uuid] { + &self.node_id_vec[..self.nongateway_node_count] + } + + /// Returns the role of a node in the layout, if it has one + pub fn node_role(&self, node: &Uuid) -> Option<&NodeRole> { + match self.roles.get(node) { + Some(NodeRoleV(Some(v))) => Some(v), + _ => None, + } + } + + /// Returns the capacity of a node in the layout, if it has one + pub fn get_node_capacity(&self, uuid: &Uuid) -> Option<u64> { + match self.node_role(uuid) { + Some(NodeRole { + capacity: Some(cap), + zone: _, + tags: _, + }) => Some(*cap), + _ => None, + } + } + + /// Given a node uuids, this function returns the label of its zone if it has one + pub fn get_node_zone(&self, uuid: &Uuid) -> Option<&str> { + match self.node_role(uuid) { + Some(role) => Some(&role.zone), + _ => None, + } + } + + /// Returns the number of partitions associated to this node in the ring + pub fn get_node_usage(&self, uuid: &Uuid) -> Result<usize, Error> { + for (i, id) in self.node_id_vec.iter().enumerate() { + if id == uuid { + let mut count = 0; + for nod in self.ring_assignment_data.iter() { + if i as u8 == *nod { + count += 1 + } + } + return Ok(count); + } + } + Err(Error::Message( + "The Uuid does not correspond to a node present in the \ + cluster or this node does not have a positive capacity." + .into(), + )) + } + + /// Get the partition in which data would fall on + pub fn partition_of(&self, position: &Hash) -> Partition { + let top = u16::from_be_bytes(position.as_slice()[0..2].try_into().unwrap()); + top >> (16 - PARTITION_BITS) + } + + /// Get the list of partitions and the first hash of a partition key that would fall in it + pub fn partitions(&self) -> impl Iterator<Item = (Partition, Hash)> + '_ { + (0..(1 << PARTITION_BITS)).map(|i| { + let top = (i as u16) << (16 - PARTITION_BITS); + let mut location = [0u8; 32]; + location[..2].copy_from_slice(&u16::to_be_bytes(top)[..]); + (i as u16, Hash::from(location)) + }) + } + + /// Return the n servers in which data for this hash should be replicated + pub fn nodes_of(&self, position: &Hash, n: usize) -> impl Iterator<Item = Uuid> + '_ { + assert_eq!(n, self.replication_factor); + + let data = &self.ring_assignment_data; + + let partition_nodes = if data.len() == self.replication_factor * (1 << PARTITION_BITS) { + let partition_idx = self.partition_of(position) as usize; + let partition_start = partition_idx * self.replication_factor; + let partition_end = (partition_idx + 1) * self.replication_factor; + &data[partition_start..partition_end] + } else { + warn!("Ring not yet ready, read/writes will be lost!"); + &[] + }; + + partition_nodes + .iter() + .map(move |i| self.node_id_vec[*i as usize]) + } + + // ===================== internal information extractors ====================== + + pub(crate) fn expect_get_node_capacity(&self, uuid: &Uuid) -> u64 { + self.get_node_capacity(uuid) + .expect("non-gateway node with zero capacity") + } + + pub(crate) fn expect_get_node_zone(&self, uuid: &Uuid) -> &str { + self.get_node_zone(uuid).expect("node without a zone") + } + + /// Returns the sum of capacities of non gateway nodes in the cluster + fn get_total_capacity(&self) -> u64 { + let mut total_capacity = 0; + for uuid in self.nongateway_nodes() { + total_capacity += self.expect_get_node_capacity(uuid); + } + total_capacity + } + + /// Returns the effective value of the zone_redundancy parameter + pub(crate) fn effective_zone_redundancy(&self) -> usize { + match self.parameters.zone_redundancy { + ZoneRedundancy::AtLeast(v) => v, + ZoneRedundancy::Maximum => { + let n_zones = self + .roles + .items() + .iter() + .filter_map(|(_, _, role)| role.0.as_ref().map(|x| x.zone.as_str())) + .collect::<HashSet<&str>>() + .len(); + std::cmp::min(n_zones, self.replication_factor) + } + } + } + + /// Check a cluster layout for internal consistency + /// (assignment, roles, parameters, partition size) + /// returns true if consistent, false if error + pub fn check(&self) -> Result<(), String> { + // Check that the assignment data has the correct length + let expected_assignment_data_len = (1 << PARTITION_BITS) * self.replication_factor; + if self.ring_assignment_data.len() != expected_assignment_data_len { + return Err(format!( + "ring_assignment_data has incorrect length {} instead of {}", + self.ring_assignment_data.len(), + expected_assignment_data_len + )); + } + + // Check that node_id_vec contains the correct list of nodes + let mut expected_nodes = self + .roles + .items() + .iter() + .filter(|(_, _, v)| v.0.is_some()) + .map(|(id, _, _)| *id) + .collect::<Vec<_>>(); + expected_nodes.sort(); + let mut node_id_vec = self.node_id_vec.clone(); + node_id_vec.sort(); + if expected_nodes != node_id_vec { + return Err(format!("node_id_vec does not contain the correct set of nodes\nnode_id_vec: {:?}\nexpected: {:?}", node_id_vec, expected_nodes)); + } + + // Check that the assigned nodes are correct identifiers + // of nodes that are assigned a role + // and that role is not the role of a gateway nodes + for x in self.ring_assignment_data.iter() { + if *x as usize >= self.node_id_vec.len() { + return Err(format!( + "ring_assignment_data contains invalid node id {}", + *x + )); + } + let node = self.node_id_vec[*x as usize]; + match self.roles.get(&node) { + Some(NodeRoleV(Some(x))) if x.capacity.is_some() => (), + _ => return Err("ring_assignment_data contains id of a gateway node".into()), + } + } + + // Check that every partition is associated to distinct nodes + let zone_redundancy = self.effective_zone_redundancy(); + let rf = self.replication_factor; + for p in 0..(1 << PARTITION_BITS) { + let nodes_of_p = self.ring_assignment_data[rf * p..rf * (p + 1)].to_vec(); + if nodes_of_p.iter().unique().count() != rf { + return Err(format!("partition does not contain {} unique node ids", rf)); + } + // Check that every partition is spread over at least zone_redundancy zones. + let zones_of_p = nodes_of_p + .iter() + .map(|n| self.expect_get_node_zone(&self.node_id_vec[*n as usize])) + .collect::<Vec<_>>(); + if zones_of_p.iter().unique().count() < zone_redundancy { + return Err(format!( + "nodes of partition are in less than {} distinct zones", + zone_redundancy + )); + } + } + + // Check that the nodes capacities is consistent with the stored partitions + let mut node_usage = vec![0; MAX_NODE_NUMBER]; + for n in self.ring_assignment_data.iter() { + node_usage[*n as usize] += 1; + } + for (n, usage) in node_usage.iter().enumerate() { + if *usage > 0 { + let uuid = self.node_id_vec[n]; + let partusage = usage * self.partition_size; + let nodecap = self.expect_get_node_capacity(&uuid); + if partusage > nodecap { + return Err(format!( + "node usage ({}) is bigger than node capacity ({})", + usage * self.partition_size, + nodecap + )); + } + } + } + + // Check that the partition size stored is the one computed by the asignation + // algorithm. + let cl2 = self.clone(); + let (_, zone_to_id) = cl2.generate_nongateway_zone_ids().unwrap(); + match cl2.compute_optimal_partition_size(&zone_to_id, zone_redundancy) { + Ok(s) if s != self.partition_size => { + return Err(format!( + "partition_size ({}) is different than optimal value ({})", + self.partition_size, s + )) + } + Err(e) => return Err(format!("could not calculate optimal partition size: {}", e)), + _ => (), + } + + Ok(()) + } + + // ================== updates to layout, internals =================== + + pub(crate) fn calculate_next_version( + mut self, + staging: &LayoutStaging, + ) -> Result<(Self, Message), Error> { + self.version += 1; + + self.roles.merge(&staging.roles); + self.roles.retain(|(_, _, v)| v.0.is_some()); + self.parameters = *staging.parameters.get(); + + let msg = self.calculate_partition_assignment()?; + + Ok((self, msg)) + } + + /// This function calculates a new partition-to-node assignment. + /// The computed assignment respects the node replication factor + /// and the zone redundancy parameter It maximizes the capacity of a + /// partition (assuming all partitions have the same size). + /// Among such optimal assignment, it minimizes the distance to + /// the former assignment (if any) to minimize the amount of + /// data to be moved. + /// Staged role changes must be merged with nodes roles before calling this function, + /// hence it must only be called from apply_staged_changes() and hence is not public. + fn calculate_partition_assignment(&mut self) -> Result<Message, Error> { + // We update the node ids, since the node role list might have changed with the + // changes in the layout. We retrieve the old_assignment reframed with new ids + let old_assignment_opt = self.update_node_id_vec()?; + + let zone_redundancy = self.effective_zone_redundancy(); + + let mut msg = Message::new(); + msg.push("==== COMPUTATION OF A NEW PARTITION ASSIGNATION ====".into()); + msg.push("".into()); + msg.push(format!( + "Partitions are \ + replicated {} times on at least {} distinct zones.", + self.replication_factor, zone_redundancy + )); + + // We generate for once numerical ids for the zones of non gateway nodes, + // to use them as indices in the flow graphs. + let (id_to_zone, zone_to_id) = self.generate_nongateway_zone_ids()?; + + if self.nongateway_nodes().len() < self.replication_factor { + return Err(Error::Message(format!( + "The number of nodes with positive \ + capacity ({}) is smaller than the replication factor ({}).", + self.nongateway_nodes().len(), + self.replication_factor + ))); + } + if id_to_zone.len() < zone_redundancy { + return Err(Error::Message(format!( + "The number of zones with non-gateway \ + nodes ({}) is smaller than the redundancy parameter ({})", + id_to_zone.len(), + zone_redundancy + ))); + } + + // We compute the optimal partition size + // Capacities should be given in a unit so that partition size is at least 100. + // In this case, integer rounding plays a marginal role in the percentages of + // optimality. + let partition_size = self.compute_optimal_partition_size(&zone_to_id, zone_redundancy)?; + + msg.push("".into()); + if old_assignment_opt.is_some() { + msg.push(format!( + "Optimal partition size: {} ({} in previous layout)", + ByteSize::b(partition_size).to_string_as(false), + ByteSize::b(self.partition_size).to_string_as(false) + )); + } else { + msg.push(format!( + "Optimal partition size: {}", + ByteSize::b(partition_size).to_string_as(false) + )); + } + // We write the partition size. + self.partition_size = partition_size; + + if partition_size < 100 { + msg.push( + "WARNING: The partition size is low (< 100), make sure the capacities of your nodes are correct and are of at least a few MB" + .into(), + ); + } + + // We compute a first flow/assignment that is heuristically close to the previous + // assignment + let mut gflow = + self.compute_candidate_assignment(&zone_to_id, &old_assignment_opt, zone_redundancy)?; + if let Some(assoc) = &old_assignment_opt { + // We minimize the distance to the previous assignment. + self.minimize_rebalance_load(&mut gflow, &zone_to_id, assoc)?; + } + + // We display statistics of the computation + msg.extend(self.output_stat(&gflow, &old_assignment_opt, &zone_to_id, &id_to_zone)?); + + // We update the layout structure + self.update_ring_from_flow(id_to_zone.len(), &gflow)?; + + if let Err(e) = self.check() { + return Err(Error::Message( + format!("Layout check returned an error: {}\nOriginal result of computation: <<<<\n{}\n>>>>", e, msg.join("\n")) + )); + } + + Ok(msg) + } + + /// The LwwMap of node roles might have changed. This function updates the node_id_vec + /// and returns the assignment given by ring, with the new indices of the nodes, and + /// None if the node is not present anymore. + /// We work with the assumption that only this function and calculate_new_assignment + /// do modify assignment_ring and node_id_vec. + fn update_node_id_vec(&mut self) -> Result<Option<Vec<Vec<usize>>>, Error> { + // (1) We compute the new node list + // Non gateway nodes should be coded on 8bits, hence they must be first in the list + // We build the new node ids + let new_non_gateway_nodes: Vec<Uuid> = self + .roles + .items() + .iter() + .filter(|(_, _, v)| matches!(&v.0, Some(r) if r.capacity.is_some())) + .map(|(k, _, _)| *k) + .collect(); + + if new_non_gateway_nodes.len() > MAX_NODE_NUMBER { + return Err(Error::Message(format!( + "There are more than {} non-gateway nodes in the new \ + layout. This is not allowed.", + MAX_NODE_NUMBER + ))); + } + + let new_gateway_nodes: Vec<Uuid> = self + .roles + .items() + .iter() + .filter(|(_, _, v)| matches!(v, NodeRoleV(Some(r)) if r.capacity.is_none())) + .map(|(k, _, _)| *k) + .collect(); + + let old_node_id_vec = std::mem::take(&mut self.node_id_vec); + + self.nongateway_node_count = new_non_gateway_nodes.len(); + self.node_id_vec.clear(); + self.node_id_vec.extend(new_non_gateway_nodes); + self.node_id_vec.extend(new_gateway_nodes); + + let new_node_id_vec = &self.node_id_vec; + + // (2) We retrieve the old association + // We rewrite the old association with the new indices. We only consider partition + // to node assignments where the node is still in use. + if self.ring_assignment_data.is_empty() { + // This is a new association + return Ok(None); + } + + if self.ring_assignment_data.len() != NB_PARTITIONS * self.replication_factor { + return Err(Error::Message( + "The old assignment does not have a size corresponding to \ + the old replication factor or the number of partitions." + .into(), + )); + } + + // We build a translation table between the uuid and new ids + let mut uuid_to_new_id = HashMap::<Uuid, usize>::new(); + + // We add the indices of only the new non-gateway nodes that can be used in the + // association ring + for (i, uuid) in new_node_id_vec.iter().enumerate() { + uuid_to_new_id.insert(*uuid, i); + } + + let mut old_assignment = vec![Vec::<usize>::new(); NB_PARTITIONS]; + let rf = self.replication_factor; + + for (p, old_assign_p) in old_assignment.iter_mut().enumerate() { + for old_id in &self.ring_assignment_data[p * rf..(p + 1) * rf] { + let uuid = old_node_id_vec[*old_id as usize]; + if uuid_to_new_id.contains_key(&uuid) { + old_assign_p.push(uuid_to_new_id[&uuid]); + } + } + } + + // We clear the ring assignemnt data + self.ring_assignment_data = Vec::<CompactNodeType>::new(); + + Ok(Some(old_assignment)) + } + + /// This function generates ids for the zone of the nodes appearing in + /// self.node_id_vec. + pub(crate) fn generate_nongateway_zone_ids( + &self, + ) -> Result<(Vec<String>, HashMap<String, usize>), Error> { + let mut id_to_zone = Vec::<String>::new(); + let mut zone_to_id = HashMap::<String, usize>::new(); + + for uuid in self.nongateway_nodes().iter() { + let r = self.node_role(uuid).unwrap(); + if !zone_to_id.contains_key(&r.zone) && r.capacity.is_some() { + zone_to_id.insert(r.zone.clone(), id_to_zone.len()); + id_to_zone.push(r.zone.clone()); + } + } + Ok((id_to_zone, zone_to_id)) + } + + /// This function computes by dichotomy the largest realizable partition size, given + /// the layout roles and parameters. + fn compute_optimal_partition_size( + &self, + zone_to_id: &HashMap<String, usize>, + zone_redundancy: usize, + ) -> Result<u64, Error> { + let empty_set = HashSet::<(usize, usize)>::new(); + let mut g = self.generate_flow_graph(1, zone_to_id, &empty_set, zone_redundancy)?; + g.compute_maximal_flow()?; + if g.get_flow_value()? < (NB_PARTITIONS * self.replication_factor) as i64 { + return Err(Error::Message( + "The storage capacity of he cluster is to small. It is \ + impossible to store partitions of size 1." + .into(), + )); + } + + let mut s_down = 1; + let mut s_up = self.get_total_capacity(); + while s_down + 1 < s_up { + g = self.generate_flow_graph( + (s_down + s_up) / 2, + zone_to_id, + &empty_set, + zone_redundancy, + )?; + g.compute_maximal_flow()?; + if g.get_flow_value()? < (NB_PARTITIONS * self.replication_factor) as i64 { + s_up = (s_down + s_up) / 2; + } else { + s_down = (s_down + s_up) / 2; + } + } + + Ok(s_down) + } + + fn generate_graph_vertices(nb_zones: usize, nb_nodes: usize) -> Vec<Vertex> { + let mut vertices = vec![Vertex::Source, Vertex::Sink]; + for p in 0..NB_PARTITIONS { + vertices.push(Vertex::Pup(p)); + vertices.push(Vertex::Pdown(p)); + for z in 0..nb_zones { + vertices.push(Vertex::PZ(p, z)); + } + } + for n in 0..nb_nodes { + vertices.push(Vertex::N(n)); + } + vertices + } + + /// Generates the graph to compute the maximal flow corresponding to the optimal + /// partition assignment. + /// exclude_assoc is the set of (partition, node) association that we are forbidden + /// to use (hence we do not add the corresponding edge to the graph). This parameter + /// is used to compute a first flow that uses only edges appearing in the previous + /// assignment. This produces a solution that heuristically should be close to the + /// previous one. + fn generate_flow_graph( + &self, + partition_size: u64, + zone_to_id: &HashMap<String, usize>, + exclude_assoc: &HashSet<(usize, usize)>, + zone_redundancy: usize, + ) -> Result<Graph<FlowEdge>, Error> { + let vertices = + LayoutVersion::generate_graph_vertices(zone_to_id.len(), self.nongateway_nodes().len()); + let mut g = Graph::<FlowEdge>::new(&vertices); + let nb_zones = zone_to_id.len(); + for p in 0..NB_PARTITIONS { + g.add_edge(Vertex::Source, Vertex::Pup(p), zone_redundancy as u64)?; + g.add_edge( + Vertex::Source, + Vertex::Pdown(p), + (self.replication_factor - zone_redundancy) as u64, + )?; + for z in 0..nb_zones { + g.add_edge(Vertex::Pup(p), Vertex::PZ(p, z), 1)?; + g.add_edge( + Vertex::Pdown(p), + Vertex::PZ(p, z), + self.replication_factor as u64, + )?; + } + } + for n in 0..self.nongateway_nodes().len() { + let node_capacity = self.expect_get_node_capacity(&self.node_id_vec[n]); + let node_zone = zone_to_id[self.expect_get_node_zone(&self.node_id_vec[n])]; + g.add_edge(Vertex::N(n), Vertex::Sink, node_capacity / partition_size)?; + for p in 0..NB_PARTITIONS { + if !exclude_assoc.contains(&(p, n)) { + g.add_edge(Vertex::PZ(p, node_zone), Vertex::N(n), 1)?; + } + } + } + Ok(g) + } + + /// This function computes a first optimal assignment (in the form of a flow graph). + fn compute_candidate_assignment( + &self, + zone_to_id: &HashMap<String, usize>, + prev_assign_opt: &Option<Vec<Vec<usize>>>, + zone_redundancy: usize, + ) -> Result<Graph<FlowEdge>, Error> { + // We list the (partition,node) associations that are not used in the + // previous assignment + let mut exclude_edge = HashSet::<(usize, usize)>::new(); + if let Some(prev_assign) = prev_assign_opt { + let nb_nodes = self.nongateway_nodes().len(); + for (p, prev_assign_p) in prev_assign.iter().enumerate() { + for n in 0..nb_nodes { + exclude_edge.insert((p, n)); + } + for n in prev_assign_p.iter() { + exclude_edge.remove(&(p, *n)); + } + } + } + + // We compute the best flow using only the edges used in the previous assignment + let mut g = self.generate_flow_graph( + self.partition_size, + zone_to_id, + &exclude_edge, + zone_redundancy, + )?; + g.compute_maximal_flow()?; + + // We add the excluded edges and compute the maximal flow with the full graph. + // The algorithm is such that it will start with the flow that we just computed + // and find ameliorating paths from that. + for (p, n) in exclude_edge.iter() { + let node_zone = zone_to_id[self.expect_get_node_zone(&self.node_id_vec[*n])]; + g.add_edge(Vertex::PZ(*p, node_zone), Vertex::N(*n), 1)?; + } + g.compute_maximal_flow()?; + Ok(g) + } + + /// This function updates the flow graph gflow to minimize the distance between + /// its corresponding assignment and the previous one + fn minimize_rebalance_load( + &self, + gflow: &mut Graph<FlowEdge>, + zone_to_id: &HashMap<String, usize>, + prev_assign: &[Vec<usize>], + ) -> Result<(), Error> { + // We define a cost function on the edges (pairs of vertices) corresponding + // to the distance between the two assignments. + let mut cost = CostFunction::new(); + for (p, assoc_p) in prev_assign.iter().enumerate() { + for n in assoc_p.iter() { + let node_zone = zone_to_id[self.expect_get_node_zone(&self.node_id_vec[*n])]; + cost.insert((Vertex::PZ(p, node_zone), Vertex::N(*n)), -1); + } + } + + // We compute the maximal length of a simple path in gflow. It is used in the + // Bellman-Ford algorithm in optimize_flow_with_cost to set the number + // of iterations. + let nb_nodes = self.nongateway_nodes().len(); + let path_length = 4 * nb_nodes; + gflow.optimize_flow_with_cost(&cost, path_length)?; + + Ok(()) + } + + /// This function updates the assignment ring from the flow graph. + fn update_ring_from_flow( + &mut self, + nb_zones: usize, + gflow: &Graph<FlowEdge>, + ) -> Result<(), Error> { + self.ring_assignment_data = Vec::<CompactNodeType>::new(); + for p in 0..NB_PARTITIONS { + for z in 0..nb_zones { + let assoc_vertex = gflow.get_positive_flow_from(Vertex::PZ(p, z))?; + for vertex in assoc_vertex.iter() { + if let Vertex::N(n) = vertex { + self.ring_assignment_data.push((*n).try_into().unwrap()); + } + } + } + } + + if self.ring_assignment_data.len() != NB_PARTITIONS * self.replication_factor { + return Err(Error::Message( + "Critical Error : the association ring we produced does not \ + have the right size." + .into(), + )); + } + Ok(()) + } + + /// This function returns a message summing up the partition repartition of the new + /// layout, and other statistics of the partition assignment computation. + fn output_stat( + &self, + gflow: &Graph<FlowEdge>, + prev_assign_opt: &Option<Vec<Vec<usize>>>, + zone_to_id: &HashMap<String, usize>, + id_to_zone: &[String], + ) -> Result<Message, Error> { + let mut msg = Message::new(); + + let used_cap = self.partition_size * NB_PARTITIONS as u64 * self.replication_factor as u64; + let total_cap = self.get_total_capacity(); + let percent_cap = 100.0 * (used_cap as f32) / (total_cap as f32); + msg.push(format!( + "Usable capacity / total cluster capacity: {} / {} ({:.1} %)", + ByteSize::b(used_cap).to_string_as(false), + ByteSize::b(total_cap).to_string_as(false), + percent_cap + )); + msg.push(format!( + "Effective capacity (replication factor {}): {}", + self.replication_factor, + ByteSize::b(used_cap / self.replication_factor as u64).to_string_as(false) + )); + if percent_cap < 80. { + msg.push("".into()); + msg.push( + "If the percentage is too low, it might be that the \ + cluster topology and redundancy constraints are forcing the use of nodes/zones with small \ + storage capacities." + .into(), + ); + msg.push( + "You might want to move storage capacity between zones or relax the redundancy constraint." + .into(), + ); + msg.push( + "See the detailed statistics below and look for saturated nodes/zones.".into(), + ); + } + + // We define and fill in the following tables + let storing_nodes = self.nongateway_nodes(); + let mut new_partitions = vec![0; storing_nodes.len()]; + let mut stored_partitions = vec![0; storing_nodes.len()]; + + let mut new_partitions_zone = vec![0; id_to_zone.len()]; + let mut stored_partitions_zone = vec![0; id_to_zone.len()]; + + for p in 0..NB_PARTITIONS { + for z in 0..id_to_zone.len() { + let pz_nodes = gflow.get_positive_flow_from(Vertex::PZ(p, z))?; + if !pz_nodes.is_empty() { + stored_partitions_zone[z] += 1; + if let Some(prev_assign) = prev_assign_opt { + let mut old_zones_of_p = Vec::<usize>::new(); + for n in prev_assign[p].iter() { + old_zones_of_p + .push(zone_to_id[self.expect_get_node_zone(&self.node_id_vec[*n])]); + } + if !old_zones_of_p.contains(&z) { + new_partitions_zone[z] += 1; + } + } + } + for vert in pz_nodes.iter() { + if let Vertex::N(n) = *vert { + stored_partitions[n] += 1; + if let Some(prev_assign) = prev_assign_opt { + if !prev_assign[p].contains(&n) { + new_partitions[n] += 1; + } + } + } + } + } + } + + if prev_assign_opt.is_none() { + new_partitions = stored_partitions.clone(); + //new_partitions_zone = stored_partitions_zone.clone(); + } + + // We display the statistics + + msg.push("".into()); + if prev_assign_opt.is_some() { + let total_new_partitions: usize = new_partitions.iter().sum(); + msg.push(format!( + "A total of {} new copies of partitions need to be \ + transferred.", + total_new_partitions + )); + msg.push("".into()); + } + + let mut table = vec![]; + for z in 0..id_to_zone.len() { + let mut nodes_of_z = Vec::<usize>::new(); + for n in 0..storing_nodes.len() { + if self.expect_get_node_zone(&self.node_id_vec[n]) == id_to_zone[z] { + nodes_of_z.push(n); + } + } + let replicated_partitions: usize = + nodes_of_z.iter().map(|n| stored_partitions[*n]).sum(); + table.push(format!( + "{}\tTags\tPartitions\tCapacity\tUsable capacity", + id_to_zone[z] + )); + + let available_cap_z: u64 = self.partition_size * replicated_partitions as u64; + let mut total_cap_z = 0; + for n in nodes_of_z.iter() { + total_cap_z += self.expect_get_node_capacity(&self.node_id_vec[*n]); + } + let percent_cap_z = 100.0 * (available_cap_z as f32) / (total_cap_z as f32); + + for n in nodes_of_z.iter() { + let available_cap_n = stored_partitions[*n] as u64 * self.partition_size; + let total_cap_n = self.expect_get_node_capacity(&self.node_id_vec[*n]); + let tags_n = (self.node_role(&self.node_id_vec[*n]).ok_or("<??>"))?.tags_string(); + table.push(format!( + " {:?}\t{}\t{} ({} new)\t{}\t{} ({:.1}%)", + self.node_id_vec[*n], + tags_n, + stored_partitions[*n], + new_partitions[*n], + ByteSize::b(total_cap_n).to_string_as(false), + ByteSize::b(available_cap_n).to_string_as(false), + (available_cap_n as f32) / (total_cap_n as f32) * 100.0, + )); + } + + table.push(format!( + " TOTAL\t\t{} ({} unique)\t{}\t{} ({:.1}%)", + replicated_partitions, + stored_partitions_zone[z], + //new_partitions_zone[z], + ByteSize::b(total_cap_z).to_string_as(false), + ByteSize::b(available_cap_z).to_string_as(false), + percent_cap_z + )); + table.push("".into()); + } + msg.push(format_table::format_table_to_string(table)); + + Ok(msg) + } +} |