diff options
author | Alex Auvolat <alex@adnab.me> | 2021-10-21 00:29:50 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2021-10-21 00:29:50 +0200 |
commit | 5e986c443f014d96bd0c27113fcf4571f2e1e881 (patch) | |
tree | 4dfaf4ae73037a8c5c0264d3f1efe2ff41db700b | |
parent | 64553479554ba437d7400cc51fe69b0f83d469b7 (diff) | |
download | garage-5e986c443f014d96bd0c27113fcf4571f2e1e881.tar.gz garage-5e986c443f014d96bd0c27113fcf4571f2e1e881.zip |
Discovery via consul
-rw-r--r-- | Cargo.lock | 2 | ||||
-rw-r--r-- | src/rpc/consul.rs | 97 | ||||
-rw-r--r-- | src/rpc/system.rs | 34 | ||||
-rw-r--r-- | src/util/config.rs | 2 |
4 files changed, 125 insertions, 10 deletions
@@ -874,7 +874,7 @@ dependencies = [ [[package]] name = "netapp" version = "0.3.0" -source = "git+https://git.deuxfleurs.fr/lx/netapp#de981aace0e47a1fa65b38212ac21d91e52f7c15" +source = "git+https://git.deuxfleurs.fr/lx/netapp#e9add586a5fd6304473b9138b920e325629346f5" dependencies = [ "arc-swap", "async-trait", diff --git a/src/rpc/consul.rs b/src/rpc/consul.rs index fca4f517..82bf99ba 100644 --- a/src/rpc/consul.rs +++ b/src/rpc/consul.rs @@ -4,19 +4,21 @@ use std::net::{IpAddr, SocketAddr}; use hyper::client::Client; use hyper::StatusCode; use hyper::{Body, Method, Request}; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use netapp::NodeID; use garage_util::error::Error; +// ---- READING FROM CONSUL CATALOG ---- + #[derive(Deserialize, Clone, Debug)] -struct ConsulEntry { - #[serde(alias = "Address")] +struct ConsulQueryEntry { + #[serde(rename = "Address")] address: String, - #[serde(alias = "ServicePort")] + #[serde(rename = "ServicePort")] service_port: u16, - #[serde(alias = "NodeMeta")] + #[serde(rename = "NodeMeta")] node_meta: HashMap<String, String>, } @@ -41,7 +43,7 @@ pub async fn get_consul_nodes( } let body = hyper::body::to_bytes(resp.into_body()).await?; - let entries = serde_json::from_slice::<Vec<ConsulEntry>>(body.as_ref())?; + let entries = serde_json::from_slice::<Vec<ConsulQueryEntry>>(body.as_ref())?; let mut ret = vec![]; for ent in entries { @@ -66,3 +68,86 @@ pub async fn get_consul_nodes( Ok(ret) } + +// ---- PUBLISHING TO CONSUL CATALOG ---- + +#[derive(Serialize, Clone, Debug)] +struct ConsulPublishEntry { + #[serde(rename = "Node")] + node: String, + #[serde(rename = "Address")] + address: IpAddr, + #[serde(rename = "NodeMeta")] + node_meta: HashMap<String, String>, + #[serde(rename = "Service")] + service: ConsulPublishService, +} + +#[derive(Serialize, Clone, Debug)] +struct ConsulPublishService { + #[serde(rename = "ID")] + service_id: String, + #[serde(rename = "Service")] + service_name: String, + #[serde(rename = "Tags")] + tags: Vec<String>, + #[serde(rename = "Address")] + address: IpAddr, + #[serde(rename = "Port")] + port: u16, +} + +pub async fn publish_consul_service( + consul_host: &str, + consul_service_name: &str, + node_id: NodeID, + hostname: &str, + rpc_public_addr: SocketAddr, +) -> Result<(), Error> { + let node = format!("garage:{}", hex::encode(&node_id[..8])); + + let advertisment = ConsulPublishEntry { + node: node.clone(), + address: rpc_public_addr.ip(), + node_meta: [ + ("pubkey".to_string(), hex::encode(node_id)), + ("hostname".to_string(), hostname.to_string()), + ] + .iter() + .cloned() + .collect(), + service: ConsulPublishService { + service_id: node.clone(), + service_name: consul_service_name.to_string(), + tags: vec!["advertised-by-garage".into(), hostname.into()], + address: rpc_public_addr.ip(), + port: rpc_public_addr.port(), + }, + }; + + let url = format!("http://{}/v1/catalog/register", consul_host); + let req_body = serde_json::to_string(&advertisment)?; + debug!("Request body for consul adv: {}", req_body); + + let req = Request::builder() + .uri(url) + .method(Method::PUT) + .body(Body::from(req_body))?; + + let client = Client::new(); + + let resp = client.request(req).await?; + debug!("Response of advertising to Consul: {:?}", resp); + let resp_code = resp.status(); + debug!( + "{}", + std::str::from_utf8(&hyper::body::to_bytes(resp.into_body()).await?) + .unwrap_or("<invalid utf8>") + ); + + if resp_code != StatusCode::OK { + return Err(Error::Message(format!("HTTP error {}", resp_code))); + } + + Ok(()) +} diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 68edabdf..3a81bc3b 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -28,7 +28,7 @@ use garage_util::error::Error; use garage_util::persister::Persister; use garage_util::time::*; -use crate::consul::get_consul_nodes; +use crate::consul::*; use crate::ring::*; use crate::rpc_helper::*; @@ -80,6 +80,7 @@ pub struct System { system_endpoint: Arc<Endpoint<SystemRpc, System>>, rpc_listen_addr: SocketAddr, + rpc_public_addr: Option<SocketAddr>, bootstrap_peers: Vec<(NodeID, SocketAddr)>, consul_host: Option<String>, consul_service_name: Option<String>, @@ -199,6 +200,7 @@ impl System { system_endpoint, replication_factor, rpc_listen_addr: config.rpc_bind_addr, + rpc_public_addr: config.rpc_public_addr, bootstrap_peers: config.bootstrap_peers.clone(), consul_host: config.consul_host.clone(), consul_service_name: config.consul_service_name.clone(), @@ -224,6 +226,32 @@ impl System { // ---- INTERNALS ---- + async fn advertise_to_consul(self: Arc<Self>) -> Result<(), Error> { + let (consul_host, consul_service_name) = + match (&self.consul_host, &self.consul_service_name) { + (Some(ch), Some(csn)) => (ch, csn), + _ => return Ok(()), + }; + + let rpc_public_addr = match self.rpc_public_addr { + Some(addr) => addr, + None => { + warn!("Not advertising to Consul because rpc_public_addr is not defined in config file."); + return Ok(()); + } + }; + + publish_consul_service( + consul_host, + consul_service_name, + self.netapp.id, + &self.local_status.load_full().hostname, + rpc_public_addr, + ) + .await + .map_err(|e| Error::Message(format!("Error while publishing Consul service: {}", e))) + } + /// Save network configuration to disc async fn save_network_config(self: Arc<Self>) -> Result<(), Error> { let ring: Arc<Ring> = self.ring.borrow().clone(); @@ -375,7 +403,7 @@ impl System { } } - async fn discovery_loop(&self, mut stop_signal: watch::Receiver<bool>) { + async fn discovery_loop(self: &Arc<Self>, mut stop_signal: watch::Receiver<bool>) { let consul_config = match (&self.consul_host, &self.consul_service_name) { (Some(ch), Some(csn)) => Some((ch.clone(), csn.clone())), _ => None, @@ -419,6 +447,8 @@ impl System { } } + self.background.spawn(self.clone().advertise_to_consul()); + let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL); select! { _ = restart_at.fuse() => {}, diff --git a/src/util/config.rs b/src/util/config.rs index ed17f13c..f1c5c019 100644 --- a/src/util/config.rs +++ b/src/util/config.rs @@ -41,7 +41,7 @@ pub struct Config { /// Bootstrap peers RPC address #[serde(deserialize_with = "deserialize_vec_addr")] pub bootstrap_peers: Vec<(NodeID, SocketAddr)>, - /// Consule host to connect to to discover more peers + /// Consul host to connect to to discover more peers pub consul_host: Option<String>, /// Consul service name to use pub consul_service_name: Option<String>, |