use std::borrow::Borrow;
use std::marker::PhantomData;
use std::sync::Arc;
use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use serde::de::Error as DeError;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use crate::error::Error;
use crate::netapp::*;
use crate::proto::*;
use crate::util::*;
/// This trait should be implemented by all messages your application
/// wants to handle
pub trait Message: SerializeMessage + Send + Sync {
type Response: SerializeMessage + Send + Sync;
}
/// A trait for de/serializing messages, with possible associated stream.
#[async_trait]
pub trait SerializeMessage: Sized {
fn serialize_msg<S: Serializer>(
&self,
serializer: S,
) -> Result<(S::Ok, Option<AssociatedStream>), S::Error>;
async fn deserialize_msg<'de, D: Deserializer<'de> + Send>(
deserializer: D,
stream: AssociatedStream,
) -> Result<Self, D::Error>;
}
#[async_trait]
impl<T> SerializeMessage for T
where
T: Serialize + for<'de> Deserialize<'de> + Send + Sync,
{
fn serialize_msg<S: Serializer>(
&self,
serializer: S,
) -> Result<(S::Ok, Option<AssociatedStream>), S::Error> {
self.serialize(serializer).map(|r| (r, None))
}
async fn deserialize_msg<'de, D: Deserializer<'de> + Send>(
deserializer: D,
mut stream: AssociatedStream,
) -> Result<Self, D::Error> {
use futures::StreamExt;
let res = Self::deserialize(deserializer)?;
if stream.next().await.is_some() {
return Err(D::Error::custom(
"failed to deserialize: found associated stream when none expected",
));
}
Ok(res)
}
}
/// This trait should be implemented by an object of your application
/// that can handle a message of type `M`.
///
/// The handler object should be in an Arc, see `Endpoint::set_handler`
#[async_trait]
pub trait EndpointHandler<M>: Send + Sync
where
M: Message,
{
async fn handle(self: &Arc<Self>, m: &M, from: NodeID) -> M::Response;
}
/// If one simply wants to use an endpoint in a client fashion,
/// without locally serving requests to that endpoint,
/// use the unit type `()` as the handler type:
/// it will panic if it is ever made to handle request.
#[async_trait]
impl<M: Message + 'static> EndpointHandler<M> for () {
async fn handle(self: &Arc<()>, _m: &M, _from: NodeID) -> M::Response {
panic!("This endpoint should not have a local handler.");
}
}
/// This struct represents an endpoint for message of type `M`.
///
/// Creating a new endpoint is done by calling `NetApp::endpoint`.
/// An endpoint is identified primarily by its path, which is specified
/// at creation time.
///
/// An `Endpoint` is used both to send requests to remote nodes,
/// and to specify the handler for such requests on the local node.
/// The type `H` represents the type of the handler object for
/// endpoint messages (see `EndpointHandler`).
pub struct Endpoint<M, H>
where
M: Message,
H: EndpointHandler<M>,
{
phantom: PhantomData<M>,
netapp: Arc<NetApp>,
path: String,
handler: ArcSwapOption<H>,
}
impl<M, H> Endpoint<M, H>
where
M: Message,
H: EndpointHandler<M>,
{
pub(crate) fn new(netapp: Arc<NetApp>, path: String) -> Self {
Self {
phantom: PhantomData::default(),
netapp,
path,
handler: ArcSwapOption::from(None),
}
}
/// Get the path of this endpoint
pub fn path(&self) -> &str {
&self.path
}
/// Set the object that is responsible of handling requests to
/// this endpoint on the local node.
pub fn set_handler(&self, h: Arc<H>) {
self.handler.swap(Some(h));
}
/// Call this endpoint on a remote node (or on the local node,
/// for that matter)
pub async fn call<B>(
&self,
target: &NodeID,
req: B,
prio: RequestPriority,
) -> Result<<M as Message>::Response, Error>
where
B: Borrow<M>,
{
if *target == self.netapp.id {
match self.handler.load_full() {
None => Err(Error::NoHandler),
Some(h) => Ok(h.handle(req.borrow(), self.netapp.id).await),
}
} else {
let conn = self
.netapp
.client_conns
.read()
.unwrap()
.get(target)
.cloned();
match conn {
None => Err(Error::Message(format!(
"Not connected: {}",
hex::encode(&target[..8])
))),
Some(c) => c.call(req, self.path.as_str(), prio).await,
}
}
}
}
// ---- Internal stuff ----
pub(crate) type DynEndpoint = Box<dyn GenericEndpoint + Send + Sync>;
#[async_trait]
pub(crate) trait GenericEndpoint {
async fn handle(
&self,
buf: &[u8],
stream: AssociatedStream,
from: NodeID,
) -> Result<(Vec<u8>, Option<AssociatedStream>), Error>;
fn drop_handler(&self);
fn clone_endpoint(&self) -> DynEndpoint;
}
#[derive(Clone)]
pub(crate) struct EndpointArc<M, H>(pub(crate) Arc<Endpoint<M, H>>)
where
M: Message,
H: EndpointHandler<M>;
#[async_trait]
impl<M, H> GenericEndpoint for EndpointArc<M, H>
where
M: Message + 'static,
H: EndpointHandler<M> + 'static,
{
async fn handle(
&self,
buf: &[u8],
stream: AssociatedStream,
from: NodeID,
) -> Result<(Vec<u8>, Option<AssociatedStream>), Error> {
match self.0.handler.load_full() {
None => Err(Error::NoHandler),
Some(h) => {
let mut deser = rmp_serde::decode::Deserializer::from_read_ref(buf);
let req = M::deserialize_msg(&mut deser, stream).await?;
let res = h.handle(&req, from).await;
let res_bytes = rmp_to_vec_all_named(&res)?;
Ok(res_bytes)
}
}
}
fn drop_handler(&self) {
self.0.handler.swap(None);
}
fn clone_endpoint(&self) -> DynEndpoint {
Box::new(Self(self.0.clone()))
}
}