aboutsummaryrefslogtreecommitdiff
path: root/src/rpc/system.rs
diff options
context:
space:
mode:
authorAlex <alex@adnab.me>2024-04-10 15:23:12 +0000
committerAlex <alex@adnab.me>2024-04-10 15:23:12 +0000
commit1779fd40c0fe676bedda0d40f647d7fe8b0f1e7e (patch)
tree47e42c4e6ae47590fbb5c8f94e90a23bf04c1674 /src/rpc/system.rs
parentb47706809cc9d28d1328bafdf9756e96388cca24 (diff)
parentff093ddbb8485409f389abe7b5e569cb38d222d2 (diff)
downloadgarage-1779fd40c0fe676bedda0d40f647d7fe8b0f1e7e.tar.gz
garage-1779fd40c0fe676bedda0d40f647d7fe8b0f1e7e.zip
Merge pull request 'Garage v1.0' (#683) from next-0.10 into main
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/683
Diffstat (limited to 'src/rpc/system.rs')
-rw-r--r--src/rpc/system.rs468
1 files changed, 201 insertions, 267 deletions
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 9e475717..0e78060b 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -1,9 +1,9 @@
//! Module containing structs related to membership management
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
use std::io::{Read, Write};
use std::net::{IpAddr, SocketAddr};
use std::path::{Path, PathBuf};
-use std::sync::{Arc, RwLock};
+use std::sync::{Arc, RwLock, RwLockReadGuard};
use std::time::{Duration, Instant};
use arc_swap::ArcSwapOption;
@@ -12,12 +12,11 @@ use futures::join;
use serde::{Deserialize, Serialize};
use sodiumoxide::crypto::sign::ed25519;
use tokio::select;
-use tokio::sync::watch;
-use tokio::sync::Mutex;
+use tokio::sync::{watch, Notify};
use garage_net::endpoint::{Endpoint, EndpointHandler};
use garage_net::message::*;
-use garage_net::peering::PeeringManager;
+use garage_net::peering::{PeerConnState, PeeringManager};
use garage_net::util::parse_and_resolve_peer_addr_async;
use garage_net::{NetApp, NetworkKey, NodeID, NodeKey};
@@ -33,9 +32,10 @@ use garage_util::time::*;
use crate::consul::ConsulDiscovery;
#[cfg(feature = "kubernetes-discovery")]
use crate::kubernetes::*;
-use crate::layout::*;
+use crate::layout::{
+ self, manager::LayoutManager, LayoutHelper, LayoutHistory, NodeRoleV, RpcLayoutDigest,
+};
use crate::replication_mode::*;
-use crate::ring::*;
use crate::rpc_helper::*;
use crate::system_metrics::*;
@@ -46,10 +46,10 @@ const STATUS_EXCHANGE_INTERVAL: Duration = Duration::from_secs(10);
/// Version tag used for version check upon Netapp connection.
/// Cluster nodes with different version tags are deemed
/// incompatible and will refuse to connect.
-pub const GARAGE_VERSION_TAG: u64 = 0x6761726167650008; // garage 0x0008
+pub const GARAGE_VERSION_TAG: u64 = 0x6761726167650010; // garage 0x0010 (1.0)
/// RPC endpoint used for calls related to membership
-pub const SYSTEM_RPC_PATH: &str = "garage_rpc/membership.rs/SystemRpc";
+pub const SYSTEM_RPC_PATH: &str = "garage_rpc/system.rs/SystemRpc";
/// RPC messages related to membership
#[derive(Debug, Serialize, Deserialize, Clone)]
@@ -58,17 +58,22 @@ pub enum SystemRpc {
Ok,
/// Request to connect to a specific node (in <pubkey>@<host>:<port> format, pubkey = full-length node ID)
Connect(String),
- /// Ask other node its cluster layout. Answered with AdvertiseClusterLayout
- PullClusterLayout,
/// Advertise Garage status. Answered with another AdvertiseStatus.
/// Exchanged with every node on a regular basis.
AdvertiseStatus(NodeStatus),
- /// Advertisement of cluster layout. Sent spontanously or in response to PullClusterLayout
- AdvertiseClusterLayout(ClusterLayout),
/// Get known nodes states
GetKnownNodes,
/// Return known nodes
ReturnKnownNodes(Vec<KnownNodeInfo>),
+
+ /// Ask other node its cluster layout. Answered with AdvertiseClusterLayout
+ PullClusterLayout,
+ /// Advertisement of cluster layout. Sent spontanously or in response to PullClusterLayout
+ AdvertiseClusterLayout(LayoutHistory),
+ /// Ask other node its cluster layout update trackers.
+ PullClusterLayoutTrackers,
+ /// Advertisement of cluster layout update trackers.
+ AdvertiseClusterLayoutTrackers(layout::UpdateTrackers),
}
impl Rpc for SystemRpc {
@@ -84,7 +89,6 @@ pub struct System {
/// The id of this node
pub id: Uuid,
- persist_cluster_layout: Persister<ClusterLayout>,
persist_peer_list: Persister<PeerList>,
pub(crate) local_status: RwLock<NodeStatus>,
@@ -92,9 +96,8 @@ pub struct System {
pub netapp: Arc<NetApp>,
peering: Arc<PeeringManager>,
- pub rpc: RpcHelper,
- system_endpoint: Arc<Endpoint<SystemRpc, System>>,
+ pub(crate) system_endpoint: Arc<Endpoint<SystemRpc, System>>,
rpc_listen_addr: SocketAddr,
rpc_public_addr: Option<SocketAddr>,
@@ -105,14 +108,11 @@ pub struct System {
#[cfg(feature = "kubernetes-discovery")]
kubernetes_discovery: Option<KubernetesDiscoveryConfig>,
- metrics: ArcSwapOption<SystemMetrics>,
+ pub layout_manager: Arc<LayoutManager>,
- replication_mode: ReplicationMode,
- pub(crate) replication_factor: usize,
+ metrics: ArcSwapOption<SystemMetrics>,
- /// The ring
- pub ring: watch::Receiver<Arc<Ring>>,
- update_ring: Mutex<watch::Sender<Arc<Ring>>>,
+ pub(crate) replication_factor: ReplicationFactor,
/// Path to metadata directory
pub metadata_dir: PathBuf,
@@ -123,14 +123,13 @@ pub struct System {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeStatus {
/// Hostname of the node
- pub hostname: String,
+ pub hostname: Option<String>,
/// Replication factor configured on the node
pub replication_factor: usize,
- /// Cluster layout version
- pub cluster_layout_version: u64,
- /// Hash of cluster layout staging data
- pub cluster_layout_staging_hash: Hash,
+
+ /// Cluster layout digest
+ pub layout_digest: RpcLayoutDigest,
/// Disk usage on partition containing metadata directory (tuple: `(avail, total)`)
#[serde(default)]
@@ -143,7 +142,7 @@ pub struct NodeStatus {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KnownNodeInfo {
pub id: Uuid,
- pub addr: SocketAddr,
+ pub addr: Option<SocketAddr>,
pub is_up: bool,
pub last_seen_secs_ago: Option<u64>,
pub status: NodeStatus,
@@ -243,11 +242,11 @@ impl System {
/// Create this node's membership manager
pub fn new(
network_key: NetworkKey,
- replication_mode: ReplicationMode,
+ replication_factor: ReplicationFactor,
+ consistency_mode: ConsistencyMode,
config: &Config,
) -> Result<Arc<Self>, Error> {
- let replication_factor = replication_mode.replication_factor();
-
+ // ---- setup netapp RPC protocol ----
let node_key =
gen_node_key(&config.metadata_dir).expect("Unable to read or generate node ID");
info!(
@@ -255,83 +254,39 @@ impl System {
hex::encode(&node_key.public_key()[..8])
);
- let persist_cluster_layout: Persister<ClusterLayout> =
- Persister::new(&config.metadata_dir, "cluster_layout");
- let persist_peer_list = Persister::new(&config.metadata_dir, "peer_list");
-
- let cluster_layout = match persist_cluster_layout.load() {
- Ok(x) => {
- if x.replication_factor != replication_factor {
- return Err(Error::Message(format!(
- "Prevous cluster layout has replication factor {}, which is different than the one specified in the config file ({}). The previous cluster layout can be purged, if you know what you are doing, simply by deleting the `cluster_layout` file in your metadata directory.",
- x.replication_factor,
- replication_factor
- )));
- }
- x
- }
- Err(e) => {
- info!(
- "No valid previous cluster layout stored ({}), starting fresh.",
- e
- );
- ClusterLayout::new(replication_factor)
- }
- };
-
- let mut local_status = NodeStatus::initial(replication_factor, &cluster_layout);
- local_status.update_disk_usage(&config.metadata_dir, &config.data_dir);
+ let bind_outgoing_to = Some(config)
+ .filter(|x| x.rpc_bind_outgoing)
+ .map(|x| x.rpc_bind_addr.ip());
+ let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, node_key, bind_outgoing_to);
+ let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into());
- let ring = Ring::new(cluster_layout, replication_factor);
- let (update_ring, ring) = watch::channel(Arc::new(ring));
-
- let rpc_public_addr = match &config.rpc_public_addr {
- Some(a_str) => {
- use std::net::ToSocketAddrs;
- match a_str.to_socket_addrs() {
- Err(e) => {
- error!(
- "Cannot resolve rpc_public_addr {} from config file: {}.",
- a_str, e
- );
- None
- }
- Ok(a) => {
- let a = a.collect::<Vec<_>>();
- if a.is_empty() {
- error!("rpc_public_addr {} resolve to no known IP address", a_str);
- }
- if a.len() > 1 {
- warn!("Multiple possible resolutions for rpc_public_addr: {:?}. Taking the first one.", a);
- }
- a.into_iter().next()
- }
- }
- }
- None => {
- let addr =
- get_default_ip().map(|ip| SocketAddr::new(ip, config.rpc_bind_addr.port()));
- if let Some(a) = addr {
- warn!("Using autodetected rpc_public_addr: {}. Consider specifying it explicitly in configuration file if possible.", a);
- }
- addr
- }
- };
+ // ---- setup netapp public listener and full mesh peering strategy ----
+ let rpc_public_addr = get_rpc_public_addr(config);
if rpc_public_addr.is_none() {
warn!("This Garage node does not know its publicly reachable RPC address, this might hamper intra-cluster communication.");
}
- let bind_outgoing_to = Some(config)
- .filter(|x| x.rpc_bind_outgoing)
- .map(|x| x.rpc_bind_addr.ip());
- let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, node_key, bind_outgoing_to);
let peering = PeeringManager::new(netapp.clone(), vec![], rpc_public_addr);
if let Some(ping_timeout) = config.rpc_ping_timeout_msec {
peering.set_ping_timeout_millis(ping_timeout);
}
- let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into());
+ let persist_peer_list = Persister::new(&config.metadata_dir, "peer_list");
+
+ // ---- setup cluster layout and layout manager ----
+ let layout_manager = LayoutManager::new(
+ config,
+ netapp.id,
+ system_endpoint.clone(),
+ peering.clone(),
+ replication_factor,
+ consistency_mode,
+ )?;
+
+ let mut local_status = NodeStatus::initial(replication_factor, &layout_manager);
+ local_status.update_disk_usage(&config.metadata_dir, &config.data_dir);
+ // ---- if enabled, set up additionnal peer discovery methods ----
#[cfg(feature = "consul-discovery")]
let consul_discovery = match &config.consul_discovery {
Some(cfg) => Some(
@@ -350,22 +305,15 @@ impl System {
warn!("Kubernetes discovery is not enabled in this build.");
}
+ // ---- almost done ----
let sys = Arc::new(System {
id: netapp.id.into(),
- persist_cluster_layout,
persist_peer_list,
local_status: RwLock::new(local_status),
node_status: RwLock::new(HashMap::new()),
netapp: netapp.clone(),
peering: peering.clone(),
- rpc: RpcHelper::new(
- netapp.id.into(),
- peering,
- ring.clone(),
- config.rpc_timeout_msec.map(Duration::from_millis),
- ),
system_endpoint,
- replication_mode,
replication_factor,
rpc_listen_addr: config.rpc_bind_addr,
rpc_public_addr,
@@ -374,10 +322,9 @@ impl System {
consul_discovery,
#[cfg(feature = "kubernetes-discovery")]
kubernetes_discovery: config.kubernetes_discovery.clone(),
+ layout_manager,
metrics: ArcSwapOption::new(None),
- ring,
- update_ring: Mutex::new(update_ring),
metadata_dir: config.metadata_dir.clone(),
data_dir: config.data_dir.clone(),
});
@@ -409,6 +356,20 @@ impl System {
self.metrics.store(None);
}
+ // ---- Public utilities / accessors ----
+
+ pub fn cluster_layout(&self) -> RwLockReadGuard<'_, LayoutHelper> {
+ self.layout_manager.layout()
+ }
+
+ pub fn layout_notify(&self) -> Arc<Notify> {
+ self.layout_manager.change_notify.clone()
+ }
+
+ pub fn rpc_helper(&self) -> &RpcHelper {
+ &self.layout_manager.rpc_helper
+ }
+
// ---- Administrative operations (directly available and
// also available through RPC) ----
@@ -420,7 +381,11 @@ impl System {
.iter()
.map(|n| KnownNodeInfo {
id: n.id.into(),
- addr: n.addr,
+ addr: match n.state {
+ PeerConnState::Ourself => self.rpc_public_addr,
+ PeerConnState::Connected { addr } => Some(addr),
+ _ => None,
+ },
is_up: n.is_up(),
last_seen_secs_ago: n
.last_seen
@@ -435,18 +400,6 @@ impl System {
known_nodes
}
- pub fn get_cluster_layout(&self) -> ClusterLayout {
- self.ring.borrow().layout.clone()
- }
-
- pub async fn update_cluster_layout(
- self: &Arc<Self>,
- layout: &ClusterLayout,
- ) -> Result<(), Error> {
- self.handle_advertise_cluster_layout(layout).await?;
- Ok(())
- }
-
pub async fn connect(&self, node: &str) -> Result<(), Error> {
let (pubkey, addrs) = parse_and_resolve_peer_addr_async(node)
.await
@@ -476,47 +429,65 @@ impl System {
}
pub fn health(&self) -> ClusterHealth {
- let ring: Arc<_> = self.ring.borrow().clone();
- let quorum = self.replication_mode.write_quorum();
- let replication_factor = self.replication_factor;
-
+ let quorum = self
+ .replication_factor
+ .write_quorum(ConsistencyMode::Consistent);
+
+ // Gather information about running nodes.
+ // Technically, `nodes` contains currently running nodes, as well
+ // as nodes that this Garage process has been connected to at least
+ // once since it started.
let nodes = self
.get_known_nodes()
.into_iter()
.map(|n| (n.id, n))
.collect::<HashMap<Uuid, _>>();
let connected_nodes = nodes.iter().filter(|(_, n)| n.is_up).count();
+ let node_up = |x: &Uuid| nodes.get(x).map(|n| n.is_up).unwrap_or(false);
+
+ // Acquire a rwlock read-lock to the current cluster layout
+ let layout = self.cluster_layout();
+
+ // Obtain information about nodes that have a role as storage nodes
+ // in one of the active layout versions
+ let mut storage_nodes = HashSet::<Uuid>::with_capacity(16);
+ for ver in layout.versions().iter() {
+ storage_nodes.extend(
+ ver.roles
+ .items()
+ .iter()
+ .filter(|(_, _, v)| matches!(v, NodeRoleV(Some(r)) if r.capacity.is_some()))
+ .map(|(n, _, _)| *n),
+ )
+ }
+ let storage_nodes_ok = storage_nodes.iter().filter(|x| node_up(x)).count();
+
+ // Determine the number of partitions that have:
+ // - a quorum of up nodes for all write sets (i.e. are available)
+ // - for which all nodes in all write sets are up (i.e. are fully healthy)
+ let partitions = layout.current().partitions().collect::<Vec<_>>();
+ let mut partitions_quorum = 0;
+ let mut partitions_all_ok = 0;
+ for (_, hash) in partitions.iter() {
+ let mut write_sets = layout
+ .versions()
+ .iter()
+ .map(|x| x.nodes_of(hash, x.replication_factor));
+ let has_quorum = write_sets
+ .clone()
+ .all(|set| set.filter(|x| node_up(x)).count() >= quorum);
+ let all_ok = write_sets.all(|mut set| set.all(|x| node_up(&x)));
+ if has_quorum {
+ partitions_quorum += 1;
+ }
+ if all_ok {
+ partitions_all_ok += 1;
+ }
+ }
- let storage_nodes = ring
- .layout
- .roles
- .items()
- .iter()
- .filter(|(_, _, v)| matches!(v, NodeRoleV(Some(r)) if r.capacity.is_some()))
- .collect::<Vec<_>>();
- let storage_nodes_ok = storage_nodes
- .iter()
- .filter(|(x, _, _)| nodes.get(x).map(|n| n.is_up).unwrap_or(false))
- .count();
-
- let partitions = ring.partitions();
- let partitions_n_up = partitions
- .iter()
- .map(|(_, h)| {
- let pn = ring.get_nodes(h, ring.replication_factor);
- pn.iter()
- .filter(|x| nodes.get(x).map(|n| n.is_up).unwrap_or(false))
- .count()
- })
- .collect::<Vec<usize>>();
- let partitions_all_ok = partitions_n_up
- .iter()
- .filter(|c| **c == replication_factor)
- .count();
- let partitions_quorum = partitions_n_up.iter().filter(|c| **c >= quorum).count();
-
+ // Determine overall cluster status
let status =
- if partitions_quorum == partitions.len() && storage_nodes_ok == storage_nodes.len() {
+ if partitions_all_ok == partitions.len() && storage_nodes_ok == storage_nodes.len() {
ClusterHealthStatus::Healthy
} else if partitions_quorum == partitions.len() {
ClusterHealthStatus::Degraded
@@ -553,7 +524,7 @@ impl System {
}
};
- let hostname = self.local_status.read().unwrap().hostname.clone();
+ let hostname = self.local_status.read().unwrap().hostname.clone().unwrap();
if let Err(e) = c
.publish_consul_service(self.netapp.id, &hostname, rpc_public_addr)
.await
@@ -577,30 +548,16 @@ impl System {
}
};
- let hostname = self.local_status.read().unwrap().hostname.clone();
+ let hostname = self.local_status.read().unwrap().hostname.clone().unwrap();
if let Err(e) = publish_kubernetes_node(k, self.netapp.id, &hostname, rpc_public_addr).await
{
error!("Error while publishing node to Kubernetes: {}", e);
}
}
- /// Save network configuration to disc
- async fn save_cluster_layout(&self) -> Result<(), Error> {
- let ring: Arc<Ring> = self.ring.borrow().clone();
- self.persist_cluster_layout
- .save_async(&ring.layout)
- .await
- .expect("Cannot save current cluster layout");
- Ok(())
- }
-
fn update_local_status(&self) {
let mut local_status = self.local_status.write().unwrap();
-
- let ring = self.ring.borrow();
- local_status.cluster_layout_version = ring.layout.version;
- local_status.cluster_layout_staging_hash = ring.layout.staging_hash;
-
+ local_status.layout_digest = self.layout_manager.layout().digest();
local_status.update_disk_usage(&self.metadata_dir, &self.data_dir);
}
@@ -611,11 +568,6 @@ impl System {
Ok(SystemRpc::Ok)
}
- fn handle_pull_cluster_layout(&self) -> SystemRpc {
- let ring = self.ring.borrow().clone();
- SystemRpc::AdvertiseClusterLayout(ring.layout.clone())
- }
-
fn handle_get_known_nodes(&self) -> SystemRpc {
let known_nodes = self.get_known_nodes();
SystemRpc::ReturnKnownNodes(known_nodes)
@@ -635,11 +587,8 @@ impl System {
std::process::exit(1);
}
- if info.cluster_layout_version > local_info.cluster_layout_version
- || info.cluster_layout_staging_hash != local_info.cluster_layout_staging_hash
- {
- tokio::spawn(self.clone().pull_cluster_layout(from));
- }
+ self.layout_manager
+ .handle_advertise_status(from, &info.layout_digest);
drop(local_info);
@@ -651,57 +600,6 @@ impl System {
Ok(SystemRpc::Ok)
}
- async fn handle_advertise_cluster_layout(
- self: &Arc<Self>,
- adv: &ClusterLayout,
- ) -> Result<SystemRpc, Error> {
- if adv.replication_factor != self.replication_factor {
- let msg = format!(
- "Received a cluster layout from another node with replication factor {}, which is different from what we have in our configuration ({}). Discarding the cluster layout we received.",
- adv.replication_factor,
- self.replication_factor
- );
- error!("{}", msg);
- return Err(Error::Message(msg));
- }
-
- let update_ring = self.update_ring.lock().await;
- let mut layout: ClusterLayout = self.ring.borrow().layout.clone();
-
- let prev_layout_check = layout.check().is_ok();
- if layout.merge(adv) {
- if prev_layout_check && layout.check().is_err() {
- error!("New cluster layout is invalid, discarding.");
- return Err(Error::Message(
- "New cluster layout is invalid, discarding.".into(),
- ));
- }
-
- let ring = Ring::new(layout.clone(), self.replication_factor);
- update_ring.send(Arc::new(ring))?;
- drop(update_ring);
-
- let self2 = self.clone();
- tokio::spawn(async move {
- if let Err(e) = self2
- .rpc
- .broadcast(
- &self2.system_endpoint,
- SystemRpc::AdvertiseClusterLayout(layout),
- RequestStrategy::with_priority(PRIO_HIGH),
- )
- .await
- {
- warn!("Error while broadcasting new cluster layout: {}", e);
- }
- });
-
- self.save_cluster_layout().await?;
- }
-
- Ok(SystemRpc::Ok)
- }
-
async fn status_exchange_loop(&self, mut stop_signal: watch::Receiver<bool>) {
while !*stop_signal.borrow() {
let restart_at = Instant::now() + STATUS_EXCHANGE_INTERVAL;
@@ -711,7 +609,7 @@ impl System {
let local_status: NodeStatus = self.local_status.read().unwrap().clone();
let _ = self
- .rpc
+ .rpc_helper()
.broadcast(
&self.system_endpoint,
SystemRpc::AdvertiseStatus(local_status),
@@ -736,10 +634,9 @@ impl System {
.filter(|p| p.is_up())
.count();
- let not_configured = self.ring.borrow().layout.check().is_err();
- let no_peers = n_connected < self.replication_factor;
-
- let expected_n_nodes = self.ring.borrow().layout.num_nodes();
+ let not_configured = !self.cluster_layout().is_check_ok();
+ let no_peers = n_connected < self.replication_factor.into();
+ let expected_n_nodes = self.cluster_layout().all_nodes().len();
let bad_peers = n_connected != expected_n_nodes;
if not_configured || no_peers || bad_peers {
@@ -791,8 +688,8 @@ impl System {
// If the layout is configured, and we already have some connections
// to other nodes in the cluster, we can skip trying to connect to
// nodes that are not in the cluster layout.
- let ring = self.ring.borrow();
- ping_list.retain(|(id, _)| ring.layout.node_ids().contains(&(*id).into()));
+ let layout = self.cluster_layout();
+ ping_list.retain(|(id, _)| layout.all_nodes().contains(&(*id).into()));
}
for (node_id, node_addr) in ping_list {
@@ -829,7 +726,10 @@ impl System {
.peering
.get_peer_list()
.iter()
- .map(|n| (n.id.into(), n.addr))
+ .filter_map(|n| match n.state {
+ PeerConnState::Connected { addr } => Some((n.id.into(), addr)),
+ _ => None,
+ })
.collect::<Vec<_>>();
// Before doing it, we read the current peer list file (if it exists)
@@ -847,48 +747,49 @@ impl System {
.save_async(&PeerList(peer_list))
.await
}
-
- async fn pull_cluster_layout(self: Arc<Self>, peer: Uuid) {
- let resp = self
- .rpc
- .call(
- &self.system_endpoint,
- peer,
- SystemRpc::PullClusterLayout,
- RequestStrategy::with_priority(PRIO_HIGH),
- )
- .await;
- if let Ok(SystemRpc::AdvertiseClusterLayout(layout)) = resp {
- let _: Result<_, _> = self.handle_advertise_cluster_layout(&layout).await;
- }
- }
}
#[async_trait]
impl EndpointHandler<SystemRpc> for System {
async fn handle(self: &Arc<Self>, msg: &SystemRpc, from: NodeID) -> Result<SystemRpc, Error> {
match msg {
+ // ---- system functions -> System ----
SystemRpc::Connect(node) => self.handle_connect(node).await,
- SystemRpc::PullClusterLayout => Ok(self.handle_pull_cluster_layout()),
SystemRpc::AdvertiseStatus(adv) => self.handle_advertise_status(from.into(), adv).await,
+ SystemRpc::GetKnownNodes => Ok(self.handle_get_known_nodes()),
+
+ // ---- layout functions -> LayoutManager ----
+ SystemRpc::PullClusterLayout => Ok(self.layout_manager.handle_pull_cluster_layout()),
SystemRpc::AdvertiseClusterLayout(adv) => {
- self.clone().handle_advertise_cluster_layout(adv).await
+ self.layout_manager
+ .handle_advertise_cluster_layout(adv)
+ .await
}
- SystemRpc::GetKnownNodes => Ok(self.handle_get_known_nodes()),
+ SystemRpc::PullClusterLayoutTrackers => {
+ Ok(self.layout_manager.handle_pull_cluster_layout_trackers())
+ }
+ SystemRpc::AdvertiseClusterLayoutTrackers(adv) => {
+ self.layout_manager
+ .handle_advertise_cluster_layout_trackers(adv)
+ .await
+ }
+
+ // ---- other -> Error ----
m => Err(Error::unexpected_rpc_message(m)),
}
}
}
impl NodeStatus {
- fn initial(replication_factor: usize, layout: &ClusterLayout) -> Self {
+ fn initial(replication_factor: ReplicationFactor, layout_manager: &LayoutManager) -> Self {
NodeStatus {
- hostname: gethostname::gethostname()
- .into_string()
- .unwrap_or_else(|_| "<invalid utf-8>".to_string()),
- replication_factor,
- cluster_layout_version: layout.version,
- cluster_layout_staging_hash: layout.staging_hash,
+ hostname: Some(
+ gethostname::gethostname()
+ .into_string()
+ .unwrap_or_else(|_| "<invalid utf-8>".to_string()),
+ ),
+ replication_factor: replication_factor.into(),
+ layout_digest: layout_manager.layout().digest(),
meta_disk_avail: None,
data_disk_avail: None,
}
@@ -896,10 +797,9 @@ impl NodeStatus {
fn unknown() -> Self {
NodeStatus {
- hostname: "?".to_string(),
+ hostname: None,
replication_factor: 0,
- cluster_layout_version: 0,
- cluster_layout_staging_hash: Hash::from([0u8; 32]),
+ layout_digest: Default::default(),
meta_disk_avail: None,
data_disk_avail: None,
}
@@ -952,6 +852,40 @@ fn get_default_ip() -> Option<IpAddr> {
.map(|a| a.ip())
}
+fn get_rpc_public_addr(config: &Config) -> Option<SocketAddr> {
+ match &config.rpc_public_addr {
+ Some(a_str) => {
+ use std::net::ToSocketAddrs;
+ match a_str.to_socket_addrs() {
+ Err(e) => {
+ error!(
+ "Cannot resolve rpc_public_addr {} from config file: {}.",
+ a_str, e
+ );
+ None
+ }
+ Ok(a) => {
+ let a = a.collect::<Vec<_>>();
+ if a.is_empty() {
+ error!("rpc_public_addr {} resolve to no known IP address", a_str);
+ }
+ if a.len() > 1 {
+ warn!("Multiple possible resolutions for rpc_public_addr: {:?}. Taking the first one.", a);
+ }
+ a.into_iter().next()
+ }
+ }
+ }
+ None => {
+ let addr = get_default_ip().map(|ip| SocketAddr::new(ip, config.rpc_bind_addr.port()));
+ if let Some(a) = addr {
+ warn!("Using autodetected rpc_public_addr: {}. Consider specifying it explicitly in configuration file if possible.", a);
+ }
+ addr
+ }
+ }
+}
+
async fn resolve_peers(peers: &[String]) -> Vec<(NodeID, SocketAddr)> {
let mut ret = vec![];