aboutsummaryrefslogtreecommitdiff
path: root/src/rpc
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc')
-rw-r--r--src/rpc/Cargo.toml2
-rw-r--r--src/rpc/metrics.rs19
-rw-r--r--src/rpc/rpc_helper.rs71
-rw-r--r--src/rpc/system.rs16
4 files changed, 55 insertions, 53 deletions
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml
index e51f1f73..d61acea4 100644
--- a/src/rpc/Cargo.toml
+++ b/src/rpc/Cargo.toml
@@ -45,7 +45,7 @@ tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi
tokio-stream = { version = "0.1", features = ["net"] }
opentelemetry = "0.17"
-netapp = { version = "0.5.1", features = ["telemetry"] }
+netapp = { version = "0.5.2", features = ["telemetry"] }
hyper = { version = "0.14", features = ["client", "http1", "runtime", "tcp"] }
diff --git a/src/rpc/metrics.rs b/src/rpc/metrics.rs
index c900518c..61f8fa79 100644
--- a/src/rpc/metrics.rs
+++ b/src/rpc/metrics.rs
@@ -1,31 +1,18 @@
-use std::sync::Arc;
-
use opentelemetry::{global, metrics::*};
-use tokio::sync::Semaphore;
/// TableMetrics reference all counter used for metrics
pub struct RpcMetrics {
- pub(crate) _rpc_available_permits: ValueObserver<u64>,
-
pub(crate) rpc_counter: Counter<u64>,
pub(crate) rpc_timeout_counter: Counter<u64>,
pub(crate) rpc_netapp_error_counter: Counter<u64>,
pub(crate) rpc_garage_error_counter: Counter<u64>,
pub(crate) rpc_duration: ValueRecorder<f64>,
- pub(crate) rpc_queueing_time: ValueRecorder<f64>,
}
impl RpcMetrics {
- pub fn new(sem: Arc<Semaphore>) -> Self {
+ pub fn new() -> Self {
let meter = global::meter("garage_rpc");
RpcMetrics {
- _rpc_available_permits: meter
- .u64_value_observer("rpc.available_permits", move |observer| {
- observer.observe(sem.available_permits() as u64, &[])
- })
- .with_description("Number of available RPC permits")
- .init(),
-
rpc_counter: meter
.u64_counter("rpc.request_counter")
.with_description("Number of RPC requests emitted")
@@ -46,10 +33,6 @@ impl RpcMetrics {
.f64_value_recorder("rpc.duration")
.with_description("Duration of RPCs")
.init(),
- rpc_queueing_time: meter
- .f64_value_recorder("rpc.queueing_time")
- .with_description("Time RPC requests were queued for before being sent")
- .init(),
}
}
}
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index 19abb4c5..949aced6 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -7,7 +7,7 @@ use futures::stream::futures_unordered::FuturesUnordered;
use futures::stream::StreamExt;
use futures_util::future::FutureExt;
use tokio::select;
-use tokio::sync::{watch, Semaphore};
+use tokio::sync::watch;
use opentelemetry::KeyValue;
use opentelemetry::{
@@ -32,32 +32,37 @@ use garage_util::metrics::RecordDuration;
use crate::metrics::RpcMetrics;
use crate::ring::Ring;
-const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
-
-// Don't allow more than 100 concurrent outgoing RPCs.
-const MAX_CONCURRENT_REQUESTS: usize = 100;
+// Default RPC timeout = 5 minutes
+const DEFAULT_TIMEOUT: Duration = Duration::from_secs(300);
/// Strategy to apply when making RPC
#[derive(Copy, Clone)]
pub struct RequestStrategy {
- /// Max time to wait for reponse
- pub rs_timeout: Duration,
/// Min number of response to consider the request successful
pub rs_quorum: Option<usize>,
/// Should requests be dropped after enough response are received
pub rs_interrupt_after_quorum: bool,
/// Request priority
pub rs_priority: RequestPriority,
+ /// Custom timeout for this request
+ rs_timeout: Timeout,
+}
+
+#[derive(Copy, Clone)]
+enum Timeout {
+ None,
+ Default,
+ Custom(Duration),
}
impl RequestStrategy {
/// Create a RequestStrategy with default timeout and not interrupting when quorum reached
pub fn with_priority(prio: RequestPriority) -> Self {
RequestStrategy {
- rs_timeout: DEFAULT_TIMEOUT,
rs_quorum: None,
rs_interrupt_after_quorum: false,
rs_priority: prio,
+ rs_timeout: Timeout::Default,
}
}
/// Set quorum to be reached for request
@@ -65,17 +70,22 @@ impl RequestStrategy {
self.rs_quorum = Some(quorum);
self
}
- /// Set timeout of the strategy
- pub fn with_timeout(mut self, timeout: Duration) -> Self {
- self.rs_timeout = timeout;
- self
- }
/// Set if requests can be dropped after quorum has been reached
/// In general true for read requests, and false for write
pub fn interrupt_after_quorum(mut self, interrupt: bool) -> Self {
self.rs_interrupt_after_quorum = interrupt;
self
}
+ /// Deactivate timeout for this request
+ pub fn without_timeout(mut self) -> Self {
+ self.rs_timeout = Timeout::None;
+ self
+ }
+ /// Set custom timeout for this request
+ pub fn with_custom_timeout(mut self, timeout: Duration) -> Self {
+ self.rs_timeout = Timeout::Custom(timeout);
+ self
+ }
}
#[derive(Clone)]
@@ -86,8 +96,8 @@ struct RpcHelperInner {
fullmesh: Arc<FullMeshPeeringStrategy>,
background: Arc<BackgroundRunner>,
ring: watch::Receiver<Arc<Ring>>,
- request_buffer_semaphore: Arc<Semaphore>,
metrics: RpcMetrics,
+ rpc_timeout: Duration,
}
impl RpcHelper {
@@ -96,21 +106,24 @@ impl RpcHelper {
fullmesh: Arc<FullMeshPeeringStrategy>,
background: Arc<BackgroundRunner>,
ring: watch::Receiver<Arc<Ring>>,
+ rpc_timeout: Option<Duration>,
) -> Self {
- let sem = Arc::new(Semaphore::new(MAX_CONCURRENT_REQUESTS));
-
- let metrics = RpcMetrics::new(sem.clone());
+ let metrics = RpcMetrics::new();
Self(Arc::new(RpcHelperInner {
our_node_id,
fullmesh,
background,
ring,
- request_buffer_semaphore: sem,
metrics,
+ rpc_timeout: rpc_timeout.unwrap_or(DEFAULT_TIMEOUT),
}))
}
+ pub fn rpc_timeout(&self) -> Duration {
+ self.0.rpc_timeout
+ }
+
pub async fn call<M, N, H, S>(
&self,
endpoint: &Endpoint<M, H>,
@@ -129,13 +142,6 @@ impl RpcHelper {
KeyValue::new("to", format!("{:?}", to)),
];
- let permit = self
- .0
- .request_buffer_semaphore
- .acquire()
- .record_duration(&self.0.metrics.rpc_queueing_time, &metric_tags)
- .await?;
-
self.0.metrics.rpc_counter.add(1, &metric_tags);
let node_id = to.into();
@@ -143,10 +149,16 @@ impl RpcHelper {
.call_streaming(&node_id, msg, strat.rs_priority)
.record_duration(&self.0.metrics.rpc_duration, &metric_tags);
+ let timeout = async {
+ match strat.rs_timeout {
+ Timeout::None => futures::future::pending().await,
+ Timeout::Default => tokio::time::sleep(self.0.rpc_timeout).await,
+ Timeout::Custom(t) => tokio::time::sleep(t).await,
+ }
+ };
+
select! {
res = rpc_call => {
- drop(permit);
-
if res.is_err() {
self.0.metrics.rpc_netapp_error_counter.add(1, &metric_tags);
}
@@ -158,8 +170,7 @@ impl RpcHelper {
Ok(res?)
}
- _ = tokio::time::sleep(strat.rs_timeout) => {
- drop(permit);
+ () = timeout => {
self.0.metrics.rpc_timeout_counter.add(1, &metric_tags);
Err(Error::Timeout)
}
@@ -413,7 +424,7 @@ impl RpcHelper {
.iter()
.find(|x| x.id.as_ref() == to.as_slice())
.and_then(|pi| pi.avg_ping)
- .unwrap_or_else(|| Duration::from_secs(1));
+ .unwrap_or_else(|| Duration::from_secs(10));
(
*to != self.0.our_node_id,
peer_zone != our_zone,
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 2c6136a8..f8121193 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -37,7 +37,6 @@ use crate::rpc_helper::*;
const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60);
const STATUS_EXCHANGE_INTERVAL: Duration = Duration::from_secs(10);
-const SYSTEM_RPC_TIMEOUT: Duration = Duration::from_secs(15);
/// Version tag used for version check upon Netapp connection.
/// Cluster nodes with different version tags are deemed
@@ -280,6 +279,9 @@ impl System {
let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, node_key);
let fullmesh = FullMeshPeeringStrategy::new(netapp.clone(), vec![], rpc_public_addr);
+ if let Some(ping_timeout) = config.rpc_ping_timeout_msec {
+ fullmesh.set_ping_timeout_millis(ping_timeout);
+ }
let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into());
@@ -317,7 +319,13 @@ impl System {
node_status: RwLock::new(HashMap::new()),
netapp: netapp.clone(),
fullmesh: fullmesh.clone(),
- rpc: RpcHelper::new(netapp.id.into(), fullmesh, background.clone(), ring.clone()),
+ rpc: RpcHelper::new(
+ netapp.id.into(),
+ fullmesh,
+ background.clone(),
+ ring.clone(),
+ config.rpc_timeout_msec.map(Duration::from_millis),
+ ),
system_endpoint,
replication_factor,
rpc_listen_addr: config.rpc_bind_addr,
@@ -600,7 +608,7 @@ impl System {
.broadcast(
&self.system_endpoint,
SystemRpc::AdvertiseStatus(local_status),
- RequestStrategy::with_priority(PRIO_HIGH).with_timeout(SYSTEM_RPC_TIMEOUT),
+ RequestStrategy::with_priority(PRIO_HIGH),
)
.await;
@@ -724,7 +732,7 @@ impl System {
&self.system_endpoint,
peer,
SystemRpc::PullClusterLayout,
- RequestStrategy::with_priority(PRIO_HIGH).with_timeout(SYSTEM_RPC_TIMEOUT),
+ RequestStrategy::with_priority(PRIO_HIGH),
)
.await;
if let Ok(SystemRpc::AdvertiseClusterLayout(layout)) = resp {