aboutsummaryrefslogtreecommitdiff
path: root/src/peering
diff options
context:
space:
mode:
Diffstat (limited to 'src/peering')
-rw-r--r--src/peering/fullmesh.rs64
1 files changed, 40 insertions, 24 deletions
diff --git a/src/peering/fullmesh.rs b/src/peering/fullmesh.rs
index 8b1c802..b579654 100644
--- a/src/peering/fullmesh.rs
+++ b/src/peering/fullmesh.rs
@@ -4,12 +4,13 @@ use std::sync::atomic::{self, AtomicU64};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
+use async_trait::async_trait;
use log::{debug, info, trace, warn};
use serde::{Deserialize, Serialize};
use sodiumoxide::crypto::hash;
-use crate::message::*;
+use crate::endpoint::*;
use crate::netapp::*;
use crate::proto::*;
use crate::NodeID;
@@ -28,7 +29,6 @@ struct PingMessage {
}
impl Message for PingMessage {
- const KIND: MessageKind = 0x42001000;
type Response = PingMessage;
}
@@ -38,7 +38,6 @@ struct PeerListMessage {
}
impl Message for PeerListMessage {
- const KIND: MessageKind = 0x42001001;
type Response = PeerListMessage;
}
@@ -124,6 +123,9 @@ pub struct FullMeshPeeringStrategy {
netapp: Arc<NetApp>,
known_hosts: RwLock<KnownHosts>,
next_ping_id: AtomicU64,
+
+ ping_endpoint: Arc<Endpoint<PingMessage, Self>>,
+ peer_list_endpoint: Arc<Endpoint<PeerListMessage, Self>>,
}
impl FullMeshPeeringStrategy {
@@ -147,27 +149,12 @@ impl FullMeshPeeringStrategy {
netapp: netapp.clone(),
known_hosts: RwLock::new(known_hosts),
next_ping_id: AtomicU64::new(42),
+ ping_endpoint: netapp.endpoint("__netapp/peering/fullmesh.rs/Ping".into()),
+ peer_list_endpoint: netapp.endpoint("__netapp/peering/fullmesh.rs/PeerList".into()),
});
- let strat2 = strat.clone();
- netapp.add_msg_handler::<PingMessage, _, _>(move |from: NodeID, ping: PingMessage| {
- let ping_resp = PingMessage {
- id: ping.id,
- peer_list_hash: strat2.known_hosts.read().unwrap().hash,
- };
- debug!("Ping from {}", hex::encode(&from));
- async move { ping_resp }
- });
-
- let strat2 = strat.clone();
- netapp.add_msg_handler::<PeerListMessage, _, _>(
- move |_from: NodeID, peer_list: PeerListMessage| {
- strat2.handle_peer_list(&peer_list.list[..]);
- let peer_list = KnownHosts::map_into_vec(&strat2.known_hosts.read().unwrap().list);
- let resp = PeerListMessage { list: peer_list };
- async move { resp }
- },
- );
+ strat.ping_endpoint.set_handler(strat.clone());
+ strat.peer_list_endpoint.set_handler(strat.clone());
let strat2 = strat.clone();
netapp.on_connected(move |id: NodeID, addr: SocketAddr, is_incoming: bool| {
@@ -262,7 +249,7 @@ impl FullMeshPeeringStrategy {
hex::encode(id),
ping_time
);
- match self.netapp.request(&id, ping_msg, PRIO_HIGH).await {
+ match self.ping_endpoint.call(&id, ping_msg, PRIO_HIGH).await {
Err(e) => warn!("Error pinging {}: {}", hex::encode(id), e),
Ok(ping_resp) => {
let resp_time = Instant::now();
@@ -291,7 +278,11 @@ impl FullMeshPeeringStrategy {
async fn exchange_peers(self: Arc<Self>, id: &NodeID) {
let peer_list = KnownHosts::map_into_vec(&self.known_hosts.read().unwrap().list);
let pex_message = PeerListMessage { list: peer_list };
- match self.netapp.request(id, pex_message, PRIO_BACKGROUND).await {
+ match self
+ .peer_list_endpoint
+ .call(id, pex_message, PRIO_BACKGROUND)
+ .await
+ {
Err(e) => warn!("Error doing peer exchange: {}", e),
Ok(resp) => {
self.handle_peer_list(&resp.list[..]);
@@ -408,3 +399,28 @@ impl FullMeshPeeringStrategy {
}
}
}
+
+#[async_trait]
+impl EndpointHandler<PingMessage> for FullMeshPeeringStrategy {
+ async fn handle(self: &Arc<Self>, ping: PingMessage, from: NodeID) -> PingMessage {
+ let ping_resp = PingMessage {
+ id: ping.id,
+ peer_list_hash: self.known_hosts.read().unwrap().hash,
+ };
+ debug!("Ping from {}", hex::encode(&from));
+ ping_resp
+ }
+}
+
+#[async_trait]
+impl EndpointHandler<PeerListMessage> for FullMeshPeeringStrategy {
+ async fn handle(
+ self: &Arc<Self>,
+ peer_list: PeerListMessage,
+ _from: NodeID,
+ ) -> PeerListMessage {
+ self.handle_peer_list(&peer_list.list[..]);
+ let peer_list = KnownHosts::map_into_vec(&self.known_hosts.read().unwrap().list);
+ PeerListMessage { list: peer_list }
+ }
+}