aboutsummaryrefslogtreecommitdiff
path: root/src/rpc
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2020-06-30 18:33:14 +0200
committerAlex Auvolat <alex@adnab.me>2020-06-30 18:33:14 +0200
commitfbe8fe81f278aca68de1b686948f94a8c084dcde (patch)
treed39b64d0bf36033b21307826e7ce6b0ab0848ecf /src/rpc
parentade29cf63a7c4426f0fa52f12c0ab8e0f0103cb1 (diff)
downloadgarage-fbe8fe81f278aca68de1b686948f94a8c084dcde.tar.gz
garage-fbe8fe81f278aca68de1b686948f94a8c084dcde.zip
Add automatic peer discovery from Consul
Diffstat (limited to 'src/rpc')
-rw-r--r--src/rpc/Cargo.toml1
-rw-r--r--src/rpc/consul.rs52
-rw-r--r--src/rpc/lib.rs1
-rw-r--r--src/rpc/membership.rs57
4 files changed, 108 insertions, 3 deletions
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml
index d7a09255..b55e9e90 100644
--- a/src/rpc/Cargo.toml
+++ b/src/rpc/Cargo.toml
@@ -22,6 +22,7 @@ log = "0.4"
rmp-serde = "0.14.3"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
+serde_json = "1.0"
futures = "0.3"
futures-util = "0.3"
diff --git a/src/rpc/consul.rs b/src/rpc/consul.rs
new file mode 100644
index 00000000..63051a6b
--- /dev/null
+++ b/src/rpc/consul.rs
@@ -0,0 +1,52 @@
+use std::net::{IpAddr, SocketAddr};
+
+use hyper::client::Client;
+use hyper::StatusCode;
+use hyper::{Body, Method, Request};
+use serde::Deserialize;
+
+use garage_util::error::Error;
+
+#[derive(Deserialize, Clone)]
+struct ConsulEntry {
+ #[serde(alias = "Address")]
+ address: String,
+ #[serde(alias = "ServicePort")]
+ service_port: u16,
+}
+
+pub async fn get_consul_nodes(
+ consul_host: &str,
+ consul_service_name: &str,
+) -> Result<Vec<SocketAddr>, Error> {
+ let url = format!(
+ "http://{}/v1/catalog/service/{}",
+ consul_host, consul_service_name
+ );
+ let req = Request::builder()
+ .uri(url)
+ .method(Method::GET)
+ .body(Body::default())?;
+
+ let client = Client::new();
+
+ let resp = client.request(req).await?;
+ if resp.status() != StatusCode::OK {
+ return Err(Error::Message(format!("HTTP error {}", resp.status())));
+ }
+
+ let body = hyper::body::to_bytes(resp.into_body()).await?;
+ let entries = serde_json::from_slice::<Vec<ConsulEntry>>(body.as_ref())?;
+
+ let mut ret = vec![];
+ for ent in entries {
+ let ip = ent
+ .address
+ .parse::<IpAddr>()
+ .map_err(|e| Error::Message(format!("Could not parse IP address: {}", e)))?;
+ ret.push(SocketAddr::new(ip, ent.service_port));
+ }
+ debug!("Got nodes from Consul: {:?}", ret);
+
+ Ok(ret)
+}
diff --git a/src/rpc/lib.rs b/src/rpc/lib.rs
index 3fae6c3e..4c5f6e31 100644
--- a/src/rpc/lib.rs
+++ b/src/rpc/lib.rs
@@ -1,6 +1,7 @@
#[macro_use]
extern crate log;
+pub mod consul;
pub mod membership;
pub mod rpc_client;
pub mod rpc_server;
diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs
index dcda2c40..d19c1eb7 100644
--- a/src/rpc/membership.rs
+++ b/src/rpc/membership.rs
@@ -21,10 +21,12 @@ use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_util::error::Error;
+use crate::consul::get_consul_nodes;
use crate::rpc_client::*;
use crate::rpc_server::*;
const PING_INTERVAL: Duration = Duration::from_secs(10);
+const CONSUL_INTERVAL: Duration = Duration::from_secs(60);
const PING_TIMEOUT: Duration = Duration::from_secs(2);
const MAX_FAILURES_BEFORE_CONSIDERED_DOWN: usize = 5;
@@ -420,16 +422,34 @@ impl System {
self.rpc_client.call_many(&to[..], msg, timeout).await;
}
- pub async fn bootstrap(self: Arc<Self>, peers: &[SocketAddr]) {
+ pub async fn bootstrap(
+ self: Arc<Self>,
+ peers: &[SocketAddr],
+ consul_host: Option<String>,
+ consul_service_name: Option<String>,
+ ) {
let bootstrap_peers = peers.iter().map(|ip| (*ip, None)).collect::<Vec<_>>();
self.clone().ping_nodes(bootstrap_peers).await;
+ let self2 = self.clone();
self.clone()
.background
.spawn_worker(format!("ping loop"), |stop_signal| {
- self.ping_loop(stop_signal).map(Ok)
+ self2.ping_loop(stop_signal).map(Ok)
})
.await;
+
+ if let (Some(consul_host), Some(consul_service_name)) = (consul_host, consul_service_name) {
+ let self2 = self.clone();
+ self.clone()
+ .background
+ .spawn_worker(format!("Consul loop"), |stop_signal| {
+ self2
+ .consul_loop(stop_signal, consul_host, consul_service_name)
+ .map(Ok)
+ })
+ .await;
+ }
}
async fn ping_nodes(self: Arc<Self>, peers: Vec<(SocketAddr, Option<UUID>)>) {
@@ -639,7 +659,7 @@ impl System {
Ok(Message::Ok)
}
- pub async fn ping_loop(self: Arc<Self>, mut stop_signal: watch::Receiver<bool>) {
+ async fn ping_loop(self: Arc<Self>, mut stop_signal: watch::Receiver<bool>) {
loop {
let restart_at = tokio::time::delay_for(PING_INTERVAL);
@@ -665,6 +685,37 @@ impl System {
}
}
+ async fn consul_loop(
+ self: Arc<Self>,
+ mut stop_signal: watch::Receiver<bool>,
+ consul_host: String,
+ consul_service_name: String,
+ ) {
+ loop {
+ let restart_at = tokio::time::delay_for(CONSUL_INTERVAL);
+
+ match get_consul_nodes(&consul_host, &consul_service_name).await {
+ Ok(mut node_list) => {
+ let ping_addrs = node_list.drain(..).map(|a| (a, None)).collect::<Vec<_>>();
+ self.clone().ping_nodes(ping_addrs).await;
+ }
+ Err(e) => {
+ warn!("Could not retrieve node list from Consul: {}", e);
+ }
+ }
+
+ select! {
+ _ = restart_at.fuse() => (),
+ must_exit = stop_signal.recv().fuse() => {
+ match must_exit {
+ None | Some(true) => return,
+ _ => (),
+ }
+ }
+ }
+ }
+ }
+
pub fn pull_status(
self: Arc<Self>,
peer: UUID,