aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2021-10-12 14:51:28 +0200
committerAlex Auvolat <alex@adnab.me>2021-10-12 14:51:28 +0200
commita4069d703c63e9512a87df7f16c574711f960335 (patch)
treedac67bf63ba2c85c421d3d10b9dbf71d9d32e4e2 /src
parentb14515a422a04fa008e6c67f6465ead927c7b8ad (diff)
downloadnetapp-a4069d703c63e9512a87df7f16c574711f960335.tar.gz
netapp-a4069d703c63e9512a87df7f16c574711f960335.zip
Use tokio_util::compat instead of the one from kuska-handshake
Diffstat (limited to 'src')
-rw-r--r--src/conn.rs34
-rw-r--r--src/proto.rs7
2 files changed, 16 insertions, 25 deletions
diff --git a/src/conn.rs b/src/conn.rs
index 474032d..73bc6a1 100644
--- a/src/conn.rs
+++ b/src/conn.rs
@@ -6,15 +6,16 @@ use std::sync::{Arc, Mutex};
use bytes::Bytes;
use log::{debug, error, trace};
-use tokio::io::split;
use tokio::net::TcpStream;
use tokio::sync::{mpsc, oneshot, watch};
+use tokio_util::compat::*;
+
+use futures::io::AsyncReadExt;
use async_trait::async_trait;
use kuska_handshake::async_std::{
- handshake_client, handshake_server, BoxStream, TokioCompatExt, TokioCompatExtRead,
- TokioCompatExtWrite,
+ handshake_client, handshake_server, BoxStream
};
use crate::error::*;
@@ -35,9 +36,11 @@ pub(crate) struct ServerConn {
impl ServerConn {
pub(crate) async fn run(netapp: Arc<NetApp>, socket: TcpStream) -> Result<(), Error> {
- let mut asyncstd_socket = TokioCompatExt::wrap(socket);
+ let remote_addr = socket.peer_addr()?;
+ let mut socket = socket.compat();
+
let handshake = handshake_server(
- &mut asyncstd_socket,
+ &mut socket,
netapp.netid.clone(),
netapp.id,
netapp.privkey.clone(),
@@ -45,19 +48,13 @@ impl ServerConn {
.await?;
let peer_id = handshake.peer_pk;
- let tokio_socket = asyncstd_socket.into_inner();
- let remote_addr = tokio_socket.peer_addr()?;
-
debug!(
"Handshake complete (server) with {}@{}",
hex::encode(&peer_id),
remote_addr
);
- let (read, write) = split(tokio_socket);
-
- let read = TokioCompatExtRead::wrap(read);
- let write = TokioCompatExtWrite::wrap(write);
+ let (read, write) = socket.split();
let (read, write) =
BoxStream::from_handshake(read, write, handshake, 0x8000).split_read_write();
@@ -148,10 +145,11 @@ impl ClientConn {
socket: TcpStream,
peer_id: NodeID,
) -> Result<(), Error> {
- let mut asyncstd_socket = TokioCompatExt::wrap(socket);
+ let remote_addr = socket.peer_addr()?;
+ let mut socket = socket.compat();
let handshake = handshake_client(
- &mut asyncstd_socket,
+ &mut socket,
netapp.netid.clone(),
netapp.id,
netapp.privkey.clone(),
@@ -159,19 +157,13 @@ impl ClientConn {
)
.await?;
- let tokio_socket = asyncstd_socket.into_inner();
- let remote_addr = tokio_socket.peer_addr()?;
-
debug!(
"Handshake complete (client) with {}@{}",
hex::encode(&peer_id),
remote_addr
);
- let (read, write) = split(tokio_socket);
-
- let read = TokioCompatExtRead::wrap(read);
- let write = TokioCompatExtWrite::wrap(write);
+ let (read, write) = socket.split();
let (read, write) =
BoxStream::from_handshake(read, write, handshake, 0x8000).split_read_write();
diff --git a/src/proto.rs b/src/proto.rs
index d8f6289..ef3b31c 100644
--- a/src/proto.rs
+++ b/src/proto.rs
@@ -3,8 +3,7 @@ use std::sync::Arc;
use log::trace;
-use async_std::io::prelude::WriteExt;
-use async_std::io::ReadExt;
+use futures::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::mpsc;
@@ -94,7 +93,7 @@ pub(crate) trait SendLoop: Sync {
mut write: W,
) -> Result<(), Error>
where
- W: WriteExt + Unpin + Send + Sync,
+ W: AsyncWriteExt + Unpin + Send + Sync,
{
let mut sending = SendQueue::new();
let mut should_exit = false;
@@ -168,7 +167,7 @@ pub(crate) trait RecvLoop: Sync + 'static {
async fn recv_loop<R>(self: Arc<Self>, mut read: R) -> Result<(), Error>
where
- R: ReadExt + Unpin + Send + Sync,
+ R: AsyncReadExt + Unpin + Send + Sync,
{
let mut receiving = HashMap::new();
loop {