aboutsummaryrefslogtreecommitdiff
path: root/src/netapp.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2021-10-13 17:12:13 +0200
committerAlex Auvolat <alex@adnab.me>2021-10-13 17:12:13 +0200
commit70839d70d86354232f168e63ce4062219acb85c7 (patch)
tree9c956af0339aa048f487c3a4e54c320be8d13647 /src/netapp.rs
parent8dede69dee20b812ad1dcab5b374c60232409f4f (diff)
downloadnetapp-70839d70d86354232f168e63ce4062219acb85c7.tar.gz
netapp-70839d70d86354232f168e63ce4062219acb85c7.zip
Try to handle termination and closing of stuff properly
Diffstat (limited to 'src/netapp.rs')
-rw-r--r--src/netapp.rs107
1 files changed, 77 insertions, 30 deletions
diff --git a/src/netapp.rs b/src/netapp.rs
index b6994ea..bffa0e1 100644
--- a/src/netapp.rs
+++ b/src/netapp.rs
@@ -2,7 +2,7 @@ use std::collections::HashMap;
use std::net::{IpAddr, SocketAddr};
use std::sync::{Arc, RwLock};
-use log::{debug, info, error};
+use log::{debug, error, info, trace, warn};
use arc_swap::ArcSwapOption;
use async_trait::async_trait;
@@ -10,13 +10,18 @@ use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use sodiumoxide::crypto::auth;
use sodiumoxide::crypto::sign::ed25519;
+
+use futures::stream::futures_unordered::FuturesUnordered;
+use futures::stream::StreamExt;
use tokio::net::{TcpListener, TcpStream};
+use tokio::select;
+use tokio::sync::{mpsc, watch};
use crate::client::*;
-use crate::server::*;
use crate::endpoint::*;
use crate::error::*;
use crate::proto::*;
+use crate::server::*;
use crate::util::*;
#[derive(Serialize, Deserialize)]
@@ -142,35 +147,91 @@ impl NetApp {
/// Main listening process for our app. This future runs during the whole
/// run time of our application.
/// If this is not called, the NetApp instance remains a passive client.
- pub async fn listen(self: Arc<Self>, listen_addr: SocketAddr, public_addr: Option<IpAddr>) {
+ pub async fn listen(
+ self: Arc<Self>,
+ listen_addr: SocketAddr,
+ public_addr: Option<IpAddr>,
+ mut must_exit: watch::Receiver<bool>,
+ ) {
let listen_params = ListenParams {
listen_addr,
public_addr,
};
- if self.listen_params.swap(Some(Arc::new(listen_params))).is_some() {
+ if self
+ .listen_params
+ .swap(Some(Arc::new(listen_params)))
+ .is_some()
+ {
error!("Trying to listen on NetApp but we're already listening!");
}
let listener = TcpListener::bind(listen_addr).await.unwrap();
info!("Listening on {}", listen_addr);
- loop {
- // The second item contains the IP and port of the new connection.
- let (socket, _) = listener.accept().await.unwrap();
+ let (conn_in, mut conn_out) = mpsc::unbounded_channel();
+ let connection_collector = tokio::spawn(async move {
+ let mut collection = FuturesUnordered::new();
+ loop {
+ if collection.is_empty() {
+ match conn_out.recv().await {
+ Some(f) => collection.push(f),
+ None => break,
+ }
+ } else {
+ select! {
+ new_fut = conn_out.recv() => {
+ match new_fut {
+ Some(f) => collection.push(f),
+ None => break,
+ }
+ }
+ result = collection.next() => {
+ trace!("Collected connection: {:?}", result);
+ }
+ }
+ }
+ }
+ debug!("Collecting last open server connections.");
+ while let Some(conn_res) = collection.next().await {
+ trace!("Collected connection: {:?}", conn_res);
+ }
+ debug!("No more server connections to collect");
+ });
+
+ while !*must_exit.borrow_and_update() {
+ let (socket, peer_addr) = select! {
+ sockres = listener.accept() => {
+ match sockres {
+ Ok(x) => x,
+ Err(e) => {
+ warn!("Error in listener.accept: {}", e);
+ continue;
+ }
+ }
+ },
+ _ = must_exit.changed() => continue,
+ };
+
info!(
"Incoming connection from {}, negotiating handshake...",
- match socket.peer_addr() {
- Ok(x) => format!("{}", x),
- Err(e) => format!("<invalid addr: {}>", e),
- }
+ peer_addr
);
let self2 = self.clone();
- tokio::spawn(async move {
- ServerConn::run(self2, socket)
- .await
- .log_err("ServerConn::run");
- });
+ let must_exit2 = must_exit.clone();
+ conn_in
+ .send(tokio::spawn(async move {
+ ServerConn::run(self2, socket, must_exit2)
+ .await
+ .log_err("ServerConn::run");
+ }))
+ .log_err("Failed to send connection to connection collector");
}
+
+ drop(conn_in);
+
+ connection_collector
+ .await
+ .log_err("Failed to await for connection collector");
}
/// Attempt to connect to a peer, given by its ip:port and its public key.
@@ -231,20 +292,6 @@ impl NetApp {
});
}
- /// Close the incoming connection from a certain client to us,
- /// if such a connection is currently open.
- pub fn server_disconnect(self: &Arc<Self>, id: &NodeID) {
- let conn = self.server_conns.read().unwrap().get(id).cloned();
- if let Some(c) = conn {
- debug!(
- "Closing incoming connection from {} ({})",
- hex::encode(c.peer_id),
- c.remote_addr
- );
- c.close();
- }
- }
-
// Called from conn.rs when an incoming connection is successfully established
// Registers the connection in our list of connections
// Do not yet call the on_connected handler, because we don't know if the remote