diff options
Diffstat (limited to 'src/stream.rs')
-rw-r--r-- | src/stream.rs | 17 |
1 files changed, 10 insertions, 7 deletions
diff --git a/src/stream.rs b/src/stream.rs index cc664ce..3518246 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -4,7 +4,7 @@ use std::task::{Context, Poll}; use bytes::Bytes; use futures::Future; -use futures::{Stream, StreamExt, TryStreamExt}; +use futures::{Stream, StreamExt}; use tokio::io::AsyncRead; use crate::bytes_buf::BytesBuf; @@ -18,7 +18,7 @@ use crate::bytes_buf::BytesBuf; /// meaning, it's up to your application to define their semantic. pub type ByteStream = Pin<Box<dyn Stream<Item = Packet> + Send + Sync>>; -pub type Packet = Result<Bytes, u8>; +pub type Packet = Result<Bytes, std::io::Error>; // ---- @@ -26,7 +26,7 @@ pub struct ByteStreamReader { stream: ByteStream, buf: BytesBuf, eos: bool, - err: Option<u8>, + err: Option<std::io::Error>, } impl ByteStreamReader { @@ -99,7 +99,7 @@ impl ByteStreamReader { pub enum ReadExactError { UnexpectedEos, - Stream(u8), + Stream(std::io::Error), } #[pin_project::pin_project] @@ -120,7 +120,8 @@ impl<'a> Future for ByteStreamReadExact<'a> { if let Some(bytes) = this.reader.try_get(*this.read_len) { return Poll::Ready(Ok(bytes)); } - if let Some(err) = this.reader.err { + if let Some(err) = &this.reader.err { + let err = std::io::Error::new(err.kind(), format!("{}", err)); return Poll::Ready(Err(ReadExactError::Stream(err))); } if this.reader.eos { @@ -149,6 +150,7 @@ impl<'a> Future for ByteStreamReadExact<'a> { // ---- +/* fn u8_to_io_error(v: u8) -> std::io::Error { use std::io::{Error, ErrorKind}; let kind = match v { @@ -183,11 +185,12 @@ fn io_error_to_u8(e: std::io::Error) -> u8 { _ => 100, } } +*/ pub fn asyncread_stream<R: AsyncRead + Send + Sync + 'static>(reader: R) -> ByteStream { - Box::pin(tokio_util::io::ReaderStream::new(reader).map_err(io_error_to_u8)) + Box::pin(tokio_util::io::ReaderStream::new(reader)) } pub fn stream_asyncread(stream: ByteStream) -> impl AsyncRead + Send + Sync + 'static { - tokio_util::io::StreamReader::new(stream.map_err(u8_to_io_error)) + tokio_util::io::StreamReader::new(stream) } |