aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/conn.rs23
-rw-r--r--src/netapp.rs24
-rw-r--r--src/proto.rs21
3 files changed, 37 insertions, 31 deletions
diff --git a/src/conn.rs b/src/conn.rs
index 7ba15f9..0aee952 100644
--- a/src/conn.rs
+++ b/src/conn.rs
@@ -60,7 +60,7 @@ impl ServerConn {
let read = TokioCompatExtRead::wrap(read);
let write = TokioCompatExtWrite::wrap(write);
- let (box_stream_read, box_stream_write) =
+ let (read, write) =
BoxStream::from_handshake(read, write, handshake, 0x8000).split_read_write();
let (resp_send, resp_recv) = mpsc::unbounded_channel();
@@ -83,13 +83,13 @@ impl ServerConn {
tokio::try_join!(
async move {
tokio::select!(
- r = conn2.recv_loop(box_stream_read) => r,
+ r = conn2.recv_loop(read) => r,
_ = await_exit(close_recv) => Ok(()),
)
},
async move {
tokio::select!(
- r = conn3.send_loop(resp_recv, box_stream_write) => r,
+ r = conn3.send_loop(resp_recv, write) => r,
_ = await_exit(close_recv2) => Ok(()),
)
},
@@ -174,7 +174,7 @@ impl ClientConn {
let read = TokioCompatExtRead::wrap(read);
let write = TokioCompatExtWrite::wrap(write);
- let (box_stream_read, box_stream_write) =
+ let (read, write) =
BoxStream::from_handshake(read, write, handshake, 0x8000).split_read_write();
let (query_send, query_recv) = mpsc::unbounded_channel();
@@ -196,15 +196,12 @@ impl ClientConn {
tokio::spawn(async move {
let conn2 = conn.clone();
let conn3 = conn.clone();
- tokio::try_join!(
- conn2.send_loop(query_recv, box_stream_write),
- async move {
- tokio::select!(
- r = conn3.recv_loop(box_stream_read) => r,
- _ = await_exit(stop_recv_loop_recv) => Ok(()),
- )
- }
- )
+ tokio::try_join!(conn2.send_loop(query_recv, write), async move {
+ tokio::select!(
+ r = conn3.recv_loop(read) => r,
+ _ = await_exit(stop_recv_loop_recv) => Ok(()),
+ )
+ })
.map(|_| ())
.log_err("ClientConn send_loop/recv_loop/dispatch_loop");
diff --git a/src/netapp.rs b/src/netapp.rs
index 967105e..6f6da5b 100644
--- a/src/netapp.rs
+++ b/src/netapp.rs
@@ -3,6 +3,7 @@ use std::collections::HashMap;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::{Arc, RwLock};
+use std::time::Instant;
use std::future::Future;
@@ -75,10 +76,18 @@ where
M::KIND,
hex::encode(remote)
);
+ let begin_time = Instant::now();
let res = match rmp_serde::decode::from_read_ref::<_, M>(&bytes[..]) {
Ok(msg) => Ok(handler(remote, msg).await),
Err(e) => Err(e.to_string()),
};
+ let end_time = Instant::now();
+ debug!(
+ "Request {:08x} from {} handled in {}msec",
+ M::KIND,
+ hex::encode(remote),
+ (end_time - begin_time).as_millis()
+ );
rmp_to_vec_all_named(&res).unwrap_or(vec![])
}
@@ -291,8 +300,7 @@ impl NetApp {
pub(crate) fn connected_as_server(&self, id: ed25519::PublicKey, conn: Arc<ServerConn>) {
info!("Accepted connection from {}", hex::encode(id));
- let mut conn_list = self.server_conns.write().unwrap();
- conn_list.insert(id.clone(), conn);
+ self.server_conns.write().unwrap().insert(id, conn);
}
// Handle hello message from a client. This message is used for them to tell us
@@ -319,10 +327,11 @@ impl NetApp {
if let Some(c) = conn_list.get(id) {
if Arc::ptr_eq(c, &conn) {
conn_list.remove(id);
- }
+ drop(conn_list);
- if let Some(h) = self.on_disconnected_handler.load().as_ref() {
- h(conn.peer_pk, true);
+ if let Some(h) = self.on_disconnected_handler.load().as_ref() {
+ h(conn.peer_pk, true);
+ }
}
}
}
@@ -338,8 +347,8 @@ impl NetApp {
info!("Connection established to {}", hex::encode(id));
{
- let mut conn_list = self.client_conns.write().unwrap();
- if let Some(old_c) = conn_list.insert(id.clone(), conn.clone()) {
+ let old_c_opt = self.client_conns.write().unwrap().insert(id, conn.clone());
+ if let Some(old_c) = old_c_opt {
tokio::spawn(async move { old_c.close() });
}
}
@@ -365,6 +374,7 @@ impl NetApp {
if let Some(c) = conn_list.get(id) {
if Arc::ptr_eq(c, &conn) {
conn_list.remove(id);
+ drop(conn_list);
if let Some(h) = self.on_disconnected_handler.load().as_ref() {
h(conn.peer_pk, false);
diff --git a/src/proto.rs b/src/proto.rs
index 3e9fe20..7b8aa4b 100644
--- a/src/proto.rs
+++ b/src/proto.rs
@@ -6,16 +6,12 @@ use log::trace;
use async_std::io::prelude::WriteExt;
use async_std::io::ReadExt;
-use tokio::io::{ReadHalf, WriteHalf};
-use tokio::net::TcpStream;
use tokio::sync::mpsc;
use async_trait::async_trait;
use crate::error::*;
-use kuska_handshake::async_std::{BoxStreamRead, BoxStreamWrite, TokioCompat};
-
/// Priority of a request (click to read more about priorities).
///
/// This priority value is used to priorize messages
@@ -92,11 +88,14 @@ impl SendQueue {
#[async_trait]
pub(crate) trait SendLoop: Sync {
- async fn send_loop(
+ async fn send_loop<W>(
self: Arc<Self>,
mut msg_recv: mpsc::UnboundedReceiver<Option<(RequestID, RequestPriority, Vec<u8>)>>,
- mut write: BoxStreamWrite<TokioCompat<WriteHalf<TcpStream>>>,
- ) -> Result<(), Error> {
+ mut write: W,
+ ) -> Result<(), Error>
+ where
+ W: WriteExt + Unpin + Send + Sync,
+ {
let mut sending = SendQueue::new();
let mut should_exit = false;
while !should_exit || !sending.is_empty() {
@@ -167,10 +166,10 @@ pub(crate) trait RecvLoop: Sync + 'static {
// Returns true if we should stop receiving after this
async fn recv_handler(self: Arc<Self>, id: RequestID, msg: Vec<u8>);
- async fn recv_loop(
- self: Arc<Self>,
- mut read: BoxStreamRead<TokioCompat<ReadHalf<TcpStream>>>,
- ) -> Result<(), Error> {
+ async fn recv_loop<R>(self: Arc<Self>, mut read: R) -> Result<(), Error>
+ where
+ R: ReadExt + Unpin + Send + Sync,
+ {
let mut receiving = HashMap::new();
loop {
trace!("recv_loop: reading packet");