aboutsummaryrefslogtreecommitdiff
path: root/src/model/k2v/rpc.rs
diff options
context:
space:
mode:
authorAlex <alex@adnab.me>2022-09-20 16:17:23 +0200
committerAlex <alex@adnab.me>2022-09-20 16:17:23 +0200
commit7a901f7aab29d9ae09c378e3824b8236458f85f1 (patch)
tree068a012a904b9e7552bc1f594b7d93260c69409f /src/model/k2v/rpc.rs
parent2c312e9cbd58368484e9acb043b7c9d0ebb8905c (diff)
parentded444f6c96f8ab991e762f65760b42e4d64246c (diff)
downloadgarage-0.8.0-beta2.tar.gz
garage-0.8.0-beta2.zip
Merge pull request 'RPC performance changes' (#387) from configurable-timeouts into mainv0.8.0-beta2
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/387
Diffstat (limited to 'src/model/k2v/rpc.rs')
-rw-r--r--src/model/k2v/rpc.rs36
1 files changed, 17 insertions, 19 deletions
diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs
index 90101d0f..a74df277 100644
--- a/src/model/k2v/rpc.rs
+++ b/src/model/k2v/rpc.rs
@@ -23,7 +23,6 @@ use garage_rpc::system::System;
use garage_rpc::*;
use garage_table::replication::{TableReplication, TableShardedReplication};
-use garage_table::table::TABLE_RPC_TIMEOUT;
use garage_table::{PartitionKey, Table};
use crate::k2v::causality::*;
@@ -117,7 +116,6 @@ impl K2VRpcHandler {
}),
RequestStrategy::with_priority(PRIO_NORMAL)
.with_quorum(1)
- .with_timeout(TABLE_RPC_TIMEOUT)
.interrupt_after_quorum(true),
)
.await?;
@@ -169,7 +167,6 @@ impl K2VRpcHandler {
K2VRpc::InsertManyItems(items),
RequestStrategy::with_priority(PRIO_NORMAL)
.with_quorum(1)
- .with_timeout(TABLE_RPC_TIMEOUT)
.interrupt_after_quorum(true),
)
.await?;
@@ -205,22 +202,23 @@ impl K2VRpcHandler {
.replication
.write_nodes(&poll_key.partition.hash());
- let resps = self
- .system
- .rpc
- .try_call_many(
- &self.endpoint,
- &nodes[..],
- K2VRpc::PollItem {
- key: poll_key,
- causal_context,
- timeout_msec,
- },
- RequestStrategy::with_priority(PRIO_NORMAL)
- .with_quorum(self.item_table.data.replication.read_quorum())
- .with_timeout(Duration::from_millis(timeout_msec) + TABLE_RPC_TIMEOUT),
- )
- .await?;
+ let rpc = self.system.rpc.try_call_many(
+ &self.endpoint,
+ &nodes[..],
+ K2VRpc::PollItem {
+ key: poll_key,
+ causal_context,
+ timeout_msec,
+ },
+ RequestStrategy::with_priority(PRIO_NORMAL)
+ .with_quorum(self.item_table.data.replication.read_quorum())
+ .without_timeout(),
+ );
+ let timeout_duration = Duration::from_millis(timeout_msec) + self.system.rpc.rpc_timeout();
+ let resps = select! {
+ r = rpc => r?,
+ _ = tokio::time::sleep(timeout_duration) => return Ok(None),
+ };
let mut resp: Option<K2VItem> = None;
for v in resps {