1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
|
use std::marker::PhantomData;
use std::sync::Arc;
use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use crate::error::Error;
use crate::netapp::*;
use crate::proto::*;
use crate::util::*;
/// This trait should be implemented by all messages your application
/// wants to handle (click to read more).
pub trait Message: Serialize + for<'de> Deserialize<'de> + Send + Sync {
type Response: Serialize + for<'de> Deserialize<'de> + Send + Sync;
}
pub(crate) type DynEndpoint = Box<dyn GenericEndpoint + Send + Sync>;
#[async_trait]
pub trait EndpointHandler<M>: Send + Sync
where
M: Message,
{
async fn handle(self: &Arc<Self>, m: M, from: NodeID) -> M::Response;
}
pub struct Endpoint<M, H>
where
M: Message,
H: EndpointHandler<M>,
{
phantom: PhantomData<M>,
netapp: Arc<NetApp>,
path: String,
handler: ArcSwapOption<H>,
}
impl<M, H> Endpoint<M, H>
where
M: Message,
H: EndpointHandler<M>,
{
pub(crate) fn new(netapp: Arc<NetApp>, path: String) -> Self {
Self {
phantom: PhantomData::default(),
netapp,
path,
handler: ArcSwapOption::from(None),
}
}
pub fn set_handler(&self, h: Arc<H>) {
self.handler.swap(Some(h));
}
pub async fn call(
&self,
target: &NodeID,
req: M,
prio: RequestPriority,
) -> Result<<M as Message>::Response, Error> {
if *target == self.netapp.id {
match self.handler.load_full() {
None => Err(Error::NoHandler),
Some(h) => Ok(h.handle(req, self.netapp.id).await),
}
} else {
let conn = self
.netapp
.client_conns
.read()
.unwrap()
.get(target)
.cloned();
match conn {
None => Err(Error::Message(format!(
"Not connected: {}",
hex::encode(target)
))),
Some(c) => c.call(req, self.path.as_str(), prio).await,
}
}
}
}
#[async_trait]
pub(crate) trait GenericEndpoint {
async fn handle(&self, buf: &[u8], from: NodeID) -> Result<Vec<u8>, Error>;
fn clear_handler(&self);
fn clone_endpoint(&self) -> DynEndpoint;
}
#[derive(Clone)]
pub(crate) struct EndpointArc<M, H>(pub(crate) Arc<Endpoint<M, H>>)
where
M: Message,
H: EndpointHandler<M>;
#[async_trait]
impl<M, H> GenericEndpoint for EndpointArc<M, H>
where
M: Message + 'static,
H: EndpointHandler<M> + 'static,
{
async fn handle(&self, buf: &[u8], from: NodeID) -> Result<Vec<u8>, Error> {
match self.0.handler.load_full() {
None => Err(Error::NoHandler),
Some(h) => {
let req = rmp_serde::decode::from_read_ref::<_, M>(buf)?;
let res = h.handle(req, from).await;
let res_bytes = rmp_to_vec_all_named(&res)?;
Ok(res_bytes)
}
}
}
fn clear_handler(&self) {
self.0.handler.swap(None);
}
fn clone_endpoint(&self) -> DynEndpoint {
Box::new(Self(self.0.clone()))
}
}
|