aboutsummaryrefslogtreecommitdiff
path: root/src/send.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-09-01 15:54:11 +0200
committerAlex Auvolat <alex@adnab.me>2022-09-01 15:54:11 +0200
commit522f420e2bf30d5ef6f50dccb88adf86882ac7c6 (patch)
treea4a4085e8bdf9d3699bba96d4350bfe2039290e5 /src/send.rs
parent32925667385db9e1d9e56ebae67d03d8096f7c46 (diff)
downloadnetapp-522f420e2bf30d5ef6f50dccb88adf86882ac7c6.tar.gz
netapp-522f420e2bf30d5ef6f50dccb88adf86882ac7c6.zip
Implement request cancellation
Diffstat (limited to 'src/send.rs')
-rw-r--r--src/send.rs57
1 files changed, 45 insertions, 12 deletions
diff --git a/src/send.rs b/src/send.rs
index d927d98..780bbcf 100644
--- a/src/send.rs
+++ b/src/send.rs
@@ -22,6 +22,7 @@ use crate::stream::*;
// CHUNK_HAS_CONTINUATION when this is not the last chunk of the stream
// ERROR_MARKER if this chunk denotes an error
// (these two flags are exclusive, an error denotes the end of the stream)
+// **special value** 0xFFFF indicates a CANCEL message
// - [u8; chunk_length], either
// - if not error: chunk data
// - if error:
@@ -35,8 +36,14 @@ pub(crate) const MAX_CHUNK_LENGTH: ChunkLength = 0x3FF0;
pub(crate) const ERROR_MARKER: ChunkLength = 0x4000;
pub(crate) const CHUNK_HAS_CONTINUATION: ChunkLength = 0x8000;
pub(crate) const CHUNK_LENGTH_MASK: ChunkLength = 0x3FFF;
+pub(crate) const CANCEL_REQUEST: ChunkLength = 0xFFFF;
-pub(crate) type SendStream = (RequestID, RequestPriority, Option<OrderTag>, ByteStream);
+pub(crate) enum SendItem {
+ Stream(RequestID, RequestPriority, Option<OrderTag>, ByteStream),
+ Cancel(RequestID),
+}
+
+// ----
struct SendQueue {
items: Vec<(u8, SendQueuePriority)>,
@@ -71,6 +78,11 @@ impl SendQueue {
};
self.items[pos_prio].1.push(item);
}
+ fn remove(&mut self, id: RequestID) {
+ for (_, prioq) in self.items.iter_mut() {
+ prioq.remove(id);
+ }
+ }
fn is_empty(&self) -> bool {
self.items.iter().all(|(_k, v)| v.is_empty())
}
@@ -96,6 +108,16 @@ impl SendQueuePriority {
}
self.items.push_back(item);
}
+ fn remove(&mut self, id: RequestID) {
+ if let Some(i) = self.items.iter().position(|x| x.id == id) {
+ let item = self.items.remove(i).unwrap();
+ if let Some(OrderTag(stream, order)) = item.order_tag {
+ let order_vec = self.order.get_mut(&stream).unwrap();
+ let j = order_vec.iter().position(|x| *x == order).unwrap();
+ order_vec.remove(j).unwrap();
+ }
+ }
+ }
fn is_empty(&self) -> bool {
self.items.is_empty()
}
@@ -229,7 +251,7 @@ impl DataFrame {
pub(crate) trait SendLoop: Sync {
async fn send_loop<W>(
self: Arc<Self>,
- msg_recv: mpsc::UnboundedReceiver<SendStream>,
+ msg_recv: mpsc::UnboundedReceiver<SendItem>,
mut write: BoxStreamWrite<W>,
debug_name: String,
) -> Result<(), Error>
@@ -264,16 +286,27 @@ pub(crate) trait SendLoop: Sync {
tokio::select! {
biased; // always read incomming channel first if it has data
sth = recv_fut => {
- if let Some((id, prio, order_tag, data)) = sth {
- trace!("send_loop({}): add stream {} to send", debug_name, id);
- sending.push(SendQueueItem {
- id,
- prio,
- order_tag,
- data: ByteStreamReader::new(data),
- });
- } else {
- msg_recv = None;
+ match sth {
+ Some(SendItem::Stream(id, prio, order_tag, data)) => {
+ trace!("send_loop({}): add stream {} to send", debug_name, id);
+ sending.push(SendQueueItem {
+ id,
+ prio,
+ order_tag,
+ data: ByteStreamReader::new(data),
+ })
+ }
+ Some(SendItem::Cancel(id)) => {
+ trace!("send_loop({}): cancelling {}", debug_name, id);
+ sending.remove(id);
+ let header_id = RequestID::to_be_bytes(id);
+ write.write_all(&header_id[..]).await?;
+ write.write_all(&ChunkLength::to_be_bytes(CANCEL_REQUEST)).await?;
+ write.flush().await?;
+ }
+ None => {
+ msg_recv = None;
+ }
};
}
(id, data) = send_fut => {