aboutsummaryrefslogtreecommitdiff
path: root/src/peering
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2020-12-14 11:22:29 +0100
committerAlex Auvolat <alex@adnab.me>2020-12-14 11:22:29 +0100
commite761f2f6efe6ce3fd6080eac85de511a3073959b (patch)
treea658976126b132c9c8361ee135b12bbe41346745 /src/peering
parent3aa46dfe9599105918420a9fa807511e01f483c5 (diff)
parent6742638c81f8bda2ba3e3ab72ec520ab22b314e2 (diff)
downloadnetapp-e761f2f6efe6ce3fd6080eac85de511a3073959b.tar.gz
netapp-e761f2f6efe6ce3fd6080eac85de511a3073959b.zip
Merge branch 'master' of git.deuxfleurs.fr:lx/netapp
Diffstat (limited to 'src/peering')
-rw-r--r--src/peering/fullmesh.rs92
1 files changed, 39 insertions, 53 deletions
diff --git a/src/peering/fullmesh.rs b/src/peering/fullmesh.rs
index d6ca08a..a4b9248 100644
--- a/src/peering/fullmesh.rs
+++ b/src/peering/fullmesh.rs
@@ -8,11 +8,11 @@ use log::{debug, info, trace, warn};
use serde::{Deserialize, Serialize};
use sodiumoxide::crypto::hash;
-use sodiumoxide::crypto::sign::ed25519;
use crate::message::*;
use crate::netapp::*;
use crate::proto::*;
+use crate::NodeID;
const CONN_RETRY_INTERVAL: Duration = Duration::from_secs(30);
const CONN_MAX_RETRIES: usize = 10;
@@ -34,7 +34,7 @@ impl Message for PingMessage {
#[derive(Serialize, Deserialize)]
struct PeerListMessage {
- pub list: Vec<(ed25519::PublicKey, SocketAddr)>,
+ pub list: Vec<(NodeID, SocketAddr)>,
}
impl Message for PeerListMessage {
@@ -54,7 +54,7 @@ struct PeerInfo {
#[derive(Copy, Clone, Debug)]
pub struct PeerInfoPub {
- pub id: ed25519::PublicKey,
+ pub id: NodeID,
pub addr: SocketAddr,
pub state: PeerConnState,
pub last_seen: Option<Instant>,
@@ -86,7 +86,7 @@ pub enum PeerConnState {
}
struct KnownHosts {
- list: HashMap<ed25519::PublicKey, PeerInfo>,
+ list: HashMap<NodeID, PeerInfo>,
hash: hash::Digest,
}
@@ -99,9 +99,7 @@ impl KnownHosts {
fn update_hash(&mut self) {
self.hash = Self::calculate_hash(&self.list);
}
- fn map_into_vec(
- input: &HashMap<ed25519::PublicKey, PeerInfo>,
- ) -> Vec<(ed25519::PublicKey, SocketAddr)> {
+ fn map_into_vec(input: &HashMap<NodeID, PeerInfo>) -> Vec<(NodeID, SocketAddr)> {
let mut list = Vec::with_capacity(input.len());
for (id, peer) in input.iter() {
if peer.state == PeerConnState::Connected || peer.state == PeerConnState::Ourself {
@@ -110,7 +108,7 @@ impl KnownHosts {
}
list
}
- fn calculate_hash(input: &HashMap<ed25519::PublicKey, PeerInfo>) -> hash::Digest {
+ fn calculate_hash(input: &HashMap<NodeID, PeerInfo>) -> hash::Digest {
let mut list = Self::map_into_vec(input);
list.sort();
let mut hash_state = hash::State::new();
@@ -129,15 +127,12 @@ pub struct FullMeshPeeringStrategy {
}
impl FullMeshPeeringStrategy {
- pub fn new(
- netapp: Arc<NetApp>,
- bootstrap_list: Vec<(ed25519::PublicKey, SocketAddr)>,
- ) -> Arc<Self> {
+ pub fn new(netapp: Arc<NetApp>, bootstrap_list: Vec<(NodeID, SocketAddr)>) -> Arc<Self> {
let mut known_hosts = KnownHosts::new();
- for (pk, addr) in bootstrap_list {
- if pk != netapp.pubkey {
+ for (id, addr) in bootstrap_list {
+ if id != netapp.id {
known_hosts.list.insert(
- pk,
+ id,
PeerInfo {
addr: addr,
state: PeerConnState::Waiting(0, Instant::now()),
@@ -155,20 +150,18 @@ impl FullMeshPeeringStrategy {
});
let strat2 = strat.clone();
- netapp.add_msg_handler::<PingMessage, _, _>(
- move |from: ed25519::PublicKey, ping: PingMessage| {
- let ping_resp = PingMessage {
- id: ping.id,
- peer_list_hash: strat2.known_hosts.read().unwrap().hash,
- };
- debug!("Ping from {}", hex::encode(&from));
- async move { ping_resp }
- },
- );
+ netapp.add_msg_handler::<PingMessage, _, _>(move |from: NodeID, ping: PingMessage| {
+ let ping_resp = PingMessage {
+ id: ping.id,
+ peer_list_hash: strat2.known_hosts.read().unwrap().hash,
+ };
+ debug!("Ping from {}", hex::encode(&from));
+ async move { ping_resp }
+ });
let strat2 = strat.clone();
netapp.add_msg_handler::<PeerListMessage, _, _>(
- move |_from: ed25519::PublicKey, peer_list: PeerListMessage| {
+ move |_from: NodeID, peer_list: PeerListMessage| {
strat2.handle_peer_list(&peer_list.list[..]);
let peer_list = KnownHosts::map_into_vec(&strat2.known_hosts.read().unwrap().list);
let resp = PeerListMessage { list: peer_list };
@@ -177,17 +170,15 @@ impl FullMeshPeeringStrategy {
);
let strat2 = strat.clone();
- netapp.on_connected(
- move |pk: ed25519::PublicKey, addr: SocketAddr, is_incoming: bool| {
- let strat2 = strat2.clone();
- tokio::spawn(strat2.on_connected(pk, addr, is_incoming));
- },
- );
+ netapp.on_connected(move |id: NodeID, addr: SocketAddr, is_incoming: bool| {
+ let strat2 = strat2.clone();
+ tokio::spawn(strat2.on_connected(id, addr, is_incoming));
+ });
let strat2 = strat.clone();
- netapp.on_disconnected(move |pk: ed25519::PublicKey, is_incoming: bool| {
+ netapp.on_disconnected(move |id: NodeID, is_incoming: bool| {
let strat2 = strat2.clone();
- tokio::spawn(strat2.on_disconnected(pk, is_incoming));
+ tokio::spawn(strat2.on_disconnected(id, is_incoming));
});
strat
@@ -254,7 +245,7 @@ impl FullMeshPeeringStrategy {
}
}
- async fn ping(self: Arc<Self>, id: ed25519::PublicKey) {
+ async fn ping(self: Arc<Self>, id: NodeID) {
let peer_list_hash = self.known_hosts.read().unwrap().hash;
let ping_id = self.next_ping_id.fetch_add(1u64, atomic::Ordering::Relaxed);
let ping_time = Instant::now();
@@ -295,7 +286,7 @@ impl FullMeshPeeringStrategy {
}
}
- async fn exchange_peers(self: Arc<Self>, id: &ed25519::PublicKey) {
+ async fn exchange_peers(self: Arc<Self>, id: &NodeID) {
let peer_list = KnownHosts::map_into_vec(&self.known_hosts.read().unwrap().list);
let pex_message = PeerListMessage { list: peer_list };
match self.netapp.request(id, pex_message, PRIO_BACKGROUND).await {
@@ -306,7 +297,7 @@ impl FullMeshPeeringStrategy {
}
}
- fn handle_peer_list(&self, list: &[(ed25519::PublicKey, SocketAddr)]) {
+ fn handle_peer_list(&self, list: &[(NodeID, SocketAddr)]) {
let mut known_hosts = self.known_hosts.write().unwrap();
for (id, addr) in list.iter() {
if !known_hosts.list.contains_key(id) {
@@ -315,7 +306,7 @@ impl FullMeshPeeringStrategy {
}
}
- async fn try_connect(self: Arc<Self>, id: ed25519::PublicKey, addr: SocketAddr) {
+ async fn try_connect(self: Arc<Self>, id: NodeID, addr: SocketAddr) {
let conn_result = self.netapp.clone().try_connect(addr, id.clone()).await;
if let Err(e) = conn_result {
warn!("Error connecting to {}: {}", hex::encode(id), e);
@@ -335,35 +326,30 @@ impl FullMeshPeeringStrategy {
}
}
- async fn on_connected(
- self: Arc<Self>,
- pk: ed25519::PublicKey,
- addr: SocketAddr,
- is_incoming: bool,
- ) {
+ async fn on_connected(self: Arc<Self>, id: NodeID, addr: SocketAddr, is_incoming: bool) {
if is_incoming {
- if !self.known_hosts.read().unwrap().list.contains_key(&pk) {
+ if !self.known_hosts.read().unwrap().list.contains_key(&id) {
self.known_hosts
.write()
.unwrap()
.list
- .insert(pk, self.new_peer(&pk, addr));
+ .insert(id, self.new_peer(&id, addr));
}
} else {
- info!("Successfully connected to {} at {}", hex::encode(&pk), addr);
+ info!("Successfully connected to {} at {}", hex::encode(&id), addr);
let mut known_hosts = self.known_hosts.write().unwrap();
- if let Some(host) = known_hosts.list.get_mut(&pk) {
+ if let Some(host) = known_hosts.list.get_mut(&id) {
host.state = PeerConnState::Connected;
known_hosts.update_hash();
}
}
}
- async fn on_disconnected(self: Arc<Self>, pk: ed25519::PublicKey, is_incoming: bool) {
+ async fn on_disconnected(self: Arc<Self>, id: NodeID, is_incoming: bool) {
if !is_incoming {
- info!("Connection to {} was closed", hex::encode(pk));
+ info!("Connection to {} was closed", hex::encode(id));
let mut known_hosts = self.known_hosts.write().unwrap();
- if let Some(host) = known_hosts.list.get_mut(&pk) {
+ if let Some(host) = known_hosts.list.get_mut(&id) {
host.state = PeerConnState::Waiting(0, Instant::now());
known_hosts.update_hash();
}
@@ -406,8 +392,8 @@ impl FullMeshPeeringStrategy {
ret
}
- fn new_peer(&self, id: &ed25519::PublicKey, addr: SocketAddr) -> PeerInfo {
- let state = if *id == self.netapp.pubkey {
+ fn new_peer(&self, id: &NodeID, addr: SocketAddr) -> PeerInfo {
+ let state = if *id == self.netapp.id {
PeerConnState::Ourself
} else {
PeerConnState::Waiting(0, Instant::now())