diff options
Diffstat (limited to 'src/rpc/rpc_helper.rs')
-rw-r--r-- | src/rpc/rpc_helper.rs | 41 |
1 files changed, 37 insertions, 4 deletions
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index 68bdfc4f..0d722e43 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -1,11 +1,12 @@ //! Contain structs related to making RPCs use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, SystemTime}; use futures::future::join_all; use futures::stream::futures_unordered::FuturesUnordered; use futures::stream::StreamExt; use futures_util::future::FutureExt; +use opentelemetry::KeyValue; use tokio::select; use tokio::sync::{watch, Semaphore}; @@ -18,6 +19,7 @@ use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_util::error::Error; +use crate::metrics::RpcMetrics; use crate::ring::Ring; const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10); @@ -76,7 +78,8 @@ struct RpcHelperInner { fullmesh: Arc<FullMeshPeeringStrategy>, background: Arc<BackgroundRunner>, ring: watch::Receiver<Arc<Ring>>, - request_buffer_semaphore: Semaphore, + request_buffer_semaphore: Arc<Semaphore>, + metrics: RpcMetrics, } impl RpcHelper { @@ -86,12 +89,17 @@ impl RpcHelper { background: Arc<BackgroundRunner>, ring: watch::Receiver<Arc<Ring>>, ) -> Self { + let sem = Arc::new(Semaphore::new(REQUEST_BUFFER_SIZE)); + + let metrics = RpcMetrics::new(sem.clone()); + Self(Arc::new(RpcHelperInner { our_node_id, fullmesh, background, ring, - request_buffer_semaphore: Semaphore::new(REQUEST_BUFFER_SIZE), + request_buffer_semaphore: sem, + metrics, })) } @@ -120,6 +128,9 @@ impl RpcHelper { M: Rpc<Response = Result<S, Error>>, H: EndpointHandler<M>, { + let queueing_start_time = SystemTime::now(); + let metric_tags = [KeyValue::new("endpoint", endpoint.path().to_string())]; + let msg_size = rmp_to_vec_all_named(&msg)?.len() as u32; let permit = self .0 @@ -127,14 +138,36 @@ impl RpcHelper { .acquire_many(msg_size) .await?; + self.0.metrics.rpc_queueing_time.record( + queueing_start_time + .elapsed() + .map_or(0.0, |d| d.as_secs_f64()), + &metric_tags, + ); + self.0.metrics.rpc_counter.add(1, &metric_tags); + let rpc_start_time = SystemTime::now(); + let node_id = to.into(); select! { res = endpoint.call(&node_id, &msg, strat.rs_priority) => { drop(permit); - Ok(res??) + + if res.is_err() { + self.0.metrics.rpc_netapp_error_counter.add(1, &metric_tags); + } + let res = res?; + + self.0.metrics.rpc_duration + .record(rpc_start_time.elapsed().map_or(0.0, |d| d.as_secs_f64()), &metric_tags); + if res.is_err() { + self.0.metrics.rpc_garage_error_counter.add(1, &metric_tags); + } + + Ok(res?) } _ = tokio::time::sleep(strat.rs_timeout) => { drop(permit); + self.0.metrics.rpc_timeout_counter.add(1, &metric_tags); Err(Error::Timeout) } } |