aboutsummaryrefslogblamecommitdiff
path: root/src/netapp.rs
blob: e49f599727577e8c2648c612b82cab69c80a5513 (plain) (tree)
1
2
3
4
5
6
                  
                              
                                   

                             
                       

















                                         

                                                   


                                                                            

                                                                                                 
                                 

                                                                                                    
 
                           

                                               

 





                                                                                     


                                                                                    


                                                                                      
                   
                                                   
 
                              
                             


                                                   
                                        
 

                                                               
 
                                                                             

                                                                    

 




                                    
                                                                                           

                             
                                                      
                                                                   





                                                          
                                        
                                                                              

                                                          
          






                                                           
                                                      

 
                                                                                           

                             
                                                      

                                                                   
                                                                        




                                                                           
             


                                                                                        

                                                                                  
                                                                                
                                              
                                            
                                                                
                              
                           



                                                                             

                                                                          


                                             

                                                                                                     
                                
                   



                      



                                                                                         
             
                                                                                


                                                                  





                                                                                               
             
                                                                    


                                                                  



                                                                                

                                                                                 


                                                          
                                                              
                                                                                     

                                                
 
                                               
                                                                                
                                                                                       
                                                                                           


                           
                                      


                                                                                           

                           





                                             
                                                                             
                                               


                                                            

                                                                                 







                                                                                                    
                                                                             
                                                      





                                                                                          



                                                                                   









                                                                    




                                                                                  
                                                                                                    

                                                          
                                  
                                                 
                                                                                            
                                                         





                                                     
                                                                        

                                      
 

                                                                       
                                                          


                      

                                                                                        



                                                                                 
                                               

                                                                        
                                                               

                                                     



                                          

                 

                                                                                 

                                                                 


                                                                                        
                                             

                         

         

                                                                      

                                                                              
                                       

                                                                           
                                                       

                                             








                                                                                        
                                                                                     

                                                                      
                                                                    

         




                                                                                           
                                                                       
                                                                            
                                                                                     
                                                                                                      
                                                                                              




                                                         


                                                                      
                                                                                         

                                                                    



                                                                       
                                                
 
                                                                                               
                                                              
                                 



                         






                                                                                       
                                                                                     

                                                                       
                 

                                                                                                    



                                                                           
                                                                            
                                                                 

                 










                                                                  

                                                                  

                           

         


                                                                                    
                                                                                         
                                                                  



                                                                       
                                                
 
                                                                                               
                                                               
                                 

                         

                                                                                
         
 



                                                                                      

                                
                                





                                                    
                                       

























                                                                                          
 
use std::any::Any;
use std::collections::HashMap;
use std::net::{IpAddr, SocketAddr};
use std::pin::Pin;
use std::sync::{Arc, RwLock};
use std::time::Instant;

use std::future::Future;

use log::{debug, info};

use arc_swap::{ArcSwap, ArcSwapOption};
use bytes::Bytes;

use sodiumoxide::crypto::auth;
use sodiumoxide::crypto::sign::ed25519;
use tokio::net::{TcpListener, TcpStream};

use crate::conn::*;
use crate::error::*;
use crate::message::*;
use crate::proto::*;
use crate::util::*;

type DynMsg = Box<dyn Any + Send + Sync + 'static>;

type OnConnectHandler = Box<dyn Fn(NodeID, SocketAddr, bool) + Send + Sync>;
type OnDisconnectHandler = Box<dyn Fn(NodeID, bool) + Send + Sync>;

pub(crate) type LocalHandler =
	Box<dyn Fn(DynMsg) -> Pin<Box<dyn Future<Output = DynMsg> + Sync + Send>> + Sync + Send>;
pub(crate) type NetHandler = Box<
	dyn Fn(NodeID, Bytes) -> Pin<Box<dyn Future<Output = Vec<u8>> + Sync + Send>> + Sync + Send,
>;

pub(crate) struct Handler {
	pub(crate) local_handler: LocalHandler,
	pub(crate) net_handler: NetHandler,
}

/// NetApp is the main class that handles incoming and outgoing connections.
///
/// The `request()` method can be used to send a message to any peer to which we have
/// an outgoing connection, or to ourself. On the server side, these messages are
/// processed by the handlers that have been defined using `add_msg_handler()`.
///
/// NetApp can be used in a stand-alone fashion or together with a peering strategy.
/// If using it alone, you will want to set `on_connect` and `on_disconnect` events
/// in order to manage information about the current peer list.
///
/// It is generally not necessary to use NetApp stand-alone, as the provided full mesh
/// and RPS peering strategies take care of the most common use cases.
pub struct NetApp {
	listen_params: ArcSwapOption<ListenParams>,

