diff options
author | Alex Auvolat <alex@adnab.me> | 2022-09-01 11:21:24 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-09-01 11:21:24 +0200 |
commit | 7909a95d3c02a738c9a088c1cb8a5d6f70b06046 (patch) | |
tree | c38b20c9f2f6da6971ceb980bbb7a8be4668b661 /src/recv.rs | |
parent | 263db66fcee65deda39de18baa837228ea38baf1 (diff) | |
download | netapp-7909a95d3c02a738c9a088c1cb8a5d6f70b06046.tar.gz netapp-7909a95d3c02a738c9a088c1cb8a5d6f70b06046.zip |
Stream errors are now std::io::Error
Diffstat (limited to 'src/recv.rs')
-rw-r--r-- | src/recv.rs | 38 |
1 files changed, 21 insertions, 17 deletions
diff --git a/src/recv.rs b/src/recv.rs index 3bea709..f8d68da 100644 --- a/src/recv.rs +++ b/src/recv.rs @@ -35,7 +35,7 @@ impl Sender { impl Drop for Sender { fn drop(&mut self) { if let Some(inner) = self.inner.take() { - let _ = inner.send(Err(255)); + let _ = inner.send(Err(std::io::Error::new(std::io::ErrorKind::BrokenPipe, "Netapp connection dropped before end of stream"))); } } } @@ -76,25 +76,26 @@ pub(crate) trait RecvLoop: Sync + 'static { let has_cont = (size & CHUNK_HAS_CONTINUATION) != 0; let is_error = (size & ERROR_MARKER) != 0; + let size = (size & CHUNK_LENGTH_MASK) as usize; + let mut next_slice = vec![0; size as usize]; + read.read_exact(&mut next_slice[..]).await?; + let packet = if is_error { - trace!( - "recv_loop: got id {}, header_size {:04x}, error {}", - id, - size, - size & !ERROR_MARKER - ); - Err((size & !ERROR_MARKER) as u8) + let msg = String::from_utf8(next_slice).unwrap_or("<invalid utf8 error message>".into()); + debug!("recv_loop: got id {}, error: {}", id, msg); + Some(Err(std::io::Error::new(std::io::ErrorKind::Other, msg))) } else { - let size = size & !CHUNK_HAS_CONTINUATION; - let mut next_slice = vec![0; size as usize]; - read.read_exact(&mut next_slice[..]).await?; trace!( - "recv_loop: got id {}, header_size {:04x}, {} bytes", + "recv_loop: got id {}, size {}, has_cont {}", id, size, - next_slice.len() + has_cont ); - Ok(Bytes::from(next_slice)) + if !next_slice.is_empty() { + Some(Ok(Bytes::from(next_slice))) + } else { + None + } }; let mut sender = if let Some(send) = streams.remove(&(id)) { @@ -109,9 +110,12 @@ pub(crate) trait RecvLoop: Sync + 'static { Sender::new(send) }; - // If we get an error, the receiving end is disconnected. - // We still need to reach eos before dropping this sender - let _ = sender.send(packet); + if let Some(packet) = packet { + // If we cannot put packet in channel, it means that the + // receiving end of the channel is disconnected. + // We still need to reach eos before dropping this sender + let _ = sender.send(packet); + } if has_cont { assert!(!is_error); |