Diffstat (limited to 'src/rpc/membership.rs')
-rw-r--r--  src/rpc/membership.rs  215
1 file changed, 134 insertions(+), 81 deletions(-)
diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs
index 4e9822fa..9fb24ad4 100644
--- a/src/rpc/membership.rs
+++ b/src/rpc/membership.rs
@@ -11,13 +11,13 @@ use futures::future::join_all;
use futures::select;
use futures_util::future::*;
use serde::{Deserialize, Serialize};
-use tokio::io::AsyncWriteExt;
use tokio::sync::watch;
use tokio::sync::Mutex;
use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_util::error::Error;
+use garage_util::persister::Persister;
use garage_util::time::*;
use crate::consul::get_consul_nodes;
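Note: the patch replaces the ad-hoc file I/O below with garage_util::persister::Persister, whose definition is not part of this diff. The following is a minimal sketch of that interface as inferred from the call sites further down (MessagePack files in the metadata directory, with sync and async load/save); the real garage_util implementation may differ in details.

// Sketch only: Persister<T> as assumed by the call sites in this patch.
use std::marker::PhantomData;
use std::path::PathBuf;

use serde::{de::DeserializeOwned, Serialize};

pub struct Persister<T> {
    path: PathBuf,
    _phantom: PhantomData<T>,
}

impl<T: Serialize + DeserializeOwned> Persister<T> {
    // e.g. Persister::new(&metadata_dir, "network_config") or (&metadata_dir, "peer_info")
    pub fn new(base_dir: &PathBuf, file_name: &str) -> Self {
        Self {
            path: base_dir.join(file_name),
            _phantom: PhantomData,
        }
    }

    // Synchronous load, used once at startup (persist_config.load()).
    pub fn load(&self) -> std::io::Result<T> {
        let bytes = std::fs::read(&self.path)?;
        rmp_serde::decode::from_read_ref(&bytes)
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
    }

    // Async load, used by the discovery loop and update_status().
    pub async fn load_async(&self) -> std::io::Result<T> {
        let bytes = tokio::fs::read(&self.path).await?;
        rmp_serde::decode::from_read_ref(&bytes)
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
    }

    // Async save, used by save_network_config() and update_status().
    pub async fn save_async(&self, value: &T) -> std::io::Result<()> {
        let bytes = rmp_serde::encode::to_vec_named(value)
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
        tokio::fs::write(&self.path, &bytes).await
    }
}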
@@ -26,7 +26,7 @@ use crate::rpc_client::*;
use crate::rpc_server::*;
const PING_INTERVAL: Duration = Duration::from_secs(10);
-const CONSUL_INTERVAL: Duration = Duration::from_secs(60);
+const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60);
const PING_TIMEOUT: Duration = Duration::from_secs(2);
const MAX_FAILURES_BEFORE_CONSIDERED_DOWN: usize = 5;
@@ -69,7 +69,8 @@ pub struct AdvertisedNode {
pub struct System {
pub id: UUID,
- metadata_dir: PathBuf,
+ persist_config: Persister<NetworkConfig>,
+ persist_status: Persister<Vec<AdvertisedNode>>,
rpc_local_port: u16,
state_info: StateInfo,
@@ -80,11 +81,16 @@ pub struct System {
pub(crate) status: watch::Receiver<Arc<Status>>,
pub ring: watch::Receiver<Arc<Ring>>,
- update_lock: Mutex<(watch::Sender<Arc<Status>>, watch::Sender<Arc<Ring>>)>,
+ update_lock: Mutex<Updaters>,
pub background: Arc<BackgroundRunner>,
}
+struct Updaters {
+ update_status: watch::Sender<Arc<Status>>,
+ update_ring: watch::Sender<Arc<Ring>>,
+}
+
#[derive(Debug, Clone)]
pub struct Status {
pub nodes: HashMap<UUID, Arc<StatusEntry>>,
@@ -144,6 +150,25 @@ impl Status {
debug!("END --");
self.hash = blake2sum(nodes_txt.as_bytes());
}
+
+ fn to_serializable_membership(&self, system: &System) -> Vec<AdvertisedNode> {
+ let mut mem = vec![];
+ for (node, status) in self.nodes.iter() {
+ let state_info = if *node == system.id {
+ system.state_info.clone()
+ } else {
+ status.state_info.clone()
+ };
+ mem.push(AdvertisedNode {
+ id: *node,
+ addr: status.addr,
+ is_up: status.is_up(),
+ last_seen: status.last_seen,
+ state_info,
+ });
+ }
+ mem
+ }
}
fn gen_node_id(metadata_dir: &PathBuf) -> Result<UUID, Error> {
@@ -169,23 +194,6 @@ fn gen_node_id(metadata_dir: &PathBuf) -> Result<UUID, Error> {
}
}
-fn read_network_config(metadata_dir: &PathBuf) -> Result<NetworkConfig, Error> {
- let mut path = metadata_dir.clone();
- path.push("network_config");
-
- let mut file = std::fs::OpenOptions::new()
- .read(true)
- .open(path.as_path())?;
-
- let mut net_config_bytes = vec![];
- file.read_to_end(&mut net_config_bytes)?;
-
- let net_config = rmp_serde::decode::from_read_ref(&net_config_bytes[..])
- .expect("Unable to parse network configuration file (has version format changed?).");
-
- Ok(net_config)
-}
-
impl System {
pub fn new(
metadata_dir: PathBuf,
@@ -196,7 +204,10 @@ impl System {
let id = gen_node_id(&metadata_dir).expect("Unable to read or generate node ID");
info!("Node ID: {}", hex::encode(&id));
- let net_config = match read_network_config(&metadata_dir) {
+ let persist_config = Persister::new(&metadata_dir, "network_config");
+ let persist_status = Persister::new(&metadata_dir, "peer_info");
+
+ let net_config = match persist_config.load() {
Ok(x) => x,
Err(e) => {
info!(
@@ -206,6 +217,7 @@ impl System {
NetworkConfig::new()
}
};
+
let mut status = Status {
nodes: HashMap::new(),
hash: Hash::default(),
@@ -231,14 +243,18 @@ impl System {
let sys = Arc::new(System {
id,
- metadata_dir,
+ persist_config,
+ persist_status,
rpc_local_port: rpc_server.bind_addr.port(),
state_info,
rpc_http_client,
rpc_client,
status,
ring,
- update_lock: Mutex::new((update_status, update_ring)),
+ update_lock: Mutex::new(Updaters {
+ update_status,
+ update_ring,
+ }),
background,
});
sys.clone().register_handler(rpc_server, rpc_path);
@@ -272,14 +288,11 @@ impl System {
}
async fn save_network_config(self: Arc<Self>) -> Result<(), Error> {
- let mut path = self.metadata_dir.clone();
- path.push("network_config");
-
let ring = self.ring.borrow().clone();
- let data = rmp_to_vec_all_named(&ring.config)?;
-
- let mut f = tokio::fs::File::create(path.as_path()).await?;
- f.write_all(&data[..]).await?;
+ self.persist_config
+ .save_async(&ring.config)
+ .await
+ .expect("Cannot save current cluster configuration");
Ok(())
}
@@ -308,26 +321,21 @@ impl System {
pub async fn bootstrap(
self: Arc<Self>,
- peers: &[SocketAddr],
+ peers: Vec<SocketAddr>,
consul_host: Option<String>,
consul_service_name: Option<String>,
) {
- let bootstrap_peers = peers.iter().map(|ip| (*ip, None)).collect::<Vec<_>>();
- self.clone().ping_nodes(bootstrap_peers).await;
+ let self2 = self.clone();
+ self.background
+ .spawn_worker(format!("discovery loop"), |stop_signal| {
+ self2.discovery_loop(peers, consul_host, consul_service_name, stop_signal)
+ });
let self2 = self.clone();
self.background
.spawn_worker(format!("ping loop"), |stop_signal| {
self2.ping_loop(stop_signal)
});
-
- if let (Some(consul_host), Some(consul_service_name)) = (consul_host, consul_service_name) {
- let self2 = self.clone();
- self.background
- .spawn_worker(format!("Consul loop"), |stop_signal| {
- self2.consul_loop(stop_signal, consul_host, consul_service_name)
- });
- }
}
async fn ping_nodes(self: Arc<Self>, peers: Vec<(SocketAddr, Option<UUID>)>) {
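Note: with this change bootstrap() no longer pings peers itself; it only registers two background workers. Both follow the same shape, sketched here for reference (illustrative only, mirroring ping_loop and discovery_loop in this file): loop until the shared stop signal flips to true, waking up on either a timer or a change of that signal.

// Illustrative worker skeleton; the interval and the per-iteration work are
// placeholders, the select/stop-signal shape is the point.
use std::time::Duration;

use futures::{select, FutureExt};
use tokio::sync::watch;

async fn example_worker(mut stop_signal: watch::Receiver<bool>) {
    while !*stop_signal.borrow() {
        // ... one iteration of work (ping peers, run a discovery step, ...) ...

        let restart_at = tokio::time::sleep(Duration::from_secs(10));
        select! {
            _ = restart_at.fuse() => (),            // interval elapsed: run another iteration
            _ = stop_signal.changed().fuse() => (), // signal changed: re-check the loop condition
        }
    }
}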
@@ -394,9 +402,7 @@ impl System {
if has_changes {
status.recalculate_hash();
}
- if let Err(e) = update_locked.0.send(Arc::new(status)) {
- error!("In ping_nodes: could not save status update ({})", e);
- }
+ self.update_status(&update_locked, status).await;
drop(update_locked);
if to_advertise.len() > 0 {
@@ -420,7 +426,7 @@ impl System {
let status_hash = status.hash;
let config_version = self.ring.borrow().config.version;
- update_locked.0.send(Arc::new(status))?;
+ self.update_status(&update_locked, status).await;
drop(update_locked);
if is_new || status_hash != ping.status_hash {
@@ -436,23 +442,9 @@ impl System {
}
fn handle_pull_status(&self) -> Result<Message, Error> {
- let status = self.status.borrow().clone();
- let mut mem = vec![];
- for (node, status) in status.nodes.iter() {
- let state_info = if *node == self.id {
- self.state_info.clone()
- } else {
- status.state_info.clone()
- };
- mem.push(AdvertisedNode {
- id: *node,
- addr: status.addr,
- is_up: status.is_up(),
- last_seen: status.last_seen,
- state_info,
- });
- }
- Ok(Message::AdvertiseNodesUp(mem))
+ Ok(Message::AdvertiseNodesUp(
+ self.status.borrow().to_serializable_membership(self),
+ ))
}
fn handle_pull_config(&self) -> Result<Message, Error> {
@@ -502,7 +494,7 @@ impl System {
if has_changed {
status.recalculate_hash();
}
- update_lock.0.send(Arc::new(status))?;
+ self.update_status(&update_lock, status).await;
drop(update_lock);
if to_ping.len() > 0 {
@@ -522,7 +514,7 @@ impl System {
if adv.version > ring.config.version {
let ring = Ring::new(adv.clone());
- update_lock.1.send(Arc::new(ring))?;
+ update_lock.update_ring.send(Arc::new(ring))?;
drop(update_lock);
self.background.spawn_cancellable(
@@ -537,7 +529,7 @@ impl System {
}
async fn ping_loop(self: Arc<Self>, mut stop_signal: watch::Receiver<bool>) {
- loop {
+ while !*stop_signal.borrow() {
let restart_at = tokio::time::sleep(PING_INTERVAL);
let status = self.status.borrow().clone();
@@ -552,34 +544,64 @@ impl System {
select! {
_ = restart_at.fuse() => (),
- _ = stop_signal.changed().fuse() => {
- if *stop_signal.borrow() {
- return;
- }
- }
+ _ = stop_signal.changed().fuse() => (),
}
}
}
- async fn consul_loop(
+ async fn discovery_loop(
self: Arc<Self>,
+ bootstrap_peers: Vec<SocketAddr>,
+ consul_host: Option<String>,
+ consul_service_name: Option<String>,
mut stop_signal: watch::Receiver<bool>,
- consul_host: String,
- consul_service_name: String,
) {
+ let consul_config = match (consul_host, consul_service_name) {
+ (Some(ch), Some(csn)) => Some((ch, csn)),
+ _ => None,
+ };
+
while !*stop_signal.borrow() {
- let restart_at = tokio::time::sleep(CONSUL_INTERVAL);
+ let not_configured = self.ring.borrow().config.members.len() == 0;
+ let no_peers = self.status.borrow().nodes.len() < 3;
+ let bad_peers = self
+ .status
+ .borrow()
+ .nodes
+ .iter()
+ .filter(|(_, v)| v.is_up())
+ .count() != self.ring.borrow().config.members.len();
- match get_consul_nodes(&consul_host, &consul_service_name).await {
- Ok(mut node_list) => {
- let ping_addrs = node_list.drain(..).map(|a| (a, None)).collect::<Vec<_>>();
- self.clone().ping_nodes(ping_addrs).await;
+ if not_configured || no_peers || bad_peers {
+ info!("Doing a bootstrap/discovery step (not_configured: {}, no_peers: {}, bad_peers: {})", not_configured, no_peers, bad_peers);
+
+ let mut ping_list = bootstrap_peers
+ .iter()
+ .map(|ip| (*ip, None))
+ .collect::<Vec<_>>();
+
+ match self.persist_status.load_async().await {
+ Ok(peers) => {
+ ping_list.extend(peers.iter().map(|x| (x.addr, Some(x.id))));
+ }
+ _ => (),
}
- Err(e) => {
- warn!("Could not retrieve node list from Consul: {}", e);
+
+ if let Some((consul_host, consul_service_name)) = &consul_config {
+ match get_consul_nodes(consul_host, consul_service_name).await {
+ Ok(node_list) => {
+ ping_list.extend(node_list.iter().map(|a| (*a, None)));
+ }
+ Err(e) => {
+ warn!("Could not retrieve node list from Consul: {}", e);
+ }
+ }
}
+
+ self.clone().ping_nodes(ping_list).await;
}
+ let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL);
select! {
_ = restart_at.fuse() => (),
_ = stop_signal.changed().fuse() => (),
@@ -611,4 +633,35 @@ impl System {
let _: Result<_, _> = self.handle_advertise_config(&config).await;
}
}
+
+ async fn update_status(self: &Arc<Self>, updaters: &Updaters, status: Status) {
+ if status.hash != self.status.borrow().hash {
+ let mut list = status.to_serializable_membership(&self);
+
+ // Combine with old peer list to make sure no peer is lost
+ match self.persist_status.load_async().await {
+ Ok(old_list) => {
+ for pp in old_list {
+ if !list.iter().any(|np| pp.id == np.id) {
+ list.push(pp);
+ }
+ }
+ }
+ _ => (),
+ }
+
+ if list.len() > 0 {
+ info!("Persisting new peer list ({} peers)", list.len());
+ self.persist_status
+ .save_async(&list)
+ .await
+ .expect("Unable to persist peer list");
+ }
+ }
+
+ updaters
+ .update_status
+ .send(Arc::new(status))
+ .expect("Could not update internal membership status");
+ }
}
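Note: the bootstrap() signature change (Vec<SocketAddr> instead of &[SocketAddr]) also touches the caller, since the peer list is now moved into the long-running discovery worker. A hypothetical call site is shown below; the variable and config field names are assumptions, not part of this patch.

// Hypothetical caller: the bootstrap peers are handed over by value so the
// discovery loop can keep reusing them for periodic bootstrap steps.
let peers: Vec<std::net::SocketAddr> = config.bootstrap_peers.clone();
system
    .clone()
    .bootstrap(peers, config.consul_host.clone(), config.consul_service_name.clone())
    .await;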