aboutsummaryrefslogtreecommitdiff
path: root/src/netapp.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2020-12-02 18:10:07 +0100
committerAlex Auvolat <alex@adnab.me>2020-12-02 18:10:07 +0100
commit46fae5d138cb7c0a74e2a8c7837541f18400ccf4 (patch)
treef4456300e4ed12ffa6dd918236ad74d4c89b0249 /src/netapp.rs
parent9ed776d16ad40a4d47900814b2b7f1ef1c02fa4e (diff)
downloadnetapp-46fae5d138cb7c0a74e2a8c7837541f18400ccf4.tar.gz
netapp-46fae5d138cb7c0a74e2a8c7837541f18400ccf4.zip
Better handle requests to ourself
Diffstat (limited to 'src/netapp.rs')
-rw-r--r--src/netapp.rs152
1 files changed, 127 insertions, 25 deletions
diff --git a/src/netapp.rs b/src/netapp.rs
index 6f174b4..25c3b5a 100644
--- a/src/netapp.rs
+++ b/src/netapp.rs
@@ -1,3 +1,4 @@
+use std::any::Any;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::pin::Pin;
@@ -20,6 +21,18 @@ use crate::message::*;
use crate::proto::*;
use crate::util::*;
+type DynMsg = Box<dyn Any + Send + Sync + 'static>;
+
+pub(crate) struct Handler {
+ pub(crate) local_handler:
+ Box<dyn Fn(DynMsg) -> Pin<Box<dyn Future<Output = DynMsg> + Sync + Send>> + Sync + Send>,
+ pub(crate) net_handler: Box<
+ dyn Fn(ed25519::PublicKey, Bytes) -> Pin<Box<dyn Future<Output = Vec<u8>> + Sync + Send>>
+ + Sync
+ + Send,
+ >,
+}
+
pub struct NetApp {
pub listen_addr: SocketAddr,
pub netid: auth::Key,
@@ -27,29 +40,21 @@ pub struct NetApp {
pub privkey: ed25519::SecretKey,
pub server_conns: RwLock<HashMap<ed25519::PublicKey, Arc<ServerConn>>>,
pub client_conns: RwLock<HashMap<ed25519::PublicKey, Arc<ClientConn>>>,
- pub(crate) msg_handlers: ArcSwap<
- HashMap<
- MessageKind,
- Arc<
- dyn Fn(
- ed25519::PublicKey,
- Bytes,
- ) -> Pin<Box<dyn Future<Output = Vec<u8>> + Sync + Send>>
- + Sync
- + Send,
- >,
- >,
- >,
+ pub(crate) msg_handlers: ArcSwap<HashMap<MessageKind, Arc<Handler>>>,
pub(crate) on_connected:
ArcSwapOption<Box<dyn Fn(ed25519::PublicKey, SocketAddr, bool) + Send + Sync>>,
pub(crate) on_disconnected: ArcSwapOption<Box<dyn Fn(ed25519::PublicKey, bool) + Send + Sync>>,
}
-async fn handler_aux<M, F, R>(handler: Arc<F>, remote: ed25519::PublicKey, bytes: Bytes) -> Vec<u8>
+async fn net_handler_aux<M, F, R>(
+ handler: Arc<F>,
+ remote: ed25519::PublicKey,
+ bytes: Bytes,
+) -> Vec<u8>
where
M: Message + 'static,
F: Fn(ed25519::PublicKey, M) -> R + Send + Sync + 'static,
- R: Future<Output = Result<<M as Message>::Response, Error>> + Send + Sync,
+ R: Future<Output = <M as Message>::Response> + Send + Sync,
{
debug!(
"Handling message of kind {:08x} from {}",
@@ -57,13 +62,28 @@ where
hex::encode(remote)
);
let res = match rmp_serde::decode::from_read_ref::<_, M>(&bytes[..]) {
- Ok(msg) => handler(remote.clone(), msg).await,
- Err(e) => Err(e.into()),
+ Ok(msg) => Ok(handler(remote, msg).await),
+ Err(e) => Err(e.to_string()),
};
- let res = res.map_err(|e| format!("{}", e));
rmp_to_vec_all_named(&res).unwrap_or(vec![])
}
+async fn local_handler_aux<M, F, R>(
+ handler: Arc<F>,
+ remote: ed25519::PublicKey,
+ msg: DynMsg,
+) -> DynMsg
+where
+ M: Message + 'static,
+ F: Fn(ed25519::PublicKey, M) -> R + Send + Sync + 'static,
+ R: Future<Output = <M as Message>::Response> + Send + Sync,
+{
+ debug!("Handling message of kind {:08x} from ourself", M::KIND,);
+ let msg = (msg as Box<dyn Any + 'static>).downcast::<M>().unwrap();
+ let res = handler(remote, *msg).await;
+ Box::new(res)
+}
+
impl NetApp {
pub fn new(
listen_addr: SocketAddr,
@@ -87,7 +107,7 @@ impl NetApp {
netapp.add_msg_handler::<HelloMessage, _, _>(
move |from: ed25519::PublicKey, msg: HelloMessage| {
netapp2.handle_hello_message(from, msg);
- async { Ok(()) }
+ async { () }
},
);
@@ -98,16 +118,31 @@ impl NetApp {
where
M: Message + 'static,
F: Fn(ed25519::PublicKey, M) -> R + Send + Sync + 'static,
- R: Future<Output = Result<<M as Message>::Response, Error>> + Send + Sync + 'static,
+ R: Future<Output = <M as Message>::Response> + Send + Sync + 'static,
{
let handler = Arc::new(handler);
- let fun = Arc::new(move |remote: ed25519::PublicKey, bytes: Bytes| {
+
+ let handler1 = handler.clone();
+ let net_handler = Box::new(move |remote: ed25519::PublicKey, bytes: Bytes| {
let fun: Pin<Box<dyn Future<Output = Vec<u8>> + Sync + Send>> =
- Box::pin(handler_aux(handler.clone(), remote, bytes));
+ Box::pin(net_handler_aux(handler1.clone(), remote, bytes));
+ fun
+ });
+
+ let self_id = self.pubkey.clone();
+ let local_handler = Box::new(move |msg: DynMsg| {
+ let fun: Pin<Box<dyn Future<Output = DynMsg> + Sync + Send>> =
+ Box::pin(local_handler_aux(handler.clone(), self_id, msg));
fun
});
+
+ let funs = Arc::new(Handler {
+ net_handler,
+ local_handler,
+ });
+
let mut handlers = self.msg_handlers.load().as_ref().clone();
- handlers.insert(M::KIND, fun);
+ handlers.insert(M::KIND, funs);
self.msg_handlers.store(Arc::new(handlers));
}
@@ -136,23 +171,48 @@ impl NetApp {
ip: SocketAddr,
pk: ed25519::PublicKey,
) -> Result<(), Error> {
+ if pk == self.pubkey {
+ // Don't connect to ourself, we don't care
+ // but pretend we did
+ tokio::spawn(async move {
+ if let Some(h) = self.on_connected.load().as_ref() {
+ h(pk, ip, false);
+ }
+ });
+ return Ok(());
+ }
+
+ // Don't connect if already connected
if self.client_conns.read().unwrap().contains_key(&pk) {
return Ok(());
}
+
let socket = TcpStream::connect(ip).await?;
info!("Connected to {}, negotiating handshake...", ip);
ClientConn::init(self, socket, pk.clone()).await?;
Ok(())
}
- pub fn disconnect(self: Arc<Self>, id: &ed25519::PublicKey) {
- let conn = self.client_conns.read().unwrap().get(id).cloned();
+ pub fn disconnect(self: Arc<Self>, pk: &ed25519::PublicKey) {
+ if *pk == self.pubkey {
+ let pk = *pk;
+ tokio::spawn(async move {
+ if let Some(h) = self.on_disconnected.load().as_ref() {
+ h(pk, false);
+ }
+ });
+ return;
+ }
+
+ let conn = self.client_conns.read().unwrap().get(pk).cloned();
if let Some(c) = conn {
c.close();
}
}
pub(crate) fn connected_as_server(&self, id: ed25519::PublicKey, conn: Arc<ServerConn>) {
+ info!("Accepted connection from {}", hex::encode(id));
+
let mut conn_list = self.server_conns.write().unwrap();
conn_list.insert(id.clone(), conn);
}
@@ -167,6 +227,8 @@ impl NetApp {
}
pub(crate) fn disconnected_as_server(&self, id: &ed25519::PublicKey, conn: Arc<ServerConn>) {
+ info!("Connection from {} closed", hex::encode(id));
+
let mut conn_list = self.server_conns.write().unwrap();
if let Some(c) = conn_list.get(id) {
if Arc::ptr_eq(c, &conn) {
@@ -180,6 +242,8 @@ impl NetApp {
}
pub(crate) fn connected_as_client(&self, id: ed25519::PublicKey, conn: Arc<ClientConn>) {
+ info!("Connection established to {}", hex::encode(id));
+
{
let mut conn_list = self.client_conns.write().unwrap();
if let Some(old_c) = conn_list.insert(id.clone(), conn.clone()) {
@@ -200,6 +264,7 @@ impl NetApp {
}
pub(crate) fn disconnected_as_client(&self, id: &ed25519::PublicKey, conn: Arc<ClientConn>) {
+ info!("Connection to {} closed", hex::encode(id));
let mut conn_list = self.client_conns.write().unwrap();
if let Some(c) = conn_list.get(id) {
if Arc::ptr_eq(c, &conn) {
@@ -211,4 +276,41 @@ impl NetApp {
}
}
}
+
+ pub async fn request<T>(
+ &self,
+ target: &ed25519::PublicKey,
+ rq: T,
+ prio: RequestPriority,
+ ) -> Result<<T as Message>::Response, Error>
+ where
+ T: Message + 'static,
+ {
+ if *target == self.pubkey {
+ let handler = self.msg_handlers.load().get(&T::KIND).cloned();
+ match handler {
+ None => Err(Error::Message(format!(
+ "No handler registered for message kind {:08x}",
+ T::KIND
+ ))),
+ Some(h) => {
+ let local_handler = &h.local_handler;
+ let res = local_handler(Box::new(rq)).await;
+ let res_t = (res as Box<dyn Any + 'static>)
+ .downcast::<<T as Message>::Response>()
+ .unwrap();
+ Ok(*res_t)
+ }
+ }
+ } else {
+ let conn = self.client_conns.read().unwrap().get(target).cloned();
+ match conn {
+ None => Err(Error::Message(format!(
+ "Not connected: {}",
+ hex::encode(target)
+ ))),
+ Some(c) => c.request(rq, prio).await,
+ }
+ }
+ }
}