aboutsummaryrefslogtreecommitdiff
path: root/src/send.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-09-01 11:21:24 +0200
committerAlex Auvolat <alex@adnab.me>2022-09-01 11:21:24 +0200
commit7909a95d3c02a738c9a088c1cb8a5d6f70b06046 (patch)
treec38b20c9f2f6da6971ceb980bbb7a8be4668b661 /src/send.rs
parent263db66fcee65deda39de18baa837228ea38baf1 (diff)
downloadnetapp-7909a95d3c02a738c9a088c1cb8a5d6f70b06046.tar.gz
netapp-7909a95d3c02a738c9a088c1cb8a5d6f70b06046.zip
Stream errors are now std::io::Error
Diffstat (limited to 'src/send.rs')
-rw-r--r--src/send.rs61
1 files changed, 32 insertions, 29 deletions
diff --git a/src/send.rs b/src/send.rs
index fd415c6..f362962 100644
--- a/src/send.rs
+++ b/src/send.rs
@@ -18,9 +18,11 @@ use crate::stream::*;
// Messages are sent by chunks
// Chunk format:
// - u32 BE: request id (same for request and response)
-// - u16 BE: chunk length, possibly with CHUNK_HAS_CONTINUATION flag
-// when this is not the last chunk of the message
-// - [u8; chunk_length] chunk data
+// - u16 BE: chunk length + flags:
+// CHUNK_HAS_CONTINUATION when this is not the last chunk of the stream
+// ERROR_MARKER if this chunk denotes an error
+// (these two flags are exclusive, an error denotes the end of the stream)
+// - [u8; chunk_length] chunk data / error message
pub(crate) type RequestID = u32;
pub(crate) type ChunkLength = u16;
@@ -28,6 +30,7 @@ pub(crate) type ChunkLength = u16;
pub(crate) const MAX_CHUNK_LENGTH: ChunkLength = 0x3FF0;
pub(crate) const ERROR_MARKER: ChunkLength = 0x4000;
pub(crate) const CHUNK_HAS_CONTINUATION: ChunkLength = 0x8000;
+pub(crate) const CHUNK_LENGTH_MASK: ChunkLength = 0x3FFF;
struct SendQueue {
items: Vec<(u8, VecDeque<SendQueueItem>)>,
@@ -92,29 +95,12 @@ impl<'a> futures::Future for SendQueuePollNextReady<'a> {
let id = item.id;
let eos = item.data.eos();
- let data_frame = match bytes_or_err {
- Ok(bytes) => {
- trace!(
- "send queue poll next ready: id {} eos {:?} bytes {}",
- id,
- eos,
- bytes.len()
- );
- DataFrame::Data(bytes, !eos)
- }
- Err(e) => DataFrame::Error(match e {
- ReadExactError::Stream(code) => {
- trace!(
- "send queue poll next ready: id {} eos {:?} ERROR {}",
- id,
- eos,
- code
- );
- code
- }
- _ => unreachable!(),
- }),
- };
+ let packet = bytes_or_err.map_err(|e| match e {
+ ReadExactError::Stream(err) => err,
+ _ => unreachable!(),
+ });
+
+ let data_frame = DataFrame::from_packet(packet, !eos);
if !eos && !matches!(data_frame, DataFrame::Error(_)) {
items_at_prio.push_back(item);
@@ -139,15 +125,32 @@ enum DataFrame {
/// (albeit sub-optimal) to set it to true if there is nothing coming after
Data(Bytes, bool),
/// An error code automatically signals the end of the stream
- Error(u8),
+ Error(Bytes),
}
impl DataFrame {
+ fn from_packet(p: Packet, has_cont: bool) -> Self {
+ match p {
+ Ok(bytes) => {
+ assert!(bytes.len() <= MAX_CHUNK_LENGTH as usize);
+ Self::Data(bytes, has_cont)
+ }
+ Err(e) => {
+ let msg = format!("{}", e);
+ let mut msg = Bytes::from(msg.into_bytes());
+ if msg.len() > MAX_CHUNK_LENGTH as usize {
+ msg = msg.slice(..MAX_CHUNK_LENGTH as usize);
+ }
+ Self::Error(msg)
+ }
+ }
+ }
+
fn header(&self) -> [u8; 2] {
let header_u16 = match self {
DataFrame::Data(data, false) => data.len() as u16,
DataFrame::Data(data, true) => data.len() as u16 | CHUNK_HAS_CONTINUATION,
- DataFrame::Error(e) => *e as u16 | ERROR_MARKER,
+ DataFrame::Error(msg) => msg.len() as u16 | ERROR_MARKER,
};
ChunkLength::to_be_bytes(header_u16)
}
@@ -155,7 +158,7 @@ impl DataFrame {
fn data(&self) -> &[u8] {
match self {
DataFrame::Data(ref data, _) => &data[..],
- DataFrame::Error(_) => &[],
+ DataFrame::Error(ref msg) => &msg[..],
}
}
}