diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/conn.rs | 34 | ||||
-rw-r--r-- | src/proto.rs | 7 |
2 files changed, 16 insertions, 25 deletions
diff --git a/src/conn.rs b/src/conn.rs index 474032d..73bc6a1 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -6,15 +6,16 @@ use std::sync::{Arc, Mutex}; use bytes::Bytes; use log::{debug, error, trace}; -use tokio::io::split; use tokio::net::TcpStream; use tokio::sync::{mpsc, oneshot, watch}; +use tokio_util::compat::*; + +use futures::io::AsyncReadExt; use async_trait::async_trait; use kuska_handshake::async_std::{ - handshake_client, handshake_server, BoxStream, TokioCompatExt, TokioCompatExtRead, - TokioCompatExtWrite, + handshake_client, handshake_server, BoxStream }; use crate::error::*; @@ -35,9 +36,11 @@ pub(crate) struct ServerConn { impl ServerConn { pub(crate) async fn run(netapp: Arc<NetApp>, socket: TcpStream) -> Result<(), Error> { - let mut asyncstd_socket = TokioCompatExt::wrap(socket); + let remote_addr = socket.peer_addr()?; + let mut socket = socket.compat(); + let handshake = handshake_server( - &mut asyncstd_socket, + &mut socket, netapp.netid.clone(), netapp.id, netapp.privkey.clone(), @@ -45,19 +48,13 @@ impl ServerConn { .await?; let peer_id = handshake.peer_pk; - let tokio_socket = asyncstd_socket.into_inner(); - let remote_addr = tokio_socket.peer_addr()?; - debug!( "Handshake complete (server) with {}@{}", hex::encode(&peer_id), remote_addr ); - let (read, write) = split(tokio_socket); - - let read = TokioCompatExtRead::wrap(read); - let write = TokioCompatExtWrite::wrap(write); + let (read, write) = socket.split(); let (read, write) = BoxStream::from_handshake(read, write, handshake, 0x8000).split_read_write(); @@ -148,10 +145,11 @@ impl ClientConn { socket: TcpStream, peer_id: NodeID, ) -> Result<(), Error> { - let mut asyncstd_socket = TokioCompatExt::wrap(socket); + let remote_addr = socket.peer_addr()?; + let mut socket = socket.compat(); let handshake = handshake_client( - &mut asyncstd_socket, + &mut socket, netapp.netid.clone(), netapp.id, netapp.privkey.clone(), @@ -159,19 +157,13 @@ impl ClientConn { ) .await?; - let tokio_socket = asyncstd_socket.into_inner(); - let remote_addr = tokio_socket.peer_addr()?; - debug!( "Handshake complete (client) with {}@{}", hex::encode(&peer_id), remote_addr ); - let (read, write) = split(tokio_socket); - - let read = TokioCompatExtRead::wrap(read); - let write = TokioCompatExtWrite::wrap(write); + let (read, write) = socket.split(); let (read, write) = BoxStream::from_handshake(read, write, handshake, 0x8000).split_read_write(); diff --git a/src/proto.rs b/src/proto.rs index d8f6289..ef3b31c 100644 --- a/src/proto.rs +++ b/src/proto.rs @@ -3,8 +3,7 @@ use std::sync::Arc; use log::trace; -use async_std::io::prelude::WriteExt; -use async_std::io::ReadExt; +use futures::{AsyncReadExt, AsyncWriteExt}; use tokio::sync::mpsc; @@ -94,7 +93,7 @@ pub(crate) trait SendLoop: Sync { mut write: W, ) -> Result<(), Error> where - W: WriteExt + Unpin + Send + Sync, + W: AsyncWriteExt + Unpin + Send + Sync, { let mut sending = SendQueue::new(); let mut should_exit = false; @@ -168,7 +167,7 @@ pub(crate) trait RecvLoop: Sync + 'static { async fn recv_loop<R>(self: Arc<Self>, mut read: R) -> Result<(), Error> where - R: ReadExt + Unpin + Send + Sync, + R: AsyncReadExt + Unpin + Send + Sync, { let mut receiving = HashMap::new(); loop { |