aboutsummaryrefslogtreecommitdiff
path: root/src/rpc/rpc_helper.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2023-12-07 10:55:15 +0100
committerAlex Auvolat <alex@adnab.me>2023-12-07 10:57:21 +0100
commit95eb13eb08d517d328e3c8aeb222440a27211ee9 (patch)
tree4216b2172cb973f404b56b51546824cbfa966a7a /src/rpc/rpc_helper.rs
parentc8356a91d9bf1d1488ec288099f2a55a1019918f (diff)
downloadgarage-95eb13eb08d517d328e3c8aeb222440a27211ee9.tar.gz
garage-95eb13eb08d517d328e3c8aeb222440a27211ee9.zip
rpc: refactor result tracking for quorum sets
Diffstat (limited to 'src/rpc/rpc_helper.rs')
-rw-r--r--src/rpc/rpc_helper.rs147
1 files changed, 103 insertions, 44 deletions
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index f71f5ae7..c6dcbe75 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -484,15 +484,10 @@ impl RpcHelper {
// Peers may appear in many quorum sets. Here, build a list of peers,
// mapping to the index of the quorum sets in which they appear.
- let mut peers = HashMap::<Uuid, Vec<usize>>::new();
- for (i, set) in to_sets.iter().enumerate() {
- for peer in set.iter() {
- peers.entry(*peer).or_default().push(i);
- }
- }
+ let mut result_tracker = QuorumSetResultTracker::new(to_sets, quorum);
// Send one request to each peer of the quorum sets
- let requests = peers.iter().map(|(peer, _)| {
+ let requests = result_tracker.nodes.iter().map(|(peer, _)| {
let self2 = self.clone();
let msg = msg.clone();
let endpoint2 = endpoint.clone();
@@ -501,52 +496,25 @@ impl RpcHelper {
});
let mut resp_stream = requests.collect::<FuturesUnordered<_>>();
- // Success and error responses will be collected in these two vectors
- let mut successes = vec![];
- let mut errors = vec![];
-
- // `set_counters` is used to keep track of how many success and error
- // responses are received within each quorum set. When a node returns
- // its response, it counts as a sucess/an error for all of the quorum
- // sets which it is part of.
- let mut set_counters = vec![(0, 0); to_sets.len()];
-
// Drive requests to completion
while let Some((node, resp)) = resp_stream.next().await {
// Store the response in the correct vector and increment the
// appropriate counters
- match resp {
- Ok(msg) => {
- for set in peers.get(&node).unwrap().iter() {
- set_counters[*set].0 += 1;
- }
- successes.push(msg);
- }
- Err(e) => {
- for set in peers.get(&node).unwrap().iter() {
- set_counters[*set].1 += 1;
- }
- errors.push(e);
- }
- }
+ result_tracker.register_result(node, resp);
// If we have a quorum of ok in all quorum sets, then it's a success!
- if set_counters.iter().all(|(ok_cnt, _)| *ok_cnt >= quorum) {
+ if result_tracker.all_quorums_ok() {
// Continue all other requets in background
tokio::spawn(async move {
resp_stream.collect::<Vec<(Uuid, Result<_, _>)>>().await;
});
- return Ok(successes);
+ return Ok(result_tracker.success_values());
}
// If there is a quorum set for which too many errors were received,
// we know it's impossible to get a quorum, so return immediately.
- if set_counters
- .iter()
- .enumerate()
- .any(|(i, (_, err_cnt))| err_cnt + quorum > to_sets[i].len())
- {
+ if result_tracker.too_many_failures() {
break;
}
}
@@ -563,13 +531,104 @@ impl RpcHelper {
// running request handler.)
// Failure, could not get quorum
- let errors = errors.iter().map(|e| format!("{}", e)).collect::<Vec<_>>();
- Err(Error::Quorum(
+ Err(result_tracker.quorum_error())
+ }
+}
+
+// ------- utility for tracking successes/errors among write sets --------
+
+pub struct QuorumSetResultTracker<S, E> {
+ // The set of nodes and the quorum sets they belong to
+ pub nodes: HashMap<Uuid, Vec<usize>>,
+ pub quorum: usize,
+
+ // The success and error responses received
+ pub successes: Vec<(Uuid, S)>,
+ pub failures: Vec<(Uuid, E)>,
+
+ // The counters for successes and failures in each set
+ pub success_counters: Box<[usize]>,
+ pub failure_counters: Box<[usize]>,
+ pub set_lens: Box<[usize]>,
+}
+
+impl<S, E: std::fmt::Display> QuorumSetResultTracker<S, E> {
+ pub fn new<A>(sets: &[A], quorum: usize) -> Self
+ where
+ A: AsRef<[Uuid]>,
+ {
+ let mut nodes = HashMap::<Uuid, Vec<usize>>::new();
+ for (i, set) in sets.iter().enumerate() {
+ for node in set.as_ref().iter() {
+ nodes.entry(*node).or_default().push(i);
+ }
+ }
+
+ let num_nodes = nodes.len();
+ Self {
+ nodes,
quorum,
- Some(to_sets.len()),
- successes.len(),
- peers.len(),
+ successes: Vec::with_capacity(num_nodes),
+ failures: vec![],
+ success_counters: vec![0; sets.len()].into_boxed_slice(),
+ failure_counters: vec![0; sets.len()].into_boxed_slice(),
+ set_lens: sets
+ .iter()
+ .map(|x| x.as_ref().len())
+ .collect::<Vec<_>>()
+ .into_boxed_slice(),
+ }
+ }
+
+ pub fn register_result(&mut self, node: Uuid, result: Result<S, E>) {
+ match result {
+ Ok(s) => {
+ self.successes.push((node, s));
+ for set in self.nodes.get(&node).unwrap().iter() {
+ self.success_counters[*set] += 1;
+ }
+ }
+ Err(e) => {
+ self.failures.push((node, e));
+ for set in self.nodes.get(&node).unwrap().iter() {
+ self.failure_counters[*set] += 1;
+ }
+ }
+ }
+ }
+
+ pub fn all_quorums_ok(&self) -> bool {
+ self.success_counters
+ .iter()
+ .all(|ok_cnt| *ok_cnt >= self.quorum)
+ }
+
+ pub fn too_many_failures(&self) -> bool {
+ self.failure_counters
+ .iter()
+ .zip(self.set_lens.iter())
+ .any(|(err_cnt, set_len)| *err_cnt + self.quorum > *set_len)
+ }
+
+ pub fn success_values(self) -> Vec<S> {
+ self.successes
+ .into_iter()
+ .map(|(_, x)| x)
+ .collect::<Vec<_>>()
+ }
+
+ pub fn quorum_error(self) -> Error {
+ let errors = self
+ .failures
+ .iter()
+ .map(|(n, e)| format!("{:?}: {}", n, e))
+ .collect::<Vec<_>>();
+ Error::Quorum(
+ self.quorum,
+ Some(self.set_lens.len()),
+ self.successes.len(),
+ self.nodes.len(),
errors,
- ))
+ )
}
}