aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex <alex@adnab.me>2022-09-02 14:22:57 +0200
committerAlex <alex@adnab.me>2022-09-02 14:22:57 +0200
commita82700c5a27612002e6ee029ae77915b8114182f (patch)
tree7c79d9d17e1506f1fcddc6163fb27b2570f71113
parent8c73b276557405e5d7eac8139b6f8a79d0379200 (diff)
parentca25331d7320dddbe8f76c2a74bf406cd0622c2e (diff)
downloadnetapp-a82700c5a27612002e6ee029ae77915b8114182f.tar.gz
netapp-a82700c5a27612002e6ee029ae77915b8114182f.zip
Merge pull request 'Fix ping timeout and interval' (#4) from fix-ping into main
Reviewed-on: https://git.deuxfleurs.fr/lx/netapp/pulls/4
-rw-r--r--Cargo.lock2
-rw-r--r--Cargo.toml2
-rw-r--r--src/client.rs7
-rw-r--r--src/peering/fullmesh.rs77
-rw-r--r--src/proto.rs52
-rw-r--r--src/server.rs7
6 files changed, 91 insertions, 56 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 9a55766..aa6b4a4 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -428,7 +428,7 @@ dependencies = [
[[package]]
name = "netapp"
-version = "0.4.4"
+version = "0.4.5"
dependencies = [
"arc-swap",
"async-trait",
diff --git a/Cargo.toml b/Cargo.toml
index af3f1ab..9a69d66 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "netapp"
-version = "0.4.4"
+version = "0.4.5"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license-file = "LICENSE"
diff --git a/src/client.rs b/src/client.rs
index 8227e8f..5c5a05b 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -102,13 +102,16 @@ impl ClientConn {
netapp.connected_as_client(peer_id, conn.clone());
+ let debug_name = format!("CLI {}", hex::encode(&peer_id[..8]));
+
tokio::spawn(async move {
- let send_future = tokio::spawn(conn.clone().send_loop(query_recv, write));
+ let debug_name_2 = debug_name.clone();
+ let send_future = tokio::spawn(conn.clone().send_loop(query_recv, write, debug_name_2));
let conn2 = conn.clone();
let recv_future = tokio::spawn(async move {
select! {
- r = conn2.recv_loop(read) => r,
+ r = conn2.recv_loop(read, debug_name) => r,
_ = await_exit(stop_recv_loop_recv) => Ok(())
}
});
diff --git a/src/peering/fullmesh.rs b/src/peering/fullmesh.rs
index 012c5a0..208cfe4 100644
--- a/src/peering/fullmesh.rs
+++ b/src/peering/fullmesh.rs
@@ -22,10 +22,10 @@ use crate::NodeID;
const CONN_RETRY_INTERVAL: Duration = Duration::from_secs(30);
const CONN_MAX_RETRIES: usize = 10;
-const PING_INTERVAL: Duration = Duration::from_secs(10);
+const PING_INTERVAL: Duration = Duration::from_secs(15);
const LOOP_DELAY: Duration = Duration::from_secs(1);
-const PING_TIMEOUT: Duration = Duration::from_secs(5);
-const FAILED_PING_THRESHOLD: usize = 3;
+const PING_TIMEOUT: Duration = Duration::from_secs(10);
+const FAILED_PING_THRESHOLD: usize = 4;
// -- Protocol messages --
@@ -60,11 +60,26 @@ struct PeerInfoInternal {
all_addrs: Vec<SocketAddr>,
state: PeerConnState,
+ last_send_ping: Option<Instant>,
last_seen: Option<Instant>,
ping: VecDeque<Duration>,
failed_pings: usize,
}
+impl PeerInfoInternal {
+ fn new(addr: SocketAddr, state: PeerConnState) -> Self {
+ Self {
+ addr,
+ all_addrs: vec![addr],
+ state,
+ last_send_ping: None,
+ last_seen: None,
+ ping: VecDeque::new(),
+ failed_pings: 0,
+ }
+ }
+}
+
#[derive(Copy, Clone, Debug)]
pub struct PeerInfo {
/// The node's identifier (its public key)
@@ -184,14 +199,7 @@ impl FullMeshPeeringStrategy {
if id != netapp.id {
known_hosts.list.insert(
id,
- PeerInfoInternal {
- addr,
- all_addrs: vec![addr],
- state: PeerConnState::Waiting(0, Instant::now()),
- last_seen: None,
- ping: VecDeque::new(),
- failed_pings: 0,
- },
+ PeerInfoInternal::new(addr, PeerConnState::Waiting(0, Instant::now())),
);
}
}
@@ -199,14 +207,7 @@ impl FullMeshPeeringStrategy {
if let Some(addr) = our_addr {
known_hosts.list.insert(
netapp.id,
- PeerInfoInternal {
- addr,
- all_addrs: vec![addr],
- state: PeerConnState::Ourself,
- last_seen: None,
- ping: VecDeque::new(),
- failed_pings: 0,
- },
+ PeerInfoInternal::new(addr, PeerConnState::Ourself),
);
}
@@ -254,7 +255,7 @@ impl FullMeshPeeringStrategy {
trace!("{}, {:?}", hex::encode(&id[..8]), info);
match info.state {
PeerConnState::Connected => {
- let must_ping = match info.last_seen {
+ let must_ping = match info.last_send_ping {
None => true,
Some(t) => Instant::now() - t > PING_INTERVAL,
};
@@ -274,9 +275,16 @@ impl FullMeshPeeringStrategy {
};
// 2. Dispatch ping to hosts
- trace!("to_ping: {} peers", to_retry.len());
- for id in to_ping {
- tokio::spawn(self.clone().ping(id));
+ trace!("to_ping: {} peers", to_ping.len());
+ if !to_ping.is_empty() {
+ let mut known_hosts = self.known_hosts.write().unwrap();
+ for id in to_ping.iter() {
+ known_hosts.list.get_mut(id).unwrap().last_send_ping = Some(Instant::now());
+ }
+ drop(known_hosts);
+ for id in to_ping {
+ tokio::spawn(self.clone().ping(id));
+ }
}
// 3. Try reconnects
@@ -534,17 +542,9 @@ impl FullMeshPeeringStrategy {
host.all_addrs.push(addr);
}
} else {
- known_hosts.list.insert(
- id,
- PeerInfoInternal {
- state: PeerConnState::Connected,
- addr,
- all_addrs: vec![addr],
- last_seen: None,
- ping: VecDeque::new(),
- failed_pings: 0,
- },
- );
+ known_hosts
+ .list
+ .insert(id, PeerInfoInternal::new(addr, PeerConnState::Connected));
}
}
known_hosts.update_hash();
@@ -569,14 +569,7 @@ impl FullMeshPeeringStrategy {
} else {
PeerConnState::Waiting(0, Instant::now())
};
- PeerInfoInternal {
- addr,
- all_addrs: vec![addr],
- state,
- last_seen: None,
- ping: VecDeque::new(),
- failed_pings: 0,
- }
+ PeerInfoInternal::new(addr, state)
}
}
diff --git a/src/proto.rs b/src/proto.rs
index e843bff..8f7e70f 100644
--- a/src/proto.rs
+++ b/src/proto.rs
@@ -1,4 +1,5 @@
use std::collections::{HashMap, VecDeque};
+use std::fmt::Write;
use std::sync::Arc;
use log::trace;
@@ -94,6 +95,22 @@ impl SendQueue {
fn is_empty(&self) -> bool {
self.items.iter().all(|(_k, v)| v.is_empty())
}
+ fn dump(&self) -> String {
+ let mut ret = String::new();
+ for (prio, q) in self.items.iter() {
+ for item in q.iter() {
+ write!(
+ &mut ret,
+ " [{} {} ({})]",
+ prio,
+ item.data.len() - item.cursor,
+ item.id
+ )
+ .unwrap();
+ }
+ }
+ ret
+ }
}
/// The SendLoop trait, which is implemented both by the client and the server
@@ -110,6 +127,7 @@ pub(crate) trait SendLoop: Sync {
self: Arc<Self>,
mut msg_recv: mpsc::UnboundedReceiver<(RequestID, RequestPriority, Vec<u8>)>,
mut write: BoxStreamWrite<W>,
+ debug_name: String,
) -> Result<(), Error>
where
W: AsyncWriteExt + Unpin + Send + Sync,
@@ -117,8 +135,15 @@ pub(crate) trait SendLoop: Sync {
let mut sending = SendQueue::new();
let mut should_exit = false;
while !should_exit || !sending.is_empty() {
+ trace!("send_loop({}): queue = {}", debug_name, sending.dump());
if let Ok((id, prio, data)) = msg_recv.try_recv() {
- trace!("send_loop: got {}, {} bytes", id, data.len());
+ trace!(
+ "send_loop({}): new message to send, id = {}, prio = {}, {} bytes",
+ debug_name,
+ id,
+ prio,
+ data.len()
+ );
sending.push(SendQueueItem {
id,
prio,
@@ -127,7 +152,8 @@ pub(crate) trait SendLoop: Sync {
});
} else if let Some(mut item) = sending.pop() {
trace!(
- "send_loop: sending bytes for {} ({} bytes, {} already sent)",
+ "send_loop({}): sending bytes for {} ({} bytes, {} already sent)",
+ debug_name,
item.id,
item.data.len(),
item.cursor
@@ -157,7 +183,13 @@ pub(crate) trait SendLoop: Sync {
} else {
let sth = msg_recv.recv().await;
if let Some((id, prio, data)) = sth {
- trace!("send_loop: got {}, {} bytes", id, data.len());
+ trace!(
+ "send_loop({}): new message to send, id = {}, prio = {}, {} bytes",
+ debug_name,
+ id,
+ prio,
+ data.len()
+ );
sending.push(SendQueueItem {
id,
prio,
@@ -186,13 +218,12 @@ pub(crate) trait SendLoop: Sync {
pub(crate) trait RecvLoop: Sync + 'static {
fn recv_handler(self: &Arc<Self>, id: RequestID, msg: Vec<u8>);
- async fn recv_loop<R>(self: Arc<Self>, mut read: R) -> Result<(), Error>
+ async fn recv_loop<R>(self: Arc<Self>, mut read: R, debug_name: String) -> Result<(), Error>
where
R: AsyncReadExt + Unpin + Send + Sync,
{
let mut receiving = HashMap::new();
loop {
- trace!("recv_loop: reading packet");
let mut header_id = [0u8; RequestID::BITS as usize / 8];
match read.read_exact(&mut header_id[..]).await {
Ok(_) => (),
@@ -200,19 +231,24 @@ pub(crate) trait RecvLoop: Sync + 'static {
Err(e) => return Err(e.into()),
};
let id = RequestID::from_be_bytes(header_id);
- trace!("recv_loop: got header id: {:04x}", id);
let mut header_size = [0u8; ChunkLength::BITS as usize / 8];
read.read_exact(&mut header_size[..]).await?;
let size = ChunkLength::from_be_bytes(header_size);
- trace!("recv_loop: got header size: {:04x}", size);
+ trace!(
+ "recv_loop({}): got header id = {}, size = 0x{:04x} ({} bytes)",
+ debug_name,
+ id,
+ size,
+ size & !CHUNK_HAS_CONTINUATION
+ );
let has_cont = (size & CHUNK_HAS_CONTINUATION) != 0;
let size = size & !CHUNK_HAS_CONTINUATION;
let mut next_slice = vec![0; size as usize];
read.read_exact(&mut next_slice[..]).await?;
- trace!("recv_loop: read {} bytes", next_slice.len());
+ trace!("recv_loop({}): read {} bytes", debug_name, next_slice.len());
let mut msg_bytes: Vec<_> = receiving.remove(&id).unwrap_or_default();
msg_bytes.extend_from_slice(&next_slice[..]);
diff --git a/src/server.rs b/src/server.rs
index 5465307..a835959 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -105,14 +105,17 @@ impl ServerConn {
netapp.connected_as_server(peer_id, conn.clone());
+ let debug_name = format!("SRV {}", hex::encode(&peer_id[..8]));
+ let debug_name_2 = debug_name.clone();
+
let conn2 = conn.clone();
let recv_future = tokio::spawn(async move {
select! {
- r = conn2.recv_loop(read) => r,
+ r = conn2.recv_loop(read, debug_name_2) => r,
_ = await_exit(must_exit) => Ok(())
}
});
- let send_future = tokio::spawn(conn.clone().send_loop(resp_recv, write));
+ let send_future = tokio::spawn(conn.clone().send_loop(resp_recv, write, debug_name));
recv_future.await.log_err("ServerConn recv_loop");
conn.resp_send.store(None);