aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2021-10-21 00:29:50 +0200
committerAlex Auvolat <alex@adnab.me>2021-10-21 00:29:50 +0200
commit5e986c443f014d96bd0c27113fcf4571f2e1e881 (patch)
tree4dfaf4ae73037a8c5c0264d3f1efe2ff41db700b
parent64553479554ba437d7400cc51fe69b0f83d469b7 (diff)
downloadgarage-5e986c443f014d96bd0c27113fcf4571f2e1e881.tar.gz
garage-5e986c443f014d96bd0c27113fcf4571f2e1e881.zip
Discovery via consul
-rw-r--r--Cargo.lock2
-rw-r--r--src/rpc/consul.rs97
-rw-r--r--src/rpc/system.rs34
-rw-r--r--src/util/config.rs2
4 files changed, 125 insertions, 10 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 0e53984a..5fca1c27 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -874,7 +874,7 @@ dependencies = [
[[package]]
name = "netapp"
version = "0.3.0"
-source = "git+https://git.deuxfleurs.fr/lx/netapp#de981aace0e47a1fa65b38212ac21d91e52f7c15"
+source = "git+https://git.deuxfleurs.fr/lx/netapp#e9add586a5fd6304473b9138b920e325629346f5"
dependencies = [
"arc-swap",
"async-trait",
diff --git a/src/rpc/consul.rs b/src/rpc/consul.rs
index fca4f517..82bf99ba 100644
--- a/src/rpc/consul.rs
+++ b/src/rpc/consul.rs
@@ -4,19 +4,21 @@ use std::net::{IpAddr, SocketAddr};
use hyper::client::Client;
use hyper::StatusCode;
use hyper::{Body, Method, Request};
-use serde::Deserialize;
+use serde::{Deserialize, Serialize};
use netapp::NodeID;
use garage_util::error::Error;
+// ---- READING FROM CONSUL CATALOG ----
+
#[derive(Deserialize, Clone, Debug)]
-struct ConsulEntry {
- #[serde(alias = "Address")]
+struct ConsulQueryEntry {
+ #[serde(rename = "Address")]
address: String,
- #[serde(alias = "ServicePort")]
+ #[serde(rename = "ServicePort")]
service_port: u16,
- #[serde(alias = "NodeMeta")]
+ #[serde(rename = "NodeMeta")]
node_meta: HashMap<String, String>,
}
@@ -41,7 +43,7 @@ pub async fn get_consul_nodes(
}
let body = hyper::body::to_bytes(resp.into_body()).await?;
- let entries = serde_json::from_slice::<Vec<ConsulEntry>>(body.as_ref())?;
+ let entries = serde_json::from_slice::<Vec<ConsulQueryEntry>>(body.as_ref())?;
let mut ret = vec![];
for ent in entries {
@@ -66,3 +68,86 @@ pub async fn get_consul_nodes(
Ok(ret)
}
+
+// ---- PUBLISHING TO CONSUL CATALOG ----
+
+#[derive(Serialize, Clone, Debug)]
+struct ConsulPublishEntry {
+ #[serde(rename = "Node")]
+ node: String,
+ #[serde(rename = "Address")]
+ address: IpAddr,
+ #[serde(rename = "NodeMeta")]
+ node_meta: HashMap<String, String>,
+ #[serde(rename = "Service")]
+ service: ConsulPublishService,
+}
+
+#[derive(Serialize, Clone, Debug)]
+struct ConsulPublishService {
+ #[serde(rename = "ID")]
+ service_id: String,
+ #[serde(rename = "Service")]
+ service_name: String,
+ #[serde(rename = "Tags")]
+ tags: Vec<String>,
+ #[serde(rename = "Address")]
+ address: IpAddr,
+ #[serde(rename = "Port")]
+ port: u16,
+}
+
+pub async fn publish_consul_service(
+ consul_host: &str,
+ consul_service_name: &str,
+ node_id: NodeID,
+ hostname: &str,
+ rpc_public_addr: SocketAddr,
+) -> Result<(), Error> {
+ let node = format!("garage:{}", hex::encode(&node_id[..8]));
+
+ let advertisment = ConsulPublishEntry {
+ node: node.clone(),
+ address: rpc_public_addr.ip(),
+ node_meta: [
+ ("pubkey".to_string(), hex::encode(node_id)),
+ ("hostname".to_string(), hostname.to_string()),
+ ]
+ .iter()
+ .cloned()
+ .collect(),
+ service: ConsulPublishService {
+ service_id: node.clone(),
+ service_name: consul_service_name.to_string(),
+ tags: vec!["advertised-by-garage".into(), hostname.into()],
+ address: rpc_public_addr.ip(),
+ port: rpc_public_addr.port(),
+ },
+ };
+
+ let url = format!("http://{}/v1/catalog/register", consul_host);
+ let req_body = serde_json::to_string(&advertisment)?;
+ debug!("Request body for consul adv: {}", req_body);
+
+ let req = Request::builder()
+ .uri(url)
+ .method(Method::PUT)
+ .body(Body::from(req_body))?;
+
+ let client = Client::new();
+
+ let resp = client.request(req).await?;
+ debug!("Response of advertising to Consul: {:?}", resp);
+ let resp_code = resp.status();
+ debug!(
+ "{}",
+ std::str::from_utf8(&hyper::body::to_bytes(resp.into_body()).await?)
+ .unwrap_or("<invalid utf8>")
+ );
+
+ if resp_code != StatusCode::OK {
+ return Err(Error::Message(format!("HTTP error {}", resp_code)));
+ }
+
+ Ok(())
+}
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 68edabdf..3a81bc3b 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -28,7 +28,7 @@ use garage_util::error::Error;
use garage_util::persister::Persister;
use garage_util::time::*;
-use crate::consul::get_consul_nodes;
+use crate::consul::*;
use crate::ring::*;
use crate::rpc_helper::*;
@@ -80,6 +80,7 @@ pub struct System {
system_endpoint: Arc<Endpoint<SystemRpc, System>>,
rpc_listen_addr: SocketAddr,
+ rpc_public_addr: Option<SocketAddr>,
bootstrap_peers: Vec<(NodeID, SocketAddr)>,
consul_host: Option<String>,
consul_service_name: Option<String>,
@@ -199,6 +200,7 @@ impl System {
system_endpoint,
replication_factor,
rpc_listen_addr: config.rpc_bind_addr,
+ rpc_public_addr: config.rpc_public_addr,
bootstrap_peers: config.bootstrap_peers.clone(),
consul_host: config.consul_host.clone(),
consul_service_name: config.consul_service_name.clone(),
@@ -224,6 +226,32 @@ impl System {
// ---- INTERNALS ----
+ async fn advertise_to_consul(self: Arc<Self>) -> Result<(), Error> {
+ let (consul_host, consul_service_name) =
+ match (&self.consul_host, &self.consul_service_name) {
+ (Some(ch), Some(csn)) => (ch, csn),
+ _ => return Ok(()),
+ };
+
+ let rpc_public_addr = match self.rpc_public_addr {
+ Some(addr) => addr,
+ None => {
+ warn!("Not advertising to Consul because rpc_public_addr is not defined in config file.");
+ return Ok(());
+ }
+ };
+
+ publish_consul_service(
+ consul_host,
+ consul_service_name,
+ self.netapp.id,
+ &self.local_status.load_full().hostname,
+ rpc_public_addr,
+ )
+ .await
+ .map_err(|e| Error::Message(format!("Error while publishing Consul service: {}", e)))
+ }
+
/// Save network configuration to disc
async fn save_network_config(self: Arc<Self>) -> Result<(), Error> {
let ring: Arc<Ring> = self.ring.borrow().clone();
@@ -375,7 +403,7 @@ impl System {
}
}
- async fn discovery_loop(&self, mut stop_signal: watch::Receiver<bool>) {
+ async fn discovery_loop(self: &Arc<Self>, mut stop_signal: watch::Receiver<bool>) {
let consul_config = match (&self.consul_host, &self.consul_service_name) {
(Some(ch), Some(csn)) => Some((ch.clone(), csn.clone())),
_ => None,
@@ -419,6 +447,8 @@ impl System {
}
}
+ self.background.spawn(self.clone().advertise_to_consul());
+
let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL);
select! {
_ = restart_at.fuse() => {},
diff --git a/src/util/config.rs b/src/util/config.rs
index ed17f13c..f1c5c019 100644
--- a/src/util/config.rs
+++ b/src/util/config.rs
@@ -41,7 +41,7 @@ pub struct Config {
/// Bootstrap peers RPC address
#[serde(deserialize_with = "deserialize_vec_addr")]
pub bootstrap_peers: Vec<(NodeID, SocketAddr)>,
- /// Consule host to connect to to discover more peers
+ /// Consul host to connect to to discover more peers
pub consul_host: Option<String>,
/// Consul service name to use
pub consul_service_name: Option<String>,