aboutsummaryrefslogtreecommitdiff
path: root/src/server.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/server.rs')
-rw-r--r--src/server.rs37
1 files changed, 31 insertions, 6 deletions
diff --git a/src/server.rs b/src/server.rs
index 2c12d9d..f9eb121 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -1,5 +1,6 @@
+use std::collections::HashMap;
use std::net::SocketAddr;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
use arc_swap::ArcSwapOption;
use async_trait::async_trait;
@@ -53,7 +54,8 @@ pub(crate) struct ServerConn {
netapp: Arc<NetApp>,
- resp_send: ArcSwapOption<mpsc::UnboundedSender<SendStream>>,
+ resp_send: ArcSwapOption<mpsc::UnboundedSender<SendItem>>,
+ running_handlers: Mutex<HashMap<RequestID, tokio::task::JoinHandle<()>>>,
}
impl ServerConn {
@@ -99,6 +101,7 @@ impl ServerConn {
remote_addr,
peer_id,
resp_send: ArcSwapOption::new(Some(Arc::new(resp_send))),
+ running_handlers: Mutex::new(HashMap::new()),
});
netapp.connected_as_server(peer_id, conn.clone());
@@ -174,10 +177,15 @@ impl SendLoop for ServerConn {}
#[async_trait]
impl RecvLoop for ServerConn {
fn recv_handler(self: &Arc<Self>, id: RequestID, stream: ByteStream) {
- let resp_send = self.resp_send.load_full().unwrap();
+ let resp_send = match self.resp_send.load_full() {
+ Some(c) => c,
+ None => return,
+ };
+
+ let mut rh = self.running_handlers.lock().unwrap();
let self2 = self.clone();
- tokio::spawn(async move {
+ let jh = tokio::spawn(async move {
debug!("server: recv_handler got {}", id);
let (prio, resp_enc_result) = match ReqEnc::decode(stream).await {
@@ -189,9 +197,26 @@ impl RecvLoop for ServerConn {
let (resp_stream, resp_order) = RespEnc::encode(resp_enc_result);
resp_send
- .send((id, prio, resp_order, resp_stream))
+ .send(SendItem::Stream(id, prio, resp_order, resp_stream))
.log_err("ServerConn recv_handler send resp bytes");
- Ok::<_, Error>(())
+
+ self2.running_handlers.lock().unwrap().remove(&id);
});
+
+ rh.insert(id, jh);
+ }
+
+ fn cancel_handler(self: &Arc<Self>, id: RequestID) {
+ trace!("received cancel for request {}", id);
+
+ // If the handler is still running, abort it now
+ if let Some(jh) = self.running_handlers.lock().unwrap().remove(&id) {
+ jh.abort();
+ }
+
+ // Inform the response sender that we don't need to send the response
+ if let Some(resp_send) = self.resp_send.load_full() {
+ let _ = resp_send.send(SendItem::Cancel(id));
+ }
}
}