aboutsummaryrefslogtreecommitdiff
path: root/src/stream.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/stream.rs')
-rw-r--r--src/stream.rs52
1 files changed, 9 insertions, 43 deletions
diff --git a/src/stream.rs b/src/stream.rs
index 5ba2ed4..cc664ce 100644
--- a/src/stream.rs
+++ b/src/stream.rs
@@ -1,4 +1,3 @@
-use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll};
@@ -8,6 +7,8 @@ use futures::Future;
use futures::{Stream, StreamExt, TryStreamExt};
use tokio::io::AsyncRead;
+use crate::bytes_buf::BytesBuf;
+
/// A stream of associated data.
///
/// When sent through Netapp, the Vec may be split in smaller chunk in such a way
@@ -23,8 +24,7 @@ pub type Packet = Result<Bytes, u8>;
pub struct ByteStreamReader {
stream: ByteStream,
- buf: VecDeque<Bytes>,
- buf_len: usize,
+ buf: BytesBuf,
eos: bool,
err: Option<u8>,
}
@@ -33,8 +33,7 @@ impl ByteStreamReader {
pub fn new(stream: ByteStream) -> Self {
ByteStreamReader {
stream,
- buf: VecDeque::with_capacity(8),
- buf_len: 0,
+ buf: BytesBuf::new(),
eos: false,
err: None,
}
@@ -75,7 +74,7 @@ impl ByteStreamReader {
}
pub fn into_stream(self) -> ByteStream {
- let buf_stream = futures::stream::iter(self.buf.into_iter().map(Ok));
+ let buf_stream = futures::stream::iter(self.buf.into_slices().into_iter().map(Ok));
if let Some(err) = self.err {
Box::pin(buf_stream.chain(futures::stream::once(async move { Err(err) })))
} else if self.eos {
@@ -86,45 +85,15 @@ impl ByteStreamReader {
}
pub fn take_buffer(&mut self) -> Bytes {
- let bytes = Bytes::from(self.buf.iter().map(|x| &x[..]).collect::<Vec<_>>().concat());
- self.buf.clear();
- self.buf_len = 0;
- bytes
+ self.buf.take_all()
}
pub fn eos(&self) -> bool {
- self.buf_len == 0 && self.eos
+ self.buf.is_empty() && self.eos
}
fn try_get(&mut self, read_len: usize) -> Option<Bytes> {
- if self.buf_len >= read_len {
- let mut slices = Vec::with_capacity(self.buf.len());
- let mut taken = 0;
- while taken < read_len {
- let front = self.buf.pop_front().unwrap();
- if taken + front.len() <= read_len {
- taken += front.len();
- self.buf_len -= front.len();
- slices.push(front);
- } else {
- let front_take = read_len - taken;
- slices.push(front.slice(..front_take));
- self.buf.push_front(front.slice(front_take..));
- self.buf_len -= front_take;
- break;
- }
- }
- Some(
- slices
- .iter()
- .map(|x| &x[..])
- .collect::<Vec<_>>()
- .concat()
- .into(),
- )
- } else {
- None
- }
+ self.buf.take_exact(read_len)
}
}
@@ -164,10 +133,7 @@ impl<'a> Future for ByteStreamReadExact<'a> {
match futures::ready!(this.reader.stream.as_mut().poll_next(cx)) {
Some(Ok(slice)) => {
- if !slice.is_empty() {
- this.reader.buf_len += slice.len();
- this.reader.buf.push_back(slice);
- }
+ this.reader.buf.extend(slice);
}
Some(Err(e)) => {
this.reader.err = Some(e);