diff options
Diffstat (limited to 'src/send.rs')
-rw-r--r-- | src/send.rs | 57 |
1 files changed, 45 insertions, 12 deletions
diff --git a/src/send.rs b/src/send.rs index d927d98..780bbcf 100644 --- a/src/send.rs +++ b/src/send.rs @@ -22,6 +22,7 @@ use crate::stream::*; // CHUNK_HAS_CONTINUATION when this is not the last chunk of the stream // ERROR_MARKER if this chunk denotes an error // (these two flags are exclusive, an error denotes the end of the stream) +// **special value** 0xFFFF indicates a CANCEL message // - [u8; chunk_length], either // - if not error: chunk data // - if error: @@ -35,8 +36,14 @@ pub(crate) const MAX_CHUNK_LENGTH: ChunkLength = 0x3FF0; pub(crate) const ERROR_MARKER: ChunkLength = 0x4000; pub(crate) const CHUNK_HAS_CONTINUATION: ChunkLength = 0x8000; pub(crate) const CHUNK_LENGTH_MASK: ChunkLength = 0x3FFF; +pub(crate) const CANCEL_REQUEST: ChunkLength = 0xFFFF; -pub(crate) type SendStream = (RequestID, RequestPriority, Option<OrderTag>, ByteStream); +pub(crate) enum SendItem { + Stream(RequestID, RequestPriority, Option<OrderTag>, ByteStream), + Cancel(RequestID), +} + +// ---- struct SendQueue { items: Vec<(u8, SendQueuePriority)>, @@ -71,6 +78,11 @@ impl SendQueue { }; self.items[pos_prio].1.push(item); } + fn remove(&mut self, id: RequestID) { + for (_, prioq) in self.items.iter_mut() { + prioq.remove(id); + } + } fn is_empty(&self) -> bool { self.items.iter().all(|(_k, v)| v.is_empty()) } @@ -96,6 +108,16 @@ impl SendQueuePriority { } self.items.push_back(item); } + fn remove(&mut self, id: RequestID) { + if let Some(i) = self.items.iter().position(|x| x.id == id) { + let item = self.items.remove(i).unwrap(); + if let Some(OrderTag(stream, order)) = item.order_tag { + let order_vec = self.order.get_mut(&stream).unwrap(); + let j = order_vec.iter().position(|x| *x == order).unwrap(); + order_vec.remove(j).unwrap(); + } + } + } fn is_empty(&self) -> bool { self.items.is_empty() } @@ -229,7 +251,7 @@ impl DataFrame { pub(crate) trait SendLoop: Sync { async fn send_loop<W>( self: Arc<Self>, - msg_recv: mpsc::UnboundedReceiver<SendStream>, + msg_recv: mpsc::UnboundedReceiver<SendItem>, mut write: BoxStreamWrite<W>, debug_name: String, ) -> Result<(), Error> @@ -264,16 +286,27 @@ pub(crate) trait SendLoop: Sync { tokio::select! { biased; // always read incomming channel first if it has data sth = recv_fut => { - if let Some((id, prio, order_tag, data)) = sth { - trace!("send_loop({}): add stream {} to send", debug_name, id); - sending.push(SendQueueItem { - id, - prio, - order_tag, - data: ByteStreamReader::new(data), - }); - } else { - msg_recv = None; + match sth { + Some(SendItem::Stream(id, prio, order_tag, data)) => { + trace!("send_loop({}): add stream {} to send", debug_name, id); + sending.push(SendQueueItem { + id, + prio, + order_tag, + data: ByteStreamReader::new(data), + }) + } + Some(SendItem::Cancel(id)) => { + trace!("send_loop({}): cancelling {}", debug_name, id); + sending.remove(id); + let header_id = RequestID::to_be_bytes(id); + write.write_all(&header_id[..]).await?; + write.write_all(&ChunkLength::to_be_bytes(CANCEL_REQUEST)).await?; + write.flush().await?; + } + None => { + msg_recv = None; + } }; } (id, data) = send_fut => { |