diff options
author | Alex Auvolat <alex@adnab.me> | 2022-07-21 19:05:51 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-07-21 19:05:51 +0200 |
commit | 44bbc1c00c2532e08dff0d4a547b0a707e89f32d (patch) | |
tree | a6c021ae50370b3c065e3485ef1dd06052a962c9 /src/message.rs | |
parent | 26989bba1409bfc093e58ef98e75885b10ad7c1c (diff) | |
download | netapp-44bbc1c00c2532e08dff0d4a547b0a707e89f32d.tar.gz netapp-44bbc1c00c2532e08dff0d4a547b0a707e89f32d.zip |
Rename AutoSerialize into SimpleMessage and refactor a bit
Diffstat (limited to 'src/message.rs')
-rw-r--r-- | src/message.rs | 118 |
1 files changed, 84 insertions, 34 deletions
diff --git a/src/message.rs b/src/message.rs index 6d50254..f92eb8c 100644 --- a/src/message.rs +++ b/src/message.rs @@ -1,7 +1,9 @@ use async_trait::async_trait; -use futures::stream::{Stream, StreamExt}; +use bytes::Bytes; use serde::{Deserialize, Serialize}; +use futures::stream::{Stream, StreamExt}; + use crate::error::*; use crate::util::*; @@ -41,66 +43,112 @@ pub trait Message: SerializeMessage + Send + Sync { } /// A trait for de/serializing messages, with possible associated stream. -#[async_trait] pub trait SerializeMessage: Sized { type SerializableSelf: Serialize + for<'de> Deserialize<'de> + Send; - fn serialize_msg(&self) -> (Self::SerializableSelf, Option<ByteStream>); + fn into_parts(self) -> (Self::SerializableSelf, Option<ByteStream>); + + fn from_parts(ser_self: Self::SerializableSelf, stream: ByteStream) -> Self; +} + +// ---- + +impl<T, E> SerializeMessage for Result<T, E> +where + T: SerializeMessage + Send, + E: Serialize + for<'de> Deserialize<'de> + Send, +{ + type SerializableSelf = Result<T::SerializableSelf, E>; + + fn into_parts(self) -> (Self::SerializableSelf, Option<ByteStream>) { + match self { + Ok(ok) => { + let (msg, stream) = ok.into_parts(); + (Ok(msg), stream) + } + Err(err) => (Err(err), None), + } + } - // TODO should return Result - async fn deserialize_msg(ser_self: Self::SerializableSelf, stream: ByteStream) -> Self; + fn from_parts(ser_self: Self::SerializableSelf, stream: ByteStream) -> Self { + match ser_self { + Ok(ok) => Ok(T::from_parts(ok, stream)), + Err(err) => Err(err), + } + } } -pub trait AutoSerialize: Serialize + for<'de> Deserialize<'de> + Clone + Send + Sync {} +// --- + +pub trait SimpleMessage: Serialize + for<'de> Deserialize<'de> + Clone + Send + Sync {} -#[async_trait] impl<T> SerializeMessage for T where - T: AutoSerialize, + T: SimpleMessage, { type SerializableSelf = Self; - fn serialize_msg(&self) -> (Self::SerializableSelf, Option<ByteStream>) { - (self.clone(), None) + fn into_parts(self) -> (Self::SerializableSelf, Option<ByteStream>) { + (self, None) } - async fn deserialize_msg(ser_self: Self::SerializableSelf, _stream: ByteStream) -> Self { + fn from_parts(ser_self: Self::SerializableSelf, _stream: ByteStream) -> Self { // TODO verify no stream ser_self } } -impl AutoSerialize for () {} +impl SimpleMessage for () {} -#[async_trait] -impl<T, E> SerializeMessage for Result<T, E> +impl<T: SimpleMessage> SimpleMessage for std::sync::Arc<T> {} + +// ---- + +#[derive(Clone)] +pub struct WithFixedBody<T: Serialize + for<'de> Deserialize<'de> + Clone + Send + 'static>( + pub T, + pub Bytes, +); + +impl<T> SerializeMessage for WithFixedBody<T> where - T: SerializeMessage + Send, - E: SerializeMessage + Send, + T: Serialize + for<'de> Deserialize<'de> + Clone + Send + 'static, { - type SerializableSelf = Result<T::SerializableSelf, E::SerializableSelf>; + type SerializableSelf = T; + + fn into_parts(self) -> (Self::SerializableSelf, Option<ByteStream>) { + let body = self.1; + ( + self.0, + Some(Box::pin(futures::stream::once(async move { Ok(body) }))), + ) + } - fn serialize_msg(&self) -> (Self::SerializableSelf, Option<ByteStream>) { - match self { - Ok(ok) => { - let (msg, stream) = ok.serialize_msg(); - (Ok(msg), stream) - } - Err(err) => { - let (msg, stream) = err.serialize_msg(); - (Err(msg), stream) - } - } + fn from_parts(ser_self: Self::SerializableSelf, stream: ByteStream) -> Self { + panic!("Cannot reconstruct a WithFixedBody type from parts"); } +} - async fn deserialize_msg(ser_self: Self::SerializableSelf, stream: ByteStream) -> Self { - match ser_self { - Ok(ok) => Ok(T::deserialize_msg(ok, stream).await), - Err(err) => Err(E::deserialize_msg(err, stream).await), - } +pub struct WithStreamingBody<T: Serialize + for<'de> Deserialize<'de> + Send>( + pub T, + pub ByteStream, +); + +impl<T> SerializeMessage for WithStreamingBody<T> +where + T: Serialize + for<'de> Deserialize<'de> + Send, +{ + type SerializableSelf = T; + + fn into_parts(self) -> (Self::SerializableSelf, Option<ByteStream>) { + (self.0, Some(self.1)) + } + + fn from_parts(ser_self: Self::SerializableSelf, stream: ByteStream) -> Self { + WithStreamingBody(ser_self, stream) } } -// ---- +// ---- ---- pub(crate) struct QueryMessage<'a> { pub(crate) prio: RequestPriority, @@ -175,6 +223,8 @@ impl<'a> QueryMessage<'a> { } } +// ---- ---- + pub(crate) struct Framing { direct: Vec<u8>, stream: Option<ByteStream>, |