diff options
Diffstat (limited to 'src/rpc')
-rw-r--r-- | src/rpc/layout/manager.rs | 177 | ||||
-rw-r--r-- | src/rpc/layout/mod.rs | 2 | ||||
-rw-r--r-- | src/rpc/system.rs | 295 |
3 files changed, 284 insertions, 190 deletions
diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs new file mode 100644 index 00000000..a8a77139 --- /dev/null +++ b/src/rpc/layout/manager.rs @@ -0,0 +1,177 @@ +use std::sync::Arc; +use std::time::Duration; + +use tokio::sync::watch; +use tokio::sync::Mutex; + +use netapp::endpoint::Endpoint; +use netapp::peering::fullmesh::FullMeshPeeringStrategy; +use netapp::NodeID; + +use garage_util::config::Config; +use garage_util::data::*; +use garage_util::error::*; +use garage_util::persister::Persister; + +use super::*; +use crate::rpc_helper::*; +use crate::system::*; + +pub struct LayoutManager { + replication_factor: usize, + persist_cluster_layout: Persister<LayoutHistory>, + + pub layout_watch: watch::Receiver<Arc<LayoutHistory>>, + update_layout: Mutex<watch::Sender<Arc<LayoutHistory>>>, + + pub(crate) rpc_helper: RpcHelper, + system_endpoint: Arc<Endpoint<SystemRpc, System>>, +} + +impl LayoutManager { + pub fn new( + config: &Config, + node_id: NodeID, + system_endpoint: Arc<Endpoint<SystemRpc, System>>, + fullmesh: Arc<FullMeshPeeringStrategy>, + replication_factor: usize, + ) -> Result<Self, Error> { + let persist_cluster_layout: Persister<LayoutHistory> = + Persister::new(&config.metadata_dir, "cluster_layout"); + + let cluster_layout = match persist_cluster_layout.load() { + Ok(x) => { + if x.current().replication_factor != replication_factor { + return Err(Error::Message(format!( + "Prevous cluster layout has replication factor {}, which is different than the one specified in the config file ({}). The previous cluster layout can be purged, if you know what you are doing, simply by deleting the `cluster_layout` file in your metadata directory.", + x.current().replication_factor, + replication_factor + ))); + } + x + } + Err(e) => { + info!( + "No valid previous cluster layout stored ({}), starting fresh.", + e + ); + LayoutHistory::new(replication_factor) + } + }; + + let (update_layout, layout_watch) = watch::channel(Arc::new(cluster_layout)); + + let rpc_helper = RpcHelper::new( + node_id.into(), + fullmesh, + layout_watch.clone(), + config.rpc_timeout_msec.map(Duration::from_millis), + ); + + Ok(Self { + replication_factor, + persist_cluster_layout, + layout_watch, + update_layout: Mutex::new(update_layout), + system_endpoint, + rpc_helper, + }) + } + + // ---- PUBLIC INTERFACE ---- + + pub async fn update_cluster_layout(&self, layout: &LayoutHistory) -> Result<(), Error> { + self.handle_advertise_cluster_layout(layout).await?; + Ok(()) + } + + pub fn history(&self) -> watch::Ref<Arc<LayoutHistory>> { + self.layout_watch.borrow() + } + + pub(crate) async fn pull_cluster_layout(&self, peer: Uuid) { + let resp = self + .rpc_helper + .call( + &self.system_endpoint, + peer, + SystemRpc::PullClusterLayout, + RequestStrategy::with_priority(PRIO_HIGH), + ) + .await; + if let Ok(SystemRpc::AdvertiseClusterLayout(layout)) = resp { + let _: Result<_, _> = self.handle_advertise_cluster_layout(&layout).await; + } + } + + // ---- INTERNALS --- + + /// Save network configuration to disc + async fn save_cluster_layout(&self) -> Result<(), Error> { + let layout: Arc<LayoutHistory> = self.layout_watch.borrow().clone(); + self.persist_cluster_layout + .save_async(&layout) + .await + .expect("Cannot save current cluster layout"); + Ok(()) + } + + // ---- RPC HANDLERS ---- + + pub(crate) fn handle_pull_cluster_layout(&self) -> SystemRpc { + let layout = self.layout_watch.borrow().as_ref().clone(); + SystemRpc::AdvertiseClusterLayout(layout) + } + + pub(crate) async fn handle_advertise_cluster_layout( + &self, + adv: &LayoutHistory, + ) -> Result<SystemRpc, Error> { + if adv.current().replication_factor != self.replication_factor { + let msg = format!( + "Received a cluster layout from another node with replication factor {}, which is different from what we have in our configuration ({}). Discarding the cluster layout we received.", + adv.current().replication_factor, + self.replication_factor + ); + error!("{}", msg); + return Err(Error::Message(msg)); + } + + let update_layout = self.update_layout.lock().await; + // TODO: don't clone each time an AdvertiseClusterLayout is received + let mut layout: LayoutHistory = self.layout_watch.borrow().as_ref().clone(); + + let prev_layout_check = layout.check().is_ok(); + if layout.merge(adv) { + if prev_layout_check && layout.check().is_err() { + error!("New cluster layout is invalid, discarding."); + return Err(Error::Message( + "New cluster layout is invalid, discarding.".into(), + )); + } + + update_layout.send(Arc::new(layout.clone()))?; + drop(update_layout); + + /* TODO + tokio::spawn(async move { + if let Err(e) = system + .rpc_helper() + .broadcast( + &system.system_endpoint, + SystemRpc::AdvertiseClusterLayout(layout), + RequestStrategy::with_priority(PRIO_HIGH), + ) + .await + { + warn!("Error while broadcasting new cluster layout: {}", e); + } + }); + */ + + self.save_cluster_layout().await?; + } + + Ok(SystemRpc::Ok) + } +} diff --git a/src/rpc/layout/mod.rs b/src/rpc/layout/mod.rs index 7c15988a..cd3764bc 100644 --- a/src/rpc/layout/mod.rs +++ b/src/rpc/layout/mod.rs @@ -3,6 +3,8 @@ mod history; mod schema; mod version; +pub mod manager; + // ---- re-exports ---- pub use history::*; diff --git a/src/rpc/system.rs b/src/rpc/system.rs index a7433b68..a8e88425 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -14,7 +14,6 @@ use serde::{Deserialize, Serialize}; use sodiumoxide::crypto::sign::ed25519; use tokio::select; use tokio::sync::watch; -use tokio::sync::Mutex; use netapp::endpoint::{Endpoint, EndpointHandler}; use netapp::message::*; @@ -34,6 +33,7 @@ use garage_util::time::*; use crate::consul::ConsulDiscovery; #[cfg(feature = "kubernetes-discovery")] use crate::kubernetes::*; +use crate::layout::manager::LayoutManager; use crate::layout::*; use crate::replication_mode::*; use crate::rpc_helper::*; @@ -49,7 +49,7 @@ const STATUS_EXCHANGE_INTERVAL: Duration = Duration::from_secs(10); pub const GARAGE_VERSION_TAG: u64 = 0x676172616765000A; // garage 0x000A /// RPC endpoint used for calls related to membership -pub const SYSTEM_RPC_PATH: &str = "garage_rpc/membership.rs/SystemRpc"; +pub const SYSTEM_RPC_PATH: &str = "garage_rpc/system.rs/SystemRpc"; /// RPC messages related to membership #[derive(Debug, Serialize, Deserialize, Clone)] @@ -58,17 +58,17 @@ pub enum SystemRpc { Ok, /// Request to connect to a specific node (in <pubkey>@<host>:<port> format) Connect(String), - /// Ask other node its cluster layout. Answered with AdvertiseClusterLayout - PullClusterLayout, /// Advertise Garage status. Answered with another AdvertiseStatus. /// Exchanged with every node on a regular basis. AdvertiseStatus(NodeStatus), - /// Advertisement of cluster layout. Sent spontanously or in response to PullClusterLayout - AdvertiseClusterLayout(LayoutHistory), /// Get known nodes states GetKnownNodes, /// Return known nodes ReturnKnownNodes(Vec<KnownNodeInfo>), + /// Ask other node its cluster layout. Answered with AdvertiseClusterLayout + PullClusterLayout, + /// Advertisement of cluster layout. Sent spontanously or in response to PullClusterLayout + AdvertiseClusterLayout(LayoutHistory), } impl Rpc for SystemRpc { @@ -84,7 +84,6 @@ pub struct System { /// The id of this node pub id: Uuid, - persist_cluster_layout: Persister<LayoutHistory>, persist_peer_list: Persister<PeerList>, local_status: ArcSwap<NodeStatus>, @@ -92,9 +91,8 @@ pub struct System { pub netapp: Arc<NetApp>, fullmesh: Arc<FullMeshPeeringStrategy>, - pub rpc: RpcHelper, - system_endpoint: Arc<Endpoint<SystemRpc, System>>, + pub(crate) system_endpoint: Arc<Endpoint<SystemRpc, System>>, rpc_listen_addr: SocketAddr, #[cfg(any(feature = "consul-discovery", feature = "kubernetes-discovery"))] @@ -106,15 +104,13 @@ pub struct System { #[cfg(feature = "kubernetes-discovery")] kubernetes_discovery: Option<KubernetesDiscoveryConfig>, + pub layout_manager: LayoutManager, + metrics: SystemMetrics, replication_mode: ReplicationMode, replication_factor: usize, - /// The layout - pub layout_watch: watch::Receiver<Arc<LayoutHistory>>, - update_layout: Mutex<watch::Sender<Arc<LayoutHistory>>>, - /// Path to metadata directory pub metadata_dir: PathBuf, /// Path to data directory @@ -128,8 +124,11 @@ pub struct NodeStatus { /// Replication factor configured on the node pub replication_factor: usize, + /// Cluster layout version pub cluster_layout_version: u64, + /// Hash of cluster layout update trackers + // (TODO) pub cluster_layout_trackers_hash: Hash, /// Hash of cluster layout staging data pub cluster_layout_staging_hash: Hash, @@ -247,8 +246,7 @@ impl System { replication_mode: ReplicationMode, config: &Config, ) -> Result<Arc<Self>, Error> { - let replication_factor = replication_mode.replication_factor(); - + // ---- setup netapp RPC protocol ---- let node_key = gen_node_key(&config.metadata_dir).expect("Unable to read or generate node ID"); info!( @@ -256,81 +254,40 @@ impl System { hex::encode(&node_key.public_key()[..8]) ); - let persist_cluster_layout: Persister<LayoutHistory> = - Persister::new(&config.metadata_dir, "cluster_layout"); - let persist_peer_list = Persister::new(&config.metadata_dir, "peer_list"); - - let cluster_layout = match persist_cluster_layout.load() { - Ok(x) => { - if x.current().replication_factor != replication_factor { - return Err(Error::Message(format!( - "Prevous cluster layout has replication factor {}, which is different than the one specified in the config file ({}). The previous cluster layout can be purged, if you know what you are doing, simply by deleting the `cluster_layout` file in your metadata directory.", - x.current().replication_factor, - replication_factor - ))); - } - x - } - Err(e) => { - info!( - "No valid previous cluster layout stored ({}), starting fresh.", - e - ); - LayoutHistory::new(replication_factor) - } - }; - - let metrics = SystemMetrics::new(replication_factor); - - let mut local_status = NodeStatus::initial(replication_factor, &cluster_layout); - local_status.update_disk_usage(&config.metadata_dir, &config.data_dir, &metrics); + let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, node_key); + let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into()); - let (update_layout, layout_watch) = watch::channel(Arc::new(cluster_layout)); - - let rpc_public_addr = match &config.rpc_public_addr { - Some(a_str) => { - use std::net::ToSocketAddrs; - match a_str.to_socket_addrs() { - Err(e) => { - error!( - "Cannot resolve rpc_public_addr {} from config file: {}.", - a_str, e - ); - None - } - Ok(a) => { - let a = a.collect::<Vec<_>>(); - if a.is_empty() { - error!("rpc_public_addr {} resolve to no known IP address", a_str); - } - if a.len() > 1 { - warn!("Multiple possible resolutions for rpc_public_addr: {:?}. Taking the first one.", a); - } - a.into_iter().next() - } - } - } - None => { - let addr = - get_default_ip().map(|ip| SocketAddr::new(ip, config.rpc_bind_addr.port())); - if let Some(a) = addr { - warn!("Using autodetected rpc_public_addr: {}. Consider specifying it explicitly in configuration file if possible.", a); - } - addr - } - }; + // ---- setup netapp public listener and full mesh peering strategy ---- + let rpc_public_addr = get_rpc_public_addr(config); if rpc_public_addr.is_none() { warn!("This Garage node does not know its publicly reachable RPC address, this might hamper intra-cluster communication."); } - let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, node_key); let fullmesh = FullMeshPeeringStrategy::new(netapp.clone(), vec![], rpc_public_addr); if let Some(ping_timeout) = config.rpc_ping_timeout_msec { fullmesh.set_ping_timeout_millis(ping_timeout); } - let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into()); + let persist_peer_list = Persister::new(&config.metadata_dir, "peer_list"); + // ---- setup cluster layout and layout manager ---- + let replication_factor = replication_mode.replication_factor(); + + let layout_manager = LayoutManager::new( + config, + netapp.id, + system_endpoint.clone(), + fullmesh.clone(), + replication_factor, + )?; + + // ---- set up metrics and status exchange ---- + let metrics = SystemMetrics::new(replication_factor); + + let mut local_status = NodeStatus::initial(replication_factor, &layout_manager.history()); + local_status.update_disk_usage(&config.metadata_dir, &config.data_dir, &metrics); + + // ---- if enabled, set up additionnal peer discovery methods ---- #[cfg(feature = "consul-discovery")] let consul_discovery = match &config.consul_discovery { Some(cfg) => Some( @@ -349,20 +306,14 @@ impl System { warn!("Kubernetes discovery is not enabled in this build."); } + // ---- done ---- let sys = Arc::new(System { id: netapp.id.into(), - persist_cluster_layout, persist_peer_list, local_status: ArcSwap::new(Arc::new(local_status)), node_status: RwLock::new(HashMap::new()), netapp: netapp.clone(), fullmesh: fullmesh.clone(), - rpc: RpcHelper::new( - netapp.id.into(), - fullmesh, - layout_watch.clone(), - config.rpc_timeout_msec.map(Duration::from_millis), - ), system_endpoint, replication_mode, replication_factor, @@ -374,10 +325,9 @@ impl System { consul_discovery, #[cfg(feature = "kubernetes-discovery")] kubernetes_discovery: config.kubernetes_discovery.clone(), + layout_manager, metrics, - layout_watch, - update_layout: Mutex::new(update_layout), metadata_dir: config.metadata_dir.clone(), data_dir: config.data_dir.clone(), }); @@ -397,6 +347,20 @@ impl System { ); } + // ---- Public utilities / accessors ---- + + pub fn cluster_layout(&self) -> watch::Ref<Arc<LayoutHistory>> { + self.layout_manager.history() + } + + pub fn layout_watch(&self) -> watch::Receiver<Arc<LayoutHistory>> { + self.layout_manager.layout_watch.clone() + } + + pub fn rpc_helper(&self) -> &RpcHelper { + &self.layout_manager.rpc_helper + } + // ---- Administrative operations (directly available and // also available through RPC) ---- @@ -423,18 +387,6 @@ impl System { known_nodes } - pub fn cluster_layout(&self) -> watch::Ref<Arc<LayoutHistory>> { - self.layout_watch.borrow() - } - - pub async fn update_cluster_layout( - self: &Arc<Self>, - layout: &LayoutHistory, - ) -> Result<(), Error> { - self.handle_advertise_cluster_layout(layout).await?; - Ok(()) - } - pub async fn connect(&self, node: &str) -> Result<(), Error> { let (pubkey, addrs) = parse_and_resolve_peer_addr_async(node) .await @@ -464,7 +416,7 @@ impl System { } pub fn health(&self) -> ClusterHealth { - let layout: Arc<_> = self.layout_watch.borrow().clone(); + let layout: Arc<_> = self.cluster_layout().clone(); let quorum = self.replication_mode.write_quorum(); let replication_factor = self.replication_factor; @@ -581,20 +533,10 @@ impl System { } } - /// Save network configuration to disc - async fn save_cluster_layout(&self) -> Result<(), Error> { - let layout: Arc<LayoutHistory> = self.layout_watch.borrow().clone(); - self.persist_cluster_layout - .save_async(&layout) - .await - .expect("Cannot save current cluster layout"); - Ok(()) - } - fn update_local_status(&self) { let mut new_si: NodeStatus = self.local_status.load().as_ref().clone(); - let layout = self.layout_watch.borrow(); + let layout = self.cluster_layout(); new_si.cluster_layout_version = layout.current().version; new_si.cluster_layout_staging_hash = layout.staging_hash; @@ -610,11 +552,6 @@ impl System { Ok(SystemRpc::Ok) } - fn handle_pull_cluster_layout(&self) -> SystemRpc { - let layout = self.layout_watch.borrow().as_ref().clone(); - SystemRpc::AdvertiseClusterLayout(layout) - } - fn handle_get_known_nodes(&self) -> SystemRpc { let known_nodes = self.get_known_nodes(); SystemRpc::ReturnKnownNodes(known_nodes) @@ -637,7 +574,10 @@ impl System { if info.cluster_layout_version > local_info.cluster_layout_version || info.cluster_layout_staging_hash != local_info.cluster_layout_staging_hash { - tokio::spawn(self.clone().pull_cluster_layout(from)); + tokio::spawn({ + let system = self.clone(); + async move { system.layout_manager.pull_cluster_layout(from).await } + }); } self.node_status @@ -648,57 +588,6 @@ impl System { Ok(SystemRpc::Ok) } - async fn handle_advertise_cluster_layout( - self: &Arc<Self>, - adv: &LayoutHistory, - ) -> Result<SystemRpc, Error> { - if adv.current().replication_factor != self.replication_factor { - let msg = format!( - "Received a cluster layout from another node with replication factor {}, which is different from what we have in our configuration ({}). Discarding the cluster layout we received.", - adv.current().replication_factor, - self.replication_factor - ); - error!("{}", msg); - return Err(Error::Message(msg)); - } - - let update_layout = self.update_layout.lock().await; - // TODO: don't clone each time an AdvertiseClusterLayout is received - let mut layout: LayoutHistory = self.layout_watch.borrow().as_ref().clone(); - - let prev_layout_check = layout.check().is_ok(); - if layout.merge(adv) { - if prev_layout_check && layout.check().is_err() { - error!("New cluster layout is invalid, discarding."); - return Err(Error::Message( - "New cluster layout is invalid, discarding.".into(), - )); - } - - update_layout.send(Arc::new(layout.clone()))?; - drop(update_layout); - - let self2 = self.clone(); - tokio::spawn(async move { - if let Err(e) = self2 - .rpc - .broadcast( - &self2.system_endpoint, - SystemRpc::AdvertiseClusterLayout(layout), - RequestStrategy::with_priority(PRIO_HIGH), - ) - .await - { - warn!("Error while broadcasting new cluster layout: {}", e); - } - }); - - self.save_cluster_layout().await?; - } - - Ok(SystemRpc::Ok) - } - async fn status_exchange_loop(&self, mut stop_signal: watch::Receiver<bool>) { while !*stop_signal.borrow() { let restart_at = Instant::now() + STATUS_EXCHANGE_INTERVAL; @@ -706,7 +595,7 @@ impl System { self.update_local_status(); let local_status: NodeStatus = self.local_status.load().as_ref().clone(); let _ = self - .rpc + .rpc_helper() .broadcast( &self.system_endpoint, SystemRpc::AdvertiseStatus(local_status), @@ -724,9 +613,9 @@ impl System { async fn discovery_loop(self: &Arc<Self>, mut stop_signal: watch::Receiver<bool>) { while !*stop_signal.borrow() { - let not_configured = self.layout_watch.borrow().check().is_err(); + let not_configured = self.cluster_layout().check().is_err(); let no_peers = self.fullmesh.get_peer_list().len() < self.replication_factor; - let expected_n_nodes = self.layout_watch.borrow().current().num_nodes(); + let expected_n_nodes = self.cluster_layout().current().num_nodes(); let bad_peers = self .fullmesh .get_peer_list() @@ -831,34 +720,26 @@ impl System { .save_async(&PeerList(peer_list)) .await } - - async fn pull_cluster_layout(self: Arc<Self>, peer: Uuid) { - let resp = self - .rpc - .call( - &self.system_endpoint, - peer, - SystemRpc::PullClusterLayout, - RequestStrategy::with_priority(PRIO_HIGH), - ) - .await; - if let Ok(SystemRpc::AdvertiseClusterLayout(layout)) = resp { - let _: Result<_, _> = self.handle_advertise_cluster_layout(&layout).await; - } - } } #[async_trait] impl EndpointHandler<SystemRpc> for System { async fn handle(self: &Arc<Self>, msg: &SystemRpc, from: NodeID) -> Result<SystemRpc, Error> { match msg { + // ---- system functions -> System ---- SystemRpc::Connect(node) => self.handle_connect(node).await, - SystemRpc::PullClusterLayout => Ok(self.handle_pull_cluster_layout()), SystemRpc::AdvertiseStatus(adv) => self.handle_advertise_status(from.into(), adv).await, + SystemRpc::GetKnownNodes => Ok(self.handle_get_known_nodes()), + + // ---- layout functions -> LayoutManager ---- + SystemRpc::PullClusterLayout => Ok(self.layout_manager.handle_pull_cluster_layout()), SystemRpc::AdvertiseClusterLayout(adv) => { - self.clone().handle_advertise_cluster_layout(adv).await + self.layout_manager + .handle_advertise_cluster_layout(adv) + .await } - SystemRpc::GetKnownNodes => Ok(self.handle_get_known_nodes()), + + // ---- other -> Error ---- m => Err(Error::unexpected_rpc_message(m)), } } @@ -962,6 +843,40 @@ fn get_default_ip() -> Option<IpAddr> { .map(|a| a.ip()) } +fn get_rpc_public_addr(config: &Config) -> Option<SocketAddr> { + match &config.rpc_public_addr { + Some(a_str) => { + use std::net::ToSocketAddrs; + match a_str.to_socket_addrs() { + Err(e) => { + error!( + "Cannot resolve rpc_public_addr {} from config file: {}.", + a_str, e + ); + None + } + Ok(a) => { + let a = a.collect::<Vec<_>>(); + if a.is_empty() { + error!("rpc_public_addr {} resolve to no known IP address", a_str); + } + if a.len() > 1 { + warn!("Multiple possible resolutions for rpc_public_addr: {:?}. Taking the first one.", a); + } + a.into_iter().next() + } + } + } + None => { + let addr = get_default_ip().map(|ip| SocketAddr::new(ip, config.rpc_bind_addr.port())); + if let Some(a) = addr { + warn!("Using autodetected rpc_public_addr: {}. Consider specifying it explicitly in configuration file if possible.", a); + } + addr + } + } +} + async fn resolve_peers(peers: &[String]) -> Vec<(NodeID, SocketAddr)> { let mut ret = vec![]; |