diff options
Diffstat (limited to 'src/server.rs')
-rw-r--r-- | src/server.rs | 26 |
1 files changed, 10 insertions, 16 deletions
diff --git a/src/server.rs b/src/server.rs index 6cd4056..86e5156 100644 --- a/src/server.rs +++ b/src/server.rs @@ -2,7 +2,6 @@ use std::net::SocketAddr; use std::sync::Arc; use arc_swap::ArcSwapOption; -use bytes::Bytes; use log::{debug, trace}; #[cfg(feature = "telemetry")] @@ -55,7 +54,7 @@ pub(crate) struct ServerConn { netapp: Arc<NetApp>, - resp_send: ArcSwapOption<mpsc::UnboundedSender<(RequestID, RequestPriority, Data)>>, + resp_send: ArcSwapOption<mpsc::UnboundedSender<(RequestID, RequestPriority, AssociatedStream)>>, } impl ServerConn { @@ -177,13 +176,13 @@ impl SendLoop for ServerConn {} #[async_trait] impl RecvLoop for ServerConn { - fn recv_handler(self: &Arc<Self>, id: RequestID, bytes: Vec<u8>, stream: AssociatedStream) { + fn recv_handler(self: &Arc<Self>, id: RequestID, stream: AssociatedStream) { let resp_send = self.resp_send.load_full().unwrap(); let self2 = self.clone(); tokio::spawn(async move { - trace!("ServerConn recv_handler {} ({} bytes)", id, bytes.len()); - let bytes: Bytes = bytes.into(); + trace!("ServerConn recv_handler {}", id); + let (bytes, stream) = Framing::from_stream(stream).await?.into_parts(); let prio = if !bytes.is_empty() { bytes[0] } else { 0u8 }; let resp = self2.recv_handler_aux(&bytes[..], stream).await; @@ -204,18 +203,13 @@ impl RecvLoop for ServerConn { trace!("ServerConn sending response to {}: ", id); resp_send - .send((id, prio, Data::Full(resp_bytes))) + .send(( + id, + prio, + Framing::new(resp_bytes, resp_stream).into_stream(), + )) .log_err("ServerConn recv_handler send resp bytes"); - - if let Some(resp_stream) = resp_stream { - resp_send - .send((id + 1, prio, Data::Streaming(resp_stream))) - .log_err("ServerConn recv_handler send resp stream"); - } else { - resp_send - .send((id + 1, prio, Data::Full(Vec::new()))) - .log_err("ServerConn recv_handler send resp stream"); - } + Ok::<_, Error>(()) }); } } |