path: root/src/rpc/system.rs
author    Alex <alex@adnab.me>    2024-01-11 10:58:08 +0000
committer Alex <alex@adnab.me>    2024-01-11 10:58:08 +0000
commit    8a6ec1d6111a60e602c90ade2200b2dab5733fe3 (patch)
tree      b8daac4f41050339c87106d72ce7224f7eef38aa /src/rpc/system.rs
parent    723e56b37f13f078a15e067343191fb1bf96e8b2 (diff)
parent    0041b013a473e3ae72f50209d8f79db75a72848b (diff)
Merge pull request 'NLnet task 3' (#667) from nlnet-task3 into next-0.10
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/667
Diffstat (limited to 'src/rpc/system.rs')
-rw-r--r--  src/rpc/system.rs | 431
1 file changed, 181 insertions, 250 deletions
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 4b40bec4..83cc6816 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -1,10 +1,10 @@
//! Module containing structs related to membership management
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
use std::io::{Read, Write};
use std::net::{IpAddr, SocketAddr};
use std::path::{Path, PathBuf};
use std::sync::atomic::Ordering;
-use std::sync::{Arc, RwLock};
+use std::sync::{Arc, RwLock, RwLockReadGuard};
use std::time::{Duration, Instant};
use arc_swap::ArcSwap;
@@ -13,8 +13,7 @@ use futures::join;
use serde::{Deserialize, Serialize};
use sodiumoxide::crypto::sign::ed25519;
use tokio::select;
-use tokio::sync::watch;
-use tokio::sync::Mutex;
+use tokio::sync::{watch, Notify};
use netapp::endpoint::{Endpoint, EndpointHandler};
use netapp::message::*;
@@ -34,9 +33,10 @@ use garage_util::time::*;
use crate::consul::ConsulDiscovery;
#[cfg(feature = "kubernetes-discovery")]
use crate::kubernetes::*;
-use crate::layout::*;
+use crate::layout::{
+ self, manager::LayoutManager, LayoutHelper, LayoutHistory, NodeRoleV, RpcLayoutDigest,
+};
use crate::replication_mode::*;
-use crate::ring::*;
use crate::rpc_helper::*;
use crate::system_metrics::*;
@@ -47,10 +47,10 @@ const STATUS_EXCHANGE_INTERVAL: Duration = Duration::from_secs(10);
/// Version tag used for version check upon Netapp connection.
/// Cluster nodes with different version tags are deemed
/// incompatible and will refuse to connect.
-pub const GARAGE_VERSION_TAG: u64 = 0x6761726167650008; // garage 0x0008
+pub const GARAGE_VERSION_TAG: u64 = 0x676172616765000A; // garage 0x000A
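The magic number is the ASCII string "garage" in the upper six bytes plus a protocol revision in the lower two, bumped here from 0x0008 to 0x000A. A one-line check of the encoding (illustration only, not part of the diff):

    assert_eq!(0x676172616765000Au64 >> 16, u64::from_be_bytes(*b"\0\0garage"));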
/// RPC endpoint used for calls related to membership
-pub const SYSTEM_RPC_PATH: &str = "garage_rpc/membership.rs/SystemRpc";
+pub const SYSTEM_RPC_PATH: &str = "garage_rpc/system.rs/SystemRpc";
/// RPC messages related to membership
#[derive(Debug, Serialize, Deserialize, Clone)]
@@ -59,17 +59,22 @@ pub enum SystemRpc {
Ok,
/// Request to connect to a specific node (in <pubkey>@<host>:<port> format)
Connect(String),
- /// Ask other node its cluster layout. Answered with AdvertiseClusterLayout
- PullClusterLayout,
/// Advertise Garage status. Answered with another AdvertiseStatus.
/// Exchanged with every node on a regular basis.
AdvertiseStatus(NodeStatus),
- /// Advertisement of cluster layout. Sent spontanously or in response to PullClusterLayout
- AdvertiseClusterLayout(ClusterLayout),
/// Get known nodes states
GetKnownNodes,
/// Return known nodes
ReturnKnownNodes(Vec<KnownNodeInfo>),
+
+ /// Ask other node its cluster layout. Answered with AdvertiseClusterLayout
+ PullClusterLayout,
+ /// Advertisement of cluster layout. Sent spontanously or in response to PullClusterLayout
+ AdvertiseClusterLayout(LayoutHistory),
+ /// Ask other node its cluster layout update trackers.
+ PullClusterLayoutTrackers,
+ /// Advertisement of cluster layout update trackers.
+ AdvertiseClusterLayoutTrackers(layout::UpdateTrackers),
}
impl Rpc for SystemRpc {
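The enum above now separates plain membership messages from layout messages: AdvertiseClusterLayout carries a full LayoutHistory instead of a single ClusterLayout, and two new variants exchange only the layout update trackers. The client side of these exchanges lives in the layout manager (see the dispatch added to EndpointHandler further down in this diff); the sketch below only illustrates the intended request/response pairing, reusing the calling pattern of the pull_cluster_layout function that this commit deletes. The helper name pull_trackers_from is made up for the illustration and does not exist in the codebase.

    // Hypothetical helper, modelled on the deleted System::pull_cluster_layout:
    // ask one peer for its layout update trackers and hand the reply to the
    // layout manager, which merges them into the local LayoutHistory.
    async fn pull_trackers_from(sys: Arc<System>, peer: Uuid) {
        let resp = sys
            .rpc_helper()
            .call(
                &sys.system_endpoint,
                peer,
                SystemRpc::PullClusterLayoutTrackers,
                RequestStrategy::with_priority(PRIO_HIGH),
            )
            .await;
        if let Ok(SystemRpc::AdvertiseClusterLayoutTrackers(trackers)) = resp {
            let _ = sys
                .layout_manager
                .handle_advertise_cluster_layout_trackers(&trackers)
                .await;
        }
    }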
@@ -85,7 +90,6 @@ pub struct System {
/// The id of this node
pub id: Uuid,
- persist_cluster_layout: Persister<ClusterLayout>,
persist_peer_list: Persister<PeerList>,
local_status: ArcSwap<NodeStatus>,
@@ -93,9 +97,8 @@ pub struct System {
pub netapp: Arc<NetApp>,
fullmesh: Arc<FullMeshPeeringStrategy>,
- pub rpc: RpcHelper,
- system_endpoint: Arc<Endpoint<SystemRpc, System>>,
+ pub(crate) system_endpoint: Arc<Endpoint<SystemRpc, System>>,
rpc_listen_addr: SocketAddr,
#[cfg(any(feature = "consul-discovery", feature = "kubernetes-discovery"))]
@@ -107,15 +110,13 @@ pub struct System {
#[cfg(feature = "kubernetes-discovery")]
kubernetes_discovery: Option<KubernetesDiscoveryConfig>,
+ pub layout_manager: Arc<LayoutManager>,
+
metrics: SystemMetrics,
replication_mode: ReplicationMode,
replication_factor: usize,
- /// The ring
- pub ring: watch::Receiver<Arc<Ring>>,
- update_ring: Mutex<watch::Sender<Arc<Ring>>>,
-
/// Path to metadata directory
pub metadata_dir: PathBuf,
/// Path to data directory
@@ -125,14 +126,13 @@ pub struct System {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeStatus {
/// Hostname of the node
- pub hostname: String,
+ pub hostname: Option<String>,
/// Replication factor configured on the node
pub replication_factor: usize,
- /// Cluster layout version
- pub cluster_layout_version: u64,
- /// Hash of cluster layout staging data
- pub cluster_layout_staging_hash: Hash,
+
+ /// Cluster layout digest
+ pub layout_digest: RpcLayoutDigest,
/// Disk usage on partition containing metadata directory (tuple: `(avail, total)`)
#[serde(default)]
@@ -248,8 +248,7 @@ impl System {
replication_mode: ReplicationMode,
config: &Config,
) -> Result<Arc<Self>, Error> {
- let replication_factor = replication_mode.replication_factor();
-
+ // ---- setup netapp RPC protocol ----
let node_key =
gen_node_key(&config.metadata_dir).expect("Unable to read or generate node ID");
info!(
@@ -257,82 +256,40 @@ impl System {
hex::encode(&node_key.public_key()[..8])
);
- let persist_cluster_layout: Persister<ClusterLayout> =
- Persister::new(&config.metadata_dir, "cluster_layout");
- let persist_peer_list = Persister::new(&config.metadata_dir, "peer_list");
-
- let cluster_layout = match persist_cluster_layout.load() {
- Ok(x) => {
- if x.replication_factor != replication_factor {
- return Err(Error::Message(format!(
- "Prevous cluster layout has replication factor {}, which is different than the one specified in the config file ({}). The previous cluster layout can be purged, if you know what you are doing, simply by deleting the `cluster_layout` file in your metadata directory.",
- x.replication_factor,
- replication_factor
- )));
- }
- x
- }
- Err(e) => {
- info!(
- "No valid previous cluster layout stored ({}), starting fresh.",
- e
- );
- ClusterLayout::new(replication_factor)
- }
- };
-
- let metrics = SystemMetrics::new(replication_factor);
-
- let mut local_status = NodeStatus::initial(replication_factor, &cluster_layout);
- local_status.update_disk_usage(&config.metadata_dir, &config.data_dir, &metrics);
+ let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, node_key);
+ let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into());
- let ring = Ring::new(cluster_layout, replication_factor);
- let (update_ring, ring) = watch::channel(Arc::new(ring));
-
- let rpc_public_addr = match &config.rpc_public_addr {
- Some(a_str) => {
- use std::net::ToSocketAddrs;
- match a_str.to_socket_addrs() {
- Err(e) => {
- error!(
- "Cannot resolve rpc_public_addr {} from config file: {}.",
- a_str, e
- );
- None
- }
- Ok(a) => {
- let a = a.collect::<Vec<_>>();
- if a.is_empty() {
- error!("rpc_public_addr {} resolve to no known IP address", a_str);
- }
- if a.len() > 1 {
- warn!("Multiple possible resolutions for rpc_public_addr: {:?}. Taking the first one.", a);
- }
- a.into_iter().next()
- }
- }
- }
- None => {
- let addr =
- get_default_ip().map(|ip| SocketAddr::new(ip, config.rpc_bind_addr.port()));
- if let Some(a) = addr {
- warn!("Using autodetected rpc_public_addr: {}. Consider specifying it explicitly in configuration file if possible.", a);
- }
- addr
- }
- };
+ // ---- setup netapp public listener and full mesh peering strategy ----
+ let rpc_public_addr = get_rpc_public_addr(config);
if rpc_public_addr.is_none() {
warn!("This Garage node does not know its publicly reachable RPC address, this might hamper intra-cluster communication.");
}
- let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, node_key);
let fullmesh = FullMeshPeeringStrategy::new(netapp.clone(), vec![], rpc_public_addr);
if let Some(ping_timeout) = config.rpc_ping_timeout_msec {
fullmesh.set_ping_timeout_millis(ping_timeout);
}
- let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into());
+ let persist_peer_list = Persister::new(&config.metadata_dir, "peer_list");
+
+ // ---- setup cluster layout and layout manager ----
+ let replication_factor = replication_mode.replication_factor();
+
+ let layout_manager = LayoutManager::new(
+ config,
+ netapp.id,
+ system_endpoint.clone(),
+ fullmesh.clone(),
+ replication_mode,
+ )?;
+
+ // ---- set up metrics and status exchange ----
+ let metrics = SystemMetrics::new(replication_factor);
+
+ let mut local_status = NodeStatus::initial(replication_factor, &layout_manager);
+ local_status.update_disk_usage(&config.metadata_dir, &config.data_dir, &metrics);
+ // ---- if enabled, set up additionnal peer discovery methods ----
#[cfg(feature = "consul-discovery")]
let consul_discovery = match &config.consul_discovery {
Some(cfg) => Some(
@@ -351,20 +308,14 @@ impl System {
warn!("Kubernetes discovery is not enabled in this build.");
}
+ // ---- done ----
let sys = Arc::new(System {
id: netapp.id.into(),
- persist_cluster_layout,
persist_peer_list,
local_status: ArcSwap::new(Arc::new(local_status)),
node_status: RwLock::new(HashMap::new()),
netapp: netapp.clone(),
- fullmesh: fullmesh.clone(),
- rpc: RpcHelper::new(
- netapp.id.into(),
- fullmesh,
- ring.clone(),
- config.rpc_timeout_msec.map(Duration::from_millis),
- ),
+ fullmesh,
system_endpoint,
replication_mode,
replication_factor,
@@ -376,10 +327,9 @@ impl System {
consul_discovery,
#[cfg(feature = "kubernetes-discovery")]
kubernetes_discovery: config.kubernetes_discovery.clone(),
+ layout_manager,
metrics,
- ring,
- update_ring: Mutex::new(update_ring),
metadata_dir: config.metadata_dir.clone(),
data_dir: config.data_dir.clone(),
});
@@ -399,6 +349,20 @@ impl System {
);
}
+ // ---- Public utilities / accessors ----
+
+ pub fn cluster_layout(&self) -> RwLockReadGuard<'_, LayoutHelper> {
+ self.layout_manager.layout()
+ }
+
+ pub fn layout_notify(&self) -> Arc<Notify> {
+ self.layout_manager.change_notify.clone()
+ }
+
+ pub fn rpc_helper(&self) -> &RpcHelper {
+ &self.layout_manager.rpc_helper
+ }
+
// ---- Administrative operations (directly available and
// also available through RPC) ----
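The three accessors added above replace the removed pub ring: watch::Receiver<Arc<Ring>> and pub rpc: RpcHelper fields: callers now read the layout through cluster_layout(), reach the RPC helper through rpc_helper(), and subscribe to layout changes through the shared tokio Notify returned by layout_notify(). A hypothetical consumer might look like the sketch below (the loop and names are illustrative, not part of this commit); note that the std RwLockReadGuard must be released before awaiting.

    // Sketch of a background task reacting to layout changes via the new accessors.
    async fn watch_layout_changes(system: Arc<System>) {
        let notify = system.layout_notify();
        loop {
            // Arm the notification *before* reading, so an update that lands
            // between the read and the await is not missed.
            let notified = notify.notified();
            tokio::pin!(notified);
            notified.as_mut().enable();

            {
                // Read guard scoped so it is dropped before the await below.
                let layout = system.cluster_layout();
                println!("cluster layout has {} live version(s)", layout.versions.len());
            }

            notified.await;
        }
    }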
@@ -425,18 +389,6 @@ impl System {
known_nodes
}
- pub fn get_cluster_layout(&self) -> ClusterLayout {
- self.ring.borrow().layout.clone()
- }
-
- pub async fn update_cluster_layout(
- self: &Arc<Self>,
- layout: &ClusterLayout,
- ) -> Result<(), Error> {
- self.handle_advertise_cluster_layout(layout).await?;
- Ok(())
- }
-
pub async fn connect(&self, node: &str) -> Result<(), Error> {
let (pubkey, addrs) = parse_and_resolve_peer_addr_async(node)
.await
@@ -466,47 +418,63 @@ impl System {
}
pub fn health(&self) -> ClusterHealth {
- let ring: Arc<_> = self.ring.borrow().clone();
let quorum = self.replication_mode.write_quorum();
- let replication_factor = self.replication_factor;
+ // Gather information about running nodes.
+ // Technically, `nodes` contains currently running nodes, as well
+ // as nodes that this Garage process has been connected to at least
+ // once since it started.
let nodes = self
.get_known_nodes()
.into_iter()
.map(|n| (n.id, n))
.collect::<HashMap<Uuid, _>>();
let connected_nodes = nodes.iter().filter(|(_, n)| n.is_up).count();
+ let node_up = |x: &Uuid| nodes.get(x).map(|n| n.is_up).unwrap_or(false);
+
+ // Acquire a rwlock read-lock to the current cluster layout
+ let layout = self.cluster_layout();
+
+ // Obtain information about nodes that have a role as storage nodes
+ // in one of the active layout versions
+ let mut storage_nodes = HashSet::<Uuid>::with_capacity(16);
+ for ver in layout.versions.iter() {
+ storage_nodes.extend(
+ ver.roles
+ .items()
+ .iter()
+ .filter(|(_, _, v)| matches!(v, NodeRoleV(Some(r)) if r.capacity.is_some()))
+ .map(|(n, _, _)| *n),
+ )
+ }
+ let storage_nodes_ok = storage_nodes.iter().filter(|x| node_up(x)).count();
+
+ // Determine the number of partitions that have:
+ // - a quorum of up nodes for all write sets (i.e. are available)
+ // - for which all nodes in all write sets are up (i.e. are fully healthy)
+ let partitions = layout.current().partitions().collect::<Vec<_>>();
+ let mut partitions_quorum = 0;
+ let mut partitions_all_ok = 0;
+ for (_, hash) in partitions.iter() {
+ let mut write_sets = layout
+ .versions
+ .iter()
+ .map(|x| x.nodes_of(hash, x.replication_factor));
+ let has_quorum = write_sets
+ .clone()
+ .all(|set| set.filter(|x| node_up(x)).count() >= quorum);
+ let all_ok = write_sets.all(|mut set| set.all(|x| node_up(&x)));
+ if has_quorum {
+ partitions_quorum += 1;
+ }
+ if all_ok {
+ partitions_all_ok += 1;
+ }
+ }
- let storage_nodes = ring
- .layout
- .roles
- .items()
- .iter()
- .filter(|(_, _, v)| matches!(v, NodeRoleV(Some(r)) if r.capacity.is_some()))
- .collect::<Vec<_>>();
- let storage_nodes_ok = storage_nodes
- .iter()
- .filter(|(x, _, _)| nodes.get(x).map(|n| n.is_up).unwrap_or(false))
- .count();
-
- let partitions = ring.partitions();
- let partitions_n_up = partitions
- .iter()
- .map(|(_, h)| {
- let pn = ring.get_nodes(h, ring.replication_factor);
- pn.iter()
- .filter(|x| nodes.get(x).map(|n| n.is_up).unwrap_or(false))
- .count()
- })
- .collect::<Vec<usize>>();
- let partitions_all_ok = partitions_n_up
- .iter()
- .filter(|c| **c == replication_factor)
- .count();
- let partitions_quorum = partitions_n_up.iter().filter(|c| **c >= quorum).count();
-
+ // Determine overall cluster status
let status =
- if partitions_quorum == partitions.len() && storage_nodes_ok == storage_nodes.len() {
+ if partitions_all_ok == partitions.len() && storage_nodes_ok == storage_nodes.len() {
ClusterHealthStatus::Healthy
} else if partitions_quorum == partitions.len() {
ClusterHealthStatus::Degraded
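With several layout versions possibly live at once, the reworked computation above counts a partition as reaching quorum only if every version's write set has at least a quorum of up nodes, and as fully OK only if every node of every write set is up (previously a single ring was checked). The standalone sketch below restates that rule over plain data; partition_status, write_sets and up are illustrative names, and quorum = 2 is just an example value for a replication factor of 3.

    // For one partition: `write_sets` holds the write set of each live layout
    // version, `up` tells whether a node is currently reachable.
    fn partition_status(write_sets: &[Vec<Uuid>], up: impl Fn(&Uuid) -> bool) -> (bool, bool) {
        let quorum = 2;
        let has_quorum = write_sets
            .iter()
            .all(|set| set.iter().filter(|n| up(*n)).count() >= quorum);
        let all_ok = write_sets.iter().all(|set| set.iter().all(|n| up(n)));
        // (reaches write quorum, fully healthy)
        (has_quorum, all_ok)
    }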
@@ -546,7 +514,7 @@ impl System {
if let Err(e) = c
.publish_consul_service(
self.netapp.id,
- &self.local_status.load_full().hostname,
+ &self.local_status.load_full().hostname.as_deref().unwrap(),
rpc_public_addr,
)
.await
@@ -573,7 +541,7 @@ impl System {
if let Err(e) = publish_kubernetes_node(
k,
self.netapp.id,
- &self.local_status.load_full().hostname,
+ &self.local_status.load_full().hostname.as_deref().unwrap(),
rpc_public_addr,
)
.await
@@ -582,22 +550,10 @@ impl System {
}
}
- /// Save network configuration to disc
- async fn save_cluster_layout(&self) -> Result<(), Error> {
- let ring: Arc<Ring> = self.ring.borrow().clone();
- self.persist_cluster_layout
- .save_async(&ring.layout)
- .await
- .expect("Cannot save current cluster layout");
- Ok(())
- }
-
fn update_local_status(&self) {
let mut new_si: NodeStatus = self.local_status.load().as_ref().clone();
- let ring = self.ring.borrow();
- new_si.cluster_layout_version = ring.layout.version;
- new_si.cluster_layout_staging_hash = ring.layout.staging_hash;
+ new_si.layout_digest = self.layout_manager.layout().digest();
new_si.update_disk_usage(&self.metadata_dir, &self.data_dir, &self.metrics);
@@ -611,11 +567,6 @@ impl System {
Ok(SystemRpc::Ok)
}
- fn handle_pull_cluster_layout(&self) -> SystemRpc {
- let ring = self.ring.borrow().clone();
- SystemRpc::AdvertiseClusterLayout(ring.layout.clone())
- }
-
fn handle_get_known_nodes(&self) -> SystemRpc {
let known_nodes = self.get_known_nodes();
SystemRpc::ReturnKnownNodes(known_nodes)
@@ -635,11 +586,8 @@ impl System {
std::process::exit(1);
}
- if info.cluster_layout_version > local_info.cluster_layout_version
- || info.cluster_layout_staging_hash != local_info.cluster_layout_staging_hash
- {
- tokio::spawn(self.clone().pull_cluster_layout(from));
- }
+ self.layout_manager
+ .handle_advertise_status(from, &info.layout_digest);
self.node_status
.write()
@@ -649,57 +597,6 @@ impl System {
Ok(SystemRpc::Ok)
}
- async fn handle_advertise_cluster_layout(
- self: &Arc<Self>,
- adv: &ClusterLayout,
- ) -> Result<SystemRpc, Error> {
- if adv.replication_factor != self.replication_factor {
- let msg = format!(
- "Received a cluster layout from another node with replication factor {}, which is different from what we have in our configuration ({}). Discarding the cluster layout we received.",
- adv.replication_factor,
- self.replication_factor
- );
- error!("{}", msg);
- return Err(Error::Message(msg));
- }
-
- let update_ring = self.update_ring.lock().await;
- let mut layout: ClusterLayout = self.ring.borrow().layout.clone();
-
- let prev_layout_check = layout.check().is_ok();
- if layout.merge(adv) {
- if prev_layout_check && layout.check().is_err() {
- error!("New cluster layout is invalid, discarding.");
- return Err(Error::Message(
- "New cluster layout is invalid, discarding.".into(),
- ));
- }
-
- let ring = Ring::new(layout.clone(), self.replication_factor);
- update_ring.send(Arc::new(ring))?;
- drop(update_ring);
-
- let self2 = self.clone();
- tokio::spawn(async move {
- if let Err(e) = self2
- .rpc
- .broadcast(
- &self2.system_endpoint,
- SystemRpc::AdvertiseClusterLayout(layout),
- RequestStrategy::with_priority(PRIO_HIGH),
- )
- .await
- {
- warn!("Error while broadcasting new cluster layout: {}", e);
- }
- });
-
- self.save_cluster_layout().await?;
- }
-
- Ok(SystemRpc::Ok)
- }
-
async fn status_exchange_loop(&self, mut stop_signal: watch::Receiver<bool>) {
while !*stop_signal.borrow() {
let restart_at = Instant::now() + STATUS_EXCHANGE_INTERVAL;
@@ -707,7 +604,7 @@ impl System {
self.update_local_status();
let local_status: NodeStatus = self.local_status.load().as_ref().clone();
let _ = self
- .rpc
+ .rpc_helper()
.broadcast(
&self.system_endpoint,
SystemRpc::AdvertiseStatus(local_status),
@@ -725,9 +622,9 @@ impl System {
async fn discovery_loop(self: &Arc<Self>, mut stop_signal: watch::Receiver<bool>) {
while !*stop_signal.borrow() {
- let not_configured = self.ring.borrow().layout.check().is_err();
+ let not_configured = self.cluster_layout().check().is_err();
let no_peers = self.fullmesh.get_peer_list().len() < self.replication_factor;
- let expected_n_nodes = self.ring.borrow().layout.num_nodes();
+ let expected_n_nodes = self.cluster_layout().all_nodes().len();
let bad_peers = self
.fullmesh
.get_peer_list()
@@ -832,48 +729,49 @@ impl System {
.save_async(&PeerList(peer_list))
.await
}
-
- async fn pull_cluster_layout(self: Arc<Self>, peer: Uuid) {
- let resp = self
- .rpc
- .call(
- &self.system_endpoint,
- peer,
- SystemRpc::PullClusterLayout,
- RequestStrategy::with_priority(PRIO_HIGH),
- )
- .await;
- if let Ok(SystemRpc::AdvertiseClusterLayout(layout)) = resp {
- let _: Result<_, _> = self.handle_advertise_cluster_layout(&layout).await;
- }
- }
}
#[async_trait]
impl EndpointHandler<SystemRpc> for System {
async fn handle(self: &Arc<Self>, msg: &SystemRpc, from: NodeID) -> Result<SystemRpc, Error> {
match msg {
+ // ---- system functions -> System ----
SystemRpc::Connect(node) => self.handle_connect(node).await,
- SystemRpc::PullClusterLayout => Ok(self.handle_pull_cluster_layout()),
SystemRpc::AdvertiseStatus(adv) => self.handle_advertise_status(from.into(), adv).await,
+ SystemRpc::GetKnownNodes => Ok(self.handle_get_known_nodes()),
+
+ // ---- layout functions -> LayoutManager ----
+ SystemRpc::PullClusterLayout => Ok(self.layout_manager.handle_pull_cluster_layout()),
SystemRpc::AdvertiseClusterLayout(adv) => {
- self.clone().handle_advertise_cluster_layout(adv).await
+ self.layout_manager
+ .handle_advertise_cluster_layout(adv)
+ .await
}
- SystemRpc::GetKnownNodes => Ok(self.handle_get_known_nodes()),
+ SystemRpc::PullClusterLayoutTrackers => {
+ Ok(self.layout_manager.handle_pull_cluster_layout_trackers())
+ }
+ SystemRpc::AdvertiseClusterLayoutTrackers(adv) => {
+ self.layout_manager
+ .handle_advertise_cluster_layout_trackers(adv)
+ .await
+ }
+
+ // ---- other -> Error ----
m => Err(Error::unexpected_rpc_message(m)),
}
}
}
impl NodeStatus {
- fn initial(replication_factor: usize, layout: &ClusterLayout) -> Self {
+ fn initial(replication_factor: usize, layout_manager: &LayoutManager) -> Self {
NodeStatus {
- hostname: gethostname::gethostname()
- .into_string()
- .unwrap_or_else(|_| "<invalid utf-8>".to_string()),
+ hostname: Some(
+ gethostname::gethostname()
+ .into_string()
+ .unwrap_or_else(|_| "<invalid utf-8>".to_string()),
+ ),
replication_factor,
- cluster_layout_version: layout.version,
- cluster_layout_staging_hash: layout.staging_hash,
+ layout_digest: layout_manager.layout().digest(),
meta_disk_avail: None,
data_disk_avail: None,
}
@@ -881,10 +779,9 @@ impl NodeStatus {
fn unknown() -> Self {
NodeStatus {
- hostname: "?".to_string(),
+ hostname: None,
replication_factor: 0,
- cluster_layout_version: 0,
- cluster_layout_staging_hash: Hash::from([0u8; 32]),
+ layout_digest: Default::default(),
meta_disk_avail: None,
data_disk_avail: None,
}
@@ -963,6 +860,40 @@ fn get_default_ip() -> Option<IpAddr> {
.map(|a| a.ip())
}
+fn get_rpc_public_addr(config: &Config) -> Option<SocketAddr> {
+ match &config.rpc_public_addr {
+ Some(a_str) => {
+ use std::net::ToSocketAddrs;
+ match a_str.to_socket_addrs() {
+ Err(e) => {
+ error!(
+ "Cannot resolve rpc_public_addr {} from config file: {}.",
+ a_str, e
+ );
+ None
+ }
+ Ok(a) => {
+ let a = a.collect::<Vec<_>>();
+ if a.is_empty() {
+ error!("rpc_public_addr {} resolve to no known IP address", a_str);
+ }
+ if a.len() > 1 {
+ warn!("Multiple possible resolutions for rpc_public_addr: {:?}. Taking the first one.", a);
+ }
+ a.into_iter().next()
+ }
+ }
+ }
+ None => {
+ let addr = get_default_ip().map(|ip| SocketAddr::new(ip, config.rpc_bind_addr.port()));
+ if let Some(a) = addr {
+ warn!("Using autodetected rpc_public_addr: {}. Consider specifying it explicitly in configuration file if possible.", a);
+ }
+ addr
+ }
+ }
+}
+
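get_rpc_public_addr only lifts logic that previously lived inline in System::new, but one behaviour is easy to miss: when rpc_public_addr resolves to several addresses, only the first result is kept. A minimal illustration of that resolution step using only the standard library (the helper name is made up):

    use std::net::{SocketAddr, ToSocketAddrs};

    fn first_resolution(addr: &str) -> Option<SocketAddr> {
        // "localhost:3901" may yield both an IPv6 and an IPv4 address;
        // like get_rpc_public_addr above, keep only the first one.
        addr.to_socket_addrs().ok()?.next()
    }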
async fn resolve_peers(peers: &[String]) -> Vec<(NodeID, SocketAddr)> {
let mut ret = vec![];