diff options
Diffstat (limited to 'src/rpc')
-rw-r--r-- | src/rpc/Cargo.toml | 9 | ||||
-rw-r--r-- | src/rpc/kubernetes.rs | 118 | ||||
-rw-r--r-- | src/rpc/lib.rs | 1 | ||||
-rw-r--r-- | src/rpc/system.rs | 79 |
4 files changed, 206 insertions, 1 deletions
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index b49a126a..f06606e5 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -30,6 +30,15 @@ serde = { version = "1.0", default-features = false, features = ["derive", "rc"] serde_bytes = "0.11" serde_json = "1.0" +# newer version requires rust edition 2021 +kube = { version = "0.62", features = ["runtime", "derive"] } +k8s-openapi = { version = "0.13", features = ["v1_22"] } +openssl = { version = "0.10", features = ["vendored"] } +schemars = "0.8" + +# newer version requires rust edition 2021 +pnet = "0.28" + futures = "0.3" futures-util = "0.3" tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } diff --git a/src/rpc/kubernetes.rs b/src/rpc/kubernetes.rs new file mode 100644 index 00000000..8c0d6cdf --- /dev/null +++ b/src/rpc/kubernetes.rs @@ -0,0 +1,118 @@ +use std::collections::BTreeMap; +use std::net::{IpAddr, SocketAddr}; + +use kube::{ + api::{ListParams, Patch, PatchParams, PostParams}, + Api, Client, CustomResource, CustomResourceExt, +}; + +use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +use netapp::NodeID; + +use garage_util::error::Error; + +static K8S_GROUP: &str = "deuxfleurs.fr"; + +#[derive(CustomResource, Debug, Serialize, Deserialize, Clone, JsonSchema)] +#[kube( + group = "deuxfleurs.fr", + version = "v1", + kind = "GarageNode", + namespaced +)] +pub struct Node { + hostname: String, + address: IpAddr, + port: u16, +} + +pub async fn create_kubernetes_crd() -> Result<(), Error> { + let client = Client::try_default().await?; + let crds: Api<CustomResourceDefinition> = Api::all(client.clone()); + + let params = PatchParams::apply(&format!("garage.{}", K8S_GROUP)); + let crd = GarageNode::crd(); + let patch = Patch::Apply(crd); + crds.patch(&format!("garagenodes.{}", K8S_GROUP), ¶ms, &patch) + .await?; + + Ok(()) +} + +pub async fn get_kubernetes_nodes( + kubernetes_service_name: &str, + kubernetes_namespace: &str, +) -> Result<Vec<(NodeID, SocketAddr)>, Error> { + let client = Client::try_default().await?; + let nodes: Api<GarageNode> = Api::namespaced(client.clone(), kubernetes_namespace); + + let lp = ListParams::default().labels(&format!( + "garage.{}/service={}", + K8S_GROUP, kubernetes_service_name + )); + + let nodes = nodes.list(&lp).await?; + let mut ret = Vec::with_capacity(nodes.items.len()); + + for node in nodes { + println!("Found Pod: {:?}", node.metadata.name); + + let pubkey = &node + .metadata + .name + .map(|k| hex::decode(&k).ok()) + .flatten() + .map(|k| NodeID::from_slice(&k[..])) + .flatten(); + + if let Some(pubkey) = pubkey { + ret.push((*pubkey, SocketAddr::new(node.spec.address, node.spec.port))) + } + } + + Ok(ret) +} + +pub async fn publish_kubernetes_node( + kubernetes_service_name: &str, + kubernetes_namespace: &str, + node_id: NodeID, + hostname: &str, + rpc_public_addr: SocketAddr, +) -> Result<(), Error> { + let node_pubkey = hex::encode(node_id); + + let mut node = GarageNode::new( + &node_pubkey, + Node { + hostname: hostname.to_string(), + address: rpc_public_addr.ip(), + port: rpc_public_addr.port(), + }, + ); + + let labels = node.metadata.labels.insert(BTreeMap::new()); + labels.insert( + format!("garage.{}/service", K8S_GROUP), + kubernetes_service_name.to_string(), + ); + + debug!("Node object to be applied: {:#?}", node); + + let client = Client::try_default().await?; + let nodes: Api<GarageNode> = Api::namespaced(client.clone(), kubernetes_namespace); + + if let Ok(old_node) = nodes.get(&node_pubkey).await { + node.metadata.resource_version = old_node.metadata.resource_version; + nodes + .replace(&node_pubkey, &PostParams::default(), &node) + .await?; + } else { + nodes.create(&PostParams::default(), &node).await?; + }; + + Ok(()) +} diff --git a/src/rpc/lib.rs b/src/rpc/lib.rs index b72392ab..2c877a7f 100644 --- a/src/rpc/lib.rs +++ b/src/rpc/lib.rs @@ -4,6 +4,7 @@ extern crate log; mod consul; +mod kubernetes; pub mod layout; pub mod ring; diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 6bca6e3e..c8fc0ad5 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -1,7 +1,7 @@ //! Module containing structs related to membership management use std::collections::HashMap; use std::io::{Read, Write}; -use std::net::SocketAddr; +use std::net::{IpAddr, SocketAddr}; use std::path::Path; use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; @@ -29,6 +29,7 @@ use garage_util::persister::Persister; use garage_util::time::*; use crate::consul::*; +use crate::kubernetes::*; use crate::layout::*; use crate::ring::*; use crate::rpc_helper::*; @@ -88,6 +89,11 @@ pub struct System { bootstrap_peers: Vec<(NodeID, SocketAddr)>, consul_host: Option<String>, consul_service_name: Option<String>, + + kubernetes_service_name: Option<String>, + kubernetes_namespace: Option<String>, + kubernetes_skip_crd: bool, + replication_factor: usize, /// The ring @@ -247,6 +253,10 @@ impl System { bootstrap_peers: config.bootstrap_peers.clone(), consul_host: config.consul_host.clone(), consul_service_name: config.consul_service_name.clone(), + kubernetes_service_name: config.kubernetes_service_name.clone(), + kubernetes_namespace: config.kubernetes_namespace.clone(), + kubernetes_skip_crd: config.kubernetes_skip_crd, + ring, update_ring: Mutex::new(update_ring), background, @@ -295,6 +305,44 @@ impl System { .err_context("Error while publishing Consul service") } + fn get_default_ip() -> IpAddr { + pnet::datalink::interfaces() + .iter() + .find(|e| e.is_up() && !e.is_loopback() && !e.ips.is_empty()) + .unwrap() + .ips + .first() + .unwrap() + .ip() + } + + async fn advertise_to_kubernetes(self: Arc<Self>) -> Result<(), Error> { + let (kubernetes_service_name, kubernetes_namespace) = + match (&self.kubernetes_service_name, &self.kubernetes_namespace) { + (Some(ch), Some(csn)) => (ch, csn), + _ => return Ok(()), + }; + + let rpc_public_addr = + match self.rpc_public_addr { + Some(addr) => addr, + None => { + warn!("No rpc_public_addr configured, using first address on first network interface"); + SocketAddr::new(Self::get_default_ip(), self.rpc_listen_addr.port()) + } + }; + + publish_kubernetes_node( + kubernetes_service_name, + kubernetes_namespace, + self.netapp.id, + &self.local_status.load_full().hostname, + rpc_public_addr, + ) + .await + .err_context("Error while publishing node to kubernetes") + } + /// Save network configuration to disc async fn save_cluster_layout(self: Arc<Self>) -> Result<(), Error> { let ring: Arc<Ring> = self.ring.borrow().clone(); @@ -470,6 +518,11 @@ impl System { _ => None, }; + let kubernetes_config = match (&self.kubernetes_service_name, &self.kubernetes_namespace) { + (Some(ch), Some(csn)) => Some((ch.clone(), csn.clone())), + _ => None, + }; + while !*stop_signal.borrow() { let not_configured = !self.ring.borrow().layout.check(); let no_peers = self.fullmesh.get_peer_list().len() < self.replication_factor; @@ -503,6 +556,28 @@ impl System { } } + // Fetch peer list from Kubernetes + if let Some((kubernetes_service_name, kubernetes_namespace)) = &kubernetes_config { + if !self.kubernetes_skip_crd { + match create_kubernetes_crd().await { + Ok(()) => (), + Err(e) => { + error!("Failed to create kubernetes custom resource: {}", e) + } + }; + } + + match get_kubernetes_nodes(kubernetes_service_name, kubernetes_namespace).await + { + Ok(node_list) => { + ping_list.extend(node_list); + } + Err(e) => { + warn!("Could not retrieve node list from Kubernetes: {}", e); + } + } + } + for (node_id, node_addr) in ping_list { tokio::spawn( self.netapp @@ -518,6 +593,8 @@ impl System { } self.background.spawn(self.clone().advertise_to_consul()); + self.background + .spawn(self.clone().advertise_to_kubernetes()); let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL); select! { |