From 56592e18538b379ccaaa7b7c1990a599ac83b191 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 19 Sep 2022 20:12:19 +0200 Subject: RPC performance changes - configurable ping timeout - single, much higher, configurable RPC timeout - no more concurrency semaphore --- src/rpc/Cargo.toml | 2 +- src/rpc/metrics.rs | 19 +---------------- src/rpc/rpc_helper.rs | 57 +++++++++++++++++++++++++-------------------------- src/rpc/system.rs | 16 +++++++++++---- 4 files changed, 42 insertions(+), 52 deletions(-) (limited to 'src/rpc') diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index e51f1f73..d61acea4 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -45,7 +45,7 @@ tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi tokio-stream = { version = "0.1", features = ["net"] } opentelemetry = "0.17" -netapp = { version = "0.5.1", features = ["telemetry"] } +netapp = { version = "0.5.2", features = ["telemetry"] } hyper = { version = "0.14", features = ["client", "http1", "runtime", "tcp"] } diff --git a/src/rpc/metrics.rs b/src/rpc/metrics.rs index c900518c..61f8fa79 100644 --- a/src/rpc/metrics.rs +++ b/src/rpc/metrics.rs @@ -1,31 +1,18 @@ -use std::sync::Arc; - use opentelemetry::{global, metrics::*}; -use tokio::sync::Semaphore; /// TableMetrics reference all counter used for metrics pub struct RpcMetrics { - pub(crate) _rpc_available_permits: ValueObserver, - pub(crate) rpc_counter: Counter, pub(crate) rpc_timeout_counter: Counter, pub(crate) rpc_netapp_error_counter: Counter, pub(crate) rpc_garage_error_counter: Counter, pub(crate) rpc_duration: ValueRecorder, - pub(crate) rpc_queueing_time: ValueRecorder, } impl RpcMetrics { - pub fn new(sem: Arc) -> Self { + pub fn new() -> Self { let meter = global::meter("garage_rpc"); RpcMetrics { - _rpc_available_permits: meter - .u64_value_observer("rpc.available_permits", move |observer| { - observer.observe(sem.available_permits() as u64, &[]) - }) - .with_description("Number of available RPC permits") - .init(), - rpc_counter: meter .u64_counter("rpc.request_counter") .with_description("Number of RPC requests emitted") @@ -46,10 +33,6 @@ impl RpcMetrics { .f64_value_recorder("rpc.duration") .with_description("Duration of RPCs") .init(), - rpc_queueing_time: meter - .f64_value_recorder("rpc.queueing_time") - .with_description("Time RPC requests were queued for before being sent") - .init(), } } } diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index 19abb4c5..857ed620 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -7,7 +7,7 @@ use futures::stream::futures_unordered::FuturesUnordered; use futures::stream::StreamExt; use futures_util::future::FutureExt; use tokio::select; -use tokio::sync::{watch, Semaphore}; +use tokio::sync::watch; use opentelemetry::KeyValue; use opentelemetry::{ @@ -32,32 +32,30 @@ use garage_util::metrics::RecordDuration; use crate::metrics::RpcMetrics; use crate::ring::Ring; -const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30); - -// Don't allow more than 100 concurrent outgoing RPCs. -const MAX_CONCURRENT_REQUESTS: usize = 100; +// Default RPC timeout = 5 minutes +const DEFAULT_TIMEOUT: Duration = Duration::from_secs(300); /// Strategy to apply when making RPC #[derive(Copy, Clone)] pub struct RequestStrategy { - /// Max time to wait for reponse - pub rs_timeout: Duration, /// Min number of response to consider the request successful pub rs_quorum: Option, /// Should requests be dropped after enough response are received pub rs_interrupt_after_quorum: bool, /// Request priority pub rs_priority: RequestPriority, + /// Deactivate timeout for this request + pub rs_no_timeout: bool, } impl RequestStrategy { /// Create a RequestStrategy with default timeout and not interrupting when quorum reached pub fn with_priority(prio: RequestPriority) -> Self { RequestStrategy { - rs_timeout: DEFAULT_TIMEOUT, rs_quorum: None, rs_interrupt_after_quorum: false, rs_priority: prio, + rs_no_timeout: false, } } /// Set quorum to be reached for request @@ -65,17 +63,17 @@ impl RequestStrategy { self.rs_quorum = Some(quorum); self } - /// Set timeout of the strategy - pub fn with_timeout(mut self, timeout: Duration) -> Self { - self.rs_timeout = timeout; - self - } /// Set if requests can be dropped after quorum has been reached /// In general true for read requests, and false for write pub fn interrupt_after_quorum(mut self, interrupt: bool) -> Self { self.rs_interrupt_after_quorum = interrupt; self } + /// Deactivate timeout for this request + pub fn without_timeout(mut self) -> Self { + self.rs_no_timeout = true; + self + } } #[derive(Clone)] @@ -86,8 +84,8 @@ struct RpcHelperInner { fullmesh: Arc, background: Arc, ring: watch::Receiver>, - request_buffer_semaphore: Arc, metrics: RpcMetrics, + rpc_timeout: Duration, } impl RpcHelper { @@ -96,21 +94,24 @@ impl RpcHelper { fullmesh: Arc, background: Arc, ring: watch::Receiver>, + rpc_timeout: Option, ) -> Self { - let sem = Arc::new(Semaphore::new(MAX_CONCURRENT_REQUESTS)); - - let metrics = RpcMetrics::new(sem.clone()); + let metrics = RpcMetrics::new(); Self(Arc::new(RpcHelperInner { our_node_id, fullmesh, background, ring, - request_buffer_semaphore: sem, metrics, + rpc_timeout: rpc_timeout.unwrap_or(DEFAULT_TIMEOUT), })) } + pub fn rpc_timeout(&self) -> Duration { + self.0.rpc_timeout + } + pub async fn call( &self, endpoint: &Endpoint, @@ -129,13 +130,6 @@ impl RpcHelper { KeyValue::new("to", format!("{:?}", to)), ]; - let permit = self - .0 - .request_buffer_semaphore - .acquire() - .record_duration(&self.0.metrics.rpc_queueing_time, &metric_tags) - .await?; - self.0.metrics.rpc_counter.add(1, &metric_tags); let node_id = to.into(); @@ -143,10 +137,16 @@ impl RpcHelper { .call_streaming(&node_id, msg, strat.rs_priority) .record_duration(&self.0.metrics.rpc_duration, &metric_tags); + let timeout = async { + if strat.rs_no_timeout { + futures::future::pending().await + } else { + tokio::time::sleep(self.0.rpc_timeout).await + } + }; + select! { res = rpc_call => { - drop(permit); - if res.is_err() { self.0.metrics.rpc_netapp_error_counter.add(1, &metric_tags); } @@ -158,8 +158,7 @@ impl RpcHelper { Ok(res?) } - _ = tokio::time::sleep(strat.rs_timeout) => { - drop(permit); + () = timeout => { self.0.metrics.rpc_timeout_counter.add(1, &metric_tags); Err(Error::Timeout) } diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 2c6136a8..f8121193 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -37,7 +37,6 @@ use crate::rpc_helper::*; const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60); const STATUS_EXCHANGE_INTERVAL: Duration = Duration::from_secs(10); -const SYSTEM_RPC_TIMEOUT: Duration = Duration::from_secs(15); /// Version tag used for version check upon Netapp connection. /// Cluster nodes with different version tags are deemed @@ -280,6 +279,9 @@ impl System { let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, node_key); let fullmesh = FullMeshPeeringStrategy::new(netapp.clone(), vec![], rpc_public_addr); + if let Some(ping_timeout) = config.rpc_ping_timeout_msec { + fullmesh.set_ping_timeout_millis(ping_timeout); + } let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into()); @@ -317,7 +319,13 @@ impl System { node_status: RwLock::new(HashMap::new()), netapp: netapp.clone(), fullmesh: fullmesh.clone(), - rpc: RpcHelper::new(netapp.id.into(), fullmesh, background.clone(), ring.clone()), + rpc: RpcHelper::new( + netapp.id.into(), + fullmesh, + background.clone(), + ring.clone(), + config.rpc_timeout_msec.map(Duration::from_millis), + ), system_endpoint, replication_factor, rpc_listen_addr: config.rpc_bind_addr, @@ -600,7 +608,7 @@ impl System { .broadcast( &self.system_endpoint, SystemRpc::AdvertiseStatus(local_status), - RequestStrategy::with_priority(PRIO_HIGH).with_timeout(SYSTEM_RPC_TIMEOUT), + RequestStrategy::with_priority(PRIO_HIGH), ) .await; @@ -724,7 +732,7 @@ impl System { &self.system_endpoint, peer, SystemRpc::PullClusterLayout, - RequestStrategy::with_priority(PRIO_HIGH).with_timeout(SYSTEM_RPC_TIMEOUT), + RequestStrategy::with_priority(PRIO_HIGH), ) .await; if let Ok(SystemRpc::AdvertiseClusterLayout(layout)) = resp { -- cgit v1.2.3 From ded444f6c96f8ab991e762f65760b42e4d64246c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 20 Sep 2022 16:01:41 +0200 Subject: Ability to have custom timeouts in request strategy (not used) --- src/rpc/rpc_helper.rs | 30 +++++++++++++++++++++--------- 1 file changed, 21 insertions(+), 9 deletions(-) (limited to 'src/rpc') diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index 857ed620..949aced6 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -44,8 +44,15 @@ pub struct RequestStrategy { pub rs_interrupt_after_quorum: bool, /// Request priority pub rs_priority: RequestPriority, - /// Deactivate timeout for this request - pub rs_no_timeout: bool, + /// Custom timeout for this request + rs_timeout: Timeout, +} + +#[derive(Copy, Clone)] +enum Timeout { + None, + Default, + Custom(Duration), } impl RequestStrategy { @@ -55,7 +62,7 @@ impl RequestStrategy { rs_quorum: None, rs_interrupt_after_quorum: false, rs_priority: prio, - rs_no_timeout: false, + rs_timeout: Timeout::Default, } } /// Set quorum to be reached for request @@ -71,7 +78,12 @@ impl RequestStrategy { } /// Deactivate timeout for this request pub fn without_timeout(mut self) -> Self { - self.rs_no_timeout = true; + self.rs_timeout = Timeout::None; + self + } + /// Set custom timeout for this request + pub fn with_custom_timeout(mut self, timeout: Duration) -> Self { + self.rs_timeout = Timeout::Custom(timeout); self } } @@ -138,10 +150,10 @@ impl RpcHelper { .record_duration(&self.0.metrics.rpc_duration, &metric_tags); let timeout = async { - if strat.rs_no_timeout { - futures::future::pending().await - } else { - tokio::time::sleep(self.0.rpc_timeout).await + match strat.rs_timeout { + Timeout::None => futures::future::pending().await, + Timeout::Default => tokio::time::sleep(self.0.rpc_timeout).await, + Timeout::Custom(t) => tokio::time::sleep(t).await, } }; @@ -412,7 +424,7 @@ impl RpcHelper { .iter() .find(|x| x.id.as_ref() == to.as_slice()) .and_then(|pi| pi.avg_ping) - .unwrap_or_else(|| Duration::from_secs(1)); + .unwrap_or_else(|| Duration::from_secs(10)); ( *to != self.0.our_node_id, peer_zone != our_zone, -- cgit v1.2.3