aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-03-16 12:09:50 +0100
committerGitea <gitea@fake.local>2022-03-24 16:57:43 +0100
commit9d0ed788879ad50450ebd44cd2a0b4fc942d45b6 (patch)
tree34225ffffac6b0bd3d2788d9827c08e4ae5e1afd /src
parent509d256c58ccb1aa0041569556465908453976b3 (diff)
downloadgarage-9d0ed788879ad50450ebd44cd2a0b4fc942d45b6.tar.gz
garage-9d0ed788879ad50450ebd44cd2a0b4fc942d45b6.zip
Add feature flag for Kubernetes discovery
Diffstat (limited to 'src')
-rw-r--r--src/rpc/Cargo.toml12
-rw-r--r--src/rpc/kubernetes.rs8
-rw-r--r--src/rpc/lib.rs1
-rw-r--r--src/rpc/system.rs160
-rw-r--r--src/util/Cargo.toml3
-rw-r--r--src/util/error.rs3
6 files changed, 108 insertions, 79 deletions
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml
index ef7793c9..29591053 100644
--- a/src/rpc/Cargo.toml
+++ b/src/rpc/Cargo.toml
@@ -32,13 +32,13 @@ serde_bytes = "0.11"
serde_json = "1.0"
# newer version requires rust edition 2021
-kube = { version = "0.62", features = ["runtime", "derive"] }
-k8s-openapi = { version = "0.13", features = ["v1_22"] }
-openssl = { version = "0.10", features = ["vendored"] }
-schemars = "0.8"
+kube = { version = "0.62", features = ["runtime", "derive"], optional = true }
+k8s-openapi = { version = "0.13", features = ["v1_22"], optional = true }
+openssl = { version = "0.10", features = ["vendored"], optional = true }
+schemars = { version = "0.8", optional = true }
# newer version requires rust edition 2021
-pnet = "0.28"
+pnet_datalink = "0.28"
futures = "0.3"
futures-util = "0.3"
@@ -52,3 +52,5 @@ netapp = { version = "0.4.1", features = ["telemetry"] }
hyper = { version = "0.14", features = ["client", "http1", "runtime", "tcp"] }
+[features]
+kubernetes-discovery = [ "kube", "k8s-openapi", "openssl", "schemars" ]
diff --git a/src/rpc/kubernetes.rs b/src/rpc/kubernetes.rs
index 272d9162..939a0eed 100644
--- a/src/rpc/kubernetes.rs
+++ b/src/rpc/kubernetes.rs
@@ -12,8 +12,6 @@ use serde::{Deserialize, Serialize};
use netapp::NodeID;
-use garage_util::error::Error;
-
static K8S_GROUP: &str = "deuxfleurs.fr";
#[derive(CustomResource, Debug, Serialize, Deserialize, Clone, JsonSchema)]
@@ -29,7 +27,7 @@ pub struct Node {
port: u16,
}
-pub async fn create_kubernetes_crd() -> Result<(), Error> {
+pub async fn create_kubernetes_crd() -> Result<(), kube::Error> {
let client = Client::try_default().await?;
let crds: Api<CustomResourceDefinition> = Api::all(client.clone());
@@ -45,7 +43,7 @@ pub async fn create_kubernetes_crd() -> Result<(), Error> {
pub async fn get_kubernetes_nodes(
kubernetes_service_name: &str,
kubernetes_namespace: &str,
-) -> Result<Vec<(NodeID, SocketAddr)>, Error> {
+) -> Result<Vec<(NodeID, SocketAddr)>, kube::Error> {
let client = Client::try_default().await?;
let nodes: Api<GarageNode> = Api::namespaced(client.clone(), kubernetes_namespace);
@@ -80,7 +78,7 @@ pub async fn publish_kubernetes_node(
node_id: NodeID,
hostname: &str,
rpc_public_addr: SocketAddr,
-) -> Result<(), Error> {
+) -> Result<(), kube::Error> {
let node_pubkey = hex::encode(node_id);
let mut node = GarageNode::new(
diff --git a/src/rpc/lib.rs b/src/rpc/lib.rs
index b8fb9772..392ff48f 100644
--- a/src/rpc/lib.rs
+++ b/src/rpc/lib.rs
@@ -4,6 +4,7 @@
extern crate tracing;
mod consul;
+#[cfg(feature = "kubernetes-discovery")]
mod kubernetes;
pub mod layout;
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 741f68e2..68d94ea5 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -29,6 +29,7 @@ use garage_util::persister::Persister;
use garage_util::time::*;
use crate::consul::*;
+#[cfg(feature = "kubernetes-discovery")]
use crate::kubernetes::*;
use crate::layout::*;
use crate::ring::*;
@@ -90,12 +91,10 @@ pub struct System {
rpc_listen_addr: SocketAddr,
rpc_public_addr: Option<SocketAddr>,
bootstrap_peers: Vec<(NodeID, SocketAddr)>,
- consul_host: Option<String>,
- consul_service_name: Option<String>,
- kubernetes_service_name: Option<String>,
- kubernetes_namespace: Option<String>,
- kubernetes_skip_crd: bool,
+ consul_discovery: Option<ConsulDiscoveryParam>,
+ #[cfg(feature = "kubernetes-discovery")]
+ kubernetes_discovery: Option<KubernetesDiscoveryParam>,
replication_factor: usize,
@@ -228,15 +227,53 @@ impl System {
let ring = Ring::new(cluster_layout, replication_factor);
let (update_ring, ring) = watch::channel(Arc::new(ring));
+ let rpc_public_addr = match config.rpc_public_addr {
+ Some(a) => Some(a),
+ None => {
+ let addr =
+ get_default_ip().map(|ip| SocketAddr::new(ip, config.rpc_bind_addr.port()));
+ if let Some(a) = addr {
+ warn!("Using autodetected rpc_public_addr: {}. Consider specifying it explicitly in configuration file if possible.", a);
+ }
+ addr
+ }
+ };
+
let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, node_key);
let fullmesh = FullMeshPeeringStrategy::new(
netapp.clone(),
config.bootstrap_peers.clone(),
- config.rpc_public_addr,
+ rpc_public_addr,
);
let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into());
+ let consul_discovery = match (&config.consul_host, &config.consul_service_name) {
+ (Some(ch), Some(csn)) => Some(ConsulDiscoveryParam {
+ consul_host: ch.to_string(),
+ service_name: csn.to_string(),
+ }),
+ _ => None,
+ };
+
+ #[cfg(feature = "kubernetes-discovery")]
+ let kubernetes_discovery = match (
+ &config.kubernetes_service_name,
+ &config.kubernetes_namespace,
+ ) {
+ (Some(ksn), Some(kn)) => Some(KubernetesDiscoveryParam {
+ service_name: ksn.to_string(),
+ namespace: kn.to_string(),
+ skip_crd: config.kubernetes_skip_crd,
+ }),
+ _ => None,
+ };
+
+ #[cfg(not(feature = "kubernetes-discovery"))]
+ if config.kubernetes_service_name.is_some() || config.kubernetes_namespace.is_some() {
+ warn!("Kubernetes discovery is not enabled in this build.");
+ }
+
let sys = Arc::new(System {
id: netapp.id.into(),
persist_cluster_layout,
@@ -249,13 +286,11 @@ impl System {
system_endpoint,
replication_factor,
rpc_listen_addr: config.rpc_bind_addr,
- rpc_public_addr: config.rpc_public_addr,
+ rpc_public_addr,
bootstrap_peers: config.bootstrap_peers.clone(),
- consul_host: config.consul_host.clone(),
- consul_service_name: config.consul_service_name.clone(),
- kubernetes_service_name: config.kubernetes_service_name.clone(),
- kubernetes_namespace: config.kubernetes_namespace.clone(),
- kubernetes_skip_crd: config.kubernetes_skip_crd,
+ consul_discovery,
+ #[cfg(feature = "kubernetes-discovery")]
+ kubernetes_discovery,
ring,
update_ring: Mutex::new(update_ring),
@@ -280,23 +315,22 @@ impl System {
// ---- INTERNALS ----
async fn advertise_to_consul(self: Arc<Self>) -> Result<(), Error> {
- let (consul_host, consul_service_name) =
- match (&self.consul_host, &self.consul_service_name) {
- (Some(ch), Some(csn)) => (ch, csn),
- _ => return Ok(()),
- };
+ let c = match &self.consul_discovery {
+ Some(c) => c,
+ _ => return Ok(()),
+ };
let rpc_public_addr = match self.rpc_public_addr {
Some(addr) => addr,
None => {
- warn!("Not advertising to Consul because rpc_public_addr is not defined in config file.");
+ warn!("Not advertising to Consul because rpc_public_addr is not defined in config file and could not be autodetected.");
return Ok(());
}
};
publish_consul_service(
- consul_host,
- consul_service_name,
+ &c.consul_host,
+ &c.service_name,
self.netapp.id,
&self.local_status.load_full().hostname,
rpc_public_addr,
@@ -305,36 +339,24 @@ impl System {
.err_context("Error while publishing Consul service")
}
- fn get_default_ip() -> IpAddr {
- pnet::datalink::interfaces()
- .iter()
- .find(|e| e.is_up() && !e.is_loopback() && !e.ips.is_empty())
- .unwrap()
- .ips
- .first()
- .unwrap()
- .ip()
- }
-
+ #[cfg(feature = "kubernetes-discovery")]
async fn advertise_to_kubernetes(self: Arc<Self>) -> Result<(), Error> {
- let (kubernetes_service_name, kubernetes_namespace) =
- match (&self.kubernetes_service_name, &self.kubernetes_namespace) {
- (Some(ch), Some(csn)) => (ch, csn),
- _ => return Ok(()),
- };
-
- let rpc_public_addr =
- match self.rpc_public_addr {
- Some(addr) => addr,
- None => {
- warn!("No rpc_public_addr configured, using first address on first network interface");
- SocketAddr::new(Self::get_default_ip(), self.rpc_listen_addr.port())
- }
- };
+ let k = match &self.kubernetes_discovery {
+ Some(k) => k,
+ _ => return Ok(()),
+ };
+
+ let rpc_public_addr = match self.rpc_public_addr {
+ Some(addr) => addr,
+ None => {
+ warn!("Not advertising to Kubernetes because rpc_public_addr is not defined in config file and could not be autodetected.");
+ return Ok(());
+ }
+ };
publish_kubernetes_node(
- kubernetes_service_name,
- kubernetes_namespace,
+ &k.service_name,
+ &k.namespace,
self.netapp.id,
&self.local_status.load_full().hostname,
rpc_public_addr,
@@ -513,16 +535,6 @@ impl System {
}
async fn discovery_loop(self: &Arc<Self>, mut stop_signal: watch::Receiver<bool>) {
- let consul_config = match (&self.consul_host, &self.consul_service_name) {
- (Some(ch), Some(csn)) => Some((ch.clone(), csn.clone())),
- _ => None,
- };
-
- let kubernetes_config = match (&self.kubernetes_service_name, &self.kubernetes_namespace) {
- (Some(ch), Some(csn)) => Some((ch.clone(), csn.clone())),
- _ => None,
- };
-
while !*stop_signal.borrow() {
let not_configured = !self.ring.borrow().layout.check();
let no_peers = self.fullmesh.get_peer_list().len() < self.replication_factor;
@@ -545,8 +557,8 @@ impl System {
}
// Fetch peer list from Consul
- if let Some((consul_host, consul_service_name)) = &consul_config {
- match get_consul_nodes(consul_host, consul_service_name).await {
+ if let Some(c) = &self.consul_discovery {
+ match get_consul_nodes(&c.consul_host, &c.service_name).await {
Ok(node_list) => {
ping_list.extend(node_list);
}
@@ -557,8 +569,9 @@ impl System {
}
// Fetch peer list from Kubernetes
- if let Some((kubernetes_service_name, kubernetes_namespace)) = &kubernetes_config {
- if !self.kubernetes_skip_crd {
+ #[cfg(feature = "kubernetes-discovery")]
+ if let Some(k) = &self.kubernetes_discovery {
+ if !k.skip_crd {
match create_kubernetes_crd().await {
Ok(()) => (),
Err(e) => {
@@ -567,8 +580,7 @@ impl System {
};
}
- match get_kubernetes_nodes(kubernetes_service_name, kubernetes_namespace).await
- {
+ match get_kubernetes_nodes(&k.service_name, &k.namespace).await {
Ok(node_list) => {
ping_list.extend(node_list);
}
@@ -593,6 +605,8 @@ impl System {
}
self.background.spawn(self.clone().advertise_to_consul());
+
+ #[cfg(feature = "kubernetes-discovery")]
self.background
.spawn(self.clone().advertise_to_kubernetes());
@@ -657,3 +671,23 @@ impl EndpointHandler<SystemRpc> for System {
}
}
}
+
+fn get_default_ip() -> Option<IpAddr> {
+ pnet_datalink::interfaces()
+ .iter()
+ .find(|e| e.is_up() && !e.is_loopback() && !e.ips.is_empty())
+ .and_then(|e| e.ips.first())
+ .map(|a| a.ip())
+}
+
+struct ConsulDiscoveryParam {
+ consul_host: String,
+ service_name: String,
+}
+
+#[cfg(feature = "kubernetes-discovery")]
+struct KubernetesDiscoveryParam {
+ service_name: String,
+ namespace: String,
+ skip_crd: bool,
+}
diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml
index a1b7b611..f42526de 100644
--- a/src/util/Cargo.toml
+++ b/src/util/Cargo.toml
@@ -40,7 +40,4 @@ netapp = "0.4.1"
http = "0.2"
hyper = "0.14"
-kube = { version = "0.62", features = ["runtime", "derive"] }
-k8s-openapi = { version = "0.13", features = ["v1_22"] }
-
opentelemetry = "0.17"
diff --git a/src/util/error.rs b/src/util/error.rs
index 93b28038..bdb3a69b 100644
--- a/src/util/error.rs
+++ b/src/util/error.rs
@@ -23,9 +23,6 @@ pub enum Error {
#[error(display = "Invalid HTTP header value: {}", _0)]
HttpHeader(#[error(source)] http::header::ToStrError),
- #[error(display = "kubernetes error: {}", _0)]
- Kubernetes(#[error(source)] kube::Error),
-
#[error(display = "Netapp error: {}", _0)]
Netapp(#[error(source)] netapp::error::Error),