aboutsummaryrefslogtreecommitdiff
path: root/src/netapp.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/netapp.rs')
-rw-r--r--src/netapp.rs133
1 files changed, 111 insertions, 22 deletions
diff --git a/src/netapp.rs b/src/netapp.rs
index 25c3b5a..e903f44 100644
--- a/src/netapp.rs
+++ b/src/netapp.rs
@@ -33,17 +33,31 @@ pub(crate) struct Handler {
>,
}
+/// NetApp is the main class that handles incoming and outgoing connections.
+///
+/// The `request()` method can be used to send a message to any peer to which we have
+/// an outgoing connection, or to ourself. On the server side, these messages are
+/// processed by the handlers that have been defined using `add_msg_handler()`.
+///
+/// NetApp can be used in a stand-alone fashion or together with a peering strategy.
+/// If using it alone, you will want to set `on_connect` and `on_disconnect` events
+/// in order to manage information about the current peer list.
+///
+/// It is generally not necessary to use NetApp stand-alone, as the provided full mesh
+/// and RPS peering strategies take care of the most common use cases.
pub struct NetApp {
pub listen_addr: SocketAddr,
pub netid: auth::Key,
pub pubkey: ed25519::PublicKey,
pub privkey: ed25519::SecretKey,
- pub server_conns: RwLock<HashMap<ed25519::PublicKey, Arc<ServerConn>>>,
- pub client_conns: RwLock<HashMap<ed25519::PublicKey, Arc<ClientConn>>>,
+
+ server_conns: RwLock<HashMap<ed25519::PublicKey, Arc<ServerConn>>>,
+ client_conns: RwLock<HashMap<ed25519::PublicKey, Arc<ClientConn>>>,
+
pub(crate) msg_handlers: ArcSwap<HashMap<MessageKind, Arc<Handler>>>,
- pub(crate) on_connected:
+ on_connected_handler:
ArcSwapOption<Box<dyn Fn(ed25519::PublicKey, SocketAddr, bool) + Send + Sync>>,
- pub(crate) on_disconnected: ArcSwapOption<Box<dyn Fn(ed25519::PublicKey, bool) + Send + Sync>>,
+ on_disconnected_handler: ArcSwapOption<Box<dyn Fn(ed25519::PublicKey, bool) + Send + Sync>>,
}
async fn net_handler_aux<M, F, R>(
@@ -78,13 +92,14 @@ where
F: Fn(ed25519::PublicKey, M) -> R + Send + Sync + 'static,
R: Future<Output = <M as Message>::Response> + Send + Sync,
{
- debug!("Handling message of kind {:08x} from ourself", M::KIND,);
+ debug!("Handling message of kind {:08x} from ourself", M::KIND);
let msg = (msg as Box<dyn Any + 'static>).downcast::<M>().unwrap();
let res = handler(remote, *msg).await;
Box::new(res)
}
impl NetApp {
+ /// Creates a new instance of NetApp. No background process is
pub fn new(
listen_addr: SocketAddr,
netid: auth::Key,
@@ -99,8 +114,8 @@ impl NetApp {
server_conns: RwLock::new(HashMap::new()),
client_conns: RwLock::new(HashMap::new()),
msg_handlers: ArcSwap::new(Arc::new(HashMap::new())),
- on_connected: ArcSwapOption::new(None),
- on_disconnected: ArcSwapOption::new(None),
+ on_connected_handler: ArcSwapOption::new(None),
+ on_disconnected_handler: ArcSwapOption::new(None),
});
let netapp2 = netapp.clone();
@@ -114,6 +129,26 @@ impl NetApp {
netapp
}
+ /// Set the handler to be called when a new connection (incoming or outgoing) has
+ /// been successfully established. Do not set this if using a peering strategy,
+ /// as the peering strategy will need to set this itself.
+ pub fn on_connected<F>(&self, handler: F)
+ where F: Fn(ed25519::PublicKey, SocketAddr, bool) + Sized + Send + Sync + 'static
+ {
+ self.on_connected_handler.store(Some(Arc::new(Box::new(handler))));
+ }
+
+ /// Set the handler to be called when an existing connection (incoming or outgoing) has
+ /// been closed by either party. Do not set this if using a peering strategy,
+ /// as the peering strategy will need to set this itself.
+ pub fn on_disconnected<F>(&self, handler: F)
+ where F: Fn(ed25519::PublicKey, bool) + Sized + Send + Sync + 'static
+ {
+ self.on_disconnected_handler.store(Some(Arc::new(Box::new(handler))));
+ }
+
+ /// Add a handler for a certain message type. Note that only one handler
+ /// can be specified for each message type.
pub fn add_msg_handler<M, F, R>(&self, handler: F)
where
M: Message + 'static,
@@ -122,10 +157,10 @@ impl NetApp {
{
let handler = Arc::new(handler);
- let handler1 = handler.clone();
+ let handler2 = handler.clone();
let net_handler = Box::new(move |remote: ed25519::PublicKey, bytes: Bytes| {
let fun: Pin<Box<dyn Future<Output = Vec<u8>> + Sync + Send>> =
- Box::pin(net_handler_aux(handler1.clone(), remote, bytes));
+ Box::pin(net_handler_aux(handler2.clone(), remote, bytes));
fun
});
@@ -146,6 +181,8 @@ impl NetApp {
self.msg_handlers.store(Arc::new(handlers));
}
+ /// Main listening process for our app. This future runs during the whole
+ /// run time of our application.
pub async fn listen(self: Arc<Self>) {
let mut listener = TcpListener::bind(self.listen_addr).await.unwrap();
info!("Listening on {}", self.listen_addr);
@@ -166,16 +203,21 @@ impl NetApp {
}
}
+ /// Attempt to connect to a peer, given by its ip:port and its public key.
+ /// The public key will be checked during the secret handshake process.
+ /// This function returns once the connection has been established and a
+ /// successfull handshake was made. At this point we can send messages to
+ /// the other node with `Netapp::request`
pub async fn try_connect(
self: Arc<Self>,
ip: SocketAddr,
pk: ed25519::PublicKey,
) -> Result<(), Error> {
+ // Don't connect to ourself, we don't care
+ // but pretend we did
if pk == self.pubkey {
- // Don't connect to ourself, we don't care
- // but pretend we did
tokio::spawn(async move {
- if let Some(h) = self.on_connected.load().as_ref() {
+ if let Some(h) = self.on_connected_handler.load().as_ref() {
h(pk, ip, false);
}
});
@@ -193,11 +235,16 @@ impl NetApp {
Ok(())
}
- pub fn disconnect(self: Arc<Self>, pk: &ed25519::PublicKey) {
+ /// Close the outgoing connection we have to a node specified by its public key,
+ /// if such a connection is currently open.
+ pub fn disconnect(self: &Arc<Self>, pk: &ed25519::PublicKey) {
+ // Don't disconnect from ourself (we aren't connected anyways)
+ // but pretend we did
if *pk == self.pubkey {
let pk = *pk;
+ let self2 = self.clone();
tokio::spawn(async move {
- if let Some(h) = self.on_disconnected.load().as_ref() {
+ if let Some(h) = self2.on_disconnected_handler.load().as_ref() {
h(pk, false);
}
});
@@ -206,10 +253,30 @@ impl NetApp {
let conn = self.client_conns.read().unwrap().get(pk).cloned();
if let Some(c) = conn {
+ debug!("Closing connection to {} ({})",
+ hex::encode(c.peer_pk),
+ c.remote_addr);
c.close();
}
}
+ /// Close the incoming connection from a certain client to us,
+ /// if such a connection is currently open.
+ pub fn server_disconnect(self: &Arc<Self>, pk: &ed25519::PublicKey) {
+ let conn = self.server_conns.read().unwrap().get(pk).cloned();
+ if let Some(c) = conn {
+ debug!("Closing incoming connection from {} ({})",
+ hex::encode(c.peer_pk),
+ c.remote_addr);
+ c.close();
+ }
+ }
+
+ // Called from conn.rs when an incoming connection is successfully established
+ // Registers the connection in our list of connections
+ // Do not yet call the on_connected handler, because we don't know if the remote
+ // has an actual IP address and port we can call them back on.
+ // We will know this when they send a Hello message, which is handled below.
pub(crate) fn connected_as_server(&self, id: ed25519::PublicKey, conn: Arc<ServerConn>) {
info!("Accepted connection from {}", hex::encode(id));
@@ -217,8 +284,13 @@ impl NetApp {
conn_list.insert(id.clone(), conn);
}
+ // Handle hello message from a client. This message is used for them to tell us
+ // that they are listening on a certain port number on which we can call them back.
+ // At this point we know they are a full network member, and not just a client,
+ // and we call the on_connected handler so that the peering strategy knows
+ // we have a new potential peer
fn handle_hello_message(&self, id: ed25519::PublicKey, msg: HelloMessage) {
- if let Some(h) = self.on_connected.load().as_ref() {
+ if let Some(h) = self.on_connected_handler.load().as_ref() {
if let Some(c) = self.server_conns.read().unwrap().get(&id) {
let remote_addr = SocketAddr::new(c.remote_addr.ip(), msg.server_port);
h(id, remote_addr, true);
@@ -226,6 +298,9 @@ impl NetApp {
}
}
+ // Called from conn.rs when an incoming connection is closed.
+ // We deregister the connection from server_conns and call the
+ // handler registered by on_disconnected
pub(crate) fn disconnected_as_server(&self, id: &ed25519::PublicKey, conn: Arc<ServerConn>) {
info!("Connection from {} closed", hex::encode(id));
@@ -235,12 +310,19 @@ impl NetApp {
conn_list.remove(id);
}
- if let Some(h) = self.on_disconnected.load().as_ref() {
+ if let Some(h) = self.on_disconnected_handler.load().as_ref() {
h(conn.peer_pk, true);
}
}
}
+ // Called from conn.rs when an outgoinc connection is successfully established.
+ // The connection is registered in self.client_conns, and the
+ // on_connected handler is called.
+ //
+ // Since we are ourself listening, we send them a Hello message so that
+ // they know on which port to call us back. (TODO: don't do this if we are
+ // just a simple client and not a full p2p node)
pub(crate) fn connected_as_client(&self, id: ed25519::PublicKey, conn: Arc<ClientConn>) {
info!("Connection established to {}", hex::encode(id));
@@ -251,32 +333,39 @@ impl NetApp {
}
}
- if let Some(h) = self.on_connected.load().as_ref() {
+ if let Some(h) = self.on_connected_handler.load().as_ref() {
h(conn.peer_pk, conn.remote_addr, false);
}
+ let server_port = self.listen_addr.port();
tokio::spawn(async move {
- let server_port = conn.netapp.listen_addr.port();
- conn.request(HelloMessage { server_port }, prio::NORMAL)
+ conn.request(HelloMessage { server_port }, PRIO_NORMAL)
.await
.log_err("Sending hello message");
});
}
+ // Called from conn.rs when an outgoinc connection is closed.
+ // The connection is removed from conn_list, and the on_disconnected handler
+ // is called.
pub(crate) fn disconnected_as_client(&self, id: &ed25519::PublicKey, conn: Arc<ClientConn>) {
info!("Connection to {} closed", hex::encode(id));
let mut conn_list = self.client_conns.write().unwrap();
if let Some(c) = conn_list.get(id) {
if Arc::ptr_eq(c, &conn) {
conn_list.remove(id);
- }
- if let Some(h) = self.on_disconnected.load().as_ref() {
- h(conn.peer_pk, false);
+ if let Some(h) = self.on_disconnected_handler.load().as_ref() {
+ h(conn.peer_pk, false);
+ }
}
}
}
+ /// Send a message to a remote host to which a client connection is already
+ /// established, and await their response. The target is the id of the peer we
+ /// want to send the message to.
+ /// The priority is an `u8`, with lower numbers meaning highest priority.
pub async fn request<T>(
&self,
target: &ed25519::PublicKey,