aboutsummaryrefslogtreecommitdiff
path: root/src/rpc/consul.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2021-10-15 11:05:09 +0200
committerAlex Auvolat <alex@adnab.me>2021-10-22 16:55:24 +0200
commit1b450c4b493dfcb2ee88acbca3ea584beac8eb4b (patch)
treed6437f105a630fa197b67446b5c3b2902335c34a /src/rpc/consul.rs
parent4067797d0142ee7860aff8da95d65820d6cc0889 (diff)
downloadgarage-1b450c4b493dfcb2ee88acbca3ea584beac8eb4b.tar.gz
garage-1b450c4b493dfcb2ee88acbca3ea584beac8eb4b.zip
Improvements to CLI and various fixes for netapp version
Discovery via consul, persist peer list to file
Diffstat (limited to 'src/rpc/consul.rs')
-rw-r--r--src/rpc/consul.rs125
1 files changed, 113 insertions, 12 deletions
diff --git a/src/rpc/consul.rs b/src/rpc/consul.rs
index 63051a6b..82bf99ba 100644
--- a/src/rpc/consul.rs
+++ b/src/rpc/consul.rs
@@ -1,24 +1,31 @@
+use std::collections::HashMap;
use std::net::{IpAddr, SocketAddr};
use hyper::client::Client;
use hyper::StatusCode;
use hyper::{Body, Method, Request};
-use serde::Deserialize;
+use serde::{Deserialize, Serialize};
+
+use netapp::NodeID;
use garage_util::error::Error;
-#[derive(Deserialize, Clone)]
-struct ConsulEntry {
- #[serde(alias = "Address")]
+// ---- READING FROM CONSUL CATALOG ----
+
+#[derive(Deserialize, Clone, Debug)]
+struct ConsulQueryEntry {
+ #[serde(rename = "Address")]
address: String,
- #[serde(alias = "ServicePort")]
+ #[serde(rename = "ServicePort")]
service_port: u16,
+ #[serde(rename = "NodeMeta")]
+ node_meta: HashMap<String, String>,
}
pub async fn get_consul_nodes(
consul_host: &str,
consul_service_name: &str,
-) -> Result<Vec<SocketAddr>, Error> {
+) -> Result<Vec<(NodeID, SocketAddr)>, Error> {
let url = format!(
"http://{}/v1/catalog/service/{}",
consul_host, consul_service_name
@@ -36,17 +43,111 @@ pub async fn get_consul_nodes(
}
let body = hyper::body::to_bytes(resp.into_body()).await?;
- let entries = serde_json::from_slice::<Vec<ConsulEntry>>(body.as_ref())?;
+ let entries = serde_json::from_slice::<Vec<ConsulQueryEntry>>(body.as_ref())?;
let mut ret = vec![];
for ent in entries {
- let ip = ent
- .address
- .parse::<IpAddr>()
- .map_err(|e| Error::Message(format!("Could not parse IP address: {}", e)))?;
- ret.push(SocketAddr::new(ip, ent.service_port));
+ let ip = ent.address.parse::<IpAddr>().ok();
+ let pubkey = ent
+ .node_meta
+ .get("pubkey")
+ .map(|k| hex::decode(&k).ok())
+ .flatten()
+ .map(|k| NodeID::from_slice(&k[..]))
+ .flatten();
+ if let (Some(ip), Some(pubkey)) = (ip, pubkey) {
+ ret.push((pubkey, SocketAddr::new(ip, ent.service_port)));
+ } else {
+ warn!(
+ "Could not process node spec from Consul: {:?} (invalid IP or public key)",
+ ent
+ );
+ }
}
debug!("Got nodes from Consul: {:?}", ret);
Ok(ret)
}
+
+// ---- PUBLISHING TO CONSUL CATALOG ----
+
+#[derive(Serialize, Clone, Debug)]
+struct ConsulPublishEntry {
+ #[serde(rename = "Node")]
+ node: String,
+ #[serde(rename = "Address")]
+ address: IpAddr,
+ #[serde(rename = "NodeMeta")]
+ node_meta: HashMap<String, String>,
+ #[serde(rename = "Service")]
+ service: ConsulPublishService,
+}
+
+#[derive(Serialize, Clone, Debug)]
+struct ConsulPublishService {
+ #[serde(rename = "ID")]
+ service_id: String,
+ #[serde(rename = "Service")]
+ service_name: String,
+ #[serde(rename = "Tags")]
+ tags: Vec<String>,
+ #[serde(rename = "Address")]
+ address: IpAddr,
+ #[serde(rename = "Port")]
+ port: u16,
+}
+
+pub async fn publish_consul_service(
+ consul_host: &str,
+ consul_service_name: &str,
+ node_id: NodeID,
+ hostname: &str,
+ rpc_public_addr: SocketAddr,
+) -> Result<(), Error> {
+ let node = format!("garage:{}", hex::encode(&node_id[..8]));
+
+ let advertisment = ConsulPublishEntry {
+ node: node.clone(),
+ address: rpc_public_addr.ip(),
+ node_meta: [
+ ("pubkey".to_string(), hex::encode(node_id)),
+ ("hostname".to_string(), hostname.to_string()),
+ ]
+ .iter()
+ .cloned()
+ .collect(),
+ service: ConsulPublishService {
+ service_id: node.clone(),
+ service_name: consul_service_name.to_string(),
+ tags: vec!["advertised-by-garage".into(), hostname.into()],
+ address: rpc_public_addr.ip(),
+ port: rpc_public_addr.port(),
+ },
+ };
+
+ let url = format!("http://{}/v1/catalog/register", consul_host);
+ let req_body = serde_json::to_string(&advertisment)?;
+ debug!("Request body for consul adv: {}", req_body);
+
+ let req = Request::builder()
+ .uri(url)
+ .method(Method::PUT)
+ .body(Body::from(req_body))?;
+
+ let client = Client::new();
+
+ let resp = client.request(req).await?;
+ debug!("Response of advertising to Consul: {:?}", resp);
+ let resp_code = resp.status();
+ debug!(
+ "{}",
+ std::str::from_utf8(&hyper::body::to_bytes(resp.into_body()).await?)
+ .unwrap_or("<invalid utf8>")
+ );
+
+ if resp_code != StatusCode::OK {
+ return Err(Error::Message(format!("HTTP error {}", resp_code)));
+ }
+
+ Ok(())
+}