diff options
author | Alex Auvolat <alex@adnab.me> | 2021-10-13 17:12:13 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2021-10-13 17:12:13 +0200 |
commit | 70839d70d86354232f168e63ce4062219acb85c7 (patch) | |
tree | 9c956af0339aa048f487c3a4e54c320be8d13647 /src/netapp.rs | |
parent | 8dede69dee20b812ad1dcab5b374c60232409f4f (diff) | |
download | netapp-70839d70d86354232f168e63ce4062219acb85c7.tar.gz netapp-70839d70d86354232f168e63ce4062219acb85c7.zip |
Try to handle termination and closing of stuff properly
Diffstat (limited to 'src/netapp.rs')
-rw-r--r-- | src/netapp.rs | 107 |
1 files changed, 77 insertions, 30 deletions
diff --git a/src/netapp.rs b/src/netapp.rs index b6994ea..bffa0e1 100644 --- a/src/netapp.rs +++ b/src/netapp.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use std::net::{IpAddr, SocketAddr}; use std::sync::{Arc, RwLock}; -use log::{debug, info, error}; +use log::{debug, error, info, trace, warn}; use arc_swap::ArcSwapOption; use async_trait::async_trait; @@ -10,13 +10,18 @@ use async_trait::async_trait; use serde::{Deserialize, Serialize}; use sodiumoxide::crypto::auth; use sodiumoxide::crypto::sign::ed25519; + +use futures::stream::futures_unordered::FuturesUnordered; +use futures::stream::StreamExt; use tokio::net::{TcpListener, TcpStream}; +use tokio::select; +use tokio::sync::{mpsc, watch}; use crate::client::*; -use crate::server::*; use crate::endpoint::*; use crate::error::*; use crate::proto::*; +use crate::server::*; use crate::util::*; #[derive(Serialize, Deserialize)] @@ -142,35 +147,91 @@ impl NetApp { /// Main listening process for our app. This future runs during the whole /// run time of our application. /// If this is not called, the NetApp instance remains a passive client. - pub async fn listen(self: Arc<Self>, listen_addr: SocketAddr, public_addr: Option<IpAddr>) { + pub async fn listen( + self: Arc<Self>, + listen_addr: SocketAddr, + public_addr: Option<IpAddr>, + mut must_exit: watch::Receiver<bool>, + ) { let listen_params = ListenParams { listen_addr, public_addr, }; - if self.listen_params.swap(Some(Arc::new(listen_params))).is_some() { + if self + .listen_params + .swap(Some(Arc::new(listen_params))) + .is_some() + { error!("Trying to listen on NetApp but we're already listening!"); } let listener = TcpListener::bind(listen_addr).await.unwrap(); info!("Listening on {}", listen_addr); - loop { - // The second item contains the IP and port of the new connection. - let (socket, _) = listener.accept().await.unwrap(); + let (conn_in, mut conn_out) = mpsc::unbounded_channel(); + let connection_collector = tokio::spawn(async move { + let mut collection = FuturesUnordered::new(); + loop { + if collection.is_empty() { + match conn_out.recv().await { + Some(f) => collection.push(f), + None => break, + } + } else { + select! { + new_fut = conn_out.recv() => { + match new_fut { + Some(f) => collection.push(f), + None => break, + } + } + result = collection.next() => { + trace!("Collected connection: {:?}", result); + } + } + } + } + debug!("Collecting last open server connections."); + while let Some(conn_res) = collection.next().await { + trace!("Collected connection: {:?}", conn_res); + } + debug!("No more server connections to collect"); + }); + + while !*must_exit.borrow_and_update() { + let (socket, peer_addr) = select! { + sockres = listener.accept() => { + match sockres { + Ok(x) => x, + Err(e) => { + warn!("Error in listener.accept: {}", e); + continue; + } + } + }, + _ = must_exit.changed() => continue, + }; + info!( "Incoming connection from {}, negotiating handshake...", - match socket.peer_addr() { - Ok(x) => format!("{}", x), - Err(e) => format!("<invalid addr: {}>", e), - } + peer_addr ); let self2 = self.clone(); - tokio::spawn(async move { - ServerConn::run(self2, socket) - .await - .log_err("ServerConn::run"); - }); + let must_exit2 = must_exit.clone(); + conn_in + .send(tokio::spawn(async move { + ServerConn::run(self2, socket, must_exit2) + .await + .log_err("ServerConn::run"); + })) + .log_err("Failed to send connection to connection collector"); } + + drop(conn_in); + + connection_collector + .await + .log_err("Failed to await for connection collector"); } /// Attempt to connect to a peer, given by its ip:port and its public key. @@ -231,20 +292,6 @@ impl NetApp { }); } - /// Close the incoming connection from a certain client to us, - /// if such a connection is currently open. - pub fn server_disconnect(self: &Arc<Self>, id: &NodeID) { - let conn = self.server_conns.read().unwrap().get(id).cloned(); - if let Some(c) = conn { - debug!( - "Closing incoming connection from {} ({})", - hex::encode(c.peer_id), - c.remote_addr - ); - c.close(); - } - } - // Called from conn.rs when an incoming connection is successfully established // Registers the connection in our list of connections // Do not yet call the on_connected handler, because we don't know if the remote |