aboutsummaryrefslogblamecommitdiff
path: root/src/peering/fullmesh.rs
blob: b5796541ff83f3bde0133f5b94ffaade588860b0 (plain) (tree)
1
2
3
4
5
6
7
8
9
10
11





                                          
                             



                                    
 
                       

                     
                  














                                                              




                                    
                                            


                                  














                                        
                       






























                                                                                  
                                        











                                                             
                                                                                         


                                                                                                           
                                                            



                         
                                                                              














                                                                                 


                                                                 


                              
                                                                                                 
                                                        

                                                  
                                                        
                                           
                                                  
                                                     











                                                                                                 

                                                                                                            

                   

                                                                    

                                           



                                                                                            

                                           
                                                                             
                                                    
                                                                              
                   






                                                                                         
















                                                                                                              
                                                 



                                                                                   
                                                 
                                                        
                                         
                                 

                                                   




















                                                                                                       
                                                                                                           





                                                              
                                                             


                 
                                                    













                                                                                           
                                                                               


















                                                                                                
                                                                       




                                 
                                                               

                                                                                                 




                                                               






                                                                            
                                                                   







                                                                                       
                                                                             
                                                                                  

















                                                                                                                           
                                                                                                 
                                
                                                                                     



                                                 
                                                                              

                         
                                                                                            
                                                                                
                                                                           





                                                                      
                                                                                  
                                 
                                                                              
                                                                                
                                                                           











                                                                                       
                                              
                                                      
                                                













                                                                                                    
                                                











                                                                  

                                                                       











                                                                 
























                                                                                                 
use std::collections::{HashMap, VecDeque};
use std::net::SocketAddr;
use std::sync::atomic::{self, AtomicU64};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};

use async_trait::async_trait;
use log::{debug, info, trace, warn};
use serde::{Deserialize, Serialize};

use sodiumoxide::crypto::hash;

use crate::endpoint::*;
use crate::netapp::*;
use crate::proto::*;
use crate::NodeID;

/// Delay before retrying after a failed outgoing connection attempt.
const CONN_RETRY_INTERVAL: Duration = Duration::from_secs(30);
/// When an attempt whose retry counter has already reached this value fails,
/// the peer is marked `PeerConnState::Abandonned` instead of being rescheduled.
const CONN_MAX_RETRIES: usize = 10;
/// A connected peer is pinged again once its last successful ping response is
/// older than this (or immediately if it never answered one).
const PING_INTERVAL: Duration = Duration::from_secs(10);
/// Pause between two iterations of the `run` maintenance loop.
const LOOP_DELAY: Duration = Duration::from_secs(1);

// -- Protocol messages --

/// Keep-alive probe exchanged between peers.
///
/// The sender puts its current peer-list digest in `peer_list_hash`; the
/// responder echoes `id` back together with its own digest. When the two
/// digests differ, the pinging side starts a peer-list exchange.
// NOTE(review): this struct is serialized with serde; depending on the wire
// format in use, field order may be significant — confirm before reordering.
#[derive(Serialize, Deserialize)]
struct PingMessage {
	/// Identifier of the ping; the handler copies it into its response.
	pub id: u64,
	/// Peer-list digest of the node that produced this message.
	pub peer_list_hash: hash::Digest,
}

// A ping is answered with another `PingMessage`.
impl Message for PingMessage {
	type Response = PingMessage;
}

/// Peer-exchange message carrying `(node id, address)` pairs.
///
/// The requester sends the peers it is connected to (plus itself once known);
/// the handler merges that list and replies with its own list in the same form.
// NOTE(review): serialized with serde — confirm wire-format implications
// before changing the field layout.
#[derive(Serialize, Deserialize)]
struct PeerListMessage {
	/// Advertised peers as `(node id, address)` pairs.
	pub list: Vec<(NodeID, SocketAddr)>,
}

// A peer list is answered with the responder's own peer list.
impl Message for PeerListMessage {
	type Response = PeerListMessage;
}

// -- Algorithm data structures --

/// Internal bookkeeping for one known peer.
#[derive(Debug)]
struct PeerInfo {
	/// Address used when opening an outgoing connection to this peer.
	addr: SocketAddr,
	/// State of our outgoing connection to this peer.
	state: PeerConnState,
	/// Time at which the last successful ping response was received.
	last_seen: Option<Instant>,
	/// Round-trip times of the most recent successful pings, oldest first
	/// (trimmed to at most 10 entries by `ping`).
	ping: VecDeque<Duration>,
}

