aboutsummaryrefslogtreecommitdiff
path: root/src/recv.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/recv.rs')
-rw-r--r--src/recv.rs38
1 files changed, 21 insertions, 17 deletions
diff --git a/src/recv.rs b/src/recv.rs
index 3bea709..f8d68da 100644
--- a/src/recv.rs
+++ b/src/recv.rs
@@ -35,7 +35,7 @@ impl Sender {
impl Drop for Sender {
fn drop(&mut self) {
if let Some(inner) = self.inner.take() {
- let _ = inner.send(Err(255));
+ let _ = inner.send(Err(std::io::Error::new(std::io::ErrorKind::BrokenPipe, "Netapp connection dropped before end of stream")));
}
}
}
@@ -76,25 +76,26 @@ pub(crate) trait RecvLoop: Sync + 'static {
let has_cont = (size & CHUNK_HAS_CONTINUATION) != 0;
let is_error = (size & ERROR_MARKER) != 0;
+ let size = (size & CHUNK_LENGTH_MASK) as usize;
+ let mut next_slice = vec![0; size as usize];
+ read.read_exact(&mut next_slice[..]).await?;
+
let packet = if is_error {
- trace!(
- "recv_loop: got id {}, header_size {:04x}, error {}",
- id,
- size,
- size & !ERROR_MARKER
- );
- Err((size & !ERROR_MARKER) as u8)
+ let msg = String::from_utf8(next_slice).unwrap_or("<invalid utf8 error message>".into());
+ debug!("recv_loop: got id {}, error: {}", id, msg);
+ Some(Err(std::io::Error::new(std::io::ErrorKind::Other, msg)))
} else {
- let size = size & !CHUNK_HAS_CONTINUATION;
- let mut next_slice = vec![0; size as usize];
- read.read_exact(&mut next_slice[..]).await?;
trace!(
- "recv_loop: got id {}, header_size {:04x}, {} bytes",
+ "recv_loop: got id {}, size {}, has_cont {}",
id,
size,
- next_slice.len()
+ has_cont
);
- Ok(Bytes::from(next_slice))
+ if !next_slice.is_empty() {
+ Some(Ok(Bytes::from(next_slice)))
+ } else {
+ None
+ }
};
let mut sender = if let Some(send) = streams.remove(&(id)) {
@@ -109,9 +110,12 @@ pub(crate) trait RecvLoop: Sync + 'static {
Sender::new(send)
};
- // If we get an error, the receiving end is disconnected.
- // We still need to reach eos before dropping this sender
- let _ = sender.send(packet);
+ if let Some(packet) = packet {
+ // If we cannot put packet in channel, it means that the
+ // receiving end of the channel is disconnected.
+ // We still need to reach eos before dropping this sender
+ let _ = sender.send(packet);
+ }
if has_cont {
assert!(!is_error);