aboutsummaryrefslogtreecommitdiff
path: root/src/message.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/message.rs')
-rw-r--r--src/message.rs52
1 files changed, 49 insertions, 3 deletions
diff --git a/src/message.rs b/src/message.rs
index ec9433a..2b2b75f 100644
--- a/src/message.rs
+++ b/src/message.rs
@@ -41,17 +41,31 @@ pub const PRIO_SECONDARY: RequestPriority = 0x01;
// ----
-#[derive(Clone, Copy)]
-pub struct OrderTagStream(u64);
+/// An order tag can be added to a message or a response to indicate
+/// whether it should be sent after or before other messages with order tags
+/// referencing a same stream
#[derive(Clone, Copy, Serialize, Deserialize, Debug)]
pub struct OrderTag(pub(crate) u64, pub(crate) u64);
+/// A stream is an opaque identifier that defines a set of messages
+/// or responses that are ordered wrt one another using to order tags.
+#[derive(Clone, Copy)]
+pub struct OrderTagStream(u64);
+
+
impl OrderTag {
+ /// Create a new stream from which to generate order tags. Example:
+ /// ```
+ /// let stream = OrderTag.stream();
+ /// let tag_1 = stream.order(1);
+ /// let tag_2 = stream.order(2);
+ /// ```
pub fn stream() -> OrderTagStream {
OrderTagStream(thread_rng().gen())
}
}
impl OrderTagStream {
+ /// Create the order tag for message `order` in this stream
pub fn order(&self, order: u64) -> OrderTag {
OrderTag(self.0, order)
}
@@ -60,8 +74,10 @@ impl OrderTagStream {
// ----
/// This trait should be implemented by all messages your application
-/// wants to handle
+/// wants to handle. It specifies which data type should be sent
+/// as a response to this message in the RPC protocol.
pub trait Message: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static {
+ /// The type of the response that is sent in response to this message
type Response: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static;
}
@@ -79,10 +95,13 @@ pub struct Req<M: Message> {
}
impl<M: Message> Req<M> {
+ /// Creates a new request from a base message `M`
pub fn new(v: M) -> Result<Self, Error> {
Ok(v.into_req()?)
}
+ /// Attach a stream to message in request, where the stream is streamed
+ /// from a fixed `Bytes` buffer
pub fn with_stream_from_buffer(self, b: Bytes) -> Self {
Self {
stream: AttachedStream::Fixed(b),
@@ -90,6 +109,10 @@ impl<M: Message> Req<M> {
}
}
+ /// Attach a stream to message in request, where the stream is
+ /// an instance of `ByteStream`. Note than when a `Req<M>` has an attached
+ /// stream which is a `ByteStream` instance, it can no longer be cloned
+ /// to be sent to different nodes (`.clone()` will panic)
pub fn with_stream(self, b: ByteStream) -> Self {
Self {
stream: AttachedStream::Stream(b),
@@ -97,6 +120,8 @@ impl<M: Message> Req<M> {
}
}
+ /// Add an order tag to this request to indicate in which order it should
+ /// be sent.
pub fn with_order_tag(self, order_tag: OrderTag) -> Self {
Self {
order_tag: Some(order_tag),
@@ -104,10 +129,12 @@ impl<M: Message> Req<M> {
}
}
+ /// Get a reference to the message `M` contained in this request
pub fn msg(&self) -> &M {
&self.msg
}
+ /// Takes out the stream attached to this request, if any
pub fn take_stream(&mut self) -> Option<ByteStream> {
std::mem::replace(&mut self.stream, AttachedStream::None).into_stream()
}
@@ -142,8 +169,14 @@ impl<M: Message> Req<M> {
}
}
+/// `IntoReq<M>` represents any object that can be transformed into `Req<M>`
pub trait IntoReq<M: Message> {
+ /// Transform the object into a `Req<M>`, serializing the message M
+ /// to be sent to remote nodes
fn into_req(self) -> Result<Req<M>, rmp_serde::encode::Error>;
+ /// Transform the object into a `Req<M>`, skipping the serialization
+ /// of message M, in the case we are not sending this RPC message to
+ /// a remote node
fn into_req_local(self) -> Req<M>;
}
@@ -220,6 +253,7 @@ pub struct Resp<M: Message> {
}
impl<M: Message> Resp<M> {
+ /// Creates a new response from a base response message
pub fn new(v: M::Response) -> Self {
Resp {
_phantom: Default::default(),
@@ -229,6 +263,8 @@ impl<M: Message> Resp<M> {
}
}
+ /// Attach a stream to message in response, where the stream is streamed
+ /// from a fixed `Bytes` buffer
pub fn with_stream_from_buffer(self, b: Bytes) -> Self {
Self {
stream: AttachedStream::Fixed(b),
@@ -236,6 +272,8 @@ impl<M: Message> Resp<M> {
}
}
+ /// Attach a stream to message in response, where the stream is
+ /// an instance of `ByteStream`.
pub fn with_stream(self, b: ByteStream) -> Self {
Self {
stream: AttachedStream::Stream(b),
@@ -243,6 +281,8 @@ impl<M: Message> Resp<M> {
}
}
+ /// Add an order tag to this response to indicate in which order it should
+ /// be sent.
pub fn with_order_tag(self, order_tag: OrderTag) -> Self {
Self {
order_tag: Some(order_tag),
@@ -250,14 +290,20 @@ impl<M: Message> Resp<M> {
}
}
+ /// Get a reference to the response message contained in this request
pub fn msg(&self) -> &M::Response {
&self.msg
}
+ /// Transforms the `Resp<M>` into the response message it contains,
+ /// dropping everything else (including attached data stream)
pub fn into_msg(self) -> M::Response {
self.msg
}
+ /// Transforms the `Resp<M>` into, on the one side, the response message
+ /// it contains, and on the other side, the associated data stream
+ /// if it exists
pub fn into_parts(self) -> (M::Response, Option<ByteStream>) {
(self.msg, self.stream.into_stream())
}