aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-07-25 18:19:35 +0200
committerAlex Auvolat <alex@adnab.me>2022-07-29 12:25:03 +0200
commite935861854deed5d1ca66767fc51d9849201a4dd (patch)
treefd368113ae4ad4b672fbcaaf37be8666e2695411
parentf0ee3056d3357221d71ad7d346e419dc26b9e063 (diff)
downloadgarage-e935861854deed5d1ca66767fc51d9849201a4dd.tar.gz
garage-e935861854deed5d1ca66767fc51d9849201a4dd.zip
Factor out node request order selection logic & use in manager
-rw-r--r--Cargo.lock2
-rwxr-xr-xscript/dev-cluster.sh2
-rw-r--r--src/block/manager.rs2
-rw-r--r--src/rpc/rpc_helper.rs95
4 files changed, 60 insertions, 41 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 0d00bca1..e3d8373f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2158,7 +2158,7 @@ dependencies = [
[[package]]
name = "netapp"
version = "0.5.0"
-source = "git+https://git.deuxfleurs.fr/lx/netapp?branch=stream-body#fed0542313824df295a7e322a9aebe8ba62f97b9"
+source = "git+https://git.deuxfleurs.fr/lx/netapp?branch=stream-body#74e57016f63b6052cf6d539812859c3a46138eee"
dependencies = [
"arc-swap",
"async-trait",
diff --git a/script/dev-cluster.sh b/script/dev-cluster.sh
index fa0a950e..c7fbe08d 100755
--- a/script/dev-cluster.sh
+++ b/script/dev-cluster.sh
@@ -11,7 +11,7 @@ PATH="${GARAGE_DEBUG}:${GARAGE_RELEASE}:${NIX_RELEASE}:$PATH"
FANCYCOLORS=("41m" "42m" "44m" "45m" "100m" "104m")
export RUST_BACKTRACE=1
-export RUST_LOG=garage=info,garage_api=debug
+export RUST_LOG=garage=info,garage_api=debug,netapp=trace
MAIN_LABEL="\e[${FANCYCOLORS[0]}[main]\e[49m"
WHICH_GARAGE=$(which garage || exit 1)
diff --git a/src/block/manager.rs b/src/block/manager.rs
index bb01c300..80c52510 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -185,6 +185,7 @@ impl BlockManager {
hash: &Hash,
) -> Result<(DataBlockHeader, ByteStream), Error> {
let who = self.replication.read_nodes(hash);
+ //let who = self.system.rpc.request_order(&who);
for node in who.iter() {
let node_id = NodeID::from(*node);
@@ -225,6 +226,7 @@ impl BlockManager {
/// Return its entire body
async fn rpc_get_raw_block(&self, hash: &Hash) -> Result<DataBlock, Error> {
let who = self.replication.read_nodes(hash);
+ //let who = self.system.rpc.request_order(&who);
for node in who.iter() {
let node_id = NodeID::from(*node);
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index 6e098446..ddabd636 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -292,47 +292,19 @@ impl RpcHelper {
// to reach a quorum, priorizing nodes with the lowest latency.
// When there are errors, we start new requests to compensate.
- // Retrieve some status variables that we will use to sort requests
- let peer_list = self.0.fullmesh.get_peer_list();
- let ring: Arc<Ring> = self.0.ring.borrow().clone();
- let our_zone = match ring.layout.node_role(&self.0.our_node_id) {
- Some(pc) => &pc.zone,
- None => "",
- };
-
- // Augment requests with some information used to sort them.
- // The tuples are as follows:
- // (is another node?, is another zone?, latency, node ID, request future)
- // We store all of these tuples in a vec that we can sort.
- // By sorting this vec, we priorize ourself, then nodes in the same zone,
- // and within a same zone we priorize nodes with the lowest latency.
- let mut requests = requests
- .map(|(to, fut)| {
- let peer_zone = match ring.layout.node_role(&to) {
- Some(pc) => &pc.zone,
- None => "",
- };
- let peer_avg_ping = peer_list
- .iter()
- .find(|x| x.id.as_ref() == to.as_slice())
- .and_then(|pi| pi.avg_ping)
- .unwrap_or_else(|| Duration::from_secs(1));
- (
- to != self.0.our_node_id,
- peer_zone != our_zone,
- peer_avg_ping,
- to,
- fut,
- )
- })
+ // Reorder requests to priorize closeness / low latency
+ let request_order = self.request_order(to);
+ let mut ord_requests = vec![(); request_order.len()]
+ .into_iter()
+ .map(|_| None)
.collect::<Vec<_>>();
-
- // Sort requests by (priorize ourself, priorize same zone, priorize low latency)
- requests
- .sort_by_key(|(diffnode, diffzone, ping, _to, _fut)| (*diffnode, *diffzone, *ping));
+ for (to, fut) in requests {
+ let i = request_order.iter().position(|x| *x == to).unwrap();
+ ord_requests[i] = Some((to, fut));
+ }
// Make an iterator to take requests in their sorted order
- let mut requests = requests.into_iter();
+ let mut requests = ord_requests.into_iter().map(Option::unwrap);
// resp_stream will contain all of the requests that are currently in flight.
// (for the moment none, they will be added in the loop below)
@@ -343,7 +315,7 @@ impl RpcHelper {
// If the current set of requests that are running is not enough to possibly
// reach quorum, start some new requests.
while successes.len() + resp_stream.len() < quorum {
- if let Some((_, _, _, req_to, fut)) = requests.next() {
+ if let Some((req_to, fut)) = requests.next() {
let tracer = opentelemetry::global::tracer("garage");
let span = tracer.start(format!("RPC to {:?}", req_to));
resp_stream.push(tokio::spawn(
@@ -413,4 +385,49 @@ impl RpcHelper {
Err(Error::Quorum(quorum, successes.len(), to.len(), errors))
}
}
+
+ pub fn request_order(&self, nodes: &[Uuid]) -> Vec<Uuid> {
+ // Retrieve some status variables that we will use to sort requests
+ let peer_list = self.0.fullmesh.get_peer_list();
+ let ring: Arc<Ring> = self.0.ring.borrow().clone();
+ let our_zone = match ring.layout.node_role(&self.0.our_node_id) {
+ Some(pc) => &pc.zone,
+ None => "",
+ };
+
+ // Augment requests with some information used to sort them.
+ // The tuples are as follows:
+ // (is another node?, is another zone?, latency, node ID, request future)
+ // We store all of these tuples in a vec that we can sort.
+ // By sorting this vec, we priorize ourself, then nodes in the same zone,
+ // and within a same zone we priorize nodes with the lowest latency.
+ let mut nodes = nodes
+ .iter()
+ .map(|to| {
+ let peer_zone = match ring.layout.node_role(&to) {
+ Some(pc) => &pc.zone,
+ None => "",
+ };
+ let peer_avg_ping = peer_list
+ .iter()
+ .find(|x| x.id.as_ref() == to.as_slice())
+ .and_then(|pi| pi.avg_ping)
+ .unwrap_or_else(|| Duration::from_secs(1));
+ (
+ *to != self.0.our_node_id,
+ peer_zone != our_zone,
+ peer_avg_ping,
+ *to,
+ )
+ })
+ .collect::<Vec<_>>();
+
+ // Sort requests by (priorize ourself, priorize same zone, priorize low latency)
+ nodes.sort_by_key(|(diffnode, diffzone, ping, _to)| (*diffnode, *diffzone, *ping));
+
+ nodes
+ .into_iter()
+ .map(|(_, _, _, to)| to)
+ .collect::<Vec<_>>()
+ }
}