diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/client.rs | 78 | ||||
-rw-r--r-- | src/endpoint.rs | 1 | ||||
-rw-r--r-- | src/error.rs | 17 | ||||
-rw-r--r-- | src/lib.rs | 4 | ||||
-rw-r--r-- | src/netapp.rs | 107 | ||||
-rw-r--r-- | src/peering/basalt.rs | 2 | ||||
-rw-r--r-- | src/peering/fullmesh.rs | 6 | ||||
-rw-r--r-- | src/proto.rs | 56 | ||||
-rw-r--r-- | src/server.rs | 101 | ||||
-rw-r--r-- | src/util.rs | 14 |
10 files changed, 224 insertions, 162 deletions
diff --git a/src/client.rs b/src/client.rs index 127ff46..773fa9d 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,11 +1,13 @@ use std::collections::HashMap; use std::net::SocketAddr; -use std::sync::atomic::{self, AtomicBool, AtomicU32}; +use std::sync::atomic::{self, AtomicU32}; use std::sync::{Arc, Mutex}; +use arc_swap::ArcSwapOption; use log::{debug, error, trace}; use tokio::net::TcpStream; +use tokio::select; use tokio::sync::{mpsc, oneshot, watch}; use tokio_util::compat::*; @@ -21,17 +23,14 @@ use crate::netapp::*; use crate::proto::*; use crate::util::*; - pub(crate) struct ClientConn { pub(crate) remote_addr: SocketAddr, pub(crate) peer_id: NodeID, - query_send: mpsc::UnboundedSender<Option<(RequestID, RequestPriority, Vec<u8>)>>, + query_send: ArcSwapOption<mpsc::UnboundedSender<(RequestID, RequestPriority, Vec<u8>)>>, next_query_number: AtomicU32, inflight: Mutex<HashMap<RequestID, oneshot::Sender<Vec<u8>>>>, - must_exit: AtomicBool, - stop_recv_loop: watch::Sender<bool>, } impl ClientConn { @@ -71,25 +70,35 @@ impl ClientConn { remote_addr, peer_id, next_query_number: AtomicU32::from(RequestID::default()), - query_send, + query_send: ArcSwapOption::new(Some(Arc::new(query_send))), inflight: Mutex::new(HashMap::new()), - must_exit: AtomicBool::new(false), - stop_recv_loop, }); netapp.connected_as_client(peer_id, conn.clone()); tokio::spawn(async move { + let send_future = tokio::spawn(conn.clone().send_loop(query_recv, write)); + let conn2 = conn.clone(); - let conn3 = conn.clone(); - tokio::try_join!(conn2.send_loop(query_recv, write), async move { - tokio::select!( - r = conn3.recv_loop(read) => r, - _ = await_exit(stop_recv_loop_recv) => Ok(()), - ) - }) - .map(|_| ()) - .log_err("ClientConn send_loop/recv_loop/dispatch_loop"); + let recv_future = tokio::spawn(async move { + select! { + r = conn2.recv_loop(read) => r, + _ = await_exit(stop_recv_loop_recv) => Ok(()) + } + }); + + send_future.await.log_err("ClientConn send_loop"); + + // TODO here: wait for inflight requests to all have their response + stop_recv_loop + .send(true) + .log_err("ClientConn send true to stop_recv_loop"); + + recv_future.await.log_err("ClientConn recv_loop"); + + // Make sure we don't wait on any more requests that won't + // have a response + conn.inflight.lock().unwrap().clear(); netapp.disconnected_as_client(&peer_id, conn); }); @@ -98,15 +107,7 @@ impl ClientConn { } pub fn close(&self) { - self.must_exit.store(true, atomic::Ordering::SeqCst); - self.query_send - .send(None) - .log_err("could not write None in query_send"); - if self.inflight.lock().unwrap().is_empty() { - self.stop_recv_loop - .send(true) - .log_err("could not write true to stop_recv_loop"); - } + self.query_send.store(None); } pub(crate) async fn call<T>( @@ -118,6 +119,8 @@ impl ClientConn { where T: Message, { + let query_send = self.query_send.load_full().ok_or(Error::ConnectionClosed)?; + let id = self .next_query_number .fetch_add(1, atomic::Ordering::Relaxed); @@ -138,20 +141,23 @@ impl ClientConn { } trace!("request: query_send {}, {} bytes", id, bytes.len()); - self.query_send.send(Some((id, prio, bytes)))?; + query_send.send((id, prio, bytes))?; let resp = resp_recv.await?; - if resp.len() == 0 { - return Err(Error::Message("Response is 0 bytes, either a collision or a protocol error".into())); + if resp.is_empty() { + return Err(Error::Message( + "Response is 0 bytes, either a collision or a protocol error".into(), + )); } trace!("request response {}: ", id); let code = resp[0]; if code == 0 { - Ok(rmp_serde::decode::from_read_ref::<_, <T as Message>::Response>( - &resp[1..], - )?) + Ok(rmp_serde::decode::from_read_ref::< + _, + <T as Message>::Response, + >(&resp[1..])?) } else { Err(Error::Remote(format!("Remote error code {}", code))) } @@ -162,7 +168,7 @@ impl SendLoop for ClientConn {} #[async_trait] impl RecvLoop for ClientConn { - async fn recv_handler(self: Arc<Self>, id: RequestID, msg: Vec<u8>) { + fn recv_handler(self: &Arc<Self>, id: RequestID, msg: Vec<u8>) { trace!("ClientConn recv_handler {} ({} bytes)", id, msg.len()); let mut inflight = self.inflight.lock().unwrap(); @@ -171,11 +177,5 @@ impl RecvLoop for ClientConn { debug!("Could not send request response, probably because request was interrupted. Dropping response."); } } - - if inflight.is_empty() && self.must_exit.load(atomic::Ordering::SeqCst) { - self.stop_recv_loop - .send(true) - .log_err("could not write true to stop_recv_loop"); - } } } diff --git a/src/endpoint.rs b/src/endpoint.rs index 83957e2..0e1f5c8 100644 --- a/src/endpoint.rs +++ b/src/endpoint.rs @@ -123,4 +123,3 @@ where Box::new(Self(self.0.clone())) } } - diff --git a/src/error.rs b/src/error.rs index 14c6187..0ed30a5 100644 --- a/src/error.rs +++ b/src/error.rs @@ -31,6 +31,9 @@ pub enum Error { #[error(display = "No handler / shutting down")] NoHandler, + #[error(display = "Connection closed")] + ConnectionClosed, + #[error(display = "Remote error: {}", _0)] Remote(String), } @@ -45,6 +48,7 @@ impl Error { Self::RMPDecode(_) => 11, Self::UTF8(_) => 12, Self::NoHandler => 20, + Self::ConnectionClosed => 21, Self::Handshake(_) => 30, Self::Remote(_) => 40, Self::Message(_) => 99, @@ -80,3 +84,16 @@ where }; } } + +impl<E, T> LogError for Result<T, E> +where + T: LogError, + E: Into<Error>, +{ + fn log_err(self, msg: &'static str) { + match self { + Err(e) => error!("Error: {}: {}", msg, Into::<Error>::into(e)), + Ok(x) => x.log_err(msg), + } + } +} @@ -13,16 +13,14 @@ //! about message priorization. //! Also check out the examples to learn how to use this crate. -#![feature(map_first_last)] - pub mod error; pub mod util; pub mod endpoint; pub mod proto; -mod server; mod client; +mod server; pub mod netapp; pub mod peering; diff --git a/src/netapp.rs b/src/netapp.rs index b6994ea..bffa0e1 100644 --- a/src/netapp.rs +++ b/src/netapp.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use std::net::{IpAddr, SocketAddr}; use std::sync::{Arc, RwLock}; -use log::{debug, info, error}; +use log::{debug, error, info, trace, warn}; use arc_swap::ArcSwapOption; use async_trait::async_trait; @@ -10,13 +10,18 @@ use async_trait::async_trait; use serde::{Deserialize, Serialize}; use sodiumoxide::crypto::auth; use sodiumoxide::crypto::sign::ed25519; + +use futures::stream::futures_unordered::FuturesUnordered; +use futures::stream::StreamExt; use tokio::net::{TcpListener, TcpStream}; +use tokio::select; +use tokio::sync::{mpsc, watch}; use crate::client::*; -use crate::server::*; use crate::endpoint::*; use crate::error::*; use crate::proto::*; +use crate::server::*; use crate::util::*; #[derive(Serialize, Deserialize)] @@ -142,35 +147,91 @@ impl NetApp { /// Main listening process for our app. This future runs during the whole /// run time of our application. /// If this is not called, the NetApp instance remains a passive client. - pub async fn listen(self: Arc<Self>, listen_addr: SocketAddr, public_addr: Option<IpAddr>) { + pub async fn listen( + self: Arc<Self>, + listen_addr: SocketAddr, + public_addr: Option<IpAddr>, + mut must_exit: watch::Receiver<bool>, + ) { let listen_params = ListenParams { listen_addr, public_addr, }; - if self.listen_params.swap(Some(Arc::new(listen_params))).is_some() { + if self + .listen_params + .swap(Some(Arc::new(listen_params))) + .is_some() + { error!("Trying to listen on NetApp but we're already listening!"); } let listener = TcpListener::bind(listen_addr).await.unwrap(); info!("Listening on {}", listen_addr); - loop { - // The second item contains the IP and port of the new connection. - let (socket, _) = listener.accept().await.unwrap(); + let (conn_in, mut conn_out) = mpsc::unbounded_channel(); + let connection_collector = tokio::spawn(async move { + let mut collection = FuturesUnordered::new(); + loop { + if collection.is_empty() { + match conn_out.recv().await { + Some(f) => collection.push(f), + None => break, + } + } else { + select! { + new_fut = conn_out.recv() => { + match new_fut { + Some(f) => collection.push(f), + None => break, + } + } + result = collection.next() => { + trace!("Collected connection: {:?}", result); + } + } + } + } + debug!("Collecting last open server connections."); + while let Some(conn_res) = collection.next().await { + trace!("Collected connection: {:?}", conn_res); + } + debug!("No more server connections to collect"); + }); + + while !*must_exit.borrow_and_update() { + let (socket, peer_addr) = select! { + sockres = listener.accept() => { + match sockres { + Ok(x) => x, + Err(e) => { + warn!("Error in listener.accept: {}", e); + continue; + } + } + }, + _ = must_exit.changed() => continue, + }; + info!( "Incoming connection from {}, negotiating handshake...", - match socket.peer_addr() { - Ok(x) => format!("{}", x), - Err(e) => format!("<invalid addr: {}>", e), - } + peer_addr ); let self2 = self.clone(); - tokio::spawn(async move { - ServerConn::run(self2, socket) - .await - .log_err("ServerConn::run"); - }); + let must_exit2 = must_exit.clone(); + conn_in + .send(tokio::spawn(async move { + ServerConn::run(self2, socket, must_exit2) + .await + .log_err("ServerConn::run"); + })) + .log_err("Failed to send connection to connection collector"); } + + drop(conn_in); + + connection_collector + .await + .log_err("Failed to await for connection collector"); } /// Attempt to connect to a peer, given by its ip:port and its public key. @@ -231,20 +292,6 @@ impl NetApp { }); } - /// Close the incoming connection from a certain client to us, - /// if such a connection is currently open. - pub fn server_disconnect(self: &Arc<Self>, id: &NodeID) { - let conn = self.server_conns.read().unwrap().get(id).cloned(); - if let Some(c) = conn { - debug!( - "Closing incoming connection from {} ({})", - hex::encode(c.peer_id), - c.remote_addr - ); - c.close(); - } - } - // Called from conn.rs when an incoming connection is successfully established // Registers the connection in our list of connections // Do not yet call the on_connected handler, because we don't know if the remote diff --git a/src/peering/basalt.rs b/src/peering/basalt.rs index e0c8301..efbf6e6 100644 --- a/src/peering/basalt.rs +++ b/src/peering/basalt.rs @@ -3,11 +3,11 @@ use std::net::SocketAddr; use std::sync::{Arc, RwLock}; use std::time::Duration; +use async_trait::async_trait; use log::{debug, info, trace, warn}; use lru::LruCache; use rand::{thread_rng, Rng}; use serde::{Deserialize, Serialize}; -use async_trait::async_trait; use sodiumoxide::crypto::hash; diff --git a/src/peering/fullmesh.rs b/src/peering/fullmesh.rs index b579654..793eeb2 100644 --- a/src/peering/fullmesh.rs +++ b/src/peering/fullmesh.rs @@ -8,6 +8,8 @@ use async_trait::async_trait; use log::{debug, info, trace, warn}; use serde::{Deserialize, Serialize}; +use tokio::sync::watch; + use sodiumoxide::crypto::hash; use crate::endpoint::*; @@ -171,8 +173,8 @@ impl FullMeshPeeringStrategy { strat } - pub async fn run(self: Arc<Self>) { - loop { + pub async fn run(self: Arc<Self>, must_exit: watch::Receiver<bool>) { + while !*must_exit.borrow() { // 1. Read current state: get list of connected peers (ping them) let (to_ping, to_retry) = { let known_hosts = self.known_hosts.read().unwrap(); diff --git a/src/proto.rs b/src/proto.rs index 3811e3f..f91ffc7 100644 --- a/src/proto.rs +++ b/src/proto.rs @@ -1,4 +1,4 @@ -use std::collections::{BTreeMap, HashMap, VecDeque}; +use std::collections::{HashMap, VecDeque}; use std::sync::Arc; use log::trace; @@ -50,7 +50,6 @@ type ChunkLength = u16; const MAX_CHUNK_LENGTH: ChunkLength = 0x4000; const CHUNK_HAS_CONTINUATION: ChunkLength = 0x8000; - struct SendQueueItem { id: RequestID, prio: RequestPriority, @@ -59,31 +58,33 @@ struct SendQueueItem { } struct SendQueue { - items: BTreeMap<u8, VecDeque<SendQueueItem>>, + items: VecDeque<(u8, VecDeque<SendQueueItem>)>, } impl SendQueue { fn new() -> Self { Self { - items: BTreeMap::new(), + items: VecDeque::with_capacity(64), } } fn push(&mut self, item: SendQueueItem) { let prio = item.prio; - let mut items_at_prio = self - .items - .remove(&prio) - .unwrap_or_else(|| VecDeque::with_capacity(4)); - items_at_prio.push_back(item); - self.items.insert(prio, items_at_prio); + let pos_prio = match self.items.binary_search_by(|(p, _)| p.cmp(&prio)) { + Ok(i) => i, + Err(i) => { + self.items.insert(i, (prio, VecDeque::new())); + i + } + }; + self.items[pos_prio].1.push_back(item); } fn pop(&mut self) -> Option<SendQueueItem> { - match self.items.pop_first() { + match self.items.pop_front() { None => None, Some((prio, mut items_at_prio)) => { let ret = items_at_prio.pop_front(); if !items_at_prio.is_empty() { - self.items.insert(prio, items_at_prio); + self.items.push_front((prio, items_at_prio)); } ret.or_else(|| self.pop()) } @@ -98,7 +99,7 @@ impl SendQueue { pub(crate) trait SendLoop: Sync { async fn send_loop<W>( self: Arc<Self>, - mut msg_recv: mpsc::UnboundedReceiver<Option<(RequestID, RequestPriority, Vec<u8>)>>, + mut msg_recv: mpsc::UnboundedReceiver<(RequestID, RequestPriority, Vec<u8>)>, mut write: W, ) -> Result<(), Error> where @@ -107,18 +108,14 @@ pub(crate) trait SendLoop: Sync { let mut sending = SendQueue::new(); let mut should_exit = false; while !should_exit || !sending.is_empty() { - if let Ok(sth) = msg_recv.try_recv() { - if let Some((id, prio, data)) = sth { - trace!("send_loop: got {}, {} bytes", id, data.len()); - sending.push(SendQueueItem { - id, - prio, - data, - cursor: 0, - }); - } else { - should_exit = true; - } + if let Ok((id, prio, data)) = msg_recv.try_recv() { + trace!("send_loop: got {}, {} bytes", id, data.len()); + sending.push(SendQueueItem { + id, + prio, + data, + cursor: 0, + }); } else if let Some(mut item) = sending.pop() { trace!( "send_loop: sending bytes for {} ({} bytes, {} already sent)", @@ -149,10 +146,7 @@ pub(crate) trait SendLoop: Sync { } write.flush().await?; } else { - let sth = msg_recv - .recv() - .await - .ok_or_else(|| Error::Message("Connection closed.".into()))?; + let sth = msg_recv.recv().await; if let Some((id, prio, data)) = sth { trace!("send_loop: got {}, {} bytes", id, data.len()); sending.push(SendQueueItem { @@ -173,7 +167,7 @@ pub(crate) trait SendLoop: Sync { #[async_trait] pub(crate) trait RecvLoop: Sync + 'static { // Returns true if we should stop receiving after this - async fn recv_handler(self: Arc<Self>, id: RequestID, msg: Vec<u8>); + fn recv_handler(self: &Arc<Self>, id: RequestID, msg: Vec<u8>); async fn recv_loop<R>(self: Arc<Self>, mut read: R) -> Result<(), Error> where @@ -205,7 +199,7 @@ pub(crate) trait RecvLoop: Sync + 'static { if has_cont { receiving.insert(id, msg_bytes); } else { - tokio::spawn(self.clone().recv_handler(id, msg_bytes)); + self.recv_handler(id, msg_bytes); } } } diff --git a/src/server.rs b/src/server.rs index c7d99b5..f23b810 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,10 +1,12 @@ use std::net::SocketAddr; -use std::sync::{Arc}; +use std::sync::Arc; +use arc_swap::ArcSwapOption; use bytes::Bytes; use log::{debug, trace}; use tokio::net::TcpStream; +use tokio::select; use tokio::sync::{mpsc, watch}; use tokio_util::compat::*; @@ -42,12 +44,15 @@ pub(crate) struct ServerConn { netapp: Arc<NetApp>, - resp_send: mpsc::UnboundedSender<Option<(RequestID, RequestPriority, Vec<u8>)>>, - close_send: watch::Sender<bool>, + resp_send: ArcSwapOption<mpsc::UnboundedSender<(RequestID, RequestPriority, Vec<u8>)>>, } impl ServerConn { - pub(crate) async fn run(netapp: Arc<NetApp>, socket: TcpStream) -> Result<(), Error> { + pub(crate) async fn run( + netapp: Arc<NetApp>, + socket: TcpStream, + must_exit: watch::Receiver<bool>, + ) -> Result<(), Error> { let remote_addr = socket.peer_addr()?; let mut socket = socket.compat(); @@ -73,47 +78,33 @@ impl ServerConn { let (resp_send, resp_recv) = mpsc::unbounded_channel(); - let (close_send, close_recv) = watch::channel(false); - let conn = Arc::new(ServerConn { netapp: netapp.clone(), remote_addr, peer_id, - resp_send, - close_send, + resp_send: ArcSwapOption::new(Some(Arc::new(resp_send))), }); netapp.connected_as_server(peer_id, conn.clone()); let conn2 = conn.clone(); - let conn3 = conn.clone(); - let close_recv2 = close_recv.clone(); - tokio::try_join!( - async move { - tokio::select!( - r = conn2.recv_loop(read) => r, - _ = await_exit(close_recv) => Ok(()), - ) - }, - async move { - tokio::select!( - r = conn3.send_loop(resp_recv, write) => r, - _ = await_exit(close_recv2) => Ok(()), - ) - }, - ) - .map(|_| ()) - .log_err("ServerConn recv_loop/send_loop"); + let recv_future = tokio::spawn(async move { + select! { + r = conn2.recv_loop(read) => r, + _ = await_exit(must_exit) => Ok(()) + } + }); + let send_future = tokio::spawn(conn.clone().send_loop(resp_recv, write)); + + recv_future.await.log_err("ServerConn recv_loop"); + conn.resp_send.store(None); + send_future.await.log_err("ServerConn send_loop"); netapp.disconnected_as_server(&peer_id, conn); Ok(()) } - pub fn close(&self) { - self.close_send.send(true).unwrap(); - } - async fn recv_handler_aux(self: &Arc<Self>, bytes: &[u8]) -> Result<Vec<u8>, Error> { if bytes.len() < 2 { return Err(Error::Message("Invalid protocol message".into())); @@ -146,33 +137,33 @@ impl SendLoop for ServerConn {} #[async_trait] impl RecvLoop for ServerConn { - async fn recv_handler(self: Arc<Self>, id: RequestID, bytes: Vec<u8>) { - trace!("ServerConn recv_handler {} ({} bytes)", id, bytes.len()); - let bytes: Bytes = bytes.into(); - - let prio = if !bytes.is_empty() { - bytes[0] - } else { - 0u8 - }; - let resp = self.recv_handler_aux(&bytes[..]).await; - - let mut resp_bytes = vec![]; - match resp { - Ok(rb) => { - resp_bytes.push(0u8); - resp_bytes.extend(&rb[..]); - } - Err(e) => { - resp_bytes.push(e.code()); + fn recv_handler(self: &Arc<Self>, id: RequestID, bytes: Vec<u8>) { + let resp_send = self.resp_send.load_full().unwrap(); + + let self2 = self.clone(); + tokio::spawn(async move { + trace!("ServerConn recv_handler {} ({} bytes)", id, bytes.len()); + let bytes: Bytes = bytes.into(); + + let prio = if !bytes.is_empty() { bytes[0] } else { 0u8 }; + let resp = self2.recv_handler_aux(&bytes[..]).await; + + let mut resp_bytes = vec![]; + match resp { + Ok(rb) => { + resp_bytes.push(0u8); + resp_bytes.extend(&rb[..]); + } + Err(e) => { + resp_bytes.push(e.code()); + } } - } - trace!("ServerConn sending response to {}: ", id); + trace!("ServerConn sending response to {}: ", id); - self.resp_send - .send(Some((id, prio, resp_bytes))) - .log_err("ServerConn recv_handler send resp"); + resp_send + .send((id, prio, resp_bytes)) + .log_err("ServerConn recv_handler send resp"); + }); } } - diff --git a/src/util.rs b/src/util.rs index ba485bf..e5b57ec 100644 --- a/src/util.rs +++ b/src/util.rs @@ -1,5 +1,7 @@ use serde::Serialize; +use log::info; + use tokio::sync::watch; pub type NodeID = sodiumoxide::crypto::sign::ed25519::PublicKey; @@ -38,3 +40,15 @@ pub async fn await_exit(mut must_exit: watch::Receiver<bool>) { } } } + +pub fn watch_ctrl_c() -> watch::Receiver<bool> { + let (send_cancel, watch_cancel) = watch::channel(false); + tokio::spawn(async move { + tokio::signal::ctrl_c() + .await + .expect("failed to install CTRL+C signal handler"); + info!("Received CTRL+C, shutting down."); + send_cancel.send(true).unwrap(); + }); + watch_cancel +} |