diff options
Diffstat (limited to 'src/server.rs')
-rw-r--r-- | src/server.rs | 53 |
1 files changed, 20 insertions, 33 deletions
diff --git a/src/server.rs b/src/server.rs index 1f1c22a..ae1196c 100644 --- a/src/server.rs +++ b/src/server.rs @@ -28,6 +28,7 @@ use crate::message::*; use crate::netapp::*; use crate::recv::*; use crate::send::*; +use crate::stream::*; use crate::util::*; // The client and server connection structs (client.rs and server.rs) @@ -121,17 +122,12 @@ impl ServerConn { Ok(()) } - async fn recv_handler_aux( - self: &Arc<Self>, - bytes: &[u8], - stream: ByteStream, - ) -> Result<(Vec<u8>, Option<ByteStream>), Error> { - let msg = QueryMessage::decode(bytes)?; - let path = String::from_utf8(msg.path.to_vec())?; + async fn recv_handler_aux(self: &Arc<Self>, req_enc: ReqEnc) -> Result<RespEnc, Error> { + let path = String::from_utf8(req_enc.path.to_vec())?; let handler_opt = { let endpoints = self.netapp.endpoints.read().unwrap(); - endpoints.get(&path).map(|e| e.clone_endpoint()) + endpoints.get(&path[..]).map(|e| e.clone_endpoint()) }; if let Some(handler) = handler_opt { @@ -139,9 +135,9 @@ impl ServerConn { if #[cfg(feature = "telemetry")] { let tracer = opentelemetry::global::tracer("netapp"); - let mut span = if let Some(telemetry_id) = msg.telemetry_id { + let mut span = if !req_enc.telemetry_id.is_empty() { let propagator = BinaryPropagator::new(); - let context = propagator.from_bytes(telemetry_id); + let context = propagator.from_bytes(req_enc.telemetry_id.to_vec()); let context = Context::new().with_remote_span_context(context); tracer.span_builder(format!(">> RPC {}", path)) .with_kind(SpanKind::Server) @@ -156,13 +152,13 @@ impl ServerConn { .start(&tracer) }; span.set_attribute(KeyValue::new("path", path.to_string())); - span.set_attribute(KeyValue::new("len_query", msg.body.len() as i64)); + span.set_attribute(KeyValue::new("len_query_msg", req_enc.msg.len() as i64)); - handler.handle(msg.body, stream, self.peer_id) + handler.handle(req_enc, self.peer_id) .with_context(Context::current_with_span(span)) .await } else { - handler.handle(msg.body, stream, self.peer_id).await + handler.handle(req_enc, self.peer_id).await } } } else { @@ -181,32 +177,23 @@ impl RecvLoop for ServerConn { let self2 = self.clone(); tokio::spawn(async move { trace!("ServerConn recv_handler {}", id); - let (bytes, stream) = Framing::from_stream(stream).await?.into_parts(); - - let prio = if !bytes.is_empty() { bytes[0] } else { 0u8 }; - let resp = self2.recv_handler_aux(&bytes[..], stream).await; - - let (resp_bytes, resp_stream) = match resp { - Ok((rb, rs)) => { - let mut resp_bytes = vec![0u8]; - resp_bytes.extend(rb); - (resp_bytes, rs) - } - Err(e) => { - let mut resp_bytes = vec![e.code()]; - resp_bytes.extend(e.to_string().into_bytes()); - (resp_bytes, None) + let (prio, resp_enc) = match ReqEnc::decode(Box::pin(stream)).await { + Ok(req_enc) => { + let prio = req_enc.prio; + let resp = self2.recv_handler_aux(req_enc).await; + + (prio, match resp { + Ok(resp_enc) => resp_enc, + Err(e) => RespEnc::from_err(e), + }) } + Err(e) => (PRIO_NORMAL, RespEnc::from_err(e)), }; trace!("ServerConn sending response to {}: ", id); resp_send - .send(( - id, - prio, - Framing::new(resp_bytes, resp_stream).into_stream(), - )) + .send((id, prio, resp_enc.encode())) .log_err("ServerConn recv_handler send resp bytes"); Ok::<_, Error>(()) }); |