diff options
author | Alex Auvolat <alex@adnab.me> | 2023-11-14 12:48:38 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2023-11-14 12:48:38 +0100 |
commit | 8e292e06b3fde1d3b5b019a26eabd4f0d9ac22c3 (patch) | |
tree | c4d252eaf122d9dd583cb755dffeb7d776ae4b66 | |
parent | 9a491fa1372a23e91c793ee1d2b313607752826a (diff) | |
download | garage-8e292e06b3fde1d3b5b019a26eabd4f0d9ac22c3.tar.gz garage-8e292e06b3fde1d3b5b019a26eabd4f0d9ac22c3.zip |
layout: some refactoring of nongateway nodes
-rw-r--r-- | src/api/k2v/index.rs | 7 | ||||
-rw-r--r-- | src/model/index_counter.rs | 4 | ||||
-rw-r--r-- | src/rpc/layout/history.rs | 30 | ||||
-rw-r--r-- | src/rpc/layout/schema.rs | 17 | ||||
-rw-r--r-- | src/rpc/layout/version.rs | 98 |
5 files changed, 95 insertions, 61 deletions
diff --git a/src/api/k2v/index.rs b/src/api/k2v/index.rs index 3c2f51a9..c189232a 100644 --- a/src/api/k2v/index.rs +++ b/src/api/k2v/index.rs @@ -25,8 +25,11 @@ pub async fn handle_read_index( ) -> Result<Response<Body>, Error> { let reverse = reverse.unwrap_or(false); - // TODO: not only current - let node_id_vec = garage.system.cluster_layout().current().node_ids().to_vec(); + let node_id_vec = garage + .system + .cluster_layout() + .all_nongateway_nodes() + .into_owned(); let (partition_keys, more, next_start) = read_range( &garage.k2v.counter_table.table, diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index 9637cc4c..2d968733 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -84,8 +84,8 @@ impl<T: CountedItem> Entry<T::CP, T::CS> for CounterEntry<T> { impl<T: CountedItem> CounterEntry<T> { pub fn filtered_values(&self, layout: &LayoutHistory) -> HashMap<String, i64> { - let nodes = &layout.current().node_id_vec[..]; - self.filtered_values_with_nodes(nodes) + let nodes = layout.all_nongateway_nodes(); + self.filtered_values_with_nodes(&nodes) } pub fn filtered_values_with_nodes(&self, nodes: &[Uuid]) -> HashMap<String, i64> { diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index 050f5d0a..877ad3a7 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -1,3 +1,4 @@ +use std::borrow::Cow; use std::collections::HashSet; use garage_util::crdt::{Crdt, Lww, LwwMap}; @@ -59,13 +60,19 @@ impl LayoutHistory { (self.current().version, self.all_ack(), self.min_stored()) } - pub fn all_nongateway_nodes(&self) -> HashSet<Uuid> { + pub fn all_nongateway_nodes(&self) -> Cow<'_, [Uuid]> { // TODO: cache this - self.versions - .iter() - .map(|x| x.nongateway_nodes()) - .flatten() - .collect::<HashSet<_>>() + if self.versions.len() == 1 { + self.versions[0].nongateway_nodes().into() + } else { + let set = self + .versions + .iter() + .map(|x| x.nongateway_nodes()) + .flatten() + .collect::<HashSet<_>>(); + set.into_iter().copied().collect::<Vec<_>>().into() + } } pub fn read_nodes_of(&self, position: &Hash) -> Vec<Uuid> { @@ -202,14 +209,11 @@ To know the correct value of the new layout version, invoke `garage layout show` } // Compute new version and add it to history - let mut new_version = self.current().clone(); - new_version.version += 1; - - new_version.roles.merge(&self.staging.get().roles); - new_version.roles.retain(|(_, _, v)| v.0.is_some()); - new_version.parameters = *self.staging.get().parameters.get(); + let (new_version, msg) = self + .current() + .clone() + .calculate_next_version(&self.staging.get())?; - let msg = new_version.calculate_partition_assignment()?; self.versions.push(new_version); if self.current().check().is_ok() { while self.versions.first().unwrap().check().is_err() { diff --git a/src/rpc/layout/schema.rs b/src/rpc/layout/schema.rs index 89f5c361..db298ee6 100644 --- a/src/rpc/layout/schema.rs +++ b/src/rpc/layout/schema.rs @@ -212,6 +212,8 @@ mod v010 { /// see comment in v08::ClusterLayout pub node_id_vec: Vec<Uuid>, + /// number of non-gateway nodes, which are the first ids in node_id_vec + pub nongateway_node_count: usize, /// see comment in v08::ClusterLayout #[serde(with = "serde_bytes")] pub ring_assignment_data: Vec<CompactNodeType>, @@ -265,6 +267,18 @@ mod v010 { type Previous = v09::ClusterLayout; fn migrate(previous: Self::Previous) -> Self { + let nongateway_node_count = previous + .node_id_vec + .iter() + .enumerate() + .filter(|(_, uuid)| { + let role = previous.roles.get(uuid); + matches!(role, Some(NodeRoleV(Some(role))) if role.capacity.is_some()) + }) + .map(|(i, _)| i) + .max() + .unwrap_or(0); + let version = LayoutVersion { version: previous.version, replication_factor: previous.replication_factor, @@ -272,11 +286,14 @@ mod v010 { parameters: previous.parameters, roles: previous.roles, node_id_vec: previous.node_id_vec, + nongateway_node_count, ring_assignment_data: previous.ring_assignment_data, }; let update_tracker = UpdateTracker( version .nongateway_nodes() + .iter() + .copied() .map(|x| (x, version.version)) .collect::<BTreeMap<Uuid, u64>>(), ); diff --git a/src/rpc/layout/version.rs b/src/rpc/layout/version.rs index ffbdf277..a7f387b6 100644 --- a/src/rpc/layout/version.rs +++ b/src/rpc/layout/version.rs @@ -5,7 +5,7 @@ use std::convert::TryInto; use bytesize::ByteSize; use itertools::Itertools; -use garage_util::crdt::LwwMap; +use garage_util::crdt::{Crdt, LwwMap}; use garage_util::data::*; use garage_util::error::*; @@ -30,6 +30,7 @@ impl LayoutVersion { partition_size: 0, roles: LwwMap::new(), node_id_vec: Vec::new(), + nongateway_node_count: 0, ring_assignment_data: Vec::new(), parameters, } @@ -43,6 +44,11 @@ impl LayoutVersion { &self.node_id_vec[..] } + /// Returns the uuids of the non_gateway nodes in self.node_id_vec. + pub fn nongateway_nodes(&self) -> &[Uuid] { + &self.node_id_vec[..self.nongateway_node_count] + } + pub fn num_nodes(&self) -> usize { self.node_id_vec.len() } @@ -56,18 +62,14 @@ impl LayoutVersion { } /// Given a node uuids, this function returns its capacity or fails if it does not have any - pub fn get_node_capacity(&self, uuid: &Uuid) -> Result<u64, Error> { + pub fn get_node_capacity(&self, uuid: &Uuid) -> Option<u64> { match self.node_role(uuid) { Some(NodeRole { capacity: Some(cap), zone: _, tags: _, - }) => Ok(*cap), - _ => Err(Error::Message( - "The Uuid does not correspond to a node present in the \ - cluster or this node does not have a positive capacity." - .into(), - )), + }) => Some(*cap), + _ => None, } } @@ -131,17 +133,6 @@ impl LayoutVersion { // ===================== internal information extractors ====================== - /// Returns the uuids of the non_gateway nodes in self.node_id_vec. - pub(crate) fn nongateway_nodes(&self) -> impl Iterator<Item = Uuid> + '_ { - self.node_id_vec - .iter() - .copied() - .filter(move |uuid| match self.node_role(uuid) { - Some(role) if role.capacity.is_some() => true, - _ => false, - }) - } - /// Given a node uuids, this function returns the label of its zone pub(crate) fn get_node_zone(&self, uuid: &Uuid) -> Result<&str, Error> { match self.node_role(uuid) { @@ -152,11 +143,16 @@ impl LayoutVersion { } } + fn expect_get_node_capacity(&self, uuid: &Uuid) -> u64 { + self.get_node_capacity(&uuid) + .expect("non-gateway node with zero capacity") + } + /// Returns the sum of capacities of non gateway nodes in the cluster fn get_total_capacity(&self) -> Result<u64, Error> { let mut total_capacity = 0; for uuid in self.nongateway_nodes() { - total_capacity += self.get_node_capacity(&uuid)?; + total_capacity += self.expect_get_node_capacity(&uuid); } Ok(total_capacity) } @@ -257,7 +253,7 @@ impl LayoutVersion { if *usage > 0 { let uuid = self.node_id_vec[n]; let partusage = usage * self.partition_size; - let nodecap = self.get_node_capacity(&uuid).unwrap(); + let nodecap = self.expect_get_node_capacity(&uuid); if partusage > nodecap { return Err(format!( "node usage ({}) is bigger than node capacity ({})", @@ -288,6 +284,21 @@ impl LayoutVersion { // ================== updates to layout, internals =================== + pub(crate) fn calculate_next_version( + mut self, + staging: &LayoutStaging, + ) -> Result<(Self, Message), Error> { + self.version += 1; + + self.roles.merge(&staging.roles); + self.roles.retain(|(_, _, v)| v.0.is_some()); + self.parameters = *staging.parameters.get(); + + let msg = self.calculate_partition_assignment()?; + + Ok((self, msg)) + } + /// This function calculates a new partition-to-node assignment. /// The computed assignment respects the node replication factor /// and the zone redundancy parameter It maximizes the capacity of a @@ -297,7 +308,7 @@ impl LayoutVersion { /// data to be moved. /// Staged role changes must be merged with nodes roles before calling this function, /// hence it must only be called from apply_staged_changes() and hence is not public. - pub(crate) fn calculate_partition_assignment(&mut self) -> Result<Message, Error> { + fn calculate_partition_assignment(&mut self) -> Result<Message, Error> { // We update the node ids, since the node role list might have changed with the // changes in the layout. We retrieve the old_assignment reframed with new ids let old_assignment_opt = self.update_node_id_vec()?; @@ -317,12 +328,12 @@ impl LayoutVersion { // to use them as indices in the flow graphs. let (id_to_zone, zone_to_id) = self.generate_nongateway_zone_ids()?; - let nb_nongateway_nodes = self.nongateway_nodes().count(); - if nb_nongateway_nodes < self.replication_factor { + if self.nongateway_nodes().len() < self.replication_factor { return Err(Error::Message(format!( "The number of nodes with positive \ capacity ({}) is smaller than the replication factor ({}).", - nb_nongateway_nodes, self.replication_factor + self.nongateway_nodes().len(), + self.replication_factor ))); } if id_to_zone.len() < zone_redundancy { @@ -420,12 +431,14 @@ impl LayoutVersion { .map(|(k, _, _)| *k) .collect(); - let mut new_node_id_vec = Vec::<Uuid>::new(); - new_node_id_vec.extend(new_non_gateway_nodes); - new_node_id_vec.extend(new_gateway_nodes); + let old_node_id_vec = std::mem::take(&mut self.node_id_vec); + + self.nongateway_node_count = new_non_gateway_nodes.len(); + self.node_id_vec.clear(); + self.node_id_vec.extend(new_non_gateway_nodes); + self.node_id_vec.extend(new_gateway_nodes); - let old_node_id_vec = self.node_id_vec.clone(); - self.node_id_vec = new_node_id_vec.clone(); + let new_node_id_vec = &self.node_id_vec; // (2) We retrieve the old association // We rewrite the old association with the new indices. We only consider partition @@ -464,7 +477,7 @@ impl LayoutVersion { } } - // We write the ring + // We clear the ring assignemnt data self.ring_assignment_data = Vec::<CompactNodeType>::new(); Ok(Some(old_assignment)) @@ -478,8 +491,7 @@ impl LayoutVersion { let mut id_to_zone = Vec::<String>::new(); let mut zone_to_id = HashMap::<String, usize>::new(); - let nongateway_nodes = self.nongateway_nodes().collect::<Vec<_>>(); - for uuid in nongateway_nodes.iter() { + for uuid in self.nongateway_nodes().iter() { let r = self.node_role(uuid).unwrap(); if !zone_to_id.contains_key(&r.zone) && r.capacity.is_some() { zone_to_id.insert(r.zone.clone(), id_to_zone.len()); @@ -556,10 +568,8 @@ impl LayoutVersion { exclude_assoc: &HashSet<(usize, usize)>, zone_redundancy: usize, ) -> Result<Graph<FlowEdge>, Error> { - let vertices = LayoutVersion::generate_graph_vertices( - zone_to_id.len(), - self.nongateway_nodes().count(), - ); + let vertices = + LayoutVersion::generate_graph_vertices(zone_to_id.len(), self.nongateway_nodes().len()); let mut g = Graph::<FlowEdge>::new(&vertices); let nb_zones = zone_to_id.len(); for p in 0..NB_PARTITIONS { @@ -578,8 +588,8 @@ impl LayoutVersion { )?; } } - for n in 0..self.nongateway_nodes().count() { - let node_capacity = self.get_node_capacity(&self.node_id_vec[n])?; + for n in 0..self.nongateway_nodes().len() { + let node_capacity = self.expect_get_node_capacity(&self.node_id_vec[n]); let node_zone = zone_to_id[self.get_node_zone(&self.node_id_vec[n])?]; g.add_edge(Vertex::N(n), Vertex::Sink, node_capacity / partition_size)?; for p in 0..NB_PARTITIONS { @@ -602,7 +612,7 @@ impl LayoutVersion { // previous assignment let mut exclude_edge = HashSet::<(usize, usize)>::new(); if let Some(prev_assign) = prev_assign_opt { - let nb_nodes = self.nongateway_nodes().count(); + let nb_nodes = self.nongateway_nodes().len(); for (p, prev_assign_p) in prev_assign.iter().enumerate() { for n in 0..nb_nodes { exclude_edge.insert((p, n)); @@ -654,7 +664,7 @@ impl LayoutVersion { // We compute the maximal length of a simple path in gflow. It is used in the // Bellman-Ford algorithm in optimize_flow_with_cost to set the number // of iterations. - let nb_nodes = self.nongateway_nodes().count(); + let nb_nodes = self.nongateway_nodes().len(); let path_length = 4 * nb_nodes; gflow.optimize_flow_with_cost(&cost, path_length)?; @@ -732,7 +742,7 @@ impl LayoutVersion { } // We define and fill in the following tables - let storing_nodes = self.nongateway_nodes().collect::<Vec<_>>(); + let storing_nodes = self.nongateway_nodes(); let mut new_partitions = vec![0; storing_nodes.len()]; let mut stored_partitions = vec![0; storing_nodes.len()]; @@ -804,13 +814,13 @@ impl LayoutVersion { let available_cap_z: u64 = self.partition_size * replicated_partitions as u64; let mut total_cap_z = 0; for n in nodes_of_z.iter() { - total_cap_z += self.get_node_capacity(&self.node_id_vec[*n])?; + total_cap_z += self.expect_get_node_capacity(&self.node_id_vec[*n]); } let percent_cap_z = 100.0 * (available_cap_z as f32) / (total_cap_z as f32); for n in nodes_of_z.iter() { let available_cap_n = stored_partitions[*n] as u64 * self.partition_size; - let total_cap_n = self.get_node_capacity(&self.node_id_vec[*n])?; + let total_cap_n = self.expect_get_node_capacity(&self.node_id_vec[*n]); let tags_n = (self.node_role(&self.node_id_vec[*n]).ok_or("<??>"))?.tags_string(); table.push(format!( " {:?}\t{}\t{} ({} new)\t{}\t{} ({:.1}%)", |