aboutsummaryrefslogtreecommitdiff
path: root/src/rpc/system.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc/system.rs')
-rw-r--r--src/rpc/system.rs79
1 files changed, 78 insertions, 1 deletions
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 6bca6e3e..c8fc0ad5 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -1,7 +1,7 @@
//! Module containing structs related to membership management
use std::collections::HashMap;
use std::io::{Read, Write};
-use std::net::SocketAddr;
+use std::net::{IpAddr, SocketAddr};
use std::path::Path;
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
@@ -29,6 +29,7 @@ use garage_util::persister::Persister;
use garage_util::time::*;
use crate::consul::*;
+use crate::kubernetes::*;
use crate::layout::*;
use crate::ring::*;
use crate::rpc_helper::*;
@@ -88,6 +89,11 @@ pub struct System {
bootstrap_peers: Vec<(NodeID, SocketAddr)>,
consul_host: Option<String>,
consul_service_name: Option<String>,
+
+ kubernetes_service_name: Option<String>,
+ kubernetes_namespace: Option<String>,
+ kubernetes_skip_crd: bool,
+
replication_factor: usize,
/// The ring
@@ -247,6 +253,10 @@ impl System {
bootstrap_peers: config.bootstrap_peers.clone(),
consul_host: config.consul_host.clone(),
consul_service_name: config.consul_service_name.clone(),
+ kubernetes_service_name: config.kubernetes_service_name.clone(),
+ kubernetes_namespace: config.kubernetes_namespace.clone(),
+ kubernetes_skip_crd: config.kubernetes_skip_crd,
+
ring,
update_ring: Mutex::new(update_ring),
background,
@@ -295,6 +305,44 @@ impl System {
.err_context("Error while publishing Consul service")
}
+ fn get_default_ip() -> IpAddr {
+ pnet::datalink::interfaces()
+ .iter()
+ .find(|e| e.is_up() && !e.is_loopback() && !e.ips.is_empty())
+ .unwrap()
+ .ips
+ .first()
+ .unwrap()
+ .ip()
+ }
+
+ async fn advertise_to_kubernetes(self: Arc<Self>) -> Result<(), Error> {
+ let (kubernetes_service_name, kubernetes_namespace) =
+ match (&self.kubernetes_service_name, &self.kubernetes_namespace) {
+ (Some(ch), Some(csn)) => (ch, csn),
+ _ => return Ok(()),
+ };
+
+ let rpc_public_addr =
+ match self.rpc_public_addr {
+ Some(addr) => addr,
+ None => {
+ warn!("No rpc_public_addr configured, using first address on first network interface");
+ SocketAddr::new(Self::get_default_ip(), self.rpc_listen_addr.port())
+ }
+ };
+
+ publish_kubernetes_node(
+ kubernetes_service_name,
+ kubernetes_namespace,
+ self.netapp.id,
+ &self.local_status.load_full().hostname,
+ rpc_public_addr,
+ )
+ .await
+ .err_context("Error while publishing node to kubernetes")
+ }
+
/// Save network configuration to disc
async fn save_cluster_layout(self: Arc<Self>) -> Result<(), Error> {
let ring: Arc<Ring> = self.ring.borrow().clone();
@@ -470,6 +518,11 @@ impl System {
_ => None,
};
+ let kubernetes_config = match (&self.kubernetes_service_name, &self.kubernetes_namespace) {
+ (Some(ch), Some(csn)) => Some((ch.clone(), csn.clone())),
+ _ => None,
+ };
+
while !*stop_signal.borrow() {
let not_configured = !self.ring.borrow().layout.check();
let no_peers = self.fullmesh.get_peer_list().len() < self.replication_factor;
@@ -503,6 +556,28 @@ impl System {
}
}
+ // Fetch peer list from Kubernetes
+ if let Some((kubernetes_service_name, kubernetes_namespace)) = &kubernetes_config {
+ if !self.kubernetes_skip_crd {
+ match create_kubernetes_crd().await {
+ Ok(()) => (),
+ Err(e) => {
+ error!("Failed to create kubernetes custom resource: {}", e)
+ }
+ };
+ }
+
+ match get_kubernetes_nodes(kubernetes_service_name, kubernetes_namespace).await
+ {
+ Ok(node_list) => {
+ ping_list.extend(node_list);
+ }
+ Err(e) => {
+ warn!("Could not retrieve node list from Kubernetes: {}", e);
+ }
+ }
+ }
+
for (node_id, node_addr) in ping_list {
tokio::spawn(
self.netapp
@@ -518,6 +593,8 @@ impl System {
}
self.background.spawn(self.clone().advertise_to_consul());
+ self.background
+ .spawn(self.clone().advertise_to_kubernetes());
let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL);
select! {