/// Public snapshot of a known peer, as returned by
/// `FullMeshPeeringStrategy::get_peer_list`.
///
/// The ping statistics are computed over the recorded ping history (at most
/// the 10 most recent round-trips) and are `None` while that history is empty.
#[derive(Copy, Clone, Debug)]
pub struct PeerInfoPub {
	/// Node identifier of the peer.
	pub id: NodeID,
	/// Address recorded for the peer.
	pub addr: SocketAddr,
	/// Current state of our outgoing connection to the peer.
	pub state: PeerConnState,
	/// Time of the last successful ping response, if any.
	pub last_seen: Option<Instant>,
	/// Mean recorded round-trip time.
	pub avg_ping: Option<Duration>,
	/// Largest recorded round-trip time.
	pub max_ping: Option<Duration>,
	/// Median recorded round-trip time (the upper median for an even count).
	pub med_ping: Option<Duration>,
}

/// Possible states of our connection attempts to a given peer.
///
/// Only outgoing TCP connections are tracked: incoming connections and
/// disconnections never change a peer's state.
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum PeerConnState {
	/// This entry represents ourself.
	Ourself,

	/// We currently have an outgoing connection to this peer.
	Connected,

	/// The next connection attempt is scheduled at the given `Instant`; the
	/// counter is the number of consecutive failed attempts so far.
	Waiting(usize, Instant),

	/// A connection attempt is in progress; the counter is carried over from
	/// the preceding `Waiting` state.
	Trying(usize),

	/// We gave up trying to connect to this peer (too many failed attempts).
	/// The variant name keeps its historical spelling for API compatibility.
	Abandonned,
}

/// Table of all peers we know about, plus a digest of its shareable part.
struct KnownHosts {
	/// All known peers, keyed by node ID.
	list: HashMap<NodeID, PeerInfo>,
	/// Digest of the sorted `(id, addr)` pairs of the `Connected` and `Ourself`
	/// entries. It is exchanged in pings to detect diverging peer lists, and
	/// must be refreshed with `update_hash` after such entries change.
	hash: hash::Digest,
}

impl KnownHosts {
	/// Creates an empty peer table whose digest is that of an empty list.
	fn new() -> Self {
		let list: HashMap<NodeID, PeerInfo> = HashMap::new();
		Self {
			hash: Self::calculate_hash(&list),
			list,
		}
	}

	/// Recomputes `hash` from the current contents of `list`.
	///
	/// Call it after any change affecting which peers are `Connected` or
	/// `Ourself`, so that the digest keeps matching the table.
	fn update_hash(&mut self) {
		let digest = Self::calculate_hash(&self.list);
		self.hash = digest;
	}

	/// Returns the `(id, addr)` pairs of all peers that are `Connected` or
	/// `Ourself`, in the (unspecified) iteration order of `input`.
	fn map_into_vec(input: &HashMap<NodeID, PeerInfo>) -> Vec<(NodeID, SocketAddr)> {
		let mut shareable = Vec::with_capacity(input.len());
		shareable.extend(
			input
				.iter()
				.filter(|(_, info)| {
					matches!(info.state, PeerConnState::Connected | PeerConnState::Ourself)
				})
				.map(|(node_id, info)| (*node_id, info.addr)),
		);
		shareable
	}

	/// Digest of the shareable peer list.
	///
	/// The pairs are sorted first so that two nodes with the same peer set
	/// compute the same digest regardless of `HashMap` ordering. Each pair feeds
	/// the raw node-id bytes, then the textual form of its socket address.
	fn calculate_hash(input: &HashMap<NodeID, PeerInfo>) -> hash::Digest {
		let mut entries = Self::map_into_vec(input);
		entries.sort();
		let mut hasher = hash::State::new();
		for (node_id, sock_addr) in entries {
			hasher.update(&node_id[..]);
			hasher.update(sock_addr.to_string().as_bytes());
		}
		hasher.finalize()
	}
}

/// Peering strategy aiming at a full mesh.
///
/// It tries to keep an outgoing connection open to every known node,
/// periodically pings connected peers, and exchanges peer lists with a peer
/// whenever their peer-list digests differ.
pub struct FullMeshPeeringStrategy {
	/// Network application used for connections and RPC endpoints.
	netapp: Arc<NetApp>,
	/// Peer table, shared by the `run` loop, the connection callbacks and the
	/// RPC handlers.
	known_hosts: RwLock<KnownHosts>,
	/// Counter used to give each outgoing ping a distinct id.
	next_ping_id: AtomicU64,

	/// RPC endpoint serving and sending `PingMessage`s.
	ping_endpoint: Arc<Endpoint<PingMessage, Self>>,
	/// RPC endpoint serving and sending `PeerListMessage`s.
	peer_list_endpoint: Arc<Endpoint<PeerListMessage, Self>>,
}

