aboutsummaryrefslogtreecommitdiff
path: root/src/rpc/system.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc/system.rs')
-rw-r--r--src/rpc/system.rs222
1 files changed, 163 insertions, 59 deletions
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 224fbabb..1f4d86e7 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -21,7 +21,6 @@ use netapp::peering::fullmesh::FullMeshPeeringStrategy;
use netapp::util::parse_and_resolve_peer_addr_async;
use netapp::{NetApp, NetworkKey, NodeID, NodeKey};
-use garage_util::background::BackgroundRunner;
use garage_util::config::Config;
#[cfg(feature = "kubernetes-discovery")]
use garage_util::config::KubernetesDiscoveryConfig;
@@ -35,6 +34,7 @@ use crate::consul::ConsulDiscovery;
#[cfg(feature = "kubernetes-discovery")]
use crate::kubernetes::*;
use crate::layout::*;
+use crate::replication_mode::*;
use crate::ring::*;
use crate::rpc_helper::*;
@@ -49,8 +49,6 @@ pub const GARAGE_VERSION_TAG: u64 = 0x6761726167650008; // garage 0x0008
/// RPC endpoint used for calls related to membership
pub const SYSTEM_RPC_PATH: &str = "garage_rpc/membership.rs/SystemRpc";
-pub const CONNECT_ERROR_MESSAGE: &str = "Error establishing RPC connection to remote node. This can happen if the remote node is not reachable on the network, but also if the two nodes are not configured with the same rpc_secret";
-
/// RPC messages related to membership
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum SystemRpc {
@@ -75,13 +73,17 @@ impl Rpc for SystemRpc {
type Response = Result<SystemRpc, Error>;
}
+#[derive(Serialize, Deserialize)]
+pub struct PeerList(Vec<(Uuid, SocketAddr)>);
+impl garage_util::migrate::InitialFormat for PeerList {}
+
/// This node's membership manager
pub struct System {
/// The id of this node
pub id: Uuid,
persist_cluster_layout: Persister<ClusterLayout>,
- persist_peer_list: Persister<Vec<(Uuid, SocketAddr)>>,
+ persist_peer_list: Persister<PeerList>,
local_status: ArcSwap<NodeStatus>,
node_status: RwLock<HashMap<Uuid, (u64, NodeStatus)>>,
@@ -102,15 +104,13 @@ pub struct System {
#[cfg(feature = "kubernetes-discovery")]
kubernetes_discovery: Option<KubernetesDiscoveryConfig>,
+ replication_mode: ReplicationMode,
replication_factor: usize,
/// The ring
pub ring: watch::Receiver<Arc<Ring>>,
update_ring: Mutex<watch::Sender<Arc<Ring>>>,
- /// The job runner of this node
- pub background: Arc<BackgroundRunner>,
-
/// Path to metadata directory
pub metadata_dir: PathBuf,
}
@@ -136,6 +136,37 @@ pub struct KnownNodeInfo {
pub status: NodeStatus,
}
+#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
+pub struct ClusterHealth {
+ /// The current health status of the cluster (see below)
+ pub status: ClusterHealthStatus,
+ /// Number of nodes already seen once in the cluster
+ pub known_nodes: usize,
+ /// Number of nodes currently connected
+ pub connected_nodes: usize,
+ /// Number of storage nodes declared in the current layout
+ pub storage_nodes: usize,
+ /// Number of storage nodes currently connected
+ pub storage_nodes_ok: usize,
+ /// Number of partitions in the layout
+ pub partitions: usize,
+ /// Number of partitions for which we have a quorum of connected nodes
+ pub partitions_quorum: usize,
+ /// Number of partitions for which all storage nodes are connected
+ pub partitions_all_ok: usize,
+}
+
+#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
+pub enum ClusterHealthStatus {
+ /// All nodes are available
+ Healthy,
+ /// Some storage nodes are unavailable, but quorum is stil
+ /// achieved for all partitions
+ Degraded,
+ /// Quorum is not available for some partitions
+ Unavailable,
+}
+
pub fn read_node_id(metadata_dir: &Path) -> Result<NodeID, Error> {
let mut pubkey_file = metadata_dir.to_path_buf();
pubkey_file.push("node_key.pub");
@@ -199,10 +230,11 @@ impl System {
/// Create this node's membership manager
pub fn new(
network_key: NetworkKey,
- background: Arc<BackgroundRunner>,
- replication_factor: usize,
+ replication_mode: ReplicationMode,
config: &Config,
) -> Result<Arc<Self>, Error> {
+ let replication_factor = replication_mode.replication_factor();
+
let node_key =
gen_node_key(&config.metadata_dir).expect("Unable to read or generate node ID");
info!(
@@ -319,11 +351,11 @@ impl System {
rpc: RpcHelper::new(
netapp.id.into(),
fullmesh,
- background.clone(),
ring.clone(),
config.rpc_timeout_msec.map(Duration::from_millis),
),
system_endpoint,
+ replication_mode,
replication_factor,
rpc_listen_addr: config.rpc_bind_addr,
#[cfg(any(feature = "consul-discovery", feature = "kubernetes-discovery"))]
@@ -336,7 +368,6 @@ impl System {
ring,
update_ring: Mutex::new(update_ring),
- background,
metadata_dir: config.metadata_dir.clone(),
});
sys.system_endpoint.set_handler(sys.clone());
@@ -408,17 +439,14 @@ impl System {
))
})?;
let mut errors = vec![];
- for ip in addrs.iter() {
- match self
- .netapp
- .clone()
- .try_connect(*ip, pubkey)
- .await
- .err_context(CONNECT_ERROR_MESSAGE)
- {
+ for addr in addrs.iter() {
+ match self.netapp.clone().try_connect(*addr, pubkey).await {
Ok(()) => return Ok(()),
Err(e) => {
- errors.push((*ip, e));
+ errors.push((
+ *addr,
+ Error::Message(connect_error_message(*addr, pubkey, e)),
+ ));
}
}
}
@@ -429,59 +457,125 @@ impl System {
}
}
+ pub fn health(&self) -> ClusterHealth {
+ let ring: Arc<_> = self.ring.borrow().clone();
+ let quorum = self.replication_mode.write_quorum();
+ let replication_factor = self.replication_factor;
+
+ let nodes = self
+ .get_known_nodes()
+ .into_iter()
+ .map(|n| (n.id, n))
+ .collect::<HashMap<Uuid, _>>();
+ let connected_nodes = nodes.iter().filter(|(_, n)| n.is_up).count();
+
+ let storage_nodes = ring
+ .layout
+ .roles
+ .items()
+ .iter()
+ .filter(|(_, _, v)| matches!(v, NodeRoleV(Some(r)) if r.capacity.is_some()))
+ .collect::<Vec<_>>();
+ let storage_nodes_ok = storage_nodes
+ .iter()
+ .filter(|(x, _, _)| nodes.get(x).map(|n| n.is_up).unwrap_or(false))
+ .count();
+
+ let partitions = ring.partitions();
+ let partitions_n_up = partitions
+ .iter()
+ .map(|(_, h)| {
+ let pn = ring.get_nodes(h, ring.replication_factor);
+ pn.iter()
+ .filter(|x| nodes.get(x).map(|n| n.is_up).unwrap_or(false))
+ .count()
+ })
+ .collect::<Vec<usize>>();
+ let partitions_all_ok = partitions_n_up
+ .iter()
+ .filter(|c| **c == replication_factor)
+ .count();
+ let partitions_quorum = partitions_n_up.iter().filter(|c| **c >= quorum).count();
+
+ let status =
+ if partitions_quorum == partitions.len() && storage_nodes_ok == storage_nodes.len() {
+ ClusterHealthStatus::Healthy
+ } else if partitions_quorum == partitions.len() {
+ ClusterHealthStatus::Degraded
+ } else {
+ ClusterHealthStatus::Unavailable
+ };
+
+ ClusterHealth {
+ status,
+ known_nodes: nodes.len(),
+ connected_nodes,
+ storage_nodes: storage_nodes.len(),
+ storage_nodes_ok,
+ partitions: partitions.len(),
+ partitions_quorum,
+ partitions_all_ok,
+ }
+ }
+
// ---- INTERNALS ----
#[cfg(feature = "consul-discovery")]
- async fn advertise_to_consul(self: Arc<Self>) -> Result<(), Error> {
+ async fn advertise_to_consul(self: Arc<Self>) {
let c = match &self.consul_discovery {
Some(c) => c,
- _ => return Ok(()),
+ _ => return,
};
let rpc_public_addr = match self.rpc_public_addr {
Some(addr) => addr,
None => {
warn!("Not advertising to Consul because rpc_public_addr is not defined in config file and could not be autodetected.");
- return Ok(());
+ return;
}
};
- c.publish_consul_service(
- self.netapp.id,
- &self.local_status.load_full().hostname,
- rpc_public_addr,
- )
- .await
- .err_context("Error while publishing Consul service")
+ if let Err(e) = c
+ .publish_consul_service(
+ self.netapp.id,
+ &self.local_status.load_full().hostname,
+ rpc_public_addr,
+ )
+ .await
+ {
+ error!("Error while publishing Consul service: {}", e);
+ }
}
#[cfg(feature = "kubernetes-discovery")]
- async fn advertise_to_kubernetes(self: Arc<Self>) -> Result<(), Error> {
+ async fn advertise_to_kubernetes(self: Arc<Self>) {
let k = match &self.kubernetes_discovery {
Some(k) => k,
- _ => return Ok(()),
+ _ => return,
};
let rpc_public_addr = match self.rpc_public_addr {
Some(addr) => addr,
None => {
warn!("Not advertising to Kubernetes because rpc_public_addr is not defined in config file and could not be autodetected.");
- return Ok(());
+ return;
}
};
- publish_kubernetes_node(
+ if let Err(e) = publish_kubernetes_node(
k,
self.netapp.id,
&self.local_status.load_full().hostname,
rpc_public_addr,
)
.await
- .err_context("Error while publishing node to kubernetes")
+ {
+ error!("Error while publishing node to Kubernetes: {}", e);
+ }
}
/// Save network configuration to disc
- async fn save_cluster_layout(self: Arc<Self>) -> Result<(), Error> {
+ async fn save_cluster_layout(&self) -> Result<(), Error> {
let ring: Arc<Ring> = self.ring.borrow().clone();
self.persist_cluster_layout
.save_async(&ring.layout)
@@ -533,11 +627,7 @@ impl System {
if info.cluster_layout_version > local_info.cluster_layout_version
|| info.cluster_layout_staging_hash != local_info.cluster_layout_staging_hash
{
- let self2 = self.clone();
- self.background.spawn_cancellable(async move {
- self2.pull_cluster_layout(from).await;
- Ok(())
- });
+ tokio::spawn(self.clone().pull_cluster_layout(from));
}
self.node_status
@@ -579,18 +669,21 @@ impl System {
drop(update_ring);
let self2 = self.clone();
- self.background.spawn_cancellable(async move {
- self2
+ tokio::spawn(async move {
+ if let Err(e) = self2
.rpc
.broadcast(
&self2.system_endpoint,
SystemRpc::AdvertiseClusterLayout(layout),
RequestStrategy::with_priority(PRIO_HIGH),
)
- .await?;
- Ok(())
+ .await
+ {
+ warn!("Error while broadcasting new cluster layout: {}", e);
+ }
});
- self.background.spawn(self.clone().save_cluster_layout());
+
+ self.save_cluster_layout().await?;
}
Ok(SystemRpc::Ok)
@@ -637,7 +730,7 @@ impl System {
// Add peer list from list stored on disk
if let Ok(peers) = self.persist_peer_list.load_async().await {
- ping_list.extend(peers.iter().map(|(id, addr)| ((*id).into(), *addr)))
+ ping_list.extend(peers.0.iter().map(|(id, addr)| ((*id).into(), *addr)))
}
// Fetch peer list from Consul
@@ -676,12 +769,12 @@ impl System {
}
for (node_id, node_addr) in ping_list {
- tokio::spawn(
- self.netapp
- .clone()
- .try_connect(node_addr, node_id)
- .map(|r| r.err_context(CONNECT_ERROR_MESSAGE)),
- );
+ let self2 = self.clone();
+ tokio::spawn(async move {
+ if let Err(e) = self2.netapp.clone().try_connect(node_addr, node_id).await {
+ error!("{}", connect_error_message(node_addr, node_id, e));
+ }
+ });
}
}
@@ -690,11 +783,10 @@ impl System {
}
#[cfg(feature = "consul-discovery")]
- self.background.spawn(self.clone().advertise_to_consul());
+ tokio::spawn(self.clone().advertise_to_consul());
#[cfg(feature = "kubernetes-discovery")]
- self.background
- .spawn(self.clone().advertise_to_kubernetes());
+ tokio::spawn(self.clone().advertise_to_kubernetes());
let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL);
select! {
@@ -718,12 +810,16 @@ impl System {
// and append it to the list we are about to save,
// so that no peer ID gets lost in the process.
if let Ok(mut prev_peer_list) = self.persist_peer_list.load_async().await {
- prev_peer_list.retain(|(id, _ip)| peer_list.iter().all(|(id2, _ip2)| id2 != id));
- peer_list.extend(prev_peer_list);
+ prev_peer_list
+ .0
+ .retain(|(id, _ip)| peer_list.iter().all(|(id2, _ip2)| id2 != id));
+ peer_list.extend(prev_peer_list.0);
}
// Save new peer list to file
- self.persist_peer_list.save_async(&peer_list).await
+ self.persist_peer_list
+ .save_async(&PeerList(peer_list))
+ .await
}
async fn pull_cluster_layout(self: Arc<Self>, peer: Uuid) {
@@ -784,3 +880,11 @@ async fn resolve_peers(peers: &[String]) -> Vec<(NodeID, SocketAddr)> {
ret
}
+
+fn connect_error_message(
+ addr: SocketAddr,
+ pubkey: ed25519::PublicKey,
+ e: netapp::error::Error,
+) -> String {
+ format!("Error establishing RPC connection to remote node: {}@{}.\nThis can happen if the remote node is not reachable on the network, but also if the two nodes are not configured with the same rpc_secret.\n{}", hex::encode(pubkey), addr, e)
+}