aboutsummaryrefslogtreecommitdiff
path: root/src/rpc
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-09-14 16:09:38 +0200
committerAlex Auvolat <alex@adnab.me>2022-09-14 16:09:38 +0200
commite46dc2a8ef8a12e49aed3883b34b538b5f65ca31 (patch)
tree19bb547df310c40ead34b14b6bad3c8e1d408397 /src/rpc
parent80fdbfb0aa1b7186db3aeef888c0954748a35c62 (diff)
downloadgarage-e46dc2a8ef8a12e49aed3883b34b538b5f65ca31.tar.gz
garage-e46dc2a8ef8a12e49aed3883b34b538b5f65ca31.zip
Allow for hostnames in bootstrap_peers and rpc_public_addr (fix #353)resolve-peer-names
Diffstat (limited to 'src/rpc')
-rw-r--r--src/rpc/Cargo.toml2
-rw-r--r--src/rpc/system.rs73
2 files changed, 58 insertions, 17 deletions
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml
index 079cfe34..e51f1f73 100644
--- a/src/rpc/Cargo.toml
+++ b/src/rpc/Cargo.toml
@@ -45,7 +45,7 @@ tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi
tokio-stream = { version = "0.1", features = ["net"] }
opentelemetry = "0.17"
-netapp = { version = "0.5.0", features = ["telemetry"] }
+netapp = { version = "0.5.1", features = ["telemetry"] }
hyper = { version = "0.14", features = ["client", "http1", "runtime", "tcp"] }
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 228b66a4..2c6136a8 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -18,7 +18,7 @@ use tokio::sync::Mutex;
use netapp::endpoint::{Endpoint, EndpointHandler};
use netapp::message::*;
use netapp::peering::fullmesh::FullMeshPeeringStrategy;
-use netapp::util::parse_and_resolve_peer_addr;
+use netapp::util::parse_and_resolve_peer_addr_async;
use netapp::{NetApp, NetworkKey, NodeID, NodeKey};
use garage_util::background::BackgroundRunner;
@@ -92,7 +92,7 @@ pub struct System {
rpc_listen_addr: SocketAddr,
rpc_public_addr: Option<SocketAddr>,
- bootstrap_peers: Vec<(NodeID, SocketAddr)>,
+ bootstrap_peers: Vec<String>,
consul_discovery: Option<ConsulDiscoveryParam>,
#[cfg(feature = "kubernetes-discovery")]
@@ -242,8 +242,29 @@ impl System {
let ring = Ring::new(cluster_layout, replication_factor);
let (update_ring, ring) = watch::channel(Arc::new(ring));
- let rpc_public_addr = match config.rpc_public_addr {
- Some(a) => Some(a),
+ let rpc_public_addr = match &config.rpc_public_addr {
+ Some(a_str) => {
+ use std::net::ToSocketAddrs;
+ match a_str.to_socket_addrs() {
+ Err(e) => {
+ error!(
+ "Cannot resolve rpc_public_addr {} from config file: {}.",
+ a_str, e
+ );
+ None
+ }
+ Ok(a) => {
+ let a = a.collect::<Vec<_>>();
+ if a.is_empty() {
+ error!("rpc_public_addr {} resolve to no known IP address", a_str);
+ }
+ if a.len() > 1 {
+ warn!("Multiple possible resolutions for rpc_public_addr: {:?}. Taking the first one.", a);
+ }
+ a.into_iter().next()
+ }
+ }
+ }
None => {
let addr =
get_default_ip().map(|ip| SocketAddr::new(ip, config.rpc_bind_addr.port()));
@@ -253,13 +274,12 @@ impl System {
addr
}
};
+ if rpc_public_addr.is_none() {
+ warn!("This Garage node does not know its publicly reachable RPC address, this might hamper intra-cluster communication.");
+ }
let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, node_key);
- let fullmesh = FullMeshPeeringStrategy::new(
- netapp.clone(),
- config.bootstrap_peers.clone(),
- rpc_public_addr,
- );
+ let fullmesh = FullMeshPeeringStrategy::new(netapp.clone(), vec![], rpc_public_addr);
let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into());
@@ -370,12 +390,14 @@ impl System {
}
pub async fn connect(&self, node: &str) -> Result<(), Error> {
- let (pubkey, addrs) = parse_and_resolve_peer_addr(node).ok_or_else(|| {
- Error::Message(format!(
- "Unable to parse or resolve node specification: {}",
- node
- ))
- })?;
+ let (pubkey, addrs) = parse_and_resolve_peer_addr_async(node)
+ .await
+ .ok_or_else(|| {
+ Error::Message(format!(
+ "Unable to parse or resolve node specification: {}",
+ node
+ ))
+ })?;
let mut errors = vec![];
for ip in addrs.iter() {
match self
@@ -604,7 +626,7 @@ impl System {
if not_configured || no_peers || bad_peers {
info!("Doing a bootstrap/discovery step (not_configured: {}, no_peers: {}, bad_peers: {})", not_configured, no_peers, bad_peers);
- let mut ping_list = self.bootstrap_peers.clone();
+ let mut ping_list = resolve_peers(&self.bootstrap_peers).await;
// Add peer list from list stored on disk
if let Ok(peers) = self.persist_peer_list.load_async().await {
@@ -735,6 +757,25 @@ fn get_default_ip() -> Option<IpAddr> {
.map(|a| a.ip())
}
+async fn resolve_peers(peers: &[String]) -> Vec<(NodeID, SocketAddr)> {
+ let mut ret = vec![];
+
+ for peer in peers.iter() {
+ match parse_and_resolve_peer_addr_async(peer).await {
+ Some((pubkey, addrs)) => {
+ for ip in addrs {
+ ret.push((pubkey, ip));
+ }
+ }
+ None => {
+ warn!("Unable to parse and/or resolve peer hostname {}", peer);
+ }
+ }
+ }
+
+ ret
+}
+
struct ConsulDiscoveryParam {
consul_host: String,
service_name: String,