diff options
author | Alex Auvolat <alex@adnab.me> | 2022-07-22 12:45:38 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-07-22 12:45:38 +0200 |
commit | 0b71ca12f910c17eaf2291076438dff3b70dc9cd (patch) | |
tree | 28c4239938b1bd585052c9a1b8b6a752b9c3bbe0 /src/client.rs | |
parent | c358fe3c92da8a8454e461484737efe2a14dfd73 (diff) | |
download | netapp-0b71ca12f910c17eaf2291076438dff3b70dc9cd.tar.gz netapp-0b71ca12f910c17eaf2291076438dff3b70dc9cd.zip |
Clean up framing protocol
Diffstat (limited to 'src/client.rs')
-rw-r--r-- | src/client.rs | 58 |
1 files changed, 18 insertions, 40 deletions
diff --git a/src/client.rs b/src/client.rs index c878627..42eeaa3 100644 --- a/src/client.rs +++ b/src/client.rs @@ -5,6 +5,7 @@ use std::sync::{Arc, Mutex}; use arc_swap::ArcSwapOption; use async_trait::async_trait; +use bytes::Bytes; use log::{debug, error, trace}; use futures::channel::mpsc::{unbounded, UnboundedReceiver}; @@ -28,6 +29,7 @@ use crate::message::*; use crate::netapp::*; use crate::recv::*; use crate::send::*; +use crate::stream::*; use crate::util::*; pub(crate) struct ClientConn { @@ -155,24 +157,16 @@ impl ClientConn { .with_kind(SpanKind::Client) .start(&tracer); let propagator = BinaryPropagator::new(); - let telemetry_id = Some(propagator.to_bytes(span.span_context()).to_vec()); + let telemetry_id: Bytes = propagator.to_bytes(span.span_context()).to_vec().into(); } else { - let telemetry_id: Option<Vec<u8>> = None; + let telemetry_id: Bytes = Bytes::new(); } }; // Encode request - let body = req.msg_ser.unwrap().clone(); - let stream = req.body.into_stream(); - - let request = QueryMessage { - prio, - path: path.as_bytes(), - telemetry_id, - body: &body[..], - }; - let bytes = request.encode(); - drop(body); + let req_enc = req.into_enc(prio, path.as_bytes().to_vec().into(), telemetry_id); + let req_msg_len = req_enc.msg.len(); + let req_stream = req_enc.encode(); // Send request through let (resp_send, resp_recv) = oneshot::channel(); @@ -181,17 +175,19 @@ impl ClientConn { error!( "Too many inflight requests! RequestID collision. Interrupting previous request." ); - if old_ch.send(unbounded().1).is_err() { - debug!("Could not send empty response to collisionned request, probably because request was interrupted. Dropping response."); - } + let _ = old_ch.send(unbounded().1); } - trace!("request: query_send {}, {} bytes", id, bytes.len()); + trace!( + "request: query_send {} (serialized message: {} bytes)", + id, + req_msg_len + ); #[cfg(feature = "telemetry")] - span.set_attribute(KeyValue::new("len_query", bytes.len() as i64)); + span.set_attribute(KeyValue::new("len_query_msg", req_msg_len as i64)); - query_send.send((id, prio, Framing::new(bytes, stream).into_stream()))?; + query_send.send((id, prio, req_stream))?; cfg_if::cfg_if! { if #[cfg(feature = "telemetry")] { @@ -202,28 +198,10 @@ impl ClientConn { let stream = resp_recv.await?; } } - let (resp, stream) = Framing::from_stream(stream).await?.into_parts(); - if resp.is_empty() { - return Err(Error::Message( - "Response is 0 bytes, either a collision or a protocol error".into(), - )); - } - - trace!("request response {}: ", id); - - let code = resp[0]; - if code == 0 { - let ser_resp = rmp_serde::decode::from_read_ref(&resp[1..])?; - Ok(Resp { - _phantom: Default::default(), - msg: ser_resp, - body: BodyData::Stream(stream), - }) - } else { - let msg = String::from_utf8(resp[1..].to_vec()).unwrap_or_default(); - Err(Error::Remote(code, msg)) - } + let resp_enc = RespEnc::decode(Box::pin(stream)).await?; + trace!("request response {}", id); + Resp::from_enc(resp_enc) } } |