aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-07-22 16:42:26 +0200
committerAlex Auvolat <alex@adnab.me>2022-07-22 16:42:58 +0200
commitab80ade4f0034cbdcf15a99c674730f85eb06870 (patch)
treef851cf1957a49ed5a48942f30829845e2ab491ec
parenta5e5fd040891c02b1f88bdafdec9e92090094548 (diff)
downloadnetapp-ab80ade4f0034cbdcf15a99c674730f85eb06870.tar.gz
netapp-ab80ade4f0034cbdcf15a99c674730f85eb06870.zip
Conversion between ByteStream and AsyncRead
-rw-r--r--Cargo.lock2
-rw-r--r--Cargo.toml2
-rw-r--r--src/stream.rs51
3 files changed, 52 insertions, 3 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 2312190..692b2e0 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -449,7 +449,7 @@ dependencies = [
[[package]]
name = "netapp"
-version = "0.4.4"
+version = "0.5.0"
dependencies = [
"arc-swap",
"async-trait",
diff --git a/Cargo.toml b/Cargo.toml
index dd05d8c..377e09d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -23,7 +23,7 @@ telemetry = ["opentelemetry", "opentelemetry-contrib", "rand"]
futures = "0.3.17"
pin-project = "1.0.10"
tokio = { version = "1.0", default-features = false, features = ["net", "rt", "rt-multi-thread", "sync", "time", "macros", "io-util", "signal"] }
-tokio-util = { version = "0.6.8", default-features = false, features = ["compat"] }
+tokio-util = { version = "0.6.8", default-features = false, features = ["compat", "io"] }
tokio-stream = "0.1.7"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
diff --git a/src/stream.rs b/src/stream.rs
index aa7641f..f5607b3 100644
--- a/src/stream.rs
+++ b/src/stream.rs
@@ -5,7 +5,8 @@ use std::task::{Context, Poll};
use bytes::Bytes;
use futures::Future;
-use futures::{Stream, StreamExt};
+use futures::{Stream, StreamExt, TryStreamExt};
+use tokio::io::AsyncRead;
/// A stream of associated data.
///
@@ -18,6 +19,8 @@ pub type ByteStream = Pin<Box<dyn Stream<Item = Packet> + Send + Sync>>;
pub type Packet = Result<Bytes, u8>;
+// ----
+
pub struct ByteStreamReader {
stream: ByteStream,
buf: VecDeque<Bytes>,
@@ -175,3 +178,49 @@ impl<'a> Future for ByteStreamReadExact<'a> {
}
}
}
+
+// ----
+
+fn u8_to_io_error(v: u8) -> std::io::Error {
+ use std::io::{Error, ErrorKind};
+ let kind = match v {
+ 101 => ErrorKind::ConnectionAborted,
+ 102 => ErrorKind::BrokenPipe,
+ 103 => ErrorKind::WouldBlock,
+ 104 => ErrorKind::InvalidInput,
+ 105 => ErrorKind::InvalidData,
+ 106 => ErrorKind::TimedOut,
+ 107 => ErrorKind::Interrupted,
+ 108 => ErrorKind::UnexpectedEof,
+ 109 => ErrorKind::OutOfMemory,
+ 110 => ErrorKind::ConnectionReset,
+ _ => ErrorKind::Other,
+ };
+ Error::new(kind, "(in netapp stream)")
+}
+
+fn io_error_to_u8(e: std::io::Error) -> u8 {
+ use std::io::{ErrorKind};
+ match e.kind() {
+ ErrorKind::ConnectionAborted => 101,
+ ErrorKind::BrokenPipe => 102,
+ ErrorKind::WouldBlock => 103,
+ ErrorKind::InvalidInput => 104,
+ ErrorKind::InvalidData => 105,
+ ErrorKind::TimedOut => 106,
+ ErrorKind::Interrupted => 107,
+ ErrorKind::UnexpectedEof => 108,
+ ErrorKind::OutOfMemory => 109,
+ ErrorKind::ConnectionReset => 110,
+ _ => 100,
+ }
+}
+
+pub fn asyncread_stream<R: AsyncRead + Send + Sync + 'static>(reader: R) -> ByteStream {
+ Box::pin(tokio_util::io::ReaderStream::new(reader)
+ .map_err(io_error_to_u8))
+}
+
+pub fn stream_asyncread(stream: ByteStream) -> impl AsyncRead + Send + Sync + 'static {
+ tokio_util::io::StreamReader::new(stream.map_err(u8_to_io_error))
+}