aboutsummaryrefslogtreecommitdiff
path: root/src/server.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-07-22 12:45:38 +0200
committerAlex Auvolat <alex@adnab.me>2022-07-22 12:45:38 +0200
commit0b71ca12f910c17eaf2291076438dff3b70dc9cd (patch)
tree28c4239938b1bd585052c9a1b8b6a752b9c3bbe0 /src/server.rs
parentc358fe3c92da8a8454e461484737efe2a14dfd73 (diff)
downloadnetapp-0b71ca12f910c17eaf2291076438dff3b70dc9cd.tar.gz
netapp-0b71ca12f910c17eaf2291076438dff3b70dc9cd.zip
Clean up framing protocol
Diffstat (limited to 'src/server.rs')
-rw-r--r--src/server.rs53
1 files changed, 20 insertions, 33 deletions
diff --git a/src/server.rs b/src/server.rs
index 1f1c22a..ae1196c 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -28,6 +28,7 @@ use crate::message::*;
use crate::netapp::*;
use crate::recv::*;
use crate::send::*;
+use crate::stream::*;
use crate::util::*;
// The client and server connection structs (client.rs and server.rs)
@@ -121,17 +122,12 @@ impl ServerConn {
Ok(())
}
- async fn recv_handler_aux(
- self: &Arc<Self>,
- bytes: &[u8],
- stream: ByteStream,
- ) -> Result<(Vec<u8>, Option<ByteStream>), Error> {
- let msg = QueryMessage::decode(bytes)?;
- let path = String::from_utf8(msg.path.to_vec())?;
+ async fn recv_handler_aux(self: &Arc<Self>, req_enc: ReqEnc) -> Result<RespEnc, Error> {
+ let path = String::from_utf8(req_enc.path.to_vec())?;
let handler_opt = {
let endpoints = self.netapp.endpoints.read().unwrap();
- endpoints.get(&path).map(|e| e.clone_endpoint())
+ endpoints.get(&path[..]).map(|e| e.clone_endpoint())
};
if let Some(handler) = handler_opt {
@@ -139,9 +135,9 @@ impl ServerConn {
if #[cfg(feature = "telemetry")] {
let tracer = opentelemetry::global::tracer("netapp");
- let mut span = if let Some(telemetry_id) = msg.telemetry_id {
+ let mut span = if !req_enc.telemetry_id.is_empty() {
let propagator = BinaryPropagator::new();
- let context = propagator.from_bytes(telemetry_id);
+ let context = propagator.from_bytes(req_enc.telemetry_id.to_vec());
let context = Context::new().with_remote_span_context(context);
tracer.span_builder(format!(">> RPC {}", path))
.with_kind(SpanKind::Server)
@@ -156,13 +152,13 @@ impl ServerConn {
.start(&tracer)
};
span.set_attribute(KeyValue::new("path", path.to_string()));
- span.set_attribute(KeyValue::new("len_query", msg.body.len() as i64));
+ span.set_attribute(KeyValue::new("len_query_msg", req_enc.msg.len() as i64));
- handler.handle(msg.body, stream, self.peer_id)
+ handler.handle(req_enc, self.peer_id)
.with_context(Context::current_with_span(span))
.await
} else {
- handler.handle(msg.body, stream, self.peer_id).await
+ handler.handle(req_enc, self.peer_id).await
}
}
} else {
@@ -181,32 +177,23 @@ impl RecvLoop for ServerConn {
let self2 = self.clone();
tokio::spawn(async move {
trace!("ServerConn recv_handler {}", id);
- let (bytes, stream) = Framing::from_stream(stream).await?.into_parts();
-
- let prio = if !bytes.is_empty() { bytes[0] } else { 0u8 };
- let resp = self2.recv_handler_aux(&bytes[..], stream).await;
-
- let (resp_bytes, resp_stream) = match resp {
- Ok((rb, rs)) => {
- let mut resp_bytes = vec![0u8];
- resp_bytes.extend(rb);
- (resp_bytes, rs)
- }
- Err(e) => {
- let mut resp_bytes = vec![e.code()];
- resp_bytes.extend(e.to_string().into_bytes());
- (resp_bytes, None)
+ let (prio, resp_enc) = match ReqEnc::decode(Box::pin(stream)).await {
+ Ok(req_enc) => {
+ let prio = req_enc.prio;
+ let resp = self2.recv_handler_aux(req_enc).await;
+
+ (prio, match resp {
+ Ok(resp_enc) => resp_enc,
+ Err(e) => RespEnc::from_err(e),
+ })
}
+ Err(e) => (PRIO_NORMAL, RespEnc::from_err(e)),
};
trace!("ServerConn sending response to {}: ", id);
resp_send
- .send((
- id,
- prio,
- Framing::new(resp_bytes, resp_stream).into_stream(),
- ))
+ .send((id, prio, resp_enc.encode()))
.log_err("ServerConn recv_handler send resp bytes");
Ok::<_, Error>(())
});