aboutsummaryrefslogtreecommitdiff
path: root/src/rpc
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc')
-rw-r--r--src/rpc/Cargo.toml3
-rw-r--r--src/rpc/lib.rs1
-rw-r--r--src/rpc/metrics.rs55
-rw-r--r--src/rpc/rpc_helper.rs41
4 files changed, 95 insertions, 5 deletions
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml
index f06606e5..57b61a08 100644
--- a/src/rpc/Cargo.toml
+++ b/src/rpc/Cargo.toml
@@ -43,8 +43,9 @@ futures = "0.3"
futures-util = "0.3"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
tokio-stream = { version = "0.1", features = ["net"] }
+opentelemetry = "0.17"
#netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" }
-netapp = "0.3.0"
+netapp = "0.3.1"
hyper = { version = "0.14", features = ["client", "http1", "runtime", "tcp"] }
diff --git a/src/rpc/lib.rs b/src/rpc/lib.rs
index 2c877a7f..736513f4 100644
--- a/src/rpc/lib.rs
+++ b/src/rpc/lib.rs
@@ -10,6 +10,7 @@ pub mod layout;
pub mod ring;
pub mod system;
+mod metrics;
pub mod rpc_helper;
pub use rpc_helper::*;
diff --git a/src/rpc/metrics.rs b/src/rpc/metrics.rs
new file mode 100644
index 00000000..c900518c
--- /dev/null
+++ b/src/rpc/metrics.rs
@@ -0,0 +1,55 @@
+use std::sync::Arc;
+
+use opentelemetry::{global, metrics::*};
+use tokio::sync::Semaphore;
+
+/// TableMetrics reference all counter used for metrics
+pub struct RpcMetrics {
+ pub(crate) _rpc_available_permits: ValueObserver<u64>,
+
+ pub(crate) rpc_counter: Counter<u64>,
+ pub(crate) rpc_timeout_counter: Counter<u64>,
+ pub(crate) rpc_netapp_error_counter: Counter<u64>,
+ pub(crate) rpc_garage_error_counter: Counter<u64>,
+
+ pub(crate) rpc_duration: ValueRecorder<f64>,
+ pub(crate) rpc_queueing_time: ValueRecorder<f64>,
+}
+impl RpcMetrics {
+ pub fn new(sem: Arc<Semaphore>) -> Self {
+ let meter = global::meter("garage_rpc");
+ RpcMetrics {
+ _rpc_available_permits: meter
+ .u64_value_observer("rpc.available_permits", move |observer| {
+ observer.observe(sem.available_permits() as u64, &[])
+ })
+ .with_description("Number of available RPC permits")
+ .init(),
+
+ rpc_counter: meter
+ .u64_counter("rpc.request_counter")
+ .with_description("Number of RPC requests emitted")
+ .init(),
+ rpc_timeout_counter: meter
+ .u64_counter("rpc.timeout_counter")
+ .with_description("Number of RPC timeouts")
+ .init(),
+ rpc_netapp_error_counter: meter
+ .u64_counter("rpc.netapp_error_counter")
+ .with_description("Number of communication errors (errors in the Netapp library)")
+ .init(),
+ rpc_garage_error_counter: meter
+ .u64_counter("rpc.garage_error_counter")
+ .with_description("Number of RPC errors (errors happening when handling the RPC)")
+ .init(),
+ rpc_duration: meter
+ .f64_value_recorder("rpc.duration")
+ .with_description("Duration of RPCs")
+ .init(),
+ rpc_queueing_time: meter
+ .f64_value_recorder("rpc.queueing_time")
+ .with_description("Time RPC requests were queued for before being sent")
+ .init(),
+ }
+ }
+}
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index 68bdfc4f..0d722e43 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -1,11 +1,12 @@
//! Contain structs related to making RPCs
use std::sync::Arc;
-use std::time::Duration;
+use std::time::{Duration, SystemTime};
use futures::future::join_all;
use futures::stream::futures_unordered::FuturesUnordered;
use futures::stream::StreamExt;
use futures_util::future::FutureExt;
+use opentelemetry::KeyValue;
use tokio::select;
use tokio::sync::{watch, Semaphore};
@@ -18,6 +19,7 @@ use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_util::error::Error;
+use crate::metrics::RpcMetrics;
use crate::ring::Ring;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
@@ -76,7 +78,8 @@ struct RpcHelperInner {
fullmesh: Arc<FullMeshPeeringStrategy>,
background: Arc<BackgroundRunner>,
ring: watch::Receiver<Arc<Ring>>,
- request_buffer_semaphore: Semaphore,
+ request_buffer_semaphore: Arc<Semaphore>,
+ metrics: RpcMetrics,
}
impl RpcHelper {
@@ -86,12 +89,17 @@ impl RpcHelper {
background: Arc<BackgroundRunner>,
ring: watch::Receiver<Arc<Ring>>,
) -> Self {
+ let sem = Arc::new(Semaphore::new(REQUEST_BUFFER_SIZE));
+
+ let metrics = RpcMetrics::new(sem.clone());
+
Self(Arc::new(RpcHelperInner {
our_node_id,
fullmesh,
background,
ring,
- request_buffer_semaphore: Semaphore::new(REQUEST_BUFFER_SIZE),
+ request_buffer_semaphore: sem,
+ metrics,
}))
}
@@ -120,6 +128,9 @@ impl RpcHelper {
M: Rpc<Response = Result<S, Error>>,
H: EndpointHandler<M>,
{
+ let queueing_start_time = SystemTime::now();
+ let metric_tags = [KeyValue::new("endpoint", endpoint.path().to_string())];
+
let msg_size = rmp_to_vec_all_named(&msg)?.len() as u32;
let permit = self
.0
@@ -127,14 +138,36 @@ impl RpcHelper {
.acquire_many(msg_size)
.await?;
+ self.0.metrics.rpc_queueing_time.record(
+ queueing_start_time
+ .elapsed()
+ .map_or(0.0, |d| d.as_secs_f64()),
+ &metric_tags,
+ );
+ self.0.metrics.rpc_counter.add(1, &metric_tags);
+ let rpc_start_time = SystemTime::now();
+
let node_id = to.into();
select! {
res = endpoint.call(&node_id, &msg, strat.rs_priority) => {
drop(permit);
- Ok(res??)
+
+ if res.is_err() {
+ self.0.metrics.rpc_netapp_error_counter.add(1, &metric_tags);
+ }
+ let res = res?;
+
+ self.0.metrics.rpc_duration
+ .record(rpc_start_time.elapsed().map_or(0.0, |d| d.as_secs_f64()), &metric_tags);
+ if res.is_err() {
+ self.0.metrics.rpc_garage_error_counter.add(1, &metric_tags);
+ }
+
+ Ok(res?)
}
_ = tokio::time::sleep(strat.rs_timeout) => {
drop(permit);
+ self.0.metrics.rpc_timeout_counter.add(1, &metric_tags);
Err(Error::Timeout)
}
}