diff options
Diffstat (limited to 'src/message.rs')
-rw-r--r-- | src/message.rs | 52 |
1 files changed, 49 insertions, 3 deletions
diff --git a/src/message.rs b/src/message.rs index ec9433a..2b2b75f 100644 --- a/src/message.rs +++ b/src/message.rs @@ -41,17 +41,31 @@ pub const PRIO_SECONDARY: RequestPriority = 0x01; // ---- -#[derive(Clone, Copy)] -pub struct OrderTagStream(u64); +/// An order tag can be added to a message or a response to indicate +/// whether it should be sent after or before other messages with order tags +/// referencing a same stream #[derive(Clone, Copy, Serialize, Deserialize, Debug)] pub struct OrderTag(pub(crate) u64, pub(crate) u64); +/// A stream is an opaque identifier that defines a set of messages +/// or responses that are ordered wrt one another using to order tags. +#[derive(Clone, Copy)] +pub struct OrderTagStream(u64); + + impl OrderTag { + /// Create a new stream from which to generate order tags. Example: + /// ``` + /// let stream = OrderTag.stream(); + /// let tag_1 = stream.order(1); + /// let tag_2 = stream.order(2); + /// ``` pub fn stream() -> OrderTagStream { OrderTagStream(thread_rng().gen()) } } impl OrderTagStream { + /// Create the order tag for message `order` in this stream pub fn order(&self, order: u64) -> OrderTag { OrderTag(self.0, order) } @@ -60,8 +74,10 @@ impl OrderTagStream { // ---- /// This trait should be implemented by all messages your application -/// wants to handle +/// wants to handle. It specifies which data type should be sent +/// as a response to this message in the RPC protocol. pub trait Message: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static { + /// The type of the response that is sent in response to this message type Response: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static; } @@ -79,10 +95,13 @@ pub struct Req<M: Message> { } impl<M: Message> Req<M> { + /// Creates a new request from a base message `M` pub fn new(v: M) -> Result<Self, Error> { Ok(v.into_req()?) } + /// Attach a stream to message in request, where the stream is streamed + /// from a fixed `Bytes` buffer pub fn with_stream_from_buffer(self, b: Bytes) -> Self { Self { stream: AttachedStream::Fixed(b), @@ -90,6 +109,10 @@ impl<M: Message> Req<M> { } } + /// Attach a stream to message in request, where the stream is + /// an instance of `ByteStream`. Note than when a `Req<M>` has an attached + /// stream which is a `ByteStream` instance, it can no longer be cloned + /// to be sent to different nodes (`.clone()` will panic) pub fn with_stream(self, b: ByteStream) -> Self { Self { stream: AttachedStream::Stream(b), @@ -97,6 +120,8 @@ impl<M: Message> Req<M> { } } + /// Add an order tag to this request to indicate in which order it should + /// be sent. pub fn with_order_tag(self, order_tag: OrderTag) -> Self { Self { order_tag: Some(order_tag), @@ -104,10 +129,12 @@ impl<M: Message> Req<M> { } } + /// Get a reference to the message `M` contained in this request pub fn msg(&self) -> &M { &self.msg } + /// Takes out the stream attached to this request, if any pub fn take_stream(&mut self) -> Option<ByteStream> { std::mem::replace(&mut self.stream, AttachedStream::None).into_stream() } @@ -142,8 +169,14 @@ impl<M: Message> Req<M> { } } +/// `IntoReq<M>` represents any object that can be transformed into `Req<M>` pub trait IntoReq<M: Message> { + /// Transform the object into a `Req<M>`, serializing the message M + /// to be sent to remote nodes fn into_req(self) -> Result<Req<M>, rmp_serde::encode::Error>; + /// Transform the object into a `Req<M>`, skipping the serialization + /// of message M, in the case we are not sending this RPC message to + /// a remote node fn into_req_local(self) -> Req<M>; } @@ -220,6 +253,7 @@ pub struct Resp<M: Message> { } impl<M: Message> Resp<M> { + /// Creates a new response from a base response message pub fn new(v: M::Response) -> Self { Resp { _phantom: Default::default(), @@ -229,6 +263,8 @@ impl<M: Message> Resp<M> { } } + /// Attach a stream to message in response, where the stream is streamed + /// from a fixed `Bytes` buffer pub fn with_stream_from_buffer(self, b: Bytes) -> Self { Self { stream: AttachedStream::Fixed(b), @@ -236,6 +272,8 @@ impl<M: Message> Resp<M> { } } + /// Attach a stream to message in response, where the stream is + /// an instance of `ByteStream`. pub fn with_stream(self, b: ByteStream) -> Self { Self { stream: AttachedStream::Stream(b), @@ -243,6 +281,8 @@ impl<M: Message> Resp<M> { } } + /// Add an order tag to this response to indicate in which order it should + /// be sent. pub fn with_order_tag(self, order_tag: OrderTag) -> Self { Self { order_tag: Some(order_tag), @@ -250,14 +290,20 @@ impl<M: Message> Resp<M> { } } + /// Get a reference to the response message contained in this request pub fn msg(&self) -> &M::Response { &self.msg } + /// Transforms the `Resp<M>` into the response message it contains, + /// dropping everything else (including attached data stream) pub fn into_msg(self) -> M::Response { self.msg } + /// Transforms the `Resp<M>` into, on the one side, the response message + /// it contains, and on the other side, the associated data stream + /// if it exists pub fn into_parts(self) -> (M::Response, Option<ByteStream>) { (self.msg, self.stream.into_stream()) } |