diff options
Diffstat (limited to 'src/endpoint.rs')
-rw-r--r-- | src/endpoint.rs | 106 |
1 files changed, 70 insertions, 36 deletions
diff --git a/src/endpoint.rs b/src/endpoint.rs index 42e9a98..bb768de 100644 --- a/src/endpoint.rs +++ b/src/endpoint.rs @@ -1,33 +1,25 @@ -use std::borrow::Borrow; use std::marker::PhantomData; use std::sync::Arc; use arc_swap::ArcSwapOption; use async_trait::async_trait; -use serde::{Deserialize, Serialize}; - use crate::error::Error; +use crate::message::*; use crate::netapp::*; -use crate::proto::*; -use crate::util::*; - -/// This trait should be implemented by all messages your application -/// wants to handle -pub trait Message: Serialize + for<'de> Deserialize<'de> + Send + Sync { - type Response: Serialize + for<'de> Deserialize<'de> + Send + Sync; -} /// This trait should be implemented by an object of your application -/// that can handle a message of type `M`. +/// that can handle a message of type `M`, if it wishes to handle +/// streams attached to the request and/or to send back streams +/// attached to the response.. /// /// The handler object should be in an Arc, see `Endpoint::set_handler` #[async_trait] -pub trait EndpointHandler<M>: Send + Sync +pub trait StreamingEndpointHandler<M>: Send + Sync where M: Message, { - async fn handle(self: &Arc<Self>, m: &M, from: NodeID) -> M::Response; + async fn handle(self: &Arc<Self>, m: Req<M>, from: NodeID) -> Resp<M>; } /// If one simply wants to use an endpoint in a client fashion, @@ -35,12 +27,41 @@ where /// use the unit type `()` as the handler type: /// it will panic if it is ever made to handle request. #[async_trait] -impl<M: Message + 'static> EndpointHandler<M> for () { +impl<M: Message> EndpointHandler<M> for () { async fn handle(self: &Arc<()>, _m: &M, _from: NodeID) -> M::Response { panic!("This endpoint should not have a local handler."); } } +// ---- + +/// This trait should be implemented by an object of your application +/// that can handle a message of type `M`, in the cases where it doesn't +/// care about attached stream in the request nor in the response. +#[async_trait] +pub trait EndpointHandler<M>: Send + Sync +where + M: Message, +{ + async fn handle(self: &Arc<Self>, m: &M, from: NodeID) -> <M as Message>::Response; +} + +#[async_trait] +impl<T, M> StreamingEndpointHandler<M> for T +where + T: EndpointHandler<M>, + M: Message, +{ + async fn handle(self: &Arc<Self>, mut m: Req<M>, from: NodeID) -> Resp<M> { + // Immediately drop stream to avoid backpressure if a stream was sent + // (this will make all data sent to the stream be ignored immediately) + drop(m.take_stream()); + Resp::new(EndpointHandler::handle(self, m.msg(), from).await) + } +} + +// ---- + /// This struct represents an endpoint for message of type `M`. /// /// Creating a new endpoint is done by calling `NetApp::endpoint`. @@ -50,13 +71,13 @@ impl<M: Message + 'static> EndpointHandler<M> for () { /// An `Endpoint` is used both to send requests to remote nodes, /// and to specify the handler for such requests on the local node. /// The type `H` represents the type of the handler object for -/// endpoint messages (see `EndpointHandler`). +/// endpoint messages (see `StreamingEndpointHandler`). pub struct Endpoint<M, H> where M: Message, - H: EndpointHandler<M>, + H: StreamingEndpointHandler<M>, { - phantom: PhantomData<M>, + _phantom: PhantomData<M>, netapp: Arc<NetApp>, path: String, handler: ArcSwapOption<H>, @@ -65,11 +86,11 @@ where impl<M, H> Endpoint<M, H> where M: Message, - H: EndpointHandler<M>, + H: StreamingEndpointHandler<M>, { pub(crate) fn new(netapp: Arc<NetApp>, path: String) -> Self { Self { - phantom: PhantomData::default(), + _phantom: PhantomData::default(), netapp, path, handler: ArcSwapOption::from(None), @@ -88,20 +109,22 @@ where } /// Call this endpoint on a remote node (or on the local node, - /// for that matter) - pub async fn call<B>( + /// for that matter). This function invokes the full version that + /// allows to attach a stream to the request and to + /// receive such a stream attached to the response. + pub async fn call_streaming<T>( &self, target: &NodeID, - req: B, + req: T, prio: RequestPriority, - ) -> Result<<M as Message>::Response, Error> + ) -> Result<Resp<M>, Error> where - B: Borrow<M>, + T: IntoReq<M>, { if *target == self.netapp.id { match self.handler.load_full() { None => Err(Error::NoHandler), - Some(h) => Ok(h.handle(req.borrow(), self.netapp.id).await), + Some(h) => Ok(h.handle(req.into_req_local(), self.netapp.id).await), } } else { let conn = self @@ -116,10 +139,22 @@ where "Not connected: {}", hex::encode(&target[..8]) ))), - Some(c) => c.call(req, self.path.as_str(), prio).await, + Some(c) => c.call(req.into_req()?, self.path.as_str(), prio).await, } } } + + /// Call this endpoint on a remote node. This function is the simplified + /// version that doesn't allow to have streams attached to the request + /// or the response; see `call_streaming` for the full version. + pub async fn call( + &self, + target: &NodeID, + req: M, + prio: RequestPriority, + ) -> Result<<M as Message>::Response, Error> { + Ok(self.call_streaming(target, req, prio).await?.into_msg()) + } } // ---- Internal stuff ---- @@ -128,7 +163,7 @@ pub(crate) type DynEndpoint = Box<dyn GenericEndpoint + Send + Sync>; #[async_trait] pub(crate) trait GenericEndpoint { - async fn handle(&self, buf: &[u8], from: NodeID) -> Result<Vec<u8>, Error>; + async fn handle(&self, req_enc: ReqEnc, from: NodeID) -> Result<RespEnc, Error>; fn drop_handler(&self); fn clone_endpoint(&self) -> DynEndpoint; } @@ -137,22 +172,21 @@ pub(crate) trait GenericEndpoint { pub(crate) struct EndpointArc<M, H>(pub(crate) Arc<Endpoint<M, H>>) where M: Message, - H: EndpointHandler<M>; + H: StreamingEndpointHandler<M>; #[async_trait] impl<M, H> GenericEndpoint for EndpointArc<M, H> where - M: Message + 'static, - H: EndpointHandler<M> + 'static, + M: Message, + H: StreamingEndpointHandler<M> + 'static, { - async fn handle(&self, buf: &[u8], from: NodeID) -> Result<Vec<u8>, Error> { + async fn handle(&self, req_enc: ReqEnc, from: NodeID) -> Result<RespEnc, Error> { match self.0.handler.load_full() { None => Err(Error::NoHandler), Some(h) => { - let req = rmp_serde::decode::from_read_ref::<_, M>(buf)?; - let res = h.handle(&req, from).await; - let res_bytes = rmp_to_vec_all_named(&res)?; - Ok(res_bytes) + let req = Req::from_enc(req_enc)?; + let res = h.handle(req, from).await; + Ok(res.into_enc()?) } } } |