From 02ba9016ab6eca5bb4964549de002573d5a3a07a Mon Sep 17 00:00:00 2001 From: Roberto Hidalgo Date: Fri, 5 May 2023 16:18:24 -0600 Subject: register consul services against local agent instead of catalog api --- src/rpc/Cargo.toml | 1 + src/rpc/consul_services.rs | 166 +++++++++++++++++++++++++++++++++++++++++++++ src/rpc/lib.rs | 2 + src/rpc/system.rs | 69 ++++++++++++++++++- 4 files changed, 235 insertions(+), 3 deletions(-) create mode 100644 src/rpc/consul_services.rs (limited to 'src/rpc') diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index f0fde7a7..d168e246 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -50,4 +50,5 @@ netapp = { version = "0.5.2", features = ["telemetry"] } [features] kubernetes-discovery = [ "kube", "k8s-openapi", "schemars" ] consul-discovery = [ "reqwest", "err-derive" ] +consul-service-discovery = [ "reqwest", "err-derive" ] system-libs = [ "sodiumoxide/use-pkg-config" ] diff --git a/src/rpc/consul_services.rs b/src/rpc/consul_services.rs new file mode 100644 index 00000000..461bb195 --- /dev/null +++ b/src/rpc/consul_services.rs @@ -0,0 +1,166 @@ +use std::collections::HashMap; +use std::fs::File; +use std::io::Read; +use std::net::{IpAddr, SocketAddr}; + +use err_derive::Error; +use serde::{Deserialize, Serialize}; + +use netapp::NodeID; + +use garage_util::config::ConsulServiceConfig; + +const META_PREFIX: &str = "fr-deuxfleurs-garagehq"; + +#[derive(Deserialize, Clone, Debug)] +struct ConsulQueryEntry { + #[serde(rename = "Address")] + address: String, + #[serde(rename = "ServicePort")] + service_port: u16, + #[serde(rename = "ServiceMeta")] + service_meta: HashMap, +} + +#[derive(Serialize, Clone, Debug)] +struct ConsulPublishService { + #[serde(rename = "ID")] + service_id: String, + #[serde(rename = "Name")] + service_name: String, + #[serde(rename = "Tags")] + tags: Vec, + #[serde(rename = "Address")] + address: IpAddr, + #[serde(rename = "Port")] + port: u16, + #[serde(rename = "Meta")] + meta: HashMap, +} + +// ---- + +pub struct ConsulServiceDiscovery { + config: ConsulServiceConfig, + client: reqwest::Client, +} + +impl ConsulServiceDiscovery { + pub fn new(config: ConsulServiceConfig) -> Result { + let mut builder: reqwest::ClientBuilder = match &config.ca_cert { + Some(client_ca) => { + let mut ca_cert_buf = vec![]; + File::open(client_ca)?.read_to_end(&mut ca_cert_buf)?; + + let req: reqwest::ClientBuilder = reqwest::Client::builder() + .add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?) + .use_rustls_tls(); + + if config.tls_skip_verify { + req.danger_accept_invalid_certs(true) + } else { + req + } + } + None => reqwest::Client::builder(), + }; + + if let Some(token) = &config.consul_http_token { + let mut headers = reqwest::header::HeaderMap::new(); + headers.insert("x-consul-token", reqwest::header::HeaderValue::from_str(&token)?); + builder = builder.default_headers(headers); + } + + let client = builder.build()?; + + Ok(Self { client, config }) + } + + // ---- READING FROM CONSUL CATALOG ---- + + pub async fn get_consul_services(&self) -> Result, ConsulError> { + let url = format!( + "{}/v1/catalog/service/{}", + self.config.consul_http_addr, self.config.service_name + ); + + let req = self.client.get(&url); + let http = req.send().await?; + let entries: Vec = http.json().await?; + + let mut ret = vec![]; + for ent in entries { + let ip = ent.address.parse::().ok(); + let pubkey = ent + .service_meta + .get(&format!("{}-pubkey", META_PREFIX)) + .and_then(|k| hex::decode(k).ok()) + .and_then(|k| NodeID::from_slice(&k[..])); + if let (Some(ip), Some(pubkey)) = (ip, pubkey) { + ret.push((pubkey, SocketAddr::new(ip, ent.service_port))); + } else { + warn!( + "Could not process node spec from Consul: {:?} (invalid IP or public key)", + ent + ); + } + } + debug!("Got nodes from Consul: {:?}", ret); + + Ok(ret) + } + + // ---- PUBLISHING TO CONSUL CATALOG ---- + + pub async fn publish_consul_service( + &self, + node_id: NodeID, + hostname: &str, + rpc_public_addr: SocketAddr, + ) -> Result<(), ConsulError> { + let node = format!("garage:{}", hex::encode(&node_id[..8])); + + let tags = [ + vec!["advertised-by-garage".into(), hostname.into()], + self.config.tags.clone(), + ] + .concat(); + + let advertisement: ConsulPublishService = ConsulPublishService { + service_id: node.clone(), + service_name: self.config.service_name.clone(), + tags, + meta: [ + (format!("{}-pubkey", META_PREFIX), hex::encode(node_id)), + (format!("{}-hostname", META_PREFIX), hostname.to_string()), + ] + .iter() + .cloned() + .collect(), + address: rpc_public_addr.ip(), + port: rpc_public_addr.port(), + }; + + let url = format!( + "{}/v1/agent/service/register?replace-existing-checks", + self.config.consul_http_addr + ); + + let req = self.client.put(&url); + let http = req.json(&advertisement).send().await?; + http.error_for_status()?; + + Ok(()) + } +} + +/// Regroup all Consul discovery errors +#[derive(Debug, Error)] +pub enum ConsulError { + #[error(display = "IO error: {}", _0)] + Io(#[error(source)] std::io::Error), + #[error(display = "HTTP error: {}", _0)] + Reqwest(#[error(source)] reqwest::Error), + #[error(display = "Invalid HTTP header error: {}", _0)] + HeaderValue(#[error(source)] reqwest::header::InvalidHeaderValue), +} diff --git a/src/rpc/lib.rs b/src/rpc/lib.rs index 5aec92c0..2c5ed107 100644 --- a/src/rpc/lib.rs +++ b/src/rpc/lib.rs @@ -8,6 +8,8 @@ mod system_metrics; #[cfg(feature = "consul-discovery")] mod consul; +#[cfg(feature = "consul-service-discovery")] +mod consul_services; #[cfg(feature = "kubernetes-discovery")] mod kubernetes; diff --git a/src/rpc/system.rs b/src/rpc/system.rs index b42e49fc..b0fef6f2 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -32,6 +32,8 @@ use garage_util::time::*; #[cfg(feature = "consul-discovery")] use crate::consul::ConsulDiscovery; +#[cfg(feature = "consul-service-discovery")] +use crate::consul_services::ConsulServiceDiscovery; #[cfg(feature = "kubernetes-discovery")] use crate::kubernetes::*; use crate::layout::*; @@ -98,12 +100,14 @@ pub struct System { system_endpoint: Arc>, rpc_listen_addr: SocketAddr, - #[cfg(any(feature = "consul-discovery", feature = "kubernetes-discovery"))] + #[cfg(any(feature = "consul-discovery", feature = "consul-service-discovery", feature = "kubernetes-discovery"))] rpc_public_addr: Option, bootstrap_peers: Vec, #[cfg(feature = "consul-discovery")] consul_discovery: Option, + #[cfg(feature = "consul-service-discovery")] + consul_service_discovery: Option, #[cfg(feature = "kubernetes-discovery")] kubernetes_discovery: Option, @@ -346,6 +350,19 @@ impl System { warn!("Consul discovery is not enabled in this build."); } + #[cfg(feature = "consul-service-discovery")] + let consul_service_discovery = match &config.consul_service_discovery { + Some(cfg) => Some( + ConsulServiceDiscovery::new(cfg.clone()) + .ok_or_message("Invalid Consul service discovery configuration")?, + ), + None => None, + }; + #[cfg(not(feature = "consul-service-discovery"))] + if config.consul_service_discovery.is_some() { + warn!("Consul service discovery is not enabled in this build."); + } + #[cfg(not(feature = "kubernetes-discovery"))] if config.kubernetes_discovery.is_some() { warn!("Kubernetes discovery is not enabled in this build."); @@ -369,11 +386,13 @@ impl System { replication_mode, replication_factor, rpc_listen_addr: config.rpc_bind_addr, - #[cfg(any(feature = "consul-discovery", feature = "kubernetes-discovery"))] + #[cfg(any(feature = "consul-discovery", feature = "consul-service-discovery", feature = "kubernetes-discovery"))] rpc_public_addr, bootstrap_peers: config.bootstrap_peers.clone(), #[cfg(feature = "consul-discovery")] consul_discovery, + #[cfg(feature = "consul-service-discovery")] + consul_service_discovery, #[cfg(feature = "kubernetes-discovery")] kubernetes_discovery: config.kubernetes_discovery.clone(), metrics, @@ -555,6 +574,34 @@ impl System { } } + #[cfg(feature = "consul-service-discovery")] + async fn advertise_to_consul(self: Arc) { + let c = match &self.consul_service_discovery { + Some(c) => c, + _ => return, + }; + + let rpc_public_addr = match self.rpc_public_addr { + Some(addr) => addr, + None => { + warn!("Not advertising to Consul because rpc_public_addr is not defined in config file and could not be autodetected."); + return; + } + }; + + if let Err(e) = c + .publish_consul_service( + self.netapp.id, + &self.local_status.load_full().hostname, + rpc_public_addr, + ) + .await + { + error!("Error while publishing Consul service: {}", e); + } + } + + #[cfg(feature = "kubernetes-discovery")] async fn advertise_to_kubernetes(self: Arc) { let k = match &self.kubernetes_discovery { @@ -744,7 +791,7 @@ impl System { ping_list.extend(peers.0.iter().map(|(id, addr)| ((*id).into(), *addr))) } - // Fetch peer list from Consul + // Fetch peer list from Consul Nodes #[cfg(feature = "consul-discovery")] if let Some(c) = &self.consul_discovery { match c.get_consul_nodes().await { @@ -757,6 +804,19 @@ impl System { } } + // Fetch peer list from Consul Services + #[cfg(feature = "consul-service-discovery")] + if let Some(c) = &self.consul_service_discovery { + match c.get_consul_services().await { + Ok(node_list) => { + ping_list.extend(node_list); + } + Err(e) => { + warn!("Could not retrieve service list from Consul: {}", e); + } + } + } + // Fetch peer list from Kubernetes #[cfg(feature = "kubernetes-discovery")] if let Some(k) = &self.kubernetes_discovery { @@ -796,6 +856,9 @@ impl System { #[cfg(feature = "consul-discovery")] tokio::spawn(self.clone().advertise_to_consul()); + #[cfg(feature = "consul-service-discovery")] + tokio::spawn(self.clone().advertise_to_consul()); + #[cfg(feature = "kubernetes-discovery")] tokio::spawn(self.clone().advertise_to_kubernetes()); -- cgit v1.2.3 From 4d6e6fc155bb263a04f7f6dfbb77933f5d2d0b2e Mon Sep 17 00:00:00 2001 From: Roberto Hidalgo Date: Mon, 8 May 2023 13:10:23 -0600 Subject: cargo fmt --- src/rpc/consul_services.rs | 5 ++++- src/rpc/system.rs | 13 ++++++++++--- 2 files changed, 14 insertions(+), 4 deletions(-) (limited to 'src/rpc') diff --git a/src/rpc/consul_services.rs b/src/rpc/consul_services.rs index 461bb195..928c7691 100644 --- a/src/rpc/consul_services.rs +++ b/src/rpc/consul_services.rs @@ -67,7 +67,10 @@ impl ConsulServiceDiscovery { if let Some(token) = &config.consul_http_token { let mut headers = reqwest::header::HeaderMap::new(); - headers.insert("x-consul-token", reqwest::header::HeaderValue::from_str(&token)?); + headers.insert( + "x-consul-token", + reqwest::header::HeaderValue::from_str(&token)?, + ); builder = builder.default_headers(headers); } diff --git a/src/rpc/system.rs b/src/rpc/system.rs index b0fef6f2..287cd666 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -100,7 +100,11 @@ pub struct System { system_endpoint: Arc>, rpc_listen_addr: SocketAddr, - #[cfg(any(feature = "consul-discovery", feature = "consul-service-discovery", feature = "kubernetes-discovery"))] + #[cfg(any( + feature = "consul-discovery", + feature = "consul-service-discovery", + feature = "kubernetes-discovery" + ))] rpc_public_addr: Option, bootstrap_peers: Vec, @@ -386,7 +390,11 @@ impl System { replication_mode, replication_factor, rpc_listen_addr: config.rpc_bind_addr, - #[cfg(any(feature = "consul-discovery", feature = "consul-service-discovery", feature = "kubernetes-discovery"))] + #[cfg(any( + feature = "consul-discovery", + feature = "consul-service-discovery", + feature = "kubernetes-discovery" + ))] rpc_public_addr, bootstrap_peers: config.bootstrap_peers.clone(), #[cfg(feature = "consul-discovery")] @@ -601,7 +609,6 @@ impl System { } } - #[cfg(feature = "kubernetes-discovery")] async fn advertise_to_kubernetes(self: Arc) { let k = match &self.kubernetes_discovery { -- cgit v1.2.3 From bd6485565e78c0bbb9ee830c4e5b114c6248dc97 Mon Sep 17 00:00:00 2001 From: Roberto Hidalgo Date: Mon, 8 May 2023 19:29:47 -0600 Subject: allow additional ServiceMeta, docs --- src/rpc/consul_services.rs | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) (limited to 'src/rpc') diff --git a/src/rpc/consul_services.rs b/src/rpc/consul_services.rs index 928c7691..aaf3c4a1 100644 --- a/src/rpc/consul_services.rs +++ b/src/rpc/consul_services.rs @@ -129,17 +129,22 @@ impl ConsulServiceDiscovery { ] .concat(); + let mut meta = HashMap::from([ + (format!("{}-pubkey", META_PREFIX), hex::encode(node_id)), + (format!("{}-hostname", META_PREFIX), hostname.to_string()), + ]); + + if let Some(global_meta) = &self.config.meta { + for (key, value) in global_meta.into_iter() { + meta.insert(key.clone(), value.clone()); + } + } + let advertisement: ConsulPublishService = ConsulPublishService { service_id: node.clone(), service_name: self.config.service_name.clone(), tags, - meta: [ - (format!("{}-pubkey", META_PREFIX), hex::encode(node_id)), - (format!("{}-hostname", META_PREFIX), hostname.to_string()), - ] - .iter() - .cloned() - .collect(), + meta, address: rpc_public_addr.ip(), port: rpc_public_addr.port(), }; -- cgit v1.2.3 From fd7dbea5b86ed8757e76e1114e2154538c5a3c16 Mon Sep 17 00:00:00 2001 From: Roberto Hidalgo Date: Wed, 10 May 2023 13:20:39 -0600 Subject: follow feedback, fold into existing feature --- src/rpc/consul.rs | 193 ++++++++++++++++++++++++++++++++------------- src/rpc/consul_services.rs | 174 ---------------------------------------- src/rpc/lib.rs | 2 - src/rpc/system.rs | 76 +----------------- 4 files changed, 139 insertions(+), 306 deletions(-) delete mode 100644 src/rpc/consul_services.rs (limited to 'src/rpc') diff --git a/src/rpc/consul.rs b/src/rpc/consul.rs index f85f789c..cd29893f 100644 --- a/src/rpc/consul.rs +++ b/src/rpc/consul.rs @@ -9,6 +9,9 @@ use serde::{Deserialize, Serialize}; use netapp::NodeID; use garage_util::config::ConsulDiscoveryConfig; +use garage_util::config::ConsulDiscoveryMode; + +const META_PREFIX: &str = "fr-deuxfleurs-garagehq"; #[derive(Deserialize, Clone, Debug)] struct ConsulQueryEntry { @@ -18,6 +21,8 @@ struct ConsulQueryEntry { service_port: u16, #[serde(rename = "NodeMeta")] node_meta: HashMap, + #[serde(rename = "ServiceMeta")] + service_meta: HashMap, } #[derive(Serialize, Clone, Debug)] @@ -29,25 +34,42 @@ struct ConsulPublishEntry { #[serde(rename = "NodeMeta")] node_meta: HashMap, #[serde(rename = "Service")] - service: ConsulPublishService, + service: ConsulPublishCatalogService, } #[derive(Serialize, Clone, Debug)] -struct ConsulPublishService { +struct ConsulPublishCatalogService { #[serde(rename = "ID")] service_id: String, #[serde(rename = "Service")] service_name: String, #[serde(rename = "Tags")] tags: Vec, + #[serde(rename = "Meta")] + service_meta: HashMap, #[serde(rename = "Address")] address: IpAddr, #[serde(rename = "Port")] port: u16, } -// ---- +#[derive(Serialize, Clone, Debug)] +struct ConsulPublishService { + #[serde(rename = "ID")] + service_id: String, + #[serde(rename = "Name")] + service_name: String, + #[serde(rename = "Tags")] + tags: Vec, + #[serde(rename = "Address")] + address: IpAddr, + #[serde(rename = "Port")] + port: u16, + #[serde(rename = "Meta")] + meta: HashMap, +} +// ---- pub struct ConsulDiscovery { config: ConsulDiscoveryConfig, client: reqwest::Client, @@ -55,42 +77,60 @@ pub struct ConsulDiscovery { impl ConsulDiscovery { pub fn new(config: ConsulDiscoveryConfig) -> Result { - let client = match (&config.client_cert, &config.client_key) { - (Some(client_cert), Some(client_key)) => { - let mut client_cert_buf = vec![]; - File::open(client_cert)?.read_to_end(&mut client_cert_buf)?; - - let mut client_key_buf = vec![]; - File::open(client_key)?.read_to_end(&mut client_key_buf)?; - - let identity = reqwest::Identity::from_pem( - &[&client_cert_buf[..], &client_key_buf[..]].concat()[..], - )?; - - if config.tls_skip_verify { - reqwest::Client::builder() - .use_rustls_tls() - .danger_accept_invalid_certs(true) - .identity(identity) - .build()? - } else if let Some(ca_cert) = &config.ca_cert { + let mut builder: reqwest::ClientBuilder = reqwest::Client::builder(); + builder = builder.danger_accept_invalid_certs(config.tls_skip_verify); + + let client: reqwest::Client = match &config.mode { + ConsulDiscoveryMode::Node => { + if let Some(ca_cert) = &config.ca_cert { + let mut ca_cert_buf = vec![]; + File::open(ca_cert)?.read_to_end(&mut ca_cert_buf)?; + builder = builder.use_rustls_tls(); + builder = builder + .add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?); + } + + match (&config.client_cert, &config.client_key) { + (Some(client_cert), Some(client_key)) => { + let mut client_cert_buf = vec![]; + File::open(client_cert)?.read_to_end(&mut client_cert_buf)?; + + let mut client_key_buf = vec![]; + File::open(client_key)?.read_to_end(&mut client_key_buf)?; + + let identity = reqwest::Identity::from_pem( + &[&client_cert_buf[..], &client_key_buf[..]].concat()[..], + )?; + + builder = builder.use_rustls_tls(); + builder = builder.identity(identity); + } + (None, None) => {} + _ => return Err(ConsulError::InvalidTLSConfig), + } + + builder.build()? + } + ConsulDiscoveryMode::Service => { + if let Some(ca_cert) = &config.ca_cert { let mut ca_cert_buf = vec![]; File::open(ca_cert)?.read_to_end(&mut ca_cert_buf)?; + builder = builder + .add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?); + builder = builder.use_rustls_tls(); + } - reqwest::Client::builder() - .use_rustls_tls() - .add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?) - .identity(identity) - .build()? - } else { - reqwest::Client::builder() - .use_rustls_tls() - .identity(identity) - .build()? + if let Some(token) = &config.consul_http_token { + let mut headers = reqwest::header::HeaderMap::new(); + headers.insert( + "x-consul-token", + reqwest::header::HeaderValue::from_str(&token)?, + ); + builder = builder.default_headers(headers); } + + builder.build()? } - (None, None) => reqwest::Client::new(), - _ => return Err(ConsulError::InvalidTLSConfig), }; Ok(Self { client, config }) @@ -110,11 +150,14 @@ impl ConsulDiscovery { let mut ret = vec![]; for ent in entries { let ip = ent.address.parse::().ok(); - let pubkey = ent - .node_meta - .get("pubkey") - .and_then(|k| hex::decode(k).ok()) - .and_then(|k| NodeID::from_slice(&k[..])); + let pubkey = match &self.config.mode { + ConsulDiscoveryMode::Node => ent.node_meta.get("pubkey"), + ConsulDiscoveryMode::Service => { + ent.service_meta.get(&format!("{}-pubkey", META_PREFIX)) + } + } + .and_then(|k| hex::decode(k).ok()) + .and_then(|k| NodeID::from_slice(&k[..])); if let (Some(ip), Some(pubkey)) = (ip, pubkey) { ret.push((pubkey, SocketAddr::new(ip, ent.service_port))); } else { @@ -138,29 +181,63 @@ impl ConsulDiscovery { rpc_public_addr: SocketAddr, ) -> Result<(), ConsulError> { let node = format!("garage:{}", hex::encode(&node_id[..8])); + let tags = [ + vec!["advertised-by-garage".into(), hostname.into()], + self.config.tags.clone(), + ] + .concat(); + + let meta_prefix: String = match &self.config.mode { + ConsulDiscoveryMode::Node => "".to_string(), + ConsulDiscoveryMode::Service => format!("{}-", META_PREFIX), + }; - let advertisement = ConsulPublishEntry { - node: node.clone(), - address: rpc_public_addr.ip(), - node_meta: [ - ("pubkey".to_string(), hex::encode(node_id)), - ("hostname".to_string(), hostname.to_string()), - ] - .iter() - .cloned() - .collect(), - service: ConsulPublishService { + let mut meta = HashMap::from([ + (format!("{}pubkey", meta_prefix), hex::encode(node_id)), + (format!("{}hostname", meta_prefix), hostname.to_string()), + ]); + + if let Some(global_meta) = &self.config.meta { + for (key, value) in global_meta.into_iter() { + meta.insert(key.clone(), value.clone()); + } + } + + let url = format!( + "{}/v1/{}", + self.config.consul_http_addr, + (match &self.config.mode { + ConsulDiscoveryMode::Node => "catalog/register", + ConsulDiscoveryMode::Service => "agent/service/register?replace-existing-checks", + }) + ); + + let req = self.client.put(&url); + let http = (match &self.config.mode { + ConsulDiscoveryMode::Node => req.json(&ConsulPublishEntry { + node: node.clone(), + address: rpc_public_addr.ip(), + node_meta: meta.clone(), + service: ConsulPublishCatalogService { + service_id: node.clone(), + service_name: self.config.service_name.clone(), + tags, + service_meta: meta.clone(), + address: rpc_public_addr.ip(), + port: rpc_public_addr.port(), + }, + }), + ConsulDiscoveryMode::Service => req.json(&ConsulPublishService { service_id: node.clone(), service_name: self.config.service_name.clone(), - tags: vec!["advertised-by-garage".into(), hostname.into()], + tags, + meta, address: rpc_public_addr.ip(), port: rpc_public_addr.port(), - }, - }; - - let url = format!("{}/v1/catalog/register", self.config.consul_http_addr); - - let http = self.client.put(&url).json(&advertisement).send().await?; + }), + }) + .send() + .await?; http.error_for_status()?; Ok(()) @@ -176,4 +253,6 @@ pub enum ConsulError { Reqwest(#[error(source)] reqwest::Error), #[error(display = "Invalid Consul TLS configuration")] InvalidTLSConfig, + #[error(display = "Token error: {}", _0)] + Token(#[error(source)] reqwest::header::InvalidHeaderValue), } diff --git a/src/rpc/consul_services.rs b/src/rpc/consul_services.rs deleted file mode 100644 index aaf3c4a1..00000000 --- a/src/rpc/consul_services.rs +++ /dev/null @@ -1,174 +0,0 @@ -use std::collections::HashMap; -use std::fs::File; -use std::io::Read; -use std::net::{IpAddr, SocketAddr}; - -use err_derive::Error; -use serde::{Deserialize, Serialize}; - -use netapp::NodeID; - -use garage_util::config::ConsulServiceConfig; - -const META_PREFIX: &str = "fr-deuxfleurs-garagehq"; - -#[derive(Deserialize, Clone, Debug)] -struct ConsulQueryEntry { - #[serde(rename = "Address")] - address: String, - #[serde(rename = "ServicePort")] - service_port: u16, - #[serde(rename = "ServiceMeta")] - service_meta: HashMap, -} - -#[derive(Serialize, Clone, Debug)] -struct ConsulPublishService { - #[serde(rename = "ID")] - service_id: String, - #[serde(rename = "Name")] - service_name: String, - #[serde(rename = "Tags")] - tags: Vec, - #[serde(rename = "Address")] - address: IpAddr, - #[serde(rename = "Port")] - port: u16, - #[serde(rename = "Meta")] - meta: HashMap, -} - -// ---- - -pub struct ConsulServiceDiscovery { - config: ConsulServiceConfig, - client: reqwest::Client, -} - -impl ConsulServiceDiscovery { - pub fn new(config: ConsulServiceConfig) -> Result { - let mut builder: reqwest::ClientBuilder = match &config.ca_cert { - Some(client_ca) => { - let mut ca_cert_buf = vec![]; - File::open(client_ca)?.read_to_end(&mut ca_cert_buf)?; - - let req: reqwest::ClientBuilder = reqwest::Client::builder() - .add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?) - .use_rustls_tls(); - - if config.tls_skip_verify { - req.danger_accept_invalid_certs(true) - } else { - req - } - } - None => reqwest::Client::builder(), - }; - - if let Some(token) = &config.consul_http_token { - let mut headers = reqwest::header::HeaderMap::new(); - headers.insert( - "x-consul-token", - reqwest::header::HeaderValue::from_str(&token)?, - ); - builder = builder.default_headers(headers); - } - - let client = builder.build()?; - - Ok(Self { client, config }) - } - - // ---- READING FROM CONSUL CATALOG ---- - - pub async fn get_consul_services(&self) -> Result, ConsulError> { - let url = format!( - "{}/v1/catalog/service/{}", - self.config.consul_http_addr, self.config.service_name - ); - - let req = self.client.get(&url); - let http = req.send().await?; - let entries: Vec = http.json().await?; - - let mut ret = vec![]; - for ent in entries { - let ip = ent.address.parse::().ok(); - let pubkey = ent - .service_meta - .get(&format!("{}-pubkey", META_PREFIX)) - .and_then(|k| hex::decode(k).ok()) - .and_then(|k| NodeID::from_slice(&k[..])); - if let (Some(ip), Some(pubkey)) = (ip, pubkey) { - ret.push((pubkey, SocketAddr::new(ip, ent.service_port))); - } else { - warn!( - "Could not process node spec from Consul: {:?} (invalid IP or public key)", - ent - ); - } - } - debug!("Got nodes from Consul: {:?}", ret); - - Ok(ret) - } - - // ---- PUBLISHING TO CONSUL CATALOG ---- - - pub async fn publish_consul_service( - &self, - node_id: NodeID, - hostname: &str, - rpc_public_addr: SocketAddr, - ) -> Result<(), ConsulError> { - let node = format!("garage:{}", hex::encode(&node_id[..8])); - - let tags = [ - vec!["advertised-by-garage".into(), hostname.into()], - self.config.tags.clone(), - ] - .concat(); - - let mut meta = HashMap::from([ - (format!("{}-pubkey", META_PREFIX), hex::encode(node_id)), - (format!("{}-hostname", META_PREFIX), hostname.to_string()), - ]); - - if let Some(global_meta) = &self.config.meta { - for (key, value) in global_meta.into_iter() { - meta.insert(key.clone(), value.clone()); - } - } - - let advertisement: ConsulPublishService = ConsulPublishService { - service_id: node.clone(), - service_name: self.config.service_name.clone(), - tags, - meta, - address: rpc_public_addr.ip(), - port: rpc_public_addr.port(), - }; - - let url = format!( - "{}/v1/agent/service/register?replace-existing-checks", - self.config.consul_http_addr - ); - - let req = self.client.put(&url); - let http = req.json(&advertisement).send().await?; - http.error_for_status()?; - - Ok(()) - } -} - -/// Regroup all Consul discovery errors -#[derive(Debug, Error)] -pub enum ConsulError { - #[error(display = "IO error: {}", _0)] - Io(#[error(source)] std::io::Error), - #[error(display = "HTTP error: {}", _0)] - Reqwest(#[error(source)] reqwest::Error), - #[error(display = "Invalid HTTP header error: {}", _0)] - HeaderValue(#[error(source)] reqwest::header::InvalidHeaderValue), -} diff --git a/src/rpc/lib.rs b/src/rpc/lib.rs index 2c5ed107..5aec92c0 100644 --- a/src/rpc/lib.rs +++ b/src/rpc/lib.rs @@ -8,8 +8,6 @@ mod system_metrics; #[cfg(feature = "consul-discovery")] mod consul; -#[cfg(feature = "consul-service-discovery")] -mod consul_services; #[cfg(feature = "kubernetes-discovery")] mod kubernetes; diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 287cd666..b42e49fc 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -32,8 +32,6 @@ use garage_util::time::*; #[cfg(feature = "consul-discovery")] use crate::consul::ConsulDiscovery; -#[cfg(feature = "consul-service-discovery")] -use crate::consul_services::ConsulServiceDiscovery; #[cfg(feature = "kubernetes-discovery")] use crate::kubernetes::*; use crate::layout::*; @@ -100,18 +98,12 @@ pub struct System { system_endpoint: Arc>, rpc_listen_addr: SocketAddr, - #[cfg(any( - feature = "consul-discovery", - feature = "consul-service-discovery", - feature = "kubernetes-discovery" - ))] + #[cfg(any(feature = "consul-discovery", feature = "kubernetes-discovery"))] rpc_public_addr: Option, bootstrap_peers: Vec, #[cfg(feature = "consul-discovery")] consul_discovery: Option, - #[cfg(feature = "consul-service-discovery")] - consul_service_discovery: Option, #[cfg(feature = "kubernetes-discovery")] kubernetes_discovery: Option, @@ -354,19 +346,6 @@ impl System { warn!("Consul discovery is not enabled in this build."); } - #[cfg(feature = "consul-service-discovery")] - let consul_service_discovery = match &config.consul_service_discovery { - Some(cfg) => Some( - ConsulServiceDiscovery::new(cfg.clone()) - .ok_or_message("Invalid Consul service discovery configuration")?, - ), - None => None, - }; - #[cfg(not(feature = "consul-service-discovery"))] - if config.consul_service_discovery.is_some() { - warn!("Consul service discovery is not enabled in this build."); - } - #[cfg(not(feature = "kubernetes-discovery"))] if config.kubernetes_discovery.is_some() { warn!("Kubernetes discovery is not enabled in this build."); @@ -390,17 +369,11 @@ impl System { replication_mode, replication_factor, rpc_listen_addr: config.rpc_bind_addr, - #[cfg(any( - feature = "consul-discovery", - feature = "consul-service-discovery", - feature = "kubernetes-discovery" - ))] + #[cfg(any(feature = "consul-discovery", feature = "kubernetes-discovery"))] rpc_public_addr, bootstrap_peers: config.bootstrap_peers.clone(), #[cfg(feature = "consul-discovery")] consul_discovery, - #[cfg(feature = "consul-service-discovery")] - consul_service_discovery, #[cfg(feature = "kubernetes-discovery")] kubernetes_discovery: config.kubernetes_discovery.clone(), metrics, @@ -582,33 +555,6 @@ impl System { } } - #[cfg(feature = "consul-service-discovery")] - async fn advertise_to_consul(self: Arc) { - let c = match &self.consul_service_discovery { - Some(c) => c, - _ => return, - }; - - let rpc_public_addr = match self.rpc_public_addr { - Some(addr) => addr, - None => { - warn!("Not advertising to Consul because rpc_public_addr is not defined in config file and could not be autodetected."); - return; - } - }; - - if let Err(e) = c - .publish_consul_service( - self.netapp.id, - &self.local_status.load_full().hostname, - rpc_public_addr, - ) - .await - { - error!("Error while publishing Consul service: {}", e); - } - } - #[cfg(feature = "kubernetes-discovery")] async fn advertise_to_kubernetes(self: Arc) { let k = match &self.kubernetes_discovery { @@ -798,7 +744,7 @@ impl System { ping_list.extend(peers.0.iter().map(|(id, addr)| ((*id).into(), *addr))) } - // Fetch peer list from Consul Nodes + // Fetch peer list from Consul #[cfg(feature = "consul-discovery")] if let Some(c) = &self.consul_discovery { match c.get_consul_nodes().await { @@ -811,19 +757,6 @@ impl System { } } - // Fetch peer list from Consul Services - #[cfg(feature = "consul-service-discovery")] - if let Some(c) = &self.consul_service_discovery { - match c.get_consul_services().await { - Ok(node_list) => { - ping_list.extend(node_list); - } - Err(e) => { - warn!("Could not retrieve service list from Consul: {}", e); - } - } - } - // Fetch peer list from Kubernetes #[cfg(feature = "kubernetes-discovery")] if let Some(k) = &self.kubernetes_discovery { @@ -863,9 +796,6 @@ impl System { #[cfg(feature = "consul-discovery")] tokio::spawn(self.clone().advertise_to_consul()); - #[cfg(feature = "consul-service-discovery")] - tokio::spawn(self.clone().advertise_to_consul()); - #[cfg(feature = "kubernetes-discovery")] tokio::spawn(self.clone().advertise_to_kubernetes()); -- cgit v1.2.3 From 011f4730483dc7b75720c7b800702ea7c1a925c1 Mon Sep 17 00:00:00 2001 From: Roberto Hidalgo Date: Wed, 10 May 2023 13:22:14 -0600 Subject: revert rpc/Cargo.toml --- src/rpc/Cargo.toml | 1 - 1 file changed, 1 deletion(-) (limited to 'src/rpc') diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index d168e246..f0fde7a7 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -50,5 +50,4 @@ netapp = { version = "0.5.2", features = ["telemetry"] } [features] kubernetes-discovery = [ "kube", "k8s-openapi", "schemars" ] consul-discovery = [ "reqwest", "err-derive" ] -consul-service-discovery = [ "reqwest", "err-derive" ] system-libs = [ "sodiumoxide/use-pkg-config" ] -- cgit v1.2.3 From 6b69404f1a53b927b4ce3cabdbb41f58e832a963 Mon Sep 17 00:00:00 2001 From: Roberto Hidalgo Date: Wed, 10 May 2023 20:11:14 -0600 Subject: rename mode to consul_http_api --- src/rpc/consul.rs | 58 ++++++++++++++++++++++++------------------------------- 1 file changed, 25 insertions(+), 33 deletions(-) (limited to 'src/rpc') diff --git a/src/rpc/consul.rs b/src/rpc/consul.rs index cd29893f..08fb0418 100644 --- a/src/rpc/consul.rs +++ b/src/rpc/consul.rs @@ -8,8 +8,8 @@ use serde::{Deserialize, Serialize}; use netapp::NodeID; +use garage_util::config::ConsulDiscoveryAPI; use garage_util::config::ConsulDiscoveryConfig; -use garage_util::config::ConsulDiscoveryMode; const META_PREFIX: &str = "fr-deuxfleurs-garagehq"; @@ -78,18 +78,18 @@ pub struct ConsulDiscovery { impl ConsulDiscovery { pub fn new(config: ConsulDiscoveryConfig) -> Result { let mut builder: reqwest::ClientBuilder = reqwest::Client::builder(); - builder = builder.danger_accept_invalid_certs(config.tls_skip_verify); - - let client: reqwest::Client = match &config.mode { - ConsulDiscoveryMode::Node => { - if let Some(ca_cert) = &config.ca_cert { - let mut ca_cert_buf = vec![]; - File::open(ca_cert)?.read_to_end(&mut ca_cert_buf)?; - builder = builder.use_rustls_tls(); - builder = builder - .add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?); - } + if config.tls_skip_verify { + builder = builder.danger_accept_invalid_certs(true); + } else if let Some(ca_cert) = &config.ca_cert { + let mut ca_cert_buf = vec![]; + File::open(ca_cert)?.read_to_end(&mut ca_cert_buf)?; + builder = builder.use_rustls_tls(); + builder = + builder.add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?); + } + let client: reqwest::Client = match &config.consul_http_api { + ConsulDiscoveryAPI::Catalog => { match (&config.client_cert, &config.client_key) { (Some(client_cert), Some(client_key)) => { let mut client_cert_buf = vec![]; @@ -111,15 +111,7 @@ impl ConsulDiscovery { builder.build()? } - ConsulDiscoveryMode::Service => { - if let Some(ca_cert) = &config.ca_cert { - let mut ca_cert_buf = vec![]; - File::open(ca_cert)?.read_to_end(&mut ca_cert_buf)?; - builder = builder - .add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?); - builder = builder.use_rustls_tls(); - } - + ConsulDiscoveryAPI::Agent => { if let Some(token) = &config.consul_http_token { let mut headers = reqwest::header::HeaderMap::new(); headers.insert( @@ -150,9 +142,9 @@ impl ConsulDiscovery { let mut ret = vec![]; for ent in entries { let ip = ent.address.parse::().ok(); - let pubkey = match &self.config.mode { - ConsulDiscoveryMode::Node => ent.node_meta.get("pubkey"), - ConsulDiscoveryMode::Service => { + let pubkey = match &self.config.consul_http_api { + ConsulDiscoveryAPI::Catalog => ent.node_meta.get("pubkey"), + ConsulDiscoveryAPI::Agent => { ent.service_meta.get(&format!("{}-pubkey", META_PREFIX)) } } @@ -187,9 +179,9 @@ impl ConsulDiscovery { ] .concat(); - let meta_prefix: String = match &self.config.mode { - ConsulDiscoveryMode::Node => "".to_string(), - ConsulDiscoveryMode::Service => format!("{}-", META_PREFIX), + let meta_prefix: String = match &self.config.consul_http_api { + ConsulDiscoveryAPI::Catalog => "".to_string(), + ConsulDiscoveryAPI::Agent => format!("{}-", META_PREFIX), }; let mut meta = HashMap::from([ @@ -206,15 +198,15 @@ impl ConsulDiscovery { let url = format!( "{}/v1/{}", self.config.consul_http_addr, - (match &self.config.mode { - ConsulDiscoveryMode::Node => "catalog/register", - ConsulDiscoveryMode::Service => "agent/service/register?replace-existing-checks", + (match &self.config.consul_http_api { + ConsulDiscoveryAPI::Catalog => "catalog/register", + ConsulDiscoveryAPI::Agent => "agent/service/register?replace-existing-checks", }) ); let req = self.client.put(&url); - let http = (match &self.config.mode { - ConsulDiscoveryMode::Node => req.json(&ConsulPublishEntry { + let http = (match &self.config.consul_http_api { + ConsulDiscoveryAPI::Catalog => req.json(&ConsulPublishEntry { node: node.clone(), address: rpc_public_addr.ip(), node_meta: meta.clone(), @@ -227,7 +219,7 @@ impl ConsulDiscovery { port: rpc_public_addr.port(), }, }), - ConsulDiscoveryMode::Service => req.json(&ConsulPublishService { + ConsulDiscoveryAPI::Agent => req.json(&ConsulPublishService { service_id: node.clone(), service_name: self.config.service_name.clone(), tags, -- cgit v1.2.3 From b7705041268e49f2a5ba9a719372048f85c3de83 Mon Sep 17 00:00:00 2001 From: Roberto Hidalgo Date: Mon, 15 May 2023 16:15:56 -0600 Subject: simplify code according to feedback --- src/rpc/consul.rs | 110 ++++++++++++++++++++++-------------------------------- 1 file changed, 45 insertions(+), 65 deletions(-) (limited to 'src/rpc') diff --git a/src/rpc/consul.rs b/src/rpc/consul.rs index 08fb0418..ab8d1112 100644 --- a/src/rpc/consul.rs +++ b/src/rpc/consul.rs @@ -19,10 +19,15 @@ struct ConsulQueryEntry { address: String, #[serde(rename = "ServicePort")] service_port: u16, - #[serde(rename = "NodeMeta")] - node_meta: HashMap, #[serde(rename = "ServiceMeta")] - service_meta: HashMap, + meta: HashMap, +} + +#[derive(Serialize, Clone, Debug)] +#[serde(untagged)] +enum PublishRequest { + Catalog(ConsulPublishEntry), + Service(ConsulPublishService), } #[derive(Serialize, Clone, Debug)] @@ -31,8 +36,6 @@ struct ConsulPublishEntry { node: String, #[serde(rename = "Address")] address: IpAddr, - #[serde(rename = "NodeMeta")] - node_meta: HashMap, #[serde(rename = "Service")] service: ConsulPublishCatalogService, } @@ -46,7 +49,7 @@ struct ConsulPublishCatalogService { #[serde(rename = "Tags")] tags: Vec, #[serde(rename = "Meta")] - service_meta: HashMap, + meta: HashMap, #[serde(rename = "Address")] address: IpAddr, #[serde(rename = "Port")] @@ -77,42 +80,36 @@ pub struct ConsulDiscovery { impl ConsulDiscovery { pub fn new(config: ConsulDiscoveryConfig) -> Result { - let mut builder: reqwest::ClientBuilder = reqwest::Client::builder(); + let mut builder: reqwest::ClientBuilder = reqwest::Client::builder().use_rustls_tls(); if config.tls_skip_verify { builder = builder.danger_accept_invalid_certs(true); } else if let Some(ca_cert) = &config.ca_cert { let mut ca_cert_buf = vec![]; File::open(ca_cert)?.read_to_end(&mut ca_cert_buf)?; - builder = builder.use_rustls_tls(); builder = builder.add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?); } - let client: reqwest::Client = match &config.consul_http_api { - ConsulDiscoveryAPI::Catalog => { - match (&config.client_cert, &config.client_key) { - (Some(client_cert), Some(client_key)) => { - let mut client_cert_buf = vec![]; - File::open(client_cert)?.read_to_end(&mut client_cert_buf)?; - - let mut client_key_buf = vec![]; - File::open(client_key)?.read_to_end(&mut client_key_buf)?; - - let identity = reqwest::Identity::from_pem( - &[&client_cert_buf[..], &client_key_buf[..]].concat()[..], - )?; - - builder = builder.use_rustls_tls(); - builder = builder.identity(identity); - } - (None, None) => {} - _ => return Err(ConsulError::InvalidTLSConfig), - } + match &config.api { + ConsulDiscoveryAPI::Catalog => match (&config.client_cert, &config.client_key) { + (Some(client_cert), Some(client_key)) => { + let mut client_cert_buf = vec![]; + File::open(client_cert)?.read_to_end(&mut client_cert_buf)?; - builder.build()? - } + let mut client_key_buf = vec![]; + File::open(client_key)?.read_to_end(&mut client_key_buf)?; + + let identity = reqwest::Identity::from_pem( + &[&client_cert_buf[..], &client_key_buf[..]].concat()[..], + )?; + + builder = builder.identity(identity); + } + (None, None) => {} + _ => return Err(ConsulError::InvalidTLSConfig), + }, ConsulDiscoveryAPI::Agent => { - if let Some(token) = &config.consul_http_token { + if let Some(token) = &config.token { let mut headers = reqwest::header::HeaderMap::new(); headers.insert( "x-consul-token", @@ -120,11 +117,11 @@ impl ConsulDiscovery { ); builder = builder.default_headers(headers); } - - builder.build()? } }; + let client: reqwest::Client = builder.build()?; + Ok(Self { client, config }) } @@ -142,14 +139,11 @@ impl ConsulDiscovery { let mut ret = vec![]; for ent in entries { let ip = ent.address.parse::().ok(); - let pubkey = match &self.config.consul_http_api { - ConsulDiscoveryAPI::Catalog => ent.node_meta.get("pubkey"), - ConsulDiscoveryAPI::Agent => { - ent.service_meta.get(&format!("{}-pubkey", META_PREFIX)) - } - } - .and_then(|k| hex::decode(k).ok()) - .and_then(|k| NodeID::from_slice(&k[..])); + let pubkey = ent + .meta + .get(&format!("{}-pubkey", META_PREFIX)) + .and_then(|k| hex::decode(k).ok()) + .and_then(|k| NodeID::from_slice(&k[..])); if let (Some(ip), Some(pubkey)) = (ip, pubkey) { ret.push((pubkey, SocketAddr::new(ip, ent.service_port))); } else { @@ -179,47 +173,34 @@ impl ConsulDiscovery { ] .concat(); - let meta_prefix: String = match &self.config.consul_http_api { - ConsulDiscoveryAPI::Catalog => "".to_string(), - ConsulDiscoveryAPI::Agent => format!("{}-", META_PREFIX), - }; - - let mut meta = HashMap::from([ - (format!("{}pubkey", meta_prefix), hex::encode(node_id)), - (format!("{}hostname", meta_prefix), hostname.to_string()), - ]); - - if let Some(global_meta) = &self.config.meta { - for (key, value) in global_meta.into_iter() { - meta.insert(key.clone(), value.clone()); - } - } + let mut meta = self.config.meta.clone().unwrap_or_default(); + meta.insert(format!("{}-pubkey", META_PREFIX), hex::encode(node_id)); + meta.insert(format!("{}-hostname", META_PREFIX), hostname.to_string()); let url = format!( "{}/v1/{}", self.config.consul_http_addr, - (match &self.config.consul_http_api { + (match &self.config.api { ConsulDiscoveryAPI::Catalog => "catalog/register", ConsulDiscoveryAPI::Agent => "agent/service/register?replace-existing-checks", }) ); let req = self.client.put(&url); - let http = (match &self.config.consul_http_api { - ConsulDiscoveryAPI::Catalog => req.json(&ConsulPublishEntry { + let advertisement: PublishRequest = match &self.config.api { + ConsulDiscoveryAPI::Catalog => PublishRequest::Catalog(ConsulPublishEntry { node: node.clone(), address: rpc_public_addr.ip(), - node_meta: meta.clone(), service: ConsulPublishCatalogService { service_id: node.clone(), service_name: self.config.service_name.clone(), tags, - service_meta: meta.clone(), + meta: meta.clone(), address: rpc_public_addr.ip(), port: rpc_public_addr.port(), }, }), - ConsulDiscoveryAPI::Agent => req.json(&ConsulPublishService { + ConsulDiscoveryAPI::Agent => PublishRequest::Service(ConsulPublishService { service_id: node.clone(), service_name: self.config.service_name.clone(), tags, @@ -227,9 +208,8 @@ impl ConsulDiscovery { address: rpc_public_addr.ip(), port: rpc_public_addr.port(), }), - }) - .send() - .await?; + }; + let http = req.json(&advertisement).send().await?; http.error_for_status()?; Ok(()) -- cgit v1.2.3