diff options
author | Alex Auvolat <alex@adnab.me> | 2021-10-21 00:29:50 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2021-10-21 00:29:50 +0200 |
commit | 5e986c443f014d96bd0c27113fcf4571f2e1e881 (patch) | |
tree | 4dfaf4ae73037a8c5c0264d3f1efe2ff41db700b /src/rpc/consul.rs | |
parent | 64553479554ba437d7400cc51fe69b0f83d469b7 (diff) | |
download | garage-5e986c443f014d96bd0c27113fcf4571f2e1e881.tar.gz garage-5e986c443f014d96bd0c27113fcf4571f2e1e881.zip |
Discovery via consul
Diffstat (limited to 'src/rpc/consul.rs')
-rw-r--r-- | src/rpc/consul.rs | 97 |
1 files changed, 91 insertions, 6 deletions
diff --git a/src/rpc/consul.rs b/src/rpc/consul.rs index fca4f517..82bf99ba 100644 --- a/src/rpc/consul.rs +++ b/src/rpc/consul.rs @@ -4,19 +4,21 @@ use std::net::{IpAddr, SocketAddr}; use hyper::client::Client; use hyper::StatusCode; use hyper::{Body, Method, Request}; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use netapp::NodeID; use garage_util::error::Error; +// ---- READING FROM CONSUL CATALOG ---- + #[derive(Deserialize, Clone, Debug)] -struct ConsulEntry { - #[serde(alias = "Address")] +struct ConsulQueryEntry { + #[serde(rename = "Address")] address: String, - #[serde(alias = "ServicePort")] + #[serde(rename = "ServicePort")] service_port: u16, - #[serde(alias = "NodeMeta")] + #[serde(rename = "NodeMeta")] node_meta: HashMap<String, String>, } @@ -41,7 +43,7 @@ pub async fn get_consul_nodes( } let body = hyper::body::to_bytes(resp.into_body()).await?; - let entries = serde_json::from_slice::<Vec<ConsulEntry>>(body.as_ref())?; + let entries = serde_json::from_slice::<Vec<ConsulQueryEntry>>(body.as_ref())?; let mut ret = vec![]; for ent in entries { @@ -66,3 +68,86 @@ pub async fn get_consul_nodes( Ok(ret) } + +// ---- PUBLISHING TO CONSUL CATALOG ---- + +#[derive(Serialize, Clone, Debug)] +struct ConsulPublishEntry { + #[serde(rename = "Node")] + node: String, + #[serde(rename = "Address")] + address: IpAddr, + #[serde(rename = "NodeMeta")] + node_meta: HashMap<String, String>, + #[serde(rename = "Service")] + service: ConsulPublishService, +} + +#[derive(Serialize, Clone, Debug)] +struct ConsulPublishService { + #[serde(rename = "ID")] + service_id: String, + #[serde(rename = "Service")] + service_name: String, + #[serde(rename = "Tags")] + tags: Vec<String>, + #[serde(rename = "Address")] + address: IpAddr, + #[serde(rename = "Port")] + port: u16, +} + +pub async fn publish_consul_service( + consul_host: &str, + consul_service_name: &str, + node_id: NodeID, + hostname: &str, + rpc_public_addr: SocketAddr, +) -> Result<(), Error> { + let node = format!("garage:{}", hex::encode(&node_id[..8])); + + let advertisment = ConsulPublishEntry { + node: node.clone(), + address: rpc_public_addr.ip(), + node_meta: [ + ("pubkey".to_string(), hex::encode(node_id)), + ("hostname".to_string(), hostname.to_string()), + ] + .iter() + .cloned() + .collect(), + service: ConsulPublishService { + service_id: node.clone(), + service_name: consul_service_name.to_string(), + tags: vec!["advertised-by-garage".into(), hostname.into()], + address: rpc_public_addr.ip(), + port: rpc_public_addr.port(), + }, + }; + + let url = format!("http://{}/v1/catalog/register", consul_host); + let req_body = serde_json::to_string(&advertisment)?; + debug!("Request body for consul adv: {}", req_body); + + let req = Request::builder() + .uri(url) + .method(Method::PUT) + .body(Body::from(req_body))?; + + let client = Client::new(); + + let resp = client.request(req).await?; + debug!("Response of advertising to Consul: {:?}", resp); + let resp_code = resp.status(); + debug!( + "{}", + std::str::from_utf8(&hyper::body::to_bytes(resp.into_body()).await?) + .unwrap_or("<invalid utf8>") + ); + + if resp_code != StatusCode::OK { + return Err(Error::Message(format!("HTTP error {}", resp_code))); + } + + Ok(()) +} |