From 5225a81dee21603950e7944cd93c40fdb1bd8feb Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 21 Mar 2024 09:47:04 +0100 Subject: [net-fixes] peering: only count node IDs and not addresses in hash --- src/net/peering.rs | 45 ++++++++++++++++++++++++++++----------------- 1 file changed, 28 insertions(+), 17 deletions(-) (limited to 'src/net') diff --git a/src/net/peering.rs b/src/net/peering.rs index 61882a18..f4283683 100644 --- a/src/net/peering.rs +++ b/src/net/peering.rs @@ -164,29 +164,40 @@ struct KnownHosts { impl KnownHosts { fn new() -> Self { let list = HashMap::new(); - let hash = Self::calculate_hash(vec![]); - Self { list, hash } + let mut ret = Self { + list, + hash: hash::Digest::from_slice(&[0u8; 64][..]).unwrap(), + }; + ret.update_hash(); + ret } fn update_hash(&mut self) { - self.hash = Self::calculate_hash(self.connected_peers_vec()); - } - fn connected_peers_vec(&self) -> Vec<(NodeID, SocketAddr)> { - let mut list = Vec::with_capacity(self.list.len()); - for (id, peer) in self.list.iter() { - if peer.state.is_up() { - list.push((*id, peer.addr)); - } - } - list - } - fn calculate_hash(mut list: Vec<(NodeID, SocketAddr)>) -> hash::Digest { + // The hash is a value that is exchanged between nodes when they ping one + // another. Nodes compare their known hosts hash to know if they are connected + // to the same set of nodes. If the hashes differ, they are connected to + // different nodes and they trigger an exchange of the full list of active + // connections. The hash value only represents the set of node IDs and not + // their actual socket addresses, because nodes can be connected via different + // addresses and that shouldn't necessarily trigger a full peer exchange. + let mut list = self + .list + .iter() + .filter(|(_, peer)| peer.state.is_up()) + .map(|(id, _)| *id) + .collect::>(); list.sort(); let mut hash_state = hash::State::new(); - for (id, addr) in list { + for id in list { hash_state.update(&id[..]); - hash_state.update(&format!("{}\n", addr).into_bytes()[..]); } - hash_state.finalize() + self.hash = hash_state.finalize(); + } + fn connected_peers_vec(&self) -> Vec<(NodeID, SocketAddr)> { + self.list + .iter() + .filter(|(_, peer)| peer.state.is_up()) + .map(|(id, peer)| (*id, peer.addr)) + .collect::>() } } -- cgit v1.2.3 From 961b4f9af36a7fb5d3a661ac19e8f2c168bb48ae Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 21 Mar 2024 10:45:34 +0100 Subject: [net-fixes] fix issues with local peer address (fix #761) --- src/net/netapp.rs | 51 ++++++++++------------ src/net/peering.rs | 124 +++++++++++++++++++++++------------------------------ 2 files changed, 77 insertions(+), 98 deletions(-) (limited to 'src/net') diff --git a/src/net/netapp.rs b/src/net/netapp.rs index faa51a99..6480a126 100644 --- a/src/net/netapp.rs +++ b/src/net/netapp.rs @@ -292,13 +292,7 @@ impl NetApp { /// the other node with `Netapp::request` pub async fn try_connect(self: Arc, ip: SocketAddr, id: NodeID) -> Result<(), Error> { // Don't connect to ourself, we don't care - // but pretend we did if id == self.id { - tokio::spawn(async move { - if let Some(h) = self.on_connected_handler.load().as_ref() { - h(id, ip, false); - } - }); return Ok(()); } @@ -327,31 +321,32 @@ impl NetApp { /// Close the outgoing connection we have to a node specified by its public key, /// if such a connection is currently open. pub fn disconnect(self: &Arc, id: &NodeID) { + let conn = self.client_conns.write().unwrap().remove(id); + // If id is ourself, we're not supposed to have a connection open - if *id != self.id { - let conn = self.client_conns.write().unwrap().remove(id); - if let Some(c) = conn { - debug!( - "Closing connection to {} ({})", - hex::encode(&c.peer_id[..8]), - c.remote_addr - ); - c.close(); - } else { - return; - } + if *id == self.id { + // sanity check + assert!(conn.is_none(), "had a connection to local node"); + return; } - // call on_disconnected_handler immediately, since the connection - // was removed - // (if id == self.id, we pretend we disconnected) - let id = *id; - let self2 = self.clone(); - tokio::spawn(async move { - if let Some(h) = self2.on_disconnected_handler.load().as_ref() { - h(id, false); - } - }); + if let Some(c) = conn { + debug!( + "Closing connection to {} ({})", + hex::encode(&c.peer_id[..8]), + c.remote_addr + ); + c.close(); + + // call on_disconnected_handler immediately, since the connection was removed + let id = *id; + let self2 = self.clone(); + tokio::spawn(async move { + if let Some(h) = self2.on_disconnected_handler.load().as_ref() { + h(id, false); + } + }); + } } // Called from conn.rs when an incoming connection is successfully established diff --git a/src/net/peering.rs b/src/net/peering.rs index f4283683..0b4fec9e 100644 --- a/src/net/peering.rs +++ b/src/net/peering.rs @@ -43,7 +43,7 @@ impl Message for PingMessage { #[derive(Serialize, Deserialize)] struct PeerListMessage { - pub list: Vec<(NodeID, SocketAddr)>, + pub list: Vec<(NodeID, Vec)>, } impl Message for PeerListMessage { @@ -54,12 +54,8 @@ impl Message for PeerListMessage { #[derive(Debug)] struct PeerInfoInternal { - // addr is the currently connected address, - // or the last address we were connected to, - // or an arbitrary address some other peer gave us - addr: SocketAddr, - // all_addrs contains all of the addresses everyone gave us - all_addrs: Vec, + // known_addrs contains all of the addresses everyone gave us + known_addrs: Vec, state: PeerConnState, last_send_ping: Option, @@ -69,10 +65,9 @@ struct PeerInfoInternal { } impl PeerInfoInternal { - fn new(addr: SocketAddr, state: PeerConnState) -> Self { + fn new(state: PeerConnState, known_addr: Option) -> Self { Self { - addr, - all_addrs: vec![addr], + known_addrs: known_addr.map(|x| vec![x]).unwrap_or_default(), state, last_send_ping: None, last_seen: None, @@ -81,8 +76,8 @@ impl PeerInfoInternal { } } fn add_addr(&mut self, addr: SocketAddr) -> bool { - if !self.all_addrs.contains(&addr) { - self.all_addrs.push(addr); + if !self.known_addrs.contains(&addr) { + self.known_addrs.push(addr); // If we are learning a new address for this node, // we want to retry connecting self.state = match self.state { @@ -90,7 +85,7 @@ impl PeerInfoInternal { PeerConnState::Waiting(_, _) | PeerConnState::Abandonned => { PeerConnState::Waiting(0, Instant::now()) } - x @ (PeerConnState::Ourself | PeerConnState::Connected) => x, + x @ (PeerConnState::Ourself | PeerConnState::Connected { .. }) => x, }; true } else { @@ -104,8 +99,6 @@ impl PeerInfoInternal { pub struct PeerInfo { /// The node's identifier (its public key) pub id: NodeID, - /// The node's network address - pub addr: SocketAddr, /// The current status of our connection to this node pub state: PeerConnState, /// The last time at which the node was seen @@ -136,7 +129,7 @@ pub enum PeerConnState { Ourself, /// We currently have a connection to this peer - Connected, + Connected { addr: SocketAddr }, /// Our next connection tentative (the nth, where n is the first value of the tuple) /// will be at given Instant @@ -152,7 +145,7 @@ pub enum PeerConnState { impl PeerConnState { /// Returns true if we can currently send requests to this peer pub fn is_up(&self) -> bool { - matches!(self, Self::Ourself | Self::Connected) + matches!(self, Self::Ourself | Self::Connected { .. }) } } @@ -192,11 +185,11 @@ impl KnownHosts { } self.hash = hash_state.finalize(); } - fn connected_peers_vec(&self) -> Vec<(NodeID, SocketAddr)> { + fn connected_peers_vec(&self) -> Vec<(NodeID, Vec)> { self.list .iter() .filter(|(_, peer)| peer.state.is_up()) - .map(|(id, peer)| (*id, peer.addr)) + .map(|(id, peer)| (*id, peer.known_addrs.clone())) .collect::>() } } @@ -231,18 +224,16 @@ impl PeeringManager { if id != netapp.id { known_hosts.list.insert( id, - PeerInfoInternal::new(addr, PeerConnState::Waiting(0, Instant::now())), + PeerInfoInternal::new(PeerConnState::Waiting(0, Instant::now()), Some(addr)), ); } } - if let Some(addr) = our_addr { - known_hosts.list.insert( - netapp.id, - PeerInfoInternal::new(addr, PeerConnState::Ourself), - ); - known_hosts.update_hash(); - } + known_hosts.list.insert( + netapp.id, + PeerInfoInternal::new(PeerConnState::Ourself, our_addr), + ); + known_hosts.update_hash(); // TODO for v0.10 / v1.0 : rename the endpoint (it will break compatibility) let strat = Arc::new(Self { @@ -287,7 +278,7 @@ impl PeeringManager { for (id, info) in known_hosts.list.iter() { trace!("{}, {:?}", hex::encode(&id[..8]), info); match info.state { - PeerConnState::Connected => { + PeerConnState::Connected { .. } => { let must_ping = match info.last_send_ping { None => true, Some(t) => Instant::now() - t > PING_INTERVAL, @@ -330,7 +321,7 @@ impl PeeringManager { info!( "Retrying connection to {} at {} ({})", hex::encode(&id[..8]), - h.all_addrs + h.known_addrs .iter() .map(|x| format!("{}", x)) .collect::>() @@ -339,13 +330,8 @@ impl PeeringManager { ); h.state = PeerConnState::Trying(i); - let alternate_addrs = h - .all_addrs - .iter() - .filter(|x| **x != h.addr) - .cloned() - .collect::>(); - tokio::spawn(self.clone().try_connect(id, h.addr, alternate_addrs)); + let addresses = h.known_addrs.clone(); + tokio::spawn(self.clone().try_connect(id, addresses)); } } } @@ -373,27 +359,24 @@ impl PeeringManager { fn update_public_peer_list(&self, known_hosts: &KnownHosts) { let mut pub_peer_list = Vec::with_capacity(known_hosts.list.len()); for (id, info) in known_hosts.list.iter() { + if *id == self.netapp.id { + // sanity check + assert!(matches!(info.state, PeerConnState::Ourself)); + } let mut pings = info.ping.iter().cloned().collect::>(); pings.sort(); if !pings.is_empty() { pub_peer_list.push(PeerInfo { id: *id, - addr: info.addr, state: info.state, last_seen: info.last_seen, - avg_ping: Some( - pings - .iter() - .fold(Duration::from_secs(0), |x, y| x + *y) - .div_f64(pings.len() as f64), - ), + avg_ping: Some(pings.iter().sum::().div_f64(pings.len() as f64)), max_ping: pings.last().cloned(), med_ping: Some(pings[pings.len() / 2]), }); } else { pub_peer_list.push(PeerInfo { id: *id, - addr: info.addr, state: info.state, last_seen: info.last_seen, avg_ping: None, @@ -485,18 +468,20 @@ impl PeeringManager { } } - fn handle_peer_list(&self, list: &[(NodeID, SocketAddr)]) { + fn handle_peer_list(&self, list: &[(NodeID, Vec)]) { let mut known_hosts = self.known_hosts.write().unwrap(); let mut changed = false; - for (id, addr) in list.iter() { - if let Some(kh) = known_hosts.list.get_mut(id) { - if kh.add_addr(*addr) { + for (id, addrs) in list.iter() { + for addr in addrs.iter() { + if let Some(kh) = known_hosts.list.get_mut(id) { + if kh.add_addr(*addr) { + changed = true; + } + } else { + known_hosts.list.insert(*id, self.new_peer(id, *addr)); changed = true; } - } else { - known_hosts.list.insert(*id, self.new_peer(id, *addr)); - changed = true; } } @@ -506,15 +491,10 @@ impl PeeringManager { } } - async fn try_connect( - self: Arc, - id: NodeID, - default_addr: SocketAddr, - alternate_addrs: Vec, - ) { + async fn try_connect(self: Arc, id: NodeID, addresses: Vec) { let conn_addr = { let mut ret = None; - for addr in [default_addr].iter().chain(alternate_addrs.iter()) { + for addr in addresses.iter() { debug!("Trying address {} for peer {}", addr, hex::encode(&id[..8])); match self.netapp.clone().try_connect(*addr, id).await { Ok(()) => { @@ -540,7 +520,7 @@ impl PeeringManager { warn!( "Could not connect to peer {} ({} addresses tried)", hex::encode(&id[..8]), - 1 + alternate_addrs.len() + addresses.len() ); let mut known_hosts = self.known_hosts.write().unwrap(); if let Some(host) = known_hosts.list.get_mut(&id) { @@ -560,6 +540,14 @@ impl PeeringManager { } fn on_connected(self: &Arc, id: NodeID, addr: SocketAddr, is_incoming: bool) { + if id == self.netapp.id { + // sanity check + panic!( + "on_connected from local node, id={:?}, addr={}, incoming={}", + id, addr, is_incoming + ); + } + let mut known_hosts = self.known_hosts.write().unwrap(); if is_incoming { if let Some(host) = known_hosts.list.get_mut(&id) { @@ -574,13 +562,13 @@ impl PeeringManager { addr ); if let Some(host) = known_hosts.list.get_mut(&id) { - host.state = PeerConnState::Connected; - host.addr = addr; + host.state = PeerConnState::Connected { addr }; host.add_addr(addr); } else { - known_hosts - .list - .insert(id, PeerInfoInternal::new(addr, PeerConnState::Connected)); + known_hosts.list.insert( + id, + PeerInfoInternal::new(PeerConnState::Connected { addr }, Some(addr)), + ); } } known_hosts.update_hash(); @@ -600,12 +588,8 @@ impl PeeringManager { } fn new_peer(&self, id: &NodeID, addr: SocketAddr) -> PeerInfoInternal { - let state = if *id == self.netapp.id { - PeerConnState::Ourself - } else { - PeerConnState::Waiting(0, Instant::now()) - }; - PeerInfoInternal::new(addr, state) + assert!(*id != self.netapp.id); + PeerInfoInternal::new(PeerConnState::Waiting(0, Instant::now()), Some(addr)) } } -- cgit v1.2.3 From 3844110cd03210a1600d57db1aab53e41cf4815f Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 21 Mar 2024 10:50:44 +0100 Subject: [net-fixes] netapp peer exchange: send only currently connected address --- src/net/peering.rs | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) (limited to 'src/net') diff --git a/src/net/peering.rs b/src/net/peering.rs index 0b4fec9e..b4271231 100644 --- a/src/net/peering.rs +++ b/src/net/peering.rs @@ -43,7 +43,7 @@ impl Message for PingMessage { #[derive(Serialize, Deserialize)] struct PeerListMessage { - pub list: Vec<(NodeID, Vec)>, + pub list: Vec<(NodeID, SocketAddr)>, } impl Message for PeerListMessage { @@ -185,11 +185,13 @@ impl KnownHosts { } self.hash = hash_state.finalize(); } - fn connected_peers_vec(&self) -> Vec<(NodeID, Vec)> { + fn connected_peers_vec(&self) -> Vec<(NodeID, SocketAddr)> { self.list .iter() - .filter(|(_, peer)| peer.state.is_up()) - .map(|(id, peer)| (*id, peer.known_addrs.clone())) + .filter_map(|(id, peer)| match peer.state { + PeerConnState::Connected { addr } => Some((*id, addr)), + _ => None, + }) .collect::>() } } @@ -468,20 +470,18 @@ impl PeeringManager { } } - fn handle_peer_list(&self, list: &[(NodeID, Vec)]) { + fn handle_peer_list(&self, list: &[(NodeID, SocketAddr)]) { let mut known_hosts = self.known_hosts.write().unwrap(); let mut changed = false; - for (id, addrs) in list.iter() { - for addr in addrs.iter() { - if let Some(kh) = known_hosts.list.get_mut(id) { - if kh.add_addr(*addr) { - changed = true; - } - } else { - known_hosts.list.insert(*id, self.new_peer(id, *addr)); + for (id, addr) in list.iter() { + if let Some(kh) = known_hosts.list.get_mut(id) { + if kh.add_addr(*addr) { changed = true; } + } else { + known_hosts.list.insert(*id, self.new_peer(id, *addr)); + changed = true; } } -- cgit v1.2.3 From afad62939e071621666ca7255f7164f92c4475bb Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 28 Mar 2024 15:19:44 +0100 Subject: [next-0.10] bump version number to 1.0 --- src/net/Cargo.toml | 2 +- src/net/netapp.rs | 8 +++++--- src/net/peering.rs | 5 ++--- 3 files changed, 8 insertions(+), 7 deletions(-) (limited to 'src/net') diff --git a/src/net/Cargo.toml b/src/net/Cargo.toml index 4bd0d2e5..c12b39a4 100644 --- a/src/net/Cargo.toml +++ b/src/net/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "garage_net" -version = "0.10.0" +version = "1.0.0" authors = ["Alex Auvolat "] edition = "2018" license = "AGPL-3.0" diff --git a/src/net/netapp.rs b/src/net/netapp.rs index 6480a126..f1e9f1ae 100644 --- a/src/net/netapp.rs +++ b/src/net/netapp.rs @@ -35,8 +35,10 @@ pub type NetworkKey = sodiumoxide::crypto::auth::Key; /// composed of 8 bytes for Netapp version and 8 bytes for client version pub(crate) type VersionTag = [u8; 16]; -/// Value of the Netapp version used in the version tag -pub(crate) const NETAPP_VERSION_TAG: u64 = 0x6e65746170700005; // netapp 0x0005 +/// Value of garage_net version used in the version tag +/// We are no longer using prefix `netapp` as garage_net is forked from the netapp crate. +/// Since Garage v1.0, we have replaced the prefix by `grgnet` (shorthand for garage_net). +pub(crate) const NETAPP_VERSION_TAG: u64 = 0x6772676e65740010; // grgnet 0x0010 (1.0) /// HelloMessage is sent by the client on a Netapp connection to indicate /// that they are also a server and ready to recieve incoming connections @@ -123,7 +125,7 @@ impl NetApp { netapp .hello_endpoint - .swap(Some(netapp.endpoint("__netapp/netapp.rs/Hello".into()))); + .swap(Some(netapp.endpoint("garage_net/netapp.rs/Hello".into()))); netapp .hello_endpoint .load_full() diff --git a/src/net/peering.rs b/src/net/peering.rs index b4271231..168162d9 100644 --- a/src/net/peering.rs +++ b/src/net/peering.rs @@ -237,14 +237,13 @@ impl PeeringManager { ); known_hosts.update_hash(); - // TODO for v0.10 / v1.0 : rename the endpoint (it will break compatibility) let strat = Arc::new(Self { netapp: netapp.clone(), known_hosts: RwLock::new(known_hosts), public_peer_list: ArcSwap::new(Arc::new(Vec::new())), next_ping_id: AtomicU64::new(42), - ping_endpoint: netapp.endpoint("__netapp/peering/fullmesh.rs/Ping".into()), - peer_list_endpoint: netapp.endpoint("__netapp/peering/fullmesh.rs/PeerList".into()), + ping_endpoint: netapp.endpoint("garage_net/peering.rs/Ping".into()), + peer_list_endpoint: netapp.endpoint("garage_net/peering.rs/PeerList".into()), ping_timeout_millis: DEFAULT_PING_TIMEOUT_MILLIS.into(), }); -- cgit v1.2.3