diff options
author | Alex Auvolat <alex@adnab.me> | 2023-11-09 12:55:36 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2023-11-09 12:55:36 +0100 |
commit | 8a2b1dd422fb57abe611d8c1cf3cb0b55f487189 (patch) | |
tree | 2109cf405af3489eff0cbdd132ea2862c844214c /src/model/k2v/rpc.rs | |
parent | 523d2ecb9511f74e144cd116b942d6c1bf0f546d (diff) | |
download | garage-8a2b1dd422fb57abe611d8c1cf3cb0b55f487189.tar.gz garage-8a2b1dd422fb57abe611d8c1cf3cb0b55f487189.zip |
wip: split out layout management from System into separate LayoutManager
Diffstat (limited to 'src/model/k2v/rpc.rs')
-rw-r--r-- | src/model/k2v/rpc.rs | 20 |
1 files changed, 13 insertions, 7 deletions
diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs index 37e142f6..2f548ad7 100644 --- a/src/model/k2v/rpc.rs +++ b/src/model/k2v/rpc.rs @@ -131,7 +131,7 @@ impl K2VRpcHandler { who.sort(); self.system - .rpc + .rpc_helper() .try_call_many( &self.endpoint, &who[..], @@ -187,7 +187,7 @@ impl K2VRpcHandler { let call_futures = call_list.into_iter().map(|(nodes, items)| async move { let resp = self .system - .rpc + .rpc_helper() .try_call_many( &self.endpoint, &nodes[..], @@ -229,7 +229,7 @@ impl K2VRpcHandler { .replication .write_nodes(&poll_key.partition.hash()); - let rpc = self.system.rpc.try_call_many( + let rpc = self.system.rpc_helper().try_call_many( &self.endpoint, &nodes[..], K2VRpc::PollItem { @@ -241,7 +241,8 @@ impl K2VRpcHandler { .with_quorum(self.item_table.data.replication.read_quorum()) .without_timeout(), ); - let timeout_duration = Duration::from_millis(timeout_msec) + self.system.rpc.rpc_timeout(); + let timeout_duration = + Duration::from_millis(timeout_msec) + self.system.rpc_helper().rpc_timeout(); let resps = select! { r = rpc => r?, _ = tokio::time::sleep(timeout_duration) => return Ok(None), @@ -300,7 +301,11 @@ impl K2VRpcHandler { let rs = RequestStrategy::with_priority(PRIO_NORMAL).without_timeout(); let mut requests = nodes .iter() - .map(|node| self.system.rpc.call(&self.endpoint, *node, msg.clone(), rs)) + .map(|node| { + self.system + .rpc_helper() + .call(&self.endpoint, *node, msg.clone(), rs) + }) .collect::<FuturesUnordered<_>>(); // Fetch responses. This procedure stops fetching responses when any of the following @@ -316,8 +321,9 @@ impl K2VRpcHandler { // kind: all items produced by that node until time ts have been returned, so we can // bump the entry in the global vector clock and possibly remove some item-specific // vector clocks) - let mut deadline = - Instant::now() + Duration::from_millis(timeout_msec) + self.system.rpc.rpc_timeout(); + let mut deadline = Instant::now() + + Duration::from_millis(timeout_msec) + + self.system.rpc_helper().rpc_timeout(); let mut resps = vec![]; let mut errors = vec![]; loop { |