aboutsummaryrefslogtreecommitdiff
path: root/src/client.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/client.rs')
-rw-r--r--src/client.rs58
1 files changed, 18 insertions, 40 deletions
diff --git a/src/client.rs b/src/client.rs
index c878627..42eeaa3 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -5,6 +5,7 @@ use std::sync::{Arc, Mutex};
use arc_swap::ArcSwapOption;
use async_trait::async_trait;
+use bytes::Bytes;
use log::{debug, error, trace};
use futures::channel::mpsc::{unbounded, UnboundedReceiver};
@@ -28,6 +29,7 @@ use crate::message::*;
use crate::netapp::*;
use crate::recv::*;
use crate::send::*;
+use crate::stream::*;
use crate::util::*;
pub(crate) struct ClientConn {
@@ -155,24 +157,16 @@ impl ClientConn {
.with_kind(SpanKind::Client)
.start(&tracer);
let propagator = BinaryPropagator::new();
- let telemetry_id = Some(propagator.to_bytes(span.span_context()).to_vec());
+ let telemetry_id: Bytes = propagator.to_bytes(span.span_context()).to_vec().into();
} else {
- let telemetry_id: Option<Vec<u8>> = None;
+ let telemetry_id: Bytes = Bytes::new();
}
};
// Encode request
- let body = req.msg_ser.unwrap().clone();
- let stream = req.body.into_stream();
-
- let request = QueryMessage {
- prio,
- path: path.as_bytes(),
- telemetry_id,
- body: &body[..],
- };
- let bytes = request.encode();
- drop(body);
+ let req_enc = req.into_enc(prio, path.as_bytes().to_vec().into(), telemetry_id);
+ let req_msg_len = req_enc.msg.len();
+ let req_stream = req_enc.encode();
// Send request through
let (resp_send, resp_recv) = oneshot::channel();
@@ -181,17 +175,19 @@ impl ClientConn {
error!(
"Too many inflight requests! RequestID collision. Interrupting previous request."
);
- if old_ch.send(unbounded().1).is_err() {
- debug!("Could not send empty response to collisionned request, probably because request was interrupted. Dropping response.");
- }
+ let _ = old_ch.send(unbounded().1);
}
- trace!("request: query_send {}, {} bytes", id, bytes.len());
+ trace!(
+ "request: query_send {} (serialized message: {} bytes)",
+ id,
+ req_msg_len
+ );
#[cfg(feature = "telemetry")]
- span.set_attribute(KeyValue::new("len_query", bytes.len() as i64));
+ span.set_attribute(KeyValue::new("len_query_msg", req_msg_len as i64));
- query_send.send((id, prio, Framing::new(bytes, stream).into_stream()))?;
+ query_send.send((id, prio, req_stream))?;
cfg_if::cfg_if! {
if #[cfg(feature = "telemetry")] {
@@ -202,28 +198,10 @@ impl ClientConn {
let stream = resp_recv.await?;
}
}
- let (resp, stream) = Framing::from_stream(stream).await?.into_parts();
- if resp.is_empty() {
- return Err(Error::Message(
- "Response is 0 bytes, either a collision or a protocol error".into(),
- ));
- }
-
- trace!("request response {}: ", id);
-
- let code = resp[0];
- if code == 0 {
- let ser_resp = rmp_serde::decode::from_read_ref(&resp[1..])?;
- Ok(Resp {
- _phantom: Default::default(),
- msg: ser_resp,
- body: BodyData::Stream(stream),
- })
- } else {
- let msg = String::from_utf8(resp[1..].to_vec()).unwrap_or_default();
- Err(Error::Remote(code, msg))
- }
+ let resp_enc = RespEnc::decode(Box::pin(stream)).await?;
+ trace!("request response {}", id);
+ Resp::from_enc(resp_enc)
}
}