diff options
Diffstat (limited to 'src/rpc')
-rw-r--r-- | src/rpc/Cargo.toml | 3 | ||||
-rw-r--r-- | src/rpc/lib.rs | 1 | ||||
-rw-r--r-- | src/rpc/metrics.rs | 55 | ||||
-rw-r--r-- | src/rpc/rpc_helper.rs | 41 |
4 files changed, 95 insertions, 5 deletions
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index f06606e5..57b61a08 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -43,8 +43,9 @@ futures = "0.3" futures-util = "0.3" tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } tokio-stream = { version = "0.1", features = ["net"] } +opentelemetry = "0.17" #netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" } -netapp = "0.3.0" +netapp = "0.3.1" hyper = { version = "0.14", features = ["client", "http1", "runtime", "tcp"] } diff --git a/src/rpc/lib.rs b/src/rpc/lib.rs index 2c877a7f..736513f4 100644 --- a/src/rpc/lib.rs +++ b/src/rpc/lib.rs @@ -10,6 +10,7 @@ pub mod layout; pub mod ring; pub mod system; +mod metrics; pub mod rpc_helper; pub use rpc_helper::*; diff --git a/src/rpc/metrics.rs b/src/rpc/metrics.rs new file mode 100644 index 00000000..c900518c --- /dev/null +++ b/src/rpc/metrics.rs @@ -0,0 +1,55 @@ +use std::sync::Arc; + +use opentelemetry::{global, metrics::*}; +use tokio::sync::Semaphore; + +/// TableMetrics reference all counter used for metrics +pub struct RpcMetrics { + pub(crate) _rpc_available_permits: ValueObserver<u64>, + + pub(crate) rpc_counter: Counter<u64>, + pub(crate) rpc_timeout_counter: Counter<u64>, + pub(crate) rpc_netapp_error_counter: Counter<u64>, + pub(crate) rpc_garage_error_counter: Counter<u64>, + + pub(crate) rpc_duration: ValueRecorder<f64>, + pub(crate) rpc_queueing_time: ValueRecorder<f64>, +} +impl RpcMetrics { + pub fn new(sem: Arc<Semaphore>) -> Self { + let meter = global::meter("garage_rpc"); + RpcMetrics { + _rpc_available_permits: meter + .u64_value_observer("rpc.available_permits", move |observer| { + observer.observe(sem.available_permits() as u64, &[]) + }) + .with_description("Number of available RPC permits") + .init(), + + rpc_counter: meter + .u64_counter("rpc.request_counter") + .with_description("Number of RPC requests emitted") + .init(), + rpc_timeout_counter: meter + .u64_counter("rpc.timeout_counter") + .with_description("Number of RPC timeouts") + .init(), + rpc_netapp_error_counter: meter + .u64_counter("rpc.netapp_error_counter") + .with_description("Number of communication errors (errors in the Netapp library)") + .init(), + rpc_garage_error_counter: meter + .u64_counter("rpc.garage_error_counter") + .with_description("Number of RPC errors (errors happening when handling the RPC)") + .init(), + rpc_duration: meter + .f64_value_recorder("rpc.duration") + .with_description("Duration of RPCs") + .init(), + rpc_queueing_time: meter + .f64_value_recorder("rpc.queueing_time") + .with_description("Time RPC requests were queued for before being sent") + .init(), + } + } +} diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index 68bdfc4f..0d722e43 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -1,11 +1,12 @@ //! Contain structs related to making RPCs use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, SystemTime}; use futures::future::join_all; use futures::stream::futures_unordered::FuturesUnordered; use futures::stream::StreamExt; use futures_util::future::FutureExt; +use opentelemetry::KeyValue; use tokio::select; use tokio::sync::{watch, Semaphore}; @@ -18,6 +19,7 @@ use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_util::error::Error; +use crate::metrics::RpcMetrics; use crate::ring::Ring; const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10); @@ -76,7 +78,8 @@ struct RpcHelperInner { fullmesh: Arc<FullMeshPeeringStrategy>, background: Arc<BackgroundRunner>, ring: watch::Receiver<Arc<Ring>>, - request_buffer_semaphore: Semaphore, + request_buffer_semaphore: Arc<Semaphore>, + metrics: RpcMetrics, } impl RpcHelper { @@ -86,12 +89,17 @@ impl RpcHelper { background: Arc<BackgroundRunner>, ring: watch::Receiver<Arc<Ring>>, ) -> Self { + let sem = Arc::new(Semaphore::new(REQUEST_BUFFER_SIZE)); + + let metrics = RpcMetrics::new(sem.clone()); + Self(Arc::new(RpcHelperInner { our_node_id, fullmesh, background, ring, - request_buffer_semaphore: Semaphore::new(REQUEST_BUFFER_SIZE), + request_buffer_semaphore: sem, + metrics, })) } @@ -120,6 +128,9 @@ impl RpcHelper { M: Rpc<Response = Result<S, Error>>, H: EndpointHandler<M>, { + let queueing_start_time = SystemTime::now(); + let metric_tags = [KeyValue::new("endpoint", endpoint.path().to_string())]; + let msg_size = rmp_to_vec_all_named(&msg)?.len() as u32; let permit = self .0 @@ -127,14 +138,36 @@ impl RpcHelper { .acquire_many(msg_size) .await?; + self.0.metrics.rpc_queueing_time.record( + queueing_start_time + .elapsed() + .map_or(0.0, |d| d.as_secs_f64()), + &metric_tags, + ); + self.0.metrics.rpc_counter.add(1, &metric_tags); + let rpc_start_time = SystemTime::now(); + let node_id = to.into(); select! { res = endpoint.call(&node_id, &msg, strat.rs_priority) => { drop(permit); - Ok(res??) + + if res.is_err() { + self.0.metrics.rpc_netapp_error_counter.add(1, &metric_tags); + } + let res = res?; + + self.0.metrics.rpc_duration + .record(rpc_start_time.elapsed().map_or(0.0, |d| d.as_secs_f64()), &metric_tags); + if res.is_err() { + self.0.metrics.rpc_garage_error_counter.add(1, &metric_tags); + } + + Ok(res?) } _ = tokio::time::sleep(strat.rs_timeout) => { drop(permit); + self.0.metrics.rpc_timeout_counter.add(1, &metric_tags); Err(Error::Timeout) } } |