aboutsummaryrefslogtreecommitdiff
path: root/src/rpc/system.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc/system.rs')
-rw-r--r--src/rpc/system.rs225
1 files changed, 153 insertions, 72 deletions
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 2c6f14fd..e0ced8cc 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -3,6 +3,7 @@ use std::collections::HashMap;
use std::io::{Read, Write};
use std::net::{IpAddr, SocketAddr};
use std::path::{Path, PathBuf};
+use std::sync::atomic::Ordering;
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
@@ -21,7 +22,6 @@ use netapp::peering::fullmesh::FullMeshPeeringStrategy;
use netapp::util::parse_and_resolve_peer_addr_async;
use netapp::{NetApp, NetworkKey, NodeID, NodeKey};
-use garage_util::background::BackgroundRunner;
use garage_util::config::Config;
#[cfg(feature = "kubernetes-discovery")]
use garage_util::config::KubernetesDiscoveryConfig;
@@ -39,6 +39,8 @@ use crate::replication_mode::*;
use crate::ring::*;
use crate::rpc_helper::*;
+use crate::system_metrics::*;
+
const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60);
const STATUS_EXCHANGE_INTERVAL: Duration = Duration::from_secs(10);
@@ -50,8 +52,6 @@ pub const GARAGE_VERSION_TAG: u64 = 0x6761726167650008; // garage 0x0008
/// RPC endpoint used for calls related to membership
pub const SYSTEM_RPC_PATH: &str = "garage_rpc/membership.rs/SystemRpc";
-pub const CONNECT_ERROR_MESSAGE: &str = "Error establishing RPC connection to remote node. This can happen if the remote node is not reachable on the network, but also if the two nodes are not configured with the same rpc_secret";
-
/// RPC messages related to membership
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum SystemRpc {
@@ -76,13 +76,17 @@ impl Rpc for SystemRpc {
type Response = Result<SystemRpc, Error>;
}
+#[derive(Serialize, Deserialize)]
+pub struct PeerList(Vec<(Uuid, SocketAddr)>);
+impl garage_util::migrate::InitialFormat for PeerList {}
+
/// This node's membership manager
pub struct System {
/// The id of this node
pub id: Uuid,
persist_cluster_layout: Persister<ClusterLayout>,
- persist_peer_list: Persister<Vec<(Uuid, SocketAddr)>>,
+ persist_peer_list: Persister<PeerList>,
local_status: ArcSwap<NodeStatus>,
node_status: RwLock<HashMap<Uuid, (u64, NodeStatus)>>,
@@ -103,6 +107,8 @@ pub struct System {
#[cfg(feature = "kubernetes-discovery")]
kubernetes_discovery: Option<KubernetesDiscoveryConfig>,
+ metrics: SystemMetrics,
+
replication_mode: ReplicationMode,
replication_factor: usize,
@@ -110,23 +116,30 @@ pub struct System {
pub ring: watch::Receiver<Arc<Ring>>,
update_ring: Mutex<watch::Sender<Arc<Ring>>>,
- /// The job runner of this node
- pub background: Arc<BackgroundRunner>,
-
/// Path to metadata directory
pub metadata_dir: PathBuf,
+ /// Path to data directory
+ pub data_dir: PathBuf,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeStatus {
/// Hostname of the node
pub hostname: String,
+
/// Replication factor configured on the node
pub replication_factor: usize,
/// Cluster layout version
pub cluster_layout_version: u64,
/// Hash of cluster layout staging data
pub cluster_layout_staging_hash: Hash,
+
+ /// Disk usage on partition containing metadata directory (tuple: `(avail, total)`)
+ #[serde(default)]
+ pub meta_disk_avail: Option<(u64, u64)>,
+ /// Disk usage on partition containing data directory (tuple: `(avail, total)`)
+ #[serde(default)]
+ pub data_disk_avail: Option<(u64, u64)>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -232,7 +245,6 @@ impl System {
/// Create this node's membership manager
pub fn new(
network_key: NetworkKey,
- background: Arc<BackgroundRunner>,
replication_mode: ReplicationMode,
config: &Config,
) -> Result<Arc<Self>, Error> {
@@ -269,14 +281,10 @@ impl System {
}
};
- let local_status = NodeStatus {
- hostname: gethostname::gethostname()
- .into_string()
- .unwrap_or_else(|_| "<invalid utf-8>".to_string()),
- replication_factor,
- cluster_layout_version: cluster_layout.version,
- cluster_layout_staging_hash: cluster_layout.staging_hash,
- };
+ let metrics = SystemMetrics::new(replication_factor);
+
+ let mut local_status = NodeStatus::initial(replication_factor, &cluster_layout);
+ local_status.update_disk_usage(&config.metadata_dir, &config.data_dir, &metrics);
let ring = Ring::new(cluster_layout, replication_factor);
let (update_ring, ring) = watch::channel(Arc::new(ring));
@@ -354,7 +362,6 @@ impl System {
rpc: RpcHelper::new(
netapp.id.into(),
fullmesh,
- background.clone(),
ring.clone(),
config.rpc_timeout_msec.map(Duration::from_millis),
),
@@ -369,11 +376,12 @@ impl System {
consul_discovery,
#[cfg(feature = "kubernetes-discovery")]
kubernetes_discovery: config.kubernetes_discovery.clone(),
+ metrics,
ring,
update_ring: Mutex::new(update_ring),
- background,
metadata_dir: config.metadata_dir.clone(),
+ data_dir: config.data_dir.clone(),
});
sys.system_endpoint.set_handler(sys.clone());
Ok(sys)
@@ -411,12 +419,7 @@ impl System {
.get(&n.id.into())
.cloned()
.map(|(_, st)| st)
- .unwrap_or(NodeStatus {
- hostname: "?".to_string(),
- replication_factor: 0,
- cluster_layout_version: 0,
- cluster_layout_staging_hash: Hash::from([0u8; 32]),
- }),
+ .unwrap_or(NodeStatus::unknown()),
})
.collect::<Vec<_>>();
known_nodes
@@ -444,17 +447,14 @@ impl System {
))
})?;
let mut errors = vec![];
- for ip in addrs.iter() {
- match self
- .netapp
- .clone()
- .try_connect(*ip, pubkey)
- .await
- .err_context(CONNECT_ERROR_MESSAGE)
- {
+ for addr in addrs.iter() {
+ match self.netapp.clone().try_connect(*addr, pubkey).await {
Ok(()) => return Ok(()),
Err(e) => {
- errors.push((*ip, e));
+ errors.push((
+ *addr,
+ Error::Message(connect_error_message(*addr, pubkey, e)),
+ ));
}
}
}
@@ -529,56 +529,61 @@ impl System {
// ---- INTERNALS ----
#[cfg(feature = "consul-discovery")]
- async fn advertise_to_consul(self: Arc<Self>) -> Result<(), Error> {
+ async fn advertise_to_consul(self: Arc<Self>) {
let c = match &self.consul_discovery {
Some(c) => c,
- _ => return Ok(()),
+ _ => return,
};
let rpc_public_addr = match self.rpc_public_addr {
Some(addr) => addr,
None => {
warn!("Not advertising to Consul because rpc_public_addr is not defined in config file and could not be autodetected.");
- return Ok(());
+ return;
}
};
- c.publish_consul_service(
- self.netapp.id,
- &self.local_status.load_full().hostname,
- rpc_public_addr,
- )
- .await
- .err_context("Error while publishing Consul service")
+ if let Err(e) = c
+ .publish_consul_service(
+ self.netapp.id,
+ &self.local_status.load_full().hostname,
+ rpc_public_addr,
+ )
+ .await
+ {
+ error!("Error while publishing Consul service: {}", e);
+ }
}
#[cfg(feature = "kubernetes-discovery")]
- async fn advertise_to_kubernetes(self: Arc<Self>) -> Result<(), Error> {
+ async fn advertise_to_kubernetes(self: Arc<Self>) {
let k = match &self.kubernetes_discovery {
Some(k) => k,
- _ => return Ok(()),
+ _ => return,
};
let rpc_public_addr = match self.rpc_public_addr {
Some(addr) => addr,
None => {
warn!("Not advertising to Kubernetes because rpc_public_addr is not defined in config file and could not be autodetected.");
- return Ok(());
+ return;
}
};
- publish_kubernetes_node(
+ if let Err(e) = publish_kubernetes_node(
k,
self.netapp.id,
&self.local_status.load_full().hostname,
rpc_public_addr,
)
.await
- .err_context("Error while publishing node to kubernetes")
+ {
+ error!("Error while publishing node to Kubernetes: {}", e);
+ }
}
/// Save network configuration to disc
- async fn save_cluster_layout(self: Arc<Self>) -> Result<(), Error> {
+ async fn save_cluster_layout(&self) -> Result<(), Error> {
let ring: Arc<Ring> = self.ring.borrow().clone();
self.persist_cluster_layout
.save_async(&ring.layout)
@@ -593,6 +598,9 @@ impl System {
let ring = self.ring.borrow();
new_si.cluster_layout_version = ring.layout.version;
new_si.cluster_layout_staging_hash = ring.layout.staging_hash;
+
+ new_si.update_disk_usage(&self.metadata_dir, &self.data_dir, &self.metrics);
+
self.local_status.swap(Arc::new(new_si));
}
@@ -630,11 +638,7 @@ impl System {
if info.cluster_layout_version > local_info.cluster_layout_version
|| info.cluster_layout_staging_hash != local_info.cluster_layout_staging_hash
{
- let self2 = self.clone();
- self.background.spawn_cancellable(async move {
- self2.pull_cluster_layout(from).await;
- Ok(())
- });
+ tokio::spawn(self.clone().pull_cluster_layout(from));
}
self.node_status
@@ -676,18 +680,21 @@ impl System {
drop(update_ring);
let self2 = self.clone();
- self.background.spawn_cancellable(async move {
- self2
+ tokio::spawn(async move {
+ if let Err(e) = self2
.rpc
.broadcast(
&self2.system_endpoint,
SystemRpc::AdvertiseClusterLayout(layout),
RequestStrategy::with_priority(PRIO_HIGH),
)
- .await?;
- Ok(())
+ .await
+ {
+ warn!("Error while broadcasting new cluster layout: {}", e);
+ }
});
- self.background.spawn(self.clone().save_cluster_layout());
+
+ self.save_cluster_layout().await?;
}
Ok(SystemRpc::Ok)
@@ -734,7 +741,7 @@ impl System {
// Add peer list from list stored on disk
if let Ok(peers) = self.persist_peer_list.load_async().await {
- ping_list.extend(peers.iter().map(|(id, addr)| ((*id).into(), *addr)))
+ ping_list.extend(peers.0.iter().map(|(id, addr)| ((*id).into(), *addr)))
}
// Fetch peer list from Consul
@@ -773,12 +780,12 @@ impl System {
}
for (node_id, node_addr) in ping_list {
- tokio::spawn(
- self.netapp
- .clone()
- .try_connect(node_addr, node_id)
- .map(|r| r.err_context(CONNECT_ERROR_MESSAGE)),
- );
+ let self2 = self.clone();
+ tokio::spawn(async move {
+ if let Err(e) = self2.netapp.clone().try_connect(node_addr, node_id).await {
+ error!("{}", connect_error_message(node_addr, node_id, e));
+ }
+ });
}
}
@@ -787,11 +794,10 @@ impl System {
}
#[cfg(feature = "consul-discovery")]
- self.background.spawn(self.clone().advertise_to_consul());
+ tokio::spawn(self.clone().advertise_to_consul());
#[cfg(feature = "kubernetes-discovery")]
- self.background
- .spawn(self.clone().advertise_to_kubernetes());
+ tokio::spawn(self.clone().advertise_to_kubernetes());
let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL);
select! {
@@ -815,12 +821,16 @@ impl System {
// and append it to the list we are about to save,
// so that no peer ID gets lost in the process.
if let Ok(mut prev_peer_list) = self.persist_peer_list.load_async().await {
- prev_peer_list.retain(|(id, _ip)| peer_list.iter().all(|(id2, _ip2)| id2 != id));
- peer_list.extend(prev_peer_list);
+ prev_peer_list
+ .0
+ .retain(|(id, _ip)| peer_list.iter().all(|(id2, _ip2)| id2 != id));
+ peer_list.extend(prev_peer_list.0);
}
// Save new peer list to file
- self.persist_peer_list.save_async(&peer_list).await
+ self.persist_peer_list
+ .save_async(&PeerList(peer_list))
+ .await
}
async fn pull_cluster_layout(self: Arc<Self>, peer: Uuid) {
@@ -855,6 +865,69 @@ impl EndpointHandler<SystemRpc> for System {
}
}
+impl NodeStatus {
+ fn initial(replication_factor: usize, layout: &ClusterLayout) -> Self {
+ NodeStatus {
+ hostname: gethostname::gethostname()
+ .into_string()
+ .unwrap_or_else(|_| "<invalid utf-8>".to_string()),
+ replication_factor,
+ cluster_layout_version: layout.version,
+ cluster_layout_staging_hash: layout.staging_hash,
+ meta_disk_avail: None,
+ data_disk_avail: None,
+ }
+ }
+
+ fn unknown() -> Self {
+ NodeStatus {
+ hostname: "?".to_string(),
+ replication_factor: 0,
+ cluster_layout_version: 0,
+ cluster_layout_staging_hash: Hash::from([0u8; 32]),
+ meta_disk_avail: None,
+ data_disk_avail: None,
+ }
+ }
+
+ fn update_disk_usage(&mut self, meta_dir: &Path, data_dir: &Path, metrics: &SystemMetrics) {
+ use systemstat::{Platform, System};
+ let mounts = System::new().mounts().unwrap_or_default();
+
+ let mount_avail = |path: &Path| {
+ mounts
+ .iter()
+ .filter(|x| path.starts_with(&x.fs_mounted_on))
+ .max_by_key(|x| x.fs_mounted_on.len())
+ .map(|x| (x.avail.as_u64(), x.total.as_u64()))
+ };
+
+ self.meta_disk_avail = mount_avail(meta_dir);
+ self.data_disk_avail = mount_avail(data_dir);
+
+ if let Some((avail, total)) = self.meta_disk_avail {
+ metrics
+ .values
+ .meta_disk_avail
+ .store(avail, Ordering::Relaxed);
+ metrics
+ .values
+ .meta_disk_total
+ .store(total, Ordering::Relaxed);
+ }
+ if let Some((avail, total)) = self.data_disk_avail {
+ metrics
+ .values
+ .data_disk_avail
+ .store(avail, Ordering::Relaxed);
+ metrics
+ .values
+ .data_disk_total
+ .store(total, Ordering::Relaxed);
+ }
+ }
+}
+
fn get_default_ip() -> Option<IpAddr> {
pnet_datalink::interfaces()
.iter()
@@ -881,3 +954,11 @@ async fn resolve_peers(peers: &[String]) -> Vec<(NodeID, SocketAddr)> {
ret
}
+
+fn connect_error_message(
+ addr: SocketAddr,
+ pubkey: ed25519::PublicKey,
+ e: netapp::error::Error,
+) -> String {
+ format!("Error establishing RPC connection to remote node: {}@{}.\nThis can happen if the remote node is not reachable on the network, but also if the two nodes are not configured with the same rpc_secret.\n{}", hex::encode(pubkey), addr, e)
+}