diff options
Diffstat (limited to 'src/rpc/membership.rs')
-rw-r--r-- | src/rpc/membership.rs | 49 |
1 files changed, 19 insertions, 30 deletions
diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs index 44d7122a..4e9822fa 100644 --- a/src/rpc/membership.rs +++ b/src/rpc/membership.rs @@ -11,13 +11,14 @@ use futures::future::join_all; use futures::select; use futures_util::future::*; use serde::{Deserialize, Serialize}; -use tokio::prelude::*; +use tokio::io::AsyncWriteExt; use tokio::sync::watch; use tokio::sync::Mutex; use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_util::error::Error; +use garage_util::time::*; use crate::consul::get_consul_nodes; use crate::ring::*; @@ -315,23 +316,17 @@ impl System { self.clone().ping_nodes(bootstrap_peers).await; let self2 = self.clone(); - self.clone() - .background + self.background .spawn_worker(format!("ping loop"), |stop_signal| { - self2.ping_loop(stop_signal).map(Ok) - }) - .await; + self2.ping_loop(stop_signal) + }); if let (Some(consul_host), Some(consul_service_name)) = (consul_host, consul_service_name) { let self2 = self.clone(); - self.clone() - .background + self.background .spawn_worker(format!("Consul loop"), |stop_signal| { - self2 - .consul_loop(stop_signal, consul_host, consul_service_name) - .map(Ok) - }) - .await; + self2.consul_loop(stop_signal, consul_host, consul_service_name) + }); } } @@ -399,7 +394,7 @@ impl System { if has_changes { status.recalculate_hash(); } - if let Err(e) = update_locked.0.broadcast(Arc::new(status)) { + if let Err(e) = update_locked.0.send(Arc::new(status)) { error!("In ping_nodes: could not save status update ({})", e); } drop(update_locked); @@ -425,7 +420,7 @@ impl System { let status_hash = status.hash; let config_version = self.ring.borrow().config.version; - update_locked.0.broadcast(Arc::new(status))?; + update_locked.0.send(Arc::new(status))?; drop(update_locked); if is_new || status_hash != ping.status_hash { @@ -507,7 +502,7 @@ impl System { if has_changed { status.recalculate_hash(); } - update_lock.0.broadcast(Arc::new(status))?; + update_lock.0.send(Arc::new(status))?; drop(update_lock); if to_ping.len() > 0 { @@ -527,7 +522,7 @@ impl System { if adv.version > ring.config.version { let ring = Ring::new(adv.clone()); - update_lock.1.broadcast(Arc::new(ring))?; + update_lock.1.send(Arc::new(ring))?; drop(update_lock); self.background.spawn_cancellable( @@ -543,7 +538,7 @@ impl System { async fn ping_loop(self: Arc<Self>, mut stop_signal: watch::Receiver<bool>) { loop { - let restart_at = tokio::time::delay_for(PING_INTERVAL); + let restart_at = tokio::time::sleep(PING_INTERVAL); let status = self.status.borrow().clone(); let ping_addrs = status @@ -557,10 +552,9 @@ impl System { select! { _ = restart_at.fuse() => (), - must_exit = stop_signal.recv().fuse() => { - match must_exit { - None | Some(true) => return, - _ => (), + _ = stop_signal.changed().fuse() => { + if *stop_signal.borrow() { + return; } } } @@ -573,8 +567,8 @@ impl System { consul_host: String, consul_service_name: String, ) { - loop { - let restart_at = tokio::time::delay_for(CONSUL_INTERVAL); + while !*stop_signal.borrow() { + let restart_at = tokio::time::sleep(CONSUL_INTERVAL); match get_consul_nodes(&consul_host, &consul_service_name).await { Ok(mut node_list) => { @@ -588,12 +582,7 @@ impl System { select! { _ = restart_at.fuse() => (), - must_exit = stop_signal.recv().fuse() => { - match must_exit { - None | Some(true) => return, - _ => (), - } - } + _ = stop_signal.changed().fuse() => (), } } } |