From 0b71ca12f910c17eaf2291076438dff3b70dc9cd Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 22 Jul 2022 12:45:38 +0200 Subject: Clean up framing protocol --- src/client.rs | 58 ++++++++++++++++++---------------------------------------- 1 file changed, 18 insertions(+), 40 deletions(-) (limited to 'src/client.rs') diff --git a/src/client.rs b/src/client.rs index c878627..42eeaa3 100644 --- a/src/client.rs +++ b/src/client.rs @@ -5,6 +5,7 @@ use std::sync::{Arc, Mutex}; use arc_swap::ArcSwapOption; use async_trait::async_trait; +use bytes::Bytes; use log::{debug, error, trace}; use futures::channel::mpsc::{unbounded, UnboundedReceiver}; @@ -28,6 +29,7 @@ use crate::message::*; use crate::netapp::*; use crate::recv::*; use crate::send::*; +use crate::stream::*; use crate::util::*; pub(crate) struct ClientConn { @@ -155,24 +157,16 @@ impl ClientConn { .with_kind(SpanKind::Client) .start(&tracer); let propagator = BinaryPropagator::new(); - let telemetry_id = Some(propagator.to_bytes(span.span_context()).to_vec()); + let telemetry_id: Bytes = propagator.to_bytes(span.span_context()).to_vec().into(); } else { - let telemetry_id: Option> = None; + let telemetry_id: Bytes = Bytes::new(); } }; // Encode request - let body = req.msg_ser.unwrap().clone(); - let stream = req.body.into_stream(); - - let request = QueryMessage { - prio, - path: path.as_bytes(), - telemetry_id, - body: &body[..], - }; - let bytes = request.encode(); - drop(body); + let req_enc = req.into_enc(prio, path.as_bytes().to_vec().into(), telemetry_id); + let req_msg_len = req_enc.msg.len(); + let req_stream = req_enc.encode(); // Send request through let (resp_send, resp_recv) = oneshot::channel(); @@ -181,17 +175,19 @@ impl ClientConn { error!( "Too many inflight requests! RequestID collision. Interrupting previous request." ); - if old_ch.send(unbounded().1).is_err() { - debug!("Could not send empty response to collisionned request, probably because request was interrupted. Dropping response."); - } + let _ = old_ch.send(unbounded().1); } - trace!("request: query_send {}, {} bytes", id, bytes.len()); + trace!( + "request: query_send {} (serialized message: {} bytes)", + id, + req_msg_len + ); #[cfg(feature = "telemetry")] - span.set_attribute(KeyValue::new("len_query", bytes.len() as i64)); + span.set_attribute(KeyValue::new("len_query_msg", req_msg_len as i64)); - query_send.send((id, prio, Framing::new(bytes, stream).into_stream()))?; + query_send.send((id, prio, req_stream))?; cfg_if::cfg_if! { if #[cfg(feature = "telemetry")] { @@ -202,28 +198,10 @@ impl ClientConn { let stream = resp_recv.await?; } } - let (resp, stream) = Framing::from_stream(stream).await?.into_parts(); - if resp.is_empty() { - return Err(Error::Message( - "Response is 0 bytes, either a collision or a protocol error".into(), - )); - } - - trace!("request response {}: ", id); - - let code = resp[0]; - if code == 0 { - let ser_resp = rmp_serde::decode::from_read_ref(&resp[1..])?; - Ok(Resp { - _phantom: Default::default(), - msg: ser_resp, - body: BodyData::Stream(stream), - }) - } else { - let msg = String::from_utf8(resp[1..].to_vec()).unwrap_or_default(); - Err(Error::Remote(code, msg)) - } + let resp_enc = RespEnc::decode(Box::pin(stream)).await?; + trace!("request response {}", id); + Resp::from_enc(resp_enc) } } -- cgit v1.2.3