aboutsummaryrefslogtreecommitdiff
path: root/src/model/k2v/rpc.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/model/k2v/rpc.rs')
-rw-r--r--src/model/k2v/rpc.rs36
1 files changed, 17 insertions, 19 deletions
diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs
index 90101d0f..a74df277 100644
--- a/src/model/k2v/rpc.rs
+++ b/src/model/k2v/rpc.rs
@@ -23,7 +23,6 @@ use garage_rpc::system::System;
use garage_rpc::*;
use garage_table::replication::{TableReplication, TableShardedReplication};
-use garage_table::table::TABLE_RPC_TIMEOUT;
use garage_table::{PartitionKey, Table};
use crate::k2v::causality::*;
@@ -117,7 +116,6 @@ impl K2VRpcHandler {
}),
RequestStrategy::with_priority(PRIO_NORMAL)
.with_quorum(1)
- .with_timeout(TABLE_RPC_TIMEOUT)
.interrupt_after_quorum(true),
)
.await?;
@@ -169,7 +167,6 @@ impl K2VRpcHandler {
K2VRpc::InsertManyItems(items),
RequestStrategy::with_priority(PRIO_NORMAL)
.with_quorum(1)
- .with_timeout(TABLE_RPC_TIMEOUT)
.interrupt_after_quorum(true),
)
.await?;
@@ -205,22 +202,23 @@ impl K2VRpcHandler {
.replication
.write_nodes(&poll_key.partition.hash());
- let resps = self
- .system
- .rpc
- .try_call_many(
- &self.endpoint,
- &nodes[..],
- K2VRpc::PollItem {
- key: poll_key,
- causal_context,
- timeout_msec,
- },
- RequestStrategy::with_priority(PRIO_NORMAL)
- .with_quorum(self.item_table.data.replication.read_quorum())
- .with_timeout(Duration::from_millis(timeout_msec) + TABLE_RPC_TIMEOUT),
- )
- .await?;
+ let rpc = self.system.rpc.try_call_many(
+ &self.endpoint,
+ &nodes[..],
+ K2VRpc::PollItem {
+ key: poll_key,
+ causal_context,
+ timeout_msec,
+ },
+ RequestStrategy::with_priority(PRIO_NORMAL)
+ .with_quorum(self.item_table.data.replication.read_quorum())
+ .without_timeout(),
+ );
+ let timeout_duration = Duration::from_millis(timeout_msec) + self.system.rpc.rpc_timeout();
+ let resps = select! {
+ r = rpc => r?,
+ _ = tokio::time::sleep(timeout_duration) => return Ok(None),
+ };
let mut resp: Option<K2VItem> = None;
for v in resps {