diff options
Diffstat (limited to 'src/stream.rs')
-rw-r--r-- | src/stream.rs | 52 |
1 files changed, 9 insertions, 43 deletions
diff --git a/src/stream.rs b/src/stream.rs index 5ba2ed4..cc664ce 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -1,4 +1,3 @@ -use std::collections::VecDeque; use std::pin::Pin; use std::task::{Context, Poll}; @@ -8,6 +7,8 @@ use futures::Future; use futures::{Stream, StreamExt, TryStreamExt}; use tokio::io::AsyncRead; +use crate::bytes_buf::BytesBuf; + /// A stream of associated data. /// /// When sent through Netapp, the Vec may be split in smaller chunk in such a way @@ -23,8 +24,7 @@ pub type Packet = Result<Bytes, u8>; pub struct ByteStreamReader { stream: ByteStream, - buf: VecDeque<Bytes>, - buf_len: usize, + buf: BytesBuf, eos: bool, err: Option<u8>, } @@ -33,8 +33,7 @@ impl ByteStreamReader { pub fn new(stream: ByteStream) -> Self { ByteStreamReader { stream, - buf: VecDeque::with_capacity(8), - buf_len: 0, + buf: BytesBuf::new(), eos: false, err: None, } @@ -75,7 +74,7 @@ impl ByteStreamReader { } pub fn into_stream(self) -> ByteStream { - let buf_stream = futures::stream::iter(self.buf.into_iter().map(Ok)); + let buf_stream = futures::stream::iter(self.buf.into_slices().into_iter().map(Ok)); if let Some(err) = self.err { Box::pin(buf_stream.chain(futures::stream::once(async move { Err(err) }))) } else if self.eos { @@ -86,45 +85,15 @@ impl ByteStreamReader { } pub fn take_buffer(&mut self) -> Bytes { - let bytes = Bytes::from(self.buf.iter().map(|x| &x[..]).collect::<Vec<_>>().concat()); - self.buf.clear(); - self.buf_len = 0; - bytes + self.buf.take_all() } pub fn eos(&self) -> bool { - self.buf_len == 0 && self.eos + self.buf.is_empty() && self.eos } fn try_get(&mut self, read_len: usize) -> Option<Bytes> { - if self.buf_len >= read_len { - let mut slices = Vec::with_capacity(self.buf.len()); - let mut taken = 0; - while taken < read_len { - let front = self.buf.pop_front().unwrap(); - if taken + front.len() <= read_len { - taken += front.len(); - self.buf_len -= front.len(); - slices.push(front); - } else { - let front_take = read_len - taken; - slices.push(front.slice(..front_take)); - self.buf.push_front(front.slice(front_take..)); - self.buf_len -= front_take; - break; - } - } - Some( - slices - .iter() - .map(|x| &x[..]) - .collect::<Vec<_>>() - .concat() - .into(), - ) - } else { - None - } + self.buf.take_exact(read_len) } } @@ -164,10 +133,7 @@ impl<'a> Future for ByteStreamReadExact<'a> { match futures::ready!(this.reader.stream.as_mut().poll_next(cx)) { Some(Ok(slice)) => { - if !slice.is_empty() { - this.reader.buf_len += slice.len(); - this.reader.buf.push_back(slice); - } + this.reader.buf.extend(slice); } Some(Err(e)) => { this.reader.err = Some(e); |