diff options
author | Alex Auvolat <alex@adnab.me> | 2022-05-09 11:54:34 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-05-09 11:54:34 +0200 |
commit | 677c47154817857c806ec755bae6cf1b83023cc3 (patch) | |
tree | 0fe5068f4635e751a070fc3fb061884dacbe0614 | |
parent | faecefc7a8ce91582b33f3da45a3ba4669afeffe (diff) | |
download | netapp-677c47154817857c806ec755bae6cf1b83023cc3.tar.gz netapp-677c47154817857c806ec755bae6cf1b83023cc3.zip |
Handle the possibility of several alternative IP addresses for peers
-rw-r--r-- | Cargo.lock | 2 | ||||
-rw-r--r-- | Cargo.toml | 2 | ||||
-rw-r--r-- | src/peering/fullmesh.rs | 77 |
3 files changed, 70 insertions, 11 deletions
@@ -433,7 +433,7 @@ dependencies = [ [[package]] name = "netapp" -version = "0.4.1" +version = "0.4.3" dependencies = [ "arc-swap", "async-trait", @@ -1,6 +1,6 @@ [package] name = "netapp" -version = "0.4.2" +version = "0.4.3" authors = ["Alex Auvolat <alex@adnab.me>"] edition = "2018" license-file = "LICENSE" diff --git a/src/peering/fullmesh.rs b/src/peering/fullmesh.rs index 52d7fda..bb0c9dc 100644 --- a/src/peering/fullmesh.rs +++ b/src/peering/fullmesh.rs @@ -52,7 +52,13 @@ impl Message for PeerListMessage { #[derive(Debug)] struct PeerInfoInternal { + // addr is the currently connected address, + // or the last address we were connected to, + // or an arbitrary address some other peer gave us addr: SocketAddr, + // all_addrs contains all of the addresses everyone gave us + all_addrs: Vec<SocketAddr>, + state: PeerConnState, last_seen: Option<Instant>, ping: VecDeque<Duration>, @@ -180,6 +186,7 @@ impl FullMeshPeeringStrategy { id, PeerInfoInternal { addr, + all_addrs: vec![addr], state: PeerConnState::Waiting(0, Instant::now()), last_seen: None, ping: VecDeque::new(), @@ -194,6 +201,7 @@ impl FullMeshPeeringStrategy { netapp.id, PeerInfoInternal { addr, + all_addrs: vec![addr], state: PeerConnState::Ourself, last_seen: None, ping: VecDeque::new(), @@ -281,11 +289,22 @@ impl FullMeshPeeringStrategy { info!( "Retrying connection to {} at {} ({})", hex::encode(&id[..8]), - h.addr, + h.all_addrs + .iter() + .map(|x| format!("{}", x)) + .collect::<Vec<_>>() + .join(", "), i + 1 ); h.state = PeerConnState::Trying(i); - tokio::spawn(self.clone().try_connect(id, h.addr)); + + let alternate_addrs = h + .all_addrs + .iter() + .filter(|x| **x != h.addr) + .cloned() + .collect::<Vec<_>>(); + tokio::spawn(self.clone().try_connect(id, h.addr, alternate_addrs)); } } } @@ -422,7 +441,12 @@ impl FullMeshPeeringStrategy { let mut changed = false; for (id, addr) in list.iter() { - if !known_hosts.list.contains_key(id) { + if let Some(kh) = known_hosts.list.get_mut(id) { + if !kh.all_addrs.contains(addr) { + kh.all_addrs.push(*addr); + changed = true; + } + } else { known_hosts.list.insert(*id, self.new_peer(id, *addr)); changed = true; } @@ -434,10 +458,42 @@ impl FullMeshPeeringStrategy { } } - async fn try_connect(self: Arc<Self>, id: NodeID, addr: SocketAddr) { - let conn_result = self.netapp.clone().try_connect(addr, id).await; - if let Err(e) = conn_result { - warn!("Error connecting to {}: {}", hex::encode(&id[..8]), e); + async fn try_connect( + self: Arc<Self>, + id: NodeID, + default_addr: SocketAddr, + alternate_addrs: Vec<SocketAddr>, + ) { + let conn_addr = { + let mut ret = None; + for addr in [default_addr].iter().chain(alternate_addrs.iter()) { + debug!("Trying address {} for peer {}", addr, hex::encode(&id[..8])); + match self.netapp.clone().try_connect(*addr, id).await { + Ok(()) => { + ret = Some(*addr); + break; + } + Err(e) => { + debug!( + "Error connecting to {} at {}: {}", + hex::encode(&id[..8]), + addr, + e + ); + } + } + } + ret + }; + + if let Some(ok_addr) = conn_addr { + self.on_connected(id, ok_addr, false); + } else { + warn!( + "Could not connect to peer {} ({} addresses tried)", + hex::encode(&id[..8]), + 1 + alternate_addrs.len() + ); let mut known_hosts = self.known_hosts.write().unwrap(); if let Some(host) = known_hosts.list.get_mut(&id) { host.state = match host.state { @@ -452,8 +508,6 @@ impl FullMeshPeeringStrategy { }; self.update_public_peer_list(&known_hosts); } - } else { - self.on_connected(id, addr, false); } } @@ -475,12 +529,16 @@ impl FullMeshPeeringStrategy { if let Some(host) = known_hosts.list.get_mut(&id) { host.state = PeerConnState::Connected; host.addr = addr; + if !host.all_addrs.contains(&addr) { + host.all_addrs.push(addr); + } } else { known_hosts.list.insert( id, PeerInfoInternal { state: PeerConnState::Connected, addr, + all_addrs: vec![addr], last_seen: None, ping: VecDeque::new(), failed_pings: 0, @@ -512,6 +570,7 @@ impl FullMeshPeeringStrategy { }; PeerInfoInternal { addr, + all_addrs: vec![addr], state, last_seen: None, ping: VecDeque::new(), |