diff options
Diffstat (limited to 'src/rpc/system.rs')
-rw-r--r-- | src/rpc/system.rs | 160 |
1 files changed, 97 insertions, 63 deletions
diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 741f68e2..68d94ea5 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -29,6 +29,7 @@ use garage_util::persister::Persister; use garage_util::time::*; use crate::consul::*; +#[cfg(feature = "kubernetes-discovery")] use crate::kubernetes::*; use crate::layout::*; use crate::ring::*; @@ -90,12 +91,10 @@ pub struct System { rpc_listen_addr: SocketAddr, rpc_public_addr: Option<SocketAddr>, bootstrap_peers: Vec<(NodeID, SocketAddr)>, - consul_host: Option<String>, - consul_service_name: Option<String>, - kubernetes_service_name: Option<String>, - kubernetes_namespace: Option<String>, - kubernetes_skip_crd: bool, + consul_discovery: Option<ConsulDiscoveryParam>, + #[cfg(feature = "kubernetes-discovery")] + kubernetes_discovery: Option<KubernetesDiscoveryParam>, replication_factor: usize, @@ -228,15 +227,53 @@ impl System { let ring = Ring::new(cluster_layout, replication_factor); let (update_ring, ring) = watch::channel(Arc::new(ring)); + let rpc_public_addr = match config.rpc_public_addr { + Some(a) => Some(a), + None => { + let addr = + get_default_ip().map(|ip| SocketAddr::new(ip, config.rpc_bind_addr.port())); + if let Some(a) = addr { + warn!("Using autodetected rpc_public_addr: {}. Consider specifying it explicitly in configuration file if possible.", a); + } + addr + } + }; + let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, node_key); let fullmesh = FullMeshPeeringStrategy::new( netapp.clone(), config.bootstrap_peers.clone(), - config.rpc_public_addr, + rpc_public_addr, ); let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into()); + let consul_discovery = match (&config.consul_host, &config.consul_service_name) { + (Some(ch), Some(csn)) => Some(ConsulDiscoveryParam { + consul_host: ch.to_string(), + service_name: csn.to_string(), + }), + _ => None, + }; + + #[cfg(feature = "kubernetes-discovery")] + let kubernetes_discovery = match ( + &config.kubernetes_service_name, + &config.kubernetes_namespace, + ) { + (Some(ksn), Some(kn)) => Some(KubernetesDiscoveryParam { + service_name: ksn.to_string(), + namespace: kn.to_string(), + skip_crd: config.kubernetes_skip_crd, + }), + _ => None, + }; + + #[cfg(not(feature = "kubernetes-discovery"))] + if config.kubernetes_service_name.is_some() || config.kubernetes_namespace.is_some() { + warn!("Kubernetes discovery is not enabled in this build."); + } + let sys = Arc::new(System { id: netapp.id.into(), persist_cluster_layout, @@ -249,13 +286,11 @@ impl System { system_endpoint, replication_factor, rpc_listen_addr: config.rpc_bind_addr, - rpc_public_addr: config.rpc_public_addr, + rpc_public_addr, bootstrap_peers: config.bootstrap_peers.clone(), - consul_host: config.consul_host.clone(), - consul_service_name: config.consul_service_name.clone(), - kubernetes_service_name: config.kubernetes_service_name.clone(), - kubernetes_namespace: config.kubernetes_namespace.clone(), - kubernetes_skip_crd: config.kubernetes_skip_crd, + consul_discovery, + #[cfg(feature = "kubernetes-discovery")] + kubernetes_discovery, ring, update_ring: Mutex::new(update_ring), @@ -280,23 +315,22 @@ impl System { // ---- INTERNALS ---- async fn advertise_to_consul(self: Arc<Self>) -> Result<(), Error> { - let (consul_host, consul_service_name) = - match (&self.consul_host, &self.consul_service_name) { - (Some(ch), Some(csn)) => (ch, csn), - _ => return Ok(()), - }; + let c = match &self.consul_discovery { + Some(c) => c, + _ => return Ok(()), + }; let rpc_public_addr = match self.rpc_public_addr { Some(addr) => addr, None => { - warn!("Not advertising to Consul because rpc_public_addr is not defined in config file."); + warn!("Not advertising to Consul because rpc_public_addr is not defined in config file and could not be autodetected."); return Ok(()); } }; publish_consul_service( - consul_host, - consul_service_name, + &c.consul_host, + &c.service_name, self.netapp.id, &self.local_status.load_full().hostname, rpc_public_addr, @@ -305,36 +339,24 @@ impl System { .err_context("Error while publishing Consul service") } - fn get_default_ip() -> IpAddr { - pnet::datalink::interfaces() - .iter() - .find(|e| e.is_up() && !e.is_loopback() && !e.ips.is_empty()) - .unwrap() - .ips - .first() - .unwrap() - .ip() - } - + #[cfg(feature = "kubernetes-discovery")] async fn advertise_to_kubernetes(self: Arc<Self>) -> Result<(), Error> { - let (kubernetes_service_name, kubernetes_namespace) = - match (&self.kubernetes_service_name, &self.kubernetes_namespace) { - (Some(ch), Some(csn)) => (ch, csn), - _ => return Ok(()), - }; - - let rpc_public_addr = - match self.rpc_public_addr { - Some(addr) => addr, - None => { - warn!("No rpc_public_addr configured, using first address on first network interface"); - SocketAddr::new(Self::get_default_ip(), self.rpc_listen_addr.port()) - } - }; + let k = match &self.kubernetes_discovery { + Some(k) => k, + _ => return Ok(()), + }; + + let rpc_public_addr = match self.rpc_public_addr { + Some(addr) => addr, + None => { + warn!("Not advertising to Kubernetes because rpc_public_addr is not defined in config file and could not be autodetected."); + return Ok(()); + } + }; publish_kubernetes_node( - kubernetes_service_name, - kubernetes_namespace, + &k.service_name, + &k.namespace, self.netapp.id, &self.local_status.load_full().hostname, rpc_public_addr, @@ -513,16 +535,6 @@ impl System { } async fn discovery_loop(self: &Arc<Self>, mut stop_signal: watch::Receiver<bool>) { - let consul_config = match (&self.consul_host, &self.consul_service_name) { - (Some(ch), Some(csn)) => Some((ch.clone(), csn.clone())), - _ => None, - }; - - let kubernetes_config = match (&self.kubernetes_service_name, &self.kubernetes_namespace) { - (Some(ch), Some(csn)) => Some((ch.clone(), csn.clone())), - _ => None, - }; - while !*stop_signal.borrow() { let not_configured = !self.ring.borrow().layout.check(); let no_peers = self.fullmesh.get_peer_list().len() < self.replication_factor; @@ -545,8 +557,8 @@ impl System { } // Fetch peer list from Consul - if let Some((consul_host, consul_service_name)) = &consul_config { - match get_consul_nodes(consul_host, consul_service_name).await { + if let Some(c) = &self.consul_discovery { + match get_consul_nodes(&c.consul_host, &c.service_name).await { Ok(node_list) => { ping_list.extend(node_list); } @@ -557,8 +569,9 @@ impl System { } // Fetch peer list from Kubernetes - if let Some((kubernetes_service_name, kubernetes_namespace)) = &kubernetes_config { - if !self.kubernetes_skip_crd { + #[cfg(feature = "kubernetes-discovery")] + if let Some(k) = &self.kubernetes_discovery { + if !k.skip_crd { match create_kubernetes_crd().await { Ok(()) => (), Err(e) => { @@ -567,8 +580,7 @@ impl System { }; } - match get_kubernetes_nodes(kubernetes_service_name, kubernetes_namespace).await - { + match get_kubernetes_nodes(&k.service_name, &k.namespace).await { Ok(node_list) => { ping_list.extend(node_list); } @@ -593,6 +605,8 @@ impl System { } self.background.spawn(self.clone().advertise_to_consul()); + + #[cfg(feature = "kubernetes-discovery")] self.background .spawn(self.clone().advertise_to_kubernetes()); @@ -657,3 +671,23 @@ impl EndpointHandler<SystemRpc> for System { } } } + +fn get_default_ip() -> Option<IpAddr> { + pnet_datalink::interfaces() + .iter() + .find(|e| e.is_up() && !e.is_loopback() && !e.ips.is_empty()) + .and_then(|e| e.ips.first()) + .map(|a| a.ip()) +} + +struct ConsulDiscoveryParam { + consul_host: String, + service_name: String, +} + +#[cfg(feature = "kubernetes-discovery")] +struct KubernetesDiscoveryParam { + service_name: String, + namespace: String, + skip_crd: bool, +} |