	/// Network secret key
	pub netid: auth::Key,
	/// Our peer ID
	pub id: NodeID,
	/// Private key associated with our peer ID
	pub privkey: ed25519::SecretKey,

	server_conns: RwLock<HashMap<NodeID, Arc<ServerConn>>>,
	client_conns: RwLock<HashMap<NodeID, Arc<ClientConn>>>,

	pub(crate) msg_handlers: ArcSwap<HashMap<MessageKind, Arc<Handler>>>,
	on_connected_handler: ArcSwapOption<OnConnectHandler>,
	on_disconnected_handler: ArcSwapOption<OnDisconnectHandler>,
}

struct ListenParams {
	listen_addr: SocketAddr,
	public_addr: Option<IpAddr>,
}

async fn net_handler_aux<M, F, R>(handler: Arc<F>, remote: NodeID, bytes: Bytes) -> Vec<u8>
where
	M: Message + 'static,
	F: Fn(NodeID, M) -> R + Send + Sync + 'static,
	R: Future<Output = <M as Message>::Response> + Send + Sync,
{
	debug!(
		"Handling message of kind {:08x} from {}",
		M::KIND,
		hex::encode(remote)
	);
	let begin_time = Instant::now();
	let res = match rmp_serde::decode::from_read_ref::<_, M>(&bytes[..]) {
		Ok(msg) => Ok(handler(remote, msg).await),
		Err(e) => Err(e.to_string()),
	};
	let end_time = Instant::now();
	debug!(
		"Request {:08x} from {} handled in {}msec",
		M::KIND,
		hex::encode(remote),
		(end_time - begin_time).as_millis()
	);
	rmp_to_vec_all_named(&res).unwrap_or_default()
}

