aboutsummaryrefslogtreecommitdiff
path: root/src/rpc
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc')
-rw-r--r--src/rpc/rpc_helper.rs95
1 files changed, 56 insertions, 39 deletions
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index 6e098446..ddabd636 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -292,47 +292,19 @@ impl RpcHelper {
// to reach a quorum, priorizing nodes with the lowest latency.
// When there are errors, we start new requests to compensate.
- // Retrieve some status variables that we will use to sort requests
- let peer_list = self.0.fullmesh.get_peer_list();
- let ring: Arc<Ring> = self.0.ring.borrow().clone();
- let our_zone = match ring.layout.node_role(&self.0.our_node_id) {
- Some(pc) => &pc.zone,
- None => "",
- };
-
- // Augment requests with some information used to sort them.
- // The tuples are as follows:
- // (is another node?, is another zone?, latency, node ID, request future)
- // We store all of these tuples in a vec that we can sort.
- // By sorting this vec, we priorize ourself, then nodes in the same zone,
- // and within a same zone we priorize nodes with the lowest latency.
- let mut requests = requests
- .map(|(to, fut)| {
- let peer_zone = match ring.layout.node_role(&to) {
- Some(pc) => &pc.zone,
- None => "",
- };
- let peer_avg_ping = peer_list
- .iter()
- .find(|x| x.id.as_ref() == to.as_slice())
- .and_then(|pi| pi.avg_ping)
- .unwrap_or_else(|| Duration::from_secs(1));
- (
- to != self.0.our_node_id,
- peer_zone != our_zone,
- peer_avg_ping,
- to,
- fut,
- )
- })
+ // Reorder requests to priorize closeness / low latency
+ let request_order = self.request_order(to);
+ let mut ord_requests = vec![(); request_order.len()]
+ .into_iter()
+ .map(|_| None)
.collect::<Vec<_>>();
-
- // Sort requests by (priorize ourself, priorize same zone, priorize low latency)
- requests
- .sort_by_key(|(diffnode, diffzone, ping, _to, _fut)| (*diffnode, *diffzone, *ping));
+ for (to, fut) in requests {
+ let i = request_order.iter().position(|x| *x == to).unwrap();
+ ord_requests[i] = Some((to, fut));
+ }
// Make an iterator to take requests in their sorted order
- let mut requests = requests.into_iter();
+ let mut requests = ord_requests.into_iter().map(Option::unwrap);
// resp_stream will contain all of the requests that are currently in flight.
// (for the moment none, they will be added in the loop below)
@@ -343,7 +315,7 @@ impl RpcHelper {
// If the current set of requests that are running is not enough to possibly
// reach quorum, start some new requests.
while successes.len() + resp_stream.len() < quorum {
- if let Some((_, _, _, req_to, fut)) = requests.next() {
+ if let Some((req_to, fut)) = requests.next() {
let tracer = opentelemetry::global::tracer("garage");
let span = tracer.start(format!("RPC to {:?}", req_to));
resp_stream.push(tokio::spawn(
@@ -413,4 +385,49 @@ impl RpcHelper {
Err(Error::Quorum(quorum, successes.len(), to.len(), errors))
}
}
+
+ pub fn request_order(&self, nodes: &[Uuid]) -> Vec<Uuid> {
+ // Retrieve some status variables that we will use to sort requests
+ let peer_list = self.0.fullmesh.get_peer_list();
+ let ring: Arc<Ring> = self.0.ring.borrow().clone();
+ let our_zone = match ring.layout.node_role(&self.0.our_node_id) {
+ Some(pc) => &pc.zone,
+ None => "",
+ };
+
+ // Augment requests with some information used to sort them.
+ // The tuples are as follows:
+ // (is another node?, is another zone?, latency, node ID, request future)
+ // We store all of these tuples in a vec that we can sort.
+ // By sorting this vec, we priorize ourself, then nodes in the same zone,
+ // and within a same zone we priorize nodes with the lowest latency.
+ let mut nodes = nodes
+ .iter()
+ .map(|to| {
+ let peer_zone = match ring.layout.node_role(&to) {
+ Some(pc) => &pc.zone,
+ None => "",
+ };
+ let peer_avg_ping = peer_list
+ .iter()
+ .find(|x| x.id.as_ref() == to.as_slice())
+ .and_then(|pi| pi.avg_ping)
+ .unwrap_or_else(|| Duration::from_secs(1));
+ (
+ *to != self.0.our_node_id,
+ peer_zone != our_zone,
+ peer_avg_ping,
+ *to,
+ )
+ })
+ .collect::<Vec<_>>();
+
+ // Sort requests by (priorize ourself, priorize same zone, priorize low latency)
+ nodes.sort_by_key(|(diffnode, diffzone, ping, _to)| (*diffnode, *diffzone, *ping));
+
+ nodes
+ .into_iter()
+ .map(|(_, _, _, to)| to)
+ .collect::<Vec<_>>()
+ }
}