aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-09-01 10:29:26 +0200
committerAlex Auvolat <alex@adnab.me>2022-09-01 10:29:26 +0200
commit263db66fcee65deda39de18baa837228ea38baf1 (patch)
tree2e6bec80ddc5529eebcdd61757fed03e97b2badf
parent3fd30c6e280fba41377c8b563352d756e8bc1caf (diff)
downloadnetapp-263db66fcee65deda39de18baa837228ea38baf1.tar.gz
netapp-263db66fcee65deda39de18baa837228ea38baf1.zip
Refactor: create a BytesBuf utility crate (will also be usefull in Garage)
-rw-r--r--src/bytes_buf.rs167
-rw-r--r--src/lib.rs1
-rw-r--r--src/stream.rs52
3 files changed, 177 insertions, 43 deletions
diff --git a/src/bytes_buf.rs b/src/bytes_buf.rs
new file mode 100644
index 0000000..46c7039
--- /dev/null
+++ b/src/bytes_buf.rs
@@ -0,0 +1,167 @@
+use std::collections::VecDeque;
+
+pub use bytes::Bytes;
+
+/// A circular buffer of bytes, internally represented as a list of Bytes
+/// for optimization, but that for all intent and purposes acts just like
+/// a big byte slice which can be extended on the right and from which
+/// one can take on the left.
+pub struct BytesBuf {
+ buf: VecDeque<Bytes>,
+ buf_len: usize,
+}
+
+impl BytesBuf {
+ /// Creates a new empty BytesBuf
+ pub fn new() -> Self {
+ Self {
+ buf: VecDeque::new(),
+ buf_len: 0,
+ }
+ }
+
+ /// Returns the number of bytes stored in the BytesBuf
+ #[inline]
+ pub fn len(&self) -> usize {
+ self.buf_len
+ }
+
+ /// Returns true iff the BytesBuf contains zero bytes
+ #[inline]
+ pub fn is_empty(&self) -> bool {
+ self.buf_len == 0
+ }
+
+ /// Adds some bytes to the right of the buffer
+ pub fn extend(&mut self, b: Bytes) {
+ if !b.is_empty() {
+ self.buf_len += b.len();
+ self.buf.push_back(b);
+ }
+ }
+
+ /// Takes the whole content of the buffer and returns it as a single Bytes unit
+ pub fn take_all(&mut self) -> Bytes {
+ if self.buf.len() == 0 {
+ Bytes::new()
+ } else if self.buf.len() == 1 {
+ self.buf_len = 0;
+ self.buf.pop_back().unwrap()
+ } else {
+ let mut ret = Vec::with_capacity(self.buf_len);
+ for b in self.buf.iter() {
+ ret.extend(&b[..]);
+ }
+ self.buf.clear();
+ self.buf_len = 0;
+ Bytes::from(ret)
+ }
+ }
+
+ /// Takes at most max_len bytes from the left of the buffer
+ pub fn take_max(&mut self, max_len: usize) -> Bytes {
+ if self.buf_len <= max_len {
+ self.take_all()
+ } else {
+ self.take_exact_ok(max_len)
+ }
+ }
+
+ /// Take exactly len bytes from the left of the buffer, returns None if
+ /// the BytesBuf doesn't contain enough data
+ pub fn take_exact(&mut self, len: usize) -> Option<Bytes> {
+ if self.buf_len < len {
+ None
+ } else {
+ Some(self.take_exact_ok(len))
+ }
+ }
+
+ fn take_exact_ok(&mut self, len: usize) -> Bytes {
+ assert!(len <= self.buf_len);
+ let front = self.buf.pop_front().unwrap();
+ if front.len() > len {
+ self.buf.push_front(front.slice(len..));
+ self.buf_len -= len;
+ front.slice(..len)
+ } else if front.len() == len {
+ self.buf_len -= len;
+ front
+ } else {
+ let mut ret = Vec::with_capacity(len);
+ ret.extend(&front[..]);
+ self.buf_len -= front.len();
+ while ret.len() < len {
+ let front = self.buf.pop_front().unwrap();
+ if front.len() > len - ret.len() {
+ let take = len - ret.len();
+ ret.extend(front.slice(..take));
+ self.buf.push_front(front.slice(take..));
+ self.buf_len -= take;
+ break;
+ } else {
+ ret.extend(&front[..]);
+ self.buf_len -= front.len();
+ }
+ }
+ Bytes::from(ret)
+ }
+ }
+
+ /// Return the internal sequence of Bytes slices that make up the buffer
+ pub fn into_slices(self) -> VecDeque<Bytes> {
+ self.buf
+ }
+}
+
+impl From<Bytes> for BytesBuf {
+ fn from(b: Bytes) -> BytesBuf {
+ let mut ret = BytesBuf::new();
+ ret.extend(b);
+ ret
+ }
+}
+
+impl From<BytesBuf> for Bytes {
+ fn from(mut b: BytesBuf) -> Bytes {
+ b.take_all()
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+
+ #[test]
+ fn test_bytes_buf() {
+ let mut buf = BytesBuf::new();
+ assert!(buf.len() == 0);
+ assert!(buf.is_empty());
+
+ buf.extend(Bytes::from(b"Hello, world!".to_vec()));
+ assert!(buf.len() == 13);
+ assert!(!buf.is_empty());
+
+ buf.extend(Bytes::from(b"1234567890".to_vec()));
+ assert!(buf.len() == 23);
+ assert!(!buf.is_empty());
+
+ assert_eq!(buf.take_all(), Bytes::from(b"Hello, world!1234567890".to_vec()));
+ assert!(buf.len() == 0);
+ assert!(buf.is_empty());
+
+ buf.extend(Bytes::from(b"1234567890".to_vec()));
+ buf.extend(Bytes::from(b"Hello, world!".to_vec()));
+ assert!(buf.len() == 23);
+ assert!(!buf.is_empty());
+
+ assert_eq!(buf.take_max(12), Bytes::from(b"1234567890He".to_vec()));
+ assert!(buf.len() == 11);
+
+ assert_eq!(buf.take_exact(12), None);
+ assert!(buf.len() == 11);
+ assert_eq!(buf.take_exact(11), Some(Bytes::from(b"llo, world!".to_vec())));
+ assert!(buf.len() == 0);
+ assert!(buf.is_empty());
+ }
+}
diff --git a/src/lib.rs b/src/lib.rs
index bd41048..18091c8 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -16,6 +16,7 @@
pub mod error;
pub mod stream;
pub mod util;
+pub mod bytes_buf;
pub mod endpoint;
pub mod message;
diff --git a/src/stream.rs b/src/stream.rs
index 5ba2ed4..cc664ce 100644
--- a/src/stream.rs
+++ b/src/stream.rs
@@ -1,4 +1,3 @@
-use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll};
@@ -8,6 +7,8 @@ use futures::Future;
use futures::{Stream, StreamExt, TryStreamExt};
use tokio::io::AsyncRead;
+use crate::bytes_buf::BytesBuf;
+
/// A stream of associated data.
///
/// When sent through Netapp, the Vec may be split in smaller chunk in such a way
@@ -23,8 +24,7 @@ pub type Packet = Result<Bytes, u8>;
pub struct ByteStreamReader {
stream: ByteStream,
- buf: VecDeque<Bytes>,
- buf_len: usize,
+ buf: BytesBuf,
eos: bool,
err: Option<u8>,
}
@@ -33,8 +33,7 @@ impl ByteStreamReader {
pub fn new(stream: ByteStream) -> Self {
ByteStreamReader {
stream,
- buf: VecDeque::with_capacity(8),
- buf_len: 0,
+ buf: BytesBuf::new(),
eos: false,
err: None,
}
@@ -75,7 +74,7 @@ impl ByteStreamReader {
}
pub fn into_stream(self) -> ByteStream {
- let buf_stream = futures::stream::iter(self.buf.into_iter().map(Ok));
+ let buf_stream = futures::stream::iter(self.buf.into_slices().into_iter().map(Ok));
if let Some(err) = self.err {
Box::pin(buf_stream.chain(futures::stream::once(async move { Err(err) })))
} else if self.eos {
@@ -86,45 +85,15 @@ impl ByteStreamReader {
}
pub fn take_buffer(&mut self) -> Bytes {
- let bytes = Bytes::from(self.buf.iter().map(|x| &x[..]).collect::<Vec<_>>().concat());
- self.buf.clear();
- self.buf_len = 0;
- bytes
+ self.buf.take_all()
}
pub fn eos(&self) -> bool {
- self.buf_len == 0 && self.eos
+ self.buf.is_empty() && self.eos
}
fn try_get(&mut self, read_len: usize) -> Option<Bytes> {
- if self.buf_len >= read_len {
- let mut slices = Vec::with_capacity(self.buf.len());
- let mut taken = 0;
- while taken < read_len {
- let front = self.buf.pop_front().unwrap();
- if taken + front.len() <= read_len {
- taken += front.len();
- self.buf_len -= front.len();
- slices.push(front);
- } else {
- let front_take = read_len - taken;
- slices.push(front.slice(..front_take));
- self.buf.push_front(front.slice(front_take..));
- self.buf_len -= front_take;
- break;
- }
- }
- Some(
- slices
- .iter()
- .map(|x| &x[..])
- .collect::<Vec<_>>()
- .concat()
- .into(),
- )
- } else {
- None
- }
+ self.buf.take_exact(read_len)
}
}
@@ -164,10 +133,7 @@ impl<'a> Future for ByteStreamReadExact<'a> {
match futures::ready!(this.reader.stream.as_mut().poll_next(cx)) {
Some(Ok(slice)) => {
- if !slice.is_empty() {
- this.reader.buf_len += slice.len();
- this.reader.buf.push_back(slice);
- }
+ this.reader.buf.extend(slice);
}
Some(Err(e)) => {
this.reader.err = Some(e);