aboutsummaryrefslogtreecommitdiff
path: root/src/rpc
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2023-04-25 12:34:26 +0200
committerAlex Auvolat <alex@adnab.me>2023-04-25 12:34:26 +0200
commitfa78d806e3ae40031e80eebb86e4eb1756d7baea (patch)
tree144662fb430c484093f6f9a585a2441c2ff26494 /src/rpc
parent654999e254e6c1f46bb5d668bc1230f226575716 (diff)
parenta16eb7e4b8344d2f58c09a249b7b1bd17d339a35 (diff)
downloadgarage-fa78d806e3ae40031e80eebb86e4eb1756d7baea.tar.gz
garage-fa78d806e3ae40031e80eebb86e4eb1756d7baea.zip
Merge branch 'main' into next
Diffstat (limited to 'src/rpc')
-rw-r--r--src/rpc/Cargo.toml10
-rw-r--r--src/rpc/consul.rs2
-rw-r--r--src/rpc/lib.rs4
-rw-r--r--src/rpc/rpc_helper.rs5
-rw-r--r--src/rpc/system.rs104
-rw-r--r--src/rpc/system_metrics.rs77
6 files changed, 177 insertions, 25 deletions
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml
index 3d4d3ff5..57c157d0 100644
--- a/src/rpc/Cargo.toml
+++ b/src/rpc/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "garage_rpc"
-version = "0.8.1"
+version = "0.8.2"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
@@ -14,17 +14,18 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-garage_util = { version = "0.8.1", path = "../util" }
+garage_util = { version = "0.8.2", path = "../util" }
arc-swap = "1.0"
bytes = "1.0"
bytesize = "1.1"
gethostname = "0.2"
hex = "0.4"
-tracing = "0.1.30"
+tracing = "0.1"
rand = "0.8"
itertools="0.10"
sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" }
+systemstat = "0.2.3"
async-trait = "0.1.7"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
@@ -38,8 +39,7 @@ k8s-openapi = { version = "0.16", features = ["v1_22"], optional = true }
schemars = { version = "0.8", optional = true }
reqwest = { version = "0.11", optional = true, default-features = false, features = ["rustls-tls-manual-roots", "json"] }
-# newer version requires rust edition 2021
-pnet_datalink = "0.28"
+pnet_datalink = "0.33"
futures = "0.3"
futures-util = "0.3"
diff --git a/src/rpc/consul.rs b/src/rpc/consul.rs
index b1772a1a..f85f789c 100644
--- a/src/rpc/consul.rs
+++ b/src/rpc/consul.rs
@@ -113,7 +113,7 @@ impl ConsulDiscovery {
let pubkey = ent
.node_meta
.get("pubkey")
- .and_then(|k| hex::decode(&k).ok())
+ .and_then(|k| hex::decode(k).ok())
.and_then(|k| NodeID::from_slice(&k[..]));
if let (Some(ip), Some(pubkey)) = (ip, pubkey) {
ret.push((pubkey, SocketAddr::new(ip, ent.service_port)));
diff --git a/src/rpc/lib.rs b/src/rpc/lib.rs
index f734942d..a5f8fc6e 100644
--- a/src/rpc/lib.rs
+++ b/src/rpc/lib.rs
@@ -3,6 +3,9 @@
#[macro_use]
extern crate tracing;
+mod metrics;
+mod system_metrics;
+
#[cfg(feature = "consul-discovery")]
mod consul;
#[cfg(feature = "kubernetes-discovery")]
@@ -14,7 +17,6 @@ pub mod replication_mode;
pub mod ring;
pub mod system;
-mod metrics;
pub mod rpc_helper;
pub use rpc_helper::*;
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index 1ec250c3..e59c372a 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -15,10 +15,9 @@ use opentelemetry::{
};
pub use netapp::endpoint::{Endpoint, EndpointHandler, StreamingEndpointHandler};
-use netapp::message::IntoReq;
pub use netapp::message::{
- Message as Rpc, OrderTag, Req, RequestPriority, Resp, PRIO_BACKGROUND, PRIO_HIGH, PRIO_NORMAL,
- PRIO_SECONDARY,
+ IntoReq, Message as Rpc, OrderTag, Req, RequestPriority, Resp, PRIO_BACKGROUND, PRIO_HIGH,
+ PRIO_NORMAL, PRIO_SECONDARY,
};
use netapp::peering::fullmesh::FullMeshPeeringStrategy;
pub use netapp::{self, NetApp, NodeID};
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 1f4d86e7..c549d8fc 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -3,6 +3,7 @@ use std::collections::HashMap;
use std::io::{Read, Write};
use std::net::{IpAddr, SocketAddr};
use std::path::{Path, PathBuf};
+use std::sync::atomic::Ordering;
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
@@ -38,6 +39,8 @@ use crate::replication_mode::*;
use crate::ring::*;
use crate::rpc_helper::*;
+use crate::system_metrics::*;
+
const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60);
const STATUS_EXCHANGE_INTERVAL: Duration = Duration::from_secs(10);
@@ -104,6 +107,8 @@ pub struct System {
#[cfg(feature = "kubernetes-discovery")]
kubernetes_discovery: Option<KubernetesDiscoveryConfig>,
+ metrics: SystemMetrics,
+
replication_mode: ReplicationMode,
replication_factor: usize,
@@ -113,18 +118,28 @@ pub struct System {
/// Path to metadata directory
pub metadata_dir: PathBuf,
+ /// Path to data directory
+ pub data_dir: PathBuf,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeStatus {
/// Hostname of the node
pub hostname: String,
+
/// Replication factor configured on the node
pub replication_factor: usize,
/// Cluster layout version
pub cluster_layout_version: u64,
/// Hash of cluster layout staging data
pub cluster_layout_staging_hash: Hash,
+
+ /// Disk usage on partition containing metadata directory (tuple: `(avail, total)`)
+ #[serde(default)]
+ pub meta_disk_avail: Option<(u64, u64)>,
+ /// Disk usage on partition containing data directory (tuple: `(avail, total)`)
+ #[serde(default)]
+ pub data_disk_avail: Option<(u64, u64)>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -200,7 +215,7 @@ pub fn gen_node_key(metadata_dir: &Path) -> Result<NodeKey, Error> {
} else {
if !metadata_dir.exists() {
info!("Metadata directory does not exist, creating it.");
- std::fs::create_dir(&metadata_dir)?;
+ std::fs::create_dir(metadata_dir)?;
}
info!("Generating new node key pair.");
@@ -266,14 +281,10 @@ impl System {
}
};
- let local_status = NodeStatus {
- hostname: gethostname::gethostname()
- .into_string()
- .unwrap_or_else(|_| "<invalid utf-8>".to_string()),
- replication_factor,
- cluster_layout_version: cluster_layout.version,
- cluster_layout_staging_hash: cluster_layout.staging_hash,
- };
+ let metrics = SystemMetrics::new(replication_factor);
+
+ let mut local_status = NodeStatus::initial(replication_factor, &cluster_layout);
+ local_status.update_disk_usage(&config.metadata_dir, &config.data_dir, &metrics);
let ring = Ring::new(cluster_layout, replication_factor);
let (update_ring, ring) = watch::channel(Arc::new(ring));
@@ -365,10 +376,12 @@ impl System {
consul_discovery,
#[cfg(feature = "kubernetes-discovery")]
kubernetes_discovery: config.kubernetes_discovery.clone(),
+ metrics,
ring,
update_ring: Mutex::new(update_ring),
metadata_dir: config.metadata_dir.clone(),
+ data_dir: config.data_dir.clone(),
});
sys.system_endpoint.set_handler(sys.clone());
Ok(sys)
@@ -406,12 +419,7 @@ impl System {
.get(&n.id.into())
.cloned()
.map(|(_, st)| st)
- .unwrap_or(NodeStatus {
- hostname: "?".to_string(),
- replication_factor: 0,
- cluster_layout_version: 0,
- cluster_layout_staging_hash: Hash::from([0u8; 32]),
- }),
+ .unwrap_or_else(NodeStatus::unknown),
})
.collect::<Vec<_>>();
known_nodes
@@ -590,6 +598,9 @@ impl System {
let ring = self.ring.borrow();
new_si.cluster_layout_version = ring.layout.version;
new_si.cluster_layout_staging_hash = ring.layout.staging_hash;
+
+ new_si.update_disk_usage(&self.metadata_dir, &self.data_dir, &self.metrics);
+
self.local_status.swap(Arc::new(new_si));
}
@@ -854,6 +865,69 @@ impl EndpointHandler<SystemRpc> for System {
}
}
+impl NodeStatus {
+ fn initial(replication_factor: usize, layout: &ClusterLayout) -> Self {
+ NodeStatus {
+ hostname: gethostname::gethostname()
+ .into_string()
+ .unwrap_or_else(|_| "<invalid utf-8>".to_string()),
+ replication_factor,
+ cluster_layout_version: layout.version,
+ cluster_layout_staging_hash: layout.staging_hash,
+ meta_disk_avail: None,
+ data_disk_avail: None,
+ }
+ }
+
+ fn unknown() -> Self {
+ NodeStatus {
+ hostname: "?".to_string(),
+ replication_factor: 0,
+ cluster_layout_version: 0,
+ cluster_layout_staging_hash: Hash::from([0u8; 32]),
+ meta_disk_avail: None,
+ data_disk_avail: None,
+ }
+ }
+
+ fn update_disk_usage(&mut self, meta_dir: &Path, data_dir: &Path, metrics: &SystemMetrics) {
+ use systemstat::{Platform, System};
+ let mounts = System::new().mounts().unwrap_or_default();
+
+ let mount_avail = |path: &Path| {
+ mounts
+ .iter()
+ .filter(|x| path.starts_with(&x.fs_mounted_on))
+ .max_by_key(|x| x.fs_mounted_on.len())
+ .map(|x| (x.avail.as_u64(), x.total.as_u64()))
+ };
+
+ self.meta_disk_avail = mount_avail(meta_dir);
+ self.data_disk_avail = mount_avail(data_dir);
+
+ if let Some((avail, total)) = self.meta_disk_avail {
+ metrics
+ .values
+ .meta_disk_avail
+ .store(avail, Ordering::Relaxed);
+ metrics
+ .values
+ .meta_disk_total
+ .store(total, Ordering::Relaxed);
+ }
+ if let Some((avail, total)) = self.data_disk_avail {
+ metrics
+ .values
+ .data_disk_avail
+ .store(avail, Ordering::Relaxed);
+ metrics
+ .values
+ .data_disk_total
+ .store(total, Ordering::Relaxed);
+ }
+ }
+}
+
fn get_default_ip() -> Option<IpAddr> {
pnet_datalink::interfaces()
.iter()
diff --git a/src/rpc/system_metrics.rs b/src/rpc/system_metrics.rs
new file mode 100644
index 00000000..af81b71f
--- /dev/null
+++ b/src/rpc/system_metrics.rs
@@ -0,0 +1,77 @@
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::sync::Arc;
+
+use opentelemetry::{global, metrics::*, KeyValue};
+
+/// TableMetrics reference all counter used for metrics
+pub struct SystemMetrics {
+ pub(crate) _garage_build_info: ValueObserver<u64>,
+ pub(crate) _replication_factor: ValueObserver<u64>,
+ pub(crate) _disk_avail: ValueObserver<u64>,
+ pub(crate) _disk_total: ValueObserver<u64>,
+ pub(crate) values: Arc<SystemMetricsValues>,
+}
+
+#[derive(Default)]
+pub struct SystemMetricsValues {
+ pub(crate) data_disk_total: AtomicU64,
+ pub(crate) data_disk_avail: AtomicU64,
+ pub(crate) meta_disk_total: AtomicU64,
+ pub(crate) meta_disk_avail: AtomicU64,
+}
+
+impl SystemMetrics {
+ pub fn new(replication_factor: usize) -> Self {
+ let meter = global::meter("garage_system");
+ let values = Arc::new(SystemMetricsValues::default());
+ let values1 = values.clone();
+ let values2 = values.clone();
+ Self {
+ _garage_build_info: meter
+ .u64_value_observer("garage_build_info", move |observer| {
+ observer.observe(
+ 1,
+ &[
+ KeyValue::new("rustversion", garage_util::version::rust_version()),
+ KeyValue::new("version", garage_util::version::garage_version()),
+ ],
+ )
+ })
+ .with_description("Garage build info")
+ .init(),
+ _replication_factor: meter
+ .u64_value_observer("garage_replication_factor", move |observer| {
+ observer.observe(replication_factor as u64, &[])
+ })
+ .with_description("Garage replication factor setting")
+ .init(),
+ _disk_avail: meter
+ .u64_value_observer("garage_local_disk_avail", move |observer| {
+ match values1.data_disk_avail.load(Ordering::Relaxed) {
+ 0 => (),
+ x => observer.observe(x, &[KeyValue::new("volume", "data")]),
+ };
+ match values1.meta_disk_avail.load(Ordering::Relaxed) {
+ 0 => (),
+ x => observer.observe(x, &[KeyValue::new("volume", "metadata")]),
+ };
+ })
+ .with_description("Garage available disk space on each node")
+ .init(),
+ _disk_total: meter
+ .u64_value_observer("garage_local_disk_total", move |observer| {
+ match values2.data_disk_total.load(Ordering::Relaxed) {
+ 0 => (),
+ x => observer.observe(x, &[KeyValue::new("volume", "data")]),
+ };
+ match values2.meta_disk_total.load(Ordering::Relaxed) {
+ 0 => (),
+ x => observer.observe(x, &[KeyValue::new("volume", "metadata")]),
+ };
+ })
+ .with_description("Garage total disk space on each node")
+ .init(),
+ values,
+ }
+ }
+}