diff options
author | Alex <alex@adnab.me> | 2022-09-20 16:17:23 +0200 |
---|---|---|
committer | Alex <alex@adnab.me> | 2022-09-20 16:17:23 +0200 |
commit | 7a901f7aab29d9ae09c378e3824b8236458f85f1 (patch) | |
tree | 068a012a904b9e7552bc1f594b7d93260c69409f /src | |
parent | 2c312e9cbd58368484e9acb043b7c9d0ebb8905c (diff) | |
parent | ded444f6c96f8ab991e762f65760b42e4d64246c (diff) | |
download | garage-0.8.0-beta2.tar.gz garage-0.8.0-beta2.zip |
Merge pull request 'RPC performance changes' (#387) from configurable-timeouts into mainv0.8.0-beta2
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/387
Diffstat (limited to 'src')
-rw-r--r-- | src/block/manager.rs | 18 | ||||
-rw-r--r-- | src/block/resync.rs | 14 | ||||
-rw-r--r-- | src/model/k2v/causality.rs | 2 | ||||
-rw-r--r-- | src/model/k2v/item_table.rs | 8 | ||||
-rw-r--r-- | src/model/k2v/rpc.rs | 36 | ||||
-rw-r--r-- | src/rpc/Cargo.toml | 2 | ||||
-rw-r--r-- | src/rpc/metrics.rs | 19 | ||||
-rw-r--r-- | src/rpc/rpc_helper.rs | 71 | ||||
-rw-r--r-- | src/rpc/system.rs | 16 | ||||
-rw-r--r-- | src/table/gc.rs | 10 | ||||
-rw-r--r-- | src/table/sync.rs | 16 | ||||
-rw-r--r-- | src/table/table.rs | 14 | ||||
-rw-r--r-- | src/util/config.rs | 5 |
13 files changed, 102 insertions, 129 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs index ec694fc8..7f439b96 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -41,9 +41,6 @@ use crate::resync::*; /// Size under which data will be stored inlined in database instead of as files pub const INLINE_THRESHOLD: usize = 3072; -// Timeout for RPCs that read and write blocks to remote nodes -pub(crate) const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(60); - // The delay between the moment when the reference counter // drops to zero, and the moment where we allow ourselves // to delete the block locally. @@ -183,7 +180,7 @@ impl BlockManager { }; return Ok((header, stream)); } - _ = tokio::time::sleep(BLOCK_RW_TIMEOUT) => { + _ = tokio::time::sleep(self.system.rpc.rpc_timeout()) => { debug!("Node {:?} didn't return block in time, trying next.", node); } }; @@ -235,7 +232,7 @@ impl BlockManager { } } } - _ = tokio::time::sleep(BLOCK_RW_TIMEOUT) => { + _ = tokio::time::sleep(self.system.rpc.rpc_timeout()) => { debug!("Node {:?} didn't return block in time, trying next.", node); } }; @@ -300,8 +297,7 @@ impl BlockManager { &who[..], put_block_rpc, RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY) - .with_quorum(self.replication.write_quorum()) - .with_timeout(BLOCK_RW_TIMEOUT), + .with_quorum(self.replication.write_quorum()), ) .await?; @@ -336,7 +332,10 @@ impl BlockManager { // we will fecth it from someone. let this = self.clone(); tokio::spawn(async move { - if let Err(e) = this.resync.put_to_resync(&hash, 2 * BLOCK_RW_TIMEOUT) { + if let Err(e) = this + .resync + .put_to_resync(&hash, 2 * this.system.rpc.rpc_timeout()) + { error!("Block {:?} could not be put in resync queue: {}.", hash, e); } }); @@ -444,7 +443,8 @@ impl BlockManager { Ok(c) => c, Err(e) => { // Not found but maybe we should have had it ?? - self.resync.put_to_resync(hash, 2 * BLOCK_RW_TIMEOUT)?; + self.resync + .put_to_resync(hash, 2 * self.system.rpc.rpc_timeout())?; return Err(Into::into(e)); } }; diff --git a/src/block/resync.rs b/src/block/resync.rs index bde3e98c..ada3ac54 100644 --- a/src/block/resync.rs +++ b/src/block/resync.rs @@ -33,14 +33,6 @@ use garage_table::replication::TableReplication; use crate::manager::*; -// Timeout for RPCs that ask other nodes whether they need a copy -// of a given block before we delete it locally -// The timeout here is relatively low because we don't want to block -// the entire resync loop when some nodes are not responding. -// Nothing will be deleted if the nodes don't answer the queries, -// we will just retry later. -const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(15); - // The delay between the time where a resync operation fails // and the time when it is retried, with exponential backoff // (multiplied by 2, 4, 8, 16, etc. for every consecutive failure). @@ -346,8 +338,7 @@ impl BlockResyncManager { &manager.endpoint, &who, BlockRpc::NeedBlockQuery(*hash), - RequestStrategy::with_priority(PRIO_BACKGROUND) - .with_timeout(NEED_BLOCK_QUERY_TIMEOUT), + RequestStrategy::with_priority(PRIO_BACKGROUND), ) .await?; @@ -394,8 +385,7 @@ impl BlockResyncManager { &need_nodes[..], put_block_message, RequestStrategy::with_priority(PRIO_BACKGROUND) - .with_quorum(need_nodes.len()) - .with_timeout(BLOCK_RW_TIMEOUT), + .with_quorum(need_nodes.len()), ) .await .err_context("PutBlock RPC")?; diff --git a/src/model/k2v/causality.rs b/src/model/k2v/causality.rs index 8c76a32b..9a692870 100644 --- a/src/model/k2v/causality.rs +++ b/src/model/k2v/causality.rs @@ -15,7 +15,7 @@ pub fn make_node_id(node_id: Uuid) -> K2VNodeId { u64::from_be_bytes(tmp) } -#[derive(PartialEq, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, Debug, Serialize, Deserialize)] pub struct CausalContext { pub vector_clock: BTreeMap<K2VNodeId, u64>, } diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs index baa1db4b..7860cb17 100644 --- a/src/model/k2v/item_table.rs +++ b/src/model/k2v/item_table.rs @@ -17,7 +17,7 @@ pub const CONFLICTS: &str = "conflicts"; pub const VALUES: &str = "values"; pub const BYTES: &str = "bytes"; -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub struct K2VItem { pub partition: K2VItemPartition, pub sort_key: String, @@ -25,19 +25,19 @@ pub struct K2VItem { items: BTreeMap<K2VNodeId, DvvsEntry>, } -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize, Hash, Eq)] +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize, Hash)] pub struct K2VItemPartition { pub bucket_id: Uuid, pub partition_key: String, } -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] struct DvvsEntry { t_discard: u64, values: Vec<(u64, DvvsValue)>, } -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub enum DvvsValue { Value(#[serde(with = "serde_bytes")] Vec<u8>), Deleted, diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs index 90101d0f..a74df277 100644 --- a/src/model/k2v/rpc.rs +++ b/src/model/k2v/rpc.rs @@ -23,7 +23,6 @@ use garage_rpc::system::System; use garage_rpc::*; use garage_table::replication::{TableReplication, TableShardedReplication}; -use garage_table::table::TABLE_RPC_TIMEOUT; use garage_table::{PartitionKey, Table}; use crate::k2v::causality::*; @@ -117,7 +116,6 @@ impl K2VRpcHandler { }), RequestStrategy::with_priority(PRIO_NORMAL) .with_quorum(1) - .with_timeout(TABLE_RPC_TIMEOUT) .interrupt_after_quorum(true), ) .await?; @@ -169,7 +167,6 @@ impl K2VRpcHandler { K2VRpc::InsertManyItems(items), RequestStrategy::with_priority(PRIO_NORMAL) .with_quorum(1) - .with_timeout(TABLE_RPC_TIMEOUT) .interrupt_after_quorum(true), ) .await?; @@ -205,22 +202,23 @@ impl K2VRpcHandler { .replication .write_nodes(&poll_key.partition.hash()); - let resps = self - .system - .rpc - .try_call_many( - &self.endpoint, - &nodes[..], - K2VRpc::PollItem { - key: poll_key, - causal_context, - timeout_msec, - }, - RequestStrategy::with_priority(PRIO_NORMAL) - .with_quorum(self.item_table.data.replication.read_quorum()) - .with_timeout(Duration::from_millis(timeout_msec) + TABLE_RPC_TIMEOUT), - ) - .await?; + let rpc = self.system.rpc.try_call_many( + &self.endpoint, + &nodes[..], + K2VRpc::PollItem { + key: poll_key, + causal_context, + timeout_msec, + }, + RequestStrategy::with_priority(PRIO_NORMAL) + .with_quorum(self.item_table.data.replication.read_quorum()) + .without_timeout(), + ); + let timeout_duration = Duration::from_millis(timeout_msec) + self.system.rpc.rpc_timeout(); + let resps = select! { + r = rpc => r?, + _ = tokio::time::sleep(timeout_duration) => return Ok(None), + }; let mut resp: Option<K2VItem> = None; for v in resps { diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index e51f1f73..d61acea4 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -45,7 +45,7 @@ tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi tokio-stream = { version = "0.1", features = ["net"] } opentelemetry = "0.17" -netapp = { version = "0.5.1", features = ["telemetry"] } +netapp = { version = "0.5.2", features = ["telemetry"] } hyper = { version = "0.14", features = ["client", "http1", "runtime", "tcp"] } diff --git a/src/rpc/metrics.rs b/src/rpc/metrics.rs index c900518c..61f8fa79 100644 --- a/src/rpc/metrics.rs +++ b/src/rpc/metrics.rs @@ -1,31 +1,18 @@ -use std::sync::Arc; - use opentelemetry::{global, metrics::*}; -use tokio::sync::Semaphore; /// TableMetrics reference all counter used for metrics pub struct RpcMetrics { - pub(crate) _rpc_available_permits: ValueObserver<u64>, - pub(crate) rpc_counter: Counter<u64>, pub(crate) rpc_timeout_counter: Counter<u64>, pub(crate) rpc_netapp_error_counter: Counter<u64>, pub(crate) rpc_garage_error_counter: Counter<u64>, pub(crate) rpc_duration: ValueRecorder<f64>, - pub(crate) rpc_queueing_time: ValueRecorder<f64>, } impl RpcMetrics { - pub fn new(sem: Arc<Semaphore>) -> Self { + pub fn new() -> Self { let meter = global::meter("garage_rpc"); RpcMetrics { - _rpc_available_permits: meter - .u64_value_observer("rpc.available_permits", move |observer| { - observer.observe(sem.available_permits() as u64, &[]) - }) - .with_description("Number of available RPC permits") - .init(), - rpc_counter: meter .u64_counter("rpc.request_counter") .with_description("Number of RPC requests emitted") @@ -46,10 +33,6 @@ impl RpcMetrics { .f64_value_recorder("rpc.duration") .with_description("Duration of RPCs") .init(), - rpc_queueing_time: meter - .f64_value_recorder("rpc.queueing_time") - .with_description("Time RPC requests were queued for before being sent") - .init(), } } } diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index 19abb4c5..949aced6 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -7,7 +7,7 @@ use futures::stream::futures_unordered::FuturesUnordered; use futures::stream::StreamExt; use futures_util::future::FutureExt; use tokio::select; -use tokio::sync::{watch, Semaphore}; +use tokio::sync::watch; use opentelemetry::KeyValue; use opentelemetry::{ @@ -32,32 +32,37 @@ use garage_util::metrics::RecordDuration; use crate::metrics::RpcMetrics; use crate::ring::Ring; -const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30); - -// Don't allow more than 100 concurrent outgoing RPCs. -const MAX_CONCURRENT_REQUESTS: usize = 100; +// Default RPC timeout = 5 minutes +const DEFAULT_TIMEOUT: Duration = Duration::from_secs(300); /// Strategy to apply when making RPC #[derive(Copy, Clone)] pub struct RequestStrategy { - /// Max time to wait for reponse - pub rs_timeout: Duration, /// Min number of response to consider the request successful pub rs_quorum: Option<usize>, /// Should requests be dropped after enough response are received pub rs_interrupt_after_quorum: bool, /// Request priority pub rs_priority: RequestPriority, + /// Custom timeout for this request + rs_timeout: Timeout, +} + +#[derive(Copy, Clone)] +enum Timeout { + None, + Default, + Custom(Duration), } impl RequestStrategy { /// Create a RequestStrategy with default timeout and not interrupting when quorum reached pub fn with_priority(prio: RequestPriority) -> Self { RequestStrategy { - rs_timeout: DEFAULT_TIMEOUT, rs_quorum: None, rs_interrupt_after_quorum: false, rs_priority: prio, + rs_timeout: Timeout::Default, } } /// Set quorum to be reached for request @@ -65,17 +70,22 @@ impl RequestStrategy { self.rs_quorum = Some(quorum); self } - /// Set timeout of the strategy - pub fn with_timeout(mut self, timeout: Duration) -> Self { - self.rs_timeout = timeout; - self - } /// Set if requests can be dropped after quorum has been reached /// In general true for read requests, and false for write pub fn interrupt_after_quorum(mut self, interrupt: bool) -> Self { self.rs_interrupt_after_quorum = interrupt; self } + /// Deactivate timeout for this request + pub fn without_timeout(mut self) -> Self { + self.rs_timeout = Timeout::None; + self + } + /// Set custom timeout for this request + pub fn with_custom_timeout(mut self, timeout: Duration) -> Self { + self.rs_timeout = Timeout::Custom(timeout); + self + } } #[derive(Clone)] @@ -86,8 +96,8 @@ struct RpcHelperInner { fullmesh: Arc<FullMeshPeeringStrategy>, background: Arc<BackgroundRunner>, ring: watch::Receiver<Arc<Ring>>, - request_buffer_semaphore: Arc<Semaphore>, metrics: RpcMetrics, + rpc_timeout: Duration, } impl RpcHelper { @@ -96,21 +106,24 @@ impl RpcHelper { fullmesh: Arc<FullMeshPeeringStrategy>, background: Arc<BackgroundRunner>, ring: watch::Receiver<Arc<Ring>>, + rpc_timeout: Option<Duration>, ) -> Self { - let sem = Arc::new(Semaphore::new(MAX_CONCURRENT_REQUESTS)); - - let metrics = RpcMetrics::new(sem.clone()); + let metrics = RpcMetrics::new(); Self(Arc::new(RpcHelperInner { our_node_id, fullmesh, background, ring, - request_buffer_semaphore: sem, metrics, + rpc_timeout: rpc_timeout.unwrap_or(DEFAULT_TIMEOUT), })) } + pub fn rpc_timeout(&self) -> Duration { + self.0.rpc_timeout + } + pub async fn call<M, N, H, S>( &self, endpoint: &Endpoint<M, H>, @@ -129,13 +142,6 @@ impl RpcHelper { KeyValue::new("to", format!("{:?}", to)), ]; - let permit = self - .0 - .request_buffer_semaphore - .acquire() - .record_duration(&self.0.metrics.rpc_queueing_time, &metric_tags) - .await?; - self.0.metrics.rpc_counter.add(1, &metric_tags); let node_id = to.into(); @@ -143,10 +149,16 @@ impl RpcHelper { .call_streaming(&node_id, msg, strat.rs_priority) .record_duration(&self.0.metrics.rpc_duration, &metric_tags); + let timeout = async { + match strat.rs_timeout { + Timeout::None => futures::future::pending().await, + Timeout::Default => tokio::time::sleep(self.0.rpc_timeout).await, + Timeout::Custom(t) => tokio::time::sleep(t).await, + } + }; + select! { res = rpc_call => { - drop(permit); - if res.is_err() { self.0.metrics.rpc_netapp_error_counter.add(1, &metric_tags); } @@ -158,8 +170,7 @@ impl RpcHelper { Ok(res?) } - _ = tokio::time::sleep(strat.rs_timeout) => { - drop(permit); + () = timeout => { self.0.metrics.rpc_timeout_counter.add(1, &metric_tags); Err(Error::Timeout) } @@ -413,7 +424,7 @@ impl RpcHelper { .iter() .find(|x| x.id.as_ref() == to.as_slice()) .and_then(|pi| pi.avg_ping) - .unwrap_or_else(|| Duration::from_secs(1)); + .unwrap_or_else(|| Duration::from_secs(10)); ( *to != self.0.our_node_id, peer_zone != our_zone, diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 2c6136a8..f8121193 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -37,7 +37,6 @@ use crate::rpc_helper::*; const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60); const STATUS_EXCHANGE_INTERVAL: Duration = Duration::from_secs(10); -const SYSTEM_RPC_TIMEOUT: Duration = Duration::from_secs(15); /// Version tag used for version check upon Netapp connection. /// Cluster nodes with different version tags are deemed @@ -280,6 +279,9 @@ impl System { let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, node_key); let fullmesh = FullMeshPeeringStrategy::new(netapp.clone(), vec![], rpc_public_addr); + if let Some(ping_timeout) = config.rpc_ping_timeout_msec { + fullmesh.set_ping_timeout_millis(ping_timeout); + } let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into()); @@ -317,7 +319,13 @@ impl System { node_status: RwLock::new(HashMap::new()), netapp: netapp.clone(), fullmesh: fullmesh.clone(), - rpc: RpcHelper::new(netapp.id.into(), fullmesh, background.clone(), ring.clone()), + rpc: RpcHelper::new( + netapp.id.into(), + fullmesh, + background.clone(), + ring.clone(), + config.rpc_timeout_msec.map(Duration::from_millis), + ), system_endpoint, replication_factor, rpc_listen_addr: config.rpc_bind_addr, @@ -600,7 +608,7 @@ impl System { .broadcast( &self.system_endpoint, SystemRpc::AdvertiseStatus(local_status), - RequestStrategy::with_priority(PRIO_HIGH).with_timeout(SYSTEM_RPC_TIMEOUT), + RequestStrategy::with_priority(PRIO_HIGH), ) .await; @@ -724,7 +732,7 @@ impl System { &self.system_endpoint, peer, SystemRpc::PullClusterLayout, - RequestStrategy::with_priority(PRIO_HIGH).with_timeout(SYSTEM_RPC_TIMEOUT), + RequestStrategy::with_priority(PRIO_HIGH), ) .await; if let Ok(SystemRpc::AdvertiseClusterLayout(layout)) = resp { diff --git a/src/table/gc.rs b/src/table/gc.rs index 6cae9701..83e7eeff 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -25,8 +25,6 @@ use crate::replication::*; use crate::schema::*; const TABLE_GC_BATCH_SIZE: usize = 1024; -// Same timeout as NEED_BLOCK_QUERY_TIMEOUT in block manager -const TABLE_GC_RPC_TIMEOUT: Duration = Duration::from_secs(15); // GC delay for table entries: 1 day (24 hours) // (the delay before the entry is added in the GC todo list @@ -237,9 +235,7 @@ where &self.endpoint, &nodes[..], GcRpc::Update(updates), - RequestStrategy::with_priority(PRIO_BACKGROUND) - .with_quorum(nodes.len()) - .with_timeout(TABLE_GC_RPC_TIMEOUT), + RequestStrategy::with_priority(PRIO_BACKGROUND).with_quorum(nodes.len()), ) .await .err_context("GC: send tombstones")?; @@ -260,9 +256,7 @@ where &self.endpoint, &nodes[..], GcRpc::DeleteIfEqualHash(deletes), - RequestStrategy::with_priority(PRIO_BACKGROUND) - .with_quorum(nodes.len()) - .with_timeout(TABLE_GC_RPC_TIMEOUT), + RequestStrategy::with_priority(PRIO_BACKGROUND).with_quorum(nodes.len()), ) .await .err_context("GC: remote delete tombstones")?; diff --git a/src/table/sync.rs b/src/table/sync.rs index 28e99dd3..e34aa8d7 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -24,9 +24,6 @@ use crate::merkle::*; use crate::replication::*; use crate::*; -// Sync RPC can contain a lot of data, so have a 1min timeout -const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(60); - // Do anti-entropy every 10 minutes const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60); @@ -248,9 +245,7 @@ where &self.endpoint, nodes, SyncRpc::Items(values), - RequestStrategy::with_priority(PRIO_BACKGROUND) - .with_quorum(nodes.len()) - .with_timeout(TABLE_SYNC_RPC_TIMEOUT), + RequestStrategy::with_priority(PRIO_BACKGROUND).with_quorum(nodes.len()), ) .await?; @@ -311,8 +306,7 @@ where &self.endpoint, who, SyncRpc::RootCkHash(partition.partition, root_ck_hash), - RequestStrategy::with_priority(PRIO_BACKGROUND) - .with_timeout(TABLE_SYNC_RPC_TIMEOUT), + RequestStrategy::with_priority(PRIO_BACKGROUND), ) .await?; @@ -368,8 +362,7 @@ where &self.endpoint, who, SyncRpc::GetNode(key.clone()), - RequestStrategy::with_priority(PRIO_BACKGROUND) - .with_timeout(TABLE_SYNC_RPC_TIMEOUT), + RequestStrategy::with_priority(PRIO_BACKGROUND), ) .await? { @@ -445,8 +438,7 @@ where &self.endpoint, who, SyncRpc::Items(values), - RequestStrategy::with_priority(PRIO_BACKGROUND) - .with_timeout(TABLE_SYNC_RPC_TIMEOUT), + RequestStrategy::with_priority(PRIO_BACKGROUND), ) .await?; if let SyncRpc::Ok = rpc_resp { diff --git a/src/table/table.rs b/src/table/table.rs index 8e801be6..8a66c420 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -1,7 +1,6 @@ use std::borrow::Borrow; use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::sync::Arc; -use std::time::Duration; use async_trait::async_trait; use futures::stream::*; @@ -31,8 +30,6 @@ use crate::schema::*; use crate::sync::*; use crate::util::*; -pub const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(30); - pub struct Table<F: TableSchema + 'static, R: TableReplication + 'static> { pub system: Arc<System>, pub data: Arc<TableData<F, R>>, @@ -124,8 +121,7 @@ where &who[..], rpc, RequestStrategy::with_priority(PRIO_NORMAL) - .with_quorum(self.data.replication.write_quorum()) - .with_timeout(TABLE_RPC_TIMEOUT), + .with_quorum(self.data.replication.write_quorum()), ) .await?; @@ -177,7 +173,7 @@ where &self.endpoint, node, rpc, - RequestStrategy::with_priority(PRIO_NORMAL).with_timeout(TABLE_RPC_TIMEOUT), + RequestStrategy::with_priority(PRIO_NORMAL), ) .await?; Ok::<_, Error>((node, resp)) @@ -234,7 +230,6 @@ where rpc, RequestStrategy::with_priority(PRIO_NORMAL) .with_quorum(self.data.replication.read_quorum()) - .with_timeout(TABLE_RPC_TIMEOUT) .interrupt_after_quorum(true), ) .await?; @@ -329,7 +324,6 @@ where rpc, RequestStrategy::with_priority(PRIO_NORMAL) .with_quorum(self.data.replication.read_quorum()) - .with_timeout(TABLE_RPC_TIMEOUT) .interrupt_after_quorum(true), ) .await?; @@ -406,9 +400,7 @@ where &self.endpoint, who, TableRpc::<F>::Update(vec![what_enc]), - RequestStrategy::with_priority(PRIO_NORMAL) - .with_quorum(who.len()) - .with_timeout(TABLE_RPC_TIMEOUT), + RequestStrategy::with_priority(PRIO_NORMAL).with_quorum(who.len()), ) .await?; Ok(()) diff --git a/src/util/config.rs b/src/util/config.rs index 5e113e13..2d4b4f57 100644 --- a/src/util/config.rs +++ b/src/util/config.rs @@ -41,6 +41,11 @@ pub struct Config { /// Public IP address of this node pub rpc_public_addr: Option<String>, + /// Timeout for Netapp's ping messagess + pub rpc_ping_timeout_msec: Option<u64>, + /// Timeout for Netapp RPC calls + pub rpc_timeout_msec: Option<u64>, + /// Bootstrap peers RPC address #[serde(default)] pub bootstrap_peers: Vec<String>, |