aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rpc/rpc_helper.rs31
1 files changed, 17 insertions, 14 deletions
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index 7e1387ed..65af8901 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -436,13 +436,12 @@ impl RpcHelper {
H: StreamingEndpointHandler<M> + 'static,
S: Send + 'static,
{
- let msg = msg.into_req().map_err(netapp::error::Error::from)?;
-
// Peers may appear in many quorum sets. Here, build a list of peers,
// mapping to the index of the quorum sets in which they appear.
let mut result_tracker = QuorumSetResultTracker::new(to_sets, quorum);
// Send one request to each peer of the quorum sets
+ let msg = msg.into_req().map_err(netapp::error::Error::from)?;
let requests = result_tracker.nodes.iter().map(|(peer, _)| {
let self2 = self.clone();
let msg = msg.clone();
@@ -523,10 +522,10 @@ impl RpcHelper {
) -> Vec<Uuid> {
// Retrieve some status variables that we will use to sort requests
let peer_list = self.0.fullmesh.get_peer_list();
- let our_zone = match layout.current().node_role(&self.0.our_node_id) {
- Some(pc) => &pc.zone,
- None => "",
- };
+ let our_zone = layout
+ .current()
+ .get_node_zone(&self.0.our_node_id)
+ .unwrap_or("");
// Augment requests with some information used to sort them.
// The tuples are as follows:
@@ -536,10 +535,7 @@ impl RpcHelper {
// and within a same zone we priorize nodes with the lowest latency.
let mut nodes = nodes
.map(|to| {
- let peer_zone = match layout.current().node_role(&to) {
- Some(pc) => &pc.zone,
- None => "",
- };
+ let peer_zone = layout.current().get_node_zone(&to).unwrap_or("");
let peer_avg_ping = peer_list
.iter()
.find(|x| x.id.as_ref() == to.as_slice())
@@ -567,21 +563,28 @@ impl RpcHelper {
// ------- utility for tracking successes/errors among write sets --------
pub struct QuorumSetResultTracker<S, E> {
- // The set of nodes and the quorum sets they belong to
+ /// The set of nodes and the index of the quorum sets they belong to
pub nodes: HashMap<Uuid, Vec<usize>>,
+ /// The quorum value, i.e. number of success responses to await in each set
pub quorum: usize,
- // The success and error responses received
+ /// The success responses received
pub successes: Vec<(Uuid, S)>,
+ /// The error responses received
pub failures: Vec<(Uuid, E)>,
- // The counters for successes and failures in each set
+ /// The counters for successes in each set
pub success_counters: Box<[usize]>,
+ /// The counters for failures in each set
pub failure_counters: Box<[usize]>,
+ /// The total number of nodes in each set
pub set_lens: Box<[usize]>,
}
-impl<S, E: std::fmt::Display> QuorumSetResultTracker<S, E> {
+impl<S, E> QuorumSetResultTracker<S, E>
+where
+ E: std::fmt::Display,
+{
pub fn new<A>(sets: &[A], quorum: usize) -> Self
where
A: AsRef<[Uuid]>,