impl FullMeshPeeringStrategy {
	/// Creates the strategy and wires it into `netapp`.
	///
	/// Every `bootstrap_list` entry whose ID differs from our own is recorded as
	/// a peer due for an immediate connection attempt (`Waiting(0, now)`). This
	/// constructor does not dial anyone itself: attempts are issued by
	/// [`FullMeshPeeringStrategy::run`]. The ping and peer-list endpoints are
	/// registered with the returned strategy as their handler, and
	/// connect/disconnect callbacks are installed on `netapp` so that connection
	/// events update the peer table.
	pub fn new(netapp: Arc<NetApp>, bootstrap_list: Vec<(NodeID, SocketAddr)>) -> Arc<Self> {
		let mut known_hosts = KnownHosts::new();
		for (id, addr) in bootstrap_list {
			// Never schedule a connection to ourself.
			if id != netapp.id {
				known_hosts.list.insert(
					id,
					PeerInfo {
						addr,
						state: PeerConnState::Waiting(0, Instant::now()),
						last_seen: None,
						ping: VecDeque::new(),
					},
				);
			}
		}

		let strat = Arc::new(Self {
			netapp: netapp.clone(),
			known_hosts: RwLock::new(known_hosts),
			next_ping_id: AtomicU64::new(42),
			ping_endpoint: netapp.endpoint("__netapp/peering/fullmesh.rs/Ping".into()),
			peer_list_endpoint: netapp.endpoint("__netapp/peering/fullmesh.rs/PeerList".into()),
		});

		strat.ping_endpoint.set_handler(strat.clone());
		strat.peer_list_endpoint.set_handler(strat.clone());

		// Each callback hands the bookkeeping over to a spawned task.
		let strat2 = strat.clone();
		netapp.on_connected(move |id: NodeID, addr: SocketAddr, is_incoming: bool| {
			let strat2 = strat2.clone();
			tokio::spawn(strat2.on_connected(id, addr, is_incoming));
		});

		let strat2 = strat.clone();
		netapp.on_disconnected(move |id: NodeID, is_incoming: bool| {
			let strat2 = strat2.clone();
			tokio::spawn(strat2.on_disconnected(id, is_incoming));
		});

		strat
	}

	/// Maintenance loop; never returns.
	///
	/// Every `LOOP_DELAY` it:
	/// 1. pings each `Connected` peer whose last successful ping is older than
	///    `PING_INTERVAL` (or that was never pinged successfully);
	/// 2. starts a connection attempt for each `Waiting` peer whose scheduled
	///    time has come, moving it to `Trying`.
	pub async fn run(self: Arc<Self>) {
		loop {
			// 1. Snapshot which peers need a ping and which need a reconnect.
			//    The read lock is released at the end of this block.
			let (to_ping, to_retry) = {
				let known_hosts = self.known_hosts.read().unwrap();
				debug!("known_hosts: {} peers", known_hosts.list.len());

				let now = Instant::now();
				let mut to_ping = vec![];
				let mut to_retry = vec![];
				for (id, info) in known_hosts.list.iter() {
					debug!("{}, {:?}", hex::encode(id), info);
					match info.state {
						PeerConnState::Connected => {
							let must_ping = match info.last_seen {
								None => true,
								Some(t) => now - t > PING_INTERVAL,
							};
							if must_ping {
								to_ping.push(*id);
							}
						}
						PeerConnState::Waiting(_, t) => {
							if now >= t {
								to_retry.push(*id);
							}
						}
						_ => (),
					}
				}
				(to_ping, to_retry)
			};

			// 2. Dispatch ping to hosts
			trace!("to_ping: {} peers", to_ping.len());
			for id in to_ping {
				tokio::spawn(self.clone().ping(id));
			}

			// 3. Try reconnects. The state is re-checked under the write lock
			//    because it may have changed since the snapshot was taken.
			trace!("to_retry: {} peers", to_retry.len());
			if !to_retry.is_empty() {
				let mut known_hosts = self.known_hosts.write().unwrap();
				for id in to_retry {
					if let Some(h) = known_hosts.list.get_mut(&id) {
						if let PeerConnState::Waiting(i, _) = h.state {
							info!(
								"Retrying connection to {} at {} ({})",
								hex::encode(&id),
								h.addr,
								i + 1
							);
							h.state = PeerConnState::Trying(i);
							tokio::spawn(self.clone().try_connect(id, h.addr));
						}
					}
				}
			}

			// 4. Sleep before next loop iteration
			tokio::time::sleep(LOOP_DELAY).await;
		}
	}

