aboutsummaryrefslogtreecommitdiff
path: root/src/server.rs
diff options
context:
space:
mode:
authortrinity-1686a <trinity@deuxfleurs.fr>2022-06-20 23:40:31 +0200
committertrinity-1686a <trinity@deuxfleurs.fr>2022-06-20 23:40:31 +0200
commitd3d18b8e8bde5fee81022fd050d5f4c114262fcf (patch)
tree1ac73cb61b0e5298c36f913c303537561269498e /src/server.rs
parent0fec85b47a1bc679d2684994bfae6ef0fe7d4911 (diff)
downloadnetapp-d3d18b8e8bde5fee81022fd050d5f4c114262fcf.tar.gz
netapp-d3d18b8e8bde5fee81022fd050d5f4c114262fcf.zip
use a framing protocol instead of even/odd channel
Diffstat (limited to 'src/server.rs')
-rw-r--r--src/server.rs26
1 files changed, 10 insertions, 16 deletions
diff --git a/src/server.rs b/src/server.rs
index 6cd4056..86e5156 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -2,7 +2,6 @@ use std::net::SocketAddr;
use std::sync::Arc;
use arc_swap::ArcSwapOption;
-use bytes::Bytes;
use log::{debug, trace};
#[cfg(feature = "telemetry")]
@@ -55,7 +54,7 @@ pub(crate) struct ServerConn {
netapp: Arc<NetApp>,
- resp_send: ArcSwapOption<mpsc::UnboundedSender<(RequestID, RequestPriority, Data)>>,
+ resp_send: ArcSwapOption<mpsc::UnboundedSender<(RequestID, RequestPriority, AssociatedStream)>>,
}
impl ServerConn {
@@ -177,13 +176,13 @@ impl SendLoop for ServerConn {}
#[async_trait]
impl RecvLoop for ServerConn {
- fn recv_handler(self: &Arc<Self>, id: RequestID, bytes: Vec<u8>, stream: AssociatedStream) {
+ fn recv_handler(self: &Arc<Self>, id: RequestID, stream: AssociatedStream) {
let resp_send = self.resp_send.load_full().unwrap();
let self2 = self.clone();
tokio::spawn(async move {
- trace!("ServerConn recv_handler {} ({} bytes)", id, bytes.len());
- let bytes: Bytes = bytes.into();
+ trace!("ServerConn recv_handler {}", id);
+ let (bytes, stream) = Framing::from_stream(stream).await?.into_parts();
let prio = if !bytes.is_empty() { bytes[0] } else { 0u8 };
let resp = self2.recv_handler_aux(&bytes[..], stream).await;
@@ -204,18 +203,13 @@ impl RecvLoop for ServerConn {
trace!("ServerConn sending response to {}: ", id);
resp_send
- .send((id, prio, Data::Full(resp_bytes)))
+ .send((
+ id,
+ prio,
+ Framing::new(resp_bytes, resp_stream).into_stream(),
+ ))
.log_err("ServerConn recv_handler send resp bytes");
-
- if let Some(resp_stream) = resp_stream {
- resp_send
- .send((id + 1, prio, Data::Streaming(resp_stream)))
- .log_err("ServerConn recv_handler send resp stream");
- } else {
- resp_send
- .send((id + 1, prio, Data::Full(Vec::new())))
- .log_err("ServerConn recv_handler send resp stream");
- }
+ Ok::<_, Error>(())
});
}
}