diff options
author | Alex Auvolat <alex@adnab.me> | 2021-10-13 17:12:13 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2021-10-13 17:12:13 +0200 |
commit | 70839d70d86354232f168e63ce4062219acb85c7 (patch) | |
tree | 9c956af0339aa048f487c3a4e54c320be8d13647 /src/proto.rs | |
parent | 8dede69dee20b812ad1dcab5b374c60232409f4f (diff) | |
download | netapp-70839d70d86354232f168e63ce4062219acb85c7.tar.gz netapp-70839d70d86354232f168e63ce4062219acb85c7.zip |
Try to handle termination and closing of stuff properly
Diffstat (limited to 'src/proto.rs')
-rw-r--r-- | src/proto.rs | 56 |
1 files changed, 25 insertions, 31 deletions
diff --git a/src/proto.rs b/src/proto.rs index 3811e3f..f91ffc7 100644 --- a/src/proto.rs +++ b/src/proto.rs @@ -1,4 +1,4 @@ -use std::collections::{BTreeMap, HashMap, VecDeque}; +use std::collections::{HashMap, VecDeque}; use std::sync::Arc; use log::trace; @@ -50,7 +50,6 @@ type ChunkLength = u16; const MAX_CHUNK_LENGTH: ChunkLength = 0x4000; const CHUNK_HAS_CONTINUATION: ChunkLength = 0x8000; - struct SendQueueItem { id: RequestID, prio: RequestPriority, @@ -59,31 +58,33 @@ struct SendQueueItem { } struct SendQueue { - items: BTreeMap<u8, VecDeque<SendQueueItem>>, + items: VecDeque<(u8, VecDeque<SendQueueItem>)>, } impl SendQueue { fn new() -> Self { Self { - items: BTreeMap::new(), + items: VecDeque::with_capacity(64), } } fn push(&mut self, item: SendQueueItem) { let prio = item.prio; - let mut items_at_prio = self - .items - .remove(&prio) - .unwrap_or_else(|| VecDeque::with_capacity(4)); - items_at_prio.push_back(item); - self.items.insert(prio, items_at_prio); + let pos_prio = match self.items.binary_search_by(|(p, _)| p.cmp(&prio)) { + Ok(i) => i, + Err(i) => { + self.items.insert(i, (prio, VecDeque::new())); + i + } + }; + self.items[pos_prio].1.push_back(item); } fn pop(&mut self) -> Option<SendQueueItem> { - match self.items.pop_first() { + match self.items.pop_front() { None => None, Some((prio, mut items_at_prio)) => { let ret = items_at_prio.pop_front(); if !items_at_prio.is_empty() { - self.items.insert(prio, items_at_prio); + self.items.push_front((prio, items_at_prio)); } ret.or_else(|| self.pop()) } @@ -98,7 +99,7 @@ impl SendQueue { pub(crate) trait SendLoop: Sync { async fn send_loop<W>( self: Arc<Self>, - mut msg_recv: mpsc::UnboundedReceiver<Option<(RequestID, RequestPriority, Vec<u8>)>>, + mut msg_recv: mpsc::UnboundedReceiver<(RequestID, RequestPriority, Vec<u8>)>, mut write: W, ) -> Result<(), Error> where @@ -107,18 +108,14 @@ pub(crate) trait SendLoop: Sync { let mut sending = SendQueue::new(); let mut should_exit = false; while !should_exit || !sending.is_empty() { - if let Ok(sth) = msg_recv.try_recv() { - if let Some((id, prio, data)) = sth { - trace!("send_loop: got {}, {} bytes", id, data.len()); - sending.push(SendQueueItem { - id, - prio, - data, - cursor: 0, - }); - } else { - should_exit = true; - } + if let Ok((id, prio, data)) = msg_recv.try_recv() { + trace!("send_loop: got {}, {} bytes", id, data.len()); + sending.push(SendQueueItem { + id, + prio, + data, + cursor: 0, + }); } else if let Some(mut item) = sending.pop() { trace!( "send_loop: sending bytes for {} ({} bytes, {} already sent)", @@ -149,10 +146,7 @@ pub(crate) trait SendLoop: Sync { } write.flush().await?; } else { - let sth = msg_recv - .recv() - .await - .ok_or_else(|| Error::Message("Connection closed.".into()))?; + let sth = msg_recv.recv().await; if let Some((id, prio, data)) = sth { trace!("send_loop: got {}, {} bytes", id, data.len()); sending.push(SendQueueItem { @@ -173,7 +167,7 @@ pub(crate) trait SendLoop: Sync { #[async_trait] pub(crate) trait RecvLoop: Sync + 'static { // Returns true if we should stop receiving after this - async fn recv_handler(self: Arc<Self>, id: RequestID, msg: Vec<u8>); + fn recv_handler(self: &Arc<Self>, id: RequestID, msg: Vec<u8>); async fn recv_loop<R>(self: Arc<Self>, mut read: R) -> Result<(), Error> where @@ -205,7 +199,7 @@ pub(crate) trait RecvLoop: Sync + 'static { if has_cont { receiving.insert(id, msg_bytes); } else { - tokio::spawn(self.clone().recv_handler(id, msg_bytes)); + self.recv_handler(id, msg_bytes); } } } |