aboutsummaryrefslogtreecommitdiff
path: root/src/message.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/message.rs')
-rw-r--r--src/message.rs118
1 files changed, 84 insertions, 34 deletions
diff --git a/src/message.rs b/src/message.rs
index 6d50254..f92eb8c 100644
--- a/src/message.rs
+++ b/src/message.rs
@@ -1,7 +1,9 @@
use async_trait::async_trait;
-use futures::stream::{Stream, StreamExt};
+use bytes::Bytes;
use serde::{Deserialize, Serialize};
+use futures::stream::{Stream, StreamExt};
+
use crate::error::*;
use crate::util::*;
@@ -41,66 +43,112 @@ pub trait Message: SerializeMessage + Send + Sync {
}
/// A trait for de/serializing messages, with possible associated stream.
-#[async_trait]
pub trait SerializeMessage: Sized {
type SerializableSelf: Serialize + for<'de> Deserialize<'de> + Send;
- fn serialize_msg(&self) -> (Self::SerializableSelf, Option<ByteStream>);
+ fn into_parts(self) -> (Self::SerializableSelf, Option<ByteStream>);
+
+ fn from_parts(ser_self: Self::SerializableSelf, stream: ByteStream) -> Self;
+}
+
+// ----
+
+impl<T, E> SerializeMessage for Result<T, E>
+where
+ T: SerializeMessage + Send,
+ E: Serialize + for<'de> Deserialize<'de> + Send,
+{
+ type SerializableSelf = Result<T::SerializableSelf, E>;
+
+ fn into_parts(self) -> (Self::SerializableSelf, Option<ByteStream>) {
+ match self {
+ Ok(ok) => {
+ let (msg, stream) = ok.into_parts();
+ (Ok(msg), stream)
+ }
+ Err(err) => (Err(err), None),
+ }
+ }
- // TODO should return Result
- async fn deserialize_msg(ser_self: Self::SerializableSelf, stream: ByteStream) -> Self;
+ fn from_parts(ser_self: Self::SerializableSelf, stream: ByteStream) -> Self {
+ match ser_self {
+ Ok(ok) => Ok(T::from_parts(ok, stream)),
+ Err(err) => Err(err),
+ }
+ }
}
-pub trait AutoSerialize: Serialize + for<'de> Deserialize<'de> + Clone + Send + Sync {}
+// ---
+
+pub trait SimpleMessage: Serialize + for<'de> Deserialize<'de> + Clone + Send + Sync {}
-#[async_trait]
impl<T> SerializeMessage for T
where
- T: AutoSerialize,
+ T: SimpleMessage,
{
type SerializableSelf = Self;
- fn serialize_msg(&self) -> (Self::SerializableSelf, Option<ByteStream>) {
- (self.clone(), None)
+ fn into_parts(self) -> (Self::SerializableSelf, Option<ByteStream>) {
+ (self, None)
}
- async fn deserialize_msg(ser_self: Self::SerializableSelf, _stream: ByteStream) -> Self {
+ fn from_parts(ser_self: Self::SerializableSelf, _stream: ByteStream) -> Self {
// TODO verify no stream
ser_self
}
}
-impl AutoSerialize for () {}
+impl SimpleMessage for () {}
-#[async_trait]
-impl<T, E> SerializeMessage for Result<T, E>
+impl<T: SimpleMessage> SimpleMessage for std::sync::Arc<T> {}
+
+// ----
+
+#[derive(Clone)]
+pub struct WithFixedBody<T: Serialize + for<'de> Deserialize<'de> + Clone + Send + 'static>(
+ pub T,
+ pub Bytes,
+);
+
+impl<T> SerializeMessage for WithFixedBody<T>
where
- T: SerializeMessage + Send,
- E: SerializeMessage + Send,
+ T: Serialize + for<'de> Deserialize<'de> + Clone + Send + 'static,
{
- type SerializableSelf = Result<T::SerializableSelf, E::SerializableSelf>;
+ type SerializableSelf = T;
+
+ fn into_parts(self) -> (Self::SerializableSelf, Option<ByteStream>) {
+ let body = self.1;
+ (
+ self.0,
+ Some(Box::pin(futures::stream::once(async move { Ok(body) }))),
+ )
+ }
- fn serialize_msg(&self) -> (Self::SerializableSelf, Option<ByteStream>) {
- match self {
- Ok(ok) => {
- let (msg, stream) = ok.serialize_msg();
- (Ok(msg), stream)
- }
- Err(err) => {
- let (msg, stream) = err.serialize_msg();
- (Err(msg), stream)
- }
- }
+ fn from_parts(ser_self: Self::SerializableSelf, stream: ByteStream) -> Self {
+ panic!("Cannot reconstruct a WithFixedBody type from parts");
}
+}
- async fn deserialize_msg(ser_self: Self::SerializableSelf, stream: ByteStream) -> Self {
- match ser_self {
- Ok(ok) => Ok(T::deserialize_msg(ok, stream).await),
- Err(err) => Err(E::deserialize_msg(err, stream).await),
- }
+pub struct WithStreamingBody<T: Serialize + for<'de> Deserialize<'de> + Send>(
+ pub T,
+ pub ByteStream,
+);
+
+impl<T> SerializeMessage for WithStreamingBody<T>
+where
+ T: Serialize + for<'de> Deserialize<'de> + Send,
+{
+ type SerializableSelf = T;
+
+ fn into_parts(self) -> (Self::SerializableSelf, Option<ByteStream>) {
+ (self.0, Some(self.1))
+ }
+
+ fn from_parts(ser_self: Self::SerializableSelf, stream: ByteStream) -> Self {
+ WithStreamingBody(ser_self, stream)
}
}
-// ----
+// ---- ----
pub(crate) struct QueryMessage<'a> {
pub(crate) prio: RequestPriority,
@@ -175,6 +223,8 @@ impl<'a> QueryMessage<'a> {
}
}
+// ---- ----
+
pub(crate) struct Framing {
direct: Vec<u8>,
stream: Option<ByteStream>,