author     Alex Auvolat <alex@adnab.me>  2023-11-14 13:06:16 +0100
committer  Alex Auvolat <alex@adnab.me>  2023-11-14 13:12:32 +0100
commit     1aab1f4e688ebc3f3adcb41c817c16c688a3291c (patch)
tree       cf006db1b3aeb6ebff8d976aee33469ffabe75f2 /src
parent     8e292e06b3fde1d3b5b019a26eabd4f0d9ac22c3 (diff)
layout: refactoring of all_nodes
Diffstat (limited to 'src')
-rw-r--r--  src/garage/admin/mod.rs            17
-rw-r--r--  src/garage/cli/layout.rs            6
-rw-r--r--  src/model/helper/bucket.rs          8
-rw-r--r--  src/rpc/layout/history.rs          15
-rw-r--r--  src/rpc/layout/version.rs          17
-rw-r--r--  src/rpc/system.rs                   2
-rw-r--r--  src/table/replication/fullcopy.rs   6
7 files changed, 44 insertions, 27 deletions
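
Every hunk below makes the same substitution: call sites that iterated LayoutVersion::node_ids() on the current layout version now go through the new LayoutHistory::all_nodes() (or all_nongateway_nodes()), which covers every retained layout version. A self-contained sketch of the before/after call shape, with the types pared down to the relevant methods and u64 standing in for Uuid:

    // Pared-down mocks of the two layout types this commit touches.
    struct LayoutVersion {
        node_id_vec: Vec<u64>,
    }

    struct LayoutHistory {
        versions: Vec<LayoutVersion>,
    }

    impl LayoutVersion {
        // The per-version accessor that the commit renames to all_nodes().
        fn node_ids(&self) -> &[u64] {
            &self.node_id_vec
        }
    }

    impl LayoutHistory {
        fn current(&self) -> &LayoutVersion {
            self.versions.last().unwrap()
        }

        // The new history-level accessor: the union of node IDs over all
        // retained versions, not just the current one.
        fn all_nodes(&self) -> Vec<u64> {
            let mut v: Vec<u64> = self
                .versions
                .iter()
                .flat_map(|x| x.node_ids().iter().copied())
                .collect();
            v.sort();
            v.dedup();
            v
        }
    }

    fn main() {
        let layout = LayoutHistory {
            versions: vec![
                LayoutVersion { node_id_vec: vec![1, 2] },
                LayoutVersion { node_id_vec: vec![2, 3] },
            ],
        };
        // Before this commit: only the current version's nodes were visited.
        assert_eq!(layout.current().node_ids(), &[2, 3]);
        // After: node 1, present only in the older version, is included too.
        assert_eq!(layout.all_nodes(), vec![1, 2, 3]);
    }
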
diff --git a/src/garage/admin/mod.rs b/src/garage/admin/mod.rs
index e3ba6d35..77918a0f 100644
--- a/src/garage/admin/mod.rs
+++ b/src/garage/admin/mod.rs
@@ -126,8 +126,8 @@ impl AdminRpcHandler {
opt_to_send.all_nodes = false;
let mut failures = vec![];
- let layout = self.garage.system.cluster_layout().clone();
- for node in layout.current().node_ids().iter() {
+ let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec();
+ for node in all_nodes.iter() {
let node = (*node).into();
let resp = self
.endpoint
@@ -163,9 +163,9 @@ impl AdminRpcHandler {
async fn handle_stats(&self, opt: StatsOpt) -> Result<AdminRpc, Error> {
if opt.all_nodes {
let mut ret = String::new();
- let layout = self.garage.system.cluster_layout().clone();
- for node in layout.current().node_ids().iter() {
+ let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec();
+ for node in all_nodes.iter() {
let mut opt = opt.clone();
opt.all_nodes = false;
opt.skip_global = true;
@@ -275,6 +275,7 @@ impl AdminRpcHandler {
let mut ret = String::new();
// Gather storage node and free space statistics
+ // TODO: not only layout.current() ???
let layout = &self.garage.system.cluster_layout();
let mut node_partition_count = HashMap::<Uuid, u64>::new();
for short_id in layout.current().ring_assignment_data.iter() {
@@ -440,8 +441,8 @@ impl AdminRpcHandler {
) -> Result<AdminRpc, Error> {
if all_nodes {
let mut ret = vec![];
- let layout = self.garage.system.cluster_layout().clone();
- for node in layout.current().node_ids().iter() {
+ let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec();
+ for node in all_nodes.iter() {
let node = (*node).into();
match self
.endpoint
@@ -488,8 +489,8 @@ impl AdminRpcHandler {
) -> Result<AdminRpc, Error> {
if all_nodes {
let mut ret = vec![];
- let layout = self.garage.system.cluster_layout().clone();
- for node in layout.current().node_ids().iter() {
+ let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec();
+ for node in all_nodes.iter() {
let node = (*node).into();
match self
.endpoint
diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs
index 15727448..0f01a37a 100644
--- a/src/garage/cli/layout.rs
+++ b/src/garage/cli/layout.rs
@@ -49,6 +49,7 @@ pub async fn cmd_assign_role(
};
let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
+ let all_nodes = layout.all_nodes().into_owned();
let added_nodes = args
.node_ids
@@ -58,7 +59,7 @@ pub async fn cmd_assign_role(
status
.iter()
.map(|adv| adv.id)
- .chain(layout.current().node_ids().iter().cloned()),
+ .chain(all_nodes.iter().cloned()),
node_id,
)
})
@@ -68,8 +69,7 @@ pub async fn cmd_assign_role(
roles.merge(&layout.staging.get().roles);
for replaced in args.replace.iter() {
- let replaced_node =
- find_matching_node(layout.current().node_ids().iter().cloned(), replaced)?;
+ let replaced_node = find_matching_node(all_nodes.iter().cloned(), replaced)?;
match roles.get(&replaced_node) {
Some(NodeRoleV(Some(_))) => {
layout
diff --git a/src/model/helper/bucket.rs b/src/model/helper/bucket.rs
index 2a9c0fb1..2cb53424 100644
--- a/src/model/helper/bucket.rs
+++ b/src/model/helper/bucket.rs
@@ -450,8 +450,12 @@ impl<'a> BucketHelper<'a> {
#[cfg(feature = "k2v")]
{
- // TODO: not only current
- let node_id_vec = self.0.system.cluster_layout().current().node_ids().to_vec();
+ let node_id_vec = self
+ .0
+ .system
+ .cluster_layout()
+ .all_nongateway_nodes()
+ .into_owned();
let k2vindexes = self
.0
.k2v
diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs
index 877ad3a7..69348873 100644
--- a/src/rpc/layout/history.rs
+++ b/src/rpc/layout/history.rs
@@ -60,6 +60,21 @@ impl LayoutHistory {
(self.current().version, self.all_ack(), self.min_stored())
}
+ pub fn all_nodes(&self) -> Cow<'_, [Uuid]> {
+ // TODO: cache this
+ if self.versions.len() == 1 {
+ self.versions[0].all_nodes().into()
+ } else {
+ let set = self
+ .versions
+ .iter()
+ .map(|x| x.all_nodes())
+ .flatten()
+ .collect::<HashSet<_>>();
+ set.into_iter().copied().collect::<Vec<_>>().into()
+ }
+ }
+
pub fn all_nongateway_nodes(&self) -> Cow<'_, [Uuid]> {
// TODO: cache this
if self.versions.len() == 1 {
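
The new LayoutHistory::all_nodes() above returns Cow<'_, [Uuid]> so that the common single-version case borrows without allocating, while the multi-version case deduplicates through a HashSet; callers that need ownership (cli/layout.rs above) call .into_owned(). A runnable sketch of that pattern, with u64 standing in for Uuid:

    use std::borrow::Cow;
    use std::collections::HashSet;

    // Same shape as LayoutHistory::all_nodes() above, minus the real types.
    fn all_nodes(versions: &[Vec<u64>]) -> Cow<'_, [u64]> {
        if versions.len() == 1 {
            // Fast path: borrow the single version's slice, no allocation.
            Cow::Borrowed(versions[0].as_slice())
        } else {
            // Slow path: a HashSet deduplicates nodes that appear in
            // several versions; iteration order is unspecified.
            let set: HashSet<u64> = versions.iter().flatten().copied().collect();
            Cow::Owned(set.into_iter().collect())
        }
    }

    fn main() {
        let single = vec![vec![10, 11]];
        assert!(matches!(all_nodes(&single), Cow::Borrowed(_)));

        // into_owned() clones only when the Cow is borrowed.
        let owned: Vec<u64> = all_nodes(&single).into_owned();
        assert_eq!(owned, vec![10, 11]);

        let multi = vec![vec![10, 11], vec![11, 12]];
        assert_eq!(all_nodes(&multi).len(), 3); // node 11 counted once
    }
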
diff --git a/src/rpc/layout/version.rs b/src/rpc/layout/version.rs
index a7f387b6..2cbdcee2 100644
--- a/src/rpc/layout/version.rs
+++ b/src/rpc/layout/version.rs
@@ -38,22 +38,19 @@ impl LayoutVersion {
// ===================== accessors ======================
- /// Returns a list of IDs of nodes that currently have
- /// a role in the cluster
- pub fn node_ids(&self) -> &[Uuid] {
+ /// Returns a list of IDs of nodes that have a role in this
+ /// version of the cluster layout, including gateway nodes
+ pub fn all_nodes(&self) -> &[Uuid] {
&self.node_id_vec[..]
}
- /// Returns the uuids of the non_gateway nodes in self.node_id_vec.
+ /// Returns a list of IDs of nodes that have a storage capacity
+ /// assigned in this version of the cluster layout
pub fn nongateway_nodes(&self) -> &[Uuid] {
&self.node_id_vec[..self.nongateway_node_count]
}
- pub fn num_nodes(&self) -> usize {
- self.node_id_vec.len()
- }
-
- /// Returns the role of a node in the layout
+ /// Returns the role of a node in the layout, if it has one
pub fn node_role(&self, node: &Uuid) -> Option<&NodeRole> {
match self.roles.get(node) {
Some(NodeRoleV(Some(v))) => Some(v),
@@ -61,7 +58,7 @@ impl LayoutVersion {
}
}
- /// Given a node uuids, this function returns its capacity or fails if it does not have any
+ /// Returns the capacity of a node in the layout, if it has one
pub fn get_node_capacity(&self, uuid: &Uuid) -> Option<u64> {
match self.node_role(uuid) {
Some(NodeRole {
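
The renamed accessors rely on an ordering invariant of node_id_vec that the new doc comments spell out: nodes with assigned storage capacity come first, so nongateway_nodes() is simply a prefix of all_nodes(). A minimal sketch of that invariant, with u64 standing in for Uuid:

    struct LayoutVersion {
        node_id_vec: Vec<u64>,        // storage nodes first, then gateways
        nongateway_node_count: usize, // how many leading entries have capacity
    }

    impl LayoutVersion {
        // All nodes with a role in this layout version, gateways included.
        fn all_nodes(&self) -> &[u64] {
            &self.node_id_vec[..]
        }

        // Only the nodes with an assigned storage capacity: a prefix slice.
        fn nongateway_nodes(&self) -> &[u64] {
            &self.node_id_vec[..self.nongateway_node_count]
        }
    }

    fn main() {
        // Two storage nodes (1, 2) followed by one gateway node (9).
        let v = LayoutVersion {
            node_id_vec: vec![1, 2, 9],
            nongateway_node_count: 2,
        };
        assert_eq!(v.all_nodes(), &[1, 2, 9]);     // includes the gateway
        assert_eq!(v.nongateway_nodes(), &[1, 2]); // storage prefix only
    }
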
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 3418600b..ab3c96b8 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -609,7 +609,7 @@ impl System {
while !*stop_signal.borrow() {
let not_configured = self.cluster_layout().check().is_err();
let no_peers = self.fullmesh.get_peer_list().len() < self.replication_factor;
- let expected_n_nodes = self.cluster_layout().current().num_nodes();
+ let expected_n_nodes = self.cluster_layout().all_nodes().len();
let bad_peers = self
.fullmesh
.get_peer_list()
diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs
index 5653a229..beaacc2b 100644
--- a/src/table/replication/fullcopy.rs
+++ b/src/table/replication/fullcopy.rs
@@ -35,10 +35,10 @@ impl TableReplication for TableFullReplication {
}
fn write_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
- self.system.cluster_layout().current().node_ids().to_vec()
+ self.system.cluster_layout().current().all_nodes().to_vec()
}
fn write_quorum(&self) -> usize {
- let nmembers = self.system.cluster_layout().current().node_ids().len();
+ let nmembers = self.system.cluster_layout().current().all_nodes().len();
if nmembers > self.max_faults {
nmembers - self.max_faults
} else {
@@ -62,7 +62,7 @@ impl TableReplication for TableFullReplication {
partition: 0u16,
first_hash: [0u8; 32].into(),
last_hash: [0xff; 32].into(),
- storage_nodes: Vec::from_iter(layout.current().node_ids().to_vec()),
+ storage_nodes: Vec::from_iter(layout.current().all_nodes().to_vec()),
}],
}
}
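
For context on the quorum arithmetic above: with full replication, write_nodes() targets every node of the current layout version, and a write succeeds once all but max_faults of them have acknowledged it. A standalone sketch of write_quorum(); the fallback value for clusters of at most max_faults nodes lies outside the hunk, so the 1 below is an assumption:

    // Free-standing restatement of the quorum rule in the hunk above.
    fn write_quorum(nmembers: usize, max_faults: usize) -> usize {
        if nmembers > max_faults {
            // e.g. 5 members with max_faults = 1 gives a quorum of 4
            nmembers - max_faults
        } else {
            // Assumed fallback (the else branch is outside the hunk):
            // require at least one acknowledged write.
            1
        }
    }

    fn main() {
        assert_eq!(write_quorum(5, 1), 4);
        assert_eq!(write_quorum(3, 1), 2);
        assert_eq!(write_quorum(1, 1), 1);
    }
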