From d9bd1182f7b980df8e631ae8eeca444f5d997909 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 12 Oct 2021 18:13:07 +0200 Subject: Move out things from conn.rs into two separate files --- src/client.rs | 180 +++++++++++++++++++++++++++++++++ src/conn.rs | 319 ---------------------------------------------------------- src/lib.rs | 3 +- src/netapp.rs | 3 +- src/proto.rs | 14 +-- src/server.rs | 171 +++++++++++++++++++++++++++++++ 6 files changed, 363 insertions(+), 327 deletions(-) create mode 100644 src/client.rs delete mode 100644 src/conn.rs create mode 100644 src/server.rs (limited to 'src') diff --git a/src/client.rs b/src/client.rs new file mode 100644 index 0000000..a436d53 --- /dev/null +++ b/src/client.rs @@ -0,0 +1,180 @@ +use std::collections::HashMap; +use std::net::SocketAddr; +use std::sync::atomic::{self, AtomicBool, AtomicU32}; +use std::sync::{Arc, Mutex}; + +use log::{debug, error, trace}; + +use tokio::net::TcpStream; +use tokio::sync::{mpsc, oneshot, watch}; +use tokio_util::compat::*; + +use futures::io::AsyncReadExt; + +use async_trait::async_trait; + +use kuska_handshake::async_std::{handshake_client, BoxStream}; + +use crate::endpoint::*; +use crate::error::*; +use crate::netapp::*; +use crate::proto::*; +use crate::util::*; + + +pub(crate) struct ClientConn { + pub(crate) remote_addr: SocketAddr, + pub(crate) peer_id: NodeID, + + query_send: mpsc::UnboundedSender)>>, + + next_query_number: AtomicU32, + inflight: Mutex>>>, + must_exit: AtomicBool, + stop_recv_loop: watch::Sender, +} + +impl ClientConn { + pub(crate) async fn init( + netapp: Arc, + socket: TcpStream, + peer_id: NodeID, + ) -> Result<(), Error> { + let remote_addr = socket.peer_addr()?; + let mut socket = socket.compat(); + + let handshake = handshake_client( + &mut socket, + netapp.netid.clone(), + netapp.id, + netapp.privkey.clone(), + peer_id, + ) + .await?; + + debug!( + "Handshake complete (client) with {}@{}", + hex::encode(&peer_id), + remote_addr + ); + + let (read, write) = socket.split(); + + let (read, write) = + BoxStream::from_handshake(read, write, handshake, 0x8000).split_read_write(); + + let (query_send, query_recv) = mpsc::unbounded_channel(); + + let (stop_recv_loop, stop_recv_loop_recv) = watch::channel(false); + + let conn = Arc::new(ClientConn { + remote_addr, + peer_id, + next_query_number: AtomicU32::from(RequestID::default()), + query_send, + inflight: Mutex::new(HashMap::new()), + must_exit: AtomicBool::new(false), + stop_recv_loop, + }); + + netapp.connected_as_client(peer_id, conn.clone()); + + tokio::spawn(async move { + let conn2 = conn.clone(); + let conn3 = conn.clone(); + tokio::try_join!(conn2.send_loop(query_recv, write), async move { + tokio::select!( + r = conn3.recv_loop(read) => r, + _ = await_exit(stop_recv_loop_recv) => Ok(()), + ) + }) + .map(|_| ()) + .log_err("ClientConn send_loop/recv_loop/dispatch_loop"); + + netapp.disconnected_as_client(&peer_id, conn); + }); + + Ok(()) + } + + pub fn close(&self) { + self.must_exit.store(true, atomic::Ordering::SeqCst); + self.query_send + .send(None) + .log_err("could not write None in query_send"); + if self.inflight.lock().unwrap().is_empty() { + self.stop_recv_loop + .send(true) + .log_err("could not write true to stop_recv_loop"); + } + } + + pub(crate) async fn call( + self: Arc, + rq: T, + path: &str, + prio: RequestPriority, + ) -> Result<::Response, Error> + where + T: Message, + { + let id = self + .next_query_number + .fetch_add(1, atomic::Ordering::Relaxed); + + let mut bytes = vec![prio, path.as_bytes().len() as u8]; + bytes.extend_from_slice(path.as_bytes()); + bytes.extend_from_slice(&rmp_to_vec_all_named(&rq)?[..]); + + let (resp_send, resp_recv) = oneshot::channel(); + let old = self.inflight.lock().unwrap().insert(id, resp_send); + if let Some(old_ch) = old { + error!( + "Too many inflight requests! RequestID collision. Interrupting previous request." + ); + if old_ch.send(vec![]).is_err() { + debug!("Could not send empty response to collisionned request, probably because request was interrupted. Dropping response."); + } + } + + trace!("request: query_send {}, {} bytes", id, bytes.len()); + self.query_send.send(Some((id, prio, bytes)))?; + + let resp = resp_recv.await?; + if resp.len() == 0 { + return Err(Error::Message("Response is 0 bytes, either a collision or a protocol error".into())); + } + + let code = resp[0]; + if code == 0 { + rmp_serde::decode::from_read_ref::<_, Result<::Response, String>>( + &resp[1..], + )? + .map_err(Error::Remote) + } else { + Err(Error::Remote(format!("Remote error code {}", code))) + } + } +} + +impl SendLoop for ClientConn {} + +#[async_trait] +impl RecvLoop for ClientConn { + async fn recv_handler(self: Arc, id: RequestID, msg: Vec) { + trace!("ClientConn recv_handler {} ({} bytes)", id, msg.len()); + + let mut inflight = self.inflight.lock().unwrap(); + if let Some(ch) = inflight.remove(&id) { + if ch.send(msg).is_err() { + debug!("Could not send request response, probably because request was interrupted. Dropping response."); + } + } + + if inflight.is_empty() && self.must_exit.load(atomic::Ordering::SeqCst) { + self.stop_recv_loop + .send(true) + .log_err("could not write true to stop_recv_loop"); + } + } +} diff --git a/src/conn.rs b/src/conn.rs deleted file mode 100644 index 64318dc..0000000 --- a/src/conn.rs +++ /dev/null @@ -1,319 +0,0 @@ -use std::collections::HashMap; -use std::net::SocketAddr; -use std::sync::atomic::{self, AtomicBool, AtomicU32}; -use std::sync::{Arc, Mutex}; - -use bytes::Bytes; -use log::{debug, error, trace}; - -use tokio::net::TcpStream; -use tokio::sync::{mpsc, oneshot, watch}; -use tokio_util::compat::*; - -use futures::io::AsyncReadExt; - -use async_trait::async_trait; - -use kuska_handshake::async_std::{handshake_client, handshake_server, BoxStream}; - -use crate::endpoint::*; -use crate::error::*; -use crate::netapp::*; -use crate::proto::*; -use crate::util::*; - -// Request message format (client -> server): -// - u8 priority -// - u8 path length -// - [u8; path length] path -// - [u8; *] data - -// Response message format (server -> client): -// - u8 response code -// - [u8; *] response - -pub(crate) struct ServerConn { - pub(crate) remote_addr: SocketAddr, - pub(crate) peer_id: NodeID, - - netapp: Arc, - - resp_send: mpsc::UnboundedSender)>>, - close_send: watch::Sender, -} - -impl ServerConn { - pub(crate) async fn run(netapp: Arc, socket: TcpStream) -> Result<(), Error> { - let remote_addr = socket.peer_addr()?; - let mut socket = socket.compat(); - - let handshake = handshake_server( - &mut socket, - netapp.netid.clone(), - netapp.id, - netapp.privkey.clone(), - ) - .await?; - let peer_id = handshake.peer_pk; - - debug!( - "Handshake complete (server) with {}@{}", - hex::encode(&peer_id), - remote_addr - ); - - let (read, write) = socket.split(); - - let (read, write) = - BoxStream::from_handshake(read, write, handshake, 0x8000).split_read_write(); - - let (resp_send, resp_recv) = mpsc::unbounded_channel(); - - let (close_send, close_recv) = watch::channel(false); - - let conn = Arc::new(ServerConn { - netapp: netapp.clone(), - remote_addr, - peer_id, - resp_send, - close_send, - }); - - netapp.connected_as_server(peer_id, conn.clone()); - - let conn2 = conn.clone(); - let conn3 = conn.clone(); - let close_recv2 = close_recv.clone(); - tokio::try_join!( - async move { - tokio::select!( - r = conn2.recv_loop(read) => r, - _ = await_exit(close_recv) => Ok(()), - ) - }, - async move { - tokio::select!( - r = conn3.send_loop(resp_recv, write) => r, - _ = await_exit(close_recv2) => Ok(()), - ) - }, - ) - .map(|_| ()) - .log_err("ServerConn recv_loop/send_loop"); - - netapp.disconnected_as_server(&peer_id, conn); - - Ok(()) - } - - pub fn close(&self) { - self.close_send.send(true).unwrap(); - } - - async fn recv_handler_aux(self: &Arc, bytes: &[u8]) -> Result, Error> { - if bytes.len() < 2 { - return Err(Error::Message("Invalid protocol message".into())); - } - - // byte 0 is the request priority, we don't care here - let path_length = bytes[1] as usize; - if bytes.len() < 2 + path_length { - return Err(Error::Message("Invalid protocol message".into())); - } - - let path = &bytes[2..2 + path_length]; - let path = String::from_utf8(path.to_vec())?; - let data = &bytes[2 + path_length..]; - - let handler_opt = { - let endpoints = self.netapp.endpoints.read().unwrap(); - endpoints.get(&path).map(|e| e.clone_endpoint()) - }; - - if let Some(handler) = handler_opt { - handler.handle(data, self.peer_id).await - } else { - Err(Error::NoHandler) - } - } -} - -impl SendLoop for ServerConn {} - -#[async_trait] -impl RecvLoop for ServerConn { - async fn recv_handler(self: Arc, id: RequestID, bytes: Vec) { - trace!("ServerConn recv_handler {} ({} bytes)", id, bytes.len()); - let bytes: Bytes = bytes.into(); - - let resp = self.recv_handler_aux(&bytes[..]).await; - let prio = bytes[0]; - - let mut resp_bytes = vec![]; - match resp { - Ok(rb) => { - resp_bytes.push(0u8); - resp_bytes.extend(&rb[..]); - } - Err(e) => { - resp_bytes.push(e.code()); - } - } - - self.resp_send - .send(Some((id, prio, resp_bytes))) - .log_err("ServerConn recv_handler send resp"); - } -} -pub(crate) struct ClientConn { - pub(crate) remote_addr: SocketAddr, - pub(crate) peer_id: NodeID, - - query_send: mpsc::UnboundedSender)>>, - - next_query_number: AtomicU32, - inflight: Mutex>>>, - must_exit: AtomicBool, - stop_recv_loop: watch::Sender, -} - -impl ClientConn { - pub(crate) async fn init( - netapp: Arc, - socket: TcpStream, - peer_id: NodeID, - ) -> Result<(), Error> { - let remote_addr = socket.peer_addr()?; - let mut socket = socket.compat(); - - let handshake = handshake_client( - &mut socket, - netapp.netid.clone(), - netapp.id, - netapp.privkey.clone(), - peer_id, - ) - .await?; - - debug!( - "Handshake complete (client) with {}@{}", - hex::encode(&peer_id), - remote_addr - ); - - let (read, write) = socket.split(); - - let (read, write) = - BoxStream::from_handshake(read, write, handshake, 0x8000).split_read_write(); - - let (query_send, query_recv) = mpsc::unbounded_channel(); - - let (stop_recv_loop, stop_recv_loop_recv) = watch::channel(false); - - let conn = Arc::new(ClientConn { - remote_addr, - peer_id, - next_query_number: AtomicU32::from(RequestID::default()), - query_send, - inflight: Mutex::new(HashMap::new()), - must_exit: AtomicBool::new(false), - stop_recv_loop, - }); - - netapp.connected_as_client(peer_id, conn.clone()); - - tokio::spawn(async move { - let conn2 = conn.clone(); - let conn3 = conn.clone(); - tokio::try_join!(conn2.send_loop(query_recv, write), async move { - tokio::select!( - r = conn3.recv_loop(read) => r, - _ = await_exit(stop_recv_loop_recv) => Ok(()), - ) - }) - .map(|_| ()) - .log_err("ClientConn send_loop/recv_loop/dispatch_loop"); - - netapp.disconnected_as_client(&peer_id, conn); - }); - - Ok(()) - } - - pub fn close(&self) { - self.must_exit.store(true, atomic::Ordering::SeqCst); - self.query_send - .send(None) - .log_err("could not write None in query_send"); - if self.inflight.lock().unwrap().is_empty() { - self.stop_recv_loop - .send(true) - .log_err("could not write true to stop_recv_loop"); - } - } - - pub(crate) async fn call( - self: Arc, - rq: T, - path: &str, - prio: RequestPriority, - ) -> Result<::Response, Error> - where - T: Message, - { - let id = self - .next_query_number - .fetch_add(1, atomic::Ordering::Relaxed); - let mut bytes = vec![prio, path.as_bytes().len() as u8]; - bytes.extend_from_slice(path.as_bytes()); - bytes.extend_from_slice(&rmp_to_vec_all_named(&rq)?[..]); - - let (resp_send, resp_recv) = oneshot::channel(); - let old = self.inflight.lock().unwrap().insert(id, resp_send); - if let Some(old_ch) = old { - error!( - "Too many inflight requests! RequestID collision. Interrupting previous request." - ); - if old_ch.send(vec![]).is_err() { - debug!("Could not send empty response to collisionned request, probably because request was interrupted. Dropping response."); - } - } - - trace!("request: query_send {}, {} bytes", id, bytes.len()); - self.query_send.send(Some((id, prio, bytes)))?; - - let resp = resp_recv.await?; - - let code = resp[0]; - if code == 0 { - rmp_serde::decode::from_read_ref::<_, Result<::Response, String>>( - &resp[1..], - )? - .map_err(Error::Remote) - } else { - Err(Error::Remote(format!("Remote error code {}", code))) - } - } -} - -impl SendLoop for ClientConn {} - -#[async_trait] -impl RecvLoop for ClientConn { - async fn recv_handler(self: Arc, id: RequestID, msg: Vec) { - trace!("ClientConn recv_handler {} ({} bytes)", id, msg.len()); - - let mut inflight = self.inflight.lock().unwrap(); - if let Some(ch) = inflight.remove(&id) { - if ch.send(msg).is_err() { - debug!("Could not send request response, probably because request was interrupted. Dropping response."); - } - } - - if inflight.is_empty() && self.must_exit.load(atomic::Ordering::SeqCst) { - self.stop_recv_loop - .send(true) - .log_err("could not write true to stop_recv_loop"); - } - } -} diff --git a/src/lib.rs b/src/lib.rs index e5251c5..f24b7ac 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -21,7 +21,8 @@ pub mod util; pub mod endpoint; pub mod proto; -mod conn; +mod server; +mod client; pub mod netapp; pub mod peering; diff --git a/src/netapp.rs b/src/netapp.rs index 8415c58..afdd3c9 100644 --- a/src/netapp.rs +++ b/src/netapp.rs @@ -12,7 +12,8 @@ use sodiumoxide::crypto::auth; use sodiumoxide::crypto::sign::ed25519; use tokio::net::{TcpListener, TcpStream}; -use crate::conn::*; +use crate::client::*; +use crate::server::*; use crate::endpoint::*; use crate::error::*; use crate::proto::*; diff --git a/src/proto.rs b/src/proto.rs index 5b71ba5..3811e3f 100644 --- a/src/proto.rs +++ b/src/proto.rs @@ -38,11 +38,19 @@ pub const PRIO_PRIMARY: RequestPriority = 0x00; /// Priority: secondary among given class (ex: `PRIO_HIGH | PRIO_SECONDARY`) pub const PRIO_SECONDARY: RequestPriority = 0x01; +// Messages are sent by chunks +// Chunk format: +// - u32 BE: request id (same for request and response) +// - u16 BE: chunk length, possibly with CHUNK_HAS_CONTINUATION flag +// when this is not the last chunk of the message +// - [u8; chunk_length] chunk data + pub(crate) type RequestID = u32; type ChunkLength = u16; const MAX_CHUNK_LENGTH: ChunkLength = 0x4000; const CHUNK_HAS_CONTINUATION: ChunkLength = 0x8000; + struct SendQueueItem { id: RequestID, prio: RequestPriority, @@ -86,12 +94,6 @@ impl SendQueue { } } -// Messages are sent by chunks -// Chunk format: -// - u32 BE: request id (same for request and response) -// - u16 BE: chunk length -// - [u8; chunk_length] chunk data - #[async_trait] pub(crate) trait SendLoop: Sync { async fn send_loop( diff --git a/src/server.rs b/src/server.rs new file mode 100644 index 0000000..73ae267 --- /dev/null +++ b/src/server.rs @@ -0,0 +1,171 @@ +use std::net::SocketAddr; +use std::sync::{Arc}; + +use bytes::Bytes; +use log::{debug, trace}; + +use tokio::net::TcpStream; +use tokio::sync::{mpsc, watch}; +use tokio_util::compat::*; + +use futures::io::AsyncReadExt; + +use async_trait::async_trait; + +use kuska_handshake::async_std::{handshake_server, BoxStream}; + +use crate::error::*; +use crate::netapp::*; +use crate::proto::*; +use crate::util::*; + +// The client and server connection structs (client.rs and server.rs) +// build upon the chunking mechanism which is exclusively contained +// in proto.rs. +// Here, we just care about sending big messages without size limit. +// The format of these messages is described below. +// Chunking happens independently. + +// Request message format (client -> server): +// - u8 priority +// - u8 path length +// - [u8; path length] path +// - [u8; *] data + +// Response message format (server -> client): +// - u8 response code +// - [u8; *] response + +pub(crate) struct ServerConn { + pub(crate) remote_addr: SocketAddr, + pub(crate) peer_id: NodeID, + + netapp: Arc, + + resp_send: mpsc::UnboundedSender)>>, + close_send: watch::Sender, +} + +impl ServerConn { + pub(crate) async fn run(netapp: Arc, socket: TcpStream) -> Result<(), Error> { + let remote_addr = socket.peer_addr()?; + let mut socket = socket.compat(); + + let handshake = handshake_server( + &mut socket, + netapp.netid.clone(), + netapp.id, + netapp.privkey.clone(), + ) + .await?; + let peer_id = handshake.peer_pk; + + debug!( + "Handshake complete (server) with {}@{}", + hex::encode(&peer_id), + remote_addr + ); + + let (read, write) = socket.split(); + + let (read, write) = + BoxStream::from_handshake(read, write, handshake, 0x8000).split_read_write(); + + let (resp_send, resp_recv) = mpsc::unbounded_channel(); + + let (close_send, close_recv) = watch::channel(false); + + let conn = Arc::new(ServerConn { + netapp: netapp.clone(), + remote_addr, + peer_id, + resp_send, + close_send, + }); + + netapp.connected_as_server(peer_id, conn.clone()); + + let conn2 = conn.clone(); + let conn3 = conn.clone(); + let close_recv2 = close_recv.clone(); + tokio::try_join!( + async move { + tokio::select!( + r = conn2.recv_loop(read) => r, + _ = await_exit(close_recv) => Ok(()), + ) + }, + async move { + tokio::select!( + r = conn3.send_loop(resp_recv, write) => r, + _ = await_exit(close_recv2) => Ok(()), + ) + }, + ) + .map(|_| ()) + .log_err("ServerConn recv_loop/send_loop"); + + netapp.disconnected_as_server(&peer_id, conn); + + Ok(()) + } + + pub fn close(&self) { + self.close_send.send(true).unwrap(); + } + + async fn recv_handler_aux(self: &Arc, bytes: &[u8]) -> Result, Error> { + if bytes.len() < 2 { + return Err(Error::Message("Invalid protocol message".into())); + } + + // byte 0 is the request priority, we don't care here + let path_length = bytes[1] as usize; + if bytes.len() < 2 + path_length { + return Err(Error::Message("Invalid protocol message".into())); + } + + let path = &bytes[2..2 + path_length]; + let path = String::from_utf8(path.to_vec())?; + let data = &bytes[2 + path_length..]; + + let handler_opt = { + let endpoints = self.netapp.endpoints.read().unwrap(); + endpoints.get(&path).map(|e| e.clone_endpoint()) + }; + + if let Some(handler) = handler_opt { + handler.handle(data, self.peer_id).await + } else { + Err(Error::NoHandler) + } + } +} + +impl SendLoop for ServerConn {} + +#[async_trait] +impl RecvLoop for ServerConn { + async fn recv_handler(self: Arc, id: RequestID, bytes: Vec) { + trace!("ServerConn recv_handler {} ({} bytes)", id, bytes.len()); + let bytes: Bytes = bytes.into(); + + let resp = self.recv_handler_aux(&bytes[..]).await; + let prio = bytes[0]; + + let mut resp_bytes = vec![]; + match resp { + Ok(rb) => { + resp_bytes.push(0u8); + resp_bytes.extend(&rb[..]); + } + Err(e) => { + resp_bytes.push(e.code()); + } + } + + self.resp_send + .send(Some((id, prio, resp_bytes))) + .log_err("ServerConn recv_handler send resp"); + } +} -- cgit v1.2.3