aboutsummaryrefslogtreecommitdiff
path: root/src/rpc
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc')
-rw-r--r--src/rpc/Cargo.toml8
-rw-r--r--src/rpc/layout.rs13
-rw-r--r--src/rpc/lib.rs4
-rw-r--r--src/rpc/rpc_helper.rs23
-rw-r--r--src/rpc/system.rs225
-rw-r--r--src/rpc/system_metrics.rs77
6 files changed, 252 insertions, 98 deletions
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml
index 2c2ddc0b..87ae15ac 100644
--- a/src/rpc/Cargo.toml
+++ b/src/rpc/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "garage_rpc"
-version = "0.8.0"
+version = "0.8.1"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
@@ -14,18 +14,18 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-garage_util = { version = "0.8.0", path = "../util" }
+garage_util = { version = "0.8.1", path = "../util" }
arc-swap = "1.0"
bytes = "1.0"
gethostname = "0.2"
hex = "0.4"
-tracing = "0.1.30"
+tracing = "0.1"
rand = "0.8"
sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" }
+systemstat = "0.2.3"
async-trait = "0.1.7"
-rmp-serde = "0.15"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_bytes = "0.11"
serde_json = "1.0"
diff --git a/src/rpc/layout.rs b/src/rpc/layout.rs
index 2fd5acfc..1030e3a6 100644
--- a/src/rpc/layout.rs
+++ b/src/rpc/layout.rs
@@ -5,6 +5,7 @@ use serde::{Deserialize, Serialize};
use garage_util::crdt::{AutoCrdt, Crdt, LwwMap};
use garage_util::data::*;
+use garage_util::encode::nonversioned_encode;
use garage_util::error::*;
use crate::ring::*;
@@ -35,6 +36,8 @@ pub struct ClusterLayout {
pub staging_hash: Hash,
}
+impl garage_util::migrate::InitialFormat for ClusterLayout {}
+
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct NodeRoleV(pub Option<NodeRole>);
@@ -68,7 +71,7 @@ impl NodeRole {
impl ClusterLayout {
pub fn new(replication_factor: usize) -> Self {
let empty_lwwmap = LwwMap::new();
- let empty_lwwmap_hash = blake2sum(&rmp_to_vec_all_named(&empty_lwwmap).unwrap()[..]);
+ let empty_lwwmap_hash = blake2sum(&nonversioned_encode(&empty_lwwmap).unwrap()[..]);
ClusterLayout {
version: 0,
@@ -90,7 +93,7 @@ impl ClusterLayout {
Ordering::Equal => {
self.staging.merge(&other.staging);
- let new_staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]);
+ let new_staging_hash = blake2sum(&nonversioned_encode(&self.staging).unwrap()[..]);
let changed = new_staging_hash != self.staging_hash;
self.staging_hash = new_staging_hash;
@@ -125,7 +128,7 @@ To know the correct value of the new layout version, invoke `garage layout show`
}
self.staging.clear();
- self.staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]);
+ self.staging_hash = blake2sum(&nonversioned_encode(&self.staging).unwrap()[..]);
self.version += 1;
@@ -149,7 +152,7 @@ To know the correct value of the new layout version, invoke `garage layout show`
}
self.staging.clear();
- self.staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]);
+ self.staging_hash = blake2sum(&nonversioned_encode(&self.staging).unwrap()[..]);
self.version += 1;
@@ -178,7 +181,7 @@ To know the correct value of the new layout version, invoke `garage layout show`
/// returns true if consistent, false if error
pub fn check(&self) -> bool {
// Check that the hash of the staging data is correct
- let staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]);
+ let staging_hash = blake2sum(&nonversioned_encode(&self.staging).unwrap()[..]);
if staging_hash != self.staging_hash {
return false;
}
diff --git a/src/rpc/lib.rs b/src/rpc/lib.rs
index 86f63568..5aec92c0 100644
--- a/src/rpc/lib.rs
+++ b/src/rpc/lib.rs
@@ -3,6 +3,9 @@
#[macro_use]
extern crate tracing;
+mod metrics;
+mod system_metrics;
+
#[cfg(feature = "consul-discovery")]
mod consul;
#[cfg(feature = "kubernetes-discovery")]
@@ -13,7 +16,6 @@ pub mod replication_mode;
pub mod ring;
pub mod system;
-mod metrics;
pub mod rpc_helper;
pub use rpc_helper::*;
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index 949aced6..e59c372a 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -5,7 +5,6 @@ use std::time::Duration;
use futures::future::join_all;
use futures::stream::futures_unordered::FuturesUnordered;
use futures::stream::StreamExt;
-use futures_util::future::FutureExt;
use tokio::select;
use tokio::sync::watch;
@@ -16,15 +15,13 @@ use opentelemetry::{
};
pub use netapp::endpoint::{Endpoint, EndpointHandler, StreamingEndpointHandler};
-use netapp::message::IntoReq;
pub use netapp::message::{
- Message as Rpc, OrderTag, Req, RequestPriority, Resp, PRIO_BACKGROUND, PRIO_HIGH, PRIO_NORMAL,
- PRIO_SECONDARY,
+ IntoReq, Message as Rpc, OrderTag, Req, RequestPriority, Resp, PRIO_BACKGROUND, PRIO_HIGH,
+ PRIO_NORMAL, PRIO_SECONDARY,
};
use netapp::peering::fullmesh::FullMeshPeeringStrategy;
pub use netapp::{self, NetApp, NodeID};
-use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_util::error::Error;
use garage_util::metrics::RecordDuration;
@@ -94,7 +91,6 @@ pub struct RpcHelper(Arc<RpcHelperInner>);
struct RpcHelperInner {
our_node_id: Uuid,
fullmesh: Arc<FullMeshPeeringStrategy>,
- background: Arc<BackgroundRunner>,
ring: watch::Receiver<Arc<Ring>>,
metrics: RpcMetrics,
rpc_timeout: Duration,
@@ -104,7 +100,6 @@ impl RpcHelper {
pub(crate) fn new(
our_node_id: Uuid,
fullmesh: Arc<FullMeshPeeringStrategy>,
- background: Arc<BackgroundRunner>,
ring: watch::Receiver<Arc<Ring>>,
rpc_timeout: Option<Duration>,
) -> Self {
@@ -113,7 +108,6 @@ impl RpcHelper {
Self(Arc::new(RpcHelperInner {
our_node_id,
fullmesh,
- background,
ring,
metrics,
rpc_timeout: rpc_timeout.unwrap_or(DEFAULT_TIMEOUT),
@@ -377,16 +371,13 @@ impl RpcHelper {
if !resp_stream.is_empty() {
// Continue remaining requests in background.
- // Continue the remaining requests immediately using tokio::spawn
- // but enqueue a task in the background runner
- // to ensure that the process won't exit until the requests are done
- // (if we had just enqueued the resp_stream.collect directly in the background runner,
- // the requests might have been put on hold in the background runner's queue,
- // in which case they might timeout or otherwise fail)
- let wait_finished_fut = tokio::spawn(async move {
+ // Note: these requests can get interrupted on process shutdown,
+ // we must not count on them being executed for certain.
+ // For all background things that have to happen with certainty,
+ // they have to be put in a proper queue that is persisted to disk.
+ tokio::spawn(async move {
resp_stream.collect::<Vec<Result<_, _>>>().await;
});
- self.0.background.spawn(wait_finished_fut.map(|_| Ok(())));
}
}
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 2c6f14fd..e0ced8cc 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -3,6 +3,7 @@ use std::collections::HashMap;
use std::io::{Read, Write};
use std::net::{IpAddr, SocketAddr};
use std::path::{Path, PathBuf};
+use std::sync::atomic::Ordering;
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
@@ -21,7 +22,6 @@ use netapp::peering::fullmesh::FullMeshPeeringStrategy;
use netapp::util::parse_and_resolve_peer_addr_async;
use netapp::{NetApp, NetworkKey, NodeID, NodeKey};
-use garage_util::background::BackgroundRunner;
use garage_util::config::Config;
#[cfg(feature = "kubernetes-discovery")]
use garage_util::config::KubernetesDiscoveryConfig;
@@ -39,6 +39,8 @@ use crate::replication_mode::*;
use crate::ring::*;
use crate::rpc_helper::*;
+use crate::system_metrics::*;
+
const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60);
const STATUS_EXCHANGE_INTERVAL: Duration = Duration::from_secs(10);
@@ -50,8 +52,6 @@ pub const GARAGE_VERSION_TAG: u64 = 0x6761726167650008; // garage 0x0008
/// RPC endpoint used for calls related to membership
pub const SYSTEM_RPC_PATH: &str = "garage_rpc/membership.rs/SystemRpc";
-pub const CONNECT_ERROR_MESSAGE: &str = "Error establishing RPC connection to remote node. This can happen if the remote node is not reachable on the network, but also if the two nodes are not configured with the same rpc_secret";
-
/// RPC messages related to membership
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum SystemRpc {
@@ -76,13 +76,17 @@ impl Rpc for SystemRpc {
type Response = Result<SystemRpc, Error>;
}
+#[derive(Serialize, Deserialize)]
+pub struct PeerList(Vec<(Uuid, SocketAddr)>);
+impl garage_util::migrate::InitialFormat for PeerList {}
+
/// This node's membership manager
pub struct System {
/// The id of this node
pub id: Uuid,
persist_cluster_layout: Persister<ClusterLayout>,
- persist_peer_list: Persister<Vec<(Uuid, SocketAddr)>>,
+ persist_peer_list: Persister<PeerList>,
local_status: ArcSwap<NodeStatus>,
node_status: RwLock<HashMap<Uuid, (u64, NodeStatus)>>,
@@ -103,6 +107,8 @@ pub struct System {
#[cfg(feature = "kubernetes-discovery")]
kubernetes_discovery: Option<KubernetesDiscoveryConfig>,
+ metrics: SystemMetrics,
+
replication_mode: ReplicationMode,
replication_factor: usize,
@@ -110,23 +116,30 @@ pub struct System {
pub ring: watch::Receiver<Arc<Ring>>,
update_ring: Mutex<watch::Sender<Arc<Ring>>>,
- /// The job runner of this node
- pub background: Arc<BackgroundRunner>,
-
/// Path to metadata directory
pub metadata_dir: PathBuf,
+ /// Path to data directory
+ pub data_dir: PathBuf,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeStatus {
/// Hostname of the node
pub hostname: String,
+
/// Replication factor configured on the node
pub replication_factor: usize,
/// Cluster layout version
pub cluster_layout_version: u64,
/// Hash of cluster layout staging data
pub cluster_layout_staging_hash: Hash,
+
+ /// Disk usage on partition containing metadata directory (tuple: `(avail, total)`)
+ #[serde(default)]
+ pub meta_disk_avail: Option<(u64, u64)>,
+ /// Disk usage on partition containing data directory (tuple: `(avail, total)`)
+ #[serde(default)]
+ pub data_disk_avail: Option<(u64, u64)>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -232,7 +245,6 @@ impl System {
/// Create this node's membership manager
pub fn new(
network_key: NetworkKey,
- background: Arc<BackgroundRunner>,
replication_mode: ReplicationMode,
config: &Config,
) -> Result<Arc<Self>, Error> {
@@ -269,14 +281,10 @@ impl System {
}
};
- let local_status = NodeStatus {
- hostname: gethostname::gethostname()
- .into_string()
- .unwrap_or_else(|_| "<invalid utf-8>".to_string()),
- replication_factor,
- cluster_layout_version: cluster_layout.version,
- cluster_layout_staging_hash: cluster_layout.staging_hash,
- };
+ let metrics = SystemMetrics::new(replication_factor);
+
+ let mut local_status = NodeStatus::initial(replication_factor, &cluster_layout);
+ local_status.update_disk_usage(&config.metadata_dir, &config.data_dir, &metrics);
let ring = Ring::new(cluster_layout, replication_factor);
let (update_ring, ring) = watch::channel(Arc::new(ring));
@@ -354,7 +362,6 @@ impl System {
rpc: RpcHelper::new(
netapp.id.into(),
fullmesh,
- background.clone(),
ring.clone(),
config.rpc_timeout_msec.map(Duration::from_millis),
),
@@ -369,11 +376,12 @@ impl System {
consul_discovery,
#[cfg(feature = "kubernetes-discovery")]
kubernetes_discovery: config.kubernetes_discovery.clone(),
+ metrics,
ring,
update_ring: Mutex::new(update_ring),
- background,
metadata_dir: config.metadata_dir.clone(),
+ data_dir: config.data_dir.clone(),
});
sys.system_endpoint.set_handler(sys.clone());
Ok(sys)
@@ -411,12 +419,7 @@ impl System {
.get(&n.id.into())
.cloned()
.map(|(_, st)| st)
- .unwrap_or(NodeStatus {
- hostname: "?".to_string(),
- replication_factor: 0,
- cluster_layout_version: 0,
- cluster_layout_staging_hash: Hash::from([0u8; 32]),
- }),
+ .unwrap_or(NodeStatus::unknown()),
})
.collect::<Vec<_>>();
known_nodes
@@ -444,17 +447,14 @@ impl System {
))
})?;
let mut errors = vec![];
- for ip in addrs.iter() {
- match self
- .netapp
- .clone()
- .try_connect(*ip, pubkey)
- .await
- .err_context(CONNECT_ERROR_MESSAGE)
- {
+ for addr in addrs.iter() {
+ match self.netapp.clone().try_connect(*addr, pubkey).await {
Ok(()) => return Ok(()),
Err(e) => {
- errors.push((*ip, e));
+ errors.push((
+ *addr,
+ Error::Message(connect_error_message(*addr, pubkey, e)),
+ ));
}
}
}
@@ -529,56 +529,61 @@ impl System {
// ---- INTERNALS ----
#[cfg(feature = "consul-discovery")]
- async fn advertise_to_consul(self: Arc<Self>) -> Result<(), Error> {
+ async fn advertise_to_consul(self: Arc<Self>) {
let c = match &self.consul_discovery {
Some(c) => c,
- _ => return Ok(()),
+ _ => return,
};
let rpc_public_addr = match self.rpc_public_addr {
Some(addr) => addr,
None => {
warn!("Not advertising to Consul because rpc_public_addr is not defined in config file and could not be autodetected.");
- return Ok(());
+ return;
}
};
- c.publish_consul_service(
- self.netapp.id,
- &self.local_status.load_full().hostname,
- rpc_public_addr,
- )
- .await
- .err_context("Error while publishing Consul service")
+ if let Err(e) = c
+ .publish_consul_service(
+ self.netapp.id,
+ &self.local_status.load_full().hostname,
+ rpc_public_addr,
+ )
+ .await
+ {
+ error!("Error while publishing Consul service: {}", e);
+ }
}
#[cfg(feature = "kubernetes-discovery")]
- async fn advertise_to_kubernetes(self: Arc<Self>) -> Result<(), Error> {
+ async fn advertise_to_kubernetes(self: Arc<Self>) {
let k = match &self.kubernetes_discovery {
Some(k) => k,
- _ => return Ok(()),
+ _ => return,
};
let rpc_public_addr = match self.rpc_public_addr {
Some(addr) => addr,
None => {
warn!("Not advertising to Kubernetes because rpc_public_addr is not defined in config file and could not be autodetected.");
- return Ok(());
+ return;
}
};
- publish_kubernetes_node(
+ if let Err(e) = publish_kubernetes_node(
k,
self.netapp.id,
&self.local_status.load_full().hostname,
rpc_public_addr,
)
.await
- .err_context("Error while publishing node to kubernetes")
+ {
+ error!("Error while publishing node to Kubernetes: {}", e);
+ }
}
/// Save network configuration to disc
- async fn save_cluster_layout(self: Arc<Self>) -> Result<(), Error> {
+ async fn save_cluster_layout(&self) -> Result<(), Error> {
let ring: Arc<Ring> = self.ring.borrow().clone();
self.persist_cluster_layout
.save_async(&ring.layout)
@@ -593,6 +598,9 @@ impl System {
let ring = self.ring.borrow();
new_si.cluster_layout_version = ring.layout.version;
new_si.cluster_layout_staging_hash = ring.layout.staging_hash;
+
+ new_si.update_disk_usage(&self.metadata_dir, &self.data_dir, &self.metrics);
+
self.local_status.swap(Arc::new(new_si));
}
@@ -630,11 +638,7 @@ impl System {
if info.cluster_layout_version > local_info.cluster_layout_version
|| info.cluster_layout_staging_hash != local_info.cluster_layout_staging_hash
{
- let self2 = self.clone();
- self.background.spawn_cancellable(async move {
- self2.pull_cluster_layout(from).await;
- Ok(())
- });
+ tokio::spawn(self.clone().pull_cluster_layout(from));
}
self.node_status
@@ -676,18 +680,21 @@ impl System {
drop(update_ring);
let self2 = self.clone();
- self.background.spawn_cancellable(async move {
- self2
+ tokio::spawn(async move {
+ if let Err(e) = self2
.rpc
.broadcast(
&self2.system_endpoint,
SystemRpc::AdvertiseClusterLayout(layout),
RequestStrategy::with_priority(PRIO_HIGH),
)
- .await?;
- Ok(())
+ .await
+ {
+ warn!("Error while broadcasting new cluster layout: {}", e);
+ }
});
- self.background.spawn(self.clone().save_cluster_layout());
+
+ self.save_cluster_layout().await?;
}
Ok(SystemRpc::Ok)
@@ -734,7 +741,7 @@ impl System {
// Add peer list from list stored on disk
if let Ok(peers) = self.persist_peer_list.load_async().await {
- ping_list.extend(peers.iter().map(|(id, addr)| ((*id).into(), *addr)))
+ ping_list.extend(peers.0.iter().map(|(id, addr)| ((*id).into(), *addr)))
}
// Fetch peer list from Consul
@@ -773,12 +780,12 @@ impl System {
}
for (node_id, node_addr) in ping_list {
- tokio::spawn(
- self.netapp
- .clone()
- .try_connect(node_addr, node_id)
- .map(|r| r.err_context(CONNECT_ERROR_MESSAGE)),
- );
+ let self2 = self.clone();
+ tokio::spawn(async move {
+ if let Err(e) = self2.netapp.clone().try_connect(node_addr, node_id).await {
+ error!("{}", connect_error_message(node_addr, node_id, e));
+ }
+ });
}
}
@@ -787,11 +794,10 @@ impl System {
}
#[cfg(feature = "consul-discovery")]
- self.background.spawn(self.clone().advertise_to_consul());
+ tokio::spawn(self.clone().advertise_to_consul());
#[cfg(feature = "kubernetes-discovery")]
- self.background
- .spawn(self.clone().advertise_to_kubernetes());
+ tokio::spawn(self.clone().advertise_to_kubernetes());
let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL);
select! {
@@ -815,12 +821,16 @@ impl System {
// and append it to the list we are about to save,
// so that no peer ID gets lost in the process.
if let Ok(mut prev_peer_list) = self.persist_peer_list.load_async().await {
- prev_peer_list.retain(|(id, _ip)| peer_list.iter().all(|(id2, _ip2)| id2 != id));
- peer_list.extend(prev_peer_list);
+ prev_peer_list
+ .0
+ .retain(|(id, _ip)| peer_list.iter().all(|(id2, _ip2)| id2 != id));
+ peer_list.extend(prev_peer_list.0);
}
// Save new peer list to file
- self.persist_peer_list.save_async(&peer_list).await
+ self.persist_peer_list
+ .save_async(&PeerList(peer_list))
+ .await
}
async fn pull_cluster_layout(self: Arc<Self>, peer: Uuid) {
@@ -855,6 +865,69 @@ impl EndpointHandler<SystemRpc> for System {
}
}
+impl NodeStatus {
+ fn initial(replication_factor: usize, layout: &ClusterLayout) -> Self {
+ NodeStatus {
+ hostname: gethostname::gethostname()
+ .into_string()
+ .unwrap_or_else(|_| "<invalid utf-8>".to_string()),
+ replication_factor,
+ cluster_layout_version: layout.version,
+ cluster_layout_staging_hash: layout.staging_hash,
+ meta_disk_avail: None,
+ data_disk_avail: None,
+ }
+ }
+
+ fn unknown() -> Self {
+ NodeStatus {
+ hostname: "?".to_string(),
+ replication_factor: 0,
+ cluster_layout_version: 0,
+ cluster_layout_staging_hash: Hash::from([0u8; 32]),
+ meta_disk_avail: None,
+ data_disk_avail: None,
+ }
+ }
+
+ fn update_disk_usage(&mut self, meta_dir: &Path, data_dir: &Path, metrics: &SystemMetrics) {
+ use systemstat::{Platform, System};
+ let mounts = System::new().mounts().unwrap_or_default();
+
+ let mount_avail = |path: &Path| {
+ mounts
+ .iter()
+ .filter(|x| path.starts_with(&x.fs_mounted_on))
+ .max_by_key(|x| x.fs_mounted_on.len())
+ .map(|x| (x.avail.as_u64(), x.total.as_u64()))
+ };
+
+ self.meta_disk_avail = mount_avail(meta_dir);
+ self.data_disk_avail = mount_avail(data_dir);
+
+ if let Some((avail, total)) = self.meta_disk_avail {
+ metrics
+ .values
+ .meta_disk_avail
+ .store(avail, Ordering::Relaxed);
+ metrics
+ .values
+ .meta_disk_total
+ .store(total, Ordering::Relaxed);
+ }
+ if let Some((avail, total)) = self.data_disk_avail {
+ metrics
+ .values
+ .data_disk_avail
+ .store(avail, Ordering::Relaxed);
+ metrics
+ .values
+ .data_disk_total
+ .store(total, Ordering::Relaxed);
+ }
+ }
+}
+
fn get_default_ip() -> Option<IpAddr> {
pnet_datalink::interfaces()
.iter()
@@ -881,3 +954,11 @@ async fn resolve_peers(peers: &[String]) -> Vec<(NodeID, SocketAddr)> {
ret
}
+
+fn connect_error_message(
+ addr: SocketAddr,
+ pubkey: ed25519::PublicKey,
+ e: netapp::error::Error,
+) -> String {
+ format!("Error establishing RPC connection to remote node: {}@{}.\nThis can happen if the remote node is not reachable on the network, but also if the two nodes are not configured with the same rpc_secret.\n{}", hex::encode(pubkey), addr, e)
+}
diff --git a/src/rpc/system_metrics.rs b/src/rpc/system_metrics.rs
new file mode 100644
index 00000000..83f5fa97
--- /dev/null
+++ b/src/rpc/system_metrics.rs
@@ -0,0 +1,77 @@
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::sync::Arc;
+
+use opentelemetry::{global, metrics::*, KeyValue};
+
+/// TableMetrics reference all counter used for metrics
+pub struct SystemMetrics {
+ pub(crate) _garage_build_info: ValueObserver<u64>,
+ pub(crate) _replication_factor: ValueObserver<u64>,
+ pub(crate) _disk_avail: ValueObserver<u64>,
+ pub(crate) _disk_total: ValueObserver<u64>,
+ pub(crate) values: Arc<SystemMetricsValues>,
+}
+
+#[derive(Default)]
+pub struct SystemMetricsValues {
+ pub(crate) data_disk_total: AtomicU64,
+ pub(crate) data_disk_avail: AtomicU64,
+ pub(crate) meta_disk_total: AtomicU64,
+ pub(crate) meta_disk_avail: AtomicU64,
+}
+
+impl SystemMetrics {
+ pub fn new(replication_factor: usize) -> Self {
+ let meter = global::meter("garage_system");
+ let values = Arc::new(SystemMetricsValues::default());
+ let values1 = values.clone();
+ let values2 = values.clone();
+ Self {
+ _garage_build_info: meter
+ .u64_value_observer("garage_build_info", move |observer| {
+ observer.observe(
+ 1,
+ &[KeyValue::new(
+ "version",
+ garage_util::version::garage_version(),
+ )],
+ )
+ })
+ .with_description("Garage build info")
+ .init(),
+ _replication_factor: meter
+ .u64_value_observer("garage_replication_factor", move |observer| {
+ observer.observe(replication_factor as u64, &[])
+ })
+ .with_description("Garage replication factor setting")
+ .init(),
+ _disk_avail: meter
+ .u64_value_observer("garage_local_disk_avail", move |observer| {
+ match values1.data_disk_avail.load(Ordering::Relaxed) {
+ 0 => (),
+ x => observer.observe(x, &[KeyValue::new("volume", "data")]),
+ };
+ match values1.meta_disk_avail.load(Ordering::Relaxed) {
+ 0 => (),
+ x => observer.observe(x, &[KeyValue::new("volume", "metadata")]),
+ };
+ })
+ .with_description("Garage available disk space on each node")
+ .init(),
+ _disk_total: meter
+ .u64_value_observer("garage_local_disk_total", move |observer| {
+ match values2.data_disk_total.load(Ordering::Relaxed) {
+ 0 => (),
+ x => observer.observe(x, &[KeyValue::new("volume", "data")]),
+ };
+ match values2.meta_disk_total.load(Ordering::Relaxed) {
+ 0 => (),
+ x => observer.observe(x, &[KeyValue::new("volume", "metadata")]),
+ };
+ })
+ .with_description("Garage total disk space on each node")
+ .init(),
+ values,
+ }
+ }
+}