aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rpc/system_metrics.rs94
1 files changed, 94 insertions, 0 deletions
diff --git a/src/rpc/system_metrics.rs b/src/rpc/system_metrics.rs
index ad4aca2f..ffbef6df 100644
--- a/src/rpc/system_metrics.rs
+++ b/src/rpc/system_metrics.rs
@@ -3,6 +3,7 @@ use std::time::{Duration, Instant};
use opentelemetry::{global, metrics::*, KeyValue};
+use crate::ring::Ring;
use crate::system::{ClusterHealthStatus, System};
/// TableMetrics reference all counter used for metrics
@@ -25,6 +26,10 @@ pub struct SystemMetrics {
pub(crate) _partitions: ValueObserver<u64>,
pub(crate) _partitions_quorum: ValueObserver<u64>,
pub(crate) _partitions_all_ok: ValueObserver<u64>,
+
+ // Status report for individual cluster nodes
+ pub(crate) _layout_node_connected: ValueObserver<u64>,
+ pub(crate) _layout_node_disconnected_time: ValueObserver<u64>,
}
impl SystemMetrics {
@@ -204,6 +209,95 @@ impl SystemMetrics {
)
.init()
},
+
+ // Status report for individual cluster nodes
+ _layout_node_connected: {
+ let system = system.clone();
+ meter
+ .u64_value_observer("cluster_layout_node_connected", move |observer| {
+ let ring: Arc<Ring> = system.ring.borrow().clone();
+ let nodes = system.get_known_nodes();
+ for (id, _, config) in ring.layout.roles.items().iter() {
+ if let Some(role) = &config.0 {
+ let mut kv = vec![
+ KeyValue::new("id", format!("{:?}", id)),
+ KeyValue::new("role_zone", role.zone.clone()),
+ ];
+ match role.capacity {
+ Some(cap) => {
+ kv.push(KeyValue::new("role_capacity", cap as i64));
+ kv.push(KeyValue::new("role_gateway", 0));
+ }
+ None => {
+ kv.push(KeyValue::new("role_gateway", 1));
+ }
+ }
+
+ let value;
+ if let Some(node) = nodes.iter().find(|n| n.id == *id) {
+ value = if node.is_up { 1 } else { 0 };
+ // TODO: if we add address and hostname, and those change, we
+ // get duplicate metrics, due to bad otel aggregation :(
+ // Can probably be fixed when we upgrade opentelemetry
+ // kv.push(KeyValue::new("address", node.addr.to_string()));
+ // kv.push(KeyValue::new(
+ // "hostname",
+ // node.status.hostname.clone(),
+ // ));
+ } else {
+ value = 0;
+ }
+
+ observer.observe(value, &kv);
+ }
+ }
+ })
+ .with_description("Connection status for nodes in the cluster layout")
+ .init()
+ },
+ _layout_node_disconnected_time: {
+ let system = system.clone();
+ meter
+ .u64_value_observer("cluster_layout_node_disconnected_time", move |observer| {
+ let ring: Arc<Ring> = system.ring.borrow().clone();
+ let nodes = system.get_known_nodes();
+ for (id, _, config) in ring.layout.roles.items().iter() {
+ if let Some(role) = &config.0 {
+ let mut kv = vec![
+ KeyValue::new("id", format!("{:?}", id)),
+ KeyValue::new("role_zone", role.zone.clone()),
+ ];
+ match role.capacity {
+ Some(cap) => {
+ kv.push(KeyValue::new("role_capacity", cap as i64));
+ kv.push(KeyValue::new("role_gateway", 0));
+ }
+ None => {
+ kv.push(KeyValue::new("role_gateway", 1));
+ }
+ }
+
+ if let Some(node) = nodes.iter().find(|n| n.id == *id) {
+ // TODO: see comment above
+ // kv.push(KeyValue::new("address", node.addr.to_string()));
+ // kv.push(KeyValue::new(
+ // "hostname",
+ // node.status.hostname.clone(),
+ // ));
+ if node.is_up {
+ observer.observe(0, &kv);
+ } else if let Some(secs) = node.last_seen_secs_ago {
+ observer.observe(secs, &kv);
+ }
+ }
+ }
+ }
+ })
+ .with_description(
+ "Time (in seconds) since last connection to nodes in the cluster layout",
+ )
+ .init()
+ },
}
}
}