diff options
-rw-r--r-- | src/endpoint.rs | 39 | ||||
-rw-r--r-- | src/netapp.rs | 17 | ||||
-rw-r--r-- | src/peering/fullmesh.rs | 143 | ||||
-rw-r--r-- | src/util.rs | 4 |
4 files changed, 141 insertions, 62 deletions
diff --git a/src/endpoint.rs b/src/endpoint.rs index 0e1f5c8..adb3532 100644 --- a/src/endpoint.rs +++ b/src/endpoint.rs @@ -12,13 +12,15 @@ use crate::proto::*; use crate::util::*; /// This trait should be implemented by all messages your application -/// wants to handle (click to read more). +/// wants to handle pub trait Message: Serialize + for<'de> Deserialize<'de> + Send + Sync { type Response: Serialize + for<'de> Deserialize<'de> + Send + Sync; } -pub(crate) type DynEndpoint = Box<dyn GenericEndpoint + Send + Sync>; - +/// This trait should be implemented by an object of your application +/// that can handle a message of type `M`. +/// +/// The handler object should be in an Arc, see `Endpoint::set_handler` #[async_trait] pub trait EndpointHandler<M>: Send + Sync where @@ -27,6 +29,27 @@ where async fn handle(self: &Arc<Self>, m: M, from: NodeID) -> M::Response; } +/// If one simply wants to use an endpoint in a client fashion, +/// without locally serving requests to that endpoint, +/// use the unit type `()` as the handler type: +/// it will panic if it is ever made to handle request. +#[async_trait] +impl<M: Message + 'static> EndpointHandler<M> for () { + async fn handle(self: &Arc<()>, _m: M, _from: NodeID) -> M::Response { + panic!("This endpoint should not have a local handler."); + } +} + +/// This struct represents an endpoint for message of type `M`. +/// +/// Creating a new endpoint is done by calling `NetApp::endpoint`. +/// An endpoint is identified primarily by its path, which is specified +/// at creation time. +/// +/// An `Endpoint` is used both to send requests to remote nodes, +/// and to specify the handler for such requests on the local node. +/// The type `H` represents the type of the handler object for +/// endpoint messages (see `EndpointHandler`). pub struct Endpoint<M, H> where M: Message, @@ -51,9 +74,15 @@ where handler: ArcSwapOption::from(None), } } + + /// Set the object that is responsible of handling requests to + /// this endpoint on the local node. pub fn set_handler(&self, h: Arc<H>) { self.handler.swap(Some(h)); } + + /// Call this endpoint on a remote node (or on the local node, + /// for that matter) pub async fn call( &self, target: &NodeID, @@ -84,6 +113,10 @@ where } } +// ---- Internal stuff ---- + +pub(crate) type DynEndpoint = Box<dyn GenericEndpoint + Send + Sync>; + #[async_trait] pub(crate) trait GenericEndpoint { async fn handle(&self, buf: &[u8], from: NodeID) -> Result<Vec<u8>, Error>; diff --git a/src/netapp.rs b/src/netapp.rs index bffa0e1..7fe1e71 100644 --- a/src/netapp.rs +++ b/src/netapp.rs @@ -125,21 +125,30 @@ impl NetApp { .store(Some(Arc::new(Box::new(handler)))); } - pub fn endpoint<M, H>(self: &Arc<Self>, name: String) -> Arc<Endpoint<M, H>> + /// Create a new endpoint with path `path`, + /// that handles messages of type `M`. + /// `H` is the type of the object that should handle requests + /// to this endpoint on the local node. If you don't want + /// to handle request on the local node (e.g. if this node + /// is only a client in the network), define the type `H` + /// to be `()`. + /// This function will panic if the endpoint has already been + /// created. + pub fn endpoint<M, H>(self: &Arc<Self>, path: String) -> Arc<Endpoint<M, H>> where M: Message + 'static, H: EndpointHandler<M> + 'static, { - let endpoint = Arc::new(Endpoint::<M, H>::new(self.clone(), name.clone())); + let endpoint = Arc::new(Endpoint::<M, H>::new(self.clone(), path.clone())); let endpoint_arc = EndpointArc(endpoint.clone()); if self .endpoints .write() .unwrap() - .insert(name.clone(), Box::new(endpoint_arc)) + .insert(path.clone(), Box::new(endpoint_arc)) .is_some() { - panic!("Redefining endpoint: {}", name); + panic!("Redefining endpoint: {}", path); }; endpoint } diff --git a/src/peering/fullmesh.rs b/src/peering/fullmesh.rs index 1162048..c521590 100644 --- a/src/peering/fullmesh.rs +++ b/src/peering/fullmesh.rs @@ -4,6 +4,7 @@ use std::sync::atomic::{self, AtomicU64}; use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; +use arc_swap::ArcSwap; use async_trait::async_trait; use log::{debug, info, trace, warn}; use serde::{Deserialize, Serialize}; @@ -46,7 +47,7 @@ impl Message for PeerListMessage { // -- Algorithm data structures -- #[derive(Debug)] -struct PeerInfo { +struct PeerInfoInternal { addr: SocketAddr, state: PeerConnState, last_seen: Option<Instant>, @@ -54,40 +55,49 @@ struct PeerInfo { } #[derive(Copy, Clone, Debug)] -pub struct PeerInfoPub { +pub struct PeerInfo { + /// The node's identifier (its public key) pub id: NodeID, + /// The node's network address pub addr: SocketAddr, + /// The current status of our connection to this node pub state: PeerConnState, + /// The last time at which the node was seen pub last_seen: Option<Instant>, + /// The average ping to this node on recent observations (if at least one ping value is known) pub avg_ping: Option<Duration>, + /// The maximum observed ping to this node on recent observations (if at least one + /// ping value is known) pub max_ping: Option<Duration>, + /// The median ping to this node on recent observations (if at least one ping value + /// is known) pub med_ping: Option<Duration>, } -// PeerConnState: possible states for our tentative connections to given peer -// This module is only interested in recording connection info for outgoing -// TCP connections +/// PeerConnState: possible states for our tentative connections to given peer +/// This structure is only interested in recording connection info for outgoing +/// TCP connections #[derive(Copy, Clone, Debug, PartialEq)] pub enum PeerConnState { - // This entry represents ourself + /// This entry represents ourself (the local node) Ourself, - // We currently have a connection to this peer + /// We currently have a connection to this peer Connected, - // Our next connection tentative (the nth, where n is the first value) - // will be at given Instant + /// Our next connection tentative (the nth, where n is the first value of the tuple) + /// will be at given Instant Waiting(usize, Instant), - // A connection tentative is in progress + /// A connection tentative is in progress (the nth, where n is the value stored) Trying(usize), - // We abandonned trying to connect to this peer (too many failed attempts) + /// We abandonned trying to connect to this peer (too many failed attempts) Abandonned, } struct KnownHosts { - list: HashMap<NodeID, PeerInfo>, + list: HashMap<NodeID, PeerInfoInternal>, hash: hash::Digest, } @@ -100,7 +110,7 @@ impl KnownHosts { fn update_hash(&mut self) { self.hash = Self::calculate_hash(&self.list); } - fn map_into_vec(input: &HashMap<NodeID, PeerInfo>) -> Vec<(NodeID, SocketAddr)> { + fn map_into_vec(input: &HashMap<NodeID, PeerInfoInternal>) -> Vec<(NodeID, SocketAddr)> { let mut list = Vec::with_capacity(input.len()); for (id, peer) in input.iter() { if peer.state == PeerConnState::Connected || peer.state == PeerConnState::Ourself { @@ -109,35 +119,43 @@ impl KnownHosts { } list } - fn calculate_hash(input: &HashMap<NodeID, PeerInfo>) -> hash::Digest { + fn calculate_hash(input: &HashMap<NodeID, PeerInfoInternal>) -> hash::Digest { let mut list = Self::map_into_vec(input); list.sort(); let mut hash_state = hash::State::new(); for (id, addr) in list { hash_state.update(&id[..]); - hash_state.update(&format!("{}", addr).into_bytes()[..]); + hash_state.update(&format!("{}\n", addr).into_bytes()[..]); } hash_state.finalize() } } +/// A "Full Mesh" peering strategy is a peering strategy that tries +/// to establish and maintain a direct connection with all of the +/// known nodes in the network. pub struct FullMeshPeeringStrategy { netapp: Arc<NetApp>, known_hosts: RwLock<KnownHosts>, - next_ping_id: AtomicU64, + public_peer_list: ArcSwap<Vec<PeerInfo>>, + next_ping_id: AtomicU64, ping_endpoint: Arc<Endpoint<PingMessage, Self>>, peer_list_endpoint: Arc<Endpoint<PeerListMessage, Self>>, } impl FullMeshPeeringStrategy { + /// Create a new Full Mesh peering strategy. + /// The strategy will not be run until `.run()` is called and awaited. + /// Once that happens, the peering strategy will try to connect + /// to all of the nodes specified in the bootstrap list. pub fn new(netapp: Arc<NetApp>, bootstrap_list: Vec<(NodeID, SocketAddr)>) -> Arc<Self> { let mut known_hosts = KnownHosts::new(); for (id, addr) in bootstrap_list { if id != netapp.id { known_hosts.list.insert( id, - PeerInfo { + PeerInfoInternal { addr, state: PeerConnState::Waiting(0, Instant::now()), last_seen: None, @@ -150,6 +168,7 @@ impl FullMeshPeeringStrategy { let strat = Arc::new(Self { netapp: netapp.clone(), known_hosts: RwLock::new(known_hosts), + public_peer_list: ArcSwap::new(Arc::new(Vec::new())), next_ping_id: AtomicU64::new(42), ping_endpoint: netapp.endpoint("__netapp/peering/fullmesh.rs/Ping".into()), peer_list_endpoint: netapp.endpoint("__netapp/peering/fullmesh.rs/PeerList".into()), @@ -173,6 +192,8 @@ impl FullMeshPeeringStrategy { strat } + /// Run the full mesh peering strategy. + /// This future exits when the `must_exit` watch becomes true. pub async fn run(self: Arc<Self>, must_exit: watch::Receiver<bool>) { while !*must_exit.borrow() { // 1. Read current state: get list of connected peers (ping them) @@ -229,6 +250,7 @@ impl FullMeshPeeringStrategy { } } } + self.update_public_peer_list(&known_hosts); } // 4. Sleep before next loop iteration @@ -236,6 +258,48 @@ impl FullMeshPeeringStrategy { } } + /// Returns a list of currently known peers in the network. + pub fn get_peer_list(&self) -> Arc<Vec<PeerInfo>> { + self.public_peer_list.load_full() + } + + // -- internal stuff -- + + fn update_public_peer_list(&self, known_hosts: &KnownHosts) { + let mut pub_peer_list = Vec::with_capacity(known_hosts.list.len()); + for (id, info) in known_hosts.list.iter() { + let mut pings = info.ping.iter().cloned().collect::<Vec<_>>(); + pings.sort(); + if !pings.is_empty() { + pub_peer_list.push(PeerInfo { + id: *id, + addr: info.addr, + state: info.state, + last_seen: info.last_seen, + avg_ping: Some( + pings + .iter() + .fold(Duration::from_secs(0), |x, y| x + *y) + .div_f64(pings.len() as f64), + ), + max_ping: pings.last().cloned(), + med_ping: Some(pings[pings.len() / 2]), + }); + } else { + pub_peer_list.push(PeerInfo { + id: *id, + addr: info.addr, + state: info.state, + last_seen: info.last_seen, + avg_ping: None, + max_ping: None, + med_ping: None, + }); + } + } + self.public_peer_list.store(Arc::new(pub_peer_list)); + } + async fn ping(self: Arc<Self>, id: NodeID) { let peer_list_hash = self.known_hosts.read().unwrap().hash; let ping_id = self.next_ping_id.fetch_add(1u64, atomic::Ordering::Relaxed); @@ -268,6 +332,7 @@ impl FullMeshPeeringStrategy { while host.ping.len() > 10 { host.ping.pop_front(); } + self.update_public_peer_list(&known_hosts); } } if ping_resp.peer_list_hash != peer_list_hash { @@ -299,6 +364,7 @@ impl FullMeshPeeringStrategy { known_hosts.list.insert(*id, self.new_peer(id, *addr)); } } + self.update_public_peer_list(&known_hosts); } async fn try_connect(self: Arc<Self>, id: NodeID, addr: SocketAddr) { @@ -317,6 +383,7 @@ impl FullMeshPeeringStrategy { } _ => PeerConnState::Waiting(0, Instant::now() + CONN_RETRY_INTERVAL), }; + self.update_public_peer_list(&known_hosts); } } } @@ -336,6 +403,7 @@ impl FullMeshPeeringStrategy { if let Some(host) = known_hosts.list.get_mut(&id) { host.state = PeerConnState::Connected; known_hosts.update_hash(); + self.update_public_peer_list(&known_hosts); } } } @@ -347,53 +415,18 @@ impl FullMeshPeeringStrategy { if let Some(host) = known_hosts.list.get_mut(&id) { host.state = PeerConnState::Waiting(0, Instant::now()); known_hosts.update_hash(); + self.update_public_peer_list(&known_hosts); } } } - pub fn get_peer_list(&self) -> Vec<PeerInfoPub> { - let known_hosts = self.known_hosts.read().unwrap(); - let mut ret = Vec::with_capacity(known_hosts.list.len()); - for (id, info) in known_hosts.list.iter() { - let mut pings = info.ping.iter().cloned().collect::<Vec<_>>(); - pings.sort(); - if !pings.is_empty() { - ret.push(PeerInfoPub { - id: *id, - addr: info.addr, - state: info.state, - last_seen: info.last_seen, - avg_ping: Some( - pings - .iter() - .fold(Duration::from_secs(0), |x, y| x + *y) - .div_f64(pings.len() as f64), - ), - max_ping: pings.last().cloned(), - med_ping: Some(pings[pings.len() / 2]), - }); - } else { - ret.push(PeerInfoPub { - id: *id, - addr: info.addr, - state: info.state, - last_seen: info.last_seen, - avg_ping: None, - max_ping: None, - med_ping: None, - }); - } - } - ret - } - - fn new_peer(&self, id: &NodeID, addr: SocketAddr) -> PeerInfo { + fn new_peer(&self, id: &NodeID, addr: SocketAddr) -> PeerInfoInternal { let state = if *id == self.netapp.id { PeerConnState::Ourself } else { PeerConnState::Waiting(0, Instant::now()) }; - PeerInfo { + PeerInfoInternal { addr, state, last_seen: None, diff --git a/src/util.rs b/src/util.rs index a87f779..bf8f4cb 100644 --- a/src/util.rs +++ b/src/util.rs @@ -4,6 +4,7 @@ use log::info; use tokio::sync::watch; +/// A node's identifier, which is also its public cryptographic key pub type NodeID = sodiumoxide::crypto::sign::ed25519::PublicKey; /// Utility function: encodes any serializable value in MessagePack binary format @@ -25,6 +26,7 @@ where /// This async function returns only when a true signal was received /// from a watcher that tells us when to exit. +/// /// Usefull in a select statement to interrupt another /// future: /// ```ignore @@ -41,6 +43,8 @@ pub async fn await_exit(mut must_exit: watch::Receiver<bool>) { } } +/// Creates a watch that contains `false`, and that changes +/// to `true` when a Ctrl+C signal is received. pub fn watch_ctrl_c() -> watch::Receiver<bool> { let (send_cancel, watch_cancel) = watch::channel(false); tokio::spawn(async move { |