aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlex <alex@adnab.me>2024-03-21 10:26:36 +0000
committerAlex <alex@adnab.me>2024-03-21 10:26:36 +0000
commit7e0107c47db71e8da13990c9111ebde8cbf60d8f (patch)
tree3deae99e0651b66e6d12f8e3336dca9016e23ecc /src
parentce69dc302c6eaad4fe5268cca3511620fcca12f8 (diff)
parent3844110cd03210a1600d57db1aab53e41cf4815f (diff)
downloadgarage-7e0107c47db71e8da13990c9111ebde8cbf60d8f.tar.gz
garage-7e0107c47db71e8da13990c9111ebde8cbf60d8f.zip
Merge pull request 'Fixes to garage_net peering manager' (#786) from net-fixes into next-0.10
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/786
Diffstat (limited to 'src')
-rw-r--r--src/api/admin/cluster.rs2
-rw-r--r--src/garage/cli/cmd.rs21
-rw-r--r--src/net/netapp.rs51
-rw-r--r--src/net/peering.rs149
-rw-r--r--src/rpc/system.rs15
5 files changed, 118 insertions, 120 deletions
diff --git a/src/api/admin/cluster.rs b/src/api/admin/cluster.rs
index 8ce6c5ed..8c9cb1e5 100644
--- a/src/api/admin/cluster.rs
+++ b/src/api/admin/cluster.rs
@@ -27,7 +27,7 @@ pub async fn handle_get_cluster_status(garage: &Arc<Garage>) -> Result<Response<
i.id,
NodeResp {
id: hex::encode(i.id),
- addr: Some(i.addr),
+ addr: i.addr,
hostname: i.status.hostname,
is_up: i.is_up,
last_seen_secs_ago: i.last_seen_secs_ago,
diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs
index a84061a7..44d3d96c 100644
--- a/src/garage/cli/cmd.rs
+++ b/src/garage/cli/cmd.rs
@@ -57,6 +57,10 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity\tDataAvail".to_string()];
for adv in status.iter().filter(|adv| adv.is_up) {
let host = adv.status.hostname.as_deref().unwrap_or("?");
+ let addr = match adv.addr {
+ Some(addr) => addr.to_string(),
+ None => "N/A".to_string(),
+ };
if let Some(NodeRoleV(Some(cfg))) = layout.current().roles.get(&adv.id) {
let data_avail = match &adv.status.data_disk_avail {
_ if cfg.capacity.is_none() => "N/A".into(),
@@ -71,7 +75,7 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
"{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}\t{data_avail}",
id = adv.id,
host = host,
- addr = adv.addr,
+ addr = addr,
tags = cfg.tags.join(","),
zone = cfg.zone,
capacity = cfg.capacity_string(),
@@ -91,7 +95,7 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
"{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\tdraining metadata...",
id = adv.id,
host = host,
- addr = adv.addr,
+ addr = addr,
tags = cfg.tags.join(","),
zone = cfg.zone,
));
@@ -104,7 +108,7 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
"{id:?}\t{h}\t{addr}\t\t\t{new_role}",
id = adv.id,
h = host,
- addr = adv.addr,
+ addr = addr,
new_role = new_role,
));
}
@@ -120,8 +124,7 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
let tf = timeago::Formatter::new();
let mut drain_msg = false;
- let mut failed_nodes =
- vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity\tLast seen".to_string()];
+ let mut failed_nodes = vec!["ID\tHostname\tTags\tZone\tCapacity\tLast seen".to_string()];
let mut listed = HashSet::new();
for ver in layout.versions.iter().rev() {
for (node, _, role) in ver.roles.items().iter() {
@@ -142,15 +145,14 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
// Node is in a layout version, is not a gateway node, and is not up:
// it is in a failed state, add proper line to the output
- let (host, addr, last_seen) = match adv {
+ let (host, last_seen) = match adv {
Some(adv) => (
adv.status.hostname.as_deref().unwrap_or("?"),
- adv.addr.to_string(),
adv.last_seen_secs_ago
.map(|s| tf.convert(Duration::from_secs(s)))
.unwrap_or_else(|| "never seen".into()),
),
- None => ("??", "??".into(), "never seen".into()),
+ None => ("??", "never seen".into()),
};
let capacity = if ver.version == layout.current().version {
cfg.capacity_string()
@@ -159,10 +161,9 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
"draining metadata...".to_string()
};
failed_nodes.push(format!(
- "{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}\t{last_seen}",
+ "{id:?}\t{host}\t[{tags}]\t{zone}\t{capacity}\t{last_seen}",
id = node,
host = host,
- addr = addr,
tags = cfg.tags.join(","),
zone = cfg.zone,
capacity = capacity,
diff --git a/src/net/netapp.rs b/src/net/netapp.rs
index faa51a99..6480a126 100644
--- a/src/net/netapp.rs
+++ b/src/net/netapp.rs
@@ -292,13 +292,7 @@ impl NetApp {
/// the other node with `Netapp::request`
pub async fn try_connect(self: Arc<Self>, ip: SocketAddr, id: NodeID) -> Result<(), Error> {
// Don't connect to ourself, we don't care
- // but pretend we did
if id == self.id {
- tokio::spawn(async move {
- if let Some(h) = self.on_connected_handler.load().as_ref() {
- h(id, ip, false);
- }
- });
return Ok(());
}
@@ -327,31 +321,32 @@ impl NetApp {
/// Close the outgoing connection we have to a node specified by its public key,
/// if such a connection is currently open.
pub fn disconnect(self: &Arc<Self>, id: &NodeID) {
+ let conn = self.client_conns.write().unwrap().remove(id);
+
// If id is ourself, we're not supposed to have a connection open
- if *id != self.id {
- let conn = self.client_conns.write().unwrap().remove(id);
- if let Some(c) = conn {
- debug!(
- "Closing connection to {} ({})",
- hex::encode(&c.peer_id[..8]),
- c.remote_addr
- );
- c.close();
- } else {
- return;
- }
+ if *id == self.id {
+ // sanity check
+ assert!(conn.is_none(), "had a connection to local node");
+ return;
}
- // call on_disconnected_handler immediately, since the connection
- // was removed
- // (if id == self.id, we pretend we disconnected)
- let id = *id;
- let self2 = self.clone();
- tokio::spawn(async move {
- if let Some(h) = self2.on_disconnected_handler.load().as_ref() {
- h(id, false);
- }
- });
+ if let Some(c) = conn {
+ debug!(
+ "Closing connection to {} ({})",
+ hex::encode(&c.peer_id[..8]),
+ c.remote_addr
+ );
+ c.close();
+
+ // call on_disconnected_handler immediately, since the connection was removed
+ let id = *id;
+ let self2 = self.clone();
+ tokio::spawn(async move {
+ if let Some(h) = self2.on_disconnected_handler.load().as_ref() {
+ h(id, false);
+ }
+ });
+ }
}
// Called from conn.rs when an incoming connection is successfully established
diff --git a/src/net/peering.rs b/src/net/peering.rs
index 61882a18..b4271231 100644
--- a/src/net/peering.rs
+++ b/src/net/peering.rs
@@ -54,12 +54,8 @@ impl Message for PeerListMessage {
#[derive(Debug)]
struct PeerInfoInternal {
- // addr is the currently connected address,
- // or the last address we were connected to,
- // or an arbitrary address some other peer gave us
- addr: SocketAddr,
- // all_addrs contains all of the addresses everyone gave us
- all_addrs: Vec<SocketAddr>,
+ // known_addrs contains all of the addresses everyone gave us
+ known_addrs: Vec<SocketAddr>,
state: PeerConnState,
last_send_ping: Option<Instant>,
@@ -69,10 +65,9 @@ struct PeerInfoInternal {
}
impl PeerInfoInternal {
- fn new(addr: SocketAddr, state: PeerConnState) -> Self {
+ fn new(state: PeerConnState, known_addr: Option<SocketAddr>) -> Self {
Self {
- addr,
- all_addrs: vec![addr],
+ known_addrs: known_addr.map(|x| vec![x]).unwrap_or_default(),
state,
last_send_ping: None,
last_seen: None,
@@ -81,8 +76,8 @@ impl PeerInfoInternal {
}
}
fn add_addr(&mut self, addr: SocketAddr) -> bool {
- if !self.all_addrs.contains(&addr) {
- self.all_addrs.push(addr);
+ if !self.known_addrs.contains(&addr) {
+ self.known_addrs.push(addr);
// If we are learning a new address for this node,
// we want to retry connecting
self.state = match self.state {
@@ -90,7 +85,7 @@ impl PeerInfoInternal {
PeerConnState::Waiting(_, _) | PeerConnState::Abandonned => {
PeerConnState::Waiting(0, Instant::now())
}
- x @ (PeerConnState::Ourself | PeerConnState::Connected) => x,
+ x @ (PeerConnState::Ourself | PeerConnState::Connected { .. }) => x,
};
true
} else {
@@ -104,8 +99,6 @@ impl PeerInfoInternal {
pub struct PeerInfo {
/// The node's identifier (its public key)
pub id: NodeID,
- /// The node's network address
- pub addr: SocketAddr,
/// The current status of our connection to this node
pub state: PeerConnState,
/// The last time at which the node was seen
@@ -136,7 +129,7 @@ pub enum PeerConnState {
Ourself,
/// We currently have a connection to this peer
- Connected,
+ Connected { addr: SocketAddr },
/// Our next connection tentative (the nth, where n is the first value of the tuple)
/// will be at given Instant
@@ -152,7 +145,7 @@ pub enum PeerConnState {
impl PeerConnState {
/// Returns true if we can currently send requests to this peer
pub fn is_up(&self) -> bool {
- matches!(self, Self::Ourself | Self::Connected)
+ matches!(self, Self::Ourself | Self::Connected { .. })
}
}
@@ -164,29 +157,42 @@ struct KnownHosts {
impl KnownHosts {
fn new() -> Self {
let list = HashMap::new();
- let hash = Self::calculate_hash(vec![]);
- Self { list, hash }
+ let mut ret = Self {
+ list,
+ hash: hash::Digest::from_slice(&[0u8; 64][..]).unwrap(),
+ };
+ ret.update_hash();
+ ret
}
fn update_hash(&mut self) {
- self.hash = Self::calculate_hash(self.connected_peers_vec());
- }
- fn connected_peers_vec(&self) -> Vec<(NodeID, SocketAddr)> {
- let mut list = Vec::with_capacity(self.list.len());
- for (id, peer) in self.list.iter() {
- if peer.state.is_up() {
- list.push((*id, peer.addr));
- }
- }
- list
- }
- fn calculate_hash(mut list: Vec<(NodeID, SocketAddr)>) -> hash::Digest {
+ // The hash is a value that is exchanged between nodes when they ping one
+ // another. Nodes compare their known hosts hash to know if they are connected
+ // to the same set of nodes. If the hashes differ, they are connected to
+ // different nodes and they trigger an exchange of the full list of active
+ // connections. The hash value only represents the set of node IDs and not
+ // their actual socket addresses, because nodes can be connected via different
+ // addresses and that shouldn't necessarily trigger a full peer exchange.
+ let mut list = self
+ .list
+ .iter()
+ .filter(|(_, peer)| peer.state.is_up())
+ .map(|(id, _)| *id)
+ .collect::<Vec<_>>();
list.sort();
let mut hash_state = hash::State::new();
- for (id, addr) in list {
+ for id in list {
hash_state.update(&id[..]);
- hash_state.update(&format!("{}\n", addr).into_bytes()[..]);
}
- hash_state.finalize()
+ self.hash = hash_state.finalize();
+ }
+ fn connected_peers_vec(&self) -> Vec<(NodeID, SocketAddr)> {
+ self.list
+ .iter()
+ .filter_map(|(id, peer)| match peer.state {
+ PeerConnState::Connected { addr } => Some((*id, addr)),
+ _ => None,
+ })
+ .collect::<Vec<_>>()
}
}
@@ -220,18 +226,16 @@ impl PeeringManager {
if id != netapp.id {
known_hosts.list.insert(
id,
- PeerInfoInternal::new(addr, PeerConnState::Waiting(0, Instant::now())),
+ PeerInfoInternal::new(PeerConnState::Waiting(0, Instant::now()), Some(addr)),
);
}
}
- if let Some(addr) = our_addr {
- known_hosts.list.insert(
- netapp.id,
- PeerInfoInternal::new(addr, PeerConnState::Ourself),
- );
- known_hosts.update_hash();
- }
+ known_hosts.list.insert(
+ netapp.id,
+ PeerInfoInternal::new(PeerConnState::Ourself, our_addr),
+ );
+ known_hosts.update_hash();
// TODO for v0.10 / v1.0 : rename the endpoint (it will break compatibility)
let strat = Arc::new(Self {
@@ -276,7 +280,7 @@ impl PeeringManager {
for (id, info) in known_hosts.list.iter() {
trace!("{}, {:?}", hex::encode(&id[..8]), info);
match info.state {
- PeerConnState::Connected => {
+ PeerConnState::Connected { .. } => {
let must_ping = match info.last_send_ping {
None => true,
Some(t) => Instant::now() - t > PING_INTERVAL,
@@ -319,7 +323,7 @@ impl PeeringManager {
info!(
"Retrying connection to {} at {} ({})",
hex::encode(&id[..8]),
- h.all_addrs
+ h.known_addrs
.iter()
.map(|x| format!("{}", x))
.collect::<Vec<_>>()
@@ -328,13 +332,8 @@ impl PeeringManager {
);
h.state = PeerConnState::Trying(i);
- let alternate_addrs = h
- .all_addrs
- .iter()
- .filter(|x| **x != h.addr)
- .cloned()
- .collect::<Vec<_>>();
- tokio::spawn(self.clone().try_connect(id, h.addr, alternate_addrs));
+ let addresses = h.known_addrs.clone();
+ tokio::spawn(self.clone().try_connect(id, addresses));
}
}
}
@@ -362,27 +361,24 @@ impl PeeringManager {
fn update_public_peer_list(&self, known_hosts: &KnownHosts) {
let mut pub_peer_list = Vec::with_capacity(known_hosts.list.len());
for (id, info) in known_hosts.list.iter() {
+ if *id == self.netapp.id {
+ // sanity check
+ assert!(matches!(info.state, PeerConnState::Ourself));
+ }
let mut pings = info.ping.iter().cloned().collect::<Vec<_>>();
pings.sort();
if !pings.is_empty() {
pub_peer_list.push(PeerInfo {
id: *id,
- addr: info.addr,
state: info.state,
last_seen: info.last_seen,
- avg_ping: Some(
- pings
- .iter()
- .fold(Duration::from_secs(0), |x, y| x + *y)
- .div_f64(pings.len() as f64),
- ),
+ avg_ping: Some(pings.iter().sum::<Duration>().div_f64(pings.len() as f64)),
max_ping: pings.last().cloned(),
med_ping: Some(pings[pings.len() / 2]),
});
} else {
pub_peer_list.push(PeerInfo {
id: *id,
- addr: info.addr,
state: info.state,
last_seen: info.last_seen,
avg_ping: None,
@@ -495,15 +491,10 @@ impl PeeringManager {
}
}
- async fn try_connect(
- self: Arc<Self>,
- id: NodeID,
- default_addr: SocketAddr,
- alternate_addrs: Vec<SocketAddr>,
- ) {
+ async fn try_connect(self: Arc<Self>, id: NodeID, addresses: Vec<SocketAddr>) {
let conn_addr = {
let mut ret = None;
- for addr in [default_addr].iter().chain(alternate_addrs.iter()) {
+ for addr in addresses.iter() {
debug!("Trying address {} for peer {}", addr, hex::encode(&id[..8]));
match self.netapp.clone().try_connect(*addr, id).await {
Ok(()) => {
@@ -529,7 +520,7 @@ impl PeeringManager {
warn!(
"Could not connect to peer {} ({} addresses tried)",
hex::encode(&id[..8]),
- 1 + alternate_addrs.len()
+ addresses.len()
);
let mut known_hosts = self.known_hosts.write().unwrap();
if let Some(host) = known_hosts.list.get_mut(&id) {
@@ -549,6 +540,14 @@ impl PeeringManager {
}
fn on_connected(self: &Arc<Self>, id: NodeID, addr: SocketAddr, is_incoming: bool) {
+ if id == self.netapp.id {
+ // sanity check
+ panic!(
+ "on_connected from local node, id={:?}, addr={}, incoming={}",
+ id, addr, is_incoming
+ );
+ }
+
let mut known_hosts = self.known_hosts.write().unwrap();
if is_incoming {
if let Some(host) = known_hosts.list.get_mut(&id) {
@@ -563,13 +562,13 @@ impl PeeringManager {
addr
);
if let Some(host) = known_hosts.list.get_mut(&id) {
- host.state = PeerConnState::Connected;
- host.addr = addr;
+ host.state = PeerConnState::Connected { addr };
host.add_addr(addr);
} else {
- known_hosts
- .list
- .insert(id, PeerInfoInternal::new(addr, PeerConnState::Connected));
+ known_hosts.list.insert(
+ id,
+ PeerInfoInternal::new(PeerConnState::Connected { addr }, Some(addr)),
+ );
}
}
known_hosts.update_hash();
@@ -589,12 +588,8 @@ impl PeeringManager {
}
fn new_peer(&self, id: &NodeID, addr: SocketAddr) -> PeerInfoInternal {
- let state = if *id == self.netapp.id {
- PeerConnState::Ourself
- } else {
- PeerConnState::Waiting(0, Instant::now())
- };
- PeerInfoInternal::new(addr, state)
+ assert!(*id != self.netapp.id);
+ PeerInfoInternal::new(PeerConnState::Waiting(0, Instant::now()), Some(addr))
}
}
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 54d589d2..9da1b176 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -16,7 +16,7 @@ use tokio::sync::{watch, Notify};
use garage_net::endpoint::{Endpoint, EndpointHandler};
use garage_net::message::*;
-use garage_net::peering::PeeringManager;
+use garage_net::peering::{PeerConnState, PeeringManager};
use garage_net::util::parse_and_resolve_peer_addr_async;
use garage_net::{NetApp, NetworkKey, NodeID, NodeKey};
@@ -142,7 +142,7 @@ pub struct NodeStatus {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KnownNodeInfo {
pub id: Uuid,
- pub addr: SocketAddr,
+ pub addr: Option<SocketAddr>,
pub is_up: bool,
pub last_seen_secs_ago: Option<u64>,
pub status: NodeStatus,
@@ -381,7 +381,11 @@ impl System {
.iter()
.map(|n| KnownNodeInfo {
id: n.id.into(),
- addr: n.addr,
+ addr: match n.state {
+ PeerConnState::Ourself => self.rpc_public_addr,
+ PeerConnState::Connected { addr } => Some(addr),
+ _ => None,
+ },
is_up: n.is_up(),
last_seen_secs_ago: n
.last_seen
@@ -722,7 +726,10 @@ impl System {
.peering
.get_peer_list()
.iter()
- .map(|n| (n.id.into(), n.addr))
+ .filter_map(|n| match n.state {
+ PeerConnState::Connected { addr } => Some((n.id.into(), addr)),
+ _ => None,
+ })
.collect::<Vec<_>>();
// Before doing it, we read the current peer list file (if it exists)