aboutsummaryrefslogtreecommitdiff
path: root/src/rpc
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2023-11-08 13:11:13 +0100
committerAlex Auvolat <alex@adnab.me>2023-11-08 13:13:04 +0100
commit0962313ebd45abb116d6ad2ee0eb754f587fc299 (patch)
tree37b6c0fcd1e2df4d687caad8100b0dc74f1fa642 /src/rpc
parentf4d3905d157869d98f9855cba77b4ba452012703 (diff)
downloadgarage-0962313ebd45abb116d6ad2ee0eb754f587fc299.tar.gz
garage-0962313ebd45abb116d6ad2ee0eb754f587fc299.zip
garage_rpc: reorder functions in layout.rs
Diffstat (limited to 'src/rpc')
-rw-r--r--src/rpc/layout.rs223
1 files changed, 113 insertions, 110 deletions
diff --git a/src/rpc/layout.rs b/src/rpc/layout.rs
index e02a180b..368a9d2c 100644
--- a/src/rpc/layout.rs
+++ b/src/rpc/layout.rs
@@ -278,86 +278,7 @@ impl ClusterLayout {
ret
}
- fn calculate_staging_hash(&self) -> Hash {
- let hashed_tuple = (&self.staging_roles, &self.staging_parameters);
- blake2sum(&nonversioned_encode(&hashed_tuple).unwrap()[..])
- }
-
- pub fn merge(&mut self, other: &ClusterLayout) -> bool {
- match other.version.cmp(&self.version) {
- Ordering::Greater => {
- *self = other.clone();
- true
- }
- Ordering::Equal => {
- self.staging_parameters.merge(&other.staging_parameters);
- self.staging_roles.merge(&other.staging_roles);
-
- let new_staging_hash = self.calculate_staging_hash();
- let changed = new_staging_hash != self.staging_hash;
-
- self.staging_hash = new_staging_hash;
-
- changed
- }
- Ordering::Less => false,
- }
- }
-
- pub fn apply_staged_changes(mut self, version: Option<u64>) -> Result<(Self, Message), Error> {
- match version {
- None => {
- let error = r#"
-Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout.
-To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes.
- "#;
- return Err(Error::Message(error.into()));
- }
- Some(v) => {
- if v != self.version + 1 {
- return Err(Error::Message("Invalid new layout version".into()));
- }
- }
- }
-
- self.roles.merge(&self.staging_roles);
- self.roles.retain(|(_, _, v)| v.0.is_some());
- self.parameters = *self.staging_parameters.get();
-
- self.staging_roles.clear();
- self.staging_hash = self.calculate_staging_hash();
-
- let msg = self.calculate_partition_assignment()?;
-
- self.version += 1;
-
- Ok((self, msg))
- }
-
- pub fn revert_staged_changes(mut self, version: Option<u64>) -> Result<Self, Error> {
- match version {
- None => {
- let error = r#"
-Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout.
-To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes.
- "#;
- return Err(Error::Message(error.into()));
- }
- Some(v) => {
- if v != self.version + 1 {
- return Err(Error::Message("Invalid new layout version".into()));
- }
- }
- }
-
- self.staging_roles.clear();
- self.staging_parameters.update(self.parameters);
- self.staging_hash = self.calculate_staging_hash();
-
- self.version += 1;
-
- Ok(self)
- }
+ // ===================== accessors ======================
/// Returns a list of IDs of nodes that currently have
/// a role in the cluster
@@ -377,28 +298,6 @@ To know the correct value of the new layout version, invoke `garage layout show`
}
}
- /// Returns the uuids of the non_gateway nodes in self.node_id_vec.
- fn nongateway_nodes(&self) -> Vec<Uuid> {
- let mut result = Vec::<Uuid>::new();
- for uuid in self.node_id_vec.iter() {
- match self.node_role(uuid) {
- Some(role) if role.capacity.is_some() => result.push(*uuid),
- _ => (),
- }
- }
- result
- }
-
- /// Given a node uuids, this function returns the label of its zone
- fn get_node_zone(&self, uuid: &Uuid) -> Result<String, Error> {
- match self.node_role(uuid) {
- Some(role) => Ok(role.zone.clone()),
- _ => Err(Error::Message(
- "The Uuid does not correspond to a node present in the cluster.".into(),
- )),
- }
- }
-
/// Given a node uuids, this function returns its capacity or fails if it does not have any
pub fn get_node_capacity(&self, uuid: &Uuid) -> Result<u64, Error> {
match self.node_role(uuid) {
@@ -435,6 +334,30 @@ To know the correct value of the new layout version, invoke `garage layout show`
))
}
+ // ===================== internal information extractors ======================
+
+ /// Returns the uuids of the non_gateway nodes in self.node_id_vec.
+ fn nongateway_nodes(&self) -> Vec<Uuid> {
+ let mut result = Vec::<Uuid>::new();
+ for uuid in self.node_id_vec.iter() {
+ match self.node_role(uuid) {
+ Some(role) if role.capacity.is_some() => result.push(*uuid),
+ _ => (),
+ }
+ }
+ result
+ }
+
+ /// Given a node uuids, this function returns the label of its zone
+ fn get_node_zone(&self, uuid: &Uuid) -> Result<&str, Error> {
+ match self.node_role(uuid) {
+ Some(role) => Ok(&role.zone),
+ _ => Err(Error::Message(
+ "The Uuid does not correspond to a node present in the cluster.".into(),
+ )),
+ }
+ }
+
/// Returns the sum of capacities of non gateway nodes in the cluster
fn get_total_capacity(&self) -> Result<u64, Error> {
let mut total_capacity = 0;
@@ -461,6 +384,89 @@ To know the correct value of the new layout version, invoke `garage layout show`
}
}
+ fn calculate_staging_hash(&self) -> Hash {
+ let hashed_tuple = (&self.staging_roles, &self.staging_parameters);
+ blake2sum(&nonversioned_encode(&hashed_tuple).unwrap()[..])
+ }
+
+ // ================== updates to layout, public interface ===================
+
+ pub fn merge(&mut self, other: &ClusterLayout) -> bool {
+ match other.version.cmp(&self.version) {
+ Ordering::Greater => {
+ *self = other.clone();
+ true
+ }
+ Ordering::Equal => {
+ self.staging_parameters.merge(&other.staging_parameters);
+ self.staging_roles.merge(&other.staging_roles);
+
+ let new_staging_hash = self.calculate_staging_hash();
+ let changed = new_staging_hash != self.staging_hash;
+
+ self.staging_hash = new_staging_hash;
+
+ changed
+ }
+ Ordering::Less => false,
+ }
+ }
+
+ pub fn apply_staged_changes(mut self, version: Option<u64>) -> Result<(Self, Message), Error> {
+ match version {
+ None => {
+ let error = r#"
+Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout.
+To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes.
+ "#;
+ return Err(Error::Message(error.into()));
+ }
+ Some(v) => {
+ if v != self.version + 1 {
+ return Err(Error::Message("Invalid new layout version".into()));
+ }
+ }
+ }
+
+ self.roles.merge(&self.staging_roles);
+ self.roles.retain(|(_, _, v)| v.0.is_some());
+ self.parameters = *self.staging_parameters.get();
+
+ self.staging_roles.clear();
+ self.staging_hash = self.calculate_staging_hash();
+
+ let msg = self.calculate_partition_assignment()?;
+
+ self.version += 1;
+
+ Ok((self, msg))
+ }
+
+ pub fn revert_staged_changes(mut self, version: Option<u64>) -> Result<Self, Error> {
+ match version {
+ None => {
+ let error = r#"
+Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout.
+To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes.
+ "#;
+ return Err(Error::Message(error.into()));
+ }
+ Some(v) => {
+ if v != self.version + 1 {
+ return Err(Error::Message("Invalid new layout version".into()));
+ }
+ }
+ }
+
+ self.staging_roles.clear();
+ self.staging_parameters.update(self.parameters);
+ self.staging_hash = self.calculate_staging_hash();
+
+ self.version += 1;
+
+ Ok(self)
+ }
+
/// Check a cluster layout for internal consistency
/// (assignment, roles, parameters, partition size)
/// returns true if consistent, false if error
@@ -574,12 +580,9 @@ To know the correct value of the new layout version, invoke `garage layout show`
Ok(())
}
-}
-// ====================================================================================
+ // ================== updates to layout, internals ===================
-// Implementation of the ClusterLayout methods related to the assignment algorithm.
-impl ClusterLayout {
/// This function calculates a new partition-to-node assignment.
/// The computed assignment respects the node replication factor
/// and the zone redundancy parameter It maximizes the capacity of a
@@ -867,7 +870,7 @@ impl ClusterLayout {
}
for n in 0..self.nongateway_nodes().len() {
let node_capacity = self.get_node_capacity(&self.node_id_vec[n])?;
- let node_zone = zone_to_id[&self.get_node_zone(&self.node_id_vec[n])?];
+ let node_zone = zone_to_id[self.get_node_zone(&self.node_id_vec[n])?];
g.add_edge(Vertex::N(n), Vertex::Sink, node_capacity / partition_size)?;
for p in 0..NB_PARTITIONS {
if !exclude_assoc.contains(&(p, n)) {
@@ -913,7 +916,7 @@ impl ClusterLayout {
// The algorithm is such that it will start with the flow that we just computed
// and find ameliorating paths from that.
for (p, n) in exclude_edge.iter() {
- let node_zone = zone_to_id[&self.get_node_zone(&self.node_id_vec[*n])?];
+ let node_zone = zone_to_id[self.get_node_zone(&self.node_id_vec[*n])?];
g.add_edge(Vertex::PZ(*p, node_zone), Vertex::N(*n), 1)?;
}
g.compute_maximal_flow()?;
@@ -933,7 +936,7 @@ impl ClusterLayout {
let mut cost = CostFunction::new();
for (p, assoc_p) in prev_assign.iter().enumerate() {
for n in assoc_p.iter() {
- let node_zone = zone_to_id[&self.get_node_zone(&self.node_id_vec[*n])?];
+ let node_zone = zone_to_id[self.get_node_zone(&self.node_id_vec[*n])?];
cost.insert((Vertex::PZ(p, node_zone), Vertex::N(*n)), -1);
}
}
@@ -1035,7 +1038,7 @@ impl ClusterLayout {
let mut old_zones_of_p = Vec::<usize>::new();
for n in prev_assign[p].iter() {
old_zones_of_p
- .push(zone_to_id[&self.get_node_zone(&self.node_id_vec[*n])?]);
+ .push(zone_to_id[self.get_node_zone(&self.node_id_vec[*n])?]);
}
if !old_zones_of_p.contains(&z) {
new_partitions_zone[z] += 1;