aboutsummaryrefslogtreecommitdiff
path: root/src/rpc/membership.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc/membership.rs')
-rw-r--r--src/rpc/membership.rs57
1 files changed, 54 insertions, 3 deletions
diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs
index dcda2c40..d19c1eb7 100644
--- a/src/rpc/membership.rs
+++ b/src/rpc/membership.rs
@@ -21,10 +21,12 @@ use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_util::error::Error;
+use crate::consul::get_consul_nodes;
use crate::rpc_client::*;
use crate::rpc_server::*;
const PING_INTERVAL: Duration = Duration::from_secs(10);
+const CONSUL_INTERVAL: Duration = Duration::from_secs(60);
const PING_TIMEOUT: Duration = Duration::from_secs(2);
const MAX_FAILURES_BEFORE_CONSIDERED_DOWN: usize = 5;
@@ -420,16 +422,34 @@ impl System {
self.rpc_client.call_many(&to[..], msg, timeout).await;
}
- pub async fn bootstrap(self: Arc<Self>, peers: &[SocketAddr]) {
+ pub async fn bootstrap(
+ self: Arc<Self>,
+ peers: &[SocketAddr],
+ consul_host: Option<String>,
+ consul_service_name: Option<String>,
+ ) {
let bootstrap_peers = peers.iter().map(|ip| (*ip, None)).collect::<Vec<_>>();
self.clone().ping_nodes(bootstrap_peers).await;
+ let self2 = self.clone();
self.clone()
.background
.spawn_worker(format!("ping loop"), |stop_signal| {
- self.ping_loop(stop_signal).map(Ok)
+ self2.ping_loop(stop_signal).map(Ok)
})
.await;
+
+ if let (Some(consul_host), Some(consul_service_name)) = (consul_host, consul_service_name) {
+ let self2 = self.clone();
+ self.clone()
+ .background
+ .spawn_worker(format!("Consul loop"), |stop_signal| {
+ self2
+ .consul_loop(stop_signal, consul_host, consul_service_name)
+ .map(Ok)
+ })
+ .await;
+ }
}
async fn ping_nodes(self: Arc<Self>, peers: Vec<(SocketAddr, Option<UUID>)>) {
@@ -639,7 +659,7 @@ impl System {
Ok(Message::Ok)
}
- pub async fn ping_loop(self: Arc<Self>, mut stop_signal: watch::Receiver<bool>) {
+ async fn ping_loop(self: Arc<Self>, mut stop_signal: watch::Receiver<bool>) {
loop {
let restart_at = tokio::time::delay_for(PING_INTERVAL);
@@ -665,6 +685,37 @@ impl System {
}
}
+ async fn consul_loop(
+ self: Arc<Self>,
+ mut stop_signal: watch::Receiver<bool>,
+ consul_host: String,
+ consul_service_name: String,
+ ) {
+ loop {
+ let restart_at = tokio::time::delay_for(CONSUL_INTERVAL);
+
+ match get_consul_nodes(&consul_host, &consul_service_name).await {
+ Ok(mut node_list) => {
+ let ping_addrs = node_list.drain(..).map(|a| (a, None)).collect::<Vec<_>>();
+ self.clone().ping_nodes(ping_addrs).await;
+ }
+ Err(e) => {
+ warn!("Could not retrieve node list from Consul: {}", e);
+ }
+ }
+
+ select! {
+ _ = restart_at.fuse() => (),
+ must_exit = stop_signal.recv().fuse() => {
+ match must_exit {
+ None | Some(true) => return,
+ _ => (),
+ }
+ }
+ }
+ }
+ }
+
pub fn pull_status(
self: Arc<Self>,
peer: UUID,