diff options
Diffstat (limited to 'src/client.rs')
-rw-r--r-- | src/client.rs | 78 |
1 files changed, 39 insertions, 39 deletions
diff --git a/src/client.rs b/src/client.rs index 127ff46..773fa9d 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,11 +1,13 @@ use std::collections::HashMap; use std::net::SocketAddr; -use std::sync::atomic::{self, AtomicBool, AtomicU32}; +use std::sync::atomic::{self, AtomicU32}; use std::sync::{Arc, Mutex}; +use arc_swap::ArcSwapOption; use log::{debug, error, trace}; use tokio::net::TcpStream; +use tokio::select; use tokio::sync::{mpsc, oneshot, watch}; use tokio_util::compat::*; @@ -21,17 +23,14 @@ use crate::netapp::*; use crate::proto::*; use crate::util::*; - pub(crate) struct ClientConn { pub(crate) remote_addr: SocketAddr, pub(crate) peer_id: NodeID, - query_send: mpsc::UnboundedSender<Option<(RequestID, RequestPriority, Vec<u8>)>>, + query_send: ArcSwapOption<mpsc::UnboundedSender<(RequestID, RequestPriority, Vec<u8>)>>, next_query_number: AtomicU32, inflight: Mutex<HashMap<RequestID, oneshot::Sender<Vec<u8>>>>, - must_exit: AtomicBool, - stop_recv_loop: watch::Sender<bool>, } impl ClientConn { @@ -71,25 +70,35 @@ impl ClientConn { remote_addr, peer_id, next_query_number: AtomicU32::from(RequestID::default()), - query_send, + query_send: ArcSwapOption::new(Some(Arc::new(query_send))), inflight: Mutex::new(HashMap::new()), - must_exit: AtomicBool::new(false), - stop_recv_loop, }); netapp.connected_as_client(peer_id, conn.clone()); tokio::spawn(async move { + let send_future = tokio::spawn(conn.clone().send_loop(query_recv, write)); + let conn2 = conn.clone(); - let conn3 = conn.clone(); - tokio::try_join!(conn2.send_loop(query_recv, write), async move { - tokio::select!( - r = conn3.recv_loop(read) => r, - _ = await_exit(stop_recv_loop_recv) => Ok(()), - ) - }) - .map(|_| ()) - .log_err("ClientConn send_loop/recv_loop/dispatch_loop"); + let recv_future = tokio::spawn(async move { + select! { + r = conn2.recv_loop(read) => r, + _ = await_exit(stop_recv_loop_recv) => Ok(()) + } + }); + + send_future.await.log_err("ClientConn send_loop"); + + // TODO here: wait for inflight requests to all have their response + stop_recv_loop + .send(true) + .log_err("ClientConn send true to stop_recv_loop"); + + recv_future.await.log_err("ClientConn recv_loop"); + + // Make sure we don't wait on any more requests that won't + // have a response + conn.inflight.lock().unwrap().clear(); netapp.disconnected_as_client(&peer_id, conn); }); @@ -98,15 +107,7 @@ impl ClientConn { } pub fn close(&self) { - self.must_exit.store(true, atomic::Ordering::SeqCst); - self.query_send - .send(None) - .log_err("could not write None in query_send"); - if self.inflight.lock().unwrap().is_empty() { - self.stop_recv_loop - .send(true) - .log_err("could not write true to stop_recv_loop"); - } + self.query_send.store(None); } pub(crate) async fn call<T>( @@ -118,6 +119,8 @@ impl ClientConn { where T: Message, { + let query_send = self.query_send.load_full().ok_or(Error::ConnectionClosed)?; + let id = self .next_query_number .fetch_add(1, atomic::Ordering::Relaxed); @@ -138,20 +141,23 @@ impl ClientConn { } trace!("request: query_send {}, {} bytes", id, bytes.len()); - self.query_send.send(Some((id, prio, bytes)))?; + query_send.send((id, prio, bytes))?; let resp = resp_recv.await?; - if resp.len() == 0 { - return Err(Error::Message("Response is 0 bytes, either a collision or a protocol error".into())); + if resp.is_empty() { + return Err(Error::Message( + "Response is 0 bytes, either a collision or a protocol error".into(), + )); } trace!("request response {}: ", id); let code = resp[0]; if code == 0 { - Ok(rmp_serde::decode::from_read_ref::<_, <T as Message>::Response>( - &resp[1..], - )?) + Ok(rmp_serde::decode::from_read_ref::< + _, + <T as Message>::Response, + >(&resp[1..])?) } else { Err(Error::Remote(format!("Remote error code {}", code))) } @@ -162,7 +168,7 @@ impl SendLoop for ClientConn {} #[async_trait] impl RecvLoop for ClientConn { - async fn recv_handler(self: Arc<Self>, id: RequestID, msg: Vec<u8>) { + fn recv_handler(self: &Arc<Self>, id: RequestID, msg: Vec<u8>) { trace!("ClientConn recv_handler {} ({} bytes)", id, msg.len()); let mut inflight = self.inflight.lock().unwrap(); @@ -171,11 +177,5 @@ impl RecvLoop for ClientConn { debug!("Could not send request response, probably because request was interrupted. Dropping response."); } } - - if inflight.is_empty() && self.must_exit.load(atomic::Ordering::SeqCst) { - self.stop_recv_loop - .send(true) - .log_err("could not write true to stop_recv_loop"); - } } } |