diff options
author | Alex Auvolat <alex@adnab.me> | 2023-11-08 13:11:13 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2023-11-08 13:13:04 +0100 |
commit | 0962313ebd45abb116d6ad2ee0eb754f587fc299 (patch) | |
tree | 37b6c0fcd1e2df4d687caad8100b0dc74f1fa642 /src | |
parent | f4d3905d157869d98f9855cba77b4ba452012703 (diff) | |
download | garage-0962313ebd45abb116d6ad2ee0eb754f587fc299.tar.gz garage-0962313ebd45abb116d6ad2ee0eb754f587fc299.zip |
garage_rpc: reorder functions in layout.rs
Diffstat (limited to 'src')
-rw-r--r-- | src/rpc/layout.rs | 223 |
1 files changed, 113 insertions, 110 deletions
diff --git a/src/rpc/layout.rs b/src/rpc/layout.rs index e02a180b..368a9d2c 100644 --- a/src/rpc/layout.rs +++ b/src/rpc/layout.rs @@ -278,86 +278,7 @@ impl ClusterLayout { ret } - fn calculate_staging_hash(&self) -> Hash { - let hashed_tuple = (&self.staging_roles, &self.staging_parameters); - blake2sum(&nonversioned_encode(&hashed_tuple).unwrap()[..]) - } - - pub fn merge(&mut self, other: &ClusterLayout) -> bool { - match other.version.cmp(&self.version) { - Ordering::Greater => { - *self = other.clone(); - true - } - Ordering::Equal => { - self.staging_parameters.merge(&other.staging_parameters); - self.staging_roles.merge(&other.staging_roles); - - let new_staging_hash = self.calculate_staging_hash(); - let changed = new_staging_hash != self.staging_hash; - - self.staging_hash = new_staging_hash; - - changed - } - Ordering::Less => false, - } - } - - pub fn apply_staged_changes(mut self, version: Option<u64>) -> Result<(Self, Message), Error> { - match version { - None => { - let error = r#" -Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout. -To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes. - "#; - return Err(Error::Message(error.into())); - } - Some(v) => { - if v != self.version + 1 { - return Err(Error::Message("Invalid new layout version".into())); - } - } - } - - self.roles.merge(&self.staging_roles); - self.roles.retain(|(_, _, v)| v.0.is_some()); - self.parameters = *self.staging_parameters.get(); - - self.staging_roles.clear(); - self.staging_hash = self.calculate_staging_hash(); - - let msg = self.calculate_partition_assignment()?; - - self.version += 1; - - Ok((self, msg)) - } - - pub fn revert_staged_changes(mut self, version: Option<u64>) -> Result<Self, Error> { - match version { - None => { - let error = r#" -Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout. -To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes. - "#; - return Err(Error::Message(error.into())); - } - Some(v) => { - if v != self.version + 1 { - return Err(Error::Message("Invalid new layout version".into())); - } - } - } - - self.staging_roles.clear(); - self.staging_parameters.update(self.parameters); - self.staging_hash = self.calculate_staging_hash(); - - self.version += 1; - - Ok(self) - } + // ===================== accessors ====================== /// Returns a list of IDs of nodes that currently have /// a role in the cluster @@ -377,28 +298,6 @@ To know the correct value of the new layout version, invoke `garage layout show` } } - /// Returns the uuids of the non_gateway nodes in self.node_id_vec. - fn nongateway_nodes(&self) -> Vec<Uuid> { - let mut result = Vec::<Uuid>::new(); - for uuid in self.node_id_vec.iter() { - match self.node_role(uuid) { - Some(role) if role.capacity.is_some() => result.push(*uuid), - _ => (), - } - } - result - } - - /// Given a node uuids, this function returns the label of its zone - fn get_node_zone(&self, uuid: &Uuid) -> Result<String, Error> { - match self.node_role(uuid) { - Some(role) => Ok(role.zone.clone()), - _ => Err(Error::Message( - "The Uuid does not correspond to a node present in the cluster.".into(), - )), - } - } - /// Given a node uuids, this function returns its capacity or fails if it does not have any pub fn get_node_capacity(&self, uuid: &Uuid) -> Result<u64, Error> { match self.node_role(uuid) { @@ -435,6 +334,30 @@ To know the correct value of the new layout version, invoke `garage layout show` )) } + // ===================== internal information extractors ====================== + + /// Returns the uuids of the non_gateway nodes in self.node_id_vec. + fn nongateway_nodes(&self) -> Vec<Uuid> { + let mut result = Vec::<Uuid>::new(); + for uuid in self.node_id_vec.iter() { + match self.node_role(uuid) { + Some(role) if role.capacity.is_some() => result.push(*uuid), + _ => (), + } + } + result + } + + /// Given a node uuids, this function returns the label of its zone + fn get_node_zone(&self, uuid: &Uuid) -> Result<&str, Error> { + match self.node_role(uuid) { + Some(role) => Ok(&role.zone), + _ => Err(Error::Message( + "The Uuid does not correspond to a node present in the cluster.".into(), + )), + } + } + /// Returns the sum of capacities of non gateway nodes in the cluster fn get_total_capacity(&self) -> Result<u64, Error> { let mut total_capacity = 0; @@ -461,6 +384,89 @@ To know the correct value of the new layout version, invoke `garage layout show` } } + fn calculate_staging_hash(&self) -> Hash { + let hashed_tuple = (&self.staging_roles, &self.staging_parameters); + blake2sum(&nonversioned_encode(&hashed_tuple).unwrap()[..]) + } + + // ================== updates to layout, public interface =================== + + pub fn merge(&mut self, other: &ClusterLayout) -> bool { + match other.version.cmp(&self.version) { + Ordering::Greater => { + *self = other.clone(); + true + } + Ordering::Equal => { + self.staging_parameters.merge(&other.staging_parameters); + self.staging_roles.merge(&other.staging_roles); + + let new_staging_hash = self.calculate_staging_hash(); + let changed = new_staging_hash != self.staging_hash; + + self.staging_hash = new_staging_hash; + + changed + } + Ordering::Less => false, + } + } + + pub fn apply_staged_changes(mut self, version: Option<u64>) -> Result<(Self, Message), Error> { + match version { + None => { + let error = r#" +Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout. +To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes. + "#; + return Err(Error::Message(error.into())); + } + Some(v) => { + if v != self.version + 1 { + return Err(Error::Message("Invalid new layout version".into())); + } + } + } + + self.roles.merge(&self.staging_roles); + self.roles.retain(|(_, _, v)| v.0.is_some()); + self.parameters = *self.staging_parameters.get(); + + self.staging_roles.clear(); + self.staging_hash = self.calculate_staging_hash(); + + let msg = self.calculate_partition_assignment()?; + + self.version += 1; + + Ok((self, msg)) + } + + pub fn revert_staged_changes(mut self, version: Option<u64>) -> Result<Self, Error> { + match version { + None => { + let error = r#" +Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout. +To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes. + "#; + return Err(Error::Message(error.into())); + } + Some(v) => { + if v != self.version + 1 { + return Err(Error::Message("Invalid new layout version".into())); + } + } + } + + self.staging_roles.clear(); + self.staging_parameters.update(self.parameters); + self.staging_hash = self.calculate_staging_hash(); + + self.version += 1; + + Ok(self) + } + /// Check a cluster layout for internal consistency /// (assignment, roles, parameters, partition size) /// returns true if consistent, false if error @@ -574,12 +580,9 @@ To know the correct value of the new layout version, invoke `garage layout show` Ok(()) } -} -// ==================================================================================== + // ================== updates to layout, internals =================== -// Implementation of the ClusterLayout methods related to the assignment algorithm. -impl ClusterLayout { /// This function calculates a new partition-to-node assignment. /// The computed assignment respects the node replication factor /// and the zone redundancy parameter It maximizes the capacity of a @@ -867,7 +870,7 @@ impl ClusterLayout { } for n in 0..self.nongateway_nodes().len() { let node_capacity = self.get_node_capacity(&self.node_id_vec[n])?; - let node_zone = zone_to_id[&self.get_node_zone(&self.node_id_vec[n])?]; + let node_zone = zone_to_id[self.get_node_zone(&self.node_id_vec[n])?]; g.add_edge(Vertex::N(n), Vertex::Sink, node_capacity / partition_size)?; for p in 0..NB_PARTITIONS { if !exclude_assoc.contains(&(p, n)) { @@ -913,7 +916,7 @@ impl ClusterLayout { // The algorithm is such that it will start with the flow that we just computed // and find ameliorating paths from that. for (p, n) in exclude_edge.iter() { - let node_zone = zone_to_id[&self.get_node_zone(&self.node_id_vec[*n])?]; + let node_zone = zone_to_id[self.get_node_zone(&self.node_id_vec[*n])?]; g.add_edge(Vertex::PZ(*p, node_zone), Vertex::N(*n), 1)?; } g.compute_maximal_flow()?; @@ -933,7 +936,7 @@ impl ClusterLayout { let mut cost = CostFunction::new(); for (p, assoc_p) in prev_assign.iter().enumerate() { for n in assoc_p.iter() { - let node_zone = zone_to_id[&self.get_node_zone(&self.node_id_vec[*n])?]; + let node_zone = zone_to_id[self.get_node_zone(&self.node_id_vec[*n])?]; cost.insert((Vertex::PZ(p, node_zone), Vertex::N(*n)), -1); } } @@ -1035,7 +1038,7 @@ impl ClusterLayout { let mut old_zones_of_p = Vec::<usize>::new(); for n in prev_assign[p].iter() { old_zones_of_p - .push(zone_to_id[&self.get_node_zone(&self.node_id_vec[*n])?]); + .push(zone_to_id[self.get_node_zone(&self.node_id_vec[*n])?]); } if !old_zones_of_p.contains(&z) { new_partitions_zone[z] += 1; |