diff options
Diffstat (limited to 'src/peering/fullmesh.rs')
-rw-r--r-- | src/peering/fullmesh.rs | 437 |
1 files changed, 437 insertions, 0 deletions
diff --git a/src/peering/fullmesh.rs b/src/peering/fullmesh.rs new file mode 100644 index 0000000..e04beb6 --- /dev/null +++ b/src/peering/fullmesh.rs @@ -0,0 +1,437 @@ +use std::collections::{HashMap, VecDeque}; +use std::net::SocketAddr; +use std::sync::atomic::{self, AtomicU64}; +use std::sync::{Arc, RwLock}; +use std::time::{Duration, Instant}; + +use log::{debug, info, trace, warn}; +use serde::{Deserialize, Serialize}; + +use sodiumoxide::crypto::hash; +use sodiumoxide::crypto::sign::ed25519; + +use crate::conn::*; +use crate::message::*; +use crate::netapp::*; +use crate::proto::*; + +const CONN_RETRY_INTERVAL: Duration = Duration::from_secs(30); +const CONN_MAX_RETRIES: usize = 10; +const PING_INTERVAL: Duration = Duration::from_secs(10); +const LOOP_DELAY: Duration = Duration::from_secs(1); + +// -- Protocol messages -- + +#[derive(Serialize, Deserialize)] +struct PingMessage { + pub id: u64, + pub peer_list_hash: hash::Digest, +} + +impl Message for PingMessage { + const KIND: MessageKind = 0x42001000; + type Response = PingMessage; +} + +#[derive(Serialize, Deserialize)] +struct PeerListMessage { + pub list: Vec<(ed25519::PublicKey, SocketAddr)>, +} + +impl Message for PeerListMessage { + const KIND: MessageKind = 0x42001001; + type Response = PeerListMessage; +} + +// -- Algorithm data structures -- + +#[derive(Debug)] +struct PeerInfo { + addr: SocketAddr, + state: PeerConnState, + last_seen: Option<Instant>, + ping: VecDeque<Duration>, +} + +#[derive(Copy, Clone, Debug)] +pub struct PeerInfoPub { + pub id: ed25519::PublicKey, + pub addr: SocketAddr, + pub state: PeerConnState, + pub last_seen: Option<Instant>, + pub avg_ping: Option<Duration>, + pub max_ping: Option<Duration>, + pub med_ping: Option<Duration>, +} + +// PeerConnState: possible states for our tentative connections to given peer +// This module is only interested in recording connection info for outgoing +// TCP connections +#[derive(Copy, Clone, Debug, PartialEq)] +pub enum PeerConnState { + // This entry represents ourself + Ourself, + + // We currently have a connection to this peer + Connected, + + // Our next connection tentative (the nth, where n is the first value) + // will be at given Instant + Waiting(usize, Instant), + + // A connection tentative is in progress + Trying(usize), + + // We abandonned trying to connect to this peer (too many failed attempts) + Abandonned, +} + +struct KnownHosts { + list: HashMap<ed25519::PublicKey, PeerInfo>, + hash: hash::Digest, +} + +impl KnownHosts { + fn new() -> Self { + let list = HashMap::new(); + let hash = Self::calculate_hash(&list); + Self { list, hash } + } + fn update_hash(&mut self) { + self.hash = Self::calculate_hash(&self.list); + } + fn map_into_vec( + input: &HashMap<ed25519::PublicKey, PeerInfo>, + ) -> Vec<(ed25519::PublicKey, SocketAddr)> { + let mut list = Vec::with_capacity(input.len()); + for (id, peer) in input.iter() { + if peer.state == PeerConnState::Connected || peer.state == PeerConnState::Ourself { + list.push((id.clone(), peer.addr)); + } + } + list + } + fn calculate_hash(input: &HashMap<ed25519::PublicKey, PeerInfo>) -> hash::Digest { + let mut list = Self::map_into_vec(input); + list.sort(); + let mut hash_state = hash::State::new(); + for (id, addr) in list { + hash_state.update(&id[..]); + hash_state.update(&format!("{}", addr).into_bytes()[..]); + } + hash_state.finalize() + } +} + +pub struct FullMeshPeeringStrategy { + netapp: Arc<NetApp>, + known_hosts: RwLock<KnownHosts>, + next_ping_id: AtomicU64, +} + +impl FullMeshPeeringStrategy { + pub fn new( + netapp: Arc<NetApp>, + bootstrap_list: Vec<(ed25519::PublicKey, SocketAddr)>, + ) -> Arc<Self> { + let mut known_hosts = KnownHosts::new(); + for (pk, addr) in bootstrap_list { + if pk != netapp.pubkey { + known_hosts.list.insert( + pk, + PeerInfo { + addr: addr, + state: PeerConnState::Waiting(0, Instant::now()), + last_seen: None, + ping: VecDeque::new(), + }, + ); + } + } + + let strat = Arc::new(Self { + netapp: netapp.clone(), + known_hosts: RwLock::new(known_hosts), + next_ping_id: AtomicU64::new(42), + }); + + let strat2 = strat.clone(); + netapp.add_msg_handler::<PingMessage, _, _>( + move |from: ed25519::PublicKey, ping: PingMessage| { + let ping_resp = PingMessage { + id: ping.id, + peer_list_hash: strat2.known_hosts.read().unwrap().hash, + }; + async move { + debug!("Ping from {}", hex::encode(&from)); + Ok(ping_resp) + } + }, + ); + + let strat2 = strat.clone(); + netapp.add_msg_handler::<PeerListMessage, _, _>( + move |_from: ed25519::PublicKey, peer_list: PeerListMessage| { + strat2.handle_peer_list(&peer_list.list[..]); + let peer_list = KnownHosts::map_into_vec(&strat2.known_hosts.read().unwrap().list); + let resp = PeerListMessage { list: peer_list }; + async move { Ok(resp) } + }, + ); + + let strat2 = strat.clone(); + netapp.on_connected.store(Some(Arc::new(Box::new( + move |pk: ed25519::PublicKey, addr: SocketAddr, is_incoming: bool| { + let strat2 = strat2.clone(); + tokio::spawn(strat2.on_connected(pk, addr, is_incoming)); + }, + )))); + + let strat2 = strat.clone(); + netapp.on_disconnected.store(Some(Arc::new(Box::new( + move |pk: ed25519::PublicKey, is_incoming: bool| { + let strat2 = strat2.clone(); + tokio::spawn(strat2.on_disconnected(pk, is_incoming)); + }, + )))); + + strat + } + + pub async fn run(self: Arc<Self>) { + loop { + // 1. Read current state: get list of connected peers (ping them) + let known_hosts = self.known_hosts.read().unwrap(); + debug!("known_hosts: {} peers", known_hosts.list.len()); + + let mut to_ping = vec![]; + let mut to_retry = vec![]; + for (id, info) in known_hosts.list.iter() { + debug!("{}, {:?}", hex::encode(id), info); + match info.state { + PeerConnState::Connected => { + let must_ping = match info.last_seen { + None => true, + Some(t) => Instant::now() - t > PING_INTERVAL, + }; + if must_ping { + to_ping.push(id.clone()); + } + } + PeerConnState::Waiting(_, t) => { + if Instant::now() >= t { + to_retry.push(id.clone()); + } + } + _ => (), + } + } + drop(known_hosts); + + // 2. Dispatch ping to hosts + trace!("to_ping: {} peers", to_retry.len()); + for id in to_ping { + tokio::spawn(self.clone().ping(id)); + } + + // 3. Try reconnects + trace!("to_retry: {} peers", to_retry.len()); + if !to_retry.is_empty() { + let mut known_hosts = self.known_hosts.write().unwrap(); + for id in to_retry { + if let Some(h) = known_hosts.list.get_mut(&id) { + if let PeerConnState::Waiting(i, _) = h.state { + info!( + "Retrying connection to {} at {} ({})", + hex::encode(&id), + h.addr, + i + 1 + ); + h.state = PeerConnState::Trying(i); + tokio::spawn(self.clone().try_connect(id, h.addr.clone())); + } + } + } + } + + // 4. Sleep before next loop iteration + tokio::time::delay_for(LOOP_DELAY).await; + } + } + + async fn ping(self: Arc<Self>, id: ed25519::PublicKey) { + let peer = { + match self.netapp.client_conns.read().unwrap().get(&id) { + None => { + warn!("Should ping {}, but no connection", hex::encode(id)); + return; + } + Some(peer) => peer.clone(), + } + }; + + let peer_list_hash = self.known_hosts.read().unwrap().hash; + let ping_id = self.next_ping_id.fetch_add(1u64, atomic::Ordering::Relaxed); + let ping_time = Instant::now(); + let ping_msg = PingMessage { + id: ping_id, + peer_list_hash, + }; + + debug!( + "Sending ping {} to {} at {:?}", + ping_id, + hex::encode(id), + ping_time + ); + match peer.clone().request(ping_msg, prio::HIGH).await { + Err(e) => warn!("Error pinging {}: {}", hex::encode(id), e), + Ok(ping_resp) => { + let resp_time = Instant::now(); + debug!( + "Got ping response from {} at {:?}", + hex::encode(id), + resp_time + ); + { + let mut known_hosts = self.known_hosts.write().unwrap(); + if let Some(host) = known_hosts.list.get_mut(&id) { + host.last_seen = Some(resp_time); + host.ping.push_back(resp_time - ping_time); + while host.ping.len() > 10 { + host.ping.pop_front(); + } + } + } + if ping_resp.peer_list_hash != peer_list_hash { + self.exchange_peers(peer).await; + } + } + } + } + + async fn exchange_peers(self: Arc<Self>, peer: Arc<ClientConn>) { + let peer_list = KnownHosts::map_into_vec(&self.known_hosts.read().unwrap().list); + let pex_message = PeerListMessage { list: peer_list }; + match peer.request(pex_message, prio::BACKGROUND).await { + Err(e) => warn!("Error doing peer exchange: {}", e), + Ok(resp) => { + self.handle_peer_list(&resp.list[..]); + } + } + } + + fn handle_peer_list(&self, list: &[(ed25519::PublicKey, SocketAddr)]) { + let mut known_hosts = self.known_hosts.write().unwrap(); + for (id, addr) in list.iter() { + if !known_hosts.list.contains_key(id) { + known_hosts.list.insert(*id, self.new_peer(id, *addr)); + } + } + } + + async fn try_connect(self: Arc<Self>, id: ed25519::PublicKey, addr: SocketAddr) { + let conn_result = self.netapp.clone().try_connect(addr, id.clone()).await; + if let Err(e) = conn_result { + warn!("Error connecting to {}: {}", hex::encode(id), e); + let mut known_hosts = self.known_hosts.write().unwrap(); + if let Some(host) = known_hosts.list.get_mut(&id) { + host.state = match host.state { + PeerConnState::Trying(i) => { + if i >= CONN_MAX_RETRIES { + PeerConnState::Abandonned + } else { + PeerConnState::Waiting(i + 1, Instant::now() + CONN_RETRY_INTERVAL) + } + } + _ => PeerConnState::Waiting(0, Instant::now() + CONN_RETRY_INTERVAL), + }; + } + } + } + + async fn on_connected( + self: Arc<Self>, + pk: ed25519::PublicKey, + addr: SocketAddr, + is_incoming: bool, + ) { + if is_incoming { + if !self.known_hosts.read().unwrap().list.contains_key(&pk) { + self.known_hosts + .write() + .unwrap() + .list + .insert(pk, self.new_peer(&pk, addr)); + } + } else { + info!("Successfully connected to {} at {}", hex::encode(&pk), addr); + let mut known_hosts = self.known_hosts.write().unwrap(); + if let Some(host) = known_hosts.list.get_mut(&pk) { + host.state = PeerConnState::Connected; + known_hosts.update_hash(); + } + } + } + + async fn on_disconnected(self: Arc<Self>, pk: ed25519::PublicKey, is_incoming: bool) { + if !is_incoming { + info!("Connection to {} was closed", hex::encode(pk)); + let mut known_hosts = self.known_hosts.write().unwrap(); + if let Some(host) = known_hosts.list.get_mut(&pk) { + host.state = PeerConnState::Waiting(0, Instant::now()); + known_hosts.update_hash(); + } + } + } + + pub fn get_peer_list(&self) -> Vec<PeerInfoPub> { + let known_hosts = self.known_hosts.read().unwrap(); + let mut ret = Vec::with_capacity(known_hosts.list.len()); + for (id, info) in known_hosts.list.iter() { + let mut pings = info.ping.iter().cloned().collect::<Vec<_>>(); + pings.sort(); + if pings.len() > 0 { + ret.push(PeerInfoPub { + id: id.clone(), + addr: info.addr, + state: info.state, + last_seen: info.last_seen, + avg_ping: Some( + pings + .iter() + .fold(Duration::from_secs(0), |x, y| x + *y) + .div_f64(pings.len() as f64), + ), + max_ping: pings.last().cloned(), + med_ping: Some(pings[pings.len() / 2]), + }); + } else { + ret.push(PeerInfoPub { + id: id.clone(), + addr: info.addr, + state: info.state, + last_seen: info.last_seen, + avg_ping: None, + max_ping: None, + med_ping: None, + }); + } + } + ret + } + + fn new_peer(&self, id: &ed25519::PublicKey, addr: SocketAddr) -> PeerInfo { + let state = if *id == self.netapp.pubkey { + PeerConnState::Ourself + } else { + PeerConnState::Waiting(0, Instant::now()) + }; + PeerInfo { + addr, + state, + last_seen: None, + ping: VecDeque::new(), + } + } +} |