aboutsummaryrefslogtreecommitdiff
path: root/src/server.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/server.rs')
-rw-r--r--src/server.rs77
1 files changed, 32 insertions, 45 deletions
diff --git a/src/server.rs b/src/server.rs
index a835959..2c12d9d 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -2,8 +2,15 @@ use std::net::SocketAddr;
use std::sync::Arc;
use arc_swap::ArcSwapOption;
-use bytes::Bytes;
-use log::{debug, trace};
+use async_trait::async_trait;
+use log::*;
+
+use futures::io::{AsyncReadExt, AsyncWriteExt};
+use kuska_handshake::async_std::{handshake_server, BoxStream};
+use tokio::net::TcpStream;
+use tokio::select;
+use tokio::sync::{mpsc, watch};
+use tokio_util::compat::*;
#[cfg(feature = "telemetry")]
use opentelemetry::{
@@ -15,21 +22,12 @@ use opentelemetry_contrib::trace::propagator::binary::*;
#[cfg(feature = "telemetry")]
use rand::{thread_rng, Rng};
-use tokio::net::TcpStream;
-use tokio::select;
-use tokio::sync::{mpsc, watch};
-use tokio_util::compat::*;
-
-use futures::io::{AsyncReadExt, AsyncWriteExt};
-
-use async_trait::async_trait;
-
-use kuska_handshake::async_std::{handshake_server, BoxStream};
-
use crate::error::*;
+use crate::message::*;
use crate::netapp::*;
-use crate::proto::*;
-use crate::proto2::*;
+use crate::recv::*;
+use crate::send::*;
+use crate::stream::*;
use crate::util::*;
// The client and server connection structs (client.rs and server.rs)
@@ -55,7 +53,7 @@ pub(crate) struct ServerConn {
netapp: Arc<NetApp>,
- resp_send: ArcSwapOption<mpsc::UnboundedSender<(RequestID, RequestPriority, Vec<u8>)>>,
+ resp_send: ArcSwapOption<mpsc::UnboundedSender<SendStream>>,
}
impl ServerConn {
@@ -126,13 +124,12 @@ impl ServerConn {
Ok(())
}
- async fn recv_handler_aux(self: &Arc<Self>, bytes: &[u8]) -> Result<Vec<u8>, Error> {
- let msg = QueryMessage::decode(bytes)?;
- let path = String::from_utf8(msg.path.to_vec())?;
+ async fn recv_handler_aux(self: &Arc<Self>, req_enc: ReqEnc) -> Result<RespEnc, Error> {
+ let path = String::from_utf8(req_enc.path.to_vec())?;
let handler_opt = {
let endpoints = self.netapp.endpoints.read().unwrap();
- endpoints.get(&path).map(|e| e.clone_endpoint())
+ endpoints.get(&path[..]).map(|e| e.clone_endpoint())
};
if let Some(handler) = handler_opt {
@@ -140,9 +137,9 @@ impl ServerConn {
if #[cfg(feature = "telemetry")] {
let tracer = opentelemetry::global::tracer("netapp");
- let mut span = if let Some(telemetry_id) = msg.telemetry_id {
+ let mut span = if !req_enc.telemetry_id.is_empty() {
let propagator = BinaryPropagator::new();
- let context = propagator.from_bytes(telemetry_id);
+ let context = propagator.from_bytes(req_enc.telemetry_id.to_vec());
let context = Context::new().with_remote_span_context(context);
tracer.span_builder(format!(">> RPC {}", path))
.with_kind(SpanKind::Server)
@@ -157,13 +154,13 @@ impl ServerConn {
.start(&tracer)
};
span.set_attribute(KeyValue::new("path", path.to_string()));
- span.set_attribute(KeyValue::new("len_query", msg.body.len() as i64));
+ span.set_attribute(KeyValue::new("len_query_msg", req_enc.msg.len() as i64));
- handler.handle(msg.body, self.peer_id)
+ handler.handle(req_enc, self.peer_id)
.with_context(Context::current_with_span(span))
.await
} else {
- handler.handle(msg.body, self.peer_id).await
+ handler.handle(req_enc, self.peer_id).await
}
}
} else {
@@ -176,35 +173,25 @@ impl SendLoop for ServerConn {}
#[async_trait]
impl RecvLoop for ServerConn {
- fn recv_handler(self: &Arc<Self>, id: RequestID, bytes: Vec<u8>) {
+ fn recv_handler(self: &Arc<Self>, id: RequestID, stream: ByteStream) {
let resp_send = self.resp_send.load_full().unwrap();
let self2 = self.clone();
tokio::spawn(async move {
- trace!("ServerConn recv_handler {} ({} bytes)", id, bytes.len());
- let bytes: Bytes = bytes.into();
-
- let prio = if !bytes.is_empty() { bytes[0] } else { 0u8 };
- let resp = self2.recv_handler_aux(&bytes[..]).await;
+ debug!("server: recv_handler got {}", id);
- let resp_bytes = match resp {
- Ok(rb) => {
- let mut resp_bytes = vec![0u8];
- resp_bytes.extend(rb);
- resp_bytes
- }
- Err(e) => {
- let mut resp_bytes = vec![e.code()];
- resp_bytes.extend(e.to_string().into_bytes());
- resp_bytes
- }
+ let (prio, resp_enc_result) = match ReqEnc::decode(stream).await {
+ Ok(req_enc) => (req_enc.prio, self2.recv_handler_aux(req_enc).await),
+ Err(e) => (PRIO_HIGH, Err(e)),
};
- trace!("ServerConn sending response to {}: ", id);
+ debug!("server: sending response to {}", id);
+ let (resp_stream, resp_order) = RespEnc::encode(resp_enc_result);
resp_send
- .send((id, prio, resp_bytes))
- .log_err("ServerConn recv_handler send resp");
+ .send((id, prio, resp_order, resp_stream))
+ .log_err("ServerConn recv_handler send resp bytes");
+ Ok::<_, Error>(())
});
}
}