diff options
Diffstat (limited to 'src/message.rs')
-rw-r--r-- | src/message.rs | 205 |
1 files changed, 142 insertions, 63 deletions
diff --git a/src/message.rs b/src/message.rs index 22cae6a..d918c29 100644 --- a/src/message.rs +++ b/src/message.rs @@ -1,3 +1,7 @@ +use std::fmt; +use std::marker::PhantomData; +use std::sync::Arc; + use bytes::Bytes; use serde::{Deserialize, Serialize}; @@ -37,94 +41,169 @@ pub const PRIO_SECONDARY: RequestPriority = 0x01; /// This trait should be implemented by all messages your application /// wants to handle -pub trait Message: SerializeMessage + Send + Sync { - type Response: SerializeMessage + Send + Sync; +pub trait Message: Serialize + for<'de> Deserialize<'de> + Send + Sync { + type Response: Serialize + for<'de> Deserialize<'de> + Send + Sync; +} + +pub struct Req<M: Message> { + pub(crate) _phantom: PhantomData<M>, + pub(crate) msg: Arc<M>, + pub(crate) msg_ser: Option<Bytes>, + pub(crate) body: BodyData, } -/// A trait for de/serializing messages, with possible associated stream. -/// This is default-implemented by anything that can already be serialized -/// and deserialized. Adapters are provided that implement this for -/// adding a body, either from a fixed Bytes buffer (which allows the thing -/// to be Clone), or from a streaming byte stream. -pub trait SerializeMessage: Sized { - type SerializableSelf: Serialize + for<'de> Deserialize<'de> + Send; +pub struct Resp<M: Message> { + pub(crate) _phantom: PhantomData<M>, + pub(crate) msg: M::Response, + pub(crate) body: BodyData, +} - fn into_parts(self) -> (Self::SerializableSelf, Option<ByteStream>); +pub(crate) enum BodyData { + None, + Fixed(Bytes), + Stream(ByteStream), +} - fn from_parts(ser_self: Self::SerializableSelf, stream: ByteStream) -> Self; +impl BodyData { + pub fn into_stream(self) -> Option<ByteStream> { + match self { + BodyData::None => None, + BodyData::Fixed(b) => Some(Box::pin(futures::stream::once(async move { Ok(b) }))), + BodyData::Stream(s) => Some(s), + } + } } // ---- -impl<T> SerializeMessage for T -where - T: Serialize + for<'de> Deserialize<'de> + Send, -{ - type SerializableSelf = Self; - fn into_parts(self) -> (Self::SerializableSelf, Option<ByteStream>) { - (self, None) +impl<M: Message> Req<M> { + pub fn msg(&self) -> &M { + &self.msg } - fn from_parts(ser_self: Self::SerializableSelf, _stream: ByteStream) -> Self { - // TODO verify no stream - ser_self + pub fn with_fixed_body(self, b: Bytes) -> Self { + Self { + body: BodyData::Fixed(b), + ..self + } + } + + pub fn with_streaming_body(self, b: ByteStream) -> Self { + Self { + body: BodyData::Stream(b), + ..self + } } } -// ---- +pub trait IntoReq<M: Message> { + fn into_req(self) -> Result<Req<M>, rmp_serde::encode::Error>; + fn into_req_local(self) -> Req<M>; +} -/// An adapter that adds a body from a fixed Bytes buffer to a serializable message, -/// implementing the SerializeMessage trait. This allows for the SerializeMessage object -/// to be cloned, which is usefull for requests that must be sent to multiple servers. -/// Note that cloning the body is cheap thanks to Bytes; make sure that your serializable -/// part is also easily clonable (e.g. by wrapping it in an Arc). -/// Note that this CANNOT be used for a response type, as it cannot be reconstructed -/// from a remote stream. -#[derive(Clone)] -pub struct WithFixedBody<T: Serialize + for<'de> Deserialize<'de> + Clone + Send + 'static>( - pub T, - pub Bytes, -); - -impl<T> SerializeMessage for WithFixedBody<T> -where - T: Serialize + for<'de> Deserialize<'de> + Clone + Send + 'static, -{ - type SerializableSelf = T; - - fn into_parts(self) -> (Self::SerializableSelf, Option<ByteStream>) { - let body = self.1; - ( - self.0, - Some(Box::pin(futures::stream::once(async move { Ok(body) }))), - ) +impl<M: Message> IntoReq<M> for M { + fn into_req(self) -> Result<Req<M>, rmp_serde::encode::Error> { + let msg_ser = rmp_to_vec_all_named(&self)?; + Ok(Req { + _phantom: Default::default(), + msg: Arc::new(self), + msg_ser: Some(Bytes::from(msg_ser)), + body: BodyData::None, + }) } + fn into_req_local(self) -> Req<M> { + Req { + _phantom: Default::default(), + msg: Arc::new(self), + msg_ser: None, + body: BodyData::None, + } + } +} - fn from_parts(_ser_self: Self::SerializableSelf, _stream: ByteStream) -> Self { - panic!("Cannot use a WithFixedBody as a response type"); +impl<M: Message> IntoReq<M> for Req<M> { + fn into_req(self) -> Result<Req<M>, rmp_serde::encode::Error> { + Ok(self) + } + fn into_req_local(self) -> Req<M> { + self } } -/// An adapter that adds a body from a ByteStream. This is usefull for receiving -/// responses to requests that contain attached byte streams. This type is -/// not clonable. -pub struct WithStreamingBody<T: Serialize + for<'de> Deserialize<'de> + Send>( - pub T, - pub ByteStream, -); +impl<M: Message> Clone for Req<M> { + fn clone(&self) -> Self { + let body = match &self.body { + BodyData::None => BodyData::None, + BodyData::Fixed(b) => BodyData::Fixed(b.clone()), + BodyData::Stream(_) => panic!("Cannot clone a Req<_> with a stream body"), + }; + Self { + _phantom: Default::default(), + msg: self.msg.clone(), + msg_ser: self.msg_ser.clone(), + body, + } + } +} -impl<T> SerializeMessage for WithStreamingBody<T> +impl<M> fmt::Debug for Req<M> where - T: Serialize + for<'de> Deserialize<'de> + Send, + M: Message + fmt::Debug, { - type SerializableSelf = T; + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { + write!(f, "Req[{:?}", self.msg)?; + match &self.body { + BodyData::None => write!(f, "]"), + BodyData::Fixed(b) => write!(f, "; body={}]", b.len()), + BodyData::Stream(_) => write!(f, "; body=stream]"), + } + } +} + +impl<M> fmt::Debug for Resp<M> +where + M: Message, + <M as Message>::Response: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { + write!(f, "Resp[{:?}", self.msg)?; + match &self.body { + BodyData::None => write!(f, "]"), + BodyData::Fixed(b) => write!(f, "; body={}]", b.len()), + BodyData::Stream(_) => write!(f, "; body=stream]"), + } + } +} + +impl<M: Message> Resp<M> { + pub fn new(v: M::Response) -> Self { + Resp { + _phantom: Default::default(), + msg: v, + body: BodyData::None, + } + } + + pub fn with_fixed_body(self, b: Bytes) -> Self { + Self { + body: BodyData::Fixed(b), + ..self + } + } + + pub fn with_streaming_body(self, b: ByteStream) -> Self { + Self { + body: BodyData::Stream(b), + ..self + } + } - fn into_parts(self) -> (Self::SerializableSelf, Option<ByteStream>) { - (self.0, Some(self.1)) + pub fn msg(&self) -> &M::Response { + &self.msg } - fn from_parts(ser_self: Self::SerializableSelf, stream: ByteStream) -> Self { - WithStreamingBody(ser_self, stream) + pub fn into_msg(self) -> M::Response { + self.msg } } |