aboutsummaryrefslogtreecommitdiff
path: root/src/rpc/membership.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc/membership.rs')
-rw-r--r--src/rpc/membership.rs49
1 files changed, 19 insertions, 30 deletions
diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs
index 44d7122a..4e9822fa 100644
--- a/src/rpc/membership.rs
+++ b/src/rpc/membership.rs
@@ -11,13 +11,14 @@ use futures::future::join_all;
use futures::select;
use futures_util::future::*;
use serde::{Deserialize, Serialize};
-use tokio::prelude::*;
+use tokio::io::AsyncWriteExt;
use tokio::sync::watch;
use tokio::sync::Mutex;
use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_util::error::Error;
+use garage_util::time::*;
use crate::consul::get_consul_nodes;
use crate::ring::*;
@@ -315,23 +316,17 @@ impl System {
self.clone().ping_nodes(bootstrap_peers).await;
let self2 = self.clone();
- self.clone()
- .background
+ self.background
.spawn_worker(format!("ping loop"), |stop_signal| {
- self2.ping_loop(stop_signal).map(Ok)
- })
- .await;
+ self2.ping_loop(stop_signal)
+ });
if let (Some(consul_host), Some(consul_service_name)) = (consul_host, consul_service_name) {
let self2 = self.clone();
- self.clone()
- .background
+ self.background
.spawn_worker(format!("Consul loop"), |stop_signal| {
- self2
- .consul_loop(stop_signal, consul_host, consul_service_name)
- .map(Ok)
- })
- .await;
+ self2.consul_loop(stop_signal, consul_host, consul_service_name)
+ });
}
}
@@ -399,7 +394,7 @@ impl System {
if has_changes {
status.recalculate_hash();
}
- if let Err(e) = update_locked.0.broadcast(Arc::new(status)) {
+ if let Err(e) = update_locked.0.send(Arc::new(status)) {
error!("In ping_nodes: could not save status update ({})", e);
}
drop(update_locked);
@@ -425,7 +420,7 @@ impl System {
let status_hash = status.hash;
let config_version = self.ring.borrow().config.version;
- update_locked.0.broadcast(Arc::new(status))?;
+ update_locked.0.send(Arc::new(status))?;
drop(update_locked);
if is_new || status_hash != ping.status_hash {
@@ -507,7 +502,7 @@ impl System {
if has_changed {
status.recalculate_hash();
}
- update_lock.0.broadcast(Arc::new(status))?;
+ update_lock.0.send(Arc::new(status))?;
drop(update_lock);
if to_ping.len() > 0 {
@@ -527,7 +522,7 @@ impl System {
if adv.version > ring.config.version {
let ring = Ring::new(adv.clone());
- update_lock.1.broadcast(Arc::new(ring))?;
+ update_lock.1.send(Arc::new(ring))?;
drop(update_lock);
self.background.spawn_cancellable(
@@ -543,7 +538,7 @@ impl System {
async fn ping_loop(self: Arc<Self>, mut stop_signal: watch::Receiver<bool>) {
loop {
- let restart_at = tokio::time::delay_for(PING_INTERVAL);
+ let restart_at = tokio::time::sleep(PING_INTERVAL);
let status = self.status.borrow().clone();
let ping_addrs = status
@@ -557,10 +552,9 @@ impl System {
select! {
_ = restart_at.fuse() => (),
- must_exit = stop_signal.recv().fuse() => {
- match must_exit {
- None | Some(true) => return,
- _ => (),
+ _ = stop_signal.changed().fuse() => {
+ if *stop_signal.borrow() {
+ return;
}
}
}
@@ -573,8 +567,8 @@ impl System {
consul_host: String,
consul_service_name: String,
) {
- loop {
- let restart_at = tokio::time::delay_for(CONSUL_INTERVAL);
+ while !*stop_signal.borrow() {
+ let restart_at = tokio::time::sleep(CONSUL_INTERVAL);
match get_consul_nodes(&consul_host, &consul_service_name).await {
Ok(mut node_list) => {
@@ -588,12 +582,7 @@ impl System {
select! {
_ = restart_at.fuse() => (),
- must_exit = stop_signal.recv().fuse() => {
- match must_exit {
- None | Some(true) => return,
- _ => (),
- }
- }
+ _ = stop_signal.changed().fuse() => (),
}
}
}