diff options
author | Alex Auvolat <alex@adnab.me> | 2022-07-22 16:42:26 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-07-22 16:42:58 +0200 |
commit | ab80ade4f0034cbdcf15a99c674730f85eb06870 (patch) | |
tree | f851cf1957a49ed5a48942f30829845e2ab491ec /src | |
parent | a5e5fd040891c02b1f88bdafdec9e92090094548 (diff) | |
download | netapp-ab80ade4f0034cbdcf15a99c674730f85eb06870.tar.gz netapp-ab80ade4f0034cbdcf15a99c674730f85eb06870.zip |
Conversion between ByteStream and AsyncRead
Diffstat (limited to 'src')
-rw-r--r-- | src/stream.rs | 51 |
1 files changed, 50 insertions, 1 deletions
diff --git a/src/stream.rs b/src/stream.rs index aa7641f..f5607b3 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -5,7 +5,8 @@ use std::task::{Context, Poll}; use bytes::Bytes; use futures::Future; -use futures::{Stream, StreamExt}; +use futures::{Stream, StreamExt, TryStreamExt}; +use tokio::io::AsyncRead; /// A stream of associated data. /// @@ -18,6 +19,8 @@ pub type ByteStream = Pin<Box<dyn Stream<Item = Packet> + Send + Sync>>; pub type Packet = Result<Bytes, u8>; +// ---- + pub struct ByteStreamReader { stream: ByteStream, buf: VecDeque<Bytes>, @@ -175,3 +178,49 @@ impl<'a> Future for ByteStreamReadExact<'a> { } } } + +// ---- + +fn u8_to_io_error(v: u8) -> std::io::Error { + use std::io::{Error, ErrorKind}; + let kind = match v { + 101 => ErrorKind::ConnectionAborted, + 102 => ErrorKind::BrokenPipe, + 103 => ErrorKind::WouldBlock, + 104 => ErrorKind::InvalidInput, + 105 => ErrorKind::InvalidData, + 106 => ErrorKind::TimedOut, + 107 => ErrorKind::Interrupted, + 108 => ErrorKind::UnexpectedEof, + 109 => ErrorKind::OutOfMemory, + 110 => ErrorKind::ConnectionReset, + _ => ErrorKind::Other, + }; + Error::new(kind, "(in netapp stream)") +} + +fn io_error_to_u8(e: std::io::Error) -> u8 { + use std::io::{ErrorKind}; + match e.kind() { + ErrorKind::ConnectionAborted => 101, + ErrorKind::BrokenPipe => 102, + ErrorKind::WouldBlock => 103, + ErrorKind::InvalidInput => 104, + ErrorKind::InvalidData => 105, + ErrorKind::TimedOut => 106, + ErrorKind::Interrupted => 107, + ErrorKind::UnexpectedEof => 108, + ErrorKind::OutOfMemory => 109, + ErrorKind::ConnectionReset => 110, + _ => 100, + } +} + +pub fn asyncread_stream<R: AsyncRead + Send + Sync + 'static>(reader: R) -> ByteStream { + Box::pin(tokio_util::io::ReaderStream::new(reader) + .map_err(io_error_to_u8)) +} + +pub fn stream_asyncread(stream: ByteStream) -> impl AsyncRead + Send + Sync + 'static { + tokio_util::io::StreamReader::new(stream.map_err(u8_to_io_error)) +} |