From 125c662860621f9c834e254d62b29b5d5ace5dd4 Mon Sep 17 00:00:00 2001
From: Alex Auvolat <alex@adnab.me>
Date: Wed, 14 Feb 2024 10:04:46 +0100
Subject: [import-netapp] move and rename FullMeshPeeringStrategy to
 PeeringManager

---
 src/net/peering.rs          | 614 ++++++++++++++++++++++++++++++++++++++++++++
 src/net/peering/fullmesh.rs | 613 -------------------------------------------
 src/net/peering/mod.rs      |   1 -
 src/net/test.rs             |   6 +-
 src/rpc/rpc_helper.rs       |  12 +-
 src/rpc/system.rs           |  22 +-
 6 files changed, 634 insertions(+), 634 deletions(-)
 create mode 100644 src/net/peering.rs
 delete mode 100644 src/net/peering/fullmesh.rs
 delete mode 100644 src/net/peering/mod.rs

diff --git a/src/net/peering.rs b/src/net/peering.rs
new file mode 100644
index 00000000..32199cf8
--- /dev/null
+++ b/src/net/peering.rs
@@ -0,0 +1,614 @@
+use std::collections::{HashMap, VecDeque};
+use std::net::SocketAddr;
+use std::sync::atomic::{self, AtomicU64};
+use std::sync::{Arc, RwLock};
+use std::time::{Duration, Instant};
+
+use arc_swap::ArcSwap;
+use async_trait::async_trait;
+use log::{debug, info, trace, warn};
+use serde::{Deserialize, Serialize};
+
+use tokio::select;
+use tokio::sync::watch;
+
+use sodiumoxide::crypto::hash;
+
+use crate::endpoint::*;
+use crate::error::*;
+use crate::netapp::*;
+
+use crate::message::*;
+use crate::NodeID;
+
+const CONN_RETRY_INTERVAL: Duration = Duration::from_secs(30);
+const CONN_MAX_RETRIES: usize = 10;
+const PING_INTERVAL: Duration = Duration::from_secs(15);
+const LOOP_DELAY: Duration = Duration::from_secs(1);
+const FAILED_PING_THRESHOLD: usize = 4;
+
+const DEFAULT_PING_TIMEOUT_MILLIS: u64 = 10_000;
+
+// -- Protocol messages --
+
+#[derive(Serialize, Deserialize)]
+struct PingMessage {
+    pub id: u64,
+    pub peer_list_hash: hash::Digest,
+}
+
+impl Message for PingMessage {
+    type Response = PingMessage;
+}
+
+#[derive(Serialize, Deserialize)]
+struct PeerListMessage {
+    pub list: Vec<(NodeID, SocketAddr)>,
+}
+
+impl Message for PeerListMessage {
+    type Response = PeerListMessage;
+}
+
+// -- Algorithm data structures --
+
+#[derive(Debug)]
+struct PeerInfoInternal {
+    // addr is the currently connected address,
+    // or the last address we were connected to,
+    // or an arbitrary address some other peer gave us
+    addr: SocketAddr,
+    // all_addrs contains all of the addresses everyone gave us
+    all_addrs: Vec<SocketAddr>,
+
+    state: PeerConnState,
+    last_send_ping: Option<Instant>,
+    last_seen: Option<Instant>,
+    ping: VecDeque<Duration>,
+    failed_pings: usize,
+}
+
+impl PeerInfoInternal {
+    fn new(addr: SocketAddr, state: PeerConnState) -> Self {
+        Self {
+            addr,
+            all_addrs: vec![addr],
+            state,
+            last_send_ping: None,
+            last_seen: None,
+            ping: VecDeque::new(),
+            failed_pings: 0,
+        }
+    }
+}
+
+/// Information that the full mesh peering strategy can return about the peers it knows of
+#[derive(Copy, Clone, Debug)]
+pub struct PeerInfo {
+    /// The node's identifier (its public key)
+    pub id: NodeID,
+    /// The node's network address
+    pub addr: SocketAddr,
+    /// The current status of our connection to this node
+    pub state: PeerConnState,
+    /// The last time at which the node was seen
+    pub last_seen: Option<Instant>,
+    /// The average ping to this node on recent observations (if at least one ping value is known)
+    pub avg_ping: Option<Duration>,
+    /// The maximum observed ping to this node on recent observations (if at least one
+    /// ping value is known)
+    pub max_ping: Option<Duration>,
+    /// The median ping to this node on recent observations (if at least one ping value
+    /// is known)
+    pub med_ping: Option<Duration>,
+}
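+
+// For example, if the last observed round-trip times to a node are 10 ms,
+// 20 ms and 60 ms, the fields above give avg_ping = 30 ms, med_ping = 20 ms
+// and max_ping = 60 ms (statistics cover at most the ten most recent samples,
+// see update_public_peer_list below).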
+
+impl PeerInfo {
+    /// Returns true if we can currently send requests to this peer
+    pub fn is_up(&self) -> bool {
+        self.state.is_up()
+    }
+}
+
+/// PeerConnState: the possible states of our connection attempts to a given peer.
+/// This type only records connection info for outgoing TCP connections.
+#[derive(Copy, Clone, Debug, PartialEq, Eq)]
+pub enum PeerConnState {
+    /// This entry represents ourself (the local node)
+    Ourself,
+
+    /// We currently have a connection to this peer
+    Connected,
+
+    /// Our next connection attempt (the nth, where n is the first value of the tuple)
+    /// will be made at the given Instant
+    Waiting(usize, Instant),
+
+    /// A connection attempt is in progress (the nth, where n is the value stored)
+    Trying(usize),
+
+    /// We have abandoned trying to connect to this peer (too many failed attempts)
+    Abandonned,
+}
+
+impl PeerConnState {
+    /// Returns true if we can currently send requests to this peer
+    pub fn is_up(&self) -> bool {
+        matches!(self, Self::Ourself | Self::Connected)
+    }
+}
+
+struct KnownHosts {
+    list: HashMap<NodeID, PeerInfoInternal>,
+    hash: hash::Digest,
+}
+
+impl KnownHosts {
+    fn new() -> Self {
+        let list = HashMap::new();
+        let hash = Self::calculate_hash(&list);
+        Self { list, hash }
+    }
+    fn update_hash(&mut self) {
+        self.hash = Self::calculate_hash(&self.list);
+    }
+    fn map_into_vec(input: &HashMap<NodeID, PeerInfoInternal>) -> Vec<(NodeID, SocketAddr)> {
+        let mut list = Vec::with_capacity(input.len());
+        for (id, peer) in input.iter() {
+            if peer.state == PeerConnState::Connected || peer.state == PeerConnState::Ourself {
+                list.push((*id, peer.addr));
+            }
+        }
+        list
+    }
+    fn calculate_hash(input: &HashMap<NodeID, PeerInfoInternal>) -> hash::Digest {
+        let mut list = Self::map_into_vec(input);
+        list.sort();
+        let mut hash_state = hash::State::new();
+        for (id, addr) in list {
+            hash_state.update(&id[..]);
+            hash_state.update(&format!("{}\n", addr).into_bytes()[..]);
+        }
+        hash_state.finalize()
+    }
+}
+
+/// A "Full Mesh" peering strategy tries to establish and maintain
+/// a direct connection with all of the known nodes in the network.
+pub struct PeeringManager {
+    netapp: Arc<NetApp>,
+    known_hosts: RwLock<KnownHosts>,
+    public_peer_list: ArcSwap<Vec<PeerInfo>>,
+
+    next_ping_id: AtomicU64,
+    ping_endpoint: Arc<Endpoint<PingMessage, Self>>,
+    peer_list_endpoint: Arc<Endpoint<PeerListMessage, Self>>,
+
+    ping_timeout_millis: AtomicU64,
+}
+
+impl PeeringManager {
+    /// Create a new Full Mesh peering strategy.
+    /// The strategy will not be run until `.run()` is called and awaited.
+    /// Once that happens, the peering strategy will try to connect
+    /// to all of the nodes specified in the bootstrap list.
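+    ///
+    /// A minimal usage sketch (hypothetical setup: assumes an already
+    /// configured `NetApp` handle and a watch channel used as a shutdown
+    /// signal, as in `src/net/test.rs` below):
+    /// ```ignore
+    /// let peering = PeeringManager::new(netapp.clone(), bootstrap_list, None);
+    /// let (_quit_tx, quit_rx) = tokio::sync::watch::channel(false);
+    /// tokio::spawn(peering.clone().run(quit_rx));
+    /// ```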
+    pub fn new(
+        netapp: Arc<NetApp>,
+        bootstrap_list: Vec<(NodeID, SocketAddr)>,
+        our_addr: Option<SocketAddr>,
+    ) -> Arc<Self> {
+        let mut known_hosts = KnownHosts::new();
+        for (id, addr) in bootstrap_list {
+            if id != netapp.id {
+                known_hosts.list.insert(
+                    id,
+                    PeerInfoInternal::new(addr, PeerConnState::Waiting(0, Instant::now())),
+                );
+            }
+        }
+
+        if let Some(addr) = our_addr {
+            known_hosts.list.insert(
+                netapp.id,
+                PeerInfoInternal::new(addr, PeerConnState::Ourself),
+            );
+        }
+
+        // TODO for v0.10 / v1.0 : rename the endpoint (it will break compatibility)
+        let strat = Arc::new(Self {
+            netapp: netapp.clone(),
+            known_hosts: RwLock::new(known_hosts),
+            public_peer_list: ArcSwap::new(Arc::new(Vec::new())),
+            next_ping_id: AtomicU64::new(42),
+            ping_endpoint: netapp.endpoint("__netapp/peering/fullmesh.rs/Ping".into()),
+            peer_list_endpoint: netapp.endpoint("__netapp/peering/fullmesh.rs/PeerList".into()),
+            ping_timeout_millis: DEFAULT_PING_TIMEOUT_MILLIS.into(),
+        });
+
+        strat.update_public_peer_list(&strat.known_hosts.read().unwrap());
+
+        strat.ping_endpoint.set_handler(strat.clone());
+        strat.peer_list_endpoint.set_handler(strat.clone());
+
+        let strat2 = strat.clone();
+        netapp.on_connected(move |id: NodeID, addr: SocketAddr, is_incoming: bool| {
+            let strat2 = strat2.clone();
+            strat2.on_connected(id, addr, is_incoming);
+        });
+
+        let strat2 = strat.clone();
+        netapp.on_disconnected(move |id: NodeID, is_incoming: bool| {
+            let strat2 = strat2.clone();
+            strat2.on_disconnected(id, is_incoming);
+        });
+
+        strat
+    }
+
+    /// Run the full mesh peering strategy.
+    /// This future exits when the `must_exit` watch becomes true.
+    pub async fn run(self: Arc<Self>, must_exit: watch::Receiver<bool>) {
+        while !*must_exit.borrow() {
+            // 1. Read current state: get list of connected peers (ping them)
+            let (to_ping, to_retry) = {
+                let known_hosts = self.known_hosts.read().unwrap();
+                trace!("known_hosts: {} peers", known_hosts.list.len());
+
+                let mut to_ping = vec![];
+                let mut to_retry = vec![];
+                for (id, info) in known_hosts.list.iter() {
+                    trace!("{}, {:?}", hex::encode(&id[..8]), info);
+                    match info.state {
+                        PeerConnState::Connected => {
+                            let must_ping = match info.last_send_ping {
+                                None => true,
+                                Some(t) => Instant::now() - t > PING_INTERVAL,
+                            };
+                            if must_ping {
+                                to_ping.push(*id);
+                            }
+                        }
+                        PeerConnState::Waiting(_, t) => {
+                            if Instant::now() >= t {
+                                to_retry.push(*id);
+                            }
+                        }
+                        _ => (),
+                    }
+                }
+                (to_ping, to_retry)
+            };
+
+            // 2. Dispatch ping to hosts
+            trace!("to_ping: {} peers", to_ping.len());
+            if !to_ping.is_empty() {
+                let mut known_hosts = self.known_hosts.write().unwrap();
+                for id in to_ping.iter() {
+                    known_hosts.list.get_mut(id).unwrap().last_send_ping = Some(Instant::now());
+                }
+                drop(known_hosts);
+                for id in to_ping {
+                    tokio::spawn(self.clone().ping(id));
+                }
+            }
+
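+            // Note on the retry schedule used in step 3 below: a peer in state
+            // Waiting(n, t) becomes eligible once t has passed and moves to
+            // Trying(n); if that attempt fails, try_connect puts it back in
+            // Waiting(n + 1, now + CONN_RETRY_INTERVAL), until n exceeds
+            // CONN_MAX_RETRIES and the peer is marked Abandonned.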
+            // 3. Try reconnects
+            trace!("to_retry: {} peers", to_retry.len());
+            if !to_retry.is_empty() {
+                let mut known_hosts = self.known_hosts.write().unwrap();
+                for id in to_retry {
+                    if let Some(h) = known_hosts.list.get_mut(&id) {
+                        if let PeerConnState::Waiting(i, _) = h.state {
+                            info!(
+                                "Retrying connection to {} at {} ({})",
+                                hex::encode(&id[..8]),
+                                h.all_addrs
+                                    .iter()
+                                    .map(|x| format!("{}", x))
+                                    .collect::<Vec<_>>()
+                                    .join(", "),
+                                i + 1
+                            );
+                            h.state = PeerConnState::Trying(i);
+
+                            let alternate_addrs = h
+                                .all_addrs
+                                .iter()
+                                .filter(|x| **x != h.addr)
+                                .cloned()
+                                .collect::<Vec<_>>();
+                            tokio::spawn(self.clone().try_connect(id, h.addr, alternate_addrs));
+                        }
+                    }
+                }
+                self.update_public_peer_list(&known_hosts);
+            }
+
+            // 4. Sleep before next loop iteration
+            tokio::time::sleep(LOOP_DELAY).await;
+        }
+    }
+
+    /// Returns a list of currently known peers in the network.
+    pub fn get_peer_list(&self) -> Arc<Vec<PeerInfo>> {
+        self.public_peer_list.load_full()
+    }
+
+    /// Set the timeout for ping messages, in milliseconds
+    pub fn set_ping_timeout_millis(&self, timeout: u64) {
+        self.ping_timeout_millis
+            .store(timeout, atomic::Ordering::Relaxed);
+    }
+
+    // -- internal stuff --
+
+    fn update_public_peer_list(&self, known_hosts: &KnownHosts) {
+        let mut pub_peer_list = Vec::with_capacity(known_hosts.list.len());
+        for (id, info) in known_hosts.list.iter() {
+            let mut pings = info.ping.iter().cloned().collect::<Vec<_>>();
+            pings.sort();
+            if !pings.is_empty() {
+                pub_peer_list.push(PeerInfo {
+                    id: *id,
+                    addr: info.addr,
+                    state: info.state,
+                    last_seen: info.last_seen,
+                    avg_ping: Some(
+                        pings
+                            .iter()
+                            .fold(Duration::from_secs(0), |x, y| x + *y)
+                            .div_f64(pings.len() as f64),
+                    ),
+                    max_ping: pings.last().cloned(),
+                    med_ping: Some(pings[pings.len() / 2]),
+                });
+            } else {
+                pub_peer_list.push(PeerInfo {
+                    id: *id,
+                    addr: info.addr,
+                    state: info.state,
+                    last_seen: info.last_seen,
+                    avg_ping: None,
+                    max_ping: None,
+                    med_ping: None,
+                });
+            }
+        }
+        self.public_peer_list.store(Arc::new(pub_peer_list));
+    }
+
+    async fn ping(self: Arc<Self>, id: NodeID) {
+        let peer_list_hash = self.known_hosts.read().unwrap().hash;
+        let ping_id = self.next_ping_id.fetch_add(1u64, atomic::Ordering::Relaxed);
+        let ping_time = Instant::now();
+        let ping_timeout =
+            Duration::from_millis(self.ping_timeout_millis.load(atomic::Ordering::Relaxed));
+        let ping_msg = PingMessage {
+            id: ping_id,
+            peer_list_hash,
+        };
+
+        debug!(
+            "Sending ping {} to {} at {:?}",
+            ping_id,
+            hex::encode(&id[..8]),
+            ping_time
+        );
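+        // Race the ping RPC against a timer: if the peer does not answer
+        // within ping_timeout_millis (10 seconds by default), the ping is
+        // treated exactly like an RPC error.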
+        let ping_response = select! {
+            r = self.ping_endpoint.call(&id, ping_msg, PRIO_HIGH) => r,
+            _ = tokio::time::sleep(ping_timeout) => Err(Error::Message("Ping timeout".into())),
+        };
+
+        match ping_response {
+            Err(e) => {
+                warn!("Error pinging {}: {}", hex::encode(&id[..8]), e);
+                let mut known_hosts = self.known_hosts.write().unwrap();
+                if let Some(host) = known_hosts.list.get_mut(&id) {
+                    host.failed_pings += 1;
+                    if host.failed_pings > FAILED_PING_THRESHOLD {
+                        warn!(
+                            "Too many failed pings from {}, closing connection.",
+                            hex::encode(&id[..8])
+                        );
+                        // this will later update info in known_hosts
+                        // through the disconnection handler
+                        self.netapp.disconnect(&id);
+                    }
+                }
+            }
+            Ok(ping_resp) => {
+                let resp_time = Instant::now();
+                debug!(
+                    "Got ping response from {} at {:?}",
+                    hex::encode(&id[..8]),
+                    resp_time
+                );
+                {
+                    let mut known_hosts = self.known_hosts.write().unwrap();
+                    if let Some(host) = known_hosts.list.get_mut(&id) {
+                        host.failed_pings = 0;
+                        host.last_seen = Some(resp_time);
+                        host.ping.push_back(resp_time - ping_time);
+                        while host.ping.len() > 10 {
+                            host.ping.pop_front();
+                        }
+                        self.update_public_peer_list(&known_hosts);
+                    }
+                }
+                if ping_resp.peer_list_hash != peer_list_hash {
+                    self.exchange_peers(&id).await;
+                }
+            }
+        }
+    }
+
+    async fn exchange_peers(self: Arc<Self>, id: &NodeID) {
+        let peer_list = KnownHosts::map_into_vec(&self.known_hosts.read().unwrap().list);
+        let pex_message = PeerListMessage { list: peer_list };
+        match self
+            .peer_list_endpoint
+            .call(id, pex_message, PRIO_BACKGROUND)
+            .await
+        {
+            Err(e) => warn!("Error doing peer exchange: {}", e),
+            Ok(resp) => {
+                self.handle_peer_list(&resp.list[..]);
+            }
+        }
+    }
+
+    fn handle_peer_list(&self, list: &[(NodeID, SocketAddr)]) {
+        let mut known_hosts = self.known_hosts.write().unwrap();
+
+        let mut changed = false;
+        for (id, addr) in list.iter() {
+            if let Some(kh) = known_hosts.list.get_mut(id) {
+                if !kh.all_addrs.contains(addr) {
+                    kh.all_addrs.push(*addr);
+                    changed = true;
+                }
+            } else {
+                known_hosts.list.insert(*id, self.new_peer(id, *addr));
+                changed = true;
+            }
+        }
+
+        if changed {
+            known_hosts.update_hash();
+            self.update_public_peer_list(&known_hosts);
+        }
+    }
+
+    async fn try_connect(
+        self: Arc<Self>,
+        id: NodeID,
+        default_addr: SocketAddr,
+        alternate_addrs: Vec<SocketAddr>,
+    ) {
+        let conn_addr = {
+            let mut ret = None;
+            for addr in [default_addr].iter().chain(alternate_addrs.iter()) {
+                debug!("Trying address {} for peer {}", addr, hex::encode(&id[..8]));
+                match self.netapp.clone().try_connect(*addr, id).await {
+                    Ok(()) => {
+                        ret = Some(*addr);
+                        break;
+                    }
+                    Err(e) => {
+                        debug!(
+                            "Error connecting to {} at {}: {}",
+                            hex::encode(&id[..8]),
+                            addr,
+                            e
+                        );
+                    }
+                }
+            }
+            ret
+        };
+
+        if let Some(ok_addr) = conn_addr {
+            self.on_connected(id, ok_addr, false);
+        } else {
+            warn!(
+                "Could not connect to peer {} ({} addresses tried)",
+                hex::encode(&id[..8]),
+                1 + alternate_addrs.len()
+            );
+            let mut known_hosts = self.known_hosts.write().unwrap();
+            if let Some(host) = known_hosts.list.get_mut(&id) {
+                host.state = match host.state {
+                    PeerConnState::Trying(i) => {
+                        if i >= CONN_MAX_RETRIES {
+                            PeerConnState::Abandonned
+                        } else {
+                            PeerConnState::Waiting(i + 1, Instant::now() + CONN_RETRY_INTERVAL)
+                        }
+                    }
+                    _ => PeerConnState::Waiting(0, Instant::now() + CONN_RETRY_INTERVAL),
+                };
+                self.update_public_peer_list(&known_hosts);
+            }
+        }
+    }
+
+    fn on_connected(self: Arc<Self>, id: NodeID, addr: SocketAddr, is_incoming: bool) {
+        let mut known_hosts = self.known_hosts.write().unwrap();
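+        // An incoming connection only tells us the address the peer happens
+        // to connect from (often an ephemeral port rather than its listen
+        // address), so we only record it as a candidate address; outgoing
+        // connections are what mark a peer as Connected below.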
+        if is_incoming {
+            if let Some(host) = known_hosts.list.get_mut(&id) {
+                if !host.all_addrs.contains(&addr) {
+                    host.all_addrs.push(addr);
+                }
+            } else {
+                known_hosts.list.insert(id, self.new_peer(&id, addr));
+            }
+        } else {
+            info!(
+                "Successfully connected to {} at {}",
+                hex::encode(&id[..8]),
+                addr
+            );
+            if let Some(host) = known_hosts.list.get_mut(&id) {
+                host.state = PeerConnState::Connected;
+                host.addr = addr;
+                if !host.all_addrs.contains(&addr) {
+                    host.all_addrs.push(addr);
+                }
+            } else {
+                known_hosts
+                    .list
+                    .insert(id, PeerInfoInternal::new(addr, PeerConnState::Connected));
+            }
+        }
+        known_hosts.update_hash();
+        self.update_public_peer_list(&known_hosts);
+    }
+
+    fn on_disconnected(self: Arc<Self>, id: NodeID, is_incoming: bool) {
+        if !is_incoming {
+            info!("Connection to {} was closed", hex::encode(&id[..8]));
+            let mut known_hosts = self.known_hosts.write().unwrap();
+            if let Some(host) = known_hosts.list.get_mut(&id) {
+                host.state = PeerConnState::Waiting(0, Instant::now());
+                known_hosts.update_hash();
+                self.update_public_peer_list(&known_hosts);
+            }
+        }
+    }
+
+    fn new_peer(&self, id: &NodeID, addr: SocketAddr) -> PeerInfoInternal {
+        let state = if *id == self.netapp.id {
+            PeerConnState::Ourself
+        } else {
+            PeerConnState::Waiting(0, Instant::now())
+        };
+        PeerInfoInternal::new(addr, state)
+    }
+}
+
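+// Handlers for the two internal endpoints. Answering a ping echoes back the
+// ping id together with our current peer-list hash, so that the caller can
+// detect a diverging peer list and trigger an exchange; a peer-list exchange
+// is symmetric: we merge the list we receive, then reply with our own.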
+#[async_trait]
+impl EndpointHandler<PingMessage> for PeeringManager {
+    async fn handle(self: &Arc<Self>, ping: &PingMessage, from: NodeID) -> PingMessage {
+        let ping_resp = PingMessage {
+            id: ping.id,
+            peer_list_hash: self.known_hosts.read().unwrap().hash,
+        };
+        debug!("Ping from {}", hex::encode(&from[..8]));
+        ping_resp
+    }
+}
+
+#[async_trait]
+impl EndpointHandler<PeerListMessage> for PeeringManager {
+    async fn handle(
+        self: &Arc<Self>,
+        peer_list: &PeerListMessage,
+        _from: NodeID,
+    ) -> PeerListMessage {
+        self.handle_peer_list(&peer_list.list[..]);
+        let peer_list = KnownHosts::map_into_vec(&self.known_hosts.read().unwrap().list);
+        PeerListMessage { list: peer_list }
+    }
+}
diff --git a/src/net/peering/fullmesh.rs b/src/net/peering/fullmesh.rs
deleted file mode 100644
index 8e666044..00000000
--- a/src/net/peering/fullmesh.rs
+++ /dev/null
@@ -1,613 +0,0 @@
-use std::collections::{HashMap, VecDeque};
-use std::net::SocketAddr;
-use std::sync::atomic::{self, AtomicU64};
-use std::sync::{Arc, RwLock};
-use std::time::{Duration, Instant};
-
-use arc_swap::ArcSwap;
-use async_trait::async_trait;
-use log::{debug, info, trace, warn};
-use serde::{Deserialize, Serialize};
-
-use tokio::select;
-use tokio::sync::watch;
-
-use sodiumoxide::crypto::hash;
-
-use crate::endpoint::*;
-use crate::error::*;
-use crate::netapp::*;
-
-use crate::message::*;
-use crate::NodeID;
-
-const CONN_RETRY_INTERVAL: Duration = Duration::from_secs(30);
-const CONN_MAX_RETRIES: usize = 10;
-const PING_INTERVAL: Duration = Duration::from_secs(15);
-const LOOP_DELAY: Duration = Duration::from_secs(1);
-const FAILED_PING_THRESHOLD: usize = 4;
-
-const DEFAULT_PING_TIMEOUT_MILLIS: u64 = 10_000;
-
-// -- Protocol messages --
-
-#[derive(Serialize, Deserialize)]
-struct PingMessage {
-    pub id: u64,
-    pub peer_list_hash: hash::Digest,
-}
-
-impl Message for PingMessage {
-    type Response = PingMessage;
-}
-
-#[derive(Serialize, Deserialize)]
-struct PeerListMessage {
-    pub list: Vec<(NodeID, SocketAddr)>,
-}
-
-impl Message for PeerListMessage {
-    type Response = PeerListMessage;
-}
-
-// -- Algorithm data structures --
-
-#[derive(Debug)]
-struct PeerInfoInternal {
-    // addr is the currently connected address,
-    // or the last address we were connected to,
-    // or an arbitrary address some other peer gave us
-    addr: SocketAddr,
-    // all_addrs contains all of the addresses everyone gave us
-    all_addrs: Vec<SocketAddr>,
-
-    state: PeerConnState,
-    last_send_ping: Option<Instant>,
-    last_seen: Option<Instant>,
-    ping: VecDeque<Duration>,
-    failed_pings: usize,
-}
-
-impl PeerInfoInternal {
-    fn new(addr: SocketAddr, state: PeerConnState) -> Self {
-        Self {
-            addr,
-            all_addrs: vec![addr],
-            state,
-            last_send_ping: None,
-            last_seen: None,
-            ping: VecDeque::new(),
-            failed_pings: 0,
-        }
-    }
-}
-
-/// Information that the full mesh peering strategy can return about the peers it knows of
-#[derive(Copy, Clone, Debug)]
-pub struct PeerInfo {
-    /// The node's identifier (its public key)
-    pub id: NodeID,
-    /// The node's network address
-    pub addr: SocketAddr,
-    /// The current status of our connection to this node
-    pub state: PeerConnState,
-    /// The last time at which the node was seen
-    pub last_seen: Option<Instant>,
-    /// The average ping to this node on recent observations (if at least one ping value is known)
-    pub avg_ping: Option<Duration>,
-    /// The maximum observed ping to this node on recent observations (if at least one
-    /// ping value is known)
-    pub max_ping: Option<Duration>,
-    /// The median ping to this node on recent observations (if at least one ping value
-    /// is known)
-    pub med_ping: Option<Duration>,
-}
-
-impl PeerInfo {
-    /// Returns true if we can currently send requests to this peer
-    pub fn is_up(&self) -> bool {
-        self.state.is_up()
-    }
-}
-
-/// PeerConnState: the possible states of our connection attempts to a given peer.
-/// This type only records connection info for outgoing TCP connections.
-#[derive(Copy, Clone, Debug, PartialEq, Eq)]
-pub enum PeerConnState {
-    /// This entry represents ourself (the local node)
-    Ourself,
-
-    /// We currently have a connection to this peer
-    Connected,
-
-    /// Our next connection attempt (the nth, where n is the first value of the tuple)
-    /// will be made at the given Instant
-    Waiting(usize, Instant),
-
-    /// A connection attempt is in progress (the nth, where n is the value stored)
-    Trying(usize),
-
-    /// We have abandoned trying to connect to this peer (too many failed attempts)
-    Abandonned,
-}
-
-impl PeerConnState {
-    /// Returns true if we can currently send requests to this peer
-    pub fn is_up(&self) -> bool {
-        matches!(self, Self::Ourself | Self::Connected)
-    }
-}
-
-struct KnownHosts {
-    list: HashMap<NodeID, PeerInfoInternal>,
-    hash: hash::Digest,
-}
-
-impl KnownHosts {
-    fn new() -> Self {
-        let list = HashMap::new();
-        let hash = Self::calculate_hash(&list);
-        Self { list, hash }
-    }
-    fn update_hash(&mut self) {
-        self.hash = Self::calculate_hash(&self.list);
-    }
-    fn map_into_vec(input: &HashMap<NodeID, PeerInfoInternal>) -> Vec<(NodeID, SocketAddr)> {
-        let mut list = Vec::with_capacity(input.len());
-        for (id, peer) in input.iter() {
-            if peer.state == PeerConnState::Connected || peer.state == PeerConnState::Ourself {
-                list.push((*id, peer.addr));
-            }
-        }
-        list
-    }
-    fn calculate_hash(input: &HashMap<NodeID, PeerInfoInternal>) -> hash::Digest {
-        let mut list = Self::map_into_vec(input);
-        list.sort();
-        let mut hash_state = hash::State::new();
-        for (id, addr) in list {
-            hash_state.update(&id[..]);
-            hash_state.update(&format!("{}\n", addr).into_bytes()[..]);
-        }
-        hash_state.finalize()
-    }
-}
-
-/// A "Full Mesh" peering strategy tries to establish and maintain
-/// a direct connection with all of the known nodes in the network.
-pub struct FullMeshPeeringStrategy {
-    netapp: Arc<NetApp>,
-    known_hosts: RwLock<KnownHosts>,
-    public_peer_list: ArcSwap<Vec<PeerInfo>>,
-
-    next_ping_id: AtomicU64,
-    ping_endpoint: Arc<Endpoint<PingMessage, Self>>,
-    peer_list_endpoint: Arc<Endpoint<PeerListMessage, Self>>,
-
-    ping_timeout_millis: AtomicU64,
-}
-
-impl FullMeshPeeringStrategy {
-    /// Create a new Full Mesh peering strategy.
-    /// The strategy will not be run until `.run()` is called and awaited.
-    /// Once that happens, the peering strategy will try to connect
-    /// to all of the nodes specified in the bootstrap list.
-    pub fn new(
-        netapp: Arc<NetApp>,
-        bootstrap_list: Vec<(NodeID, SocketAddr)>,
-        our_addr: Option<SocketAddr>,
-    ) -> Arc<Self> {
-        let mut known_hosts = KnownHosts::new();
-        for (id, addr) in bootstrap_list {
-            if id != netapp.id {
-                known_hosts.list.insert(
-                    id,
-                    PeerInfoInternal::new(addr, PeerConnState::Waiting(0, Instant::now())),
-                );
-            }
-        }
-
-        if let Some(addr) = our_addr {
-            known_hosts.list.insert(
-                netapp.id,
-                PeerInfoInternal::new(addr, PeerConnState::Ourself),
-            );
-        }
-
-        let strat = Arc::new(Self {
-            netapp: netapp.clone(),
-            known_hosts: RwLock::new(known_hosts),
-            public_peer_list: ArcSwap::new(Arc::new(Vec::new())),
-            next_ping_id: AtomicU64::new(42),
-            ping_endpoint: netapp.endpoint("__netapp/peering/fullmesh.rs/Ping".into()),
-            peer_list_endpoint: netapp.endpoint("__netapp/peering/fullmesh.rs/PeerList".into()),
-            ping_timeout_millis: DEFAULT_PING_TIMEOUT_MILLIS.into(),
-        });
-
-        strat.update_public_peer_list(&strat.known_hosts.read().unwrap());
-
-        strat.ping_endpoint.set_handler(strat.clone());
-        strat.peer_list_endpoint.set_handler(strat.clone());
-
-        let strat2 = strat.clone();
-        netapp.on_connected(move |id: NodeID, addr: SocketAddr, is_incoming: bool| {
-            let strat2 = strat2.clone();
-            strat2.on_connected(id, addr, is_incoming);
-        });
-
-        let strat2 = strat.clone();
-        netapp.on_disconnected(move |id: NodeID, is_incoming: bool| {
-            let strat2 = strat2.clone();
-            strat2.on_disconnected(id, is_incoming);
-        });
-
-        strat
-    }
-
-    /// Run the full mesh peering strategy.
-    /// This future exits when the `must_exit` watch becomes true.
-    pub async fn run(self: Arc<Self>, must_exit: watch::Receiver<bool>) {
-        while !*must_exit.borrow() {
-            // 1. Read current state: get list of connected peers (ping them)
-            let (to_ping, to_retry) = {
-                let known_hosts = self.known_hosts.read().unwrap();
-                trace!("known_hosts: {} peers", known_hosts.list.len());
-
-                let mut to_ping = vec![];
-                let mut to_retry = vec![];
-                for (id, info) in known_hosts.list.iter() {
-                    trace!("{}, {:?}", hex::encode(&id[..8]), info);
-                    match info.state {
-                        PeerConnState::Connected => {
-                            let must_ping = match info.last_send_ping {
-                                None => true,
-                                Some(t) => Instant::now() - t > PING_INTERVAL,
-                            };
-                            if must_ping {
-                                to_ping.push(*id);
-                            }
-                        }
-                        PeerConnState::Waiting(_, t) => {
-                            if Instant::now() >= t {
-                                to_retry.push(*id);
-                            }
-                        }
-                        _ => (),
-                    }
-                }
-                (to_ping, to_retry)
-            };
-
-            // 2. Dispatch ping to hosts
-            trace!("to_ping: {} peers", to_ping.len());
-            if !to_ping.is_empty() {
-                let mut known_hosts = self.known_hosts.write().unwrap();
-                for id in to_ping.iter() {
-                    known_hosts.list.get_mut(id).unwrap().last_send_ping = Some(Instant::now());
-                }
-                drop(known_hosts);
-                for id in to_ping {
-                    tokio::spawn(self.clone().ping(id));
-                }
-            }
-
-            // 3. Try reconnects
-            trace!("to_retry: {} peers", to_retry.len());
-            if !to_retry.is_empty() {
-                let mut known_hosts = self.known_hosts.write().unwrap();
-                for id in to_retry {
-                    if let Some(h) = known_hosts.list.get_mut(&id) {
-                        if let PeerConnState::Waiting(i, _) = h.state {
-                            info!(
-                                "Retrying connection to {} at {} ({})",
-                                hex::encode(&id[..8]),
-                                h.all_addrs
-                                    .iter()
-                                    .map(|x| format!("{}", x))
-                                    .collect::<Vec<_>>()
-                                    .join(", "),
-                                i + 1
-                            );
-                            h.state = PeerConnState::Trying(i);
-
-                            let alternate_addrs = h
-                                .all_addrs
-                                .iter()
-                                .filter(|x| **x != h.addr)
-                                .cloned()
-                                .collect::<Vec<_>>();
-                            tokio::spawn(self.clone().try_connect(id, h.addr, alternate_addrs));
-                        }
-                    }
-                }
-                self.update_public_peer_list(&known_hosts);
-            }
-
-            // 4. Sleep before next loop iteration
-            tokio::time::sleep(LOOP_DELAY).await;
-        }
-    }
-
-    /// Returns a list of currently known peers in the network.
-    pub fn get_peer_list(&self) -> Arc<Vec<PeerInfo>> {
-        self.public_peer_list.load_full()
-    }
-
-    /// Set the timeout for ping messages, in milliseconds
-    pub fn set_ping_timeout_millis(&self, timeout: u64) {
-        self.ping_timeout_millis
-            .store(timeout, atomic::Ordering::Relaxed);
-    }
-
-    // -- internal stuff --
-
-    fn update_public_peer_list(&self, known_hosts: &KnownHosts) {
-        let mut pub_peer_list = Vec::with_capacity(known_hosts.list.len());
-        for (id, info) in known_hosts.list.iter() {
-            let mut pings = info.ping.iter().cloned().collect::<Vec<_>>();
-            pings.sort();
-            if !pings.is_empty() {
-                pub_peer_list.push(PeerInfo {
-                    id: *id,
-                    addr: info.addr,
-                    state: info.state,
-                    last_seen: info.last_seen,
-                    avg_ping: Some(
-                        pings
-                            .iter()
-                            .fold(Duration::from_secs(0), |x, y| x + *y)
-                            .div_f64(pings.len() as f64),
-                    ),
-                    max_ping: pings.last().cloned(),
-                    med_ping: Some(pings[pings.len() / 2]),
-                });
-            } else {
-                pub_peer_list.push(PeerInfo {
-                    id: *id,
-                    addr: info.addr,
-                    state: info.state,
-                    last_seen: info.last_seen,
-                    avg_ping: None,
-                    max_ping: None,
-                    med_ping: None,
-                });
-            }
-        }
-        self.public_peer_list.store(Arc::new(pub_peer_list));
-    }
-
-    async fn ping(self: Arc<Self>, id: NodeID) {
-        let peer_list_hash = self.known_hosts.read().unwrap().hash;
-        let ping_id = self.next_ping_id.fetch_add(1u64, atomic::Ordering::Relaxed);
-        let ping_time = Instant::now();
-        let ping_timeout =
-            Duration::from_millis(self.ping_timeout_millis.load(atomic::Ordering::Relaxed));
-        let ping_msg = PingMessage {
-            id: ping_id,
-            peer_list_hash,
-        };
-
-        debug!(
-            "Sending ping {} to {} at {:?}",
-            ping_id,
-            hex::encode(&id[..8]),
-            ping_time
-        );
-        let ping_response = select! {
-            r = self.ping_endpoint.call(&id, ping_msg, PRIO_HIGH) => r,
-            _ = tokio::time::sleep(ping_timeout) => Err(Error::Message("Ping timeout".into())),
-        };
-
-        match ping_response {
-            Err(e) => {
-                warn!("Error pinging {}: {}", hex::encode(&id[..8]), e);
-                let mut known_hosts = self.known_hosts.write().unwrap();
-                if let Some(host) = known_hosts.list.get_mut(&id) {
-                    host.failed_pings += 1;
-                    if host.failed_pings > FAILED_PING_THRESHOLD {
-                        warn!(
-                            "Too many failed pings from {}, closing connection.",
-                            hex::encode(&id[..8])
-                        );
-                        // this will later update info in known_hosts
-                        // through the disconnection handler
-                        self.netapp.disconnect(&id);
-                    }
-                }
-            }
-            Ok(ping_resp) => {
-                let resp_time = Instant::now();
-                debug!(
-                    "Got ping response from {} at {:?}",
-                    hex::encode(&id[..8]),
-                    resp_time
-                );
-                {
-                    let mut known_hosts = self.known_hosts.write().unwrap();
-                    if let Some(host) = known_hosts.list.get_mut(&id) {
-                        host.failed_pings = 0;
-                        host.last_seen = Some(resp_time);
-                        host.ping.push_back(resp_time - ping_time);
-                        while host.ping.len() > 10 {
-                            host.ping.pop_front();
-                        }
-                        self.update_public_peer_list(&known_hosts);
-                    }
-                }
-                if ping_resp.peer_list_hash != peer_list_hash {
-                    self.exchange_peers(&id).await;
-                }
-            }
-        }
-    }
-
-    async fn exchange_peers(self: Arc<Self>, id: &NodeID) {
-        let peer_list = KnownHosts::map_into_vec(&self.known_hosts.read().unwrap().list);
-        let pex_message = PeerListMessage { list: peer_list };
-        match self
-            .peer_list_endpoint
-            .call(id, pex_message, PRIO_BACKGROUND)
-            .await
-        {
-            Err(e) => warn!("Error doing peer exchange: {}", e),
-            Ok(resp) => {
-                self.handle_peer_list(&resp.list[..]);
-            }
-        }
-    }
-
-    fn handle_peer_list(&self, list: &[(NodeID, SocketAddr)]) {
-        let mut known_hosts = self.known_hosts.write().unwrap();
-
-        let mut changed = false;
-        for (id, addr) in list.iter() {
-            if let Some(kh) = known_hosts.list.get_mut(id) {
-                if !kh.all_addrs.contains(addr) {
-                    kh.all_addrs.push(*addr);
-                    changed = true;
-                }
-            } else {
-                known_hosts.list.insert(*id, self.new_peer(id, *addr));
-                changed = true;
-            }
-        }
-
-        if changed {
-            known_hosts.update_hash();
-            self.update_public_peer_list(&known_hosts);
-        }
-    }
-
-    async fn try_connect(
-        self: Arc<Self>,
-        id: NodeID,
-        default_addr: SocketAddr,
-        alternate_addrs: Vec<SocketAddr>,
-    ) {
-        let conn_addr = {
-            let mut ret = None;
-            for addr in [default_addr].iter().chain(alternate_addrs.iter()) {
-                debug!("Trying address {} for peer {}", addr, hex::encode(&id[..8]));
-                match self.netapp.clone().try_connect(*addr, id).await {
-                    Ok(()) => {
-                        ret = Some(*addr);
-                        break;
-                    }
-                    Err(e) => {
-                        debug!(
-                            "Error connecting to {} at {}: {}",
-                            hex::encode(&id[..8]),
-                            addr,
-                            e
-                        );
-                    }
-                }
-            }
-            ret
-        };
-
-        if let Some(ok_addr) = conn_addr {
-            self.on_connected(id, ok_addr, false);
-        } else {
-            warn!(
-                "Could not connect to peer {} ({} addresses tried)",
-                hex::encode(&id[..8]),
-                1 + alternate_addrs.len()
-            );
-            let mut known_hosts = self.known_hosts.write().unwrap();
-            if let Some(host) = known_hosts.list.get_mut(&id) {
-                host.state = match host.state {
-                    PeerConnState::Trying(i) => {
-                        if i >= CONN_MAX_RETRIES {
-                            PeerConnState::Abandonned
-                        } else {
-                            PeerConnState::Waiting(i + 1, Instant::now() + CONN_RETRY_INTERVAL)
-                        }
-                    }
-                    _ => PeerConnState::Waiting(0, Instant::now() + CONN_RETRY_INTERVAL),
-                };
-                self.update_public_peer_list(&known_hosts);
-            }
-        }
-    }
-
-    fn on_connected(self: Arc<Self>, id: NodeID, addr: SocketAddr, is_incoming: bool) {
-        let mut known_hosts = self.known_hosts.write().unwrap();
-        if is_incoming {
-            if let Some(host) = known_hosts.list.get_mut(&id) {
-                if !host.all_addrs.contains(&addr) {
-                    host.all_addrs.push(addr);
-                }
-            } else {
-                known_hosts.list.insert(id, self.new_peer(&id, addr));
-            }
-        } else {
-            info!(
-                "Successfully connected to {} at {}",
-                hex::encode(&id[..8]),
-                addr
-            );
-            if let Some(host) = known_hosts.list.get_mut(&id) {
-                host.state = PeerConnState::Connected;
-                host.addr = addr;
-                if !host.all_addrs.contains(&addr) {
-                    host.all_addrs.push(addr);
-                }
-            } else {
-                known_hosts
-                    .list
-                    .insert(id, PeerInfoInternal::new(addr, PeerConnState::Connected));
-            }
-        }
-        known_hosts.update_hash();
-        self.update_public_peer_list(&known_hosts);
-    }
-
-    fn on_disconnected(self: Arc<Self>, id: NodeID, is_incoming: bool) {
-        if !is_incoming {
-            info!("Connection to {} was closed", hex::encode(&id[..8]));
-            let mut known_hosts = self.known_hosts.write().unwrap();
-            if let Some(host) = known_hosts.list.get_mut(&id) {
-                host.state = PeerConnState::Waiting(0, Instant::now());
-                known_hosts.update_hash();
-                self.update_public_peer_list(&known_hosts);
-            }
-        }
-    }
-
-    fn new_peer(&self, id: &NodeID, addr: SocketAddr) -> PeerInfoInternal {
-        let state = if *id == self.netapp.id {
-            PeerConnState::Ourself
-        } else {
-            PeerConnState::Waiting(0, Instant::now())
-        };
-        PeerInfoInternal::new(addr, state)
-    }
-}
-
-#[async_trait]
-impl EndpointHandler<PingMessage> for FullMeshPeeringStrategy {
-    async fn handle(self: &Arc<Self>, ping: &PingMessage, from: NodeID) -> PingMessage {
-        let ping_resp = PingMessage {
-            id: ping.id,
-            peer_list_hash: self.known_hosts.read().unwrap().hash,
-        };
-        debug!("Ping from {}", hex::encode(&from[..8]));
-        ping_resp
-    }
-}
-
-#[async_trait]
-impl EndpointHandler<PeerListMessage> for FullMeshPeeringStrategy {
-    async fn handle(
-        self: &Arc<Self>,
-        peer_list: &PeerListMessage,
-        _from: NodeID,
-    ) -> PeerListMessage {
-        self.handle_peer_list(&peer_list.list[..]);
-        let peer_list = KnownHosts::map_into_vec(&self.known_hosts.read().unwrap().list);
-        PeerListMessage { list: peer_list }
-    }
-}
diff --git a/src/net/peering/mod.rs b/src/net/peering/mod.rs
deleted file mode 100644
index 044b1dfe..00000000
--- a/src/net/peering/mod.rs
+++ /dev/null
@@ -1 +0,0 @@
-pub mod fullmesh;
diff --git a/src/net/test.rs b/src/net/test.rs
index d4da6f23..c6259752 100644
--- a/src/net/test.rs
+++ b/src/net/test.rs
@@ -9,7 +9,7 @@ use sodiumoxide::crypto::auth;
 use sodiumoxide::crypto::sign::ed25519;
 
 use crate::netapp::*;
-use crate::peering::fullmesh::*;
+use crate::peering::*;
 use crate::NodeID;
 
 #[tokio::test(flavor = "current_thread")]
@@ -100,10 +100,10 @@ fn run_netapp(
 ) -> (
     tokio::task::JoinHandle<()>,
     Arc<NetApp>,
-    Arc<FullMeshPeeringStrategy>,
+    Arc<PeeringManager>,
 ) {
     let netapp = NetApp::new(0u64, netid, sk);
-    let peering = FullMeshPeeringStrategy::new(netapp.clone(), bootstrap_peers, None);
+    let peering = PeeringManager::new(netapp.clone(), bootstrap_peers, None);
 
     let peering2 = peering.clone();
     let netapp2 = netapp.clone();
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index b5279bcd..c46e577f 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -19,7 +19,7 @@ pub use garage_net::message::{
 	IntoReq, Message as Rpc, OrderTag, Req, RequestPriority, Resp, PRIO_BACKGROUND, PRIO_HIGH,
 	PRIO_NORMAL, PRIO_SECONDARY,
 };
-use garage_net::peering::fullmesh::FullMeshPeeringStrategy;
+use garage_net::peering::PeeringManager;
 pub use garage_net::{self, NetApp, NodeID};
 
 use garage_util::data::*;
@@ -90,7 +90,7 @@ pub struct RpcHelper(Arc<RpcHelperInner>);
 
 struct RpcHelperInner {
 	our_node_id: Uuid,
-	fullmesh: Arc<FullMeshPeeringStrategy>,
+	peering: Arc<PeeringManager>,
 	ring: watch::Receiver<Arc<Ring>>,
 	metrics: RpcMetrics,
 	rpc_timeout: Duration,
 }
@@ -99,7 +99,7 @@ impl RpcHelper {
 	pub(crate) fn new(
 		our_node_id: Uuid,
-		fullmesh: Arc<FullMeshPeeringStrategy>,
+		peering: Arc<PeeringManager>,
 		ring: watch::Receiver<Arc<Ring>>,
 		rpc_timeout: Option<Duration>,
 	) -> Self {
@@ -107,7 +107,7 @@ impl RpcHelper {
 		Self(Arc::new(RpcHelperInner {
 			our_node_id,
-			fullmesh,
+			peering,
 			ring,
 			metrics,
 			rpc_timeout: rpc_timeout.unwrap_or(DEFAULT_TIMEOUT),
@@ -210,7 +210,7 @@ impl RpcHelper {
 	{
 		let to = self
 			.0
-			.fullmesh
+			.peering
 			.get_peer_list()
 			.iter()
 			.map(|p| p.id.into())
@@ -391,7 +391,7 @@ impl RpcHelper {
 	pub fn request_order(&self, nodes: &[Uuid]) -> Vec<Uuid> {
 		// Retrieve some status variables that we will use to sort requests
-		let peer_list = self.0.fullmesh.get_peer_list();
+		let peer_list = self.0.peering.get_peer_list();
 		let ring: Arc<Ring> = self.0.ring.borrow().clone();
 		let our_zone = match ring.layout.node_role(&self.0.our_node_id) {
 			Some(pc) => &pc.zone,
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 4b9a72ef..de44e656 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -18,7 +18,7 @@ use tokio::sync::Mutex;
 
 use garage_net::endpoint::{Endpoint, EndpointHandler};
 use garage_net::message::*;
-use garage_net::peering::fullmesh::FullMeshPeeringStrategy;
+use garage_net::peering::PeeringManager;
 use garage_net::util::parse_and_resolve_peer_addr_async;
 use garage_net::{NetApp, NetworkKey, NodeID, NodeKey};
 
@@ -92,7 +92,7 @@ pub struct System {
 	node_status: RwLock<HashMap<Uuid, (u64, NodeStatus)>>,
 
 	pub netapp: Arc<NetApp>,
-	fullmesh: Arc<FullMeshPeeringStrategy>,
+	peering: Arc<PeeringManager>,
 	pub rpc: RpcHelper,
 
 	system_endpoint: Arc<Endpoint<SystemRpc, System>>,
@@ -326,9 +326,9 @@ impl System {
 		}
 
 		let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, node_key);
-		let fullmesh = FullMeshPeeringStrategy::new(netapp.clone(), vec![], rpc_public_addr);
+		let peering = PeeringManager::new(netapp.clone(), vec![], rpc_public_addr);
 		if let Some(ping_timeout) = config.rpc_ping_timeout_msec {
-			fullmesh.set_ping_timeout_millis(ping_timeout);
+			peering.set_ping_timeout_millis(ping_timeout);
 		}
 
 		let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into());
@@ -358,10 +358,10 @@ impl System {
 			local_status: ArcSwap::new(Arc::new(local_status)),
 			node_status: RwLock::new(HashMap::new()),
 			netapp: netapp.clone(),
-			fullmesh: fullmesh.clone(),
+			peering: peering.clone(),
 			rpc: RpcHelper::new(
 				netapp.id.into(),
-				fullmesh,
+				peering,
 				ring.clone(),
 				config.rpc_timeout_msec.map(Duration::from_millis),
 			),
@@ -393,7 +393,7 @@ impl System {
 		self.netapp
 			.clone()
 			.listen(self.rpc_listen_addr, None, must_exit.clone()),
-		self.fullmesh.clone().run(must_exit.clone()),
+		self.peering.clone().run(must_exit.clone()),
 		self.discovery_loop(must_exit.clone()),
 		self.status_exchange_loop(must_exit.clone()),
 	);
@@ -405,7 +405,7 @@ impl System {
 	pub fn get_known_nodes(&self) -> Vec<KnownNodeInfo> {
 		let node_status = self.node_status.read().unwrap();
 		let known_nodes = self
-			.fullmesh
+			.peering
 			.get_peer_list()
 			.iter()
 			.map(|n| KnownNodeInfo {
@@ -726,10 +726,10 @@ impl System {
 	async fn discovery_loop(self: &Arc<Self>, mut stop_signal: watch::Receiver<bool>) {
 		while !*stop_signal.borrow() {
 			let not_configured = self.ring.borrow().layout.check().is_err();
-			let no_peers = self.fullmesh.get_peer_list().len() < self.replication_factor;
+			let no_peers = self.peering.get_peer_list().len() < self.replication_factor;
 			let expected_n_nodes = self.ring.borrow().layout.num_nodes();
 			let bad_peers = self
-				.fullmesh
+				.peering
 				.get_peer_list()
 				.iter()
 				.filter(|p| p.is_up())
@@ -811,7 +811,7 @@ impl System {
 		// Prepare new peer list to save to file
 		// It is a vec of tuples (node ID as Uuid, node SocketAddr)
 		let mut peer_list = self
-			.fullmesh
+			.peering
 			.get_peer_list()
 			.iter()
 			.map(|n| (n.id.into(), n.addr))
-- 
cgit v1.2.3