diff options
Diffstat (limited to 'src/peering')
-rw-r--r-- | src/peering/fullmesh.rs | 64 |
1 files changed, 40 insertions, 24 deletions
diff --git a/src/peering/fullmesh.rs b/src/peering/fullmesh.rs index 8b1c802..b579654 100644 --- a/src/peering/fullmesh.rs +++ b/src/peering/fullmesh.rs @@ -4,12 +4,13 @@ use std::sync::atomic::{self, AtomicU64}; use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; +use async_trait::async_trait; use log::{debug, info, trace, warn}; use serde::{Deserialize, Serialize}; use sodiumoxide::crypto::hash; -use crate::message::*; +use crate::endpoint::*; use crate::netapp::*; use crate::proto::*; use crate::NodeID; @@ -28,7 +29,6 @@ struct PingMessage { } impl Message for PingMessage { - const KIND: MessageKind = 0x42001000; type Response = PingMessage; } @@ -38,7 +38,6 @@ struct PeerListMessage { } impl Message for PeerListMessage { - const KIND: MessageKind = 0x42001001; type Response = PeerListMessage; } @@ -124,6 +123,9 @@ pub struct FullMeshPeeringStrategy { netapp: Arc<NetApp>, known_hosts: RwLock<KnownHosts>, next_ping_id: AtomicU64, + + ping_endpoint: Arc<Endpoint<PingMessage, Self>>, + peer_list_endpoint: Arc<Endpoint<PeerListMessage, Self>>, } impl FullMeshPeeringStrategy { @@ -147,27 +149,12 @@ impl FullMeshPeeringStrategy { netapp: netapp.clone(), known_hosts: RwLock::new(known_hosts), next_ping_id: AtomicU64::new(42), + ping_endpoint: netapp.endpoint("__netapp/peering/fullmesh.rs/Ping".into()), + peer_list_endpoint: netapp.endpoint("__netapp/peering/fullmesh.rs/PeerList".into()), }); - let strat2 = strat.clone(); - netapp.add_msg_handler::<PingMessage, _, _>(move |from: NodeID, ping: PingMessage| { - let ping_resp = PingMessage { - id: ping.id, - peer_list_hash: strat2.known_hosts.read().unwrap().hash, - }; - debug!("Ping from {}", hex::encode(&from)); - async move { ping_resp } - }); - - let strat2 = strat.clone(); - netapp.add_msg_handler::<PeerListMessage, _, _>( - move |_from: NodeID, peer_list: PeerListMessage| { - strat2.handle_peer_list(&peer_list.list[..]); - let peer_list = KnownHosts::map_into_vec(&strat2.known_hosts.read().unwrap().list); - let resp = PeerListMessage { list: peer_list }; - async move { resp } - }, - ); + strat.ping_endpoint.set_handler(strat.clone()); + strat.peer_list_endpoint.set_handler(strat.clone()); let strat2 = strat.clone(); netapp.on_connected(move |id: NodeID, addr: SocketAddr, is_incoming: bool| { @@ -262,7 +249,7 @@ impl FullMeshPeeringStrategy { hex::encode(id), ping_time ); - match self.netapp.request(&id, ping_msg, PRIO_HIGH).await { + match self.ping_endpoint.call(&id, ping_msg, PRIO_HIGH).await { Err(e) => warn!("Error pinging {}: {}", hex::encode(id), e), Ok(ping_resp) => { let resp_time = Instant::now(); @@ -291,7 +278,11 @@ impl FullMeshPeeringStrategy { async fn exchange_peers(self: Arc<Self>, id: &NodeID) { let peer_list = KnownHosts::map_into_vec(&self.known_hosts.read().unwrap().list); let pex_message = PeerListMessage { list: peer_list }; - match self.netapp.request(id, pex_message, PRIO_BACKGROUND).await { + match self + .peer_list_endpoint + .call(id, pex_message, PRIO_BACKGROUND) + .await + { Err(e) => warn!("Error doing peer exchange: {}", e), Ok(resp) => { self.handle_peer_list(&resp.list[..]); @@ -408,3 +399,28 @@ impl FullMeshPeeringStrategy { } } } + +#[async_trait] +impl EndpointHandler<PingMessage> for FullMeshPeeringStrategy { + async fn handle(self: &Arc<Self>, ping: PingMessage, from: NodeID) -> PingMessage { + let ping_resp = PingMessage { + id: ping.id, + peer_list_hash: self.known_hosts.read().unwrap().hash, + }; + debug!("Ping from {}", hex::encode(&from)); + ping_resp + } +} + +#[async_trait] +impl EndpointHandler<PeerListMessage> for FullMeshPeeringStrategy { + async fn handle( + self: &Arc<Self>, + peer_list: PeerListMessage, + _from: NodeID, + ) -> PeerListMessage { + self.handle_peer_list(&peer_list.list[..]); + let peer_list = KnownHosts::map_into_vec(&self.known_hosts.read().unwrap().list); + PeerListMessage { list: peer_list } + } +} |