diff options
author | Alex Auvolat <alex@adnab.me> | 2022-07-22 16:42:26 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-07-22 16:42:58 +0200 |
commit | ab80ade4f0034cbdcf15a99c674730f85eb06870 (patch) | |
tree | f851cf1957a49ed5a48942f30829845e2ab491ec | |
parent | a5e5fd040891c02b1f88bdafdec9e92090094548 (diff) | |
download | netapp-ab80ade4f0034cbdcf15a99c674730f85eb06870.tar.gz netapp-ab80ade4f0034cbdcf15a99c674730f85eb06870.zip |
Conversion between ByteStream and AsyncRead
-rw-r--r-- | Cargo.lock | 2 | ||||
-rw-r--r-- | Cargo.toml | 2 | ||||
-rw-r--r-- | src/stream.rs | 51 |
3 files changed, 52 insertions, 3 deletions
@@ -449,7 +449,7 @@ dependencies = [ [[package]] name = "netapp" -version = "0.4.4" +version = "0.5.0" dependencies = [ "arc-swap", "async-trait", @@ -23,7 +23,7 @@ telemetry = ["opentelemetry", "opentelemetry-contrib", "rand"] futures = "0.3.17" pin-project = "1.0.10" tokio = { version = "1.0", default-features = false, features = ["net", "rt", "rt-multi-thread", "sync", "time", "macros", "io-util", "signal"] } -tokio-util = { version = "0.6.8", default-features = false, features = ["compat"] } +tokio-util = { version = "0.6.8", default-features = false, features = ["compat", "io"] } tokio-stream = "0.1.7" serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } diff --git a/src/stream.rs b/src/stream.rs index aa7641f..f5607b3 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -5,7 +5,8 @@ use std::task::{Context, Poll}; use bytes::Bytes; use futures::Future; -use futures::{Stream, StreamExt}; +use futures::{Stream, StreamExt, TryStreamExt}; +use tokio::io::AsyncRead; /// A stream of associated data. /// @@ -18,6 +19,8 @@ pub type ByteStream = Pin<Box<dyn Stream<Item = Packet> + Send + Sync>>; pub type Packet = Result<Bytes, u8>; +// ---- + pub struct ByteStreamReader { stream: ByteStream, buf: VecDeque<Bytes>, @@ -175,3 +178,49 @@ impl<'a> Future for ByteStreamReadExact<'a> { } } } + +// ---- + +fn u8_to_io_error(v: u8) -> std::io::Error { + use std::io::{Error, ErrorKind}; + let kind = match v { + 101 => ErrorKind::ConnectionAborted, + 102 => ErrorKind::BrokenPipe, + 103 => ErrorKind::WouldBlock, + 104 => ErrorKind::InvalidInput, + 105 => ErrorKind::InvalidData, + 106 => ErrorKind::TimedOut, + 107 => ErrorKind::Interrupted, + 108 => ErrorKind::UnexpectedEof, + 109 => ErrorKind::OutOfMemory, + 110 => ErrorKind::ConnectionReset, + _ => ErrorKind::Other, + }; + Error::new(kind, "(in netapp stream)") +} + +fn io_error_to_u8(e: std::io::Error) -> u8 { + use std::io::{ErrorKind}; + match e.kind() { + ErrorKind::ConnectionAborted => 101, + ErrorKind::BrokenPipe => 102, + ErrorKind::WouldBlock => 103, + ErrorKind::InvalidInput => 104, + ErrorKind::InvalidData => 105, + ErrorKind::TimedOut => 106, + ErrorKind::Interrupted => 107, + ErrorKind::UnexpectedEof => 108, + ErrorKind::OutOfMemory => 109, + ErrorKind::ConnectionReset => 110, + _ => 100, + } +} + +pub fn asyncread_stream<R: AsyncRead + Send + Sync + 'static>(reader: R) -> ByteStream { + Box::pin(tokio_util::io::ReaderStream::new(reader) + .map_err(io_error_to_u8)) +} + +pub fn stream_asyncread(stream: ByteStream) -> impl AsyncRead + Send + Sync + 'static { + tokio_util::io::StreamReader::new(stream.map_err(u8_to_io_error)) +} |