aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-07-22 13:23:42 +0200
committerAlex Auvolat <alex@adnab.me>2022-07-22 13:23:42 +0200
commitf9db9a4b696569bbc56c40b9170320307ebcdd81 (patch)
tree548bb5d1bac583daadddbaea461a81db9e4d4bf4
parent5da59ebec5f3072d0b6c3b1ffc90eb8923c50ad9 (diff)
downloadnetapp-f9db9a4b696569bbc56c40b9170320307ebcdd81.tar.gz
netapp-f9db9a4b696569bbc56c40b9170320307ebcdd81.zip
Simplify send.rs
-rw-r--r--src/send.rs205
-rw-r--r--src/stream.rs29
2 files changed, 68 insertions, 166 deletions
diff --git a/src/send.rs b/src/send.rs
index 59805cf..a8cf966 100644
--- a/src/send.rs
+++ b/src/send.rs
@@ -8,7 +8,6 @@ use bytes::Bytes;
use log::trace;
use futures::AsyncWriteExt;
-use futures::Stream;
use kuska_handshake::async_std::BoxStreamWrite;
use tokio::sync::mpsc;
@@ -30,152 +29,14 @@ pub(crate) const MAX_CHUNK_LENGTH: ChunkLength = 0x3FF0;
pub(crate) const ERROR_MARKER: ChunkLength = 0x4000;
pub(crate) const CHUNK_HAS_CONTINUATION: ChunkLength = 0x8000;
+struct SendQueue {
+ items: VecDeque<(u8, VecDeque<SendQueueItem>)>,
+}
+
struct SendQueueItem {
id: RequestID,
prio: RequestPriority,
- data: DataReader,
-}
-
-#[pin_project::pin_project]
-struct DataReader {
- #[pin]
- reader: ByteStream,
- packet: Packet,
- pos: usize,
- buf: Vec<u8>,
- eos: bool,
-}
-
-impl From<ByteStream> for DataReader {
- fn from(data: ByteStream) -> DataReader {
- DataReader {
- reader: data,
- packet: Ok(Bytes::new()),
- pos: 0,
- buf: Vec::with_capacity(MAX_CHUNK_LENGTH as usize),
- eos: false,
- }
- }
-}
-
-enum DataFrame {
- Data {
- /// a fixed size buffer containing some data, possibly padded with 0s
- data: [u8; MAX_CHUNK_LENGTH as usize],
- /// actual lenght of data
- len: usize,
- /// whethere there may be more data comming from this stream. Can be used for some
- /// optimization. It's an error to set it to false if there is more data, but it is correct
- /// (albeit sub-optimal) to set it to true if there is nothing coming after
- may_have_more: bool,
- },
- /// An error code automatically signals the end of the stream
- Error(u8),
-}
-
-impl DataFrame {
- fn empty_last() -> Self {
- DataFrame::Data {
- data: [0; MAX_CHUNK_LENGTH as usize],
- len: 0,
- may_have_more: false,
- }
- }
-
- fn header(&self) -> [u8; 2] {
- let header_u16 = match self {
- DataFrame::Data {
- len,
- may_have_more: false,
- ..
- } => *len as u16,
- DataFrame::Data {
- len,
- may_have_more: true,
- ..
- } => *len as u16 | CHUNK_HAS_CONTINUATION,
- DataFrame::Error(e) => *e as u16 | ERROR_MARKER,
- };
- ChunkLength::to_be_bytes(header_u16)
- }
-
- fn data(&self) -> &[u8] {
- match self {
- DataFrame::Data { ref data, len, .. } => &data[..*len],
- DataFrame::Error(_) => &[],
- }
- }
-
- fn may_have_more(&self) -> bool {
- match self {
- DataFrame::Data { may_have_more, .. } => *may_have_more,
- DataFrame::Error(_) => false,
- }
- }
-}
-
-impl Stream for DataReader {
- type Item = DataFrame;
-
- fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- let mut this = self.project();
-
- if *this.eos {
- // eos was reached at previous call to poll_next, where a partial packet
- // was returned. Now return None
- return Poll::Ready(None);
- }
-
- loop {
- let packet = match this.packet {
- Ok(v) => v,
- Err(e) => {
- let e = *e;
- *this.packet = Ok(Bytes::new());
- *this.eos = true;
- return Poll::Ready(Some(DataFrame::Error(e)));
- }
- };
- let packet_left = packet.len() - *this.pos;
- let buf_left = MAX_CHUNK_LENGTH as usize - this.buf.len();
- let to_read = std::cmp::min(buf_left, packet_left);
- this.buf
- .extend_from_slice(&packet[*this.pos..*this.pos + to_read]);
- *this.pos += to_read;
- if this.buf.len() == MAX_CHUNK_LENGTH as usize {
- // we have a full buf, ready to send
- break;
- }
-
- // we don't have a full buf, packet is empty; try receive more
- if let Some(p) = futures::ready!(this.reader.as_mut().poll_next(cx)) {
- *this.packet = p;
- *this.pos = 0;
- // if buf is empty, we will loop and return the error directly. If buf
- // isn't empty, send it before by breaking.
- if this.packet.is_err() && !this.buf.is_empty() {
- break;
- }
- } else {
- *this.eos = true;
- break;
- }
- }
-
- let mut body = [0; MAX_CHUNK_LENGTH as usize];
- let len = this.buf.len();
- body[..len].copy_from_slice(this.buf);
- this.buf.clear();
- Poll::Ready(Some(DataFrame::Data {
- data: body,
- len,
- may_have_more: !*this.eos,
- }))
- }
-}
-
-struct SendQueue {
- items: VecDeque<(u8, VecDeque<SendQueueItem>)>,
+ data: ByteStreamReader,
}
impl SendQueue {
@@ -232,35 +93,69 @@ impl<'a> futures::Future for SendQueuePollNextReady<'a> {
let mut ready_item = None;
for (j, item) in items_at_prio.iter_mut().enumerate() {
- match Pin::new(&mut item.data).poll_next(ctx) {
+ let mut item_reader = item.data.read_exact_or_eos(MAX_CHUNK_LENGTH as usize);
+ match Pin::new(&mut item_reader).poll(ctx) {
Poll::Pending => (),
Poll::Ready(ready_v) => {
- ready_item = Some((j, ready_v));
+ ready_item = Some((j, ready_v, item.data.eos()));
break;
}
}
}
- if let Some((j, ready_v)) = ready_item {
+ if let Some((j, bytes_or_err, eos)) = ready_item {
+ let data_frame = match bytes_or_err {
+ Ok(bytes) => DataFrame::Data(bytes, !eos),
+ Err(e) => DataFrame::Error(match e {
+ ReadExactError::Stream(code) => code,
+ _ => unreachable!(),
+ }),
+ };
let item = items_at_prio.remove(j).unwrap();
let id = item.id;
- if ready_v
- .as_ref()
- .map(|data| data.may_have_more())
- .unwrap_or(false)
- {
+ if !eos {
items_at_prio.push_back(item);
} else if items_at_prio.is_empty() {
self.queue.items.remove(i);
}
- return Poll::Ready((id, ready_v.unwrap_or_else(DataFrame::empty_last)));
+ return Poll::Ready((id, data_frame));
}
}
- // TODO what do we do if self.queue is empty? We won't get scheduled again.
+ // If the queue is empty, this futures is eternally pending.
+ // This is ok because we use it in a select with another future
+ // that can interrupt it.
Poll::Pending
}
}
+enum DataFrame {
+ /// a fixed size buffer containing some data + a boolean indicating whether
+ /// there may be more data comming from this stream. Can be used for some
+ /// optimization. It's an error to set it to false if there is more data, but it is correct
+ /// (albeit sub-optimal) to set it to true if there is nothing coming after
+ Data(Bytes, bool),
+ /// An error code automatically signals the end of the stream
+ Error(u8),
+}
+
+impl DataFrame {
+ fn header(&self) -> [u8; 2] {
+ let header_u16 = match self {
+ DataFrame::Data(data, false) => data.len() as u16,
+ DataFrame::Data(data, true) => data.len() as u16 | CHUNK_HAS_CONTINUATION,
+ DataFrame::Error(e) => *e as u16 | ERROR_MARKER,
+ };
+ ChunkLength::to_be_bytes(header_u16)
+ }
+
+ fn data(&self) -> &[u8] {
+ match self {
+ DataFrame::Data(ref data, _) => &data[..],
+ DataFrame::Error(_) => &[],
+ }
+ }
+}
+
/// The SendLoop trait, which is implemented both by the client and the server
/// connection objects (ServerConna and ClientConn) adds a method `.send_loop()`
/// that takes a channel of messages to send and an asynchronous writer,
@@ -295,7 +190,7 @@ pub(crate) trait SendLoop: Sync {
sending.push(SendQueueItem {
id,
prio,
- data: data.into(),
+ data: ByteStreamReader::new(data),
});
} else {
should_exit = true;
diff --git a/src/stream.rs b/src/stream.rs
index 6c23f4a..ae57d62 100644
--- a/src/stream.rs
+++ b/src/stream.rs
@@ -82,6 +82,23 @@ impl ByteStreamReader {
}
}
+ pub fn take_buffer(&mut self) -> Bytes {
+ let bytes = Bytes::from(
+ self .buf
+ .iter()
+ .map(|x| &x[..])
+ .collect::<Vec<_>>()
+ .concat(),
+ );
+ self.buf.clear();
+ self.buf_len = 0;
+ bytes
+ }
+
+ pub fn eos(&self) -> bool {
+ self.buf.is_empty() && self.eos
+ }
+
fn try_get(&mut self, read_len: usize) -> Option<Bytes> {
if self.buf_len >= read_len {
let mut slices = Vec::with_capacity(self.buf.len());
@@ -144,17 +161,7 @@ impl<'a> Future for ByteStreamReadExact<'a> {
if *this.fail_on_eos {
return Poll::Ready(Err(ReadExactError::UnexpectedEos));
} else {
- let bytes = Bytes::from(
- this.reader
- .buf
- .iter()
- .map(|x| &x[..])
- .collect::<Vec<_>>()
- .concat(),
- );
- this.reader.buf.clear();
- this.reader.buf_len = 0;
- return Poll::Ready(Ok(bytes));
+ return Poll::Ready(Ok(this.reader.take_buffer()));
}
}