diff options
Diffstat (limited to 'src/client.rs')
-rw-r--r-- | src/client.rs | 32 |
1 files changed, 12 insertions, 20 deletions
diff --git a/src/client.rs b/src/client.rs index bc16fb1..a630f87 100644 --- a/src/client.rs +++ b/src/client.rs @@ -37,10 +37,11 @@ pub(crate) struct ClientConn { pub(crate) remote_addr: SocketAddr, pub(crate) peer_id: NodeID, - query_send: ArcSwapOption<mpsc::UnboundedSender<(RequestID, RequestPriority, Data)>>, + query_send: + ArcSwapOption<mpsc::UnboundedSender<(RequestID, RequestPriority, AssociatedStream)>>, next_query_number: AtomicU32, - inflight: Mutex<HashMap<RequestID, oneshot::Sender<(Vec<u8>, AssociatedStream)>>>, + inflight: Mutex<HashMap<RequestID, oneshot::Sender<AssociatedStream>>>, } impl ClientConn { @@ -148,11 +149,9 @@ impl ClientConn { { let query_send = self.query_send.load_full().ok_or(Error::ConnectionClosed)?; - // increment by 2; even are direct data; odd are associated stream let id = self .next_query_number - .fetch_add(2, atomic::Ordering::Relaxed); - let stream_id = id + 1; + .fetch_add(1, atomic::Ordering::Relaxed); cfg_if::cfg_if! { if #[cfg(feature = "telemetry")] { @@ -187,10 +186,7 @@ impl ClientConn { error!( "Too many inflight requests! RequestID collision. Interrupting previous request." ); - if old_ch - .send((vec![], Box::pin(futures::stream::empty()))) - .is_err() - { + if old_ch.send(Box::pin(futures::stream::empty())).is_err() { debug!("Could not send empty response to collisionned request, probably because request was interrupted. Dropping response."); } } @@ -200,22 +196,18 @@ impl ClientConn { #[cfg(feature = "telemetry")] span.set_attribute(KeyValue::new("len_query", bytes.len() as i64)); - query_send.send((id, prio, Data::Full(bytes)))?; - if let Some(stream) = stream { - query_send.send((stream_id, prio | PRIO_SECONDARY, Data::Streaming(stream)))?; - } else { - query_send.send((stream_id, prio, Data::Full(Vec::new())))?; - } + query_send.send((id, prio, Framing::new(bytes, stream).into_stream()))?; cfg_if::cfg_if! { if #[cfg(feature = "telemetry")] { - let (resp, stream) = resp_recv + let stream = resp_recv .with_context(Context::current_with_span(span)) .await?; } else { - let (resp, stream) = resp_recv.await?; + let stream = resp_recv.await?; } } + let (resp, stream) = Framing::from_stream(stream).await?.into_parts(); if resp.is_empty() { return Err(Error::Message( @@ -240,12 +232,12 @@ impl SendLoop for ClientConn {} #[async_trait] impl RecvLoop for ClientConn { - fn recv_handler(self: &Arc<Self>, id: RequestID, msg: Vec<u8>, stream: AssociatedStream) { - trace!("ClientConn recv_handler {} ({} bytes)", id, msg.len()); + fn recv_handler(self: &Arc<Self>, id: RequestID, stream: AssociatedStream) { + trace!("ClientConn recv_handler {}", id); let mut inflight = self.inflight.lock().unwrap(); if let Some(ch) = inflight.remove(&id) { - if ch.send((msg, stream)).is_err() { + if ch.send(stream).is_err() { debug!("Could not send request response, probably because request was interrupted. Dropping response."); } } |