From 22eaa0f404e433b3ef01052be48b8ff50142d85d Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 15 Mar 2022 17:01:51 +0100 Subject: Add logic to handle ping timeouts and other failures --- src/peering/fullmesh.rs | 34 ++++++++++++++++++++++++++++++++-- 1 file changed, 32 insertions(+), 2 deletions(-) (limited to 'src') diff --git a/src/peering/fullmesh.rs b/src/peering/fullmesh.rs index a5b2c3a..52d7fda 100644 --- a/src/peering/fullmesh.rs +++ b/src/peering/fullmesh.rs @@ -9,11 +9,13 @@ use async_trait::async_trait; use log::{debug, info, trace, warn}; use serde::{Deserialize, Serialize}; +use tokio::select; use tokio::sync::watch; use sodiumoxide::crypto::hash; use crate::endpoint::*; +use crate::error::*; use crate::netapp::*; use crate::proto::*; use crate::NodeID; @@ -22,6 +24,8 @@ const CONN_RETRY_INTERVAL: Duration = Duration::from_secs(30); const CONN_MAX_RETRIES: usize = 10; const PING_INTERVAL: Duration = Duration::from_secs(10); const LOOP_DELAY: Duration = Duration::from_secs(1); +const PING_TIMEOUT: Duration = Duration::from_secs(5); +const FAILED_PING_THRESHOLD: usize = 3; // -- Protocol messages -- @@ -52,6 +56,7 @@ struct PeerInfoInternal { state: PeerConnState, last_seen: Option, ping: VecDeque, + failed_pings: usize, } #[derive(Copy, Clone, Debug)] @@ -178,6 +183,7 @@ impl FullMeshPeeringStrategy { state: PeerConnState::Waiting(0, Instant::now()), last_seen: None, ping: VecDeque::new(), + failed_pings: 0, }, ); } @@ -191,6 +197,7 @@ impl FullMeshPeeringStrategy { state: PeerConnState::Ourself, last_seen: None, ping: VecDeque::new(), + failed_pings: 0, }, ); } @@ -347,8 +354,28 @@ impl FullMeshPeeringStrategy { hex::encode(&id[..8]), ping_time ); - match self.ping_endpoint.call(&id, &ping_msg, PRIO_HIGH).await { - Err(e) => warn!("Error pinging {}: {}", hex::encode(&id[..8]), e), + let ping_response = select! { + r = self.ping_endpoint.call(&id, &ping_msg, PRIO_HIGH) => r, + _ = tokio::time::sleep(PING_TIMEOUT) => Err(Error::Message("Ping timeout".into())), + }; + + match ping_response { + Err(e) => { + warn!("Error pinging {}: {}", hex::encode(&id[..8]), e); + let mut known_hosts = self.known_hosts.write().unwrap(); + if let Some(host) = known_hosts.list.get_mut(&id) { + host.failed_pings += 1; + if host.failed_pings > FAILED_PING_THRESHOLD { + warn!( + "Too many failed pings from {}, closing connection.", + hex::encode(&id[..8]) + ); + // this will later update info in known_hosts + // through the disconnection handler + self.netapp.disconnect(&id); + } + } + } Ok(ping_resp) => { let resp_time = Instant::now(); debug!( @@ -359,6 +386,7 @@ impl FullMeshPeeringStrategy { { let mut known_hosts = self.known_hosts.write().unwrap(); if let Some(host) = known_hosts.list.get_mut(&id) { + host.failed_pings = 0; host.last_seen = Some(resp_time); host.ping.push_back(resp_time - ping_time); while host.ping.len() > 10 { @@ -455,6 +483,7 @@ impl FullMeshPeeringStrategy { addr, last_seen: None, ping: VecDeque::new(), + failed_pings: 0, }, ); } @@ -486,6 +515,7 @@ impl FullMeshPeeringStrategy { state, last_seen: None, ping: VecDeque::new(), + failed_pings: 0, } } } -- cgit v1.2.3