diff options
Diffstat (limited to 'src/send.rs')
-rw-r--r-- | src/send.rs | 61 |
1 files changed, 32 insertions, 29 deletions
diff --git a/src/send.rs b/src/send.rs index fd415c6..f362962 100644 --- a/src/send.rs +++ b/src/send.rs @@ -18,9 +18,11 @@ use crate::stream::*; // Messages are sent by chunks // Chunk format: // - u32 BE: request id (same for request and response) -// - u16 BE: chunk length, possibly with CHUNK_HAS_CONTINUATION flag -// when this is not the last chunk of the message -// - [u8; chunk_length] chunk data +// - u16 BE: chunk length + flags: +// CHUNK_HAS_CONTINUATION when this is not the last chunk of the stream +// ERROR_MARKER if this chunk denotes an error +// (these two flags are exclusive, an error denotes the end of the stream) +// - [u8; chunk_length] chunk data / error message pub(crate) type RequestID = u32; pub(crate) type ChunkLength = u16; @@ -28,6 +30,7 @@ pub(crate) type ChunkLength = u16; pub(crate) const MAX_CHUNK_LENGTH: ChunkLength = 0x3FF0; pub(crate) const ERROR_MARKER: ChunkLength = 0x4000; pub(crate) const CHUNK_HAS_CONTINUATION: ChunkLength = 0x8000; +pub(crate) const CHUNK_LENGTH_MASK: ChunkLength = 0x3FFF; struct SendQueue { items: Vec<(u8, VecDeque<SendQueueItem>)>, @@ -92,29 +95,12 @@ impl<'a> futures::Future for SendQueuePollNextReady<'a> { let id = item.id; let eos = item.data.eos(); - let data_frame = match bytes_or_err { - Ok(bytes) => { - trace!( - "send queue poll next ready: id {} eos {:?} bytes {}", - id, - eos, - bytes.len() - ); - DataFrame::Data(bytes, !eos) - } - Err(e) => DataFrame::Error(match e { - ReadExactError::Stream(code) => { - trace!( - "send queue poll next ready: id {} eos {:?} ERROR {}", - id, - eos, - code - ); - code - } - _ => unreachable!(), - }), - }; + let packet = bytes_or_err.map_err(|e| match e { + ReadExactError::Stream(err) => err, + _ => unreachable!(), + }); + + let data_frame = DataFrame::from_packet(packet, !eos); if !eos && !matches!(data_frame, DataFrame::Error(_)) { items_at_prio.push_back(item); @@ -139,15 +125,32 @@ enum DataFrame { /// (albeit sub-optimal) to set it to true if there is nothing coming after Data(Bytes, bool), /// An error code automatically signals the end of the stream - Error(u8), + Error(Bytes), } impl DataFrame { + fn from_packet(p: Packet, has_cont: bool) -> Self { + match p { + Ok(bytes) => { + assert!(bytes.len() <= MAX_CHUNK_LENGTH as usize); + Self::Data(bytes, has_cont) + } + Err(e) => { + let msg = format!("{}", e); + let mut msg = Bytes::from(msg.into_bytes()); + if msg.len() > MAX_CHUNK_LENGTH as usize { + msg = msg.slice(..MAX_CHUNK_LENGTH as usize); + } + Self::Error(msg) + } + } + } + fn header(&self) -> [u8; 2] { let header_u16 = match self { DataFrame::Data(data, false) => data.len() as u16, DataFrame::Data(data, true) => data.len() as u16 | CHUNK_HAS_CONTINUATION, - DataFrame::Error(e) => *e as u16 | ERROR_MARKER, + DataFrame::Error(msg) => msg.len() as u16 | ERROR_MARKER, }; ChunkLength::to_be_bytes(header_u16) } @@ -155,7 +158,7 @@ impl DataFrame { fn data(&self) -> &[u8] { match self { DataFrame::Data(ref data, _) => &data[..], - DataFrame::Error(_) => &[], + DataFrame::Error(ref msg) => &msg[..], } } } |