From 81b2ff3a4e853ece8a5f2537007f386a9c7f971d Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 31 Aug 2022 15:06:21 +0200 Subject: Ping less frequently --- src/peering/fullmesh.rs | 75 ++++++++++++++++++++++++------------------------- 1 file changed, 36 insertions(+), 39 deletions(-) (limited to 'src') diff --git a/src/peering/fullmesh.rs b/src/peering/fullmesh.rs index 012c5a0..b1c94af 100644 --- a/src/peering/fullmesh.rs +++ b/src/peering/fullmesh.rs @@ -60,11 +60,26 @@ struct PeerInfoInternal { all_addrs: Vec, state: PeerConnState, + last_send_ping: Option, last_seen: Option, ping: VecDeque, failed_pings: usize, } +impl PeerInfoInternal { + fn new(addr: SocketAddr, state: PeerConnState) -> Self { + Self { + addr, + all_addrs: vec![addr], + state, + last_send_ping: None, + last_seen: None, + ping: VecDeque::new(), + failed_pings: 0, + } + } +} + #[derive(Copy, Clone, Debug)] pub struct PeerInfo { /// The node's identifier (its public key) @@ -184,14 +199,7 @@ impl FullMeshPeeringStrategy { if id != netapp.id { known_hosts.list.insert( id, - PeerInfoInternal { - addr, - all_addrs: vec![addr], - state: PeerConnState::Waiting(0, Instant::now()), - last_seen: None, - ping: VecDeque::new(), - failed_pings: 0, - }, + PeerInfoInternal::new(addr, PeerConnState::Waiting(0, Instant::now())), ); } } @@ -199,14 +207,7 @@ impl FullMeshPeeringStrategy { if let Some(addr) = our_addr { known_hosts.list.insert( netapp.id, - PeerInfoInternal { - addr, - all_addrs: vec![addr], - state: PeerConnState::Ourself, - last_seen: None, - ping: VecDeque::new(), - failed_pings: 0, - }, + PeerInfoInternal::new(addr, PeerConnState::Ourself), ); } @@ -258,7 +259,11 @@ impl FullMeshPeeringStrategy { None => true, Some(t) => Instant::now() - t > PING_INTERVAL, }; - if must_ping { + let pinged_recently = match info.last_send_ping { + None => false, + Some(t) => Instant::now() - t < PING_TIMEOUT, + }; + if must_ping && !pinged_recently { to_ping.push(*id); } } @@ -274,9 +279,16 @@ impl FullMeshPeeringStrategy { }; // 2. Dispatch ping to hosts - trace!("to_ping: {} peers", to_retry.len()); - for id in to_ping { - tokio::spawn(self.clone().ping(id)); + trace!("to_ping: {} peers", to_ping.len()); + if !to_ping.is_empty() { + let mut known_hosts = self.known_hosts.write().unwrap(); + for id in to_ping.iter() { + known_hosts.list.get_mut(id).unwrap().last_send_ping = Some(Instant::now()); + } + drop(known_hosts); + for id in to_ping { + tokio::spawn(self.clone().ping(id)); + } } // 3. Try reconnects @@ -534,17 +546,9 @@ impl FullMeshPeeringStrategy { host.all_addrs.push(addr); } } else { - known_hosts.list.insert( - id, - PeerInfoInternal { - state: PeerConnState::Connected, - addr, - all_addrs: vec![addr], - last_seen: None, - ping: VecDeque::new(), - failed_pings: 0, - }, - ); + known_hosts + .list + .insert(id, PeerInfoInternal::new(addr, PeerConnState::Connected)); } } known_hosts.update_hash(); @@ -569,14 +573,7 @@ impl FullMeshPeeringStrategy { } else { PeerConnState::Waiting(0, Instant::now()) }; - PeerInfoInternal { - addr, - all_addrs: vec![addr], - state, - last_seen: None, - ping: VecDeque::new(), - failed_pings: 0, - } + PeerInfoInternal::new(addr, state) } } -- cgit v1.2.3 From 700f783956697ef9d5aff4d904167f50367409e9 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 31 Aug 2022 15:08:51 +0200 Subject: Add dump of sending queue --- src/proto.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) (limited to 'src') diff --git a/src/proto.rs b/src/proto.rs index e843bff..56afede 100644 --- a/src/proto.rs +++ b/src/proto.rs @@ -1,5 +1,6 @@ use std::collections::{HashMap, VecDeque}; use std::sync::Arc; +use std::fmt::Write; use log::trace; @@ -94,6 +95,15 @@ impl SendQueue { fn is_empty(&self) -> bool { self.items.iter().all(|(_k, v)| v.is_empty()) } + fn dump(&self) -> String { + let mut ret = String::new(); + for (prio, q) in self.items.iter() { + for item in q.iter() { + write!(&mut ret, " [{} {} ({})]", prio, item.data.len() - item.cursor, item.id).unwrap(); + } + } + ret + } } /// The SendLoop trait, which is implemented both by the client and the server @@ -117,6 +127,7 @@ pub(crate) trait SendLoop: Sync { let mut sending = SendQueue::new(); let mut should_exit = false; while !should_exit || !sending.is_empty() { + trace!("send_loop: queue = {}", sending.dump()); if let Ok((id, prio, data)) = msg_recv.try_recv() { trace!("send_loop: got {}, {} bytes", id, data.len()); sending.push(SendQueueItem { -- cgit v1.2.3 From 01db3c43193d9a24ef4cd1a1b1993f4d44a9aa9b Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 31 Aug 2022 15:58:05 +0200 Subject: add debug_name in proto to differenciate messages --- src/client.rs | 7 +++++-- src/proto.rs | 24 +++++++++++++++--------- src/server.rs | 7 +++++-- 3 files changed, 25 insertions(+), 13 deletions(-) (limited to 'src') diff --git a/src/client.rs b/src/client.rs index 8227e8f..5c5a05b 100644 --- a/src/client.rs +++ b/src/client.rs @@ -102,13 +102,16 @@ impl ClientConn { netapp.connected_as_client(peer_id, conn.clone()); + let debug_name = format!("CLI {}", hex::encode(&peer_id[..8])); + tokio::spawn(async move { - let send_future = tokio::spawn(conn.clone().send_loop(query_recv, write)); + let debug_name_2 = debug_name.clone(); + let send_future = tokio::spawn(conn.clone().send_loop(query_recv, write, debug_name_2)); let conn2 = conn.clone(); let recv_future = tokio::spawn(async move { select! { - r = conn2.recv_loop(read) => r, + r = conn2.recv_loop(read, debug_name) => r, _ = await_exit(stop_recv_loop_recv) => Ok(()) } }); diff --git a/src/proto.rs b/src/proto.rs index 56afede..41a2e47 100644 --- a/src/proto.rs +++ b/src/proto.rs @@ -120,6 +120,7 @@ pub(crate) trait SendLoop: Sync { self: Arc, mut msg_recv: mpsc::UnboundedReceiver<(RequestID, RequestPriority, Vec)>, mut write: BoxStreamWrite, + debug_name: String, ) -> Result<(), Error> where W: AsyncWriteExt + Unpin + Send + Sync, @@ -127,9 +128,9 @@ pub(crate) trait SendLoop: Sync { let mut sending = SendQueue::new(); let mut should_exit = false; while !should_exit || !sending.is_empty() { - trace!("send_loop: queue = {}", sending.dump()); + trace!("send_loop({}): queue = {}", debug_name, sending.dump()); if let Ok((id, prio, data)) = msg_recv.try_recv() { - trace!("send_loop: got {}, {} bytes", id, data.len()); + trace!("send_loop({}): got {}, {} bytes", debug_name, id, data.len()); sending.push(SendQueueItem { id, prio, @@ -138,7 +139,8 @@ pub(crate) trait SendLoop: Sync { }); } else if let Some(mut item) = sending.pop() { trace!( - "send_loop: sending bytes for {} ({} bytes, {} already sent)", + "send_loop({}): sending bytes for {} ({} bytes, {} already sent)", + debug_name, item.id, item.data.len(), item.cursor @@ -168,7 +170,7 @@ pub(crate) trait SendLoop: Sync { } else { let sth = msg_recv.recv().await; if let Some((id, prio, data)) = sth { - trace!("send_loop: got {}, {} bytes", id, data.len()); + trace!("send_loop({}): got {}, {} bytes", debug_name, id, data.len()); sending.push(SendQueueItem { id, prio, @@ -197,13 +199,17 @@ pub(crate) trait SendLoop: Sync { pub(crate) trait RecvLoop: Sync + 'static { fn recv_handler(self: &Arc, id: RequestID, msg: Vec); - async fn recv_loop(self: Arc, mut read: R) -> Result<(), Error> + async fn recv_loop( + self: Arc, + mut read: R, + debug_name: String, + ) -> Result<(), Error> where R: AsyncReadExt + Unpin + Send + Sync, { let mut receiving = HashMap::new(); loop { - trace!("recv_loop: reading packet"); + trace!("recv_loop({}): reading packet", debug_name); let mut header_id = [0u8; RequestID::BITS as usize / 8]; match read.read_exact(&mut header_id[..]).await { Ok(_) => (), @@ -211,19 +217,19 @@ pub(crate) trait RecvLoop: Sync + 'static { Err(e) => return Err(e.into()), }; let id = RequestID::from_be_bytes(header_id); - trace!("recv_loop: got header id: {:04x}", id); + trace!("recv_loop({}): got header id: {:04x}", debug_name, id); let mut header_size = [0u8; ChunkLength::BITS as usize / 8]; read.read_exact(&mut header_size[..]).await?; let size = ChunkLength::from_be_bytes(header_size); - trace!("recv_loop: got header size: {:04x}", size); + trace!("recv_loop({}): got header size: {:04x}", debug_name, size); let has_cont = (size & CHUNK_HAS_CONTINUATION) != 0; let size = size & !CHUNK_HAS_CONTINUATION; let mut next_slice = vec![0; size as usize]; read.read_exact(&mut next_slice[..]).await?; - trace!("recv_loop: read {} bytes", next_slice.len()); + trace!("recv_loop({}): read {} bytes", debug_name, next_slice.len()); let mut msg_bytes: Vec<_> = receiving.remove(&id).unwrap_or_default(); msg_bytes.extend_from_slice(&next_slice[..]); diff --git a/src/server.rs b/src/server.rs index 5465307..9bcf2a6 100644 --- a/src/server.rs +++ b/src/server.rs @@ -105,14 +105,17 @@ impl ServerConn { netapp.connected_as_server(peer_id, conn.clone()); + let debug_name = format!("SVR {}", hex::encode(&peer_id[..8])); + let debug_name_2 = debug_name.clone(); + let conn2 = conn.clone(); let recv_future = tokio::spawn(async move { select! { - r = conn2.recv_loop(read) => r, + r = conn2.recv_loop(read, debug_name_2) => r, _ = await_exit(must_exit) => Ok(()) } }); - let send_future = tokio::spawn(conn.clone().send_loop(resp_recv, write)); + let send_future = tokio::spawn(conn.clone().send_loop(resp_recv, write, debug_name)); recv_future.await.log_err("ServerConn recv_loop"); conn.resp_send.store(None); -- cgit v1.2.3 From 984ba65e658a920f2d8e67ce808e9acd257c4d86 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 31 Aug 2022 16:10:14 +0200 Subject: Better messages in proto.rs --- src/proto.rs | 43 +++++++++++++++++++++++++++++++------------ 1 file changed, 31 insertions(+), 12 deletions(-) (limited to 'src') diff --git a/src/proto.rs b/src/proto.rs index 41a2e47..8f7e70f 100644 --- a/src/proto.rs +++ b/src/proto.rs @@ -1,6 +1,6 @@ use std::collections::{HashMap, VecDeque}; -use std::sync::Arc; use std::fmt::Write; +use std::sync::Arc; use log::trace; @@ -99,7 +99,14 @@ impl SendQueue { let mut ret = String::new(); for (prio, q) in self.items.iter() { for item in q.iter() { - write!(&mut ret, " [{} {} ({})]", prio, item.data.len() - item.cursor, item.id).unwrap(); + write!( + &mut ret, + " [{} {} ({})]", + prio, + item.data.len() - item.cursor, + item.id + ) + .unwrap(); } } ret @@ -130,7 +137,13 @@ pub(crate) trait SendLoop: Sync { while !should_exit || !sending.is_empty() { trace!("send_loop({}): queue = {}", debug_name, sending.dump()); if let Ok((id, prio, data)) = msg_recv.try_recv() { - trace!("send_loop({}): got {}, {} bytes", debug_name, id, data.len()); + trace!( + "send_loop({}): new message to send, id = {}, prio = {}, {} bytes", + debug_name, + id, + prio, + data.len() + ); sending.push(SendQueueItem { id, prio, @@ -170,7 +183,13 @@ pub(crate) trait SendLoop: Sync { } else { let sth = msg_recv.recv().await; if let Some((id, prio, data)) = sth { - trace!("send_loop({}): got {}, {} bytes", debug_name, id, data.len()); + trace!( + "send_loop({}): new message to send, id = {}, prio = {}, {} bytes", + debug_name, + id, + prio, + data.len() + ); sending.push(SendQueueItem { id, prio, @@ -199,17 +218,12 @@ pub(crate) trait SendLoop: Sync { pub(crate) trait RecvLoop: Sync + 'static { fn recv_handler(self: &Arc, id: RequestID, msg: Vec); - async fn recv_loop( - self: Arc, - mut read: R, - debug_name: String, - ) -> Result<(), Error> + async fn recv_loop(self: Arc, mut read: R, debug_name: String) -> Result<(), Error> where R: AsyncReadExt + Unpin + Send + Sync, { let mut receiving = HashMap::new(); loop { - trace!("recv_loop({}): reading packet", debug_name); let mut header_id = [0u8; RequestID::BITS as usize / 8]; match read.read_exact(&mut header_id[..]).await { Ok(_) => (), @@ -217,12 +231,17 @@ pub(crate) trait RecvLoop: Sync + 'static { Err(e) => return Err(e.into()), }; let id = RequestID::from_be_bytes(header_id); - trace!("recv_loop({}): got header id: {:04x}", debug_name, id); let mut header_size = [0u8; ChunkLength::BITS as usize / 8]; read.read_exact(&mut header_size[..]).await?; let size = ChunkLength::from_be_bytes(header_size); - trace!("recv_loop({}): got header size: {:04x}", debug_name, size); + trace!( + "recv_loop({}): got header id = {}, size = 0x{:04x} ({} bytes)", + debug_name, + id, + size, + size & !CHUNK_HAS_CONTINUATION + ); let has_cont = (size & CHUNK_HAS_CONTINUATION) != 0; let size = size & !CHUNK_HAS_CONTINUATION; -- cgit v1.2.3 From 77036597425c24a4e618974dca8eaf9e916e9570 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 31 Aug 2022 16:25:36 +0200 Subject: Be more lenient on pings --- src/peering/fullmesh.rs | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) (limited to 'src') diff --git a/src/peering/fullmesh.rs b/src/peering/fullmesh.rs index b1c94af..208cfe4 100644 --- a/src/peering/fullmesh.rs +++ b/src/peering/fullmesh.rs @@ -22,10 +22,10 @@ use crate::NodeID; const CONN_RETRY_INTERVAL: Duration = Duration::from_secs(30); const CONN_MAX_RETRIES: usize = 10; -const PING_INTERVAL: Duration = Duration::from_secs(10); +const PING_INTERVAL: Duration = Duration::from_secs(15); const LOOP_DELAY: Duration = Duration::from_secs(1); -const PING_TIMEOUT: Duration = Duration::from_secs(5); -const FAILED_PING_THRESHOLD: usize = 3; +const PING_TIMEOUT: Duration = Duration::from_secs(10); +const FAILED_PING_THRESHOLD: usize = 4; // -- Protocol messages -- @@ -255,15 +255,11 @@ impl FullMeshPeeringStrategy { trace!("{}, {:?}", hex::encode(&id[..8]), info); match info.state { PeerConnState::Connected => { - let must_ping = match info.last_seen { + let must_ping = match info.last_send_ping { None => true, Some(t) => Instant::now() - t > PING_INTERVAL, }; - let pinged_recently = match info.last_send_ping { - None => false, - Some(t) => Instant::now() - t < PING_TIMEOUT, - }; - if must_ping && !pinged_recently { + if must_ping { to_ping.push(*id); } } -- cgit v1.2.3 From d75146fb8157dd03c156e5f7ce4834fa1d72b581 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 31 Aug 2022 17:04:45 +0200 Subject: SVR -> SRV --- src/server.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src') diff --git a/src/server.rs b/src/server.rs index 9bcf2a6..a835959 100644 --- a/src/server.rs +++ b/src/server.rs @@ -105,7 +105,7 @@ impl ServerConn { netapp.connected_as_server(peer_id, conn.clone()); - let debug_name = format!("SVR {}", hex::encode(&peer_id[..8])); + let debug_name = format!("SRV {}", hex::encode(&peer_id[..8])); let debug_name_2 = debug_name.clone(); let conn2 = conn.clone(); -- cgit v1.2.3