aboutsummaryrefslogtreecommitdiff
path: root/src/stream.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/stream.rs')
-rw-r--r--src/stream.rs47
1 files changed, 39 insertions, 8 deletions
diff --git a/src/stream.rs b/src/stream.rs
index 05ee051..82f7be3 100644
--- a/src/stream.rs
+++ b/src/stream.rs
@@ -9,19 +9,23 @@ use tokio::io::AsyncRead;
use crate::bytes_buf::BytesBuf;
-/// A stream of associated data.
+/// A stream of bytes (click to read more).
///
/// When sent through Netapp, the Vec may be split in smaller chunk in such a way
/// consecutive Vec may get merged, but Vec and error code may not be reordered
///
-/// Error code 255 means the stream was cut before its end. Other codes have no predefined
-/// meaning, it's up to your application to define their semantic.
+/// Items sent in the ByteStream may be errors of type `std::io::Error`.
+/// An error indicates the end of the ByteStream: a reader should no longer read
+/// after recieving an error, and a writer should stop writing after sending an error.
pub type ByteStream = Pin<Box<dyn Stream<Item = Packet> + Send + Sync>>;
+/// A packet sent in a ByteStream, which may contain either
+/// a Bytes object or an error
pub type Packet = Result<Bytes, std::io::Error>;
// ----
+/// A helper struct to read defined lengths of data from a BytesStream
pub struct ByteStreamReader {
stream: ByteStream,
buf: BytesBuf,
@@ -30,6 +34,7 @@ pub struct ByteStreamReader {
}
impl ByteStreamReader {
+ /// Creates a new `ByteStreamReader` from a `ByteStream`
pub fn new(stream: ByteStream) -> Self {
ByteStreamReader {
stream,
@@ -39,6 +44,8 @@ impl ByteStreamReader {
}
}
+ /// Read exactly `read_len` bytes from the underlying stream
+ /// (returns a future)
pub fn read_exact(&mut self, read_len: usize) -> ByteStreamReadExact<'_> {
ByteStreamReadExact {
reader: self,
@@ -47,6 +54,8 @@ impl ByteStreamReader {
}
}
+ /// Read at most `read_len` bytes from the underlying stream, or less
+ /// if the end of the stream is reached (returns a future)
pub fn read_exact_or_eos(&mut self, read_len: usize) -> ByteStreamReadExact<'_> {
ByteStreamReadExact {
reader: self,
@@ -55,10 +64,14 @@ impl ByteStreamReader {
}
}
+ /// Read exactly one byte from the underlying stream and returns it
+ /// as an u8
pub async fn read_u8(&mut self) -> Result<u8, ReadExactError> {
Ok(self.read_exact(1).await?[0])
}
+ /// Read exactly two bytes from the underlying stream and returns them as an u16 (using
+ /// big-endian decoding)
pub async fn read_u16(&mut self) -> Result<u16, ReadExactError> {
let bytes = self.read_exact(2).await?;
let mut b = [0u8; 2];
@@ -66,6 +79,8 @@ impl ByteStreamReader {
Ok(u16::from_be_bytes(b))
}
+ /// Read exactly four bytes from the underlying stream and returns them as an u32 (using
+ /// big-endian decoding)
pub async fn read_u32(&mut self) -> Result<u32, ReadExactError> {
let bytes = self.read_exact(4).await?;
let mut b = [0u8; 4];
@@ -73,6 +88,8 @@ impl ByteStreamReader {
Ok(u32::from_be_bytes(b))
}
+ /// Transforms the stream reader back into the underlying stream (starting
+ /// after everything that the reader has read)
pub fn into_stream(self) -> ByteStream {
let buf_stream = futures::stream::iter(self.buf.into_slices().into_iter().map(Ok));
if let Some(err) = self.err {
@@ -84,10 +101,21 @@ impl ByteStreamReader {
}
}
+ /// Tries to fill the internal read buffer from the underlying stream.
+ /// Calling this might be necessary to ensure that `.eos()` returns a correct
+ /// result, otherwise the reader might not be aware that the underlying
+ /// stream has nothing left to return.
+ pub async fn fill_buffer(&mut self) {
+ let packet = self.stream.next().await;
+ self.add_stream_next(packet);
+ }
+
+ /// Clears the internal read buffer and returns its content
pub fn take_buffer(&mut self) -> Bytes {
self.buf.take_all()
}
+ /// Returns true if the end of the underlying stream has been reached
pub fn eos(&self) -> bool {
self.buf.is_empty() && self.eos
}
@@ -110,18 +138,19 @@ impl ByteStreamReader {
}
}
}
-
- pub async fn fill_buffer(&mut self) {
- let packet = self.stream.next().await;
- self.add_stream_next(packet);
- }
}
+/// The error kind that can be returned by `ByteStreamReader::read_exact` and
+/// `ByteStreamReader::read_exact_or_eos`
pub enum ReadExactError {
+ /// The end of the stream was reached before the requested number of bytes could be read
UnexpectedEos,
+ /// The underlying data stream returned an IO error when trying to read
Stream(std::io::Error),
}
+/// The future returned by `ByteStreamReader::read_exact` and
+/// `ByteStreamReader::read_exact_or_eos`
#[pin_project::pin_project]
pub struct ByteStreamReadExact<'a> {
#[pin]
@@ -160,10 +189,12 @@ impl<'a> Future for ByteStreamReadExact<'a> {
// ----
+/// Turns a `tokio::io::AsyncRead` asynchronous reader into a `ByteStream`
pub fn asyncread_stream<R: AsyncRead + Send + Sync + 'static>(reader: R) -> ByteStream {
Box::pin(tokio_util::io::ReaderStream::new(reader))
}
+/// Turns a `ByteStream` into a `tokio::io::AsyncRead` asynchronous reader
pub fn stream_asyncread(stream: ByteStream) -> impl AsyncRead + Send + Sync + 'static {
tokio_util::io::StreamReader::new(stream)
}