Diffstat (limited to 'src')
 src/block/manager.rs  | 18
 src/block/resync.rs   | 14
 src/model/k2v/rpc.rs  | 36
 src/rpc/Cargo.toml    |  2
 src/rpc/metrics.rs    | 19
 src/rpc/rpc_helper.rs | 57
 src/rpc/system.rs     | 16
 src/table/gc.rs       | 10
 src/table/sync.rs     | 16
 src/table/table.rs    | 14
 src/util/config.rs    |  5
 11 files changed, 84 insertions(+), 123 deletions(-)
diff --git a/src/block/manager.rs b/src/block/manager.rs
index ec694fc8..7f439b96 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -41,9 +41,6 @@ use crate::resync::*;
/// Size under which data will be stored inlined in database instead of as files
pub const INLINE_THRESHOLD: usize = 3072;
-// Timeout for RPCs that read and write blocks to remote nodes
-pub(crate) const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(60);
-
// The delay between the moment when the reference counter
// drops to zero, and the moment where we allow ourselves
// to delete the block locally.
@@ -183,7 +180,7 @@ impl BlockManager {
};
return Ok((header, stream));
}
- _ = tokio::time::sleep(BLOCK_RW_TIMEOUT) => {
+ _ = tokio::time::sleep(self.system.rpc.rpc_timeout()) => {
debug!("Node {:?} didn't return block in time, trying next.", node);
}
};
@@ -235,7 +232,7 @@ impl BlockManager {
}
}
}
- _ = tokio::time::sleep(BLOCK_RW_TIMEOUT) => {
+ _ = tokio::time::sleep(self.system.rpc.rpc_timeout()) => {
debug!("Node {:?} didn't return block in time, trying next.", node);
}
};
@@ -300,8 +297,7 @@ impl BlockManager {
&who[..],
put_block_rpc,
RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY)
- .with_quorum(self.replication.write_quorum())
- .with_timeout(BLOCK_RW_TIMEOUT),
+ .with_quorum(self.replication.write_quorum()),
)
.await?;
@@ -336,7 +332,10 @@ impl BlockManager {
// we will fetch it from someone.
let this = self.clone();
tokio::spawn(async move {
- if let Err(e) = this.resync.put_to_resync(&hash, 2 * BLOCK_RW_TIMEOUT) {
+ if let Err(e) = this
+ .resync
+ .put_to_resync(&hash, 2 * this.system.rpc.rpc_timeout())
+ {
error!("Block {:?} could not be put in resync queue: {}.", hash, e);
}
});
@@ -444,7 +443,8 @@ impl BlockManager {
Ok(c) => c,
Err(e) => {
// Not found but maybe we should have had it ??
- self.resync.put_to_resync(hash, 2 * BLOCK_RW_TIMEOUT)?;
+ self.resync
+ .put_to_resync(hash, 2 * self.system.rpc.rpc_timeout())?;
return Err(Into::into(e));
}
};
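
The manager.rs hunks all follow the same pattern: the hard-coded BLOCK_RW_TIMEOUT constant is gone, and every wait is now derived from the single configurable value exposed by RpcHelper::rpc_timeout(). A minimal, self-contained sketch of the try-next-node shape (simplified names, not the actual Garage types):

    use std::time::Duration;

    // Race one node's RPC against the configurable timeout; on timeout the
    // caller simply moves on to the next node, as in the loops above.
    async fn try_one_node<T>(
        rpc: impl std::future::Future<Output = T>,
        rpc_timeout: Duration, // was the fixed 60 s BLOCK_RW_TIMEOUT
    ) -> Option<T> {
        tokio::select! {
            res = rpc => Some(res),
            _ = tokio::time::sleep(rpc_timeout) => None,
        }
    }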
diff --git a/src/block/resync.rs b/src/block/resync.rs
index bde3e98c..ada3ac54 100644
--- a/src/block/resync.rs
+++ b/src/block/resync.rs
@@ -33,14 +33,6 @@ use garage_table::replication::TableReplication;
use crate::manager::*;
-// Timeout for RPCs that ask other nodes whether they need a copy
-// of a given block before we delete it locally
-// The timeout here is relatively low because we don't want to block
-// the entire resync loop when some nodes are not responding.
-// Nothing will be deleted if the nodes don't answer the queries,
-// we will just retry later.
-const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(15);
-
// The delay between the time where a resync operation fails
// and the time when it is retried, with exponential backoff
// (multiplied by 2, 4, 8, 16, etc. for every consecutive failure).
@@ -346,8 +338,7 @@ impl BlockResyncManager {
&manager.endpoint,
&who,
BlockRpc::NeedBlockQuery(*hash),
- RequestStrategy::with_priority(PRIO_BACKGROUND)
- .with_timeout(NEED_BLOCK_QUERY_TIMEOUT),
+ RequestStrategy::with_priority(PRIO_BACKGROUND),
)
.await?;
@@ -394,8 +385,7 @@ impl BlockResyncManager {
&need_nodes[..],
put_block_message,
RequestStrategy::with_priority(PRIO_BACKGROUND)
- .with_quorum(need_nodes.len())
- .with_timeout(BLOCK_RW_TIMEOUT),
+ .with_quorum(need_nodes.len()),
)
.await
.err_context("PutBlock RPC")?;
diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs
index 90101d0f..a74df277 100644
--- a/src/model/k2v/rpc.rs
+++ b/src/model/k2v/rpc.rs
@@ -23,7 +23,6 @@ use garage_rpc::system::System;
use garage_rpc::*;
use garage_table::replication::{TableReplication, TableShardedReplication};
-use garage_table::table::TABLE_RPC_TIMEOUT;
use garage_table::{PartitionKey, Table};
use crate::k2v::causality::*;
@@ -117,7 +116,6 @@ impl K2VRpcHandler {
}),
RequestStrategy::with_priority(PRIO_NORMAL)
.with_quorum(1)
- .with_timeout(TABLE_RPC_TIMEOUT)
.interrupt_after_quorum(true),
)
.await?;
@@ -169,7 +167,6 @@ impl K2VRpcHandler {
K2VRpc::InsertManyItems(items),
RequestStrategy::with_priority(PRIO_NORMAL)
.with_quorum(1)
- .with_timeout(TABLE_RPC_TIMEOUT)
.interrupt_after_quorum(true),
)
.await?;
@@ -205,22 +202,23 @@ impl K2VRpcHandler {
.replication
.write_nodes(&poll_key.partition.hash());
- let resps = self
- .system
- .rpc
- .try_call_many(
- &self.endpoint,
- &nodes[..],
- K2VRpc::PollItem {
- key: poll_key,
- causal_context,
- timeout_msec,
- },
- RequestStrategy::with_priority(PRIO_NORMAL)
- .with_quorum(self.item_table.data.replication.read_quorum())
- .with_timeout(Duration::from_millis(timeout_msec) + TABLE_RPC_TIMEOUT),
- )
- .await?;
+ let rpc = self.system.rpc.try_call_many(
+ &self.endpoint,
+ &nodes[..],
+ K2VRpc::PollItem {
+ key: poll_key,
+ causal_context,
+ timeout_msec,
+ },
+ RequestStrategy::with_priority(PRIO_NORMAL)
+ .with_quorum(self.item_table.data.replication.read_quorum())
+ .without_timeout(),
+ );
+ let timeout_duration = Duration::from_millis(timeout_msec) + self.system.rpc.rpc_timeout();
+ let resps = select! {
+ r = rpc => r?,
+ _ = tokio::time::sleep(timeout_duration) => return Ok(None),
+ };
let mut resp: Option<K2VItem> = None;
for v in resps {
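
PollItem is the one call site where a deadline must still be computed per request: the RPC itself runs with .without_timeout(), and the caller races it against the client-supplied poll window plus the base RPC timeout, treating expiry as "no new value" rather than as an error. A hedged sketch of that shape (illustrative signature, not the actual K2VRpcHandler code):

    use std::time::Duration;

    // Long-poll: no per-RPC timeout inside the helper; an outer deadline of
    // poll window + base RPC timeout yields Ok(None) instead of Err(Timeout).
    async fn poll_with_deadline<T, E>(
        rpc: impl std::future::Future<Output = Result<T, E>>,
        timeout_msec: u64,
        base_rpc_timeout: Duration,
    ) -> Result<Option<T>, E> {
        let deadline = Duration::from_millis(timeout_msec) + base_rpc_timeout;
        tokio::select! {
            r = rpc => r.map(Some),
            _ = tokio::time::sleep(deadline) => Ok(None),
        }
    }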
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml
index e51f1f73..d61acea4 100644
--- a/src/rpc/Cargo.toml
+++ b/src/rpc/Cargo.toml
@@ -45,7 +45,7 @@ tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi
tokio-stream = { version = "0.1", features = ["net"] }
opentelemetry = "0.17"
-netapp = { version = "0.5.1", features = ["telemetry"] }
+netapp = { version = "0.5.2", features = ["telemetry"] }
hyper = { version = "0.14", features = ["client", "http1", "runtime", "tcp"] }
diff --git a/src/rpc/metrics.rs b/src/rpc/metrics.rs
index c900518c..61f8fa79 100644
--- a/src/rpc/metrics.rs
+++ b/src/rpc/metrics.rs
@@ -1,31 +1,18 @@
-use std::sync::Arc;
-
use opentelemetry::{global, metrics::*};
-use tokio::sync::Semaphore;
/// RpcMetrics references all counters used for RPC metrics
pub struct RpcMetrics {
- pub(crate) _rpc_available_permits: ValueObserver<u64>,
-
pub(crate) rpc_counter: Counter<u64>,
pub(crate) rpc_timeout_counter: Counter<u64>,
pub(crate) rpc_netapp_error_counter: Counter<u64>,
pub(crate) rpc_garage_error_counter: Counter<u64>,
pub(crate) rpc_duration: ValueRecorder<f64>,
- pub(crate) rpc_queueing_time: ValueRecorder<f64>,
}
impl RpcMetrics {
- pub fn new(sem: Arc<Semaphore>) -> Self {
+ pub fn new() -> Self {
let meter = global::meter("garage_rpc");
RpcMetrics {
- _rpc_available_permits: meter
- .u64_value_observer("rpc.available_permits", move |observer| {
- observer.observe(sem.available_permits() as u64, &[])
- })
- .with_description("Number of available RPC permits")
- .init(),
-
rpc_counter: meter
.u64_counter("rpc.request_counter")
.with_description("Number of RPC requests emitted")
@@ -46,10 +33,6 @@ impl RpcMetrics {
.f64_value_recorder("rpc.duration")
.with_description("Duration of RPCs")
.init(),
- rpc_queueing_time: meter
- .f64_value_recorder("rpc.queueing_time")
- .with_description("Time RPC requests were queued for before being sent")
- .init(),
}
}
}
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index 19abb4c5..857ed620 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -7,7 +7,7 @@ use futures::stream::futures_unordered::FuturesUnordered;
use futures::stream::StreamExt;
use futures_util::future::FutureExt;
use tokio::select;
-use tokio::sync::{watch, Semaphore};
+use tokio::sync::watch;
use opentelemetry::KeyValue;
use opentelemetry::{
@@ -32,32 +32,30 @@ use garage_util::metrics::RecordDuration;
use crate::metrics::RpcMetrics;
use crate::ring::Ring;
-const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
-
-// Don't allow more than 100 concurrent outgoing RPCs.
-const MAX_CONCURRENT_REQUESTS: usize = 100;
+// Default RPC timeout = 5 minutes
+const DEFAULT_TIMEOUT: Duration = Duration::from_secs(300);
/// Strategy to apply when making RPC
#[derive(Copy, Clone)]
pub struct RequestStrategy {
- /// Max time to wait for reponse
- pub rs_timeout: Duration,
/// Min number of response to consider the request successful
pub rs_quorum: Option<usize>,
/// Should requests be dropped after enough response are received
pub rs_interrupt_after_quorum: bool,
/// Request priority
pub rs_priority: RequestPriority,
+ /// Deactivate timeout for this request
+ pub rs_no_timeout: bool,
}
impl RequestStrategy {
/// Create a RequestStrategy with default timeout and not interrupting when quorum reached
pub fn with_priority(prio: RequestPriority) -> Self {
RequestStrategy {
- rs_timeout: DEFAULT_TIMEOUT,
rs_quorum: None,
rs_interrupt_after_quorum: false,
rs_priority: prio,
+ rs_no_timeout: false,
}
}
/// Set quorum to be reached for request
@@ -65,17 +63,17 @@ impl RequestStrategy {
self.rs_quorum = Some(quorum);
self
}
- /// Set timeout of the strategy
- pub fn with_timeout(mut self, timeout: Duration) -> Self {
- self.rs_timeout = timeout;
- self
- }
/// Set if requests can be dropped after quorum has been reached
/// In general true for read requests, and false for write
pub fn interrupt_after_quorum(mut self, interrupt: bool) -> Self {
self.rs_interrupt_after_quorum = interrupt;
self
}
+ /// Deactivate timeout for this request
+ pub fn without_timeout(mut self) -> Self {
+ self.rs_no_timeout = true;
+ self
+ }
}
#[derive(Clone)]
@@ -86,8 +84,8 @@ struct RpcHelperInner {
fullmesh: Arc<FullMeshPeeringStrategy>,
background: Arc<BackgroundRunner>,
ring: watch::Receiver<Arc<Ring>>,
- request_buffer_semaphore: Arc<Semaphore>,
metrics: RpcMetrics,
+ rpc_timeout: Duration,
}
impl RpcHelper {
@@ -96,21 +94,24 @@ impl RpcHelper {
fullmesh: Arc<FullMeshPeeringStrategy>,
background: Arc<BackgroundRunner>,
ring: watch::Receiver<Arc<Ring>>,
+ rpc_timeout: Option<Duration>,
) -> Self {
- let sem = Arc::new(Semaphore::new(MAX_CONCURRENT_REQUESTS));
-
- let metrics = RpcMetrics::new(sem.clone());
+ let metrics = RpcMetrics::new();
Self(Arc::new(RpcHelperInner {
our_node_id,
fullmesh,
background,
ring,
- request_buffer_semaphore: sem,
metrics,
+ rpc_timeout: rpc_timeout.unwrap_or(DEFAULT_TIMEOUT),
}))
}
+ pub fn rpc_timeout(&self) -> Duration {
+ self.0.rpc_timeout
+ }
+
pub async fn call<M, N, H, S>(
&self,
endpoint: &Endpoint<M, H>,
@@ -129,13 +130,6 @@ impl RpcHelper {
KeyValue::new("to", format!("{:?}", to)),
];
- let permit = self
- .0
- .request_buffer_semaphore
- .acquire()
- .record_duration(&self.0.metrics.rpc_queueing_time, &metric_tags)
- .await?;
-
self.0.metrics.rpc_counter.add(1, &metric_tags);
let node_id = to.into();
@@ -143,10 +137,16 @@ impl RpcHelper {
.call_streaming(&node_id, msg, strat.rs_priority)
.record_duration(&self.0.metrics.rpc_duration, &metric_tags);
+ let timeout = async {
+ if strat.rs_no_timeout {
+ futures::future::pending().await
+ } else {
+ tokio::time::sleep(self.0.rpc_timeout).await
+ }
+ };
+
select! {
res = rpc_call => {
- drop(permit);
-
if res.is_err() {
self.0.metrics.rpc_netapp_error_counter.add(1, &metric_tags);
}
@@ -158,8 +158,7 @@ impl RpcHelper {
Ok(res?)
}
- _ = tokio::time::sleep(strat.rs_timeout) => {
- drop(permit);
+ () = timeout => {
self.0.metrics.rpc_timeout_counter.add(1, &metric_tags);
Err(Error::Timeout)
}
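
The core of the new mechanism is this call path: when rs_no_timeout is set, the timeout arm of the select becomes a future that never completes, so the RPC can only finish by resolving or failing. A self-contained sketch of that trick (assumed helper function, not the actual RpcHelper::call):

    use std::time::Duration;

    // Optional timeout via select: None stands in for without_timeout().
    async fn call_with_optional_timeout<T>(
        rpc: impl std::future::Future<Output = T>,
        timeout: Option<Duration>,
    ) -> Result<T, &'static str> {
        let sleep = async {
            match timeout {
                Some(d) => tokio::time::sleep(d).await,
                None => futures::future::pending().await, // never resolves
            }
        };
        tokio::select! {
            res = rpc => Ok(res),
            () = sleep => Err("timeout"),
        }
    }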
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 2c6136a8..f8121193 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -37,7 +37,6 @@ use crate::rpc_helper::*;
const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60);
const STATUS_EXCHANGE_INTERVAL: Duration = Duration::from_secs(10);
-const SYSTEM_RPC_TIMEOUT: Duration = Duration::from_secs(15);
/// Version tag used for version check upon Netapp connection.
/// Cluster nodes with different version tags are deemed
@@ -280,6 +279,9 @@ impl System {
let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, node_key);
let fullmesh = FullMeshPeeringStrategy::new(netapp.clone(), vec![], rpc_public_addr);
+ if let Some(ping_timeout) = config.rpc_ping_timeout_msec {
+ fullmesh.set_ping_timeout_millis(ping_timeout);
+ }
let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into());
@@ -317,7 +319,13 @@ impl System {
node_status: RwLock::new(HashMap::new()),
netapp: netapp.clone(),
fullmesh: fullmesh.clone(),
- rpc: RpcHelper::new(netapp.id.into(), fullmesh, background.clone(), ring.clone()),
+ rpc: RpcHelper::new(
+ netapp.id.into(),
+ fullmesh,
+ background.clone(),
+ ring.clone(),
+ config.rpc_timeout_msec.map(Duration::from_millis),
+ ),
system_endpoint,
replication_factor,
rpc_listen_addr: config.rpc_bind_addr,
@@ -600,7 +608,7 @@ impl System {
.broadcast(
&self.system_endpoint,
SystemRpc::AdvertiseStatus(local_status),
- RequestStrategy::with_priority(PRIO_HIGH).with_timeout(SYSTEM_RPC_TIMEOUT),
+ RequestStrategy::with_priority(PRIO_HIGH),
)
.await;
@@ -724,7 +732,7 @@ impl System {
&self.system_endpoint,
peer,
SystemRpc::PullClusterLayout,
- RequestStrategy::with_priority(PRIO_HIGH).with_timeout(SYSTEM_RPC_TIMEOUT),
+ RequestStrategy::with_priority(PRIO_HIGH),
)
.await;
if let Ok(SystemRpc::AdvertiseClusterLayout(layout)) = resp {
diff --git a/src/table/gc.rs b/src/table/gc.rs
index 6cae9701..83e7eeff 100644
--- a/src/table/gc.rs
+++ b/src/table/gc.rs
@@ -25,8 +25,6 @@ use crate::replication::*;
use crate::schema::*;
const TABLE_GC_BATCH_SIZE: usize = 1024;
-// Same timeout as NEED_BLOCK_QUERY_TIMEOUT in block manager
-const TABLE_GC_RPC_TIMEOUT: Duration = Duration::from_secs(15);
// GC delay for table entries: 1 day (24 hours)
// (the delay before the entry is added in the GC todo list
@@ -237,9 +235,7 @@ where
&self.endpoint,
&nodes[..],
GcRpc::Update(updates),
- RequestStrategy::with_priority(PRIO_BACKGROUND)
- .with_quorum(nodes.len())
- .with_timeout(TABLE_GC_RPC_TIMEOUT),
+ RequestStrategy::with_priority(PRIO_BACKGROUND).with_quorum(nodes.len()),
)
.await
.err_context("GC: send tombstones")?;
@@ -260,9 +256,7 @@ where
&self.endpoint,
&nodes[..],
GcRpc::DeleteIfEqualHash(deletes),
- RequestStrategy::with_priority(PRIO_BACKGROUND)
- .with_quorum(nodes.len())
- .with_timeout(TABLE_GC_RPC_TIMEOUT),
+ RequestStrategy::with_priority(PRIO_BACKGROUND).with_quorum(nodes.len()),
)
.await
.err_context("GC: remote delete tombstones")?;
diff --git a/src/table/sync.rs b/src/table/sync.rs
index 62b88a58..76402d28 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -24,9 +24,6 @@ use crate::merkle::*;
use crate::replication::*;
use crate::*;
-// Sync RPC can contain a lot of data, so have a 1min timeout
-const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(60);
-
// Do anti-entropy every 10 minutes
const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60);
@@ -248,9 +245,7 @@ where
&self.endpoint,
nodes,
SyncRpc::Items(values),
- RequestStrategy::with_priority(PRIO_BACKGROUND)
- .with_quorum(nodes.len())
- .with_timeout(TABLE_SYNC_RPC_TIMEOUT),
+ RequestStrategy::with_priority(PRIO_BACKGROUND).with_quorum(nodes.len()),
)
.await?;
@@ -311,8 +306,7 @@ where
&self.endpoint,
who,
SyncRpc::RootCkHash(partition.partition, root_ck_hash),
- RequestStrategy::with_priority(PRIO_BACKGROUND)
- .with_timeout(TABLE_SYNC_RPC_TIMEOUT),
+ RequestStrategy::with_priority(PRIO_BACKGROUND),
)
.await?;
@@ -368,8 +362,7 @@ where
&self.endpoint,
who,
SyncRpc::GetNode(key.clone()),
- RequestStrategy::with_priority(PRIO_BACKGROUND)
- .with_timeout(TABLE_SYNC_RPC_TIMEOUT),
+ RequestStrategy::with_priority(PRIO_BACKGROUND),
)
.await?
{
@@ -445,8 +438,7 @@ where
&self.endpoint,
who,
SyncRpc::Items(values),
- RequestStrategy::with_priority(PRIO_BACKGROUND)
- .with_timeout(TABLE_SYNC_RPC_TIMEOUT),
+ RequestStrategy::with_priority(PRIO_BACKGROUND),
)
.await?;
if let SyncRpc::Ok = rpc_resp {
diff --git a/src/table/table.rs b/src/table/table.rs
index 8e801be6..8a66c420 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -1,7 +1,6 @@
use std::borrow::Borrow;
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::sync::Arc;
-use std::time::Duration;
use async_trait::async_trait;
use futures::stream::*;
@@ -31,8 +30,6 @@ use crate::schema::*;
use crate::sync::*;
use crate::util::*;
-pub const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(30);
-
pub struct Table<F: TableSchema + 'static, R: TableReplication + 'static> {
pub system: Arc<System>,
pub data: Arc<TableData<F, R>>,
@@ -124,8 +121,7 @@ where
&who[..],
rpc,
RequestStrategy::with_priority(PRIO_NORMAL)
- .with_quorum(self.data.replication.write_quorum())
- .with_timeout(TABLE_RPC_TIMEOUT),
+ .with_quorum(self.data.replication.write_quorum()),
)
.await?;
@@ -177,7 +173,7 @@ where
&self.endpoint,
node,
rpc,
- RequestStrategy::with_priority(PRIO_NORMAL).with_timeout(TABLE_RPC_TIMEOUT),
+ RequestStrategy::with_priority(PRIO_NORMAL),
)
.await?;
Ok::<_, Error>((node, resp))
@@ -234,7 +230,6 @@ where
rpc,
RequestStrategy::with_priority(PRIO_NORMAL)
.with_quorum(self.data.replication.read_quorum())
- .with_timeout(TABLE_RPC_TIMEOUT)
.interrupt_after_quorum(true),
)
.await?;
@@ -329,7 +324,6 @@ where
rpc,
RequestStrategy::with_priority(PRIO_NORMAL)
.with_quorum(self.data.replication.read_quorum())
- .with_timeout(TABLE_RPC_TIMEOUT)
.interrupt_after_quorum(true),
)
.await?;
@@ -406,9 +400,7 @@ where
&self.endpoint,
who,
TableRpc::<F>::Update(vec![what_enc]),
- RequestStrategy::with_priority(PRIO_NORMAL)
- .with_quorum(who.len())
- .with_timeout(TABLE_RPC_TIMEOUT),
+ RequestStrategy::with_priority(PRIO_NORMAL).with_quorum(who.len()),
)
.await?;
Ok(())
diff --git a/src/util/config.rs b/src/util/config.rs
index 5e113e13..2d4b4f57 100644
--- a/src/util/config.rs
+++ b/src/util/config.rs
@@ -41,6 +41,11 @@ pub struct Config {
/// Public IP address of this node
pub rpc_public_addr: Option<String>,
+ /// Timeout for Netapp's ping messages
+ pub rpc_ping_timeout_msec: Option<u64>,
+ /// Timeout for Netapp RPC calls
+ pub rpc_timeout_msec: Option<u64>,
+
/// Bootstrap peers RPC address
#[serde(default)]
pub bootstrap_peers: Vec<String>,
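
Both new fields are optional: when absent, netapp's own ping default and the 5-minute DEFAULT_TIMEOUT from rpc_helper.rs apply. A hedged sketch of how they might be set and consumed; the field names come from the diff, while the struct and values are illustrative:

    use serde::Deserialize;
    use std::time::Duration;

    // Illustrative reduction of the Config struct to the two new fields.
    #[derive(Deserialize)]
    struct RpcTimeoutConfig {
        rpc_ping_timeout_msec: Option<u64>,
        rpc_timeout_msec: Option<u64>,
    }

    fn main() {
        let cfg: RpcTimeoutConfig = toml::from_str(
            "rpc_ping_timeout_msec = 10000\nrpc_timeout_msec = 300000",
        )
        .unwrap();
        // None keeps the built-in defaults.
        let rpc_timeout = cfg
            .rpc_timeout_msec
            .map(Duration::from_millis)
            .unwrap_or(Duration::from_secs(300));
        println!("effective rpc timeout: {:?}", rpc_timeout);
    }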