aboutsummaryrefslogtreecommitdiff
path: root/src/model/k2v/rpc.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2023-11-09 12:55:36 +0100
committerAlex Auvolat <alex@adnab.me>2023-11-09 12:55:36 +0100
commit8a2b1dd422fb57abe611d8c1cf3cb0b55f487189 (patch)
tree2109cf405af3489eff0cbdd132ea2862c844214c /src/model/k2v/rpc.rs
parent523d2ecb9511f74e144cd116b942d6c1bf0f546d (diff)
downloadgarage-8a2b1dd422fb57abe611d8c1cf3cb0b55f487189.tar.gz
garage-8a2b1dd422fb57abe611d8c1cf3cb0b55f487189.zip
wip: split out layout management from System into separate LayoutManager
Diffstat (limited to 'src/model/k2v/rpc.rs')
-rw-r--r--src/model/k2v/rpc.rs20
1 files changed, 13 insertions, 7 deletions
diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs
index 37e142f6..2f548ad7 100644
--- a/src/model/k2v/rpc.rs
+++ b/src/model/k2v/rpc.rs
@@ -131,7 +131,7 @@ impl K2VRpcHandler {
who.sort();
self.system
- .rpc
+ .rpc_helper()
.try_call_many(
&self.endpoint,
&who[..],
@@ -187,7 +187,7 @@ impl K2VRpcHandler {
let call_futures = call_list.into_iter().map(|(nodes, items)| async move {
let resp = self
.system
- .rpc
+ .rpc_helper()
.try_call_many(
&self.endpoint,
&nodes[..],
@@ -229,7 +229,7 @@ impl K2VRpcHandler {
.replication
.write_nodes(&poll_key.partition.hash());
- let rpc = self.system.rpc.try_call_many(
+ let rpc = self.system.rpc_helper().try_call_many(
&self.endpoint,
&nodes[..],
K2VRpc::PollItem {
@@ -241,7 +241,8 @@ impl K2VRpcHandler {
.with_quorum(self.item_table.data.replication.read_quorum())
.without_timeout(),
);
- let timeout_duration = Duration::from_millis(timeout_msec) + self.system.rpc.rpc_timeout();
+ let timeout_duration =
+ Duration::from_millis(timeout_msec) + self.system.rpc_helper().rpc_timeout();
let resps = select! {
r = rpc => r?,
_ = tokio::time::sleep(timeout_duration) => return Ok(None),
@@ -300,7 +301,11 @@ impl K2VRpcHandler {
let rs = RequestStrategy::with_priority(PRIO_NORMAL).without_timeout();
let mut requests = nodes
.iter()
- .map(|node| self.system.rpc.call(&self.endpoint, *node, msg.clone(), rs))
+ .map(|node| {
+ self.system
+ .rpc_helper()
+ .call(&self.endpoint, *node, msg.clone(), rs)
+ })
.collect::<FuturesUnordered<_>>();
// Fetch responses. This procedure stops fetching responses when any of the following
@@ -316,8 +321,9 @@ impl K2VRpcHandler {
// kind: all items produced by that node until time ts have been returned, so we can
// bump the entry in the global vector clock and possibly remove some item-specific
// vector clocks)
- let mut deadline =
- Instant::now() + Duration::from_millis(timeout_msec) + self.system.rpc.rpc_timeout();
+ let mut deadline = Instant::now()
+ + Duration::from_millis(timeout_msec)
+ + self.system.rpc_helper().rpc_timeout();
let mut resps = vec![];
let mut errors = vec![];
loop {