use std::collections::HashMap;
use std::net::{IpAddr, SocketAddr};
use std::sync::{Arc, RwLock};
use log::{debug, info};
use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use sodiumoxide::crypto::auth;
use sodiumoxide::crypto::sign::ed25519;
use tokio::net::{TcpListener, TcpStream};
use crate::conn::*;
use crate::endpoint::*;
use crate::error::*;
use crate::proto::*;
use crate::util::*;
#[derive(Serialize, Deserialize)]
pub(crate) struct HelloMessage {
pub server_addr: Option<IpAddr>,
pub server_port: u16,
}
impl Message for HelloMessage {
type Response = ();
}
type OnConnectHandler = Box<dyn Fn(NodeID, SocketAddr, bool) + Send + Sync>;
type OnDisconnectHandler = Box<dyn Fn(NodeID, bool) + Send + Sync>;
/// NetApp is the main class that handles incoming and outgoing connections.
///
/// The `request()` method can be used to send a message to any peer to which we have
/// an outgoing connection, or to ourself. On the server side, these messages are
/// processed by the handlers that have been defined using `add_msg_handler()`.
///
/// NetApp can be used in a stand-alone fashion or together with a peering strategy.
/// If using it alone, you will want to set `on_connect` and `on_disconnect` events
/// in order to manage information about the current peer list.
///
/// It is generally not necessary to use NetApp stand-alone, as the provided full mesh
/// and RPS peering strategies take care of the most common use cases.
pub struct NetApp {
listen_params: ArcSwapOption<ListenParams>,
/// Network secret key
pub netid: auth::Key,
/// Our peer ID
pub id: NodeID,
/// Private key associated with our peer ID
pub privkey: ed25519::SecretKey,
pub(crate) server_conns: RwLock<HashMap<NodeID, Arc<ServerConn>>>,
pub(crate) client_conns: RwLock<HashMap<NodeID, Arc<ClientConn>>>,
pub(crate) endpoints: RwLock<HashMap<String, DynEndpoint>>,
hello_endpoint: ArcSwapOption<Endpoint<HelloMessage, NetApp>>,
on_connected_handler: ArcSwapOption<OnConnectHandler>,
on_disconnected_handler: ArcSwapOption<OnDisconnectHandler>,
}
struct ListenParams {
listen_addr: SocketAddr,
public_addr: Option<IpAddr>,
}
impl NetApp {
/// Creates a new instance of NetApp, which can serve either as a full p2p node,
/// or just as a passive client. To upgrade to a full p2p node, spawn a listener
/// using `.listen()`
///
/// Our Peer ID is the public key associated to the secret key given here.
pub fn new(netid: auth::Key, privkey: ed25519::SecretKey) -> Arc<Self> {
let id = privkey.public_key();
let netapp = Arc::new(Self {
listen_params: ArcSwapOption::new(None),
netid,
id,
privkey,
server_conns: RwLock::new(HashMap::new()),
client_conns: RwLock::new(HashMap::new()),
endpoints: RwLock::new(HashMap::new()),
hello_endpoint: ArcSwapOption::new(None),
on_connected_handler: ArcSwapOption::new(None),
on_disconnected_handler: ArcSwapOption::new(None),
});
netapp
.hello_endpoint
.swap(Some(netapp.endpoint("__netapp/netapp.rs/Hello".into())));
netapp
.hello_endpoint
.load_full()
.unwrap()
.set_handler(netapp.clone());
netapp
}
/// Set the handler to be called when a new connection (incoming or outgoing) has
/// been successfully established. Do not set this if using a peering strategy,
/// as the peering strategy will need to set this itself.
pub fn on_connected<F>(&self, handler: F)
where
F: Fn(NodeID, SocketAddr, bool) + Sized + Send + Sync + 'static,
{
self.on_connected_handler
.store(Some(Arc::new(Box::new(handler))));
}
/// Set the handler to be called when an existing connection (incoming or outgoing) has
/// been closed by either party. Do not set this if using a peering strategy,
/// as the peering strategy will need to set this itself.
pub fn on_disconnected<F>(&self, handler: F)
where
F: Fn(NodeID, bool) + Sized + Send + Sync + 'static,
{
self.on_disconnected_handler
.store(Some(Arc::new(Box::new(handler))));
}
pub fn endpoint<M, H>(self: &Arc<Self>, name: String) -> Arc<Endpoint<M, H>>
where
M: Message + 'static,
H: EndpointHandler<M> + 'static,
{
let endpoint = Arc::new(Endpoint::<M, H>::new(self.clone(), name.clone()));
let endpoint_arc = EndpointArc(endpoint.clone());
if self
.endpoints
.write()
.unwrap()
.insert(name.clone(), Box::new(endpoint_arc))
.is_some()
{
panic!("Redefining endpoint: {}", name);
};
endpoint
}
/// Main listening process for our app. This future runs during the whole
/// run time of our application.
/// If this is not called, the NetApp instance remains a passive client.
pub async fn listen(self: Arc<Self>, listen_addr: SocketAddr, public_addr: Option<IpAddr>) {
let listen_params = ListenParams {
listen_addr,
public_addr,
};
self.listen_params.store(Some(Arc::new(listen_params)));
let listener = TcpListener::bind(listen_addr).await.unwrap();
info!("Listening on {}", listen_addr);
loop {
// The second item contains the IP and port of the new connection.
let (socket, _) = listener.accept().await.unwrap();
info!(
"Incoming connection from {}, negotiating handshake...",
match socket.peer_addr() {
Ok(x) => format!("{}", x),
Err(e) => format!("<invalid addr: {}>", e),
}
);
let self2 = self.clone();
tokio::spawn(async move {
ServerConn::run(self2, socket)
.await
.log_err("ServerConn::run");
});
}
}
/// Attempt to connect to a peer, given by its ip:port and its public key.
/// The public key will be checked during the secret handshake process.
/// This function returns once the connection has been established and a
/// successfull handshake was made. At this point we can send messages to
/// the other node with `Netapp::request`
pub async fn try_connect(self: Arc<Self>, ip: SocketAddr, id: NodeID) -> Result<(), Error> {
// Don't connect to ourself, we don't care
// but pretend we did
if id == self.id {
tokio::spawn(async move {
if let Some(h) = self.on_connected_handler.load().as_ref() {
h(id, ip, false);
}
});
return Ok(());
}
// Don't connect if already connected
if self.client_conns.read().unwrap().contains_key(&id) {
return Ok(());
}
let socket = TcpStream::connect(ip).await?;
info!("Connected to {}, negotiating handshake...", ip);
ClientConn::init(self, socket, id).await?;
Ok(())
}
/// Close the outgoing connection we have to a node specified by its public key,
/// if such a connection is currently open.
pub fn disconnect(self: &Arc<Self>, id: &NodeID) {
// If id is ourself, we're not supposed to have a connection open
if *id != self.id {
let conn = self.client_conns.write().unwrap().remove(id);
if let Some(c) = conn {
debug!(
"Closing connection to {} ({})",
hex::encode(c.peer_id),
c.remote_addr
);
c.close();
} else {
return;
}
}
// call on_disconnected_handler immediately, since the connection
// was removed
// (if id == self.id, we pretend we disconnected)
let id = *id;
let self2 = self.clone();
tokio::spawn(async move {
if let Some(h) = self2.on_disconnected_handler.load().as_ref() {
h(id, false);
}
});
}
/// Close the incoming connection from a certain client to us,
/// if such a connection is currently open.
pub fn server_disconnect(self: &Arc<Self>, id: &NodeID) {
let conn = self.server_conns.read().unwrap().get(id).cloned();
if let Some(c) = conn {
debug!(
"Closing incoming connection from {} ({})",
hex::encode(c.peer_id),
c.remote_addr
);
c.close();
}
}
// Called from conn.rs when an incoming connection is successfully established
// Registers the connection in our list of connections
// Do not yet call the on_connected handler, because we don't know if the remote
// has an actual IP address and port we can call them back on.
// We will know this when they send a Hello message, which is handled below.
pub(crate) fn connected_as_server(&self, id: NodeID, conn: Arc<ServerConn>) {
info!("Accepted connection from {}", hex::encode(id));
self.server_conns.write().unwrap().insert(id, conn);
}
// Handle hello message from a client. This message is used for them to tell us
// that they are listening on a certain port number on which we can call them back.
// At this point we know they are a full network member, and not just a client,
// and we call the on_connected handler so that the peering strategy knows
// we have a new potential peer
// Called from conn.rs when an incoming connection is closed.
// We deregister the connection from server_conns and call the
// handler registered by on_disconnected
pub(crate) fn disconnected_as_server(&self, id: &NodeID, conn: Arc<ServerConn>) {
info!("Connection from {} closed", hex::encode(id));
let mut conn_list = self.server_conns.write().unwrap();
if let Some(c) = conn_list.get(id) {
if Arc::ptr_eq(c, &conn) {
conn_list.remove(id);
drop(conn_list);
if let Some(h) = self.on_disconnected_handler.load().as_ref() {
h(conn.peer_id, true);
}
}
}
}
// Called from conn.rs when an outgoinc connection is successfully established.
// The connection is registered in self.client_conns, and the
// on_connected handler is called.
//
// Since we are ourself listening, we send them a Hello message so that
// they know on which port to call us back. (TODO: don't do this if we are
// just a simple client and not a full p2p node)
pub(crate) fn connected_as_client(&self, id: NodeID, conn: Arc<ClientConn>) {
info!("Connection established to {}", hex::encode(id));
{
let old_c_opt = self.client_conns.write().unwrap().insert(id, conn.clone());
if let Some(old_c) = old_c_opt {
tokio::spawn(async move { old_c.close() });
}
}
if let Some(h) = self.on_connected_handler.load().as_ref() {
h(conn.peer_id, conn.remote_addr, false);
}
if let Some(lp) = self.listen_params.load_full() {
let server_addr = lp.public_addr;
let server_port = lp.listen_addr.port();
let hello_endpoint = self.hello_endpoint.load_full().unwrap();
tokio::spawn(async move {
hello_endpoint
.call(
&conn.peer_id,
HelloMessage {
server_addr,
server_port,
},
PRIO_NORMAL,
)
.await
.log_err("Sending hello message");
});
}
}
// Called from conn.rs when an outgoinc connection is closed.
// The connection is removed from conn_list, and the on_disconnected handler
// is called.
pub(crate) fn disconnected_as_client(&self, id: &NodeID, conn: Arc<ClientConn>) {
info!("Connection to {} closed", hex::encode(id));
let mut conn_list = self.client_conns.write().unwrap();
if let Some(c) = conn_list.get(id) {
if Arc::ptr_eq(c, &conn) {
conn_list.remove(id);
drop(conn_list);
if let Some(h) = self.on_disconnected_handler.load().as_ref() {
h(conn.peer_id, false);
}
}
}
// else case: happens if connection was removed in .disconnect()
// in which case on_disconnected_handler was already called
}
}
#[async_trait]
impl EndpointHandler<HelloMessage> for NetApp {
async fn handle(self: &Arc<Self>, msg: HelloMessage, from: NodeID) {
if let Some(h) = self.on_connected_handler.load().as_ref() {
if let Some(c) = self.server_conns.read().unwrap().get(&from) {
let remote_ip = msg.server_addr.unwrap_or_else(|| c.remote_addr.ip());
let remote_addr = SocketAddr::new(remote_ip, msg.server_port);
h(from, remote_addr, true);
}
}
}
}