From 368ba908794901bc793c6a087c02241be046bdf2 Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Sun, 5 Jun 2022 15:33:43 +0200 Subject: initial work on associated stream still require testing, and fixing a few kinks: - sending packets > 16k truncate them - send one more packet than it could at eos - probably update documentation /!\ contains breaking changes --- src/server.rs | 38 ++++++++++++++++++++++++++------------ 1 file changed, 26 insertions(+), 12 deletions(-) (limited to 'src/server.rs') diff --git a/src/server.rs b/src/server.rs index 5465307..6cd4056 100644 --- a/src/server.rs +++ b/src/server.rs @@ -55,7 +55,7 @@ pub(crate) struct ServerConn { netapp: Arc, - resp_send: ArcSwapOption)>>, + resp_send: ArcSwapOption>, } impl ServerConn { @@ -123,7 +123,11 @@ impl ServerConn { Ok(()) } - async fn recv_handler_aux(self: &Arc, bytes: &[u8]) -> Result, Error> { + async fn recv_handler_aux( + self: &Arc, + bytes: &[u8], + stream: AssociatedStream, + ) -> Result<(Vec, Option), Error> { let msg = QueryMessage::decode(bytes)?; let path = String::from_utf8(msg.path.to_vec())?; @@ -156,11 +160,11 @@ impl ServerConn { span.set_attribute(KeyValue::new("path", path.to_string())); span.set_attribute(KeyValue::new("len_query", msg.body.len() as i64)); - handler.handle(msg.body, self.peer_id) + handler.handle(msg.body, stream, self.peer_id) .with_context(Context::current_with_span(span)) .await } else { - handler.handle(msg.body, self.peer_id).await + handler.handle(msg.body, stream, self.peer_id).await } } } else { @@ -173,7 +177,7 @@ impl SendLoop for ServerConn {} #[async_trait] impl RecvLoop for ServerConn { - fn recv_handler(self: &Arc, id: RequestID, bytes: Vec) { + fn recv_handler(self: &Arc, id: RequestID, bytes: Vec, stream: AssociatedStream) { let resp_send = self.resp_send.load_full().unwrap(); let self2 = self.clone(); @@ -182,26 +186,36 @@ impl RecvLoop for ServerConn { let bytes: Bytes = bytes.into(); let prio = if !bytes.is_empty() { bytes[0] } else { 0u8 }; - let resp = self2.recv_handler_aux(&bytes[..]).await; + let resp = self2.recv_handler_aux(&bytes[..], stream).await; - let resp_bytes = match resp { - Ok(rb) => { + let (resp_bytes, resp_stream) = match resp { + Ok((rb, rs)) => { let mut resp_bytes = vec![0u8]; resp_bytes.extend(rb); - resp_bytes + (resp_bytes, rs) } Err(e) => { let mut resp_bytes = vec![e.code()]; resp_bytes.extend(e.to_string().into_bytes()); - resp_bytes + (resp_bytes, None) } }; trace!("ServerConn sending response to {}: ", id); resp_send - .send((id, prio, resp_bytes)) - .log_err("ServerConn recv_handler send resp"); + .send((id, prio, Data::Full(resp_bytes))) + .log_err("ServerConn recv_handler send resp bytes"); + + if let Some(resp_stream) = resp_stream { + resp_send + .send((id + 1, prio, Data::Streaming(resp_stream))) + .log_err("ServerConn recv_handler send resp stream"); + } else { + resp_send + .send((id + 1, prio, Data::Full(Vec::new()))) + .log_err("ServerConn recv_handler send resp stream"); + } }); } } -- cgit v1.2.3 From d3d18b8e8bde5fee81022fd050d5f4c114262fcf Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Mon, 20 Jun 2022 23:40:31 +0200 Subject: use a framing protocol instead of even/odd channel --- src/server.rs | 26 ++++++++++---------------- 1 file changed, 10 insertions(+), 16 deletions(-) (limited to 'src/server.rs') diff --git a/src/server.rs b/src/server.rs index 6cd4056..86e5156 100644 --- a/src/server.rs +++ b/src/server.rs @@ -2,7 +2,6 @@ use std::net::SocketAddr; use std::sync::Arc; use arc_swap::ArcSwapOption; -use bytes::Bytes; use log::{debug, trace}; #[cfg(feature = "telemetry")] @@ -55,7 +54,7 @@ pub(crate) struct ServerConn { netapp: Arc, - resp_send: ArcSwapOption>, + resp_send: ArcSwapOption>, } impl ServerConn { @@ -177,13 +176,13 @@ impl SendLoop for ServerConn {} #[async_trait] impl RecvLoop for ServerConn { - fn recv_handler(self: &Arc, id: RequestID, bytes: Vec, stream: AssociatedStream) { + fn recv_handler(self: &Arc, id: RequestID, stream: AssociatedStream) { let resp_send = self.resp_send.load_full().unwrap(); let self2 = self.clone(); tokio::spawn(async move { - trace!("ServerConn recv_handler {} ({} bytes)", id, bytes.len()); - let bytes: Bytes = bytes.into(); + trace!("ServerConn recv_handler {}", id); + let (bytes, stream) = Framing::from_stream(stream).await?.into_parts(); let prio = if !bytes.is_empty() { bytes[0] } else { 0u8 }; let resp = self2.recv_handler_aux(&bytes[..], stream).await; @@ -204,18 +203,13 @@ impl RecvLoop for ServerConn { trace!("ServerConn sending response to {}: ", id); resp_send - .send((id, prio, Data::Full(resp_bytes))) + .send(( + id, + prio, + Framing::new(resp_bytes, resp_stream).into_stream(), + )) .log_err("ServerConn recv_handler send resp bytes"); - - if let Some(resp_stream) = resp_stream { - resp_send - .send((id + 1, prio, Data::Streaming(resp_stream))) - .log_err("ServerConn recv_handler send resp stream"); - } else { - resp_send - .send((id + 1, prio, Data::Full(Vec::new()))) - .log_err("ServerConn recv_handler send resp stream"); - } + Ok::<_, Error>(()) }); } } -- cgit v1.2.3 From cdff8ae1beab44a22d0eb0eb00c624e49971b6ca Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Mon, 18 Jul 2022 15:21:13 +0200 Subject: add detection of premature eos --- src/server.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'src/server.rs') diff --git a/src/server.rs b/src/server.rs index 86e5156..8075484 100644 --- a/src/server.rs +++ b/src/server.rs @@ -19,6 +19,7 @@ use tokio::select; use tokio::sync::{mpsc, watch}; use tokio_util::compat::*; +use futures::channel::mpsc::UnboundedReceiver; use futures::io::{AsyncReadExt, AsyncWriteExt}; use async_trait::async_trait; @@ -176,7 +177,7 @@ impl SendLoop for ServerConn {} #[async_trait] impl RecvLoop for ServerConn { - fn recv_handler(self: &Arc, id: RequestID, stream: AssociatedStream) { + fn recv_handler(self: &Arc, id: RequestID, stream: UnboundedReceiver) { let resp_send = self.resp_send.load_full().unwrap(); let self2 = self.clone(); -- cgit v1.2.3 From f35fa7d18d9e0f51bed311355ec1310b1d311ab3 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 21 Jul 2022 17:34:53 +0200 Subject: Move things around --- src/server.rs | 32 +++++++++++++++----------------- 1 file changed, 15 insertions(+), 17 deletions(-) (limited to 'src/server.rs') diff --git a/src/server.rs b/src/server.rs index 8075484..1f1c22a 100644 --- a/src/server.rs +++ b/src/server.rs @@ -2,8 +2,17 @@ use std::net::SocketAddr; use std::sync::Arc; use arc_swap::ArcSwapOption; +use async_trait::async_trait; use log::{debug, trace}; +use futures::channel::mpsc::UnboundedReceiver; +use futures::io::{AsyncReadExt, AsyncWriteExt}; +use kuska_handshake::async_std::{handshake_server, BoxStream}; +use tokio::net::TcpStream; +use tokio::select; +use tokio::sync::{mpsc, watch}; +use tokio_util::compat::*; + #[cfg(feature = "telemetry")] use opentelemetry::{ trace::{FutureExt, Span, SpanKind, TraceContextExt, TraceId, Tracer}, @@ -14,22 +23,11 @@ use opentelemetry_contrib::trace::propagator::binary::*; #[cfg(feature = "telemetry")] use rand::{thread_rng, Rng}; -use tokio::net::TcpStream; -use tokio::select; -use tokio::sync::{mpsc, watch}; -use tokio_util::compat::*; - -use futures::channel::mpsc::UnboundedReceiver; -use futures::io::{AsyncReadExt, AsyncWriteExt}; - -use async_trait::async_trait; - -use kuska_handshake::async_std::{handshake_server, BoxStream}; - use crate::error::*; +use crate::message::*; use crate::netapp::*; -use crate::proto::*; -use crate::proto2::*; +use crate::recv::*; +use crate::send::*; use crate::util::*; // The client and server connection structs (client.rs and server.rs) @@ -55,7 +53,7 @@ pub(crate) struct ServerConn { netapp: Arc, - resp_send: ArcSwapOption>, + resp_send: ArcSwapOption>, } impl ServerConn { @@ -126,8 +124,8 @@ impl ServerConn { async fn recv_handler_aux( self: &Arc, bytes: &[u8], - stream: AssociatedStream, - ) -> Result<(Vec, Option), Error> { + stream: ByteStream, + ) -> Result<(Vec, Option), Error> { let msg = QueryMessage::decode(bytes)?; let path = String::from_utf8(msg.path.to_vec())?; -- cgit v1.2.3 From 0b71ca12f910c17eaf2291076438dff3b70dc9cd Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 22 Jul 2022 12:45:38 +0200 Subject: Clean up framing protocol --- src/server.rs | 53 ++++++++++++++++++++--------------------------------- 1 file changed, 20 insertions(+), 33 deletions(-) (limited to 'src/server.rs') diff --git a/src/server.rs b/src/server.rs index 1f1c22a..ae1196c 100644 --- a/src/server.rs +++ b/src/server.rs @@ -28,6 +28,7 @@ use crate::message::*; use crate::netapp::*; use crate::recv::*; use crate::send::*; +use crate::stream::*; use crate::util::*; // The client and server connection structs (client.rs and server.rs) @@ -121,17 +122,12 @@ impl ServerConn { Ok(()) } - async fn recv_handler_aux( - self: &Arc, - bytes: &[u8], - stream: ByteStream, - ) -> Result<(Vec, Option), Error> { - let msg = QueryMessage::decode(bytes)?; - let path = String::from_utf8(msg.path.to_vec())?; + async fn recv_handler_aux(self: &Arc, req_enc: ReqEnc) -> Result { + let path = String::from_utf8(req_enc.path.to_vec())?; let handler_opt = { let endpoints = self.netapp.endpoints.read().unwrap(); - endpoints.get(&path).map(|e| e.clone_endpoint()) + endpoints.get(&path[..]).map(|e| e.clone_endpoint()) }; if let Some(handler) = handler_opt { @@ -139,9 +135,9 @@ impl ServerConn { if #[cfg(feature = "telemetry")] { let tracer = opentelemetry::global::tracer("netapp"); - let mut span = if let Some(telemetry_id) = msg.telemetry_id { + let mut span = if !req_enc.telemetry_id.is_empty() { let propagator = BinaryPropagator::new(); - let context = propagator.from_bytes(telemetry_id); + let context = propagator.from_bytes(req_enc.telemetry_id.to_vec()); let context = Context::new().with_remote_span_context(context); tracer.span_builder(format!(">> RPC {}", path)) .with_kind(SpanKind::Server) @@ -156,13 +152,13 @@ impl ServerConn { .start(&tracer) }; span.set_attribute(KeyValue::new("path", path.to_string())); - span.set_attribute(KeyValue::new("len_query", msg.body.len() as i64)); + span.set_attribute(KeyValue::new("len_query_msg", req_enc.msg.len() as i64)); - handler.handle(msg.body, stream, self.peer_id) + handler.handle(req_enc, self.peer_id) .with_context(Context::current_with_span(span)) .await } else { - handler.handle(msg.body, stream, self.peer_id).await + handler.handle(req_enc, self.peer_id).await } } } else { @@ -181,32 +177,23 @@ impl RecvLoop for ServerConn { let self2 = self.clone(); tokio::spawn(async move { trace!("ServerConn recv_handler {}", id); - let (bytes, stream) = Framing::from_stream(stream).await?.into_parts(); - - let prio = if !bytes.is_empty() { bytes[0] } else { 0u8 }; - let resp = self2.recv_handler_aux(&bytes[..], stream).await; - - let (resp_bytes, resp_stream) = match resp { - Ok((rb, rs)) => { - let mut resp_bytes = vec![0u8]; - resp_bytes.extend(rb); - (resp_bytes, rs) - } - Err(e) => { - let mut resp_bytes = vec![e.code()]; - resp_bytes.extend(e.to_string().into_bytes()); - (resp_bytes, None) + let (prio, resp_enc) = match ReqEnc::decode(Box::pin(stream)).await { + Ok(req_enc) => { + let prio = req_enc.prio; + let resp = self2.recv_handler_aux(req_enc).await; + + (prio, match resp { + Ok(resp_enc) => resp_enc, + Err(e) => RespEnc::from_err(e), + }) } + Err(e) => (PRIO_NORMAL, RespEnc::from_err(e)), }; trace!("ServerConn sending response to {}: ", id); resp_send - .send(( - id, - prio, - Framing::new(resp_bytes, resp_stream).into_stream(), - )) + .send((id, prio, resp_enc.encode())) .log_err("ServerConn recv_handler send resp bytes"); Ok::<_, Error>(()) }); -- cgit v1.2.3 From 9cb28c21b4a80aa9f29097f6bb1b8b6c23446ddc Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 22 Jul 2022 13:01:52 +0200 Subject: Use bounded channels on receive side for backpressure --- src/server.rs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) (limited to 'src/server.rs') diff --git a/src/server.rs b/src/server.rs index ae1196c..4b232af 100644 --- a/src/server.rs +++ b/src/server.rs @@ -5,7 +5,6 @@ use arc_swap::ArcSwapOption; use async_trait::async_trait; use log::{debug, trace}; -use futures::channel::mpsc::UnboundedReceiver; use futures::io::{AsyncReadExt, AsyncWriteExt}; use kuska_handshake::async_std::{handshake_server, BoxStream}; use tokio::net::TcpStream; @@ -171,21 +170,24 @@ impl SendLoop for ServerConn {} #[async_trait] impl RecvLoop for ServerConn { - fn recv_handler(self: &Arc, id: RequestID, stream: UnboundedReceiver) { + fn recv_handler(self: &Arc, id: RequestID, stream: ByteStream) { let resp_send = self.resp_send.load_full().unwrap(); let self2 = self.clone(); tokio::spawn(async move { trace!("ServerConn recv_handler {}", id); - let (prio, resp_enc) = match ReqEnc::decode(Box::pin(stream)).await { + let (prio, resp_enc) = match ReqEnc::decode(stream).await { Ok(req_enc) => { let prio = req_enc.prio; let resp = self2.recv_handler_aux(req_enc).await; - (prio, match resp { - Ok(resp_enc) => resp_enc, - Err(e) => RespEnc::from_err(e), - }) + ( + prio, + match resp { + Ok(resp_enc) => resp_enc, + Err(e) => RespEnc::from_err(e), + }, + ) } Err(e) => (PRIO_NORMAL, RespEnc::from_err(e)), }; -- cgit v1.2.3 From 74e57016f63b6052cf6d539812859c3a46138eee Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 25 Jul 2022 15:04:52 +0200 Subject: Add some debugging --- src/server.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) (limited to 'src/server.rs') diff --git a/src/server.rs b/src/server.rs index 4b232af..57062d8 100644 --- a/src/server.rs +++ b/src/server.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use arc_swap::ArcSwapOption; use async_trait::async_trait; -use log::{debug, trace}; +use log::*; use futures::io::{AsyncReadExt, AsyncWriteExt}; use kuska_handshake::async_std::{handshake_server, BoxStream}; @@ -175,7 +175,8 @@ impl RecvLoop for ServerConn { let self2 = self.clone(); tokio::spawn(async move { - trace!("ServerConn recv_handler {}", id); + debug!("server: recv_handler got {}", id); + let (prio, resp_enc) = match ReqEnc::decode(stream).await { Ok(req_enc) => { let prio = req_enc.prio; @@ -192,7 +193,7 @@ impl RecvLoop for ServerConn { Err(e) => (PRIO_NORMAL, RespEnc::from_err(e)), }; - trace!("ServerConn sending response to {}: ", id); + debug!("server: sending response to {}", id); resp_send .send((id, prio, resp_enc.encode())) -- cgit v1.2.3 From cd203f5708907c2bf172a3c5b7c5b40e2557b2f4 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 1 Sep 2022 12:15:50 +0200 Subject: Add OrderTag to Req and Resp, refactor errors --- src/server.rs | 22 ++++++---------------- 1 file changed, 6 insertions(+), 16 deletions(-) (limited to 'src/server.rs') diff --git a/src/server.rs b/src/server.rs index 57062d8..c23c9e4 100644 --- a/src/server.rs +++ b/src/server.rs @@ -53,7 +53,7 @@ pub(crate) struct ServerConn { netapp: Arc, - resp_send: ArcSwapOption>, + resp_send: ArcSwapOption>, } impl ServerConn { @@ -177,26 +177,16 @@ impl RecvLoop for ServerConn { tokio::spawn(async move { debug!("server: recv_handler got {}", id); - let (prio, resp_enc) = match ReqEnc::decode(stream).await { - Ok(req_enc) => { - let prio = req_enc.prio; - let resp = self2.recv_handler_aux(req_enc).await; - - ( - prio, - match resp { - Ok(resp_enc) => resp_enc, - Err(e) => RespEnc::from_err(e), - }, - ) - } - Err(e) => (PRIO_NORMAL, RespEnc::from_err(e)), + let (prio, resp_enc_result) = match ReqEnc::decode(stream).await { + Ok(req_enc) => (req_enc.prio, self2.recv_handler_aux(req_enc).await), + Err(e) => (PRIO_HIGH, Err(e)), }; debug!("server: sending response to {}", id); + let (resp_stream, resp_order) = RespEnc::encode(resp_enc_result); resp_send - .send((id, prio, resp_enc.encode())) + .send((id, prio, resp_stream)) .log_err("ServerConn recv_handler send resp bytes"); Ok::<_, Error>(()) }); -- cgit v1.2.3 From 4a59b73d7bfd0f136f654e874afb5d2a9bf4df2e Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 1 Sep 2022 12:46:33 +0200 Subject: Add actual support for order tag --- src/server.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/server.rs') diff --git a/src/server.rs b/src/server.rs index c23c9e4..f8c3f98 100644 --- a/src/server.rs +++ b/src/server.rs @@ -186,7 +186,7 @@ impl RecvLoop for ServerConn { let (resp_stream, resp_order) = RespEnc::encode(resp_enc_result); resp_send - .send((id, prio, resp_stream)) + .send((id, prio, resp_order, resp_stream)) .log_err("ServerConn recv_handler send resp bytes"); Ok::<_, Error>(()) }); -- cgit v1.2.3 From 522f420e2bf30d5ef6f50dccb88adf86882ac7c6 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 1 Sep 2022 15:54:11 +0200 Subject: Implement request cancellation --- src/server.rs | 37 +++++++++++++++++++++++++++++++------ 1 file changed, 31 insertions(+), 6 deletions(-) (limited to 'src/server.rs') diff --git a/src/server.rs b/src/server.rs index 2c12d9d..f9eb121 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,5 +1,6 @@ +use std::collections::HashMap; use std::net::SocketAddr; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use arc_swap::ArcSwapOption; use async_trait::async_trait; @@ -53,7 +54,8 @@ pub(crate) struct ServerConn { netapp: Arc, - resp_send: ArcSwapOption>, + resp_send: ArcSwapOption>, + running_handlers: Mutex>>, } impl ServerConn { @@ -99,6 +101,7 @@ impl ServerConn { remote_addr, peer_id, resp_send: ArcSwapOption::new(Some(Arc::new(resp_send))), + running_handlers: Mutex::new(HashMap::new()), }); netapp.connected_as_server(peer_id, conn.clone()); @@ -174,10 +177,15 @@ impl SendLoop for ServerConn {} #[async_trait] impl RecvLoop for ServerConn { fn recv_handler(self: &Arc, id: RequestID, stream: ByteStream) { - let resp_send = self.resp_send.load_full().unwrap(); + let resp_send = match self.resp_send.load_full() { + Some(c) => c, + None => return, + }; + + let mut rh = self.running_handlers.lock().unwrap(); let self2 = self.clone(); - tokio::spawn(async move { + let jh = tokio::spawn(async move { debug!("server: recv_handler got {}", id); let (prio, resp_enc_result) = match ReqEnc::decode(stream).await { @@ -189,9 +197,26 @@ impl RecvLoop for ServerConn { let (resp_stream, resp_order) = RespEnc::encode(resp_enc_result); resp_send - .send((id, prio, resp_order, resp_stream)) + .send(SendItem::Stream(id, prio, resp_order, resp_stream)) .log_err("ServerConn recv_handler send resp bytes"); - Ok::<_, Error>(()) + + self2.running_handlers.lock().unwrap().remove(&id); }); + + rh.insert(id, jh); + } + + fn cancel_handler(self: &Arc, id: RequestID) { + trace!("received cancel for request {}", id); + + // If the handler is still running, abort it now + if let Some(jh) = self.running_handlers.lock().unwrap().remove(&id) { + jh.abort(); + } + + // Inform the response sender that we don't need to send the response + if let Some(resp_send) = self.resp_send.load_full() { + let _ = resp_send.send(SendItem::Cancel(id)); + } } } -- cgit v1.2.3 From 298e956a199711b65ce3820931ca943108b78225 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 13 Sep 2022 12:48:54 +0200 Subject: undo needless change --- src/server.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/server.rs') diff --git a/src/server.rs b/src/server.rs index f9eb121..cd367c4 100644 --- a/src/server.rs +++ b/src/server.rs @@ -132,7 +132,7 @@ impl ServerConn { let handler_opt = { let endpoints = self.netapp.endpoints.read().unwrap(); - endpoints.get(&path[..]).map(|e| e.clone_endpoint()) + endpoints.get(&path).map(|e| e.clone_endpoint()) }; if let Some(handler) = handler_opt { -- cgit v1.2.3