aboutsummaryrefslogtreecommitdiff
path: root/src/rpc/rpc_helper.rs
diff options
context:
space:
mode:
authorAlex <alex@adnab.me>2022-09-20 16:17:23 +0200
committerAlex <alex@adnab.me>2022-09-20 16:17:23 +0200
commit7a901f7aab29d9ae09c378e3824b8236458f85f1 (patch)
tree068a012a904b9e7552bc1f594b7d93260c69409f /src/rpc/rpc_helper.rs
parent2c312e9cbd58368484e9acb043b7c9d0ebb8905c (diff)
parentded444f6c96f8ab991e762f65760b42e4d64246c (diff)
downloadgarage-0.8.0-beta2.tar.gz
garage-0.8.0-beta2.zip
Merge pull request 'RPC performance changes' (#387) from configurable-timeouts into mainv0.8.0-beta2
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/387
Diffstat (limited to 'src/rpc/rpc_helper.rs')
-rw-r--r--src/rpc/rpc_helper.rs71
1 files changed, 41 insertions, 30 deletions
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index 19abb4c5..949aced6 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -7,7 +7,7 @@ use futures::stream::futures_unordered::FuturesUnordered;
use futures::stream::StreamExt;
use futures_util::future::FutureExt;
use tokio::select;
-use tokio::sync::{watch, Semaphore};
+use tokio::sync::watch;
use opentelemetry::KeyValue;
use opentelemetry::{
@@ -32,32 +32,37 @@ use garage_util::metrics::RecordDuration;
use crate::metrics::RpcMetrics;
use crate::ring::Ring;
-const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
-
-// Don't allow more than 100 concurrent outgoing RPCs.
-const MAX_CONCURRENT_REQUESTS: usize = 100;
+// Default RPC timeout = 5 minutes
+const DEFAULT_TIMEOUT: Duration = Duration::from_secs(300);
/// Strategy to apply when making RPC
#[derive(Copy, Clone)]
pub struct RequestStrategy {
- /// Max time to wait for reponse
- pub rs_timeout: Duration,
/// Min number of response to consider the request successful
pub rs_quorum: Option<usize>,
/// Should requests be dropped after enough response are received
pub rs_interrupt_after_quorum: bool,
/// Request priority
pub rs_priority: RequestPriority,
+ /// Custom timeout for this request
+ rs_timeout: Timeout,
+}
+
+#[derive(Copy, Clone)]
+enum Timeout {
+ None,
+ Default,
+ Custom(Duration),
}
impl RequestStrategy {
/// Create a RequestStrategy with default timeout and not interrupting when quorum reached
pub fn with_priority(prio: RequestPriority) -> Self {
RequestStrategy {
- rs_timeout: DEFAULT_TIMEOUT,
rs_quorum: None,
rs_interrupt_after_quorum: false,
rs_priority: prio,
+ rs_timeout: Timeout::Default,
}
}
/// Set quorum to be reached for request
@@ -65,17 +70,22 @@ impl RequestStrategy {
self.rs_quorum = Some(quorum);
self
}
- /// Set timeout of the strategy
- pub fn with_timeout(mut self, timeout: Duration) -> Self {
- self.rs_timeout = timeout;
- self
- }
/// Set if requests can be dropped after quorum has been reached
/// In general true for read requests, and false for write
pub fn interrupt_after_quorum(mut self, interrupt: bool) -> Self {
self.rs_interrupt_after_quorum = interrupt;
self
}
+ /// Deactivate timeout for this request
+ pub fn without_timeout(mut self) -> Self {
+ self.rs_timeout = Timeout::None;
+ self
+ }
+ /// Set custom timeout for this request
+ pub fn with_custom_timeout(mut self, timeout: Duration) -> Self {
+ self.rs_timeout = Timeout::Custom(timeout);
+ self
+ }
}
#[derive(Clone)]
@@ -86,8 +96,8 @@ struct RpcHelperInner {
fullmesh: Arc<FullMeshPeeringStrategy>,
background: Arc<BackgroundRunner>,
ring: watch::Receiver<Arc<Ring>>,
- request_buffer_semaphore: Arc<Semaphore>,
metrics: RpcMetrics,
+ rpc_timeout: Duration,
}
impl RpcHelper {
@@ -96,21 +106,24 @@ impl RpcHelper {
fullmesh: Arc<FullMeshPeeringStrategy>,
background: Arc<BackgroundRunner>,
ring: watch::Receiver<Arc<Ring>>,
+ rpc_timeout: Option<Duration>,
) -> Self {
- let sem = Arc::new(Semaphore::new(MAX_CONCURRENT_REQUESTS));
-
- let metrics = RpcMetrics::new(sem.clone());
+ let metrics = RpcMetrics::new();
Self(Arc::new(RpcHelperInner {
our_node_id,
fullmesh,
background,
ring,
- request_buffer_semaphore: sem,
metrics,
+ rpc_timeout: rpc_timeout.unwrap_or(DEFAULT_TIMEOUT),
}))
}
+ pub fn rpc_timeout(&self) -> Duration {
+ self.0.rpc_timeout
+ }
+
pub async fn call<M, N, H, S>(
&self,
endpoint: &Endpoint<M, H>,
@@ -129,13 +142,6 @@ impl RpcHelper {
KeyValue::new("to", format!("{:?}", to)),
];
- let permit = self
- .0
- .request_buffer_semaphore
- .acquire()
- .record_duration(&self.0.metrics.rpc_queueing_time, &metric_tags)
- .await?;
-
self.0.metrics.rpc_counter.add(1, &metric_tags);
let node_id = to.into();
@@ -143,10 +149,16 @@ impl RpcHelper {
.call_streaming(&node_id, msg, strat.rs_priority)
.record_duration(&self.0.metrics.rpc_duration, &metric_tags);
+ let timeout = async {
+ match strat.rs_timeout {
+ Timeout::None => futures::future::pending().await,
+ Timeout::Default => tokio::time::sleep(self.0.rpc_timeout).await,
+ Timeout::Custom(t) => tokio::time::sleep(t).await,
+ }
+ };
+
select! {
res = rpc_call => {
- drop(permit);
-
if res.is_err() {
self.0.metrics.rpc_netapp_error_counter.add(1, &metric_tags);
}
@@ -158,8 +170,7 @@ impl RpcHelper {
Ok(res?)
}
- _ = tokio::time::sleep(strat.rs_timeout) => {
- drop(permit);
+ () = timeout => {
self.0.metrics.rpc_timeout_counter.add(1, &metric_tags);
Err(Error::Timeout)
}
@@ -413,7 +424,7 @@ impl RpcHelper {
.iter()
.find(|x| x.id.as_ref() == to.as_slice())
.and_then(|pi| pi.avg_ping)
- .unwrap_or_else(|| Duration::from_secs(1));
+ .unwrap_or_else(|| Duration::from_secs(10));
(
*to != self.0.our_node_id,
peer_zone != our_zone,