diff options
Diffstat (limited to 'src/conn.rs')
-rw-r--r-- | src/conn.rs | 319 |
1 files changed, 0 insertions, 319 deletions
diff --git a/src/conn.rs b/src/conn.rs deleted file mode 100644 index 64318dc..0000000 --- a/src/conn.rs +++ /dev/null @@ -1,319 +0,0 @@ -use std::collections::HashMap; -use std::net::SocketAddr; -use std::sync::atomic::{self, AtomicBool, AtomicU32}; -use std::sync::{Arc, Mutex}; - -use bytes::Bytes; -use log::{debug, error, trace}; - -use tokio::net::TcpStream; -use tokio::sync::{mpsc, oneshot, watch}; -use tokio_util::compat::*; - -use futures::io::AsyncReadExt; - -use async_trait::async_trait; - -use kuska_handshake::async_std::{handshake_client, handshake_server, BoxStream}; - -use crate::endpoint::*; -use crate::error::*; -use crate::netapp::*; -use crate::proto::*; -use crate::util::*; - -// Request message format (client -> server): -// - u8 priority -// - u8 path length -// - [u8; path length] path -// - [u8; *] data - -// Response message format (server -> client): -// - u8 response code -// - [u8; *] response - -pub(crate) struct ServerConn { - pub(crate) remote_addr: SocketAddr, - pub(crate) peer_id: NodeID, - - netapp: Arc<NetApp>, - - resp_send: mpsc::UnboundedSender<Option<(RequestID, RequestPriority, Vec<u8>)>>, - close_send: watch::Sender<bool>, -} - -impl ServerConn { - pub(crate) async fn run(netapp: Arc<NetApp>, socket: TcpStream) -> Result<(), Error> { - let remote_addr = socket.peer_addr()?; - let mut socket = socket.compat(); - - let handshake = handshake_server( - &mut socket, - netapp.netid.clone(), - netapp.id, - netapp.privkey.clone(), - ) - .await?; - let peer_id = handshake.peer_pk; - - debug!( - "Handshake complete (server) with {}@{}", - hex::encode(&peer_id), - remote_addr - ); - - let (read, write) = socket.split(); - - let (read, write) = - BoxStream::from_handshake(read, write, handshake, 0x8000).split_read_write(); - - let (resp_send, resp_recv) = mpsc::unbounded_channel(); - - let (close_send, close_recv) = watch::channel(false); - - let conn = Arc::new(ServerConn { - netapp: netapp.clone(), - remote_addr, - peer_id, - resp_send, - close_send, - }); - - netapp.connected_as_server(peer_id, conn.clone()); - - let conn2 = conn.clone(); - let conn3 = conn.clone(); - let close_recv2 = close_recv.clone(); - tokio::try_join!( - async move { - tokio::select!( - r = conn2.recv_loop(read) => r, - _ = await_exit(close_recv) => Ok(()), - ) - }, - async move { - tokio::select!( - r = conn3.send_loop(resp_recv, write) => r, - _ = await_exit(close_recv2) => Ok(()), - ) - }, - ) - .map(|_| ()) - .log_err("ServerConn recv_loop/send_loop"); - - netapp.disconnected_as_server(&peer_id, conn); - - Ok(()) - } - - pub fn close(&self) { - self.close_send.send(true).unwrap(); - } - - async fn recv_handler_aux(self: &Arc<Self>, bytes: &[u8]) -> Result<Vec<u8>, Error> { - if bytes.len() < 2 { - return Err(Error::Message("Invalid protocol message".into())); - } - - // byte 0 is the request priority, we don't care here - let path_length = bytes[1] as usize; - if bytes.len() < 2 + path_length { - return Err(Error::Message("Invalid protocol message".into())); - } - - let path = &bytes[2..2 + path_length]; - let path = String::from_utf8(path.to_vec())?; - let data = &bytes[2 + path_length..]; - - let handler_opt = { - let endpoints = self.netapp.endpoints.read().unwrap(); - endpoints.get(&path).map(|e| e.clone_endpoint()) - }; - - if let Some(handler) = handler_opt { - handler.handle(data, self.peer_id).await - } else { - Err(Error::NoHandler) - } - } -} - -impl SendLoop for ServerConn {} - -#[async_trait] -impl RecvLoop for ServerConn { - async fn recv_handler(self: Arc<Self>, id: RequestID, bytes: Vec<u8>) { - trace!("ServerConn recv_handler {} ({} bytes)", id, bytes.len()); - let bytes: Bytes = bytes.into(); - - let resp = self.recv_handler_aux(&bytes[..]).await; - let prio = bytes[0]; - - let mut resp_bytes = vec![]; - match resp { - Ok(rb) => { - resp_bytes.push(0u8); - resp_bytes.extend(&rb[..]); - } - Err(e) => { - resp_bytes.push(e.code()); - } - } - - self.resp_send - .send(Some((id, prio, resp_bytes))) - .log_err("ServerConn recv_handler send resp"); - } -} -pub(crate) struct ClientConn { - pub(crate) remote_addr: SocketAddr, - pub(crate) peer_id: NodeID, - - query_send: mpsc::UnboundedSender<Option<(RequestID, RequestPriority, Vec<u8>)>>, - - next_query_number: AtomicU32, - inflight: Mutex<HashMap<RequestID, oneshot::Sender<Vec<u8>>>>, - must_exit: AtomicBool, - stop_recv_loop: watch::Sender<bool>, -} - -impl ClientConn { - pub(crate) async fn init( - netapp: Arc<NetApp>, - socket: TcpStream, - peer_id: NodeID, - ) -> Result<(), Error> { - let remote_addr = socket.peer_addr()?; - let mut socket = socket.compat(); - - let handshake = handshake_client( - &mut socket, - netapp.netid.clone(), - netapp.id, - netapp.privkey.clone(), - peer_id, - ) - .await?; - - debug!( - "Handshake complete (client) with {}@{}", - hex::encode(&peer_id), - remote_addr - ); - - let (read, write) = socket.split(); - - let (read, write) = - BoxStream::from_handshake(read, write, handshake, 0x8000).split_read_write(); - - let (query_send, query_recv) = mpsc::unbounded_channel(); - - let (stop_recv_loop, stop_recv_loop_recv) = watch::channel(false); - - let conn = Arc::new(ClientConn { - remote_addr, - peer_id, - next_query_number: AtomicU32::from(RequestID::default()), - query_send, - inflight: Mutex::new(HashMap::new()), - must_exit: AtomicBool::new(false), - stop_recv_loop, - }); - - netapp.connected_as_client(peer_id, conn.clone()); - - tokio::spawn(async move { - let conn2 = conn.clone(); - let conn3 = conn.clone(); - tokio::try_join!(conn2.send_loop(query_recv, write), async move { - tokio::select!( - r = conn3.recv_loop(read) => r, - _ = await_exit(stop_recv_loop_recv) => Ok(()), - ) - }) - .map(|_| ()) - .log_err("ClientConn send_loop/recv_loop/dispatch_loop"); - - netapp.disconnected_as_client(&peer_id, conn); - }); - - Ok(()) - } - - pub fn close(&self) { - self.must_exit.store(true, atomic::Ordering::SeqCst); - self.query_send - .send(None) - .log_err("could not write None in query_send"); - if self.inflight.lock().unwrap().is_empty() { - self.stop_recv_loop - .send(true) - .log_err("could not write true to stop_recv_loop"); - } - } - - pub(crate) async fn call<T>( - self: Arc<Self>, - rq: T, - path: &str, - prio: RequestPriority, - ) -> Result<<T as Message>::Response, Error> - where - T: Message, - { - let id = self - .next_query_number - .fetch_add(1, atomic::Ordering::Relaxed); - let mut bytes = vec![prio, path.as_bytes().len() as u8]; - bytes.extend_from_slice(path.as_bytes()); - bytes.extend_from_slice(&rmp_to_vec_all_named(&rq)?[..]); - - let (resp_send, resp_recv) = oneshot::channel(); - let old = self.inflight.lock().unwrap().insert(id, resp_send); - if let Some(old_ch) = old { - error!( - "Too many inflight requests! RequestID collision. Interrupting previous request." - ); - if old_ch.send(vec![]).is_err() { - debug!("Could not send empty response to collisionned request, probably because request was interrupted. Dropping response."); - } - } - - trace!("request: query_send {}, {} bytes", id, bytes.len()); - self.query_send.send(Some((id, prio, bytes)))?; - - let resp = resp_recv.await?; - - let code = resp[0]; - if code == 0 { - rmp_serde::decode::from_read_ref::<_, Result<<T as Message>::Response, String>>( - &resp[1..], - )? - .map_err(Error::Remote) - } else { - Err(Error::Remote(format!("Remote error code {}", code))) - } - } -} - -impl SendLoop for ClientConn {} - -#[async_trait] -impl RecvLoop for ClientConn { - async fn recv_handler(self: Arc<Self>, id: RequestID, msg: Vec<u8>) { - trace!("ClientConn recv_handler {} ({} bytes)", id, msg.len()); - - let mut inflight = self.inflight.lock().unwrap(); - if let Some(ch) = inflight.remove(&id) { - if ch.send(msg).is_err() { - debug!("Could not send request response, probably because request was interrupted. Dropping response."); - } - } - - if inflight.is_empty() && self.must_exit.load(atomic::Ordering::SeqCst) { - self.stop_recv_loop - .send(true) - .log_err("could not write true to stop_recv_loop"); - } - } -} |