aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-07-21 19:25:07 +0200
committerAlex Auvolat <alex@adnab.me>2022-07-21 19:25:07 +0200
commit7d148c7e764d563efa3bccc0f14f50867db38ef1 (patch)
treebd2f968b1f2d82b8c95ab7367fa7a282f97f0850
parent44bbc1c00c2532e08dff0d4a547b0a707e89f32d (diff)
downloadnetapp-7d148c7e764d563efa3bccc0f14f50867db38ef1.tar.gz
netapp-7d148c7e764d563efa3bccc0f14f50867db38ef1.zip
One possibility, but I don't like it
-rw-r--r--examples/basalt.rs5
-rw-r--r--src/client.rs1
-rw-r--r--src/endpoint.rs1
-rw-r--r--src/message.rs54
-rw-r--r--src/netapp.rs2
-rw-r--r--src/peering/basalt.rs9
-rw-r--r--src/peering/fullmesh.rs4
-rw-r--r--src/util.rs2
8 files changed, 23 insertions, 55 deletions
diff --git a/examples/basalt.rs b/examples/basalt.rs
index 52fab4b..dd56cd7 100644
--- a/examples/basalt.rs
+++ b/examples/basalt.rs
@@ -16,7 +16,6 @@ use tokio::sync::watch;
use netapp::endpoint::*;
use netapp::message::*;
use netapp::peering::basalt::*;
-use netapp::send::*;
use netapp::util::parse_peer_addr;
use netapp::{NetApp, NodeID};
@@ -146,7 +145,7 @@ impl Example {
tokio::spawn(async move {
match self2
.example_endpoint
- .call(&p, &ExampleMessage { example_field: 42 }, PRIO_NORMAL)
+ .call(&p, ExampleMessage { example_field: 42 }, PRIO_NORMAL)
.await
{
Ok(resp) => debug!("Got example response: {:?}", resp),
@@ -160,7 +159,7 @@ impl Example {
#[async_trait]
impl EndpointHandler<ExampleMessage> for Example {
- async fn handle(self: &Arc<Self>, msg: &ExampleMessage, _from: NodeID) -> ExampleResponse {
+ async fn handle(self: &Arc<Self>, msg: ExampleMessage, _from: NodeID) -> ExampleResponse {
debug!("Got example message: {:?}, sending example response", msg);
ExampleResponse {
example_field: false,
diff --git a/src/client.rs b/src/client.rs
index cf80746..9d572aa 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -1,4 +1,3 @@
-use std::borrow::Borrow;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::atomic::{self, AtomicU32};
diff --git a/src/endpoint.rs b/src/endpoint.rs
index 3f292d9..97a7644 100644
--- a/src/endpoint.rs
+++ b/src/endpoint.rs
@@ -1,4 +1,3 @@
-use std::borrow::Borrow;
use std::marker::PhantomData;
use std::sync::Arc;
diff --git a/src/message.rs b/src/message.rs
index f92eb8c..22cae6a 100644
--- a/src/message.rs
+++ b/src/message.rs
@@ -1,4 +1,3 @@
-use async_trait::async_trait;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
@@ -43,6 +42,10 @@ pub trait Message: SerializeMessage + Send + Sync {
}
/// A trait for de/serializing messages, with possible associated stream.
+/// This is default-implemented by anything that can already be serialized
+/// and deserialized. Adapters are provided that implement this for
+/// adding a body, either from a fixed Bytes buffer (which allows the thing
+/// to be Clone), or from a streaming byte stream.
pub trait SerializeMessage: Sized {
type SerializableSelf: Serialize + for<'de> Deserialize<'de> + Send;
@@ -53,38 +56,9 @@ pub trait SerializeMessage: Sized {
// ----
-impl<T, E> SerializeMessage for Result<T, E>
-where
- T: SerializeMessage + Send,
- E: Serialize + for<'de> Deserialize<'de> + Send,
-{
- type SerializableSelf = Result<T::SerializableSelf, E>;
-
- fn into_parts(self) -> (Self::SerializableSelf, Option<ByteStream>) {
- match self {
- Ok(ok) => {
- let (msg, stream) = ok.into_parts();
- (Ok(msg), stream)
- }
- Err(err) => (Err(err), None),
- }
- }
-
- fn from_parts(ser_self: Self::SerializableSelf, stream: ByteStream) -> Self {
- match ser_self {
- Ok(ok) => Ok(T::from_parts(ok, stream)),
- Err(err) => Err(err),
- }
- }
-}
-
-// ---
-
-pub trait SimpleMessage: Serialize + for<'de> Deserialize<'de> + Clone + Send + Sync {}
-
impl<T> SerializeMessage for T
where
- T: SimpleMessage,
+ T: Serialize + for<'de> Deserialize<'de> + Send,
{
type SerializableSelf = Self;
fn into_parts(self) -> (Self::SerializableSelf, Option<ByteStream>) {
@@ -97,12 +71,15 @@ where
}
}
-impl SimpleMessage for () {}
-
-impl<T: SimpleMessage> SimpleMessage for std::sync::Arc<T> {}
-
// ----
+/// An adapter that adds a body from a fixed Bytes buffer to a serializable message,
+/// implementing the SerializeMessage trait. This allows for the SerializeMessage object
+/// to be cloned, which is usefull for requests that must be sent to multiple servers.
+/// Note that cloning the body is cheap thanks to Bytes; make sure that your serializable
+/// part is also easily clonable (e.g. by wrapping it in an Arc).
+/// Note that this CANNOT be used for a response type, as it cannot be reconstructed
+/// from a remote stream.
#[derive(Clone)]
pub struct WithFixedBody<T: Serialize + for<'de> Deserialize<'de> + Clone + Send + 'static>(
pub T,
@@ -123,11 +100,14 @@ where
)
}
- fn from_parts(ser_self: Self::SerializableSelf, stream: ByteStream) -> Self {
- panic!("Cannot reconstruct a WithFixedBody type from parts");
+ fn from_parts(_ser_self: Self::SerializableSelf, _stream: ByteStream) -> Self {
+ panic!("Cannot use a WithFixedBody as a response type");
}
}
+/// An adapter that adds a body from a ByteStream. This is usefull for receiving
+/// responses to requests that contain attached byte streams. This type is
+/// not clonable.
pub struct WithStreamingBody<T: Serialize + for<'de> Deserialize<'de> + Send>(
pub T,
pub ByteStream,
diff --git a/src/netapp.rs b/src/netapp.rs
index 8365de0..32a5c23 100644
--- a/src/netapp.rs
+++ b/src/netapp.rs
@@ -38,8 +38,6 @@ pub(crate) struct HelloMessage {
pub server_port: u16,
}
-impl SimpleMessage for HelloMessage {}
-
impl Message for HelloMessage {
type Response = ();
}
diff --git a/src/peering/basalt.rs b/src/peering/basalt.rs
index 98977a3..d7bc6a8 100644
--- a/src/peering/basalt.rs
+++ b/src/peering/basalt.rs
@@ -16,7 +16,6 @@ use tokio::sync::watch;
use crate::endpoint::*;
use crate::message::*;
use crate::netapp::*;
-use crate::send::*;
use crate::NodeID;
// -- Protocol messages --
@@ -332,7 +331,7 @@ impl Basalt {
async fn do_pull(self: Arc<Self>, peer: NodeID) {
match self
.pull_endpoint
- .call(&peer, &PullMessage {}, PRIO_NORMAL)
+ .call(&peer, PullMessage {}, PRIO_NORMAL)
.await
{
Ok(resp) => {
@@ -347,7 +346,7 @@ impl Basalt {
async fn do_push(self: Arc<Self>, peer: NodeID) {
let push_msg = self.make_push_message();
- match self.push_endpoint.call(&peer, &push_msg, PRIO_NORMAL).await {
+ match self.push_endpoint.call(&peer, push_msg, PRIO_NORMAL).await {
Ok(_) => {
trace!("KYEV PEXo {}", hex::encode(peer));
}
@@ -469,14 +468,14 @@ impl Basalt {
#[async_trait]
impl EndpointHandler<PullMessage> for Basalt {
- async fn handle(self: &Arc<Self>, _pullmsg: &PullMessage, _from: NodeID) -> PushMessage {
+ async fn handle(self: &Arc<Self>, _pullmsg: PullMessage, _from: NodeID) -> PushMessage {
self.make_push_message()
}
}
#[async_trait]
impl EndpointHandler<PushMessage> for Basalt {
- async fn handle(self: &Arc<Self>, pushmsg: &PushMessage, _from: NodeID) {
+ async fn handle(self: &Arc<Self>, pushmsg: PushMessage, _from: NodeID) {
self.handle_peer_list(&pushmsg.peers[..]);
}
}
diff --git a/src/peering/fullmesh.rs b/src/peering/fullmesh.rs
index 3eeebb3..f8348af 100644
--- a/src/peering/fullmesh.rs
+++ b/src/peering/fullmesh.rs
@@ -40,8 +40,6 @@ impl Message for PingMessage {
type Response = PingMessage;
}
-impl SimpleMessage for PingMessage {}
-
#[derive(Serialize, Deserialize, Clone)]
struct PeerListMessage {
pub list: Vec<(NodeID, SocketAddr)>,
@@ -51,8 +49,6 @@ impl Message for PeerListMessage {
type Response = PeerListMessage;
}
-impl SimpleMessage for PeerListMessage {}
-
// -- Algorithm data structures --
#[derive(Debug)]
diff --git a/src/util.rs b/src/util.rs
index f860672..e7ecea8 100644
--- a/src/util.rs
+++ b/src/util.rs
@@ -9,8 +9,6 @@ use serde::Serialize;
use futures::Stream;
use tokio::sync::watch;
-use crate::message::SerializeMessage;
-
/// A node's identifier, which is also its public cryptographic key
pub type NodeID = sodiumoxide::crypto::sign::ed25519::PublicKey;
/// A node's secret key