From 6742638c81f8bda2ba3e3ab72ec520ab22b314e2 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Sat, 12 Dec 2020 21:14:15 +0100 Subject: Replace pk,pubkey,PublicKey by id,NodeID,etc --- src/peering/basalt.rs | 60 +++++++++++++++----------------- src/peering/fullmesh.rs | 92 +++++++++++++++++++++---------------------------- 2 files changed, 66 insertions(+), 86 deletions(-) (limited to 'src/peering') diff --git a/src/peering/basalt.rs b/src/peering/basalt.rs index 4d56319..3c1fc9e 100644 --- a/src/peering/basalt.rs +++ b/src/peering/basalt.rs @@ -9,11 +9,11 @@ use rand::{thread_rng, Rng}; use serde::{Deserialize, Serialize}; use sodiumoxide::crypto::hash; -use sodiumoxide::crypto::sign::ed25519; use crate::message::*; use crate::netapp::*; use crate::proto::*; +use crate::NodeID; // -- Protocol messages -- @@ -41,7 +41,7 @@ type Seed = [u8; 32]; #[derive(Hash, Clone, Copy, Debug, PartialOrd, PartialEq, Eq, Serialize, Deserialize)] struct Peer { - id: ed25519::PublicKey, + id: NodeID, addr: SocketAddr, } @@ -168,7 +168,7 @@ impl BasaltView { } } - fn disconnected(&mut self, id: ed25519::PublicKey) { + fn disconnected(&mut self, id: NodeID) { let mut cleared_slots = vec![]; for i in 0..self.slots.len() { if let Some(p) = self.slots[i].peer { @@ -248,7 +248,7 @@ pub struct Basalt { impl Basalt { pub fn new( netapp: Arc, - bootstrap_list: Vec<(ed25519::PublicKey, SocketAddr)>, + bootstrap_list: Vec<(NodeID, SocketAddr)>, param: BasaltParams, ) -> Arc { let bootstrap_peers = bootstrap_list @@ -272,37 +272,31 @@ impl Basalt { }); let basalt2 = basalt.clone(); - netapp.on_connected( - move |pk: ed25519::PublicKey, addr: SocketAddr, is_incoming: bool| { - basalt2.on_connected(pk, addr, is_incoming); - }, - ); + netapp.on_connected(move |id: NodeID, addr: SocketAddr, is_incoming: bool| { + basalt2.on_connected(id, addr, is_incoming); + }); let basalt2 = basalt.clone(); - netapp.on_disconnected(move |pk: ed25519::PublicKey, is_incoming: bool| { - basalt2.on_disconnected(pk, is_incoming); + netapp.on_disconnected(move |id: NodeID, is_incoming: bool| { + basalt2.on_disconnected(id, is_incoming); }); let basalt2 = basalt.clone(); - netapp.add_msg_handler::( - move |_from: ed25519::PublicKey, _pullmsg: PullMessage| { - let push_msg = basalt2.make_push_message(); - async move { push_msg } - }, - ); + netapp.add_msg_handler::(move |_from: NodeID, _pullmsg: PullMessage| { + let push_msg = basalt2.make_push_message(); + async move { push_msg } + }); let basalt2 = basalt.clone(); - netapp.add_msg_handler::( - move |_from: ed25519::PublicKey, push_msg: PushMessage| { - basalt2.handle_peer_list(&push_msg.peers[..]); - async move { () } - }, - ); + netapp.add_msg_handler::(move |_from: NodeID, push_msg: PushMessage| { + basalt2.handle_peer_list(&push_msg.peers[..]); + async move { () } + }); basalt } - pub fn sample(&self, count: usize) -> Vec { + pub fn sample(&self, count: usize) -> Vec { self.view .read() .unwrap() @@ -337,7 +331,7 @@ impl Basalt { } } - async fn do_pull(self: Arc, peer: ed25519::PublicKey) { + async fn do_pull(self: Arc, peer: NodeID) { match self .netapp .request(&peer, PullMessage {}, PRIO_NORMAL) @@ -353,7 +347,7 @@ impl Basalt { }; } - async fn do_push(self: Arc, peer: ed25519::PublicKey) { + async fn do_push(self: Arc, peer: NodeID) { let push_msg = self.make_push_message(); match self.netapp.request(&peer, push_msg, PRIO_NORMAL).await { Ok(_) => { @@ -434,12 +428,12 @@ impl Basalt { } } - fn on_connected(self: &Arc, pk: ed25519::PublicKey, addr: SocketAddr, is_incoming: bool) { + fn on_connected(self: &Arc, id: NodeID, addr: SocketAddr, is_incoming: bool) { if is_incoming { - self.handle_peer_list(&[Peer { id: pk, addr }][..]); + self.handle_peer_list(&[Peer { id, addr }][..]); } else { - info!("KYEV C {} {}", hex::encode(pk), addr); - let peer = Peer { id: pk, addr }; + info!("KYEV C {} {}", hex::encode(id), addr); + let peer = Peer { id, addr }; let mut backlog = self.backlog.write().unwrap(); if backlog.get(&peer).is_none() { @@ -459,10 +453,10 @@ impl Basalt { } } - fn on_disconnected(&self, pk: ed25519::PublicKey, is_incoming: bool) { + fn on_disconnected(&self, id: NodeID, is_incoming: bool) { if !is_incoming { - info!("KYEV D {}", hex::encode(pk)); - self.view.write().unwrap().disconnected(pk); + info!("KYEV D {}", hex::encode(id)); + self.view.write().unwrap().disconnected(id); } } diff --git a/src/peering/fullmesh.rs b/src/peering/fullmesh.rs index d6ca08a..a4b9248 100644 --- a/src/peering/fullmesh.rs +++ b/src/peering/fullmesh.rs @@ -8,11 +8,11 @@ use log::{debug, info, trace, warn}; use serde::{Deserialize, Serialize}; use sodiumoxide::crypto::hash; -use sodiumoxide::crypto::sign::ed25519; use crate::message::*; use crate::netapp::*; use crate::proto::*; +use crate::NodeID; const CONN_RETRY_INTERVAL: Duration = Duration::from_secs(30); const CONN_MAX_RETRIES: usize = 10; @@ -34,7 +34,7 @@ impl Message for PingMessage { #[derive(Serialize, Deserialize)] struct PeerListMessage { - pub list: Vec<(ed25519::PublicKey, SocketAddr)>, + pub list: Vec<(NodeID, SocketAddr)>, } impl Message for PeerListMessage { @@ -54,7 +54,7 @@ struct PeerInfo { #[derive(Copy, Clone, Debug)] pub struct PeerInfoPub { - pub id: ed25519::PublicKey, + pub id: NodeID, pub addr: SocketAddr, pub state: PeerConnState, pub last_seen: Option, @@ -86,7 +86,7 @@ pub enum PeerConnState { } struct KnownHosts { - list: HashMap, + list: HashMap, hash: hash::Digest, } @@ -99,9 +99,7 @@ impl KnownHosts { fn update_hash(&mut self) { self.hash = Self::calculate_hash(&self.list); } - fn map_into_vec( - input: &HashMap, - ) -> Vec<(ed25519::PublicKey, SocketAddr)> { + fn map_into_vec(input: &HashMap) -> Vec<(NodeID, SocketAddr)> { let mut list = Vec::with_capacity(input.len()); for (id, peer) in input.iter() { if peer.state == PeerConnState::Connected || peer.state == PeerConnState::Ourself { @@ -110,7 +108,7 @@ impl KnownHosts { } list } - fn calculate_hash(input: &HashMap) -> hash::Digest { + fn calculate_hash(input: &HashMap) -> hash::Digest { let mut list = Self::map_into_vec(input); list.sort(); let mut hash_state = hash::State::new(); @@ -129,15 +127,12 @@ pub struct FullMeshPeeringStrategy { } impl FullMeshPeeringStrategy { - pub fn new( - netapp: Arc, - bootstrap_list: Vec<(ed25519::PublicKey, SocketAddr)>, - ) -> Arc { + pub fn new(netapp: Arc, bootstrap_list: Vec<(NodeID, SocketAddr)>) -> Arc { let mut known_hosts = KnownHosts::new(); - for (pk, addr) in bootstrap_list { - if pk != netapp.pubkey { + for (id, addr) in bootstrap_list { + if id != netapp.id { known_hosts.list.insert( - pk, + id, PeerInfo { addr: addr, state: PeerConnState::Waiting(0, Instant::now()), @@ -155,20 +150,18 @@ impl FullMeshPeeringStrategy { }); let strat2 = strat.clone(); - netapp.add_msg_handler::( - move |from: ed25519::PublicKey, ping: PingMessage| { - let ping_resp = PingMessage { - id: ping.id, - peer_list_hash: strat2.known_hosts.read().unwrap().hash, - }; - debug!("Ping from {}", hex::encode(&from)); - async move { ping_resp } - }, - ); + netapp.add_msg_handler::(move |from: NodeID, ping: PingMessage| { + let ping_resp = PingMessage { + id: ping.id, + peer_list_hash: strat2.known_hosts.read().unwrap().hash, + }; + debug!("Ping from {}", hex::encode(&from)); + async move { ping_resp } + }); let strat2 = strat.clone(); netapp.add_msg_handler::( - move |_from: ed25519::PublicKey, peer_list: PeerListMessage| { + move |_from: NodeID, peer_list: PeerListMessage| { strat2.handle_peer_list(&peer_list.list[..]); let peer_list = KnownHosts::map_into_vec(&strat2.known_hosts.read().unwrap().list); let resp = PeerListMessage { list: peer_list }; @@ -177,17 +170,15 @@ impl FullMeshPeeringStrategy { ); let strat2 = strat.clone(); - netapp.on_connected( - move |pk: ed25519::PublicKey, addr: SocketAddr, is_incoming: bool| { - let strat2 = strat2.clone(); - tokio::spawn(strat2.on_connected(pk, addr, is_incoming)); - }, - ); + netapp.on_connected(move |id: NodeID, addr: SocketAddr, is_incoming: bool| { + let strat2 = strat2.clone(); + tokio::spawn(strat2.on_connected(id, addr, is_incoming)); + }); let strat2 = strat.clone(); - netapp.on_disconnected(move |pk: ed25519::PublicKey, is_incoming: bool| { + netapp.on_disconnected(move |id: NodeID, is_incoming: bool| { let strat2 = strat2.clone(); - tokio::spawn(strat2.on_disconnected(pk, is_incoming)); + tokio::spawn(strat2.on_disconnected(id, is_incoming)); }); strat @@ -254,7 +245,7 @@ impl FullMeshPeeringStrategy { } } - async fn ping(self: Arc, id: ed25519::PublicKey) { + async fn ping(self: Arc, id: NodeID) { let peer_list_hash = self.known_hosts.read().unwrap().hash; let ping_id = self.next_ping_id.fetch_add(1u64, atomic::Ordering::Relaxed); let ping_time = Instant::now(); @@ -295,7 +286,7 @@ impl FullMeshPeeringStrategy { } } - async fn exchange_peers(self: Arc, id: &ed25519::PublicKey) { + async fn exchange_peers(self: Arc, id: &NodeID) { let peer_list = KnownHosts::map_into_vec(&self.known_hosts.read().unwrap().list); let pex_message = PeerListMessage { list: peer_list }; match self.netapp.request(id, pex_message, PRIO_BACKGROUND).await { @@ -306,7 +297,7 @@ impl FullMeshPeeringStrategy { } } - fn handle_peer_list(&self, list: &[(ed25519::PublicKey, SocketAddr)]) { + fn handle_peer_list(&self, list: &[(NodeID, SocketAddr)]) { let mut known_hosts = self.known_hosts.write().unwrap(); for (id, addr) in list.iter() { if !known_hosts.list.contains_key(id) { @@ -315,7 +306,7 @@ impl FullMeshPeeringStrategy { } } - async fn try_connect(self: Arc, id: ed25519::PublicKey, addr: SocketAddr) { + async fn try_connect(self: Arc, id: NodeID, addr: SocketAddr) { let conn_result = self.netapp.clone().try_connect(addr, id.clone()).await; if let Err(e) = conn_result { warn!("Error connecting to {}: {}", hex::encode(id), e); @@ -335,35 +326,30 @@ impl FullMeshPeeringStrategy { } } - async fn on_connected( - self: Arc, - pk: ed25519::PublicKey, - addr: SocketAddr, - is_incoming: bool, - ) { + async fn on_connected(self: Arc, id: NodeID, addr: SocketAddr, is_incoming: bool) { if is_incoming { - if !self.known_hosts.read().unwrap().list.contains_key(&pk) { + if !self.known_hosts.read().unwrap().list.contains_key(&id) { self.known_hosts .write() .unwrap() .list - .insert(pk, self.new_peer(&pk, addr)); + .insert(id, self.new_peer(&id, addr)); } } else { - info!("Successfully connected to {} at {}", hex::encode(&pk), addr); + info!("Successfully connected to {} at {}", hex::encode(&id), addr); let mut known_hosts = self.known_hosts.write().unwrap(); - if let Some(host) = known_hosts.list.get_mut(&pk) { + if let Some(host) = known_hosts.list.get_mut(&id) { host.state = PeerConnState::Connected; known_hosts.update_hash(); } } } - async fn on_disconnected(self: Arc, pk: ed25519::PublicKey, is_incoming: bool) { + async fn on_disconnected(self: Arc, id: NodeID, is_incoming: bool) { if !is_incoming { - info!("Connection to {} was closed", hex::encode(pk)); + info!("Connection to {} was closed", hex::encode(id)); let mut known_hosts = self.known_hosts.write().unwrap(); - if let Some(host) = known_hosts.list.get_mut(&pk) { + if let Some(host) = known_hosts.list.get_mut(&id) { host.state = PeerConnState::Waiting(0, Instant::now()); known_hosts.update_hash(); } @@ -406,8 +392,8 @@ impl FullMeshPeeringStrategy { ret } - fn new_peer(&self, id: &ed25519::PublicKey, addr: SocketAddr) -> PeerInfo { - let state = if *id == self.netapp.pubkey { + fn new_peer(&self, id: &NodeID, addr: SocketAddr) -> PeerInfo { + let state = if *id == self.netapp.id { PeerConnState::Ourself } else { PeerConnState::Waiting(0, Instant::now()) -- cgit v1.2.3