From 01a2737bd8537da41a3babdd2b0949795492a59e Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 14 Oct 2021 11:35:05 +0200 Subject: Document --- src/peering/fullmesh.rs | 143 +++++++++++++++++++++++++++++------------------- 1 file changed, 88 insertions(+), 55 deletions(-) (limited to 'src/peering/fullmesh.rs') diff --git a/src/peering/fullmesh.rs b/src/peering/fullmesh.rs index 1162048..c521590 100644 --- a/src/peering/fullmesh.rs +++ b/src/peering/fullmesh.rs @@ -4,6 +4,7 @@ use std::sync::atomic::{self, AtomicU64}; use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; +use arc_swap::ArcSwap; use async_trait::async_trait; use log::{debug, info, trace, warn}; use serde::{Deserialize, Serialize}; @@ -46,7 +47,7 @@ impl Message for PeerListMessage { // -- Algorithm data structures -- #[derive(Debug)] -struct PeerInfo { +struct PeerInfoInternal { addr: SocketAddr, state: PeerConnState, last_seen: Option, @@ -54,40 +55,49 @@ struct PeerInfo { } #[derive(Copy, Clone, Debug)] -pub struct PeerInfoPub { +pub struct PeerInfo { + /// The node's identifier (its public key) pub id: NodeID, + /// The node's network address pub addr: SocketAddr, + /// The current status of our connection to this node pub state: PeerConnState, + /// The last time at which the node was seen pub last_seen: Option, + /// The average ping to this node on recent observations (if at least one ping value is known) pub avg_ping: Option, + /// The maximum observed ping to this node on recent observations (if at least one + /// ping value is known) pub max_ping: Option, + /// The median ping to this node on recent observations (if at least one ping value + /// is known) pub med_ping: Option, } -// PeerConnState: possible states for our tentative connections to given peer -// This module is only interested in recording connection info for outgoing -// TCP connections +/// PeerConnState: possible states for our tentative connections to given peer +/// This structure is only interested in recording connection info for outgoing +/// TCP connections #[derive(Copy, Clone, Debug, PartialEq)] pub enum PeerConnState { - // This entry represents ourself + /// This entry represents ourself (the local node) Ourself, - // We currently have a connection to this peer + /// We currently have a connection to this peer Connected, - // Our next connection tentative (the nth, where n is the first value) - // will be at given Instant + /// Our next connection tentative (the nth, where n is the first value of the tuple) + /// will be at given Instant Waiting(usize, Instant), - // A connection tentative is in progress + /// A connection tentative is in progress (the nth, where n is the value stored) Trying(usize), - // We abandonned trying to connect to this peer (too many failed attempts) + /// We abandonned trying to connect to this peer (too many failed attempts) Abandonned, } struct KnownHosts { - list: HashMap, + list: HashMap, hash: hash::Digest, } @@ -100,7 +110,7 @@ impl KnownHosts { fn update_hash(&mut self) { self.hash = Self::calculate_hash(&self.list); } - fn map_into_vec(input: &HashMap) -> Vec<(NodeID, SocketAddr)> { + fn map_into_vec(input: &HashMap) -> Vec<(NodeID, SocketAddr)> { let mut list = Vec::with_capacity(input.len()); for (id, peer) in input.iter() { if peer.state == PeerConnState::Connected || peer.state == PeerConnState::Ourself { @@ -109,35 +119,43 @@ impl KnownHosts { } list } - fn calculate_hash(input: &HashMap) -> hash::Digest { + fn calculate_hash(input: &HashMap) -> hash::Digest { let mut list = Self::map_into_vec(input); list.sort(); let mut hash_state = hash::State::new(); for (id, addr) in list { hash_state.update(&id[..]); - hash_state.update(&format!("{}", addr).into_bytes()[..]); + hash_state.update(&format!("{}\n", addr).into_bytes()[..]); } hash_state.finalize() } } +/// A "Full Mesh" peering strategy is a peering strategy that tries +/// to establish and maintain a direct connection with all of the +/// known nodes in the network. pub struct FullMeshPeeringStrategy { netapp: Arc, known_hosts: RwLock, - next_ping_id: AtomicU64, + public_peer_list: ArcSwap>, + next_ping_id: AtomicU64, ping_endpoint: Arc>, peer_list_endpoint: Arc>, } impl FullMeshPeeringStrategy { + /// Create a new Full Mesh peering strategy. + /// The strategy will not be run until `.run()` is called and awaited. + /// Once that happens, the peering strategy will try to connect + /// to all of the nodes specified in the bootstrap list. pub fn new(netapp: Arc, bootstrap_list: Vec<(NodeID, SocketAddr)>) -> Arc { let mut known_hosts = KnownHosts::new(); for (id, addr) in bootstrap_list { if id != netapp.id { known_hosts.list.insert( id, - PeerInfo { + PeerInfoInternal { addr, state: PeerConnState::Waiting(0, Instant::now()), last_seen: None, @@ -150,6 +168,7 @@ impl FullMeshPeeringStrategy { let strat = Arc::new(Self { netapp: netapp.clone(), known_hosts: RwLock::new(known_hosts), + public_peer_list: ArcSwap::new(Arc::new(Vec::new())), next_ping_id: AtomicU64::new(42), ping_endpoint: netapp.endpoint("__netapp/peering/fullmesh.rs/Ping".into()), peer_list_endpoint: netapp.endpoint("__netapp/peering/fullmesh.rs/PeerList".into()), @@ -173,6 +192,8 @@ impl FullMeshPeeringStrategy { strat } + /// Run the full mesh peering strategy. + /// This future exits when the `must_exit` watch becomes true. pub async fn run(self: Arc, must_exit: watch::Receiver) { while !*must_exit.borrow() { // 1. Read current state: get list of connected peers (ping them) @@ -229,6 +250,7 @@ impl FullMeshPeeringStrategy { } } } + self.update_public_peer_list(&known_hosts); } // 4. Sleep before next loop iteration @@ -236,6 +258,48 @@ impl FullMeshPeeringStrategy { } } + /// Returns a list of currently known peers in the network. + pub fn get_peer_list(&self) -> Arc> { + self.public_peer_list.load_full() + } + + // -- internal stuff -- + + fn update_public_peer_list(&self, known_hosts: &KnownHosts) { + let mut pub_peer_list = Vec::with_capacity(known_hosts.list.len()); + for (id, info) in known_hosts.list.iter() { + let mut pings = info.ping.iter().cloned().collect::>(); + pings.sort(); + if !pings.is_empty() { + pub_peer_list.push(PeerInfo { + id: *id, + addr: info.addr, + state: info.state, + last_seen: info.last_seen, + avg_ping: Some( + pings + .iter() + .fold(Duration::from_secs(0), |x, y| x + *y) + .div_f64(pings.len() as f64), + ), + max_ping: pings.last().cloned(), + med_ping: Some(pings[pings.len() / 2]), + }); + } else { + pub_peer_list.push(PeerInfo { + id: *id, + addr: info.addr, + state: info.state, + last_seen: info.last_seen, + avg_ping: None, + max_ping: None, + med_ping: None, + }); + } + } + self.public_peer_list.store(Arc::new(pub_peer_list)); + } + async fn ping(self: Arc, id: NodeID) { let peer_list_hash = self.known_hosts.read().unwrap().hash; let ping_id = self.next_ping_id.fetch_add(1u64, atomic::Ordering::Relaxed); @@ -268,6 +332,7 @@ impl FullMeshPeeringStrategy { while host.ping.len() > 10 { host.ping.pop_front(); } + self.update_public_peer_list(&known_hosts); } } if ping_resp.peer_list_hash != peer_list_hash { @@ -299,6 +364,7 @@ impl FullMeshPeeringStrategy { known_hosts.list.insert(*id, self.new_peer(id, *addr)); } } + self.update_public_peer_list(&known_hosts); } async fn try_connect(self: Arc, id: NodeID, addr: SocketAddr) { @@ -317,6 +383,7 @@ impl FullMeshPeeringStrategy { } _ => PeerConnState::Waiting(0, Instant::now() + CONN_RETRY_INTERVAL), }; + self.update_public_peer_list(&known_hosts); } } } @@ -336,6 +403,7 @@ impl FullMeshPeeringStrategy { if let Some(host) = known_hosts.list.get_mut(&id) { host.state = PeerConnState::Connected; known_hosts.update_hash(); + self.update_public_peer_list(&known_hosts); } } } @@ -347,53 +415,18 @@ impl FullMeshPeeringStrategy { if let Some(host) = known_hosts.list.get_mut(&id) { host.state = PeerConnState::Waiting(0, Instant::now()); known_hosts.update_hash(); + self.update_public_peer_list(&known_hosts); } } } - pub fn get_peer_list(&self) -> Vec { - let known_hosts = self.known_hosts.read().unwrap(); - let mut ret = Vec::with_capacity(known_hosts.list.len()); - for (id, info) in known_hosts.list.iter() { - let mut pings = info.ping.iter().cloned().collect::>(); - pings.sort(); - if !pings.is_empty() { - ret.push(PeerInfoPub { - id: *id, - addr: info.addr, - state: info.state, - last_seen: info.last_seen, - avg_ping: Some( - pings - .iter() - .fold(Duration::from_secs(0), |x, y| x + *y) - .div_f64(pings.len() as f64), - ), - max_ping: pings.last().cloned(), - med_ping: Some(pings[pings.len() / 2]), - }); - } else { - ret.push(PeerInfoPub { - id: *id, - addr: info.addr, - state: info.state, - last_seen: info.last_seen, - avg_ping: None, - max_ping: None, - med_ping: None, - }); - } - } - ret - } - - fn new_peer(&self, id: &NodeID, addr: SocketAddr) -> PeerInfo { + fn new_peer(&self, id: &NodeID, addr: SocketAddr) -> PeerInfoInternal { let state = if *id == self.netapp.id { PeerConnState::Ourself } else { PeerConnState::Waiting(0, Instant::now()) }; - PeerInfo { + PeerInfoInternal { addr, state, last_seen: None, -- cgit v1.2.3