From 368ba908794901bc793c6a087c02241be046bdf2 Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Sun, 5 Jun 2022 15:33:43 +0200 Subject: initial work on associated stream still require testing, and fixing a few kinks: - sending packets > 16k truncate them - send one more packet than it could at eos - probably update documentation /!\ contains breaking changes --- src/client.rs | 37 +++++++++++++++++++++++-------------- 1 file changed, 23 insertions(+), 14 deletions(-) (limited to 'src/client.rs') diff --git a/src/client.rs b/src/client.rs index 8227e8f..bce7aca 100644 --- a/src/client.rs +++ b/src/client.rs @@ -37,10 +37,10 @@ pub(crate) struct ClientConn { pub(crate) remote_addr: SocketAddr, pub(crate) peer_id: NodeID, - query_send: ArcSwapOption)>>, + query_send: ArcSwapOption>, next_query_number: AtomicU32, - inflight: Mutex>>>, + inflight: Mutex, AssociatedStream)>>>, } impl ClientConn { @@ -148,9 +148,11 @@ impl ClientConn { { let query_send = self.query_send.load_full().ok_or(Error::ConnectionClosed)?; + // increment by 2; even are direct data; odd are associated stream let id = self .next_query_number - .fetch_add(1, atomic::Ordering::Relaxed); + .fetch_add(2, atomic::Ordering::Relaxed); + let stream_id = id + 1; cfg_if::cfg_if! { if #[cfg(feature = "telemetry")] { @@ -166,7 +168,7 @@ impl ClientConn { }; // Encode request - let body = rmp_to_vec_all_named(rq.borrow())?; + let (body, stream) = rmp_to_vec_all_named(rq.borrow())?; drop(rq); let request = QueryMessage { @@ -185,7 +187,10 @@ impl ClientConn { error!( "Too many inflight requests! RequestID collision. Interrupting previous request." ); - if old_ch.send(vec![]).is_err() { + if old_ch + .send((vec![], Box::pin(futures::stream::empty()))) + .is_err() + { debug!("Could not send empty response to collisionned request, probably because request was interrupted. Dropping response."); } } @@ -195,15 +200,20 @@ impl ClientConn { #[cfg(feature = "telemetry")] span.set_attribute(KeyValue::new("len_query", bytes.len() as i64)); - query_send.send((id, prio, bytes))?; + query_send.send((id, prio, Data::Full(bytes)))?; + if let Some(stream) = stream { + query_send.send((stream_id, prio | PRIO_SECONDARY, Data::Streaming(stream)))?; + } else { + query_send.send((stream_id, prio, Data::Full(Vec::new())))?; + } cfg_if::cfg_if! { if #[cfg(feature = "telemetry")] { - let resp = resp_recv + let (resp, stream) = resp_recv .with_context(Context::current_with_span(span)) .await?; } else { - let resp = resp_recv.await?; + let (resp, stream) = resp_recv.await?; } } @@ -217,10 +227,9 @@ impl ClientConn { let code = resp[0]; if code == 0 { - Ok(rmp_serde::decode::from_read_ref::< - _, - ::Response, - >(&resp[1..])?) + let mut deser = rmp_serde::decode::Deserializer::from_read_ref(&resp[1..]); + let res = T::Response::deserialize_msg(&mut deser, stream).await?; + Ok(res) } else { let msg = String::from_utf8(resp[1..].to_vec()).unwrap_or_default(); Err(Error::Remote(code, msg)) @@ -232,12 +241,12 @@ impl SendLoop for ClientConn {} #[async_trait] impl RecvLoop for ClientConn { - fn recv_handler(self: &Arc, id: RequestID, msg: Vec) { + fn recv_handler(self: &Arc, id: RequestID, msg: Vec, stream: AssociatedStream) { trace!("ClientConn recv_handler {} ({} bytes)", id, msg.len()); let mut inflight = self.inflight.lock().unwrap(); if let Some(ch) = inflight.remove(&id) { - if ch.send(msg).is_err() { + if ch.send((msg, stream)).is_err() { debug!("Could not send request response, probably because request was interrupted. Dropping response."); } } -- cgit v1.2.3 From 4745e7c4ba5665d3303ae567087781778cec9c34 Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Wed, 8 Jun 2022 00:30:56 +0200 Subject: further work on streams most changes still required are related to error handling --- src/client.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) (limited to 'src/client.rs') diff --git a/src/client.rs b/src/client.rs index bce7aca..bc16fb1 100644 --- a/src/client.rs +++ b/src/client.rs @@ -227,9 +227,8 @@ impl ClientConn { let code = resp[0]; if code == 0 { - let mut deser = rmp_serde::decode::Deserializer::from_read_ref(&resp[1..]); - let res = T::Response::deserialize_msg(&mut deser, stream).await?; - Ok(res) + let ser_resp = rmp_serde::decode::from_read_ref(&resp[1..])?; + Ok(T::Response::deserialize_msg(ser_resp, stream).await) } else { let msg = String::from_utf8(resp[1..].to_vec()).unwrap_or_default(); Err(Error::Remote(code, msg)) -- cgit v1.2.3 From d3d18b8e8bde5fee81022fd050d5f4c114262fcf Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Mon, 20 Jun 2022 23:40:31 +0200 Subject: use a framing protocol instead of even/odd channel --- src/client.rs | 32 ++++++++++++-------------------- 1 file changed, 12 insertions(+), 20 deletions(-) (limited to 'src/client.rs') diff --git a/src/client.rs b/src/client.rs index bc16fb1..a630f87 100644 --- a/src/client.rs +++ b/src/client.rs @@ -37,10 +37,11 @@ pub(crate) struct ClientConn { pub(crate) remote_addr: SocketAddr, pub(crate) peer_id: NodeID, - query_send: ArcSwapOption>, + query_send: + ArcSwapOption>, next_query_number: AtomicU32, - inflight: Mutex, AssociatedStream)>>>, + inflight: Mutex>>, } impl ClientConn { @@ -148,11 +149,9 @@ impl ClientConn { { let query_send = self.query_send.load_full().ok_or(Error::ConnectionClosed)?; - // increment by 2; even are direct data; odd are associated stream let id = self .next_query_number - .fetch_add(2, atomic::Ordering::Relaxed); - let stream_id = id + 1; + .fetch_add(1, atomic::Ordering::Relaxed); cfg_if::cfg_if! { if #[cfg(feature = "telemetry")] { @@ -187,10 +186,7 @@ impl ClientConn { error!( "Too many inflight requests! RequestID collision. Interrupting previous request." ); - if old_ch - .send((vec![], Box::pin(futures::stream::empty()))) - .is_err() - { + if old_ch.send(Box::pin(futures::stream::empty())).is_err() { debug!("Could not send empty response to collisionned request, probably because request was interrupted. Dropping response."); } } @@ -200,22 +196,18 @@ impl ClientConn { #[cfg(feature = "telemetry")] span.set_attribute(KeyValue::new("len_query", bytes.len() as i64)); - query_send.send((id, prio, Data::Full(bytes)))?; - if let Some(stream) = stream { - query_send.send((stream_id, prio | PRIO_SECONDARY, Data::Streaming(stream)))?; - } else { - query_send.send((stream_id, prio, Data::Full(Vec::new())))?; - } + query_send.send((id, prio, Framing::new(bytes, stream).into_stream()))?; cfg_if::cfg_if! { if #[cfg(feature = "telemetry")] { - let (resp, stream) = resp_recv + let stream = resp_recv .with_context(Context::current_with_span(span)) .await?; } else { - let (resp, stream) = resp_recv.await?; + let stream = resp_recv.await?; } } + let (resp, stream) = Framing::from_stream(stream).await?.into_parts(); if resp.is_empty() { return Err(Error::Message( @@ -240,12 +232,12 @@ impl SendLoop for ClientConn {} #[async_trait] impl RecvLoop for ClientConn { - fn recv_handler(self: &Arc, id: RequestID, msg: Vec, stream: AssociatedStream) { - trace!("ClientConn recv_handler {} ({} bytes)", id, msg.len()); + fn recv_handler(self: &Arc, id: RequestID, stream: AssociatedStream) { + trace!("ClientConn recv_handler {}", id); let mut inflight = self.inflight.lock().unwrap(); if let Some(ch) = inflight.remove(&id) { - if ch.send((msg, stream)).is_err() { + if ch.send(stream).is_err() { debug!("Could not send request response, probably because request was interrupted. Dropping response."); } } -- cgit v1.2.3 From cdff8ae1beab44a22d0eb0eb00c624e49971b6ca Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Mon, 18 Jul 2022 15:21:13 +0200 Subject: add detection of premature eos --- src/client.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) (limited to 'src/client.rs') diff --git a/src/client.rs b/src/client.rs index a630f87..6d49f5c 100644 --- a/src/client.rs +++ b/src/client.rs @@ -7,6 +7,7 @@ use std::sync::{Arc, Mutex}; use arc_swap::ArcSwapOption; use log::{debug, error, trace}; +use futures::channel::mpsc::{unbounded, UnboundedReceiver}; use tokio::net::TcpStream; use tokio::select; use tokio::sync::{mpsc, oneshot, watch}; @@ -41,7 +42,7 @@ pub(crate) struct ClientConn { ArcSwapOption>, next_query_number: AtomicU32, - inflight: Mutex>>, + inflight: Mutex>>>, } impl ClientConn { @@ -186,7 +187,7 @@ impl ClientConn { error!( "Too many inflight requests! RequestID collision. Interrupting previous request." ); - if old_ch.send(Box::pin(futures::stream::empty())).is_err() { + if old_ch.send(unbounded().1).is_err() { debug!("Could not send empty response to collisionned request, probably because request was interrupted. Dropping response."); } } @@ -232,7 +233,7 @@ impl SendLoop for ClientConn {} #[async_trait] impl RecvLoop for ClientConn { - fn recv_handler(self: &Arc, id: RequestID, stream: AssociatedStream) { + fn recv_handler(self: &Arc, id: RequestID, stream: UnboundedReceiver) { trace!("ClientConn recv_handler {}", id); let mut inflight = self.inflight.lock().unwrap(); -- cgit v1.2.3 From f35fa7d18d9e0f51bed311355ec1310b1d311ab3 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 21 Jul 2022 17:34:53 +0200 Subject: Move things around --- src/client.rs | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) (limited to 'src/client.rs') diff --git a/src/client.rs b/src/client.rs index 6d49f5c..663a3e4 100644 --- a/src/client.rs +++ b/src/client.rs @@ -5,9 +5,12 @@ use std::sync::atomic::{self, AtomicU32}; use std::sync::{Arc, Mutex}; use arc_swap::ArcSwapOption; +use async_trait::async_trait; use log::{debug, error, trace}; use futures::channel::mpsc::{unbounded, UnboundedReceiver}; +use futures::io::AsyncReadExt; +use kuska_handshake::async_std::{handshake_client, BoxStream}; use tokio::net::TcpStream; use tokio::select; use tokio::sync::{mpsc, oneshot, watch}; @@ -21,25 +24,18 @@ use opentelemetry::{ #[cfg(feature = "telemetry")] use opentelemetry_contrib::trace::propagator::binary::*; -use futures::io::AsyncReadExt; - -use async_trait::async_trait; - -use kuska_handshake::async_std::{handshake_client, BoxStream}; - -use crate::endpoint::*; use crate::error::*; +use crate::message::*; use crate::netapp::*; -use crate::proto::*; -use crate::proto2::*; +use crate::recv::*; +use crate::send::*; use crate::util::*; pub(crate) struct ClientConn { pub(crate) remote_addr: SocketAddr, pub(crate) peer_id: NodeID, - query_send: - ArcSwapOption>, + query_send: ArcSwapOption>, next_query_number: AtomicU32, inflight: Mutex>>>, -- cgit v1.2.3 From 44bbc1c00c2532e08dff0d4a547b0a707e89f32d Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 21 Jul 2022 19:05:51 +0200 Subject: Rename AutoSerialize into SimpleMessage and refactor a bit --- src/client.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'src/client.rs') diff --git a/src/client.rs b/src/client.rs index 663a3e4..cf80746 100644 --- a/src/client.rs +++ b/src/client.rs @@ -134,15 +134,14 @@ impl ClientConn { self.query_send.store(None); } - pub(crate) async fn call( + pub(crate) async fn call( self: Arc, - rq: B, + rq: T, path: &str, prio: RequestPriority, ) -> Result<::Response, Error> where T: Message, - B: Borrow, { let query_send = self.query_send.load_full().ok_or(Error::ConnectionClosed)?; @@ -164,7 +163,8 @@ impl ClientConn { }; // Encode request - let (body, stream) = rmp_to_vec_all_named(rq.borrow())?; + let (rq, stream) = rq.into_parts(); + let body = rmp_to_vec_all_named(&rq)?; drop(rq); let request = QueryMessage { @@ -217,7 +217,7 @@ impl ClientConn { let code = resp[0]; if code == 0 { let ser_resp = rmp_serde::decode::from_read_ref(&resp[1..])?; - Ok(T::Response::deserialize_msg(ser_resp, stream).await) + Ok(T::Response::from_parts(ser_resp, stream)) } else { let msg = String::from_utf8(resp[1..].to_vec()).unwrap_or_default(); Err(Error::Remote(code, msg)) -- cgit v1.2.3 From 7d148c7e764d563efa3bccc0f14f50867db38ef1 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 21 Jul 2022 19:25:07 +0200 Subject: One possibility, but I don't like it --- src/client.rs | 1 - 1 file changed, 1 deletion(-) (limited to 'src/client.rs') diff --git a/src/client.rs b/src/client.rs index cf80746..9d572aa 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,4 +1,3 @@ -use std::borrow::Borrow; use std::collections::HashMap; use std::net::SocketAddr; use std::sync::atomic::{self, AtomicU32}; -- cgit v1.2.3 From 4934ed726d51913afd97ca937d0ece39ef8b7371 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 21 Jul 2022 20:22:56 +0200 Subject: Propose alternative API --- src/client.rs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) (limited to 'src/client.rs') diff --git a/src/client.rs b/src/client.rs index 9d572aa..c878627 100644 --- a/src/client.rs +++ b/src/client.rs @@ -135,10 +135,10 @@ impl ClientConn { pub(crate) async fn call( self: Arc, - rq: T, + req: Req, path: &str, prio: RequestPriority, - ) -> Result<::Response, Error> + ) -> Result, Error> where T: Message, { @@ -162,9 +162,8 @@ impl ClientConn { }; // Encode request - let (rq, stream) = rq.into_parts(); - let body = rmp_to_vec_all_named(&rq)?; - drop(rq); + let body = req.msg_ser.unwrap().clone(); + let stream = req.body.into_stream(); let request = QueryMessage { prio, @@ -216,7 +215,11 @@ impl ClientConn { let code = resp[0]; if code == 0 { let ser_resp = rmp_serde::decode::from_read_ref(&resp[1..])?; - Ok(T::Response::from_parts(ser_resp, stream)) + Ok(Resp { + _phantom: Default::default(), + msg: ser_resp, + body: BodyData::Stream(stream), + }) } else { let msg = String::from_utf8(resp[1..].to_vec()).unwrap_or_default(); Err(Error::Remote(code, msg)) -- cgit v1.2.3 From 0b71ca12f910c17eaf2291076438dff3b70dc9cd Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 22 Jul 2022 12:45:38 +0200 Subject: Clean up framing protocol --- src/client.rs | 58 ++++++++++++++++++---------------------------------------- 1 file changed, 18 insertions(+), 40 deletions(-) (limited to 'src/client.rs') diff --git a/src/client.rs b/src/client.rs index c878627..42eeaa3 100644 --- a/src/client.rs +++ b/src/client.rs @@ -5,6 +5,7 @@ use std::sync::{Arc, Mutex}; use arc_swap::ArcSwapOption; use async_trait::async_trait; +use bytes::Bytes; use log::{debug, error, trace}; use futures::channel::mpsc::{unbounded, UnboundedReceiver}; @@ -28,6 +29,7 @@ use crate::message::*; use crate::netapp::*; use crate::recv::*; use crate::send::*; +use crate::stream::*; use crate::util::*; pub(crate) struct ClientConn { @@ -155,24 +157,16 @@ impl ClientConn { .with_kind(SpanKind::Client) .start(&tracer); let propagator = BinaryPropagator::new(); - let telemetry_id = Some(propagator.to_bytes(span.span_context()).to_vec()); + let telemetry_id: Bytes = propagator.to_bytes(span.span_context()).to_vec().into(); } else { - let telemetry_id: Option> = None; + let telemetry_id: Bytes = Bytes::new(); } }; // Encode request - let body = req.msg_ser.unwrap().clone(); - let stream = req.body.into_stream(); - - let request = QueryMessage { - prio, - path: path.as_bytes(), - telemetry_id, - body: &body[..], - }; - let bytes = request.encode(); - drop(body); + let req_enc = req.into_enc(prio, path.as_bytes().to_vec().into(), telemetry_id); + let req_msg_len = req_enc.msg.len(); + let req_stream = req_enc.encode(); // Send request through let (resp_send, resp_recv) = oneshot::channel(); @@ -181,17 +175,19 @@ impl ClientConn { error!( "Too many inflight requests! RequestID collision. Interrupting previous request." ); - if old_ch.send(unbounded().1).is_err() { - debug!("Could not send empty response to collisionned request, probably because request was interrupted. Dropping response."); - } + let _ = old_ch.send(unbounded().1); } - trace!("request: query_send {}, {} bytes", id, bytes.len()); + trace!( + "request: query_send {} (serialized message: {} bytes)", + id, + req_msg_len + ); #[cfg(feature = "telemetry")] - span.set_attribute(KeyValue::new("len_query", bytes.len() as i64)); + span.set_attribute(KeyValue::new("len_query_msg", req_msg_len as i64)); - query_send.send((id, prio, Framing::new(bytes, stream).into_stream()))?; + query_send.send((id, prio, req_stream))?; cfg_if::cfg_if! { if #[cfg(feature = "telemetry")] { @@ -202,28 +198,10 @@ impl ClientConn { let stream = resp_recv.await?; } } - let (resp, stream) = Framing::from_stream(stream).await?.into_parts(); - if resp.is_empty() { - return Err(Error::Message( - "Response is 0 bytes, either a collision or a protocol error".into(), - )); - } - - trace!("request response {}: ", id); - - let code = resp[0]; - if code == 0 { - let ser_resp = rmp_serde::decode::from_read_ref(&resp[1..])?; - Ok(Resp { - _phantom: Default::default(), - msg: ser_resp, - body: BodyData::Stream(stream), - }) - } else { - let msg = String::from_utf8(resp[1..].to_vec()).unwrap_or_default(); - Err(Error::Remote(code, msg)) - } + let resp_enc = RespEnc::decode(Box::pin(stream)).await?; + trace!("request response {}", id); + Resp::from_enc(resp_enc) } } -- cgit v1.2.3 From 9cb28c21b4a80aa9f29097f6bb1b8b6c23446ddc Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 22 Jul 2022 13:01:52 +0200 Subject: Use bounded channels on receive side for backpressure --- src/client.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) (limited to 'src/client.rs') diff --git a/src/client.rs b/src/client.rs index 42eeaa3..d51236b 100644 --- a/src/client.rs +++ b/src/client.rs @@ -8,7 +8,6 @@ use async_trait::async_trait; use bytes::Bytes; use log::{debug, error, trace}; -use futures::channel::mpsc::{unbounded, UnboundedReceiver}; use futures::io::AsyncReadExt; use kuska_handshake::async_std::{handshake_client, BoxStream}; use tokio::net::TcpStream; @@ -39,7 +38,7 @@ pub(crate) struct ClientConn { query_send: ArcSwapOption>, next_query_number: AtomicU32, - inflight: Mutex>>>, + inflight: Mutex>>, } impl ClientConn { @@ -175,7 +174,9 @@ impl ClientConn { error!( "Too many inflight requests! RequestID collision. Interrupting previous request." ); - let _ = old_ch.send(unbounded().1); + let _ = old_ch.send(Box::pin(futures::stream::once(async move { + Err(Error::IdCollision.code()) + }))); } trace!( @@ -199,7 +200,7 @@ impl ClientConn { } } - let resp_enc = RespEnc::decode(Box::pin(stream)).await?; + let resp_enc = RespEnc::decode(stream).await?; trace!("request response {}", id); Resp::from_enc(resp_enc) } @@ -209,7 +210,7 @@ impl SendLoop for ClientConn {} #[async_trait] impl RecvLoop for ClientConn { - fn recv_handler(self: &Arc, id: RequestID, stream: UnboundedReceiver) { + fn recv_handler(self: &Arc, id: RequestID, stream: ByteStream) { trace!("ClientConn recv_handler {}", id); let mut inflight = self.inflight.lock().unwrap(); -- cgit v1.2.3 From 74e57016f63b6052cf6d539812859c3a46138eee Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 25 Jul 2022 15:04:52 +0200 Subject: Add some debugging --- src/client.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) (limited to 'src/client.rs') diff --git a/src/client.rs b/src/client.rs index d51236b..2fccdb8 100644 --- a/src/client.rs +++ b/src/client.rs @@ -179,10 +179,9 @@ impl ClientConn { }))); } - trace!( - "request: query_send {} (serialized message: {} bytes)", - id, - req_msg_len + debug!( + "request: query_send {}, path {}, prio {} (serialized message: {} bytes)", + id, path, prio, req_msg_len ); #[cfg(feature = "telemetry")] @@ -201,7 +200,7 @@ impl ClientConn { } let resp_enc = RespEnc::decode(stream).await?; - trace!("request response {}", id); + debug!("client: got response to request {} (path {})", id, path); Resp::from_enc(resp_enc) } } -- cgit v1.2.3 From 7909a95d3c02a738c9a088c1cb8a5d6f70b06046 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 1 Sep 2022 11:21:24 +0200 Subject: Stream errors are now std::io::Error --- src/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/client.rs') diff --git a/src/client.rs b/src/client.rs index 2fccdb8..0dcbdf1 100644 --- a/src/client.rs +++ b/src/client.rs @@ -175,7 +175,7 @@ impl ClientConn { "Too many inflight requests! RequestID collision. Interrupting previous request." ); let _ = old_ch.send(Box::pin(futures::stream::once(async move { - Err(Error::IdCollision.code()) + Err(std::io::Error::new(std::io::ErrorKind::Other, "RequestID collision, too many inflight requests")) }))); } -- cgit v1.2.3 From cd203f5708907c2bf172a3c5b7c5b40e2557b2f4 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 1 Sep 2022 12:15:50 +0200 Subject: Add OrderTag to Req and Resp, refactor errors --- src/client.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) (limited to 'src/client.rs') diff --git a/src/client.rs b/src/client.rs index 0dcbdf1..aef7bbb 100644 --- a/src/client.rs +++ b/src/client.rs @@ -35,7 +35,7 @@ pub(crate) struct ClientConn { pub(crate) remote_addr: SocketAddr, pub(crate) peer_id: NodeID, - query_send: ArcSwapOption>, + query_send: ArcSwapOption>, next_query_number: AtomicU32, inflight: Mutex>>, @@ -165,7 +165,7 @@ impl ClientConn { // Encode request let req_enc = req.into_enc(prio, path.as_bytes().to_vec().into(), telemetry_id); let req_msg_len = req_enc.msg.len(); - let req_stream = req_enc.encode(); + let (req_stream, req_order) = req_enc.encode(); // Send request through let (resp_send, resp_recv) = oneshot::channel(); @@ -175,7 +175,10 @@ impl ClientConn { "Too many inflight requests! RequestID collision. Interrupting previous request." ); let _ = old_ch.send(Box::pin(futures::stream::once(async move { - Err(std::io::Error::new(std::io::ErrorKind::Other, "RequestID collision, too many inflight requests")) + Err(std::io::Error::new( + std::io::ErrorKind::Other, + "RequestID collision, too many inflight requests", + )) }))); } -- cgit v1.2.3 From 4a59b73d7bfd0f136f654e874afb5d2a9bf4df2e Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 1 Sep 2022 12:46:33 +0200 Subject: Add actual support for order tag --- src/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/client.rs') diff --git a/src/client.rs b/src/client.rs index aef7bbb..df54810 100644 --- a/src/client.rs +++ b/src/client.rs @@ -190,7 +190,7 @@ impl ClientConn { #[cfg(feature = "telemetry")] span.set_attribute(KeyValue::new("len_query_msg", req_msg_len as i64)); - query_send.send((id, prio, req_stream))?; + query_send.send((id, prio, req_order, req_stream))?; cfg_if::cfg_if! { if #[cfg(feature = "telemetry")] { -- cgit v1.2.3 From 522f420e2bf30d5ef6f50dccb88adf86882ac7c6 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 1 Sep 2022 15:54:11 +0200 Subject: Implement request cancellation --- src/client.rs | 68 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 66 insertions(+), 2 deletions(-) (limited to 'src/client.rs') diff --git a/src/client.rs b/src/client.rs index 9726125..d82c91e 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,7 +1,9 @@ use std::collections::HashMap; use std::net::SocketAddr; +use std::pin::Pin; use std::sync::atomic::{self, AtomicU32}; use std::sync::{Arc, Mutex}; +use std::task::Poll; use arc_swap::ArcSwapOption; use async_trait::async_trait; @@ -9,6 +11,7 @@ use bytes::Bytes; use log::{debug, error, trace}; use futures::io::AsyncReadExt; +use futures::Stream; use kuska_handshake::async_std::{handshake_client, BoxStream}; use tokio::net::TcpStream; use tokio::select; @@ -35,7 +38,7 @@ pub(crate) struct ClientConn { pub(crate) remote_addr: SocketAddr, pub(crate) peer_id: NodeID, - query_send: ArcSwapOption>, + query_send: ArcSwapOption>, next_query_number: AtomicU32, inflight: Mutex>>, @@ -193,7 +196,9 @@ impl ClientConn { #[cfg(feature = "telemetry")] span.set_attribute(KeyValue::new("len_query_msg", req_msg_len as i64)); - query_send.send((id, prio, req_order, req_stream))?; + query_send.send(SendItem::Stream(id, prio, req_order, req_stream))?; + + let canceller = CancelOnDrop::new(id, query_send.as_ref().clone()); cfg_if::cfg_if! { if #[cfg(feature = "telemetry")] { @@ -205,6 +210,8 @@ impl ClientConn { } } + let stream = Box::pin(canceller.for_stream(stream)); + let resp_enc = RespEnc::decode(stream).await?; debug!("client: got response to request {} (path {})", id, path); Resp::from_enc(resp_enc) @@ -223,6 +230,63 @@ impl RecvLoop for ClientConn { if ch.send(stream).is_err() { debug!("Could not send request response, probably because request was interrupted. Dropping response."); } + } else { + debug!("Got unexpected response to request {}, dropping it", id); + } + } +} + +// ---- + +struct CancelOnDrop { + id: RequestID, + query_send: mpsc::UnboundedSender, +} + +impl CancelOnDrop { + fn new(id: RequestID, query_send: mpsc::UnboundedSender) -> Self { + Self { id, query_send } + } + fn for_stream(self, stream: ByteStream) -> CancelOnDropStream { + CancelOnDropStream { + cancel: Some(self), + stream: stream, + } + } +} + +impl Drop for CancelOnDrop { + fn drop(&mut self) { + trace!("cancelling request {}", self.id); + let _ = self.query_send.send(SendItem::Cancel(self.id)); + } +} + +#[pin_project::pin_project] +struct CancelOnDropStream { + cancel: Option, + #[pin] + stream: ByteStream, +} + +impl Stream for CancelOnDropStream { + type Item = Packet; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + let this = self.project(); + let res = this.stream.poll_next(cx); + if matches!(res, Poll::Ready(None)) { + if let Some(c) = this.cancel.take() { + std::mem::forget(c) + } } + res + } + + fn size_hint(&self) -> (usize, Option) { + self.stream.size_hint() } } -- cgit v1.2.3 From b931d0d1cfb39d5feae1d4e0a7a49cdebd45761b Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 1 Sep 2022 16:01:56 +0200 Subject: try debug --- src/client.rs | 1 + 1 file changed, 1 insertion(+) (limited to 'src/client.rs') diff --git a/src/client.rs b/src/client.rs index d82c91e..7dffa36 100644 --- a/src/client.rs +++ b/src/client.rs @@ -280,6 +280,7 @@ impl Stream for CancelOnDropStream { let res = this.stream.poll_next(cx); if matches!(res, Poll::Ready(None)) { if let Some(c) = this.cancel.take() { + trace!("defusing cancel request {}", c.id); std::mem::forget(c) } } -- cgit v1.2.3 From f6ad1d0fab340e77fbfcb3488a98c342d334838e Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 1 Sep 2022 16:13:43 +0200 Subject: less verbosity --- src/client.rs | 1 - 1 file changed, 1 deletion(-) (limited to 'src/client.rs') diff --git a/src/client.rs b/src/client.rs index 7dffa36..d82c91e 100644 --- a/src/client.rs +++ b/src/client.rs @@ -280,7 +280,6 @@ impl Stream for CancelOnDropStream { let res = this.stream.poll_next(cx); if matches!(res, Poll::Ready(None)) { if let Some(c) = this.cancel.take() { - trace!("defusing cancel request {}", c.id); std::mem::forget(c) } } -- cgit v1.2.3