aboutsummaryrefslogtreecommitdiff
path: root/src/stream.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/stream.rs')
-rw-r--r--src/stream.rs17
1 files changed, 10 insertions, 7 deletions
diff --git a/src/stream.rs b/src/stream.rs
index cc664ce..3518246 100644
--- a/src/stream.rs
+++ b/src/stream.rs
@@ -4,7 +4,7 @@ use std::task::{Context, Poll};
use bytes::Bytes;
use futures::Future;
-use futures::{Stream, StreamExt, TryStreamExt};
+use futures::{Stream, StreamExt};
use tokio::io::AsyncRead;
use crate::bytes_buf::BytesBuf;
@@ -18,7 +18,7 @@ use crate::bytes_buf::BytesBuf;
/// meaning, it's up to your application to define their semantic.
pub type ByteStream = Pin<Box<dyn Stream<Item = Packet> + Send + Sync>>;
-pub type Packet = Result<Bytes, u8>;
+pub type Packet = Result<Bytes, std::io::Error>;
// ----
@@ -26,7 +26,7 @@ pub struct ByteStreamReader {
stream: ByteStream,
buf: BytesBuf,
eos: bool,
- err: Option<u8>,
+ err: Option<std::io::Error>,
}
impl ByteStreamReader {
@@ -99,7 +99,7 @@ impl ByteStreamReader {
pub enum ReadExactError {
UnexpectedEos,
- Stream(u8),
+ Stream(std::io::Error),
}
#[pin_project::pin_project]
@@ -120,7 +120,8 @@ impl<'a> Future for ByteStreamReadExact<'a> {
if let Some(bytes) = this.reader.try_get(*this.read_len) {
return Poll::Ready(Ok(bytes));
}
- if let Some(err) = this.reader.err {
+ if let Some(err) = &this.reader.err {
+ let err = std::io::Error::new(err.kind(), format!("{}", err));
return Poll::Ready(Err(ReadExactError::Stream(err)));
}
if this.reader.eos {
@@ -149,6 +150,7 @@ impl<'a> Future for ByteStreamReadExact<'a> {
// ----
+/*
fn u8_to_io_error(v: u8) -> std::io::Error {
use std::io::{Error, ErrorKind};
let kind = match v {
@@ -183,11 +185,12 @@ fn io_error_to_u8(e: std::io::Error) -> u8 {
_ => 100,
}
}
+*/
pub fn asyncread_stream<R: AsyncRead + Send + Sync + 'static>(reader: R) -> ByteStream {
- Box::pin(tokio_util::io::ReaderStream::new(reader).map_err(io_error_to_u8))
+ Box::pin(tokio_util::io::ReaderStream::new(reader))
}
pub fn stream_asyncread(stream: ByteStream) -> impl AsyncRead + Send + Sync + 'static {
- tokio_util::io::StreamReader::new(stream.map_err(u8_to_io_error))
+ tokio_util::io::StreamReader::new(stream)
}