diff options
author | trinity-1686a <trinity@deuxfleurs.fr> | 2022-07-18 15:21:13 +0200 |
---|---|---|
committer | trinity-1686a <trinity@deuxfleurs.fr> | 2022-07-18 15:21:13 +0200 |
commit | cdff8ae1beab44a22d0eb0eb00c624e49971b6ca (patch) | |
tree | 1586e6042c03dd34336d4d2f2a79f1479f5e83ec /src/client.rs | |
parent | d3d18b8e8bde5fee81022fd050d5f4c114262fcf (diff) | |
download | netapp-cdff8ae1beab44a22d0eb0eb00c624e49971b6ca.tar.gz netapp-cdff8ae1beab44a22d0eb0eb00c624e49971b6ca.zip |
add detection of premature eos
Diffstat (limited to 'src/client.rs')
-rw-r--r-- | src/client.rs | 7 |
1 files changed, 4 insertions, 3 deletions
diff --git a/src/client.rs b/src/client.rs index a630f87..6d49f5c 100644 --- a/src/client.rs +++ b/src/client.rs @@ -7,6 +7,7 @@ use std::sync::{Arc, Mutex}; use arc_swap::ArcSwapOption; use log::{debug, error, trace}; +use futures::channel::mpsc::{unbounded, UnboundedReceiver}; use tokio::net::TcpStream; use tokio::select; use tokio::sync::{mpsc, oneshot, watch}; @@ -41,7 +42,7 @@ pub(crate) struct ClientConn { ArcSwapOption<mpsc::UnboundedSender<(RequestID, RequestPriority, AssociatedStream)>>, next_query_number: AtomicU32, - inflight: Mutex<HashMap<RequestID, oneshot::Sender<AssociatedStream>>>, + inflight: Mutex<HashMap<RequestID, oneshot::Sender<UnboundedReceiver<Packet>>>>, } impl ClientConn { @@ -186,7 +187,7 @@ impl ClientConn { error!( "Too many inflight requests! RequestID collision. Interrupting previous request." ); - if old_ch.send(Box::pin(futures::stream::empty())).is_err() { + if old_ch.send(unbounded().1).is_err() { debug!("Could not send empty response to collisionned request, probably because request was interrupted. Dropping response."); } } @@ -232,7 +233,7 @@ impl SendLoop for ClientConn {} #[async_trait] impl RecvLoop for ClientConn { - fn recv_handler(self: &Arc<Self>, id: RequestID, stream: AssociatedStream) { + fn recv_handler(self: &Arc<Self>, id: RequestID, stream: UnboundedReceiver<Packet>) { trace!("ClientConn recv_handler {}", id); let mut inflight = self.inflight.lock().unwrap(); |