aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-09-01 11:34:53 +0200
committerAlex Auvolat <alex@adnab.me>2022-09-01 11:34:53 +0200
commit745c78618479c4177647e4d7fed97d5fd2d00d4f (patch)
tree0f1d0dfa4bda8c4ae286ec12d726e03a55d7c2ec
parent7909a95d3c02a738c9a088c1cb8a5d6f70b06046 (diff)
downloadnetapp-745c78618479c4177647e4d7fed97d5fd2d00d4f.tar.gz
netapp-745c78618479c4177647e4d7fed97d5fd2d00d4f.zip
Also encode errorkind in stream
-rw-r--r--src/error.rs36
-rw-r--r--src/recv.rs7
-rw-r--r--src/send.rs23
-rw-r--r--src/stream.rs36
4 files changed, 56 insertions, 46 deletions
diff --git a/src/error.rs b/src/error.rs
index f374341..2fa4594 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -109,3 +109,39 @@ where
}
}
}
+
+// ---- Helpers for serializing I/O Errors
+
+pub(crate) fn u8_to_io_errorkind(v: u8) -> std::io::ErrorKind {
+ use std::io::ErrorKind;
+ match v {
+ 101 => ErrorKind::ConnectionAborted,
+ 102 => ErrorKind::BrokenPipe,
+ 103 => ErrorKind::WouldBlock,
+ 104 => ErrorKind::InvalidInput,
+ 105 => ErrorKind::InvalidData,
+ 106 => ErrorKind::TimedOut,
+ 107 => ErrorKind::Interrupted,
+ 108 => ErrorKind::UnexpectedEof,
+ 109 => ErrorKind::OutOfMemory,
+ 110 => ErrorKind::ConnectionReset,
+ _ => ErrorKind::Other,
+ }
+}
+
+pub(crate) fn io_errorkind_to_u8(kind: std::io::ErrorKind) -> u8 {
+ use std::io::ErrorKind;
+ match kind {
+ ErrorKind::ConnectionAborted => 101,
+ ErrorKind::BrokenPipe => 102,
+ ErrorKind::WouldBlock => 103,
+ ErrorKind::InvalidInput => 104,
+ ErrorKind::InvalidData => 105,
+ ErrorKind::TimedOut => 106,
+ ErrorKind::Interrupted => 107,
+ ErrorKind::UnexpectedEof => 108,
+ ErrorKind::OutOfMemory => 109,
+ ErrorKind::ConnectionReset => 110,
+ _ => 100,
+ }
+}
diff --git a/src/recv.rs b/src/recv.rs
index f8d68da..f8606f3 100644
--- a/src/recv.rs
+++ b/src/recv.rs
@@ -81,9 +81,10 @@ pub(crate) trait RecvLoop: Sync + 'static {
read.read_exact(&mut next_slice[..]).await?;
let packet = if is_error {
- let msg = String::from_utf8(next_slice).unwrap_or("<invalid utf8 error message>".into());
- debug!("recv_loop: got id {}, error: {}", id, msg);
- Some(Err(std::io::Error::new(std::io::ErrorKind::Other, msg)))
+ let kind = u8_to_io_errorkind(next_slice[0]);
+ let msg = std::str::from_utf8(&next_slice[1..]).unwrap_or("<invalid utf8 error message>");
+ debug!("recv_loop: got id {}, error {:?}: {}", id, kind, msg);
+ Some(Err(std::io::Error::new(kind, msg.to_string())))
} else {
trace!(
"recv_loop: got id {}, size {}, has_cont {}",
diff --git a/src/send.rs b/src/send.rs
index f362962..287fe40 100644
--- a/src/send.rs
+++ b/src/send.rs
@@ -4,7 +4,7 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use async_trait::async_trait;
-use bytes::Bytes;
+use bytes::{Bytes, BytesMut, BufMut};
use log::*;
use futures::AsyncWriteExt;
@@ -22,7 +22,11 @@ use crate::stream::*;
// CHUNK_HAS_CONTINUATION when this is not the last chunk of the stream
// ERROR_MARKER if this chunk denotes an error
// (these two flags are exclusive, an error denotes the end of the stream)
-// - [u8; chunk_length] chunk data / error message
+// - [u8; chunk_length], either
+// - if not error: chunk data
+// - if error:
+// - u8: error kind, encoded using error::io_errorkind_to_u8
+// - rest: error message
pub(crate) type RequestID = u32;
pub(crate) type ChunkLength = u16;
@@ -136,12 +140,17 @@ impl DataFrame {
Self::Data(bytes, has_cont)
}
Err(e) => {
- let msg = format!("{}", e);
- let mut msg = Bytes::from(msg.into_bytes());
- if msg.len() > MAX_CHUNK_LENGTH as usize {
- msg = msg.slice(..MAX_CHUNK_LENGTH as usize);
+ let mut buf = BytesMut::new();
+ buf.put_u8(io_errorkind_to_u8(e.kind()));
+
+ let msg = format!("{}", e).into_bytes();
+ if msg.len() > (MAX_CHUNK_LENGTH - 1) as usize {
+ buf.put(&msg[..(MAX_CHUNK_LENGTH - 1) as usize]);
+ } else {
+ buf.put(&msg[..]);
}
- Self::Error(msg)
+
+ Self::Error(buf.freeze())
}
}
}
diff --git a/src/stream.rs b/src/stream.rs
index 3518246..6e00e5f 100644
--- a/src/stream.rs
+++ b/src/stream.rs
@@ -150,42 +150,6 @@ impl<'a> Future for ByteStreamReadExact<'a> {
// ----
-/*
-fn u8_to_io_error(v: u8) -> std::io::Error {
- use std::io::{Error, ErrorKind};
- let kind = match v {
- 101 => ErrorKind::ConnectionAborted,
- 102 => ErrorKind::BrokenPipe,
- 103 => ErrorKind::WouldBlock,
- 104 => ErrorKind::InvalidInput,
- 105 => ErrorKind::InvalidData,
- 106 => ErrorKind::TimedOut,
- 107 => ErrorKind::Interrupted,
- 108 => ErrorKind::UnexpectedEof,
- 109 => ErrorKind::OutOfMemory,
- 110 => ErrorKind::ConnectionReset,
- _ => ErrorKind::Other,
- };
- Error::new(kind, "(in netapp stream)")
-}
-
-fn io_error_to_u8(e: std::io::Error) -> u8 {
- use std::io::ErrorKind;
- match e.kind() {
- ErrorKind::ConnectionAborted => 101,
- ErrorKind::BrokenPipe => 102,
- ErrorKind::WouldBlock => 103,
- ErrorKind::InvalidInput => 104,
- ErrorKind::InvalidData => 105,
- ErrorKind::TimedOut => 106,
- ErrorKind::Interrupted => 107,
- ErrorKind::UnexpectedEof => 108,
- ErrorKind::OutOfMemory => 109,
- ErrorKind::ConnectionReset => 110,
- _ => 100,
- }
-}
-*/
pub fn asyncread_stream<R: AsyncRead + Send + Sync + 'static>(reader: R) -> ByteStream {
Box::pin(tokio_util::io::ReaderStream::new(reader))