async fn local_handler_aux<M, F, R>(handler: Arc<F>, remote: NodeID, msg: DynMsg) -> DynMsg
where
	M: Message + 'static,
	F: Fn(NodeID, M) -> R + Send + Sync + 'static,
	R: Future<Output = <M as Message>::Response> + Send + Sync,
{
	debug!("Handling message of kind {:08x} from ourself", M::KIND);
	let msg = (msg as Box<dyn Any + 'static>).downcast::<M>().unwrap();
	let res = handler(remote, *msg).await;
	Box::new(res)
}

impl NetApp {
	/// Creates a new instance of NetApp, which can serve either as a full p2p node,
	/// or just as a passive client. To upgrade to a full p2p node, spawn a listener
	/// using `.listen()`
	///
	/// Our Peer ID is the public key associated to the secret key given here.
	pub fn new(netid: auth::Key, privkey: ed25519::SecretKey) -> Arc<Self> {
		let id = privkey.public_key();
		let netapp = Arc::new(Self {
			listen_params: ArcSwapOption::new(None),
			netid,
			id,
			privkey,
			server_conns: RwLock::new(HashMap::new()),
			client_conns: RwLock::new(HashMap::new()),
			msg_handlers: ArcSwap::new(Arc::new(HashMap::new())),
			on_connected_handler: ArcSwapOption::new(None),
			on_disconnected_handler: ArcSwapOption::new(None),
		});

		let netapp2 = netapp.clone();
		netapp.add_msg_handler::<HelloMessage, _, _>(move |from: NodeID, msg: HelloMessage| {
			netapp2.handle_hello_message(from, msg);
			async {}
		});

		netapp
	}

	/// Set the handler to be called when a new connection (incoming or outgoing) has
	/// been successfully established. Do not set this if using a peering strategy,
	/// as the peering strategy will need to set this itself.
	pub fn on_connected<F>(&self, handler: F)
	where
		F: Fn(NodeID, SocketAddr, bool) + Sized + Send + Sync + 'static,
	{
		self.on_connected_handler
			.store(Some(Arc::new(Box::new(handler))));
	}

	/// Set the handler to be called when an existing connection (incoming or outgoing) has
	/// been closed by either party. Do not set this if using a peering strategy,
	/// as the peering strategy will need to set this itself.
	pub fn on_disconnected<F>(&self, handler: F)
	where
		F: Fn(NodeID, bool) + Sized + Send + Sync + 'static,
	{
		self.on_disconnected_handler
			.store(Some(Arc::new(Box::new(handler))));
	}

	/// Add a handler for a certain message type. Note that only one handler
	/// can be specified for each message type.
	/// The handler is an asynchronous function, i.e. a function that returns
	/// a future.
	pub fn add_msg_handler<M, F, R>(&self, handler: F)
	where
		M: Message + 'static,
		F: Fn(NodeID, M) -> R + Send + Sync + 'static,
		R: Future<Output = <M as Message>::Response> + Send + Sync + 'static,
	{
		let handler = Arc::new(handler);

		let handler2 = handler.clone();
		let net_handler = Box::new(move |remote: NodeID, bytes: Bytes| {
			let fun: Pin<Box<dyn Future<Output = Vec<u8>> + Sync + Send>> =
				Box::pin(net_handler_aux(handler2.clone(), remote, bytes));
			fun
		});

		let self_id = self.id;
		let local_handler = Box::new(move |msg: DynMsg| {
			let fun: Pin<Box<dyn Future<Output = DynMsg> + Sync + Send>> =
				Box::pin(local_handler_aux(handler.clone(), self_id, msg));
			fun
		});

		let funs = Arc::new(Handler {
			net_handler,
			local_handler,
		});

		let mut handlers = self.msg_handlers.load().as_ref().clone();
		handlers.insert(M::KIND, funs);
		self.msg_handlers.store(Arc::new(handlers));
	}

	/// Main listening process for our app. This future runs during the whole
	/// run time of our application.
	/// If this is not called, the NetApp instance remains a passive client.
	pub async fn listen(self: Arc<Self>, listen_addr: SocketAddr, public_addr: Option<IpAddr>) {
		let listen_params = ListenParams {
			listen_addr,
			public_addr,
		};
		self.listen_params.store(Some(Arc::new(listen_params)));

		let listener = TcpListener::bind(listen_addr).await.unwrap();
		info!("Listening on {}", listen_addr);

		loop {
			// The second item contains the IP and port of the new connection.
			let (socket, _) = listener.accept().await.unwrap();
			info!(
				"Incoming connection from {}, negotiating handshake...",
				match socket.peer_addr() {
					Ok(x) => format!("{}", x),
					Err(e) => format!("<invalid addr: {}>", e),
				}
			);
			let self2 = self.clone();
			tokio::spawn(async move {
				ServerConn::run(self2, socket)
					.await
					.log_err("ServerConn::run");
			});
		}
	}

	/// Attempt to connect to a peer, given by its ip:port and its public key.
	/// The public key will be checked during the secret handshake process.
	/// This function returns once the connection has been established and a
	/// successfull handshake was made. At this point we can send messages to
	/// the other node with `Netapp::request`
	pub async fn try_connect(self: Arc<Self>, ip: SocketAddr, id: NodeID) -> Result<(), Error> {
		// Don't connect to ourself, we don't care
		// but pretend we did
		if id == self.id {
			tokio::spawn(async move {
				if let Some(h) = self.on_connected_handler.load().as_ref() {
					h(id, ip, false);
				}
			});
			return Ok(());
		}

		// Don't connect if already connected
		if self.client_conns.read().unwrap().contains_key(&id) {
			return Ok(());
		}

		let socket = TcpStream::connect(ip).await?;
		info!("Connected to {}, negotiating handshake...", ip);
		ClientConn::init(self, socket, id).await?;
		Ok(())
	}

	/// Close the outgoing connection we have to a node specified by its public key,
	/// if such a connection is currently open.
	pub fn disconnect(self: &Arc<Self>, id: &NodeID) {
		// If id is ourself, we're not supposed to have a connection open
		if *id != self.id {
			let conn = self.client_conns.write().unwrap().remove(id);
			if let Some(c) = conn {
				debug!(
					"Closing connection to {} ({})",
					hex::encode(c.peer_id),
					c.remote_addr
				);
				c.close();
			} else {
				return;
			}
		}

		// call on_disconnected_handler immediately, since the connection
		// was removed
		// (if id == self.id, we pretend we disconnected)
		let id = *id;
		let self2 = self.clone();
		tokio::spawn(async move {
			if let Some(h) = self2.on_disconnected_handler.load().as_ref() {
				h(id, false);
			}
		});
	}

	/// Close the incoming connection from a certain client to us,
	/// if such a connection is currently open.
	pub fn server_disconnect(self: &Arc<Self>, id: &NodeID) {
		let conn = self.server_conns.read().unwrap().get(id).cloned();
		if let Some(c) = conn {
			debug!(
				"Closing incoming connection from {} ({})",
				hex::encode(c.peer_id),
				c.remote_addr
			);
			c.close();
		}
	}

	// Called from conn.rs when an incoming connection is successfully established
	// Registers the connection in our list of connections
	// Do not yet call the on_connected handler, because we don't know if the remote
	// has an actual IP address and port we can call them back on.
	// We will know this when they send a Hello message, which is handled below.
	pub(crate) fn connected_as_server(&self, id: NodeID, conn: Arc<ServerConn>) {
		info!("Accepted connection from {}", hex::encode(id));

		self.server_conns.write().unwrap().insert(id, conn);
	}

	// Handle hello message from a client. This message is used for them to tell us
	// that they are listening on a certain port number on which we can call them back.
	// At this point we know they are a full network member, and not just a client,
	// and we call the on_connected handler so that the peering strategy knows
	// we have a new potential peer
	fn handle_hello_message(&self, id: NodeID, msg: HelloMessage) {
		if let Some(h) = self.on_connected_handler.load().as_ref() {
			if let Some(c) = self.server_conns.read().unwrap().get(&id) {
				let remote_ip = msg.server_addr.unwrap_or_else(|| c.remote_addr.ip());
				let remote_addr = SocketAddr::new(remote_ip, msg.server_port);
				h(id, remote_addr, true);
			}
		}
	}

	// Called from conn.rs when an incoming connection is closed.
	// We deregister the connection from server_conns and call the
	// handler registered by on_disconnected
	pub(crate) fn disconnected_as_server(&self, id: &NodeID, conn: Arc<ServerConn>) {
		info!("Connection from {} closed", hex::encode(id));

		let mut conn_list = self.server_conns.write().unwrap();
		if let Some(c) = conn_list.get(id) {
			if Arc::ptr_eq(c, &conn) {
				conn_list.remove(id);
				drop(conn_list);

				if let Some(h) = self.on_disconnected_handler.load().as_ref() {
					h(conn.peer_id, true);
				}
			}
		}
	}

	// Called from conn.rs when an outgoinc connection is successfully established.
	// The connection is registered in self.client_conns, and the
	// on_connected handler is called.
	//
	// Since we are ourself listening, we send them a Hello message so that
	// they know on which port to call us back. (TODO: don't do this if we are
	// just a simple client and not a full p2p node)
	pub(crate) fn connected_as_client(&self, id: NodeID, conn: Arc<ClientConn>) {
		info!("Connection established to {}", hex::encode(id));

		{
			let old_c_opt = self.client_conns.write().unwrap().insert(id, conn.clone());
			if let Some(old_c) = old_c_opt {
				tokio::spawn(async move { old_c.close() });
			}
		}

		if let Some(h) = self.on_connected_handler.load().as_ref() {
			h(conn.peer_id, conn.remote_addr, false);
		}

		if let Some(lp) = self.listen_params.load_full() {
			let server_addr = lp.public_addr;
			let server_port = lp.listen_addr.port();
			tokio::spawn(async move {
				conn.request(
					HelloMessage {
						server_addr,
						server_port,
					},
					PRIO_NORMAL,
				)
				.await
				.log_err("Sending hello message");
			});
		}
	}

	// Called from conn.rs when an outgoinc connection is closed.
	// The connection is removed from conn_list, and the on_disconnected handler
	// is called.
	pub(crate) fn disconnected_as_client(&self, id: &NodeID, conn: Arc<ClientConn>) {
		info!("Connection to {} closed", hex::encode(id));
		let mut conn_list = self.client_conns.write().unwrap();
		if let Some(c) = conn_list.get(id) {
			if Arc::ptr_eq(c, &conn) {
				conn_list.remove(id);
				drop(conn_list);

				if let Some(h) = self.on_disconnected_handler.load().as_ref() {
					h(conn.peer_id, false);
				}
			}
		}
		// else case: happens if connection was removed in .disconnect()
		// in which case on_disconnected_handler was already called
	}

	/// Send a message to a remote host to which a client connection is already
	/// established, and await their response. The target is the id of the peer we
	/// want to send the message to.
	/// The priority is an `u8`, with lower numbers meaning highest priority.
	pub async fn request<T>(
		&self,
		target: &NodeID,
		rq: T,
		prio: RequestPriority,
	) -> Result<<T as Message>::Response, Error>
	where
		T: Message + 'static,
	{
		if *target == self.id {
			let handler = self.msg_handlers.load().get(&T::KIND).cloned();
			match handler {
				None => Err(Error::Message(format!(
					"No handler registered for message kind {:08x}",
					T::KIND
				))),
				Some(h) => {
					let local_handler = &h.local_handler;
					let res = local_handler(Box::new(rq)).await;
					let res_t = (res as Box<dyn Any + 'static>)
						.downcast::<<T as Message>::Response>()
						.unwrap();
					Ok(*res_t)
				}
			}
		} else {
			let conn = self.client_conns.read().unwrap().get(target).cloned();
			match conn {
				None => Err(Error::Message(format!(
					"Not connected: {}",
					hex::encode(target)
				))),
				Some(c) => c.request(rq, prio).await,
			}
		}
	}
}