diff options
author | Alex Auvolat <alex@adnab.me> | 2020-12-02 18:10:07 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2020-12-02 18:10:07 +0100 |
commit | 46fae5d138cb7c0a74e2a8c7837541f18400ccf4 (patch) | |
tree | f4456300e4ed12ffa6dd918236ad74d4c89b0249 /src/netapp.rs | |
parent | 9ed776d16ad40a4d47900814b2b7f1ef1c02fa4e (diff) | |
download | netapp-46fae5d138cb7c0a74e2a8c7837541f18400ccf4.tar.gz netapp-46fae5d138cb7c0a74e2a8c7837541f18400ccf4.zip |
Better handle requests to ourself
Diffstat (limited to 'src/netapp.rs')
-rw-r--r-- | src/netapp.rs | 152 |
1 files changed, 127 insertions, 25 deletions
diff --git a/src/netapp.rs b/src/netapp.rs index 6f174b4..25c3b5a 100644 --- a/src/netapp.rs +++ b/src/netapp.rs @@ -1,3 +1,4 @@ +use std::any::Any; use std::collections::HashMap; use std::net::SocketAddr; use std::pin::Pin; @@ -20,6 +21,18 @@ use crate::message::*; use crate::proto::*; use crate::util::*; +type DynMsg = Box<dyn Any + Send + Sync + 'static>; + +pub(crate) struct Handler { + pub(crate) local_handler: + Box<dyn Fn(DynMsg) -> Pin<Box<dyn Future<Output = DynMsg> + Sync + Send>> + Sync + Send>, + pub(crate) net_handler: Box< + dyn Fn(ed25519::PublicKey, Bytes) -> Pin<Box<dyn Future<Output = Vec<u8>> + Sync + Send>> + + Sync + + Send, + >, +} + pub struct NetApp { pub listen_addr: SocketAddr, pub netid: auth::Key, @@ -27,29 +40,21 @@ pub struct NetApp { pub privkey: ed25519::SecretKey, pub server_conns: RwLock<HashMap<ed25519::PublicKey, Arc<ServerConn>>>, pub client_conns: RwLock<HashMap<ed25519::PublicKey, Arc<ClientConn>>>, - pub(crate) msg_handlers: ArcSwap< - HashMap< - MessageKind, - Arc< - dyn Fn( - ed25519::PublicKey, - Bytes, - ) -> Pin<Box<dyn Future<Output = Vec<u8>> + Sync + Send>> - + Sync - + Send, - >, - >, - >, + pub(crate) msg_handlers: ArcSwap<HashMap<MessageKind, Arc<Handler>>>, pub(crate) on_connected: ArcSwapOption<Box<dyn Fn(ed25519::PublicKey, SocketAddr, bool) + Send + Sync>>, pub(crate) on_disconnected: ArcSwapOption<Box<dyn Fn(ed25519::PublicKey, bool) + Send + Sync>>, } -async fn handler_aux<M, F, R>(handler: Arc<F>, remote: ed25519::PublicKey, bytes: Bytes) -> Vec<u8> +async fn net_handler_aux<M, F, R>( + handler: Arc<F>, + remote: ed25519::PublicKey, + bytes: Bytes, +) -> Vec<u8> where M: Message + 'static, F: Fn(ed25519::PublicKey, M) -> R + Send + Sync + 'static, - R: Future<Output = Result<<M as Message>::Response, Error>> + Send + Sync, + R: Future<Output = <M as Message>::Response> + Send + Sync, { debug!( "Handling message of kind {:08x} from {}", @@ -57,13 +62,28 @@ where hex::encode(remote) ); let res = match rmp_serde::decode::from_read_ref::<_, M>(&bytes[..]) { - Ok(msg) => handler(remote.clone(), msg).await, - Err(e) => Err(e.into()), + Ok(msg) => Ok(handler(remote, msg).await), + Err(e) => Err(e.to_string()), }; - let res = res.map_err(|e| format!("{}", e)); rmp_to_vec_all_named(&res).unwrap_or(vec![]) } +async fn local_handler_aux<M, F, R>( + handler: Arc<F>, + remote: ed25519::PublicKey, + msg: DynMsg, +) -> DynMsg +where + M: Message + 'static, + F: Fn(ed25519::PublicKey, M) -> R + Send + Sync + 'static, + R: Future<Output = <M as Message>::Response> + Send + Sync, +{ + debug!("Handling message of kind {:08x} from ourself", M::KIND,); + let msg = (msg as Box<dyn Any + 'static>).downcast::<M>().unwrap(); + let res = handler(remote, *msg).await; + Box::new(res) +} + impl NetApp { pub fn new( listen_addr: SocketAddr, @@ -87,7 +107,7 @@ impl NetApp { netapp.add_msg_handler::<HelloMessage, _, _>( move |from: ed25519::PublicKey, msg: HelloMessage| { netapp2.handle_hello_message(from, msg); - async { Ok(()) } + async { () } }, ); @@ -98,16 +118,31 @@ impl NetApp { where M: Message + 'static, F: Fn(ed25519::PublicKey, M) -> R + Send + Sync + 'static, - R: Future<Output = Result<<M as Message>::Response, Error>> + Send + Sync + 'static, + R: Future<Output = <M as Message>::Response> + Send + Sync + 'static, { let handler = Arc::new(handler); - let fun = Arc::new(move |remote: ed25519::PublicKey, bytes: Bytes| { + + let handler1 = handler.clone(); + let net_handler = Box::new(move |remote: ed25519::PublicKey, bytes: Bytes| { let fun: Pin<Box<dyn Future<Output = Vec<u8>> + Sync + Send>> = - Box::pin(handler_aux(handler.clone(), remote, bytes)); + Box::pin(net_handler_aux(handler1.clone(), remote, bytes)); + fun + }); + + let self_id = self.pubkey.clone(); + let local_handler = Box::new(move |msg: DynMsg| { + let fun: Pin<Box<dyn Future<Output = DynMsg> + Sync + Send>> = + Box::pin(local_handler_aux(handler.clone(), self_id, msg)); fun }); + + let funs = Arc::new(Handler { + net_handler, + local_handler, + }); + let mut handlers = self.msg_handlers.load().as_ref().clone(); - handlers.insert(M::KIND, fun); + handlers.insert(M::KIND, funs); self.msg_handlers.store(Arc::new(handlers)); } @@ -136,23 +171,48 @@ impl NetApp { ip: SocketAddr, pk: ed25519::PublicKey, ) -> Result<(), Error> { + if pk == self.pubkey { + // Don't connect to ourself, we don't care + // but pretend we did + tokio::spawn(async move { + if let Some(h) = self.on_connected.load().as_ref() { + h(pk, ip, false); + } + }); + return Ok(()); + } + + // Don't connect if already connected if self.client_conns.read().unwrap().contains_key(&pk) { return Ok(()); } + let socket = TcpStream::connect(ip).await?; info!("Connected to {}, negotiating handshake...", ip); ClientConn::init(self, socket, pk.clone()).await?; Ok(()) } - pub fn disconnect(self: Arc<Self>, id: &ed25519::PublicKey) { - let conn = self.client_conns.read().unwrap().get(id).cloned(); + pub fn disconnect(self: Arc<Self>, pk: &ed25519::PublicKey) { + if *pk == self.pubkey { + let pk = *pk; + tokio::spawn(async move { + if let Some(h) = self.on_disconnected.load().as_ref() { + h(pk, false); + } + }); + return; + } + + let conn = self.client_conns.read().unwrap().get(pk).cloned(); if let Some(c) = conn { c.close(); } } pub(crate) fn connected_as_server(&self, id: ed25519::PublicKey, conn: Arc<ServerConn>) { + info!("Accepted connection from {}", hex::encode(id)); + let mut conn_list = self.server_conns.write().unwrap(); conn_list.insert(id.clone(), conn); } @@ -167,6 +227,8 @@ impl NetApp { } pub(crate) fn disconnected_as_server(&self, id: &ed25519::PublicKey, conn: Arc<ServerConn>) { + info!("Connection from {} closed", hex::encode(id)); + let mut conn_list = self.server_conns.write().unwrap(); if let Some(c) = conn_list.get(id) { if Arc::ptr_eq(c, &conn) { @@ -180,6 +242,8 @@ impl NetApp { } pub(crate) fn connected_as_client(&self, id: ed25519::PublicKey, conn: Arc<ClientConn>) { + info!("Connection established to {}", hex::encode(id)); + { let mut conn_list = self.client_conns.write().unwrap(); if let Some(old_c) = conn_list.insert(id.clone(), conn.clone()) { @@ -200,6 +264,7 @@ impl NetApp { } pub(crate) fn disconnected_as_client(&self, id: &ed25519::PublicKey, conn: Arc<ClientConn>) { + info!("Connection to {} closed", hex::encode(id)); let mut conn_list = self.client_conns.write().unwrap(); if let Some(c) = conn_list.get(id) { if Arc::ptr_eq(c, &conn) { @@ -211,4 +276,41 @@ impl NetApp { } } } + + pub async fn request<T>( + &self, + target: &ed25519::PublicKey, + rq: T, + prio: RequestPriority, + ) -> Result<<T as Message>::Response, Error> + where + T: Message + 'static, + { + if *target == self.pubkey { + let handler = self.msg_handlers.load().get(&T::KIND).cloned(); + match handler { + None => Err(Error::Message(format!( + "No handler registered for message kind {:08x}", + T::KIND + ))), + Some(h) => { + let local_handler = &h.local_handler; + let res = local_handler(Box::new(rq)).await; + let res_t = (res as Box<dyn Any + 'static>) + .downcast::<<T as Message>::Response>() + .unwrap(); + Ok(*res_t) + } + } + } else { + let conn = self.client_conns.read().unwrap().get(target).cloned(); + match conn { + None => Err(Error::Message(format!( + "Not connected: {}", + hex::encode(target) + ))), + Some(c) => c.request(rq, prio).await, + } + } + } } |