diff options
Diffstat (limited to 'src/netapp.rs')
-rw-r--r-- | src/netapp.rs | 133 |
1 files changed, 111 insertions, 22 deletions
diff --git a/src/netapp.rs b/src/netapp.rs index 25c3b5a..e903f44 100644 --- a/src/netapp.rs +++ b/src/netapp.rs @@ -33,17 +33,31 @@ pub(crate) struct Handler { >, } +/// NetApp is the main class that handles incoming and outgoing connections. +/// +/// The `request()` method can be used to send a message to any peer to which we have +/// an outgoing connection, or to ourself. On the server side, these messages are +/// processed by the handlers that have been defined using `add_msg_handler()`. +/// +/// NetApp can be used in a stand-alone fashion or together with a peering strategy. +/// If using it alone, you will want to set `on_connect` and `on_disconnect` events +/// in order to manage information about the current peer list. +/// +/// It is generally not necessary to use NetApp stand-alone, as the provided full mesh +/// and RPS peering strategies take care of the most common use cases. pub struct NetApp { pub listen_addr: SocketAddr, pub netid: auth::Key, pub pubkey: ed25519::PublicKey, pub privkey: ed25519::SecretKey, - pub server_conns: RwLock<HashMap<ed25519::PublicKey, Arc<ServerConn>>>, - pub client_conns: RwLock<HashMap<ed25519::PublicKey, Arc<ClientConn>>>, + + server_conns: RwLock<HashMap<ed25519::PublicKey, Arc<ServerConn>>>, + client_conns: RwLock<HashMap<ed25519::PublicKey, Arc<ClientConn>>>, + pub(crate) msg_handlers: ArcSwap<HashMap<MessageKind, Arc<Handler>>>, - pub(crate) on_connected: + on_connected_handler: ArcSwapOption<Box<dyn Fn(ed25519::PublicKey, SocketAddr, bool) + Send + Sync>>, - pub(crate) on_disconnected: ArcSwapOption<Box<dyn Fn(ed25519::PublicKey, bool) + Send + Sync>>, + on_disconnected_handler: ArcSwapOption<Box<dyn Fn(ed25519::PublicKey, bool) + Send + Sync>>, } async fn net_handler_aux<M, F, R>( @@ -78,13 +92,14 @@ where F: Fn(ed25519::PublicKey, M) -> R + Send + Sync + 'static, R: Future<Output = <M as Message>::Response> + Send + Sync, { - debug!("Handling message of kind {:08x} from ourself", M::KIND,); + debug!("Handling message of kind {:08x} from ourself", M::KIND); let msg = (msg as Box<dyn Any + 'static>).downcast::<M>().unwrap(); let res = handler(remote, *msg).await; Box::new(res) } impl NetApp { + /// Creates a new instance of NetApp. No background process is pub fn new( listen_addr: SocketAddr, netid: auth::Key, @@ -99,8 +114,8 @@ impl NetApp { server_conns: RwLock::new(HashMap::new()), client_conns: RwLock::new(HashMap::new()), msg_handlers: ArcSwap::new(Arc::new(HashMap::new())), - on_connected: ArcSwapOption::new(None), - on_disconnected: ArcSwapOption::new(None), + on_connected_handler: ArcSwapOption::new(None), + on_disconnected_handler: ArcSwapOption::new(None), }); let netapp2 = netapp.clone(); @@ -114,6 +129,26 @@ impl NetApp { netapp } + /// Set the handler to be called when a new connection (incoming or outgoing) has + /// been successfully established. Do not set this if using a peering strategy, + /// as the peering strategy will need to set this itself. + pub fn on_connected<F>(&self, handler: F) + where F: Fn(ed25519::PublicKey, SocketAddr, bool) + Sized + Send + Sync + 'static + { + self.on_connected_handler.store(Some(Arc::new(Box::new(handler)))); + } + + /// Set the handler to be called when an existing connection (incoming or outgoing) has + /// been closed by either party. Do not set this if using a peering strategy, + /// as the peering strategy will need to set this itself. + pub fn on_disconnected<F>(&self, handler: F) + where F: Fn(ed25519::PublicKey, bool) + Sized + Send + Sync + 'static + { + self.on_disconnected_handler.store(Some(Arc::new(Box::new(handler)))); + } + + /// Add a handler for a certain message type. Note that only one handler + /// can be specified for each message type. pub fn add_msg_handler<M, F, R>(&self, handler: F) where M: Message + 'static, @@ -122,10 +157,10 @@ impl NetApp { { let handler = Arc::new(handler); - let handler1 = handler.clone(); + let handler2 = handler.clone(); let net_handler = Box::new(move |remote: ed25519::PublicKey, bytes: Bytes| { let fun: Pin<Box<dyn Future<Output = Vec<u8>> + Sync + Send>> = - Box::pin(net_handler_aux(handler1.clone(), remote, bytes)); + Box::pin(net_handler_aux(handler2.clone(), remote, bytes)); fun }); @@ -146,6 +181,8 @@ impl NetApp { self.msg_handlers.store(Arc::new(handlers)); } + /// Main listening process for our app. This future runs during the whole + /// run time of our application. pub async fn listen(self: Arc<Self>) { let mut listener = TcpListener::bind(self.listen_addr).await.unwrap(); info!("Listening on {}", self.listen_addr); @@ -166,16 +203,21 @@ impl NetApp { } } + /// Attempt to connect to a peer, given by its ip:port and its public key. + /// The public key will be checked during the secret handshake process. + /// This function returns once the connection has been established and a + /// successfull handshake was made. At this point we can send messages to + /// the other node with `Netapp::request` pub async fn try_connect( self: Arc<Self>, ip: SocketAddr, pk: ed25519::PublicKey, ) -> Result<(), Error> { + // Don't connect to ourself, we don't care + // but pretend we did if pk == self.pubkey { - // Don't connect to ourself, we don't care - // but pretend we did tokio::spawn(async move { - if let Some(h) = self.on_connected.load().as_ref() { + if let Some(h) = self.on_connected_handler.load().as_ref() { h(pk, ip, false); } }); @@ -193,11 +235,16 @@ impl NetApp { Ok(()) } - pub fn disconnect(self: Arc<Self>, pk: &ed25519::PublicKey) { + /// Close the outgoing connection we have to a node specified by its public key, + /// if such a connection is currently open. + pub fn disconnect(self: &Arc<Self>, pk: &ed25519::PublicKey) { + // Don't disconnect from ourself (we aren't connected anyways) + // but pretend we did if *pk == self.pubkey { let pk = *pk; + let self2 = self.clone(); tokio::spawn(async move { - if let Some(h) = self.on_disconnected.load().as_ref() { + if let Some(h) = self2.on_disconnected_handler.load().as_ref() { h(pk, false); } }); @@ -206,10 +253,30 @@ impl NetApp { let conn = self.client_conns.read().unwrap().get(pk).cloned(); if let Some(c) = conn { + debug!("Closing connection to {} ({})", + hex::encode(c.peer_pk), + c.remote_addr); c.close(); } } + /// Close the incoming connection from a certain client to us, + /// if such a connection is currently open. + pub fn server_disconnect(self: &Arc<Self>, pk: &ed25519::PublicKey) { + let conn = self.server_conns.read().unwrap().get(pk).cloned(); + if let Some(c) = conn { + debug!("Closing incoming connection from {} ({})", + hex::encode(c.peer_pk), + c.remote_addr); + c.close(); + } + } + + // Called from conn.rs when an incoming connection is successfully established + // Registers the connection in our list of connections + // Do not yet call the on_connected handler, because we don't know if the remote + // has an actual IP address and port we can call them back on. + // We will know this when they send a Hello message, which is handled below. pub(crate) fn connected_as_server(&self, id: ed25519::PublicKey, conn: Arc<ServerConn>) { info!("Accepted connection from {}", hex::encode(id)); @@ -217,8 +284,13 @@ impl NetApp { conn_list.insert(id.clone(), conn); } + // Handle hello message from a client. This message is used for them to tell us + // that they are listening on a certain port number on which we can call them back. + // At this point we know they are a full network member, and not just a client, + // and we call the on_connected handler so that the peering strategy knows + // we have a new potential peer fn handle_hello_message(&self, id: ed25519::PublicKey, msg: HelloMessage) { - if let Some(h) = self.on_connected.load().as_ref() { + if let Some(h) = self.on_connected_handler.load().as_ref() { if let Some(c) = self.server_conns.read().unwrap().get(&id) { let remote_addr = SocketAddr::new(c.remote_addr.ip(), msg.server_port); h(id, remote_addr, true); @@ -226,6 +298,9 @@ impl NetApp { } } + // Called from conn.rs when an incoming connection is closed. + // We deregister the connection from server_conns and call the + // handler registered by on_disconnected pub(crate) fn disconnected_as_server(&self, id: &ed25519::PublicKey, conn: Arc<ServerConn>) { info!("Connection from {} closed", hex::encode(id)); @@ -235,12 +310,19 @@ impl NetApp { conn_list.remove(id); } - if let Some(h) = self.on_disconnected.load().as_ref() { + if let Some(h) = self.on_disconnected_handler.load().as_ref() { h(conn.peer_pk, true); } } } + // Called from conn.rs when an outgoinc connection is successfully established. + // The connection is registered in self.client_conns, and the + // on_connected handler is called. + // + // Since we are ourself listening, we send them a Hello message so that + // they know on which port to call us back. (TODO: don't do this if we are + // just a simple client and not a full p2p node) pub(crate) fn connected_as_client(&self, id: ed25519::PublicKey, conn: Arc<ClientConn>) { info!("Connection established to {}", hex::encode(id)); @@ -251,32 +333,39 @@ impl NetApp { } } - if let Some(h) = self.on_connected.load().as_ref() { + if let Some(h) = self.on_connected_handler.load().as_ref() { h(conn.peer_pk, conn.remote_addr, false); } + let server_port = self.listen_addr.port(); tokio::spawn(async move { - let server_port = conn.netapp.listen_addr.port(); - conn.request(HelloMessage { server_port }, prio::NORMAL) + conn.request(HelloMessage { server_port }, PRIO_NORMAL) .await .log_err("Sending hello message"); }); } + // Called from conn.rs when an outgoinc connection is closed. + // The connection is removed from conn_list, and the on_disconnected handler + // is called. pub(crate) fn disconnected_as_client(&self, id: &ed25519::PublicKey, conn: Arc<ClientConn>) { info!("Connection to {} closed", hex::encode(id)); let mut conn_list = self.client_conns.write().unwrap(); if let Some(c) = conn_list.get(id) { if Arc::ptr_eq(c, &conn) { conn_list.remove(id); - } - if let Some(h) = self.on_disconnected.load().as_ref() { - h(conn.peer_pk, false); + if let Some(h) = self.on_disconnected_handler.load().as_ref() { + h(conn.peer_pk, false); + } } } } + /// Send a message to a remote host to which a client connection is already + /// established, and await their response. The target is the id of the peer we + /// want to send the message to. + /// The priority is an `u8`, with lower numbers meaning highest priority. pub async fn request<T>( &self, target: &ed25519::PublicKey, |