aboutsummaryrefslogblamecommitdiff
path: root/src/stream.rs
blob: 6e00e5f1ac035642a9822baeb383fc5cf1b6c8f2 (plain) (tree)
1
2
3
4
5
6
7
8
9





                               
                                 
                         
 

                               








                                                                                          
                                                
 

       

                             
                      
                  
                                    





                                                
                                             







































                                                                                         
                                                                                                   








                                                                                                  
                                                
                                   


                                   
                                               

         
                                                                 
                                             




                         
                               



















                                                                                                    

                                                                                              





                                                                                               
                                                                                          




                                                                                          
                                                                      











                                                                  


       

                                                                                        
                                                           


                                                                                       
                                                 
 
use std::pin::Pin;
use std::task::{Context, Poll};

use bytes::Bytes;

use futures::Future;
use futures::{Stream, StreamExt};
use tokio::io::AsyncRead;

use crate::bytes_buf::BytesBuf;

/// A stream of associated data.
///
/// When sent through Netapp, the Vec may be split in smaller chunk in such a way
/// consecutive Vec may get merged, but Vec and error code may not be reordered
///
/// Error code 255 means the stream was cut before its end. Other codes have no predefined
/// meaning, it's up to your application to define their semantic.
pub type ByteStream = Pin<Box<dyn Stream<Item = Packet> + Send + Sync>>;

pub type Packet = Result<Bytes, std::io::Error>;

// ----

pub struct ByteStreamReader {
	stream: ByteStream,
	buf: BytesBuf,
	eos: bool,
	err: Option<std::io::Error>,
}

impl ByteStreamReader {
	pub fn new(stream: ByteStream) -> Self {
		ByteStreamReader {
			stream,
			buf: BytesBuf::new(),
			eos: false,
			err: None,
		}
	}

	pub fn read_exact(&mut self, read_len: usize) -> ByteStreamReadExact<'_> {
		ByteStreamReadExact {
			reader: self,
			read_len,
			fail_on_eos: true,
		}
	}

	pub fn read_exact_or_eos(&mut self, read_len: usize) -> ByteStreamReadExact<'_> {
		ByteStreamReadExact {
			reader: self,
			read_len,
			fail_on_eos: false,
		}
	}

	pub async fn read_u8(&mut self) -> Result<u8, ReadExactError> {
		Ok(self.read_exact(1).await?[0])
	}

	pub async fn read_u16(&mut self) -> Result<u16, ReadExactError> {
		let bytes = self.read_exact(2).await?;
		let mut b = [0u8; 2];
		b.copy_from_slice(&bytes[..]);
		Ok(u16::from_be_bytes(b))
	}

	pub async fn read_u32(&mut self) -> Result<u32, ReadExactError> {
		let bytes = self.read_exact(4).await?;
		let mut b = [0u8; 4];
		b.copy_from_slice(&bytes[..]);
		Ok(u32::from_be_bytes(b))
	}

	pub fn into_stream(self) -> ByteStream {
		let buf_stream = futures::stream::iter(self.buf.into_slices().into_iter().map(Ok));
		if let Some(err) = self.err {
			Box::pin(buf_stream.chain(futures::stream::once(async move { Err(err) })))
		} else if self.eos {
			Box::pin(buf_stream)
		} else {
			Box::pin(buf_stream.chain(self.stream))
		}
	}

	pub fn take_buffer(&mut self) -> Bytes {
		self.buf.take_all()
	}

	pub fn eos(&self) -> bool {
		self.buf.is_empty() && self.eos
	}

	fn try_get(&mut self, read_len: usize) -> Option<Bytes> {
		self.buf.take_exact(read_len)
	}
}

pub enum ReadExactError {
	UnexpectedEos,
	Stream(std::io::Error),
}

#[pin_project::pin_project]
pub struct ByteStreamReadExact<'a> {
	#[pin]
	reader: &'a mut ByteStreamReader,
	read_len: usize,
	fail_on_eos: bool,
}

impl<'a> Future for ByteStreamReadExact<'a> {
	type Output = Result<Bytes, ReadExactError>;

	fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<Bytes, ReadExactError>> {
		let mut this = self.project();

		loop {
			if let Some(bytes) = this.reader.try_get(*this.read_len) {
				return Poll::Ready(Ok(bytes));
			}
			if let Some(err) = &this.reader.err {
				let err = std::io::Error::new(err.kind(), format!("{}", err));
				return Poll::Ready(Err(ReadExactError::Stream(err)));
			}
			if this.reader.eos {
				if *this.fail_on_eos {
					return Poll::Ready(Err(ReadExactError::UnexpectedEos));
				} else {
					return Poll::Ready(Ok(this.reader.take_buffer()));
				}
			}

			match futures::ready!(this.reader.stream.as_mut().poll_next(cx)) {
				Some(Ok(slice)) => {
					this.reader.buf.extend(slice);
				}
				Some(Err(e)) => {
					this.reader.err = Some(e);
					this.reader.eos = true;
				}
				None => {
					this.reader.eos = true;
				}
			}
		}
	}
}

// ----


pub fn asyncread_stream<R: AsyncRead + Send + Sync + 'static>(reader: R) -> ByteStream {
	Box::pin(tokio_util::io::ReaderStream::new(reader))
}

pub fn stream_asyncread(stream: ByteStream) -> impl AsyncRead + Send + Sync + 'static {
	tokio_util::io::StreamReader::new(stream)
}