use std::collections::{HashMap, VecDeque}; use std::net::SocketAddr; use std::sync::atomic::{self, AtomicU64}; use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; use arc_swap::ArcSwap; use async_trait::async_trait; use log::{debug, info, trace, warn}; use serde::{Deserialize, Serialize}; use tokio::sync::watch; use sodiumoxide::crypto::hash; use crate::endpoint::*; use crate::netapp::*; use crate::proto::*; use crate::NodeID; const CONN_RETRY_INTERVAL: Duration = Duration::from_secs(30); const CONN_MAX_RETRIES: usize = 10; const PING_INTERVAL: Duration = Duration::from_secs(10); const LOOP_DELAY: Duration = Duration::from_secs(1); // -- Protocol messages -- #[derive(Serialize, Deserialize)] struct PingMessage { pub id: u64, pub peer_list_hash: hash::Digest, } impl Message for PingMessage { type Response = PingMessage; } #[derive(Serialize, Deserialize)] struct PeerListMessage { pub list: Vec<(NodeID, SocketAddr)>, } impl Message for PeerListMessage { type Response = PeerListMessage; } // -- Algorithm data structures -- #[derive(Debug)] struct PeerInfoInternal { addr: SocketAddr, state: PeerConnState, last_seen: Option, ping: VecDeque, } #[derive(Copy, Clone, Debug)] pub struct PeerInfo { /// The node's identifier (its public key) pub id: NodeID, /// The node's network address pub addr: SocketAddr, /// The current status of our connection to this node pub state: PeerConnState, /// The last time at which the node was seen pub last_seen: Option, /// The average ping to this node on recent observations (if at least one ping value is known) pub avg_ping: Option, /// The maximum observed ping to this node on recent observations (if at least one /// ping value is known) pub max_ping: Option, /// The median ping to this node on recent observations (if at least one ping value /// is known) pub med_ping: Option, } impl PeerInfo { /// Returns true if we can currently send requests to this peer pub fn is_up(&self) -> bool { self.state.is_up() } } /// PeerConnState: possible states for our tentative connections to given peer /// This structure is only interested in recording connection info for outgoing /// TCP connections #[derive(Copy, Clone, Debug, PartialEq)] pub enum PeerConnState { /// This entry represents ourself (the local node) Ourself, /// We currently have a connection to this peer Connected, /// Our next connection tentative (the nth, where n is the first value of the tuple) /// will be at given Instant Waiting(usize, Instant), /// A connection tentative is in progress (the nth, where n is the value stored) Trying(usize), /// We abandonned trying to connect to this peer (too many failed attempts) Abandonned, } impl PeerConnState { /// Returns true if we can currently send requests to this peer pub fn is_up(&self) -> bool { matches!(self, Self::Ourself | Self::Connected) } } struct KnownHosts { list: HashMap, hash: hash::Digest, } impl KnownHosts { fn new() -> Self { let list = HashMap::new(); let hash = Self::calculate_hash(&list); Self { list, hash } } fn update_hash(&mut self) { self.hash = Self::calculate_hash(&self.list); } fn map_into_vec(input: &HashMap) -> Vec<(NodeID, SocketAddr)> { let mut list = Vec::with_capacity(input.len()); for (id, peer) in input.iter() { if peer.state == PeerConnState::Connected || peer.state == PeerConnState::Ourself { list.push((*id, peer.addr)); } } list } fn calculate_hash(input: &HashMap) -> hash::Digest { let mut list = Self::map_into_vec(input); list.sort(); let mut hash_state = hash::State::new(); for (id, addr) in list { hash_state.update(&id[..]); hash_state.update(&format!("{}\n", addr).into_bytes()[..]); } hash_state.finalize() } } /// A "Full Mesh" peering strategy is a peering strategy that tries /// to establish and maintain a direct connection with all of the /// known nodes in the network. pub struct FullMeshPeeringStrategy { netapp: Arc, known_hosts: RwLock, public_peer_list: ArcSwap>, next_ping_id: AtomicU64, ping_endpoint: Arc>, peer_list_endpoint: Arc>, } impl FullMeshPeeringStrategy { /// Create a new Full Mesh peering strategy. /// The strategy will not be run until `.run()` is called and awaited. /// Once that happens, the peering strategy will try to connect /// to all of the nodes specified in the bootstrap list. pub fn new(netapp: Arc, bootstrap_list: Vec<(NodeID, SocketAddr)>) -> Arc { let mut known_hosts = KnownHosts::new(); for (id, addr) in bootstrap_list { if id != netapp.id { known_hosts.list.insert( id, PeerInfoInternal { addr, state: PeerConnState::Waiting(0, Instant::now()), last_seen: None, ping: VecDeque::new(), }, ); } } let strat = Arc::new(Self { netapp: netapp.clone(), known_hosts: RwLock::new(known_hosts), public_peer_list: ArcSwap::new(Arc::new(Vec::new())), next_ping_id: AtomicU64::new(42), ping_endpoint: netapp.endpoint("__netapp/peering/fullmesh.rs/Ping".into()), peer_list_endpoint: netapp.endpoint("__netapp/peering/fullmesh.rs/PeerList".into()), }); strat.ping_endpoint.set_handler(strat.clone()); strat.peer_list_endpoint.set_handler(strat.clone()); let strat2 = strat.clone(); netapp.on_connected(move |id: NodeID, addr: SocketAddr, is_incoming: bool| { let strat2 = strat2.clone(); tokio::spawn(strat2.on_connected(id, addr, is_incoming)); }); let strat2 = strat.clone(); netapp.on_disconnected(move |id: NodeID, is_incoming: bool| { let strat2 = strat2.clone(); tokio::spawn(strat2.on_disconnected(id, is_incoming)); }); strat } /// Run the full mesh peering strategy. /// This future exits when the `must_exit` watch becomes true. pub async fn run(self: Arc, must_exit: watch::Receiver) { while !*must_exit.borrow() { // 1. Read current state: get list of connected peers (ping them) let (to_ping, to_retry) = { let known_hosts = self.known_hosts.read().unwrap(); trace!("known_hosts: {} peers", known_hosts.list.len()); let mut to_ping = vec![]; let mut to_retry = vec![]; for (id, info) in known_hosts.list.iter() { trace!("{}, {:?}", hex::encode(id), info); match info.state { PeerConnState::Connected => { let must_ping = match info.last_seen { None => true, Some(t) => Instant::now() - t > PING_INTERVAL, }; if must_ping { to_ping.push(*id); } } PeerConnState::Waiting(_, t) => { if Instant::now() >= t { to_retry.push(*id); } } _ => (), } } (to_ping, to_retry) }; // 2. Dispatch ping to hosts trace!("to_ping: {} peers", to_retry.len()); for id in to_ping { tokio::spawn(self.clone().ping(id)); } // 3. Try reconnects trace!("to_retry: {} peers", to_retry.len()); if !to_retry.is_empty() { let mut known_hosts = self.known_hosts.write().unwrap(); for id in to_retry { if let Some(h) = known_hosts.list.get_mut(&id) { if let PeerConnState::Waiting(i, _) = h.state { info!( "Retrying connection to {} at {} ({})", hex::encode(&id), h.addr, i + 1 ); h.state = PeerConnState::Trying(i); tokio::spawn(self.clone().try_connect(id, h.addr)); } } } self.update_public_peer_list(&known_hosts); } // 4. Sleep before next loop iteration tokio::time::sleep(LOOP_DELAY).await; } } /// Returns a list of currently known peers in the network. pub fn get_peer_list(&self) -> Arc> { self.public_peer_list.load_full() } // -- internal stuff -- fn update_public_peer_list(&self, known_hosts: &KnownHosts) { let mut pub_peer_list = Vec::with_capacity(known_hosts.list.len()); for (id, info) in known_hosts.list.iter() { let mut pings = info.ping.iter().cloned().collect::>(); pings.sort(); if !pings.is_empty() { pub_peer_list.push(PeerInfo { id: *id, addr: info.addr, state: info.state, last_seen: info.last_seen, avg_ping: Some( pings .iter() .fold(Duration::from_secs(0), |x, y| x + *y) .div_f64(pings.len() as f64), ), max_ping: pings.last().cloned(), med_ping: Some(pings[pings.len() / 2]), }); } else { pub_peer_list.push(PeerInfo { id: *id, addr: info.addr, state: info.state, last_seen: info.last_seen, avg_ping: None, max_ping: None, med_ping: None, }); } } self.public_peer_list.store(Arc::new(pub_peer_list)); } async fn ping(self: Arc, id: NodeID) { let peer_list_hash = self.known_hosts.read().unwrap().hash; let ping_id = self.next_ping_id.fetch_add(1u64, atomic::Ordering::Relaxed); let ping_time = Instant::now(); let ping_msg = PingMessage { id: ping_id, peer_list_hash, }; debug!( "Sending ping {} to {} at {:?}", ping_id, hex::encode(id), ping_time ); match self.ping_endpoint.call(&id, &ping_msg, PRIO_HIGH).await { Err(e) => warn!("Error pinging {}: {}", hex::encode(id), e), Ok(ping_resp) => { let resp_time = Instant::now(); debug!( "Got ping response from {} at {:?}", hex::encode(id), resp_time ); { let mut known_hosts = self.known_hosts.write().unwrap(); if let Some(host) = known_hosts.list.get_mut(&id) { host.last_seen = Some(resp_time); host.ping.push_back(resp_time - ping_time); while host.ping.len() > 10 { host.ping.pop_front(); } self.update_public_peer_list(&known_hosts); } } if ping_resp.peer_list_hash != peer_list_hash { self.exchange_peers(&id).await; } } } } async fn exchange_peers(self: Arc, id: &NodeID) { let peer_list = KnownHosts::map_into_vec(&self.known_hosts.read().unwrap().list); let pex_message = PeerListMessage { list: peer_list }; match self .peer_list_endpoint .call(id, &pex_message, PRIO_BACKGROUND) .await { Err(e) => warn!("Error doing peer exchange: {}", e), Ok(resp) => { self.handle_peer_list(&resp.list[..]); } } } fn handle_peer_list(&self, list: &[(NodeID, SocketAddr)]) { let mut known_hosts = self.known_hosts.write().unwrap(); for (id, addr) in list.iter() { if !known_hosts.list.contains_key(id) { known_hosts.list.insert(*id, self.new_peer(id, *addr)); } } self.update_public_peer_list(&known_hosts); } async fn try_connect(self: Arc, id: NodeID, addr: SocketAddr) { let conn_result = self.netapp.clone().try_connect(addr, id).await; if let Err(e) = conn_result { warn!("Error connecting to {}: {}", hex::encode(id), e); let mut known_hosts = self.known_hosts.write().unwrap(); if let Some(host) = known_hosts.list.get_mut(&id) { host.state = match host.state { PeerConnState::Trying(i) => { if i >= CONN_MAX_RETRIES { PeerConnState::Abandonned } else { PeerConnState::Waiting(i + 1, Instant::now() + CONN_RETRY_INTERVAL) } } _ => PeerConnState::Waiting(0, Instant::now() + CONN_RETRY_INTERVAL), }; self.update_public_peer_list(&known_hosts); } } } async fn on_connected(self: Arc, id: NodeID, addr: SocketAddr, is_incoming: bool) { if is_incoming { if !self.known_hosts.read().unwrap().list.contains_key(&id) { self.known_hosts .write() .unwrap() .list .insert(id, self.new_peer(&id, addr)); } } else { info!("Successfully connected to {} at {}", hex::encode(&id), addr); let mut known_hosts = self.known_hosts.write().unwrap(); if let Some(host) = known_hosts.list.get_mut(&id) { host.state = PeerConnState::Connected; known_hosts.update_hash(); self.update_public_peer_list(&known_hosts); } } } async fn on_disconnected(self: Arc, id: NodeID, is_incoming: bool) { if !is_incoming { info!("Connection to {} was closed", hex::encode(id)); let mut known_hosts = self.known_hosts.write().unwrap(); if let Some(host) = known_hosts.list.get_mut(&id) { host.state = PeerConnState::Waiting(0, Instant::now()); known_hosts.update_hash(); self.update_public_peer_list(&known_hosts); } } } fn new_peer(&self, id: &NodeID, addr: SocketAddr) -> PeerInfoInternal { let state = if *id == self.netapp.id { PeerConnState::Ourself } else { PeerConnState::Waiting(0, Instant::now()) }; PeerInfoInternal { addr, state, last_seen: None, ping: VecDeque::new(), } } } #[async_trait] impl EndpointHandler for FullMeshPeeringStrategy { async fn handle(self: &Arc, ping: &PingMessage, from: NodeID) -> PingMessage { let ping_resp = PingMessage { id: ping.id, peer_list_hash: self.known_hosts.read().unwrap().hash, }; debug!("Ping from {}", hex::encode(&from)); ping_resp } } #[async_trait] impl EndpointHandler for FullMeshPeeringStrategy { async fn handle( self: &Arc, peer_list: &PeerListMessage, _from: NodeID, ) -> PeerListMessage { self.handle_peer_list(&peer_list.list[..]); let peer_list = KnownHosts::map_into_vec(&self.known_hosts.read().unwrap().list); PeerListMessage { list: peer_list } } }