diff options
Diffstat (limited to 'src/netapp.rs')
-rw-r--r-- | src/netapp.rs | 172 |
1 files changed, 87 insertions, 85 deletions
diff --git a/src/netapp.rs b/src/netapp.rs index 8397be9..9733fb7 100644 --- a/src/netapp.rs +++ b/src/netapp.rs @@ -28,9 +28,7 @@ pub(crate) struct Handler { pub(crate) local_handler: Box<dyn Fn(DynMsg) -> Pin<Box<dyn Future<Output = DynMsg> + Sync + Send>> + Sync + Send>, pub(crate) net_handler: Box< - dyn Fn(ed25519::PublicKey, Bytes) -> Pin<Box<dyn Future<Output = Vec<u8>> + Sync + Send>> - + Sync - + Send, + dyn Fn(NodeID, Bytes) -> Pin<Box<dyn Future<Output = Vec<u8>> + Sync + Send>> + Sync + Send, >, } @@ -47,30 +45,32 @@ pub(crate) struct Handler { /// It is generally not necessary to use NetApp stand-alone, as the provided full mesh /// and RPS peering strategies take care of the most common use cases. pub struct NetApp { - pub listen_addr: SocketAddr, - pub public_addr: Option<IpAddr>, + listen_params: ArcSwapOption<ListenParams>, + /// Network secret key pub netid: auth::Key, - pub pubkey: ed25519::PublicKey, + /// Our peer ID + pub id: NodeID, + /// Private key associated with our peer ID pub privkey: ed25519::SecretKey, - server_conns: RwLock<HashMap<ed25519::PublicKey, Arc<ServerConn>>>, - client_conns: RwLock<HashMap<ed25519::PublicKey, Arc<ClientConn>>>, + server_conns: RwLock<HashMap<NodeID, Arc<ServerConn>>>, + client_conns: RwLock<HashMap<NodeID, Arc<ClientConn>>>, pub(crate) msg_handlers: ArcSwap<HashMap<MessageKind, Arc<Handler>>>, - on_connected_handler: - ArcSwapOption<Box<dyn Fn(ed25519::PublicKey, SocketAddr, bool) + Send + Sync>>, - on_disconnected_handler: ArcSwapOption<Box<dyn Fn(ed25519::PublicKey, bool) + Send + Sync>>, + on_connected_handler: ArcSwapOption<Box<dyn Fn(NodeID, SocketAddr, bool) + Send + Sync>>, + on_disconnected_handler: ArcSwapOption<Box<dyn Fn(NodeID, bool) + Send + Sync>>, } -async fn net_handler_aux<M, F, R>( - handler: Arc<F>, - remote: ed25519::PublicKey, - bytes: Bytes, -) -> Vec<u8> +struct ListenParams { + listen_addr: SocketAddr, + public_addr: Option<IpAddr>, +} + +async fn net_handler_aux<M, F, R>(handler: Arc<F>, remote: NodeID, bytes: Bytes) -> Vec<u8> where M: Message + 'static, - F: Fn(ed25519::PublicKey, M) -> R + Send + Sync + 'static, + F: Fn(NodeID, M) -> R + Send + Sync + 'static, R: Future<Output = <M as Message>::Response> + Send + Sync, { debug!( @@ -93,14 +93,10 @@ where rmp_to_vec_all_named(&res).unwrap_or(vec![]) } -async fn local_handler_aux<M, F, R>( - handler: Arc<F>, - remote: ed25519::PublicKey, - msg: DynMsg, -) -> DynMsg +async fn local_handler_aux<M, F, R>(handler: Arc<F>, remote: NodeID, msg: DynMsg) -> DynMsg where M: Message + 'static, - F: Fn(ed25519::PublicKey, M) -> R + Send + Sync + 'static, + F: Fn(NodeID, M) -> R + Send + Sync + 'static, R: Future<Output = <M as Message>::Response> + Send + Sync, { debug!("Handling message of kind {:08x} from ourself", M::KIND); @@ -110,19 +106,17 @@ where } impl NetApp { - /// Creates a new instance of NetApp. No background process is - pub fn new( - listen_addr: SocketAddr, - public_addr: Option<IpAddr>, - netid: auth::Key, - privkey: ed25519::SecretKey, - ) -> Arc<Self> { - let pubkey = privkey.public_key(); + /// Creates a new instance of NetApp, which can serve either as a full p2p node, + /// or just as a passive client. To upgrade to a full p2p node, spawn a listener + /// using `.listen()` + /// + /// Our Peer ID is the public key associated to the secret key given here. + pub fn new(netid: auth::Key, privkey: ed25519::SecretKey) -> Arc<Self> { + let id = privkey.public_key(); let netapp = Arc::new(Self { - listen_addr, - public_addr, + listen_params: ArcSwapOption::new(None), netid, - pubkey, + id, privkey, server_conns: RwLock::new(HashMap::new()), client_conns: RwLock::new(HashMap::new()), @@ -132,12 +126,10 @@ impl NetApp { }); let netapp2 = netapp.clone(); - netapp.add_msg_handler::<HelloMessage, _, _>( - move |from: ed25519::PublicKey, msg: HelloMessage| { - netapp2.handle_hello_message(from, msg); - async { () } - }, - ); + netapp.add_msg_handler::<HelloMessage, _, _>(move |from: NodeID, msg: HelloMessage| { + netapp2.handle_hello_message(from, msg); + async { () } + }); netapp } @@ -147,7 +139,7 @@ impl NetApp { /// as the peering strategy will need to set this itself. pub fn on_connected<F>(&self, handler: F) where - F: Fn(ed25519::PublicKey, SocketAddr, bool) + Sized + Send + Sync + 'static, + F: Fn(NodeID, SocketAddr, bool) + Sized + Send + Sync + 'static, { self.on_connected_handler .store(Some(Arc::new(Box::new(handler)))); @@ -158,7 +150,7 @@ impl NetApp { /// as the peering strategy will need to set this itself. pub fn on_disconnected<F>(&self, handler: F) where - F: Fn(ed25519::PublicKey, bool) + Sized + Send + Sync + 'static, + F: Fn(NodeID, bool) + Sized + Send + Sync + 'static, { self.on_disconnected_handler .store(Some(Arc::new(Box::new(handler)))); @@ -169,19 +161,19 @@ impl NetApp { pub fn add_msg_handler<M, F, R>(&self, handler: F) where M: Message + 'static, - F: Fn(ed25519::PublicKey, M) -> R + Send + Sync + 'static, + F: Fn(NodeID, M) -> R + Send + Sync + 'static, R: Future<Output = <M as Message>::Response> + Send + Sync + 'static, { let handler = Arc::new(handler); let handler2 = handler.clone(); - let net_handler = Box::new(move |remote: ed25519::PublicKey, bytes: Bytes| { + let net_handler = Box::new(move |remote: NodeID, bytes: Bytes| { let fun: Pin<Box<dyn Future<Output = Vec<u8>> + Sync + Send>> = Box::pin(net_handler_aux(handler2.clone(), remote, bytes)); fun }); - let self_id = self.pubkey.clone(); + let self_id = self.id.clone(); let local_handler = Box::new(move |msg: DynMsg| { let fun: Pin<Box<dyn Future<Output = DynMsg> + Sync + Send>> = Box::pin(local_handler_aux(handler.clone(), self_id, msg)); @@ -200,9 +192,16 @@ impl NetApp { /// Main listening process for our app. This future runs during the whole /// run time of our application. - pub async fn listen(self: Arc<Self>) { - let mut listener = TcpListener::bind(self.listen_addr).await.unwrap(); - info!("Listening on {}", self.listen_addr); + /// If this is not called, the NetApp instance remains a passive client. + pub async fn listen(self: Arc<Self>, listen_addr: SocketAddr, public_addr: Option<IpAddr>) { + let listen_params = ListenParams { + listen_addr, + public_addr, + }; + self.listen_params.store(Some(Arc::new(listen_params))); + + let mut listener = TcpListener::bind(listen_addr).await.unwrap(); + info!("Listening on {}", listen_addr); loop { // The second item contains the IP and port of the new connection. @@ -225,43 +224,39 @@ impl NetApp { /// This function returns once the connection has been established and a /// successfull handshake was made. At this point we can send messages to /// the other node with `Netapp::request` - pub async fn try_connect( - self: Arc<Self>, - ip: SocketAddr, - pk: ed25519::PublicKey, - ) -> Result<(), Error> { + pub async fn try_connect(self: Arc<Self>, ip: SocketAddr, id: NodeID) -> Result<(), Error> { // Don't connect to ourself, we don't care // but pretend we did - if pk == self.pubkey { + if id == self.id { tokio::spawn(async move { if let Some(h) = self.on_connected_handler.load().as_ref() { - h(pk, ip, false); + h(id, ip, false); } }); return Ok(()); } // Don't connect if already connected - if self.client_conns.read().unwrap().contains_key(&pk) { + if self.client_conns.read().unwrap().contains_key(&id) { return Ok(()); } let socket = TcpStream::connect(ip).await?; info!("Connected to {}, negotiating handshake...", ip); - ClientConn::init(self, socket, pk.clone()).await?; + ClientConn::init(self, socket, id.clone()).await?; Ok(()) } /// Close the outgoing connection we have to a node specified by its public key, /// if such a connection is currently open. - pub fn disconnect(self: &Arc<Self>, pk: &ed25519::PublicKey) { - // If pk is ourself, we're not supposed to have a connection open - if *pk != self.pubkey { - let conn = self.client_conns.write().unwrap().remove(pk); + pub fn disconnect(self: &Arc<Self>, id: &NodeID) { + // If id is ourself, we're not supposed to have a connection open + if *id != self.id { + let conn = self.client_conns.write().unwrap().remove(id); if let Some(c) = conn { debug!( "Closing connection to {} ({})", - hex::encode(c.peer_pk), + hex::encode(c.peer_id), c.remote_addr ); c.close(); @@ -272,24 +267,24 @@ impl NetApp { // call on_disconnected_handler immediately, since the connection // was removed - // (if pk == self.pubkey, we pretend we disconnected) - let pk = *pk; + // (if id == self.id, we pretend we disconnected) + let id = *id; let self2 = self.clone(); tokio::spawn(async move { if let Some(h) = self2.on_disconnected_handler.load().as_ref() { - h(pk, false); + h(id, false); } }); } /// Close the incoming connection from a certain client to us, /// if such a connection is currently open. - pub fn server_disconnect(self: &Arc<Self>, pk: &ed25519::PublicKey) { - let conn = self.server_conns.read().unwrap().get(pk).cloned(); + pub fn server_disconnect(self: &Arc<Self>, id: &NodeID) { + let conn = self.server_conns.read().unwrap().get(id).cloned(); if let Some(c) = conn { debug!( "Closing incoming connection from {} ({})", - hex::encode(c.peer_pk), + hex::encode(c.peer_id), c.remote_addr ); c.close(); @@ -301,7 +296,7 @@ impl NetApp { // Do not yet call the on_connected handler, because we don't know if the remote // has an actual IP address and port we can call them back on. // We will know this when they send a Hello message, which is handled below. - pub(crate) fn connected_as_server(&self, id: ed25519::PublicKey, conn: Arc<ServerConn>) { + pub(crate) fn connected_as_server(&self, id: NodeID, conn: Arc<ServerConn>) { info!("Accepted connection from {}", hex::encode(id)); self.server_conns.write().unwrap().insert(id, conn); @@ -312,11 +307,10 @@ impl NetApp { // At this point we know they are a full network member, and not just a client, // and we call the on_connected handler so that the peering strategy knows // we have a new potential peer - fn handle_hello_message(&self, id: ed25519::PublicKey, msg: HelloMessage) { + fn handle_hello_message(&self, id: NodeID, msg: HelloMessage) { if let Some(h) = self.on_connected_handler.load().as_ref() { if let Some(c) = self.server_conns.read().unwrap().get(&id) { - let remote_ip = msg.server_addr - .unwrap_or(c.remote_addr.ip()); + let remote_ip = msg.server_addr.unwrap_or(c.remote_addr.ip()); let remote_addr = SocketAddr::new(remote_ip, msg.server_port); h(id, remote_addr, true); } @@ -326,7 +320,7 @@ impl NetApp { // Called from conn.rs when an incoming connection is closed. // We deregister the connection from server_conns and call the // handler registered by on_disconnected - pub(crate) fn disconnected_as_server(&self, id: &ed25519::PublicKey, conn: Arc<ServerConn>) { + pub(crate) fn disconnected_as_server(&self, id: &NodeID, conn: Arc<ServerConn>) { info!("Connection from {} closed", hex::encode(id)); let mut conn_list = self.server_conns.write().unwrap(); @@ -336,7 +330,7 @@ impl NetApp { drop(conn_list); if let Some(h) = self.on_disconnected_handler.load().as_ref() { - h(conn.peer_pk, true); + h(conn.peer_id, true); } } } @@ -349,7 +343,7 @@ impl NetApp { // Since we are ourself listening, we send them a Hello message so that // they know on which port to call us back. (TODO: don't do this if we are // just a simple client and not a full p2p node) - pub(crate) fn connected_as_client(&self, id: ed25519::PublicKey, conn: Arc<ClientConn>) { + pub(crate) fn connected_as_client(&self, id: NodeID, conn: Arc<ClientConn>) { info!("Connection established to {}", hex::encode(id)); { @@ -360,22 +354,30 @@ impl NetApp { } if let Some(h) = self.on_connected_handler.load().as_ref() { - h(conn.peer_pk, conn.remote_addr, false); + h(conn.peer_id, conn.remote_addr, false); } - let server_addr = self.public_addr; - let server_port = self.listen_addr.port(); - tokio::spawn(async move { - conn.request(HelloMessage { server_addr, server_port }, PRIO_NORMAL) + if let Some(lp) = self.listen_params.load_full() { + let server_addr = lp.public_addr; + let server_port = lp.listen_addr.port(); + tokio::spawn(async move { + conn.request( + HelloMessage { + server_addr, + server_port, + }, + PRIO_NORMAL, + ) .await .log_err("Sending hello message"); - }); + }); + } } // Called from conn.rs when an outgoinc connection is closed. // The connection is removed from conn_list, and the on_disconnected handler // is called. - pub(crate) fn disconnected_as_client(&self, id: &ed25519::PublicKey, conn: Arc<ClientConn>) { + pub(crate) fn disconnected_as_client(&self, id: &NodeID, conn: Arc<ClientConn>) { info!("Connection to {} closed", hex::encode(id)); let mut conn_list = self.client_conns.write().unwrap(); if let Some(c) = conn_list.get(id) { @@ -384,7 +386,7 @@ impl NetApp { drop(conn_list); if let Some(h) = self.on_disconnected_handler.load().as_ref() { - h(conn.peer_pk, false); + h(conn.peer_id, false); } } } @@ -398,14 +400,14 @@ impl NetApp { /// The priority is an `u8`, with lower numbers meaning highest priority. pub async fn request<T>( &self, - target: &ed25519::PublicKey, + target: &NodeID, rq: T, prio: RequestPriority, ) -> Result<<T as Message>::Response, Error> where T: Message + 'static, { - if *target == self.pubkey { + if *target == self.id { let handler = self.msg_handlers.load().get(&T::KIND).cloned(); match handler { None => Err(Error::Message(format!( |