aboutsummaryrefslogtreecommitdiff
path: root/src/conn.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2021-10-12 18:13:07 +0200
committerAlex Auvolat <alex@adnab.me>2021-10-12 18:13:07 +0200
commitd9bd1182f7b980df8e631ae8eeca444f5d997909 (patch)
tree9345b078b2225c2fd58074ef3016ea6524d0ab3f /src/conn.rs
parentf87dbe73dc12f2d6eb13850a3bc4b012aadd3c9b (diff)
downloadnetapp-d9bd1182f7b980df8e631ae8eeca444f5d997909.tar.gz
netapp-d9bd1182f7b980df8e631ae8eeca444f5d997909.zip
Move out things from conn.rs into two separate files
Diffstat (limited to 'src/conn.rs')
-rw-r--r--src/conn.rs319
1 files changed, 0 insertions, 319 deletions
diff --git a/src/conn.rs b/src/conn.rs
deleted file mode 100644
index 64318dc..0000000
--- a/src/conn.rs
+++ /dev/null
@@ -1,319 +0,0 @@
-use std::collections::HashMap;
-use std::net::SocketAddr;
-use std::sync::atomic::{self, AtomicBool, AtomicU32};
-use std::sync::{Arc, Mutex};
-
-use bytes::Bytes;
-use log::{debug, error, trace};
-
-use tokio::net::TcpStream;
-use tokio::sync::{mpsc, oneshot, watch};
-use tokio_util::compat::*;
-
-use futures::io::AsyncReadExt;
-
-use async_trait::async_trait;
-
-use kuska_handshake::async_std::{handshake_client, handshake_server, BoxStream};
-
-use crate::endpoint::*;
-use crate::error::*;
-use crate::netapp::*;
-use crate::proto::*;
-use crate::util::*;
-
-// Request message format (client -> server):
-// - u8 priority
-// - u8 path length
-// - [u8; path length] path
-// - [u8; *] data
-
-// Response message format (server -> client):
-// - u8 response code
-// - [u8; *] response
-
-pub(crate) struct ServerConn {
- pub(crate) remote_addr: SocketAddr,
- pub(crate) peer_id: NodeID,
-
- netapp: Arc<NetApp>,
-
- resp_send: mpsc::UnboundedSender<Option<(RequestID, RequestPriority, Vec<u8>)>>,
- close_send: watch::Sender<bool>,
-}
-
-impl ServerConn {
- pub(crate) async fn run(netapp: Arc<NetApp>, socket: TcpStream) -> Result<(), Error> {
- let remote_addr = socket.peer_addr()?;
- let mut socket = socket.compat();
-
- let handshake = handshake_server(
- &mut socket,
- netapp.netid.clone(),
- netapp.id,
- netapp.privkey.clone(),
- )
- .await?;
- let peer_id = handshake.peer_pk;
-
- debug!(
- "Handshake complete (server) with {}@{}",
- hex::encode(&peer_id),
- remote_addr
- );
-
- let (read, write) = socket.split();
-
- let (read, write) =
- BoxStream::from_handshake(read, write, handshake, 0x8000).split_read_write();
-
- let (resp_send, resp_recv) = mpsc::unbounded_channel();
-
- let (close_send, close_recv) = watch::channel(false);
-
- let conn = Arc::new(ServerConn {
- netapp: netapp.clone(),
- remote_addr,
- peer_id,
- resp_send,
- close_send,
- });
-
- netapp.connected_as_server(peer_id, conn.clone());
-
- let conn2 = conn.clone();
- let conn3 = conn.clone();
- let close_recv2 = close_recv.clone();
- tokio::try_join!(
- async move {
- tokio::select!(
- r = conn2.recv_loop(read) => r,
- _ = await_exit(close_recv) => Ok(()),
- )
- },
- async move {
- tokio::select!(
- r = conn3.send_loop(resp_recv, write) => r,
- _ = await_exit(close_recv2) => Ok(()),
- )
- },
- )
- .map(|_| ())
- .log_err("ServerConn recv_loop/send_loop");
-
- netapp.disconnected_as_server(&peer_id, conn);
-
- Ok(())
- }
-
- pub fn close(&self) {
- self.close_send.send(true).unwrap();
- }
-
- async fn recv_handler_aux(self: &Arc<Self>, bytes: &[u8]) -> Result<Vec<u8>, Error> {
- if bytes.len() < 2 {
- return Err(Error::Message("Invalid protocol message".into()));
- }
-
- // byte 0 is the request priority, we don't care here
- let path_length = bytes[1] as usize;
- if bytes.len() < 2 + path_length {
- return Err(Error::Message("Invalid protocol message".into()));
- }
-
- let path = &bytes[2..2 + path_length];
- let path = String::from_utf8(path.to_vec())?;
- let data = &bytes[2 + path_length..];
-
- let handler_opt = {
- let endpoints = self.netapp.endpoints.read().unwrap();
- endpoints.get(&path).map(|e| e.clone_endpoint())
- };
-
- if let Some(handler) = handler_opt {
- handler.handle(data, self.peer_id).await
- } else {
- Err(Error::NoHandler)
- }
- }
-}
-
-impl SendLoop for ServerConn {}
-
-#[async_trait]
-impl RecvLoop for ServerConn {
- async fn recv_handler(self: Arc<Self>, id: RequestID, bytes: Vec<u8>) {
- trace!("ServerConn recv_handler {} ({} bytes)", id, bytes.len());
- let bytes: Bytes = bytes.into();
-
- let resp = self.recv_handler_aux(&bytes[..]).await;
- let prio = bytes[0];
-
- let mut resp_bytes = vec![];
- match resp {
- Ok(rb) => {
- resp_bytes.push(0u8);
- resp_bytes.extend(&rb[..]);
- }
- Err(e) => {
- resp_bytes.push(e.code());
- }
- }
-
- self.resp_send
- .send(Some((id, prio, resp_bytes)))
- .log_err("ServerConn recv_handler send resp");
- }
-}
-pub(crate) struct ClientConn {
- pub(crate) remote_addr: SocketAddr,
- pub(crate) peer_id: NodeID,
-
- query_send: mpsc::UnboundedSender<Option<(RequestID, RequestPriority, Vec<u8>)>>,
-
- next_query_number: AtomicU32,
- inflight: Mutex<HashMap<RequestID, oneshot::Sender<Vec<u8>>>>,
- must_exit: AtomicBool,
- stop_recv_loop: watch::Sender<bool>,
-}
-
-impl ClientConn {
- pub(crate) async fn init(
- netapp: Arc<NetApp>,
- socket: TcpStream,
- peer_id: NodeID,
- ) -> Result<(), Error> {
- let remote_addr = socket.peer_addr()?;
- let mut socket = socket.compat();
-
- let handshake = handshake_client(
- &mut socket,
- netapp.netid.clone(),
- netapp.id,
- netapp.privkey.clone(),
- peer_id,
- )
- .await?;
-
- debug!(
- "Handshake complete (client) with {}@{}",
- hex::encode(&peer_id),
- remote_addr
- );
-
- let (read, write) = socket.split();
-
- let (read, write) =
- BoxStream::from_handshake(read, write, handshake, 0x8000).split_read_write();
-
- let (query_send, query_recv) = mpsc::unbounded_channel();
-
- let (stop_recv_loop, stop_recv_loop_recv) = watch::channel(false);
-
- let conn = Arc::new(ClientConn {
- remote_addr,
- peer_id,
- next_query_number: AtomicU32::from(RequestID::default()),
- query_send,
- inflight: Mutex::new(HashMap::new()),
- must_exit: AtomicBool::new(false),
- stop_recv_loop,
- });
-
- netapp.connected_as_client(peer_id, conn.clone());
-
- tokio::spawn(async move {
- let conn2 = conn.clone();
- let conn3 = conn.clone();
- tokio::try_join!(conn2.send_loop(query_recv, write), async move {
- tokio::select!(
- r = conn3.recv_loop(read) => r,
- _ = await_exit(stop_recv_loop_recv) => Ok(()),
- )
- })
- .map(|_| ())
- .log_err("ClientConn send_loop/recv_loop/dispatch_loop");
-
- netapp.disconnected_as_client(&peer_id, conn);
- });
-
- Ok(())
- }
-
- pub fn close(&self) {
- self.must_exit.store(true, atomic::Ordering::SeqCst);
- self.query_send
- .send(None)
- .log_err("could not write None in query_send");
- if self.inflight.lock().unwrap().is_empty() {
- self.stop_recv_loop
- .send(true)
- .log_err("could not write true to stop_recv_loop");
- }
- }
-
- pub(crate) async fn call<T>(
- self: Arc<Self>,
- rq: T,
- path: &str,
- prio: RequestPriority,
- ) -> Result<<T as Message>::Response, Error>
- where
- T: Message,
- {
- let id = self
- .next_query_number
- .fetch_add(1, atomic::Ordering::Relaxed);
- let mut bytes = vec![prio, path.as_bytes().len() as u8];
- bytes.extend_from_slice(path.as_bytes());
- bytes.extend_from_slice(&rmp_to_vec_all_named(&rq)?[..]);
-
- let (resp_send, resp_recv) = oneshot::channel();
- let old = self.inflight.lock().unwrap().insert(id, resp_send);
- if let Some(old_ch) = old {
- error!(
- "Too many inflight requests! RequestID collision. Interrupting previous request."
- );
- if old_ch.send(vec![]).is_err() {
- debug!("Could not send empty response to collisionned request, probably because request was interrupted. Dropping response.");
- }
- }
-
- trace!("request: query_send {}, {} bytes", id, bytes.len());
- self.query_send.send(Some((id, prio, bytes)))?;
-
- let resp = resp_recv.await?;
-
- let code = resp[0];
- if code == 0 {
- rmp_serde::decode::from_read_ref::<_, Result<<T as Message>::Response, String>>(
- &resp[1..],
- )?
- .map_err(Error::Remote)
- } else {
- Err(Error::Remote(format!("Remote error code {}", code)))
- }
- }
-}
-
-impl SendLoop for ClientConn {}
-
-#[async_trait]
-impl RecvLoop for ClientConn {
- async fn recv_handler(self: Arc<Self>, id: RequestID, msg: Vec<u8>) {
- trace!("ClientConn recv_handler {} ({} bytes)", id, msg.len());
-
- let mut inflight = self.inflight.lock().unwrap();
- if let Some(ch) = inflight.remove(&id) {
- if ch.send(msg).is_err() {
- debug!("Could not send request response, probably because request was interrupted. Dropping response.");
- }
- }
-
- if inflight.is_empty() && self.must_exit.load(atomic::Ordering::SeqCst) {
- self.stop_recv_loop
- .send(true)
- .log_err("could not write true to stop_recv_loop");
- }
- }
-}