diff options
Diffstat (limited to 'src/rpc')
-rw-r--r-- | src/rpc/Cargo.toml | 1 | ||||
-rw-r--r-- | src/rpc/system.rs | 73 |
2 files changed, 60 insertions, 14 deletions
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index e9a0929a..2daa0176 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -23,6 +23,7 @@ hex = "0.4" tracing = "0.1.30" rand = "0.8" sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" } +systemstat = "0.2.3" async-trait = "0.1.7" serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 90f6a4c2..a9e91e19 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -118,18 +118,28 @@ pub struct System { /// Path to metadata directory pub metadata_dir: PathBuf, + /// Path to data directory + pub data_dir: PathBuf, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct NodeStatus { /// Hostname of the node pub hostname: String, + /// Replication factor configured on the node pub replication_factor: usize, /// Cluster layout version pub cluster_layout_version: u64, /// Hash of cluster layout staging data pub cluster_layout_staging_hash: Hash, + + /// Disk usage on partition containing metadata directory (tuple: `(avail, total)`) + #[serde(default)] + pub meta_disk_avail: Option<(u64, u64)>, + /// Disk usage on partition containing data directory (tuple: `(avail, total)`) + #[serde(default)] + pub data_disk_avail: Option<(u64, u64)>, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -271,14 +281,8 @@ impl System { } }; - let local_status = NodeStatus { - hostname: gethostname::gethostname() - .into_string() - .unwrap_or_else(|_| "<invalid utf-8>".to_string()), - replication_factor, - cluster_layout_version: cluster_layout.version, - cluster_layout_staging_hash: cluster_layout.staging_hash, - }; + let mut local_status = NodeStatus::initial(replication_factor, &cluster_layout); + local_status.update_disk_usage(&config.metadata_dir, &config.data_dir); #[cfg(feature = "metrics")] let metrics = SystemMetrics::new(replication_factor); @@ -379,6 +383,7 @@ impl System { ring, update_ring: Mutex::new(update_ring), metadata_dir: config.metadata_dir.clone(), + data_dir: config.data_dir.clone(), }); sys.system_endpoint.set_handler(sys.clone()); Ok(sys) @@ -416,12 +421,7 @@ impl System { .get(&n.id.into()) .cloned() .map(|(_, st)| st) - .unwrap_or(NodeStatus { - hostname: "?".to_string(), - replication_factor: 0, - cluster_layout_version: 0, - cluster_layout_staging_hash: Hash::from([0u8; 32]), - }), + .unwrap_or(NodeStatus::unknown()), }) .collect::<Vec<_>>(); known_nodes @@ -600,6 +600,9 @@ impl System { let ring = self.ring.borrow(); new_si.cluster_layout_version = ring.layout.version; new_si.cluster_layout_staging_hash = ring.layout.staging_hash; + + new_si.update_disk_usage(&self.metadata_dir, &self.data_dir); + self.local_status.swap(Arc::new(new_si)); } @@ -864,6 +867,48 @@ impl EndpointHandler<SystemRpc> for System { } } +impl NodeStatus { + fn initial(replication_factor: usize, layout: &ClusterLayout) -> Self { + NodeStatus { + hostname: gethostname::gethostname() + .into_string() + .unwrap_or_else(|_| "<invalid utf-8>".to_string()), + replication_factor, + cluster_layout_version: layout.version, + cluster_layout_staging_hash: layout.staging_hash, + meta_disk_avail: None, + data_disk_avail: None, + } + } + + fn unknown() -> Self { + NodeStatus { + hostname: "?".to_string(), + replication_factor: 0, + cluster_layout_version: 0, + cluster_layout_staging_hash: Hash::from([0u8; 32]), + meta_disk_avail: None, + data_disk_avail: None, + } + } + + fn update_disk_usage(&mut self, meta_dir: &Path, data_dir: &Path) { + use systemstat::{Platform, System}; + let mounts = System::new().mounts().unwrap_or_default(); + + let mount_avail = |path: &Path| { + mounts + .iter() + .filter(|x| path.starts_with(&x.fs_mounted_on)) + .max_by_key(|x| x.fs_mounted_on.len()) + .map(|x| (x.avail.as_u64(), x.total.as_u64())) + }; + + self.meta_disk_avail = mount_avail(meta_dir); + self.data_disk_avail = mount_avail(data_dir); + } +} + fn get_default_ip() -> Option<IpAddr> { pnet_datalink::interfaces() .iter() |