diff options
Diffstat (limited to 'src/rpc/system.rs')
-rw-r--r-- | src/rpc/system.rs | 252 |
1 files changed, 179 insertions, 73 deletions
diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 34031b10..7eb25195 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use std::io::{Read, Write}; use std::net::{IpAddr, SocketAddr}; -use std::path::Path; +use std::path::{Path, PathBuf}; use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; @@ -16,9 +16,9 @@ use tokio::sync::watch; use tokio::sync::Mutex; use netapp::endpoint::{Endpoint, EndpointHandler}; +use netapp::message::*; use netapp::peering::fullmesh::FullMeshPeeringStrategy; -use netapp::proto::*; -use netapp::util::parse_and_resolve_peer_addr; +use netapp::util::parse_and_resolve_peer_addr_async; use netapp::{NetApp, NetworkKey, NodeID, NodeKey}; use garage_util::background::BackgroundRunner; @@ -37,10 +37,11 @@ use crate::rpc_helper::*; const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60); const STATUS_EXCHANGE_INTERVAL: Duration = Duration::from_secs(10); -const PING_TIMEOUT: Duration = Duration::from_secs(2); -/// Version tag used for version check upon Netapp connection -pub const GARAGE_VERSION_TAG: u64 = 0x6761726167650007; // garage 0x0007 +/// Version tag used for version check upon Netapp connection. +/// Cluster nodes with different version tags are deemed +/// incompatible and will refuse to connect. +pub const GARAGE_VERSION_TAG: u64 = 0x6761726167650008; // garage 0x0008 /// RPC endpoint used for calls related to membership pub const SYSTEM_RPC_PATH: &str = "garage_rpc/membership.rs/SystemRpc"; @@ -90,7 +91,7 @@ pub struct System { rpc_listen_addr: SocketAddr, rpc_public_addr: Option<SocketAddr>, - bootstrap_peers: Vec<(NodeID, SocketAddr)>, + bootstrap_peers: Vec<String>, consul_discovery: Option<ConsulDiscoveryParam>, #[cfg(feature = "kubernetes-discovery")] @@ -104,6 +105,9 @@ pub struct System { /// The job runner of this node pub background: Arc<BackgroundRunner>, + + /// Path to metadata directory + pub metadata_dir: PathBuf, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -194,7 +198,7 @@ impl System { replication_factor: usize, zone_redundancy: usize, config: &Config, - ) -> Arc<Self> { + ) -> Result<Arc<Self>, Error> { let node_key = gen_node_key(&config.metadata_dir).expect("Unable to read or generate node ID"); info!( @@ -202,11 +206,21 @@ impl System { hex::encode(&node_key.public_key()[..8]) ); - let persist_cluster_layout = Persister::new(&config.metadata_dir, "cluster_layout"); + let persist_cluster_layout: Persister<ClusterLayout> = + Persister::new(&config.metadata_dir, "cluster_layout"); let persist_peer_list = Persister::new(&config.metadata_dir, "peer_list"); let cluster_layout = match persist_cluster_layout.load() { - Ok(x) => x, + Ok(x) => { + if x.replication_factor != replication_factor { + return Err(Error::Message(format!( + "Prevous cluster layout has replication factor {}, which is different than the one specified in the config file ({}). The previous cluster layout can be purged, if you know what you are doing, simply by deleting the `cluster_layout` file in your metadata directory.", + x.replication_factor, + replication_factor + ))); + } + x + } Err(e) => { info!( "No valid previous cluster layout stored ({}), starting fresh.", @@ -228,8 +242,29 @@ impl System { let ring = Ring::new(cluster_layout, replication_factor); let (update_ring, ring) = watch::channel(Arc::new(ring)); - let rpc_public_addr = match config.rpc_public_addr { - Some(a) => Some(a), + let rpc_public_addr = match &config.rpc_public_addr { + Some(a_str) => { + use std::net::ToSocketAddrs; + match a_str.to_socket_addrs() { + Err(e) => { + error!( + "Cannot resolve rpc_public_addr {} from config file: {}.", + a_str, e + ); + None + } + Ok(a) => { + let a = a.collect::<Vec<_>>(); + if a.is_empty() { + error!("rpc_public_addr {} resolve to no known IP address", a_str); + } + if a.len() > 1 { + warn!("Multiple possible resolutions for rpc_public_addr: {:?}. Taking the first one.", a); + } + a.into_iter().next() + } + } + } None => { let addr = get_default_ip().map(|ip| SocketAddr::new(ip, config.rpc_bind_addr.port())); @@ -239,13 +274,15 @@ impl System { addr } }; + if rpc_public_addr.is_none() { + warn!("This Garage node does not know its publicly reachable RPC address, this might hamper intra-cluster communication."); + } let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, node_key); - let fullmesh = FullMeshPeeringStrategy::new( - netapp.clone(), - config.bootstrap_peers.clone(), - rpc_public_addr, - ); + let fullmesh = FullMeshPeeringStrategy::new(netapp.clone(), vec![], rpc_public_addr); + if let Some(ping_timeout) = config.rpc_ping_timeout_msec { + fullmesh.set_ping_timeout_millis(ping_timeout); + } let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into()); @@ -283,7 +320,13 @@ impl System { node_status: RwLock::new(HashMap::new()), netapp: netapp.clone(), fullmesh: fullmesh.clone(), - rpc: RpcHelper::new(netapp.id.into(), fullmesh, background.clone(), ring.clone()), + rpc: RpcHelper::new( + netapp.id.into(), + fullmesh, + background.clone(), + ring.clone(), + config.rpc_timeout_msec.map(Duration::from_millis), + ), system_endpoint, replication_factor, rpc_listen_addr: config.rpc_bind_addr, @@ -296,9 +339,10 @@ impl System { ring, update_ring: Mutex::new(update_ring), background, + metadata_dir: config.metadata_dir.clone(), }); sys.system_endpoint.set_handler(sys.clone()); - sys + Ok(sys) } /// Perform bootstraping, starting the ping loop @@ -313,6 +357,80 @@ impl System { ); } + // ---- Administrative operations (directly available and + // also available through RPC) ---- + + pub fn get_known_nodes(&self) -> Vec<KnownNodeInfo> { + let node_status = self.node_status.read().unwrap(); + let known_nodes = self + .fullmesh + .get_peer_list() + .iter() + .map(|n| KnownNodeInfo { + id: n.id.into(), + addr: n.addr, + is_up: n.is_up(), + last_seen_secs_ago: n + .last_seen + .map(|t| (Instant::now().saturating_duration_since(t)).as_secs()), + status: node_status + .get(&n.id.into()) + .cloned() + .map(|(_, st)| st) + .unwrap_or(NodeStatus { + hostname: "?".to_string(), + replication_factor: 0, + cluster_layout_version: 0, + cluster_layout_staging_hash: Hash::from([0u8; 32]), + }), + }) + .collect::<Vec<_>>(); + known_nodes + } + + pub fn get_cluster_layout(&self) -> ClusterLayout { + self.ring.borrow().layout.clone() + } + + pub async fn update_cluster_layout( + self: &Arc<Self>, + layout: &ClusterLayout, + ) -> Result<(), Error> { + self.handle_advertise_cluster_layout(layout).await?; + Ok(()) + } + + pub async fn connect(&self, node: &str) -> Result<(), Error> { + let (pubkey, addrs) = parse_and_resolve_peer_addr_async(node) + .await + .ok_or_else(|| { + Error::Message(format!( + "Unable to parse or resolve node specification: {}", + node + )) + })?; + let mut errors = vec![]; + for ip in addrs.iter() { + match self + .netapp + .clone() + .try_connect(*ip, pubkey) + .await + .err_context(CONNECT_ERROR_MESSAGE) + { + Ok(()) => return Ok(()), + Err(e) => { + errors.push((*ip, e)); + } + } + } + if errors.len() == 1 { + Err(Error::Message(errors[0].1.to_string())) + } else { + Err(Error::Message(format!("{:?}", errors))) + } + } + // ---- INTERNALS ---- async fn advertise_to_consul(self: Arc<Self>) -> Result<(), Error> { @@ -385,32 +503,11 @@ impl System { self.local_status.swap(Arc::new(new_si)); } + // --- RPC HANDLERS --- + async fn handle_connect(&self, node: &str) -> Result<SystemRpc, Error> { - let (pubkey, addrs) = parse_and_resolve_peer_addr(node).ok_or_else(|| { - Error::Message(format!( - "Unable to parse or resolve node specification: {}", - node - )) - })?; - let mut errors = vec![]; - for ip in addrs.iter() { - match self - .netapp - .clone() - .try_connect(*ip, pubkey) - .await - .err_context(CONNECT_ERROR_MESSAGE) - { - Ok(()) => return Ok(SystemRpc::Ok), - Err(e) => { - errors.push((*ip, e)); - } - } - } - return Err(Error::Message(format!( - "Could not connect to specified peers. Errors: {:?}", - errors - ))); + self.connect(node).await?; + Ok(SystemRpc::Ok) } fn handle_pull_cluster_layout(&self) -> SystemRpc { @@ -419,28 +516,7 @@ impl System { } fn handle_get_known_nodes(&self) -> SystemRpc { - let node_status = self.node_status.read().unwrap(); - let known_nodes = self - .fullmesh - .get_peer_list() - .iter() - .map(|n| KnownNodeInfo { - id: n.id.into(), - addr: n.addr, - is_up: n.is_up(), - last_seen_secs_ago: n.last_seen.map(|t| (Instant::now() - t).as_secs()), - status: node_status - .get(&n.id.into()) - .cloned() - .map(|(_, st)| st) - .unwrap_or(NodeStatus { - hostname: "?".to_string(), - replication_factor: 0, - cluster_layout_version: 0, - cluster_layout_staging_hash: Hash::from([0u8; 32]), - }), - }) - .collect::<Vec<_>>(); + let known_nodes = self.get_known_nodes(); SystemRpc::ReturnKnownNodes(known_nodes) } @@ -452,7 +528,7 @@ impl System { let local_info = self.local_status.load(); if local_info.replication_factor < info.replication_factor { - error!("Some node have a higher replication factor ({}) than this one ({}). This is not supported and might lead to bugs", + error!("Some node have a higher replication factor ({}) than this one ({}). This is not supported and will lead to data corruption. Shutting down for safety.", info.replication_factor, local_info.replication_factor); std::process::exit(1); @@ -477,9 +553,19 @@ impl System { } async fn handle_advertise_cluster_layout( - self: Arc<Self>, + self: &Arc<Self>, adv: &ClusterLayout, ) -> Result<SystemRpc, Error> { + if adv.replication_factor != self.replication_factor { + let msg = format!( + "Received a cluster layout from another node with replication factor {}, which is different from what we have in our configuration ({}). Discarding the cluster layout we received.", + adv.replication_factor, + self.replication_factor + ); + error!("{}", msg); + return Err(Error::Message(msg)); + } + let update_ring = self.update_ring.lock().await; let mut layout: ClusterLayout = self.ring.borrow().layout.clone(); @@ -505,7 +591,7 @@ impl System { SystemRpc::AdvertiseClusterLayout(layout), RequestStrategy::with_priority(PRIO_HIGH), ) - .await; + .await?; Ok(()) }); self.background.spawn(self.clone().save_cluster_layout()); @@ -520,11 +606,12 @@ impl System { self.update_local_status(); let local_status: NodeStatus = self.local_status.load().as_ref().clone(); - self.rpc + let _ = self + .rpc .broadcast( &self.system_endpoint, SystemRpc::AdvertiseStatus(local_status), - RequestStrategy::with_priority(PRIO_HIGH).with_timeout(PING_TIMEOUT), + RequestStrategy::with_priority(PRIO_HIGH), ) .await; @@ -550,7 +637,7 @@ impl System { if not_configured || no_peers || bad_peers { info!("Doing a bootstrap/discovery step (not_configured: {}, no_peers: {}, bad_peers: {})", not_configured, no_peers, bad_peers); - let mut ping_list = self.bootstrap_peers.clone(); + let mut ping_list = resolve_peers(&self.bootstrap_peers).await; // Add peer list from list stored on disk if let Ok(peers) = self.persist_peer_list.load_async().await { @@ -648,7 +735,7 @@ impl System { &self.system_endpoint, peer, SystemRpc::PullClusterLayout, - RequestStrategy::with_priority(PRIO_HIGH).with_timeout(PING_TIMEOUT), + RequestStrategy::with_priority(PRIO_HIGH), ) .await; if let Ok(SystemRpc::AdvertiseClusterLayout(layout)) = resp { @@ -681,6 +768,25 @@ fn get_default_ip() -> Option<IpAddr> { .map(|a| a.ip()) } +async fn resolve_peers(peers: &[String]) -> Vec<(NodeID, SocketAddr)> { + let mut ret = vec![]; + + for peer in peers.iter() { + match parse_and_resolve_peer_addr_async(peer).await { + Some((pubkey, addrs)) => { + for ip in addrs { + ret.push((pubkey, ip)); + } + } + None => { + warn!("Unable to parse and/or resolve peer hostname {}", peer); + } + } + } + + ret +} + struct ConsulDiscoveryParam { consul_host: String, service_name: String, |