aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
author Alex Auvolat <alex@adnab.me> 2022-09-01 14:23:10 +0200
committer Alex Auvolat <alex@adnab.me> 2022-09-01 14:23:10 +0200
commit 22d96929d5416750e1f5889ee6cc16b382293104 (patch)
tree 7b9407132ffa02fa5c7e9f6040f90f0e071b4bf3 /src
parent 4a59b73d7bfd0f136f654e874afb5d2a9bf4df2e (diff)
parent d75146fb8157dd03c156e5f7ce4834fa1d72b581 (diff)
download netapp-22d96929d5416750e1f5889ee6cc16b382293104.tar.gz
netapp-22d96929d5416750e1f5889ee6cc16b382293104.zip
Merge branch 'fix-ping' into stream-body
Diffstat (limited to 'src')
-rw-r--r-- src/client.rs 7
-rw-r--r-- src/peering/fullmesh.rs 77
-rw-r--r-- src/recv.rs 16
-rw-r--r-- src/send.rs 11
-rw-r--r-- src/server.rs 7
5 files changed, 61 insertions, 57 deletions
diff --git a/src/client.rs b/src/client.rs
index df54810..9726125 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -100,13 +100,16 @@ impl ClientConn {
netapp.connected_as_client(peer_id, conn.clone());
+ let debug_name = format!("CLI {}", hex::encode(&peer_id[..8]));
+
tokio::spawn(async move {
- let send_future = tokio::spawn(conn.clone().send_loop(query_recv, write));
+ let debug_name_2 = debug_name.clone();
+ let send_future = tokio::spawn(conn.clone().send_loop(query_recv, write, debug_name_2));
let conn2 = conn.clone();
let recv_future = tokio::spawn(async move {
select! {
- r = conn2.recv_loop(read) => r,
+ r = conn2.recv_loop(read, debug_name) => r,
_ = await_exit(stop_recv_loop_recv) => Ok(())
}
});
diff --git a/src/peering/fullmesh.rs b/src/peering/fullmesh.rs
index ccbd0ba..7f1c065 100644
--- a/src/peering/fullmesh.rs
+++ b/src/peering/fullmesh.rs
@@ -23,10 +23,10 @@ use crate::NodeID;
const CONN_RETRY_INTERVAL: Duration = Duration::from_secs(30);
const CONN_MAX_RETRIES: usize = 10;
-const PING_INTERVAL: Duration = Duration::from_secs(10);
+const PING_INTERVAL: Duration = Duration::from_secs(15);
const LOOP_DELAY: Duration = Duration::from_secs(1);
-const PING_TIMEOUT: Duration = Duration::from_secs(5);
-const FAILED_PING_THRESHOLD: usize = 3;
+const PING_TIMEOUT: Duration = Duration::from_secs(10);
+const FAILED_PING_THRESHOLD: usize = 4;
// -- Protocol messages --
@@ -61,11 +61,26 @@ struct PeerInfoInternal {
all_addrs: Vec<SocketAddr>,
state: PeerConnState,
+ last_send_ping: Option<Instant>,
last_seen: Option<Instant>,
ping: VecDeque<Duration>,
failed_pings: usize,
}
+impl PeerInfoInternal {
+ fn new(addr: SocketAddr, state: PeerConnState) -> Self {
+ Self {
+ addr,
+ all_addrs: vec![addr],
+ state,
+ last_send_ping: None,
+ last_seen: None,
+ ping: VecDeque::new(),
+ failed_pings: 0,
+ }
+ }
+}
+
#[derive(Copy, Clone, Debug)]
pub struct PeerInfo {
/// The node's identifier (its public key)
@@ -185,14 +200,7 @@ impl FullMeshPeeringStrategy {
if id != netapp.id {
known_hosts.list.insert(
id,
- PeerInfoInternal {
- addr,
- all_addrs: vec![addr],
- state: PeerConnState::Waiting(0, Instant::now()),
- last_seen: None,
- ping: VecDeque::new(),
- failed_pings: 0,
- },
+ PeerInfoInternal::new(addr, PeerConnState::Waiting(0, Instant::now())),
);
}
}
@@ -200,14 +208,7 @@ impl FullMeshPeeringStrategy {
if let Some(addr) = our_addr {
known_hosts.list.insert(
netapp.id,
- PeerInfoInternal {
- addr,
- all_addrs: vec![addr],
- state: PeerConnState::Ourself,
- last_seen: None,
- ping: VecDeque::new(),
- failed_pings: 0,
- },
+ PeerInfoInternal::new(addr, PeerConnState::Ourself),
);
}
@@ -255,7 +256,7 @@ impl FullMeshPeeringStrategy {
trace!("{}, {:?}", hex::encode(&id[..8]), info);
match info.state {
PeerConnState::Connected => {
- let must_ping = match info.last_seen {
+ let must_ping = match info.last_send_ping {
None => true,
Some(t) => Instant::now() - t > PING_INTERVAL,
};
@@ -275,9 +276,16 @@ impl FullMeshPeeringStrategy {
};
// 2. Dispatch ping to hosts
- trace!("to_ping: {} peers", to_retry.len());
- for id in to_ping {
- tokio::spawn(self.clone().ping(id));
+ trace!("to_ping: {} peers", to_ping.len());
+ if !to_ping.is_empty() {
+ let mut known_hosts = self.known_hosts.write().unwrap();
+ for id in to_ping.iter() {
+ known_hosts.list.get_mut(id).unwrap().last_send_ping = Some(Instant::now());
+ }
+ drop(known_hosts);
+ for id in to_ping {
+ tokio::spawn(self.clone().ping(id));
+ }
}
// 3. Try reconnects
@@ -535,17 +543,9 @@ impl FullMeshPeeringStrategy {
host.all_addrs.push(addr);
}
} else {
- known_hosts.list.insert(
- id,
- PeerInfoInternal {
- state: PeerConnState::Connected,
- addr,
- all_addrs: vec![addr],
- last_seen: None,
- ping: VecDeque::new(),
- failed_pings: 0,
- },
- );
+ known_hosts
+ .list
+ .insert(id, PeerInfoInternal::new(addr, PeerConnState::Connected));
}
}
known_hosts.update_hash();
@@ -570,14 +570,7 @@ impl FullMeshPeeringStrategy {
} else {
PeerConnState::Waiting(0, Instant::now())
};
- PeerInfoInternal {
- addr,
- all_addrs: vec![addr],
- state,
- last_seen: None,
- ping: VecDeque::new(),
- failed_pings: 0,
- }
+ PeerInfoInternal::new(addr, state)
}
}
diff --git a/src/recv.rs b/src/recv.rs
index b5289fb..ac93c4b 100644
--- a/src/recv.rs
+++ b/src/recv.rs
@@ -54,14 +54,15 @@ impl Drop for Sender {
pub(crate) trait RecvLoop: Sync + 'static {
fn recv_handler(self: &Arc<Self>, id: RequestID, stream: ByteStream);
- async fn recv_loop<R>(self: Arc<Self>, mut read: R) -> Result<(), Error>
+ async fn recv_loop<R>(self: Arc<Self>, mut read: R, debug_name: String) -> Result<(), Error>
where
R: AsyncReadExt + Unpin + Send + Sync,
{
let mut streams: HashMap<RequestID, Sender> = HashMap::new();
loop {
- debug!(
- "Receiving: {:?}",
+ trace!(
+ "recv_loop({}): in_progress = {:?}",
+ debug_name,
streams.iter().map(|(id, _)| id).collect::<Vec<_>>()
);
@@ -87,11 +88,12 @@ pub(crate) trait RecvLoop: Sync + 'static {
let kind = u8_to_io_errorkind(next_slice[0]);
let msg =
std::str::from_utf8(&next_slice[1..]).unwrap_or("<invalid utf8 error message>");
- debug!("recv_loop: got id {}, error {:?}: {}", id, kind, msg);
+ debug!("recv_loop({}): got id {}, error {:?}: {}", debug_name, id, kind, msg);
Some(Err(std::io::Error::new(kind, msg.to_string())))
} else {
trace!(
- "recv_loop: got id {}, size {}, has_cont {}",
+ "recv_loop({}): got id {}, size {}, has_cont {}",
+ debug_name,
id,
size,
has_cont
@@ -107,7 +109,7 @@ pub(crate) trait RecvLoop: Sync + 'static {
send
} else {
let (send, recv) = mpsc::unbounded_channel();
- trace!("recv_loop: id {} is new channel", id);
+ trace!("recv_loop({}): id {} is new channel", debug_name, id);
self.recv_handler(
id,
Box::pin(tokio_stream::wrappers::UnboundedReceiverStream::new(recv)),
@@ -126,7 +128,7 @@ pub(crate) trait RecvLoop: Sync + 'static {
assert!(!is_error);
streams.insert(id, sender);
} else {
- trace!("recv_loop: close channel id {}", id);
+ trace!("recv_loop({}): close channel id {}", debug_name, id);
sender.end();
}
}
diff --git a/src/send.rs b/src/send.rs
index ea6cf9f..3b01cb5 100644
--- a/src/send.rs
+++ b/src/send.rs
@@ -231,6 +231,7 @@ pub(crate) trait SendLoop: Sync {
self: Arc<Self>,
msg_recv: mpsc::UnboundedReceiver<SendStream>,
mut write: BoxStreamWrite<W>,
+ debug_name: String,
) -> Result<(), Error>
where
W: AsyncWriteExt + Unpin + Send + Sync,
@@ -238,8 +239,9 @@ pub(crate) trait SendLoop: Sync {
let mut sending = SendQueue::new();
let mut msg_recv = Some(msg_recv);
while msg_recv.is_some() || !sending.is_empty() {
- debug!(
- "Sending: {:?}",
+ trace!(
+ "send_loop({}): queue = {:?}",
+ debug_name,
sending
.items
.iter()
@@ -262,7 +264,7 @@ pub(crate) trait SendLoop: Sync {
biased; // always read incomming channel first if it has data
sth = recv_fut => {
if let Some((id, prio, order_tag, data)) = sth {
- trace!("send_loop: add stream {} to send", id);
+ trace!("send_loop({}): add stream {} to send", debug_name, id);
sending.push(SendQueueItem {
id,
prio,
@@ -275,7 +277,8 @@ pub(crate) trait SendLoop: Sync {
}
(id, data) = send_fut => {
trace!(
- "send_loop: id {}, send {} bytes, header_size {}",
+ "send_loop({}): id {}, send {} bytes, header_size {}",
+ debug_name,
id,
data.data().len(),
hex::encode(data.header())
diff --git a/src/server.rs b/src/server.rs
index f8c3f98..2c12d9d 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -103,14 +103,17 @@ impl ServerConn {
netapp.connected_as_server(peer_id, conn.clone());
+ let debug_name = format!("SRV {}", hex::encode(&peer_id[..8]));
+ let debug_name_2 = debug_name.clone();
+
let conn2 = conn.clone();
let recv_future = tokio::spawn(async move {
select! {
- r = conn2.recv_loop(read) => r,
+ r = conn2.recv_loop(read, debug_name_2) => r,
_ = await_exit(must_exit) => Ok(())
}
});
- let send_future = tokio::spawn(conn.clone().send_loop(resp_recv, write));
+ let send_future = tokio::spawn(conn.clone().send_loop(resp_recv, write, debug_name));
recv_future.await.log_err("ServerConn recv_loop");
conn.resp_send.store(None);