diff options
author | Alex <alex@adnab.me> | 2022-10-18 20:20:55 +0000 |
---|---|---|
committer | Alex <alex@adnab.me> | 2022-10-18 20:20:55 +0000 |
commit | 52547506586c10cd8200fbdf6112dd53663e5200 (patch) | |
tree | 3ea0cd34ae65c037ebb51a35fd83c71912f2c9e2 /src | |
parent | 5670599372f6c3c60dcd74279a0741248fc510c3 (diff) | |
parent | 57b5c2c754a8d6d55882eae3dad44e328d961084 (diff) | |
download | garage-0.8.0-rc2.tar.gz garage-0.8.0-rc2.zip |
Merge pull request 'Add TLS support for Consul discovery + refactoring' (#405) from consul-tls into mainv0.8.0-rc2v0.8-rc2
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/405
Diffstat (limited to 'src')
-rw-r--r-- | src/garage/Cargo.toml | 2 | ||||
-rw-r--r-- | src/garage/main.rs | 2 | ||||
-rw-r--r-- | src/rpc/Cargo.toml | 6 | ||||
-rw-r--r-- | src/rpc/consul.rs | 234 | ||||
-rw-r--r-- | src/rpc/kubernetes.rs | 16 | ||||
-rw-r--r-- | src/rpc/lib.rs | 1 | ||||
-rw-r--r-- | src/rpc/system.rs | 73 | ||||
-rw-r--r-- | src/util/config.rs | 45 |
8 files changed, 212 insertions, 167 deletions
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml index ddc23170..cbc0dc61 100644 --- a/src/garage/Cargo.toml +++ b/src/garage/Cargo.toml @@ -81,6 +81,8 @@ sled = [ "garage_model/sled" ] lmdb = [ "garage_model/lmdb" ] sqlite = [ "garage_model/sqlite" ] +# Automatic registration and discovery via Consul API +consul-discovery = [ "garage_rpc/consul-discovery" ] # Automatic registration and discovery via Kubernetes API kubernetes-discovery = [ "garage_rpc/kubernetes-discovery" ] # Prometheus exporter (/metrics endpoint). diff --git a/src/garage/main.rs b/src/garage/main.rs index e5cba553..5b2a85c0 100644 --- a/src/garage/main.rs +++ b/src/garage/main.rs @@ -90,6 +90,8 @@ async fn main() { "lmdb", #[cfg(feature = "sqlite")] "sqlite", + #[cfg(feature = "consul-discovery")] + "consul-discovery", #[cfg(feature = "kubernetes-discovery")] "kubernetes-discovery", #[cfg(feature = "metrics")] diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index 883929e8..2c2ddc0b 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -29,11 +29,13 @@ rmp-serde = "0.15" serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } serde_bytes = "0.11" serde_json = "1.0" +err-derive = { version = "0.3", optional = true } # newer version requires rust edition 2021 kube = { version = "0.75", default-features = false, features = ["runtime", "derive", "client", "rustls-tls"], optional = true } k8s-openapi = { version = "0.16", features = ["v1_22"], optional = true } schemars = { version = "0.8", optional = true } +reqwest = { version = "0.11", optional = true, default-features = false, features = ["rustls-tls-manual-roots", "json"] } # newer version requires rust edition 2021 pnet_datalink = "0.28" @@ -46,9 +48,7 @@ opentelemetry = "0.17" netapp = { version = "0.5.2", features = ["telemetry"] } -hyper = { version = "0.14", features = ["client", "http1", "runtime", "tcp"] } - - [features] kubernetes-discovery = [ "kube", "k8s-openapi", "schemars" ] +consul-discovery = [ "reqwest", "err-derive" ] system-libs = [ "sodiumoxide/use-pkg-config" ] diff --git a/src/rpc/consul.rs b/src/rpc/consul.rs index 15acbcef..b1772a1a 100644 --- a/src/rpc/consul.rs +++ b/src/rpc/consul.rs @@ -1,16 +1,14 @@ use std::collections::HashMap; +use std::fs::File; +use std::io::Read; use std::net::{IpAddr, SocketAddr}; -use hyper::client::Client; -use hyper::StatusCode; -use hyper::{Body, Method, Request}; +use err_derive::Error; use serde::{Deserialize, Serialize}; use netapp::NodeID; -use garage_util::error::Error; - -// ---- READING FROM CONSUL CATALOG ---- +use garage_util::config::ConsulDiscoveryConfig; #[derive(Deserialize, Clone, Debug)] struct ConsulQueryEntry { @@ -22,53 +20,6 @@ struct ConsulQueryEntry { node_meta: HashMap<String, String>, } -pub async fn get_consul_nodes( - consul_host: &str, - consul_service_name: &str, -) -> Result<Vec<(NodeID, SocketAddr)>, Error> { - let url = format!( - "http://{}/v1/catalog/service/{}", - consul_host, consul_service_name - ); - let req = Request::builder() - .uri(url) - .method(Method::GET) - .body(Body::default())?; - - let client = Client::new(); - - let resp = client.request(req).await?; - if resp.status() != StatusCode::OK { - return Err(Error::Message(format!("HTTP error {}", resp.status()))); - } - - let body = hyper::body::to_bytes(resp.into_body()).await?; - let entries = serde_json::from_slice::<Vec<ConsulQueryEntry>>(body.as_ref())?; - - let mut ret = vec![]; - for ent in entries { - let ip = ent.address.parse::<IpAddr>().ok(); - let pubkey = ent - .node_meta - .get("pubkey") - .and_then(|k| hex::decode(&k).ok()) - .and_then(|k| NodeID::from_slice(&k[..])); - if let (Some(ip), Some(pubkey)) = (ip, pubkey) { - ret.push((pubkey, SocketAddr::new(ip, ent.service_port))); - } else { - warn!( - "Could not process node spec from Consul: {:?} (invalid IP or public key)", - ent - ); - } - } - debug!("Got nodes from Consul: {:?}", ret); - - Ok(ret) -} - -// ---- PUBLISHING TO CONSUL CATALOG ---- - #[derive(Serialize, Clone, Debug)] struct ConsulPublishEntry { #[serde(rename = "Node")] @@ -95,57 +46,134 @@ struct ConsulPublishService { port: u16, } -pub async fn publish_consul_service( - consul_host: &str, - consul_service_name: &str, - node_id: NodeID, - hostname: &str, - rpc_public_addr: SocketAddr, -) -> Result<(), Error> { - let node = format!("garage:{}", hex::encode(&node_id[..8])); - - let advertisment = ConsulPublishEntry { - node: node.clone(), - address: rpc_public_addr.ip(), - node_meta: [ - ("pubkey".to_string(), hex::encode(node_id)), - ("hostname".to_string(), hostname.to_string()), - ] - .iter() - .cloned() - .collect(), - service: ConsulPublishService { - service_id: node.clone(), - service_name: consul_service_name.to_string(), - tags: vec!["advertised-by-garage".into(), hostname.into()], +// ---- + +pub struct ConsulDiscovery { + config: ConsulDiscoveryConfig, + client: reqwest::Client, +} + +impl ConsulDiscovery { + pub fn new(config: ConsulDiscoveryConfig) -> Result<Self, ConsulError> { + let client = match (&config.client_cert, &config.client_key) { + (Some(client_cert), Some(client_key)) => { + let mut client_cert_buf = vec![]; + File::open(client_cert)?.read_to_end(&mut client_cert_buf)?; + + let mut client_key_buf = vec![]; + File::open(client_key)?.read_to_end(&mut client_key_buf)?; + + let identity = reqwest::Identity::from_pem( + &[&client_cert_buf[..], &client_key_buf[..]].concat()[..], + )?; + + if config.tls_skip_verify { + reqwest::Client::builder() + .use_rustls_tls() + .danger_accept_invalid_certs(true) + .identity(identity) + .build()? + } else if let Some(ca_cert) = &config.ca_cert { + let mut ca_cert_buf = vec![]; + File::open(ca_cert)?.read_to_end(&mut ca_cert_buf)?; + + reqwest::Client::builder() + .use_rustls_tls() + .add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?) + .identity(identity) + .build()? + } else { + reqwest::Client::builder() + .use_rustls_tls() + .identity(identity) + .build()? + } + } + (None, None) => reqwest::Client::new(), + _ => return Err(ConsulError::InvalidTLSConfig), + }; + + Ok(Self { client, config }) + } + + // ---- READING FROM CONSUL CATALOG ---- + + pub async fn get_consul_nodes(&self) -> Result<Vec<(NodeID, SocketAddr)>, ConsulError> { + let url = format!( + "{}/v1/catalog/service/{}", + self.config.consul_http_addr, self.config.service_name + ); + + let http = self.client.get(&url).send().await?; + let entries: Vec<ConsulQueryEntry> = http.json().await?; + + let mut ret = vec![]; + for ent in entries { + let ip = ent.address.parse::<IpAddr>().ok(); + let pubkey = ent + .node_meta + .get("pubkey") + .and_then(|k| hex::decode(&k).ok()) + .and_then(|k| NodeID::from_slice(&k[..])); + if let (Some(ip), Some(pubkey)) = (ip, pubkey) { + ret.push((pubkey, SocketAddr::new(ip, ent.service_port))); + } else { + warn!( + "Could not process node spec from Consul: {:?} (invalid IP or public key)", + ent + ); + } + } + debug!("Got nodes from Consul: {:?}", ret); + + Ok(ret) + } + + // ---- PUBLISHING TO CONSUL CATALOG ---- + + pub async fn publish_consul_service( + &self, + node_id: NodeID, + hostname: &str, + rpc_public_addr: SocketAddr, + ) -> Result<(), ConsulError> { + let node = format!("garage:{}", hex::encode(&node_id[..8])); + + let advertisement = ConsulPublishEntry { + node: node.clone(), address: rpc_public_addr.ip(), - port: rpc_public_addr.port(), - }, - }; - - let url = format!("http://{}/v1/catalog/register", consul_host); - let req_body = serde_json::to_string(&advertisment)?; - debug!("Request body for consul adv: {}", req_body); - - let req = Request::builder() - .uri(url) - .method(Method::PUT) - .body(Body::from(req_body))?; - - let client = Client::new(); - - let resp = client.request(req).await?; - debug!("Response of advertising to Consul: {:?}", resp); - let resp_code = resp.status(); - let resp_bytes = &hyper::body::to_bytes(resp.into_body()).await?; - debug!( - "{}", - std::str::from_utf8(resp_bytes).unwrap_or("<invalid utf8>") - ); - - if resp_code != StatusCode::OK { - return Err(Error::Message(format!("HTTP error {}", resp_code))); + node_meta: [ + ("pubkey".to_string(), hex::encode(node_id)), + ("hostname".to_string(), hostname.to_string()), + ] + .iter() + .cloned() + .collect(), + service: ConsulPublishService { + service_id: node.clone(), + service_name: self.config.service_name.clone(), + tags: vec!["advertised-by-garage".into(), hostname.into()], + address: rpc_public_addr.ip(), + port: rpc_public_addr.port(), + }, + }; + + let url = format!("{}/v1/catalog/register", self.config.consul_http_addr); + + let http = self.client.put(&url).json(&advertisement).send().await?; + http.error_for_status()?; + + Ok(()) } +} - Ok(()) +/// Regroup all Consul discovery errors +#[derive(Debug, Error)] +pub enum ConsulError { + #[error(display = "IO error: {}", _0)] + Io(#[error(source)] std::io::Error), + #[error(display = "HTTP error: {}", _0)] + Reqwest(#[error(source)] reqwest::Error), + #[error(display = "Invalid Consul TLS configuration")] + InvalidTLSConfig, } diff --git a/src/rpc/kubernetes.rs b/src/rpc/kubernetes.rs index 197245aa..63c6567d 100644 --- a/src/rpc/kubernetes.rs +++ b/src/rpc/kubernetes.rs @@ -12,6 +12,8 @@ use serde::{Deserialize, Serialize}; use netapp::NodeID; +use garage_util::config::KubernetesDiscoveryConfig; + static K8S_GROUP: &str = "deuxfleurs.fr"; #[derive(CustomResource, Debug, Serialize, Deserialize, Clone, JsonSchema)] @@ -41,15 +43,14 @@ pub async fn create_kubernetes_crd() -> Result<(), kube::Error> { } pub async fn get_kubernetes_nodes( - kubernetes_service_name: &str, - kubernetes_namespace: &str, + kubernetes_config: &KubernetesDiscoveryConfig, ) -> Result<Vec<(NodeID, SocketAddr)>, kube::Error> { let client = Client::try_default().await?; - let nodes: Api<GarageNode> = Api::namespaced(client.clone(), kubernetes_namespace); + let nodes: Api<GarageNode> = Api::namespaced(client.clone(), &kubernetes_config.namespace); let lp = ListParams::default().labels(&format!( "garage.{}/service={}", - K8S_GROUP, kubernetes_service_name + K8S_GROUP, kubernetes_config.service_name )); let nodes = nodes.list(&lp).await?; @@ -73,8 +74,7 @@ pub async fn get_kubernetes_nodes( } pub async fn publish_kubernetes_node( - kubernetes_service_name: &str, - kubernetes_namespace: &str, + kubernetes_config: &KubernetesDiscoveryConfig, node_id: NodeID, hostname: &str, rpc_public_addr: SocketAddr, @@ -93,13 +93,13 @@ pub async fn publish_kubernetes_node( let labels = node.metadata.labels.insert(BTreeMap::new()); labels.insert( format!("garage.{}/service", K8S_GROUP), - kubernetes_service_name.to_string(), + kubernetes_config.service_name.to_string(), ); debug!("Node object to be applied: {:#?}", node); let client = Client::try_default().await?; - let nodes: Api<GarageNode> = Api::namespaced(client.clone(), kubernetes_namespace); + let nodes: Api<GarageNode> = Api::namespaced(client.clone(), &kubernetes_config.namespace); if let Ok(old_node) = nodes.get(&node_pubkey).await { node.metadata.resource_version = old_node.metadata.resource_version; diff --git a/src/rpc/lib.rs b/src/rpc/lib.rs index 392ff48f..92caf75d 100644 --- a/src/rpc/lib.rs +++ b/src/rpc/lib.rs @@ -3,6 +3,7 @@ #[macro_use] extern crate tracing; +#[cfg(feature = "consul-discovery")] mod consul; #[cfg(feature = "kubernetes-discovery")] mod kubernetes; diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 9e0bfa11..d6576f20 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -23,12 +23,15 @@ use netapp::{NetApp, NetworkKey, NodeID, NodeKey}; use garage_util::background::BackgroundRunner; use garage_util::config::Config; +#[cfg(feature = "kubernetes-discovery")] +use garage_util::config::KubernetesDiscoveryConfig; use garage_util::data::*; use garage_util::error::*; use garage_util::persister::Persister; use garage_util::time::*; -use crate::consul::*; +#[cfg(feature = "consul-discovery")] +use crate::consul::ConsulDiscovery; #[cfg(feature = "kubernetes-discovery")] use crate::kubernetes::*; use crate::layout::*; @@ -90,12 +93,14 @@ pub struct System { system_endpoint: Arc<Endpoint<SystemRpc, System>>, rpc_listen_addr: SocketAddr, + #[cfg(any(feature = "consul-discovery", feature = "kubernetes-discovery"))] rpc_public_addr: Option<SocketAddr>, bootstrap_peers: Vec<String>, - consul_discovery: Option<ConsulDiscoveryParam>, + #[cfg(feature = "consul-discovery")] + consul_discovery: Option<ConsulDiscovery>, #[cfg(feature = "kubernetes-discovery")] - kubernetes_discovery: Option<KubernetesDiscoveryParam>, + kubernetes_discovery: Option<KubernetesDiscoveryConfig>, replication_factor: usize, @@ -285,29 +290,21 @@ impl System { let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into()); - let consul_discovery = match (&config.consul_host, &config.consul_service_name) { - (Some(ch), Some(csn)) => Some(ConsulDiscoveryParam { - consul_host: ch.to_string(), - service_name: csn.to_string(), - }), - _ => None, - }; - - #[cfg(feature = "kubernetes-discovery")] - let kubernetes_discovery = match ( - &config.kubernetes_service_name, - &config.kubernetes_namespace, - ) { - (Some(ksn), Some(kn)) => Some(KubernetesDiscoveryParam { - service_name: ksn.to_string(), - namespace: kn.to_string(), - skip_crd: config.kubernetes_skip_crd, - }), - _ => None, + #[cfg(feature = "consul-discovery")] + let consul_discovery = match &config.consul_discovery { + Some(cfg) => Some( + ConsulDiscovery::new(cfg.clone()) + .ok_or_message("Invalid Consul discovery configuration")?, + ), + None => None, }; + #[cfg(not(feature = "consul-discovery"))] + if config.consul_discovery.is_some() { + warn!("Consul discovery is not enabled in this build."); + } #[cfg(not(feature = "kubernetes-discovery"))] - if config.kubernetes_service_name.is_some() || config.kubernetes_namespace.is_some() { + if config.kubernetes_discovery.is_some() { warn!("Kubernetes discovery is not enabled in this build."); } @@ -329,11 +326,13 @@ impl System { system_endpoint, replication_factor, rpc_listen_addr: config.rpc_bind_addr, + #[cfg(any(feature = "consul-discovery", feature = "kubernetes-discovery"))] rpc_public_addr, bootstrap_peers: config.bootstrap_peers.clone(), + #[cfg(feature = "consul-discovery")] consul_discovery, #[cfg(feature = "kubernetes-discovery")] - kubernetes_discovery, + kubernetes_discovery: config.kubernetes_discovery.clone(), ring, update_ring: Mutex::new(update_ring), @@ -432,6 +431,7 @@ impl System { // ---- INTERNALS ---- + #[cfg(feature = "consul-discovery")] async fn advertise_to_consul(self: Arc<Self>) -> Result<(), Error> { let c = match &self.consul_discovery { Some(c) => c, @@ -446,9 +446,7 @@ impl System { } }; - publish_consul_service( - &c.consul_host, - &c.service_name, + c.publish_consul_service( self.netapp.id, &self.local_status.load_full().hostname, rpc_public_addr, @@ -473,8 +471,7 @@ impl System { }; publish_kubernetes_node( - &k.service_name, - &k.namespace, + k, self.netapp.id, &self.local_status.load_full().hostname, rpc_public_addr, @@ -644,8 +641,9 @@ impl System { } // Fetch peer list from Consul + #[cfg(feature = "consul-discovery")] if let Some(c) = &self.consul_discovery { - match get_consul_nodes(&c.consul_host, &c.service_name).await { + match c.get_consul_nodes().await { Ok(node_list) => { ping_list.extend(node_list); } @@ -667,7 +665,7 @@ impl System { }; } - match get_kubernetes_nodes(&k.service_name, &k.namespace).await { + match get_kubernetes_nodes(k).await { Ok(node_list) => { ping_list.extend(node_list); } @@ -691,6 +689,7 @@ impl System { warn!("Could not save peer list to file: {}", e); } + #[cfg(feature = "consul-discovery")] self.background.spawn(self.clone().advertise_to_consul()); #[cfg(feature = "kubernetes-discovery")] @@ -785,15 +784,3 @@ async fn resolve_peers(peers: &[String]) -> Vec<(NodeID, SocketAddr)> { ret } - -struct ConsulDiscoveryParam { - consul_host: String, - service_name: String, -} - -#[cfg(feature = "kubernetes-discovery")] -struct KubernetesDiscoveryParam { - service_name: String, - namespace: String, - skip_crd: bool, -} diff --git a/src/util/config.rs b/src/util/config.rs index 2d4b4f57..04f8375a 100644 --- a/src/util/config.rs +++ b/src/util/config.rs @@ -46,20 +46,17 @@ pub struct Config { /// Timeout for Netapp RPC calls pub rpc_timeout_msec: Option<u64>, + // -- Bootstraping and discovery /// Bootstrap peers RPC address #[serde(default)] pub bootstrap_peers: Vec<String>, - /// Consul host to connect to to discover more peers - pub consul_host: Option<String>, - /// Consul service name to use - pub consul_service_name: Option<String>, - /// Kubernetes namespace the service discovery resources are be created in - pub kubernetes_namespace: Option<String>, - /// Service name to filter for in k8s custom resources - pub kubernetes_service_name: Option<String>, - /// Skip creation of the garagenodes CRD + + /// Configuration for automatic node discovery through Consul #[serde(default)] - pub kubernetes_skip_crd: bool, + pub consul_discovery: Option<ConsulDiscoveryConfig>, + /// Configuration for automatic node discovery through Kubernetes + #[serde(default)] + pub kubernetes_discovery: Option<KubernetesDiscoveryConfig>, // -- DB /// Database engine to use for metadata (options: sled, sqlite, lmdb) @@ -129,6 +126,34 @@ pub struct AdminConfig { pub trace_sink: Option<String>, } +#[derive(Deserialize, Debug, Clone)] +pub struct ConsulDiscoveryConfig { + /// Consul http or https address to connect to to discover more peers + pub consul_http_addr: String, + /// Consul service name to use + pub service_name: String, + /// CA TLS certificate to use when connecting to Consul + pub ca_cert: Option<String>, + /// Client TLS certificate to use when connecting to Consul + pub client_cert: Option<String>, + /// Client TLS key to use when connecting to Consul + pub client_key: Option<String>, + /// Skip TLS hostname verification + #[serde(default)] + pub tls_skip_verify: bool, +} + +#[derive(Deserialize, Debug, Clone)] +pub struct KubernetesDiscoveryConfig { + /// Kubernetes namespace the service discovery resources are be created in + pub namespace: String, + /// Service name to filter for in k8s custom resources + pub service_name: String, + /// Skip creation of the garagenodes CRD + #[serde(default)] + pub skip_crd: bool, +} + fn default_db_engine() -> String { "sled".into() } |