aboutsummaryrefslogtreecommitdiff
path: root/src/peering/fullmesh.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/peering/fullmesh.rs')
-rw-r--r--src/peering/fullmesh.rs143
1 files changed, 88 insertions, 55 deletions
diff --git a/src/peering/fullmesh.rs b/src/peering/fullmesh.rs
index 1162048..c521590 100644
--- a/src/peering/fullmesh.rs
+++ b/src/peering/fullmesh.rs
@@ -4,6 +4,7 @@ use std::sync::atomic::{self, AtomicU64};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
+use arc_swap::ArcSwap;
use async_trait::async_trait;
use log::{debug, info, trace, warn};
use serde::{Deserialize, Serialize};
@@ -46,7 +47,7 @@ impl Message for PeerListMessage {
// -- Algorithm data structures --
#[derive(Debug)]
-struct PeerInfo {
+struct PeerInfoInternal {
addr: SocketAddr,
state: PeerConnState,
last_seen: Option<Instant>,
@@ -54,40 +55,49 @@ struct PeerInfo {
}
#[derive(Copy, Clone, Debug)]
-pub struct PeerInfoPub {
+pub struct PeerInfo {
+ /// The node's identifier (its public key)
pub id: NodeID,
+ /// The node's network address
pub addr: SocketAddr,
+ /// The current status of our connection to this node
pub state: PeerConnState,
+ /// The last time at which the node was seen
pub last_seen: Option<Instant>,
+ /// The average ping to this node on recent observations (if at least one ping value is known)
pub avg_ping: Option<Duration>,
+ /// The maximum observed ping to this node on recent observations (if at least one
+ /// ping value is known)
pub max_ping: Option<Duration>,
+ /// The median ping to this node on recent observations (if at least one ping value
+ /// is known)
pub med_ping: Option<Duration>,
}
-// PeerConnState: possible states for our tentative connections to given peer
-// This module is only interested in recording connection info for outgoing
-// TCP connections
+/// PeerConnState: possible states for our tentative connections to given peer
+/// This structure is only interested in recording connection info for outgoing
+/// TCP connections
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum PeerConnState {
- // This entry represents ourself
+ /// This entry represents ourself (the local node)
Ourself,
- // We currently have a connection to this peer
+ /// We currently have a connection to this peer
Connected,
- // Our next connection tentative (the nth, where n is the first value)
- // will be at given Instant
+ /// Our next connection tentative (the nth, where n is the first value of the tuple)
+ /// will be at given Instant
Waiting(usize, Instant),
- // A connection tentative is in progress
+ /// A connection tentative is in progress (the nth, where n is the value stored)
Trying(usize),
- // We abandonned trying to connect to this peer (too many failed attempts)
+ /// We abandonned trying to connect to this peer (too many failed attempts)
Abandonned,
}
struct KnownHosts {
- list: HashMap<NodeID, PeerInfo>,
+ list: HashMap<NodeID, PeerInfoInternal>,
hash: hash::Digest,
}
@@ -100,7 +110,7 @@ impl KnownHosts {
fn update_hash(&mut self) {
self.hash = Self::calculate_hash(&self.list);
}
- fn map_into_vec(input: &HashMap<NodeID, PeerInfo>) -> Vec<(NodeID, SocketAddr)> {
+ fn map_into_vec(input: &HashMap<NodeID, PeerInfoInternal>) -> Vec<(NodeID, SocketAddr)> {
let mut list = Vec::with_capacity(input.len());
for (id, peer) in input.iter() {
if peer.state == PeerConnState::Connected || peer.state == PeerConnState::Ourself {
@@ -109,35 +119,43 @@ impl KnownHosts {
}
list
}
- fn calculate_hash(input: &HashMap<NodeID, PeerInfo>) -> hash::Digest {
+ fn calculate_hash(input: &HashMap<NodeID, PeerInfoInternal>) -> hash::Digest {
let mut list = Self::map_into_vec(input);
list.sort();
let mut hash_state = hash::State::new();
for (id, addr) in list {
hash_state.update(&id[..]);
- hash_state.update(&format!("{}", addr).into_bytes()[..]);
+ hash_state.update(&format!("{}\n", addr).into_bytes()[..]);
}
hash_state.finalize()
}
}
+/// A "Full Mesh" peering strategy is a peering strategy that tries
+/// to establish and maintain a direct connection with all of the
+/// known nodes in the network.
pub struct FullMeshPeeringStrategy {
netapp: Arc<NetApp>,
known_hosts: RwLock<KnownHosts>,
- next_ping_id: AtomicU64,
+ public_peer_list: ArcSwap<Vec<PeerInfo>>,
+ next_ping_id: AtomicU64,
ping_endpoint: Arc<Endpoint<PingMessage, Self>>,
peer_list_endpoint: Arc<Endpoint<PeerListMessage, Self>>,
}
impl FullMeshPeeringStrategy {
+ /// Create a new Full Mesh peering strategy.
+ /// The strategy will not be run until `.run()` is called and awaited.
+ /// Once that happens, the peering strategy will try to connect
+ /// to all of the nodes specified in the bootstrap list.
pub fn new(netapp: Arc<NetApp>, bootstrap_list: Vec<(NodeID, SocketAddr)>) -> Arc<Self> {
let mut known_hosts = KnownHosts::new();
for (id, addr) in bootstrap_list {
if id != netapp.id {
known_hosts.list.insert(
id,
- PeerInfo {
+ PeerInfoInternal {
addr,
state: PeerConnState::Waiting(0, Instant::now()),
last_seen: None,
@@ -150,6 +168,7 @@ impl FullMeshPeeringStrategy {
let strat = Arc::new(Self {
netapp: netapp.clone(),
known_hosts: RwLock::new(known_hosts),
+ public_peer_list: ArcSwap::new(Arc::new(Vec::new())),
next_ping_id: AtomicU64::new(42),
ping_endpoint: netapp.endpoint("__netapp/peering/fullmesh.rs/Ping".into()),
peer_list_endpoint: netapp.endpoint("__netapp/peering/fullmesh.rs/PeerList".into()),
@@ -173,6 +192,8 @@ impl FullMeshPeeringStrategy {
strat
}
+ /// Run the full mesh peering strategy.
+ /// This future exits when the `must_exit` watch becomes true.
pub async fn run(self: Arc<Self>, must_exit: watch::Receiver<bool>) {
while !*must_exit.borrow() {
// 1. Read current state: get list of connected peers (ping them)
@@ -229,6 +250,7 @@ impl FullMeshPeeringStrategy {
}
}
}
+ self.update_public_peer_list(&known_hosts);
}
// 4. Sleep before next loop iteration
@@ -236,6 +258,48 @@ impl FullMeshPeeringStrategy {
}
}
+ /// Returns a list of currently known peers in the network.
+ pub fn get_peer_list(&self) -> Arc<Vec<PeerInfo>> {
+ self.public_peer_list.load_full()
+ }
+
+ // -- internal stuff --
+
+ fn update_public_peer_list(&self, known_hosts: &KnownHosts) {
+ let mut pub_peer_list = Vec::with_capacity(known_hosts.list.len());
+ for (id, info) in known_hosts.list.iter() {
+ let mut pings = info.ping.iter().cloned().collect::<Vec<_>>();
+ pings.sort();
+ if !pings.is_empty() {
+ pub_peer_list.push(PeerInfo {
+ id: *id,
+ addr: info.addr,
+ state: info.state,
+ last_seen: info.last_seen,
+ avg_ping: Some(
+ pings
+ .iter()
+ .fold(Duration::from_secs(0), |x, y| x + *y)
+ .div_f64(pings.len() as f64),
+ ),
+ max_ping: pings.last().cloned(),
+ med_ping: Some(pings[pings.len() / 2]),
+ });
+ } else {
+ pub_peer_list.push(PeerInfo {
+ id: *id,
+ addr: info.addr,
+ state: info.state,
+ last_seen: info.last_seen,
+ avg_ping: None,
+ max_ping: None,
+ med_ping: None,
+ });
+ }
+ }
+ self.public_peer_list.store(Arc::new(pub_peer_list));
+ }
+
async fn ping(self: Arc<Self>, id: NodeID) {
let peer_list_hash = self.known_hosts.read().unwrap().hash;
let ping_id = self.next_ping_id.fetch_add(1u64, atomic::Ordering::Relaxed);
@@ -268,6 +332,7 @@ impl FullMeshPeeringStrategy {
while host.ping.len() > 10 {
host.ping.pop_front();
}
+ self.update_public_peer_list(&known_hosts);
}
}
if ping_resp.peer_list_hash != peer_list_hash {
@@ -299,6 +364,7 @@ impl FullMeshPeeringStrategy {
known_hosts.list.insert(*id, self.new_peer(id, *addr));
}
}
+ self.update_public_peer_list(&known_hosts);
}
async fn try_connect(self: Arc<Self>, id: NodeID, addr: SocketAddr) {
@@ -317,6 +383,7 @@ impl FullMeshPeeringStrategy {
}
_ => PeerConnState::Waiting(0, Instant::now() + CONN_RETRY_INTERVAL),
};
+ self.update_public_peer_list(&known_hosts);
}
}
}
@@ -336,6 +403,7 @@ impl FullMeshPeeringStrategy {
if let Some(host) = known_hosts.list.get_mut(&id) {
host.state = PeerConnState::Connected;
known_hosts.update_hash();
+ self.update_public_peer_list(&known_hosts);
}
}
}
@@ -347,53 +415,18 @@ impl FullMeshPeeringStrategy {
if let Some(host) = known_hosts.list.get_mut(&id) {
host.state = PeerConnState::Waiting(0, Instant::now());
known_hosts.update_hash();
+ self.update_public_peer_list(&known_hosts);
}
}
}
- pub fn get_peer_list(&self) -> Vec<PeerInfoPub> {
- let known_hosts = self.known_hosts.read().unwrap();
- let mut ret = Vec::with_capacity(known_hosts.list.len());
- for (id, info) in known_hosts.list.iter() {
- let mut pings = info.ping.iter().cloned().collect::<Vec<_>>();
- pings.sort();
- if !pings.is_empty() {
- ret.push(PeerInfoPub {
- id: *id,
- addr: info.addr,
- state: info.state,
- last_seen: info.last_seen,
- avg_ping: Some(
- pings
- .iter()
- .fold(Duration::from_secs(0), |x, y| x + *y)
- .div_f64(pings.len() as f64),
- ),
- max_ping: pings.last().cloned(),
- med_ping: Some(pings[pings.len() / 2]),
- });
- } else {
- ret.push(PeerInfoPub {
- id: *id,
- addr: info.addr,
- state: info.state,
- last_seen: info.last_seen,
- avg_ping: None,
- max_ping: None,
- med_ping: None,
- });
- }
- }
- ret
- }
-
- fn new_peer(&self, id: &NodeID, addr: SocketAddr) -> PeerInfo {
+ fn new_peer(&self, id: &NodeID, addr: SocketAddr) -> PeerInfoInternal {
let state = if *id == self.netapp.id {
PeerConnState::Ourself
} else {
PeerConnState::Waiting(0, Instant::now())
};
- PeerInfo {
+ PeerInfoInternal {
addr,
state,
last_seen: None,