From 7d148c7e764d563efa3bccc0f14f50867db38ef1 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 21 Jul 2022 19:25:07 +0200 Subject: One possibility, but I don't like it --- src/client.rs | 1 - src/endpoint.rs | 1 - src/message.rs | 54 ++++++++++++++++--------------------------------- src/netapp.rs | 2 -- src/peering/basalt.rs | 9 ++++----- src/peering/fullmesh.rs | 4 ---- src/util.rs | 2 -- 7 files changed, 21 insertions(+), 52 deletions(-) (limited to 'src') diff --git a/src/client.rs b/src/client.rs index cf80746..9d572aa 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,4 +1,3 @@ -use std::borrow::Borrow; use std::collections::HashMap; use std::net::SocketAddr; use std::sync::atomic::{self, AtomicU32}; diff --git a/src/endpoint.rs b/src/endpoint.rs index 3f292d9..97a7644 100644 --- a/src/endpoint.rs +++ b/src/endpoint.rs @@ -1,4 +1,3 @@ -use std::borrow::Borrow; use std::marker::PhantomData; use std::sync::Arc; diff --git a/src/message.rs b/src/message.rs index f92eb8c..22cae6a 100644 --- a/src/message.rs +++ b/src/message.rs @@ -1,4 +1,3 @@ -use async_trait::async_trait; use bytes::Bytes; use serde::{Deserialize, Serialize}; @@ -43,6 +42,10 @@ pub trait Message: SerializeMessage + Send + Sync { } /// A trait for de/serializing messages, with possible associated stream. +/// This is default-implemented by anything that can already be serialized +/// and deserialized. Adapters are provided that implement this for +/// adding a body, either from a fixed Bytes buffer (which allows the thing +/// to be Clone), or from a streaming byte stream. pub trait SerializeMessage: Sized { type SerializableSelf: Serialize + for<'de> Deserialize<'de> + Send; @@ -53,38 +56,9 @@ pub trait SerializeMessage: Sized { // ---- -impl SerializeMessage for Result -where - T: SerializeMessage + Send, - E: Serialize + for<'de> Deserialize<'de> + Send, -{ - type SerializableSelf = Result; - - fn into_parts(self) -> (Self::SerializableSelf, Option) { - match self { - Ok(ok) => { - let (msg, stream) = ok.into_parts(); - (Ok(msg), stream) - } - Err(err) => (Err(err), None), - } - } - - fn from_parts(ser_self: Self::SerializableSelf, stream: ByteStream) -> Self { - match ser_self { - Ok(ok) => Ok(T::from_parts(ok, stream)), - Err(err) => Err(err), - } - } -} - -// --- - -pub trait SimpleMessage: Serialize + for<'de> Deserialize<'de> + Clone + Send + Sync {} - impl SerializeMessage for T where - T: SimpleMessage, + T: Serialize + for<'de> Deserialize<'de> + Send, { type SerializableSelf = Self; fn into_parts(self) -> (Self::SerializableSelf, Option) { @@ -97,12 +71,15 @@ where } } -impl SimpleMessage for () {} - -impl SimpleMessage for std::sync::Arc {} - // ---- +/// An adapter that adds a body from a fixed Bytes buffer to a serializable message, +/// implementing the SerializeMessage trait. This allows for the SerializeMessage object +/// to be cloned, which is usefull for requests that must be sent to multiple servers. +/// Note that cloning the body is cheap thanks to Bytes; make sure that your serializable +/// part is also easily clonable (e.g. by wrapping it in an Arc). +/// Note that this CANNOT be used for a response type, as it cannot be reconstructed +/// from a remote stream. #[derive(Clone)] pub struct WithFixedBody Deserialize<'de> + Clone + Send + 'static>( pub T, @@ -123,11 +100,14 @@ where ) } - fn from_parts(ser_self: Self::SerializableSelf, stream: ByteStream) -> Self { - panic!("Cannot reconstruct a WithFixedBody type from parts"); + fn from_parts(_ser_self: Self::SerializableSelf, _stream: ByteStream) -> Self { + panic!("Cannot use a WithFixedBody as a response type"); } } +/// An adapter that adds a body from a ByteStream. This is usefull for receiving +/// responses to requests that contain attached byte streams. This type is +/// not clonable. pub struct WithStreamingBody Deserialize<'de> + Send>( pub T, pub ByteStream, diff --git a/src/netapp.rs b/src/netapp.rs index 8365de0..32a5c23 100644 --- a/src/netapp.rs +++ b/src/netapp.rs @@ -38,8 +38,6 @@ pub(crate) struct HelloMessage { pub server_port: u16, } -impl SimpleMessage for HelloMessage {} - impl Message for HelloMessage { type Response = (); } diff --git a/src/peering/basalt.rs b/src/peering/basalt.rs index 98977a3..d7bc6a8 100644 --- a/src/peering/basalt.rs +++ b/src/peering/basalt.rs @@ -16,7 +16,6 @@ use tokio::sync::watch; use crate::endpoint::*; use crate::message::*; use crate::netapp::*; -use crate::send::*; use crate::NodeID; // -- Protocol messages -- @@ -332,7 +331,7 @@ impl Basalt { async fn do_pull(self: Arc, peer: NodeID) { match self .pull_endpoint - .call(&peer, &PullMessage {}, PRIO_NORMAL) + .call(&peer, PullMessage {}, PRIO_NORMAL) .await { Ok(resp) => { @@ -347,7 +346,7 @@ impl Basalt { async fn do_push(self: Arc, peer: NodeID) { let push_msg = self.make_push_message(); - match self.push_endpoint.call(&peer, &push_msg, PRIO_NORMAL).await { + match self.push_endpoint.call(&peer, push_msg, PRIO_NORMAL).await { Ok(_) => { trace!("KYEV PEXo {}", hex::encode(peer)); } @@ -469,14 +468,14 @@ impl Basalt { #[async_trait] impl EndpointHandler for Basalt { - async fn handle(self: &Arc, _pullmsg: &PullMessage, _from: NodeID) -> PushMessage { + async fn handle(self: &Arc, _pullmsg: PullMessage, _from: NodeID) -> PushMessage { self.make_push_message() } } #[async_trait] impl EndpointHandler for Basalt { - async fn handle(self: &Arc, pushmsg: &PushMessage, _from: NodeID) { + async fn handle(self: &Arc, pushmsg: PushMessage, _from: NodeID) { self.handle_peer_list(&pushmsg.peers[..]); } } diff --git a/src/peering/fullmesh.rs b/src/peering/fullmesh.rs index 3eeebb3..f8348af 100644 --- a/src/peering/fullmesh.rs +++ b/src/peering/fullmesh.rs @@ -40,8 +40,6 @@ impl Message for PingMessage { type Response = PingMessage; } -impl SimpleMessage for PingMessage {} - #[derive(Serialize, Deserialize, Clone)] struct PeerListMessage { pub list: Vec<(NodeID, SocketAddr)>, @@ -51,8 +49,6 @@ impl Message for PeerListMessage { type Response = PeerListMessage; } -impl SimpleMessage for PeerListMessage {} - // -- Algorithm data structures -- #[derive(Debug)] diff --git a/src/util.rs b/src/util.rs index f860672..e7ecea8 100644 --- a/src/util.rs +++ b/src/util.rs @@ -9,8 +9,6 @@ use serde::Serialize; use futures::Stream; use tokio::sync::watch; -use crate::message::SerializeMessage; - /// A node's identifier, which is also its public cryptographic key pub type NodeID = sodiumoxide::crypto::sign::ed25519::PublicKey; /// A node's secret key -- cgit v1.2.3