aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlex <alex@adnab.me>2022-10-18 20:20:55 +0000
committerAlex <alex@adnab.me>2022-10-18 20:20:55 +0000
commit52547506586c10cd8200fbdf6112dd53663e5200 (patch)
tree3ea0cd34ae65c037ebb51a35fd83c71912f2c9e2 /src
parent5670599372f6c3c60dcd74279a0741248fc510c3 (diff)
parent57b5c2c754a8d6d55882eae3dad44e328d961084 (diff)
downloadgarage-0.8.0-rc2.tar.gz
garage-0.8.0-rc2.zip
Merge pull request 'Add TLS support for Consul discovery + refactoring' (#405) from consul-tls into mainv0.8.0-rc2v0.8-rc2
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/405
Diffstat (limited to 'src')
-rw-r--r--src/garage/Cargo.toml2
-rw-r--r--src/garage/main.rs2
-rw-r--r--src/rpc/Cargo.toml6
-rw-r--r--src/rpc/consul.rs234
-rw-r--r--src/rpc/kubernetes.rs16
-rw-r--r--src/rpc/lib.rs1
-rw-r--r--src/rpc/system.rs73
-rw-r--r--src/util/config.rs45
8 files changed, 212 insertions, 167 deletions
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml
index ddc23170..cbc0dc61 100644
--- a/src/garage/Cargo.toml
+++ b/src/garage/Cargo.toml
@@ -81,6 +81,8 @@ sled = [ "garage_model/sled" ]
lmdb = [ "garage_model/lmdb" ]
sqlite = [ "garage_model/sqlite" ]
+# Automatic registration and discovery via Consul API
+consul-discovery = [ "garage_rpc/consul-discovery" ]
# Automatic registration and discovery via Kubernetes API
kubernetes-discovery = [ "garage_rpc/kubernetes-discovery" ]
# Prometheus exporter (/metrics endpoint).
diff --git a/src/garage/main.rs b/src/garage/main.rs
index e5cba553..5b2a85c0 100644
--- a/src/garage/main.rs
+++ b/src/garage/main.rs
@@ -90,6 +90,8 @@ async fn main() {
"lmdb",
#[cfg(feature = "sqlite")]
"sqlite",
+ #[cfg(feature = "consul-discovery")]
+ "consul-discovery",
#[cfg(feature = "kubernetes-discovery")]
"kubernetes-discovery",
#[cfg(feature = "metrics")]
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml
index 883929e8..2c2ddc0b 100644
--- a/src/rpc/Cargo.toml
+++ b/src/rpc/Cargo.toml
@@ -29,11 +29,13 @@ rmp-serde = "0.15"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_bytes = "0.11"
serde_json = "1.0"
+err-derive = { version = "0.3", optional = true }
# newer version requires rust edition 2021
kube = { version = "0.75", default-features = false, features = ["runtime", "derive", "client", "rustls-tls"], optional = true }
k8s-openapi = { version = "0.16", features = ["v1_22"], optional = true }
schemars = { version = "0.8", optional = true }
+reqwest = { version = "0.11", optional = true, default-features = false, features = ["rustls-tls-manual-roots", "json"] }
# newer version requires rust edition 2021
pnet_datalink = "0.28"
@@ -46,9 +48,7 @@ opentelemetry = "0.17"
netapp = { version = "0.5.2", features = ["telemetry"] }
-hyper = { version = "0.14", features = ["client", "http1", "runtime", "tcp"] }
-
-
[features]
kubernetes-discovery = [ "kube", "k8s-openapi", "schemars" ]
+consul-discovery = [ "reqwest", "err-derive" ]
system-libs = [ "sodiumoxide/use-pkg-config" ]
diff --git a/src/rpc/consul.rs b/src/rpc/consul.rs
index 15acbcef..b1772a1a 100644
--- a/src/rpc/consul.rs
+++ b/src/rpc/consul.rs
@@ -1,16 +1,14 @@
use std::collections::HashMap;
+use std::fs::File;
+use std::io::Read;
use std::net::{IpAddr, SocketAddr};
-use hyper::client::Client;
-use hyper::StatusCode;
-use hyper::{Body, Method, Request};
+use err_derive::Error;
use serde::{Deserialize, Serialize};
use netapp::NodeID;
-use garage_util::error::Error;
-
-// ---- READING FROM CONSUL CATALOG ----
+use garage_util::config::ConsulDiscoveryConfig;
#[derive(Deserialize, Clone, Debug)]
struct ConsulQueryEntry {
@@ -22,53 +20,6 @@ struct ConsulQueryEntry {
node_meta: HashMap<String, String>,
}
-pub async fn get_consul_nodes(
- consul_host: &str,
- consul_service_name: &str,
-) -> Result<Vec<(NodeID, SocketAddr)>, Error> {
- let url = format!(
- "http://{}/v1/catalog/service/{}",
- consul_host, consul_service_name
- );
- let req = Request::builder()
- .uri(url)
- .method(Method::GET)
- .body(Body::default())?;
-
- let client = Client::new();
-
- let resp = client.request(req).await?;
- if resp.status() != StatusCode::OK {
- return Err(Error::Message(format!("HTTP error {}", resp.status())));
- }
-
- let body = hyper::body::to_bytes(resp.into_body()).await?;
- let entries = serde_json::from_slice::<Vec<ConsulQueryEntry>>(body.as_ref())?;
-
- let mut ret = vec![];
- for ent in entries {
- let ip = ent.address.parse::<IpAddr>().ok();
- let pubkey = ent
- .node_meta
- .get("pubkey")
- .and_then(|k| hex::decode(&k).ok())
- .and_then(|k| NodeID::from_slice(&k[..]));
- if let (Some(ip), Some(pubkey)) = (ip, pubkey) {
- ret.push((pubkey, SocketAddr::new(ip, ent.service_port)));
- } else {
- warn!(
- "Could not process node spec from Consul: {:?} (invalid IP or public key)",
- ent
- );
- }
- }
- debug!("Got nodes from Consul: {:?}", ret);
-
- Ok(ret)
-}
-
-// ---- PUBLISHING TO CONSUL CATALOG ----
-
#[derive(Serialize, Clone, Debug)]
struct ConsulPublishEntry {
#[serde(rename = "Node")]
@@ -95,57 +46,134 @@ struct ConsulPublishService {
port: u16,
}
-pub async fn publish_consul_service(
- consul_host: &str,
- consul_service_name: &str,
- node_id: NodeID,
- hostname: &str,
- rpc_public_addr: SocketAddr,
-) -> Result<(), Error> {
- let node = format!("garage:{}", hex::encode(&node_id[..8]));
-
- let advertisment = ConsulPublishEntry {
- node: node.clone(),
- address: rpc_public_addr.ip(),
- node_meta: [
- ("pubkey".to_string(), hex::encode(node_id)),
- ("hostname".to_string(), hostname.to_string()),
- ]
- .iter()
- .cloned()
- .collect(),
- service: ConsulPublishService {
- service_id: node.clone(),
- service_name: consul_service_name.to_string(),
- tags: vec!["advertised-by-garage".into(), hostname.into()],
+// ----
+
+pub struct ConsulDiscovery {
+ config: ConsulDiscoveryConfig,
+ client: reqwest::Client,
+}
+
+impl ConsulDiscovery {
+ pub fn new(config: ConsulDiscoveryConfig) -> Result<Self, ConsulError> {
+ let client = match (&config.client_cert, &config.client_key) {
+ (Some(client_cert), Some(client_key)) => {
+ let mut client_cert_buf = vec![];
+ File::open(client_cert)?.read_to_end(&mut client_cert_buf)?;
+
+ let mut client_key_buf = vec![];
+ File::open(client_key)?.read_to_end(&mut client_key_buf)?;
+
+ let identity = reqwest::Identity::from_pem(
+ &[&client_cert_buf[..], &client_key_buf[..]].concat()[..],
+ )?;
+
+ if config.tls_skip_verify {
+ reqwest::Client::builder()
+ .use_rustls_tls()
+ .danger_accept_invalid_certs(true)
+ .identity(identity)
+ .build()?
+ } else if let Some(ca_cert) = &config.ca_cert {
+ let mut ca_cert_buf = vec![];
+ File::open(ca_cert)?.read_to_end(&mut ca_cert_buf)?;
+
+ reqwest::Client::builder()
+ .use_rustls_tls()
+ .add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?)
+ .identity(identity)
+ .build()?
+ } else {
+ reqwest::Client::builder()
+ .use_rustls_tls()
+ .identity(identity)
+ .build()?
+ }
+ }
+ (None, None) => reqwest::Client::new(),
+ _ => return Err(ConsulError::InvalidTLSConfig),
+ };
+
+ Ok(Self { client, config })
+ }
+
+ // ---- READING FROM CONSUL CATALOG ----
+
+ pub async fn get_consul_nodes(&self) -> Result<Vec<(NodeID, SocketAddr)>, ConsulError> {
+ let url = format!(
+ "{}/v1/catalog/service/{}",
+ self.config.consul_http_addr, self.config.service_name
+ );
+
+ let http = self.client.get(&url).send().await?;
+ let entries: Vec<ConsulQueryEntry> = http.json().await?;
+
+ let mut ret = vec![];
+ for ent in entries {
+ let ip = ent.address.parse::<IpAddr>().ok();
+ let pubkey = ent
+ .node_meta
+ .get("pubkey")
+ .and_then(|k| hex::decode(&k).ok())
+ .and_then(|k| NodeID::from_slice(&k[..]));
+ if let (Some(ip), Some(pubkey)) = (ip, pubkey) {
+ ret.push((pubkey, SocketAddr::new(ip, ent.service_port)));
+ } else {
+ warn!(
+ "Could not process node spec from Consul: {:?} (invalid IP or public key)",
+ ent
+ );
+ }
+ }
+ debug!("Got nodes from Consul: {:?}", ret);
+
+ Ok(ret)
+ }
+
+ // ---- PUBLISHING TO CONSUL CATALOG ----
+
+ pub async fn publish_consul_service(
+ &self,
+ node_id: NodeID,
+ hostname: &str,
+ rpc_public_addr: SocketAddr,
+ ) -> Result<(), ConsulError> {
+ let node = format!("garage:{}", hex::encode(&node_id[..8]));
+
+ let advertisement = ConsulPublishEntry {
+ node: node.clone(),
address: rpc_public_addr.ip(),
- port: rpc_public_addr.port(),
- },
- };
-
- let url = format!("http://{}/v1/catalog/register", consul_host);
- let req_body = serde_json::to_string(&advertisment)?;
- debug!("Request body for consul adv: {}", req_body);
-
- let req = Request::builder()
- .uri(url)
- .method(Method::PUT)
- .body(Body::from(req_body))?;
-
- let client = Client::new();
-
- let resp = client.request(req).await?;
- debug!("Response of advertising to Consul: {:?}", resp);
- let resp_code = resp.status();
- let resp_bytes = &hyper::body::to_bytes(resp.into_body()).await?;
- debug!(
- "{}",
- std::str::from_utf8(resp_bytes).unwrap_or("<invalid utf8>")
- );
-
- if resp_code != StatusCode::OK {
- return Err(Error::Message(format!("HTTP error {}", resp_code)));
+ node_meta: [
+ ("pubkey".to_string(), hex::encode(node_id)),
+ ("hostname".to_string(), hostname.to_string()),
+ ]
+ .iter()
+ .cloned()
+ .collect(),
+ service: ConsulPublishService {
+ service_id: node.clone(),
+ service_name: self.config.service_name.clone(),
+ tags: vec!["advertised-by-garage".into(), hostname.into()],
+ address: rpc_public_addr.ip(),
+ port: rpc_public_addr.port(),
+ },
+ };
+
+ let url = format!("{}/v1/catalog/register", self.config.consul_http_addr);
+
+ let http = self.client.put(&url).json(&advertisement).send().await?;
+ http.error_for_status()?;
+
+ Ok(())
}
+}
- Ok(())
+/// Regroup all Consul discovery errors
+#[derive(Debug, Error)]
+pub enum ConsulError {
+ #[error(display = "IO error: {}", _0)]
+ Io(#[error(source)] std::io::Error),
+ #[error(display = "HTTP error: {}", _0)]
+ Reqwest(#[error(source)] reqwest::Error),
+ #[error(display = "Invalid Consul TLS configuration")]
+ InvalidTLSConfig,
}
diff --git a/src/rpc/kubernetes.rs b/src/rpc/kubernetes.rs
index 197245aa..63c6567d 100644
--- a/src/rpc/kubernetes.rs
+++ b/src/rpc/kubernetes.rs
@@ -12,6 +12,8 @@ use serde::{Deserialize, Serialize};
use netapp::NodeID;
+use garage_util::config::KubernetesDiscoveryConfig;
+
static K8S_GROUP: &str = "deuxfleurs.fr";
#[derive(CustomResource, Debug, Serialize, Deserialize, Clone, JsonSchema)]
@@ -41,15 +43,14 @@ pub async fn create_kubernetes_crd() -> Result<(), kube::Error> {
}
pub async fn get_kubernetes_nodes(
- kubernetes_service_name: &str,
- kubernetes_namespace: &str,
+ kubernetes_config: &KubernetesDiscoveryConfig,
) -> Result<Vec<(NodeID, SocketAddr)>, kube::Error> {
let client = Client::try_default().await?;
- let nodes: Api<GarageNode> = Api::namespaced(client.clone(), kubernetes_namespace);
+ let nodes: Api<GarageNode> = Api::namespaced(client.clone(), &kubernetes_config.namespace);
let lp = ListParams::default().labels(&format!(
"garage.{}/service={}",
- K8S_GROUP, kubernetes_service_name
+ K8S_GROUP, kubernetes_config.service_name
));
let nodes = nodes.list(&lp).await?;
@@ -73,8 +74,7 @@ pub async fn get_kubernetes_nodes(
}
pub async fn publish_kubernetes_node(
- kubernetes_service_name: &str,
- kubernetes_namespace: &str,
+ kubernetes_config: &KubernetesDiscoveryConfig,
node_id: NodeID,
hostname: &str,
rpc_public_addr: SocketAddr,
@@ -93,13 +93,13 @@ pub async fn publish_kubernetes_node(
let labels = node.metadata.labels.insert(BTreeMap::new());
labels.insert(
format!("garage.{}/service", K8S_GROUP),
- kubernetes_service_name.to_string(),
+ kubernetes_config.service_name.to_string(),
);
debug!("Node object to be applied: {:#?}", node);
let client = Client::try_default().await?;
- let nodes: Api<GarageNode> = Api::namespaced(client.clone(), kubernetes_namespace);
+ let nodes: Api<GarageNode> = Api::namespaced(client.clone(), &kubernetes_config.namespace);
if let Ok(old_node) = nodes.get(&node_pubkey).await {
node.metadata.resource_version = old_node.metadata.resource_version;
diff --git a/src/rpc/lib.rs b/src/rpc/lib.rs
index 392ff48f..92caf75d 100644
--- a/src/rpc/lib.rs
+++ b/src/rpc/lib.rs
@@ -3,6 +3,7 @@
#[macro_use]
extern crate tracing;
+#[cfg(feature = "consul-discovery")]
mod consul;
#[cfg(feature = "kubernetes-discovery")]
mod kubernetes;
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 9e0bfa11..d6576f20 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -23,12 +23,15 @@ use netapp::{NetApp, NetworkKey, NodeID, NodeKey};
use garage_util::background::BackgroundRunner;
use garage_util::config::Config;
+#[cfg(feature = "kubernetes-discovery")]
+use garage_util::config::KubernetesDiscoveryConfig;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::persister::Persister;
use garage_util::time::*;
-use crate::consul::*;
+#[cfg(feature = "consul-discovery")]
+use crate::consul::ConsulDiscovery;
#[cfg(feature = "kubernetes-discovery")]
use crate::kubernetes::*;
use crate::layout::*;
@@ -90,12 +93,14 @@ pub struct System {
system_endpoint: Arc<Endpoint<SystemRpc, System>>,
rpc_listen_addr: SocketAddr,
+ #[cfg(any(feature = "consul-discovery", feature = "kubernetes-discovery"))]
rpc_public_addr: Option<SocketAddr>,
bootstrap_peers: Vec<String>,
- consul_discovery: Option<ConsulDiscoveryParam>,
+ #[cfg(feature = "consul-discovery")]
+ consul_discovery: Option<ConsulDiscovery>,
#[cfg(feature = "kubernetes-discovery")]
- kubernetes_discovery: Option<KubernetesDiscoveryParam>,
+ kubernetes_discovery: Option<KubernetesDiscoveryConfig>,
replication_factor: usize,
@@ -285,29 +290,21 @@ impl System {
let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into());
- let consul_discovery = match (&config.consul_host, &config.consul_service_name) {
- (Some(ch), Some(csn)) => Some(ConsulDiscoveryParam {
- consul_host: ch.to_string(),
- service_name: csn.to_string(),
- }),
- _ => None,
- };
-
- #[cfg(feature = "kubernetes-discovery")]
- let kubernetes_discovery = match (
- &config.kubernetes_service_name,
- &config.kubernetes_namespace,
- ) {
- (Some(ksn), Some(kn)) => Some(KubernetesDiscoveryParam {
- service_name: ksn.to_string(),
- namespace: kn.to_string(),
- skip_crd: config.kubernetes_skip_crd,
- }),
- _ => None,
+ #[cfg(feature = "consul-discovery")]
+ let consul_discovery = match &config.consul_discovery {
+ Some(cfg) => Some(
+ ConsulDiscovery::new(cfg.clone())
+ .ok_or_message("Invalid Consul discovery configuration")?,
+ ),
+ None => None,
};
+ #[cfg(not(feature = "consul-discovery"))]
+ if config.consul_discovery.is_some() {
+ warn!("Consul discovery is not enabled in this build.");
+ }
#[cfg(not(feature = "kubernetes-discovery"))]
- if config.kubernetes_service_name.is_some() || config.kubernetes_namespace.is_some() {
+ if config.kubernetes_discovery.is_some() {
warn!("Kubernetes discovery is not enabled in this build.");
}
@@ -329,11 +326,13 @@ impl System {
system_endpoint,
replication_factor,
rpc_listen_addr: config.rpc_bind_addr,
+ #[cfg(any(feature = "consul-discovery", feature = "kubernetes-discovery"))]
rpc_public_addr,
bootstrap_peers: config.bootstrap_peers.clone(),
+ #[cfg(feature = "consul-discovery")]
consul_discovery,
#[cfg(feature = "kubernetes-discovery")]
- kubernetes_discovery,
+ kubernetes_discovery: config.kubernetes_discovery.clone(),
ring,
update_ring: Mutex::new(update_ring),
@@ -432,6 +431,7 @@ impl System {
// ---- INTERNALS ----
+ #[cfg(feature = "consul-discovery")]
async fn advertise_to_consul(self: Arc<Self>) -> Result<(), Error> {
let c = match &self.consul_discovery {
Some(c) => c,
@@ -446,9 +446,7 @@ impl System {
}
};
- publish_consul_service(
- &c.consul_host,
- &c.service_name,
+ c.publish_consul_service(
self.netapp.id,
&self.local_status.load_full().hostname,
rpc_public_addr,
@@ -473,8 +471,7 @@ impl System {
};
publish_kubernetes_node(
- &k.service_name,
- &k.namespace,
+ k,
self.netapp.id,
&self.local_status.load_full().hostname,
rpc_public_addr,
@@ -644,8 +641,9 @@ impl System {
}
// Fetch peer list from Consul
+ #[cfg(feature = "consul-discovery")]
if let Some(c) = &self.consul_discovery {
- match get_consul_nodes(&c.consul_host, &c.service_name).await {
+ match c.get_consul_nodes().await {
Ok(node_list) => {
ping_list.extend(node_list);
}
@@ -667,7 +665,7 @@ impl System {
};
}
- match get_kubernetes_nodes(&k.service_name, &k.namespace).await {
+ match get_kubernetes_nodes(k).await {
Ok(node_list) => {
ping_list.extend(node_list);
}
@@ -691,6 +689,7 @@ impl System {
warn!("Could not save peer list to file: {}", e);
}
+ #[cfg(feature = "consul-discovery")]
self.background.spawn(self.clone().advertise_to_consul());
#[cfg(feature = "kubernetes-discovery")]
@@ -785,15 +784,3 @@ async fn resolve_peers(peers: &[String]) -> Vec<(NodeID, SocketAddr)> {
ret
}
-
-struct ConsulDiscoveryParam {
- consul_host: String,
- service_name: String,
-}
-
-#[cfg(feature = "kubernetes-discovery")]
-struct KubernetesDiscoveryParam {
- service_name: String,
- namespace: String,
- skip_crd: bool,
-}
diff --git a/src/util/config.rs b/src/util/config.rs
index 2d4b4f57..04f8375a 100644
--- a/src/util/config.rs
+++ b/src/util/config.rs
@@ -46,20 +46,17 @@ pub struct Config {
/// Timeout for Netapp RPC calls
pub rpc_timeout_msec: Option<u64>,
+ // -- Bootstraping and discovery
/// Bootstrap peers RPC address
#[serde(default)]
pub bootstrap_peers: Vec<String>,
- /// Consul host to connect to to discover more peers
- pub consul_host: Option<String>,
- /// Consul service name to use
- pub consul_service_name: Option<String>,
- /// Kubernetes namespace the service discovery resources are be created in
- pub kubernetes_namespace: Option<String>,
- /// Service name to filter for in k8s custom resources
- pub kubernetes_service_name: Option<String>,
- /// Skip creation of the garagenodes CRD
+
+ /// Configuration for automatic node discovery through Consul
#[serde(default)]
- pub kubernetes_skip_crd: bool,
+ pub consul_discovery: Option<ConsulDiscoveryConfig>,
+ /// Configuration for automatic node discovery through Kubernetes
+ #[serde(default)]
+ pub kubernetes_discovery: Option<KubernetesDiscoveryConfig>,
// -- DB
/// Database engine to use for metadata (options: sled, sqlite, lmdb)
@@ -129,6 +126,34 @@ pub struct AdminConfig {
pub trace_sink: Option<String>,
}
+#[derive(Deserialize, Debug, Clone)]
+pub struct ConsulDiscoveryConfig {
+ /// Consul http or https address to connect to to discover more peers
+ pub consul_http_addr: String,
+ /// Consul service name to use
+ pub service_name: String,
+ /// CA TLS certificate to use when connecting to Consul
+ pub ca_cert: Option<String>,
+ /// Client TLS certificate to use when connecting to Consul
+ pub client_cert: Option<String>,
+ /// Client TLS key to use when connecting to Consul
+ pub client_key: Option<String>,
+ /// Skip TLS hostname verification
+ #[serde(default)]
+ pub tls_skip_verify: bool,
+}
+
+#[derive(Deserialize, Debug, Clone)]
+pub struct KubernetesDiscoveryConfig {
+ /// Kubernetes namespace the service discovery resources are be created in
+ pub namespace: String,
+ /// Service name to filter for in k8s custom resources
+ pub service_name: String,
+ /// Skip creation of the garagenodes CRD
+ #[serde(default)]
+ pub skip_crd: bool,
+}
+
fn default_db_engine() -> String {
"sled".into()
}