aboutsummaryrefslogtreecommitdiff
path: root/src/rpc/system.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc/system.rs')
-rw-r--r-- src/rpc/system.rs 83
1 file changed, 55 insertions, 28 deletions
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index b42e49fc..4b40bec4 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -9,10 +9,10 @@ use std::time::{Duration, Instant};
use arc_swap::ArcSwap;
use async_trait::async_trait;
-use futures::{join, select};
-use futures_util::future::*;
+use futures::join;
use serde::{Deserialize, Serialize};
use sodiumoxide::crypto::sign::ed25519;
+use tokio::select;
use tokio::sync::watch;
use tokio::sync::Mutex;
@@ -22,9 +22,9 @@ use netapp::peering::fullmesh::FullMeshPeeringStrategy;
use netapp::util::parse_and_resolve_peer_addr_async;
use netapp::{NetApp, NetworkKey, NodeID, NodeKey};
-use garage_util::config::Config;
#[cfg(feature = "kubernetes-discovery")]
use garage_util::config::KubernetesDiscoveryConfig;
+use garage_util::config::{Config, DataDirEnum};
use garage_util::data::*;
use garage_util::error::*;
use garage_util::persister::Persister;
@@ -119,7 +119,7 @@ pub struct System {
/// Path to metadata directory
pub metadata_dir: PathBuf,
/// Path to data directory
- pub data_dir: PathBuf,
+ pub data_dir: DataDirEnum,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -151,7 +151,7 @@ pub struct KnownNodeInfo {
pub status: NodeStatus,
}
-#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
+#[derive(Debug, Clone, Copy)]
pub struct ClusterHealth {
/// The current health status of the cluster (see below)
pub status: ClusterHealthStatus,
@@ -171,7 +171,7 @@ pub struct ClusterHealth {
pub partitions_all_ok: usize,
}
-#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
+#[derive(Debug, Clone, Copy)]
pub enum ClusterHealthStatus {
/// All nodes are available
Healthy,
@@ -666,9 +666,9 @@ impl System {
let update_ring = self.update_ring.lock().await;
let mut layout: ClusterLayout = self.ring.borrow().layout.clone();
- let prev_layout_check = layout.check();
+ let prev_layout_check = layout.check().is_ok();
if layout.merge(adv) {
- if prev_layout_check && !layout.check() {
+ if prev_layout_check && layout.check().is_err() {
error!("New cluster layout is invalid, discarding.");
return Err(Error::Message(
"New cluster layout is invalid, discarding.".into(),
@@ -702,7 +702,7 @@ impl System {
async fn status_exchange_loop(&self, mut stop_signal: watch::Receiver<bool>) {
while !*stop_signal.borrow() {
- let restart_at = tokio::time::sleep(STATUS_EXCHANGE_INTERVAL);
+ let restart_at = Instant::now() + STATUS_EXCHANGE_INTERVAL;
self.update_local_status();
let local_status: NodeStatus = self.local_status.load().as_ref().clone();
@@ -711,20 +711,21 @@ impl System {
.broadcast(
&self.system_endpoint,
SystemRpc::AdvertiseStatus(local_status),
- RequestStrategy::with_priority(PRIO_HIGH),
+ RequestStrategy::with_priority(PRIO_HIGH)
+ .with_custom_timeout(STATUS_EXCHANGE_INTERVAL),
)
.await;
select! {
- _ = restart_at.fuse() => {},
- _ = stop_signal.changed().fuse() => {},
+ _ = tokio::time::sleep_until(restart_at.into()) => {},
+ _ = stop_signal.changed() => {},
}
}
}
async fn discovery_loop(self: &Arc<Self>, mut stop_signal: watch::Receiver<bool>) {
while !*stop_signal.borrow() {
- let not_configured = !self.ring.borrow().layout.check();
+ let not_configured = self.ring.borrow().layout.check().is_err();
let no_peers = self.fullmesh.get_peer_list().len() < self.replication_factor;
let expected_n_nodes = self.ring.borrow().layout.num_nodes();
let bad_peers = self
@@ -799,10 +800,9 @@ impl System {
#[cfg(feature = "kubernetes-discovery")]
tokio::spawn(self.clone().advertise_to_kubernetes());
- let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL);
select! {
- _ = restart_at.fuse() => {},
- _ = stop_signal.changed().fuse() => {},
+ _ = tokio::time::sleep(DISCOVERY_INTERVAL) => {},
+ _ = stop_signal.changed() => {},
}
}
}
@@ -890,20 +890,47 @@ impl NodeStatus {
}
}
- fn update_disk_usage(&mut self, meta_dir: &Path, data_dir: &Path, metrics: &SystemMetrics) {
- use systemstat::{Platform, System};
- let mounts = System::new().mounts().unwrap_or_default();
-
- let mount_avail = |path: &Path| {
- mounts
- .iter()
- .filter(|x| path.starts_with(&x.fs_mounted_on))
- .max_by_key(|x| x.fs_mounted_on.len())
- .map(|x| (x.avail.as_u64(), x.total.as_u64()))
+ fn update_disk_usage(
+ &mut self,
+ meta_dir: &Path,
+ data_dir: &DataDirEnum,
+ metrics: &SystemMetrics,
+ ) {
+ use nix::sys::statvfs::statvfs;
+ let mount_avail = |path: &Path| match statvfs(path) {
+ Ok(x) => {
+ let avail = x.blocks_available() as u64 * x.fragment_size() as u64;
+ let total = x.blocks() as u64 * x.fragment_size() as u64;
+ Some((x.filesystem_id(), avail, total))
+ }
+ Err(_) => None,
};
- self.meta_disk_avail = mount_avail(meta_dir);
- self.data_disk_avail = mount_avail(data_dir);
+ self.meta_disk_avail = mount_avail(meta_dir).map(|(_, a, t)| (a, t));
+ self.data_disk_avail = match data_dir {
+ DataDirEnum::Single(dir) => mount_avail(dir).map(|(_, a, t)| (a, t)),
+ DataDirEnum::Multiple(dirs) => (|| {
+ // TODO: more precise calculation that takes into account
+ // how data is going to be spread among partitions
+ let mut mounts = HashMap::new();
+ for dir in dirs.iter() {
+ if dir.capacity.is_none() {
+ continue;
+ }
+ match mount_avail(&dir.path) {
+ Some((fsid, avail, total)) => {
+ mounts.insert(fsid, (avail, total));
+ }
+ None => return None,
+ }
+ }
+ Some(
+ mounts
+ .into_iter()
+ .fold((0, 0), |(x, y), (_, (a, b))| (x + a, y + b)),
+ )
+ })(),
+ };
if let Some((avail, total)) = self.meta_disk_avail {
metrics