diff options
Diffstat (limited to 'src/rpc/rpc_helper.rs')
-rw-r--r-- | src/rpc/rpc_helper.rs | 121 |
1 files changed, 75 insertions, 46 deletions
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index c6dcbe75..7e1387ed 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -26,7 +26,7 @@ use garage_util::data::*; use garage_util::error::Error; use garage_util::metrics::RecordDuration; -use crate::layout::LayoutHelper; +use crate::layout::{LayoutHelper, LayoutHistory}; use crate::metrics::RpcMetrics; // Default RPC timeout = 5 minutes @@ -304,7 +304,7 @@ impl RpcHelper { // preemptively send an additional request to any remaining nodes. // Reorder requests to priorize closeness / low latency - let request_order = self.request_order(to.iter().copied()); + let request_order = self.request_order(&self.0.layout.read().unwrap(), to.iter().copied()); let send_all_at_once = strategy.rs_send_all_at_once.unwrap_or(false); // Build future for each request @@ -368,50 +368,6 @@ impl RpcHelper { } } - pub fn request_order(&self, nodes: impl Iterator<Item = Uuid>) -> Vec<Uuid> { - // Retrieve some status variables that we will use to sort requests - let peer_list = self.0.fullmesh.get_peer_list(); - let layout = self.0.layout.read().unwrap(); - let our_zone = match layout.current().node_role(&self.0.our_node_id) { - Some(pc) => &pc.zone, - None => "", - }; - - // Augment requests with some information used to sort them. - // The tuples are as follows: - // (is another node?, is another zone?, latency, node ID, request future) - // We store all of these tuples in a vec that we can sort. - // By sorting this vec, we priorize ourself, then nodes in the same zone, - // and within a same zone we priorize nodes with the lowest latency. - let mut nodes = nodes - .map(|to| { - let peer_zone = match layout.current().node_role(&to) { - Some(pc) => &pc.zone, - None => "", - }; - let peer_avg_ping = peer_list - .iter() - .find(|x| x.id.as_ref() == to.as_slice()) - .and_then(|pi| pi.avg_ping) - .unwrap_or_else(|| Duration::from_secs(10)); - ( - to != self.0.our_node_id, - peer_zone != our_zone, - peer_avg_ping, - to, - ) - }) - .collect::<Vec<_>>(); - - // Sort requests by (priorize ourself, priorize same zone, priorize low latency) - nodes.sort_by_key(|(diffnode, diffzone, ping, _to)| (*diffnode, *diffzone, *ping)); - - nodes - .into_iter() - .map(|(_, _, _, to)| to) - .collect::<Vec<_>>() - } - /// Make a RPC call to multiple servers, returning either a Vec of responses, /// or an error if quorum could not be reached due to too many errors /// @@ -533,6 +489,79 @@ impl RpcHelper { // Failure, could not get quorum Err(result_tracker.quorum_error()) } + + // ---- functions not related to MAKING RPCs, but just determining to what nodes + // they should be made and in which order ---- + + pub fn block_read_nodes_of(&self, position: &Hash, rpc_helper: &RpcHelper) -> Vec<Uuid> { + let layout = self.0.layout.read().unwrap(); + + let mut ret = Vec::with_capacity(12); + let ver_iter = layout + .versions + .iter() + .rev() + .chain(layout.old_versions.iter().rev()); + for ver in ver_iter { + if ver.version > layout.all_sync() { + continue; + } + let nodes = ver.nodes_of(position, ver.replication_factor); + for node in rpc_helper.request_order(&layout, nodes) { + if !ret.contains(&node) { + ret.push(node); + } + } + } + ret + } + + fn request_order( + &self, + layout: &LayoutHistory, + nodes: impl Iterator<Item = Uuid>, + ) -> Vec<Uuid> { + // Retrieve some status variables that we will use to sort requests + let peer_list = self.0.fullmesh.get_peer_list(); + let our_zone = match layout.current().node_role(&self.0.our_node_id) { + Some(pc) => &pc.zone, + None => "", + }; + + // Augment requests with some information used to sort them. + // The tuples are as follows: + // (is another node?, is another zone?, latency, node ID, request future) + // We store all of these tuples in a vec that we can sort. + // By sorting this vec, we priorize ourself, then nodes in the same zone, + // and within a same zone we priorize nodes with the lowest latency. + let mut nodes = nodes + .map(|to| { + let peer_zone = match layout.current().node_role(&to) { + Some(pc) => &pc.zone, + None => "", + }; + let peer_avg_ping = peer_list + .iter() + .find(|x| x.id.as_ref() == to.as_slice()) + .and_then(|pi| pi.avg_ping) + .unwrap_or_else(|| Duration::from_secs(10)); + ( + to != self.0.our_node_id, + peer_zone != our_zone, + peer_avg_ping, + to, + ) + }) + .collect::<Vec<_>>(); + + // Sort requests by (priorize ourself, priorize same zone, priorize low latency) + nodes.sort_by_key(|(diffnode, diffzone, ping, _to)| (*diffnode, *diffzone, *ping)); + + nodes + .into_iter() + .map(|(_, _, _, to)| to) + .collect::<Vec<_>>() + } } // ------- utility for tracking successes/errors among write sets -------- |