diff options
author | Alex Auvolat <alex@adnab.me> | 2020-12-07 18:07:55 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2020-12-07 18:07:55 +0100 |
commit | 58ec2abe1a805d0fe6c86ab009f6947adbd9ca2b (patch) | |
tree | 1060640d08de00ddf59aa6f2d213061ff0db27aa /src/proto.rs | |
parent | 32a0fbcbd919ec45bb6380352190115f701f2c91 (diff) | |
download | netapp-58ec2abe1a805d0fe6c86ab009f6947adbd9ca2b.tar.gz netapp-58ec2abe1a805d0fe6c86ab009f6947adbd9ca2b.zip |
Small changes
Diffstat (limited to 'src/proto.rs')
-rw-r--r-- | src/proto.rs | 21 |
1 files changed, 10 insertions, 11 deletions
diff --git a/src/proto.rs b/src/proto.rs index 3e9fe20..7b8aa4b 100644 --- a/src/proto.rs +++ b/src/proto.rs @@ -6,16 +6,12 @@ use log::trace; use async_std::io::prelude::WriteExt; use async_std::io::ReadExt; -use tokio::io::{ReadHalf, WriteHalf}; -use tokio::net::TcpStream; use tokio::sync::mpsc; use async_trait::async_trait; use crate::error::*; -use kuska_handshake::async_std::{BoxStreamRead, BoxStreamWrite, TokioCompat}; - /// Priority of a request (click to read more about priorities). /// /// This priority value is used to priorize messages @@ -92,11 +88,14 @@ impl SendQueue { #[async_trait] pub(crate) trait SendLoop: Sync { - async fn send_loop( + async fn send_loop<W>( self: Arc<Self>, mut msg_recv: mpsc::UnboundedReceiver<Option<(RequestID, RequestPriority, Vec<u8>)>>, - mut write: BoxStreamWrite<TokioCompat<WriteHalf<TcpStream>>>, - ) -> Result<(), Error> { + mut write: W, + ) -> Result<(), Error> + where + W: WriteExt + Unpin + Send + Sync, + { let mut sending = SendQueue::new(); let mut should_exit = false; while !should_exit || !sending.is_empty() { @@ -167,10 +166,10 @@ pub(crate) trait RecvLoop: Sync + 'static { // Returns true if we should stop receiving after this async fn recv_handler(self: Arc<Self>, id: RequestID, msg: Vec<u8>); - async fn recv_loop( - self: Arc<Self>, - mut read: BoxStreamRead<TokioCompat<ReadHalf<TcpStream>>>, - ) -> Result<(), Error> { + async fn recv_loop<R>(self: Arc<Self>, mut read: R) -> Result<(), Error> + where + R: ReadExt + Unpin + Send + Sync, + { let mut receiving = HashMap::new(); loop { trace!("recv_loop: reading packet"); |