	/// Pings `id` once and records the round-trip time.
	///
	/// On success, the peer's `last_seen` is updated and the RTT is appended to
	/// its history, which is trimmed to the 10 most recent values. If the peer
	/// answers with a digest different from the one we sent, a peer exchange
	/// follows. Errors are only logged.
	async fn ping(self: Arc<Self>, id: NodeID) {
		let peer_list_hash = self.known_hosts.read().unwrap().hash;
		let ping_id = self.next_ping_id.fetch_add(1u64, atomic::Ordering::Relaxed);
		let ping_time = Instant::now();
		let ping_msg = PingMessage {
			id: ping_id,
			peer_list_hash,
		};

		debug!(
			"Sending ping {} to {} at {:?}",
			ping_id,
			hex::encode(id),
			ping_time
		);
		match self.ping_endpoint.call(&id, ping_msg, PRIO_HIGH).await {
			Err(e) => warn!("Error pinging {}: {}", hex::encode(id), e),
			Ok(ping_resp) => {
				let resp_time = Instant::now();
				debug!(
					"Got ping response from {} at {:?}",
					hex::encode(id),
					resp_time
				);
				{
					let mut known_hosts = self.known_hosts.write().unwrap();
					if let Some(host) = known_hosts.list.get_mut(&id) {
						host.last_seen = Some(resp_time);
						host.ping.push_back(resp_time - ping_time);
						while host.ping.len() > 10 {
							host.ping.pop_front();
						}
					}
				}
				if ping_resp.peer_list_hash != peer_list_hash {
					self.exchange_peers(&id).await;
				}
			}
		}
	}

	/// Sends our shareable peer list (connected peers, plus ourself once known)
	/// to `id` and merges the list it sends back. Errors are only logged.
	async fn exchange_peers(self: Arc<Self>, id: &NodeID) {
		let peer_list = KnownHosts::map_into_vec(&self.known_hosts.read().unwrap().list);
		let pex_message = PeerListMessage { list: peer_list };
		match self
			.peer_list_endpoint
			.call(id, pex_message, PRIO_BACKGROUND)
			.await
		{
			Err(e) => warn!("Error doing peer exchange: {}", e),
			Ok(resp) => {
				self.handle_peer_list(&resp.list[..]);
			}
		}
	}

	/// Adds every peer of `list` that is not known yet.
	///
	/// Known peers are left untouched: their state and address are not
	/// overwritten. New entries come from `new_peer`, so our own ID becomes an
	/// `Ourself` entry. Because `Ourself` entries are part of the peer-list
	/// digest, the digest is recomputed whenever something was added.
	fn handle_peer_list(&self, list: &[(NodeID, SocketAddr)]) {
		let mut known_hosts = self.known_hosts.write().unwrap();
		let mut added = false;
		for (id, addr) in list.iter() {
			if !known_hosts.list.contains_key(id) {
				known_hosts.list.insert(*id, self.new_peer(id, *addr));
				added = true;
			}
		}
		if added {
			known_hosts.update_hash();
		}
	}

	/// Attempts an outgoing connection to `id` at `addr`.
	///
	/// Success is expected to be reported through the `on_connected` callback.
	/// On failure, the peer is rescheduled `CONN_RETRY_INTERVAL` later with its
	/// counter incremented. It is marked `Abandonned` instead once the counter
	/// has reached `CONN_MAX_RETRIES`.
	async fn try_connect(self: Arc<Self>, id: NodeID, addr: SocketAddr) {
		let conn_result = self.netapp.clone().try_connect(addr, id).await;
		if let Err(e) = conn_result {
			warn!("Error connecting to {}: {}", hex::encode(id), e);
			let mut known_hosts = self.known_hosts.write().unwrap();
			if let Some(host) = known_hosts.list.get_mut(&id) {
				host.state = match host.state {
					PeerConnState::Trying(i) => {
						if i >= CONN_MAX_RETRIES {
							PeerConnState::Abandonned
						} else {
							PeerConnState::Waiting(i + 1, Instant::now() + CONN_RETRY_INTERVAL)
						}
					}
					_ => PeerConnState::Waiting(0, Instant::now() + CONN_RETRY_INTERVAL),
				};
			}
		}
	}

