aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock1
-rw-r--r--src/garage/server.rs6
-rw-r--r--src/rpc/Cargo.toml1
-rw-r--r--src/rpc/consul.rs52
-rw-r--r--src/rpc/lib.rs1
-rw-r--r--src/rpc/membership.rs57
-rw-r--r--src/util/config.rs2
7 files changed, 116 insertions, 4 deletions
diff --git a/Cargo.lock b/Cargo.lock
index c389056f..1b0aad78 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -424,6 +424,7 @@ dependencies = [
"rmp-serde 0.14.3 (registry+https://github.com/rust-lang/crates.io-index)",
"rustls 0.17.0 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.106 (registry+https://github.com/rust-lang/crates.io-index)",
+ "serde_json 1.0.51 (registry+https://github.com/rust-lang/crates.io-index)",
"sha2 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.2.18 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-rustls 0.13.0 (registry+https://github.com/rust-lang/crates.io-index)",
diff --git a/src/garage/server.rs b/src/garage/server.rs
index 52d03464..2b618c1a 100644
--- a/src/garage/server.rs
+++ b/src/garage/server.rs
@@ -61,7 +61,11 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
garage
.system
.clone()
- .bootstrap(&garage.config.bootstrap_peers[..])
+ .bootstrap(
+ &garage.config.bootstrap_peers[..],
+ garage.config.consul_host.clone(),
+ garage.config.consul_service_name.clone()
+ )
.map(|rv| {
info!("Bootstrap done");
Ok(rv)
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml
index d7a09255..b55e9e90 100644
--- a/src/rpc/Cargo.toml
+++ b/src/rpc/Cargo.toml
@@ -22,6 +22,7 @@ log = "0.4"
rmp-serde = "0.14.3"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
+serde_json = "1.0"
futures = "0.3"
futures-util = "0.3"
diff --git a/src/rpc/consul.rs b/src/rpc/consul.rs
new file mode 100644
index 00000000..63051a6b
--- /dev/null
+++ b/src/rpc/consul.rs
@@ -0,0 +1,52 @@
+use std::net::{IpAddr, SocketAddr};
+
+use hyper::client::Client;
+use hyper::StatusCode;
+use hyper::{Body, Method, Request};
+use serde::Deserialize;
+
+use garage_util::error::Error;
+
+#[derive(Deserialize, Clone)]
+struct ConsulEntry {
+ #[serde(alias = "Address")]
+ address: String,
+ #[serde(alias = "ServicePort")]
+ service_port: u16,
+}
+
+pub async fn get_consul_nodes(
+ consul_host: &str,
+ consul_service_name: &str,
+) -> Result<Vec<SocketAddr>, Error> {
+ let url = format!(
+ "http://{}/v1/catalog/service/{}",
+ consul_host, consul_service_name
+ );
+ let req = Request::builder()
+ .uri(url)
+ .method(Method::GET)
+ .body(Body::default())?;
+
+ let client = Client::new();
+
+ let resp = client.request(req).await?;
+ if resp.status() != StatusCode::OK {
+ return Err(Error::Message(format!("HTTP error {}", resp.status())));
+ }
+
+ let body = hyper::body::to_bytes(resp.into_body()).await?;
+ let entries = serde_json::from_slice::<Vec<ConsulEntry>>(body.as_ref())?;
+
+ let mut ret = vec![];
+ for ent in entries {
+ let ip = ent
+ .address
+ .parse::<IpAddr>()
+ .map_err(|e| Error::Message(format!("Could not parse IP address: {}", e)))?;
+ ret.push(SocketAddr::new(ip, ent.service_port));
+ }
+ debug!("Got nodes from Consul: {:?}", ret);
+
+ Ok(ret)
+}
diff --git a/src/rpc/lib.rs b/src/rpc/lib.rs
index 3fae6c3e..4c5f6e31 100644
--- a/src/rpc/lib.rs
+++ b/src/rpc/lib.rs
@@ -1,6 +1,7 @@
#[macro_use]
extern crate log;
+pub mod consul;
pub mod membership;
pub mod rpc_client;
pub mod rpc_server;
diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs
index dcda2c40..d19c1eb7 100644
--- a/src/rpc/membership.rs
+++ b/src/rpc/membership.rs
@@ -21,10 +21,12 @@ use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_util::error::Error;
+use crate::consul::get_consul_nodes;
use crate::rpc_client::*;
use crate::rpc_server::*;
const PING_INTERVAL: Duration = Duration::from_secs(10);
+const CONSUL_INTERVAL: Duration = Duration::from_secs(60);
const PING_TIMEOUT: Duration = Duration::from_secs(2);
const MAX_FAILURES_BEFORE_CONSIDERED_DOWN: usize = 5;
@@ -420,16 +422,34 @@ impl System {
self.rpc_client.call_many(&to[..], msg, timeout).await;
}
- pub async fn bootstrap(self: Arc<Self>, peers: &[SocketAddr]) {
+ pub async fn bootstrap(
+ self: Arc<Self>,
+ peers: &[SocketAddr],
+ consul_host: Option<String>,
+ consul_service_name: Option<String>,
+ ) {
let bootstrap_peers = peers.iter().map(|ip| (*ip, None)).collect::<Vec<_>>();
self.clone().ping_nodes(bootstrap_peers).await;
+ let self2 = self.clone();
self.clone()
.background
.spawn_worker(format!("ping loop"), |stop_signal| {
- self.ping_loop(stop_signal).map(Ok)
+ self2.ping_loop(stop_signal).map(Ok)
})
.await;
+
+ if let (Some(consul_host), Some(consul_service_name)) = (consul_host, consul_service_name) {
+ let self2 = self.clone();
+ self.clone()
+ .background
+ .spawn_worker(format!("Consul loop"), |stop_signal| {
+ self2
+ .consul_loop(stop_signal, consul_host, consul_service_name)
+ .map(Ok)
+ })
+ .await;
+ }
}
async fn ping_nodes(self: Arc<Self>, peers: Vec<(SocketAddr, Option<UUID>)>) {
@@ -639,7 +659,7 @@ impl System {
Ok(Message::Ok)
}
- pub async fn ping_loop(self: Arc<Self>, mut stop_signal: watch::Receiver<bool>) {
+ async fn ping_loop(self: Arc<Self>, mut stop_signal: watch::Receiver<bool>) {
loop {
let restart_at = tokio::time::delay_for(PING_INTERVAL);
@@ -665,6 +685,37 @@ impl System {
}
}
+ async fn consul_loop(
+ self: Arc<Self>,
+ mut stop_signal: watch::Receiver<bool>,
+ consul_host: String,
+ consul_service_name: String,
+ ) {
+ loop {
+ let restart_at = tokio::time::delay_for(CONSUL_INTERVAL);
+
+ match get_consul_nodes(&consul_host, &consul_service_name).await {
+ Ok(mut node_list) => {
+ let ping_addrs = node_list.drain(..).map(|a| (a, None)).collect::<Vec<_>>();
+ self.clone().ping_nodes(ping_addrs).await;
+ }
+ Err(e) => {
+ warn!("Could not retrieve node list from Consul: {}", e);
+ }
+ }
+
+ select! {
+ _ = restart_at.fuse() => (),
+ must_exit = stop_signal.recv().fuse() => {
+ match must_exit {
+ None | Some(true) => return,
+ _ => (),
+ }
+ }
+ }
+ }
+ }
+
pub fn pull_status(
self: Arc<Self>,
peer: UUID,
diff --git a/src/util/config.rs b/src/util/config.rs
index 5c01712b..b985114d 100644
--- a/src/util/config.rs
+++ b/src/util/config.rs
@@ -14,6 +14,8 @@ pub struct Config {
pub rpc_bind_addr: SocketAddr,
pub bootstrap_peers: Vec<SocketAddr>,
+ pub consul_host: Option<String>,
+ pub consul_service_name: Option<String>,
#[serde(default = "default_max_concurrent_rpc_requests")]
pub max_concurrent_rpc_requests: usize,