diff options
-rw-r--r-- | src/rpc/rpc_helper.rs | 31 |
1 files changed, 17 insertions, 14 deletions
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index 7e1387ed..65af8901 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -436,13 +436,12 @@ impl RpcHelper { H: StreamingEndpointHandler<M> + 'static, S: Send + 'static, { - let msg = msg.into_req().map_err(netapp::error::Error::from)?; - // Peers may appear in many quorum sets. Here, build a list of peers, // mapping to the index of the quorum sets in which they appear. let mut result_tracker = QuorumSetResultTracker::new(to_sets, quorum); // Send one request to each peer of the quorum sets + let msg = msg.into_req().map_err(netapp::error::Error::from)?; let requests = result_tracker.nodes.iter().map(|(peer, _)| { let self2 = self.clone(); let msg = msg.clone(); @@ -523,10 +522,10 @@ impl RpcHelper { ) -> Vec<Uuid> { // Retrieve some status variables that we will use to sort requests let peer_list = self.0.fullmesh.get_peer_list(); - let our_zone = match layout.current().node_role(&self.0.our_node_id) { - Some(pc) => &pc.zone, - None => "", - }; + let our_zone = layout + .current() + .get_node_zone(&self.0.our_node_id) + .unwrap_or(""); // Augment requests with some information used to sort them. // The tuples are as follows: @@ -536,10 +535,7 @@ impl RpcHelper { // and within a same zone we priorize nodes with the lowest latency. let mut nodes = nodes .map(|to| { - let peer_zone = match layout.current().node_role(&to) { - Some(pc) => &pc.zone, - None => "", - }; + let peer_zone = layout.current().get_node_zone(&to).unwrap_or(""); let peer_avg_ping = peer_list .iter() .find(|x| x.id.as_ref() == to.as_slice()) @@ -567,21 +563,28 @@ impl RpcHelper { // ------- utility for tracking successes/errors among write sets -------- pub struct QuorumSetResultTracker<S, E> { - // The set of nodes and the quorum sets they belong to + /// The set of nodes and the index of the quorum sets they belong to pub nodes: HashMap<Uuid, Vec<usize>>, + /// The quorum value, i.e. number of success responses to await in each set pub quorum: usize, - // The success and error responses received + /// The success responses received pub successes: Vec<(Uuid, S)>, + /// The error responses received pub failures: Vec<(Uuid, E)>, - // The counters for successes and failures in each set + /// The counters for successes in each set pub success_counters: Box<[usize]>, + /// The counters for failures in each set pub failure_counters: Box<[usize]>, + /// The total number of nodes in each set pub set_lens: Box<[usize]>, } -impl<S, E: std::fmt::Display> QuorumSetResultTracker<S, E> { +impl<S, E> QuorumSetResultTracker<S, E> +where + E: std::fmt::Display, +{ pub fn new<A>(sets: &[A], quorum: usize) -> Self where A: AsRef<[Uuid]>, |