aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-07-22 13:48:43 +0200
committerAlex Auvolat <alex@adnab.me>2022-07-22 13:48:43 +0200
commit50358b944ae7ee4b4aa292ede8bc5d185c86df65 (patch)
treeaaca17843db5f44770d6ff308c45d8321274c70b
parentaa1b29d41a680f9ae266ed4bdecec89db58226c1 (diff)
downloadnetapp-50358b944ae7ee4b4aa292ede8bc5d185c86df65.tar.gz
netapp-50358b944ae7ee4b4aa292ede8bc5d185c86df65.zip
Cargo fmt; better adapt with_capacity_values
-rw-r--r--src/message.rs21
-rw-r--r--src/send.rs18
-rw-r--r--src/stream.rs8
3 files changed, 18 insertions, 29 deletions
diff --git a/src/message.rs b/src/message.rs
index 2ed5c98..56e6e8e 100644
--- a/src/message.rs
+++ b/src/message.rs
@@ -103,7 +103,10 @@ impl<M: Message> Req<M> {
_phantom: Default::default(),
msg: Arc::new(msg),
msg_ser: Some(enc.msg),
- stream: enc.stream.map(AttachedStream::Stream).unwrap_or(AttachedStream::None),
+ stream: enc
+ .stream
+ .map(AttachedStream::Stream)
+ .unwrap_or(AttachedStream::None),
})
}
}
@@ -147,7 +150,9 @@ impl<M: Message> Clone for Req<M> {
let stream = match &self.stream {
AttachedStream::None => AttachedStream::None,
AttachedStream::Fixed(b) => AttachedStream::Fixed(b.clone()),
- AttachedStream::Stream(_) => panic!("Cannot clone a Req<_> with a non-buffer attached stream"),
+ AttachedStream::Stream(_) => {
+ panic!("Cannot clone a Req<_> with a non-buffer attached stream")
+ }
};
Self {
_phantom: Default::default(),
@@ -231,7 +236,9 @@ impl<M: Message> Resp<M> {
Ok(Self {
_phantom: Default::default(),
msg,
- stream: stream.map(AttachedStream::Stream).unwrap_or(AttachedStream::None),
+ stream: stream
+ .map(AttachedStream::Stream)
+ .unwrap_or(AttachedStream::None),
})
}
RespEnc::Error { code, message } => Err(Error::Remote(code, message)),
@@ -293,7 +300,9 @@ pub(crate) struct ReqEnc {
impl ReqEnc {
pub(crate) fn encode(self) -> ByteStream {
- let mut buf = BytesMut::with_capacity(64);
+ let mut buf = BytesMut::with_capacity(
+ self.path.len() + self.telemetry_id.len() + self.msg.len() + 16,
+ );
buf.put_u8(self.prio);
@@ -375,7 +384,7 @@ impl RespEnc {
pub(crate) fn encode(self) -> ByteStream {
match self {
RespEnc::Success { msg, stream } => {
- let mut buf = BytesMut::with_capacity(64);
+ let mut buf = BytesMut::with_capacity(msg.len() + 8);
buf.put_u8(0);
@@ -391,7 +400,7 @@ impl RespEnc {
}
}
RespEnc::Error { code, message } => {
- let mut buf = BytesMut::with_capacity(64);
+ let mut buf = BytesMut::with_capacity(message.len() + 8);
buf.put_u8(1 + message.len() as u8);
buf.put_u8(code);
buf.put(message.as_bytes());
diff --git a/src/send.rs b/src/send.rs
index a8cf966..f1df6f7 100644
--- a/src/send.rs
+++ b/src/send.rs
@@ -30,7 +30,7 @@ pub(crate) const ERROR_MARKER: ChunkLength = 0x4000;
pub(crate) const CHUNK_HAS_CONTINUATION: ChunkLength = 0x8000;
struct SendQueue {
- items: VecDeque<(u8, VecDeque<SendQueueItem>)>,
+ items: Vec<(u8, VecDeque<SendQueueItem>)>,
}
struct SendQueueItem {
@@ -42,7 +42,7 @@ struct SendQueueItem {
impl SendQueue {
fn new() -> Self {
Self {
- items: VecDeque::with_capacity(64),
+ items: Vec::with_capacity(64),
}
}
fn push(&mut self, item: SendQueueItem) {
@@ -56,20 +56,6 @@ impl SendQueue {
};
self.items[pos_prio].1.push_back(item);
}
- // used only in tests. They should probably be rewriten
- #[allow(dead_code)]
- fn pop(&mut self) -> Option<SendQueueItem> {
- match self.items.pop_front() {
- None => None,
- Some((prio, mut items_at_prio)) => {
- let ret = items_at_prio.pop_front();
- if !items_at_prio.is_empty() {
- self.items.push_front((prio, items_at_prio));
- }
- ret.or_else(|| self.pop())
- }
- }
- }
fn is_empty(&self) -> bool {
self.items.iter().all(|(_k, v)| v.is_empty())
}
diff --git a/src/stream.rs b/src/stream.rs
index ae57d62..aa7641f 100644
--- a/src/stream.rs
+++ b/src/stream.rs
@@ -83,13 +83,7 @@ impl ByteStreamReader {
}
pub fn take_buffer(&mut self) -> Bytes {
- let bytes = Bytes::from(
- self .buf
- .iter()
- .map(|x| &x[..])
- .collect::<Vec<_>>()
- .concat(),
- );
+ let bytes = Bytes::from(self.buf.iter().map(|x| &x[..]).collect::<Vec<_>>().concat());
self.buf.clear();
self.buf_len = 0;
bytes