diff options
Diffstat (limited to 'src/rpc/membership.rs')
-rw-r--r-- | src/rpc/membership.rs | 215 |
1 files changed, 134 insertions, 81 deletions
diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs index 4e9822fa..9fb24ad4 100644 --- a/src/rpc/membership.rs +++ b/src/rpc/membership.rs @@ -11,13 +11,13 @@ use futures::future::join_all; use futures::select; use futures_util::future::*; use serde::{Deserialize, Serialize}; -use tokio::io::AsyncWriteExt; use tokio::sync::watch; use tokio::sync::Mutex; use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_util::error::Error; +use garage_util::persister::Persister; use garage_util::time::*; use crate::consul::get_consul_nodes; @@ -26,7 +26,7 @@ use crate::rpc_client::*; use crate::rpc_server::*; const PING_INTERVAL: Duration = Duration::from_secs(10); -const CONSUL_INTERVAL: Duration = Duration::from_secs(60); +const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60); const PING_TIMEOUT: Duration = Duration::from_secs(2); const MAX_FAILURES_BEFORE_CONSIDERED_DOWN: usize = 5; @@ -69,7 +69,8 @@ pub struct AdvertisedNode { pub struct System { pub id: UUID, - metadata_dir: PathBuf, + persist_config: Persister<NetworkConfig>, + persist_status: Persister<Vec<AdvertisedNode>>, rpc_local_port: u16, state_info: StateInfo, @@ -80,11 +81,16 @@ pub struct System { pub(crate) status: watch::Receiver<Arc<Status>>, pub ring: watch::Receiver<Arc<Ring>>, - update_lock: Mutex<(watch::Sender<Arc<Status>>, watch::Sender<Arc<Ring>>)>, + update_lock: Mutex<Updaters>, pub background: Arc<BackgroundRunner>, } +struct Updaters { + update_status: watch::Sender<Arc<Status>>, + update_ring: watch::Sender<Arc<Ring>>, +} + #[derive(Debug, Clone)] pub struct Status { pub nodes: HashMap<UUID, Arc<StatusEntry>>, @@ -144,6 +150,25 @@ impl Status { debug!("END --"); self.hash = blake2sum(nodes_txt.as_bytes()); } + + fn to_serializable_membership(&self, system: &System) -> Vec<AdvertisedNode> { + let mut mem = vec![]; + for (node, status) in self.nodes.iter() { + let state_info = if *node == system.id { + system.state_info.clone() + } else { + status.state_info.clone() + }; + mem.push(AdvertisedNode { + id: *node, + addr: status.addr, + is_up: status.is_up(), + last_seen: status.last_seen, + state_info, + }); + } + mem + } } fn gen_node_id(metadata_dir: &PathBuf) -> Result<UUID, Error> { @@ -169,23 +194,6 @@ fn gen_node_id(metadata_dir: &PathBuf) -> Result<UUID, Error> { } } -fn read_network_config(metadata_dir: &PathBuf) -> Result<NetworkConfig, Error> { - let mut path = metadata_dir.clone(); - path.push("network_config"); - - let mut file = std::fs::OpenOptions::new() - .read(true) - .open(path.as_path())?; - - let mut net_config_bytes = vec![]; - file.read_to_end(&mut net_config_bytes)?; - - let net_config = rmp_serde::decode::from_read_ref(&net_config_bytes[..]) - .expect("Unable to parse network configuration file (has version format changed?)."); - - Ok(net_config) -} - impl System { pub fn new( metadata_dir: PathBuf, @@ -196,7 +204,10 @@ impl System { let id = gen_node_id(&metadata_dir).expect("Unable to read or generate node ID"); info!("Node ID: {}", hex::encode(&id)); - let net_config = match read_network_config(&metadata_dir) { + let persist_config = Persister::new(&metadata_dir, "network_config"); + let persist_status = Persister::new(&metadata_dir, "peer_info"); + + let net_config = match persist_config.load() { Ok(x) => x, Err(e) => { info!( @@ -206,6 +217,7 @@ impl System { NetworkConfig::new() } }; + let mut status = Status { nodes: HashMap::new(), hash: Hash::default(), @@ -231,14 +243,18 @@ impl System { let sys = Arc::new(System { id, - metadata_dir, + persist_config, + persist_status, rpc_local_port: rpc_server.bind_addr.port(), state_info, rpc_http_client, rpc_client, status, ring, - update_lock: Mutex::new((update_status, update_ring)), + update_lock: Mutex::new(Updaters { + update_status, + update_ring, + }), background, }); sys.clone().register_handler(rpc_server, rpc_path); @@ -272,14 +288,11 @@ impl System { } async fn save_network_config(self: Arc<Self>) -> Result<(), Error> { - let mut path = self.metadata_dir.clone(); - path.push("network_config"); - let ring = self.ring.borrow().clone(); - let data = rmp_to_vec_all_named(&ring.config)?; - - let mut f = tokio::fs::File::create(path.as_path()).await?; - f.write_all(&data[..]).await?; + self.persist_config + .save_async(&ring.config) + .await + .expect("Cannot save current cluster configuration"); Ok(()) } @@ -308,26 +321,21 @@ impl System { pub async fn bootstrap( self: Arc<Self>, - peers: &[SocketAddr], + peers: Vec<SocketAddr>, consul_host: Option<String>, consul_service_name: Option<String>, ) { - let bootstrap_peers = peers.iter().map(|ip| (*ip, None)).collect::<Vec<_>>(); - self.clone().ping_nodes(bootstrap_peers).await; + let self2 = self.clone(); + self.background + .spawn_worker(format!("discovery loop"), |stop_signal| { + self2.discovery_loop(peers, consul_host, consul_service_name, stop_signal) + }); let self2 = self.clone(); self.background .spawn_worker(format!("ping loop"), |stop_signal| { self2.ping_loop(stop_signal) }); - - if let (Some(consul_host), Some(consul_service_name)) = (consul_host, consul_service_name) { - let self2 = self.clone(); - self.background - .spawn_worker(format!("Consul loop"), |stop_signal| { - self2.consul_loop(stop_signal, consul_host, consul_service_name) - }); - } } async fn ping_nodes(self: Arc<Self>, peers: Vec<(SocketAddr, Option<UUID>)>) { @@ -394,9 +402,7 @@ impl System { if has_changes { status.recalculate_hash(); } - if let Err(e) = update_locked.0.send(Arc::new(status)) { - error!("In ping_nodes: could not save status update ({})", e); - } + self.update_status(&update_locked, status).await; drop(update_locked); if to_advertise.len() > 0 { @@ -420,7 +426,7 @@ impl System { let status_hash = status.hash; let config_version = self.ring.borrow().config.version; - update_locked.0.send(Arc::new(status))?; + self.update_status(&update_locked, status).await; drop(update_locked); if is_new || status_hash != ping.status_hash { @@ -436,23 +442,9 @@ impl System { } fn handle_pull_status(&self) -> Result<Message, Error> { - let status = self.status.borrow().clone(); - let mut mem = vec![]; - for (node, status) in status.nodes.iter() { - let state_info = if *node == self.id { - self.state_info.clone() - } else { - status.state_info.clone() - }; - mem.push(AdvertisedNode { - id: *node, - addr: status.addr, - is_up: status.is_up(), - last_seen: status.last_seen, - state_info, - }); - } - Ok(Message::AdvertiseNodesUp(mem)) + Ok(Message::AdvertiseNodesUp( + self.status.borrow().to_serializable_membership(self), + )) } fn handle_pull_config(&self) -> Result<Message, Error> { @@ -502,7 +494,7 @@ impl System { if has_changed { status.recalculate_hash(); } - update_lock.0.send(Arc::new(status))?; + self.update_status(&update_lock, status).await; drop(update_lock); if to_ping.len() > 0 { @@ -522,7 +514,7 @@ impl System { if adv.version > ring.config.version { let ring = Ring::new(adv.clone()); - update_lock.1.send(Arc::new(ring))?; + update_lock.update_ring.send(Arc::new(ring))?; drop(update_lock); self.background.spawn_cancellable( @@ -537,7 +529,7 @@ impl System { } async fn ping_loop(self: Arc<Self>, mut stop_signal: watch::Receiver<bool>) { - loop { + while !*stop_signal.borrow() { let restart_at = tokio::time::sleep(PING_INTERVAL); let status = self.status.borrow().clone(); @@ -552,34 +544,64 @@ impl System { select! { _ = restart_at.fuse() => (), - _ = stop_signal.changed().fuse() => { - if *stop_signal.borrow() { - return; - } - } + _ = stop_signal.changed().fuse() => (), } } } - async fn consul_loop( + async fn discovery_loop( self: Arc<Self>, + bootstrap_peers: Vec<SocketAddr>, + consul_host: Option<String>, + consul_service_name: Option<String>, mut stop_signal: watch::Receiver<bool>, - consul_host: String, - consul_service_name: String, ) { + let consul_config = match (consul_host, consul_service_name) { + (Some(ch), Some(csn)) => Some((ch, csn)), + _ => None, + }; + while !*stop_signal.borrow() { - let restart_at = tokio::time::sleep(CONSUL_INTERVAL); + let not_configured = self.ring.borrow().config.members.len() == 0; + let no_peers = self.status.borrow().nodes.len() < 3; + let bad_peers = self + .status + .borrow() + .nodes + .iter() + .filter(|(_, v)| v.is_up()) + .count() != self.ring.borrow().config.members.len(); - match get_consul_nodes(&consul_host, &consul_service_name).await { - Ok(mut node_list) => { - let ping_addrs = node_list.drain(..).map(|a| (a, None)).collect::<Vec<_>>(); - self.clone().ping_nodes(ping_addrs).await; + if not_configured || no_peers || bad_peers { + info!("Doing a bootstrap/discovery step (not_configured: {}, no_peers: {}, bad_peers: {})", not_configured, no_peers, bad_peers); + + let mut ping_list = bootstrap_peers + .iter() + .map(|ip| (*ip, None)) + .collect::<Vec<_>>(); + + match self.persist_status.load_async().await { + Ok(peers) => { + ping_list.extend(peers.iter().map(|x| (x.addr, Some(x.id)))); + } + _ => (), } - Err(e) => { - warn!("Could not retrieve node list from Consul: {}", e); + + if let Some((consul_host, consul_service_name)) = &consul_config { + match get_consul_nodes(consul_host, consul_service_name).await { + Ok(node_list) => { + ping_list.extend(node_list.iter().map(|a| (*a, None))); + } + Err(e) => { + warn!("Could not retrieve node list from Consul: {}", e); + } + } } + + self.clone().ping_nodes(ping_list).await; } + let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL); select! { _ = restart_at.fuse() => (), _ = stop_signal.changed().fuse() => (), @@ -611,4 +633,35 @@ impl System { let _: Result<_, _> = self.handle_advertise_config(&config).await; } } + + async fn update_status(self: &Arc<Self>, updaters: &Updaters, status: Status) { + if status.hash != self.status.borrow().hash { + let mut list = status.to_serializable_membership(&self); + + // Combine with old peer list to make sure no peer is lost + match self.persist_status.load_async().await { + Ok(old_list) => { + for pp in old_list { + if !list.iter().any(|np| pp.id == np.id) { + list.push(pp); + } + } + } + _ => (), + } + + if list.len() > 0 { + info!("Persisting new peer list ({} peers)", list.len()); + self.persist_status + .save_async(&list) + .await + .expect("Unable to persist peer list"); + } + } + + updaters + .update_status + .send(Arc::new(status)) + .expect("Could not update internal membership status"); + } } |