author    | Alex Auvolat <alex@adnab.me> | 2021-11-09 12:24:04 +0100
committer | Alex Auvolat <alex@adnab.me> | 2021-11-16 16:05:53 +0100
commit    | c94406f4282d48e2e2ac82ffb57eafaad23f7edc (patch)
tree      | 01fe1b272e18fdae993e2207d8d3aea4a301ec56 /src/rpc/system.rs
parent    | 53888995bdd7c672d2e3ab8bb6a3529195c127a9 (diff)
Improve how node roles are assigned in Garage (tag: v0.5-beta1)
- change the terminology: the network configuration becomes the role
  table, and the configuration of a node becomes a node's role
- the modification of the role table takes place in two steps: first,
  changes are staged in a CRDT data structure. Then, once the user is
  happy with the changes, they can commit them all at once (or revert
  them). A minimal sketch of this staging flow follows the list.
- update documentation
- fix tests
- implement a smarter partition assignment algorithm
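
To make the two-step role-table update concrete, here is a minimal sketch of the staging idea: each staged change is a last-writer-wins entry keyed by node (a simple CRDT, so two nodes that stage changes concurrently converge once they exchange their staging areas), and the whole staging area is applied or dropped atomically. All names here (`RoleTable`, `NodeRole`, `stage`, `commit`, `revert`) are illustrative stand-ins, not Garage's actual `ClusterLayout` API from `src/rpc/layout.rs`.

```rust
use std::collections::BTreeMap;

type NodeId = u64; // stand-in for Garage's Uuid

#[derive(Clone, Debug, PartialEq)]
struct NodeRole {
    zone: String,
    capacity: Option<u32>, // None could mean a gateway node, for example
}

#[derive(Default)]
struct RoleTable {
    version: u64,                               // bumped only on commit
    roles: BTreeMap<NodeId, NodeRole>,          // the active role table
    staging: BTreeMap<NodeId, (u64, NodeRole)>, // (timestamp, role): one LWW register per node
}

impl RoleTable {
    /// Stage a change. Keeping the entry with the larger timestamp is the
    /// last-writer-wins merge rule that makes the staging area a CRDT.
    fn stage(&mut self, ts: u64, node: NodeId, role: NodeRole) {
        match self.staging.get(&node) {
            Some((prev_ts, _)) if *prev_ts >= ts => {} // an older write loses
            _ => {
                self.staging.insert(node, (ts, role));
            }
        }
    }

    /// Apply every staged change at once and bump the table version.
    fn commit(&mut self) {
        for (node, (_ts, role)) in std::mem::take(&mut self.staging) {
            self.roles.insert(node, role);
        }
        self.version += 1;
    }

    /// Drop all staged changes without touching the active roles.
    fn revert(&mut self) {
        self.staging.clear();
    }
}

fn main() {
    let mut table = RoleTable::default();
    table.stage(1, 42, NodeRole { zone: "dc1".into(), capacity: Some(100) });
    table.stage(2, 42, NodeRole { zone: "dc2".into(), capacity: Some(100) }); // newer write wins
    table.commit();
    assert_eq!(table.roles[&42].zone, "dc2");
    assert_eq!(table.version, 1);
}
```

Nothing takes effect until `commit`, which is what lets an operator review a whole batch of role changes, and what lets the staging data be gossiped between nodes before anything is applied.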
This patch breaks the format of the network configuration: when
migrating, the cluster will be in a state where no roles are assigned.
All roles must be re-assigned and committed at once. This migration
should not pose an issue.
Diffstat (limited to 'src/rpc/system.rs')
-rw-r--r-- | src/rpc/system.rs | 100
1 file changed, 59 insertions, 41 deletions
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 3f5f7fb1..aa8947ea 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -23,12 +23,13 @@ use netapp::{NetApp, NetworkKey, NodeID, NodeKey};
 use garage_util::background::BackgroundRunner;
 use garage_util::config::Config;
-use garage_util::data::Uuid;
+use garage_util::data::*;
 use garage_util::error::*;
 use garage_util::persister::Persister;
 use garage_util::time::*;
 
 use crate::consul::*;
+use crate::layout::*;
 use crate::ring::*;
 use crate::rpc_helper::*;
 
@@ -48,13 +49,13 @@ pub enum SystemRpc {
 	Ok,
 	/// Request to connect to a specific node (in <pubkey>@<host>:<port> format)
 	Connect(String),
-	/// Ask other node its config. Answered with AdvertiseConfig
-	PullConfig,
+	/// Ask other node its cluster layout. Answered with AdvertiseClusterLayout
+	PullClusterLayout,
 	/// Advertise Garage status. Answered with another AdvertiseStatus.
 	/// Exchanged with every node on a regular basis.
 	AdvertiseStatus(NodeStatus),
-	/// Advertisement of nodes config. Sent spontanously or in response to PullConfig
-	AdvertiseConfig(NetworkConfig),
+	/// Advertisement of cluster layout. Sent spontanously or in response to PullClusterLayout
+	AdvertiseClusterLayout(ClusterLayout),
 	/// Get known nodes states
 	GetKnownNodes,
 	/// Return known nodes
@@ -70,7 +71,7 @@ pub struct System {
 	/// The id of this node
 	pub id: Uuid,
 
-	persist_config: Persister<NetworkConfig>,
+	persist_cluster_layout: Persister<ClusterLayout>,
 	persist_peer_list: Persister<Vec<(Uuid, SocketAddr)>>,
 
 	local_status: ArcSwap<NodeStatus>,
@@ -103,8 +104,10 @@ pub struct NodeStatus {
 	pub hostname: String,
 	/// Replication factor configured on the node
 	pub replication_factor: usize,
-	/// Configuration version
-	pub config_version: u64,
+	/// Cluster layout version
+	pub cluster_layout_version: u64,
+	/// Hash of cluster layout staging data
+	pub cluster_layout_staging_hash: Hash,
 }
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
@@ -187,17 +190,17 @@ impl System {
 			gen_node_key(&config.metadata_dir).expect("Unable to read or generate node ID");
 		info!("Node public key: {}", hex::encode(&node_key.public_key()));
 
-		let persist_config = Persister::new(&config.metadata_dir, "network_config");
+		let persist_cluster_layout = Persister::new(&config.metadata_dir, "cluster_layout");
 		let persist_peer_list = Persister::new(&config.metadata_dir, "peer_list");
 
-		let net_config = match persist_config.load() {
+		let cluster_layout = match persist_cluster_layout.load() {
 			Ok(x) => x,
 			Err(e) => {
 				info!(
-					"No valid previous network configuration stored ({}), starting fresh.",
+					"No valid previous cluster layout stored ({}), starting fresh.",
 					e
 				);
-				NetworkConfig::new()
+				ClusterLayout::new(replication_factor)
 			}
 		};
 
@@ -206,10 +209,11 @@ impl System {
 				.into_string()
 				.unwrap_or_else(|_| "<invalid utf-8>".to_string()),
 			replication_factor,
-			config_version: net_config.version,
+			cluster_layout_version: cluster_layout.version,
+			cluster_layout_staging_hash: cluster_layout.staging_hash,
 		};
 
-		let ring = Ring::new(net_config, replication_factor);
+		let ring = Ring::new(cluster_layout, replication_factor);
 		let (update_ring, ring) = watch::channel(Arc::new(ring));
 
 		if let Some(addr) = config.rpc_public_addr {
@@ -229,7 +233,7 @@ impl System {
 		let sys = Arc::new(System {
 			id: netapp.id.into(),
-			persist_config,
+			persist_cluster_layout,
 			persist_peer_list,
 			local_status: ArcSwap::new(Arc::new(local_status)),
 			node_status: RwLock::new(HashMap::new()),
@@ -292,12 +296,12 @@ impl System {
 	}
 
 	/// Save network configuration to disc
-	async fn save_network_config(self: Arc<Self>) -> Result<(), Error> {
+	async fn save_cluster_layout(self: Arc<Self>) -> Result<(), Error> {
 		let ring: Arc<Ring> = self.ring.borrow().clone();
-		self.persist_config
-			.save_async(&ring.config)
+		self.persist_cluster_layout
+			.save_async(&ring.layout)
 			.await
-			.expect("Cannot save current cluster configuration");
+			.expect("Cannot save current cluster layout");
 		Ok(())
 	}
 
@@ -305,7 +309,8 @@ impl System {
 		let mut new_si: NodeStatus = self.local_status.load().as_ref().clone();
 
 		let ring = self.ring.borrow();
-		new_si.config_version = ring.config.version;
+		new_si.cluster_layout_version = ring.layout.version;
+		new_si.cluster_layout_staging_hash = ring.layout.staging_hash;
 		self.local_status.swap(Arc::new(new_si));
 	}
 
@@ -337,9 +342,9 @@ impl System {
 		)));
 	}
 
-	fn handle_pull_config(&self) -> SystemRpc {
+	fn handle_pull_cluster_layout(&self) -> SystemRpc {
 		let ring = self.ring.borrow().clone();
-		SystemRpc::AdvertiseConfig(ring.config.clone())
+		SystemRpc::AdvertiseClusterLayout(ring.layout.clone())
 	}
 
 	fn handle_get_known_nodes(&self) -> SystemRpc {
@@ -360,7 +365,8 @@ impl System {
 					.unwrap_or(NodeStatus {
 						hostname: "?".to_string(),
 						replication_factor: 0,
-						config_version: 0,
+						cluster_layout_version: 0,
+						cluster_layout_staging_hash: Hash::from([0u8; 32]),
 					}),
 			})
 			.collect::<Vec<_>>();
@@ -381,10 +387,12 @@ impl System {
 			std::process::exit(1);
 		}
 
-		if info.config_version > local_info.config_version {
+		if info.cluster_layout_version > local_info.cluster_layout_version
+			|| info.cluster_layout_staging_hash != local_info.cluster_layout_staging_hash
+		{
 			let self2 = self.clone();
 			self.background.spawn_cancellable(async move {
-				self2.pull_config(from).await;
+				self2.pull_cluster_layout(from).await;
 				Ok(())
 			});
 		}
@@ -397,32 +405,39 @@ impl System {
 		Ok(SystemRpc::Ok)
 	}
 
-	async fn handle_advertise_config(
+	async fn handle_advertise_cluster_layout(
 		self: Arc<Self>,
-		adv: &NetworkConfig,
+		adv: &ClusterLayout,
 	) -> Result<SystemRpc, Error> {
 		let update_ring = self.update_ring.lock().await;
-		let ring: Arc<Ring> = self.ring.borrow().clone();
+		let mut layout: ClusterLayout = self.ring.borrow().layout.clone();
+
+		let prev_layout_check = layout.check();
+		if layout.merge(adv) {
+			if prev_layout_check && !layout.check() {
+				error!("New cluster layout is invalid, discarding.");
+				return Err(Error::Message(
+					"New cluster layout is invalid, discarding.".into(),
+				));
+			}
 
-		if adv.version > ring.config.version {
-			let ring = Ring::new(adv.clone(), self.replication_factor);
+			let ring = Ring::new(layout.clone(), self.replication_factor);
 			update_ring.send(Arc::new(ring))?;
 			drop(update_ring);
 
 			let self2 = self.clone();
-			let adv = adv.clone();
 			self.background.spawn_cancellable(async move {
 				self2
 					.rpc
 					.broadcast(
 						&self2.system_endpoint,
-						SystemRpc::AdvertiseConfig(adv),
+						SystemRpc::AdvertiseClusterLayout(layout),
 						RequestStrategy::with_priority(PRIO_HIGH),
 					)
 					.await;
 				Ok(())
 			});
-			self.background.spawn(self.clone().save_network_config());
+			self.background.spawn(self.clone().save_cluster_layout());
 		}
 
 		Ok(SystemRpc::Ok)
@@ -456,14 +471,15 @@ impl System {
 		};
 
 		while !*stop_signal.borrow() {
-			let not_configured = self.ring.borrow().config.members.is_empty();
+			let not_configured = !self.ring.borrow().layout.check();
 			let no_peers = self.fullmesh.get_peer_list().len() < self.replication_factor;
+			let expected_n_nodes = self.ring.borrow().layout.num_nodes();
 			let bad_peers = self
 				.fullmesh
 				.get_peer_list()
 				.iter()
 				.filter(|p| p.is_up())
-				.count() != self.ring.borrow().config.members.len();
+				.count() != expected_n_nodes;
 
 			if not_configured || no_peers || bad_peers {
 				info!("Doing a bootstrap/discovery step (not_configured: {}, no_peers: {}, bad_peers: {})", not_configured, no_peers, bad_peers);
@@ -533,18 +549,18 @@ impl System {
 		self.persist_peer_list.save_async(&peer_list).await
 	}
 
-	async fn pull_config(self: Arc<Self>, peer: Uuid) {
+	async fn pull_cluster_layout(self: Arc<Self>, peer: Uuid) {
 		let resp = self
 			.rpc
 			.call(
 				&self.system_endpoint,
 				peer,
-				SystemRpc::PullConfig,
+				SystemRpc::PullClusterLayout,
 				RequestStrategy::with_priority(PRIO_HIGH).with_timeout(PING_TIMEOUT),
 			)
 			.await;
-		if let Ok(SystemRpc::AdvertiseConfig(config)) = resp {
-			let _: Result<_, _> = self.handle_advertise_config(&config).await;
+		if let Ok(SystemRpc::AdvertiseClusterLayout(layout)) = resp {
+			let _: Result<_, _> = self.handle_advertise_cluster_layout(&layout).await;
		}
 	}
 }
@@ -554,9 +570,11 @@ impl EndpointHandler<SystemRpc> for System {
 	async fn handle(self: &Arc<Self>, msg: &SystemRpc, from: NodeID) -> Result<SystemRpc, Error> {
 		match msg {
 			SystemRpc::Connect(node) => self.handle_connect(node).await,
-			SystemRpc::PullConfig => Ok(self.handle_pull_config()),
+			SystemRpc::PullClusterLayout => Ok(self.handle_pull_cluster_layout()),
 			SystemRpc::AdvertiseStatus(adv) => self.handle_advertise_status(from.into(), adv).await,
-			SystemRpc::AdvertiseConfig(adv) => self.clone().handle_advertise_config(adv).await,
+			SystemRpc::AdvertiseClusterLayout(adv) => {
+				self.clone().handle_advertise_cluster_layout(adv).await
+			}
 			SystemRpc::GetKnownNodes => Ok(self.handle_get_known_nodes()),
 			_ => Err(Error::BadRpc("Unexpected RPC message".to_string())),
 		}
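
One detail worth pulling out of the diff above: `NodeStatus` now gossips `cluster_layout_staging_hash` alongside `cluster_layout_version`, because staged edits do not bump the committed version; comparing only versions would leave staged changes unpropagated. The sketch below mirrors the pull condition from `handle_advertise_status`, with simplified stand-in types (`LayoutStatus`, and a raw `[u8; 32]` instead of `garage_util`'s `Hash`):

```rust
/// Simplified stand-in for the layout-related fields of NodeStatus.
struct LayoutStatus {
    cluster_layout_version: u64,
    cluster_layout_staging_hash: [u8; 32], // hash of the staged (uncommitted) edits
}

/// Mirrors the condition in handle_advertise_status: pull the peer's layout
/// if it has committed a newer version, or if its staged edits differ from ours.
fn should_pull_layout(local: &LayoutStatus, remote: &LayoutStatus) -> bool {
    remote.cluster_layout_version > local.cluster_layout_version
        || remote.cluster_layout_staging_hash != local.cluster_layout_staging_hash
}

fn main() {
    let local = LayoutStatus {
        cluster_layout_version: 3,
        cluster_layout_staging_hash: [0u8; 32],
    };
    // Same committed version, but the peer has staged changes we have not seen:
    let remote = LayoutStatus {
        cluster_layout_version: 3,
        cluster_layout_staging_hash: [1u8; 32],
    };
    assert!(should_pull_layout(&local, &remote));
}
```

Pulling on any hash difference, rather than some notion of a "newer" hash, is the natural choice here: hashes carry no ordering, and once the layout is pulled, the CRDT merge of the two staging areas decides which entries win.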