diff options
-rw-r--r-- | src/recv.rs | 5 | ||||
-rw-r--r-- | src/send.rs | 116 |
2 files changed, 61 insertions, 60 deletions
diff --git a/src/recv.rs b/src/recv.rs index 628612b..f5221e6 100644 --- a/src/recv.rs +++ b/src/recv.rs @@ -1,16 +1,13 @@ use std::collections::HashMap; - use std::sync::Arc; +use async_trait::async_trait; use log::trace; use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}; use futures::AsyncReadExt; -use async_trait::async_trait; - use crate::error::*; - use crate::send::*; use crate::util::Packet; diff --git a/src/send.rs b/src/send.rs index 330d41d..0179eb2 100644 --- a/src/send.rs +++ b/src/send.rs @@ -24,6 +24,7 @@ use crate::util::{ByteStream, Packet}; pub(crate) type RequestID = u32; pub(crate) type ChunkLength = u16; + pub(crate) const MAX_CHUNK_LENGTH: ChunkLength = 0x3FF0; pub(crate) const ERROR_MARKER: ChunkLength = 0x4000; pub(crate) const CHUNK_HAS_CONTINUATION: ChunkLength = 0x8000; @@ -62,53 +63,58 @@ enum DataFrame { data: [u8; MAX_CHUNK_LENGTH as usize], /// actual lenght of data len: usize, + /// whethere there may be more data comming from this stream. Can be used for some + /// optimization. It's an error to set it to false if there is more data, but it is correct + /// (albeit sub-optimal) to set it to true if there is nothing coming after + may_have_more: bool, }, + /// An error code automatically signals the end of the stream Error(u8), } -struct DataReaderItem { - data: DataFrame, - /// whethere there may be more data comming from this stream. Can be used for some - /// optimization. It's an error to set it to false if there is more data, but it is correct - /// (albeit sub-optimal) to set it to true if there is nothing coming after - may_have_more: bool, -} - -impl DataReaderItem { +impl DataFrame { fn empty_last() -> Self { - DataReaderItem { - data: DataFrame::Data { - data: [0; MAX_CHUNK_LENGTH as usize], - len: 0, - }, + DataFrame::Data { + data: [0; MAX_CHUNK_LENGTH as usize], + len: 0, may_have_more: false, } } fn header(&self) -> [u8; 2] { - let continuation = if self.may_have_more { - CHUNK_HAS_CONTINUATION - } else { - 0 - }; - let len = match self.data { - DataFrame::Data { len, .. } => len as u16, - DataFrame::Error(e) => e as u16 | ERROR_MARKER, + let header_u16 = match self { + DataFrame::Data { + len, + may_have_more: false, + .. + } => *len as u16, + DataFrame::Data { + len, + may_have_more: true, + .. + } => *len as u16 | CHUNK_HAS_CONTINUATION, + DataFrame::Error(e) => *e as u16 | ERROR_MARKER, }; - - ChunkLength::to_be_bytes(len | continuation) + ChunkLength::to_be_bytes(header_u16) } fn data(&self) -> &[u8] { - match self.data { - DataFrame::Data { ref data, len } => &data[..len], + match self { + DataFrame::Data { ref data, len, .. } => &data[..*len], DataFrame::Error(_) => &[], } } + + fn may_have_more(&self) -> bool { + match self { + DataFrame::Data { may_have_more, .. } => *may_have_more, + DataFrame::Error(_) => false, + } + } } impl Stream for DataReader { - type Item = DataReaderItem; + type Item = DataFrame; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let mut this = self.project(); @@ -125,10 +131,8 @@ impl Stream for DataReader { Err(e) => { let e = *e; *this.packet = Ok(Vec::new()); - return Poll::Ready(Some(DataReaderItem { - data: DataFrame::Error(e), - may_have_more: true, - })); + *this.eos = true; + return Poll::Ready(Some(DataFrame::Error(e))); } }; let packet_left = packet.len() - *this.pos; @@ -161,8 +165,9 @@ impl Stream for DataReader { let len = this.buf.len(); body[..len].copy_from_slice(this.buf); this.buf.clear(); - Poll::Ready(Some(DataReaderItem { - data: DataFrame::Data { data: body, len }, + Poll::Ready(Some(DataFrame::Data { + data: body, + len, may_have_more: !*this.eos, })) } @@ -218,38 +223,37 @@ struct SendQueuePollNextReady<'a> { } impl<'a> futures::Future for SendQueuePollNextReady<'a> { - type Output = (RequestID, DataReaderItem); + type Output = (RequestID, DataFrame); fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> { for i in 0..self.queue.items.len() { let (_prio, items_at_prio) = &mut self.queue.items[i]; - for _ in 0..items_at_prio.len() { - let mut item = items_at_prio.pop_front().unwrap(); - + let mut ready_item = None; + for (j, item) in items_at_prio.iter_mut().enumerate() { match Pin::new(&mut item.data).poll_next(ctx) { - Poll::Pending => items_at_prio.push_back(item), - Poll::Ready(Some(data)) => { - let id = item.id; - if data.may_have_more { - self.queue.push(item); - } else { - if items_at_prio.is_empty() { - // this priority level is empty, remove it - self.queue.items.remove(i); - } - } - return Poll::Ready((id, data)); - } - Poll::Ready(None) => { - if items_at_prio.is_empty() { - // this priority level is empty, remove it - self.queue.items.remove(i); - } - return Poll::Ready((item.id, DataReaderItem::empty_last())); + Poll::Pending => (), + Poll::Ready(ready_v) => { + ready_item = Some((j, ready_v)); + break; } } } + + if let Some((j, ready_v)) = ready_item { + let item = items_at_prio.remove(j).unwrap(); + let id = item.id; + if ready_v + .as_ref() + .map(|data| data.may_have_more()) + .unwrap_or(false) + { + items_at_prio.push_back(item); + } else if items_at_prio.is_empty() { + self.queue.items.remove(i); + } + return Poll::Ready((id, ready_v.unwrap_or_else(DataFrame::empty_last))); + } } // TODO what do we do if self.queue is empty? We won't get scheduled again. Poll::Pending |