aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2020-04-22 17:04:33 +0000
committerAlex Auvolat <alex@adnab.me>2020-04-22 17:04:33 +0000
commit8971f34c816c42a0eb2bbcead5ac7f05854ddfb6 (patch)
tree8f2b272e1d033cab12b17271d036e3d1c0a183ba
parente8214cb1807d3145907c7ed9e077fa45ada4aeea (diff)
downloadgarage-8971f34c816c42a0eb2bbcead5ac7f05854ddfb6.tar.gz
garage-8971f34c816c42a0eb2bbcead5ac7f05854ddfb6.zip
Well they still have to exit when we're exiting though
-rw-r--r--src/background.rs9
-rw-r--r--src/rpc_client.rs1
-rw-r--r--src/rpc_server.rs14
3 files changed, 12 insertions, 12 deletions
diff --git a/src/background.rs b/src/background.rs
index f0dbdcb5..937062dd 100644
--- a/src/background.rs
+++ b/src/background.rs
@@ -2,6 +2,8 @@ use core::future::Future;
use std::pin::Pin;
use futures::future::join_all;
+use futures::select;
+use futures_util::future::*;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::sync::{mpsc, watch, Notify};
@@ -88,7 +90,7 @@ impl BackgroundRunner {
}
async fn runner(self: Arc<Self>, i: usize) {
- let stop_signal = self.stop_signal.clone();
+ let mut stop_signal = self.stop_signal.clone();
loop {
let must_exit: bool = *stop_signal.borrow();
if let Some(job) = self.dequeue_job(must_exit).await {
@@ -100,7 +102,10 @@ impl BackgroundRunner {
info!("Background runner {} exiting", i);
return;
}
- self.job_notify.notified().await;
+ select! {
+ _ = self.job_notify.notified().fuse() => (),
+ _ = stop_signal.recv().fuse() => (),
+ }
}
}
}
diff --git a/src/rpc_client.rs b/src/rpc_client.rs
index a5c44a2e..aeb66956 100644
--- a/src/rpc_client.rs
+++ b/src/rpc_client.rs
@@ -263,4 +263,3 @@ impl RpcHttpClient {
}
}
}
-
diff --git a/src/rpc_server.rs b/src/rpc_server.rs
index 51661a66..bcf7496f 100644
--- a/src/rpc_server.rs
+++ b/src/rpc_server.rs
@@ -47,16 +47,13 @@ where
let begin_time = Instant::now();
let whole_body = hyper::body::to_bytes(req.into_body()).await?;
let msg = rmp_serde::decode::from_read::<_, M>(whole_body.into_buf())?;
- let req_str = debug_serialize(&msg);
match handler(msg, sockaddr).await {
Ok(resp) => {
let resp_bytes = rmp_to_vec_all_named::<Result<M, String>>(&Ok(resp))?;
- trace!(
- "]RPC:{},ok ({} ms), request: {}",
- name,
- (Instant::now() - begin_time).as_millis(),
- req_str,
- );
+ let rpc_duration = (Instant::now() - begin_time).as_millis();
+ if rpc_duration > 100 {
+ debug!("RPC {} ok, took long: {} ms", name, rpc_duration,);
+ }
Ok(Response::new(Body::from(resp_bytes)))
}
Err(e) => {
@@ -65,11 +62,10 @@ where
let mut err_response = Response::new(Body::from(rep_bytes));
*err_response.status_mut() = e.http_status_code();
warn!(
- "RPC error ({}): {} ({} ms), request: {}",
+ "RPC error ({}): {} ({} ms)",
name,
e,
(Instant::now() - begin_time).as_millis(),
- req_str,
);
Ok(err_response)
}