aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/garage/main.rs8
-rw-r--r--src/rpc/Cargo.toml2
-rw-r--r--src/rpc/system.rs73
-rw-r--r--src/util/config.rs28
4 files changed, 68 insertions, 43 deletions
diff --git a/src/garage/main.rs b/src/garage/main.rs
index 0eca24ae..e5cba553 100644
--- a/src/garage/main.rs
+++ b/src/garage/main.rs
@@ -162,7 +162,13 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
} else {
let node_id = garage_rpc::system::read_node_id(&config.as_ref().unwrap().metadata_dir)
.err_context(READ_KEY_ERROR)?;
- if let Some(a) = config.as_ref().and_then(|c| c.rpc_public_addr) {
+ if let Some(a) = config.as_ref().and_then(|c| c.rpc_public_addr.as_ref()) {
+ use std::net::ToSocketAddrs;
+ let a = a
+ .to_socket_addrs()
+ .ok_or_message("unable to resolve rpc_public_addr specified in config file")?
+ .next()
+ .ok_or_message("unable to resolve rpc_public_addr specified in config file")?;
(node_id, a)
} else {
let default_addr = SocketAddr::new(
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml
index 079cfe34..e51f1f73 100644
--- a/src/rpc/Cargo.toml
+++ b/src/rpc/Cargo.toml
@@ -45,7 +45,7 @@ tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi
tokio-stream = { version = "0.1", features = ["net"] }
opentelemetry = "0.17"
-netapp = { version = "0.5.0", features = ["telemetry"] }
+netapp = { version = "0.5.1", features = ["telemetry"] }
hyper = { version = "0.14", features = ["client", "http1", "runtime", "tcp"] }
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 228b66a4..2c6136a8 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -18,7 +18,7 @@ use tokio::sync::Mutex;
use netapp::endpoint::{Endpoint, EndpointHandler};
use netapp::message::*;
use netapp::peering::fullmesh::FullMeshPeeringStrategy;
-use netapp::util::parse_and_resolve_peer_addr;
+use netapp::util::parse_and_resolve_peer_addr_async;
use netapp::{NetApp, NetworkKey, NodeID, NodeKey};
use garage_util::background::BackgroundRunner;
@@ -92,7 +92,7 @@ pub struct System {
rpc_listen_addr: SocketAddr,
rpc_public_addr: Option<SocketAddr>,
- bootstrap_peers: Vec<(NodeID, SocketAddr)>,
+ bootstrap_peers: Vec<String>,
consul_discovery: Option<ConsulDiscoveryParam>,
#[cfg(feature = "kubernetes-discovery")]
@@ -242,8 +242,29 @@ impl System {
let ring = Ring::new(cluster_layout, replication_factor);
let (update_ring, ring) = watch::channel(Arc::new(ring));
- let rpc_public_addr = match config.rpc_public_addr {
- Some(a) => Some(a),
+ let rpc_public_addr = match &config.rpc_public_addr {
+ Some(a_str) => {
+ use std::net::ToSocketAddrs;
+ match a_str.to_socket_addrs() {
+ Err(e) => {
+ error!(
+ "Cannot resolve rpc_public_addr {} from config file: {}.",
+ a_str, e
+ );
+ None
+ }
+ Ok(a) => {
+ let a = a.collect::<Vec<_>>();
+ if a.is_empty() {
+ error!("rpc_public_addr {} resolve to no known IP address", a_str);
+ }
+ if a.len() > 1 {
+ warn!("Multiple possible resolutions for rpc_public_addr: {:?}. Taking the first one.", a);
+ }
+ a.into_iter().next()
+ }
+ }
+ }
None => {
let addr =
get_default_ip().map(|ip| SocketAddr::new(ip, config.rpc_bind_addr.port()));
@@ -253,13 +274,12 @@ impl System {
addr
}
};
+ if rpc_public_addr.is_none() {
+ warn!("This Garage node does not know its publicly reachable RPC address, this might hamper intra-cluster communication.");
+ }
let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, node_key);
- let fullmesh = FullMeshPeeringStrategy::new(
- netapp.clone(),
- config.bootstrap_peers.clone(),
- rpc_public_addr,
- );
+ let fullmesh = FullMeshPeeringStrategy::new(netapp.clone(), vec![], rpc_public_addr);
let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into());
@@ -370,12 +390,14 @@ impl System {
}
pub async fn connect(&self, node: &str) -> Result<(), Error> {
- let (pubkey, addrs) = parse_and_resolve_peer_addr(node).ok_or_else(|| {
- Error::Message(format!(
- "Unable to parse or resolve node specification: {}",
- node
- ))
- })?;
+ let (pubkey, addrs) = parse_and_resolve_peer_addr_async(node)
+ .await
+ .ok_or_else(|| {
+ Error::Message(format!(
+ "Unable to parse or resolve node specification: {}",
+ node
+ ))
+ })?;
let mut errors = vec![];
for ip in addrs.iter() {
match self
@@ -604,7 +626,7 @@ impl System {
if not_configured || no_peers || bad_peers {
info!("Doing a bootstrap/discovery step (not_configured: {}, no_peers: {}, bad_peers: {})", not_configured, no_peers, bad_peers);
- let mut ping_list = self.bootstrap_peers.clone();
+ let mut ping_list = resolve_peers(&self.bootstrap_peers).await;
// Add peer list from list stored on disk
if let Ok(peers) = self.persist_peer_list.load_async().await {
@@ -735,6 +757,25 @@ fn get_default_ip() -> Option<IpAddr> {
.map(|a| a.ip())
}
+async fn resolve_peers(peers: &[String]) -> Vec<(NodeID, SocketAddr)> {
+ let mut ret = vec![];
+
+ for peer in peers.iter() {
+ match parse_and_resolve_peer_addr_async(peer).await {
+ Some((pubkey, addrs)) => {
+ for ip in addrs {
+ ret.push((pubkey, ip));
+ }
+ }
+ None => {
+ warn!("Unable to parse and/or resolve peer hostname {}", peer);
+ }
+ }
+ }
+
+ ret
+}
+
struct ConsulDiscoveryParam {
consul_host: String,
service_name: String,
diff --git a/src/util/config.rs b/src/util/config.rs
index cccad101..5e113e13 100644
--- a/src/util/config.rs
+++ b/src/util/config.rs
@@ -3,12 +3,8 @@ use std::io::Read;
use std::net::SocketAddr;
use std::path::PathBuf;
-use serde::de::Error as SerdeError;
use serde::{de, Deserialize};
-use netapp::util::parse_and_resolve_peer_addr;
-use netapp::NodeID;
-
use crate::error::Error;
/// Represent the whole configuration
@@ -43,11 +39,11 @@ pub struct Config {
/// Address to bind for RPC
pub rpc_bind_addr: SocketAddr,
/// Public IP address of this node
- pub rpc_public_addr: Option<SocketAddr>,
+ pub rpc_public_addr: Option<String>,
/// Bootstrap peers RPC address
- #[serde(deserialize_with = "deserialize_vec_addr", default)]
- pub bootstrap_peers: Vec<(NodeID, SocketAddr)>,
+ #[serde(default)]
+ pub bootstrap_peers: Vec<String>,
/// Consul host to connect to to discover more peers
pub consul_host: Option<String>,
/// Consul service name to use
@@ -154,24 +150,6 @@ pub fn read_config(config_file: PathBuf) -> Result<Config, Error> {
Ok(toml::from_str(&config)?)
}
-fn deserialize_vec_addr<'de, D>(deserializer: D) -> Result<Vec<(NodeID, SocketAddr)>, D::Error>
-where
- D: de::Deserializer<'de>,
-{
- let mut ret = vec![];
-
- for peer in <Vec<&str>>::deserialize(deserializer)? {
- let (pubkey, addrs) = parse_and_resolve_peer_addr(peer).ok_or_else(|| {
- D::Error::custom(format!("Unable to parse or resolve peer: {}", peer))
- })?;
- for ip in addrs {
- ret.push((pubkey, ip));
- }
- }
-
- Ok(ret)
-}
-
fn default_compression() -> Option<i32> {
Some(1)
}