diff options
Diffstat (limited to 'src/rpc/membership.rs')
-rw-r--r-- | src/rpc/membership.rs | 57 |
1 files changed, 54 insertions, 3 deletions
diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs index dcda2c40..d19c1eb7 100644 --- a/src/rpc/membership.rs +++ b/src/rpc/membership.rs @@ -21,10 +21,12 @@ use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_util::error::Error; +use crate::consul::get_consul_nodes; use crate::rpc_client::*; use crate::rpc_server::*; const PING_INTERVAL: Duration = Duration::from_secs(10); +const CONSUL_INTERVAL: Duration = Duration::from_secs(60); const PING_TIMEOUT: Duration = Duration::from_secs(2); const MAX_FAILURES_BEFORE_CONSIDERED_DOWN: usize = 5; @@ -420,16 +422,34 @@ impl System { self.rpc_client.call_many(&to[..], msg, timeout).await; } - pub async fn bootstrap(self: Arc<Self>, peers: &[SocketAddr]) { + pub async fn bootstrap( + self: Arc<Self>, + peers: &[SocketAddr], + consul_host: Option<String>, + consul_service_name: Option<String>, + ) { let bootstrap_peers = peers.iter().map(|ip| (*ip, None)).collect::<Vec<_>>(); self.clone().ping_nodes(bootstrap_peers).await; + let self2 = self.clone(); self.clone() .background .spawn_worker(format!("ping loop"), |stop_signal| { - self.ping_loop(stop_signal).map(Ok) + self2.ping_loop(stop_signal).map(Ok) }) .await; + + if let (Some(consul_host), Some(consul_service_name)) = (consul_host, consul_service_name) { + let self2 = self.clone(); + self.clone() + .background + .spawn_worker(format!("Consul loop"), |stop_signal| { + self2 + .consul_loop(stop_signal, consul_host, consul_service_name) + .map(Ok) + }) + .await; + } } async fn ping_nodes(self: Arc<Self>, peers: Vec<(SocketAddr, Option<UUID>)>) { @@ -639,7 +659,7 @@ impl System { Ok(Message::Ok) } - pub async fn ping_loop(self: Arc<Self>, mut stop_signal: watch::Receiver<bool>) { + async fn ping_loop(self: Arc<Self>, mut stop_signal: watch::Receiver<bool>) { loop { let restart_at = tokio::time::delay_for(PING_INTERVAL); @@ -665,6 +685,37 @@ impl System { } } + async fn consul_loop( + self: Arc<Self>, + mut stop_signal: watch::Receiver<bool>, + consul_host: String, + consul_service_name: String, + ) { + loop { + let restart_at = tokio::time::delay_for(CONSUL_INTERVAL); + + match get_consul_nodes(&consul_host, &consul_service_name).await { + Ok(mut node_list) => { + let ping_addrs = node_list.drain(..).map(|a| (a, None)).collect::<Vec<_>>(); + self.clone().ping_nodes(ping_addrs).await; + } + Err(e) => { + warn!("Could not retrieve node list from Consul: {}", e); + } + } + + select! { + _ = restart_at.fuse() => (), + must_exit = stop_signal.recv().fuse() => { + match must_exit { + None | Some(true) => return, + _ => (), + } + } + } + } + } + pub fn pull_status( self: Arc<Self>, peer: UUID, |