use std::cmp::Ordering; use std::collections::VecDeque; use bytes::BytesMut; use crate::stream::ByteStream; pub use bytes::Bytes; /// A circular buffer of bytes, internally represented as a list of Bytes /// for optimization, but that for all intent and purposes acts just like /// a big byte slice which can be extended on the right and from which /// stuff can be taken on the left. pub struct BytesBuf { buf: VecDeque<Bytes>, buf_len: usize, } impl BytesBuf { /// Creates a new empty BytesBuf pub fn new() -> Self { Self { buf: VecDeque::new(), buf_len: 0, } } /// Returns the number of bytes stored in the BytesBuf #[inline] pub fn len(&self) -> usize { self.buf_len } /// Returns true iff the BytesBuf contains zero bytes #[inline] pub fn is_empty(&self) -> bool { self.buf_len == 0 } /// Adds some bytes to the right of the buffer pub fn extend(&mut self, b: Bytes) { if !b.is_empty() { self.buf_len += b.len(); self.buf.push_back(b); } } /// Takes the whole content of the buffer and returns it as a single Bytes unit pub fn take_all(&mut self) -> Bytes { if self.buf.is_empty() { Bytes::new() } else if self.buf.len() == 1 { self.buf_len = 0; self.buf.pop_back().unwrap() } else { let mut ret = BytesMut::with_capacity(self.buf_len); for b in self.buf.iter() { ret.extend_from_slice(&b[..]); } self.buf.clear(); self.buf_len = 0; ret.freeze() } } /// Takes at most max_len bytes from the left of the buffer pub fn take_max(&mut self, max_len: usize) -> Bytes { if self.buf_len <= max_len { self.take_all() } else { self.take_exact_ok(max_len) } } /// Take exactly len bytes from the left of the buffer, returns None if /// the BytesBuf doesn't contain enough data pub fn take_exact(&mut self, len: usize) -> Option<Bytes> { if self.buf_len < len { None } else { Some(self.take_exact_ok(len)) } } fn take_exact_ok(&mut self, len: usize) -> Bytes { assert!(len <= self.buf_len); let front = self.buf.pop_front().unwrap(); match front.len().cmp(&len) { Ordering::Greater => { self.buf.push_front(front.slice(len..)); self.buf_len -= len; front.slice(..len) } Ordering::Equal => { self.buf_len -= len; front } Ordering::Less => { let mut ret = BytesMut::with_capacity(len); ret.extend_from_slice(&front[..]); self.buf_len -= front.len(); while ret.len() < len { let front = self.buf.pop_front().unwrap(); if front.len() > len - ret.len() { let take = len - ret.len(); ret.extend_from_slice(&front[..take]); self.buf.push_front(front.slice(take..)); self.buf_len -= take; break; } else { ret.extend_from_slice(&front[..]); self.buf_len -= front.len(); } } ret.freeze() } } } /// Return the internal sequence of Bytes slices that make up the buffer pub fn into_slices(self) -> VecDeque<Bytes> { self.buf } /// Return the entire buffer concatenated into a single big Bytes pub fn into_bytes(mut self) -> Bytes { self.take_all() } /// Return the content as a stream of individual chunks pub fn into_stream(self) -> ByteStream { use futures::stream::StreamExt; Box::pin(futures::stream::iter(self.buf).map(|x| Ok(x))) } } impl Default for BytesBuf { fn default() -> Self { Self::new() } } impl From<Bytes> for BytesBuf { fn from(b: Bytes) -> BytesBuf { let mut ret = BytesBuf::new(); ret.extend(b); ret } } impl From<BytesBuf> for Bytes { fn from(mut b: BytesBuf) -> Bytes { b.take_all() } } #[cfg(test)] mod test { use super::*; #[test] fn test_bytes_buf() { let mut buf = BytesBuf::new(); assert!(buf.len() == 0); assert!(buf.is_empty()); buf.extend(Bytes::from(b"Hello, world!".to_vec())); assert!(buf.len() == 13); assert!(!buf.is_empty()); buf.extend(Bytes::from(b"1234567890".to_vec())); assert!(buf.len() == 23); assert!(!buf.is_empty()); assert_eq!( buf.take_all(), Bytes::from(b"Hello, world!1234567890".to_vec()) ); assert!(buf.len() == 0); assert!(buf.is_empty()); buf.extend(Bytes::from(b"1234567890".to_vec())); buf.extend(Bytes::from(b"Hello, world!".to_vec())); assert!(buf.len() == 23); assert!(!buf.is_empty()); assert_eq!(buf.take_max(12), Bytes::from(b"1234567890He".to_vec())); assert!(buf.len() == 11); assert_eq!(buf.take_exact(12), None); assert!(buf.len() == 11); assert_eq!( buf.take_exact(11), Some(Bytes::from(b"llo, world!".to_vec())) ); assert!(buf.len() == 0); assert!(buf.is_empty()); } }