aboutsummaryrefslogtreecommitdiff
path: root/src/rpc/system.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc/system.rs')
-rw-r--r--src/rpc/system.rs100
1 files changed, 59 insertions, 41 deletions
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 3f5f7fb1..aa8947ea 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -23,12 +23,13 @@ use netapp::{NetApp, NetworkKey, NodeID, NodeKey};
use garage_util::background::BackgroundRunner;
use garage_util::config::Config;
-use garage_util::data::Uuid;
+use garage_util::data::*;
use garage_util::error::*;
use garage_util::persister::Persister;
use garage_util::time::*;
use crate::consul::*;
+use crate::layout::*;
use crate::ring::*;
use crate::rpc_helper::*;
@@ -48,13 +49,13 @@ pub enum SystemRpc {
Ok,
/// Request to connect to a specific node (in <pubkey>@<host>:<port> format)
Connect(String),
- /// Ask other node its config. Answered with AdvertiseConfig
- PullConfig,
+ /// Ask other node its cluster layout. Answered with AdvertiseClusterLayout
+ PullClusterLayout,
/// Advertise Garage status. Answered with another AdvertiseStatus.
/// Exchanged with every node on a regular basis.
AdvertiseStatus(NodeStatus),
- /// Advertisement of nodes config. Sent spontanously or in response to PullConfig
- AdvertiseConfig(NetworkConfig),
+ /// Advertisement of cluster layout. Sent spontanously or in response to PullClusterLayout
+ AdvertiseClusterLayout(ClusterLayout),
/// Get known nodes states
GetKnownNodes,
/// Return known nodes
@@ -70,7 +71,7 @@ pub struct System {
/// The id of this node
pub id: Uuid,
- persist_config: Persister<NetworkConfig>,
+ persist_cluster_layout: Persister<ClusterLayout>,
persist_peer_list: Persister<Vec<(Uuid, SocketAddr)>>,
local_status: ArcSwap<NodeStatus>,
@@ -103,8 +104,10 @@ pub struct NodeStatus {
pub hostname: String,
/// Replication factor configured on the node
pub replication_factor: usize,
- /// Configuration version
- pub config_version: u64,
+ /// Cluster layout version
+ pub cluster_layout_version: u64,
+ /// Hash of cluster layout staging data
+ pub cluster_layout_staging_hash: Hash,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -187,17 +190,17 @@ impl System {
gen_node_key(&config.metadata_dir).expect("Unable to read or generate node ID");
info!("Node public key: {}", hex::encode(&node_key.public_key()));
- let persist_config = Persister::new(&config.metadata_dir, "network_config");
+ let persist_cluster_layout = Persister::new(&config.metadata_dir, "cluster_layout");
let persist_peer_list = Persister::new(&config.metadata_dir, "peer_list");
- let net_config = match persist_config.load() {
+ let cluster_layout = match persist_cluster_layout.load() {
Ok(x) => x,
Err(e) => {
info!(
- "No valid previous network configuration stored ({}), starting fresh.",
+ "No valid previous cluster layout stored ({}), starting fresh.",
e
);
- NetworkConfig::new()
+ ClusterLayout::new(replication_factor)
}
};
@@ -206,10 +209,11 @@ impl System {
.into_string()
.unwrap_or_else(|_| "<invalid utf-8>".to_string()),
replication_factor,
- config_version: net_config.version,
+ cluster_layout_version: cluster_layout.version,
+ cluster_layout_staging_hash: cluster_layout.staging_hash,
};
- let ring = Ring::new(net_config, replication_factor);
+ let ring = Ring::new(cluster_layout, replication_factor);
let (update_ring, ring) = watch::channel(Arc::new(ring));
if let Some(addr) = config.rpc_public_addr {
@@ -229,7 +233,7 @@ impl System {
let sys = Arc::new(System {
id: netapp.id.into(),
- persist_config,
+ persist_cluster_layout,
persist_peer_list,
local_status: ArcSwap::new(Arc::new(local_status)),
node_status: RwLock::new(HashMap::new()),
@@ -292,12 +296,12 @@ impl System {
}
/// Save network configuration to disc
- async fn save_network_config(self: Arc<Self>) -> Result<(), Error> {
+ async fn save_cluster_layout(self: Arc<Self>) -> Result<(), Error> {
let ring: Arc<Ring> = self.ring.borrow().clone();
- self.persist_config
- .save_async(&ring.config)
+ self.persist_cluster_layout
+ .save_async(&ring.layout)
.await
- .expect("Cannot save current cluster configuration");
+ .expect("Cannot save current cluster layout");
Ok(())
}
@@ -305,7 +309,8 @@ impl System {
let mut new_si: NodeStatus = self.local_status.load().as_ref().clone();
let ring = self.ring.borrow();
- new_si.config_version = ring.config.version;
+ new_si.cluster_layout_version = ring.layout.version;
+ new_si.cluster_layout_staging_hash = ring.layout.staging_hash;
self.local_status.swap(Arc::new(new_si));
}
@@ -337,9 +342,9 @@ impl System {
)));
}
- fn handle_pull_config(&self) -> SystemRpc {
+ fn handle_pull_cluster_layout(&self) -> SystemRpc {
let ring = self.ring.borrow().clone();
- SystemRpc::AdvertiseConfig(ring.config.clone())
+ SystemRpc::AdvertiseClusterLayout(ring.layout.clone())
}
fn handle_get_known_nodes(&self) -> SystemRpc {
@@ -360,7 +365,8 @@ impl System {
.unwrap_or(NodeStatus {
hostname: "?".to_string(),
replication_factor: 0,
- config_version: 0,
+ cluster_layout_version: 0,
+ cluster_layout_staging_hash: Hash::from([0u8; 32]),
}),
})
.collect::<Vec<_>>();
@@ -381,10 +387,12 @@ impl System {
std::process::exit(1);
}
- if info.config_version > local_info.config_version {
+ if info.cluster_layout_version > local_info.cluster_layout_version
+ || info.cluster_layout_staging_hash != local_info.cluster_layout_staging_hash
+ {
let self2 = self.clone();
self.background.spawn_cancellable(async move {
- self2.pull_config(from).await;
+ self2.pull_cluster_layout(from).await;
Ok(())
});
}
@@ -397,32 +405,39 @@ impl System {
Ok(SystemRpc::Ok)
}
- async fn handle_advertise_config(
+ async fn handle_advertise_cluster_layout(
self: Arc<Self>,
- adv: &NetworkConfig,
+ adv: &ClusterLayout,
) -> Result<SystemRpc, Error> {
let update_ring = self.update_ring.lock().await;
- let ring: Arc<Ring> = self.ring.borrow().clone();
+ let mut layout: ClusterLayout = self.ring.borrow().layout.clone();
+
+ let prev_layout_check = layout.check();
+ if layout.merge(adv) {
+ if prev_layout_check && !layout.check() {
+ error!("New cluster layout is invalid, discarding.");
+ return Err(Error::Message(
+ "New cluster layout is invalid, discarding.".into(),
+ ));
+ }
- if adv.version > ring.config.version {
- let ring = Ring::new(adv.clone(), self.replication_factor);
+ let ring = Ring::new(layout.clone(), self.replication_factor);
update_ring.send(Arc::new(ring))?;
drop(update_ring);
let self2 = self.clone();
- let adv = adv.clone();
self.background.spawn_cancellable(async move {
self2
.rpc
.broadcast(
&self2.system_endpoint,
- SystemRpc::AdvertiseConfig(adv),
+ SystemRpc::AdvertiseClusterLayout(layout),
RequestStrategy::with_priority(PRIO_HIGH),
)
.await;
Ok(())
});
- self.background.spawn(self.clone().save_network_config());
+ self.background.spawn(self.clone().save_cluster_layout());
}
Ok(SystemRpc::Ok)
@@ -456,14 +471,15 @@ impl System {
};
while !*stop_signal.borrow() {
- let not_configured = self.ring.borrow().config.members.is_empty();
+ let not_configured = !self.ring.borrow().layout.check();
let no_peers = self.fullmesh.get_peer_list().len() < self.replication_factor;
+ let expected_n_nodes = self.ring.borrow().layout.num_nodes();
let bad_peers = self
.fullmesh
.get_peer_list()
.iter()
.filter(|p| p.is_up())
- .count() != self.ring.borrow().config.members.len();
+ .count() != expected_n_nodes;
if not_configured || no_peers || bad_peers {
info!("Doing a bootstrap/discovery step (not_configured: {}, no_peers: {}, bad_peers: {})", not_configured, no_peers, bad_peers);
@@ -533,18 +549,18 @@ impl System {
self.persist_peer_list.save_async(&peer_list).await
}
- async fn pull_config(self: Arc<Self>, peer: Uuid) {
+ async fn pull_cluster_layout(self: Arc<Self>, peer: Uuid) {
let resp = self
.rpc
.call(
&self.system_endpoint,
peer,
- SystemRpc::PullConfig,
+ SystemRpc::PullClusterLayout,
RequestStrategy::with_priority(PRIO_HIGH).with_timeout(PING_TIMEOUT),
)
.await;
- if let Ok(SystemRpc::AdvertiseConfig(config)) = resp {
- let _: Result<_, _> = self.handle_advertise_config(&config).await;
+ if let Ok(SystemRpc::AdvertiseClusterLayout(layout)) = resp {
+ let _: Result<_, _> = self.handle_advertise_cluster_layout(&layout).await;
}
}
}
@@ -554,9 +570,11 @@ impl EndpointHandler<SystemRpc> for System {
async fn handle(self: &Arc<Self>, msg: &SystemRpc, from: NodeID) -> Result<SystemRpc, Error> {
match msg {
SystemRpc::Connect(node) => self.handle_connect(node).await,
- SystemRpc::PullConfig => Ok(self.handle_pull_config()),
+ SystemRpc::PullClusterLayout => Ok(self.handle_pull_cluster_layout()),
SystemRpc::AdvertiseStatus(adv) => self.handle_advertise_status(from.into(), adv).await,
- SystemRpc::AdvertiseConfig(adv) => self.clone().handle_advertise_config(adv).await,
+ SystemRpc::AdvertiseClusterLayout(adv) => {
+ self.clone().handle_advertise_cluster_layout(adv).await
+ }
SystemRpc::GetKnownNodes => Ok(self.handle_get_known_nodes()),
_ => Err(Error::BadRpc("Unexpected RPC message".to_string())),
}