	/// Connection callback.
	///
	/// For an incoming connection from an unknown node, the node is added to
	/// the table so that we will also connect out to it. For a successful
	/// outgoing connection, the peer is marked `Connected` (and added if it was
	/// not tracked yet), then the digest is refreshed.
	async fn on_connected(self: Arc<Self>, id: NodeID, addr: SocketAddr, is_incoming: bool) {
		if is_incoming {
			// Check and insert under a single write lock. With separate read and
			// write locks, a concurrent state change (e.g. to `Connected` or
			// `Trying`) could be overwritten with a fresh `Waiting` entry.
			let mut known_hosts = self.known_hosts.write().unwrap();
			if !known_hosts.list.contains_key(&id) {
				let peer = self.new_peer(&id, addr);
				known_hosts.list.insert(id, peer);
			}
		} else {
			info!("Successfully connected to {} at {}", hex::encode(&id), addr);
			let mut known_hosts = self.known_hosts.write().unwrap();
			// An outgoing connection may also have been opened outside this
			// strategy; track it so it is pinged and reconnected like the others.
			known_hosts
				.list
				.entry(id)
				.and_modify(|host| host.state = PeerConnState::Connected)
				.or_insert_with(|| PeerInfo {
					addr,
					state: PeerConnState::Connected,
					last_seen: None,
					ping: VecDeque::new(),
				});
			known_hosts.update_hash();
		}
	}

	/// Disconnection callback.
	///
	/// When an outgoing connection closes, the peer is scheduled for an
	/// immediate reconnection attempt. The digest is refreshed because the peer
	/// no longer counts as connected. Incoming disconnections are ignored.
	async fn on_disconnected(self: Arc<Self>, id: NodeID, is_incoming: bool) {
		if !is_incoming {
			info!("Connection to {} was closed", hex::encode(id));
			let mut known_hosts = self.known_hosts.write().unwrap();
			if let Some(host) = known_hosts.list.get_mut(&id) {
				host.state = PeerConnState::Waiting(0, Instant::now());
				known_hosts.update_hash();
			}
		}
	}

	/// Returns a snapshot of every known peer, including ping statistics.
	///
	/// The statistics are computed over the recorded ping history and are
	/// `None` when it is empty. The median is the upper median for an even
	/// number of samples.
	pub fn get_peer_list(&self) -> Vec<PeerInfoPub> {
		let known_hosts = self.known_hosts.read().unwrap();
		let mut ret = Vec::with_capacity(known_hosts.list.len());
		for (id, info) in known_hosts.list.iter() {
			let mut pings = info.ping.iter().copied().collect::<Vec<_>>();
			pings.sort_unstable();
			let (avg_ping, max_ping, med_ping) = if pings.is_empty() {
				(None, None, None)
			} else {
				let total: Duration = pings.iter().sum();
				(
					Some(total.div_f64(pings.len() as f64)),
					pings.last().copied(),
					Some(pings[pings.len() / 2]),
				)
			};
			ret.push(PeerInfoPub {
				id: *id,
				addr: info.addr,
				state: info.state,
				last_seen: info.last_seen,
				avg_ping,
				max_ping,
				med_ping,
			});
		}
		ret
	}

	/// Builds the entry for a newly learned peer.
	///
	/// Our own ID gets `Ourself`; any other peer is due for an immediate
	/// connection attempt.
	fn new_peer(&self, id: &NodeID, addr: SocketAddr) -> PeerInfo {
		let state = if *id == self.netapp.id {
			PeerConnState::Ourself
		} else {
			PeerConnState::Waiting(0, Instant::now())
		};
		PeerInfo {
			addr,
			state,
			last_seen: None,
			ping: VecDeque::new(),
		}
	}
}

#[async_trait]
impl EndpointHandler<PingMessage> for FullMeshPeeringStrategy {
	/// Answers a ping by echoing its id along with our current peer-list
	/// digest, so the caller can tell whether a peer exchange is needed.
	async fn handle(self: &Arc<Self>, ping: PingMessage, from: NodeID) -> PingMessage {
		let our_digest = self.known_hosts.read().unwrap().hash;
		debug!("Ping from {}", hex::encode(&from));
		PingMessage {
			id: ping.id,
			peer_list_hash: our_digest,
		}
	}
}

#[async_trait]
impl EndpointHandler<PeerListMessage> for FullMeshPeeringStrategy {
	/// Merges the peers advertised by the remote node into our table, then
	/// replies with our own shareable list (connected peers, plus ourself once
	/// known).
	async fn handle(
		self: &Arc<Self>,
		incoming: PeerListMessage,
		_from: NodeID,
	) -> PeerListMessage {
		self.handle_peer_list(&incoming.list);
		let list = {
			let known_hosts = self.known_hosts.read().unwrap();
			KnownHosts::map_into_vec(&known_hosts.list)
		};
		PeerListMessage { list }
	}
}