aboutsummaryrefslogtreecommitdiff
path: root/src/client.rs
diff options
context:
space:
mode:
authortrinity-1686a <trinity@deuxfleurs.fr>2022-07-18 15:21:13 +0200
committertrinity-1686a <trinity@deuxfleurs.fr>2022-07-18 15:21:13 +0200
commitcdff8ae1beab44a22d0eb0eb00c624e49971b6ca (patch)
tree1586e6042c03dd34336d4d2f2a79f1479f5e83ec /src/client.rs
parentd3d18b8e8bde5fee81022fd050d5f4c114262fcf (diff)
downloadnetapp-cdff8ae1beab44a22d0eb0eb00c624e49971b6ca.tar.gz
netapp-cdff8ae1beab44a22d0eb0eb00c624e49971b6ca.zip
add detection of premature eos
Diffstat (limited to 'src/client.rs')
-rw-r--r--src/client.rs7
1 files changed, 4 insertions, 3 deletions
diff --git a/src/client.rs b/src/client.rs
index a630f87..6d49f5c 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -7,6 +7,7 @@ use std::sync::{Arc, Mutex};
use arc_swap::ArcSwapOption;
use log::{debug, error, trace};
+use futures::channel::mpsc::{unbounded, UnboundedReceiver};
use tokio::net::TcpStream;
use tokio::select;
use tokio::sync::{mpsc, oneshot, watch};
@@ -41,7 +42,7 @@ pub(crate) struct ClientConn {
ArcSwapOption<mpsc::UnboundedSender<(RequestID, RequestPriority, AssociatedStream)>>,
next_query_number: AtomicU32,
- inflight: Mutex<HashMap<RequestID, oneshot::Sender<AssociatedStream>>>,
+ inflight: Mutex<HashMap<RequestID, oneshot::Sender<UnboundedReceiver<Packet>>>>,
}
impl ClientConn {
@@ -186,7 +187,7 @@ impl ClientConn {
error!(
"Too many inflight requests! RequestID collision. Interrupting previous request."
);
- if old_ch.send(Box::pin(futures::stream::empty())).is_err() {
+ if old_ch.send(unbounded().1).is_err() {
debug!("Could not send empty response to collisionned request, probably because request was interrupted. Dropping response.");
}
}
@@ -232,7 +233,7 @@ impl SendLoop for ClientConn {}
#[async_trait]
impl RecvLoop for ClientConn {
- fn recv_handler(self: &Arc<Self>, id: RequestID, stream: AssociatedStream) {
+ fn recv_handler(self: &Arc<Self>, id: RequestID, stream: UnboundedReceiver<Packet>) {
trace!("ClientConn recv_handler {}", id);
let mut inflight = self.inflight.lock().unwrap();