diff options
author | Alex Auvolat <alex@adnab.me> | 2022-03-16 12:09:50 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-03-16 12:12:33 +0100 |
commit | 4fc6a2ef554ec5e69522905ca4c18d2a39d22051 (patch) | |
tree | a6ac34a8dbd8623b390c182529dc393bc7e8444d /src | |
parent | ba6b56ae68d5842d814769418d484093865261aa (diff) | |
download | garage-4fc6a2ef554ec5e69522905ca4c18d2a39d22051.tar.gz garage-4fc6a2ef554ec5e69522905ca4c18d2a39d22051.zip |
Add feature flag for Kubernetes discovery
Diffstat (limited to 'src')
-rw-r--r-- | src/rpc/Cargo.toml | 12 | ||||
-rw-r--r-- | src/rpc/kubernetes.rs | 8 | ||||
-rw-r--r-- | src/rpc/lib.rs | 1 | ||||
-rw-r--r-- | src/rpc/system.rs | 160 | ||||
-rw-r--r-- | src/util/Cargo.toml | 3 | ||||
-rw-r--r-- | src/util/error.rs | 3 |
6 files changed, 108 insertions, 79 deletions
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index a027f219..c15a82a7 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -32,13 +32,13 @@ serde_bytes = "0.11" serde_json = "1.0" # newer version requires rust edition 2021 -kube = { version = "0.62", features = ["runtime", "derive"] } -k8s-openapi = { version = "0.13", features = ["v1_22"] } -openssl = { version = "0.10", features = ["vendored"] } -schemars = "0.8" +kube = { version = "0.62", features = ["runtime", "derive"], optional = true } +k8s-openapi = { version = "0.13", features = ["v1_22"], optional = true } +openssl = { version = "0.10", features = ["vendored"], optional = true } +schemars = { version = "0.8", optional = true } # newer version requires rust edition 2021 -pnet = "0.28" +pnet_datalink = "0.28" futures = "0.3" futures-util = "0.3" @@ -52,3 +52,5 @@ netapp = { version = "0.4", features = ["telemetry"] } hyper = { version = "0.14", features = ["client", "http1", "runtime", "tcp"] } +[features] +kubernetes-discovery = [ "kube", "k8s-openapi", "openssl", "schemars" ] diff --git a/src/rpc/kubernetes.rs b/src/rpc/kubernetes.rs index 272d9162..939a0eed 100644 --- a/src/rpc/kubernetes.rs +++ b/src/rpc/kubernetes.rs @@ -12,8 +12,6 @@ use serde::{Deserialize, Serialize}; use netapp::NodeID; -use garage_util::error::Error; - static K8S_GROUP: &str = "deuxfleurs.fr"; #[derive(CustomResource, Debug, Serialize, Deserialize, Clone, JsonSchema)] @@ -29,7 +27,7 @@ pub struct Node { port: u16, } -pub async fn create_kubernetes_crd() -> Result<(), Error> { +pub async fn create_kubernetes_crd() -> Result<(), kube::Error> { let client = Client::try_default().await?; let crds: Api<CustomResourceDefinition> = Api::all(client.clone()); @@ -45,7 +43,7 @@ pub async fn create_kubernetes_crd() -> Result<(), Error> { pub async fn get_kubernetes_nodes( kubernetes_service_name: &str, kubernetes_namespace: &str, -) -> Result<Vec<(NodeID, SocketAddr)>, Error> { +) -> Result<Vec<(NodeID, SocketAddr)>, kube::Error> { let client = Client::try_default().await?; let nodes: Api<GarageNode> = Api::namespaced(client.clone(), kubernetes_namespace); @@ -80,7 +78,7 @@ pub async fn publish_kubernetes_node( node_id: NodeID, hostname: &str, rpc_public_addr: SocketAddr, -) -> Result<(), Error> { +) -> Result<(), kube::Error> { let node_pubkey = hex::encode(node_id); let mut node = GarageNode::new( diff --git a/src/rpc/lib.rs b/src/rpc/lib.rs index b8fb9772..392ff48f 100644 --- a/src/rpc/lib.rs +++ b/src/rpc/lib.rs @@ -4,6 +4,7 @@ extern crate tracing; mod consul; +#[cfg(feature = "kubernetes-discovery")] mod kubernetes; pub mod layout; diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 741f68e2..68d94ea5 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -29,6 +29,7 @@ use garage_util::persister::Persister; use garage_util::time::*; use crate::consul::*; +#[cfg(feature = "kubernetes-discovery")] use crate::kubernetes::*; use crate::layout::*; use crate::ring::*; @@ -90,12 +91,10 @@ pub struct System { rpc_listen_addr: SocketAddr, rpc_public_addr: Option<SocketAddr>, bootstrap_peers: Vec<(NodeID, SocketAddr)>, - consul_host: Option<String>, - consul_service_name: Option<String>, - kubernetes_service_name: Option<String>, - kubernetes_namespace: Option<String>, - kubernetes_skip_crd: bool, + consul_discovery: Option<ConsulDiscoveryParam>, + #[cfg(feature = "kubernetes-discovery")] + kubernetes_discovery: Option<KubernetesDiscoveryParam>, replication_factor: usize, @@ -228,15 +227,53 @@ impl System { let ring = Ring::new(cluster_layout, replication_factor); let (update_ring, ring) = watch::channel(Arc::new(ring)); + let rpc_public_addr = match config.rpc_public_addr { + Some(a) => Some(a), + None => { + let addr = + get_default_ip().map(|ip| SocketAddr::new(ip, config.rpc_bind_addr.port())); + if let Some(a) = addr { + warn!("Using autodetected rpc_public_addr: {}. Consider specifying it explicitly in configuration file if possible.", a); + } + addr + } + }; + let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, node_key); let fullmesh = FullMeshPeeringStrategy::new( netapp.clone(), config.bootstrap_peers.clone(), - config.rpc_public_addr, + rpc_public_addr, ); let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into()); + let consul_discovery = match (&config.consul_host, &config.consul_service_name) { + (Some(ch), Some(csn)) => Some(ConsulDiscoveryParam { + consul_host: ch.to_string(), + service_name: csn.to_string(), + }), + _ => None, + }; + + #[cfg(feature = "kubernetes-discovery")] + let kubernetes_discovery = match ( + &config.kubernetes_service_name, + &config.kubernetes_namespace, + ) { + (Some(ksn), Some(kn)) => Some(KubernetesDiscoveryParam { + service_name: ksn.to_string(), + namespace: kn.to_string(), + skip_crd: config.kubernetes_skip_crd, + }), + _ => None, + }; + + #[cfg(not(feature = "kubernetes-discovery"))] + if config.kubernetes_service_name.is_some() || config.kubernetes_namespace.is_some() { + warn!("Kubernetes discovery is not enabled in this build."); + } + let sys = Arc::new(System { id: netapp.id.into(), persist_cluster_layout, @@ -249,13 +286,11 @@ impl System { system_endpoint, replication_factor, rpc_listen_addr: config.rpc_bind_addr, - rpc_public_addr: config.rpc_public_addr, + rpc_public_addr, bootstrap_peers: config.bootstrap_peers.clone(), - consul_host: config.consul_host.clone(), - consul_service_name: config.consul_service_name.clone(), - kubernetes_service_name: config.kubernetes_service_name.clone(), - kubernetes_namespace: config.kubernetes_namespace.clone(), - kubernetes_skip_crd: config.kubernetes_skip_crd, + consul_discovery, + #[cfg(feature = "kubernetes-discovery")] + kubernetes_discovery, ring, update_ring: Mutex::new(update_ring), @@ -280,23 +315,22 @@ impl System { // ---- INTERNALS ---- async fn advertise_to_consul(self: Arc<Self>) -> Result<(), Error> { - let (consul_host, consul_service_name) = - match (&self.consul_host, &self.consul_service_name) { - (Some(ch), Some(csn)) => (ch, csn), - _ => return Ok(()), - }; + let c = match &self.consul_discovery { + Some(c) => c, + _ => return Ok(()), + }; let rpc_public_addr = match self.rpc_public_addr { Some(addr) => addr, None => { - warn!("Not advertising to Consul because rpc_public_addr is not defined in config file."); + warn!("Not advertising to Consul because rpc_public_addr is not defined in config file and could not be autodetected."); return Ok(()); } }; publish_consul_service( - consul_host, - consul_service_name, + &c.consul_host, + &c.service_name, self.netapp.id, &self.local_status.load_full().hostname, rpc_public_addr, @@ -305,36 +339,24 @@ impl System { .err_context("Error while publishing Consul service") } - fn get_default_ip() -> IpAddr { - pnet::datalink::interfaces() - .iter() - .find(|e| e.is_up() && !e.is_loopback() && !e.ips.is_empty()) - .unwrap() - .ips - .first() - .unwrap() - .ip() - } - + #[cfg(feature = "kubernetes-discovery")] async fn advertise_to_kubernetes(self: Arc<Self>) -> Result<(), Error> { - let (kubernetes_service_name, kubernetes_namespace) = - match (&self.kubernetes_service_name, &self.kubernetes_namespace) { - (Some(ch), Some(csn)) => (ch, csn), - _ => return Ok(()), - }; - - let rpc_public_addr = - match self.rpc_public_addr { - Some(addr) => addr, - None => { - warn!("No rpc_public_addr configured, using first address on first network interface"); - SocketAddr::new(Self::get_default_ip(), self.rpc_listen_addr.port()) - } - }; + let k = match &self.kubernetes_discovery { + Some(k) => k, + _ => return Ok(()), + }; + + let rpc_public_addr = match self.rpc_public_addr { + Some(addr) => addr, + None => { + warn!("Not advertising to Kubernetes because rpc_public_addr is not defined in config file and could not be autodetected."); + return Ok(()); + } + }; publish_kubernetes_node( - kubernetes_service_name, - kubernetes_namespace, + &k.service_name, + &k.namespace, self.netapp.id, &self.local_status.load_full().hostname, rpc_public_addr, @@ -513,16 +535,6 @@ impl System { } async fn discovery_loop(self: &Arc<Self>, mut stop_signal: watch::Receiver<bool>) { - let consul_config = match (&self.consul_host, &self.consul_service_name) { - (Some(ch), Some(csn)) => Some((ch.clone(), csn.clone())), - _ => None, - }; - - let kubernetes_config = match (&self.kubernetes_service_name, &self.kubernetes_namespace) { - (Some(ch), Some(csn)) => Some((ch.clone(), csn.clone())), - _ => None, - }; - while !*stop_signal.borrow() { let not_configured = !self.ring.borrow().layout.check(); let no_peers = self.fullmesh.get_peer_list().len() < self.replication_factor; @@ -545,8 +557,8 @@ impl System { } // Fetch peer list from Consul - if let Some((consul_host, consul_service_name)) = &consul_config { - match get_consul_nodes(consul_host, consul_service_name).await { + if let Some(c) = &self.consul_discovery { + match get_consul_nodes(&c.consul_host, &c.service_name).await { Ok(node_list) => { ping_list.extend(node_list); } @@ -557,8 +569,9 @@ impl System { } // Fetch peer list from Kubernetes - if let Some((kubernetes_service_name, kubernetes_namespace)) = &kubernetes_config { - if !self.kubernetes_skip_crd { + #[cfg(feature = "kubernetes-discovery")] + if let Some(k) = &self.kubernetes_discovery { + if !k.skip_crd { match create_kubernetes_crd().await { Ok(()) => (), Err(e) => { @@ -567,8 +580,7 @@ impl System { }; } - match get_kubernetes_nodes(kubernetes_service_name, kubernetes_namespace).await - { + match get_kubernetes_nodes(&k.service_name, &k.namespace).await { Ok(node_list) => { ping_list.extend(node_list); } @@ -593,6 +605,8 @@ impl System { } self.background.spawn(self.clone().advertise_to_consul()); + + #[cfg(feature = "kubernetes-discovery")] self.background .spawn(self.clone().advertise_to_kubernetes()); @@ -657,3 +671,23 @@ impl EndpointHandler<SystemRpc> for System { } } } + +fn get_default_ip() -> Option<IpAddr> { + pnet_datalink::interfaces() + .iter() + .find(|e| e.is_up() && !e.is_loopback() && !e.ips.is_empty()) + .and_then(|e| e.ips.first()) + .map(|a| a.ip()) +} + +struct ConsulDiscoveryParam { + consul_host: String, + service_name: String, +} + +#[cfg(feature = "kubernetes-discovery")] +struct KubernetesDiscoveryParam { + service_name: String, + namespace: String, + skip_crd: bool, +} diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml index dfa4e822..1ad778e4 100644 --- a/src/util/Cargo.toml +++ b/src/util/Cargo.toml @@ -40,7 +40,4 @@ netapp = "0.4" http = "0.2" hyper = "0.14" -kube = { version = "0.62", features = ["runtime", "derive"] } -k8s-openapi = { version = "0.13", features = ["v1_22"] } - opentelemetry = "0.17" diff --git a/src/util/error.rs b/src/util/error.rs index 93b28038..bdb3a69b 100644 --- a/src/util/error.rs +++ b/src/util/error.rs @@ -23,9 +23,6 @@ pub enum Error { #[error(display = "Invalid HTTP header value: {}", _0)] HttpHeader(#[error(source)] http::header::ToStrError), - #[error(display = "kubernetes error: {}", _0)] - Kubernetes(#[error(source)] kube::Error), - #[error(display = "Netapp error: {}", _0)] Netapp(#[error(source)] netapp::error::Error), |