diff options
Diffstat (limited to 'src/endpoint.rs')
-rw-r--r-- | src/endpoint.rs | 45 |
1 files changed, 34 insertions, 11 deletions
diff --git a/src/endpoint.rs b/src/endpoint.rs index 97a7644..8ee64a5 100644 --- a/src/endpoint.rs +++ b/src/endpoint.rs @@ -18,7 +18,7 @@ pub trait EndpointHandler<M>: Send + Sync where M: Message, { - async fn handle(self: &Arc<Self>, m: M, from: NodeID) -> M::Response; + async fn handle(self: &Arc<Self>, m: Req<M>, from: NodeID) -> Resp<M>; } /// If one simply wants to use an endpoint in a client fashion, @@ -27,7 +27,7 @@ where /// it will panic if it is ever made to handle request. #[async_trait] impl<M: Message + 'static> EndpointHandler<M> for () { - async fn handle(self: &Arc<()>, _m: M, _from: NodeID) -> M::Response { + async fn handle(self: &Arc<()>, _m: Req<M>, _from: NodeID) -> Resp<M> { panic!("This endpoint should not have a local handler."); } } @@ -80,16 +80,19 @@ where /// Call this endpoint on a remote node (or on the local node, /// for that matter) - pub async fn call( + pub async fn call_full<T>( &self, target: &NodeID, - req: M, + req: T, prio: RequestPriority, - ) -> Result<<M as Message>::Response, Error> { + ) -> Result<Resp<M>, Error> + where + T: IntoReq<M>, + { if *target == self.netapp.id { match self.handler.load_full() { None => Err(Error::NoHandler), - Some(h) => Ok(h.handle(req, self.netapp.id).await), + Some(h) => Ok(h.handle(req.into_req_local(), self.netapp.id).await), } } else { let conn = self @@ -104,10 +107,21 @@ where "Not connected: {}", hex::encode(&target[..8]) ))), - Some(c) => c.call(req, self.path.as_str(), prio).await, + Some(c) => c.call(req.into_req()?, self.path.as_str(), prio).await, } } } + + /// Call this endpoint on a remote node, without the possibility + /// of adding or receiving a body + pub async fn call( + &self, + target: &NodeID, + req: M, + prio: RequestPriority, + ) -> Result<<M as Message>::Response, Error> { + Ok(self.call_full(target, req, prio).await?.into_msg()) + } } // ---- Internal stuff ---- @@ -148,11 +162,20 @@ where None => Err(Error::NoHandler), Some(h) => { let req = rmp_serde::decode::from_read_ref(buf)?; - let req = M::from_parts(req, stream); + let req = Req { + _phantom: Default::default(), + msg: Arc::new(req), + msg_ser: None, + body: BodyData::Stream(stream), + }; let res = h.handle(req, from).await; - let (res, res_stream) = res.into_parts(); - let res_bytes = rmp_to_vec_all_named(&res)?; - Ok((res_bytes, res_stream)) + let Resp { + msg, + body, + _phantom, + } = res; + let res_bytes = rmp_to_vec_all_named(&msg)?; + Ok((res_bytes, body.into_stream())) } } } |