diff options
author | Alex Auvolat <alex@adnab.me> | 2023-04-25 12:34:26 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2023-04-25 12:34:26 +0200 |
commit | fa78d806e3ae40031e80eebb86e4eb1756d7baea (patch) | |
tree | 144662fb430c484093f6f9a585a2441c2ff26494 /src/rpc | |
parent | 654999e254e6c1f46bb5d668bc1230f226575716 (diff) | |
parent | a16eb7e4b8344d2f58c09a249b7b1bd17d339a35 (diff) | |
download | garage-fa78d806e3ae40031e80eebb86e4eb1756d7baea.tar.gz garage-fa78d806e3ae40031e80eebb86e4eb1756d7baea.zip |
Merge branch 'main' into next
Diffstat (limited to 'src/rpc')
-rw-r--r-- | src/rpc/Cargo.toml | 10 | ||||
-rw-r--r-- | src/rpc/consul.rs | 2 | ||||
-rw-r--r-- | src/rpc/lib.rs | 4 | ||||
-rw-r--r-- | src/rpc/rpc_helper.rs | 5 | ||||
-rw-r--r-- | src/rpc/system.rs | 104 | ||||
-rw-r--r-- | src/rpc/system_metrics.rs | 77 |
6 files changed, 177 insertions, 25 deletions
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index 3d4d3ff5..57c157d0 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "garage_rpc" -version = "0.8.1" +version = "0.8.2" authors = ["Alex Auvolat <alex@adnab.me>"] edition = "2018" license = "AGPL-3.0" @@ -14,17 +14,18 @@ path = "lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -garage_util = { version = "0.8.1", path = "../util" } +garage_util = { version = "0.8.2", path = "../util" } arc-swap = "1.0" bytes = "1.0" bytesize = "1.1" gethostname = "0.2" hex = "0.4" -tracing = "0.1.30" +tracing = "0.1" rand = "0.8" itertools="0.10" sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" } +systemstat = "0.2.3" async-trait = "0.1.7" serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } @@ -38,8 +39,7 @@ k8s-openapi = { version = "0.16", features = ["v1_22"], optional = true } schemars = { version = "0.8", optional = true } reqwest = { version = "0.11", optional = true, default-features = false, features = ["rustls-tls-manual-roots", "json"] } -# newer version requires rust edition 2021 -pnet_datalink = "0.28" +pnet_datalink = "0.33" futures = "0.3" futures-util = "0.3" diff --git a/src/rpc/consul.rs b/src/rpc/consul.rs index b1772a1a..f85f789c 100644 --- a/src/rpc/consul.rs +++ b/src/rpc/consul.rs @@ -113,7 +113,7 @@ impl ConsulDiscovery { let pubkey = ent .node_meta .get("pubkey") - .and_then(|k| hex::decode(&k).ok()) + .and_then(|k| hex::decode(k).ok()) .and_then(|k| NodeID::from_slice(&k[..])); if let (Some(ip), Some(pubkey)) = (ip, pubkey) { ret.push((pubkey, SocketAddr::new(ip, ent.service_port))); diff --git a/src/rpc/lib.rs b/src/rpc/lib.rs index f734942d..a5f8fc6e 100644 --- a/src/rpc/lib.rs +++ b/src/rpc/lib.rs @@ -3,6 +3,9 @@ #[macro_use] extern crate tracing; +mod metrics; +mod system_metrics; + #[cfg(feature = "consul-discovery")] mod consul; #[cfg(feature = "kubernetes-discovery")] @@ -14,7 +17,6 @@ pub mod replication_mode; pub mod ring; pub mod system; -mod metrics; pub mod rpc_helper; pub use rpc_helper::*; diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index 1ec250c3..e59c372a 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -15,10 +15,9 @@ use opentelemetry::{ }; pub use netapp::endpoint::{Endpoint, EndpointHandler, StreamingEndpointHandler}; -use netapp::message::IntoReq; pub use netapp::message::{ - Message as Rpc, OrderTag, Req, RequestPriority, Resp, PRIO_BACKGROUND, PRIO_HIGH, PRIO_NORMAL, - PRIO_SECONDARY, + IntoReq, Message as Rpc, OrderTag, Req, RequestPriority, Resp, PRIO_BACKGROUND, PRIO_HIGH, + PRIO_NORMAL, PRIO_SECONDARY, }; use netapp::peering::fullmesh::FullMeshPeeringStrategy; pub use netapp::{self, NetApp, NodeID}; diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 1f4d86e7..c549d8fc 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -3,6 +3,7 @@ use std::collections::HashMap; use std::io::{Read, Write}; use std::net::{IpAddr, SocketAddr}; use std::path::{Path, PathBuf}; +use std::sync::atomic::Ordering; use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; @@ -38,6 +39,8 @@ use crate::replication_mode::*; use crate::ring::*; use crate::rpc_helper::*; +use crate::system_metrics::*; + const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60); const STATUS_EXCHANGE_INTERVAL: Duration = Duration::from_secs(10); @@ -104,6 +107,8 @@ pub struct System { #[cfg(feature = "kubernetes-discovery")] kubernetes_discovery: Option<KubernetesDiscoveryConfig>, + metrics: SystemMetrics, + replication_mode: ReplicationMode, replication_factor: usize, @@ -113,18 +118,28 @@ pub struct System { /// Path to metadata directory pub metadata_dir: PathBuf, + /// Path to data directory + pub data_dir: PathBuf, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct NodeStatus { /// Hostname of the node pub hostname: String, + /// Replication factor configured on the node pub replication_factor: usize, /// Cluster layout version pub cluster_layout_version: u64, /// Hash of cluster layout staging data pub cluster_layout_staging_hash: Hash, + + /// Disk usage on partition containing metadata directory (tuple: `(avail, total)`) + #[serde(default)] + pub meta_disk_avail: Option<(u64, u64)>, + /// Disk usage on partition containing data directory (tuple: `(avail, total)`) + #[serde(default)] + pub data_disk_avail: Option<(u64, u64)>, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -200,7 +215,7 @@ pub fn gen_node_key(metadata_dir: &Path) -> Result<NodeKey, Error> { } else { if !metadata_dir.exists() { info!("Metadata directory does not exist, creating it."); - std::fs::create_dir(&metadata_dir)?; + std::fs::create_dir(metadata_dir)?; } info!("Generating new node key pair."); @@ -266,14 +281,10 @@ impl System { } }; - let local_status = NodeStatus { - hostname: gethostname::gethostname() - .into_string() - .unwrap_or_else(|_| "<invalid utf-8>".to_string()), - replication_factor, - cluster_layout_version: cluster_layout.version, - cluster_layout_staging_hash: cluster_layout.staging_hash, - }; + let metrics = SystemMetrics::new(replication_factor); + + let mut local_status = NodeStatus::initial(replication_factor, &cluster_layout); + local_status.update_disk_usage(&config.metadata_dir, &config.data_dir, &metrics); let ring = Ring::new(cluster_layout, replication_factor); let (update_ring, ring) = watch::channel(Arc::new(ring)); @@ -365,10 +376,12 @@ impl System { consul_discovery, #[cfg(feature = "kubernetes-discovery")] kubernetes_discovery: config.kubernetes_discovery.clone(), + metrics, ring, update_ring: Mutex::new(update_ring), metadata_dir: config.metadata_dir.clone(), + data_dir: config.data_dir.clone(), }); sys.system_endpoint.set_handler(sys.clone()); Ok(sys) @@ -406,12 +419,7 @@ impl System { .get(&n.id.into()) .cloned() .map(|(_, st)| st) - .unwrap_or(NodeStatus { - hostname: "?".to_string(), - replication_factor: 0, - cluster_layout_version: 0, - cluster_layout_staging_hash: Hash::from([0u8; 32]), - }), + .unwrap_or_else(NodeStatus::unknown), }) .collect::<Vec<_>>(); known_nodes @@ -590,6 +598,9 @@ impl System { let ring = self.ring.borrow(); new_si.cluster_layout_version = ring.layout.version; new_si.cluster_layout_staging_hash = ring.layout.staging_hash; + + new_si.update_disk_usage(&self.metadata_dir, &self.data_dir, &self.metrics); + self.local_status.swap(Arc::new(new_si)); } @@ -854,6 +865,69 @@ impl EndpointHandler<SystemRpc> for System { } } +impl NodeStatus { + fn initial(replication_factor: usize, layout: &ClusterLayout) -> Self { + NodeStatus { + hostname: gethostname::gethostname() + .into_string() + .unwrap_or_else(|_| "<invalid utf-8>".to_string()), + replication_factor, + cluster_layout_version: layout.version, + cluster_layout_staging_hash: layout.staging_hash, + meta_disk_avail: None, + data_disk_avail: None, + } + } + + fn unknown() -> Self { + NodeStatus { + hostname: "?".to_string(), + replication_factor: 0, + cluster_layout_version: 0, + cluster_layout_staging_hash: Hash::from([0u8; 32]), + meta_disk_avail: None, + data_disk_avail: None, + } + } + + fn update_disk_usage(&mut self, meta_dir: &Path, data_dir: &Path, metrics: &SystemMetrics) { + use systemstat::{Platform, System}; + let mounts = System::new().mounts().unwrap_or_default(); + + let mount_avail = |path: &Path| { + mounts + .iter() + .filter(|x| path.starts_with(&x.fs_mounted_on)) + .max_by_key(|x| x.fs_mounted_on.len()) + .map(|x| (x.avail.as_u64(), x.total.as_u64())) + }; + + self.meta_disk_avail = mount_avail(meta_dir); + self.data_disk_avail = mount_avail(data_dir); + + if let Some((avail, total)) = self.meta_disk_avail { + metrics + .values + .meta_disk_avail + .store(avail, Ordering::Relaxed); + metrics + .values + .meta_disk_total + .store(total, Ordering::Relaxed); + } + if let Some((avail, total)) = self.data_disk_avail { + metrics + .values + .data_disk_avail + .store(avail, Ordering::Relaxed); + metrics + .values + .data_disk_total + .store(total, Ordering::Relaxed); + } + } +} + fn get_default_ip() -> Option<IpAddr> { pnet_datalink::interfaces() .iter() diff --git a/src/rpc/system_metrics.rs b/src/rpc/system_metrics.rs new file mode 100644 index 00000000..af81b71f --- /dev/null +++ b/src/rpc/system_metrics.rs @@ -0,0 +1,77 @@ +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; + +use opentelemetry::{global, metrics::*, KeyValue}; + +/// TableMetrics reference all counter used for metrics +pub struct SystemMetrics { + pub(crate) _garage_build_info: ValueObserver<u64>, + pub(crate) _replication_factor: ValueObserver<u64>, + pub(crate) _disk_avail: ValueObserver<u64>, + pub(crate) _disk_total: ValueObserver<u64>, + pub(crate) values: Arc<SystemMetricsValues>, +} + +#[derive(Default)] +pub struct SystemMetricsValues { + pub(crate) data_disk_total: AtomicU64, + pub(crate) data_disk_avail: AtomicU64, + pub(crate) meta_disk_total: AtomicU64, + pub(crate) meta_disk_avail: AtomicU64, +} + +impl SystemMetrics { + pub fn new(replication_factor: usize) -> Self { + let meter = global::meter("garage_system"); + let values = Arc::new(SystemMetricsValues::default()); + let values1 = values.clone(); + let values2 = values.clone(); + Self { + _garage_build_info: meter + .u64_value_observer("garage_build_info", move |observer| { + observer.observe( + 1, + &[ + KeyValue::new("rustversion", garage_util::version::rust_version()), + KeyValue::new("version", garage_util::version::garage_version()), + ], + ) + }) + .with_description("Garage build info") + .init(), + _replication_factor: meter + .u64_value_observer("garage_replication_factor", move |observer| { + observer.observe(replication_factor as u64, &[]) + }) + .with_description("Garage replication factor setting") + .init(), + _disk_avail: meter + .u64_value_observer("garage_local_disk_avail", move |observer| { + match values1.data_disk_avail.load(Ordering::Relaxed) { + 0 => (), + x => observer.observe(x, &[KeyValue::new("volume", "data")]), + }; + match values1.meta_disk_avail.load(Ordering::Relaxed) { + 0 => (), + x => observer.observe(x, &[KeyValue::new("volume", "metadata")]), + }; + }) + .with_description("Garage available disk space on each node") + .init(), + _disk_total: meter + .u64_value_observer("garage_local_disk_total", move |observer| { + match values2.data_disk_total.load(Ordering::Relaxed) { + 0 => (), + x => observer.observe(x, &[KeyValue::new("volume", "data")]), + }; + match values2.meta_disk_total.load(Ordering::Relaxed) { + 0 => (), + x => observer.observe(x, &[KeyValue::new("volume", "metadata")]), + }; + }) + .with_description("Garage total disk space on each node") + .init(), + values, + } + } +} |