diff options
Diffstat (limited to 'src/rpc/system.rs')
-rw-r--r-- | src/rpc/system.rs | 431 |
1 files changed, 181 insertions, 250 deletions
diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 4cec369b..f22247c3 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -1,10 +1,10 @@ //! Module containing structs related to membership management -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::io::{Read, Write}; use std::net::{IpAddr, SocketAddr}; use std::path::{Path, PathBuf}; use std::sync::atomic::Ordering; -use std::sync::{Arc, RwLock}; +use std::sync::{Arc, RwLock, RwLockReadGuard}; use std::time::{Duration, Instant}; use arc_swap::ArcSwap; @@ -13,8 +13,7 @@ use futures::join; use serde::{Deserialize, Serialize}; use sodiumoxide::crypto::sign::ed25519; use tokio::select; -use tokio::sync::watch; -use tokio::sync::Mutex; +use tokio::sync::{watch, Notify}; use netapp::endpoint::{Endpoint, EndpointHandler}; use netapp::message::*; @@ -34,9 +33,10 @@ use garage_util::time::*; use crate::consul::ConsulDiscovery; #[cfg(feature = "kubernetes-discovery")] use crate::kubernetes::*; -use crate::layout::*; +use crate::layout::{ + self, manager::LayoutManager, LayoutHelper, LayoutHistory, NodeRoleV, RpcLayoutDigest, +}; use crate::replication_mode::*; -use crate::ring::*; use crate::rpc_helper::*; use crate::system_metrics::*; @@ -47,10 +47,10 @@ const STATUS_EXCHANGE_INTERVAL: Duration = Duration::from_secs(10); /// Version tag used for version check upon Netapp connection. /// Cluster nodes with different version tags are deemed /// incompatible and will refuse to connect. -pub const GARAGE_VERSION_TAG: u64 = 0x6761726167650008; // garage 0x0008 +pub const GARAGE_VERSION_TAG: u64 = 0x676172616765000A; // garage 0x000A /// RPC endpoint used for calls related to membership -pub const SYSTEM_RPC_PATH: &str = "garage_rpc/membership.rs/SystemRpc"; +pub const SYSTEM_RPC_PATH: &str = "garage_rpc/system.rs/SystemRpc"; /// RPC messages related to membership #[derive(Debug, Serialize, Deserialize, Clone)] @@ -59,17 +59,22 @@ pub enum SystemRpc { Ok, /// Request to connect to a specific node (in <pubkey>@<host>:<port> format, pubkey = full-length node ID) Connect(String), - /// Ask other node its cluster layout. Answered with AdvertiseClusterLayout - PullClusterLayout, /// Advertise Garage status. Answered with another AdvertiseStatus. /// Exchanged with every node on a regular basis. AdvertiseStatus(NodeStatus), - /// Advertisement of cluster layout. Sent spontanously or in response to PullClusterLayout - AdvertiseClusterLayout(ClusterLayout), /// Get known nodes states GetKnownNodes, /// Return known nodes ReturnKnownNodes(Vec<KnownNodeInfo>), + + /// Ask other node its cluster layout. Answered with AdvertiseClusterLayout + PullClusterLayout, + /// Advertisement of cluster layout. Sent spontanously or in response to PullClusterLayout + AdvertiseClusterLayout(LayoutHistory), + /// Ask other node its cluster layout update trackers. + PullClusterLayoutTrackers, + /// Advertisement of cluster layout update trackers. + AdvertiseClusterLayoutTrackers(layout::UpdateTrackers), } impl Rpc for SystemRpc { @@ -85,7 +90,6 @@ pub struct System { /// The id of this node pub id: Uuid, - persist_cluster_layout: Persister<ClusterLayout>, persist_peer_list: Persister<PeerList>, local_status: ArcSwap<NodeStatus>, @@ -93,9 +97,8 @@ pub struct System { pub netapp: Arc<NetApp>, fullmesh: Arc<FullMeshPeeringStrategy>, - pub rpc: RpcHelper, - system_endpoint: Arc<Endpoint<SystemRpc, System>>, + pub(crate) system_endpoint: Arc<Endpoint<SystemRpc, System>>, rpc_listen_addr: SocketAddr, #[cfg(any(feature = "consul-discovery", feature = "kubernetes-discovery"))] @@ -107,15 +110,13 @@ pub struct System { #[cfg(feature = "kubernetes-discovery")] kubernetes_discovery: Option<KubernetesDiscoveryConfig>, + pub layout_manager: Arc<LayoutManager>, + metrics: SystemMetrics, replication_mode: ReplicationMode, replication_factor: usize, - /// The ring - pub ring: watch::Receiver<Arc<Ring>>, - update_ring: Mutex<watch::Sender<Arc<Ring>>>, - /// Path to metadata directory pub metadata_dir: PathBuf, /// Path to data directory @@ -125,14 +126,13 @@ pub struct System { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct NodeStatus { /// Hostname of the node - pub hostname: String, + pub hostname: Option<String>, /// Replication factor configured on the node pub replication_factor: usize, - /// Cluster layout version - pub cluster_layout_version: u64, - /// Hash of cluster layout staging data - pub cluster_layout_staging_hash: Hash, + + /// Cluster layout digest + pub layout_digest: RpcLayoutDigest, /// Disk usage on partition containing metadata directory (tuple: `(avail, total)`) #[serde(default)] @@ -248,8 +248,7 @@ impl System { replication_mode: ReplicationMode, config: &Config, ) -> Result<Arc<Self>, Error> { - let replication_factor = replication_mode.replication_factor(); - + // ---- setup netapp RPC protocol ---- let node_key = gen_node_key(&config.metadata_dir).expect("Unable to read or generate node ID"); info!( @@ -257,82 +256,40 @@ impl System { hex::encode(&node_key.public_key()[..8]) ); - let persist_cluster_layout: Persister<ClusterLayout> = - Persister::new(&config.metadata_dir, "cluster_layout"); - let persist_peer_list = Persister::new(&config.metadata_dir, "peer_list"); - - let cluster_layout = match persist_cluster_layout.load() { - Ok(x) => { - if x.replication_factor != replication_factor { - return Err(Error::Message(format!( - "Prevous cluster layout has replication factor {}, which is different than the one specified in the config file ({}). The previous cluster layout can be purged, if you know what you are doing, simply by deleting the `cluster_layout` file in your metadata directory.", - x.replication_factor, - replication_factor - ))); - } - x - } - Err(e) => { - info!( - "No valid previous cluster layout stored ({}), starting fresh.", - e - ); - ClusterLayout::new(replication_factor) - } - }; - - let metrics = SystemMetrics::new(replication_factor); - - let mut local_status = NodeStatus::initial(replication_factor, &cluster_layout); - local_status.update_disk_usage(&config.metadata_dir, &config.data_dir, &metrics); + let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, node_key); + let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into()); - let ring = Ring::new(cluster_layout, replication_factor); - let (update_ring, ring) = watch::channel(Arc::new(ring)); - - let rpc_public_addr = match &config.rpc_public_addr { - Some(a_str) => { - use std::net::ToSocketAddrs; - match a_str.to_socket_addrs() { - Err(e) => { - error!( - "Cannot resolve rpc_public_addr {} from config file: {}.", - a_str, e - ); - None - } - Ok(a) => { - let a = a.collect::<Vec<_>>(); - if a.is_empty() { - error!("rpc_public_addr {} resolve to no known IP address", a_str); - } - if a.len() > 1 { - warn!("Multiple possible resolutions for rpc_public_addr: {:?}. Taking the first one.", a); - } - a.into_iter().next() - } - } - } - None => { - let addr = - get_default_ip().map(|ip| SocketAddr::new(ip, config.rpc_bind_addr.port())); - if let Some(a) = addr { - warn!("Using autodetected rpc_public_addr: {}. Consider specifying it explicitly in configuration file if possible.", a); - } - addr - } - }; + // ---- setup netapp public listener and full mesh peering strategy ---- + let rpc_public_addr = get_rpc_public_addr(config); if rpc_public_addr.is_none() { warn!("This Garage node does not know its publicly reachable RPC address, this might hamper intra-cluster communication."); } - let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, node_key); let fullmesh = FullMeshPeeringStrategy::new(netapp.clone(), vec![], rpc_public_addr); if let Some(ping_timeout) = config.rpc_ping_timeout_msec { fullmesh.set_ping_timeout_millis(ping_timeout); } - let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into()); + let persist_peer_list = Persister::new(&config.metadata_dir, "peer_list"); + + // ---- setup cluster layout and layout manager ---- + let replication_factor = replication_mode.replication_factor(); + + let layout_manager = LayoutManager::new( + config, + netapp.id, + system_endpoint.clone(), + fullmesh.clone(), + replication_mode, + )?; + + // ---- set up metrics and status exchange ---- + let metrics = SystemMetrics::new(replication_factor); + + let mut local_status = NodeStatus::initial(replication_factor, &layout_manager); + local_status.update_disk_usage(&config.metadata_dir, &config.data_dir, &metrics); + // ---- if enabled, set up additionnal peer discovery methods ---- #[cfg(feature = "consul-discovery")] let consul_discovery = match &config.consul_discovery { Some(cfg) => Some( @@ -351,20 +308,14 @@ impl System { warn!("Kubernetes discovery is not enabled in this build."); } + // ---- done ---- let sys = Arc::new(System { id: netapp.id.into(), - persist_cluster_layout, persist_peer_list, local_status: ArcSwap::new(Arc::new(local_status)), node_status: RwLock::new(HashMap::new()), netapp: netapp.clone(), - fullmesh: fullmesh.clone(), - rpc: RpcHelper::new( - netapp.id.into(), - fullmesh, - ring.clone(), - config.rpc_timeout_msec.map(Duration::from_millis), - ), + fullmesh, system_endpoint, replication_mode, replication_factor, @@ -376,10 +327,9 @@ impl System { consul_discovery, #[cfg(feature = "kubernetes-discovery")] kubernetes_discovery: config.kubernetes_discovery.clone(), + layout_manager, metrics, - ring, - update_ring: Mutex::new(update_ring), metadata_dir: config.metadata_dir.clone(), data_dir: config.data_dir.clone(), }); @@ -399,6 +349,20 @@ impl System { ); } + // ---- Public utilities / accessors ---- + + pub fn cluster_layout(&self) -> RwLockReadGuard<'_, LayoutHelper> { + self.layout_manager.layout() + } + + pub fn layout_notify(&self) -> Arc<Notify> { + self.layout_manager.change_notify.clone() + } + + pub fn rpc_helper(&self) -> &RpcHelper { + &self.layout_manager.rpc_helper + } + // ---- Administrative operations (directly available and // also available through RPC) ---- @@ -425,18 +389,6 @@ impl System { known_nodes } - pub fn get_cluster_layout(&self) -> ClusterLayout { - self.ring.borrow().layout.clone() - } - - pub async fn update_cluster_layout( - self: &Arc<Self>, - layout: &ClusterLayout, - ) -> Result<(), Error> { - self.handle_advertise_cluster_layout(layout).await?; - Ok(()) - } - pub async fn connect(&self, node: &str) -> Result<(), Error> { let (pubkey, addrs) = parse_and_resolve_peer_addr_async(node) .await @@ -466,47 +418,63 @@ impl System { } pub fn health(&self) -> ClusterHealth { - let ring: Arc<_> = self.ring.borrow().clone(); let quorum = self.replication_mode.write_quorum(); - let replication_factor = self.replication_factor; + // Gather information about running nodes. + // Technically, `nodes` contains currently running nodes, as well + // as nodes that this Garage process has been connected to at least + // once since it started. let nodes = self .get_known_nodes() .into_iter() .map(|n| (n.id, n)) .collect::<HashMap<Uuid, _>>(); let connected_nodes = nodes.iter().filter(|(_, n)| n.is_up).count(); + let node_up = |x: &Uuid| nodes.get(x).map(|n| n.is_up).unwrap_or(false); + + // Acquire a rwlock read-lock to the current cluster layout + let layout = self.cluster_layout(); + + // Obtain information about nodes that have a role as storage nodes + // in one of the active layout versions + let mut storage_nodes = HashSet::<Uuid>::with_capacity(16); + for ver in layout.versions.iter() { + storage_nodes.extend( + ver.roles + .items() + .iter() + .filter(|(_, _, v)| matches!(v, NodeRoleV(Some(r)) if r.capacity.is_some())) + .map(|(n, _, _)| *n), + ) + } + let storage_nodes_ok = storage_nodes.iter().filter(|x| node_up(x)).count(); + + // Determine the number of partitions that have: + // - a quorum of up nodes for all write sets (i.e. are available) + // - for which all nodes in all write sets are up (i.e. are fully healthy) + let partitions = layout.current().partitions().collect::<Vec<_>>(); + let mut partitions_quorum = 0; + let mut partitions_all_ok = 0; + for (_, hash) in partitions.iter() { + let mut write_sets = layout + .versions + .iter() + .map(|x| x.nodes_of(hash, x.replication_factor)); + let has_quorum = write_sets + .clone() + .all(|set| set.filter(|x| node_up(x)).count() >= quorum); + let all_ok = write_sets.all(|mut set| set.all(|x| node_up(&x))); + if has_quorum { + partitions_quorum += 1; + } + if all_ok { + partitions_all_ok += 1; + } + } - let storage_nodes = ring - .layout - .roles - .items() - .iter() - .filter(|(_, _, v)| matches!(v, NodeRoleV(Some(r)) if r.capacity.is_some())) - .collect::<Vec<_>>(); - let storage_nodes_ok = storage_nodes - .iter() - .filter(|(x, _, _)| nodes.get(x).map(|n| n.is_up).unwrap_or(false)) - .count(); - - let partitions = ring.partitions(); - let partitions_n_up = partitions - .iter() - .map(|(_, h)| { - let pn = ring.get_nodes(h, ring.replication_factor); - pn.iter() - .filter(|x| nodes.get(x).map(|n| n.is_up).unwrap_or(false)) - .count() - }) - .collect::<Vec<usize>>(); - let partitions_all_ok = partitions_n_up - .iter() - .filter(|c| **c == replication_factor) - .count(); - let partitions_quorum = partitions_n_up.iter().filter(|c| **c >= quorum).count(); - + // Determine overall cluster status let status = - if partitions_quorum == partitions.len() && storage_nodes_ok == storage_nodes.len() { + if partitions_all_ok == partitions.len() && storage_nodes_ok == storage_nodes.len() { ClusterHealthStatus::Healthy } else if partitions_quorum == partitions.len() { ClusterHealthStatus::Degraded @@ -546,7 +514,7 @@ impl System { if let Err(e) = c .publish_consul_service( self.netapp.id, - &self.local_status.load_full().hostname, + &self.local_status.load_full().hostname.as_deref().unwrap(), rpc_public_addr, ) .await @@ -573,7 +541,7 @@ impl System { if let Err(e) = publish_kubernetes_node( k, self.netapp.id, - &self.local_status.load_full().hostname, + &self.local_status.load_full().hostname.as_deref().unwrap(), rpc_public_addr, ) .await @@ -582,22 +550,10 @@ impl System { } } - /// Save network configuration to disc - async fn save_cluster_layout(&self) -> Result<(), Error> { - let ring: Arc<Ring> = self.ring.borrow().clone(); - self.persist_cluster_layout - .save_async(&ring.layout) - .await - .expect("Cannot save current cluster layout"); - Ok(()) - } - fn update_local_status(&self) { let mut new_si: NodeStatus = self.local_status.load().as_ref().clone(); - let ring = self.ring.borrow(); - new_si.cluster_layout_version = ring.layout.version; - new_si.cluster_layout_staging_hash = ring.layout.staging_hash; + new_si.layout_digest = self.layout_manager.layout().digest(); new_si.update_disk_usage(&self.metadata_dir, &self.data_dir, &self.metrics); @@ -611,11 +567,6 @@ impl System { Ok(SystemRpc::Ok) } - fn handle_pull_cluster_layout(&self) -> SystemRpc { - let ring = self.ring.borrow().clone(); - SystemRpc::AdvertiseClusterLayout(ring.layout.clone()) - } - fn handle_get_known_nodes(&self) -> SystemRpc { let known_nodes = self.get_known_nodes(); SystemRpc::ReturnKnownNodes(known_nodes) @@ -635,11 +586,8 @@ impl System { std::process::exit(1); } - if info.cluster_layout_version > local_info.cluster_layout_version - || info.cluster_layout_staging_hash != local_info.cluster_layout_staging_hash - { - tokio::spawn(self.clone().pull_cluster_layout(from)); - } + self.layout_manager + .handle_advertise_status(from, &info.layout_digest); self.node_status .write() @@ -649,57 +597,6 @@ impl System { Ok(SystemRpc::Ok) } - async fn handle_advertise_cluster_layout( - self: &Arc<Self>, - adv: &ClusterLayout, - ) -> Result<SystemRpc, Error> { - if adv.replication_factor != self.replication_factor { - let msg = format!( - "Received a cluster layout from another node with replication factor {}, which is different from what we have in our configuration ({}). Discarding the cluster layout we received.", - adv.replication_factor, - self.replication_factor - ); - error!("{}", msg); - return Err(Error::Message(msg)); - } - - let update_ring = self.update_ring.lock().await; - let mut layout: ClusterLayout = self.ring.borrow().layout.clone(); - - let prev_layout_check = layout.check().is_ok(); - if layout.merge(adv) { - if prev_layout_check && layout.check().is_err() { - error!("New cluster layout is invalid, discarding."); - return Err(Error::Message( - "New cluster layout is invalid, discarding.".into(), - )); - } - - let ring = Ring::new(layout.clone(), self.replication_factor); - update_ring.send(Arc::new(ring))?; - drop(update_ring); - - let self2 = self.clone(); - tokio::spawn(async move { - if let Err(e) = self2 - .rpc - .broadcast( - &self2.system_endpoint, - SystemRpc::AdvertiseClusterLayout(layout), - RequestStrategy::with_priority(PRIO_HIGH), - ) - .await - { - warn!("Error while broadcasting new cluster layout: {}", e); - } - }); - - self.save_cluster_layout().await?; - } - - Ok(SystemRpc::Ok) - } - async fn status_exchange_loop(&self, mut stop_signal: watch::Receiver<bool>) { while !*stop_signal.borrow() { let restart_at = Instant::now() + STATUS_EXCHANGE_INTERVAL; @@ -707,7 +604,7 @@ impl System { self.update_local_status(); let local_status: NodeStatus = self.local_status.load().as_ref().clone(); let _ = self - .rpc + .rpc_helper() .broadcast( &self.system_endpoint, SystemRpc::AdvertiseStatus(local_status), @@ -725,9 +622,9 @@ impl System { async fn discovery_loop(self: &Arc<Self>, mut stop_signal: watch::Receiver<bool>) { while !*stop_signal.borrow() { - let not_configured = self.ring.borrow().layout.check().is_err(); + let not_configured = self.cluster_layout().check().is_err(); let no_peers = self.fullmesh.get_peer_list().len() < self.replication_factor; - let expected_n_nodes = self.ring.borrow().layout.num_nodes(); + let expected_n_nodes = self.cluster_layout().all_nodes().len(); let bad_peers = self .fullmesh .get_peer_list() @@ -832,48 +729,49 @@ impl System { .save_async(&PeerList(peer_list)) .await } - - async fn pull_cluster_layout(self: Arc<Self>, peer: Uuid) { - let resp = self - .rpc - .call( - &self.system_endpoint, - peer, - SystemRpc::PullClusterLayout, - RequestStrategy::with_priority(PRIO_HIGH), - ) - .await; - if let Ok(SystemRpc::AdvertiseClusterLayout(layout)) = resp { - let _: Result<_, _> = self.handle_advertise_cluster_layout(&layout).await; - } - } } #[async_trait] impl EndpointHandler<SystemRpc> for System { async fn handle(self: &Arc<Self>, msg: &SystemRpc, from: NodeID) -> Result<SystemRpc, Error> { match msg { + // ---- system functions -> System ---- SystemRpc::Connect(node) => self.handle_connect(node).await, - SystemRpc::PullClusterLayout => Ok(self.handle_pull_cluster_layout()), SystemRpc::AdvertiseStatus(adv) => self.handle_advertise_status(from.into(), adv).await, + SystemRpc::GetKnownNodes => Ok(self.handle_get_known_nodes()), + + // ---- layout functions -> LayoutManager ---- + SystemRpc::PullClusterLayout => Ok(self.layout_manager.handle_pull_cluster_layout()), SystemRpc::AdvertiseClusterLayout(adv) => { - self.clone().handle_advertise_cluster_layout(adv).await + self.layout_manager + .handle_advertise_cluster_layout(adv) + .await } - SystemRpc::GetKnownNodes => Ok(self.handle_get_known_nodes()), + SystemRpc::PullClusterLayoutTrackers => { + Ok(self.layout_manager.handle_pull_cluster_layout_trackers()) + } + SystemRpc::AdvertiseClusterLayoutTrackers(adv) => { + self.layout_manager + .handle_advertise_cluster_layout_trackers(adv) + .await + } + + // ---- other -> Error ---- m => Err(Error::unexpected_rpc_message(m)), } } } impl NodeStatus { - fn initial(replication_factor: usize, layout: &ClusterLayout) -> Self { + fn initial(replication_factor: usize, layout_manager: &LayoutManager) -> Self { NodeStatus { - hostname: gethostname::gethostname() - .into_string() - .unwrap_or_else(|_| "<invalid utf-8>".to_string()), + hostname: Some( + gethostname::gethostname() + .into_string() + .unwrap_or_else(|_| "<invalid utf-8>".to_string()), + ), replication_factor, - cluster_layout_version: layout.version, - cluster_layout_staging_hash: layout.staging_hash, + layout_digest: layout_manager.layout().digest(), meta_disk_avail: None, data_disk_avail: None, } @@ -881,10 +779,9 @@ impl NodeStatus { fn unknown() -> Self { NodeStatus { - hostname: "?".to_string(), + hostname: None, replication_factor: 0, - cluster_layout_version: 0, - cluster_layout_staging_hash: Hash::from([0u8; 32]), + layout_digest: Default::default(), meta_disk_avail: None, data_disk_avail: None, } @@ -963,6 +860,40 @@ fn get_default_ip() -> Option<IpAddr> { .map(|a| a.ip()) } +fn get_rpc_public_addr(config: &Config) -> Option<SocketAddr> { + match &config.rpc_public_addr { + Some(a_str) => { + use std::net::ToSocketAddrs; + match a_str.to_socket_addrs() { + Err(e) => { + error!( + "Cannot resolve rpc_public_addr {} from config file: {}.", + a_str, e + ); + None + } + Ok(a) => { + let a = a.collect::<Vec<_>>(); + if a.is_empty() { + error!("rpc_public_addr {} resolve to no known IP address", a_str); + } + if a.len() > 1 { + warn!("Multiple possible resolutions for rpc_public_addr: {:?}. Taking the first one.", a); + } + a.into_iter().next() + } + } + } + None => { + let addr = get_default_ip().map(|ip| SocketAddr::new(ip, config.rpc_bind_addr.port())); + if let Some(a) = addr { + warn!("Using autodetected rpc_public_addr: {}. Consider specifying it explicitly in configuration file if possible.", a); + } + addr + } + } +} + async fn resolve_peers(peers: &[String]) -> Vec<(NodeID, SocketAddr)> { let mut ret = vec![]; |