aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rpc/consul.rs243
-rw-r--r--src/rpc/system.rs22
2 files changed, 133 insertions, 132 deletions
diff --git a/src/rpc/consul.rs b/src/rpc/consul.rs
index 05ed278a..cf050207 100644
--- a/src/rpc/consul.rs
+++ b/src/rpc/consul.rs
@@ -1,9 +1,8 @@
use std::collections::HashMap;
+use std::fs::File;
+use std::io::Read;
use std::net::{IpAddr, SocketAddr};
-use tokio::fs::File;
-use tokio::io::AsyncReadExt;
-
use err_derive::Error;
use serde::{Deserialize, Serialize};
@@ -11,59 +10,6 @@ use netapp::NodeID;
use garage_util::config::ConsulDiscoveryConfig;
-async fn make_consul_client(
- config: &ConsulDiscoveryConfig,
-) -> Result<reqwest::Client, ConsulError> {
- match (&config.client_cert, &config.client_key) {
- (Some(client_cert), Some(client_key)) => {
- let mut client_cert_buf = vec![];
- File::open(client_cert)
- .await?
- .read_to_end(&mut client_cert_buf)
- .await?;
-
- let mut client_key_buf = vec![];
- File::open(client_key)
- .await?
- .read_to_end(&mut client_key_buf)
- .await?;
-
- let identity = reqwest::Identity::from_pem(
- &[&client_cert_buf[..], &client_key_buf[..]].concat()[..],
- )?;
-
- if config.tls_skip_verify {
- Ok(reqwest::Client::builder()
- .use_rustls_tls()
- .danger_accept_invalid_certs(true)
- .identity(identity)
- .build()?)
- } else if let Some(ca_cert) = &config.ca_cert {
- let mut ca_cert_buf = vec![];
- File::open(ca_cert)
- .await?
- .read_to_end(&mut ca_cert_buf)
- .await?;
-
- Ok(reqwest::Client::builder()
- .use_rustls_tls()
- .add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?)
- .identity(identity)
- .build()?)
- } else {
- Ok(reqwest::Client::builder()
- .use_rustls_tls()
- .identity(identity)
- .build()?)
- }
- }
- (None, None) => Ok(reqwest::Client::new()),
- _ => Err(ConsulError::InvalidTLSConfig),
- }
-}
-
-// ---- READING FROM CONSUL CATALOG ----
-
#[derive(Deserialize, Clone, Debug)]
struct ConsulQueryEntry {
#[serde(rename = "Address")]
@@ -74,42 +20,6 @@ struct ConsulQueryEntry {
node_meta: HashMap<String, String>,
}
-pub async fn get_consul_nodes(
- consul_config: &ConsulDiscoveryConfig,
-) -> Result<Vec<(NodeID, SocketAddr)>, ConsulError> {
- let url = format!(
- "http://{}/v1/catalog/service/{}",
- consul_config.consul_host, consul_config.service_name
- );
-
- let client = make_consul_client(consul_config).await?;
- let http = client.get(&url).send().await?;
- let entries: Vec<ConsulQueryEntry> = http.json().await?;
-
- let mut ret = vec![];
- for ent in entries {
- let ip = ent.address.parse::<IpAddr>().ok();
- let pubkey = ent
- .node_meta
- .get("pubkey")
- .and_then(|k| hex::decode(&k).ok())
- .and_then(|k| NodeID::from_slice(&k[..]));
- if let (Some(ip), Some(pubkey)) = (ip, pubkey) {
- ret.push((pubkey, SocketAddr::new(ip, ent.service_port)));
- } else {
- warn!(
- "Could not process node spec from Consul: {:?} (invalid IP or public key)",
- ent
- );
- }
- }
- debug!("Got nodes from Consul: {:?}", ret);
-
- Ok(ret)
-}
-
-// ---- PUBLISHING TO CONSUL CATALOG ----
-
#[derive(Serialize, Clone, Debug)]
struct ConsulPublishEntry {
#[serde(rename = "Node")]
@@ -136,43 +46,128 @@ struct ConsulPublishService {
port: u16,
}
-pub async fn publish_consul_service(
- consul_config: &ConsulDiscoveryConfig,
- node_id: NodeID,
- hostname: &str,
- rpc_public_addr: SocketAddr,
-) -> Result<(), ConsulError> {
- let node = format!("garage:{}", hex::encode(&node_id[..8]));
-
- let advertisement = ConsulPublishEntry {
- node: node.clone(),
- address: rpc_public_addr.ip(),
- node_meta: [
- ("pubkey".to_string(), hex::encode(node_id)),
- ("hostname".to_string(), hostname.to_string()),
- ]
- .iter()
- .cloned()
- .collect(),
- service: ConsulPublishService {
- service_id: node.clone(),
- service_name: consul_config.service_name.clone(),
- tags: vec!["advertised-by-garage".into(), hostname.into()],
- address: rpc_public_addr.ip(),
- port: rpc_public_addr.port(),
- },
- };
+// ----
- let url = format!("http://{}/v1/catalog/register", consul_config.consul_host);
+pub struct ConsulDiscovery {
+ config: ConsulDiscoveryConfig,
+ client: reqwest::Client,
+}
+
+impl ConsulDiscovery {
+ pub fn new(config: ConsulDiscoveryConfig) -> Result<Self, ConsulError> {
+ let client = match (&config.client_cert, &config.client_key) {
+ (Some(client_cert), Some(client_key)) => {
+ let mut client_cert_buf = vec![];
+ File::open(client_cert)?.read_to_end(&mut client_cert_buf)?;
+
+ let mut client_key_buf = vec![];
+ File::open(client_key)?.read_to_end(&mut client_key_buf)?;
+
+ let identity = reqwest::Identity::from_pem(
+ &[&client_cert_buf[..], &client_key_buf[..]].concat()[..],
+ )?;
+
+ if config.tls_skip_verify {
+ reqwest::Client::builder()
+ .use_rustls_tls()
+ .danger_accept_invalid_certs(true)
+ .identity(identity)
+ .build()?
+ } else if let Some(ca_cert) = &config.ca_cert {
+ let mut ca_cert_buf = vec![];
+ File::open(ca_cert)?.read_to_end(&mut ca_cert_buf)?;
+
+ reqwest::Client::builder()
+ .use_rustls_tls()
+ .add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?)
+ .identity(identity)
+ .build()?
+ } else {
+ reqwest::Client::builder()
+ .use_rustls_tls()
+ .identity(identity)
+ .build()?
+ }
+ }
+ (None, None) => reqwest::Client::new(),
+ _ => return Err(ConsulError::InvalidTLSConfig),
+ };
+
+ Ok(Self { client, config })
+ }
+
+ // ---- READING FROM CONSUL CATALOG ----
+
+ pub async fn get_consul_nodes(&self) -> Result<Vec<(NodeID, SocketAddr)>, ConsulError> {
+ let url = format!(
+ "http://{}/v1/catalog/service/{}",
+ self.config.consul_host, self.config.service_name
+ );
+
+ let http = self.client.get(&url).send().await?;
+ let entries: Vec<ConsulQueryEntry> = http.json().await?;
+
+ let mut ret = vec![];
+ for ent in entries {
+ let ip = ent.address.parse::<IpAddr>().ok();
+ let pubkey = ent
+ .node_meta
+ .get("pubkey")
+ .and_then(|k| hex::decode(&k).ok())
+ .and_then(|k| NodeID::from_slice(&k[..]));
+ if let (Some(ip), Some(pubkey)) = (ip, pubkey) {
+ ret.push((pubkey, SocketAddr::new(ip, ent.service_port)));
+ } else {
+ warn!(
+ "Could not process node spec from Consul: {:?} (invalid IP or public key)",
+ ent
+ );
+ }
+ }
+ debug!("Got nodes from Consul: {:?}", ret);
+
+ Ok(ret)
+ }
- let client = make_consul_client(consul_config).await?;
- let http = client.put(&url).json(&advertisement).send().await?;
- http.error_for_status()?;
+ // ---- PUBLISHING TO CONSUL CATALOG ----
- Ok(())
+ pub async fn publish_consul_service(
+ &self,
+ node_id: NodeID,
+ hostname: &str,
+ rpc_public_addr: SocketAddr,
+ ) -> Result<(), ConsulError> {
+ let node = format!("garage:{}", hex::encode(&node_id[..8]));
+
+ let advertisement = ConsulPublishEntry {
+ node: node.clone(),
+ address: rpc_public_addr.ip(),
+ node_meta: [
+ ("pubkey".to_string(), hex::encode(node_id)),
+ ("hostname".to_string(), hostname.to_string()),
+ ]
+ .iter()
+ .cloned()
+ .collect(),
+ service: ConsulPublishService {
+ service_id: node.clone(),
+ service_name: self.config.service_name.clone(),
+ tags: vec!["advertised-by-garage".into(), hostname.into()],
+ address: rpc_public_addr.ip(),
+ port: rpc_public_addr.port(),
+ },
+ };
+
+ let url = format!("http://{}/v1/catalog/register", self.config.consul_host);
+
+ let http = self.client.put(&url).json(&advertisement).send().await?;
+ http.error_for_status()?;
+
+ Ok(())
+ }
}
-/// Regroup all Garage errors
+/// Regroup all Consul discovery errors
#[derive(Debug, Error)]
pub enum ConsulError {
#[error(display = "IO error: {}", _0)]
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 7b4cfbde..61e380c9 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -23,8 +23,6 @@ use netapp::{NetApp, NetworkKey, NodeID, NodeKey};
use garage_util::background::BackgroundRunner;
use garage_util::config::Config;
-#[cfg(feature = "consul-discovery")]
-use garage_util::config::ConsulDiscoveryConfig;
#[cfg(feature = "kubernetes-discovery")]
use garage_util::config::KubernetesDiscoveryConfig;
use garage_util::data::*;
@@ -33,7 +31,7 @@ use garage_util::persister::Persister;
use garage_util::time::*;
#[cfg(feature = "consul-discovery")]
-use crate::consul::{get_consul_nodes, publish_consul_service};
+use crate::consul::ConsulDiscovery;
#[cfg(feature = "kubernetes-discovery")]
use crate::kubernetes::*;
use crate::layout::*;
@@ -100,7 +98,7 @@ pub struct System {
bootstrap_peers: Vec<String>,
#[cfg(feature = "consul-discovery")]
- consul_discovery: Option<ConsulDiscoveryConfig>,
+ consul_discovery: Option<ConsulDiscovery>,
#[cfg(feature = "kubernetes-discovery")]
kubernetes_discovery: Option<KubernetesDiscoveryConfig>,
@@ -302,6 +300,15 @@ impl System {
warn!("Kubernetes discovery is not enabled in this build.");
}
+ #[cfg(feature = "consul-discovery")]
+ let consul_discovery = match &config.consul_discovery {
+ Some(cfg) => Some(
+ ConsulDiscovery::new(cfg.clone())
+ .ok_or_message("Invalid Consul discovery configuration")?,
+ ),
+ None => None,
+ };
+
let sys = Arc::new(System {
id: netapp.id.into(),
persist_cluster_layout,
@@ -324,7 +331,7 @@ impl System {
rpc_public_addr,
bootstrap_peers: config.bootstrap_peers.clone(),
#[cfg(feature = "consul-discovery")]
- consul_discovery: config.consul_discovery.clone(),
+ consul_discovery,
#[cfg(feature = "kubernetes-discovery")]
kubernetes_discovery: config.kubernetes_discovery.clone(),
@@ -440,8 +447,7 @@ impl System {
}
};
- publish_consul_service(
- c,
+ c.publish_consul_service(
self.netapp.id,
&self.local_status.load_full().hostname,
rpc_public_addr,
@@ -638,7 +644,7 @@ impl System {
// Fetch peer list from Consul
#[cfg(feature = "consul-discovery")]
if let Some(c) = &self.consul_discovery {
- match get_consul_nodes(c).await {
+ match c.get_consul_nodes().await {
Ok(node_list) => {
ping_list.extend(node_list);
}