author    Alex Auvolat <alex@adnab.me>  2023-11-09 11:19:43 +0100
committer Alex Auvolat <alex@adnab.me>  2023-11-09 11:19:43 +0100
commit    523d2ecb9511f74e144cd116b942d6c1bf0f546d (patch)
tree      7ba0323fb691eac4f05308676cd24771a8a6a8bb /src/rpc/layout
parent    1da0a5676edcd20fc5c7412596edb5772da9f606 (diff)
layout: use separate CRDT for staged layout changes
Diffstat (limited to 'src/rpc/layout')
-rw-r--r--  src/rpc/layout/graph_algo.rs  405
-rw-r--r--  src/rpc/layout/history.rs      82
-rw-r--r--  src/rpc/layout/mod.rs           4
-rw-r--r--  src/rpc/layout/schema.rs      106
-rw-r--r--  src/rpc/layout/tracker.rs      21
-rw-r--r--  src/rpc/layout/version.rs      54
6 files changed, 535 insertions, 137 deletions
diff --git a/src/rpc/layout/graph_algo.rs b/src/rpc/layout/graph_algo.rs
new file mode 100644
index 00000000..bd33e97f
--- /dev/null
+++ b/src/rpc/layout/graph_algo.rs
@@ -0,0 +1,405 @@
+//! This module deals with graph algorithms.
+//! It is used in layout.rs to build the partition-to-node assignment.
+
+use rand::prelude::{SeedableRng, SliceRandom};
+use std::cmp::{max, min};
+use std::collections::HashMap;
+use std::collections::VecDeque;
+
+/// Vertex data structures used in all the graphs used in layout.rs.
+/// usize parameters correspond to node/zone/partition ids.
+/// To understand the vertex roles below, please refer to the formal description
+/// of the layout computation algorithm.
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
+pub enum Vertex {
+ Source,
+ Pup(usize), // The vertex p+ of partition p
+ Pdown(usize), // The vertex p- of partition p
+ PZ(usize, usize), // The vertex corresponding to x_(partition p, zone z)
+ N(usize), // The vertex corresponding to node n
+ Sink,
+}
+
+/// Edge data structure for the flow algorithm.
+#[derive(Clone, Copy, Debug)]
+pub struct FlowEdge {
+ cap: u64, // flow maximal capacity of the edge
+ flow: i64, // flow value on the edge
+ dest: usize, // destination vertex id
+ rev: usize, // index of the reversed edge (v, self) in the edge list of vertex v
+}
+
+/// Edge data structure for the detection of negative cycles.
+#[derive(Clone, Copy, Debug)]
+pub struct WeightedEdge {
+ w: i64, // weight of the edge
+ dest: usize,
+}
+
+pub trait Edge: Clone + Copy {}
+impl Edge for FlowEdge {}
+impl Edge for WeightedEdge {}
+
+/// Struct for the graph structure. We use encapsulation here so that we can both
+/// provide a user-friendly Vertex enum to address vertices, and internally use usize
+/// indices and Vec instead of HashMap in the graph algorithms to optimize execution speed.
+pub struct Graph<E: Edge> {
+ vertex_to_id: HashMap<Vertex, usize>,
+ id_to_vertex: Vec<Vertex>,
+
+ // The graph is stored as an adjacency list
+ graph: Vec<Vec<E>>,
+}
+
+pub type CostFunction = HashMap<(Vertex, Vertex), i64>;
+
+impl<E: Edge> Graph<E> {
+ pub fn new(vertices: &[Vertex]) -> Self {
+ let mut map = HashMap::<Vertex, usize>::new();
+ for (i, vert) in vertices.iter().enumerate() {
+ map.insert(*vert, i);
+ }
+ Graph::<E> {
+ vertex_to_id: map,
+ id_to_vertex: vertices.to_vec(),
+ graph: vec![Vec::<E>::new(); vertices.len()],
+ }
+ }
+
+ fn get_vertex_id(&self, v: &Vertex) -> Result<usize, String> {
+ self.vertex_to_id
+ .get(v)
+ .cloned()
+ .ok_or_else(|| format!("The graph does not contain vertex {:?}", v))
+ }
+}
+
+impl Graph<FlowEdge> {
+ /// This function adds a directed edge to the graph with capacity c, and the
+ /// corresponding reversed edge with capacity 0.
+ pub fn add_edge(&mut self, u: Vertex, v: Vertex, c: u64) -> Result<(), String> {
+ let idu = self.get_vertex_id(&u)?;
+ let idv = self.get_vertex_id(&v)?;
+ if idu == idv {
+ return Err("Cannot add edge from vertex to itself in flow graph".into());
+ }
+
+ let rev_u = self.graph[idu].len();
+ let rev_v = self.graph[idv].len();
+ self.graph[idu].push(FlowEdge {
+ cap: c,
+ dest: idv,
+ flow: 0,
+ rev: rev_v,
+ });
+ self.graph[idv].push(FlowEdge {
+ cap: 0,
+ dest: idu,
+ flow: 0,
+ rev: rev_u,
+ });
+ Ok(())
+ }
+
+ /// This function returns the list of vertices that receive a positive flow from
+ /// vertex v.
+ pub fn get_positive_flow_from(&self, v: Vertex) -> Result<Vec<Vertex>, String> {
+ let idv = self.get_vertex_id(&v)?;
+ let mut result = Vec::<Vertex>::new();
+ for edge in self.graph[idv].iter() {
+ if edge.flow > 0 {
+ result.push(self.id_to_vertex[edge.dest]);
+ }
+ }
+ Ok(result)
+ }
+
+ /// This function returns the value of the flow outgoing from v.
+ pub fn get_outflow(&self, v: Vertex) -> Result<i64, String> {
+ let idv = self.get_vertex_id(&v)?;
+ let mut result = 0;
+ for edge in self.graph[idv].iter() {
+ result += max(0, edge.flow);
+ }
+ Ok(result)
+ }
+
+ /// This function computes the total flow value by computing the outgoing flow
+ /// from the source.
+ pub fn get_flow_value(&mut self) -> Result<i64, String> {
+ self.get_outflow(Vertex::Source)
+ }
+
+ /// This function shuffles the order of the edge lists. It keeps the ids of the
+ /// reversed edges consistent.
+ fn shuffle_edges(&mut self) {
+ // We use deterministic randomness so that the layout calculation algorithm
+ // will output the same thing every time it is run. This way, the results
+ // pre-calculated in `garage layout show` will match exactly those used
+ // in practice with `garage layout apply`
+ let mut rng = rand::rngs::StdRng::from_seed([0x12u8; 32]);
+ for i in 0..self.graph.len() {
+ self.graph[i].shuffle(&mut rng);
+ // We need to update the ids of the reverse edges.
+ for j in 0..self.graph[i].len() {
+ let target_v = self.graph[i][j].dest;
+ let target_rev = self.graph[i][j].rev;
+ self.graph[target_v][target_rev].rev = j;
+ }
+ }
+ }
+
+ /// Computes an upper bound of the flow on the graph
+ pub fn flow_upper_bound(&self) -> Result<u64, String> {
+ let idsource = self.get_vertex_id(&Vertex::Source)?;
+ let mut flow_upper_bound = 0;
+ for edge in self.graph[idsource].iter() {
+ flow_upper_bound += edge.cap;
+ }
+ Ok(flow_upper_bound)
+ }
+
+ /// This function computes the maximal flow using Dinic's algorithm. It starts with
+ /// the flow values already present in the graph, so it is possible to add some edges
+ /// to the graph, compute a flow, add other edges, and update the flow.
+ pub fn compute_maximal_flow(&mut self) -> Result<(), String> {
+ let idsource = self.get_vertex_id(&Vertex::Source)?;
+ let idsink = self.get_vertex_id(&Vertex::Sink)?;
+
+ let nb_vertices = self.graph.len();
+
+ let flow_upper_bound = self.flow_upper_bound()?;
+
+ // To ensure the dispersion of the associations generated by the
+ // assignment, we shuffle the neighbours of the nodes, so that
+ // vertices do not all consider their neighbours in the same order.
+ self.shuffle_edges();
+
+ // We run Dinic's max flow algorithm
+ loop {
+ // We build the level array from Dinic's algorithm.
+ let mut level = vec![None; nb_vertices];
+
+ let mut fifo = VecDeque::new();
+ fifo.push_back((idsource, 0));
+ while let Some((id, lvl)) = fifo.pop_front() {
+ if level[id].is_none() {
+ // it means id has not yet been reached
+ level[id] = Some(lvl);
+ for edge in self.graph[id].iter() {
+ if edge.cap as i64 - edge.flow > 0 {
+ fifo.push_back((edge.dest, lvl + 1));
+ }
+ }
+ }
+ }
+ if level[idsink].is_none() {
+ // The sink is unreachable in the residual graph: no more augmenting paths.
+ break;
+ }
+ // Now we run DFS respecting the level array
+ let mut next_nbd = vec![0; nb_vertices];
+ let mut lifo = Vec::new();
+
+ lifo.push((idsource, flow_upper_bound));
+
+ while let Some((id, f)) = lifo.last().cloned() {
+ if id == idsink {
+ // The DFS reached the sink, we can add a
+ // residual flow.
+ lifo.pop();
+ while let Some((id, _)) = lifo.pop() {
+ let nbd = next_nbd[id];
+ self.graph[id][nbd].flow += f as i64;
+ let id_rev = self.graph[id][nbd].dest;
+ let nbd_rev = self.graph[id][nbd].rev;
+ self.graph[id_rev][nbd_rev].flow -= f as i64;
+ }
+ lifo.push((idsource, flow_upper_bound));
+ continue;
+ }
+ // else we did not reach the sink
+ let nbd = next_nbd[id];
+ if nbd >= self.graph[id].len() {
+ // There is nothing to explore from id anymore
+ lifo.pop();
+ if let Some((parent, _)) = lifo.last() {
+ next_nbd[*parent] += 1;
+ }
+ continue;
+ }
+ // else we can try to send flow from id to its nbd
+ let new_flow = min(
+ f as i64,
+ self.graph[id][nbd].cap as i64 - self.graph[id][nbd].flow,
+ ) as u64;
+ if new_flow == 0 {
+ next_nbd[id] += 1;
+ continue;
+ }
+ if let (Some(lvldest), Some(lvlid)) = (level[self.graph[id][nbd].dest], level[id]) {
+ if lvldest <= lvlid {
+ // We cannot send flow to nbd.
+ next_nbd[id] += 1;
+ continue;
+ }
+ }
+ // otherwise, we send flow to nbd.
+ lifo.push((self.graph[id][nbd].dest, new_flow));
+ }
+ }
+ Ok(())
+ }
+
+ /// This function takes a flow, and a cost function on the edges, and tries to find an
+ /// equivalent flow with a better cost, by finding improving overflow cycles. It uses
+ /// the Bellman-Ford algorithm as a subroutine, run for up to path_length iterations.
+ /// We assume that the cost of edge (u,v) is the opposite of the cost of (v,u), and
+ /// only one needs to be present in the cost function.
+ pub fn optimize_flow_with_cost(
+ &mut self,
+ cost: &CostFunction,
+ path_length: usize,
+ ) -> Result<(), String> {
+ // We build the weighted graph gf in which we will look for negative cycles
+ let mut gf = self.build_cost_graph(cost)?;
+ let mut cycles = gf.list_negative_cycles(path_length);
+ while !cycles.is_empty() {
+ // we enumerate negative cycles
+ for c in cycles.iter() {
+ for i in 0..c.len() {
+ // We add one flow unit to the edge (u,v) of cycle c
+ let idu = self.vertex_to_id[&c[i]];
+ let idv = self.vertex_to_id[&c[(i + 1) % c.len()]];
+ for j in 0..self.graph[idu].len() {
+ // since idu appears at most once in the cycles, we enumerate every
+ // edge at most once.
+ let edge = self.graph[idu][j];
+ if edge.dest == idv {
+ self.graph[idu][j].flow += 1;
+ self.graph[idv][edge.rev].flow -= 1;
+ break;
+ }
+ }
+ }
+ }
+
+ gf = self.build_cost_graph(cost)?;
+ cycles = gf.list_negative_cycles(path_length);
+ }
+ Ok(())
+ }
+
+ /// Construct the weighted graph G_f from the flow and the cost function
+ fn build_cost_graph(&self, cost: &CostFunction) -> Result<Graph<WeightedEdge>, String> {
+ let mut g = Graph::<WeightedEdge>::new(&self.id_to_vertex);
+ let nb_vertices = self.id_to_vertex.len();
+ for i in 0..nb_vertices {
+ for edge in self.graph[i].iter() {
+ if edge.cap as i64 - edge.flow > 0 {
+ // It is possible to send overflow through this edge
+ let u = self.id_to_vertex[i];
+ let v = self.id_to_vertex[edge.dest];
+ if cost.contains_key(&(u, v)) {
+ g.add_edge(u, v, cost[&(u, v)])?;
+ } else if cost.contains_key(&(v, u)) {
+ g.add_edge(u, v, -cost[&(v, u)])?;
+ } else {
+ g.add_edge(u, v, 0)?;
+ }
+ }
+ }
+ }
+ Ok(g)
+ }
+}
+
+impl Graph<WeightedEdge> {
+ /// This function adds a single directed weighted edge to the graph.
+ pub fn add_edge(&mut self, u: Vertex, v: Vertex, w: i64) -> Result<(), String> {
+ let idu = self.get_vertex_id(&u)?;
+ let idv = self.get_vertex_id(&v)?;
+ self.graph[idu].push(WeightedEdge { w, dest: idv });
+ Ok(())
+ }
+
+ /// This function lists the negative cycles it manages to find after path_length
+ /// iterations of the main loop of the Bellman-Ford algorithm. For the classical
+ /// algorithm, path_length needs to be equal to the number of vertices. However,
+ /// for particular graph structures like ours, the algorithm is still correct
+ /// when path_length is the length of the longest possible simple path.
+ /// See the formal description of the algorithm for more details.
+ fn list_negative_cycles(&self, path_length: usize) -> Vec<Vec<Vertex>> {
+ let nb_vertices = self.graph.len();
+
+ // We start with every vertex at distance 0 from some imaginary extra vertex -1.
+ let mut distance = vec![0; nb_vertices];
+ // The prev vector records, for every vertex, where its shortest path comes from.
+ let mut prev = vec![None; nb_vertices];
+
+ for _ in 0..path_length + 1 {
+ for id in 0..nb_vertices {
+ for e in self.graph[id].iter() {
+ if distance[id] + e.w < distance[e.dest] {
+ distance[e.dest] = distance[id] + e.w;
+ prev[e.dest] = Some(id);
+ }
+ }
+ }
+ }
+
+ // If self.graph contains a negative cycle, then at this point the graph described
+ // by prev (which is a directed 1-forest/functional graph)
+ // must contain a cycle. We list the cycles of prev.
+ let cycles_prev = cycles_of_1_forest(&prev);
+
+ // Note that the cycles in prev are in reverse order compared to the cycles
+ // in the graph, hence the .rev().
+ return cycles_prev
+ .iter()
+ .map(|cycle| {
+ cycle
+ .iter()
+ .rev()
+ .map(|id| self.id_to_vertex[*id])
+ .collect()
+ })
+ .collect();
+ }
+}
+
+/// This function returns the list of cycles of a directed 1-forest. It does not
+/// check for the consistency of the input.
+fn cycles_of_1_forest(forest: &[Option<usize>]) -> Vec<Vec<usize>> {
+ let mut cycles = Vec::<Vec<usize>>::new();
+ let mut time_of_discovery = vec![None; forest.len()];
+
+ for t in 0..forest.len() {
+ let mut id = t;
+ // while we are on a valid undiscovered node
+ while time_of_discovery[id].is_none() {
+ time_of_discovery[id] = Some(t);
+ if let Some(i) = forest[id] {
+ id = i;
+ } else {
+ break;
+ }
+ }
+ if forest[id].is_some() && time_of_discovery[id] == Some(t) {
+ // We discovered an id that we explored at this iteration t.
+ // It means we are on a cycle
+ let mut cy = vec![id];
+ let mut id2 = id;
+ while let Some(id_next) = forest[id2] {
+ id2 = id_next;
+ if id2 != id {
+ cy.push(id2);
+ } else {
+ break;
+ }
+ }
+ cycles.push(cy);
+ }
+ }
+ cycles
+}
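
For readers unfamiliar with this API, here is a minimal sketch of how caller code might drive the flow graph (illustrative only, not part of the patch; since the module is private, such code would have to live inside src/rpc/layout, where version.rs imports it as super::graph_algo):

    use super::graph_algo::{FlowEdge, Graph, Vertex};

    fn tiny_max_flow() -> Result<(), String> {
        // Vertices are declared up front; internal usize ids follow insertion order.
        let vertices = vec![Vertex::Source, Vertex::N(0), Vertex::N(1), Vertex::Sink];
        let mut g = Graph::<FlowEdge>::new(&vertices);

        // Two disjoint unit-capacity paths from Source to Sink.
        g.add_edge(Vertex::Source, Vertex::N(0), 1)?;
        g.add_edge(Vertex::Source, Vertex::N(1), 1)?;
        g.add_edge(Vertex::N(0), Vertex::Sink, 1)?;
        g.add_edge(Vertex::N(1), Vertex::Sink, 1)?;

        g.compute_maximal_flow()?;
        assert_eq!(g.get_flow_value()?, 2); // both unit paths are saturated
        assert_eq!(g.get_positive_flow_from(Vertex::Source)?.len(), 2);
        Ok(())
    }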
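The cost-optimization pass can be pictured on a three-edge cycle of total weight -1. A sketch for intuition, assumed to run inside a function returning Result<(), String>; list_negative_cycles is private, so the final call is shown as a comment:

    let vertices = vec![Vertex::N(0), Vertex::N(1), Vertex::N(2)];
    let mut g = Graph::<WeightedEdge>::new(&vertices);
    g.add_edge(Vertex::N(0), Vertex::N(1), 1)?;  // weight  1
    g.add_edge(Vertex::N(1), Vertex::N(2), -2)?; // weight -2
    g.add_edge(Vertex::N(2), Vertex::N(0), 0)?;  // weight  0
    // Total cycle weight is 1 - 2 + 0 = -1 < 0, so the Bellman-Ford relaxations
    // make the prev pointers close a loop, and g.list_negative_cycles(3) would
    // report this cycle (up to rotation).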
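Finally, cycles_of_1_forest can be checked on a tiny input; the following assertion holds for the implementation above (it would have to live in a unit test inside the module, since the function is private):

    // 0 -> 1 -> 2 -> 0 is a cycle, 3 points into it, and 4 is a root (None).
    let forest = [Some(1), Some(2), Some(0), Some(1), None];
    assert_eq!(cycles_of_1_forest(&forest), vec![vec![0, 1, 2]]);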
diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs
index e59c9e9c..9ae28887 100644
--- a/src/rpc/layout/history.rs
+++ b/src/rpc/layout/history.rs
@@ -1,5 +1,3 @@
-use std::cmp::Ordering;
-
use garage_util::crdt::{Crdt, Lww, LwwMap};
use garage_util::data::*;
use garage_util::encode::nonversioned_encode;
@@ -12,14 +10,15 @@ impl LayoutHistory {
pub fn new(replication_factor: usize) -> Self {
let version = LayoutVersion::new(replication_factor);
- let staging_parameters = Lww::<LayoutParameters>::new(version.parameters);
- let empty_lwwmap = LwwMap::new();
+ let staging = LayoutStaging {
+ parameters: Lww::<LayoutParameters>::new(version.parameters),
+ roles: LwwMap::new(),
+ };
let mut ret = LayoutHistory {
versions: vec![version].into_boxed_slice().into(),
update_trackers: Default::default(),
- staging_parameters,
- staging_roles: empty_lwwmap,
+ staging: Lww::raw(0, staging),
staging_hash: [0u8; 32].into(),
};
ret.staging_hash = ret.calculate_staging_hash();
@@ -31,8 +30,7 @@ impl LayoutHistory {
}
pub(crate) fn calculate_staging_hash(&self) -> Hash {
- let hashed_tuple = (&self.staging_roles, &self.staging_parameters);
- blake2sum(&nonversioned_encode(&hashed_tuple).unwrap()[..])
+ blake2sum(&nonversioned_encode(&self.staging).unwrap()[..])
}
// ================== updates to layout, public interface ===================
@@ -41,26 +39,10 @@ impl LayoutHistory {
let mut changed = false;
// Merge staged layout changes
- match other.current().version.cmp(&self.current().version) {
- Ordering::Greater => {
- self.staging_parameters = other.staging_parameters.clone();
- self.staging_roles = other.staging_roles.clone();
- self.staging_hash = other.staging_hash;
- changed = true;
- }
- Ordering::Equal => {
- self.staging_parameters.merge(&other.staging_parameters);
- self.staging_roles.merge(&other.staging_roles);
-
- let new_staging_hash = self.calculate_staging_hash();
- if new_staging_hash != self.staging_hash {
- changed = true;
- }
-
- self.staging_hash = new_staging_hash;
- }
- Ordering::Less => (),
+ if self.staging != other.staging {
+ changed = true;
}
+ self.staging.merge(&other.staging);
// Add any new versions to history
for v2 in other.versions.iter() {
@@ -102,50 +84,34 @@ To know the correct value of the new layout version, invoke `garage layout show`
}
}
+ // Compute new version and add it to history
let mut new_version = self.current().clone();
new_version.version += 1;
- new_version.roles.merge(&self.staging_roles);
+ new_version.roles.merge(&self.staging.get().roles);
new_version.roles.retain(|(_, _, v)| v.0.is_some());
- new_version.parameters = *self.staging_parameters.get();
-
- self.staging_roles.clear();
- self.staging_hash = self.calculate_staging_hash();
+ new_version.parameters = *self.staging.get().parameters.get();
let msg = new_version.calculate_partition_assignment()?;
-
self.versions.push(new_version);
+ // Reset the staged layout changes
+ self.staging.update(LayoutStaging {
+ parameters: self.staging.get().parameters.clone(),
+ roles: LwwMap::new(),
+ });
+ self.staging_hash = self.calculate_staging_hash();
+
Ok((self, msg))
}
- pub fn revert_staged_changes(mut self, version: Option<u64>) -> Result<Self, Error> {
- match version {
- None => {
- let error = r#"
-Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout.
-To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes.
- "#;
- return Err(Error::Message(error.into()));
- }
- Some(v) => {
- if v != self.current().version + 1 {
- return Err(Error::Message("Invalid new layout version".into()));
- }
- }
- }
-
- self.staging_roles.clear();
- self.staging_parameters.update(self.current().parameters);
+ pub fn revert_staged_changes(mut self) -> Result<Self, Error> {
+ self.staging.update(LayoutStaging {
+ parameters: Lww::new(self.current().parameters.clone()),
+ roles: LwwMap::new(),
+ });
self.staging_hash = self.calculate_staging_hash();
- // TODO this is stupid, we should have a separate version counter/LWW
- // for the staging params
- let mut new_version = self.current().clone();
- new_version.version += 1;
-
- self.versions.push(new_version);
-
Ok(self)
}
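
The simplification in merge() above relies on the staged changes forming a single CRDT wrapped in an Lww. A minimal sketch of the resulting logic, as a hypothetical standalone helper (not part of the patch) using the types this commit introduces:

    use garage_util::crdt::{Crdt, Lww};

    /// Merge `theirs` into `mine`; returns true if anything changed.
    /// The whole staging struct is compared and merged at once, which
    /// replaces the old per-field match on layout version numbers.
    fn merge_staging(mine: &mut Lww<LayoutStaging>, theirs: &Lww<LayoutStaging>) -> bool {
        let changed = *mine != *theirs;
        mine.merge(theirs);
        changed
    }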
diff --git a/src/rpc/layout/mod.rs b/src/rpc/layout/mod.rs
index 122d4b65..7c15988a 100644
--- a/src/rpc/layout/mod.rs
+++ b/src/rpc/layout/mod.rs
@@ -1,8 +1,10 @@
+mod graph_algo;
mod history;
mod schema;
-mod tracker;
mod version;
+// ---- re-exports ----
+
pub use history::*;
pub use schema::*;
pub use version::*;
diff --git a/src/rpc/layout/schema.rs b/src/rpc/layout/schema.rs
index 14e797be..c5b9b1d3 100644
--- a/src/rpc/layout/schema.rs
+++ b/src/rpc/layout/schema.rs
@@ -1,3 +1,9 @@
+use std::fmt;
+
+use bytesize::ByteSize;
+
+use garage_util::crdt::{AutoCrdt, Crdt};
+
mod v08 {
use crate::layout::CompactNodeType;
use garage_util::crdt::LwwMap;
@@ -210,6 +216,15 @@ mod v010 {
pub ring_assignment_data: Vec<CompactNodeType>,
}
+ /// The staged changes for the next layout version
+ #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
+ pub struct LayoutStaging {
+ /// Parameters to be used in the next partition assignment computation.
+ pub parameters: Lww<LayoutParameters>,
+ /// Role changes which are staged for the next version of the layout
+ pub roles: LwwMap<Uuid, NodeRoleV>,
+ }
+
/// The history of cluster layouts
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LayoutHistory {
@@ -219,10 +234,8 @@ mod v010 {
/// Update trackers
pub update_trackers: UpdateTrackers,
- /// Parameters to be used in the next partition assignment computation.
- pub staging_parameters: Lww<LayoutParameters>,
- /// Role changes which are staged for the next version of the layout
- pub staging_roles: LwwMap<Uuid, NodeRoleV>,
+ /// Staged changes for the next version
+ pub staging: Lww<LayoutStaging>,
/// Hash of the serialized staging_parameters + staging_roles
pub staging_hash: Hash,
}
@@ -265,6 +278,10 @@ mod v010 {
.map(|x| (*x, version.version))
.collect::<HashMap<Uuid, u64>>(),
);
+ let staging = LayoutStaging {
+ parameters: previous.staging_parameters,
+ roles: previous.staging_roles,
+ };
let mut ret = Self {
versions: vec![version],
update_trackers: UpdateTrackers {
@@ -272,8 +289,7 @@ mod v010 {
sync_map: update_tracker.clone(),
sync_ack_map: update_tracker.clone(),
},
- staging_parameters: previous.staging_parameters,
- staging_roles: previous.staging_roles,
+ staging: Lww::raw(previous.version, staging),
staging_hash: [0u8; 32].into(),
};
ret.staging_hash = ret.calculate_staging_hash();
@@ -283,3 +299,81 @@ mod v010 {
}
pub use v010::*;
+
+// ---- utility functions ----
+
+impl AutoCrdt for LayoutParameters {
+ const WARN_IF_DIFFERENT: bool = true;
+}
+
+impl AutoCrdt for NodeRoleV {
+ const WARN_IF_DIFFERENT: bool = true;
+}
+
+impl Crdt for LayoutStaging {
+ fn merge(&mut self, other: &LayoutStaging) {
+ self.parameters.merge(&other.parameters);
+ self.roles.merge(&other.roles);
+ }
+}
+
+impl NodeRole {
+ pub fn capacity_string(&self) -> String {
+ match self.capacity {
+ Some(c) => ByteSize::b(c).to_string_as(false),
+ None => "gateway".to_string(),
+ }
+ }
+
+ pub fn tags_string(&self) -> String {
+ self.tags.join(",")
+ }
+}
+
+impl fmt::Display for ZoneRedundancy {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match self {
+ ZoneRedundancy::Maximum => write!(f, "maximum"),
+ ZoneRedundancy::AtLeast(x) => write!(f, "{}", x),
+ }
+ }
+}
+
+impl core::str::FromStr for ZoneRedundancy {
+ type Err = &'static str;
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ match s {
+ "none" | "max" | "maximum" => Ok(ZoneRedundancy::Maximum),
+ x => {
+ let v = x
+ .parse::<usize>()
+ .map_err(|_| "zone redundancy must be 'none'/'max' or an integer")?;
+ Ok(ZoneRedundancy::AtLeast(v))
+ }
+ }
+ }
+}
+
+impl UpdateTracker {
+ fn merge(&mut self, other: &UpdateTracker) {
+ for (k, v) in other.0.iter() {
+ if let Some(v_mut) = self.0.get_mut(k) {
+ *v_mut = std::cmp::max(*v_mut, *v);
+ } else {
+ self.0.insert(*k, *v);
+ }
+ }
+ }
+
+ pub(crate) fn min(&self) -> u64 {
+ self.0.iter().map(|(_, v)| *v).min().unwrap_or(0)
+ }
+}
+
+impl UpdateTrackers {
+ pub(crate) fn merge(&mut self, other: &UpdateTrackers) {
+ self.ack_map.merge(&other.ack_map);
+ self.sync_map.merge(&other.sync_map);
+ self.sync_ack_map.merge(&other.sync_ack_map);
+ }
+}
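
UpdateTracker::merge above takes a pointwise maximum, so per-node counters only ever advance; a worked example (node names are illustrative):

    // mine:   {node_a: 3, node_b: 5}
    // theirs: {node_a: 4, node_c: 1}
    // after mine.merge(&theirs): {node_a: 4, node_b: 5, node_c: 1}
    // mine.min() == 1 (the smallest counter across all tracked nodes)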
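Further up, the FromStr implementation for ZoneRedundancy accepts either a keyword or an integer; a quick sketch of the expected behaviour:

    use core::str::FromStr;

    assert!(matches!(ZoneRedundancy::from_str("maximum"), Ok(ZoneRedundancy::Maximum)));
    assert!(matches!(ZoneRedundancy::from_str("3"), Ok(ZoneRedundancy::AtLeast(3))));
    assert!(ZoneRedundancy::from_str("three").is_err());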
diff --git a/src/rpc/layout/tracker.rs b/src/rpc/layout/tracker.rs
deleted file mode 100644
index 778121e4..00000000
--- a/src/rpc/layout/tracker.rs
+++ /dev/null
@@ -1,21 +0,0 @@
-use super::*;
-
-impl UpdateTracker {
- fn merge(&mut self, other: &UpdateTracker) {
- for (k, v) in other.0.iter() {
- if let Some(v_mut) = self.0.get_mut(k) {
- *v_mut = std::cmp::max(*v_mut, *v);
- } else {
- self.0.insert(*k, *v);
- }
- }
- }
-}
-
-impl UpdateTrackers {
- pub(crate) fn merge(&mut self, other: &UpdateTrackers) {
- self.ack_map.merge(&other.ack_map);
- self.sync_map.merge(&other.sync_map);
- self.sync_ack_map.merge(&other.sync_ack_map);
- }
-}
diff --git a/src/rpc/layout/version.rs b/src/rpc/layout/version.rs
index 363bc204..6918fdf9 100644
--- a/src/rpc/layout/version.rs
+++ b/src/rpc/layout/version.rs
@@ -1,69 +1,21 @@
use std::collections::HashMap;
use std::collections::HashSet;
-use std::fmt;
+use std::convert::TryInto;
use bytesize::ByteSize;
use itertools::Itertools;
-use garage_util::crdt::{AutoCrdt, LwwMap};
+use garage_util::crdt::LwwMap;
use garage_util::data::*;
use garage_util::error::*;
-use crate::graph_algo::*;
-
-use std::convert::TryInto;
-
+use super::graph_algo::*;
use super::schema::*;
use super::*;
// The Message type will be used to collect information on the algorithm.
pub type Message = Vec<String>;
-impl AutoCrdt for LayoutParameters {
- const WARN_IF_DIFFERENT: bool = true;
-}
-
-impl AutoCrdt for NodeRoleV {
- const WARN_IF_DIFFERENT: bool = true;
-}
-
-impl NodeRole {
- pub fn capacity_string(&self) -> String {
- match self.capacity {
- Some(c) => ByteSize::b(c).to_string_as(false),
- None => "gateway".to_string(),
- }
- }
-
- pub fn tags_string(&self) -> String {
- self.tags.join(",")
- }
-}
-
-impl fmt::Display for ZoneRedundancy {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- match self {
- ZoneRedundancy::Maximum => write!(f, "maximum"),
- ZoneRedundancy::AtLeast(x) => write!(f, "{}", x),
- }
- }
-}
-
-impl core::str::FromStr for ZoneRedundancy {
- type Err = &'static str;
- fn from_str(s: &str) -> Result<Self, Self::Err> {
- match s {
- "none" | "max" | "maximum" => Ok(ZoneRedundancy::Maximum),
- x => {
- let v = x
- .parse::<usize>()
- .map_err(|_| "zone redundancy must be 'none'/'max' or an integer")?;
- Ok(ZoneRedundancy::AtLeast(v))
- }
- }
- }
-}
-
impl LayoutVersion {
pub fn new(replication_factor: usize) -> Self {
// We set the default zone redundancy to be Maximum, meaning that the maximum