aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMax Audron <audron@cocaine.farm>2022-03-06 14:50:00 +0100
committerMax Audron <audron@cocaine.farm>2022-03-12 13:05:52 +0100
commit9d44127245990cc55dbdff5a4bd0a1524348f110 (patch)
tree82110b06619b6cd7031398d8c1c6d42f7308b152 /src
parentc00b2c9948bc686a5f33805a5cc4295c933a723a (diff)
downloadgarage-9d44127245990cc55dbdff5a4bd0a1524348f110.tar.gz
garage-9d44127245990cc55dbdff5a4bd0a1524348f110.zip
add support for kubernetes service discovery
This commit adds support to discover garage instances running in kubernetes. Once enabled by setting `kubernetes_namespace` and `kubernetes_service_name` garage will create a Custom Resources `garagenodes.deuxfleurs.fr` with nodes public key as the resource name. and IP and Port information as spec in the namespace configured by `kubernetes_namespace`. For discovering nodes the resources are filtered with the optionally set `kubernetes_service_name` which sets a label `garage.deuxfleurs.fr/service` on the resources. This allows to separate multiple garage deployments in a single namespace. the `kubernetes_skip_crd` variable allows to disable the creation of the CRD by garage itself. The user must deploy this manually.
Diffstat (limited to 'src')
-rw-r--r--src/rpc/Cargo.toml9
-rw-r--r--src/rpc/kubernetes.rs118
-rw-r--r--src/rpc/lib.rs1
-rw-r--r--src/rpc/system.rs79
-rw-r--r--src/util/Cargo.toml3
-rw-r--r--src/util/config.rs7
-rw-r--r--src/util/error.rs3
7 files changed, 219 insertions, 1 deletions
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml
index b49a126a..f06606e5 100644
--- a/src/rpc/Cargo.toml
+++ b/src/rpc/Cargo.toml
@@ -30,6 +30,15 @@ serde = { version = "1.0", default-features = false, features = ["derive", "rc"]
serde_bytes = "0.11"
serde_json = "1.0"
+# newer version requires rust edition 2021
+kube = { version = "0.62", features = ["runtime", "derive"] }
+k8s-openapi = { version = "0.13", features = ["v1_22"] }
+openssl = { version = "0.10", features = ["vendored"] }
+schemars = "0.8"
+
+# newer version requires rust edition 2021
+pnet = "0.28"
+
futures = "0.3"
futures-util = "0.3"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
diff --git a/src/rpc/kubernetes.rs b/src/rpc/kubernetes.rs
new file mode 100644
index 00000000..8c0d6cdf
--- /dev/null
+++ b/src/rpc/kubernetes.rs
@@ -0,0 +1,118 @@
+use std::collections::BTreeMap;
+use std::net::{IpAddr, SocketAddr};
+
+use kube::{
+ api::{ListParams, Patch, PatchParams, PostParams},
+ Api, Client, CustomResource, CustomResourceExt,
+};
+
+use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
+use schemars::JsonSchema;
+use serde::{Deserialize, Serialize};
+
+use netapp::NodeID;
+
+use garage_util::error::Error;
+
+static K8S_GROUP: &str = "deuxfleurs.fr";
+
+#[derive(CustomResource, Debug, Serialize, Deserialize, Clone, JsonSchema)]
+#[kube(
+ group = "deuxfleurs.fr",
+ version = "v1",
+ kind = "GarageNode",
+ namespaced
+)]
+pub struct Node {
+ hostname: String,
+ address: IpAddr,
+ port: u16,
+}
+
+pub async fn create_kubernetes_crd() -> Result<(), Error> {
+ let client = Client::try_default().await?;
+ let crds: Api<CustomResourceDefinition> = Api::all(client.clone());
+
+ let params = PatchParams::apply(&format!("garage.{}", K8S_GROUP));
+ let crd = GarageNode::crd();
+ let patch = Patch::Apply(crd);
+ crds.patch(&format!("garagenodes.{}", K8S_GROUP), &params, &patch)
+ .await?;
+
+ Ok(())
+}
+
+pub async fn get_kubernetes_nodes(
+ kubernetes_service_name: &str,
+ kubernetes_namespace: &str,
+) -> Result<Vec<(NodeID, SocketAddr)>, Error> {
+ let client = Client::try_default().await?;
+ let nodes: Api<GarageNode> = Api::namespaced(client.clone(), kubernetes_namespace);
+
+ let lp = ListParams::default().labels(&format!(
+ "garage.{}/service={}",
+ K8S_GROUP, kubernetes_service_name
+ ));
+
+ let nodes = nodes.list(&lp).await?;
+ let mut ret = Vec::with_capacity(nodes.items.len());
+
+ for node in nodes {
+ println!("Found Pod: {:?}", node.metadata.name);
+
+ let pubkey = &node
+ .metadata
+ .name
+ .map(|k| hex::decode(&k).ok())
+ .flatten()
+ .map(|k| NodeID::from_slice(&k[..]))
+ .flatten();
+
+ if let Some(pubkey) = pubkey {
+ ret.push((*pubkey, SocketAddr::new(node.spec.address, node.spec.port)))
+ }
+ }
+
+ Ok(ret)
+}
+
+pub async fn publish_kubernetes_node(
+ kubernetes_service_name: &str,
+ kubernetes_namespace: &str,
+ node_id: NodeID,
+ hostname: &str,
+ rpc_public_addr: SocketAddr,
+) -> Result<(), Error> {
+ let node_pubkey = hex::encode(node_id);
+
+ let mut node = GarageNode::new(
+ &node_pubkey,
+ Node {
+ hostname: hostname.to_string(),
+ address: rpc_public_addr.ip(),
+ port: rpc_public_addr.port(),
+ },
+ );
+
+ let labels = node.metadata.labels.insert(BTreeMap::new());
+ labels.insert(
+ format!("garage.{}/service", K8S_GROUP),
+ kubernetes_service_name.to_string(),
+ );
+
+ debug!("Node object to be applied: {:#?}", node);
+
+ let client = Client::try_default().await?;
+ let nodes: Api<GarageNode> = Api::namespaced(client.clone(), kubernetes_namespace);
+
+ if let Ok(old_node) = nodes.get(&node_pubkey).await {
+ node.metadata.resource_version = old_node.metadata.resource_version;
+ nodes
+ .replace(&node_pubkey, &PostParams::default(), &node)
+ .await?;
+ } else {
+ nodes.create(&PostParams::default(), &node).await?;
+ };
+
+ Ok(())
+}
diff --git a/src/rpc/lib.rs b/src/rpc/lib.rs
index b72392ab..2c877a7f 100644
--- a/src/rpc/lib.rs
+++ b/src/rpc/lib.rs
@@ -4,6 +4,7 @@
extern crate log;
mod consul;
+mod kubernetes;
pub mod layout;
pub mod ring;
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 6bca6e3e..c8fc0ad5 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -1,7 +1,7 @@
//! Module containing structs related to membership management
use std::collections::HashMap;
use std::io::{Read, Write};
-use std::net::SocketAddr;
+use std::net::{IpAddr, SocketAddr};
use std::path::Path;
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
@@ -29,6 +29,7 @@ use garage_util::persister::Persister;
use garage_util::time::*;
use crate::consul::*;
+use crate::kubernetes::*;
use crate::layout::*;
use crate::ring::*;
use crate::rpc_helper::*;
@@ -88,6 +89,11 @@ pub struct System {
bootstrap_peers: Vec<(NodeID, SocketAddr)>,
consul_host: Option<String>,
consul_service_name: Option<String>,
+
+ kubernetes_service_name: Option<String>,
+ kubernetes_namespace: Option<String>,
+ kubernetes_skip_crd: bool,
+
replication_factor: usize,
/// The ring
@@ -247,6 +253,10 @@ impl System {
bootstrap_peers: config.bootstrap_peers.clone(),
consul_host: config.consul_host.clone(),
consul_service_name: config.consul_service_name.clone(),
+ kubernetes_service_name: config.kubernetes_service_name.clone(),
+ kubernetes_namespace: config.kubernetes_namespace.clone(),
+ kubernetes_skip_crd: config.kubernetes_skip_crd,
+
ring,
update_ring: Mutex::new(update_ring),
background,
@@ -295,6 +305,44 @@ impl System {
.err_context("Error while publishing Consul service")
}
+ fn get_default_ip() -> IpAddr {
+ pnet::datalink::interfaces()
+ .iter()
+ .find(|e| e.is_up() && !e.is_loopback() && !e.ips.is_empty())
+ .unwrap()
+ .ips
+ .first()
+ .unwrap()
+ .ip()
+ }
+
+ async fn advertise_to_kubernetes(self: Arc<Self>) -> Result<(), Error> {
+ let (kubernetes_service_name, kubernetes_namespace) =
+ match (&self.kubernetes_service_name, &self.kubernetes_namespace) {
+ (Some(ch), Some(csn)) => (ch, csn),
+ _ => return Ok(()),
+ };
+
+ let rpc_public_addr =
+ match self.rpc_public_addr {
+ Some(addr) => addr,
+ None => {
+ warn!("No rpc_public_addr configured, using first address on first network interface");
+ SocketAddr::new(Self::get_default_ip(), self.rpc_listen_addr.port())
+ }
+ };
+
+ publish_kubernetes_node(
+ kubernetes_service_name,
+ kubernetes_namespace,
+ self.netapp.id,
+ &self.local_status.load_full().hostname,
+ rpc_public_addr,
+ )
+ .await
+ .err_context("Error while publishing node to kubernetes")
+ }
+
/// Save network configuration to disc
async fn save_cluster_layout(self: Arc<Self>) -> Result<(), Error> {
let ring: Arc<Ring> = self.ring.borrow().clone();
@@ -470,6 +518,11 @@ impl System {
_ => None,
};
+ let kubernetes_config = match (&self.kubernetes_service_name, &self.kubernetes_namespace) {
+ (Some(ch), Some(csn)) => Some((ch.clone(), csn.clone())),
+ _ => None,
+ };
+
while !*stop_signal.borrow() {
let not_configured = !self.ring.borrow().layout.check();
let no_peers = self.fullmesh.get_peer_list().len() < self.replication_factor;
@@ -503,6 +556,28 @@ impl System {
}
}
+ // Fetch peer list from Kubernetes
+ if let Some((kubernetes_service_name, kubernetes_namespace)) = &kubernetes_config {
+ if !self.kubernetes_skip_crd {
+ match create_kubernetes_crd().await {
+ Ok(()) => (),
+ Err(e) => {
+ error!("Failed to create kubernetes custom resource: {}", e)
+ }
+ };
+ }
+
+ match get_kubernetes_nodes(kubernetes_service_name, kubernetes_namespace).await
+ {
+ Ok(node_list) => {
+ ping_list.extend(node_list);
+ }
+ Err(e) => {
+ warn!("Could not retrieve node list from Kubernetes: {}", e);
+ }
+ }
+ }
+
for (node_id, node_addr) in ping_list {
tokio::spawn(
self.netapp
@@ -518,6 +593,8 @@ impl System {
}
self.background.spawn(self.clone().advertise_to_consul());
+ self.background
+ .spawn(self.clone().advertise_to_kubernetes());
let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL);
select! {
diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml
index d5200f98..4e3c8c25 100644
--- a/src/util/Cargo.toml
+++ b/src/util/Cargo.toml
@@ -38,3 +38,6 @@ netapp = "0.3.0"
http = "0.2"
hyper = "0.14"
+
+kube = { version = "0.62", features = ["runtime", "derive"] }
+k8s-openapi = { version = "0.13", features = ["v1_22"] }
diff --git a/src/util/config.rs b/src/util/config.rs
index f1f4b06a..19c75478 100644
--- a/src/util/config.rs
+++ b/src/util/config.rs
@@ -52,6 +52,13 @@ pub struct Config {
pub consul_host: Option<String>,
/// Consul service name to use
pub consul_service_name: Option<String>,
+ /// Kubernetes namespace the service discovery resources are be created in
+ pub kubernetes_namespace: Option<String>,
+ /// Service name to filter for in k8s custom resources
+ pub kubernetes_service_name: Option<String>,
+ /// Skip creation of the garagenodes CRD
+ #[serde(default)]
+ pub kubernetes_skip_crd: bool,
/// Sled cache size, in bytes
#[serde(default = "default_sled_cache_capacity")]
diff --git a/src/util/error.rs b/src/util/error.rs
index bdb3a69b..93b28038 100644
--- a/src/util/error.rs
+++ b/src/util/error.rs
@@ -23,6 +23,9 @@ pub enum Error {
#[error(display = "Invalid HTTP header value: {}", _0)]
HttpHeader(#[error(source)] http::header::ToStrError),
+ #[error(display = "kubernetes error: {}", _0)]
+ Kubernetes(#[error(source)] kube::Error),
+
#[error(display = "Netapp error: {}", _0)]
Netapp(#[error(source)] netapp::error::Error),