aboutsummaryrefslogtreecommitdiff
path: root/src/client.rs
diff options
context:
space:
mode:
authortrinity-1686a <trinity@deuxfleurs.fr>2022-06-20 23:40:31 +0200
committertrinity-1686a <trinity@deuxfleurs.fr>2022-06-20 23:40:31 +0200
commitd3d18b8e8bde5fee81022fd050d5f4c114262fcf (patch)
tree1ac73cb61b0e5298c36f913c303537561269498e /src/client.rs
parent0fec85b47a1bc679d2684994bfae6ef0fe7d4911 (diff)
downloadnetapp-d3d18b8e8bde5fee81022fd050d5f4c114262fcf.tar.gz
netapp-d3d18b8e8bde5fee81022fd050d5f4c114262fcf.zip
use a framing protocol instead of even/odd channel
Diffstat (limited to 'src/client.rs')
-rw-r--r--src/client.rs32
1 files changed, 12 insertions, 20 deletions
diff --git a/src/client.rs b/src/client.rs
index bc16fb1..a630f87 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -37,10 +37,11 @@ pub(crate) struct ClientConn {
pub(crate) remote_addr: SocketAddr,
pub(crate) peer_id: NodeID,
- query_send: ArcSwapOption<mpsc::UnboundedSender<(RequestID, RequestPriority, Data)>>,
+ query_send:
+ ArcSwapOption<mpsc::UnboundedSender<(RequestID, RequestPriority, AssociatedStream)>>,
next_query_number: AtomicU32,
- inflight: Mutex<HashMap<RequestID, oneshot::Sender<(Vec<u8>, AssociatedStream)>>>,
+ inflight: Mutex<HashMap<RequestID, oneshot::Sender<AssociatedStream>>>,
}
impl ClientConn {
@@ -148,11 +149,9 @@ impl ClientConn {
{
let query_send = self.query_send.load_full().ok_or(Error::ConnectionClosed)?;
- // increment by 2; even are direct data; odd are associated stream
let id = self
.next_query_number
- .fetch_add(2, atomic::Ordering::Relaxed);
- let stream_id = id + 1;
+ .fetch_add(1, atomic::Ordering::Relaxed);
cfg_if::cfg_if! {
if #[cfg(feature = "telemetry")] {
@@ -187,10 +186,7 @@ impl ClientConn {
error!(
"Too many inflight requests! RequestID collision. Interrupting previous request."
);
- if old_ch
- .send((vec![], Box::pin(futures::stream::empty())))
- .is_err()
- {
+ if old_ch.send(Box::pin(futures::stream::empty())).is_err() {
debug!("Could not send empty response to collisionned request, probably because request was interrupted. Dropping response.");
}
}
@@ -200,22 +196,18 @@ impl ClientConn {
#[cfg(feature = "telemetry")]
span.set_attribute(KeyValue::new("len_query", bytes.len() as i64));
- query_send.send((id, prio, Data::Full(bytes)))?;
- if let Some(stream) = stream {
- query_send.send((stream_id, prio | PRIO_SECONDARY, Data::Streaming(stream)))?;
- } else {
- query_send.send((stream_id, prio, Data::Full(Vec::new())))?;
- }
+ query_send.send((id, prio, Framing::new(bytes, stream).into_stream()))?;
cfg_if::cfg_if! {
if #[cfg(feature = "telemetry")] {
- let (resp, stream) = resp_recv
+ let stream = resp_recv
.with_context(Context::current_with_span(span))
.await?;
} else {
- let (resp, stream) = resp_recv.await?;
+ let stream = resp_recv.await?;
}
}
+ let (resp, stream) = Framing::from_stream(stream).await?.into_parts();
if resp.is_empty() {
return Err(Error::Message(
@@ -240,12 +232,12 @@ impl SendLoop for ClientConn {}
#[async_trait]
impl RecvLoop for ClientConn {
- fn recv_handler(self: &Arc<Self>, id: RequestID, msg: Vec<u8>, stream: AssociatedStream) {
- trace!("ClientConn recv_handler {} ({} bytes)", id, msg.len());
+ fn recv_handler(self: &Arc<Self>, id: RequestID, stream: AssociatedStream) {
+ trace!("ClientConn recv_handler {}", id);
let mut inflight = self.inflight.lock().unwrap();
if let Some(ch) = inflight.remove(&id) {
- if ch.send((msg, stream)).is_err() {
+ if ch.send(stream).is_err() {
debug!("Could not send request response, probably because request was interrupted. Dropping response.");
}
}