diff options
author | Alex Auvolat <alex@adnab.me> | 2022-09-01 15:54:11 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-09-01 15:54:11 +0200 |
commit | 522f420e2bf30d5ef6f50dccb88adf86882ac7c6 (patch) | |
tree | a4a4085e8bdf9d3699bba96d4350bfe2039290e5 /src/recv.rs | |
parent | 32925667385db9e1d9e56ebae67d03d8096f7c46 (diff) | |
download | netapp-522f420e2bf30d5ef6f50dccb88adf86882ac7c6.tar.gz netapp-522f420e2bf30d5ef6f50dccb88adf86882ac7c6.zip |
Implement request cancellation
Diffstat (limited to 'src/recv.rs')
-rw-r--r-- | src/recv.rs | 18 |
1 files changed, 17 insertions, 1 deletions
diff --git a/src/recv.rs b/src/recv.rs index ac93c4b..8909190 100644 --- a/src/recv.rs +++ b/src/recv.rs @@ -53,6 +53,7 @@ impl Drop for Sender { #[async_trait] pub(crate) trait RecvLoop: Sync + 'static { fn recv_handler(self: &Arc<Self>, id: RequestID, stream: ByteStream); + fn cancel_handler(self: &Arc<Self>, _id: RequestID) {} async fn recv_loop<R>(self: Arc<Self>, mut read: R, debug_name: String) -> Result<(), Error> where @@ -78,6 +79,18 @@ pub(crate) trait RecvLoop: Sync + 'static { read.read_exact(&mut header_size[..]).await?; let size = ChunkLength::from_be_bytes(header_size); + if size == CANCEL_REQUEST { + if let Some(mut stream) = streams.remove(&id) { + let _ = stream.send(Err(std::io::Error::new( + std::io::ErrorKind::Other, + "netapp: cancel requested", + ))); + stream.end(); + } + self.cancel_handler(id); + continue; + } + let has_cont = (size & CHUNK_HAS_CONTINUATION) != 0; let is_error = (size & ERROR_MARKER) != 0; let size = (size & CHUNK_LENGTH_MASK) as usize; @@ -88,7 +101,10 @@ pub(crate) trait RecvLoop: Sync + 'static { let kind = u8_to_io_errorkind(next_slice[0]); let msg = std::str::from_utf8(&next_slice[1..]).unwrap_or("<invalid utf8 error message>"); - debug!("recv_loop({}): got id {}, error {:?}: {}", debug_name, id, kind, msg); + debug!( + "recv_loop({}): got id {}, error {:?}: {}", + debug_name, id, kind, msg + ); Some(Err(std::io::Error::new(kind, msg.to_string()))) } else { trace!( |