diff options
Diffstat (limited to 'src/rpc/system.rs')
-rw-r--r-- | src/rpc/system.rs | 79 |
1 files changed, 78 insertions, 1 deletions
diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 6bca6e3e..c8fc0ad5 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -1,7 +1,7 @@ //! Module containing structs related to membership management use std::collections::HashMap; use std::io::{Read, Write}; -use std::net::SocketAddr; +use std::net::{IpAddr, SocketAddr}; use std::path::Path; use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; @@ -29,6 +29,7 @@ use garage_util::persister::Persister; use garage_util::time::*; use crate::consul::*; +use crate::kubernetes::*; use crate::layout::*; use crate::ring::*; use crate::rpc_helper::*; @@ -88,6 +89,11 @@ pub struct System { bootstrap_peers: Vec<(NodeID, SocketAddr)>, consul_host: Option<String>, consul_service_name: Option<String>, + + kubernetes_service_name: Option<String>, + kubernetes_namespace: Option<String>, + kubernetes_skip_crd: bool, + replication_factor: usize, /// The ring @@ -247,6 +253,10 @@ impl System { bootstrap_peers: config.bootstrap_peers.clone(), consul_host: config.consul_host.clone(), consul_service_name: config.consul_service_name.clone(), + kubernetes_service_name: config.kubernetes_service_name.clone(), + kubernetes_namespace: config.kubernetes_namespace.clone(), + kubernetes_skip_crd: config.kubernetes_skip_crd, + ring, update_ring: Mutex::new(update_ring), background, @@ -295,6 +305,44 @@ impl System { .err_context("Error while publishing Consul service") } + fn get_default_ip() -> IpAddr { + pnet::datalink::interfaces() + .iter() + .find(|e| e.is_up() && !e.is_loopback() && !e.ips.is_empty()) + .unwrap() + .ips + .first() + .unwrap() + .ip() + } + + async fn advertise_to_kubernetes(self: Arc<Self>) -> Result<(), Error> { + let (kubernetes_service_name, kubernetes_namespace) = + match (&self.kubernetes_service_name, &self.kubernetes_namespace) { + (Some(ch), Some(csn)) => (ch, csn), + _ => return Ok(()), + }; + + let rpc_public_addr = + match self.rpc_public_addr { + Some(addr) => addr, + None => { + warn!("No rpc_public_addr configured, using first address on first network interface"); + SocketAddr::new(Self::get_default_ip(), self.rpc_listen_addr.port()) + } + }; + + publish_kubernetes_node( + kubernetes_service_name, + kubernetes_namespace, + self.netapp.id, + &self.local_status.load_full().hostname, + rpc_public_addr, + ) + .await + .err_context("Error while publishing node to kubernetes") + } + /// Save network configuration to disc async fn save_cluster_layout(self: Arc<Self>) -> Result<(), Error> { let ring: Arc<Ring> = self.ring.borrow().clone(); @@ -470,6 +518,11 @@ impl System { _ => None, }; + let kubernetes_config = match (&self.kubernetes_service_name, &self.kubernetes_namespace) { + (Some(ch), Some(csn)) => Some((ch.clone(), csn.clone())), + _ => None, + }; + while !*stop_signal.borrow() { let not_configured = !self.ring.borrow().layout.check(); let no_peers = self.fullmesh.get_peer_list().len() < self.replication_factor; @@ -503,6 +556,28 @@ impl System { } } + // Fetch peer list from Kubernetes + if let Some((kubernetes_service_name, kubernetes_namespace)) = &kubernetes_config { + if !self.kubernetes_skip_crd { + match create_kubernetes_crd().await { + Ok(()) => (), + Err(e) => { + error!("Failed to create kubernetes custom resource: {}", e) + } + }; + } + + match get_kubernetes_nodes(kubernetes_service_name, kubernetes_namespace).await + { + Ok(node_list) => { + ping_list.extend(node_list); + } + Err(e) => { + warn!("Could not retrieve node list from Kubernetes: {}", e); + } + } + } + for (node_id, node_addr) in ping_list { tokio::spawn( self.netapp @@ -518,6 +593,8 @@ impl System { } self.background.spawn(self.clone().advertise_to_consul()); + self.background + .spawn(self.clone().advertise_to_kubernetes()); let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL); select! { |