diff options
author | Alex Auvolat <alex@adnab.me> | 2021-10-12 18:13:07 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2021-10-12 18:13:07 +0200 |
commit | d9bd1182f7b980df8e631ae8eeca444f5d997909 (patch) | |
tree | 9345b078b2225c2fd58074ef3016ea6524d0ab3f /src/server.rs | |
parent | f87dbe73dc12f2d6eb13850a3bc4b012aadd3c9b (diff) | |
download | netapp-d9bd1182f7b980df8e631ae8eeca444f5d997909.tar.gz netapp-d9bd1182f7b980df8e631ae8eeca444f5d997909.zip |
Move out things from conn.rs into two separate files
Diffstat (limited to 'src/server.rs')
-rw-r--r-- | src/server.rs | 171 |
1 files changed, 171 insertions, 0 deletions
diff --git a/src/server.rs b/src/server.rs new file mode 100644 index 0000000..73ae267 --- /dev/null +++ b/src/server.rs @@ -0,0 +1,171 @@ +use std::net::SocketAddr; +use std::sync::{Arc}; + +use bytes::Bytes; +use log::{debug, trace}; + +use tokio::net::TcpStream; +use tokio::sync::{mpsc, watch}; +use tokio_util::compat::*; + +use futures::io::AsyncReadExt; + +use async_trait::async_trait; + +use kuska_handshake::async_std::{handshake_server, BoxStream}; + +use crate::error::*; +use crate::netapp::*; +use crate::proto::*; +use crate::util::*; + +// The client and server connection structs (client.rs and server.rs) +// build upon the chunking mechanism which is exclusively contained +// in proto.rs. +// Here, we just care about sending big messages without size limit. +// The format of these messages is described below. +// Chunking happens independently. + +// Request message format (client -> server): +// - u8 priority +// - u8 path length +// - [u8; path length] path +// - [u8; *] data + +// Response message format (server -> client): +// - u8 response code +// - [u8; *] response + +pub(crate) struct ServerConn { + pub(crate) remote_addr: SocketAddr, + pub(crate) peer_id: NodeID, + + netapp: Arc<NetApp>, + + resp_send: mpsc::UnboundedSender<Option<(RequestID, RequestPriority, Vec<u8>)>>, + close_send: watch::Sender<bool>, +} + +impl ServerConn { + pub(crate) async fn run(netapp: Arc<NetApp>, socket: TcpStream) -> Result<(), Error> { + let remote_addr = socket.peer_addr()?; + let mut socket = socket.compat(); + + let handshake = handshake_server( + &mut socket, + netapp.netid.clone(), + netapp.id, + netapp.privkey.clone(), + ) + .await?; + let peer_id = handshake.peer_pk; + + debug!( + "Handshake complete (server) with {}@{}", + hex::encode(&peer_id), + remote_addr + ); + + let (read, write) = socket.split(); + + let (read, write) = + BoxStream::from_handshake(read, write, handshake, 0x8000).split_read_write(); + + let (resp_send, resp_recv) = mpsc::unbounded_channel(); + + let (close_send, close_recv) = watch::channel(false); + + let conn = Arc::new(ServerConn { + netapp: netapp.clone(), + remote_addr, + peer_id, + resp_send, + close_send, + }); + + netapp.connected_as_server(peer_id, conn.clone()); + + let conn2 = conn.clone(); + let conn3 = conn.clone(); + let close_recv2 = close_recv.clone(); + tokio::try_join!( + async move { + tokio::select!( + r = conn2.recv_loop(read) => r, + _ = await_exit(close_recv) => Ok(()), + ) + }, + async move { + tokio::select!( + r = conn3.send_loop(resp_recv, write) => r, + _ = await_exit(close_recv2) => Ok(()), + ) + }, + ) + .map(|_| ()) + .log_err("ServerConn recv_loop/send_loop"); + + netapp.disconnected_as_server(&peer_id, conn); + + Ok(()) + } + + pub fn close(&self) { + self.close_send.send(true).unwrap(); + } + + async fn recv_handler_aux(self: &Arc<Self>, bytes: &[u8]) -> Result<Vec<u8>, Error> { + if bytes.len() < 2 { + return Err(Error::Message("Invalid protocol message".into())); + } + + // byte 0 is the request priority, we don't care here + let path_length = bytes[1] as usize; + if bytes.len() < 2 + path_length { + return Err(Error::Message("Invalid protocol message".into())); + } + + let path = &bytes[2..2 + path_length]; + let path = String::from_utf8(path.to_vec())?; + let data = &bytes[2 + path_length..]; + + let handler_opt = { + let endpoints = self.netapp.endpoints.read().unwrap(); + endpoints.get(&path).map(|e| e.clone_endpoint()) + }; + + if let Some(handler) = handler_opt { + handler.handle(data, self.peer_id).await + } else { + Err(Error::NoHandler) + } + } +} + +impl SendLoop for ServerConn {} + +#[async_trait] +impl RecvLoop for ServerConn { + async fn recv_handler(self: Arc<Self>, id: RequestID, bytes: Vec<u8>) { + trace!("ServerConn recv_handler {} ({} bytes)", id, bytes.len()); + let bytes: Bytes = bytes.into(); + + let resp = self.recv_handler_aux(&bytes[..]).await; + let prio = bytes[0]; + + let mut resp_bytes = vec![]; + match resp { + Ok(rb) => { + resp_bytes.push(0u8); + resp_bytes.extend(&rb[..]); + } + Err(e) => { + resp_bytes.push(e.code()); + } + } + + self.resp_send + .send(Some((id, prio, resp_bytes))) + .log_err("ServerConn recv_handler send resp"); + } +} |