aboutsummaryrefslogtreecommitdiff
path: root/src/endpoint.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/endpoint.rs')
-rw-r--r--src/endpoint.rs45
1 files changed, 34 insertions, 11 deletions
diff --git a/src/endpoint.rs b/src/endpoint.rs
index 97a7644..8ee64a5 100644
--- a/src/endpoint.rs
+++ b/src/endpoint.rs
@@ -18,7 +18,7 @@ pub trait EndpointHandler<M>: Send + Sync
where
M: Message,
{
- async fn handle(self: &Arc<Self>, m: M, from: NodeID) -> M::Response;
+ async fn handle(self: &Arc<Self>, m: Req<M>, from: NodeID) -> Resp<M>;
}
/// If one simply wants to use an endpoint in a client fashion,
@@ -27,7 +27,7 @@ where
/// it will panic if it is ever made to handle request.
#[async_trait]
impl<M: Message + 'static> EndpointHandler<M> for () {
- async fn handle(self: &Arc<()>, _m: M, _from: NodeID) -> M::Response {
+ async fn handle(self: &Arc<()>, _m: Req<M>, _from: NodeID) -> Resp<M> {
panic!("This endpoint should not have a local handler.");
}
}
@@ -80,16 +80,19 @@ where
/// Call this endpoint on a remote node (or on the local node,
/// for that matter)
- pub async fn call(
+ pub async fn call_full<T>(
&self,
target: &NodeID,
- req: M,
+ req: T,
prio: RequestPriority,
- ) -> Result<<M as Message>::Response, Error> {
+ ) -> Result<Resp<M>, Error>
+ where
+ T: IntoReq<M>,
+ {
if *target == self.netapp.id {
match self.handler.load_full() {
None => Err(Error::NoHandler),
- Some(h) => Ok(h.handle(req, self.netapp.id).await),
+ Some(h) => Ok(h.handle(req.into_req_local(), self.netapp.id).await),
}
} else {
let conn = self
@@ -104,10 +107,21 @@ where
"Not connected: {}",
hex::encode(&target[..8])
))),
- Some(c) => c.call(req, self.path.as_str(), prio).await,
+ Some(c) => c.call(req.into_req()?, self.path.as_str(), prio).await,
}
}
}
+
+ /// Call this endpoint on a remote node, without the possibility
+ /// of adding or receiving a body
+ pub async fn call(
+ &self,
+ target: &NodeID,
+ req: M,
+ prio: RequestPriority,
+ ) -> Result<<M as Message>::Response, Error> {
+ Ok(self.call_full(target, req, prio).await?.into_msg())
+ }
}
// ---- Internal stuff ----
@@ -148,11 +162,20 @@ where
None => Err(Error::NoHandler),
Some(h) => {
let req = rmp_serde::decode::from_read_ref(buf)?;
- let req = M::from_parts(req, stream);
+ let req = Req {
+ _phantom: Default::default(),
+ msg: Arc::new(req),
+ msg_ser: None,
+ body: BodyData::Stream(stream),
+ };
let res = h.handle(req, from).await;
- let (res, res_stream) = res.into_parts();
- let res_bytes = rmp_to_vec_all_named(&res)?;
- Ok((res_bytes, res_stream))
+ let Resp {
+ msg,
+ body,
+ _phantom,
+ } = res;
+ let res_bytes = rmp_to_vec_all_named(&msg)?;
+ Ok((res_bytes, body.into_stream()))
}
}
}