diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/garage/admin/mod.rs | 17 | ||||
-rw-r--r-- | src/garage/cli/layout.rs | 6 | ||||
-rw-r--r-- | src/model/helper/bucket.rs | 8 | ||||
-rw-r--r-- | src/rpc/layout/history.rs | 15 | ||||
-rw-r--r-- | src/rpc/layout/version.rs | 17 | ||||
-rw-r--r-- | src/rpc/system.rs | 2 | ||||
-rw-r--r-- | src/table/replication/fullcopy.rs | 6 |
7 files changed, 44 insertions, 27 deletions
diff --git a/src/garage/admin/mod.rs b/src/garage/admin/mod.rs index e3ba6d35..77918a0f 100644 --- a/src/garage/admin/mod.rs +++ b/src/garage/admin/mod.rs @@ -126,8 +126,8 @@ impl AdminRpcHandler { opt_to_send.all_nodes = false; let mut failures = vec![]; - let layout = self.garage.system.cluster_layout().clone(); - for node in layout.current().node_ids().iter() { + let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec(); + for node in all_nodes.iter() { let node = (*node).into(); let resp = self .endpoint @@ -163,9 +163,9 @@ impl AdminRpcHandler { async fn handle_stats(&self, opt: StatsOpt) -> Result<AdminRpc, Error> { if opt.all_nodes { let mut ret = String::new(); - let layout = self.garage.system.cluster_layout().clone(); + let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec(); - for node in layout.current().node_ids().iter() { + for node in all_nodes.iter() { let mut opt = opt.clone(); opt.all_nodes = false; opt.skip_global = true; @@ -275,6 +275,7 @@ impl AdminRpcHandler { let mut ret = String::new(); // Gather storage node and free space statistics + // TODO: not only layout.current() ??? let layout = &self.garage.system.cluster_layout(); let mut node_partition_count = HashMap::<Uuid, u64>::new(); for short_id in layout.current().ring_assignment_data.iter() { @@ -440,8 +441,8 @@ impl AdminRpcHandler { ) -> Result<AdminRpc, Error> { if all_nodes { let mut ret = vec![]; - let layout = self.garage.system.cluster_layout().clone(); - for node in layout.current().node_ids().iter() { + let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec(); + for node in all_nodes.iter() { let node = (*node).into(); match self .endpoint @@ -488,8 +489,8 @@ impl AdminRpcHandler { ) -> Result<AdminRpc, Error> { if all_nodes { let mut ret = vec![]; - let layout = self.garage.system.cluster_layout().clone(); - for node in layout.current().node_ids().iter() { + let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec(); + for node in all_nodes.iter() { let node = (*node).into(); match self .endpoint diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs index 15727448..0f01a37a 100644 --- a/src/garage/cli/layout.rs +++ b/src/garage/cli/layout.rs @@ -49,6 +49,7 @@ pub async fn cmd_assign_role( }; let mut layout = fetch_layout(rpc_cli, rpc_host).await?; + let all_nodes = layout.all_nodes().into_owned(); let added_nodes = args .node_ids @@ -58,7 +59,7 @@ pub async fn cmd_assign_role( status .iter() .map(|adv| adv.id) - .chain(layout.current().node_ids().iter().cloned()), + .chain(all_nodes.iter().cloned()), node_id, ) }) @@ -68,8 +69,7 @@ pub async fn cmd_assign_role( roles.merge(&layout.staging.get().roles); for replaced in args.replace.iter() { - let replaced_node = - find_matching_node(layout.current().node_ids().iter().cloned(), replaced)?; + let replaced_node = find_matching_node(all_nodes.iter().cloned(), replaced)?; match roles.get(&replaced_node) { Some(NodeRoleV(Some(_))) => { layout diff --git a/src/model/helper/bucket.rs b/src/model/helper/bucket.rs index 2a9c0fb1..2cb53424 100644 --- a/src/model/helper/bucket.rs +++ b/src/model/helper/bucket.rs @@ -450,8 +450,12 @@ impl<'a> BucketHelper<'a> { #[cfg(feature = "k2v")] { - // TODO: not only current - let node_id_vec = self.0.system.cluster_layout().current().node_ids().to_vec(); + let node_id_vec = self + .0 + .system + .cluster_layout() + .all_nongateway_nodes() + .into_owned(); let k2vindexes = self .0 .k2v diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index 877ad3a7..69348873 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -60,6 +60,21 @@ impl LayoutHistory { (self.current().version, self.all_ack(), self.min_stored()) } + pub fn all_nodes(&self) -> Cow<'_, [Uuid]> { + // TODO: cache this + if self.versions.len() == 1 { + self.versions[0].all_nodes().into() + } else { + let set = self + .versions + .iter() + .map(|x| x.all_nodes()) + .flatten() + .collect::<HashSet<_>>(); + set.into_iter().copied().collect::<Vec<_>>().into() + } + } + pub fn all_nongateway_nodes(&self) -> Cow<'_, [Uuid]> { // TODO: cache this if self.versions.len() == 1 { diff --git a/src/rpc/layout/version.rs b/src/rpc/layout/version.rs index a7f387b6..2cbdcee2 100644 --- a/src/rpc/layout/version.rs +++ b/src/rpc/layout/version.rs @@ -38,22 +38,19 @@ impl LayoutVersion { // ===================== accessors ====================== - /// Returns a list of IDs of nodes that currently have - /// a role in the cluster - pub fn node_ids(&self) -> &[Uuid] { + /// Returns a list of IDs of nodes that have a role in this + /// version of the cluster layout, including gateway nodes + pub fn all_nodes(&self) -> &[Uuid] { &self.node_id_vec[..] } - /// Returns the uuids of the non_gateway nodes in self.node_id_vec. + /// Returns a list of IDs of nodes that have a storage capacity + /// assigned in this version of the cluster layout pub fn nongateway_nodes(&self) -> &[Uuid] { &self.node_id_vec[..self.nongateway_node_count] } - pub fn num_nodes(&self) -> usize { - self.node_id_vec.len() - } - - /// Returns the role of a node in the layout + /// Returns the role of a node in the layout, if it has one pub fn node_role(&self, node: &Uuid) -> Option<&NodeRole> { match self.roles.get(node) { Some(NodeRoleV(Some(v))) => Some(v), @@ -61,7 +58,7 @@ impl LayoutVersion { } } - /// Given a node uuids, this function returns its capacity or fails if it does not have any + /// Returns the capacity of a node in the layout, if it has one pub fn get_node_capacity(&self, uuid: &Uuid) -> Option<u64> { match self.node_role(uuid) { Some(NodeRole { diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 3418600b..ab3c96b8 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -609,7 +609,7 @@ impl System { while !*stop_signal.borrow() { let not_configured = self.cluster_layout().check().is_err(); let no_peers = self.fullmesh.get_peer_list().len() < self.replication_factor; - let expected_n_nodes = self.cluster_layout().current().num_nodes(); + let expected_n_nodes = self.cluster_layout().all_nodes().len(); let bad_peers = self .fullmesh .get_peer_list() diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs index 5653a229..beaacc2b 100644 --- a/src/table/replication/fullcopy.rs +++ b/src/table/replication/fullcopy.rs @@ -35,10 +35,10 @@ impl TableReplication for TableFullReplication { } fn write_nodes(&self, _hash: &Hash) -> Vec<Uuid> { - self.system.cluster_layout().current().node_ids().to_vec() + self.system.cluster_layout().current().all_nodes().to_vec() } fn write_quorum(&self) -> usize { - let nmembers = self.system.cluster_layout().current().node_ids().len(); + let nmembers = self.system.cluster_layout().current().all_nodes().len(); if nmembers > self.max_faults { nmembers - self.max_faults } else { @@ -62,7 +62,7 @@ impl TableReplication for TableFullReplication { partition: 0u16, first_hash: [0u8; 32].into(), last_hash: [0xff; 32].into(), - storage_nodes: Vec::from_iter(layout.current().node_ids().to_vec()), + storage_nodes: Vec::from_iter(layout.current().all_nodes().to_vec()), }], } } |