aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-03-15 17:01:51 +0100
committerAlex Auvolat <alex@adnab.me>2022-03-15 17:03:41 +0100
commit22eaa0f404e433b3ef01052be48b8ff50142d85d (patch)
tree4d9cb5de1b76aa141b464f1147c25674795dca29
parentfa7cdf37476e11a77c7a07261bdf383a0bb07a7f (diff)
downloadnetapp-22eaa0f404e433b3ef01052be48b8ff50142d85d.tar.gz
netapp-22eaa0f404e433b3ef01052be48b8ff50142d85d.zip
Add logic to handle ping timeouts and other failures
-rw-r--r--src/peering/fullmesh.rs34
1 files changed, 32 insertions, 2 deletions
diff --git a/src/peering/fullmesh.rs b/src/peering/fullmesh.rs
index a5b2c3a..52d7fda 100644
--- a/src/peering/fullmesh.rs
+++ b/src/peering/fullmesh.rs
@@ -9,11 +9,13 @@ use async_trait::async_trait;
use log::{debug, info, trace, warn};
use serde::{Deserialize, Serialize};
+use tokio::select;
use tokio::sync::watch;
use sodiumoxide::crypto::hash;
use crate::endpoint::*;
+use crate::error::*;
use crate::netapp::*;
use crate::proto::*;
use crate::NodeID;
@@ -22,6 +24,8 @@ const CONN_RETRY_INTERVAL: Duration = Duration::from_secs(30);
const CONN_MAX_RETRIES: usize = 10;
const PING_INTERVAL: Duration = Duration::from_secs(10);
const LOOP_DELAY: Duration = Duration::from_secs(1);
+const PING_TIMEOUT: Duration = Duration::from_secs(5);
+const FAILED_PING_THRESHOLD: usize = 3;
// -- Protocol messages --
@@ -52,6 +56,7 @@ struct PeerInfoInternal {
state: PeerConnState,
last_seen: Option<Instant>,
ping: VecDeque<Duration>,
+ failed_pings: usize,
}
#[derive(Copy, Clone, Debug)]
@@ -178,6 +183,7 @@ impl FullMeshPeeringStrategy {
state: PeerConnState::Waiting(0, Instant::now()),
last_seen: None,
ping: VecDeque::new(),
+ failed_pings: 0,
},
);
}
@@ -191,6 +197,7 @@ impl FullMeshPeeringStrategy {
state: PeerConnState::Ourself,
last_seen: None,
ping: VecDeque::new(),
+ failed_pings: 0,
},
);
}
@@ -347,8 +354,28 @@ impl FullMeshPeeringStrategy {
hex::encode(&id[..8]),
ping_time
);
- match self.ping_endpoint.call(&id, &ping_msg, PRIO_HIGH).await {
- Err(e) => warn!("Error pinging {}: {}", hex::encode(&id[..8]), e),
+ let ping_response = select! {
+ r = self.ping_endpoint.call(&id, &ping_msg, PRIO_HIGH) => r,
+ _ = tokio::time::sleep(PING_TIMEOUT) => Err(Error::Message("Ping timeout".into())),
+ };
+
+ match ping_response {
+ Err(e) => {
+ warn!("Error pinging {}: {}", hex::encode(&id[..8]), e);
+ let mut known_hosts = self.known_hosts.write().unwrap();
+ if let Some(host) = known_hosts.list.get_mut(&id) {
+ host.failed_pings += 1;
+ if host.failed_pings > FAILED_PING_THRESHOLD {
+ warn!(
+ "Too many failed pings from {}, closing connection.",
+ hex::encode(&id[..8])
+ );
+ // this will later update info in known_hosts
+ // through the disconnection handler
+ self.netapp.disconnect(&id);
+ }
+ }
+ }
Ok(ping_resp) => {
let resp_time = Instant::now();
debug!(
@@ -359,6 +386,7 @@ impl FullMeshPeeringStrategy {
{
let mut known_hosts = self.known_hosts.write().unwrap();
if let Some(host) = known_hosts.list.get_mut(&id) {
+ host.failed_pings = 0;
host.last_seen = Some(resp_time);
host.ping.push_back(resp_time - ping_time);
while host.ping.len() > 10 {
@@ -455,6 +483,7 @@ impl FullMeshPeeringStrategy {
addr,
last_seen: None,
ping: VecDeque::new(),
+ failed_pings: 0,
},
);
}
@@ -486,6 +515,7 @@ impl FullMeshPeeringStrategy {
state,
last_seen: None,
ping: VecDeque::new(),
+ failed_pings: 0,
}
}
}