aboutsummaryrefslogtreecommitdiff
path: root/src/rpc/rpc_helper.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc/rpc_helper.rs')
-rw-r--r--src/rpc/rpc_helper.rs41
1 files changed, 37 insertions, 4 deletions
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index 68bdfc4f..0d722e43 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -1,11 +1,12 @@
//! Contain structs related to making RPCs
use std::sync::Arc;
-use std::time::Duration;
+use std::time::{Duration, SystemTime};
use futures::future::join_all;
use futures::stream::futures_unordered::FuturesUnordered;
use futures::stream::StreamExt;
use futures_util::future::FutureExt;
+use opentelemetry::KeyValue;
use tokio::select;
use tokio::sync::{watch, Semaphore};
@@ -18,6 +19,7 @@ use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_util::error::Error;
+use crate::metrics::RpcMetrics;
use crate::ring::Ring;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
@@ -76,7 +78,8 @@ struct RpcHelperInner {
fullmesh: Arc<FullMeshPeeringStrategy>,
background: Arc<BackgroundRunner>,
ring: watch::Receiver<Arc<Ring>>,
- request_buffer_semaphore: Semaphore,
+ request_buffer_semaphore: Arc<Semaphore>,
+ metrics: RpcMetrics,
}
impl RpcHelper {
@@ -86,12 +89,17 @@ impl RpcHelper {
background: Arc<BackgroundRunner>,
ring: watch::Receiver<Arc<Ring>>,
) -> Self {
+ let sem = Arc::new(Semaphore::new(REQUEST_BUFFER_SIZE));
+
+ let metrics = RpcMetrics::new(sem.clone());
+
Self(Arc::new(RpcHelperInner {
our_node_id,
fullmesh,
background,
ring,
- request_buffer_semaphore: Semaphore::new(REQUEST_BUFFER_SIZE),
+ request_buffer_semaphore: sem,
+ metrics,
}))
}
@@ -120,6 +128,9 @@ impl RpcHelper {
M: Rpc<Response = Result<S, Error>>,
H: EndpointHandler<M>,
{
+ let queueing_start_time = SystemTime::now();
+ let metric_tags = [KeyValue::new("endpoint", endpoint.path().to_string())];
+
let msg_size = rmp_to_vec_all_named(&msg)?.len() as u32;
let permit = self
.0
@@ -127,14 +138,36 @@ impl RpcHelper {
.acquire_many(msg_size)
.await?;
+ self.0.metrics.rpc_queueing_time.record(
+ queueing_start_time
+ .elapsed()
+ .map_or(0.0, |d| d.as_secs_f64()),
+ &metric_tags,
+ );
+ self.0.metrics.rpc_counter.add(1, &metric_tags);
+ let rpc_start_time = SystemTime::now();
+
let node_id = to.into();
select! {
res = endpoint.call(&node_id, &msg, strat.rs_priority) => {
drop(permit);
- Ok(res??)
+
+ if res.is_err() {
+ self.0.metrics.rpc_netapp_error_counter.add(1, &metric_tags);
+ }
+ let res = res?;
+
+ self.0.metrics.rpc_duration
+ .record(rpc_start_time.elapsed().map_or(0.0, |d| d.as_secs_f64()), &metric_tags);
+ if res.is_err() {
+ self.0.metrics.rpc_garage_error_counter.add(1, &metric_tags);
+ }
+
+ Ok(res?)
}
_ = tokio::time::sleep(strat.rs_timeout) => {
drop(permit);
+ self.0.metrics.rpc_timeout_counter.add(1, &metric_tags);
Err(Error::Timeout)
}
}