diff options
Diffstat (limited to 'src/stream.rs')
-rw-r--r-- | src/stream.rs | 47 |
1 files changed, 39 insertions, 8 deletions
diff --git a/src/stream.rs b/src/stream.rs index 05ee051..82f7be3 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -9,19 +9,23 @@ use tokio::io::AsyncRead; use crate::bytes_buf::BytesBuf; -/// A stream of associated data. +/// A stream of bytes (click to read more). /// /// When sent through Netapp, the Vec may be split in smaller chunk in such a way /// consecutive Vec may get merged, but Vec and error code may not be reordered /// -/// Error code 255 means the stream was cut before its end. Other codes have no predefined -/// meaning, it's up to your application to define their semantic. +/// Items sent in the ByteStream may be errors of type `std::io::Error`. +/// An error indicates the end of the ByteStream: a reader should no longer read +/// after recieving an error, and a writer should stop writing after sending an error. pub type ByteStream = Pin<Box<dyn Stream<Item = Packet> + Send + Sync>>; +/// A packet sent in a ByteStream, which may contain either +/// a Bytes object or an error pub type Packet = Result<Bytes, std::io::Error>; // ---- +/// A helper struct to read defined lengths of data from a BytesStream pub struct ByteStreamReader { stream: ByteStream, buf: BytesBuf, @@ -30,6 +34,7 @@ pub struct ByteStreamReader { } impl ByteStreamReader { + /// Creates a new `ByteStreamReader` from a `ByteStream` pub fn new(stream: ByteStream) -> Self { ByteStreamReader { stream, @@ -39,6 +44,8 @@ impl ByteStreamReader { } } + /// Read exactly `read_len` bytes from the underlying stream + /// (returns a future) pub fn read_exact(&mut self, read_len: usize) -> ByteStreamReadExact<'_> { ByteStreamReadExact { reader: self, @@ -47,6 +54,8 @@ impl ByteStreamReader { } } + /// Read at most `read_len` bytes from the underlying stream, or less + /// if the end of the stream is reached (returns a future) pub fn read_exact_or_eos(&mut self, read_len: usize) -> ByteStreamReadExact<'_> { ByteStreamReadExact { reader: self, @@ -55,10 +64,14 @@ impl ByteStreamReader { } } + /// Read exactly one byte from the underlying stream and returns it + /// as an u8 pub async fn read_u8(&mut self) -> Result<u8, ReadExactError> { Ok(self.read_exact(1).await?[0]) } + /// Read exactly two bytes from the underlying stream and returns them as an u16 (using + /// big-endian decoding) pub async fn read_u16(&mut self) -> Result<u16, ReadExactError> { let bytes = self.read_exact(2).await?; let mut b = [0u8; 2]; @@ -66,6 +79,8 @@ impl ByteStreamReader { Ok(u16::from_be_bytes(b)) } + /// Read exactly four bytes from the underlying stream and returns them as an u32 (using + /// big-endian decoding) pub async fn read_u32(&mut self) -> Result<u32, ReadExactError> { let bytes = self.read_exact(4).await?; let mut b = [0u8; 4]; @@ -73,6 +88,8 @@ impl ByteStreamReader { Ok(u32::from_be_bytes(b)) } + /// Transforms the stream reader back into the underlying stream (starting + /// after everything that the reader has read) pub fn into_stream(self) -> ByteStream { let buf_stream = futures::stream::iter(self.buf.into_slices().into_iter().map(Ok)); if let Some(err) = self.err { @@ -84,10 +101,21 @@ impl ByteStreamReader { } } + /// Tries to fill the internal read buffer from the underlying stream. + /// Calling this might be necessary to ensure that `.eos()` returns a correct + /// result, otherwise the reader might not be aware that the underlying + /// stream has nothing left to return. + pub async fn fill_buffer(&mut self) { + let packet = self.stream.next().await; + self.add_stream_next(packet); + } + + /// Clears the internal read buffer and returns its content pub fn take_buffer(&mut self) -> Bytes { self.buf.take_all() } + /// Returns true if the end of the underlying stream has been reached pub fn eos(&self) -> bool { self.buf.is_empty() && self.eos } @@ -110,18 +138,19 @@ impl ByteStreamReader { } } } - - pub async fn fill_buffer(&mut self) { - let packet = self.stream.next().await; - self.add_stream_next(packet); - } } +/// The error kind that can be returned by `ByteStreamReader::read_exact` and +/// `ByteStreamReader::read_exact_or_eos` pub enum ReadExactError { + /// The end of the stream was reached before the requested number of bytes could be read UnexpectedEos, + /// The underlying data stream returned an IO error when trying to read Stream(std::io::Error), } +/// The future returned by `ByteStreamReader::read_exact` and +/// `ByteStreamReader::read_exact_or_eos` #[pin_project::pin_project] pub struct ByteStreamReadExact<'a> { #[pin] @@ -160,10 +189,12 @@ impl<'a> Future for ByteStreamReadExact<'a> { // ---- +/// Turns a `tokio::io::AsyncRead` asynchronous reader into a `ByteStream` pub fn asyncread_stream<R: AsyncRead + Send + Sync + 'static>(reader: R) -> ByteStream { Box::pin(tokio_util::io::ReaderStream::new(reader)) } +/// Turns a `ByteStream` into a `tokio::io::AsyncRead` asynchronous reader pub fn stream_asyncread(stream: ByteStream) -> impl AsyncRead + Send + Sync + 'static { tokio_util::io::StreamReader::new(stream) } |