aboutsummaryrefslogtreecommitdiff
path: root/src/net/peering.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/net/peering.rs')
-rw-r--r--src/net/peering.rs124
1 files changed, 54 insertions, 70 deletions
diff --git a/src/net/peering.rs b/src/net/peering.rs
index f4283683..0b4fec9e 100644
--- a/src/net/peering.rs
+++ b/src/net/peering.rs
@@ -43,7 +43,7 @@ impl Message for PingMessage {
#[derive(Serialize, Deserialize)]
struct PeerListMessage {
- pub list: Vec<(NodeID, SocketAddr)>,
+ pub list: Vec<(NodeID, Vec<SocketAddr>)>,
}
impl Message for PeerListMessage {
@@ -54,12 +54,8 @@ impl Message for PeerListMessage {
#[derive(Debug)]
struct PeerInfoInternal {
- // addr is the currently connected address,
- // or the last address we were connected to,
- // or an arbitrary address some other peer gave us
- addr: SocketAddr,
- // all_addrs contains all of the addresses everyone gave us
- all_addrs: Vec<SocketAddr>,
+ // known_addrs contains all of the addresses everyone gave us
+ known_addrs: Vec<SocketAddr>,
state: PeerConnState,
last_send_ping: Option<Instant>,
@@ -69,10 +65,9 @@ struct PeerInfoInternal {
}
impl PeerInfoInternal {
- fn new(addr: SocketAddr, state: PeerConnState) -> Self {
+ fn new(state: PeerConnState, known_addr: Option<SocketAddr>) -> Self {
Self {
- addr,
- all_addrs: vec![addr],
+ known_addrs: known_addr.map(|x| vec![x]).unwrap_or_default(),
state,
last_send_ping: None,
last_seen: None,
@@ -81,8 +76,8 @@ impl PeerInfoInternal {
}
}
fn add_addr(&mut self, addr: SocketAddr) -> bool {
- if !self.all_addrs.contains(&addr) {
- self.all_addrs.push(addr);
+ if !self.known_addrs.contains(&addr) {
+ self.known_addrs.push(addr);
// If we are learning a new address for this node,
// we want to retry connecting
self.state = match self.state {
@@ -90,7 +85,7 @@ impl PeerInfoInternal {
PeerConnState::Waiting(_, _) | PeerConnState::Abandonned => {
PeerConnState::Waiting(0, Instant::now())
}
- x @ (PeerConnState::Ourself | PeerConnState::Connected) => x,
+ x @ (PeerConnState::Ourself | PeerConnState::Connected { .. }) => x,
};
true
} else {
@@ -104,8 +99,6 @@ impl PeerInfoInternal {
pub struct PeerInfo {
/// The node's identifier (its public key)
pub id: NodeID,
- /// The node's network address
- pub addr: SocketAddr,
/// The current status of our connection to this node
pub state: PeerConnState,
/// The last time at which the node was seen
@@ -136,7 +129,7 @@ pub enum PeerConnState {
Ourself,
/// We currently have a connection to this peer
- Connected,
+ Connected { addr: SocketAddr },
/// Our next connection tentative (the nth, where n is the first value of the tuple)
/// will be at given Instant
@@ -152,7 +145,7 @@ pub enum PeerConnState {
impl PeerConnState {
/// Returns true if we can currently send requests to this peer
pub fn is_up(&self) -> bool {
- matches!(self, Self::Ourself | Self::Connected)
+ matches!(self, Self::Ourself | Self::Connected { .. })
}
}
@@ -192,11 +185,11 @@ impl KnownHosts {
}
self.hash = hash_state.finalize();
}
- fn connected_peers_vec(&self) -> Vec<(NodeID, SocketAddr)> {
+ fn connected_peers_vec(&self) -> Vec<(NodeID, Vec<SocketAddr>)> {
self.list
.iter()
.filter(|(_, peer)| peer.state.is_up())
- .map(|(id, peer)| (*id, peer.addr))
+ .map(|(id, peer)| (*id, peer.known_addrs.clone()))
.collect::<Vec<_>>()
}
}
@@ -231,18 +224,16 @@ impl PeeringManager {
if id != netapp.id {
known_hosts.list.insert(
id,
- PeerInfoInternal::new(addr, PeerConnState::Waiting(0, Instant::now())),
+ PeerInfoInternal::new(PeerConnState::Waiting(0, Instant::now()), Some(addr)),
);
}
}
- if let Some(addr) = our_addr {
- known_hosts.list.insert(
- netapp.id,
- PeerInfoInternal::new(addr, PeerConnState::Ourself),
- );
- known_hosts.update_hash();
- }
+ known_hosts.list.insert(
+ netapp.id,
+ PeerInfoInternal::new(PeerConnState::Ourself, our_addr),
+ );
+ known_hosts.update_hash();
// TODO for v0.10 / v1.0 : rename the endpoint (it will break compatibility)
let strat = Arc::new(Self {
@@ -287,7 +278,7 @@ impl PeeringManager {
for (id, info) in known_hosts.list.iter() {
trace!("{}, {:?}", hex::encode(&id[..8]), info);
match info.state {
- PeerConnState::Connected => {
+ PeerConnState::Connected { .. } => {
let must_ping = match info.last_send_ping {
None => true,
Some(t) => Instant::now() - t > PING_INTERVAL,
@@ -330,7 +321,7 @@ impl PeeringManager {
info!(
"Retrying connection to {} at {} ({})",
hex::encode(&id[..8]),
- h.all_addrs
+ h.known_addrs
.iter()
.map(|x| format!("{}", x))
.collect::<Vec<_>>()
@@ -339,13 +330,8 @@ impl PeeringManager {
);
h.state = PeerConnState::Trying(i);
- let alternate_addrs = h
- .all_addrs
- .iter()
- .filter(|x| **x != h.addr)
- .cloned()
- .collect::<Vec<_>>();
- tokio::spawn(self.clone().try_connect(id, h.addr, alternate_addrs));
+ let addresses = h.known_addrs.clone();
+ tokio::spawn(self.clone().try_connect(id, addresses));
}
}
}
@@ -373,27 +359,24 @@ impl PeeringManager {
fn update_public_peer_list(&self, known_hosts: &KnownHosts) {
let mut pub_peer_list = Vec::with_capacity(known_hosts.list.len());
for (id, info) in known_hosts.list.iter() {
+ if *id == self.netapp.id {
+ // sanity check
+ assert!(matches!(info.state, PeerConnState::Ourself));
+ }
let mut pings = info.ping.iter().cloned().collect::<Vec<_>>();
pings.sort();
if !pings.is_empty() {
pub_peer_list.push(PeerInfo {
id: *id,
- addr: info.addr,
state: info.state,
last_seen: info.last_seen,
- avg_ping: Some(
- pings
- .iter()
- .fold(Duration::from_secs(0), |x, y| x + *y)
- .div_f64(pings.len() as f64),
- ),
+ avg_ping: Some(pings.iter().sum::<Duration>().div_f64(pings.len() as f64)),
max_ping: pings.last().cloned(),
med_ping: Some(pings[pings.len() / 2]),
});
} else {
pub_peer_list.push(PeerInfo {
id: *id,
- addr: info.addr,
state: info.state,
last_seen: info.last_seen,
avg_ping: None,
@@ -485,18 +468,20 @@ impl PeeringManager {
}
}
- fn handle_peer_list(&self, list: &[(NodeID, SocketAddr)]) {
+ fn handle_peer_list(&self, list: &[(NodeID, Vec<SocketAddr>)]) {
let mut known_hosts = self.known_hosts.write().unwrap();
let mut changed = false;
- for (id, addr) in list.iter() {
- if let Some(kh) = known_hosts.list.get_mut(id) {
- if kh.add_addr(*addr) {
+ for (id, addrs) in list.iter() {
+ for addr in addrs.iter() {
+ if let Some(kh) = known_hosts.list.get_mut(id) {
+ if kh.add_addr(*addr) {
+ changed = true;
+ }
+ } else {
+ known_hosts.list.insert(*id, self.new_peer(id, *addr));
changed = true;
}
- } else {
- known_hosts.list.insert(*id, self.new_peer(id, *addr));
- changed = true;
}
}
@@ -506,15 +491,10 @@ impl PeeringManager {
}
}
- async fn try_connect(
- self: Arc<Self>,
- id: NodeID,
- default_addr: SocketAddr,
- alternate_addrs: Vec<SocketAddr>,
- ) {
+ async fn try_connect(self: Arc<Self>, id: NodeID, addresses: Vec<SocketAddr>) {
let conn_addr = {
let mut ret = None;
- for addr in [default_addr].iter().chain(alternate_addrs.iter()) {
+ for addr in addresses.iter() {
debug!("Trying address {} for peer {}", addr, hex::encode(&id[..8]));
match self.netapp.clone().try_connect(*addr, id).await {
Ok(()) => {
@@ -540,7 +520,7 @@ impl PeeringManager {
warn!(
"Could not connect to peer {} ({} addresses tried)",
hex::encode(&id[..8]),
- 1 + alternate_addrs.len()
+ addresses.len()
);
let mut known_hosts = self.known_hosts.write().unwrap();
if let Some(host) = known_hosts.list.get_mut(&id) {
@@ -560,6 +540,14 @@ impl PeeringManager {
}
fn on_connected(self: &Arc<Self>, id: NodeID, addr: SocketAddr, is_incoming: bool) {
+ if id == self.netapp.id {
+ // sanity check
+ panic!(
+ "on_connected from local node, id={:?}, addr={}, incoming={}",
+ id, addr, is_incoming
+ );
+ }
+
let mut known_hosts = self.known_hosts.write().unwrap();
if is_incoming {
if let Some(host) = known_hosts.list.get_mut(&id) {
@@ -574,13 +562,13 @@ impl PeeringManager {
addr
);
if let Some(host) = known_hosts.list.get_mut(&id) {
- host.state = PeerConnState::Connected;
- host.addr = addr;
+ host.state = PeerConnState::Connected { addr };
host.add_addr(addr);
} else {
- known_hosts
- .list
- .insert(id, PeerInfoInternal::new(addr, PeerConnState::Connected));
+ known_hosts.list.insert(
+ id,
+ PeerInfoInternal::new(PeerConnState::Connected { addr }, Some(addr)),
+ );
}
}
known_hosts.update_hash();
@@ -600,12 +588,8 @@ impl PeeringManager {
}
fn new_peer(&self, id: &NodeID, addr: SocketAddr) -> PeerInfoInternal {
- let state = if *id == self.netapp.id {
- PeerConnState::Ourself
- } else {
- PeerConnState::Waiting(0, Instant::now())
- };
- PeerInfoInternal::new(addr, state)
+ assert!(*id != self.netapp.id);
+ PeerInfoInternal::new(PeerConnState::Waiting(0, Instant::now()), Some(addr))
}
}