From 4745e7c4ba5665d3303ae567087781778cec9c34 Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Wed, 8 Jun 2022 00:30:56 +0200 Subject: further work on streams most changes still required are related to error handling --- src/peering/fullmesh.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) (limited to 'src/peering') diff --git a/src/peering/fullmesh.rs b/src/peering/fullmesh.rs index 012c5a0..7dfc5c4 100644 --- a/src/peering/fullmesh.rs +++ b/src/peering/fullmesh.rs @@ -29,7 +29,7 @@ const FAILED_PING_THRESHOLD: usize = 3; // -- Protocol messages -- -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Clone)] struct PingMessage { pub id: u64, pub peer_list_hash: hash::Digest, @@ -39,7 +39,9 @@ impl Message for PingMessage { type Response = PingMessage; } -#[derive(Serialize, Deserialize)] +impl AutoSerialize for PingMessage {} + +#[derive(Serialize, Deserialize, Clone)] struct PeerListMessage { pub list: Vec<(NodeID, SocketAddr)>, } @@ -48,6 +50,8 @@ impl Message for PeerListMessage { type Response = PeerListMessage; } +impl AutoSerialize for PeerListMessage {} + // -- Algorithm data structures -- #[derive(Debug)] -- cgit v1.2.3 From f35fa7d18d9e0f51bed311355ec1310b1d311ab3 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 21 Jul 2022 17:34:53 +0200 Subject: Move things around --- src/peering/basalt.rs | 3 ++- src/peering/fullmesh.rs | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) (limited to 'src/peering') diff --git a/src/peering/basalt.rs b/src/peering/basalt.rs index 7f77995..98977a3 100644 --- a/src/peering/basalt.rs +++ b/src/peering/basalt.rs @@ -14,8 +14,9 @@ use sodiumoxide::crypto::hash; use tokio::sync::watch; use crate::endpoint::*; +use crate::message::*; use crate::netapp::*; -use crate::proto::*; +use crate::send::*; use crate::NodeID; // -- Protocol messages -- diff --git a/src/peering/fullmesh.rs b/src/peering/fullmesh.rs index 7dfc5c4..5b489ae 100644 --- a/src/peering/fullmesh.rs +++ b/src/peering/fullmesh.rs @@ -17,7 +17,8 @@ use sodiumoxide::crypto::hash; use crate::endpoint::*; use crate::error::*; use crate::netapp::*; -use crate::proto::*; + +use crate::message::*; use crate::NodeID; const CONN_RETRY_INTERVAL: Duration = Duration::from_secs(30); -- cgit v1.2.3 From 44bbc1c00c2532e08dff0d4a547b0a707e89f32d Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 21 Jul 2022 19:05:51 +0200 Subject: Rename AutoSerialize into SimpleMessage and refactor a bit --- src/peering/fullmesh.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) (limited to 'src/peering') diff --git a/src/peering/fullmesh.rs b/src/peering/fullmesh.rs index 5b489ae..3eeebb3 100644 --- a/src/peering/fullmesh.rs +++ b/src/peering/fullmesh.rs @@ -40,7 +40,7 @@ impl Message for PingMessage { type Response = PingMessage; } -impl AutoSerialize for PingMessage {} +impl SimpleMessage for PingMessage {} #[derive(Serialize, Deserialize, Clone)] struct PeerListMessage { @@ -51,7 +51,7 @@ impl Message for PeerListMessage { type Response = PeerListMessage; } -impl AutoSerialize for PeerListMessage {} +impl SimpleMessage for PeerListMessage {} // -- Algorithm data structures -- @@ -379,7 +379,7 @@ impl FullMeshPeeringStrategy { ping_time ); let ping_response = select! { - r = self.ping_endpoint.call(&id, &ping_msg, PRIO_HIGH) => r, + r = self.ping_endpoint.call(&id, ping_msg, PRIO_HIGH) => r, _ = tokio::time::sleep(PING_TIMEOUT) => Err(Error::Message("Ping timeout".into())), }; @@ -431,7 +431,7 @@ impl FullMeshPeeringStrategy { let pex_message = PeerListMessage { list: peer_list }; match self .peer_list_endpoint - .call(id, &pex_message, PRIO_BACKGROUND) + .call(id, pex_message, PRIO_BACKGROUND) .await { Err(e) => warn!("Error doing peer exchange: {}", e), @@ -587,7 +587,7 @@ impl FullMeshPeeringStrategy { #[async_trait] impl EndpointHandler for FullMeshPeeringStrategy { - async fn handle(self: &Arc, ping: &PingMessage, from: NodeID) -> PingMessage { + async fn handle(self: &Arc, ping: PingMessage, from: NodeID) -> PingMessage { let ping_resp = PingMessage { id: ping.id, peer_list_hash: self.known_hosts.read().unwrap().hash, @@ -601,7 +601,7 @@ impl EndpointHandler for FullMeshPeeringStrategy { impl EndpointHandler for FullMeshPeeringStrategy { async fn handle( self: &Arc, - peer_list: &PeerListMessage, + peer_list: PeerListMessage, _from: NodeID, ) -> PeerListMessage { self.handle_peer_list(&peer_list.list[..]); -- cgit v1.2.3 From 7d148c7e764d563efa3bccc0f14f50867db38ef1 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 21 Jul 2022 19:25:07 +0200 Subject: One possibility, but I don't like it --- src/peering/basalt.rs | 9 ++++----- src/peering/fullmesh.rs | 4 ---- 2 files changed, 4 insertions(+), 9 deletions(-) (limited to 'src/peering') diff --git a/src/peering/basalt.rs b/src/peering/basalt.rs index 98977a3..d7bc6a8 100644 --- a/src/peering/basalt.rs +++ b/src/peering/basalt.rs @@ -16,7 +16,6 @@ use tokio::sync::watch; use crate::endpoint::*; use crate::message::*; use crate::netapp::*; -use crate::send::*; use crate::NodeID; // -- Protocol messages -- @@ -332,7 +331,7 @@ impl Basalt { async fn do_pull(self: Arc, peer: NodeID) { match self .pull_endpoint - .call(&peer, &PullMessage {}, PRIO_NORMAL) + .call(&peer, PullMessage {}, PRIO_NORMAL) .await { Ok(resp) => { @@ -347,7 +346,7 @@ impl Basalt { async fn do_push(self: Arc, peer: NodeID) { let push_msg = self.make_push_message(); - match self.push_endpoint.call(&peer, &push_msg, PRIO_NORMAL).await { + match self.push_endpoint.call(&peer, push_msg, PRIO_NORMAL).await { Ok(_) => { trace!("KYEV PEXo {}", hex::encode(peer)); } @@ -469,14 +468,14 @@ impl Basalt { #[async_trait] impl EndpointHandler for Basalt { - async fn handle(self: &Arc, _pullmsg: &PullMessage, _from: NodeID) -> PushMessage { + async fn handle(self: &Arc, _pullmsg: PullMessage, _from: NodeID) -> PushMessage { self.make_push_message() } } #[async_trait] impl EndpointHandler for Basalt { - async fn handle(self: &Arc, pushmsg: &PushMessage, _from: NodeID) { + async fn handle(self: &Arc, pushmsg: PushMessage, _from: NodeID) { self.handle_peer_list(&pushmsg.peers[..]); } } diff --git a/src/peering/fullmesh.rs b/src/peering/fullmesh.rs index 3eeebb3..f8348af 100644 --- a/src/peering/fullmesh.rs +++ b/src/peering/fullmesh.rs @@ -40,8 +40,6 @@ impl Message for PingMessage { type Response = PingMessage; } -impl SimpleMessage for PingMessage {} - #[derive(Serialize, Deserialize, Clone)] struct PeerListMessage { pub list: Vec<(NodeID, SocketAddr)>, @@ -51,8 +49,6 @@ impl Message for PeerListMessage { type Response = PeerListMessage; } -impl SimpleMessage for PeerListMessage {} - // -- Algorithm data structures -- #[derive(Debug)] -- cgit v1.2.3 From 4934ed726d51913afd97ca937d0ece39ef8b7371 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 21 Jul 2022 20:22:56 +0200 Subject: Propose alternative API --- src/peering/basalt.rs | 17 +++++++++++++---- src/peering/fullmesh.rs | 12 +++++++----- 2 files changed, 20 insertions(+), 9 deletions(-) (limited to 'src/peering') diff --git a/src/peering/basalt.rs b/src/peering/basalt.rs index d7bc6a8..71dea84 100644 --- a/src/peering/basalt.rs +++ b/src/peering/basalt.rs @@ -468,15 +468,24 @@ impl Basalt { #[async_trait] impl EndpointHandler for Basalt { - async fn handle(self: &Arc, _pullmsg: PullMessage, _from: NodeID) -> PushMessage { - self.make_push_message() + async fn handle( + self: &Arc, + _pullmsg: Req, + _from: NodeID, + ) -> Resp { + Resp::new(self.make_push_message()) } } #[async_trait] impl EndpointHandler for Basalt { - async fn handle(self: &Arc, pushmsg: PushMessage, _from: NodeID) { - self.handle_peer_list(&pushmsg.peers[..]); + async fn handle( + self: &Arc, + pushmsg: Req, + _from: NodeID, + ) -> Resp { + self.handle_peer_list(&pushmsg.msg().peers[..]); + Resp::new(()) } } diff --git a/src/peering/fullmesh.rs b/src/peering/fullmesh.rs index f8348af..9b7b666 100644 --- a/src/peering/fullmesh.rs +++ b/src/peering/fullmesh.rs @@ -583,13 +583,14 @@ impl FullMeshPeeringStrategy { #[async_trait] impl EndpointHandler for FullMeshPeeringStrategy { - async fn handle(self: &Arc, ping: PingMessage, from: NodeID) -> PingMessage { + async fn handle(self: &Arc, ping: Req, from: NodeID) -> Resp { + let ping = ping.msg(); let ping_resp = PingMessage { id: ping.id, peer_list_hash: self.known_hosts.read().unwrap().hash, }; debug!("Ping from {}", hex::encode(&from[..8])); - ping_resp + Resp::new(ping_resp) } } @@ -597,11 +598,12 @@ impl EndpointHandler for FullMeshPeeringStrategy { impl EndpointHandler for FullMeshPeeringStrategy { async fn handle( self: &Arc, - peer_list: PeerListMessage, + peer_list: Req, _from: NodeID, - ) -> PeerListMessage { + ) -> Resp { + let peer_list = peer_list.msg(); self.handle_peer_list(&peer_list.list[..]); let peer_list = KnownHosts::map_into_vec(&self.known_hosts.read().unwrap().list); - PeerListMessage { list: peer_list } + Resp::new(PeerListMessage { list: peer_list }) } } -- cgit v1.2.3 From c358fe3c92da8a8454e461484737efe2a14dfd73 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 22 Jul 2022 10:55:37 +0200 Subject: Hide streaming versions as much as possible --- src/peering/basalt.rs | 17 ++++------------- src/peering/fullmesh.rs | 12 +++++------- 2 files changed, 9 insertions(+), 20 deletions(-) (limited to 'src/peering') diff --git a/src/peering/basalt.rs b/src/peering/basalt.rs index 71dea84..310077f 100644 --- a/src/peering/basalt.rs +++ b/src/peering/basalt.rs @@ -468,24 +468,15 @@ impl Basalt { #[async_trait] impl EndpointHandler for Basalt { - async fn handle( - self: &Arc, - _pullmsg: Req, - _from: NodeID, - ) -> Resp { - Resp::new(self.make_push_message()) + async fn handle(self: &Arc, _pullmsg: &PullMessage, _from: NodeID) -> PushMessage { + self.make_push_message() } } #[async_trait] impl EndpointHandler for Basalt { - async fn handle( - self: &Arc, - pushmsg: Req, - _from: NodeID, - ) -> Resp { - self.handle_peer_list(&pushmsg.msg().peers[..]); - Resp::new(()) + async fn handle(self: &Arc, pushmsg: &PushMessage, _from: NodeID) { + self.handle_peer_list(&pushmsg.peers[..]); } } diff --git a/src/peering/fullmesh.rs b/src/peering/fullmesh.rs index 9b7b666..ccbd0ba 100644 --- a/src/peering/fullmesh.rs +++ b/src/peering/fullmesh.rs @@ -583,14 +583,13 @@ impl FullMeshPeeringStrategy { #[async_trait] impl EndpointHandler for FullMeshPeeringStrategy { - async fn handle(self: &Arc, ping: Req, from: NodeID) -> Resp { - let ping = ping.msg(); + async fn handle(self: &Arc, ping: &PingMessage, from: NodeID) -> PingMessage { let ping_resp = PingMessage { id: ping.id, peer_list_hash: self.known_hosts.read().unwrap().hash, }; debug!("Ping from {}", hex::encode(&from[..8])); - Resp::new(ping_resp) + ping_resp } } @@ -598,12 +597,11 @@ impl EndpointHandler for FullMeshPeeringStrategy { impl EndpointHandler for FullMeshPeeringStrategy { async fn handle( self: &Arc, - peer_list: Req, + peer_list: &PeerListMessage, _from: NodeID, - ) -> Resp { - let peer_list = peer_list.msg(); + ) -> PeerListMessage { self.handle_peer_list(&peer_list.list[..]); let peer_list = KnownHosts::map_into_vec(&self.known_hosts.read().unwrap().list); - Resp::new(PeerListMessage { list: peer_list }) + PeerListMessage { list: peer_list } } } -- cgit v1.2.3 From f022a77f97c169807ae098e101a29301c0d19fbd Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 12 Sep 2022 17:43:10 +0200 Subject: Add documentation --- src/peering/fullmesh.rs | 1 + 1 file changed, 1 insertion(+) (limited to 'src/peering') diff --git a/src/peering/fullmesh.rs b/src/peering/fullmesh.rs index 7f1c065..2f3330e 100644 --- a/src/peering/fullmesh.rs +++ b/src/peering/fullmesh.rs @@ -81,6 +81,7 @@ impl PeerInfoInternal { } } +/// Information that the full mesh peering strategy can return about the peers it knows of #[derive(Copy, Clone, Debug)] pub struct PeerInfo { /// The node's identifier (its public key) -- cgit v1.2.3 From 8ab6256c3b5a2cde7144b3a5e1ef488b7bce6227 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 13 Sep 2022 12:12:55 +0200 Subject: No longer need to derive Clone on message types --- src/peering/fullmesh.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src/peering') diff --git a/src/peering/fullmesh.rs b/src/peering/fullmesh.rs index 2f3330e..fb2e3d1 100644 --- a/src/peering/fullmesh.rs +++ b/src/peering/fullmesh.rs @@ -30,7 +30,7 @@ const FAILED_PING_THRESHOLD: usize = 4; // -- Protocol messages -- -#[derive(Serialize, Deserialize, Clone)] +#[derive(Serialize, Deserialize)] struct PingMessage { pub id: u64, pub peer_list_hash: hash::Digest, @@ -40,7 +40,7 @@ impl Message for PingMessage { type Response = PingMessage; } -#[derive(Serialize, Deserialize, Clone)] +#[derive(Serialize, Deserialize)] struct PeerListMessage { pub list: Vec<(NodeID, SocketAddr)>, } -- cgit v1.2.3