aboutsummaryrefslogtreecommitdiff
path: root/src/rpc/rpc_helper.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc/rpc_helper.rs')
-rw-r--r--src/rpc/rpc_helper.rs57
1 files changed, 28 insertions, 29 deletions
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index 19abb4c5..857ed620 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -7,7 +7,7 @@ use futures::stream::futures_unordered::FuturesUnordered;
use futures::stream::StreamExt;
use futures_util::future::FutureExt;
use tokio::select;
-use tokio::sync::{watch, Semaphore};
+use tokio::sync::watch;
use opentelemetry::KeyValue;
use opentelemetry::{
@@ -32,32 +32,30 @@ use garage_util::metrics::RecordDuration;
use crate::metrics::RpcMetrics;
use crate::ring::Ring;
-const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
-
-// Don't allow more than 100 concurrent outgoing RPCs.
-const MAX_CONCURRENT_REQUESTS: usize = 100;
+// Default RPC timeout = 5 minutes
+const DEFAULT_TIMEOUT: Duration = Duration::from_secs(300);
/// Strategy to apply when making RPC
#[derive(Copy, Clone)]
pub struct RequestStrategy {
- /// Max time to wait for reponse
- pub rs_timeout: Duration,
/// Min number of response to consider the request successful
pub rs_quorum: Option<usize>,
/// Should requests be dropped after enough response are received
pub rs_interrupt_after_quorum: bool,
/// Request priority
pub rs_priority: RequestPriority,
+ /// Deactivate timeout for this request
+ pub rs_no_timeout: bool,
}
impl RequestStrategy {
/// Create a RequestStrategy with default timeout and not interrupting when quorum reached
pub fn with_priority(prio: RequestPriority) -> Self {
RequestStrategy {
- rs_timeout: DEFAULT_TIMEOUT,
rs_quorum: None,
rs_interrupt_after_quorum: false,
rs_priority: prio,
+ rs_no_timeout: false,
}
}
/// Set quorum to be reached for request
@@ -65,17 +63,17 @@ impl RequestStrategy {
self.rs_quorum = Some(quorum);
self
}
- /// Set timeout of the strategy
- pub fn with_timeout(mut self, timeout: Duration) -> Self {
- self.rs_timeout = timeout;
- self
- }
/// Set if requests can be dropped after quorum has been reached
/// In general true for read requests, and false for write
pub fn interrupt_after_quorum(mut self, interrupt: bool) -> Self {
self.rs_interrupt_after_quorum = interrupt;
self
}
+ /// Deactivate timeout for this request
+ pub fn without_timeout(mut self) -> Self {
+ self.rs_no_timeout = true;
+ self
+ }
}
#[derive(Clone)]
@@ -86,8 +84,8 @@ struct RpcHelperInner {
fullmesh: Arc<FullMeshPeeringStrategy>,
background: Arc<BackgroundRunner>,
ring: watch::Receiver<Arc<Ring>>,
- request_buffer_semaphore: Arc<Semaphore>,
metrics: RpcMetrics,
+ rpc_timeout: Duration,
}
impl RpcHelper {
@@ -96,21 +94,24 @@ impl RpcHelper {
fullmesh: Arc<FullMeshPeeringStrategy>,
background: Arc<BackgroundRunner>,
ring: watch::Receiver<Arc<Ring>>,
+ rpc_timeout: Option<Duration>,
) -> Self {
- let sem = Arc::new(Semaphore::new(MAX_CONCURRENT_REQUESTS));
-
- let metrics = RpcMetrics::new(sem.clone());
+ let metrics = RpcMetrics::new();
Self(Arc::new(RpcHelperInner {
our_node_id,
fullmesh,
background,
ring,
- request_buffer_semaphore: sem,
metrics,
+ rpc_timeout: rpc_timeout.unwrap_or(DEFAULT_TIMEOUT),
}))
}
+ pub fn rpc_timeout(&self) -> Duration {
+ self.0.rpc_timeout
+ }
+
pub async fn call<M, N, H, S>(
&self,
endpoint: &Endpoint<M, H>,
@@ -129,13 +130,6 @@ impl RpcHelper {
KeyValue::new("to", format!("{:?}", to)),
];
- let permit = self
- .0
- .request_buffer_semaphore
- .acquire()
- .record_duration(&self.0.metrics.rpc_queueing_time, &metric_tags)
- .await?;
-
self.0.metrics.rpc_counter.add(1, &metric_tags);
let node_id = to.into();
@@ -143,10 +137,16 @@ impl RpcHelper {
.call_streaming(&node_id, msg, strat.rs_priority)
.record_duration(&self.0.metrics.rpc_duration, &metric_tags);
+ let timeout = async {
+ if strat.rs_no_timeout {
+ futures::future::pending().await
+ } else {
+ tokio::time::sleep(self.0.rpc_timeout).await
+ }
+ };
+
select! {
res = rpc_call => {
- drop(permit);
-
if res.is_err() {
self.0.metrics.rpc_netapp_error_counter.add(1, &metric_tags);
}
@@ -158,8 +158,7 @@ impl RpcHelper {
Ok(res?)
}
- _ = tokio::time::sleep(strat.rs_timeout) => {
- drop(permit);
+ () = timeout => {
self.0.metrics.rpc_timeout_counter.add(1, &metric_tags);
Err(Error::Timeout)
}