From f35fa7d18d9e0f51bed311355ec1310b1d311ab3 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 21 Jul 2022 17:34:53 +0200 Subject: Move things around --- src/send.rs | 410 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 410 insertions(+) create mode 100644 src/send.rs (limited to 'src/send.rs') diff --git a/src/send.rs b/src/send.rs new file mode 100644 index 0000000..330d41d --- /dev/null +++ b/src/send.rs @@ -0,0 +1,410 @@ +use std::collections::VecDeque; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use async_trait::async_trait; +use log::trace; + +use futures::AsyncWriteExt; +use futures::Stream; +use kuska_handshake::async_std::BoxStreamWrite; +use tokio::sync::mpsc; + +use crate::error::*; +use crate::message::*; +use crate::util::{ByteStream, Packet}; + +// Messages are sent by chunks +// Chunk format: +// - u32 BE: request id (same for request and response) +// - u16 BE: chunk length, possibly with CHUNK_HAS_CONTINUATION flag +// when this is not the last chunk of the message +// - [u8; chunk_length] chunk data + +pub(crate) type RequestID = u32; +pub(crate) type ChunkLength = u16; +pub(crate) const MAX_CHUNK_LENGTH: ChunkLength = 0x3FF0; +pub(crate) const ERROR_MARKER: ChunkLength = 0x4000; +pub(crate) const CHUNK_HAS_CONTINUATION: ChunkLength = 0x8000; + +struct SendQueueItem { + id: RequestID, + prio: RequestPriority, + data: DataReader, +} + +#[pin_project::pin_project] +struct DataReader { + #[pin] + reader: ByteStream, + packet: Packet, + pos: usize, + buf: Vec, + eos: bool, +} + +impl From for DataReader { + fn from(data: ByteStream) -> DataReader { + DataReader { + reader: data, + packet: Ok(Vec::new()), + pos: 0, + buf: Vec::with_capacity(MAX_CHUNK_LENGTH as usize), + eos: false, + } + } +} + +enum DataFrame { + Data { + /// a fixed size buffer containing some data, possibly padded with 0s + data: [u8; MAX_CHUNK_LENGTH as usize], + /// actual lenght of data + len: usize, + }, + Error(u8), +} + +struct DataReaderItem { + data: DataFrame, + /// whethere there may be more data comming from this stream. Can be used for some + /// optimization. It's an error to set it to false if there is more data, but it is correct + /// (albeit sub-optimal) to set it to true if there is nothing coming after + may_have_more: bool, +} + +impl DataReaderItem { + fn empty_last() -> Self { + DataReaderItem { + data: DataFrame::Data { + data: [0; MAX_CHUNK_LENGTH as usize], + len: 0, + }, + may_have_more: false, + } + } + + fn header(&self) -> [u8; 2] { + let continuation = if self.may_have_more { + CHUNK_HAS_CONTINUATION + } else { + 0 + }; + let len = match self.data { + DataFrame::Data { len, .. } => len as u16, + DataFrame::Error(e) => e as u16 | ERROR_MARKER, + }; + + ChunkLength::to_be_bytes(len | continuation) + } + + fn data(&self) -> &[u8] { + match self.data { + DataFrame::Data { ref data, len } => &data[..len], + DataFrame::Error(_) => &[], + } + } +} + +impl Stream for DataReader { + type Item = DataReaderItem; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + + if *this.eos { + // eos was reached at previous call to poll_next, where a partial packet + // was returned. 
Now return None + return Poll::Ready(None); + } + + loop { + let packet = match this.packet { + Ok(v) => v, + Err(e) => { + let e = *e; + *this.packet = Ok(Vec::new()); + return Poll::Ready(Some(DataReaderItem { + data: DataFrame::Error(e), + may_have_more: true, + })); + } + }; + let packet_left = packet.len() - *this.pos; + let buf_left = MAX_CHUNK_LENGTH as usize - this.buf.len(); + let to_read = std::cmp::min(buf_left, packet_left); + this.buf + .extend_from_slice(&packet[*this.pos..*this.pos + to_read]); + *this.pos += to_read; + if this.buf.len() == MAX_CHUNK_LENGTH as usize { + // we have a full buf, ready to send + break; + } + + // we don't have a full buf, packet is empty; try receive more + if let Some(p) = futures::ready!(this.reader.as_mut().poll_next(cx)) { + *this.packet = p; + *this.pos = 0; + // if buf is empty, we will loop and return the error directly. If buf + // isn't empty, send it before by breaking. + if this.packet.is_err() && !this.buf.is_empty() { + break; + } + } else { + *this.eos = true; + break; + } + } + + let mut body = [0; MAX_CHUNK_LENGTH as usize]; + let len = this.buf.len(); + body[..len].copy_from_slice(this.buf); + this.buf.clear(); + Poll::Ready(Some(DataReaderItem { + data: DataFrame::Data { data: body, len }, + may_have_more: !*this.eos, + })) + } +} + +struct SendQueue { + items: VecDeque<(u8, VecDeque)>, +} + +impl SendQueue { + fn new() -> Self { + Self { + items: VecDeque::with_capacity(64), + } + } + fn push(&mut self, item: SendQueueItem) { + let prio = item.prio; + let pos_prio = match self.items.binary_search_by(|(p, _)| p.cmp(&prio)) { + Ok(i) => i, + Err(i) => { + self.items.insert(i, (prio, VecDeque::new())); + i + } + }; + self.items[pos_prio].1.push_back(item); + } + // used only in tests. They should probably be rewriten + #[allow(dead_code)] + fn pop(&mut self) -> Option { + match self.items.pop_front() { + None => None, + Some((prio, mut items_at_prio)) => { + let ret = items_at_prio.pop_front(); + if !items_at_prio.is_empty() { + self.items.push_front((prio, items_at_prio)); + } + ret.or_else(|| self.pop()) + } + } + } + fn is_empty(&self) -> bool { + self.items.iter().all(|(_k, v)| v.is_empty()) + } + + // this is like an async fn, but hand implemented + fn next_ready(&mut self) -> SendQueuePollNextReady<'_> { + SendQueuePollNextReady { queue: self } + } +} + +struct SendQueuePollNextReady<'a> { + queue: &'a mut SendQueue, +} + +impl<'a> futures::Future for SendQueuePollNextReady<'a> { + type Output = (RequestID, DataReaderItem); + + fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll { + for i in 0..self.queue.items.len() { + let (_prio, items_at_prio) = &mut self.queue.items[i]; + + for _ in 0..items_at_prio.len() { + let mut item = items_at_prio.pop_front().unwrap(); + + match Pin::new(&mut item.data).poll_next(ctx) { + Poll::Pending => items_at_prio.push_back(item), + Poll::Ready(Some(data)) => { + let id = item.id; + if data.may_have_more { + self.queue.push(item); + } else { + if items_at_prio.is_empty() { + // this priority level is empty, remove it + self.queue.items.remove(i); + } + } + return Poll::Ready((id, data)); + } + Poll::Ready(None) => { + if items_at_prio.is_empty() { + // this priority level is empty, remove it + self.queue.items.remove(i); + } + return Poll::Ready((item.id, DataReaderItem::empty_last())); + } + } + } + } + // TODO what do we do if self.queue is empty? We won't get scheduled again. 
+ Poll::Pending + } +} + +/// The SendLoop trait, which is implemented both by the client and the server +/// connection objects (ServerConna and ClientConn) adds a method `.send_loop()` +/// that takes a channel of messages to send and an asynchronous writer, +/// and sends messages from the channel to the async writer, putting them in a queue +/// before being sent and doing the round-robin sending strategy. +/// +/// The `.send_loop()` exits when the sending end of the channel is closed, +/// or if there is an error at any time writing to the async writer. +#[async_trait] +pub(crate) trait SendLoop: Sync { + async fn send_loop( + self: Arc, + mut msg_recv: mpsc::UnboundedReceiver<(RequestID, RequestPriority, ByteStream)>, + mut write: BoxStreamWrite, + ) -> Result<(), Error> + where + W: AsyncWriteExt + Unpin + Send + Sync, + { + let mut sending = SendQueue::new(); + let mut should_exit = false; + while !should_exit || !sending.is_empty() { + let recv_fut = msg_recv.recv(); + futures::pin_mut!(recv_fut); + let send_fut = sending.next_ready(); + + // recv_fut is cancellation-safe according to tokio doc, + // send_fut is cancellation-safe as implemented above? + use futures::future::Either; + match futures::future::select(recv_fut, send_fut).await { + Either::Left((sth, _send_fut)) => { + if let Some((id, prio, data)) = sth { + sending.push(SendQueueItem { + id, + prio, + data: data.into(), + }); + } else { + should_exit = true; + }; + } + Either::Right(((id, data), _recv_fut)) => { + trace!("send_loop: sending bytes for {}", id); + + let header_id = RequestID::to_be_bytes(id); + write.write_all(&header_id[..]).await?; + + write.write_all(&data.header()).await?; + write.write_all(data.data()).await?; + write.flush().await?; + } + } + } + + let _ = write.goodbye().await; + Ok(()) + } +} + +#[cfg(test)] +mod test { + use super::*; + + fn empty_data() -> DataReader { + type Item = Packet; + let stream: Pin + Send + 'static>> = + Box::pin(futures::stream::empty::()); + stream.into() + } + + #[test] + fn test_priority_queue() { + let i1 = SendQueueItem { + id: 1, + prio: PRIO_NORMAL, + data: empty_data(), + }; + let i2 = SendQueueItem { + id: 2, + prio: PRIO_HIGH, + data: empty_data(), + }; + let i2bis = SendQueueItem { + id: 20, + prio: PRIO_HIGH, + data: empty_data(), + }; + let i3 = SendQueueItem { + id: 3, + prio: PRIO_HIGH | PRIO_SECONDARY, + data: empty_data(), + }; + let i4 = SendQueueItem { + id: 4, + prio: PRIO_BACKGROUND | PRIO_SECONDARY, + data: empty_data(), + }; + let i5 = SendQueueItem { + id: 5, + prio: PRIO_BACKGROUND | PRIO_PRIMARY, + data: empty_data(), + }; + + let mut q = SendQueue::new(); + + q.push(i1); // 1 + let a = q.pop().unwrap(); // empty -> 1 + assert_eq!(a.id, 1); + assert!(q.pop().is_none()); + + q.push(a); // 1 + q.push(i2); // 2 1 + q.push(i2bis); // [2 20] 1 + let a = q.pop().unwrap(); // 20 1 -> 2 + assert_eq!(a.id, 2); + let b = q.pop().unwrap(); // 1 -> 20 + assert_eq!(b.id, 20); + let c = q.pop().unwrap(); // empty -> 1 + assert_eq!(c.id, 1); + assert!(q.pop().is_none()); + + q.push(a); // 2 + q.push(b); // [2 20] + q.push(c); // [2 20] 1 + q.push(i3); // [2 20] 3 1 + q.push(i4); // [2 20] 3 1 4 + q.push(i5); // [2 20] 3 1 5 4 + + let a = q.pop().unwrap(); // 20 3 1 5 4 -> 2 + assert_eq!(a.id, 2); + q.push(a); // [20 2] 3 1 5 4 + + let a = q.pop().unwrap(); // 2 3 1 5 4 -> 20 + assert_eq!(a.id, 20); + let b = q.pop().unwrap(); // 3 1 5 4 -> 2 + assert_eq!(b.id, 2); + q.push(b); // 2 3 1 5 4 + let b = q.pop().unwrap(); // 3 1 5 4 -> 2 + assert_eq!(b.id, 2); 
+ let c = q.pop().unwrap(); // 1 5 4 -> 3 + assert_eq!(c.id, 3); + q.push(b); // 2 1 5 4 + let b = q.pop().unwrap(); // 1 5 4 -> 2 + assert_eq!(b.id, 2); + let e = q.pop().unwrap(); // 5 4 -> 1 + assert_eq!(e.id, 1); + let f = q.pop().unwrap(); // 4 -> 5 + assert_eq!(f.id, 5); + let g = q.pop().unwrap(); // empty -> 4 + assert_eq!(g.id, 4); + assert!(q.pop().is_none()); + } +} -- cgit v1.2.3 From 9dffa812c43470ee8a29c23c3a1be73085e25843 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 21 Jul 2022 17:59:15 +0200 Subject: Refactor send.rs --- src/send.rs | 116 +++++++++++++++++++++++++++++++----------------------------- 1 file changed, 60 insertions(+), 56 deletions(-) (limited to 'src/send.rs') diff --git a/src/send.rs b/src/send.rs index 330d41d..0179eb2 100644 --- a/src/send.rs +++ b/src/send.rs @@ -24,6 +24,7 @@ use crate::util::{ByteStream, Packet}; pub(crate) type RequestID = u32; pub(crate) type ChunkLength = u16; + pub(crate) const MAX_CHUNK_LENGTH: ChunkLength = 0x3FF0; pub(crate) const ERROR_MARKER: ChunkLength = 0x4000; pub(crate) const CHUNK_HAS_CONTINUATION: ChunkLength = 0x8000; @@ -62,53 +63,58 @@ enum DataFrame { data: [u8; MAX_CHUNK_LENGTH as usize], /// actual lenght of data len: usize, + /// whethere there may be more data comming from this stream. Can be used for some + /// optimization. It's an error to set it to false if there is more data, but it is correct + /// (albeit sub-optimal) to set it to true if there is nothing coming after + may_have_more: bool, }, + /// An error code automatically signals the end of the stream Error(u8), } -struct DataReaderItem { - data: DataFrame, - /// whethere there may be more data comming from this stream. Can be used for some - /// optimization. It's an error to set it to false if there is more data, but it is correct - /// (albeit sub-optimal) to set it to true if there is nothing coming after - may_have_more: bool, -} - -impl DataReaderItem { +impl DataFrame { fn empty_last() -> Self { - DataReaderItem { - data: DataFrame::Data { - data: [0; MAX_CHUNK_LENGTH as usize], - len: 0, - }, + DataFrame::Data { + data: [0; MAX_CHUNK_LENGTH as usize], + len: 0, may_have_more: false, } } fn header(&self) -> [u8; 2] { - let continuation = if self.may_have_more { - CHUNK_HAS_CONTINUATION - } else { - 0 - }; - let len = match self.data { - DataFrame::Data { len, .. } => len as u16, - DataFrame::Error(e) => e as u16 | ERROR_MARKER, + let header_u16 = match self { + DataFrame::Data { + len, + may_have_more: false, + .. + } => *len as u16, + DataFrame::Data { + len, + may_have_more: true, + .. + } => *len as u16 | CHUNK_HAS_CONTINUATION, + DataFrame::Error(e) => *e as u16 | ERROR_MARKER, }; - - ChunkLength::to_be_bytes(len | continuation) + ChunkLength::to_be_bytes(header_u16) } fn data(&self) -> &[u8] { - match self.data { - DataFrame::Data { ref data, len } => &data[..len], + match self { + DataFrame::Data { ref data, len, .. } => &data[..*len], DataFrame::Error(_) => &[], } } + + fn may_have_more(&self) -> bool { + match self { + DataFrame::Data { may_have_more, .. 
} => *may_have_more, + DataFrame::Error(_) => false, + } + } } impl Stream for DataReader { - type Item = DataReaderItem; + type Item = DataFrame; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.project(); @@ -125,10 +131,8 @@ impl Stream for DataReader { Err(e) => { let e = *e; *this.packet = Ok(Vec::new()); - return Poll::Ready(Some(DataReaderItem { - data: DataFrame::Error(e), - may_have_more: true, - })); + *this.eos = true; + return Poll::Ready(Some(DataFrame::Error(e))); } }; let packet_left = packet.len() - *this.pos; @@ -161,8 +165,9 @@ impl Stream for DataReader { let len = this.buf.len(); body[..len].copy_from_slice(this.buf); this.buf.clear(); - Poll::Ready(Some(DataReaderItem { - data: DataFrame::Data { data: body, len }, + Poll::Ready(Some(DataFrame::Data { + data: body, + len, may_have_more: !*this.eos, })) } @@ -218,38 +223,37 @@ struct SendQueuePollNextReady<'a> { } impl<'a> futures::Future for SendQueuePollNextReady<'a> { - type Output = (RequestID, DataReaderItem); + type Output = (RequestID, DataFrame); fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll { for i in 0..self.queue.items.len() { let (_prio, items_at_prio) = &mut self.queue.items[i]; - for _ in 0..items_at_prio.len() { - let mut item = items_at_prio.pop_front().unwrap(); - + let mut ready_item = None; + for (j, item) in items_at_prio.iter_mut().enumerate() { match Pin::new(&mut item.data).poll_next(ctx) { - Poll::Pending => items_at_prio.push_back(item), - Poll::Ready(Some(data)) => { - let id = item.id; - if data.may_have_more { - self.queue.push(item); - } else { - if items_at_prio.is_empty() { - // this priority level is empty, remove it - self.queue.items.remove(i); - } - } - return Poll::Ready((id, data)); - } - Poll::Ready(None) => { - if items_at_prio.is_empty() { - // this priority level is empty, remove it - self.queue.items.remove(i); - } - return Poll::Ready((item.id, DataReaderItem::empty_last())); + Poll::Pending => (), + Poll::Ready(ready_v) => { + ready_item = Some((j, ready_v)); + break; } } } + + if let Some((j, ready_v)) = ready_item { + let item = items_at_prio.remove(j).unwrap(); + let id = item.id; + if ready_v + .as_ref() + .map(|data| data.may_have_more()) + .unwrap_or(false) + { + items_at_prio.push_back(item); + } else if items_at_prio.is_empty() { + self.queue.items.remove(i); + } + return Poll::Ready((id, ready_v.unwrap_or_else(DataFrame::empty_last))); + } } // TODO what do we do if self.queue is empty? We won't get scheduled again. 
Poll::Pending -- cgit v1.2.3 From 26989bba1409bfc093e58ef98e75885b10ad7c1c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 21 Jul 2022 18:15:07 +0200 Subject: Use Bytes instead of Vec --- src/send.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'src/send.rs') diff --git a/src/send.rs b/src/send.rs index 0179eb2..660e85c 100644 --- a/src/send.rs +++ b/src/send.rs @@ -3,6 +3,7 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; +use bytes::Bytes; use async_trait::async_trait; use log::trace; @@ -49,7 +50,7 @@ impl From for DataReader { fn from(data: ByteStream) -> DataReader { DataReader { reader: data, - packet: Ok(Vec::new()), + packet: Ok(Bytes::new()), pos: 0, buf: Vec::with_capacity(MAX_CHUNK_LENGTH as usize), eos: false, @@ -130,7 +131,7 @@ impl Stream for DataReader { Ok(v) => v, Err(e) => { let e = *e; - *this.packet = Ok(Vec::new()); + *this.packet = Ok(Bytes::new()); *this.eos = true; return Poll::Ready(Some(DataFrame::Error(e))); } -- cgit v1.2.3 From 44bbc1c00c2532e08dff0d4a547b0a707e89f32d Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 21 Jul 2022 19:05:51 +0200 Subject: Rename AutoSerialize into SimpleMessage and refactor a bit --- src/send.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/send.rs') diff --git a/src/send.rs b/src/send.rs index 660e85c..cc28d7c 100644 --- a/src/send.rs +++ b/src/send.rs @@ -3,8 +3,8 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use bytes::Bytes; use async_trait::async_trait; +use bytes::Bytes; use log::trace; use futures::AsyncWriteExt; -- cgit v1.2.3 From 0b71ca12f910c17eaf2291076438dff3b70dc9cd Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 22 Jul 2022 12:45:38 +0200 Subject: Clean up framing protocol --- src/send.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/send.rs') diff --git a/src/send.rs b/src/send.rs index cc28d7c..59805cf 100644 --- a/src/send.rs +++ b/src/send.rs @@ -14,7 +14,7 @@ use tokio::sync::mpsc; use crate::error::*; use crate::message::*; -use crate::util::{ByteStream, Packet}; +use crate::stream::*; // Messages are sent by chunks // Chunk format: -- cgit v1.2.3 From f9db9a4b696569bbc56c40b9170320307ebcdd81 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 22 Jul 2022 13:23:42 +0200 Subject: Simplify send.rs --- src/send.rs | 205 +++++++++++++++--------------------------------------------- 1 file changed, 50 insertions(+), 155 deletions(-) (limited to 'src/send.rs') diff --git a/src/send.rs b/src/send.rs index 59805cf..a8cf966 100644 --- a/src/send.rs +++ b/src/send.rs @@ -8,7 +8,6 @@ use bytes::Bytes; use log::trace; use futures::AsyncWriteExt; -use futures::Stream; use kuska_handshake::async_std::BoxStreamWrite; use tokio::sync::mpsc; @@ -30,152 +29,14 @@ pub(crate) const MAX_CHUNK_LENGTH: ChunkLength = 0x3FF0; pub(crate) const ERROR_MARKER: ChunkLength = 0x4000; pub(crate) const CHUNK_HAS_CONTINUATION: ChunkLength = 0x8000; +struct SendQueue { + items: VecDeque<(u8, VecDeque)>, +} + struct SendQueueItem { id: RequestID, prio: RequestPriority, - data: DataReader, -} - -#[pin_project::pin_project] -struct DataReader { - #[pin] - reader: ByteStream, - packet: Packet, - pos: usize, - buf: Vec, - eos: bool, -} - -impl From for DataReader { - fn from(data: ByteStream) -> DataReader { - DataReader { - reader: data, - packet: Ok(Bytes::new()), - pos: 0, - buf: Vec::with_capacity(MAX_CHUNK_LENGTH as usize), - eos: false, - } - } -} - -enum DataFrame { - Data { - /// a fixed size 
buffer containing some data, possibly padded with 0s - data: [u8; MAX_CHUNK_LENGTH as usize], - /// actual lenght of data - len: usize, - /// whethere there may be more data comming from this stream. Can be used for some - /// optimization. It's an error to set it to false if there is more data, but it is correct - /// (albeit sub-optimal) to set it to true if there is nothing coming after - may_have_more: bool, - }, - /// An error code automatically signals the end of the stream - Error(u8), -} - -impl DataFrame { - fn empty_last() -> Self { - DataFrame::Data { - data: [0; MAX_CHUNK_LENGTH as usize], - len: 0, - may_have_more: false, - } - } - - fn header(&self) -> [u8; 2] { - let header_u16 = match self { - DataFrame::Data { - len, - may_have_more: false, - .. - } => *len as u16, - DataFrame::Data { - len, - may_have_more: true, - .. - } => *len as u16 | CHUNK_HAS_CONTINUATION, - DataFrame::Error(e) => *e as u16 | ERROR_MARKER, - }; - ChunkLength::to_be_bytes(header_u16) - } - - fn data(&self) -> &[u8] { - match self { - DataFrame::Data { ref data, len, .. } => &data[..*len], - DataFrame::Error(_) => &[], - } - } - - fn may_have_more(&self) -> bool { - match self { - DataFrame::Data { may_have_more, .. } => *may_have_more, - DataFrame::Error(_) => false, - } - } -} - -impl Stream for DataReader { - type Item = DataFrame; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut this = self.project(); - - if *this.eos { - // eos was reached at previous call to poll_next, where a partial packet - // was returned. Now return None - return Poll::Ready(None); - } - - loop { - let packet = match this.packet { - Ok(v) => v, - Err(e) => { - let e = *e; - *this.packet = Ok(Bytes::new()); - *this.eos = true; - return Poll::Ready(Some(DataFrame::Error(e))); - } - }; - let packet_left = packet.len() - *this.pos; - let buf_left = MAX_CHUNK_LENGTH as usize - this.buf.len(); - let to_read = std::cmp::min(buf_left, packet_left); - this.buf - .extend_from_slice(&packet[*this.pos..*this.pos + to_read]); - *this.pos += to_read; - if this.buf.len() == MAX_CHUNK_LENGTH as usize { - // we have a full buf, ready to send - break; - } - - // we don't have a full buf, packet is empty; try receive more - if let Some(p) = futures::ready!(this.reader.as_mut().poll_next(cx)) { - *this.packet = p; - *this.pos = 0; - // if buf is empty, we will loop and return the error directly. If buf - // isn't empty, send it before by breaking. 
- if this.packet.is_err() && !this.buf.is_empty() { - break; - } - } else { - *this.eos = true; - break; - } - } - - let mut body = [0; MAX_CHUNK_LENGTH as usize]; - let len = this.buf.len(); - body[..len].copy_from_slice(this.buf); - this.buf.clear(); - Poll::Ready(Some(DataFrame::Data { - data: body, - len, - may_have_more: !*this.eos, - })) - } -} - -struct SendQueue { - items: VecDeque<(u8, VecDeque)>, + data: ByteStreamReader, } impl SendQueue { @@ -232,35 +93,69 @@ impl<'a> futures::Future for SendQueuePollNextReady<'a> { let mut ready_item = None; for (j, item) in items_at_prio.iter_mut().enumerate() { - match Pin::new(&mut item.data).poll_next(ctx) { + let mut item_reader = item.data.read_exact_or_eos(MAX_CHUNK_LENGTH as usize); + match Pin::new(&mut item_reader).poll(ctx) { Poll::Pending => (), Poll::Ready(ready_v) => { - ready_item = Some((j, ready_v)); + ready_item = Some((j, ready_v, item.data.eos())); break; } } } - if let Some((j, ready_v)) = ready_item { + if let Some((j, bytes_or_err, eos)) = ready_item { + let data_frame = match bytes_or_err { + Ok(bytes) => DataFrame::Data(bytes, !eos), + Err(e) => DataFrame::Error(match e { + ReadExactError::Stream(code) => code, + _ => unreachable!(), + }), + }; let item = items_at_prio.remove(j).unwrap(); let id = item.id; - if ready_v - .as_ref() - .map(|data| data.may_have_more()) - .unwrap_or(false) - { + if !eos { items_at_prio.push_back(item); } else if items_at_prio.is_empty() { self.queue.items.remove(i); } - return Poll::Ready((id, ready_v.unwrap_or_else(DataFrame::empty_last))); + return Poll::Ready((id, data_frame)); } } - // TODO what do we do if self.queue is empty? We won't get scheduled again. + // If the queue is empty, this futures is eternally pending. + // This is ok because we use it in a select with another future + // that can interrupt it. Poll::Pending } } +enum DataFrame { + /// a fixed size buffer containing some data + a boolean indicating whether + /// there may be more data comming from this stream. Can be used for some + /// optimization. 
It's an error to set it to false if there is more data, but it is correct + /// (albeit sub-optimal) to set it to true if there is nothing coming after + Data(Bytes, bool), + /// An error code automatically signals the end of the stream + Error(u8), +} + +impl DataFrame { + fn header(&self) -> [u8; 2] { + let header_u16 = match self { + DataFrame::Data(data, false) => data.len() as u16, + DataFrame::Data(data, true) => data.len() as u16 | CHUNK_HAS_CONTINUATION, + DataFrame::Error(e) => *e as u16 | ERROR_MARKER, + }; + ChunkLength::to_be_bytes(header_u16) + } + + fn data(&self) -> &[u8] { + match self { + DataFrame::Data(ref data, _) => &data[..], + DataFrame::Error(_) => &[], + } + } +} + /// The SendLoop trait, which is implemented both by the client and the server /// connection objects (ServerConna and ClientConn) adds a method `.send_loop()` /// that takes a channel of messages to send and an asynchronous writer, @@ -295,7 +190,7 @@ pub(crate) trait SendLoop: Sync { sending.push(SendQueueItem { id, prio, - data: data.into(), + data: ByteStreamReader::new(data), }); } else { should_exit = true; -- cgit v1.2.3 From 50358b944ae7ee4b4aa292ede8bc5d185c86df65 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 22 Jul 2022 13:48:43 +0200 Subject: Cargo fmt; better adapt with_capacity_values --- src/send.rs | 18 ++---------------- 1 file changed, 2 insertions(+), 16 deletions(-) (limited to 'src/send.rs') diff --git a/src/send.rs b/src/send.rs index a8cf966..f1df6f7 100644 --- a/src/send.rs +++ b/src/send.rs @@ -30,7 +30,7 @@ pub(crate) const ERROR_MARKER: ChunkLength = 0x4000; pub(crate) const CHUNK_HAS_CONTINUATION: ChunkLength = 0x8000; struct SendQueue { - items: VecDeque<(u8, VecDeque)>, + items: Vec<(u8, VecDeque)>, } struct SendQueueItem { @@ -42,7 +42,7 @@ struct SendQueueItem { impl SendQueue { fn new() -> Self { Self { - items: VecDeque::with_capacity(64), + items: Vec::with_capacity(64), } } fn push(&mut self, item: SendQueueItem) { @@ -56,20 +56,6 @@ impl SendQueue { }; self.items[pos_prio].1.push_back(item); } - // used only in tests. 
They should probably be rewriten - #[allow(dead_code)] - fn pop(&mut self) -> Option { - match self.items.pop_front() { - None => None, - Some((prio, mut items_at_prio)) => { - let ret = items_at_prio.pop_front(); - if !items_at_prio.is_empty() { - self.items.push_front((prio, items_at_prio)); - } - ret.or_else(|| self.pop()) - } - } - } fn is_empty(&self) -> bool { self.items.iter().all(|(_k, v)| v.is_empty()) } -- cgit v1.2.3 From c17a5f84ff078826084c3a990f1890461c817346 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 25 Jul 2022 11:06:51 +0200 Subject: Remove broken test --- src/send.rs | 95 ------------------------------------------------------------- 1 file changed, 95 deletions(-) (limited to 'src/send.rs') diff --git a/src/send.rs b/src/send.rs index f1df6f7..46c4383 100644 --- a/src/send.rs +++ b/src/send.rs @@ -199,98 +199,3 @@ pub(crate) trait SendLoop: Sync { Ok(()) } } - -#[cfg(test)] -mod test { - use super::*; - - fn empty_data() -> DataReader { - type Item = Packet; - let stream: Pin + Send + 'static>> = - Box::pin(futures::stream::empty::()); - stream.into() - } - - #[test] - fn test_priority_queue() { - let i1 = SendQueueItem { - id: 1, - prio: PRIO_NORMAL, - data: empty_data(), - }; - let i2 = SendQueueItem { - id: 2, - prio: PRIO_HIGH, - data: empty_data(), - }; - let i2bis = SendQueueItem { - id: 20, - prio: PRIO_HIGH, - data: empty_data(), - }; - let i3 = SendQueueItem { - id: 3, - prio: PRIO_HIGH | PRIO_SECONDARY, - data: empty_data(), - }; - let i4 = SendQueueItem { - id: 4, - prio: PRIO_BACKGROUND | PRIO_SECONDARY, - data: empty_data(), - }; - let i5 = SendQueueItem { - id: 5, - prio: PRIO_BACKGROUND | PRIO_PRIMARY, - data: empty_data(), - }; - - let mut q = SendQueue::new(); - - q.push(i1); // 1 - let a = q.pop().unwrap(); // empty -> 1 - assert_eq!(a.id, 1); - assert!(q.pop().is_none()); - - q.push(a); // 1 - q.push(i2); // 2 1 - q.push(i2bis); // [2 20] 1 - let a = q.pop().unwrap(); // 20 1 -> 2 - assert_eq!(a.id, 2); - let b = q.pop().unwrap(); // 1 -> 20 - assert_eq!(b.id, 20); - let c = q.pop().unwrap(); // empty -> 1 - assert_eq!(c.id, 1); - assert!(q.pop().is_none()); - - q.push(a); // 2 - q.push(b); // [2 20] - q.push(c); // [2 20] 1 - q.push(i3); // [2 20] 3 1 - q.push(i4); // [2 20] 3 1 4 - q.push(i5); // [2 20] 3 1 5 4 - - let a = q.pop().unwrap(); // 20 3 1 5 4 -> 2 - assert_eq!(a.id, 2); - q.push(a); // [20 2] 3 1 5 4 - - let a = q.pop().unwrap(); // 2 3 1 5 4 -> 20 - assert_eq!(a.id, 20); - let b = q.pop().unwrap(); // 3 1 5 4 -> 2 - assert_eq!(b.id, 2); - q.push(b); // 2 3 1 5 4 - let b = q.pop().unwrap(); // 3 1 5 4 -> 2 - assert_eq!(b.id, 2); - let c = q.pop().unwrap(); // 1 5 4 -> 3 - assert_eq!(c.id, 3); - q.push(b); // 2 1 5 4 - let b = q.pop().unwrap(); // 1 5 4 -> 2 - assert_eq!(b.id, 2); - let e = q.pop().unwrap(); // 5 4 -> 1 - assert_eq!(e.id, 1); - let f = q.pop().unwrap(); // 4 -> 5 - assert_eq!(f.id, 5); - let g = q.pop().unwrap(); // empty -> 4 - assert_eq!(g.id, 4); - assert!(q.pop().is_none()); - } -} -- cgit v1.2.3 From 74e57016f63b6052cf6d539812859c3a46138eee Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 25 Jul 2022 15:04:52 +0200 Subject: Add some debugging --- src/send.rs | 46 +++++++++++++++++++++++++++++++++++----------- 1 file changed, 35 insertions(+), 11 deletions(-) (limited to 'src/send.rs') diff --git a/src/send.rs b/src/send.rs index 46c4383..256fe4c 100644 --- a/src/send.rs +++ b/src/send.rs @@ -74,36 +74,54 @@ impl<'a> futures::Future for SendQueuePollNextReady<'a> { type Output = (RequestID, DataFrame); 
fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll { - for i in 0..self.queue.items.len() { - let (_prio, items_at_prio) = &mut self.queue.items[i]; - + for (i, (_prio, items_at_prio)) in self.queue.items.iter_mut().enumerate() { let mut ready_item = None; for (j, item) in items_at_prio.iter_mut().enumerate() { let mut item_reader = item.data.read_exact_or_eos(MAX_CHUNK_LENGTH as usize); match Pin::new(&mut item_reader).poll(ctx) { Poll::Pending => (), Poll::Ready(ready_v) => { - ready_item = Some((j, ready_v, item.data.eos())); + ready_item = Some((j, ready_v)); break; } } } - if let Some((j, bytes_or_err, eos)) = ready_item { + if let Some((j, bytes_or_err)) = ready_item { + let item = items_at_prio.remove(j).unwrap(); + let id = item.id; + let eos = item.data.eos(); + let data_frame = match bytes_or_err { - Ok(bytes) => DataFrame::Data(bytes, !eos), + Ok(bytes) => { + trace!( + "send queue poll next ready: id {} eos {:?} bytes {}", + id, + eos, + bytes.len() + ); + DataFrame::Data(bytes, !eos) + } Err(e) => DataFrame::Error(match e { - ReadExactError::Stream(code) => code, + ReadExactError::Stream(code) => { + trace!( + "send queue poll next ready: id {} eos {:?} ERROR {}", + id, + eos, + code + ); + code + } _ => unreachable!(), }), }; - let item = items_at_prio.remove(j).unwrap(); - let id = item.id; - if !eos { + + if !eos && !matches!(data_frame, DataFrame::Error(_)) { items_at_prio.push_back(item); } else if items_at_prio.is_empty() { self.queue.items.remove(i); } + return Poll::Ready((id, data_frame)); } } @@ -173,6 +191,7 @@ pub(crate) trait SendLoop: Sync { match futures::future::select(recv_fut, send_fut).await { Either::Left((sth, _send_fut)) => { if let Some((id, prio, data)) = sth { + trace!("send_loop: add stream {} to send", id); sending.push(SendQueueItem { id, prio, @@ -183,7 +202,12 @@ pub(crate) trait SendLoop: Sync { }; } Either::Right(((id, data), _recv_fut)) => { - trace!("send_loop: sending bytes for {}", id); + trace!( + "send_loop: id {}, send {} bytes, header_size {}", + id, + data.data().len(), + hex::encode(data.header()) + ); let header_id = RequestID::to_be_bytes(id); write.write_all(&header_id[..]).await?; -- cgit v1.2.3 From b55f61c38b01da01314d99ced543aba713dbd2a9 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 26 Jul 2022 12:11:48 +0200 Subject: Fix things going wrong when sending chan is closed --- src/send.rs | 36 +++++++++++++++++++++++++----------- 1 file changed, 25 insertions(+), 11 deletions(-) (limited to 'src/send.rs') diff --git a/src/send.rs b/src/send.rs index 256fe4c..fd415c6 100644 --- a/src/send.rs +++ b/src/send.rs @@ -5,7 +5,7 @@ use std::task::{Context, Poll}; use async_trait::async_trait; use bytes::Bytes; -use log::trace; +use log::*; use futures::AsyncWriteExt; use kuska_handshake::async_std::BoxStreamWrite; @@ -172,24 +172,38 @@ impl DataFrame { pub(crate) trait SendLoop: Sync { async fn send_loop( self: Arc, - mut msg_recv: mpsc::UnboundedReceiver<(RequestID, RequestPriority, ByteStream)>, + msg_recv: mpsc::UnboundedReceiver<(RequestID, RequestPriority, ByteStream)>, mut write: BoxStreamWrite, ) -> Result<(), Error> where W: AsyncWriteExt + Unpin + Send + Sync, { let mut sending = SendQueue::new(); - let mut should_exit = false; - while !should_exit || !sending.is_empty() { - let recv_fut = msg_recv.recv(); - futures::pin_mut!(recv_fut); + let mut msg_recv = Some(msg_recv); + while msg_recv.is_some() || !sending.is_empty() { + debug!( + "Sending: {:?}", + sending + .items + .iter() + .map(|(_, i)| 
i.iter().map(|x| x.id)) + .flatten() + .collect::>() + ); + + let recv_fut = async { + if let Some(chan) = &mut msg_recv { + chan.recv().await + } else { + futures::future::pending().await + } + }; let send_fut = sending.next_ready(); // recv_fut is cancellation-safe according to tokio doc, // send_fut is cancellation-safe as implemented above? - use futures::future::Either; - match futures::future::select(recv_fut, send_fut).await { - Either::Left((sth, _send_fut)) => { + tokio::select! { + sth = recv_fut => { if let Some((id, prio, data)) = sth { trace!("send_loop: add stream {} to send", id); sending.push(SendQueueItem { @@ -198,10 +212,10 @@ pub(crate) trait SendLoop: Sync { data: ByteStreamReader::new(data), }); } else { - should_exit = true; + msg_recv = None; }; } - Either::Right(((id, data), _recv_fut)) => { + (id, data) = send_fut => { trace!( "send_loop: id {}, send {} bytes, header_size {}", id, -- cgit v1.2.3 From 7909a95d3c02a738c9a088c1cb8a5d6f70b06046 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 1 Sep 2022 11:21:24 +0200 Subject: Stream errors are now std::io::Error --- src/send.rs | 61 ++++++++++++++++++++++++++++++++----------------------------- 1 file changed, 32 insertions(+), 29 deletions(-) (limited to 'src/send.rs') diff --git a/src/send.rs b/src/send.rs index fd415c6..f362962 100644 --- a/src/send.rs +++ b/src/send.rs @@ -18,9 +18,11 @@ use crate::stream::*; // Messages are sent by chunks // Chunk format: // - u32 BE: request id (same for request and response) -// - u16 BE: chunk length, possibly with CHUNK_HAS_CONTINUATION flag -// when this is not the last chunk of the message -// - [u8; chunk_length] chunk data +// - u16 BE: chunk length + flags: +// CHUNK_HAS_CONTINUATION when this is not the last chunk of the stream +// ERROR_MARKER if this chunk denotes an error +// (these two flags are exclusive, an error denotes the end of the stream) +// - [u8; chunk_length] chunk data / error message pub(crate) type RequestID = u32; pub(crate) type ChunkLength = u16; @@ -28,6 +30,7 @@ pub(crate) type ChunkLength = u16; pub(crate) const MAX_CHUNK_LENGTH: ChunkLength = 0x3FF0; pub(crate) const ERROR_MARKER: ChunkLength = 0x4000; pub(crate) const CHUNK_HAS_CONTINUATION: ChunkLength = 0x8000; +pub(crate) const CHUNK_LENGTH_MASK: ChunkLength = 0x3FFF; struct SendQueue { items: Vec<(u8, VecDeque)>, @@ -92,29 +95,12 @@ impl<'a> futures::Future for SendQueuePollNextReady<'a> { let id = item.id; let eos = item.data.eos(); - let data_frame = match bytes_or_err { - Ok(bytes) => { - trace!( - "send queue poll next ready: id {} eos {:?} bytes {}", - id, - eos, - bytes.len() - ); - DataFrame::Data(bytes, !eos) - } - Err(e) => DataFrame::Error(match e { - ReadExactError::Stream(code) => { - trace!( - "send queue poll next ready: id {} eos {:?} ERROR {}", - id, - eos, - code - ); - code - } - _ => unreachable!(), - }), - }; + let packet = bytes_or_err.map_err(|e| match e { + ReadExactError::Stream(err) => err, + _ => unreachable!(), + }); + + let data_frame = DataFrame::from_packet(packet, !eos); if !eos && !matches!(data_frame, DataFrame::Error(_)) { items_at_prio.push_back(item); @@ -139,15 +125,32 @@ enum DataFrame { /// (albeit sub-optimal) to set it to true if there is nothing coming after Data(Bytes, bool), /// An error code automatically signals the end of the stream - Error(u8), + Error(Bytes), } impl DataFrame { + fn from_packet(p: Packet, has_cont: bool) -> Self { + match p { + Ok(bytes) => { + assert!(bytes.len() <= MAX_CHUNK_LENGTH as usize); + Self::Data(bytes, 
has_cont) + } + Err(e) => { + let msg = format!("{}", e); + let mut msg = Bytes::from(msg.into_bytes()); + if msg.len() > MAX_CHUNK_LENGTH as usize { + msg = msg.slice(..MAX_CHUNK_LENGTH as usize); + } + Self::Error(msg) + } + } + } + fn header(&self) -> [u8; 2] { let header_u16 = match self { DataFrame::Data(data, false) => data.len() as u16, DataFrame::Data(data, true) => data.len() as u16 | CHUNK_HAS_CONTINUATION, - DataFrame::Error(e) => *e as u16 | ERROR_MARKER, + DataFrame::Error(msg) => msg.len() as u16 | ERROR_MARKER, }; ChunkLength::to_be_bytes(header_u16) } @@ -155,7 +158,7 @@ impl DataFrame { fn data(&self) -> &[u8] { match self { DataFrame::Data(ref data, _) => &data[..], - DataFrame::Error(_) => &[], + DataFrame::Error(ref msg) => &msg[..], } } } -- cgit v1.2.3 From 745c78618479c4177647e4d7fed97d5fd2d00d4f Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 1 Sep 2022 11:34:53 +0200 Subject: Also encode errorkind in stream --- src/send.rs | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) (limited to 'src/send.rs') diff --git a/src/send.rs b/src/send.rs index f362962..287fe40 100644 --- a/src/send.rs +++ b/src/send.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use std::task::{Context, Poll}; use async_trait::async_trait; -use bytes::Bytes; +use bytes::{Bytes, BytesMut, BufMut}; use log::*; use futures::AsyncWriteExt; @@ -22,7 +22,11 @@ use crate::stream::*; // CHUNK_HAS_CONTINUATION when this is not the last chunk of the stream // ERROR_MARKER if this chunk denotes an error // (these two flags are exclusive, an error denotes the end of the stream) -// - [u8; chunk_length] chunk data / error message +// - [u8; chunk_length], either +// - if not error: chunk data +// - if error: +// - u8: error kind, encoded using error::io_errorkind_to_u8 +// - rest: error message pub(crate) type RequestID = u32; pub(crate) type ChunkLength = u16; @@ -136,12 +140,17 @@ impl DataFrame { Self::Data(bytes, has_cont) } Err(e) => { - let msg = format!("{}", e); - let mut msg = Bytes::from(msg.into_bytes()); - if msg.len() > MAX_CHUNK_LENGTH as usize { - msg = msg.slice(..MAX_CHUNK_LENGTH as usize); + let mut buf = BytesMut::new(); + buf.put_u8(io_errorkind_to_u8(e.kind())); + + let msg = format!("{}", e).into_bytes(); + if msg.len() > (MAX_CHUNK_LENGTH - 1) as usize { + buf.put(&msg[..(MAX_CHUNK_LENGTH - 1) as usize]); + } else { + buf.put(&msg[..]); } - Self::Error(msg) + + Self::Error(buf.freeze()) } } } -- cgit v1.2.3 From cd203f5708907c2bf172a3c5b7c5b40e2557b2f4 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 1 Sep 2022 12:15:50 +0200 Subject: Add OrderTag to Req and Resp, refactor errors --- src/send.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) (limited to 'src/send.rs') diff --git a/src/send.rs b/src/send.rs index 287fe40..c40787f 100644 --- a/src/send.rs +++ b/src/send.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use std::task::{Context, Poll}; use async_trait::async_trait; -use bytes::{Bytes, BytesMut, BufMut}; +use bytes::{BufMut, Bytes, BytesMut}; use log::*; use futures::AsyncWriteExt; @@ -36,6 +36,8 @@ pub(crate) const ERROR_MARKER: ChunkLength = 0x4000; pub(crate) const CHUNK_HAS_CONTINUATION: ChunkLength = 0x8000; pub(crate) const CHUNK_LENGTH_MASK: ChunkLength = 0x3FFF; +pub(crate) type SendStream = (RequestID, RequestPriority, ByteStream); + struct SendQueue { items: Vec<(u8, VecDeque)>, } @@ -184,7 +186,7 @@ impl DataFrame { pub(crate) trait SendLoop: Sync { async fn send_loop( self: Arc, - msg_recv: mpsc::UnboundedReceiver<(RequestID, 
RequestPriority, ByteStream)>, + msg_recv: mpsc::UnboundedReceiver, mut write: BoxStreamWrite, ) -> Result<(), Error> where -- cgit v1.2.3 From 4a59b73d7bfd0f136f654e874afb5d2a9bf4df2e Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 1 Sep 2022 12:46:33 +0200 Subject: Add actual support for order tag --- src/send.rs | 112 ++++++++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 78 insertions(+), 34 deletions(-) (limited to 'src/send.rs') diff --git a/src/send.rs b/src/send.rs index c40787f..ea6cf9f 100644 --- a/src/send.rs +++ b/src/send.rs @@ -1,4 +1,4 @@ -use std::collections::VecDeque; +use std::collections::{HashMap, VecDeque}; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -7,7 +7,7 @@ use async_trait::async_trait; use bytes::{BufMut, Bytes, BytesMut}; use log::*; -use futures::AsyncWriteExt; +use futures::{AsyncWriteExt, Future}; use kuska_handshake::async_std::BoxStreamWrite; use tokio::sync::mpsc; @@ -36,15 +36,21 @@ pub(crate) const ERROR_MARKER: ChunkLength = 0x4000; pub(crate) const CHUNK_HAS_CONTINUATION: ChunkLength = 0x8000; pub(crate) const CHUNK_LENGTH_MASK: ChunkLength = 0x3FFF; -pub(crate) type SendStream = (RequestID, RequestPriority, ByteStream); +pub(crate) type SendStream = (RequestID, RequestPriority, Option, ByteStream); struct SendQueue { - items: Vec<(u8, VecDeque)>, + items: Vec<(u8, SendQueuePriority)>, +} + +struct SendQueuePriority { + items: VecDeque, + order: HashMap>, } struct SendQueueItem { id: RequestID, prio: RequestPriority, + order_tag: Option, data: ByteStreamReader, } @@ -59,11 +65,11 @@ impl SendQueue { let pos_prio = match self.items.binary_search_by(|(p, _)| p.cmp(&prio)) { Ok(i) => i, Err(i) => { - self.items.insert(i, (prio, VecDeque::new())); + self.items.insert(i, (prio, SendQueuePriority::new())); i } }; - self.items[pos_prio].1.push_back(item); + self.items[pos_prio].1.push(item); } fn is_empty(&self) -> bool { self.items.iter().all(|(_k, v)| v.is_empty()) @@ -75,29 +81,34 @@ impl SendQueue { } } -struct SendQueuePollNextReady<'a> { - queue: &'a mut SendQueue, -} - -impl<'a> futures::Future for SendQueuePollNextReady<'a> { - type Output = (RequestID, DataFrame); - - fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll { - for (i, (_prio, items_at_prio)) in self.queue.items.iter_mut().enumerate() { - let mut ready_item = None; - for (j, item) in items_at_prio.iter_mut().enumerate() { - let mut item_reader = item.data.read_exact_or_eos(MAX_CHUNK_LENGTH as usize); - match Pin::new(&mut item_reader).poll(ctx) { - Poll::Pending => (), - Poll::Ready(ready_v) => { - ready_item = Some((j, ready_v)); - break; - } +impl SendQueuePriority { + fn new() -> Self { + Self { + items: VecDeque::new(), + order: HashMap::new(), + } + } + fn push(&mut self, item: SendQueueItem) { + if let Some(OrderTag(stream, order)) = item.order_tag { + let order_vec = self.order.entry(stream).or_default(); + let i = order_vec.iter().take_while(|o2| **o2 < order).count(); + order_vec.insert(i, order); + } + self.items.push_back(item); + } + fn is_empty(&self) -> bool { + self.items.is_empty() + } + fn poll_next_ready(&mut self, ctx: &mut Context<'_>) -> Poll<(RequestID, DataFrame)> { + for (j, item) in self.items.iter_mut().enumerate() { + if let Some(OrderTag(stream, order)) = item.order_tag { + if order > *self.order.get(&stream).unwrap().front().unwrap() { + continue; } } - if let Some((j, bytes_or_err)) = ready_item { - let item = items_at_prio.remove(j).unwrap(); + let mut item_reader = 
item.data.read_exact_or_eos(MAX_CHUNK_LENGTH as usize); + if let Poll::Ready(bytes_or_err) = Pin::new(&mut item_reader).poll(ctx) { let id = item.id; let eos = item.data.eos(); @@ -106,15 +117,47 @@ impl<'a> futures::Future for SendQueuePollNextReady<'a> { _ => unreachable!(), }); + if eos || packet.is_err() { + if let Some(OrderTag(stream, order)) = item.order_tag { + assert_eq!( + self.order.get_mut(&stream).unwrap().pop_front(), + Some(order) + ) + } + self.items.remove(j); + } + let data_frame = DataFrame::from_packet(packet, !eos); - if !eos && !matches!(data_frame, DataFrame::Error(_)) { - items_at_prio.push_back(item); - } else if items_at_prio.is_empty() { + return Poll::Ready((id, data_frame)); + } + } + + Poll::Pending + } + fn dump(&self, prio: u8) -> String { + self.items + .iter() + .map(|i| format!("[{} {} {:?}]", prio, i.id, i.order_tag)) + .collect::>() + .join(" ") + } +} + +struct SendQueuePollNextReady<'a> { + queue: &'a mut SendQueue, +} + +impl<'a> futures::Future for SendQueuePollNextReady<'a> { + type Output = (RequestID, DataFrame); + + fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll { + for (i, (_prio, items_at_prio)) in self.queue.items.iter_mut().enumerate() { + if let Poll::Ready(res) = items_at_prio.poll_next_ready(ctx) { + if items_at_prio.is_empty() { self.queue.items.remove(i); } - - return Poll::Ready((id, data_frame)); + return Poll::Ready(res); } } // If the queue is empty, this futures is eternally pending. @@ -200,8 +243,7 @@ pub(crate) trait SendLoop: Sync { sending .items .iter() - .map(|(_, i)| i.iter().map(|x| x.id)) - .flatten() + .map(|(prio, i)| i.dump(*prio)) .collect::>() ); @@ -217,12 +259,14 @@ pub(crate) trait SendLoop: Sync { // recv_fut is cancellation-safe according to tokio doc, // send_fut is cancellation-safe as implemented above? tokio::select! 
{ + biased; // always read incomming channel first if it has data sth = recv_fut => { - if let Some((id, prio, data)) = sth { + if let Some((id, prio, order_tag, data)) = sth { trace!("send_loop: add stream {} to send", id); sending.push(SendQueueItem { id, prio, + order_tag, data: ByteStreamReader::new(data), }); } else { -- cgit v1.2.3 From 32925667385db9e1d9e56ebae67d03d8096f7c46 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 1 Sep 2022 14:43:27 +0200 Subject: fix trace message --- src/send.rs | 1 + 1 file changed, 1 insertion(+) (limited to 'src/send.rs') diff --git a/src/send.rs b/src/send.rs index 3b01cb5..d927d98 100644 --- a/src/send.rs +++ b/src/send.rs @@ -247,6 +247,7 @@ pub(crate) trait SendLoop: Sync { .iter() .map(|(prio, i)| i.dump(*prio)) .collect::>() + .join(" ; ") ); let recv_fut = async { -- cgit v1.2.3 From 522f420e2bf30d5ef6f50dccb88adf86882ac7c6 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 1 Sep 2022 15:54:11 +0200 Subject: Implement request cancellation --- src/send.rs | 57 +++++++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 45 insertions(+), 12 deletions(-) (limited to 'src/send.rs') diff --git a/src/send.rs b/src/send.rs index d927d98..780bbcf 100644 --- a/src/send.rs +++ b/src/send.rs @@ -22,6 +22,7 @@ use crate::stream::*; // CHUNK_HAS_CONTINUATION when this is not the last chunk of the stream // ERROR_MARKER if this chunk denotes an error // (these two flags are exclusive, an error denotes the end of the stream) +// **special value** 0xFFFF indicates a CANCEL message // - [u8; chunk_length], either // - if not error: chunk data // - if error: @@ -35,8 +36,14 @@ pub(crate) const MAX_CHUNK_LENGTH: ChunkLength = 0x3FF0; pub(crate) const ERROR_MARKER: ChunkLength = 0x4000; pub(crate) const CHUNK_HAS_CONTINUATION: ChunkLength = 0x8000; pub(crate) const CHUNK_LENGTH_MASK: ChunkLength = 0x3FFF; +pub(crate) const CANCEL_REQUEST: ChunkLength = 0xFFFF; -pub(crate) type SendStream = (RequestID, RequestPriority, Option, ByteStream); +pub(crate) enum SendItem { + Stream(RequestID, RequestPriority, Option, ByteStream), + Cancel(RequestID), +} + +// ---- struct SendQueue { items: Vec<(u8, SendQueuePriority)>, @@ -71,6 +78,11 @@ impl SendQueue { }; self.items[pos_prio].1.push(item); } + fn remove(&mut self, id: RequestID) { + for (_, prioq) in self.items.iter_mut() { + prioq.remove(id); + } + } fn is_empty(&self) -> bool { self.items.iter().all(|(_k, v)| v.is_empty()) } @@ -96,6 +108,16 @@ impl SendQueuePriority { } self.items.push_back(item); } + fn remove(&mut self, id: RequestID) { + if let Some(i) = self.items.iter().position(|x| x.id == id) { + let item = self.items.remove(i).unwrap(); + if let Some(OrderTag(stream, order)) = item.order_tag { + let order_vec = self.order.get_mut(&stream).unwrap(); + let j = order_vec.iter().position(|x| *x == order).unwrap(); + order_vec.remove(j).unwrap(); + } + } + } fn is_empty(&self) -> bool { self.items.is_empty() } @@ -229,7 +251,7 @@ impl DataFrame { pub(crate) trait SendLoop: Sync { async fn send_loop( self: Arc, - msg_recv: mpsc::UnboundedReceiver, + msg_recv: mpsc::UnboundedReceiver, mut write: BoxStreamWrite, debug_name: String, ) -> Result<(), Error> @@ -264,16 +286,27 @@ pub(crate) trait SendLoop: Sync { tokio::select! 
{ biased; // always read incomming channel first if it has data sth = recv_fut => { - if let Some((id, prio, order_tag, data)) = sth { - trace!("send_loop({}): add stream {} to send", debug_name, id); - sending.push(SendQueueItem { - id, - prio, - order_tag, - data: ByteStreamReader::new(data), - }); - } else { - msg_recv = None; + match sth { + Some(SendItem::Stream(id, prio, order_tag, data)) => { + trace!("send_loop({}): add stream {} to send", debug_name, id); + sending.push(SendQueueItem { + id, + prio, + order_tag, + data: ByteStreamReader::new(data), + }) + } + Some(SendItem::Cancel(id)) => { + trace!("send_loop({}): cancelling {}", debug_name, id); + sending.remove(id); + let header_id = RequestID::to_be_bytes(id); + write.write_all(&header_id[..]).await?; + write.write_all(&ChunkLength::to_be_bytes(CANCEL_REQUEST)).await?; + write.flush().await?; + } + None => { + msg_recv = None; + } }; } (id, data) = send_fut => { -- cgit v1.2.3 From 0f799a7768997c37e3e1b6861c097c4cd934acde Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 6 Sep 2022 19:42:49 +0200 Subject: Implement Least Attained First scheduling of streams --- src/send.rs | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) (limited to 'src/send.rs') diff --git a/src/send.rs b/src/send.rs index 780bbcf..2606434 100644 --- a/src/send.rs +++ b/src/send.rs @@ -59,6 +59,7 @@ struct SendQueueItem { prio: RequestPriority, order_tag: Option, data: ByteStreamReader, + sent: usize, } impl SendQueue { @@ -106,7 +107,7 @@ impl SendQueuePriority { let i = order_vec.iter().take_while(|o2| **o2 < order).count(); order_vec.insert(i, order); } - self.items.push_back(item); + self.items.push_front(item); } fn remove(&mut self, id: RequestID) { if let Some(i) = self.items.iter().position(|x| x.id == id) { @@ -139,7 +140,11 @@ impl SendQueuePriority { _ => unreachable!(), }); - if eos || packet.is_err() { + let is_err = packet.is_err(); + let data_frame = DataFrame::from_packet(packet, !eos); + item.sent += data_frame.data().len(); + + if eos || is_err { if let Some(OrderTag(stream, order)) = item.order_tag { assert_eq!( self.order.get_mut(&stream).unwrap().pop_front(), @@ -147,10 +152,16 @@ impl SendQueuePriority { ) } self.items.remove(j); + } else { + for k in j..self.items.len() - 1 { + if self.items[k].sent >= self.items[k + 1].sent { + self.items.swap(k, k + 1); + } else { + break; + } + } } - let data_frame = DataFrame::from_packet(packet, !eos); - return Poll::Ready((id, data_frame)); } } @@ -160,7 +171,7 @@ impl SendQueuePriority { fn dump(&self, prio: u8) -> String { self.items .iter() - .map(|i| format!("[{} {} {:?}]", prio, i.id, i.order_tag)) + .map(|i| format!("[{} {} {:?} @{}]", prio, i.id, i.order_tag, i.sent)) .collect::>() .join(" ") } @@ -294,6 +305,7 @@ pub(crate) trait SendLoop: Sync { prio, order_tag, data: ByteStreamReader::new(data), + sent: 0, }) } Some(SendItem::Cancel(id)) => { -- cgit v1.2.3 From 18d5abc981faf2d76ced42bad5cb69aa83128832 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 13 Sep 2022 12:20:49 +0200 Subject: add precision to protocol description --- src/send.rs | 1 + 1 file changed, 1 insertion(+) (limited to 'src/send.rs') diff --git a/src/send.rs b/src/send.rs index 2606434..0ca62fd 100644 --- a/src/send.rs +++ b/src/send.rs @@ -28,6 +28,7 @@ use crate::stream::*; // - if error: // - u8: error kind, encoded using error::io_errorkind_to_u8 // - rest: error message +// - absent for cancel message pub(crate) type RequestID = u32; pub(crate) type ChunkLength = u16; -- cgit 
v1.2.3 From c00676feba3819883b2888799d5f743c4ca9bca0 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 13 Sep 2022 12:25:37 +0200 Subject: Uniformize flag naming --- src/send.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) (limited to 'src/send.rs') diff --git a/src/send.rs b/src/send.rs index 0ca62fd..af5f00c 100644 --- a/src/send.rs +++ b/src/send.rs @@ -19,8 +19,8 @@ use crate::stream::*; // Chunk format: // - u32 BE: request id (same for request and response) // - u16 BE: chunk length + flags: -// CHUNK_HAS_CONTINUATION when this is not the last chunk of the stream -// ERROR_MARKER if this chunk denotes an error +// CHUNK_FLAG_HAS_CONTINUATION when this is not the last chunk of the stream +// CHUNK_FLAG_ERROR if this chunk denotes an error // (these two flags are exclusive, an error denotes the end of the stream) // **special value** 0xFFFF indicates a CANCEL message // - [u8; chunk_length], either @@ -28,14 +28,14 @@ use crate::stream::*; // - if error: // - u8: error kind, encoded using error::io_errorkind_to_u8 // - rest: error message -// - absent for cancel message +// - absent for cancel messag pub(crate) type RequestID = u32; pub(crate) type ChunkLength = u16; pub(crate) const MAX_CHUNK_LENGTH: ChunkLength = 0x3FF0; -pub(crate) const ERROR_MARKER: ChunkLength = 0x4000; -pub(crate) const CHUNK_HAS_CONTINUATION: ChunkLength = 0x8000; +pub(crate) const CHUNK_FLAG_ERROR: ChunkLength = 0x4000; +pub(crate) const CHUNK_FLAG_HAS_CONTINUATION: ChunkLength = 0x8000; pub(crate) const CHUNK_LENGTH_MASK: ChunkLength = 0x3FFF; pub(crate) const CANCEL_REQUEST: ChunkLength = 0xFFFF; @@ -237,8 +237,8 @@ impl DataFrame { fn header(&self) -> [u8; 2] { let header_u16 = match self { DataFrame::Data(data, false) => data.len() as u16, - DataFrame::Data(data, true) => data.len() as u16 | CHUNK_HAS_CONTINUATION, - DataFrame::Error(msg) => msg.len() as u16 | ERROR_MARKER, + DataFrame::Data(data, true) => data.len() as u16 | CHUNK_FLAG_HAS_CONTINUATION, + DataFrame::Error(msg) => msg.len() as u16 | CHUNK_FLAG_ERROR, }; ChunkLength::to_be_bytes(header_u16) } -- cgit v1.2.3 From b509e6057f850971e3339404cfd2240193871402 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 13 Sep 2022 12:28:01 +0200 Subject: Missing cleanup --- src/send.rs | 1 + 1 file changed, 1 insertion(+) (limited to 'src/send.rs') diff --git a/src/send.rs b/src/send.rs index af5f00c..4e16179 100644 --- a/src/send.rs +++ b/src/send.rs @@ -84,6 +84,7 @@ impl SendQueue { for (_, prioq) in self.items.iter_mut() { prioq.remove(id); } + self.items.retain(|(_prio, q)| !q.is_empty()); } fn is_empty(&self) -> bool { self.items.iter().all(|(_k, v)| v.is_empty()) -- cgit v1.2.3 From 395f942fc745f5947005cad3a0e2ac15403fdbc9 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 13 Sep 2022 12:34:03 +0200 Subject: Fix potential memory leak --- src/send.rs | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) (limited to 'src/send.rs') diff --git a/src/send.rs b/src/send.rs index 4e16179..0db0ba7 100644 --- a/src/send.rs +++ b/src/send.rs @@ -118,6 +118,9 @@ impl SendQueuePriority { let order_vec = self.order.get_mut(&stream).unwrap(); let j = order_vec.iter().position(|x| *x == order).unwrap(); order_vec.remove(j).unwrap(); + if order_vec.is_empty() { + self.order.remove(&stream); + } } } } @@ -147,14 +150,19 @@ impl SendQueuePriority { item.sent += data_frame.data().len(); if eos || is_err { + // If item had an order tag, remove it from the corresponding ordering list if let Some(OrderTag(stream, 
order)) = item.order_tag { - assert_eq!( - self.order.get_mut(&stream).unwrap().pop_front(), - Some(order) - ) + let order_stream = self.order.get_mut(&stream).unwrap(); + assert_eq!(order_stream.pop_front(), Some(order)); + if order_stream.is_empty() { + self.order.remove(&stream); + } } + // Remove item from sending queue self.items.remove(j); } else { + // Move item later in send queue to implement LAS scheduling + // (LAS = Least Attained Service) for k in j..self.items.len() - 1 { if self.items[k].sent >= self.items[k + 1].sent { self.items.swap(k, k + 1); -- cgit v1.2.3
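
For reference, the chunk framing that this series converges on can be summarized with a small standalone sketch (not part of src/send.rs; the ChunkInfo type and the encode_header/decode_length helpers are illustrative names only). It builds the 6-byte header written before each chunk — a u32 BE request id followed by a u16 BE length-and-flags word — and shows how a receiver would interpret that word, checking the special CANCEL_REQUEST value (0xFFFF) before testing the flag bits, since 0xFFFF has both CHUNK_FLAG_ERROR and CHUNK_FLAG_HAS_CONTINUATION set.

// Standalone sketch of the wire framing described in the protocol comments
// above. Constants mirror those in src/send.rs after the "Uniformize flag
// naming" commit; ChunkInfo, encode_header and decode_length are
// illustrative helpers, not part of the crate.

type RequestID = u32;
type ChunkLength = u16;

const MAX_CHUNK_LENGTH: ChunkLength = 0x3FF0;
const CHUNK_FLAG_ERROR: ChunkLength = 0x4000;
const CHUNK_FLAG_HAS_CONTINUATION: ChunkLength = 0x8000;
const CHUNK_LENGTH_MASK: ChunkLength = 0x3FFF;
const CANCEL_REQUEST: ChunkLength = 0xFFFF;

#[derive(Debug, PartialEq)]
enum ChunkInfo {
    /// Payload is `len` bytes of data; more chunks follow if `has_continuation`.
    Data { len: usize, has_continuation: bool },
    /// Payload is one error-kind byte followed by a message; ends the stream.
    Error { len: usize },
    /// The sender cancelled the request; no payload follows.
    Cancel,
}

/// Build the 6-byte header written before each chunk:
/// u32 BE request id, then u16 BE length + flags.
fn encode_header(id: RequestID, len_and_flags: ChunkLength) -> [u8; 6] {
    let mut header = [0u8; 6];
    header[..4].copy_from_slice(&id.to_be_bytes());
    header[4..].copy_from_slice(&len_and_flags.to_be_bytes());
    header
}

/// Interpret the length + flags word of a received header.
/// The CANCEL_REQUEST value must be checked first: 0xFFFF has both
/// flag bits set, so flag tests alone would misread it.
fn decode_length(len_and_flags: ChunkLength) -> ChunkInfo {
    if len_and_flags == CANCEL_REQUEST {
        ChunkInfo::Cancel
    } else if len_and_flags & CHUNK_FLAG_ERROR != 0 {
        ChunkInfo::Error {
            len: (len_and_flags & CHUNK_LENGTH_MASK) as usize,
        }
    } else {
        ChunkInfo::Data {
            len: (len_and_flags & CHUNK_LENGTH_MASK) as usize,
            has_continuation: len_and_flags & CHUNK_FLAG_HAS_CONTINUATION != 0,
        }
    }
}

fn main() {
    // A non-final 100-byte data chunk of request 42:
    let header = encode_header(42, 100 | CHUNK_FLAG_HAS_CONTINUATION);
    assert_eq!(&header[..4], &42u32.to_be_bytes());
    assert_eq!(
        decode_length(ChunkLength::from_be_bytes([header[4], header[5]])),
        ChunkInfo::Data { len: 100, has_continuation: true }
    );
    // Cancellation of a request is signalled by the special length value:
    assert_eq!(decode_length(CANCEL_REQUEST), ChunkInfo::Cancel);
    // MAX_CHUNK_LENGTH stays below the mask so a data length never
    // collides with the flag bits.
    assert!(MAX_CHUNK_LENGTH <= CHUNK_LENGTH_MASK);
}

Keeping MAX_CHUNK_LENGTH at 0x3FF0 rather than the full 0x3FFF mask leaves a little headroom below the flag bits, and it is what lets an error chunk carry its one-byte error kind plus a message without ever overflowing the 14-bit length field.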