aboutsummaryrefslogtreecommitdiff
path: root/src/stream.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/stream.rs')
-rw-r--r--src/stream.rs169
1 files changed, 169 insertions, 0 deletions
diff --git a/src/stream.rs b/src/stream.rs
new file mode 100644
index 0000000..05ee051
--- /dev/null
+++ b/src/stream.rs
@@ -0,0 +1,169 @@
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+use bytes::Bytes;
+
+use futures::Future;
+use futures::{Stream, StreamExt};
+use tokio::io::AsyncRead;
+
+use crate::bytes_buf::BytesBuf;
+
+/// A stream of associated data.
+///
+/// When sent through Netapp, the Vec may be split in smaller chunk in such a way
+/// consecutive Vec may get merged, but Vec and error code may not be reordered
+///
+/// Error code 255 means the stream was cut before its end. Other codes have no predefined
+/// meaning, it's up to your application to define their semantic.
+pub type ByteStream = Pin<Box<dyn Stream<Item = Packet> + Send + Sync>>;
+
+pub type Packet = Result<Bytes, std::io::Error>;
+
+// ----
+
+pub struct ByteStreamReader {
+ stream: ByteStream,
+ buf: BytesBuf,
+ eos: bool,
+ err: Option<std::io::Error>,
+}
+
+impl ByteStreamReader {
+ pub fn new(stream: ByteStream) -> Self {
+ ByteStreamReader {
+ stream,
+ buf: BytesBuf::new(),
+ eos: false,
+ err: None,
+ }
+ }
+
+ pub fn read_exact(&mut self, read_len: usize) -> ByteStreamReadExact<'_> {
+ ByteStreamReadExact {
+ reader: self,
+ read_len,
+ fail_on_eos: true,
+ }
+ }
+
+ pub fn read_exact_or_eos(&mut self, read_len: usize) -> ByteStreamReadExact<'_> {
+ ByteStreamReadExact {
+ reader: self,
+ read_len,
+ fail_on_eos: false,
+ }
+ }
+
+ pub async fn read_u8(&mut self) -> Result<u8, ReadExactError> {
+ Ok(self.read_exact(1).await?[0])
+ }
+
+ pub async fn read_u16(&mut self) -> Result<u16, ReadExactError> {
+ let bytes = self.read_exact(2).await?;
+ let mut b = [0u8; 2];
+ b.copy_from_slice(&bytes[..]);
+ Ok(u16::from_be_bytes(b))
+ }
+
+ pub async fn read_u32(&mut self) -> Result<u32, ReadExactError> {
+ let bytes = self.read_exact(4).await?;
+ let mut b = [0u8; 4];
+ b.copy_from_slice(&bytes[..]);
+ Ok(u32::from_be_bytes(b))
+ }
+
+ pub fn into_stream(self) -> ByteStream {
+ let buf_stream = futures::stream::iter(self.buf.into_slices().into_iter().map(Ok));
+ if let Some(err) = self.err {
+ Box::pin(buf_stream.chain(futures::stream::once(async move { Err(err) })))
+ } else if self.eos {
+ Box::pin(buf_stream)
+ } else {
+ Box::pin(buf_stream.chain(self.stream))
+ }
+ }
+
+ pub fn take_buffer(&mut self) -> Bytes {
+ self.buf.take_all()
+ }
+
+ pub fn eos(&self) -> bool {
+ self.buf.is_empty() && self.eos
+ }
+
+ fn try_get(&mut self, read_len: usize) -> Option<Bytes> {
+ self.buf.take_exact(read_len)
+ }
+
+ fn add_stream_next(&mut self, packet: Option<Packet>) {
+ match packet {
+ Some(Ok(slice)) => {
+ self.buf.extend(slice);
+ }
+ Some(Err(e)) => {
+ self.err = Some(e);
+ self.eos = true;
+ }
+ None => {
+ self.eos = true;
+ }
+ }
+ }
+
+ pub async fn fill_buffer(&mut self) {
+ let packet = self.stream.next().await;
+ self.add_stream_next(packet);
+ }
+}
+
+pub enum ReadExactError {
+ UnexpectedEos,
+ Stream(std::io::Error),
+}
+
+#[pin_project::pin_project]
+pub struct ByteStreamReadExact<'a> {
+ #[pin]
+ reader: &'a mut ByteStreamReader,
+ read_len: usize,
+ fail_on_eos: bool,
+}
+
+impl<'a> Future for ByteStreamReadExact<'a> {
+ type Output = Result<Bytes, ReadExactError>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<Bytes, ReadExactError>> {
+ let mut this = self.project();
+
+ loop {
+ if let Some(bytes) = this.reader.try_get(*this.read_len) {
+ return Poll::Ready(Ok(bytes));
+ }
+ if let Some(err) = &this.reader.err {
+ let err = std::io::Error::new(err.kind(), format!("{}", err));
+ return Poll::Ready(Err(ReadExactError::Stream(err)));
+ }
+ if this.reader.eos {
+ if *this.fail_on_eos {
+ return Poll::Ready(Err(ReadExactError::UnexpectedEos));
+ } else {
+ return Poll::Ready(Ok(this.reader.take_buffer()));
+ }
+ }
+
+ let next_packet = futures::ready!(this.reader.stream.as_mut().poll_next(cx));
+ this.reader.add_stream_next(next_packet);
+ }
+ }
+}
+
+// ----
+
+pub fn asyncread_stream<R: AsyncRead + Send + Sync + 'static>(reader: R) -> ByteStream {
+ Box::pin(tokio_util::io::ReaderStream::new(reader))
+}
+
+pub fn stream_asyncread(stream: ByteStream) -> impl AsyncRead + Send + Sync + 'static {
+ tokio_util::io::StreamReader::new(stream)
+}