diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/message.rs | 21 | ||||
-rw-r--r-- | src/send.rs | 18 | ||||
-rw-r--r-- | src/stream.rs | 8 |
3 files changed, 18 insertions, 29 deletions
diff --git a/src/message.rs b/src/message.rs index 2ed5c98..56e6e8e 100644 --- a/src/message.rs +++ b/src/message.rs @@ -103,7 +103,10 @@ impl<M: Message> Req<M> { _phantom: Default::default(), msg: Arc::new(msg), msg_ser: Some(enc.msg), - stream: enc.stream.map(AttachedStream::Stream).unwrap_or(AttachedStream::None), + stream: enc + .stream + .map(AttachedStream::Stream) + .unwrap_or(AttachedStream::None), }) } } @@ -147,7 +150,9 @@ impl<M: Message> Clone for Req<M> { let stream = match &self.stream { AttachedStream::None => AttachedStream::None, AttachedStream::Fixed(b) => AttachedStream::Fixed(b.clone()), - AttachedStream::Stream(_) => panic!("Cannot clone a Req<_> with a non-buffer attached stream"), + AttachedStream::Stream(_) => { + panic!("Cannot clone a Req<_> with a non-buffer attached stream") + } }; Self { _phantom: Default::default(), @@ -231,7 +236,9 @@ impl<M: Message> Resp<M> { Ok(Self { _phantom: Default::default(), msg, - stream: stream.map(AttachedStream::Stream).unwrap_or(AttachedStream::None), + stream: stream + .map(AttachedStream::Stream) + .unwrap_or(AttachedStream::None), }) } RespEnc::Error { code, message } => Err(Error::Remote(code, message)), @@ -293,7 +300,9 @@ pub(crate) struct ReqEnc { impl ReqEnc { pub(crate) fn encode(self) -> ByteStream { - let mut buf = BytesMut::with_capacity(64); + let mut buf = BytesMut::with_capacity( + self.path.len() + self.telemetry_id.len() + self.msg.len() + 16, + ); buf.put_u8(self.prio); @@ -375,7 +384,7 @@ impl RespEnc { pub(crate) fn encode(self) -> ByteStream { match self { RespEnc::Success { msg, stream } => { - let mut buf = BytesMut::with_capacity(64); + let mut buf = BytesMut::with_capacity(msg.len() + 8); buf.put_u8(0); @@ -391,7 +400,7 @@ impl RespEnc { } } RespEnc::Error { code, message } => { - let mut buf = BytesMut::with_capacity(64); + let mut buf = BytesMut::with_capacity(message.len() + 8); buf.put_u8(1 + message.len() as u8); buf.put_u8(code); buf.put(message.as_bytes()); diff --git a/src/send.rs b/src/send.rs index a8cf966..f1df6f7 100644 --- a/src/send.rs +++ b/src/send.rs @@ -30,7 +30,7 @@ pub(crate) const ERROR_MARKER: ChunkLength = 0x4000; pub(crate) const CHUNK_HAS_CONTINUATION: ChunkLength = 0x8000; struct SendQueue { - items: VecDeque<(u8, VecDeque<SendQueueItem>)>, + items: Vec<(u8, VecDeque<SendQueueItem>)>, } struct SendQueueItem { @@ -42,7 +42,7 @@ struct SendQueueItem { impl SendQueue { fn new() -> Self { Self { - items: VecDeque::with_capacity(64), + items: Vec::with_capacity(64), } } fn push(&mut self, item: SendQueueItem) { @@ -56,20 +56,6 @@ impl SendQueue { }; self.items[pos_prio].1.push_back(item); } - // used only in tests. They should probably be rewriten - #[allow(dead_code)] - fn pop(&mut self) -> Option<SendQueueItem> { - match self.items.pop_front() { - None => None, - Some((prio, mut items_at_prio)) => { - let ret = items_at_prio.pop_front(); - if !items_at_prio.is_empty() { - self.items.push_front((prio, items_at_prio)); - } - ret.or_else(|| self.pop()) - } - } - } fn is_empty(&self) -> bool { self.items.iter().all(|(_k, v)| v.is_empty()) } diff --git a/src/stream.rs b/src/stream.rs index ae57d62..aa7641f 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -83,13 +83,7 @@ impl ByteStreamReader { } pub fn take_buffer(&mut self) -> Bytes { - let bytes = Bytes::from( - self .buf - .iter() - .map(|x| &x[..]) - .collect::<Vec<_>>() - .concat(), - ); + let bytes = Bytes::from(self.buf.iter().map(|x| &x[..]).collect::<Vec<_>>().concat()); self.buf.clear(); self.buf_len = 0; bytes |