aboutsummaryrefslogtreecommitdiff
path: root/src/proto.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2021-10-25 09:27:57 +0200
committerAlex Auvolat <alex@adnab.me>2021-10-25 09:27:57 +0200
commitbb4ddf3b61cf0ef1027c11e51c8aa51be9eb2142 (patch)
tree7a0e781e341738bba782fc0eae0fe0748f53fa51 /src/proto.rs
parent9b64c27da68f7ac9049e02e26da918e871a63f07 (diff)
downloadnetapp-bb4ddf3b61cf0ef1027c11e51c8aa51be9eb2142.tar.gz
netapp-bb4ddf3b61cf0ef1027c11e51c8aa51be9eb2142.zip
Better handle connection closing
Diffstat (limited to 'src/proto.rs')
-rw-r--r--src/proto.rs11
1 files changed, 9 insertions, 2 deletions
diff --git a/src/proto.rs b/src/proto.rs
index bf82e47..47480a9 100644
--- a/src/proto.rs
+++ b/src/proto.rs
@@ -4,6 +4,7 @@ use std::sync::Arc;
use log::trace;
use futures::{AsyncReadExt, AsyncWriteExt};
+use kuska_handshake::async_std::BoxStreamWrite;
use tokio::sync::mpsc;
@@ -100,7 +101,7 @@ pub(crate) trait SendLoop: Sync {
async fn send_loop<W>(
self: Arc<Self>,
mut msg_recv: mpsc::UnboundedReceiver<(RequestID, RequestPriority, Vec<u8>)>,
- mut write: W,
+ mut write: BoxStreamWrite<W>,
) -> Result<(), Error>
where
W: AsyncWriteExt + Unpin + Send + Sync,
@@ -160,6 +161,7 @@ pub(crate) trait SendLoop: Sync {
}
}
}
+ write.goodbye().await?;
Ok(())
}
}
@@ -177,7 +179,11 @@ pub(crate) trait RecvLoop: Sync + 'static {
loop {
trace!("recv_loop: reading packet");
let mut header_id = [0u8; RequestID::BITS as usize / 8];
- read.read_exact(&mut header_id[..]).await?;
+ match read.read_exact(&mut header_id[..]).await {
+ Ok(_) => (),
+ Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
+ Err(e) => return Err(e.into()),
+ };
let id = RequestID::from_be_bytes(header_id);
trace!("recv_loop: got header id: {:04x}", id);
@@ -202,6 +208,7 @@ pub(crate) trait RecvLoop: Sync + 'static {
self.recv_handler(id, msg_bytes);
}
}
+ Ok(())
}
}