aboutsummaryrefslogtreecommitdiff
path: root/src/recv.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-09-01 15:54:11 +0200
committerAlex Auvolat <alex@adnab.me>2022-09-01 15:54:11 +0200
commit522f420e2bf30d5ef6f50dccb88adf86882ac7c6 (patch)
treea4a4085e8bdf9d3699bba96d4350bfe2039290e5 /src/recv.rs
parent32925667385db9e1d9e56ebae67d03d8096f7c46 (diff)
downloadnetapp-522f420e2bf30d5ef6f50dccb88adf86882ac7c6.tar.gz
netapp-522f420e2bf30d5ef6f50dccb88adf86882ac7c6.zip
Implement request cancellation
Diffstat (limited to 'src/recv.rs')
-rw-r--r--src/recv.rs18
1 files changed, 17 insertions, 1 deletions
diff --git a/src/recv.rs b/src/recv.rs
index ac93c4b..8909190 100644
--- a/src/recv.rs
+++ b/src/recv.rs
@@ -53,6 +53,7 @@ impl Drop for Sender {
#[async_trait]
pub(crate) trait RecvLoop: Sync + 'static {
fn recv_handler(self: &Arc<Self>, id: RequestID, stream: ByteStream);
+ fn cancel_handler(self: &Arc<Self>, _id: RequestID) {}
async fn recv_loop<R>(self: Arc<Self>, mut read: R, debug_name: String) -> Result<(), Error>
where
@@ -78,6 +79,18 @@ pub(crate) trait RecvLoop: Sync + 'static {
read.read_exact(&mut header_size[..]).await?;
let size = ChunkLength::from_be_bytes(header_size);
+ if size == CANCEL_REQUEST {
+ if let Some(mut stream) = streams.remove(&id) {
+ let _ = stream.send(Err(std::io::Error::new(
+ std::io::ErrorKind::Other,
+ "netapp: cancel requested",
+ )));
+ stream.end();
+ }
+ self.cancel_handler(id);
+ continue;
+ }
+
let has_cont = (size & CHUNK_HAS_CONTINUATION) != 0;
let is_error = (size & ERROR_MARKER) != 0;
let size = (size & CHUNK_LENGTH_MASK) as usize;
@@ -88,7 +101,10 @@ pub(crate) trait RecvLoop: Sync + 'static {
let kind = u8_to_io_errorkind(next_slice[0]);
let msg =
std::str::from_utf8(&next_slice[1..]).unwrap_or("<invalid utf8 error message>");
- debug!("recv_loop({}): got id {}, error {:?}: {}", debug_name, id, kind, msg);
+ debug!(
+ "recv_loop({}): got id {}, error {:?}: {}",
+ debug_name, id, kind, msg
+ );
Some(Err(std::io::Error::new(kind, msg.to_string())))
} else {
trace!(