diff options
-rw-r--r-- | src/rpc/consul.rs | 243 | ||||
-rw-r--r-- | src/rpc/system.rs | 22 |
2 files changed, 133 insertions, 132 deletions
diff --git a/src/rpc/consul.rs b/src/rpc/consul.rs index 05ed278a..cf050207 100644 --- a/src/rpc/consul.rs +++ b/src/rpc/consul.rs @@ -1,9 +1,8 @@ use std::collections::HashMap; +use std::fs::File; +use std::io::Read; use std::net::{IpAddr, SocketAddr}; -use tokio::fs::File; -use tokio::io::AsyncReadExt; - use err_derive::Error; use serde::{Deserialize, Serialize}; @@ -11,59 +10,6 @@ use netapp::NodeID; use garage_util::config::ConsulDiscoveryConfig; -async fn make_consul_client( - config: &ConsulDiscoveryConfig, -) -> Result<reqwest::Client, ConsulError> { - match (&config.client_cert, &config.client_key) { - (Some(client_cert), Some(client_key)) => { - let mut client_cert_buf = vec![]; - File::open(client_cert) - .await? - .read_to_end(&mut client_cert_buf) - .await?; - - let mut client_key_buf = vec![]; - File::open(client_key) - .await? - .read_to_end(&mut client_key_buf) - .await?; - - let identity = reqwest::Identity::from_pem( - &[&client_cert_buf[..], &client_key_buf[..]].concat()[..], - )?; - - if config.tls_skip_verify { - Ok(reqwest::Client::builder() - .use_rustls_tls() - .danger_accept_invalid_certs(true) - .identity(identity) - .build()?) - } else if let Some(ca_cert) = &config.ca_cert { - let mut ca_cert_buf = vec![]; - File::open(ca_cert) - .await? - .read_to_end(&mut ca_cert_buf) - .await?; - - Ok(reqwest::Client::builder() - .use_rustls_tls() - .add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?) - .identity(identity) - .build()?) - } else { - Ok(reqwest::Client::builder() - .use_rustls_tls() - .identity(identity) - .build()?) - } - } - (None, None) => Ok(reqwest::Client::new()), - _ => Err(ConsulError::InvalidTLSConfig), - } -} - -// ---- READING FROM CONSUL CATALOG ---- - #[derive(Deserialize, Clone, Debug)] struct ConsulQueryEntry { #[serde(rename = "Address")] @@ -74,42 +20,6 @@ struct ConsulQueryEntry { node_meta: HashMap<String, String>, } -pub async fn get_consul_nodes( - consul_config: &ConsulDiscoveryConfig, -) -> Result<Vec<(NodeID, SocketAddr)>, ConsulError> { - let url = format!( - "http://{}/v1/catalog/service/{}", - consul_config.consul_host, consul_config.service_name - ); - - let client = make_consul_client(consul_config).await?; - let http = client.get(&url).send().await?; - let entries: Vec<ConsulQueryEntry> = http.json().await?; - - let mut ret = vec![]; - for ent in entries { - let ip = ent.address.parse::<IpAddr>().ok(); - let pubkey = ent - .node_meta - .get("pubkey") - .and_then(|k| hex::decode(&k).ok()) - .and_then(|k| NodeID::from_slice(&k[..])); - if let (Some(ip), Some(pubkey)) = (ip, pubkey) { - ret.push((pubkey, SocketAddr::new(ip, ent.service_port))); - } else { - warn!( - "Could not process node spec from Consul: {:?} (invalid IP or public key)", - ent - ); - } - } - debug!("Got nodes from Consul: {:?}", ret); - - Ok(ret) -} - -// ---- PUBLISHING TO CONSUL CATALOG ---- - #[derive(Serialize, Clone, Debug)] struct ConsulPublishEntry { #[serde(rename = "Node")] @@ -136,43 +46,128 @@ struct ConsulPublishService { port: u16, } -pub async fn publish_consul_service( - consul_config: &ConsulDiscoveryConfig, - node_id: NodeID, - hostname: &str, - rpc_public_addr: SocketAddr, -) -> Result<(), ConsulError> { - let node = format!("garage:{}", hex::encode(&node_id[..8])); - - let advertisement = ConsulPublishEntry { - node: node.clone(), - address: rpc_public_addr.ip(), - node_meta: [ - ("pubkey".to_string(), hex::encode(node_id)), - ("hostname".to_string(), hostname.to_string()), - ] - .iter() - .cloned() - .collect(), - service: ConsulPublishService { - service_id: node.clone(), - service_name: consul_config.service_name.clone(), - tags: vec!["advertised-by-garage".into(), hostname.into()], - address: rpc_public_addr.ip(), - port: rpc_public_addr.port(), - }, - }; +// ---- - let url = format!("http://{}/v1/catalog/register", consul_config.consul_host); +pub struct ConsulDiscovery { + config: ConsulDiscoveryConfig, + client: reqwest::Client, +} + +impl ConsulDiscovery { + pub fn new(config: ConsulDiscoveryConfig) -> Result<Self, ConsulError> { + let client = match (&config.client_cert, &config.client_key) { + (Some(client_cert), Some(client_key)) => { + let mut client_cert_buf = vec![]; + File::open(client_cert)?.read_to_end(&mut client_cert_buf)?; + + let mut client_key_buf = vec![]; + File::open(client_key)?.read_to_end(&mut client_key_buf)?; + + let identity = reqwest::Identity::from_pem( + &[&client_cert_buf[..], &client_key_buf[..]].concat()[..], + )?; + + if config.tls_skip_verify { + reqwest::Client::builder() + .use_rustls_tls() + .danger_accept_invalid_certs(true) + .identity(identity) + .build()? + } else if let Some(ca_cert) = &config.ca_cert { + let mut ca_cert_buf = vec![]; + File::open(ca_cert)?.read_to_end(&mut ca_cert_buf)?; + + reqwest::Client::builder() + .use_rustls_tls() + .add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?) + .identity(identity) + .build()? + } else { + reqwest::Client::builder() + .use_rustls_tls() + .identity(identity) + .build()? + } + } + (None, None) => reqwest::Client::new(), + _ => return Err(ConsulError::InvalidTLSConfig), + }; + + Ok(Self { client, config }) + } + + // ---- READING FROM CONSUL CATALOG ---- + + pub async fn get_consul_nodes(&self) -> Result<Vec<(NodeID, SocketAddr)>, ConsulError> { + let url = format!( + "http://{}/v1/catalog/service/{}", + self.config.consul_host, self.config.service_name + ); + + let http = self.client.get(&url).send().await?; + let entries: Vec<ConsulQueryEntry> = http.json().await?; + + let mut ret = vec![]; + for ent in entries { + let ip = ent.address.parse::<IpAddr>().ok(); + let pubkey = ent + .node_meta + .get("pubkey") + .and_then(|k| hex::decode(&k).ok()) + .and_then(|k| NodeID::from_slice(&k[..])); + if let (Some(ip), Some(pubkey)) = (ip, pubkey) { + ret.push((pubkey, SocketAddr::new(ip, ent.service_port))); + } else { + warn!( + "Could not process node spec from Consul: {:?} (invalid IP or public key)", + ent + ); + } + } + debug!("Got nodes from Consul: {:?}", ret); + + Ok(ret) + } - let client = make_consul_client(consul_config).await?; - let http = client.put(&url).json(&advertisement).send().await?; - http.error_for_status()?; + // ---- PUBLISHING TO CONSUL CATALOG ---- - Ok(()) + pub async fn publish_consul_service( + &self, + node_id: NodeID, + hostname: &str, + rpc_public_addr: SocketAddr, + ) -> Result<(), ConsulError> { + let node = format!("garage:{}", hex::encode(&node_id[..8])); + + let advertisement = ConsulPublishEntry { + node: node.clone(), + address: rpc_public_addr.ip(), + node_meta: [ + ("pubkey".to_string(), hex::encode(node_id)), + ("hostname".to_string(), hostname.to_string()), + ] + .iter() + .cloned() + .collect(), + service: ConsulPublishService { + service_id: node.clone(), + service_name: self.config.service_name.clone(), + tags: vec!["advertised-by-garage".into(), hostname.into()], + address: rpc_public_addr.ip(), + port: rpc_public_addr.port(), + }, + }; + + let url = format!("http://{}/v1/catalog/register", self.config.consul_host); + + let http = self.client.put(&url).json(&advertisement).send().await?; + http.error_for_status()?; + + Ok(()) + } } -/// Regroup all Garage errors +/// Regroup all Consul discovery errors #[derive(Debug, Error)] pub enum ConsulError { #[error(display = "IO error: {}", _0)] diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 7b4cfbde..61e380c9 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -23,8 +23,6 @@ use netapp::{NetApp, NetworkKey, NodeID, NodeKey}; use garage_util::background::BackgroundRunner; use garage_util::config::Config; -#[cfg(feature = "consul-discovery")] -use garage_util::config::ConsulDiscoveryConfig; #[cfg(feature = "kubernetes-discovery")] use garage_util::config::KubernetesDiscoveryConfig; use garage_util::data::*; @@ -33,7 +31,7 @@ use garage_util::persister::Persister; use garage_util::time::*; #[cfg(feature = "consul-discovery")] -use crate::consul::{get_consul_nodes, publish_consul_service}; +use crate::consul::ConsulDiscovery; #[cfg(feature = "kubernetes-discovery")] use crate::kubernetes::*; use crate::layout::*; @@ -100,7 +98,7 @@ pub struct System { bootstrap_peers: Vec<String>, #[cfg(feature = "consul-discovery")] - consul_discovery: Option<ConsulDiscoveryConfig>, + consul_discovery: Option<ConsulDiscovery>, #[cfg(feature = "kubernetes-discovery")] kubernetes_discovery: Option<KubernetesDiscoveryConfig>, @@ -302,6 +300,15 @@ impl System { warn!("Kubernetes discovery is not enabled in this build."); } + #[cfg(feature = "consul-discovery")] + let consul_discovery = match &config.consul_discovery { + Some(cfg) => Some( + ConsulDiscovery::new(cfg.clone()) + .ok_or_message("Invalid Consul discovery configuration")?, + ), + None => None, + }; + let sys = Arc::new(System { id: netapp.id.into(), persist_cluster_layout, @@ -324,7 +331,7 @@ impl System { rpc_public_addr, bootstrap_peers: config.bootstrap_peers.clone(), #[cfg(feature = "consul-discovery")] - consul_discovery: config.consul_discovery.clone(), + consul_discovery, #[cfg(feature = "kubernetes-discovery")] kubernetes_discovery: config.kubernetes_discovery.clone(), @@ -440,8 +447,7 @@ impl System { } }; - publish_consul_service( - c, + c.publish_consul_service( self.netapp.id, &self.local_status.load_full().hostname, rpc_public_addr, @@ -638,7 +644,7 @@ impl System { // Fetch peer list from Consul #[cfg(feature = "consul-discovery")] if let Some(c) = &self.consul_discovery { - match get_consul_nodes(c).await { + match c.get_consul_nodes().await { Ok(node_list) => { ping_list.extend(node_list); } |