Diffstat (limited to 'src')
-rw-r--r--   src/client.rs (renamed from src/conn.rs)   149
-rw-r--r--   src/lib.rs                                    3
-rw-r--r--   src/netapp.rs                                 3
-rw-r--r--   src/proto.rs                                 14
-rw-r--r--   src/server.rs                               171
5 files changed, 188 insertions, 152 deletions
diff --git a/src/conn.rs b/src/client.rs
index 64318dc..a436d53 100644
--- a/src/conn.rs
+++ b/src/client.rs
@@ -3,7 +3,6 @@ use std::net::SocketAddr;
 use std::sync::atomic::{self, AtomicBool, AtomicU32};
 use std::sync::{Arc, Mutex};
 
-use bytes::Bytes;
 use log::{debug, error, trace};
 
 use tokio::net::TcpStream;
@@ -14,7 +13,7 @@ use futures::io::AsyncReadExt;
 
 use async_trait::async_trait;
 
-use kuska_handshake::async_std::{handshake_client, handshake_server, BoxStream};
+use kuska_handshake::async_std::{handshake_client, BoxStream};
 
 use crate::endpoint::*;
 use crate::error::*;
@@ -22,149 +21,7 @@ use crate::netapp::*;
 use crate::proto::*;
 use crate::util::*;
 
-// Request message format (client -> server):
-// - u8 priority
-// - u8 path length
-// - [u8; path length] path
-// - [u8; *] data
-
-// Response message format (server -> client):
-// - u8 response code
-// - [u8; *] response
-
-pub(crate) struct ServerConn {
-    pub(crate) remote_addr: SocketAddr,
-    pub(crate) peer_id: NodeID,
-
-    netapp: Arc<NetApp>,
-
-    resp_send: mpsc::UnboundedSender<Option<(RequestID, RequestPriority, Vec<u8>)>>,
-    close_send: watch::Sender<bool>,
-}
-
-impl ServerConn {
-    pub(crate) async fn run(netapp: Arc<NetApp>, socket: TcpStream) -> Result<(), Error> {
-        let remote_addr = socket.peer_addr()?;
-        let mut socket = socket.compat();
-
-        let handshake = handshake_server(
-            &mut socket,
-            netapp.netid.clone(),
-            netapp.id,
-            netapp.privkey.clone(),
-        )
-        .await?;
-        let peer_id = handshake.peer_pk;
-
-        debug!(
-            "Handshake complete (server) with {}@{}",
-            hex::encode(&peer_id),
-            remote_addr
-        );
-
-        let (read, write) = socket.split();
-
-        let (read, write) =
-            BoxStream::from_handshake(read, write, handshake, 0x8000).split_read_write();
-
-        let (resp_send, resp_recv) = mpsc::unbounded_channel();
-
-        let (close_send, close_recv) = watch::channel(false);
-
-        let conn = Arc::new(ServerConn {
-            netapp: netapp.clone(),
-            remote_addr,
-            peer_id,
-            resp_send,
-            close_send,
-        });
-
-        netapp.connected_as_server(peer_id, conn.clone());
-
-        let conn2 = conn.clone();
-        let conn3 = conn.clone();
-        let close_recv2 = close_recv.clone();
-        tokio::try_join!(
-            async move {
-                tokio::select!(
-                    r = conn2.recv_loop(read) => r,
-                    _ = await_exit(close_recv) => Ok(()),
-                )
-            },
-            async move {
-                tokio::select!(
-                    r = conn3.send_loop(resp_recv, write) => r,
-                    _ = await_exit(close_recv2) => Ok(()),
-                )
-            },
-        )
-        .map(|_| ())
-        .log_err("ServerConn recv_loop/send_loop");
-
-        netapp.disconnected_as_server(&peer_id, conn);
-
-        Ok(())
-    }
-
-    pub fn close(&self) {
-        self.close_send.send(true).unwrap();
-    }
-
-    async fn recv_handler_aux(self: &Arc<Self>, bytes: &[u8]) -> Result<Vec<u8>, Error> {
-        if bytes.len() < 2 {
-            return Err(Error::Message("Invalid protocol message".into()));
-        }
-
-        // byte 0 is the request priority, we don't care here
-        let path_length = bytes[1] as usize;
-        if bytes.len() < 2 + path_length {
-            return Err(Error::Message("Invalid protocol message".into()));
-        }
-
-        let path = &bytes[2..2 + path_length];
-        let path = String::from_utf8(path.to_vec())?;
-        let data = &bytes[2 + path_length..];
-
-        let handler_opt = {
-            let endpoints = self.netapp.endpoints.read().unwrap();
-            endpoints.get(&path).map(|e| e.clone_endpoint())
-        };
-
-        if let Some(handler) = handler_opt {
-            handler.handle(data, self.peer_id).await
-        } else {
-            Err(Error::NoHandler)
-        }
-    }
-}
-
-impl SendLoop for ServerConn {}
-
-#[async_trait]
-impl RecvLoop for ServerConn {
-    async fn recv_handler(self: Arc<Self>, id: RequestID, bytes: Vec<u8>) {
-        trace!("ServerConn recv_handler {} ({} bytes)", id, bytes.len());
-        let bytes: Bytes = bytes.into();
-
-        let resp = self.recv_handler_aux(&bytes[..]).await;
-        let prio = bytes[0];
-
-        let mut resp_bytes = vec![];
-        match resp {
-            Ok(rb) => {
-                resp_bytes.push(0u8);
-                resp_bytes.extend(&rb[..]);
-            }
-            Err(e) => {
-                resp_bytes.push(e.code());
-            }
-        }
-
-        self.resp_send
-            .send(Some((id, prio, resp_bytes)))
-            .log_err("ServerConn recv_handler send resp");
-    }
-}
 pub(crate) struct ClientConn {
     pub(crate) remote_addr: SocketAddr,
     pub(crate) peer_id: NodeID,
@@ -264,6 +121,7 @@ impl ClientConn {
         let id = self
             .next_query_number
             .fetch_add(1, atomic::Ordering::Relaxed);
+
         let mut bytes = vec![prio, path.as_bytes().len() as u8];
         bytes.extend_from_slice(path.as_bytes());
         bytes.extend_from_slice(&rmp_to_vec_all_named(&rq)?[..]);
@@ -283,6 +141,9 @@
         self.query_send.send(Some((id, prio, bytes)))?;
 
         let resp = resp_recv.await?;
+        if resp.len() == 0 {
+            return Err(Error::Message("Response is 0 bytes, either a collision or a protocol error".into()));
+        }
 
         let code = resp[0];
         if code == 0 {
diff --git a/src/lib.rs b/src/lib.rs
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -21,7 +21,8 @@ pub mod util;
 
 pub mod endpoint;
 pub mod proto;
-mod conn;
+mod server;
+mod client;
 
 pub mod netapp;
 pub mod peering;
diff --git a/src/netapp.rs b/src/netapp.rs
index 8415c58..afdd3c9 100644
--- a/src/netapp.rs
+++ b/src/netapp.rs
@@ -12,7 +12,8 @@ use sodiumoxide::crypto::auth;
 use sodiumoxide::crypto::sign::ed25519;
 use tokio::net::{TcpListener, TcpStream};
-use crate::conn::*;
+use crate::client::*;
+use crate::server::*;
 use crate::endpoint::*;
 use crate::error::*;
 use crate::proto::*;
diff --git a/src/proto.rs b/src/proto.rs
index 5b71ba5..3811e3f 100644
--- a/src/proto.rs
+++ b/src/proto.rs
@@ -38,11 +38,19 @@ pub const PRIO_PRIMARY: RequestPriority = 0x00;
 /// Priority: secondary among given class (ex: `PRIO_HIGH | PRIO_SECONDARY`)
 pub const PRIO_SECONDARY: RequestPriority = 0x01;
 
+// Messages are sent by chunks
+// Chunk format:
+// - u32 BE: request id (same for request and response)
+// - u16 BE: chunk length, possibly with CHUNK_HAS_CONTINUATION flag
+//   when this is not the last chunk of the message
+// - [u8; chunk_length] chunk data
+
 pub(crate) type RequestID = u32;
 type ChunkLength = u16;
 const MAX_CHUNK_LENGTH: ChunkLength = 0x4000;
 const CHUNK_HAS_CONTINUATION: ChunkLength = 0x8000;
+
 struct SendQueueItem {
     id: RequestID,
     prio: RequestPriority,
@@ -86,12 +94,6 @@ impl SendQueue {
     }
 }
 
-// Messages are sent by chunks
-// Chunk format:
-// - u32 BE: request id (same for request and response)
-// - u16 BE: chunk length
-// - [u8; chunk_length] chunk data
-
 #[async_trait]
 pub(crate) trait SendLoop: Sync {
     async fn send_loop<W>(
diff --git a/src/server.rs b/src/server.rs
new file mode 100644
index 0000000..73ae267
--- /dev/null
+++ b/src/server.rs
@@ -0,0 +1,171 @@
+use std::net::SocketAddr;
+use std::sync::{Arc};
+
+use bytes::Bytes;
+use log::{debug, trace};
+
+use tokio::net::TcpStream;
+use tokio::sync::{mpsc, watch};
+use tokio_util::compat::*;
+
+use futures::io::AsyncReadExt;
+
+use async_trait::async_trait;
+
+use kuska_handshake::async_std::{handshake_server, BoxStream};
+
+use crate::error::*;
+use crate::netapp::*;
+use crate::proto::*;
+use crate::util::*;
+
+// The client and server connection structs (client.rs and server.rs)
+// build upon the chunking mechanism which is exclusively contained
+// in proto.rs.
+// Here, we just care about sending big messages without size limit.
+// The format of these messages is described below.
+// Chunking happens independently.
+
+// Request message format (client -> server):
+// - u8 priority
+// - u8 path length
+// - [u8; path length] path
+// - [u8; *] data
+
+// Response message format (server -> client):
+// - u8 response code
+// - [u8; *] response
+
+pub(crate) struct ServerConn {
+    pub(crate) remote_addr: SocketAddr,
+    pub(crate) peer_id: NodeID,
+
+    netapp: Arc<NetApp>,
+
+    resp_send: mpsc::UnboundedSender<Option<(RequestID, RequestPriority, Vec<u8>)>>,
+    close_send: watch::Sender<bool>,
+}
+
+impl ServerConn {
+    pub(crate) async fn run(netapp: Arc<NetApp>, socket: TcpStream) -> Result<(), Error> {
+        let remote_addr = socket.peer_addr()?;
+        let mut socket = socket.compat();
+
+        let handshake = handshake_server(
+            &mut socket,
+            netapp.netid.clone(),
+            netapp.id,
+            netapp.privkey.clone(),
+        )
+        .await?;
+        let peer_id = handshake.peer_pk;
+
+        debug!(
+            "Handshake complete (server) with {}@{}",
+            hex::encode(&peer_id),
+            remote_addr
+        );
+
+        let (read, write) = socket.split();
+
+        let (read, write) =
+            BoxStream::from_handshake(read, write, handshake, 0x8000).split_read_write();
+
+        let (resp_send, resp_recv) = mpsc::unbounded_channel();
+
+        let (close_send, close_recv) = watch::channel(false);
+
+        let conn = Arc::new(ServerConn {
+            netapp: netapp.clone(),
+            remote_addr,
+            peer_id,
+            resp_send,
+            close_send,
+        });
+
+        netapp.connected_as_server(peer_id, conn.clone());
+
+        let conn2 = conn.clone();
+        let conn3 = conn.clone();
+        let close_recv2 = close_recv.clone();
+        tokio::try_join!(
+            async move {
+                tokio::select!(
+                    r = conn2.recv_loop(read) => r,
+                    _ = await_exit(close_recv) => Ok(()),
+                )
+            },
+            async move {
+                tokio::select!(
+                    r = conn3.send_loop(resp_recv, write) => r,
+                    _ = await_exit(close_recv2) => Ok(()),
+                )
+            },
+        )
+        .map(|_| ())
+        .log_err("ServerConn recv_loop/send_loop");
+
+        netapp.disconnected_as_server(&peer_id, conn);
+
+        Ok(())
+    }
+
+    pub fn close(&self) {
+        self.close_send.send(true).unwrap();
+    }
+
+    async fn recv_handler_aux(self: &Arc<Self>, bytes: &[u8]) -> Result<Vec<u8>, Error> {
+        if bytes.len() < 2 {
+            return Err(Error::Message("Invalid protocol message".into()));
+        }
+
+        // byte 0 is the request priority, we don't care here
+        let path_length = bytes[1] as usize;
+        if bytes.len() < 2 + path_length {
+            return Err(Error::Message("Invalid protocol message".into()));
+        }
+
+        let path = &bytes[2..2 + path_length];
+        let path = String::from_utf8(path.to_vec())?;
+        let data = &bytes[2 + path_length..];
+
+        let handler_opt = {
+            let endpoints = self.netapp.endpoints.read().unwrap();
+            endpoints.get(&path).map(|e| e.clone_endpoint())
+        };
+
+        if let Some(handler) = handler_opt {
+            handler.handle(data, self.peer_id).await
+        } else {
+            Err(Error::NoHandler)
+        }
+    }
+}
+
+impl SendLoop for ServerConn {}
+
+#[async_trait]
+impl RecvLoop for ServerConn {
+    async fn recv_handler(self: Arc<Self>, id: RequestID, bytes: Vec<u8>) {
+        trace!("ServerConn recv_handler {} ({} bytes)", id, bytes.len());
+        let bytes: Bytes = bytes.into();
+
+        let resp = self.recv_handler_aux(&bytes[..]).await;
+        let prio = bytes[0];
+
+        let mut resp_bytes = vec![];
+        match resp {
+            Ok(rb) => {
+                resp_bytes.push(0u8);
+                resp_bytes.extend(&rb[..]);
+            }
+            Err(e) => {
+                resp_bytes.push(e.code());
+            }
+        }
+
+        self.resp_send
+            .send(Some((id, prio, resp_bytes)))
+            .log_err("ServerConn recv_handler send resp");
+    }
+}
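Read together, the format comments in this patch describe two independent layers: client.rs and server.rs define the message payload (a priority byte, a path length byte, the UTF-8 path, then the request body; a response is a status byte followed by the response body), while proto.rs frames each message into chunks of at most MAX_CHUNK_LENGTH bytes, each prefixed by the u32 request id and a u16 length word whose CHUNK_HAS_CONTINUATION bit signals that more chunks follow. The sketch below only illustrates those two byte layouts; encode_request, encode_chunk, the "ping" path, and the request id 42 are made-up names for illustration, not part of the crate's API (the real code builds these buffers inline in ClientConn and in the SendLoop/RecvLoop machinery of proto.rs).

// Illustrative sketch only: the byte layouts described in the comments above.
// `encode_request` and `encode_chunk` are hypothetical helpers, not netapp API.

type RequestID = u32;
type ChunkLength = u16;

const MAX_CHUNK_LENGTH: ChunkLength = 0x4000;
const CHUNK_HAS_CONTINUATION: ChunkLength = 0x8000;

/// Request message: u8 priority, u8 path length, path bytes, serialized body.
fn encode_request(prio: u8, path: &str, body: &[u8]) -> Vec<u8> {
    let mut bytes = vec![prio, path.len() as u8];
    bytes.extend_from_slice(path.as_bytes());
    bytes.extend_from_slice(body);
    bytes
}

/// One chunk on the wire: u32 BE request id, u16 BE chunk length
/// (OR'ed with CHUNK_HAS_CONTINUATION if more chunks follow), chunk data.
fn encode_chunk(id: RequestID, data: &[u8], is_last: bool) -> Vec<u8> {
    assert!(data.len() <= MAX_CHUNK_LENGTH as usize);
    let mut len = data.len() as ChunkLength;
    if !is_last {
        len |= CHUNK_HAS_CONTINUATION;
    }
    let mut frame = Vec::with_capacity(6 + data.len());
    frame.extend_from_slice(&id.to_be_bytes());
    frame.extend_from_slice(&len.to_be_bytes());
    frame.extend_from_slice(data);
    frame
}

fn main() {
    // A request to a hypothetical "ping" endpoint with an arbitrary priority byte.
    let msg = encode_request(0x01, "ping", b"hello");

    // The send loop splits the message into chunks of at most MAX_CHUNK_LENGTH
    // bytes; only the last chunk clears the continuation flag.
    let chunks: Vec<&[u8]> = msg.chunks(MAX_CHUNK_LENGTH as usize).collect();
    for (i, chunk) in chunks.iter().enumerate() {
        let frame = encode_chunk(42, chunk, i == chunks.len() - 1);
        println!("chunk {}: {} bytes on the wire", i, frame.len());
    }
}

On the wire, each chunk therefore carries six bytes of framing overhead (four for the request id, two for the length word) on top of the message bytes it transports.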