diff options
Diffstat (limited to 'src/rpc/rpc_helper.rs')
-rw-r--r-- | src/rpc/rpc_helper.rs | 95 |
1 files changed, 56 insertions, 39 deletions
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index 6e098446..ddabd636 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -292,47 +292,19 @@ impl RpcHelper { // to reach a quorum, priorizing nodes with the lowest latency. // When there are errors, we start new requests to compensate. - // Retrieve some status variables that we will use to sort requests - let peer_list = self.0.fullmesh.get_peer_list(); - let ring: Arc<Ring> = self.0.ring.borrow().clone(); - let our_zone = match ring.layout.node_role(&self.0.our_node_id) { - Some(pc) => &pc.zone, - None => "", - }; - - // Augment requests with some information used to sort them. - // The tuples are as follows: - // (is another node?, is another zone?, latency, node ID, request future) - // We store all of these tuples in a vec that we can sort. - // By sorting this vec, we priorize ourself, then nodes in the same zone, - // and within a same zone we priorize nodes with the lowest latency. - let mut requests = requests - .map(|(to, fut)| { - let peer_zone = match ring.layout.node_role(&to) { - Some(pc) => &pc.zone, - None => "", - }; - let peer_avg_ping = peer_list - .iter() - .find(|x| x.id.as_ref() == to.as_slice()) - .and_then(|pi| pi.avg_ping) - .unwrap_or_else(|| Duration::from_secs(1)); - ( - to != self.0.our_node_id, - peer_zone != our_zone, - peer_avg_ping, - to, - fut, - ) - }) + // Reorder requests to priorize closeness / low latency + let request_order = self.request_order(to); + let mut ord_requests = vec![(); request_order.len()] + .into_iter() + .map(|_| None) .collect::<Vec<_>>(); - - // Sort requests by (priorize ourself, priorize same zone, priorize low latency) - requests - .sort_by_key(|(diffnode, diffzone, ping, _to, _fut)| (*diffnode, *diffzone, *ping)); + for (to, fut) in requests { + let i = request_order.iter().position(|x| *x == to).unwrap(); + ord_requests[i] = Some((to, fut)); + } // Make an iterator to take requests in their sorted order - let mut requests = requests.into_iter(); + let mut requests = ord_requests.into_iter().map(Option::unwrap); // resp_stream will contain all of the requests that are currently in flight. // (for the moment none, they will be added in the loop below) @@ -343,7 +315,7 @@ impl RpcHelper { // If the current set of requests that are running is not enough to possibly // reach quorum, start some new requests. while successes.len() + resp_stream.len() < quorum { - if let Some((_, _, _, req_to, fut)) = requests.next() { + if let Some((req_to, fut)) = requests.next() { let tracer = opentelemetry::global::tracer("garage"); let span = tracer.start(format!("RPC to {:?}", req_to)); resp_stream.push(tokio::spawn( @@ -413,4 +385,49 @@ impl RpcHelper { Err(Error::Quorum(quorum, successes.len(), to.len(), errors)) } } + + pub fn request_order(&self, nodes: &[Uuid]) -> Vec<Uuid> { + // Retrieve some status variables that we will use to sort requests + let peer_list = self.0.fullmesh.get_peer_list(); + let ring: Arc<Ring> = self.0.ring.borrow().clone(); + let our_zone = match ring.layout.node_role(&self.0.our_node_id) { + Some(pc) => &pc.zone, + None => "", + }; + + // Augment requests with some information used to sort them. + // The tuples are as follows: + // (is another node?, is another zone?, latency, node ID, request future) + // We store all of these tuples in a vec that we can sort. + // By sorting this vec, we priorize ourself, then nodes in the same zone, + // and within a same zone we priorize nodes with the lowest latency. + let mut nodes = nodes + .iter() + .map(|to| { + let peer_zone = match ring.layout.node_role(&to) { + Some(pc) => &pc.zone, + None => "", + }; + let peer_avg_ping = peer_list + .iter() + .find(|x| x.id.as_ref() == to.as_slice()) + .and_then(|pi| pi.avg_ping) + .unwrap_or_else(|| Duration::from_secs(1)); + ( + *to != self.0.our_node_id, + peer_zone != our_zone, + peer_avg_ping, + *to, + ) + }) + .collect::<Vec<_>>(); + + // Sort requests by (priorize ourself, priorize same zone, priorize low latency) + nodes.sort_by_key(|(diffnode, diffzone, ping, _to)| (*diffnode, *diffzone, *ping)); + + nodes + .into_iter() + .map(|(_, _, _, to)| to) + .collect::<Vec<_>>() + } } |