aboutsummaryrefslogtreecommitdiff
path: root/src/proto.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2021-10-13 17:12:13 +0200
committerAlex Auvolat <alex@adnab.me>2021-10-13 17:12:13 +0200
commit70839d70d86354232f168e63ce4062219acb85c7 (patch)
tree9c956af0339aa048f487c3a4e54c320be8d13647 /src/proto.rs
parent8dede69dee20b812ad1dcab5b374c60232409f4f (diff)
downloadnetapp-70839d70d86354232f168e63ce4062219acb85c7.tar.gz
netapp-70839d70d86354232f168e63ce4062219acb85c7.zip
Try to handle termination and closing of stuff properly
Diffstat (limited to 'src/proto.rs')
-rw-r--r--src/proto.rs56
1 files changed, 25 insertions, 31 deletions
diff --git a/src/proto.rs b/src/proto.rs
index 3811e3f..f91ffc7 100644
--- a/src/proto.rs
+++ b/src/proto.rs
@@ -1,4 +1,4 @@
-use std::collections::{BTreeMap, HashMap, VecDeque};
+use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use log::trace;
@@ -50,7 +50,6 @@ type ChunkLength = u16;
const MAX_CHUNK_LENGTH: ChunkLength = 0x4000;
const CHUNK_HAS_CONTINUATION: ChunkLength = 0x8000;
-
struct SendQueueItem {
id: RequestID,
prio: RequestPriority,
@@ -59,31 +58,33 @@ struct SendQueueItem {
}
struct SendQueue {
- items: BTreeMap<u8, VecDeque<SendQueueItem>>,
+ items: VecDeque<(u8, VecDeque<SendQueueItem>)>,
}
impl SendQueue {
fn new() -> Self {
Self {
- items: BTreeMap::new(),
+ items: VecDeque::with_capacity(64),
}
}
fn push(&mut self, item: SendQueueItem) {
let prio = item.prio;
- let mut items_at_prio = self
- .items
- .remove(&prio)
- .unwrap_or_else(|| VecDeque::with_capacity(4));
- items_at_prio.push_back(item);
- self.items.insert(prio, items_at_prio);
+ let pos_prio = match self.items.binary_search_by(|(p, _)| p.cmp(&prio)) {
+ Ok(i) => i,
+ Err(i) => {
+ self.items.insert(i, (prio, VecDeque::new()));
+ i
+ }
+ };
+ self.items[pos_prio].1.push_back(item);
}
fn pop(&mut self) -> Option<SendQueueItem> {
- match self.items.pop_first() {
+ match self.items.pop_front() {
None => None,
Some((prio, mut items_at_prio)) => {
let ret = items_at_prio.pop_front();
if !items_at_prio.is_empty() {
- self.items.insert(prio, items_at_prio);
+ self.items.push_front((prio, items_at_prio));
}
ret.or_else(|| self.pop())
}
@@ -98,7 +99,7 @@ impl SendQueue {
pub(crate) trait SendLoop: Sync {
async fn send_loop<W>(
self: Arc<Self>,
- mut msg_recv: mpsc::UnboundedReceiver<Option<(RequestID, RequestPriority, Vec<u8>)>>,
+ mut msg_recv: mpsc::UnboundedReceiver<(RequestID, RequestPriority, Vec<u8>)>,
mut write: W,
) -> Result<(), Error>
where
@@ -107,18 +108,14 @@ pub(crate) trait SendLoop: Sync {
let mut sending = SendQueue::new();
let mut should_exit = false;
while !should_exit || !sending.is_empty() {
- if let Ok(sth) = msg_recv.try_recv() {
- if let Some((id, prio, data)) = sth {
- trace!("send_loop: got {}, {} bytes", id, data.len());
- sending.push(SendQueueItem {
- id,
- prio,
- data,
- cursor: 0,
- });
- } else {
- should_exit = true;
- }
+ if let Ok((id, prio, data)) = msg_recv.try_recv() {
+ trace!("send_loop: got {}, {} bytes", id, data.len());
+ sending.push(SendQueueItem {
+ id,
+ prio,
+ data,
+ cursor: 0,
+ });
} else if let Some(mut item) = sending.pop() {
trace!(
"send_loop: sending bytes for {} ({} bytes, {} already sent)",
@@ -149,10 +146,7 @@ pub(crate) trait SendLoop: Sync {
}
write.flush().await?;
} else {
- let sth = msg_recv
- .recv()
- .await
- .ok_or_else(|| Error::Message("Connection closed.".into()))?;
+ let sth = msg_recv.recv().await;
if let Some((id, prio, data)) = sth {
trace!("send_loop: got {}, {} bytes", id, data.len());
sending.push(SendQueueItem {
@@ -173,7 +167,7 @@ pub(crate) trait SendLoop: Sync {
#[async_trait]
pub(crate) trait RecvLoop: Sync + 'static {
// Returns true if we should stop receiving after this
- async fn recv_handler(self: Arc<Self>, id: RequestID, msg: Vec<u8>);
+ fn recv_handler(self: &Arc<Self>, id: RequestID, msg: Vec<u8>);
async fn recv_loop<R>(self: Arc<Self>, mut read: R) -> Result<(), Error>
where
@@ -205,7 +199,7 @@ pub(crate) trait RecvLoop: Sync + 'static {
if has_cont {
receiving.insert(id, msg_bytes);
} else {
- tokio::spawn(self.clone().recv_handler(id, msg_bytes));
+ self.recv_handler(id, msg_bytes);
}
}
}