about | summary | refs | log | tree | commit | diff
path: root/src/rpc/system.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc/system.rs')
-rw-r--r-- src/rpc/system.rs 295
1 files changed, 105 insertions, 190 deletions
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index a7433b68..a8e88425 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -14,7 +14,6 @@ use serde::{Deserialize, Serialize};
use sodiumoxide::crypto::sign::ed25519;
use tokio::select;
use tokio::sync::watch;
-use tokio::sync::Mutex;
use netapp::endpoint::{Endpoint, EndpointHandler};
use netapp::message::*;
@@ -34,6 +33,7 @@ use garage_util::time::*;
use crate::consul::ConsulDiscovery;
#[cfg(feature = "kubernetes-discovery")]
use crate::kubernetes::*;
+use crate::layout::manager::LayoutManager;
use crate::layout::*;
use crate::replication_mode::*;
use crate::rpc_helper::*;
@@ -49,7 +49,7 @@ const STATUS_EXCHANGE_INTERVAL: Duration = Duration::from_secs(10);
pub const GARAGE_VERSION_TAG: u64 = 0x676172616765000A; // garage 0x000A
/// RPC endpoint used for calls related to membership
-pub const SYSTEM_RPC_PATH: &str = "garage_rpc/membership.rs/SystemRpc";
+pub const SYSTEM_RPC_PATH: &str = "garage_rpc/system.rs/SystemRpc";
/// RPC messages related to membership
#[derive(Debug, Serialize, Deserialize, Clone)]
@@ -58,17 +58,17 @@ pub enum SystemRpc {
Ok,
/// Request to connect to a specific node (in <pubkey>@<host>:<port> format)
Connect(String),
- /// Ask other node its cluster layout. Answered with AdvertiseClusterLayout
- PullClusterLayout,
/// Advertise Garage status. Answered with another AdvertiseStatus.
/// Exchanged with every node on a regular basis.
AdvertiseStatus(NodeStatus),
- /// Advertisement of cluster layout. Sent spontanously or in response to PullClusterLayout
- AdvertiseClusterLayout(LayoutHistory),
/// Get known nodes states
GetKnownNodes,
/// Return known nodes
ReturnKnownNodes(Vec<KnownNodeInfo>),
+ /// Ask another node for its cluster layout. Answered with AdvertiseClusterLayout
+ PullClusterLayout,
+ /// Advertisement of cluster layout. Sent spontaneously or in response to PullClusterLayout
+ AdvertiseClusterLayout(LayoutHistory),
}
impl Rpc for SystemRpc {
@@ -84,7 +84,6 @@ pub struct System {
/// The id of this node
pub id: Uuid,
- persist_cluster_layout: Persister<LayoutHistory>,
persist_peer_list: Persister<PeerList>,
local_status: ArcSwap<NodeStatus>,
@@ -92,9 +91,8 @@ pub struct System {
pub netapp: Arc<NetApp>,
fullmesh: Arc<FullMeshPeeringStrategy>,
- pub rpc: RpcHelper,
- system_endpoint: Arc<Endpoint<SystemRpc, System>>,
+ pub(crate) system_endpoint: Arc<Endpoint<SystemRpc, System>>,
rpc_listen_addr: SocketAddr,
#[cfg(any(feature = "consul-discovery", feature = "kubernetes-discovery"))]
@@ -106,15 +104,13 @@ pub struct System {
#[cfg(feature = "kubernetes-discovery")]
kubernetes_discovery: Option<KubernetesDiscoveryConfig>,
+ pub layout_manager: LayoutManager,
+
metrics: SystemMetrics,
replication_mode: ReplicationMode,
replication_factor: usize,
- /// The layout
- pub layout_watch: watch::Receiver<Arc<LayoutHistory>>,
- update_layout: Mutex<watch::Sender<Arc<LayoutHistory>>>,
-
/// Path to metadata directory
pub metadata_dir: PathBuf,
/// Path to data directory
@@ -128,8 +124,11 @@ pub struct NodeStatus {
/// Replication factor configured on the node
pub replication_factor: usize,
+
/// Cluster layout version
pub cluster_layout_version: u64,
+ // (TODO) Hash of cluster layout update trackers
+ // pub cluster_layout_trackers_hash: Hash,
/// Hash of cluster layout staging data
pub cluster_layout_staging_hash: Hash,
@@ -247,8 +246,7 @@ impl System {
replication_mode: ReplicationMode,
config: &Config,
) -> Result<Arc<Self>, Error> {
- let replication_factor = replication_mode.replication_factor();
-
+ // ---- setup netapp RPC protocol ----
let node_key =
gen_node_key(&config.metadata_dir).expect("Unable to read or generate node ID");
info!(
@@ -256,81 +254,40 @@ impl System {
hex::encode(&node_key.public_key()[..8])
);
- let persist_cluster_layout: Persister<LayoutHistory> =
- Persister::new(&config.metadata_dir, "cluster_layout");
- let persist_peer_list = Persister::new(&config.metadata_dir, "peer_list");
-
- let cluster_layout = match persist_cluster_layout.load() {
- Ok(x) => {
- if x.current().replication_factor != replication_factor {
- return Err(Error::Message(format!(
- "Prevous cluster layout has replication factor {}, which is different than the one specified in the config file ({}). The previous cluster layout can be purged, if you know what you are doing, simply by deleting the `cluster_layout` file in your metadata directory.",
- x.current().replication_factor,
- replication_factor
- )));
- }
- x
- }
- Err(e) => {
- info!(
- "No valid previous cluster layout stored ({}), starting fresh.",
- e
- );
- LayoutHistory::new(replication_factor)
- }
- };
-
- let metrics = SystemMetrics::new(replication_factor);
-
- let mut local_status = NodeStatus::initial(replication_factor, &cluster_layout);
- local_status.update_disk_usage(&config.metadata_dir, &config.data_dir, &metrics);
+ let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, node_key);
+ let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into());
- let (update_layout, layout_watch) = watch::channel(Arc::new(cluster_layout));
-
- let rpc_public_addr = match &config.rpc_public_addr {
- Some(a_str) => {
- use std::net::ToSocketAddrs;
- match a_str.to_socket_addrs() {
- Err(e) => {
- error!(
- "Cannot resolve rpc_public_addr {} from config file: {}.",
- a_str, e
- );
- None
- }
- Ok(a) => {
- let a = a.collect::<Vec<_>>();
- if a.is_empty() {
- error!("rpc_public_addr {} resolve to no known IP address", a_str);
- }
- if a.len() > 1 {
- warn!("Multiple possible resolutions for rpc_public_addr: {:?}. Taking the first one.", a);
- }
- a.into_iter().next()
- }
- }
- }
- None => {
- let addr =
- get_default_ip().map(|ip| SocketAddr::new(ip, config.rpc_bind_addr.port()));
- if let Some(a) = addr {
- warn!("Using autodetected rpc_public_addr: {}. Consider specifying it explicitly in configuration file if possible.", a);
- }
- addr
- }
- };
+ // ---- setup netapp public listener and full mesh peering strategy ----
+ let rpc_public_addr = get_rpc_public_addr(config);
if rpc_public_addr.is_none() {
warn!("This Garage node does not know its publicly reachable RPC address, this might hamper intra-cluster communication.");
}
- let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, node_key);
let fullmesh = FullMeshPeeringStrategy::new(netapp.clone(), vec![], rpc_public_addr);
if let Some(ping_timeout) = config.rpc_ping_timeout_msec {
fullmesh.set_ping_timeout_millis(ping_timeout);
}
- let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into());
+ let persist_peer_list = Persister::new(&config.metadata_dir, "peer_list");
+ // ---- setup cluster layout and layout manager ----
+ let replication_factor = replication_mode.replication_factor();
+
+ let layout_manager = LayoutManager::new(
+ config,
+ netapp.id,
+ system_endpoint.clone(),
+ fullmesh.clone(),
+ replication_factor,
+ )?;
+
+ // ---- set up metrics and status exchange ----
+ let metrics = SystemMetrics::new(replication_factor);
+
+ let mut local_status = NodeStatus::initial(replication_factor, &layout_manager.history());
+ local_status.update_disk_usage(&config.metadata_dir, &config.data_dir, &metrics);
+
+ // ---- if enabled, set up additional peer discovery methods ----
#[cfg(feature = "consul-discovery")]
let consul_discovery = match &config.consul_discovery {
Some(cfg) => Some(
@@ -349,20 +306,14 @@ impl System {
warn!("Kubernetes discovery is not enabled in this build.");
}
+ // ---- done ----
let sys = Arc::new(System {
id: netapp.id.into(),
- persist_cluster_layout,
persist_peer_list,
local_status: ArcSwap::new(Arc::new(local_status)),
node_status: RwLock::new(HashMap::new()),
netapp: netapp.clone(),
fullmesh: fullmesh.clone(),
- rpc: RpcHelper::new(
- netapp.id.into(),
- fullmesh,
- layout_watch.clone(),
- config.rpc_timeout_msec.map(Duration::from_millis),
- ),
system_endpoint,
replication_mode,
replication_factor,
@@ -374,10 +325,9 @@ impl System {
consul_discovery,
#[cfg(feature = "kubernetes-discovery")]
kubernetes_discovery: config.kubernetes_discovery.clone(),
+ layout_manager,
metrics,
- layout_watch,
- update_layout: Mutex::new(update_layout),
metadata_dir: config.metadata_dir.clone(),
data_dir: config.data_dir.clone(),
});
@@ -397,6 +347,20 @@ impl System {
);
}
+ // ---- Public utilities / accessors ----
+
+ pub fn cluster_layout(&self) -> watch::Ref<Arc<LayoutHistory>> {
+ self.layout_manager.history()
+ }
+
+ pub fn layout_watch(&self) -> watch::Receiver<Arc<LayoutHistory>> {
+ self.layout_manager.layout_watch.clone()
+ }
+
+ pub fn rpc_helper(&self) -> &RpcHelper {
+ &self.layout_manager.rpc_helper
+ }
+
// ---- Administrative operations (directly available and
// also available through RPC) ----
@@ -423,18 +387,6 @@ impl System {
known_nodes
}
- pub fn cluster_layout(&self) -> watch::Ref<Arc<LayoutHistory>> {
- self.layout_watch.borrow()
- }
-
- pub async fn update_cluster_layout(
- self: &Arc<Self>,
- layout: &LayoutHistory,
- ) -> Result<(), Error> {
- self.handle_advertise_cluster_layout(layout).await?;
- Ok(())
- }
-
pub async fn connect(&self, node: &str) -> Result<(), Error> {
let (pubkey, addrs) = parse_and_resolve_peer_addr_async(node)
.await
@@ -464,7 +416,7 @@ impl System {
}
pub fn health(&self) -> ClusterHealth {
- let layout: Arc<_> = self.layout_watch.borrow().clone();
+ let layout: Arc<_> = self.cluster_layout().clone();
let quorum = self.replication_mode.write_quorum();
let replication_factor = self.replication_factor;
@@ -581,20 +533,10 @@ impl System {
}
}
- /// Save network configuration to disc
- async fn save_cluster_layout(&self) -> Result<(), Error> {
- let layout: Arc<LayoutHistory> = self.layout_watch.borrow().clone();
- self.persist_cluster_layout
- .save_async(&layout)
- .await
- .expect("Cannot save current cluster layout");
- Ok(())
- }
-
fn update_local_status(&self) {
let mut new_si: NodeStatus = self.local_status.load().as_ref().clone();
- let layout = self.layout_watch.borrow();
+ let layout = self.cluster_layout();
new_si.cluster_layout_version = layout.current().version;
new_si.cluster_layout_staging_hash = layout.staging_hash;
@@ -610,11 +552,6 @@ impl System {
Ok(SystemRpc::Ok)
}
- fn handle_pull_cluster_layout(&self) -> SystemRpc {
- let layout = self.layout_watch.borrow().as_ref().clone();
- SystemRpc::AdvertiseClusterLayout(layout)
- }
-
fn handle_get_known_nodes(&self) -> SystemRpc {
let known_nodes = self.get_known_nodes();
SystemRpc::ReturnKnownNodes(known_nodes)
@@ -637,7 +574,10 @@ impl System {
if info.cluster_layout_version > local_info.cluster_layout_version
|| info.cluster_layout_staging_hash != local_info.cluster_layout_staging_hash
{
- tokio::spawn(self.clone().pull_cluster_layout(from));
+ tokio::spawn({
+ let system = self.clone();
+ async move { system.layout_manager.pull_cluster_layout(from).await }
+ });
}
self.node_status
@@ -648,57 +588,6 @@ impl System {
Ok(SystemRpc::Ok)
}
- async fn handle_advertise_cluster_layout(
- self: &Arc<Self>,
- adv: &LayoutHistory,
- ) -> Result<SystemRpc, Error> {
- if adv.current().replication_factor != self.replication_factor {
- let msg = format!(
- "Received a cluster layout from another node with replication factor {}, which is different from what we have in our configuration ({}). Discarding the cluster layout we received.",
- adv.current().replication_factor,
- self.replication_factor
- );
- error!("{}", msg);
- return Err(Error::Message(msg));
- }
-
- let update_layout = self.update_layout.lock().await;
- // TODO: don't clone each time an AdvertiseClusterLayout is received
- let mut layout: LayoutHistory = self.layout_watch.borrow().as_ref().clone();
-
- let prev_layout_check = layout.check().is_ok();
- if layout.merge(adv) {
- if prev_layout_check && layout.check().is_err() {
- error!("New cluster layout is invalid, discarding.");
- return Err(Error::Message(
- "New cluster layout is invalid, discarding.".into(),
- ));
- }
-
- update_layout.send(Arc::new(layout.clone()))?;
- drop(update_layout);
-
- let self2 = self.clone();
- tokio::spawn(async move {
- if let Err(e) = self2
- .rpc
- .broadcast(
- &self2.system_endpoint,
- SystemRpc::AdvertiseClusterLayout(layout),
- RequestStrategy::with_priority(PRIO_HIGH),
- )
- .await
- {
- warn!("Error while broadcasting new cluster layout: {}", e);
- }
- });
-
- self.save_cluster_layout().await?;
- }
-
- Ok(SystemRpc::Ok)
- }
-
async fn status_exchange_loop(&self, mut stop_signal: watch::Receiver<bool>) {
while !*stop_signal.borrow() {
let restart_at = Instant::now() + STATUS_EXCHANGE_INTERVAL;
@@ -706,7 +595,7 @@ impl System {
self.update_local_status();
let local_status: NodeStatus = self.local_status.load().as_ref().clone();
let _ = self
- .rpc
+ .rpc_helper()
.broadcast(
&self.system_endpoint,
SystemRpc::AdvertiseStatus(local_status),
@@ -724,9 +613,9 @@ impl System {
async fn discovery_loop(self: &Arc<Self>, mut stop_signal: watch::Receiver<bool>) {
while !*stop_signal.borrow() {
- let not_configured = self.layout_watch.borrow().check().is_err();
+ let not_configured = self.cluster_layout().check().is_err();
let no_peers = self.fullmesh.get_peer_list().len() < self.replication_factor;
- let expected_n_nodes = self.layout_watch.borrow().current().num_nodes();
+ let expected_n_nodes = self.cluster_layout().current().num_nodes();
let bad_peers = self
.fullmesh
.get_peer_list()
@@ -831,34 +720,26 @@ impl System {
.save_async(&PeerList(peer_list))
.await
}
-
- async fn pull_cluster_layout(self: Arc<Self>, peer: Uuid) {
- let resp = self
- .rpc
- .call(
- &self.system_endpoint,
- peer,
- SystemRpc::PullClusterLayout,
- RequestStrategy::with_priority(PRIO_HIGH),
- )
- .await;
- if let Ok(SystemRpc::AdvertiseClusterLayout(layout)) = resp {
- let _: Result<_, _> = self.handle_advertise_cluster_layout(&layout).await;
- }
- }
}
#[async_trait]
impl EndpointHandler<SystemRpc> for System {
async fn handle(self: &Arc<Self>, msg: &SystemRpc, from: NodeID) -> Result<SystemRpc, Error> {
match msg {
+ // ---- system functions -> System ----
SystemRpc::Connect(node) => self.handle_connect(node).await,
- SystemRpc::PullClusterLayout => Ok(self.handle_pull_cluster_layout()),
SystemRpc::AdvertiseStatus(adv) => self.handle_advertise_status(from.into(), adv).await,
+ SystemRpc::GetKnownNodes => Ok(self.handle_get_known_nodes()),
+
+ // ---- layout functions -> LayoutManager ----
+ SystemRpc::PullClusterLayout => Ok(self.layout_manager.handle_pull_cluster_layout()),
SystemRpc::AdvertiseClusterLayout(adv) => {
- self.clone().handle_advertise_cluster_layout(adv).await
+ self.layout_manager
+ .handle_advertise_cluster_layout(adv)
+ .await
}
- SystemRpc::GetKnownNodes => Ok(self.handle_get_known_nodes()),
+
+ // ---- other -> Error ----
m => Err(Error::unexpected_rpc_message(m)),
}
}
@@ -962,6 +843,40 @@ fn get_default_ip() -> Option<IpAddr> {
.map(|a| a.ip())
}
+fn get_rpc_public_addr(config: &Config) -> Option<SocketAddr> {
+ match &config.rpc_public_addr {
+ Some(a_str) => {
+ use std::net::ToSocketAddrs;
+ match a_str.to_socket_addrs() {
+ Err(e) => {
+ error!(
+ "Cannot resolve rpc_public_addr {} from config file: {}.",
+ a_str, e
+ );
+ None
+ }
+ Ok(a) => {
+ let a = a.collect::<Vec<_>>();
+ if a.is_empty() {
+ error!("rpc_public_addr {} resolve to no known IP address", a_str);
+ }
+ if a.len() > 1 {
+ warn!("Multiple possible resolutions for rpc_public_addr: {:?}. Taking the first one.", a);
+ }
+ a.into_iter().next()
+ }
+ }
+ }
+ None => {
+ let addr = get_default_ip().map(|ip| SocketAddr::new(ip, config.rpc_bind_addr.port()));
+ if let Some(a) = addr {
+ warn!("Using autodetected rpc_public_addr: {}. Consider specifying it explicitly in configuration file if possible.", a);
+ }
+ addr
+ }
+ }
+}
+
async fn resolve_peers(peers: &[String]) -> Vec<(NodeID, SocketAddr)> {
let mut ret = vec![];