author     Alex Auvolat <alex@adnab.me>  2023-12-08 12:02:24 +0100
committer  Alex Auvolat <alex@adnab.me>  2023-12-08 12:02:24 +0100
commit     5dd200c015aed786173f0e11541b0505f95dd6d1 (patch)
tree       dc490bcdf93fbba7ff9c14e48ca577f93545e375
parent     063294dd569e10c6d85e29eb6507249eece00956 (diff)
layout: move block_read_nodes_of to rpc_helper to avoid double-locking
(in theory, this could have caused a deadlock)
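
The hazard alluded to here is a nested read() on one RwLock: before this patch, block_read_nodes_of lived on LayoutHelper and was reached with the layout lock already held, yet it called RpcHelper::request_order, which took self.0.layout.read() a second time (see the removed code below). std::sync::RwLock does not guarantee that a thread can re-acquire a read lock it already holds; if a writer gets queued between the two acquisitions, the second read() can block forever. A minimal, hypothetical sketch of that pattern (Layout and Helper are illustrative stand-ins, not Garage's real types):

use std::sync::{Arc, RwLock};

struct Layout {
    nodes: Vec<u64>,
}

struct Helper {
    layout: Arc<RwLock<Layout>>,
}

impl Helper {
    // Pre-patch shape: this method runs with a read guard on `layout`...
    fn block_read_nodes_of(&self) -> Vec<u64> {
        let guard = self.layout.read().unwrap(); // first read()
        // ...and then calls a helper that re-locks the same RwLock
        // while `guard` is still alive.
        self.request_order(&guard.nodes)
    }

    fn request_order(&self, nodes: &[u64]) -> Vec<u64> {
        // Second read() on the same lock from the same thread: the std
        // documentation warns this may deadlock if a writer is waiting.
        let _layout = self.layout.read().unwrap();
        nodes.to_vec()
    }
}

fn main() {
    let helper = Helper {
        layout: Arc::new(RwLock::new(Layout { nodes: vec![1, 2, 3] })),
    };
    // Works in the single-threaded happy path, but is a latent deadlock
    // under writer contention.
    println!("{:?}", helper.block_read_nodes_of());
}

The patch closes that window by moving block_read_nodes_of onto RpcHelper, where the lock is taken exactly once.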
-rw-r--r--  src/block/manager.rs        2
-rw-r--r--  src/rpc/layout/helper.rs   27
-rw-r--r--  src/rpc/rpc_helper.rs     121
3 files changed, 80 insertions, 70 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 47111160..bfd390ee 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -266,7 +266,7 @@ impl BlockManager {
{
let who = self
.system
- .cluster_layout()
+ .rpc_helper()
.block_read_nodes_of(hash, self.system.rpc_helper());
for node in who.iter() {
diff --git a/src/rpc/layout/helper.rs b/src/rpc/layout/helper.rs
index eeaf4ffa..147c8b4f 100644
--- a/src/rpc/layout/helper.rs
+++ b/src/rpc/layout/helper.rs
@@ -8,7 +8,6 @@ use garage_util::data::*;
use super::schema::*;
use crate::replication_mode::ReplicationMode;
-use crate::rpc_helper::RpcHelper;
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
pub struct LayoutDigest {
@@ -155,6 +154,10 @@ impl LayoutHelper {
self.ack_map_min
}
+ pub fn all_sync(&self) -> u64 {
+ self.sync_map_min
+ }
+
pub fn sync_versions(&self) -> (u64, u64, u64) {
(
self.layout().current().version,
@@ -177,28 +180,6 @@ impl LayoutHelper {
.collect()
}
- pub fn block_read_nodes_of(&self, position: &Hash, rpc_helper: &RpcHelper) -> Vec<Uuid> {
- let mut ret = Vec::with_capacity(12);
- let ver_iter = self
- .layout()
- .versions
- .iter()
- .rev()
- .chain(self.layout().old_versions.iter().rev());
- for ver in ver_iter {
- if ver.version > self.sync_map_min {
- continue;
- }
- let nodes = ver.nodes_of(position, ver.replication_factor);
- for node in rpc_helper.request_order(nodes) {
- if !ret.contains(&node) {
- ret.push(node);
- }
- }
- }
- ret
- }
-
pub fn storage_sets_of(&self, position: &Hash) -> Vec<Vec<Uuid>> {
self.layout()
.versions
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index c6dcbe75..7e1387ed 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -26,7 +26,7 @@ use garage_util::data::*;
use garage_util::error::Error;
use garage_util::metrics::RecordDuration;
-use crate::layout::LayoutHelper;
+use crate::layout::{LayoutHelper, LayoutHistory};
use crate::metrics::RpcMetrics;
// Default RPC timeout = 5 minutes
@@ -304,7 +304,7 @@ impl RpcHelper {
// preemptively send an additional request to any remaining nodes.
// Reorder requests to priorize closeness / low latency
- let request_order = self.request_order(to.iter().copied());
+ let request_order = self.request_order(&self.0.layout.read().unwrap(), to.iter().copied());
let send_all_at_once = strategy.rs_send_all_at_once.unwrap_or(false);
// Build future for each request
@@ -368,50 +368,6 @@ impl RpcHelper {
}
}
- pub fn request_order(&self, nodes: impl Iterator<Item = Uuid>) -> Vec<Uuid> {
- // Retrieve some status variables that we will use to sort requests
- let peer_list = self.0.fullmesh.get_peer_list();
- let layout = self.0.layout.read().unwrap();
- let our_zone = match layout.current().node_role(&self.0.our_node_id) {
- Some(pc) => &pc.zone,
- None => "",
- };
-
- // Augment requests with some information used to sort them.
- // The tuples are as follows:
- // (is another node?, is another zone?, latency, node ID, request future)
- // We store all of these tuples in a vec that we can sort.
- // By sorting this vec, we priorize ourself, then nodes in the same zone,
- // and within a same zone we priorize nodes with the lowest latency.
- let mut nodes = nodes
- .map(|to| {
- let peer_zone = match layout.current().node_role(&to) {
- Some(pc) => &pc.zone,
- None => "",
- };
- let peer_avg_ping = peer_list
- .iter()
- .find(|x| x.id.as_ref() == to.as_slice())
- .and_then(|pi| pi.avg_ping)
- .unwrap_or_else(|| Duration::from_secs(10));
- (
- to != self.0.our_node_id,
- peer_zone != our_zone,
- peer_avg_ping,
- to,
- )
- })
- .collect::<Vec<_>>();
-
- // Sort requests by (priorize ourself, priorize same zone, priorize low latency)
- nodes.sort_by_key(|(diffnode, diffzone, ping, _to)| (*diffnode, *diffzone, *ping));
-
- nodes
- .into_iter()
- .map(|(_, _, _, to)| to)
- .collect::<Vec<_>>()
- }
-
/// Make a RPC call to multiple servers, returning either a Vec of responses,
/// or an error if quorum could not be reached due to too many errors
///
@@ -533,6 +489,79 @@ impl RpcHelper {
// Failure, could not get quorum
Err(result_tracker.quorum_error())
}
+
+ // ---- functions not related to MAKING RPCs, but just determining to what nodes
+ // they should be made and in which order ----
+
+ pub fn block_read_nodes_of(&self, position: &Hash, rpc_helper: &RpcHelper) -> Vec<Uuid> {
+ let layout = self.0.layout.read().unwrap();
+
+ let mut ret = Vec::with_capacity(12);
+ let ver_iter = layout
+ .versions
+ .iter()
+ .rev()
+ .chain(layout.old_versions.iter().rev());
+ for ver in ver_iter {
+ if ver.version > layout.all_sync() {
+ continue;
+ }
+ let nodes = ver.nodes_of(position, ver.replication_factor);
+ for node in rpc_helper.request_order(&layout, nodes) {
+ if !ret.contains(&node) {
+ ret.push(node);
+ }
+ }
+ }
+ ret
+ }
+
+ fn request_order(
+ &self,
+ layout: &LayoutHistory,
+ nodes: impl Iterator<Item = Uuid>,
+ ) -> Vec<Uuid> {
+ // Retrieve some status variables that we will use to sort requests
+ let peer_list = self.0.fullmesh.get_peer_list();
+ let our_zone = match layout.current().node_role(&self.0.our_node_id) {
+ Some(pc) => &pc.zone,
+ None => "",
+ };
+
+ // Augment requests with some information used to sort them.
+ // The tuples are as follows:
+ // (is another node?, is another zone?, latency, node ID, request future)
+ // We store all of these tuples in a vec that we can sort.
+ // By sorting this vec, we priorize ourself, then nodes in the same zone,
+ // and within a same zone we priorize nodes with the lowest latency.
+ let mut nodes = nodes
+ .map(|to| {
+ let peer_zone = match layout.current().node_role(&to) {
+ Some(pc) => &pc.zone,
+ None => "",
+ };
+ let peer_avg_ping = peer_list
+ .iter()
+ .find(|x| x.id.as_ref() == to.as_slice())
+ .and_then(|pi| pi.avg_ping)
+ .unwrap_or_else(|| Duration::from_secs(10));
+ (
+ to != self.0.our_node_id,
+ peer_zone != our_zone,
+ peer_avg_ping,
+ to,
+ )
+ })
+ .collect::<Vec<_>>();
+
+ // Sort requests by (priorize ourself, priorize same zone, priorize low latency)
+ nodes.sort_by_key(|(diffnode, diffzone, ping, _to)| (*diffnode, *diffzone, *ping));
+
+ nodes
+ .into_iter()
+ .map(|(_, _, _, to)| to)
+ .collect::<Vec<_>>()
+ }
}
// ------- utility for tracking successes/errors among write sets --------
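
For comparison with the sketch above, this is the shape of the locking after the patch: block_read_nodes_of takes the single read() itself and hands a plain reference to the locked layout down to request_order, whose new signature accepts &LayoutHistory instead of locking internally. A self-contained, hypothetical sketch; the types, fields, and the trivial sort() are simplified stand-ins for Garage's LayoutHistory and for the zone/latency ordering:

use std::sync::{Arc, RwLock};

// Simplified stand-in for LayoutHistory: just a list of node IDs.
struct LayoutHistory {
    nodes: Vec<u64>,
}

struct RpcHelperSketch {
    layout: Arc<RwLock<LayoutHistory>>,
}

impl RpcHelperSketch {
    fn block_read_nodes_of(&self) -> Vec<u64> {
        // Single lock acquisition for the whole operation.
        let layout = self.layout.read().unwrap();
        self.request_order(&layout)
    }

    // Mirrors the signature change in this commit: the already-locked
    // layout is passed in by reference, so this function never re-locks.
    fn request_order(&self, layout: &LayoutHistory) -> Vec<u64> {
        let mut ordered = layout.nodes.clone();
        ordered.sort(); // placeholder for the real zone/latency ordering
        ordered
    }
}

fn main() {
    let helper = RpcHelperSketch {
        layout: Arc::new(RwLock::new(LayoutHistory {
            nodes: vec![3, 1, 2],
        })),
    };
    println!("{:?}", helper.block_read_nodes_of());
}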