aboutsummaryrefslogtreecommitdiff
path: root/src/endpoint.rs
diff options
context:
space:
mode:
authorAlex <alex@adnab.me>2022-09-13 12:56:53 +0200
committerAlex <alex@adnab.me>2022-09-13 12:56:53 +0200
commit8ac109e3a84bd34550d66baf65fe59b86b63bca2 (patch)
treea49a199a1049d18afaa60f47f46e04cb798aa4b2 /src/endpoint.rs
parenta82700c5a27612002e6ee029ae77915b8114182f (diff)
parent298e956a199711b65ce3820931ca943108b78225 (diff)
downloadnetapp-8ac109e3a84bd34550d66baf65fe59b86b63bca2.tar.gz
netapp-8ac109e3a84bd34550d66baf65fe59b86b63bca2.zip
Merge pull request 'add streaming body to requests and responses' (#3) from stream-body into mainv0.5.0
Reviewed-on: https://git.deuxfleurs.fr/lx/netapp/pulls/3
Diffstat (limited to 'src/endpoint.rs')
-rw-r--r--src/endpoint.rs106
1 files changed, 70 insertions, 36 deletions
diff --git a/src/endpoint.rs b/src/endpoint.rs
index 42e9a98..3cafafe 100644
--- a/src/endpoint.rs
+++ b/src/endpoint.rs
@@ -1,33 +1,25 @@
-use std::borrow::Borrow;
use std::marker::PhantomData;
use std::sync::Arc;
use arc_swap::ArcSwapOption;
use async_trait::async_trait;
-use serde::{Deserialize, Serialize};
-
use crate::error::Error;
+use crate::message::*;
use crate::netapp::*;
-use crate::proto::*;
-use crate::util::*;
-
-/// This trait should be implemented by all messages your application
-/// wants to handle
-pub trait Message: Serialize + for<'de> Deserialize<'de> + Send + Sync {
- type Response: Serialize + for<'de> Deserialize<'de> + Send + Sync;
-}
/// This trait should be implemented by an object of your application
-/// that can handle a message of type `M`.
+/// that can handle a message of type `M`, if it wishes to handle
+/// streams attached to the request and/or to send back streams
+/// attached to the response..
///
/// The handler object should be in an Arc, see `Endpoint::set_handler`
#[async_trait]
-pub trait EndpointHandler<M>: Send + Sync
+pub trait StreamingEndpointHandler<M>: Send + Sync
where
M: Message,
{
- async fn handle(self: &Arc<Self>, m: &M, from: NodeID) -> M::Response;
+ async fn handle(self: &Arc<Self>, m: Req<M>, from: NodeID) -> Resp<M>;
}
/// If one simply wants to use an endpoint in a client fashion,
@@ -35,12 +27,41 @@ where
/// use the unit type `()` as the handler type:
/// it will panic if it is ever made to handle request.
#[async_trait]
-impl<M: Message + 'static> EndpointHandler<M> for () {
+impl<M: Message> EndpointHandler<M> for () {
async fn handle(self: &Arc<()>, _m: &M, _from: NodeID) -> M::Response {
panic!("This endpoint should not have a local handler.");
}
}
+// ----
+
+/// This trait should be implemented by an object of your application
+/// that can handle a message of type `M`, in the cases where it doesn't
+/// care about attached stream in the request nor in the response.
+#[async_trait]
+pub trait EndpointHandler<M>: Send + Sync
+where
+ M: Message,
+{
+ async fn handle(self: &Arc<Self>, m: &M, from: NodeID) -> M::Response;
+}
+
+#[async_trait]
+impl<T, M> StreamingEndpointHandler<M> for T
+where
+ T: EndpointHandler<M>,
+ M: Message,
+{
+ async fn handle(self: &Arc<Self>, mut m: Req<M>, from: NodeID) -> Resp<M> {
+ // Immediately drop stream to ignore all data that comes in,
+ // instead of buffering it indefinitely
+ drop(m.take_stream());
+ Resp::new(EndpointHandler::handle(self, m.msg(), from).await)
+ }
+}
+
+// ----
+
/// This struct represents an endpoint for message of type `M`.
///
/// Creating a new endpoint is done by calling `NetApp::endpoint`.
@@ -50,13 +71,13 @@ impl<M: Message + 'static> EndpointHandler<M> for () {
/// An `Endpoint` is used both to send requests to remote nodes,
/// and to specify the handler for such requests on the local node.
/// The type `H` represents the type of the handler object for
-/// endpoint messages (see `EndpointHandler`).
+/// endpoint messages (see `StreamingEndpointHandler`).
pub struct Endpoint<M, H>
where
M: Message,
- H: EndpointHandler<M>,
+ H: StreamingEndpointHandler<M>,
{
- phantom: PhantomData<M>,
+ _phantom: PhantomData<M>,
netapp: Arc<NetApp>,
path: String,
handler: ArcSwapOption<H>,
@@ -65,11 +86,11 @@ where
impl<M, H> Endpoint<M, H>
where
M: Message,
- H: EndpointHandler<M>,
+ H: StreamingEndpointHandler<M>,
{
pub(crate) fn new(netapp: Arc<NetApp>, path: String) -> Self {
Self {
- phantom: PhantomData::default(),
+ _phantom: PhantomData::default(),
netapp,
path,
handler: ArcSwapOption::from(None),
@@ -88,20 +109,22 @@ where
}
/// Call this endpoint on a remote node (or on the local node,
- /// for that matter)
- pub async fn call<B>(
+ /// for that matter). This function invokes the full version that
+ /// allows to attach a stream to the request and to
+ /// receive such a stream attached to the response.
+ pub async fn call_streaming<T>(
&self,
target: &NodeID,
- req: B,
+ req: T,
prio: RequestPriority,
- ) -> Result<<M as Message>::Response, Error>
+ ) -> Result<Resp<M>, Error>
where
- B: Borrow<M>,
+ T: IntoReq<M>,
{
if *target == self.netapp.id {
match self.handler.load_full() {
None => Err(Error::NoHandler),
- Some(h) => Ok(h.handle(req.borrow(), self.netapp.id).await),
+ Some(h) => Ok(h.handle(req.into_req_local(), self.netapp.id).await),
}
} else {
let conn = self
@@ -116,10 +139,22 @@ where
"Not connected: {}",
hex::encode(&target[..8])
))),
- Some(c) => c.call(req, self.path.as_str(), prio).await,
+ Some(c) => c.call(req.into_req()?, self.path.as_str(), prio).await,
}
}
}
+
+ /// Call this endpoint on a remote node. This function is the simplified
+ /// version that doesn't allow to have streams attached to the request
+ /// or the response; see `call_streaming` for the full version.
+ pub async fn call(
+ &self,
+ target: &NodeID,
+ req: M,
+ prio: RequestPriority,
+ ) -> Result<<M as Message>::Response, Error> {
+ Ok(self.call_streaming(target, req, prio).await?.into_msg())
+ }
}
// ---- Internal stuff ----
@@ -128,7 +163,7 @@ pub(crate) type DynEndpoint = Box<dyn GenericEndpoint + Send + Sync>;
#[async_trait]
pub(crate) trait GenericEndpoint {
- async fn handle(&self, buf: &[u8], from: NodeID) -> Result<Vec<u8>, Error>;
+ async fn handle(&self, req_enc: ReqEnc, from: NodeID) -> Result<RespEnc, Error>;
fn drop_handler(&self);
fn clone_endpoint(&self) -> DynEndpoint;
}
@@ -137,22 +172,21 @@ pub(crate) trait GenericEndpoint {
pub(crate) struct EndpointArc<M, H>(pub(crate) Arc<Endpoint<M, H>>)
where
M: Message,
- H: EndpointHandler<M>;
+ H: StreamingEndpointHandler<M>;
#[async_trait]
impl<M, H> GenericEndpoint for EndpointArc<M, H>
where
- M: Message + 'static,
- H: EndpointHandler<M> + 'static,
+ M: Message,
+ H: StreamingEndpointHandler<M> + 'static,
{
- async fn handle(&self, buf: &[u8], from: NodeID) -> Result<Vec<u8>, Error> {
+ async fn handle(&self, req_enc: ReqEnc, from: NodeID) -> Result<RespEnc, Error> {
match self.0.handler.load_full() {
None => Err(Error::NoHandler),
Some(h) => {
- let req = rmp_serde::decode::from_read_ref::<_, M>(buf)?;
- let res = h.handle(&req, from).await;
- let res_bytes = rmp_to_vec_all_named(&res)?;
- Ok(res_bytes)
+ let req = Req::from_enc(req_enc)?;
+ let res = h.handle(req, from).await;
+ Ok(res.into_enc()?)
}
}
}