diff options
Diffstat (limited to 'src/rpc/system.rs')
-rw-r--r-- | src/rpc/system.rs | 73 |
1 files changed, 30 insertions, 43 deletions
diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 9e0bfa11..d6576f20 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -23,12 +23,15 @@ use netapp::{NetApp, NetworkKey, NodeID, NodeKey}; use garage_util::background::BackgroundRunner; use garage_util::config::Config; +#[cfg(feature = "kubernetes-discovery")] +use garage_util::config::KubernetesDiscoveryConfig; use garage_util::data::*; use garage_util::error::*; use garage_util::persister::Persister; use garage_util::time::*; -use crate::consul::*; +#[cfg(feature = "consul-discovery")] +use crate::consul::ConsulDiscovery; #[cfg(feature = "kubernetes-discovery")] use crate::kubernetes::*; use crate::layout::*; @@ -90,12 +93,14 @@ pub struct System { system_endpoint: Arc<Endpoint<SystemRpc, System>>, rpc_listen_addr: SocketAddr, + #[cfg(any(feature = "consul-discovery", feature = "kubernetes-discovery"))] rpc_public_addr: Option<SocketAddr>, bootstrap_peers: Vec<String>, - consul_discovery: Option<ConsulDiscoveryParam>, + #[cfg(feature = "consul-discovery")] + consul_discovery: Option<ConsulDiscovery>, #[cfg(feature = "kubernetes-discovery")] - kubernetes_discovery: Option<KubernetesDiscoveryParam>, + kubernetes_discovery: Option<KubernetesDiscoveryConfig>, replication_factor: usize, @@ -285,29 +290,21 @@ impl System { let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into()); - let consul_discovery = match (&config.consul_host, &config.consul_service_name) { - (Some(ch), Some(csn)) => Some(ConsulDiscoveryParam { - consul_host: ch.to_string(), - service_name: csn.to_string(), - }), - _ => None, - }; - - #[cfg(feature = "kubernetes-discovery")] - let kubernetes_discovery = match ( - &config.kubernetes_service_name, - &config.kubernetes_namespace, - ) { - (Some(ksn), Some(kn)) => Some(KubernetesDiscoveryParam { - service_name: ksn.to_string(), - namespace: kn.to_string(), - skip_crd: config.kubernetes_skip_crd, - }), - _ => None, + #[cfg(feature = "consul-discovery")] + let consul_discovery = match &config.consul_discovery { + Some(cfg) => Some( + ConsulDiscovery::new(cfg.clone()) + .ok_or_message("Invalid Consul discovery configuration")?, + ), + None => None, }; + #[cfg(not(feature = "consul-discovery"))] + if config.consul_discovery.is_some() { + warn!("Consul discovery is not enabled in this build."); + } #[cfg(not(feature = "kubernetes-discovery"))] - if config.kubernetes_service_name.is_some() || config.kubernetes_namespace.is_some() { + if config.kubernetes_discovery.is_some() { warn!("Kubernetes discovery is not enabled in this build."); } @@ -329,11 +326,13 @@ impl System { system_endpoint, replication_factor, rpc_listen_addr: config.rpc_bind_addr, + #[cfg(any(feature = "consul-discovery", feature = "kubernetes-discovery"))] rpc_public_addr, bootstrap_peers: config.bootstrap_peers.clone(), + #[cfg(feature = "consul-discovery")] consul_discovery, #[cfg(feature = "kubernetes-discovery")] - kubernetes_discovery, + kubernetes_discovery: config.kubernetes_discovery.clone(), ring, update_ring: Mutex::new(update_ring), @@ -432,6 +431,7 @@ impl System { // ---- INTERNALS ---- + #[cfg(feature = "consul-discovery")] async fn advertise_to_consul(self: Arc<Self>) -> Result<(), Error> { let c = match &self.consul_discovery { Some(c) => c, @@ -446,9 +446,7 @@ impl System { } }; - publish_consul_service( - &c.consul_host, - &c.service_name, + c.publish_consul_service( self.netapp.id, &self.local_status.load_full().hostname, rpc_public_addr, @@ -473,8 +471,7 @@ impl System { }; publish_kubernetes_node( - &k.service_name, - &k.namespace, + k, self.netapp.id, &self.local_status.load_full().hostname, rpc_public_addr, @@ -644,8 +641,9 @@ impl System { } // Fetch peer list from Consul + #[cfg(feature = "consul-discovery")] if let Some(c) = &self.consul_discovery { - match get_consul_nodes(&c.consul_host, &c.service_name).await { + match c.get_consul_nodes().await { Ok(node_list) => { ping_list.extend(node_list); } @@ -667,7 +665,7 @@ impl System { }; } - match get_kubernetes_nodes(&k.service_name, &k.namespace).await { + match get_kubernetes_nodes(k).await { Ok(node_list) => { ping_list.extend(node_list); } @@ -691,6 +689,7 @@ impl System { warn!("Could not save peer list to file: {}", e); } + #[cfg(feature = "consul-discovery")] self.background.spawn(self.clone().advertise_to_consul()); #[cfg(feature = "kubernetes-discovery")] @@ -785,15 +784,3 @@ async fn resolve_peers(peers: &[String]) -> Vec<(NodeID, SocketAddr)> { ret } - -struct ConsulDiscoveryParam { - consul_host: String, - service_name: String, -} - -#[cfg(feature = "kubernetes-discovery")] -struct KubernetesDiscoveryParam { - service_name: String, - namespace: String, - skip_crd: bool, -} |