diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/garage/main.rs | 8 | ||||
-rw-r--r-- | src/rpc/Cargo.toml | 2 | ||||
-rw-r--r-- | src/rpc/system.rs | 73 | ||||
-rw-r--r-- | src/util/config.rs | 28 |
4 files changed, 68 insertions, 43 deletions
diff --git a/src/garage/main.rs b/src/garage/main.rs index 0eca24ae..e5cba553 100644 --- a/src/garage/main.rs +++ b/src/garage/main.rs @@ -162,7 +162,13 @@ async fn cli_command(opt: Opt) -> Result<(), Error> { } else { let node_id = garage_rpc::system::read_node_id(&config.as_ref().unwrap().metadata_dir) .err_context(READ_KEY_ERROR)?; - if let Some(a) = config.as_ref().and_then(|c| c.rpc_public_addr) { + if let Some(a) = config.as_ref().and_then(|c| c.rpc_public_addr.as_ref()) { + use std::net::ToSocketAddrs; + let a = a + .to_socket_addrs() + .ok_or_message("unable to resolve rpc_public_addr specified in config file")? + .next() + .ok_or_message("unable to resolve rpc_public_addr specified in config file")?; (node_id, a) } else { let default_addr = SocketAddr::new( diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index 079cfe34..e51f1f73 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -45,7 +45,7 @@ tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi tokio-stream = { version = "0.1", features = ["net"] } opentelemetry = "0.17" -netapp = { version = "0.5.0", features = ["telemetry"] } +netapp = { version = "0.5.1", features = ["telemetry"] } hyper = { version = "0.14", features = ["client", "http1", "runtime", "tcp"] } diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 228b66a4..2c6136a8 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -18,7 +18,7 @@ use tokio::sync::Mutex; use netapp::endpoint::{Endpoint, EndpointHandler}; use netapp::message::*; use netapp::peering::fullmesh::FullMeshPeeringStrategy; -use netapp::util::parse_and_resolve_peer_addr; +use netapp::util::parse_and_resolve_peer_addr_async; use netapp::{NetApp, NetworkKey, NodeID, NodeKey}; use garage_util::background::BackgroundRunner; @@ -92,7 +92,7 @@ pub struct System { rpc_listen_addr: SocketAddr, rpc_public_addr: Option<SocketAddr>, - bootstrap_peers: Vec<(NodeID, SocketAddr)>, + bootstrap_peers: Vec<String>, consul_discovery: Option<ConsulDiscoveryParam>, #[cfg(feature = "kubernetes-discovery")] @@ -242,8 +242,29 @@ impl System { let ring = Ring::new(cluster_layout, replication_factor); let (update_ring, ring) = watch::channel(Arc::new(ring)); - let rpc_public_addr = match config.rpc_public_addr { - Some(a) => Some(a), + let rpc_public_addr = match &config.rpc_public_addr { + Some(a_str) => { + use std::net::ToSocketAddrs; + match a_str.to_socket_addrs() { + Err(e) => { + error!( + "Cannot resolve rpc_public_addr {} from config file: {}.", + a_str, e + ); + None + } + Ok(a) => { + let a = a.collect::<Vec<_>>(); + if a.is_empty() { + error!("rpc_public_addr {} resolve to no known IP address", a_str); + } + if a.len() > 1 { + warn!("Multiple possible resolutions for rpc_public_addr: {:?}. Taking the first one.", a); + } + a.into_iter().next() + } + } + } None => { let addr = get_default_ip().map(|ip| SocketAddr::new(ip, config.rpc_bind_addr.port())); @@ -253,13 +274,12 @@ impl System { addr } }; + if rpc_public_addr.is_none() { + warn!("This Garage node does not know its publicly reachable RPC address, this might hamper intra-cluster communication."); + } let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, node_key); - let fullmesh = FullMeshPeeringStrategy::new( - netapp.clone(), - config.bootstrap_peers.clone(), - rpc_public_addr, - ); + let fullmesh = FullMeshPeeringStrategy::new(netapp.clone(), vec![], rpc_public_addr); let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into()); @@ -370,12 +390,14 @@ impl System { } pub async fn connect(&self, node: &str) -> Result<(), Error> { - let (pubkey, addrs) = parse_and_resolve_peer_addr(node).ok_or_else(|| { - Error::Message(format!( - "Unable to parse or resolve node specification: {}", - node - )) - })?; + let (pubkey, addrs) = parse_and_resolve_peer_addr_async(node) + .await + .ok_or_else(|| { + Error::Message(format!( + "Unable to parse or resolve node specification: {}", + node + )) + })?; let mut errors = vec![]; for ip in addrs.iter() { match self @@ -604,7 +626,7 @@ impl System { if not_configured || no_peers || bad_peers { info!("Doing a bootstrap/discovery step (not_configured: {}, no_peers: {}, bad_peers: {})", not_configured, no_peers, bad_peers); - let mut ping_list = self.bootstrap_peers.clone(); + let mut ping_list = resolve_peers(&self.bootstrap_peers).await; // Add peer list from list stored on disk if let Ok(peers) = self.persist_peer_list.load_async().await { @@ -735,6 +757,25 @@ fn get_default_ip() -> Option<IpAddr> { .map(|a| a.ip()) } +async fn resolve_peers(peers: &[String]) -> Vec<(NodeID, SocketAddr)> { + let mut ret = vec![]; + + for peer in peers.iter() { + match parse_and_resolve_peer_addr_async(peer).await { + Some((pubkey, addrs)) => { + for ip in addrs { + ret.push((pubkey, ip)); + } + } + None => { + warn!("Unable to parse and/or resolve peer hostname {}", peer); + } + } + } + + ret +} + struct ConsulDiscoveryParam { consul_host: String, service_name: String, diff --git a/src/util/config.rs b/src/util/config.rs index cccad101..5e113e13 100644 --- a/src/util/config.rs +++ b/src/util/config.rs @@ -3,12 +3,8 @@ use std::io::Read; use std::net::SocketAddr; use std::path::PathBuf; -use serde::de::Error as SerdeError; use serde::{de, Deserialize}; -use netapp::util::parse_and_resolve_peer_addr; -use netapp::NodeID; - use crate::error::Error; /// Represent the whole configuration @@ -43,11 +39,11 @@ pub struct Config { /// Address to bind for RPC pub rpc_bind_addr: SocketAddr, /// Public IP address of this node - pub rpc_public_addr: Option<SocketAddr>, + pub rpc_public_addr: Option<String>, /// Bootstrap peers RPC address - #[serde(deserialize_with = "deserialize_vec_addr", default)] - pub bootstrap_peers: Vec<(NodeID, SocketAddr)>, + #[serde(default)] + pub bootstrap_peers: Vec<String>, /// Consul host to connect to to discover more peers pub consul_host: Option<String>, /// Consul service name to use @@ -154,24 +150,6 @@ pub fn read_config(config_file: PathBuf) -> Result<Config, Error> { Ok(toml::from_str(&config)?) } -fn deserialize_vec_addr<'de, D>(deserializer: D) -> Result<Vec<(NodeID, SocketAddr)>, D::Error> -where - D: de::Deserializer<'de>, -{ - let mut ret = vec![]; - - for peer in <Vec<&str>>::deserialize(deserializer)? { - let (pubkey, addrs) = parse_and_resolve_peer_addr(peer).ok_or_else(|| { - D::Error::custom(format!("Unable to parse or resolve peer: {}", peer)) - })?; - for ip in addrs { - ret.push((pubkey, ip)); - } - } - - Ok(ret) -} - fn default_compression() -> Option<i32> { Some(1) } |