diff options
author | Alex Auvolat <alex@adnab.me> | 2021-10-12 18:13:07 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2021-10-12 18:13:07 +0200 |
commit | d9bd1182f7b980df8e631ae8eeca444f5d997909 (patch) | |
tree | 9345b078b2225c2fd58074ef3016ea6524d0ab3f /src/client.rs | |
parent | f87dbe73dc12f2d6eb13850a3bc4b012aadd3c9b (diff) | |
download | netapp-d9bd1182f7b980df8e631ae8eeca444f5d997909.tar.gz netapp-d9bd1182f7b980df8e631ae8eeca444f5d997909.zip |
Move out things from conn.rs into two separate files
Diffstat (limited to 'src/client.rs')
-rw-r--r-- | src/client.rs | 180 |
1 files changed, 180 insertions, 0 deletions
diff --git a/src/client.rs b/src/client.rs new file mode 100644 index 0000000..a436d53 --- /dev/null +++ b/src/client.rs @@ -0,0 +1,180 @@ +use std::collections::HashMap; +use std::net::SocketAddr; +use std::sync::atomic::{self, AtomicBool, AtomicU32}; +use std::sync::{Arc, Mutex}; + +use log::{debug, error, trace}; + +use tokio::net::TcpStream; +use tokio::sync::{mpsc, oneshot, watch}; +use tokio_util::compat::*; + +use futures::io::AsyncReadExt; + +use async_trait::async_trait; + +use kuska_handshake::async_std::{handshake_client, BoxStream}; + +use crate::endpoint::*; +use crate::error::*; +use crate::netapp::*; +use crate::proto::*; +use crate::util::*; + + +pub(crate) struct ClientConn { + pub(crate) remote_addr: SocketAddr, + pub(crate) peer_id: NodeID, + + query_send: mpsc::UnboundedSender<Option<(RequestID, RequestPriority, Vec<u8>)>>, + + next_query_number: AtomicU32, + inflight: Mutex<HashMap<RequestID, oneshot::Sender<Vec<u8>>>>, + must_exit: AtomicBool, + stop_recv_loop: watch::Sender<bool>, +} + +impl ClientConn { + pub(crate) async fn init( + netapp: Arc<NetApp>, + socket: TcpStream, + peer_id: NodeID, + ) -> Result<(), Error> { + let remote_addr = socket.peer_addr()?; + let mut socket = socket.compat(); + + let handshake = handshake_client( + &mut socket, + netapp.netid.clone(), + netapp.id, + netapp.privkey.clone(), + peer_id, + ) + .await?; + + debug!( + "Handshake complete (client) with {}@{}", + hex::encode(&peer_id), + remote_addr + ); + + let (read, write) = socket.split(); + + let (read, write) = + BoxStream::from_handshake(read, write, handshake, 0x8000).split_read_write(); + + let (query_send, query_recv) = mpsc::unbounded_channel(); + + let (stop_recv_loop, stop_recv_loop_recv) = watch::channel(false); + + let conn = Arc::new(ClientConn { + remote_addr, + peer_id, + next_query_number: AtomicU32::from(RequestID::default()), + query_send, + inflight: Mutex::new(HashMap::new()), + must_exit: AtomicBool::new(false), + stop_recv_loop, + }); + + netapp.connected_as_client(peer_id, conn.clone()); + + tokio::spawn(async move { + let conn2 = conn.clone(); + let conn3 = conn.clone(); + tokio::try_join!(conn2.send_loop(query_recv, write), async move { + tokio::select!( + r = conn3.recv_loop(read) => r, + _ = await_exit(stop_recv_loop_recv) => Ok(()), + ) + }) + .map(|_| ()) + .log_err("ClientConn send_loop/recv_loop/dispatch_loop"); + + netapp.disconnected_as_client(&peer_id, conn); + }); + + Ok(()) + } + + pub fn close(&self) { + self.must_exit.store(true, atomic::Ordering::SeqCst); + self.query_send + .send(None) + .log_err("could not write None in query_send"); + if self.inflight.lock().unwrap().is_empty() { + self.stop_recv_loop + .send(true) + .log_err("could not write true to stop_recv_loop"); + } + } + + pub(crate) async fn call<T>( + self: Arc<Self>, + rq: T, + path: &str, + prio: RequestPriority, + ) -> Result<<T as Message>::Response, Error> + where + T: Message, + { + let id = self + .next_query_number + .fetch_add(1, atomic::Ordering::Relaxed); + + let mut bytes = vec![prio, path.as_bytes().len() as u8]; + bytes.extend_from_slice(path.as_bytes()); + bytes.extend_from_slice(&rmp_to_vec_all_named(&rq)?[..]); + + let (resp_send, resp_recv) = oneshot::channel(); + let old = self.inflight.lock().unwrap().insert(id, resp_send); + if let Some(old_ch) = old { + error!( + "Too many inflight requests! RequestID collision. Interrupting previous request." + ); + if old_ch.send(vec![]).is_err() { + debug!("Could not send empty response to collisionned request, probably because request was interrupted. Dropping response."); + } + } + + trace!("request: query_send {}, {} bytes", id, bytes.len()); + self.query_send.send(Some((id, prio, bytes)))?; + + let resp = resp_recv.await?; + if resp.len() == 0 { + return Err(Error::Message("Response is 0 bytes, either a collision or a protocol error".into())); + } + + let code = resp[0]; + if code == 0 { + rmp_serde::decode::from_read_ref::<_, Result<<T as Message>::Response, String>>( + &resp[1..], + )? + .map_err(Error::Remote) + } else { + Err(Error::Remote(format!("Remote error code {}", code))) + } + } +} + +impl SendLoop for ClientConn {} + +#[async_trait] +impl RecvLoop for ClientConn { + async fn recv_handler(self: Arc<Self>, id: RequestID, msg: Vec<u8>) { + trace!("ClientConn recv_handler {} ({} bytes)", id, msg.len()); + + let mut inflight = self.inflight.lock().unwrap(); + if let Some(ch) = inflight.remove(&id) { + if ch.send(msg).is_err() { + debug!("Could not send request response, probably because request was interrupted. Dropping response."); + } + } + + if inflight.is_empty() && self.must_exit.load(atomic::Ordering::SeqCst) { + self.stop_recv_loop + .send(true) + .log_err("could not write true to stop_recv_loop"); + } + } +} |