path: root/src/rpc/membership.rs
author    Alex Auvolat <alex@adnab.me>    2021-10-14 11:50:12 +0200
committer Alex Auvolat <alex@adnab.me>    2021-10-22 15:55:18 +0200
commit    4067797d0142ee7860aff8da95d65820d6cc0889 (patch)
tree      a1c91ab5043c556bc7b369f6c447686fa782a64d /src/rpc/membership.rs
parent    dc017a0cab40cb2f33a01b420bb1b04038abb875 (diff)
download  garage-4067797d0142ee7860aff8da95d65820d6cc0889.tar.gz
          garage-4067797d0142ee7860aff8da95d65820d6cc0889.zip
First port of Garage to Netapp
Diffstat (limited to 'src/rpc/membership.rs')
-rw-r--r--    src/rpc/membership.rs    722
1 file changed, 0 insertions, 722 deletions
diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs
deleted file mode 100644
index a77eeed3..00000000
--- a/src/rpc/membership.rs
+++ /dev/null
@@ -1,722 +0,0 @@
-//! Module containing structs related to membership management
-use std::collections::HashMap;
-use std::fmt::Write as FmtWrite;
-use std::io::{Read, Write};
-use std::net::{IpAddr, SocketAddr};
-use std::path::{Path, PathBuf};
-use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::Arc;
-use std::time::Duration;
-
-use futures::future::join_all;
-use futures::select;
-use futures_util::future::*;
-use serde::{Deserialize, Serialize};
-use tokio::sync::watch;
-use tokio::sync::Mutex;
-
-use garage_util::background::BackgroundRunner;
-use garage_util::data::*;
-use garage_util::error::Error;
-use garage_util::persister::Persister;
-use garage_util::time::*;
-
-use crate::consul::get_consul_nodes;
-use crate::ring::*;
-use crate::rpc_client::*;
-use crate::rpc_server::*;
-
-const PING_INTERVAL: Duration = Duration::from_secs(10);
-const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60);
-const PING_TIMEOUT: Duration = Duration::from_secs(2);
-const MAX_FAILURES_BEFORE_CONSIDERED_DOWN: usize = 5;
-
-/// RPC endpoint used for calls related to membership
-pub const MEMBERSHIP_RPC_PATH: &str = "_membership";
-
-/// RPC messages related to membership
-#[derive(Debug, Serialize, Deserialize)]
-pub enum Message {
-	/// Response to successful advertisements
-	Ok,
-	/// Message sent to probe other nodes' status
-	Ping(PingMessage),
-	/// Ask another node for the nodes it knows. Answered with AdvertiseNodesUp
-	PullStatus,
-	/// Ask another node for its config. Answered with AdvertiseConfig
-	PullConfig,
-	/// Advertisement of nodes the host knows to be up. Sent spontaneously or in response to PullStatus
-	AdvertiseNodesUp(Vec<AdvertisedNode>),
-	/// Advertisement of the network config. Sent spontaneously or in response to PullConfig
- AdvertiseConfig(NetworkConfig),
-}
-
-impl RpcMessage for Message {}
-
-/// A ping, containing information about status and config
-#[derive(Debug, Serialize, Deserialize)]
-pub struct PingMessage {
- id: Uuid,
- rpc_port: u16,
-
- status_hash: Hash,
- config_version: u64,
-
- state_info: StateInfo,
-}
-
-/// A node advertisement
-#[derive(Clone, Debug, Serialize, Deserialize)]
-pub struct AdvertisedNode {
- /// Id of the node this advertisement relates to
- pub id: Uuid,
- /// IP and port of the node
- pub addr: SocketAddr,
-
- /// Is the node considered up
- pub is_up: bool,
- /// When was the node last seen up, in milliseconds since UNIX epoch
- pub last_seen: u64,
-
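-	/// Additional state information about the node (hostname, replication factor)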
- pub state_info: StateInfo,
-}
-
-/// This node's membership manager
-pub struct System {
- /// The id of this node
- pub id: Uuid,
-
- persist_config: Persister<NetworkConfig>,
- persist_status: Persister<Vec<AdvertisedNode>>,
- rpc_local_port: u16,
-
- state_info: StateInfo,
-
- rpc_http_client: Arc<RpcHttpClient>,
- rpc_client: Arc<RpcClient<Message>>,
-
- replication_factor: usize,
- pub(crate) status: watch::Receiver<Arc<Status>>,
- /// The ring
- pub ring: watch::Receiver<Arc<Ring>>,
-
- update_lock: Mutex<Updaters>,
-
- /// The job runner of this node
- pub background: Arc<BackgroundRunner>,
-}
-
-struct Updaters {
- update_status: watch::Sender<Arc<Status>>,
- update_ring: watch::Sender<Arc<Ring>>,
-}
-
-/// The status of each node, as seen by this node
-#[derive(Debug, Clone)]
-pub struct Status {
- /// Mapping of each node id to its known status
- pub nodes: HashMap<Uuid, Arc<StatusEntry>>,
- /// Hash of `nodes`, used to detect when nodes have different views of the cluster
- pub hash: Hash,
-}
-
-/// The status of a single node
-#[derive(Debug)]
-pub struct StatusEntry {
- /// The IP and port used to connect to this node
- pub addr: SocketAddr,
- /// Last time this node was seen
- pub last_seen: u64,
- /// Number of consecutive pings sent without reply to this node
- pub num_failures: AtomicUsize,
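-	/// State information advertised by this node (hostname, replication factor)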
- pub state_info: StateInfo,
-}
-
-impl StatusEntry {
-	/// Is the node associated with this entry considered up?
- pub fn is_up(&self) -> bool {
- self.num_failures.load(Ordering::SeqCst) < MAX_FAILURES_BEFORE_CONSIDERED_DOWN
- }
-}
-
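-/// State information about a node, exchanged in pings and node advertisements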
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct StateInfo {
- /// Hostname of the node
- pub hostname: String,
- /// Replication factor configured on the node
-	pub replication_factor: Option<usize>, // TODO: Option is only here for backward compatibility. It should become a plain usize at some point
-}
-
-impl Status {
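-	/// Integrate a received ping into the status map. Returns true if the node
-	/// was previously unknown or its address changed, i.e. the status hash
-	/// must be recalculated.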
- fn handle_ping(&mut self, ip: IpAddr, info: &PingMessage) -> bool {
- let addr = SocketAddr::new(ip, info.rpc_port);
- let old_status = self.nodes.insert(
- info.id,
- Arc::new(StatusEntry {
- addr,
- last_seen: now_msec(),
- num_failures: AtomicUsize::from(0),
- state_info: info.state_info.clone(),
- }),
- );
- match old_status {
- None => {
- info!("Newly pingable node: {}", hex::encode(&info.id));
- true
- }
- Some(x) => x.addr != addr,
- }
- }
-
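-	/// Recompute `hash` as the blake2 checksum of the sorted "node_id address"
-	/// list. Nodes exchange this hash in pings to detect divergent cluster views.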
- fn recalculate_hash(&mut self) {
- let mut nodes = self.nodes.iter().collect::<Vec<_>>();
- nodes.sort_unstable_by_key(|(id, _status)| *id);
-
- let mut nodes_txt = String::new();
- debug!("Current set of pingable nodes: --");
- for (id, status) in nodes {
- debug!("{} {}", hex::encode(&id), status.addr);
- writeln!(&mut nodes_txt, "{} {}", hex::encode(&id), status.addr).unwrap();
- }
- debug!("END --");
- self.hash = blake2sum(nodes_txt.as_bytes());
- }
-
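-	/// Build the list of AdvertisedNode entries for this status, substituting
-	/// our own up-to-date state_info for our entry.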
- fn to_serializable_membership(&self, system: &System) -> Vec<AdvertisedNode> {
- let mut mem = vec![];
- for (node, status) in self.nodes.iter() {
- let state_info = if *node == system.id {
- system.state_info.clone()
- } else {
- status.state_info.clone()
- };
- mem.push(AdvertisedNode {
- id: *node,
- addr: status.addr,
- is_up: status.is_up(),
- last_seen: status.last_seen,
- state_info,
- });
- }
- mem
- }
-}
-
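-/// Read this node's 32-byte ID from the node_id file in the metadata directory,
-/// or generate and persist a new one on first run.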
-fn gen_node_id(metadata_dir: &Path) -> Result<Uuid, Error> {
- let mut id_file = metadata_dir.to_path_buf();
- id_file.push("node_id");
- if id_file.as_path().exists() {
- let mut f = std::fs::File::open(id_file.as_path())?;
- let mut d = vec![];
- f.read_to_end(&mut d)?;
- if d.len() != 32 {
- return Err(Error::Message("Corrupt node_id file".to_string()));
- }
-
- let mut id = [0u8; 32];
- id.copy_from_slice(&d[..]);
- Ok(id.into())
- } else {
- let id = gen_uuid();
-
- let mut f = std::fs::File::create(id_file.as_path())?;
- f.write_all(id.as_slice())?;
- Ok(id)
- }
-}
-
-impl System {
- /// Create this node's membership manager
- pub fn new(
- metadata_dir: PathBuf,
- rpc_http_client: Arc<RpcHttpClient>,
- background: Arc<BackgroundRunner>,
- rpc_server: &mut RpcServer,
- replication_factor: usize,
- ) -> Arc<Self> {
- let id = gen_node_id(&metadata_dir).expect("Unable to read or generate node ID");
- info!("Node ID: {}", hex::encode(&id));
-
- let persist_config = Persister::new(&metadata_dir, "network_config");
- let persist_status = Persister::new(&metadata_dir, "peer_info");
-
- let net_config = match persist_config.load() {
- Ok(x) => x,
- Err(e) => {
- match Persister::<garage_rpc_021::ring::NetworkConfig>::new(
- &metadata_dir,
- "network_config",
- )
- .load()
- {
- Ok(old_config) => NetworkConfig::migrate_from_021(old_config),
- Err(e2) => {
- info!(
- "No valid previous network configuration stored ({}, {}), starting fresh.",
- e, e2
- );
- NetworkConfig::new()
- }
- }
- }
- };
-
- let mut status = Status {
- nodes: HashMap::new(),
- hash: Hash::default(),
- };
- status.recalculate_hash();
- let (update_status, status) = watch::channel(Arc::new(status));
-
- let state_info = StateInfo {
- hostname: gethostname::gethostname()
- .into_string()
- .unwrap_or_else(|_| "<invalid utf-8>".to_string()),
- replication_factor: Some(replication_factor),
- };
-
- let ring = Ring::new(net_config, replication_factor);
- let (update_ring, ring) = watch::channel(Arc::new(ring));
-
- let rpc_path = MEMBERSHIP_RPC_PATH.to_string();
- let rpc_client = RpcClient::new(
- RpcAddrClient::<Message>::new(rpc_http_client.clone(), rpc_path.clone()),
- background.clone(),
- status.clone(),
- );
-
- let sys = Arc::new(System {
- id,
- persist_config,
- persist_status,
- rpc_local_port: rpc_server.bind_addr.port(),
- state_info,
- rpc_http_client,
- rpc_client,
- replication_factor,
- status,
- ring,
- update_lock: Mutex::new(Updaters {
- update_status,
- update_ring,
- }),
- background,
- });
- sys.clone().register_handler(rpc_server, rpc_path);
- sys
- }
-
- fn register_handler(self: Arc<Self>, rpc_server: &mut RpcServer, path: String) {
- rpc_server.add_handler::<Message, _, _>(path, move |msg, addr| {
- let self2 = self.clone();
- async move {
- match msg {
- Message::Ping(ping) => self2.handle_ping(&addr, &ping).await,
-
- Message::PullStatus => Ok(self2.handle_pull_status()),
- Message::PullConfig => Ok(self2.handle_pull_config()),
- Message::AdvertiseNodesUp(adv) => self2.handle_advertise_nodes_up(&adv).await,
- Message::AdvertiseConfig(adv) => self2.handle_advertise_config(&adv).await,
-
- _ => Err(Error::BadRpc("Unexpected RPC message".to_string())),
- }
- }
- });
- }
-
- /// Get an RPC client
- pub fn rpc_client<M: RpcMessage + 'static>(self: &Arc<Self>, path: &str) -> Arc<RpcClient<M>> {
- RpcClient::new(
- RpcAddrClient::new(self.rpc_http_client.clone(), path.to_string()),
- self.background.clone(),
- self.status.clone(),
- )
- }
-
-	/// Save network configuration to disk
- async fn save_network_config(self: Arc<Self>) -> Result<(), Error> {
- let ring = self.ring.borrow().clone();
- self.persist_config
- .save_async(&ring.config)
- .await
- .expect("Cannot save current cluster configuration");
- Ok(())
- }
-
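-	/// Build a ping message advertising our ID, RPC port, current status hash
-	/// and config version.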
- fn make_ping(&self) -> Message {
- let status = self.status.borrow().clone();
- let ring = self.ring.borrow().clone();
- Message::Ping(PingMessage {
- id: self.id,
- rpc_port: self.rpc_local_port,
- status_hash: status.hash,
- config_version: ring.config.version,
- state_info: self.state_info.clone(),
- })
- }
-
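-	/// Send a message to all currently known nodes except ourselves.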
- async fn broadcast(self: Arc<Self>, msg: Message, timeout: Duration) {
- let status = self.status.borrow().clone();
- let to = status
- .nodes
- .keys()
- .filter(|x| **x != self.id)
- .cloned()
- .collect::<Vec<_>>();
- self.rpc_client.call_many(&to[..], msg, timeout).await;
- }
-
-	/// Perform bootstrapping: spawn the discovery and ping background loops
- pub async fn bootstrap(
- self: Arc<Self>,
- peers: Vec<SocketAddr>,
- consul_host: Option<String>,
- consul_service_name: Option<String>,
- ) {
- let self2 = self.clone();
- self.background
- .spawn_worker("discovery loop".to_string(), |stop_signal| {
- self2.discovery_loop(peers, consul_host, consul_service_name, stop_signal)
- });
-
- let self2 = self.clone();
- self.background
- .spawn_worker("ping loop".to_string(), |stop_signal| {
- self2.ping_loop(stop_signal)
- });
- }
-
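-	/// Ping the given peers in parallel and merge the replies into our status:
-	/// new or changed nodes are advertised to the rest of the cluster, and
-	/// divergent status hashes or newer config versions trigger background pulls.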
- async fn ping_nodes(self: Arc<Self>, peers: Vec<(SocketAddr, Option<Uuid>)>) {
- let ping_msg = self.make_ping();
- let ping_resps = join_all(peers.iter().map(|(addr, id_option)| {
- let sys = self.clone();
- let ping_msg_ref = &ping_msg;
- async move {
- (
- id_option,
- addr,
- sys.rpc_client
- .by_addr()
- .call(&addr, ping_msg_ref, PING_TIMEOUT)
- .await,
- )
- }
- }))
- .await;
-
- let update_locked = self.update_lock.lock().await;
- let mut status: Status = self.status.borrow().as_ref().clone();
- let ring = self.ring.borrow().clone();
-
- let mut has_changes = false;
- let mut to_advertise = vec![];
-
- for (id_option, addr, ping_resp) in ping_resps {
- if let Ok(Ok(Message::Ping(info))) = ping_resp {
- let is_new = status.handle_ping(addr.ip(), &info);
- if is_new {
- has_changes = true;
- to_advertise.push(AdvertisedNode {
- id: info.id,
- addr: *addr,
- is_up: true,
- last_seen: now_msec(),
- state_info: info.state_info.clone(),
- });
- }
- if is_new || status.hash != info.status_hash {
- self.background
- .spawn_cancellable(self.clone().pull_status(info.id).map(Ok));
- }
- if is_new || ring.config.version < info.config_version {
- self.background
- .spawn_cancellable(self.clone().pull_config(info.id).map(Ok));
- }
- } else if let Some(id) = id_option {
- if let Some(st) = status.nodes.get_mut(id) {
-					// We need to increment the failure counter ourselves here, because the
-					// call was made using by_addr, so the counter was not auto-incremented.
- st.num_failures.fetch_add(1, Ordering::SeqCst);
- if !st.is_up() {
- warn!("Node {:?} seems to be down.", id);
- if !ring.config.members.contains_key(id) {
- info!("Removing node {:?} from status (not in config and not responding to pings anymore)", id);
- status.nodes.remove(&id);
- has_changes = true;
- }
- }
- }
- }
- }
- if has_changes {
- status.recalculate_hash();
- }
- self.update_status(&update_locked, status).await;
- drop(update_locked);
-
- if !to_advertise.is_empty() {
- self.broadcast(Message::AdvertiseNodesUp(to_advertise), PING_TIMEOUT)
- .await;
- }
- }
-
- async fn handle_ping(
- self: Arc<Self>,
- from: &SocketAddr,
- ping: &PingMessage,
- ) -> Result<Message, Error> {
- let update_locked = self.update_lock.lock().await;
- let mut status: Status = self.status.borrow().as_ref().clone();
-
- let is_new = status.handle_ping(from.ip(), ping);
- if is_new {
- status.recalculate_hash();
- }
- let status_hash = status.hash;
- let config_version = self.ring.borrow().config.version;
-
- self.update_status(&update_locked, status).await;
- drop(update_locked);
-
- if is_new || status_hash != ping.status_hash {
- self.background
- .spawn_cancellable(self.clone().pull_status(ping.id).map(Ok));
- }
- if is_new || config_version < ping.config_version {
- self.background
- .spawn_cancellable(self.clone().pull_config(ping.id).map(Ok));
- }
-
- Ok(self.make_ping())
- }
-
- fn handle_pull_status(&self) -> Message {
- Message::AdvertiseNodesUp(self.status.borrow().to_serializable_membership(self))
- }
-
- fn handle_pull_config(&self) -> Message {
- let ring = self.ring.borrow().clone();
- Message::AdvertiseConfig(ring.config.clone())
- }
-
- async fn handle_advertise_nodes_up(
- self: Arc<Self>,
- adv: &[AdvertisedNode],
- ) -> Result<Message, Error> {
- let mut to_ping = vec![];
-
- let update_lock = self.update_lock.lock().await;
- let mut status: Status = self.status.borrow().as_ref().clone();
- let mut has_changed = false;
- let mut max_replication_factor = 0;
-
- for node in adv.iter() {
- if node.id == self.id {
-				// Learn our own IP address
- let self_addr = SocketAddr::new(node.addr.ip(), self.rpc_local_port);
- let old_self = status.nodes.insert(
- node.id,
- Arc::new(StatusEntry {
- addr: self_addr,
- last_seen: now_msec(),
- num_failures: AtomicUsize::from(0),
- state_info: self.state_info.clone(),
- }),
- );
- has_changed = match old_self {
- None => true,
- Some(x) => x.addr != self_addr,
- };
- } else {
- let ping_them = match status.nodes.get(&node.id) {
- // Case 1: new node
- None => true,
- // Case 2: the node might have changed address
- Some(our_node) => node.is_up && !our_node.is_up() && our_node.addr != node.addr,
- };
- max_replication_factor = std::cmp::max(
- max_replication_factor,
- node.state_info.replication_factor.unwrap_or_default(),
- );
- if ping_them {
- to_ping.push((node.addr, Some(node.id)));
- }
- }
- }
-
- if self.replication_factor < max_replication_factor {
- error!("Some node have a higher replication factor ({}) than this one ({}). This is not supported and might lead to bugs",
- max_replication_factor,
- self.replication_factor);
- std::process::exit(1);
- }
- if has_changed {
- status.recalculate_hash();
- }
- self.update_status(&update_lock, status).await;
- drop(update_lock);
-
- if !to_ping.is_empty() {
- self.background
- .spawn_cancellable(self.clone().ping_nodes(to_ping).map(Ok));
- }
-
- Ok(Message::Ok)
- }
-
- async fn handle_advertise_config(
- self: Arc<Self>,
- adv: &NetworkConfig,
- ) -> Result<Message, Error> {
- let update_lock = self.update_lock.lock().await;
- let ring: Arc<Ring> = self.ring.borrow().clone();
-
- if adv.version > ring.config.version {
- let ring = Ring::new(adv.clone(), self.replication_factor);
- update_lock.update_ring.send(Arc::new(ring))?;
- drop(update_lock);
-
- self.background.spawn_cancellable(
- self.clone()
- .broadcast(Message::AdvertiseConfig(adv.clone()), PING_TIMEOUT)
- .map(Ok),
- );
- self.background.spawn(self.clone().save_network_config());
- }
-
- Ok(Message::Ok)
- }
-
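-	/// Background loop pinging all known nodes every PING_INTERVAL, until the
-	/// stop signal fires.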
- async fn ping_loop(self: Arc<Self>, mut stop_signal: watch::Receiver<bool>) {
- while !*stop_signal.borrow() {
- let restart_at = tokio::time::sleep(PING_INTERVAL);
-
- let status = self.status.borrow().clone();
- let ping_addrs = status
- .nodes
- .iter()
- .filter(|(id, _)| **id != self.id)
- .map(|(id, status)| (status.addr, Some(*id)))
- .collect::<Vec<_>>();
-
- self.clone().ping_nodes(ping_addrs).await;
-
- select! {
- _ = restart_at.fuse() => {},
- _ = stop_signal.changed().fuse() => {},
- }
- }
- }
-
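-	/// Background loop that re-runs peer discovery (bootstrap list, persisted
-	/// peers, and optionally Consul) whenever the cluster is unconfigured or
-	/// some configured nodes are unreachable.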
- async fn discovery_loop(
- self: Arc<Self>,
- bootstrap_peers: Vec<SocketAddr>,
- consul_host: Option<String>,
- consul_service_name: Option<String>,
- mut stop_signal: watch::Receiver<bool>,
- ) {
- let consul_config = match (consul_host, consul_service_name) {
- (Some(ch), Some(csn)) => Some((ch, csn)),
- _ => None,
- };
-
- while !*stop_signal.borrow() {
- let not_configured = self.ring.borrow().config.members.is_empty();
- let no_peers = self.status.borrow().nodes.len() < 3;
- let bad_peers = self
- .status
- .borrow()
- .nodes
- .iter()
- .filter(|(_, v)| v.is_up())
- .count() != self.ring.borrow().config.members.len();
-
- if not_configured || no_peers || bad_peers {
- info!("Doing a bootstrap/discovery step (not_configured: {}, no_peers: {}, bad_peers: {})", not_configured, no_peers, bad_peers);
-
- let mut ping_list = bootstrap_peers
- .iter()
- .map(|ip| (*ip, None))
- .collect::<Vec<_>>();
-
- if let Ok(peers) = self.persist_status.load_async().await {
- ping_list.extend(peers.iter().map(|x| (x.addr, Some(x.id))));
- }
-
- if let Some((consul_host, consul_service_name)) = &consul_config {
- match get_consul_nodes(consul_host, consul_service_name).await {
- Ok(node_list) => {
- ping_list.extend(node_list.iter().map(|a| (*a, None)));
- }
- Err(e) => {
- warn!("Could not retrieve node list from Consul: {}", e);
- }
- }
- }
-
- self.clone().ping_nodes(ping_list).await;
- }
-
- let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL);
- select! {
- _ = restart_at.fuse() => {},
- _ = stop_signal.changed().fuse() => {},
- }
- }
- }
-
-	// For some reason, fixing this lint causes a compilation error; see https://github.com/rust-lang/rust-clippy/issues/7052
- #[allow(clippy::manual_async_fn)]
- fn pull_status(
- self: Arc<Self>,
- peer: Uuid,
- ) -> impl futures::future::Future<Output = ()> + Send + 'static {
- async move {
- let resp = self
- .rpc_client
- .call(peer, Message::PullStatus, PING_TIMEOUT)
- .await;
- if let Ok(Message::AdvertiseNodesUp(nodes)) = resp {
- let _: Result<_, _> = self.handle_advertise_nodes_up(&nodes).await;
- }
- }
- }
-
- async fn pull_config(self: Arc<Self>, peer: Uuid) {
- let resp = self
- .rpc_client
- .call(peer, Message::PullConfig, PING_TIMEOUT)
- .await;
- if let Ok(Message::AdvertiseConfig(config)) = resp {
- let _: Result<_, _> = self.handle_advertise_config(&config).await;
- }
- }
-
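-	/// Publish a new status to the watch channel. If the status hash changed,
-	/// also persist the new peer list, merged with the previously persisted
-	/// one so that no peer is forgotten.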
- async fn update_status(self: &Arc<Self>, updaters: &Updaters, status: Status) {
- if status.hash != self.status.borrow().hash {
- let mut list = status.to_serializable_membership(&self);
-
- // Combine with old peer list to make sure no peer is lost
- if let Ok(old_list) = self.persist_status.load_async().await {
- for pp in old_list {
- if !list.iter().any(|np| pp.id == np.id) {
- list.push(pp);
- }
- }
- }
-
- if !list.is_empty() {
- info!("Persisting new peer list ({} peers)", list.len());
- self.persist_status
- .save_async(&list)
- .await
- .expect("Unable to persist peer list");
- }
- }
-
- updaters
- .update_status
- .send(Arc::new(status))
- .expect("Could not update internal membership status");
- }
-}