aboutsummaryrefslogtreecommitdiff
path: root/src/rpc/membership.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc/membership.rs')
-rw-r--r--src/rpc/membership.rs30
1 files changed, 14 insertions, 16 deletions
diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs
index 6749478a..6cc3ed2e 100644
--- a/src/rpc/membership.rs
+++ b/src/rpc/membership.rs
@@ -11,9 +11,9 @@ use futures::future::join_all;
use futures::select;
use futures_util::future::*;
use serde::{Deserialize, Serialize};
-use tokio::prelude::*;
use tokio::sync::watch;
use tokio::sync::Mutex;
+use tokio::io::AsyncWriteExt;
use garage_util::background::BackgroundRunner;
use garage_util::data::*;
@@ -395,7 +395,7 @@ impl System {
if has_changes {
status.recalculate_hash();
}
- if let Err(e) = update_locked.0.broadcast(Arc::new(status)) {
+ if let Err(e) = update_locked.0.send(Arc::new(status)) {
error!("In ping_nodes: could not save status update ({})", e);
}
drop(update_locked);
@@ -421,7 +421,7 @@ impl System {
let status_hash = status.hash;
let config_version = self.ring.borrow().config.version;
- update_locked.0.broadcast(Arc::new(status))?;
+ update_locked.0.send(Arc::new(status))?;
drop(update_locked);
if is_new || status_hash != ping.status_hash {
@@ -503,7 +503,7 @@ impl System {
if has_changed {
status.recalculate_hash();
}
- update_lock.0.broadcast(Arc::new(status))?;
+ update_lock.0.send(Arc::new(status))?;
drop(update_lock);
if to_ping.len() > 0 {
@@ -523,7 +523,7 @@ impl System {
if adv.version > ring.config.version {
let ring = Ring::new(adv.clone());
- update_lock.1.broadcast(Arc::new(ring))?;
+ update_lock.1.send(Arc::new(ring))?;
drop(update_lock);
self.background.spawn_cancellable(
@@ -531,7 +531,7 @@ impl System {
.broadcast(Message::AdvertiseConfig(adv.clone()), PING_TIMEOUT)
.map(Ok),
);
- self.background.spawn(self.clone().save_network_config());
+ self.background.spawn(self.clone().save_network_config()).await;
}
Ok(Message::Ok)
@@ -539,7 +539,7 @@ impl System {
async fn ping_loop(self: Arc<Self>, mut stop_signal: watch::Receiver<bool>) {
loop {
- let restart_at = tokio::time::delay_for(PING_INTERVAL);
+ let restart_at = tokio::time::sleep(PING_INTERVAL);
let status = self.status.borrow().clone();
let ping_addrs = status
@@ -553,10 +553,9 @@ impl System {
select! {
_ = restart_at.fuse() => (),
- must_exit = stop_signal.recv().fuse() => {
- match must_exit {
- None | Some(true) => return,
- _ => (),
+ _ = stop_signal.changed().fuse() => {
+ if *stop_signal.borrow() {
+ return;
}
}
}
@@ -570,7 +569,7 @@ impl System {
consul_service_name: String,
) {
loop {
- let restart_at = tokio::time::delay_for(CONSUL_INTERVAL);
+ let restart_at = tokio::time::sleep(CONSUL_INTERVAL);
match get_consul_nodes(&consul_host, &consul_service_name).await {
Ok(mut node_list) => {
@@ -584,10 +583,9 @@ impl System {
select! {
_ = restart_at.fuse() => (),
- must_exit = stop_signal.recv().fuse() => {
- match must_exit {
- None | Some(true) => return,
- _ => (),
+ _ = stop_signal.changed().fuse() => {
+ if *stop_signal.borrow() {
+ return;
}
}
}