diff options
author | Alex Auvolat <alex@adnab.me> | 2022-09-01 15:54:11 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-09-01 15:54:11 +0200 |
commit | 522f420e2bf30d5ef6f50dccb88adf86882ac7c6 (patch) | |
tree | a4a4085e8bdf9d3699bba96d4350bfe2039290e5 /src/server.rs | |
parent | 32925667385db9e1d9e56ebae67d03d8096f7c46 (diff) | |
download | netapp-522f420e2bf30d5ef6f50dccb88adf86882ac7c6.tar.gz netapp-522f420e2bf30d5ef6f50dccb88adf86882ac7c6.zip |
Implement request cancellation
Diffstat (limited to 'src/server.rs')
-rw-r--r-- | src/server.rs | 37 |
1 files changed, 31 insertions, 6 deletions
diff --git a/src/server.rs b/src/server.rs index 2c12d9d..f9eb121 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,5 +1,6 @@ +use std::collections::HashMap; use std::net::SocketAddr; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use arc_swap::ArcSwapOption; use async_trait::async_trait; @@ -53,7 +54,8 @@ pub(crate) struct ServerConn { netapp: Arc<NetApp>, - resp_send: ArcSwapOption<mpsc::UnboundedSender<SendStream>>, + resp_send: ArcSwapOption<mpsc::UnboundedSender<SendItem>>, + running_handlers: Mutex<HashMap<RequestID, tokio::task::JoinHandle<()>>>, } impl ServerConn { @@ -99,6 +101,7 @@ impl ServerConn { remote_addr, peer_id, resp_send: ArcSwapOption::new(Some(Arc::new(resp_send))), + running_handlers: Mutex::new(HashMap::new()), }); netapp.connected_as_server(peer_id, conn.clone()); @@ -174,10 +177,15 @@ impl SendLoop for ServerConn {} #[async_trait] impl RecvLoop for ServerConn { fn recv_handler(self: &Arc<Self>, id: RequestID, stream: ByteStream) { - let resp_send = self.resp_send.load_full().unwrap(); + let resp_send = match self.resp_send.load_full() { + Some(c) => c, + None => return, + }; + + let mut rh = self.running_handlers.lock().unwrap(); let self2 = self.clone(); - tokio::spawn(async move { + let jh = tokio::spawn(async move { debug!("server: recv_handler got {}", id); let (prio, resp_enc_result) = match ReqEnc::decode(stream).await { @@ -189,9 +197,26 @@ impl RecvLoop for ServerConn { let (resp_stream, resp_order) = RespEnc::encode(resp_enc_result); resp_send - .send((id, prio, resp_order, resp_stream)) + .send(SendItem::Stream(id, prio, resp_order, resp_stream)) .log_err("ServerConn recv_handler send resp bytes"); - Ok::<_, Error>(()) + + self2.running_handlers.lock().unwrap().remove(&id); }); + + rh.insert(id, jh); + } + + fn cancel_handler(self: &Arc<Self>, id: RequestID) { + trace!("received cancel for request {}", id); + + // If the handler is still running, abort it now + if let Some(jh) = self.running_handlers.lock().unwrap().remove(&id) { + jh.abort(); + } + + // Inform the response sender that we don't need to send the response + if let Some(resp_send) = self.resp_send.load_full() { + let _ = resp_send.send(SendItem::Cancel(id)); + } } } |